# Процессы

In [7]:
import random
import os
import time
import threading
import multiprocessing
import os

In [8]:
# Типы задач
def cpu_bound_task(n):
    def fib(n):
        if n == 0:
            return 0
        elif n == 1:
            return 1
        return fib(n-1) + fib(n-2)

    print(f'{n} -> {fib(n)}')


def io_bound_task(ind):
    print(f'start task {ind}')
    time.sleep(3)
    print(f'finish task {ind}\n', end='')

In [5]:
%%time

for number in [36, 35, 34, 33, 32, 31, 30, 25, 20, 15, 10]:
    cpu_bound_task(number)

36 -> 14930352
35 -> 9227465
34 -> 5702887
33 -> 3524578
32 -> 2178309
31 -> 1346269
30 -> 832040
25 -> 75025
20 -> 6765
15 -> 610
10 -> 55
CPU times: user 22.7 s, sys: 90.3 ms, total: 22.8 s
Wall time: 22.9 s


In [9]:
%%time

# CPU bound задача на потоках
tasks = []

for number in [36, 35, 34, 33, 32, 31, 30, 25, 20, 15, 10]:
    task = threading.Thread(target=cpu_bound_task, args=(number,))
    tasks.append(task)
    task.start()

for task in tasks:
    task.join()

20 -> 6765
15 -> 61010 -> 55

25 -> 75025
30 -> 832040
31 -> 1346269
32 -> 2178309
33 -> 3524578
34 -> 5702887
35 -> 9227465
36 -> 14930352
CPU times: user 23 s, sys: 433 ms, total: 23.4 s
Wall time: 23.3 s


In [None]:
%%time

# CPU bound задача на процессах
tasks = []

for number in [36, 35, 34, 33, 32, 31, 30, 25, 20, 15, 10]:
    task = multiprocessing.Process(target=cpu_bound_task, args=(number,))
    tasks.append(task)
    task.start()

for task in tasks:
    task.join()


In [10]:
%%time

for ind in range(5):
    io_bound_task(ind)

start task 0
finish task 0
start task 1
finish task 1
start task 2
finish task 2
start task 3
finish task 3
start task 4
finish task 4
CPU times: user 10.3 ms, sys: 5.35 ms, total: 15.6 ms
Wall time: 15 s


In [11]:
%%time

# IO bound задача на потоках
tasks = []

for ind in range(5):
    task = threading.Thread(target=io_bound_task, args=(ind,))
    tasks.append(task)
    task.start()

for task in tasks:
    task.join()

start task 0
start task 1
start task 2start task 3

start task 4
finish task 0
finish task 1
finish task 2
finish task 3
finish task 4
CPU times: user 5.59 ms, sys: 4.2 ms, total: 9.79 ms
Wall time: 3.01 s


In [12]:
%%time

# IO bound задача на процессах
tasks = []

for ind in range(5):
    task = multiprocessing.Process(target=io_bound_task, args=(ind,))
    tasks.append(task)
    task.start()

for task in tasks:
    task.join()

start task 0
start task 1
start task 2
start task 3
start task 4
finish task 0
finish task 1
finish task 2
finish task 3
finish task 4
CPU times: user 19.6 ms, sys: 27.1 ms, total: 46.7 ms
Wall time: 3.06 s


In [13]:
# Получение идентификатора процесса
def io_bound_task(ind):
    print(f'start task {ind} with {os.getpid()}')
    time.sleep(3)
    print(f'finish task {ind}')
    
tasks = []

for ind in range(10):
    task = multiprocessing.Process(target=io_bound_task, args=(ind,))
    tasks.append(task)
    task.start()

for task in tasks:
    task.join()

start task 0 with 17279
start task 1 with 17280
start task 2 with 17281
start task 3 with 17282
start task 4 with 17283
start task 5 with 17284
start task 6 with 17285
start task 7 with 17286
start task 8 with 17287
start task 9 with 17288
finish task 0
finish task 1
finish task 2
finish task 3
finish task 4
finish task 5
finish task 6
finish task 7
finish task 8
finish task 9


In [14]:
# Пулл процессов
from multiprocessing import Pool


def getpid(n):
    time.sleep(2)
    return os.getpid()

with Pool(3) as p:
    print(p.map(getpid, range(5)))
    # 

[17295, 17296, 17297, 17296, 17295]


In [15]:
# Шаринг ресурсов
share_memory = {
    'count': 0,
}
def share_memory_task():
    print(f'read {share_memory["count"]}')
    share_memory['count'] += 1
    print(f'write {share_memory["count"]}')

tasks = []

for _ in range(5):
    task = multiprocessing.Process(target=share_memory_task)
    tasks.append(task)
    task.start()
    
for task in tasks:
    task.join()

read 0
read 0
write 1
read 0
write 1
read 0
write 1
read 0
write 1
write 1


In [17]:
# Шаринг ресурсов через файл
filename = 'share_memory.tmp'

with open(filename, 'w') as fd:
    fd.write('0')

def share_memory_task():
    with open(filename) as fd:
        count = int(fd.read())
    print(f'read {count}')

    with open(filename, 'w') as fd:
        fd.write(str(count + 1))
    print(f'write {count + 1}')


tasks = []

for _ in range(10):
    task = multiprocessing.Process(target=share_memory_task)
    tasks.append(task)
    task.start()
    
for task in tasks:
    task.join()

with open(filename) as fd:
    print(f'COUNT: {fd.read()}')

read 0
read 0
write 1
read 1
write 1
read 1
write 2
read 1
write 2
read 2
write 2
read 2
write 3
read 3
write 3
read 3
write 4
write 4
read 4
write 5
COUNT: 5


In [19]:
from multiprocessing import Manager, Queue


PROCESSES_COUNT = 5
to_process_list = [
    [1, 2, 3], [4, 5, 6], [7, 8, 9],
    [10, 11, 12], [13, 14, 15],
    [16, 17, 18], [19, 20, 21],
    [22, 23, 24], [25, 26, 27]
]


def func(x):
    time.sleep(3)
    return 1 << x


def producer_func(to_process, queue):
    for elem in to_process:
        res = func(elem)
        queue.put(res)
        print(f"Hey, I've just put {res} to queue!")


def consumer_func(queue):
    while True:
        # .get() - блокирующий(поток остановится и будет ждать пока не сможет взять из queue)
        res = queue.get()
        if res == 'kill':
            break
        print(f"Wow, I found {res} in queue!")


def do_work(to_process_list):
    manager = Manager()
    queue = manager.Queue()
    pool = Pool(PROCESSES_COUNT)
    # создает процесс, который будет выполнять consumer_func(queue)
    pool.apply(consumer_func, (queue, ))

    jobs = []
    for to_process in to_process_list:
        job = pool.apply(producer_func, (to_process, queue))
        jobs.append(job)

    for job in jobs:
        job.get()

    queue.put('kill')
    pool.close()
    pool.join()
    

do_work(to_process_list)

Process ForkPoolWorker-54:
Process ForkPoolWorker-51:
Process ForkPoolWorker-53:
Process ForkPoolWorker-55:
Process ForkPoolWorker-52:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/mac/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/mac/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/mac/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/mac/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/mac/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/mac/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
 

KeyboardInterrupt: 

  File "/Users/mac/anaconda3/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
KeyboardInterrupt
KeyboardInterrupt
  File "/Users/mac/anaconda3/lib/python3.7/multiprocessing/managers.py", line 796, in _callmethod
    kind, result = conn.recv()
KeyboardInterrupt
  File "/Users/mac/anaconda3/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
  File "/Users/mac/anaconda3/lib/python3.7/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
KeyboardInterrupt
  File "/Users/mac/anaconda3/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/Users/mac/anaconda3/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt
