## Hierarchical Agent Teams

In our previous example ([Agent Supervisor](./agent_supervisor.ipynb)), we introduced the concept of a single supervisor node to route work between different worker nodes.

But what if the job for a single worker becomes too complex? What if the number of workers becomes too large?

For some applications, the system may be more effective if work is distributed _hierarchically_.

You can do this by composing different subgraphs and creating a top-level supervisor, along with mid-level supervisors.

To do this, let's build a simple research assistant! The graph will look something like the following:

![diagram](./img/hierarchical-diagram.png)

This notebook is inspired by the paper [AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation](https://arxiv.org/abs/2308.08155), by Wu, et. al. In the rest of this notebook, you will:

1. Define the agents' tools to access the web and write files
2. Define some utilities to help create the graph and agents
3. Create and define each team (web research + doc writing)
4. Compose everything together.

But before all of that, some setup:

In [None]:
# %%capture --no-stderr
# %pip install -U langgraph langchain langchain-openai langchain-experimental

In [1]:
import getpass
import os


def _set_if_undefined(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"Please provide your {var}")


_set_if_undefined("OPENAI_API_KEY")
# _set_if_undefined("LANGCHAIN_API_KEY")
_set_if_undefined("TAVILY_API_KEY")

# Optional, add tracing in LangSmith.
# This will help you visualize and debug the control flow
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "Multi-agent Collaboration"

## Create Tools

Each team will be composed of one or more agents each with one or more tools. Below, define all the tools to be used by your different teams.

We'll start with the research team.

**ResearchTeam tools**

The research team can use a search engine and url scraper to find information on the web. Feel free to add additional functionality below to boost the team performance!

In [2]:
from typing import Annotated, List

from langchain_community.document_loaders import WebBaseLoader
from langchain_community.tools.tavily_search import TavilySearchResults

tavily_tool = TavilySearchResults(max_results=5)


def scrape_webpages(urls: List[str]) -> str:
    """Use requests and bs4 to scrape the provided web pages for detailed information."""
    loader = WebBaseLoader(urls)
    return "\n\n".join(
        [
            f'<Document name="{doc.metadata.get("title", "")}">\n{doc.page_content}\n</Document>'
            for doc in loader.load()
        ]
    )

USER_AGENT environment variable not set, consider setting it to identify your requests.


**Document writing team tools**

Next up, we will give some tools for the doc writing team to use.
We define some bare-bones file-access tools below.

Note that this gives the agents access to your file-system, which can be unsafe. We also haven't optimized the tool descriptions for performance.

In [3]:
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Dict, Optional

from langchain_experimental.utilities import PythonREPL
from typing_extensions import TypedDict

_TEMP_DIRECTORY = TemporaryDirectory()
WORKING_DIRECTORY = Path(_TEMP_DIRECTORY.name)


def create_outline(
    points: Annotated[List[str], "List of main points or sections."],
    file_name: Annotated[str, "File path to save the outline."],
) -> Annotated[str, "Path of the saved outline file."]:
    """Create and save an outline."""
    with (WORKING_DIRECTORY / file_name).open("w") as file:
        for i, point in enumerate(points):
            file.write(f"{i + 1}. {point}\n")
    return f"Outline saved to {file_name}"


def read_document(
    file_name: Annotated[str, "File path to save the document."],
    start: Annotated[Optional[int], "The start line. Default is 0"] = None,
    end: Annotated[Optional[int], "The end line. Default is None"] = None,
) -> str:
    """Read the specified document."""
    with (WORKING_DIRECTORY / file_name).open("r") as file:
        lines = file.readlines()
    if start is not None:
        start = 0
    return "\n".join(lines[start:end])


def write_document(
    content: Annotated[str, "Text content to be written into the document."],
    file_name: Annotated[str, "File path to save the document."],
) -> Annotated[str, "Path of the saved document file."]:
    """Create and save a text document."""
    with (WORKING_DIRECTORY / file_name).open("w") as file:
        file.write(content)
    return f"Document saved to {file_name}"


def edit_document(
    file_name: Annotated[str, "Path of the document to be edited."],
    inserts: Annotated[
        Dict[int, str],
        "Dictionary where key is the line number (1-indexed) and value is the text to be inserted at that line.",
    ],
) -> Annotated[str, "Path of the edited document file."]:
    """Edit a document by inserting text at specific line numbers."""

    with (WORKING_DIRECTORY / file_name).open("r") as file:
        lines = file.readlines()

    sorted_inserts = sorted(inserts.items())

    for line_number, text in sorted_inserts:
        if 1 <= line_number <= len(lines) + 1:
            lines.insert(line_number - 1, text + "\n")
        else:
            return f"Error: Line number {line_number} is out of range."

    with (WORKING_DIRECTORY / file_name).open("w") as file:
        file.writelines(lines)

    return f"Document edited and saved to {file_name}"


# Warning: This executes code locally, which is unsafe, especially when not sandboxed

repl = PythonREPL()


def python_repl(
    code: Annotated[str, "The python code to execute to generate your chart."],
):
    """Use this to execute python code. If you want to see the output of a value,
    you should print it out with `print(...)`. This is visible to the user."""
    try:
        result = repl.run(code)
    except BaseException as e:
        return f"Failed to execute. Error: {repr(e)}"
    return f"Successfully executed:\n```python\n{code}\n```\nStdout: {result}"

## Helper Utilities

We are going to create a few utility functions to make it more concise when we want to:

1. Create a worker agent.
2. Create a supervisor for the sub-graph.

These will simplify the graph compositional code at the end for us so it's easier to see what's going on.

In [45]:
from typing import List, Optional
from langgraph.prebuilt import create_react_agent
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from datetime import datetime, timezone
from typing import Callable

from langgraph.graph import END, StateGraph


class Worker:
    def __init__(self, name: str, llm: ChatOpenAI, tools: list, system_prompt: str, before: Optional[Callable] = None):
        self.name = name
        system_prompt = (system_prompt
            + "\nWork autonomously according to your specialty, using the tools available to you."
            " Do not ask for clarification."
            " Your other team members (and other teams) will collaborate with you with their own specialties."
            " You must focus exclusively on fulfilling what is in your specialty, as defined by your tools.")
            f"\n\n Current time: {datetime.now(tz=timezone.utc).isoformat()}"
        prompt_template = ChatPromptTemplate.from_messages([("system", system_prompt), ("placeholder", "{messages}")])
        self.agent = prompt_template | lambda prmopt_value: {"messages": prompt_value.to_messages()} | create_react_agent(
            llm,
            tools,
        )
        if before:
            self.agent = before | agent

    async def __call__(self, state, config):
        result = await self.agent.ainvoke(state, config)
        # We will treat workers as "human" messages in our shared state to avoid confusing the supervisor
        return {"messages": [HumanMessage(content=result["messages"][-1].content, name=self.name)]}


class Supervisor:
    def __init__(self, llm: ChatOpenAI, members: list) -> str:
        """An LLM-based router."""
        self.name = "supervisor"
        member_names = [m.name for m in members]
        options = ["FINISH"] + member_names
        function_def = {
            "name": "route",
            "description": "Select the next role.",
            "parameters": {
                "title": "routeSchema",
                "type": "object",
                "properties": {
                    "next": {
                        "title": "Next",
                        "anyOf": [
                            {"enum": options},
                        ],
                    },
                },
                "required": ["next"],
            },
        }
    
        prompt = ChatPromptTemplate.from_messages(
            [
                (
                    "system",
                    f"""You are a supervisor tasked with managing a conversation between the following workers: {", ".join(member_names)}
Given the following user request, respond with the worker to act next. Each worker will perform a task and respond with their results and status.
When finished, respond with FINISH.

Current time: {{current_time}}""",
                ),
                MessagesPlaceholder(variable_name="messages"),
                (
                    "system",
                    "Given the conversation above, who should act next?"
                    f" Or should we FINISH? Select one of: {options}",
                ),
            ]
        ).partial(current_time=datetime.now(tz=timezone.utc).isoformat())
        self.agent = prompt | llm.with_structured_output(
            {"type": "function", "function": function_def}
        )

    async def __call__(self, state, config):
        return await self.agent.ainvoke(state, config)

## Define Agent Teams

Now we can get to define our hierarchical teams. "Choose your player!"

### Research Team

The research team will have a search agent and a web scraping "research_agent" as the two worker nodes. Let's create those, as well as the team supervisor.

In [24]:
import operator

from langchain_core.messages import BaseMessage, HumanMessage
from langchain_openai.chat_models import ChatOpenAI


# ResearchTeam graph state
class ResearchTeamState(TypedDict):
    # A message is added after each team member finishes
    messages: Annotated[List[BaseMessage], operator.add]
    # The team members are tracked so they are aware of
    # the others' skill-sets
    team_members: List[str]
    # Used to route work. The supervisor calls a function
    # that will update this every time it makes a decision
    next: str


llm = ChatOpenAI(model="gpt-4o")

search_agent = Worker(
    "Search",
    llm,
    [tavily_tool],
    "You are a research assistant who can search for up-to-date info using the tavily search engine.",
)

research_agent = Worker(
    "WebScraper",
    llm,
    [scrape_webpages],
    "You are a research assistant who can scrape specified urls for more detailed information using the scrape_webpages function.",
)

supervisor_agent = Supervisor(
    llm,
    [search_agent, research_agent],
)

Now that we've created the necessary components, defining their interactions is easy. Add the nodes to the team graph, and define the edges, which determine the transition criteria.

In [40]:
from langgraph.graph import START, END
from typing import Literal

research_graph = StateGraph(ResearchTeamState)
research_graph.add_node(search_agent.name, search_agent)
research_graph.add_node(research_agent.name, research_agent)
research_graph.add_node(supervisor_agent.name, supervisor_agent)

# Define the control flow
research_graph.add_edge(search_agent.name, supervisor_agent.name)
research_graph.add_edge(research_agent.name, supervisor_agent.name)


def route_supervisor(
    state: ResearchTeamState,
) -> Literal["Search", "WebScraper", "__end__"]:
    match state["next"]:
        case "FINISH":
            return END
        case _:
            return state["next"]


research_graph.add_conditional_edges("supervisor", route_supervisor)


research_graph.add_edge(START, "supervisor")
chain = research_graph.compile()


# The following functions interoperate between the top level graph state
# and the state of the research sub-graph
# this makes it so that the states of each graph don't get intermixed
def enter_chain(message: str):
    results = {
        "messages": [HumanMessage(content=message)],
    }
    return results


research_chain = enter_chain | chain

In [41]:
from IPython.display import Image, display

display(Image(chain.get_graph(xray=True).draw_mermaid_png()))

<IPython.core.display.Image object>

We can give this team work directly. Try it out below.

In [27]:
async for s in research_chain.astream_events(
    "when is Taylor Swift's next tour?",
    {"recursion_limit": 100},
    version="v2",
):
    match s["event"]:
        case "on_chat_model_stream":
            print(s["data"]["chunk"].content, end="")

Taylor Swift's next tour is "The Eras Tour," which will include international dates in 2024. Here are some of the key dates and locations:

- **February 7-10**: Tokyo Dome, Tokyo, Japan
- **February 16-18**: Melbourne Cricket Ground, Melbourne, Australia
- **February 23-26**: Accor Stadium, Sydney, Australia
- **March 2-4**: National Stadium, Singapore
- **May 9-12**: Paris La Défense Arena, Nanterre, France
- **May 17-19**: Friends Arena, Stockholm, Sweden
- **May 24-25**: Estádio da Luz, Lisbon, Portugal
- **May 30**: Estadio Santiago Bernabéu, Madrid, Spain
- **June 2-3**: Groupama Stadium, Lyon, France
- **June 7-9**: BT Murrayfield Stadium, Edinburgh, Scotland
- **June 13-15**: Anfield Stadium, Liverpool, England
- **June 18**: Principality Stadium, Cardiff, Wales
- **June 21-23**: Wembley Stadium, London, England
- **June 28-30**: Aviva Stadium, Dublin, Ireland
- **July 4-6**: Johan Cruijff Arena, Amsterdam, Netherlands
- **July 9-10**: Letzigrund Stadion, Zurich, Switzerland
- *

### Document Writing Team

Create the document writing team below using a similar approach. This time, we will give each agent access to different file-writing tools.

Note that we are giving file-system access to our agent here, which is not safe in all cases.

In [36]:
import operator
from pathlib import Path
from langchain_core.runnables import RunnableLambda


# Document writing team graph state
class DocWritingState(TypedDict):
    # This tracks the team's conversation internally
    messages: Annotated[List[BaseMessage], operator.add]
    # This is how the supervisor tells langgraph who to work next
    next: str
    # This tracks the shared directory state
    current_files: str


# This will be run before each worker agent begins work
# It makes it so they are more aware of the current state
# of the working directory.
def prelude(state: DocWritingState):
    written_files = []
    if not WORKING_DIRECTORY.exists():
        WORKING_DIRECTORY.mkdir()
    try:
        written_files = [
            f.relative_to(WORKING_DIRECTORY) for f in WORKING_DIRECTORY.rglob("*")
        ]
    except Exception:
        pass
    if not written_files:
        return {**state, "current_files": "No files written."}
    return {
        **state,
        "current_files": "\nBelow are files your team has written to the directory:\n"
        + "\n".join([f" - {f}" for f in written_files]),
    }


llm = ChatOpenAI(model="gpt-4o")

doc_writing_agent = Worker(
    "DocWriter",
    llm,
    [write_document, edit_document, read_document],
    "You are an expert writing a research document.\n"
    # The {current_files} value is populated automatically by the graph state
    "Below are files currently in your directory:\n{current_files}",
    before=prelude,
)
# Injects current directory working state before each call
context_aware_doc_writer_agent = prelude | RunnableLambda(doc_writing_agent)

note_taking_agent = Worker(
    "NoteTaker",
    llm,
    [create_outline, read_document],
    "You are an expert senior researcher tasked with writing a paper outline and"
    " taking notes to craft a perfect paper.{current_files}",
    before=prelude,
)
context_aware_note_taking_agent = prelude | RunnableLambda(note_taking_agent)

chart_generating_agent = Worker(
    "ChartGenerator",
    llm,
    [read_document, python_repl],
    "You are a data viz expert tasked with generating charts for a research project."
    "{current_files}",
    before=prelude,
)
context_aware_chart_generating_agent = prelude | RunnableLambda(chart_generating_agent)

doc_writing_supervisor = Supervisor(
    llm,
    [doc_writer_agent, note_taking_agent, chart_generating_agent],
)

With the objects themselves created, we can form the graph.

In [43]:
# Create the graph here:
# Note that we have unrolled the loop for the sake of this doc
authoring_graph = StateGraph(DocWritingState)
authoring_graph.add_node(doc_writing_agent.name, doc_writing_agent)
authoring_graph.add_node(note_taking_agent.name, note_taking_agent)
authoring_graph.add_node(chart_generating_agent.name, chart_generating_agent)
authoring_graph.add_node("supervisor", doc_writing_supervisor)

# Add the edges that always occur
authoring_graph.add_edge(START, "supervisor")
authoring_graph.add_edge(doc_writing_agent.name, "supervisor")
authoring_graph.add_edge(note_taking_agent.name, "supervisor")
authoring_graph.add_edge(chart_generating_agent.name, "supervisor")


# Add the edges where routing applies
def route_supervisor(
    state: DocWritingState,
) -> Literal["DocWriter", "NoteTaker", "ChartGenerator"]:
    match state["next"]:
        case "FINISH":
            return END
        case _:
            return state["next"]


authoring_graph.add_conditional_edges(
    "supervisor",
    lambda x: x["next"],
    {
        "DocWriter": "DocWriter",
        "NoteTaker": "NoteTaker",
        "ChartGenerator": "ChartGenerator",
        "FINISH": END,
    },
)

chain = authoring_graph.compile()


# The following functions interoperate between the top level graph state
# and the state of the research sub-graph
# this makes it so that the states of each graph don't get intermixed


class Handoff:
    def enter_chain(message: str, members: List[str]):
        results = {
            "messages": [HumanMessage(content=message)],
            "team_members": ", ".join(members),
        }
        return results


# We reuse the enter/exit functions to wrap the graph
authoring_chain = (
    functools.partial(enter_chain, members=authoring_graph.nodes)
    | authoring_graph.compile()
)

NameError: name 'functools' is not defined

In [None]:
from IPython.display import Image, display

display(Image(chain.get_graph().draw_mermaid_png()))

In [None]:
async for s in authoring_chain.astream_events(
    "Write an outline for poem and then write the poem to disk.",
    {"recursion_limit": 100},
    version="v2",
):
    match s["event"]:
        case "on_chat_model_stream":
            print(s["data"]["chunk"].content, end="")

## Add Layers

In this design, we are enforcing a top-down planning policy. We've created two graphs already, but we have to decide how to route work between the two.

We'll create a _third_ graph to orchestrate the previous two, and add some connectors to define how this top-level state is shared between the different graphs.

In [None]:
from langchain_core.messages import BaseMessage
from langchain_openai.chat_models import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o")

supervisor_node = create_team_supervisor(
    llm,
    "You are a supervisor tasked with managing a conversation between the"
    " following teams: {team_members}. Given the following user request,"
    " respond with the worker to act next. Each worker will perform a"
    " task and respond with their results and status. When finished,"
    " respond with FINISH.",
    ["ResearchTeam", "PaperWritingTeam"],
)

In [None]:
# Top-level graph state
class State(TypedDict):
    messages: Annotated[List[BaseMessage], operator.add]
    next: str


def get_last_message(state: State) -> str:
    return state["messages"][-1].content


def join_graph(response: dict):
    return {"messages": [response["messages"][-1]]}


# Define the graph.
super_graph = StateGraph(State)
# First add the nodes, which will do the work
super_graph.add_node("ResearchTeam", get_last_message | research_chain | join_graph)
super_graph.add_node(
    "PaperWritingTeam", get_last_message | authoring_chain | join_graph
)
super_graph.add_node("supervisor", supervisor_node)

# Define the graph connections, which controls how the logic
# propagates through the program
super_graph.add_edge("ResearchTeam", "supervisor")
super_graph.add_edge("PaperWritingTeam", "supervisor")
super_graph.add_conditional_edges(
    "supervisor",
    lambda x: x["next"],
    {
        "PaperWritingTeam": "PaperWritingTeam",
        "ResearchTeam": "ResearchTeam",
        "FINISH": END,
    },
)
super_graph.set_entry_point("supervisor")
super_graph = super_graph.compile()

In [None]:
from IPython.display import Image, display

display(Image(super_graph.get_graph().draw_mermaid_png()))

In [None]:
for s in super_graph.stream(
    {
        "messages": [
            HumanMessage(
                content="Write a brief research report on the North American sturgeon. Include a chart."
            )
        ],
    },
    {"recursion_limit": 150},
):
    if "__end__" not in s:
        print(s)
        print("---")