## Error Handling Comparison: `dspy.Parallel` vs Async

Both approaches convert errors to strings that the LLM can reason about:

### `dspy.Parallel` approach:
```python
# Errors automatically become strings
par = dspy.Parallel(num_threads=5)
outputs = par.forward(jobs)
# If a tool fails, output is: "Error: [details]"
```

### Our async approach:
```python
# Same behavior with return_exceptions=True
results = await asyncio.gather(*tasks, return_exceptions=True)
# Errors are converted to: "[ERROR] ExceptionType: details"
```

**Key insight**: The LLM receives error information in both cases and can:
1. See which specific tools failed
2. Understand why they failed
3. Decide how to proceed (retry, use alternative tool, etc.)

This maintains DSPy's philosophy of "errors as information" rather than crashes!

In [ ]:
# Demonstrate error handling - async maintains DSPy's elegant error handling
async def test_error_handling():
    """Show how ``async_batch_call`` reports tool failures as plain strings.

    Sends a mix of valid and deliberately broken tool calls to
    ``async_batch_call``: an unknown tool name, and a ``web_search`` call that
    is missing its required ``query`` argument. It then prints a short preview
    of each result. Results containing the ``[ERROR]`` marker are flagged as
    something an LLM could read and react to, instead of a crash.
    """
    from agent import async_batch_call

    # Mix of valid and invalid tool calls
    calls = [
        {"tool_name": "web_search", "args": {"query": "valid search", "count": 2}},
        {"tool_name": "invalid_tool", "args": {"query": "this will fail"}},  # Invalid tool
        {"tool_name": "wikipedia_search", "args": {"query": "Python programming", "sentences": 3}},
        {"tool_name": "web_search", "args": {}},  # Missing required 'query' argument
    ]

    print("Testing error handling with mixed valid/invalid calls:\n")
    results = await async_batch_call(calls)

    # Show how errors are converted to strings for LLM reasoning
    for i, result in enumerate(results, start=1):
        # Coerce defensively: if a raw exception ever slips through, show it
        # instead of crashing on slicing.
        text = str(result)
        # Only add an ellipsis when something was actually cut off
        # (same convention as test_async).
        preview = text[:150] + "..." if len(text) > 150 else text
        print(f"Call {i}: {preview}")
        if "[ERROR]" in text:
            print("  ^ LLM can see this error and reason about what to do next\n")
        else:
            print("  ^ Success\n")

# Top-level `await` is valid here only because IPython/Jupyter runs cell code
# inside an already-running event loop; in a plain .py script use asyncio.run().
await test_error_handling()

## Async Multi-Agent Research System

This notebook demonstrates the async implementation of our multi-agent research system using DSPy 3.0.

### Key Features:
1. **True async execution** - All web searches and LLM calls run concurrently
2. **Clean architecture** - Reusable components in `agent.py`
3. **Native Jupyter support** - No need for `nest_asyncio` hacks
4. **Preserved functionality** - All data models and business logic unchanged

### Performance Benefits:
- Parallel tool execution (3-5x faster)
- Non-blocking I/O operations
- Better resource utilization
- Scalable to many concurrent operations

In [ ]:
# Advanced usage: Custom async research with direct tool access
async def custom_research(topic: str):
    """Run a two-phase research workflow on ``topic`` using parallel tool calls.

    Phase 1 (reconnaissance) runs a broad web search and a Wikipedia summary
    concurrently. Phase 2 (deep dive) then runs two more targeted web searches
    concurrently. The phases themselves run one after the other.

    Args:
        topic: Subject to research; interpolated into every query.

    Returns:
        dict with keys ``"reconnaissance"`` and ``"deep_dive"``. Each value is
        the list returned by ``async_batch_call`` for that phase.
    """
    # Import locally, like the other helpers in this notebook. None of the
    # module-level imports that run before this cell provide async_batch_call,
    # so relying on a global raises NameError when the notebook runs top-down.
    from agent import async_batch_call

    print(f"Researching: {topic}\n")

    # Phase 1: Parallel reconnaissance
    recon_calls = [
        {"tool_name": "web_search", "args": {"query": f"{topic} overview", "count": 2}},
        {"tool_name": "wikipedia_search", "args": {"query": topic, "sentences": 5}},
    ]

    print("Phase 1: Reconnaissance...")
    recon_results = await async_batch_call(recon_calls)

    # Phase 2: Deep dive. NOTE: these queries are fixed templates and do not
    # yet use recon_results. Feed those in to make this phase adaptive.
    print("\nPhase 2: Deep dive...")
    deep_calls = [
        {"tool_name": "web_search", "args": {"query": f"{topic} latest research 2024", "count": 3}},
        {"tool_name": "web_search", "args": {"query": f"{topic} expert analysis", "count": 3}},
    ]

    deep_results = await async_batch_call(deep_calls)

    return {
        "reconnaissance": recon_results,
        "deep_dive": deep_results,
    }

# Example usage (top-level `await` relies on the notebook's running event loop)
research_results = await custom_research("multi-agent systems AI")
# The total counts every tool call across both phases. That includes the
# Wikipedia lookup from the reconnaissance phase, not only web searches.
print(f"\nCompleted research with {len(research_results['reconnaissance']) + len(research_results['deep_dive'])} total searches")

In [ ]:
# Import required libraries
import dspy
import pydantic
import openai
import wikipedia
import asyncio
from agent import run_research, AsyncLeadAgent, TOOLS

# Report installed library versions, in a fixed order, to make environment
# differences easy to spot when results vary between machines.
for _label, _module in (
    ("DSPy", dspy),
    ("Wikipedia", wikipedia),
    ("Pydantic", pydantic),
    ("OpenAI", openai),
):
    print(f"{_label} version: {_module.__version__}")

In [ ]:
# Config setup
import os
from dotenv import load_dotenv

load_dotenv()

# Get configuration from environment
MODEL = os.getenv("MODEL_NAME")
# Use the default model for planning unless a dedicated planner model is set.
PLANNER_MODEL = os.getenv("PLANNER_MODEL") or MODEL
TEMPERATURE = float(os.getenv("TEMPERATURE", "1.0"))
MAX_TOKENS = int(os.getenv("MAX_TOKENS", "4000"))

# Fail fast with an actionable message. Without this check, dspy.LM below
# would be handed model=None and fail far from the real cause.
if not MODEL:
    raise ValueError("MODEL_NAME is not set; define it in the environment or in a .env file")

print(f"MODEL: {MODEL}")
print(f"PLANNER_MODEL: {PLANNER_MODEL}")
print(f"TEMPERATURE: {TEMPERATURE}")
print(f"MAX_TOKENS: {MAX_TOKENS}")

# Configure DSPy with default LM
lm = dspy.LM(model=MODEL, temperature=TEMPERATURE, max_tokens=MAX_TOKENS)
dspy.configure(lm=lm)

# Create planner LM for more advanced planning
planner_lm = dspy.LM(
    model=PLANNER_MODEL,
    temperature=TEMPERATURE,
    max_tokens=MAX_TOKENS,
)

In [ ]:
# Test basic async functionality
async def test_async():
    """Smoke-test the async tool layer.

    First awaits a single ``web_search`` through the tool's ``acall`` and
    prints it in full. Then fans three calls out through ``async_batch_call``
    and prints each result, cut to 200 characters plus "..." when longer.
    """
    divider = "=" * 50

    single = await TOOLS["web_search"].acall(query="DSPy framework", count=2)
    print("Single async search result:")
    print(single)
    print(f"\n{divider}\n")

    # Parallel fan-out of several tool calls at once
    from agent import async_batch_call
    batch = [
        dict(tool_name="web_search", args=dict(query="Lamine Yamal stats", count=2)),
        dict(tool_name="web_search", args=dict(query="Desire Doue stats", count=2)),
        dict(tool_name="wikipedia_search", args=dict(query="Lamine Yamal", sentences=3)),
    ]

    outputs = await async_batch_call(batch)
    print("Parallel search results:")
    for n, text in enumerate(outputs, start=1):
        print(f"\nResult {n}:")
        if len(text) > 200:
            print(text[:200] + "...")
        else:
            print(text)

# Run the test. Top-level `await` works because IPython/Jupyter executes cells
# inside its own running event loop; in a plain script wrap it in asyncio.run().
await test_async()

In [ ]:
# Define our research query (later cells reuse this notebook-global `query`)
query = "Research on Lamine Yamal vs Desire Doue as wingers"

# Run async research with the planner LM.
# run_research (imported from agent) returns an (analysis, plan) pair. What
# verbose=True prints is defined in agent.py — presumably progress output; confirm there.
analysis, plan = await run_research(query, planner_lm=planner_lm, verbose=True)

In [ ]:
# Compare sync vs async performance
import time

# Test synchronous approach (simulated)
print("=== Synchronous Execution (Simulated) ===")
start_sync = time.time()

# Simulate sequential searches (3 seconds each)
search_times = [3, 3, 2]  # Simulated times for web searches
total_sync_time = sum(search_times)
print(f"Sequential execution would take: {total_sync_time} seconds")

# Test async approach
print("\n=== Async Execution (Actual) ===")
start_async = time.time()

# Run multiple searches in parallel
calls = [
    {"tool_name": "web_search", "args": {"query": "Lamine Yamal Barcelona", "count": 3}},
    {"tool_name": "web_search", "args": {"query": "Desire Doue Rennes", "count": 3}},
    {"tool_name": "web_search", "args": {"query": "young wingers comparison 2024", "count": 2}}
]

from agent import async_batch_call
results = await async_batch_call(calls)

end_async = time.time()
async_time = end_async - start_async

print(f"Async execution took: {async_time:.2f} seconds")
print(f"Speed improvement: {total_sync_time / async_time:.1f}x faster!")
print(f"\nGot {len(results)} results in parallel")

In [42]:
# DSPY SIGNATURE

# DSPy signature for the first planning phase: user query in, structured analysis out.
# NOTE: DSPy uses the class docstring as the task instructions in the LM prompt,
# and each field's `desc` as its description. Editing that text changes model
# behavior, not just documentation.
class AnalyzeQuery(dspy.Signature):
    """Analyze query to determine research strategy. Categorize as depth-first (one topic, multiple angles), breadth-first (multiple independent topics), or straightforward (simple fact)."""
    # Input: the raw user question.
    query: str = dspy.InputField(desc="The user's research query")
    # Output: structured analysis. QueryAnalysis is not defined in this view;
    # presumably a pydantic model declared in an earlier cell — confirm.
    analysis: QueryAnalysis = dspy.OutputField(desc="Strategic analysis for delegation planning")

# DSPy signature for the second planning phase: turns the query plus the
# structured analysis into a delegation plan for subagents.
# NOTE: the docstring and `desc` strings are sent to the LM as instructions,
# so treat them as prompt text when editing.
class PlanResearch(dspy.Signature):
    """Create delegation plan for subagents based on analysis. Use tools sparingly for reconnaissance only (verify entities, assess scope). Output specific research tasks for subagents, not research results."""
    # Input: the original user question.
    query: str = dspy.InputField(desc="The user's research query")
    # Input: the analysis produced by the previous step (the AnalyzeQuery output type).
    analysis: QueryAnalysis = dspy.InputField(desc="Strategic analysis from previous step")
    # Output: the plan. ResearchPlan is not defined in this view; later cells read
    # .steps with id / description / budget_calls / depends_on on each step.
    plan: ResearchPlan = dspy.OutputField(desc="Delegation plan with specific tasks for subagents")

In [44]:
# Simple Tools
import nest_asyncio
# The synchronous tools in this cell call asyncio.run(). Inside Jupyter an
# event loop is already running, and asyncio.run() refuses to start from a
# running loop. nest_asyncio patches asyncio so the nested call is allowed.
nest_asyncio.apply()  # Enable nested event loops for Jupyter

def web_search(query: str, count: int = 5) -> str:
    """Query Brave Search and render the hits as a numbered plain-text list.

    Args:
        query: Text to search for.
        count: Maximum number of results to include; anything above 5 is
            capped at 5.

    Returns:
        One entry per hit ("N. title / description / URL"). If there are no
        hits, a "No results found" message. If the search request fails, a
        "Search error: ..." string; such failures are reported, not raised.
    """
    count = min(count, 5)  # hard cap on requested/displayed results

    async def _fetch():
        api = BraveSearch(api_key=BRAVE_SEARCH_API_KEY)
        return await api.web(WebSearchRequest(q=query, count=count))

    try:
        response = asyncio.run(_fetch())

        hits = response.web.results if response.web else None
        if not hits:
            return f"No results found for '{query}'"

        entries = [
            f"{pos}. {hit.title}\n   {hit.description}\n   {hit.url}"
            for pos, hit in enumerate(hits[:count], 1)
        ]
        return f"Search results for '{query}':\n\n" + "\n\n".join(entries)
    except Exception as e:
        return f"Search error: {e}"

def wikipedia_search(query: str, sentences: int = 3) -> str:
    """Summarize the best-matching English Wikipedia article for ``query``.

    Takes the single top search hit and returns at most ``sentences``
    sentences of its summary, headed by the article title.

    If the title is ambiguous, the first 5 disambiguation options are
    listed instead, so the calling agent can pick a more specific query.
    Any other failure comes back as a "Wikipedia error: ..." string rather
    than an exception.
    """
    try:
        wikipedia.set_lang("en")
        matches = wikipedia.search(query, results=1)
        if not matches:
            return f"No Wikipedia article found for '{query}'."

        best = matches[0]
        text = wikipedia.summary(best, sentences=sentences, auto_suggest=False)
        return f"Wikipedia – {best}\n\n{text}"

    except wikipedia.exceptions.DisambiguationError as e:
        # Offer a short menu of candidate pages
        choices = "\n • ".join(e.options[:5])
        return f"Wikipedia disambiguation for '{query}'. Try one of:\n • {choices}"
    except Exception as err:
        return f"Wikipedia error: {err}"

# Register tools
# Registry of callable tools, keyed by the name the agent uses to request them.
TOOLS = dict(
    web_search=dspy.Tool(web_search),
    wikipedia_search=dspy.Tool(wikipedia_search),
)


def batch_tool_call(calls: list[dict]) -> list[str]:
    """
    Execute multiple tool calls in parallel for efficiency.

    Results come back in the same order as ``calls``: ``results[i]`` always
    describes ``calls[i]``, whether that call succeeded or failed. The caller
    (typically an LLM) can therefore match each result to the call it made.

    Args:
        calls: List of dicts, each with:
            - tool_name: Name of the tool ('web_search' or 'wikipedia_search')
            - args: Dictionary of arguments for that tool (a missing or None
              value is treated as no arguments)

    Returns:
        One string per call. Successful calls are rendered as
        ``"<tool name>: <output>"``. Failures carry an ``[ERROR]`` marker
        instead of raising, so they can be reasoned about downstream.

    Example:
        calls = [
            {"tool_name": "web_search", "args": {"query": "Lamine Yamal stats", "count": 2}},
            {"tool_name": "web_search", "args": {"query": "Desire Doue stats", "count": 2}},
            {"tool_name": "wikipedia_search", "args": {"query": "Lamine Yamal", "sentences": 5}},
            {"tool_name": "wikipedia_search", "args": {"query": "Desire Doue", "sentences": 5}}
        ]
    """
    # Pre-size so every result lands at its call's index. Otherwise rejected
    # calls and thread-pool outputs end up in different orders.
    results = [None] * len(calls)
    jobs = []       # (tool, args) pairs handed to dspy.Parallel
    job_slots = []  # job_slots[j] = index in `calls` that jobs[j] came from

    for idx, call in enumerate(calls):
        tool_name = call.get("tool_name")
        args = call.get("args") or {}

        if not tool_name or tool_name not in TOOLS:
            results[idx] = f"[ERROR] Unknown tool: {tool_name}"
            continue
        jobs.append((TOOLS[tool_name], args))
        job_slots.append(idx)

    if jobs:
        # Size the pool by runnable jobs only; rejected calls never reach it.
        par = dspy.Parallel(num_threads=len(jobs))
        outputs = par.forward(jobs)
        for idx, (tool, _args), output in zip(job_slots, jobs, outputs):
            if output is None:
                # NOTE(review): dspy.Parallel appears to leave None in the slot
                # of a job that raised (it logs the error rather than returning
                # it). Surface that explicitly instead of printing "tool: None".
                output = "[ERROR] Tool call failed without returning a result"
            elif isinstance(output, Exception):
                output = f"[ERROR] {output}"
            results[idx] = f"{tool.name}: {output}"

    return results

In [46]:
# Test the improved orchestrator
# Runs the two planning phases step by step, using `query_analysis` and
# `research_planner`. Both are called synchronously and are not defined in
# this view (earlier cell — confirm). Reads the notebook-global `query` and
# overwrites the globals `analysis` and `plan`.
# Phase 1: Analyze the query
analysis = query_analysis(query=query)
print("=== Query Analysis ===")
print(f"Type: {analysis.analysis.query_type}")
print(f"Complexity: {analysis.analysis.complexity}")
print(f"Main concepts: {analysis.analysis.main_concepts}")
print(f"Key entities: {analysis.analysis.key_entities}")

# Phase 2: Create research plan with minimal reconnaissance.
# The planner receives the structured `.analysis` field of the phase-1
# result, not the result object itself.
print("\n=== Creating Research Plan ===")
plan = research_planner(query=query, analysis=analysis.analysis)
print(f"\nPlan has {len(plan.plan.steps)} steps:")
for step in plan.plan.steps:
    print(f"\nStep {step.id}: {step.description}")
    print(f"  Budget: {step.budget_calls} tool calls")
    print(f"  Depends on: {step.depends_on}")

[92m17:07:27 - LiteLLM:INFO[0m: utils.py:3119 - 
LiteLLM completion() model= google/gemini-2.5-flash-lite-preview-06-17; provider = openrouter


=== Query Analysis ===
Type: depth_first
Complexity: complex
Main concepts: ['Lamine Yamal', 'Desire Doue', 'Winger performance', 'Soccer player comparison']
Key entities: ['Lamine Yamal', 'Desire Doue']

=== Creating Research Plan ===


[92m17:07:30 - LiteLLM:INFO[0m: utils.py:1215 - Wrapper: Completed Call, calling success_handler
[92m17:07:30 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openrouter/google/gemini-2.5-flash-lite-preview-06-17
[92m17:07:30 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openrouter/google/gemini-2.5-flash-lite-preview-06-17


Processed 2 / 2 examples: 100%|████████████████████████████| 2/2 [00:01<00:00,  1.20it/s]

[92m17:07:31 - LiteLLM:INFO[0m: utils.py:3119 - 
LiteLLM completion() model= google/gemini-2.5-flash-lite-preview-06-17; provider = openrouter





[92m17:07:33 - LiteLLM:INFO[0m: utils.py:1215 - Wrapper: Completed Call, calling success_handler
[92m17:07:33 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openrouter/google/gemini-2.5-flash-lite-preview-06-17
[92m17:07:33 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openrouter/google/gemini-2.5-flash-lite-preview-06-17
[92m17:07:33 - LiteLLM:INFO[0m: utils.py:3119 - 
LiteLLM completion() model= google/gemini-2.5-flash-lite-preview-06-17; provider = openrouter
[92m17:07:35 - LiteLLM:INFO[0m: utils.py:1215 - Wrapper: Completed Call, calling success_handler
[92m17:07:35 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openrouter/google/gemini-2.5-flash-lite-preview-06-17
[92m17:07:35 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openrouter/google/gemini-2.5-flash-lite-preview-06-17
[92m17:07:36 - LiteLLM:INFO[0m: uti

Processed 2 / 2 examples: 100%|████████████████████████████| 2/2 [00:01<00:00,  1.90it/s]

[92m17:07:40 - LiteLLM:INFO[0m: utils.py:3119 - 
LiteLLM completion() model= google/gemini-2.5-flash-lite-preview-06-17; provider = openrouter





[92m17:07:42 - LiteLLM:INFO[0m: utils.py:1215 - Wrapper: Completed Call, calling success_handler
[92m17:07:42 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openrouter/google/gemini-2.5-flash-lite-preview-06-17
[92m17:07:42 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openrouter/google/gemini-2.5-flash-lite-preview-06-17



Plan has 2 steps:

Step 1: Search for detailed winger performance statistics for Lamine Yamal, focusing on dribbling, crossing, chance creation, goals, assists, and defensive contributions. Use multiple reputable sports statistics websites.
  Budget: 5 tool calls
  Depends on: []

Step 2: Search for detailed winger performance statistics for Desire Doue, focusing on dribbling, crossing, chance creation, goals, assists, and defensive contributions. Use multiple reputable sports statistics websites.
  Budget: 5 tool calls
  Depends on: [1]


In [None]:
# Test the improved orchestrator with GPT-4o
# NOTE(review): nothing in this cell selects GPT-4o. The model used is
# whatever `query_analysis` / `research_planner` were configured with
# elsewhere, so the "GPT-4o" labels below may be stale — confirm before
# trusting the output.
# Phase 1: Analyze the query
analysis = query_analysis(query=query)
print("=== Query Analysis ===")
print(f"Type: {analysis.analysis.query_type}")
print(f"Complexity: {analysis.analysis.complexity}")
print(f"Main concepts: {analysis.analysis.main_concepts}")
print(f"Key entities: {analysis.analysis.key_entities}")

# Phase 2: Create research plan (see the NOTE above about which planner model runs)
print("\n=== Creating Research Plan with GPT-4o ===")
plan = research_planner(query=query, analysis=analysis.analysis)
print(f"\nPlan has {len(plan.plan.steps)} steps:")
for step in plan.plan.steps:
    print(f"\nStep {step.id}: {step.description}")
    print(f"  Budget: {step.budget_calls} tool calls")
    print(f"  Depends on: {step.depends_on}")