#  **Multi-Agent System with LangGraph and Gemini** 

### Reference: Medium article by İpek Şahbazoğlu

In [1]:
from langchain_google_vertexai import ChatVertexAI
# Chat model shared by the tool agents, the communication agent and the
# supervisor chain defined further down.
llm = ChatVertexAI(model_name= 'gemini-pro')

In [2]:
from langchain.tools import tool
# Math tool: returns the sum of its two arguments.
# NOTE(review): langchain's @tool presumably exposes the docstring to the
# model as the tool description, so treat the docstring text as runtime
# content - confirm before rewording it.
@tool
def add(a: int, b: int) -> int:
    """Add two numbers."""
    return a + b

# Math tool: returns the product of its two arguments.
# NOTE(review): the docstring is presumably used by @tool as the tool
# description shown to the model - confirm before rewording it.
@tool
def multiply(a: int, b: int) -> int:
    """Multiply two numbers."""
    return a * b

@tool
def square(a: int) -> int:
    """Calculates the square of a number."""
    # Coerce to int once, then multiply the coerced value by itself.
    number = int(a)
    return number * number

# Tools made available to the math agent.
math_toolkit = [add, multiply, square]

from langchain.tools import tool


# Text tool: returns the result of text.upper().
# NOTE(review): the docstring is presumably used by @tool as the tool
# description shown to the model - confirm before rewording it.
@tool
def make_upper_case(text: str) -> str:
    """ convert string to all upper case string"""
    return text.upper()


# Text tool: returns the result of text.lower().
# NOTE(review): the docstring is presumably used by @tool as the tool
# description shown to the model - confirm before rewording it.
@tool
def make_lower_case(text: str) -> str:
    """ convert string to all lower case string"""
    return text.lower()

# Text tool: appends ' :)' (a space followed by a smiley) to the input text.
# NOTE(review): the docstring is presumably used by @tool as the tool
# description shown to the model - confirm before rewording it.
@tool
def add_smiley(text: str) -> str:
    """ add smiley face to the end of string"""
    return text + ' :)'


# Tools made available to the text editor agent.
text_toolkit = [make_upper_case, make_lower_case, add_smiley]

In [3]:
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder, PromptTemplate, SystemMessagePromptTemplate

def create_tool_agent(llm: ChatVertexAI, tools: list, system_prompt: str):
    """Build an AgentExecutor for one worker agent.

    The caller's ``system_prompt`` is extended with instructions shared by all
    workers plus an ``{agent_history}`` slot. The resulting system message is
    combined with the ``messages`` and ``agent_scratchpad`` placeholders to
    form the chat prompt of a tool-calling agent.

    Args:
        llm (ChatVertexAI): LLM for the agent
        tools (list): list of tools the agent will use
        system_prompt (str): text describing specific agent purpose

    Returns:
        executor (AgentExecutor): Runnable for the agent created.
    """
    # Instructions appended verbatim to every worker's system prompt.
    shared_instructions = """
                ONLY respond to the part of query relevant to your purpose.
                IGNORE tasks you can't complete. 
                Use the following context to answer your query 
                if available: \n {agent_history} \n
                """

    # System message = caller's prompt + shared instructions.
    system_message = SystemMessagePromptTemplate(
        prompt=PromptTemplate(
            template=system_prompt + shared_instructions,
            input_variables=["agent_history"],
        )
    )

    chat_prompt = ChatPromptTemplate.from_messages(
        [
            system_message,
            MessagesPlaceholder(variable_name="messages"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ]
    )

    tool_agent = create_tool_calling_agent(llm, tools, chat_prompt)
    # Intermediate steps are returned so callers can see which tools ran.
    return AgentExecutor(
        agent=tool_agent,
        tools=tools,
        return_intermediate_steps=True,
        verbose=False,
    )

In [4]:
# System prompt for the math agent (registered in the graph as "Calculator").
system_prompt = """ You are a mathematical assistant.
        Use your tools to answer questions. If you do not have a tool to
        answer the question, say so. """

# Worker agent that is only given the math tools.
math_agent = create_tool_agent(llm=llm, tools = math_toolkit, 
              system_prompt = system_prompt)


# System prompt for the text agent (registered in the graph as "TextEditor").
# Rebinding system_prompt here does not affect math_agent: its prompt text was
# already concatenated into a template when create_tool_agent ran.
system_prompt = """ You are a text editor assistant.
        Use your tools to complete requests. If you do not have a tool to
       complete the request, say so. """

# Worker agent that is only given the text tools.
text_agent = create_tool_agent(llm=llm, tools = text_toolkit, 
                    system_prompt = system_prompt)

In [5]:
from langchain_core.messages import AIMessage
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.output_parsers import JsonOutputParser

# System prompt of the communication agent: it is asked to summarise the
# outputs and tools recorded in agent_history in response to the user query.
system_prompt_template = PromptTemplate(

      template= """ You are a talkative and helpful assistant that summarises agent history 
                      in response to the original user query below. 
                      SUMMARISE ALL THE OUTPUTS AND TOOLS USED in agent_history.
                      The agent history is as follows: 
                        \n{agent_history}\n""",
                input_variables=["agent_history"],  )

system_message_prompt = SystemMessagePromptTemplate(prompt=system_prompt_template)

# Chat prompt = system message above + the user messages.
# NOTE: the name `prompt` is rebound later for the supervisor; comms_agent
# keeps the prompt object built here.
prompt = ChatPromptTemplate.from_messages(
    [
        system_message_prompt,
        MessagesPlaceholder(variable_name="messages"),
    ])

# The communication agent is a plain prompt -> LLM pipeline (no tools).
comms_agent = (prompt| llm) 


For example, replace imports like: `from langchain_core.pydantic_v1 import BaseModel`
with: `from pydantic import BaseModel`
or the v1 compatibility namespace if you are working in a code base that has not been fully upgraded to pydantic 2 yet. 	from pydantic.v1 import BaseModel

  exec(code_obj, self.user_global_ns, self.user_ns)


In [6]:
from enum import Enum
# Names of the graph nodes the supervisor may route to.
members = ["Calculator", "TextEditor", "Communicate"]

# Identity map (name -> name). It feeds the Enum below and is also passed to
# add_conditional_edges as the routing map.
member_options = {member:member for member in members}

# Enum whose member names and values are both the node names.
MemberEnum = Enum('MemberEnum', member_options)

from pydantic import BaseModel

# Schema for the supervisor's reply: a single key, `next`, whose value must be
# one of the MemberEnum options, i.e. the name of the next agent to call.
# NOTE(review): documented with `#` comments on purpose - a class docstring
# would presumably end up in the schema used for the format instructions;
# confirm before converting this to a docstring.
class SupervisorOutput(BaseModel):
    # Defaults to the communication agent when no value is supplied.
    next: MemberEnum = MemberEnum.Communicate

In [7]:
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import JsonOutputParser


# System prompt of the supervisor. {members}, {options} and
# {format_instructions} are filled in once via .partial() below.
system_prompt = (
    """You are a supervisor tasked with managing a conversation between the
    crew of workers:  {members}. Given the following user request, 
    and crew responses respond with the worker to act next.
    Each worker will perform a task and respond with their results and status. 
    When finished with the task, route to communicate to deliver the result to 
    user. Given the conversation and crew history below, who should act next?
    Select one of: {options} 
    \n{format_instructions}\n"""
)
# The supervisor is an LLM step: it only picks the next agent to run and
# thereby decides when the work is complete.

# The model's reply is parsed as JSON; SupervisorOutput supplies the format
# instructions that are injected into the prompt.
supervisor_parser = JsonOutputParser(pydantic_object=SupervisorOutput)

# Chat prompt = system prompt + user messages + the crew's agent_history.
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        MessagesPlaceholder(variable_name="messages"),
        MessagesPlaceholder(variable_name="agent_history")
       
    ]
).partial(options=str(members), members=", ".join(members), 
    format_instructions = supervisor_parser.get_format_instructions())


# prompt -> LLM -> JSON parser.
supervisor_chain = (
    prompt | llm |supervisor_parser
)

In [8]:
from langchain_core.messages import AIMessage

# For agents in the crew 
def crew_nodes(state, crew_member, name):
    """Run one crew agent and record its answer in the agent history.

    Args:
        state: graph state; its ``messages`` and ``agent_history`` entries
            are read.
        crew_member: object whose ``invoke`` returns a mapping with the keys
            ``output`` and ``intermediate_steps``.
        name: name attached to the recorded message.

    Returns:
        dict: ``{"agent_history": [AIMessage]}`` holding the agent's reply.
    """
    # Only the most recent message is forwarded, together with the history.
    latest_message = state['messages'][-1]
    payload = {
        'messages': [latest_message],
        'agent_history': state['agent_history'],
    }
    outcome = crew_member.invoke(payload)

    # The intermediate steps travel along in additional_kwargs.
    reply = AIMessage(
        content=outcome["output"],
        additional_kwargs={'intermediate_steps': outcome['intermediate_steps']},
        name=name,
    )
    return {"agent_history": [reply]}

def comms_node(state):
    """Let the communication agent produce the reply for the user.

    Args:
        state: graph state; its ``messages`` and ``agent_history`` entries
            are read.

    Returns:
        dict: ``{"messages": [reply]}`` where ``reply`` is whatever
        ``comms_agent.invoke`` returned.
    """
    # Forward only the latest message plus the accumulated agent history.
    payload = {
        'messages': [state['messages'][-1]],
        'agent_history': state['agent_history'],
    }
    reply = comms_agent.invoke(payload)
    return {"messages": [reply]}

In [9]:
from typing import TypedDict, Sequence
from typing_extensions import Annotated
import operator
from langchain.schema import BaseMessage  # Adjust import if needed

# The agent state is the input to each node in the graph
class AgentState(TypedDict):
    # Conversation messages. The operator.add annotation tells the graph that
    # new messages are added to the existing ones.
    messages: Annotated[Sequence[BaseMessage], operator.add]
    # The 'next' field indicates where to route to next
    next: str 
    # History of the crew agents' replies; carries the same operator.add
    # annotation as `messages`.
    agent_history: Annotated[Sequence[BaseMessage], operator.add]


In [10]:
from langgraph.graph import StateGraph # Adjust based on your setup
from langgraph.constants import END 
import functools

# Graph whose shared state has the AgentState layout.
workflow = StateGraph(AgentState)

# Bind each worker agent and its name to the generic crew node function.
math_node = functools.partial(crew_nodes, crew_member=math_agent, name="Calculator")

text_node = functools.partial(crew_nodes, crew_member=text_agent, name="TextEditor")



# Register the nodes; the names match the entries of `members`.
workflow.add_node("TextEditor", text_node)
workflow.add_node("Calculator", math_node)

workflow.add_node("Communicate", comms_node )

workflow.add_node("Supervisor", supervisor_chain)
# The supervisor is the entry point of the graph.
workflow.set_entry_point("Supervisor")


# One edge per tool agent: each hands control back to the supervisor.
workflow.add_edge('TextEditor', "Supervisor") 
workflow.add_edge('Calculator', "Supervisor") 

# The run ends after the communication agent.
workflow.add_edge('Communicate', END) 


# The supervisor populates the "next" field in the graph state; its value is
# looked up in member_options to select the node that runs next.

workflow.add_conditional_edges("Supervisor", lambda x: x["next"], member_options)

graph = workflow.compile()

In [11]:
# Supervisor node
def supervisor_chain(input_data):
    """Stubbed supervisor step that always routes to the TextEditor.

    Args:
        input_data: accepted but never inspected.

    Returns:
        dict: ``{'Supervisor': {'next': 'TextEditor'}}``.
    """
    decision = {'next': 'TextEditor'}
    return {'Supervisor': decision}


# TextEditor node
def text_node(input_data):
    """Stubbed TextEditor step returning a canned upper-casing result.

    Args:
        input_data: accepted for signature compatibility; never inspected,
            so any value (including non-dict input) is allowed.

    Returns:
        dict: ``{'TextEditor': {'agent_history': [entry]}}`` where ``entry``
        carries the content ``'BANANA'`` and a single ``make_upper_case``
        intermediate step.
    """
    # The reply is fixed, so nothing is computed from input_data.
    step = {
        'tool': 'make_upper_case',
        'tool_input': {'text': 'banana'},
        'log': "...log details here...",
        'tool_call_id': 'unique_id_1',
    }
    entry = {
        'content': 'BANANA',
        'additional_kwargs': {'intermediate_steps': [step]},
    }
    return {'TextEditor': {'agent_history': [entry]}}


# Calculator node
def math_node(input_data):
    """Stubbed Calculator step returning a canned multiplication result.

    Args:
        input_data: accepted for signature compatibility; never inspected.

    Returns:
        dict: ``{'Calculator': {'agent_history': [entry]}}`` where ``entry``
        carries the content ``'2*2 is 4'`` and a single ``multiply``
        intermediate step.
    """
    # The reply is fixed, so nothing is computed here.
    step = {
        'tool': 'multiply',
        'tool_input': {'a': 2.0, 'b': 2.0},
        'log': "...log details here...",
        'tool_call_id': 'unique_id_2',
    }
    entry = {
        'content': '2*2 is 4',
        'additional_kwargs': {'intermediate_steps': [step]},
    }
    return {'Calculator': {'agent_history': [entry]}}


# Communicate node
def comms_node(input_data):
    """Stubbed Communicate step returning a canned summary message.

    Args:
        input_data: accepted but never inspected.

    Returns:
        dict: ``{'Communicate': {'messages': [message]}}`` with a fixed
        summary text and placeholder response metadata.
    """
    # `[...]` and `{...}` are literal Ellipsis placeholders (a list and a set).
    metadata = {
        'safety_ratings': [...],
        'usage_metadata': {...},
    }
    message = {
        'content': "The agent used two tools:\n\n1. **make_upper_case:** converted 'banana' to 'BANANA'.\n2. **multiply:** calculated 2*2 = 4.",
        'response_metadata': metadata,
    }
    return {'Communicate': {'messages': [message]}}



In [12]:
from langchain.schema import HumanMessage

# Stream the graph's updates for a single user request and print each one,
# skipping any update that contains the "__end__" key.
for s in graph.stream(
    {"messages": [HumanMessage(content="Hi! can you make the following text all upper case: 'banana' and tell me what 2*2 is?")]}
):
    if "__end__" in s:
        continue
    print(s)
    print("----")

{'Supervisor': {'next': 'TextEditor'}}
----
{'TextEditor': {'agent_history': [AIMessage(content="'BANANA' \n\nI am sorry, I cannot answer the question 'what is 2*2?' I do not have the tool to answer math questions. ", additional_kwargs={'intermediate_steps': [(ToolAgentAction(tool='make_upper_case', tool_input={'text': 'banana'}, log="\nInvoking: `make_upper_case` with `{'text': 'banana'}`\n\n\n", message_log=[AIMessageChunk(content='', additional_kwargs={'function_call': {'name': 'make_upper_case', 'arguments': '{"text": "banana"}'}}, response_metadata={'safety_ratings': [{'category': 'HARM_CATEGORY_HATE_SPEECH', 'probability_label': 'NEGLIGIBLE', 'probability_score': 0.2158203125, 'blocked': False, 'severity': 'HARM_SEVERITY_NEGLIGIBLE', 'severity_score': 0.197265625}, {'category': 'HARM_CATEGORY_DANGEROUS_CONTENT', 'probability_label': 'NEGLIGIBLE', 'probability_score': 0.1435546875, 'blocked': False, 'severity': 'HARM_SEVERITY_LOW', 'severity_score': 0.2080078125}, {'category': 'HA