# **Multiprocessing with Python**

CPython was not designed with multicore execution in mind. It has a Global Interpreter Lock (GIL), which allows only one thread at a time to execute Python bytecode, so CPU-bound threads effectively run serially. This does not mean that multicore programs are impossible in Python, though.

There are a few things to note before diving into multiprocessing.

1. Speedup is not linear in the number of cores/threads used, because multiprocessing carries overheads (as in any other language, of course).
2. Sharing state between threads/processes adds complexity, which means higher overhead and more development effort.
3. Programs are bound by Amdahl's law: the part of the program that must run serially limits the maximum achievable speedup. If you don't remember it, read up on it!
4. Python threads are real OS-native threads, just like threads in Java or C++. However, they all run inside a single process, and therefore a single Python interpreter, so CPU-bound threads get bottlenecked by the GIL. That is why we use processes, each with its own independent interpreter.
5. Message/state passing between processes should be reduced as much as possible.
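
As a quick illustration of point 3, Amdahl's law gives the best-case speedup $S(N) = \frac{1}{(1 - p) + p/N}$ for a program in which a fraction $p$ of the work can be spread across $N$ workers. The helper below is a minimal sketch (`amdahl_speedup` is a hypothetical name, not a library function) and it ignores the communication overheads from points 1 and 5, so real speedups will be lower.

```python
def amdahl_speedup(parallel_fraction: float, num_workers: int) -> float:
    """Upper bound on speedup when `parallel_fraction` of the work runs on `num_workers`."""
    if not 0.0 <= parallel_fraction <= 1.0:
        raise ValueError("parallel_fraction must be between 0 and 1")
    if num_workers < 1:
        raise ValueError("num_workers must be at least 1")
    serial_fraction = 1.0 - parallel_fraction
    return 1.0 / (serial_fraction + parallel_fraction / num_workers)


if __name__ == "__main__":
    # Even with 90% of the work parallelized, 4 workers give at most ~3.08x
    for workers in (1, 2, 4, 8, 64):
        print(workers, round(amdahl_speedup(0.9, workers), 2))
```

As `num_workers` grows, the speedup approaches $1/(1-p)$, i.e. 10x for $p = 0.9$, no matter how many cores you add.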

## Python Multiprocessing overview

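* Process - A new process with its own process ID, created from the current one (forked or spawned, depending on the start method). We can provide a target function to run in it.
* Pool - A pool of worker processes that tasks can be submitted to (`multiprocessing.pool.ThreadPool` offers the same interface backed by threads).
* Queue - A process-safe FIFO queue for multiple-producer/multiple-consumer patterns.
* Pipe - A two-ended communication channel between two processes.
* Manager - A high-level managed interface to share objects between processes (the objects live in a separate server process and are accessed through proxies).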
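
The sketch below exercises three of the primitives listed above using only the standard library: a `Process` consuming numbers from a `Queue` until it receives a `None` sentinel, and a `Pipe` used for a request/reply exchange between two processes. The function names `square_worker`, `shout_back` and `run_demo` are hypothetical, chosen just for this illustration.

```python
import multiprocessing as mp


def square_worker(task_queue, result_queue):
    """Consume numbers until a None sentinel arrives, pushing each square to result_queue."""
    while True:
        n = task_queue.get()  # blocks until an item is available
        if n is None:
            break
        result_queue.put(n * n)


def shout_back(conn):
    """Receive one message over a Pipe connection and send it back upper-cased."""
    message = conn.recv()
    conn.send(message.upper())
    conn.close()


def run_demo():
    # Queue: the parent produces work, a child Process consumes it
    task_queue, result_queue = mp.Queue(), mp.Queue()
    worker = mp.Process(target=square_worker, args=(task_queue, result_queue))
    worker.start()
    for n in range(5):
        task_queue.put(n)
    task_queue.put(None)  # sentinel telling the worker to stop
    # Drain the results BEFORE join(): joining a process with queued data can deadlock
    squares = sorted(result_queue.get() for _ in range(5))
    worker.join()

    # Pipe: a two-ended connection between exactly two processes
    parent_conn, child_conn = mp.Pipe()
    child = mp.Process(target=shout_back, args=(child_conn,))
    child.start()
    parent_conn.send("hello")
    reply = parent_conn.recv()
    child.join()
    return squares, reply


if __name__ == "__main__":
    print(run_demo())  # ([0, 1, 4, 9, 16], 'HELLO')
```

The `if __name__ == "__main__":` guard matters here: with the `spawn` start method, child processes re-import the main module, and without the guard they would try to start processes of their own.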


To demonstrate multiprocessing, we will use a Monte Carlo estimation of π (as learned in statistics). Below is the base implementation of a single experiment unit: sample random (x, y) points in the unit square and count how many fall inside the quarter circle of radius 1.

In [4]:
%%writefile Multiprocessing/pi_estimate.py

import os
import random

def calc_point_inside_circle(num_of_estimates):
    
    print(f"Executing calc_point_inside_circle with {num_of_estimates:,} on pid {os.getpid()}")

    trials_inside_circle = 0

    for step in range(int(num_of_estimates)):
        x = random.uniform(0,1)
        y = random.uniform(0,1)

        is_inside_circle = 1 if (x**2 + y**2) <= 1 else 0
        trials_inside_circle += is_inside_circle

    return trials_inside_circle


from multiprocessing import Pool # This is process based
# from multiprocessing.dummy import Pool # This is thread based (same API, backed by threads)
import time

if __name__ == "__main__":
    total_trials = 1e8
    num_workers = 4

    pool = Pool(processes=num_workers)
    trials_per_worker = total_trials/num_workers
    trials_per_processes = [trials_per_worker]*num_workers

    start_time = time.time()
    trials_inside_circle = pool.map(calc_point_inside_circle, trials_per_processes)
    pi_estimate = (sum(trials_inside_circle)*4)/float(total_trials)
    print(pi_estimate)
    print(f"Time consumed: {time.time()-start_time}")
    pool.close()  # no more tasks will be submitted
    pool.join()   # wait for the worker processes to exit


Overwriting Multiprocessing/pi_estimate.py
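
The factor of 4 in `pi_estimate` comes from comparing areas: points are sampled uniformly in the unit square (area 1), and the quarter circle of radius 1 inside it has area $\pi/4$, so the fraction of points that land inside approximates that area ratio.

$$
\frac{N_{\text{inside}}}{N_{\text{total}}} \approx \frac{\pi/4}{1}
\quad\Longrightarrow\quad
\pi \approx 4 \cdot \frac{N_{\text{inside}}}{N_{\text{total}}}
$$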



Running the above script with 4 processes produces the following output.

<center><image src="./img/15.jpg" width="700"/></center>

With only 1 process, the output looks like this.

<center><image src="./img/16.jpg" width="600"/></center>

We can see a clear performance improvement from using multiple processes for this operation. If we used threads instead, we could not expect the same performance gain, because the GIL lets only one thread execute Python bytecode at a time.
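
To see the thread vs. process difference directly, the sketch below runs the same CPU-bound Monte Carlo count through `multiprocessing.Pool` and `multiprocessing.pool.ThreadPool`, which share the same `map` interface. The helper names and the per-task seeded `random.Random` are my own assumptions for the illustration, not part of the `pi_estimate.py` script above; on a standard (GIL-enabled) CPython build, expect the thread pool to be roughly as slow as a single process, though exact timings depend on your machine.

```python
import random
import time
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool


def count_inside(args):
    """Count random points in the unit square that fall inside the quarter circle."""
    num_points, seed = args
    rng = random.Random(seed)  # independent, reproducible generator per task
    inside = 0
    for _ in range(num_points):
        x, y = rng.random(), rng.random()
        if x * x + y * y <= 1.0:
            inside += 1
    return inside


def timed_estimate(pool_cls, num_workers=4, points_per_worker=1_000_000):
    """Estimate pi with the given pool class and return (estimate, seconds)."""
    tasks = [(points_per_worker, seed) for seed in range(num_workers)]
    with pool_cls(num_workers) as pool:
        start = time.perf_counter()
        counts = pool.map(count_inside, tasks)
        elapsed = time.perf_counter() - start
    return 4 * sum(counts) / (points_per_worker * num_workers), elapsed


if __name__ == "__main__":
    for label, pool_cls in (("processes", Pool), ("threads", ThreadPool)):
        pi, seconds = timed_estimate(pool_cls)
        print(f"{label:>9}: pi ~ {pi:.4f} in {seconds:.2f} s")
```

Because every task gets its own seeded generator, both pools compute exactly the same estimate, so only the timing differs.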

## Python Joblib module

Joblib builds on the multiprocessing module and adds lightweight pipelining. It is easy to use for pure Python/NumPy workloads that are embarrassingly parallel. It can also cache the outputs of expensive function calls to disk, so they can be reused across sessions.
Install the joblib package as follows.

<center>pip install joblib</center>

In [13]:
%%writefile Multiprocessing/pi_estimate_joblib.py

import os
import random
import time

def calc_point_inside_circle(num_of_estimates):
    
    print(f"Executing calc_point_inside_circle with {num_of_estimates:,} on pid {os.getpid()}")

    trials_inside_circle = 0

    for step in range(int(num_of_estimates)):
        x = random.uniform(0,1)
        y = random.uniform(0,1)

        is_inside_circle = 1 if (x**2 + y**2) <= 1 else 0
        trials_inside_circle += is_inside_circle

    return trials_inside_circle

from joblib import Parallel, delayed
if __name__ == "__main__":

    total_trials = 1e8
    num_workers = 4

    trials_per_worker = total_trials/num_workers
    trials_per_processes = [trials_per_worker]*num_workers


    parrallel_obj = Parallel(n_jobs=num_workers, verbose=1)
    async_function = delayed(calc_point_inside_circle)

    start_time = time.time()
    trials_inside_circle =  parrallel_obj(async_function(trials_per_worker) for _ in range(num_workers))
    pi_estimate = (sum(trials_inside_circle)*4)/float(total_trials)

    print(pi_estimate)
    print(f"Time consumed: {time.time()-start_time}")


Overwriting Multiprocessing/pi_estimate_joblib.py


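The joblib syntax is a bit confusing because the calls are chained: `delayed(func)` wraps the function, calling the wrapper with arguments describes a task instead of running it, and the `Parallel` object executes the generator of tasks. Anyhow, the output is as follows.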

<center><image src="./img/17.jpg" width="600"/></center>

The total time consumed is less than when using just one process.
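
To make the chained syntax concrete, here is a small sketch with a toy function (`power` is hypothetical, used only for illustration). In the joblib versions I am aware of, `delayed(f)(*args, **kwargs)` simply returns the tuple `(f, args, kwargs)`, and `Parallel` then executes each such tuple on a worker and returns the results in input order.

```python
from joblib import Parallel, delayed


def power(base, exponent=2):
    """Toy function used only to illustrate the delayed/Parallel chain."""
    return base ** exponent


# Step 1: delayed() wraps the function; calling the wrapper records the call
# as a (function, args, kwargs) tuple instead of running it.
task = delayed(power)(3, exponent=3)

if __name__ == "__main__":
    print(task)
    # Step 2: Parallel consumes an iterable of such tuples and runs them on workers.
    results = Parallel(n_jobs=2)(delayed(power)(i) for i in range(5))
    print(results)  # [0, 1, 4, 9, 16]
```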

The `Parallel` class has many parameters to play with, including debug info (`verbose`), timeouts (`timeout`), using threads instead of processes (`prefer="threads"`), and changing the backend (`backend`). Tune them based on your requirements.
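
As a rough sketch of what those options change, the snippet below runs the same short blocking task (`slow_task` is a hypothetical helper) under three `Parallel` configurations and reports how many distinct process IDs executed the tasks. With `prefer="threads"` every task runs in the parent process, while the process-based configurations spread tasks over worker processes; the exact pid counts and timings will vary by machine and joblib version.

```python
import os
import time

from joblib import Parallel, delayed


def slow_task(x):
    """Simulate a short blocking task and report which process ran it."""
    time.sleep(0.1)
    return x, os.getpid()


if __name__ == "__main__":
    configs = {
        "default (loky processes)": dict(n_jobs=4),
        "prefer threads": dict(n_jobs=4, prefer="threads"),
        "multiprocessing backend": dict(n_jobs=4, backend="multiprocessing"),
    }
    for name, kwargs in configs.items():
        start = time.perf_counter()
        results = Parallel(verbose=0, **kwargs)(delayed(slow_task)(i) for i in range(8))
        pids = {pid for _, pid in results}
        print(f"{name}: {len(pids)} distinct pid(s), {time.perf_counter() - start:.2f} s")
```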

Another useful feature of joblib is its `Memory` class. Its `cache` decorator saves a function's results to a disk cache, keyed by the input arguments. This requires each parallel call to have unique arguments: if every worker calls the function with the same arguments, all calls map to the same cache entry, so the cache cannot tell the results of the different workers apart and every worker would get back the same cached count. To avoid that, we can add an index as an additional parameter. Check the code below.


In [15]:
%%writefile Multiprocessing/pi_estimate_joblib_cached.py

import os
import random
import time
from joblib import Parallel, delayed
from joblib import Memory
memory = Memory("./Multiprocessing/joblib_cache", verbose=0)

@memory.cache
def calc_point_inside_circle_with_idx(num_of_estimates, idx):
    
    print(f"Executing calc_point_inside_circle with {num_of_estimates:,} and index {idx} on pid {os.getpid()}")

    trials_inside_circle = 0

    for step in range(int(num_of_estimates)):
        x = random.uniform(0,1)
        y = random.uniform(0,1)

        is_inside_circle = 1 if (x**2 + y**2) <= 1 else 0
        trials_inside_circle += is_inside_circle

    return trials_inside_circle

if __name__ == "__main__":

    total_trials = 1e8
    num_workers = 4

    trials_per_worker = total_trials/num_workers
    trials_per_processes = [trials_per_worker]*num_workers


    parrallel_obj = Parallel(n_jobs=num_workers, verbose=1)
    async_function = delayed(calc_point_inside_circle_with_idx)

    start_time = time.time()
    trials_inside_circle =  parrallel_obj(async_function(trials_per_worker, i) for i in range(num_workers))
    pi_estimate = (sum(trials_inside_circle)*4)/float(total_trials)

    print(pi_estimate)
    print(f"Time consumed: {time.time()-start_time}")


Overwriting Multiprocessing/pi_estimate_joblib_cached.py


On the first run, the output is as follows.

<center><image src="./img/18.jpg" width="600"/></center>

If we run it again without changing the arguments, the results are loaded from the cache, which improves performance significantly.

<center><image src="./img/19.jpg" width="600"/></center>

### Random number generation in parallel processes

Generating random numbers is a tricky problem in itself, and doing it in parallel processes complicates it further, because the sequences generated by different processes may repeat or be correlated.

> If we use Python's built-in `random` module with multiprocessing, each child process is automatically reseeded (since Python 3.7, `random` reseeds itself after `fork`), which yields a unique sequence per process. With NumPy's legacy global generator (`np.random.*`), we need to reseed explicitly; otherwise forked processes inherit the parent's generator state and produce the same random sequence.

Below is an example NumPy-based implementation.

In [23]:
%%writefile Multiprocessing/pi_estimate_joblib_cached_numpy.py

import os
import random
import time
from joblib import Parallel, delayed
from joblib import Memory
import numpy as np

memory = Memory("./Multiprocessing/joblib_cache", verbose=0)

@memory.cache
def calc_point_inside_circle_with_numpy(num_of_estimates, idx):
    
    print(f"Executing calc_point_inside_circle with {num_of_estimates:,} and index {idx} on pid {os.getpid()}")
    import numpy as np
    np.random.seed()  # reseed from OS entropy so workers don't share an inherited generator state

    xs = np.random.uniform(0, 1, num_of_estimates)
    ys = np.random.uniform(0, 1, num_of_estimates)

    sqr_zs = (xs*xs + ys*ys) <= 1 # returns a boolean array with condition checked

    trials_inside_circle = np.sum(sqr_zs)
    return trials_inside_circle

if __name__ == "__main__":

    total_trials = 1e8
    num_workers = 4

    trials_per_worker = int(total_trials/num_workers)
    trials_per_processes = [trials_per_worker]*num_workers


    parrallel_obj = Parallel(n_jobs=num_workers, verbose=1)
    async_function = delayed(calc_point_inside_circle_with_numpy)

    start_time = time.time()
    trials_inside_circle =  parrallel_obj(async_function(trials_per_worker, i) for i in range(num_workers))
    pi_estimate = (np.sum(trials_inside_circle)*4)/float(total_trials)

    print(pi_estimate)
    print(f"Time consumed: {time.time()-start_time}")


Overwriting Multiprocessing/pi_estimate_joblib_cached_numpy.py


This is the output of the above code. Note the improvement gained from vectorized NumPy operations.

<center><image src="./img/20.jpg" width="600"/></center>
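
The code above reseeds NumPy's global generator with `np.random.seed()`, which gives different streams but not reproducible ones. A more robust alternative, sketched below, is NumPy's `SeedSequence.spawn`, which derives independent child seeds (independent with overwhelming probability) from a single root seed, so runs are repeatable. This assumes NumPy 1.17 or newer (where `default_rng` and `SeedSequence` were introduced) and uses the standard library's `ProcessPoolExecutor` instead of joblib; the helper names are hypothetical.

```python
from concurrent.futures import ProcessPoolExecutor

import numpy as np


def count_inside_quarter_circle(num_points, rng):
    """Count points inside the quarter circle using this worker's own Generator."""
    xs = rng.random(num_points)
    ys = rng.random(num_points)
    return int(np.count_nonzero(xs * xs + ys * ys <= 1.0))


def estimate_pi(total_points=4_000_000, num_workers=4, root_seed=12345):
    # One root seed -> num_workers independent child seeds -> one Generator per worker
    child_seeds = np.random.SeedSequence(root_seed).spawn(num_workers)
    rngs = [np.random.default_rng(s) for s in child_seeds]
    per_worker = total_points // num_workers
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        total_inside = sum(executor.map(count_inside_quarter_circle, [per_worker] * num_workers, rngs))
    return 4 * total_inside / (per_worker * num_workers)


if __name__ == "__main__":
    # The same root_seed gives the same estimate on every run
    print(estimate_pi())
```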

## Prime Number search

Another example of parallel processing is finding the prime numbers in a given range. Primality checking is computationally expensive, especially for large numbers. Because of that, even when tasks run in parallel, tasks containing larger numbers take longer than others, so a dynamic scheduling method, where idle workers pull the next task from a shared queue, can be more beneficial than a fixed up-front assignment.

To do that, we can use multiprocessing queues. They hold objects that can be shared across multiple processes. They are synchronized, which adds overhead, and non-persistent, which means their contents are lost if the program crashes.

Multiprocessing queues use pickled objects to share data. Keep that in mind when developing programs, as pickling can become a bottleneck.
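
To see what pickling means in practice, the sketch below round-trips a few objects through `pickle`, roughly what happens to every item passed through a multiprocessing queue, and shows that an object such as a lambda cannot be pickled at all. The helper names are hypothetical, and byte counts vary with the Python version and pickle protocol, so treat them as indicative only.

```python
import pickle


def roundtrip(obj):
    """Serialize and deserialize obj, returning the copy and the payload size in bytes."""
    payload = pickle.dumps(obj)
    return pickle.loads(payload), len(payload)


def is_picklable(obj):
    """Return True if obj can be pickled (and so could be sent through a queue)."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


if __name__ == "__main__":
    for obj in (100_000_007, list(range(10_000)), {"n": 7, "primes": [2, 3, 5]}):
        copy, size = roundtrip(obj)
        print(type(obj).__name__, size, "bytes, equal after round-trip:", copy == obj)
    # Lambdas cannot be looked up by name, so pickle refuses them
    print("lambda picklable?", is_picklable(lambda x: x))
```

The receiver always gets a copy: mutating an object after taking it from a queue does not affect the sender's original.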

In [25]:
import math

def check_prime_serial(n):
    if n < 2:
        return False
    if n % 2 == 0:
        return n == 2  # 2 is the only even prime
    for i in range(3, int(math.sqrt(n)) + 1, 2):
        if n % i == 0:
            return False
    return True

Below is an implementation of the above prime-checking function using queues.

**This is very similar to the producer-consumer (work queue) pattern in software development, a close relative of the pub-sub architecture.**

In [30]:
%%writefile Multiprocessing/prime_search_queues.py

import multiprocessing
from multiprocessing import Pool
import time
import math

FLAG_ALL_DONE = b"WORK_FINISHED"
FLAG_WORKER_FINISHED_PROCESSING = b"WORKER_FINISHED_PROCESSING"

def check_prime(search_space_queue, verified_primes_queue):

    while True:
        n = search_space_queue.get() # This is a blocking operation.

        if n == FLAG_ALL_DONE:
            # flag that our results have all been pushed to the results queue
            verified_primes_queue.put(FLAG_WORKER_FINISHED_PROCESSING)
            break
        else:
            if n % 2 == 0:
                continue
            for i in range(3, int(math.sqrt(n)) + 1, 2):
                if n % i == 0:
                    break
            else:
                verified_primes_queue.put(n)


if __name__ == '__main__':

    num_of_workers = 4

    primes = []
    manager = multiprocessing.Manager()
    search_space_queue = manager.Queue()
    verified_primes_queue = manager.Queue()

    # Initializing the child processes.
    processes = []
    for _ in range(num_of_workers):
        p = multiprocessing.Process(target=check_prime, 
                                    args=(  search_space_queue, 
                                            verified_primes_queue)
                                    )
        processes.append(p)
        p.start()

    # Actual calculations begin here!
    t1 = time.time()
    number_range = range(100_000_000, 101_000_000)

    # add jobs to the inbound work queue
    for number in number_range:
        search_space_queue.put(number)

    # add poison pills to stop the remote workers(force to break from the infinite loop)
    for n in range(num_of_workers):
        search_space_queue.put(FLAG_ALL_DONE)

    # Now wait till child processes complete their tasks.
    finished_child_processes = 0
    while True:
        new_result = verified_primes_queue.get() # Wait till a child process put a result to the queue
        if new_result == FLAG_WORKER_FINISHED_PROCESSING:
            finished_child_processes += 1
            if finished_child_processes == num_of_workers:
                break
        else:
            primes.append(new_result)

    assert finished_child_processes == num_of_workers
    for p in processes:
        p.join()  # reap the finished child processes

    print("Took:", time.time() - t1)
    primes.sort()  # results arrive in completion order, not numeric order
    print(len(primes), primes[:10], primes[-10:])


Overwriting Multiprocessing/prime_search_queues.py


Here, two flags control the behaviour of the task. `FLAG_ALL_DONE` is called a poison pill because a worker exits its loop as soon as it receives it. The parent process puts one per worker after all the numbers, so once every value has been handed out it acts as a sentinel that stops further processing. The second flag, `FLAG_WORKER_FINISHED_PROCESSING`, is sent by each child process to tell the parent that all of its results have been pushed.

The output of the above program is as follows.

<center><image src="./img/21.jpg" width="600"/></center>

As you can see, this takes a considerable amount of time. With just one process, the output is as follows.

<center><image src="./img/22.jpg" width="600"/></center>

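Here, the single process takes less time than the 4 processes. The reason is that queues have a large overhead from locking, pickling/unpickling and so on (a `Manager` queue also adds a round trip to the manager process for every `put`/`get`). Each number is cheap to check, so the per-item communication cost dominates, and our problem is not large enough to gain any benefit from this type of implementation.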

> If your task has a long completion time (at least a sizable fraction of a second) with a small amount of communication, a Queue approach might be the right answer.
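
One way to shrink that communication cost, in line with point 5 at the top of this section, is to send chunks of the search space through the queue instead of individual numbers, so each `put`/`get` carries much more work. The sketch below is my own variation, not the original program: it uses `multiprocessing.Queue` directly instead of a `Manager` queue, `None` as the poison pill, a hypothetical `CHUNK_SIZE` you would tune for your workload, and a smaller slice of the range than above to keep the demo quick.

```python
import math
import multiprocessing as mp
import time

CHUNK_SIZE = 10_000  # numbers per task; an assumption, tune for your workload


def primes_in_chunk(start, stop):
    """Return the primes in [start, stop) using trial division by odd numbers."""
    found = []
    for n in range(start, stop):
        if n < 2 or (n % 2 == 0 and n != 2):
            continue
        for i in range(3, math.isqrt(n) + 1, 2):
            if n % i == 0:
                break
        else:
            found.append(n)
    return found


def chunk_worker(task_queue, result_queue):
    while True:
        task = task_queue.get()
        if task is None:  # poison pill
            break
        result_queue.put(primes_in_chunk(*task))  # one message per chunk, not per number


def parallel_prime_search(start, stop, num_workers=4):
    task_queue, result_queue = mp.Queue(), mp.Queue()
    chunks = [(lo, min(lo + CHUNK_SIZE, stop)) for lo in range(start, stop, CHUNK_SIZE)]
    workers = [mp.Process(target=chunk_worker, args=(task_queue, result_queue))
               for _ in range(num_workers)]
    for w in workers:
        w.start()
    for chunk in chunks:
        task_queue.put(chunk)
    for _ in workers:
        task_queue.put(None)
    primes = []
    for _ in chunks:  # exactly one result message per chunk
        primes.extend(result_queue.get())
    for w in workers:
        w.join()
    return sorted(primes)


if __name__ == "__main__":
    t1 = time.time()
    primes = parallel_prime_search(100_000_000, 100_100_000)
    print("Took:", time.time() - t1)
    print(len(primes), primes[:10], primes[-10:])
```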

## Interprocess Communication

To examine Python's interprocess communication behaviour, we will use the task of verifying whether a single given number is prime. Prime checking becomes more time-consuming as numbers grow, because trial division has to test every candidate divisor up to √n. Splitting that range of candidate divisors across multiple processes can therefore make the task more efficient.

Below are various IPC methods applied to verifying prime numbers.

In [52]:
from math import sqrt

def check_prime_serial(n):

    if(n%2==0):
        return False
    for i in range(3, int(sqrt(n)+1), 2):
        if(n%i==0):
            return False
    return True

%timeit check_prime_serial(112272535095295)

587 ns ± 1.65 ns per loop (mean ± std. dev. of 7 runs, 1,000,000 loops each)


In [50]:
%%writefile Multiprocessing/verify_prime_naive.py

import multiprocessing
from multiprocessing import Pool
import time
import math

def create_range(from_i, to_i, num_processes):

    n = to_i - from_i
    chunk = (n/num_processes)

    output = []
    for i in range(num_processes):
        output.append((from_i+int(chunk*i), from_i+int(chunk*(i+1))))
    
    return output

def check_prime(n, pool, num_processes):
    from_i = 3
    to_i = int(math.sqrt(n)) + 1
    ranges_to_check = create_range(from_i, to_i, num_processes)
    ranges_to_check = zip(num_processes*[n], ranges_to_check)

    results = pool.map(check_prime_in_range, ranges_to_check)
    if False in results:
        return False
    return True

def check_prime_in_range(n_from_i_to_i):
    (n, (from_i, to_i)) = n_from_i_to_i
    if n % 2 == 0:
        return False
    if (from_i % 2 == 0):
        from_i = from_i - 1
    for i in range(from_i, int(to_i), 2):
        if n % i == 0:
            return False
    return True

if __name__ == '__main__':
    #[non_prime, non_prime, non_prime, prime, prime]
    test_cases = [112272535095295, 100109100129100369, 100109100129101027, 100109100129100151, 100109100129162907]

    num_of_workers = 4

    for n in test_cases:
        pool = Pool(processes=num_of_workers)
        st = time.time()
        print(f'The Value {n} is a prime: {check_prime(n, pool, num_of_workers)} (Took {time.time() - st} seconds)')
        pool.close()  # release this iteration's worker processes
        pool.join()
        


Overwriting Multiprocessing/verify_prime_naive.py


The output of the above naive implementation is as follows.

<center><image src="./img/23.jpg" width="500"/></center>

For the small non-prime case, the time taken is larger than with the pure serial implementation (587 ns ± 1.65 ns per loop, mean ± std. dev. of 7 runs, 1,000,000 loops each). This is due to the various overheads of parallel processing, such as sending tasks to the workers and pickling arguments and results. We can improve this solution by first running a serial check over the smallest candidate divisors, and going parallel only if that check is inconclusive. A sample implementation is below.

In [5]:
%%writefile Multiprocessing/verify_prime_less_naive.py

import multiprocessing
from multiprocessing import Pool
import time
import math

def create_range(from_i, to_i, num_processes):

    n = to_i - from_i
    chunk = (n/num_processes)

    output = []
    for i in range(num_processes):
        output.append((from_i+int(chunk*i), from_i+int(chunk*(i+1))))
    
    return output

def check_prime(n, pool, num_processes):
    from_i = 3
    to_i = int(math.sqrt(n)) + 1

    result = check_prime_in_range((n, (from_i, min(to_i, 21))))
    if to_i<=21 or result==False:
        return result
    
    from_i = 21
    ranges_to_check = create_range(from_i, to_i, num_processes)
    ranges_to_check = zip(num_processes*[n], ranges_to_check)

    results = pool.map(check_prime_in_range, ranges_to_check)
    if False in results:
        return False
    return True

def check_prime_in_range(n_from_i_to_i):
    (n, (from_i, to_i)) = n_from_i_to_i
    if n % 2 == 0:
        return False
    if (from_i % 2 == 0):
        from_i = from_i - 1
    for i in range(from_i, int(to_i), 2):
        if n % i == 0:
            return False
    return True

if __name__ == '__main__':
    #[non_prime, non_prime, non_prime, prime, prime]
    test_cases = [112272535095295, 100109100129100369, 100109100129101027, 100109100129100151, 100109100129162907]

    num_of_workers = 4

    for n in test_cases:
        pool = Pool(processes=num_of_workers)
        st = time.time()
        print(f'The Value {n} is a prime: {check_prime(n, pool, num_of_workers)} (Took {time.time() - st} seconds)')
        pool.close()  # release this iteration's worker processes
        pool.join()
        


Overwriting Multiprocessing/verify_prime_less_naive.py


Note the speedup we gain for the first non-prime number.

<center><image src="./img/24.jpg" width="500"/></center>

Running a cheap serial precheck before going parallel is a common technique in parallel computing to avoid the overheads associated with parallelization.

One major issue with the above implementation is that even after one process finds a factor (so the number is not prime) and returns, the other processes keep searching their assigned ranges, because they do not communicate with each other. To solve that, we can use a flag variable shared by all processes. The shared value provided by the `multiprocessing.Manager` class is a bit unintuitive in my opinion, but its usage is shown below.

In [11]:
%%writefile Multiprocessing/verify_prime_flagged.py

import multiprocessing
from multiprocessing import Pool
import time
import math

def create_range(from_i, to_i, num_processes):

    n = to_i - from_i
    chunk = (n/num_processes)

    output = []
    for i in range(num_processes):
        output.append((from_i+int(chunk*i), from_i+int(chunk*(i+1))))
    
    return output

def check_prime(n, pool, num_processes, flag):
    from_i = 3
    to_i = int(math.sqrt(n)) + 1
    flag.value = FLAG_CLEAR

    result = check_prime_in_range( (n, (from_i, min(to_i, SERIAL_CHECK_CUTOFF)), flag) )
    if to_i<=SERIAL_CHECK_CUTOFF or result==False:
        return result
    
    from_i = SERIAL_CHECK_CUTOFF
    ranges_to_check = create_range(from_i, to_i, num_processes)
    ranges_to_check = zip(num_processes*[n], ranges_to_check, num_processes*[flag])

    results = pool.map(check_prime_in_range, ranges_to_check)
    if False in results:
        return False
    return True

def check_prime_in_range(n_from_i_to_i_flag):
    (n, (from_i, to_i), flag) = n_from_i_to_i_flag
    if n % 2 == 0:
        return False
    if (from_i % 2 == 0):
        from_i = from_i - 1

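    check_freq = CHECK_EVERY
    # i is always odd here, so `i % check_freq == 0` would never be true for an even check_freq;
    # count loop iterations instead and poll the shared flag every check_freq iterations
    for step, i in enumerate(range(from_i, int(to_i), 2)):
        if step % check_freq == 0:
            if flag.value == FLAG_SET:
                return False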
        if n % i == 0:
            flag.value = FLAG_SET
            return False
    return True


SERIAL_CHECK_CUTOFF = 21
CHECK_EVERY = 1000
FLAG_CLEAR = b'0'
FLAG_SET = b'1'

if __name__ == '__main__':
    #[non_prime, non_prime, non_prime, prime, prime]
    test_cases = [112272535095295, 100109100129100369, 100109100129101027, 100109100129100151, 100109100129162907]

    num_of_workers = 4

    primes = []
    manager = multiprocessing.Manager()
    flag = manager.Value(b'c', FLAG_CLEAR) # Setting the initial flag
    
    for n in test_cases:
        pool = Pool(processes=num_of_workers)
        st = time.time()
        print(f'The Value {n} is a prime: {check_prime(n, pool, num_of_workers, flag)} (Took {time.time() - st} seconds)')
        pool.close()  # release this iteration's worker processes
        pool.join()
        


Overwriting Multiprocessing/verify_prime_flagged.py


The output is as follows.

<center><image src="./img/25.jpg" width="500"/></center>

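As we can observe, the execution speed is not great, likely because of the additional overhead of going through a shared `Manager` object (every access to the value is a round trip to the manager process). Therefore, whether this technique pays off depends heavily on your task.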


__Instead of the `Manager.Value` provided by the multiprocessing library, we can use the key/value store `Redis`, which has its own concurrency handling, as a shared value store. This is great because the shared values can then be accessed from any programming language or tool.__


> Redis stores everything in RAM and snapshots to disk (optionally using journaling) and supports master/slave replication to a cluster of instances.

There are also several other Python modules and methods for sharing objects among processes, including `mmap` and the `fasteners` module. Each has its own strengths and weaknesses. For example, `mmap` provides no synchronization, so multiple simultaneous updates may corrupt data. However, in a use case like checking for primes we don't need synchronization, because the flag only ever goes from clear to set, so we can use `mmap` to solve our problem.
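
As a rough sketch of the `mmap` idea, a one-byte file mapped into memory can act as the shared "factor found" flag: every process that maps the same file sees the same bytes. The helper names are hypothetical, and there is deliberately no locking; that is acceptable here only because the flag is written in one direction and a slightly stale read merely delays a worker's exit. In a real run, each worker would open its own `SharedFlag(path)`, for example in a `Pool` initializer.

```python
import mmap
import os
import tempfile

FLAG_CLEAR = b"0"
FLAG_SET = b"1"


def create_flag_file():
    """Create a one-byte file holding FLAG_CLEAR and return its path."""
    fd, path = tempfile.mkstemp()
    with os.fdopen(fd, "wb") as f:
        f.write(FLAG_CLEAR)
    return path


class SharedFlag:
    """A one-byte flag backed by a memory-mapped file (no synchronization)."""

    def __init__(self, path):
        self._file = open(path, "r+b")
        self._map = mmap.mmap(self._file.fileno(), 1)  # shared, writable mapping of byte 0

    def is_set(self):
        return self._map[0:1] == FLAG_SET

    def set(self):
        self._map[0:1] = FLAG_SET

    def close(self):
        self._map.close()
        self._file.close()


if __name__ == "__main__":
    path = create_flag_file()
    writer, reader = SharedFlag(path), SharedFlag(path)  # in practice, one per process
    print("before:", reader.is_set())
    writer.set()
    print("after:", reader.is_set())
    writer.close()
    reader.close()
    os.remove(path)
```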