<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo">
     
# Dask 

Dask is a parallelization library for Python that scales from your laptop all the way to a cluster (i.e., distributed across multiple nodes).

Its main focus is on distributed array-like abstractions with NumPy- and Pandas-like behavior.

Stack:

- Array, bag, dataframe, delayed
- Graph spec
- Scheduler

Lets you focus on algorithms and not scheduling.
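To make the "graph spec" layer a bit more concrete, here is a minimal sketch of a dict-based task graph and a toy evaluator, using only the standard library. The layout (keys mapped to values, or to tuples of a callable followed by its arguments) follows the idea of Dask's graph spec, but the `get` function below is a hypothetical stand-in written for illustration: it is not Dask's scheduler and does no caching or parallelism.

```python
from operator import add, mul

def get(graph, key):
    """Recursively evaluate `key` in a dict-based task graph (toy evaluator)."""
    task = graph[key]
    if isinstance(task, tuple) and callable(task[0]):
        func, *args = task
        # Arguments that name other keys in the graph are evaluated first
        resolved = [
            get(graph, a) if isinstance(a, str) and a in graph else a
            for a in args
        ]
        return func(*resolved)
    return task  # plain data, nothing to run

graph = {
    "a": 1,
    "b": 2,
    "c": (add, "a", "b"),   # c = a + b
    "d": (mul, "c", 10),    # d = c * 10
}

result = get(graph, "d")
print(result)
```

The collections (array, bag, dataframe, delayed) build graphs like this for you, and a scheduler decides how to run them.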

Tutorial: https://github.com/dask/dask-tutorial

See also some amazing new lectures/tutorials:

https://www.youtube.com/watch?v=5Md_sSsN51k&list=PLYx7XA2nY5Gf37zYZMw6OqGFRPjB1jCy6&index=17

and a shorter talk by Matt on Dask:

https://www.youtube.com/watch?v=PAGjm4BMKlk&list=PLYx7XA2nY5Gf37zYZMw6OqGFRPjB1jCy6&index=16

In [None]:
!brew install graphviz ## on a Mac
#!apt-get install graphviz ## on Linux
!pip install graphviz ## don't do this with conda, installs a Python 2 package...

## Dask Arrays

A distributed notion of an array. `dask.array` translates your array operations into a graph of inter-related tasks with data dependencies between them. Dask then executes this graph in parallel with multiple threads. We'll discuss this further in the next section.

Manipulate a `dask.array` object as you would a NumPy array:
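As a rough sketch of the chunk-then-combine idea, the snippet below splits a sequence into chunks, sums each chunk in a thread pool, and then combines the partial sums. It uses only the standard library; `chunked_sum` is a hypothetical helper for illustration and is not how `dask.array` is implemented internally.

```python
from concurrent.futures import ThreadPoolExecutor

def chunked_sum(values, chunk_size):
    """Sum `values` by reducing each chunk separately, then combining."""
    # One task per chunk; chunks are independent of each other
    chunks = [values[i:i + chunk_size] for i in range(0, len(values), chunk_size)]
    with ThreadPoolExecutor() as pool:
        partial_sums = list(pool.map(sum, chunks))
    # The final task depends on every per-chunk result
    return sum(partial_sums), len(chunks)

values = list(range(1, 1001))
total, n_chunks = chunked_sum(values, chunk_size=100)
print(total, n_chunks)
```

The `chunks=` argument in the cells below plays the role of `chunk_size` here: it controls how many independent pieces the work is cut into.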

In [1]:
import dask.array as da
x = da.linspace(1,10,1000000,chunks=(1000,))

In [2]:
x.shape

(1000000,)

In [5]:
rez = x.sum()

In [6]:
rez.compute()

5500000.0000000177

In [8]:
import numpy as np
import dask.array as da

x = da.random.normal(10, 0.1, size=(20000, 20000),   # 400 million element array 
                              chunks=(1000, 1000))   # Cut into 1000x1000 sized chunks
y = x.mean(axis=0)[::100]                            # Perform NumPy-style operations

In [9]:
x.nbytes / 1e9  # Gigabytes of the input processed lazily

3.2

In [10]:
%%time
y.compute()     # Time to compute the result

CPU times: user 24.7 s, sys: 1.3 s, total: 26 s
Wall time: 7.32 s


array([  9.9993224 ,  10.00011346,   9.99990898,  10.00025268,
        10.00046464,  10.001231  ,   9.99970754,   9.99971393,
        10.00055728,   9.99951275,   9.99964315,   9.99893893,
         9.99841043,  10.00011724,   9.9996878 ,   9.99951731,
        10.00103906,   9.99930003,   9.99977866,   9.99997926,
         9.99980431,  10.00104784,  10.00113035,  10.00046829,
        10.00120736,   9.999138  ,   9.9992148 ,   9.99973366,
        10.0001576 ,  10.00011493,  10.0007531 ,   9.99999275,
        10.00095417,   9.99999942,   9.9998419 ,   9.99936708,
        10.00057115,   9.99907109,  10.00036589,   9.99959894,
        10.00013048,  10.00006243,   9.99958404,  10.00009009,
         9.99944773,   9.9997497 ,  10.00037451,   9.99994827,
         9.99849907,  10.00018933,   9.999454  ,   9.99872607,
         9.99962082,   9.99962116,   9.9994275 ,  10.00001785,
        10.00021481,  10.00098604,  10.00028491,   9.99946015,
        10.00017746,  10.00056023,   9.99992635,  10.00

In [11]:
import numpy as np

In [12]:
%%time 
x = np.random.normal(10, 0.1, size=(20000, 20000)) 
y = x.mean(axis=0)[::100] 
y

CPU times: user 17.8 s, sys: 1.06 s, total: 18.9 s
Wall time: 19.1 s


In [14]:
del x, y

In [13]:
%%time
x = da.random.normal(10, 0.1, size=(20000, 20000), chunks=(1000, 1000))
y = x.mean(axis=0)[::100] 
y.compute()

CPU times: user 25.4 s, sys: 1.67 s, total: 27 s
Wall time: 7.7 s


## Dask Dataframes

Dask dataframes are meant to mimic most of the pandas dataframe API, but these dataframes can be out of core (larger than memory).
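"Out of core" means the data is processed piece by piece instead of being loaded into memory all at once. The sketch below shows the idea with a streaming mean over one CSV column, using only the standard library. The CSV text is a made-up stand-in (only the `Lat` column name is borrowed from the cells below), and `streaming_mean` is a hypothetical helper, not part of Dask or pandas.

```python
import csv
import io

# Hypothetical stand-in for a large CSV file on disk
csv_text = "Lat,Lon\n40.0,-74.0\n41.0,-73.0\n42.0,-72.0\n"

def streaming_mean(handle, column):
    """Compute the mean of `column` while holding one row in memory at a time."""
    count, total = 0, 0.0
    for row in csv.DictReader(handle):
        total += float(row[column])
        count += 1
    return total / count

mean_lat = streaming_mean(io.StringIO(csv_text), "Lat")
print(mean_lat)
```

A Dask dataframe applies the same principle to partitions of rows rather than single rows, and can process those partitions in parallel.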

In [15]:
!ls -lah ../02_Plotting_and_Viz/data/uber-raw-data-apr14.csv

ls: ../02_Plotting_and_Viz/data/uber-raw-data-apr14.csv: No such file or directory


In [16]:
%%time
import pandas as pd
df = pd.read_csv("../02_Plotting_and_Viz/data/uber-raw-data-apr14.csv")

OSError: File b'../02_Plotting_and_Viz/data/uber-raw-data-apr14.csv' does not exist

In [None]:
%%time
import dask.dataframe as dd
df = dd.read_csv("../02_Plotting_and_Viz/data/uber-raw-data-apr14.csv")

Some of the reading in is delayed, but we can still inspect the data.

In [None]:
df.head()

Other operations are delayed until you compute them:

In [None]:
df.describe()

In [None]:
df.describe().compute()

In [None]:
df.describe()['Lat'].compute()

`dask.delayed` (a la joblib):
 
   - `delayed(function)(*args, **kwargs)` -> lazy function that hasn't yet been evaluated
   - `delayed(data)` -> lazy object that pretends to be your data
 
 See the excellent talk at SciPy 2016: https://www.youtube.com/watch?v=PAGjm4BMKlk&list=PLYx7XA2nY5Gf37zYZMw6OqGFRPjB1jCy6
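To illustrate what "lazy" means here, the following toy wrapper records a function call instead of running it, and only evaluates when `compute()` is called. The `Lazy` class and `lazy` decorator are hypothetical names for this sketch; the real `dask.delayed` is considerably more capable (graphs, keys, parallel schedulers).

```python
class Lazy:
    """Remember a function call without running it."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def compute(self):
        # Evaluate lazy arguments first, then call the function itself
        args = [a.compute() if isinstance(a, Lazy) else a for a in self.args]
        return self.func(*args)

def lazy(func):
    """Decorator: calling the wrapped function returns a Lazy object."""
    def wrapper(*args):
        return Lazy(func, *args)
    return wrapper

calls = []  # log of the functions that actually ran

@lazy
def add(a, b):
    calls.append("add")
    return a + b

@lazy
def inc(a):
    calls.append("inc")
    return a + 1

c = add(inc(1), 2)          # nothing has run yet
calls_before = len(calls)
value = c.compute()         # now inc runs, then add
print(calls_before, value, calls)
```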

In [17]:
# get a local Client (named Executor in older versions of distributed)
from distributed import Client
Client(set_as_default=True)

<Client: scheduler="127.0.0.1:8786" processes=4 cores=4>

In [18]:
import random
from dask import delayed, visualize
from time import sleep

@delayed(pure=True)
def add(a,b):
    sleep(random.random())
    return a+b

@delayed(pure=True)
def mul(a,b):
    sleep(random.random())
    return a*b

@delayed(pure=True)
def inc(a):
    sleep(random.random())
    return a + 1

In [None]:
x = add(1,2)
x

In [None]:
x.compute()

In [None]:
a = inc(1)
b = mul(1,2)
c = add(a,b)
c

In [21]:
c.visualize(rankdir="LR")

RuntimeError: Drawing dask graphs requires the `graphviz` python library and the `graphviz` system library to be installed.

In [22]:
c.compute()

4

### Loops

In [23]:
results = []
for x in range(4):
    a = inc(1)
    b = mul(1,x)
    c = add(a,b)
    results.append(c)

total = delayed(sum,pure=True)(results)
total

Delayed('sum-68ad50d381a097128af18270a7aee321')

In [20]:
total.visualize(rankdir="LR")

RuntimeError: Drawing dask graphs requires the `graphviz` python library and the `graphviz` system library to be installed.

`pure=True`: tells Dask the function is pure, so identical calls nested deep in the code are recognized as shared expressions that don't need to be recomputed. E.g., `inc(1)` is the same in every iteration here, so it only gets called once. A pure function should have no side effects.
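One way to picture this: if a task's key is derived deterministically from the function and its arguments, identical calls map to the same key and can be run once. The sketch below is a toy illustration of that idea with the standard library; `task_key` and `run_once` are hypothetical helpers, and the hashing scheme is an assumption made for this example, not Dask's actual tokenization.

```python
import hashlib

def task_key(func, *args):
    """Build a deterministic key from the function name and its arguments."""
    payload = repr((func.__name__, args)).encode()
    return f"{func.__name__}-{hashlib.sha256(payload).hexdigest()[:16]}"

def inc(a):
    return a + 1

cache = {}
stats = {"calls": 0}  # how many times a function body actually ran

def run_once(func, *args):
    """Run func(*args) only if no task with the same key has run before."""
    key = task_key(func, *args)
    if key not in cache:
        stats["calls"] += 1
        cache[key] = func(*args)
    return cache[key]

key1 = task_key(inc, 1)
key2 = task_key(inc, 1)   # same call, same key
key3 = task_key(inc, 2)   # different argument, different key

values = [run_once(inc, 1) for _ in range(4)]
print(values, stats["calls"])
```

This only gives correct results when the function really is pure, which is why the flag is opt-in.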

In [24]:
total.compute()

14

In [None]:
results = []
for y in range(2,10,2):
    for x in range(4):
        a = inc(1)
        b = mul(y,x)
        c = add(a,b)
        results.append(c)

total = delayed(sum,pure=True)(results)
total

In [None]:
total.visualize()

In [None]:
total.compute()

In [None]:
# Tree reduction --- add up pairwise
while len(results) > 1:
    new_results = []
    
    for i in range(0,len(results),2):
        res = add(results[i], results[i+1])
        new_results.append(res)
    
    results = new_results

total = results[0]
total.visualize()

In [None]:
total.compute()
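The pairwise loop above indexes `results[i+1]`, so it relies on the list length being even at every level (true for the 16 results built earlier). A sketch of the same tree reduction that also handles odd lengths is shown below on plain integers; `tree_reduce` is a hypothetical helper, and passing a delayed `add` together with a list of delayed objects should work the same way, though that is not shown here.

```python
from operator import add

def tree_reduce(func, items):
    """Combine items pairwise, level by level, until one value remains."""
    items = list(items)
    if not items:
        raise ValueError("tree_reduce needs at least one item")
    depth = 0
    while len(items) > 1:
        # Pair up neighbours: (0, 1), (2, 3), ...
        paired = [func(items[i], items[i + 1]) for i in range(0, len(items) - 1, 2)]
        if len(items) % 2:
            paired.append(items[-1])  # odd item out moves up unchanged
        items = paired
        depth += 1
    return items[0], depth

total_sum, depth = tree_reduce(add, range(1, 8))  # 7 items: odd length
print(total_sum, depth)
```

The number of levels grows logarithmically with the number of items, which is what makes the reduction shallow and parallel-friendly.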

Note: you cannot iterate over a delayed object and you can't use one in conditional statements, because its value (and length) isn't known until it has been computed. The following cell fails for that reason:

In [None]:
for x in range(inc(1)):
    print("hey!")
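The loop above fails because `range` needs a concrete integer and the delayed object does not have one yet. A minimal sketch of one workaround, assuming `dask` is installed, is to call `.compute()` first and loop over the concrete result. The `inc` function is redefined here only so that the snippet is self-contained.

```python
from dask import delayed

@delayed
def inc(a):
    return a + 1

n = inc(1)  # lazy: the value is not known yet

try:
    range(n)          # a Delayed object is not an integer
    failed = False
except TypeError:
    failed = True

count = n.compute()   # force evaluation to get a plain int
greetings = ["hey!" for _ in range(count)]
print(failed, count, greetings)
```

The cost is that the graph is evaluated at that point, so anything that depends on the loop can no longer be scheduled together with the tasks that produced `n`.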

## Scheduling the execution

Where you run a certain piece of a parallel task depends on your architecture, what each piece needs, and where the bottlenecks are in moving data between pieces.

The **single machine scheduler** is optimized for larger-than-memory use. It:
  
   - Uses CPUs in parallel
   - Minimizes RAM: tries to release intermediate results that aren't needed anymore
   - Has low overhead: 100 $\mu$s per task
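The "minimizes RAM" point can be pictured as reference counting: once every task that depends on an intermediate result has run, that result can be dropped. The toy sketch below runs a small dict-based graph in a fixed order and tracks how many results are held at once. The graph, the hard-coded execution order, and the helper `deps` are all assumptions for illustration; Dask's real scheduler chooses the order itself and runs tasks in parallel.

```python
from operator import add, mul

graph = {
    "a": 1,
    "b": (add, "a", 1),
    "c": (mul, "b", 2),
    "d": (add, "c", 3),
}
order = ["a", "b", "c", "d"]  # assumed: a valid execution order

def deps(task):
    """Keys of the graph that this task reads from."""
    if not isinstance(task, tuple):
        return []
    return [x for x in task[1:] if isinstance(x, str) and x in graph]

# Count how many tasks still need each result
remaining = {key: 0 for key in graph}
for task in graph.values():
    for d in deps(task):
        remaining[d] += 1

cache = {}
peak = 0  # largest number of results held at the same time
for key in order:
    task = graph[key]
    if isinstance(task, tuple):
        func, *args = task
        cache[key] = func(*[cache[a] if a in deps(task) else a for a in args])
    else:
        cache[key] = task
    peak = max(peak, len(cache))
    # Release inputs that no later task needs
    for d in deps(task):
        remaining[d] -= 1
        if remaining[d] == 0:
            del cache[d]

print(cache, peak)
```

Only the final result stays in memory, and at no point are more than two results held, even though the graph has four tasks.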
 

**Distributed scheduler**: tries to minimize data movement so you don't have to move data between computers unnecessarily.
 
 - distributes work across many workers
 - works well with distributed datastores (HDFS)
 - asynchronous
 - data-local
 
Run `dask-scheduler` on the command line and then, in Python:

In [None]:
from dask.distributed import Client, progress
e = Client(set_as_default=True)  # Client was named Executor in older versions
e

Swap out `concurrent.futures` for a Dask executor:

In [None]:
%%time 
from time import sleep

#from concurrent.futures import ProcessPoolExecutor
#e = ProcessPoolExecutor() 

def slowfunc(x,y,delay=1):
    sleep(delay)
    return(x+y)

In [None]:
%%time
futures = [e.submit(slowfunc,1,2, delay=1) for _ in range(100)]
[f.result() for f in futures]
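For comparison, here is the same submit-and-gather pattern with the standard library's `concurrent.futures`. This sketch uses a `ThreadPoolExecutor` rather than the `ProcessPoolExecutor` in the commented-out lines above, and a much shorter delay so that it finishes quickly; both choices are assumptions made for the example.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def slowfunc(x, y, delay=1):
    sleep(delay)
    return x + y

# Same interface as above: submit() returns a future, result() blocks on it
with ThreadPoolExecutor(max_workers=10) as pool:
    futures = [pool.submit(slowfunc, 1, 2, delay=0.01) for _ in range(100)]
    results = [f.result() for f in futures]

print(len(results), results[0])
```

Because the `submit` and `result` calls look the same, switching between a local pool and a Dask cluster mostly comes down to which object `e` refers to.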

There are loads of ways to do mapping in Python now; [this notebook](https://github.com/mrocklin/scipy-2016-parallel/blob/master/notebooks/map-rosetta-stone.ipynb) is the Rosetta Stone.