# LangGraph multi-agent design pattern for user-facing apps

I have tried many of the existing LangGraph design patterns. They are great, but they are not easy to integrate into user-facing front-end applications.

This design pattern supports a smooth conversation between the user and several agents. The example uses agents that help the user create tasks in an external system and look up information about those tasks.

This code can easily be integrated into user-facing apps such as web chats or Telegram/WhatsApp bots, and modified to interact with your task management system (Asana, Teamwork, Jira, Trello, Monday.com, Basecamp, ClickUp, GoHighLevel (GHL) or any other you use).

Adding a new capability to an agent is generally as easy as adding a new tool in the respective place below.

The code is largely based on https://github.com/langchain-ai/langgraph/blob/main/examples/multi_agent/multi-agent-collaboration.ipynb with additional features for easy integration.

Created by Tony AI Champ, August 2024.

## Prepare the stage

Install required libraries

In [2]:
%%capture --no-stderr
%pip install -U langchain langchain_openai langsmith pandas langchain_experimental matplotlib langgraph==0.1.17 langchain_core python-dotenv motor nest_asyncio pygments

Import environment variables

Make sure you have OPENAI_API_KEY in your .env.

In [3]:
import os

from dotenv import load_dotenv
load_dotenv("./.env")

True
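
The `True` above only reports that `load_dotenv` set at least one variable; it does not confirm that `OPENAI_API_KEY` is among them. A minimal sketch of a fail-fast check, assuming a hypothetical helper named `require_env` that is not part of the original notebook:

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable or raise a clear error.

    `require_env` is a hypothetical helper used for illustration only.
    """
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"{name} is not set; add it to your .env file.")
    return value


# Usage (commented out so the snippet has no side effects when pasted):
# require_env("OPENAI_API_KEY")
```

Calling it right after `load_dotenv` surfaces a missing key immediately instead of at the first model call.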

## Define assistants and their tools

Here we define LLMs used for each of the assistants and their system prompts.

In [4]:

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o")

assistants = {
    "HelloAssistant": {
        "llm": llm,
        "system_message": """
            You are HelloAssistant, one of the assistants from the AI agent team AIPA (AI personal assistants).
            Your role is to understand what the user wants to do: create a new task or get information about an existing one.
            Once the user tells you they want to create a task, you use the switch_assistant tool to switch to TaskSetterAssistant.
        """.replace("                ", "")
    },
    "TaskSetterAssistant": {
        "llm": llm,
        "system_message": """
            You are a TaskSetterAssistant, one of the assistants from the AI agent team AIPA (AI personal assistants).
            Your role is to help the user set a new task.
            The process is:
            1. get the task description from the user
            2. ask clarifying questions
            3. provide updated task description
            You continue repeating steps 2-3 until the user confirms they are good with the task description.
        """.replace("                ", "")
    }
}
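
The prompts above strip their indentation by replacing a fixed run of spaces, which depends on the exact indentation width of the source file. As an alternative, here is a small sketch based on `textwrap.dedent` from the standard library; `make_system_message` is a hypothetical helper name, and the prompt text is shortened for the example.

```python
import textwrap


def make_system_message(raw: str) -> str:
    """Remove the common leading indentation and the surrounding blank lines."""
    return textwrap.dedent(raw).strip()


prompt = make_system_message(
    """
    You are HelloAssistant, one of the assistants from the AI agent team AIPA.
    Your role is to understand what the user wants to do.
    """
)
print(prompt)
```

`dedent` removes whatever indentation the lines share, so the result stays the same if the code is later re-indented.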

Here we define tools our assistants will be using.

In [5]:


from langchain_core.tools import tool
from typing import Annotated, List, TypedDict, Union, Dict
from typing_extensions import Literal

# This tool is used to switch between the assistants.
#  We could instead infer the switch from the assistants' text output, but then it would be difficult
#  to filter these technical messages out of what the user sees on the front end.
@tool
def switch_assistant(
    assistant_name: Annotated[Literal[tuple(list(assistants.keys()))], "The name of the assistant to switch to."]
):
    """Use this to switch the current assistant to another one."""
    return assistant_name

# This tool creates a new task
@tool
def task_create(
    task_name: Annotated[str, "The name of the task."],
    task_description: Annotated[str, "The description of the task."],
):
    """Use this to create a new task."""

    # You can change the following code for integration with your task management system like
    #   Asana, Teamwork, Jira, Trello, Monday.com, Basecamp, ClickUp, GoHighLevel (GHL) or any other you use.

    import os
    import uuid
    # Create 'tasks' folder if it doesn't exist
    tasks_folder = "tasks"
    os.makedirs(tasks_folder, exist_ok=True)
    # Generate a unique filename using UUID
    filename = f"{uuid.uuid4()}.txt"
    file_path = os.path.join(tasks_folder, filename)
    # Write task information to the file
    result_str = ""
    with open(file_path, "w") as file:
        file.write(f"Task Name: {task_name}\n")
        file.write(f"Task Description: {task_description}\n")
    result_str += f"\nTask file created: {file_path}"

    return result_str


@tool
def list_tasks() -> str:
    """Use this to get a list of all existing tasks."""

    import os
    tasks_folder = "tasks"
    tasks = []
    if os.path.exists(tasks_folder):
        for filename in os.listdir(tasks_folder):
            if filename.endswith(".txt"):
                file_path = os.path.join(tasks_folder, filename)
                with open(file_path, "r") as file:
                    content = file.read()
                    task_name = content.split("Task Name: ")[1].split("\n")[0]
                    tasks.append({
                        "id": filename.split('.')[0],
                        "name": task_name,
                    })

    return str(tasks)


@tool
def get_task_info(task_id: Annotated[str, "The unique identifier of the task."]) -> str:
    """Use this to get detailed information about a specific task."""

    import os
    tasks_folder = "tasks"
    file_path = os.path.join(tasks_folder, f"{task_id}.txt")
    if not os.path.exists(file_path):
        # Return a string, as declared, so the result can be used as ToolMessage content
        return str({"error": f"Task with id {task_id} not found."})
    with open(file_path, "r") as file:
        content = file.read()
        task_name = content.split("Task Name: ")[1].split("\n")[0]
        task_description = content.split("Task Description: ")[1].strip()
    return str({
        "id": task_id,
        "name": task_name,
        "description": task_description
    })
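
`get_task_info` builds a file path from a `task_id` that ultimately comes from the model or the user. Because `task_create` names files with `uuid.uuid4()`, one option is to accept only well-formed UUIDs before touching the file system. A minimal sketch; `is_valid_task_id` is a hypothetical helper and not part of the notebook:

```python
import uuid


def is_valid_task_id(task_id: str) -> bool:
    """Return True only if task_id is the canonical string form of a UUID."""
    try:
        return str(uuid.UUID(task_id)) == task_id.lower()
    except (ValueError, AttributeError, TypeError):
        return False


print(is_valid_task_id("c0159b37-2225-4a1b-af8a-67dcacf119b6"))  # True
print(is_valid_task_id("../../etc/passwd"))                      # False
```

A tool could return its "not found" message early when the check fails, so path-like input never reaches `os.path.join`.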



Adding tools to assistants definitions

In [6]:
assistants["HelloAssistant"]["tools"] = [switch_assistant, list_tasks, get_task_info]
assistants["TaskSetterAssistant"]["tools"] = [task_create, switch_assistant]
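
Extending an assistant follows the same two steps: write a function, then add it to that assistant's `tools` list. The sketch below shows a hypothetical `task_delete` as a plain function so that it runs without LangChain installed; in the notebook it would carry the `@tool` decorator and `Annotated` argument descriptions like the tools above. The `tasks_folder` parameter is an assumption added to make the function easy to try in a temporary directory.

```python
import os


def task_delete(task_id: str, tasks_folder: str = "tasks") -> str:
    """Use this to delete an existing task by its id."""
    file_path = os.path.join(tasks_folder, f"{task_id}.txt")
    if not os.path.exists(file_path):
        return f"Task with id {task_id} not found."
    os.remove(file_path)
    return f"Task {task_id} deleted."


# In the notebook, after decorating task_delete with @tool:
# assistants["HelloAssistant"]["tools"].append(task_delete)
```

The assistant's system message may also need a sentence telling the model when to use the new tool.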

Defining router that will be used in conditional edges

In [7]:
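# The router returns either "user" or the name of one of the assistants
RouterOutput = Literal[tuple(["user", *assistants.keys()])]

# Plain module-level function: it is passed to AIPAAgent, so no @staticmethod is needed
def router(state) -> RouterOutput:
    # hand control back to the user
    if state["next"] == "user":
        return "user"
    # otherwise continue with the currently active assistant
    else:
        return state['current_agent']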
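
To see how `next` and `current_agent` drive the conversation, the routing rule can be traced on plain dictionaries. The sketch below re-implements the rule as a standalone function (`route` is an illustrative name) and walks through one hand-off; the state values are made up for the example.

```python
from typing import Dict


def route(state: Dict[str, str]) -> str:
    """Same rule as the router above: pause for the user or run the current assistant."""
    if state["next"] == "user":
        return "user"
    return state["current_agent"]


# 1. An assistant has just answered in plain text: control returns to the user.
state = {"current_agent": "HelloAssistant", "next": "user"}
print(route(state))  # user

# 2. The user has replied: `next` points at the assistant that should respond.
state = {"current_agent": "HelloAssistant", "next": "HelloAssistant"}
print(route(state))  # HelloAssistant

# 3. HelloAssistant called switch_assistant: both fields now name the new assistant.
state = {"current_agent": "TaskSetterAssistant", "next": "TaskSetterAssistant"}
print(route(state))  # TaskSetterAssistant
```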

## MongoDB checkpointer

In [8]:
# Mongo DB checkpointer

# This code is based on https://langchain-ai.github.io/langgraph/how-tos/persistence_mongodb/, but updated to support async operation.

import pickle
from contextlib import AbstractContextManager
from types import TracebackType
from typing import Any, Dict, Iterator, Optional, List, AsyncIterator, Tuple
from motor.motor_asyncio import AsyncIOMotorClient

from langchain_core.runnables import RunnableConfig
from typing_extensions import Self

from langgraph.checkpoint.base import (
    BaseCheckpointSaver,
    Checkpoint,
    CheckpointMetadata,
    CheckpointTuple,
    SerializerProtocol,
)
from langgraph.serde.jsonplus import JsonPlusSerializer

class JsonPlusSerializerCompat(JsonPlusSerializer):
    """A serializer that supports loading pickled checkpoints for backwards compatibility."""

    def loads(self, data: bytes) -> Any:
        if data.startswith(b"\x80") and data.endswith(b"."):
            return pickle.loads(data)
        return super().loads(data)

class MongoDBSaver(AbstractContextManager, BaseCheckpointSaver):
    """A checkpoint saver that stores checkpoints in a MongoDB database."""

    serde = JsonPlusSerializerCompat()

    def __init__(
        self,
        client: AsyncIOMotorClient,
        db_name: str,
        collection_name: str,
        *,
        serde: Optional[SerializerProtocol] = None,
    ) -> None:
        super().__init__(serde=serde)
        self.client = client
        self.db_name = db_name
        self.collection_name = collection_name
        self.collection = client[db_name][collection_name]

    def __enter__(self) -> Self:
        return self

    def __exit__(
        self,
        __exc_type: Optional[type[BaseException]],
        __exc_value: Optional[BaseException],
        __traceback: Optional[TracebackType],
    ) -> Optional[bool]:
        # Returning None lets exceptions raised inside the with-block propagate
        return None

    async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
        query = {"thread_id": config["configurable"]["thread_id"]}
        if config["configurable"].get("thread_ts"):
            query["thread_ts"] = config["configurable"]["thread_ts"]
        doc = await self.collection.find_one(query, sort=[("thread_ts", -1)])
        if doc:
            return CheckpointTuple(
                config,
                self.serde.loads(doc["checkpoint"]),
                self.serde.loads(doc["metadata"]),
                (
                    {
                        "configurable": {
                            "thread_id": doc["thread_id"],
                            "thread_ts": doc["parent_ts"],
                        }
                    }
                    if doc.get("parent_ts")
                    else None
                ),
            )
        return None

    async def alist(
        self,
        config: Optional[RunnableConfig],
        *,
        filter: Optional[Dict[str, Any]] = None,
        before: Optional[RunnableConfig] = None,
        limit: Optional[int] = None,
    ) -> AsyncIterator[CheckpointTuple]:
        query = {}
        if config is not None:
            query["thread_id"] = config["configurable"]["thread_id"]
        if filter:
            for key, value in filter.items():
                query[f"metadata.{key}"] = value
        if before is not None:
            query["thread_ts"] = {"$lt": before["configurable"]["thread_ts"]}
        
        cursor = self.collection.find(query).sort("thread_ts", -1)
        if limit:
            cursor = cursor.limit(limit)
        
        async for doc in cursor:
            yield CheckpointTuple(
                {
                    "configurable": {
                        "thread_id": doc["thread_id"],
                        "thread_ts": doc["thread_ts"],
                    }
                },
                self.serde.loads(doc["checkpoint"]),
                self.serde.loads(doc["metadata"]),
                (
                    {
                        "configurable": {
                            "thread_id": doc["thread_id"],
                            "thread_ts": doc["parent_ts"],
                        }
                    }
                    if doc.get("parent_ts")
                    else None
                ),
            )

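    # Synchronous version; forwards every argument to the async implementation
    def list(
        self,
        config: Optional[RunnableConfig],
        *,
        filter: Optional[Dict[str, Any]] = None,
        before: Optional[RunnableConfig] = None,
        limit: Optional[int] = None,
    ) -> Iterator[CheckpointTuple]:
        import asyncio

        async def _collect() -> List[CheckpointTuple]:
            return [c async for c in self.alist(config, filter=filter, before=before, limit=limit)]

        return iter(asyncio.run(_collect()))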
    
    async def aput(
        self,
        config: RunnableConfig,
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata,
    ) -> RunnableConfig:
        doc = {
            "thread_id": config["configurable"]["thread_id"],
            "thread_ts": checkpoint["id"],
            "checkpoint": self.serde.dumps(checkpoint),
            "metadata": self.serde.dumps(metadata),
        }
        if config["configurable"].get("thread_ts"):
            doc["parent_ts"] = config["configurable"]["thread_ts"]
        await self.collection.insert_one(doc)
        return {
            "configurable": {
                "thread_id": config["configurable"]["thread_id"],
                "thread_ts": checkpoint["id"],
            }
        }

    async def aget_state(self, config: RunnableConfig) -> Optional[Checkpoint]:
        checkpoint_tuple = await self.aget_tuple(config)
        if checkpoint_tuple:
            return checkpoint_tuple.checkpoint
        return None

    async def aset_state(self, config: RunnableConfig, state: Checkpoint) -> None:
        await self.aput(config, state, {})

    async def aput_writes(
        self,
        config: RunnableConfig,
        writes: List[Tuple[str, Any]],
        task_id: str,
    ) -> None:
        thread_id = config["configurable"]["thread_id"]
        thread_ts = config["configurable"].get("thread_ts")
        
        doc = {
            "thread_id": thread_id,
            "task_id": task_id,
            "writes": [
                (key, self.serde.dumps(value).decode('utf-8'))
                for key, value in writes
            ],
        }
        if thread_ts:
            doc["thread_ts"] = thread_ts
        
        await self.collection.insert_one(doc)

    async def alist_checkpoints(self, config: Optional[RunnableConfig] = None) -> List[CheckpointTuple]:
        return [checkpoint async for checkpoint in self.alist(config)]

    async def adelete_checkpoint(self, config: RunnableConfig) -> None:
        if config["configurable"].get("thread_ts"):
            query = {
                "thread_id": config["configurable"]["thread_id"],
                "thread_ts": config["configurable"]["thread_ts"],
            }
        else:
            query = {"thread_id": config["configurable"]["thread_id"]}
        await self.collection.delete_one(query)

    async def aclear(self) -> None:
        await self.collection.delete_many({})

    # # Synchronous methods (these call their async counterparts)
    # def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
    #     import asyncio
    #     return asyncio.run(self.aget_tuple(config))

    def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
        import asyncio
        loop = asyncio.get_event_loop()
        return loop.run_until_complete(self.aget_tuple(config))


    # def list(
    #     self,
    #     config: Optional[RunnableConfig],
    #     *,
    #     filter: Optional[Dict[str, Any]] = None,
    #     before: Optional[RunnableConfig] = None,
    #     limit: Optional[int] = None,
    # ) -> Iterator[CheckpointTuple]:
    #     import asyncio
    #     return iter(asyncio.run(self.alist_checkpoints(config)))

    def put(
        self,
        config: RunnableConfig,
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata,
    ) -> RunnableConfig:
        import asyncio
        return asyncio.run(self.aput(config, checkpoint, metadata))

    def get_state(self, config: RunnableConfig) -> Optional[Checkpoint]:
        import asyncio
        return asyncio.run(self.aget_state(config))

    def set_state(self, config: RunnableConfig, state: Checkpoint) -> None:
        import asyncio
        asyncio.run(self.aset_state(config, state))

    def list_checkpoints(self, config: Optional[RunnableConfig] = None) -> List[CheckpointTuple]:
        import asyncio
        return asyncio.run(self.alist_checkpoints(config))

    def delete_checkpoint(self, config: RunnableConfig) -> None:
        import asyncio
        asyncio.run(self.adelete_checkpoint(config))

    def clear(self) -> None:
        import asyncio
        asyncio.run(self.aclear())


# Initialize the MongoDB client
client = AsyncIOMotorClient("mongodb://localhost:27017/")

# Create the MongoDBSaver
memory = MongoDBSaver(client, "checkpoints_db", "checkpoints_collection")
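
The saver stores one document per checkpoint, keyed by `thread_id` and `thread_ts`, with `parent_ts` linking back to the previous checkpoint. The following in-memory sketch mimics that layout with a plain list, so the lookup rule in `aget_tuple` (the latest `thread_ts` wins unless a specific one is requested) can be tried without a MongoDB server. `InMemoryCheckpoints` and its method names are hypothetical and are not a LangGraph API; the timestamp strings are made-up values that merely sort correctly.

```python
from typing import Any, Dict, List, Optional


class InMemoryCheckpoints:
    """Toy stand-in for the MongoDB collection; for illustration only."""

    def __init__(self) -> None:
        self.docs: List[Dict[str, Any]] = []

    def put(self, thread_id: str, thread_ts: str, checkpoint: Any,
            parent_ts: Optional[str] = None) -> None:
        doc = {"thread_id": thread_id, "thread_ts": thread_ts, "checkpoint": checkpoint}
        if parent_ts:
            doc["parent_ts"] = parent_ts
        self.docs.append(doc)

    def get(self, thread_id: str, thread_ts: Optional[str] = None) -> Optional[Dict[str, Any]]:
        matches = [
            d for d in self.docs
            if d["thread_id"] == thread_id
            and (thread_ts is None or d["thread_ts"] == thread_ts)
        ]
        # Equivalent of sort=[("thread_ts", -1)]: pick the newest checkpoint.
        return max(matches, key=lambda d: d["thread_ts"], default=None)


store = InMemoryCheckpoints()
store.put("306", "2024-08-01T10:00:00", {"step": 1})
store.put("306", "2024-08-01T10:05:00", {"step": 2}, parent_ts="2024-08-01T10:00:00")
store.put("307", "2024-08-01T11:00:00", {"step": 1})

print(store.get("306")["checkpoint"])                         # {'step': 2}
print(store.get("306", "2024-08-01T10:00:00")["checkpoint"])  # {'step': 1}
print(store.get("999"))                                       # None
```

Each `thread_id` is an independent chat session, which is why the lookup always filters on it first.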

## AIPAAgent class

This class creates the agent graph and methods to communicate with the agents.

In [9]:
# AIPAAgent class

from langchain_core.messages import BaseMessage, AIMessage, HumanMessage, ToolMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

from langgraph.graph import END, StateGraph, START
from langgraph.prebuilt import ToolNode

from typing import Annotated, List, TypedDict, Union
from typing_extensions import Literal

import operator

import functools

import json
from pygments import highlight, lexers, formatters



class AIPAAgent:
    # default model
    llm = ChatOpenAI(model="gpt-4o")
    # this is for storing agent state
    checkpointer = None
    # this is for retrieving agent state
    checkpointer_config = {}

    def __init__(self, assistants, llm, router, checkpointer, checkpointer_config):
        self.assistants = assistants
        self.llm = llm
        self.checkpointer = checkpointer
        self.checkpointer_config = checkpointer_config
        self.router = router

        self.tools = [tool for a_name, a_data in self.assistants.items() for tool in a_data["tools"]]

        # Creating assistants nodes
        for a_name, a_data in self.assistants.items():
            agent_creator = self.create_agent(llm=a_data["llm"], tools=a_data["tools"], system_message=a_data["system_message"])
            node = functools.partial(self.agent_node, agent_creator=agent_creator, name=a_name)
            # globals()[f"node_{a_name}"] = node
            setattr(self, f"node_{a_name}", node)

        # Defining agentic workflow

        workflow = StateGraph(self.AgentState)

        workflow.add_node("user", self.user_node)
        workflow.add_node("HelloAssistant", self.node_HelloAssistant)
        workflow.add_node("TaskSetterAssistant", self.node_TaskSetterAssistant)

        workflow.add_conditional_edges(
            "user",
            self.router,
            {"user": "user", "HelloAssistant": "HelloAssistant", "TaskSetterAssistant": "TaskSetterAssistant"},
        )

        workflow.add_conditional_edges(
            "HelloAssistant",
            self.router,
            {"user": "user", "HelloAssistant": "HelloAssistant", "TaskSetterAssistant": "TaskSetterAssistant"},
        )

        workflow.add_conditional_edges(
            "TaskSetterAssistant",
            self.router,
            {"user": "user", "TaskSetterAssistant": "TaskSetterAssistant", "HelloAssistant": "HelloAssistant"},
        )

        workflow.add_edge(START, "user")
        self.graph = workflow.compile(checkpointer=checkpointer, interrupt_before=["user"])



    # Defining state class for our agents

    #  This defines the object that is passed between each node
    #  in the graph.
    class AgentState(TypedDict):
        messages: Annotated[List[Union[BaseMessage, SystemMessage]], operator.add]
        # used to switch back to current assistant
        current_agent: str
        # used when we need to switch the assistant and hand over to the user
        next: str
    

    # Defining general function to create an agent

    def create_agent(self, llm, tools=None, system_message=""):
        def _create_agent():
            prompt = ChatPromptTemplate.from_messages(
                [
                    (
                        "system",
                        "{system_message}\n"
                        " You have access to the following tools: {tool_names}.\n",
                    ),
                    MessagesPlaceholder(variable_name="messages"),
                ]
            )
            prompt = prompt.partial(system_message=system_message)
            if tools:
                prompt = prompt.partial(tool_names=", ".join([tool.name for tool in tools]))
                return prompt | llm.bind_tools(tools)
            else:
                prompt = prompt.partial(tool_names="None")
                return prompt | llm
        return _create_agent
    

    # Defining nodes

    # Helper function to create a node for a given agent
    def agent_node(self, state, agent_creator, name):
        agent = agent_creator()
        result = agent.invoke(state)

        # executing tools
        if result.tool_calls:
            tool_messages = []
            ret_val = {}
            for tool_call in result.tool_calls: 
                tool_name = tool_call["name"]
                tool_args = tool_call["args"]
                if tool_name == 'switch_assistant':
                    ret_val['current_agent'] = tool_args['assistant_name']
                    ret_val['next'] = tool_args['assistant_name']
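                tool_function = next((tool for tool in self.tools if tool.name == tool_name), None)
                if tool_function is None:
                    # the model asked for a tool that is not registered; report it instead of crashing
                    tool_output = f"Unknown tool: {tool_name}"
                else:
                    tool_output = tool_function.invoke(tool_args)
                tool_messages.append(ToolMessage(content=str(tool_output), name=tool_name, type="tool", tool_call_id=tool_call['id']))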
            ret_val["messages"] = [result, *tool_messages]
            return ret_val

        result = AIMessage(**result.dict(exclude={"type", "name"}), name=name, type="ai")
        return {
            "messages": [result],
            "current_agent": name,
            "next": "user"
        }
    
    # User node represents input from outside of the graph and is used
    # as a breakpoint in the agent work to receive such input
    def user_node(self, state):
        return


    # Helper function for debug purposes. Colorizes JSON for terminal output.
    def json_colorify(self, obj):

        def default(o):
            if isinstance(o, BaseMessage):
                return o.dict()
            return str(o)

        formatted_json = json.dumps(obj, indent=4, default=default)
        colorful_json = highlight(formatted_json,
                                lexers.JsonLexer(),
                                formatters.TerminalFormatter())
        return colorful_json



    # Methods to communicate with the graph

    # Output method of the graph
    async def agent_message(self, config):
        """Executing agentic workflow and streaming completion tokens."""

        # init_values sets the initial state when the checkpointer has no messages for this thread;
        # otherwise None is passed so the graph resumes from the stored state
        has_history = bool(self.graph.get_state(config).values.get("messages"))
        init_values = None if has_history else {
            "current_agent": "HelloAssistant",
            "next": "user",
        }

        # streaming completion tokens
        async for event in self.graph.astream_events(init_values, config, version="v1"):
            kind = event["event"]
            if kind == "on_chat_model_stream":
                content = event["data"]["chunk"].content
                if content:
                    yield content


    # Input method of the graph
    def user_input(self, user_input):
        """Inserting user input into the graph."""

        current_agent = self.graph.get_state(self.checkpointer_config).values['current_agent']
        self.graph.update_state(
            self.checkpointer_config,
            {
                "messages": [HumanMessage(content=user_input, name="user", type="human")],
                # hand the turn to the assistant that was active before the user spoke
                "next": current_agent,
            },
            as_node="user",
        )
        return user_input
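
The tool-handling branch of `agent_node` is easier to follow when isolated from the LangChain types. This sketch uses plain dictionaries for tool calls and plain functions for tools; `dispatch_tool_calls` and the registry layout are assumptions for illustration and only mirror the control flow, not the message classes.

```python
from typing import Any, Callable, Dict, List


def dispatch_tool_calls(
    tool_calls: List[Dict[str, Any]],
    registry: Dict[str, Callable[..., Any]],
) -> Dict[str, Any]:
    """Run each requested tool and collect the state update for the graph."""
    update: Dict[str, Any] = {"tool_results": []}
    for call in tool_calls:
        name, args = call["name"], call["args"]
        if name == "switch_assistant":
            # Switching changes who answers next; the turn does not go back to the user yet.
            update["current_agent"] = args["assistant_name"]
            update["next"] = args["assistant_name"]
        func = registry.get(name)
        output = func(**args) if func else f"Unknown tool: {name}"
        update["tool_results"].append({"id": call["id"], "name": name, "content": str(output)})
    return update


registry = {
    "switch_assistant": lambda assistant_name: assistant_name,
    "list_tasks": lambda: "[]",
}
calls = [{"id": "call_1", "name": "switch_assistant",
          "args": {"assistant_name": "TaskSetterAssistant"}}]
update = dispatch_tool_calls(calls, registry)
print(update["next"])  # TaskSetterAssistant
```

Because `next` is left untouched for ordinary tools, the router sends control back to the same assistant, which then turns the tool output into a reply for the user.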


## Invoke

With the graph created, you can invoke it! Let's chat with the assistants in a simple console loop.

In [10]:
import warnings
warnings.filterwarnings('ignore')

import nest_asyncio

nest_asyncio.apply()

async def main():
    # thread_id is basically a unique chat session id
    checkpointer_config={"configurable": {"thread_id": "306"}}
    agent = AIPAAgent(assistants=assistants, llm=ChatOpenAI(model="gpt-4o"), router=router, checkpointer=memory, checkpointer_config=checkpointer_config)

    # Print the stored history, if any; a fresh thread has no messages yet
    for i, m in enumerate(agent.graph.get_state(checkpointer_config).values.get('messages', [])):
        # if m.content and m.type != "tool":
            # print(f"{i}: {m.name} ({m.type}): {m.content}\n")
        print(f"[{i}] {m}\n")

    while True:
        next_node = agent.graph.get_state(checkpointer_config).values.get("next")
        if next_node and next_node != "user":
            print("Assistant: ", end="")
        try:
            async for event in agent.agent_message(checkpointer_config):
                print(event, end="")
            print("\n\n")
        except Exception as e:
            print(f"model error: {e}")

        text = input("Message: ")
        # Leave the loop before the exit command is written to the conversation state
        if text == "exit":
            break

        user_input = agent.user_input(text)
        print("User: ", end="")
        print(user_input)
        print("\n\n")

await main()
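
In a web chat or messenger bot, the `print` calls in the loop above would be replaced by whatever sends text to the client. A minimal sketch of the consuming side, assuming a hypothetical `collect_reply` helper and a fake token stream that stands in for `agent.agent_message(config)`:

```python
import asyncio
from typing import AsyncIterator, Callable, Optional


async def fake_agent_message() -> AsyncIterator[str]:
    """Stand-in for agent.agent_message(config); yields tokens one by one."""
    for token in ["Hello", "! ", "How can I help?"]:
        await asyncio.sleep(0)  # give control back to the event loop
        yield token


async def collect_reply(
    stream: AsyncIterator[str],
    on_token: Optional[Callable[[str], None]] = None,
) -> str:
    """Forward each token to an optional callback and return the full reply."""
    parts = []
    async for token in stream:
        if on_token is not None:
            on_token(token)  # e.g. push to a websocket or edit a bot message
        parts.append(token)
    return "".join(parts)


reply = asyncio.run(collect_reply(fake_agent_message()))
print(reply)  # Hello! How can I help?
```

Inside a notebook, `await collect_reply(...)` would replace `asyncio.run`. The console session produced by the notebook cell above looks like this: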




User: hi



Assistant: Hello! How can I assist you today? Are you looking to create a new task or get information about an existing one?


User: list



Assistant: Here are your existing tasks:

1. **Procure 3D Printing Price in New York** (ID: c0159b37-2225-4a1b-af8a-67dcacf119b6)
2. **Procure 3D Printing Price in Jakarta** (ID: e4c68a21-591c-4810-b304-3f6af7cd3f15)

Would you like more information about any of these tasks? If so, please provide the task ID.


User: information about c0159b37-2225-4a1b-af8a-67dcacf119b6



Assistant: Here is the detailed information about the task **"Procure 3D Printing Price in New York"**:

- **Task ID:** c0159b37-2225-4a1b-af8a-67dcacf119b6
- **Description:** 
  - Procure the price of printing one piece of the 3D model available at [this link](https://www.printables.com/model/152592-honeycomb-storage-wall) (specifically, the file `wall-honeycomb-part.stl`) in New York.
  - Consider any available 3D printing services in New York.
  - The material