<img src="../images/dask_horizontal.svg" align="right" width="30%">

# Parallelize code with `dask.delayed`



In this section we parallelize simple for-loop style code with Dask and `dask.delayed`. Often, `dask.delayed` is the only function you will need to adapt existing functions for use with Dask.

This is a simple way to use `dask` to parallelize existing codebases or build [complex systems](https://blog.dask.org/2018/02/09/credit-models-with-dask). It will also help us build an understanding for later sections.


## Learning Objectives 

- Deploy a local Dask Distributed Cluster and access the diagnostics dashboard
- Work with `dask.delayed` to parallelize custom functions/workloads

## Prerequisites


| Concepts | Importance | Notes |
| --- | --- | --- |
| Familiarity with Python | Necessary | |
| Familiarity with xarray | Helpful | |


- **Time to learn**: *25-35 minutes*



## Deploy a local Dask Distributed Cluster

As we'll see in the [distributed scheduler notebook](11-dask-distributed.ipynb), Dask has several ways of executing code in parallel. We'll use the distributed scheduler by creating a `dask.distributed.LocalCluster` and then passing that to the `dask.distributed.Client`. For now, this will provide us with some nice diagnostics. We'll talk about schedulers in depth later.
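As a minimal sketch of that idea, the same delayed graph can be handed to different local schedulers through the `scheduler=` keyword of `compute()`, without changing the code that builds the graph. The `demo_double` function is a hypothetical stand-in for real work.

```python
import dask


@dask.delayed
def demo_double(x):
    return 2 * x


# One graph: five delayed doublings feeding a single delayed sum
demo_total = dask.delayed(sum)([demo_double(i) for i in range(5)])

# The same graph run by two different local schedulers
print(demo_total.compute(scheduler="threads"))      # 20
print(demo_total.compute(scheduler="synchronous"))  # 20
```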


In [None]:
from dask.distributed import Client, LocalCluster

In [None]:
cluster = LocalCluster()
client = Client(cluster)
client

**Note**:

- A cluster is a set of "workers". In the `LocalCluster` case, these workers are all on a single machine
- A client allows us to connect our Jupyter notebook or script to the cluster

To see the keyword arguments `LocalCluster` accepts, for example to control the mix of threads and processes, uncomment and run the cell below:

In [None]:
# LocalCluster?
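For example, here is a hedged sketch of an explicitly sized cluster. The worker and thread counts are arbitrary choices, and `processes=False` runs the workers as threads inside the current process (by default `LocalCluster` starts one process per worker). The cluster and client are used as context managers so they close automatically, and `set_as_default=False` keeps this demo client from replacing the `client` created above; a second cluster may print a warning about the dashboard port.

```python
from dask.distributed import Client, LocalCluster

# Arbitrary sizing for illustration: 2 workers with 2 threads each.
# processes=False runs the workers as threads in this Python process;
# by default LocalCluster starts a separate process per worker.
with LocalCluster(n_workers=2, threads_per_worker=2, processes=False) as demo_cluster:
    with Client(demo_cluster, set_as_default=False) as demo_client:
        demo_client.wait_for_workers(2)  # block until both workers have registered
        threads_per_worker = demo_client.nthreads()  # {worker_address: n_threads}

print(len(threads_per_worker), sum(threads_per_worker.values()))  # 2 4
```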

## Basics

First let's make some toy functions, `square`, `add`, and `square_root` that sleep for a while to simulate work. We'll then time running these functions normally.

In the next section we'll parallelize this code.

In [None]:
import time

import dask

In [None]:
def square(x):
    time.sleep(1)
    return x ** 2


def add(x, y):
    time.sleep(1)
    return x + y


def square_root(x):
    time.sleep(1)
    return x ** (1 / 2)

We time the execution of this normal code using the `%%time` cell magic, an IPython feature available in Jupyter notebooks.

In [None]:
%%time

x = square(3)
y = square(4)
z = add(x, y)
r = square_root(z)
r

This takes `~4 seconds` to run because we call each function sequentially, one after the other.

Those two `square` calls *could* be called in parallel, because they are totally independent of one another.

We'll transform the `square`, `add`, and `square_root` functions using the `dask.delayed` function. When we call the delayed version with the same arguments as before, the original function isn't called yet, which is why the cell execution finishes very quickly.
Instead, Dask creates a *delayed object* that keeps track of the function to call and the arguments to pass to it.


In [None]:
%%time
delayed_square = dask.delayed(square)
delayed_add = dask.delayed(add)
delayed_square_root = dask.delayed(square_root)

x = delayed_square(3)
y = delayed_square(4)
z = delayed_add(x, y)
r = delayed_square_root(z)
r

**This ran immediately, since nothing has really happened yet.** 

To get the result, call `compute`. 

In [None]:
%%time

r.compute()



<div class="admonition alert alert-success">
    <p class="admonition-title" style="font-weight:bold"></p>
    Notice that this runs faster than the original code: about 3 seconds instead of about 4.
</div>
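`dask.delayed` can also be used as a decorator. The sketch below uses a hypothetical `slow_square` that sleeps for half a second, and checks that calling the decorated function only builds a `Delayed` object while the actual work happens on `compute()`.

```python
import time

import dask
from dask.delayed import Delayed


@dask.delayed  # decorator form, equivalent to slow_square = dask.delayed(slow_square)
def slow_square(x):
    time.sleep(0.5)
    return x ** 2


start = time.perf_counter()
lazy = slow_square(3)  # only records the call; the function body does not run yet
build_seconds = time.perf_counter() - start

print(isinstance(lazy, Delayed))  # True
print(build_seconds < 0.5)        # True: no sleep happened while building
print(lazy.compute())             # 9, after about half a second
```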

## What just happened?

The `r` object is a lazy `Delayed` object. This object holds everything we need to compute the final result, including references to all of the functions that are required, their inputs, and their relationship to one another. We can evaluate the result with `.compute()` as above or we can visualize the task graph for this value with `.visualize()`.

In [None]:
r.visualize()

<div class="admonition alert alert-info">
    <p class="admonition-title" style="font-weight:bold">Reminder: Task and Task Graphs</p>
    <ul>
        <li> A task is a function that you want to call and its corresponding inputs. </li>
    <li> A task graph is a collection of (1) the functions we want to call and their inputs, and (2) their dependencies. </li>
    </ul>
</div>
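To make this definition concrete, here is a minimal sketch that rebuilds the graph from the basics section with renamed functions and without the `sleep` calls, then materializes it as a plain dictionary through `__dask_graph__()`, part of Dask's collection protocol. Each key in that dictionary corresponds to one task.

```python
import dask


def square_fast(x):
    return x ** 2


def add_fast(x, y):
    return x + y


def root_fast(x):
    return x ** (1 / 2)


x_demo = dask.delayed(square_fast)(3)
y_demo = dask.delayed(square_fast)(4)
z_demo = dask.delayed(add_fast)(x_demo, y_demo)
r_demo = dask.delayed(root_fast)(z_demo)

graph = dict(r_demo.__dask_graph__())  # task graph as a plain {key: task} dict
print(len(graph))            # 4 tasks: two squares, one add, one square root
print(r_demo.key in graph)   # True: the final result is one of the graph's keys
print(r_demo.compute())      # 5.0
```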


<img src="../images/dask-task-stream.gif">




By default the task graph is rendered from top to bottom. In the case that you prefer to visualize it from left to right, pass `rankdir="LR"` as a keyword argument to `.visualize()`.

In [None]:
r.visualize(rankdir="LR")

Notice that this includes the names of the functions from before and the logical flow of the data: the outputs of the two `square` calls feed into `add`, whose output feeds into `square_root`.

### Some questions to consider:

-  Why did we go from 4s to 3s?  Why weren't we able to parallelize down to 2s?
-  What would have happened if the `square`, `add`, and `square_root` functions didn't include the `sleep(1)`?  Would Dask still be able to speed up this code?
-  What if we have multiple outputs or also want to get access to x or y?

## Exercise: Parallelize a for loop

`for` loops are one of the most common things that we want to parallelize. Use `dask.delayed` on our custom `square` function and the built-in `sum` function to parallelize the computation below:

In [None]:
data = list(range(1, 11))
data

In [None]:
%%time
results = []
for x in data:
    y = square(x)
    results.append(y)

total = sum(results)

In [None]:
total

In [None]:
%%time
# Your parallel code here...

In [None]:
%%time
# Solution

results = []

for x in data:
    y = delayed_square(x)
    results.append(y)

total = dask.delayed(sum)(results)
print(f"Before computing: {total}")  # Let's see what type of thing total is

result = total.compute()
print(f"After computing : {result}")  # After it's computed

How do the graph visualizations of the given solution compare with a version that calls `sum` directly instead of wrapping it with `delayed`? Can you explain the latter version? You might find the result of the following expression illuminating:

```python
delayed_square(1) + delayed_square(2)
```

In [None]:
z = delayed_square(1) + delayed_square(2)
z.visualize()

In [None]:
sum(results).visualize()

In [None]:
total.visualize()
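One way to compare the two approaches without drawing graphs is to count tasks. This sketch uses a hypothetical `toy_square` (the `square` function without its `sleep`) and compares the number of keys in each graph: the built-in `sum` adds one delayed `+` per element, while `dask.delayed(sum)` produces a single task that receives the whole list.

```python
import dask


def toy_square(x):
    return x ** 2


toy_squares = [dask.delayed(toy_square)(x) for x in range(1, 11)]

# Built-in sum: 0 + s1, then (0 + s1) + s2, ... creates one delayed addition per element
chained = sum(toy_squares)
# dask.delayed(sum): a single task that receives the whole list at once
single = dask.delayed(sum)(toy_squares)

chained_size = len(dict(chained.__dask_graph__()))
single_size = len(dict(single.__dask_graph__()))
print(chained_size, single_size)            # the chained graph has more tasks
print(chained.compute(), single.compute())  # 385 385
```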

## Exercise: Parallelize a for loop with control flow

Often we want to delay only *some* functions, running a few of them immediately.  This is especially helpful when those functions are fast and help us to determine what other slower functions we should call.  This decision, to delay or not to delay, is usually where we need to be thoughtful when using `dask.delayed`.

In the example below we iterate through a list of inputs. If an input is even, we call `double`; if it is odd, we call `square`. This `is_even` decision has to be made immediately (not lazily) so that our graph-building Python code knows which function to call next.

In [None]:
def double(x):
    time.sleep(1)
    return x * 2


def square(x):
    time.sleep(1)
    return x ** 2


def is_even(x):
    return not x % 2


data = list(range(1, 11))
data

In [None]:
%%time
# Sequential code

results = []
for x in data:
    if is_even(x):
        y = double(x)
    else:
        y = square(x)
    results.append(y)

total = sum(results)
print(total)

In [None]:
%%time
# Your parallel code here...
# TODO: parallelize the sequential code above using dask.delayed
# You will need to delay some functions, but not all

In [None]:
results = []
for x in data:
    if is_even(x):  # even: evaluated immediately, not delayed
        y = dask.delayed(double)(x)
    else:  # odd
        y = dask.delayed(square)(x)
    results.append(y)

total = dask.delayed(sum)(results)
total.visualize()

In [None]:
%%time
total.compute()

### Some questions to consider:

-  What are other examples of control flow where we can't use delayed?
-  What would have happened if we had delayed the evaluation of `is_even(x)` in the example above?
-  What are your thoughts on delaying `sum`? This function does real computation, but it is also fast to run.
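A minimal sketch related to the second question: a `Delayed` object is only a placeholder for a future value, so Python cannot use it as the condition of an `if` statement. In the Dask versions we are aware of, truth-testing a `Delayed` raises a `TypeError`, which is why the solution calls the fast `is_even` function eagerly.

```python
import dask


def is_even(x):
    return not x % 2


lazy_flag = dask.delayed(is_even)(4)  # a Delayed placeholder, not a bool

try:
    if lazy_flag:  # Python needs a real True/False right now
        pass
    error = None
except TypeError as exc:
    error = exc

print(type(error).__name__)  # TypeError
print(is_even(4))            # True: calling the fast function eagerly works
```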

## Exercise: Parallelizing climatology and anomaly computations with xarray and `dask.delayed`

In this exercise we read four netCDF files, one for each of four ensemble members of CESM2 output submitted to the CMIP6 project. We then use xarray to compute anomalies for each ensemble member in parallel, i.e., compute the monthly climatology and use xarray's groupby arithmetic to remove this climatology from the original data of each member.

We start from sequential code that does this and parallelize it with `dask.delayed`.

The computation we will parallelize is the anomaly of each ensemble member's input data. We will do this by using `dask.delayed` together with `xarray`. In a future section we will do the same exercise with an xarray dataset backed by `dask.array`.

### Download data

To download the necessary data, make sure to run the [Download Data Notebook](00-download-data.ipynb). This will download all necessary input files for four ensemble members (`r11i1p1f1`, `r7i1p1f1`, `r8i1p1f1`, `r9i1p1f1`) from the [esgf-node](https://esgf-node.llnl.gov/search/cmip6/).

### Inspect data

In [None]:
import pathlib

data_dir = pathlib.Path("data/")
files = sorted(data_dir.glob("tos_Omon_CESM2*"))
files

### Read one file with `xarray.open_dataset` and compute anomaly

In [None]:
import xarray as xr

In [None]:
ds = xr.open_dataset(files[0], engine="netcdf4")
ds

In [None]:
# Compute anomaly
gb = ds.tos.groupby('time.month')
tos_anom = gb - gb.mean(dim='time')
tos_anom

In [None]:
tos_anom.sel(lon=310, lat=50, method='nearest').plot();
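If the groupby arithmetic above feels opaque, the sketch below reproduces the same idea with plain NumPy on a made-up series of three years of monthly values: the climatology is the mean of all values sharing a calendar month, and the anomaly subtracts that monthly mean from each time step. The data and variable names are hypothetical; in the real code xarray handles the month grouping and alignment for us.

```python
import numpy as np

# Hypothetical toy series: 3 years of monthly values (36 time steps)
months = np.tile(np.arange(1, 13), 3)  # calendar month of each time step, 1..12
values = np.arange(36, dtype=float)    # made-up data

# Climatology: mean of all values that share the same calendar month
climatology = np.array([values[months == m].mean() for m in range(1, 13)])

# Anomaly: subtract each time step's monthly climatology (the idea behind gb - gb.mean())
anomaly = values - climatology[months - 1]

print(anomaly[:3], anomaly[12:15], anomaly[24:27])
```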

### Sequential code: Anomaly for each ensemble member

The above cell computes the anomaly for one ensemble member during the period spanning `2000 - 2014`. Here we expand that to all four ensemble members using a sequential for loop.

In [None]:
%%time

results = {}
for file in files:

    # Read in file
    ds = xr.open_dataset(file, engine='netcdf4')

    # Compute anomaly
    gb = ds.tos.groupby('time.month')
    tos_anom = gb - gb.mean(dim='time')

    # Save the computed anomaly and record the name of the ensemble member
    results[file.stem.split('_')[-3]] = tos_anom


# Combine the results in our dataarray by concatenating the results across a new dimension `ensemble_member`
dset_anom = xr.concat(list(results.values()), dim='ensemble_member')
dset_anom['ensemble_member'] = list(results.keys())
dset_anom

In [None]:
dset_anom.sel(lon=310, lat=50, method='nearest').plot(col='ensemble_member', col_wrap=2, size=4);

### Parallelize the code above

Use `dask.delayed` to parallelize the code above. Some extra things you will need to know:

1.  Methods and attribute access on delayed objects work automatically, so if you have a delayed object you can perform normal arithmetic, slicing, and method calls on it and it will produce the correct delayed calls.

    ```python
    ds = dask.delayed(xr.open_dataset)(files[0], engine='netcdf4')
    ds.isel(time=0).sum() # everything here was delayed
    ds.groupby('time.season').mean() # everything here was delayed
    ```
    
2.  Calling the `.compute()` method works well when you have a single output.  When you have multiple outputs you might want to use the `dask.compute` function:

    ```python
    >>> import numpy as np
    >>> from dask import delayed, compute
    >>> x = delayed(np.arange)(10)
    >>> y = x ** 2
    >>> min_, max_ = compute(y.min(), y.max())
    >>> print(min_, max_)
    0 81
    ```
    
    This way Dask computes shared intermediate values (like `y = x**2`) only once.
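Here is a small self-contained sketch of point 1 that uses plain Python objects instead of an xarray dataset (the string and complex number are arbitrary examples): method calls, slicing, arithmetic, and attribute access on a `Delayed` all stay lazy until `compute()`. Built-ins such as `len()` need a concrete object, so they are wrapped with `dask.delayed` instead.

```python
import dask

text = dask.delayed(str)("Hello Dask")  # lazy call that will produce a plain string

shout = text.upper()                  # method call on a Delayed -> new Delayed
first_word = shout[:5]                # slicing -> new Delayed
size = dask.delayed(len)(text) * 2    # len() needs a real object, so delay it; * stays lazy

number = dask.delayed(complex)(3, 4)
imag_part = number.imag               # attribute access -> new Delayed

print(shout.compute())       # HELLO DASK
print(first_word.compute())  # HELLO
print(size.compute())        # 20
print(imag_part.compute())   # 4.0
```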
    
So your goal is to parallelize the code above (which has been copied below) using `dask.delayed`.  You may also want to visualize a bit of the computation to see if you're doing it correctly.

In [None]:
%%time

# This is just one possible solution, there are
# several ways to do this using `delayed`

results = {}
for file in files:

    # Read in file
    ds = dask.delayed(xr.open_dataset)(file, engine='netcdf4')

    # Compute anomaly
    gb = ds.tos.groupby('time.month')
    tos_anom = gb - gb.mean(dim='time')

    # Save the computed anomaly and record the name of the ensemble member
    results[file.stem.split('_')[-3]] = tos_anom


# Compute the results
# dask.compute() returns a tuple with one entry per argument; we passed a single dict, so take entry 0
computed_results = dask.compute(results)[0]
# Combine the results in our dataarray by concatenating the results across a new dimension `ensemble_member`
dset_anom = xr.concat(list(computed_results.values()), dim='ensemble_member')
dset_anom['ensemble_member'] = list(computed_results.keys())
dset_anom

In [None]:
# Make a quick plot to ensure the results still match
dset_anom.sel(lon=310, lat=50, method='nearest').plot(col='ensemble_member', col_wrap=2, size=4);

In [None]:
results
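`results` is a plain dictionary whose values are `Delayed` objects. `dask.compute` looks inside standard containers such as dicts and lists, which is why the solution can pass the whole dictionary at once. A minimal sketch with a hypothetical `member_mean` function and made-up member names (stored in `toy_results` so the notebook's `results` is left untouched):

```python
import dask


@dask.delayed
def member_mean(values):
    return sum(values) / len(values)


# Hypothetical stand-in for the per-member results: plain lists instead of netCDF files
toy_results = {
    "member_a": member_mean([1, 2, 3]),
    "member_b": member_mean([10, 20, 30]),
}

# dask.compute traverses the dict, computes every Delayed value, and returns a tuple
# with one entry per positional argument: here a single dict
(computed,) = dask.compute(toy_results)
print(computed)  # {'member_a': 2.0, 'member_b': 20.0}
```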

In [None]:
results['r11i1p1f1'].visualize(rankdir="LR")

### Some questions to consider:

- How much speedup did you get? Is this how much speedup you'd expect?


## Close the Cluster and Client

Before moving on to the next notebook, make sure to close your client and cluster, or stop this kernel.

In [None]:
client.close()
cluster.close()

In [None]:
%load_ext watermark
%watermark --time --python --updated --iversion

---

## Summary

- `dask.delayed` is a handy mechanism for creating the Dask graph, but the adventurous may wish to play with the full flexibility afforded by building the graph dictionaries directly. Detailed information can be found in the [Dask graphs documentation](https://dask.pydata.org/en/latest/graphs.html).
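For a taste of what building the graph directly looks like, here is a minimal sketch of a hand-written graph dictionary executed with the threaded scheduler's `get` function. The `inc` function and the key names are illustrative only.

```python
from operator import add

from dask.threaded import get  # entry point of Dask's threaded scheduler


def inc(x):
    return x + 1


# A raw Dask graph: keys map to literal values or to task tuples (function, *args).
# String arguments that match other keys are replaced by those keys' results.
dsk = {
    "x": 1,
    "y": (inc, "x"),       # y = inc(x)
    "z": (add, "x", "y"),  # z = add(x, y)
}

print(get(dsk, "z"))  # 3
```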

## Learn More

Visit the [Delayed documentation](https://docs.dask.org/en/latest/delayed.html). In particular, this [delayed screencast](https://www.youtube.com/watch?v=SHqFmynRxVU) will reinforce the concepts you learned here and the [delayed best practices](https://docs.dask.org/en/latest/delayed-best-practices.html) document collects advice on using `dask.delayed` well.

In [None]:
from IPython.display import YouTubeVideo

YouTubeVideo(id="SHqFmynRxVU", width=600, height=300)

## Resources and references

* Reference
    *  [Docs](https://dask.org/)
    *  [Examples](https://examples.dask.org/)
    *  [Code](https://github.com/dask/dask/)
    *  [Blog](https://blog.dask.org/)
*  Ask for help
    *   [`dask`](http://stackoverflow.com/questions/tagged/dask) tag on Stack Overflow, for usage questions
    *   [GitHub Discussions](https://github.com/dask/dask/discussions) for general, non-bug, discussion, and usage questions
    *   [GitHub issues](https://github.com/dask/dask/issues/new) for bug reports and feature requests
    
* Pieces of this notebook are adapted from the following sources
  * https://github.com/dask/dask-tutorial/blob/main/01_dask.delayed.ipynb
  
  
 <div class="admonition alert alert-success">
    <p class="title" style="font-weight:bold">Previous: <a href="./07-dask-intro.ipynb">Introducing Dask</a></p>
     <p class="title" style="font-weight:bold">Next: <a href="./09-dask-array.ipynb">Dask Array</a></p>
    
</div>