# Swarm

The `Swarm` pattern implements a team coordination approach where agents can dynamically hand off conversations to other agents based on their capabilities and the current context. This enables flexible multi-agent interactions driven by the agents' own decision-making rather than a central coordinator.

### Handoff Process

The swarm uses a handoff mechanism where:
1. Each agent defines which other agents it can hand off to
2. Agents make autonomous decisions about when to hand off
3. The receiving agent takes over the conversation context
4. The process continues until a termination condition is met


## Customer Support Example

![Customer Support](swarm_customer_support.svg)

This system implements a flights refund scenario with two agents:

- **Travel Agent**: Handles general travel and refund coordination
- **Flights Refunder**: Specializes in processing flight refunds with the `refund_flight` tool.

Additionally, we let the user interact with the agents, when agents handoff to the `UserProxyAgent`.

#### Workflow:
1. The **Travel Agent** initiates the conversation and evaluates the user's request.
2. Based on the request:
   - For refund-related tasks, the Travel Agent hands off to the **Flights Refunder**
   - For information needed from the customer, either agent can hand off to the **UserProxyAgent**
3. The **Flights Refunder** processes refunds using the `refund_flight` tool when appropriate
4. After user interaction, control returns to the originating agent via handoff
5. The process continues until the Travel Agent determines the task is complete and terminates the workflow

In [2]:
import asyncio
from typing import Any, Dict, List, Sequence

from autogen_agentchat.agents import AssistantAgent, BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import ChatMessage, HandoffMessage, StopMessage, TextMessage
from autogen_agentchat.task import Console, TextMentionTermination
from autogen_agentchat.teams import Swarm
from autogen_core.base import CancellationToken
from autogen_ext.models import OpenAIChatCompletionClient

### Tools

In [3]:
def refund_flight(flight_id: str) -> str:
    """Refund a flight"""
    return f"Flight {flight_id} refunded"

### User Proxy

In [4]:
class UserProxyAgent(BaseChatAgent):
    def __init__(self, name: str) -> None:
        super().__init__(name, "A human user.")

    @property
    def produced_message_types(self) -> List[type[ChatMessage]]:
        return [TextMessage, StopMessage, HandoffMessage]

    async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> Response:
        user_input = await asyncio.get_event_loop().run_in_executor(None, input, "User: ")

        # Find the last HandOffMessage targeting the user to determine who to hand off to.
        last_handoff_source = ""
        for message in reversed(messages):
            if isinstance(message, HandoffMessage) and message.target == self.name:
                last_handoff_source = message.source
                break

        text_message = TextMessage(content=user_input, source=self.name)
        handoff_message = HandoffMessage(
            content="Handoff back to agent",
            source=self.name,
            target=last_handoff_source,
        )
        return Response(chat_message=text_message, inner_messages=[handoff_message])

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        self._last_handoff_source = None


# The name is important since it is used as the source and target
# in HandoffMessage objects to route messages between agents
user_proxy = UserProxyAgent(name="user")

### Agents

In [5]:
model_client = OpenAIChatCompletionClient(
    model="gpt-4o",
    # api_key="YOUR_API_KEY",
)

travel_agent = AssistantAgent(
    "travel_agent",
    model_client=model_client,
    handoffs=["flights_refunder"],
    system_message="""You are a travel agent.
    The flights_refunder is in charge of refunding flights.
    If you need information from the user, you must first send your message, then you can handoff to the user.
    Use TERMINATE when the travel planning is complete.""",
)

flights_refunder = AssistantAgent(
    "flights_refunder",
    model_client=model_client,
    handoffs=["travel_agent", "user"],
    tools=[refund_flight],
    system_message="""You are an agent specialized in refunding flights.
    You only need flight reference numbers to refund a flight.
    You have the ability to refund a flight using the refund_flight tool.
    If you need information from the user, you must first send your message, then you can handoff to the user.
    When the transaction is complete, handoff to the travel agent to finalize.""",
)

In [6]:
termination = TextMentionTermination("TERMINATE")
team = Swarm([travel_agent, flights_refunder, user_proxy], termination_condition=termination)

task = "I need to change my flight."
await Console(team.run_stream(task=task))

---------- user ----------
I need to change my flight.
---------- travel_agent ----------
Could you please provide me with your flight details and what changes you would like to make?
[Prompt tokens: 105, Completion tokens: 19]
---------- travel_agent ----------
[FunctionCall(id='call_js779TOwsCxXr7ZBhDRvVHyY', arguments='{}', name='transfer_to_flights_refunder')]
[Prompt tokens: 130, Completion tokens: 14]
---------- travel_agent ----------
[FunctionExecutionResult(content='Transferred to flights_refunder, adopting the role of flights_refunder immediately.', call_id='call_js779TOwsCxXr7ZBhDRvVHyY')]
---------- travel_agent ----------
Transferred to flights_refunder, adopting the role of flights_refunder immediately.
---------- flights_refunder ----------
I can assist you with refunding your flight. Could you please provide me with your flight reference number?
[Prompt tokens: 216, Completion tokens: 22]
---------- flights_refunder ----------
[FunctionCall(id='call_iBHpGnVLe0ElgPoPTqQu

## Stock Research Example

![Stock Research](swarm_stock_research.svg)


This system is designed to perform stock research tasks by leveraging four agents:

- **Planner**: The central coordinator that delegates specific tasks to specialized agents based on their expertise. The planner ensures that each agent is utilized efficiently and oversees the overall workflow.
- **Financial Analyst**: A specialized agent responsible for analyzing financial metrics and stock data using tools such as `get_stock_data`.
- **News Analyst**: An agent focused on gathering and summarizing recent news articles relevant to the stock, using tools such as `get_news`.
- **Writer**: An agent tasked with compiling the findings from the stock and news analysis into a cohesive final report.

#### Workflow:
1. The **Planner** initiates the research process by delegating tasks to the appropriate agents in a step-by-step manner.
2. Each agent performs its task independently and appends their work to the shared **message thread/history**. Rather than directly returning results to the planner, all agents contribute to and read from this shared message history. When agents generate their work using the LLM, they have access to this shared message history, which provides context and helps track the overall progress of the task.
3. Once an agent completes its task, it hands off control back to the planner.
4. The process continues until the planner determines that all necessary tasks have been completed and decides to terminate the workflow.

### Tools

In [16]:
async def get_stock_data(symbol: str) -> Dict[str, Any]:
    """Get stock market data for a given symbol"""
    return {"price": 180.25, "volume": 1000000, "pe_ratio": 65.4, "market_cap": "700B"}


async def get_news(query: str) -> List[Dict[str, str]]:
    """Get recent news articles about a company"""
    return [
        {
            "title": "Tesla Expands Cybertruck Production",
            "date": "2024-03-20",
            "summary": "Tesla ramps up Cybertruck manufacturing capacity at Gigafactory Texas, aiming to meet strong demand.",
        },
        {
            "title": "Tesla FSD Beta Shows Promise",
            "date": "2024-03-19",
            "summary": "Latest Full Self-Driving beta demonstrates significant improvements in urban navigation and safety features.",
        },
        {
            "title": "Model Y Dominates Global EV Sales",
            "date": "2024-03-18",
            "summary": "Tesla's Model Y becomes best-selling electric vehicle worldwide, capturing significant market share.",
        },
    ]

In [17]:
model_client = OpenAIChatCompletionClient(
    model="gpt-4o",
    # api_key="YOUR_API_KEY",
)

planner = AssistantAgent(
    "planner",
    model_client=model_client,
    handoffs=["financial_analyst", "news_analyst", "writer"],
    system_message="""You are a research planning coordinator.
    Coordinate market research by delegating to specialized agents:
    - Financial Analyst: For stock data analysis
    - News Analyst: For news gathering and analysis
    - Writer: For compiling final report
    Always send your plan first, then handoff to appropriate agent.
    Handoff to a single agent at a time.
    Use TERMINATE when research is complete.""",
)

financial_analyst = AssistantAgent(
    "financial_analyst",
    model_client=model_client,
    handoffs=["planner"],
    tools=[get_stock_data],
    system_message="""You are a financial analyst.
    Analyze stock market data using the get_stock_data tool.
    Provide insights on financial metrics.
    Always handoff back to planner when analysis is complete.""",
)

news_analyst = AssistantAgent(
    "news_analyst",
    model_client=model_client,
    handoffs=["planner"],
    tools=[get_news],
    system_message="""You are a news analyst.
    Gather and analyze relevant news using the get_news tool.
    Summarize key market insights from news.
    Always handoff back to planner when analysis is complete.""",
)

writer = AssistantAgent(
    "writer",
    model_client=model_client,
    handoffs=["planner"],
    system_message="""You are a financial report writer.
    Compile research findings into clear, concise reports.
    Always handoff back to planner when writing is complete.""",
)

In [18]:
# Define termination condition
text_termination = TextMentionTermination("TERMINATE")
termination = text_termination

research_team = Swarm(
    participants=[planner, financial_analyst, news_analyst, writer], termination_condition=termination
)

task = "Conduct market research for TSLA stock"
await Console(research_team.run_stream(task=task))

---------- user ----------
Conduct market research for TSLA stock
---------- planner ----------
[FunctionCall(id='call_IXFe9RcGbYGNf0V7B2hvDNJI', arguments='{}', name='transfer_to_financial_analyst')]
[Prompt tokens: 168, Completion tokens: 149]
---------- planner ----------
[FunctionExecutionResult(content='Transferred to financial_analyst, adopting the role of financial_analyst immediately.', call_id='call_IXFe9RcGbYGNf0V7B2hvDNJI')]
---------- planner ----------
Transferred to financial_analyst, adopting the role of financial_analyst immediately.
---------- financial_analyst ----------
[FunctionCall(id='call_2IYcTAXiufX1SBmnMJOG9HPq', arguments='{"symbol":"TSLA"}', name='get_stock_data')]
[Prompt tokens: 136, Completion tokens: 16]
---------- financial_analyst ----------
[FunctionExecutionResult(content="{'price': 180.25, 'volume': 1000000, 'pe_ratio': 65.4, 'market_cap': '700B'}", call_id='call_2IYcTAXiufX1SBmnMJOG9HPq')]
---------- financial_analyst ----------
Here's the market re