### Тема 6: многопоточность (multithreading) и многопроцессность (multiprocessing)

[Хорошая статья, картинка оттуда.](https://medium.com/contentsquare-engineering-blog/multithreading-vs-multiprocessing-in-python-ece023ad55a)

![Многопоточность и многопроцессность](https://miro.medium.com/max/1250/1*2zTp9ga9egrWu7GMhLK-nA.png)

 Набор полезных ссылок:
* [Пример из документации - ProcessThreadExecutor, там же мультипроцессинг](https://docs.python.org/3/library/concurrent.futures.html)
* [Альтернативная документация по мультипроцессингу](https://docs.python.org/3/library/multiprocessing.html)
* [То же самое, но более простыми словами](https://medium.com/contentsquare-engineering-blog/multithreading-vs-multiprocessing-in-python-ece023ad55a)
* [Красивый продвинутый пример](https://www.toptal.com/python/beginners-guide-to-concurrency-and-parallelism-in-python)
* [Туториал, объясняется похуже, рассказно про Queue](https://python-scripts.com/threading)

Про многопоточность: 
 * **Процесс** - экземпляр запущенной программы с собственной памятью, он есть в памяти компа и в Task Manager. Внутри процесса есть подпроцессы - треды. Поток - способ выполнения процесса, отдельное исполняемое задание внутри него. Потоки внутри одного процесса могут иметь общую память. 
  * Можно разбить один процесс на несколько потоков (треды), если эти потоки сами по себе занимают много времени и параллельно должны выполняться другие. Например, функция ждёт ответа от сайта. В этом случае, если мы ждём ответа от нескольких сайтов, мы запустим тред для первого сайта, потом приостановим его, запустим тред для второго, приостановим, вернёмся к первому и так далее. То есть как бы одновременный запуск по чуть-чуть всех тредов, хотя они не исполняются параллельно и процесс работает только на одном ядре, в отличие от мультипроцессинга.
  * Отсюда вывод: если попробовать разбить на треды сложное вычислительное задание, которое загружает процессор, выйдет только хуже, чем если бы мы вообще этого не делали.
 * GIL. Lock - механизм синхронизации разных потоков. Если один поток пишет в память что-то, а другой это что-то читает, то нужен некоторый механизм, который гарантирует, что сперва всё будет записано, а потом прочтено; и механизм Lock специально за этим - он заставляет ждать отдельные треды, если необходимо. Lock можно самому поставить в код в нужный момент и можно убрать.<br>
**Вопрос**. Работает ли это конкурентно (то есть пока доделывается один тред, уже запустился другой)? Видимо, да.<br>

Стратегии использования многопоточности $(multithreading)$:
 1. Если есть небольшое число задач, которые мы хотим выполнять конкурентно, и они различаются между собой (то есть нельзя применить аналог map), тогда можно вручную создать отдельные треды и запустить их. Например, одновременно ждать ответ от сайта, от базы данных и ещё что-то.
 2. Если есть набор одинаковых задач, и порядок их выполнения не важен, и у них нет общих переменных, то используем ```ThreadPoolExecutor``` (типа map).
 3. Если есть набор одинаковых задач, и порядок их выполнения важен, то используем ```Queue```. Правда, тут уже вопрос, поможет ли это ускорить программу, или выйдет только хуже.

#### Стратегия 1.

In [1]:
import time
import threading
from threading import Thread

In [2]:
%%time
def work(to_sleep, to_print):
    time.sleep(to_sleep)
    print('{name} says :{text}'.format(name=threading.current_thread(), text=to_print))
    
def main():
    t1=Thread(target=work, args=(1, 'I am second'))
    t2=Thread(target=work, args=(0.5, 'I am first'))
    
    t1.start()
    t2.start()
    # join - дождись, пока код в треде выполнится. Это просто способ упорядочить треды, если их порядок важен.
    t1.join()
    t2.join()
    
    # главный тред (0.1 сек) выполнил старт, подождёт, пока выполнится первый тред
    work(0.1, 'I am the last!')
    
if __name__=='__main__':

    main()

<Thread(Thread-7, started 13572)> says :I am first
<Thread(Thread-6, started 17088)> says :I am second
<_MainThread(MainThread, started 15552)> says :I am the last!
Wall time: 1.14 s


In [3]:
%%time
work(0.5, 'I am first')
work(1, 'I am second')
work(0.1, 'I am the last!')

<_MainThread(MainThread, started 15552)> says :I am first
<_MainThread(MainThread, started 15552)> says :I am second
<_MainThread(MainThread, started 15552)> says :I am the last!
Wall time: 1.62 s


То есть в первом случае мы запустили функцию на 0,5 сек, одновременно на 1 сек, в конце на 0,1 сек. Из-за переключения между тредами мы получили вдобавок 0,02 сек.

#### Проблема: потоки имеют общую память, если они работают с общими переменными, код может сломаться.

In [4]:
%%time
counter = 0

# counter глобальный, поэтому надо добавить локи
# иначе процессор запутается, и будет неверное количество итераций

def work():
    global counter
    for _ in range(1000000):
        counter = counter+1
        
def main():
    t1 = Thread(target=work)
    t2 = Thread(target=work)
    
    t1.start()
    t2.start()
    
    t1.join()
    # join гарантирует, что принт ниже будет только после окончания 
    # выполнения функции
    t2.join()
    
    print(counter)
    
if __name__=='__main__':
    main()

1273822
Wall time: 763 ms


Решим проблему с помощью замка.

In [5]:
%%time
from threading import Lock

counter = 0
# для разных частей программы создаются разные экземпляры Lock
lock = Lock()
# если будет вторая функция work(), идентичная первой, но без lock,
# то всё сломается, как в примере выше

def work():
    global counter
    for _ in range(1000000):
        lock.acquire() # загорелась красная лампочка для всех процессов
        counter = counter+1
        lock.release()
        
def main():
    t1 = Thread(target=work)
    t2 = Thread(target=work)
    
    t1.start()
    t2.start()
    
    t1.join()
    t2.join()
    
    print(counter)
    
if __name__=='__main__':
    main()

2000000
Wall time: 4.51 s


In [6]:
%%time
work()
work()
print(counter)

4000000
Wall time: 2.16 s


Когда мы добавили замки, стало, естественно, хуже, чем если бы мы тупо писали код без всяких улучшений.<br>

#### Стратегия 2: аналог map.

Расммотрим конкретный пример, когда нужно применить Multithreading в духе map: есть функция, есть набор аргументов.<br>   - ```ThreadPoolExecutor(max_workers, other args)``` создаёт набор (```pool```) процессов, которые выполняются мультитредово. Что удобно, он работает как контекстный менеджер и берёт на себя запуск и склейку тредов, это уже не надо делать вручную. ```Max_workers``` представляет собой количество тредов, которое может быть больше, чем количество ядер, поскольку на одном ядре могут выполняться несколько процессов. По умолчанию этот аргумент рассчитывается как $min(32, os.cpu\_count() + 4)$, можно его не писать самому.<br>
 * Метод ```ThreadPoolExecutor.submit()``` создаёт отдельный тред с помощью executor для каждого сочетания функция - аргументы (future).
 * Метод ```concurrent.futures.as_completed()```  возвращает итератор, который позволяет итерироваться по нескольким futures, если их выполнение успешно закончено или прервано программой.

In [10]:
%%time
import requests
import concurrent.futures

# future - это объект с результатом выполнения обрабатываемой функции

def get_wiki_page_existence(wiki_page_url, timeout):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status

wiki_page_urls = [
    "https://en.wikipedia.org/wiki/Ocean",
    "https://en.wikipedia.org/wiki/Island",
    "https://en.wikipedia.org/wiki/this_page_does_not_exist",
    "https://en.wikipedia.org/wiki/Shark",
]

with concurrent.futures.ThreadPoolExecutor() as exec:
    futures = []
    for url in wiki_page_urls:
        futures.append(exec.submit(get_wiki_page_existence, wiki_page_url=url, timeout=5))
    for future in concurrent.futures.as_completed(futures):
        print(future.result())
    

https://en.wikipedia.org/wiki/Island - exists
https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist
https://en.wikipedia.org/wiki/Ocean - exists
https://en.wikipedia.org/wiki/Shark - exists
Wall time: 389 ms


Сравним с обычным циклом по строкам:

In [8]:
%%time
for url in wiki_page_urls:
    print(get_wiki_page_existence(url, timeout=5))

https://en.wikipedia.org/wiki/Ocean - exists
https://en.wikipedia.org/wiki/Island - exists
https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist
https://en.wikipedia.org/wiki/Shark - exists
Wall time: 1.22 s


#### Перейдём к мультипроцессингу. Он тоже есть в модуле ```concurrent.futures```, но красивее писать его по-другому.

Посмотрим пример осмысленного использования модуля ```multiprocessing``` для обработки 3 последовательностей.<br>
Стратегии работы с мультипроцессингом аналогичны мультитредингу:
 1. Можно определить отдельные процессы, если они различаются между собой, и запустить их параллельно.<br>
 2. Для решения множества одинаковых задач используется ```map```. <br>

В модуле используется **```Pool```**, и его метод ```map``` для обработки нескольких элементов последовательности. ```Pool``` работает через ```with() ```, как контекстный менеджер.
 * **На линуксе и в обычных интерпретаторах всё работает из одной ячейки, а для винды надо импортировать целевую функцию из отдельного модуля**
 * Число процессов Pool по умолчанию равно ```os.cpu_count()```.
 * imap_unordered() позволяет обрабатывать задачи в произвольном порядке.
 * есть ещё полезный вариант ```starmap```: он позволяет создать через ```zip``` комбинации переменных для целевой функции, когда нужно подавать на вход несколько переменных, и проитерироваться по этим кортежам как ```map```.

#### Стратегия 2. Придётся вот так (функция определена в отдельном модуле):

In [9]:
import time
from functools import wraps
from random import randint
from multiprocessing import Pool
from last_prime import last_prime

def timeit(f):
    @wraps(f)
    def inner(*args, **kwargs):
        start = time.time()
        r = f(*args, **kwargs)
        print('{} took {:3f} seconds'.format(f.__name__, time.time() - start))
        return r

    return inner


@timeit
def main():
    min = 12 ** 4
    caps = [
        min + randint(0, 1000) for _ in range(10)
    ]
    with Pool() as pool:
        last_primes = pool.imap_unordered(last_prime, caps)
        print(list(last_primes))


if __name__ == '__main__':
    main()


[20731, 20983, 21031, 21121, 21347, 21649, 21481, 21727, 20879, 21401]
main took 13.091313 seconds


#### Сравним с обычным подходом:

In [10]:
import time
from functools import wraps
from random import randint
from multiprocessing import Pool

def is_prime(n):
    for i in range(2, n):
        if n % i == 0:
            return False
    else:
        return True


def last_prime(cap):
    max_prime = 1
    for i in range(cap):
        if is_prime(i):
            max_prime = i

    return max_prime


def timeit(f):
    @wraps(f)
    def inner(*args, **kwargs):
        start = time.time()
        r = f(*args, **kwargs)
        print('{} took {:3f} seconds'.format(f.__name__, time.time() - start))
        return r

    return inner


@timeit
def main():
    min = 12 ** 4
    caps = [
        min + randint(0, 1000) for _ in range(10)
    ]
    
    last_primes = map(last_prime, caps)
    print(list(last_primes))


if __name__ == '__main__':
    main()

[20947, 21713, 21169, 21121, 21727, 21247, 20983, 21433, 21121, 20947]
main took 71.051435 seconds


In [None]:
import time
from functools import wraps
from multiprocessing.dummy import Pool


def count_symbols_in_file(filename):
    with open(filename)as f:
        return len(f.read())


def timeit(f):
    @wraps(f)
    def inner(*args, **kwargs):
        start = time.time()
        r = f(*args, **kwargs)
        print('{} took {:3f} seconds'.format(f.__name__, time.time() - start))
        return r

    return inner


@timeit
def main():
    time.time()
    files = ['1', '2', '3', '4']
    with Pool() as p:
        res = p.map(count_symbols_in_file, files)
        for f, r in zip(files, res):
            print(f, r)

    # for name in files:
    #     print(name, count_symbols_in_file(name))


if __name__ == '__main__':
    main()