In [1]:
import asyncio
import time
from illufly.async_utils import AsyncUtils

# 创建一个异步服务实例
service = AsyncUtils()

# 定义一个复杂的异步生成器函数
async def complex_data_stream(count=5):
    """模拟一个复杂的数据流，无法简单地转换为同步代码"""
    for i in range(count):
        # 模拟一些异步操作
        await asyncio.sleep(0.5)  # 模拟IO等待
        # 模拟实时数据处理
        data = {'index': i, 'timestamp': time.time()}
        yield data

In [2]:
# 尝试直接使用生成器（这会失败）
try:
    for data in complex_data_stream():  # 这行会报错
        print(data)
except TypeError as e:
    print(f"直接使用失败: {e}")

直接使用失败: 'async_generator' object is not iterable


In [3]:
# 使用 AsyncUtils 包装后的正确使用方式
for data in service.wrap_async_generator(complex_data_stream()):
    print(f"收到数据: {data}")


收到数据: {'index': 0, 'timestamp': 1736852613.5019898}
收到数据: {'index': 1, 'timestamp': 1736852614.0034041}
收到数据: {'index': 2, 'timestamp': 1736852614.5053902}
收到数据: {'index': 3, 'timestamp': 1736852615.007243}
收到数据: {'index': 4, 'timestamp': 1736852615.508978}


In [4]:
# 或者使用异步上下文管理器
async def process_stream():
    async with service.managed_async():
        async for data in complex_data_stream():
            print(f"异步处理数据: {data}")

In [5]:
# 使用 to_sync 装饰器运行异步函数
@service.to_sync
async def run_process():
    await process_stream()
    
run_process()

异步处理数据: {'index': 0, 'timestamp': 1736852627.555938}
异步处理数据: {'index': 1, 'timestamp': 1736852628.057646}
异步处理数据: {'index': 2, 'timestamp': 1736852628.5585659}
异步处理数据: {'index': 3, 'timestamp': 1736852629.0596118}
异步处理数据: {'index': 4, 'timestamp': 1736852629.5607092}


In [1]:
from illufly.mq.message_bus import MessageBus

bus = MessageBus()
bus.subscribe("test")

bus.publish("test", "hello")
bus.publish("test")

for b in bus.collect():
    print(b)

{'content': 'hello', 'topic': 'test'}
{'block_type': 'end', 'topic': 'test'}


In [2]:
from illufly.mq.message_bus import MessageBus

# 在 Jupyter 中使用
bus = MessageBus()
# bus.subscribe("test")

# 发布一些消息
bus.publish("test", "hello")
bus.publish("test", "world")
bus.publish("test", end=True)

# 收集消息
for msg in bus.collect():
    print(msg)

# 此时事件循环应该已经被正确清理

{'content': 'hello', 'topic': 'test'}
{'content': 'world', 'topic': 'test'}
{'block_type': 'end', 'topic': 'test'}


In [9]:
a = MessageBus()
a.__hash__()

289234381

In [10]:
a

<illufly.mq.message_bus.MessageBus at 0x113d5dcd0>

In [11]:
a.__hash__()

289234381