In [None]:
import os
from dotenv import load_dotenv
import asyncio
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import AzureOpenAIChatCompletionClient

from dataclasses import dataclass
from typing import List

from autogen_core import AgentId, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler
from autogen_core.models import ChatCompletionClient, SystemMessage, UserMessage


# Load environment variables from a .env file
load_dotenv("/home/azureuser/azure-ai-agent-workshop/.env")

True

In [2]:
model_client=AzureOpenAIChatCompletionClient(
            model=os.getenv("AZURE_OPENAI_MODEL"),
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
            api_key=os.getenv("AZURE_OPENAI_KEY"),
            azure_deployment=os.getenv("AZURE_OPENAI_DEPLOYMENT"),
            api_version=os.getenv("AZUER_OPENAI_API_VERSION")
        )

In [3]:
@dataclass
class WorkerTask:
    task: str
    previous_results: List[str]


@dataclass
class WorkerTaskResult:
    result: str


@dataclass
class UserTask:
    task: str


@dataclass
class FinalResult:
    result: str

In [4]:
class WorkerAgent(RoutedAgent):
    def __init__(
        self,
        model_client: AzureOpenAIChatCompletionClient,
    ) -> None:
        super().__init__(description="Worker Agent")
        self._model_client = model_client

    @message_handler
    async def handle_task(self, message: WorkerTask, ctx: MessageContext) -> WorkerTaskResult:
        if message.previous_results:
            # If previous results are provided, we need to synthesize them to create a single prompt.
            system_prompt = "You have been provided with a set of responses from various open-source models to the latest user query. Your task is to synthesize these responses into a single, high-quality response. It is crucial to critically evaluate the information provided in these responses, recognizing that some of it may be biased or incorrect. Your response should not simply replicate the given answers but should offer a refined, accurate, and comprehensive reply to the instruction. Ensure your response is well-structured, coherent, and adheres to the highest standards of accuracy and reliability.\n\nResponses from models:"
            system_prompt += "\n" + "\n\n".join([f"{i+1}. {r}" for i, r in enumerate(message.previous_results)])
            model_result = await self._model_client.create(
                [SystemMessage(content=system_prompt), UserMessage(content=message.task, source="user")]
            )
        else:
            # If no previous results are provided, we can simply pass the user query to the model.
            model_result = await self._model_client.create([UserMessage(content=message.task, source="user")])
        assert isinstance(model_result.content, str)
        print(f"{'-'*80}\nWorker-{self.id}:\n{model_result.content}")
        return WorkerTaskResult(result=model_result.content)

In [5]:
class OrchestratorAgent(RoutedAgent):
    def __init__(
        self,
        model_client: AzureOpenAIChatCompletionClient,
        worker_agent_types: List[str],
        num_layers: int,
    ) -> None:
        super().__init__(description="Aggregator Agent")
        self._model_client = model_client
        self._worker_agent_types = worker_agent_types
        self._num_layers = num_layers

    @message_handler
    async def handle_task(self, message: UserTask, ctx: MessageContext) -> FinalResult:
        print(f"{'-'*80}\nOrchestrator-{self.id}:\nReceived task: {message.task}")
        # Create task for the first layer.
        worker_task = WorkerTask(task=message.task, previous_results=[])
        # Iterate over layers.
        for i in range(self._num_layers - 1):
            # Assign workers for this layer.
            worker_ids = [
                AgentId(worker_type, f"{self.id.key}/layer_{i}/worker_{j}")
                for j, worker_type in enumerate(self._worker_agent_types)
            ]
            # Dispatch tasks to workers.
            print(f"{'-'*80}\nOrchestrator-{self.id}:\nDispatch to workers at layer {i}")
            results = await asyncio.gather(*[self.send_message(worker_task, worker_id) for worker_id in worker_ids])
            print(f"{'-'*80}\nOrchestrator-{self.id}:\nReceived results from workers at layer {i}")
            # Prepare task for the next layer.
            worker_task = WorkerTask(task=message.task, previous_results=[r.result for r in results])
        # Perform final aggregation.
        print(f"{'-'*80}\nOrchestrator-{self.id}:\nPerforming final aggregation")
        system_prompt = "You have been provided with a set of responses from various open-source models to the latest user query. Your task is to synthesize these responses into a single, high-quality response. It is crucial to critically evaluate the information provided in these responses, recognizing that some of it may be biased or incorrect. Your response should not simply replicate the given answers but should offer a refined, accurate, and comprehensive reply to the instruction. Ensure your response is well-structured, coherent, and adheres to the highest standards of accuracy and reliability.\n\nResponses from models:"
        system_prompt += "\n" + "\n\n".join([f"{i+1}. {r}" for i, r in enumerate(worker_task.previous_results)])
        model_result = await self._model_client.create(
            [SystemMessage(content=system_prompt), UserMessage(content=message.task, source="user")]
        )
        assert isinstance(model_result.content, str)
        return FinalResult(result=model_result.content)

In [6]:
task = (
    "I have 432 cookies, and divide them 3:4:2 between Alice, Bob, and Charlie. How many cookies does each person get?"
)

In [8]:
runtime = SingleThreadedAgentRuntime()
await WorkerAgent.register(
    runtime, "worker", lambda: WorkerAgent(model_client=model_client)
)
await OrchestratorAgent.register(
    runtime,
    "orchestrator",
    lambda: OrchestratorAgent(
        model_client=model_client, worker_agent_types=["worker"] * 3, num_layers=3
    ),
)

runtime.start()
result = await runtime.send_message(UserTask(task=task), AgentId("orchestrator", "default"))
await runtime.stop_when_idle()
print(f"{'-'*80}\nFinal result:\n{result.result}")

--------------------------------------------------------------------------------
Orchestrator-orchestrator/default:
Received task: I have 432 cookies, and divide them 3:4:2 between Alice, Bob, and Charlie. How many cookies does each person get?
--------------------------------------------------------------------------------
Orchestrator-orchestrator/default:
Dispatch to workers at layer 0


--------------------------------------------------------------------------------
Worker-worker/default/layer_0/worker_0:
To divide the cookies in a ratio of 3:4:2 among Alice, Bob, and Charlie, we first need to determine the total parts in the ratio.

The total parts is calculated as follows:
\[ 
3 + 4 + 2 = 9 
\]

Next, we find the value of each part by dividing the total number of cookies by the total parts:
\[ 
\text{Value of each part} = \frac{432}{9} = 48 
\]

Now we can calculate the number of cookies each person gets based on the ratio:

- Alice receives:
\[ 
3 \times 48 = 144 
\]

- Bob receives:
\[ 
4 \times 48 = 192 
\]

- Charlie receives:
\[ 
2 \times 48 = 96 
\]

Therefore, the distribution of cookies is:
- Alice: 144 cookies
- Bob: 192 cookies
- Charlie: 96 cookies
--------------------------------------------------------------------------------
Worker-worker/default/layer_0/worker_1:
To divide the cookies in the ratio of 3:4:2, we first need to determine the total number 