# Building Complex Agent Workflows with LangGraph

This notebook demonstrates how to build complex agent workflows using LangGraph, a framework for creating stateful, multi-agent systems.

In [None]:
%%capture --no-stderr
%pip install -U langgraph langchain_community langchain_anthropic langchain_experimental

In [None]:
import getpass
import os


def _set_if_undefined(var: str):
    """Prompt for environment variable ``var`` unless it already holds a non-empty value.

    The value is read with ``getpass`` so it is not echoed, then stored in
    ``os.environ`` for the rest of the session.
    """
    already_set = bool(os.environ.get(var))
    if already_set:
        return
    os.environ[var] = getpass.getpass(f"Please provide your {var}")


# Prompt for each API key that is not already present in the environment.
_set_if_undefined("ANTHROPIC_API_KEY")
_set_if_undefined("TAVILY_API_KEY")

<div class="admonition tip">
    <p class="admonition-title">Set up <a href="https://smith.langchain.com">LangSmith</a> for LangGraph development</p>
    <p style="padding-top: 5px;">
        Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM apps built with LangGraph — read more about how to get started <a href="https://docs.smith.langchain.com">here</a>.
    </p>
</div>

## Create tools

For this example, you will make one agent that does web research with a search engine and another agent that creates plots. Define the tools they'll use below:

In [None]:
from typing import Annotated

from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.tools import tool
from langchain_experimental.utilities import PythonREPL

# Search tool given to the research agent; constructed with max_results=5.
tavily_tool = TavilySearchResults(max_results=5)

# This executes code locally, which can be unsafe
# Single module-level REPL instance; python_repl_tool runs every snippet through it.
repl = PythonREPL()


@tool
def python_repl_tool(
    code: Annotated[str, "The python code to execute to generate your chart."],
):
    """Use this to execute python code and do math. If you want to see the output of a value,
    you should print it out with `print(...)`. This is visible to the user."""
    # NOTE(review): the docstring and the Annotated text above are presumably
    # surfaced to the model by @tool as the tool description, so treat them as
    # prompt text rather than developer documentation.
    try:
        stdout = repl.run(code)
    except BaseException as exc:
        # Any failure is reported back as text instead of propagating.
        return f"Failed to execute. Error: {exc!r}"
    return f"Successfully executed:\n```python\n{code}\n```\nStdout: {stdout}"

## Create Agent Supervisor

It will use an LLM with structured output to choose the next worker node OR finish processing.

In [None]:
from typing import Literal
from typing_extensions import TypedDict

from langchain_anthropic import ChatAnthropic
from langgraph.graph import MessagesState, END
from langgraph.types import Command


# Names of the worker nodes the supervisor can route to.
members = ["researcher", "coder"]
# Our team supervisor is an LLM node. It just picks the next agent to process
# and decides when the work is completed
# Valid routing answers: any worker name, or "FINISH" to stop.
options = members + ["FINISH"]

# System message that supervisor_node puts in front of the conversation on every call.
system_prompt = (
    "You are a supervisor tasked with managing a conversation between the"
    f" following workers: {members}. Given the following user request,"
    " respond with the worker to act next. Each worker will perform a"
    " task and respond with their results and status. When finished,"
    " respond with FINISH."
)


class Router(TypedDict):
    """Worker to route to next. If no workers needed, route to FINISH."""

    # Restricted to the entries of `options`: a worker name or "FINISH".
    # NOTE(review): the docstring above is presumably sent to the model as the
    # schema description by with_structured_output — confirm before rewording it.
    next: Literal[*options]


# Chat model shared by the supervisor nodes and every worker agent in this notebook.
llm = ChatAnthropic(model="claude-3-5-sonnet-latest")


class State(MessagesState):
    # Routing target most recently chosen by a supervisor node (it writes this
    # key in its Command update).
    next: str


def supervisor_node(state: State) -> Command[Literal[*members, "__end__"]]:
    messages = [
        {"role": "system", "content": system_prompt},
    ] + state["messages"]
    response = llm.with_structured_output(Router).invoke(messages)
    goto = response["next"]
    if goto == "FINISH":
        goto = END

    return Command(goto=goto, update={"next": goto})

## Construct Graph

We're ready to start building the graph. Below, define the worker nodes and connect them to the supervisor node we just defined.

In [None]:
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import create_react_agent


# Prebuilt ReAct-style agent whose only tool is web search; the prompt tells it
# to leave math to other workers.
research_agent = create_react_agent(
    llm, tools=[tavily_tool], prompt="You are a researcher. DO NOT do any math."
)


def research_node(state: State) -> Command[Literal["supervisor"]]:
    """Run the research agent on the current state, then return to the supervisor.

    Only the agent's final message is kept; it is re-wrapped as a
    ``HumanMessage`` named "researcher" and appended to the conversation.
    """
    agent_output = research_agent.invoke(state)
    final_text = agent_output["messages"][-1].content
    reply = HumanMessage(content=final_text, name="researcher")
    return Command(goto="supervisor", update={"messages": [reply]})


# NOTE: THIS PERFORMS ARBITRARY CODE EXECUTION, WHICH CAN BE UNSAFE WHEN NOT SANDBOXED
# Prebuilt agent whose only tool is python_repl_tool; no custom prompt is supplied.
code_agent = create_react_agent(llm, tools=[python_repl_tool])


def code_node(state: State) -> Command[Literal["supervisor"]]:
    """Run the coding agent on the current state, then return to the supervisor.

    Only the agent's final message is kept; it is re-wrapped as a
    ``HumanMessage`` named "coder" and appended to the conversation.
    """
    agent_output = code_agent.invoke(state)
    final_text = agent_output["messages"][-1].content
    reply = HumanMessage(content=final_text, name="coder")
    return Command(goto="supervisor", update={"messages": [reply]})


builder = StateGraph(State)

# Register the nodes first ...
builder.add_node("supervisor", supervisor_node)
builder.add_node("researcher", research_node)
builder.add_node("coder", code_node)

# ... then make the supervisor the entry point. No other static edges are
# added here: every node returns a Command that names its successor.
builder.add_edge(START, "supervisor")

graph = builder.compile()

In [None]:
# Test the graph with a query
query = "What is the average net worth of the top 5 richest people in the world? Create a bar chart."

# Create an event stream. In "updates" mode every event is a dict that maps the
# name of the node that just ran to the state update that node returned, so
# there are no top-level "node" / "state" keys to look up.
events = graph.stream(
    {"messages": [{"role": "user", "content": query}]},
    stream_mode="updates",
)

# Process each event
for event in events:
    for node, update in event.items():
        print(f"\n{'-' * 50}")
        print(f"Currently executing node: {node}")

        if not isinstance(update, dict):
            # The node produced no state update (or this is a non-node entry).
            continue

        if node == "supervisor":
            print(f"Next: {update.get('next', 'Not specified')}")

        messages = update.get("messages")
        if not messages:
            continue

        # Print the last message. Entries may be message objects or plain
        # dicts, so read the speaker and content defensively for both shapes.
        last_message = messages[-1]
        if isinstance(last_message, dict):
            speaker = last_message.get("name") or last_message.get("role", "unknown")
            content = last_message.get("content", "")
        else:
            speaker = getattr(last_message, "name", None) or getattr(
                last_message, "type", "unknown"
            )
            content = getattr(last_message, "content", last_message)
        print(f"\n{str(speaker).upper()}: {str(content)[:200]}...")

## Building an Enhanced Multi-Agent System

Now let's extend this concept to create a more complex system with additional agents:

In [None]:
from langchain_core.tools import tool

# Define additional specialized tools
@tool
def summarize_text(text: Annotated[str, "The text to summarize"]):
    """Summarize the provided text."""
    # Placeholder: a real implementation would call an LLM or a dedicated
    # summarization model. Only the input length is reflected in the result.
    char_count = len(text)
    return f"Summary of {char_count} characters of text: This is a placeholder summary."

@tool
def analyze_sentiment(text: Annotated[str, "The text to analyze for sentiment"]):
    """Analyze the sentiment of the provided text."""
    # Mock implementation: case-insensitive substring spotting. Positive
    # keywords are checked before negative ones, so mixed text reads as positive.
    lowered = text.lower()
    if any(keyword in lowered for keyword in ("good", "great", "excellent")):
        return "Positive sentiment detected"
    if any(keyword in lowered for keyword in ("bad", "terrible", "awful")):
        return "Negative sentiment detected"
    return "Neutral sentiment detected"

In [None]:
# Create a more complex routing system with additional agents
# Worker node names available to the enhanced supervisor.
enhanced_members = ["researcher", "coder", "summarizer", "analyst"]
# Valid routing answers: any enhanced worker name, or "FINISH" to stop.
enhanced_options = enhanced_members + ["FINISH"]

# System message for enhanced_supervisor_node; unlike the basic prompt it also
# gives a one-line description of each worker's role.
enhanced_system_prompt = (
    "You are a supervisor tasked with managing a conversation between the"
    f" following workers: {enhanced_members}. Given the following user request,"
    " respond with the worker to act next. Each worker will perform a"
    " task and respond with their results and status.\n\n"
    "Researcher: Finds information on topics using search.\n"
    "Coder: Writes and executes Python code, creates visualizations.\n"
    "Summarizer: Creates concise summaries of text.\n"
    "Analyst: Analyzes sentiment and provides insights.\n\n"
    "When no further work is needed, respond with FINISH."
)


class EnhancedRouter(TypedDict):
    """Worker to route to next. If no workers needed, route to FINISH."""

    # Restricted to the entries of `enhanced_options`: a worker name or "FINISH".
    # NOTE(review): the docstring above is presumably sent to the model as the
    # schema description by with_structured_output — confirm before rewording it.
    next: Literal[*enhanced_options]


def enhanced_supervisor_node(state: State) -> Command[Literal[*enhanced_members, "__end__"]]:
    messages = [
        {"role": "system", "content": enhanced_system_prompt},
    ] + state["messages"]
    response = llm.with_structured_output(EnhancedRouter).invoke(messages)
    goto = response["next"]
    if goto == "FINISH":
        goto = END

    return Command(goto=goto, update={"next": goto})

In [None]:
# Create the additional specialized agents
# Each agent gets exactly one tool and a short role prompt.
summarizer_agent = create_react_agent(
    llm, tools=[summarize_text], prompt="You are a summarizer. Your job is to create concise summaries of text."
)

analyst_agent = create_react_agent(
    llm, tools=[analyze_sentiment], prompt="You are an analyst. Your job is to analyze sentiment and provide insights."
)


def summarizer_node(state: State) -> Command[Literal["supervisor"]]:
    """Run the summarizer agent on the current state, then return to the supervisor.

    Only the agent's final message is kept; it is re-wrapped as a
    ``HumanMessage`` named "summarizer" and appended to the conversation.
    """
    agent_output = summarizer_agent.invoke(state)
    final_text = agent_output["messages"][-1].content
    reply = HumanMessage(content=final_text, name="summarizer")
    return Command(goto="supervisor", update={"messages": [reply]})


def analyst_node(state: State) -> Command[Literal["supervisor"]]:
    """Run the analyst agent on the current state, then return to the supervisor.

    Only the agent's final message is kept; it is re-wrapped as a
    ``HumanMessage`` named "analyst" and appended to the conversation.
    """
    agent_output = analyst_agent.invoke(state)
    final_text = agent_output["messages"][-1].content
    reply = HumanMessage(content=final_text, name="analyst")
    return Command(goto="supervisor", update={"messages": [reply]})

In [None]:
# Build the enhanced graph
enhanced_builder = StateGraph(State)

# Supervisor plus the four workers it can choose between.
enhanced_builder.add_node("supervisor", enhanced_supervisor_node)
enhanced_builder.add_node("researcher", research_node)
enhanced_builder.add_node("coder", code_node)
enhanced_builder.add_node("summarizer", summarizer_node)
enhanced_builder.add_node("analyst", analyst_node)

# The supervisor is the entry point; every node returns a Command naming its
# successor, so no further static edges are added here.
enhanced_builder.add_edge(START, "supervisor")

enhanced_graph = enhanced_builder.compile()

In [None]:
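# Visualize the graph (uncomment when running inside a notebook; a compiled
# graph has no `.show()` method, so render it through `get_graph()` instead)
# from IPython.display import Image, display
# display(Image(enhanced_graph.get_graph().draw_mermaid_png()))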

In [None]:
# Test the enhanced graph with a complex query
complex_query = "Research the latest climate change reports, summarize the main findings, analyze the sentiment of public responses, and create a visualization of temperature trends over the last decade."

# Create an event stream. In "updates" mode every event is a dict that maps the
# name of the node that just ran to the state update that node returned, so
# there are no top-level "node" / "state" keys to look up.
events = enhanced_graph.stream(
    {"messages": [{"role": "user", "content": complex_query}]},
    stream_mode="updates",
)

# Process each event
for event in events:
    for node, update in event.items():
        print(f"\n{'-' * 50}")
        print(f"Currently executing node: {node}")

        if not isinstance(update, dict):
            # The node produced no state update (or this is a non-node entry).
            continue

        if node == "supervisor":
            print(f"Next: {update.get('next', 'Not specified')}")

        messages = update.get("messages")
        if not messages:
            continue

        # Print the last message. Entries may be message objects or plain
        # dicts, so read the speaker and content defensively for both shapes.
        last_message = messages[-1]
        if isinstance(last_message, dict):
            speaker = last_message.get("name") or last_message.get("role", "unknown")
            content = last_message.get("content", "")
        else:
            speaker = getattr(last_message, "name", None) or getattr(
                last_message, "type", "unknown"
            )
            content = getattr(last_message, "content", last_message)
        print(f"\n{str(speaker).upper()}: {str(content)[:200]}...")

## Creating a Conditional Workflow

Let's create a workflow with conditional branching based on user requirements:

In [None]:
from typing import Dict, List, Any

# State schema for the conditional graph: the shared State plus the classifier's
# verdict. The extra key is needed because `State` has no field to carry the
# classification from the classifier node to the routing function.
class ConditionalState(State):
    # Category name written by classify_request and read by conditional_router.
    classification: str


def _latest_user_text(messages) -> str:
    """Return the content of the most recent user message, or "" if there is none.

    Accepts both plain ``{"role": ..., "content": ...}`` dicts and message
    objects that expose ``type`` / ``content`` attributes.
    """
    for message in reversed(messages):
        if isinstance(message, dict):
            is_user = message.get("role") in ("user", "human")
            content = message.get("content", "")
        else:
            is_user = getattr(message, "type", None) == "human"
            content = getattr(message, "content", "")
        if is_user:
            return content if isinstance(content, str) else str(content)
    return ""


# Define a function to classify the request; the result drives the routing below
def classify_request(state: State) -> Dict[str, Any]:
    """Classify the latest user request with the LLM.

    Args:
        state: Graph state; only ``state["messages"]`` is read.

    Returns:
        ``{"classification": <category>}`` where the category is the model's
        reply, stripped and upper-cased. Returned as a state update so that
        ``conditional_router`` can read it.
    """
    user_message = _latest_user_text(state["messages"])

    # Use the LLM to classify the request
    classification_prompt = f"""
    Given the user request: "{user_message}"
    
    Classify this request into one of the following categories:
    - RESEARCH: If the user is asking for information on a topic.
    - ANALYSIS: If the user is asking for analysis of data or text.
    - VISUALIZATION: If the user is asking for a chart or visualization.
    - MULTI_TASK: If the user request includes multiple different types of tasks.
    
    Respond with just the category name.
    """

    classification_response = llm.invoke([
        {"role": "system", "content": "You are a classifier that categorizes user requests. Respond only with the category name."},
        {"role": "user", "content": classification_prompt}
    ])

    raw = classification_response.content
    # Normalise case so the router's substring checks do not depend on how the
    # model capitalises its answer.
    category = (raw if isinstance(raw, str) else str(raw)).strip().upper()

    return {"classification": category}


def conditional_router(
    state: ConditionalState, classification_result: Dict[str, Any] | None = None
) -> str:
    """Pick the first node to run from the stored classification.

    Args:
        state: Graph state; ``state["classification"]`` is used when
            ``classification_result`` is not given. Annotated with
            ``ConditionalState`` so the key is part of the declared input
            (NOTE(review): LangGraph may infer the routing function's input
            schema from this annotation — confirm for the installed version).
        classification_result: Optional explicit ``{"classification": ...}``
            dict, kept for callers that pass the classifier output directly.
            The graph itself calls this function with the state only.

    Returns:
        One of "researcher", "analyst", "coder" or "supervisor".
    """
    source = state if classification_result is None else classification_result
    category = str(source.get("classification", "")).upper()

    # Check MULTI_TASK first so a verbose answer that also mentions a single
    # task type still goes through the supervisor.
    if "MULTI_TASK" in category:
        return "supervisor"
    if "RESEARCH" in category:
        return "researcher"
    if "ANALYSIS" in category:
        return "analyst"
    if "VISUALIZATION" in category:
        return "coder"
    # Missing or unrecognised category: let the supervisor decide.
    return "supervisor"


# Build a conditional graph
conditional_builder = StateGraph(ConditionalState)
conditional_builder.add_node("classifier", classify_request)
conditional_builder.add_node("supervisor", enhanced_supervisor_node)
conditional_builder.add_node("researcher", research_node)
conditional_builder.add_node("coder", code_node)
conditional_builder.add_node("summarizer", summarizer_node)
conditional_builder.add_node("analyst", analyst_node)

# Add conditional branching
conditional_builder.add_edge(START, "classifier")
conditional_builder.add_conditional_edges(
    "classifier",
    conditional_router,
    {
        "researcher": "researcher",
        "analyst": "analyst",
        "coder": "coder",
        "supervisor": "supervisor"
    }
)

# Add remaining edges. The worker nodes already return Command(goto="supervisor"),
# so these static edges point at the same target the workers choose themselves.
for member in enhanced_members:
    conditional_builder.add_edge(member, "supervisor")

# Compile the conditional graph
conditional_graph = conditional_builder.compile()

In [None]:
# Test the conditional graph with different types of queries
queries = [
    "What is the impact of climate change on polar bears?",  # Research
    "Analyze the sentiment of this review: 'The hotel was excellent but the staff was rude.'",  # Analysis
    "Create a bar chart showing the population of the 5 largest cities in the world.",  # Visualization
    "Research renewable energy trends, summarize the findings, and create a visualization of adoption rates."  # Multi-task
]

for query_number, query in enumerate(queries, start=1):
    print(f"\n\n{'=' * 80}\nQuery {query_number}: {query}\n{'=' * 80}")

    # Stream results. In "updates" mode every event is a dict that maps the
    # name of the node that just ran to the state update that node returned,
    # so there are no top-level "node" / "state" keys to look up.
    events = conditional_graph.stream(
        {"messages": [{"role": "user", "content": query}]},
        stream_mode="updates",
    )

    # Process the first few events
    event_count = 0
    for event in events:
        event_count += 1
        for node, update in event.items():
            print(f"\n{'-' * 50}")
            print(f"Currently executing node: {node}")

            if not isinstance(update, dict):
                # The node produced no state update (or this is a non-node entry).
                continue

            if node == "supervisor":
                print(f"Next: {update.get('next', 'Not specified')}")

            messages = update.get("messages")
            if not messages:
                continue

            # Print the last message. Entries may be message objects or plain
            # dicts, so read the speaker and content defensively for both shapes.
            last_message = messages[-1]
            if isinstance(last_message, dict):
                speaker = last_message.get("name") or last_message.get("role", "unknown")
                content = last_message.get("content", "")
            else:
                speaker = getattr(last_message, "name", None) or getattr(
                    last_message, "type", "unknown"
                )
                content = getattr(last_message, "content", last_message)
            print(f"\n{str(speaker).upper()}: {str(content)[:200]}...")

        # Limit the number of events we process for this demo
        if event_count >= 5:
            print("\n... (Output truncated for brevity) ...")
            break

## Conclusion

In this notebook, we've explored how to build complex agent workflows using LangGraph:

1. **Basic Multi-Agent System**: A system with a supervisor that routes tasks to specialized agents (research and coding)
2. **Enhanced Multi-Agent System**: Added more specialized agents for summarization and analysis
3. **Conditional Workflows**: Created a system that classifies user queries and routes them to the appropriate agent first

These patterns can be combined and extended to create powerful AI applications that can handle complex, multi-step tasks efficiently.