# Agent Framework Workflow Tutorial

This notebook demonstrates the three fundamental steps of working with workflows in the Agent Framework:

1. **Step 1: Executors and Edges** - Basic workflow building blocks
2. **Step 2: Agents in a Workflow** - Integrating AI agents into workflows (non-streaming)
3. **Step 3: Streaming** - Real-time event streaming from workflows

## Prerequisites

- Azure OpenAI endpoint and deployment configured
- Environment variables set in `.env` file
- Run `az login` for Azure authentication

## Setup: Load Environment Variables

In [1]:
# Load environment variables from .env file
import os
from pathlib import Path
from dotenv import load_dotenv

# Locate python/.env relative to this file.
# Notebook path: python/samples/getting_started/workflow/_start-here/workflow_tutorial.ipynb
if "__file__" in globals():
    # Running as a script: parents[0] is _start-here/, so parents[4] is python/
    env_path = Path(__file__).resolve().parents[4] / ".env"
else:
    # Running in a notebook, where __file__ is undefined. This assumes the
    # working directory is _start-here/, so parents[3] is python/
    env_path = Path().resolve().parents[3] / ".env"

print(f"Looking for .env at: {env_path}")
print(f"File exists: {env_path.exists()}")

if env_path.exists():
    load_dotenv(dotenv_path=env_path, override=True)
    print(f"✓ Environment variables loaded from: {env_path}")
else:
    print(f"❌ .env file not found at: {env_path}")
    print(f"Current working directory: {Path().resolve()}")

# Verify critical environment variables are loaded
required_vars = [
    "AZURE_OPENAI_ENDPOINT",
    "AZURE_OPENAI_CHAT_DEPLOYMENT_NAME",
]

print("\n📋 Checking environment variables:")
for var in required_vars:
    value = os.getenv(var)
    if value:
        print(f"  ✅ {var}: {value[:50]}..." if len(value) > 50 else f"  ✅ {var}: {value}")
    else:
        print(f"  ❌ {var}: NOT SET")

if all(os.getenv(var) for var in required_vars):
    print("\n✓ Ready to run workflows!")
else:
    print("\n⚠️  Some required environment variables are missing. Please check your .env file.")

Looking for .env at: /Users/arturoquiroga/GITHUB/agent-framework/python/.env
File exists: True
✓ Environment variables loaded from: /Users/arturoquiroga/GITHUB/agent-framework/python/.env

📋 Checking environment variables:
  ✅ AZURE_OPENAI_ENDPOINT: https://aq-ai-foundry-sweden-central.openai.azure....
  ✅ AZURE_OPENAI_CHAT_DEPLOYMENT_NAME: gpt-4.1

✓ Ready to run workflows!
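
The setup cell hard-codes how many directory levels separate the notebook from `python/.env`, so it breaks if the notebook is moved or started from a different working directory. One more tolerant option, sketched below, is to walk up from the current directory until a `.env` file turns up. `find_upwards` is a hypothetical helper written for this illustration; `python-dotenv` also provides `find_dotenv()`, which performs a similar search.

```python
from pathlib import Path
from typing import Optional


def find_upwards(start: Path, filename: str = ".env") -> Optional[Path]:
    """Return the nearest `filename` in `start` or one of its parents, else None."""
    start = start.resolve()
    # Check the starting directory first, then each ancestor up to the root.
    for directory in (start, *start.parents):
        candidate = directory / filename
        if candidate.is_file():
            return candidate
    return None


# Example: search from the current working directory.
env_path = find_upwards(Path.cwd())
print(f"Nearest .env: {env_path}")
```

If the result is not `None`, it can be passed to `load_dotenv(dotenv_path=env_path, override=True)` exactly as in the setup cell.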


---

# Step 1: Executors and Edges

Learn the foundational patterns for building workflows:
- Two ways to define a unit of work (an Executor node)
- How to connect nodes with edges
- Running workflows and getting outputs

## What is an Executor?

An **Executor** is a node in your workflow that performs a specific task. You can define executors in two ways:

1. **Custom class** that subclasses `Executor` with a `@handler` method
2. **Standalone function** decorated with `@executor`

In [2]:
import asyncio
from typing_extensions import Never
from agent_framework import (
    Executor,
    WorkflowBuilder,
    WorkflowContext,
    executor,
    handler,
)

### Example 1: Custom Executor Class

Create a custom executor that converts text to uppercase:

In [3]:
class UpperCase(Executor):
    """Convert input text to uppercase and forward to the next node."""
    
    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Convert the input to uppercase and forward it to the next node.
        
        Args:
            text: Input string to convert
            ctx: WorkflowContext[str] - expects to send str to downstream nodes
        """
        result = text.upper()
        print(f"UpperCase: '{text}' -> '{result}'")
        
        # Send the result to the next executor in the workflow
        await ctx.send_message(result)

### Example 2: Function-Based Executor

For simpler tasks, use a function decorated with `@executor`:

In [4]:
@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
    """Reverse the input string and yield the workflow output.
    
    Args:
        text: Input string to reverse
        ctx: WorkflowContext[Never, str] - yields str output, doesn't send to downstream
    """
    result = text[::-1]
    print(f"ReverseText: '{text}' -> '{result}'")
    
    # Yield the final output - the workflow will complete when idle
    await ctx.yield_output(result)
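
The `ctx` annotations in the two executors above (`WorkflowContext[str]` and `WorkflowContext[Never, str]`) carry type information in their generic parameters. The sketch below shows the general Python technique for reading such annotations at runtime with `typing.get_type_hints` and `typing.get_args`. `ToyContext` and `describe_handler` are hypothetical names invented for this illustration; how the Agent Framework itself inspects handlers is not shown in this notebook.

```python
from typing import Any, Callable, Generic, TypeVar, get_args, get_type_hints

T_Out = TypeVar("T_Out")


class ToyContext(Generic[T_Out]):
    """Hypothetical stand-in for a context parameterised by its outgoing message type."""


async def to_upper(text: str, ctx: ToyContext[str]) -> None:
    """Handler-shaped function: first parameter is the message, second the context."""


def describe_handler(func: Callable[..., Any]) -> tuple[type, tuple[Any, ...]]:
    """Return (message type, generic arguments of the context annotation)."""
    hints = get_type_hints(func)
    hints.pop("return", None)  # keep only the two parameter annotations
    message_type, context_type = hints.values()
    return message_type, get_args(context_type)


print(describe_handler(to_upper))
```

Here the message type is `str` and the context's single generic argument is also `str`, mirroring the "receives `str`, sends `str`" shape of `UpperCase.to_upper_case`.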

### Build and Run the Workflow

Connect the executors with edges and run the workflow:

In [5]:
async def run_step1_workflow():
    """Build and run a simple 2-step workflow."""
    
    # Create the executor instances
    upper_case = UpperCase(id="upper_case_executor")
    
    # Build the workflow:
    # 1) add_edge(from_node, to_node) defines upper_case -> reverse_text
    # 2) set_start_executor(node) declares the entry point
    # 3) build() returns an immutable Workflow object
    workflow = (
        WorkflowBuilder()
        .add_edge(upper_case, reverse_text)
        .set_start_executor(upper_case)
        .build()
    )
    
    # Run the workflow with initial input
    print("\n🚀 Running Step 1 Workflow...")
    print("=" * 50)
    events = await workflow.run("hello world")
    
    print("=" * 50)
    print(f"\n📤 Workflow Outputs: {events.get_outputs()}")
    print(f"✓ Final State: {events.get_final_state()}")
    
# Run the workflow
await run_step1_workflow()


🚀 Running Step 1 Workflow...
UpperCase: 'hello world' -> 'HELLO WORLD'
ReverseText: 'HELLO WORLD' -> 'DLROW OLLEH'

📤 Workflow Outputs: ['DLROW OLLEH']
✓ Final State: WorkflowRunState.IDLE
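
To make the roles of edges, `send_message`, and `yield_output` concrete, here is a toy model in plain `asyncio`. It is only a mental model under simplifying assumptions (string messages, one message processed at a time), not the Agent Framework's execution engine. All names (`run_toy_workflow`, `Send`, `Node`) are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable

# A toy node receives a message plus two callbacks: one forwards a message
# along outgoing edges, the other records a final workflow output.
Send = Callable[[str], Awaitable[None]]
Node = Callable[[str, Send, Send], Awaitable[None]]


async def upper_case(text: str, send: Send, yield_output: Send) -> None:
    await send(text.upper())  # forward to downstream nodes


async def reverse_text(text: str, send: Send, yield_output: Send) -> None:
    await yield_output(text[::-1])  # terminal node: emit a workflow output


async def run_toy_workflow(
    start: str,
    nodes: dict[str, Node],
    edges: dict[str, list[str]],
    message: str,
) -> list[str]:
    """Deliver messages along edges until no messages remain, then return outputs."""
    outputs: list[str] = []
    queue: asyncio.Queue = asyncio.Queue()
    await queue.put((start, message))

    async def record(value: str) -> None:
        outputs.append(value)

    while not queue.empty():
        node_id, payload = await queue.get()

        # Bind the current node id so forwarded messages follow its edges.
        async def forward(value: str, source: str = node_id) -> None:
            for target in edges.get(source, []):
                await queue.put((target, value))

        await nodes[node_id](payload, forward, record)
    return outputs
```

In a notebook cell this can be called with `await run_toy_workflow("upper", {"upper": upper_case, "reverse": reverse_text}, {"upper": ["reverse"]}, "hello world")`; in a script, wrap the call in `asyncio.run(...)`. The loop ends when the queue is empty, which loosely corresponds to the `IDLE` final state reported above.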


---

# Step 2: Agents in a Workflow (Non-Streaming)

Integrate AI agents into your workflows. This example shows:
- Creating agents with `AzureOpenAIChatClient`
- Using agents as workflow nodes
- Chaining agents together (Writer → Reviewer)

## Use Case: Content Creation Pipeline

1. **Writer Agent**: Creates content based on user request
2. **Reviewer Agent**: Reviews and provides feedback on the content

In [6]:
from agent_framework import AgentRunEvent
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential

In [7]:
async def run_step2_workflow():
    """Build and run a two-agent workflow: Writer then Reviewer."""
    
    # Create the Azure chat client using Azure CLI credentials
    print("🔐 Authenticating with Azure...")
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
    
    # Create specialized agents
    print("🤖 Creating Writer agent...")
    writer_agent = chat_client.create_agent(
        instructions=(
            "You are an excellent content writer. You create new content and "
            "edit contents based on the feedback."
        ),
        name="writer",
    )
    
    print("🤖 Creating Reviewer agent...")
    reviewer_agent = chat_client.create_agent(
        instructions=(
            "You are an excellent content reviewer. "
            "Provide actionable feedback to the writer about the provided content. "
            "Provide the feedback in the most concise manner possible."
        ),
        name="reviewer",
    )
    
    # Build the workflow
    print("🔧 Building workflow...")
    workflow = (
        WorkflowBuilder()
        .set_start_executor(writer_agent)
        .add_edge(writer_agent, reviewer_agent)
        .build()
    )
    
    # Run the workflow
    print("\n🚀 Running Step 2 Workflow...")
    print("=" * 60)
    
    user_request = "Create a slogan for a new electric SUV that is affordable and fun to drive."
    print(f"👤 User Request: {user_request}\n")
    
    events = await workflow.run(user_request)
    
    # Print agent outputs
    for event in events:
        if isinstance(event, AgentRunEvent):
            print(f"\n🤖 {event.executor_id}:")
            print(f"   {event.data}")
    
    print("\n" + "=" * 60)
    print(f"\n📤 Workflow Outputs:\n{events.get_outputs()[0] if events.get_outputs() else 'No outputs'}")
    print(f"\n✓ Final State: {events.get_final_state()}")

# Run the workflow
await run_step2_workflow()

🔐 Authenticating with Azure...
🤖 Creating Writer agent...
🤖 Creating Reviewer agent...
🔧 Building workflow...

🚀 Running Step 2 Workflow...
👤 User Request: Create a slogan for a new electric SUV that is affordable and fun to drive.


🤖 writer:
   Drive Joyfully, Go Electrically—Affordability Meets Adventure.

🤖 reviewer:
   Slogan: "Charge Up Fun—Affordable Adventure in Every Drive."

**Feedback:**  
- Cleverly combines affordability and fun.
- "Charge Up" and "Adventure" reinforce the electric and SUV aspects.
- "Fun" could be presented with a more dynamic verb for energy.
- Consider making it even shorter for memorability. For example: "Affordable Fun. Electric Adventure."


📤 Workflow Outputs:
Slogan: "Charge Up Fun—Affordable Adventure in Every Drive."

**Feedback:**  
- Cleverly com

---

# Step 3: Streaming Workflows

Stream events in real time as your workflow executes:
- See executor invocations as they happen
- Monitor workflow state changes
- Capture outputs and errors immediately

## Custom Executors with Agents

We'll create custom executor classes that wrap agents and use the `@handler` pattern:

In [8]:
from agent_framework import (
    ChatAgent,
    ChatMessage,
    ExecutorFailedEvent,
    WorkflowFailedEvent,
    WorkflowRunState,
    WorkflowStatusEvent,
)
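# Note: this imports from a private module (leading underscores), so the path may change between releases.
from agent_framework._workflow._events import WorkflowOutputEvent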

In [9]:
class Writer(Executor):
    """Custom executor that owns a content generation agent."""
    
    agent: ChatAgent

    def __init__(self, chat_client: AzureOpenAIChatClient, id: str = "writer"):
        # Create a domain-specific agent
        agent = chat_client.create_agent(
            instructions=(
                "You are an excellent content writer. You create new content and "
                "edit contents based on the feedback."
            ),
        )
        super().__init__(agent=agent, id=id)

    @handler
    async def handle(self, message: ChatMessage, ctx: WorkflowContext[list[ChatMessage]]) -> None:
        """Generate content and forward the conversation to the next node.
        
        Args:
            message: Inbound user ChatMessage
            ctx: WorkflowContext expecting list[ChatMessage] downstream
        """
        # Start conversation with the user message
        messages: list[ChatMessage] = [message]
        
        # Run the agent and extend the conversation
        response = await self.agent.run(messages)
        messages.extend(response.messages)
        
        # Forward the accumulated messages to the next executor
        await ctx.send_message(messages)


class Reviewer(Executor):
    """Custom executor that owns a review agent."""
    
    agent: ChatAgent

    def __init__(self, chat_client: AzureOpenAIChatClient, id: str = "reviewer"):
        # Create a review agent
        agent = chat_client.create_agent(
            instructions=(
                "You are an excellent content reviewer. "
                "You review the content and provide feedback to the writer."
            ),
        )
        super().__init__(agent=agent, id=id)

    @handler
    async def handle(self, messages: list[ChatMessage], ctx: WorkflowContext[Never, str]) -> None:
        """Review the conversation and yield the final output.
        
        Args:
            messages: Full conversation history
            ctx: WorkflowContext that yields str output
        """
        response = await self.agent.run(messages)
        await ctx.yield_output(response.text)
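
The key data-flow idea in `Writer` and `Reviewer` is that the conversation grows as it moves through the workflow: the writer appends its reply to the user message, and the reviewer sees the whole history. The sketch below reproduces that pattern with stub functions in place of real agents, so it runs without Azure credentials. `Message`, `stub_writer`, `stub_reviewer`, and `write_then_review` are hypothetical names, not Agent Framework types.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Message:
    role: str
    text: str


async def stub_writer(history: list[Message]) -> list[Message]:
    """Hypothetical writer: replies to the latest message with one assistant message."""
    return [Message("assistant", f"Draft for: {history[-1].text}")]


async def stub_reviewer(history: list[Message]) -> str:
    """Hypothetical reviewer: summarises what it was given."""
    return f"Reviewed {len(history)} messages; last was from {history[-1].role}."


async def write_then_review(request: str) -> str:
    # Start the conversation with the user message, as Writer.handle does.
    messages = [Message("user", request)]
    # Extend the history with the writer's reply before handing it on.
    messages.extend(await stub_writer(messages))
    # The reviewer receives the full accumulated history.
    return await stub_reviewer(messages)
```

In a notebook cell, `await write_then_review("slogan")` returns `"Reviewed 2 messages; last was from assistant."`; in a script, use `asyncio.run(write_then_review("slogan"))`.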

### Run with Streaming

Now let's run the workflow with streaming to see events in real time:

In [10]:
async def run_step3_streaming_workflow():
    """Build and run a streaming workflow with real-time event monitoring."""
    
    # Create the Azure chat client
    print("🔐 Authenticating with Azure...")
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
    
    # Instantiate the custom agent-backed executors
    print("🤖 Creating Writer and Reviewer executors...")
    writer = Writer(chat_client)
    reviewer = Reviewer(chat_client)
    
    # Build the workflow
    print("🔧 Building workflow...")
    workflow = (
        WorkflowBuilder()
        .set_start_executor(writer)
        .add_edge(writer, reviewer)
        .build()
    )
    
    # Run with streaming
    print("\n🚀 Running Step 3 Streaming Workflow...")
    print("=" * 60)
    
    user_message = ChatMessage(
        role="user",
        text="Create a slogan for a new electric SUV that is affordable and fun to drive."
    )
    print(f"👤 User Request: {user_message.text}\n")
    
    # Stream events as they occur
    async for event in workflow.run_stream(user_message):
        if isinstance(event, WorkflowStatusEvent):
            state_emoji = {
                WorkflowRunState.IN_PROGRESS: "⚙️",
                WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS: "⏳",
                WorkflowRunState.IDLE: "💤",
                WorkflowRunState.IDLE_WITH_PENDING_REQUESTS: "⏸️",
            }.get(event.state, "📊")
            print(f"\n{state_emoji} State ({event.origin.value}): {event.state.value}")
            
        elif isinstance(event, WorkflowOutputEvent):
            print(f"\n📤 Workflow Output ({event.origin.value}):")
            print(f"   {event.data}")
            
        elif isinstance(event, ExecutorFailedEvent):
            print(
                f"\n❌ Executor Failed ({event.origin.value}): "
                f"{event.executor_id} - {event.details.error_type}: {event.details.message}"
            )
            
        elif isinstance(event, WorkflowFailedEvent):
            print(
                f"\n❌ Workflow Failed ({event.origin.value}): "
                f"{event.details.error_type}: {event.details.message}"
            )
            
        else:
            print(f"📋 {event.__class__.__name__} ({event.origin.value}): {event}")
    
    print("\n" + "=" * 60)
    print("\n✓ Streaming workflow completed!")

# Run the streaming workflow
await run_step3_streaming_workflow()

🔐 Authenticating with Azure...
🤖 Creating Writer and Reviewer executors...
🔧 Building workflow...

🚀 Running Step 3 Streaming Workflow...
👤 User Request: Create a slogan for a new electric SUV that is affordable and fun to drive.

📋 WorkflowStartedEvent (FRAMEWORK): WorkflowStartedEvent(origin=WorkflowEventSource.FRAMEWORK, data=None)

⚙️ State (FRAMEWORK): IN_PROGRESS
📋 ExecutorInvokedEvent (FRAMEWORK): ExecutorInvokedEvent(executor_id=writer, data=None)
📋 ExecutorCompletedEvent (FRAMEWORK): ExecutorCompletedEvent(executor_id=writer, data=None)
📋 ExecutorInvokedEvent (FRAMEWORK): ExecutorInvokedEvent(executor_id=reviewer, data=None)
📋 ExecutorInvoked
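
The `async for event in workflow.run_stream(...)` loop above follows a common Python pattern: an async generator yields events as they occur, and the consumer dispatches on each event's type. The sketch below shows that pattern in isolation with two made-up event classes. `StatusEvent`, `OutputEvent`, `toy_run_stream`, and `collect` are hypothetical names and do not mirror the framework's internals.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Union


@dataclass
class StatusEvent:
    state: str


@dataclass
class OutputEvent:
    data: str


Event = Union[StatusEvent, OutputEvent]


async def toy_run_stream(text: str) -> AsyncIterator[Event]:
    """Yield events one at a time instead of returning them all at the end."""
    yield StatusEvent("IN_PROGRESS")
    await asyncio.sleep(0)  # hand control back to the event loop, as awaiting I/O would
    yield OutputEvent(text.upper())
    yield StatusEvent("IDLE")


async def collect(text: str) -> list[str]:
    """Consume the stream and dispatch on event type, like the loop in Step 3."""
    seen: list[str] = []
    async for event in toy_run_stream(text):
        if isinstance(event, StatusEvent):
            seen.append(f"state:{event.state}")
        elif isinstance(event, OutputEvent):
            seen.append(f"output:{event.data}")
    return seen
```

Because each event is handled as soon as it is yielded, a consumer can log progress or react to failures before the run finishes. Call it with `await collect("hi")` in a notebook or `asyncio.run(collect("hi"))` in a script.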

---

## Summary

You've learned the three key patterns for working with workflows:

### Step 1: Executors and Edges
- ✅ Create executors as classes or functions
- ✅ Connect nodes with `add_edge()`
- ✅ Use `WorkflowContext` to send messages and yield outputs

### Step 2: Agents in Workflows
- ✅ Integrate AI agents as workflow nodes
- ✅ Chain multiple agents together
- ✅ Collect outputs from terminal nodes

### Step 3: Streaming
- ✅ Monitor workflows in real time with `run_stream()`
- ✅ Handle different event types
- ✅ Track workflow state changes

## Next Steps

- Explore more complex workflow patterns with branching and conditions
- Add error handling and retry logic
- Integrate custom tools and external APIs
- Build multi-agent systems with specialized roles