# Параллельное и многопоточное программирование в Python

### Процесс

**Процесс** - программа, которая запущена в оперативной памяти компьютера.

**Процесс** — экземпляр программы во время выполнения, независимый объект, которому выделены системные ресурсы (например, процессорное время и память). Каждый процесс выполняется в отдельном адресном пространстве: один процесс не может получить доступ к переменным и структурам данных другого.

Характеристики:
- PID
- Оперативная память
- Стек
- Список открытых файлов
- Ввод и вывод

In [1]:
import os
import time

#Получить pid процесса
pid = os.getpid()

for i in range(8):
    print(pid, time.time())
    time.sleep(1)

29632 1542122053.695168
29632 1542122054.696621
29632 1542122055.697806
29632 1542122056.69992
29632 1542122057.7013237


In [3]:
# Создание процесса
from multiprocessing import Process
from functions import say_hello

#Как выглядит функция в самом файле:
# def say_hello(name):
#     print(os.getpid())
#     print("hello", name)
 
p = Process(target = say_hello, args = ('Vladimir',)) 

p.start()
# Завершает открытый процесс, необходимо контролировать ресуры машины 
p.join()


In [4]:
# Создание процесса - альтернативный способ
from functions import say_hello_class

#Как выглядит  в самом файле:

# class say_hello_class(Process):
#     def __init__(self, name):

# Явно вызываем родительский конструктор.
# Его главная задача это возможность использования в классе потомке, методов класса-родителя.

#         super().__init__()
#         self.name = name
        
#     def run(self):
#         print(os.getpid())
#         print("hello", self.name)
        
p =  say_hello_class('Vladimir')
p.start()
p.join()

### Поток

- все потоки выполняются в рамках одного процесса
- потоки разделяют память и ресурсы процесса, в котором они работают

Поток != Процесс:
- у процесса своё адресное пространство
- у потоков – разделяемое
- у процесса есть хотя бы один поток (Main)
- 1 процесс может содержать несколько
потоков

In [5]:
# Создание потока
from threading import Thread
 
t = Thread(target = say_hello, args = ('Vladimir',)) 

t.start()
t.join()

22124
hello Vladimir


In [6]:
# Создание потока - альтернативный способ
from functions import say_hello_class_T

#Как выглядит  в самом файле:
# class say_hello_class_T(Thread):
#     def __init__(self, name):
#         super().__init__()
#         self.name = name
        
#     def run(self):
#         print(os.getpid())
#         print("hello", self.name)
        
p =  say_hello_class_T('Vladimir')
p.start()
p.join()

22124
hello Vladimir


<img width = '700px' src="images/lesson_9/mini_magick20171116-124-1elj0oj.png">

- Конкуренция - совместная работа (процессор очень быстро переключается между задачами)
- Параллелизм - параллельная работа (задача решается на нескольких ядрах)

### Многопоточность

Сравним две функции.

In [7]:
%%time
import datetime
from time import sleep

def slow_print(s):
    print('Вызов {} в {}'.format(s, datetime.datetime.now()))
    sleep(1)
    print('Конец {} в {}'.format(s, datetime.datetime.now()))

for i in range(5):
    slow_print(i) 

Вызов 0 в 2018-11-13 11:20:49.195674
Конец 0 в 2018-11-13 11:20:50.196017
Вызов 1 в 2018-11-13 11:20:50.196017
Конец 1 в 2018-11-13 11:20:51.196203
Вызов 2 в 2018-11-13 11:20:51.196203
Конец 2 в 2018-11-13 11:20:52.196546
Вызов 3 в 2018-11-13 11:20:52.196546
Конец 3 в 2018-11-13 11:20:53.196890
Вызов 4 в 2018-11-13 11:20:53.196890
Конец 4 в 2018-11-13 11:20:54.197034
Wall time: 5 s


In [8]:
from threading import Thread

for i in range(5):
    t = Thread(target=slow_print,args=(i,))
    t.start()
    
#ждет, пока тред закончит выполняться    
#здесь завершать не нужно, так как тред завершается сам
#     t.join()

Вызов 0 в 2018-11-13 11:20:54.211006
Вызов 1 в 2018-11-13 11:20:54.239924Вызов 2 в 2018-11-13 11:20:54.268848
Вызов 3 в 2018-11-13 11:20:54.291789

Вызов 4 в 2018-11-13 11:20:54.308758
Конец 0 в 2018-11-13 11:20:55.240413
Конец 2 в 2018-11-13 11:20:55.292092
Конец 1 в 2018-11-13 11:20:55.310108Конец 3 в 2018-11-13 11:20:55.310108

Конец 4 в 2018-11-13 11:20:55.332049


<img width = '700px' src="images/lesson_9/multi.png">

Вычислительные задачи можно разделить на две категории:

- I/O bound - требующие ввода/вывода
- CPU bound - нагружающие процессор

### GIL

Глобальная блокировка интерпретатора ( global interpreter lock , **GIL** ) Python. Если два или более потока попытаются манипулировать одним и тем же объектом в одно и то же время, то неизбежно возникнут проблемы. Глобальная блокировка интерпретатора исправляет это. В любой момент времени действия может выполнять только один поток. Python автоматически переключается между потоками, когда в этом возникает необходимость.

Сам интерпретатор CPython нe является потокобезопасным, поэтому в нем есть глобальная блокировка интерпретатора (Global Interpreter Lock - GIL), которая разрешает в каждый момент времени выполнятьбайт-код только одному потоку. Именно поэтому один процесс Python обычно не может задействовать несколько процессорных ядер одновременно.

GIL в Python реализован как обычный lock.

<img width = '700px' src="images/lesson_9/thread.png">

### Упражнение

Написать класс, который многопотчно скачивает документы из интернета и выполнить с помощью него многопоточную загрузку.

In [9]:
import urllib.request
# Список документов, которые необходимо скачать.

urls = ["https://cs8.pikabu.ru/post_img/2017/12/22/11/1513967276125723598.jpg",
        "https://cs7.pikabu.ru/post_img/2017/12/22/10/1513964378174980200.jpg",
        "https://cs8.pikabu.ru/post_img/2017/12/22/10/151396556419317615.jpg",
        "https://cs8.pikabu.ru/post_img/2017/12/22/11/1513967585163753346.jpg",
        "https://cs8.pikabu.ru/post_img/2017/12/22/11/1513965971173110667.jpg",
        "https://cs7.pikabu.ru/post_img/2017/12/22/11/1513966565120257829.jpg",
        "https://cs8.pikabu.ru/post_img/2017/12/22/11/1513966490163375420.jpg",
        "https://cs7.pikabu.ru/post_img/2017/12/22/11/1513966594199245649.jpg",
        "https://cs7.pikabu.ru/post_img/2017/12/22/11/1513966683175659107.jpg"]

1. Создать класс с названием MultiThreadLoader, он должен быть потомком класса Thread.

2. Констурктор класса в качестве атрибутов должен принимать название загружаемого документа(url), а также имя загружамеого документа(name).

Не забыть вызвать конструктор родительского класса, а также назначить имя и название загружаемого документа.

3. Переназначить функцию run родительского класса, в которой должна происходить загрузка.

In [10]:
# Рекомендация по созданию функции загрузки:

# открыть url:
handle = urllib.request.urlopen( )

# из url получить название файла:
fname = os.path.basename( )
        
# осуществить чтение файла из интернета чанками:        
with open(fname, "wb") as f_handler:
    while True:
        chunk = handle.read(1024)
        if not chunk:
            break
        f_handler.write(chunk)
        
#добавить сообщение о том, что конкретный файл загружен        

TypeError: urlopen() missing 1 required positional argument: 'url'

4. Создать цикл, в котором запускаются потоки с помщью метода start, тем самым осуществляется их параллельный запуск. Каждому потоку назначается загрузка отдельного файла картинки.

In [11]:
class MultiThreadLoader(Thread):
    
    def __init__(self, url, name):
        super().__init__()
        self.name = name
        self.url = url
        
    def run(self):
        handle = urllib.request.urlopen(self.url)
        fname = os.path.basename(self.url)
        
        with open(fname, "wb") as f_handler:
            while True:
                chunk = handle.read(1024)
                if not chunk:
                    break
                f_handler.write(chunk)
                
        msg = "%s has finished downloading %s" % (self.name,
                                                   self.url)
        print(msg)          

In [12]:
for item, url in enumerate(urls):
    name = "Thread %s" % (item+1)
    thread = MultiThreadLoader(url, name)
    thread.start()    

Thread 8 has finished downloading https://cs7.pikabu.ru/post_img/2017/12/22/11/1513966594199245649.jpg
Thread 6 has finished downloading https://cs7.pikabu.ru/post_img/2017/12/22/11/1513966565120257829.jpg
Thread 2 has finished downloading https://cs7.pikabu.ru/post_img/2017/12/22/10/1513964378174980200.jpg
Thread 3 has finished downloading https://cs8.pikabu.ru/post_img/2017/12/22/10/151396556419317615.jpgThread 9 has finished downloading https://cs7.pikabu.ru/post_img/2017/12/22/11/1513966683175659107.jpg

Thread 5 has finished downloading https://cs8.pikabu.ru/post_img/2017/12/22/11/1513965971173110667.jpg
Thread 1 has finished downloading https://cs8.pikabu.ru/post_img/2017/12/22/11/1513967276125723598.jpgThread 7 has finished downloading https://cs8.pikabu.ru/post_img/2017/12/22/11/1513966490163375420.jpg

Thread 4 has finished downloading https://cs8.pikabu.ru/post_img/2017/12/22/11/1513967585163753346.jpg


### Синхронизация доступа

При использовании более чем одного треда может возникнуть проблема, когда треды обращаются к одному ресурсу. 

In [13]:
import threading 

def first_operation():
    global x
    while x < 3000000:
        x += 1
    print (x)    
    
def second_operation():
    global x
    while x < 6000000:
        x += 1
    print (x)        

global x 
x = 0 

thread_1 =   threading.Thread(target = first_operation)  
thread_1.start()

thread_2 =   threading.Thread(target = second_operation)  
thread_2.start()

3058716
6000000


Как такое могло произойти? - Оба треда имели доступ к глобальной переменной x.

### Таймер

Начинает выполнение функции через определенный промежуток времени.

In [14]:
def first_operation():
    global x
    while x < 3000000:
        x += 1
    print (x)    
    
def second_operation():
    global x
    while x < 6000000:
        x += 1
    print (x)        

global x 
x = 0 

thread_1 =   threading.Thread(target = first_operation)  
thread_1.start()

thread_2 =   threading.Timer(1,function = second_operation)  
thread_2.start()

3000000
6000000


### Замки

In [15]:
def first_operation():
    global x,lock
    lock.acquire()
    try:
        while x < 3000000:
            x += 1
        print (x)   
    finally:
        lock.release()
    
def second_operation():
    global x,lock
    lock.acquire()
    try:
        while x < 6000000:
            x += 1
        print (x)   
    finally:
        lock.release()      

global x, lock 
x = 0 

lock = threading.Lock()

thread_1 =   threading.Thread(target = first_operation)  
thread_1.start()

thread_2 =   threading.Thread(target = second_operation)  
thread_2.start()

3000000
6000000


### Дедлоки

In [16]:
a = threading.Lock()
b = threading.Lock()

#неправильный порядок освобождения лока может привести к дедлоку
def foo():
    try:
        a.acquire()
        b.acquire()
    finally:
        a.release()
        b.release()

### Семафоры

Семафоры - более сложный и совершенный механизм блокировок. Внутри семафора - счетчик, в отличии от объекта блокировки, в которой просто флажок. Семафор блокирует поток только когда более заданного числа потоков пытаются захватить семафор.

In [17]:
def first_operation():
    global x,semaphore
    semaphore.acquire()
    try:
        while x < 3000000:
            x += 1
        print (x)   
    finally:
        semaphore.release()
    
def second_operation():
    global x,semaphore
    semaphore.acquire()
    try:
        while x < 6000000:
            x += 1
        print (x)   
    finally:
        semaphore.release()      

global x, semaphore 
x = 0 

semaphore = threading.BoundedSemaphore(10) 
# semaphore = threading.BoundedSemaphore(1) 

thread_1 =   threading.Thread(target = first_operation)  
thread_1.start()

thread_2 =   threading.Thread(target = second_operation)  
thread_2.start()

3010008
6000000


Прочие методы - RLock, Events, Conditions, Queue. Прочитать - http://www.quizful.net/post/thread-synchronization-in-python.

### multiprocessing

In [18]:
from functions import count

# def count(n):
#     while n > 0:
#         n -= 1

In [19]:
# cpu bound задача
t0 = time.time()
count(100000000)
count(100000000)
print(time.time() - t0)

# parallel run
t0 = time.time()
th1 = Thread(target=count, args=(100000000,))
th2 = Thread(target=count, args=(100000000,))

th1.start(); th2.start()
th1.join(); th2.join()
print(time.time() - t0)

9.264362096786499
12.15283489227295


In [20]:
# process parallel run
#разобьем задачу на процессы
t0 = time.time()
th1 = Process(target=count, args=(100000000,))
th2 = Process(target=count, args=(100000000,))

th1.start(); th2.start()
th1.join(); th2.join()
print(time.time() - t0)

5.312890291213989


### Замки

Аналогично тредам. Процесс ждет открытия замка.

In [21]:
from functions import printer
from multiprocessing import Lock

# def printer(item, lock):
#     proc = os.getpid()
#     print('Вызов {} в {} процесс {}'.format(item, datetime.datetime.now(),proc))
#     lock.acquire()
#     try:
#         print(item)
#         sleep(1)
#     finally:
#         lock.release()
#     print('Конец {} в {} процесс {}'.format(item, datetime.datetime.now(), proc))

In [22]:
lock = Lock()
items = ['tango', 'foxtrot', 10]
for item in items:
    p = Process(target=printer, args=(item, lock))
    p.start()

### Класс Pool

С помощью данного класса можем назначать процессы на отдельные ядра.

In [23]:
from multiprocessing import Pool
from functions import slow_print

numbers = [5, 10, 20]
pool = Pool(processes=3)

pool.map(slow_print, numbers)

[None, None, None]

### Модуль concurrent.futures

Абстракция над модулями мультипроцессинга и мультипоточности: можно работать с обоими модулями, но при этом не обладает большой гибкостью в управлении.

In [24]:
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
import os

In [25]:
def run(url):
    handle = urllib.request.urlopen(url)    
    fname = os.path.basename(url)
    with open(fname, "wb") as f_handler:
        while True:
            chunk = handle.read(1024)
            if not chunk:
                break
            f_handler.write(chunk)
    msg = "%s has finished downloading %s" % (fname,url)
    print(msg)

In [26]:
# from functions import run

   
def pool_mapper(urls):
    with ThreadPoolExecutor(max_workers = 9) as executor:
            return executor.map(run, urls)  
        
pool_mapper(urls)        

1513967276125723598.jpg has finished downloading https://cs8.pikabu.ru/post_img/2017/12/22/11/1513967276125723598.jpg
1513965971173110667.jpg has finished downloading https://cs8.pikabu.ru/post_img/2017/12/22/11/1513965971173110667.jpg
1513966490163375420.jpg has finished downloading https://cs8.pikabu.ru/post_img/2017/12/22/11/1513966490163375420.jpg
1513967585163753346.jpg has finished downloading https://cs8.pikabu.ru/post_img/2017/12/22/11/1513967585163753346.jpg
151396556419317615.jpg has finished downloading https://cs8.pikabu.ru/post_img/2017/12/22/10/151396556419317615.jpg
1513966683175659107.jpg has finished downloading https://cs7.pikabu.ru/post_img/2017/12/22/11/1513966683175659107.jpg
1513966594199245649.jpg has finished downloading https://cs7.pikabu.ru/post_img/2017/12/22/11/1513966594199245649.jpg1513964378174980200.jpg has finished downloading https://cs7.pikabu.ru/post_img/2017/12/22/10/1513964378174980200.jpg1513966565120257829.jpg has finished downloading https://cs7

<generator object Executor.map.<locals>.result_iterator at 0x000002116A1A3888>

### Упражнение

Оценить качество алгоритмов с помощью коэффициента детерминации путем распараллеливания процессов. 

In [27]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import r2_score
from multiprocessing import Pool

1. Загрузить датасет из файла train.csv. Изучить датасет.

In [29]:
import pandas as pd
data_ = pd.read_csv('files/lesson_9/train.csv')
data_.head()

Unnamed: 0,ID,target,48df886f9,0deb4b6a8,34b15f335,a8cb14b00,2f0771a37,30347e683,d08d1fbe3,6ee66e115,...,3ecc09859,9281abeea,8675bec0b,3a13ed79a,f677d4d13,71b203550,137efaa80,fb36b89d9,7e293fbaf,9fc776466
0,000d6aaf2,38000000.0,0.0,0,0.0,0,0,0,0,0,...,0.0,0.0,0.0,0,0,0,0,0,0,0
1,000fbd867,600000.0,0.0,0,0.0,0,0,0,0,0,...,0.0,0.0,0.0,0,0,0,0,0,0,0
2,0027d6b71,10000000.0,0.0,0,0.0,0,0,0,0,0,...,0.0,0.0,0.0,0,0,0,0,0,0,0
3,0028cbf45,2000000.0,0.0,0,0.0,0,0,0,0,0,...,0.0,0.0,0.0,0,0,0,0,0,0,0
4,002a68644,14400000.0,0.0,0,0.0,0,0,0,0,0,...,0.0,0.0,0.0,0,0,0,0,0,0,0


2. Написать функцию calculate_quality, импортируемую из внешнего файла. В функции должны присутсовать следующие этапы:
    - загрузка датасета из файла;
    - разбивка датасета на обучающую и тестовую выборку с помощью фукции train_test_split, обратить внимание, какие столбцы должны использоваться для обучения, а какие для теста, размер тестовой выборки - 33%, random_state = 42; 
    - в качестве атрибута функция должна получать классификатор, который мы будем передавать извне
    - функция должна print`ом выводить коэффициент детерминации, рассчитанный на тестовой выборке.

3. Оценить качество моделей, полученных с помощью алгоритмов RandomForestRegressor и GradientBoostingRegressor, рассчитав качество на 2 ядрах параллельно.