In [1]:
from dataclasses import dataclass


@dataclass
class TextMessage:
    content: str
    source: str


@dataclass
class ImageMessage:
    url: str
    source: str


In [2]:
from autogen_core import AgentId, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler


class MyAgent(RoutedAgent):
    @message_handler
    async def on_text_message(self, message: TextMessage, ctx: MessageContext) -> None:
        print(f"Hello, {message.source}, you said {message.content}!")

    @message_handler
    async def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:
        print(f"Hello, {message.source}, you sent me {message.url}!")
runtime = SingleThreadedAgentRuntime()
await MyAgent.register(runtime, "my_agent", lambda: MyAgent("My Agent"))

AgentType(type='my_agent')

In [3]:
runtime.start()
agent_id = AgentId("my_agent", "default")
await runtime.send_message(TextMessage(content="Hello, World!", source="User"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="User"), agent_id)
await runtime.stop_when_idle()

Hello, User, you said Hello, World!!
Hello, User, you sent me https://example.com/image.jpg!


In [4]:
class RountedBySenderAgent(RoutedAgent):
    @message_handler(match=lambda msg, ctx: msg.source.startswith("User1"))
    async def on_user1_message(self, message: TextMessage, ctx: MessageContext) -> None:
        print(f"User1 sent: {message.content}")
    @message_handler(match=lambda msg, ctx: msg.source.startswith("User2"))
    async def on_user2_message(self, message: TextMessage, ctx: MessageContext) -> None:
        print(f"User2 sent: {message.content}")
    @message_handler(match=lambda msg, ctx: msg.source.startswith("User3"))
    async def on_user3_message(self, message: TextMessage, ctx: MessageContext) -> None:
        print(f"User3 sent: {message.content}")
runtime = SingleThreadedAgentRuntime()
await RountedBySenderAgent.register(runtime, "rounted_by_sender_agent", lambda: RountedBySenderAgent("Routed By Sender Agent"))
runtime.start()
await runtime.send_message(TextMessage(content="Hello from User1", source="User1"), AgentId("rounted_by_sender_agent", "default"))
await runtime.send_message(TextMessage(content="Hello from User2", source="User2"), AgentId("rounted_by_sender_agent", "default"))
await runtime.send_message(TextMessage(content="Hello from User3", source="User3"), AgentId("rounted_by_sender_agent", "default"))
await runtime.stop_when_idle()

User1 sent: Hello from User1
User2 sent: Hello from User2
User3 sent: Hello from User3


In [5]:
from dataclasses import dataclass

from autogen_core import MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler


@dataclass
class Message:
    content: str


class InnerAgent(RoutedAgent):
    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> Message:
        return Message(content=f"Hello from inner, {message.content}")


class OuterAgent(RoutedAgent):
    def __init__(self, description: str, inner_agent_type: str):
        super().__init__(description)
        self.inner_agent_id = AgentId(inner_agent_type, self.id.key)

    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        print(f"Received message: {message.content}")
        # Send a direct message to the inner agent and receives a response.
        response = await self.send_message(Message(f"Hello from outer, {message.content}"), self.inner_agent_id)
        print(f"Received inner response: {response.content}")
runtime = SingleThreadedAgentRuntime()
await InnerAgent.register(runtime, "inner_agent", lambda: InnerAgent("Inner Agent"))
await OuterAgent.register(runtime, "outer_agent", lambda: OuterAgent("Outer Agent", "inner_agent"))
runtime.start()
await runtime.send_message(Message(content="Testing"), AgentId("outer_agent", "default"))
await runtime.stop_when_idle()

Received message: Testing
Received inner response: Hello from inner, Hello from outer, Testing


In [6]:
from autogen_core import type_subscription, AgentId, message_handler, RoutedAgent, TypeSubscription

@type_subscription("TextMessage")
class ReceivingAgent(RoutedAgent):
    @message_handler
    async def on_text_message(self, message: TextMessage, ctx: MessageContext) -> None:
        print(f"Hello, {message.source}, you said {message.content}!")


from autogen_core import TopicId

class BroadcastingAgent(RoutedAgent):
    
    @message_handler
    async def on_my_message(self, message: TextMessage, ctx: MessageContext) -> None:
        print(f"Broadcasting message: {message.content}")
        # Broadcast the message to all agents subscribed to the TextMessage type
        await self.publish_message(message, TopicId(type="default", source=self.id.key))

runtime = SingleThreadedAgentRuntime()
await ReceivingAgent.register(runtime, "receiving_agent", lambda: ReceivingAgent("Receiving Agent"))
await BroadcastingAgent.register(runtime, "broadcasting_agent", lambda: BroadcastingAgent("Broadcasting Agent"))
await runtime.add_subscription( TypeSubscription(topic_type="TextMessage", agent_type="broadcasting_agent"))
runtime.start()
await runtime.publish_message(TextMessage(content="Publishing Messing!", source="User"), topic_id=TopicId(type="TextMessage", source="broadcasting_agent"))
await runtime.stop_when_idle()

Hello, User, you said Publishing Messing!!
Broadcasting message: Publishing Messing!


In [7]:
from autogen_core.models import ChatCompletionClient, CreateResult, UserMessage

config = {
    "provider": "openai_chat_completion_client",
    "config": {"model": "gpt-4o"},
}

client = ChatCompletionClient.load_component(config)

messages = [
    UserMessage(content="Hello, how are you?", source="user"),
    UserMessage(content="What is the weather like today?", source="user"),
]

stream = client.create_stream(messages=messages)
async for chunk in stream:  # type: ignore
    if isinstance(chunk, str):
        # The chunk is a string.
        print(chunk, flush=True, end="")
    else:
        # The final chunk is a CreateResult object.
        assert isinstance(chunk, CreateResult) and isinstance(chunk.content, str)
        # The last response is a CreateResult object with the complete message.
        print("\n\n------------\n")
        print("The complete response:", flush=True)
        print(chunk.content, flush=True)

I'm sorry, but I can't provide real-time weather updates. I recommend checking a reliable weather website or app for the most current information.

------------

The complete response:
I'm sorry, but I can't provide real-time weather updates. I recommend checking a reliable weather website or app for the most current information.


In [8]:
from typing import Literal

from pydantic import BaseModel
from autogen_ext.models.openai import OpenAIChatCompletionClient

# The response format for the agent as a Pydantic base model.
class AgentResponse(BaseModel):
    thoughts: str
    response: Literal["happy", "sad", "neutral"]


# Create an agent that uses the OpenAI GPT-4o model with the custom response format.
model_client = OpenAIChatCompletionClient(
    model="gpt-4o",
    response_format=AgentResponse,  # type: ignore
)

# Send a message list to the model and await the response.
messages = [
    UserMessage(content="I am happy.", source="user"),
]
response = await model_client.create(messages=messages)
assert isinstance(response.content, str)
parsed_response = AgentResponse.model_validate_json(response.content)
print(parsed_response.thoughts)
print(parsed_response.response)

# Close the connection to the model client.
await model_client.close()


It's wonderful to hear that you're feeling happy! Moments of joy are precious and can brighten your entire day. Here are a few ways to maintain that positive mood:

1. **Gratitude Practice:** Reflect on the things you're grateful for. It amplifies happiness.
   
2. **Share Your Joy:** Spread your happiness by sharing it with friends or loved ones. A simple message or call can do wonders. 
   
3. **Creative Expression:** Channel your happiness into something creative like writing, drawing, or playing music. 

4. **Stay Active:** A short walk or any physical activity can enhance your mood even more.

5. **Mindful Breathing:** Take deep, mindful breaths to savor the moment. It can help anchor and enhance feelings of happiness. 

6. **Positive Vibe Playlist:** Listen to your favorite uplifting songs. Music can be a powerful mood booster.

Remember, happiness is contagious, and you're likely spreading positive vibes just by being happy. Keep smiling and make the most of your wonderful mood!

In [14]:
print(response.content)

{"thoughts":"It's wonderful to hear that you're feeling happy! Moments of joy are precious and can brighten your entire day. Here are a few ways to maintain that positive mood:\n\n1. **Gratitude Practice:** Reflect on the things you're grateful for. It amplifies happiness.\n   \n2. **Share Your Joy:** Spread your happiness by sharing it with friends or loved ones. A simple message or call can do wonders. \n   \n3. **Creative Expression:** Channel your happiness into something creative like writing, drawing, or playing music. \n\n4. **Stay Active:** A short walk or any physical activity can enhance your mood even more.\n\n5. **Mindful Breathing:** Take deep, mindful breaths to savor the moment. It can help anchor and enhance feelings of happiness. \n\n6. **Positive Vibe Playlist:** Listen to your favorite uplifting songs. Music can be a powerful mood booster.\n\nRemember, happiness is contagious, and you're likely spreading positive vibes just by being happy. Keep smiling and make the m

In [15]:
from autogen_core import RoutedAgent, message_handler, MessageContext
from autogen_core.models import UserMessage, SystemMessage
from autogen_ext.models.openai import OpenAIChatCompletionClient

class SimpleAgent(RoutedAgent):

    def __init__(self, name: str, model_client: OpenAIChatCompletionClient):
        super().__init__(name)
        self._model_client = model_client
        self._system_message = [SystemMessage(content="You are a helpful assistant.")]

    @message_handler
    async def on_user_message(self, message: UserMessage, ctx: MessageContext) -> None:
        print(f"Received message from {message.source}: {message.content}")
        # Create a response using the model client.
        response = await self._model_client.create(messages=self._system_message + [message], cancellation_token=ctx.cancellation_token)
        if isinstance(response.content, str):
            print(f"Response from model: {response.content}")
        else:
            print("Unexpected response format:", response.content)

runtime = SingleThreadedAgentRuntime()
model_client = OpenAIChatCompletionClient(model="gpt-4o")
await SimpleAgent.register(runtime, "simple_agent", lambda: SimpleAgent("Simple Agent", model_client))
runtime.start()
await runtime.send_message(UserMessage(content="Hello, how are you?", source="User"), AgentId("simple_agent", "default"))
await runtime.stop_when_idle()

Received message from User: Hello, how are you?
Response from model: Hello! I'm just a program, so I don't have feelings, but I'm here and ready to help you. How can I assist you today?


In [22]:
from IPython.display import display, Markdown
from autogen_core import RoutedAgent, message_handler, MessageContext
from autogen_core.model_context import BufferedChatCompletionContext
from autogen_core.models import UserMessage, SystemMessage, AssistantMessage
from autogen_ext.models.openai import OpenAIChatCompletionClient

@dataclass
class Message:
    content: str

class SimpleAgent(RoutedAgent):

    def __init__(self, name: str, model_client: OpenAIChatCompletionClient):
        super().__init__(name)
        self._model_client = model_client
        self._system_message = [SystemMessage(content="You are a helpful assistant.")]
        self._model_context = BufferedChatCompletionContext(buffer_size=10)

    @message_handler
    async def on_user_message(self, message: Message, ctx: MessageContext) -> None:
        user_message = UserMessage(content=message.content, source="user")
        print(f"Received message from {user_message.source}: {user_message.content}")
        await self._model_context.add_message(user_message)
        # Create a response using the model client.
        response = await self._model_client.create(messages=self._system_message + ( await self._model_context.get_messages()), cancellation_token=ctx.cancellation_token)
        if isinstance(response.content, str):
            # print(f"Response from model: {response.content}")   
            display(Markdown(f"**Response from model:** {response.content}"))
            await self._model_context.add_message(AssistantMessage(content=response.content, source=self.metadata["type"]))
        else:
            print("Unexpected response format:", response.content)

runtime = SingleThreadedAgentRuntime()
model_client = OpenAIChatCompletionClient(model="gpt-4o")
await SimpleAgent.register(runtime, "simple_agent", lambda: SimpleAgent("Simple Agent", model_client))
runtime.start()
agent_id = AgentId("simple_agent", "default")
msg1 = Message(content="Hello, describe some points about biriyani")
msg2 = Message(content="What is the 1st point you told before?")
# Send messages to the agent.
await runtime.send_message(message=msg1, recipient=agent_id)
await runtime.send_message(message=msg2, recipient= agent_id)
await runtime.stop_when_idle()

Received message from user: Hello, describe some points about biriyani


**Response from model:** Biryani is a popular and flavorful rice dish that has roots in the Indian subcontinent, and it is enjoyed by people all over the world. Here are some key points about biryani:

1. **Origin and History**: Biryani is believed to have originated in the Indian subcontinent, with influences from Persian cuisine. It is said to have been brought to India by Persian travelers and merchants, and it was further developed in royal kitchens.

2. **Main Ingredients**: The dish typically consists of basmati rice, meat (such as chicken, beef, goat, or lamb), and a variety of spices. In some variations, seafood or eggs are also used. Vegetarian versions often include vegetables or paneer.

3. **Spices and Flavoring**: Biryani is known for its rich and aromatic flavors, which come from a blend of spices such as cardamom, cloves, cinnamon, bay leaves, cumin, and saffron. Other ingredients may include ginger, garlic, mint, and yogurt.

4. **Cooking Method**: There are various methods of cooking biryani, but a common technique is the "Dum" method, where the rice and meat are layered and cooked together on low heat, allowing the flavors to meld.

5. **Types of Biryani**: There are numerous regional variations of biryani, each with its own unique twist. Some famous types include Hyderabadi Biryani, Lucknowi (Awadhi) Biryani, Kolkata Biryani, Malabar Biryani, and Sindhi Biryani, among others.

6. **Accompaniments**: Biryani is often served with side dishes such as raita (a yogurt-based side dish), salad, boiled eggs, or pickles.

7. **Cultural Significance**: Biryani holds significant cultural importance in South Asia. It is a staple dish at celebrations, weddings, and festivals, symbolizing hospitality and festivity.

8. **Global Popularity**: While biryani is traditionally from the Indian subcontinent, its popularity has spread globally. It is a favorite in many countries, often available in restaurants that specialize in Indian, Pakistani, Bangladeshi, or Middle Eastern cuisines.

9. **Variations and Innovations**: Modern chefs and home cooks have introduced various innovations and fusions to traditional biryani, incorporating new ingredients and cooking techniques to suit different palates and dietary preferences, including vegan and low-carb versions.

Overall, biryani is a beloved dish that offers a delightful combination of flavors, aromas, and textures, making it a timeless culinary treasure.

Received message from user: What is the 1st point you told before?


**Response from model:** The first point I mentioned about biryani was its origin and history. Biryani is believed to have originated in the Indian subcontinent, with influences from Persian cuisine. It is said to have been brought to India by Persian travelers and merchants, and it was further developed in royal kitchens.