In [1]:
from dataclasses import dataclass
from autogen_core import AgentId, MessageContext, RoutedAgent, message_handler
from autogen_core import SingleThreadedAgentRuntime
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.messages import TextMessage
from autogen_ext.models.ollama import OllamaChatCompletionClient
from dotenv import load_dotenv

load_dotenv(override=True)


True

In [2]:
## Lets have a single one

@dataclass
class Message:
    content: str

### Will define our agent

##### every agent will have an AgentID which will have 2 componenst
agent.id.type descibe the tyoe of agent
agent.id.key gives its unique indetifier
Any method with the @message_handler decorated will have the opportunity to recive the messages

In [3]:
class SimpleAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("Simple")
        
        @message_handler
        async def on_my_messsage(self, message: Message, ctx: MessageContext) -> Message:
            return Message(content=f"This is (self.id.type)-(self.id.key). You said '{message.content}' and I disagree.")

#### will create a standalone runtime and regsiter our agent type

In [4]:
class SimpleAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("Simple")
    
    @message_handler
    async def on_my_messsage(self, message: Message, ctx: MessageContext) -> Message:
        return Message(content=f"This is {self.id.type}-{self.id.key}. You said '{message.content}' and I disagree.")

### Lets start the runtime and send message

In [5]:
runtime = SingleThreadedAgentRuntime()
await SimpleAgent.register(runtime, "simple_agent", lambda: SimpleAgent())

AgentType(type='simple_agent')

In [6]:
runtime.start()

In [7]:
agent_id = AgentId("simple_agent", "default")
response = await runtime.send_message(Message("Well hi there!"), agent_id)
print(">>>", response.content)

>>> This is simple_agent-default. You said 'Well hi there!' and I disagree.


In [8]:
await runtime.stop()
await runtime.close()

#### Will use agentchat assistant!

In [9]:
class MyLLMAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("LLMAgent")
        model_clinet = OllamaChatCompletionClient(model="mistral")
        self._delegate = AssistantAgent("LLMAgent", model_client=model_clinet)
        
    @message_handler
    async def handle_my_message(self, message: Message, ctx: MessageContext) -> Message:
        print(f"{self.id.type} received message: {message.content}")
        text_message = TextMessage(content=message.content, source="user")
        response = await self._delegate.handle_message([text_message], ctx.cancellation_token)
        reply = response.chat_message.content
        print(f"{self.id.type} respond: {reply}")
        return Message(content=reply)

In [10]:
from autogen_core import SingleThreadedAgentRuntime

runtime = SingleThreadedAgentRuntime()
await SimpleAgent.register(runtime, "simple_agent", lambda: SimpleAgent())
await MyLLMAgent.register(runtime, "LLMAgent", lambda: MyLLMAgent())

AgentType(type='LLMAgent')

In [11]:
runtime.start()
response = await runtime.send_message(Message("Hello, there!"), AgentId("LLMAgent", "default"))
print(">>>", response.content)
response = await runtime.send_message(Message(response.content), AgentId("simple_agent", "default"))
print(">>>", response.content)


LLMAgent received message: Hello, there!


AttributeError: 'AssistantAgent' object has no attribute 'handle_message'

In [None]:
await runtime.stop()
await runtime.close()

## Will make a three agent interaction

In [12]:
from autogen_ext.models.ollama import OllamaChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.messages import TextMessage
from autogen_core import AgentId, MessageContext, RoutedAgent, message_handler
from autogen_ext.models.ollama import OllamaChatCompletionClient
from dataclasses import dataclass


class Player1Agent(RoutedAgent):
    def __init__(self, name: str) -> None:
        super().__init__(name)
        model_client = OllamaChatCompletionClient(model="llama3.2:1b", temperature=1.0)
        self._delegate = AssistantAgent(name, model_client=model_client)
        
    @message_handler
    async def handle_my_message_type(self, message: Message, ctx: MessageContext) -> "Message":
        text_message = TextMessage(content=message.content, source="user")
        response = await self._delegate.on_messages([text_message], cancellation_token=ctx.cancellation_token)
        return Message(content=response.chat_message.content)
    
class Player2Agent(RoutedAgent):
    def __init__(self, name: str) -> None:
        super().__init__(name)
        model_client = OllamaChatCompletionClient(model="llama3.2:1b", temperature=1.0)
        self._delegate = AssistantAgent(name, model_client=model_client)
        
    @message_handler
    async def handle_my_message_type(self, message: Message, ctx: MessageContext) -> "Message":
        text_message = TextMessage(content=message.content, source="user")
        response = await self._delegate.on_messages([text_message], cancellation_token=ctx.cancellation_token)
        return Message(content=response.chat_message.content)

In [13]:
JUDGE = "You are judging a game of rock, paper, scissors. The players have made these choices:\n"

class RockPaperScissorsAgent(RoutedAgent):
    def __init__(self, name: str) -> None:
        super().__init__(name)
        model_client = OllamaChatCompletionClient(model="llama3.2:1b", temperature=1.0)
        self._delegate = AssistantAgent(name, model_client=model_client)
        
    @message_handler
    async def handle_my_message_type(self, message: Message, ctx: MessageContext) -> Message:
        instruction = "You are playing rock, paper, scissors. Respond with either 'rock', 'paper', or 'scissors'."
        message = Message(content=instruction)
        inner_1 = AgentId("Player1", "default")
        inner_2 = AgentId("Player2", "default")
        response1 = await self.send_message(message, inner_1)
        response2 = await self.send_message(message, inner_2)
        result = f"Player 1: {response1.content}\nPlayer 2: {response2.content}.\n"
        judgement = f"{JUDGE}{result} Who wins?"
        
        text_message = TextMessage(content=judgement, source="user")
        response = await self._delegate.on_messages([text_message], cancellation_token=ctx.cancellation_token)
        return Message(content = result + response.chat_message.content)

In [14]:
## Have to register
runtime = SingleThreadedAgentRuntime()
await Player1Agent.register(runtime, "Player1", lambda: Player1Agent("Player1"))
await Player2Agent.register(runtime, "Player2", lambda: Player2Agent("Player2"))
await RockPaperScissorsAgent.register(runtime, "rock_paper_scissors", lambda: RockPaperScissorsAgent("rock_paper_scissors"))
runtime.start()

In [15]:
agent_id = AgentId("rock_paper_scissors", "default")
message = Message(content="go")
response = await runtime.send_message(message, agent_id)
print(response.content)

Player 1: I have my choice.
Player 2: I have responded. My turn: rock..
Since Player 1 has chosen "rock," and Player 2 has chosen "paper", according to the rules of rock, paper, scissors, Paper covers Rock, so Player 2 wins this round.

The score is now:
Player 1: 0
Player 2: 1

Would you like me to continue with the next round?


In [None]:
await runtime.stop()
await runtime.close()