# Dask

Dask is a flexible library for parallel computing in Python.

Dask is composed of two parts:
1. Dynamic task scheduling optimized for computation.
2. “Big Data” collections like parallel arrays, dataframes, and lists.

![Dask overview](https://docs.dask.org/en/latest/_images/dask-overview.svg)  
Image source: Dask documentation.

# Task graphs

Suppose we want to perform a simple computation: increment numbers 1 and 2 and add up results.
$$(1+1) + (2+1)$$

To make execution times more visible, assume that every operation takes a second.

In [None]:
from time import sleep

def inc(x):
    sleep(1)
    return x + 1

def add(x, y):
    sleep(1)
    return x + y

Sequential execution of the 3 operations will take 3 seconds.

In [None]:
%%time
x = inc(1)
y = inc(2)
z = add(x, y)
z

Notice that first two operations (`inc(1)` and `inc(2)`) are independent and can be executed in parallel, while the last operation depends on the results of the first two.

If we wrap our funcitons in `dask.delayed` and then call them, instead of computation we will get a `Delayed` object, which is **task graph**. Execution is instant, because no actual computation is performed at this stage.

In [None]:
from dask import delayed

delayed_inc = delayed(inc)
delayed_add = delayed(add)

In [None]:
%%time
x = delayed_inc(1)
y = delayed_inc(2)
z = delayed_add(x, y)

In [None]:
z

In [None]:
z.visualize()

We call `Delayed.compute()` method to execute the task graph and obtain result. How long do you think it would take?

In [None]:
%%time
z.compute()

## Download and unzip census tracts shapefiles

`dask.delayed` can be used as *decorator*. This is just a more compact way to write wrapping code.

```python
@delayed
def fun():
   return 1
```
is equivalent to
```python
def fun():
   return 1
fun = delayed(fun)
```

Here we repeat the process of downloading and unzipping census state tracts, but this time we delegate parallelization and scheduling to dask.

In [None]:
from tools import download_file, unzip, tracts_state_00_aa
from dask import compute

@delayed
def download_state_tracts(state_code):
    url = f'https://www2.census.gov/geo/tiger/GENZ2019/shp/cb_2019_{state_code}_tract_500k.zip'
    f = download_file(url, f'data/tracts/{state_code}', overwrite=True, verbose=False)
    return state_code, f

@delayed
def unzip_state_tracts(state_code_and_filepath):
    state_code, filepath = state_code_and_filepath
    unzip(filepath, f'data/tracts/{state_code}', overwrite=True, verbose=False)

In [None]:
%%time
plan = []
for sc in tracts_state_00_aa.keys():
    code_and_path = download_state_tracts(sc)
    z = unzip_state_tracts(code_and_path)
    plan.append(z)
_ = compute(plan)

# Dask arrays

Have the same interface as NumPy arrays, but are internally represented as multiple chunks. Computations on chunks are turned into a task graph and performed in parallel where possible.

NumPy is already using very performant low level code for linear algebra, but all data needs to be in memory. Dask relaxes this constraint.

![Dask array](https://docs.dask.org/en/latest/_images/dask-array-black-text.svg)  
Image source: Dask documentation.

Suppose we want to estimate a linear regression model using ordinary least squares ([wikipedia](https://en.wikipedia.org/wiki/Ordinary_least_squares)).

The model is given by the following eqation.
$$y = \beta_1 x_1 + \beta_2 x_2 + ... + \beta_k x_k + e$$

In matrix notation: 
$$y = X \beta + e$$
where $y$ and $e$ are vectors of length $n$, $X$ is $n$-by-$k$ matrix, and $\beta$ is vector of length $k$.

OLS estimates $\hat{\beta}$ can be calculated as
$$\hat{\beta} = (X'X)^{-1}X'y$$

First let's generate some random data with $\beta = [1, 2, ..., k]$ and store it to disk.

In [None]:
from pathlib import Path
import numpy as np
import dask.array as da

np.random.seed(0)

def generate_data(n, k):
    b = np.arange(1, k+1)
    x = np.random.rand(n, k)
    y = x.dot(b) + np.random.rand(n)

    Path('data/arr').mkdir(parents=True, exist_ok=True)

    # single files for numpy
    np.save('data/arr/x.npy', x)
    np.save('data/arr/y.npy', y)

    # directories for dask arrays
    da.to_npy_stack('data/arr/x/', da.from_array(x))
    da.to_npy_stack('data/arr/y/', da.from_array(y))

generate_data(1_000_000, 100)

Estimate OLS with NumPy.

In [None]:
import numpy as np

def ols_numpy():
    x = np.load('data/arr/x.npy')
    y = np.load('data/arr/y.npy')
    xpxi = np.linalg.inv(x.T.dot(x))
    xpy = x.T.dot(y)
    b_hat = xpxi.dot(xpy)
    return b_hat

In [None]:
%%time
bh = ols_numpy()
bh[:5]

Delayed version is created if we replace `numpy.array` with `dask.array`.

In [None]:
import dask.array as da

def ols_dask():
    x = da.from_npy_stack('data/arr/x')
    y = da.from_npy_stack('data/arr/y')
    xpxi = da.linalg.inv(x.T.dot(x))
    xpy = x.T.dot(y)
    b_hat = xpxi.dot(xpy)
    return b_hat

plan = ols_dask()
plan

In [None]:
%%time
bh = plan.compute()
bh[:5]

Under the hood, the task graph is rather complicated.

In [None]:
plan.visualize('tasks.pdf')

Let's compare resource usage.

In [None]:
# restart kernel and redefine functions to get cleaner results
from time import sleep
from tools import ResourceMonitor

mon = ResourceMonitor(interval=0.1)
mon.start()
sleep(1)
mon.tag('numpy V')
b_numpy = ols_numpy()
mon.tag('numpy ^')
sleep(1)
mon.tag('dask V')
b_dask = ols_dask().compute()
mon.tag('dask ^')
sleep(1)
mon.stop()

assert np.allclose(b_numpy, b_dask)

mon.plot()

Dask used more CPU and less memory.

# Dask dataframes

Similarly to arrays, dask dataframes are chunked for parallel processing.

## Example revisited: employment by year

In [None]:
import dask.dataframe as dd
from tools import ResourceMonitor
from time import sleep

df = dd.read_csv('data/synig/*.csv', usecols=['YEAR', 'EMPLOYEES'])
res = df.groupby('YEAR')['EMPLOYEES'].agg(['size', 'sum', 'mean'])

In [None]:
res.visualize('tasks.pdf')

In [None]:
mon = ResourceMonitor(interval=0.3)
mon.start()
sleep(0.5)
r = res.compute()
sleep(0.5)
mon.stop()
mon.plot()

## Example revisited: size vs age

In [None]:
df = dd.read_csv('data/synig/*.csv', usecols=['YEAR', 'ABI', 'EMPLOYEES'])
fy = df.groupby('ABI')['YEAR'].min().to_frame('FIRST_YEAR')
df = df.merge(fy, 'left', 'ABI')
df['AGE'] = df['YEAR'] - df['FIRST_YEAR']
res = df.groupby('AGE')['EMPLOYEES'].mean()

In [None]:
res.visualize('tasks.pdf')

In [None]:
mon = ResourceMonitor()
mon.start()
sleep(0.5)
r = res.compute()
sleep(0.5)
mon.stop()
mon.plot()

# Memory trade-offs

|                 | in memory | split-apply-combine | dask |
|-----------------|:--------:|:--------:|:--------:|
| Code complexity |    low   |  medium/high  |   low-high   |
| Running time    |   fast   |  medium  |   medium   |
| Memory usage    |   high   |  varies  |    varies   |


Dask is a powerful tool that is easy to start with, because it mimics interface of `numpy` and `pandas`. But here are some things to keep in mind.
- You will need to learn how Dask works to approach more difficult computations.
- Some operations in Dask are slow. Some are not available (eg. `df.sort_values()` remains unimplemented for [5 years](https://github.com/dask/dask/issues/958)). Some work slightly differently than in pandas (`df.drop_duplicates(keep='first')`).
- Debugging of distributed algorithms is more difficult.
- Chunk size and number of workers need to be tuned to your hardware.

# Learn more

[Dask homepage](https://dask.org/)  
[Docs](https://docs.dask.org/en/latest/)  
An excellent interactive tutorial from Dask creators:
[How to learn Dask in 2021](https://coiled.io/blog/how-to-learn-dask-in-2021)