In [1]:
print("Start")
import asyncio

Start


In [2]:
import logging
from logging.handlers import QueueHandler, QueueListener
import multiprocessing

_LOG_FILE = "./tmp/backeground_process_task.log"

def configure_worker_logging(log_queue: multiprocessing.Queue):
    """Logging configuration inside worker process."""
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    handler = QueueHandler(log_queue)
    root.handlers = [handler]  # Replace handlers with queue handler

In [3]:
cancel_event = asyncio.Event()

In [4]:
from datetime import datetime


async def background_worker(log_queue: multiprocessing.Queue, state_queue: asyncio.Queue):
    """
    Фоновая задача: получает состояния и обрабатывает их.
    Завершается только после получения STOP.
    """
    
    try:
        configure_worker_logging(log_queue)
        logger = logging.getLogger(f"worker")

        logger.info("=== Background worker started ===")
    
        while True:
            state = await state_queue.get()
            
            await asyncio.sleep(2)
            
            logger.info(f"{datetime.now()} BG worker: process state {state}")
            if cancel_event.is_set():
                logger.info("BG worker: STOP received")
                if not state_queue.empty():
                    logger.info("Complete last states")
                else:
                    break

    except Exception as e:
        # логируем в реальной системе
        logger.info("BG worker exception:", e)
    finally:
        logger.info("BG worker exiting")


In [5]:
import asyncio
from typing import AsyncIterable

class AsyncCounter(AsyncIterable[int]):
    """некая задача генерирующая значения

    Args:
        AsyncIterable (_type_): _description_
    """
    def __init__(self, name, n=3,sleep=2):
        self.n = n
        self.name = name
        self.sleep = sleep

    def __aiter__(self):
        self.i = 0
        return self

    async def __anext__(self):
        if self.i >= self.n:
            raise StopAsyncIteration
        await asyncio.sleep(self.sleep)
        print(f"{self.name}: {self.i}")
        self.i += 1
        return self.i

In [6]:
from datetime import datetime
import time
import multiprocessing

cancel_event = asyncio.Event()
async def listen_all(counters):
    state = {c.name: None for c in counters}
    queue = asyncio.Queue()
    done={c.name: False for c in counters}
    
    log_queue = multiprocessing.Queue()
    file_handler = logging.FileHandler(_LOG_FILE, encoding="utf-8")
    file_handler.setLevel(logging.INFO)
    formatter = logging.Formatter(
        "%(asctime)s | %(processName)s | %(name)s | %(levelname)s | %(message)s"
    )
    file_handler.setFormatter(formatter)
    listener = QueueListener(log_queue, file_handler)
    listener.start()
    
    configure_worker_logging(log_queue)
    logger = logging.getLogger("main")
    
    async def consume(counter: AsyncCounter):
        canceled = False
        async for value in counter:
            if cancel_event.is_set():
                canceled = True
                break
            await queue.put((counter.name, value))
        if not canceled:
            print(f"{counter.name} DONE")
            done[counter.name] = True
        else:
            print(f"{counter.name} CANCELED")
    
    # запускаем задачу в одтельном процессе
    mp_queue: multiprocessing.Queue = multiprocessing.Queue()
    worker = multiprocessing.Process(target=background_worker, args=(log_queue,mp_queue,), daemon=True)
    worker.start()

    # стартуем обработчики
    consumer_tasks = [asyncio.create_task(consume(c)) for c in counters]

    # агрегатор
    queue_task = asyncio.create_task(queue.get())
    cancel_task = asyncio.create_task(cancel_event.wait())
    
    first_loop = True
    while True:
        logger.info("Start loop")
        awated_tasks, pending = await asyncio.wait(
            [queue_task, cancel_task],
            return_when=asyncio.FIRST_COMPLETED
        )
        #awated_tasks - список задач которые вернули ответы
        #pending_tasks - список задач которые не успели вернуть ответы
        for awaited_task in awated_tasks:
            if awaited_task is queue_task:
                name, value = awaited_task.result()
                state[name] = value
                logger.info(f"{datetime.now()}: Q name {name}, value: {value}. State {state}")
                
                # отправляем копию state в фоновый процесс НЕ блокируя loop
                # используем asyncio.to_thread, чтобы mp_queue.put не блокировал loop
                state_copy = dict(state)  # сериализуемая копия
                await asyncio.to_thread(mp_queue.put, state_copy)
                
                # создаём новый task для следующего элемента
                queue_task = asyncio.create_task(queue.get())
        
        
        # all(done.values()) - если все задания завершились
        # (cancel_event.is_set() and queue.empty()) - если есть сигнал отмены и нет необработанных записей в очереди. Для корректной работы задания должны тоже завершаться по cancel_event
        if  all(done.values()) or \
            (cancel_event.is_set() and queue.empty()):
            logger.info("waiting for finish backgroud worker")
            await asyncio.to_thread(worker.join)
            logger.info("Wait 3 second before break. Проверяем, что queue не пропадут. Если перед выходом подождать")
            logger.info("Break loop")
            break
        
        if first_loop:
            logger.info("Wait 3 second on first loop. Проверяем, что queue не пропадут")
            time.sleep(3)
            first_loop=False
        logger.info("End loop")
        
    # ожидание всех задач
    await asyncio.gather(*consumer_tasks)
    
    if worker.is_alive():
        try:
            await asyncio.to_thread(worker.join, timeout=5)
        except Exception:
            worker.terminate()
            worker.join()
    listener.stop()
    return state

In [7]:
async def main():
    counters = [
        AsyncCounter("N9_S1", 9,1),
        AsyncCounter("N4_S2", 4,2.5),
        AsyncCounter("N3_S3", 3,3.3),
    ]
    result = await listen_all(counters)
    print("Final:", result)

In [None]:
cancel_event.clear()
task = asyncio.create_task(main())
await asyncio.sleep(4)
print("raise cancel_event")
cancel_event.set()

  self._target(*self._args, **self._kwargs)


N9_S1: 0
N9_S1: 1
N4_S2: 0
N3_S3: 0
raise cancel_event


N9_S1: 2
N9_S1 CANCELED
N4_S2: 1
N4_S2 CANCELED
N3_S3: 1
N3_S3 CANCELED
Final: {'N9_S1': 2, 'N4_S2': 1, 'N3_S3': 1}
