In [None]:
import nest_asyncio

# Patch asyncio to allow re-entrant event loops. Later in this notebook
# asyncio.run(...) is called directly from a cell; when the kernel already has
# a running loop, that call depends on this patch having been applied first.
nest_asyncio.apply()

In [None]:
import os
from agents import Agent, Runner, AsyncOpenAI, OpenAIChatCompletionsModel, function_tool
from agents.run import RunConfig
from dotenv import load_dotenv

from agents import (
    Agent,
    Runner,
    set_default_openai_api,
    set_default_openai_client,
    set_tracing_disabled,
)

from openai.types.responses import ResponseTextDeltaEvent
import asyncio


In [None]:
load_dotenv()

gemini_api_key = os.getenv("GEMINI_API_KEY")
aws_api_key = os.getenv("AWS_API_KEY")
tavily_api_key = os.getenv("TAVILY_API_KEY")

# Check if the API key is present; if not, raise an error
if not gemini_api_key:
    raise ValueError(
        "GEMINI_API_KEY is not set. Please ensure it is defined in your .env file."
    )

# Active provider: Gemini through its OpenAI-compatible endpoint.
# Reference: https://ai.google.dev/gemini-api/docs/openai
external_client = AsyncOpenAI(
    api_key=gemini_api_key,
    base_url="https://generativelanguage.googleapis.com/v1beta/openai/",
)

model = OpenAIChatCompletionsModel(
    model="gemini-2.5-flash", openai_client=external_client
)

# Alternative provider (disabled): AWS Bedrock's OpenAI-compatible endpoint.
# To switch, comment out the Gemini client/model above and enable these lines.
# external_client = AsyncOpenAI(
#     api_key=aws_api_key,
#     base_url="https://bedrock-runtime.us-west-2.amazonaws.com/openai/v1",
# )

# model = OpenAIChatCompletionsModel(
#     model="openai.gpt-oss-120b-1:0", openai_client=external_client
# )

set_default_openai_client(client=external_client, use_for_tracing=False)
set_default_openai_api("chat_completions")

# The SDK exports traces to OpenAI using OPENAI_API_KEY. This notebook talks to a
# non-OpenAI provider, so when no OpenAI key is configured turn tracing off
# instead of letting every run attempt (and warn about) an export that cannot
# succeed. If a key is present, tracing is left as it was.
if not os.getenv("OPENAI_API_KEY"):
    set_tracing_disabled(True)

In [None]:
from tavily import AsyncTavilyClient, TavilyClient

# Async client: awaited by the search tools defined below.
async_tavily_client = AsyncTavilyClient(api_key=tavily_api_key)
# Sync client: created here but not referenced by any cell visible in this notebook.
tavily_client = TavilyClient(api_key=tavily_api_key)


@function_tool
async def search_web(query: str):
    """Search the web for a single query and return the Tavily search response.

    Args:
        query: The search query to look up.
    """
    # The docstring above is not just for readers: function_tool uses it as the
    # tool/argument description that the model sees when deciding to call this.
    print(f"Searching the web for: {query}")
    return await async_tavily_client.search(query)


@function_tool
async def parallel_search(queries: list[str]) -> str:
    """Search multiple queries in parallel for faster research.

    Args:
        queries: Search queries to run concurrently.

    Returns:
        Text with one "## Query N" section per query, each holding the optional
        answer summary and up to three results (title, snippet, source URL).
        A query whose search fails gets a "Search failed" line in its section
        instead of aborting the whole call.
    """
    if not queries:
        # Nothing to search; say so rather than returning an empty string.
        return "No queries provided."

    try:
        print(f"🔍 Parallel searching: {len(queries)} queries")

        # Run searches concurrently
        tasks = [
            async_tavily_client.search(
                query=query,
                search_depth="advanced",
                max_results=3,
                include_answer=True,
                include_raw_content=False,
            )
            for query in queries
        ]

        # return_exceptions=True keeps the successful searches when one query
        # fails; the failure is reported inline for that query only.
        responses = await asyncio.gather(*tasks, return_exceptions=True)

        # Format results by query
        all_results = []
        for i, (query, response) in enumerate(zip(queries, responses), 1):
            all_results.append(f"## Query {i}: {query}")

            if isinstance(response, BaseException):
                all_results.append(f"Search failed: {response}")
                all_results.append("")
                continue

            if response.get("answer"):
                all_results.append(f"**Summary:** {response['answer']}")

            for j, result in enumerate(response.get("results", []), 1):
                # Fields may be absent or None; fall back instead of raising.
                title = result.get("title") or "Untitled"
                content = result.get("content") or ""
                url = result.get("url") or "unknown"
                # Only mark the snippet as truncated when it actually was.
                snippet = content[:150] + ("..." if len(content) > 150 else "")
                all_results.append(f"{j}. {title}")
                all_results.append(f"   {snippet}")
                all_results.append(f"   Source: {url}")
            all_results.append("")

        return "\n".join(all_results)

    except Exception as e:
        return f"Parallel search failed: {str(e)}"

In [None]:
# Minimal single-tool agent (no custom instructions) used by ask() below.
agent: Agent = Agent(name="Search", tools=[search_web], model=model)


async def ask():
    """Run the Search agent on a fixed AI-news prompt and stream its text to stdout."""
    stream = Runner.run_streamed(
        starting_agent=agent, input="search for the latest news on AI advancements"
    )
    async for ev in stream.stream_events():
        # Only raw model events carrying a text delta are printed.
        if ev.type != "raw_response_event":
            continue
        if not isinstance(ev.data, ResponseTextDeltaEvent):
            continue
        print(ev.data.delta, end="", flush=True)


# asyncio.run(ask())

In [None]:
# Enhanced basic research agent.
# Single agent with one tool (search_web); the prompt asks for a summary, bulleted
# key findings, and source URLs at the bottom. Exercised by test_basic_research().
basic_agent = Agent(
    name="BasicResearcher",
    instructions="""You are a professional research assistant. When users ask questions:
    
    1. Use the search_web tool to find current, reliable information
    2. Provide clear, well-structured answers with key points
    3. Always cite your sources with URLs
    4. If information conflicts between sources, mention this
    5. Keep responses focused and informative
    
    Format your responses with:
    - Clear summary at the top
    - Key findings with bullet points
    - Source citations at the bottom""",
    model=model,
    tools=[search_web],
)


# Test function
async def test_basic_research():
    """Ask basic_agent each sample question and stream the answer text to stdout."""
    sample_questions = [
        # "What are the benefits of electric cars?",
        "What is renewable energy?",
        # "Latest developments in quantum computing",
    ]
    rule = "=" * 50

    for question in sample_questions:
        # Banner so each question's output is easy to find in the cell output.
        print(f"\n{rule}")
        print(f"Question: {question}")
        print(rule)

        stream = Runner.run_streamed(starting_agent=basic_agent, input=question)

        async for ev in stream.stream_events():
            is_text_delta = ev.type == "raw_response_event" and isinstance(
                ev.data, ResponseTextDeltaEvent
            )
            if is_text_delta:
                print(ev.data.delta, end="", flush=True)
        print("\n")


# Run the test
# asyncio.run(test_basic_research())

In [None]:
# Phase 4: Professional Features - Advanced Research System

import json
from datetime import datetime
from typing import Dict, List, Any
from agents import handoff


# Advanced Citation Management Tool
@function_tool
async def manage_citations(sources: List[str]) -> str:
    """Create professional citation management with numbered references.

    Args:
        sources: Source URLs or reference strings, in the order they should be cited.

    Returns:
        A JSON string with "citations" (source -> "[n]" marker), "bibliography"
        (list of "[n] source" lines) and "total_sources" (number of distinct
        sources). On failure, a plain "Citation management failed: ..." message.
    """
    try:
        citations = {}
        bibliography = []

        for source in sources:
            # A repeated source keeps the number it was first given, so the
            # marker map and the bibliography never disagree.
            if source in citations:
                continue
            marker = f"[{len(citations) + 1}]"
            citations[source] = marker
            bibliography.append(f"{marker} {source}")

        return json.dumps(
            {
                "citations": citations,
                "bibliography": bibliography,
                "total_sources": len(bibliography),
            }
        )
    except Exception as e:
        return f"Citation management failed: {str(e)}"


# Advanced Conflict Resolution Tool
@function_tool
async def resolve_conflicts(conflicting_info: str) -> str:
    """Analyze and provide resolution strategies for conflicting information."""
    # NOTE: `conflicting_info` is not consulted; the same fixed checklist is
    # returned for every call.
    try:
        header = "CONFLICT RESOLUTION STRATEGIES:\n"
        bullets = []
        for strategy in (
            "Source Authority: Prioritize information from more authoritative sources",
            "Recency: More recent information may supersede older data",
            "Consensus: Look for information confirmed by multiple sources",
            "Context: Consider the specific context and scope of each claim",
            "Methodology: Evaluate the research methods behind each claim",
        ):
            bullets.append(f"- {strategy}")
        return header + "\n".join(bullets)
    except Exception as e:
        return f"Conflict resolution failed: {str(e)}"


# Research Quality Assessment Tool
@function_tool
async def assess_research_quality(research_data: str) -> str:
    """Assess overall research quality and provide improvement recommendations."""
    # NOTE: `research_data` is not inspected; a fixed checklist of quality
    # dimensions is returned for every call.
    try:
        quality_metrics = {
            "source_diversity": "Multiple types of sources used",
            "recency": "Information is current and up-to-date",
            "depth": "Comprehensive coverage of the topic",
            "bias_awareness": "Potential biases identified and addressed",
            "methodology": "Clear research methodology followed",
        }

        lines = ["RESEARCH QUALITY ASSESSMENT:"]
        for metric, description in quality_metrics.items():
            # str.title() alone leaves the underscore in place
            # ("Source_Diversity"); swap it for a space first.
            label = metric.replace("_", " ").title()
            lines.append(f"- {label}: {description}")

        # Trailing newline kept to match the original output shape.
        return "\n".join(lines) + "\n"
    except Exception as e:
        return f"Quality assessment failed: {str(e)}"


In [None]:
# Phase 4: Professional-Grade Agents

# Advanced Planning Agent with research methodology.
# First specialist stage of the pipeline: it has no tools, and its prompt tells
# the model to emit a <thinking> section, a research plan, and then hand off to
# ProfessionalFactFinder. The handoff itself is attached in a later cell.
professional_planning_agent = Agent(
    name="ProfessionalPlanningAgent",
    instructions="""You are a professional research planning specialist with advanced methodology.
    
    Structure your response as:
    
    <thinking>
    Document your planning process:
    - Analysis of the research question complexity
    - Identification of key variables and dimensions
    - Consideration of potential challenges and biases
    - Strategy for ensuring comprehensive coverage
    </thinking>
    
    Then provide your plan and MUST call transfer_to_ProfessionalFactFinder:
    
    PROFESSIONAL RESEARCH PLAN:
    - Research Objective: [clear objective]
    - Key Dimensions: [main areas to investigate]
    - Search Strategy: [approach and keywords]
    - Quality Controls: [how to ensure reliable sources]
    - Expected Challenges: [potential issues to address]""",
    model=model,
)

# Enhanced Fact Finder with advanced research capabilities.
# The only agent in the pipeline with web access: it is given both search_web
# (single query) and parallel_search (several queries at once). Its prompt
# tells it to hand off to AdvancedConflictDetector after reporting findings.
professional_fact_finder_agent = Agent(
    name="ProfessionalFactFinder",
    instructions="""You are a professional fact-finding specialist with advanced research capabilities.
    
    Structure your response as:
    
    <thinking>
    Document your research process:
    - Search strategy execution
    - Source selection rationale
    - Information gathering approach
    - Quality assessment considerations
    </thinking>
    
    Then provide findings and MUST call transfer_to_AdvancedConflictDetector:
    
    PROFESSIONAL RESEARCH FINDINGS:
    - Comprehensive findings with source tracking
    - Source diversity analysis
    - Information gaps identified
    - Preliminary quality assessment""",
    model=model,
    tools=[search_web, parallel_search],
)

# Advanced Conflict Detection with Resolution Strategies.
# Receives the fact finder's output via handoff; its single tool is
# resolve_conflicts. The prompt tells it to hand off to ProfessionalSourceChecker
# once the conflict analysis is written.
advanced_conflict_detector_agent = Agent(
    name="AdvancedConflictDetector",
    instructions="""You are an advanced conflict detection and resolution specialist.
    
    Structure your response as:
    
    <thinking>
    Document your analysis process:
    - Conflict identification methodology
    - Source comparison approach
    - Resolution strategy development
    - Impact assessment considerations
    </thinking>
    
    Then provide analysis and MUST call transfer_to_ProfessionalSourceChecker:
    
    ADVANCED CONFLICT ANALYSIS:
    - Conflicts Identified: [detailed list]
    - Source Analysis: [authority comparison]
    - Resolution Recommendations: [specific strategies]
    - Impact Assessment: [how conflicts affect conclusions]""",
    model=model,
    tools=[resolve_conflicts],
)

# Professional Source Checker with comprehensive evaluation.
# Rates the sources gathered earlier in the chain; its single tool is
# assess_research_quality. The prompt tells it to hand off to
# AdvancedSynthesisAgent after the evaluation.
professional_source_checker_agent = Agent(
    name="ProfessionalSourceChecker",
    instructions="""You are a professional source quality specialist with comprehensive evaluation capabilities.
    
    Structure your response as:
    
    <thinking>
    Document your evaluation process:
    - Source assessment methodology
    - Bias detection approach
    - Quality scoring rationale
    - Reliability ranking strategy
    </thinking>
    
    Then provide evaluation and MUST call transfer_to_AdvancedSynthesisAgent:
    
    PROFESSIONAL SOURCE EVALUATION:
    - Individual Source Ratings: [detailed assessments]
    - Bias Analysis: [potential biases identified]
    - Methodology Evaluation: [research quality assessment]
    - Overall Reliability Score: [composite rating]
    - Recommendations: [source usage guidelines]""",
    model=model,
    tools=[assess_research_quality],
)

# Advanced Synthesis with Creative Research Strategies.
# Tool-less stage that works only from the conversation passed along the chain.
# The prompt tells it to hand off to ProfessionalReportWriter after producing
# the synthesis.
advanced_synthesis_agent = Agent(
    name="AdvancedSynthesisAgent",
    instructions="""You are an advanced research synthesis specialist with creative analytical capabilities.
    
    Structure your response as:
    
    <thinking>
    Document your synthesis process:
    - Analytical framework selection
    - Pattern identification approach
    - Creative insight development
    - Conflict resolution strategy
    - Implication analysis methodology
    </thinking>
    
    Then provide synthesis and MUST call transfer_to_ProfessionalReportWriter:
    
    ADVANCED RESEARCH SYNTHESIS:
    - Analytical Framework: [organizing structure]
    - Key Themes and Patterns: [main findings]
    - Creative Insights: [novel perspectives discovered]
    - Balanced Perspectives: [handling of conflicts]
    - Future Implications: [what this means going forward]""",
    model=model,
)

# Professional Report Writer with Citation Management.
# Terminal stage of the pipeline: its prompt names no further transfer and no
# handoffs are assigned to it in this notebook, so its <answer> section is the
# final output. Its single tool is manage_citations.
professional_report_writer_agent = Agent(
    name="ProfessionalReportWriter",
    instructions="""You are a professional research report writer with advanced citation management.
    
    Your output should be structured as follows:
    
    <thinking>
    Here you document your report writing process:
    - Analysis of all research findings received
    - Organization strategy for the report
    - Citation management approach
    - Any challenges or considerations in synthesis
    </thinking>
    
    <answer>
    Create the FINAL publication-quality research report with:
    1. Executive Summary with key insights
    2. Research Methodology and Scope
    3. Comprehensive Findings with proper citations
    4. Conflict Analysis and Resolution
    5. Source Quality Assessment and Limitations
    6. Creative Insights and Novel Perspectives
    7. Conclusions and Recommendations
    8. Future Research Directions
    9. Professional Bibliography
    10. Confidence and Reliability Ratings
    </answer>
    
    Use manage_citations tool for professional citation formatting when needed.
    This is the FINAL professional-grade research report.""",
    model=model,
    tools=[manage_citations],
)


In [None]:
# Phase 4: Professional Workflow Chain

# Set up handoffs for professional workflow:
#   Planning -> FactFinder -> ConflictDetector -> SourceChecker -> Synthesis -> ReportWriter
#
# Each agent's prompt instructs the model to call a tool named exactly
# "transfer_to_<AgentName>" (mixed case). handoff() lowercases its default tool
# name, which would not match what the prompts ask for, so every tool name is
# pinned explicitly to the spelling used in the prompts.
professional_planning_agent.handoffs = [
    handoff(
        agent=professional_fact_finder_agent,
        tool_name_override="transfer_to_ProfessionalFactFinder",
        tool_description_override="Transfer to ProfessionalFactFinder for comprehensive research execution",
    )
]

professional_fact_finder_agent.handoffs = [
    handoff(
        agent=advanced_conflict_detector_agent,
        tool_name_override="transfer_to_AdvancedConflictDetector",
        tool_description_override="Transfer to AdvancedConflictDetector for conflict analysis and resolution",
    )
]

advanced_conflict_detector_agent.handoffs = [
    handoff(
        agent=professional_source_checker_agent,
        tool_name_override="transfer_to_ProfessionalSourceChecker",
        tool_description_override="Transfer to ProfessionalSourceChecker for comprehensive source evaluation",
    )
]

professional_source_checker_agent.handoffs = [
    handoff(
        agent=advanced_synthesis_agent,
        tool_name_override="transfer_to_AdvancedSynthesisAgent",
        tool_description_override="Transfer to AdvancedSynthesisAgent for creative analytical synthesis",
    )
]

advanced_synthesis_agent.handoffs = [
    handoff(
        agent=professional_report_writer_agent,
        tool_name_override="transfer_to_ProfessionalReportWriter",
        tool_description_override="Transfer to ProfessionalReportWriter for publication-quality report creation",
    )
]

# Professional Lead Researcher.
# Entry point of the pipeline: it has no tools of its own and exists only to
# route every question to ProfessionalPlanningAgent.
professional_lead_researcher = Agent(
    name="ProfessionalLeadResearcher",
    instructions="""You are a Professional Lead Researcher coordinating an advanced research team.
    
    For ALL research questions, initiate the professional research workflow:
    1. Professional Planning (methodology and scope)
    2. Comprehensive Research (multi-source investigation)
    3. Advanced Conflict Detection (resolution strategies)
    4. Professional Source Evaluation (quality assessment)
    5. Creative Synthesis (novel insights)
    6. Publication-Quality Reporting (professional standards)
    
    You MUST call transfer_to_ProfessionalPlanningAgent to begin.""",
    model=model,
    handoffs=[
        handoff(
            agent=professional_planning_agent,
            # The prompt above names this tool in mixed case, while handoff()
            # lowercases its default tool name; pin the name so the two agree.
            tool_name_override="transfer_to_ProfessionalPlanningAgent",
            tool_description_override="Transfer to ProfessionalPlanningAgent for advanced research methodology planning",
        )
    ],
)

In [None]:
# Test Phase 4: Professional Research System
async def test_phase4_professional_system():
    """Run each expert-level question through the full pipeline, streaming text to stdout.

    Every run starts at professional_lead_researcher and is capped at 16 turns.
    """
    # Level 4: Expert Challenge Questions
    expert_questions = [
        "Analyze the economic impact of remote work policies on small businesses vs large corporations, including productivity data and employee satisfaction trends",  # Expert level
        # "How has artificial intelligence changed healthcare from 2020 to 2024, including both benefits and concerns from medical professionals?",  # Complex analysis
    ]
    rule = "=" * 80

    for question in expert_questions:
        print(f"\n{rule}")
        print(f"🎓 PHASE 4 PROFESSIONAL RESEARCH: {question}")
        print(rule)

        stream = Runner.run_streamed(
            starting_agent=professional_lead_researcher, input=question, max_turns=16
        )

        async for ev in stream.stream_events():
            # Only raw model events that carry a text delta are echoed.
            is_text_delta = ev.type == "raw_response_event" and isinstance(
                ev.data, ResponseTextDeltaEvent
            )
            if is_text_delta:
                print(ev.data.delta, end="", flush=True)
        print("\n")


# Run Phase 4 test.
# asyncio.run() is invoked directly from a notebook cell; when the kernel already
# has an event loop running, this depends on the nest_asyncio.apply() call in
# the first cell having been executed.
asyncio.run(test_phase4_professional_system())