From d2ab11fd4aeba6c1de469a81a2033d7e70c5e2e7 Mon Sep 17 00:00:00 2001 From: Alan Fleming <> Date: Sat, 25 Apr 2026 11:12:36 +1000 Subject: [PATCH] Refactor Caller.as_completed. --- src/async_kernel/caller.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/async_kernel/caller.py b/src/async_kernel/caller.py index cd98d3d7..a8fc0901 100644 --- a/src/async_kernel/caller.py +++ b/src/async_kernel/caller.py @@ -1003,7 +1003,7 @@ async def as_completed( - `Caller.MAX_IDLE_POOL_INSTANCES` """ resume = noop - queue: SingleAsyncQueue[Pending[T]] = SingleAsyncQueue() + done: SingleAsyncQueue[Pending[T]] = SingleAsyncQueue() unfinished: set[Pending[T]] = set() pen_current = self.current_pending() if isinstance(items, set | list | tuple): @@ -1015,40 +1015,42 @@ async def scheduler(): nonlocal resume gen = items if isinstance(items, AsyncGenerator) else iter(items) is_async = isinstance(gen, AsyncGenerator) - while not queue.stopped and (pen := await anext(gen, None) if is_async else next(gen, None)) is not None: + while not done.stopped and (pen := await anext(gen, None) if is_async else next(gen, None)) is not None: if pen is pen_current: - queue.stop() + done.stop() msg = "Waiting for the pending in which it is running would result in deadlock!" raise RuntimeError(msg) if not isinstance(pen, Pending): pen = cast("Pending[T]", self.call_soon(await_for, pen)) if not pen.done(): unfinished.add(pen) + pen.add_done_callback(done.append) if max_concurrent_ and len(unfinished) == max_concurrent_: event = create_async_event() resume = event.set if len(unfinished) == max_concurrent_: await event resume = noop - pen.add_done_callback(queue.append) - if not queue.queue and not unfinished: - queue.stop() + else: + done.append(pen) + if not done.queue and not unfinished: + done.stop() pen_ = self.call_soon(scheduler) try: - async for pen in queue: + async for pen in done: unfinished.discard(pen) yield pen - if pen_.done() and not unfinished and not queue.queue: + if pen_.done() and not unfinished and not done.queue: break else: if max_concurrent_ and len(unfinished) < max_concurrent_: resume() pen_.result() finally: - queue.stop() + done.stop() for pen in unfinished: - pen.remove_done_callback(queue.append) + pen.remove_done_callback(done.append) if cancel_unfinished: pen.cancel("Cancelled by as_completed") await pen_.cancel_wait(shield=True) @@ -1084,6 +1086,7 @@ async def wait( if isinstance(item, Pending): done.add(item) if item.done() else pending.add(item) else: + assert inspect.isawaitable(item) pending.add(self.call_soon(await_for, item)) if done: if return_when == "FIRST_COMPLETED":