## Broadcast messaging
In this mode, the messages are not sent directly to a an agent but are just "published", 
if someone has registered for such message it will received it otherwise it will simply get lost.  

Also important to note:  
- Request/Response mode won't work in this case.
- If an agent return a message from an handler it will be ignored.
- If an agent pushes a message for with it has subscribed it won't receive it.

Let's now create an example with two agents, each subscribing to a specific topic.

In [None]:
from dataclasses import dataclass
from autogen_core import AgentId, MessageContext, RoutedAgent, message_handler, TopicId, DefaultTopicId
from autogen_core import SingleThreadedAgentRuntime, type_subscription, TypeSubscription
from rich import print
import time
import random
import asyncio

As usual we need some messaging types

In [None]:
@dataclass
class Message:
    content: str

Now we create the agent, in this case is important to note that we can create a type subscription in two ways:

1. Using the `@type_subscription decorator`,` as with OrderAgent.
2. Using the runtime `add_subscription` method.

In [None]:

@type_subscription(topic_type="order")
class OrderAgent(RoutedAgent):
    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        print(f"[cyan]OrderAgent received message: '{message.content}'[/cyan]")
        time.sleep(1)
        print(f"[cyan]Order processed, publishing request to continue with shipping.[/cyan]")
        order_id=random.randint(1,10)
        await self.publish_message(Message(content=f"Please ship order {order_id}"), topic_id=TopicId("shipping", self.id.key))


class ShippingAgent(RoutedAgent):
    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        print(f"[yellow]ShippingAgent received message: '{message.content}'[/yellow]") 
        time.sleep(1)
        print(f"[yellow]Order shipped.[/yellow]")       

Now, as usual, we need to register the agents with the runtime, the presence of the type_subscription decorator will automatically
map the topic `order` to OrderAgent and `shipping` to ShippingAgent using `default` as topic source.

In [None]:
runtime = SingleThreadedAgentRuntime()
await OrderAgent.register(runtime, "order_agent", lambda: OrderAgent("I am the order agent")) # Register the OrderAgent and type subscription
await ShippingAgent.register(runtime, "shipping_agent", lambda: ShippingAgent("I am the shipping agent")) # Register only the ShippingAgent

type_subscription=TypeSubscription(topic_type="shipping", agent_type="shipping_agent")
await runtime.add_subscription(type_subscription) # Subscribe the ShippingAgent to the "shipping" topic (other mode)

# start the runtime
runtime.start()

print("[green]Publishing 1st order...[/green]")
await runtime.publish_message(
    Message(content="I want to order an IPhone 16 Pro"),
            topic_id=TopicId(type="order",source="default"))

await runtime.stop_when_idle()
await runtime.close()

As you see from the printed output the initial published topic has been collected by Order agent and the topic he published
has been collected by the shipping agent.

*Can the same agent subscribe to multiple topics?* 

Yes 😊 you can add more than one `type_subscription` decorators or use the `add_subscription` method.

### Getting resuls out from the workflow
Since with broadcasting we can't use request/response, how can we get the result from system like we did with direct messaging?  

The approach to use is to use a dedicated agent whose goal is to collect the final result/s and make them available externally.  

For common scenarios AutoGen offers the `ClosureAgent`

Step 1: Import some AutoGen types, create a message that will contain the results, and a Queue to collect those messages

In [None]:
from autogen_core import ClosureAgent, ClosureContext, DefaultSubscription

@dataclass
class FinalResult:
    value: str

queue = asyncio.Queue[FinalResult]()

We create a new Shipping Agent that will publish a `FinalResult` message when order is shipped

In [None]:
from autogen_core import SingleThreadedAgentRuntime, type_subscription

@type_subscription(topic_type="shipping")
class ShippingAgentWithConfirm(RoutedAgent):
    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        print(f"[yellow]ShippingAgent received message: '{message.content}'[/yellow]") 
        time.sleep(1)
        await self.publish_message(FinalResult(value="Order successfully shipped."), topic_id=DefaultTopicId())
        print(f"[yellow]Order shipped.[/yellow]")   

Create a new runtime and add the required agent and typer registrations.

In [None]:
runtime = SingleThreadedAgentRuntime()

await OrderAgent.register(runtime, "order_agent", lambda: OrderAgent("I am the order agent")) 
await ShippingAgentWithConfirm.register(runtime, "shipping_agent", lambda: ShippingAgentWithConfirm("I am the shipping agent")) 

Now we need to add the registration for the agent that will collect the `FinalResult` message.

In [None]:

async def output_result(_agent: ClosureContext, message: FinalResult, ctx: MessageContext) -> None:
    await queue.put(message)

await ClosureAgent.register_closure(runtime, "output_result", output_result, subscriptions=lambda: [DefaultSubscription()])

As before, we broadcast the initial message and, once the runtime processed all the messages, we read the content of the queue that will contain the result/s we wish to export.

In [None]:
runtime.start()

print("[green]Publishing 1st order...[/green]")
await runtime.publish_message(
    Message(content="I want to order an IPhone 16 Pro"),
            topic_id=TopicId(type="order",source="default"))

await runtime.stop_when_idle()
await runtime.close()
while not queue.empty():
        print((result := await queue.get()).value)