In [1]:
# Import required libraries
import os
from google.adk.agents import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools.agent_tool import AgentTool
from google.adk.tools import google_search
from google.adk.tools import FunctionTool
from google.genai import types
import json
import datetime
import time
import traceback
from tqdm.notebook import tqdm
from IPython.display import display, Markdown, HTML

# Import settings from your config module
from app.core.config import settings

# Set up Gemini API key from settings
# Both variables are written to the process environment at import time, before
# any of the agents further down in this cell are constructed.
os.environ["GOOGLE_API_KEY"] = settings.GEMINI_GENERATE_KEY
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "FALSE"  # Explicitly disable Vertex AI authentication

# Helper functions for displaying research progress
def format_time():
    """Give the current local wall-clock time as an ``HH:MM:SS`` string for log lines."""
    now = datetime.datetime.now()
    return f"{now:%H:%M:%S}"

def display_log(message, agent_name=None, message_type="info", include_timestamp=True):
    """Display a formatted log message in the notebook and echo it to stdout.

    Args:
        message: Text to log. It is HTML-escaped before being rendered, so
            characters such as ``<`` (e.g. ``<module>`` in a traceback) show up
            literally instead of being swallowed as markup.
        agent_name: Optional agent label added to the prefix. The names
            "research_manager"/"manager" and "search_agent"/"search" (matched
            case-insensitively) override ``message_type`` with the agent's own
            colour.
        message_type: Key into the colour table; unknown keys fall back to the
            "info" colour.
        include_timestamp: When true, the prefix starts with the current time.
    """
    # Local import keeps this helper self-contained within the notebook cell.
    from html import escape

    colors = {
        "info": "#3498db",  # Blue
        "search": "#e67e22",  # Orange
        "result": "#2ecc71",  # Green
        "error": "#e74c3c",  # Red
        "warning": "#f39c12",  # Amber
        "manager": "#9b59b6",  # Purple
        "search_agent": "#f1c40f",  # Yellow
        "system": "#7f8c8d",  # Gray
        "debug": "#1abc9c"   # Teal
    }

    if agent_name:
        lowered_name = agent_name.lower()
        if lowered_name in ["research_manager", "manager"]:
            message_type = "manager"
        elif lowered_name in ["search_agent", "search"]:
            message_type = "search_agent"

    color = colors.get(message_type, colors["info"])

    # Build the prefix once so the notebook and console copies carry the same timestamp.
    prefix = f"[{format_time()}] " if include_timestamp else ""
    if agent_name:
        prefix += f"[{agent_name}] "

    markup = (
        f"<div style='margin-bottom: 10px;'>"
        f"<span style='color: {color}; font-weight: bold;'>{escape(prefix)}</span> "
        f"{escape(str(message))}</div>"
    )
    display(HTML(markup))

    # Also log to console for backup (plain text, so no escaping here)
    print(f"{prefix}{message}")

def display_report(report, title="Research Report"):
    """Render a report under a level-2 Markdown heading, then mirror it to stdout."""
    for chunk in (f"## {title}", report):
        display(Markdown(chunk))

    # Plain-text copy on the console as a backup of the rich output
    rule = "=" * 50
    print(f"\n{rule}\n{title}\n{rule}\n{report}\n{rule}\n")

def display_separator(title=None):
    """Emit a row of ``=`` characters through display_log, optionally wrapping a title."""
    bar = f"{'='*20} {title} {'='*20}" if title else f"{'='*50}"
    display_log(bar, message_type="system", include_timestamp=False)

def save_research_results(results, filename=None):
    """Write the results dict to a UTF-8 JSON file and return the file name used.

    When no file name is given, one is derived from the purely alphanumeric
    words among the first five words of ``results["question"]`` plus a
    timestamp.
    """
    if filename is None:
        slug_parts = []
        for token in results["question"].split()[:5]:
            if token.isalnum():
                slug_parts.append(token.lower())
        stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = "research_" + "_".join(slug_parts) + "_" + stamp + ".json"

    with open(filename, mode="w", encoding="utf-8") as out_file:
        json.dump(results, out_file, indent=2, ensure_ascii=False)

    display_log(f"Research results saved to {filename}", message_type="system")

    # Report what was written so an empty report is easy to spot in the log
    report_text = results["report"]
    step_total = len(results["steps"])
    display_log(f"Report saved ({len(report_text)} chars, {step_total} steps)", message_type="debug")
    if not report_text:
        display_log("WARNING: Empty report saved to file!", message_type="warning")

    return filename

# Create a function to generate specific research queries
def generate_research_queries(topic: str) -> dict:
    """
    Generate specific research queries for a topic.
    
    Args:
        topic: The main research topic or question
        
    Returns:
        dict: A dictionary with recommended search queries
    """
    # NOTE: the docstring above is left untouched on purpose; this function is
    # wrapped in FunctionTool below, so its text may be surfaced to the model.
    display_log(f"Generating research queries for: {topic}", message_type="system")

    # Guidance text handed back to the calling agent alongside the templates
    guidance = """
        REQUIRED STEPS:
        1. For each query template below, replace it with a specific, detailed search query
        2. Call search_agent(query="your specific query") for EACH of the 5 queries
        3. You MUST make exactly 5 separate search calls
        
        Replace each template with a specific query related to the topic.
        """

    # Five starting points the agent is expected to refine into real queries
    templates = [
        f"basic explanation of {topic} for children",
        f"how does {topic} work simple explanation",
        f"parts of {topic} and their functions",
        f"{topic} simple diagram explanation",
        f"{topic} example of how it's used",
    ]

    payload = {}
    payload["status"] = "generated"
    payload["message"] = "Please use these query templates with the search_agent tool."
    payload["instructions"] = guidance
    payload["query_templates"] = templates
    return payload

# Function to finalize a research report
def finalize_report(research_findings: str, target_audience: str) -> dict:
    """
    Finalize and format a research report.
    
    Args:
        research_findings: The compiled research findings
        target_audience: Who the report is intended for (e.g., "children", "adults", "experts")
        
    Returns:
        dict: Report formatting instructions
    """
    # NOTE: the docstring above is left untouched on purpose; this function is
    # wrapped in FunctionTool below, so its text may be surfaced to the model.
    findings_chars = len(research_findings) if research_findings else 0

    # Guard clause: refuse to format when there is not enough material to report on
    if findings_chars == 0 or len(research_findings.strip()) < 100:
        display_log(f"WARNING: Research findings too short ({findings_chars} chars)", message_type="warning")
        rejection = {}
        rejection["status"] = "error"
        rejection["message"] = "Research findings are too short. Please provide more comprehensive findings."
        return rejection

    display_log(f"Finalizing report for {target_audience} audience", message_type="system")

    # Formatting template; the first 30 characters of the findings are echoed into section 1
    format_guide = f"""
        FINAL REPORT FORMAT:
        
        Your report MUST begin with: "### FINAL RESEARCH REPORT ###"
        
        Structure your report with these sections:
        1. Introduction - What is {research_findings[:30]}... and why it's interesting
        2. How It Works - Simple explanation tailored for {target_audience}
        3. Key Parts - The main components and what they do
        4. Real World Examples - How it's used in everyday situations
        5. Fun Facts - Interesting tidbits discovered in your research
        
        Your report MUST end with: "### END OF REPORT ###"
        
        Important:
        - Include citations to sources throughout
        - Use language appropriate for {target_audience}
        - Include bullet points and simple explanations
        """

    return {
        "status": "ready_to_finalize",
        "message": "Please format your final report according to these instructions.",
        "instructions": format_guide,
    }

# Application constants
# These identifiers are passed to the Runner and the session service below.
APP_NAME = "deep_research_app"
USER_ID = "user1234"
# Timestamp-based, so the id changes every time this cell is executed.
SESSION_ID = f"session_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}"

# 1. Create the Search Agent with direct access to the google_search tool
# The name "search_agent" is significant: run_deep_research compares function
# call/response names and event authors against this exact literal.
search_agent = LlmAgent(
    name="search_agent",
    model="gemini-2.0-flash",
    description="Agent specialized in web search and information retrieval",
    instruction="""
    You are a specialized web search agent that performs targeted research.
    
    PROCESS:
    1. You will receive a specific search query from the research manager
    2. Execute the search using the google_search tool with EXACTLY that query
    3. Analyze the search results and extract relevant information
    4. Format your response in a clear, structured way
    
    RESPONSE FORMAT:
    Always structure your response as follows:
    
    === SEARCH RESULTS ===
    QUERY: [the exact query you searched for]
    
    SUMMARY:
    [2-3 sentence overview of what you found]
    
    KEY FINDINGS:
    • [First important point with source]
    • [Second important point with source]
    • [Continue with additional points]
    
    SOURCES:
    1. [Name/title of source]: [Brief description]
    2. [Name/title of source]: [Brief description]
    
    === END RESULTS ===
    
    IMPORTANT:
    - You MUST use the google_search tool with the EXACT query provided
    - If the query is empty or unclear, ask for clarification
    - Always cite sources for your information
    - Focus on being informative and complete
    """,
    tools=[google_search]  # Only the search agent has access to google_search
)

# 2. Create the Agent Tool wrapper for the search agent
# Wrapping the agent lets the manager below list it in its `tools`.
search_agent_tool = AgentTool(
    agent=search_agent
)

# 3. Create the Research Manager Agent
# The name "research_manager" is matched literally by run_deep_research when it
# decides how to log text events and when it scans history for a report.
# NOTE(review): the prompt tells the model to call search_agent(query=...);
# confirm that `query` is the parameter name AgentTool actually declares.
manager_agent = LlmAgent(
    name="research_manager",
    model="gemini-2.5-flash-preview-04-17",
    description="Manager agent that coordinates deep research processes",
    instruction="""
    You are a research manager responsible for thoroughly researching topics and creating comprehensive reports.
    
    MANDATORY WORKFLOW - You MUST follow these steps in order:
    
    1. PLANNING PHASE:
       - When given a research topic, call generate_research_queries(topic)
       - Use the provided query templates to create 5 SPECIFIC search queries
    
    2. RESEARCH PHASE:
       - For EACH of your 5 queries, call: search_agent(query="your specific query")
       - Make exactly 5 separate search_agent calls with different queries
       - Review each set of results before making the next search
    
    3. SYNTHESIS PHASE:
       - Compile and synthesize all the information gathered
       - Call finalize_report() with your compiled findings and the target audience
    
    4. REPORT CREATION PHASE:
       - Create your final report following the provided format EXACTLY
       - Your report MUST begin with "### FINAL RESEARCH REPORT ###"
       - Your report MUST end with "### END OF REPORT ###"
       - Include all key information with proper citations
    
    CRITICAL RULES:
    - You CANNOT answer any question directly without completing ALL research steps
    - ALL of your search queries must be specific and detailed
    - Your final report MUST follow the exact format specified
    - You MUST make exactly 5 distinct search_agent calls
    
    Remember that your goal is to create a comprehensive, accurate report based on actual search results, not your pre-existing knowledge.
    """,
    # The two plain Python functions are wrapped as FunctionTool; the search
    # agent is exposed through the AgentTool wrapper created above.
    tools=[
        FunctionTool(generate_research_queries),
        FunctionTool(finalize_report),
        search_agent_tool
    ]
)

# Set up session service and runner
# The runner drives manager_agent; sessions are kept by InMemorySessionService.
session_service = InMemorySessionService()
runner = Runner(
    agent=manager_agent,
    app_name=APP_NAME,
    session_service=session_service
)
# Register the session up front using the same ids run_deep_research passes to
# runner.run(). The returned `session` object is not referenced again in the
# code visible in this cell.
# NOTE(review): this call is made synchronously; confirm create_session is not
# a coroutine in the installed ADK version.
session = session_service.create_session(
    app_name=APP_NAME,
    user_id=USER_ID,
    session_id=SESSION_ID
)

def _function_call_args(function_call):
    """Return a function call's arguments as a dict, tolerating a missing or ``None`` ``args``."""
    return getattr(function_call, "args", None) or {}


def _extract_marked_report(text):
    """Return the text from the report start marker through the end marker, or ``""`` if absent."""
    start_marker = "### FINAL RESEARCH REPORT ###"
    end_marker = "### END OF REPORT ###"
    start_index = text.find(start_marker)
    if start_index < 0:
        return ""
    # Search for the end marker only after the start marker so an earlier
    # mention of it (e.g. while restating the instructions) cannot win.
    end_index = text.find(end_marker, start_index + len(start_marker))
    if end_index < 0:
        return ""
    return text[start_index:end_index + len(end_marker)]


# Main function to run deep research
def run_deep_research(research_question, audience="children"):
    """
    Execute deep research on a given question using the manager-search agent system.

    The manager agent is driven through ``runner.run`` and every event is
    inspected to log tool calls, count searches and capture the final report.
    If no report wrapped in the expected markers is seen, the longest text
    response (over 500 chars) is used instead, and as a last resort the
    session history is scanned for a long manager message.

    Args:
        research_question: The main research question or topic
        audience: Target audience for the report (default: "children")

    Returns:
        dict: Research results with the keys "question", "steps", "report",
        "timestamp", "search_count" and "audience". When an exception is
        caught the dict additionally carries "error" and the report falls back
        to whatever text was collected so far. Exceptions are logged, not
        re-raised.
    """
    display_separator("STARTING RESEARCH")
    display_log(f"Research topic: {research_question}", message_type="info")
    display_log(f"Target audience: {audience}", message_type="info")

    # Modify the question to explicitly require following the process
    enhanced_prompt = f"""
    RESEARCH ASSIGNMENT:
    
    Topic: {research_question}
    Target audience: {audience}
    
    Please research this topic thoroughly by following your mandatory workflow EXACTLY:
    1. Generate specific research queries
    2. Perform 5 separate searches using the search_agent
    3. Synthesize the information
    4. Create a final report with proper format and citations
    
    Critically important: You MUST follow ALL steps in order. You CANNOT answer based on your own knowledge.
    Your report MUST begin with "### FINAL RESEARCH REPORT ###" and end with "### END OF REPORT ###".
    """

    # Create content object with research question
    content = types.Content(
        role='user',
        parts=[types.Part(text=enhanced_prompt)]
    )

    # Track research process and results
    research_steps = []
    final_report = ""
    search_count = 0
    large_response = ""

    # Create progress bar
    progress = tqdm(desc="Research in progress", unit="steps")

    try:
        try:
            # Process the research through the runner
            for event in runner.run(
                user_id=USER_ID,
                session_id=SESSION_ID,
                new_message=content
            ):
                event_content = getattr(event, 'content', None)
                if not event_content:
                    continue

                # Guard against content objects that carry no parts list
                for part in (getattr(event_content, 'parts', None) or []):
                    # Track function calls
                    function_call = getattr(part, 'function_call', None)
                    if function_call:
                        function_name = function_call.name
                        args = _function_call_args(function_call)

                        if function_name == "search_agent":
                            # An agent wrapped in AgentTool receives its input
                            # under "request"; "query" is kept as a fallback
                            # because the prompts use that name.
                            search_query = str(args.get('query') or args.get('request') or '')

                            if search_query.strip():
                                search_count += 1
                                research_steps.append({
                                    "type": "search",
                                    "query": search_query,
                                    "agent": event.author,
                                    "timestamp": format_time()
                                })
                                display_log(f"Search #{search_count}: {search_query}", event.author, "search")
                            else:
                                display_log("WARNING: Empty search query detected", event.author, "warning")

                        elif function_name == "generate_research_queries":
                            topic = args.get('topic', '')
                            display_log(f"Generating queries for: {topic}", event.author, "manager")

                        elif function_name == "finalize_report":
                            findings_preview = (args.get('research_findings', '') or '')[:50] + "..."
                            target = args.get('target_audience', '')
                            display_log(f"Finalizing report for {target} audience: {findings_preview}", event.author, "manager")

                        elif function_name == "google_search":
                            # This shows when the search agent actually uses google_search
                            search_query = str(args.get('query') or '')

                            if search_query.strip():
                                display_log(f"Executing Google search for: {search_query}", "search_agent", "search")
                            else:
                                display_log("WARNING: Empty Google search query", "search_agent", "warning")

                        else:
                            display_log(f"Function call: {function_name}", event.author, "debug")

                        progress.update(1)

                    # Track function responses
                    function_response = getattr(part, 'function_response', None)
                    if function_response:
                        function_name = function_response.name

                        if function_name == "search_agent":
                            display_log(f"Received search results (search #{search_count})", event.author, "result")
                        elif function_name == "google_search":
                            display_log("Google search completed", "search_agent", "result")
                        elif function_name == "generate_research_queries":
                            display_log("Research queries generated", event.author, "result")
                        elif function_name == "finalize_report":
                            display_log("Report format provided", event.author, "result")
                        else:
                            display_log(f"Function response: {function_name}", event.author, "debug")

                    # Track text responses
                    text = getattr(part, 'text', None)
                    if text:
                        # Check for report markers
                        if "### FINAL RESEARCH REPORT ###" in text and "### END OF REPORT ###" in text:
                            extracted = _extract_marked_report(text)
                            if extracted:
                                final_report = extracted
                                display_log(f"Final report detected ({len(final_report)} chars)", event.author, "result")
                            else:
                                display_log("WARNING: Report markers found but couldn't extract report properly", event.author, "warning")

                        # Also track the largest response as a backup
                        if len(text) > len(large_response):
                            large_response = text
                            if len(text) > 500:
                                display_log(f"Substantial response detected ({len(text)} chars)", event.author, "debug")

                        # Log the message with preview (only for the two known agents)
                        preview = f"{text[:100]}..." if len(text) > 100 else text
                        if event.author == "research_manager":
                            display_log(f"Message: {preview}", event.author, "manager")
                        elif event.author == "search_agent":
                            display_log(f"Message: {preview}", event.author, "search_agent")
        finally:
            # Close the bar even when the run aborts, so no widget is left open
            progress.close()

        # If no final report with correct markers was found, use backup strategies
        if not final_report:
            display_log("No properly formatted report found. Checking for alternatives...", message_type="warning")

            # Check if large_response contains substantial content that might be a report
            if large_response and len(large_response) > 500:
                display_log(f"Using largest response as report ({len(large_response)} chars)", message_type="system")
                final_report = large_response
            else:
                display_log("No substantial response found either. Checking full history...", message_type="warning")

                # As a last resort, scan through the entire conversation history.
                # NOTE(review): confirm Runner exposes get_history in the
                # installed ADK version; if it does not, the AttributeError is
                # caught below and the error-path result is returned.
                history_events = list(runner.get_history(USER_ID, SESSION_ID))

                display_log(f"Scanning {len(history_events)} events in history...", message_type="debug")

                # Look for the longest substantial text from the manager
                for event in reversed(history_events):
                    if getattr(event, 'author', None) != "research_manager":
                        continue
                    history_content = getattr(event, 'content', None)
                    if not history_content:
                        continue

                    for part in (getattr(history_content, 'parts', None) or []):
                        candidate = getattr(part, 'text', None)
                        if not candidate or len(candidate) <= 300:
                            continue
                        if not final_report or len(candidate) > len(final_report):
                            final_report = candidate
                            display_log(f"Found potential report in history ({len(final_report)} chars)", message_type="system")

                if not final_report:
                    display_log("CRITICAL: Could not find any suitable report content!", message_type="error")

        # Prepare results dict
        results = {
            "question": research_question,
            "steps": research_steps,
            "report": final_report,
            "timestamp": datetime.datetime.now().isoformat(),
            "search_count": search_count,
            "audience": audience
        }

        # Display the final report
        if final_report:
            display_separator("FINAL REPORT")
            display_report(final_report)
        else:
            display_log("No final report was generated", message_type="error")

            # Print debug info
            display_separator("DEBUG INFO")
            display_log(f"Total search calls: {search_count}", message_type="debug")
            display_log(f"Total research steps: {len(research_steps)}", message_type="debug")
            display_log(f"Largest response size: {len(large_response)} chars", message_type="debug")

        return results

    except Exception as e:
        # Top-level boundary for the notebook: log everything and hand back
        # whatever was collected instead of losing the whole run.
        display_log(f"ERROR: {str(e)}", message_type="error")
        display_log(traceback.format_exc(), message_type="error")

        # Try to salvage what we can; same keys as the success path plus "error"
        results = {
            "question": research_question,
            "steps": research_steps,
            "report": final_report or large_response or "Error occurred during research",
            "timestamp": datetime.datetime.now().isoformat(),
            "search_count": search_count,
            "audience": audience,
            "error": str(e)
        }

        return results


# Example run, executed at cell top level: research one question for a young
# audience, then persist whatever run_deep_research returned.
research_question = "From the perspective of a 5 year old, can you explain how excavators work?"
results = run_deep_research(research_question, audience="5 year old children")

# Save results to file
# No filename is passed, so save_research_results derives one from the question.
filename = save_research_results(results)
display_log(f"Research completed and saved to {filename}", message_type="info")




[15:25:02] Research topic: From the perspective of a 5 year old, can you explain how excavators work?


[15:25:02] Target audience: 5 year old children


Research in progress: 0steps [00:00, ?steps/s]



[15:25:04] Generating research queries for: From the perspective of a 5 year old, can you explain how excavators work?
[15:25:04] [research_manager] Message: Okay, I understand the research assignment. I will follow the mandatory workflow precisely to resear...


[15:25:04] [research_manager] Generating queries for: From the perspective of a 5 year old, can you explain how excavators work?


[15:25:04] [research_manager] Research queries generated




[15:25:06] [research_manager] Message: Okay, I have the query templates. Now I will proceed with the research phase by performing 5 separat...




[15:25:09] [research_manager] Received search results (search #0)




[15:25:11] [research_manager] Message: Now I will perform the second search.





[15:25:27] [research_manager] Received search results (search #0)




[15:25:28] [research_manager] Message: Here is my third search query:





[15:25:31] [research_manager] Received search results (search #0)




[15:25:32] [research_manager] Message: Here is my fourth search query:





[15:25:39] [research_manager] Received search results (search #0)




[15:25:39] [research_manager] Message: Here is my fifth and final search query:





[15:25:56] [research_manager] Received search results (search #0)




[15:26:01] Finalizing report for 5 year old children audience
[15:26:01] [research_manager] Finalizing report for 5 year old children audience: Excavators are big machines that are like giant sh...


[15:26:01] [research_manager] Report format provided


[15:26:08] [research_manager] Final report detected (2465 chars)


[15:26:08] [research_manager] Substantial response detected (2465 chars)


[15:26:08] [research_manager] Message: ### FINAL RESEARCH REPORT ###

**1. Introduction**
Excavators are really, really big machines! They ...




## Research Report

### FINAL RESEARCH REPORT ###

**1. Introduction**
Excavators are really, really big machines! They look like giant toy trucks with a big arm. They are super cool because they help people build things like houses and roads by digging in the dirt and moving big piles of stuff. [Search 1, 4]

**2. How It Works**
Imagine an excavator is a strong robot helper! A driver sits inside a little room called the cab and uses special handles, like joysticks in a video game, to tell the excavator what to do. [Search 1, 2, 3, 4, 5] The handles make the big arm move up, down, and side to side. At the end of the arm is a giant scoop, called a bucket. The driver makes the bucket dig into the dirt, scoop it up, lift it high, and then dump it somewhere else! [Search 1, 3, 5] The top part of the excavator can even spin all the way around so it can work in different spots without moving its feet! [Search 1, 4]

**3. Key Parts**
Excavators have important parts that help them do their job:
*   **Cab:** This is the little house where the driver sits to control everything. [Search 2, 4, 5]
*   **Arm:** This is the long part that reaches out. It's like the excavator's strong arm! It has two parts that bend, the boom and the stick. [Search 2, 5]
*   **Bucket:** This is the big scoop at the end of the arm. It's like a super strong spoon that digs and picks up dirt, rocks, and sand. [Search 1, 2, 3, 4, 5]
*   **Tracks:** These are like giant tank treads or big feet that help the excavator move around, even on bumpy ground or mud. [Search 2, 4, 5]
*   **Engine:** This part makes the excavator go and gives it power, kind of like the engine in a car. [Search 2]
*   **Hydraulics:** This is a special system that uses fluid to make the arm and bucket move with lots of strength! [Search 2, 3, 5]
*   **Counterweight:** This is a heavy part at the back that helps the excavator stay balanced when it lifts heavy things so it doesn't tip over! [Search 2]

**4. Real World Examples**
Excavators are busy helpers in many places!
*   They dig big holes when people are building new houses or swimming pools. [Search 1, 4, 5]
*   They help build roads by moving lots of dirt and rocks. [Search 1, 4]
*   They can move really big and heavy things that people can't lift. [Search 4, 5]

**5. Fun Facts**
*   Sometimes excavators are called diggers or backhoes! [Search 4]
*   They can be really big, some as heavy as lots and lots of elephants! [Search 5]

### END OF REPORT ###


Research Report
### FINAL RESEARCH REPORT ###

**1. Introduction**
Excavators are really, really big machines! They look like giant toy trucks with a big arm. They are super cool because they help people build things like houses and roads by digging in the dirt and moving big piles of stuff. [Search 1, 4]

**2. How It Works**
Imagine an excavator is a strong robot helper! A driver sits inside a little room called the cab and uses special handles, like joysticks in a video game, to tell the excavator what to do. [Search 1, 2, 3, 4, 5] The handles make the big arm move up, down, and side to side. At the end of the arm is a giant scoop, called a bucket. The driver makes the bucket dig into the dirt, scoop it up, lift it high, and then dump it somewhere else! [Search 1, 3, 5] The top part of the excavator can even spin all the way around so it can work in different spots without moving its feet! [Search 1, 4]

**3. Key Parts**
Excavators have important parts that help them do their job:
*

[15:26:08] Research results saved to research_from_the_perspective_of_a_20250501_152608.json


[15:26:08] Report saved (2465 chars, 0 steps)


[15:26:08] Research completed and saved to research_from_the_perspective_of_a_20250501_152608.json


In [None]:
import os
import json
import uuid
import datetime
import logging
from typing import Dict, List, Optional
from google.adk.agents import LlmAgent, Agent
from google.adk.runners import Runner
from google.adk.sessions import SessionService
from google.adk.tools import google_search
from google.genai import types
from google.api_core import exceptions
from slugify import slugify
import asyncio

# Hypothetical arXiv search tool (replace with actual implementation if available)
async def arxiv_search(query: str) -> Dict:
    """Return a canned single-result payload standing in for a real arXiv lookup."""
    placeholder = {
        "title": "Mock arXiv paper on {}".format(query),
        "url": "http://arxiv.org/mock",
    }
    return {"results": [placeholder]}

# Configuration (replace with your settings module)
class Settings:
    """Minimal stand-in for a project settings module."""

    # Read once, when the class body executes; a placeholder is used when the
    # environment variable is not set.
    GEMINI_GENERATE_KEY = os.environ.get("GEMINI_GENERATE_KEY", "your-api-key-here")

settings = Settings()

# Set up Gemini API key
# NOTE(review): when GEMINI_GENERATE_KEY is unset, the placeholder string from
# Settings is what gets exported here.
os.environ["GOOGLE_API_KEY"] = settings.GEMINI_GENERATE_KEY
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "FALSE"

# Custom file-based session service for persistence (simplified for example)
class FileSessionService(SessionService):
    """Keep each session as a JSON file named ``<session_id>.json`` under ``storage_path``.

    Files are read and written as UTF-8 so the result does not depend on the
    platform's default encoding.

    NOTE(review): confirm that ``SessionService`` is the correct base class
    name in the installed ADK and that these method signatures match what the
    Runner expects.
    """

    def __init__(self, storage_path: str = "sessions"):
        """Remember the storage directory and create it if it does not exist."""
        self.storage_path = storage_path
        os.makedirs(storage_path, exist_ok=True)

    def _session_file(self, session_id: str) -> str:
        """Return the path of the JSON file backing ``session_id``."""
        return os.path.join(self.storage_path, f"{session_id}.json")

    def create_session(self, app_name: str, user_id: str, session_id: str) -> Dict:
        """Write a fresh session record to disk and return it.

        An existing file for the same ``session_id`` is overwritten.
        """
        session_data = {"app_name": app_name, "user_id": user_id, "session_id": session_id, "data": {}}
        with open(self._session_file(session_id), 'w', encoding="utf-8") as f:
            json.dump(session_data, f)
        return session_data

    def get_session(self, session_id: str) -> Optional[Dict]:
        """Load a stored session record, or return ``None`` when no file exists for it."""
        try:
            with open(self._session_file(session_id), 'r', encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return None

# Configure logging for both notebook and non-notebook environments
class NotebookFormatter(logging.Formatter):
    """Format log records as coloured HTML (notebook) or as plain text lines.

    The prefix is ``[HH:MM:SS]`` taken from the record's creation time,
    followed by ``[agent_name]`` when the record carries that extra attribute.
    """

    # Colour per level name; levels not listed fall back to the INFO colour.
    COLORS = {
        "INFO": "#3498db",  # Blue
        "SEARCH": "#e67e22",  # Orange
        "RESULT": "#2ecc71",  # Green
        "WARNING": "#f39c12",  # Amber
        "ERROR": "#e74c3c",  # Red
        "MANAGER": "#9b59b6",  # Purple
        "SEARCH_AGENT": "#f1c40f"  # Yellow
    }

    def __init__(self, is_notebook: bool = False):
        """Create the formatter; ``is_notebook`` selects HTML output."""
        super().__init__()
        self.is_notebook = is_notebook

    def format(self, record):
        """Return the formatted text for ``record``.

        In notebook mode the prefix and message are HTML-escaped so that log
        text containing ``<`` or ``&`` is shown literally. Exception info
        attached to the record is appended to the message.
        """
        # Local import keeps the class usable without touching the file's import block.
        from html import escape

        level = record.levelname
        message = record.getMessage()
        if record.exc_info:
            # Overriding format() would otherwise drop tracebacks from logger.exception()
            message = f"{message}\n{self.formatException(record.exc_info)}"

        # Use the time the record was created, not the time it happens to be formatted
        timestamp = datetime.datetime.fromtimestamp(record.created).strftime("%H:%M:%S")
        prefix = f"[{timestamp}]"
        agent_name = getattr(record, "agent_name", None)
        if agent_name:
            prefix += f" [{agent_name}]"

        if self.is_notebook:
            color = self.COLORS.get(level, self.COLORS["INFO"])
            return (
                f"<div style='margin-bottom: 10px;'>"
                f"<span style='color: {color}; font-weight: bold;'>{escape(prefix)}</span> "
                f"{escape(message)}</div>"
            )
        return f"{prefix} {level}: {message}"

def setup_logging(is_notebook: bool = False) -> logging.Logger:
    """Return the "DeepResearch" logger wired to a single stream handler.

    The logger level is set to INFO and any handlers already attached to it
    are replaced by one ``StreamHandler`` that uses ``NotebookFormatter``.

    Args:
        is_notebook: Forwarded to ``NotebookFormatter`` to select its output
            style.
    """
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(NotebookFormatter(is_notebook=is_notebook))

    research_logger = logging.getLogger("DeepResearch")
    research_logger.setLevel(logging.INFO)
    # Assigning the list (rather than addHandler) drops earlier handlers.
    research_logger.handlers = [stream_handler]
    return research_logger

# Helper functions
def display_report(report: str, title: str = "Research Report", is_notebook: bool = False, logger: Optional[logging.Logger] = None):
    """Display a formatted research report.

    Args:
        report: Report body; rendered as Markdown in notebook mode.
        title: Heading shown above the report.
        is_notebook: When true, render through ``IPython.display``; otherwise
            emit one INFO log record containing the title and the report.
        logger: Logger used outside notebooks. Defaults to the
            "DeepResearch" logger when omitted.
    """
    if is_notebook:
        # Imported lazily so non-notebook runs do not require IPython.
        from IPython.display import display, Markdown
        display(Markdown(f"## {title}"))
        display(Markdown(report))
        return

    # The parameter defaults to None; fall back instead of raising
    # AttributeError on ``None.info``.
    if logger is None:
        logger = logging.getLogger("DeepResearch")
    logger.info(f"{title}:\n{report}")

def save_research_results(results: Dict, filename: Optional[str] = None, logger: Optional[logging.Logger] = None) -> str:
    """Save research results to a JSON file.

    Args:
        results: JSON-serialisable mapping. Its ``"question"`` entry is used
            to derive a file name when ``filename`` is not given.
        filename: Destination path. When omitted, a name of the form
            ``research_<slug>_<YYYYmmdd_HHMMSS>.json`` is generated.
        logger: Logger for success/failure messages. Defaults to the
            "DeepResearch" logger when omitted.

    Returns:
        The path written, or an empty string if writing failed.
    """
    # The parameter defaults to None; without this fallback both the success
    # and the failure path would raise AttributeError.
    if logger is None:
        logger = logging.getLogger("DeepResearch")

    if filename is None:
        safe_name = slugify(results["question"][:50], separator="_")
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"research_{safe_name}_{timestamp}.json"

    try:
        with open(filename, 'w', encoding="utf-8") as f:
            json.dump(results, f, indent=2)
    except (OSError, TypeError, ValueError) as e:
        # OSError: path/permission problems; TypeError/ValueError: the
        # results could not be serialised by json.dump.
        logger.error(f"Failed to save results: {str(e)}")
        return ""

    logger.info(f"Research results saved to {filename}")
    return filename

# Identifiers handed to the session service and the runner.
APP_NAME = "deep_research_app"
# Both IDs are random and generated once, when this code is executed.
USER_ID = "user_" + str(uuid.uuid4())
SESSION_ID = "session_" + str(uuid.uuid4())

# Create agents
def create_agents() -> tuple[LlmAgent, LlmAgent]:
    """Build the search agent and the manager agent that delegates to it.

    Returns:
        ``(search_agent, manager_agent)``; the manager lists the search agent
        as its only sub-agent.

    NOTE(review): the ADK documentation describes limits on combining
    built-in tools such as ``google_search`` with other tools and on using
    them inside sub-agents -- confirm this configuration is supported.
    """
    search_instruction = """
        You are a web search agent. Your tasks:
        1. Receive search queries from the research manager.
        2. Execute searches using available tools (e.g., Google, arXiv).
        3. Summarize relevant information in a concise, factual format.
        4. Include source URLs or identifiers for all information.
        Return responses as JSON with fields: summary, sources.
        """
    manager_instruction = """
        You are a research manager agent. For each research question:
        1. Create a plan with 3-5 specific search queries.
        2. Delegate queries to the web search agent.
        3. Synthesize results into a report with sections: Introduction, Findings, Conclusion, Citations.
        4. Identify knowledge gaps and request additional searches if needed.
        Return the final report as a markdown-formatted string with clear citations.
        """

    # Worker: runs the individual searches.
    searcher = LlmAgent(
        name="web_search_agent",
        model="gemini-1.5-flash",
        description="Agent specialized in web and academic search",
        instruction=search_instruction,
        tools=[google_search, arxiv_search],
    )

    # Coordinator: plans the queries and writes the report.
    manager = LlmAgent(
        name="research_manager",
        model="gemini-1.5-pro",
        description="Manager agent coordinating deep research",
        instruction=manager_instruction,
        sub_agents=[searcher],
    )

    return searcher, manager

# Main research function
async def run_deep_research(
    research_question: str,
    logger: logging.Logger,
    is_notebook: bool = False,
    session_service: Optional[SessionService] = None
) -> Dict:
    """Execute deep research on a given question.

    Runs the manager agent on ``research_question``, records every function
    call that carries a search query, collects the manager's text output and
    picks a final report from it, then displays and saves the results.

    Args:
        research_question: The question sent to the manager agent.
        logger: Logger that receives progress, warning and error messages.
        is_notebook: Selects the notebook flavour of ``tqdm`` and of the
            report display.
        session_service: Session backend; a ``FileSessionService`` is
            created when omitted.

    Returns:
        On success a dict with ``question``, ``steps``, ``report`` and
        ``timestamp``. On failure a dict with ``question`` and ``error``.
    """
    logger.info(f"Starting deep research: {research_question}")

    # Initialize session and runner
    session_service = session_service or FileSessionService()
    _search_agent, manager_agent = create_agents()
    runner = Runner(
        agent=manager_agent,
        app_name=APP_NAME,
        session_service=session_service
    )
    session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID)

    # Create content object
    content = types.Content(role='user', parts=[types.Part(text=research_question)])

    # Track research process
    research_steps = []
    manager_outputs = []
    final_report = ""

    try:
        if is_notebook:
            from tqdm.notebook import tqdm
        else:
            from tqdm import tqdm

        # Use indeterminate progress bar
        with tqdm(desc="Research in progress", total=None) as progress:
            for event in runner.run(user_id=USER_ID, session_id=SESSION_ID, new_message=content):
                if not getattr(event, 'content', None):
                    logger.warning("Empty event received")
                    continue

                # ``parts`` is optional on a content object; a None here
                # must not abort the whole run.
                for part in event.content.parts or []:
                    function_call = getattr(part, 'function_call', None)
                    if function_call:
                        # ``args`` is optional as well.
                        search_query = (function_call.args or {}).get('query', '')
                        research_steps.append({
                            "type": "search",
                            "query": search_query,
                            "agent": event.author,
                            "timestamp": datetime.datetime.now().strftime("%H:%M:%S")
                        })
                        logger.info(f"Searching for: {search_query}", extra={"agent_name": event.author, "level": "SEARCH"})
                        progress.update()

                    if getattr(part, 'function_response', None):
                        logger.info("Received search results", extra={"agent_name": event.author, "level": "RESULT"})

                    text = getattr(part, 'text', None)
                    if text and event.author == "research_manager":
                        manager_outputs.append(text)
                        # Heuristic: the latest manager text that looks like
                        # a report wins.
                        lowered = text.lower()
                        if "Final Report" in text or any(k in lowered for k in ["conclusion", "summary", "findings"]):
                            final_report = text
                            logger.info("Research report generated", extra={"agent_name": event.author, "level": "MANAGER"})

        # Fallback: combine manager outputs if no explicit report
        if not final_report and manager_outputs:
            final_report = "\n".join(manager_outputs)
            logger.warning("No explicit report found; combined manager outputs")

        if not final_report:
            logger.error("No final report generated")

        # Prepare results
        results = {
            "question": research_question,
            "steps": research_steps,
            "report": final_report,
            "timestamp": datetime.datetime.now().isoformat()
        }

        # Display and save results
        if final_report:
            display_report(final_report, is_notebook=is_notebook, logger=logger)
        save_research_results(results, logger=logger)

        return results

    except exceptions.GoogleAPIError as e:
        logger.error(f"API error during research: {str(e)}")
        return {"question": research_question, "error": str(e)}
    except Exception as e:
        # Top-level boundary: report the failure to the caller as data.
        logger.error(f"Unexpected error: {str(e)}")
        return {"question": research_question, "error": str(e)}


# Example invocation: run the research coroutine to completion for one question.
# NOTE(review): `logger` and `is_notebook` are not assigned in the visible part
# of this cell -- presumably created via setup_logging(...) elsewhere; confirm.
# NOTE(review): asyncio.run() raises RuntimeError when an event loop is already
# running, which is the usual situation inside Jupyter -- confirm the intended
# execution environment.
research_question = "What are the latest advancements in quantum computing and their potential applications?"
results = asyncio.run(run_deep_research(research_question, logger=logger, is_notebook=is_notebook))

ImportError: cannot import name 'SessionService' from 'google.adk.sessions' (c:\Users\xbox3\miniforge-pypy3\envs\py311env\Lib\site-packages\google\adk\sessions\__init__.py)

In [None]:
"""
Simplified AI Interview System using Google's Agent Development Kit (ADK)

This system creates a natural conversation between two AI agents:
- Expert Agent: Provides expertise on a specified topic
- Interviewer Agent: Asks questions about the topic

The conversation is saved as both text and MP3 audio files.
"""

import os
import time
import json
import uuid
import argparse
import logging
from typing import Dict, List, Any, Optional
from pathlib import Path
from io import BytesIO

# Google Agent Development Kit and Gemini imports
import google.generativeai as genai
from google.adk import LlmAgent, Runner, InMemorySessionService
from google.generativeai import types

# Audio generation and processing
from google.cloud import texttospeech
from pydub import AudioSegment

# Root logging: INFO and above, each line prefixed with time and level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Identifiers passed to the runner and the session service.
APP_NAME = "interview_simulation"
USER_ID = "user_1"


def setup_interview_agents(topic: str):
    """Build the expert and interviewer agents for one interview topic.

    The topic is interpolated into both agents' descriptions and
    instructions. The interviewer is given an ``ask_expert`` tool
    declaration and lists the expert as its sub-agent.

    Args:
        topic: The topic for the interview

    Returns:
        tuple: (expert_agent, interviewer_agent)
    """
    # Expert: answers questions about the topic in short, spoken-style replies.
    expert_agent = LlmAgent(
        name="expert",
        model="gemini-2.0-flash-exp",  # Using Gemini Flash for quick responses
        description=f"Expert on {topic} providing authoritative but accessible information",
        instruction=f"""
        You are an expert on {topic} with deep knowledge and experience.
        
        Your role:
        - Provide factual, informative responses about {topic}
        - Share expert insights and perspectives based on your specialized knowledge
        - Present complex information in an accessible, conversational manner
        - Maintain a confident but approachable tone
        
        Guidelines:
        - Keep your responses concise and focused (2-4 sentences per response)
        - Use a conversational tone suitable for audio content
        - Avoid jargon unless necessary, and explain technical terms when used
        - Speak as if you were being interviewed for a podcast or radio show
        
        Remember, your responses will be converted to audio, so maintain a natural 
        conversational flow.
        """
    )
    
    # Schema-style declaration of a one-argument function named "ask_expert".
    # NOTE(review): this is a plain dict, not a callable or tool object --
    # confirm that LlmAgent accepts this form in `tools`.
    ask_expert_tool = {
        "name": "ask_expert",
        "description": f"Ask the expert a question about {topic}",
        "parameters": {
            "type": "object",
            "properties": {
                "question": {
                    "type": "string",
                    "description": "The question to ask the expert"
                }
            },
            "required": ["question"]
        }
    }
    
    # Interviewer: drives the conversation and is told to call 'ask_expert'.
    interviewer_agent = LlmAgent(
        name="interviewer",
        model="gemini-2.0-flash-exp",  # Using Gemini Flash for quick responses
        description=f"Professional interviewer conducting an interview about {topic}",
        instruction=f"""
        You are a professional interviewer conducting an interview about {topic}.
        
        Your role:
        - Ask thoughtful, engaging questions about {topic}
        - Guide the conversation to cover important aspects of the topic
        - Follow up on interesting points the expert mentions
        - Represent the curiosity of your audience
        
        Conversation structure:
        1. Start with an introduction and an opening question
        2. Ask follow-up questions based on the expert's responses
        3. Explore different aspects of {topic}
        4. Conclude with a final question about future implications or advice
        
        Guidelines:
        - Keep questions concise and clear (1-2 sentences)
        - Maintain a conversational tone suitable for audio
        - Don't ask multiple questions at once
        - Listen to the expert's responses and reference them in follow-ups
        
        When you need to ask a question to the expert, call the 'ask_expert' function with your question.
        """,
        tools=[ask_expert_tool],
        sub_agents=[expert_agent]  # The interviewer can delegate to the expert
    )
    
    logger.info(f"Agents set up for topic: {topic}")
    return expert_agent, interviewer_agent


def text_to_speech(text: str, voice_config: Dict[str, Any]) -> AudioSegment:
    """Synthesize ``text`` as MP3 speech and return it as an audio segment.

    Args:
        text: The text to speak.
        voice_config: Optional keys ``gender``, ``language_code``, ``name``,
            ``speaking_rate`` and ``pitch``; missing keys fall back to
            NEUTRAL, ``"en-US"``, ``None``, ``1.0`` and ``0.0``.

    Returns:
        The synthesized speech, or 500 ms of silence if anything fails (the
        error is logged, not raised).
    """
    try:
        tts_client = texttospeech.TextToSpeechClient()
        request_input = texttospeech.SynthesisInput(text=text)

        voice_params = texttospeech.VoiceSelectionParams(
            language_code=voice_config.get("language_code", "en-US"),
            ssml_gender=voice_config.get("gender", texttospeech.SsmlVoiceGender.NEUTRAL),
            name=voice_config.get("name", None)
        )
        output_format = texttospeech.AudioConfig(
            audio_encoding=texttospeech.AudioEncoding.MP3,
            speaking_rate=voice_config.get("speaking_rate", 1.0),
            pitch=voice_config.get("pitch", 0.0)
        )

        synthesized = tts_client.synthesize_speech(
            input=request_input, voice=voice_params, audio_config=output_format
        )

        # Decode the returned MP3 bytes from an in-memory buffer.
        return AudioSegment.from_mp3(BytesIO(synthesized.audio_content))
    except Exception as e:
        logger.error(f"Error in text-to-speech conversion: {e}")
        # Keep the interview going with a short silent placeholder.
        return AudioSegment.silent(duration=500)


def process_event(event, conversation, audio_segments, interviewer_voice, expert_voice):
    """Record one runner event in the transcript and the audio track.

    Assistant text events are stored as "Interviewer" lines and
    ``ask_expert`` function responses as "Expert" lines; each stored line is
    also synthesized and appended to ``audio_segments``. ``ask_expert``
    function calls are only logged. Any exception is logged and swallowed.

    NOTE(review): this reads ``event.type``, ``event.text`` and
    ``event.function_response.content``, whereas the research code earlier in
    this file reads ``event.content.parts`` and ``event.author`` -- confirm
    which shape the runner actually yields.

    Args:
        event: The event to process.
        conversation: List that receives ``{"speaker", "text"}`` dicts.
        audio_segments: List that receives the synthesized audio.
        interviewer_voice: Voice configuration for interviewer lines.
        expert_voice: Voice configuration for expert lines.
    """

    def _record(speaker, spoken_text, voice, log_note):
        # Whitespace-only utterances are neither stored nor synthesized.
        if not spoken_text.strip():
            return
        logger.info(log_note)
        conversation.append({"speaker": speaker, "text": spoken_text})
        clip = text_to_speech(spoken_text, voice)
        audio_segments.append(clip)

    try:
        kind = event.type

        if kind == "text":
            if event.text.role == "assistant":
                _record("Interviewer", event.text.text, interviewer_voice,
                        "Processing interviewer text")
            return

        if kind == "function_call":
            if event.function_call.name == "ask_expert":
                # The question itself is only logged, never stored.
                asked = event.function_call.args.get("question", "")
                logger.info(f"Expert asked: {asked[:50]}...")
            return

        if kind == "function_response":
            if event.function_response.name == "ask_expert":
                _record("Expert", event.function_response.content, expert_voice,
                        "Processing expert response")
    except Exception as e:
        logger.error(f"Error processing event: {e}")


def save_interview_results(topic, session_id, conversation, audio_segments):
    """Save the interview transcript, JSON record and combined audio.

    Files are written to an ``interviews`` directory under the current
    working directory and are named
    ``interview_<topic>_<first 8 chars of session_id>`` with ``.txt``,
    ``.json`` and ``.mp3`` extensions. The text files are written first so
    they survive a failing audio export.

    Args:
        topic: The interview topic; used (sanitised) in the file names.
        session_id: The session ID.
        conversation: List of ``{"speaker", "text"}`` entries.
        audio_segments: Audio segments to join with 500 ms pauses. When
            empty, no MP3 is written (``audio_path`` is still reported).

    Returns:
        Dictionary with ``topic``, ``session_id``, ``conversation`` and the
        three output paths, or with an ``error`` message instead of the
        paths if anything failed.
    """
    import re  # local import keeps this block self-contained

    try:
        # Create output directory if it doesn't exist
        output_dir = Path("interviews")
        output_dir.mkdir(exist_ok=True)

        # Spaces become underscores as before; any other character that is
        # not a word character, dot or hyphen (path separators, ':', '?',
        # ...) is replaced too, so the topic cannot break or escape the path.
        formatted_topic = re.sub(r"[^\w.-]", "_", topic.lower())
        stem = f"interview_{formatted_topic}_{session_id[:8]}"
        transcript_path = output_dir / f"{stem}.txt"
        audio_path = output_dir / f"{stem}.mp3"
        json_path = output_dir / f"{stem}.json"

        # Explicit UTF-8: generated text may contain characters that the
        # platform default encoding cannot represent.
        with open(transcript_path, "w", encoding="utf-8") as f:
            f.writelines(
                f"{entry['speaker']}: {entry['text']}\n\n" for entry in conversation
            )

        # Save conversation as JSON for potential future use
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump({
                "topic": topic,
                "session_id": session_id,
                "conversation": conversation,
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
            }, f, indent=2)

        if audio_segments:
            # Join the segments with a 500 ms pause between speakers.
            pause = AudioSegment.silent(duration=500)
            combined_audio = audio_segments[0]
            for segment in audio_segments[1:]:
                combined_audio += pause + segment
            combined_audio.export(str(audio_path), format="mp3")
        else:
            logger.warning(f"No audio segments to export; {audio_path} was not written")

        logger.info(f"Results saved to {output_dir}")

        return {
            "topic": topic,
            "session_id": session_id,
            "conversation": conversation,
            "transcript_path": str(transcript_path),
            "audio_path": str(audio_path),
            "json_path": str(json_path)
        }
    except Exception as e:
        # Best effort: hand the conversation back even if saving failed.
        logger.error(f"Error saving results: {e}")
        return {
            "topic": topic,
            "session_id": session_id,
            "conversation": conversation,
            "error": str(e)
        }


def run_interview(api_key: str, topic: str, num_turns: int = 5) -> Dict[str, Any]:
    """Run the interview using ADK.

    Sends ``num_turns + 1`` user prompts to the interviewer agent (an
    opening prompt, continuation prompts and a closing prompt), feeds every
    resulting event to ``process_event`` and finally saves whatever was
    collected, even if the interview loop failed part-way.

    Args:
        api_key: Google Gemini API key
        topic: Topic for the interview
        num_turns: Number of exchange turns to complete

    Returns:
        The dictionary produced by ``save_interview_results``.
    """
    # Configure Gemini API
    genai.configure(api_key=api_key)
    # The first cell of this file hands the key to ADK through this
    # environment variable; do the same so the runner can authenticate.
    # NOTE(review): confirm against the installed ADK version.
    if api_key:
        os.environ["GOOGLE_API_KEY"] = api_key

    # Generate a unique session ID
    session_id = str(uuid.uuid4())
    logger.info(f"Starting interview session {session_id[:8]} on topic: {topic}")

    # Only the interviewer is driven directly; it holds the expert itself.
    _expert_agent, interviewer_agent = setup_interview_agents(topic)

    # Set up session and runner
    session_service = InMemorySessionService()
    runner = Runner(
        agent=interviewer_agent,  # Interviewer is the main agent
        app_name=APP_NAME,
        session_service=session_service
    )

    # Create the session
    session_service.create_session(
        app_name=APP_NAME,
        user_id=USER_ID,
        session_id=session_id
    )

    # Storage for the conversation
    conversation = []
    audio_segments = []

    # Voice configurations
    interviewer_voice = {
        "gender": texttospeech.SsmlVoiceGender.MALE,
        "language_code": "en-US",
        "speaking_rate": 1.0
    }

    expert_voice = {
        "gender": texttospeech.SsmlVoiceGender.FEMALE,
        "language_code": "en-US",
        "speaking_rate": 1.0
    }

    initial_prompt = f"Let's conduct an interview about {topic}. Please start by introducing yourself and the topic, then ask your first question to the expert."

    # Run the interview
    try:
        # Process the interview in turns
        for turn_number in range(num_turns + 1):  # +1 for final closing remarks
            logger.info(f"Starting turn {turn_number}/{num_turns}")

            # Determine the prompt based on the turn
            if turn_number == 0:
                prompt = initial_prompt
            elif turn_number == num_turns:
                prompt = "Please wrap up the interview with a final question and closing remarks."
            else:
                prompt = "Please continue the interview with your next question."

            # Create the message content
            content = types.Content(
                role='user',
                parts=[types.Part(text=prompt)]
            )

            # Process all events for this turn
            for event in runner.run(
                user_id=USER_ID,
                session_id=session_id,
                new_message=content
            ):
                process_event(event, conversation, audio_segments, interviewer_voice, expert_voice)

            # Small delay to avoid API rate limits
            if turn_number < num_turns:
                time.sleep(1)

    except Exception as e:
        logger.error(f"Error during interview: {e}")
        # We'll still try to save what we have so far

    # Save the results
    results = save_interview_results(topic, session_id, conversation, audio_segments)
    logger.info(f"Interview completed with {len(conversation)} exchanges")

    return results


def main():
    """Parse the command line, run one interview and print a summary.

    Command-line options: ``--api_key`` and ``--topic`` (both required) and
    ``--turns`` (integer, default 5). Exceptions raised while running the
    interview are caught and reported on stdout.
    """
    parser = argparse.ArgumentParser(description="AI Interview Simulation System")
    parser.add_argument("--api_key", required=True, help="Google Gemini API key")
    parser.add_argument("--topic", required=True, help="Topic for the interview")
    parser.add_argument("--turns", type=int, default=5, help="Number of conversation turns")
    args = parser.parse_args()

    # Run the interview
    try:
        results = run_interview(
            api_key=args.api_key,
            topic=args.topic,
            num_turns=args.turns
        )
    except Exception as e:
        print(f"Error running interview: {e}")
        print("Please check logs for more details.")
        return

    banner = "=" * 50

    # Report failure first so the success banner is never shown for a run
    # whose results carry an error.
    if "error" in results:
        print("\n" + banner)
        print(f"Interview on '{args.topic}' finished with errors.")
        print(banner)
        print(f"\nWarning: Encountered an error: {results['error']}")
        print(f"Partial conversation with {len(results['conversation'])} exchanges was saved.")
        return

    print("\n" + banner)
    print(f"Interview on '{args.topic}' completed successfully!")
    print(banner)

    print(f"\nTranscript saved to: {results['transcript_path']}")
    print(f"Audio saved to: {results['audio_path']}")
    print(f"JSON data saved to: {results['json_path']}")

    # Print a sample of the conversation: the first four entries, separated
    # by blank lines.
    print("\nSample of the conversation:")
    for i, entry in enumerate(results['conversation'][:4]):
        print(f"{entry['speaker']}: {entry['text']}")
        if i < 3:
            print()


# Run the command-line entry point only when this code is executed as the
# main module, not when it is imported.
if __name__ == "__main__":
    main()