<center>
    <p style="text-align:center">
        <img alt="phoenix logo" src="https://storage.googleapis.com/arize-phoenix-assets/assets/phoenix-logo-light.svg" width="200"/>
        <br>
        <a href="https://arize.com/docs/phoenix/">Docs</a>
        |
        <a href="https://github.com/Arize-ai/phoenix">GitHub</a>
        |
        <a href="https://arize-ai.slack.com/join/shared_invite/zt-2w57bhem8-hq24MB6u7yE_ZF_ilOYSBw#/shared-invite/email">Community</a>
    </p>
</center>

# <center> LangGraph Agents: Orchestrator–Worker Pattern </center>

In this tutorial, we'll build a multi-agent system using LangGraph's **Orchestrator–Worker pattern**, which is ideal for dynamically decomposing a task into subtasks, assigning them to specialized LLM agents, and synthesizing their responses.

This pattern is particularly well-suited when the structure of subtasks is not known ahead of time, such as when writing modular code, creating multi-section reports, or conducting research. The **orchestrator** plans and delegates, while the **workers** each complete their assigned section.

We'll also use **Phoenix** to trace and debug the orchestration process. With Phoenix, you can inspect which tasks the orchestrator generated, how each worker handled its section, and how the final output was assembled.

By the end of this notebook, you'll know how to:
- Use structured outputs to plan subtasks dynamically.
- Assign subtasks to LLM workers via LangGraph's `Send` API.
- Collect and synthesize multi-step LLM outputs.
- Trace and visualize orchestration using Phoenix.


In [None]:
%pip install -qqqqqqq langgraph langchain langchain_community arize-phoenix arize-phoenix-otel openinference-instrumentation-langchain langchain_openai

In [None]:
import os
from getpass import getpass

from langgraph.graph import END, START, StateGraph

In [None]:
os.environ["OPENAI_API_KEY"] = getpass("🔑 Enter your OpenAI API key: ")

# Configure Phoenix Tracing

Go to your Phoenix instance at https://app.phoenix.arize.com/ and generate an API key, and note your instance's collector endpoint. Together these let you send traces from your LangGraph application to Phoenix.

In [None]:
if "PHOENIX_API_KEY" not in os.environ:
    os.environ["PHOENIX_API_KEY"] = getpass("🔑 Enter your Phoenix API key: ")

if "PHOENIX_COLLECTOR_ENDPOINT" not in os.environ:
    os.environ["PHOENIX_COLLECTOR_ENDPOINT"] = getpass("🔑 Enter your Phoenix collector endpoint: ")

In [None]:
from phoenix.otel import register

project_name = "langgraph-orchestrator-worker"
tracer_provider = register(project_name=project_name, auto_instrument=True)

Orchestrator-Workers • Research-Paper Generator
----------------------------------------------
The orchestrator plans research-paper *subsections* (abstract, background, …),
runs one shared web search, spawns one worker per subsection, then stitches
everything into a full draft.

In [None]:
import operator
from typing import Annotated, List, TypedDict

from IPython.display import Markdown
from langchain_core.messages import HumanMessage, SystemMessage

# Step 1: Define the Planning Schema
To begin, we define a structured output schema using Pydantic. This schema ensures that the LLM returns well-formatted, predictable output when it plans the structure of a research paper.

We create two models:

- `Subsection`: a single unit of the paper, with its name and a brief description of what it should cover.
- `Subsections`: a wrapper that holds a list of these units.

By passing these models to LangChain's `with_structured_output` method, we require the orchestrator LLM to return an organized plan, rather than free-form text, that downstream worker nodes can reliably consume.

This schema acts as the blueprint for the rest of the workflow.
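To see what the schema buys us, here is a minimal sketch that validates hand-written JSON payloads against copies of the two models, assuming Pydantic v2 (`model_validate_json`). The payloads are made-up stand-ins for an orchestrator response: a well-formed plan parses into typed objects, while one missing a `description` is rejected before it could reach a worker.

```python
from typing import List

from pydantic import BaseModel, Field, ValidationError


class Subsection(BaseModel):
    name: str = Field(description="Name for this subsection of the research paper.")
    description: str = Field(
        description="Concise description of the general subjects to be covered in this subsection."
    )


class Subsections(BaseModel):
    Subsections: List[Subsection] = Field(description="All subsections of the research paper.")


# A made-up payload shaped like the orchestrator's structured output
raw_plan = (
    '{"Subsections": ['
    '{"name": "Abstract", "description": "Summarize the main findings."}, '
    '{"name": "Background", "description": "Review prior work on scaling laws."}]}'
)
plan = Subsections.model_validate_json(raw_plan)
section_names = [s.name for s in plan.Subsections]
print(section_names)  # ['Abstract', 'Background']

# A payload missing the required "description" field fails validation
try:
    Subsections.model_validate_json('{"Subsections": [{"name": "Abstract"}]}')
    rejected = False
except ValidationError as err:
    rejected = True
    print("rejected with", err.error_count(), "error(s)")
```

Running this cell is optional; the next cell defines the models the graph actually uses.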

In [None]:
from pydantic import BaseModel, Field  # langchain_core.pydantic_v1 is deprecated
from langgraph.types import Send


class Subsection(BaseModel):
    name: str = Field(description="Name for this subsection of the research paper.")
    description: str = Field(
        description="Concise description of the general subjects to be covered in this subsection."
    )


class Subsections(BaseModel):
    Subsections: List[Subsection] = Field(description="All subsections of the research paper.")

# Step 2: Set Up the LLM and Tools
We initialize `gpt-4` as our base LLM and bind it to the `Subsections` schema to create the orchestrator. We also load Tavily, a web search tool, so that worker agents can enrich their sections with live web data.

In [None]:
TAVILY_API_KEY = getpass("Tavily API key: ")
os.environ["TAVILY_API_KEY"] = TAVILY_API_KEY

In [None]:
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_openai import ChatOpenAI

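llm = ChatOpenAI(model="gpt-4", temperature=0)
# Function calling works with gpt-4; newer langchain-openai versions otherwise default
# to OpenAI's json_schema structured-output mode, which gpt-4 may not support.
orchestrator_llm = llm.with_structured_output(Subsections, method="function_calling")

# Tavily's result-count parameter is max_results
search = TavilySearchResults(max_results=5)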

# Step 3: Define Graph State
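We define two state schemas:

- `State` holds the overall research-paper workflow: the topic, the planned subsections, the shared search results, the completed subsection texts, and the final paper.
- `WorkerState` captures the task assigned to each worker (a single subsection plus the shared search results) and the list where its contribution is accumulated.

The `Annotated[List[str], operator.add]` reducer on `completed_subsections` tells LangGraph to concatenate the lists returned by parallel workers rather than keep only a single value. This shared state structure lets LangGraph coordinate work between the orchestrator and its worker agents.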
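The reducer in `Annotated[List[str], operator.add]` is simply a two-argument function stored in the type's metadata. The stdlib-only sketch below is a simplified model of the merge, not LangGraph's internals: it reads that metadata back and folds three hypothetical worker updates into one list, mirroring the concatenation that happens when parallel workers each return `{"completed_subsections": [...]}`.

```python
import operator
from functools import reduce
from typing import Annotated, List, TypedDict, get_type_hints


class State(TypedDict):
    completed_subsections: Annotated[List[str], operator.add]
    final_paper: str


# Recover the reducer attached to the annotated key
hints = get_type_hints(State, include_extras=True)
reducer = hints["completed_subsections"].__metadata__[0]

# Hypothetical updates returned by three parallel workers
worker_updates = [["Abstract text"], ["Background text"], ["Results text"]]

# Fold each update into the current value, starting from an empty list
merged = reduce(reducer, worker_updates, [])
print(merged)  # ['Abstract text', 'Background text', 'Results text']
```

Keys without a reducer, such as `final_paper`, have no such merge rule, which is why only the synthesiser writes that key.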

In [None]:
class State(TypedDict):
    topic: str
    subsections: List[Subsection]
    completed_subsections: Annotated[List[str], operator.add]
    final_paper: str
    search_results: str


class WorkerState(TypedDict):
    subsection: Annotated[Subsection, lambda x, y: y]
    completed_subsections: Annotated[List[str], operator.add]
    search_results: str

# Step 4: Define Nodes
We define four nodes in the graph:

- `orchestrator`: dynamically plans the structure of the paper by generating a list of subsections with structured output.
- `search_tool`: runs a single Tavily web search on the topic; its results are shared with every worker.
- `subsection_writer`: a worker that writes one subsection from its name and description, using the shared search results as background.
- `synthesiser`: merges all completed subsections into a single draft, separating them with Markdown horizontal rules (`---`).

Each node contributes to a modular, scalable paper-writing pipeline, and with Phoenix tracing you can inspect every generation step in detail.
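Because each node is an ordinary function that takes a state dict and returns a partial state update, nodes can be exercised without compiling the graph or calling an LLM. In this sketch, `fake_writer` is a hypothetical stub standing in for `subsection_writer` (it formats the subsection instead of calling a model, and uses plain dicts instead of `Subsection` objects), while `synthesiser` repeats the tutorial's logic.

```python
def fake_writer(state: dict) -> dict:
    """Hypothetical stub for subsection_writer: formats the subsection instead of calling an LLM."""
    sub = state["subsection"]
    return {"completed_subsections": [f"## {sub['name']}\n{sub['description']}"]}


def synthesiser(state: dict) -> dict:
    """Same logic as the tutorial's synthesiser node."""
    full_paper = "\n\n---\n\n".join(state["completed_subsections"])
    return {"final_paper": full_paper}


plan = [
    {"name": "Abstract", "description": "Short summary."},
    {"name": "Method", "description": "How it works."},
]

# Run each worker on its own subsection, then flatten their list updates
updates = [fake_writer({"subsection": sub, "search_results": ""}) for sub in plan]
completed = [text for update in updates for text in update["completed_subsections"]]

result = synthesiser({"completed_subsections": completed})
print(result["final_paper"])
```

The same approach works for unit tests: swap the LLM-backed worker for a stub and assert on the returned update.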

In [None]:
def orchestrator(state: State):
    """Plan the research-paper subsections dynamically."""
    plan = orchestrator_llm.invoke(
        [
            SystemMessage(content="Generate a detailed subsection plan for a research paper."),
            HumanMessage(content=f"Paper topic: {state['topic']}"),
        ]
    )
    return {"subsections": plan.Subsections}


def subsection_writer(state: WorkerState):
    """Worker node: write one subsection, using the shared search results as background."""
    sub = state["subsection"]
    search_info = state.get("search_results", "")

    response = llm.invoke(
        [
            SystemMessage(
                content=(
                    "You're writing a research-paper subsection. Use the following web search results as background, along with your own knowledge."
                )
            ),
            HumanMessage(
                content=(
                    f"Subsection: {sub.name}\n"
                    f"Description: {sub.description}\n"
                    f"Shared Search Results:\n{search_info}\n\n"
                    "Now write the section."
                )
            ),
        ]
    )
    return {"completed_subsections": [response.content]}


def synthesiser(state: State):
    """Concatenate all finished subsections into the final paper draft."""
    full_paper = "\n\n---\n\n".join(state["completed_subsections"])
    return {"final_paper": full_paper}


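def search_tool(state: State):
    """Run one shared web search for the topic and store the results as text."""
    query = f"{state['topic']} research summary"
    results = search.invoke(query)
    # TavilySearchResults returns a list of {"url", "content", ...} dicts (or an error string)
    if isinstance(results, list):
        results = "\n\n".join(f"{r.get('url', '')}\n{r.get('content', '')}" for r in results)
    return {"search_results": str(results)}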

# Step 5: Assign Workers Dynamically
This function uses LangGraph's `Send` API to launch a separate `subsection_writer` worker for each planned subsection, passing each one its subsection plus the shared search results. Because one worker is spawned per section, the system scales with the topic's complexity.

This approach suits research-paper generation, where the number of sections is not known ahead of time, and Phoenix traces the output of each worker node independently.
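The fan-out logic itself is plain Python: one message per planned subsection, each carrying the same shared search results. The sketch below swaps in a hypothetical `FakeSend` named tuple for `langgraph.types.Send`, holding the same two pieces of information (a target node name and a payload), so the routing can be checked without running the graph.

```python
from typing import NamedTuple


class FakeSend(NamedTuple):
    """Hypothetical stand-in for langgraph.types.Send: a target node plus its input."""

    node: str
    arg: dict


def assign_workers(state: dict) -> list:
    """One message per planned subsection, all sharing the same search results."""
    return [
        FakeSend("subsection_writer", {"subsection": s, "search_results": state["search_results"]})
        for s in state["subsections"]
    ]


state = {
    "subsections": ["Abstract", "Background", "Scaling Results", "Discussion"],
    "search_results": "shared web search text",
}
sends = assign_workers(state)
print(len(sends), "workers:", [s.arg["subsection"] for s in sends])
```

A four-item plan yields four worker messages; a longer plan would simply yield more, with no change to the graph.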

In [None]:
def assign_workers(state: State):
    """Launch one subsection_writer per planned subsection (after shared search)."""
    return [
        Send("subsection_writer", {"subsection": s, "search_results": state["search_results"]})
        for s in state["subsections"]
    ]

# Step 6: Construct the LangGraph Workflow
Here, we build the full LangGraph pipeline using a `StateGraph`. The workflow begins with the `orchestrator` node (to plan subsections), then runs `search_tool` once for the topic. A conditional edge from `search_tool` calls `assign_workers`, which fans work out to parallel `subsection_writer` nodes, and the `synthesiser` node aggregates all of their outputs.

LangGraph's conditional edges and `Send` API enable this parallelism, and with Phoenix tracing enabled you can view how each section is created, tracked, and stitched together.
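To illustrate the fan-out/fan-in shape of this graph outside LangGraph, here is a stdlib-only sketch that runs hypothetical stub workers concurrently with `concurrent.futures.ThreadPoolExecutor`, then concatenates their list updates and joins them the way `synthesiser` does. It is an analogy, not how LangGraph schedules nodes.

```python
import operator
import time
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def write_section(name: str) -> list:
    """Hypothetical stub worker: sleeps briefly to mimic an LLM call, returns a one-item update."""
    time.sleep(0.05 if name == "Abstract" else 0.01)  # the first task finishes last
    return [f"## {name}"]


sections = ["Abstract", "Background", "Results"]

# Fan out: run all workers concurrently
with ThreadPoolExecutor(max_workers=len(sections)) as pool:
    updates = list(pool.map(write_section, sections))

# Fan in: apply the operator.add reducer, then join like synthesiser
completed = reduce(operator.add, updates, [])
final_paper = "\n\n---\n\n".join(completed)
print(final_paper)
```

Even though "Abstract" finishes last, `pool.map` returns results in input order, so the draft keeps the planned section order.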

In [None]:
builder = StateGraph(State)

builder.add_node("orchestrator", orchestrator)
builder.add_node("search_tool", search_tool)
builder.add_node("subsection_writer", subsection_writer)
builder.add_node("synthesiser", synthesiser)

builder.add_edge(START, "orchestrator")
builder.add_edge("orchestrator", "search_tool")
builder.add_conditional_edges("search_tool", assign_workers, ["subsection_writer"])
builder.add_edge("subsection_writer", "synthesiser")
builder.add_edge("synthesiser", END)


research_paper_workflow = builder.compile()

# Step 7: Run the Research Paper Generator
We now invoke the compiled LangGraph with a sample topic: "How do scaling laws impact the performance of large language models?" The orchestrator plans the outline, `search_tool` gathers shared background, each worker drafts a subsection in parallel, and the synthesiser assembles the full paper.

With Phoenix integrated, every step is traced, from section planning to writing and synthesis, giving you full visibility into the execution flow and helping you debug or refine outputs.

In [None]:
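from openinference.semconv.trace import SpanAttributes

# The tracer provider returned by phoenix.otel.register hands out OpenInference-aware
# tracers, which accept the openinference_span_kind argument used below.
tracer = tracer_provider.get_tracer(__name__)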

research_topics = [
    "How do scaling laws impact the performance of large language models?",
    # Other research topic examples you can run
    # "What are the key challenges in training very large transformer models?",
    # "How much data is needed to train a performant LLM?",
    # "Explain the relationship between model size and accuracy in language models.",
    # "Why are modern language models undertrained, and how can we fix it?",
    # "What are the trade-offs between training time and model performance?",
    # "Summarize recent findings on training efficiency for large-scale language models.",
]

for topic in research_topics:
    with tracer.start_as_current_span(
        "research_paper_workflow", attributes={"topic": topic}, openinference_span_kind="agent"
    ) as span:
        span.set_attribute(SpanAttributes.INPUT_VALUE, topic)
        state = research_paper_workflow.invoke({"topic": topic})

print("===== RESEARCH PAPER DRAFT =====\n")
Markdown(state["final_paper"])

![Agent Traces](https://storage.googleapis.com/arize-phoenix-assets/assets/images/langgraph-agent-orchestrator-traces.png)

# Step 8: Agent Evaluation

Here, we evaluate the agent's trajectory: whether the sequence of steps it took was logical, efficient, and aligned with completing the user's request. We then log the results back to Phoenix.
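The evaluation treats each trace as one trajectory. Here is a toy version of that aggregation on a hand-made span table (the IDs and values are invented): per trace, it keeps the first non-null input and joins every non-null output in row order, mirroring the `groupby(...).agg(...)` call applied to Phoenix's span dataframe below.

```python
import json

import pandas as pd

# Invented span rows: two traces with several spans each
spans = pd.DataFrame(
    {
        "context.trace_id": ["t1", "t1", "t1", "t2", "t2"],
        "attributes.input.value": [
            json.dumps({"topic": "Scaling laws"}),
            None,
            None,
            "Data requirements for LLMs",
            None,
        ],
        "attributes.output.value": ["plan", "search", "section A", "plan", None],
    }
)

# One row per trace: first non-null input, all non-null outputs joined in row order
toy_trace_df = spans.groupby("context.trace_id").agg(
    {
        "attributes.input.value": "first",
        "attributes.output.value": lambda x: " ".join(x.dropna()),
    }
)
print(toy_trace_df)
```

Note that the first input can be either JSON or a plain string, which is why the topic-extraction helper below has to handle both.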

In [None]:
import pandas as pd

from phoenix.client import AsyncClient

px_client = AsyncClient()
df = await px_client.spans.get_spans_dataframe(project_name=project_name)

trace_df = df.groupby("context.trace_id").agg(
    {
        "attributes.input.value": "first",
        "attributes.output.value": lambda x: " ".join(x.dropna()),
    }
)


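import json


def extract_input_content(input_value):
    """Return the research topic from a trace's first input value.

    Inputs serialized as JSON (e.g. {"topic": ...}) yield their "topic" field; the
    manually created root span stores the topic as a plain string, returned as-is.
    """
    if not isinstance(input_value, str):
        # None, NaN, or any other non-string value
        return None
    try:
        parsed = json.loads(input_value)
    except json.JSONDecodeError:
        # Not JSON: treat the raw string as the topic
        return input_value
    if isinstance(parsed, dict):
        return parsed.get("topic", input_value)
    return input_value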


# Apply function row by row
trace_df["attributes.input.value"] = trace_df["attributes.input.value"].apply(extract_input_content)
trace_df.head()

In [None]:
TRAJECTORY_PERFORMANCE_PROMPT = """
You are a helpful AI bot that checks whether an AI agent's internal trajectory is accurate and effective.

You will be given:
1. The input query from a user that the agent responded to
2. The agent's actual trajectory of steps and their outputs

An accurate trajectory:
- Progresses logically from step to step
- Follows a reasonable sequence of steps for the task
- Shows a clear path toward completing a goal
- Is reasonably efficient (doesn't take unnecessary detours)

##

User Query:
{attributes.input.value}

Trajectory:
{attributes.output.value}

##

Your response must be a single string, either `correct` or `incorrect`, and must not include any additional text.

- Respond with `correct` if the agent's trajectory adheres to the rubric and accomplishes the task effectively.
- Respond with `incorrect` if the trajectory is confusing, misaligned with the goal, inefficient, or does not accomplish the task.
"""

In [None]:
from phoenix.evals import OpenAIModel, llm_classify
from phoenix.trace import suppress_tracing

model = OpenAIModel(
    api_key=os.environ["OPENAI_API_KEY"],
    model="gpt-4o-mini",
    temperature=0.0,
)

rails = ["correct", "incorrect"]

with suppress_tracing():
    eval_results = llm_classify(
        dataframe=trace_df,
        template=TRAJECTORY_PERFORMANCE_PROMPT,
        model=model,
        rails=rails,
        provide_explanation=True,
        verbose=False,
        concurrency=20,
    )

eval_results["score"] = eval_results["label"].apply(lambda x: 1 if x == "correct" else 0)

In [None]:
root_spans = df[df["parent_id"].isna()][["context.trace_id", "context.span_id"]]
eval_results = eval_results[["score", "label", "explanation"]]

trajectory_eval_df = pd.merge(trace_df, eval_results, left_index=True, right_index=True, how="left")

trajectory_eval_df = pd.merge(
    trajectory_eval_df.reset_index(), root_spans, on="context.trace_id", how="left"
).set_index("context.span_id", drop=False)

### Log Evals to Phoenix

In [None]:
await px_client.annotations.log_span_annotations_dataframe(
    dataframe=trajectory_eval_df,
    annotation_name="TRAJECTORY PERFORMANCE",
    annotator_kind="LLM",
)

# View Final Results in Phoenix

![Eval Results](https://storage.googleapis.com/arize-phoenix-assets/assets/images/langgraph-agent-orchestrator-evals.png)