# Обработка параллельных итераторов

## Init

In [1]:
print("fix asyncio")

fix asyncio


In [2]:
import asyncio

class AsyncCounter:
    def __init__(self, name, n=3,sleep=2):
        self.n = n
        self.name = name
        self.sleep = sleep

    def __aiter__(self):
        self.i = 0
        return self

    async def __anext__(self):
        if self.i >= self.n:
            raise StopAsyncIteration
        await asyncio.sleep(self.sleep)
        self.i += 1
        return self.i

In [32]:
from datetime import datetime
import time

cancel_event = asyncio.Event()
async def listen_all(counters):
    state = {c.name: None for c in counters}
    queue = asyncio.Queue()
    done={c.name: False for c in counters}
    
    async def consume(counter: AsyncCounter):
        canceled = False
        async for value in counter:
            if cancel_event.is_set():
                canceled = True
                break
            state[counter.name] = value
            await queue.put((counter.name, value))
        if not canceled:
            print(f"{counter.name} DONE")
            done[counter.name] = True
        else:
            print(f"{counter.name} CANCELED")
        
    # стартуем обработчики
    tasks = [asyncio.create_task(consume(c)) for c in counters]

    # агрегатор
    queue_task = asyncio.create_task(queue.get())
    cancel_task = asyncio.create_task(cancel_event.wait())
    
    first_loop = True
    while True:
        print("Start loop")
        awated_tasks, pending = await asyncio.wait(
            [queue_task, cancel_task],
            return_when=asyncio.FIRST_COMPLETED
        )
        #awated_tasks - список задач которые вернули ответы
        #pending_tasks - список задач которые не успели вернуть ответы
        for awaited_task in awated_tasks:
            if awaited_task is queue_task:
                name, value = awaited_task.result()
                state[name] = value
                print(f"{datetime.now()}: Q name {name}, value: {value}. State {state}")
                # создаём новый task для следующего элемента
                queue_task = asyncio.create_task(queue.get())
        
        
        # all(done.values()) - если все задания завершились
        # (cancel_event.is_set() and queue.empty()) - если есть сигнал отмены и нет необработанных записей в очереди. Для корректной работы задания должны тоже завершаться по cancel_event
        if  all(done.values()) or \
            (cancel_event.is_set() and queue.empty()):
            print("Wait 3 second before break. Проверяем, что queue не пропадут. Если перед выходом подождать")
            print("Break loop")
            break
        
        if first_loop:
            print("Wait 3 second on first loop. Проверяем, что queue не пропадут")
            time.sleep(3)
            first_loop=False
        print("End loop")
        
    # ожидание всех задач
    await asyncio.gather(*tasks)
    return state

In [33]:
async def main():
    counters = [
        AsyncCounter("N9_S1", 9,1),
        AsyncCounter("N4_S2", 4,2.5),
        AsyncCounter("N3_S3", 3,3.3),
    ]
    result = await listen_all(counters)
    print("Final:", result)

## Простой запуск

In [34]:
await main()

Start loop
2025-11-22 13:28:15.675606: Q name N9_S1, value: 1. State {'N9_S1': 1, 'N4_S2': None, 'N3_S3': None}
Wait 3 second on first loop. Проверяем, что queue не пропадут
End loop
Start loop
2025-11-22 13:28:18.676076: Q name N9_S1, value: 2. State {'N9_S1': 2, 'N4_S2': 1, 'N3_S3': 1}
End loop
Start loop
2025-11-22 13:28:18.676181: Q name N4_S2, value: 1. State {'N9_S1': 2, 'N4_S2': 1, 'N3_S3': 1}
End loop
Start loop
2025-11-22 13:28:18.676426: Q name N3_S3, value: 1. State {'N9_S1': 2, 'N4_S2': 1, 'N3_S3': 1}
End loop
Start loop
2025-11-22 13:28:19.677661: Q name N9_S1, value: 3. State {'N9_S1': 3, 'N4_S2': 1, 'N3_S3': 1}
End loop
Start loop
2025-11-22 13:28:20.679030: Q name N9_S1, value: 4. State {'N9_S1': 4, 'N4_S2': 1, 'N3_S3': 1}
End loop
Start loop
2025-11-22 13:28:21.176804: Q name N4_S2, value: 2. State {'N9_S1': 4, 'N4_S2': 2, 'N3_S3': 1}
End loop
Start loop
2025-11-22 13:28:21.680640: Q name N9_S1, value: 5. State {'N9_S1': 5, 'N4_S2': 2, 'N3_S3': 1}
End loop
Start loop
2

## Запуск в фоновой задаче с отменой через время

In [9]:
task = asyncio.create_task(main())

2025-11-22 13:06:17.983627 Updated state: {'N9_S1': 1, 'N4_S2': None, 'N3_S3': None}
2025-11-22 13:06:18.984944 Updated state: {'N9_S1': 2, 'N4_S2': None, 'N3_S3': None}
2025-11-22 13:06:19.483744 Updated state: {'N9_S1': 2, 'N4_S2': 1, 'N3_S3': None}
2025-11-22 13:06:19.986493 Updated state: {'N9_S1': 3, 'N4_S2': 1, 'N3_S3': None}
2025-11-22 13:06:20.284076 Updated state: {'N9_S1': 3, 'N4_S2': 1, 'N3_S3': 1}
2025-11-22 13:06:20.989102 Updated state: {'N9_S1': 4, 'N4_S2': 1, 'N3_S3': 1}
2025-11-22 13:06:21.985417 Updated state: {'N9_S1': 4, 'N4_S2': 2, 'N3_S3': 1}
2025-11-22 13:06:21.989734 Updated state: {'N9_S1': 5, 'N4_S2': 2, 'N3_S3': 1}
2025-11-22 13:06:22.991392 Updated state: {'N9_S1': 6, 'N4_S2': 2, 'N3_S3': 1}
2025-11-22 13:06:23.585333 Updated state: {'N9_S1': 6, 'N4_S2': 2, 'N3_S3': 2}
2025-11-22 13:06:23.992063 Updated state: {'N9_S1': 7, 'N4_S2': 2, 'N3_S3': 2}
2025-11-22 13:06:24.486894 Updated state: {'N9_S1': 7, 'N4_S2': 3, 'N3_S3': 2}
2025-11-22 13:06:24.992663 Updated

In [10]:
await asyncio.sleep(4)
cancel_event.set()

In [36]:
cancel_event.clear()
task = asyncio.create_task(main())
await asyncio.sleep(4)
print("raise cancel_event")
cancel_event.set()

Start loop
2025-11-22 13:29:28.070309: Q name N9_S1, value: 1. State {'N9_S1': 1, 'N4_S2': None, 'N3_S3': None}
Wait 3 second on first loop. Проверяем, что queue не пропадут
End loop
Start loop
raise cancel_event


2025-11-22 13:29:31.072650: Q name N9_S1, value: 2. State {'N9_S1': 2, 'N4_S2': 1, 'N3_S3': 1}
End loop
Start loop
2025-11-22 13:29:31.072729: Q name N4_S2, value: 1. State {'N9_S1': 2, 'N4_S2': 1, 'N3_S3': 1}
End loop
Start loop
2025-11-22 13:29:31.072770: Q name N3_S3, value: 1. State {'N9_S1': 2, 'N4_S2': 1, 'N3_S3': 1}
Wait 3 second before break. Проверяем, что queue не пропадут. Если перед выходом подождать
Break loop
N9_S1 CANCELED
N4_S2 CANCELED
N3_S3 CANCELED
Final: {'N9_S1': 2, 'N4_S2': 1, 'N3_S3': 1}
