## Multi-Agent GroupChat
In the previous four notebooks, we explored and learned how to:
- create an `Assistant` agent that generates hello-world responses.
- create a `WeatherAssistant` agent that provides weather information using tool calls.
- create an `ImageAssistant` agent that describes images using a multi-modal model.
- introduce a human user into the conversation.

In this notebook, we will create a group chat that hosts all the agents from the previous examples. A `GroupChatManager` agent orchestrates the chat, choosing which agent speaks next based on the conversation so far.

### The topology of the agent group.

The group chat has a star topology with the `GroupChatManager` agent at the center, exchanging messages with the `Assistant`, `WeatherAssistant`, `ImageAssistant`, and `HumanUser` agents. These agents never interact with each other directly: every reply goes back to the `GroupChatManager`, which then picks the next speaker.
```mermaid
graph TD
    A[GroupChatManager] <--> B[Assistant]
    A[GroupChatManager] <--> C[WeatherAssistant]
    A[GroupChatManager] <--> D[ImageAssistant]
    A[GroupChatManager] <--> E[HumanUser]
```


In [1]:
from dataclasses import dataclass, field
from typing import List, Optional
import base64
from io import BytesIO
from autogen_core.base import MessageContext, AgentId, AgentInstantiationContext, TopicId
from autogen_core.components import DefaultTopicId, RoutedAgent, default_subscription, message_handler, TypeSubscription
from autogen_core.components.code_executor import CodeExecutor, extract_markdown_code_blocks
from autogen_core.components.models import (
    AssistantMessage,
    ChatCompletionClient,
    LLMMessage,
    SystemMessage,
    UserMessage,
)
import datetime
import tempfile
from autogen_core.components.tool_agent import ToolAgent, tool_agent_caller_loop
from autogen_core.application import SingleThreadedAgentRuntime
from autogen_core.components.models import OpenAIChatCompletionClient, FunctionExecutionResultMessage, FunctionExecutionResult
import random
import json
from autogen_core.base import CancellationToken
from autogen_core.components.tools import FunctionTool, ToolSchema
from autogen_core.components._image import Image
from typing_extensions import Annotated
from pydantic import BaseModel
from PIL import Image as PILImage
import os
import requests
import uuid;

  from autogen_core.components.models import OpenAIChatCompletionClient, FunctionExecutionResultMessage, FunctionExecutionResult


# Load .env

In [2]:
%load_ext dotenv
%dotenv .env

In [3]:
class Message(BaseModel):
    """A single chat turn exchanged between agents in the group chat."""

    # Text content of the turn.
    text: str
    # Type of the agent that produced the turn (e.g. "user", "weather");
    # stays None when the producer does not set it.
    source: Optional[str] = None
    image_url: Optional[str] = None # url or base64 encoded image

class Conversation(BaseModel):
    """The shared chat history that is passed between agents via the group manager."""

    # Ordered turns; each agent appends its reply before re-publishing.
    chat_history: List[Message]

In [4]:
class UserAgent(RoutedAgent):
    """Stand-in for the human participant: reads a turn from stdin and returns it to the manager."""

    def __init__(self, description: str, group_manager_topic: str) -> None:
        super().__init__(description=description)
        # Topic type of the GroupChatManager; every reply is published there.
        self._group_manager_topic = group_manager_topic

    @message_handler
    async def handle_request_to_speak(self, message: Conversation, ctx: MessageContext) -> None:
        """Prompt the human for the next turn and hand the conversation back to the manager.

        Typing exactly ``TERMINATE`` publishes nothing, so no further turns are
        triggered and the runtime can go idle.
        """
        # NOTE: input() is synchronous and blocks the event loop while waiting;
        # acceptable for this single-user demo.
        text = input("Enter your message, type 'TERMINATE' to terminate the task: ")
        print(f"### User: \n{text}")
        if text != "TERMINATE":
            message.chat_history.append(Message(text=text, source=self.type))
            await self.publish_message(message, DefaultTopicId(type=self._group_manager_topic))

In [5]:

class ImageAssistant(RoutedAgent):
    """Agent that answers questions about an image referenced in the latest chat turn.

    It first asks the model to call a loader tool (local path or URL) for any
    image mentioned in the last message. It runs the requested tools to get
    base64 payloads, then sends the text plus the images to the model for a
    multi-modal answer. The reply is appended to the conversation and
    published back to the group manager topic.
    """

    # Reply used when the model answers the loader prompt with text instead of a tool call.
    _NO_IMAGE_REPLY = "No image in the last message"

    def __init__(self,
                 model_client: ChatCompletionClient,
                 system_message: str,
                 group_manager_topic: str) -> None:
        super().__init__(description=system_message)
        self._system_message = system_message
        self._group_manager_topic = group_manager_topic
        self._model_client = model_client
        self._load_local_image_tool = FunctionTool(self.load_local_image, description="Load local image from disk and return base64 encoded image")
        self._load_image_from_url_tool = FunctionTool(self.load_image_from_url, description="Load image from url and return base64 encoded image")
        self._tools = [self._load_local_image_tool, self._load_image_from_url_tool]

    async def load_local_image(self, path: str) -> str:
        """Read the file at ``path`` and return its bytes base64-encoded."""
        with open(path, "rb") as image_file:
            return base64.b64encode(image_file.read()).decode('utf-8')

    async def load_image_from_url(self, url: str) -> str:
        """Download ``url`` and return the response body base64-encoded.

        Raises:
            requests.HTTPError: on an error status, rather than encoding an error page as an image.
        """
        # NOTE: requests is synchronous, so the download blocks the event loop;
        # the timeout (seconds) bounds how long a stalled server can hang the chat.
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        return base64.b64encode(response.content).decode('utf-8')

    async def _reply(self, conversation: Conversation, text: str) -> None:
        """Print ``text``, record it as this agent's turn, and hand the conversation back to the manager."""
        print(f"### {self.type}: \n{text}")
        conversation.chat_history.append(Message(text=text, source=self.type))
        await self.publish_message(
            conversation,
            DefaultTopicId(type=self._group_manager_topic),
        )

    @message_handler
    async def handle_request_to_speak(self, message: Conversation, ctx: MessageContext) -> None:
        """Load any image referenced in the last turn and answer about it.

        Raises:
            ValueError: if the final model response is not text.
        """
        last_msg = message.chat_history[-1]

        # Step 1: let the model decide which loader tool (if any) to call,
        # e.g. for "Please describe this local image: /path/to/image.jpg".
        load_image_prompt = """
        call the right tool to load the image if there is an image path or url in the last message,
        otherwise, say 'No image in the last message'
"""
        completion = await self._model_client.create(
            [SystemMessage(content=load_image_prompt), UserMessage([last_msg.text], source='user')],
            tools=self._tools,
            cancellation_token=ctx.cancellation_token,
        )
        # Any text answer (not only the exact sentinel, which the model may
        # paraphrase or punctuate) means no tool was called. Iterating a string
        # as tool calls below would crash.
        if isinstance(completion.content, str):
            await self._reply(message, self._NO_IMAGE_REPLY)
            return

        # Step 2: execute the requested loader tools, dispatching by tool name.
        tools_by_name = {tool.name: tool for tool in self._tools}
        user_message_parts: List[str | Image] = [last_msg.text]
        for tool_call in completion.content:
            tool = tools_by_name.get(tool_call.name)
            if tool is None:
                # Skip calls to tools this agent does not expose.
                continue
            arguments = json.loads(tool_call.arguments)
            base64_image = await tool.run_json(arguments, ctx.cancellation_token)
            user_message_parts.append(Image.from_base64(tool.return_value_as_string(base64_image)))

        # Step 3: ask the (multi-modal) model about the text plus loaded images.
        chat_history: List[LLMMessage] = [
            SystemMessage(content=self._system_message),
            UserMessage(user_message_parts, source='user'),
        ]
        completion = await self._model_client.create(
            chat_history,
            cancellation_token=ctx.cancellation_token,
        )
        response = completion.content
        if not isinstance(response, str):
            raise ValueError(f"Expected response to be a string, got {response}")

        await self._reply(message, response)

In [6]:
class WeatherAssistant(RoutedAgent):
    """Agent that answers weather questions using two local function tools.

    ``get_date`` and ``get_weather`` are exposed to the model as tools. The
    agent repeatedly calls the model and executes the requested tools until
    the model returns a plain-text answer. It then appends that answer to the
    conversation and publishes it back to the group manager topic.
    """

    def __init__(self,
                 model_client: ChatCompletionClient,
                 system_message: str,
                 group_manager_topic: str) -> None:
        super().__init__(description=system_message)
        self._system_message = system_message
        self._group_manager_topic = group_manager_topic
        self._model_client = model_client
        self._get_date_tool = FunctionTool(self.get_date, description="Get the current date.")
        self._get_weather_tool = FunctionTool(self.get_weather, description="Get the weather for a city and state on a given date.")
        self._tools = [self._get_weather_tool, self._get_date_tool]

    async def get_weather(self, city: str, state: str, date: str) -> str:
        """Return a canned (always sunny) forecast for ``city``, ``state`` on ``date``."""
        # NOTE(review): FunctionTool builds the tool schema from this signature;
        # the ``-> str`` annotation presumably also informs its return handling,
        # so keep it accurate.
        print(f"Getting the weather for {city}, {state} on {date}")
        return f"The weather in {city}, {state} on {date} will be sunny."

    async def get_date(self) -> str:
        """Return today's local date formatted as ``"today is YYYY-MM-DD"``."""
        today = datetime.datetime.today()
        return f"today is {today.strftime('%Y-%m-%d')}"

    @message_handler
    async def handle_request_to_speak(self, message: Conversation, ctx: MessageContext) -> None:
        """Answer the last turn, running tools as the model requests, then reply to the manager."""
        last_msg = message.chat_history[-1]
        if last_msg.image_url is not None:
            # Images are out of scope for this agent. Decline and stop here so the
            # conversation is not also processed (and published a second time) below.
            reply = Message(text="can't recognize image in the last message", source=self.type)
            message.chat_history.append(reply)
            await self.publish_message(
                message,
                DefaultTopicId(type=self._group_manager_topic),
            )
            return

        chat_history: List[LLMMessage] = [
            SystemMessage(content=self._system_message),
            UserMessage(content=last_msg.text, source="user"),
        ]
        tools_by_name = {tool.name: tool for tool in self._tools}

        # NOTE(review): there is no cap on tool rounds; a model that keeps
        # requesting tools will loop indefinitely.
        while True:
            completion = await self._model_client.create(
                chat_history,
                tools=self._tools,
                cancellation_token=ctx.cancellation_token,
            )
            print(f"### {self.type}: \n{completion.content}")
            if not isinstance(completion.content, list):
                message.chat_history.append(Message(text=completion.content, source=self.type))
                await self.publish_message(
                    message,
                    DefaultTopicId(type=self._group_manager_topic),
                )
                return

            # Record the model's tool-call turn once. Then answer every call in it
            # with a single result message, so each call id gets exactly one result.
            chat_history.append(AssistantMessage(content=completion.content, source="assistant"))
            results: List[FunctionExecutionResult] = []
            for tool_call in completion.content:
                tool = tools_by_name.get(tool_call.name)
                if tool is None:
                    # Report back rather than leaving the call unanswered.
                    output = f"Error: unknown tool '{tool_call.name}'"
                else:
                    arguments = json.loads(tool_call.arguments)
                    result = await tool.run_json(arguments, ctx.cancellation_token)
                    output = tool.return_value_as_string(result)
                # Tool outputs are printed for visibility only. They are not published,
                # because the manager topic only handles `Conversation` messages.
                print(f"### {self.type}: \n{output}")
                results.append(FunctionExecutionResult(content=output, call_id=tool_call.id))
            chat_history.append(FunctionExecutionResultMessage(content=results))

In [7]:
class Assistant(RoutedAgent):
    """General-purpose chat agent that answers using the whole conversation history."""

    def __init__(self,
                 model_client: ChatCompletionClient,
                 system_message: str,
                 group_manager_topic: str) -> None:
        super().__init__("An assistant agent.")
        self._model_client = model_client
        self._group_manager_topic = group_manager_topic
        self._system_message = system_message

    @message_handler
    async def handle_message(self, message: Conversation, ctx: MessageContext) -> None:
        """Reply to the conversation and hand it back to the group manager.

        This agent's own earlier turns are replayed as assistant messages. Every
        other turn is replayed as a user message attributed to its source.

        Raises:
            ValueError: if the model returns something other than text.
        """
        chat_history: List[LLMMessage] = [SystemMessage(content=self._system_message)]
        for msg in message.chat_history:
            if msg.source == self.type:
                chat_history.append(AssistantMessage(content=msg.text, source="assistant"))
            else:
                # Message.source is optional; UserMessage needs a string source.
                chat_history.append(UserMessage(content=msg.text, source=msg.source or "user"))

        result = await self._model_client.create(
            chat_history,
            cancellation_token=ctx.cancellation_token,
        )
        if not isinstance(result.content, str):
            raise ValueError(f"Expected response to be a string, got {result.content}")

        print(f"\n{'-'*80}\nAssistant:\n{result.content}")
        message.chat_history.append(Message(text=result.content, source=self.type))
        await self.publish_message(message, DefaultTopicId(type=self._group_manager_topic))

In [8]:
class GroupChatManager(RoutedAgent):
    """Hub of the star topology: chooses which participant speaks next.

    For every ``Conversation`` it receives, it asks the model to pick one of the
    four participant topic types, given their descriptions and the chat
    history. It then publishes the conversation to the chosen topic.
    """

    def __init__(self,
                    model_client: ChatCompletionClient,
                    weather_agent: str,
                    weather_agent_description: str,
                    image_agent: str,
                    image_agent_description: str,
                    user_agent: str,
                    user_agent_description: str,
                    assistant: str,
                    assistant_description: str) -> None:
        super().__init__("Group Chat Manager")
        self._model_client = model_client
        self._weather_agent = weather_agent
        self._weather_agent_description = weather_agent_description
        self._image_agent = image_agent
        self._image_agent_description = image_agent_description
        self._user_agent = user_agent
        self._user_agent_description = user_agent_description
        self._assistant = assistant
        self._assistant_description = assistant_description

    @staticmethod
    def _match_role(reply: str, available_roles: List[str]) -> Optional[str]:
        """Map the model's free-text reply to one of ``available_roles``, or None.

        An exact match (case-insensitive, ignoring surrounding whitespace,
        quotes and a trailing period) wins. Otherwise the first role whose name
        appears anywhere in the reply is used. Checking exact matches first
        keeps selection correct if one role name is a substring of another.
        """
        normalized = reply.strip().strip("'\"`.").lower()
        for role in available_roles:
            if role.lower() == normalized:
                return role
        lowered = reply.lower()
        for role in available_roles:
            if role.lower() in lowered:
                return role
        return None

    @message_handler
    async def handle_message(self, message: Conversation, ctx: MessageContext) -> None:
        """Select the next speaker with the model and forward the conversation to it.

        Raises:
            ValueError: if the model's reply is not text or names no known role.
        """
        # Single source of truth for (topic type, description) of each participant.
        participants = [
            (self._weather_agent, self._weather_agent_description),
            (self._image_agent, self._image_agent_description),
            (self._user_agent, self._user_agent_description),
            (self._assistant, self._assistant_description),
        ]
        available_roles = [topic_type for topic_type, _ in participants]
        roles = "\n".join(
            f"{topic_type}: {description}".strip() for topic_type, description in participants
        )
        history = "\n".join(f"{msg.source}: {msg.text}" for msg in message.chat_history)

        selector_prompt = """You are in a role play game. The following roles are available:
{roles}.
Read the following conversation. Then select the next role from {participants} to play. Only return the role.

{history}

Read the above conversation. Then select the next role from {participants} to play. Only return the role.
"""
        system_message = SystemMessage(
            selector_prompt.format(
                roles=roles,
                history=history,
                participants=str(available_roles),
            )
        )

        completion = await self._model_client.create(
            [system_message],
            cancellation_token=ctx.cancellation_token,
        )
        # Explicit check instead of `assert`, which is stripped under `python -O`.
        if not isinstance(completion.content, str):
            raise ValueError(f"Invalid role selected: {completion.content}")

        selected = self._match_role(completion.content, available_roles)
        if selected is None:
            raise ValueError(f"Invalid role selected: {completion.content}")
        print(f"### {self.type}: \nSelected role: {selected}")
        await self.publish_message(message, DefaultTopicId(type=selected))

In [9]:
runtime = SingleThreadedAgentRuntime()
gpt_4o_mini = OpenAIChatCompletionClient(
            api_key=os.environ.get("OPENAI_API_KEY"),
            model="gpt-4o-mini",
        )
gpt_4o = OpenAIChatCompletionClient(
            api_key=os.environ.get("OPENAI_API_KEY"),
            model="gpt-4o",
        )
user_agent_description = "A user agent that generates messages for the assistant agent."
user_agent = await UserAgent.register(
    runtime,
    type="user",
    factory=lambda: UserAgent(
        description=user_agent_description,
        group_manager_topic="admin",
    ),
)

image_agent_description = "An agent that processes images."
image_agent = await ImageAssistant.register(
    runtime,
    type="image",
    factory=lambda: ImageAssistant(
        model_client=gpt_4o_mini,
        system_message="You are an AI assistant that can process images.",
        group_manager_topic="admin",
    ),
)

weather_agent_description = "An agent that provides weather information."
weather_agent = await WeatherAssistant.register(
    runtime,
    type="weather",
    factory=lambda: WeatherAssistant(
        model_client=gpt_4o_mini,
        system_message="You are a helpful AI assistant that can provide weather information.",
        group_manager_topic="admin",
    ),
)

assistant_description = "An assistant agent that can help with various tasks."
assistant = await Assistant.register(
    runtime,
    type="assistant",
    factory=lambda: Assistant(model_client=gpt_4o_mini, system_message="You are a helpful AI assistant", group_manager_topic="admin"),
)

group_manager_description = "A group chat manager that routes messages to the appropriate agent."
group_manager = await GroupChatManager.register(
    runtime,
    type="admin",
    factory=lambda: GroupChatManager(
        model_client=gpt_4o,
        weather_agent=weather_agent.type,
        weather_agent_description=weather_agent_description,
        image_agent=image_agent.type,
        image_agent_description=image_agent_description,
        user_agent=user_agent.type,
        user_agent_description=user_agent_description,
        assistant=assistant.type,
        assistant_description=assistant_description,
    ),
)

await runtime.add_subscription(
    TypeSubscription("user", user_agent.type))
await runtime.add_subscription(
    TypeSubscription("assistant", assistant.type))
await runtime.add_subscription(
    TypeSubscription("image", image_agent.type))
await runtime.add_subscription(
    TypeSubscription("weather", weather_agent.type))
await runtime.add_subscription(
    TypeSubscription("admin", group_manager.type))

runtime.start()
session_id = str(uuid.uuid4())
msg = Conversation(chat_history=[])
await runtime.publish_message(
    msg,
    TopicId(type="user", source=session_id),
)

await runtime.stop_when_idle()

### User: 
What's the weather in seattle
### admin: 
Selected role: weather
### weather: 
[FunctionCall(id='call_1xZpc5c0atpKLjwuZsE6aR4f', arguments='{"city":"Seattle","state":"WA","date":"2023-10-01"}', name='get_weather')]
Getting the weather for Seattle, WA on 2023-10-01
### weather: 
The weather in Seattle, WA on 2023-10-01 will be sunny.
### weather: 
The weather in Seattle, WA on October 1, 2023, is expected to be sunny.
### admin: 
Selected role: user
### User: 
What's the weather in seattle today
### admin: 
Selected role: weather
### weather: 
[FunctionCall(id='call_AZ3ondsMBPeCgjxwG7W6WGNx', arguments='{}', name='get_date')]
### weather: 
today is 2024-10-25
### weather: 
[FunctionCall(id='call_l0rQlat6E8ytlTu6ngLivEbD', arguments='{"city":"Seattle","state":"WA","date":"2024-10-25"}', name='get_weather')]
Getting the weather for Seattle, WA on 2024-10-25
### weather: 
The weather in Seattle, WA on 2024-10-25 will be sunny.
### weather: 
The weather in Seattle, WA today (Octo