<h1 style='font-size:60px'>
    Parallelisation in Python
</h1>

<font size='5'> Henry Wilde | 
<i class='fa fa-github' aria-hidden='false'></i>
<i class='fa fa-twitter' aria-hidden='false'></i> @daffidwilde </font>
<hr>

# Classically used for:

<br>
<img src="img/simple.pdf" width="700">
<br><br>

# Can be used for:

<br>
<img src="img/complex.pdf" width="800">
<br><br>

---

# Three approaches:

- First principles with `multiprocessing`
- The "standard" approach with `multiprocessing`
- A new approach with `dask` (documentation: [docs.dask.org](http://docs.dask.org/en/latest/))

In [None]:
import ciw


def run_simulation(num_servers, seed):
    """
    Simulate a simple M|M|c queue for 10,000 time units and find the mean waiting time.
    """

    ciw.seed(seed)

    N = ciw.create_network(
        Arrival_distributions=[["Exponential", 0.2]],
        Service_distributions=[["Exponential", 0.1]],
        Number_of_servers=[num_servers],
    )

    Q = ciw.Simulation(N)
    Q.simulate_until_max_time(10000)

    recs = Q.get_all_records()
    waits = [rec.waiting_time for rec in recs]

    return num_servers, sum(waits) / len(waits)


In [None]:
run_simulation(2, 0)


---

# Doing things sequentially

In [None]:
from collections import defaultdict
import itertools

numbers_of_servers = [1, 2, 3]
trials = range(100)


In [None]:
%%time
results = defaultdict(list)
for servers, trial in itertools.product(numbers_of_servers, trials):
    servers, time = run_simulation(servers, trial)
    results[servers].append(time)

In [None]:
results


---

# First principles (queues, queues, queues)

![In and out trays](img/punch-in-out.jpg)
Image via: Punch magazine

In [None]:
def put_down_tasks(work_queue, numbers_of_servers, trials):
    """ Put each of the task's arguments into the work queue. """

    for servers, trial in itertools.product(numbers_of_servers, trials):
        work_queue.put((servers, trial))

    print("Tasks sent out.")


In [None]:
def worker(work_queue, done_queue):
    """ Tell the worker to grab a job, execute it and then stop. """

    for servers, trial in iter(work_queue.get, "STOP"):
        result = run_simulation(servers, trial)
        done_queue.put(result)

    done_queue.put("STOP")


In [None]:
def start_workers(work_queue, done_queue, workers):
    """ Set the workers going as their own process. """

    for _ in range(workers):
        process = mp.Process(target=worker, args=(work_queue, done_queue))
        work_queue.put("STOP")
        process.start()

    print("Workers started.")


In [None]:
def process_done_queue(done_queue, workers):
    """ Retrieve the average waiting times. """

    stops = 0
    results = defaultdict(list)
    while stops < workers:
        result = done_queue.get()
        if result == "STOP":
            stops += 1
        else:
            servers, time = result
            results[servers].append(time)

    return results


---

# Running the trials

In [None]:
import multiprocessing as mp

numbers_of_servers = [1, 2, 3]
trials = range(100)
workers = mp.cpu_count()


In [None]:
workers


In [None]:
def run_with_first_principles(numbers_of_servers, trials, workers):
    """ Run all of the trials using first principles. """

    work_queue, done_queue = mp.Queue(), mp.Queue()

    put_down_tasks(work_queue, numbers_of_servers, trials)
    start_workers(work_queue, done_queue, workers)

    results = process_done_queue(done_queue, workers)
    return results


In [None]:
%%time
results = run_with_first_principles(numbers_of_servers, trials, workers)

In [None]:
results


In [None]:
import pandas as pd

df = pd.DataFrame(results)


In [None]:
df.describe()


---

# Standard approach (letting `multiprocessing` do the work)

In [None]:
def run_with_starmap(args, num_cores=mp.cpu_count()):
    """ Run all of the trials using `multiprocessing.Pool.starmap`. """

    with mp.Pool(num_cores) as pool:
        raw_data = pool.starmap(run_simulation, args)

    results = defaultdict(list)
    for servers, time in raw_data:
        results[servers].append(time)

    return results


In [None]:
%%time
args = itertools.product(numbers_of_servers, trials)
results = run_with_starmap(args)

In [None]:
results


---

# Using `dask` to parallelise trials

In [None]:
import dask


def build_tasks(args):
    """ Build all of the trials as tasks using `dask`. """

    tasks = [dask.delayed(run_simulation)(*arg) for arg in args]

    return tasks


In [None]:
args = itertools.product(numbers_of_servers, trials)
build_tasks(args)


In [None]:
def compute_tasks(tasks):
    """ Execute the tasks and return results. """

    raw_data = dask.compute(*tasks, num_workers=mp.cpu_count(), scheduler="processes")

    results = defaultdict(list)
    for servers, time in raw_data:
        results[servers].append(time)

    return results


In [None]:
def run_with_dask(args):
    """ Run all of the trials using `dask`. """

    tasks = build_tasks(args)
    results = compute_tasks(tasks)

    return results


In [None]:
%%time
args = itertools.product(numbers_of_servers, trials)
results = run_with_dask(args)

---
# What `dask` really is...

<br>

![Dask anatomy](img/collections-schedulers.png)

<br>

---
# Custom parallelisation with `dask.delayed`

In [None]:
from time import sleep


def inc(x):
    sleep(0.5)
    return x + 1


def add(x, y):
    sleep(0.5)
    return x + y


def mul(x, y):
    sleep(0.5)
    return x * y


def exp(x, y):
    sleep(0.5)
    return x ** y


In [None]:
%%time
results = []
a = inc(1)
for i in range(5): 
    b = mul(i, a)
    c = add(a, b)
    d = exp(b, c)
    results.append(d)

total = sum(results)

In [None]:
total


In [None]:
@dask.delayed
def inc(x):
    sleep(0.5)
    return x + 1


@dask.delayed
def add(x, y):
    sleep(0.5)
    return x + y


@dask.delayed
def mul(x, y):
    sleep(0.5)
    return x * y


@dask.delayed
def exp(x, y):
    sleep(0.5)
    return x ** y


In [None]:
%%time
results = []
a = inc(1)
for i in range(5): 
    b = mul(i, a)
    c = add(a, b)
    d = exp(b, c)
    results.append(d)

total = sum(results)

In [None]:
total.visualize(rankdir="LR")


In [None]:
%%time
total.compute()

---

# `pandas`-esque data handling

NYC flight data accessed at: [kaggle.com/usdot/flight-delays/version/1](https://www.kaggle.com/usdot/flight-delays/version/1#flights.csv)

In [None]:
import dask.dataframe as dd

dtype = {
    "SCHEDULED_DEPARTURE": object,
    "DEPARTURE_TIME": object,
    "SCHEDULED_TIME": float,
    "SCHEDULED_ARRIVAL": object,
    "ARRIVAL_TIME": object,
}


In [None]:
ddf = dd.read_csv("data/flights.csv", dtype=dtype, low_memory=False)


In [None]:
nyc_codes = ["JFK", "LGA", "EWR"]
nyc_departures = ddf[ddf["ORIGIN_AIRPORT"].isin(nyc_codes)]


In [None]:
mean_delay = nyc_departures.groupby("ORIGIN_AIRPORT")["DEPARTURE_DELAY"].mean()


In [None]:
mean_delay.visualize()


In [None]:
mean_delay.compute(num_workers=8, scheduler="processes")


---

# Multi-dimensional arrays like `numpy`

In [None]:
import dask.array as da

da.random.seed(0)


In [None]:
array = da.random.normal(size=(1e4, 1e4))

mean, std = array.mean(), array.std()


In [None]:
dask.visualize(*[mean, std])


In [None]:
dask.compute([mean, std], num_workers=8, scheduler="processes")


In [None]:
A = da.random.normal(size=(1e4, 1e4))
B = da.random.normal(size=1e4)

C = A.dot(B).sum() / A.mean() * B.var()


In [None]:
C.visualize()
