# Введение в методы анализа данных. Язык Python.

## Лекция 9. Параллелизм в Python. Оптимизация программ
<br><br><br><br>
__Аксентьев Артем (akseart@ya.ru)__

__Ксемидов Борис (nstalker.anonim@yandex.ru)__
<br>

# Зачем необходимо?

- Большие расчеты
- Работа с вводом-выводом
- Разделение задач между потоками

Параллелизм:
- Конкурентности
- Параллелизма

# Про многоядерность

Многоядерный процессор — центральный процессор, содержащий два и более вычислительных ядра на одном процессорном кристалле или в одном корпусе.



# Процесс и Поток

Поток(thread)-это наименьшая единица выполнения, которая может быть выполнена на компьютере. Потоки существуют как части процесса и обычно не являются независимыми друг от друга, что означает, что они совместно используют данные и память с другими потоками в рамках одного и того же процесса.

Процесс(process)-это задание или экземпляр компьютерной программы, который может быть выполнен.

<img src="2560px-Multithreaded_process.svg.png" width="512"/>

# Многопоточность

In [18]:
from threading import Thread
from time import sleep

def func():
    for i in range(10):
        print(f"Thread 1: {i}")
        sleep(1)

th = Thread(target=func)
th.start()

for i in range(10):
    print(f"Main-thread: {i}")
    sleep(1)


Thread 1: 0
Main-thread: 0
Thread 1: 1
Main-thread: 1
Main-thread: 2
Thread 1: 2
Main-thread: 3
Thread 1: 3
Main-thread: 4
Thread 1: 4
Main-thread: 5
Thread 1: 5
Main-thread: 6
Thread 1: 6
Main-thread: 7
Thread 1: 7
Main-thread: 8
Thread 1: 8
Main-thread: 9
Thread 1: 9


In [19]:
def foo1(a=10):
    for i in range(10):
        a = a + a ** i
    return a

def foo2(a=10):
    for i in range(10):
        a = a + a / (i + 1)
    return a

In [23]:
%%timeit
foo1()
foo2()


122 ms ± 225 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [None]:
from threading import Thread

In [22]:
%%timeit

th1 = Thread(target=foo1, name="**")
th2 = Thread(target=foo2, name="/")

th1.start()
th2.start()

th1.join()
th2.join()

121 ms ± 440 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)


## Пара слов о GIL

GIL -- Global Interpreter Lock. Позволяет выполнять ровно один поток в один момент времени. Это необходимо из-за сложного устройства Python для того, чтобы потоки не "мешали" друг другу.

GIL -- некий ключ, который блокирует исполнение других потоков, в том случае, если один поток выполняется. Ключ становится открытым, когда происходит ожидание ввода вывода или же через какое-то определенное время.

![](GIL.png)

In [24]:
class CustomThread(Thread):
    def __init__(self, limit):
        Thread.__init__(self)
        self._limit = limit

    def run(self):
        for i in range(self._limit):
            print(f"from CustomThread: {i}")
            sleep(0.5)

cth = CustomThread(3)
cth.start()
cth.join()

from CustomThread: 0
from CustomThread: 1
from CustomThread: 2


In [31]:
class CustomThread(Thread):
    def __init__(self, limit):
        Thread.__init__(self)
        self._limit = limit

    def run(self):
        for i in range(self._limit):
            print(f"from CustomThread: {i}")
            sleep(1)

cth = CustomThread(2)
cth1 = CustomThread(5)
cth1.start()
cth.start()

cth.join()
cth1.join(100)
print(cth1.is_alive())

from CustomThread: 0
from CustomThread: 0
from CustomThread: 1
from CustomThread: 1
from CustomThread: 2
from CustomThread: 3
from CustomThread: 4
False


## Особенности при работе с несколькими потоками:
### race condition
Состояние гонки или race condition – это ошибка, возникающая при неправильном проектировании многопоточной программы. Она возникает тогда, когда несколько потоков обращаются к одним и тем же данным.


In [32]:
import random
from time import sleep
def foo1(a=10):
    global x
    sleep(random.random())
    if x == 5:
        print("x == 5")
    else:
        print("x not == 5")

def foo2(a=10):
    global x
    sleep(random.random())
    x = 1

In [36]:
from threading import Thread
x = 5
th1 = Thread(target=foo1)
th2 = Thread(target=foo2)

th1.start()
th2.start()

th1.join()
th2.join()

x not == 5


### Решение

In [42]:
import random
from time import sleep
import threading
x = 5
x_locking = threading.Lock()
def foo1(a=10):
    global x
    x_locking.acquire()
    sleep(random.random())
    if x == 5:
        print("x == 5")
    else:
        print("x not == 5")
    x_locking.release()

def foo2(a=10):
    global x
    x_locking.acquire()
    sleep(random.random())
    x = 1
    x_locking.release()

th1 = Thread(target=foo1)
th2 = Thread(target=foo2)

th1.start()
sleep(0.2)
th2.start()

th1.join()
th2.join()

x == 5


In [None]:
import random
from time import sleep
import threading
x = 5
x_locking = threading.Lock()
def foo1(a=10):
    global x
    with x_locking:
        sleep(random.random())
        if x == 5:
            print("x == 5")
        else:
            print("x not == 5")
    ...

def foo2(a=10):
    global x
    with x_locking:
        sleep(random.random())
        x = 1

th1 = Thread(target=foo1)
th2 = Thread(target=foo2)

th1.start()
th2.start()

th1.join()
th2.join()

## deadlock
deadlock -- Ожидание блокировки, которая не может быть снята

In [45]:
import random
from time import sleep
import threading
x = 5
x_locking = threading.Lock()
def foo1(a=10):
    global x
    x_locking.acquire()
    print("foo1 -- блокировка установлена")
    sleep(random.random())
    if x == 5:
        print("x == 5")
    else:
        print("x not == 5")
    # return None
    x_locking.release()
    print("foo1 -- блокировка снята")
    print("Foo1 exit")

def foo2(a=10):
    global x
    x_locking.acquire()
    print("foo2 -- блокировка установлена")
    sleep(random.random())
    x = 1
    x_locking.release()
    print("foo2 -- блокировка снята")
    print("Foo2 exit")

th1 = Thread(target=foo1)
th2 = Thread(target=foo2)

th1.start()
th2.start()

th1.join()
th2.join()

foo1 -- блокировка установлена
x == 5
foo1 -- блокировка снятаfoo2 -- блокировка установлена
Foo1 exit

foo2 -- блокировка снята
Foo2 exit


### Решение:

In [47]:
import random
from time import sleep
import threading
x = 5
x_locking = threading.Lock()
def foo1(a=10):
    global x
    with x_locking:
        print("foo1 -- блокировка установлена")
        sleep(random.random())
        if x == 5:
            print("x == 5")
        else:
            print("x not == 5")
        raise None

    print("foo1 -- блокировка снята")
    print("Foo1 exit")

def foo2(a=10):
    global x
    x_locking.acquire()
    print("foo2 -- блокировка установлена")
    sleep(random.random())
    x = 1
    x_locking.release()
    print("foo2 -- блокировка снята")
    print("Foo2 exit")

th1 = Thread(target=foo1)
th2 = Thread(target=foo2)

th1.start()
th2.start()

th1.join()
th2.join()

foo1 -- блокировка установлена


Exception in thread Thread-69:
Traceback (most recent call last):
  File "/Users/artem/miniconda3/envs/Python_Lections_MSU/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/Users/artem/miniconda3/envs/Python_Lections_MSU/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/var/folders/g0/yt0m6mx534b6cl53216hwy940000gn/T/ipykernel_2999/1110259563.py", line 15, in foo1
TypeError: exceptions must derive from BaseException


x == 5
foo2 -- блокировка установлена
foo2 -- блокировка снята
Foo2 exit


## deadlock архитектурный

In [48]:
import random
from time import sleep
import threading
x = 5
x_locking = threading.Lock()
def foo1(a=10):
    global x
    x_locking.acquire()
    print("foo1 -- блокировка установлена")
    sleep(random.random())
    print("foo1 -- блокировка установлена 2")
    x_locking.acquire()
    if x == 5:
        print("x == 5")
    else:
        print("x not == 5")
    x_locking.release()
    print("foo1 -- блокировка снята")
    print("Foo1 exit")

def foo2(a=10):
    global x
    x_locking.acquire()
    print("foo2 -- блокировка установлена")
    sleep(random.random())
    x = 1
    x_locking.release()
    print("foo2 -- блокировка снята")
    print("Foo2 exit")

th1 = Thread(target=foo1)
th2 = Thread(target=foo2)

th1.start()
th2.start()

th1.join()
th2.join()

foo1 -- блокировка установлена
foo1 -- блокировка установлена 2


KeyboardInterrupt: 

### Решение

Используем Rlock. Rlock -- блокирует поток, только если заблокирован был другим потоком

In [49]:
import random
from time import sleep
import threading
x = 5
x_locking = threading.RLock()
def foo1(a=10):
    global x
    x_locking.acquire()
    print("foo1 -- блокировка установлена")
    sleep(random.random())
    print("foo1 -- блокировка установлена 2")
    x_locking.acquire()
    if x == 5:
        print("x == 5")
    else:
        print("x not == 5")
    x_locking.release()
    x_locking.release()
    print("foo1 -- блокировка снята")
    print("Foo1 exit")

def foo2(a=10):
    global x
    x_locking.acquire()
    print("foo2 -- блокировка установлена")
    sleep(random.random())
    x = 1
    x_locking.release()
    print("foo2 -- блокировка снята")
    print("Foo2 exit")

th1 = Thread(target=foo1)
th2 = Thread(target=foo2)

th1.start()
th2.start()

th1.join()
th2.join()

foo1 -- блокировка установлена
foo1 -- блокировка установлена 2
x == 5
foo1 -- блокировка снята
Foo1 exit
foo2 -- блокировка установлена
foo2 -- блокировка снята
Foo2 exit


In [52]:
from time import sleep
from threading import Thread
num_of_stud = 0
lock = threading.Lock()

def foo(studs):
    while True:
        name = threading.currentThread().name
        with lock:
            global num_of_stud
            if num_of_stud >= len(studs):
                break
            current_stud = studs[num_of_stud]
            num_of_stud += 1

        print(f"{name} начал    проверять студента {current_stud[1]}")
        sleep(current_stud[0] / 30)
        print(f"{name} закончил проверять студента {current_stud[1]}")

students= [(99, "Андрей"),
           (76, "Александр"),
           (75, "Никита"),
           (72, "Евгений"),
           (66, "Алексей"),
           (62, "Сергей"),
           (50, "Михаил")]

teacher1 = Thread(target=foo, args=(students,), name="Проверяющий 1")
teacher2 = Thread(target=foo, args=(students,), name="Проверяющий 2")

teacher1.start()
teacher2.start()

teacher1.join()
teacher2.join()


Проверяющий 2 начал    проверять студента Андрей
Проверяющий 1 начал    проверять студента Александр
Проверяющий 1 закончил проверять студента Александр
Проверяющий 1 начал    проверять студента Никита
Проверяющий 2 закончил проверять студента Андрей
Проверяющий 2 начал    проверять студента Евгений
Проверяющий 1 закончил проверять студента Никита
Проверяющий 2 закончил проверять студента Евгений
Проверяющий 2 начал    проверять студента Алексей
Проверяющий 1 начал    проверять студента Сергей
Проверяющий 2 закончил проверять студента Алексей
Проверяющий 2 начал    проверять студента Михаил
Проверяющий 1 закончил проверять студента Сергей
Проверяющий 2 закончил проверять студента Михаил


# Многопроцессность

В отличии от многопоточности действительно параллельна.

Создаются новые экземпляры программы и выполняются какие-то действия

In [53]:
import multiprocessing

In [54]:
multiprocessing.cpu_count()

10

In [64]:
%%writefile worker.py
def f(x):
    a = 10
    for i in range(x):
        a += a ** i // 2
    return a


Overwriting worker.py


In [65]:
%%time
from multiprocessing import Pool
from worker import f

if __name__ == '__main__':
    with Pool(6) as p:
        print(p.map(f, list(range(5))))

[10, 10, 15, 127, 1024318]
CPU times: user 4.13 ms, sys: 19.4 ms, total: 23.6 ms
Wall time: 37.4 ms


In [66]:
from imp import reload
import worker
reload(worker)

<module 'worker' from '/Users/artem/Documents/Chillers_Lections/Python_MSU/course-python/lectures/Lection_9_multithread/worker.py'>

In [69]:
%%time

result = []
for i in range(5):
    result.append(worker.f(i))

CPU times: user 6 µs, sys: 0 ns, total: 6 µs
Wall time: 7.87 µs


In [None]:
result

In [1]:
%%writefile test1.py
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)


Overwriting test1.py


In [2]:
from multiprocessing import Process
import test1

test1.info('main line')
p = Process(target=test1.f, args=('bob',))
p.start()
p.join()

main line
module name: test1
parent process: 1830
process id: 56773
function f
module name: test1
parent process: 56773
process id: 57148
hello bob


## Средства межпроцессного взаимодействия

### Очереди

In [7]:
%%writefile queue_test.py
from time import sleep
def f(q):
    sleep(1)
    q.put([42, None, 'hello'])
    sleep(1)
    q.put([21, None, 'hell'])

Overwriting queue_test.py


In [9]:
from multiprocessing import Process, Queue
import queue_test

if __name__ == '__main__':
    q = Queue()
    p = Process(target=queue_test.f, args=(q,))
    p.start()
    print(q.get())    # распечатает "[42, None, 'hello']"
    print(q.get())
    p.join()

[42, None, 'hello']
[21, None, 'hell']


### Конвейеры

In [12]:
%%writefile pipe_test.py
from time import sleep
def f(conn):
    sleep(1)
    conn.send([42, None, 'hello'])
    sleep(1)
    print(conn.recv())
    conn.close()

Overwriting pipe_test.py


In [13]:
from multiprocessing import Process, Pipe
import pipe_test

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=pipe_test.f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    parent_conn.send([21, 'hell'])
    p.join()

[42, None, 'hello']
[21, 'hell']


## Средства синхронизации

Такие же как и в threading

In [17]:
%%writefile lock_test.py
import time
import random
def f(l, i):
    # l.acquire()
    # try:
        print('hello world', i)
        time.sleep(random.random())
    # finally:
    #     l.release()

Overwriting lock_test.py


In [18]:
from multiprocessing import Process, Lock
import lock_test
from imp import reload
reload(lock_test)

lock = Lock()

proc = []
for num in range(10):
    proc.append(Process(target=lock_test.f, args=(lock, num)))

for p in proc:
    p.start()

for p in proc:
    p.join()


hello world 1
hello world 8
hello world 3
hello world 0
hello world 5
hello world 9
hello world 2
hello world 7
hello world 6
hello world 4


## Пример

In [None]:
%%writefile teacher.py
from multiprocessing import current_process
from time import sleep
def foo(studs, result):
    while True:
        current_stud = studs.get()
        name = current_process().name
        result.put(f"{name} начал    проверять студента {current_stud[1]}")
        sleep(current_stud[0] / 20)
        result.put(f"{name} закончил проверять студента {current_stud[1]}")
        if studs.empty():
            break

In [20]:
from time import sleep
from multiprocessing import Process, Queue

import teacher

students= [(99, "Андрей"),
           (76, "Александр"),
           (75, "Никита"),
           (72, "Евгений"),
           (66, "Алексей"),
           (62, "Сергей"),
           (50, "Михаил")]

q_stud = Queue()
for i in students:
    q_stud.put(i)

result = Queue()
teacher1 = Process(target=teacher.foo, args=(q_stud, result), name="Проверяющий 1")
# sleep(10)
teacher2 = Process(target=teacher.foo, args=(q_stud, result), name="Проверяющий 2")

teacher1.start()
teacher2.start()

teacher1.join()
teacher2.join()

while not result.empty():
    print(result.get())

Проверяющий 1 начал    проверять студента Андрей
Проверяющий 2 начал    проверять студента Александр
Проверяющий 2 закончил проверять студента Александр
Проверяющий 2 начал    проверять студента Никита
Проверяющий 1 закончил проверять студента Андрей
Проверяющий 1 начал    проверять студента Евгений
Проверяющий 2 закончил проверять студента Никита
Проверяющий 2 начал    проверять студента Алексей
Проверяющий 1 закончил проверять студента Евгений
Проверяющий 1 начал    проверять студента Сергей
Проверяющий 2 закончил проверять студента Алексей
Проверяющий 2 начал    проверять студента Михаил
Проверяющий 1 закончил проверять студента Сергей
Проверяющий 2 закончил проверять студента Михаил


# Рекомендуемая литература:
- Современные операционные системы | Таненбаум Эндрю, Бос Х.


# Вопросы для самостоятельного изучения:

- Различные средства синхронизации потоков/процессов
- Средства межпроцессного взаимодействия
- Устройство потоков и процессов в Unix


# Вопросы к зачету

- Чем отличается поток и процесс в Python, когда лучше использовать тот или иной подход?
- Какие средства межпроцессного взаимодействия Вы занаете?
- Примитивы синхронизации
- Зачем нужна синхронизация?
- Что такое race condition и deadlock. Как бороться?