Dask
====

<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg" 
     width="30%" 
     align=right
     alt="Dask logo">


Dask is a flexible parallel computing library for analytic computing. Dask provides dynamic parallel task scheduling and high-level big-data collections like `dask.array` and `dask.dataframe`. More on dask here: https://docs.dask.org/en/latest/

_Note: Pieces of this notebook come from the following sources:_

- https://github.com/pangeo-data/pangeo-tutorial

## Start a Dask distributed cluster and a Client for Dashboard

Starting the Dask Cluster/Client is generally optional.  
It provides a dashboard, which is useful for gaining insight into the computation.  
Using dask-jobqueue will also provide more computing power by scaling Dask on several nodes.

The link to the dashboard will become visible when you create the cluster or client below. As [dask-labextension](https://github.com/dask/dask-labextension) is integrated in the current environment, it can be sufficient for monitoring Dask tasks (see the Task Stream and Progress windows on the right). Otherwise, we recommend having the dashboard open on one side of your screen while using your notebook on the other side.  Arranging your windows can take some effort, but seeing them both at the same time is very useful when learning.

Start the Dask cluster on Cheyenne. This will also activate the Dask windows on the right:

In [None]:
from dask_jobqueue import PBSCluster
cluster = PBSCluster(cores=1, memory='10GB', processes=1, queue='share', walltime='01:00:00')
cluster.scale(4) # Ask for 4 workers
cluster

Connect a client to it:

In [None]:
from dask.distributed import Client
client = Client(cluster) # Connect this local process to remote workers
client

# Dask Arrays

A dask array looks and feels a lot like a numpy array.
However, a dask array doesn't directly hold any data.
Instead, it symbolically represents the computations needed to generate the data.
Nothing is actually computed until the actual numerical values are needed.
This mode of operation is called "lazy"; it allows one to build up complex, large calculations symbolically before turning them over to the scheduler for execution.

If we want to create a numpy array of all ones, we do it like this:

In [None]:
import numpy as np
shape = (1000, 4000)
ones_np = np.ones(shape)
ones_np

This array contains exactly 32 MB of data:

In [None]:
print('%.1f MB' % (ones_np.nbytes / 1e6))

Now let's create the same array using dask's array interface.

In [None]:
import dask.array as da
ones = da.ones(shape)
ones

This works, but we didn't tell dask how to split up the array, so it is not optimized for distributed computation.

A crucial difference with dask is the `chunks` argument, which we should specify explicitly. "Chunks" describes how the array is split up into many sub-arrays.

![Dask Arrays](http://dask.pydata.org/en/latest/_images/dask-array-black-text.svg)
_source: [Dask Array Documentation](http://dask.pydata.org/en/latest/array-overview.html)_

There are [several ways to specify chunks](http://dask.pydata.org/en/latest/array-creation.html#chunks).
In this tutorial, we will use a block shape.

In [None]:
chunk_shape = (1000, 1000)
ones = da.ones(shape, chunks=chunk_shape)
ones

Notice that we just see a symbolic representation of the array, including its shape, dtype, and chunksize.
No data has been generated yet.
When we call `.compute()` on a dask array, the computation is triggered and the result is returned as a numpy array.

In [None]:
ones.compute()

In order to understand what happened when we called `.compute()`, we can visualize the dask _graph_, the symbolic operations that make up the array:

In [None]:
ones.visualize()

Our array has four chunks. To generate it, dask calls `np.ones` four times and then concatenates the results into one array.
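The chunk layout can also be inspected without computing anything. The sketch below assumes the same `shape` as above and shows three chunk specifications that should all produce the same 1000 x 1000 blocks; the variable names are illustrative only.

```python
import dask.array as da

shape = (1000, 4000)

# Three ways to ask for 1000 x 1000 blocks.
by_block_shape = da.ones(shape, chunks=(1000, 1000))  # a block shape
by_single_size = da.ones(shape, chunks=1000)          # one size used for every axis
by_explicit = da.ones(shape, chunks=((1000,), (1000, 1000, 1000, 1000)))  # sizes per axis

print(by_block_shape.chunks)     # block sizes along each axis
print(by_block_shape.numblocks)  # number of blocks along each axis
```

`numblocks` reports one block along the first axis and four along the second, which matches the four `np.ones` calls in the graph.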

Rather than immediately loading a dask array (which puts all the data into RAM), it is more common to reduce the data somehow. For example:

In [None]:
sum_of_ones = ones.sum()
sum_of_ones.visualize()

Here we see dask's strategy for finding the sum. This simple example illustrates the beauty of dask: it automatically designs an algorithm appropriate for custom operations with big data. 
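To make the idea concrete, here is a hand-written version of a blockwise sum using only numpy. It is an illustration of the general pattern (reduce each block, then combine the partial results), not the exact graph that dask builds.

```python
import numpy as np

data = np.ones((1000, 4000))

# Step 1: split into four 1000 x 1000 blocks along the second axis,
# mirroring the chunk shape used above.
blocks = np.split(data, 4, axis=1)

# Step 2: reduce each block independently (these steps could run in parallel).
partial_sums = [block.sum() for block in blocks]

# Step 3: combine the partial results into the final answer.
total = sum(partial_sums)
print(total)
```

Because each block is reduced on its own, no step ever needs the whole array in memory at once.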

If we make our operation more complex, the graph gets more complex.

In [None]:
fancy_calculation = (ones * ones[::-1, ::-1]).mean()
fancy_calculation.visualize()

### A Bigger Calculation

The examples above were toy examples; the data (32 MB) is nowhere near big enough to warrant the use of dask.

We can make it a lot bigger!

In [None]:
bigshape = (200000, 4000)
big_ones = da.ones(bigshape, chunks=chunk_shape)
big_ones

In [None]:
print('%.1f MB' % (big_ones.nbytes / 1e6))

This dataset is 6.4 GB, rather than 32 MB! This is probably close to or greater than the amount of RAM available on your computer. Nevertheless, dask has no problem working on it.
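The size can be checked by hand without creating any array. The sketch below assumes 8 bytes per element (float64, the default dtype of `da.ones`) and also counts how many chunks the array is divided into.

```python
import math

bigshape = (200000, 4000)
chunk_shape = (1000, 1000)
itemsize = 8  # bytes per float64 element (assumed default dtype)

total_bytes = math.prod(bigshape) * itemsize
chunk_bytes = math.prod(chunk_shape) * itemsize
# Number of blocks along each axis, multiplied together.
n_chunks = math.prod(math.ceil(n / c) for n, c in zip(bigshape, chunk_shape))

print(f"total: {total_bytes / 1e9:.1f} GB")
print(f"per chunk: {chunk_bytes / 1e6:.1f} MB")
print(f"number of chunks: {n_chunks}")
```

Each chunk is small enough to fit comfortably in memory, but there are hundreds of them, so the task graph is much larger than in the toy example.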

_Do not try to `.visualize()` this array!_

When doing a big calculation, dask also has some tools to help us understand what is happening under the hood. Let's watch the dashboard again as we do a bigger computation.

In [None]:
big_calc = (big_ones * big_ones[::-1, ::-1]).mean()

result = big_calc.compute()
result

### Reduction 

All the usual numpy methods work on dask arrays.
You can also apply numpy functions directly to a dask array, and it will stay lazy.

In [None]:
big_ones_reduce = (np.cos(big_ones)**2).mean(axis=1)
big_ones_reduce
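A small check of this behaviour, using a tiny array so that it runs instantly. The array `small` and the choice of the single-threaded `"synchronous"` scheduler are assumptions made for this sketch only.

```python
import numpy as np
import dask.array as da

small = da.ones((4, 6), chunks=(2, 3))

lazy = np.cos(small) ** 2  # still a dask array; nothing has been computed
print(type(lazy))

# Only .compute() produces concrete values, returned as a numpy array.
values = lazy.mean(axis=1).compute(scheduler="synchronous")
print(type(values), values.shape)
```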

Plotting also triggers computation, since we need the actual values.

In [None]:
from matplotlib import pyplot as plt
%matplotlib inline
plt.rcParams['figure.figsize'] = (12,8)

In [None]:
plt.plot(big_ones_reduce)

# Dask Delayed

Dask.delayed is a simple and powerful way to parallelize existing code.  It allows users to delay function calls into a task graph with dependencies.  Dask.delayed doesn't provide any fancy parallel algorithms like Dask.dataframe, but it does give the user complete control over what they want to build.

Systems like Dask.dataframe are built with Dask.delayed.  If you have a problem that is parallelizable, but isn't as simple as just a big array or a big dataframe, then dask.delayed may be the right choice for you.

## Create simple functions

These functions do simple operations like adding two numbers together, but they sleep for a fraction of a second to simulate real work.

In [None]:
import time

def inc(x):
    time.sleep(0.1)
    return x + 1

def dec(x):
    time.sleep(0.1)
    return x - 1
    
def add(x, y):
    time.sleep(0.2)
    return x + y 

We can run them like normal Python functions:

In [None]:
%%time
x = inc(1)
y = dec(2)
z = add(x, y)
z

These ran one after the other, in sequence.  Note, though, that the first two calls, `inc(1)` and `dec(2)`, don't depend on each other; we *could* have called them in parallel had we been clever.
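For comparison, here is what doing this by hand might look like with the standard library's `concurrent.futures`. This is not Dask; it is a sketch of the manual bookkeeping involved, and it repeats the function definitions so that it is self-contained.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def inc(x):
    time.sleep(0.1)
    return x + 1

def dec(x):
    time.sleep(0.1)
    return x - 1

def add(x, y):
    time.sleep(0.2)
    return x + y

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=2) as pool:
    # inc(1) and dec(2) are independent, so submit both at once.
    future_x = pool.submit(inc, 1)
    future_y = pool.submit(dec, 2)
    # add needs both results, so it has to wait for them.
    z = add(future_x.result(), future_y.result())
elapsed = time.perf_counter() - start

print(z, f"{elapsed:.2f} s")
```

Here we had to work out the dependencies ourselves and decide what to submit and when to wait.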

## Annotate functions with Dask Delayed to make them lazy

We can call `dask.delayed` on our functions to make them lazy.  Rather than compute their results immediately, they record what we want to compute as a task into a graph that we'll run later on parallel hardware.

In [None]:
import dask
inc = dask.delayed(inc)
dec = dask.delayed(dec)
add = dask.delayed(add)

Calling these lazy functions is now almost free.  We're just constructing a graph.

In [None]:
%%time
x = inc(1)
y = dec(2)
z = add(x, y)
z
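Functions can also be made lazy at definition time by using `dask.delayed` as a decorator. The function `double` below is a hypothetical example, and the `"synchronous"` scheduler is chosen only to keep the sketch independent of any running cluster.

```python
import dask

@dask.delayed
def double(x):
    # Hypothetical example function, not part of the notebook above.
    return 2 * x

lazy = double(21)  # returns a Delayed object; double has not run yet
print(lazy)

result = lazy.compute(scheduler="synchronous")
print(result)
```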

## Visualize computation

In [None]:
z.visualize(rankdir='LR')

## Run in parallel

Call `.compute()` when you want your result as a normal Python object.

If you started `Client()` above then you may want to watch the status page during computation.

In [None]:
%%time
z.compute()

## Parallelize Normal Python code

Now we use Dask in normal for-loopy Python code.  This generates graphs instead of doing computations directly, but still looks like the code we had before.  Dask is a convenient way to add parallelism to existing workflows.

In [None]:
%%time
zs = []
for i in range(256):
    x = inc(i)
    y = dec(x)
    z = add(x, y)
    zs.append(z)
    
zs = dask.persist(*zs)  # trigger computation in the background

To make this go faster, add additional workers.

In [None]:
cluster.scale(8)
cluster
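The loop above uses `dask.persist`, which starts the work and hands back lazy objects. To get ordinary Python values instead, `dask.compute` can be used. The sketch below uses hypothetical sleep-free helpers (`fast_inc`, `fast_dec`, `fast_add`) and a shorter loop so that it runs quickly without a cluster.

```python
import dask

@dask.delayed
def fast_inc(x):
    return x + 1

@dask.delayed
def fast_dec(x):
    return x - 1

@dask.delayed
def fast_add(x, y):
    return x + y

zs = []
for i in range(8):
    x = fast_inc(i)
    y = fast_dec(x)
    zs.append(fast_add(x, y))

# dask.compute evaluates all the graphs together and returns a tuple of results.
results = dask.compute(*zs, scheduler="synchronous")
print(results)

# A final reduction can itself be a delayed task that depends on every z.
total = dask.delayed(sum)(zs).compute(scheduler="synchronous")
print(total)
```

Passing all the delayed objects to a single `dask.compute` call lets shared intermediate tasks, such as each `fast_inc(i)`, run only once.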

# Dask Deploy

The Dask library is written in pure Python. Installation of Dask is as simple as:

```shell
$ pip install "dask[complete]"
# or
$ conda install dask
```


Once dask is installed, the steps to deploying dask differ depending on the computational infrastructure you are working with and what scheduler you plan to use. We'll briefly cover that topic next.

Dask-deploy docs: http://docs.dask.org/en/latest/setup.html

## Dask Schedulers

The Dask *Schedulers* orchestrate the tasks in the Task Graphs so that they can be run in parallel.  *How* they run in parallel, though, is determined by which *Scheduler* you choose.

There are 3 *local* schedulers:

- **Single-Thread Local:** For debugging, profiling, and diagnosing issues
- **Multi-threaded:** Using the Python built-in `threading` package (the default for all Dask operations except `Bags`)
- **Multi-process:** Using the Python built-in `multiprocessing` package (the default for Dask `Bags`)

and 1 *distributed* scheduler, which we will talk about later:

- **Distributed:** Using the `dask.distributed` module (which uses `tornado` for TCP communication). The distributed scheduler uses a `Cluster` to manage communication between the scheduler and the "workers". This is described in the next section.
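A scheduler can be selected per call with the `scheduler` keyword, or for a block of code with `dask.config.set`. The sketch below uses a small array of ones as an assumed stand-in workload and exercises only the threaded and single-threaded schedulers.

```python
import dask
import dask.array as da

x = da.ones((100, 100), chunks=(50, 50)).sum()

# Choose a scheduler for a single call.
threaded = x.compute(scheduler="threads")
single = x.compute(scheduler="synchronous")  # single-threaded, easiest to debug

# Or set it for a block of code; "processes" would select the multi-process scheduler.
with dask.config.set(scheduler="synchronous"):
    in_context = x.compute()

print(threaded, single, in_context)
```

The result is the same in every case; only the way the tasks are executed changes.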

## Distributed Clusters (http://distributed.dask.org/)

Dask can be deployed on distributed infrastructure, such as an HPC system or a cloud computing system.

- `LocalCluster` - Creates a `Cluster` that can be executed locally. Each `Cluster` includes a `Scheduler` and `Worker`s. 
- `Client` - Connects to and drives computation on a distributed `Cluster`
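A minimal sketch of these two pieces working together on a single machine. The settings `processes=False` (workers run as threads in the current process) and `dashboard_address=None` (no dashboard) are assumptions made to keep the example lightweight; a real deployment would likely use different values.

```python
from dask.distributed import Client, LocalCluster

# processes=False and dashboard_address=None are choices made for this
# sketch only, not requirements.
with LocalCluster(n_workers=2, threads_per_worker=1,
                  processes=False, dashboard_address=None) as cluster:
    with Client(cluster) as client:
        future = client.submit(sum, [1, 2, 3])  # runs on a worker
        result = future.result()                # fetch the value back

print(result)
```

The same `Client(cluster)` pattern is what connects the notebook to the `PBSCluster` created at the top of this tutorial.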

### Dask Jobqueue (http://jobqueue.dask.org/)

- `PBSCluster`
- `SlurmCluster`
- `LSFCluster`
- etc.

### Dask Kubernetes (http://kubernetes.dask.org/)

- `KubeCluster`
