## Parallelize existing code with Dask.delayed

### Simplest possible example

In [None]:
from time import sleep

def inc(x):
    sleep(1)
    return x + 1

def add(x, y):
    sleep(1)
    return x + y

In [None]:
%%time
x = inc(1)
y = inc(2)
z = add(x, y)

### Parallelize with dask.delayed decorator

In [None]:
import dask

In [None]:
%%time
x = dask.delayed(inc)(1)
y = dask.delayed(inc)(2)
z = dask.delayed(add)(x, y)

In [None]:
%%time
z.compute()
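
`dask.delayed` can also be applied with decorator syntax, which can be convenient when a function should always be lazy. The sketch below is a minimal illustration, not part of the original notebook. The names `inc_lazy` and `add_lazy` are hypothetical, chosen so that the plain `inc` and `add` defined above stay untouched.

```python
import dask

@dask.delayed
def inc_lazy(x):  # hypothetical name; a lazy counterpart of inc without the sleep
    return x + 1

@dask.delayed
def add_lazy(x, y):  # hypothetical name; a lazy counterpart of add
    return x + y

# Calling the decorated functions only builds a task graph; nothing runs yet.
z_lazy = add_lazy(inc_lazy(1), inc_lazy(2))

# compute() executes the graph and returns the concrete value.
result = z_lazy.compute()
print(result)
```

The call form `dask.delayed(inc)(1)` used above is handy when the function should stay a normal, eager function elsewhere in the code.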

### What just happened?

In [None]:
z

In [None]:
z.visualize(rankdir="LR")

### Some questions to consider:

-  Why did we go from 3s to 2s?  Why weren't we able to parallelize down to 1s?
-  What would have happened if the inc and add functions didn't include the `sleep(1)`?  Would Dask still be able to speed up this code?
-  What if we have multiple outputs or also want to get access to x or y?
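
One way to explore these questions empirically is to time the graph and ask for several results at once. The sketch below assumes shortened sleeps (0.2 s instead of 1 s) and uses the hypothetical names `slow_inc` and `slow_add`. It requests `x`, `y`, and `z` together through `dask.compute`, and explicitly selects the threaded scheduler.

```python
import time
import dask

def slow_inc(x, delay=0.2):  # hypothetical stand-in for inc with a shorter sleep
    time.sleep(delay)
    return x + 1

def slow_add(x, y, delay=0.2):  # hypothetical stand-in for add with a shorter sleep
    time.sleep(delay)
    return x + y

x = dask.delayed(slow_inc)(1)
y = dask.delayed(slow_inc)(2)
z = dask.delayed(slow_add)(x, y)

start = time.perf_counter()
# One call evaluates all three outputs in a single pass over the graph,
# so x and y are not recomputed when z needs them.
x_val, y_val, z_val = dask.compute(x, y, z, scheduler="threads")
elapsed = time.perf_counter() - start

print(x_val, y_val, z_val, f"{elapsed:.2f}s")
```

The two `slow_inc` calls are independent and can overlap, while `slow_add` has to wait for both of them, so the elapsed time is bounded below by the longest chain of dependent tasks rather than by the number of workers.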

### Parallelizing for-loop code with control flow

Now we look at a more complex problem where we want to delay some parts of the computation, but not others.  Deciding what to delay is the hard part of using dask.delayed.

We don't always want to wrap everything with `dask.delayed`.  In most cases we will mix immediate execution with delayed execution.  Generally we choose to immediately run (not delay) any code that decides what our computation will be (control flow), especially if it is fast, while we choose to delay functions that don't affect the shape of our computation, but do take a while to run.

In the example below we iterate through a list of inputs and, depending on whether each input is even or odd, run either `double` or `inc`.  We collect the results into a list and sum them.

-   Which parts of this computation decide what other computations we're going to run?  
    We should not wrap these in dask.delayed because we need to know the answer immediately.
-   Which parts of this computation can we delay?

In [None]:
def double(x):
    sleep(1)
    return 2 * x

def iseven(x):
    return x % 2 == 0

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [None]:
%%time

results = []
for x in data:
    if iseven(x):
        y = double(x)
    else:
        y = inc(x)
    results.append(y)
    
total = sum(results)
total

In [None]:
%%time

results = []
for x in data:
    if iseven(x):  # even
        y = dask.delayed(double)(x)
    else:          # odd
        y = dask.delayed(inc)(x)

    results.append(y)
    
total = dask.delayed(sum)(results)
total.compute()

In [None]:
total.visualize()

### Some questions to consider:

-  What are other examples of control flow where we can't use delayed?
-  What would have happened if we had delayed the evaluation of `iseven(x)` in the example above?
-  What are your thoughts on delaying `sum`?  This function is part of the computation, but it is also very fast to run.
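
As a hedged illustration of the second question, the sketch below delays the control-flow test itself. A `Delayed` object has no truth value until it is computed, so Python's `if` cannot branch on it. In the Dask releases we are aware of this raises a `TypeError`, though the exact message may differ between versions.

```python
import dask

def iseven(x):
    return x % 2 == 0

lazy_flag = dask.delayed(iseven)(4)  # a Delayed object, not True or False

try:
    if lazy_flag:  # Python needs a concrete bool here
        branch = "even"
except TypeError as err:
    branch = None
    print("cannot branch on a Delayed:", err)

# Computing the flag first yields a real bool that `if` can use.
flag = lazy_flag.compute()
print(flag)
```

Computing the flag inside the loop would work, but it forces a round trip through the scheduler for every input, which is why fast control-flow checks are usually left as ordinary eager calls.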

### Pandas exercise

In this exercise we read several CSV files and perform a groupby operation in parallel.  We start from sequential code and parallelize it with Dask.delayed.

### Prep data

First, run this code to prep some data.  You don't need to understand this code.

In [None]:
import dask.dataframe as dd
from datetime import date, timedelta
import dask.multiprocessing
import os
import shutil

In [None]:
dirname = os.path.join('data', 'stocks')
if os.path.exists(dirname):
    shutil.rmtree(dirname)

os.makedirs(dirname)  # also creates the parent 'data' directory if it is missing

def name_function(i):
    return str(date(2015, 1, 1) + timedelta(days=1) * i)

for symbol in ['GOOG', 'YHOO', 'AAPL', 'MSFT']:
    df = dd.demo.daily_stock(symbol, '2015', '2016', freq='1s')
    dirname = os.path.join('data', 'stocks', symbol)
    os.mkdir(dirname)
    df.to_csv(os.path.join('data', 'stocks', symbol, '*.csv'),
              name_function=name_function,
              scheduler='processes')  # `get=` was replaced by `scheduler=` in later Dask releases

### Inspect data

In [None]:
os.listdir(os.path.join('data', 'stocks'))

In [None]:
os.listdir(os.path.join('data', 'stocks', 'GOOG'))

### Read one file with pandas.read_csv and compute spread

In [None]:
import pandas as pd
df = pd.read_csv(os.path.join('data', 'stocks', 'GOOG', '2015-01-01.csv'), 
                 parse_dates=['timestamp'], 
                 index_col='timestamp')
df.head()

In [None]:
spread = df.high.max() - df.low.min()
spread

### Plot spread over time

In [None]:
%%time

from glob import glob
filenames = sorted(glob(os.path.join('data', 'stocks', 'GOOG', '*.csv')))

spreads = []
days = []
for fn in filenames:
    df = pd.read_csv(fn, parse_dates=['timestamp'], index_col='timestamp')
    spread = df.high.max() - df.low.min()
    day = df.index[0].date()
    
    spreads.append(spread)
    days.append(day)

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
plt.figure(figsize=(10, 5))
plt.plot(days, spreads)

### Exercise: parallelize the code above

Use dask.delayed to parallelize the code above.  Some extra things you will need to know:

1.  Methods and attribute access on delayed objects work automatically, so if you have a delayed object you can perform normal arithmetic, slicing, and method calls on it and it will produce the correct delayed calls.

    ```python
    x = dask.delayed(np.arange)(10)
    y = (x + 1)[::2].sum()  # everything here was delayed
    ```
2.  Calling the `.compute()` method works well when you have a single output.  When you have multiple outputs you might want to use the `dask.compute` function:

    ```python
    >>> x = dask.delayed(np.arange)(10)
    >>> y = x ** 2
    >>> ymin, ymax = dask.compute(y.min(), y.max())  # avoid shadowing the built-in min and max
    >>> int(ymin), int(ymax)
    (0, 81)
    ```
    
    This way dask can share the intermediate values (like `y = x**2`) instead of computing them once per output.
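
The two points above can be combined in a small sketch that needs no files on disk. Here `load_frame` is a hypothetical stand-in for `pd.read_csv`, and the `load_calls` list is only there to count how often the loader actually runs.

```python
import dask
import pandas as pd

load_calls = []  # records each time the loader actually runs

def load_frame():
    # Hypothetical stand-in for pd.read_csv(fn, ...), so the sketch needs no files.
    load_calls.append(1)
    index = pd.to_datetime(["2015-01-01 00:00:00", "2015-01-01 00:00:01"])
    return pd.DataFrame({"high": [3.0, 5.0], "low": [1.0, 0.5]}, index=index)

df = dask.delayed(load_frame)()        # nothing is loaded yet
spread = df.high.max() - df.low.min()  # attribute access, method calls, arithmetic: all lazy
day = df.index[0].date()               # indexing followed by a method call: also lazy

# Both outputs depend on the same delayed frame, so one dask.compute call
# loads it once and reuses it for both results.
spread_val, day_val = dask.compute(spread, day)
print(spread_val, day_val, len(load_calls))
```

Calling `spread.compute()` and `day.compute()` separately would give the same values, but each call would run the loader again.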
    
So your goal is to parallelize the code above (which has been copied below) using Dask.delayed.  You may also want to visualize a bit of the computation to see if you're doing it correctly.

*Note: performance will improve a little, but not a lot.  We'll discuss why afterwards.*

In [None]:
%%time

from glob import glob
filenames = sorted(glob(os.path.join('data', 'stocks', 'GOOG', '*.csv')))

spreads = []
days = []
for fn in filenames:
    df = dask.delayed(pd.read_csv)(fn, parse_dates=['timestamp'], index_col='timestamp')
    spread = df.high.max() - df.low.min()
    day = df.index[0].date()
    
    spreads.append(spread)
    days.append(day)
    
spreads, days = dask.compute(spreads, days)