Материалы:

- https://www.youtube.com/watch?v=maMReTuUOWA

- http://uneex.org/LecturesCMC/PythonIntro2020/14_Async

- https://www.youtube.com/watch?v=5SyA3lsO_hQ

- https://docs.python.org/3/library/asyncio.html

- https://realpython.com/async-io-python/#the-asyncio-package-and-asyncawait

- https://realpython.com/python-concurrency/#how-to-speed-up-an-io-bound-program

- Fluent Python 2021 pre-release, Luciano Ramalho главы 19-21

# Speeding up

Перед обсуждением более сложных концептов, вспомним необходимые термины и зайдем чуть издалека

## Generators and coroutines

### Gen. functions with yield

#### Функция-генератор

In [None]:
def gen_fun():
    yield 42
    yield 43
    yield 44

In [None]:
for v in gen_fun():
    print(v)

#### Объект-генератор

In [None]:
obj_fun = gen_fun()

In [None]:
next(obj_fun)

In [None]:
obj_fun.__next__()

In [None]:
obj_fun.send(None)

In [None]:
next(obj_fun)

#### Пример стрима

Генераторы могут быть полезны, когда вы хотите сделать некоторую бесконечную последовательность и работать с ней "на лету".
Можно делать стримы. Например, нагенерировать последовательно Фибоначчи в виде генератора, постепенно читать из больших файлов и т.д.

Давайте попробуем написать такую последовательность!

**Task. Напишите бесконечный генератор простых чисел**

In [None]:
from math import sqrt, floor


def create_prime_stream():
    n = 1
    while True:
        n += 1
        for i in range(2, floor(sqrt(n))+1):
            if n % i == 0:
                break
        else:
            yield n

In [None]:
prime_stream = create_prime_stream()

In [None]:
for prime_value in prime_stream:
    print(prime_value)

#### Можно делать и return

In [11]:
def gen_fun():
    yield 42
    yield 43
    yield 44
    return 'string'

In [12]:
obj_fun = gen_fun()

In [13]:
next(obj_fun)

42

In [14]:
next(obj_fun)

43

In [15]:
next(obj_fun)

44

In [16]:
try:
    next(obj_fun)
except StopIteration as e:
    print(e.value)
    raise(e)

string


StopIteration: string

### Coroutines

#### Пример работы yield "на вход"

In [1]:
from collections.abc import Generator

In [18]:
def averager() -> Generator:
    total = 0.
    count = 0
    avg = 0.
    while True:
        curr = yield avg
        total += curr
        count += 1
        avg = total / count

In [19]:
coro_avg = averager()
next(coro_avg)

0.0

In [20]:
coro_avg.send(10)

10.0

In [21]:
coro_avg.send(15)

12.5

Очень похоже на рассмотренный ранее на курсе пример с замыканием. Корутина хранит состояние.

В Python корутины -- это по сути те же генераторы, только мы еще помимо считывания данных, их туда дописываем

In [22]:
coro_avg.send(20)

15.0

In [23]:
coro_avg.close()

In [24]:
coro_avg.send(20)

StopIteration: 

Неформально, **корутина** (или сопрограмма в старой терминологии) -- это такая функция, в которую можно входить много раз и каждый раз попадать именно в то место, из которого в прошлый раз выходил. А также при очеденом входе в такую функцию можно добавлять новые данные. После частичного выхода из сопрограммы, ее код перестает выполняться.

Когда выполнение **корутины** приостанавливается, может происходить переключение на другие задачи.

Итого, в python **корутина** -- это функция, которая может приостановить свое выполение до своего формального завеешения, и на некоторое время передавать ресурсы программы на выполнение другой **корутины**

#### yield from

Также обратим внимание на конструкцию **yield from**

In [2]:
from time import sleep
def secondary():
    for i in range(10):
        yield f"SEC {i}"

def primary(name="Prim"):
    while True:
        yield from secondary()
        yield name

In [26]:
core = primary()

n = 12
for i in range(n):
    sleep(1)
    print(f'{i} {next(core)}')

0 SEC 0
1 SEC 1
2 SEC 2
3 SEC 3
4 SEC 4
5 SEC 5
6 SEC 6
7 SEC 7
8 SEC 8
9 SEC 9
10 Prim
11 SEC 0


Что если сделать два таких primary исполнителя?

In [3]:
from random import random

def secondary(src_name):
    yield f'SEC 1 from {src_name}'
    yield f'SEC 2 from {src_name}'

def primary(name='prim'):
    for i in range(1000):
        yield from secondary(name)
        yield f'{name}-{i}'


core_one = primary('first prim')
core_two = primary('second prim')

n = 30
threshold = 0.8

for i in range(n):
    sleep(1)
    if random() > threshold:
        res = next(core_one)
    else:
        res = next(core_two)
    print(f'{i}| {res}')

0| SEC 1 from first prim
1| SEC 1 from second prim
2| SEC 2 from second prim


KeyboardInterrupt: 

Обратите внимание на глобальный счетчик. В каждом такте цикла мы возвращаемся к выполнению либо *core_one*, либо *core_2* в зависимости от выбранной вероятности. Один такт цикла -- один отрабатываемый yield

#### Добавляем return

А что если было возвращаемое значение?

In [28]:
def secondary(src_name, i):
    yield f'SEC 1 from {src_name} #{i}'
    yield f'SEC 2 from {src_name} #{i}'
    return src_name * i
    
def primary(name='prim'):
    for i in range(1000):
        res = yield from secondary(name, i)
        print(f'secondary-{i}: {res}')
        
core = primary()

n = 10
for i in range(n):
    sleep(1)
    print(f'{i} | {next(core)}')

0 | SEC 1 from prim #0
1 | SEC 2 from prim #0
secondary-0: 
2 | SEC 1 from prim #1
3 | SEC 2 from prim #1
secondary-1: prim
4 | SEC 1 from prim #2
5 | SEC 2 from prim #2
secondary-2: primprim
6 | SEC 1 from prim #3
7 | SEC 2 from prim #3
secondary-3: primprimprim
8 | SEC 1 from prim #4
9 | SEC 2 from prim #4


#### "Общение" генераторов с попощью send

Также генераторы могут друг другу пересылать данные через **.send**

In [None]:
def value_computer(val):
    x = yield f'computer: get val x '
    y = yield f'computer: get val y '
    return x ** 2 + y

def value_provider():
    res_one = yield from value_computer(1)
    print(f'provider: got {res_one} from computer')
    res_two = yield from value_computer(2)
    print(f'provider: got {res_two} from computer')
    
    return res_one + res_two + 100

command = value_provider()
req = next(command)

for i in range(4):
    data = eval(input(req))
    req = command.send(data)  # send в computer, у которого больше нет yield

#### Подходим к async своими руками

Теперь значения будем не руками передавать, а делать это из цикла. Например, если у нас сгенерирован int, будем засылать рандомную цифру, str -- букву

Делаем асинхронность своими руками

In [None]:
import random
from collections import deque


def computer():
    '''
    умножаем строчку на число
    '''
    return (yield int) * (yield str)

def repeater(n):
    '''
    склеиваем n строчек
    '''
    res = ''
    for i in range(n):
#         print(f'repeater before iteration {i}: {res}')
        res += yield from computer()
#         print(f'repeater after iteration {i}: {res}')
    
    return res

def runner(*commands):
    curr_queue = deque((command, None) for command in reversed(commands))
    res = []
    
    while curr_queue:
#         print(curr_queue, res)
        cmd, req_type = curr_queue.pop()
#         print(cmd, req_type)
        try:
            if req_type is str:
                random_char = random.choice('ABCDRTYUWETDGH')
#                 print(f'computer got STR -> send {random_char}')
                req_type = cmd.send(random_char)
            elif req_type is int:
                random_int = random.randint(1, 7)
#                 print(f'computer got INT -> send {random_int}')
                req_type = cmd.send(random_int)
            elif req_type is None:
                req_type = next(cmd)
            else:
                raise ValueError(req_type)
        except StopIteration as e:
            res.append(e.value)
            cmd.close()
            # когда получили StopIteration, генератор еще существует
            # закрытие как защита от непредвиденного поведения
        else:
            curr_queue.appendleft((cmd, req_type))
    return res

In [None]:
runner(repeater(3))

In [None]:
print(runner(repeater(10), repeater(2), repeater(6)))

Код выполняется в произвольном порядке (но все еще контролируемым программистом!)

*А теперь следите за руками*

In [1]:
import asyncio
import types

In [31]:
import random
from collections import deque

@types.coroutine
def computer():
    '''
    умножаем строчку на число
    '''
    return (yield int) * (yield str)

async def repeater(n):
    '''
    склеиваем n строчек
    '''
    res = ''
    for i in range(n):
        res += await computer()
    
    return res

def runner(*commands):
    curr_queue = deque((command, None) for command in reversed(commands))
    res = []
    
    while curr_queue:
        cmd, req_type = curr_queue.pop()
        try:
            if req_type is str:
                req_type = cmd.send(random.choice('ABCDRTYUWETDGH'))
            elif req_type is int:
                req_type = cmd.send(random.randint(1, 7))
            elif req_type is None:
                req_type = cmd.send(None)
            else:
                raise ValueError(req_type)
        except StopIteration as e:
            res.append(e.value)
            cmd.close()
            # когда получили StopIteration, генератор еще существует
            # закрытие как защита от непредвиденного поведения
        else:
            curr_queue.appendleft((cmd, req_type))
    return res

In [None]:
print(runner(repeater(10), repeater(2), repeater(6)))

Мы сейчас практически из кода на чистом питоне перешли к использованию **asyncio**!  

## Concurrency, processes, threads кто такие

**concurrency**

The ability to handle multiple pending tasks, making progress one at a time or in parallel (if possible) so that each of them eventually succeeds or fails. A single-core CPU is capable of concurrency if it runs an OS scheduler that interleaves the execution of the pending tasks. Also known as multitasking

**parallelism**

The ability to execute multiple computations at the same time. This requires a multi-core CPU, a GPU, or multiple computers in cluster.

**process**

An instance of a computer program while it is running, using memory and a slice of the CPU time. Modern operating systems are able to manage many processes concurrently, with each process isolated in its own private memory space. Processes communicate via pipes, sockets, or memory mapped files—all of which can only carry raw bytes, not live Python objects. A process can spawn sub-processes, each called achild process. These are also isolated from each other and from the parent.

Каждый инстанс интерпретатора Python -- это процесс. Для создания процессов в своей программе можно использовать модуль *multiprocessing*

**thread**

An execution path within a single process. When a process starts, it uses a single thread: the main thread. Using operating system APIs, a process can create more threads that operate concurrently thanks to the operating system scheduler. Threads share the memory space of the process, which holds live Python objects. This allows easy communication between threads, but can also lead to corrupted data when more than one thread updates the same object concurrently.

Python (CPython) использует один тред. Можно добавлять новые треды в программу с помощью модуля *threading*

**contention**

Dispute over a limited asset. Resource contention happens when multiple processes or threads try to access a shared resource—such as a lock or storage. There’s also CPU contention, when compute-intensive processes or threads must wait for their share of CPU time.

**lock**

An object that threads can use to coordinate and synchronize their actions and avoid corrupting data. While updating a shared data structure, a thread should hold an associated lock. This makes other well-behaved threads wait until the lock is released before accessing the same data structure. The simplest type of lock is also known as a mutex (for mutual exclusion).

В Python есть такая вещь как GIL (Global Interpreter Lock). В один момент времени только один тред может держать данный лок

- Интерпретатор каждые несколько мс делает паузы, освобождая лок и позволяя другому ожидающему треду (при наличии такового) продолжить работу

- Мы не управляем GIL явно и в полной мере

- GIL освобождается во время функций, производящих disk и network I/O, а также time.sleep()

- Библиотеки, написанные на C/C++  могут обходить ограничения GIL и улучшать производительность (e.g. numpy)

---

В качестве упражнения подумайте, как соотносятся между собой термины *Parallelism*, *Asynchronous programming*, *Threading*, *Multiprocessing*, *Concurrency*

## AsyncIO введение

Выше мы с вами увидели, что от асинхронного кода, реализованного на генераторах, можно перейти к использованию стандратного модуля **AsyncIO**

Все потому, что в основе **asyncio** стоит понятие *корутины*. А выше мы с вами уже увидели, что корутины можно реализовать с помощью обычных генераторов. Данный модуль позволяет сильно упростить реализацию асинхронного кода и "отделить" для пользователя понятие корутин от классических генераторов.  



### К примерам

#### Первая программа с async

In [32]:
from time import sleep

def printer(text: str):
    print(text)

def sleeper(duration: int):
    sleep(duration)
    
def main():
    sleeper(5)
    printer('Woke up')

Запустим синхронно (последовательно)

In [33]:
%%time

main()

Woke up
Wall time: 5.01 s


Заиспользуем **asyncio**

In [34]:
import asyncio

async def printer(text: str):
    print(text)
    
async def sleeper(duration: int):
    print(f'sleeping for {duration} seconds')
    await asyncio.sleep(duration)

def main():
    sleeper(5)
    printer('Woke up')

In [None]:
main()# код на семинаре

In [None]:
def main():
    await sleeper(5)
    await printer('Woke up')

In [None]:
async def main():
    await sleeper(5)
    await printer('Woke up')

In [None]:
await main()

In [41]:
%%writefile async_intro_intro_1.py

import asyncio

async def printer(text: str):
    print(text)
    
async def sleeper(duration: int):
    print(f'sleeping for {duration} seconds')
    await asyncio.sleep(duration)

async def main():
    sleeper(5)
    printer('Woke up')

async def main():
    print("Runnint wait")
    task_sleep = asyncio.create_task(sleeper(5))
    print('slept')
    task_print = asyncio.create_task(print('woke up'))

    print('Finish')
    await task_sleep
    await task_print

asyncio.run(main())

Overwriting async_intro_intro_1.py


In [42]:
! python async_intro_intro_1.py

Runnint wait

Traceback (most recent call last):
  File "async_intro_intro_1.py", line 25, in <module>
    asyncio.run(main())
  File "D:\Programs\Anaconda\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "D:\Programs\Anaconda\lib\asyncio\base_events.py", line 616, in run_until_complete
    return future.result()
  File "async_intro_intro_1.py", line 19, in main
    task_print = asyncio.create_task(print('woke up'))
  File "D:\Programs\Anaconda\lib\asyncio\tasks.py", line 382, in create_task
    task = loop.create_task(coro)
  File "D:\Programs\Anaconda\lib\asyncio\base_events.py", line 431, in create_task
    task = tasks.Task(coro, loop=self, name=name)
TypeError: a coroutine was expected, got None



slept
woke up
sleeping for 5 seconds


## AsyncIO пример работы с сетью

In [None]:
# ! pip install requests, aiohttp

Выгрузим с гитхаба в отдельную папку примеры репозиториев, в которых встречаются заданные кейворды

In [44]:
import time
from typing import Callable
import requests

import os

BASE_DIR = 'github_downloads'
URL = 'https://api.github.com/search/repositories'
KEYWORDS = ['search', 'torch', 'tensorflow',
            'requests', 'alice', 'data', 'deep', 'ML']

def get_repos_data(query: str) -> dict:
    response = requests.get(
        URL,
        params={'q': f'{query}+language:python'},
    )
    if response.ok:
        return response.json()
    else:
        return {}


def get_owners_ids(resp: dict) -> list:
    owners = []
    for item in resp.get('items', []):
        owners.append(item['owner']['id'])
    return owners


def save_stats(ids: list, query_name: str):
    os.makedirs(BASE_DIR, exist_ok=True)
    with open(os.path.join(BASE_DIR, query_name), 'w') as f:
        f.write(str(ids))


def get_many_stats(queries: list) -> int:
    for query in queries:
        data = get_repos_data(query)
        repo_owners = get_owners_ids(data)
        save_stats(repo_owners, query)
    return len(queries)

def main(downloader: Callable[[list], dict]):
    t_start = time.perf_counter()
    count = downloader(KEYWORDS)
    elapsed = time.perf_counter() - t_start
    print(f'\n{count} searches done in {elapsed:.2f}s')

if __name__ == '__main__':
    main(get_many_stats)


8 searches done in 13.97s


Как сделать код эффективнее?

## AsyncIO progress-bar

Разберем забавный пример на показ загрузки по время выполнения программы

Пусть есть такая заготовка (с ошибками). Что нужно изменить, чтобы программа заработала?

In [60]:
%%writefile task_loading_async.py

import asyncio
import itertools
import time

FPS = 5

async def print_load(msg: str) -> None:
    for smile in itertools.cycle(['(⊙﹏⊙)', '(Ф﹏Ф)']):  # infinite loop
        status = f'\r{smile} {msg}'
        print(status, flush=True, end='')
        
        await asyncio.sleep(1 / FPS)

#         try:
#         except asyncio.CancelledError:
#             print('Task is cancelled')
#             break

    blanks = ' ' * 50
    print(f'\r{blanks}\r', end='')

async def do_smth_slow() -> int:
    await asyncio.sleep(30)
    return 1337

async def supervisor() -> int:
    loading_icon_displayer_task = asyncio.create_task(print_load('loading...'))
    slow_result_task = asyncio.create_task(do_smth_slow)
    slow_result = await slow_result_task
#     await loading_icon_displayer_task
    await slow_result
#     loading_icon_displayer.cancel()
#     return slow_result

def main() -> None:
    res = asyncio.run(supervisor())
    print(f'Result is {res}')

if __name__ == '__main__':
    main()


Overwriting task_loading_async.py


In [61]:
! python task_loading_async.py

Traceback (most recent call last):
  File "task_loading_async.py", line 41, in <module>
    main()
  File "task_loading_async.py", line 37, in main
    res = asyncio.run(supervisor())
  File "D:\Programs\Anaconda\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "D:\Programs\Anaconda\lib\asyncio\base_events.py", line 616, in run_until_complete
    return future.result()
  File "task_loading_async.py", line 29, in supervisor
    slow_result_task = asyncio.create_task(do_smth_slow)
  File "D:\Programs\Anaconda\lib\asyncio\tasks.py", line 382, in create_task
    task = loop.create_task(coro)
  File "D:\Programs\Anaconda\lib\asyncio\base_events.py", line 431, in create_task
    task = tasks.Task(coro, loop=self, name=name)
TypeError: a coroutine was expected, got <function do_smth_slow at 0x00000212D83A54C0>
Task exception was never retrieved
future: <Task finished name='Task-2' coro=<print_load() done, defined at task_loading_async.py:8> exception=Un

In [6]:
%%writefile task_loading_async_solved.py

import asyncio
import itertools
import time

FPS = 5

async def print_load(msg: str) -> None:
    for smile in itertools.cycle(['(⊙﹏⊙)', '(Ф﹏Ф)']):  # infinite loop
        status = f'\r{smile} {msg}'
        print(status, flush=True, end='')

        try:
            await asyncio.sleep(1 / FPS)
        except asyncio.CancelledError:
            print('Task is cancelled')
            break

    blanks = ' ' * 50
    print(f'\r{blanks}\r', end='')

async def do_smth_slow() -> int:
    await asyncio.sleep(30)
    # time.sleep(30)
    return 1337

async def supervisor() -> int:
    loading_icon_displayer = asyncio.create_task(print_load('loading...'))
    slow_result_task = asyncio.create_task(do_smth_slow())
    slow_result = await slow_result_task
    loading_icon_displayer.cancel()
    return slow_result

def main() -> None:
    res = asyncio.run(supervisor())
    print(f'Result is {res}')

if __name__ == '__main__':
    main()

Overwriting task_loading_async_solved.py


In [7]:
! python task_loading_async_solved.py


(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading...
(Ф﹏Ф) loading...
(⊙﹏⊙) loading

## Примеры на использование тредов и процессов

Модуль [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html)

Модуль [threading](https://docs.python.org/3/library/threading.html?highlight=threading#module-threading)

Модуль [multiprocessing](https://docs.python.org/3/library/multiprocessing.html?highlight=multiprocessing#module-multiprocessing)

`Часто для выполнение функций на больших массисивах встречал использование multiprocessing.Pool.map и библиотеки joblib`

In [None]:
from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

In [None]:
help(Process.start)

In [None]:
help(Process.join)

In [None]:
from threading import Thread

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Thread(target=f, args=('bob',))
    p.start()
    p.join()

Чтобы закрыть тред, нужно явно передать ему команду. Можно создать объект Event, и делать это через него

    The threading.Event class is Python’s simplest signalling mechanism to coordinate threads. An Event instance has an internal boolean flag which starts as False. Calling Event.set() sets the flag to True. While the flag is false, if a thread calls Event.wait(), it is blocked until another thread calls Event.set(), at which time Event.wait() returns True. If a timeout in seconds is given to Event.wait(s), this call returns False when the timeout elapses

### Разберем примеры **loading** с использованием тредов и процессов

- Заметим, что в threading.Thread по своей логике похож на asyncio.Task
- Чтобы остановить тред, нужно явно передать ему сигнал. Например, done через Event. В asyncio есть Task.Cancell

Как могут сказаться отличия работы с памятью в программах на треды и процессы, если добавить туда относительно тяжелый список? Попробуйте сделать предположение без запуска кода

Что будет, если в серии программ **loading_.** заменить медленную функцию на такую?

In [None]:
def is_prime(n: int) -> bool:
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    root = math.isqrt(n)
    for i in range(3, root + 1, 2):
        if n % i == 0:
            return False
    return True

Как это повлияет на работу в версии с разными процессами, тредами или async? Почему? Попробуйте догадаться без запуска кода