MultiProcessing and Multi-threading. 

MultiProcessing and Multi-threading

Multi-threading
* Multiple threads live inside the same process and share its address space. Each thread
  performs a specific task and has its own stack and instruction pointer, while all threads
  share the process's code and heap memory. Because memory is shared, a thread that leaks or
  corrupts memory can affect the other threads and the whole process.
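* Because threads share the same heap, multiple threads can write to the same memory location
  at the same time. CPython's global interpreter lock (GIL) is a mutex that allows only one
  thread to execute Python bytecode at a time, which protects the interpreter's own internal
  state. It does not make compound operations such as `total += 1` atomic, so threads that
  share data still need their own locks.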

MultiProcessing
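* In multiprocessing, each process has its own separate address space, so memory is not shared
  implicitly. Creating processes (and moving data between them) is therefore more costly than
  creating threads, but each process has its own interpreter and GIL, so CPU-bound work can run
  truly in parallel.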

Multi-threading

Demo 1 - Sequential Processing  

In [None]:
import time
import threading

def calc_square(numbers):
    """Print the square of every value in ``numbers``, pausing 0.2 s before each one.

    The sleep stands in for a slow task so the sequential run has a measurable duration.
    """
    print("calculate square numbers")
    for value in numbers:
        time.sleep(0.2)
        squared = value * value
        print('square:', squared)


def calc_cube(numbers):
    """Print the cube of every value in ``numbers``, pausing 0.2 s before each one."""
    print("calculate cube of numbers")
    for value in numbers:
        time.sleep(0.2)
        cubed = value * value * value
        print('cube:', cubed)

arr = [2, 3, 8, 9]

# Sequential baseline: start the clock, then run both workers one after the other.
t = time.time()
for worker in (calc_square, calc_cube):
    worker(arr)

In [None]:
# Measured in a separate cell, so the figure also includes the gap between cells.
elapsed = time.time() - t
print("done in : ", elapsed)

Demo 2 - Initializing a Thread 

In [None]:
import threading
import time

def calc_square(numbers):
    """Print each square from ``numbers``, sleeping 0.3 s before every line of output."""
    print("calculate square numbers")
    for item in numbers:
        time.sleep(0.3)
        print('square:', item * item)

arr = [2, 3, 8, 9]

# start() returns immediately: calc_square keeps running on t1 while the
# main thread moves on to the next cell.
t1 = threading.Thread(
    target=calc_square,
    args=(arr,),
)
t1.start()

In [None]:
# Runs on the main thread while t1 (started in the previous cell) may still be
# sleeping and printing, so the two outputs can interleave.
print("Main Thread - Print this line while 'calc_square' is being executed ...")

Demo 3 - Using the join() Method 

In [None]:
import threading
import time

def calc_square(numbers):
    """Print the square of each number, with a 0.3 s pause before each result."""
    print("calculate square numbers")
    for number in numbers:
        time.sleep(0.3)
        square = number * number
        print('square:', square)

arr = [2, 3, 8, 9]

t1 = threading.Thread(target=calc_square, args=(arr,))
t1.start()
# join() blocks the main thread until t1 has finished running calc_square.
t1.join()

In [None]:
# Runs only after t1.join() in the previous cell has returned, i.e. once
# calc_square has finished.
print("Main Thread - Print this line after 'calc_square' was executed ...")

Demo 4 -  Multithreading

In [None]:
import time
import threading

def calc_square(numbers):
    """Print ``square: <n*n>`` for each entry of ``numbers`` (0.2 s pause before each)."""
    print("calculate square numbers")
    for entry in numbers:
        time.sleep(0.2)
        result = entry * entry
        print('square:', result)


def calc_cube(numbers):
    """Print ``cube: <n*n*n>`` for each entry of ``numbers`` (0.2 s pause before each)."""
    print("calculate cube of numbers")
    for entry in numbers:
        time.sleep(0.2)
        print('cube:', entry * entry * entry)

arr = [2, 3, 8, 9]

t = time.time()

# Both workers sleep most of the time, so running them on two threads lets
# their waits overlap.
t1 = threading.Thread(target=calc_square, args=(arr,))
t2 = threading.Thread(target=calc_cube, args=(arr,))
workers = (t1, t2)

for worker in workers:
    worker.start()
for worker in workers:
    worker.join()

In [None]:
# Wall-clock time since `t` was set in the previous cell.
elapsed = time.time() - t
print("done in : ", elapsed)

Demo 5 -  Daemon Threads, Part 1
Presenting the problem 

In [None]:
import threading
import time

total = 4

def creates_items_1():
    """Add 1 to the module-level ``total`` ten times, one step per second.

    Prints the running total after each step and a "Done" banner at the end.
    ``total += 1`` is a read-modify-write on a shared global with no lock, so
    it can interleave with other threads that modify ``total``.

    Not called directly in this cell: a synchronous call would block for about
    10 s and leave ``total`` at 14 before the threaded demo starts. It runs on
    the ``creator1`` thread instead.
    """
    global total
    for i in range(10):
        time.sleep(1)
        total += 1
        print('creates_items_1 : added item, total is now {}'.format(total))
    print('---------- creates_items_1 : Done ----------')

In [None]:
def creates_items_2():
    """Add 1 to the module-level ``total`` seven times, one step per second.

    Prints the running total after each step and a "Done" banner at the end.
    The increment is not protected by a lock.

    Not called directly in this cell: a synchronous call would block for about
    7 s and raise ``total`` before the threaded demo starts. It runs on the
    ``creator2`` thread instead.
    """
    global total
    for i in range(7):
        time.sleep(1)
        total += 1
        print('creates_items_2 : added item, total is now {}'.format(total))
    print('---------- creates_items_2 : Done ----------')

In [None]:
def limits_items():
    """Push ``total`` back down whenever it exceeds 5; never returns.

    While ``total`` is above 5 it subtracts 3 per pass without sleeping;
    otherwise it sleeps one second and reports that it is waiting.
    ``total`` is read and modified without a lock.
    """
    global total
    while True:
        if total <= 5:
            time.sleep(1)
            print('limits_items : waiting')
            continue
        print('limits_items : overload')
        total -= 3
        print('limits_items : subtracted 3')

creator1 = threading.Thread(target=creates_items_1)
creator2 = threading.Thread(target=creates_items_2)
limitor = threading.Thread(target=limits_items)

for thread in (creator1, creator2, limitor):
    thread.start()

# limits_items() loops forever, so the final join() on limitor never
# returns. This hang is the problem this demo presents.
for thread in (creator1, creator2, limitor):
    thread.join()

We'll never get to the end of the program: `limits_items` loops forever, so `limitor.join()` never returns.

In [None]:
# Never reached: the previous cell blocks forever in limitor.join(), because
# limits_items() loops without end.
print('---------- End of Program ----------')

Demo 6 -  Daemon Threads, Part 2
Removing limitor.join()

In [None]:
import threading
import time

total = 4

def creates_items_1():
    """Add 1 to the module-level ``total`` ten times, one step per second.

    Prints the running total after each step and a "Done" banner at the end.
    The increment is not protected by a lock.

    Not called directly in this cell: a synchronous call would block for about
    10 s and leave ``total`` at 14 before the threaded demo starts. It runs on
    the ``creator1`` thread instead.
    """
    global total
    for i in range(10):
        time.sleep(1)
        total += 1
        print('creates_items_1 : added item, total is now {}'.format(total))
    print('---------- creates_items_1 : Done ----------')

In [None]:
def creates_items_2():
    """Add 1 to the module-level ``total`` seven times, one step per second.

    Prints the running total after each step and a "Done" banner at the end.
    The increment is not protected by a lock.

    Not called directly in this cell: a synchronous call would block for about
    7 s and raise ``total`` before the threaded demo starts. It runs on the
    ``creator2`` thread instead.
    """
    global total
    for i in range(7):
        time.sleep(1)
        total += 1
        print('creates_items_2 : added item, total is now {}'.format(total))
    print('---------- creates_items_2 : Done ----------')

In [None]:
def limits_items():
    """Run forever, subtracting 3 from ``total`` whenever it is above 5.

    When ``total`` is 5 or less it sleeps one second and prints a waiting
    message; above 5 it subtracts without sleeping. ``total`` is shared with
    other threads and is modified here without a lock.
    """
    global total
    while True:
        if total <= 5:
            time.sleep(1)
            print('limits_items : waiting')
            continue
        print('limits_items : overload')
        total -= 3
        print('limits_items : subtracted 3')

creator1 = threading.Thread(target=creates_items_1)
creator2 = threading.Thread(target=creates_items_2)
limitor = threading.Thread(target=limits_items)

creator1.start()
creator2.start()
limitor.start()

# Wait only for the two finite producer threads. limits_items() never
# returns, so limitor.join() would block forever; it is deliberately omitted.
# limitor is not a daemon, so it keeps running in the background (and would
# keep a plain script from exiting).
creator1.join()
creator2.join()

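Now the next line will be printed, but the non-daemon `limitor` thread keeps running endlessly in the background (in a plain Python script it would also prevent the program from exiting).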

In [None]:
# Printed only once the previous cell has returned.
print('---------- End of Program ----------')

Demo 7 -  Daemon Threads, Part 3
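Adding daemon=True (a daemon thread does not keep the program alive once all non-daemon threads have finished; note that join() on a thread that never returns still blocks forever, daemon or not)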

In [None]:
import threading
import time

total = 4


def creates_items_1():
    """Add 1 to the module-level ``total`` ten times, one step per second.

    Prints the running total after each step and a "Done" banner at the end.
    The increment is not protected by a lock.

    Not called directly in this cell: a synchronous call would block for about
    10 s and leave ``total`` at 14 before the threaded demo starts. It runs on
    the ``creator1`` thread instead.
    """
    global total
    for i in range(10):
        time.sleep(1)
        total += 1
        print('creates_items_1 : added item, total is now {}'.format(total))
    print('---------- creates_items_1 : Done ----------')

In [None]:
def creates_items_2():
    """Add 1 to the module-level ``total`` seven times, one step per second.

    Prints the running total after each step and a "Done" banner at the end.
    The increment is not protected by a lock.

    Not called directly in this cell: a synchronous call would block for about
    7 s and raise ``total`` before the threaded demo starts. It runs on the
    ``creator2`` thread instead.
    """
    global total
    for i in range(7):
        time.sleep(1)
        total += 1
        print('creates_items_2 : added item, total is now {}'.format(total))
    print('---------- creates_items_2 : Done ----------')

In [None]:
def limits_items():
    """Endless loop that trims ``total`` by 3 each pass while it is above 5.

    Otherwise it waits one second and prints a waiting message. Intended to
    run on a background thread; ``total`` is modified without a lock.
    """
    global total
    while True:
        if total <= 5:
            time.sleep(1)
            print('limits_items : waiting')
            continue
        print('limits_items : overload')
        total -= 3
        print('limits_items : subtracted 3')


creator1 = threading.Thread(target=creates_items_1)
creator2 = threading.Thread(target=creates_items_2)
# daemon=True: this thread will not keep the interpreter alive once the
# non-daemon threads have finished. join() still waits for it, though, and
# limits_items() never returns, so limitor is intentionally NOT joined.
limitor = threading.Thread(target=limits_items, daemon=True)

creator1.start()
creator2.start()
limitor.start()

# Wait only for the two finite producer threads.
creator1.join()
creator2.join()

In [None]:
# Printed only once the previous cell has returned.
print('---------- End of Program ----------')

MultiProcessing 

Demo 1 - Simple MultiProcessing 

In [None]:
import time
import multiprocessing


def calc_square(numbers):
    """Print ``square <n*n>`` for every value in ``numbers``, sleeping 1 s before each line."""
    for value in numbers:
        time.sleep(1)
        squared = value * value
        print('square ' + str(squared))


def calc_cube(numbers):
    """Print ``cube <n*n*n>`` for every value in ``numbers``, sleeping 1 s before each line."""
    for value in numbers:
        time.sleep(1)
        cubed = value * value * value
        print('cube ' + str(cubed))


if __name__ == "__main__":
    arr = [2, 3, 8, 16, 32, 41, 53]
    # One child process per worker function; both receive the same list.
    p1, p2 = (
        multiprocessing.Process(target=job, args=(arr,))
        for job in (calc_square, calc_cube)
    )

    for proc in (p1, p2):
        proc.start()
    for proc in (p1, p2):
        proc.join()

    print("Done!")

Demo 2 - Store the result in a global variable (the child process appends to its own copy of the list, so the parent's list stays empty)

In [None]:
import time
import multiprocessing


square_result = []


def calc_square(numbers):
    """Print and record the square of each value in ``numbers`` (1 s pause per value).

    When run as the target of a ``multiprocessing.Process`` (as in this demo),
    the appends go to the child process's own copy of ``square_result``; the
    parent's list is not modified. The final print therefore reports the
    list as seen *inside* the child process.
    """
    # No ``global`` statement needed: append() mutates the list, it does not rebind the name.
    for n in numbers:
        time.sleep(1)
        print('square ' + str(n * n))
        square_result.append(n * n)
    print("Inside the process - square_result : {}".format(square_result))


if __name__ == "__main__":
    arr = [2, 3, 8, 16]
    p1 = multiprocessing.Process(target=calc_square, args=(arr,))

    p1.start()
    p1.join()  # wait for the child so its output comes first

    print("Done!")
    # Still the parent's (empty) list: the child appended to its own copy.
    print("Outside the process - square_result : {}".format(square_result))

Demo 3 - Share Data 
Option 1 - Shared Array

In [None]:
import multiprocessing


def calc_square(numbers, result):
    """Write ``numbers[i] * numbers[i]`` into ``result[i]`` for every index ``i``.

    ``result`` must support item assignment (a ``multiprocessing.Array`` in
    this demo) and be at least as long as ``numbers``.
    """
    squares = (value * value for value in numbers)
    for position, square in enumerate(squares):
        result[position] = square


if __name__ == "__main__":
    numbers = [2, 3, 5]
    # Size the shared buffer from the input so the two can never disagree.
    # Typecode 'i' is a C signed int; 'd' would be a C double.
    result = multiprocessing.Array('i', len(numbers))
    p = multiprocessing.Process(target=calc_square, args=(numbers, result))

    p.start()
    p.join()

    print(list(result))

Demo 4 - Share Data 
Option 2 - Shared Value

In [None]:
import multiprocessing


def calc_square(numbers, shared_array, shared_value):
    """Set ``shared_value.value`` to 3.54, then store each square in ``shared_array``.

    3.54 is an arbitrary marker; the parent prints it to show that the
    child's write to the shared value is visible.
    """
    shared_value.value = 3.54
    for position, value in enumerate(numbers):
        squared = value * value
        shared_array[position] = squared


if __name__ == "__main__":
    numbers = [2, 3, 5]
    # Size the shared buffer from the input so the two can never disagree.
    # Typecode 'i' is a C signed int; 'd' is a C double.
    shared_array = multiprocessing.Array('i', len(numbers))
    shared_value = multiprocessing.Value('d', 0.0)
    p = multiprocessing.Process(target=calc_square, args=(numbers, shared_array, shared_value))

    p.start()
    p.join()

    print(list(shared_array))
    print(shared_value.value)

Demo 5 - Share Data 
Option 3 - Shared Queue

In [None]:
import multiprocessing


def calc_square(numbers, q):
    """Put the square of each value in ``numbers`` onto queue ``q``, in input order."""
    for square in (value * value for value in numbers):
        q.put(square)


if __name__ == "__main__":
    numbers = [2, 3, 5]
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=calc_square, args=(numbers, q))

    p.start()
    # Drain the queue BEFORE join(): a child that has put items on a queue
    # may not exit until they are consumed, so joining first can deadlock.
    # calc_square puts exactly one item per number, so read that many
    # (Queue.empty() is documented as unreliable for this purpose).
    results = [q.get() for _ in numbers]
    p.join()

    for square in results:
        print(square)

Demo 6 - Locks
Introducing the problem 

In [None]:
import time
import multiprocessing

def deposit(balance, lock):
    """Add 1 to ``balance.value`` 100 times, sleeping 10 ms before each step.

    ``lock`` is accepted but deliberately unused: the unprotected
    read-then-write below is the race this demo introduces.
    """
    for _ in range(100):
        time.sleep(0.01)
        current = balance.value
        balance.value = current + 1


def withdraw(balance, lock):
    """Subtract 1 from ``balance.value`` 100 times, sleeping 10 ms before each step.

    ``lock`` is deliberately unused so the race with ``deposit`` can show up.
    """
    for _ in range(100):
        time.sleep(0.01)
        current = balance.value
        balance.value = current - 1

if __name__ == '__main__':
    balance = multiprocessing.Value('i', 200)
    lock = multiprocessing.Lock()  # passed to both workers but never acquired in this demo
    d, w = (
        multiprocessing.Process(target=job, args=(balance, lock))
        for job in (deposit, withdraw)
    )
    for proc in (d, w):
        proc.start()
    for proc in (d, w):
        proc.join()
    # 100 deposits and 100 withdrawals should leave 200; lost updates can make it differ.
    print(balance.value)

Demo 7 - Locks
lock.acquire() & lock.release()

In [None]:
import time
import multiprocessing

def deposit(balance, lock):
    """Add 1 to ``balance.value`` 100 times, holding ``lock`` around each update.

    The explicit acquire()/release() pair is wrapped in try/finally so the
    lock is released even if the update raises; otherwise any other process
    waiting in acquire() would block forever.
    """
    for _ in range(100):
        time.sleep(0.01)
        lock.acquire()
        try:
            balance.value = balance.value + 1
        finally:
            lock.release()

def withdraw(balance, lock):
    """Subtract 1 from ``balance.value`` 100 times, holding ``lock`` around each update.

    acquire()/release() are paired with try/finally so an exception during
    the update cannot leave the lock held.
    """
    for _ in range(100):
        time.sleep(0.01)
        lock.acquire()
        try:
            balance.value = balance.value - 1
        finally:
            lock.release()

if __name__ == '__main__':
    balance = multiprocessing.Value('i', 200)
    lock = multiprocessing.Lock()
    shared = (balance, lock)
    d = multiprocessing.Process(target=deposit, args=shared)
    w = multiprocessing.Process(target=withdraw, args=shared)
    for proc in (d, w):
        proc.start()
    for proc in (d, w):
        proc.join()
    # With every update done under the lock, the balance ends where it started.
    print(balance.value)

Demo 8 - Locks
with lock 

In [None]:
import time
import multiprocessing

def deposit(balance, lock):
    """Add 1 to ``balance.value`` 100 times; each update runs inside ``with lock``.

    The context manager releases the lock even if the update raises.
    """
    for _ in range(100):
        time.sleep(0.01)
        with lock:
            balance.value += 1

def withdraw(balance, lock):
    """Subtract 1 from ``balance.value`` 100 times; each update runs inside ``with lock``."""
    for _ in range(100):
        time.sleep(0.01)
        with lock:
            balance.value -= 1

if __name__ == '__main__':
    balance = multiprocessing.Value('i', 200)
    lock = multiprocessing.Lock()
    workers = [
        multiprocessing.Process(target=job, args=(balance, lock))
        for job in (deposit, withdraw)
    ]
    d, w = workers
    for proc in workers:
        proc.start()
    for proc in workers:
        proc.join()
    print(balance.value)

Demo 9 - Parallel Processing (MultiProcessing Pool)
Utilizing a single CPU

In [None]:
my_arr = [1, 2, 3, 4, 5]


def f(n):
    """Return ``n`` squared."""
    return n * n


# Sequential baseline: every call runs one after another in this single process.
output = [f(number) for number in my_arr]

In [None]:
# Expected: [1, 4, 9, 16, 25], the squares of my_arr computed sequentially.
print(output)

Demo 10 - Parallel Processing (MultiProcessing Pool)
Parallel Processing

In [None]:
from multiprocessing import Pool
import time

def f(n):
    """Return ``n`` squared; this is the work each pool worker performs."""
    # Add ``time.sleep(1)`` here to slow the workers down enough to watch
    # them in Task Manager.
    squared = n * n
    return squared

if __name__ == "__main__":
    inputs = [1, 2, 3, 4, 5]
    p = Pool(processes=3)
    # map() blocks until every input is processed and returns results in input order.
    result = p.map(f, inputs)
    for value in result:
        print(value)
    # No more tasks: close() the pool, then join() to wait for the workers to exit.
    p.close()
    p.join()
    print("End of program")

Demo 11 - Parallel Processing (MultiProcessing Pool)
Performance Test

In [None]:
from multiprocessing import Pool
import time

def f(n):
    """Return ``n * n``; the per-item work is deliberately tiny."""
    squared = n * n
    return squared

if __name__ == "__main__":
    # Build the 10M-element input inside the main guard: with the "spawn"
    # start method every worker re-imports this module, and a module-level
    # list would be rebuilt (time and memory) in each of them.
    arr = list(range(10000000))

    p = Pool(processes=8)
    t = time.time()
    result = p.map(f, arr)
    print("Time to complete : {}".format(time.time() - t))
    p.close()
    p.join()
    print("End of program")