# Multi-Processing and Multi-Threading
![Multi-processing v Multi-threading](https://miro.medium.com/v2/resize:fit:720/format:webp/1*FSEGozbKSrPGlBDf4Dl0Zg.jpeg)

```python
# system specifications
! lscpu
```

```
Architecture:                    x86_64
CPU op-mode(s):                  32-bit, 64-bit
Byte Order:                      Little Endian
Address sizes:                   46 bits physical, 48 bits virtual
CPU(s):                          4
On-line CPU(s) list:             0-3
Thread(s) per core:              2
Core(s) per socket:              2
Socket(s):                       1
NUMA node(s):                    1
Vendor ID:                       GenuineIntel
CPU family:                      6
Model:                           79
Model name:                      Intel(R) Xeon(R) CPU @ 2.20GHz
Stepping:                        0
CPU MHz:                         2200.160
BogoMIPS:                        4400.32
Hypervisor vendor:               KVM
Virtualization type:             full
L1d cache:                       64 KiB
L1i cache:                       64 KiB
L2 cache:                        512 KiB
L3 cache:                        55 MiB
NUMA node0 CPU(s):               0-3
```

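- 4 logical CPUs (`CPU(s): 4`)
- 1 socket with 2 physical cores (`Core(s) per socket: 2`)
- 2 hardware threads per core (`Thread(s) per core: 2`, i.e. hyper-threading), so 1 × 2 × 2 = 4 logical CPUs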
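The same numbers can be read from inside Python without shelling out to `lscpu`. A minimal sketch using the standard library's `os` module; `os.sched_getaffinity` is Linux-only, so the code falls back to `os.cpu_count()` elsewhere. Note that `os.cpu_count()` counts logical CPUs (hardware threads), not physical cores.

```python
import os

# Logical CPUs = sockets x cores per socket x threads per core (4 on the VM above).
# os.cpu_count() may return None if the count is undetermined, hence the fallback.
logical_cpus = os.cpu_count() or 1

# On Linux a process can be restricted to a subset of CPUs (containers, taskset);
# sched_getaffinity(0) returns the set of CPUs this process may actually run on.
if hasattr(os, "sched_getaffinity"):
    usable_cpus = len(os.sched_getaffinity(0))
else:
    usable_cpus = logical_cpus  # platforms without sched_getaffinity (e.g. macOS, Windows)

print(f"logical CPUs: {logical_cpus}, usable by this process: {usable_cpus}")
```

The usable count is the better upper bound for how many worker processes to start.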

## Task: Compute Mean on 1 Billion records

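Computation of the mean:

Say we have an array `arr` containing 100 numbers.

1. Standard way to compute the mean <br>
`mean = sum(arr) / 100`

2. Distributing the computation <br>
`m1 = sum(arr[0:50]) / 50` <br>
`m2 = sum(arr[50:]) / 50` <br>
`mean = (m1 + m2) / 2`

Notice that the 2nd way still computes the mean of the entire array, but divides the computation into 2 independent jobs. Averaging the two partial means works only because both halves have the same size; with unequal chunks, combine the partial sums instead: `mean = (sum1 + sum2) / (len1 + len2)`.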

A task that splits this easily is called ‘embarrassingly parallel’ (or trivially parallelizable): the subtasks need no communication with each other until the final combine step. However, not all tasks are this straightforward to parallelize, and some cannot be parallelized at all.
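The split-and-combine idea generalizes to any number of chunks. A minimal pure-Python sketch (the helper names `chunk_bounds` and `chunked_mean` are hypothetical, not from the notebook) in which each chunk returns a partial `(sum, count)` and the combine step divides total sum by total count, so chunks need not be equal in size:

```python
def chunk_bounds(size, n_chunks):
    """Split range(size) into n_chunks half-open [lo, hi) ranges of near-equal length."""
    edges = [size * i // n_chunks for i in range(n_chunks + 1)]
    return list(zip(edges[:-1], edges[1:]))


def chunked_mean(values, n_chunks):
    """Mean of `values`, computed from independent per-chunk (sum, count) partials."""
    partials = []
    for lo, hi in chunk_bounds(len(values), n_chunks):
        # Each iteration is an independent job that could run on another core.
        partials.append((sum(values[lo:hi]), hi - lo))
    # Combine step: total sum / total count weights every chunk by its size.
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total / count


data = list(range(101))            # 101 values: the two chunks cannot be equal
print(chunk_bounds(len(data), 2))  # [(0, 50), (50, 101)]
print(chunked_mean(data, 2))       # 50.0
```

Averaging the per-chunk means directly would give (24.5 + 75.0) / 2 = 49.75 here, which is wrong; carrying the counts avoids that.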

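```python
# Sample data: 1 billion random integers in [0, 10)
import numpy as np
import time

n = 1000000000  # 1 billion
np.random.seed(41)
arr = np.random.randint(0, 10, n)  # int64 by default on Linux: about 8 GB of RAM
print(len(arr))
```

```
1000000000
```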


# Simple Execution (Single core, no threading)

```python
def mean(a):
    n = len(a)
    total = 0  # named `total` to avoid shadowing the built-in sum()
    for elem in a:  # pure-Python loop: one element at a time, on one core
        total += elem
    return total / n

start_time = time.time()  # start time

data_mean = mean(arr)
print(data_mean)

print(f"execution time: {round(time.time() - start_time, 2)} sec")  # end time
```

```
4.499927063
execution time: 97.54 sec
```


# Multi-Processing

### Multi-processing distributes your work across multiple processes, which the operating system can run on different cores of your CPU at the same time, resulting in simultaneous execution of multiple jobs.

Multiprocessing and parallelization are related concepts, but they are not the same thing. Multiprocessing is a technique in which a program uses multiple processes, each with its own memory space, to perform tasks.

Parallelization, on the other hand, refers to a program performing multiple computations at the same instant, on different cores or processors.

In other words, multiprocessing is one way to achieve parallelism, but not the only one: a program can also be parallelized with threads (in languages such as C++ or Java), with vectorized SIMD instructions, or on a GPU.

```python
# multiprocess: a fork of the standard-library multiprocessing that serializes with dill
!pip install multiprocess -q
```

```python
from multiprocess import Process, Queue

def mean_mp(a, s, e, q):
    # Mean of a[s..e], both ends inclusive; the result is sent back through q.
    total = 0  # avoid shadowing the built-in sum()
    for elem in a[s:e + 1]:
        total += elem
    q.put(total / (e + 1 - s))

n_processes = 2  # number of worker processes (the OS can run each on its own core)

'''
    Each process has its own private address space, so an ordinary Python object
    modified in a child process is invisible to the parent. A Queue is process-safe:
    it pickles each item and sends it through a pipe, which lets the children hand
    their results back to the parent.
'''
q = Queue()

n1 = n // n_processes

# p1 covers indices 0..n1 (n1 + 1 elements), p2 covers n1+1..n-1 (n - n1 - 1 elements).
p1 = Process(target=mean_mp, args=(arr, 0, n1, q))
p2 = Process(target=mean_mp, args=(arr, n1 + 1, n - 1, q))

start_time = time.time()  # start time

p1.start()
p2.start()

# Read exactly one result per process. q.empty() is unreliable across processes,
# and reading before join() avoids a deadlock if a child still has unsent data.
mean_s = [q.get() for _ in range(n_processes)]

p1.join()  # wait for p1 to exit
p2.join()  # wait for p2 to exit

# Equal-weight average of the partial means. The chunks differ by 2 elements, so
# this is very slightly off; combining (sum, count) pairs would be exact.
data_mean = np.sum(mean_s) / n_processes
print(data_mean)

print(f"execution time: {round(time.time() - start_time, 2)} sec")  # end time
```

```
4.499927062999671
execution time: 62.6 sec
```


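With 2 processes the run took 62.6 s instead of 97.54 s on a single core: a speedup of about 1.56×, short of the ideal 2×. Part of the gap is overhead such as starting the processes and collecting their results, so speedups are rarely perfectly linear; still, adding cores usually reduces the execution time as long as idle cores are available and each process has enough work to amortize that overhead. The last digits of the mean (4.499927062999671 instead of 4.499927063) differ because the two chunks (500,000,001 and 499,999,999 elements) are averaged with equal weights.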
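The two-process cell hard-codes two `Process` objects. A hedged sketch of the same idea for any number of workers, using the standard library's `multiprocessing.Pool` rather than `multiprocess`; it assumes a POSIX system where the `fork` start method is available (as on the Linux VM above), and the helper names `_init_worker`, `_chunk_sum` and `parallel_mean` are hypothetical:

```python
import multiprocessing as mp

_DATA = None  # set inside each worker by _init_worker


def _init_worker(data):
    # With the "fork" start method the workers inherit `data` from the parent's
    # memory, so it is not pickled and sent along with every task.
    global _DATA
    _DATA = data


def _chunk_sum(lo, hi):
    """Partial result for the half-open slice [lo, hi): (sum, count)."""
    total = 0
    for x in _DATA[lo:hi]:
        total += x
    return total, hi - lo


def parallel_mean(data, n_workers=2):
    size = len(data)
    edges = [size * i // n_workers for i in range(n_workers + 1)]
    ranges = list(zip(edges[:-1], edges[1:]))  # half-open ranges covering 0..size-1
    ctx = mp.get_context("fork")  # POSIX-only start method (assumed here)
    with ctx.Pool(n_workers, initializer=_init_worker, initargs=(data,)) as pool:
        partials = pool.starmap(_chunk_sum, ranges)
    # Weighted combine: exact even when the chunks differ in size.
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total / count


print(parallel_mean(list(range(101)), n_workers=3))  # 50.0
```

Only the `(lo, hi)` bounds travel to the workers and only `(sum, count)` pairs travel back, which keeps inter-process traffic tiny regardless of the data size.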

## Multi-Processing using Joblib

```python
from joblib import Parallel, delayed

n_processes = 2
n1 = n // n_processes

start_time = time.time()  # start time

# Only minimal changes with joblib: notice we reuse the original mean() function.
# Chunks arr[0:n1] and arr[n1:2*n1] have equal size (assumes n % n_processes == 0),
# so averaging their means is exact.
mean_s = Parallel(n_jobs=n_processes)(delayed(mean)(arr[i:i+n1]) for i in range(0, n, n1))

data_mean = np.sum(mean_s) / n_processes
print(data_mean)

print(f"execution time: {round(time.time() - start_time, 2)} sec")  # end time
```

```
4.499927063
execution time: 320.78 sec
```


This version should do the same work as the `multiprocess` code above, yet it is about 5× slower (320.78 s vs 62.6 s). One likely reason is the cost of getting the data to the workers. joblib has overhead for creating and managing worker processes and for serializing and deserializing arguments and results. In particular, any NumPy array larger than `max_nbytes` (1 MB by default) that is passed to a worker is first dumped to a temporary folder and then memory-mapped by the worker, so each 4 GB slice of `arr` is written out before any computation starts. This overhead can dominate when the data is large compared to the work done per element, as it is here.

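Parallelism model: the two libraries also start their workers differently. joblib's default `loky` backend launches fresh Python worker interpreters, so every input must be shipped to them. On Linux, `multiprocess` (like the standard library's `multiprocessing` before Python 3.14) uses the "fork" start method by default: each child starts as a copy of the parent process and inherits `arr` through copy-on-write memory, so nothing is pickled or copied up front.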

https://stackoverflow.com/questions/57706763/why-does-joblib-parallel-take-much-more-time-than-a-non-paralleled-computation
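A small sketch of what the "fork" start method means in practice, using the standard library's `multiprocessing` (whose API `multiprocess` mirrors). It assumes a POSIX system; `big` and `child_sum` are hypothetical names:

```python
import multiprocessing as mp

# Created in the parent *before* the child process is forked.
big = list(range(1_000_000))


def child_sum(q):
    # Under "fork" the child starts as a copy of the parent, so it can read
    # `big` directly: it was never pickled or passed as an argument.
    q.put(sum(big))


ctx = mp.get_context("fork")  # POSIX-only start method
q = ctx.Queue()
p = ctx.Process(target=child_sum, args=(q,))
p.start()
result = q.get()  # read before join() to avoid a queue deadlock
p.join()
print(result == sum(big))  # True
```

Copy-on-write is not entirely free in CPython: touching Python objects updates their reference counts, which can force the affected memory pages to be copied. A NumPy array's data buffer is only read, so those pages generally stay shared.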

# Multi-Threading

### Multi-threading runs multiple threads of execution inside a single process; the threads share that process's memory, resulting in concurrent execution of your code.

Concurrency and multithreading are related concepts, but they are not the same thing. Concurrency means a program makes progress on multiple tasks during overlapping time periods, for example by interleaving them on a single core; the tasks do not have to run at the same instant. Multithreading is a technique that allows a program to use multiple threads of execution to perform tasks concurrently.

In other words, concurrency is a broader concept that encompasses multithreading. A program can be concurrent without being multithreaded (for example, an event loop that interleaves I/O-bound tasks on one thread), but a multithreaded program is always concurrent.
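The "concurrent without being multithreaded" case can be made concrete with the standard library's `asyncio`. A minimal sketch, where `asyncio.sleep` is an assumed stand-in for real I/O and `fake_io` is a hypothetical name, in which two tasks overlap their waits on a single thread:

```python
import asyncio
import threading
import time


async def fake_io(name, delay):
    # While this task awaits, the event loop runs other tasks on the same thread.
    await asyncio.sleep(delay)
    return name, threading.current_thread().name


async def main():
    # Start both "requests" and wait for both; their waits overlap.
    return await asyncio.gather(fake_io("task-1", 0.5), fake_io("task-2", 0.5))


start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start

print(results)               # both tasks report the same thread
print(f"{elapsed:.2f} sec")  # close to 0.5, not 1.0
```

One thread, two tasks, overlapping waits: concurrency without multithreading (and without parallelism).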

```python
from threading import Thread

def mean_mt(a, s, e, n_thread, mean_s):
    # Mean of a[s..e], both ends inclusive, stored in slot n_thread of mean_s.
    total = 0  # avoid shadowing the built-in sum()
    for elem in a[s:e + 1]:
        total += elem
    mean_s[n_thread] = total / (e + 1 - s)

n_threads = 2  # number of threads (not tied to one core; the OS schedules them)

'''
    All threads of a process share the same memory (address space), whichever core
    they run on, so a plain list is enough to collect the results: each thread writes
    only to its own slot. A queue (queue.Queue for threads) would also work.
    For more complex shared state, thread-safe tools such as locks are needed,
    but they can be skipped here.
'''
mean_s = [0] * n_threads

n1 = n // n_threads

# t1 covers indices 0..n1, t2 covers n1+1..n-1 (2 elements fewer than t1).
t1 = Thread(target=mean_mt, args=(arr, 0, n1, 0, mean_s))
t2 = Thread(target=mean_mt, args=(arr, n1 + 1, n - 1, 1, mean_s))

start_time = time.time()  # start time

t1.start()
t2.start()

# in the main thread
t1.join()  # wait for t1 to complete
t2.join()  # wait for t2 to complete

# Equal-weight average of two slightly unequal chunks, so the last digits are very slightly off.
data_mean = np.sum(mean_s) / n_threads
print(data_mean)

print(f"execution time: {round(time.time() - start_time, 2)} sec")  # end time
```

```
4.499927062999671
execution time: 99.98 sec
```


We can run multiple threads on this machine, so why is the computation not faster? <br>

Because in standard CPython the Global Interpreter Lock (GIL) lets only one thread execute Python bytecode at a time. Our `mean_mt` loop is entirely CPU-bound Python code, so the two threads simply take turns holding the GIL, and the total time stays about the same as the single-core run (99.98 s vs 97.54 s). <br>

Threads help when the work is I/O-bound, i.e. when a thread spends time waiting for a request (disk, network, database) to complete while the CPU sits idle. A blocking I/O call releases the GIL, so another thread can run in the meantime. Multi-threading in Python is therefore mainly a tool for concurrency, not parallelism.

Here the entire computation is CPU-bound, so the CPU is never idle and there is no waiting time for another thread to use.

To benefit from multi-threading in Python, your code should not be entirely CPU-bound.

```python
'''
    Let's take an example with I/O-bound operations.
'''
def func(a):
    # time.sleep is NOT CPU-bound: the thread just waits (and releases the GIL),
    # so another thread can run in the meantime. It stands in for an I/O request.
    time.sleep(5)

n_threads = 2

print("w/o threading")
start_time = time.time()  # start time

for i in range(2):
    func(i)

print(f"execution time: {round(time.time() - start_time, 2)} sec")  # end time

print("\nw/ threading")
start_time = time.time()  # start time

# Parallel and delayed were imported from joblib in the multi-processing section.
Parallel(n_jobs=n_threads, prefer='threads')(delayed(func)(i) for i in range(2))

print(f"execution time: {round(time.time() - start_time, 2)} sec")  # end time
```

```
w/o threading
execution time: 10.01 sec

w/ threading
execution time: 5.02 sec
```


Notice that without threading the two calls took 10.01 s to complete. With threading, while thread 1 was blocked in its 5-second wait, thread 2 started its own wait, so the two waits overlapped and the total time dropped to 5.02 s, cutting the execution time by a whopping 50%.

In the time of 1 iteration, it completed 2 iterations.
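The same overlap can be reproduced with only the standard library. A minimal sketch using `concurrent.futures.ThreadPoolExecutor`, where `time.sleep` again stands in for real I/O and `fake_request` is a hypothetical name:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_request(i):
    # time.sleep stands in for an I/O wait; it releases the GIL while waiting.
    time.sleep(0.3)
    return i


# Sequential: the four waits add up (about 1.2 s).
start = time.perf_counter()
seq_results = [fake_request(i) for i in range(4)]
seq_time = time.perf_counter() - start

# Threaded: 4 worker threads, so the four waits overlap (about 0.3 s).
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    thr_results = list(pool.map(fake_request, range(4)))
thr_time = time.perf_counter() - start

print(f"sequential: {seq_time:.2f} sec, threaded: {thr_time:.2f} sec")
```

`pool.map` returns results in input order, so both lists are identical; only the elapsed time changes.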

## Multi-Threading using Joblib

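```python
n_threads = 2
n1 = n // n_threads

start_time = time.time()  # start time

# Only minimal changes with joblib: notice we reuse the original mean() function.
# With threads the slices are NumPy views, so no data is copied or serialized.
# The chunks arr[0:n1+1] and arr[n1+1:n] differ in size by 2 elements, so the
# equal-weight average below is very slightly off in the last digits.
mean_s = Parallel(n_jobs=n_threads, prefer='threads')(delayed(mean)(arr[i:i+n1+1]) for i in range(0, n, n1+1))

data_mean = np.sum(mean_s) / n_threads
print(data_mean)

print(f"execution time: {round(time.time() - start_time, 2)} sec")  # end time
```

```
4.499927062999671
execution time: 100.7 sec
```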


# Caching using Joblib

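Function caching (memoization) stores each input and the corresponding output of a function as a key-value pair, so the function does not have to run again for an input it has already seen. `joblib.Memory` persists this cache to disk, inside the directory passed to it, so it also survives kernel restarts.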
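For contrast, the standard library's `functools.lru_cache` applies the same key-value idea purely in memory: the cache lives in the running process and is lost when it exits. A minimal sketch (`slow_add_one` is a hypothetical stand-in for `func`):

```python
import time
from functools import lru_cache


@lru_cache(maxsize=None)  # unbounded in-memory mapping: arguments -> result
def slow_add_one(a):
    time.sleep(0.5)  # stands in for an expensive computation
    return a + 1


start = time.perf_counter()
first = slow_add_one(10)   # miss: runs the body
t_first = time.perf_counter() - start

start = time.perf_counter()
second = slow_add_one(10)  # hit: returned straight from the cache
t_second = time.perf_counter() - start

print(first, second, slow_add_one.cache_info())
# -> 11 11 CacheInfo(hits=1, misses=1, maxsize=None, currsize=1)
```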

```python
# !rm -r joblib   # uncomment to clear the on-disk cache that joblib.Memory keeps in ./joblib
```

```python
from joblib import Memory

cachedir = "./"  # the cache is written to ./joblib/
mem = Memory(cachedir)

def func(a):
    time.sleep(5)  # deliberate; consider this to be some expensive computation
    return a + 1

func_c = mem.cache(func)  # cached wrapper around func

print("w/o Cache")
start_time = time.time()  # start time
print(func_c(10))  # cache miss: runs func (~5 s) and stores 10 -> 11
print(f"execution time: {round(time.time() - start_time, 2)} sec")  # end time

print()
print("w/ Cache")

start_time = time.time()  # start time
print(func_c(10))  # cache hit: result loaded from the cache, func is not run
print(f"execution time: {round(time.time() - start_time, 2)} sec")  # end time

print()

start_time = time.time()  # start time
print(func_c(20))  # new input, cache miss: runs func again (~5 s)
print(f"execution time: {round(time.time() - start_time, 2)} sec")  # end time
```

```
w/o Cache
________________________________________________________________________________
[Memory] Calling __main__--tmp-ipykernel-631066686.func...
func(10)
_____________________________________________________________func - 5.0s, 0.1min
11
execution time: 5.01 sec

w/ Cache
11
execution time: 0.0 sec

________________________________________________________________________________
[Memory] Calling __main__--tmp-ipykernel-631066686.func...
func(20)
_____________________________________________________________func - 5.0s, 0.1min
21
execution time: 5.01 sec
```


The 1st call took about 5 s. The 2nd call used the same input `a=10`, so the result was already in the cache and only had to be loaded, which took almost no time.

If we pass an input that is not yet in the cache (here `a=20`), the function has to execute in full, and its input-output pair is then stored in the cache as a new key-value pair.
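To see what storing the input-output pair on disk involves, here is a deliberately simplified sketch of a disk-backed cache. It is not how joblib is implemented, and `disk_cache` and `add_one` are hypothetical names:

```python
import hashlib
import os
import pickle
import tempfile


def disk_cache(func, cache_dir):
    """Hypothetical minimal disk memoizer: one pickle file per distinct input."""
    os.makedirs(cache_dir, exist_ok=True)

    def wrapper(*args):
        # Key: hash of the pickled arguments. This is a simplification; joblib
        # also tracks the function's code so that editing it invalidates the cache.
        key = hashlib.sha256(pickle.dumps(args)).hexdigest()
        path = os.path.join(cache_dir, f"{func.__name__}-{key}.pkl")
        if os.path.exists(path):       # hit: load the stored output
            with open(path, "rb") as f:
                return pickle.load(f)
        result = func(*args)           # miss: compute ...
        with open(path, "wb") as f:    # ... and store input -> output on disk
            pickle.dump(result, f)
        return result

    return wrapper


calls = []  # records every real execution of the wrapped function


def add_one(a):
    calls.append(a)
    return a + 1


cache_dir = tempfile.mkdtemp()
add_one_c = disk_cache(add_one, cache_dir)
print(add_one_c(10), add_one_c(10), add_one_c(20))  # 11 11 21
print(calls)  # [10, 20]: the repeated 10 was served from disk
```

Because the results live in files, a new process pointed at the same `cache_dir` would get cache hits too, which is what lets `joblib.Memory` survive kernel restarts.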

# End

![](https://media.giphy.com/media/mPKa6OI5oRsmextwBq/giphy.gif)