# Tutorial 07: Advanced Multi-Agent Workflows

##  Learning Objectives
By the end of this notebook, you will:
- Build **custom workflow graphs** with `WorkflowBuilder` for complex routing
- Create **custom executors** with business logic
- Implement **fan-out/fan-in patterns** for parallel processing
- Use **AI-powered orchestration** with `MagenticBuilder`
- Handle **conditional routing** and complex workflow patterns
- Understand **when to use advanced patterns** vs basic workflows

##  Prerequisites

Before starting this tutorial, you should have completed:
- **Tutorial 06: Multi-Agent Workflows** (SequentialBuilder, ConcurrentBuilder)

This tutorial builds on those concepts with more advanced patterns!

##  What Makes These "Advanced"?

**Basic Workflows (Tutorial 06):**
-  Simple, declarative patterns
-  Sequential or concurrent execution
-  No custom logic needed
- Examples: `SequentialBuilder`, `ConcurrentBuilder`

**Advanced Workflows (This Tutorial):**
-  Custom business logic in executors
-  Conditional routing and branching
-  Fan-out/fan-in with custom gates
-  AI-powered dynamic orchestration
- Examples: `WorkflowBuilder`, `MagenticBuilder`

### When to Use Advanced Patterns

| Use Case | Pattern | Why? |
|----------|---------|------|
| Approval workflows | WorkflowBuilder | Need conditional routing based on approval status |
| Budget validation gates | WorkflowBuilder | Need to validate before proceeding |
| Multi-stage pipelines | WorkflowBuilder | Need custom control flow |
| Dynamic task planning | MagenticBuilder | Don't know optimal workflow ahead of time |
| Adaptive systems | MagenticBuilder | Need AI to decide coordination strategy |

---

## Step 1: Setup and Imports

In [1]:
import asyncio
from typing import cast

from agent_framework import (
    # Advanced Workflow Builders
    WorkflowBuilder,
    MagenticBuilder,
    # Workflow Components
    Executor,
    AgentExecutor,
    handler,
    WorkflowContext,
    # Events for tracking
    WorkflowOutputEvent,
    AgentRunEvent,
    ExecutorInvokedEvent,
    # Basic types
    ChatMessage,
    Role,
)
from agent_framework.azure import AzureAIAgentClient
from azure.identity.aio import AzureCliCredential
from dotenv import load_dotenv

load_dotenv()
print(" Imports successful!")
print("ðŸ“¦ Advanced workflow builders ready: WorkflowBuilder, MagenticBuilder")

ModuleNotFoundError: No module named 'agent_framework'

## Step 2: Create Specialized Agents

We'll reuse the travel agent specialists from Tutorial 06.

In [16]:
async def create_travel_agents():
    """
    Create specialized travel planning agents.
    """
    # Note: Using AzureCliCredential without context manager to avoid session closure issues
    # The credential is reused across multiple agent calls
    credential = AzureCliCredential()
    chat_client = AzureAIAgentClient(async_credential=credential)
    
    # Flight Expert
    flight_agent = chat_client.create_agent(
        instructions="""
        You are an expert flight booking specialist. 
        Provide concise, practical flight recommendations considering:
        - Best times to book
        - Airline preferences and quality
        - Connection strategies
        - Price vs convenience tradeoffs
        
        Keep responses brief (2-3 sentences max).
        """,
        name="FlightExpert",
    )
    
    # Hotel Expert
    hotel_agent = chat_client.create_agent(
        instructions="""
        You are an expert hotel booking specialist.
        Provide concise hotel recommendations considering:
        - Best neighborhoods for tourists
        - Value for money
        - Proximity to attractions and transport
        - Hotel quality and amenities
        
        Keep responses brief (2-3 sentences max).
        """,
        name="HotelExpert",
    )
    
    # Activities Expert
    activities_agent = chat_client.create_agent(
        instructions="""
        You are an expert local activities and experiences specialist.
        Provide concise activity recommendations considering:
        - Must-see attractions
        - Local favorites and hidden gems
        - Cultural experiences
        - Food and dining
        
        Keep responses brief (2-3 sentences max).
        """,
        name="ActivitiesExpert",
    )
    
    return flight_agent, hotel_agent, activities_agent

print(" Agent factory created")

 Agent factory created


## Step 3: Custom Executors - WorkflowBuilder

**Custom executors** allow you to add business logic to your workflows:
- Validation gates (check conditions before proceeding)
- Data transformation (modify messages between agents)
- Conditional routing (send to different agents based on logic)
- Aggregation logic (combine results in custom ways)

### Creating a Custom Executor

All custom executors must:
1. **Inherit from `Executor`**
2. **Call `super().__init__(id="unique_id")`** in `__init__`
3. **Define a `@handler` method** with `WorkflowContext` type annotation
4. **Use `ctx.send_message()`** to pass data to downstream executors

In [17]:
# Custom executor for budget validation
class BudgetValidator(Executor):
    """Validates trip budget before proceeding to planning agents."""
    
    def __init__(self):
        # REQUIRED: Call super().__init__() with unique ID
        super().__init__(id="budget_validator")
    
    @handler
    async def validate_budget(self, request: str, ctx: WorkflowContext[str]):
        """
        Check if budget is mentioned and forward to downstream agents.
        
        This demonstrates:
        - Business logic execution (budget keyword detection)
        - Message transformation (adding context)
        - Passing to downstream executors via ctx.send_message()
        """
        print(f"    BudgetValidator: Analyzing request...")
        
        # Business logic: Check for budget-related keywords
        budget_keywords = ['budget', '$', 'price', 'cost', 'cheap', 'expensive', 'affordable']
        has_budget = any(word in request.lower() for word in budget_keywords)
        
        if has_budget:
            print(f"    Budget considerations detected")
            # Transform message with budget context
            enhanced_request = f"Budget-conscious trip request: {request}"
        else:
            print(f"     No budget specified - assuming standard pricing")
            # Add budget reminder
            enhanced_request = f"Standard trip request (consider mid-range options): {request}"
        
        # Send to downstream agents (fan-out will happen via edges)
        await ctx.send_message(enhanced_request)

print(" BudgetValidator executor created")

 BudgetValidator executor created


## Step 4: Building Custom Workflow Graphs

**WorkflowBuilder** gives you full control over the workflow graph:
- Add executors and edges manually
- Create fan-out patterns (1 â†’ many)
- Create fan-in patterns (many â†’ 1)
- Add conditional routing
- Build complex DAGs

### Graph Construction Pattern

```python
workflow = (
    WorkflowBuilder()
    .set_start_executor(start_node)           # Define entry point
    .add_fan_out_edges(validator, [a, b, c])  # 1 â†’ many
    .add_edge(a, aggregator)                   # Individual edges
    .add_edge(b, aggregator)
    .add_edge(c, aggregator)
    .build()                                   # Validate and create
)
```

In [24]:
async def custom_workflow_demo():
    """
    Demonstrate WorkflowBuilder: custom DAG with validation gate and fan-out/fan-in.
    
    Workflow Graph:
    User Input â†’ BudgetValidator â†’ Fan-out to 3 agents â†’ Aggregator â†’ Final Output
    """
    print("="*70)
    print("CUSTOM WORKFLOW: Budget Validation + Parallel Agents + Aggregation")
    print("="*70)
    print("\nWorkflow Graph:")
    print("  User â†’ BudgetValidator â†’ Fan-out â”¬â†’ FlightExpert")
    print("                                    â”œâ†’ HotelExpert    â†’ Aggregator")
    print("                                    â””â†’ ActivitiesExpert")
    print()
    
    # Import necessary types
    from agent_framework import (
        AgentExecutor,
        AgentExecutorRequest,
        AgentExecutorResponse,
    )
    from typing import Never
    
    # Create an aggregator to collect and format all agent responses
    class TravelPlanAggregator(Executor):
        """Collects responses from all travel agents and formats final plan"""
        
        def __init__(self):
            super().__init__(id="travel_plan_aggregator")
        
        @handler
        async def aggregate(
            self, 
            results: list[AgentExecutorResponse], 
            ctx: WorkflowContext[Never, str]
        ) -> None:
            """Aggregate all agent responses into a formatted travel plan"""
            # Extract responses by agent (silently - no print here)
            responses_by_agent = {}
            for result in results:
                agent_id = result.executor_id
                response_text = result.agent_run_response.text if result.agent_run_response else ""
                if response_text:
                    responses_by_agent[agent_id] = response_text
            
            # Format the consolidated plan
            plan = "\n" + "="*70 + "\n"
            plan += "ðŸ“‹ YOUR COMPLETE BUDGET BARCELONA TRAVEL PLAN\n"
            plan += "="*70 + "\n\n"
            
            if "FlightExpert" in responses_by_agent:
                plan += "  FLIGHTS:\n"
                plan += "â”€"*70 + "\n"
                plan += f"{responses_by_agent['FlightExpert']}\n\n"
            
            if "HotelExpert" in responses_by_agent:
                plan += " ACCOMMODATION:\n"
                plan += "â”€"*70 + "\n"
                plan += f"{responses_by_agent['HotelExpert']}\n\n"
            
            if "ActivitiesExpert" in responses_by_agent:
                plan += "ðŸŽ­ ACTIVITIES & EXPERIENCES:\n"
                plan += "â”€"*70 + "\n"
                plan += f"{responses_by_agent['ActivitiesExpert']}\n\n"
            
            plan += "="*70 + "\n"
            plan += " All recommendations are budget-friendly as requested!\n"
            plan += "="*70
            
            await ctx.yield_output(plan)
    
    # Create agents
    flight_agent, hotel_agent, activities_agent = await create_travel_agents()
    
    # Wrap agents in AgentExecutor
    flight_executor = AgentExecutor(flight_agent, id="FlightExpert")
    hotel_executor = AgentExecutor(hotel_agent, id="HotelExpert")
    activities_executor = AgentExecutor(activities_agent, id="ActivitiesExpert")
    
    # Create custom executors
    budget_validator = BudgetValidator()
    aggregator = TravelPlanAggregator()
    
    # Build workflow
    workflow = (
        WorkflowBuilder()
        .set_start_executor(budget_validator)
        .add_fan_out_edges(
            budget_validator,
            [flight_executor, hotel_executor, activities_executor]
        )
        .add_fan_in_edges(
            [flight_executor, hotel_executor, activities_executor],
            aggregator
        )
        .build()
    )
    
    request = "Plan a budget-friendly trip to Barcelona. Need cheap flights, affordable hotel, and free activities."
    print(f" USER REQUEST:")
    print(f"{'â”€'*70}")
    print(f"  {request}")
    print(f"{'â”€'*70}\n")
    
    print(" Executing workflow...\n")
    print(" EXECUTION FLOW:")
    print("="*70)
    
    # Track execution with clean output (no duplicates)
    validator_shown = False
    step2_shown = False
    aggregator_shown = False
    agents_completed = set()
    final_plan = None
    
    async for event in workflow.run_stream(request):
        if isinstance(event, ExecutorInvokedEvent):
            if event.executor_id == "budget_validator" and not validator_shown:
                print(" Step 1: Budget Validator")
                print("   â†’ Analyzing request for budget keywords...")
                validator_shown = True
            elif event.executor_id in ["FlightExpert", "HotelExpert", "ActivitiesExpert"]:
                if not step2_shown:
                    print("\n Step 2: Fan-out to Specialist Agents (Parallel)")
                    step2_shown = True
            elif event.executor_id == "travel_plan_aggregator" and not aggregator_shown:
                print("\n Step 3: Aggregating Results")
                print("   â†’ Collecting responses from all agents...")
                aggregator_shown = True
        
        elif isinstance(event, AgentRunEvent):
            if event.executor_id not in agents_completed:
                agents_completed.add(event.executor_id)
                print(f"   âœ“ {event.executor_id}")
        
        elif isinstance(event, WorkflowOutputEvent):
            final_plan = event.data
    
    # Display the final plan (only once)
    if final_plan:
        print(f"\n{final_plan}\n")
    else:
        print("\n No final plan generated\n")
    
    print("="*70)
    print(" WORKFLOW COMPLETE!")
    print("="*70)
    print("\n What Happened:")
    print("  1âƒ£  BudgetValidator detected budget keywords âœ“")
    print("  2âƒ£  Enhanced request with budget context âœ“")
    print("  3âƒ£  Fanned out to 3 agents IN PARALLEL âœ“")
    print("  4âƒ£  Each agent provided specialized advice âœ“")
    print("  5âƒ£  Aggregator collected and formatted ALL responses âœ“")
    print("\n Key Benefits of WorkflowBuilder:")
    print("   Custom business logic (budget validation)")
    print("   Parallel execution (fan-out pattern)")
    print("   Result aggregation (fan-in pattern)")
    print("   Full control over workflow structure")
    print("   Deterministic and debuggable execution")

await custom_workflow_demo()

CUSTOM WORKFLOW: Budget Validation + Parallel Agents + Aggregation

Workflow Graph:
  User â†’ BudgetValidator â†’ Fan-out â”¬â†’ FlightExpert
                                    â”œâ†’ HotelExpert    â†’ Aggregator
                                    â””â†’ ActivitiesExpert

 USER REQUEST:
â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€
  Plan a budget-friendly trip to Barcelona. Need cheap flights, affordable hotel, and free activities.
â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€

 Executing workflow...

 EXECUTION FLOW:
    BudgetValidator: Analyzing request...
    Budget considerations detected
 Step 1: Budget Validator
   â†’ Analyzing request for budget ke

## Step 5: AI-Powered Orchestration - MagenticBuilder

**MagenticBuilder** uses an AI orchestrator to dynamically create and manage workflows:

### How It Works

1. **You provide**: A set of available agents
2. **AI orchestrator**: 
   - Analyzes the user's request
   - Creates an execution plan
   - Selects which agents to call
   - Determines the order of execution
   - Adapts the plan based on results

### When to Use MagenticBuilder

 **Use when:**
- Task requirements are complex and unpredictable
- Optimal workflow isn't known ahead of time
- Need adaptive planning based on intermediate results
- Task involves open-ended problem-solving

 **Don't use when:**
- Workflow is well-defined and predictable
- Need guaranteed execution order
- Performance is critical (AI planning adds overhead)
- Simple sequential or parallel patterns suffice

### Architecture

```
User Request
     â†“
AI Orchestrator (LLM)
     â†“
Creates Dynamic Plan
     â†“
Executes Agents as Needed
     â†“
Adapts Plan Based on Results
```

In [23]:
async def magentic_workflow_demo():
    """
    Demonstrate MagenticBuilder: AI-powered dynamic orchestration.
    
    The AI orchestrator will:
    1. Analyze the request
    2. Create an execution plan
    3. Decide which agents to call and in what order
    4. Adapt based on intermediate results
    """
    print("="*70)
    print("MAGENTIC WORKFLOW: AI-Powered Dynamic Orchestration")
    print("="*70)
    print("\nðŸ§  How AI Orchestration Works:")
    print("  1. AI Orchestrator analyzes the request")
    print("  2. Creates dynamic execution plan")
    print("  3. Selects and coordinates agents intelligently")
    print("  4. Adapts plan based on intermediate results")
    print("  5. Decides WHEN and IN WHAT ORDER to call agents\n")
    
    # Create agents
    flight_agent, hotel_agent, activities_agent = await create_travel_agents()
    
    # Build magentic workflow - AI orchestrator coordinates everything
    workflow = (
        MagenticBuilder()
        .participants(
            flight_expert=flight_agent,
            hotel_expert=hotel_agent, 
            activities_expert=activities_agent
        )
        .with_standard_manager(
            chat_client=AzureAIAgentClient(async_credential=AzureCliCredential()),
            max_round_count=10,
            max_stall_count=3,
            max_reset_count=2
        )
        .build()
    )
    
    # The orchestrator will decide which agents to call and in what order
    request = (
        "I need to plan a romantic anniversary trip to Venice for 4 days. "
        "We want a nice hotel near San Marco, romantic restaurants, and gondola rides. "
        "What should we do?"
    )
    
    print(f" USER REQUEST:")
    print(f"{'â”€'*70}")
    print(f"  {request}")
    print(f"{'â”€'*70}\n")
    
    print(" Starting AI Orchestration...\n")
    print(" EXECUTION FLOW:")
    print("="*70)
    
    # Track what happens
    executor_invoked = []
    agent_runs = []
    final_output = None
    
    # Run workflow and track events
    async for event in workflow.run_stream(request):
        if isinstance(event, ExecutorInvokedEvent):
            executor_invoked.append(event.executor_id)
            # Show orchestrator activity
            if "orchestrator" in event.executor_id.lower():
                print("   ðŸ§  AI Orchestrator: Creating execution plan...")
            else:
                agent_name = event.executor_id.replace('agent_', '').replace('_expert', ' Expert')
                print(f"   ðŸ¤– Delegated to: {agent_name}")
        
        elif isinstance(event, AgentRunEvent):
            if event.data and event.executor_id:
                agent_runs.append(event.executor_id)
                agent_name = event.executor_id.replace('agent_', '').replace('_expert', ' Expert')
                print(f"   âœ“ {agent_name} completed")
        
        elif isinstance(event, WorkflowOutputEvent):
            final_output = event.data
    
    # Display what happened
    print("\n" + "="*70)
    print(" ORCHESTRATION SUMMARY")
    print("="*70 + "\n")
    
    print(f"ðŸ§  Orchestration Details:")
    print(f"   â€¢ Executors invoked: {len(executor_invoked)}")
    print(f"   â€¢ Agents delegated to: {len([e for e in executor_invoked if 'expert' in e.lower()])}")
    print(f"   â€¢ Orchestrator planning rounds: {len([e for e in executor_invoked if 'orchestrator' in e.lower()])}")
    
    agents_called = [e for e in executor_invoked if 'expert' in e.lower()]
    if agents_called:
        print(f"\nðŸ¤– Agents Called (in order):")
        for agent_id in agents_called:
            clean_name = agent_id.replace('agent_', '').replace('_expert', ' Expert')
            print(f"   â€¢ {clean_name}")
    
    # Display final result
    if final_output:
        print("\n" + "="*70)
        print(" FINAL ORCHESTRATED RESULT")
        print("="*70 + "\n")
        
        # Extract text from ChatMessage if that's what we got
        result_text = None
        if hasattr(final_output, 'text'):
            result_text = final_output.text
        elif isinstance(final_output, str):
            result_text = final_output
        elif isinstance(final_output, list):
            # It might be a list of messages
            texts = []
            for item in final_output:
                if hasattr(item, 'text'):
                    texts.append(item.text)
                elif isinstance(item, str):
                    texts.append(item)
            result_text = " ".join(texts) if texts else None
        
        if result_text:
            # Wrap text nicely
            words = result_text.split()
            line = ""
            for word in words:
                if len(line + word) > 66:
                    print(f"  {line}")
                    line = word + " "
                else:
                    line += word + " "
            if line:
                print(f"  {line.strip()}")
        else:
            print(f"  Type: {type(final_output)}")
            print(f"  Data: {final_output}")
        print()
    else:
        print("\n No final output captured\n")
    
    print("="*70)
    print(" MAGENTIC WORKFLOW COMPLETE!")
    print("="*70)
    print("\n What Just Happened:")
    print(f"  1âƒ£  AI Orchestrator analyzed the romantic Venice request")
    print(f"  2âƒ£  Dynamically selected {len(agents_called)} agent(s) to call")
    print(f"  3âƒ£  Created {len([e for e in executor_invoked if 'orchestrator' in e.lower()])} planning round(s)")
    print(f"  4âƒ£  Synthesized final travel plan")
    print(f"  5âƒ£  You didn't have to code ANY of the coordination logic!")
    print("\n Key Benefits of MagenticBuilder:")
    print("   AI creates optimal execution plan dynamically")
    print("   Adapts to request complexity automatically")
    print("   No hardcoded workflow logic needed")
    print("   Scales easily - just add more specialized agents")
    print("   Handles unpredictable, complex requests gracefully")

await magentic_workflow_demo()

MAGENTIC WORKFLOW: AI-Powered Dynamic Orchestration

ðŸ§  How AI Orchestration Works:
  1. AI Orchestrator analyzes the request
  2. Creates dynamic execution plan
  3. Selects and coordinates agents intelligently
  4. Adapts plan based on intermediate results
  5. Decides WHEN and IN WHAT ORDER to call agents

 USER REQUEST:
â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€
  I need to plan a romantic anniversary trip to Venice for 4 days. We want a nice hotel near San Marco, romantic restaurants, and gondola rides. What should we do?
â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€

 Starting AI Orchestration...

 EXECUTION FLOW:
   ðŸ§  AI Orchestrator: Creating exe

##  Advanced Pattern Comparison

### WorkflowBuilder vs MagenticBuilder

| Aspect | WorkflowBuilder | MagenticBuilder |
|--------|----------------|----------------|
| **Control** | Full manual control | AI-driven |
| **Predictability** | Deterministic | Adaptive |
| **Setup Complexity** | Higher (define graph) | Lower (name agents) |
| **Runtime Overhead** | Low | Higher (AI planning) |
| **Best For** | Known workflows | Unknown/complex tasks |
| **Debugging** | Easier (explicit graph) | Harder (AI decisions) |
| **Scalability** | Manual updates needed | Auto-adapts to new agents |

### Real-World Use Cases

**WorkflowBuilder Examples:**
```python
# Approval Workflow
workflow = (
    WorkflowBuilder()
    .set_start_executor(request_handler)
    .add_edge(request_handler, manager_approval)
    .add_switch_case_edge_group(
        manager_approval,
        [
            Case(lambda x: x.approved, finance_review),
            Default(rejection_handler)
        ]
    )
    .build()
)

# Multi-Stage Pipeline
workflow = (
    WorkflowBuilder()
    .add_chain([data_validator, processor, quality_check, publisher])
    .build()
)

# Fan-out/Fan-in Pattern
workflow = (
    WorkflowBuilder()
    .set_start_executor(splitter)
    .add_fan_out_edges(splitter, [worker1, worker2, worker3])
    .add_fan_in_edges([worker1, worker2, worker3], aggregator)
    .build()
)
```

**MagenticBuilder Examples:**
```python
# Research Assistant
workflow = (
    MagenticBuilder()
    .participants(
        researcher=web_researcher, 
        scholar=academic_searcher, 
        analyst=data_analyst, 
        writer=summarizer
    )
    .with_standard_manager(
        chat_client=AzureAIAgentClient(async_credential=AzureCliCredential()),
        max_round_count=10
    )
    .build()
)

# Customer Support
workflow = (
    MagenticBuilder()
    .participants(
        product=product_expert, 
        billing=billing_expert, 
        tech=tech_support, 
        escalation=escalation_handler
    )
    .with_standard_manager(
        chat_client=AzureAIAgentClient(async_credential=AzureCliCredential()),
        max_round_count=8
    )
    .build()
)
```

### Key Differences

**WorkflowBuilder:**
-  You define the entire execution graph
-  Predictable, testable, repeatable
-  Good for compliance, auditing
-  Requires upfront design
-  Rigid structure

**MagenticBuilder:**
-  AI orchestrator creates execution plan
-  Adapts to complex, varied requests
-  Easy to add new specialized agents
-  Less predictable
-  Harder to debug AI decisions

##  Comparing Both Workflows Side-by-Side

Let's run both workflows with the **same request** to see how they differ in execution and output.


In [25]:
async def compare_workflows():
    """
    Run both workflows with the SAME request to compare:
    - Execution patterns
    - Agent coordination
    - Output format
    - Performance characteristics
    """
    from agent_framework import (
        AgentExecutor,
        AgentExecutorResponse,
    )
    from typing import Never
    
    # Same request for both workflows
    test_request = "Plan a 3-day trip to Tokyo on a budget. Need flight tips, cheap hotels, and free activities."
    
    print("="*80)
    print("ðŸ”¬ WORKFLOW COMPARISON TEST")
    print("="*80)
    print(f"\n Test Request:")
    print(f"{'â”€'*80}")
    print(f"  {test_request}")
    print(f"{'â”€'*80}\n")
    
    # ============================================================================
    # WORKFLOW 1: WorkflowBuilder (Custom DAG)
    # ============================================================================
    print("\n" + "ðŸ”µ "*40)
    print("WORKFLOW 1: WorkflowBuilder (Custom DAG with Fan-Out/Fan-In)")
    print("ðŸ”µ "*40 + "\n")
    
    # Create aggregator for WorkflowBuilder
    class ComparisonAggregator(Executor):
        def __init__(self):
            super().__init__(id="comparison_aggregator")
        
        @handler
        async def aggregate(
            self, 
            results: list[AgentExecutorResponse], 
            ctx: WorkflowContext[Never, str]
        ) -> None:
            responses_by_agent = {}
            for result in results:
                agent_id = result.executor_id
                response_text = result.agent_run_response.text if result.agent_run_response else ""
                if response_text:
                    responses_by_agent[agent_id] = response_text
            
            plan = "\nðŸ“‹ WORKFLOWBUILDER OUTPUT:\n" + "â”€"*80 + "\n\n"
            
            if "FlightExpert" in responses_by_agent:
                plan += f"  {responses_by_agent['FlightExpert']}\n\n"
            if "HotelExpert" in responses_by_agent:
                plan += f" {responses_by_agent['HotelExpert']}\n\n"
            if "ActivitiesExpert" in responses_by_agent:
                plan += f"ðŸŽ­ {responses_by_agent['ActivitiesExpert']}\n\n"
            
            plan += "â”€"*80
            await ctx.yield_output(plan)
    
    # Create agents and executors
    flight_agent, hotel_agent, activities_agent = await create_travel_agents()
    flight_executor = AgentExecutor(flight_agent, id="FlightExpert")
    hotel_executor = AgentExecutor(hotel_agent, id="HotelExpert")
    activities_executor = AgentExecutor(activities_agent, id="ActivitiesExpert")
    budget_validator = BudgetValidator()
    aggregator = ComparisonAggregator()
    
    # Build workflow
    workflow1 = (
        WorkflowBuilder()
        .set_start_executor(budget_validator)
        .add_fan_out_edges(budget_validator, [flight_executor, hotel_executor, activities_executor])
        .add_fan_in_edges([flight_executor, hotel_executor, activities_executor], aggregator)
        .build()
    )
    
    # Track execution
    import time
    start_time = time.time()
    workflow1_output = None
    workflow1_agents_called = []
    workflow1_executors = []
    
    async for event in workflow1.run_stream(test_request):
        if isinstance(event, ExecutorInvokedEvent):
            workflow1_executors.append(event.executor_id)
            if event.executor_id == "budget_validator":
                print("   âš™  BudgetValidator: Analyzing request...")
            elif event.executor_id in ["FlightExpert", "HotelExpert", "ActivitiesExpert"]:
                if event.executor_id not in workflow1_agents_called:
                    workflow1_agents_called.append(event.executor_id)
                    print(f"   ðŸ¤– Executing: {event.executor_id}")
            elif event.executor_id == "comparison_aggregator":
                print("    Aggregating results...")
        
        elif isinstance(event, WorkflowOutputEvent):
            workflow1_output = event.data
    
    workflow1_duration = time.time() - start_time
    
    print(f"\n WorkflowBuilder completed in {workflow1_duration:.2f}s")
    print(f"   â€¢ Executors invoked: {len(workflow1_executors)}")
    print(f"   â€¢ Agents called: {len(workflow1_agents_called)} (parallel)")
    print(f"   â€¢ Execution pattern: Validator â†’ Fan-out â†’ Aggregator")
    
    # ============================================================================
    # WORKFLOW 2: MagenticBuilder (AI Orchestration)
    # ============================================================================
    print("\n" + "ðŸŸ¢ "*40)
    print("WORKFLOW 2: MagenticBuilder (AI-Powered Orchestration)")
    print("ðŸŸ¢ "*40 + "\n")
    
    # Create fresh agents for magentic workflow
    flight_agent2, hotel_agent2, activities_agent2 = await create_travel_agents()
    
    # Build magentic workflow
    workflow2 = (
        MagenticBuilder()
        .participants(
            flight_expert=flight_agent2,
            hotel_expert=hotel_agent2,
            activities_expert=activities_agent2
        )
        .with_standard_manager(
            chat_client=AzureAIAgentClient(async_credential=AzureCliCredential()),
            max_round_count=10,
            max_stall_count=3,
            max_reset_count=2
        )
        .build()
    )
    
    # Track execution
    start_time = time.time()
    workflow2_output = None
    workflow2_agents_called = []
    workflow2_executors = []
    orchestrator_rounds = 0
    
    async for event in workflow2.run_stream(test_request):
        if isinstance(event, ExecutorInvokedEvent):
            workflow2_executors.append(event.executor_id)
            if "orchestrator" in event.executor_id.lower():
                orchestrator_rounds += 1
                print(f"   ðŸ§  AI Orchestrator: Planning round {orchestrator_rounds}...")
            elif "expert" in event.executor_id.lower():
                if event.executor_id not in workflow2_agents_called:
                    workflow2_agents_called.append(event.executor_id)
                    agent_name = event.executor_id.replace('agent_', '').replace('_expert', ' Expert')
                    print(f"   ðŸ¤– AI delegated to: {agent_name}")
        
        elif isinstance(event, WorkflowOutputEvent):
            workflow2_output = event.data
    
    workflow2_duration = time.time() - start_time
    
    print(f"\n MagenticBuilder completed in {workflow2_duration:.2f}s")
    print(f"   â€¢ Executors invoked: {len(workflow2_executors)}")
    print(f"   â€¢ Orchestrator rounds: {orchestrator_rounds}")
    print(f"   â€¢ Agents called: {len(workflow2_agents_called)} (AI-selected)")
    print(f"   â€¢ Execution pattern: AI-determined dynamically")
    
    # ============================================================================
    # COMPARISON SUMMARY
    # ============================================================================
    print("\n" + "="*80)
    print(" COMPARISON SUMMARY")
    print("="*80 + "\n")
    
    # Execution Comparison Table
    print("  EXECUTION METRICS:")
    print("â”€"*80)
    print(f"{'Metric':<30} {'WorkflowBuilder':<25} {'MagenticBuilder':<25}")
    print("â”€"*80)
    print(f"{'Duration':<30} {f'{workflow1_duration:.2f}s':<25} {f'{workflow2_duration:.2f}s':<25}")
    print(f"{'Total Executors Invoked':<30} {len(workflow1_executors):<25} {len(workflow2_executors):<25}")
    print(f"{'Agents Called':<30} {len(workflow1_agents_called):<25} {len(workflow2_agents_called):<25}")
    print(f"{'Orchestration Rounds':<30} {'N/A (hardcoded)':<25} {orchestrator_rounds:<25}")
    print(f"{'Execution Pattern':<30} {'Deterministic':<25} {'AI-Adaptive':<25}")
    print("â”€"*80 + "\n")
    
    # Pattern Comparison
    print(" EXECUTION PATTERNS:")
    print("â”€"*80)
    print("WorkflowBuilder:")
    print("   1. BudgetValidator (custom logic)")
    print("   2. Fan-out to 3 agents IN PARALLEL")
    print("   3. Aggregator collects all responses")
    print("   4. Formatted output")
    print()
    print("MagenticBuilder:")
    print("   1. AI Orchestrator analyzes request")
    print("   2. AI creates execution plan")
    print("   3. AI delegates to agents as needed")
    print("   4. AI synthesizes final output")
    print("â”€"*80 + "\n")
    
    # Output Comparison
    print(" OUTPUT COMPARISON:")
    print("â”€"*80)
    
    if workflow1_output:
        print("\nðŸ”µ WORKFLOWBUILDER OUTPUT:")
        print(workflow1_output)
    else:
        print("\nðŸ”µ WORKFLOWBUILDER OUTPUT: (none captured)")
    
    print("\n" + "â”€"*80)
    
    if workflow2_output:
        print("\nðŸŸ¢ MAGENTICBUILDER OUTPUT:")
        # Extract text from ChatMessage
        result_text = None
        if hasattr(workflow2_output, 'text'):
            result_text = workflow2_output.text
        elif isinstance(workflow2_output, str):
            result_text = workflow2_output
        
        if result_text:
            print(result_text)
        else:
            print(f"(Type: {type(workflow2_output)})")
    else:
        print("\nðŸŸ¢ MAGENTICBUILDER OUTPUT: (none captured)")
    
    print("\n" + "="*80)
    
    # Key Differences
    print("\n KEY DIFFERENCES:")
    print("â”€"*80)
    print("WorkflowBuilder:")
    print("    You control exact execution flow")
    print("    Predictable, deterministic")
    print("    Custom business logic (BudgetValidator)")
    print("    Parallel execution (fan-out)")
    print("    Custom aggregation logic")
    print("    Requires manual workflow design")
    print()
    print("MagenticBuilder:")
    print("    AI decides execution flow")
    print("    Adaptive to request complexity")
    print("    Zero workflow coding needed")
    print("    Automatically synthesizes output")
    print("    Scales with new agents")
    print("    Less predictable behavior")
    print("    Higher overhead (AI planning)")
    print("â”€"*80)
    
    # When to Use Each
    print("\n WHEN TO USE EACH:")
    print("â”€"*80)
    print("Use WorkflowBuilder when:")
    print("   â€¢ You know the exact workflow needed")
    print("   â€¢ Need deterministic, repeatable execution")
    print("   â€¢ Have complex business logic (validation, routing)")
    print("   â€¢ Performance is critical")
    print("   â€¢ Need audit trail of exact steps")
    print()
    print("Use MagenticBuilder when:")
    print("   â€¢ Request complexity varies widely")
    print("   â€¢ Optimal workflow isn't known ahead of time")
    print("   â€¢ Want AI to adapt to user needs")
    print("   â€¢ Frequently adding new specialized agents")
    print("   â€¢ Flexibility > predictability")
    print("="*80)

await compare_workflows()


[2025-10-02 16:47:20 - /opt/homebrew/Cellar/python@3.12/3.12.11_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/base_events.py:1833 - ERROR] Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x110a3b2c0>


ðŸ”¬ WORKFLOW COMPARISON TEST

 Test Request:
â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€
  Plan a 3-day trip to Tokyo on a budget. Need flight tips, cheap hotels, and free activities.
â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€


ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ 
WORKFLOW 1: WorkflowBuilder (Custom DAG with Fan-Out/Fan-In)
ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”µ ðŸ”

##  Key Takeaways

### What We Learned

1. **Custom Executors**
   - Inherit from `Executor` and call `super().__init__(id="...")`
   - Use `@handler` with `WorkflowContext` type annotation
   - Implement business logic before passing to agents
   - Examples: Validation, transformation, routing

2. **WorkflowBuilder**
   - Full control over workflow graph structure
   - Supports fan-out, fan-in, conditional routing
   - Best for well-defined, complex workflows
   - Deterministic and debuggable

3. **MagenticBuilder**
   - AI orchestrator manages execution dynamically
   - Best for unpredictable, adaptive workflows
   - Scales easily to many specialized agents
   - Higher overhead but more flexible

4. **Choosing the Right Pattern**
   - **Known workflow** â†’ WorkflowBuilder
   - **Unknown/adaptive** â†’ MagenticBuilder
   - **Simple sequential** â†’ SequentialBuilder (Tutorial 06)
   - **Simple parallel** â†’ ConcurrentBuilder (Tutorial 06)

### Production Patterns

```python
# Custom executor template
class MyExecutor(Executor):
    def __init__(self):
        super().__init__(id="my_executor")
    
    @handler
    async def process(self, input: str, ctx: WorkflowContext[str]):
        # Your business logic here
        result = do_something(input)
        await ctx.send_message(result)

# WorkflowBuilder with validation gate
workflow = (
    WorkflowBuilder()
    .set_start_executor(validator)
    .add_fan_out_edges(validator, [agent1, agent2, agent3])
    .build()
)

# MagenticBuilder for adaptive tasks
workflow = (
    MagenticBuilder()
    .agents([specialist1, specialist2, specialist3])
    .build()
)
```

##  Practice Exercises

### Exercise 1: Multi-Stage Approval Workflow

Create a workflow with conditional routing:
```
Request â†’ Validator â†’ (if valid) â†’ Manager Approval
                                  â”œâ”€â†’ (if approved) â†’ Finance
                                  â””â”€â†’ (if rejected) â†’ Rejection Handler
```

**Hint:** Use `add_switch_case_edge_group()` for conditional routing.

### Exercise 2: Data Processing Pipeline

Create a fan-out/fan-in pattern:
```
Data Splitter â†’ Worker 1 â†˜
             â†’ Worker 2  â†’ Aggregator â†’ Final Processor
             â†’ Worker 3 â†—
```

**Hint:** Use `add_fan_out_edges()` and `add_fan_in_edges()`.

### Exercise 3: Adaptive Research System

Create a magentic workflow with:
- Web Researcher
- Academic Searcher  
- Data Analyst
- Fact Checker
- Summarizer

Let the AI orchestrator decide how to coordinate them!

In [None]:
# Exercise playground - implement your solutions here!

async def approval_workflow_exercise():
    """
    Exercise 1: Build a multi-stage approval workflow.
    """
    # TODO: Implement approval workflow with conditional routing
    pass

async def pipeline_exercise():
    """
    Exercise 2: Build a fan-out/fan-in data processing pipeline.
    """
    # TODO: Implement data processing pipeline
    pass

async def research_system_exercise():
    """
    Exercise 3: Build an adaptive research system.
    """
    # TODO: Implement magentic research workflow
    pass

print(" Exercise templates ready - implement your solutions above!")

##  What's Next?

Congratulations! You've mastered advanced multi-agent workflow patterns!

You now know how to:
-  Build custom executors with business logic
-  Create complex workflow graphs with WorkflowBuilder
-  Implement fan-out/fan-in patterns
-  Use AI-powered orchestration with MagenticBuilder
-  Choose the right pattern for your use case

**In Tutorial 08: Human-in-the-Loop & Approvals**, you'll learn:
- Pause workflows for human approval
- Implement interactive approval gates
- Handle human feedback in agent systems
- Build checkpointed workflows that can be resumed

---

### Quick Reference

**Custom Executor:**
```python
class MyExecutor(Executor):
    def __init__(self):
        super().__init__(id="my_id")
    
    @handler
    async def process(self, msg: str, ctx: WorkflowContext[str]):
        result = transform(msg)
        await ctx.send_message(result)
```

**WorkflowBuilder:**
```python
workflow = (
    WorkflowBuilder()
    .set_start_executor(start)
    .add_fan_out_edges(start, [a, b, c])
    .add_edge(a, end)
    .build()
)
```

**MagenticBuilder:**
```python
workflow = (
    MagenticBuilder()
    .agents([agent1, agent2, agent3])
    .build()
)
```

**Run Workflows:**
```python
# Streaming
async for event in workflow.run_stream(input):
    if isinstance(event, AgentRunEvent):
        print(event.data)

# Non-streaming
result = await workflow.run(input)
outputs = result.get_outputs()
```