# An Introduction to dask

## Course overview

Dask is a flexible parallel computing library for Python. It allows dynamic scheduling of tasks for optimising computations and includes collections (extensions of arrays, lists and dataframes) for use with parallel data processing.

As of Iris v2.0, dask has been integrated with Iris. This means you can utilise all of the benefits of dask while keeping the file format interoperability and other functionality that Iris provides.

### Why does this course exist?

This course exists to enable you to use dask and Iris together for more performant, memory-efficient data processing by showing some useful patterns for combining the two.

#### Iris/dask integration

Iris uses dask to defer data loading and processing by default in a number of situations, including loading and some mathematical, analytical and statistical operations. This course will introduce some patterns for using Iris and dask together to produce your own bespoke data processing pipelines that meet your analytical needs while maintaining deferred data.


#### Performance

A real benefit of dask is parallel data processing by default. This means you will see existing Iris processing paradigms getting faster, and we will show patterns for loading and processing data to take advantage of dask parallel processing so that your bespoke processing pipelines will also run faster.


#### Memory usage

Data volumes keep growing, and we are getting to the point where a typical dataset is larger than the system memory available to process it. In many cases we can no longer simply load an entire dataset into memory and process it there.

Dask provides intelligent deferral of loading data from disk into memory (this is often referred to as [out-of-core processing](https://en.wikipedia.org/wiki/External_memory_algorithm)). This dask functionality is available in Iris as well, so Iris can load and process data volumes that are significantly larger than the system memory available to you without running out of memory. As a result, data can also be processed faster and with less difficulty.

## Terminology

### General

#### Thread
A thread is the simplest unit of computation.

#### Process
A process is an isolated computation which consists of one or more threads. The threads within a process can be executed concurrently (simultaneously) and have access to the same resources within the process (memory, executable code and variable values). Processes cannot share these resources with other processes. Each process has its own individual address space, while the threads within that process share their address space.
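
To make the idea of shared resources concrete, here is a minimal sketch using only the Python standard library (not dask). The names `shared`, `lock` and `work` are illustrative and not part of the course material. Four threads within one process all append to the same list, which is possible because threads share their process's address space:

```python
import threading

shared = []               # one list, visible to every thread in this process
lock = threading.Lock()   # serialises the appends so the intent stays clear

def work(n):
    # Each thread writes its result into the same shared list.
    with lock:
        shared.append(n * n)

threads = [threading.Thread(target=work, args=(n,)) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()              # wait for every thread to finish

squares = sorted(shared)  # completion order is not guaranteed, so sort
print(squares)
```

Separate processes could not share `shared` in this way; each process would get its own copy.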

#### Graph
A way to represent 'things' and their relationships, where a node (circle) is a thing and an edge (line) between nodes is their relationship; e.g. a social network where nodes are people and edges are their friendships.


### Dask

#### Dask architecture
Below is the basic structure of how the components of dask interact:

![](../images/dask_architecture.png)

Dask uses a pool of workers to process tasks specified by the scheduler. The scheduler determines the tasks to be done by intelligently traversing the task graph. The task graph is produced dynamically and automatically passed to the scheduler when using dask functionality in your code. In the case of dask distributed, the task graph is submitted to the scheduler by the client.

#### Scheduler
One of dask’s key benefits is its ability to efficiently schedule tasks to optimize computations. Everything dask does is built on top of “schedulers”. These schedulers take the order of work established by a task graph and find the optimal way to break down and carry out the tasks.

Dask has four types of schedulers:
- Synchronous: Single thread (good for debugging).
        dask.get
- Threaded: Utilises a thread pool.
        dask.threaded.get
- Multiprocessing: Utilises a process pool.
        dask.multiprocessing.get
- Distributed: Utilises a cluster of distributed machines.
        distributed.Client.get
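
As a rough illustration of switching between schedulers, the sketch below runs the same small task graph on two of them. It assumes a dask version (0.18 or later) where `compute()` accepts a `scheduler=` keyword, which may differ from the `get` functions listed above; `square` and `total` are illustrative names.

```python
from dask import delayed

@delayed
def square(n):
    return n * n

# One task graph: square five numbers, then sum the results.
total = delayed(sum)([square(n) for n in range(5)])

# The same graph handed to two different schedulers.
sync_result = total.compute(scheduler='synchronous')  # single thread
threaded_result = total.compute(scheduler='threads')  # thread pool

print(sync_result, threaded_result)
```

Both calls should give the same answer; only the way the tasks are executed changes.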

#### Worker
A worker receives tasks to process from the scheduler and returns output to the scheduler when that processing is finished. A worker can be a thread, a process or a whole machine, depending on the scheduler used.

#### Client
A client provides the primary point of access to a distributed scheduler and its associated workers. When using a threaded or multiprocessing scheduler, interaction with the scheduler is handled by the collection or delayed object.


#### Task Graph
A graph of tasks (nodes) and the data which is required to pass between them (edges).
In the context of dask:
- Task = circle
- Data = box
- Direction of flow = arrow

In [None]:
def add(a, b):
    return a + b

x = 1
y = 2
z = add(x, y)
w = sum([x, y, z])

![](../images/task_graph_def_img_1.png)

Viewing the relationship between tasks and data can reveal better\* ways to order the tasks to achieve the same computation.

\* Better can mean in less time or using less memory or both!

#### Dask Graph
Dask stores task graphs in a Python dictionary which maps keys to computations:

In [None]:
dsk = {'x': 1,
       'y': 2,
       'z': (add, 'x', 'y'),
       'w': (sum, ['x', 'y', 'z'])
      }

Dask is different from other parallelising libraries in that it uses ordinary Python structures to represent task graphs instead of a specialised API:

- `{dicts}`
- `(tuples)`
- `functions()`
- `values`
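
Because the graph is just a dictionary, it can be handed straight to a scheduler. The following is a minimal sketch using the threaded scheduler's `get` function named earlier; it repeats the `add` function and `dsk` dictionary from above so that it runs on its own.

```python
from dask.threaded import get

def add(a, b):
    return a + b

# Keys map either to plain values or to tasks written as
# tuples of the form (function, argument, ...).
dsk = {'x': 1,
       'y': 2,
       'z': (add, 'x', 'y'),
       'w': (sum, ['x', 'y', 'z'])}

z = get(dsk, 'z')   # runs add(1, 2)
w = get(dsk, 'w')   # runs sum([1, 2, 3]), computing 'z' on the way
print(z, w)
```

Asking for `'w'` only executes the tasks that `'w'` depends on, which is how the scheduler avoids unnecessary work.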


### Exercise: Terminology

Now it is time to test what you have learned! Answer the following questions by assigning each a definition.

Definitions:
1. A group of threads with shared memory.
2. The object which coordinates tasks for workers to process.
3. An interface to access the scheduler in a distributed parallel computing cluster.
4. A representation of the relationships between a set of interdependent tasks and their data.
5. The smallest unit of computation.

Q: What is a scheduler? (Answer in the cell below.)

Q: What is a thread?

Q: What is a task graph?

Q: What is a process?

Q: What is a client?

## Dask Delayed
As users we may want to use the functionality of dask to parallelise our own code. The `dask.delayed` interface allows us to do this with a very light API:

![](../images/dask_stack.png)

Here we see that all of dask is built on top of the `scheduler`. The graph spec layer allows graphs to be generated and interpreted by the scheduler. Dask collections such as `arrays`, `bags` and `dataframes` are built on top of these and provide functionality to use these data types without having to think about how to talk to the `scheduler` underneath. Similarly, `delayed` provides us with a lightweight interface to create our own parallelisable code without needing to talk directly to the scheduler.

The decorator `@delayed` is used to "dask-ify" any arbitrary function, which allows said function to be used as part of a task graph.

In [None]:
from dask import delayed

@delayed(pure=True)
def add(a, b):
    return a + b

@delayed(pure=True)
def mul(a, b):
    return a * b

@delayed(pure=True)
def inc(a):
    return a + 1

By running the cells below you will see that `z` is a delayed object, and that the dask methods `visualize()` and `compute()` can be used on it just as on any dask collection.

In [None]:
z = add(1, 2)
z

In [None]:
z.visualize()

In [None]:
z.compute()

Similarly `c` is a delayed object which can be visualized and computed; it is slightly more complex than `z` on its own.

In [None]:
a = inc(1)
b = mul(1, 2)
c = add(a, b)
c

In [None]:
c.visualize()

In [None]:
c.compute()

It is worth noting that here we have created objects which are "lazy"; when they are created they are not immediately executed. Their execution is `delayed` until a time which we the users (or a scheduler) determine.
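
One way to convince yourself of this laziness is to give a delayed function a visible side effect. The sketch below is illustrative only: `record` and `calls` are made-up names, and the `scheduler='synchronous'` keyword assumes a dask version that accepts it.

```python
from dask import delayed

calls = []   # records every time the function body really runs

@delayed
def record(x):
    calls.append(x)
    return x * 2

lazy = record(21)                 # builds a task; the body has not run yet
calls_before = list(calls)

result = lazy.compute(scheduler='synchronous')   # now the body runs
calls_after = list(calls)

print(calls_before, calls_after, result)
```

The list stays empty until `compute()` is called, which is exactly the deferral described above.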

### Exercise: 1
Run the code in the cell below and then visualize and compute `total`.

In [None]:
results = []
for x in range(4):
    a = inc(1)
    b = mul(1, x)
    c = add(a, b)
    results.append(c)

total = delayed(sum, pure=True)(results)
total

**Part 1:** Visualize total.

**Part 2:** Compute total.

## Using Iris Load with dask
Iris is a Python library for analysing and visualising meteorological and oceanographic data sets. A common issue when using Iris to analyse many data files is that loading these files can take a lot of time when done sequentially. A solution is to parallelise the loading of files, which is an ideal task for dask.

### Setup

We will need a few more libraries to load files using Iris. These include `iris` for loading files to cubes, `os` and `glob` to access files on disk, and `dask.bag`, the type of dask collection we will use to create a task graph of our parallelised load functions.

In [None]:
import iris
import os
import glob
import dask.bag as db

The data we will be loading is monthly northward sea ice velocities from 1890-1899, generated by the Met Office Unified Model and stored in 120 PP files. We will create a list of the filenames, and check the number and names of the files in the list are what we expect, using the following code:

In [None]:
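# glob returns files in arbitrary order, so sort them before inspecting the first and last.
files = sorted(glob.glob(iris.sample_data_path('UM', '*.pp')))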

In [None]:
print('Earliest: {}\nLatest: {}'.format(files[0], files[-1]))

### Load multiple files into multiple cubes using `iris.load_cube`

When using Iris to load data, that data is stored in a structure called a `cube`. Multiple cubes can be stored in a list called a `CubeList`. We can load the data from our 120 PP files into a `CubeList` by mapping our list of filenames (`files`) onto the `iris.load_cube()` function:

In [None]:
cubelist = iris.cube.CubeList(map(iris.load_cube, files))

In [None]:
len(cubelist)

In [None]:
type(cubelist)

### Load multiple files into multiple cubes using `dask.bag`

We can parallelise the loading of data from files into cubes by creating a `dask.bag` object which maps our list of filenames onto the function `iris.load_cube()`:

In [None]:
bag_o_cubes = db.from_sequence(files).map(iris.load_cube)

In [None]:
type(bag_o_cubes)

In [None]:
bag_o_cubes.visualize()

`bag_o_cubes` is a delayed (or "lazy") object which we can `visualize` as a task graph. The delayed object will not perform the computation of mapping filenames onto `iris.load_cube()` until we give it the command to do so (namely `bag_o_cubes.compute()`). You can see from the task graph (double-click to enlarge) that each file is being loaded into a cube individually and in parallel.
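
The same map-then-compute pattern can be tried without any data files. In the sketch below, `fake_load` is a hypothetical stand-in for `iris.load_cube` and the filenames are invented; the threaded scheduler is requested explicitly only to keep the example self-contained.

```python
import dask.bag as db

def fake_load(filename):
    # Hypothetical stand-in for iris.load_cube: returns a small record
    # instead of reading a real file.
    return {'source': filename, 'n_chars': len(filename)}

filenames = ['a.pp', 'bb.pp', 'ccc.pp']   # invented names, no files needed

bag = db.from_sequence(filenames).map(fake_load)   # nothing has run yet
loaded = bag.compute(scheduler='threads')          # now every item is "loaded"

print(loaded)
```

Swapping `fake_load` for a real loading function gives the pattern used for `bag_o_cubes`.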

Since we want to generate a `CubeList` like the previous example we can compute `bag_o_cubes` from within our `iris.cube.CubeList()` command:

In [None]:
bag_cubelist = iris.cube.CubeList(bag_o_cubes.compute())

In [None]:
len(bag_cubelist)

In [None]:
type(bag_cubelist)

It is worth noting that part of the beauty of Python is that we do not necessarily need to create three objects (`files`, `bag_o_cubes`, `bag_cubelist`) to get a `CubeList` object but can chain the functions together into one big function call:

```python
iris.cube.CubeList(db.from_sequence(files).map(iris.load_cube).compute())
```

### Load multiple files into single cube using `iris.load`

By default Iris will try to merge all input files into as few resultant cubes as possible, obeying the rules on what data and metadata can be merged into a single cube. If we know that the result of a load operation will be a single cube we can use `iris.load_cube` to return a single merged cube from the load operation:

In [None]:
cube = iris.load_cube(files)

print(cube)

Compare this to the output of printing a single cube and it is clear that the decade's worth of data has been merged into a single cube with 120 time steps:

In [None]:
single_cube = iris.load_cube(files[0])

print(single_cube)

### Load multiple files into single cube using `dask.bag` and `dask.delayed`
We can merge the `CubeList` we generate using a `dask.bag` in exactly the same way:

In [None]:
bag_merged_cube = bag_cubelist.merge_cube()

print(bag_merged_cube)

However the `merge_cube` step in this does not leverage dask. We can utilise dask in the merge step by decorating a call to `merge_cube` with the `@delayed` decorator. This allows us to integrate it with our task graph:

In [None]:
@delayed
def delayed_merge(cubes):
    return iris.cube.CubeList(cubes).merge_cube()

delayed_merged_cube = delayed_merge(bag_o_cubes)

delayed_merged_cube.visualize()

Once again if you expand the task graph (double-click image) you can see that `merge_cube()` is now part of it. Compare this to the task graph of `bag_o_cubes`. What is different?

`delayed_merged_cube` is "lazy" so we need to call `compute()` to actually execute the task graph and output a merged cube. We can do this within a `print()` call to see that we have merged it successfully:

In [None]:
print(delayed_merged_cube.compute())
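
A related pattern, sketched here without Iris, is to build the loads as a list of delayed objects and pass that list to a delayed merge function. `fake_load` and `fake_merge` are hypothetical stand-ins for loading a cube and merging a `CubeList`; they only illustrate how the merge step becomes one more node in the task graph.

```python
from dask import delayed

@delayed
def fake_load(filename):
    # Hypothetical stand-in for loading one file: returns a one-item list.
    return [len(filename)]

@delayed
def fake_merge(pieces):
    # Hypothetical stand-in for merge_cube(): joins the per-file results.
    merged = []
    for piece in pieces:
        merged.extend(piece)
    return merged

loads = [fake_load(name) for name in ['a.pp', 'bb.pp', 'ccc.pp']]
merged = fake_merge(loads)    # still lazy: loads and merge form one graph

result = merged.compute(scheduler='synchronous')
print(result)
```

The independent loads can run in parallel, while the merge waits for all of them to finish.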

### Using Iris Load with Dask and Constraints

Often when loading files in Iris we will want to constrain the files on load, in order to avoid loading more data than we need. Loading using constraints has already been covered in the Iris Introduction course and in the Iris documentation. We will now consider some simple examples of loading with constraints and how to use dask in this process.

In [None]:
filepath = '/project/avd/iris/resources/git/iris-sample-data/sample_data'
# The ?? in files below restricts our search to files with 2 character file extensions.
# This excludes files like grib2 and txt.
files = glob.glob(os.path.join(filepath, '*.??'))
files

#### Constraining by Phenomenon

One way to constrain data is based on the phenomenon it represents. Below we have added a constraint so only cubes with the name `air_potential_temperature` will be loaded.

In [None]:
phenom_cubes = iris.load(files, 'air_potential_temperature')
phenom_cubes

In order to achieve the same thing using dask a few steps are involved. First we create a list of constraints (the same constraint repeated) that can be fed into a map. This list must be the same length as the number of files because the function mapping will iterate through them both sequentially.

In [None]:
phen_cstr = ['air_potential_temperature'] * len(files)

We then create a dask bag using the `from_delayed` method. Every file in `files` is passed to `delayed(iris.load)` along with its matching constraint from `phen_cstr`.

In [None]:
phenom_dask_load = db.from_delayed(map(delayed(iris.load), files, phen_cstr))

By running the following you can visualize how this task is chunked.

In [None]:
phenom_dask_load.visualize()

Computing the following will have the same result as the original `iris.load`.

In [None]:
phenom_dask_cubes = phenom_dask_load.compute()
phenom_dask_cubes
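
The pairing behaviour of `map` with two sequences can be checked with a stand-in loader. In this sketch `fake_load` is a hypothetical replacement for `iris.load` and the filenames are invented. It also shows a list comprehension that, under these assumptions, builds the same delayed calls without a repeated-constraint list.

```python
import dask
from dask import delayed

def fake_load(filename, constraint):
    # Hypothetical stand-in for iris.load(filename, constraint).
    return (filename, constraint)

filenames = ['a.pp', 'b.pp', 'c.pp']       # invented names
constraint = 'air_potential_temperature'

# map() walks both sequences together, pairing each file with a constraint.
paired = list(map(delayed(fake_load), filenames, [constraint] * len(filenames)))

# A comprehension that passes the single constraint directly.
direct = [delayed(fake_load)(name, constraint) for name in filenames]

paired_results = dask.compute(*paired, scheduler='synchronous')
direct_results = dask.compute(*direct, scheduler='synchronous')
print(paired_results == direct_results)
```

Either list of delayed objects could then be passed to `db.from_delayed`, provided each delayed call returns a list as `iris.load` does.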

#### Constraining by Coordinate Values

Another way to constrain Iris loading is based on dimensions and coordinate values. In this example we will only load data with a model level number less than 10. The constraint `mln_cstr` will do this and `mln_cstr_list` will hold copies of that constraint which can be used in a mapping.

In [None]:
mln_cstr = iris.Constraint(model_level_number=lambda cell: cell < 10)
mln_cstr_list = [mln_cstr] * len(files)

The following cell performs a simple `iris.load` using the constraint we specified above.

In [None]:
mln_cubes = iris.load(files, mln_cstr)
mln_cubes

In the following cells a dask bag is made by mapping individual files and the model level number constraint to the delayed `iris.load`. You can visualize and compute this to have the same effect as the `iris.load` above.

In [None]:
mln_dask_load = db.from_delayed(map(delayed(iris.load), files, mln_cstr_list))

In [None]:
mln_dask_load.visualize()

In [None]:
mln_dask_cubes = mln_dask_load.compute()
mln_dask_cubes

#### Constraining by Time Coordinate Values

It is quite common for us to filter data based on time. Below we define our time constraint similarly to how we have done previously.

In [None]:
time_cstr = iris.Constraint(time=lambda cell: 1920 <= cell.point.year < 1950)
time_cstr_list = [time_cstr] * len(files)

In the next cell we have used `iris.load` similarly to how we did for the `model_level_number` example. One key difference is that we perform the load with the `iris.FUTURE.context(cell_datetime_objects=True)` option. In Iris v1.x this is necessary to enable cubes to compare their date/time with `datetime` objects.

In [None]:
with iris.FUTURE.context(cell_datetime_objects=True):
    time_cubes = iris.load(files, time_cstr)
time_cubes

Below we've constructed a dask bag like we did in the previous example. You can then visualize and compute this dask bag. However like above, the computation must be done with the `iris.FUTURE.context(cell_datetime_objects=True)` option.

In [None]:
time_dask_load = db.from_delayed(map(delayed(iris.load), files, time_cstr_list))

In [None]:
time_dask_load.visualize()

In [None]:
with iris.FUTURE.context(cell_datetime_objects=True):
    time_dask_cubes = time_dask_load.compute()
time_dask_cubes

#### Constraining by Attribute

You can also constrain a load based on cube attributes. For example, in the following cells we will constrain based on the UM version being used. This is very similar to the previous examples except we are using an `iris.AttributeConstraint` rather than an `iris.Constraint` object.

First we define the `AttributeConstraint` and the list of its copies.

In [None]:
um_cstr = iris.AttributeConstraint(um_version='7.3')
um_cstr_list = [um_cstr] * len(files)

Then we're able to perform a simple `iris.load` using the `AttributeConstraint`.

In [None]:
um_cubes = iris.load(files, um_cstr)
um_cubes

Then we can map the list of files and `AttributeConstraints` to the delayed `iris.load`. This can then be visualized and computed to the same effect as the `iris.load` above.

In [None]:
um_dask_load = db.from_delayed(map(delayed(iris.load), files, um_cstr_list))

In [None]:
um_dask_load.visualize()

In [None]:
um_dask_cubes = um_dask_load.compute()
um_dask_cubes

### Using `filter` as a Constraint for Iris Load with Dask

We have seen how to load a subset of data by passing a `constraint` to `iris.load` when loading from a list of filenames. We have also seen how to do this by mapping lists of filenames and constraints onto `delayed(iris.load)`, generating a `dask.bag`. Here we will use the `map` and `filter` functionality of `dask.bag` with `iris.load` to subset data using a phenomenon constraint.

First we should define what we mean when we talk about `map` and `filter`:

- `map(func, seq)` <br> A function which takes a callable (`func`) and a sequence of inputs (`seq`) as arguments. It returns a sequence of outputs which are the result of applying `func` to each of the items in `seq`.


- `filter(func, seq)` <br> A function which also takes a callable (`func`) and a sequence of inputs (`seq`) as arguments. `func` must be a callable which returns a Boolean value, i.e. either `True` or `False`. `func` will be applied to every element of `seq` and only if `func` returns `True` will the element of the sequence be included in the output sequence.
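
As a quick illustration with Python's built-in `map` and `filter` (the names in `names` are just example strings), note that in Python 3 both return lazy iterators, so `list()` is used here to see the results:

```python
names = ['air_temperature', 'surface_altitude', 'air_pressure']

# map: apply len to every item and collect the outputs.
lengths = list(map(len, names))

# filter: keep only the items for which the function returns True.
air_only = list(filter(lambda name: name.startswith('air_'), names))

print(lengths)
print(air_only)
```

`dask.bag` offers methods with the same names and the same meaning, applied to the items of the bag.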

`filter` sounds very similar to what we were doing when using `iris.load` with a `constraint`. In a similar way to how we generated `bag_o_cubes` we can generate a `dask.bag` by mapping a sequence of filenames onto `iris.load`:

In [None]:
bag_of_cubes = db.from_sequence(files).map(iris.load)
bag_of_cubes.compute()

We can use the `filter` method of `bag_of_cubes` to constrain it based on the phenomenon each cube represents. To do this we need to define a filter function:

In [None]:
phenom_filter = lambda cube: cube.name() == 'air_potential_temperature'

In this case we used a `lambda` to define our filter function, which returns `True` if the name of the phenomenon represented by the cube is `'air_potential_temperature'`.

`bag_of_cubes` is a nested list of lists, with each list containing the cubes from each file. We can use `flatten` to reduce it to a single list of cubes then apply `filter` to select cubes based on a phenomenon:

In [None]:
filtered_bag = bag_of_cubes.flatten().filter(phenom_filter)
filtered_bag.compute()
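
The effect of `flatten` followed by `filter` can be seen on plain strings. In this sketch each inner list of `per_file` stands in for the cubes loaded from one file; the data is invented, and the threaded scheduler is requested only to keep the example self-contained.

```python
import dask.bag as db

# One inner list per "file", holding the names of the cubes it contains.
per_file = [['air_potential_temperature', 'surface_altitude'],
            ['air_potential_temperature'],
            ['surface_altitude']]

bag = db.from_sequence(per_file)

# flatten() turns the bag of lists into a bag of individual names,
# then filter() keeps only the names that match.
selected = (bag.flatten()
               .filter(lambda name: name == 'air_potential_temperature')
               .compute(scheduler='threads'))

print(selected)
```

Without `flatten()`, the filter function would receive whole lists rather than individual items.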

Again Python handily allows us to chain together `from_sequence()`, `map()`, `flatten()`, `filter()` and `compute()` into one long line of code:

In [None]:
alt_filter = lambda cube: cube.name() == 'surface_altitude'

db.from_sequence(files).map(iris.load).flatten().filter(alt_filter).compute()

### Exercise: Loading

Consider the following code snippets. Despite looking similar, not all of these code snippets will provide parallel file loading:

In [None]:
# This cell sets up the files needed by the exercise.
filepath = iris.sample_data_path('GloSea4')
files = glob.glob(os.path.join(filepath, '*.pp'))
files

In [None]:
dlyd = delayed(iris.load)(os.path.join(filepath, '*.pp'))
cs1 = db.from_delayed(dlyd)

In [None]:
cs2 = db.from_sequence(files).map(iris.load_cube)

In [None]:
cs3 = db.from_delayed(map(delayed(iris.load), files))

**1.** Print the task graph for each code snippet.

**2.** With reference to the task graph produced by each code snippet, state whether or not each code snippet will provide parallel file loading.

**3.** For the code snippets that do not provide parallel file loading, describe why parallel file loading does not happen and suggest a change to the code snippet to enable parallel file loading.