# Concurrency in Python

## at PyDelhi Virtual Meetup

### by Ankush Chander (building at https://raxter.io)

# Agenda
Understand the features and limitations of Python so that we can write efficient programs.

# CPU bound vs I/O bound


A program is CPU bound if it would go faster if the CPU were faster, i.e. it spends the majority of its time simply using the CPU (doing calculations). A program that computes new digits of π will typically be CPU bound: it is just crunching numbers.

Examples:
1. Matrix multiplication
2. Training neural network (NN) models


A program is I/O bound if it would go faster if the I/O subsystem were faster. Which I/O subsystem is meant can vary: I typically associate it with the disk, but networking, or communication in general, is just as common. A program that searches a huge file for some data might become I/O bound, since the bottleneck is then reading the data from disk (although with SSDs delivering hundreds of MB/s, this example is somewhat dated).

Examples:
1. Database access
2. Disk read/write
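One rough way to tell the two apart is to compare CPU time with wall-clock time: a CPU-bound task keeps the processor busy for nearly the whole elapsed time, while an I/O-bound task spends most of it waiting. The sketch below is only an illustration; it assumes `time.sleep()` as a stand-in for waiting on a disk or network, and the helper names `cpu_task`, `io_like_task` and `measure` are hypothetical.

```python
import time

def cpu_task(n=2_000_000):
    # Pure number crunching: the CPU is busy the whole time.
    total = 0
    for i in range(n):
        total += i * i
    return total

def io_like_task(seconds=0.5):
    # Stand-in for disk/network I/O (an assumption): the process just waits
    # and uses almost no CPU.
    time.sleep(seconds)

def measure(func):
    """Return (wall-clock seconds, CPU seconds) spent running func()."""
    wall_start = time.perf_counter()
    cpu_start = time.process_time()  # CPU time consumed by this process
    func()
    return time.perf_counter() - wall_start, time.process_time() - cpu_start

results = {}
for name, func in [("cpu_task", cpu_task), ("io_like_task", io_like_task)]:
    wall, cpu = measure(func)
    # Ratio close to 1.0: CPU bound. Ratio close to 0.0: mostly waiting.
    results[name] = cpu / wall
    print(f"{name}: wall={wall:.2f}s cpu={cpu:.2f}s cpu/wall={results[name]:.2f}")
```

The exact numbers depend on the machine, but the I/O-like task should show a CPU/wall ratio near zero.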

# Performance improvement of programs
### 1. Delegation/Outsourcing

a. Process pool

b. Thread pool


### 2. Improve efficiency of the program itself

a. Asynchronous programming

## Basics

### Process
A process is an instance of a computer program that is being executed by one or many threads. It contains the program code and its activity.
It is identified by a process ID (PID).

    python: os.getpid()
    bash: ps -ef 
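A quick way to see that each process gets its own ID is to start a second Python interpreter with `subprocess` and compare PIDs. This is a minimal sketch; any child program would do, and the PIDs printed will differ on every run.

```python
import os
import subprocess
import sys

# PID of the interpreter running this code.
parent_pid = os.getpid()

# Start a second Python interpreter as a separate process and ask it for its PID.
result = subprocess.run(
    [sys.executable, "-c", "import os; print(os.getpid())"],
    capture_output=True, text=True, check=True,
)
child_pid = int(result.stdout.strip())

print(f"parent pid: {parent_pid}, child pid: {child_pid}")
```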
    
### Thread (lightweight process)
A thread of execution is the smallest sequence of programmed instructions that can be managed independently by a scheduler, which is typically a part of the operating system.
It is identified by a thread ID.

    python: threading.get_native_id()
    bash: ps -T -p process_id
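Threads started by the same program share its process ID but get their own thread IDs. In this sketch, `threading.get_ident()` (used in the examples below) is Python's own identifier, while `threading.get_native_id()` (Python 3.8+) is the OS-level ID that `ps -T` lists. The `record_ids` helper is hypothetical.

```python
import os
import threading

seen = {}

def record_ids(name):
    # Record the process ID and both thread IDs as seen from the calling thread.
    seen[name] = (os.getpid(), threading.get_ident(), threading.get_native_id())

record_ids("main")
worker = threading.Thread(target=record_ids, args=("worker",))
worker.start()
worker.join()

for name, (pid, ident, native_id) in seen.items():
    print(f"{name}: pid={pid} ident={ident} native_id={native_id}")
```

Both lines should print the same `pid` but different thread IDs.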
    


### Coexistence of threads and processes
![title](img/concurrency/process_thread.jpg)



```python
# By default, Python code runs on a single thread and blocks on each call:
# every task in the program is lined up one after another on that thread.

# Example of an I/O-intensive job
# Goal of the program: take some country names as input and download their flags.

import os
import threading
import time
from concurrent import futures

import requests

BASE_URL = "https://cdn.countryflags.com/download"


def get_flag(cc):
    url = f"{BASE_URL}/{cc}/flag-jpg-small.jpg"
    resp = requests.get(url)
    # 1. raise an HTTPError if the server returned a 4xx/5xx status
    resp.raise_for_status()
    # 2. log the process ID and thread in which the function is being executed
    print(f"downloaded {cc} flag in process:{os.getpid()}, thread:{threading.get_ident()}")
    return resp.content


def sequential_io_function(country_codes):
    s_time = time.perf_counter()
    # 3. download flags one after another in a for loop
    for cc in country_codes:
        get_flag(cc)
    print(f"sequential io took: {time.perf_counter() - s_time} seconds")


def threadpool_function(country_codes, workers=None):
    s_time = time.perf_counter()
    # 4. create a thread pool to delegate download tasks to
    with futures.ThreadPoolExecutor(workers) as executor:
        # 5. submit one download task per country to the pool
        to_do = [executor.submit(get_flag, code) for code in country_codes]
        # 6. handle each result as soon as its task finishes (completion order)
        for future in futures.as_completed(to_do):
            future.result()  # re-raises any exception from get_flag
    print(f"threadpool took: {time.perf_counter() - s_time} seconds")


def processpool_function(country_codes, workers=None):
    s_time = time.perf_counter()
    # 4. create a process pool to delegate download tasks to.
    #    From a notebook this works with the 'fork' start method (Linux default);
    #    with 'spawn', get_flag must live in an importable module.
    with futures.ProcessPoolExecutor(workers) as executor:
        # _max_workers is a private attribute; by default it equals os.cpu_count()
        print(executor._max_workers)
        to_do = [executor.submit(get_flag, code) for code in country_codes]
        for future in futures.as_completed(to_do):
            future.result()
    print(f"processpool took: {time.perf_counter() - s_time} seconds")


country_codes = "Yemen Zambia Vietnam Venezuela Ukraine".lower().split()
sequential_io_function(country_codes)  # takes around 17.6 seconds
# threadpool_function(country_codes)  # takes around 4.00 seconds
# processpool_function(country_codes)  # takes around 4.0 seconds
```


```
downloaded yemen flag in process:26510, thread:139876206409536
downloaded zambia flag in process:26510, thread:139876206409536
downloaded vietnam flag in process:26510, thread:139876206409536
downloaded venezuela flag in process:26510, thread:139876206409536
downloaded ukraine flag in process:26510, thread:139876206409536
sequential io took: 16.294264714000747 seconds
```


```python
# Example of a CPU-intensive task
# Goal of the program: compute SHA-256 hashes of large blocks of random bytes

import hashlib
import os
import threading
import time
from concurrent import futures
from random import randrange

JOBS = 20
SIZE = 2**20  # 1 MiB of random data per job
STATUS = '{} workers, elapsed time: {:.2f}s'


def sha(size):
    # Most of the time goes into building the data byte by byte in pure Python,
    # which holds the GIL. (hashlib itself releases the GIL for large inputs.)
    data = bytearray(randrange(256) for i in range(size))
    algo = hashlib.new('sha256')
    algo.update(data)
    print(f"inside process: {os.getpid()}. inside thread:{threading.get_ident()}")
    return algo.hexdigest()


def sequential_function():
    s_time = time.perf_counter()
    for i in range(JOBS):
        sha(SIZE)
    print(f"sequential operation took:{time.perf_counter() - s_time} seconds")


def threadpool_function(workers=None):
    t0 = time.perf_counter()
    print(f"main thread:{threading.get_ident()}")
    with futures.ThreadPoolExecutor(workers) as executor:
        actual_workers = executor._max_workers  # private attribute, used only for reporting
        print(actual_workers)
        to_do = [executor.submit(sha, SIZE) for i in range(JOBS)]
        for future in futures.as_completed(to_do):
            future.result()
    print(STATUS.format(actual_workers, time.perf_counter() - t0))


def processpool_function(workers=None):
    t0 = time.perf_counter()
    print(f"main thread:{threading.get_ident()}")
    with futures.ProcessPoolExecutor(workers) as executor:
        actual_workers = executor._max_workers
        to_do = [executor.submit(sha, SIZE) for i in range(JOBS)]
        for future in futures.as_completed(to_do):
            future.result()
    print(STATUS.format(actual_workers, time.perf_counter() - t0))


if __name__ == '__main__':
    # sequential_function()  # takes near 12.96 seconds
    # processpool_function()  # takes near 4.21 seconds
    threadpool_function()  # takes near 12.76 seconds
```


```
main thread:139876206409536
40
inside process: 26510. inside thread:139874753550080
inside process: 26510. inside thread:139874233464576
inside process: 26510. inside thread:139874216679168
inside process: 26510. inside thread:139873696593664
inside process: 26510. inside thread:139874267035392
inside process: 26510. inside thread:139873730164480
inside process: 26510. inside thread:139874778728192
inside process: 26510. inside thread:139873679808256inside process: 26510. inside thread:139873704986368

inside process: 26510. inside thread:139874225071872inside process: 26510. inside thread:139874770335488

inside process: 26510. inside thread:139874250249984
inside process: 26510. inside thread:139874241857280
inside process: 26510. inside thread:139873721771776
inside process: 26510. inside thread:139874787120896
inside process: 26510. inside thread:139874761942784
inside process: 26510. inside thread:139873713379072
inside process: 26510. inside thread:139874258642688
inside process:
```

## Key findings:


### For I/O intensive jobs:
Process pool ≈ thread pool, both much faster than sequential execution.

### For CPU intensive jobs:
1. Process pool is much faster; a thread pool is no faster than sequential execution.
2. The speedup from a process pool is bounded by the number of CPU cores in the machine.



### Race conditions in Tatkal 
![title](img/concurrency/race_condition_tatkal_booking.jpg)
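The Tatkal picture shows a classic race condition: two bookings both read "seats left > 0" before either one writes back the decremented count. Below is a minimal sketch, assuming a hypothetical `TrainQuota` class whose booking methods check and then decrement a shared counter; the `time.sleep(0.001)` only widens the window in which another thread can interleave. Wrapping the check-then-update step in a `threading.Lock` makes it atomic, so seats are never oversold.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

class TrainQuota:
    """Hypothetical Tatkal quota with a fixed number of seats."""

    def __init__(self, seats):
        self.seats_left = seats
        self.bookings = 0
        self._lock = threading.Lock()

    def book_seat_unsafe(self):
        if self.seats_left > 0:   # 1. check
            time.sleep(0.001)     # slow step: other threads may pass the same check now
            self.seats_left -= 1  # 2. update, possibly based on a stale check
            self.bookings += 1

    def book_seat_safe(self):
        with self._lock:          # only one thread at a time may check and update
            if self.seats_left > 0:
                time.sleep(0.001)
                self.seats_left -= 1
                self.bookings += 1

def run(method_name, seats=10, passengers=50):
    quota = TrainQuota(seats)
    with ThreadPoolExecutor(max_workers=20) as pool:
        for _ in range(passengers):
            pool.submit(getattr(quota, method_name))
    return quota.bookings  # the pool has finished all tasks when the with-block exits

print("without lock:", run("book_seat_unsafe"), "bookings for 10 seats")
print("with lock:   ", run("book_seat_safe"), "bookings for 10 seats")
```

The unlocked run will usually report more than 10 bookings, but the exact number varies between runs; the locked run always books exactly 10.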



# Global Interpreter Lock
The GIL (in CPython) is a single lock on the interpreter itself, with the rule that executing any Python bytecode requires acquiring it. Because there is only one lock, it cannot cause deadlocks, and it adds little performance overhead. But it means only one thread can execute Python bytecode at a time, which effectively makes any CPU-bound multithreaded Python program "single-threaded".


### Note: 
Every blocking I/O function in the Python standard library releases the GIL, allowing other threads to run. The time.sleep() function also releases the GIL. Therefore, Python threads are perfectly usable in I/O-bound applications, despite the GIL.
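A small sketch of this note, using `time.sleep()` as the blocking call: five threads that each sleep for 0.5 s finish in roughly 0.5 s in total, because each thread releases the GIL while it waits. A CPU-bound loop in the same threads would not overlap like this on a standard (GIL-enabled) CPython build. The `blocking_wait` helper is hypothetical.

```python
import threading
import time

def blocking_wait():
    # time.sleep() releases the GIL, so other threads can run while this one waits.
    time.sleep(0.5)

start = time.perf_counter()
threads = [threading.Thread(target=blocking_wait) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start

# The five waits overlap, so this is close to 0.5 s rather than 2.5 s.
print(f"5 threads x 0.5 s sleep took {elapsed:.2f} s")
```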

# Concurrency with asyncio

## Async behaviour in McDonalds

![title](img/concurrency/async_mcdonalds.jpeg)

### Blocking and synchronous 
If you stand at the front of the queue while your order is being prepared in the kitchen, you block the next person in line and waste the cashier's time.
### Non-blocking and asynchronous 
If you place your order and step aside, more people can "make progress" and the cashier makes "efficient" use of their time.

## Timing comparisons of various operations

![title](img/concurrency/io_comparison.png)


# Key ingredients
## 1. Event Loop
You can think of an event loop as something like a `while True` loop that monitors coroutines, keeping track of which ones are idle and looking for things that can be executed in the meantime. It wakes up an idle coroutine when whatever that coroutine is waiting on becomes available.
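To make the `while True` picture concrete, here is a toy round-robin scheduler built on plain generators. It is only an illustration under simplifying assumptions (no I/O, no timers, no waking of idle tasks); asyncio's real event loop is far more involved. Each `yield` marks a point where a task voluntarily hands control back to the loop. The names `task` and `toy_event_loop` are hypothetical.

```python
from collections import deque

def task(name, steps, log):
    # A generator-based "coroutine": each yield hands control back to the loop.
    for i in range(steps):
        log.append(f"{name} step {i}")
        yield

def toy_event_loop(tasks):
    ready = deque(tasks)       # tasks that can make progress right now
    while ready:               # the "while True" loop, stopping when no work is left
        current = ready.popleft()
        try:
            next(current)      # resume the task until its next yield
        except StopIteration:
            continue           # task finished; drop it
        ready.append(current)  # not finished: queue it again

log = []
toy_event_loop([task("A", 2, log), task("B", 3, log)])
print(log)
# ['A step 0', 'B step 0', 'A step 1', 'B step 1', 'B step 2']
```

The two tasks interleave on a single thread, which is the same idea asyncio uses to keep many coroutines making progress.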

## 2. Functions vs Coroutines
Functions are entered at one point and exited at another point.

Coroutines can be entered, exited, and resumed at many different points. This allows them to give way to other coroutines while they are waiting on an external dependency.

They can be implemented with:
1. the `async def` statement (Python 3.5+);
2. the `@asyncio.coroutine` decorator on a generator (deprecated since Python 3.8 and removed in Python 3.11).

```python
import asyncio
import time

# Syntax of a function
def simple_function():
    print('hello')
    # time.sleep() blocks the entire thread until it returns.
    time.sleep(1)
    print('world')


# Syntax of a coroutine

# 1. Using async def (the preferred way)
async def async_coroutine():
    print('hello')
    # `await` suspends this coroutine until asyncio.sleep(1) completes;
    # meanwhile the event loop is free to run other tasks.
    await asyncio.sleep(1)
    print('world')

# 2. Using the asyncio.coroutine decorator on a generator.
#    Legacy syntax: deprecated in Python 3.8 and removed in Python 3.11,
#    so this part only runs on Python 3.10 or older.
@asyncio.coroutine
def yield_coroutine():
    print('hello')
    # `yield from` does two things: it suspends this coroutine until
    # asyncio.sleep(1) completes, and it evaluates to the sleep's result (None).
    x = yield from asyncio.sleep(1)
    print('world')
    return

# In Jupyter this returns the kernel's already-running event loop.
loop = asyncio.get_event_loop()
simple_function()

# Calling a coroutine function directly does not execute it: it only creates a
# coroutine object, and Python warns "coroutine ... was never awaited".
async_coroutine()

# asyncio.ensure_future() wraps each coroutine in a Task and schedules it on the
# event loop; Jupyter's running loop then executes both tasks.
asyncio.ensure_future(async_coroutine(), loop=loop)
asyncio.ensure_future(yield_coroutine(), loop=loop)
```

```
hello
world

<Task pending coro=<yield_coroutine() running at <ipython-input-18-87acbb59dce3>:23>>

hello
hello
world
world
```
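The cell above relies on Jupyter's already-running event loop. In a plain Python script there is no running loop, so the usual entry point is `asyncio.run()` (Python 3.7+); note that `asyncio.run()` raises an error inside a Jupyter cell for exactly that reason. In this minimal sketch, `asyncio.gather()` runs two copies of a coroutine concurrently, so their waits overlap; the `name` and `delay` parameters are added for illustration.

```python
import asyncio
import time

async def async_coroutine(name, delay):
    print(f"hello from {name}")
    await asyncio.sleep(delay)  # suspends this coroutine; the loop runs the other one
    print(f"world from {name}")
    return name

async def main():
    # Schedule both coroutines on the event loop and wait for both results.
    return await asyncio.gather(async_coroutine("first", 0.5), async_coroutine("second", 0.5))

start = time.perf_counter()
results = asyncio.run(main())  # creates an event loop, runs main(), then closes the loop
elapsed = time.perf_counter() - start
print(results, f"{elapsed:.2f}s")  # the two 0.5 s waits overlap, so roughly 0.5 s in total
```

Both "hello" lines are printed before either "world", which is the interleaving the Jupyter output above shows.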


```python
# I/O-intensive job using asyncio
# Goal: download 5 flags concurrently on a single thread
import asyncio
import threading
import time

from aiohttp import ClientSession

BASE_URL = "https://cdn.countryflags.com/download"


async def get_flag(session, cc):
    print(f"get_flag running on thread#{threading.get_ident()}")
    url = f"{BASE_URL}/{cc}/flag-jpg-small.jpg"
    # While waiting for the response, control returns to the event loop
    # so the other downloads can make progress.
    async with session.get(url) as resp:
        print(resp.status)
        resp.raise_for_status()
        return await resp.read()  # read the image bytes without blocking the loop


async def download_many(cc_list):
    # One shared session reuses connections across all downloads.
    async with ClientSession() as session:
        todo = [get_flag(session, cc) for cc in sorted(cc_list)]
        # gather() runs all coroutines concurrently and returns results in order.
        return await asyncio.gather(*todo)


if __name__ == "__main__":
    country_codes = "Yemen Zambia Vietnam Venezuela Ukraine".lower().split()
    t0 = time.time()
    # Top-level `await` works in Jupyter/IPython; in a plain .py script use
    # asyncio.run(download_many(country_codes)) instead.
    await download_many(country_codes)
    print(f"async flag download took {time.time() - t0} seconds")
```

```
get_flag running on thread#139876206409536
get_flag running on thread#139876206409536
get_flag running on thread#139876206409536
get_flag running on thread#139876206409536
get_flag running on thread#139876206409536
200
200
200
200
200
async flag download took 3.295886516571045 seconds
```


# Concurrency and parallelism
Concurrency is about dealing with lots of things at once.

Parallelism is about doing lots of things at once. Not the same, but related.

One is about structure, one is about execution.

Concurrency provides a way to structure a solution to solve a problem that may (but not necessarily) be parallelizable.

*Rob Pike, co-inventor of the Go language*


### A simple heuristic:
If your solution makes sense on a single-core computer, it is concurrency; parallelism needs more than one core.

### References:
1. https://stackoverflow.com/questions/868568/what-do-the-terms-cpu-bound-and-i-o-bound-mean
2. https://en.wikipedia.org/wiki/Process_(computing)
3. https://en.wikipedia.org/wiki/Thread_(computing)
4. https://docs.python.org/3.6/library/threading.html
5. https://docs.python.org/3/library/concurrent.futures.html
6. https://realpython.com/python-gil/
7. https://hackernoon.com/a-simple-introduction-to-pythons-asyncio-595d9c9ecf8c
8. Luciano Ramalho, *Fluent Python*
9. https://docs.python.org/3/glossary.html
10. https://www.python.org/dev/peps/pep-0492/