In [1]:
# Install dependencies (run this cell first!)
!uv pip install -q "pydantic-ai-slim[logfire,openai,tavily,a2a]>=0.0.49" "pydantic-settings>=2.0.0" "logfire[httpx]>=4.3.3" "httpx>=0.27.0"
print("✅ Dependencies installed successfully!")

✅ Dependencies installed successfully!


# Lecture 04: Agent Workflow Collaboration Pattern

This notebook demonstrates how a workflow can orchestrate multiple specialized agents, combining their outputs with regular code flow to provide a comprehensive result.

## Workflow Architecture

```mermaid
graph TD
    Start([👤 User Request:<br/>Get Trending Stocks Report]) --> A1[🔍 Agent 1: Trend Finder<br/>Web search for 3 trending stocks]
    
    A1 --> Stocks[📋 List of 3 Stocks<br/>STOCK1, STOCK2, STOCK3]
    
    Stocks --> Parallel{Parallel Processing<br/>for each stock}
    
    Parallel -.-> A2_1[📊 Agent 2a: Details Gatherer<br/>Get STOCK1 details]
    Parallel -.-> A2_2[📊 Agent 2b: Details Gatherer<br/>Get STOCK2 details]
    Parallel -.-> A2_3[📊 Agent 2c: Details Gatherer<br/>Get STOCK3 details]
    
    A2_1 --> A3_1[⭐ Agent 3a: Evaluator<br/>Evaluate & recommend STOCK1]
    A2_2 --> A3_2[⭐ Agent 3b: Evaluator<br/>Evaluate & recommend STOCK2]
    A2_3 --> A3_3[⭐ Agent 3c: Evaluator<br/>Evaluate & recommend STOCK3]
    
    A3_1 --> Combine[🔄 Code Flow:<br/>Combine all data]
    A3_2 --> Combine
    A3_3 --> Combine
    
    Combine --> A4[📝 Agent 4: Report Generator<br/>Create comprehensive report]
    
    A4 --> End([✅ Final Report<br/>with all 3 stocks])
    
    style Start fill:#e1f5ff,stroke:#01579b,stroke-width:3px
    style A1 fill:#fff9c4,stroke:#f57f17,stroke-width:2px
    style Stocks fill:#f3e5f5,stroke:#4a148c,stroke-width:2px
    style Parallel fill:#ffebee,stroke:#c62828,stroke-width:2px
    style A2_1 fill:#e8f5e9,stroke:#1b5e20,stroke-width:2px
    style A2_2 fill:#e8f5e9,stroke:#1b5e20,stroke-width:2px
    style A2_3 fill:#e8f5e9,stroke:#1b5e20,stroke-width:2px
    style A3_1 fill:#fff3e0,stroke:#e65100,stroke-width:2px
    style A3_2 fill:#fff3e0,stroke:#e65100,stroke-width:2px
    style A3_3 fill:#fff3e0,stroke:#e65100,stroke-width:2px
    style Combine fill:#f3e5f5,stroke:#4a148c,stroke-width:2px
    style A4 fill:#e0f2f1,stroke:#004d40,stroke-width:2px
    style End fill:#e1f5ff,stroke:#01579b,stroke-width:3px
```

**Key Components:**
1. **Agent 1 (Trend Finder)**: Uses web search to identify 3 trending stocks
2. **Agent 2 (Details Gatherer)**: For each stock, gathers detailed information in a structured format
3. **Agent 3 (Evaluator)**: For each stock, provides evaluation and recommendation
4. **Code Flow**: Combines all structured outputs from multiple agents
5. **Agent 4 (Report Generator)**: Creates comprehensive final report from all data
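
The control flow behind these components can be sketched without any model calls. The following is a minimal sketch, assuming stub coroutines: `find_trending`, `gather_details`, `evaluate`, and `write_report` are hypothetical stand-ins for the four agents and are not part of this notebook. It shows only the fan-out/fan-in shape of the workflow.

```python
import asyncio


# Hypothetical stand-ins for the four agents; in the notebook each is an LLM call.
async def find_trending() -> list[str]:
    return ["AAA", "BBB", "CCC"]


async def gather_details(symbol: str) -> dict:
    await asyncio.sleep(0.01)  # simulate network latency
    return {"symbol": symbol, "sector": "Example"}


async def evaluate(details: dict) -> dict:
    await asyncio.sleep(0.01)
    return {"symbol": details["symbol"], "rating": "Hold"}


async def write_report(combined: list[dict]) -> str:
    return "\n".join(f"{item['symbol']}: {item['rating']}" for item in combined)


async def process_stock(symbol: str) -> dict:
    # Steps 2 and 3 are sequential for one stock: evaluation needs the details.
    details = await gather_details(symbol)
    evaluation = await evaluate(details)
    # Step 4 (code flow): merge the two structured outputs.
    return {**details, **evaluation}


async def run_workflow() -> str:
    symbols = await find_trending()  # Step 1
    # Fan out: one process_stock chain per symbol, run concurrently.
    combined = await asyncio.gather(*(process_stock(s) for s in symbols))
    return await write_report(list(combined))  # Step 5


report_text = asyncio.run(run_workflow())
print(report_text)
```

Inside a Jupyter cell an event loop is already running, so `asyncio.run(...)` would raise an error there; use `report_text = await run_workflow()` instead, as the cells below do with `await`.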


In [None]:
# Setup: Import necessary modules
from pydantic import BaseModel, Field
from pydantic_ai import Agent
from common.tools import web_search_tool
from common.utils import create_agent_model, setup_logging, get_current_date

# Initialize logging
setup_logging()

In [None]:
# Add Logfire setup
from common.utils import setup_logfire

setup_logfire(
    service_name="workflow-pattern-lecture04",
    start_message="🚀 Lecture 04 - Workflow Pattern Notebook Started",
)

In [None]:
# Import asyncio for parallel execution
import asyncio

# Define structured output models for each agent


class TrendingStocks(BaseModel):
    """List of trending stocks - output from Agent 1."""

    stocks: list[str] = Field(
        description="List of 3 stock symbols (e.g., ['AAPL', 'MSFT', 'NVDA'])"
    )
    reasoning: str = Field(
        description="Brief explanation of why these stocks are trending"
    )


class StockDetails(BaseModel):
    """Detailed stock information - output from Agent 2."""

    symbol: str
    company_name: str
    current_price: float | None = None
    market_cap: str | None = None
    sector: str
    key_metrics: list[str] = Field(
        default_factory=list, description="Key financial metrics"
    )
    recent_news: list[str] = Field(
        default_factory=list, description="Recent news items"
    )
    analysis_date: str


class StockEvaluation(BaseModel):
    """Stock evaluation and recommendation - output from Agent 3."""

    symbol: str
    rating: str = Field(
        description="Rating: Strong Buy, Buy, Hold, Sell, or Strong Sell"
    )
    recommendation: str = Field(description="Detailed investment recommendation")
    strengths: list[str] = Field(default_factory=list)
    risks: list[str] = Field(default_factory=list)
    target_price: float | None = None


class FinalReport(BaseModel):
    """Comprehensive report for all stocks - output from Agent 4."""

    title: str
    executive_summary: str
    stocks_analysis: list[dict] = Field(
        default_factory=list, description="Combined analysis for each stock"
    )
    market_overview: str
    overall_recommendation: str
    report_date: str


# Agent 1: Trend Finder - identifies trending stocks
trend_finder_agent = Agent(
    model=create_agent_model(),
    tools=[web_search_tool],
    output_type=TrendingStocks,
    system_prompt=f"""You are a market analyst specializing in identifying trending stocks.
    
Your task is to find exactly 3 currently trending stocks using web search.
Today's date is {get_current_date()}.

Return the stock symbols and explain why they are trending.""",
)

# Agent 2: Details Gatherer - gets detailed information for a stock
details_gatherer_agent = Agent(
    model=create_agent_model(),
    tools=[web_search_tool],
    output_type=StockDetails,
    system_prompt=f"""You are a financial data analyst. Gather comprehensive stock details.
    
Analysis date: {get_current_date()}

Focus on:
- Company information
- Current price and market cap
- Key financial metrics
- Recent relevant news

Provide accurate, up-to-date information.""",
)

# Agent 3: Evaluator - evaluates stock and provides recommendation
evaluator_agent = Agent(
    model=create_agent_model(),
    output_type=StockEvaluation,
    system_prompt="""You are an investment analyst providing stock evaluations.

Based on the provided stock details, generate:
- A clear rating (Strong Buy, Buy, Hold, Sell, or Strong Sell)
- Detailed investment recommendation
- Key strengths
- Risk factors
- Target price estimate if possible

Be analytical and balanced in your assessment.""",
)

# Agent 4: Report Generator - creates comprehensive final report
report_generator_agent = Agent(
    model=create_agent_model(),
    output_type=FinalReport,
    system_prompt=f"""You are a senior financial report writer.

Create a comprehensive, professional report that synthesizes all the stock analyses provided.
Report date: {get_current_date()}

The report should include:
- An engaging title
- Executive summary of the market situation
- Clear presentation of each stock's analysis
- Market overview
- Overall investment recommendations

Write in a professional but accessible style.""",
)


# Main workflow function
async def generate_trending_stocks_report() -> FinalReport:
    """
    Orchestrates multiple agents to create a comprehensive trending stocks report.

    Workflow:
    1. Agent 1: Find 3 trending stocks
    2. Agent 2: For each stock, gather detailed information (stocks are processed concurrently)
    3. Agent 3: For each stock, evaluate and recommend (runs after that stock's details are available)
    4. Code: Combine all data
    5. Agent 4: Generate final comprehensive report
    """
    print("🔍 Step 1: Finding trending stocks...")
    # Step 1: Get trending stocks
    trending_result = await trend_finder_agent.run(
        f"Find 3 trending stocks today ({get_current_date()}). Return exactly 3 stock symbols."
    )
    trending_stocks = trending_result.output
    print(f"   Found stocks: {', '.join(trending_stocks.stocks)}")
    print(f"   Reasoning: {trending_stocks.reasoning}\n")

    # Step 2 & 3: For each stock, gather details and evaluate (parallel processing)
    print("📊 Step 2 & 3: Gathering details and evaluating stocks (in parallel)...")

    async def process_stock(symbol: str) -> tuple[StockDetails, StockEvaluation]:
        """Process a single stock: get details, then evaluate."""
        print(f"   Processing {symbol}...")

        # Get details
        details_result = await details_gatherer_agent.run(
            f"Gather detailed information for {symbol} stock."
        )
        details = details_result.output

        # Evaluate based on details
        eval_prompt = f"""Evaluate {symbol} stock based on this information:

Company: {details.company_name}
Sector: {details.sector}
Current Price (USD): {details.current_price if details.current_price is not None else 'N/A'}
Market Cap: {details.market_cap or 'N/A'}

Key Metrics:
{chr(10).join(f'- {metric}' for metric in details.key_metrics)}

Recent News:
{chr(10).join(f'- {news}' for news in details.recent_news[:3])}

Provide your evaluation and recommendation."""

        eval_result = await evaluator_agent.run(eval_prompt)
        evaluation = eval_result.output

        print(f"   ✓ {symbol} complete: {evaluation.rating}")
        return details, evaluation

    # Process all stocks in parallel
    results = await asyncio.gather(
        *[process_stock(symbol) for symbol in trending_stocks.stocks]
    )

    print("\n🔄 Step 4: Combining all data...")
    # Step 4: Combine all data (regular code flow)
    combined_data = []
    # asyncio.gather returns results in the same order as its inputs
    for symbol, (details, evaluation) in zip(trending_stocks.stocks, results):
        combined_data.append(
            {
                "symbol": symbol,
                "company_name": details.company_name,
                "sector": details.sector,
                "current_price": details.current_price,
                "market_cap": details.market_cap,
                "key_metrics": details.key_metrics,
                "recent_news": details.recent_news,
                "rating": evaluation.rating,
                "recommendation": evaluation.recommendation,
                "strengths": evaluation.strengths,
                "risks": evaluation.risks,
                "target_price": evaluation.target_price,
            }
        )

    print("📝 Step 5: Generating final comprehensive report...\n")
    # Step 5: Generate final report
    report_prompt = f"""Create a comprehensive trending stocks report.

Trending Stocks Context:
{trending_stocks.reasoning}

Stock Analyses:
"""

    for data in combined_data:
        report_prompt += f"""
--- {data['symbol']} ({data['company_name']}) ---
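Sector: {data['sector']}
Price (USD): {data['current_price'] if data['current_price'] is not None else 'N/A'}
Market Cap: {data['market_cap'] or 'N/A'}
Rating: {data['rating']}
Target Price (USD): {data['target_price'] if data['target_price'] is not None else 'N/A'}
Recommendation: {data['recommendation']}
Strengths: {', '.join(data['strengths'])}
Risks: {', '.join(data['risks'])}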
"""

    final_result = await report_generator_agent.run(report_prompt)
    return final_result.output

In [None]:
# Execute the workflow to generate trending stocks report
# This will orchestrate all 4 agents + code flow
print("=" * 80)
print("STARTING AGENT WORKFLOW COLLABORATION")
print("=" * 80)
print()

report = await generate_trending_stocks_report()

print()
print("=" * 80)
print("FINAL REPORT GENERATED")
print("=" * 80)
print()
print(f"Title: {report.title}")
print(f"\nExecutive Summary:\n{report.executive_summary}")
print(f"\nMarket Overview:\n{report.market_overview}")
print(f"\nOverall Recommendation:\n{report.overall_recommendation}")
print(f"\nReport Date: {report.report_date}")

In [None]:
# Display detailed information for each stock in the report
print("=" * 80)
print("DETAILED STOCK ANALYSIS")
print("=" * 80)
print()

# `stocks_analysis` is a list of free-form dicts produced by the model, so no key
# is guaranteed to exist; use .get() with fallbacks instead of direct indexing.
for stock_data in report.stocks_analysis:
    print(f"\n{'='*80}")
    print(f"  {stock_data.get('symbol', 'N/A')} - {stock_data.get('company_name', 'N/A')}")
    print(f"{'='*80}")
    print("\n📊 DETAILS:")
    print(f"  Sector: {stock_data.get('sector', 'N/A')}")
    print(f"  Current Price: {stock_data.get('current_price') or 'N/A'}")
    print(f"  Market Cap: {stock_data.get('market_cap') or 'N/A'}")

    print("\n⭐ EVALUATION:")
    print(f"  Rating: {stock_data.get('rating', 'N/A')}")
    print(f"  Target Price: {stock_data.get('target_price') or 'N/A'}")

    print("\n💪 STRENGTHS:")
    for strength in stock_data.get("strengths", []):
        print(f"  • {strength}")

    print("\n⚠️  RISKS:")
    for risk in stock_data.get("risks", []):
        print(f"  • {risk}")

    print("\n💡 RECOMMENDATION:")
    print(f"  {stock_data.get('recommendation', 'N/A')}")
    print()

In [None]:
# Optional: Run the workflow again to get fresh trending stocks
# Uncomment to execute
# print("\n\n🔄 Running workflow again for fresh data...\n")
# new_report = await generate_trending_stocks_report()
# print(f"\nNew Report Title: {new_report.title}")
# print(f"New Trending Stocks: {', '.join([s['symbol'] for s in new_report.stocks_analysis])}")

## Workflow Pattern Summary

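This notebook demonstrated the **Agent Workflow Collaboration Pattern**: regular Python code orchestrates several specialized agents and combines their structured outputs into one result.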

### Key Characteristics:
1. **Multiple Specialized Agents**: Each agent has a specific role and expertise
   - Trend Finder: Discovers trending stocks
   - Details Gatherer: Collects comprehensive stock information
   - Evaluator: Analyzes and provides recommendations
   - Report Generator: Synthesizes everything into a final report

2. **Structured Outputs**: Each agent produces well-defined structured data using Pydantic models
   - Enables reliable data passing between agents
   - Ensures consistency and type safety
   - Makes the workflow predictable

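3. **Parallel Execution**: Multiple stocks are processed concurrently using `asyncio.gather()`
   - Reduces total wall-clock time, because the per-stock agent calls spend most of their time waiting on network responses and can overlap
   - Demonstrates coordination of concurrent agent tasks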

4. **Code-Driven Orchestration**: Regular Python code combines agent outputs
   - Workflow logic is explicit and controllable
   - Flexibility to add business logic between agent calls
   - Easy to debug and modify

5. **Sequential Dependencies**: Some agents depend on outputs from previous agents
   - Details Gatherer runs first for each stock
   - Evaluator uses the details to make recommendations
   - Report Generator synthesizes all previous results
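
The effect of concurrent execution can be observed with a small timing comparison. This is a minimal sketch, assuming a hypothetical `fake_agent_call` coroutine that sleeps instead of calling a model. It also passes `return_exceptions=True` to `asyncio.gather()`, which the workflow above does not do, so that one failing stock does not discard the results of the others.

```python
import asyncio
import time


async def fake_agent_call(symbol: str, delay: float = 0.1) -> str:
    """Hypothetical stand-in for an agent run; fails for the symbol 'BAD'."""
    await asyncio.sleep(delay)
    if symbol == "BAD":
        raise ValueError(f"no data for {symbol}")
    return f"{symbol} done"


async def run_sequential(symbols: list[str]) -> list[str]:
    # Each call waits for the previous one to finish.
    return [await fake_agent_call(s) for s in symbols]


async def run_concurrent(symbols: list[str]) -> list:
    # All calls are scheduled together; exceptions are returned as values.
    return await asyncio.gather(
        *(fake_agent_call(s) for s in symbols), return_exceptions=True
    )


async def main() -> tuple[float, float, list]:
    start = time.perf_counter()
    await run_sequential(["AAA", "BBB", "CCC"])
    sequential_s = time.perf_counter() - start

    start = time.perf_counter()
    results = await run_concurrent(["AAA", "BAD", "CCC"])
    concurrent_s = time.perf_counter() - start
    return sequential_s, concurrent_s, results


sequential_s, concurrent_s, results = asyncio.run(main())
print(f"sequential: {sequential_s:.2f}s, concurrent: {concurrent_s:.2f}s")
for item in results:
    print("failed:" if isinstance(item, Exception) else "ok:", item)
```

With three calls of about 0.1 s each, the sequential run takes roughly the sum of the delays while the concurrent run takes roughly the longest single delay. In a Jupyter cell, replace `asyncio.run(main())` with `await main()`.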

### Benefits:
- ✅ **Separation of Concerns**: Each agent focuses on one task
- ✅ **Reusability**: Agents can be reused in different workflows
- ✅ **Scalability**: Easy to add more agents or processing steps
- ✅ **Maintainability**: Clear structure makes debugging easier
- ✅ **Performance**: Parallel processing where possible

### When to Use This Pattern:
- Complex tasks requiring multiple specialized skills
- Need for structured, predictable outputs
- Tasks that can benefit from parallel processing
- Situations where you need fine control over workflow logic
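
Fine control over workflow logic often amounts to plain Python checks placed between agent calls. As an illustration, the symbols returned by the Trend Finder could be normalized before they are passed on. This is a sketch only: `clean_symbols` is a hypothetical helper that does not exist in this notebook, and the rules it applies (strip whitespace and a leading `$`, uppercase, de-duplicate, require a fixed count) are assumptions about what a caller might want.

```python
def clean_symbols(raw: list[str], expected: int = 3) -> list[str]:
    """Normalize and de-duplicate ticker symbols, preserving their order."""
    seen: set[str] = set()
    cleaned: list[str] = []
    for item in raw:
        symbol = item.strip().lstrip("$").upper()
        if symbol and symbol not in seen:
            seen.add(symbol)
            cleaned.append(symbol)
    if len(cleaned) < expected:
        raise ValueError(f"expected {expected} distinct symbols, got {len(cleaned)}")
    # Drop any extras so downstream steps always see exactly `expected` symbols.
    return cleaned[:expected]


print(clean_symbols([" aapl", "$MSFT", "AAPL", "nvda ", "TSLA"]))
# ['AAPL', 'MSFT', 'NVDA']
```

A check like this would sit between Step 1 and the `asyncio.gather()` call, for example as `symbols = clean_symbols(trending_stocks.stocks)`, so that a malformed model response fails early instead of triggering extra agent runs.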
