# Async Example

In [1]:
import asyncio

In [2]:
async def producer(queue):
    for i in range(1_000_000):
        await queue.put(f"Item {i}")
        print(f"Produced Item {i}")
        await asyncio.sleep(1)

In [6]:
async def consumer(queue, n_samples=10, consumer_id=0):
    for i in range(n_samples):
        item = await queue.get()
        if item is None:
            break
        print(f"Consumer {consumer_id} Consumed {item}")
        queue.task_done()  # optional

## Single Consumer

In [4]:
async def run_async_single():
    queue = asyncio.Queue()
    producer_task = asyncio.create_task(producer(queue))
    consumer_task = asyncio.create_task(consumer(queue, n_samples=4))
    await consumer_task
    producer_task.cancel()

In [None]:
# This will cause error on Colab/Notebook
# asyncio.run(main())

In [5]:
await run_async_single()

Produced Item 0
Consumed Item 0
Produced Item 1
Consumed Item 1
Produced Item 2
Consumed Item 2
Produced Item 3
Consumed Item 3


## Multiple Consumers

In [14]:
async def run_async_multiple(n_consumers=4):
    q = asyncio.Queue()
    producer_task = asyncio.create_task(producer(q))
    consumers = [asyncio.create_task(consumer(q, 4, cid)) for cid in range(n_consumers)]
    await asyncio.gather(*consumers)
    producer_task.cancel()

In [15]:
await run_async_multiple()

Produced Item 0
Consumer 0 Consumed Item 0
Produced Item 1
Consumer 0 Consumed Item 1
Produced Item 2
Consumer 1 Consumed Item 2
Produced Item 3
Consumer 2 Consumed Item 3
Produced Item 4
Consumer 3 Consumed Item 4
Produced Item 5
Consumer 0 Consumed Item 5
Produced Item 6
Consumer 1 Consumed Item 6
Produced Item 7
Consumer 2 Consumed Item 7
Produced Item 8
Consumer 3 Consumed Item 8
Produced Item 9
Consumer 0 Consumed Item 9
Produced Item 10
Consumer 1 Consumed Item 10
Produced Item 11
Consumer 2 Consumed Item 11
Produced Item 12
Consumer 3 Consumed Item 12
Produced Item 13
Consumer 1 Consumed Item 13
Produced Item 14
Consumer 2 Consumed Item 14
Produced Item 15
Consumer 3 Consumed Item 15


## Multiple Consumers (pub-sub method)

In [10]:
async def producer_pubsub(queues):
    for i in range(1_000_000):
        for q in queues:
            await q.put(f"Item {i}")
        print(f"Produced Item {i}")
        await asyncio.sleep(1)

In [11]:
async def consumer_pubsub(queue, n_samples=10, consumer_id=0):
    for i in range(n_samples):
        item = await queue.get()
        if item is None:
            break
        print(f"Consumer {consumer_id} Consumed {item}")
        queue.task_done()

In [12]:
async def run_async_pubsub(n_consumers=4):
    queues = [asyncio.Queue() for _ in range(n_consumers)]
    producer_task = asyncio.create_task(producer_pubsub(queues))
    consumers = [asyncio.create_task(consumer_pubsub(q, 4, cid)) for cid, q in enumerate(queues)]
    await asyncio.gather(*consumers)

    for q in queues:
        await q.join()

    for c in consumers:
        c.cancel()

    producer_task.cancel()

In [13]:
await run_async_pubsub(n_consumers=2)

Produced Item 0
Consumer 0 Consumed Item 0
Consumer 1 Consumed Item 0
Produced Item 1
Consumer 0 Consumed Item 1
Consumer 1 Consumed Item 1
Produced Item 2
Consumer 0 Consumed Item 2
Consumer 1 Consumed Item 2
Produced Item 3
Consumer 0 Consumed Item 3
Consumer 1 Consumed Item 3
