# Parallel Programming in Python

## Introduction

Modern CPUs have multiple physical cores on a single chip. Four, six, and eight physical cores are now very common on consumer CPUs, and at the time of writing the Intel Xeon CPUs found in enterprise-level hardware could have up to 28 physical cores per CPU.

Additionally, if the CPU supports hyperthreading, each physical core can run two hardware threads simultaneously. A 6-core CPU with hyperthreading therefore appears to the operating system as 12 logical (virtual) cores, each of which can be scheduled independently.
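To check how many logical cores Python can see on your own machine, here is a minimal sketch using the standard library's `os.cpu_count()` and, where it is available, `os.sched_getaffinity()`:

```python
import os

# os.cpu_count() reports *logical* CPUs. With hyperthreading enabled this is
# typically twice the number of physical cores (e.g. 12 on a 6-core CPU).
# It can return None if the count cannot be determined.
logical_cores = os.cpu_count()
print("Logical CPUs visible to Python:", logical_cores)

# On Linux, the set of CPUs this process may actually run on can be smaller
# (e.g. inside a container or when restricted with taskset).
if hasattr(os, "sched_getaffinity"):
    usable = len(os.sched_getaffinity(0))
    print("CPUs usable by this process:", usable)
```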

<img src="https://5.imimg.com/data5/KE/ME/MY-12042782/intel-core-processor-500x500.jpg" width=250>
<center>Intel Core i7-8700K CPU with 6 physical cores</center>

By default, Python only ever executes your code on a single CPU core at a time. The instructions in your code run one at a time, in serial, on one core. All the other cores sit idle. Well, not quite idle: they handle other work on your system. But they are not running your code, and that is what we care about here.

<img src="https://images.anandtech.com/doci/13446/cpu0-12.png" width=300></img>
<center style="font-style: italic">Human demonstration of running standard python code on a multi-CPU/core system</center><br>

The illustration below shows a simple example of generating 4 random strings. In serial (right), the `rand_string()` function is called 4 times in a row to generate 4 random strings. The same work could be done in parallel (left) by having 4 CPU cores each call `rand_string()` once, all at the same time. In theory, this would speed up the computation by 4X.

<sub>Note: in this simple example, the computation would not be sped up at all. In Python, the parallel version would most likely take longer to run, because launching parallel tasks has overhead (starting processes and passing data between them). However, if your computational load is large enough, the speed-up would approach 4X.</sub>
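Below is a sketch of the serial and parallel versions from the illustration, using `multiprocessing.Pool`. The `rand_string()` body here is a hypothetical implementation, since the illustration does not define one. For a task this small, expect the parallel version to be slower because of the start-up overhead described in the note. Run it as a script. The `if __name__ == "__main__":` guard is required on platforms that start child processes with `spawn`.

```python
import random
import string
from multiprocessing import Pool


def rand_string(length: int) -> str:
    """Hypothetical helper: return a random string of ASCII letters and digits."""
    chars = string.ascii_letters + string.digits
    return "".join(random.choice(chars) for _ in range(length))


def serial(n: int, length: int) -> list:
    # Serial: call rand_string() n times in a row on a single core
    return [rand_string(length) for _ in range(n)]


def parallel(n: int, length: int) -> list:
    # Parallel: a pool of n worker processes share the n calls to rand_string()
    with Pool(processes=n) as pool:
        return pool.map(rand_string, [length] * n)


if __name__ == "__main__":
    print(serial(4, 8))
    print(parallel(4, 8))
```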

<img src="https://sebastianraschka.com/images/blog/2014/multiprocessing_intro/multiprocessing_scheme.png" width=600>

Python's standard library contains several modules that let you perform tasks in parallel. However, it is not as simple as importing a library and saying go. You have to think about how your code operates a little differently. And, of course, not all tasks can be parallelized.

At a very high level, Python allows you to handle parallelism in two different ways:

1. Threads - Multithreading
2. Processes - Multiprocessing

Each has its own pros, cons, and appropriate use cases.


## Python Threads vs Processes

Python provides both a [threading](https://docs.python.org/3/library/threading.html) and a [multiprocessing](https://docs.python.org/3/library/multiprocessing.html?highlight=multiprocessing#module-multiprocessing) module for running tasks concurrently.

At a high level, these modules look almost identical and appear to provide the same functionality. If you used them side by side, they would seem to behave exactly the same from the outside. However, it is extremely important to understand how they differ under the hood, so that your code runs as efficiently as possible.

This section covers the main differences between the two.

### Differences in CPU usage

This comes first because it is by far the most important difference. **Multithreading** in Python effectively uses only **1 CPU core** at a time for Python code. You can create and run as many threads as you want within your program, but only one of them can execute Python code at any given moment.

Why is this? Python multithreading does not actually execute Python code in parallel. It only looks and acts like it. In reality, the interpreter rapidly switches between threads and lets only one run at a time. This happens because of Python's infamous **Global Interpreter Lock (GIL)**. You can read all about it [here](https://wiki.python.org/moin/GlobalInterpreterLock) and [here](https://docs.python.org/3/c-api/init.html#thread-state-and-the-global-interpreter-lock). In short, the GIL allows only one thread to execute **Python code** at a time. However, the GIL is released during blocking input/output (I/O) operations, which will matter later. Packages that execute C code directly (e.g. NumPy) can also release it.
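Here is a minimal sketch of that behaviour on the standard CPython build (with the GIL). It uses `time.sleep()` as a stand-in for blocking I/O, because sleeping releases the GIL in the same way, and a pure-Python loop as CPU-bound work. Exact timings depend on your machine. Only the relative difference matters.

```python
import threading
import time


def fake_io(seconds: float) -> None:
    # time.sleep() stands in for blocking I/O; like real I/O it releases the GIL
    time.sleep(seconds)


def cpu_work(n: int) -> int:
    # Pure-Python arithmetic holds the GIL while it runs
    total = 0
    for i in range(n):
        total += i * i
    return total


def timed_threads(target, arg, count: int = 4) -> float:
    """Run `count` threads calling target(arg) and return the wall-clock time."""
    threads = [threading.Thread(target=target, args=(arg,)) for _ in range(count)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


# Four 0.5 s "I/O waits" overlap, so this takes roughly 0.5 s rather than 2 s
io_elapsed = timed_threads(fake_io, 0.5)
print(f"4 threads of sleep(0.5): {io_elapsed:.2f}s")

# Four CPU-bound calls take about as long as running them one after another,
# because only one thread can execute Python bytecode at a time
cpu_elapsed = timed_threads(cpu_work, 1_000_000)
print(f"4 threads of CPU work:   {cpu_elapsed:.2f}s")
```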

<img src="https://hackernoon.com/photos/ZbqyG0GzLmVkwsYNyBRB9kTk5DR2-83212cm" width=250>
<center style="font-style: italic">Python concurrency using threads</center><br>
<img src="https://hackernoon.com/photos/ZbqyG0GzLmVkwsYNyBRB9kTk5DR2-gwnd122h" width=250>
<center style="font-style: italic">Python concurrency using processes</center>

The standard implementation of Python (CPython) is written in C. It was first released in 1991, before the era of modern multi-core CPUs. CPython's memory management (in particular, reference counting) is not thread-safe, and neither are many of the C extension modules written for it. The GIL was created to make the interpreter as a whole thread-safe. At the time, it was the most performant solution for single-threaded programs, and arguably it still is. Single-threaded use was the standard use of Python then and remains so now.

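But why is the GIL still around 30 years later? Python contributors have made many proposals to remove it. For decades, none managed to do so without slowing down single-threaded code or breaking existing C extensions. That is starting to change. PEP 703 was accepted, and Python 3.13 offers an optional, experimental free-threaded build without the GIL. The default build, which is what almost everyone runs, still has it.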

The solution for true parallel processing in Python is the **multiprocessing** library. It gets around the GIL by starting entirely independent system processes, each with its own Python interpreter and its own GIL. This comes with overhead. Starting child processes takes time, and each child has its own memory space, so data must be **copied into each child process**.

This means multiprocessing is limited not only by the number of CPU cores you have but also by memory. You need enough memory to hold a copy of the data for every process.

### Differences in memory management

This leads to the second important difference between Python threads and processes: how they use memory.

Threads all share memory with their parent process. This is good in two ways. Objects do not need to be copied for each thread, and threads can easily communicate through shared data. **But caution:** it is very easy to create race conditions. A race condition occurs when two threads access the same object at the same time and at least one of them modifies it.
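Here is a minimal sketch of that pitfall and the usual fix, `threading.Lock`. Four threads update one shared counter, and with the lock every update is kept. Without the lock, whether updates are actually lost on a given run depends on the Python version and on timing. The comment describes the risk, not a guaranteed result. The names `counter` and `increment` are illustrative.

```python
import threading

counter = 0                      # shared by every thread in the process
counter_lock = threading.Lock()


def increment(times: int) -> None:
    global counter
    for _ in range(times):
        # `counter += 1` is a read-modify-write sequence, not one atomic step.
        # Without the lock, two threads can read the same old value and one
        # of the two updates is lost.
        with counter_lock:
            counter += 1


threads = [threading.Thread(target=increment, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 400000, because every increment happened under the lock
```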

Each process gets its own copy of the parent's memory space. This means there is no risk of one process modifying a piece of memory while another is reading it. However, it also makes sharing data between processes more difficult.

<img src="https://uploads.toptal.io/blog/image/126087/toptal-blog-image-1526311066247-4ce28d0e2a6878d80c5374d2c53e8aff.png" width=350>
<center style="font-style: italic">Python threads use a shared memory space</center><br>
<img src="https://uploads.toptal.io/blog/image/126088/toptal-blog-image-1526311071925-a4dc10cda4cf6b88a7f11fd5d4c76f9a.png" width=350>
<center style="font-style: italic">Python processes get independent copies of the parent memory space</center>
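Here is a minimal sketch of that isolation. A child process appends to a module-level list, and the parent's list is unchanged afterwards. The list name `data` and the helper functions are illustrative. Run it as a script, because the `if __name__ == "__main__":` guard is needed on platforms that start children with `spawn`.

```python
from multiprocessing import Process

data = [1, 2, 3]  # module-level object; each child process works on its own copy


def modify_in_child() -> None:
    # This appends to the child's copy of `data`, not the parent's
    data.append(99)
    print("child sees: ", data)


def run_child() -> None:
    p = Process(target=modify_in_child)
    p.start()
    p.join()


if __name__ == "__main__":
    run_child()
    print("parent sees:", data)  # still [1, 2, 3]
```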

However, Python provides some modules out of the box with thread-safe mechanisms for sharing data between threads and processes. For example, the [Queue](https://docs.python.org/3/library/queue.html) class from the `queue` module works between threads. `multiprocessing` provides its own `Queue` for processes, which is demonstrated in the examples.
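Here is a minimal producer/consumer sketch with `queue.Queue` and three threads. Signalling shutdown with a `None` sentinel is a common convention, not something the `queue` module requires. The worker function and values are illustrative.

```python
import queue
import threading

tasks = queue.Queue()     # thread-safe FIFO from the standard library
results = queue.Queue()


def worker() -> None:
    while True:
        item = tasks.get()       # blocks until an item is available
        if item is None:         # sentinel value: no more work
            tasks.task_done()
            break
        results.put(item * item)
        tasks.task_done()


threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()

for n in range(10):
    tasks.put(n)
for _ in threads:
    tasks.put(None)              # one sentinel per worker, after all real items

tasks.join()                     # wait until every item has been processed
for t in threads:
    t.join()

squares = sorted(results.get() for _ in range(results.qsize()))
print(squares)
```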

## Should I use a Thread or a Process?

The easiest way to decide which method to use is to look at your program and ask: is my problem CPU-bound or I/O-bound? In other words, does my program spend most of its time doing computations, or does it spend most of its time waiting for I/O (reading from or writing to disk, communicating over the network, etc.)?
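One rough way to answer that question is to compare the CPU time your process uses with the wall-clock time that passes. This is a heuristic, not a rule. A ratio near 1 suggests CPU-bound work, and a ratio near 0 suggests the program is mostly waiting. The sketch below uses `time.process_time()` and `time.perf_counter()`, with hypothetical `busy()` and `waiting()` functions as examples of each kind of workload.

```python
import time


def cpu_to_wall_ratio(func, *args) -> float:
    """Rough heuristic: fraction of wall-clock time this process spent on the CPU."""
    wall_start = time.perf_counter()
    cpu_start = time.process_time()   # CPU time used by the current process
    func(*args)
    cpu = time.process_time() - cpu_start
    wall = time.perf_counter() - wall_start
    return cpu / wall if wall > 0 else 0.0


def busy(n: int) -> int:
    # CPU-bound: pure computation
    return sum(i * i for i in range(n))


def waiting(seconds: float) -> None:
    # I/O-bound stand-in: the process mostly sits idle
    time.sleep(seconds)


cpu_ratio = cpu_to_wall_ratio(busy, 3_000_000)
io_ratio = cpu_to_wall_ratio(waiting, 0.5)
print(f"busy():    {cpu_ratio:.2f}")  # expected to be close to 1.0 (CPU-bound)
print(f"waiting(): {io_ratio:.2f}")   # expected to be close to 0.0 (I/O-bound)
```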

If your program is I/O-bound, using threads can speed it up. The GIL is released while a thread waits on I/O, so other threads can run in the meantime.

<img src="https://files.realpython.com/media/IOBound.4810a888b457.png" width=450>
<center style="font-style: italic">An I/O bound program</center><br>

<img src="https://files.realpython.com/media/Threading.3eef48da829e.png" width=450>
<center style="font-style: italic">Speeding up an I/O-bound problem using threads</center><br>

If your program is CPU-bound, threads will not make it any faster. Remember, the interpreter is just switching back and forth between threads. Using processes, however, you can speed up CPU-bound programs dramatically.

<img src="https://files.realpython.com/media/CPUBound.d2d32cb2626c.png" width=450>
<center style="font-style: italic">A CPU-bound program</center><br>

<img src="https://files.realpython.com/media/CPUMP.69c1a7fad9c4.png" width=450>
<center style="font-style: italic">Speeding up a CPU-bound problem using processes</center><br>
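Here is a minimal sketch of the CPU-bound case using `concurrent.futures.ProcessPoolExecutor`, a higher-level alternative to creating `Process` objects by hand. `count_primes()` is an illustrative CPU-bound task. How much faster the process version runs depends on how many cores you have. Run it as a script rather than in a notebook.

```python
import time
from concurrent.futures import ProcessPoolExecutor


def count_primes(limit: int) -> int:
    """CPU-bound task: count the primes below `limit` by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def run_serial(limits):
    return [count_primes(x) for x in limits]


def run_processes(limits):
    # Each call runs in a worker process with its own interpreter and its own GIL
    with ProcessPoolExecutor() as pool:
        return list(pool.map(count_primes, limits))


if __name__ == "__main__":
    limits = [50_000] * 4
    for runner in (run_serial, run_processes):
        start = time.perf_counter()
        results = runner(limits)
        print(f"{runner.__name__}: {results} in {time.perf_counter() - start:.2f}s")
```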

# Programming Examples

Below are some simple examples of how to use the modules discussed above. They are for demonstration and reference purposes only and are by no means an exhaustive tutorial on any of these modules.

<sub>Note: Python offers higher-level tools that abstract away some of what we do in the examples below, such as [Pool](https://docs.python.org/3/library/multiprocessing.html?highlight=pool#multiprocessing.pool.Pool). However, these can be buggy when run in Jupyter notebooks, and they hide what is actually happening.</sub>

```python
from multiprocessing import Process, Queue, Event
from queue import Empty
from threading import Thread
import time
import pandas as pd
import random
import tempfile
```

## I/O speed-up using Threads

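```python
# Generate a CSV with 10M rows (note: no header row)
temp = tempfile.NamedTemporaryFile("wt")
for i in range(10000000):
    temp.write("line,data,{}\n".format(i))
# Flush the write buffer so pandas sees every row when it reopens the file by name.
# (Reopening a NamedTemporaryFile by name while it is still open does not work on Windows.)
temp.flush()
```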

```python
pd.read_csv(temp.name).head()
```

|   | line | data | 0 |
|---|------|------|---|
| 0 | line | data | 1 |
| 1 | line | data | 2 |
| 2 | line | data | 3 |
| 3 | line | data | 4 |
| 4 | line | data | 5 |


```python
%%timeit -r3
# Simulate reading in 3 large dataframes in a row
for i in range(3):
    pd.read_csv(temp.name)
```

```
7.24 s ± 17.9 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
```


```python
# Create a function to read in a single dataframe
def read_df(filepath):
    return pd.read_csv(filepath)
```

```python
%%timeit -r3
# Use threads to read in the 3 dataframes simultaneously
threads = []
for i in range(3):
    t = Thread(target=read_df, args=(temp.name,))
    t.start()
    threads.append(t)

for t in threads:
    t.join()
```

```
5.18 s ± 51.2 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
```


```python
# Close and remove the temp file
temp.close()
```

#### Your numbers will vary. In the run above, reading the files one after another took about 7.2 seconds, versus about 5.2 seconds with threads: a noticeable improvement.
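The same start-then-join thread pattern can also be written with `concurrent.futures.ThreadPoolExecutor`. To keep the sketch self-contained, it swaps pandas for the standard `csv` module and uses three small temporary files. `count_rows()` is a hypothetical stand-in for `read_df()`.

```python
import csv
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor


def count_rows(path: str) -> int:
    # Hypothetical stand-in for read_df(): read a CSV with the stdlib csv module
    with open(path, newline="") as f:
        return sum(1 for _ in csv.reader(f))


# Create three small throwaway CSV files so the sketch is self-contained
paths = []
for i in range(3):
    fd, path = tempfile.mkstemp(suffix=".csv")
    with os.fdopen(fd, "w", newline="") as f:
        writer = csv.writer(f)
        for row in range(1000):
            writer.writerow(["line", "data", row])
    paths.append(path)

# The executor starts the threads, hands out the work, and joins the threads on exit
with ThreadPoolExecutor(max_workers=3) as executor:
    row_counts = list(executor.map(count_rows, paths))

print(row_counts)  # [1000, 1000, 1000]

for path in paths:
    os.remove(path)
```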

## Speed up long running jobs using Process

```python
fake_runtimes = [5, 2, 4, 1, 5]
```

```python
# Simulate a long-running process with a variable runtime
def long_running_process(process_id: int, task_time: int):
    time.sleep(task_time)
    print("Process id {} - Complete - Task time: {}s".format(process_id, task_time))
```

```python
%%timeit -r1 -n1
# Run the job 5 times in serial
for i, task_time in enumerate(fake_runtimes):
    long_running_process(i, task_time)
```

```
Process id 0 - Complete - Task time: 5s
Process id 1 - Complete - Task time: 2s
Process id 2 - Complete - Task time: 4s
Process id 3 - Complete - Task time: 1s
Process id 4 - Complete - Task time: 5s
17 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
```


#### The tasks ran one at a time, taking about 17 seconds in total (5 + 2 + 4 + 1 + 5), and they completed in the order in which they were started.

```python
%%timeit -r1 -n1
# Use multiple processes to run all 5 jobs concurrently
processes = []
for i, task_time in enumerate(fake_runtimes):
    p = Process(target=long_running_process, args=(i, task_time))
    p.start()
    processes.append(p)

for p in processes:
    p.join()
```

```
Process id 3 - Complete - Task time: 1s
Process id 1 - Complete - Task time: 2s
Process id 2 - Complete - Task time: 4s
Process id 0 - Complete - Task time: 5s
Process id 4 - Complete - Task time: 5s
5.04 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
```


#### All of the tasks ran simultaneously, taking about 5 seconds in total (the length of the longest task). They completed in a different order from the one in which they were started: the shortest task finished first and the longest ones last.
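`time.sleep()` releases the GIL, so these simulated jobs are waiting rather than computing. Threads would therefore give the same speed-up here. Processes make the difference only when the work is CPU-bound. Here is a sketch with `concurrent.futures.ThreadPoolExecutor`. The sleeps are scaled down by a hypothetical `SCALE` factor to keep it quick.

```python
import time
from concurrent.futures import ThreadPoolExecutor

fake_runtimes = [5, 2, 4, 1, 5]
SCALE = 0.1  # assumption: shrink the sleeps so the sketch runs in about 0.5 s


def long_running_task(task_id: int, task_time: int) -> int:
    time.sleep(task_time * SCALE)   # sleeping releases the GIL, like blocking I/O
    return task_id


start = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(fake_runtimes)) as executor:
    # map() yields results in submission order, even though tasks finish out of order
    finished = list(executor.map(long_running_task, range(len(fake_runtimes)), fake_runtimes))
elapsed = time.perf_counter() - start
print(finished, f"{elapsed:.2f}s")  # roughly the longest task (0.5 s), not the sum (1.7 s)
```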

## Using a messaging queue with multiple workers

```python
# Set up a message queue for all workers to listen to
task_queue = Queue()
```

```python
# Event which can be used to control all workers, even if you don't maintain a reference to them
event = Event()
```

```python
# Define a worker that processes jobs from the queue until the event is cleared
def worker_func(queue: Queue, worker_id: int, event):
    print("Working {} starting".format(worker_id))
    while event.is_set():
        try:
            # Wait at most 2 s for a task so the event is re-checked regularly
            task_id, task_time = queue.get(timeout=2)
        except Empty:
            continue
        time.sleep(task_time)
        result = "worker id: {} task id: {} task time: {}s".format(worker_id, task_id, task_time)
        print(result)
    print("Worker {} Shutting down...".format(worker_id))
```

```python
# Start 3 workers, passing the event explicitly instead of relying on a global
event.set()
workers = []
for i in range(3):
    w = Process(target=worker_func, args=(task_queue, i, event), daemon=True)
    w.start()
    workers.append(w)
```

```
Working 0 starting
Working 1 starting
Working 2 starting
worker id: 0 task id: 0 task time: 1s
worker id: 2 task id: 2 task time: 1s
worker id: 0 task id: 3 task time: 2s
worker id: 1 task id: 1 task time: 4s
worker id: 1 task id: 6 task time: 1s
worker id: 2 task id: 4 task time: 4s
worker id: 2 task id: 8 task time: 2s
worker id: 0 task id: 5 task time: 5s
worker id: 0 task id: 10 task time: 1s
worker id: 1 task id: 7 task time: 4s
worker id: 2 task id: 9 task time: 3s
worker id: 0 task id: 11 task time: 2s
worker id: 2 task id: 13 task time: 1s
worker id: 1 task id: 12 task time: 5s
worker id: 0 task id: 14 task time: 4s
worker id: 2 task id: 100 task time: 3s
worker id: 1 task id: 102 task time: 4s
worker id: 0 task id: 101 task time: 6s
worker id: 2 task id: 103 task time: 6s
worker id: 1 task id: 104 task time: 6s
worker id: 0 task id: 105 task time: 5s
worker id: 1 task id: 107 task time: 2s
worker id: 0 task id: 108 task time: 3s
worker id: 2 task id: 106 task time: 6s
...
```

```python
# Now we have workers listening for jobs in the background. Let's give them something to do.
for i in range(15):
    task = (i, random.randint(1, 5))
    task_queue.put(task)
```

```python
for i in range(100, 111):
    task = (i, random.randint(2, 6))
    task_queue.put(task)
```

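```python
# Clear the event: each worker finishes its current task, then leaves its loop
event.clear()

# Wait for all workers to shut down
for w in workers:
    w.join()
```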
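The `Event` plus `timeout=2` polling works. A common alternative, and a different technique from the one used above, is to send each worker a sentinel value such as `None` once all tasks are queued, and to send results back on a second queue. Here is a minimal sketch with tasks that square a number instead of sleeping. The function names are illustrative. Run it as a script.

```python
from multiprocessing import Process, Queue


def worker(task_queue, result_queue, worker_id: int) -> None:
    # Block on get() with no timeout; a None sentinel tells the worker to exit
    for task_id, value in iter(task_queue.get, None):
        result_queue.put((worker_id, task_id, value * value))


def run(num_workers: int = 3, num_tasks: int = 10):
    task_queue, result_queue = Queue(), Queue()
    workers = [Process(target=worker, args=(task_queue, result_queue, i))
               for i in range(num_workers)]
    for w in workers:
        w.start()

    for task_id in range(num_tasks):
        task_queue.put((task_id, task_id))
    for _ in workers:
        task_queue.put(None)   # one sentinel per worker, queued after all real tasks

    # Drain the results before join(), so no worker is left waiting to flush its queue
    results = [result_queue.get() for _ in range(num_tasks)]
    for w in workers:
        w.join()
    return sorted(r[2] for r in results)


if __name__ == "__main__":
    print(run())
```

The sentinels are queued after all real tasks, so every task is taken before any worker sees `None`. The results are drained before `join()` because the `multiprocessing` documentation warns that a process which has put items on a queue may not terminate until those items are consumed.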