pythongh-77714: Provide an async iterator version of as_completed (pythonGH-22491)

* as_completed returns object that is both iterator and async iterator
* Existing tests adjusted to test both the old and new style
* New test to ensure iterator can be resumed
* New test to ensure async iterator yields any passed-in Futures as-is

Co-authored-by: Serhiy Storchaka <storchaka@gmail.com>
Co-authored-by: Guido van Rossum <gvanrossum@gmail.com>
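
The "iterator can be resumed" point from the commit message can be illustrated with a small sketch. The `delayed` helper, its delays, and the returned labels are assumptions made up for this example; plain iteration is used so the sketch should also run on Python versions older than 3.13.

```python
import asyncio


async def delayed(delay, value):
    # Hypothetical helper: finish after `delay` seconds and return `value`.
    await asyncio.sleep(delay)
    return value


async def main():
    aws = [delayed(0.15, "third"), delayed(0.01, "first"), delayed(0.08, "second")]
    completed = asyncio.as_completed(aws)

    # Consume a single item, then stop iterating for a while.
    first = await next(completed)

    # Resume the same iterator: it continues with the remaining awaitables.
    rest = [await next_result for next_result in completed]
    return first, rest


first, rest = asyncio.run(main())
print(first, rest)
```

Because the returned object keeps its own count of outstanding awaitables, a second loop over it picks up where the first stopped rather than starting over.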
3 people committed Apr 1, 2024
1 parent ddf814d commit c741ad3
Showing 5 changed files with 387 additions and 120 deletions.
61 changes: 48 additions & 13 deletions Doc/library/asyncio-task.rst
@@ -867,19 +867,50 @@ Waiting Primitives

 .. function:: as_completed(aws, *, timeout=None)

-   Run :ref:`awaitable objects <asyncio-awaitables>` in the *aws*
-   iterable concurrently. Return an iterator of coroutines.
-   Each coroutine returned can be awaited to get the earliest next
-   result from the iterable of the remaining awaitables.
-
-   Raises :exc:`TimeoutError` if the timeout occurs before
-   all Futures are done.
-
-   Example::
-
-       for coro in as_completed(aws):
-           earliest_result = await coro
-           # ...
+   Run :ref:`awaitable objects <asyncio-awaitables>` in the *aws* iterable
+   concurrently. The returned object can be iterated to obtain the results
+   of the awaitables as they finish.
+
+   The object returned by ``as_completed()`` can be iterated as an
+   :term:`asynchronous iterator` or a plain :term:`iterator`. When asynchronous
+   iteration is used, the originally-supplied awaitables are yielded if they
+   are tasks or futures. This makes it easy to correlate previously-scheduled
+   tasks with their results. Example::
+
+       ipv4_connect = create_task(open_connection("127.0.0.1", 80))
+       ipv6_connect = create_task(open_connection("::1", 80))
+       tasks = [ipv4_connect, ipv6_connect]
+
+       async for earliest_connect in as_completed(tasks):
+           # earliest_connect is done. The result can be obtained by
+           # awaiting it or calling earliest_connect.result()
+           reader, writer = await earliest_connect
+
+           if earliest_connect is ipv6_connect:
+               print("IPv6 connection established.")
+           else:
+               print("IPv4 connection established.")
+
+   During asynchronous iteration, implicitly-created tasks will be yielded for
+   supplied awaitables that aren't tasks or futures.
+
+   When used as a plain iterator, each iteration yields a new coroutine that
+   returns the result or raises the exception of the next completed awaitable.
+   This pattern is compatible with Python versions older than 3.13::
+
+       ipv4_connect = create_task(open_connection("127.0.0.1", 80))
+       ipv6_connect = create_task(open_connection("::1", 80))
+       tasks = [ipv4_connect, ipv6_connect]
+
+       for next_connect in as_completed(tasks):
+           # next_connect is not one of the original task objects. It must be
+           # awaited to obtain the result value or raise the exception of the
+           # awaitable that finishes next.
+           reader, writer = await next_connect
+
+   A :exc:`TimeoutError` is raised if the timeout occurs before all awaitables
+   are done. This is raised by the ``async for`` loop during asynchronous
+   iteration or by the coroutines yielded during plain iteration.

    .. versionchanged:: 3.10
       Removed the *loop* parameter.
@@ -891,6 +922,10 @@ Waiting Primitives
    .. versionchanged:: 3.12
       Added support for generators yielding tasks.

+   .. versionchanged:: 3.13
+      The result can now be used as either an :term:`asynchronous iterator`
+      or as a plain :term:`iterator` (previously it was only a plain iterator).
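
The timeout behaviour documented in this file can be sketched as follows. This is a minimal illustration, not part of the commit: the `sleeper` helper, the delays, and the 0.2 second timeout are assumptions chosen for the example. It uses plain iteration, where the :exc:`TimeoutError` is raised by awaiting the yielded coroutine, and catches `asyncio.TimeoutError` so it should also work on versions where that name is not yet an alias of the builtin.

```python
import asyncio


async def sleeper(delay, label):
    # Hypothetical helper: finish after `delay` seconds and return `label`.
    await asyncio.sleep(delay)
    return label


async def collect_with_timeout():
    aws = [sleeper(0.01, "fast"), sleeper(5, "slow")]
    finished = []
    timed_out = False
    for next_result in asyncio.as_completed(aws, timeout=0.2):
        try:
            finished.append(await next_result)
        except asyncio.TimeoutError:
            # Raised by the yielded coroutine once the timeout has elapsed.
            timed_out = True
            break
    return finished, timed_out


finished, timed_out = asyncio.run(collect_with_timeout())
print(finished, timed_out)
```

The awaitable that did not finish in time is not cancelled by `as_completed()` itself; here it is cleaned up when `asyncio.run()` shuts the loop down.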


Running in Threads
==================
7 changes: 7 additions & 0 deletions Doc/whatsnew/3.13.rst
@@ -289,6 +289,13 @@ asyncio
   forcefully close an asyncio server.
   (Contributed by Pierre Ossman in :gh:`113538`.)

+* :func:`asyncio.as_completed` now returns an object that is both an
+  :term:`asynchronous iterator` and a plain :term:`iterator` of awaitables.
+  The awaitables yielded by asynchronous iteration include original task or
+  future objects that were passed in, making it easier to associate results
+  with the tasks being completed.
+  (Contributed by Justin Arthur in :gh:`77714`.)
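
As a rough sketch of what "associate results with the tasks being completed" means in practice, the example below keeps a dictionary from task objects to labels and looks each finished task up in it. The `delayed` helper, the `labels` mapping, and the delays are hypothetical. The `asyncio.wait()` branch is an assumed fallback for interpreters older than 3.13 and is not part of this commit.

```python
import asyncio
import sys


async def delayed(delay, value):
    # Hypothetical helper: finish after `delay` seconds and return `value`.
    await asyncio.sleep(delay)
    return value


async def main():
    # Map each original task object to a label we want to recover later.
    labels = {
        asyncio.create_task(delayed(0.2, "slow result")): "slow",
        asyncio.create_task(delayed(0.01, "fast result")): "fast",
    }
    finished = []
    if sys.version_info >= (3, 13):
        # Asynchronous iteration yields the original task objects.
        async for task in asyncio.as_completed(labels):
            finished.append((labels[task], task.result()))
    else:
        # Assumed fallback: asyncio.wait() also hands back the original tasks.
        pending = set(labels)
        while pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                finished.append((labels[task], task.result()))
    return finished


finished = asyncio.run(main())
print(finished)
```

With plain iteration the yielded coroutines are new objects, so the dictionary lookup would not be possible; that is the gap the asynchronous form closes.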

base64
------

152 changes: 108 additions & 44 deletions Lib/asyncio/tasks.py
@@ -25,6 +25,7 @@
 from . import events
 from . import exceptions
 from . import futures
+from . import queues
 from . import timeouts

 # Helper to generate new task names
@@ -564,62 +565,125 @@ async def _cancel_and_wait(fut):
         fut.remove_done_callback(cb)


-# This is *not* a @coroutine! It is just an iterator (yielding Futures).
+class _AsCompletedIterator:
+    """Iterator of awaitables representing tasks of asyncio.as_completed.
+
+    As an asynchronous iterator, iteration yields futures as they finish. As a
+    plain iterator, new coroutines are yielded that will return or raise the
+    result of the next underlying future to complete.
+    """
+    def __init__(self, aws, timeout):
+        self._done = queues.Queue()
+        self._timeout_handle = None
+
+        loop = events.get_event_loop()
+        todo = {ensure_future(aw, loop=loop) for aw in set(aws)}
+        for f in todo:
+            f.add_done_callback(self._handle_completion)
+        if todo and timeout is not None:
+            self._timeout_handle = (
+                loop.call_later(timeout, self._handle_timeout)
+            )
+        self._todo = todo
+        self._todo_left = len(todo)
+
+    def __aiter__(self):
+        return self
+
+    def __iter__(self):
+        return self
+
+    async def __anext__(self):
+        if not self._todo_left:
+            raise StopAsyncIteration
+        assert self._todo_left > 0
+        self._todo_left -= 1
+        return await self._wait_for_one()
+
+    def __next__(self):
+        if not self._todo_left:
+            raise StopIteration
+        assert self._todo_left > 0
+        self._todo_left -= 1
+        return self._wait_for_one(resolve=True)
+
+    def _handle_timeout(self):
+        for f in self._todo:
+            f.remove_done_callback(self._handle_completion)
+            self._done.put_nowait(None)  # Sentinel for _wait_for_one().
+        self._todo.clear()  # Can't do todo.remove(f) in the loop.
+
+    def _handle_completion(self, f):
+        if not self._todo:
+            return  # _handle_timeout() was here first.
+        self._todo.remove(f)
+        self._done.put_nowait(f)
+        if not self._todo and self._timeout_handle is not None:
+            self._timeout_handle.cancel()
+
+    async def _wait_for_one(self, resolve=False):
+        # Wait for the next future to be done and return it unless resolve is
+        # set, in which case return either the result of the future or raise
+        # an exception.
+        f = await self._done.get()
+        if f is None:
+            # Dummy value from _handle_timeout().
+            raise exceptions.TimeoutError
+        return f.result() if resolve else f
+
+
 def as_completed(fs, *, timeout=None):
-    """Return an iterator whose values are coroutines.
+    """Create an iterator of awaitables or their results in completion order.

-    When waiting for the yielded coroutines you'll get the results (or
-    exceptions!) of the original Futures (or coroutines), in the order
-    in which and as soon as they complete.
+    Run the supplied awaitables concurrently. The returned object can be
+    iterated to obtain the results of the awaitables as they finish.

-    This differs from PEP 3148; the proper way to use this is:
+    The object returned can be iterated as an asynchronous iterator or a plain
+    iterator. When asynchronous iteration is used, the originally-supplied
+    awaitables are yielded if they are tasks or futures. This makes it easy to
+    correlate previously-scheduled tasks with their results:

-        for f in as_completed(fs):
-            result = await f  # The 'await' may raise.
-            # Use result.
+        ipv4_connect = create_task(open_connection("127.0.0.1", 80))
+        ipv6_connect = create_task(open_connection("::1", 80))
+        tasks = [ipv4_connect, ipv6_connect]

-    If a timeout is specified, the 'await' will raise
-    TimeoutError when the timeout occurs before all Futures are done.
+        async for earliest_connect in as_completed(tasks):
+            # earliest_connect is done. The result can be obtained by
+            # awaiting it or calling earliest_connect.result()
+            reader, writer = await earliest_connect

-    Note: The futures 'f' are not necessarily members of fs.
-    """
-    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
-        raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")
+            if earliest_connect is ipv6_connect:
+                print("IPv6 connection established.")
+            else:
+                print("IPv4 connection established.")

-    from .queues import Queue  # Import here to avoid circular import problem.
-    done = Queue()
+    During asynchronous iteration, implicitly-created tasks will be yielded for
+    supplied awaitables that aren't tasks or futures.

-    loop = events.get_event_loop()
-    todo = {ensure_future(f, loop=loop) for f in set(fs)}
-    timeout_handle = None
+    When used as a plain iterator, each iteration yields a new coroutine that
+    returns the result or raises the exception of the next completed awaitable.
+    This pattern is compatible with Python versions older than 3.13:

-    def _on_timeout():
-        for f in todo:
-            f.remove_done_callback(_on_completion)
-            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
-        todo.clear()  # Can't do todo.remove(f) in the loop.
+        ipv4_connect = create_task(open_connection("127.0.0.1", 80))
+        ipv6_connect = create_task(open_connection("::1", 80))
+        tasks = [ipv4_connect, ipv6_connect]

-    def _on_completion(f):
-        if not todo:
-            return  # _on_timeout() was here first.
-        todo.remove(f)
-        done.put_nowait(f)
-        if not todo and timeout_handle is not None:
-            timeout_handle.cancel()
+        for next_connect in as_completed(tasks):
+            # next_connect is not one of the original task objects. It must be
+            # awaited to obtain the result value or raise the exception of the
+            # awaitable that finishes next.
+            reader, writer = await next_connect

-    async def _wait_for_one():
-        f = await done.get()
-        if f is None:
-            # Dummy value from _on_timeout().
-            raise exceptions.TimeoutError
-        return f.result()  # May raise f.exception().
+    A TimeoutError is raised if the timeout occurs before all awaitables are
+    done. This is raised by the async for loop during asynchronous iteration or
+    by the coroutines yielded during plain iteration.
+    """
+    if inspect.isawaitable(fs):
+        raise TypeError(
+            f"expects an iterable of awaitables, not {type(fs).__name__}"
+        )

-    for f in todo:
-        f.add_done_callback(_on_completion)
-    if todo and timeout is not None:
-        timeout_handle = loop.call_later(timeout, _on_timeout)
-    for _ in range(len(todo)):
-        yield _wait_for_one()
+    return _AsCompletedIterator(fs, timeout)


@types.coroutine
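
The argument check at the top of `as_completed()` can be exercised with a short sketch like the one below, which passes a single coroutine instead of an iterable. The `answer` coroutine is a made-up placeholder. The `try` block wraps both the call and the loop because, as far as can be told from the code above, the new version raises as soon as `as_completed()` is called, while the older generator-based version only raises on the first iteration step.

```python
import asyncio


async def answer():
    # Hypothetical coroutine used only to trigger the argument check.
    return 42


async def main():
    coro = answer()
    message = None
    try:
        # Wrong on purpose: a single awaitable, not an iterable of awaitables.
        for next_result in asyncio.as_completed(coro):
            await next_result
    except TypeError as exc:
        message = str(exc)
    finally:
        # The coroutine was never scheduled, so close it to avoid a warning.
        coro.close()
    return message


message = asyncio.run(main())
print(message)
```

The exact wording of the error message differs between versions, so code that depends on it should match on the exception type rather than the text.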