In [10]:
pip install -U "autogen-agentchat"

Note: you may need to restart the kernel to use updated packages.


In [11]:
pip install "autogen-ext[openai]"

Note: you may need to restart the kernel to use updated packages.


In [4]:
from dotenv import load_dotenv
import os
load_dotenv()

True

# Single agent

In [1]:
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

In [5]:
# Define a model client. You can use other model client that implements
# the `ChatCompletionClient` interface.
model_client = OpenAIChatCompletionClient(
    model="gpt-4o",
    api_key=os.environ['OPENAI_API_KEY'],
)

In [7]:
# Define a simple function tool that the agent can use.
# For this example, we use a fake weather tool for demonstration purposes.
async def get_weather(city: str) -> str:
    """Get the weather for a given city."""
    return f"The weather in {city} is 73 degrees and Sunny."

In [8]:
# Define an AssistantAgent with the model, tool, system message, and reflection enabled.
# The system message instructs the agent via natural language.
agent = AssistantAgent(
    name="weather_agent",
    model_client=model_client,
    tools=[get_weather],
    system_message="You are a helpful assistant.",
    reflect_on_tool_use=True,
    model_client_stream=True,  # Enable streaming tokens from the model client.
)

In [10]:
# Run the agent and stream the messages to the console.
async def main() -> None:
    await Console(agent.run_stream(task="What is the weather in New York?"))
    # Close the connection to the model client.
    await model_client.close()

In [11]:
await main()

---------- TextMessage (user) ----------
What is the weather in New York?
---------- ToolCallRequestEvent (weather_agent) ----------
[FunctionCall(id='call_hy9ltCSLcqMJNADP4LGjWbVJ', arguments='{"city":"New York"}', name='get_weather')]
---------- ToolCallExecutionEvent (weather_agent) ----------
[FunctionExecutionResult(content='The weather in New York is 73 degrees and Sunny.', name='get_weather', call_id='call_hy9ltCSLcqMJNADP4LGjWbVJ', is_error=False)]
---------- ModelClientStreamingChunkEvent (weather_agent) ----------
The current weather in New York is 73 degrees Fahrenheit and sunny.


# Team (basic usage)

In [12]:
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.base import TaskResult
from autogen_agentchat.conditions import ExternalTermination, TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Create an OpenAI model client.
model_client = OpenAIChatCompletionClient(
    model="gpt-4o-2024-08-06",
    api_key=os.environ['OPENAI_API_KEY']
)

# Create the primary agent.
primary_agent = AssistantAgent(
    "primary",
    model_client=model_client,
    system_message="You are a helpful AI assistant.",
)

# Create the critic agent.
critic_agent = AssistantAgent(
    "critic",
    model_client=model_client,
    system_message="Provide constructive feedback. Respond with 'APPROVE' to when your feedbacks are addressed.",
)

# Define a termination condition that stops the task if the critic approves.
text_termination = TextMentionTermination("APPROVE")

# Create a team with the primary and critic agents.
team = RoundRobinGroupChat([primary_agent, critic_agent], termination_condition=text_termination)

In [13]:
# Use `asyncio.run(...)` when running in a script.
result = await team.run(task="Write a short poem about the fall season.")
print(result)

messages=[TextMessage(source='user', models_usage=None, metadata={}, content='Write a short poem about the fall season.', type='TextMessage'), TextMessage(source='primary', models_usage=RequestUsage(prompt_tokens=28, completion_tokens=112), metadata={}, content="Leaves of amber gently fall,  \nIn the whisper of the autumn call.  \nA golden quilt on paths is spread,  \nAs nature paints in hues of red.  \n\nCrisp air dances on the breeze,  \nStirring thoughts with simple ease.  \nPumpkin patches, fields of maize,  \nHarvest moons in misty haze.  \n\nSweaters warm in the cool embrace,  \nNostalgia found in each changed place.  \nIn fall's brief song, the heart finds peace,  \nIn the slowing rhythm, a sweet release.  ", type='TextMessage'), TextMessage(source='critic', models_usage=RequestUsage(prompt_tokens=157, completion_tokens=167), metadata={}, content='Your poem beautifully captures the essence of the fall season. The imagery of "amber leaves" and "golden quilt" vividly portrays the 

In [15]:
result.messages

[TextMessage(source='user', models_usage=None, metadata={}, content='Write a short poem about the fall season.', type='TextMessage'),
 TextMessage(source='primary', models_usage=RequestUsage(prompt_tokens=28, completion_tokens=112), metadata={}, content="Leaves of amber gently fall,  \nIn the whisper of the autumn call.  \nA golden quilt on paths is spread,  \nAs nature paints in hues of red.  \n\nCrisp air dances on the breeze,  \nStirring thoughts with simple ease.  \nPumpkin patches, fields of maize,  \nHarvest moons in misty haze.  \n\nSweaters warm in the cool embrace,  \nNostalgia found in each changed place.  \nIn fall's brief song, the heart finds peace,  \nIn the slowing rhythm, a sweet release.  ", type='TextMessage'),
 TextMessage(source='critic', models_usage=RequestUsage(prompt_tokens=157, completion_tokens=167), metadata={}, content='Your poem beautifully captures the essence of the fall season. The imagery of "amber leaves" and "golden quilt" vividly portrays the autumn 

In [16]:
# When running inside a script, use a async main function and call it from `asyncio.run(...)`.
await team.reset()  # Reset the team for a new task.
async for message in team.run_stream(task="Write a short poem about the winter season."):  # type: ignore
    if isinstance(message, TaskResult):
        print("Stop Reason:", message.stop_reason)
    else:
        print(message)

source='user' models_usage=None metadata={} content='Write a short poem about the winter season.' type='TextMessage'
source='primary' models_usage=RequestUsage(prompt_tokens=28, completion_tokens=131) metadata={} content="In a cloak of white, the world transforms,  \nSilent whispers in the icy breeze,  \nGlimmers of frost on the bare tree forms,  \nA winter's canvas, sweeping with ease.  \n\nBlankets of snow drape the earth’s old face,  \nCrystals glisten, lit by a pale sun,  \nIn the quietude, a gentle grace,  \nWhere time slows down, and the day's begun.  \n\nFootprints trace tales in the powdery white,  \nUnderneath the stars’ soft, twinkling gleam,  \nIn winter's embrace, a tranquil night,  \nWhere nature dreams her frosted dream.  " type='TextMessage'
source='critic' models_usage=RequestUsage(prompt_tokens=177, completion_tokens=200) metadata={} content="Your poem beautifully captures the serene and magical essence of the winter season. The imagery is vivid, creating a picturesque

In [17]:
result.stop_reason

"Text 'APPROVE' mentioned"

In [18]:
await team.reset()  # Reset the team for a new task.
await Console(team.run_stream(task="Write a short poem about the summer season."))  # Stream the messages to the console.

---------- TextMessage (user) ----------
Write a short poem about the summer season.
---------- TextMessage (primary) ----------
Golden rays kiss the earth with grace,  
Under a vast, unclouded sky;  
A gentle breeze in a sunlit embrace,  
Whispers through fields where wildflowers lie.  

Laughter echoes in the open air,  
As waves dance upon the shore;  
Days are long and free from care,  
Where sun-drenched hearts begin to soar.  

The world is alive with vibrant hue,  
In the warmth of the summer's glow;  
Nature's canvas, ever anew,  
A fleeting beauty we come to know.  
---------- TextMessage (critic) ----------
Your poem beautifully captures the essence of summer with vivid imagery and evocative language. Here are a few suggestions that might enhance it even further:

1. Consider adding a metaphor or simile to deepen the imagery. For example, comparing the sun to a specific golden object could enrich the description.
   
2. In the second stanza, consider expanding on how laughter

TaskResult(messages=[TextMessage(source='user', models_usage=None, metadata={}, content='Write a short poem about the summer season.', type='TextMessage'), TextMessage(source='primary', models_usage=RequestUsage(prompt_tokens=28, completion_tokens=113), metadata={}, content="Golden rays kiss the earth with grace,  \nUnder a vast, unclouded sky;  \nA gentle breeze in a sunlit embrace,  \nWhispers through fields where wildflowers lie.  \n\nLaughter echoes in the open air,  \nAs waves dance upon the shore;  \nDays are long and free from care,  \nWhere sun-drenched hearts begin to soar.  \n\nThe world is alive with vibrant hue,  \nIn the warmth of the summer's glow;  \nNature's canvas, ever anew,  \nA fleeting beauty we come to know.  ", type='TextMessage'), TextMessage(source='critic', models_usage=RequestUsage(prompt_tokens=158, completion_tokens=190), metadata={}, content="Your poem beautifully captures the essence of summer with vivid imagery and evocative language. Here are a few sugg

There is no summary as opposed to an example in the tutorial.
```
---------- Summary ----------
Number of messages: 5
Finish reason: Text 'APPROVE' mentioned
Total prompt tokens: 972
Total completion tokens: 455
Duration: 11.78 seconds

In [19]:
# Create a new team with an external termination condition.
external_termination = ExternalTermination()
team = RoundRobinGroupChat(
    [primary_agent, critic_agent],
    termination_condition=external_termination | text_termination,  # Use the bitwise OR operator to combine conditions.
)

# Run the team in a background task.
run = asyncio.create_task(Console(team.run_stream(task="Write a short poem about the spring season.")))

# Wait for some time.
await asyncio.sleep(0.1)

# Stop the team.
external_termination.set()

# Wait for the team to finish.
await run

---------- TextMessage (user) ----------
Write a short poem about the spring season.
---------- TextMessage (primary) ----------
Blossoms whisper in the warming breeze,  
As tender buds begin to wake;  
The world awash with vibrant ease,  
In hues that only spring can make.  

The symphony of life takes flight,  
With birds that sing in joyful cheer;  
Days stretch longer, bathed in light,  
As hope renews with each dawn near.  

Gentle rains kiss the earth anew,  
Awakening dreams in hidden seeds;  
A tapestry of life and hue,  
In spring's embrace, the heart concedes.  


TaskResult(messages=[TextMessage(source='user', models_usage=None, metadata={}, content='Write a short poem about the spring season.', type='TextMessage'), TextMessage(source='primary', models_usage=RequestUsage(prompt_tokens=530, completion_tokens=111), metadata={}, content="Blossoms whisper in the warming breeze,  \nAs tender buds begin to wake;  \nThe world awash with vibrant ease,  \nIn hues that only spring can make.  \n\nThe symphony of life takes flight,  \nWith birds that sing in joyful cheer;  \nDays stretch longer, bathed in light,  \nAs hope renews with each dawn near.  \n\nGentle rains kiss the earth anew,  \nAwakening dreams in hidden seeds;  \nA tapestry of life and hue,  \nIn spring's embrace, the heart concedes.  ", type='TextMessage')], stop_reason='External termination requested')

In [20]:
# Create a cancellation token.
cancellation_token = CancellationToken()

# Use another coroutine to run the team.
run = asyncio.create_task(
    team.run(
        task="Translate the poem to Spanish.",
        cancellation_token=cancellation_token,
    )
)

# Cancel the run.
cancellation_token.cancel()

try:
    result = await run  # This will raise a CancelledError.
except asyncio.CancelledError:
    print("Task was cancelled.")

Task was cancelled.


## Single agent team

In [21]:
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import TextMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

model_client = OpenAIChatCompletionClient(
    model="gpt-4o",
    # api_key="sk-...", # Optional if you have an OPENAI_API_KEY env variable set.
    # Disable parallel tool calls for this example.
    parallel_tool_calls=False,  # type: ignore
)


# Create a tool for incrementing a number.
def increment_number(number: int) -> int:
    """Increment a number by 1."""
    return number + 1


# Create a tool agent that uses the increment_number function.
looped_assistant = AssistantAgent(
    "looped_assistant",
    model_client=model_client,
    tools=[increment_number],  # Register the tool.
    system_message="You are a helpful AI assistant, use the tool to increment the number.",
)

# Termination condition that stops the task if the agent responds with a text message.
termination_condition = TextMessageTermination("looped_assistant")

# Create a team with the looped assistant agent and the termination condition.
team = RoundRobinGroupChat(
    [looped_assistant],
    termination_condition=termination_condition,
)

# Run the team with a task and print the messages to the console.
async for message in team.run_stream(task="Increment the number 5 to 10."):  # type: ignore
    print(type(message).__name__, message)

await model_client.close()


TextMessage source='user' models_usage=None metadata={} content='Increment the number 5 to 10.' type='TextMessage'
ToolCallRequestEvent source='looped_assistant' models_usage=RequestUsage(prompt_tokens=75, completion_tokens=15) metadata={} content=[FunctionCall(id='call_Xqx398CD0LUl6OAxtUYrdu8h', arguments='{"number":5}', name='increment_number')] type='ToolCallRequestEvent'
ToolCallExecutionEvent source='looped_assistant' models_usage=None metadata={} content=[FunctionExecutionResult(content='6', name='increment_number', call_id='call_Xqx398CD0LUl6OAxtUYrdu8h', is_error=False)] type='ToolCallExecutionEvent'
ToolCallSummaryMessage source='looped_assistant' models_usage=None metadata={} content='6' type='ToolCallSummaryMessage'
ToolCallRequestEvent source='looped_assistant' models_usage=RequestUsage(prompt_tokens=98, completion_tokens=15) metadata={} content=[FunctionCall(id='call_Zgily0MIksJxoOMghtBic8qg', arguments='{"number":6}', name='increment_number')] type='ToolCallRequestEvent'


# Swarm

Similar to **SelectorGroupChat** and **RoundRobinGroupChat**, participant agents broadcast their responses so all agents share the same message context.

Different from the other two group chat teams, at each turn, the speaker agent is selected based on the most recent **HandoffMessage** message in the context. This requires each agent in the team to be able to generate HandoffMessage to signal which other agents that it hands off to.

For AssistantAgent, you can set the *handoffs* argument to specify which agents it can hand off to. You can use **Handoff** to customize the message content and handoff behavior.

**HandoffMessage message** - a message requesting handoff of a conversation to another agent (сообщение с просьбой передать разговор другому агенту).

The overall process can be summarized as follows:

1. Each agent has the ability to generate HandoffMessage to signal which other agents it can hand off to. For AssistantAgent, this means setting the handoffs argument.

2. When the team starts on a task, the first speaker agents operate on the task and make localized decision about whether to hand off and to whom.

3. When an agent generates a HandoffMessage, the receiving agent takes over the task with the same message context.

4. The process continues until a termination condition is met.

The AssistantAgent uses the tool calling capability of the model to generate handoffs. This means that the model must support tool calling. **If the model does parallel tool calling, multiple handoffs may be generated at the same time. This can lead to unexpected behavior.** To avoid this, you can disable parallel tool calling by configuring the model client. For OpenAIChatCompletionClient and AzureOpenAIChatCompletionClient, you can set parallel_tool_calls=False in the configuration.

## Customer Support Example

![Example Image](../data/flight_refund.png)

This system implements a flights refund scenario with two agents:

* **Travel Agent**: Handles general travel and refund coordination.

* **Flights Refunder**: Specializes in processing flight refunds with the refund_flight tool.

Additionally, we let the user interact with the agents, when agents handoff to "user".

**Workflow:**

1. The Travel Agent initiates the conversation and evaluates the user’s request.

2. Based on the request:

    * For refund-related tasks, the Travel Agent hands off to the Flights Refunder.

    * For information needed from the customer, either agent can hand off to the "user".

3. The Flights Refunder processes refunds using the refund_flight tool when appropriate.

4. If an agent hands off to the "user", the team execution will stop and wait for the user to input a response.

5. When the user provides input, it’s sent back to the team as a HandoffMessage. This message is directed to the agent that originally requested user input.

6. The process continues until the Travel Agent determines the task is complete and terminates the workflow.

In [1]:
from typing import Any, Dict, List

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import HandoffTermination, TextMentionTermination
from autogen_agentchat.messages import HandoffMessage
from autogen_agentchat.teams import Swarm
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

**Tool:**

In [2]:
def refund_flight(flight_id: str) -> str:
    """Refund a flight"""
    return f"Flight {flight_id} refunded"

**Agents:**

In [5]:
model_client = OpenAIChatCompletionClient(
    model="gpt-4o",
    api_key=os.environ['OPENAI_API_KEY'],
)

travel_agent = AssistantAgent(
    "travel_agent",
    model_client=model_client,
    handoffs=["flights_refunder", "user"],
    system_message="""You are a travel agent.
    The flights_refunder is in charge of refunding flights.
    If you need information from the user, you must first send your message, then you can handoff to the user.
    Use TERMINATE when the travel planning is complete.""",
)

flights_refunder = AssistantAgent(
    "flights_refunder",
    model_client=model_client,
    handoffs=["travel_agent", "user"],
    tools=[refund_flight],
    system_message="""You are an agent specialized in refunding flights.
    You only need flight reference numbers to refund a flight.
    You have the ability to refund a flight using the refund_flight tool.
    If you need information from the user, you must first send your message, then you can handoff to the user.
    When the transaction is complete, handoff to the travel agent to finalize.""",
)

In [6]:
termination = HandoffTermination(target="user") | TextMentionTermination("TERMINATE")
team = Swarm([travel_agent, flights_refunder], termination_condition=termination)

**Running a team:**

In [7]:
task = "I need to refund my flight."


async def run_team_stream() -> None:
    task_result = await Console(team.run_stream(task=task))
    last_message = task_result.messages[-1]

    while isinstance(last_message, HandoffMessage) and last_message.target == "user":
        user_message = input("User: ")

        task_result = await Console(
            team.run_stream(task=HandoffMessage(source="user", target=last_message.source, content=user_message))
        )
        last_message = task_result.messages[-1]


# Use asyncio.run(...) if you are running this in a script.
await run_team_stream()
await model_client.close()

---------- TextMessage (user) ----------
I need to refund my flight.
---------- TextMessage (travel_agent) ----------
I can help with that. Could you please provide me with some details about your flight, such as the booking reference number and the reason for the refund? Once I have that information, I can transfer you to the flights refunder.
---------- ToolCallRequestEvent (travel_agent) ----------
[FunctionCall(id='call_b6yXuoXdvnqJ4XPKpqUv67m8', arguments='{}', name='transfer_to_user')]
---------- ToolCallExecutionEvent (travel_agent) ----------
[FunctionExecutionResult(content='Transferred to user, adopting the role of user immediately.', name='transfer_to_user', call_id='call_b6yXuoXdvnqJ4XPKpqUv67m8', is_error=False)]
---------- HandoffMessage (travel_agent) ----------
Transferred to user, adopting the role of user immediately.
---------- HandoffMessage (user) ----------
12345
---------- ToolCallRequestEvent (travel_agent) ----------
[FunctionCall(id='call_2x3zmpth0xN3PfVhiz0Kz

## Stock Research Example

![Example Image](../data/stock_research.png)

This system is designed to perform stock research tasks by leveraging four agents:

* **Planner**: The central coordinator that delegates specific tasks to specialized agents based on their expertise. The planner ensures that each agent is utilized efficiently and oversees the overall workflow.

* **Financial Analyst**: A specialized agent responsible for analyzing financial metrics and stock data using tools such as *get_stock_data*.

* **News Analyst**: An agent focused on gathering and summarizing recent news articles relevant to the stock, using tools such as *get_news*.

* **Writer**: An agent tasked with compiling the findings from the stock and news analysis into a cohesive final report.

**Workflow:**
1. The Planner initiates the research process by delegating tasks to the appropriate agents in a step-by-step manner.

2. Each agent performs its task independently and appends their work to the **shared message thread/history**. Rather than directly returning results to the planner, all agents contribute to and read from this shared message history. When agents generate their work using the LLM, they have access to this shared message history, which provides context and helps track the overall progress of the task.

3. Once an agent completes its task, it hands off control back to the planner.

4. The process continues until the planner determines that all necessary tasks have been completed and decides to terminate the workflow.

**Tools:**

In [8]:
async def get_stock_data(symbol: str) -> Dict[str, Any]:
    """Get stock market data for a given symbol"""
    return {"price": 180.25, "volume": 1000000, "pe_ratio": 65.4, "market_cap": "700B"}


async def get_news(query: str) -> List[Dict[str, str]]:
    """Get recent news articles about a company"""
    return [
        {
            "title": "Tesla Expands Cybertruck Production",
            "date": "2024-03-20",
            "summary": "Tesla ramps up Cybertruck manufacturing capacity at Gigafactory Texas, aiming to meet strong demand.",
        },
        {
            "title": "Tesla FSD Beta Shows Promise",
            "date": "2024-03-19",
            "summary": "Latest Full Self-Driving beta demonstrates significant improvements in urban navigation and safety features.",
        },
        {
            "title": "Model Y Dominates Global EV Sales",
            "date": "2024-03-18",
            "summary": "Tesla's Model Y becomes best-selling electric vehicle worldwide, capturing significant market share.",
        },
    ]

**Agents:**

In [9]:
model_client = OpenAIChatCompletionClient(
    model="gpt-4o",
    api_key=os.environ['OPENAI_API_KEY'],
)

planner = AssistantAgent(
    "planner",
    model_client=model_client,
    handoffs=["financial_analyst", "news_analyst", "writer"],
    system_message="""You are a research planning coordinator.
    Coordinate market research by delegating to specialized agents:
    - Financial Analyst: For stock data analysis
    - News Analyst: For news gathering and analysis
    - Writer: For compiling final report
    Always send your plan first, then handoff to appropriate agent.
    Always handoff to a single agent at a time.
    Use TERMINATE when research is complete.""",
)

financial_analyst = AssistantAgent(
    "financial_analyst",
    model_client=model_client,
    handoffs=["planner"],
    tools=[get_stock_data],
    system_message="""You are a financial analyst.
    Analyze stock market data using the get_stock_data tool.
    Provide insights on financial metrics.
    Always handoff back to planner when analysis is complete.""",
)

news_analyst = AssistantAgent(
    "news_analyst",
    model_client=model_client,
    handoffs=["planner"],
    tools=[get_news],
    system_message="""You are a news analyst.
    Gather and analyze relevant news using the get_news tool.
    Summarize key market insights from news.
    Always handoff back to planner when analysis is complete.""",
)

writer = AssistantAgent(
    "writer",
    model_client=model_client,
    handoffs=["planner"],
    system_message="""You are a financial report writer.
    Compile research findings into clear, concise reports.
    Always handoff back to planner when writing is complete.""",
)

In [10]:
# Define termination condition
text_termination = TextMentionTermination("TERMINATE")
termination = text_termination

research_team = Swarm(
    participants=[planner, financial_analyst, news_analyst, writer], termination_condition=termination
)

task = "Conduct market research for TSLA stock"
await Console(research_team.run_stream(task=task))
await model_client.close()

---------- TextMessage (user) ----------
Conduct market research for TSLA stock
---------- ThoughtEvent (planner) ----------
Here's the plan for conducting market research on TSLA (Tesla) stock:

1. **Financial Analysis**: Gather and analyze stock data for Tesla. This includes looking into recent stock performance, trends, and other relevant financial metrics.

2. **News Analysis**: Gather and assess the latest news related to Tesla. This includes recent developments, industry news, regulatory updates, and any other information that could impact Tesla's stock performance.

3. **Report Compilation**: Compile the insights from the financial and news analysis into a coherent market research report.

Let's begin by handing off the task to the Financial Analyst for stock data analysis.
---------- ToolCallRequestEvent (planner) ----------
[FunctionCall(id='call_W3rexbpUFxCSxvSFvEaaNVNe', arguments='{}', name='transfer_to_financial_analyst')]
---------- ToolCallExecutionEvent (planner) ------