# Python 201: A multiprocessing tutorial
http://www.blog.pythonlibrary.org/2016/08/02/python-201-a-multiprocessing-tutorial/       

## multiprocessing

In [1]:
# version 1
import os

from multiprocessing import Process

def doubler(number):
    """
    A doubling function that can be used by a process
    """
    result = number * 2
    proc = os.getpid()
    print('{0: >3} doubled to {1: >3} by process id: {2}'.format(number, result, proc))

if __name__ == '__main__':
    numbers = [2,3,5,7,11,13]
    procs = []
    
    
    for index, number in enumerate(numbers):
        proc = Process(target = doubler, args = (number,))
        procs.append(proc)
        proc.start()
        
    for proc in procs:
        proc.join()  # the join() tells python to wait for the process to terminate

  2 doubled to   4 by process id: 38080
  3 doubled to   6 by process id: 38081
  7 doubled to  14 by process id: 38083
  5 doubled to  10 by process id: 38082
 11 doubled to  22 by process id: 38084
 13 doubled to  26 by process id: 38085


## current_process().name

In [2]:
# version 2

import os

from multiprocessing import Process, current_process

def doubler(number, index = -1):
    """
    A doubling function that can be used by a process
    """
    result = number * 2
    proc_name = current_process().name # current_process is the same thing as current_thread
    print('Index[{3: >2}] {0: >3} doubled to {1: >3} by process id: {2}'.format(number, result, proc_name, index))

if __name__ == '__main__':
    numbers = [2,3,5,7,11,13]
    procs = []
#     proc = Process(target=doubler, args = (5,))
    
    for index, number in enumerate(numbers):
        proc = Process(target = doubler, args = (number, index))
        procs.append(proc)
        proc.start()
        
    proc = Process(target=doubler, name='Test', args=(2,))
    proc.start()
    procs.append(proc)
    
    for proc in procs:
        proc.join()  # the join() tells python to wait for the process to terminate

Index[ 0]   2 doubled to   4 by process id: Process-7
Index[ 1]   3 doubled to   6 by process id: Process-8
Index[ 2]   5 doubled to  10 by process id: Process-9
Index[ 3]   7 doubled to  14 by process id: Process-10
Index[ 4]  11 doubled to  22 by process id: Process-11
Index[ 5]  13 doubled to  26 by process id: Process-12
Index[-1]   2 doubled to   4 by process id: Test


## Locks
The multiprocessing module supports locks in much the same way as the threading module does. All you need to do is import Lock, acquire it, do something and release it.

To prevent the threads from interfering with each other, we use a Lock object. This code will loop over our list of three items and create a process for each of them. Each process will call our function and pass it one of the items from the iterable. Because we’re using locks, the next process in line will wait for the lock to release before it can continue.

In [3]:
# with Lock

from multiprocessing import Process, Lock

def printer(item, lock):
    """
    Prints out items
    """
    lock.acquire()
    try:
        print(item)
    finally:
        lock.release()
        
if __name__ == '__main__':
    lock = Lock()
    items = ['a','b','c','d','e',1,2,3,4,5]
    for item in items:
        p = Process(target=printer, args = (item, lock))
        p.start()

a
b
c
d
e
1
2
3
4
5


In [9]:
# without Lock

from multiprocessing import Process, Lock

def printer(item, lock):
    """
    Prints out items
    """
#     lock.acquire()
    print(item)
#     finally:
#         lock.release()

if __name__ == '__main__':
#     lock = Lock()
    items = ['a','b','c','d','e',1,2,3,4,5]
    for item in items:
        p = Process(target=printer, args = (item, lock))
        p.start()

b
a
c
d
e
1
2
3
4
5


## Logging
Logging processes is a little different than logging threads. The reason for this is that Python’s logging packages doesn’t use process shared locks, so it’s possible for you to end up with messages from different processes getting mixed up. 

In [5]:
import logging
import multiprocessing

from multiprocessing import Process, Lock

def printer(item, lock):
    lock.acquire()
    try:
        print(item)
    finally:
        lock.release()
        
if __name__ == '__main__':
    lock = Lock()
    items = ['a','b','c']
    multiprocessing.log_to_stderr() # the simplest way to send log to stderr
    logger = multiprocessing.get_logger() # get access to logger
    logger.setLevel(logging.INFO)
    for item in items:
        p = Process(target=printer, args = (item, lock))
        p.start()
        # join() is called implicitly when the parent thread exits
    logger.setLevel(logging.WARNING) # raise logging level  


[INFO/Process-34] child process calling self.run()


a


[INFO/Process-34] process shutting down
[INFO/Process-35] child process calling self.run()
[INFO/Process-34] process exiting with exitcode 0


b


[INFO/Process-35] process shutting down
[INFO/Process-36] child process calling self.run()


c


[INFO/Process-35] process exiting with exitcode 0
[INFO/Process-36] process shutting down
[INFO/Process-36] process exiting with exitcode 0


## The Pool Class

The Pool class is used to represent a pool of worker processes. It has methods which can allow you to offload tasks to the worker processes. 

In [6]:
from multiprocessing import Pool

def doubler(number):
    return number * 2

if __name__ == '__main__':
    numbers = [5,10,20]
    pool = Pool(processes = 3) # create an instance pool having 3 worker processes
    print(pool.map(doubler, numbers))

[10, 20, 40]


can also get the result using apply_async, and print with get()

In [7]:
from multiprocessing import Pool

def doubler(number):
    return number * 2

if __name__ == '__main__':
    numbers = [5,10,20]
    pool = Pool(processes = 3) # create an instance pool having 3 worker processes
    result = pool.apply_async(doubler, (5,))
    print(result.get(timeout=5))

10


## Process Communication

When it comes to communicating between processes, the multiprocessing modules has two primary methods: Queues and Pipes. The Queue implementation is actually both thread and process safe.

In [8]:
from multiprocessing import Process, Queue

sentinel = -1

def creator(data, q):
    print('creating data and putting it on the queue: {}'.format(data))
    for item in data:
        q.put(item)
        
def my_consumer(q):
    while True:
        data = q.get()
        print('data found to be processed: {}'.format(data))
        processed = data * 2
        print('processed: ',processed)
        
        if data is sentinel:
            break

if __name__ == '__main__':
    q = Queue()
    data = [5,10,13,-1]
    process_one = Process(target=creator, args = (data, q))
    process_two = Process(target=my_consumer, args = (q,))
    process_one.start()
    process_two.start()
    print('queue is about to close')
    q.close()
    q.join_thread()
    process_one.join()
    process_two.join()

creating data and putting it on the queue: [5, 10, 13, -1]
data found to be processed: 5
processed:  10
data found to be processed: 10
processed:  20
data found to be processed: 13
processed:  26
data found to be processed: -1
processed:  -2
queue is about to close
