# Concurrency & multi-threading in Python

- In CPython, only one thread per process can execute Python bytecode at any given time.
- When other threads are waiting, the running thread is asked to release the GIL after a short switch interval (5 ms by default), so that the other threads, including those in a `ThreadPoolExecutor`, get a chance to run.
- Each process has its own interpreter and memory space, so this limitation applies only to threads belonging to the same process.
- Two or more processes can run on different CPU cores simultaneously, but they do not share memory by default, so they cannot access a shared resource the way threads can.
- When processes (or threads) do update a shared resource, we need a mutex lock to prevent race conditions and keep the data consistent.
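
As a minimal sketch of the mutex idea from the last point, the example below guards a shared counter with `threading.Lock`. It uses threads rather than processes to stay short; `counter`, `counter_lock` and `add_many` are hypothetical names chosen for illustration. `multiprocessing.Lock` plays the same role between processes.

```python
import threading

counter = 0                      # shared resource
counter_lock = threading.Lock()  # mutex guarding `counter`

def add_many(n: int) -> None:
    """Increment the shared counter n times, one locked update at a time."""
    global counter
    for _ in range(n):
        with counter_lock:       # only one thread may update at a time
            counter += 1

threads = [threading.Thread(target=add_many, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()                     # wait for every worker to finish

print(counter)  # 40000
```

Without the lock, `counter += 1` is a read-modify-write sequence that two threads can interleave, so some updates may be lost.
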
### What is GIL?
The Global Interpreter Lock (GIL) in Python, specifically in CPython (the standard implementation), is a mutex that allows only one thread to execute Python bytecode at a time within a single process. This is why multi-threading is recommended mainly for I/O-bound work: a thread releases the GIL while it waits on I/O, so other threads can run in the meantime.
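
To illustrate why threads help with waiting but not with computing, here is a rough sketch that uses `time.sleep` as a stand-in for an I/O call (an assumption made for the example; `fake_io` and `delays` are hypothetical names). The measured times vary from machine to machine.

```python
import time
from threading import Thread

def fake_io(delay: float) -> None:
    """Stand-in for an I/O call; time.sleep releases the GIL while waiting."""
    time.sleep(delay)

delays = [0.2, 0.2, 0.2, 0.2]

# Sequential: the waits add up.
start = time.perf_counter()
for d in delays:
    fake_io(d)
sequential = time.perf_counter() - start

# Threaded: the waits overlap because each sleeping thread gives up the GIL.
start = time.perf_counter()
threads = [Thread(target=fake_io, args=(d,)) for d in delays]
for t in threads:
    t.start()
for t in threads:
    t.join()
threaded = time.perf_counter() - start

print(f"sequential: {sequential:.2f} s, threaded: {threaded:.2f} s")
```

The sequential run should take roughly the sum of the delays, and the threaded run roughly the longest single delay.
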
## Using `Thread`

In [1]:
import time

def fn_takes_time(time_in_sec: int = 2):
    print("Before calling sleep function")
    time.sleep(time_in_sec)  # time.sleep() takes its argument in seconds
    print(f"After calling sleep for {time_in_sec * 1000} ms")

In [2]:
from threading import Thread

th = Thread(target = fn_takes_time, args = [1])
th.start()
print("Main thread executing")

Before calling sleep function
Main thread executing
After calling sleep for 1000 ms


In [3]:
th = Thread(target = fn_takes_time, args = [1])
th.start()
th.join()  # blocks the main thread until th has finished
print("Main thread executing")

Before calling sleep function
After calling sleep for 1000 ms
Main thread executing
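
The effect of `join()` can also be observed through `Thread.is_alive()`. The sketch below is only an illustration; `nap` is a hypothetical helper and the delays are arbitrary.

```python
import time
from threading import Thread

def nap(delay: float) -> None:
    time.sleep(delay)

th = Thread(target=nap, args=(0.3,))
th.start()

th.join(timeout=0.05)                  # wait at most 50 ms, then give up
running_after_timeout = th.is_alive()  # the thread should still be sleeping

th.join()                              # no timeout: wait until nap() returns
running_after_join = th.is_alive()     # the thread has finished

print(running_after_timeout, running_after_join)
```

A `join()` with a timeout returns once the timeout expires even if the thread is still running, so checking `is_alive()` afterwards is how you tell whether the thread actually finished.
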


In [4]:
from concurrent.futures import ThreadPoolExecutor as executor

with executor(max_workers = 1) as ex:
    future = ex.submit(fn_takes_time, 1)
    if not future.done():
        print("Function running in background...")
    else:
        print("Function completed execution")

    # future.result() blocks until the function has returned its result
    print(f"Function returned = {future.result()}")

Before calling sleep function
Function running in background...
After calling sleep for 1000 ms
Function returned = None
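
When several futures are submitted, `concurrent.futures.as_completed` yields them in the order in which they finish rather than the order of submission. A small sketch, with `slow_square` and the labels as hypothetical names:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def slow_square(x: int, delay: float) -> int:
    time.sleep(delay)
    return x * x

with ThreadPoolExecutor(max_workers=2) as pool:
    # Map each future to a label so we can tell them apart later.
    futures = {
        pool.submit(slow_square, 3, 0.4): "slow",
        pool.submit(slow_square, 4, 0.1): "fast",
    }
    completion_order = []
    for fut in as_completed(futures):
        completion_order.append((futures[fut], fut.result()))

print(completion_order)
```

With these delays the "fast" task is expected to appear first, even though it was submitted second.
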


In [5]:
some_shared_resource = 0
timings = [1, 2, 2, 1]

def increment(time_in_sec: int = 1) -> int:
    print("Before calling sleep function")
    global some_shared_resource
    some_shared_resource += time_in_sec
    time.sleep(time_in_sec)
    print(f"After calling sleep for {time_in_sec * 1000} ms")
    
    return some_shared_resource

## Using `ThreadPoolExecutor`
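- **CPU time:** the time the CPU spends executing the cell's code (`user` for Python and library code, `sys` for kernel calls made on its behalf); time spent sleeping or waiting is not counted.
- **Wall time:** the real time that elapses between the start and the end of the cell, including any time spent sleeping or waiting.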

`ThreadPoolExecutor.map` submits one task per argument; the tasks are run by a pool of at most `max_workers` threads, and the results are yielded in the order of the arguments.
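
A small sketch of how `map` behaves: the number of threads is bounded by `max_workers`, and results come back in input order even when later tasks finish earlier. The helper `tag` and the delays are hypothetical.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

def tag(delay: float) -> tuple:
    """Sleep, then report the delay and the name of the thread that ran it."""
    time.sleep(delay)
    return delay, threading.current_thread().name

delays = [0.3, 0.1, 0.2, 0.1]

with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(tag, delays))

returned_delays = [d for d, _ in results]
thread_names = {name for _, name in results}

print(returned_delays)    # same order as `delays`
print(len(thread_names))  # at most 2, the value of max_workers
```
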
## Using a single worker

In [6]:
%%time
some_shared_resource = 0

with executor(max_workers = 1) as ex:
    future = ex.map(increment, timings)

    for result in future:
        print(f"Function returned = {result}\n")

Before calling sleep function
After calling sleep for 1000 ms
Before calling sleep function
Function returned = 1

After calling sleep for 2000 ms
Before calling sleep function
Function returned = 3

After calling sleep for 2000 ms
Before calling sleep function
Function returned = 5

After calling sleep for 1000 ms
Function returned = 6

CPU times: user 12.9 ms, sys: 0 ns, total: 12.9 ms
Wall time: 6.01 s


## Using 2 workers

In [7]:
%%time
some_shared_resource = 0

with executor(max_workers = 2) as ex:
    future = ex.map(increment, timings)

    for result in future:
        print(f"Function returned = {result}\n")

Before calling sleep function
Before calling sleep function
After calling sleep for 1000 ms
Before calling sleep function
Function returned = 3

After calling sleep for 2000 ms
Before calling sleep function
Function returned = 5

After calling sleep for 2000 ms
Function returned = 6

After calling sleep for 1000 ms
Function returned = 6

CPU times: user 10.6 ms, sys: 3.62 ms, total: 14.2 ms
Wall time: 3.01 s


## Using 3 workers

In [8]:
%%time
some_shared_resource = 0

with executor(max_workers = 3) as ex:
    future = ex.map(increment, timings)

    for result in future:
        print(f"Function returned = {result}\n")

Before calling sleep function
Before calling sleep function
Before calling sleep function
After calling sleep for 1000 ms
Before calling sleep function
Function returned = 5

After calling sleep for 1000 ms
After calling sleep for 2000 ms
Function returned = 6

After calling sleep for 2000 ms
Function returned = 6

Function returned = 6

CPU times: user 6.63 ms, sys: 4.29 ms, total: 10.9 ms
Wall time: 2.01 s


## Using 4 workers

In [9]:
%%time
some_shared_resource = 0

with executor(max_workers = 4) as ex:
    future = ex.map(increment, timings)

    for result in future:
        print(f"Function returned = {result}\n")

Before calling sleep function
Before calling sleep function
Before calling sleep function
Before calling sleep function
After calling sleep for 1000 ms
Function returned = 6

After calling sleep for 1000 ms
After calling sleep for 2000 ms
Function returned = 6

After calling sleep for 2000 ms
Function returned = 6

Function returned = 6

CPU times: user 13.1 ms, sys: 482 µs, total: 13.5 ms
Wall time: 2.01 s
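
The wall times measured in the four runs above are consistent with a simple scheduling model: tasks are taken in order, and each one goes to whichever worker becomes free first. The sketch below simulates that model. `simulated_wall_time` is a hypothetical helper, and the model ignores thread start-up and switching overhead.

```python
import heapq
from typing import List

def simulated_wall_time(durations: List[int], workers: int) -> int:
    """Finish time when each task goes to whichever worker frees up first."""
    free_at = [0] * workers              # time at which each worker becomes idle
    heapq.heapify(free_at)
    finish = 0
    for d in durations:
        start = heapq.heappop(free_at)   # earliest available worker
        end = start + d
        finish = max(finish, end)
        heapq.heappush(free_at, end)
    return finish

timings = [1, 2, 2, 1]
predicted = {w: simulated_wall_time(timings, w) for w in range(1, 5)}
print(predicted)  # {1: 6, 2: 3, 3: 2, 4: 2}
```

The predicted values of 6, 3, 2 and 2 seconds match the measured wall times of 6.01 s, 3.01 s, 2.01 s and 2.01 s. A fourth worker brings no further gain because the longest single task already takes 2 s.
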


### How does it work?
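- Each task is handed to a worker thread, which runs while it holds the GIL. If other threads are waiting, the running thread is asked to release the GIL once the switch interval has elapsed (read with `sys.getswitchinterval()`, changed with `sys.setswitchinterval()`, 5 ms by default). Since Python 3.2 this interval is a duration, not a count of bytecode instructions. A thread also releases the GIL whenever it blocks, for example in `time.sleep()`, which is why the sleeps above overlap.
- A waiting thread then acquires the GIL and starts (or resumes) its execution.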
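
The switch interval mentioned above can be inspected and changed at runtime. A minimal sketch follows; the value `0.001` is an arbitrary choice for illustration, not a recommendation.

```python
import sys

original = sys.getswitchinterval()   # in seconds; CPython's default is 0.005
print(f"current switch interval: {original} s")

sys.setswitchinterval(0.001)         # ask for more frequent switch requests
shortened = sys.getswitchinterval()

sys.setswitchinterval(original)      # restore the previous setting
restored = sys.getswitchinterval()
```

A shorter interval lets waiting threads take over sooner, at the cost of more switching overhead.
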
***
## Using `ProcessPoolExecutor`

**Task:** Let's create a pool of several processes, each of which computes a part of the sum of $n$ numbers and adds it to a shared `integer` variable.

In [10]:
from multiprocessing import Value, Lock

"""
'i': Signed integer (equivalent to C’s int).
'I': Unsigned integer (equivalent to C’s unsigned int).
'h': Signed short integer.
'H': Unsigned short integer.
'l': Signed long integer.
'L': Unsigned long integer.
"""

result = Value("i", 0)  # Shared integer, initialized to 0
lock = Lock()  # Process-safe lock

In [11]:
def calculate(start: int, end: int, shared_integer, mutex_lock) -> int:
    if start > end:
        raise ValueError("Start cannot be greater than end")

    # assume each addition takes 1s of time
    n = end - start + 1
    time.sleep(n)

    with mutex_lock:
        # Protected by the lock so the read-modify-write is not interleaved
        shared_integer.value += sum(range(start, end + 1))
        # Read the running total while still holding the lock
        total = shared_integer.value

    return total

### Let's calculate the sum of the first 10 natural numbers using 3 processes.

In [12]:
%%time
from typing import Tuple
from concurrent.futures import ProcessPoolExecutor

partitions = [(1, 3), (4, 6), (7, 10)]

def calculate_sum(args: Tuple[int, int]) -> int:
    return calculate(args[0], args[1], result, lock)

# named `pool` so that it doesn't shadow the `executor` alias imported earlier
with ProcessPoolExecutor(max_workers = len(partitions)) as pool:
    results = pool.map(calculate_sum, partitions)
    results = list(results)
    print(results)

[6, 21, 55]
CPU times: user 16.4 ms, sys: 16.4 ms, total: 32.8 ms
Wall time: 4.03 s
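
The partitions above are written by hand. As a sketch, they could also be generated by a small helper; `make_partitions` is a hypothetical function, and giving the remainder to the last range is just one possible choice. The same snippet checks the arithmetic behind the printed results and the wall time.

```python
from typing import List, Tuple

def make_partitions(n: int, k: int) -> List[Tuple[int, int]]:
    """Split 1..n into k contiguous (start, end) ranges; the last takes the remainder."""
    if not 1 <= k <= n:
        raise ValueError("need 1 <= k <= n")
    base = n // k
    parts = []
    for i in range(k):
        start = i * base + 1
        end = n if i == k - 1 else (i + 1) * base
        parts.append((start, end))
    return parts

partitions = make_partitions(10, 3)
partial_sums = [sum(range(s, e + 1)) for s, e in partitions]
longest = max(e - s + 1 for s, e in partitions)  # seconds of simulated work

print(partitions)         # [(1, 3), (4, 6), (7, 10)]
print(partial_sums)       # [6, 15, 34]
print(sum(partial_sums))  # 55
print(longest)            # 4
```

The printed results `[6, 21, 55]` are running totals of the shared integer (6, then 6 + 15, then 21 + 34). The wall time of about 4 s corresponds to the longest partition, since `calculate` sleeps one second per number.
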


### Using 1 process

In [13]:
%%time
result = Value("i", 0)  # Shared integer, initialized to 0
lock = Lock()  # Process-safe lock

with ProcessPoolExecutor(max_workers = 1) as pool:
    future = pool.submit(calculate_sum, (1, 10))
    print(future.result())

55
CPU times: user 8.43 ms, sys: 8.79 ms, total: 17.2 ms
Wall time: 10 s


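Since each partial sum doesn't depend on the results of the other processes, the processes can run independently of one another; only the update of the shared total needs the lock. This is why three processes finish in about 4 s (the time of the longest partition) while a single process needs 10 s.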
***