Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions src/async_kernel/caller.py
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,7 @@ async def as_completed(
- `Caller.MAX_IDLE_POOL_INSTANCES`
"""
resume = noop
queue: SingleAsyncQueue[Pending[T]] = SingleAsyncQueue()
done: SingleAsyncQueue[Pending[T]] = SingleAsyncQueue()
unfinished: set[Pending[T]] = set()
pen_current = self.current_pending()
if isinstance(items, set | list | tuple):
Expand All @@ -1015,40 +1015,42 @@ async def scheduler():
nonlocal resume
gen = items if isinstance(items, AsyncGenerator) else iter(items)
is_async = isinstance(gen, AsyncGenerator)
while not queue.stopped and (pen := await anext(gen, None) if is_async else next(gen, None)) is not None:
while not done.stopped and (pen := await anext(gen, None) if is_async else next(gen, None)) is not None:
if pen is pen_current:
queue.stop()
done.stop()
msg = "Waiting for the pending in which it is running would result in deadlock!"
raise RuntimeError(msg)
if not isinstance(pen, Pending):
pen = cast("Pending[T]", self.call_soon(await_for, pen))
if not pen.done():
unfinished.add(pen)
pen.add_done_callback(done.append)
if max_concurrent_ and len(unfinished) == max_concurrent_:
event = create_async_event()
resume = event.set
if len(unfinished) == max_concurrent_:
await event
resume = noop
pen.add_done_callback(queue.append)
if not queue.queue and not unfinished:
queue.stop()
else:
done.append(pen)
if not done.queue and not unfinished:
done.stop()

pen_ = self.call_soon(scheduler)
try:
async for pen in queue:
async for pen in done:
unfinished.discard(pen)
yield pen
if pen_.done() and not unfinished and not queue.queue:
if pen_.done() and not unfinished and not done.queue:
break
else:
if max_concurrent_ and len(unfinished) < max_concurrent_:
resume()
pen_.result()
finally:
queue.stop()
done.stop()
for pen in unfinished:
pen.remove_done_callback(queue.append)
pen.remove_done_callback(done.append)
if cancel_unfinished:
pen.cancel("Cancelled by as_completed")
await pen_.cancel_wait(shield=True)
Expand Down Expand Up @@ -1084,6 +1086,7 @@ async def wait(
if isinstance(item, Pending):
done.add(item) if item.done() else pending.add(item)
else:
assert inspect.isawaitable(item)
pending.add(self.call_soon(await_for, item))
if done:
if return_when == "FIRST_COMPLETED":
Expand Down
Loading