# Практическое занятие № 11

# Синхронное и асинхронное программирование

В программировании есть два противоположных стиля написания кода - синхронное и асинхронное программирование.

**Синхронные программы** используют блокирующие операции ввода-вывода, в которых каждая операция должна выполняться до выполнения следующей. 

**Асинхронное программирование** основано на неблокирующем протоколе ввода-вывода (I/O), т. е. асинхронная программа не выполняет операции в иерархическом или последовательном порядке. В результате получается распараллеливание которое означает, что асинхронная программа может обрабатывать несколько запросов одновременно и независимо. Если один запрос завершается неудачно, то это не влияет на другой запрос и программа может перейти к другой задаче до завершения последней. 

## Модуль asyncio

**Asyncio** — это модуль в стандартной библиотеке Python, который предоставляет инфраструктуру для написания одновременного кода с использованием асинхронных операций ввода-вывода.

Модуль **asyncio** использует однопоточный однопроцессный подход, в котором части приложения взаимодействуют для явного переключения задач в оптимальное время. Чаще всего это переключение контекста происходит, когда программа не блокирует ожидание чтения или записи данных (например http или socket протоколы). Этот модуль также включает поддержку кода планирования для запуска задачи в определенное время в будущем, чтобы одна сопрограмма могла ждать завершения другой для обработки системных сигналов, а также для распознавания других событий, которые могут быть причиной изменения приложением того, над чем оно работает.

Примечание: Не пытайтесь использовать модуль asyncio для кода, который использует вызовы, блокирующие поток программы, например файловый ввод/вывод. Из-за блокировок код будет выполняться синхронно. Для таких задач есть альтернативные встроенные модули, такие как threading и multiprocessing, у которых практически одинаковый API. Если вам все же необходим запуск блокирующих операций из асинхронного кода, то воспользуйтесь модулем concurrent.futures, что бы запустить такие операции в отдельном потоке или на другом ядре процессора.

Модуль asyncio предоставляет высокоуровневый API.

`Высокоуровневый API` позволяет:

* одновременно запускать сопрограммы Python и полностью контролировать их выполнение;
* выполнять сетевой ввод-вывод и IPC;
* контролировать ход подпроцессов;
* распределять задачи по очередям;

Помимо, высокоуровневого API есть низкоуровневый. Низкоуровневый API в основном предназначен для разработчиков библиотек и фреймворков, которым нужен более тонкий контроль над поведением цикла событий.

`Низкоуровневый API` обеспечивает:

* создание и управление циклами событий, которые предоставляют асинхронный API для работы в сети, запуска подпроцессов, обработки сигналов ОС и т. д;
* реализацию эффективных протоколов с использованием транспортов;
( обеспечения взаимодействия объектов future на основе обратного вызова и высокоуровнего кода, основанного на синтаксисе async/await.

In [1]:
import asyncio

In [2]:
async def fn():
  print('hello')
  await asyncio.sleep(1)
  print('world')

await fn()

hello
world


**Сопрограммы или coroutines**, объявленные с синтаксисом async/await, являются предпочтительным способом написания приложений, с использованием модуля asyncio.

Механизмы запуска сопрограммы:

1. Основной цикл событий (точку входа верхнего уровня) можно запустить при помощи функция asyncio.run(). 

2. Запуск сопрограмм, которые должны или могут ждать каких-то результатов (например, ответа сервера с результатами запроса) запускаются оператором await.

Рассмотрим пример кода, который напечатает "hello" после ожидания в течение 1 секунды, а затем напечатает "world" после ожидания еще 2-х секунд:

In [3]:
import asyncio, time

async def say_after(delay, what):
    """Асинхронная функция (сопрограмма)"""
    await asyncio.sleep(delay)
    print(what)

async def main():
    """Точка входа в асинхронную программу"""
    print(f"started at {time.strftime('%X')}")

    # запуск сопрограммы `say_after()` происходит при  
    # помощи оператора `await`, т. к. в самой сопрограмме  
    # есть объект ожидания - неблокирующая функция 
    # `asyncio.sleep()`, которая эмитирует ожидание ответа сервера
    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

# запуск основного цикла событий 
await main()

started at 21:58:24
hello
world
finished at 21:58:27


3. запуск сопрограмм можно осуществлять через создание и планирование задач при помощи функции asyncio.create_task(). Объекты задач Task также являются объектами ожидания результата, т.к. планируют запуск сопрограмм в будущем, как только это станет возможным. Следовательно задачи, то же запускаем оператором await

Изменим предыдущий пример и одновременно запустим две сопрограммы say_after():

In [4]:
async def say_after(delay, what):
    """Асинхронная функция (сопрограмма)"""
    await asyncio.sleep(delay)
    print(what)

async def main():
    """Точка входа в асинхронную программу"""

    # создаем задачи `task1` и `task2`
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Ждем, пока обе задачи будут выполнены 
    # (должно занять около 2 секунд.)
    await task1
    await task2
    print(f"finished at {time.strftime('%X')}")

await main()

started at 21:58:27
hello
world
finished at 21:58:29


4. Класс asyncio.TaskGroup() (добавлен в Python 3.11) представляет собой более современную альтернативу asyncio.create_task(). Используя этот API, последний пример становится таким:

In [5]:
async def main():
  async with asyncio.TaskGroup() as tg:
      task1 = tg.create_task(
          say_after(1, 'hello'))

      task2 = tg.create_task(
          say_after(2, 'world'))

      print(f"started at {time.strftime('%X')}")

  # The wait is implicit when the context manager exits.

print(f"finished at {time.strftime('%X')}")

finished at 21:58:29


##  Awaitable Объекты

В асинхронном программировании есть так называемые объекты awaitable. Это объект, который, в какой-то момент времени может ничего полезного не делать, а заниматься только ожиданием каких-то результатов от сторонних сервисов (например ответа сервера с результатами на свой запрос). Такие объекты всегда запускаются с оператором await.

Практически все API-интерфейсы модуля asyncio предназначены для приема awaitable объектов - объектов ждущих каких-то результатов или команд на продолжение или приостановку работы от основного цикла событий.

Есть три основных типа объектов ожидания awaitable, которые можно запускать оператором await в асинхронном коде:

* Сопрограмма coroutine - функция, определенная с оператором async.
* Задача task - это сопрограмма, которая планируются для выполнения в будущем, как только это станет возможным.
* Объекты будущих результатов futures - объект уже запущенной сопрограммы с еще неполученными/промежуточными результатами.

### Сопрограммы/сoroutines

Сопрограммы в Python являются объектами, которые могут ждать результатов от сторонних сервисов или своей очереди выполнения, следовательно, их можно также использовать с оператором wait внутри других сопрограмм:

In [6]:
async def nested():
    return 42

async def main():
    # Ничего не происходит, если мы просто вызываем "nested()".
    # Сопрограмма `nested()` создается, но не будет выполняться,
    # т.к. в таком виде она заблокирует цикл событий, что недопустимо
    nested()

    # что бы асинхронная функция `nested()` заработала
    # необходимо заставить ее ждать своего выполнения 
    # при помощи оператора `await`
    print(await nested())

await main()

42


  nested()


### Задачи Task

Задачи (asyncio.Task) используются для одновременного планирования запуска нескольких сопрограмм. Когда сопрограмма оборачивается в задачу (передается в функцию asyncio.create_task()), то сопрограмма будет автоматически запускаться в ближайшее время, как только будет это возможным:

In [7]:
async def nested():
    return 42

async def main():
    # Запланируем запуск 'nested()' в одновременно с 'main()'.
    task = asyncio.create_task(nested())

    # объект 'task' может теперь использоваться, для отмены
    # выполнения 'nested()' или ожидания ее выполнения:
    await task

await main()

### Futures - объекты с будущими результатами

Future объект - инкапсулирует асинхронное выполнение вызываемого объекта и представляет специальный низкоуровневый объект, который хранит промежуточное состояние запущенной задачи (когда она что-то ожидает) и в будущем, будет представлять конечный результат асинхронной операции. Этот объект может хранить информацию о том, что задача ещё не выполнена или не до конца выполнена, или может хранить уже полученный результат, или исключение, полученное во время выполнения кода.

Когда происходит ожидание объекта Future , это означает, что сопрограмма будет ждать, пока Future не будет разрешен в каком-то другом месте.

Futures объекты позволяют использовать код на основе обратного вызова (который сообщает о готовности объекта Future) совместно с синтаксисом async/await, по этому они необходимы в асинхронном программировании.

Нет необходимости создавать объекты Future на уровне приложения. В основном, эти объекты создаются автоматически, при вызове функций или методов, предоставляемыми асинхронными API модулей:

In [8]:
async def main():
    await function_that_returns_a_future_object()

    # это тоже правильно:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )

# Функция run() 

Функция asyncio.run() запускает цикл событий в асинхронном коде
Синтаксис:
import asyncio

asyncio.run(coro, *, debug=None, loop_factory=None)

Параметры:

coro - точка входа в асинхронную программу,

debug=False - включение отладчика,

loop_factory=None - используется для переопределения создания цикла событий

Функция run() модуля asyncio выполнит сопрограмму coro и вернет результат ее работы.

Функция asyncio.run() создает новый цикл событий, запускает переданную сопрограмму coro и в конце закрывает цикл событий. Если в программе используются асинхронные генераторы или пул потоков, то функция завершит их работу.

Функция asyncio.run() не может быть вызвана, когда другой цикл событий asyncio выполняется в том же потоке.

Если аргумент debug=True, то цикл событий будет выполняться в режиме отладки. Значение False явно отключает режим отладки. None используется для соблюдения глобальных настроек режима отладки.

В Python 3.12: добавлен аргумент loop_factory.

Если аргумент loop_factory не равен None, то он используется для создания нового цикла событий, в противном случае используется asyncio.new_event_loop(). В обязанности loop_factory входит установка созданного цикла в качестве текущего. Рекомендуется использовать loop_factory для настройки цикла обработки событий вместо использования политик.

Функция asyncio.run() всегда создает новый цикл событий и в конце его закрывает. Её следует использовать в качестве основной точки входа для программ с использованием модуля asyncio, а в идеале функцию asyncio.run() следует вызывать только один раз.

In [9]:
async def get_name():
    TARGETS = [
        ('8.8.8.8', 443),
        ('1.1.1.1', 443),
        ('8.8.4.4', 443),
        ('77.88.8.8', 443),
        ('77.88.8.88', 443),
        ('77.88.8.7', 443)
    ]
    
    # получаем текущий цикл событий
    loop = asyncio.get_event_loop()
    
    for target in TARGETS:
        # запускаем асинхронный вариант 
        # функции socket.getnameinfo()
        host, port = await loop.getnameinfo(target)
        # выводим результаты
        print(f'{target[0]:15}: {host}')

await get_name()

8.8.8.8        : dns.google
1.1.1.1        : one.one.one.one
8.8.4.4        : dns.google
77.88.8.8      : dns.yandex.ru
77.88.8.88     : safe.dns.yandex.ru
77.88.8.7      : family.dns.yandex.ru


## Создание и выполнение задач
Задачи — это способ запуска сопрограмм параллельно. Для создания задач используйте функцию asyncio.create_task(). Вот пример:

In [10]:
async def foo():
    print("Start foo")
    await asyncio.sleep(1)
    print("End foo")
 
async def main():
    task = asyncio.create_task(foo())
    await task
 
await main()

Start foo
End foo


In [11]:
async def msg(text):
    # эмитируем короткую 
    # задержку в выполнении
    await asyncio.sleep(0.1)
    print(text)

async def long_operation():
    # эмитируем долгую 
    # задержку в выполнении
    print('long_operation started')
    await asyncio.sleep(3)
    print('long_operation complete')

# основной цикл программы
async def main():
    # легкая сопрограмма
    await msg('1 msg complete')

    # Здесь запустим `long_operation()`, но ждать, пока она 
    # выполнится не хотим, т.к. необходимо получить второе 
    # сообщение как можно раньше. Для этого создаем для нее задачу... 
    task = asyncio.create_task(long_operation())
    
    # легкая сопрограмма
    await msg('2 msg complete')

    # Теперь можно дождаться завершения 
    # задачи или отменить ее
    await task

await main()

1 msg complete
long_operation started
2 msg complete
long_operation complete


## Работа с несколькими задачами

Asyncio позволяет выполнять несколько задач одновременно, используя функцию asyncio.gather(). Вот пример с несколькими асинхронными функциями:

In [12]:
async def foo():
    print("Start foo")
    await asyncio.sleep(1)
    print("End foo")
 
async def bar():
    print("Start bar")
    await asyncio.sleep(2)
    print("End bar")
 
async def main():
    await asyncio.gather(foo(), bar())
 
await main()

Start foo
Start bar
End foo
End bar


### Менеджер контекста для вызовы нескольких асинхронных функций

Синтаксис:
import asyncio

with asyncio.Runner(*, debug=None, loop_factory=None) as runner:
    runner.run(main())

Параметры:

debug=None - режиме отладки,

loop_factory=None - переопределяет создания цикла событий.

Класс Runner() модуля asyncio представляет собой менеджер контекста, упрощающий вызовы нескольких асинхронных функций в одном контексте.

Иногда в одном и том же цикле событий и contextvars.Context следует вызывать несколько асинхронных функций верхнего уровня.

Если для аргумента debug установлено значение True, то цикл обработки событий будет выполняться в режиме отладки. False явно отключает режим отладки. None используется для соблюдения глобальных настроек режима отладки.

Аргумент loop_factory можно использовать для переопределения создания цикла. В обязанности loop_factory входит установка созданного цикла в качестве текущего. По умолчанию (если loop_factory имеет значение None) для создания цикла событий используется asyncio.new_event_loop(), а как текущий цикл событий устанавливается с помощью функции asyncio.set_event_loop().

По сути, пример asyncio.run() можно переписать с использованием asyncio.Runner():

'''
async def main():
    await asyncio.sleep(1)
    print('hello')

with asyncio.Runner() as runner:
    await main()
    '''

## Методы менеджера контекста Runner

Runner.run(coro, *, context=None):

Метод Runner.run() запускает сопрограмму coro во встроенном цикле. Возвращает результат сопрограммы или вызывает ее исключение.

Необязательный ключевой аргумент context, позволяет указать пользовательский contextvars.Context для запуска сопрограммы. Если context=None, то по умолчанию используется контекст Runner.

Эта функция не может быть вызвана, когда в том же потоке выполняется другой цикл обработки событий asyncio.

Runner.close():

Метод Runner.close() закрывает менеджера контекста Runner.

Завершает работу асинхронных генераторов, завершает работу исполнителя по умолчанию, закрывает цикл обработки событий и освобождает встроенные переменные contextvars.Context.

Runner.get_loop():

Метод Runner.get_loop() Вернуть цикл обработки событий, связанный с экземпляром бегуна.

## Асинхронный менеджер TaskGroup() 

Синтаксис:

import asyncio

async with asyncio.TaskGroup() as tg:
    task1 = tg.create_task(some_coro(...))
    ...

Класс TaskGroup() (добавлен в Python 3.11.) модуля asyncio представляет собой асинхронный менеджер контекста, содержащий группу задач.

Группы задач asyncio.TaskGroup() сочетают в себе API создания задач с удобным и надежным способом ожидания завершения всех задач в группе.

Отдельные задачи можно добавлять в группу с помощью метода TaskGroup.create_task(). Все задачи ожидаются при выходе из менеджера контекста.

Метод TaskGroup.create_task() создает задачу в этой группе задач. Принимаемые аргументы и поведение этого метода совпадает с описанием asyncio.create_task().

Пример:

In [13]:

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(some_coro(...))
        task2 = tg.create_task(another_coro(...))
    print("Обе задачи уже выполнены.")

Оператор async with будет ожидать завершения всех задач в группе. Во время ожидания, в группу, все еще могут быть добавлены новые задачи (например, путем передачи экземпляра tg в одну из сопрограмм и вызова tg.create_task() в этой сопрограмме). После завершения последней задачи и выхода из блока async with в группу нельзя добавлять новые задачи.

При первом сбое любой из задач, принадлежащих группе tg, с исключением asyncio.CancelledError, оставшиеся задачи в группе отменяются. После этого в группу нельзя будет добавить больше задач. В этот момент, если тело асинхронного оператора with все еще активно (т. е. __aexit__() еще не вызывалась), то задача, непосредственно содержащая асинхронный оператор with, также отменяется. Полученный asyncio.CancelledError прервет ожидание, но не выйдет из содержащего оператора async with.

После завершения всех задач, если какие-либо задачи завершились сбоем с исключением, отличным от asyncio.CancelledError, эти исключения объединяются в ExceptionGroup или BaseExceptionGroup (соответственно; смотрите их документацию), которая затем вызывается.

Два базовых исключения обрабатываются особым образом: если какая-либо задача завершается с ошибкой с помощью KeyboardInterrupt или SystemExit, то группа задач по-прежнему отменяет оставшиеся задачи и ожидает их, но затем повторно вызывается первоначальная KeyboardInterrupt или SystemExit вместо ExceptionGroup или BaseExceptionGroup.

## Функция sleep() модуля asyncio

Приостанавливает асинхронную задачу на определенное время
Синтаксис:
import asyncio

await asyncio.sleep(delay, result=None, *, loop=None)

Параметры:

delay - задержка выполнения задачи

result=None - результат, который возвращается вызывающей стороне

Функция sleep() модуля asyncio приостанавливает текущую задачу на delay секунд. Применяется внутри сопрограмм с оператором await.

Если аргумент result установлен, то он возвращается вызывающей стороне после завершения сопрограммы.

Функция asyncio.sleep() всегда приостанавливает текущую задачу, позволяя запускать другие задачи.

In [14]:
import asyncio
import datetime

async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

await display_date()

2024-11-12 21:58:35.711194
2024-11-12 21:58:36.716642
2024-11-12 21:58:37.731199
2024-11-12 21:58:38.739226
2024-11-12 21:58:39.749080


## Функция gather() модуля asyncio

Синтаксис:

import asyncio

await asyncio.gather(*aws, loop=None, return_exceptions=False)

Параметры:

*aws - последовательность объектов ожидания,

loop=None - параметр цикла (устарел и будет удален в Python 3.10),

return_exceptions=False - обработка исключений.

Функция gather() модуля asyncio одновременно запускает объекты awaitable, переданные в функцию как последовательность *aws.

Функция asyncio.gather() представляет то же объект ожидания awaitable и запускается оператором await.

Если какой-либо объект awaitable в последовательности *aws является сопрограммой, то она автоматически назначается как задача asyncio.Task.

Если все awaitable объекты завершены успешно, то результатом является совокупный список возвращенных значений этих объектов. Порядок значений результатов соответствует порядку переданных объектов в последовательность aws.

Если аргумент return_exceptions=False (по умолчанию), то первое появившееся исключение, немедленно распространяется на ту задачу, в которой оно возникло в момент ожидания asyncio.gather(). При этом другие объекты в последовательности aws не будут отменены и продолжат выполнение.

Если return_exceptions=True, то исключения обрабатываются так же, как успешные результаты и передаются в совокупный список результатов.

Если выполнение asyncio.gather() отменяется, то все отправленные в функцию объекты, которые еще не завершены, также отменяются.

Если какая-либо задача Task или объект Future отменяется в последовательности aws, то она обрабатывается так, как если бы она вызвала исключение asyncio.CancelledError - в этом случае вызов самой функции asyncio.gather() не отменяется. Это делается для того, чтобы отмена одной отправленной Task/Future не привела к отмене других Task/Future.

ВАЖНО!!!

Запускаемые сопрограммы (асинхронные функции) не должны содержать внутри себя операции, блокирующие ход выполнения программы! Другими словами, нельзя какую либо встроенную синхронную функцию (типа socket.getnameinfo()) обернуть в асинхронную функцию (async def ...) и думать, что этот код будет работать асинхронно. НЕТ. Такая сопрограмма так же будет блокировать остальной код.

Пример вызова нескольких задач одновременно:


In [15]:
async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    args = [('A', 2), ('B', 3), ('C', 4)]
    tasks=[]
    for arg in args:
        # создаем задачи
        task = factorial(*arg)
        # складываем задачи в список
        tasks.append(task)
        
    # планируем одновременные вызовы
    L = await asyncio.gather(*tasks)
    print(L)

results = await main()

# Expected output:
#
#     Task A: Compute factorial(2)...
#     Task B: Compute factorial(2)...
#     Task C: Compute factorial(2)...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3)...
#     Task C: Compute factorial(3)...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4)...
#     Task C: factorial(4) = 24

Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24
[None, None, None]


### Пример объединения нескольких сопрограмм в группы вызовов.

В примере, группу спланированных задач можно отменить, вызвав например group2.cancel() или можно отменить сразу все задачи, вызвав all_result.cancel().

In [16]:
import asyncio, random
from pprint import pprint

async def worker(tag):
    print('Run:', tag)
    await asyncio.sleep(random.uniform(1, 3))
    print('Done:', tag)
    return tag


async def main():
    # объединяем сопрограммы в группы, для планирования запуска
    group1 = asyncio.gather(*[worker(f'Группа 1.{i}') for i in range(1, 6)])
    group2 = asyncio.gather(*[worker(f'Группа 2.{i}') for i in range(1, 4)])
    group3 = asyncio.gather(*[worker(f'Группа 3.{i}') for i in range(1, 5)])
    # асинхронно запускаем созданные группы оператором `await` 
    all_result = await asyncio.gather(group1, group2, group3)
    # смотрим результаты
    pprint(all_result)

await main()

Run: Группа 1.1
Run: Группа 1.2
Run: Группа 1.3
Run: Группа 1.4
Run: Группа 1.5
Run: Группа 2.1
Run: Группа 2.2
Run: Группа 2.3
Run: Группа 3.1
Run: Группа 3.2
Run: Группа 3.3
Run: Группа 3.4
Done: Группа 3.4
Done: Группа 3.1
Done: Группа 1.5
Done: Группа 1.4
Done: Группа 1.2
Done: Группа 3.3
Done: Группа 2.3
Done: Группа 2.1
Done: Группа 2.2
Done: Группа 3.2
Done: Группа 1.1
Done: Группа 1.3
[['Группа 1.1', 'Группа 1.2', 'Группа 1.3', 'Группа 1.4', 'Группа 1.5'],
 ['Группа 2.1', 'Группа 2.2', 'Группа 2.3'],
 ['Группа 3.1', 'Группа 3.2', 'Группа 3.3', 'Группа 3.4']]


## Функция as_completed() модуля asyncio

Синтаксис:
import asyncio

asyncio.as_completed(aws, *, loop=None, timeout=None)

Параметры:

aws - множество объектов ожидания

timeout=None - максимальным количеством секунд ожидания

Функция as_completed() модуля asyncio одновременно запускает объекты awaitable, переданные в итерации aws, ждет их выполнения и по готовности любого начинает возвращать готовые объекты Future.

Другими словами, функция asyncio.as_completed() запускает и ждет выполнения переданных в нее задач/awaitable-объектов, и как только появляются результаты у какой-нибудь задачи, в реальном времени, начинает возвращать их в итераторе. Результаты извлекаются, запуском каждого объекта итератора при помощи оператора await.

Например:

for coro in as_completed(aws):
    earliest_result = await coro
    # ...
Аргумент timeout (float или int) в секундах, если он указан, можно использовать для управления максимальным временем ожидания, перед тем, как остановить не выполненные задачи, оставшихся в итераторе aws.

Пример получения результатов из сопрограмм в реальном времени:

In [17]:
import asyncio, time

async def worker(delay, name):
    # эмитируем ожидание какого-то ответа 
    # ответа от стороннего сервиса
    await asyncio.sleep(delay)
    # возвращаем результат
    return name, delay

async def main():
    # список аргументов для `worker()``
    targets = [(0.5, 'one'), (0.8, 'two'), 
          (0.2, 'three'), (1, 'four'), (0.5, 'five')]
    
    # создаем задачи
    tasks = [asyncio.create_task(worker(*target)) for target in targets]
    
    # передаем задачи в функцию `as_completed()`
    for future in asyncio.as_completed(tasks):
        # получаем результаты по готовности 
        res = await future
        # выводим на печать
        print(f'Задача: {res[0]}; задержка: {res[1]}')
        
start = time.time()
# запускаем цикл событий
await main()

total = time.time() - start
print(f'Общее время выполнения: {total:.3f} сек.')

Задача: three; задержка: 0.2
Задача: one; задержка: 0.5
Задача: five; задержка: 0.5
Задача: two; задержка: 0.8
Задача: four; задержка: 1
Общее время выполнения: 1.011 сек.


Попробуйте в функцию передать значение таймаута, сначала 0.8 секунды - asyncio.as_completed(tasks, dalay=0.8), а потом 1.1 секунды и проследить поведение программы.

## Функция wait() модуля asyncio

Приостановка выполнение задач по таймауту или условию

Синтаксис:
import asyncio

done, pending = asyncio.wait(aws, *, loop=None, timeout=None, 
                             return_when=ALL_COMPLETED)
Параметры:

aws - множество объектов ожидания,

timeout=None - максимальным количеством секунд ожидания,

return_when=ALL_COMPLETED - когда функция должна возвратить результат.

Функция wait() модуля asyncio одновременно запускает awaitable-объекты (преимущественно задачи Task) из переданного множества aws и производит блокировку выполнения программы до выполнения условия, указанного в аргументе return_when.

Возвращает кортеж из двух множеств Task/Future в виде (done, pending).

Псевдокод:
done, pending = await asyncio.wait(aws)

Аргумент timeout (float или int) в секундах, если он указан, можно использовать для управления временем ожидания результатов задач, из множества aws, прежде чем приостановить не выполненные задачи.

Обратите внимание, что функция asyncio.wait() не вызывает исключение asyncio.TimeoutError. Задачи, которые не успели выполнится по истечении timeout, приостанавливают свое выполнение и возвращаются во втором множестве pending. В последствии, можно возобновить выполнение приостановленных задач.

Аргумент return_when указывает, когда функция asyncio.wait() должна возвратить результат. Это должна быть одна из следующих констант:

* asyncio.FIRST_COMPLETED	Функция возвратит результат, когда любой из Future завершится или был отменен.

* asyncio.FIRST_EXCEPTION	Функция возвратит результат, когда любой из Future завершится созданием исключения. Если никакое Future не вызывает исключения, то эта константа эквивалентна asyncio.ALL_COMPLETED.

* asyncio.ALL_COMPLETED	Функция возвратит результат, когда все Future завершатся или будут отменены.

В отличие от функции asyncio.wait_for(), asyncio.wait() не отменяет, а приостанавливает задачи при наступлении таймаута.

Функция asyncio.wait_for() поддерживает приостановку выполнения задач, после получения результата задачи, которая вернула их первой или после указанного таймаута, что обеспечивает более низкий уровень точности операций:

In [19]:
import asyncio, random

async def worker(tag):
    """Основная сопрограмма"""
    print('Run:', tag)
    # эмитируем ожидание ответа сервера случайным
    # образом при помощи модуля `random` 
    await asyncio.sleep(random.uniform(0.5, 5))
    print('Done:', tag)
    return tag

async def main():
    """Точка входа в программу"""
    # создаем и планируем асинхронный запуск задач
    tasks = [asyncio.create_task(worker(tag)) for tag in range(1, 8)]

    # запускаем созданные задачи до получения любого первого результата
    done, pending1 = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print("Приостановка задач, после получения первого результата:")
    for future in done:
        res = future.result()
        print(f'  Результаты задачи №{res} получены.')
    print("Кол-во приостановленных задач:", len(pending1), '\n')

    # запускаем приостановленные задачи с таймаутом в 2 сек
    done, pending2 = await asyncio.wait(pending1, timeout=2)
    print("Дальнейшие результаты после таймаута в 2 сек:")
    for future in done:
        res = future.result()
        print(f'  Результаты задачи №{res} получены.')
    print("Кол-во остановленных задач после 2 сек. работы:", len(pending2), '\n')

    done, _ = await asyncio.wait(pending2)
    print("Получаем оставшиеся результаты:")
    for future in done:
        res = future.result()
        print(f'Результаты задачи №{res} получены.')

await main()

Run: 1
Run: 2
Run: 3
Run: 4
Run: 5
Run: 6
Run: 7
Done: 4
Приостановка задач, после получения первого результата:
  Результаты задачи №4 получены.
Кол-во приостановленных задач: 6 

Done: 2
Done: 3
Done: 5
Дальнейшие результаты после таймаута в 2 сек:
  Результаты задачи №3 получены.
  Результаты задачи №5 получены.
  Результаты задачи №2 получены.
Кол-во остановленных задач после 2 сек. работы: 3 

Done: 6
Done: 1
Done: 7
Получаем оставшиеся результаты:
Результаты задачи №6 получены.
Результаты задачи №7 получены.
Результаты задачи №1 получены.
