# Thread Creation - By extending Thread Class

In [3]:
import threading 
# Inherit from the Thread class and override
# its __init__() and run() methods:
class mythread(threading.Thread):
    """Thread subclass that prints a fixed message five times when run."""

    def __init__(self, msg):
        """Initialise the base Thread, then remember ``msg`` for ``run``."""
        threading.Thread.__init__(self)
        self.msg = msg

    def run(self):
        """Print the stored message on five consecutive lines."""
        remaining = 5
        while remaining > 0:
            print(self.msg)
            remaining -= 1
# Build the two worker threads; each one carries its own message.
t1, t2 = mythread("Thread1"), mythread("Thread2")
# start() launches run() in a new thread and returns immediately, so the
# main program keeps executing while the workers print.
for worker in (t1, t2):
    worker.start()
# join() blocks the main program until the given thread has finished:
# first t1 is awaited, then t2.
for worker in (t1, t2):
    worker.join()
# Only reached once both workers are done.
print("Done!")

Thread1
Thread2
Thread2
Thread2
Thread2
Thread2
Thread1
Thread1
Thread1
Thread1
Done!


# Thread Creation - Without extending Thread Class

In [33]:
import threading 
def display(msg):
    """Print ``msg`` five times, one print call per line."""
    for line in [msg] * 5:
        print(line)
# Build one Thread per label; display() is the target and the label is its
# single positional argument.
t1, t2 = (
    threading.Thread(target=display, args=(label,))
    for label in ("Thread1", "Thread2")
)
# start() returns immediately, so the main program runs alongside the workers.
workers = [t1, t2]
for worker in workers:
    worker.start()
# join() makes the main program wait for each worker in turn (t1, then t2).
for worker in workers:
    worker.join()
# Executed only after both workers have finished.
print("Done!")

Thread1
Thread1Thread2
Thread2
Thread2
Thread1
Thread1
Thread1

Thread2
Thread2
Done!


# Shared Data Access

In [43]:
import threading 
# Module-level counter that the worker functions update without any locking.
x = 0
def foo():
    """Increment the global ``x`` by one, 100000 times."""
    global x
    for _ in range(100000):
        x = x + 1
        
def bar():
    """Decrement the global ``x`` by one, 100000 times."""
    global x
    for _ in range(100000):
        x = x - 1

# One thread increments x while the other decrements it; neither uses a lock.
t1 = threading.Thread(target=foo)
t2 = threading.Thread(target=bar)
t1.start()
t2.start()
t1.join()
t2.join()
# 0 would be the result if every update were applied; the recorded output
# below shows a different value.
print(x)

29923


# Performance Test - Threads

In [3]:
import datetime
from threading import * 
def countdown(n):
    """Busy-loop that decrements ``n`` by one until it is no longer positive."""
    while n > 0:
        n -= 1
count = 10000000
# Sequential execution: a single countdown over the whole range.
start = datetime.datetime.now()
countdown(count)
end = datetime.datetime.now()
print(end-start)
# Threaded execution: split the same amount of work across two threads.
# Integer division keeps the arguments as ints (count/2 would be a float in
# Python 3); the second half takes the remainder so odd counts still add up.
half = count // 2
start = datetime.datetime.now()
t1 = Thread(target=countdown, args=(half,))
t2 = Thread(target=countdown, args=(count - half,))
t1.start(); t2.start()
t1.join(); t2.join()
end = datetime.datetime.now()
print(end-start)

0:00:00.652738
0:00:01.104947


# MultiProcessing Creation - By extending Process Class

In [13]:
import multiprocessing
# Inherit from the Process class and override
# its __init__() and run() methods:
class countDown(multiprocessing.Process):
    """Process subclass that prints a countdown from ``count`` down to 1."""

    def __init__(self, count):
        """Initialise the base Process, then store the starting ``count``."""
        multiprocessing.Process.__init__(self)
        self.count = count

    def run(self):
        """Print ``self.count`` and decrement it until it is no longer positive."""
        while True:
            if not self.count > 0:
                break
            print(self.count)
            self.count = self.count - 1
# A new process is created by instantiating the Process subclass.
# Everything that drives the demo lives under the __main__ guard, so a child
# that re-imports this module does not execute any of it.
if __name__=='__main__':
    p1 = countDown(10)
    p2 = countDown(5)
    p1.start(); p2.start()
    p1.join(); p2.join()
    # Printed by the parent only, after both children have finished.
    print("Done!")

10
9
8
7
6
5
4
5
3
2
4
1
3
2
1
Done!


# MultiProcessing Creation - Without extending Process Class

In [15]:
from multiprocessing import *
def countDown(count):
    """Print ``count``, then each value one lower, while the value is positive."""
    while True:
        if not count > 0:
            break
        print(count)
        count = count - 1
# A new process is created by instantiating Process with a target function
# and the tuple of arguments to call it with.
if __name__ == '__main__':
    p1, p2 = (Process(target=countDown, args=(n,)) for n in (10, 5))
    for proc in (p1, p2):
        proc.start()
    for proc in (p1, p2):
        proc.join()
    # Both children have finished by this point.
    print("Done!")

10
9
8
7
5
6
4
5
3
4
2
3
1
2
1
Done!


# Performance Test - Processes

In [2]:
import datetime
from multiprocessing import * 
def countdown(n):
    """Spin until ``n`` is no longer positive, subtracting one per iteration."""
    while True:
        if not n > 0:
            return
        n = n - 1
count = 10000000
# Sequential execution: a single countdown over the whole range.
start = datetime.datetime.now()
countdown(count)
end = datetime.datetime.now()
print(end-start)
# Multi-process execution: split the same amount of work across two processes.
# Integer division keeps the arguments as ints (count/2 would be a float in
# Python 3); the second half takes the remainder so odd counts still add up.
half = count // 2
start = datetime.datetime.now()
t1 = Process(target=countdown, args=(half,))
t2 = Process(target=countdown, args=(count - half,))
t1.start(); t2.start()
t1.join(); t2.join()
end = datetime.datetime.now()
print(end-start)

0:00:00.656950
0:00:00.472159


# Process Communication - Pipes

In [37]:
from multiprocessing import *
def func(conn):
    """Send a sample payload over ``conn`` and close the connection.

    The connection is closed even when ``send`` raises, so this end is never
    left open after a failure; the exception itself still propagates.
    """
    try:
        conn.send([42, None, 'hello'])
    finally:
        conn.close()

if __name__ == '__main__':
    # Pipe() yields the two ends of the channel; the child end is handed to
    # the worker process and the parent end is read here.
    parent_conn, child_conn = Pipe()
    p = Process(target=func, args=(child_conn,))
    p.start()
    received = parent_conn.recv()
    print(received)
    p.join()

[42, None, 'hello']


In [3]:
from multiprocessing import *
def process1_send_function(conn, events):
    """Send every item of ``events`` over ``conn``, echoing each one to stdout."""
    pending = iter(events)
    for item in pending:
        conn.send(item)
        print("Sent:", item)
def process2_recv_function(conn):
    """Print each event received from ``conn`` until the "eod" marker arrives."""
    # iter(callable, sentinel) keeps calling conn.recv() and stops as soon as
    # the received value equals "eod".
    for received in iter(conn.recv, "eod"):
        print(f"Event Received: {received}")
    print("Event Received: End of Day")
if __name__ == "__main__":
    # "eod" is the marker the receiver checks for to stop reading.
    events = ["get up", "brush your teeth", "shower", "work", "eod"]
    conn1, conn2 = Pipe()
    # One process writes to conn1 while the other reads from conn2.
    process_1 = Process(target=process1_send_function, args=(conn1, events))
    process_2 = Process(target=process2_recv_function, args=(conn2,))
    process_1.start()
    process_2.start()
    process_1.join()
    process_2.join()


Sent: get up
Sent: brush your teeth
Sent: shower
Sent: work
Event Received: get up
Sent: eod
Event Received: brush your teeth
Event Received: shower
Event Received: work
Event Received: End of Day


# Process Communication - Queues

In [49]:
import multiprocessing
def process1_send_function(queue, events):
    for event in events:
        queue.put(event)
        print(f"Event Sent: {event}")
def process2_recv_function(queue):
    while True:
        event = queue.get()
        if event == "eod":
            print("Event Received: End of Day")
            return
        print(f"Event Received: {event}")
if __name__ == "__main__":
    # "eod" is the marker the receiver checks for to stop reading.
    events = ["get up", "brush your teeth", "shower", "work", "eod"]
    # A single queue is shared: one process puts events, the other gets them.
    queue = multiprocessing.Queue()
    process_1 = multiprocessing.Process(
        target=process1_send_function, args=(queue, events)
    )
    process_2 = multiprocessing.Process(
        target=process2_recv_function, args=(queue,)
    )
    process_1.start()
    process_2.start()
    process_1.join()
    process_2.join()


Event Sent: get up
Event Sent: brush your teeth
Event Sent: shower
Event Received: get up
Event Sent: work
Event Received: brush your teeth
Event Sent: eod
Event Received: shower
Event Received: work
Event Received: End of Day


# Process Synchronization - Locks

In [45]:
from multiprocessing import *
# function to withdraw from account 
def withdraw(balance):
    """Subtract 1 from ``balance.value`` 10000 times, without any locking."""
    step = 0
    while step < 10000:
        # Separate read and write of balance.value, exactly one of each per step.
        current = balance.value
        balance.value = current - 1
        step += 1
# function to deposit to account 
def deposit(balance):
    """Add 1 to ``balance.value`` 10000 times, without any locking."""
    step = 0
    while step < 10000:
        # Separate read and write of balance.value, exactly one of each per step.
        current = balance.value
        balance.value = current + 1
        step += 1
if __name__ == '__main__':
    # Initial balance held in shared memory as a C int ('i').
    # Value is referenced through the star import at the top of this cell;
    # the bare name `multiprocessing` is not bound by `from multiprocessing
    # import *`, so `multiprocessing.Value` only worked when an earlier cell
    # had happened to import the module.
    balance = Value('i', 100)
    # One process withdraws while the other deposits, with no lock between them.
    p1 = Process(target=withdraw, args=(balance,))
    p2 = Process(target=deposit, args=(balance,))
    p1.start(); p2.start()
    p1.join(); p2.join()
    print(balance.value)

22


In [44]:
from multiprocessing import *
# function to withdraw from account 
def withdraw(balance, lock):
    """Subtract 1 from ``balance.value`` 10000 times, holding ``lock`` per update.

    ``lock`` is used as a context manager so it is released even if reading or
    writing ``balance.value`` raises; a bare acquire()/release() pair would
    leave the lock held and block the other process forever.
    """
    for _ in range(10000):
        with lock:
            balance.value = balance.value - 1
# function to deposit to account 
def deposit(balance, lock):
    """Add 1 to ``balance.value`` 10000 times, holding ``lock`` per update.

    ``lock`` is used as a context manager so it is released even if reading or
    writing ``balance.value`` raises; a bare acquire()/release() pair would
    leave the lock held and block the other process forever.
    """
    for _ in range(10000):
        with lock:
            balance.value = balance.value + 1
if __name__ == '__main__':
    # Initial balance held in shared memory as a C int ('i').
    # Value and Lock are referenced through the star import at the top of this
    # cell; the bare name `multiprocessing` is not bound by `from
    # multiprocessing import *`, so `multiprocessing.Value` only worked when an
    # earlier cell had happened to import the module.
    balance = Value('i', 100)
    # Lock shared by both processes to serialise their updates.
    lock = Lock()
    # One process withdraws while the other deposits.
    p1 = Process(target=withdraw, args=(balance, lock))
    p2 = Process(target=deposit, args=(balance, lock))
    p1.start(); p2.start()
    p1.join(); p2.join()
    print(balance.value)

100


# Pools

In [16]:
import multiprocessing 
def square(n):
    """Return ``n`` multiplied by itself."""
    product = n * n
    return product
if __name__ == "__main__":
    mylist = [1,2,3,4]
    # The pool is used as a context manager so its worker processes are shut
    # down when the block exits, instead of lingering for the kernel's lifetime.
    with multiprocessing.Pool() as p:
        # map() applies square to every element and returns the results as a
        # list in input order.
        result = p.map(square, mylist)

In [17]:
import multiprocessing 
import os 
def square(n):
    """Run 100 throwaway multiplications, then return ``2 * n``.

    NOTE(review): despite the name, the value returned is twice ``n``, not its
    square; the loop result is never used and only adds work per call.
    """
    scratch = n
    i = 0
    while i < 100:
        scratch = n * i
        i += 1
    return 2 * n
if __name__ == "__main__":
    # Imported here because this cell's own import lines do not bring in
    # datetime; without it the cell only runs if an earlier cell did.
    import datetime

    print("Number of Cores :",multiprocessing.cpu_count())
    mylist = list(range(1000000))
    # Sequential baseline: call square on every value in this process.
    start = datetime.datetime.now()
    for value in mylist:
        square(value)
    end = datetime.datetime.now()
    print(end-start)
    # Parallel run: the timing includes creating the pool, as before, but stops
    # before the pool is torn down.
    start = datetime.datetime.now()
    # Context manager shuts the worker processes down when the block exits.
    with multiprocessing.Pool() as p:
        # map() distributes mylist across the workers and collects the results.
        result = p.map(square, mylist)
        end = datetime.datetime.now()
    print(end-start)

Number of Cores : 72
0:00:04.091704
0:00:01.916500


# Assignment

In [4]:
import numpy as np
from datetime import datetime
import multiprocessing

def image_augmentation(inputlist, iterations=100000):
    """Produce flipped and randomly rotated versions of a 2-D array.

    The same three transforms are recomputed ``iterations`` times and only the
    results of the last pass are returned.

    Args:
        inputlist: array accepted by ``np.fliplr``, ``np.flipud`` and
            ``np.rot90`` (i.e. at least 2-D).
        iterations: number of passes to run; defaults to the previously
            hard-coded 100000 so single-argument calls such as
            ``pool.map(image_augmentation, parts)`` are unchanged.

    Returns:
        ``[left-right flip, up-down flip, rotation by r * 90 degrees]`` where
        ``r`` is drawn from ``np.random.randint(0, 4)`` on the final pass.

    Raises:
        ValueError: if ``iterations`` is less than 1, because no result would
            be computed.
    """
    if iterations < 1:
        raise ValueError("iterations must be at least 1")

    updated_list = inputlist
    for _ in range(iterations):
        flipped_image = np.fliplr(updated_list)
        flipped_image1 = np.flipud(updated_list)
        # Number of quarter turns for this pass: 0, 1, 2 or 3.
        r = np.random.randint(0, 4)
        rotated_image = np.rot90(updated_list, r)

    return [flipped_image, flipped_image1, rotated_image]
    
if __name__ == "__main__":

    # Layout of the raw file: height x width pixels of 2 bytes each, preceded
    # by header_size bytes of header.
    height = 16000
    width = 16080
    bytes_per_pixel = 2
    dtype = np.uint16
    header_size = 0

    inp_dir='/home/c3test/dpgs/rad/'
    filename = '16711789331-s1-b3-C03-X-rad-before-nuc.rad'

    raw_filename = inp_dir + filename
    # np.fromfile's `count` is a number of items of `dtype`, not a number of
    # bytes, so the header is converted to items and the pixel count is used
    # as-is (the previous byte count asked for twice as many items as exist).
    header_items = header_size // bytes_per_pixel
    count = header_items + height * width
    b1 = np.fromfile(raw_filename, dtype=dtype, count=count)
    # Drop the header items and keep exactly one frame of pixels.
    b1 = b1[header_items:header_items + height * width]
    b1 = b1.reshape(height, width)
    raw_image = np.float32(b1)

    # Cut the frame into 24 column strips, each split into 20 row tiles.
    image_parts = []
    for dev in np.hsplit(raw_image, 24):
        image_parts.extend(np.vsplit(dev, 20))

    print(len(image_parts))
    start = datetime.now()
    # Context manager shuts the workers down even if map() raises.
    with multiprocessing.Pool(processes=32) as pool:
        outputs = pool.map(image_augmentation, image_parts)
        # Stop the clock before pool teardown, matching the original timing.
        end = datetime.now()
    print('===========Time Elapsed==============', end - start)
    print(len(outputs))

FileNotFoundError: [Errno 2] No such file or directory: '/home/c3test/dpgs/rad/16711789331-s1-b3-C03-X-rad-before-nuc.rad'