# Agents in Workflows (Python) Tutorial Notebook

This notebook walks through the steps from the Microsoft Learn tutorial: **Agents in Workflows**.

Reference: https://learn.microsoft.com/en-us/agent-framework/tutorials/workflows/agents-in-workflows?pivots=programming-language-python

You will learn how to:
- Create agents backed by Azure AI (Foundry) or Azure OpenAI.
- Connect them in a sequential workflow (Writer → Reviewer).
- Run the workflow in streaming mode (token updates) and non‑streaming mode.
- Understand events (`AgentRunUpdateEvent`, `WorkflowOutputEvent`).
- Manage async resource lifetimes correctly with `AsyncExitStack`.

## Prerequisites
1. Python 3.10+
2. Install packages (choose the one(s) you need):
   - `pip install agent-framework-azure-ai` (for `AzureAIAgentClient`)
   - `pip install agent-framework-core` (core abstractions if not already included)
   - `pip install python-dotenv azure-identity`
3. Azure CLI authenticated: `az login`
4. Environment variables for model backends. Examples (adapt to your setup):
   - Azure AI Agent Service (Foundry style):
     - `AZURE_AI_CHAT_MODEL` (e.g. gpt-4o-mini)
     - Optionally endpoint variables if required by your deployment.
   - Azure OpenAI (for non‑streaming example using `AzureOpenAIChatClient`):
     - `AZURE_OPENAI_ENDPOINT`
     - `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME`
5. (Optional) A `.env` file in the notebook's working directory to simplify configuration (it is loaded from `Path.cwd()`).

> If you only want the streaming sample, you can skip the Azure OpenAI specific variables.
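
Before running anything, it can help to see which samples your environment is ready for. The following is a minimal sketch that only uses the standard library; the variable names are the ones listed above and may differ for your deployment, and `missing_vars` is a hypothetical helper, not part of the agent framework.

```python
import os

# Variable names are taken from the prerequisites list; adapt them to your setup.
REQUIRED_VARS = {
    "streaming (Azure AI)": ["AZURE_AI_CHAT_MODEL"],
    "non-streaming (Azure OpenAI)": [
        "AZURE_OPENAI_ENDPOINT",
        "AZURE_OPENAI_CHAT_DEPLOYMENT_NAME",
    ],
}


def missing_vars(env=None):
    """Return {sample name: [variable names that are unset or empty]}."""
    env = os.environ if env is None else env
    return {
        sample: [name for name in names if not env.get(name)]
        for sample, names in REQUIRED_VARS.items()
    }


for sample, missing in missing_vars().items():
    status = "ok" if not missing else "missing " + ", ".join(missing)
    print(f"{sample}: {status}")
```

Run it after loading your `.env` file so that values defined there are taken into account.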

In [1]:
# (Optional) Installation helper – comment out if already installed.
# %pip install agent-framework-azure-ai python-dotenv azure-identity
# %pip install agent-framework-core
import sys, platform, importlib
print(f'Python: {sys.version.split()[0]}  Platform: {platform.system()}')
for pkg in ('agent_framework', 'azure.identity'):
    try:
        importlib.import_module(pkg)
        print(f'✓ {pkg} available')
    except ImportError:
        print(f'✗ {pkg} missing – install if required')

Python: 3.13.7  Platform: Darwin
✓ agent_framework available
✓ azure.identity available


## Step 1: Imports (Streaming Example)
The streaming variant uses `AzureAIAgentClient`, which manages Azure AI agents and provides streaming token updates via `AgentRunUpdateEvent`.

In [2]:
import asyncio
from collections.abc import Awaitable, Callable
from contextlib import AsyncExitStack
from typing import Any
from pathlib import Path

from agent_framework import AgentRunUpdateEvent, WorkflowBuilder, WorkflowOutputEvent
from agent_framework.azure import AzureAIAgentClient, AzureOpenAIChatClient
from azure.identity.aio import AzureCliCredential as AsyncAzureCliCredential
from azure.identity import AzureCliCredential
from dotenv import load_dotenv
import os

# Load .env if present (non-fatal).
env_path = Path.cwd() / '.env'
if env_path.exists():
    load_dotenv(env_path)
    print('Loaded .env file')
else:
    print('No .env file found – ensure required environment variables are set.')

Loaded .env file


## Step 2: Create an Azure AI Agent Factory (`AsyncExitStack`)
The factory enters the credential and the client on a single `AsyncExitStack`, so every agent shares them and one `close()` call tears everything down in reverse order of creation.

In [3]:
async def create_azure_ai_agent_factory() -> tuple[Callable[..., Awaitable[Any]], Callable[[], Awaitable[None]]]:
    """Returns (agent_creator, close)
    agent_creator(**kwargs) -> created agent (async context entered)
    close() -> gracefully close all contexts
    """
    stack = AsyncExitStack()
    cred = await stack.enter_async_context(AsyncAzureCliCredential())
    client = await stack.enter_async_context(AzureAIAgentClient(async_credential=cred))

    async def agent_creator(**kwargs: Any) -> Any:
        return await stack.enter_async_context(client.create_agent(**kwargs))

    async def close() -> None:
        await stack.aclose()

    return agent_creator, close
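
To see what `AsyncExitStack` does for the factory above, here is a small sketch with no Azure dependencies. The `resource` context manager is a hypothetical stand-in for the credential, client, and agent; it only records when it is opened and closed.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager


@asynccontextmanager
async def resource(name: str, log: list[str]):
    """Hypothetical stand-in for a credential, client, or agent."""
    log.append(f"open {name}")
    try:
        yield name
    finally:
        log.append(f"close {name}")


async def demo() -> list[str]:
    log: list[str] = []
    stack = AsyncExitStack()
    await stack.enter_async_context(resource("credential", log))
    await stack.enter_async_context(resource("client", log))
    await stack.enter_async_context(resource("agent", log))
    # A single aclose() unwinds every context, last entered first.
    await stack.aclose()
    return log


print(asyncio.run(demo()))
# ['open credential', 'open client', 'open agent',
#  'close agent', 'close client', 'close credential']
```

Inside a notebook, where an event loop is already running, call `await demo()` instead of `asyncio.run(demo())`.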

## Steps 3–5: Create Agents, Build Workflow, Stream Execution
We create Writer and Reviewer agents, connect them with an edge, and stream token updates (`AgentRunUpdateEvent`) as each agent responds.

In [4]:
async def run_streaming_workflow(prompt: str):
    agent_factory, close = await create_azure_ai_agent_factory()
    last_executor_id: str | None = None
    try:
        writer = await agent_factory(
            name="Writer",
            instructions=("You are an excellent content writer. You create new content and edit contents based on feedback."),
        )
        reviewer = await agent_factory(
            name="Reviewer",
            instructions=(
                "You are an excellent content reviewer. Provide actionable, concise feedback to improve the content."
            ),
        )
        workflow = WorkflowBuilder().set_start_executor(writer).add_edge(writer, reviewer).build()
        print('--- Streaming run start ---')
        async for event in workflow.run_stream(prompt):
            if isinstance(event, AgentRunUpdateEvent):
                if event.executor_id != last_executor_id:
                    if last_executor_id is not None:
                        print()
                    print(f"{event.executor_id}: ", end='', flush=True)
                    last_executor_id = event.executor_id
                print(event.data, end='', flush=True)
            elif isinstance(event, WorkflowOutputEvent):
                print("\n===== Final Output =====")
                print(event.data)
        print("\n--- Streaming run complete ---")
    finally:
        await close()

# Example invocation (uncomment to run once credentials & env set)
# await run_streaming_workflow("Create a slogan for a new electric SUV that is affordable and fun to drive.")

In [5]:
# Run the streaming workflow (execute this cell after configuring credentials)
# (You can skip this if you plan to use the widget version below.)
prompt = "Create a slogan for a new electric SUV that is affordable and fun to drive."
try:
    asyncio.get_running_loop()
    loop_running = True
except RuntimeError:
    loop_running = False
if loop_running:  # Jupyter: patch the running loop so we can block on the coroutine
    import nest_asyncio
    nest_asyncio.apply()
    asyncio.get_event_loop().run_until_complete(run_streaming_workflow(prompt))
else:  # Plain Python script: no loop is running yet
    asyncio.run(run_streaming_workflow(prompt))

--- Streaming run start ---
Writer: Electrify Your Drive: Affordable Adventure, Pure Fun.
Reviewer: Drive Fun Forward: Affordable Electric Adventure Awaits.
===== Final Output =====
Drive Fun Forward: Affordable Electric Adventure Awaits.

--- Streaming run complete ---
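
The printing logic in the streaming loop groups consecutive chunks under the executor that produced them. The sketch below isolates that logic so it can be tried without credentials; `FakeUpdate` is a hypothetical stand-in that carries only the two attributes the loop reads (`executor_id` and `data`), and `render` is an illustrative helper.

```python
from dataclasses import dataclass


@dataclass
class FakeUpdate:
    """Hypothetical stand-in for AgentRunUpdateEvent (executor_id + data chunk)."""
    executor_id: str
    data: str


def render(updates) -> str:
    """Format chunks the same way the streaming loop prints them."""
    out: list[str] = []
    last_executor_id = None
    for event in updates:
        if event.executor_id != last_executor_id:
            if last_executor_id is not None:
                out.append("\n")  # start a new line when the speaker changes
            out.append(f"{event.executor_id}: ")
            last_executor_id = event.executor_id
        out.append(str(event.data))
    return "".join(out)


chunks = [
    FakeUpdate("Writer", "Electrify "),
    FakeUpdate("Writer", "Your Drive"),
    FakeUpdate("Reviewer", "Drive Fun "),
    FakeUpdate("Reviewer", "Forward"),
]
print(render(chunks))
# Writer: Electrify Your Drive
# Reviewer: Drive Fun Forward
```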


## Optional: Add a Summarizer Agent
This variant adds a third **Summarizer** agent after the Reviewer. It produces a concise final summary and an improved slogan.

In [6]:
async def run_streaming_workflow_with_summarizer(prompt: str):
    """Run a 3-agent streaming workflow: Writer -> Reviewer -> Summarizer.

    The Summarizer produces:
      1. A refined final slogan (prefixed with 'Final Slogan:')
      2. A brief (<=3 bullets) summary of the reviewer's key feedback points.
    """
    agent_factory, close = await create_azure_ai_agent_factory()
    last_executor_id: str | None = None
    try:
        writer = await agent_factory(
            name="Writer",
            instructions=("You are an excellent content writer. You create new content and edit contents based on feedback."),
        )
        reviewer = await agent_factory(
            name="Reviewer",
            instructions=("You are an excellent content reviewer. Provide actionable, concise feedback to improve the content."),
        )
        summarizer = await agent_factory(
            name="Summarizer",
            instructions=("You are an expert summarizer. Given the writer's content and the reviewer's feedback: "
                        "1) Produce an improved single-line final slogan prefixed with 'Final Slogan:'. "
                        "2) Provide at most 3 concise bullet points summarizing key feedback themes. "
                        "Be succinct."),
        )
        workflow = (
            WorkflowBuilder()
            .set_start_executor(writer)
            .add_edge(writer, reviewer)
            .add_edge(reviewer, summarizer)
            .build()
        )
        print('--- Streaming run (with summarizer) start ---')
        async for event in workflow.run_stream(prompt):
            if isinstance(event, AgentRunUpdateEvent):
                if event.executor_id != last_executor_id:
                    if last_executor_id is not None:
                        print()
                    print(f"{event.executor_id}: ", end='', flush=True)
                    last_executor_id = event.executor_id
                print(event.data, end='', flush=True)
            elif isinstance(event, WorkflowOutputEvent):
                print("\n===== Final Output (Summarized) =====")
                print(event.data)
        print("\n--- Streaming run (with summarizer) complete ---")
    finally:
        await close()
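
As pipelines grow, the chain of `add_edge` calls can be generated from a list. The following is a sketch of a hypothetical `chain` helper; it assumes only what the cell above demonstrates, namely that `set_start_executor()` and `add_edge()` each return a builder that can be chained. `RecordingBuilder` is a stand-in so the example runs without Azure.

```python
def chain(builder, executors):
    """Wire executors into a linear pipeline: first -> second -> ... -> last.

    `builder` is assumed to expose set_start_executor() and add_edge(),
    each returning a builder, as the chained calls above suggest.
    """
    if not executors:
        raise ValueError("need at least one executor")
    builder = builder.set_start_executor(executors[0])
    for source, target in zip(executors, executors[1:]):
        builder = builder.add_edge(source, target)
    return builder


class RecordingBuilder:
    """Hypothetical stand-in that records calls instead of building a workflow."""

    def __init__(self):
        self.start = None
        self.edges = []

    def set_start_executor(self, executor):
        self.start = executor
        return self

    def add_edge(self, source, target):
        self.edges.append((source, target))
        return self


b = chain(RecordingBuilder(), ["writer", "reviewer", "summarizer"])
print(b.start, b.edges)
# writer [('writer', 'reviewer'), ('reviewer', 'summarizer')]
```

Under that assumption, `chain(WorkflowBuilder(), [writer, reviewer, summarizer]).build()` would produce the same wiring as the explicit calls in the function above.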



## Interactive Prompt & Agent Selector
Use the widget below to enter a custom prompt and choose whether to include the Summarizer agent.

In [7]:
# (Optional) Install ipywidgets if not already available
# You can run the following in a separate cell if missing:
# %pip install ipywidgets nest_asyncio
import asyncio
from IPython.display import display

try:
    import ipywidgets as widgets  # type: ignore
except ModuleNotFoundError:
    raise ModuleNotFoundError("ipywidgets is not installed. Install with `%pip install ipywidgets nest_asyncio` and re-run this cell.")

# nest_asyncio lets us reuse the running loop if needed (e.g., some hosted notebook envs)
try:
    import nest_asyncio  # type: ignore
except ModuleNotFoundError:
    nest_asyncio = None

prompt_input = widgets.Textarea(
    value="Create a slogan for a new electric SUV that is affordable and fun to drive.",
    description="Prompt:",
    layout=widgets.Layout(width='100%', height='120px')
)
use_summarizer = widgets.Checkbox(value=True, description="Include Summarizer agent")
run_button = widgets.Button(description="Run Workflow", button_style="primary")
output_area = widgets.Output(layout={'border': '1px solid #ccc'})
status_label = widgets.HTML(value="")


def _ensure_loop_patch():
    # If an event loop is already running (common in Jupyter), patch it for re-entrancy.
    if nest_asyncio:
        try:
            asyncio.get_running_loop()
            nest_asyncio.apply()
        except RuntimeError:
            pass


def _run_clicked(_: widgets.Button):
    output_area.clear_output()
    status_label.value = "<em>Running...</em>"
    prompt = prompt_input.value.strip() or "Hello"

    async def runner():
        try:
            if use_summarizer.value:
                await run_streaming_workflow_with_summarizer(prompt)
            else:
                await run_streaming_workflow(prompt)
            status_label.value = "<span style='color:green'>Done.</span>"
        except Exception as ex:  # noqa: BLE001 broad but user-facing
            status_label.value = f"<span style='color:red'>Error: {type(ex).__name__}: {ex}</span>"

    _ensure_loop_patch()
    try:
        loop = asyncio.get_running_loop()
        loop.create_task(_run_and_capture(runner))
    except RuntimeError:
        # No running loop, safe to run directly (still capturing output in the widget)
        asyncio.run(_run_and_capture(runner))


async def _run_and_capture(coro_factory):
    # Route everything the workflow prints into the Output widget.
    with output_area:
        await coro_factory()


run_button.on_click(_run_clicked)

ui = widgets.VBox([
    prompt_input,
    use_summarizer,
    run_button,
    status_label,
    output_area,
])

display(ui)

VBox(children=(Textarea(value='Create a slogan for a new electric SUV that is affordable and fun to drive.', d…

In [8]:
# One-time install helper (run this cell if ipywidgets isn't installed, then re-run the widget cell)
# %pip install ipywidgets nest_asyncio
# After installation, you may need to restart the kernel in some environments.

## Non-Streaming Variant (Azure OpenAI)
The repository already includes a non‑streaming script (`step2_agents_in_a_workflow.py`).
Below is an adapted version in notebook form using `AzureOpenAIChatClient`. It collects full responses (`AgentRunEvent`) after completion.

In [11]:
from agent_framework import AgentRunEvent

async def run_non_streaming_workflow(prompt: str):
    # Requires AZURE_OPENAI_ENDPOINT & AZURE_OPENAI_CHAT_DEPLOYMENT_NAME
    endpoint = os.environ.get('AZURE_OPENAI_ENDPOINT')
    deployment = os.environ.get('AZURE_OPENAI_CHAT_DEPLOYMENT_NAME')
    if not endpoint or not deployment:
        raise EnvironmentError('Missing required Azure OpenAI environment variables.')
    chat_client = AzureOpenAIChatClient(
        endpoint=endpoint,
        deployment_name=deployment,
        credential=AzureCliCredential()
    )
    writer = chat_client.create_agent(
        instructions='You are an excellent content writer. You create new content and edit contents based on the feedback.',
        name='writer',
    )
    reviewer = chat_client.create_agent(
        instructions=(
            'You are an excellent content reviewer. Provide actionable feedback to the writer about the provided content. Provide the feedback concisely.'
        ),
        name='reviewer',
    )
    workflow = WorkflowBuilder().set_start_executor(writer).add_edge(writer, reviewer).build()
    events = await workflow.run(prompt)
    for evt in events:
        if isinstance(evt, AgentRunEvent):
            print(f"{evt.executor_id}: {evt.data}")
    print('='*60)
    print('Workflow Outputs:', events.get_outputs())
    print('Final State:', events.get_final_state())

# Example invocation (uncomment to run)
# asyncio.run(run_non_streaming_workflow('Create a slogan for a new electric SUV that is affordable and fun to drive.'))

In [12]:
# Run the non‑streaming workflow (execute after setting Azure OpenAI variables)
prompt = 'Create a slogan for a new electric SUV that is affordable and fun to drive.'
try:
    asyncio.get_running_loop()
    loop_running = True
except RuntimeError:
    loop_running = False

try:
    if loop_running:  # Jupyter: patch the running loop so we can block on the coroutine
        import nest_asyncio
        nest_asyncio.apply()
        asyncio.get_event_loop().run_until_complete(run_non_streaming_workflow(prompt))
    else:  # Plain Python script: no loop is running yet
        asyncio.run(run_non_streaming_workflow(prompt))
except EnvironmentError as e:
    print('Skipping non-streaming run –', e)

writer: Drive Fun, Go Far—Electrify Your Journey, Affordably.
reviewer: Good slogan! Consider making it snappier for memorability. Try focusing on a direct call to action and the vehicle’s key strengths. For example: "Electrify Your Drive—Affordable Adventure Awaits."
Workflow Outputs: ['Good slogan! Consider making it snappier for memorability. Try focusing on a direct call to action and the vehicle’s key strengths. For example: "Electrify Your Drive—Affordable Adventure Awaits."']
Final State: WorkflowRunState.IDLE
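
The run cells rely on `nest_asyncio` when an event loop is already running. One alternative, sketched here using only the standard library, is to run the coroutine on a fresh loop in a worker thread. `run_blocking` is a hypothetical helper, and the approach assumes the coroutine creates all of its own clients, since objects bound to the outer loop cannot be shared across loops.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def run_blocking(coro_factory):
    """Run an async callable to completion from synchronous code."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread (plain script): asyncio.run is safe.
        return asyncio.run(coro_factory())
    # A loop is already running (e.g. Jupyter): use a new loop in a worker thread.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(lambda: asyncio.run(coro_factory())).result()


async def answer() -> int:
    await asyncio.sleep(0)
    return 42


print(run_blocking(answer))
# 42
```

With the functions defined earlier, a call would look like `run_blocking(lambda: run_non_streaming_workflow(prompt))`. Note that the calling thread blocks until the workflow finishes.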


## How It Works & Key Concepts
| Concept | Description |
|---------|-------------|
| Sequential Workflow | `WorkflowBuilder().set_start_executor(A).add_edge(A,B)` defines a linear pipeline. |
| Agents as Executors | Agents are wrapped so their responses become workflow events automatically. |
| Streaming Events | `AgentRunUpdateEvent` emits token chunks for responsive UX. |
| Non-Streaming Events | `AgentRunEvent` contains the full response after the agent finishes. |
| Final Output | `WorkflowOutputEvent` (streaming) or `events.get_outputs()` on the result of `workflow.run(...)` (non-streaming). |
| Resource Mgmt | `AsyncExitStack` centralizes setup/teardown of clients & credentials. |

### Choosing Streaming vs Non-Streaming
Use streaming when you want incremental UI updates; use non-streaming for simpler batch-style execution or for logging full results at once.
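
The two modes are not mutually exclusive: streamed chunks can be accumulated into full texts for logging while still being shown incrementally. A minimal sketch, assuming each update has been reduced to an `(executor_id, chunk)` pair; `collect` is a hypothetical helper.

```python
def collect(updates):
    """Join streamed chunks into one full text per executor, in first-seen order."""
    parts_by_executor: dict[str, list[str]] = {}
    for executor_id, chunk in updates:
        parts_by_executor.setdefault(executor_id, []).append(chunk)
    return {
        executor_id: "".join(parts)
        for executor_id, parts in parts_by_executor.items()
    }


stream = [
    ("Writer", "Electrify "),
    ("Writer", "Your Drive"),
    ("Reviewer", "Drive Fun "),
    ("Reviewer", "Forward"),
]
print(collect(stream))
# {'Writer': 'Electrify Your Drive', 'Reviewer': 'Drive Fun Forward'}
```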

### Next Ideas
- Extend the pipeline beyond the optional *Summarizer* agent shown above, or adjust its instructions to compress Reviewer feedback differently.
- Introduce custom executors that transform text (e.g., word count, sentiment).
- Integrate a human-in-the-loop step (see the separate request/response tutorial).