# Multiprocessing with Python

Concurrency? Parallelism? Threading? These are the main concepts we run into every time we want to make our code run faster. In this notebook, we will discuss some of the concepts behind parallelism in Python, and how to tailor parallel code to our specific problem.

Let's start with some definitions first: 
 - Concurrency: describes the situation where several tasks make progress during overlapping periods of time; they do not necessarily execute at the exact same instant. At a high level, a `thread`, a `task` and a `process` are equivalent in Python: each is a unit of work that can run concurrently with others.
 - Parallelism: processes can run on a single core, but ideally we would like to run several of them at the same time, using all the available cores. Parallelism describes this situation: tasks literally execute simultaneously on different cores.
 - **[Advanced!]** Asynchronous: this is also known as cooperative multitasking. A single thread runs an event loop that switches between several tasks; each task explicitly hands control back to the loop (for example, while it waits for I/O) so that another task can run. Because tasks must give up control voluntarily, the scheduling is called cooperative.
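
A minimal sketch of cooperative multitasking with the standard-library `asyncio` module (not part of the original notebook). The coroutine `fake_io` and its delays are hypothetical stand-ins for real I/O: each `await asyncio.sleep(...)` hands control back to the event loop, so the three waits overlap and the total time should be close to the longest delay rather than the sum of all three.

```python
import asyncio
import time


async def fake_io(name: str, delay: float) -> str:
    # Hypothetical I/O stand-in: `await` yields control to the event loop
    # while this task "waits", letting the other tasks run meanwhile.
    await asyncio.sleep(delay)
    return f"{name} done after {delay}s"


async def main() -> list[str]:
    # All three coroutines run on the same event loop, in a single thread.
    # gather() returns their results in the order they were passed in.
    return await asyncio.gather(
        fake_io("a", 0.3), fake_io("b", 0.2), fake_io("c", 0.1)
    )


start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(results)
print(f"elapsed: {elapsed:.2f} s")  # roughly 0.3 s, not 0.6 s
```

Inside Jupyter an event loop is already running, so `asyncio.run()` raises a `RuntimeError`; in a notebook cell you would write `results = await main()` instead.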
 
Not all processes are made equal: different kinds of tasks have different needs. For instance, I can make several requests to an API, where each request involves waiting until the server replies before continuing to the next one. Processes that spend most of their time waiting like this are known as *I/O-bound processes* (see image).

<img src="../imgs/io_bound.png" width="800" height="400">
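
One way to see what "I/O-bound" means in practice (a sketch, not part of the original notebook): compare the wall-clock time from `time.perf_counter()` with the CPU time the process actually used, from `time.process_time()`. Here `fake_request` is a hypothetical stand-in that sleeps instead of calling a real server. The wall time grows with every call while the CPU time stays near zero, which is the signature of an I/O-bound workload.

```python
import time


def fake_request(delay: float = 0.1) -> None:
    # Hypothetical stand-in for a network call: the thread just waits
    # without using the CPU, as it would while waiting for a server reply.
    time.sleep(delay)


wall_start = time.perf_counter()   # wall-clock time
cpu_start = time.process_time()    # CPU time used by this process only
for _ in range(5):
    fake_request()                 # each call waits ~0.1 s, one after another
wall = time.perf_counter() - wall_start
cpu = time.process_time() - cpu_start
print(f"wall: {wall:.2f} s, cpu: {cpu:.2f} s")
```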

Another common type of process is one where the computation takes up most of the available CPU, and each task has to wait until the previous one finishes before it can continue. This is a *CPU-bound process*.

<img src="../imgs/cpu_bound.png" width="800" height="400">
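
The same measurement on a CPU-bound task should give the opposite picture. In this sketch, `busy_sum` is a hypothetical pure-Python loop with no waiting at all, so the CPU time is expected to be close to the wall time. Work like this is sped up by spreading it over several cores (parallelism), not by overlapping waits.

```python
import time


def busy_sum(n: int) -> int:
    # Pure-Python arithmetic: the CPU stays busy for the whole call.
    total = 0
    for i in range(n):
        total += i * i
    return total


n = 2_000_000
wall_start = time.perf_counter()
cpu_start = time.process_time()
result = busy_sum(n)
wall = time.perf_counter() - wall_start
cpu = time.process_time() - cpu_start
print(f"result: {result}, wall: {wall:.2f} s, cpu: {cpu:.2f} s")
```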

Here we will focus on an _I/O-bound process_: retrieving data from an API or a URL.


In [6]:
import requests
import time
import multiprocessing

### I/O-Bound: Non-concurrent

In [3]:
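def download_site(url, session):
    with session.get(url) as response:  # blocks until the server has replied
        print(f"Read {len(response.content)} from {url}")


def download_all_sites(sites):
    with requests.Session() as session:  # one Session reuses connections across requests
        for url in sites:  # strictly one request after another
            download_site(url, session)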

sites = ["https://www.jython.org",
         "http://olympus.realpython.org/dice"] * 80
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(f"Downloaded {len(sites)} in {duration} seconds")

Read 19210 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 19210 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 19210 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 19210 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 19210 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 19210 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 19210 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 19210 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 19210 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 19210 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 19210 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 19210 from https://www.jyth
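
The cells in this notebook time themselves with `time.time()`. For measuring durations, the Python documentation points to `time.perf_counter()`, a monotonic clock with the highest available resolution that is not affected by system clock adjustments. A small context manager keeps the timing code out of the way; `timed` is a hypothetical helper, and the `time.sleep(0.2)` line stands in for a call such as `download_all_sites(sites)`.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label: str, record: dict):
    # Hypothetical helper: measure the wrapped block with a monotonic clock
    start = time.perf_counter()
    try:
        yield
    finally:
        # Store and report the elapsed time even if the block raised
        record[label] = time.perf_counter() - start
        print(f"{label}: {record[label]:.2f} seconds")


timings = {}
with timed("sequential", timings):
    time.sleep(0.2)  # stand-in for download_all_sites(sites)
```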

### I/O-Bound: Threading

In [4]:
import concurrent.futures
import requests
import threading
import time


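thread_local = threading.local()  # attributes set on this object are private to each thread


def get_session():
    # Create one Session per thread on first use; a single Session is not
    # guaranteed to be safe to share between threads.
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session


def download_site(url):
    session = get_session()
    with session.get(url) as response:
        print(f"Read {len(response.content)} from {url}")


def download_all_sites(sites):
    # The with-block waits for every submitted download before returning
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_site, sites)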

sites = [
    "https://www.jython.org",
    "http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(f"Downloaded {len(sites)} in {duration} seconds")

Read 275 from http://olympus.realpython.org/dice
Read 275 from http://olympus.realpython.org/dice
Read 275 from http://olympus.realpython.org/dice
Read 275 from http://olympus.realpython.org/dice
Read 19210 from https://www.jython.org
Read 19210 from https://www.jython.org
Read 19210 from https://www.jython.org
Read 19210 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 275 from http://olympus.realpython.org/dice
Read 19210 from https://www.jython.org
Read 19210 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 19210 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 19210 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 19210 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 19210 from https://www.jython.org
Read 275 from http://olympus.realpython.org/dice
Read 275 from http://olympus.realpython.org/diceRead 19210 from https:/
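
The interleaved output above (for example `...diceRead 19210...`) comes from several threads printing at the same time. A variant, sketched here with a hypothetical `fake_download` that sleeps instead of calling `session.get`, returns values from the workers and prints them from the main thread only. `executor.map` yields results in the same order as its inputs, and iterating over them (here with `list`) also re-raises any exception a worker hit; the cell above discards that iterator, so a failed download would go unnoticed.

```python
import concurrent.futures
import time


def fake_download(url: str) -> tuple[str, int]:
    # Hypothetical stand-in for session.get(url): wait, then return a fake size
    time.sleep(0.1)
    return url, len(url)


urls = ["https://www.jython.org", "http://olympus.realpython.org/dice"] * 5

start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # list() collects results in input order and re-raises worker exceptions
    results = list(executor.map(fake_download, urls))
elapsed = time.perf_counter() - start

# Only the main thread prints, so lines cannot interleave
for url, size in results:
    print(f"Read {size} from {url}")
print(f"{len(results)} downloads in {elapsed:.2f} seconds")
```

With 10 simulated downloads of 0.1 s each and 5 threads, the run should take roughly 0.2 s instead of the 1 s a sequential loop would need.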

### I/O-Bound: Multiprocessing

In [8]:
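session = None  # each worker process gets its own copy of this global


def set_global_session():
    # Called once per worker process (the Pool initializer) to create its Session
    global session
    if not session:
        session = requests.Session()


def download_site(url):
    with session.get(url) as response:
        name = multiprocessing.current_process().name  # e.g. ForkPoolWorker-5
        print(f"{name}:Read {len(response.content)} from {url}")


def download_all_sites(sites):
    # 8 worker processes; pool.map splits the URL list among them
    with multiprocessing.Pool(8, initializer=set_global_session) as pool:
        pool.map(download_site, sites)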


sites = [
    "https://www.jython.org",
    "http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(f"Downloaded {len(sites)} in {duration} seconds")

ForkPoolWorker-12:Read 275 from http://olympus.realpython.org/dice
ForkPoolWorker-8:Read 275 from http://olympus.realpython.org/dice
ForkPoolWorker-6:Read 275 from http://olympus.realpython.org/dice
ForkPoolWorker-10:Read 275 from http://olympus.realpython.org/dice
ForkPoolWorker-5:Read 19210 from https://www.jython.org
ForkPoolWorker-6:Read 19210 from https://www.jython.org
ForkPoolWorker-9:Read 19210 from https://www.jython.org
ForkPoolWorker-7:Read 19210 from https://www.jython.org
ForkPoolWorker-11:Read 19210 from https://www.jython.org
ForkPoolWorker-8:Read 19210 from https://www.jython.org
ForkPoolWorker-12:Read 19210 from https://www.jython.org
ForkPoolWorker-5:Read 275 from http://olympus.realpython.org/dice
ForkPoolWorker-6:Read 275 from http://olympus.realpython.org/dice
ForkPoolWorker-10:Read 19210 from https://www.jython.org
ForkPoolWorker-8:Read 275 from http://olympus.realpython.org/dice
ForkPoolWorker-12:Read 275 from http://olympus.realpython.org/dice
ForkPoolWorker-7:R

ForkPoolWorker-5:Read 19210 from https://www.jython.org
ForkPoolWorker-12:Read 275 from http://olympus.realpython.org/dice
ForkPoolWorker-11:Read 275 from http://olympus.realpython.org/dice
ForkPoolWorker-7:Read 19210 from https://www.jython.org
ForkPoolWorker-10:Read 19210 from https://www.jython.org
ForkPoolWorker-8:Read 19210 from https://www.jython.org
ForkPoolWorker-5:Read 275 from http://olympus.realpython.org/dice
ForkPoolWorker-9:Read 19210 from https://www.jython.org
ForkPoolWorker-6:Read 19210 from https://www.jython.org
ForkPoolWorker-7:Read 275 from http://olympus.realpython.org/dice
ForkPoolWorker-10:Read 275 from http://olympus.realpython.org/dice
ForkPoolWorker-9:Read 275 from http://olympus.realpython.org/dice
ForkPoolWorker-8:Read 275 from http://olympus.realpython.org/dice
ForkPoolWorker-6:Read 275 from http://olympus.realpython.org/dice
ForkPoolWorker-12:Read 19210 from https://www.jython.org
ForkPoolWorker-11:Read 19210 from https://www.jython.org
ForkPoolWorker-5:R
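
The cell above relies on the `fork` start method (note the `ForkPoolWorker` names), the traditional default on Linux. On Windows, and on macOS since Python 3.8, the default is `spawn`: each worker starts a fresh interpreter and re-imports the main module, so functions defined interactively in a notebook typically cannot be found by the workers, and in a script the pool must be created under an `if __name__ == "__main__":` guard. A sketch of the script form follows; `init_worker`, `fake_download` and `worker_name` are hypothetical stand-ins for `set_global_session`, `download_site` and the per-process `session`.

```python
import multiprocessing
import time

worker_name = None  # per-process global, filled in by the initializer


def init_worker():
    # Runs once in each worker process, like set_global_session()
    global worker_name
    worker_name = multiprocessing.current_process().name


def fake_download(url):
    # Hypothetical stand-in for session.get(url)
    time.sleep(0.05)
    return worker_name, url


def download_all(urls, processes=4):
    with multiprocessing.Pool(processes, initializer=init_worker) as pool:
        # map() blocks until every URL is processed; results keep input order
        return pool.map(fake_download, urls)


if __name__ == "__main__":
    # Only the parent runs this block; spawned workers skip it when re-importing
    urls = ["https://www.jython.org", "http://olympus.realpython.org/dice"] * 4
    results = download_all(urls)
    for name, url in results:
        print(f"{name}: {url}")
```

Returning values to the parent, rather than printing inside the workers, also avoids the truncated and interleaved lines seen in the output above.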