https://realpython.com/python-concurrency/

In [18]:
print("Лекція 5: потоки, процеси та асинхронність у Python")

Лекція 5: потоки, процеси та асинхронність у Python


## Concurrency (паралелізм) у Python

---

![тематична заставка](./media/parallel_computing.jpg)


Що робить програма, поки чекає відповіді від сервера або чекає, поки файл прочитається з жорсткого диска? Зазвичай у такі моменти застосунок нічого не робить та просто чекає на результат операції. Це не є проблемою, якщо не потрібно прискорити роботу програми. Особливо це критично для ситуацій, коли програма робить багато дуже схожих блокуючих викликів (запитів на віддалений сервер, операцій читання/запису файлів). У такі моменти хочеться оптимізувати роботу програми та виконувати кілька операцій, що блокують, паралельно. Код, який виконує паралельно кілька завдань, називається асинхронним. Найпростіший спосіб реалізувати асинхронність — це виконувати завдання в окремих потоках всередині одного процесу.

__Процес__ — область пам'яті (віртуальна) + набір ресурсів + 1 та більше потоків.

__Потік__ — послідовність інструкцій та системних викликів усередині процесу.

Усі потоки мають доступом до всіх ресурсів свого процесу. Всі процеси ізольовані один від одного та будь-яка міжпроцесна взаємодія відбувається лише через операції введення/виводу (системні виклики).

Завдання, які виконують операції введення/виводу (читання/запис файлів, запити в мережі тощо), називаються IO (Input Output)-bound завданнями. Досягти паралелізму виконання IO завдань у Python можна, використовуючи потоки.

Однак треба пам'ятати, що асинхронний код завжди на порядок складніше для розуміння та налагодження. Для багатопотокових застосунків складно писати тести та складно перевіряти всілякі ситуації, які відбуваються рідко та залежать від порядку викликів у різних потоках. Загальне правило для програмування будь-якою мовою: якщо є можливість обійтися синхронним кодом, то треба таким чином зробити.

Інший тип блокуючих викликів — це важкі з погляду обчислень операції. Реальна програма завжди повинна якось відгукуватися на дії користувача та якщо ваша програма під час виконання складних обчислень перестає відповідати на запити, то користувач може вирішити, що програма просто зависла. Виходить, що для зручності користувача програма повинна відповідати на запити, навіть коли робить якісь складні та довгі обчислення.

Такі завдання називаються CPU-bound задачами. Як й для IO-bound завдань можна винести виконання блокуючих операцій (складних обчислень) до окремого потоку, щоб програма продовжувала взаємодіяти з користувачем, проводячи обчислення.

Потокам управління передає операційна система (як й передача управління процесам). Це означає, що у будь-який момент перед будь-яким викликом ОС (Операційна Система) може призупинити виконання коду потоку та почати виконувати код іншого потоку, щоб потім точно так само призупинити та його для передачі управління далі.

До появи багатоядерних процесорів справжній паралелізм був неможливим. Звичайно, коли керування передається різним потокам по кілька тисяч разів на секунду, з погляду користувача це виглядає як паралельне виконання кількох завдань. У сучасних процесорах зазвичай є щонайменше два ядра та тепер ми можемо писати код, який виконується дійсно паралельно. Це з одного боку додає можливостей, але й додає складнощів, оскільки тепер треба бути ще уважнішими при написанні асинхронного коду, адже припуститися помилки ще простіше.


Паралелізм (у контексті мов програмування) означає одночасне виникнення. У Python речі, які відбуваються одночасно, називаються різними іменами (потік, завдання, процес), але на високому рівні всі вони посилаються на послідовність інструкцій, які виконуються по черзі.

Таким чином, до "одночасного" виконання відносять декілька понять:
- thread (потік);
- task (завдання);
- process (процес).

Можe здаватися дивним, чому Python використовує різні слова для того самого поняття. Виявляється, потоки, завдання та процеси однакові, лише якщо розглядати їх з високого рівня. Коли починається занурення до деталей, усі вони представляють насправді різні речі. У міру проходження прикладів побачите, чим вони відрізняються.


Важливо розуміти, що коли використовуємо слова "одночасне виконання", то когнітивно ми міркуємо саме про ОДНОЧАСНЕ виконання. Але для потоків (thread) та завдань (task у розумінні асинхронного програмування) це не так. Вони працюють на одному процесорі, тому працюють лише по одному в один момент. Просто вони вміють працювати так, що це дозволяє пришвидшити загальний час виконання програми. Водночас, вони не використовують паралельні обчислення, проте все одно їх називать паралельними.

### Як працює багатозадачність у операційних системах.

Докладно — [тут](https://uk.wikipedia.org/wiki/%D0%91%D0%B0%D0%B3%D0%B0%D1%82%D0%BE%D0%B7%D0%B0%D0%B4%D0%B0%D1%87%D0%BD%D1%96%D1%81%D1%82%D1%8C#:~:text=%D0%A1%D0%B8%D1%81%D1%82%D0%B5%D0%BC%D0%B0%20%D0%BE%D1%80%D0%B3%D0%B0%D0%BD%D1%96%D0%B7%D0%BE%D0%B2%D1%83%D1%94%20%D1%87%D0%B5%D1%80%D0%B3%D0%B8%20%D0%B7%D0%B0%D0%B2%D0%B4%D0%B0%D0%BD%D1%8C%20%D1%82%D0%B0%D0%BA,%D0%BC%D0%BE%D0%B6%D1%83%D1%82%D1%8C%20%D0%B0%D0%BA%D1%82%D0%B8%D0%B2%D1%83%D0%B2%D0%B0%D1%82%D0%B8%D1%81%D1%8F%2C%20%D0%B4%D0%B5%D0%B0%D0%BA%D1%82%D0%B8%D0%B2%D1%83%D0%B2%D0%B0%D1%82%D0%B8%D1%81%D1%8F%20%D1%96%20%D0%B2%D1%96%D0%B4%D0%B4%D0%B0%D0%BB%D1%8F%D1%82%D0%B8%D1%81%D1%8F).

Будемо звертати увагу тільки на ті моменти, які необхідні у рамках розуміння основної теми заняття.

Один з методів реалізації багатозадачності — витискальна багатозадачність. Докладніше — [тут](https://uk.wikipedia.org/wiki/%D0%92%D0%B8%D1%82%D0%B8%D1%81%D0%BA%D0%B0%D0%BB%D1%8C%D0%BD%D0%B0_%D0%B1%D0%B0%D0%B3%D0%B0%D1%82%D0%BE%D0%B7%D0%B0%D0%B4%D0%B0%D1%87%D0%BD%D1%96%D1%81%D1%82%D1%8C).

Суть її дуже проста. Операційна система в один момент часу обробляє лише одне завдання та дуже швидко перемикається між різними завданнями.

![витісняюча багатозадачність](./media/mulitask.png)

 Витісняюча баготозадачність буває випереджаюча та кооперативною.

 Випереджаюча багатозадачність зручна тим, що коду в потоці не потрібно нічого робити, щоб здійснити перемикання. Також це може бути важко через фразу «будь-коли». Це перемикання може статися в середині одного оператора Python, навіть такого тривіального, як x = x + 1. Це те, як працюють потоки — threading.

Asyncio, з іншого боку, використовує кооперативну багатозадачність. Завдання повинні співпрацювати, оголошуючи, коли вони готові до вимкнення. Це означає, що код завдання має трохи змінитися, щоб це сталося.

Перевага виконання цієї додаткової роботи наперед полягає у тому, що ви завжди знаєте, куди ваше завдання буде замінено. Його не буде замінено в середині оператора Python, якщо цей оператор не позначено. Пізніше побачите, як це може спростити частини вашого дизайну.

## IO-bound: порівняємо різні підходи до вирішення.

Порівняємо використання різних підходів до вирішення одного та того ж класичного IO-bound — завантаженні вмісту через мережу. У нашому прикладі ми будемо завантажувати веб-сторінки з кількох сайтів, але насправді це може бути будь-який мережевий трафік. Це просто легше візуалізувати та налаштувати за допомогою веб-сторінок.


#### Синхронна версія.

In [19]:
import requests
import functools
import time


def get_duration(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        res = func(*args, **kwargs)
        duration = time.time() - start_time
        print(f"Duration = {duration}")
        return res

    return wrapper


def download_site(url, session, count):
    with session.get(url) as response:
        print(f"{count} --> Read {len(response.content)} from {url}")


@get_duration
def download_all_sites(sites):
    with requests.Session() as session:
        for count, url in enumerate(sites):
            download_site(url, session, count)


sites = [
            "http://www.testingmcafeesites.com/index.html",
            "https://www.jython.org"
        ] * 200

Як бачимо, це досить коротка програма.

get_duration() — простий декоратор, який допоможе визначати час виконання нашого коду. Він друкує, скільки часу тривав цей процес, щоб ви могли отримати задоволення від того, наскільки нам допоміг паралелізм у наступних прикладах.

Функція download_site() — просто завантажує вміст з URL-адреси та друкує розмір. Слід зазначити одну незначну річ: у прикладі використовується Session об’єкт з модуля requests. Можна просто використовувати напряму get() з requests, але створення Session об’єкта дозволяє requests виконувати деякі хитрощі мережевих трюків та дійсно пришвидшити роботу.

Функція download_all_sites() створює Session, а потім переглядає список сайтів, завантажуючи кожен з них по черзі. Вбудовану функцію enumerate використовується для того, щоб побачити послідовність виконання запитів до сайтів.

Запустіть код нижче (не забувайте перед цим виконати усі попередні блоки коду).

In [20]:
a = download_all_sites(sites)

0 --> Read 28634 from http://www.testingmcafeesites.com/index.html
1 --> Read 10782 from https://www.jython.org
2 --> Read 28634 from http://www.testingmcafeesites.com/index.html
3 --> Read 10782 from https://www.jython.org
4 --> Read 28634 from http://www.testingmcafeesites.com/index.html
5 --> Read 10782 from https://www.jython.org
6 --> Read 28634 from http://www.testingmcafeesites.com/index.html
7 --> Read 10782 from https://www.jython.org
8 --> Read 28634 from http://www.testingmcafeesites.com/index.html
9 --> Read 10782 from https://www.jython.org
10 --> Read 28634 from http://www.testingmcafeesites.com/index.html
11 --> Read 10782 from https://www.jython.org
12 --> Read 28634 from http://www.testingmcafeesites.com/index.html
13 --> Read 10782 from https://www.jython.org
14 --> Read 28634 from http://www.testingmcafeesites.com/index.html
15 --> Read 10782 from https://www.jython.org
16 --> Read 28634 from http://www.testingmcafeesites.com/index.html
17 --> Read 10782 from https:/

Синхронна версія працює. Запам'ятайте час, на протязі якого тривало виконання цього блоку коду.
Ця версія коду проста у написанні, проста у налагоджуванні та у виконанні. Але, як побачимо далі, це буде найповільніший варіант.

Однак бути повільнішим не завжди є великою проблемою. Якщо програма, яку ви запускаєте, займає лише 2 секунди з синхронною версією та запускається рідко, можливо, не варто додавати паралелізм. Зупинимося тут детальніше.

Діаграма обробки для цієї програми буде виглядати наступним чином:

![синхронна версія, діаграма роботи](media/IO_bound_task.png)

#### Потоки. Threading-версія.

Написання потокової програми вимагає більше зусиль. Однак ви можете бути здивовані тим, як мало додаткових зусиль потрібно для простих випадків. Ось як виглядає та сама програма з threading:

In [21]:
import concurrent.futures
import requests
import threading
import time

thread_local = threading.local()


def get_duration(func):
    @functools.wraps(func)
    def wraper(*args, **kwargs):
        start_time = time.time()
        res = func(*args, **kwargs)
        duration = time.time() - start_time
        print(f"Duration = {duration}")
        return res

    return wraper


def get_session():
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session


def download_site(site_num):
    session = get_session()
    with session.get(site_num[1]) as response:
        print(f"{site_num[0]}: read {len(response.content)} from {site_num[1]}")


@get_duration
def download_all_sites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        executor.map(download_site, sites)


sites = [
            "http://www.testingmcafeesites.com/index.html",
            "https://www.jython.org"
        ] * 200

sites_num = [(num, site) for num, site in enumerate(sites)]

При додаванні потоків, загальна структура залишається тією самою та потрібно лише внести кілька змін.

Функцію download_all_sites() змінено від виклику функції один раз для сайту до більш складної структури.

У цій версії ми створюємо інстанс класу ThreadPoolExecutor, що здається складною річчю. Розглянемо це детальніше:

ThreadPoolExecutor = Thread + Pool + Executor.

Thread частина — це лише потік, який ми згадували раніше.

Далі – Pool. Це те, де починає ставати цікавим. Цей об’єкт збирається створити пул потоків (деяку кількість), кожен з яких може працювати одночасно. Нарешті, Executor це та частина, яка контролюватиме, як та коли кожен з потоків у пулі буде працювати. Він виконає запит у пулі.

До речі, стандартна бібліотека реалізує клас ThreadPoolExecutor як менеджер контексту, щоб ви могли використовувати with синтаксис для керування створенням та звільненням пулу Threads.

Маючи ThreadPoolExecutor, можна використовувати його зручний метод .map(). Цей метод запускає передану функцію на кожному з сайтів у списку. Головне, що він автоматично запускає їх одночасно, використовуючи пул потоків, якими він керує.

У Python є досить велика кількість функціоналу, який керує деталями. У модулі threading є такі інструменти як Thread.start(), Thread.join() та Queue.

Усе це все ще є й ви можете використовувати їх для досягнення точного контролю над тим, як виконуються ваші потоки. Але, починаючи з Python 3.2, стандартна бібліотека додала абстракцію вищого рівня під назвою Executors, яка керує багатьма деталями за вас, якщо вам не потрібен цей тонкий контроль, який ми й використовуємо зараз.

Ще одна цікава зміна у нашому прикладі полягає в тому, що кожен потік повинен створити свій власний об’єкт requests.Session(). Коли ви переглядаєте документацію для requests, це не обов’язково легко зрозуміти, але читаючи цю проблему, здається досить очевидним, що потрібен окремий сеанс для кожного потоку.

Це одне з цікавих й складних питань з threading. Оскільки операційна система контролює, коли ваше завдання переривається та починається інше завдання, будь-які дані, якими спільно користуються потоки, мають бути захищені або безпечні для потоків. На жаль, requests.Session(), це потоконебезпечний.

Існує кілька стратегій потокобезпечного доступу до даних залежно від того, які дані та як ви їх використовуєте. Одним з них є використання потокобезпечних структур даних, як Queue з queue модуля Python.

Ці об’єкти використовують низькорівневі примітиви, threading.Lock, щоб гарантувати, що лише один потік може отримати доступ до блоку коду або частини пам’яті одночасно. Ви використовуєте цю стратегію опосередковано через об’єкт ThreadPoolExecutor.

Ще одна стратегія, яку тут можна використовувати, — це локальне зберігання потоків. threading.local() створює об’єкт, який виглядає як глобальний, але специфічний для кожного окремого потоку. У вашому прикладі це робиться за допомогою thread_local та get_session():

local() є у складі threading модуля, щоб конкретно вирішити цю проблему. Це виглядає трохи дивно, але ви хочете створити лише один з цих об’єктів, а не по одному для кожного потоку. Сам об’єкт піклується про розділення доступу з різних потоків до різних даних.

Коли get_session() викликається, session пошук залежить від конкретного потоку, у якому він працює. Таким чином, кожен потік створить окремий сеанс під час першого виклику get_session(), а потім просто використовуватиме цей сеанс під час кожного наступного виклику протягом усього часу існування.

Нарешті, коротке зауваження щодо вибору кількості потоків. Ми бачимо, що приклад коду використовує 5 потоків. Поексперементуйте з цим числом та подивитися, як зміниться загальний час. Можна очікувати, що один потік на кожне завантаження буде найшвидшим (тобто, 400 потоків у нашому випадку), але, принаймні в даних прикладах, це не так. Найшвидші результати десь між 30 та 60 потоками. Якщо піти далі, то додаткові накладні витрати на створення та знищення потоків стирають будь-яку економію часу — при великих кількостях потоків.

Складна відповідь полягає у тому, що правильна кількість потоків не є постійною для кожного завдання. Потрібні деякі експерименти.

Запустіть функцію download_all_sites() та подивіться на час виконання.

In [22]:
download_all_sites(sites_num)

7: read 10782 from https://www.jython.org
1: read 10782 from https://www.jython.org
3: read 10782 from https://www.jython.org
13: read 10782 from https://www.jython.org
21: read 10782 from https://www.jython.org
9: read 10782 from https://www.jython.org
5: read 10782 from https://www.jython.org
17: read 10782 from https://www.jython.org
19: read 10782 from https://www.jython.org
15: read 10782 from https://www.jython.org
11: read 10782 from https://www.jython.org
23: read 10782 from https://www.jython.org
25: read 10782 from https://www.jython.org
27: read 10782 from https://www.jython.org
31: read 10782 from https://www.jython.org
6: read 28634 from http://www.testingmcafeesites.com/index.html
29: read 10782 from https://www.jython.org
0: read 28634 from http://www.testingmcafeesites.com/index.html
33: read 10782 from https://www.jython.org
18: read 28634 from http://www.testingmcafeesites.com/index.html
12: read 28634 from http://www.testingmcafeesites.com/index.html
14: read 28634 f

Зверніть увагу на швидкість. Вона значно перевищує синхронну версію.

Можна побачити, що черга виконання запитів вже не така лінійна, як у синхронній версії.

Так виглядає діаграма роботи багатопотокової версії:

![діаграма роботи багтопотокової версії](media/threads.png)

Він використовує кілька потоків, щоб мати декілька відкритих запитів до веб-сайтів одночасно, що дозволяє програмі перекривати час очікування та швидше отримувати кінцевий результат.

##### Проблеми з threading-версією

Що ж, як ви можете бачити з прикладу, для цього потрібно трохи більше коду та справді потрібно трохи подумати про те, які дані обмінюються між потоками.

Потоки можуть взаємодіяти непомітно й важко для виявлення. Ці взаємодії можуть спричинити Race Conditions, які часто призводять до випадкових, періодичних помилок, які може бути досить важко знайти. Детальніше ознайомитися з концепцією Race Conditions, можна [тут](https://ru.wikipedia.org/wiki/%D0%A1%D0%BE%D1%81%D1%82%D0%BE%D1%8F%D0%BD%D0%B8%D0%B5_%D0%B3%D0%BE%D0%BD%D0%BA%D0%B8#:~:text=%D0%A1%D0%BE%D1%81%D1%82%D0%BE%D1%8F%D0%BD%D0%B8%D0%B5%20%D0%B3%D0%BE%D0%BD%D0%BA%D0%B8%20(%D0%B0%D0%BD%D0%B3%D0%BB.,%D0%93%D0%BE%D0%BD%D0%BA%D0%B8%20%D1%81%D0%B8%D0%B3%D0%BD%D0%B0%D0%BB%D0%BE%D0%B2).).

#### Асинхронність: asyncio-версія.

---

Згадайте кооперативну багатозадачність: головна ідея полягає у тому, щоб не переривати виконання потоку коду до того моменту, поки цей код сам не "скаже", що він готовий віддати керування (наприклад, коли цей блок коду очікує відповіді від повільного пристрою введення-виведення, мережі, тощо). Тобто, блоки коду, які ми використовуємо, повинні вміти якось взаємодіяти.

Тобто, вся взаємодія, яка буде описуватись далі, не керується операційною системою, а в межах ОДНОГО потоку та одного процесу керується кодом у межах python-модуля asyncio.

##### Основи __asyncio__. Як це працює.

Загальна концепція asyncio полягає у тому, що єдиний об’єкт Python, який називається цикл подій, контролює як та коли виконується кожне завдання. Цикл подій знає про кожне завдання та знає, у якому стані воно перебуває. Насправді, існує багато станів, у яких можуть перебувати завдання, але зараз уявімо спрощений цикл подій, який має лише два стани.

__Стан готовності__ вказуватиме на те, що завдання має виконати роботу та готове до виконання, а __стан очікування__ означає, що завдання очікує завершення зовнішньої дії, наприклад мережевої операції.

Ваш спрощений цикл подій підтримує два списки завдань, по одному для кожного з цих станів. Він обирає одне з готових завдань (__стан готовності__) та "дає йому роботу". Це завдання знаходиться працює самостійно та його робота не буде перериватись доки воно самостійно не передасть керування назад у цикл подій.

Коли запущене завдання повертає керування циклу подій, цикл подій розміщує це завдання або у __списку готовності__, або у __списку очікування__ (в залежності від того, повернуті результати роботи, чи ні), а потім переглядає кожне завдання у __списку очікування__, щоб перевірити, чи воно стало готовим до завершення, тобто завершилася операція введення-виведення, чи ні. Він знає, що завдання у __списку готовності__ все ще готові отримувати завдання, оскільки вони ще не запущені.

Коли усі завдання знову відсортовано в правильному списку, цикл подій обирає наступне завдання для запуску та процес повторюється. Ваш спрощений цикл подій вибирає завдання, яке чекало найдовше та запускає його. Цей процес повторюється до завершення циклу подій.

Важливим моментом asyncio є те, що завдання ніколи не відмовляються від контролю без свідомості. Їх ніколи не переривають під час операції. Це дозволяє нам трохи легше ділитися ресурсами в asyncio ніж у threading. Не потрібно турбуватися про те, щоб зробити ваш код потокобезпечним.

Це досить високорівневий погляд на роботу asyncio. Докладніше дивіться [тут](https://stackoverflow.com/questions/49005651/how-does-asyncio-actually-work/51116910#51116910) або в [документації](https://docs.python.org/3/library/asyncio.html).

##### async та await

Два нових ключових слова, доданих до Python, починаючи з версії 3.5: __async__ та __await__.

###### await

Можна розглядати await як оператор або команду, яка дозволяє завданню передати керування назад у цикл подій. Коли ваш код очікує щось (відповідь з мережі, відповідь від якогось пристрою введення-виведення), це означає, що виклик, ймовірно, буде чимось, що займе деякий час й завдання має відмовитися від контролю.

###### async

Найлегше уявити async Python як прапорець, який повідомляє, що функція, яку потрібно визначити, використовує await. У деяких випадках це не зовсім вірно, як асинхронні генератори, але це актуально для багатьох випадків та надає просту модель, поки ви починаєте.

Одним винятком з цього, який ви побачите у наступному коді, є async with оператор, який створює диспетчер контексту з об’єкта, якого ви зазвичай очікуєте. Хоча семантика дещо інша, ідея та сама: позначити цей контекстний менеджер як щось, що можна замінити.

##### Асинхронна версія коду.

In [23]:
import asyncio
import time
import aiohttp
import functools


def get_duration(func):
    @functools.wraps(func)
    def wraper(*args, **kwargs):
        start_time = time.time()
        res = func(*args, **kwargs)
        duration = time.time() - start_time
        print(f"Duration = {duration}")
        return res

    return wraper


async def download_site(session, url, num):
    async with session.get(url) as response:
        print(
            f"Read -{num}- {response.content_length} from {url}, status code is {response.status}"
        )


async def download_all_sites(sites):
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.create_task(download_site(session, url, num)) for num, url in enumerate(sites)
        ]
        await asyncio.gather(*tasks)


sites = [
            "http://www.testingmcafeesites.com/index.html",
            "https://www.jython.org"
        ] * 200

if __name__ == "__main__":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    start_time = time.time()
    await download_all_sites(sites)
    duration = time.time() - start_time
    print(f"Duration = {duration}")

Read -12- 2335 from http://www.testingmcafeesites.com/index.html, status code is 200
Read -16- 2335 from http://www.testingmcafeesites.com/index.html, status code is 200
Read -0- 2335 from http://www.testingmcafeesites.com/index.html, status code is 200
Read -32- 2335 from http://www.testingmcafeesites.com/index.html, status code is 200
Read -54- 2335 from http://www.testingmcafeesites.com/index.html, status code is 200
Read -18- 2335 from http://www.testingmcafeesites.com/index.html, status code is 200
Read -50- 2335 from http://www.testingmcafeesites.com/index.html, status code is 200
Read -10- 2335 from http://www.testingmcafeesites.com/index.html, status code is 200
Read -30- 2335 from http://www.testingmcafeesites.com/index.html, status code is 200
Read -24- 2335 from http://www.testingmcafeesites.com/index.html, status code is 200
Read -3- 3721 from https://www.jython.org, status code is 200
Read -62- 2335 from http://www.testingmcafeesites.com/index.html, status code is 200
Read

---

### ВАЖЛИВО!!

Зважаючи на [особливості виконання асинхронного коду в CPython та у середовищі jupyter notebook](https://stackoverflow.com/questions/55409641/asyncio-run-cannot-be-called-from-a-running-event-loop-when-using-jupyter-no), для демонстрації роботи цього блоку документа доцільніше запустити його у файлі __async_version.py__, який знаходиться у цій же теці.

---

Ця версія трохи складніша за дві попередні. Вона має подібну структуру, але з налаштуванням завдань потрібно трохи більше роботи, ніж зі створенням ThreadPoolExecutor. Почнемо з верхньої частини прикладу.

__download_site()__

Функція download_site() у верхній частині майже ідентична threading версії, за винятком async ключового слова у рядку визначення функції та async with ключових слів під час виклику session.get(). Пізніше ви побачите, чому Session можна передати сюди, а не використовувати локальне сховище потоків.

__download_all_sites()__

У функції download_all_sites() ви побачите найбільшу зміну з threading прикладу.

Можете поділитися сеансом між усіма завданнями, тому сеанс створюється тут як менеджер контексту. Завдання можуть спільно використовувати сеанс, оскільки всі вони виконуються в одному потоці. Немає способу, яким одне завдання може перервати інше, поки сеанс перебуває у "поганому" стані, тобто у стані несподіване переривання, у якому може мати негативні наслідки.

У цьому контекстному менеджері with створює список завдань за допомогою asyncio.ensure_future(), що піклується про їх запуск. Після створення всіх завдань ця функція використовує asyncio.gather() для підтримки контексту сеансу, доки всі завдання не будуть завершені.

Код threading робить щось подібне до цього, але деталі зручно обробляти у ThreadPoolExecutor. Наразі AsyncioPoolExecutor класу немає.

Однак тут є одна невелика, але важлива зміна, прихована у деталях. Пам’ятаєте, як ми говорили про кількість потоків для створення? У threading прикладі не було очевидно, яка оптимальна кількість потоків.

Однією з чудових переваг asyncio є те, що модуль масштабується набагато краще, ніж threading. Створення кожного завдання займає набагато менше ресурсів та менше часу, ніж створення потоку, тому створення та виконання більшої кількості завдань працює добре. Цей приклад просто створює окреме завдання для кожного сайту для завантаження, що працює досить добре.

Зверніть увагу, що запуск коду у jupyter notebook та в окремоу файлі (__async_version.py__) відрізняється. Це обумовлено технічними особливостями двох, фактично, різних оболонок виконання.

###### Чим крута asyncio версія?

Це дуже швидко! Це, напевно, найшвидший варіант.

Часова діаграма виконання виглядає дуже подібно до того, що відбувається в threading прикладі. Просто всі запити введення/виведення виконуються одним потоком:

![diagramm for asyncio version](media/async_task.png)

Відсутність такої гарної обгортки ThreadPoolExecutor робить цей код дещо складнішим, ніж threading приклад. Це той випадок, коли вам доведеться трохи попрацювати, щоб отримати набагато кращу продуктивність.

Крім того, існує загальний аргумент, що необхідність додавати async та await у належних місцях є додатковою складністю. Певною мірою це істина. Зворотний бік цього аргументу полягає в тому, що він змушує вас думати про те, коли дане завдання буде замінено, що може допомогти вам створити кращий та швидший дизайн.

###### Проблеми з asyncio версією

__asyncio__
 На цьому етапі є кілька проблем. Вам потрібні спеціальні асинхронні версії бібліотек, щоб отримати всі переваги asyncio. Якби ви використовували лише requests для завантаження сайтів, це було б набагато повільніше, оскільки requests не призначено для сповіщення циклу подій про те, що його заблоковано. Ця проблема стає все меншою й меншою з часом та все більше бібліотек охоплює asyncio.

Інша, більш тонка проблема полягає у тому, що всі переваги кооперативної багатозадачності втрачаються, якщо одне з завдань не співпрацює. Незначна помилка у коді може призвести до вимкнення завдання та затримки процесора протягом тривалого часу, заважаючи іншим завданням, які потребують виконання. Немає можливості увійти до циклу подій, якщо завдання не передасть йому керування.

##### Багатопроцесорна версія коду.

---

На відміну від попередніх підходів, multiprocessing версія коду повністю використовує переваги кількох ЦП, які має ваш крутий новий комп’ютер. Почнемо з коду:

In [24]:
import requests
import multiprocessing
import time

session = None


def get_duration(func):
    @functools.wraps(func)
    def wraper(*args, **kwargs):
        start_time = time.time()
        res = func(*args, **kwargs)
        duration = time.time() - start_time
        print(f"Duration = {duration}")
        return res

    return wraper


def set_global_session():
    global session
    if not session:
        session = requests.Session()


def download_site(url):
    with session.get(url) as response:
        name = multiprocessing.current_process().name
        print(f"{name}:Read {len(response.content)} from {url}")


def download_all_sites(sites):
    with multiprocessing.Pool(initializer=set_global_session) as pool:
        pool.map(download_site, sites)

##  ВАЖЛИВО!

Розберемо код, а запускати його необхідно в окремому файлі (знаходиться у цій же теці, має назву __multi.py__). Це пов'язано з взаємодією IPython, який використовує jupyter notebook та взаємодією з Windows. Докладніше [тут](https://medium.com/@grvsinghal/speed-up-your-python-code-using-multiprocessing-on-windows-and-jupyter-or-ipython-2714b49d6fac).

Так як ускланювати код при поясненні основ не сама гарна ідея, дивимось код тут, а запускаємо такий самий код в окремому файлі.

###### __multiprocessing код__

У коді є кілька невеликих змін порівняно з нашою синхронною версією.
Перший у функції download_all_sites(). Замість того, щоб просто викликати download_site() кілька разів, він створює multiprocessing.Pool об’єкт та мапить download_site на iterable sites. Це має виглядати знайомим з threading прикладу.

Тут відбувається те, що Pool створює кілька окремих процесів інтерпретатора Python та кожен з них запускає вказану функцію для деяких елементів ітерованого елемента, яким у нашому випадку є список сайтів. Зв’язок між головним процесом та іншими процесами обробляється multiprocessing модулем за вас.

Рядок, який створює Pool, варта вашої уваги. По-перше, він не вказує, скільки процесів потрібно створити в Pool, хоча це необов’язковий параметр. За замовчуванням multiprocessing.Pool() визначатиме кількість ЦП у вашому комп’ютері та відповідатиме їй. Найчастіше це найкраща відповідь та це у даному випадку.

Для цієї проблеми збільшення кількості процесів не пришвидшило роботу. Це фактично уповільнило роботу, оскільки вартість налаштування та зняття всіх цих процесів була більшою, ніж перевага від виконання запитів введення-виведення паралельно.

Далі ми маємо initializer=set_global_session частину цього виклику. Пам’ятайте, що кожен наш процес Pool має власний простір пам’яті. Це означає, що вони не можуть ділитися такими речами, як Session об’єкт. Ви не хочете створювати нову Session щоразу, коли викликається функція, ви хочете створити по одній для кожного процесу.

Параметр initializer функції створено саме для цього випадку. Немає способу передати значення, що повертається, назад з initializer функції, яку викликає функція download_site(), але ви можете ініціалізувати глобальну session змінну, щоб утримувати один сеанс для кожного процесу. Оскільки кожен процес має власний простір пам’яті, глобальний для кожного буде різним.

Це насправді все. Решта коду дуже схожа на те, що ви бачили раніше.

##### Чому multiprocessing версія крута.

Версія multiprocessing цього прикладу чудова, тому що її відносно легко налаштувати та вимагає невеликого додаткового коду. Він також повністю використовує потужність ЦП вашого комп’ютера. Часова діаграма виконання цього коду виглядає так:

![multiprocessing IO diagramm](media/multiprocessing.png)

##### Проблеми з multiprocessing версією

Ця версія прикладу вимагає додаткового налаштування, а глобальний session об’єкт є дивним. Ви повинні витратити деякий час на роздуми про те, які змінні будуть доступні в кожному процесі.

Нарешті, він явно повільніший, ніж версії з залученням модулів asyncio та threading, тому що вимагає надвеликих накладних витрат: фактично на кожному вашому процесорі було запущено окремий інтерпретатор Python!

Це не дивно, оскільки проблеми, пов’язані з введенням/виведенням, насправді не multiprocessing рішень.

#### CPU-bound завдання

---

Трохи змінимо напрямок думок. Усі приклади досі стосувалися проблеми, пов’язаної з введенням-виведенням. Розглянемо проблему, пов’язану з процесором. Як ви бачили, проблема, пов’язана з введенням-виведенням, витрачає більшу частину часу на очікування завершення зовнішніх операцій, наприклад мережевого виклику. Проблема, пов’язана з процесором, з іншого боку, виконує мало операцій введення-виведення та її загальний час виконання є фактором того, наскільки швидко вона може обробити необхідні дані.

Для цілей нашого прикладу ми використаємо дещо функцію що потребує багато часу для виконання на ЦП. Ця функція отримує цілі числа в ітераційному об’єкті, знаходить для них усі існуючі цілі дільники, які повертає у списках, розміщених у кортежі (послідовність результатів відповідає послідовності початкових чисел в аргументі функції).
Зверніть увагу, що функція використовує найбільш неефективний алгоритм пошуку цілочисельного дільника. Завдання цієї програми полягає у дослідженні ефективності багатопроцесорної обробки з "CPU-bound" кодом.

In [25]:
def factorize_singl(*number) -> tuple:
    # gets integers in an iterable object, finds for them all existing integer divisors
    # which it returns in the lists placed in a tuple (the sequence of results corresponds
    # to the sequence of initial numbers in the function argument)
    # Important! The function uses the most inefficient integer divisor search algorithm.
    # The task of this program is to investigate the efficiency of multiprocessing with
    # "CPU-bound" code.
    return tuple([x for x in range(1, num + 1) if num % x == 0] for num in number)

Для того, щоб порівняти різні підходи для такого класу завдань розмістимо цю функцію, а також її багатопроцесорні варіанти (з різними варіантами запуску багатопроцесорних завдань) у файлі __multiprocess.py__ у цій же теці. Запустіть програму, почекайте декілька хвилин, щоб дочекатись результатів, а потім зробимо висновки.

Як працює синхронна версія? Діаграма її роботи:

![робота синхронної версії](media/CPU_bound_task.png)

На відміну від прикладів, пов’язаних з введенням/виведенням, приклади, пов’язані з процесором, зазвичай досить послідовні у часі виконання.

Як працюють multiprocessing версії програм. Діаграма їх роботи:

![робота в багатопроцесорному режимі](media/multiprocessing_work.png)

Діаграма зроблена для роботи двох процесорів. Коли використовується більше процесорів з'являються додаткові, абсолютно аналогічні процеси (у прикладі в файлі - до 8-и процесів).

Порівнюючи роботу, швидкість суттєво збільшується при збільшенні кількості процесів, але не лінійно, бо додаткові затрати виробничих потужностей при фактичних запусках додаткових інтерпретаторів будуть досить значними. Але інших шляхів підвищити швидкість обробки такого класу завдань немає.

##### threading та asyncio версії

Як ви гадаєте, наскільки це пришвидшить переписування цього коду за допомогою threading або asyncio?

Якщо ви відповіли «Зовсім ні» — ви молодець та вірно зрозуміли тему. Якщо ви відповіли: «Це сповільнить роботу» — ви двічі молодець, бо це й є вірна відповідь.

Ось чому: у вашому прикладі, пов’язаному з введенням-виведенням вище, більша частина загального часу була витрачена на очікування завершення повільних операцій. Модулі threading та asyncio пришвидшили це, дозволивши вам перекривати час очікування замість того, щоб виконувати їх послідовно.

Проте, якщо проблема пов’язана з процесором, чекати не потрібно. ЦП запускає так швидко, як тільки може, щоб вирішити проблему. У Python потоки та завдання виконуються на одному процесорі в одному процесі. Це означає, що один центральний процесор виконує усю роботу неконкурентного коду, а також додаткову роботу з налаштування потоків або завдань.

##### Чому multiprocessing версія крута?

Версія multiprocessing цього прикладу чудова, тому що її відносно легко налаштувати та вимагає невеликого додаткового коду. Вона також повністю використовує потужність ЦП вашого комп’ютера.

Це саме те, що ми казали минулого разу, коли ми дивилися на multiprocessing при IO-bound завданні. Велика різниця полягає в тому, що цього разу це фактично найкращий варіант.

##### Проблеми з __multiprocessing__ версією.

Є деякі недоліки використання multiprocessing. Вони насправді не відображаються у цьому простому прикладі, але розділити вашу проблему, щоб кожен процесор міг працювати незалежно, іноді може бути важко.

Крім того, багато рішень вимагають більше зв’язку між процесами. Це може додати певної складності вашому рішенню, з яким не потрібно мати справу неконкурентній програмі.