# Multiprocessing & Multithreading

## Resources
* __[threading module](https://docs.python.org/3/library/threading.html)__ 
* __[queue module](https://docs.python.org/3/library/queue.html)__
* __[shutil module](https://docs.python.org/3/library/shutil.html)__
* __[multiprocessing module](https://docs.python.org/3/library/multiprocessing.html)__


Terminlogy:
* **Concurrency** is when two or more tasks can start, run, and complete in overlapping time periods. It doesn’t necessarily mean they’ll ever both be running at the same instant. Eg. multitasking on a single-core machine.
* **Parallelism** is when two or more tasks are executed simultaneously.
* A **thread** is a sequence of instructions within a process. It can be thought of as a lightweight process. Threads share the same memory space.
* A **process** is an instance of a program running in a computer which can contain one or more threads. A process has its independent memory space.


The `threading` module is used for working with threads in Python.

* The CPython implementation has a Global Interpreter Lock (GIL) which allows only one thread to be active in the interpreter at once. 
* This means that threads cannot be used for parallel execution of Python code. 
* While parallel CPU computation is not possible, parallel IO operations are possible using threads. This is because performing IO operations releases the GIL.


* What are threads used for in Python?
  * In GUI applications to keep the UI thread responsive 
  * IO tasks (network IO or filesystem IO)

* Threads should not be used for CPU bound tasks. Using threads for CPU bound tasks will actually result in worse performance compared to using a single thread.

## Example 1: Threads for filesystem IO

* A queue is used to store the files that need to be processed. 
* A dictionary is used to store the input and output file names. 
* The `process_queue()` function is used to retrieve items from the queue and perform the copy operation. 
* The copy operation is done in the `copy_op` function using the `shutil` module.

In [12]:
import threading
from queue import Queue
import time
import shutil

print_lock = threading.Lock()

def copy_op(file_data):
    with print_lock:
        print("Starting thread: {}".format(threading.current_thread().name))
    mydata = threading.local()
    mydata.ip, mydata.op = next(iter(file_data.items()))
    
    shutil.copy(mydata.ip, mydata.op)
    
    with print_lock:
        print("Finished thread : {}".format(threading.current_thread().name))

def process_queue():
    while True:
        file_data = compress_queue.get()
        copy_op(file_data)
        compress_queue.task_done()

compress_queue = Queue()

output_names = [{'v1.mkv':'v11.mkv'},{'v2.mkv':'v22.mkv'}]

for i in range(2):
    t = threading.Thread(target=process_queue)
    t.daemon = True
    t.start()

start = time.time()

for file_data in output_names:
    compress_queue.put(file_data)

compress_queue.join()

print("Execution time = {0:.5f}".format(time.time() - start))
# Try with 1, 2 threads

Starting thread: Thread-25Starting thread: Thread-26

Finished thread : Thread-25
Finished thread : Thread-26
Execution time = 1.43463


Thus, threads can be used for parallel filesystem IO.

## Example 2: Threads for network IO

In [8]:
import threading
from queue import Queue
import requests
import bs4
import time

print_lock = threading.Lock()

def get_url(current_url):
    with print_lock:
        print("\nStarting thread {}".format(threading.current_thread().name))
    
    res = requests.get(current_url)
    res.raise_for_status()
    
    current_page = bs4.BeautifulSoup(res.text,"html.parser")
    current_title = current_page.select('title')[0].getText()
    
    with print_lock:
        print("{}\n",format(threading.current_thread().name))
        print("{}\n",format(current_url))
        print("{}\n",format(current_title))
        print("Finished fetching : {}".format(current_url))

def process_queue():
    while True:
        current_url = url_queue.get()
        get_url(current_url)
        url_queue.task_done()

url_queue = Queue()

url_list = ["https://www.google.com/", 
            "https://www.google.co.in", 
            "https://suiit.ac.in/", 
            "https://www.suniv.ac.in/",
            "https://www.ugc.ac.in/"
           ]

for i in range(5):
    t = threading.Thread(target=process_queue)
    t.daemon = True
    t.start()

start = time.time()

for current_url in url_list:
    url_queue.put(current_url)

url_queue.join()

print(threading.enumerate())

print("Execution time = {0: .5f}".format(time.time() - start))
# Try with 1, 2, 5

Starting thread Thread-13Starting thread Thread-14Starting thread Thread-15


Starting thread Thread-11Starting thread Thread-12

{}
 Thread-14
{}
 https://www.google.co.in
{}
 Google
Finished fetching : https://www.google.co.in
{}
 Thread-13
{}
 https://www.google.com/
{}
 Google
Finished fetching : https://www.google.com/


Exception in thread Thread-15:
Traceback (most recent call last):
  File "/home/atulnag/intelpython3/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/home/atulnag/intelpython3/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-8-9d81150f24eb>", line 28, in process_queue
    get_url(current_url)
  File "<ipython-input-8-9d81150f24eb>", line 14, in get_url
    res.raise_for_status()
  File "/home/atulnag/intelpython3/lib/python3.6/site-packages/requests/models.py", line 935, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: https://suiit.ac.in/



{}
 Thread-11
{}
 https://www.suniv.ac.in/
{}
 Sambalpur University, Jyoti Vihar, Burla.
Finished fetching : https://www.suniv.ac.in/
{}
 Thread-12
{}
 https://www.ugc.ac.in/
{}
 
	Welcome to UGC, New Delhi, India

Finished fetching : https://www.ugc.ac.in/


KeyboardInterrupt: 

In network IO, most of the time is spent waiting for the response from the URL, so this is another use case where using threads improves performance.

## Example 3: CPU bound tasks: prime number less than or equal to given number

* Bad to use threads for CPU bound tasks. 
* In the following program a queue holds numbers. The task is to find the sum of prime number less than or equal to the given number.

In [None]:
import threading
from queue import Queue
import time

list_lock = threading.Lock()

def find_rand(num):
    sum_of_primes = 0
    
    ix = 2
    
    while ix <= num:    
        if is_prime(ix):
            sum_of_primes += ix
        ix += 1
    
    sum_primes_list.append(sum_of_primes)


def is_prime(num):
    if num <= 1:
        return False
    elif num <= 3:
        return True
    elif num%2 == 0 or num%3 == 0:
        return False
    i = 5
    while i*i <= num:
        if num%i == 0 or num%(i+2) == 0:
            return False
        i += 6
    return True

def process_queue():
    while True:
        rand_num = min_nums.get()
        find_rand(rand_num)
        min_nums.task_done()

min_nums = Queue()

rand_list = [1000000, 2000000, 3000000]
sum_primes_list = list()

for i in range(2):
    t = threading.Thread(target=process_queue)
    t.daemon = True
    t.start()

start = time.time()

for rand_num in rand_list:
    min_nums.put(rand_num)

min_nums.join()

end_time = time.time()

sum_primes_list.sort()
print(sum_primes_list)

print("Execution time = {0:.5f}".format(end_time - start))     
# Try with 1/2 threads

The results are very clear : don’t use threads to improve performance of CPU bound tasks. You will always end
up with worse performance.

## Multiprocessing

* For parallel execution of task the `multiprocessing` module can be used.

## Example 4: primes using `multiprocessing`

In the following example we take the same task used above and process the inputs in parallel using the multiprocessing module.

In [11]:
from multiprocessing import Pool
import time

def sum_prime(num):
    sum_of_primes = 0
    
    ix = 2

    while ix <= num:    
        if is_prime(ix):
            sum_of_primes += ix
        ix += 1
    
    return sum_of_primes


def is_prime(num):
    if num <= 1:
        return False
    elif num <= 3:
        return True
    elif num%2 == 0 or num%3 == 0:
        return False
    i = 5
    while i*i <= num:
        if num%i == 0 or num%(i+2) == 0:
            return False
        i += 6
    return True

start = time.time()
with Pool(2) as p:
    print(p.map(sum_prime,[1000000, 2000000, 3000000]))
print("Time taken = {0:.5f}".format(time.time() - start))
# try with 1,2,3 processes

[37550402023, 142913828922, 312471072265]
Time taken = 10.63457


So using the multiprocessing module results in the full utilization of the CPU. Inter process communication can be achieved using queues or pipes. The Queue in the multiprocessing module works similar to the queue module used to demonstrate how the threading module works.