## 6. Многопоточность и многозадачность

![Multithreading & Multiprocessing](https://raw.githubusercontent.com/amaargiru/pycore/main/pics/06_Multithreading_Multiprocessing.png)  


### Многопоточность

Процесс — приложение, которому выделена область памяти, недоступная другим приложениям.
Поток — наименьшая сущность, которая может управляться напрямую операционной системой. У потоков нет своей памяти, они пользуются памятью
создавшего их процесса. Потоки ассоциированы с создавшим их процессом. С каждым процессом всегда ассоциирован по меньшей мере один поток,
обычно называемый *главным*.

API модулей threading и multiprocessing похожи.

Вкратце, GIL — ограничение, не позволяющее Python-процессу исполнять более одной команды байт-кода в каждый момент времени. GIL можно
обойти при помощи многопроцессного подхода, т. к. у каждого процесса будет своя GIL.

Многопоточность реализуется модулем Threading. Это нативные Posix-треды, такие треды исполняются операционной системой, а не виртуальной машиной.

В чем отличие тредов от мультипроцессинга?

Главное отличие в разделении памяти. Процессы независимы друг от друга, имеют раздельные адресные пространства, идентификаторы, ресурсы. Треды исполняются в совместном адресном пространстве, имеют общий доступ к памяти, переменным, загруженным модулям.

Какие задачи хорошо параллелятся, какие плохо?

Те задачи, которые порождают долгий IO. Когда тред упирается в ожидание сокета или диска, интерпретатор бросает этот тред и стартует следующий. Это значит, не будет простоя из-за ожидания. Наоборот, если ходить в сеть в одном треде (в цикле), то каждый раз придется ждать ответа.

Однако, если затем в треде обрабатывает полученные данные, то выполняться будет только он один. Это не только не даст прироста в скорости, но и замедлит программу из-за переключения на другие треды.

Короткий ответ: хорошо ложатся на треды задачи по работе с сетью. Например, выкачать данные со ста разных ссылок. Полученные данные обрабатывайте вне тредов.

Нужно посчитать 100 уравнений. Делать это в тредах или нет?

Нет, потому что в этой задаче нет ввода-вывода. Интерпретатор только будет тратить лишнее время на переключение тредов. Сложные математические задачи лучше выносить в отдельные процессы, либо использовать фреймворк для распределенных задач Celery, либо подключать как C-библиотеки.

Понимание что такое heap dump и thread dump.

Понимание многопоточности, способов ей управлять и проблем, с этим связанных (синхронизации, локи, race condition и т.д.);

2. Многопоточность — вариант реализации вычислений, при котором для решения некоторой прикладной задачи запускаются и выполняются несколько независимых потоков вычислений, причём выполнение происходит одновременно или псевдоодновременно. В операционных системах, где термины "поток" и "процесс" различаются, под "потоком" понимают именно поток выполнения (ресурсами же владеет сущность, называемая "процессом"). Обычно применяется для распараллеливания вычислений на несколько вычислителей (процессоров и ядер процессора).

3. Многопроцессность — вариант реализации вычислений, когда для решения некоторой прикладной задачи запускается несколько независимых процессов. В системах, где под процессом понимается сущность, владеющая ресурсами (памятью, открытыми файлами, сетевыми подключениями), несколько процессов запускаются с целью повышения отказоустойчивости приложения, а также с целью повышения безопасности. Т.к. ОС выполняет разделение памяти и прочих ресурсов именно между процессами (в то время как потоки работают в едином адресном пространстве), то а) внезапно упавший (читай — убитый ОС) процесс не уронит остальные; б) если в процессе начал выполняться чужеродный код (например, из-за RCE уязвимости), то он не получит доступ к содержимому памяти в других процессах. Многопроцессность сегодня можно увидеть в браузерах, когда отдельные вкладки выполняются в разных процессах, и упавшая вкладка (из-за js или из-за кривого плагина) тянет за собой не весь браузер, а только себя или еще пару вкладок.

In [None]:
# Однопоточное приложение
import time

COUNT = 100_000_000

def countdown(n):
    while n > 0:
        n -= 1

start = time.time()
countdown(COUNT)    
end = time.time()

print("Count time", end - start)

Count time 3.81453800201416


In [None]:
# Многопоточное приложение, время выполнения будет больше, чем у однопоточного, т. к. добавятся временные затраты на переключение потоков
import time
from threading import Thread

COUNT = 100_000_000

def countdown(n):
    while n > 0:
        n -= 1

t1 = Thread(target=countdown, args=(COUNT//2,))
t2 = Thread(target=countdown, args=(COUNT//2,))

start = time.time()
t1.start()
t2.start()
t1.join()
t2.join()
end = time.time()

print("Count time", end - start)

Count time 3.8378489017486572


In [None]:
# Многопроцессорное приложение
import time
import multiprocessing as mp

COUNT = 100_000_000


def countdown(n):
    while n > 0:
        n -= 1

if __name__ ==  '__main__':
    pool = mp.Pool()
    start = time.time()
    pool.apply_async(countdown, args=(COUNT // 2,))
    pool.apply_async(countdown, args=(COUNT // 2,))
    pool.close()
    pool.join()
    end = time.time()
    print("Count time", end - start)

Count time 2.0029137134552

### Threading

CPython interpreter can only run a single thread at a time. That is why using multiple threads won't result in a faster execution, unless at least one of the threads contains an I/O operation.

```
from threading import Thread, RLock, Semaphore, Event, Barrier
from concurrent.futures import ThreadPoolExecutor
```

### Thread

```
<Thread> = Thread(target=<function>)           # Use `args=<collection>` to set the arguments.
<Thread>.start()                               # Starts the thread.
<bool> = <Thread>.is_alive()                   # Checks if the thread has finished executing.
<Thread>.join()                                # Waits for the thread to finish.
```

Use `'kwargs=<dict>'` to pass keyword arguments to the function.
Use `'daemon=True'`, or the program will not be able to exit while the thread is alive.**

### Lock

```
<lock> = RLock()                               # Lock that can only be released by the owner.
<lock>.acquire()                               # Waits for the lock to be available.
<lock>.release()                               # Makes the lock available again.
```

#### Or:

```
with <lock>:                                   # Enters the block by calling acquire(),
    ...                                        # and exits it with release().
``` 

### Semaphore, Event, Barrier

```
<Semaphore> = Semaphore(value=1)               # Lock that can be acquired by 'value' threads.
<Event>     = Event()                          # Method wait() blocks until set() is called.
<Barrier>   = Barrier(n_times)                 # Wait() blocks until it's called n_times.
```

### Thread Pool Executor
Object that manages thread execution.
An object with the same interface called ProcessPoolExecutor provides true parallelism by running a separate interpreter in each process. All arguments must be [pickable](#pickle).

```
<Exec> = ThreadPoolExecutor(max_workers=None)  # Or: `with ThreadPoolExecutor() as <name>: …`
<Exec>.shutdown(wait=True)                     # Blocks until all threads finish executing.
```

```
<iter> = <Exec>.map(<func>, <args_1>, ...)     # A multithreaded and non-lazy map().
<Futr> = <Exec>.submit(<func>, <arg_1>, ...)   # Starts a thread and returns its Future object.
<bool> = <Futr>.done()                         # Checks if the thread has finished executing.
<obj>  = <Futr>.result()                       # Waits for thread to finish and returns result.
```

### asyncio

https://realpython.com/async-io-python/

Преимущество asyncio — *гранулярность*. Поток будет приостановлен не в момент, более или менее правильно намеченный ОС в соответствии со своими алгоритмами планирования, а в явно помеченной программистом точке.

Сопрограмма asyncio — обычная функция Python, наделенная одной сверхспособностью: приостанавливаться, встретив операцию, для выполнения которой нужно существенное время. Для создания и приостановки сопрограммы нужно использовать ключевые слова async и await. async определяет сопрограмму, а await приостанавливает ее на время выполнения длительной операции.

Важный момент — сопрограмма не выполняется при прямом вызове. Вместо этого возвращается объект сопрограммы, который будет выполнен позже. Чтобы выполнить сопрограмму, мы должны явно передать ее циклу событий.

В JavaScript async / await сделаны жадными как Promise. При вызове async функции автоматически создается задача и отправляется в очередь на исполнение в event loop. await, в свою очередь, просто ждёт результат.

В питоне асинхронщину задизайнили иначе — лениво.

Вызов async функции возвращает объект — корутину, — которая ни чего не делает.

asyncio.run() создаёт event loop, запускает (корневую) корутину и блокирует поток до получения результата.

await запускает корутину изнутри другой корутины в текущем event loop и ждёт результат.

Для запуска корутины без ожидания (как это делает Promise) используется asyncio.create_task(coro). Либо asyncio.gather(*aws), если надо запустить сразу несколько. Нужно только следить, чтобы ссылка на возвращаемое значение сохранялась до конца вычисления, иначе его пожрет GC и все оборвется на самом интересном месте (промис бы отработал до конца не смотря ни на что).

В JS только один event loop, поэтому было вполне разумно закопать его внутрь promise / async / await как деталь реализации, упростив работу прикладному программисту. В питоне отзеркалили более ранний вариант корутин на генераторах, дали возможность использовать разные event loop и выставили все кишки наружу.

Простейший пример, одновременный запуск двух функций, последовательное выполнение которых в "синхронном" мире заняло бы 2 секунды, но в "асинхронном" мире они выполняются приблизительно за 1 секунду.

Без asyncio, просто две функции с имитацией некоторого полезного вычисления и последующего ожидания:

In [1]:
import time
from time import perf_counter

def fun1():
    sumi: int = 0

    for i in range(1000_000):
        sumi += i

    time.sleep(1)

    print(f'Sum: {sumi}')


def fun2():
    producti: int = 1

    for i in range(1, 25):
        producti = i * producti

    time.sleep(1)

    print(f'Product: {producti}')


start_time = perf_counter()

fun1()
fun2()

duration = perf_counter() - start_time
print(f'Total duration: {duration} seconds')

Sum: 499999500000
Product: 620448401733239439360000
Total duration: 2.047113300068304 seconds


С asyncio (синтаксис вызова amain() изменен в связи с некоторыми [особенностями](https://stackoverflow.com/questions/55409641/asyncio-run-cannot-be-called-from-a-running-event-loop-when-using-jupyter-no) использования asyncio в Jupiter Notebook):

In [1]:
import asyncio
from time import perf_counter

async def afun1():
    sumi: int = 0

    for i in range(1000_000):
        sumi += i

    await asyncio.sleep(1)

    print(f'Sum: {sumi}')


async def afun2():
    producti: int = 1

    for i in range(1, 25):
        producti = i * producti

    await asyncio.sleep(1)

    print(f'Product: {producti}')


async def amain():
    task1 = asyncio.create_task(afun1())
    task2 = asyncio.create_task(afun2())

    await task1
    await task2


start_time = perf_counter()

# asyncio.run(amain())
await amain()

duration = perf_counter() - start_time
print(f'Total duration: {duration} seconds')

Sum: 499999500000
Product: 620448401733239439360000
Total duration: 1.0424554999917746 seconds


Пример запуска на исполнение двух асинхронных периодических задач:

In [2]:
import asyncio
from datetime import datetime

async def periodic_fun1(a, b):
    while True:
        await asyncio.sleep(1)
        print(f'periodic_fun1 complete with result {a + b}')


async def periodic_fun2(a, b):
    while True:
        await asyncio.sleep(0.5)
        print(f'periodic_fun2 complete with result {a - b}')


async def main():
    start_time = datetime.now()

    task1 = asyncio.create_task(periodic_fun1(3, 2))
    task2 = asyncio.create_task(periodic_fun2(3, 2))

    await asyncio.sleep(10)

    task1.cancel()
    task2.cancel()

    duration_time = datetime.now() - start_time
    print(f'Total duration time: {duration_time}')


if __name__ == '__main__':
    # asyncio.run(main())
    await amain()  # https://stackoverflow.com/questions/55409641/asyncio-run-cannot-be-called-from-a-running-event-loop-when-using-jupyter-no

Sum: 499999500000
Product: 620448401733239439360000


Пример накопления данных от двух асинхронных периодических задач в одной разделяемой структуре данных asyncio.Queue():

In [7]:
import asyncio
import random
from datetime import datetime

# Пример накопления данных от двух асинхронных периодических задач в одной разделяемой структуре данных asyncio.Queue().

async def produce_small_random(queue):
    while True:
        await asyncio.sleep(0.5)
        r: int = random.randint(1, 9)
        print(f'Small random produced {r}')
        await queue.put(r)


async def produce_big_random(queue):
    while True:
        await asyncio.sleep(1)
        r: int = random.randint(100, 999)
        print(f'Big random produced {r}')
        await queue.put(r)


async def main():
    q = asyncio.Queue()
    start_time = datetime.now()

    small_random_task = asyncio.create_task(produce_small_random(q))
    big_random_task = asyncio.create_task(produce_big_random(q))

    await asyncio.sleep(10)

    small_random_task.cancel()
    big_random_task.cancel()

    # Dumping asyncio.queue into list
    randl: list[int] = []
    while q.qsize() > 0:
        randl.append(await q.get())
        q.task_done()

    duration_time = datetime.now() - start_time

    print(f'Total queue = {randl}')
    print(f'Total duration time: {duration_time}')


if __name__ == '__main__':
    # asyncio.run(main())
    await main()  # https://stackoverflow.com/questions/55409641/asyncio-run-cannot-be-called-from-a-running-event-loop-when-using-jupyter-no

Small random produced 1
Big random produced 929
Small random produced 4
Small random produced 3
Big random produced 967
Small random produced 8
Small random produced 1
Big random produced 622
Small random produced 9
Small random produced 7
Big random produced 891
Small random produced 5
Small random produced 5
Big random produced 820
Small random produced 8
Small random produced 5
Big random produced 771
Small random produced 5
Small random produced 6
Big random produced 289
Small random produced 6
Small random produced 8
Big random produced 873
Small random produced 8
Small random produced 3
Big random produced 127
Small random produced 6
Small random produced 3
Total queue = [1, 929, 4, 3, 967, 8, 1, 622, 9, 7, 891, 5, 5, 820, 8, 5, 771, 5, 6, 289, 6, 8, 873, 8, 3, 127, 6, 3]
Total duration time: 0:00:10.001941
