# Параллельный клиент IPython

### Перемножим матрицы $a$ и $b$

In [10]:
N = 5000
a = np.random.randn(N, N)
b = np.random.randn(N, N)

### 1. Перемножим на локальном компьютере

In [1]:
import numpy as np

In [46]:
%%time

c = np.dot(a,b)

Wall time: 3.47 s


In [39]:
# Выведем ответ
print(c)

[[ -73.92772554   27.81748138  -74.08860251 ...  144.4374928
  -153.28732022  -52.3906999 ]
 [ 116.23129541  -98.36813326   56.43144007 ...   30.60570389
   -60.23091495    6.78939991]
 [   0.38552557  -10.78978836  -33.08282081 ...   19.52989765
  -149.35807551  106.05437605]
 ...
 [ -28.23364076 -139.32368729  -81.32322392 ...  -43.7645375
   101.16074104  115.83318958]
 [  46.40019569  -28.49279027  -51.25809139 ...   62.58790431
     1.99866508   32.20752042]
 [ -89.10667258  -10.15040472  -20.12405314 ...   67.68362611
    51.43630807   -8.90861223]]


### 2. Перемножим на параллельном клиенте IPython

In [14]:
# Инициализируем параллельный клиент IPython
from ipyparallel import Client
cl = Client()

In [15]:
# Проверим количество доступных процессов параллельного клиента IPython
cl.ids

[0, 1, 2]

In [16]:
# Присвоим параллельный клиент IPython переменной v
v = cl[:]

In [17]:
# Проверим работоспособность параллельного клиента IPython
v.apply_sync(lambda : "Hello!")

['Hello!', 'Hello!', 'Hello!']

In [47]:
# Импортируем необходимые библиотеки на каждый процесс параллельного клиента IPython
v.execute('import numpy as np', block=True)

<AsyncResult: execute:finished>

In [41]:
# Отправим на каждый процесс параллельного клиента IPython матрицы, которые необходимо перемножить
v.push(dict(a=a, b=b), block=True)

[None, None, None]

In [49]:
%%time

# Выполним вычисления матрицы на каждом из процесс параллельного клиента IPython
r = v.execute('c = np.dot(a,b)', block=True)

# Методу execute() можно передавать сразу несколько функций, отделенных запятой, 
# например 'c = np.dot(a,b); print(c)'

Wall time: 10.2 s


In [50]:
# Вернем поле вывода параллельного клиента IPython после вычислений.
# Поскольку при вычислении не использовался вывод, display_outputs() выведет пустой дисплей.
r.display_outputs()

In [51]:
# Вернем результаты работы параллельного клиента IPython после вычислений.
dot = v.pull('c', block=True)

In [52]:
# Выведем поулченные значения перемножения матриц
print(dot)

[array([[ -73.92772554,   27.81748138,  -74.08860251, ...,  144.4374928 ,
        -153.28732022,  -52.3906999 ],
       [ 116.23129541,  -98.36813326,   56.43144007, ...,   30.60570389,
         -60.23091495,    6.78939991],
       [   0.38552557,  -10.78978836,  -33.08282081, ...,   19.52989765,
        -149.35807551,  106.05437605],
       ...,
       [ -28.23364076, -139.32368729,  -81.32322392, ...,  -43.7645375 ,
         101.16074104,  115.83318958],
       [  46.40019569,  -28.49279027,  -51.25809139, ...,   62.58790431,
           1.99866508,   32.20752042],
       [ -89.10667258,  -10.15040472,  -20.12405314, ...,   67.68362611,
          51.43630807,   -8.90861223]]), array([[ -73.92772554,   27.81748138,  -74.08860251, ...,  144.4374928 ,
        -153.28732022,  -52.3906999 ],
       [ 116.23129541,  -98.36813326,   56.43144007, ...,   30.60570389,
         -60.23091495,    6.78939991],
       [   0.38552557,  -10.78978836,  -33.08282081, ...,   19.52989765,
        -149.358

In [53]:
# Сравним результаты вычислений на локального компьютера и параллельного клиента IPython
assert np.allclose(c, dot[0])
assert np.allclose(dot[0], dot[1])
assert np.allclose(dot[1], dot[2])

# Функциональный метод многопоточности

In [30]:
from threading import Thread

In [31]:
# Целевая функция. В данном примере функция заполняет текстовый файл строками

def prescript(thefile, num):
    with open(thefile, 'w') as f:
        for i in range(num):
            if num > 500:
                f.write('МногоБукв\n')
            else:
                f.write('МалоБукв\n')

In [32]:
# Создаем два потока. Каждому потоку передаем свои значения функции.
thread1 = Thread(target=prescript, args=('f1.txt', 200,))
thread2 = Thread(target=prescript, args=('f2.txt', 1000,))

In [33]:
# Метод start() запускает поток, метод join() останавливает поток, когда тот выполнит свои задачи
thread1.start()
thread2.start()
thread1.join()
thread2.join()

# Классовый метод многопоточности

Для потока со сложным поведением обычно пишут отдельный класс, который наследуют от Thread из модуля threading. В этом случае программу действий потока прописывают в методе run() созданного класса. 

In [80]:
import threading

In [81]:
class MyThread(threading.Thread):
    def __init__(self, num):
        super().__init__(self, name="threddy" + num)
        self.num = num
    def run(self):
        print ("Thread ", self.num)

In [None]:
thread1 = MyThread("1")
thread2 = MyThread("2")

In [None]:
thread1.start()
thread2.start()
thread1.join()
thread2.join()

Чтобы управлять потоками, нужно следить, как они себя ведут. И для этого в threading есть специальные методы:

- current_thread()  — смотрим, какой поток вызвал функцию;
- active_count() — считаем работающие в данный момент экземпляры класса Thread;
- enumerate()   — получаем список работающих потоков.

Ещё можно управлять потоком через методы класса:
- is_alive()  —  спрашиваем поток: «Жив ещё, курилка?»  — получаем true или false;
- getName()  — узнаём имя потока;
- setName(any_name)  — даём потоку имя;

У каждого потока, пока он работает, есть уникальный идентификационный номер, который хранится в переменной ident.

In [None]:
thread1.start()
print(thread1.ident)

Отсрочить операции в вызываемых потоком функциях можно с помощью таймера. В инициализаторе объектов класса Timer всего два аргумента — время ожидания в секундах и функция, которую нужно в итоге выполнить:

In [85]:
import threading
print ("Waiting...")
def timer_test():
    print ("The timer has done its job!")
tim = threading.Timer(5.0, timer_test)
tim.start()

Waiting...


Таймер можно один раз создать, а затем запускать в разных частях кода.

**Потусторонние потоки**

Обычно Python-приложение не завершается, пока работает хоть один его поток. Но есть особые потоки, которые не мешают закрытию программы и останавливается вместе с ней. Их называют демонами (daemons). Проверить, является ли поток демоном, можно методом isDaemon(). Если является, метод вернёт истину.

Назначить поток демоном можно  при создании — через параметр “daemon=True” или аргумент в инициализаторе класса.

In [None]:
thread0 = Thread(target=target_func, kwargs={‘x’:10}, daemon=True)
Не поздно демонизировать и уже существующий поток методом setDaemon(daemonic).

# Global Interpreter Lock (GIL)

Global Interpreter Lock, он же GIL - глобальный шлюз, который ограничивает многопоточность на уровне интерпретатора.

Задача шлюза  — пропускать потоки строго по одному, чтоб не летали наперегонки, как печально известные стритрейсеры, и не создавали угрозу работе интерпретатора.

Без шлюза потоки подрезали бы друг друга, чтобы первыми добраться до памяти, но это еще не всё. Они имеют обыкновение внезапно засыпать за рулём! Операционная система не спрашивает, вовремя или невовремя  — просто усыпляет их в ей одной известный момент. Из-за этого неупорядоченные потоки могут неожиданно перехватывать друг у друга инициативу в работе с общими ресурсами.

Дезориентированный спросонок поток, который видит перед собой совсем не ту ситуацию, при которой засыпал, рискует разбиться и повалить интерпретатор, либо попасть в тупиковую ситуацию (deadlock). Например, перед сном Поток 1 начал работу со списком, а после пробуждения не нашёл в этом списке элементов, т.к. их удалил или перезаписал Поток 2.

Чтобы такого не было, GIL в предсказуемый момент (по умолчанию раз в 5 миллисекунд для Python 3.2+) командует отработавшему потоку: «СПАААТЬ!»  — тот отключается и не мешает проезжать следующему желающему. Даже если желающего нет, блокировщик всё равно подождёт, прежде чем вернуться к предыдущему активному потоку.

Благодаря шлюзу однопоточные приложения работают быстро, а потоки не конфликтуют. Но, к сожалению, многопоточные программы при таком подходе выполняются медленнее  — слишком много времени уходит на регулировку «дорожного движения». А значит обработка графики, расчет математических моделей и поиск по большим массивам данных c GIL идут неприемлемо долго.

В статье «Understanding Python GIL»технический директор компании Gaglers Inc. и разработчик со стажем Chetan Giridhar приводит такой пример:

In [88]:
from datetime import datetime
import threading
def factorial(number): 
    fact = 1
    for n in range(1, number+1): 
        fact *= n 
    return fact 
number = 100000 
thread = threading.Thread(target=factorial, args=(number,)) 
startTime = datetime.now() 
thread.start() 
thread.join()

endTime = datetime.now() 
print ("Время выполнения: ", endTime - startTime)

Время выполнения:  0:00:05.659194


Код вычисляет факториал числа 100 000 и показывает, сколько времени ушло у машины на эту задачу. При тестировании на одном ядре и с одним потоком вычисления заняли 3,4 секунды. Тогда Четан создал и запустил второй поток. Расчет факториала на двух ядрах длился 6,2 секунды. А ведь по логике скорость вычислений не должна была существенно измениться! Повторите этот эксперимент на своей машине и посмотрите, насколько медленнее будет решена задача, если вы добавите thread2.

Глобальный шлюз  — наследие времён, когда программисты боролись за достойную реализацию многозадачности и у них не очень получалось. Но зачем он сегодня, когда есть много- и очень многоядерные процессоры? Как объяснил Гвидо ван Россум, без GIL не будут нормально работать C-расширения для Python. Ещё упадёт производительность однопоточных приложений: Python 3 станет медленнее, чем Python 2, а это никому не нужно.

## Обход GIL

Шлюз можно временно отключить. Для этого интерпретатор Python нужно отвлечь вызовом функции из внешней библиотеки или обращением к операционной системе. Например, шлюз выключится на время сохранения или открытия файла. Помните наш пример с записью строк в файлы? Как только вызванная функция возвратит управление коду Python или интерфейсу Python C API, GIL снова включается.

Как вариант, для параллельных вычислений можно использовать процессы, которые работают изолированно и неподвластны GIL. Но это большая отдельная тема. Сейчас нам важнее найти решение для многопоточности.

Если вы собираетесь использовать Python для сложных научных расчётов, обойти скоростную проблему GIL помогут библиотеки Numba, NumPy, SciPy и др. Опишу некоторые  из них в двух словах, чтобы вы поняли, стоит ли разведывать это направление дальше.

**Numba  для математики**

Numba — динамически, «на лету» компилирует Python-код, превращая его в машинный код для исполнения на CPU и GPU. Такая технология компиляции называется JIT — “Just in time”. Она помогает оптимизировать производительность программ за счет ускорения работы циклов и компиляции функций при первом запуске.

Суть в том, что вы ставите аннотации (декораторы) в узких местах кода, где вам нужно ускорить работу функций.

Для математических расчётов библиотеку удобно использовать в связке c NumPy.  Допустим, нужно сложить одномерные массивы — элемент за элементом.

In [7]:
def arr_sum(x, y):
    result_arr = np.empty_like(x)
    for i in range(len(x)):
        result_arr[i] = x[i] + y[i]
    return result_arr

In [17]:
N = 10000000
x = np.random.randn(N)
y = np.random.randn(N)

In [18]:
%%time
z = arr_sum(x=x, y=y)

Wall time: 2.83 s


Метод __nupmy.empty_like()__ принимает массив и возвращает (но не инициализирует!) другой  — соответствующий исходному по форме и типу. Чтобы ускорить выполнение кода, импортируем класс __jit__ из модуля __numba__ и добавляем в начало кода аннотацию __@jit__:

In [20]:
from numba import jit

In [23]:
@jit
def arr_sum(x,y):
    result_arr = np.empty_like(x)
    for i in range(len(x)):
        result_arr[i] = x[i] + y[i]
    return result_arr

In [24]:
%%time
z = arr_sum(x=x, y=y)

Wall time: 155 ms


Это скромное дополнение способно ускорить выполнение операции более чем в 100 раз! Если интересно, посмотрите [замеры скорости](https://habr.com/ru/post/336684/) математических расчётов при использовании разных библиотек для Python.