Что почитать?
- https://bbc.github.io/cloudfit-public-docs/
- https://docs.python.org/3/library/asyncio.html

In [1]:
import asyncio
from dataclasses import dataclass
from datetime import datetime as dt
from decimal import Decimal
from time import sleep

# Посмотрим, как все работает

- Event loop (scheduler) -- рантайм для исполнения `Task`'ок. Тред может иметь только один Event loop.


- `Task` -- контейнер, который используется для конкурентного исполнения `Сoroutine`.


- `Coroutine`
    - _coroutine function_ -- `async def` функция
    - объект, который возвращается при вызове _coroutine function_


- `Awaitable` -- объект, у которого определен метод `__await__` и который, соответственно, может быть использован в выражении:
    ```python
    result = await awaitable, ...
    ```
    Примеры awaitable объектов:
    - `Coroutine`
    - `Task`

## Как запустить корутину?

In [20]:
async def hello(name, wait=3):
    await asyncio.sleep(wait)
    print(f"Hello, {name}!")
    return wait

async def raise_exc(exc):
    raise exc

In [3]:
hello("Python")

<coroutine object hello at 0x7fe3161beb40>

### `asyncio.run`

In [4]:
asyncio.run(hello("Jupyter"))

RuntimeError: asyncio.run() cannot be called from a running event loop

In [5]:
asyncio.get_event_loop()

<_UnixSelectorEventLoop running=True closed=False debug=False>

In [6]:
import threading

In [7]:
def main(f, *args):
    try:
        asyncio.get_event_loop()
    except RuntimeError:
        print("Event loop is not running")
    
    asyncio.run(f(*args))
    
    try:
        asyncio.get_event_loop()
    except RuntimeError:
        print("Event loop is not running")

    print("OK")

In [9]:
t = threading.Thread(target=main, args=(hello, "Thread"))
t.start()
t.join()

Event loop is not running
Hello, Thread!
Event loop is not running
OK


### `await`

In [11]:
await raise_exc(ValueError)

ValueError: 

### `asyncio.create_task`

In [16]:
t = asyncio.create_task(hello("World"))

In [17]:
await t

Hello, World!


3

In [14]:
t.result()

3

In [15]:
t.done()

True

## Tasks

### Создание нескольких тасок

In [21]:
async def create_multiple_tasks():
    print(dt.now())
    t1 = asyncio.create_task(hello("World"))
    t2 = asyncio.create_task(hello("Python"))
    t3 = asyncio.create_task(hello("Asyncio"))
    await t1, t2, t3
    print(dt.now())

In [22]:
await create_multiple_tasks()

2022-04-21 20:21:18.359127
Hello, World!
Hello, Python!
Hello, Asyncio!
2022-04-21 20:21:21.364971


### Отмена тасок

In [23]:
async def foo():
    try:
        await asyncio.sleep(5)
    except asyncio.CancelledError:
        print("you cannot cancel me")
    return 1

In [27]:
c = foo()

In [24]:
t = asyncio.create_task(foo())

In [25]:
t.cancel()

True

you cannot cancel me


In [None]:
await t

## Async Context Manager

In [None]:
async with AsyncCM as ctx:
    ...

то же, что и:

In [None]:
ctx = await AsyncCM.__aenter__()
try:
    ...
except Exception as e:
    if not await AsyncCM.__aexit__(type(e), e, e.__traceback__):
        raise e
else:
    await AsyncCM.__aexit__(None, None, None)

Асинхронный контекстный менеджер должен определять следующие методы:

In [None]:
async def __aenter__(self):
    ...

async def __aexit__(self, exc_t, exc_v, exc_tb):
    ...

## Примитивы синхронизации

https://docs.python.org/3/library/asyncio-sync.html

### `Lock`

In [43]:
@dataclass
class BankAccount:
    name: str
    balance: Decimal

In [44]:
A = BankAccount(name="Alice", balance=100)
B = BankAccount(name="Bob", balance=100)

In [45]:
async def select_balance(acc):
    await asyncio.sleep(0.1) # идем в базу
    return acc.balance

async def update_balance(acc, amount):
    await asyncio.sleep(0.1) # идем в базу
    acc.balance += amount

async def transfer_money(src, dst, amount):
    if await select_balance(src) >= amount:
        await update_balance(src, -amount)
        await update_balance(dst, amount)

In [46]:
await asyncio.gather(transfer_money(A, B, 100), transfer_money(A, B, 100))
A, B

(BankAccount(name='Alice', balance=-100), BankAccount(name='Bob', balance=300))

### Semaphore

In [53]:
async def gather_with_concurrency(conc, *aws):
    sem = asyncio.Semaphore(conc)  # 3

    async def sem_task(aw):
        async with sem:
            return await aw

    return await asyncio.gather(*(sem_task(aw) for aw in aws))

In [54]:
await gather_with_concurrency(3, *[hello(i) for i in range(10)])

Hello, 0!
Hello, 1!
Hello, 2!
Hello, 3!
Hello, 4!
Hello, 5!
Hello, 6!
Hello, 7!
Hello, 8!
Hello, 9!


[3, 3, 3, 3, 3, 3, 3, 3, 3, 3]

# Примеры

## Background job

In [32]:
import asyncio

async def get_some_values_from_io():
    await asyncio.sleep(0.1)  # например, HTTP запрос
    return [0]

vals = []

async def fetcher():
    while True:
        io_vals = await get_some_values_from_io()
        for val in io_vals:
            vals.append(val)

async def monitor():
    while True:
        print(len(vals))
        await asyncio.sleep(1)

t1 = asyncio.create_task(fetcher())
t2 = asyncio.create_task(monitor())

0
9
19
28
38
48
57
67
77
86
96
106
115
125
135
144
154
164
173
183
192
202
212
221
231
241
250
260
270
279
289
299
308
318
328


In [33]:
t1.cancel()
t2.cancel()

True

---

In [35]:
import sys

In [36]:
sys.executable

'/Users/iporyadkov/anaconda2/envs/py38-poetry/bin/python'

In [37]:
! $sys.executable -m pip install aiofiles aiohttp



## Чтение из файла

https://github.com/Tinche/aiofiles

In [38]:
import aiofiles

In [39]:
async with aiofiles.open("./ASYNCIO.ipynb") as f:
    content = await f.read(10)
print(content)

{
 "cells"


## HTTP запрос

https://docs.aiohttp.org/en/stable/

In [40]:
import aiohttp

In [41]:
async with aiohttp.ClientSession() as session:
    async with session.get("http://python.org") as response:

        print("Status:", response.status)
        print("Content-type:", response.headers["content-type"])

        html = await response.text()
        print("Body:", html[:15], "...")

Status: 200
Content-type: text/html; charset=utf-8
Body: <!doctype html> ...


## Запуск процесса

https://docs.python.org/3/library/asyncio-subprocess.html

In [42]:
async def run(cmd):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    stdout, stderr = await proc.communicate()

    print(f'[{cmd!r} exited with {proc.returncode}]')
    if stdout:
        print(f'[stdout]\n{stdout.decode()}')
    if stderr:
        print(f'[stderr]\n{stderr.decode()}')

# asyncio.run(run("ls /zzz"))
await run("ls /zzz")

['ls /zzz' exited with 1]
[stderr]
ls: /zzz: No such file or directory

