<a href="https://colab.research.google.com/github/Cru1zzz3/python-parallel-programming-cookbook/blob/main/Python_Parallel_Programming_(Lab_3)_Udartsev_Stanislav.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**How to spawn a process**

In [None]:
import multiprocessing

def foo(i):
  print ('called function in process: %s' %i)
  return

if __name__ == '__main__':
  Process_jobs = []
  for i in range(5):
    p = multiprocessing.Process(target=foo, args=(i,))
    Process_jobs.append(p)
    p.start()
    p.join()


called function in process: 0
called function in process: 1
called function in process: 2
called function in process: 3
called function in process: 4


**How to name a process**

Процесс обращения к имени процесса схож с тем, что был во второй главе во время обращения к имени потока.

In [None]:
import multiprocessing
import time

def foo():
  name = multiprocessing.current_process().name
  print("Starting %s \n" %name)
  time.sleep(3)
  print("Exiting %s \n %name")

if __name__ == '__main__':
  process_with_name = multiprocessing.Process(name='foo_process', target=foo)
  process_with_name.daemon = True
  process_with_default_name = multiprocessing.Process(target=foo)
  process_with_name.start()
  process_with_default_name.start()

Starting foo_process 

Starting Process-19 



**How to run a process in the background**

In [None]:
import multiprocessing
import time

def foo():
 name = multiprocessing.current_process().name
 print ("Starting %s \n" %name)
 time.sleep(3)
 print ("Exiting %s \n" %name)

if __name__ == '__main__':
  background_process = multiprocessing.Process(name='background_process', target=foo)
  background_process.daemon = True

  NO_background_process = multiprocessing.Process(name='NO_background_process', target=foo)
  NO_background_process.daemon = False

  background_process.start()
  NO_background_process.start()
  
  background_process.join()
  NO_background_process.join()



Starting background_process 

Starting NO_background_process 

Exiting background_process 

Exiting NO_background_process 



**How to kill a process**

Опрос статуса процесса можно с помощью метода .is_alive(). Убить процесс с помощью метода .terminate()    

In [None]:
import multiprocessing
import time

def foo():
 print ('Starting function')
 time.sleep(0.1)
 print ('Finished function')

if __name__ ==  '__main__':
  p = multiprocessing.Process(target=foo)
  print ('Process before execution:', p, p.is_alive())
  p.start()
  print ('Process running:', p, p.is_alive())
  p.terminate()
  print ('Process terminated:', p, p.is_alive())
  p.join()
  print ('Process joined:', p, p.is_alive())
  print ('Process exit code:', p.exitcode)


Process before execution: <Process(Process-31, initial)> False
Process running: <Process(Process-31, started)> True
Process terminated: <Process(Process-31, started)> True
Process joined: <Process(Process-31, stopped[SIGTERM])> False
Process exit code: -15


**How to use a process in a subclass**

In [None]:
import multiprocessing

class MyProcess(multiprocessing.Process):
  def run(self):
    print ('called run method in process: %s' %self.name)
    return

if __name__ == '__main__':
 jobs = []
 for i in range(5):
  p = MyProcess ()
  jobs.append(p)
  p.start()
  p.join()

called run method in process: MyProcess-1
called run method in process: MyProcess-2
called run method in process: MyProcess-3
called run method in process: MyProcess-4
called run method in process: MyProcess-5


**How to exchange objects between
processes**

1. Using queue to exchange objects

In [None]:
import multiprocessing
import random
import time

class producer(multiprocessing.Process):
  def __init__(self, queue):
    multiprocessing.Process.__init__(self)
    self.queue = queue
    
  def run(self) :
    for i in range(10):
      item = random.randint(0, 256)
      self.queue.put(item)
      print ("Process Producer : item %d appended to queue %s" % (item,self.name))
      time.sleep(1)
      print ("The size of queue is %s" % self.queue.qsize())

class consumer(multiprocessing.Process):
  def __init__(self, queue):
    multiprocessing.Process.__init__(self)
    self.queue = queue

  def run(self):
    while True:
      if (self.queue.empty()):
        print("the queue is empty")
        break
      else :
        time.sleep(2)
        item = self.queue.get()
        print ('Process Consumer : item %d popped from by %s \n' % (item, self.name))
        time.sleep(1)

if __name__ == '__main__':
  queue = multiprocessing.Queue()
  process_producer = producer(queue)
  process_consumer = consumer(queue)
  process_producer.start()
  process_consumer.start()
  process_producer.join()
  process_consumer.join()



Process Producer : item 139 appended to queue producer-6
The size of queue is 1
Process Producer : item 5 appended to queue producer-6
The size of queue is 1
Process Producer : item 165 appended to queue producer-6
Process Consumer : item 139 popped from by consumer-7 

The size of queue is 2
Process Producer : item 46 appended to queue producer-6
The size of queue is 3
Process Producer : item 200 appended to queue producer-6
Process Consumer : item 5 popped from by consumer-7 
The size of queue is 3

Process Producer : item 200 appended to queue producer-6
The size of queue is 4
Process Producer : item 129 appended to queue producer-6
The size of queue is 5
Process Producer : item 191 appended to queue producer-6
Process Consumer : item 165 popped from by consumer-7 

The size of queue is 5
Process Producer : item 243 appended to queue producer-6
The size of queue is 6
Process Producer : item 85 appended to queue producer-6
The size of queue is 7
Process Consumer : item 46 popped from

2. Using pipes to exchange objects 

In [None]:
import multiprocessing

def create_items(pipe):
  output_pipe, _ = pipe
  for item in range(10):
    output_pipe.send(item)
  output_pipe.close()

def multiply_items(pipe_1, pipe_2):
  close, input_pipe = pipe_1
  close.close()
  output_pipe, _ = pipe_2
  try:
    while True:
      item = input_pipe.recv()
      output_pipe.send(item * item)
  except EOFError:
    output_pipe.close()

if __name__== '__main__':
  #First process pipe with numbers from 0 to 9
  pipe_1 = multiprocessing.Pipe(True)
  process_pipe_1 = multiprocessing.Process(target=create_items, args=(pipe_1,))
  process_pipe_1.start()
  #second pipe,
  pipe_2 = multiprocessing.Pipe(True)
  process_pipe_2 = multiprocessing.Process(target=multiply_items, args=(pipe_1, pipe_2,))
  process_pipe_2.start()
  pipe_1[0].close()
  pipe_2[0].close()
  try:
    while True:
      print(pipe_2[1].recv())
  except EOFError:
    print("End")

0
1
4
9
16
25
36
49
64
81
End


**How to synchronize processes**

Примитивы синхронизации очень похожи на те, которые встречались в прошлой лабораторной работе при использовании библиотеки multithreading. Добавляется новый примитив синхронизации Barrier. Процессы могут продолжить свою работу, только при достижении порогового значения барьера, таким образом можно добиться синхронизации. 

В примере из книги значения времени при достижении барьерного значения 

In [1]:
import multiprocessing
from multiprocessing import Barrier, Lock, Process
from time import time
from datetime import datetime

def test_with_barrier(synchronizer, serializer):
  name = multiprocessing.current_process().name
  synchronizer.wait()
  now = time()
  with serializer:
    print("process %s ----> %s" %(name,datetime.fromtimestamp(now)))

def test_without_barrier():
  name = multiprocessing.current_process().name
  now = time()
  print("process %s ----> %s" %(name ,datetime.fromtimestamp(now)))

if __name__ == '__main__':
  synchronizer = Barrier(2)
  serializer = Lock()
  Process(name='p1 - test_with_barrier', target=test_with_barrier, args=(synchronizer, serializer)).start()
  Process(name='p2 - test_with_barrier', target=test_with_barrier, args=(synchronizer, serializer)).start()
  Process(name='p3 - test_without_barrier', target=test_without_barrier).start()
  Process(name='p4 - test_without_barrier', target=test_without_barrier).start()





process p1 - test_with_barrier ----> 2021-11-22 09:25:34.575058
process p2 - test_with_barrier ----> 2021-11-22 09:25:34.577652
process p3 - test_without_barrier ----> 2021-11-22 09:25:34.606573
process p4 - test_without_barrier ----> 2021-11-22 09:25:34.613505


**How to manage a state between processes**

Благодаря объекту Manager() данные могут быть доступны между несколькими процессами. В нашем случае объектом данных выступает словарь, доступный каждому процессу, значения которого вычисляются внутри каждого из процессов.

In [5]:
import multiprocessing

def worker(dictionary, key, item):
  dictionary[key] = item
  print("key = ", key, "value = ", item)

if __name__ == "__main__":
  mgr = multiprocessing.Manager()
  dictionary = mgr.dict()
  jobs = [multiprocessing.Process(target=worker, args=(dictionary, i, i*2)) for i in range(10)]
  for j in jobs: 
    j.start()
  for j in jobs:
    j.join()
  print("Results:", dictionary)

key =  0 value =  0
key =  1 value =  2
key =  2 value =  4
key =  3 value =  6
key =  4 value =  8
key =  5 value =  10
key =  6 value =  12
key =  8 value =  16
key =  7 value =  14
key =  9 value =  18
Results: {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18}


**How to use a process pool**

Использования объекта Pool() позволяет выполнять вычисление функции параллельно, задавая необходимое параллельных процессов в пуле процессов. Для пула процессов можно применять метод  одноимённый встроенной функции .map(), которая применяет функцию к каждому элементу итерируемого объекта, но делает это параллельно.

In [19]:
def function_square(data):
  result = data*data
  return result

if __name__ == '__main__':
  inputs = list(range(100))
  pool = multiprocessing.Pool(processes=4)
  pool_outputs = pool.map(function_square, inputs)
  pool.close()
  pool.join()
  print ('Pool :', pool_outputs)

Pool : [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521, 1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304, 2401, 2500, 2601, 2704, 2809, 2916, 3025, 3136, 3249, 3364, 3481, 3600, 3721, 3844, 3969, 4096, 4225, 4356, 4489, 4624, 4761, 4900, 5041, 5184, 5329, 5476, 5625, 5776, 5929, 6084, 6241, 6400, 6561, 6724, 6889, 7056, 7225, 7396, 7569, 7744, 7921, 8100, 8281, 8464, 8649, 8836, 9025, 9216, 9409, 9604, 9801]


**Using the mpi4py Python module**

Модуль mpi4py представляет собой обётку над библиотека языка С под названием MPI, котороая позволяет устанавливать взаимодействие процессов с использованием сообщений 

In [20]:
!pip install mpi4py

Collecting mpi4py
  Downloading mpi4py-3.1.2.tar.gz (2.5 MB)
[K     |████████████████████████████████| 2.5 MB 5.1 MB/s 
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
    Preparing wheel metadata ... [?25l[?25hdone
Building wheels for collected packages: mpi4py
  Building wheel for mpi4py (PEP 517) ... [?25l[?25hdone
  Created wheel for mpi4py: filename=mpi4py-3.1.2-cp37-cp37m-linux_x86_64.whl size=2183208 sha256=48f6a817f7419910af4626f24bdaa2a7fb8221f26c27290538963e80743f2243
  Stored in directory: /root/.cache/pip/wheels/62/d6/0a/91ea5f998589bc72ca3fcf425c0e364baf8536753e1cd26ea4
Successfully built mpi4py
Installing collected packages: mpi4py
Successfully installed mpi4py-3.1.2


In [47]:
from google.colab import files

# helloWorld_MPI.py
with open('/content/helloWorld_MPI.py', 'w') as f:
  f.write( 
      "from mpi4py import MPI\n" 
      "comm = MPI.COMM_WORLD\n"
      "rank = comm.Get_rank()\n"
      "print ('hello world from process ', rank)")

! mpiexec --allow-run-as-root -np 5 python /content/helloWorld_MPI.py

hello world from process  1
hello world from process  0
hello world from process  2
hello world from process  4
hello world from process  3


**Point-to-point communication**