# Competitors research agent
Steps for competitors research:
1. Find list of competitors  
2. Go to their website  
3. Collect information including:  
	- standout features
	- product and pricing tiers
	- unique service proposition
	- marketing messages  
4. Analyze competitors  
	- identify common patterns  
	- spot potential gaps  
	- compare pricing strategies  
	- compare messaging themes

In [1]:
import logging
import os
import re
from typing import List, Dict, Any
import json
from datetime import datetime
import asyncio
from dotenv import load_dotenv

from google import genai

In [2]:
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [3]:
class RateLimiter:
    def __init__(self, calls_per_minute: int):
        self.calls_per_minute = calls_per_minute
        self.calls = []
        
    async def wait_if_needed(self):
        now = datetime.now()
        self.calls = [call for call in self.calls 
                     if (now - call).total_seconds() < 60]
        
        if len(self.calls) >= self.calls_per_minute:
            sleep_time = 60 - (now - self.calls[0]).total_seconds()
            await asyncio.sleep(sleep_time)
            
        self.calls.append(now)

In [4]:
class Config:
    def __init__(self):
        self.gemini_client = genai.Client(api_key=os.getenv('GOOGLE_API_KEY'))
        self.rate_limiter = RateLimiter(calls_per_minute=50)

In [5]:
class BaseAgent:
    def __init__(self, config: Config):
        self.config = config
        self.system_prompt = ""
        self.role = ""
        self.goal = ""
        self.backstory = ""

    async def execute(self, input_data: Any) -> Any:
        await self.config.rate_limiter.wait_if_needed()
        
        prompt = f"""
        Role: {self.role}
        Goal: {self.goal}
        Backstory: {self.backstory}
        System Instructions: {self.system_prompt}
        
        Input Data:
        {json.dumps(input_data, indent=2)}
        
        Please provide your analysis based on the above information.
        """
        
        try:
            response = self.config.gemini_client.models.generate_content(
                model="gemini-2.0-flash-exp",
                contents=prompt
            )
            
            return response.text
            
        except Exception as e:
            logger.error(f"Error in agent execution: {str(e)}")
            raise

In [6]:
class MarketIntelligenceScout(BaseAgent):
    def __init__(self, config: Config):
        super().__init__(config)
        self.role = "Expert market researcher specializing in competitor identification"
        self.goal = "Identify and categorize the most relevant competitors"
        self.backstory = "Former market research director with 15 years of experience"
        self.system_prompt = """
        1. Search for top competitors in the given market
        2. Return results in JSON format with the following structure:
        {
            "competitors": [
                {
                    "name": "",
                    "website": "",
                    "market_segment": "",
                    "threat_level": ""
                }
            ]
        }
        """

In [7]:
class DigitalProductAnalyst(BaseAgent):
    def __init__(self, config: Config):
        super().__init__(config)
        self.role = "Product analysis specialist"
        self.goal = "Analyze product features and capabilities"
        self.backstory = "Previously a product manager at major tech companies"
        self.system_prompt = """
        Analyze each competitor's product features and capabilities.
        Return results in JSON format:
        {
            "competitor_name": {
                "key_features": [],
                "unique_capabilities": [],
                "user_experience": "",
                "product_maturity": ""
            }
        }
        """

In [8]:
class MarketingMessageDecoder(BaseAgent):
    def __init__(self, config: Config):
        super().__init__(config)
        self.role = "Marketing communications analyst"
        self.goal = "Decode and analyze competitors' marketing strategies"
        self.backstory = "Former copywriter turned marketing strategist"
        self.system_prompt = """
        Analyze marketing messages and positioning.
        Return results in JSON format:
        {
            "competitor_name": {
                "value_propositions": [],
                "messaging_tone": "",
                "target_audience": "",
                "unique_selling_points": []
            }
        }
        """

In [9]:
class TechnicalFeatureComparator(BaseAgent):
    def __init__(self, config: Config):
        super().__init__(config)
        self.role = "Technical analyst specializing in feature comparison"
        self.goal = "Provide detailed technical comparison of competitor products"
        self.backstory = "Senior solutions architect with cross-industry experience"
        self.system_prompt = """
        Compare technical features across competitors.
        Return results in JSON format:
        {
            "competitor_name": {
                "tech_stack": [],
                "api_capabilities": [],
                "scalability_features": [],
                "technical_advantages": [],
                "technical_limitations": []
            }
        }
        """

In [10]:
class PricingStrategySpecialist(BaseAgent):
    def __init__(self, config: Config):
        super().__init__(config)
        self.role = "Pricing analysis expert"
        self.goal = "Analyze and compare pricing models and strategies"
        self.backstory = "Former pricing consultant in SaaS industry"
        self.system_prompt = """
        Analyze pricing strategies and models.
        Return results in JSON format:
        {
            "competitor_name": {
                "pricing_tiers": [],
                "pricing_model": "",
                "discount_strategies": [],
                "pricing_positioning": ""
            }
        }
        """

In [11]:
class CompetitiveStrategyAnalyst(BaseAgent):
    def __init__(self, config: Config):
        super().__init__(config)
        self.role = "Strategic analyst specializing in competitive analysis"
        self.goal = "Synthesize competitive intelligence into strategic insights"
        self.backstory = "Strategy consultant from major consulting firms"
        self.system_prompt = """
        Synthesize all competitive data into strategic insights.
        Return results in JSON format:
        {
            "market_patterns": [],
            "competitive_advantages": {},
            "market_gaps": [],
            "strategic_recommendations": [],
            "threat_assessment": {}
        }
        """

In [12]:
class CompetitiveIntelligenceReportSpecialist(BaseAgent):
    def __init__(self, config: Config):
        super().__init__(config)
        self.role = "Report creation specialist"
        self.goal = "Create clear, actionable reports from competitive analysis"
        self.backstory = "Communications expert in data visualization"
        self.system_prompt = """
        Create a comprehensive report from all analyses.
        Return results in JSON format:
        {
            "executive_summary": "",
            "key_findings": [],
            "detailed_analysis": {},
            "recommendations": [],
            "market_overview": "",
            "appendix": {}
        }
        """

In [13]:
class CompetitiveAnalysisWorkflow:
    def __init__(self, config: Config):
        self.config = config
        self.market_scout = MarketIntelligenceScout(config)
        self.product_analyst = DigitalProductAnalyst(config)
        self.marketing_decoder = MarketingMessageDecoder(config)
        self.technical_comparator = TechnicalFeatureComparator(config)
        self.pricing_specialist = PricingStrategySpecialist(config)
        self.competitive_analyst = CompetitiveStrategyAnalyst(config)
        self.report_specialist = CompetitiveIntelligenceReportSpecialist(config)

    async def run_parallel_analysis(self, competitors_data: Dict) -> Dict[str, Any]:
        """Run parallel analysis tasks"""
        tasks = [
            self.product_analyst.execute(competitors_data),
            self.marketing_decoder.execute(competitors_data),
            self.technical_comparator.execute(competitors_data),
            self.pricing_specialist.execute(competitors_data)
        ]
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        return {
            "product_analysis": results[0],
            "marketing_analysis": results[1],
            "technical_analysis": results[2],
            "pricing_analysis": results[3]
        }

    async def execute_workflow(self, market_segment: str) -> Dict:
        try:
            logger.info(f"Starting competitive analysis for {market_segment}")
            
            # Step 1: Identify competitors
            competitors_data = await self.market_scout.execute({"market_segment": market_segment})
            logger.info("Completed competitor identification")

            # Step 2: Run parallel analysis
            parallel_results = await self.run_parallel_analysis(competitors_data)
            logger.info("Completed parallel analysis")

            # Step 3: Strategic analysis
            strategic_analysis = await self.competitive_analyst.execute({
                "competitors_data": competitors_data,
                "parallel_results": parallel_results
            })
            logger.info("Completed strategic analysis")

            # Step 4: Generate report
            final_report = await self.report_specialist.execute({
                "strategic_analysis": strategic_analysis,
                "raw_data": {
                    "competitors": competitors_data,
                    "analysis": parallel_results
                }
            })
            logger.info("Completed final report generation")

            return final_report

        except Exception as e:
            logger.error(f"Error in workflow execution: {str(e)}")
            raise

In [14]:
async def test_workflow():
    config = Config()
    workflow = CompetitiveAnalysisWorkflow(config)
    result = await workflow.execute_workflow("AI Development Platforms")
    print("\nFinal Result:")
    print(json.dumps(result, indent=2))
    return result


In [15]:
result = await test_workflow()

INFO:__main__:Starting competitive analysis for AI Development Platforms
INFO:root:AFC is enabled with max remote calls: 10.
INFO:__main__:Completed competitor identification
INFO:root:AFC is enabled with max remote calls: 10.
INFO:root:AFC is enabled with max remote calls: 10.
INFO:root:AFC is enabled with max remote calls: 10.
INFO:root:AFC is enabled with max remote calls: 10.
INFO:__main__:Completed parallel analysis
INFO:root:AFC is enabled with max remote calls: 10.
INFO:__main__:Completed strategic analysis
INFO:root:AFC is enabled with max remote calls: 10.
INFO:__main__:Completed final report generation



Final Result:
"```json\n{\n    \"executive_summary\": \"The competitive landscape for AI development platforms is dominated by major cloud providers (Google, Amazon, Microsoft), each offering comprehensive, integrated solutions. These platforms leverage their existing ecosystems to provide scalable infrastructure, pre-built algorithms, and AutoML capabilities. Open-source libraries like TensorFlow and PyTorch are highly influential, forming the foundation for much of the AI development. End-to-end platforms such as Dataiku and RapidMiner focus on user-friendliness and collaboration, catering to less technical users. The market shows gaps in addressing explainable AI, industry-specific tools, and ease of use for non-technical users. A key strategic need is to balance ease of use with customizability, bridging the gap between open-source flexibility and enterprise-grade manageability.\",\n    \"key_findings\": [\n        \"Cloud-based platforms from Google, Amazon, and Microsoft are lea

In [18]:
def save_json_to_markdown(json_str, output_file="output.md"):
    """
    Converts a JSON string to a Markdown formatted string and saves it to a file.
    Handles incomplete JSON strings due to potential truncation.

    Args:
        json_str: A string containing potentially incomplete JSON data.
        output_file: The name of the file to save the markdown output to. Defaults to "output.md".

    Returns:
        A string containing the Markdown representation of the JSON data, or an error message.
        Also saves the markdown output to the specified file if successful.
    """
    # Remove the ```json\n and \n from the string
    json_str = json_str.replace("```json\n", "").strip()

    # 1. Check for Incomplete JSON
    if not json_str:
        return "Error: Empty JSON string provided."

    # Basic check for closing brace or bracket
    if not (json_str.endswith('}') or json_str.endswith(']')):
        # More robust check using regex
        if not re.search(r'(\{|\[)\s*([^\}\]]*\s*)*(\}|\])\s*$', json_str):
            # 2. Attempt to Repair the JSON
            if json_str.count('{') > json_str.count('}'):
                json_str += '}'
            elif json_str.count('[') > json_str.count(']'):
                json_str += ']'
            else:
                # Attempt to truncate to last valid structure
                try:
                    last_valid_index = max(json_str.rfind('{'), json_str.rfind('['))
                    if last_valid_index != -1:
                        json_str = json_str[:last_valid_index + 1]
                        if not (json_str.endswith('}') or json_str.endswith(']')):
                            return "Error: Incomplete JSON string, could not repair."
                    else:
                        return "Error: Incomplete JSON string, could not repair."
                except:
                    return "Error: Incomplete JSON string, could not repair."

    try:
        data = json.loads(json_str)
    except json.JSONDecodeError as e:
        return f"Error: Invalid JSON string after repair: {e}"

    markdown_output = ""

    def process_item(key, value, level=0):
        nonlocal markdown_output
        indent = "  " * level
        if isinstance(value, dict):
            markdown_output += f"{indent}**{key.replace('_', ' ').title()}:**\n"
            for k, v in value.items():
                process_item(k, v, level + 1)
        elif isinstance(value, list):
            markdown_output += f"{indent}**{key.replace('_', ' ').title()}:**\n"
            for item in value:
                if isinstance(item, dict):
                    process_item("item", item, level + 1)
                else:
                    markdown_output += f"{indent}  - {item}\n"
        else:
            markdown_output += f"{indent}**{key.replace('_', ' ').title()}:** {value}\n"

    for key, value in data.items():
        process_item(key, value)

    # Save to file
    try:
        with open(output_file, "w", encoding="utf-8") as f:
            f.write(markdown_output)
        print(f"Markdown output saved to {output_file}")
    except Exception as e:
        print(f"Error saving to file: {e}")

    return markdown_output

In [None]:
save_json_to_markdown(result, "AI Development Platforms Report.md")

NameError: name 'save_json_to_markdown' is not defined

KeyboardInterrupt: 