In [1]:
import multiprocessing
# Number of CPU cores reported by the OS; later cells use it to decide how many
# workers (pool processes, threads, processes) to create.
num_cores = multiprocessing.cpu_count()
core_report = "{} cores available".format(num_cores)
print(core_report)

6 cores available


## Pool
`Pool` is a convenient and simple way to parallelize the execution of a function across multiple input values.

In [2]:
import os
import time
from multiprocessing import Pool

def square(x):
    """Return ``x`` squared, announcing which process handled it.

    The PID of the executing process is printed first so the reader can see
    how inputs are distributed across workers.
    """
    worker_pid = os.getpid()
    print("PID {} is processing input {}".format(worker_pid, x))
    return pow(x, 2)

# define the number of worker processes to be equal to the number of cores
p = Pool(processes=num_cores)
try:
    # map the inputs to processes; map() blocks until every result is back and
    # returns them in input order, regardless of which worker finished first
    result = p.map(square, range(6))
finally:
    # Always release the worker processes, even if map() raises. close() followed
    # by join() lets the workers finish and exit on their own instead of being
    # terminated, so the pool does not leak processes into later cells.
    p.close()
    p.join()

# presumably gives the workers' print output a moment to show up before the result
time.sleep(0.1)
print(result)

PID 26692 is processing input 0
PID 26693 is processing input 1
PID 26694 is processing input 2
PID 26699 is processing input 5
PID 26696 is processing input 4
PID 26695 is processing input 3
[0, 1, 4, 9, 16, 25]


## Thread vs Process

Thread:
- A new thread is created within the existing process
- Memory is shared among all threads within a process
- One GIL for all threads within a process

Process:
- A new process is started independent from the main process
- Each process has its own memory space
- One GIL for each process

In [3]:
# just some time-consuming function
def calc_cube():
    """CPU-bound busy work: cube every integer from 0 to 999999 and discard it.

    Prints the PID of the process it runs in before starting, so the caller can
    tell whether the work ran in the parent process or in a child.
    """
    current_pid = os.getpid()
    print("function is executed on pid: {}".format(current_pid))
    for n in range(1000000):
        # the result is intentionally thrown away; only the CPU time matters
        n ** 3

In [4]:
from datetime import datetime
import threading

print("pid of parent process: {}".format(os.getpid()))
t_start = datetime.now()

# Build one Thread per core up front; nothing runs until start() is called.
threads = []
for i in range(num_cores):
    print("creating thread {}".format(i))
    worker = threading.Thread(target=calc_cube)
    threads.append(worker)

# Start every thread before waiting on any of them, so that they all run
# concurrently, then block until each one has finished.
for worker in threads:
    worker.start()
for worker in threads:
    worker.join()

t_end = datetime.now()
elapsed = t_end - t_start

print("time taken: {}".format(elapsed))

pid of parent process: 26678
creating thread 0
creating thread 1
creating thread 2
creating thread 3
creating thread 4
creating thread 5
function is executed on pid: 26678
function is executed on pid: 26678
function is executed on pid: 26678
function is executed on pid: 26678
function is executed on pid: 26678
function is executed on pid: 26678
time taken: 0:00:01.457958


In [5]:
from multiprocessing import Process

print("pid of parent process: {}".format(os.getpid()))

t_start = datetime.now()

# Build one Process per core up front; no child exists until start() is called.
processes = []
for i in range(num_cores):
    print("creating process {}".format(i))
    child = Process(target=calc_cube)
    processes.append(child)

# Start every child before waiting on any of them, so that they all run
# concurrently, then block until each one has exited.
for child in processes:
    child.start()
for child in processes:
    child.join()

t_end = datetime.now()
elapsed = t_end - t_start

print("time taken: {}".format(elapsed))

pid of parent process: 26678
creating process 0
creating process 1
creating process 2
creating process 3
creating process 4
creating process 5
function is executed on pid: 26722
function is executed on pid: 26725
function is executed on pid: 26729
function is executed on pid: 26732
function is executed on pid: 26733
function is executed on pid: 26728
time taken: 0:00:00.264661


## Inter Process Communication
1. write/read file
2. shared value/array
3. queue/pipe

In [6]:
# using shared memory method
from multiprocessing import Process, Value, Array

def ipc(input_arr, result_arr, val):
    """Write results back to the caller through the objects it passed in.

    Sets ``val.value`` to 3.14 and stores the square of each element of
    ``input_arr`` at the same index of ``result_arr``. Nothing is returned;
    the outputs are the mutations of ``val`` and ``result_arr``.
    """
    # a shared Value is read and written through its .value attribute
    val.value = 3.14

    # a shared Array is read and written by index
    for position, number in enumerate(input_arr):
        squared = number ** 2
        result_arr[position] = squared


input_num = list(range(10))

# typecodes: i -> int, d -> double
# Size the shared array from the input instead of repeating the literal 10, so
# the two cannot drift apart (ipc() writes one slot per input element). An
# Array created from an integer size starts out zero-filled.
result = Array('i', len(input_num))
val = Value('d', 0.0)

# The child writes into the shared Value/Array; after join() the parent reads
# the same memory back.
p = Process(target=ipc, args=(input_num, result, val))
p.start()
p.join()

print("Value:", val.value)
print("Array:", result[:])

Value: 3.14
Array: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


In [7]:
# using Queue (First In First Out)
from multiprocessing import Queue

def calc_square_q(input_arr, q):
    """Put the square of each element of ``input_arr`` onto ``q``, in input order."""
    squares = (value ** 2 for value in input_arr)
    for squared in squares:
        q.put(squared)


input_num = list(range(10))

q = Queue()
p = Process(target=calc_square_q, args=(input_num, q))
p.start()

# Drain the queue BEFORE joining the child. A process that has put items on a
# Queue may not exit until that data has been consumed, so join()-then-read can
# deadlock on larger payloads. Reading a known number of items also avoids
# Queue.empty(), which is not reliable across processes.
# The timeout makes a crashed child raise queue.Empty instead of hanging the cell.
for _ in range(len(input_num)):
    print(q.get(timeout=10), end=" ")

p.join()

0 1 4 9 16 25 36 49 64 81 

In [8]:
# using Pipe (better performance but blocking)
from multiprocessing import Pipe

def calc_square_p(input_arr, p):
    """Send the square of each element of ``input_arr`` through ``p``, then "end".

    The trailing "end" string is the marker the receiver uses to know that no
    more values will follow.
    """
    for value in input_arr:
        squared = value ** 2
        p.send(squared)
    # end-of-stream marker for the receiving side
    p.send("end")

input_num = list(range(10))

# a pipe has two ends
p_parent, p_children = Pipe()
p = Process(target=calc_square_p, args=(input_num, p_children))
p.start()
# From here on only the child uses p_children. Close the parent's copy so that,
# if the child exits without sending "end", recv() raises EOFError instead of
# blocking forever on a pipe the parent itself is still holding open.
p_children.close()

try:
    # recv() blocks until the child sends the next object; "end" marks completion
    while True:
        received = p_parent.recv()
        if received == 'end':
            break
        print(received, end=" ")
finally:
    # release the parent's end of the pipe and reap the child in every case
    p_parent.close()
    p.join()

0 1 4 9 16 25 36 49 64 81 

## Lock

In [9]:
# Bank account example - no lock
# the final balance is non-deterministic because both processes read and write the shared variable concurrently without synchronization
def deposit(balance):
    """Add 1 to ``balance.value`` 1000 times, sleeping 1 ms before each step.

    The read and the write-back are two separate, unguarded operations, so
    another process updating the same balance in between can have its update
    overwritten.
    """
    for _ in range(1000):
        time.sleep(0.001)
        current = balance.value
        balance.value = current + 1

def withdraw(balance):
    """Subtract 1 from ``balance.value`` 1000 times, sleeping 1 ms before each step.

    The read and the write-back are two separate, unguarded operations, so
    another process updating the same balance in between can have its update
    overwritten.
    """
    for _ in range(1000):
        time.sleep(0.001)
        current = balance.value
        balance.value = current - 1


# shared integer balance, visible to both child processes
my_balance = Value('i', 0)
print("starting balance: {}".format(my_balance.value))

d = Process(target=deposit, args=(my_balance,))
w = Process(target=withdraw, args=(my_balance,))

# start both children so they run at the same time, then wait for both
for proc in (d, w):
    proc.start()
for proc in (d, w):
    proc.join()

end_balance = my_balance.value
print("end balance: {}".format(end_balance))

starting balance: 0
end balance: 8


In [10]:
# Bank account example - with lock
from multiprocessing import Lock

def deposit(balance, lock):
    """Add 1 to ``balance.value`` 1000 times, holding ``lock`` for each update.

    Args:
        balance: object whose ``value`` attribute is the shared balance.
        lock: lock guarding ``balance``; must support the ``with`` statement.

    The sleep happens outside the critical section so the lock is held only
    for the read-modify-write itself.
    """
    for _ in range(1000):
        time.sleep(0.001)
        # `with` releases the lock even if the update raises, so a failure here
        # cannot leave the other process blocked on acquire() forever
        with lock:
            balance.value = balance.value + 1

def withdraw(balance, lock):
    """Subtract 1 from ``balance.value`` 1000 times, holding ``lock`` for each update.

    Args:
        balance: object whose ``value`` attribute is the shared balance.
        lock: lock guarding ``balance``; must support the ``with`` statement.

    The sleep happens outside the critical section so the lock is held only
    for the read-modify-write itself.
    """
    for _ in range(1000):
        time.sleep(0.001)
        # `with` releases the lock even if the update raises, so a failure here
        # cannot leave the other process blocked on acquire() forever
        with lock:
            balance.value = balance.value - 1

# shared integer balance, visible to both child processes
my_balance = Value('i', 0)
print("starting balance: {}".format(my_balance.value))

# a single lock handed to both children so their updates cannot interleave
lock = Lock()
d = Process(target=deposit, args=(my_balance, lock))
w = Process(target=withdraw, args=(my_balance, lock))

# start both children so they run at the same time, then wait for both
for proc in (d, w):
    proc.start()
for proc in (d, w):
    proc.join()

end_balance = my_balance.value
print("end balance: {}".format(end_balance))

starting balance: 0
end balance: 0
