# Multi-Agent Workflow Implementation with OpenAI

This notebook demonstrates how to create a multi-agent workflow system using the OpenAI Agents SDK. We will define multiple agents, each with its own capabilities, and then create a workflow that allows these agents to interact and collaborate to achieve a common goal.

In [26]:
# Install required packages
!pip install openai openai-agents python-dotenv

# Verify installations
import importlib

def check_package(package_name):
    try:
        importlib.import_module(package_name)
        return True
    except ImportError:
        return False

packages = {
    "openai": "OpenAI API",
    "agents": "OpenAI Agents",
    "dotenv": "Python Dotenv",
    "logging": "Logging",
}

all_installed = True
for package, display_name in packages.items():
    installed = check_package(package)
    print(f"{display_name}: {'✅ Installed' if installed else '❌ Not installed'}")
    all_installed = all_installed and installed

if all_installed:
    print("\n✅ All required packages are installed!")
else:
    print("\n⚠️ Some packages are missing. Run the installation command again.")

OpenAI API: ✅ Installed
OpenAI Agents: ✅ Installed
Python Dotenv: ✅ Installed
Logging: ✅ Installed

✅ All required packages are installed!


## Environment Setup

Load environment variables from the `.env` file. <br>
N.b. it will look through the entire project for a valid `.env` file.

In [27]:
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Get API key from environment variables
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# Set environment variables for compatibility with libraries that expect them
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY or ""

# Verify API key is set
if not OPENAI_API_KEY:
    print("⚠️ Warning: OPENAI_API_KEY is not set in .env file")
else:
    print("✅ API key is set")    

✅ API key is set


## Import Required Libraries

First, we'll import the necessary libraries for our multi-agent workflow.

In [28]:
import openai
import asyncio
from IPython.display import Markdown, display

# Configure OpenAI API key
openai.api_key = OPENAI_API_KEY

if not OPENAI_API_KEY:
    raise ValueError("No OpenAI API key found. Please set OPENAI_API_KEY above.")

print("✅ API key verified")

✅ API key verified


## Define Web Search Tool

Create a web search function tool that our agents can use to access real-time information.

In [29]:
from agents import Agent, function_tool

@function_tool
def web_search(query: str) -> str:
    """
    Perform a web search using OpenAI's web search capable model.
    """
    import os
    from dotenv import load_dotenv
    load_dotenv()
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
    
    def create_openai_response_with_web_search(query):
        """
        Create an OpenAI response with web search capability
        Args:
            query (str): The user's query
        Returns:
            str: The model's response content
        """
        from openai import OpenAI
        client = OpenAI(api_key=OPENAI_API_KEY)
        completion = client.chat.completions.create(
            model="gpt-4o-mini-search-preview",
            web_search_options={},
            messages=[{"role": "user", "content": query}]
        )
        return completion.choices[0].message.content
    
    return create_openai_response_with_web_search(query)

## Define Basic Agents

Let's create a few specialized agents with different capabilities and roles.

In [30]:
# Define a planning agent
plan_agent = Agent(
    name="Plan Agent",
    model="gpt-4.1-nano",
    instructions="You have to create a plan to achieve the user's goal. You can use web search to find information.",
)

# Define an information gathering agent
get_info_agent = Agent(
    name="Get Info Agent",
    model="gpt-4.1-nano",
    instructions="You have to get the information needed to achieve the user's goal. You can use web search to find information.",
    tools=[web_search],
)

# Define language-specific agents
spanish_agent = Agent(
    name="Spanish agent",
    model="gpt-4.1-mini",
    instructions="You only speak Spanish.",
)

english_agent = Agent(
    name="English agent",
    model="gpt-4.1-mini",
    instructions="You only speak English",
)

# Define a triage agent that can route to language-specific agents
triage_agent = Agent(
    name="Triage agent",
    model="gpt-4.1-nano",
    instructions="Handoff to the appropriate agent based on the language of the request.",
    handoffs=[spanish_agent, english_agent],
)

## Helper Function for Asynchronous Code

Create a helper function to safely run asynchronous code in Jupyter notebooks.

In [31]:
from agents import Runner, trace

# Define a helper function to run async code in Jupyter
def run_safely_in_jupyter(async_func):
    """Helper function to run async code in Jupyter notebooks"""
    try:
        # Get the current event loop
        loop = asyncio.get_event_loop()
        
        # Check if the loop is already running
        if loop.is_running():
            # Use nest_asyncio to patch the running loop
            import nest_asyncio
            nest_asyncio.apply()
            print("✅ Applied nest_asyncio patch to allow nested event loops")
    except RuntimeError:
        # Create a new event loop if none exists
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        print("✅ Created new event loop")
    
    # Run the async function in the event loop
    return loop.run_until_complete(async_func())

## Example 1: Triage Agent for Language Routing

This example demonstrates how a triage agent can route requests to specialized agents based on the language.

In [32]:
# Define a thread ID for grouping related traces
thread_id = "my_conversation_1"

# Use trace context manager for better logging and grouping
with trace(workflow_name="Triage Example", group_id=thread_id):
    # Run the triage agent with an English request
    async def run_triage_agent_en():
        return await Runner.run(triage_agent, "Hello, how are you?")
    
    # Run the async function safely in Jupyter
    result_en = run_safely_in_jupyter(run_triage_agent_en)
    
    print("Triage Agent (English):")
    print(f"Final Output: {result_en.final_output}")
    print("-" * 20)

    # Run the triage agent with a Spanish request
    async def run_triage_agent_es():
        return await Runner.run(triage_agent, "¿Hola, cómo estás?")
    
    result_es = run_safely_in_jupyter(run_triage_agent_es)
    
    print("Triage Agent (Spanish):")
    print(f"Final Output: {result_es.final_output}")

✅ Applied nest_asyncio patch to allow nested event loops
Triage Agent (English):
Final Output: Hello! I'm doing well, thank you. How can I assist you today?
--------------------
✅ Applied nest_asyncio patch to allow nested event loops
Triage Agent (English):
Final Output: Hello! I'm doing well, thank you. How can I assist you today?
--------------------
✅ Applied nest_asyncio patch to allow nested event loops
Triage Agent (Spanish):
Final Output: ¡Hola! Estoy bien, gracias. ¿Y tú, cómo estás? ¿En qué puedo ayudarte hoy?
Triage Agent (Spanish):
Final Output: ¡Hola! Estoy bien, gracias. ¿Y tú, cómo estás? ¿En qué puedo ayudarte hoy?


## Example 2: Multi-Agent Research Workflow

Create a more complex multi-agent workflow for researching topics with structured data outputs.

In [33]:
from pydantic import BaseModel, Field
import time

# Define output schemas for structured agent responses
class PlanOutput(BaseModel):
    reasoning: str = Field(description="Step-by-step reasoning about how to approach the user's goal")
    plan: str = Field(description="A detailed plan with specific steps to achieve the user's goal")

class InfoOutput(BaseModel):
    information: str = Field(description="The information gathered to help achieve the user's goal")
    sources: str = Field(description="Sources of the information, if applicable")

class ResponseOutput(BaseModel):
    response: str = Field(description="The final formatted response to the user")

# Update agents with structured output types
plan_agent.output_type = PlanOutput
get_info_agent.output_type = InfoOutput

# Create a formatter agent that takes info and creates a final response
formatter_agent = Agent(
    name="Formatter Agent",
    instructions="You format the plan and information in a friendly way, adding emojis to make it engaging.",
    output_type=ResponseOutput,
)

# Create a router agent that orchestrates the workflow
router_agent = Agent(
    name="Router Agent",
    instructions=(
        "You determine the workflow for responding to user queries about complex topics. "
        "First, create a plan using the plan agent. "
        "Then, gather detailed information using the info agent. "
        "Finally, hand off to the formatter agent to create an engaging final response."
    ),
    tools=[
        plan_agent.as_tool(
            tool_name="create_research_plan",
            tool_description="Create a detailed plan for researching the user's question"
        ),
        get_info_agent.as_tool(
            tool_name="gather_detailed_information",
            tool_description="Gather detailed information to address the user's question"
        )
    ],
    handoffs=[formatter_agent],
)

## Run the Multi-Agent Research Workflow

Execute a complete research task using our multi-agent workflow system.

In [34]:
# Define workflow constants
WORKFLOW_NAME = "Research Assistant Workflow"
GROUP_ID = "research_conversation_1"

# Example usage of the multi-agent workflow
async def run_multi_agent_workflow():
    question = "What are the latest advancements in quantum computing and how might they impact cybersecurity?"
    
    print(f"🔍 Processing query: {question}")
    print("=" * 80)
    
    start_time = time.time()
    
    # Run the router agent with streaming to see intermediate steps
    from agents import RunConfig
    
    # Set up the run configuration
    config = RunConfig(
        workflow_name=WORKFLOW_NAME,
        group_id=GROUP_ID
    )
    
    # Initialize the stream
    stream = Runner.run_streamed(
        starting_agent=router_agent,
        input=question,
        run_config=config
    )
    
    # Track seen outputs and the final result
    seen_outputs = set()
    final_result = None
    
    # Process all events in a single pass
    async for event in stream.stream_events():
        # Handle agent start/finish events
        if event.type == "run_step":
            if event.name == "agent_started":
                print(f"🏃 Starting `{event.item.agent.name}`")
            elif event.name == "agent_finished":
                print(f"✅ `{event.item.agent.name}` completed")
                # Store the final result if this is the final agent
                if event.item.agent.name == "Formatter Agent" and hasattr(event.item, 'output'):
                    final_result = event.item.output
        
        # Handle tool output events (ensure no duplicates)
        elif event.type == "run_item_stream_event" and event.name == "tool_output":
            # Create a hash of the tool output to avoid duplicates
            output_str = str(event.item.output)
            if output_str not in seen_outputs:
                seen_outputs.add(output_str)
                print("🛠️ Tool output:")
                print("-" * 40)
                print(event.item.output)
                print("-" * 40)
    
    # Record the end time and calculate duration
    end_time = time.time()
    print("\n" + "=" * 80)
    print(f"✨ Total time taken: {end_time - start_time:.2f} seconds")
    
    return final_result

In [None]:
# Run the multi-agent workflow
print("\n📋 Starting Multi-Agent Research Workflow Example:")
final_output = run_safely_in_jupyter(run_multi_agent_workflow)
print("\n" + "=" * 80)
print("✅ Multi-Agent Workflow completed successfully!")


📋 Starting Multi-Agent Research Workflow Example:
✅ Applied nest_asyncio patch to allow nested event loops
🔍 Processing query: What are the latest advancements in quantum computing and how might they impact cybersecurity?
🛠️ Tool output:
----------------------------------------
{"reasoning":"To address the user's query, I need to gather the latest information on advancements in quantum computing and understand how these developments could influence cybersecurity. Starting with recent research papers, news articles, and announcements from leading tech companies and research institutions will provide up-to-date insights. After collecting this data, I'll analyze the potential impacts on cybersecurity, considering both vulnerabilities and new security paradigms enabled by quantum technology.","plan":"First, perform a web search to find the latest articles, research papers, and news on advancements in quantum computing. Next, identify key breakthroughs and emerging trends in the field. The

## Conclusion

In this notebook, we demonstrated:

1. Creating specialized agents with different roles and capabilities
2. Building a triage system for routing requests based on language
3. Designing a complex research workflow with multiple agents
4. Using structured data schemas for agent outputs

The OpenAI Agents SDK provides a powerful framework for building multi-agent systems that can collaborate to solve complex tasks by delegating subtasks to specialized agents and coordinating their activities.

However due to it relatively new nature, it is hard to find real-world examples of its usage. The documentation is okay but lacking in examples.