# Concepts for parallel programming

* Some of the main concepts for writing code that should not just run serially
* With a special focus on *asynchronous* programming.
* When are those concepts most appropriate?

A real-life example to better grasp those concepts:

## Cooking Mapo-Tofu:
<center><img src="assets/MapoTofu.jpg" width="450px"></center>

 A popular Sichuan dish that consists of tofu and minced meat set in an oily, spicy sauce based on fermented beans and chili paste.

### Why cooking?

Because it demonstrates that the real world is *asynchronous*

### Why Mapo-Tofu?
Because it's my favourite dish.

## Let's get started - Ingredients:

* Silken (soft) tofu
* Minced meat
* Chili oil
* Douchi (fermented black beans)
* Spices

## Let's get started - Steps:

* Wash and boil rice
* Blend spices
* Fry spices in chili oil
* Fry minced meat
* Add tofu to spice oil
* Add meat to spice oil
* Stir and serve

## Let's code - the classic approach
Let's try to simulate the cooking process. For this we are going to define simple functions
that can be seen as the steps for cooking Mapo-Tofu.

In [1]:
import logging
import time
from typing import List, Tuple

In [2]:
def prepare_rice() -> Tuple[float, float]:
    """Prepare the rice: wash and boil it."""
    
    print("Washing the rice ... ", end="")
    time.sleep(2)
    print("done")
    print("Boiling the rice ... ", end="")
    time.sleep(5)
    print("done")
    return time.time(), 80.

    

In [3]:
def prepare_spices() -> Tuple[float, float]:
    """Blend all spices together and fry them."""
        
    print("Blending spices ... ", end="")
    time.sleep(2)
    print("done")
    print("Frying spices ...", end="")
    time.sleep(5)
    print("done")
    return time.time(), 120.

In [4]:
def prepare_meat_and_tofu() -> Tuple[float, float]:
    """Fry the meat and add the tofu."""
    
    print("Frying meat ... ", end="")
    time.sleep(5)
    print("done")
    print("Mixing tofu...", end="")
    time.sleep(2)
    print("done")
    return time.time(), 90.

In [5]:
def cook() -> None:
    """Cook the meal."""
    start = time.time()
    rice = prepare_rice()
    oil = prepare_spices()
    mapo = prepare_meat_and_tofu()
    print("Mixing oil and meat", end="")
    time.sleep(2)
    print("done")
    serve_dish(start, rice)

In [6]:
def serve_dish(
    start: float,
    rice_prop: Tuple[float, float],
) -> None:
    """Serve the dish and check the temperature of the rice."""
    now = time.time()
    rice_temp = rice_prop[-1] - (2 * (now - rice_prop[0]))
    print(f"Rice temperature: {rice_temp:02.1f}")
    print(f"Preperation time: {(now-start):2.3f}")

In [7]:
cook()

Washing the rice ... done
Boiling the rice ... done
Blending spices ... done
Frying spices ...done
Frying meat ... done
Mixing tofu...done
Mixing oil and meatdone
Rice temperature: 48.0
Preperation time: 23.021
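The rice temperature printed above follows from the simple cooling model in `serve_dish`: the rice leaves the pot at 80 degrees and loses 2 degrees for every second it waits. A minimal sketch of that arithmetic, where `rice_temperature` and its parameter names are hypothetical helpers and the timings are the rounded `time.sleep` values from the cells above:

```python
def rice_temperature(
    temp_when_done: float,
    finished_at: float,
    served_at: float,
    cooling_rate: float = 2.0,
) -> float:
    """Temperature at serving time, mirroring the formula in serve_dish."""
    return temp_when_done - cooling_rate * (served_at - finished_at)


# Serial run: the rice is done after 2 + 5 = 7 s, the dish is served after
# 7 (rice) + 7 (spices) + 7 (meat and tofu) + 2 (mixing) = 23 s.
print(rice_temperature(80.0, finished_at=7.0, served_at=23.0))  # 48.0
```

The rice waits 16 seconds for the other steps to finish, which is why it has cooled down so much by the time the dish is served.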


## Let's try a multi-threaded version of this:
* The OS can execute a program on multiple *cores* of a CPU in parallel
* The threads running in parallel share resources, such as the memory of the process
* In Python *real* multi-threading doesn't work because of the Global Interpreter Lock (GIL)
    * The Python interpreter executes Python bytecode in only one thread at a time
    * **But**: C-level code in *CPython* and in libraries such as numpy, or calls such as `time.sleep`, can release the GIL
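A minimal sketch of the last point, assuming hypothetical helpers `nap` and `timed_naps`: because `time.sleep` releases the GIL while it waits, three threads that each sleep for 0.3 seconds should finish in roughly 0.3 seconds rather than 0.9 seconds.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def nap(seconds: float) -> float:
    """Sleep for a while; time.sleep releases the GIL while waiting."""
    time.sleep(seconds)
    return seconds


def timed_naps(n_threads: int = 3, seconds: float = 0.3) -> float:
    """Run n_threads naps in a thread pool and return the wall-clock time."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        list(pool.map(nap, [seconds] * n_threads))
    return time.perf_counter() - start


elapsed = timed_naps()
print(f"3 naps of 0.3 s took {elapsed:.2f} s")
```

The exact number depends on the machine, but it should stay well below the 0.9 seconds a serial run would need.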

In [8]:
from concurrent.futures import ThreadPoolExecutor, as_completed

In [9]:
def threadded_cook() -> None:
    """A multi-threaded version of cook."""
    with ThreadPoolExecutor() as pool:
        start = time.time()
        rice_thread = pool.submit(prepare_rice)
        spice_thread = pool.submit(prepare_spices)
        mapo_thread = pool.submit(prepare_meat_and_tofu)
        # Wait until all three preparation steps have finished.
        for future in as_completed([rice_thread, spice_thread, mapo_thread]):
            future.result()
        print("Mixing oil and map", end="")
        time.sleep(2)
        print("done")
        # Use the rice future explicitly: the first *completed* future
        # is not necessarily the rice.
        serve_dish(start, rice_thread.result())

In [10]:
threadded_cook()

Washing the rice ... Blending spices ... Frying meat ... done
Boiling the rice ... done
Frying spices ...done
Mixing tofu...done
done
done
Mixing oil and mapdone
Rice temperature: 76.0
Preperation time: 9.008


## There is a catch:
* Threads that share resources can interfere with each other:


In [31]:
def cut_meat(cupboard: List[str]) -> None:
    cupboard.pop(cupboard.index("knive"))
    print("Cutting the meat")
    time.sleep(1)
    cupboard.append("knive")
def cut_spices(cupboard: List[str]) -> None:   
    cupboard.pop(cupboard.index("knive"))
    print("Cutting the spices")
    time.sleep(1)
    cupboard.append("knive")
    

Now let's run these functions in a multi-threaded context:

In [34]:
with ThreadPoolExecutor() as pool:
    cupboard = ["knive", "spoon", "fork"]
    cut_1 = pool.submit(cut_meat, cupboard)
    cut_2 = pool.submit(cut_spices, cupboard)
    try:
        cut_1.result(), cut_2.result()
    except Exception as error:
        logging.exception(error)
    

Cutting the meat


ERROR:root:'knive' is not in list
Traceback (most recent call last):
  File "/tmp/ipykernel_544052/2699050649.py", line 6, in <cell line: 1>
    cut_1.result(), cut_2.result()
  File "/home/wilfred/.anaconda3/envs/plotting/lib/python3.10/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/home/wilfred/.anaconda3/envs/plotting/lib/python3.10/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/home/wilfred/.anaconda3/envs/plotting/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/tmp/ipykernel_544052/4159176023.py", line 7, in cut_spices
    cupboard.pop(cupboard.index("knive"))
ValueError: 'knive' is not in list


### What's the catch?

* The two threads are trying to *modify* the same resource at the same time -> **race condition**

* We could use thread locks, but in more complicated examples this could lead to threads
blocking each other -> **deadlock**

* Errors in multi-threaded code can be hard to debug
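The thread lock mentioned above could look roughly like the following sketch. The names `knife_lock`, `cut_safely` and `cut_everything` are hypothetical and not part of the cells above; the `"knive"` spelling is kept so the cupboard matches the earlier example.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple

# Hypothetical lock guarding the one knife in the cupboard.
knife_lock = threading.Lock()


def cut_safely(cupboard: List[str], what: str) -> str:
    """Take the knife, cut, and put it back; only one thread at a time."""
    with knife_lock:
        cupboard.pop(cupboard.index("knive"))
        time.sleep(0.1)  # cutting takes a moment
        cupboard.append("knive")
    return f"cut the {what}"


def cut_everything() -> Tuple[List[str], List[str]]:
    """Run both cutting jobs in threads and return the results and the cupboard."""
    cupboard = ["knive", "spoon", "fork"]
    with ThreadPoolExecutor() as pool:
        jobs = [pool.submit(cut_safely, cupboard, what) for what in ("meat", "spices")]
        done = [job.result() for job in jobs]
    return done, cupboard


print(cut_everything())
```

The `ValueError` is gone, but the price is that the two cutting jobs now run one after the other: the second thread simply waits until the knife is back.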

## A possible solution: multi-*processing*

In [13]:
from concurrent.futures import ProcessPoolExecutor

In [14]:
with ProcessPoolExecutor() as pool:
    cupboard = ["knive", "spoon", "fork"]
    cut_1 = pool.submit(cut_meat, cupboard)
    cut_2 = pool.submit(cut_spices, cupboard)
    cut_1.result(), cut_2.result()

Cutting the meat
Cutting the spices


### What's the catch?

* All objects have to be pickled (Python) -> possible overhead


* By default there is no communication between processes
    * information exchange has to be implemented via the multiprocessing interface/library
    * information exchange can cause significant overhead

* Eventually your OS is orchestrating threads and processes:
<center><img src="assets/Shiva.png" width="450px"></center>
* Too much load will strangle your system.
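The pickling point also explains why the knife example above no longer fails: each worker process receives its own *copy* of the cupboard. A minimal sketch of that effect without starting any processes, where `send_to_worker` is a hypothetical stand-in for the pickle round trip an argument makes on its way to a worker:

```python
import pickle
from typing import List


def send_to_worker(obj: List[str]) -> List[str]:
    """Hypothetical stand-in: serialise and deserialise, as done for process arguments."""
    return pickle.loads(pickle.dumps(obj))


cupboard = ["knive", "spoon", "fork"]
worker_cupboard = send_to_worker(cupboard)
worker_cupboard.remove("knive")  # the worker takes "its" knife

print(cupboard)         # ['knive', 'spoon', 'fork']
print(worker_cupboard)  # ['spoon', 'fork']
```

Changes made in the worker never reach the original list, so there is no race condition, but also no shared state unless it is exchanged explicitly.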

## How would we cook in the real world?
We wouldn't:
* do one step at a time,
* or invite `N` friends to help cook a dish with `N` preparation steps!

We would:
* boil the rice; while the rice is boiling, heat the oil; while the oil is heating up, cut the spices ...

### How can this be done in a programming context?

# Asynchronous programming

Python has the GIL, which can be *acquired* and *released*, but only by low-level code inside *CPython* and its C extensions.

In reality *we* want to tell the program when to *wait* for tasks and when
things can be done in the "background". To do so we need a whole new framework with its own syntax (`async` and `await`):

In [15]:
import asyncio

In [16]:
async def prepare_rice_async() -> Tuple[float, float]:
    """Prepare the rice: wash and boil it."""
    
    print("Washing the rice ... ", end="")
    await asyncio.sleep(2)
    print("done")
    print("Boiling the rice ... ", end="")
    await asyncio.sleep(5)
    print("done")
    return time.time(), 80.

    

In [17]:
async def prepare_spices_async() -> Tuple[float, float]:
    """Blend all spices together and fry them."""
        
    print("Blending spices ... ", end="")
    await asyncio.sleep(2)
    print("done")
    print("Frying spices ...", end="")
    await asyncio.sleep(5)
    print("done")
    return time.time(), 120.

In [18]:
async def prepare_meat_and_tofu_async() -> Tuple[float, float]:
    """Fry the meat and add the tofu."""
    
    print("Frying meat ... ", end="")
    await asyncio.sleep(5)
    print("done")
    print("Mixing tofu...", end="")
    await asyncio.sleep(2)
    print("done")
    return time.time(), 90.

In [19]:
async def cook_async() -> None:
    """Cook the meal."""
    start = time.time()
    rice_task = asyncio.create_task(prepare_rice_async())
    oil_task = asyncio.create_task(prepare_spices_async())
    mapo_task = asyncio.create_task(prepare_meat_and_tofu_async())
    await asyncio.gather(rice_task, oil_task, mapo_task)
    print("Mixing oil and meat", end="")
    await asyncio.sleep(2)
    print("done")
    await serve_dish_async(start, rice_task.result())

In [20]:
async def serve_dish_async(
    start: float,
    rice_prop: Tuple[float, float],
) -> None:
    """Serve the dish and check the temperature of the rice."""
    now = time.time()
    rice_temp = rice_prop[-1] - (2 * (now - rice_prop[0]))
    print(f"Rice temperature: {rice_temp:02.1f}")
    print(f"Preperation time: {(now-start):2.3f}")

In [28]:
loop = asyncio.get_event_loop()
_ = loop.create_task(cook_async())
# Jupyter already runs an event loop, so we schedule the coroutine on it.
# Outside of Jupyter we could simply call `asyncio.run(cook_async())` instead.


Washing the rice ... Blending spices ... Frying meat ... done
Boiling the rice ... done
Frying spices ...done
Mixing tofu...done
done
done
Mixing oil and meatdone
Rice temperature: 76.0
Preperation time: 9.005


Our cooking was executed in one single thread, simply by being smart about telling Python when it can execute what.

The problems that we have with multi-threading / multi-processing don't really occur!
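The single-thread claim can be checked with a small sketch. The coroutines `step` and `kitchen` are hypothetical and only record the identifier of the thread they run in; `asyncio.run` is used here, so this version is meant for a plain Python script rather than a notebook.

```python
import asyncio
import threading
from typing import List, Tuple


async def step(name: str, seconds: float) -> Tuple[str, int]:
    """Wait a little, then report which thread executed this coroutine."""
    await asyncio.sleep(seconds)
    return name, threading.get_ident()


async def kitchen() -> List[Tuple[str, int]]:
    """Run three steps concurrently on the same event loop."""
    return await asyncio.gather(
        step("rice", 0.02), step("spices", 0.01), step("meat", 0.03)
    )


results = asyncio.run(kitchen())
thread_ids = {ident for _, ident in results}
print(len(thread_ids))  # 1
```

All three steps overlap in time, yet they report the same thread identifier, which is the thread that started the event loop.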

### What's the catch?
* Everything has to be streamlined. If some code blocks the execution, the benefits are gone:

```python
lock = True
async def consumer() -> None:
    while lock is True:
        print("Waiting for lock", end="\r")
        time.sleep(1)  # blocking call: the event loop can't run anything else
    print("Waiting done, exiting")

async def producer() -> None:
    global lock
    task = asyncio.create_task(consumer())
    await asyncio.sleep(.5)
    lock = False  # never reached: consumer() never hands control back
    await task
```
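One possible way around a blocking call that cannot be rewritten with `await` is to hand it to a worker thread with `asyncio.to_thread` (available from Python 3.9). The following is a sketch under that assumption; `blocking_step`, `heartbeat` and `main` are hypothetical names, and `asyncio.run` again assumes a plain script rather than a notebook.

```python
import asyncio
import time
from typing import List


def blocking_step(log: List[str]) -> None:
    """A blocking call we cannot rewrite with await (hypothetical stand-in)."""
    time.sleep(0.3)
    log.append("blocking done")


async def heartbeat(log: List[str]) -> None:
    """Something the event loop should keep doing in the meantime."""
    for _ in range(3):
        log.append("tick")
        await asyncio.sleep(0.05)


async def main() -> List[str]:
    log: List[str] = []
    # to_thread runs blocking_step in a worker thread, so the loop stays free.
    await asyncio.gather(heartbeat(log), asyncio.to_thread(blocking_step, log))
    return log


log = asyncio.run(main())
print(log)
```

With these timings the three ticks should be logged before `"blocking done"`, showing that the event loop kept running while the blocking call was waiting. Calling `blocking_step(log)` directly inside a coroutine would instead stall the heartbeat for the full 0.3 seconds.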

There are a lot of aspects we can't cover here, like `coroutines`, `tasks` and `event loops`, but there is great reading material out there:

* https://realpython.com/async-io-python
* https://realpython.com/python-concurrency

## When to use which concept?
Rule of thumb:

Async / Multi-Threading:

* When your program needs to wait a lot on external things (like responses from web servers)
* When it needs to handle many requests at the same time (like a web server)

-> I/O bound

Multi-Processing:

* When your program doesn't spend time waiting but calculates a lot

-> CPU bound

### Let's do a CPU bound example:

In [22]:
def cpu_bound(number: int) -> int:
    return sum(i * i for i in range(number))

def find_sums() -> None:
    numbers = (5_000_000 + x for x in range(20))
    start_time = time.time()
    for number in numbers:
        cpu_bound(number)
    duration = time.time() - start_time
    print(f"Duration {duration:1.2f} seconds")

In [23]:
find_sums()

Duration 6.03 seconds


In [24]:
def find_sums_threaded() -> None:
    numbers = (5_000_000 + x for x in range(20))
    start_time = time.time()
    with ThreadPoolExecutor() as pool:
        pool.map(cpu_bound, numbers)
    duration = time.time() - start_time
    print(f"Duration {duration:1.2f} seconds")

In [25]:
find_sums_threaded()

Duration 6.16 seconds


In [29]:
def find_sums_process() -> None:
    numbers = (5_000_000 + x for x in range(20))
    start_time = time.time()
    with ProcessPoolExecutor() as pool:
        pool.map(cpu_bound, numbers)
    duration = time.time() - start_time
    print(f"Duration {duration:1.2f} seconds")

In [30]:
find_sums_process()

Duration 2.17 seconds
