### AutoGen Core - Distributed Agent Runtime

The [distributed runtime](https://microsoft.github.io/autogen/stable/user-guide/core-user-guide/core-concepts/architecture.html#distributed-agent-runtime) is suitable for multi-process applications where agents may be implemented in different programming languages and run on different machines.

A distributed runtime consists of:
- `host servicer` - The host servicer facilitates communication between agents across workers and maintains the states of connections.
- `multiple workers` - The workers run agents and communicate with the host servicer via gateways. They advertise to the host servicer the agents they run and manage the agents’ lifecycles.

In [None]:
# Importing required libraries

import asyncio
from dataclasses import dataclass

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.messages import TextMessage
from autogen_core import AgentId, MessageContext, RoutedAgent, message_handler
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_ext.tools.langchain import LangChainToolAdapter
from dotenv import load_dotenv
from IPython.display import display, Markdown
from langchain.agents import Tool
from langchain_community.utilities import GoogleSerperAPIWrapper

In [None]:
# Loading up environment variables
# override=True lets values from the .env file replace variables that are
# already set in the process environment.
load_dotenv(override=True)

# Deployment toggle read by the worker set-up and shutdown cells further down:
#   True  -> all three agents are registered on a single worker runtime
#   False -> each agent is registered on its own worker runtime
ALL_IN_ONE_WORKER = False

#### Simple Message class

In [None]:
@dataclass
class Message:
    """Plain text payload exchanged between the agents in this notebook."""

    # Text carried by the message: an instruction on the way in, or an
    # agent's answer on the way back.
    content: str

#### Starting up the host service
The code below starts the host service in the background and accepts worker connections on port 50051.

In [None]:
from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntimeHost

# Host servicer that the worker runtimes below connect to on localhost:50051.
host = GrpcWorkerAgentRuntimeHost(address="localhost:50051")
# start() is called without `await`; the service keeps running in the
# background until `host.stop()` is awaited at the end of the notebook.
host.start()

#### Introducing internet search tool

In [None]:
# Build the internet search tool in three steps:
#   1. the Serper API wrapper that performs the search,
#   2. a LangChain Tool exposing its `run` method as "internet_search",
#   3. an adapter that makes the LangChain tool usable by AutoGen agents.
serper = GoogleSerperAPIWrapper()
langchain_serper = Tool(
    name="internet_search",
    func=serper.run,
    description="Useful for running internet searches",
)
autogen_serper = LangChainToolAdapter(langchain_serper)

In [None]:
# Prompt the judge sends to "player1": research the case FOR AutoGen.
favouring_instruction = (
    "To help with a decision on whether to use AutoGen in a new AI Agent project, "
    "please research and briefly respond with reasons in favor of choosing AutoGen; the pros of AutoGen."
)

# Prompt the judge sends to "player2": research the case AGAINST AutoGen.
opposing_instruction = (
    "To help with a decision on whether to use AutoGen in a new AI Agent project, "
    "please research and briefly respond with reasons against choosing AutoGen; the cons of Autogen."
)

# Preamble the judge puts in front of the collected research before asking
# its own model for a decision.
judge_instruction = (
    "You must make a decision on whether to use AutoGen for a project. "
    "Your research team has come up with the following reasons for and against. "
    "Based purely on the research from your team, please respond with your decision and brief rationale."
)

#### Creating Agents

In [None]:
class PlayerOneAgent(RoutedAgent):
    """Routed agent that answers a `Message` through a search-enabled assistant.

    Each incoming `Message` is forwarded to an inner `AssistantAgent` that has
    the internet search tool attached; the assistant's reply is returned as a
    new `Message`.
    """

    def __init__(self, name: str) -> None:
        """Create the agent and its delegate assistant.

        Args:
            name: Passed to the `RoutedAgent` initializer and used as the
                name of the inner `AssistantAgent`.
        """
        super().__init__(name)
        self._delegate = AssistantAgent(
            name,
            model_client=OpenAIChatCompletionClient(model="gemini-2.0-flash"),
            tools=[autogen_serper],
            reflect_on_tool_use=True,
        )

    @message_handler
    async def handle_my_message_type(self, message: Message, ctx: MessageContext) -> Message:
        """Relay the message text to the delegate and return its answer.

        Args:
            message: Incoming message whose `content` becomes the user prompt.
            ctx: Handler context; its cancellation token is handed to the delegate.

        Returns:
            A `Message` holding the content of the delegate's chat message.
        """
        prompt = TextMessage(content=message.content, source="user")
        reply = await self._delegate.on_messages([prompt], ctx.cancellation_token)
        return Message(content=reply.chat_message.content)
    
class PlayerTwoAgent(RoutedAgent):
    """Routed agent that answers a `Message` through a search-enabled assistant.

    Each incoming `Message` is forwarded to an inner `AssistantAgent` that has
    the internet search tool attached; the assistant's reply is returned as a
    new `Message`.
    """

    def __init__(self, name: str) -> None:
        """Create the agent and its delegate assistant.

        Args:
            name: Passed to the `RoutedAgent` initializer and used as the
                name of the inner `AssistantAgent`.
        """
        super().__init__(name)
        self._delegate = AssistantAgent(
            name,
            model_client=OpenAIChatCompletionClient(model="gemini-2.0-flash"),
            tools=[autogen_serper],
            reflect_on_tool_use=True,
        )

    @message_handler
    async def handle_my_message_type(self, message: Message, ctx: MessageContext) -> Message:
        """Relay the message text to the delegate and return its answer.

        Args:
            message: Incoming message whose `content` becomes the user prompt.
            ctx: Handler context; its cancellation token is handed to the delegate.

        Returns:
            A `Message` holding the content of the delegate's chat message.
        """
        user_turn = TextMessage(content=message.content, source="user")
        outcome = await self._delegate.on_messages([user_turn], ctx.cancellation_token)
        return Message(content=outcome.chat_message.content)
    
class JudgeAgent(RoutedAgent):
    """Routed agent that collects pro/con research and issues a decision.

    On any incoming `Message` it asks the "player1" agent for reasons in
    favour of AutoGen and the "player2" agent for reasons against, then has
    its own tool-less `AssistantAgent` decide based on both answers.
    """

    def __init__(self, name: str) -> None:
        """Create the judge and its delegate assistant.

        Args:
            name: Passed to the `RoutedAgent` initializer and used as the
                name of the inner `AssistantAgent`.
        """
        super().__init__(name)
        model_client = OpenAIChatCompletionClient(model="gemini-2.0-flash")
        # No tools here: the judge only reasons over the research it receives.
        self._delegate = AssistantAgent(name, model_client=model_client)

    @message_handler
    async def handle_my_message_type(self, message: Message, ctx: MessageContext) -> Message:
        """Run one research round and return the research plus the decision.

        Args:
            message: Trigger only; its content is not read.
            ctx: Handler context; its cancellation token is passed to both
                research requests and to the delegate.

        Returns:
            A `Message` whose content is a Markdown report with the sections
            "Pros of AutoGen", "Cons of AutoGen" and "Decision".
        """
        player_one = AgentId("player1", "default")
        player_two = AgentId("player2", "default")

        # The two research requests are independent, so issue them
        # concurrently. gather() returns results in argument order, so
        # `pros` always comes from player1 and `cons` from player2.
        # The caller's cancellation token is forwarded so that cancelling
        # the judge also cancels the outstanding research requests.
        pros, cons = await asyncio.gather(
            self.send_message(
                Message(content=favouring_instruction),
                player_one,
                cancellation_token=ctx.cancellation_token,
            ),
            self.send_message(
                Message(content=opposing_instruction),
                player_two,
                cancellation_token=ctx.cancellation_token,
            ),
        )

        result = f"## Pros of AutoGen:\n{pros.content}\n\n## Cons of AutoGen:\n{cons.content}\n\n"
        judgement = f"{judge_instruction}\n{result}Respond with your decision and brief explanation"
        # Use a distinct name rather than rebinding the `message` parameter
        # to an object of a different type.
        judge_prompt = TextMessage(content=judgement, source="user")
        response = await self._delegate.on_messages([judge_prompt], ctx.cancellation_token)
        return Message(content=result + "\n\n## Decision:\n\n" + response.chat_message.content)


#### Now we can set up the worker agent runtimes. We use `GrpcWorkerAgentRuntime`.

In [None]:
from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntime

if ALL_IN_ONE_WORKER:

    worker = GrpcWorkerAgentRuntime(host_address="localhost:50051")
    await worker.start()

    await PlayerOneAgent.register(worker, "player1", lambda: PlayerOneAgent("player1"))
    await PlayerTwoAgent.register(worker, "player2", lambda: PlayerTwoAgent("player2"))
    await JudgeAgent.register(worker, "judge", lambda: JudgeAgent("judge"))

    agent_id = AgentId("judge", "default")

else:

    worker1 = GrpcWorkerAgentRuntime(host_address="localhost:50051")
    await worker1.start()
    await PlayerOneAgent.register(worker1, "player1", lambda: PlayerOneAgent("player1"))

    worker2 = GrpcWorkerAgentRuntime(host_address="localhost:50051")
    await worker2.start()
    await PlayerTwoAgent.register(worker2, "player2", lambda: PlayerTwoAgent("player2"))

    worker = GrpcWorkerAgentRuntime(host_address="localhost:50051")
    await worker.start()
    await JudgeAgent.register(worker, "judge", lambda: JudgeAgent("judge"))
    agent_id = AgentId("judge", "default")



In [None]:
# Kick off the round by messaging the judge through the worker runtime.
# JudgeAgent's handler does not read the message content, so "Go!" is only
# a trigger.
response = await worker.send_message(Message(content="Go!"), agent_id)

In [None]:
# The judge's reply is built with Markdown headings, so render it as Markdown.
display(Markdown(response.content))

#### To stop the worker runtimes, we can call `stop()`

In [None]:
# `worker` exists in both layouts, so it is always stopped.
await worker.stop()
# worker1 and worker2 are only created when the agents run on separate workers.
if not ALL_IN_ONE_WORKER:
    await worker1.stop()
    await worker2.stop()

#### We can call `stop()` to stop the host service.

In [None]:
# Shut down the host service that was started near the top of the notebook.
await host.stop()