In [None]:
import os
import asyncio
from typing import List, Literal, Optional
from pydantic import BaseModel, Field

from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
from autogen_agentchat.messages import TextMessage, StructuredMessage
from autogen_agentchat.ui import Console
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Define structured output models for our agents
# Structured-output schema for the Persona_Researcher agent, which is
# configured with output_content_type=PersonaResearchResult.
class PersonaResearchResult(BaseModel):
    """Result of persona research"""
    # Interpolated into the pain-point and competitor task prompts and into
    # the report summary by conduct_market_research().
    persona_name: str = Field(description="Name of the target persona")
    demographics: str = Field(description="Demographics of the target persona")
    behaviors: str = Field(description="Behaviors and habits of the target persona")
    needs: str = Field(description="Needs and goals of the target persona")
    # Joined with ", " when building the pain-point task prompt and when
    # printing the final report.
    online_communities: List[str] = Field(description="Online communities where this persona can be found")
    reasoning: str = Field(description="Reasoning behind identifying this persona")

# Structured-output schema for the PainPoint_Analyzer agent, which is
# configured with output_content_type=PainPointAnalysisResult.
class PainPointAnalysisResult(BaseModel):
    """Result of pain point analysis"""
    # Only the length of this list is used by the workflow (progress line and summary).
    pain_points: List[str] = Field(description="List of identified pain points")
    # NOTE(review): untyped `dict`; nothing in this file reads it, so its key/value
    # shape is unspecified. Untyped dicts may not be expressible in a strict
    # structured-output schema — confirm against the model client.
    categorization: dict = Field(description="Categorization of pain points")
    # conduct_market_research() reads element 0 as the top pain point and
    # main() prints the first three entries; presumably ordered most-important first.
    prioritization: List[str] = Field(description="Prioritized list of pain points")
    examples: List[str] = Field(description="Real-world examples of these pain points")
    reasoning: str = Field(description="Reasoning behind the analysis")

# Structured-output schema for the Competitor_Analyzer agent, which is
# configured with output_content_type=CompetitorAnalysisResult.
class CompetitorAnalysisResult(BaseModel):
    """Result of competitor analysis"""
    # main() prints the first three entries and uses each one as a lookup key
    # into `strengths` and `weaknesses`.
    competitors: List[str] = Field(description="List of key competitors")
    # NOTE(review): the three dicts below are untyped. main() calls
    # strengths.get(comp) / weaknesses.get(comp) with a competitor name, so they are
    # presumably keyed by the strings in `competitors` — nothing enforces that.
    # Untyped dicts may not be expressible in a strict structured-output schema — confirm.
    strengths: dict = Field(description="Strengths of each competitor")
    weaknesses: dict = Field(description="Weaknesses of each competitor")
    positioning: dict = Field(description="Market positioning of each competitor")
    # conduct_market_research() reads element 0 to build a recommendation.
    differentiation_opportunities: List[str] = Field(description="Opportunities for differentiation")
    reasoning: str = Field(description="Reasoning behind the analysis")

# Aggregate report. In this file it is constructed directly by
# conduct_market_research() from the three agent results rather than being
# requested from a model.
class MarketResearchReport(BaseModel):
    """Final market research report"""
    target_persona: PersonaResearchResult = Field(description="Target persona information")
    pain_points: PainPointAnalysisResult = Field(description="Pain point analysis")
    competitors: CompetitorAnalysisResult = Field(description="Competitor analysis")
    # Built in conduct_market_research() from the top pain point, the first
    # differentiation opportunity and two fixed recommendations.
    recommendations: List[str] = Field(description="Recommendations based on the research")
    # Templated sentence produced in conduct_market_research(); printed first by main().
    summary: str = Field(description="Executive summary of the research")

# Create the model client shared by all three assistant agents below.
# main() closes this client when it finishes.
model_client = OpenAIChatCompletionClient(
    model="gpt-4o",
    api_key=os.getenv("OPENAI_API_KEY"),  # Read from the OPENAI_API_KEY environment variable (os.getenv returns None when unset)
)

# Create the agents with properly defined tools
from autogen_core.tools import FunctionTool

import os
from tavily import TavilyClient, MissingAPIKeyError, InvalidAPIKeyError, UsageLimitExceededError, BadRequestError

# Initialize Tavily client with proper error handling
def get_tavily_client():
    """Build a Tavily client from the ``TAVILY_API_KEY`` environment variable.

    Returns:
        A ``TavilyClient`` when a non-empty key is configured; otherwise
        ``None`` (after printing a warning) so the search tools can fall
        back to mock data.
    """
    # Read the key from the environment, as the warning below promises.
    # Whitespace-only values are treated as "not set".
    tavily_api_key = os.getenv("TAVILY_API_KEY", "").strip()
    if not tavily_api_key:
        print("Warning: TAVILY_API_KEY environment variable not set. Using mock data.")
        return None
    return TavilyClient(api_key=tavily_api_key)

# Module-level client shared by the search tools below; when it is None
# (no API key available) those tools return mock data instead of searching.
tavily_client = get_tavily_client()

async def web_search(query: str, search_depth: str = "basic") -> str:
    """
    Search the web for information using Tavily API

    Args:
        query: The search query
        search_depth: Either "basic" (faster, 1 credit) or "advanced" (more thorough, 2 credits)

    Returns:
        Formatted search results as a string. Failures are reported in the
        returned text rather than raised, so the calling agent always gets
        a string back.
    """
    if not tavily_client:
        # Fallback to mock data if client initialization failed
        return f"Mock search results for: {query}\n- Result 1: Example information\n- Result 2: More example data"

    try:
        # tavily_client.search is a synchronous (blocking) call; run it in a
        # worker thread so the event loop stays responsive while it waits.
        response = await asyncio.to_thread(
            tavily_client.search,
            query=query,
            search_depth=search_depth,
            include_answer=True,
            max_results=5,
        )

        sections = [f"Search results for: {query}\n\n"]

        # Add the AI-generated answer if available
        answer = response.get("answer")
        if answer:
            sections.append(f"Summary: {answer}\n\n")

        sections.append("Sources:\n")
        for position, result in enumerate(response.get("results", []), 1):
            title = result.get("title", "No title")
            url = result.get("url", "No URL")
            content = result.get("content") or ""
            # Only mark the snippet with an ellipsis when text was actually cut off.
            snippet = content[:200] + "..." if len(content) > 200 else content

            sections.append(f"{position}. {title}\n")
            sections.append(f"   URL: {url}\n")
            sections.append(f"   Snippet: {snippet}\n\n")

        return "".join(sections)
    except (MissingAPIKeyError, InvalidAPIKeyError) as e:
        return f"API key error: {str(e)}\nPlease check your Tavily API key configuration."
    except UsageLimitExceededError:
        return "Tavily API usage limit exceeded. Consider upgrading your plan."
    except Exception as e:
        # Deliberate best-effort: any other failure is reported as text.
        return f"Error performing search: {str(e)}"

async def find_communities(persona: str, platform: Optional[str] = None) -> str:
    """
    Find online communities for a specific persona

    Args:
        persona: The target persona/audience
        platform: Optional specific platform to search for communities

    Returns:
        List of relevant communities as formatted text. When no Tavily client
        is available, or the search fails, a canned fallback list is returned
        instead (with a note describing the error in the failure case).
    """
    if not tavily_client:
        # Fallback to mock data
        return _fallback_communities(persona)

    try:
        # Build the query from its parts so that an absent platform does not
        # leave a doubled space in the search string.
        query_terms = ["online communities for", persona]
        if platform:
            query_terms.append(f"on {platform}")
        query_terms.append("discussion forums groups")
        query = " ".join(query_terms)

        # tavily_client.search is a synchronous (blocking) call; run it in a
        # worker thread so the event loop stays responsive while it waits.
        response = await asyncio.to_thread(
            tavily_client.search,
            query=query,
            search_depth="basic",
            include_answer=True,
            max_results=5,
        )

        scope = f" on {platform}" if platform else ""
        sections = [f"Communities for {persona}{scope}:\n\n"]

        # Add the AI-generated answer if available
        answer = response.get("answer")
        if answer:
            sections.append(f"{answer}\n\n")

        sections.append("Sources:\n")
        for position, result in enumerate(response.get("results", []), 1):
            title = result.get("title", "No title")
            url = result.get("url", "No URL")

            sections.append(f"{position}. {title}\n")
            sections.append(f"   URL: {url}\n\n")

        return "".join(sections)
    except Exception as e:
        # Deliberate best-effort: fall back to mock data if the search fails
        return (
            _fallback_communities(persona)
            + f"Note: Using fallback data due to search error: {str(e)}"
        )


def _fallback_communities(persona: str) -> str:
    """Return the canned community list used when no live search result is available."""
    return (
        f"Communities for {persona}:\n"
        f"- Reddit r/{persona.lower().replace(' ', '')}\n"
        "- LinkedIn groups for professionals\n"
        "- Facebook industry groups\n"
    )

async def industry_trends(industry: str, timeframe: str = "recent") -> str:
    """
    Search for industry trends in a specific timeframe

    Args:
        industry: The industry to research
        timeframe: Time period for trends (recent, yearly, etc.)

    Returns:
        Formatted trend information. Mock text is returned when no Tavily
        client is available; search failures are reported in the returned
        text rather than raised.
    """
    if not tavily_client:
        return f"Mock {timeframe} trends in {industry} industry:\n- Trend 1: Example trend\n- Trend 2: Another trend"

    try:
        query = f"latest trends in {industry} industry {timeframe}"
        # tavily_client.search is a synchronous (blocking) call; run it in a
        # worker thread so the event loop stays responsive while it waits.
        response = await asyncio.to_thread(
            tavily_client.search,
            query=query,
            search_depth="basic",
            include_answer=True,
            max_results=5,
        )

        sections = [f"{timeframe.capitalize()} trends in {industry}:\n\n"]

        # Add the AI-generated answer if available
        answer = response.get("answer")
        if answer:
            sections.append(f"{answer}\n\n")

        sections.append("Sources:\n")
        for position, result in enumerate(response.get("results", []), 1):
            title = result.get("title", "No title")
            url = result.get("url", "No URL")

            sections.append(f"{position}. {title}\n")
            sections.append(f"   URL: {url}\n\n")

        return "".join(sections)
    except Exception as e:
        # Deliberate best-effort: report the failure as text to the caller.
        return f"Error searching for industry trends: {str(e)}"

# Create function tools with the updated implementations.
#
# The tools are created with strict=True, and a strict tool schema must mark
# every argument as required: default values are not allowed. The search
# functions keep their defaults for direct Python callers, so each tool wraps
# a thin adapter that exposes the same arguments without defaults and simply
# delegates. Tool names are set explicitly so the model still sees the
# original function names.
async def _web_search_strict(query: str, search_depth: Literal["basic", "advanced"]) -> str:
    """Default-free adapter around web_search for the strict tool schema."""
    return await web_search(query, search_depth)


async def _find_communities_strict(persona: str, platform: Optional[str]) -> str:
    """Default-free adapter around find_communities; platform may be null."""
    return await find_communities(persona, platform)


async def _industry_trends_strict(industry: str, timeframe: str) -> str:
    """Default-free adapter around industry_trends for the strict tool schema."""
    return await industry_trends(industry, timeframe)


web_search_tool = FunctionTool(
    _web_search_strict,
    name="web_search",
    description="Search the web for information",
    strict=True,
)
find_communities_tool = FunctionTool(
    _find_communities_strict,
    name="find_communities",
    description="Find online communities for a specific persona",
    strict=True,
)
industry_trends_tool = FunctionTool(
    _industry_trends_strict,
    name="industry_trends",
    description="Research trends in a specific industry",
    strict=True,
)

# Create the agents
# Within this file the manager is never run; only its `.name` is used, as the
# `source` of the task messages sent by conduct_market_research().
market_research_manager = UserProxyAgent(
    name="Market_Research_Manager",
    description="You are a market research manager responsible for coordinating research tasks and presenting findings. You'll work with specialized agents to conduct comprehensive market research.",
)

# Each assistant shares `model_client` and declares an `output_content_type`;
# conduct_market_research() reads the reply content as an instance of that model.
# Note: `industry_trends_tool` is defined above but is not attached to any agent here.
persona_researcher = AssistantAgent(
    name="Persona_Researcher",
    model_client=model_client,
    tools=[web_search_tool, find_communities_tool],  # Use the strict function tools
    system_message="You are an expert in identifying target personas and gathering information about their demographics, behaviors, and needs. Use web search to find relevant information and identify online communities where these personas hang out.",
    output_content_type=PersonaResearchResult,
)

pain_point_analyzer = AssistantAgent(
    name="PainPoint_Analyzer",
    model_client=model_client,
    tools=[web_search_tool],  # Use the strict function tool
    system_message="You are skilled at analyzing text data to identify common pain points, frustrations, and unmet needs. Analyze discussions in online communities to identify common pain points, categorize them, and prioritize based on frequency and severity.",
    output_content_type=PainPointAnalysisResult,
)

competitor_analyzer = AssistantAgent(
    name="Competitor_Analyzer",
    model_client=model_client,
    tools=[web_search_tool],  # Use the strict function tool
    system_message="You are an expert in analyzing competitor websites and marketing materials to identify their strengths, weaknesses, and positioning. Identify key competitors in the market and analyze their offerings to find opportunities for differentiation.",
    output_content_type=CompetitorAnalysisResult,
)

# Main function to run the market research workflow
async def conduct_market_research(industry: str, product_service: str) -> "MarketResearchReport":
    """Run the three research agents in sequence and assemble the final report.

    The persona result feeds the pain-point and competitor prompts, so the
    steps run one after another rather than concurrently.

    Args:
        industry: Industry the product or service belongs to.
        product_service: Product or service being researched.

    Returns:
        A MarketResearchReport combining the persona, pain-point and
        competitor results with generated recommendations and a summary.
    """
    # One cancellation token is shared by all agent calls in this run.
    cancellation_token = CancellationToken()

    async def run_agent(agent, task):
        """Send one task message to `agent` and return the content of its reply."""
        response = await agent.on_messages(
            [TextMessage(content=task, source=market_research_manager.name)],
            cancellation_token=cancellation_token,
        )
        return response.chat_message.content

    print("=== Starting Market Research Process ===")

    # Step 1: Persona Research
    print("\n=== Step 1: Persona Research ===")
    persona_task = f"Identify potential target personas for a {product_service} in the {industry} industry. Research their demographics, behaviors, and needs."
    persona_result = await run_agent(persona_researcher, persona_task)
    print(f"Persona Research Complete: {persona_result.persona_name}")

    # Step 2: Pain Point Analysis
    print("\n=== Step 2: Pain Point Analysis ===")
    pain_point_task = f"Analyze the pain points for the {persona_result.persona_name} persona in the {industry} industry, particularly related to {product_service}. Use the communities identified: {', '.join(persona_result.online_communities)}"
    pain_point_result = await run_agent(pain_point_analyzer, pain_point_task)
    print(f"Pain Point Analysis Complete: {len(pain_point_result.pain_points)} pain points identified")

    # Step 3: Competitor Analysis
    print("\n=== Step 3: Competitor Analysis ===")
    competitor_task = f"Identify and analyze key competitors offering {product_service} to {persona_result.persona_name} in the {industry} industry. Focus on their strengths, weaknesses, and positioning."
    competitor_result = await run_agent(competitor_analyzer, competitor_task)
    print(f"Competitor Analysis Complete: {len(competitor_result.competitors)} competitors analyzed")

    # Step 4: Generate Final Report
    print("\n=== Step 4: Generating Final Report ===")
    # The model may return empty lists; only emit the data-driven
    # recommendations when there is an entry to quote, instead of raising
    # IndexError on [0].
    recommendations = []
    if pain_point_result.prioritization:
        recommendations.append(
            f"Focus on solving the top pain point: {pain_point_result.prioritization[0]}"
        )
    if competitor_result.differentiation_opportunities:
        recommendations.append(
            f"Differentiate from competitors by: {competitor_result.differentiation_opportunities[0]}"
        )
    recommendations.extend(
        [
            "Develop targeted messaging that resonates with the persona's needs",
            "Create content that addresses specific pain points identified",
        ]
    )

    final_report = MarketResearchReport(
        target_persona=persona_result,
        pain_points=pain_point_result,
        competitors=competitor_result,
        recommendations=recommendations,
        summary=f"This market research report identifies {persona_result.persona_name} as the primary target persona for {product_service} in the {industry} industry. The research uncovered {len(pain_point_result.pain_points)} key pain points and analyzed {len(competitor_result.competitors)} major competitors, revealing several opportunities for differentiation and market positioning."
    )

    print("\n=== Market Research Complete ===")
    return final_report

# Example usage
async def main() -> None:
    """Run the example research workflow and print the resulting report.

    The shared model client is closed when this function finishes, whether
    the research succeeded or raised.
    """
    industry = "professional services"
    product_service = "business technology optimization consulting"

    try:
        report = await conduct_market_research(industry, product_service)

        # Print the final report in a structured format
        print("\n=== FINAL MARKET RESEARCH REPORT ===")
        print(f"EXECUTIVE SUMMARY:\n{report.summary}\n")

        print("TARGET PERSONA:")
        print(f"Name: {report.target_persona.persona_name}")
        print(f"Demographics: {report.target_persona.demographics}")
        print(f"Behaviors: {report.target_persona.behaviors}")
        print(f"Needs: {report.target_persona.needs}")
        print(f"Online Communities: {', '.join(report.target_persona.online_communities)}")

        # Only the first three prioritized pain points are shown.
        print("\nKEY PAIN POINTS:")
        for i, pain in enumerate(report.pain_points.prioritization[:3], 1):
            print(f"{i}. {pain}")

        # Only the first three competitors are shown; strengths/weaknesses are
        # looked up by competitor name and default to 'N/A' when missing.
        print("\nCOMPETITOR ANALYSIS:")
        for comp in report.competitors.competitors[:3]:
            print(f"- {comp}")
            print(f"  Strengths: {report.competitors.strengths.get(comp, 'N/A')}")
            print(f"  Weaknesses: {report.competitors.weaknesses.get(comp, 'N/A')}")

        print("\nRECOMMENDATIONS:")
        for i, rec in enumerate(report.recommendations, 1):
            print(f"{i}. {rec}")
    finally:
        # Close the model client even when the research or printing fails,
        # so the client is not left open on error.
        await model_client.close()

# For Jupyter notebooks, provide a ready-to-use function
async def run_research():
    """Notebook-friendly entry point: await the example workflow in main()."""
    return await main()
    
# asyncio.run() cannot be called while an event loop is already running in the
# current thread. Notebook kernels execute cells with __name__ == "__main__"
# *and* a running loop, so checking __name__ alone is not enough: start a new
# loop only when none is running, otherwise print how to run the workflow.
def _event_loop_is_running() -> bool:
    """Return True when called from inside a running asyncio event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


if __name__ == "__main__" and not _event_loop_is_running():
    asyncio.run(main())
else:
    print("Market Research Agent initialized. Run the following to execute:")
    print("await run_research()")

In [None]:
# Top-level await: run this in a notebook/IPython cell, not as a plain script.
# NOTE(review): main() closes the shared model_client, so re-running this cell
# without re-creating the client will presumably fail — confirm.
await run_research()