# Вебинар 7. Лучшие практики по работе с параллельными вычислениями в Python

## Проверка связи

**Если у вас нет звука:**
- убедитесь, что на вашем устройстве
и в колонках включён звук
- обновите страницу или закройте её и заново присоединитесь к вебинару
- откройте вебинар в другом браузере
- перезагрузите ваше устройство
и  попытайтесь войти снова


**Поставьте в чат:**<br>
\+ — если меня видно и слышно<br>
– — если нет

## О спикере

**Дмитрий Клабуков**
- Старший инженер данных в Альфа-банке
- Опыт разработки с 2018 года

## Правила участия

- Приготовьте блокнот и ручку, чтобы записывать важные мысли и идеи
- Продолжительность вебинара — 1 час 30 минут
- Вы можете писать вопросы в чате или задавать их через микрофон
- Запись вебинара будет доступна в личном кабинете

- Отработали на практике работу с многопрограммным комплексом

## Цель занятия

Расширить знания по практике использования параллельных технологий

## План занятия


[1. Последовательное выполнение](#1.-ПОСЛЕДОВАТЕЛЬНОЕ-ВЫПОЛНЕНИЕ)<br>
[2. Потоки](#2.-ПОТОКИ)<br>
[3. Процессы](#3.-ПРОЦЕССЫ)<br>


Фаулер - Конкурентное программирование на Python

## Задачи, ограниченные вводом - выводом (I/O bound)  и задачи, ограниченные быстродействием процессора( CPU Bound задачи)

К задачам, ограниченным вводом-выводом относятся те задачи, в которых в которых большая часть времени тратится на ожидание ответа от сети или другого устройства.

К задачам, ограниченным быстродействием процессора или счетным задачам относят те задачи, в которых проходят вычисления или обработка данных.

## Конкурентность и параллелизм

Конкурентность - свойство системы по которому, выполнение задач, выполняющихся в одно и то же время, может переключаться от одной к другой в зависимости от их доступности к выполнению. Конкурентность возможна, когда несколько задач может работать не-
зависимо друг от друга.

Параллелизм - возможность выполнять задачи одновременно.

Конкурентность бывает вытесняющая или невытесняющая (кооперативная)

Вытесняющая конкурентность  - та, в которая система сама решает, какой задаче отдать управление засчет квантования времени

Кооперативная - та, в которой явно указано в коде в какой момент можно отдать управление другим задачам


## Процессы, потоки, многопроцессность, многопоточность

Процессом называется работающее приложение, которому выделена
область памяти, недоступная другим приложениям. 

Поток - это базовая единица ОС, которой выделяется процессорное время. Поток живет внутри процесса и использует выделенную под процесс память.


In [None]:
import os
import threading
print(f'Исполняется Python-процесс с идентификатором: {os.getpid()}')
total_threads = threading.active_count()
thread_name = threading.current_thread().name

print(f'В данный момент Python исполняет {total_threads} поток(ов)')
print(f'Имя текущего потока {thread_name}')

In [None]:
import threading

print([i.name for i in threading.enumerate()])

## GIL

Глобальная блокировка интерпретатора (global interpreter lock – GIL) - механизм Python, не позволяющий выполнять Python процессу в конкретный момент времени несколько команд кода

## 1. Последовательное выполнение

In [None]:
from time import time
def fact(n: int):
    if n == 1:
        return 1
    else:
        return n * fact(n-1)
        
def some_activity():
    t0 = time()
    for x in range(1_000_000):
        x - (x ** 3/fact(3)) + (x ** 5 / fact(5)) - (x ** 7 / fact(7)) + (x ** 9 / fact(9)) - (x ** 11 / fact(11))


t0 = time()
for _ in range(10):
    some_activity()
print(time() - t0)

In [None]:
import threading
def hello_from_thread():
    print(f'Привет от потока {threading.current_thread()}!')
    
hello_thread = threading.Thread(target=hello_from_thread)
hello_thread.start()
total_threads = threading.active_count()
thread_name = threading.current_thread().name
print(f'В данный момент Python выполняет {total_threads} поток(ов)')
print(f'Имя текущего потока {thread_name}')
hello_thread.join()

In [None]:
import threading
from time import time 

def threaded_activity(n_threads: int) -> float:
    t0 = time()
    threads = [threading.Thread(target=some_activity) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time() - t0

threaded_activity(10)

In [None]:
import requests

def some_network_activity():
    requests.get('https://ya.ru')

t0 = time()
for _ in range(10):
    some_network_activity()
print(time() - t0)

In [None]:
from threading import Thread
from time import time 

def threaded_activity(n_threads: int) -> float:
    t0 = time()
    threads = [Thread(target=some_network_activity) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time() - t0

threaded_activity(10)

In [None]:
import multiprocessing
import os
def hello_from_process():
    print(f'Привет от дочернего процесса {os.getpid()}!')

if __name__ == '__main__':
    hello_process = multiprocessing.Process(target=hello_from_process)

    hello_process.start()
    print(f'Привет от родительского процесса {os.getpid()}')
    hello_process.join()

In [None]:
from multiprocessing import Process, Pool

def processed_activity(n_processes: int) -> float:
    t0 = time()
    processes = [Process(target=some_network_activity) for _ in range(n_processes)]
    for t in processes:
        t.start()
    for t in processes:
        t.join()
    return time() - t0

threaded_activity(10)

## 1. Последовательное выполнение

## Ваши вопросы

## Ваши вопросы

## Итоги занятия

## 3. Асинхронность


In [11]:
def plus_one_gen(number: int) -> int:
    numbers = [1, 2, 3, 5]
    for i in numbers:
        yield i + number

In [12]:
g = plus_one_gen(1)
for i in g:
    print(i)

2
3
4
6


In [13]:
g = plus_one_gen(1)
print(next(g))
print(next(g))
print(next(g))
print(next(g))
print(next(g))

2
3
4
6


StopIteration: 

Расширили знания по практике использования параллельных технологий

In [15]:
import time

def timed(func):
    async def inner(*args, **kwargs):
        t0 = time.time()
        await func(*args, **kwargs)
        print(time.time() - t0)
    return inner

In [17]:
import asyncio


async def waiter_1():
    print("Hi to all from waiter 1")
    await asyncio.sleep(1)
    print("Bye all from waiter 1")

async def waiter_2():
    print("Hi to all from waiter 2")
    await asyncio.sleep(2)
    print("Bye all from waiter 2")

async def waiter_3():
    print("Hi to all from waiter 3")
    await asyncio.sleep(3)
    print("Bye all from waiter 3")

@timed
async def main():
    asyncio.create_task(waiter_1()) # []
    asyncio.create_task(waiter_2())
    await asyncio.create_task(waiter_3())

In [18]:
#asyncio.run(main)

In [19]:

await main()

Hi to all from waiter 1
Hi to all from waiter 2
Hi to all from waiter 3
Bye all from waiter 1
Bye all from waiter 2
Bye all from waiter 3
3.0019381046295166


In [21]:
@timed
async def main():
    await asyncio.create_task(waiter_1()) # [task]
    await asyncio.create_task(waiter_2()) # [task]
    await asyncio.create_task(waiter_3()) # [task]


In [22]:
await main()

Hi to all from waiter 1
Bye all from waiter 1
Hi to all from waiter 2
Bye all from waiter 2
Hi to all from waiter 3
Bye all from waiter 3
6.005086898803711


In [23]:
import asyncio


async def waiter_1():
    print("Hi to all from waiter 1")
    await asyncio.sleep(2.1)
    print("Bye all from waiter 1")

async def waiter_2():
    await asyncio.sleep(1)
    print("Hi to all from waiter 2")
    await asyncio.sleep(2)
    print("Bye all from waiter 2")

async def waiter_3():
    await asyncio.sleep(2)
    print("Hi to all from waiter 3")
    await asyncio.sleep(2)
    print("Bye all from waiter 3")

@timed
async def main():
    asyncio.create_task(waiter_1())
    asyncio.create_task(waiter_2())
    await asyncio.create_task(waiter_3()) #

In [24]:
await main()

Hi to all from waiter 1
Hi to all from waiter 2
Hi to all from waiter 3
Bye all from waiter 1
Bye all from waiter 2
Bye all from waiter 3
4.003917217254639


In [25]:
import asyncio
import time

async def waiter_1():
    print("Hi to all from waiter 1")
    await asyncio.sleep(2.1)
    print("Bye all from waiter 1")

async def waiter_2():
    print("Hi to all from waiter 2")
    time.sleep(5)
    print("Bye all from waiter 2")

async def waiter_3():
    print("Hi to all from waiter 3")
    await asyncio.sleep(2)
    print("Bye all from waiter 3")

@timed
async def main():
    asyncio.create_task(waiter_1())
    asyncio.create_task(waiter_2())
    await asyncio.create_task(waiter_3())

In [26]:
await main()

Hi to all from waiter 1
Hi to all from waiter 2
Bye all from waiter 2
Hi to all from waiter 3
Bye all from waiter 1
Bye all from waiter 3
7.006998062133789


In [27]:
import asyncio


async def waiter_1():
    print("Hi to all from waiter 1")
    await asyncio.sleep(2.1)
    print("Bye all from waiter 1")

async def waiter_2():
    await asyncio.sleep(1)
    print("Hi to all from waiter 2")
    await asyncio.sleep(2)
    print("Bye all from waiter 2")

async def waiter_3():
    await asyncio.sleep(2)
    print("Hi to all from waiter 3")
    await asyncio.sleep(2)
    print("Bye all from waiter 3")

@timed
async def main():
    await asyncio.gather(waiter_1(), waiter_2(), waiter_3())

In [28]:
await main()

Hi to all from waiter 1
Hi to all from waiter 2
Hi to all from waiter 3
Bye all from waiter 1
Bye all from waiter 2
Bye all from waiter 3
4.003663063049316


При использовании asyncio необходимо максимально сократить количество блокирующих синхронных вызовов.
Также необходимо использовать библиотеки, поддерживающие asyncio. Например requests - синхронная бибилиотека, 
под капотом использует синхронный блокирующий вызов. Вместо requests можно воспользоваться aiohttp

## Ваши вопросы