# Workflows in Agent Framework

### Understanding Workflows in Agent Framework

- **AI Agent**: An AI agent is powered by a large language model (LLM) and can access various tools to accomplish tasks. Its behavior is dynamic - the steps it takes are determined by the LLM based on the context of the conversation and the instructions provided. Agents are great for handling open-ended tasks where flexibility and reasoning are key.
- **Workflow**: A workflow is a predefined sequence of operations designed to handle more structured processes. It can include AI agents as components, along with human-in-the-loop steps and integrations with external systems. Unlike an agent’s dynamic decision-making, a workflow’s execution path is explicitly defined, giving you greater control and predictability. Workflows are ideal for orchestrating complex business processes that require coordination across multiple agents and systems.

### Key Features
- Type Safety: Strong typing ensures messages flow correctly between components, with comprehensive validation that prevents runtime errors.
- Flexible Control Flow: Graph-based architecture allows for intuitive modeling of complex workflows with `executors` and `edges`. Conditional routing, parallel processing, and dynamic execution paths are all supported.
- External Integration: Built-in request/response patterns for seamless integration with external APIs, and human-in-the-loop scenarios.
- Checkpointing: Save workflow states via checkpoints, enabling recovery and resumption of long-running processes on server sides.
- **Multi-Agent Orchestration**: Built-in patterns for coordinating multiple AI agents, including sequential, concurrent, hand-off, and magentic.

### Core Concepts
1. **Executors**: represent individual processing units within a workflow. They can be AI agents or custom logic components. They receive input messages, perform specific tasks, and produce output messages.
2. **Edges**: define the connections between executors, determining the flow of messages. They can include conditions to control routing based on message contents.
3. **Workflows**: are directed graphs composed of executors and edges. They define the overall process, starting from an initial executor and proceeding through various paths based on conditions and logic defined in the edges.

### Basic Executor Structure
Executors are the fundamental building blocks that process messages in a workflow. They are autonomous processing units that receive typed messages, perform operations, and can produce output messages or events.

Executors inherit from the `Executor` base class. Each executor has a unique identifier and can handle specific message types using methods decorated with the `@handler` decorator. 
Handlers must have the proper type annotations to specify the type of messages they can process.


In [None]:
from agent_framework import (
    Executor,
    WorkflowContext,
    handler,
    executor
)

# Executors inherit from the Executor base class. Each executor represents a node in the workflow graph.
# --- Approach 1: Class-based Executor ---
class UpperCase(Executor):

    @handler
    # Handler signature contract:
    # - First parameter is the typed input to this node (here: text: str)
    # - Second parameter is a WorkflowContext[T_Out], where T_Out is the type of data the node will emit via ctx.send_message  
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:    
        """Within a handler you typically:
        - Compute a result
        - Forward that result to downstream node(s) using ctx.send_message(result)"""
        await ctx.send_message(text.upper())    # Send the transformed message to the next executor in the workflow

# --- Approach 2: Function-based Executor --- 

# For simple steps you can skip subclassing and define an async function with the
# same signature pattern (typed input + WorkflowContext[T_Out, T_W_Out]) and decorate it with
# @executor. This creates a fully functional node that can be wired into a flow.
@executor(id="upper_case_executor")
async def upper_case(text: str, ctx: WorkflowContext[str]) -> None:
    """Convert the input to uppercase and forward it to the next node.

    Note: The WorkflowContext is parameterized with the type this handler will
    emit. Here WorkflowContext[str] means downstream nodes should expect str.
    """
    await ctx.send_message(text.upper())

### The `WorkflowContext` Object
The `WorkflowContext` object provides methods for the handler to interact with the workflow during execution. The `WorkflowContext` is parameterized with the type of messages the handler will emit and the type of outputs it can yield.

The most commonly used method is `send_message`, which allows the handler to send messages to connected executors.
A handler can use yield_output to produce outputs that will be considered as workflow outputs and be returned/streamed to the caller as an output event.
If a handler neither sends messages nor yields outputs, no type parameter is needed for `WorkflowContext`


In [None]:
from agent_framework import WorkflowContext
from typing_extensions import Never

class SomeHandler(Executor):

    @handler
    async def send_message_example(self, message: str, ctx: WorkflowContext[str]) -> None:
        """
        Forward an intermediate message to the next executor.
        WorkflowContext[str] means downstream expects a string.
        """
        await ctx.send_message("Hello, World!")  # Sends to next step

    @handler
    async def yield_output_example(self, message: str, ctx: WorkflowContext[Never, str]) -> None:
        """
        Yield a final output without sending downstream.
        WorkflowContext[Never, str] means no intermediate messages, only final output of type str.
        """
        await ctx.yield_output("Hello, World!")  # Ends workflow

    @handler
    async def no_output_example(self, message: str, ctx: WorkflowContext) -> None:
        """
        Perform work without sending or yielding anything.
        WorkflowContext without type parameters means no strict typing.
        """
        print("Doing some work...")  # Side effect only

### Edge Patterns
Edges define how messages flow between executors with optional conditions. They represent the connections in the workflow graph and determine the data flow paths.

The framework supports several edge patterns:

1. **Direct Edges**: Simple one-to-one connections between executors
2. **Conditional Edges**: Edges with conditions that determine when messages should flow
3. **Fan-out Edges**: One executor sending messages to multiple targets
4. **Fan-in Edges**: Multiple executors sending messages to a single target

Explore implementation of all edge patterns in detail [here](https://learn.microsoft.com/en-us/agent-framework/user-guide/workflows/core-concepts/edges?pivots=programming-language-python.) 

In [None]:
from agent_framework import WorkflowBuilder, WorkflowContext, executor

# Define three executors
@executor(id="upper_case_executor")
async def upper_case(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(text.upper())

@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.yield_output(text[::-1])

@executor(id="append_exclamation_executor")
async def append_exclamation(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.yield_output(text + "!!!")

# Build a simple workflow with edges
workflow = (
    WorkflowBuilder()
    # Direct edge: always forward messages to reverse_text
    .add_edge(upper_case, reverse_text)
    # Conditional edge: forward to append_exclamation only if text length > 5
    .add_edge(upper_case, append_exclamation, condition=lambda msg: len(msg) > 5)
    .set_start_executor(upper_case)
    .build()
)

### Workflows
A **Workflow** ties everything together and manages execution. It's the orchestrator that coordinates executor execution, message routing, and event streaming.

Workflows are constructed using the `WorkflowBuilder` class, which provides a fluent API for defining the workflow structure. It is a general-purpose builder for creating workflows with any orchestration pattern.

#### Example - Create a Simple Sequential Workflow
Now let's put it all together and create our first workflow with two executors:

In [None]:
from agent_framework import (
    Executor,
    WorkflowBuilder,
    WorkflowContext,
    executor,
    handler,
)
from typing_extensions import Never

# Subclassing Executor lets you define a named node with lifecycle hooks if needed.
class UpperCase(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    # The work itself is implemented in an async method decorated with @handler.
    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Convert the input to uppercase and forward it to the next node.

        Note: The WorkflowContext is parameterized with the type this handler will
        emit. Here WorkflowContext[str] means downstream nodes should expect str.
        """
        result = text.upper()

        # Send the result to the next executor in the workflow.
        await ctx.send_message(result)


# For simple steps you can skip subclassing and define an async function with the
# same signature pattern (typed input + WorkflowContext[T_Out, T_W_Out]) and decorate it with @executor.
@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
    """Reverse the input string and yield the workflow output.

    This node yields the final output using ctx.yield_output(result).
    The workflow will complete when it becomes idle (no more work to do).

    """
    result = text[::-1]

    # Yield the output - the workflow will complete when idle
    await ctx.yield_output(result)

async def main():
    """Build and run a simple 2-step workflow using the fluent builder API."""

    upper_case = UpperCase(id="upper_case_executor")

    # Build the workflow using a fluent pattern:
    # 1) add_edge(from_node, to_node) defines a directed edge upper_case -> reverse_text
    # 2) set_start_executor(node) declares the entry point
    # 3) build() finalizes and returns an immutable Workflow object
    workflow = (WorkflowBuilder()
                .add_edge(upper_case, reverse_text)
                .set_start_executor(upper_case)
                .build()
    )

    # Run the workflow by sending the initial message to the start node.
    # The run(...) call returns an event collection; its get_outputs() method
    # retrieves the outputs yielded by any terminal nodes.
    events = await workflow.run("hello world")
    print(events.get_outputs())

    # Summarize the final run state (e.g., COMPLETED)
    print("Final state:", events.get_final_state())

await main()

### Workflow Events
In Agent Framework, workflows are not just about passing data between executors - they also emit events during execution. 

These events allow developers to:

- Observe workflow progress (e.g., when an executor starts or completes).
- React to intermediate states (e.g., log outputs, trigger side effects).
- Handle errors gracefully (e.g., capture exceptions without breaking the workflow).

There are built-in events that provide observability into the workflow execution:

```python
# Workflow lifecycle events
WorkflowStartedEvent    # Workflow execution begins
WorkflowOutputEvent     # Fired when the workflow produces a final output
WorkflowErrorEvent      # Workflow encounters an error

# Executor events
ExecutorInvokeEvent     # Fired when an executor is invoked
ExecutorCompleteEvent   # Fired when an executor finishes

# Request events
RequestInfoEvent        # A request is issued

```

In addition to built-in events, you can define and emit your own custom events during workflow execution. This is useful for fine-grained monitoring, custom logging or metrics for specific steps and external action triggers. Learn more about custom events in the [documentation](https://learn.microsoft.com/en-us/agent-framework/user-guide/workflows/core-concepts/events?pivots=programming-language-python).


In [None]:
from typing_extensions import Never
from agent_framework import WorkflowBuilder, WorkflowContext, executor
from agent_framework import (
    WorkflowOutputEvent,
)

@executor(id="upper_case_executor")
async def to_upper_case(text: str, ctx: WorkflowContext[str]) -> None:
    """Transform the input to uppercase and forward it to the next step."""
    result = text.upper()

    # Send the intermediate result to the next executor
    await ctx.send_message(result)

@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
    """Reverse the input and yield the workflow output."""
    result = text[::-1]

    # Yield the final output for this workflow run
    await ctx.yield_output(result)

# TODO: Build the workflow

async def main():
    
    # Run the workflow, iterate over events and stream them
    async for event in workflow.run_stream("hello world"):
        print(f"Event: {event}")

        if isinstance(event, WorkflowOutputEvent):
            print(f"Workflow completed with result: {event.data}")

await main()


<details>
  <summary>See the solution</summary>
  
  ```python

from typing_extensions import Never
from agent_framework import WorkflowBuilder, WorkflowContext, executor
from agent_framework import (
    WorkflowOutputEvent,
)

@executor(id="upper_case_executor")
async def to_upper_case(text: str, ctx: WorkflowContext[str]) -> None:
    """Transform the input to uppercase and forward it to the next step."""
    result = text.upper()

    # Send the intermediate result to the next executor
    await ctx.send_message(result)

@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
    """Reverse the input and yield the workflow output."""
    result = text[::-1]

    # Yield the final output for this workflow run
    await ctx.yield_output(result)

workflow = (
    WorkflowBuilder()
    .add_edge(to_upper_case, reverse_text)
    .set_start_executor(to_upper_case)
    .build()
)

async def main():
    # Run the workflow, iterate over events and stream them
    async for event in workflow.run_stream("hello world"):
        print(f"Event: {event}")

        if isinstance(event, WorkflowOutputEvent):
            print(f"Workflow completed with result: {event.data}")

await main()

```
</details>

### Exercise - Concurrent Workflows
Before we jump into using agents in a workflow, let's reinforce our understanding of workflow orchestration concepts and create a concurrent workflow. 
You'll learn to implement fan-out and fan-in patterns that enable parallel processing, allowing multiple executors or agents to work simultaneously and then aggregate their results.

You'll create a workflow that:

1. Takes a list of numbers as input
2. Distributes the list to two parallel executors (one calculating average, one calculating sum)
3. Aggregates the different result types (float and int) into a final output
4. Demonstrates how the framework handles different result types from concurrent executors

In [None]:
import random
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler
from typing_extensions import Never


# Define your Executors
class Dispatcher(Executor):
    """Dispatch the input list to other executors."""
    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[list[int]]):
        # TODO: Validate input and send message to downstream executors
        pass
    

class Average(Executor):
    """Calculate the average of a list of integers."""
    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[float]):
        # TODO: Compute average and send message
        pass


class Sum(Executor):
    """Calculate the sum of a list of integers."""
    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[int]):
        # TODO: Compute sum and send message
        pass


class Aggregator(Executor):
    """Aggregate the results from the different tasks and yield the final output."""
    @handler
    async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]):
        """Receive results from upstream executors and yield final output."""
        # TODO: Yield aggregated results
        pass


# Build and Run the Workflow
async def main() -> None:
    # Generate random input
    numbers = [random.randint(1, 100) for _ in range(10)]
    print(f"Input numbers: {numbers}")
    print("=" * 60)

    # 1) Create the executors
    # TODO: Instantiate Dispatcher, Average, Sum, Aggregator

    # 2) Build a simple fan-out and fan-in workflow
    # TODO: Use WorkflowBuilder to connect executors

    # 3) Run the workflow and collect output
    # TODO: Stream events, detect WorkflowOutputEvent, and print final results

# Execute main
await main()


<details>
  <summary>See the solution</summary>
  
  ```python

import random
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler
from typing_extensions import Never

class Dispatcher(Executor):
    """Dispatch the input list to other executors."""
    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[list[int]]):
        if not numbers:
            raise RuntimeError("Input must be a valid list of integers.")

        await ctx.send_message(numbers)

class Average(Executor):
    """Calculate the average of a list of integers."""

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[float]):
        average: float = sum(numbers) / len(numbers)
        await ctx.send_message(average)


class Sum(Executor):
    """Calculate the sum of a list of integers."""

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[int]):
        total: int = sum(numbers)
        await ctx.send_message(total)


class Aggregator(Executor):
    """Aggregate the results from the different tasks and yield the final output."""

    @handler
    async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]):
        """Receive the results from the source executors."""
        await ctx.yield_output(results)

async def main() -> None:

    numbers = [random.randint(1, 100) for _ in range(10)]
    print(f"Input numbers: {numbers}")
    print("=" * 60)


    dispatcher = Dispatcher(id="dispatcher")
    average = Average(id="average")
    summation = Sum(id="summation")
    aggregator = Aggregator(id="aggregator")


    workflow = (
        WorkflowBuilder()
        .set_start_executor(dispatcher)
        .add_fan_out_edges(dispatcher, [average, summation])
        .add_fan_in_edges([average, summation], aggregator)
        .build()
    )

    output: list[int | float] | None = None
    async for event in workflow.run_stream(numbers):
        print(f"{event}") 

        if isinstance(event, WorkflowOutputEvent):
            output = event.data

    if output is not None:
        print("\nFinal Aggregated Output:")
        print(f"Average: {output[0]} | Sum: {output[1]}")
        print("=" * 60)


await main()

```
</details>

### Agents in a Workflow

To add intelligence to your workflows, you can use AI agents as part of your workflow execution. AI agents can be easily integrated into workflows, allowing you to create complex, intelligent solutions that were previously difficult to achieve.

#### What happens when you add an agent to a workflow

When you add an agent to a workflow in the Agent Framework (see example below), you’re essentially wrapping the agent inside a workflow executor so it can participate in the workflow orchestration. The executor is the one handling the communication of the agent with other workflow parts.
1. Whenever the executor receives a single or a list of chat messages, it will trigger the agent to respond with the response type of `AgentExecutorResponse` object. The class contains the following information about agent's response:
    - `executor_id` - the ID of the executor that produced the response
    - `agent_run_response` - the full response from the agent
    - `full_conversation` - the full conversation history up to this point
2. There are two possible event types related to agent responses that can be emitted:
    - `AgentRunUpdateEvent` - chunks of agent's responses as they are generated (streaming)
    - `AgentRunEvent` - full response from the agent (non-streaming)

By default, agents are wrapped in executors that run in streaming mode. You can customize this behavior by creating a custom executor. Learn more on to learn how to [create a Custom Agent Executor](https://learn.microsoft.com/en-us/agent-framework/user-guide/workflows/using-agents?pivots=programming-language-python).



#### Example: Using Built-In Agent Executor
In this step, the agents we add to the workflow will be instantiated using the AzureOpenAIChatClient class that we already used earlier in Section 01.1:

In [None]:
import os
from dotenv import load_dotenv
from agent_framework.azure import AzureOpenAIChatClient

load_dotenv()
endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
api_key = os.getenv("AZURE_OPENAI_API_KEY")
api_version = os.getenv("AZURE_OPENAI_API_VERSION")
deployment = os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT_NAME")

chat_client=AzureOpenAIChatClient(
        endpoint=endpoint,
        api_key=api_key,
        api_version=api_version,
        deployment_name=deployment,
    )

When you add an agent to a workflow:

- It behaves like an executor but runs with its own instructions and context.
- You can chain multiple agents together to create collaborative flows (e.g., Writer → Reviewer).
- The workflow orchestrates their interaction, passing messages and collecting outputs.

In the following example, two agents are created:
-  `writer_agent` generates and edits content and `reviewer_agent` reviews content and provides feedback
- The workflow setup uses `WorkflowBuilder` to set the start executor as the writer agent, add an edge from the writer to the reviewer and builds the workflow graph.

In [None]:
from agent_framework import AgentRunEvent, WorkflowBuilder

async def main():
    """Build and run a simple two node agent workflow: Writer then Reviewer."""

    # TODO: Create a Writer agent with instructions and a name

    # TODO: Create a Reviewer agent with instructions and a name

    # TODO: Build the workflow

    # TODO: Run the workflow with the user's initial message. For clarity, use run (non streaming) and print the terminal event.

    # Print detailed agent run events
    for event in events:
        if isinstance(event, AgentRunEvent):
            print(f"\nExecutor: {event.executor_id}")
            print(f"Output:\n{event.data}")
            print("-" * 40)

    # Print final workflow state
    print(f"Workflow State: {events.get_final_state()}")
    print("=" * 60)


await main()


<details>
  <summary>See the solution</summary>
  
  ```python

from agent_framework import AgentRunEvent, WorkflowBuilder

async def main():
    """Build and run a simple two node agent workflow: Writer then Reviewer."""

    
    writer_agent = chat_client.create_agent(
        instructions=(
            "You are an excellent content writer. You create new content and edit contents based on the feedback."
        ),
        name="writer",
    )

    reviewer_agent = chat_client.create_agent(
        instructions=(
            "You are an excellent content reviewer."
            "Provide actionable feedback to the writer about the provided content."
            "Provide the feedback in the most concise manner possible."
        ),
        name="reviewer",
    )

    workflow = (WorkflowBuilder()
                .set_start_executor(writer_agent)
                .add_edge(writer_agent, reviewer_agent)
                .build())

    # Run the workflow with the user's initial message. For clarity, use run (non streaming) and print the terminal event.
    events = await workflow.run("Create a slogan for a new electric SUV that is affordable and fun to drive.")

    # Print detailed agent run events
    for event in events:
        if isinstance(event, AgentRunEvent):
            print(f"\nExecutor: {event.executor_id}")
            print(f"Output:\n{event.data}")
            print("-" * 40)

    # Print final workflow state
    print(f"Workflow State: {events.get_final_state()}")
    print("=" * 60)

await main()
```
</details>

### Exercise - Create a Simple Workflow with Agents
In this exercise, you will a workflow by:

- Adding another agent to the process.
- Creating a custom scenario that reflects a real-world business process or planning task.
- Feel free to experiment with other concepts we have introduced so far - different edge patterns or custom executors. Check the [docs](https://learn.microsoft.com/en-us/agent-framework/user-guide/workflows/using-agents?pivots=programming-language-python) for additional guidance.

In [None]:
# EXERCISE: Build Your Custom Workflow

# 1. Define your scenario:
#    Example: "Marketing campaign planning" or "Business case validation"



# 2. Define your agents:



# 3. (Optional) Define a custom executor:
#    - Add logic for validation, filtering, counting, or transformation.
#    - Use @handler and WorkflowContext to send messages.



# 4. Build the workflow using WorkflowBuilder:



# 5. Run the workflow:



# 6. Print agent outputs and final workflow state:



<details>
  <summary>See the solution</summary>
  
  ```python
from agent_framework import AgentRunEvent, WorkflowBuilder

async def main():
    """Build and run a three-node agent workflow: Analyst → Finance → Approval."""

    analyst_agent = chat_client.create_agent(
        instructions=(
            "You are a business analyst. Draft a short business case based on the provided idea. "
        ),
        name="analyst",
    )

    finance_agent = chat_client.create_agent(
        instructions=(
            "You are a finance expert. Review the business case and highlight any financial risks or constraints."
        ),
        name="finance",
    )

    approval_agent = chat_client.create_agent(
        instructions=(
            "You are a senior manager. Decide whether to approve the business case based on strategic alignment. "
            "Respond with 'Approved' or 'Needs Revision' and provide reasoning."
        ),
        name="approval",
    )


    workflow = (
        WorkflowBuilder()
        .set_start_executor(analyst_agent)
        .add_edge(analyst_agent, finance_agent)
        .add_edge(finance_agent, approval_agent)
        .build()
    )


    events = await workflow.run("Develop a business case for implementing an AI-driven customer support service for handling complaints.")


    for event in events:
        if isinstance(event, AgentRunEvent):
            print(f"\nExecutor: {event.executor_id}")
            print(f"Output:\n{event.data}")
            print("-" * 40)

    print(f"Workflow State: {events.get_final_state()}")
    print("=" * 60)


await main()
```
</details>

### Agent Framework Workflows Orchestrations
Traditional single-agent systems are limited in their ability to handle complex, multi-faceted tasks. By orchestrating multiple agents, each with specialized skills or roles, we can create systems that are more robust, adaptive, and capable of solving real-world problems collaboratively. 

Orchestrations are pre-built workflow patterns that allow developers to quickly create complex workflows by simply plugging in their own AI agents. Instead of manually wiring every edge, you can use these patterns to define how agents interact.

Agent Framework provides four orchestration patterns:

1. Sequential - Agents execute one after another in a defined order, ideal for linear processing
2. Concurrent - Multiple agents run in parallel on a single task
3. Handoff - Control is passed from one agent to another based on context or role.
4. Magentic - Agents dynamically pull tasks based on the evolving context, task progress, and agent capabilities.

Refer to the [Agent Framework documentation](https://learn.microsoft.com/en-us/agent-framework/user-guide/workflows/orchestrations/overview) to learn more about these orchestration types and check out the [Agent Framework repo](https://github.com/microsoft/agent-framework/tree/main/python/samples/getting_started/workflows/orchestration) to see more advanced orchestration types in action.

In this section, we’ll introduce Sequential and Concurrent orchestrations:

### Example of Sequential Orchestration
In sequential orchestration, agents are organized in a pipeline. Each agent processes the task in turn, passing its output to the next agent in the sequence. This is ideal for workflows where each step builds upon the previous one, such as document review, data processing pipelines, or multi-stage reasoning.

The `SequentialBuilder` class creates a pipeline where agents process tasks in order. Each agent sees the full conversation history and adds their response.

In [None]:
from typing import Any
from agent_framework import SequentialBuilder, ChatMessage, Role

# 1) Create agents from chat_client:
meal_planner = chat_client.create_agent(
    instructions=(
        "You are a meal planner. Suggest a simple, healthy weekly meal plan for breakfast, lunch, and dinner."
    ),
    name="meal_planner",
)

budget_estimator = chat_client.create_agent(
    instructions=(
        "You are a budget estimator. Calculate an approximate total cost for the 7-day meal plan provided by the previous assistant."
    ),
    name="budget_estimator",
)

# 2) Build sequential workflow: meal_planner -> budget_estimator
workflow = SequentialBuilder().participants([meal_planner, budget_estimator]).build()

# 3) Run and collect messages
messages: list[ChatMessage] = []
async for event in workflow.run_stream("Create a short weekly meal plan for one person."):
    if hasattr(event, "data") and isinstance(event.data, list):
        messages.extend(event.data)

# 4) Print final conversation
print("===== Final Conversation =====")
for i, msg in enumerate(messages, start=1):
    name = msg.author_name or ("assistant" if msg.role == Role.ASSISTANT else "user")
    print(f"{'-' * 60}\n{i:02d} [{name}]\n{msg.text}")

#### Mixing Agents with Custom Executors
Sequential orchestration supports mixing agents with custom executors for specialized processing. 
Executors allow you to:
- Implement custom logic for tasks that don’t need an LLM.
- Ensure predictable behavior for compliance, validation, or data transformation.
- Reduce cost and latency by handling simple operations without invoking a model.

In [None]:
from agent_framework import Executor, WorkflowContext, handler
from agent_framework import ChatMessage, Role, SequentialBuilder

# Simple custom executor to check compliance
class ComplianceChecker(Executor):
    """Checks if mandatory business terms are present in the contract clause."""

    @handler
    async def validate(
        self,
        messages: list[ChatMessage],
        ctx: WorkflowContext[list[ChatMessage]]
    ) -> None:
        clause = messages[-1].text if messages else ""
        required_terms = ["payment", "delivery"]
        missing = [term for term in required_terms if term.lower() not in clause.lower()]

        if missing:
            compliance_note = ChatMessage(
                role=Role.ASSISTANT,
                text=f"Compliance Warning: Missing terms -> {', '.join(missing)}"
            )
        else:
            compliance_note = ChatMessage(
                role=Role.ASSISTANT,
                text="Compliance Check Passed"
            )

        await ctx.send_message(messages + [compliance_note])

contract_writer = chat_client.create_agent(
    instructions="Draft a short contract clause for a business agreement based on the user's request.",
    name="contract_writer",
)

legal_reviewer = chat_client.create_agent(
    instructions="Review the clause for clarity and professional tone.",
    name="legal_reviewer",
)

# Workflow - Writer -> Compliance Checker -> Reviewer ---
compliance_executor = ComplianceChecker(id="compliance_checker")
workflow = SequentialBuilder().participants([contract_writer, compliance_executor, legal_reviewer]).build()

messages: list[ChatMessage] = []
async for event in workflow.run_stream("Create a clause about timely delivery and payment terms for a supplier contract."):
    if hasattr(event, "data") and isinstance(event.data, list):
        messages.extend(event.data)


print("===== Final Conversation =====")
for i, msg in enumerate(messages, start=1):
    name = msg.author_name or ("assistant" if msg.role == Role.ASSISTANT else "user")
    print(f"{'-' * 60}\n{i:02d} [{name}]\n{msg.text}")


### Concurrent Workflow Orchestration
Concurrent orchestration enables multiple agents to work on the same task in parallel. Each agent processes the input independently, and their results are collected and aggregated. This approach is well-suited for scenarios where diverse perspectives or solutions are valuable. 
The `ConcurrentBuilder` class allows you to construct a workflow that runs multiple agents in parallel.

- You pass a list of agents as participants.
- The builder automatically orchestrates their execution concurrently.
- Results are returned as a collection of outputs from all agents.

The following example demonstrates how to:

1. Build a concurrent workflow orchestration using `ConcurrentBuilder`
2. Fan-out to multiple agents, fan-in aggregation of final ChatMessages.
3. Expose the entire workflow as a reusable agent via `workflow.as_agent(...)`, allowing downstream workflows or coordinators to treat the orchestration as a single agent entry point.
4. Handle workflow completion when all parallel tasks finish.

In [None]:
from agent_framework import ConcurrentBuilder

async def main() -> None:
    # 1) Create three domain agents using AzureOpenAIChatClient

    researcher = chat_client.create_agent(
        instructions=(
            "You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
            " opportunities, and risks."
        ),
        name="researcher",
    )

    marketer = chat_client.create_agent(
        instructions=(
            "You're a creative marketing strategist. Craft compelling value propositions and target messaging"
            " aligned to the prompt."
        ),
        name="marketer",
    )

    legal = chat_client.create_agent(
        instructions=(
            "You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns"
            " based on the prompt."
        ),
        name="legal",
    )

    # 2) Build a concurrent workflow
    workflow = ConcurrentBuilder().participants([researcher, marketer, legal]).build()

    # 3) Expose the concurrent workflow as an agent for easy reuse
    agent = workflow.as_agent(name="ConcurrentWorkflowAgent")
    prompt = "We are launching a new budget-friendly electric bike for urban commuters."
    agent_response = await agent.run(prompt)

    
    # Add user input as first message
    all_messages = [ChatMessage(role=Role.USER, text=prompt)] + agent_response.messages

    for i, msg in enumerate(all_messages, start=1):
        name = msg.author_name or ("assistant" if msg.role == Role.ASSISTANT else "user")
        print(f"{'-' * 60}\n\n{i:02d} [{name}]:\n{msg.text}")


await main()