# Workflows in Agent Framework

**AI Agents** are dynamic: the LLM decides at runtime which steps to take, based on context and available tools. Use them when you need flexibility, reasoning, and open-ended problem solving.

**Workflows** are predefined sequences with explicit execution paths. The structure is fixed at design time but can include conditional branching and parallel execution. Use them when you need predictability, auditability, or coordination across multiple agents and external systems.

Workflows can contain agents as components, giving you LLM intelligence within a controlled structure.

### Core Concepts
1. **Executors**: Processing units that receive messages, perform tasks, and produce outputs. Can be agents or custom logic.
2. **Edges**: Connections between executors that control message flow. Support conditional routing.
3. **Workflows**: Directed graphs of executors and edges with a defined start point.
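
To make the three concepts concrete before touching the framework, here is a minimal toy model in plain Python. It is only a sketch: `run_toy_graph`, `steps`, and `edges` are hypothetical names invented for this illustration and are not part of `agent_framework`, whose real scheduling is more involved.

```python
from collections import deque
from typing import Any, Callable

Condition = Callable[[Any], bool]


def run_toy_graph(
    steps: dict[str, Callable[[Any], Any]],
    edges: dict[str, list[tuple[str, Condition]]],
    start: str,
    message: Any,
) -> list[Any]:
    """Push a message through a toy executor graph and collect terminal results."""
    outputs: list[Any] = []
    queue = deque([(start, message)])
    while queue:
        name, incoming = queue.popleft()
        result = steps[name](incoming)      # the "executor" does its work
        outgoing = edges.get(name, [])
        if not outgoing:                    # no outgoing edges: treat as workflow output
            outputs.append(result)
        for target, condition in outgoing:  # edges decide where the result goes next
            if condition(result):
                queue.append((target, result))
    return outputs


# Two "executors" joined by one conditional "edge".
steps = {"double": lambda n: n * 2, "label": lambda n: f"value={n}"}
edges = {"double": [("label", lambda n: n > 10)]}

print(run_toy_graph(steps, edges, start="double", message=21))  # ['value=42']
```

With `message=3` the condition `n > 10` fails, so nothing reaches `label` and the toy graph returns an empty list.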

### Setup

First, let's validate our environment to ensure all required variables are configured.

In [None]:
import sys
sys.path.insert(0, '..')  # Add parent directory to path

from workshop_utils import validate_env

validate_env()

### Basic Executor Structure
Executors are the building blocks that process messages. They inherit from `Executor` and use the `@handler` decorator to define message handlers.

**Handler signature**: `(self, input: T_In, ctx: WorkflowContext[T_Out]) -> None`
- First param: the typed input message
- Second param: context for sending outputs downstream

#### Pattern 1: Class-based Executor
Here's an executor that validates an order and calculates the total:

In [None]:
from agent_framework import Executor, WorkflowContext, handler

class ValidateOrder(Executor):
    def __init__(self):
        super().__init__(id="validate_order")
    
    @handler
    async def handle(self, order: dict, ctx: WorkflowContext[dict]) -> None:
        # Add validation status and calculate total
        order["is_valid"] = order.get("quantity", 0) > 0
        order["total"] = order.get("quantity", 0) * order.get("unit_price", 0)
        await ctx.send_message(order)

#### Pattern 2: Function-based Executor
For simple, stateless executors, use the `@executor` decorator on an async function.

In [None]:
from agent_framework import executor, WorkflowContext

@executor(id="apply_discount")
async def apply_discount(order: dict, ctx: WorkflowContext[dict]) -> None:
    # Apply 10% discount for orders over $100
    if order.get("total", 0) > 100:
        order["discount"] = order["total"] * 0.10
        order["final_total"] = order["total"] - order["discount"]
    else:
        order["discount"] = 0
        order["final_total"] = order["total"]
    await ctx.send_message(order)
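
The discount rule is easier to check outside a workflow. The sketch below restates it as a plain function; `discount_fields` is a hypothetical helper written for this illustration, not something the framework provides.

```python
import math


def discount_fields(total: float) -> dict:
    """Same rule as apply_discount, without the workflow context."""
    # The comparison is strict, so an order of exactly $100 gets no discount.
    discount = total * 0.10 if total > 100 else 0
    return {"total": total, "discount": discount, "final_total": total - discount}


print(discount_fields(100))  # no discount at the boundary
print(discount_fields(200))  # 10% off

assert discount_fields(100)["final_total"] == 100
assert math.isclose(discount_fields(200)["final_total"], 180.0)
```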

### WorkflowContext Methods
`WorkflowContext` provides two key methods for handlers:

| Method | Purpose | Context Type |
|--------|---------|-------------|
| `send_message(value)` | Forward to downstream executors | `WorkflowContext[T_Out]` |
| `yield_output(value)` | Emit as final workflow output | `WorkflowContext[Never, T_WorkflowOut]` |

If a handler does neither, use `WorkflowContext` with no type params.

**What is `Never`?** The `Never` type (from `typing_extensions`) indicates a **terminal executor** - the last step in a workflow branch. Terminal executors don't pass data downstream to other executors; instead, they produce the workflow's final output. Using `Never` as the first type parameter tells the type checker that `send_message()` should not be called.

In [None]:
from agent_framework import Executor, WorkflowContext, handler
from typing_extensions import Never

class OrderProcessor(Executor):
    def __init__(self):
        super().__init__(id="order_processor")
    
    @handler
    async def forward_to_next(self, order: dict, ctx: WorkflowContext[dict]) -> None:
        """Send to downstream executor."""
        order["status"] = "processing"
        await ctx.send_message(order)

    @handler
    async def complete_order(self, order: dict, ctx: WorkflowContext[Never, str]) -> None:
        """Yield as workflow output (terminal node)."""
        await ctx.yield_output(f"Order {order['id']} completed. Total: ${order['final_total']:.2f}")

    @handler
    async def log_order(self, order: dict, ctx: WorkflowContext) -> None:
        """No output - just perform side effects."""
        print(f"Logging order {order['id']} to audit trail")
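
The effect of `Never` can be seen without the framework. The following is a rough sketch using a hypothetical `ToyContext` class with the same two type parameters; it is not how `WorkflowContext` is implemented. The import fallback assumes either Python 3.11+ or an installed `typing_extensions`.

```python
from typing import Generic, TypeVar

try:
    from typing import Never  # Python 3.11+
except ImportError:
    from typing_extensions import Never

T_Out = TypeVar("T_Out")
T_WorkflowOut = TypeVar("T_WorkflowOut")


class ToyContext(Generic[T_Out, T_WorkflowOut]):
    """Hypothetical stand-in that records what a handler sends or yields."""

    def __init__(self) -> None:
        self.sent: list = []
        self.outputs: list = []

    def send_message(self, value: T_Out) -> None:
        self.sent.append(value)

    def yield_output(self, value: T_WorkflowOut) -> None:
        self.outputs.append(value)


def terminal_step(order: dict, ctx: ToyContext[Never, str]) -> None:
    ctx.yield_output(f"Order {order['id']} completed")
    # ctx.send_message(order)
    # A static type checker such as mypy or pyright would reject the line
    # above, because no value can have type Never.


ctx = ToyContext()
terminal_step({"id": "ORD-1"}, ctx)
print(ctx.outputs)  # ['Order ORD-1 completed']
```

The annotation is only enforced by static type checkers. At runtime Python does not stop the commented-out call.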

### Building Workflows with Edges

A **Workflow** connects executors and runs them. Use `WorkflowBuilder` to define the graph, then call `workflow.run(input)` to execute it. Your input goes to the start executor, which processes it and passes results to connected executors via edges, continuing until no more work remains.

**Edges** define how messages flow between executors. The framework supports:

- **Direct**: Simple one-to-one connections
- **Conditional**: Route based on message content
- **Fan-out**: One source to multiple targets
- **Fan-in**: Multiple sources to one target

Let's build an order processing workflow with conditional routing:

In [None]:
from agent_framework import WorkflowBuilder, WorkflowContext, executor
from typing_extensions import Never

@executor(id="validate_order")
async def validate_order(order: dict, ctx: WorkflowContext[dict]) -> None:
    """Validate and enrich the order."""
    order["is_valid"] = order.get("quantity", 0) > 0
    order["total"] = order.get("quantity", 0) * order.get("unit_price", 0)
    await ctx.send_message(order)

@executor(id="standard_processing")
async def standard_processing(order: dict, ctx: WorkflowContext[Never, str]) -> None:
    """Process standard orders."""
    await ctx.yield_output(f"Standard order processed: ${order['total']:.2f}")

@executor(id="priority_processing")
async def priority_processing(order: dict, ctx: WorkflowContext[Never, str]) -> None:
    """Process high-value orders with priority handling."""
    discount = order["total"] * 0.05  # 5% loyalty discount
    await ctx.yield_output(f"Priority order processed: ${order['total']:.2f} (loyalty discount: ${discount:.2f})")

# Build workflow with conditional routing based on order value
workflow = (
    WorkflowBuilder()
    .add_edge(validate_order, standard_processing)  # Direct: always executes
    .add_edge(validate_order, priority_processing, condition=lambda order: order.get("total", 0) > 500)  # Conditional: high-value orders
    .set_start_executor(validate_order)
    .build()
)

# Test with small order (only standard_processing runs)
small_order = {"id": "ORD-001", "quantity": 2, "unit_price": 25.00}
print(f"Small order: {small_order}")
events = await workflow.run(small_order)
print(f"Results: {events.get_outputs()}")

print()

# Test with large order (both processors run - fan-out)
large_order = {"id": "ORD-002", "quantity": 50, "unit_price": 25.00}
print(f"Large order: {large_order}")
events = await workflow.run(large_order)
print(f"Results: {events.get_outputs()}")
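
To check the routing by hand, the sketch below mirrors the two edges from the cell above in plain Python. `branches_for` is a hypothetical helper for this walkthrough; it does not call the framework, and it says nothing about the order in which the two processors run.

```python
def branches_for(order: dict) -> list[str]:
    """Names of the downstream executors whose edges would fire for this order."""
    total = order.get("quantity", 0) * order.get("unit_price", 0)
    fired = ["standard_processing"]       # unconditional edge: always fires
    if total > 500:                       # same test as the edge's lambda
        fired.append("priority_processing")
    return fired


print(branches_for({"quantity": 2, "unit_price": 25.00}))   # total 50.0
print(branches_for({"quantity": 50, "unit_price": 25.00}))  # total 1250.0
print(branches_for({"quantity": 0, "unit_price": 25.00}))   # total 0.0
```

The small order (2 × 25 = 50) only takes the unconditional edge. The large order (50 × 25 = 1250) also satisfies `total > 500`, so both processors run and the priority message reports a 5% loyalty discount of $62.50. The third case shows that `is_valid` is computed but not used by any edge: an order with quantity 0 still reaches `standard_processing`.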

### Using Agents in Workflows

The agents in this workflow are created with the `AzureOpenAIChatClient` class, which we already used in *Section 01.1*:

In [None]:
import os
from dotenv import load_dotenv
from agent_framework.azure import AzureOpenAIChatClient

load_dotenv()
endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
api_key = os.getenv("AZURE_OPENAI_API_KEY")
api_version = os.getenv("AZURE_OPENAI_API_VERSION")
deployment = os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT_NAME")

chat_client = AzureOpenAIChatClient(
    endpoint=endpoint,
    api_key=api_key,
    api_version=api_version,
    deployment_name=deployment,
)

When you add an agent to a workflow:

- It behaves like an executor but runs with its own instructions and context.
- You can chain multiple agents together to create collaborative flows (e.g., Writer → Reviewer).
- The workflow orchestrates their interaction, passing messages and collecting outputs.

The following example creates two agents and connects them:
- `create_writer_agent()` returns an agent that generates and edits content; `create_reviewer_agent()` returns an agent that reviews content and provides feedback.
- `WorkflowBuilder` registers both agents, sets the writer as the start executor, adds an edge from the writer to the reviewer, and builds the workflow graph.

In [None]:
from agent_framework import AgentRunEvent, WorkflowBuilder

# Define agent factory functions for proper state isolation.
# Each build() will create fresh agent instances, preventing conversation leakage.
def create_writer_agent():
    return chat_client.as_agent(
        instructions="You are an excellent content writer. You create new content and edit contents based on the feedback.",
        name="writer",
    )

def create_reviewer_agent():
    return chat_client.as_agent(
        instructions=(
            "You are an excellent content reviewer. "
            "Provide actionable feedback to the writer about the provided content. "
            "Provide the feedback in the most concise manner possible."
        ),
        name="reviewer",
    )

# Build workflow using registered agent names for state isolation
workflow = (
    WorkflowBuilder()
    .register_agent(factory_func=create_writer_agent, name="writer")
    .register_agent(factory_func=create_reviewer_agent, name="reviewer")
    .set_start_executor("writer")
    .add_edge("writer", "reviewer")
    .build()
)

# Run the workflow
events = await workflow.run("Create a slogan for a new electric SUV that is affordable and fun to drive.")

# Print agent outputs
for event in events:
    if isinstance(event, AgentRunEvent):
        print(f"\nExecutor: {event.executor_id}")
        print(f"Output:\n{event.data}")
        print("-" * 40)

print(f"Workflow State: {events.get_final_state()}")
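
The comments in the cell above mention state isolation. As a rough illustration of why factory functions help, the sketch below uses a hypothetical `ToyAgent` that keeps its own message history. It is an assumption made for this example and says nothing about how framework agents actually store conversation state.

```python
class ToyAgent:
    """Hypothetical stand-in for a stateful agent."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.history: list[str] = []

    def run(self, prompt: str) -> str:
        self.history.append(prompt)
        return f"{self.name} has seen {len(self.history)} message(s)"


def create_toy_writer() -> ToyAgent:
    # Factory: every call returns a fresh instance with an empty history.
    return ToyAgent("writer")


# One shared instance reused across two runs: the history leaks.
shared = ToyAgent("writer")
shared.run("first task")
shared_reply = shared.run("second task")

# A fresh instance per run: the second task starts clean.
create_toy_writer().run("first task")
fresh_reply = create_toy_writer().run("second task")

print(shared_reply)  # writer has seen 2 message(s)
print(fresh_reply)   # writer has seen 1 message(s)
```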

### Exercise - Create a Multi-Agent Workflow

Build your own workflow with **3 agents** that collaborate on a task. Some scenario ideas:

| Scenario | Agent 1 | Agent 2 | Agent 3 |
|----------|---------|---------|----------|
| Business case validation | Analyst (drafts case) | Finance (reviews risks) | Manager (approves/rejects) |
| Content pipeline | Writer (creates draft) | Editor (improves clarity) | Fact-checker (verifies claims) |
| Code review | Developer (writes code) | Reviewer (suggests improvements) | Security (checks vulnerabilities) |

**Hints:**
- Each agent needs a factory function and clear instructions
- Use `register_agent()` for each agent, then connect them with `add_edge()`
- The workflow is linear: Agent1 → Agent2 → Agent3

In [None]:
from agent_framework import AgentRunEvent, WorkflowBuilder

# Step 1: Define your agent factory functions
def create_agent1():
    return chat_client.as_agent(
        instructions="TODO: Add instructions for your first agent",
        name="agent1",
    )

def create_agent2():
    return chat_client.as_agent(
        instructions="TODO: Add instructions for your second agent",
        name="agent2",
    )

def create_agent3():
    return chat_client.as_agent(
        instructions="TODO: Add instructions for your third agent",
        name="agent3",
    )

# Step 2: Build the workflow
workflow = (
    WorkflowBuilder()
    # TODO: Register your agents and connect them with edges
    # .register_agent(factory_func=..., name="...")
    # .set_start_executor("...")
    # .add_edge("...", "...")
    .build()
)

# Step 3: Run the workflow
events = await workflow.run("TODO: Your input task here")

# Step 4: Print agent outputs
for event in events:
    if isinstance(event, AgentRunEvent):
        print(f"\nExecutor: {event.executor_id}")
        print(f"Output:\n{event.data}")
        print("-" * 40)

print(f"Workflow State: {events.get_final_state()}")

<details>
<summary>See the solution</summary>

```python
from agent_framework import AgentRunEvent, WorkflowBuilder

# Define agent factory functions
def create_analyst_agent():
    return chat_client.as_agent(
        instructions="You are a business analyst. Draft a short business case based on the provided idea.",
        name="analyst",
    )

def create_finance_agent():
    return chat_client.as_agent(
        instructions="You are a finance expert. Review the business case and highlight any financial risks or constraints.",
        name="finance",
    )

def create_approval_agent():
    return chat_client.as_agent(
        instructions=(
            "You are a senior manager. Decide whether to approve the business case based on strategic alignment. "
            "Respond with 'Approved' or 'Needs Revision' and provide reasoning."
        ),
        name="approval",
    )

# Build workflow: Analyst → Finance → Approval
workflow = (
    WorkflowBuilder()
    .register_agent(factory_func=create_analyst_agent, name="analyst")
    .register_agent(factory_func=create_finance_agent, name="finance")
    .register_agent(factory_func=create_approval_agent, name="approval")
    .set_start_executor("analyst")
    .add_edge("analyst", "finance")
    .add_edge("finance", "approval")
    .build()
)

# Run the workflow
events = await workflow.run("Develop a business case for implementing an AI-driven customer support service.")

# Print agent outputs
for event in events:
    if isinstance(event, AgentRunEvent):
        print(f"\nExecutor: {event.executor_id}")
        print(f"Output:\n{event.data}")
        print("-" * 40)

print(f"Workflow State: {events.get_final_state()}")
```
</details>

### Custom Executors

So far we've used `register_agent()` to add agents directly to workflows. But sometimes you need more control - custom input/output handling, preprocessing, or managing conversation history between agents.

**Custom executors** let you wrap agents in your own classes. You subclass `Executor`, create the agent in `__init__`, and define a `@handler` method to control how messages flow.

Here's a Writer executor that:
- Receives a `ChatMessage` as input
- Runs its internal agent
- Passes the full conversation history downstream

In [None]:
from agent_framework import ChatAgent, ChatMessage, Executor, WorkflowContext, handler
from typing_extensions import Never

class Writer(Executor):
    """Custom executor that owns a writer agent."""

    agent: ChatAgent

    def __init__(self, chat_client, id: str = "writer"):
        self.agent = chat_client.as_agent(
            instructions="You are an excellent content writer. You create new content and edit contents based on the feedback.",
        )
        super().__init__(id=id)

    @handler
    async def handle(self, message: ChatMessage, ctx: WorkflowContext[list[ChatMessage]]) -> None:
        """Generate content and forward the conversation history."""
        messages: list[ChatMessage] = [message]
        response = await self.agent.run(messages)
        messages.extend(response.messages)
        await ctx.send_message(messages)

And a Reviewer executor that:
- Receives the conversation history (`list[ChatMessage]`)
- Reviews the content
- Yields the final output (terminal node)

In [None]:
class Reviewer(Executor):
    """Custom executor that owns a reviewer agent."""

    agent: ChatAgent

    def __init__(self, chat_client, id: str = "reviewer"):
        self.agent = chat_client.as_agent(
            instructions="You are an excellent content reviewer. Review the content and provide constructive feedback.",
        )
        super().__init__(id=id)

    @handler
    async def handle(self, messages: list[ChatMessage], ctx: WorkflowContext[Never, str]) -> None:
        """Review content and yield final output."""
        response = await self.agent.run(messages)
        await ctx.yield_output(response.text)

Now let's build and run the workflow with these custom executors.

Note: We use `register_executor()` (not `register_agent()`) since these are custom executor classes:

In [None]:
from agent_framework import WorkflowBuilder

# Factory functions for state isolation
def create_writer():
    return Writer(chat_client)

def create_reviewer():
    return Reviewer(chat_client)

# Build workflow
workflow = (
    WorkflowBuilder()
    .register_executor(factory_func=create_writer, name="writer")
    .register_executor(factory_func=create_reviewer, name="reviewer")
    .set_start_executor("writer")
    .add_edge("writer", "reviewer")
    .build()
)

# Run the workflow
events = await workflow.run(
    ChatMessage(role="user", text="Create a slogan for a new electric SUV that is affordable and fun to drive.")
)

print(f"Outputs: {events.get_outputs()[0]}")
print(f"State: {events.get_final_state()}")

### Streaming Workflow Events

For real-time observability, use `workflow.run_stream()` instead of `workflow.run()`. This yields events as they happen:

| Event | Description |
|-------|-------------|
| `WorkflowStatusEvent` | State changes (IN_PROGRESS, IDLE, etc.) |
| `WorkflowOutputEvent` | Final outputs from terminal executors |
| `ExecutorFailedEvent` | An executor encountered an error |
| `WorkflowFailedEvent` | The entire workflow failed |

In [None]:
from agent_framework import (
    ExecutorFailedEvent,
    WorkflowFailedEvent,
    WorkflowOutputEvent,
    WorkflowStatusEvent,
)

# Rebuild workflow (fresh state)
workflow = (
    WorkflowBuilder()
    .register_executor(factory_func=create_writer, name="writer")
    .register_executor(factory_func=create_reviewer, name="reviewer")
    .set_start_executor("writer")
    .add_edge("writer", "reviewer")
    .build()
)

# Stream events
async for event in workflow.run_stream(
    ChatMessage(role="user", text="Create a slogan for a new electric SUV that is affordable and fun to drive.")
):
    if isinstance(event, WorkflowStatusEvent):
        print(f"Status: {event.state}")
    elif isinstance(event, WorkflowOutputEvent):
        print(f"Output: {event.data}")
    elif isinstance(event, ExecutorFailedEvent):
        print(f"Executor failed: {event.executor_id} - {event.details.message}")
    elif isinstance(event, WorkflowFailedEvent):
        print(f"Workflow failed: {event.details.message}")
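
The `async for` plus `isinstance` pattern above is ordinary Python and can be tried without Azure credentials. The sketch below uses a hypothetical async generator, `toy_run_stream`, and two made-up event classes; none of them come from `agent_framework`.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ToyStatusEvent:
    state: str


@dataclass
class ToyOutputEvent:
    data: str


async def toy_run_stream(task: str):
    """Hypothetical stream: yields events one at a time as work progresses."""
    yield ToyStatusEvent("IN_PROGRESS")
    await asyncio.sleep(0)  # stand-in for real asynchronous work
    yield ToyOutputEvent(f"done: {task}")
    yield ToyStatusEvent("IDLE")


async def collect(task: str) -> list[str]:
    lines = []
    async for event in toy_run_stream(task):
        # Dispatch on the event type, as in the cell above.
        if isinstance(event, ToyStatusEvent):
            lines.append(f"Status: {event.state}")
        elif isinstance(event, ToyOutputEvent):
            lines.append(f"Output: {event.data}")
    return lines


# In a notebook, use `lines = await collect("slogan")` instead of asyncio.run.
lines = asyncio.run(collect("slogan"))
print(lines)
```

Because each event is handled as soon as it is yielded, the consumer can log progress or update a UI before the final output arrives.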

### Workflows as Agents

You can convert any workflow into an agent using `as_agent()`. This is useful when you want to:
- Use a workflow with APIs that expect the agent interface
- Compose workflows as tools for other agents
- Maintain a consistent interface across your system

```python
# Convert workflow to agent
workflow_agent = workflow.as_agent(name="WriterReviewer")

# Use it like any other agent
response = await workflow_agent.run("Write a tagline for our product")
print(response.text)
```

### Exercise - Build a Custom Executor Workflow with Streaming

Create a workflow using **custom executors** that process a support ticket. The workflow should:

1. **Classifier** - Analyzes the ticket and categorizes it (billing, technical, general)
2. **Responder** - Generates an appropriate response based on the classification

Use `run_stream()` to observe the workflow events in real time.

**Hints:**
- Create a custom `Classifier` executor that receives a `ChatMessage` and sends a `dict` with the ticket and category
- Create a custom `Responder` executor that receives the `dict` and yields the final response
- Use `register_executor()` with factory functions for custom executor classes

In [None]:
from agent_framework import ChatAgent, ChatMessage, Executor, WorkflowBuilder, WorkflowContext, handler
from agent_framework import WorkflowStatusEvent, WorkflowOutputEvent, ExecutorFailedEvent, WorkflowFailedEvent
from typing_extensions import Never

# Step 1: Define custom Classifier executor
class Classifier(Executor):
    agent: ChatAgent

    def __init__(self, chat_client, id: str = "classifier"):
        self.agent = chat_client.as_agent(
            instructions="TODO: Add classifier instructions",
        )
        super().__init__(id=id)

    @handler
    async def handle(self, message: ChatMessage, ctx: WorkflowContext[dict]) -> None:
        # TODO: Classify the ticket and send dict with ticket + category
        pass

# Step 2: Define custom Responder executor
class Responder(Executor):
    agent: ChatAgent

    def __init__(self, chat_client, id: str = "responder"):
        self.agent = chat_client.as_agent(
            instructions="TODO: Add responder instructions",
        )
        super().__init__(id=id)

    @handler
    async def handle(self, data: dict, ctx: WorkflowContext[Never, str]) -> None:
        # TODO: Generate response based on classification and yield output
        pass

# Step 3: Factory functions
def create_classifier():
    return Classifier(chat_client)

def create_responder():
    return Responder(chat_client)

# Step 4: Build workflow
workflow = (
    WorkflowBuilder()
    # TODO: Register executors and connect with edges
    .build()
)

# Step 5: Run with streaming
async for event in workflow.run_stream(
    ChatMessage(role="user", text="I was charged twice for my subscription last month. Please help!")
):
    if isinstance(event, WorkflowStatusEvent):
        print(f"Status: {event.state}")
    elif isinstance(event, WorkflowOutputEvent):
        print(f"Response: {event.data}")

<details>
<summary>See the solution</summary>

```python
from agent_framework import ChatAgent, ChatMessage, Executor, WorkflowBuilder, WorkflowContext, handler
from agent_framework import WorkflowStatusEvent, WorkflowOutputEvent
from typing_extensions import Never

class Classifier(Executor):
    """Classifies support tickets into categories."""
    agent: ChatAgent

    def __init__(self, chat_client, id: str = "classifier"):
        self.agent = chat_client.as_agent(
            instructions=(
                "You classify support tickets into exactly one category: 'billing', 'technical', or 'general'. "
                "Respond with ONLY the category name, nothing else."
            ),
        )
        super().__init__(id=id)

    @handler
    async def handle(self, message: ChatMessage, ctx: WorkflowContext[dict]) -> None:
        response = await self.agent.run([message])
        category = response.text.strip().lower()
        await ctx.send_message({"ticket": message.text, "category": category})


class Responder(Executor):
    """Generates responses based on ticket category."""
    agent: ChatAgent

    def __init__(self, chat_client, id: str = "responder"):
        self.agent = chat_client.as_agent(
            instructions=(
                "You are a helpful support agent. Generate a professional response to the customer's ticket. "
                "Tailor your response based on the category provided. Be concise and helpful."
            ),
        )
        super().__init__(id=id)

    @handler
    async def handle(self, data: dict, ctx: WorkflowContext[Never, str]) -> None:
        prompt = f"Category: {data['category']}\nTicket: {data['ticket']}"
        response = await self.agent.run([ChatMessage(role="user", text=prompt)])
        await ctx.yield_output(response.text)


# Factory functions
def create_classifier():
    return Classifier(chat_client)

def create_responder():
    return Responder(chat_client)

# Build workflow
workflow = (
    WorkflowBuilder()
    .register_executor(factory_func=create_classifier, name="classifier")
    .register_executor(factory_func=create_responder, name="responder")
    .set_start_executor("classifier")
    .add_edge("classifier", "responder")
    .build()
)

# Run with streaming
async for event in workflow.run_stream(
    ChatMessage(role="user", text="I was charged twice for my subscription last month. Please help!")
):
    if isinstance(event, WorkflowStatusEvent):
        print(f"Status: {event.state}")
    elif isinstance(event, WorkflowOutputEvent):
        print(f"Response: {event.data}")
```
</details>

---
## Summary & Recap

In this notebook, you learned the fundamentals of building workflows with the Microsoft Agent Framework:

### Key Concepts

| Concept | Description |
|---------|-------------|
| **Executor** | A processing unit that receives typed messages, performs operations, and produces outputs |
| **WorkflowContext** | Provides methods (`send_message`, `yield_output`) for handlers to interact with the workflow |
| **Edges** | Define connections between executors with optional conditions for routing |
| **WorkflowBuilder** | Fluent API for constructing workflows by connecting executors |
| **Workflow Events** | Observable events (`WorkflowOutputEvent`, `ExecutorFailedEvent`, etc.) for monitoring execution |

### What You Built

1. **Basic Executors** - Class-based executors with `@handler` and function-based executors with `@executor`
2. **Sequential Workflows** - Linear pipelines connecting multiple executors
3. **Conditional Workflows** - Conditional edges that fan out to an additional executor for high-value orders
4. **Agent Workflows** - Integrated AI agents as workflow executors
5. **Custom Agent Executors** - Agents wrapped in custom `Executor` classes and observed through streaming events

### Key Patterns

```python
# Function-based executor (stateless - can pass directly)
@executor(id="validate_order")
async def validate_order(order: dict, ctx: WorkflowContext[dict]) -> None:
    order["is_valid"] = order.get("quantity", 0) > 0
    await ctx.send_message(order)

# Simple workflow with stateless executors
workflow = (
    WorkflowBuilder()
    .add_edge(validate_order, process_order)  # process_order: placeholder for any downstream executor
    .set_start_executor(validate_order)
    .build()
)

# For agents/stateful executors - use factory functions for state isolation
def create_writer_agent():
    return chat_client.as_agent(instructions="...", name="writer")

workflow = (
    WorkflowBuilder()
    .register_agent(factory_func=create_writer_agent, name="writer")
    .set_start_executor("writer")
    .build()
)
```

### Next Steps

In the next notebook (`02.2-orchestrations.ipynb`), you'll learn about **pre-built orchestration patterns** including Sequential, Concurrent, and Group Chat orchestrations that simplify multi-agent coordination.