# Dask

Dask does two things that NumPy and Pandas don't:
- it allows you to build *task graphs* that set out a plan for executing an operation
- it manages the parallelization of these operations

In practice, this means that you can:
- work with datasets that are larger than memory without needing to micro-manage the file input-output
- operate in parallel
- but still write code that is recognisible from pandas and numpy

Dask is also built into xarray as an optional dependancy.

For reference, the calculations below are carried out on a workstation with 8 CPU cores and 23 GB of memory.

In [1]:
import dask.array as da
from dask.diagnostics import ProgressBar
import numpy as np
#import matplotlib.pyplot as plt

#%matplotlib inline

%load_ext autoreload
%autoreload 2

## Task graphs and lazy evaluation
When you call a function on a numpy array, the function is evaluated immediately.  For example:

In [3]:
np.sum( np.random.rand( 10 ) )

6.2699025622958837

This approach of evaluating immediately is known as **eager evaluation**.

When you call a function on a dask array, the function is *not* evaulated immediately.  Instead, calling the function means that dask creates a plan for how it would carry out this operation. This plan is known as a **task graph**. This approach of deferring evaluating is known as **lazy evaluation**.  

The following example shows a simple example where the ultimate computation is to 1) create a random matrix 2) increase all the elements by 1 and 3) add all the elements together.

The task graph image will **not** work on your machine unless [you install special packages to do so.](https://stackoverflow.com/a/43744112/5387991)

In [4]:
# Define a function that increases all the elements of x by 1
def inc(x):
    return x + 1
# Create a dask array with 9 random elements divided into a single chunk of 9 elements
arr = da.from_array(np.random.rand(9),chunks=(9))
# Increase all the elements by 1
arr = inc(arr)
# Add all the elements together
arr = da.sum(arr)
# Output the task graph
arr.visualize()

RuntimeError: Drawing dask graphs requires the `graphviz` python library and the `graphviz` system library to be installed.

At this point the evaluation of the task graph has still not occurred - not even the initial stage of creating the random matrix.

To get dask to trigger the evaluation, use the ```.compute``` method on ```arr```:

In [5]:
arr.compute()

11.865100532825501

Note that the syntax is different when we use dask in xarray. In xarray case the ```.load()``` method triggers the evaluation, but when using dask alone ```.compute()``` triggers execution.

The task graph visualised above is a python dictionary setting out the chain of calculations.  

## Parallelisation
Dask also manages the execution of the task graph in parallel.  When executing in parallel you must specify the way that your data should be partitioned - the partions are known as *chunks*.

To illustrate parallel execution, we repeat the calculation above with a random matrix, but this time we split the array with 9 total elements into chunks with 3 elements:

In [3]:
# Define a function that increases all the elements of x by 1
def inc(x):
    return x + 1
# Create a dask array with 9 random elements divided into a single chunk of 9 elements
arr = da.from_array(np.random.rand(9),chunks=(3,))
# Increase all the elements by 1
arr = inc(arr)
# Add all the elements together
arr = da.sum(arr)
# Output the task graph
#arr.visualize()

Dask now has a task graph that carries out the array creation and incrementing by 1 separately in each of the three chunks.  In the step where the summing of the whole array occurs, dask plans to sum each of the chunks separately to get sub-totals and then to sum the the three sub-totals together.  This reduces the amount of data that must be transferred.


## Advantages and disadvantages of dask
The ability to carry out operations in parallel is clearly an advantage of dask.  However, 
- creating the task graph;
- carrying out calculations on smaller chunks of data; and
- transferring the results of calculations from the chunks to each other

all carry computational cost.  

**If your dataset fits comfortably in memory, it is better to carry out the calculation using numpy.**

We can compare the performance of numpy and dask for such a calculation where the array fits comfortably in memory.

In [6]:
# Numpy example
rand_arr = np.random.rand( 500,500,100)
%timeit np.mean(rand_arr,axis = 1)

803 ms ± 1.75 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


We can compare this with a similar calculation using dask. 

In [31]:
# Dask example where the task graph is created first and we only time the computation
dask_rand_arr = da.from_array( rand_arr, chunks = (100,100,100))
y = da.mean(dask_rand_arr, axis = 1)
%timeit y.compute()

51.8 ms ± 1.71 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In this case dask takes twice as long.

Be aware that these kinds of performance comparisons generalise poorly to other problems.  The size of the data, the number of cores available and the nature of the calculation all have a major effect on the performance you experience.

The **real advantages** of dask arise when:
- your dataset is bigger than the memory you have available (in practice dask may start becoming advantageous from when your dataset is 1/3 of your available memory)
- you have multiple cores available on your computer.

A major advantage is that the task scheduler in dask manages the process of reading data from disk into memory for each chunk.  This can reduce the amount of micro-management that you need to do yourself.

This example shows how easy it is to do such out-of-core processing on a very large array.

In [2]:
# Create very large array
# Cut into 1000x1000 sized chunks
x = da.random.normal(0, 1, size=(200000, 200000),  
                              chunks=(1000, 1000))    

# Get size of array
print("Size of array in gigabytes", x.nbytes / 1e9)   

# Take mean on first axis (indexing is just to reduce the amount output to the screen)
y = x.mean(axis=0)[::100000]                          

Size of array in gigabytes 320.0


In this example we initialise an enourmous array that would occupy 320 Gb in memory.  We divide this array into chunks that are 1000 x 1000.  We then take the mean of this array along the first axis and output a small subset of the outputs.

As we have not yet called ```.compute()``` on y, **none of this has yet happened**.  Instead, dask has created a task graphy setting out how it would go about this.

We can now trigger the calculation.  Before doing so for the first time, it is a good idea to remove some of the zeros from the size of the array $x$ first to check that the calculation won't overwhelm your system.

In [65]:
%time  y.compute();     # Time to compute the result

CPU times: user 22.2 s, sys: 1.01 s, total: 23.2 s
Wall time: 4.47 s


array([ 10.00043669,  10.00018279])

The *user* time is the total amount of time the CPUs spent doing the calculation.

The *sys* time is the overhead for the dask scheduler.  

The *total* time is the sum of the *user* and *sys* time.

The most important output is the *Wall time* - this is the actual amout of time the calculation took to run.  The Wall time is much less than the total CPU time because the CPUs are operating in parallel.

A wall-clock time of just a few seconds for a calculation on a 320 GB array is quite impressive.  However, a similar calculation on a dataset stored in netcdf files will likely take much longer.  This longer time with netcdf files is because the data would have to be read from the hard disk, whereas the random array above was just initialised directly in memory.

## Setting chunk sizes
The chunk sizes set how the operations are parallelised.

More chunks -> more parallel operations, but more computational overhead

Chunk size depends on your system.  A rule of thumb is to have chunks with size 100 Mb - 500 Mb.

The chunk shape can affect performance.  If you are taking a mean in the x-direction, for example, then having each chunk span the x-direction can help.

In [32]:
temp = da.random.normal(10, 0.1, size=(4000, 3000,300),   # 4000x3000 array with 3000 time steps 
                              chunks=(100, 100,10))   # Short chunks on the first dimension
y = temp.mean(axis=0)[::1000,::1000]                         # Take mean on first dimension


In [58]:
#%%time
with ProgressBar():
    y.compute();     # Time to compute the result

[########################################] | 100% Completed |  0.4s


In [61]:
temp = da.random.normal(10, 0.1, size=(4000, 3000,300),   # 4000x3000 array with 3000 time steps 
                              chunks=(1000, 100,10))   # Cut into 1000x1000 sized chunks
y = temp.mean(axis=0)[::1000,::1000]                         # Perform NumPy-style operation
print("Size of temperature array in gigabytes", temp.nbytes / 1e9)

Size of temperature array in gigabytes 28.8


In [62]:
# We can add a progress bar to get a sense of how quickly the calculation is proceeding.
with ProgressBar():
    y.compute();     # Time to compute the result

[########################################] | 100% Completed |  0.2s


While the chunk shape and size is important, the performance may be dominated by how your data is stored on disk.  The time needed to open and close a netcdf file may be as long as the time needed to read the data in the file!  On account of this overhead, if you set your chunk shape and size in such a way that many file openings and closings are required, then it may overwhelm any performance advantage once the data is in memory.

Ulimately, the right solution will depend on your system and your problem, so you need to experiment.

## Linear algebra
Dask also has algorithms to handle common linear algebra operations in parallel.  For example, you can do a fast fourier transform.

In [28]:
t = da.from_array(np.linspace(0,10*np.pi, 300), chunks=(300) )
signal = np.cos(0.5*t) + np.cos(3*t)

In [29]:
out = da.fft.fft(signal)

## Other aspects of dask
As well as the dask.array, dask can deal with dataframes and arbitrary tasks.

### Delayed
The dask.delayed module allows you to create arbitrary task graphs.  This is not so useful in climate research, where we typically work with arrays or dataframes and so dask.array or dask.dataframe are most useful.  However, it may be useful for other tasks, such as processing text-based data in operations that cannot be vectorised.

In [19]:
from dask import delayed
# Define a function and decorate it with @delayed
@delayed()
def add(a,b):
    return a + b

Once the function has been created with the @delayed wrapper, it can be called as normal:

In [16]:
c = add(1,2)

As with dask.array, this operation has just created a task graph.  To execute it the ```.compute()``` method must be called:

In [20]:
c.compute()

3