# Threading

## Fibonacci Example

In [1]:
#coding: utf-8
import logging, threading

from queue import Queue

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(message)s')

# Re-running this cell must not stack another console handler on the root
# logger: every additional handler prints each message one more time.
# Reuse a plain StreamHandler that is already attached, otherwise create one.
ch = next((h for h in logger.handlers if type(h) is logging.StreamHandler), None)
if ch is None:
    ch = logging.StreamHandler()
    logger.addHandler(ch)
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)

# fibo_dict: maps each input integer (the key) to the Fibonacci number computed for it.
fibo_dict = {}
# shared_queue: the data shared between the thread that inserts elements and
# the threads that compute Fibonacci numbers from them.
shared_queue = Queue()
input_list = [3, 10, 5, 7]
# queue_condition: synchronizes access to shared_queue according to a specific
# condition (workers wait on it until the queue has been populated).
queue_condition = threading.Condition()


def fibonacci_task(condition):
    """
    Take one integer from `shared_queue` and store its Fibonacci number in `fibo_dict`.

    The worker waits on `condition` until `shared_queue` is non-empty, removes a
    single element, computes the Fibonacci number for it iteratively and records
    the result as ``fibo_dict[value]``.

    Args:
        condition: the ``threading.Condition`` that controls this worker's
            access to `shared_queue`.
    """
    thread_name = threading.current_thread().name
    with condition:
        # Re-check after every wake-up: another worker may have emptied the queue already.
        while shared_queue.empty():
            logger.info("[%s] - waiting for elements in queue...", thread_name)
            condition.wait()
        value = shared_queue.get()

    # The lock is only needed to take the element; computing outside of it
    # lets the other workers proceed instead of running one after another.
    a, b = 0, 1
    for _ in range(value):
        a, b = b, a + b
    # Store once, after the loop, so that value == 0 is recorded as well
    # (an assignment inside the loop never runs for an empty range).
    fibo_dict[value] = a
    shared_queue.task_done()
    logger.debug("[%s] fibonacci of key [%d] with result [%d]",
                 thread_name, value, fibo_dict[value])

def queue_task(condition):
    """
    Populate `shared_queue` with every element of `input_list`, then wake the workers.

    Executed by the thread responsible for filling `shared_queue` with the
    elements to be processed.

    Args:
        condition: the ``threading.Condition`` the worker threads wait on; it is
            held while the queue is filled and notified once filling is done.
    """
    logging.debug('Starting queue_task...')
    with condition:
        for item in input_list:
            shared_queue.put(item)
        logging.debug("[%s] - Notifying fibonacci_task threads that the queue is ready to consume..",
                      threading.current_thread().name)
        # notify_all() is the supported name; the camelCase notifyAll() alias is deprecated.
        condition.notify_all()

In [2]:
fibo_dict = {}
shared_queue = Queue()
input_list = [3, 10, 5, 7]  # simulates user input

queue_condition = threading.Condition()

# One worker per input element: each fibonacci_task call consumes exactly one
# value, so extra workers would wait forever and block the joins below.
threads = [
    threading.Thread(
        name='fibonacci_task_thread{}'.format(i + 1),
        daemon=True,
        target=fibonacci_task,
        args=(queue_condition,),
    )
    for i in range(len(input_list))
]

for thread in threads:
    thread.start()

prod = threading.Thread(name='queue_task_thread', daemon=True, target=queue_task, args=(queue_condition,))
prod.start()

# Wait for the producer as well as for every worker before reading the results.
prod.join()
for thread in threads:
    thread.join()

logger.info("[%s] - Result: %s" % (threading.current_thread().name, fibo_dict))

2018-08-20 14:16:19,315 - [fibonacci_task_thread1] - waiting for elements in queue...
2018-08-20 14:16:19,317 - [fibonacci_task_thread2] - waiting for elements in queue...
2018-08-20 14:16:19,320 - [fibonacci_task_thread3] - waiting for elements in queue...
2018-08-20 14:16:19,320 - Starting queue_task...
2018-08-20 14:16:19,328 - [fibonacci_task_thread4] - waiting for elements in queue...
2018-08-20 14:16:19,332 - [queue_task_thread] - Notifying fibonacci_task threads that the queue is ready to consume..
2018-08-20 14:16:19,335 - [fibonacci_task_thread1] fibonacci of key [3] with result [2]
2018-08-20 14:16:19,342 - [fibonacci_task_thread2] fibonacci of key [10] with result [55]
2018-08-20 14:16:19,344 - [fibonacci_task_thread3] fibonacci of key [5] with result [5]
2018-08-20 14:16:19,346 - [fibonacci_task_thread4] fibonacci of key [7] with result [13]
2018-08-20 14:16:19,349 - [MainThread] - Result: {3: 2, 10: 55, 5: 5, 7: 13}


# Multiprocessing 

## Using multiprocessing.Pipe
A pipe is a communication channel between two endpoints, typically two processes. `multiprocessing.Pipe()` returns a pair of connection objects, one per endpoint, which the processes use to exchange messages.

In [1]:
import os, random
from multiprocessing import Process, Pipe


def producer_task(conn):
    """
    Send one random integer in [1, 10] through `conn`, then close the connection.

    Args:
        conn: the pipe endpoint to write to; must provide ``send`` and ``close``.
    """
    value = random.randint(1, 10)
    try:
        conn.send(value)
        print('Value [%d] sent by PID [%d]' % (value, os.getpid()))
    finally:
        # Close the endpoint even when send() fails so it is never left open.
        conn.close()

def consumer_task(conn):
    """Receive a single value from `conn` and print it together with this process's PID."""
    received = conn.recv()
    pid = os.getpid()
    message = 'Value [%d] received by PID [%d]' % (received, pid)
    print(message)

if __name__ == '__main__':
    # Pipe() hands back the two connected endpoints of the channel.
    producer_conn, consumer_conn = Pipe()
    consumer = Process(target=consumer_task, args=(consumer_conn,))
    producer = Process(target=producer_task, args=(producer_conn,))

    # The consumer is started, and later joined, ahead of the producer.
    for proc in (consumer, producer):
        proc.start()
    for proc in (consumer, producer):
        proc.join()

Value [5] sent by PID [18073]
Value [5] received by PID [18072]


In [17]:
#coding: utf-8

import sys, logging, time, os, random
from multiprocessing import Process, Queue, Pool, \
    cpu_count, current_process, Manager

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(message)s')

# Re-running this cell must not stack another console handler on the root
# logger: every additional handler prints each message one more time.
# Reuse a plain StreamHandler that is already attached, otherwise create one.
ch = next((h for h in logger.handlers if type(h) is logging.StreamHandler), None)
if ch is None:
    ch = logging.StreamHandler()
    logger.addHandler(ch)
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)

def producer_task(q, fibo_dict):
    """
    Put 15 random integers in [1, 20] on `q`, registering each one in `fibo_dict`.

    Args:
        q: the queue the values are put on.
        fibo_dict: mapping that receives a ``None`` placeholder for every
            generated value (in this notebook, a dictionary created by a
            Manager object).
    """
    for _ in range(15):
        value = random.randint(1, 20)
        # Placeholder entry; the value is filled in by whichever consumer handles it.
        fibo_dict[value] = None
        producer_name = current_process().name
        message = "Producer [%s] putting value [%d] into queue.. " % (producer_name, value)
        logger.info(message)
        q.put(value)

def consumer_task(q, fibo_dict):
    """
    Drain `q`, storing the Fibonacci number of every value taken in `fibo_dict`.

    The function returns once no element arrives within the 0.05 s timeout.

    Args:
        q: queue of integers to process; must support ``get(block, timeout)``
            and raise ``queue.Empty`` on timeout.
        fibo_dict: mapping updated as ``fibo_dict[value] = <Fibonacci number>``.
    """
    # Local import: in this cell the name `Queue` refers to multiprocessing's class.
    from queue import Empty

    while True:
        try:
            value = q.get(True, 0.05)
        except Empty:
            # Checking q.empty() and then calling get() is racy: another
            # consumer can take the last element in between. Treat the
            # timeout itself as the "queue drained" signal instead.
            break
        a, b = 0, 1
        for _ in range(value):
            a, b = b, a + b
        # Single write per value: with a shared dictionary every assignment
        # is a round-trip, so do not repeat it on each loop iteration.
        fibo_dict[value] = a
        logger.info("consumer [%s] getting value [%d] from queue...",
                    current_process().name, value)

if __name__ == '__main__':
    data_queue = Queue()
    number_of_cpus = cpu_count()
    manager = Manager()
    fibo_dict = manager.dict()

    # The producer runs to completion first, so the queue is already
    # populated when the consumers start.
    producer = Process(target=producer_task, args=(data_queue, fibo_dict))
    producer.start()
    producer.join()

    # One consumer process per CPU.
    consumer_list = []
    for _ in range(number_of_cpus):
        consumer = Process(target=consumer_task, args=(data_queue, fibo_dict))
        consumer.start()
        consumer_list.append(consumer)

    for consumer in consumer_list:
        consumer.join()

    logger.info(fibo_dict)

2018-08-21 12:05:10,695 - Producer [Process-5] putting value [15] into queue.. 
2018-08-21 12:05:10,700 - Producer [Process-5] putting value [15] into queue.. 
2018-08-21 12:05:10,703 - Producer [Process-5] putting value [4] into queue.. 
2018-08-21 12:05:10,705 - Producer [Process-5] putting value [16] into queue.. 
2018-08-21 12:05:10,708 - Producer [Process-5] putting value [17] into queue.. 
2018-08-21 12:05:10,710 - Producer [Process-5] putting value [1] into queue.. 
2018-08-21 12:05:10,714 - Producer [Process-5] putting value [11] into queue.. 
2018-08-21 12:05:10,717 - Producer [Process-5] putting value [1] into queue.. 
2018-08-21 12:05:10,724 - Producer [Process-5] putting value [14] into queue.. 
2018-08-21 12:05:10,726 - Producer [Process-5] putting value [17] into queue.. 
2018-08-21 12:05:10,729 - Producer [Process-5] putting value [7] into queue.. 
2018-08-21 12:05:10,733 - Producer [Process-5] putting value [6] into queue.. 
2018-08-21 12:05:10,736 - Producer [Process-5

In [None]:
#coding: utf-8

import sys, logging, time, os, random
from multiprocessing import Process, Queue, Pool, \
    cpu_count, current_process, Manager

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(message)s')

# Re-running this cell must not stack another console handler on the root
# logger: every additional handler prints each message one more time.
# Reuse a plain StreamHandler that is already attached, otherwise create one.
ch = next((h for h in logger.handlers if type(h) is logging.StreamHandler), None)
if ch is None:
    ch = logging.StreamHandler()
    logger.addHandler(ch)
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)



<multiprocessing.queues.Queue at 0x10a55dba8>

In [9]:
"""
Intro to Multiprocessing
"""
import time
import multiprocessing as mp

def cal_square(num_array):
    """Print the square of every number in `num_array`, one line per element."""
    template = "square of [{}] is [{}]"
    for number in num_array:
        squared = number * number
        print(template.format(number, squared))

def cal_cube(num_array):
    """Print the cube of every number in `num_array`, one line per element."""
    template = "cube of [{}] is [{}]"
    for number in num_array:
        cubed = number * number * number
        print(template.format(number, cubed))
        
if __name__ == "__main__":
    num_array = [1, 2, 3, 4]
    start = time.time()
    process1 = mp.Process(target=cal_square, args=(num_array,))
    process2 = mp.Process(target=cal_cube, args=(num_array,))
    # Launch both workers first, then wait for both, so they run side by side.
    for worker in (process1, process2):
        worker.start()
    for worker in (process1, process2):
        worker.join()
    end = time.time()
    elapsed = end - start
    print("Tasks done in {}".format(elapsed))

square of [1] is [1]
cube of [1] is [1]
square of [2] is [4]
cube of [2] is [8]
square of [3] is [9]
cube of [3] is [27]
square of [4] is [16]
cube of [4] is [64]
Tasks done in 20.03281807899475


# Share Global Variables - [shared memory]

In [14]:
"""
multiprocessing.Array & multiprocessing.Value
"""

import multiprocessing as mp

def cal_square(num_array, results_shared_array):
    """
    Write the square of each number into `results_shared_array` and print it.

    The square of ``num_array[k]`` is stored at index ``k``; the printed value
    is the one read back from `results_shared_array`.
    """
    position = 0
    for number in num_array:
        results_shared_array[position] = number * number
        stored = results_shared_array[position]
        print("square of [{}] is [{}]".format(number, stored))
        position += 1
        
if __name__ == "__main__":
    # This cell only imports multiprocessing; import time here so the timing
    # below does not depend on an earlier cell having been run.
    import time

    num_array = [1, 2, 3, 4]
    # Shared array with typecode "d" and one slot per input number; the child
    # process writes into it and the parent reads the results afterwards.
    results_shared_array = mp.Array("d", len(num_array))
    start = time.time()
    process = mp.Process(target=cal_square, args=(num_array, results_shared_array))
    process.start()
    process.join()
    end = time.time()
    print("Tasks done in {}".format(end-start))
    print("In the main process: {}".format(results_shared_array[:]) )

square of [1] is [1.0]
square of [2] is [4.0]
square of [3] is [9.0]
square of [4] is [16.0]
Tasks done in 0.022748947143554688
In the main process: [1.0, 4.0, 9.0, 16.0]


In [16]:
"""
multiprocessing.Queue
"""

import multiprocessing as mp

def cal_square(num_array, results_shared_queue):
    """Put the square of every number in `num_array` on `results_shared_queue`, in order."""
    for number in num_array:
        squared = number * number
        results_shared_queue.put(squared)
        
if __name__ == "__main__":
    # This cell only imports multiprocessing; import time here so the timing
    # below does not depend on an earlier cell having been run.
    import time

    num_array = [1, 2, 3, 4]
    results_shared_queue = mp.Queue()
    start = time.time()
    process = mp.Process(target=cal_square, args=(num_array, results_shared_queue))
    process.start()
    # Collect the results BEFORE joining: a process that has put items on a
    # multiprocessing.Queue waits for them to be flushed before it exits, so
    # joining first can deadlock. The worker puts exactly one result per
    # input, so that many get() calls retrieve everything; the timeout turns
    # a worker that died early into a queue.Empty error instead of a hang.
    results = [results_shared_queue.get(timeout=10) for _ in num_array]
    process.join()
    end = time.time()
    for result in results:
        print(result)

1
4
9
16


## Lock

In [31]:
"""
Avoiding a race condition with a Lock
"""
import time
import multiprocessing as mp

def deposit(balance, lock):
    """
    Add 1 to `balance.value` one hundred times, holding `lock` for each update.

    Args:
        balance: object with a numeric ``value`` attribute shared with other processes.
        lock: lock guarding `balance`; must support the ``with`` statement.
    """
    for _ in range(100):
        time.sleep(0.001)
        # `balance.value += 1` is a read followed by a write, so it must not
        # interleave with another process's update. `with` also guarantees
        # the lock is released if the update raises.
        with lock:
            balance.value += 1

def withdraw(balance, lock):
    """
    Subtract 1 from `balance.value` one hundred times, holding `lock` for each update.

    Args:
        balance: object with a numeric ``value`` attribute shared with other processes.
        lock: lock guarding `balance`; must support the ``with`` statement.
    """
    for _ in range(100):
        time.sleep(0.002)
        # `balance.value -= 1` is a read followed by a write, so it must not
        # interleave with another process's update. `with` also guarantees
        # the lock is released if the update raises.
        with lock:
            balance.value -= 1
        
if __name__ == "__main__":
    # Shared integer ("i") starting at 200, plus the lock both workers use.
    balance = mp.Value("i", 200)
    lock = mp.Lock()
    process_deposite = mp.Process(target=deposit, args=(balance, lock))
    process_witndraw = mp.Process(target=withdraw, args=(balance, lock))
    # Start both workers before waiting so their updates interleave.
    for worker in (process_deposite, process_witndraw):
        worker.start()
    for worker in (process_deposite, process_witndraw):
        worker.join()
    # 100 deposits and 100 withdrawals of 1 each leave the balance at its start value.
    print(balance.value)

200


## MapReduce - [Pool]
Use the map-reduce pattern to run a function in parallel: `Pool.map` splits the inputs across worker processes and collects the results in order.

In [22]:
from multiprocessing import Pool
import time
import numpy as np
def compute(n):
    """
    Return 0 + 1 + ... + (n - 1), accumulated with an explicit loop.

    Returns 0 when ``range(n)`` is empty.
    """
    running_sum = 0
    for term in range(n):
        running_sum = running_sum + term
    return running_sum
if __name__ == "__main__":
    long_list = range(10000)
    num_cores = 4

    # Parallel run: Pool.map applies compute() to every element of long_list.
    pool_start = time.time()
    pool = Pool(processes = num_cores)
    try:
        pool_result = pool.map(compute, long_list)
    finally:
        # Always release the worker processes, even when map() raises.
        pool.close()
        pool.join()
    pool_end = time.time()
    print("Pool took: [{}] s".format(pool_end - pool_start))

    # Serial baseline: call the same compute() so both timings measure the
    # same work instead of a copy-pasted duplicate of its loop.
    serial_start = time.time()
    serial_result = []
    for e in long_list:
        serial_result.append(compute(e))
    serial_end = time.time()
    print("Serial took: [{}] s".format(serial_end - serial_start))

    # Sum the absolute element-wise differences: with a signed sum, positive
    # and negative mismatches could cancel out and wrongly report 0.
    mismatch = np.abs(np.array(pool_result) - np.array(serial_result)).sum()
    print("Difference betwwen two results: {}".format(mismatch))

Pool took: [1.6648108959197998] s
Serial took: [5.136871099472046] s
Difference betwwen two results: 0


0