<a href="https://colab.research.google.com/github/polaris2010/CLI-Encoder/blob/main/asyncio.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Асинхронное программирование на Python. Библиотека [asyncio](https://docs.python.org/3/library/asyncio.html)

Слыша слова "асинхронное программирование" в основном представляется, что имея одну большую задачу мы хотим разбить ее на несколько небольших кусочков и выполнить их независимо друг от друга, а после склеить результаты и получить полное решение.

Однако как же это правильно реализовать на самом деле?

Задачи, которые перед нами возникают принято делить на 2 типа:
- **I/O-bound**

    Это тип задач, которые которые в основном зависят от скорости ввода/вывода информации (input/output)


- **CPU-bound**

    Вычислительные задачи, которые в основном зависят от ресурсов процессора. Примерами таких задач могут служить математические вычисления, майнинг криптовалют или взлом шифров по ключу с помощью подбора простых множителей.


В зависимости от типа задачи используют разные подходы к её делению на подзадачи.


## IO-bound задачи

**Идея** - будем распределять множество задач одному работнику, причем пока одна из задач не требует непосредственного его участия он начинает делать следующую.

Множество задач внутри которых есть какое-либо ожидание (например, ожидание ответа от сервера) запускаются по очереди, причем пока одна задача находится в режиме ожидания другая начинает работать. Стоит отметить, что все происходит в **одном** потоке.

**Как же будет устроена работа асинхронного кода?**

У нас будут функции, которые умеют засыпать и просыпаться в нужное время, а так же обработчик (*event loop*). Мы собираем функции в обработчик. Когда одна из функций засыпает управление получает обработчик, после чего отдает управление очередной функции.


Рассмотрим простейший пример:

In [None]:
import time
queue = []

def counter():
    cnt = 0
    while True:
        print(cnt)
        cnt += 1
        yield


def printer():
    cnt = 1
    while True:
        if cnt%3 == 0:
            print("Go!")
        cnt += 1
        yield


g1 = counter()
queue.append(g1)

g2 = printer()
queue.append(g2)

i = 0
while True:
    g = queue.pop(0)
    time.sleep(0.5)
    next(g)
    queue.append(g)
    i += 1
    if i == 100: break

В Python много библиотек для асинхронного программирования, наиболее популярными являются Tornado, Asyncio и Gevent. Давайте посмотрим, как работает Asyncio.

# Asyncio

In [None]:
import asyncio

Объект корутины (асинхронная функция) объявляется с помощью `async def` и исполняется с помощью `await`:

In [None]:
async def rocket():
    print(1)
    await asyncio.sleep(1)
    print(2)
    await asyncio.sleep(1)
    print(3)
    await asyncio.sleep(1)
    print("Go!")

await rocket()

1
2
3
Go!


Заметим, что просто написав rocket() у нас ничего не выполнится.

In [None]:
rocket()

<coroutine object rocket at 0x79742decab90>

Разберем следующий пример:

In [None]:
import time


async def say_after(delay, msg):
    await asyncio.sleep(delay)
    print(msg)

async def main():
    started = time.time()

    await say_after(1, 'hello')
    await say_after(2, ' world')

    print(f"Time: {time.time() - started}")

await main()

hello
 world
Time: 3.003570079803467


Хм, что-то не то, ждем все три секунды, где же профит?

Проблема заключается в том, что в нашем коде задачи выполняются последовательно с использованием `await`. Каждый вызов `await` блокирует выполнение текущей корутины до завершения задачи. Поэтому корутины `say_after` выполняются последовательно, а не параллельно.

Теперь рассмотрим как можно исправить такую проблему. Поговорим про `Task()`.

## `Task`

`Task()` - это объект, который запускает корутину. Объект `Task` используется для запуска корутин в циклах событий при помощи оператора `await`.

Циклы событий используют **совместное планирование**. Другими словами, цикл событий запускает одну задачу за раз. Пока объект задачи Task ожидает готовности, цикл событий запускает другие задачи, обратные вызовы или выполняет операции ввода-вывода.

Теперь создадим задачи и посмотрим, исправило ли эту проблему:

In [None]:
async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    started = time.time()

    await task1
    await task2

    print(f"Time: {time.time() - started}")

await main()

hello
world
Time: 2.002523422241211


Как мы видим, это нам помогло исправить нашу проблему!

Теперь рассмотрим некоторые полезные метода класса `Task()`:

In [None]:
task = asyncio.create_task(
    say_after(1, 'hello')
)

await task

hello


Метод `Task.cancelled()` возвращает `True`, если задача `Task` отменена.

Задача отменяется, когда отмена была запрошена с помощью метода `Task.cancel()` и обернутая сопрограмма распространила переданное в нее исключение `asyncio.CancelledError`.

In [None]:
task.cancelled()

False

Метод `Task.done()` возвращает `True`, если задача `Task` выполнена.

Задача считается выполненной, когда обернутая сопрограмма либо отработала и вернула результат, либо вызвала исключение, либо была отменена.

In [None]:
task.done()

True

В примере ниже задача `my_coroutine()` выполняется асинхронно с помощью `asyncio.create_task()`. Если вызовете `task.done()` непосредственно после создания задачи, она вернет `False`, так как задача еще не завершилась.

In [None]:
async def my_coroutine():
    await asyncio.sleep(3)
    print("Задача завершена")


async def main():
    task = asyncio.create_task(my_coroutine())
    if task.done():
        print("Задача уже завершена")
    else:
        print("Задача все еще выполняется")


await main()

Задача все еще выполняется


Метод `Task.set_name()` устанавливает имя для задачи `Task`. Аргументом значения может быть любой объект, который затем преобразуется в строку.

In [None]:
task.set_name(123)

Метод `Task.get_name()` возвращает имя name задачи Task. Если имя задачи Task не было явно установлено, то по умолчанию оно генерируется во время создания:

In [None]:
task.get_name()

'123'

In [None]:
task2 = asyncio.create_task(my_coroutine())
task3 = asyncio.create_task(my_coroutine())

Задача завершена
Задача завершена


In [None]:
task2.get_name()

'Task-39'

In [None]:
task3.get_name()

'Task-40'

Так же важно знать про объекты `Future`.

Класс `asyncio.Future()` представляет собой конечный результат асинхронной операции. Класс `Task()` является подклассом класса `Future()`.

In [None]:
async def slow_operation():
    await asyncio.sleep(2)
    return "Завершено!"


async def main():
    future = asyncio.ensure_future(slow_operation())  # Создание объекта Future
    await asyncio.sleep(1)
    future_result = await future  # Ожидание завершения Future
    print(future_result)


await main()

Завершено!


### Функция `gather`

Функция `gather()` одновременно запускает объекты, переданные в функцию.

Рассмотрим это на примере - мы хотим подключаться к пользователю и отправлять ему тестовое сообщение:

In [None]:
id_list = [1, 2, 3, 4]

async def connect_to_user(usr_id):
    print(f'Подключаюсь к {usr_id}')
    await asyncio.sleep(2)
    print(f'Отправляю команду test пользователю {usr_id}')
    await asyncio.sleep(2)


async def send_msg(id_list):
    coroutines = map(connect_to_user, id_list)
    result = await asyncio.gather(*coroutines,return_exceptions=False )
    return result

res = await send_msg(id_list)

Подключаюсь к 1
Подключаюсь к 2
Подключаюсь к 3
Подключаюсь к 4
Отправляю команду test пользователю 1
Отправляю команду test пользователю 2
Отправляю команду test пользователю 3
Отправляю команду test пользователю 4


А что произойдет, если попытка подключения окажется неудачной и выбросится исключение? Как мы будем его обрабатывать?

Для этого есть аргумент `return_exceptions`:
- `return_exceptions=False` (по умолчанию) - первое появившееся исключение, немедленно распространяется на ту задачу, в которой оно возникло в момент ожидания `asyncio.gather()`, при этом другие объекты в последовательности не будут отменены и продолжат выполнение.
- `return_exceptions=True` - исключения обрабатываются так же, как успешные результаты и передаются в совокупный список результатов.

In [None]:
async def connect_to_user(usr_id):
    print(f'Подключаюсь к {usr_id}')
    if usr_id == 4:
        raise OSError(f'Не могу подключиться к {usr_id}')
    await asyncio.sleep(1)
    print(f'Отправляю команду test пользователю {usr_id}')
    await asyncio.sleep(1)


async def send_msg(id_list, return_ex = False):
    coroutines = map(connect_to_user, id_list)
    result = await asyncio.gather(*coroutines,return_exceptions=return_ex)
    return result

await send_msg(id_list, False)

Подключаюсь к 1
Подключаюсь к 2
Подключаюсь к 3
Подключаюсь к 4


OSError: Не могу подключиться к 4

In [None]:
await send_msg(id_list, True)

Подключаюсь к 1
Подключаюсь к 2
Подключаюсь к 3
Подключаюсь к 4
Отправляю команду test пользователю 1
Отправляю команду test пользователю 2
Отправляю команду test пользователю 3


[None, None, None, OSError('Не могу подключиться к 4')]

Три корутины с usr_id 1, 2 и 3 выполнились успешно и вернули `None` в качестве результата. Это потому, что функция `connect_to_user(usr_id)` не возвращает явно никакого значения, кроме `None`.
Четвертая корутина с usr_id 4 вызвала `OSError` и исключение было сохранено в списке результатов.

Теперь пример с использованием `Future` и `gather()` для ожидания нескольких асинхронных операций:

In [None]:
async def slow_operation(num):
    await asyncio.sleep(2)
    return f"Операция {num} завершена"

async def main():
    futures = [asyncio.ensure_future(slow_operation(i)) for i in range(3)]  # Создание нескольких объектов Future
    results = await asyncio.gather(*futures)  # Ожидание завершения всех Future
    for result in results:
        print(result)
await main()

Операция 0 завершена
Операция 1 завершена
Операция 2 завершена


### Функция `shield`

Представьте, что у вас есть асинхронная функция, которая выполняет важную задачу: отправку критической команды на удаленное устройство. Вы не хотите, чтобы эта задача была отменена, даже если другие части программы пытаются отменить выполнение. В таком случае в `asyncio` есть функция `shield`, которая защищает задачи от отмены методом `Task.cancel()`:

In [None]:
async def start_task():
    print('Старт задачи...')
    await asyncio.sleep(2)
    print('А что со мной хотели сделать?')


async def cancel(some_task):
    await asyncio.sleep(0.5)
    some_task.cancel()
    print('Отмена!')


async def main():
    real_task = asyncio.create_task(start_task())
    shield = asyncio.shield(real_task)
    asyncio.create_task(cancel(shield))
    await real_task


await main()

Старт задачи...
Отмена!
А что со мной хотели сделать?


### Функция `wait_for`

Следит за тем, чтобы корутина не выполнялась больше заданного времени. Если истекает таймаут, то отменяет задачу и бросает `TimeoutError`.

Если необходимо избежать отмены задачи, то лучше обренуть в `shield()`.

In [None]:
async def fast_func():
    await asyncio.sleep(360)
    print('Да не долго я работаю')


async def main():
    try:
        await asyncio.wait_for(fast_func(), timeout=5.0)
    except asyncio.TimeoutError:
        print('Какая-то не очень быстрая ...')

await main()

Какая-то не очень быстрая ...


In [None]:
async def send_message(user_id, message):
    print(f"Отправка сообщения '{message}' пользователю {user_id}")
    await asyncio.sleep(5)
    print(f"Сообщение '{message}' доставлено пользователю {user_id}")


async def receive_message(user_id):
    try:
        await asyncio.wait_for(send_message(user_id, "Привет!"), timeout=3)
        print(f"Получен ответ: Привет!")
    except asyncio.TimeoutError:
        print(f"Время ожидания ответа от пользователя {user_id} истекло")

async def main():
    user_id = 123
    await receive_message(user_id)


await main()

Отправка сообщения 'Привет!' пользователю 123
Время ожидания ответа от пользователя 123 истекло


In [None]:
async def send_important_message():
    print("Начало отправки важного сообщения")
    await asyncio.sleep(7)


async def main():
    task = asyncio.create_task(send_important_message())
    try:
        await asyncio.sleep(5)
        task.cancel()
        # Здесь используем shield() для гарантированной отправки важного сообщения
        await asyncio.shield(asyncio.wait_for(task, timeout=5))
        print("Отправлено!")
    except asyncio.TimeoutError:
        print("Время на отправку важного сообщения истекло")


await main()

Начало отправки важного сообщения


CancelledError: 

### Функция `wait`

Выполняет корутины до таймаута или до параметра функции `return_when`.

Возваращает 2 множества корутин: выполненных и нет.

Параметр `return_when` в методе `wait()` определяет, когда метод должен вернуть результат в зависимости от состояния задач. `return_when` является одним из значений перечисления `asyncio.ReturnWhen`, и у него есть следующие возможные значения:

- `FIRST_COMPLETED`: Метод вернет управление, когда хотя бы одна задача из переданного множества завершится (успешно или с исключением).

- `FIRST_EXCEPTION`: Метод вернет управление, когда произойдет первое исключение в задаче. Это может быть полезно, если вы хотите получить первое исключение из множества задач.

- `ALL_COMPLETED` (по умолчанию): Метод вернет управление, когда все задачи из переданного множества завершатся (успешно или с исключением).

In [None]:
async def foo():
    await asyncio.sleep(1)
    print('hello ...')


async def bar():
    await asyncio.sleep(2)
    print('... world!')


print_hello = asyncio.create_task(foo())
print_world = asyncio.create_task(bar())

done, pending = await asyncio.wait({print_hello,print_world}, return_when=asyncio.FIRST_COMPLETED)

print(done)
print(pending)

hello ...
{<Task finished name='Task-104' coro=<foo() done, defined at <ipython-input-47-44f6b0987f59>:1> result=None>}
{<Task pending name='Task-105' coro=<bar() running at <ipython-input-47-44f6b0987f59>:7> wait_for=<Future pending cb=[Task.task_wakeup()]>>}


In [None]:
print_hello = asyncio.create_task(foo())
print_world = asyncio.create_task(bar())

done, pending = await asyncio.wait({print_hello,print_world}, timeout=1.5)

print(done)
print(pending)

hello ...
{<Task finished name='Task-107' coro=<foo() done, defined at <ipython-input-47-44f6b0987f59>:1> result=None>}
{<Task pending name='Task-108' coro=<bar() running at <ipython-input-47-44f6b0987f59>:7> wait_for=<Future pending cb=[Task.task_wakeup()]>>}


In [None]:
print_hello = asyncio.create_task(foo())
print_world = asyncio.create_task(bar())

done, pending = await asyncio.wait({print_hello,print_world}, return_when=asyncio.ALL_COMPLETED)

print(done)
print(pending)

hello ...
... world!
{<Task finished name='Task-110' coro=<foo() done, defined at <ipython-input-47-44f6b0987f59>:1> result=None>, <Task finished name='Task-111' coro=<bar() done, defined at <ipython-input-47-44f6b0987f59>:6> result=None>}
set()


**Замечание** - `wait` в отличии от `wait_for` не отменяет задачи.

In [None]:
async def task1():
    await asyncio.sleep(2)
    print("Задача 1 завершилась")


async def task2():
    await asyncio.sleep(3)
    raise ValueError("Исключение в задаче 2")


async def main():
    tasks = {asyncio.create_task(task1()), asyncio.create_task(task2())}

    # Вернуть управление после первой завершившейся задачи
    done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in done:
        print(f"Завершилась задача: {task.get_name()}")

    # Вернуть управление после первого исключения
    _, exceptions = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    for task, exception in exceptions:
        print(f"Исключение в задаче: {task.get_name()}, {exception}")


await main()

Задача 1 завершилась
Завершилась задача: Task-113


ERROR:asyncio:Task exception was never retrieved
future: <Task finished name='Task-114' coro=<task2() done, defined at <ipython-input-50-ad6167489986>:6> exception=ValueError('Исключение в задаче 2')>
Traceback (most recent call last):
  File "<ipython-input-50-ad6167489986>", line 8, in task2
    raise ValueError("Исключение в задаче 2")
ValueError: Исключение в задаче 2


## Проблемы

Давайте рассмотрим какие проблемы часто возникают при многопоточном/асинхронном программировании

1) Состояние гонки (**race condition**) возникает, когда два или более потока одновременно пытаются получить доступ к общей переменной.

2) Взаимная блокировка(**deadlock**) - ситуация, в которой один или несколько потоков ожидают события, которое никогда не произойдет. Наиболее распространенная ситуация - это когда каждый из двух потоков удерживает объект синхронизации, который нужен другому потоку, и у одного потока нет возможности освободить объект, который он удерживает, чтобы позволить другому потоку продолжить работу.

### Race Condition

Рассмотрим такой пример на `race condition`:

In [None]:
async def increment_counter():
    global counter
    current = counter
    await asyncio.sleep(0)
    current += 1
    await asyncio.sleep(0)
    counter = current


async def main():
    global counter
    counter = 0
    coroutines = [increment_counter() for _ in range(10000)] # создаем корутины для обновления counter
    await asyncio.gather(*coroutines) # запускаем их
    print(counter)


await main()

10000


Казалось мы должны были получить в результате 10000 (количетсов корутин), т.к каждая корутина должна была заинкрементить `counter` на 1. Но на выходе получили 1.

Давайте разберемся почему так получилось:
- Мы могли бы ожидать, что каждая корутина будет выполняться как атомарный блок, но этого не произошло.
- Каждая корутина приостанавливалась (как переключение контекста) два раза: один раз после копирования `counter` и один раз после увеличения `current`.
- Первая корутина запускается и приостанавливается в первой точке, вторая корутина запускается и приостанавливается в первой точке и так далее. Это продолжается для всех 10 000 корутин.
- Все они скопировали текущее значение `counter`, равное нулю, в `current`.
- Первая корутина возобновляет работу и увеличивает `current` с нуля до единицы, а затем приостанавливает выполнение. Вторая корутина выполняет то же самое, и так далее для всех 10 000 корутин.
- Все они имеют значение единицы в своих временных переменных.
Наконец, первая корутина копирует `current` в `counter`. Вторая корутина выполняет то же самое и так далее для всех 10 000 корутин.
- `counter` каждый раз устанавливается равной единице.

Для предотвращения гонки данных в нашем примере можно использовать один из примитивов синхронизации - мьютекс (`asyncio.Lock`):

In [None]:
async def increment_counter(lock):
    global counter
    async with lock:
        current = counter
        await asyncio.sleep(0)
        current += 1
        await asyncio.sleep(0)
        counter = current


async def main():
    global counter
    counter = 0
    lock = asyncio.Lock()
    coroutines = [increment_counter(lock) for _ in range(10000)]  # создаем корутины для обновления counter
    await asyncio.gather(*coroutines)  # запускаем их
    print(counter)


await main()

10000


Теперь видим, что у нас код правильно работает. Каждая корутина перед обновлением `counter` захватывает мьютекс с помощью оператора `async with`, что предотвращает гонку данных.

**Рекомендую** ознакомиться с этой замечательной [статьей](https://habr.com/ru/articles/767792/) на Хабре о примитивах синхронизации в Python.

### Deadlock

Теперь давайте рассмотрим несколько примеров с `deadlock`:

Пример с одной корутиной:

In [None]:
async def task(lock):
    print('Task acquiring lock...')
    # acquire the lock
    async with lock:
        print('Task acquiring lock again...')
        # acquire the lock
        async with lock:
            # will never get here
            pass

async def main():
    # create the shared lock
    lock = asyncio.Lock()
    # execute and await the coroutine
    await task(lock)


await main()

Task acquiring lock...
Task acquiring lock again...


CancelledError: 

В приведенном примере возможен deadlock, поскольку корутина `task()` пытается дважды захватить один и тот же асинхронный мьютекс.

Пример с несколькими корутинами:

In [None]:
async def task(number, lock1, lock2):
    print(f'Task {number} acquiring lock 1...')
    async with lock1:
        await asyncio.sleep(1)
        print(f'Task {number} acquiring lock 2...')
        async with lock2:
            pass


async def main():
    lock1 = asyncio.Lock()
    lock2 = asyncio.Lock()
    coro1 = task(1, lock1, lock2)
    coro2 = task(2, lock2, lock1)
    await asyncio.gather(coro1, coro2)

await main()

Task 1 acquiring lock 1...
Task 2 acquiring lock 1...
Task 1 acquiring lock 2...
Task 2 acquiring lock 2...


CancelledError: 

Пример когда не удается снять блокировку

In [None]:
async def task(lock):
    # acquire the lock
    print('Task acquiring lock...')
    await lock.acquire()
    # fail
    raise Exception('Something bad happened')
    # release the lock (never gets here)
    print('Task releasing lock...')
    lock.release()


async def main():
    # create the mutex lock
    lock = asyncio.Lock()
    # create and start the new task
    asyncio.create_task(task(lock))
    # wait a moment
    await asyncio.sleep(1)
    # acquire the lock
    print('Main acquiring lock...')
    await lock.acquire()
    # do something...
    # release lock (never gets here)
    lock.release()

await main()

Теперь рассмотрим некоторые из важных рекомендаций по предотвращению `deadlock`.

1) **Context Managers**

По возможности получайте и снимайте блокировку с помощью контекстного менеджера.
Блокировки могут быть получены вручную с помощью вызова функции `acquire()` в начале критической секции, за которым следует вызов функции `release()` в конце критической секции.

По возможности следует избегать такого подхода:
``` python
# acquire the lock manually
lock.acquire()
# critical section...
# release the lock
lock.release()
```

Традиционно рекомендовалось всегда устанавливать и снимать блокировку в структуре `try-finally`.
Блокировка получена, критическая секция выполняется в блоке try, и блокировка всегда снимается в блоке finally. Например:
```python
# acquire the lock
lock.acquire()
try:
	# critical section...
finally:
	# always release the lock
	lock.release()
```
Но тех пор это было заменено интерфейсом context manager, который обеспечивает то же самое с меньшим количеством кода.
```python
# acquire the lock
with lock:
	# critical section...
```

2) **Используйте тайм-ауты при ожидании**

Всегда используйте тайм-аут при ожидании корутины или задачи.
Многие вызовы, выполняемые с помощью примитивов синхронизации, могут блокироваться.
Например:
Ожидание получения `asyncio.Lock` с помощью `acquire()`.
Ожидание завершения корутины с помощью выражения `await`.
Ожидает получения уведомления о `asyncio.Condition` через `wait()`.

В отличие от потоков и процессов, блокирующие вызовы примитивов синхронизации `asyncio` не принимают аргумент “тайм-аут”. Вместо этого мы должны использовать функцию `asyncio.wait_for()` для ожидания доступной (корутины или задачи) с тайм-аутом. Например:
```python
try:
    await asyncio.wait_for(lock.acquire(), timeout=10)
except asyncio.TimeoutError:
```
Это позволит ожидающей корутине прекратить ожидание по истечении установленного срока, а затем попытаться исправить ситуацию, например, сообщить об ошибке, принудительно завершить работу и т.д.

3) **Устанавливайте блокировки в одном и том же порядке по всему приложению, где это возможно.**

## Примитивы синхронизации в asyncio

При решении проблемы `race condition` на примере выше мы использовали асинхронный мьютекс, который является одним из примитивов синхронизации. Давайте более подробно расскажем о них.

Некоторые ошибки, встречающиеся в многопоточных и многопроцессных приложениях, исключены в силу однопоточной природы `asyncio`, но это не совсем так. Вам редко потребуется прибегать к синхронизации при работе с `asyncio`, однако остаются ситуации, когда с этим нужно что‑то делать. Примитивы синхронизации `asyncio` могут помочь предотвратить ошибки, свойственные только модели однопоточной конкурентности.

### `asyncio.Semaphore`

Иногда нам необходимо получить доступ к ресурсу с ограниченным количеством одновременных запросов. Например, конкретная база данных допускает одновременное открытие только пяти соединений. Или, в зависимости от типа подписки, веб-интерфейс поддерживает только определенное количество одновременных запросов. В этом случае необходимо использовать `asyncio.Semaphore`.

`asyncio.Semaphore` использует внутренний счетчик, который уменьшается на единицу при каждом получении блокировки `Semaphore`, пока не достигнет нуля.

Когда счетчик `asyncio.Semaphore` равен нулю, другие задачи, которым нужна блокировка, будут ждать. При вызове метода `release` после выполнения других задач счетчик будет увеличен на единицу. Ожидающие задачи могут продолжить выполнение.

In [None]:
from asyncio import Semaphore
from aiohttp import ClientSession


async def get_url(url: str, session: ClientSession, semaphore: Semaphore):
    print('Waiting to acquire semaphore...')
    async with semaphore:
        print('Semaphore acquired, requesting...')
        response = await session.get(url)
        print('Finishing requesting')
        return response.status


async def main():
    # Хотя мы запускаем 100 задач, одновременно будут выполняться только 10 задач.
    semaphore: Semaphore = Semaphore(10)
    async with ClientSession() as session:
        tasks = [asyncio.create_task(get_url("https://www.example.com", session, semaphore))
                 for _ in range(100)]
        await asyncio.gather(*tasks)

await main()

Waiting to acquire semaphore...
Semaphore acquired, requesting...
Waiting to acquire semaphore...
Semaphore acquired, requesting...
Waiting to acquire semaphore...
Semaphore acquired, requesting...
Waiting to acquire semaphore...
Semaphore acquired, requesting...
Waiting to acquire semaphore...
Semaphore acquired, requesting...
Waiting to acquire semaphore...
Semaphore acquired, requesting...
Waiting to acquire semaphore...
Semaphore acquired, requesting...
Waiting to acquire semaphore...
Semaphore acquired, requesting...
Waiting to acquire semaphore...
Semaphore acquired, requesting...
Waiting to acquire semaphore...
Semaphore acquired, requesting...
Waiting to acquire semaphore...
Waiting to acquire semaphore...
Waiting to acquire semaphore...
Waiting to acquire semaphore...
Waiting to acquire semaphore...
Waiting to acquire semaphore...
Waiting to acquire semaphore...
Waiting to acquire semaphore...
Waiting to acquire semaphore...
Waiting to acquire semaphore...
Waiting to acquire s

### `asyncio.Event`

`Event` поддерживает внутреннюю булеву переменную в качестве флага. `asyncio.Event` имеет три общих метода: `wait`, `set` и `clear`.


Когда задача добегает до `event.wait()`, она находится в состоянии ожидания. В этот момент можно вызвать `event.set()`, чтобы установить внутренний маркер в `True`, и все ожидающие задачи могут продолжить выполнение.

Когда задача завершится, необходимо вызвать метод `event.clear()`, чтобы сбросить значение маркера в `False`, вернуть событие в исходное состояние, и можно продолжать использовать событие в следующий раз.

In [None]:
async def coro1(event):
    print("coro1: Waiting for the event to be set")
    await event.wait()  # Ждем, пока событие не будет установлено
    print("coro1: Event has been set")

async def coro2(event):
    await asyncio.sleep(10)  # Подождем 2 секунды
    print("coro2: Setting the event")
    event.set()  # Устанавливаем событие

async def main():
    # Создаем объект события
    event = asyncio.Event()

    # Запускаем два таска
    task1 = asyncio.create_task(coro1(event))
    task2 = asyncio.create_task(coro2(event))

    # Ждем, пока оба таска завершат выполнение
    await asyncio.gather(task1, task2)

await main()

### `asyncio.Condition`

`asyncio.Event` хорош, когда нужно просто уведомить о том, что произошло нечто, но бывают ситуации посложнее. Допустим, что по событию требуется получить доступ к разделяемому ресурсу, т.е. захватить блокировку. Или что перед продолжением работы нужно дождаться более сложного сочетания условий, чем простое событие. Или что нужно разбудить не все задачи, а только определенное число. Во всех этих случаях может выручить `asyncio.Condition`.

`asyncio.Condition` похож на `asyncio.Lock` и `asyncio.Event` вместе взятые. Сначала мы используем `async with`, чтобы обеспечить получение блокировки условия, а затем вызываем `condition.wait()`, чтобы освободить блокировку условия и заставить задачу временно подождать.

Когда `condition.wait()` проходит, мы возвращаем блокировку условия, чтобы гарантировать одновременное выполнение только одной задачи. Пока задача временно освобождает блокировку и переходит в состояние ожидания по методу `condition.wait()`, другая задача может либо асинхронизироваться с блокировкой условия и уведомить все ожидающие задачи о необходимости продолжить выполнение по методу `condition.notify_all()`.

In [None]:
from asyncio import Condition


async def do_work(condition: Condition):
    print("do_work: захватываю блокировку условия...")
    async with condition:
        print("do_work: Блокировка захвачена, освобождаю и жду выполнения условия...")
        await condition.wait()
        print("do_work: Условие выполнено, вновь захватываю блокировку и начинаю работать...")
        await asyncio.sleep(1)
        print("do_work: Работа закончена, блокировка освобождена.")


async def fire_event(condition: Condition):
    await asyncio.sleep(5)
    print("fire_event: захватываю блокировку условия...")
    async with condition:
        print("fire_event: Блокировка захвачена, уведомляю всех исполнителей")
        condition.notify_all()
        print("fire_event: Исполнители уведомлены, освобождаю блокировку.")


async def main():
    condition = Condition()
    asyncio.create_task(fire_event(condition))

    await asyncio.gather(do_work(condition), do_work(condition))


if __name__ == "__main__":
    await main()

Иногда мы используем `asyncio.Condition`, чтобы ждать выполнения определенного условия перед переходом к следующему шагу. Мы можем вызвать `condition.wait_for()` и передать метод в качестве аргумента. При каждом вызове `condition.notify_all` метод `condition.wait_for` проверяет результат выполнения переданного метода. Если результат равен `True`, ожидание завершается, если `False` - ожидание продолжается.

Пример использования `condition.wait_for` может быть проиллюстрирован на следующем коде. Мы моделируем подключение к базе данных, проверяя, инициализировано ли соединение, и выполняем запрос, если инициализация завершена. В противном случае код ждет завершения инициализации соединения.

In [None]:
from asyncio import Condition
from enum import Enum

In [None]:
class ConnectionState(Enum):
    WAIT_INIT = 0
    INITIALING = 1
    INITIALIZED = 2

In [None]:
class Connection:
    def __init__(self):
        self._state = ConnectionState.WAIT_INIT
        self._condition = Condition()

    async def initialize(self):
        print("initialize: Preparing initialize the connection.")
        await self._change_state(ConnectionState.INITIALING)
        await asyncio.sleep(5)
        print("initialize: Connection initialized")
        await self._change_state(ConnectionState.INITIALIZED)

    async def execute(self, query: str):
        async with self._condition:
            print("execute: Waiting for connection initialized")
            await self._condition.wait_for(self._is_initialized)
            print(f"execute: Connection initialized, executing query: {query}")
            await asyncio.sleep(5)
            print("execute: Execute finished.")

    async def _change_state(self, state: ConnectionState):
        print(f"_change_state: Will change state from {self._state} to {state}")
        self._state = state
        print("_change_state: Change the state and notify all..")
        async with self._condition:
            self._condition.notify_all()

    def _is_initialized(self):
        if self._state is not ConnectionState.INITIALIZED:
            print("_is_initialized: The connection is not initialized.")
            return False
        print("_is_initialized: The connection is ready.")
        return True

In [None]:
async def main():
    connection = Connection()
    task_one = asyncio.create_task(connection.execute("SELECT * FROM table"))
    task_two = asyncio.create_task(connection.execute("SELECT * FROM other_table"))

    asyncio.create_task(connection.initialize())
    await asyncio.gather(task_one, task_two)
await main()

## Библиотека `concurrent.futures`

Кроме библиотеки `asyncio` в `Python` есть не менее интересная библиотека `concurrent.futures`.

Какие преимущества предоставляет этот модуль?

1) **Простота использования:** `concurrent.futures` предоставляет простой и интуитивно понятный API для запуска задач параллельно. Это позволяет сосредоточиться на решении задачи, а не на деталях многозадачности.

2) **Автоматическое масштабирование:** Модуль позволяет легко масштабировать задачи, выполняемые в пулах потоков (ThreadPoolExecutor) и пулах процессов (ProcessPoolExecutor). Вы можете использовать их в зависимости от характера задачи и доступных ресурсов.

3) **Удобная обработка результатов:** `concurrent.futures` предоставляет `Future` объекты, которые позволяют отслеживать выполнение задач и получать результаты, когда они готовы.

4) **Отсутствие необходимости заботиться о GIL:** В отличие от многих других способов параллельного выполнения в Python, concurrent.futures позволяет избежать проблем, связанных с `Global Interpreter Lock (GIL)`, что делает его отличным выбором для многозадачных приложений.
Подробно [тут](https://habr.com/ru/companies/otus/articles/771346/).

Полезные ссылки по теме семинара:
* Документация Asyncio : https://docs.python.org/3/library/asyncio.html
* Документация Tornado : https://www.tornadoweb.org/en/stable/guide/async.html
* Документация Gevent : http://www.gevent.org/index.html
* Документация Multiproccesing : https://docs.python.org/3/library/multiprocessing.html
* Документация Threading: https://docs.python.org/3/library/threading.html
* GIL: https://wiki.python.org/moin/GlobalInterpreterLock