# CDP - 236370
## Tutorial 2 - Python Multiprocessing 
Written by Ido Hakimi, Qasem Sayah, and Anny Firer.
<br>
Adapted from `PyMOTW-3` (https://pymotw.com/3/multiprocessing/index.html)
<br>
Documentation: https://docs.python.org/3/library/multiprocessing.html

### Processes vs Threads:

**Process**
* Created by the operating system to run programs.
* Processes can have multiple threads.
* Two processes started by the same Python program can execute code simultaneously, each on its own core.
* Processes have more overhead than threads because creating and tearing down a process takes more time.
* Sharing information between processes is slower than sharing between threads because processes do not share a memory space. In Python, data structures such as lists and arrays are pickled (serialized), sent to the other process, and unpickled there, which costs extra time.

**Thread**
* Threads are like mini-processes that live inside a process.
* They share memory space and efficiently read and write to the same variables.
* In CPython, two threads cannot execute Python code simultaneously in the same program (although there are workarounds, described in the next section).
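
The difference in memory sharing can be illustrated with a minimal sketch. The names `counter`, `increment`, and `run_in_thread` are hypothetical and chosen only for this illustration: a thread modifies the parent's data, while a child process modifies only its own copy.

```python
import threading
from multiprocessing import Process

counter = {'value': 0}  # module-level state owned by the parent process


def increment():
    """Add one to the counter visible in the calling process."""
    counter['value'] += 1


def run_in_thread():
    """Run increment() in a thread and return the counter seen by the caller."""
    t = threading.Thread(target=increment)
    t.start()
    t.join()
    return counter['value']


if __name__ == '__main__':
    # The thread shares memory with the parent, so the change is visible.
    print('after thread:', run_in_thread())    # after thread: 1

    # The child process works on its own copy; the parent's value is unchanged.
    p = Process(target=increment)
    p.start()
    p.join()
    print('after process:', counter['value'])  # after process: 1
```

To get a value back from a child process, it has to be sent explicitly, for example through a queue or a pipe.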

### Python’s GIL problem


CPython (the standard Python implementation) has a Global Interpreter Lock (GIL), which prevents two threads from executing Python bytecode at the same time within one process. Some people are upset by this, while others fiercely defend it. There are workarounds, however: libraries like NumPy sidestep the limitation by running external C code that can release the GIL.

*   Processes speed up Python operations that are CPU-intensive because they benefit from multiple cores and avoid the GIL.
*   Threads provide no speedup in Python for CPU-intensive tasks because of the GIL.
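
A rough way to observe this is to run the same CPU-bound function once with a pool of threads and once with a pool of processes. The sketch below is only an illustration under stated assumptions: `count_primes` and `timed_map` are hypothetical helpers, the workload size is arbitrary, and the measured times depend on the machine and the Python build.

```python
import time
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool


def count_primes(limit):
    """Count primes below `limit` by trial division (deliberately CPU-bound)."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def timed_map(pool_cls, func, inputs, workers=4):
    """Map func over inputs with the given pool class; return (results, seconds)."""
    start = time.perf_counter()
    with pool_cls(workers) as pool:
        results = pool.map(func, inputs)
    return results, time.perf_counter() - start


if __name__ == '__main__':
    inputs = [50_000] * 4  # assumed workload: four identical CPU-bound jobs
    for label, pool_cls in [('threads', ThreadPool), ('processes', Pool)]:
        results, seconds = timed_map(pool_cls, count_primes, inputs)
        print('{:<10} {} in {:.2f}s'.format(label, results, seconds))
```

On a multi-core machine the process pool would typically finish sooner, since the thread pool's workers take turns holding the GIL.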



### The Process class

In `multiprocessing`, processes are spawned by creating a `Process` object and then calling its `start()` method. To wait until a process has completed its work and exited, use the `join()` method.

In [None]:
from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('hello',))
    p.start()
    p.join()

Spawning multiple processes.

In [None]:
import multiprocessing


def worker(num):
    print('Worker:', num)


if __name__ == '__main__':
    jobs = []
    for i in range(10):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()
        
    for j in jobs:
        j.join()

#### Determining the Current Process

Passing arguments to identify or name the process is cumbersome and unnecessary. Each `Process` instance has a name with a default value that can be changed as the process is created. Naming processes is useful for keeping track of them, especially in applications with multiple types of processes running simultaneously.

In [None]:
import multiprocessing
import time


def worker():
    name = multiprocessing.current_process().name
    print(name, 'Starting')
    time.sleep(2)
    print(name, 'Exiting')


def my_service():
    name = multiprocessing.current_process().name
    print(name, 'Starting')
    time.sleep(3)
    print(name, 'Exiting')
    return 2  # note: the return value of a Process target is discarded


if __name__ == '__main__':
    service = multiprocessing.Process(
        name='my_service',
        target=my_service,
    )
    worker_1 = multiprocessing.Process(
        name='worker 1',
        target=worker,
    )
    worker_2 = multiprocessing.Process(  # default name
        target=worker,
    )

    worker_1.start()
    worker_2.start()
    service.start()
    
    worker_1.join()
    worker_2.join()
    service.join()

#### Terminating Processes

If a process appears hung or deadlocked, it can be useful to kill it forcibly. Calling `terminate()` on a process object kills the child process. It is important to `join()` the process after terminating it, to give the process management code time to update the status of the object to reflect the termination.

In [None]:
import multiprocessing
import time


def slow_worker():
    print('Starting worker')
    time.sleep(0.1)
    print('Finished worker')


if __name__ == '__main__':
    p = multiprocessing.Process(target=slow_worker)
    print('BEFORE:', p, p.is_alive())

    p.start()
    print('DURING:', p, p.is_alive())

    p.terminate()
    print('TERMINATED:', p, p.is_alive())

    p.join()
    print('JOINED:', p, p.is_alive())

#### Synchronization between processes
`multiprocessing` contains equivalents of all the synchronization primitives from `threading`. For instance, one can use a lock to ensure that only one process prints to standard output at a time:
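
A lock can also be used as a context manager, which releases it automatically when the block exits. The following is a minimal sketch of that style; the function name `greet` and the number of processes are assumptions made for illustration.

```python
from multiprocessing import Process, Lock


def greet(lock, i):
    """Print a message while holding the lock and return the message."""
    message = 'hello world {}'.format(i)
    with lock:  # acquired on entry, released on exit, even if print() raises
        print(message)
    return message


if __name__ == '__main__':
    lock = Lock()
    procs = [Process(target=greet, args=(lock, n)) for n in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```

The order in which the lines appear is not guaranteed; the lock only ensures that the processes do not print at the same time.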

In [None]:
from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()
    for num in range(10):
        Process(target=f, args=(lock, num)).start()

### Exchanging objects between processes

Unlike threads, processes do not share memory, so they communicate over interprocess communication channels. `multiprocessing` supports two types of communication channel between processes:

1.   Queues
2.   Pipes

#### Queues

The `Queue` class is a near clone of `queue.Queue`.
Queues are thread and process safe.
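
Because queues are process safe, several processes can put items on the same queue without extra locking. The sketch below illustrates this under assumed names (`producer`, `collect`) and an arbitrary number of producers and items.

```python
from multiprocessing import Process, Queue


def producer(q, producer_id, n_items):
    """Put n_items tuples of (producer_id, index) on the shared queue."""
    for i in range(n_items):
        q.put((producer_id, i))


def collect(q, total):
    """Read exactly `total` items from the queue and return them sorted."""
    return sorted(q.get() for _ in range(total))


if __name__ == '__main__':
    n_producers, n_items = 4, 3  # assumed sizes for the illustration
    q = Queue()
    producers = [Process(target=producer, args=(q, pid, n_items))
                 for pid in range(n_producers)]
    for p in producers:
        p.start()

    # Drain the queue before joining, so no producer is left waiting
    # to flush buffered items.
    items = collect(q, n_producers * n_items)
    for p in producers:
        p.join()
    print(items)
```

The items arrive interleaved in an unpredictable order, which is why the sketch sorts them before printing.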

In [None]:
from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])
    
if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
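    # Read before joining: a child that still has buffered queue items
    # may not exit until they are consumed.
    print(q.get())
    p.join()  # join on Process p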

The `JoinableQueue` class, a `Queue` subclass, is a queue which additionally has `task_done()` and `join()` methods:
 * `join()` blocks until every item that was put on the queue has been retrieved and marked as processed with `task_done()`.
 * `task_done()` should be called for each task removed from the queue, or else the semaphore used to count the number of unfinished tasks may eventually overflow, raising an exception.
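
The bookkeeping behind these two methods can be tried out inside a single process. In this minimal sketch the helper name `drain` is hypothetical; it also shows that calling `task_done()` more times than there were items raises a `ValueError`.

```python
from multiprocessing import JoinableQueue


def drain(q, n):
    """Get n items from the queue, marking each one as done."""
    items = []
    for _ in range(n):
        items.append(q.get())
        q.task_done()  # one call per item retrieved
    return items


if __name__ == '__main__':
    q = JoinableQueue()
    for i in range(3):
        q.put(i)

    print(drain(q, 3))
    q.join()  # returns immediately: every item has been marked done

    try:
        q.task_done()  # no unfinished task is left to mark
    except ValueError as exc:
        print('extra task_done():', exc)
```

If `drain` skipped the `task_done()` calls, `q.join()` would block forever.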

In [None]:
from multiprocessing import Process, JoinableQueue
import time
n_subscribers = 100

class Subscriber:
    def send(self, message):
        time.sleep(0.01) #time it takes for the subscriber to get the message
        return 0

def publisher(q, message):
    subscribers = []
    for _ in range(n_subscribers):
        subscribers.append(q.get())

    for sub in subscribers:
        if sub.send(message) != 0:
            pass  # do something
        else:
            q.task_done()


if __name__ == '__main__':
    message = "Hello world!"  # some message to send to all subscribers
    subscribers = [Subscriber() for _ in range(n_subscribers)]  # list of subscribers

    q = JoinableQueue()
    print("Begin broadcasting message")
    for sub in subscribers:
        q.put(sub)

    p = Process(target=publisher, args=(q, message))
    p.start()

    q.join()  # join on JoinableQueue q
    p.join()  # join on Process p
    print("All subscribers received the message")

#### Pipes

A pipe has exactly two endpoints, so it is preferred over a queue when communication is point-to-point between two processes. The `Pipe()` function returns a pair of connection objects connected by a pipe, which by default is duplex (two-way).

Data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. There is no risk of corruption from processes using different ends of the pipe at the same time. Queues, by contrast, synchronize properly between processes at the expense of more complexity, which is why they are described as thread and process safe.
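
Since a pipe is duplex by default, each connection object can both send and receive. The following minimal sketch shows a request/reply exchange over one pipe; the function name `echo_upper` and the use of `None` as a stop signal are assumptions made for illustration.

```python
from multiprocessing import Process, Pipe


def echo_upper(conn):
    """Reply to each received string with its upper-case form until None arrives."""
    while True:
        msg = conn.recv()
        if msg is None:  # sentinel: stop serving
            break
        conn.send(msg.upper())
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # duplex by default
    p = Process(target=echo_upper, args=(child_conn,))
    p.start()

    for word in ['ping', 'pong']:
        parent_conn.send(word)     # parent -> child
        print(parent_conn.recv())  # child -> parent

    parent_conn.send(None)  # ask the child to stop
    p.join()
```

Each end is used by exactly one process here, so no additional locking is needed.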

In [None]:
from multiprocessing import Process, Pipe
import time

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    p.join()

### Performance Benchmarking Pipes vs Queues 
#### Pipes

In [None]:
from multiprocessing import Process, Pipe
import time

def reader_pipe(conn):
    while True:
        msg = conn.recv()    # Read from the output pipe and do nothing
        if msg=='DONE':
            break
    conn.close()

def writer_pipe(count, conn):
    for i in range(count):
        conn.send(i)             # Write 'count' numbers into the input pipe
    conn.send('DONE')
    conn.close()

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6]:
        # Pipe() is duplex by default, but here data flows one way only:  p_w ------> p_r
        p_w, p_r = Pipe()  # writer_pipe() writes to p_w from _this_ process
        reader_p = Process(target=reader_pipe, args=(p_r,))
        reader_p.start()     # Launch the reader process

        s = time.time()
        writer_pipe(count, p_w) # Send a lot of stuff to reader_pipe()
        reader_p.join()
        e = time.time()
        print("Sending {0} numbers to Pipe() took {1} seconds".format(count, e-s))

#### Queues

In [None]:
from multiprocessing import Process, Queue
import time

def reader_queue(q):
    while True:
        msg = q.get()     # Read from the queue and do nothing
        if msg=='DONE':
            break

def writer_queue(count, q):
    for i in range(count):
        q.put(i)             # Write 'count' numbers into the queue
    q.put('DONE')

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6]:
        q = Queue()
        reader_p = Process(target=reader_queue, args=(q,))
        reader_p.start()     # Launch the reader process

        s = time.time()
        writer_queue(count, q) # Send a lot of stuff to reader_queue()
        reader_p.join()
        e = time.time()
        print("Sending {0} numbers to Queue() took {1} seconds".format(count, e-s))

### Process Pools
The `Pool` class can be used to manage a fixed number of workers for simple cases where the work to be done can be broken up and distributed between workers independently. The return values from the jobs are collected and returned as a list. The `Pool` arguments include the number of processes (`processes`) and a function to run when starting each worker process (`initializer`, invoked once per child).

In [None]:
import multiprocessing
import time


def do_calculation(data):
    return data * 2

def do_square(data):
    return data * data

def start_process():
    print('Starting', multiprocessing.current_process().name)


if __name__ == '__main__':
    inputs = list(range(10))
    print('Input   :', inputs)
    print('cpu_count:', multiprocessing.cpu_count())
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(
        processes=pool_size,
        initializer=start_process,
    )
    pool_outputs1 = pool.map(do_calculation, inputs)
    pool_outputs2 = pool.map(do_square, inputs)
    pool.close()  # no more tasks
    pool.join()  # wrap up current tasks
    print('Pool do_calculation:', pool_outputs1)
    print('Pool do_square:', pool_outputs2)

### Consumers-Producer Example
A more complex example shows how to manage several workers consuming data from a `JoinableQueue` and passing results back to the parent process. The poison pill technique is used to stop the workers. After setting up the real tasks, the main program adds one “stop” value per worker to the job queue. When a worker encounters the special value, it breaks out of its processing loop. The main process uses the task queue’s join() method to wait for all of the tasks to finish before processing the results.

#### Consumer

In [None]:
import multiprocessing
import time


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print('{}: Exiting'.format(proc_name))
                self.task_queue.task_done()
                break
            print('{}: {}'.format(proc_name, next_task))
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)


#### Task

In [None]:
class Task:

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        time.sleep(0.1)  # pretend to take time to do the work
        return '{self.a} * {self.b} = {product}'.format(
            self=self, product=self.a * self.b)

    def __str__(self):
        return '{self.a} * {self.b}'.format(self=self)



#### Producer-consumers

In [None]:
# multiprocessing_producer_consumer.py

if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print('Creating {} consumers'.format(num_consumers))
    consumers = [ Consumer(tasks, results) for i in range(num_consumers) ]
    for w in consumers:
        w.start()

    # Enqueue jobs
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for i in range(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print('Result:', result)
        num_jobs -= 1

    # Wait for the consumer processes to exit
    for w in consumers:
        w.join()