# threading

### Процесс и Поток

Процесс - программа, которая выполняется в текущий момент.

<Представление программы в памяти>: https://commons.wikimedia.org/wiki/File:C-memlayout-ru.svg?uselang=ru

Поток (поток выполнения, thread) - единица обработки, исполнение которой может быть назначено ядром операционной системы. Исполняющаяся последовательность инструкций внутри процесса.

Поток также называют "легковестным процессом" (https://learn-gevent-socketio.readthedocs.io/en/latest/general_concepts.html).

Несколько потоков выполнения могут существовать в рамках одного и того же процесса и совместно использовать его ресурсы.

Процесс с двумя потоками выполнения на одном ядре процессора: https://commons.wikimedia.org/wiki/File:Multithreaded_process.svg?uselang=ru

Процессы с тредами: https://sites.google.com/site/sureshdevang/thread-vs-process


* одно ядро процессора в один момент времени может исполнять только один тред
* треды одного процесса могут исполняться физически одновременно (на разных ядрах)
* бессмысленно порождать вычислительных тредов больше, чем у вас есть ядер

### Исполнение кода питона

CPython (самая популярная реализация интерпретатора питона) был реализован с максимальной простотой и имеет потокобезопасный механизм - GIL (Global Interpreter Lock).

Благодаря этому Lock'у интерпретатор питона может исполнять лишь одну команду в один момент времени (single threading). По этой причине, создание несколько потоков не приведет к их одновременному исполнению на разных ядрах процессора, тем не менее, потоки полезны и в python.

In [None]:
# модуль питона для работы с потоками
import threading

Рассмотрим простой пример программы, создающей потоки:

In [23]:
import threading # модуль для работы с потоками (threads)
import sys

def thread_job(number):
    print('Hello {}'.format(number))
    sys.stdout.flush()

def run_threads(count):
    thread_job(0)
    threads = [
        threading.Thread(target=thread_job, args=(i,))
        for i in range(1, count + 1)
    ]
    for thread in threads:
        thread.start()  # каждый поток должен быть запущен
    for thread in threads:
        thread.join()  # дожидаемся исполнения всех потоков

run_threads(4)

Hello 0
Hello 1
Hello 2Hello 3Hello 4




### Упражнение №1

Запустите следующий код. В чем проблема данного кода? Всегда ли counter = 10 после исполнения кода программы?

In [26]:
counter = 0

def thread_job():
    global counter
    old_counter = counter
    counter = old_counter + 1
    print('{} '.format(counter), end='')

threads = [threading.Thread(target=thread_job) for _ in range(10)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

counter

2 1 3 4 5 6 7 8 9 10 

10

Демонстрация "проблемности" кода:

In [33]:
import random
import time

counter = 0
def thread_job():
    global counter
    old_counter = counter
    time.sleep(random.randint(0, 1))
    counter = old_counter + 1
    print('{} '.format(counter), end='')

threads = [threading.Thread(target=thread_job) for _ in range(10)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
counter

1 2 3 4 5 6 7 8 9 10 

10

#### Почему так происходит?

Одно из возможных решений (не самое аккуратное):

In [21]:
counter = 0

def thread_job(lock):
    lock.acquire() # mutex
    global counter
    counter += 1
    print('{} '.format(counter), end='')
    sys.stdout.flush()
    lock.release()

lock = threading.Lock()
threads = [
    threading.Thread(target=thread_job, args=(lock,))
    for i in range(10)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

counter

1 2 3 4 5 6 7 8 9 10 

10

Решение лучше (с with, https://jeffknupp.com/blog/2016/03/07/python-with-context-managers):

In [30]:
counter = 0

def thread_job(lock):
    with lock:
        global counter
        counter += 1
        print('{} '.format(counter), end='')
        sys.stdout.flush()

lock = threading.Lock()
threads = [
    threading.Thread(target=thread_job, args=(lock,))
    for i in range(10)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

counter

1 2 3 4 5 6 7 8 9 10 

10

Лучшее решение (используя queue (очереди) на счет и вывод на экран):

In [34]:
import threading
import queue

class Counter:
    def __init__(self, value):
        self.value = value

def printer(printing_queue):
    while True:
        value = printing_queue.get()
        print(value)
        printing_queue.task_done()

def calculator(counter, calculation_queue, printing_queue):
    while True:
        delta = calculation_queue.get()
        counter.value += delta
        printing_queue.put(counter.value)
        calculation_queue.task_done()

def delta_generator(calculation_queue):
    calculation_queue.put(1)

# Main
printing_queue = queue.Queue()
printer_daemon = threading.Thread(
    target=printer,
    args=(printing_queue,),
    daemon=True
)
printer_daemon.start()

counter = Counter(0)
calculation_queue = queue.Queue()
calculator_daemon = threading.Thread(
    target=calculator,
    args=(counter, calculation_queue, printing_queue),
    daemon=True
)
calculator_daemon.start()

workers = [
    threading.Thread(target=delta_generator, args=(calculation_queue,))
    for _ in range(10)
]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()

calculation_queue.join()
printing_queue.join()


1
2
3
4
5
6
7
8
9
10


* ошибки в многопоточном коде - одни из самых неприятных
* модуль queue позволяет на порядок меньше думать и ошибаться, это самый pythonic способ писать многопоточный код

### Упражнение:
Написать программу, которая будет находить сумму чисел массива с использованием N тредов
(N <= core_count)

Пример:

In [36]:
import queue

def thread_job(arr, part_id, thread_count, results_queue):
    results_queue.put(
        sum(arr[i] for i in range(part_id, len(arr), thread_count))
    )

def sum_using_threads(arr, thread_count):
    results_queue = queue.Queue()
    threads = [
        threading.Thread(target=thread_job, args=(arr, i, thread_count, results_queue))
        for i in range(thread_count)
    ]
    for thread in threads:
        thread.start()

    results = []
    for thread in threads:
        results.append(results_queue.get())
        thread.join()

    return sum(results)

In [37]:
arr = [1 for _ in range(10 * 1000 * 1000)]

In [40]:
%%timeit
sum_using_threads(arr, 1)

912 ms ± 28.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [39]:
%%timeit
sum_using_threads(arr, 4)

905 ms ± 22.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Видим влияние GIL (Lock) на исполнение. **Вычисления** распараллеливать бессмысленно

Существует сценарии, при которых использование потоков оправдано!

Упражнение №.:

In [46]:
import urllib.request

urls = [
    'https://www.yandex.ru', 'https://www.google.com',
    'https://habrahabr.ru', 'https://www.python.org',
    'https://isocpp.org',
]

def read_url(url):
    with urllib.request.urlopen(url) as u:
        return u.read()

In [43]:
%%timeit
for url in urls:
    read_url(url)

6.18 s ± 341 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [44]:
%%timeit
readers = [
    threading.Thread(target=read_url, args=(url,))
    for url in urls
]
for reader in readers:
    reader.start()
for reader in readers:
    reader.join()

2.28 s ± 181 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Треды очень уместны, если в коде есть блокирующие операции (ввод-вывод, сетевые взаимодействия). Также, лучше всего разбивать логические процессы по тредам (анимация, графический интерфейс, и тд)

# multiprocessing

https://sebastianraschka.com/Articles/2014_multiprocessing.html#multi-threading-vs-multi-processing

In [47]:
import multiprocessing  # модуль для работы с процессами, создание, управление и тд.

Интерфейс строится аналогично threading. Модуль позволяет полностью использовать мощности многоядерных процессоров.

### Упражнение №.

Запустите код. Объясните почему так происходит (LIST).

In [48]:
import multiprocessing

LIST = []

def worker():
    LIST.append('item')
    
processes = [
    multiprocessing.Process(target=worker)
    for _ in range(5)
]

for p in processes:
    p.start()
for p in processes:
    p.join()
    
LIST

[]

Общение между процессами:

In [50]:
from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

q = Queue()
p = Process(target=f, args=(q,))
p.start()
p.join()
print(q.get())

[42, None, 'hello']


### Использование очереди

In [56]:
import multiprocessing as mp
import random
import string

random.seed(123)

# Define an output queue
output = mp.Queue()

# define a example function
def rand_string(length, output):
    """ Generates a random string of numbers, lower- and uppercase chars. """
    rand_str = ''.join(random.choice(
                        string.ascii_lowercase
                        + string.ascii_uppercase
                        + string.digits)
                   for i in range(length))
    output.put(rand_str)

# Setup a list of processes that we want to run
processes = [mp.Process(target=rand_string, args=(5, output)) for x in range(4)]

# Run processes
for p in processes:
    p.start()

# Exit the completed processes
for p in processes:
    p.join()

# Get process results from the output queue
results = [output.get() for p in processes]

print(results)

['tWIn1', 'N8B8n', 'aavo9', 'z6FNG']


### Pool

Класс Pool - удобный механизм распараллеливания выполнения функций, распределения входных данных по процессам и тд

Наиболее интересные функции:
* Pool.apply
* Pool.map
* Pool.apply_async
* Pool.map_async

apply, map работают аналогично питоновским built-in apply, map

In [57]:
def cube(x):
    return x**3

pool = mp.Pool(processes=4)  # создаем пул из 4 процессов
results = [pool.apply(cube, args=(x,)) for x in range(1,7)]  # раскидываем числа от 1 до 7 по 4 процессам
print(results)

pool = mp.Pool(processes=4)
results = pool.map(cube, range(1,7))  # то же самое, но с map
print(results)

[1, 8, 27, 64, 125, 216]
[1, 8, 27, 64, 125, 216]


map, apply - блокирующие вызовы. Главная программа будет заблокирована, пока процесс не выполнит работу.

map_async, apply_async - неблокирующие. При их вызове, они сразу возвращают управление в главную программу (возвращают ApplyResult как результат). Метод get() объекта ApplyResult блокирует основной поток, пока функция не будет выполнена.

In [58]:
pool = mp.Pool(processes=4)
results = [pool.apply_async(cube, args=(x,)) for x in range(1,7)]
output = [p.get() for p in results]
print(output)

[1, 8, 27, 64, 125, 216]


#### Теперь сравним скорость работы программы на вычисление, используя multiprocessing модуль

Kernel Density Estimation. (Ядерная оценка плотности)

Оценим плотность методом окна Парзена.

In [59]:
import numpy as np

def parzen_estimation(x_samples, point_x, h):
    """
    Implementation of a hypercube kernel for Parzen-window estimation.

    Keyword arguments:
        x_sample:training sample, 'd x 1'-dimensional numpy array
        x: point x for density estimation, 'd x 1'-dimensional numpy array
        h: window width

    Returns the predicted pdf as float.

    """
    k_n = 0
    for row in x_samples:
        x_i = (point_x - row[:,np.newaxis]) / (h)
        for row in x_i:
            if np.abs(row) > (1/2):
                break
        else:
            k_n += 1
    return (k_n / len(x_samples)) / (h**point_x.shape[1])

Пример использования:

In [62]:
X_inside = np.array([[0,0,0],[0.2,0.2,0.2],[0.1, -0.1, -0.3]])

X_outside = np.array([[-1.2,0.3,-0.3],[0.8,-0.82,-0.9],[1, 0.6, -0.7],
                      [0.8,0.7,0.2],[0.7,-0.8,-0.45],[-0.3, 0.6, 0.9],
                      [0.7,-0.6,-0.8]])

point_x = np.array([[0],[0],[0]])
X_all = np.vstack((X_inside,X_outside))

print('p(x) =', parzen_estimation(X_all, point_x, h=1))

p(x) = 0.3


Сгенерируем данные:

In [63]:
import numpy as np

np.random.seed(123)

# Generate random 2D-patterns
mu_vec = np.array([0,0])
cov_mat = np.array([[1,0],[0,1]])
x_2Dgauss = np.random.multivariate_normal(mu_vec, cov_mat, 10000)

Вопрос заключается в том, какой размер окна выбрать. Изменим функцию parzen_estimation, чтобы она возвращала дополнительно размер окна:

In [70]:
def parzen_estimation(x_samples, point_x, h):
    k_n = 0
    for row in x_samples:
        x_i = (point_x - row[:,np.newaxis]) / (h)
        for row in x_i:
            if np.abs(row) > (1/2):
                break
        else:
            k_n += 1
    return (h, (k_n / len(x_samples)) / (h**point_x.shape[1]))

Однопоточный алгоритм вычисления для нескольких окон:

In [71]:
def serial(samples, x, widths):
    return [parzen_estimation(samples, x, w) for w in widths]

### Упражнение

Написать многопоточный вариант, используя Pool.apply_async

In [72]:
def multiprocess(processes, samples, x, widths):
    pool = mp.Pool(processes=processes)
    results = [pool.apply_async(parzen_estimation, args=(samples, x, w)) for w in widths]
    results = [p.get() for p in results]
    results.sort() # to sort the results by input window width
    return results

Запустим multiprocess версию и посмотрим на результаты

In [75]:
point_x = np.array([[0],[0]])
widths = np.linspace(1.0, 1.2, 100)

In [76]:
import timeit

mu_vec = np.array([0,0])
cov_mat = np.array([[1,0],[0,1]])
n = 10000

x_2Dgauss = np.random.multivariate_normal(mu_vec, cov_mat, n)

benchmarks = []

benchmarks.append(timeit.Timer('serial(x_2Dgauss, point_x, widths)',
            'from __main__ import serial, x_2Dgauss, point_x, widths').timeit(number=1))

benchmarks.append(timeit.Timer('multiprocess(2, x_2Dgauss, point_x, widths)',
            'from __main__ import multiprocess, x_2Dgauss, point_x, widths').timeit(number=1))

benchmarks.append(timeit.Timer('multiprocess(4, x_2Dgauss, point_x, widths)',
            'from __main__ import multiprocess, x_2Dgauss, point_x, widths').timeit(number=1))

#### отобразить benchmarks на графике

При написании программ с использованием модуля **multiprocessing** нужно помнить, что:

- передача данных между процессами - это дорого
- если задача легкая, а данные тяжелые, то возможно лучше ничего не параллелить
- нет ограничения в виде GIL, можно легко параллелить тяжелые независимые задачи