# Продвинутый Python, лекция 12

**Лектор:** Петров Тимур

**Семинаристы:** Петров Тимур, Коган Александра, Бузаев Федор, Дешеулин Олег

**Spoiler Alert:** в рамках курса нельзя изучить ни одну из тем от и до досконально (к сожалению, на это требуется больше времени, чем даже 3 часа в неделю). Но мы попробуем рассказать столько, сколько возможно :)

## Threading

В первую очередь надо понять, что такое потоки (и в чем разница от процессов и корутин)

Процесс - это инстанс программы, где у вас есть выполняемый код и собственные ресурсы памяти. У каждого процесса прямой доступ есть только к своим ресурсам. А поток (thread) - это разделение выполнения внутри процесса, где у всех потоков общий ресурс памяти (память процесса)

Можно ли создавать бесконечное число тредов? Можно сколько угодно сделать (но надо ли?)

Зачем нужны потоки? Ответ простой: таким образом мы можем выполнять одновременно несколько операций (это быстрее) + независимость (в отличие от корутин)

Внутри Python за это отвечает библиотека [threading](https://docs.python.org/3/library/threading.html)

На примере:

In [None]:
import threading
import sys


def thread_job(number):
    print('Hello {}'.format(number)) # Выводим в stdout


def run_threads(count):
    threads = [
        threading.Thread(target=thread_job, args=(i,)) # создаем по потоку, назначаем, что он должен делать, можем отдельно передавать аргументы
        for i in range(0, count)
    ]
    for thread in threads:
        thread.start()  # каждый поток должен быть запущен
    for thread in threads:
        thread.join()  # дожидаемся исполнения всех потоков (можем не дожидаться, дождаться надо, потому что может закончиться родительский тред)


run_threads(4)
print("finish") #А тут уже что-то перемешалось (потому что поток вывода один)

Hello 0
Hello 1
Hello 2Hello 3

finish


### Mutex

Давайте попробуем запустить вот такой код:

In [None]:
import threading
import random
import time
import sys


def thread_job():
    global counter
    old_counter = counter
    time.sleep(random.randint(0, 1)) #засыпаем, чтобы вызвать проблему
    counter = old_counter + 1
    print('{} '.format(counter), end='')
    sys.stdout.flush()


counter = 0
threads = [threading.Thread(target=thread_job) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print(counter)

1 1 1 1 1


Почему-то не получается 4. А как вы думаете, почему? А все потому что поток принимает текущее значение global переменной и добавляет 1 и возвращает результат. То есть таким образом они перезаписывают результат и все, мы получаем вот такую фигню. Такое состояние называется race condition

Как решить подобную проблемы? Для этого есть Lock (более частое название - mutex). Что такое mutex? По существу, это замочек

Когда поток забирает данные, мы говорим: "Я забронил, вы не можете прикасаться к этим данным". Когда же мы сделали все операции, мы освобождаем данные и говорим: ну все, можно пользоваться. На картинке:

![](https://camo.githubusercontent.com/aff3fa583e71fd028e5850a0513f59805d1b758cd6acc03c63e2dc15d49e3ada/687474703a2f2f616e746b6f7277696e2e636f6d2f636f6e63757272656e63792f646961672d30363732383334613737333762623332333939306161626533626362356365362e706e67)



Внутри Threading есть имплементация мьютекса, который называется Lock() и с ним очень удобно работать через контекстный менеджер (заходим - блокируем, выходим - освобождаем)

In [None]:
import threading
import random
import time
import sys


def thread_job():
    with lock: # Входим и блокируем ресурсы
        global counter
        old_counter = counter
        time.sleep(random.randint(0, 1))
        counter = old_counter + 1
        print('{} '.format(counter), end='')
        sys.stdout.flush()


lock = threading.Lock()
counter = 0
threads = [threading.Thread(target=thread_job) for _ in range(8)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print(counter)

1 2 3 4 5 6 7 8 8


Если хочется делать руками блокирование - разблокирование, то есть две функции:

* acquire() - локаем

* release() - делаем разлок

Но обратите внимание - если не делать разлок, то все остальные треды будут в максимальном ожидании и ничего не случится (потому что ресурсы заблокированы другим тредом)

In [None]:
import threading
import random
import time
import sys


def thread_job():
    lock.acquire()
    global counter
    old_counter = counter
    time.sleep(random.randint(0, 1))
    counter = old_counter + 1
    print('{} '.format(counter), end='')
    sys.stdout.flush()


lock = threading.Lock()
counter = 0
threads = [threading.Thread(target=thread_job) for _ in range(8)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join() # будем ждать бесконечность
print(counter)

1 

KeyboardInterrupt: ignored

Но есть более лучшие вещи, например, очереди! (которая является thread-safe, то есть можно использовать спокойно для потоков)

Идея: создаем очередь на вход и выход. Забираем тред из очереди на вход, кидаем его в очередь на выход


In [None]:
import queue

def adder(arr, part_id, thread_count, queue_out):
    queue_out.put(sum(arr[i] for i in range(part_id, len(arr), thread_count)))


def sum_threads(arr, thread_count):
    queue_out = queue.Queue()
    threads = [
        threading.Thread(target= lambda i=i: adder(arr, i, thread_count, queue_out))
        for i in range(thread_count)
    ]
    for thread in threads:
        thread.start()
    result = []
    for thread in threads:
        result.append(queue_out.get())
        thread.join()
    return sum(result)

In [None]:
arr = [1 for _ in range(10**7)]

In [None]:
%%time
sum(arr[i] for i in range(len(arr)))

CPU times: user 840 ms, sys: 0 ns, total: 840 ms
Wall time: 844 ms


10000000

In [None]:
%%time

sum_threads(arr, 4)

CPU times: user 861 ms, sys: 8.54 ms, total: 870 ms
Wall time: 989 ms


10000000

А почему не в 4 раза улучшение?..

### GIL и проблемы

![](https://uwpce-pythoncert.github.io/SystemDevelopment/_images/gil.png)

На самом деле CPython - популярная реализация интерпретатора - имеет встроенный механизм, который обеспечивает выполнение ровно одного потока в любой момент времени. GIL облегчает реализацию интерпретатора, защищая объекты от одновременного доступа из нескольких потоков. По этой причине, создание несколько потоков не приведет к их одновременному исполнению на разных ядрах процессора. (и в этом проблема большая, Python не предназначен для этого)

То есть распараллелить подсчеты в помощью Python невозможно... можно задать логичный вопрос, а нафига оно тогда? Ответ простой: просто не для такой работы он нужен)

In [None]:
import requests

urls = ['http://wiki.cs.hse.ru/Заглавная_страница', 'https://ya.ru/', 'https://docs.python.org/3/library/multiprocessing.html',
        'https://colab.research.google.com/', 'https://www.youtube.com/', 'https://mail.ru/']

In [None]:
def read_url(url):
    return requests.get(url).text

In [None]:
%%time

for url in urls:
    read_url(url)

CPU times: user 137 ms, sys: 19.2 ms, total: 156 ms
Wall time: 4.87 s


In [None]:
%%time

threads = [
    threading.Thread(target=lambda url=url: read_url(url))
    for url in urls
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

CPU times: user 128 ms, sys: 16.1 ms, total: 144 ms
Wall time: 1.56 s


То есть параллелим мы запросы, которые требуют "ничего не ожидания" (а в таких случаях GIL отпускается)

В каких еще случаях GIL отпускается?

* Задачи, связанные с вводом-выводом (чтение файла)

* Операции с базами данных

* HTTP-запросы

## Multiprocessing

Библиотека [multiprocessing](https://docs.python.org/3/library/multiprocessing.html) позволяет организовать параллелизм вычислений за счет создания подпроцессов. Так как каждый процесс выполняется независимо от других, этот метод параллелизма позволяет избежать проблем с GIL.

В чем разница? В данном случае внутри процесса создаем подпроцессы (дробим процесс, то есть и память и все остальное). И в этом смысле мы реально делим на ядра (каждое ядро выполняет свою часть). Разбивая процесс на подпроцессы мы создаем отдельно интерпретаторы для каждой части, которые выполняются независимо (такая реальная параллельность), но при этом они могут друг с другом перекидываться данными

Число процессов ограничивается число ядер. Как узнать число ядер:

In [None]:
import multiprocessing as mp

print(mp.cpu_count())

2


Давайте создавать различные процессы!

In [None]:
result = []

def f():
    result.append("Hi")

processes = [mp.Process(target=f) for _ in range(2)]
for p in processes:
    p.start()
for p in processes:
    p.join()
print(result)

[]


А чего это ничего не получилось? Потому что каждый процесс - это процесс (соответственно, своя память, свои локальные переменные, обмена никакого нет, получаем пустоту)

Как бы этого избежать? Для этого есть такая штука как Pool() - это связанный набор воркеров, которые обмениваются друг с другом данными и полученными результатами:

In [None]:
def f(x):
    return x ** 2

with mp.Pool() as pool:
    result = pool.map(f, range(10)) # аналог функции map, только для процессов
result

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

In [None]:
arr = [1 for _ in range(10**7)]

In [None]:
%%time

sum(arr[i] for i in range(len(arr)))

CPU times: user 903 ms, sys: 0 ns, total: 903 ms
Wall time: 913 ms


10000000

In [None]:
size = len(arr)
process_count = mp.cpu_count()
part_size = size // process_count
array_parts = [
    arr[i * part_size: (i + 1) * part_size]
    for i in range(process_count)
]

In [None]:
%%time
with mp.Pool(process_count) as pool:
    pool.map(sum, array_parts)

CPU times: user 321 ms, sys: 284 ms, total: 605 ms
Wall time: 1.95 s


Получили не сильно больше, а почему? А потому что обмен данными между подпроцессами - удовольствие дорогое (как можно видеть по sys)

Зачем это все тогда нужно? Это нужно, если вы хотите запустить несколько процессов, которые друг с другом не связаны, например, и таким образом вам не надо передавать много данных (а задачи сами по себе сложные)

## Асинхронность

Что такое асинхронность? Представим себе программу, которая запрашивает данные на каком-либо сервере. Отправляется запрос... ждем ... ждем ... и получаем ответ.

Что происходит в момент ожидания? Ничего, мы просто ждем результата. А вообще было бы классно не тратить время попусту (как и ресурсы), а делать что-то еще (как будто наше ожидание происходит на фоне). В этом и суть асинхронности!

У нас есть несколько функций, которые мы вызываем, но не ожидаем результат прямо сейчас. Внутри асинхронного программирования - идея конкурентности (concurrency) — две или более задачи могут запускаться, выполняться и завершаться в перекрывающиеся периоды времени.

Для этого в Python есть библиотека [asyncio](https://docs.python.org/3/library/asyncio.html), которая основана на корутинах. Корутины - это некоторая функция, которая может начинаться, приостанавливаться и завершаться в произвольный момент времени.

Давайте на примере:

In [None]:
import asyncio

async def compute(a, b):
    print('Compute...')
    await asyncio.sleep(1.0)
    return a + b

async def print_sum(a, b):
    result = await compute(a, b)
    print('{} + {} = {}'.format(a, b, result))

<coroutine object print_sum at 0x7fb19d0708c0>

In [None]:
import asyncio

async def compute(a, b):
    print('Compute...')
    await asyncio.sleep(1.0)
    return a + b

async def main():
    a, b = 1, 2
    result = await compute(a, b)
    print('{} + {} = {}'.format(a, b, result))

asyncio.run(main())

Compute...
1 + 2 = 3


Что здесь происходит?

Мы создали две корутины, compute и print_sum (указывается с помощью слова async)

Далее мы запустили наше выполнение с помощью так называемого event_loop, который берет нашу функцию, и запускает ее. В свою очередь, наша корутина compute запускается и находится в режиме ожидания, пока не выполнится compute() (это делается с помощью слова await). После ожидания в секунду, compute выводит ответ, который передается к print_sum (напоминаю, все это время print_sum выполняется) и далее выводит ответ

Общая схема выглядит вот так:

![](https://camo.githubusercontent.com/de86a2c33affd5101ddc77b69a274823e643bda2/687474703a2f2f6e746f6c6c2e6f72672f7374617469632f696d616765732f74756c69705f636f726f2e706e67)

Понятное дело, что сейчас собрали такой игрушечный пример (какой сейчас был смсл так сделать). Но таким образом можно запустить выполнение и ждать результата, например, от нескольких функций etc. Давайте на вот таком примере:

In [None]:
import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")
    return f

res = await asyncio.gather(
    factorial("A", 2),
    factorial("B", 3),
    factorial("C", 4),
    return_exceptions= True # как обрабатывать ошибки, если что-то упадет
)
print(res)


Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24
[2, 6, 24]


Что у нас происходит здесь и что добавилось?

* gather - запусти все, что мы перечислили, асихнронно

Что мы видим по выводу?

У нас есть таска A, B, C. В линейной логике мы бы вначале посчитали A, потом B, потом C (и все время ожидания сложилось бы). Как это здесь работает?

Идем в задачу A, что-то сделали, уходим в режим ожидания. Пока ожидаем, можно взять и что-то сделать в таске B, уходим, можем сделать что-то в таске С. Дождались внутри A, делаем ее и так далее. Таким образом, мы делаем 3 таски в одно время

![](https://pythonru.com/wp-content/uploads/2021/05/ispolzovanii-asinhronnosti.png)

### Ограничение ожидания

Отлично, можем делать что-то асинхронно, но что, если у нас один из вызовов затупил настолько, что мы не хотим ждать и пойти делать дальше что-то без него (скажем, какой-то сервер упал, ответа от него мы не дождемся)?

Для этого есть timeout - ограничение на время ответа

In [None]:
import asyncio

async def eternity():
    await asyncio.sleep(3600)
    print('yay!')


# Wait for at most 1 second
try:
    await asyncio.wait_for(eternity(), timeout=1.0) # Отдельная функция wait_for, то есть ждем функцию, но ограничиваем по времени
except asyncio.TimeoutError: # если выплюнул ошибку ожидания, то делаем то или иное
    print('timeout!')


timeout!


Что происходит на уровне корутины? Он ловит ошибку, убивается и закрывается. Но внутри той же самой корутины можно сделать что-то еще с этой ошибкой (и это нормально)

In [None]:
import asyncio

async def eternity():
    try:
        await asyncio.sleep(3600)
        print('yay!')
    except asyncio.CancelledError:
        print("closed")
        await asyncio.sleep(5)

# Wait for at most 1 second
try:
    await asyncio.wait_for(eternity(), timeout=1.0) # Отдельная функция wait_for, то есть ждем функцию, но ограничиваем по времени
except asyncio.TimeoutError: # если выплюнул ошибку ожидания, то делаем то или иное
    print('timeout!')


closed
timeout!


А ежели мы хотим сделать так, чтобы она все-таки доработала, но скинула ошибку того, что она работает долго? На этот случай есть shield (защита от отмены таска):

In [None]:
import asyncio

async def eternity():
    await asyncio.sleep(5)
    print('yay!')

# Wait for at most 1 second
try:
    await asyncio.wait_for(asyncio.shield(eternity()), timeout=1.0) # Отдельная функция wait_for, то есть ждем функцию, но ограничиваем по времени
except asyncio.TimeoutError: # если выплюнул ошибку ожидания, то делаем то или иное
    print('timeout!')
    await asyncio.sleep(5)

timeout!
yay!


### Таски

У нас есть ожидание, есть запуски и так далее. А нельзя это как-то обернуть в сущность типа задачи, которую можно вызвать позже сразу как результат? Можно! Для этого есть объект Task, который создается с помощью функции create_task() (и отсчет начинается уже в момент создания таски):

In [None]:
import asyncio
import time

async def say(word, delay):
    print("heh" + word)
    await asyncio.sleep(delay)
    print(word)

t_1 = asyncio.create_task(say("Hi_1", 2))
t_2 = asyncio.create_task(say("Hi_2", 1))

print(time.strftime("%X"))

await t_1
await t_2

print(time.strftime("%X"))

16:41:22
hehHi_1
hehHi_2
Hi_2
Hi_1
16:41:24


### Забираем результаты как придут

Хорошо, у нас есть несколько корутин, умеем запускать с помощью gather, а как сразу забирать результаты, не ожидая окончания каждого из них? На это есть as_completed:

In [None]:
async def fact(number):
    res = 1
    for i in range(2, number + 1):
        await asyncio.sleep(1)
        res *= i
    return number, res

for i, future in enumerate(asyncio.as_completed([fact(4), fact(3), fact(2)])):
    number, result = await future
    print(f"Factorial({number}) = {result}")

Factorial(2) = 2
Factorial(3) = 6
Factorial(4) = 24


## А что и когда вообще использовать?

* Сложные задачи, мало коннектятся друг с другом, есть ресурсы - multiprocessing

* Остальное - комбинация threading и asyncio

## Попугай дня

![](https://do-slez.com/uploads/posts/2020-02/1582813051_pesquets-dracula-parrots-birds-new-guinea-1-5e55392f17e1e__700.jpg)

Сегодня у нас очень красивый орлиный попугай (орлиный за счет его клюва) и сходит в семейство щетиноголовых попугаев (видите какая щетина у него прям)

![](https://img.theepochtimes.com/assets/uploads/2020/05/14/Dracula-Parrot-i.jpg)

Исторически живет в Новой Гвинее и его очень редко можно встретить в зоопарках из-за очень прихотливого питания (им обязательно нужны тропические фрукты для ферментации) и требований к содержанию (температура, влажность)

Еще один, к сожалению, вымирающий вид, потому что на них охотятся индейцы за их красные перья (хоть и достаточно распространен в авикультуре, но выращивать его очень сложно).