In [1]:
from dotenv import load_dotenv
from autogen_core import MessageContext, RoutedAgent, SingleThreadedAgentRuntime, AgentId, message_handler
from dataclasses import dataclass

load_dotenv(override=True)

True

In [2]:
@dataclass
class Message:
    content: str

In [3]:
class SimpleAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("Simple")

    @message_handler
    async def handle_message(self, message: Message, ctx: MessageContext) -> Message:
        return Message(content=f"This is {self.id.key} | {self.id.type}. You said {message.content} and i disagree!")

In [4]:

runtime = SingleThreadedAgentRuntime()
await SimpleAgent.register(runtime, "simple_agent", lambda: SimpleAgent())

AgentType(type='simple_agent')

In [5]:
runtime.start()

In [6]:
agent_id = AgentId("simple_agent", "default")
response = await runtime.send_message(Message("Well hi there!"), agent_id)
print(">>>", response.content)

>>> This is default | simple_agent. You said Well hi there! and i disagree!


In [7]:
await runtime.stop()
await runtime.close()

In [8]:
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.messages import TextMessage
class MyLLMAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("llm_agent")
        model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")
        self._delegate = AssistantAgent(name="llm_Agent", model_client=model_client)

    @message_handler
    async def handle_message(self, message: Message, ctx: MessageContext) -> Message:
        print(f"{self.id.type} received message: {message.content}")
        text_message = TextMessage(content=message.content, source="user")
        response = await self._delegate.on_messages([text_message], cancellation_token=ctx.cancellation_token)
        reply = response.chat_message.content
        print(f"{self.id.type} responded: {reply}")
        return Message(content=reply)

In [9]:
from autogen_core import SingleThreadedAgentRuntime

runtime = SingleThreadedAgentRuntime()
await SimpleAgent.register(runtime, "simple_agent", lambda: SimpleAgent())
await MyLLMAgent.register(runtime, "llm_agent", lambda: MyLLMAgent())

AgentType(type='llm_agent')

In [10]:
runtime.start()

llm_agent received message: Hi there!!!!
llm_agent responded: Hello! How can I assist you today?
llm_agent received message: This is default | simple_agent. You said Hello! How can I assist you today? and i disagree!
llm_agent responded: I apologize if my response didn't meet your expectations. How can I improve or assist you better?


In [11]:
response = await runtime.send_message(Message("Hi there!!!!"), AgentId("llm_agent", "default"))
print(">>>", response.content)
response = await runtime.send_message(Message(response.content), AgentId("simple_agent", "default"))
print(">>>", response.content)
response = await runtime.send_message(Message(response.content), AgentId("llm_agent", "default"))
print(">>>", response.content)


>>> Hello! How can I assist you today?
>>> This is default | simple_agent. You said Hello! How can I assist you today? and i disagree!
>>> I apologize if my response didn't meet your expectations. How can I improve or assist you better?


In [12]:
await runtime.stop()
await runtime.close()

In [14]:
from autogen_ext.models.ollama import OllamaChatCompletionClient

class Player1Agent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("Player_1")
        model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")
        self._delegate = AssistantAgent(name="Player_1", model_client=model_client)
    
    @message_handler
    async def handle_message(self, message: Message, ctx: MessageContext) -> Message:
        text_message = TextMessage(content=message.content, source="user")
        response = await self._delegate.on_messages([text_message], cancellation_token=ctx.cancellation_token)
        return Message(content=response.chat_message.content)

class Player2Agent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("Player_2")
        model_client = OllamaChatCompletionClient(model="llama3.2:latest")
        self._delegate = AssistantAgent(name="Player_2", model_client=model_client)
    
    @message_handler
    async def handle_message(self, message: Message, ctx: MessageContext) -> Message:
        text_message = TextMessage(content=message.content, source="user")
        response = await self._delegate.on_messages([text_message], cancellation_token=ctx.cancellation_token)
        return Message(content=response.chat_message.content)


In [15]:
from autogen_core import model_context


JUDGE = "You are judging a game of rock, paper, scissors. The players have made these choices:\n"

class RockPaperScissorsAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("Judge")
        model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")
        self._delegate = AssistantAgent(name="Judge", model_client=model_client)

    @message_handler
    async def handle_message(self, message: Message, ctx: MessageContext) -> Message:
        instruction = "You are playing rock, paper, scissors. Respond only with the one word, one of the following: rock, paper, or scissors."
        message = Message(content=instruction)
        inner_1 = AgentId("player1", "default")
        inner_2 = AgentId("player2", "default")
        response1 = await self.send_message(message, inner_1)
        response2 = await self.send_message(message, inner_2)
        result = f"Player 1: {response1.content}\nPlayer 2: {response2.content}\n"
        judgement = f"{JUDGE}{result}Who wins?"
        message = TextMessage(content=judgement, source="user")
        response = await self._delegate.on_messages([message], ctx.cancellation_token)
        return Message(content=result + response.chat_message.content)

In [16]:
runtime = SingleThreadedAgentRuntime()
await Player1Agent.register(runtime, "player1", lambda: Player1Agent())
await Player2Agent.register(runtime, "player2", lambda: Player2Agent())

await RockPaperScissorsAgent.register(runtime, "rock_paper_scissors", lambda: RockPaperScissorsAgent())

runtime.start()

In [17]:
agent_id = AgentId("rock_paper_scissors", "default")
message = Message(content="go")
response = await runtime.send_message(message, agent_id)
print(response.content)

Player 1: rock
Player 2: scissors
Player 1 wins. Rock beats scissors. TERMINATE


In [18]:
agent_id = AgentId("rock_paper_scissors", "default")
message = Message(content="go")
response = await runtime.send_message(message, agent_id)
print(response.content)

Player 1: scissors
Player 2: paper
Player 1 wins. Scissors beats paper. TERMINATE


In [20]:
await runtime.stop()
await runtime.close()