In [1]:
from pydantic import BaseModel


class Task(BaseModel):
    task_id: str


class TaskResponse(BaseModel):
    task_id: str
    result: str

## Single message & multiple processors
Demonstrates how a single message can be processed by multiple agents subscribed to the same topic simultaneously.
- Each `Processor` agent subscribes to the default topic using the `default_subscription()` decorator.
- When publishing a message to the default topic, all registered agents will process the message independently.

Below, we are subscribing `Processor` using the `default_subscription()` decorator, there’s an alternative way to subscribe an agent without using decorators altogether as shown in [Subscribe and Publish to Topics](https://microsoft.github.io/autogen/stable/user-guide/core-user-guide/framework/message-and-communication.html#subscribe-and-publish-to-topics), this way the same agent classcan be subscribed to different topics.



In [2]:
from autogen_core import (
    DefaultTopicId,
    MessageContext,
    RoutedAgent,
    default_subscription,
    message_handler,
)

import asyncio


@default_subscription
class Processor(RoutedAgent):
    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> None:
        print(f"{self._description} starting task {message.task_id}")
        await asyncio.sleep(2)  # Simulate work
        print(f"{self._description} finished task {message.task_id}")

In [3]:
from autogen_core import SingleThreadedAgentRuntime

runtime: SingleThreadedAgentRuntime = SingleThreadedAgentRuntime()

await Processor.register(runtime, "agent_1", lambda: Processor(description="Agent 1"))
await Processor.register(runtime, "agent_2", lambda: Processor(description="Agent 2"))

runtime.start()

await runtime.publish_message(Task(task_id="task-1"), topic_id=DefaultTopicId())

await runtime.stop_when_idle()

Agent 1 starting task task-1
Agent 2 starting task task-1
Agent 1 finished task task-1
Agent 2 finished task task-1


## Multiple messages & Multiple Processors
Second, this pattern demonstrates routing different types of messages to specific processors:
- `UrgentProcessor` subscribes to the “urgent” topic
- `NormalProcessor` subscribes to the “normal” topic

We make an agent subscribe to a specific topic type using the `type_subscription()` decorator.


In [4]:
from enum import StrEnum, auto


class TopicType(StrEnum):
    URGENT = auto()
    NORMAL = auto()
    TASK_RESULTS = auto()


class AgentType(StrEnum):
    URGENT_PROCESSOR = auto()
    NORMAL_PROCESSOR = auto()

In [5]:
from autogen_core import type_subscription, TopicId, RoutedAgent, MessageContext
import asyncio


task_results_topic_id: TopicId = TopicId(type=TopicType.TASK_RESULTS, source="default")


@type_subscription(topic_type=TopicType.URGENT)
class UrgentProcessor(RoutedAgent):
    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> None:
        print(f"Urgent processor starting task '{message.task_id}'")
        await asyncio.sleep(1)  # Simulate work
        print(f"Urgent processor finished task '{message.task_id}'")

        task_response = TaskResponse(
            task_id=message.task_id, result="Results by Urgent Processor"
        )
        await self.publish_message(task_response, topic_id=task_results_topic_id)


@type_subscription(topic_type=TopicType.NORMAL)
class NormalProcessor(RoutedAgent):
    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> None:
        print(f"Normal processor starting task '{message.task_id}'")
        await asyncio.sleep(3)  # Simulate work
        print(f"Normal processor finished task '{message.task_id}'")

        task_response = TaskResponse(
            task_id=message.task_id, result="Results by Normal Processor"
        )
        await self.publish_message(task_response, topic_id=task_results_topic_id)

In [6]:
from autogen_core import SingleThreadedAgentRuntime

runtime: SingleThreadedAgentRuntime = SingleThreadedAgentRuntime()

await UrgentProcessor.register(
    runtime,
    AgentType.URGENT_PROCESSOR,
    lambda: UrgentProcessor(description="Urgent Processor"),
)
await NormalProcessor.register(
    runtime,
    AgentType.NORMAL_PROCESSOR,
    lambda: NormalProcessor(description="Normal Processor"),
)

runtime.start()

await runtime.publish_message(
    Task(task_id="normal-1"), topic_id=TopicId(type="normal", source="default")
)
await runtime.publish_message(
    Task(task_id="urgent-1"), topic_id=TopicId(type="urgent", source="default")
)

await runtime.stop_when_idle()

Normal processor starting task 'normal-1'
Urgent processor starting task 'urgent-1'


Urgent processor finished task 'urgent-1'
Normal processor finished task 'normal-1'


## Collecting results
We typically want to collect results programmatically. 

 We’ve defined a dedicated topic `TASK_RESULT` where both `UrgentProcessor` and `NormalProcessor` publish their results. To collect these result messages, we’ll use a `ClosureAgent`: the `ClosureAgent` will process messages from `TASK_RESULT` this topic.


In [21]:
from enum import StrEnum, auto


class TopicType(StrEnum):
    URGENT = auto()
    NORMAL = auto()
    TASK_RESULTS = auto()


class AgentType(StrEnum):
    URGENT_PROCESSOR = auto()
    NORMAL_PROCESSOR = auto()
    CLOSURE_AGENT = auto()

In [22]:
from autogen_core import type_subscription, TopicId, RoutedAgent, MessageContext
import asyncio


task_results_topic_id: TopicId = TopicId(type=TopicType.TASK_RESULTS, source="default")


@type_subscription(topic_type=TopicType.URGENT)
class UrgentProcessor(RoutedAgent):
    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> None:
        print(f"Urgent processor starting task '{message.task_id}'")
        await asyncio.sleep(1)  # Simulate work
        print(f"Urgent processor finished task '{message.task_id}'")

        task_response = TaskResponse(
            task_id=message.task_id, result="Results by Urgent Processor"
        )
        await self.publish_message(task_response, topic_id=task_results_topic_id)


@type_subscription(topic_type=TopicType.NORMAL)
class NormalProcessor(RoutedAgent):
    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> None:
        print(f"Normal processor starting task '{message.task_id}'")
        await asyncio.sleep(3)  # Simulate work
        print(f"Normal processor finished task '{message.task_id}'")

        task_response = TaskResponse(
            task_id=message.task_id, result="Results by Normal Processor"
        )
        await self.publish_message(task_response, topic_id=task_results_topic_id)

In [23]:
from autogen_core import ClosureContext, MessageContext

# python queue where collect_result will put the results
queue = asyncio.Queue[TaskResponse]()


# func to consume messages on TASK_RESULT topic; it will called by ClosureAgent via lambda
async def collect_result(
    _agent: ClosureContext, message: TaskResponse, ctx: MessageContext
) -> None:
    await queue.put(message)

In [24]:
from autogen_core import ClosureAgent, TypeSubscription

runtime: SingleThreadedAgentRuntime = SingleThreadedAgentRuntime()

# register processor agents
await UrgentProcessor.register(
    runtime,
    AgentType.URGENT_PROCESSOR,
    lambda: UrgentProcessor(description="Urgent Processor"),
)
await NormalProcessor.register(
    runtime,
    AgentType.NORMAL_PROCESSOR,
    lambda: NormalProcessor(description="Normal Processor"),
)

# register the closure agent; attach the collect_result function via lambda
await ClosureAgent.register_closure(
    runtime,
    AgentType.CLOSURE_AGENT,
    collect_result,
    subscriptions=lambda: [
        TypeSubscription(
            topic_type=TopicType.TASK_RESULTS, agent_type=AgentType.CLOSURE_AGENT
        )
    ],
)

runtime.start()

# publish messages and wait for processing to finish
await runtime.publish_message(
    Task(task_id="normal-1"), topic_id=TopicId(type=TopicType.NORMAL, source="default")
)
await runtime.publish_message(
    Task(task_id="urgent-1"), topic_id=TopicId(type=TopicType.URGENT, source="default")
)

await runtime.stop_when_idle()

Normal processor starting task 'normal-1'
Urgent processor starting task 'urgent-1'
Urgent processor finished task 'urgent-1'
Normal processor finished task 'normal-1'


In [25]:
# consume the results from the queue
while not queue.empty():
    print(await queue.get())

task_id='urgent-1' result='Results by Urgent Processor'
task_id='normal-1' result='Results by Normal Processor'
