In [16]:
import asyncio
from typing_extensions import Never
from agent_framework import WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, executor


from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
from pydantic import BaseModel

In [2]:
@executor(id="upper_case_executor")
async def to_upper_case(text: str, ctx: WorkflowContext[str]) -> None:
    """Transform the input to uppercase and forward it to the next step."""
    result = text.upper()

    # Send the intermediate result to the next executor
    await ctx.send_message(result)

In [3]:
@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
    """Reverse the input and yield the workflow output."""
    result = text[::-1]

    # Yield the final output for this workflow run
    await ctx.yield_output(result)

In [4]:
workflow = (
    WorkflowBuilder()
    .add_edge(to_upper_case, reverse_text)
    .set_start_executor(to_upper_case)
    .build()
)

In [5]:
async def main():
    # Run the workflow and stream events
    async for event in workflow.run_stream("hello world"):
        print(f"Event: {event}")
        if isinstance(event, WorkflowOutputEvent):
            print(f"Workflow completed with result: {event.data}")

if __name__ == "__main__":
    await main()

Event: WorkflowStartedEvent(origin=WorkflowEventSource.FRAMEWORK, data=None)
Event: WorkflowStatusEvent(state=WorkflowRunState.IN_PROGRESS, data=None, origin=WorkflowEventSource.FRAMEWORK)
Event: ExecutorInvokedEvent(executor_id=upper_case_executor, data=None)
Event: ExecutorCompletedEvent(executor_id=upper_case_executor, data=None)
Event: ExecutorInvokedEvent(executor_id=reverse_text_executor, data=None)
Event: WorkflowOutputEvent(data=DLROW OLLEH, source_executor_id=reverse_text_executor)
Workflow completed with result: DLROW OLLEH
Event: ExecutorCompletedEvent(executor_id=reverse_text_executor, data=None)
Event: WorkflowStatusEvent(state=WorkflowRunState.IDLE, data=None, origin=WorkflowEventSource.FRAMEWORK)


In [6]:
from agent_framework import WorkflowBuilder, WorkflowViz

viz = WorkflowViz(workflow)

In [7]:
# Generate Mermaid flowchart
mermaid_content = viz.to_mermaid()
print("Mermaid flowchart:")
print(mermaid_content)

# Example output:
# flowchart TD
#   dispatcher["dispatcher (Start)"];
#   researcher["researcher"];
#   marketer["marketer"];
#   legal["legal"];
#   aggregator["aggregator"];
#   dispatcher --> researcher;
#   dispatcher --> marketer;
#   dispatcher --> legal;
#   researcher --> aggregator;
#   marketer --> aggregator;
#   legal --> aggregator;

Mermaid flowchart:
flowchart TD
  upper_case_executor["upper_case_executor (Start)"];
  reverse_text_executor["reverse_text_executor"];
  upper_case_executor --> reverse_text_executor;


In [8]:
# Generate DOT diagram
dot_content = viz.to_digraph()
print("DOT diagram:")
print(dot_content)


DOT diagram:
digraph Workflow {
  rankdir=TD;
  node [shape=box, style=filled, fillcolor=lightblue];
  edge [color=black, arrowhead=vee];

  "upper_case_executor" [fillcolor=lightgreen, label="upper_case_executor\n(Start)"];
  "reverse_text_executor" [label="reverse_text_executor"];
  "upper_case_executor" -> "reverse_text_executor";
}


In [12]:

import graphviz


In [13]:
try:
    # Export as SVG (vector format, recommended)
    svg_file = viz.export(format="svg")
    print(f"SVG exported to: {svg_file}")

    # Export as PNG (raster format)
    png_file = viz.export(format="png")
    print(f"PNG exported to: {png_file}")

    # Export as PDF (vector format)
    pdf_file = viz.export(format="pdf")
    print(f"PDF exported to: {pdf_file}")

    # Export raw DOT file
    dot_file = viz.export(format="dot")
    print(f"DOT file exported to: {dot_file}")

except ImportError:
    print("Install 'viz' extra and GraphViz for image export:")
    print("pip install agent-framework[viz]")
    print("Also install GraphViz binaries for your platform")    # system python / virtualenv
    

SVG exported to: C:\Users\amartays\AppData\Local\Temp\tmphlmw5ot_.svg
PNG exported to: C:\Users\amartays\AppData\Local\Temp\tmp8fmlvl8y.png
PDF exported to: C:\Users\amartays\AppData\Local\Temp\tmpoq548tlj.pdf
DOT file exported to: C:\Users\amartays\AppData\Local\Temp\tmpizbtn2gb.dot


In [14]:
# Export with custom filename
svg_path = viz.export(format="svg", filename="my_workflow.svg")
png_path = viz.export(format="png", filename="workflow_diagram.png")

# Convenience methods
svg_path = viz.save_svg("workflow.svg")
png_path = viz.save_png("workflow.png")
pdf_path = viz.save_pdf("workflow.pdf")

In [19]:
from agent_framework import (
    WorkflowBuilder, WorkflowViz, AgentExecutor,
    AgentExecutorRequest, AgentExecutorResponse
)

chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

# Create agents
researcher = AgentExecutor(
    chat_client.create_agent(
            instructions=(
                "You are a research assistant that helps users gather information on various topics."
            )
        ),
    id="researcher")

marketer = AgentExecutor(
    chat_client.create_agent(
            instructions=(
                "You are a marketer assistant."
            )
        ), 
    id="marketer")


legal = AgentExecutor(chat_client.create_agent(
            instructions=(
                "You are a legal assistant that ensures all content complies with relevant laws and regulations."
            )
        ), id="legal")

# Build fan-out/fan-in workflow
workflow = (
    WorkflowBuilder()
    .set_start_executor(dispatcher)
    .add_fan_out_edges(dispatcher, [researcher, marketer, legal])  # Fan-out
    .add_fan_in_edges([researcher, marketer, legal], aggregator)   # Fan-in
    .build()
)

# Visualize
viz = WorkflowViz(workflow)
print(viz.to_mermaid())

NameError: name 'dispatcher' is not defined