## How to Speed Up Your Python Code Using Concurrent Programming

Concurrency in Python can be achieved in three ways:

- Multiprocessing
- Multithreading
- Asyncio (asynchronous code)

Concurrency, in programming terms, means making progress on multiple operations during the same period of time, either by running them at literally the same instant (parallelism) or by interleaving them. Any of the three approaches above can achieve this, each with its own caveats.

Concurrency is generally useful for solving two types of problems: `I/O Bound Problems` and `CPU Bound Problems`. Comparison of the two:

<img src="data/2.PNG" height=100 width=800 /> 
<br>

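Each type of problem calls for a different solution. Multiprocessing achieves true parallelism by spreading the tasks across the different cores of the computer, with each worker running in its own process. Threading and asyncio use only one process, but they intelligently take turns: while one task waits (for example, for a network response), another one runs, which speeds up the overall work. In standard CPython, the Global Interpreter Lock (GIL) lets only one thread execute Python bytecode at a time, so threads speed up I/O-bound work but usually not CPU-bound work.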

<img src="data/1.PNG" height=100 width=800 />
<br>

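```python
import requests
import time

import threading
import concurrent.futures

import asyncio
import aiohttp

import multiprocessing as mp
from itertools import product

import pandas as pd
import numpy as np

# Worker functions for mp.Pool (download_single_page_mp, sleep_test_mp,
# convert_to_date_mp) live in a separate module, mp_imports.py (not shown),
# presumably because with the 'spawn' start method (Windows, macOS) worker
# processes must import them, which functions defined in a notebook cell don't allow
from mp_imports import *
```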
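The helper module `mp_imports` is not shown in this notebook. Below is a hypothetical sketch of what it might contain, inferred only from how its three functions are called later; the function bodies, and in particular what `sleep_test_mp` does, are assumptions rather than the author's code. A likely reason for keeping these functions in a separate file is that with the `spawn` start method (the default on Windows and macOS) each worker process re-imports the functions it runs, and functions defined directly in a notebook cell cannot be re-imported that way.

```python
# mp_imports.py: hypothetical reconstruction, not the author's actual module
import time

import pandas as pd
import requests


def download_single_page_mp(url, index):
    # Called through pool.starmap with (url, index) tuples
    status = requests.get(url)
    return (index, status)


def sleep_test_mp(seconds):
    # Assumption: waits, then returns twice its input
    # (consistent with the [2, 4, 6, 8] output shown later)
    time.sleep(seconds)
    return seconds * 2


def convert_to_date_mp(df, columns_list):
    # Same logic as convert_to_date in the threading section
    for col in columns_list:
        df[col] = pd.to_datetime(df[col])
    return df
```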


### We will run through two examples to understand these three approaches:
- We will send requests to multiple websites concurrently and get data back from them (similar to what we do in web scraping). This is an I/O-bound task.
- We will run calculations on a large dataset that take time because of the dataset's size. This is a CPU-bound task.


# Example 1: I/O Bound Task


```python
# Initialize the URL list: two sites repeated 10 times = 20 URLs
url_list = ['https://heeeeeeeey.com/', 'https://hooooooooo.com/'] * 10

print(url_list)
```

```
['https://heeeeeeeey.com/', 'https://hooooooooo.com/', 'https://heeeeeeeey.com/', 'https://hooooooooo.com/', 'https://heeeeeeeey.com/', 'https://hooooooooo.com/', 'https://heeeeeeeey.com/', 'https://hooooooooo.com/', 'https://heeeeeeeey.com/', 'https://hooooooooo.com/', 'https://heeeeeeeey.com/', 'https://hooooooooo.com/', 'https://heeeeeeeey.com/', 'https://hooooooooo.com/', 'https://heeeeeeeey.com/', 'https://hooooooooo.com/', 'https://heeeeeeeey.com/', 'https://hooooooooo.com/', 'https://heeeeeeeey.com/', 'https://hooooooooo.com/']
```


### Synchronous Example

We will request the 20 URLs in the list above one after the other and measure the time taken.

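```python
def download_single_page(url, index):
    # Blocking call: the program waits here until the server responds.
    # `status` holds the full Response object
    status = requests.get(url)
    print(status)
    return (index, status)

def download_web_pages(url_list):
    result = []
    # Requests are sent one at a time; each must finish before the next starts
    for index, url in enumerate(url_list):
        index, status = download_single_page(url, index)
        result.append((index, status))
    return result
```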

In [5]:
start = time.time()

result = download_web_pages(url_list)
end = time.time()

print("Time Taken: ", end-start)
print(result)

<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
Time Taken:  5.823001146316528
[(0, <Response [200]>), (1, <Response [200]>), (2, <Response [200]>), (3, <Response [200]>), (4, <Response [200]>), (5, <Response [200]>), (6, <Response [200]>), (7, <Response [200]>), (8, <Response [200]>), (9, <Response [200]>), (10, <Response [200]>), (11, <Response [200]>), (12, <Response [200]>), (13, <Response [200]>), (14, <Response [200]>), (15, <Response [200]>), (16, <Response [200]>), (17, <Response [200]>), (18, <Response [200]>), (19, <Response [200]>)]
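Every timing cell in this notebook repeats the same `start = time.time()` / `end = time.time()` pattern. A minimal sketch of a reusable alternative, assuming you are happy to switch to `time.perf_counter()`, which the Python documentation describes as a clock with the highest available resolution for measuring short durations. The helper name `timer` is hypothetical:

```python
import time
from contextlib import contextmanager

@contextmanager
def timer(label="Time Taken"):
    # Hypothetical helper: measures how long the body of the with-block takes
    timings = {}
    start = time.perf_counter()
    try:
        yield timings
    finally:
        # Runs even if the body raises, so the duration is always recorded
        timings["elapsed"] = time.perf_counter() - start
        print(f"{label}: {timings['elapsed']:.3f} s")

# Usage: time.sleep stands in for a call such as download_web_pages(url_list)
with timer() as t:
    time.sleep(0.1)
```

Yielding a dict lets the caller read the measured duration after the block ends, in addition to the printed line.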


**How the synchronous version works**

<img src="data/Synchronous_Executions.jpg" height=100 width=800 />

### Threading Example
We will request the same 20 URLs, but this time using threading.

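```python
# Using the built-in threading library

start = time.time()

# One thread per URL. threading.Thread discards the target's return value,
# so the (index, status) tuples are not collected here
threads = [threading.Thread(target=download_single_page,
                            args=(url, index)) for index, url in enumerate(url_list)]

# Start all threads; each one blocks on its own request while the others run
for thread in threads:
    thread.start()

# Wait for every thread to finish before stopping the timer
for thread in threads:
    thread.join()

end = time.time()

print("Time Taken: ", end-start)
```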

```
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]><Response [200]>

<Response [200]>
<Response [200]>
<Response [200]><Response [200]><Response [200]><Response [200]>

<Response [200]>

<Response [200]>
<Response [200]>

<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
Time Taken:  0.3823277950286865
```

The `<Response [200]>` lines are jumbled, and some share a line, because several threads call `print` at the same time.
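`threading.Thread` discards whatever its target returns, so the version above only prints the responses. A minimal sketch of one way to keep the results: each thread writes into its own slot of a pre-sized list. Here `fake_download` is a hypothetical stand-in for `requests.get` that uses `time.sleep` to imitate network latency (like a socket wait, `time.sleep` releases the GIL, so the threads overlap):

```python
import threading
import time

def fake_download(url):
    # Hypothetical stand-in for requests.get: sleeping imitates waiting on the network
    time.sleep(0.2)
    return f"fetched {url}"

def run_threads(urls):
    # Pre-sized list: each thread writes only to its own index, so no lock is needed
    results = [None] * len(urls)

    def worker(index, url):
        results[index] = fake_download(url)

    threads = [threading.Thread(target=worker, args=(i, u)) for i, u in enumerate(urls)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # after join, the thread's slot is guaranteed to be filled
    return results

urls = [f"https://example.com/{i}" for i in range(10)]
start = time.perf_counter()
results = run_threads(urls)
elapsed = time.perf_counter() - start
print(results[0], f"{elapsed:.2f} s")
```

Ten 0.2 s waits take about 0.2 s in total instead of 2 s, because the threads wait at the same time.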


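A cleaner alternative implementation of threading uses `concurrent.futures.ThreadPoolExecutor`, which manages a pool of threads and collects each call's return value. Prefer this implementation: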

```python
def download_single_page_threads(url):
    status = requests.get(url)
    return status

start = time.time()

# executor.map runs the function on the pool's threads and yields the
# results in the same order as url_list
with concurrent.futures.ThreadPoolExecutor(max_workers=30) as executor:
    results = executor.map(download_single_page_threads, url_list)
    result_values = list(results)

end = time.time()
print("Time Taken: ", end-start)
```

Time Taken:  0.5087528228759766


In [9]:
print(result_values)

[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]
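`executor.map` returns results in input order, but if one call raises an exception, that exception is re-raised when its result is reached during iteration, which ends the loop. A sketch using `submit()` and `as_completed()` instead, so that each URL's success or failure is handled separately; `fake_download` is again a hypothetical stand-in that deliberately fails for one URL:

```python
import concurrent.futures
import time

def fake_download(url):
    # Hypothetical stand-in for requests.get; fails for one URL to show error handling
    time.sleep(0.1)
    if "bad" in url:
        raise ConnectionError(f"could not reach {url}")
    return f"ok {url}"

urls = ["https://a.example", "https://bad.example", "https://c.example"]
results, errors = {}, {}

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # submit() returns one Future per call; remember which URL each belongs to
    future_to_url = {executor.submit(fake_download, url): url for url in urls}
    # as_completed yields futures as they finish, not in submission order
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            results[url] = future.result()
        except ConnectionError as exc:
            errors[url] = str(exc)

print(results)
print(errors)
```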


**How the multithreaded version works**

<img src="data/Threading.jpg" width=800 />

### AsyncIO Example
The same example is implemented below using AsyncIO (asynchronous programming).



**More About AsyncIO and How It Works**  
AsyncIO is built on three main constructs:

- `Coroutine:` Unlike a conventional function, which runs from start to finish with a single exit, a coroutine can pause its execution at an `await` and resume later. A coroutine function is declared by putting the `async` keyword before `def`; calling it returns a coroutine object, which only runs once it is awaited or scheduled as a task.
- `Event Loop or Coordinator:` The object that schedules and runs coroutines, switching to another one whenever the current one pauses at an `await`. You can think of it as a scheduler or master.
- `Futures:` Futures are objects that implement the `__await__()` method; their job is to hold a state and, eventually, a result. The state can be one of the following:  
PENDING - the future does not have a result or exception set yet.  
CANCELLED - the future was cancelled.  
FINISHED - the future finished, either with a result or with an exception. The result is either a Python object, which is returned, or an exception, which is raised.  
A `Task`, created with `asyncio.create_task()`, is a Future that wraps a coroutine and schedules it on the event loop; the examples below use tasks.
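The three constructs can be seen together in a short, self-contained sketch. `double_later` is an illustrative coroutine, and `asyncio.sleep` stands in for real I/O:

```python
import asyncio

async def double_later(x):
    # Coroutine: pauses at the await, giving the event loop a chance to run other work
    await asyncio.sleep(0.1)
    return x * 2

async def main():
    loop = asyncio.get_running_loop()

    # A bare Future starts PENDING (not done) and becomes FINISHED once a result is set
    fut = loop.create_future()
    states = [fut.done()]
    fut.set_result("hello")
    states.append(fut.done())

    # A Task is a Future subclass that wraps a coroutine and schedules it on the loop
    task = asyncio.create_task(double_later(21))
    is_future = isinstance(task, asyncio.Future)
    value = await task
    return states, fut.result(), is_future, value

# asyncio.run creates an event loop, runs main() to completion, then closes the loop
states, fut_result, is_future, value = asyncio.run(main())
print(states, fut_result, is_future, value)
```

Inside Jupyter, which already runs an event loop, `asyncio.run()` raises a `RuntimeError`; there you would write `await main()` instead, as the notebook cells below do.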

<br><br>
<img src="data/asyncio1.png" width=800 height=50 />

**[Read More Here](https://stackoverflow.com/a/51177895/6267086)**

### Example
You are cooking in a restaurant. An order comes in for eggs and toast. <br>
**Synchronous:** you cook the eggs, then you cook the toast. <br>
**Asynchronous:** you start the eggs cooking and set a timer. You start the toast cooking, and set a timer. While they are both cooking, you clean the kitchen. When the timers go off you take the eggs off the heat and the toast out of the toaster and serve them. <br>
**Multiprocessing:** you hire two more cooks, one to cook eggs and one to cook toast. Now you have the problem of coordinating the cooks so that they do not conflict with each other in the kitchen when sharing resources. And you have to pay them.
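The asynchronous cook maps directly onto asyncio. A minimal sketch, with `asyncio.sleep` standing in for the cooking timers (the function names and durations are made up for illustration): the eggs and toast cook at the same time, and the kitchen gets cleaned while they do, so the order takes about as long as the slowest item rather than the sum of both.

```python
import asyncio
import time

async def cook(item, seconds, log):
    # Start cooking, then "set a timer": the await hands control back to the event loop
    log.append(f"start {item}")
    await asyncio.sleep(seconds)
    log.append(f"done {item}")
    return item

async def clean_kitchen(log):
    log.append("cleaning kitchen")

async def serve_order():
    log = []
    # gather runs all three coroutines concurrently in a single thread
    dishes = await asyncio.gather(
        cook("eggs", 0.4, log),
        cook("toast", 0.3, log),
        clean_kitchen(log),
    )
    return dishes, log

start = time.perf_counter()
dishes, log = asyncio.run(serve_order())
elapsed = time.perf_counter() - start
print(dishes[:2], log, f"{elapsed:.2f} s")
```

Cooking sequentially would take about 0.7 s; here the total is close to 0.4 s, the time of the eggs alone.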

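```python
start = time.time()

async def download_single_page_async(url, index):
    # aiohttp.request returns an async context manager; the request is sent when
    # it is entered, and while this coroutine awaits the response the event loop
    # runs the other tasks
    req = aiohttp.request(method='GET', url=url)
    async with req as response:
        status = response.status
        return (index, status)

tasks = []

# create_task schedules each coroutine on the event loop right away
for index, url in enumerate(url_list):
    task = asyncio.create_task(download_single_page_async(url, index))
    tasks.append(task)

# Top-level await works in Jupyter/IPython, which already runs an event loop;
# in a plain script, move this logic into an async function run with asyncio.run()
await asyncio.wait(tasks)

end = time.time()
print("Time Taken: ", end-start)
```

```
Time Taken:  0.4359569549560547
```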


```python
tasks[0]
```

```
<Task finished name='Task-2' coro=<download_single_page_async() done, defined at <ipython-input-10-2ab1fc22e84f>:3> result=(0, 200)>
```

```python
results = [i.result() for i in tasks]
print(results)
```

```
[(0, 200), (1, 200), (2, 200), (3, 200), (4, 200), (5, 200), (6, 200), (7, 200), (8, 200), (9, 200), (10, 200), (11, 200), (12, 200), (13, 200), (14, 200), (15, 200), (16, 200), (17, 200), (18, 200), (19, 200)]
```
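When there are many URLs, it is common to cap how many requests are in flight at once so as not to overwhelm the server. A minimal sketch using `asyncio.Semaphore` and `asyncio.gather`; `gather` returns results in the same order as its inputs, which makes the index bookkeeping above unnecessary. `asyncio.sleep` stands in for the `aiohttp` call, and the names are illustrative:

```python
import asyncio
import time

async def fake_fetch(url, limiter, stats):
    # At most `limit` coroutines can be inside this block at the same time
    async with limiter:
        stats["in_flight"] += 1
        stats["peak"] = max(stats["peak"], stats["in_flight"])
        await asyncio.sleep(0.1)  # stand-in for awaiting an HTTP response
        stats["in_flight"] -= 1
        return 200

async def fetch_all(urls, limit=5):
    # Create the semaphore inside the running loop
    limiter = asyncio.Semaphore(limit)
    stats = {"in_flight": 0, "peak": 0}
    # gather returns results in the same order as the coroutines passed in
    statuses = await asyncio.gather(*(fake_fetch(u, limiter, stats) for u in urls))
    return statuses, stats["peak"]

urls = [f"https://example.com/{i}" for i in range(20)]
start = time.perf_counter()
statuses, peak = asyncio.run(fetch_all(urls, limit=5))
elapsed = time.perf_counter() - start
print(len(statuses), peak, f"{elapsed:.2f} s")
```

With 20 URLs and a limit of 5, the requests run in four waves of 0.1 s each, so the whole batch takes roughly 0.4 s.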


### **Important concept to check when working with AsyncIO: `Blocking` and `Non-Blocking` Functions**

A function is blocking if it has to wait for something to complete. 

If a function blocks because it is doing CPU work, there is not much we can do. But if it blocks because it is waiting on I/O, the CPU is idle and can be used to run another task in the meantime.

Many built-in functions and popular libraries in Python (for example `time.sleep` and `requests.get`) are blocking and not asyncio-compatible. Calling them inside an async function renders the async code useless: the event loop cannot switch to another task while the call blocks, so execution becomes effectively sequential.

Always prefer functions that are asyncio-compatible. When that is not possible for some long-running operation, we can combine `threading` or `multiprocessing` with `asyncio`.

```python
start = time.time()

async def download_single_page_async_v2(url, index):
    # requests.get is blocking: it never hands control back to the event loop,
    # so each task runs to completion before the next one starts
    req = requests.get(url)
    return req

tasks = []

for index, url in enumerate(url_list):
    task = asyncio.create_task(download_single_page_async_v2(url, index))
    tasks.append(task)

await asyncio.wait(tasks)

end = time.time()
print("Time Taken: ", end-start)
```

```
Time Taken:  5.449560880661011
```



The code block above runs sequentially instead of taking advantage of asyncio, because `requests` is not asyncio-compatible: each `requests.get` call blocks the event loop until its response arrives.
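The same effect can be reproduced without any network access. In this sketch (the coroutine names are illustrative), five coroutines each wait 0.1 s; with blocking `time.sleep` the waits add up, while `await asyncio.sleep` lets them overlap:

```python
import asyncio
import time

async def blocking_wait(delay):
    time.sleep(delay)  # blocks the whole event loop; no other task can run
    return delay

async def non_blocking_wait(delay):
    await asyncio.sleep(delay)  # yields to the event loop while waiting
    return delay

async def run_five(coro_func, delay=0.1):
    start = time.perf_counter()
    await asyncio.gather(*(coro_func(delay) for _ in range(5)))
    return time.perf_counter() - start

blocking_time = asyncio.run(run_five(blocking_wait))
non_blocking_time = asyncio.run(run_five(non_blocking_wait))
print(f"blocking: {blocking_time:.2f} s, non-blocking: {non_blocking_time:.2f} s")
```

The blocking version takes about 0.5 s (5 × 0.1 s) even though it is written with `async def`; the non-blocking version takes about 0.1 s.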

How can we solve issues like this? By combining `threading` and `asyncio`: the blocking call runs in a separate thread while the event loop awaits its result.

In [14]:
def download_single_page_async_v2(url): 
    req = requests.get(url)
    return req


async def RunBlockingFunction(func, url, n_workers=1):
    Loop = asyncio.get_event_loop()
    
    Thread_Executor = concurrent.futures.ThreadPoolExecutor(max_workers=n_workers)
    
    Results = await Loop.run_in_executor(Thread_Executor, func, url)

    return Results


start = time.time()

tasks = []

for index, url in enumerate(url_list):
    task = asyncio.create_task(RunBlockingFunction(download_single_page_async_v2, url, n_workers=1))
    tasks.append(task)
    
await asyncio.wait(tasks)

result = [task.result() for task in tasks]
print(result, end="\n\n")

end = time.time()
print("Time Taken: ", end-start)

[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]

Time Taken:  0.38820457458496094
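On Python 3.9 and later, `asyncio.to_thread()` wraps this pattern: it runs a blocking function in the event loop's default thread pool, so there is no need to create an executor per call. A sketch, with a hypothetical `blocking_download` that uses `time.sleep` in place of `requests.get`:

```python
import asyncio
import time

def blocking_download(url):
    # Hypothetical stand-in for requests.get: an ordinary blocking function
    time.sleep(0.2)
    return f"response from {url}"

async def download_all(urls):
    # Each to_thread call runs in the loop's default thread pool and returns an
    # awaitable; gather waits for all of them and keeps the input order
    return await asyncio.gather(*(asyncio.to_thread(blocking_download, u) for u in urls))

urls = [f"https://example.com/{i}" for i in range(10)]
start = time.perf_counter()
responses = asyncio.run(download_all(urls))
elapsed = time.perf_counter() - start
print(responses[0], f"{elapsed:.2f} s")
```

The default pool has a limited number of threads (it depends on the CPU count), so with many URLs the calls run in batches; the total is still far below the sequential 2 s here.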


### Multiprocessing Example
Here, the I/O-bound Example 1 is implemented using multiprocessing. How do you think this will fare compared to the other methods?

```python
mp.cpu_count()
```

```
4
```

```python
start = time.time()

# One worker process per CPU core; download_single_page_mp comes from mp_imports
with mp.Pool(processes=mp.cpu_count()) as pool:
    # starmap unpacks each (url, index) tuple into the function's arguments
    result = pool.starmap(download_single_page_mp,
                          iterable=[(url, index) for index, url in enumerate(url_list)])
    print(result)

end = time.time()
print(end-start)
```

```
[(0, <Response [200]>), (1, <Response [200]>), (2, <Response [200]>), (3, <Response [200]>), (4, <Response [200]>), (5, <Response [200]>), (6, <Response [200]>), (7, <Response [200]>), (8, <Response [200]>), (9, <Response [200]>), (10, <Response [200]>), (11, <Response [200]>), (12, <Response [200]>), (13, <Response [200]>), (14, <Response [200]>), (15, <Response [200]>), (16, <Response [200]>), (17, <Response [200]>), (18, <Response [200]>), (19, <Response [200]>)]
2.6833620071411133
```


```python
# Sleep example (sleep_test_mp comes from mp_imports)

start = time.time()

with mp.Pool(processes=mp.cpu_count()) as pool:
    result = pool.map(sleep_test_mp, iterable=[1, 2, 3, 4])
    print(result)

end = time.time()
print(end-start)
```

```
[2, 4, 6, 8]
5.230348110198975
```


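* Is this a good use of multiprocessing?
* What is a good example of using multiprocessing?


- Multiprocessing performs worse here than threading and asyncio (about 2.7 s versus roughly 0.4 s) because the CPU is not doing any intensive work; each process is mostly just waiting on the network. On top of that, there is overhead in starting worker processes and passing arguments and results between them. Multiprocessing pays off for CPU-bound work, such as the data processing in Example 2.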

**How the multiprocessing version works**

<img src="data/mp_1.jpeg" height=80 width=800 />

# Example 2: CPU Bound Task
    
**Testing out Data Processing Performance on a Large Dataset using Multiprocessing and Comparing with other methods**

### Sequential Implementation

```python
# Sequential implementation
train_df = pd.read_csv('data/Data_For_Multiprocessing.csv')

start = time.time()

# Parse the two date columns one after the other, on a single core
train_df['booking_date'] = pd.to_datetime(train_df['booking_date'])
train_df['checkin_date'] = pd.to_datetime(train_df['checkin_date'])
#train_df['checkout_date'] = pd.to_datetime(train_df['checkout_date'])

end = time.time()
print(end-start)

train_df.dtypes
```

```
25.25039792060852
```

```
reservation_id                                object
booking_date                          datetime64[ns]
checkin_date                          datetime64[ns]
checkout_date                                 object
channel_code                                   int64
main_product_code                              int64
numberofadults                                 int64
numberofchildren                               int64
persontravellingid                             int64
resort_region_code                             int64
resort_type_code                               int64
room_type_booked_code                          int64
roomnights                                     int64
season_holidayed_code                        float64
state_code_residence                         float64
state_code_resort                              int64
total_pax                                      int64
member_age_buckets                            object
booking_type_code
```

```python
train_df.head(3)
```

```
Unnamed: 0,reservation_id,booking_date,checkin_date,checkout_date,channel_code,main_product_code,numberofadults,numberofchildren,persontravellingid,resort_region_code,...,state_code_residence,state_code_resort,total_pax,member_age_buckets,booking_type_code,memberid,cluster_code,reservationstatusid_code,resort_id,amount_spent_per_room_night_scaled
0,07659f3758d8aee27f5a7e2887adeacb67021cb95ada1b...,2018-05-04,2018-05-04,06/04/18,3,1,2,0,46,3,...,7.0,3,3,F,1,3d1539e56495b6991f0a3ef5a61ca3d03ce4fff7380e9a...,F,C,4e07408562bedb8b60ce05c1decfe3ad16b72230967de0...,7.706428
1,03930f033646d073462b35d411616323597715ac4fc398...,2015-01-23,2015-11-04,16/04/15,1,1,2,0,46,3,...,7.0,5,2,F,1,3d1539e56495b6991f0a3ef5a61ca3d03ce4fff7380e9a...,F,A,39fa9ec190eee7b6f4dff1100d6343e10918d044c75eac...,6.662563
2,d145a32920e6587ad95bfe299d80c0affa268220535aaf...,2015-01-28,2015-01-02,05/02/15,1,1,2,0,47,1,...,7.0,1,2,F,1,3d1539e56495b6991f0a3ef5a61ca3d03ce4fff7380e9a...,E,A,535fa30d7e25dd8a49f1536779734ec8286108d115da50...,7.871602
```


### Parallelized Using Multiprocessing

```python
train_df = pd.read_csv('data/Data_For_Multiprocessing.csv')

# Rows per chunk when the DataFrame is split into 16 parts
train_df.shape[0]/16
```

```
21339.0
```

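```python
start = time.time()

# Split the DataFrame into 16 roughly equal chunks
list_of_dfs = np.array_split(train_df, 16)

# Each worker process converts the date columns of one chunk at a time;
# convert_to_date_mp comes from mp_imports
with mp.Pool(processes=mp.cpu_count()) as pool:
    result = pool.starmap(convert_to_date_mp,
                          iterable=[(df, ['booking_date', 'checkin_date']) for df in list_of_dfs])

# Reassemble the processed chunks into a single DataFrame
result = pd.concat(result)

end = time.time()
print(end-start)
```

```
11.121872186660767
```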


```python
result.dtypes
```

```
reservation_id                                object
booking_date                          datetime64[ns]
checkin_date                          datetime64[ns]
checkout_date                                 object
channel_code                                   int64
main_product_code                              int64
numberofadults                                 int64
numberofchildren                               int64
persontravellingid                             int64
resort_region_code                             int64
resort_type_code                               int64
room_type_booked_code                          int64
roomnights                                     int64
season_holidayed_code                        float64
state_code_residence                         float64
state_code_resort                              int64
total_pax                                      int64
member_age_buckets                            object
booking_type_code
```

### Implementation using Threading

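```python
train_df = pd.read_csv('data/Data_For_Multiprocessing.csv')
```

```python
list_of_dfs = np.array_split(train_df, 16)

# A list rather than a generator: a generator is exhausted after one pass,
# so re-running the timing cell would otherwise process nothing
iters_list = [(i, ['booking_date', 'checkin_date']) for i in list_of_dfs]
```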

```python
def convert_to_date(df, columns_list):
    # Parse each listed column into datetime values
    for col in columns_list:
        df[col] = pd.to_datetime(df[col])
    return df
```

```python
start = time.time()

with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    # Unpack each (df, columns) pair into convert_to_date's arguments
    results = executor.map(lambda args: convert_to_date(*args), iters_list)
    result_values = list(results)

result_values = pd.concat(result_values)

end = time.time()
print(end-start)

result_values.dtypes
```

```
30.23114013671875
```

```
reservation_id                                object
booking_date                          datetime64[ns]
checkin_date                          datetime64[ns]
checkout_date                                 object
channel_code                                   int64
main_product_code                              int64
numberofadults                                 int64
numberofchildren                               int64
persontravellingid                             int64
resort_region_code                             int64
resort_type_code                               int64
room_type_booked_code                          int64
roomnights                                     int64
season_holidayed_code                        float64
state_code_residence                         float64
state_code_resort                              int64
total_pax                                      int64
member_age_buckets                            object
booking_type_code
```

**Why does threading not improve performance?**
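A likely answer is CPython's Global Interpreter Lock (GIL): only one thread executes Python bytecode at a time, so threads that spend their time computing (here, parsing dates) take turns on a single core, and switching between them adds overhead (30.2 s with threads versus 25.3 s sequentially above). Whether a particular pandas operation releases the GIL is an implementation detail, so treat this as the general explanation rather than a profile of `pd.to_datetime`. A minimal stdlib sketch of the effect, using an illustrative pure-Python CPU-bound function; on a standard (non-free-threaded) CPython build the threaded run is typically no faster than the sequential one:

```python
import concurrent.futures
import time

def count_primes(limit):
    # Pure-Python CPU work by trial division: the thread holds the GIL while it runs
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count

limits = [20_000] * 4

start = time.perf_counter()
sequential = [count_primes(x) for x in limits]
sequential_time = time.perf_counter() - start

start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    threaded = list(executor.map(count_primes, limits))
threaded_time = time.perf_counter() - start

print(f"sequential: {sequential_time:.2f} s, threaded: {threaded_time:.2f} s")
```

Swapping `ThreadPoolExecutor` for `ProcessPoolExecutor` (inside an `if __name__ == "__main__":` guard when run as a script) would let the four calls use separate cores, which is what the `mp.Pool` version above does for the date parsing.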