# <center>OpenAI agent pattern: orchestrator and workers</center>

A starter guide for building an agent loop using the `openai-agents` library.

This pattern pairs one orchestrator with several workers. The orchestrator chooses which worker should handle a specific sub-task. The worker attempts the sub-task and returns a result. The orchestrator then uses that result to choose the next worker, and the cycle repeats until a final result is returned.

In the following example, we'll build an agent that creates a portfolio of stocks and ETFs based on a user's investment strategy. It is made up of four agents:

1.  **Orchestrator:** Chooses which worker to use based on the user's investment strategy.
2.  **Research Agent:** Searches the web for information about stocks and ETFs that could support the user's investment strategy.
3.  **Evaluation Agent:** Evaluates the research report and provides feedback on what data is missing.
4.  **Portfolio Agent:** Creates a portfolio of stocks and ETFs based on the research report.
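
The control flow described above can be sketched without any LLM calls. The following is only an illustration under stated assumptions: the worker functions (`research`, `evaluate`, `build_portfolio`), the `choose_worker` rule, and the three-ticker pass threshold are all hypothetical stand-ins. In the notebook itself, a model makes the routing decision.

```python
# Hypothetical stub workers: each takes the shared state and returns an updated copy.
def research(state: dict) -> dict:
    tickers = list(state["tickers"])
    tickers.append(f"TICKER_{len(tickers) + 1}")  # pretend we found one more ticker
    return {**state, "tickers": tickers}


def evaluate(state: dict) -> dict:
    # Illustrative threshold: pass once three tickers have been collected.
    score = "pass" if len(state["tickers"]) >= 3 else "needs_improvement"
    return {**state, "score": score}


def build_portfolio(state: dict) -> dict:
    tickers = state["tickers"]
    weight = 100 / len(tickers)  # equal weighting, purely for illustration
    return {**state, "portfolio": {t: weight for t in tickers}}


WORKERS = {"research": research, "evaluate": evaluate, "portfolio": build_portfolio}


def choose_worker(state: dict) -> str:
    """Hand-written routing rule standing in for the orchestrator model."""
    if state.get("score") == "pass":
        return "portfolio"
    history = state["history"]
    if history and history[-1] == "research":
        return "evaluate"
    return "research"


def run_orchestrator(strategy: str, max_steps: int = 20) -> dict:
    state = {"strategy": strategy, "tickers": [], "history": []}
    for _ in range(max_steps):
        name = choose_worker(state)
        state = WORKERS[name](state)
        state["history"] = state["history"] + [name]
        if "portfolio" in state:  # the final result has been produced
            return state
    raise RuntimeError("no portfolio produced within max_steps")


result = run_orchestrator("dividend growth")
print(result["history"])
```

The loop alternates between research and evaluation until the evaluation passes, then calls the portfolio worker once and stops.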

### Install Libraries

In [None]:
# Install base libraries for OpenAI
%pip install -q openai openai-agents

# Install optional libraries for OpenInference/OpenTelemetry tracing
%pip install -q arize-phoenix-otel arize-phoenix-evals openinference-instrumentation-openai-agents openinference-instrumentation-openai

### Setup Keys

Add your OpenAI API key to the environment variable `OPENAI_API_KEY`.

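Copy your Phoenix `API_KEY` from your settings page at [app.phoenix.arize.com](https://app.phoenix.arize.com). The cell below also asks for your Phoenix collector endpoint if `PHOENIX_COLLECTOR_ENDPOINT` is not already set.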

In [None]:
import os
from getpass import getpass

import phoenix as px

if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass("🔑 Enter your OpenAI API key: ")

if "PHOENIX_API_KEY" not in os.environ:
    os.environ["PHOENIX_API_KEY"] = getpass("🔑 Enter your Phoenix API key: ")

if "PHOENIX_COLLECTOR_ENDPOINT" not in os.environ:
    # The endpoint is a URL rather than a secret, so a visible prompt makes typos easier to spot.
    os.environ["PHOENIX_COLLECTOR_ENDPOINT"] = input("Enter your Phoenix Collector Endpoint: ")
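
The three checks above share one shape: read an environment variable and prompt only when it is missing. A small helper can capture that. This is a sketch, and `ensure_env` and its `ask` parameter are hypothetical names introduced here. Unlike the cell above, it also treats an empty value as missing.

```python
import os
from getpass import getpass
from typing import Callable


def ensure_env(name: str, prompt: str, ask: Callable[[str], str] = getpass) -> str:
    """Return os.environ[name], prompting for it only when it is unset or empty."""
    if not os.environ.get(name):
        os.environ[name] = ask(prompt)
    return os.environ[name]


# Example usage (commented out because it would block waiting for input):
# ensure_env("OPENAI_API_KEY", "Enter your OpenAI API key: ")
# ensure_env("PHOENIX_COLLECTOR_ENDPOINT", "Enter your Phoenix Collector Endpoint: ", ask=input)
```

Passing `ask` as a parameter lets secrets use `getpass` while non-secret values use `input`.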

### Setup Tracing

In [None]:
from opentelemetry import trace

from phoenix.otel import register

PROJECT = "Orchestrator-workers"

tracer_provider = register(
    project_name=PROJECT, protocol="http/protobuf", auto_instrument=True, batch=True
)

tracer = trace.get_tracer(PROJECT)

client = px.Client()

## Creating the agents

In [None]:
from pprint import pprint
from textwrap import dedent
from typing import Literal

from agents import Agent, Runner, TResponseInputItem, WebSearchTool
from agents.model_settings import ModelSettings
from pydantic import BaseModel, Field


class PortfolioItem(BaseModel):
    ticker: str = Field(description="The ticker of the stock or ETF.")
    allocation: float = Field(
        description="The percentage allocation of the ticker in the portfolio. The sum of all allocations should be 100."
    )
    reason: str = Field(description="The reason why this ticker is included in the portfolio.")


class Portfolio(BaseModel):
    tickers: list[PortfolioItem] = Field(
        description="A list of tickers that could support the user's stated investment strategy."
    )


class EvaluationFeedback(BaseModel):
    feedback: str = Field(
        description="What data is missing in order to create a portfolio of stocks and ETFs based on the user's investment strategy."
    )
    score: Literal["pass", "needs_improvement", "fail"] = Field(
        description="A score on the research report. Pass if you have at least 5 tickers with data that supports the user's investment strategy to create a portfolio, needs_improvement if you do not have enough supporting data, and fail if you have no tickers."
    )


evaluation_agent = Agent(
    name="Evaluation Agent",
    instructions=dedent(
        """You are a senior financial analyst. You will be provided with a stock research report with positive and negative catalysts. Your task is to evaluate the report and provide feedback on what to improve."""
    ),
    model="gpt-4.1",
    output_type=EvaluationFeedback,
)

portfolio_agent = Agent(
    name="Portfolio Agent",
    instructions=dedent(
        """You are a senior financial analyst. You will be provided with a stock research report. Your task is to create a portfolio of stocks and ETFs that could support the user's stated investment strategy. Include facts and data from the research report in the stated reasons for the portfolio allocation."""
    ),
    model="o4-mini",
    output_type=Portfolio,
)

research_agent = Agent(
    name="FinancialSearchAgent",
    instructions=dedent(
        """You are a research assistant specializing in financial topics. Given a stock ticker, use web search to retrieve up‑to‑date context and produce a short summary of at most 50 words. Focus on key numbers, events, or quotes that will be useful to a financial analyst."""
    ),
    model="gpt-4.1",
    tools=[WebSearchTool()],
    model_settings=ModelSettings(tool_choice="required", parallel_tool_calls=True),
)

orchestrator_agent = Agent(
    name="Routing Agent",
    instructions=dedent("""You are a senior financial analyst. You are trying to create a portfolio based on the user's stated investment strategy. Your task is to hand off to the appropriate agent or tool.

    First, hand off to the research_agent to give you a report on stocks and ETFs that could support the user's stated investment strategy.
    Then, hand off to the evaluation_agent to give you a score on the research report. If the evaluation_agent returns a needs_improvement or fail, continue using the research_agent to gather more information.
    Once the evaluation_agent returns a pass, hand off to the portfolio_agent to create a portfolio."""),
    model="gpt-4.1",
    handoffs=[
        research_agent,
        evaluation_agent,
        portfolio_agent,
    ],
)
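
The `allocation` field description asks the model to make the allocations sum to 100, but nothing in the schema enforces it. A post-hoc check along these lines could catch a bad portfolio before it is used. This is a sketch: `Item` is a hypothetical dataclass stand-in for `PortfolioItem`, and the tolerance value is an assumption.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Item:
    """Hypothetical stand-in for PortfolioItem with only the fields needed here."""
    ticker: str
    allocation: float


def allocations_are_valid(items: List[Item], tolerance: float = 0.01) -> bool:
    """True when there is at least one item, none is negative, and the total is ~100."""
    if not items:
        return False
    if any(item.allocation < 0 for item in items):
        return False
    total = sum(item.allocation for item in items)
    return abs(total - 100.0) <= tolerance


print(allocations_are_valid([Item("AAA", 60.0), Item("BBB", 40.0)]))
print(allocations_are_valid([Item("AAA", 60.0), Item("BBB", 30.0)]))
```

A failed check could be fed back to the orchestrator as another pass rather than raising an error.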

### Run our Workflow! 

Enter your investment strategy:

In [None]:
import asyncio
from uuid import uuid4

from openinference.semconv.trace import SpanAttributes

MAX_PASSES = 10
PASS_TIMEOUT = 500


async def run_agent_workflow():
    user_input = input("Enter your investment strategy: ")
    input_items: list[TResponseInputItem] = [{"content": user_input, "role": "user"}]

    with tracer.start_as_current_span(
        "Agent workflow",
        attributes={
            SpanAttributes.OPENINFERENCE_SPAN_KIND: "agent",
            SpanAttributes.INPUT_VALUE: user_input,
            SpanAttributes.SESSION_ID: str(uuid4()),
        },
    ) as root_span:
        out = None  # defined up front so it exists even if the first pass times out
        passes = 0
        while passes < MAX_PASSES:
            try:
                orchestrator = await asyncio.wait_for(
                    Runner.run(orchestrator_agent, input_items),
                    timeout=PASS_TIMEOUT,
                )
            except asyncio.TimeoutError:
                print(f"Pass {passes + 1} hit the {PASS_TIMEOUT}s timeout—aborting.")
                break

            out = orchestrator.final_output
            pprint(out)

            if isinstance(out, Portfolio):
                break

            input_items = orchestrator.to_input_list()
            passes += 1

        root_span.set_attribute(SpanAttributes.OUTPUT_VALUE, str(out))

    print("AGENT COMPLETE")


await run_agent_workflow()
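
The workflow combines two safeguards: a cap on the number of passes and a time budget per pass. The sketch below isolates that logic using only `asyncio`, with stub coroutines in place of `Runner.run`. The names `run_passes`, `quick`, and `stuck` are hypothetical and not part of any library.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def run_passes(
    pass_fn: Callable[[int], Awaitable[str]],
    max_passes: int,
    timeout: float,
) -> Optional[str]:
    """Run pass_fn up to max_passes times, giving each call `timeout` seconds."""
    last = None  # stays None if the very first pass times out
    for n in range(max_passes):
        try:
            last = await asyncio.wait_for(pass_fn(n), timeout=timeout)
        except asyncio.TimeoutError:
            break  # give up on the whole loop, keeping the last good result
        if last == "final":
            break  # the stub's signal that the work is complete
    return last


async def quick(n: int) -> str:
    # Stub pass that finishes immediately and reports "final" on its third call.
    return "final" if n == 2 else "partial"


async def stuck(n: int) -> str:
    # Stub pass that always exceeds a short timeout.
    await asyncio.sleep(1)
    return "never returned in time"


# In a notebook cell, use `await run_passes(...)` instead of asyncio.run.
print(asyncio.run(run_passes(quick, max_passes=10, timeout=1.0)))
print(asyncio.run(run_passes(stuck, max_passes=10, timeout=0.01)))
```

Because `last` is initialised before the loop, the caller gets `None` on an early timeout rather than an undefined variable.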

In [None]:
df = client.get_spans_dataframe(project_name="Orchestrator-workers", timeout=None)
llm_spans = df[df["span_kind"] == "LLM"]
root_ids = df[df["parent_id"].isna()]["context.trace_id"].unique()
llm_spans.head()

In [None]:
TRAJECTORY_ACCURACY_PROMPT_WITHOUT_REFERENCE = """
You are a helpful AI bot that checks whether an AI agent's internal trajectory is accurate and effective.

You will be given:
1. The agent's actual trajectory of tool calls
2. The user input that the agent used to make its decisions
3. The tool definitions that were available to the agent when it made the tool calls

An accurate trajectory:
- Progresses logically from step to step
- Shows a clear path toward completing a goal
- Is reasonably efficient (doesn't take unnecessary detours)

##

Actual Trajectory:
{tool_calls}

User Inputs:
{attributes.input.value}

Tool Definitions:
{attributes.llm.tools}

##

Your response must be a single string, either `correct` or `incorrect`, and must not include any additional text.

- Respond with `correct` if the agent's trajectory adheres to the rubric and accomplishes the task effectively.
- Respond with `incorrect` if the trajectory is confusing, misaligned with the goal, inefficient, or does not accomplish the task.
"""
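
The placeholders in this template (`{tool_calls}`, `{attributes.input.value}`, `{attributes.llm.tools}`) are meant to match dataframe column names. Plain `str.format` would treat the dots as attribute access, so the substitution has to use the whole dotted name as a key. The sketch below shows that idea with a regular expression. It is not Phoenix's implementation, and `render` and the sample row are hypothetical.

```python
import re


def render(template: str, row: dict) -> str:
    """Replace each {placeholder} with str(row[placeholder]), dots included."""

    def lookup(match: re.Match) -> str:
        key = match.group(1)
        return str(row[key])  # raises KeyError if the row lacks that column

    return re.sub(r"\{([^{}]+)\}", lookup, template)


# Hypothetical row; the keys mirror the column names used by the template above.
row = {
    "tool_calls": [{"name": "web_search"}],
    "attributes.input.value": "low-risk dividend strategy",
}

print(render("Trajectory: {tool_calls}\nInput: {attributes.input.value}", row))
```

A missing column surfaces as a `KeyError`, which is a useful early signal that the dataframe and the template are out of sync.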

In [None]:
from typing import Any, Dict

import pandas as pd


def filter_spans_by_trace_criteria(
    df: pd.DataFrame,
    trace_filters: Dict[str, Dict[str, Any]],
    span_filters: Dict[str, Dict[str, Any]],
) -> pd.DataFrame:
    """Filter spans based on trace-level and span-level criteria.

    Args:
        df: DataFrame with trace data
        trace_filters: Dictionary of column names and filtering criteria for traces
                      Format: {"column_name": {"operator": value}}
                      Supported operators: ">=", "<=", "==", "!=", "contains", "notna", "isna"
        span_filters: Dictionary of column names and filtering criteria for spans
                     Format: {"column_name": {"operator": value}}
                     Same supported operators as trace_filters

    Returns:
        DataFrame with filtered spans from traces that match trace_filters
    """
    all_trace_ids = set(df["context.trace_id"].unique())
    print(f"Total traces: {len(all_trace_ids)}")

    # Work on a copy so the caller's dataframe is never modified.
    traces_df = df.copy()
    for column, criteria in trace_filters.items():
        if column not in traces_df.columns:
            print(f"Warning: Column '{column}' not found in dataframe")
            continue

        for operator, value in criteria.items():
            if operator == ">=":
                matching_spans = traces_df[traces_df[column] >= value]
            elif operator == "<=":
                matching_spans = traces_df[traces_df[column] <= value]
            elif operator == "==":
                matching_spans = traces_df[traces_df[column] == value]
            elif operator == "!=":
                matching_spans = traces_df[traces_df[column] != value]
            elif operator == "contains":
                matching_spans = traces_df[
                    traces_df[column].str.contains(value, case=False, na=False)
                ]
            elif operator == "isna":
                matching_spans = traces_df[traces_df[column].isna()]
            elif operator == "notna":
                matching_spans = traces_df[traces_df[column].notna()]
            else:
                print(f"Warning: Unsupported operator '{operator}' - skipping")
                continue

            traces_df = matching_spans

    matching_trace_ids = set(traces_df["context.trace_id"].unique())
    print(f"Found {len(matching_trace_ids)} traces matching trace criteria")

    if not matching_trace_ids:
        print("No matching traces found")
        return pd.DataFrame()

    result_df = df[df["context.trace_id"].isin(matching_trace_ids)].copy()

    for column, criteria in span_filters.items():
        if column not in result_df.columns:
            print(f"Warning: Column '{column}' not found in dataframe")
            continue

        for operator, value in criteria.items():
            if operator == ">=":
                result_df = result_df[result_df[column] >= value]
            elif operator == "<=":
                result_df = result_df[result_df[column] <= value]
            elif operator == "==":
                result_df = result_df[result_df[column] == value]
            elif operator == "!=":
                result_df = result_df[result_df[column] != value]
            elif operator == "contains":
                result_df = result_df[result_df[column].str.contains(value, case=False, na=False)]
            elif operator == "isna":
                result_df = result_df[result_df[column].isna()]
            elif operator == "notna":
                result_df = result_df[result_df[column].notna()]
            else:
                print(f"Warning: Unsupported operator '{operator}' - skipping")
                continue

    print(f"Final result: {len(result_df)} spans from {len(matching_trace_ids)} traces")
    return result_df


def extract_tool_calls(output_messages):
    if not output_messages:
        return []

    tool_calls = []
    for message in output_messages:
        if "message.tool_calls" in message:
            for tool_call in message["message.tool_calls"]:
                tool_calls.append({"name": tool_call["tool_call.function.name"]})
    return tool_calls
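
To see what `extract_tool_calls` returns, it helps to run it on a small hand-built input. The function is repeated here so the example is self-contained. The sample messages and tool names are hypothetical and only mimic the flattened key style the function expects.

```python
def extract_tool_calls(output_messages):
    # Same logic as the helper above: collect the name of every tool call.
    if not output_messages:
        return []

    tool_calls = []
    for message in output_messages:
        if "message.tool_calls" in message:
            for tool_call in message["message.tool_calls"]:
                tool_calls.append({"name": tool_call["tool_call.function.name"]})
    return tool_calls


# Hypothetical output messages: one with two tool calls, one with plain content.
sample_messages = [
    {
        "message.role": "assistant",
        "message.tool_calls": [
            {"tool_call.function.name": "web_search"},
            {"tool_call.function.name": "transfer_to_portfolio_agent"},
        ],
    },
    {"message.role": "assistant", "message.content": "Here is the report."},
]

print(extract_tool_calls(sample_messages))
print(extract_tool_calls(None))
```

Messages without a `message.tool_calls` key are skipped, and an empty or missing input yields an empty list.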

In [None]:
eval_traces = filter_spans_by_trace_criteria(
    df=df,
    trace_filters={"name": {"contains": "agent"}},
    span_filters={"attributes.openinference.span.kind": {"==": "LLM"}},
)

eval_traces.head()
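
The call above filters in two stages: it first finds traces that contain at least one span whose name includes "agent", then keeps only the LLM spans from those traces. The same logic can be sketched on plain dictionaries. The span records and the `two_stage_filter` helper are hypothetical and use simplified keys.

```python
from typing import Callable, Dict, List

Span = Dict[str, str]

# Hypothetical spans: trace t1 has an agent span, trace t2 does not.
spans: List[Span] = [
    {"trace_id": "t1", "name": "Agent workflow", "kind": "AGENT"},
    {"trace_id": "t1", "name": "response", "kind": "LLM"},
    {"trace_id": "t2", "name": "response", "kind": "LLM"},
    {"trace_id": "t2", "name": "web_search", "kind": "TOOL"},
]


def two_stage_filter(
    spans: List[Span],
    trace_pred: Callable[[Span], bool],
    span_pred: Callable[[Span], bool],
) -> List[Span]:
    # Stage 1: a trace qualifies if any one of its spans satisfies trace_pred.
    keep = {s["trace_id"] for s in spans if trace_pred(s)}
    # Stage 2: from qualifying traces, keep only spans satisfying span_pred.
    return [s for s in spans if s["trace_id"] in keep and span_pred(s)]


llm_spans_in_agent_traces = two_stage_filter(
    spans,
    trace_pred=lambda s: "agent" in s["name"].lower(),
    span_pred=lambda s: s["kind"] == "LLM",
)
print(llm_spans_in_agent_traces)
```

The LLM span in trace `t2` is dropped even though it matches the span-level test, because its trace never matched the trace-level test.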

In [None]:
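# Work on an explicit copy so the column assignment does not trigger pandas' SettingWithCopyWarning.
eval_traces = eval_traces.copy()
eval_traces["tool_calls"] = eval_traces["attributes.llm.output_messages"].apply(extract_tool_calls)

# Keep only the spans that had tool definitions available.
full_eval_spans = eval_traces[eval_traces["attributes.llm.tools"].notna()]
full_eval_spans.head()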

In [None]:
import nest_asyncio

from phoenix.evals import OpenAIModel, llm_classify
from phoenix.trace import suppress_tracing

nest_asyncio.apply()

model = OpenAIModel(
    api_key=os.environ["OPENAI_API_KEY"],
    model="gpt-4o-mini",
    temperature=0.0,
)

rails = ["correct", "incorrect"]

with suppress_tracing():
    eval_results = llm_classify(
        dataframe=full_eval_spans,
        template=TRAJECTORY_ACCURACY_PROMPT_WITHOUT_REFERENCE,
        model=model,
        rails=rails,
        provide_explanation=True,
        verbose=False,
        concurrency=20,
    )

eval_results["score"] = eval_results["label"].apply(lambda x: 1 if x == "correct" else 0)
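
The mapping above scores every label other than `correct` as 0, so an output that falls outside the rails is counted the same as `incorrect`. When summarising results, it may be worth separating the two cases. The sketch below does so on a plain list of labels. The `summarize` helper and the sample labels are hypothetical, and it assumes an unparseable response shows up as some label not in `rails`.

```python
from typing import Dict, List, Optional, Sequence


def summarize(
    labels: List[str], rails: Sequence[str] = ("correct", "incorrect")
) -> Dict[str, Optional[float]]:
    """Compute accuracy over labels inside the rails and count the rest as skipped."""
    valid = [label for label in labels if label in rails]
    skipped = len(labels) - len(valid)
    if valid:
        accuracy = sum(1 for label in valid if label == "correct") / len(valid)
    else:
        accuracy = None  # nothing to score
    return {"accuracy": accuracy, "evaluated": len(valid), "skipped": skipped}


# Hypothetical labels, including one that is outside the rails.
summary = summarize(["correct", "incorrect", "correct", "NOT_PARSABLE"])
print(summary)
```

Reporting the skipped count alongside accuracy makes it visible when the evaluator model is failing to follow the output format.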

In [None]:
import pandas as pd

merged_df = pd.merge(full_eval_spans, eval_results, left_index=True, right_index=True)

merged_df.rename(columns={"context.trace_id": "context.span_id"}, inplace=True)

merged_df.head()

In [None]:
from phoenix.trace import SpanEvaluations

client.log_evaluations(
    SpanEvaluations(
        dataframe=merged_df,
        eval_name="Agent Trajectory Accuracy",
    )
)