# || минимум

+ Процесс — запущенная программа.
+ У каждого процесса есть изолированное от других
процессов состояние:
    + виртуальное адресное пространство,
    + указатель на исполняемую инструкцию,
    + стек вызовов,
    + системные ресурсы, например, открытые файловые
дескрипторы.
+ Процессы удобны для одновременного выполнения
нескольких задач.
+ Альтернативный способ: делегировать каждую задачу на
выполнение потоку

+ Поток похож на процесс тем, что его исполнение
происходит независимо от других потоков (и процессов).
+ В отличие от процесса поток исполняется внутри процесса
и разделяет с ним адресное пространство и системные
ресурсы.
+ Потоки удобны для одновременного выполнения
нескольких задач, которым требуется доступ к
разделяемому состоянию.
+ Совместным выполнением нескольких процессов и
потоков управляет операционная система, поочерёдно
разрешая каждому процессу или потоку использовать
сколько-то циклов процессора.


# Модуль threading

+ Поток в Python — это системный поток, то есть его
выполнением управляет не интерпретатор, а
операционная система.
+ Создать поток можно с помощью класса Thread из модуля
стандартной библиотеки threading.

In [27]:
import time
from threading import Thread
def countdown(n):
    for i in range(n):
        print(n - i - 1, "left")
        time.sleep(1)

t = Thread(target=countdown, args=(4, ))
t.start()

3 left
2 left
1 left
0 left


In [4]:
print('down')

1 left
down
0 left


Альтернативный способ создания потока — наследование:

In [5]:
class CountdownThread(Thread):
    def __init__(self, n):
        super().__init__()
        self.n = n

    def run(self): # вызывается методом start.
        for i in range(self.n):
            print(self.n - i - 1, "left")
            time.sleep(1)

In [6]:
t = CountdownThread(3)
t.start()

2 left
1 left
0 left


Минус такого подхода в том, что он ограничивает
переиспользование кода: функциональность класса
CountdownThread можно использовать только в отдельном
потоке.

При создании потоку можно указать имя. По умолчанию
оно "Thread-N":

In [8]:
Thread().name

'Thread-10'

In [9]:
Thread(name="NumberCruncher").name

'NumberCruncher'

У каждого активного потока есть идентификатор —
неотрицательное число, уникальное для всех активных
потоков.

In [10]:
t = Thread()
t.start()
t.ident

129732

Список активных потоков можно получить методом *enumerate()*

In [11]:
import threading
threading.enumerate()

[<HistorySavingThread(IPythonHistorySavingThread, started 137540)>,
 <ParentPollerWindows(Thread-3, started daemon 141128)>,
 <Thread(Thread-4, started daemon 136196)>,
 <_MainThread(MainThread, started 141644)>,
 <Heartbeat(Thread-5, started daemon 140852)>]

## Присоединение потоков

+ Метод join позволяет дождаться завершения потока.
+ Выполнение вызывающего потока приостановится, пока не
завершится поток t.
+ Повторные вызовы метода join не имеют эффекта.

Пример:


In [12]:
t = Thread(target=time.sleep, args=(5, ))
t.start()
t.join() # блокируется на 5 секунд
t.join() # выполняется моментально

Проверить, выполняется ли поток, можно с помощью
метода is_alive:

In [13]:
t = Thread(target=time.sleep, args=(5, ))
t.start()
t.is_alive()

True

In [14]:
t.is_alive()

False

Демон — это поток, созданный с аргументом daemon=True:

In [15]:
t = Thread(target=time.sleep, args=(5, ), daemon=True)
t.start()
t.daemon

True

+ Отличие потока-демона от обычного потока в том,
потоки-демоны автоматически уничтожаются при выходе
из интерпретатора.
+ Уничтожение потока-демона не подразумевает процедуру
финализации, поэтому следует быть аккуратным при
использовании демонов для задач, работающих с
ресурсами.

+ В Python нет встроенного механизма завершения
потоков — это не случайность, а осознанное решение
разработчиков языка.
+ Корректное завершение потока часто связано с
освобождением ресурсов, например:
    + поток может работать с файлом, дескриптор которого
нужно закрыть,
    + или захватить примитив синхронизации.
+ Для завершения потока обычно используют флаг:


In [17]:
class Task:
    def __init__(self):
        self._running = True

    def terminate(self):
        self._running = False

    def run(self, n):
        while self._running:
            # ...

SyntaxError: unexpected EOF while parsing (<ipython-input-17-2939a3ff2a0f>, line 10)

## Примитивы синхронизации: мьютексы и семафоры

Набор примитивов синхронизации в модуле threading
стандартный:
+ Lock — обычный мьютекс, используется для обеспечения
эксклюзивного доступа к разделяемому состоянию.
+ RLock — рекурсивный мьютекс, разрешающий потоку,
владеющему мьютексом, захватить мьютекс более одного
раза.
+ Semaphore — вариация мьютекса, которая разрешает
захватить себя не более фиксированного числа раз.
+ BoundedSemaphore — семафор, который следит за тем, что
его захватили и отпустили одинаковое количество раз.

Все примитивы синхронизации реализуют единый
интерфейс:
+ метод acquire захватывает примитив синхронизации,
+ а метод release отпускает его.

In [18]:
class SharedCounter:
    def __init__(self, value):
        self._value = value
        self._lock = Lock()

    def increment(self, delta=1):
        self._lock.acquire()
        self._value += delta
        self._lock.release()
    
    def get(self):
        return self._value

+ Все мьютексы и семафоры в модуле threading
реализованы “с нуля” в терминах примитивного бинарного
семафора(http://bit.ly/cpython-thread)
``` C
typedef struct {
char locked;
cond_t lock_released;
mutex_t mut;
} lock_t;
```
+ Мьютекс mut используется **только** для синхронизации
доступа к полю locked.
+ Забавное следствие: для мьютекса в Python не определено
понятие владеющего потока, то есть поток может отпустить
мьютекс, не захваченный им.

Пример(http://bit.ly/beazley-synchronization):

In [20]:
done = threading.Lock()
def idle_release():
    print("Running!")
    time.sleep(15)
    done.release()

In [21]:
done.acquire()

True

In [22]:
Thread(target=idle_release).start()

Running!


In [None]:
done.acquire() and print("WAT?") # будет работать через 15 секунд (и повторно ее лучше не запускать)

## Примитивы синхронизации: условные переменные

+ Condition используется для отправки сигналов между
потоками.
+ Метод wait блокирует вызывающий поток, пока какой-то
другой поток не вызовет метод **notify** или **notify_all**.

+ Поток может разблокироваться даже если метод notify не
был вызван. Такое поведение называют **spurious wakeup**.


### Пример: функция follow
Функция follow читает сообщения из переданного ей
соединения и кладёт их в очередь на обработку.


In [None]:
def follow(connection, connection_lock, q):
    try:
        while True:
            connection_lock.acquire()
            message = connection.read_message()
            connection_lock.release()
            q.put(message)
    except InvalidMessage:
        follow(connection, connection_lock, q)

follower = Thread(target=follow, args=...)
follower.start()

Чтобы минимизировать ошибки при использовании методов
**acquire release**, все примитивы синхронизации поддерживают
протокол менеджеров контекста.

In [None]:
def follow(connection, connection_lock, q):
    try:
        while True:
            with connection_lock:
                message = connection.read_message()
                q.put(message)
    except IOError:
        follow(connection, connection_lock, q)

# Модуль queue

+ Модуль queue реализует несколько потокобезопасных
очередей:
    + Queue — FIFO очередь,
    + LifoQueue — LIFO очередь aka стек,
    + PriorityQueue — очередь, элементы которой — пары вида
(priority, item).
+ Никаких особых изысков в реализации очередей нет: все
методы, изменяющие состояние, работают “внутри”
мьютекса.
+ Класс Queue использует в качестве контейнера deque, а
классы LifoQueue и PriorityQueue — список.

Пример работы с очередью:

In [1]:
def worker(q):
    while True:
        item = q.get() # блокирующе ожидает следующий
        do_something(item) # элемент
        q.task_done() # уведомляет очередь о выполнении  задания


def master(q):
    for item in source():
        q.put(item)
        # блокирующе ожидает, пока все элементы очереди
        # не будут обработаны
    q.join()

# Модуль futures
Модуль **concurrent.futures** содержит абстрактный класс
**Executor** и его реализацию в виде пула потоков —
**ThreadPoolExecutor**.

Интерфейс исполнителя состоит всего из трёх методов:

In [2]:
from concurrent.futures import ThreadPoolExecutor

In [3]:
executor = ThreadPoolExecutor(max_workers=4)

In [4]:
executor.submit(print, "Hello, world!")

Hello, world!

<Future at 0x2b7be92d4e0 state=running>




In [5]:
list(executor.map(print, ["Knock?", "Knock!"])) # Раскидывает вызов по потокам

Knock?
Knock!


[None, None]

In [6]:
executor.shutdown() # выключение экзекьютора

Исполнители поддерживают протокол менеджеров
контекста:
```python
with ThreadPoolExecutor(max_workers=4) as executor:
... # ...
...
```

Метод **Executor.submit** возвращает экземпляр класса
**Future**, инкапсулирующего асинхронные вычисления.

Что можно делать с Future?
```python
>>> with ThreadPoolExecutor(max_workers=4) as executor:
... f = executor.submit(sorted, [4, 3, 1, 2])
...
```
+ Поинтересоваться состоянием вычисления:
```python
>>> f.running(), f.done(), f.cancelled()
(False, True, False)
```
+ Блокирующе подождать результата вычисления:
```python
>>> print(f.result())
[1, 2, 3, 4]
>>> print(f.exception())
None
```
+ Добавить функцию, которая будет вызвана после
завершения вычисления:
```python
>>> f.add_done_callback(print)
<Future at 0x102edaac8 state=finished returned list>
```

### Пример использования модуля futures: integrate

In [8]:
import math
def integrate(f, a, b, *, n_iter=1000):
    acc = 0
    step = (b - a) / n_iter
    for i in range(n_iter):
        acc += f(a + i * step) * step
    return acc


In [9]:
integrate(math.cos, 0, math.pi / 2)

1.0007851925466296

In [10]:
integrate(math.sin, 0, math.pi)

1.9999983550656637

In [5]:
from functools import partial
from concurrent.futures import as_completed
def integrate_async(f, a, b, *, n_jobs, n_iter=1000):
    executor = ThreadPoolExecutor(max_workers=n_jobs)
    spawn = partial(executor.submit, integrate, f,
    n_iter=n_iter // n_jobs)

    step = (b - a) / n_jobs
    fs = [spawn(a + i * step, a + (i + 1) * step) for i in range(n_jobs)]
    return sum(f.result() for f in as_completed(fs))

In [14]:
integrate_async(math.cos, 0, math.pi / 2, n_jobs=2)

1.0007851925466305

In [15]:
integrate_async(math.sin, 0, math.pi, n_jobs=2)

1.9999983550656624

Модули threading, queue и concurrent.futures реализуют
привычные инструменты для || программирования на
Python.

# Параллелизм и конкурентность

Сравним производительность последовательной и
параллельной версий функции integrate с помощью
“магической” команды timeit:


In [17]:
%timeit -n100 integrate(math.cos, 0, math.pi / 2, n_iter=10**6)

433 ms ± 12.7 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [18]:
%timeit -n100 integrate_async(math.cos, 0, math.pi / 2, n_iter=10**6, n_jobs=2)

380 ms ± 14.6 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [19]:
%timeit -n100 integrate_async(math.cos, 0, math.pi / 2, n_iter=10**6, n_jobs=4)

373 ms ± 13.2 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)


Ответом на возможный вопрос, почему нет различий в скорости выполнения программ, являются три буквы, и буквы эти *G.I.L*

+ GIL (global interpreter lock) — это мьютекс, который
гарантирует, что в каждый момент времени только один
поток имеет доступ к внутреннему состоянию
интерпретатора.
+ Python C API позволяет отпустить GIL, но это безопасно
только при работе с объектами, не зависящими от
интерпретатора Python.
+ Например, все операции ввода/вывода в CPython
отпускают GIL(http://bit.ly/cpython-fileio):
```c
// ...
Py_BEGIN_ALLOW_THREADS
err = close(fd);
if (err < 0)
save_errno = errno;
Py_END_ALLOW_THREADS
// ...
```

## GIL — это плохо?

+ Ответ зависит от задачи.
+ Наличие GIL делает невозможным использование потоков
в Python для параллелизма: несколько потоков не
ускоряют, а иногда даже замедляют работу программы.
+ GIL не мешает использовать потоки для конкурентности
при работе с вводом/выводом, например:
```python
>>> from urllib.request import urlretrieve
>>> with ThreadPoolExecutor(max_workers=4) as executor:
... with open("urls.txt", "w") as handle:
... for url in handle:
... executor.submit(urlretrieve, url)
...
```
+ Альтернативный подход к организации конкурентной
работы с вводом/выводом основан на использовании
паттернов реактор и проактор.

# Модуль asyncio
Модуль asyncio появился сравнительно недавно. Его вдохновители вне
стандартной библиотеки Python: twisted, tornado, gevent.

In [20]:
import asyncio
async def echo(source, target):
    while True:
        line = await source.readline() # ->
        if not line:
            break
        target.write(line)

loop = asyncio.get_event_loop()
server = asyncio.start_server(echo, port=8080)
loop.create_task(server)
try:
    loop.run_forever()
finally:
    server.close()
    loop.close()

RuntimeError: Cannot close a running event loop

От **GIL** можно уйти через **Cython**

In [21]:
%load_ext cython

In [22]:
%%cython
from libc.math cimport cos
def integrate_async(f, double a, double b, long n_iter):
            # ^ мы используем C-версию функции
    cdef double acc = 0
    cdef double step = (b - a) / n_iter
    cdef long i
    with nogil:
        for i in range(n_iter):
            acc += cos(a + i * step) * step
    return acc


In [25]:
%timeit -n100 integrate_async(math.cos, 0, math.pi / 2, n_iter=10**6)

12.1 ms ± 224 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


# Модуль multiprocessing

+ Можно использовать вместо потоков процессы.
+ У каждого процесса будет свой GIL, но он не помешает им
работать параллельно.
+ За работу с процессами в Python отвечает модуль
multiprocessing:

In [28]:
import multiprocessing as mp
p = mp.Process(target=countdown, args=(5, ))
p.start()

In [29]:
p.name, p.pid

('Process-1', 150180)

In [30]:
p.daemon

False

In [31]:
p.join()

In [32]:
p.exitcode

1

+ Модуль реализует базовые примитивы синхронизации:
мьютексы, семафоры, условные переменные.
+ Для организации взаимодействия между процессами
можно использовать *Pipe* — основанное на сокете
соединение между двумя процессами:

In [33]:
def ponger(conn):
    conn.send("pong")

In [34]:
parent_conn, child_conn = mp.Pipe()

In [35]:
p = mp.Process(target=ponger, args=(child_conn, ))

In [None]:
p.start()
parent_conn.recv()

In [None]:
p.join()

In [10]:
import math
from concurrent.futures import ProcessPoolExecutor

def integrate_async(f, a, b, *, n_jobs, n_iter=1000):
    executor = ProcessPoolExecutor(max_workers=n_jobs)
    spawn = partial(executor.submit, integrate, f, n_iter=n_iter // n_jobs)
    
    step = (b - a) / n_jobs
    fs = [spawn(a + i * step, a + (i + 1) * step) for i in range(n_jobs)]
    return sum(f.result() for f in as_completed(fs))

In [11]:
%timeit -n100 integrate_async(math.cos, 0, math.pi / 2, n_iter=10**6, n_jobs=4)

BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

Exception in thread Thread-7:
Traceback (most recent call last):
  File "c:\users\professional\appdata\local\programs\python\python35\lib\threading.py", line 914, in _bootstrap_inner
    self.run()
  File "c:\users\professional\appdata\local\programs\python\python35\lib\threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "c:\users\professional\appdata\local\programs\python\python35\lib\concurrent\futures\process.py", line 295, in _queue_management_worker
    shutdown_worker()
  File "c:\users\professional\appdata\local\programs\python\python35\lib\concurrent\futures\process.py", line 253, in shutdown_worker
    call_queue.put_nowait(None)
  File "c:\users\professional\appdata\local\programs\python\python35\lib\multiprocessing\queues.py", line 129, in put_nowait
    return self.put(obj, False)
  File "c:\users\professional\appdata\local\programs\python\python35\lib\multiprocessing\queues.py", line 83, in put
    raise Full
queue.Full



# Пакет joblib
+ Пакет joblib реализует параллельный аналог цикла for,
который удобно использовать для параллельного
выполнения независимых задач.

In [12]:
from joblib import Parallel, delayed
def integrate_async(f, a, b, *, n_jobs, n_iter=1000, backend=None):
    step = (b - a) / n_jobs
    with Parallel(n_jobs=n_jobs, backend=backend) as parallel:
        fs = (delayed(integrate)(a + i * step, a + (i + 1) * step, n_iter=n_iter // n_jobs)
        for i in range(n_jobs))
    return sum(parallel(fs))

ImportError: No module named 'joblib'

В качестве значения аргумента backend можно указать
*"threading"* или *"multiprocessing"*.

+ GIL — это глобальный мьютекс, который ограничивает
возможности использования потоков для параллелизма в
программах на СPython.
+ Для программ, использующих, в основном, операции
ввода/вывода, GIL не страшен: в CPython эти операции
отпускают GIL.
+ Для программ, нуждающихся в параллелизме, для
повышения производительности есть варианты:
    + писать критическую функциональность на C или Cython или
    + использовать модуль multiprocessing.