# Microsoft Agent Framework

![Microsoft Agent Framework](images/maf.png)

The **Microsoft Agent Framework** is a Python SDK for building AI agents and multi-agent workflows. It provides a unified interface for creating intelligent systems that can reason, take actions, and collaborate.

**Key Capabilities:**
- Single and multi-agent orchestration
- Tool integration and function calling
- Memory and context management
- Workflow patterns (sequential, parallel, branching)
- Built-in observability and middleware

---

## What is an Agent?

![What is an Agent](images/what-is-agent.png)

Unlike traditional LLM deployments that simply respond to prompts, agents follow the **ReAct pattern** (Reasoning + Acting):

| Traditional LLM | Agent (ReAct) |
|-----------------|---------------|
| Input ‚Üí Output | Input ‚Üí Reason ‚Üí Act ‚Üí Observe ‚Üí Repeat |
| Single response | Multi-step execution |
| No tool access | Tool integration |
| Stateless | Memory & context |

Agents autonomously decide *what* to do, *which* tools to use, and *when* to stop.

---

## Workflows & Multi-Agent Orchestration

![Workflow Example](images/workflow-example.png)

Complex tasks require coordination between multiple specialized agents. The Agent Framework provides workflow primitives:

- **Sequential** ‚Äî Agents execute in order (A ‚Üí B ‚Üí C)
- **Parallel (Fan-out/Fan-in)** ‚Äî Concurrent execution with result aggregation
- **Branching** ‚Äî Conditional routing based on outputs
- **Group Chat** ‚Äî Collaborative multi-agent discussions

---

## Demo Overview

We'll build a **Support Email Copilot** that demonstrates core framework concepts:

| Section | Concept |
|---------|---------|
| 1-2 | Agent basics & streaming |
| 3-4 | Conversations & function tools |
| 5-7 | Approvals, middleware, memory |
| 8-10 | Workflows: sequential, branching, parallel |
| 11-12 | Multi-agent collaboration & capstone |

---

## Prerequisites

- Azure subscription with Azure OpenAI access
- Azure OpenAI resource with deployed model (e.g., `gpt-4o-mini`)
- Azure CLI installed and authenticated (`az login`)
- Python 3.10+

# 0. Environment Setup

## Create Virtual Environment

Run the following in your terminal to set up the environment:

```bash
python3.10 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```

Or run the cell below to install dependencies directly.

In [1]:
# Create and configure the virtual environment (run once)
import subprocess
import shutil

def find_python():
    """Find a Python 3.10+ interpreter on the system."""
    # Check common Python commands in order of preference
    candidates = [
        "python3.13", "python3.12", "python3.11", "python3.10",
        "python3", "python"
    ]
    
    for cmd in candidates:
        path = shutil.which(cmd)
        if path:
            # Verify version is 3.10+
            try:
                result = subprocess.run(
                    [path, "-c", "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')"],
                    capture_output=True, text=True
                )
                version = result.stdout.strip()
                major, minor = map(int, version.split('.'))
                if major >= 3 and minor >= 10:
                    return path, version
            except:
                continue
    
    raise RuntimeError("No Python 3.10+ found. Please install Python 3.10 or higher.")

# Find suitable Python
python_path, python_version = find_python()
print(f"‚úÖ Found Python {python_version}: {python_path}")

# Create .venv
subprocess.run([python_path, "-m", "venv", ".venv"])

# Install requirements with pre-release flag
subprocess.run([".venv/bin/pip", "install", "-r", "requirements.txt", "--pre"])

print("\n‚úÖ Virtual environment created at .venv")
print("   Activate with: source .venv/bin/activate")

‚úÖ Found Python 3.12: /Users/glswht/Desktop/magentic-workflow/SDK-Magentic-Workflow/.venv/bin/python3.12

‚úÖ Virtual environment created at .venv
   Activate with: source .venv/bin/activate



[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m26.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


## Initialize Chat Client

Load environment variables and create the Azure OpenAI client.

In [2]:
import nest_asyncio
nest_asyncio.apply()

import asyncio
from dotenv import load_dotenv
from azure.identity import AzureCliCredential
from agent_framework.azure import AzureOpenAIChatClient

# Load environment variables
load_dotenv()

# Create ONE chat client - reused throughout the notebook
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

print("‚úÖ Environment loaded and chat_client created")

‚úÖ Environment loaded and chat_client created


## Data Models

Pydantic models for structured input/output throughout the demo.

In [3]:
from typing import Literal, Annotated
from pydantic import BaseModel, Field

# === Input Model ===
class EmailInput(BaseModel):
    """Incoming support email."""
    sender: str = Field(description="Email sender address")
    subject: str = Field(description="Email subject line")
    body: str = Field(description="Email body content")
    customer_id: str | None = Field(default=None, description="Customer ID if known")
    ticket_id: str | None = Field(default=None, description="Related ticket ID if any")

# === Classification Model ===
class ClassificationResult(BaseModel):
    """Result of email classification."""
    category: Literal["spam", "not_spam", "uncertain"] = Field(description="Email category")
    confidence: float = Field(ge=0.0, le=1.0, description="Confidence score 0-1")
    reason: str = Field(description="Brief explanation of classification")

# === Draft Response Model ===
class DraftResponse(BaseModel):
    """Draft reply to customer email."""
    subject: str = Field(description="Reply subject line")
    body: str = Field(description="Reply body")
    tone: Literal["formal", "friendly", "apologetic"] = Field(description="Tone used")
    needs_review: bool = Field(default=False, description="Flag if needs human review")

# === Final Response Model ===
class FinalResponse(BaseModel):
    """Final approved response."""
    classification: ClassificationResult
    draft: DraftResponse | None = Field(default=None, description="Draft if not spam")
    review_notes: str | None = Field(default=None, description="Reviewer comments")
    approved: bool = Field(default=False, description="Whether approved to send")

print("‚úÖ Shared models defined: EmailInput, ClassificationResult, DraftResponse, FinalResponse")

‚úÖ Shared models defined: EmailInput, ClassificationResult, DraftResponse, FinalResponse


## Sample Data

Test emails used throughout the demo.

In [4]:
# === LEGITIMATE EMAIL ===
LEGIT_EMAIL = EmailInput(
    sender="sarah.chen@acmecorp.com",
    subject="Order #12345 - Delivery Issue",
    body="""Hi Support Team,

I placed order #12345 last week and the tracking shows it was delivered, 
but I never received the package. I've checked with my neighbors and the building 
concierge, but no one has seen it.

This is urgent as the items were needed for a client presentation on Friday.
Can you please help me locate the package or arrange a replacement?

Thank you,
Sarah Chen
Account: ACME-7891
""",
    customer_id="CUST-7891",
    ticket_id="TKT-2024-001"
)

# === SPAM EMAIL ===
SPAM_EMAIL = EmailInput(
    sender="winner@prize-notifications.biz",
    subject="üéâ CONGRATULATIONS! You've WON $1,000,000!!!",
    body="""URGENT NOTIFICATION!!!

You have been selected as the WINNER of our international lottery!
To claim your $1,000,000 prize, simply send your bank details and 
a processing fee of $500 to unlock your winnings.

ACT NOW - This offer expires in 24 HOURS!!!

Click here to claim: http://totally-legit-prize.com/claim
""",
    customer_id=None,
    ticket_id=None
)

# === AMBIGUOUS EMAIL ===
AMBIGUOUS_EMAIL = EmailInput(
    sender="j.smith@unknown-domain.net",
    subject="Partnership Opportunity",
    body="""Hello,

I found your company online and I'm interested in discussing a potential 
business partnership. We have a new product line that might complement your services.

Can we schedule a call this week?

Best,
J. Smith
""",
    customer_id=None,
    ticket_id=None
)

print("‚úÖ Sample emails defined: LEGIT_EMAIL, SPAM_EMAIL, AMBIGUOUS_EMAIL")

‚úÖ Sample emails defined: LEGIT_EMAIL, SPAM_EMAIL, AMBIGUOUS_EMAIL


# 1. Basic Agent

![Agent Components](images/agent-components.png)

Create a support agent using `chat_client.as_agent()` with instructions.

In [5]:
# Create the core Support Agent - we'll enhance this throughout the notebook
support_agent = chat_client.as_agent(
    name="SupportAgent",
    instructions="""You are a helpful customer support agent for an e-commerce company.
Your job is to:
1. Understand customer issues from their emails
2. Draft professional, empathetic responses
3. Provide clear next steps when possible

Always be polite, acknowledge the customer's frustration, and offer concrete solutions."""
)

print("‚úÖ support_agent created")

‚úÖ support_agent created


## Run the Agent

Execute the agent with `agent.run()`. Returns an `AgentResponse` with `.text` output.

In [6]:
# Run the support agent on our legitimate email
async def run_basic_agent():
    prompt = f"""Please draft a response to this customer email:

From: {LEGIT_EMAIL.sender}
Subject: {LEGIT_EMAIL.subject}

{LEGIT_EMAIL.body}
"""
    result = await support_agent.run(prompt)
    print("üìß Draft Response:\n")
    print(result.text)

asyncio.run(run_basic_agent())

üìß Draft Response:

Subject: Re: Order #12345 - Delivery Issue

Dear Sarah,

Thank you for reaching out to us about your delivery issue. I understand how frustrating it must be to see that your order was marked as delivered but not receive it, especially with a client presentation coming up on Friday. 

I apologize for the inconvenience this has caused you. To assist in resolving this matter, I will start a trace with our carrier to locate your package. Additionally, I recommend checking with the shipping carrier‚Äôs customer service as they might have more specific information about the delivery.

In parallel, I can arrange for a replacement shipment to ensure you have your items in time for your presentation. Please confirm if you would like me to proceed with this option, and I will prioritize it to ensure timely delivery.

Thank you for your patience while we work on this. I will keep you updated as soon as I receive any information.

Best regards,

[Your Name]  
Customer Support

# 2. Streaming Responses

Stream responses token-by-token using `agent.run_stream()` for real-time output.

In [7]:
### Stream the response token by token using the SAME support_agent
async def stream_support_response():
    prompt = f"""Please draft a response to this customer email:

From: {LEGIT_EMAIL.sender}
Subject: {LEGIT_EMAIL.subject}

{LEGIT_EMAIL.body}
"""
    print("üìß Streaming Draft Response:\n")
    async for update in support_agent.run_stream(prompt):
        if update.text:
            print(update.text, end="", flush=True)
    print()  # New line after streaming

asyncio.run(stream_support_response())

üìß Streaming Draft Response:

Subject: Re: Order #12345 - Delivery Issue

Dear Sarah,

Thank you for reaching out to us regarding your order #12345. I sincerely apologize for the inconvenience you are experiencing with your delivery. I understand how important it is to have your items for your upcoming client presentation, and I'm here to assist you with this issue.

To help us locate your package, I will initiate an investigation with our shipping carrier to gather more information about the delivery. This process typically takes 24-48 hours. In the meantime, I recommend checking any areas where the package might have been left, such as different entrances or reception areas, if applicable.

If the investigation confirms that the package cannot be located, we can arrange for a replacement to be sent out to you as soon as possible. I will keep you updated on the progress and will prioritize this matter so that you can receive your items in time for your presentation.

Thank you for y

# 3. Multi-Turn Conversations

![Threads and Memory](images/threads-and-memory.png)

Agents are stateless by default. Use **Threads** to maintain conversation context across turns.

## Using Threads

Create a thread with `agent.get_new_thread()` and pass it to each call.

In [8]:
# Create a thread for multi-turn conversation
thread = support_agent.get_new_thread()

# Turn 1: Summarize the customer issue
print("Turn 1: Summarize the issue")
print("-" * 50)
result1 = await support_agent.run(
    f"Summarize the key issues in this email in 2-3 bullet points:\n\n{LEGIT_EMAIL.body}", 
    thread=thread
)
print(result1.text)
print()

# Turn 2: Draft a response (agent remembers the summary from Turn 1)
print("Turn 2: Draft response with professional tone")
print("-" * 50)
result2 = await support_agent.run(
    "Now draft a professional response addressing each of those issues. Use a formal but empathetic tone.",
    thread=thread
)
print(result2.text)

Turn 1: Summarize the issue
--------------------------------------------------
- The tracking for order #12345 indicates it was delivered, but Sarah has not received the package.
- She has already checked with her neighbors and the building concierge with no luck finding the package.
- Sarah urgently needs assistance to locate the package or arrange for a replacement due to an upcoming client presentation on Friday.

Turn 2: Draft response with professional tone
--------------------------------------------------
Subject: Assistance with Your Order #12345

Dear Sarah,

Thank you for reaching out to us regarding your recent order. I sincerely apologize for the inconvenience and frustration you are experiencing with the delivery of your package. I understand how important it is, especially with your client presentation approaching.

Regarding the tracking information indicating delivery, it can sometimes lead to misunderstandings. I appreciate you taking the time to check with your neighb

# 4. Function Tools

Extend agent capabilities by registering Python functions as tools.

## Define Tools

Use the `@tool` decorator to expose functions to the agent.

In [9]:
from agent_framework import tool
# Simulated database of customer SLAs
CUSTOMER_SLAS = {
    "CUST-7891": {"tier": "Premium", "response_time": "4 hours", "replacement_policy": "Free expedited replacement"},
    "CUST-1234": {"tier": "Standard", "response_time": "24 hours", "replacement_policy": "Standard replacement"},
}

# Simulated ticket database
TICKET_STATUSES = {
    "TKT-2024-001": {"status": "Open", "priority": "High", "assigned_to": "Support Team", "last_update": "2024-01-15"},
    "TKT-2024-002": {"status": "Resolved", "priority": "Low", "assigned_to": "Bot", "last_update": "2024-01-10"},
}

@tool(name="lookup_customer_sla", description="Look up a customer's SLA tier and policies")
def lookup_customer_sla(
    customer_id: Annotated[str, Field(description="The customer ID to look up (e.g., CUST-7891)")]
) -> str:
    """Look up customer SLA information."""
    if customer_id in CUSTOMER_SLAS:
        sla = CUSTOMER_SLAS[customer_id]
        return f"Customer {customer_id}: {sla['tier']} tier, {sla['response_time']} response time, {sla['replacement_policy']}"
    return f"Customer {customer_id} not found in system."

@tool(name="get_incident_status", description="Get the current status of a support ticket")
def get_incident_status(
    ticket_id: Annotated[str, Field(description="The ticket ID to check (e.g., TKT-2024-001)")]
) -> str:
    """Get ticket status information."""
    if ticket_id in TICKET_STATUSES:
        ticket = TICKET_STATUSES[ticket_id]
        return f"Ticket {ticket_id}: Status={ticket['status']}, Priority={ticket['priority']}, Assigned to={ticket['assigned_to']}, Last update={ticket['last_update']}"
    return f"Ticket {ticket_id} not found in system."

print("‚úÖ Support tools defined: lookup_customer_sla, get_incident_status")

‚úÖ Support tools defined: lookup_customer_sla, get_incident_status


## Attach Tools to Agent

Pass tools when creating the agent.

In [10]:
# Create support agent with tools
support_agent_with_tools = chat_client.as_agent(
    name="SupportAgentWithTools",
    instructions="""You are a customer support agent with access to internal systems.
When handling emails:
1. Look up the customer's SLA tier to understand their service level
2. Check ticket status if a ticket ID is mentioned
3. Use this information to provide appropriate responses and set expectations

Always be empathetic and use the customer's SLA tier to guide your response (e.g., Premium customers get expedited service).""",
    tools=[lookup_customer_sla, get_incident_status]
)

print("‚úÖ support_agent_with_tools created")

‚úÖ support_agent_with_tools created


## Execute with Tools

The agent autonomously decides when to invoke tools.

In [11]:
# Test with the legitimate email that has customer_id and ticket_id
prompt = f"""Handle this customer support email. Look up their SLA and ticket status first:

From: {LEGIT_EMAIL.sender}
Subject: {LEGIT_EMAIL.subject}
Customer ID: {LEGIT_EMAIL.customer_id}
Ticket ID: {LEGIT_EMAIL.ticket_id}

{LEGIT_EMAIL.body}
"""

result = await support_agent_with_tools.run(prompt)
print("üìß Response (with tool lookups):\n")
print(result.text)

üìß Response (with tool lookups):

Subject: RE: Order #12345 - Delivery Issue

Hi Sarah,

Thank you for reaching out, and I understand how urgent this situation is, especially with your client presentation on Friday. 

I see that you are a Premium customer, which means you have a 4-hour response time for your inquiries and are eligible for a free expedited replacement. 

I‚Äôve checked the status of your ticket (TKT-2024-001), and it is currently open with high priority. Our support team is already aware of your issue, and I will ensure that we expedite the process to locate your package or arrange for a replacement.

I will follow up with you shortly with updates. In the meantime, if you have any other questions or need further assistance, please feel free to let me know.

Thank you for your patience!

Best,
[Your Name]  
Customer Support Team


# 5. Human-in-the-Loop Approval

Require human confirmation before executing sensitive actions.

## Approval-Required Tool

Set `approval_mode="always_require"` on sensitive tools.

In [12]:
from agent_framework import ChatMessage, Content, Role

# Tool that requires human approval before sending
@tool(approval_mode="always_require", name="send_email_reply", description="Send an email reply to the customer. Requires human approval.")
def send_email_reply(
    to: Annotated[str, Field(description="Recipient email address")],
    subject: Annotated[str, Field(description="Email subject")],
    body: Annotated[str, Field(description="Email body content")]
) -> str:
    """Send an email reply to the customer. Requires human approval."""
    # In production, this would actually send the email
    return f"‚úÖ Email sent to {to} with subject '{subject}'"

# Create agent with the approval-required tool
approval_agent = chat_client.as_agent(
    name="ApprovalSupportAgent",
    instructions="""You are a customer support agent. After drafting a response, 
use the send_email_reply tool to send it. This will require human approval.""",
    tools=[lookup_customer_sla, get_incident_status, send_email_reply]
)

print("‚úÖ approval_agent created with send_email_reply tool")

‚úÖ approval_agent created with send_email_reply tool


## Check for Pending Approvals

Approval-required calls return `user_input_requests` instead of executing.

In [13]:
# Ask the agent to handle and send a response
prompt = f"""Handle this email and IMMEDIATELY use the send_email_reply tool to send a response. 
Do not ask for permission - just use the tool directly.

From: {LEGIT_EMAIL.sender}
Subject: {LEGIT_EMAIL.subject}
Customer ID: {LEGIT_EMAIL.customer_id}

{LEGIT_EMAIL.body}
"""

result = await approval_agent.run(prompt)

# Check if approval is needed
if result.user_input_requests:
    print("üîí APPROVAL REQUIRED!")
    for user_input_needed in result.user_input_requests:
        print(f"  Function: {user_input_needed.function_call.name}")
        print(f"  Arguments: {user_input_needed.function_call.arguments}")
else:
    print("‚ö†Ô∏è No approval requested - agent didn't call the tool")
    print(result.text)

üîí APPROVAL REQUIRED!
  Function: send_email_reply
  Arguments: {"to":"sarah.chen@acmecorp.com","subject":"Re: Order #12345 - Delivery Issue","body":"Hi Sarah,\n\nThank you for reaching out regarding your order #12345. I understand how important it is to have your items for the client presentation on Friday.\n\nSince the tracking shows that the package has been marked as delivered, I recommend checking again with your neighbors and concierge. However, given the urgency of your situation, we can expedite a replacement for you at no additional charge.\n\nPlease confirm if you'd like me to proceed with arranging a replacement, and I'll get that sorted out right away.\n\nThank you for your patience, and I look forward to hearing back from you soon!\n\nBest regards,\n[Your Name]\nCustomer Support Team"}


## Grant Approval

Respond with `to_function_approval_response(True/False)`.

In [14]:
print("\n--- Handling Approval ---\n")

# Provide approval and continue the conversation
if result.user_input_requests:
    user_input_needed = result.user_input_requests[0]
    
    # Simulate human approval (in production, this would be interactive)
    user_approval = True
    print(f"‚úÖ Human approved: {user_approval}\n")
    
    # Create approval response message
    approval_message = ChatMessage(
        role=Role.USER,
        contents=[user_input_needed.to_function_approval_response(user_approval)]
    )
    
    # Continue with approval
    final_result = await approval_agent.run([
        prompt,
        ChatMessage(role=Role.ASSISTANT, contents=[user_input_needed]),
        approval_message
    ])
    print(f"üìä Final Result:\n{final_result.text}")
else:
    print("‚ùå No approval was requested in the previous cell.")
    print("   The agent needs to call the send_email_reply tool to trigger approval.")
    print("   Re-run the previous cell to try again.")


--- Handling Approval ---

‚úÖ Human approved: True

üìä Final Result:
I have successfully sent the response to Sarah Chen regarding her delivery issue for order #12345. If you need further assistance, feel free to ask!


# 6. Middleware

Intercept agent execution for logging, metrics, and observability.

## Define Middleware

Middleware wraps execution with `context` and `next` function.

In [15]:
from typing import Callable, Awaitable
from agent_framework import AgentRunContext, FunctionInvocationContext
import time

async def logging_agent_middleware(
    context: AgentRunContext,
    next: Callable[[AgentRunContext], Awaitable[None]],
) -> None:
    """Log agent execution with timing."""
    print(f"üöÄ Agent starting... ({len(context.messages)} message(s))")
    start_time = time.time()
    
    await next(context)  # Continue to agent execution
    
    elapsed = time.time() - start_time
    print(f"‚úÖ Agent finished in {elapsed:.2f}s")

async def logging_function_middleware(
    context: FunctionInvocationContext,
    next: Callable[[FunctionInvocationContext], Awaitable[None]],
) -> None:
    """Log function tool calls."""
    print(f"  üìû Calling: {context.function.name}({context.arguments})")
    
    await next(context)
    
    print(f"  üì§ Result: {context.result[:100]}..." if len(str(context.result)) > 100 else f"  üì§ Result: {context.result}")

print("‚úÖ Middleware defined: logging_agent_middleware, logging_function_middleware")

‚úÖ Middleware defined: logging_agent_middleware, logging_function_middleware


## Attach Middleware

Pass middleware list when creating the agent.

In [16]:
# Create agent with middleware for logging
middleware_agent = chat_client.as_agent(
    name="LoggingSupportAgent",
    instructions="You are a support agent. Look up customer information when handling requests.",
    tools=[lookup_customer_sla, get_incident_status],
    middleware=[logging_agent_middleware, logging_function_middleware]
)

# Test - you'll see logs for agent and function calls
prompt = f"Check the SLA for customer {LEGIT_EMAIL.customer_id} and ticket status for {LEGIT_EMAIL.ticket_id}"
result = await middleware_agent.run(prompt)
print(f"\nüí¨ Response: {result.text}")

üöÄ Agent starting... (1 message(s))
  üìû Calling: lookup_customer_sla(customer_id='CUST-7891')
  üì§ Result: Customer CUST-7891: Premium tier, 4 hours response time, Free expedited replacement
  üìû Calling: get_incident_status(ticket_id='TKT-2024-001')
  üì§ Result: Ticket TKT-2024-001: Status=Open, Priority=High, Assigned to=Support Team, Last update=2024-01-15
‚úÖ Agent finished in 3.17s

üí¨ Response: Here are the details you requested:

**SLA for Customer CUST-7891:**
- Tier: Premium
- Response Time: 4 hours
- Policy: Free expedited replacement

**Status of Ticket TKT-2024-001:**
- Status: Open
- Priority: High
- Assigned to: Support Team
- Last Update: January 15, 2024

If you have any further questions or need assistance, feel free to ask!


# 7. Agent Memory

Persist context across calls using a `ContextProvider`.

## Preferences Model

Define what to remember.

In [17]:
class SupportPreferences(BaseModel):
    """User preferences for support interactions."""
    name: str | None = None
    preferred_language: Literal["English", "Hebrew", "Spanish"] = "English"
    preferred_tone: Literal["formal", "friendly", "brief"] = "formal"

print("‚úÖ SupportPreferences model defined")

‚úÖ SupportPreferences model defined


## Implement ContextProvider

Two methods: `invoking` (inject context before calls) and `invoked` (extract state after calls).

In [18]:
from collections.abc import MutableSequence, Sequence
from typing import Any

from agent_framework import ContextProvider, Context, ChatAgent, ChatOptions


class SupportMemory(ContextProvider):
    """Memory that tracks user preferences for support interactions."""
    
    def __init__(self, chat_client, preferences: SupportPreferences | None = None, **kwargs: Any):
        """Create the memory.
        
        Args:
            chat_client: The chat client to use for extracting structured data
            preferences: Optional initial preferences
            **kwargs: Additional keyword arguments for deserialization
        """
        self._chat_client = chat_client
        if preferences:
            self.preferences = preferences
        elif kwargs:
            self.preferences = SupportPreferences.model_validate(kwargs)
        else:
            self.preferences = SupportPreferences()
    
    async def invoked(
        self,
        request_messages: ChatMessage | Sequence[ChatMessage],
        response_messages: ChatMessage | Sequence[ChatMessage] | None = None,
        invoke_exception: Exception | None = None,
        **kwargs: Any,
    ) -> None:
        """Extract preferences from user messages after each call."""
        # Ensure request_messages is a list
        messages_list = [request_messages] if isinstance(request_messages, ChatMessage) else list(request_messages)
        
        # Check if we have user messages
        user_messages = [msg for msg in messages_list if msg.role.value == "user"]
        
        if user_messages:
            try:
                # Use the chat client to extract structured information
                # NOTE: Use `options=` not `chat_options=`
                result = await self._chat_client.get_response(
                    messages=messages_list,
                    options=ChatOptions(
                        instructions=(
                            "Extract the user's name, preferred tone (formal/friendly/brief), "
                            "and preferred language (English/Hebrew/Spanish) from the messages if present. "
                            "If not present, return None for that field."
                        ),
                        response_format=SupportPreferences,
                    ),
                )
                
                # result.value should now be a SupportPreferences instance
                extracted = result.value
                
                # Update preferences with extracted data
                if extracted and isinstance(extracted, SupportPreferences):
                    if self.preferences.name is None and extracted.name:
                        self.preferences.name = extracted.name
                        print(f"   üß† Memory updated: name = {extracted.name}")
                    
                    if extracted.preferred_tone != "formal":  # formal is default
                        self.preferences.preferred_tone = extracted.preferred_tone
                        print(f"   üß† Memory updated: tone = {extracted.preferred_tone}")
                    
                    if extracted.preferred_language != "English":  # English is default
                        self.preferences.preferred_language = extracted.preferred_language
                        print(f"   üß† Memory updated: language = {extracted.preferred_language}")
                        
            except Exception as e:
                print(f"   ‚ö†Ô∏è Failed to extract preferences: {e}")
    
    async def invoking(self, messages: ChatMessage | MutableSequence[ChatMessage], **kwargs: Any) -> Context:
        """Provide preference context before each agent call."""
        instructions: list[str] = []
        
        if self.preferences.name:
            instructions.append(f"The user's name is {self.preferences.name}. Address them by name.")
        
        instructions.append(f"Respond in {self.preferences.preferred_language}.")
        instructions.append(f"Use a {self.preferences.preferred_tone} tone.")
        
        return Context(instructions=" ".join(instructions))
    
    def serialize(self) -> str:
        """Serialize for persistence."""
        return self.preferences.model_dump_json()

print("‚úÖ SupportMemory ContextProvider defined")

‚úÖ SupportMemory ContextProvider defined


## Test Memory

The agent automatically extracts and applies preferences across turns.

In [19]:
# Create the memory provider using the existing chat_client
support_memory = SupportMemory(chat_client)

# Create the agent with memory
memory_agent = ChatAgent(
    name="MemorySupportAgent",
    instructions="You are a friendly support agent. Adapt your responses based on user preferences.",
    chat_client=chat_client,
    context_provider=support_memory,
)

# Turn 1: User introduces themselves
print("Turn 1: User introduction")
print("-" * 50)
result1 = await memory_agent.run("Hi, my name is David")
print(f"Agent: {result1.text}\n")

# Turn 2: User sets preference
print("Turn 2: Setting preference")
print("-" * 50)
result2 = await memory_agent.run("Please keep responses brief and casual")
print(f"Agent: {result2.text}\n")

# Turn 3: Ask a question - memory should apply name and brief tone
print("Turn 3: Question with preferences applied")
print("-" * 50)
result3 = await memory_agent.run("What's your return policy?")
print(f"Agent: {result3.text}\n")

# Check memory state - access the original support_memory object directly
print("üß† Memory State (tracked by ContextProvider):")
print(f"   Name: {support_memory.preferences.name}")
print(f"   Language: {support_memory.preferences.preferred_language}")
print(f"   Tone: {support_memory.preferences.preferred_tone}")

Turn 1: User introduction
--------------------------------------------------
   üß† Memory updated: name = David
   üß† Memory updated: tone = friendly
Agent: Hello, David! How may I assist you today?

Turn 2: Setting preference
--------------------------------------------------
   üß† Memory updated: tone = brief
Agent: Got it, David! I'll keep it short and casual for you. What do you need help with?

Turn 3: Question with preferences applied
--------------------------------------------------
   üß† Memory updated: tone = brief
Agent: Hi David! Our return policy typically allows for returns within 30 days of purchase, provided the item is unused and in its original packaging. However, I recommend checking the specific details on our website or your receipt, as policies can vary by product. Let me know if you need more help!

üß† Memory State (tracked by ContextProvider):
   Name: David
   Language: English
   Tone: brief


# 8. Sequential Workflows

![Sequential Workflow](images/sequential-workflow.png)

Chain multiple agents/executors in sequence: Classify ‚Üí Draft ‚Üí Review.

**When to Use:**
- Tasks with clear, ordered steps (e.g., parse ‚Üí validate ‚Üí transform)
- When each step's output is the next step's input
- Processing pipelines where order matters

**When NOT to Use:**
- Steps can run independently (use Concurrent instead)
- Dynamic routing needed (use Branching instead)

## Core Concepts

| Concept | Description |
|---------|-------------|
| **Executor** | Unit of work (`@executor` or class with `@handler`) |
| **WorkflowBuilder** | Connects executors with `add_edge()` |
| `ctx.send_message()` | Pass data to next executor |
| `ctx.yield_output()` | Return final result |

## Define Executors

Create agent executors for classification, writing, and review.

In [20]:
from typing_extensions import Never
from agent_framework import (
    WorkflowBuilder, WorkflowContext, WorkflowOutputEvent,
    Executor, executor, handler, AgentExecutor, AgentExecutorRequest, AgentExecutorResponse
)

# === CLASSIFIER AGENT ===
classifier_agent = AgentExecutor(
    chat_client.as_agent(
        name="Classifier",
        instructions="""Classify incoming emails. Return JSON with:
- category: "spam", "not_spam", or "uncertain"
- confidence: float 0-1
- reason: brief explanation""",
        response_format=ClassificationResult,
    ),
    id="classifier",
)

# === DRAFT WRITER AGENT ===
writer_agent = AgentExecutor(
    chat_client.as_agent(
        name="DraftWriter",
        instructions="""Draft professional support responses. Return JSON with:
- subject: reply subject line
- body: reply body
- tone: "formal", "friendly", or "apologetic"
- needs_review: true if sensitive or complex""",
        response_format=DraftResponse,
    ),
    id="writer",
)

# === REVIEWER AGENT ===
reviewer_agent = AgentExecutor(
    chat_client.as_agent(
        name="Reviewer",
        instructions="""Review draft responses for quality. Check:
- Professionalism and tone
- Accuracy of information
- Completeness
Return approval decision with notes.""",
    ),
    id="reviewer",
)

print("‚úÖ Workflow agents defined: classifier, writer, reviewer")

‚úÖ Workflow agents defined: classifier, writer, reviewer


## Build & Run

Connect executors with `add_edge()` and execute.

In [21]:
# Build sequential workflow
sequential_support_workflow = (
    WorkflowBuilder()
    .set_start_executor(classifier_agent)
    .add_edge(classifier_agent, writer_agent)
    .add_edge(writer_agent, reviewer_agent)
    .build()
)

# Run with legitimate email
async def run_sequential_workflow():
    email_prompt = f"""Process this support email:

From: {LEGIT_EMAIL.sender}
Subject: {LEGIT_EMAIL.subject}
Customer ID: {LEGIT_EMAIL.customer_id}

{LEGIT_EMAIL.body}
"""
    
    print("üìß Processing email through workflow: Classify ‚Üí Draft ‚Üí Review\n")
    print("-" * 60)
    
    request = AgentExecutorRequest(
        messages=[ChatMessage(Role.USER, text=email_prompt)],
        should_respond=True
    )
    
    from agent_framework._workflows._events import ExecutorCompletedEvent
    
    async for event in sequential_support_workflow.run_stream(request):
        if isinstance(event, ExecutorCompletedEvent) and event.data:
            data = event.data[0] if isinstance(event.data, list) else event.data
            if hasattr(data, 'agent_response'):
                print(f"\n‚úÖ [{event.executor_id}]:")
                print(f"   {data.agent_response.text[:300]}...")
        elif isinstance(event, WorkflowOutputEvent):
            print(f"\nüéØ FINAL OUTPUT:")
            if isinstance(event.data, list) and event.data:
                final = event.data[0]
                if hasattr(final, 'agent_response'):
                    print(final.agent_response.text)

await run_sequential_workflow()

üìß Processing email through workflow: Classify ‚Üí Draft ‚Üí Review

------------------------------------------------------------

‚úÖ [classifier]:
   {
  "category": "not_spam",
  "confidence": 0.95,
  "reason": "The email is a legitimate support request regarding a delivery issue from a customer, containing specific details relevant to an order."
}...

‚úÖ [writer]:
   {
  "subject": "Assistance with Your Order #12345 - Delivery Issue",
  "body": "Dear Ms. Chen,\n\nThank you for reaching out to us regarding the delivery issue with your recent order #12345. I understand the urgency of the situation, especially with your client presentation approaching on Friday.\n\...

‚úÖ [reviewer]:
   Approval Decision: **Approved**

Notes:
- The response maintains a professional and empathetic tone, recognizing the urgency of the customer's situation while reassuring them that their issue is being addressed.
- It accurately acknowledges the details provided by the customer regarding the order n

# 9. Branching Logic

Route execution based on conditions: Spam ‚Üí Block, NotSpam ‚Üí Draft, Uncertain ‚Üí Review.

**When to Use:**
- Different paths based on classification or conditions
- Error handling with fallback routes
- Multi-way routing (switch-case patterns)

**When NOT to Use:**
- All items follow the same path (use Sequential)
- Need parallel execution of branches (use Fan-Out)

## Routing Patterns

| Pattern | Use Case |
|---------|----------|
| **Conditional Edge** | Binary if/else |
| **Switch-Case** | Multi-way routing |
| **Multi-Selection** | Dynamic fan-out |

## Define Branch Handlers

Create handlers for each classification outcome.

In [22]:
from dataclasses import dataclass
from uuid import uuid4
from agent_framework import Case, Default

# Internal payload for routing
@dataclass
class ClassifiedEmail:
    email_id: str
    category: str  # spam, not_spam, uncertain
    confidence: float
    reason: str
    original_content: str

# Shared state keys
EMAIL_KEY = "current_email"

# Helper to extract JSON from markdown code blocks
def extract_json(text: str) -> str:
    """Extract JSON from text, stripping markdown code blocks if present."""
    import re
    match = re.search(r'```(?:json)?\s*([\s\S]*?)```', text)
    if match:
        return match.group(1).strip()
    return text.strip()

# Transform classification result to routable payload
@executor(id="extract_classification")
async def extract_classification(response: Any, ctx: WorkflowContext[ClassifiedEmail]) -> None:
    """Extract classification from agent response for routing."""
    if isinstance(response, list):
        response = response[0]
    
    # Extract JSON (handles markdown code blocks)
    json_text = extract_json(response.agent_response.text)
    classification = ClassificationResult.model_validate_json(json_text)
    
    # Get original email from shared state
    original_content = await ctx.get_shared_state(EMAIL_KEY) or "Unknown"
    
    payload = ClassifiedEmail(
        email_id=str(uuid4()),
        category=classification.category,
        confidence=classification.confidence,
        reason=classification.reason,
        original_content=original_content
    )
    await ctx.send_message(payload)

# Route conditions
def is_spam(message: Any) -> bool:
    return isinstance(message, ClassifiedEmail) and message.category == "spam"

def is_not_spam(message: Any) -> bool:
    return isinstance(message, ClassifiedEmail) and message.category == "not_spam"

def is_uncertain(message: Any) -> bool:
    return isinstance(message, ClassifiedEmail) and message.category == "uncertain"

# Terminal handlers
@executor(id="handle_spam")
async def handle_spam_terminal(email: ClassifiedEmail, ctx: WorkflowContext[Never, str]) -> None:
    """Handle spam: block and log."""
    await ctx.yield_output(f"üö´ SPAM BLOCKED: {email.reason} (confidence: {email.confidence:.0%})")

@executor(id="handle_not_spam")
async def handle_not_spam_continue(email: ClassifiedEmail, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Handle not_spam: forward to writer."""
    await ctx.send_message(AgentExecutorRequest(
        messages=[ChatMessage(Role.USER, text=f"Draft a response to: {email.original_content}")],
        should_respond=True
    ))

@executor(id="finalize_draft")
async def finalize_draft(response: Any, ctx: WorkflowContext[Never, str]) -> None:
    """Output the final draft."""
    if isinstance(response, list):
        response = response[0]
    # Extract JSON (handles markdown code blocks)
    json_text = extract_json(response.agent_response.text)
    draft = DraftResponse.model_validate_json(json_text)
    await ctx.yield_output(f"‚úâÔ∏è DRAFT READY:\nSubject: {draft.subject}\n\n{draft.body}")

@executor(id="handle_uncertain")
async def handle_uncertain_terminal(email: ClassifiedEmail, ctx: WorkflowContext[Never, str]) -> None:
    """Handle uncertain: flag for human review."""
    await ctx.yield_output(f"‚ö†Ô∏è NEEDS HUMAN REVIEW: {email.reason} (confidence: {email.confidence:.0%})\n\nOriginal: {email.original_content[:200]}...")

print("‚úÖ Branching executors defined")

‚úÖ Branching executors defined


## Build Switch-Case Workflow

Route based on classification result.

In [23]:
# Store email and start classification
@executor(id="start_classification")
async def start_classification(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Store email and send for classification."""
    await ctx.set_shared_state(EMAIL_KEY, email_text)
    await ctx.send_message(AgentExecutorRequest(
        messages=[ChatMessage(Role.USER, text=f"Classify this email:\n\n{email_text}")],
        should_respond=True
    ))

# Build branching workflow
branching_workflow = (
    WorkflowBuilder()
    .set_start_executor(start_classification)
    .add_edge(start_classification, classifier_agent)
    .add_edge(classifier_agent, extract_classification)
    # Switch-case routing
    .add_switch_case_edge_group(
        extract_classification,
        [
            Case(condition=is_spam, target=handle_spam_terminal),
            Case(condition=is_not_spam, target=handle_not_spam_continue),
            Default(target=handle_uncertain_terminal),  # Catches uncertain + unexpected
        ],
    )
    # Continue not_spam path to draft
    .add_edge(handle_not_spam_continue, writer_agent)
    .add_edge(writer_agent, finalize_draft)
    .build()
)

print("‚úÖ Branching workflow built")

‚úÖ Branching workflow built


## Test Branching

Run all three email types through the workflow.

In [24]:
# Test all three paths
async def test_branching():
    test_cases = [
        ("LEGITIMATE", LEGIT_EMAIL),
        ("SPAM", SPAM_EMAIL),
        ("AMBIGUOUS", AMBIGUOUS_EMAIL),
    ]
    
    for label, email in test_cases:
        print(f"\nüìß Testing {label} email...")
        print("-" * 50)
        
        email_text = f"From: {email.sender}\nSubject: {email.subject}\n\n{email.body}"
        
        async for event in branching_workflow.run_stream(email_text):
            if isinstance(event, WorkflowOutputEvent):
                print(event.data)

await test_branching()


üìß Testing LEGITIMATE email...
--------------------------------------------------
‚úâÔ∏è DRAFT READY:
Subject: Re: Order #12345 - Delivery Issue

Dear Ms. Chen,

Thank you for contacting us regarding the issue with your order #12345. I sincerely apologize for the inconvenience this has caused, especially with your client presentation approaching on Friday.

We are currently investigating the delivery status of your package. I appreciate your efforts in checking with your neighbors and the concierge. In the meantime, I will expedite this matter and ensure we provide you with a solution as quickly as possible, whether it is locating your package or arranging a replacement if necessary.

Please rest assured that we are on it, and I will keep you updated throughout the process.

Thank you for your understanding and patience.

Best regards,

[Your Name]  
Customer Support Team  
ACME Corporation

üìß Testing SPAM email...
--------------------------------------------------
üö´ SPAM BLOC

# 10. Fan-Out / Fan-In

![Concurrent Workflow](images/concurrent-workflow.png)

Process multiple paths in parallel and aggregate results.

**When to Use:**
- Independent tasks that can run concurrently
- Aggregating results from multiple sources
- Performance optimization through parallelization

**When NOT to Use:**
- Tasks have dependencies on each other
- Order of execution matters

## Define Parallel Paths

For long emails: respond AND summarize concurrently.

In [25]:
# Summary model
class EmailSummary(BaseModel):
    """Concise email summary."""
    key_points: list[str] = Field(description="Main points from the email")
    urgency: Literal["low", "medium", "high"] = Field(description="Urgency level")
    action_required: str = Field(description="Primary action needed")

# Summarizer agent
summarizer_agent = AgentExecutor(
    chat_client.as_agent(
        name="Summarizer",
        instructions="""Summarize emails concisely. Return JSON with:
- key_points: list of main points
- urgency: low/medium/high
- action_required: primary action needed""",
        response_format=EmailSummary,
    ),
    id="summarizer",
)

# Threshold for "long" emails
LONG_EMAIL_THRESHOLD = 200  # characters

@dataclass
class EnrichedEmail:
    """Email with metadata for routing."""
    email_id: str
    content: str
    is_long: bool
    category: str

# Selection function for multi-selection routing
def select_parallel_paths(email: EnrichedEmail, target_ids: list[str]) -> list[str]:
    """Select paths based on email length."""
    # target_ids order: [respond_path, summarize_path]
    respond_id, summarize_id = target_ids
    
    if email.is_long:
        return [respond_id, summarize_id]  # Both paths in parallel
    else:
        return [respond_id]  # Only respond for short emails

# Executors for parallel paths
@executor(id="prepare_parallel")
async def prepare_parallel(classified: ClassifiedEmail, ctx: WorkflowContext[EnrichedEmail]) -> None:
    """Prepare email for parallel processing."""
    enriched = EnrichedEmail(
        email_id=classified.email_id,
        content=classified.original_content,
        is_long=len(classified.original_content) > LONG_EMAIL_THRESHOLD,
        category=classified.category
    )
    await ctx.send_message(enriched)

@executor(id="respond_path")
async def respond_path(email: EnrichedEmail, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Send to writer for response."""
    await ctx.send_message(AgentExecutorRequest(
        messages=[ChatMessage(Role.USER, text=f"Draft a response to:\n{email.content}")],
        should_respond=True
    ))

@executor(id="summarize_path")
async def summarize_path(email: EnrichedEmail, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Send to summarizer."""
    await ctx.send_message(AgentExecutorRequest(
        messages=[ChatMessage(Role.USER, text=f"Summarize this email:\n{email.content}")],
        should_respond=True
    ))

# Aggregator to combine parallel results
class ParallelAggregator(Executor):
    def __init__(self):
        super().__init__(id="parallel_aggregator")
    
    @handler
    async def aggregate(self, results: list[Any], ctx: WorkflowContext[Never, str]) -> None:
        """Combine response and summary."""
        output_parts = []
        
        for result in results:
            if isinstance(result, AgentExecutorResponse):
                try:
                    draft = DraftResponse.model_validate_json(result.agent_response.text)
                    output_parts.append(f"üìß DRAFT RESPONSE:\nSubject: {draft.subject}\n{draft.body}")
                except:
                    try:
                        summary = EmailSummary.model_validate_json(result.agent_response.text)
                        points = "\n".join(f"  ‚Ä¢ {p}" for p in summary.key_points)
                        output_parts.append(f"üìã SUMMARY:\n{points}\nUrgency: {summary.urgency}\nAction: {summary.action_required}")
                    except:
                        output_parts.append(f"Result: {result.agent_response.text[:200]}...")
        
        await ctx.yield_output("\n\n" + "="*40 + "\n\n".join(output_parts))

aggregator = ParallelAggregator()

print("‚úÖ Parallel processing executors defined")

‚úÖ Parallel processing executors defined


## Build Fan-Out/Fan-In Workflow

Short emails ‚Üí respond only. Long emails ‚Üí respond + summarize in parallel.

In [26]:
from agent_framework import WorkflowBuilder
from agent_framework._workflows._events import ExecutorCompletedEvent
from datetime import datetime

# Constants
LONG_EMAIL_THRESHOLD = 200  # Characters

# Start executor - entry point stores email and passes it forward
@executor(id="fanout_start")
async def fanout_start(email_text: str, ctx: WorkflowContext[str]) -> None:
    """Entry point: store email length, forward email text."""
    # Store email length in shared state for selection
    await ctx.set_shared_state("email_length", len(email_text))
    # Store workflow start time
    await ctx.set_shared_state("workflow_start_time", time.time())
    await ctx.send_message(email_text)

# Selection function that uses shared state
def fanout_select_paths(email_text: str, target_ids: list[str]) -> list[str]:
    """Select paths based on email length (stored in text)."""
    # The email_text is still the raw string at this point
    if len(email_text) > LONG_EMAIL_THRESHOLD:
        return target_ids  # Both paths for long emails
    return [target_ids[0]]  # Only response path for short emails

# Response path preparer with timing
@executor(id="fanout_respond_prep")
async def fanout_respond_prep(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Prepare email for writer agent."""
    workflow_start = await ctx.get_shared_state("workflow_start_time")
    start_time = time.time()
    elapsed = start_time - workflow_start
    print(f"   ‚è±Ô∏è  [+{elapsed:.2f}s] üìù RESPONSE PATH started")
    
    await ctx.set_shared_state("response_start_time", start_time)
    await ctx.send_message(AgentExecutorRequest(
        messages=[ChatMessage(Role.USER, text=f"Draft a response to:\n{email_text}")],
        should_respond=True
    ))

# Summary path preparer with timing
@executor(id="fanout_summarize_prep")
async def fanout_summarize_prep(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    """Prepare email for summarizer agent."""
    workflow_start = await ctx.get_shared_state("workflow_start_time")
    start_time = time.time()
    elapsed = start_time - workflow_start
    print(f"   ‚è±Ô∏è  [+{elapsed:.2f}s] üìã SUMMARY PATH started")
    
    await ctx.set_shared_state("summary_start_time", start_time)
    await ctx.send_message(AgentExecutorRequest(
        messages=[ChatMessage(Role.USER, text=f"Summarize this email:\n{email_text}")],
        should_respond=True
    ))

# Capture completion time immediately after writer finishes
@executor(id="capture_writer_completion")
async def capture_writer_completion(result: Any, ctx: WorkflowContext[Any]) -> None:
    """Capture writer completion time."""
    workflow_start = await ctx.get_shared_state("workflow_start_time")
    response_start = await ctx.get_shared_state("response_start_time")
    end_time = time.time()
    
    elapsed_from_start = end_time - workflow_start
    duration = end_time - response_start
    print(f"   ‚è±Ô∏è  [+{elapsed_from_start:.2f}s] ‚úÖ RESPONSE PATH completed ({duration:.2f}s)")
    
    await ctx.set_shared_state("response_end_time", end_time)
    await ctx.send_message(result)

# Capture completion time immediately after summarizer finishes
@executor(id="capture_summarizer_completion")
async def capture_summarizer_completion(result: Any, ctx: WorkflowContext[Any]) -> None:
    """Capture summarizer completion time."""
    workflow_start = await ctx.get_shared_state("workflow_start_time")
    summary_start = await ctx.get_shared_state("summary_start_time")
    end_time = time.time()
    
    elapsed_from_start = end_time - workflow_start
    duration = end_time - summary_start
    print(f"   ‚è±Ô∏è  [+{elapsed_from_start:.2f}s] ‚úÖ SUMMARY PATH completed ({duration:.2f}s)")
    
    await ctx.set_shared_state("summary_end_time", end_time)
    await ctx.send_message(result)

# Aggregator - combines results from parallel paths with timing
@executor(id="fanout_aggregator")
async def fanout_aggregator(results: list[Any], ctx: WorkflowContext[Never, str]) -> None:
    """Combine response and summary results with timing information."""
    response_start = await ctx.get_shared_state("response_start_time")
    summary_start = await ctx.get_shared_state("summary_start_time")
    response_end = await ctx.get_shared_state("response_end_time")
    summary_end = await ctx.get_shared_state("summary_end_time")
    
    output_parts = []
    response_time = None
    summary_time = None
    
    # Calculate durations from stored times
    if response_start and response_end:
        response_time = response_end - response_start
    if summary_start and summary_end:
        summary_time = summary_end - summary_start
    
    for result in results:
        if isinstance(result, AgentExecutorResponse):
            try:
                draft = DraftResponse.model_validate_json(extract_json(result.agent_response.text))
                output_parts.append(
                    f"üì¨ RESPONSE (completed in {response_time:.2f}s):\n"
                    f"Subject: {draft.subject}\n{draft.body}"
                )
            except:
                try:
                    summary = EmailSummary.model_validate_json(extract_json(result.agent_response.text))
                    points = "\n".join(f"  ‚Ä¢ {p}" for p in summary.key_points)
                    output_parts.append(
                        f"üìã SUMMARY (completed in {summary_time:.2f}s):\n"
                        f"{points}\n"
                        f"Urgency: {summary.urgency}\n"
                        f"Action: {summary.action_required}"
                    )
                except:
                    output_parts.append(f"Result: {result.agent_response.text[:200]}...")
    
    # Calculate overlap to show parallelization
    if response_time and summary_time:
        total_sequential = response_time + summary_time
        total_parallel = max(response_time, summary_time)
        time_saved = total_sequential - total_parallel
        output_parts.append(
            f"\n‚ö° PARALLEL EXECUTION BENEFIT:\n"
            f"   Sequential time: {total_sequential:.2f}s\n"
            f"   Parallel time: {total_parallel:.2f}s\n"
            f"   Time saved: {time_saved:.2f}s ({time_saved/total_sequential*100:.1f}%)"
        )
    
    await ctx.yield_output("\n\n" + "="*50 + "\n\n".join(output_parts))

# Build the fan-out workflow
# Pattern: start -> [fanout to preparers] -> [agents] -> [capture timing] -> aggregator
fanout_workflow = (
    WorkflowBuilder()
    .set_start_executor(fanout_start)
    # Fan-out from start directly to path preparers based on email length
    .add_multi_selection_edge_group(
        fanout_start,
        targets=[fanout_respond_prep, fanout_summarize_prep],
        selection_func=fanout_select_paths,
    )
    # Each preparer sends to its agent
    .add_edge(fanout_respond_prep, writer_agent)
    .add_edge(fanout_summarize_prep, summarizer_agent)
    # Capture completion times immediately after each agent
    .add_edge(writer_agent, capture_writer_completion)
    .add_edge(summarizer_agent, capture_summarizer_completion)
    # Fan-in: collect all results
    .add_fan_in_edges([capture_writer_completion, capture_summarizer_completion], fanout_aggregator)
    .build()
)

print("‚úÖ Fan-out/fan-in workflow built")

‚úÖ Fan-out/fan-in workflow built


## Test Parallel Execution

Long emails trigger both response and summary paths concurrently.

In [27]:
# Test with long legitimate email
async def test_fanout():
    email_text = f"From: {LEGIT_EMAIL.sender}\nSubject: {LEGIT_EMAIL.subject}\n\n{LEGIT_EMAIL.body}"
    
    print(f"üìß Testing LONG email ({len(email_text)} chars > {LONG_EMAIL_THRESHOLD} threshold)")
    print("Expected: Response AND Summary in parallel\n")
    print("-" * 60)
    
    async for event in fanout_workflow.run_stream(email_text):
        if isinstance(event, WorkflowOutputEvent):
            print(event.data)

await test_fanout()

üìß Testing LONG email (468 chars > 200 threshold)
Expected: Response AND Summary in parallel

------------------------------------------------------------
   ‚è±Ô∏è  [+0.00s] üìù RESPONSE PATH started
   ‚è±Ô∏è  [+0.00s] üìã SUMMARY PATH started
   ‚è±Ô∏è  [+3.82s] ‚úÖ SUMMARY PATH completed (3.82s)
   ‚è±Ô∏è  [+3.83s] ‚úÖ RESPONSE PATH completed (3.83s)


Subject: Re: Order #12345 - Delivery Issue
Dear Ms. Chen,

Thank you for reaching out regarding your order #12345. I understand how important this order is for your upcoming client presentation, and I apologize for the inconvenience you've experienced with the delivery.

I will investigate this matter immediately. Please allow me some time to check with our shipping department and track down the package. In the event we cannot locate it, I will ensure we arrange a replacement for you as soon as possible so that you have your items in time for your presentation on Friday.

I appreciate your patience and understanding during this p

# 11. Group Chat Orchestration

![Group Chat Pattern](images/group-chat.png)

Multiple agents collaborate in a shared conversation, coordinated by an orchestrator.

**When to Use:**
- Iterative refinement with multiple review rounds
- Collaborative problem-solving with shared context
- Multi-perspective analysis (e.g., writer-reviewer workflows)

**When NOT to Use:**
- Agents should work independently (use Concurrent)
- Complex dynamic planning needed (use Magentic)

## Key Differences

| Pattern | Coordination | Use Case |
|---------|--------------|----------|
| **Concurrent** | No coordination | Independent parallel tasks |
| **Group Chat** | Orchestrator selects speakers | Iterative refinement, shared context |
| **Magentic** | Manager with dynamic planning | Complex open-ended tasks |

## Define Specialists

Create agents with distinct review roles. All agents will see the shared conversation.

In [34]:
from agent_framework import GroupChatBuilder, GroupChatState, ConcurrentBuilder, MagenticBuilder

# Three specialized reviewers - order matters! Last one produces final output.

# 1st: Security reviewer - identifies security/compliance issues
security_reviewer = ChatAgent(
    name="SecurityReviewer",
    description="Security and compliance specialist - reviews first",
    instructions="""You are the FIRST reviewer. Analyze the support response for:
- Data exposure risks (customer IDs, case numbers that shouldn't be in emails)
- PII handling concerns (names, order details)
- Policy compliance issues

Be concise. List only the security issues you find. Do NOT rewrite the email - just identify problems for later reviewers to address.""",
    chat_client=chat_client,
)

# 2nd: Accuracy reviewer - checks facts and promises
accuracy_reviewer = ChatAgent(
    name="AccuracyReviewer", 
    description="Factual accuracy specialist - reviews second",
    instructions="""You are the SECOND reviewer. Analyze the support response for:
- Unrealistic promises or timelines
- Unverifiable claims
- Compensation appropriateness

Consider the security feedback from the previous reviewer. Be concise. List only the accuracy issues. Do NOT rewrite the email - just identify problems for the final reviewer to address.""",
    chat_client=chat_client,
)

# 3rd: Tone reviewer - applies all feedback and produces final email
tone_reviewer = ChatAgent(
    name="ToneReviewer",
    description="Tone specialist and final editor - produces revised email",
    instructions="""You are the FINAL reviewer. Your job is to:
1. Consider ALL feedback from SecurityReviewer and AccuracyReviewer
2. Review the tone and empathy of the original email
3. **PRODUCE A FINAL REVISED EMAIL** that:
   - Addresses security concerns (remove/mask sensitive identifiers if needed)
   - Fixes accuracy issues (realistic timelines, appropriate promises)
   - Maintains professional, empathetic tone
   - Is ready to send to the customer

End your response with the complete revised email in a clear format.""",
    chat_client=chat_client,
)

print("‚úÖ Three specialist reviewers defined:")
print("   1. SecurityReviewer - identifies security issues")
print("   2. AccuracyReviewer - checks facts and promises")  
print("   3. ToneReviewer - applies all feedback and produces FINAL email")

‚úÖ Three specialist reviewers defined:
   1. SecurityReviewer - identifies security issues
   2. AccuracyReviewer - checks facts and promises
   3. ToneReviewer - applies all feedback and produces FINAL email


## Build Group Chat with Round-Robin

Simple selection: each reviewer speaks in turn.

In [35]:
# Sample draft response to review
draft_to_review = """
Subject: Re: Order #12345 - Delivery Issue

Dear Sarah,

I'm so sorry to hear about the missing package! This must be incredibly frustrating.

I've located your order and can confirm it was marked as delivered on Monday. Here's what I'll do:

1. I've opened an investigation with our shipping partner (Case #INV-789)
2. As a Premium customer, I'm expediting a replacement shipment TODAY
3. The replacement will arrive by Thursday, well before your Friday presentation

Your account has also been credited $50 for the inconvenience.

If you need anything else, reply directly to this email - I'm here to help!

Best regards,
Support Team
"""

# Round-robin selector: each reviewer speaks in order
def round_robin_selector(state: GroupChatState) -> str:
    """Pick the next speaker based on round index."""
    participants = list(state.participants.keys())
    return participants[state.current_round % len(participants)]

# Build group chat with round-robin selection
# ORDER MATTERS: Security ‚Üí Accuracy ‚Üí Tone (final editor)
review_group_chat = (
    GroupChatBuilder()
    .with_orchestrator(selection_func=round_robin_selector, orchestrator_name="RoundRobinOrchestrator")
    .participants([security_reviewer, accuracy_reviewer, tone_reviewer])  # Order: Security ‚Üí Accuracy ‚Üí Tone
    .with_termination_condition(lambda msgs: len([m for m in msgs if m.role.value == "assistant"]) >= 3)
    .build()
)

print("‚úÖ Group chat built with round-robin selection")
print("   Order: SecurityReviewer ‚Üí AccuracyReviewer ‚Üí ToneReviewer (final)")

‚úÖ Group chat built with round-robin selection
   Order: SecurityReviewer ‚Üí AccuracyReviewer ‚Üí ToneReviewer (final)


## Test Round-Robin Group Chat

Each reviewer analyzes the draft in turn, building on previous insights.

In [36]:
# Run the group chat with round-robin selection
from agent_framework._workflows._events import AgentRunUpdateEvent

async def test_round_robin_group_chat():
    print("üìù DRAFT TO REVIEW:")
    print(draft_to_review)
    print("-" * 60)
    print("\nüîÑ ROUND-ROBIN GROUP CHAT (each reviewer speaks in turn):\n")
    
    last_executor_id: str | None = None
    agent_order = []
    
    async for event in review_group_chat.run_stream(f"Review this support response:\n{draft_to_review}"):
        if isinstance(event, AgentRunUpdateEvent):
            eid = event.executor_id
            if eid != last_executor_id:
                if last_executor_id is not None:
                    print("\n")
                agent_order.append(eid)
                print(f"\nü§ñ [{eid}] (Turn #{len(agent_order)}):", end=" ", flush=True)
                last_executor_id = eid
            print(event.data, end="", flush=True)
        
        elif isinstance(event, WorkflowOutputEvent):
            print("\n\n" + "=" * 60)
            print(f"üìä EXECUTION ORDER: {' ‚Üí '.join(agent_order)}")
            print("=" * 60)

await test_round_robin_group_chat()

üìù DRAFT TO REVIEW:

Subject: Re: Order #12345 - Delivery Issue

Dear Sarah,

I'm so sorry to hear about the missing package! This must be incredibly frustrating.

I've located your order and can confirm it was marked as delivered on Monday. Here's what I'll do:

1. I've opened an investigation with our shipping partner (Case #INV-789)
2. As a Premium customer, I'm expediting a replacement shipment TODAY
3. The replacement will arrive by Thursday, well before your Friday presentation

Your account has also been credited $50 for the inconvenience.

If you need anything else, reply directly to this email - I'm here to help!

Best regards,
Support Team

------------------------------------------------------------

üîÑ ROUND-ROBIN GROUP CHAT (each reviewer speaks in turn):


ü§ñ [SecurityReviewer] (Turn #1): 1. **Data Exposure Risks:** 
   - The inclusion of the order number (#12345) and case number (INV-789) in the email could potentially expose sensitive information if this email is 

In [39]:
# Agent-based orchestrator for intelligent speaker selection
from typing import cast
from agent_framework._workflows._events import AgentRunUpdateEvent, WorkflowOutputEvent
from agent_framework._types import ChatMessage

orchestrator_agent = ChatAgent(
    name="ReviewOrchestrator",
    description="Coordinates multi-agent review process",
    instructions=f"""You coordinate a team reviewing this support response:

{draft_to_review}

YOUR TEAM:
- SecurityReviewer: Identifies security/PII issues (reviews first)
- AccuracyReviewer: Checks facts and promises (reviews second)
- ToneReviewer: Final editor who produces the revised email (reviews last)

YOUR PROCESS:
1. Start with SecurityReviewer to check data safety and PII
2. Then AccuracyReviewer to verify claims and timelines
3. **Finally, ToneReviewer to produce the FINAL REVISED EMAIL** incorporating all feedback
4. If needed, you may ask follow-up questions to any reviewer
5. End when ToneReviewer delivers the complete revised email

Select speakers intelligently. CRITICAL: ToneReviewer must go last and produce the final email.""",
    chat_client=chat_client,
)

# Build group chat with agent-based orchestration
# ORDER: Security ‚Üí Accuracy ‚Üí Tone (final editor)
intelligent_review_chat = (
    GroupChatBuilder()
    .with_orchestrator(agent=orchestrator_agent)
    .participants([security_reviewer, accuracy_reviewer, tone_reviewer])
    .with_termination_condition(lambda msgs: len([m for m in msgs if m.role.value == "assistant"]) >= 5)
    .build()
)

# Run with detailed logging
async def test_agent_orchestrated_group_chat():
    print("üìù DRAFT TO REVIEW:")
    print(draft_to_review)
    print("-" * 60)
    print("\nüß† AGENT-ORCHESTRATED GROUP CHAT (intelligent speaker selection):\n")
    
    last_executor_id: str | None = None
    agent_calls: dict[str, int] = {}
    
    async for event in intelligent_review_chat.run_stream("Review this support response. Security and Accuracy reviewers identify issues, then ToneReviewer produces the final revised email."):
        if isinstance(event, AgentRunUpdateEvent):
            eid = event.executor_id
            if eid != last_executor_id:
                if last_executor_id is not None:
                    print("\n")
                agent_calls[eid] = agent_calls.get(eid, 0) + 1
                print(f"\nü§ñ [{eid}] (Call #{agent_calls[eid]}):", end=" ", flush=True)
                last_executor_id = eid
            print(event.data, end="", flush=True)
        
        elif isinstance(event, WorkflowOutputEvent):
            output_messages = cast(list[ChatMessage], event.data)
            
            print("\n\n" + "=" * 60)
            print("üìä EXECUTION SUMMARY")
            print("=" * 60)
            print(f"   Total calls: {sum(agent_calls.values())}")
            print("\n   Calls per agent:")
            for agent, count in sorted(agent_calls.items()):
                print(f"      {agent}: {count} call(s)")
            
            print("\n   üí° The orchestrator dynamically selected speakers")
            print("      based on what was needed at each step")
            
            print("\n" + "=" * 60)
            print("üìß FINAL REVISED EMAIL (from ToneReviewer)")
            print("=" * 60)
            for msg in reversed(output_messages):
                if msg.role.value == "assistant" and "ToneReviewer" in str(msg):
                    print(msg.text)
                    break

await test_agent_orchestrated_group_chat()

üìù DRAFT TO REVIEW:

Subject: Re: Order #12345 - Delivery Issue

Dear Sarah,

I'm so sorry to hear about the missing package! This must be incredibly frustrating.

I've located your order and can confirm it was marked as delivered on Monday. Here's what I'll do:

1. I've opened an investigation with our shipping partner (Case #INV-789)
2. As a Premium customer, I'm expediting a replacement shipment TODAY
3. The replacement will arrive by Thursday, well before your Friday presentation

Your account has also been credited $50 for the inconvenience.

If you need anything else, reply directly to this email - I'm here to help!

Best regards,
Support Team

------------------------------------------------------------

üß† AGENT-ORCHESTRATED GROUP CHAT (intelligent speaker selection):


ü§ñ [SecurityReviewer] (Call #1): 1. Data exposure risks: 
   - Customer IDs and case numbers included in emails could lead to unauthorized access to sensitive information.

2. PII handling concerns: 
   - 

# 12. Magentic Orchestration

![Magentic Pattern](images/magentic-workflow.png)

Magentic is the most powerful orchestration pattern - a manager dynamically plans and delegates to specialists based on evolving task requirements.

**When to Use:**
- Complex, open-ended tasks requiring multiple iterations
- Tasks where the solution path isn't known in advance
- Research + analysis workflows with code execution

**When NOT to Use:**
- Simple linear pipelines (use Sequential)
- Fixed review rounds (use Group Chat)

## Use Case: Market Research Report

A complex task requiring:
1. **Research Agent** - Web search for current data
2. **Analyst Agent** - Code execution for data processing
3. **Manager** - Dynamic planning and synthesis

The manager autonomously decides which agent to call and when based on progress.

In [None]:
# Magentic Orchestration: Research + Analysis workflow
import json
from typing import cast
from agent_framework import (
    AgentRunUpdateEvent,
    MagenticOrchestratorEvent,
    MagenticProgressLedger,
)

# Research Agent - uses web search capability
# Note: In production, use OpenAI's gpt-4o-search-preview or add web search tools
researcher_agent = ChatAgent(
    name="ResearcherAgent",
    description="Specialist in research and information gathering about markets, trends, and data",
    instructions="""You are a Research Specialist. Your job is to:
- Gather factual information about topics
- Find current statistics and trends
- Provide sources for your findings

When asked about market data, provide realistic example data with citations.
Be concise and factual. Format data clearly for analysis.""",
    chat_client=chat_client,
)

# Analyst Agent - processes and analyzes data
# Note: In production, add HostedCodeInterpreterTool for real code execution
analyst_agent = ChatAgent(
    name="AnalystAgent",
    description="Data analyst who processes information and creates insights with calculations",
    instructions="""You are a Data Analyst. Your job is to:
- Process and analyze data provided by the researcher
- Perform calculations (growth rates, comparisons, projections)
- Create clear tables and visualizations descriptions
- Identify trends and insights

Show your calculations step by step. Format results in clear tables.""",
    chat_client=chat_client,
)

# Manager Agent - orchestrates the research workflow
manager_agent = ChatAgent(
    name="ResearchManager",
    description="Orchestrator that coordinates research and analysis workflows",
    instructions="""You manage a research team to complete comprehensive analysis tasks.

YOUR TEAM:
- ResearcherAgent: Gathers information, statistics, and market data
- AnalystAgent: Processes data, performs calculations, creates insights

YOUR PROCESS:
1. Break down the research request into subtasks
2. Delegate to ResearcherAgent to gather relevant data
3. Delegate to AnalystAgent to process and analyze the data
4. Continue iterating until you have comprehensive insights
5. Synthesize all findings into a final report

You dynamically decide who to call based on what's needed. You may call agents multiple times.""",
    chat_client=chat_client,
)

print("‚úÖ Magentic agents defined: ResearcherAgent, AnalystAgent, ResearchManager")

üöÄ SUPPORT EMAIL COPILOT - CAPSTONE DEMO


üìß PROCESSING: LEGITIMATE SUPPORT REQUEST
From: sarah.chen@acmecorp.com
Subject: Order #12345 - Delivery Issue
Customer ID: CUST-7891
--------------------------------------------------

üìä Step 1: CLASSIFICATION
   Category: not_spam
   Confidence: 95%
   Reason: The email is from a corporate domain and discusses a legitimate delivery issue, likely from a real customer.

üîç Step 3: CUSTOMER LOOKUP
   SLA: Customer CUST-7891: Premium tier, 4 hours response time, Free expedited replacement
   Ticket: Ticket TKT-2024-001: Status=Open, Priority=High, Assigned to=Support Team, Last update=2024-01-15

‚úçÔ∏è Step 4: DRAFTING RESPONSE
   Subject: Re: Order #12345 - Delivery Issue
   Tone: apologetic
   Body preview: Dear Ms. Chen,

Thank you for reaching out to us regarding the delivery issue with your recent order.

I sincerely apologize for the inconvenience this has caused, especially with your client presenta...

üîç Step 5: MULTI-AGENT 

## Build & Run Magentic Workflow

The manager dynamically plans and delegates. Watch how it calls different agents based on the evolving task.

In [None]:
# Build Magentic workflow
magentic_research_workflow = (
    MagenticBuilder()
    .participants([researcher_agent, analyst_agent])
    .with_manager(
        agent=manager_agent,
        max_round_count=10,  # Maximum delegation rounds
        max_stall_count=2,   # Replan after 2 stalls
    )
    .build()
)

# Research task - complex enough to require multiple agent interactions
research_task = """
Analyze the global electric vehicle (EV) market:
1. Find the top 5 EV manufacturers by market share
2. Calculate year-over-year growth rates
3. Compare EV adoption rates in US, Europe, and China
4. Provide a summary table and key insights
"""

async def run_magentic_research():
    print("üî¨ MAGENTIC RESEARCH WORKFLOW")
    print("=" * 60)
    print(f"üìã TASK:\n{research_task}")
    print("=" * 60)
    
    last_message_id: str | None = None
    agent_calls: dict[str, int] = {}
    
    async for event in magentic_research_workflow.run_stream(research_task):
        # Track streaming from agents
        if isinstance(event, AgentRunUpdateEvent):
            message_id = event.data.message_id
            executor_id = event.executor_id
            
            if message_id != last_message_id:
                if last_message_id is not None:
                    print("\n")
                agent_calls[executor_id] = agent_calls.get(executor_id, 0) + 1
                print(f"\nü§ñ [{executor_id}] (Call #{agent_calls[executor_id]}):", end=" ", flush=True)
                last_message_id = message_id
            
            print(event.data, end="", flush=True)
        
        # Track orchestration events
        elif isinstance(event, MagenticOrchestratorEvent):
            print(f"\n\n{'='*55}")
            print(f"üìã ORCHESTRATOR: {event.event_type.name}")
            print(f"{'='*55}")
            
            if isinstance(event.data, MagenticProgressLedger):
                ledger = event.data.to_dict()
                if "next_speaker" in ledger:
                    next_info = ledger.get('next_speaker', {})
                    if isinstance(next_info, dict):
                        print(f"   ‚û°Ô∏è Next: {next_info.get('answer', 'N/A')}")
                        reason = next_info.get('reason', '')
                        if reason:
                            print(f"   üí≠ Why: {reason[:100]}...")
                    else:
                        print(f"   ‚û°Ô∏è Next: {next_info}")
        
        # Final output
        elif isinstance(event, WorkflowOutputEvent):
            output_messages = cast(list[ChatMessage], event.data)
            
            print("\n\n" + "=" * 60)
            print("üìä EXECUTION SUMMARY")
            print("=" * 60)
            print(f"   Total agent calls: {sum(agent_calls.values())}")
            print("\n   Calls per agent:")
            for agent, count in sorted(agent_calls.items()):
                print(f"      {agent}: {count} call(s)")
            
            print("\n   ‚ú® Manager dynamically orchestrated:")
            print(f"      - Broke down complex task into subtasks")
            print(f"      - Called ResearcherAgent for data gathering")
            print(f"      - Called AnalystAgent for processing")
            print(f"      - Synthesized into final report")
            
            print("\n" + "=" * 60)
            print("üìë FINAL RESEARCH REPORT")
            print("=" * 60)
            for msg in reversed(output_messages):
                if msg.role.value == "assistant":
                    print(msg.text)
                    break

await run_magentic_research()