Scalable Machine Learning in Python 
===================
with Scikit-Learn and Dask 
===============
## 2 - Dask Task Graphs
**May 2017**

<a href=http://dask.pydata.org ><img src=https://www.continuum.io/sites/default/files/dask_stacked.png
 width=200 />
</a>

[http://bit.ly/scaleml-dask-wkshp](http://bit.ly/scaleml-dask-wkshp)


### We have a strong analytics ecosystem (NumPy, Pandas)

### that is mostly restricted to a single core and RAM

How do we parallelize an ecosystem?

of thousands of packages

each with custom algorithms

### Sckit-Image: general image analysis

    skimage.feature.canny(im, sigma=3)

<img src="http://scikit-image.org/docs/dev/_images/sphx_glr_plot_canny_001.png"
     alt="Canny edge detection from skimage"
     width="50%">


### Scikit-Allel: Specialized genomics

<img src="http://alimanfoo.github.io/assets/2016-06-10-scikit-allel-tour_files/2016-06-10-scikit-allel-tour_50_0.png" alt="scikit-allel example" width="50%" align="center">

### Need a parallel computing library

... that is flexible enough

... and familiar enough

... to parallelize a disparate ecosystem

Outline
-------

-  Parallel NumPy and Pandas
-  Parallel code generally
-  Task Graphs and Task Scheduling
    -   Compare with other systems (Spark, Airflow)
    -   Dask's task schedulers
-  Python APIs and Protocols
-  Python Ecosystem and strengths for parallel computing

# Distributed Numpy  `dask.array`

<img src="images/dask-array-black-text.svg" width="60%">

In [None]:
%%time
# NumPy code
import numpy as np
x = np.random.random((2000, 500)) # 2k rows
u, s, v = np.linalg.svd(x.dot(x.T))

In [None]:
# Dask.array code (don't execute this!)
import dask.array as da
x = da.random.random((1000000, 500), chunks=(500, 500)) # 500x to 1M rows
u, s, v = da.linalg.svd(x.dot(x.T))

## `dask.dataframe`

<img src="images/dask-dataframe.svg" width="30%">

In [None]:
import pandas as pd
df = pd.read_csv('data/minute/aig/2010-01-04.csv', parse_dates=['timestamp'])
df.groupby(df.timestamp.dt.hour).close.mean()

In [None]:
import dask.dataframe as dd
df = dd.read_csv('data/minute/aig/2010-01-*.csv', parse_dates=['timestamp']) # operate on many files
df.groupby(df.timestamp.dt.hour).close.mean().compute()

In [None]:
# or generalize to distributed data sets, e.g. using HDFS
import dask.dataframe as dd
df = dd.read_csv('hdfs://myfiles.*.csv', parse_dates=['timestamp'])
df.groupby(df.timestamp.dt.hour).value.mean().compute()

## Distributed data structure, parallel algorithms

In [None]:
df.groupby(df.timestamp.dt.hour).close.mean().visualize()

# But many problems aren't just big arrays and dataframes

The Python community writes clever algorithms

Fine Grained Python Code:

Cartesian product $R = A \times B$ where $r_{i,j}$ is a function of $i$ and $j$

In [None]:
%%time
results = {}

for a in A:
    for b in B:
        if a < b:
            results[a, b] = f(a, b)
        else:
            results[a, b] = g(a, b)

## Parallelizable, but not a list, dataframe, or array

In [None]:
%%time
from dask import delayed, compute

results = {}

for a in A:
    for b in B:
        if a < b:
            results[a, b] = delayed(f)(a, b)  # lazily construct graph
        else:
            results[a, b] = delayed(g)(a, b)  # without structure

In [None]:
%%time
final = compute(results)  # trigger all computation

## `concurrent.futures.ThreadPoolExecutor`

In [None]:
%%time
from concurrent.futures import ThreadPoolExecutor 

e = ThreadPoolExecutor()

results = {}

for a in A:
    for b in B:
        if a < b:
            results[a, b] = e.submit(f, a, b)  # submit work asynchronously
        else:
            results[a, b] = e.submit(g, a, b)  # submit work asynchronously

In [None]:
%%time
results = {k: v.result() for k, v in results.items()} # block until finished

# Dask APIs Produce Task Graphs

---
# Dask Schedulers Execute Task Graphs

## Exercise 2.1 `dask.delayed`

Implement the Cartesian cross product above using Numpy `array` objects for `A` and `B`, both with and without `dask.delayed` wrappers, and compare the performance.  Template code is provided below.

5 minutes

**BONUS:** Experiment with the `concurrent.futures.ThreadPoolExecutor` version

In [1]:
from dask.distributed import Client
client = Client()

In [None]:
client

In [6]:
import numpy as np
from   time import sleep
A = np.random.randint(low=1, high=10, size=(50,))  # [1, 9]
B = np.random.randint(low=4, high=13, size=(25,))   # [4,12]

# the point is that f() and g() may perform some non-trivial operation
# but that each f() and g() is independent from every other invocation

def f(s, t):
    sleep(0.02)  # simulate computational complexity
    return s - t

def g(s, t):
    sleep(0.03)  # simulate computational complexity
    return s * t

## Predict Execution Time for Serial Compute

How long would you expect this to take on your computer? (rough estimate)

**Challenge:** Run the cell below and see if you can guess before it completes.

Remember that it isn't vectorized and there is no automatic parallelism.

In [None]:
%%time
results = {}

for a in A:
    for b in B:
        if a < b:
            results[a, b] = f(a, b)
        else:
            results[a, b] = g(a, b)

In [None]:
list(results.items())[:40:3]

In [None]:
# How many results do you expect there to be?
len(results)

## Predict Execution Time for Perfect Parallelism

Imagine every `f()` and `g()` were 100% independent, with no communication overhead.

How long would you expect it to take on your computer?

In [7]:
%%time
from dask import delayed, compute

results = {}

for a in A:
    for b in B:
        if a < b:
            results[a, b] = delayed(f)(a, b)  # lazily construct graph
        else:
            results[a, b] = delayed(g)(a, b)  # without structure

CPU times: user 48.7 ms, sys: 7.36 ms, total: 56 ms
Wall time: 54.4 ms


In [None]:
list(results.items())[:40:3]

In [4]:
its      = 1    # number of iterations len(A) * len(B)
cpus     = 1    # check your system stats
exectime = 0.01 # whatever the per-function delay is.

estimate = its * exectime / cpus
print(estimate)

0.01


In [8]:
%%time
final = compute(results)

CPU times: user 559 ms, sys: 169 ms, total: 727 ms
Wall time: 21.1 s


In [None]:
list(final[0].items())[:40:3]

# Dask functions and data structures use `delayed()` internally

## 1D-Array

<img src="images/array-1d.svg">

    >>> np.ones((15,))
    array([ 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.])

    >>> x = da.ones((15,), chunks=(5,))

### 1D-Array

<img src="images/array-1d-sum.svg" width="30%">

In [None]:
import numpy      as np
import dask.array as da

In [None]:
x = da.ones((15,), chunks=(5,))
x.sum()

In [None]:
x.sum().visualize()

## ND-Array - Sum

<img src="images/array-sum.svg">

In [None]:
x = da.ones((15, 15), chunks=(5, 5))
x.sum(axis=0)

In [None]:
x.sum(axis=0).visualize()

In [None]:
x.sum(axis=0).compute()

### ND-Array - Transpose

<img src="images/array-xxT.svg">

In [None]:
x = da.random.randint(low=1, high=10, size=(15, 15), chunks=(5, 5))
y = x + x.T

In [None]:
y.visualize()

In [None]:
y

In [None]:
y.compute()

### ND-Array - Matrix Multiply

<img src="images/array-xdotxT.svg">

In [None]:
x = da.random.randint(low=1, high=10, size=(15, 15), chunks=(5, 5))
y = x.dot(x.T + 1)

In [None]:
y

In [None]:
y.visualize()

In [None]:
y.compute()

## ND-Array - Compound Operations

<img src="images/array-xdotxT-mean.svg">

In [None]:
x = da.random.randint(low=1, high=10, size=(15, 15), chunks=(5, 5))
y = x.dot(x.T + 1) - x.mean()

In [None]:
y

In [None]:
y.visualize()

In [None]:
y.compute()

## ND-Array - Compound Operations

<img src="images/array-xdotxT-mean-std.svg">

In [None]:
x = da.ones((15, 15), chunks=(5, 5))
y = (x.dot(x.T + 1) - x.mean()).std()

In [None]:
y

In [None]:
y.visualize()

In [None]:
y.compute() # I don't understand the result here, but linear algebra is not the main point ...

# Iterative Task Graph Construction

![xT](images/array-xxT.svg)

```python
x + x.T
```

![xT](images/array-xdotxT.svg)

```python
x.dot(x.T + 1)
```

![xT](images/array-xdotxT-mean.svg)

```python
x.dot(x.T + 1) - x.mean()
```

![xT](images/array-xdotxT-mean-std.svg)

```python
(x.dot(x.T + 1) - x.mean()).std()
```

# Dask APIs Produce Task Graphs

<hr>

# Dask Schedulers Execute Task Graphs

# Exercise 2.2 `dask.delayed` wrapper

In [None]:
from time import sleep

def inc(x):
    sleep(1)
    return x + 1

def add(x, y):
    sleep(1)
    return x + y

* How many functions are called?
* How long does each one take?
* How long will the entire sequence take?
* Can you spot any opportunities for parallelism?

In [None]:
%%time
x = inc(1)
y = inc(2)
z = add(x, y)

### Parallelize with dask.delayed decorator

Those two increment calls *could* be called in parallel.

**ACTION:** Wrap `inc()` and `add()` with `dask.delayed`.  Then invoke the same sequence but using the wrapped versions of the functions.

This changes those functions so that they don't run immediately, but instead put those functions and arguments into a task graph.  Now when we run our code this runs immediately, but all it does it create a graph.  We then separately compute the result by calling the `.compute()` method.

In [None]:
# wrap each function

In [None]:
# invoke the same sequence for x, y, z using the wrapped functions

In [None]:
# inspect your result z -- how do you get the actual output?

In [None]:
# visualize the task graph (if graphviz is working for you)

### What just happened?

The `z` object is a lazy `dask.Delayed` object.  This object holds everything we need to compute the final result.  We can compute the result with `.compute()` as above or we can visualize the result with `.visualize()`.

### Some questions to consider:

-  Why did we go from 3s to 2s?  Why weren't we able to parallelize down to 1s?
-  What would have happened if the inc and add functions didn't include the `sleep(1)`?  Would Dask still be able to speed up this code?
-  What if we have multiple outputs or also want to get access to x or y?

## Exercise 2.3 Parallelize a `for` loop

For loops are one of the most common things that we want to parallelize.  Use dask.delayed on `inc` and `sum` to parallelize the computation below:

In [None]:
data = [1, 2, 3, 4, 5, 6, 7, 8]

In [None]:
%%time
# Sequential code

results = []
for x in data:
    y = inc(x)
    results.append(y)
    
total = sum(results)

In [None]:
total

In [None]:
%%time
# Parallel code

results = []
for x in data:
    # TODO

In [None]:
total

### Parallelizing for-loop code with control flow

Often we want to delay only *some* functions, running a few of them immediately.  This is especially helpful when those functions are fast and help us to determine what other slower functions we should call.  This decision, to delay or not to delay, is usually where we need to be thoughtful when using dask.delayed.

In the example below we iterate through a list of inputs.  If that input is even then we want to call `inc`.  If the input is odd then we want to call `double`.  This `iseven` decision to call `inc` or `double` has to be made immediately (not lazily) in order for our graph-building Python code to proceed.

In [None]:
def double(x):
    sleep(1)
    return 2 * x

def iseven(x):
    return x % 2 == 0

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [None]:
%%time
# Sequential code

results = []
for x in data:
    if iseven(x):
        y = double(x)
    else:
        y = inc(x)
    results.append(y)
    
total = sum(results)
total

In [None]:
%%time
# Parallel code
# TODO: parallelize the sequential code above using dask.delayed
# You will need to delay some functions, but not all

In [None]:
total.visualize()

In [None]:
%time total.compute()

In [None]:
### Solutions

In [None]:
%load solutions/02-delayed-inc-double.py

### Some questions to consider:

-  What are other examples of control flow where we can't use delayed?
-  What would have happened if we had delayed the evaluation of `iseven(x)` in the example above?
-  What are your thoughts on delaying `sum`?  This function was both computational but also very fast to run.

## Exercise 2.4 Pandas and Dask

In this exercise we read several CSV files and perform a groupby operation in parallel.  We are given sequential code to do this and parallelize it with Dask.delayed.

The computation we will parallelize is to compute the daily high-low spread of a stock over time.  We will do this by using dask.delayed together with Pandas.  In a future section we will do this same exercise with dask.dataframes.

### Read one file with `pandas.read_csv` and compute spread

In [None]:
import pandas as pd
df = pd.read_csv('data/minute/ibm/2010-01-04.csv', 
                 parse_dates=['timestamp'], 
                 index_col='timestamp')
df.head()

In [None]:
spread = df.high.max() - df.low.min()
spread

### Sequential code: spread over time

This code performs the spread computation on every day of data using a sequential for loop.

In [None]:
from glob import glob
filenames = sorted(glob('data/minute/ibm/*.csv'))

In [None]:
%%time

spreads = []
days = []
for fn in filenames:
    df = pd.read_csv(fn, parse_dates=['timestamp'], index_col='timestamp')
    spread = df.high.max() - df.low.min()
    day = df.index[0].round('1d')
    
    spreads.append(spread)
    days.append(day)

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

plt.figure(figsize=(10, 5))
plt.plot(days, spreads)

### Parallelize the code above

Use `dask.delayed` to parallelize the code above.  Some extra things you will need to know.

1.  Methods and attribute access on delayed objects work automatically, so if you have a delayed object you can perform normal arithmetic, slicing, and method calls on it and it will produce the correct delayed calls.

    ```python
    x = delayed(np.arange)(10)
    y = (x + 1)[::2].sum()  # everything here was delayed
    ```
2.  Calling the `.compute()` method works well when you have a single output.  When you have multiple outputs you might want to use the `dask.compute` function:

    ```python
    >>> x = dask.delayed(np.arange)(10)
    >>> y = x ** 2
    >>> min, max = dask.compute(y.min(), y.max())
    (0, 81)
    ```
    
    This way dask can share the intermediate values (like `y = x**2`)
    
So your goal is to parallelize the code above (which has been copied below) using Dask.delayed.  You may also want to visualize a bit of the computation to see if you're doing it correctly.

*Note: performance will improve a little bit, but not a whole lot.  We'll discuss why afterwards*

In [None]:
%%time

spreads = []
days = []
for fn in filenames:
    ...
    
spreads, days = dask.compute(spreads, days)

### Solutions

In [None]:
%load solutions/02-delayed-pandas.py

Now use `dask.dataframe.read_csv()` to perform the same operations