### Create Agent Swarm

In [None]:
from dotenv import load_dotenv

# Load variables from a local .env file into the process environment
# (presumably the API credentials the LLM clients below rely on — confirm).
load_dotenv()

In [2]:
from swarm import Swarm, Agent

# Swarm client; the demo cells below run agents through client.run(...).
client = Swarm()

def get_weather(city: str) -> str:
    """Get the weather information for a given city."""
    # The docstring above is kept short on purpose: Swarm exposes a function's
    # docstring to the model as the function description, so without one the
    # model only sees a bare name. The answer itself is a canned demo value.
    return f"The weather in {city} is always 30°C."

def get_air_quality(city: str) -> str:
    """Get the air quality information for a given city."""
    # The docstring above is kept short on purpose: Swarm exposes a function's
    # docstring to the model as the function description, so without one the
    # model only sees a bare name. The answer itself is a canned demo value.
    return f"The air quality in {city} is 'Good' with an AQI of 42."

def route_request(context_variables):
    """Transfer the user to the weather agent or the air quality agent based on their request."""
    # The docstring doubles as the function description Swarm shows the model,
    # so it is phrased as an instruction rather than as implementation notes.
    #
    # `or ""` covers both a missing key and a key explicitly set to None, which
    # would otherwise crash on `.lower()`. Lowercase once for both checks.
    user_message = (context_variables.get("last_user_message") or "").lower()

    # Returning an Agent hands the conversation over to it; returning a string
    # is passed back as the function's result.
    if "weather" in user_message:
        return weather_agent
    if "air quality" in user_message:
        return air_quality_agent
    return "I'm sorry, I don't understand your request."

# Specialist agent for weather questions; get_weather is its only function.
weather_agent = Agent(
    name="Weather Agent",
    instructions="You are a weather assistant. Provide the weather information when asked.",
    functions=[get_weather]
)

# Specialist agent for air quality questions; get_air_quality is its only function.
air_quality_agent = Agent(
    name="Air Quality Agent",
    instructions="You are an air quality assistant. Provide the air quality information when asked.",
    functions=[get_air_quality]
)

# Entry-point agent used by the client.run(...) cells below. Its only function,
# route_request, returns one of the specialist agents above (or an apology
# string when the request matches neither topic).
supervisor_agent = Agent(
    name="Supervisor Agent",
    instructions="You are the supervisor. Determine whether the user's request is about weather or air quality, and transfer them to the appropriate agent by calling the 'route_request' function.",
    functions=[route_request]
)

In [None]:
# Weather example. The same text is sent as the chat message and stored under
# "last_user_message", the key route_request inspects to choose an agent.
user_message = "What's the weather like in Paris?"
context_variables = {"last_user_message": user_message}

# Always start at the supervisor agent.
response = client.run(
    agent=supervisor_agent,
    messages=[{"role": "user", "content": user_message}],
    context_variables=context_variables
)

# Show the content of the last message in the response.
print(response.messages[-1]["content"])

In [None]:
# Air quality example. The same text is sent as the chat message and stored
# under "last_user_message", the key route_request inspects to choose an agent.
user_message = "How's the air quality in Tokyo?"
context_variables = {"last_user_message": user_message}

# Always start at the supervisor agent.
response = client.run(
    agent=supervisor_agent,
    messages=[{"role": "user", "content": user_message}],
    context_variables=context_variables
)

# Show the content of the last message in the response.
print(response.messages[-1]["content"])

### Same with LangGraph

In [5]:
from typing import TypedDict, List
from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI  # Corrected import
from langchain.schema import HumanMessage, AIMessage, SystemMessage, BaseMessage
from langchain.prompts import ChatPromptTemplate
from langchain.tools import tool
from langgraph.graph import StateGraph, START, END

# Define the tools using the @tool decorator
@tool
def get_weather(city: str) -> str:
    """Get the weather information for a given city."""
    # Canned demo answer: the city name is dropped into a fixed sentence.
    template = "The weather in {} is always 30°C."
    return template.format(city)

@tool
def get_air_quality(city: str) -> str:
    """Get the air quality information for a given city."""
    # Canned demo answer: the city name is dropped into a fixed sentence.
    template = "The air quality in {} is 'Good' with an AQI of 42."
    return template.format(city)

@tool
def default_answer() -> str:
    """Provide a default response when unable to answer."""
    # Fixed fallback text; the tool takes no arguments.
    fallback = "I'm sorry, I can't answer that."
    return fallback

In [6]:
from typing import Annotated
from langgraph.graph.message import add_messages

class AgentState(TypedDict):
    """State dictionary shared by the nodes of the LangGraph workflow."""

    # Conversation history; annotated with the add_messages reducer, which
    # controls how message updates returned by nodes are merged in.
    messages: Annotated[List[BaseMessage], add_messages]
    # NOTE(review): not read or written by any node visible in this notebook.
    on_topic: str
    # Set by the supervisor node and read by supervisor_router.
    classification: str
    # NOTE(review): not read or written by any node visible in this notebook.
    system_message: SystemMessage


In [7]:
# One single-element tool list per specialist; each list is bound to that
# specialist's LLM in the next cell.
get_weather_tools = [get_weather]
get_air_quality_tools = [get_air_quality]
default_answer_tools = [default_answer]

In [8]:
# The supervisor only classifies, so it gets no tools (structured output is
# attached inside call_supervisor_model).
supervisor_llm = ChatOpenAI(temperature=0)
# Each specialist LLM is bound to exactly its own tool list.
weather_llm = ChatOpenAI(temperature=0).bind_tools(get_weather_tools)
air_quality_llm = ChatOpenAI(temperature=0).bind_tools(get_air_quality_tools)
off_topic_llm = ChatOpenAI(temperature=0).bind_tools(default_answer_tools)

In [9]:
from langchain.schema import AIMessage, SystemMessage
from typing import Dict, Any


def _invoke_agent_model(llm, system_prompt: str, label: str, state: AgentState) -> Dict[str, Any]:
    """Run one specialist LLM over the conversation and return its reply as a state update.

    Args:
        llm: Chat model (with its tools already bound) to invoke.
        system_prompt: Persona/instructions placed in front of the conversation.
        label: Prefix used in the debug output.
        state: Current graph state; only ``state["messages"]`` is read.

    Returns:
        ``{"messages": [response]}`` containing only the new message.
    """
    conversation = [SystemMessage(content=system_prompt)] + state["messages"]
    print(f"{label} conversation:", conversation)  # Debug statement
    response = llm.invoke(conversation)
    print(f"{label} response:", response)  # Debug statement
    # AgentState.messages is annotated with the add_messages reducer, which
    # merges this update into the existing history. Returning only the new
    # message avoids re-sending (and re-merging) the whole history every turn.
    return {"messages": [response]}


# Function to invoke the weather model
def call_weather_model(state: AgentState) -> Dict[str, Any]:
    """Graph node: answer the conversation with the weather LLM (WeatherBot persona)."""
    return _invoke_agent_model(
        weather_llm,
        "You are WeatherBot. Answer the user's weather-related questions only in French.",
        "Weather",
        state,
    )


# Function to invoke the air quality model
def call_air_quality_model(state: AgentState) -> Dict[str, Any]:
    """Graph node: answer the conversation with the air quality LLM (AirQualityBot persona)."""
    return _invoke_agent_model(
        air_quality_llm,
        "You are AirQualityBot. Provide air quality information in a very formal and polite manner.",
        "Air Quality",
        state,
    )


# Function to invoke the off-topic model
def call_off_topic_model(state: AgentState) -> Dict[str, Any]:
    """Graph node: answer the conversation with the off-topic LLM (OffTopicBot persona)."""
    return _invoke_agent_model(
        off_topic_llm,
        "You are OffTopicBot. Apologize to the user and explain that you cannot help with their request, but do so in a friendly tone.",
        "Off-Topic",
        state,
    )


In [10]:
def should_continue_weather(state: AgentState):
    """Pick the step that follows the weather model.

    Returns "weather_tools" when the newest message carries tool calls,
    otherwise END.
    """
    newest = state["messages"][-1]
    pending_calls = getattr(newest, "tool_calls", None)
    return "weather_tools" if pending_calls else END

def should_continue_air_quality(state: AgentState):
    """Pick the step that follows the air quality model.

    Returns "air_quality_tools" when the newest message carries tool calls,
    otherwise END.
    """
    newest = state["messages"][-1]
    pending_calls = getattr(newest, "tool_calls", None)
    return "air_quality_tools" if pending_calls else END

def should_continue_off_topic(state: AgentState):
    """Pick the step that follows the off-topic model.

    Returns "off_topic_tools" when the newest message carries tool calls,
    otherwise END.
    """
    newest = state["messages"][-1]
    pending_calls = getattr(newest, "tool_calls", None)
    return "off_topic_tools" if pending_calls else END


In [11]:
from typing import Any, Dict, Literal

from langchain.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field

# Define the model for the supervisor's decision
class SupervisorDecision(BaseModel):
    """Decision made by the supervisor agent."""
    # Literal restricts the structured output to the three labels the router
    # knows about, instead of accepting any free-form string. The value is
    # still a plain str at runtime, so existing consumers are unaffected.
    classification: Literal["weather", "air_quality", "other"] = Field(
        description="Classify the message as 'weather', 'air_quality', or 'other'"
    )

# Function to invoke the supervisor model
def call_supervisor_model(state: AgentState) -> Dict[str, Any]:
    """Graph node: classify the latest message as weather, air quality, or other.

    Args:
        state: Current graph state; only ``state['messages']`` is read.

    Returns:
        ``{"classification": <label>}``. Only the field this node changes is
        returned; the input ``state`` is no longer mutated in place and the
        message history is not sent back through its reducer.
    """
    print(state)  # Debug statement

    messages = state['messages']
    # Classify the most recent message; an empty history is classified as ''.
    last_message = messages[-1].content if messages else ''

    # Define the system prompt
    system_prompt = """You are a supervisor agent that decides whether the user's message is on topic and classifies it.

    Analyze the user's message and decide:

    Classify it as 'weather', 'air_quality', or 'other' if on topic.

    Provide your decision in the following structured format:
        "classification": "weather" or "air_quality" or "other"
    """

    # Create the prompt template with system and user message
    prompt = ChatPromptTemplate.from_messages(
        [("system", system_prompt),
         ("human", "User Message:\n\n{user_message}")]
    )

    # LLM with structured output for classification
    structured_supervisor_llm = supervisor_llm.with_structured_output(SupervisorDecision)
    evaluator = prompt | structured_supervisor_llm

    # Invoke the LLM with the last user message
    result = evaluator.invoke({"user_message": last_message})

    # Hand the classification back as a partial state update
    return {"classification": result.classification}


In [12]:
from langgraph.prebuilt import ToolNode

# One ToolNode per specialist, holding the same tool that is bound to that
# specialist's LLM. They are registered as the "*_tools" graph nodes below.
weather_tool_node = ToolNode(tools=[get_weather])
air_quality_tool_node = ToolNode(tools=[get_air_quality])
off_topic_tool_node = ToolNode(tools=[default_answer])

In [13]:
def supervisor_router(state: AgentState) -> str:
    """Translate the supervisor's classification into the next node name.

    'weather' maps to 'weather_model' and 'air_quality' to 'air_quality_model';
    anything else, including a missing classification, maps to 'off_topic_model'.
    """
    label = state.get('classification', '')
    if label == 'weather':
        return 'weather_model'
    if label == 'air_quality':
        return 'air_quality_model'
    return 'off_topic_model'


In [14]:
# NOTE(review): this cell redefines the three should_continue_* routers that an
# earlier cell already defines. On a top-to-bottom run these later definitions
# are the ones in effect when the graph is built; the earlier cell can be removed.

def _route_on_tool_calls(state: AgentState, tools_node: str):
    """Return ``tools_node`` if the newest message requests tool calls, else END.

    An empty message history is treated as "nothing left to do" (END) instead
    of raising IndexError.
    """
    messages = state["messages"]
    if messages and getattr(messages[-1], "tool_calls", None):
        return tools_node
    return END


def should_continue_weather(state: AgentState):
    """Route after the weather model: run "weather_tools" or finish."""
    return _route_on_tool_calls(state, "weather_tools")


def should_continue_air_quality(state: AgentState):
    """Route after the air quality model: run "air_quality_tools" or finish."""
    return _route_on_tool_calls(state, "air_quality_tools")


def should_continue_off_topic(state: AgentState):
    """Route after the off-topic model: run "off_topic_tools" or finish."""
    return _route_on_tool_calls(state, "off_topic_tools")


In [21]:
# Initialize the graph
# All nodes share the AgentState schema.
workflow = StateGraph(AgentState)

# Supervisor node
workflow.add_node("supervisor_agent", call_supervisor_model)

# Weather agent nodes
workflow.add_node("weather_model", call_weather_model)
workflow.add_node("weather_tools", weather_tool_node)

# Air quality agent nodes
workflow.add_node("air_quality_model", call_air_quality_model)
workflow.add_node("air_quality_tools", air_quality_tool_node)

# Off-topic agent nodes
workflow.add_node("off_topic_model", call_off_topic_model)
workflow.add_node("off_topic_tools", off_topic_tool_node)

# Edges
# Every run starts at the supervisor; supervisor_router then returns the name
# of the model node to run next.
workflow.add_edge(START, "supervisor_agent")
workflow.add_conditional_edges(
    "supervisor_agent", supervisor_router,
    ["weather_model", "air_quality_model", "off_topic_model"]
)

# Each specialist below follows the same loop: the model node either requests
# its tools node or ends the run, and the tools node always returns to the model.

# Weather agent workflow
workflow.add_conditional_edges("weather_model", should_continue_weather, ["weather_tools", END])
workflow.add_edge("weather_tools", "weather_model")

# Air quality agent workflow
workflow.add_conditional_edges("air_quality_model", should_continue_air_quality, ["air_quality_tools", END])
workflow.add_edge("air_quality_tools", "air_quality_model")

# Off-topic agent workflow
workflow.add_conditional_edges("off_topic_model", should_continue_off_topic, ["off_topic_tools", END])
workflow.add_edge("off_topic_tools", "off_topic_model")

# Compile the workflow
app = workflow.compile()


In [None]:
from IPython.display import Image, display

# Rendering the diagram is optional, so a failure must not stop the notebook,
# but report it instead of swallowing it silently.
# NOTE(review): draw_mermaid_png presumably needs network access or extra
# rendering dependencies, which would explain the best-effort handling — confirm.
try:
    display(Image(app.get_graph().draw_mermaid_png()))
except Exception as exc:
    print(f"Could not render the workflow graph: {exc!r}")

In [None]:
from langchain.schema import HumanMessage

# Run the compiled graph on a weather question. Only "messages" is supplied;
# the call is the cell's last expression, so its result is shown as the output.
app.invoke({"messages": [HumanMessage(content="How is the weather in Munich?")]})