#Модуль multiprocessing

Модуль multiprocessing несет в себе возможность создавать процессы таким же образом, как и потоки из модуля threading. Таким образом, можно обойти GIL и получить настоящую параллельную работу. 

Рассмотрим на примере, как создать новый процесс:

In [6]:
import os
from multiprocessing import Process, current_process

def foo(number):
  p_name = current_process().name
  print("{0} : {1}".format(number, p_name))

if __name__ == '__main__':
  random_numbers = [5, 10, 15, 20, 25]
  p_list = []
  proc = Process(target=foo, args=(5,))
  for i, number in enumerate(random_numbers):
    proc = Process(target=foo, args=(number,))
    p_list.append(proc)
    proc.start()
  proc = Process(target=foo, name="test", args=(2,))
  proc.start()
  p_list.append(proc)
  for p in p_list:
    p.join()

5 : Process-17
10 : Process-18
15 : Process-19
20 : Process-20
25 : Process-21
2 : test


Тут мы создаем новый процесс, используя класс Process, стартуем его и в цикле for говорим главному процессу Python подождать, пока не завершатся все дочерние процессы, созданные ранее.

Класс Process в качестве аргументов принимает:

1. Target — функцию, которая будет выполняться при запуске процесса.
2. Name — имя процесса, доступного через функцию current_process().
3. args — аргументы для функции target().

Также, как и с потоками, процессы поддерживают Lock для блокировки доступа к ресурсам.

In [7]:
from multiprocessing import Process, Lock, current_process

def printf(item, lock):
  lock.acquire()
  try:
    print(item, current_process())
  finally:
    lock.release()

if __name__ == "__main__":
  lock = Lock()
  items = ['test1', 'test2', 'test3']
  for i in items:
    p = Process(target=printf, args=(i, lock))
    p.start()

test1 <Process(Process-23, started)>
test2 <Process(Process-24, started)>
test3 <Process(Process-25, started)>


По примеру можем видеть, что, благодаря Lock, процессы работают с функцией по очереди.

Аналогом ThreadPoolExecutor является класс Pool, который позволяет запустить несколько процессов одновременно.

In [9]:
from multiprocessing import Pool

def calc(number):
  return number ** 2

if __name__ == "__main__":
  numbers = [5, 10, 15, 20, 25]
  pool = Pool(processes=3)
  print(pool.map(calc, numbers))

[25, 100, 225, 400, 625]


Мы создаем экземпляр класса Pool, указываем в processes, что хотим создать 3 процесса, а затем с помощью pool.map() передаем функцию для исполнения и список numbers, где впоследствии каждый из элементов списка будет подан на вход функции doubler().

Для связи между процессами можно использовать уже знакомый класс Queue, который так же реализован в модуле multiprocessing

In [12]:
from multiprocessing import Process, Queue

stop = -1
def task_creator(data, q):
  for i in data:
    q.put(i)

def consumer(q):
  while True:
    data = q.get()
    print("data: ".format(data))
    processed = data * 2
    print(processed)
    if data is stop:
      break

if __name__ == "__main__":
  q = Queue()
  data = [10, 15, 20, 25, -1]
  p1 = Process(target=task_creator, args=(data, q))
  p2 = Process(target=consumer, args=(q,))
  p1.start()
  p2.start()
  q.close()
  q.join_thread()
  p1.join()
  p2.join()

data: 
20
data: 
30
data: 
40
data: 
50
data: 
-2


Здесь мы создаем два процесса и очередь. Один процесс при старте кладет данные в очередь, а другой считывает ее и выводит на экран. 

Очереди удобны, когда нужно связать между собой несколько процессов, например, одни кладут в очередь, другие обрабатывают.

Но в модуле multiprocessing так же есть класс Pipe, который позволяет связать между собой только два процесса.

In [14]:
import multiprocessing

def sender(conn, msgs):
  for mg in msgs:
    conn.send(mg)
    print("Sent message: {}".format(mg))
  conn.close()

def reciever(conn):
  while 1:
    msg = conn.recv()
    if msg == "END":
      break
    print("Recieved message: {}".format(msg))

if __name__ == "__main__":
  msgs = ["START", "MEDIAN", "END"]
  parent_conn, child_conn = multiprocessing.Pipe()
  p1 = multiprocessing.Process(target=sender, args=(parent_conn, msgs))
  p2 = multiprocessing.Process(target=reciever, args=(child_conn,))
  p1.start()
  p2.start()
  p1.join()
  p2.join()

Sent message: START
Recieved message: START
Sent message: MEDIAN
Recieved message: MEDIAN
Sent message: END


В этом примере мы создаем два процесса, связываем их с помощью Pipe и организуем простую передачу текстовых сообщений от одного к другому.

Хочется отметить, что Pipe возвращает два объекта — parent_conn и child_conn.

parent_conn — объект, который отправляет данные через Pipe.

child_conn — объект, которые принимает данные из Pipe.

Pipe полезно использовать, когда один процесс работает в фоне, например, мониторит доступность сети, а другой процесс хочет сходить по какому-нибудь адресу и спрашивает о доступности сети у другого процесса, а потом принимает на основе этого решение.