# 04: Building a Multi-Agent Workflow with StateGraph

In this notebook, we will explore how to build a multi-agent workflow using **LangGraph's StateGraph**. A **StateGraph** allows agents to communicate through a shared state, enabling them to collaborate on complex tasks. Each node in the graph performs an action, reads from the shared state, and writes results back to it, forming a seamless workflow between agents.

By the end of this notebook, you'll understand how to:

- Define a shared state schema for agent communication.
- Build and compile a `StateGraph` where nodes represent agents or actions.
- Use multi-agent workflows to solve more complex tasks by aggregating results across different agents.

## Introduction to StateGraph

A `StateGraph` is a graph structure where each node represents an action or a task that modifies a shared state. The state is passed between nodes, allowing each node (or agent) to update it with partial results. You can think of it as a chain of actions, where each action contributes to solving a larger problem.
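
To make the idea concrete, here is a minimal plain-Python sketch of nodes sharing one state. It deliberately does not use LangGraph; the node functions `collect` and `summarize` and the `run_chain` helper are hypothetical names chosen for illustration only.

```python
from typing import Callable, Dict, List

State = Dict[str, object]


def collect(state: State) -> State:
    # Read the input and write a partial result back to the state.
    return {"facts": [f"fact about {state['input']}"]}


def summarize(state: State) -> State:
    # Build on what the previous node wrote.
    return {"summary": f"{len(state['facts'])} fact(s) collected"}


def run_chain(state: State, nodes: List[Callable[[State], State]]) -> State:
    # Each node sees the current state and returns only the keys it changes.
    for node in nodes:
        state = {**state, **node(state)}
    return state


result = run_chain({"input": "graphs"}, [collect, summarize])
print(result["summary"])
```

Each node returns only the keys it wants to change, which is the same contract the node functions later in this notebook follow.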

### Example Use Cases

- **Multi-agent Collaboration**: Multiple agents working together to solve a problem by performing different tasks, each contributing to the final solution.
- **Data Processing Pipelines**: Complex workflows where each node processes and transforms data, passing the results to the next node.

In this notebook, we will focus on building such a workflow and demonstrate how agents can communicate by reading from and writing to the shared state.

## 1. Setup

Before we begin, let's make sure your environment is set up correctly. We'll start by installing the necessary Python packages.

### Installing Required Packages

To get started, you'll need to install a few Python libraries. Run the following command to install them:

In [91]:
%pip install langchain langgraph langgraph-checkpoint-sqlite langchain_community requests termcolor wikipedia arxiv duckduckgo-search

Note: you may need to restart the kernel to use updated packages.


## 2. Configuring a Simple Model

In this section, we configure the language model that our agents will use to process tasks. The `ModelService` class manages the interaction with the model (in this case, `llama3.1:8b-instruct-fp16`), allowing the agents to reason about a request and decide which tools to call.

### Model Configuration

We initialize the `ModelService` with a specific model configuration, including parameters such as model endpoint, temperature (for controlling randomness), and others. This step enables our agent to perform model-based tasks using the provided configuration.

In [92]:
from services.model_service import ModelService

# Initialize the service with the model configuration
ollama_service = ModelService(model="llama3.1:8b-instruct-fp16")

## 3. Defining the Shared State

In this step, we will define the shared state structure that the agents will use to communicate in the workflow. Each agent or node in the graph will read from this state, perform its task, and then update the state with new information. This shared state allows the agents to collaborate on solving a problem by contributing their partial results.

### Shared State Structure

We will use a custom class, `AgentGraphState`, to define the structure of the shared state. This class stores the input provided by the user and the responses from the different agents. The key fields in our state are:

- **input**: The initial input provided by the user (e.g., a question or task).
- **researcher_response**: A list that stores the responses from the "researcher" agent. This agent is responsible for gathering relevant information or performing research.
- **search_response**: A list that stores the results from the "search" agent, which handles web searches.
- **evaluator_response**: A list that stores the output of the "evaluator" agent, which compares the researcher and search responses.

The three response fields are annotated with `add_messages`, a reducer function that tells LangGraph how to merge a node's update into the existing value. Instead of overwriting the list, new messages are appended to it.

In [93]:
from langgraph.graph.message import add_messages
from typing import Annotated, TypedDict, Any


class AgentGraphState(TypedDict):
    input: str
    researcher_response: Annotated[list, add_messages]
    search_response: Annotated[list, add_messages]
    evaluator_response: Annotated[list, add_messages]
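
The reducer idea can be sketched without LangGraph. The snippet below is a simplified illustration, not LangGraph's implementation: `append_items`, `DemoState`, and `apply_update` are hypothetical names, and the real `add_messages` does more than this sketch (for example, it converts entries to message objects and matches messages by ID).

```python
from typing import Annotated, TypedDict, get_type_hints


def append_items(existing: list, new: list) -> list:
    # Hypothetical reducer: return a new list with the new items appended.
    return list(existing) + list(new)


class DemoState(TypedDict):
    input: str
    notes: Annotated[list, append_items]


def apply_update(state: dict, update: dict) -> dict:
    # Look up each field's reducer in the Annotated metadata, if it has one.
    hints = get_type_hints(DemoState, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        metadata = getattr(hints[key], "__metadata__", ())
        if metadata:
            merged[key] = metadata[0](merged.get(key, []), value)
        else:
            merged[key] = value  # no reducer: overwrite
    return merged


state = {"input": "q", "notes": ["first"]}
state = apply_update(state, {"notes": ["second"]})
state = apply_update(state, {"input": "q2"})
print(state)
```

Fields with a reducer accumulate values, while plain fields such as `input` are simply replaced.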

### Updating the State

To allow agents to update the shared state, we will create a utility function, `update_state`. This function takes the current state, a key (the field in the state to be updated), and a value (the new data to store under that key).

If the specified key exists in the state, the function updates it with the new value. If the key doesn't exist, the function issues a warning, ensuring that agents can only update pre-defined fields in the state.

In [94]:
def update_state(state: AgentGraphState, key: str, value: Any):
    """
    Update the state of the agent. Warn if the key doesn't exist.
    """
    if key in state:
        state[key] = value
    else:
        print(f"Warning: Attempting to update a non-existing state key '{key}'.")
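
One consequence of the `key in state` check is worth noting: a `TypedDict` is an ordinary `dict` at runtime, so a field that is declared in the schema but not yet present in the dictionary also triggers the warning. The sketch below shows this with a plain `dict` and a copy of the function so that it runs on its own; the example values are made up.

```python
from typing import Any


def update_state(state: dict, key: str, value: Any) -> None:
    # Same logic as above, typed against a plain dict for this standalone demo.
    if key in state:
        state[key] = value
    else:
        print(f"Warning: Attempting to update a non-existing state key '{key}'.")


# Only "input" has been populated so far.
state = {"input": "What is LangGraph?"}

update_state(state, "input", "What is a StateGraph?")  # key present: updated
update_state(state, "researcher_response", ["draft"])  # key absent: warning only

print(state)
```

Note also that the function assigns the value directly, so it replaces whatever was stored under the key rather than appending to it.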

## 4. Creating the Agents

In this step, we will define the agents that will perform tasks within the graph. Each agent is responsible for using external tools to gather data, such as searching Wikipedia, querying Arxiv, or performing web searches. These agents will use the shared state to store their outputs and pass information to other agents in the workflow.

### Agents Overview

We will create three agents in this step:
1. **Researcher Agent**: This agent will be responsible for gathering academic and knowledge-based information from sources like Wikipedia and Arxiv.
2. **Search Agent**: This agent will handle web searches using tools like DuckDuckGo to retrieve information from the internet.
3. **Evaluator Agent**: This agent will compare the responses of the other two agents and decide which one answered the query better.

The researcher and search agents are built using the `ReactAgent` class, which can perform the **Thought → Action → Observation** loop, invoking external tools and storing the results in the shared state. The evaluator is built on the base `Agent` class and is given no tools.
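
The Thought → Action → Observation loop can be sketched as follows. This is not the `ReactAgent` implementation, which is not shown in this notebook; the tool registry, the scripted model replies, and the `react` function are all hypothetical stand-ins.

```python
import json
from typing import Callable, Dict, List

# Hypothetical tool registry: tool name -> function that takes a string query.
tools: Dict[str, Callable[[str], str]] = {
    "lookup": lambda query: f"stub result for '{query}'",
}

# Scripted replies standing in for the language model's JSON output.
scripted_replies: List[str] = [
    json.dumps({"thought": "I should look this up.",
                "action": "lookup", "action_input": "StateGraph"}),
    json.dumps({"thought": "I have enough information.",
                "answer": "A StateGraph passes shared state between nodes."}),
]

observations: List[str] = []


def react(max_steps: int = 5) -> str:
    for reply_text in scripted_replies[:max_steps]:
        reply = json.loads(reply_text)              # Thought
        if "answer" in reply:
            return reply["answer"]
        tool = tools[reply["action"]]               # Action
        # Observation: a real agent would send this back to the model.
        observations.append(tool(reply["action_input"]))
    return "No answer within the step limit."


final_answer = react()
print(final_answer)
```

The step limit matters in practice: without it, a model that never produces a final answer would keep calling tools indefinitely.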

### Configuring the Researcher Agent

The researcher agent will use two tools:
- **Wikipedia API**: We will configure this tool to return the top 1 result from Wikipedia, limiting the content length for concise responses.
- **Arxiv API**: This tool will retrieve academic papers related to the query, again returning only the top 1 result.

These tools are added to a list called `researcher_tools`, which will be used by the researcher agent.

In [95]:

from langchain_community.tools import WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
from langchain_community.utilities import ArxivAPIWrapper
from langchain_community.tools import ArxivQueryRun

api_wrapper = WikipediaAPIWrapper(top_k_results=1, doc_content_chars_max=200)
wiki = WikipediaQueryRun(api_wrapper=api_wrapper)

arxiv_wrapper = ArxivAPIWrapper(top_k_results=1, doc_content_chars_max=200)
arxiv = ArxivQueryRun(api_wrapper=arxiv_wrapper)

# Collect the Arxiv and Wikipedia tools for the researcher agent
researcher_tools = [arxiv, wiki]

In [96]:
from agent.react_agent import ReactAgent

def researcher_node_function(state: AgentGraphState):
    researcher_agent = ReactAgent(
        state=state,
        role="researcher",
        tools=researcher_tools,
        ollama_service=ollama_service,
    )

    return researcher_agent.react(user_request=state["input"])

### Configuring the Search Agent

The search agent uses the **DuckDuckGo** search tool, which allows it to perform general web searches. The tool fetches relevant results from the internet based on the user query. This agent will store the search results in the `search_response` field of the shared state.

In [97]:
from langchain_community.tools import DuckDuckGoSearchRun

# Initialize DuckDuckGo Search Tool
duckduckgo_search = DuckDuckGoSearchRun()

search_tools = [duckduckgo_search]

In [98]:
def search_node_function(state: AgentGraphState):
    search_agent = ReactAgent(
        state=state,
        role="search",
        tools=search_tools,
        ollama_service=ollama_service,
    )

    return search_agent.react(user_request=state["input"])

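### Evaluating the Research and Search Responses

The evaluator agent compares the most recent responses from the researcher and search agents. The helper function `create_eval_user_request` builds its prompt from the shared state, and `eval_node_function` runs the evaluator as a graph node.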

In [99]:
def create_eval_user_request(state: AgentGraphState) -> str:
    """
    Formulate a user request for the evaluation node, including the responses from both agents.

    Parameters:
    - state: The current state of the workflow, containing the original user query and the responses from both agents.

    Returns:
    - str: The formatted evaluation request.
    """
    # Retrieve the last entries from the researcher and search responses

    last_research = state["researcher_response"][-1].content
    last_search = state["search_response"][-1].content

    # Formulate the user request including both agent responses
    user_request = f"""
    As an evaluator, your task is to analyze the responses from two agents based on the following query: '{state["input"]}'.

    Agents:
    1. **Researcher Agent**: Uses academic and knowledge-based sources such as Wikipedia and Arxiv to provide answers.
    2. **Search Agent**: Performs general web searches using DuckDuckGo to gather information.

    Agent Responses:
    - **Researcher Agent Response**: 
    '{last_research}'

    - **Search Agent Response**:
    '{last_search}'

    Evaluation Criteria:
    - **Accuracy**: How factually correct is the information provided by each agent?
    - **Relevance**: How closely does each response align with the query?
    - **Clarity and Conciseness**: How clear and concise is the explanation provided by each agent?

    Instructions:
    Please compare both responses based on the criteria above and determine which agent provided a better response. 
    Provide a detailed evaluation of each agent's response, and conclude which agent performed better overall and why.

    Your output should include:
    - A brief evaluation of each agent's response based on the criteria.
    - A final decision on which agent's response was better, supported by clear reasons.
    """

    return user_request

In [100]:
from agent.base_agent import Agent

def eval_node_function(state: AgentGraphState):
    eval_agent = Agent(
        state=state,
        role="evaluator",
        ollama_service=ollama_service,
    )

    return eval_agent.work(user_request=create_eval_user_request(state))

### How the Agents Work

1. **Researcher Agent**:
   - The researcher agent reads the `input` field from the shared state, which contains the user’s request.
   - It then interacts with Wikipedia and Arxiv to gather relevant information.
   - After processing the user request, the agent updates the `researcher_response` field in the state with the gathered data.

2. **Search Agent**:
   - The search agent also reads from the `input` field in the shared state.
   - It uses DuckDuckGo to perform a web search based on the input and retrieves relevant results.
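   - The results are stored in the `search_response` field of the state.

3. **Evaluator Agent**:
   - The evaluator agent reads the `input` field together with the latest entries in `researcher_response` and `search_response`.
   - It asks the model to compare the two responses on accuracy, relevance, and clarity.
   - Its verdict is stored in the `evaluator_response` field of the state.

By utilizing these agents, we can build a workflow where the agents collaborate to gather and process information from multiple sources.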

## 5. Creating the Graph and Defining the Workflow

In this step, we will define the **StateGraph** that orchestrates the workflow between the agents. The **StateGraph** represents the flow of tasks between the agents, allowing them to share a state and perform actions in a defined sequence. We will also introduce a checkpointing system using **SqliteSaver** to allow the graph to save and retrieve its state during execution.

### Creating the Graph

The **StateGraph** is the core structure that defines how different agents (nodes) interact with one another and pass the shared state along the workflow. We will:
1. **Add Nodes**: The nodes in the graph represent the agents performing tasks. In our case, we add three nodes: the **researcher agent**, the **search agent**, and the **evaluator agent**.
2. **Define the Workflow**:
   - The workflow starts with the **researcher agent**, which gathers academic or knowledge-based information.
   - The updated state is then passed to the **search agent**, which performs a web search to gather additional information.
   - The **evaluator agent** runs last and compares the two responses.
   - The graph ends once the evaluator agent has completed its task and updated the shared state with its verdict.

By defining edges between the nodes, we ensure that tasks are executed sequentially, with each agent receiving the state as updated by the previous one.

In [101]:
from langgraph.graph import StateGraph, START, END


def create_graph() -> StateGraph:
    """
    Create the state graph by defining nodes and edges.

    Returns:
    - StateGraph: The state graph definition. It still has to be compiled before it can be executed.
    """
    graph = StateGraph(AgentGraphState)

    # Add nodes
    graph.add_node("researcher_agent", researcher_node_function)
    graph.add_node("search_agent", search_node_function)
    graph.add_node("eval_agent", eval_node_function)

    # Define the flow of the graph
    graph.add_edge(START, "researcher_agent")
    graph.add_edge("researcher_agent", "search_agent")
    graph.add_edge("search_agent", "eval_agent")
    graph.add_edge("eval_agent", END)

    return graph

### Checkpointing with SqliteSaver

To add persistence and fault tolerance to our workflow, we will integrate a **checkpointing system** using **SqliteSaver**. This allows the state graph to save its progress in an in-memory SQLite database, which can later be retrieved or resumed if necessary.

The checkpointing system:
- **Saves the State**: It stores the state of the graph and its nodes, allowing the workflow to be paused and resumed.
- **Memory-Based Storage**: For simplicity, we will use an in-memory SQLite database (`":memory:"`), but this can be replaced with a persistent database for long-term storage.

In [102]:
from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3


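# Helper that builds a SqliteSaver directly from a connection string.
# check_same_thread=False allows the connection to be used from threads
# other than the one that created it.
def from_conn_stringx(
    cls,
    conn_string: str,
) -> "SqliteSaver":
    return SqliteSaver(conn=sqlite3.connect(conn_string, check_same_thread=False))


# Attach the helper to SqliteSaver as a class method
SqliteSaver.from_conn_stringx = classmethod(from_conn_stringx)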

memory = SqliteSaver.from_conn_stringx(":memory:")
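
To illustrate what saving state to an in-memory SQLite database involves, here is a small standard-library sketch. The table layout and the `save_state` and `load_state` helpers are invented for this example and do not reflect the schema `SqliteSaver` actually uses.

```python
import json
import sqlite3

# check_same_thread=False lets the connection be used from threads other
# than the one that created it, as in the helper above.
conn = sqlite3.connect(":memory:", check_same_thread=False)
conn.execute("CREATE TABLE checkpoints (thread_id TEXT PRIMARY KEY, state TEXT)")


def save_state(thread_id: str, state: dict) -> None:
    # Serialize the state to JSON and overwrite any earlier row for this thread.
    conn.execute(
        "INSERT OR REPLACE INTO checkpoints (thread_id, state) VALUES (?, ?)",
        (thread_id, json.dumps(state)),
    )
    conn.commit()


def load_state(thread_id: str) -> dict:
    # Return the saved state, or an empty dict if nothing was saved.
    row = conn.execute(
        "SELECT state FROM checkpoints WHERE thread_id = ?", (thread_id,)
    ).fetchone()
    return json.loads(row[0]) if row else {}


save_state("1", {"input": "q", "researcher_response": ["notes"]})
restored = load_state("1")
print(restored)
```

Because the database lives in memory, the saved state disappears when the Python process exits. Passing a file path instead of `":memory:"` would keep it on disk.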

## 6. Executing the Workflow and Handling State Changes

In this final step, we will compile the graph, execute the multi-agent workflow, and monitor the state changes as the agents process the user query. Additionally, we will use a custom serializer to handle and inspect the agent's state at various points in the workflow.

### Creating and Compiling the Workflow

Once the **StateGraph** is defined and nodes are connected, the next step is to **compile** the graph. The `compile` method turns the graph definition into a runnable workflow object.

- **Checkpointing**: The workflow uses the **SqliteSaver** to checkpoint the state during execution, ensuring that we can pause and resume the workflow if needed.
- **Interrupt Before**: We define an interrupt point before the **search agent** node, which means the workflow can be paused just before this agent runs, giving us the option to inspect or modify the state.
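
The interplay between checkpointing and an interrupt can be sketched in plain Python. The runner below is a toy model, not LangGraph's execution engine: the `stream` function, the `checkpoints` dictionary, and the stub nodes are hypothetical. It pauses before a named node, saves its position under a thread ID, and resumes when called again with `None` as the input.

```python
from typing import Callable, Dict, Iterator, List, Optional

nodes: Dict[str, Callable[[dict], dict]] = {
    "researcher_agent": lambda s: {"researcher_response": ["research notes"]},
    "search_agent": lambda s: {"search_response": ["search notes"]},
}
order: List[str] = ["researcher_agent", "search_agent"]

# Hypothetical checkpoint store: thread_id -> (state, index of the next node).
checkpoints: Dict[str, tuple] = {}


def stream(inputs: Optional[dict], thread_id: str,
           interrupt_before: List[str]) -> Iterator[dict]:
    if inputs is None:
        # Resume: load the saved state and continue from the saved position.
        state, position = checkpoints[thread_id]
        resuming = True
    else:
        state, position, resuming = dict(inputs), 0, False
    while position < len(order):
        name = order[position]
        if name in interrupt_before and not resuming:
            checkpoints[thread_id] = (state, position)  # pause here
            return
        resuming = False
        update = nodes[name](state)
        state = {**state, **update}
        position += 1
        checkpoints[thread_id] = (state, position)
        yield {name: update}


first_run = list(stream({"input": "q"}, "1", ["search_agent"]))
second_run = list(stream(None, "1", ["search_agent"]))
print(first_run)
print(second_run)
```

The first call yields only the researcher's update and then stops. The second call picks up at the search node because the checkpoint for thread `"1"` records where the run paused.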

### Workflow Parameters

Before executing the workflow, we configure several parameters:
- **iterations**: The value intended for LangGraph's `recursion_limit`, which caps the number of steps a single run may take. We set it to 10. Note that the `limit` dictionary built from it is not passed to `stream` in this notebook, so the default limit applies.
- **verbose**: If set to `True`, this flag will print detailed information about each event (i.e., what each agent is doing and the state updates).
- **config**: This dictionary contains configurable options. The `thread_id` identifies the thread under which the checkpointer saves and restores the workflow state.

### Example Query

For this example, we define a query:
- **"Who's the USA's President?"**: This will be the input provided to the agents. The agents will use this query to gather relevant information from the tools (e.g., Wikipedia, Arxiv, DuckDuckGo).

![Multi Agent Workflow](images/workflow-search.png)

In [103]:
# Create the graph and compile the workflow
graph = create_graph()
workflow = graph.compile(checkpointer=memory, interrupt_before=["search_agent"])
print("Graph and workflow created.")

# Define workflow parameters
iterations = 10
verbose = True
config = {"configurable": {"thread_id": "1"}}

query = "Who's the USA's President?"
dict_inputs = {"input": query}
limit = {"recursion_limit": iterations}

Graph and workflow created.


### Executing the Workflow

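Once the workflow is compiled, we execute it using the `stream` method, which runs the nodes in order and yields an event after each step. Because the workflow was compiled with `interrupt_before=["search_agent"]`, this first call stops after the researcher agent. At each step: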
- The current event (i.e., agent action) is printed, showing what the agent is doing and how the state is changing.
- State changes are printed in real-time, allowing us to observe how the agents update the shared state with new information.

In [104]:
# Execute the workflow and print state changes
for event in workflow.stream(dict_inputs, config):
    if verbose:
        print("\nEvent:", event)
    else:
        print("\n")

<|start_header_id|>user<|end_header_id|>

Who's the USA's President?<|eot_id|>
<|start_header_id|>assistant<|end_header_id|>

{
    "thought": "The question is asking about a specific person, likely someone who has held or holds public office. A suitable tool to use would be Wikipedia, as it has information on various public figures and their roles.",
    "action": "wikipedia",
    "action_input": {
        "query": {
            "title": "List of Presidents of the United States",
            "description": ""
        }
    }
}<|eot_id|>
<|python_tag|>wikipedia.call({'query': {'title': 'List of Presidents of the United States', 'description': ''}})
<|eom_id|>
<|start_header_id|>ipython<|end_header_id|>

{
    "observation": "Error executing tool wikipedia: 1 validation error for WikipediaQueryInput\nquery\n  str type expected (type=type_error.str)"
}<|eot_id|>
<|start_header_id|>assistant<|end_header_id|>

{
    "thought": "The error message ind

### Inspecting the Final State

At the end of the workflow, we capture a **snapshot of the state** using the `get_state` method. This snapshot contains all the information gathered by the agents during the workflow execution. We use a **custom serializer** to serialize the state, making it easy to inspect the contents, such as responses from the agents.

The state snapshot includes:
- **Agent Messages**: These are the interactions between agents and tools, captured as `AIMessage` or `SystemMessage` objects. We use a custom serializer to format these messages for easier readability.
- **Next Node**: Indicates the next node in the workflow, if any, to show where the workflow will continue from if it hasn't finished.

In [105]:
import json
from langchain_core.messages.ai import AIMessage
from langchain_core.messages import SystemMessage


def custom_serializer(obj):
    # Messages are reduced to their content and ID for readable JSON output
    if isinstance(obj, (AIMessage, SystemMessage)):
        return {
            "content": obj.content,
            "id": obj.id,
            # Add other attributes as needed
        }
    # Add other custom class serializations here
    return str(obj)  # Fallback to string conversion if not recognized


state_snapshot = workflow.get_state(config)

print(json.dumps(state_snapshot.values, indent=4, default=custom_serializer))

print(state_snapshot.next)

{
    "input": "Who's the USA's President?",
    "researcher_response": [
        {
            "content": "Sorry, I cannot answer your query.",
            "id": "51d0af9f-587d-48a5-88ae-482b7c1486d8"
        }
    ],
    "search_response": [],
    "evaluator_response": []
}
('search_agent',)
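
The `default=` hook used above is a standard feature of `json.dumps`: it is called only for objects the encoder cannot serialize on its own. The following standalone sketch shows the mechanism with a hypothetical `DemoMessage` dataclass standing in for a message object.

```python
import json
from dataclasses import dataclass


@dataclass
class DemoMessage:
    # Hypothetical stand-in for a message object such as AIMessage.
    content: str
    id: str


def demo_serializer(obj):
    # json.dumps calls this only for objects it cannot serialize natively.
    if isinstance(obj, DemoMessage):
        return {"content": obj.content, "id": obj.id}
    return str(obj)


values = {"input": "q", "researcher_response": [DemoMessage("hello", "m-1")]}
encoded = json.dumps(values, default=demo_serializer)
print(encoded)
```

Strings, lists, and dictionaries in the state never reach the hook, which is why the serializer only needs branches for custom classes.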


### Resuming the Workflow

The first run paused at the interrupt before the search agent. Calling `stream` again with `None` as the input and the same `config` does not restart the workflow. It resumes from the checkpoint saved for that `thread_id`, so the search agent runs next, followed by the evaluator agent.

By the end of this step, we have a fully functioning multi-agent workflow that can process a query, gather information, and update its shared state accordingly.

In [106]:
# Resume the workflow from the saved checkpoint and print state changes
for event in workflow.stream(None, config):
    if verbose:
        print("\nEvent:", event)
    else:
        print("\n")

<|start_header_id|>user<|end_header_id|>

Who's the USA's President?<|eot_id|>
<|start_header_id|>assistant<|end_header_id|>

{
    "thought": "To find the current president of the United States, I'll need to perform a search with DuckDuckGo.",
    "action": "duckduckgo_search",
    "action_input": {
        "query": {
            "title": "Current US President",
            "description": "search query to look up the USA's current president"
        }
    }
}<|eot_id|>
<|python_tag|>duckduckgo_search.call({'query': {'title': 'Current US President', 'description': "search query to look up the USA's current president"}})
<|eom_id|>
<|start_header_id|>ipython<|end_header_id|>

{
    "observation": "Error executing tool duckduckgo_search: 1 validation error for DDGInput\nquery\n  str type expected (type=type_error.str)"
}<|eot_id|>
<|start_header_id|>assistant<|end_header_id|>

{
    "thought": "The error indicates that the 'query' field should be 

In [107]:
final_state = workflow.get_state(config)

print(json.dumps(final_state.values, indent=4, default=custom_serializer))

{
    "input": "Who's the USA's President?",
    "researcher_response": [
        {
            "content": "Sorry, I cannot answer your query.",
            "id": "51d0af9f-587d-48a5-88ae-482b7c1486d8"
        }
    ],
    "search_response": [
        {
            "content": "I have the answer: Joe Biden.",
            "id": "340f647d-dc02-4178-b513-54975b0b9da9"
        }
    ],
    "evaluator_response": [
        {
            "content": "{\n    \"Evaluation\": {\n        \"Researcher Agent Response\": {\n            \"Accuracy\": 0,\n            \"Relevance\": 1,\n            \"Clarity and Conciseness\": 0.5\n        },\n        \"Search Agent Response\": {\n            \"Accuracy\": 1,\n            \"Relevance\": 1,\n            \"Clarity and Conciseness\": 1\n        }\n    },\n    \"Conclusion\": 0\n}",
            "id": "e72aea3e-4ea7-4f70-9671-7abba46c1bf3"
        }
    ]
}
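
Since the evaluator's content is itself a JSON string, it can be parsed for further processing. The sketch below copies the content from the run above and sums the criterion scores per agent. Summing is an illustrative choice, not something the notebook prescribes, and the notebook does not define what `"Conclusion": 0` means, so it is left uninterpreted. A model is also not guaranteed to return valid JSON, so real code would need error handling around `json.loads`.

```python
import json

# The evaluator's message content from the run above (a JSON-encoded string).
evaluator_content = (
    '{"Evaluation": {'
    '"Researcher Agent Response": '
    '{"Accuracy": 0, "Relevance": 1, "Clarity and Conciseness": 0.5}, '
    '"Search Agent Response": '
    '{"Accuracy": 1, "Relevance": 1, "Clarity and Conciseness": 1}}, '
    '"Conclusion": 0}'
)

evaluation = json.loads(evaluator_content)["Evaluation"]

# Sum the three criterion scores for each agent.
totals = {agent: sum(scores.values()) for agent, scores in evaluation.items()}
best_agent = max(totals, key=totals.get)

print(totals)
print(best_agent)
```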


In [108]:
from termcolor import colored

# Print the user input
print(colored(f"User Query: {final_state.values['input']}\n","green"))

# Print the researcher agent's response
print(colored("Researcher Agent Response:", "yellow"))
if final_state.values["researcher_response"]:
    for message in final_state.values["researcher_response"]:
        if isinstance(message, AIMessage):
            pretty_message = message.pretty_repr()
            print(colored(f"{pretty_message}", "yellow"))
    print("\n")

# Print the search agent's response
print(colored("Search Agent Response:", "magenta"))
if final_state.values["search_response"]:
    for message in final_state.values["search_response"]:
        if isinstance(message, AIMessage):
            pretty_message = message.pretty_repr()
            print(colored(f"{pretty_message}", "magenta"))
    print("\n")

# Print the evaluator agent's response
print(colored("Evaluator Agent Response:", "cyan"))
if final_state.values["evaluator_response"]:
    for message in final_state.values["evaluator_response"]:
        if isinstance(message, AIMessage):
            pretty_message = message.pretty_repr()
            print(colored(f"{pretty_message}", "cyan"))
    print("\n")

User Query: Who's the USA's President?

Researcher Agent Response:

Sorry, I cannot answer your query.


Search Agent Response:

I have the answer: Joe Biden.


Evaluator Agent Response:

{
    "Evaluation": {
        "Researcher Agent Response": {
            "Accuracy": 0,
            "Relevance": 1,
            "Clarity and Conciseness": 0.5
        },
        "Search Agent Response": {
            "Accuracy": 1,
            "Relevance": 1,
            "Clarity and Conciseness": 1
        }
    },
    "Conclusion": 0
}


