# Run Msgbus Demo (Notebook)

这个 notebook 将发送一条消息到 `memory://astrbot` 然后接收并打印结果。逐个运行代码单元。

In [None]:
from kombu import Queue # type: ignore
from astrbot_canary_api.msgbus import AstrbotMessageBus

print('imports ok')

imports ok


In [None]:
async def main() -> None:
    try:
        AstrbotMessageBus.resetBus()
        bus = AstrbotMessageBus.getBus('memory://astrbot')

        q = Queue('demo_test', bus.exchange, routing_key='demo_test')

        print('[demo] sending...')
        await bus.async_send({'msg': 'hello demo'}, exchange=bus.exchange, routing_key='demo_test', declare=[q])

        print('[demo] receiving (timeout=2s)...')
        got = await bus.async_receive_once('demo_test', timeout=2.0)
        print('[demo] received:', got)

    except Exception as e:
        print('[demo] error:', type(e).__name__, e)
    finally:
        AstrbotMessageBus.resetBus()

In [None]:

await main()


[demo] sending...
[demo] receiving (timeout=2s)...
[demo] received: {'msg': 'hello demo'}


# 拆分细讲

## 说明（概览）
下面我们把演示拆成几个步骤：初始化消息总线、声明队列/交换机、发送消息、接收消息、以及使用异步迭代逐条消费并最终清理资源。每个代码单元都可以单独运行。

In [None]:
# 1) 初始化：重置并创建一个 memory://astrbot 总线实例，声明一个 Queue 与 exchange
from kombu import Queue  # type: ignore
from astrbot_canary_api.msgbus import AstrbotMessageBus

# 重置全局单例（安全起见）
AstrbotMessageBus.resetBus()
bus = AstrbotMessageBus.getBus('memory://astrbot')
q = Queue('demo_test', bus.exchange, routing_key='demo_test')

print('bus initialized ->', bus, 'queue ->', q.name)

bus initialized -> <astrbot_canary_api.msgbus.AstrbotMessageBus object at 0x0000027979CB8AD0> queue -> demo_test


## 应用场景示例：点对点（Point-to-Point）
场景：任务队列，生产者把任务放到队列，只有一个消费者会处理每条任务（竞争消费者）。下面示例演示发送端和接收端的最简交互（在 notebook 中可以分别运行发送与接收单元）。

In [None]:
# P2P - 发送（运行此单元发送一条任务）
await bus.async_send({'task':'process_image','id':101}, exchange=bus.exchange, routing_key='tasks', declare=[Queue('tasks', bus.exchange, routing_key='tasks')])
print('sent task 101')

In [None]:
# P2P - 接收（运行此单元拉取一条任务）
got = await bus.async_receive_once('tasks', timeout=2.0)
print('got task ->', got)

## 应用场景示例：广播（Broadcast / Fanout）与组播（Multicast / Topic）
示例说明：广播使用 fanout 把消息发给所有绑定队列；组播（topic）允许基于 routing_key 模式选择性分发。

In [None]:
# 广播示例（fanout） - 发送（广播给所有绑定队列）
from kombu import Exchange, Queue 
exch_f = Exchange('broadcasts', type='fanout')
# 发送（忽略 routing_key）
await bus.async_send({'notice':'maintenance'}, exchange=exch_f, routing_key='', declare=[Queue('svcA-notify', exch_f), Queue('svcB-notify', exch_f)])
print('broadcast sent')

In [None]:
# 组播示例（topic） - 发送与订阅示例
exch_t = Exchange('events', type='topic')
# 发送特定主题事件
await bus.async_send({'user_id':42}, exchange=exch_t, routing_key='user.created', declare=[Queue('user-events', exch_t, routing_key='user.*')])
print('topic event sent')

## 交换机（Exchange）和 routing_key 详解
Exchange 是路由器：生产者把消息发到交换机，交换机会根据类型（direct/topic/fanout）和 routing_key/绑定规则把消息分发给匹配的队列。routing_key 是消息携带的键，用来匹配绑定规则。

In [None]:
# 示例：创建 direct exchange 并发送到特定 routing_key
exch_d = Exchange('astrbot', type='direct')
await bus.async_send({'hello':'direct'}, exchange=exch_d, routing_key='demo_test', declare=[Queue('demo_test', exch_d, routing_key='demo_test')])
print('direct example sent')

## declare：为什么要显式声明（declare）
declare 确保在发送或消费前，Exchange/Queue/Binding 已在 broker（或内存传输）中存在：在 memory://astrbot 场景下尤其重要，发送方和接收方都需要 declare 相同的 Queue/Exchange 才能互通。

In [None]:
# declare 示例：发送方在发送前声明队列/交换机（即便接收方尚未启动）
qtemp = Queue('temp', bus.exchange, routing_key='temp')
await bus.async_send({'x':1}, exchange=bus.exchange, routing_key='temp', declare=[qtemp])
print('sent with declare')

## 异步迭代（逐条消费）
如果你需要连续地消费队列中的消息，可以使用 `iterate` 返回的异步生成器。下面示例演示如何用 `async for` 拉取若干条消息，然后退出并清理。

In [2]:
# 4) 异步迭代示例：发两条消息，然后用 iterate 读取两条
async def produce_two():
    await bus.async_send({'n': 1}, exchange=bus.exchange, routing_key='demo_test', declare=[q])
    await bus.async_send({'n': 2}, exchange=bus.exchange, routing_key='demo_test', declare=[q])
    print('produced 2 messages')

await produce_two()

# 迭代并处理两条消息（短超时以防无限等待）
count = 0
async for msg in bus.iterate('demo_test', timeout=2.0):
    print('iter got ->', msg)
    count += 1
    if count >= 2:
        break

print('iteration done, processed', count)

produced 2 messages
iter got -> body={'n': 1} headers={} properties={'delivery_mode': 2, 'delivery_info': {'exchange': 'astrbot', 'routing_key': 'demo_test'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': '6a0c9330-d15b-4ced-8591-db07f72d8d8c'} delivery_info={'exchange': 'astrbot', 'routing_key': 'demo_test'}
iter got -> body={'n': 2} headers={} properties={'delivery_mode': 2, 'delivery_info': {'exchange': 'astrbot', 'routing_key': 'demo_test'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': 'daa08c6b-a8c6-4186-961b-57b4ffa0739d'} delivery_info={'exchange': 'astrbot', 'routing_key': 'demo_test'}
iteration done, processed 2


## 清理资源
示例结束后，调用 `resetBus()` 释放单例并关闭底层连接。

In [3]:
# 5) 清理
AstrbotMessageBus.resetBus()
print('bus reset')

bus reset
