# Threading, Multiproccessing и Asyncio

Библиотеки threading и asyncio позволяют создавать многопоточные (хотя модуль asyncio не оперирует понятием потока) приложения, а multiproccessing - многопроцессорные.

в Python используется глобальная блокировка интерпретатора (Global Interpreter Lock — GIL), накладывающая определенные ограничения на потоки:
* Нельзя использовать несколько процессоров одновременно
* Одновременно может выполняться только один поток.
* Интерпретатор Python переключается между потоками для достижения конкурентности.

Принцип работы прост. Потоки удерживают GIL, пока выполняются. Однако они освобождают его при блокировании для операций ввода-вывода. Каждый раз, когда поток вынужден ждать, другие, готовые к выполнению, потоки используют свой шанс запуститься.

Из всего вышеописанного следует одна очень важная вещь - модуль threading, который позволяет создовать потоки, может улучшить производительность нашей программы только в том, случае, когда у на есть множество операций ввода-вывода, которые заставляют потоки ждать. Использование модуля threading для задач связанных с вычислениями на CPU не только не ускорят выполнение программы, но наоборт замедлят его.

Модуль asyncio позволяет не CPU, а программе, выбирать в какой момент будет переключен контекст.

Модуль multiproccessing обходит ограничения GIL и позволяет запускать процессы параллелно на различных ядрах. Этот модуль стоит использовать для задач связанных с вычислениями на CPU. Возможность работать с несколькими процессами также накладывает определенные издержки, например, повышает потребление оперативной памяти по сравнению с потоками (так как каждому процессу нужно свое пространство).

Резюмируя:
* multiproccessing стоит испоьзовать для задач, которые выполняются на CPU
* asyncio и threading стоит использовать для задач, которые требуют частых задержек из-за оперции ввода-вывода (задержка сети, запросы к базам данных, ожидание ответа пользователя)

In [1]:
# задача исполняемая на CPU с использованием multiprocessing

#!/usr/bin/env python3
import multiprocessing
import time


def cpu_bound(number):
    return sum(i * i for i in range(number))


def find_sums(numbers):
    with multiprocessing.Pool() as pool:
        pool.map(cpu_bound, numbers)


if __name__ == "__main__":
    numbers = [5_000_000 + x for x in range(20)]

    start_time = time.time()
    find_sums(numbers)
    duration = time.time() - start_time
    print(f"Duration {duration} seconds")

Duration 2.1561520099639893 seconds


In [2]:
# задача исполняемая на CPU без использования библиотек

#!/usr/bin/env python3
import time


def cpu_bound(number):
    return sum(i * i for i in range(number))


def find_sums(numbers):
    for number in numbers:
        cpu_bound(number)


if __name__ == "__main__":
    numbers = [5_000_000 + x for x in range(20)]

    start_time = time.time()
    find_sums(numbers)
    duration = time.time() - start_time
    print(f"Duration {duration} seconds")

Duration 7.216629981994629 seconds


In [3]:
# # задача исполняемая на CPU с использованием threading

#!/usr/bin/env python3
import concurrent.futures
import time


def cpu_bound(number):
    return sum(i * i for i in range(number))


def find_sums(numbers):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(cpu_bound, numbers)


if __name__ == "__main__":
    numbers = [5_000_000 + x for x in range(20)]

    start_time = time.time()
    find_sums(numbers)
    duration = time.time() - start_time
    print(f"Duration {duration} seconds")

Duration 7.552484035491943 seconds


In [None]:
# задача с ожиданием с использованием asyncio

#!/usr/bin/env python3
import asyncio
import time
import aiohttp


async def download_site(session, url):
    async with session.get(url) as response:
        print("Read {0} from {1}".format(response.content_length, url))


async def download_all_sites(sites):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in sites:
            task = asyncio.ensure_future(download_site(session, url))
            tasks.append(task)
        await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    asyncio.get_event_loop().run_until_complete(download_all_sites(sites))
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} sites in {duration} seconds")

In [5]:
# Downloaded 160 sites in 0.7162511348724365 seconds
# В jupyter notebook данный код не исполняется. Можно попробовать запустить в другом IDE, например Visual Studio Code

In [4]:
# задача с ожиданием с использованием multiprocessing


#!/usr/bin/env python3
import requests
import multiprocessing
import time

session = None


def set_global_session():
    global session
    if not session:
        session = requests.Session()


def download_site(url):
    with session.get(url) as response:
        name = multiprocessing.current_process().name
#         print(f"{name}:Read {len(response.content)} from {url}")


def download_all_sites(sites):
    with multiprocessing.Pool(initializer=set_global_session) as pool:
        pool.map(download_site, sites)


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} in {duration} seconds")

Downloaded 160 in 2.508066415786743 seconds


In [5]:
# задача с ожиданием с использованием threading

#!/usr/bin/env python3
import concurrent.futures
import requests
import threading
import time


thread_local = threading.local()


def get_session():
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session


def download_site(url):
    session = get_session()
    with session.get(url) as response:
        pass
#         print(f"Read {len(response.content)} from {url}")


def download_all_sites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_site, sites)


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} in {duration} seconds")

Downloaded 160 in 3.187723398208618 seconds


In [6]:
# задача с ожиданием без использования библиотек

#!/usr/bin/env python3
import requests
import time


def download_site(url, session):
    with session.get(url) as response:
        pass
#         print(f"Read {len(response.content)} from {url}")


def download_all_sites(sites):
    with requests.Session() as session:
        for url in sites:
            download_site(url, session)


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} in {duration} seconds")

Downloaded 160 in 14.9812490940094 seconds


# Источники:
* https://docs.python.org/3.8/library/asyncio.html
* https://leimao.github.io/blog/Python-Concurrency-High-Level/
* https://github.com/realpython/materials/tree/master/concurrency-overview
* https://habr.com/ru/company/otus/blog/501056/
* https://habr.com/ru/post/84629/
* https://medium.com/@itIsMadhavan/concurrency-vs-parallelism-a-brief-review-b337c8dac350