# Thread vs process


Чтобы выполнять программу параллельно есть два варианта: воспользоваться потоками или процессами: 

**Процесс** — экземпляр программы во время выполнения, независимый объект, которому выделены системные ресурсы (например, процессорное время и память). Каждый процесс выполняется в отдельном адресном пространстве: один процесс не может получить доступ к переменным и структурам данных другого. Если процесс хочет получить доступ к чужим ресурсам, необходимо использовать межпроцессное взаимодействие. Это могут быть конвейеры, файлы, каналы связи между компьютерами и многое другое.

**Поток** - использует то же самое пространства стека, что и процесс, а множество потоков совместно используют данные своих состояний. Как правило, каждый поток может работать (читать и писать) с одной и той же областью памяти, в отличие от процессов, которые не могут просто так получить доступ к памяти другого процесса. У каждого потока есть собственные регистры и собственный стек, но другие потоки могут их использовать.


**TLDR**: с точки зрения ОС поток ~= процесс, с точки зрения пользователя когда поток меняет данные - они сразу же видны другим потокам(не учитывая кеши), процессы изменения не видят и надо думать, как им общаться.  

In [23]:
from threading import Thread

In [None]:
?Thread

In [32]:
from threading import Thread
from time import sleep
import os 


def threaded_function(arg):
    print(os.getpid())
        
    while True:
        sleep(1)


if __name__ == "__main__":
    thread = Thread(target = threaded_function, args = (3,))
    thread.daemon =True
    thread.start()
    sleep(1)
    print('hi')
    #thread.join()
    print("thread finished...exiting")

running
hi
thread finished...exiting
loopinglooping

looping
running
looping
loopinglooping

running
looping
loopinglooping

looping
loopinglooping

looping
looping
looping
looping
looping
looping


## Пулл потоков
Достаточно часто хочется вопспользоваться несколькими потоками, которые будут хватать задачи и максивально быстро их выполнять, умеет следующее:

**def init**(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None): 
```                 
   Собственно создание пула, maxtasksperchild - позволяет пересоздать воркера, после выполнения нескольких задачю. Нужно это, если у вас где-то может потечь память :)
```                 
**def apply_async**(self, func, args=(), kwds={}, callback=None):
``` 
 Просим выполнить некоторую функцию, возвращает Future(на самом деле AsyncResult), у которого можно спросить ready или позвать wait(timeout) или get() и заблокироваться до получения результата.
``` 
**def apply**(self, func, args=(), kwds={}):
```
 Внутри зовёт apply_async + get
```

**def map**(self, func, iterable, chunksize=None):
```
 Параллельный вариант мапа, блокируется до полного результата 
```
**def imap**(self, func, iterable, chunksize=1):
```
 Вернёт генератор из которого можно будет вычитывать результаты
```
**def map_async**(self, func, iterable, chunksize=None, callback=None):
```
 Тут вернётся список AsyncResult с которыми придётся разбираться самостоятельно ;)
```
**def imap_unordered**(self, func, iterable, chunksize=1):
```
 Функции выше сохраняют порядок результатов, эта нет :)
```
**def close**(self):
```
 Закрывает пулл: он перестаёт принимать задачи, как только воркеры закончат с текущими задачами, они стопаются.
```

**def terminate**(self):
```
 Терминейтим всех рабочих, в пуле ещё могут быть задачи.
```
**def join**(self):
```
 Дождаться пока все воркеры стопнуться
```

In [1]:
from multiprocessing.pool import ThreadPool
def job(arg):
    for i in range(arg): 
        print("{}:{}".format(arg,i))
    return arg

pool = ThreadPool(10)
for res in pool.imap_unordered(job,range(2,5)): 
    print("main:{}".format(res))

2:04:03:0


2:13:1

3:24:1
main:24:2


4:3
main:3
main:4


Выше видна проблама с выводом из нескольких потоков на консоль, чтобы предотвратить это безобразие, надо использовать примитивы синхронизации.

In [2]:
class BankAccount(object): 
    def __init__(self,starting_sum): 
        self.money = starting_sum
        
    def put(self,money):
        self.money +=money
    
    def get(self,money):
        if self.money >= money: 
            self.money -= money
            print(self.money)
            return True
        else: 
            return False

In [None]:
10 6
t1        if self.money >= money: 
t2        if self.money >= money: 
t1            self.money -= money 4
t2            self.money -= money -2


Теоретически мы можем уйти тут в минус

In [3]:
pool = ThreadPool(10)
account = BankAccount(10)

def activity(args):
    account.put(5)
    account.get(5)
    account.get(5)
    account.get(5)
    account.put(5)
    account.put(5)
    account.put(5)

for _ in pool.imap(activity,range(5)): 
    pass

10101010


1005



0510
10




Давайте посмотрим пример, как работает пулл и imap_unordered:

In [16]:
import random 
from time import sleep
pool = ThreadPool(2)

def work(arg): 
    sleep(random.random())
    return arg

for res in pool.imap_unordered(work,range(20)): 
    print(res)

0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19


**Lock** - примитив, который позволяет реализовать **критическую секцию** область кода, которую может выполнять только один поток.

In [9]:
l =Lock()
with l: 
    pass

1


In [7]:
?l.acquire

в части кода между acquire и release может находиться только один поток

In [None]:
l.acquire() 
pass 
l.release()

Вот так может выглядеть безопасная реализация БанковскогоАккаунта.

In [17]:
class BankAccount(object): 
    def __init__(self,starting_sum): 
        self.money = starting_sum
        self.lock = Lock()
        
    def put(self,money):
        with self.lock:
            self.money +=money
    
    def get(self,money):
        with self.lock:
            if self.money >= money: 
                self.money -= money
                print(self.money)
                return True
            else: 
                return False

## Мультипроцессинг
приблизительно такие же интерфейсы как и для потоков, есть и для процессов. 
Давайте посмотрим, как можно сделать свой микро-пулл процессов

In [18]:
from multiprocessing import Process

In [22]:
import time, logging,os
from multiprocessing import Process
from multiprocessing import Pool
from multiprocessing import Queue

from queue import Empty

class EndMarker:
    pass 


class Worker(Process):
    def __init__(self, q_in, q_out):
        super(Worker, self).__init__()
        self.q_in = q_in
        self.q_out = q_out

    def run(self):
        while True:
            try:
                task = self.q_in.get(0.1)
                if isinstance(task,EndMarker):
                    break
                else:
                    logging.warning('%s:%s', os.getpid(), task)
                    self.q_out.put(task)
            except Empty:
                time.sleep(1)


In [23]:
q_in = Queue()
q_out = Queue()
processes = [Worker(q_in,q_out) for _ in range(3)]
for p in processes: 
    p.start()

for i in range(10): 
    q_in.put(i)


for _ in processes: 
    q_in.put(EndMarker())

for p in processes: 
    p.join()



Выше мы использовали потоковую очередь, позволяющую пересылать данные между процессами.

## Синхронизация процессов

Чтобы организовать синхронизацию между потоками/процессами кроме локов можно воспользоваться ешё несколькими примитивами:

**BoundedSemaphore**: фактически атомарный счётчик, который позволяет находиться в критической секции только N потокам/процессам. В реальной жизни ипользуется для реализации ограничения доступа к лиммитированному ресурсу. [Читать больше](https://docs.python.org/3.6/library/threading.html#semaphore-objects)

**Event**: Позволяет в одном месте подождать некоторый флаг, а в другом месте его установить [Читать больше](https://docs.python.org/3.6/library/threading.html#event-objects)

**Condition**: Условие, позволяет реализовать ожидание некоторого повторяющегося события, через notify/notify_all  [Читать больше](https://docs.python.org/3.6/library/threading.html#condition-objects)


In [24]:
import threading

semaphore = threading.BoundedSemaphore()
event = threading.Event()
condition = threading.Condition()

In [26]:
?threading.BoundedSemaphore

In [27]:
?threading.Event

In [28]:
?threading.Condition

# InterProcessCommunication:
IPC - друге варианты переслать данные более сложные, чем флаги между процессами, основные:
1. Signal - механизм, которым обычно с вашим процессом общается ОС
2. File - фактически несколько ваших процессов могут договориться писать/читать из некого файлика
3. Socket - можете просто общаться через сокет
4. Pipe - каналл, в который один процесс может что-то писать, а другой что-то читать.
5. Shared memory - выделить некоторую область памяти, 

пример общения через сигналы

In [29]:
import signal
import os
import time

def receive_signal(signum, stack):
    print('Received:', signum)

signal.signal(signal.SIGUSR1, receive_signal)
signal.signal(signal.SIGUSR2, receive_signal)
signal.signal(signal.SIGALRM, receive_signal) 

print('My PID is:', os.getpid())

signal.alarm(10)                              

for i in range(30):
    print('Waiting...')
    time.sleep(1)

My PID is: 22191
Waiting...
Waiting...
Waiting...
Waiting...
Waiting...
Waiting...
Waiting...
Waiting...
Waiting...
Waiting...
Received: 14
Waiting...
Waiting...
Waiting...
Waiting...
Waiting...
Waiting...
Waiting...
Waiting...
Waiting...
Waiting...
Waiting...
Waiting...
Waiting...
Waiting...
Waiting...
Waiting...
Waiting...
Waiting...
Waiting...
Waiting...


Пример использования пайп: запускать надо из консоли и вторым питоном(

In [None]:
import cPickle

import os

#communicate with another process through named pipe
#one for receive command, the other for send command

rfPath = "./p1"

wfPath = "./p2"

try:
    os.mkfifo(wfPath)
    os.mkfifo(rfPath)
except OSError:

    pass

rp = open(rfPath, 'r')
response = rp.read()
print("P2 hear %s" % response)
rp.close()

wp = open(wfPath, 'w')
wp.write("P2: I'm fine, thank you! And you?")		
wp.close()


rp = open(rfPath, 'r')
response = rp.read()
print("P2 hear %s" % response)
rp.close()

In [3]:
#!/usr/bin/python

import cPickle
import os

wfPath = "./p1"
rfPath = "./p2"
wp = open(wfPath, 'w')
wp.write("P1: How are you?")		
wp.close()


rp = open(rfPath, 'r')
response = rp.read()
print("P1 hear %s" % response)
rp.close()

wp = open(wfPath, 'w')
wp.write("P1: I'm fine too.")		
wp.close()

ModuleNotFoundError: No module named 'cPickle'