# 7. Параллельные вычисления. Модули threading, multiprocessing, subprocess.

## Параллельные вычисления

* потоки
* процессы

## 7.1. Модуль threading

Этот модуль реализует высокоуровневый интерфейс для работы с потоками на основе модуля `_thread`.
Некоторые функции:
* active_count() — количество активных потоков
* current_thread() — текущий поток
* enumerate() — список всех работающих потоков
* main_thread() — возвращает объект-главный поток

### 7.1.1. Класс threading.Thread

Конструктор `Thread(target=None, name=None, args=(), kwargs={}, daemon=None, ...)`:
* target — функтор, который будет выполнен в отдельном потоке
* name — имя потока (по умолчанию «Thread-N»)
* args — аргументы в виде кортежа
* kwargs — аргументы в виде словаря
* daemon — запускать ли в режиме демона

In [1]:
import threading
import datetime

def Callable(a, b):
    import time
    time.sleep(1)
    return a + b

begin = datetime.datetime.now()
for x in range(10):
    Callable(x, x + 3)
end = datetime.datetime.now()
print("Последовательное выполнение:", end - begin)

begin = datetime.datetime.now()
threads = []

for x in range(10):
    threads.append(threading.Thread(target=Callable, args=(x, x + 3)))
    threads[-1].start()

for t in threads:
    t.join()

end = datetime.datetime.now()
print("Параллельное выполнение:", end - begin)

Последовательное выполнение: 0:00:10.014459
Параллельное выполнение: 0:00:01.006376


Либо создать дочерний класс, унаследованный от threading.Thread, и переопределить у него методы run() и \_\_init\_\_()

In [2]:
import threading
import datetime

class Callable(threading.Thread):
    
    def __init__(self, a, b):
        threading.Thread.__init__(self)
        self.a = a
        self.b = b

    def run(self):
        import time
        time.sleep(1)
        return self.a + self.b

begin = datetime.datetime.now()
threads = []

for x in range(10):
    threads.append(Callable(x, x + 3))
    threads[-1].start()
    print(threading.active_count())
    
for t in threads:
    t.join()

end = datetime.datetime.now()
print("Параллельное выполнение:", end - begin)

6
7
8
9
10
11
12
13
14
15
Параллельное выполнение: 0:00:01.008634


Остальные методы:
* is_alive() — возвращает `true`, если поток еще работает
* join(timeout=None) — блокирует выполнение текущего потока до тех пор, пока не завершится поток, для которого этот метод вызван
* enumerate() — возвращает список работающих потоков

### 7.1.2. Класс threading.Lock

Методы:
* acquire(blocking=True, timeout=-1) — захватывает лок
* release() — освобождает его

In [3]:
import threading
import datetime

class Callable(threading.Thread):
    
    def __init__(self, a, b, lock):
        super(Callable, self).__init__()
        self.a = a
        self.b = b
        self.lock = lock

    def run(self):
        with self.lock:
            import time
            time.sleep(1)
            # ggg = 3 / 0
        return self.a + self.b

begin = datetime.datetime.now()
threads = []

lock = threading.Lock()

for x in range(10):
    threads.append(Callable(x, x + 3, lock))
    threads[-1].start()
    
for t in threads:
    t.join()

end = datetime.datetime.now()
print("Параллельное выполнение:", end - begin)

Параллельное выполнение: 0:00:10.014954


### 7.1.3. Класс threading.Barrier

Используется для барьерной синхронизации.

In [4]:
import threading
import datetime

barrier = threading.Barrier(11)

class Callable(threading.Thread):
    
    def __init__(self, a):
        super().__init__()
        self.a = a

    def run(self):
        import time
        time.sleep(self.a)
        barrier.wait()
        print(self.a, end=' ')

begin = datetime.datetime.now()
threads = []

for x in range(10):
    threads.append(Callable(x))
    threads[-1].start()

barrier.wait()

end = datetime.datetime.now()
print("Параллельное выполнение:", end - begin)

07 Параллельное выполнение:3194     60:00:09.013253
2   58  

Помимо перечисленных выше классов, есть еще:
* threading.Condition — условия
* threading.Semaphore — семафоры
* threading.Event — события
* threading.Thread.Timer — таймер

Смотрите документацию: https://docs.python.org/3/library/threading.html

## 7.2. Модуль multiprocessing

Для синхронизации реализованы все примитивы из threading. По функциональности этот модуль напоминает threading.
Некоторые методы:
* active_children() — количество дочерних процессов
* cpu_count() — количество ядер в системе
* current_process() — возвращает обхект, соответствующий текущему процессу

### 7.2.1. Класс multiprocessing.Process

In [5]:
from multiprocessing import Process

def Callable(a, b):
    import time
    time.sleep(1)
    return a + b

if __name__ == '__main__':
    p = Process(target=Callable, args=((1,3), (3,6)))
    p.start()
    p.join()

### 7.2.2. Класс multiprocessing.Pool

In [6]:
from multiprocessing import Pool

def f(x):
    import time
    time.sleep(1)
    return x*x

if __name__ == '__main__':
    with Pool(processes=10) as p:
        result = p.map(f, range(100))
        print(result)

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521, 1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304, 2401, 2500, 2601, 2704, 2809, 2916, 3025, 3136, 3249, 3364, 3481, 3600, 3721, 3844, 3969, 4096, 4225, 4356, 4489, 4624, 4761, 4900, 5041, 5184, 5329, 5476, 5625, 5776, 5929, 6084, 6241, 6400, 6561, 6724, 6889, 7056, 7225, 7396, 7569, 7744, 7921, 8100, 8281, 8464, 8649, 8836, 9025, 9216, 9409, 9604, 9801]


Основные аргументы в multiprocessing.pool.Pool(processes, initializer, initargs, ...):
* processes — число воркеров
* initializer — каждый воркер перед стартом запускает `initializer(*initargs)`

### 7.2.3. Обмен данными между процессами

#### 7.2.3.1. На основе очереди

In [7]:
from multiprocessing import Process, Queue

def f(q, value):
    q.put([value ** 2])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q, 11))
    p.start()
    p.join()
    print(q.get())
    


[121]


#### 7.2.3.2. На основе пайпов

In [8]:
from multiprocessing import Process, Pipe

def f(conn, value):
    import time
    time.sleep(3)
    conn.send([value ** 2])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn, 11))
    p.start()
    print(parent_conn.recv())
    p.join()

[121]


## 7.3. Модуль subprocess

Модуль позволяет запускать выполнение в новых процессах, подключаться к их потокам ввода-вывода и ошибок, а также получать коды возврата.

In [9]:
import subprocess
retcode = subprocess.call("ls /", shell=True)
print(retcode)

0


In [10]:
import subprocess
process = subprocess.Popen(["sed", "s/dog/cat/"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
stdout, stderr = process.communicate(input=b'dog is a pet')
print(stdout, stderr, process.returncode, sep='\n')

b'cat is a pet'
b''
0


In [11]:
import subprocess
output = subprocess.check_output(["echo 'dog is a pet' | sed s/dog/cat/"], shell=True)
print(output.strip())

b'cat is a pet'


In [12]:
import subprocess
output = subprocess.check_call(["echo 'dog is a pet' | sed s/dog/cat/"], shell=True)
print(output)

0


In [13]:
# начиная с Python 3.5
import subprocess
output = subprocess.run(["echo 'dog is a pet' | sed s/dog/cat/"], shell=True, stdout=subprocess.PIPE)
print(output.stdout)
print(output.returncode)

b'cat is a pet\n'
0


In [14]:
import threading
import time

def consumer(cond):
    with cond:
        # wait for the condition and use the resource
        print('Starting consumer thread')

        t = threading.currentThread()

        cond.wait()

        print('Resource is available to consumer')

def producer(cond):
    with cond:
        # set up the resource to be used by the consumer

        print('Starting producer thread')

        print('Making resource available')

        cond.notifyAll()


condition = threading.Condition()

# pass each thread a 'condition'
c1 = threading.Thread(name='c1', target=consumer, args=(condition,))
c2 = threading.Thread(name='c2', target=consumer, args=(condition,))
p = threading.Thread(name='p', target=producer, args=(condition,))


# start two threads and put them into 'wait' state
c1.start()
c2.start()

# after two seconds or after some operation notify them to free or step over the wait() function
time.sleep(2)
p.start()

Starting consumer thread
Starting consumer thread
Starting producer thread
Resource is available to consumerResource is available to consumer

Making resource available


## 7.4. mpi4py

MPI для Python, документация: http://mpi4py.readthedocs.io/en/stable/

Установка:

    conda install mpi4py
    pip install mpi4py
    
Дополнительно, на системе:
    
    sudo apt-get install mpich
    
Проверить, что MPI работает:

    mpiexec -n 5 ~/opt/anaconda3/bin/python -m mpi4py helloworld
    
Далее несколько примеров из документации.

In [15]:
%%file sem07mpi1.py

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = {'a': 7, 'b': 3.14}
    comm.send(data, dest=1, tag=11)
elif rank == 1:
    data = comm.recv(source=0, tag=11)
    
print(data)

Overwriting sem07mpi1.py


In [16]:
%%bash
mpirun -n 2 ~/opt/anaconda3/bin/python sem07mpi1.py

{'a': 7, 'b': 3.14}
{'a': 7, 'b': 3.14}


In [17]:
%%file sem07mpi2.py
from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# passing MPI datatypes explicitly
if rank == 0:
    data = numpy.arange(1000, dtype='i')
    comm.Send([data, MPI.INT], dest=1, tag=77)
elif rank == 1:
    data = numpy.empty(1000, dtype='i')
    comm.Recv([data, MPI.INT], source=0, tag=77)
    
print(data[:32])

# automatic MPI datatype discovery
if rank == 0:
    data = numpy.arange(100, dtype=numpy.float64)
    comm.Send(data, dest=1, tag=13)
elif rank == 1:
    data = numpy.empty(100, dtype=numpy.float64)
    comm.Recv(data, source=0, tag=13)

print(data[:32])

Overwriting sem07mpi2.py


In [18]:
%%bash
mpirun -n 2 ~/opt/anaconda3/bin/python sem07mpi2.py

[ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
 25 26 27 28 29 30 31]
[  0.   1.   2.   3.   4.   5.   6.   7.   8.   9.  10.  11.  12.  13.  14.
  15.  16.  17.  18.  19.  20.  21.  22.  23.  24.  25.  26.  27.  28.  29.
  30.  31.]
[ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
 25 26 27 28 29 30 31]
[  0.   1.   2.   3.   4.   5.   6.   7.   8.   9.  10.  11.  12.  13.  14.
  15.  16.  17.  18.  19.  20.  21.  22.  23.  24.  25.  26.  27.  28.  29.
  30.  31.]


In [19]:
%%file sem07mpi3.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = {'key1' : [7, 2.72, 2+3j],
            'key2' : ( 'abc', 'xyz')}
else:
    data = None
data = comm.bcast(data, root=0)

print(data)

Overwriting sem07mpi3.py


In [20]:
%%bash
mpirun -n 2 ~/opt/anaconda3/bin/python sem07mpi3.py

{'key1': [7, 2.72, (2+3j)], 'key2': ('abc', 'xyz')}
{'key1': [7, 2.72, (2+3j)], 'key2': ('abc', 'xyz')}


## 7.5. Другие возможности
  - pycuda
  - pyopencl
  - IPython notebook parallel kernels
  - celery