In [1]:
#Several ML libraries use multithreading and multiple cores like Scikit Learn, they use a library called joblib
# Nympy and Scipy use a library called BLAS (basic linear algebra for subroutines) for achieving multi threading

# Number of CPUs and Threads in Linux.
# Refer: https://linux.die.net/man/1/lscpu
!lscpu


Architecture:        x86_64
CPU op-mode(s):      32-bit, 64-bit
Byte Order:          Little Endian
CPU(s):              8
On-line CPU(s) list: 0-7
Thread(s) per core:  2
Core(s) per socket:  4
Socket(s):           1
NUMA node(s):        1
Vendor ID:           GenuineIntel
CPU family:          6
Model:               142
Model name:          Intel(R) Core(TM) i5-8265U CPU @ 1.60GHz
Stepping:            11
CPU MHz:             2854.352
CPU max MHz:         3900.0000
CPU min MHz:         400.0000
BogoMIPS:            3600.00
Virtualization:      VT-x
L1d cache:           32K
L1i cache:           32K
L2 cache:            256K
L3 cache:            6144K
NUMA node0 CPU(s):   0-7
Flags:               fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc art arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl vmx est tm2 ssse3 sdb

In [None]:
Mean of 100 Million observations

In [3]:
# Generate random 100MM data points 
import numpy as np
n =100000000
d = np.random.rand(n)
print(d.shape)

(100000000,)


In [4]:
#runs on 1 core on the cpu
import time
def mean():

  #Sum using for loops. We can use inbuilt NumPy Sum opeartion for better speed.
  sum = 0
  n=d.size
  for i in range(n):
    sum +=d[i]

  #Mean
  mean = sum/n
  return mean


#Time the execution
start_time = time.time()
m = mean() # compute mean of 100MM numbers.
end_time = time.time()
print (end_time-start_time) #25sec
print(m)

25.498055458068848
0.5000022375926746


In [5]:
#Multi processing and Multi Core

#refer image 1 on how multi threading can be used to compute the mean of 100mn nos
# this is a trivially multi processing task

#Refer: https://docs.python.org/3/library/multiprocessing.html

#we need 2 processes (p1, p2), where each process runs on each core, we can use the no of cores present on the machine

from multiprocessing import Process, Queue #Queue is provided by the multiprocessing library to communicate b/w different processes
import math

def mean_MP(s, e, q ): #s: start index, e: end index, q: queue

  #Sum using for loops. We can use inbuilt NumPy Sum opeartion for better speed.
  sum = 0
  for i in range(s,e+1):
    sum +=d[i]

  #Mean
  mean = sum/(e-s+1)
  q.put(mean) #enqueue the mean value
  return 

n1 = math.floor(n/2)

#Queues follow a FIFO approach, they can be entered from the start (enqueue) and removed from the start (dequeue)
#compute mean of half of the nos in core 1 in a process 1 and enqueue the mean m1 in the queue
#compute mean of rest of half of the nos in core 2 in a process 2 and enqueue the mean m2 in the queue
#refer image 3

#Queues are thread and process safe.
# This means that if process 1 is trying to insert m1 and process 2 is trying to insert m2 into the queue at the same time - Inter process communication
# But the Queue in multiprocessin lib is written in such a way that multiple processes can write to the queue, without worrying about multiple processes writing to the queue
#add mean of process 1 to the queue
#add mean of process 2 to the queue
#queue is a shared resource to which processes can add data

q = Queue() #Queues are thread and process safe. For communicating between processes and threads.

p1 = Process(target=mean_MP, args=(0, n1,q ))  #target is the function which we wnt to execute, args: args to the function
p2 = Process(target=mean_MP, args=(n1+1,n-1, q)) 


#Time the execution
start_time = time.time()

#The execution of this whole process is also carried out by a Process which we can call a Parent Process
# This parent process has 2 child processes p1 and p2
# All the Processes have data in their respective memory location and Processes also have some code
# The parent process say has d (dataset=100mn) in memory and also some code
# When the parent process runs, it spawns the 2 child processes p1 and p2 and it copies the variables/data (d) in the child processes p1 and p2
# each child process has its own distinct memory, they don't know each other's memory
# Each child process is execution the function mean_MP
# Child processes communicate via the Queue - they read/write into the queue
# p1 will process data from 0 to n1
#p2 will process data from n1+1 to n-1
#refer image 3
# the parent process will be running on 1 of the cores of the computer
# whenever the parent spawns achild process, the process gets assigned (scheduled) to a core in the computer by the OS
# This process of assigning a process along with its data and code to a core is called Process Scheduling
#There is some overhead or additional time taken by the OS to spawn a process and do Process scheduling
# While the 2 child processes are running the parent process sleeps and wakes up once p1 and p2 are completed

p1.start() # process 1 starts 
p2.start() # process 2 starts 

p1.join() # Parent process will Wait till p1 finishes
p2.join() 

m=0;
while not q.empty():
     m += q.get() #get hings from the queue until there are items from the queue m = m1 + m2

m /= 2; #take average
    
end_time = time.time()

print (end_time-start_time) #takes nearly half time  (includes OS time overhead)
print(m)

#In Multiprocessing, the data of the parent process is copied to the child processes, which adds to the memory overhead, but there are optimizations like the Queue like datastructers which can be efficient to store memory in a common ds to be accessible by the parent and the children. This will be at the cost of accessing these DS from the parent and child processes. Default MP duplicates memory in parent and children

12.733050346374512
0.5000022375930983


In [6]:
#Multi Threading

# CPU bound job: Jbbs which take a lot of time of the CPU eg: Numerical computations. These types of jobs are readily available to the CPU from RAM and CPU cache (registers)
# I/O Bound job: To run a type of job, where we need to send some data from Disk(I/O device) to RAM and then RAM to CPU and it takes more time in the transfer to the CPU is I/O Bound (Numerical jobs can also sometimes be I/O bound, incase when there is a large dataset, because they need to read data)

# In each core, there can only be 1 process at a time having its own memory and code, say for instance the code while executing requires some extrnal intervention say it needs to read data from the disk, at this time the CPU core will be idle, to utilize this idle time, Intel introduced the idea of multithreading, each process in the core will have 2 threads sharing the memory and code. When 1 thread is idle, the next will execute. This is the concept of Multithreading. Threads share the sae data between processes. They are lightweight and do faster context switching

# Context switching is the time taken by the parent process to spawn child processes and copy its data into the child processes and assign each process to a CPU core
# In threads, both the threads are on the same core sharing the same memory, it just needs to change the code which is being executed. Therefore in multi threading context switching is significantly faster. Multi threading is significantly useful if some threads are CPU bound and some are I/O bound

#Refer: https://docs.python.org/3/library/threading.html
from threading import Thread


means = [0,0];

def mean_MT(s, e, threadNum ):

  #Sum using for loops. We can use inbuilt NumPy Sum opeartion for better speed.
  sum = 0
  for i in range(s,e+1):
    sum +=d[i]

  #Mean
  mean = sum/(e-s+1) 

  #compute mean of 0th thread and add to mean[0]
  #compute mean of 1st thread and add to mean[1]
  means[threadNum] = mean; # means is a shared varibale between the threads can be accessed by both - instead of Queue in case of multi processing
  #refer image 4

  return 

n1 = math.floor(n/2)

t1 = Thread(target=mean_MT, args=(0, n1,0 ))  #syntax simiar to Process, Third param is the thread number
t2 = Thread(target=mean_MT, args=(n1+1,n-1,1)) # Third param is the thread number

#Time the execution
start_time = time.time()

t1.start()
t2.start()

t1.join() # Wait till t1 finishes
t2.join() 

m = (means[0]+means[1])/2 #compute avg
    
end_time = time.time()
print (end_time-start_time)
print(m)

20.910609006881714
0.5000022375930983


In [None]:
#Not much difference with the 1 process 1 thread computation and 1 process 2 threads computation
This is because in the case with 1 process 2 threads the data d is already in the memory - no need to fetch any data, both the threads are CPU bound, no I/O needed, therefore the advantage of multi threading is very low in this case

There is a problem called GIL with Python which slows down processes of multithreading. Python came up after multithreading was mainlly introduced

Global Interpreter Lock -> worst part of Python multithreading
Say there are 2 threads T1 and T2 sharing the same memory and code under the same process
They both use the same Python interpreter

GIL says that, if T1 thread is using the Python Interpreter to execute the code (same or different code), T2 can't use the python interpreter to execute its code
This is because they are under the same process so they have to share the Python Interpreter between them
T1 has to relinquish (give up) the control of the python interpreter before T2 can execute any of its code
If T1 is waiting for some I/O at that time T1 will give up its Python Interpreter and T2 will acquire it
This is the principle of GIL, when T1 is using the interpreter it is locking control on the Interpreter such that others can't access it
This is one of the biggest problems in Python multithreaded code
refer image 5

The reason Python does this is because Python's memory Management is not thread safe.

In [None]:
To write multi processing or multi threading code we can use Process, Thread and Queues, but they are very low level functionality given by Python

We can instead use a library called Joblib which is extensively used by python libs like Scikit learn for parallizing their code
if we are not sure about writing the multi processing or multi threading code using Process, Thread and Queues, we can instead use Joblift which provides easy mechanisms

One of the most used eg of Joblib is paralellizing for loops, bcoz for loops tae most time

In [None]:
Jobib
- Simple parallel computing in python (for loops)
- Disk caching of function outputs
- Widely used by Scikit learn

In [8]:
#- Disk caching of function outputs
#Transparent and fast disk-caching of output value
# Refer: https://joblib.readthedocs.io/en/latest/
from joblib import Memory
cachedir = './' #caching dir
mem = Memory(cachedir) #create some memory loc in the cache dir

import numpy as np
a = np.vander(np.arange(3)).astype(np.float) #create some var a
square = mem.cache(np.square) # we want to use the np.square fn as part of the memory cache, ie whenever we compute square we want to cache it
b = square(a) #square is executed on matrix a and the output b is cached in the memory location provided

#joblib stores the mapping of a -> np.square -> b in a dict like structure
# when we again call the same function, joblib checks if we have this already created, if we have it then don't reexecute it and fetch it directly
# joblib is caching function output values, storing it in disk, thereby storing it for further evaluation


________________________________________________________________________________
[Memory] Calling square...
square(array([[0., 0., 1.],
       [1., 1., 1.],
       [4., 2., 1.]]))
___________________________________________________________square - 0.0s, 0.0min


In [9]:
c = square(a)
# The above call did not trigger an evaluation

In [10]:
# Simple Parallel programming for Loops
# Refer: https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html

import time
from math import sqrt # inbuilt fucntion

def f(i):
    
    # some computations  that take time, can do any computation which takes time
    x=10000
    p =1;
    for j in range(x):
        for k in range(j):
            p *= k
    
    return sqrt(i ** 2);

# Find sqrt of first n numbers
n=10;

start_time = time.time()

for i in range(n): #call function 10 times with 10 different inputs
    f(i)

end_time = time.time()
print (end_time-start_time)

21.691550493240356


In [11]:
from joblib import Parallel, delayed

start_time = time.time()

a = Parallel(n_jobs=2)(delayed(f)(i) for i in range(n)) #run this job parallely sing multi processing - 2 processes, we want to run the function f, which takes arguments from i from i = 0 to 9 (n=10), ie we want to run f(i) where i = 0-9 parallely with 2 processes

# Why we need delayed(): https://stackoverflow.com/questions/42220458/what-does-the-delayed-function-do-when-used-with-joblib-in-python
# Delayed waits for the the i values from 0-9 to be ready before executing f(i) parallely

end_time = time.time() #nearly 50% time saver
print (end_time-start_time)

#when we have nested for loops, we can use this structure
# jobblib expects that the function f(i) to execute is independent, one output does not depend on other

10.973873138427734


In [12]:
# Multi threading: GIL is an issue
start_time = time.time()

a = Parallel(n_jobs=2,prefer="threads")(delayed(f)(i ** 2) for i in range(n)) #1 process 2 threads, prefer="Threads"

end_time = time.time() #no significant improvement bcoz we have CPU bound jobs and Python's GIL issue
print (end_time-start_time)

19.127604246139526


In [13]:
# 6 jobs

from joblib import Parallel, delayed

start_time = time.time()

a = Parallel(n_jobs=6)(delayed(f)(i ** 2) for i in range(n)) 

# Why we need dealyed(): https://stackoverflow.com/questions/42220458/what-does-the-delayed-function-do-when-used-with-joblib-in-python

end_time = time.time() #includes OS time overhead
print (end_time-start_time)

7.556748390197754


In [None]:
#Multiprocessing can be efficiently used in python. Multithreading may not be efficient if jobs are CPU bound and the GIL issue

In [None]:
joblib Useful in paralleizing Matrix and vector processing and data pre processing also in building ML models like Logistic Regression

In [None]:
When the number of processes/jobs exceed the no of cores, at that time the OS will handle process scheduling such that all processes are orchestrated

In [None]:
When the file is very large in the disk, break the file up into 2 and assign each file path to a process to execute
refer image 6 