# Handling dask arrays

We have previously worked over applying functions to NumPy arrays contained in Xarray objects.
`apply_ufunc` also lets you easily perform many of the steps involving in applying 
functions that expect and return Dask arrays.

Learning goals:
- Learn that `apply_ufunc` can automate aspects of applying computation functions on dask arrays

```{tip}
We'll reduce the length of error messages using `%xmode minimal` See the [ipython documentation](https://ipython.readthedocs.io/en/stable/interactive/magics.html#magic-xmode) for details.
```

## Setup

In [None]:
%xmode minimal

import dask
import numpy as np
import xarray as xr

xr.set_options(display_expand_data=False)

First lets set up a `LocalCluster` using [dask.distributed](https://distributed.dask.org/).

You can use any kind of dask cluster. This step is completely independent of
xarray. While not strictly necessary, the dashboard provides a nice learning
tool.


In [None]:
from dask.distributed import Client

client = Client()
client

<p>&#128070</p> Click the Dashboard link above. Or click the "Search" button in the dashboard.

Let's test that the dashboard is working..


In [None]:
import dask.array

dask.array.ones((1000, 4), chunks=(2, 1)).compute()  # should see activity in dashboard

Let's open a dataset. We specify `chunks` so that we create a dask arrays for the DataArrays

In [None]:
ds = xr.tutorial.open_dataset("air_temperature", chunks={"time": 100})
ds

## A simple example

All the concepts from applying numpy functions carry over.

However the handling of dask arrays needs to be explicitly activated.

There are three options for the `dask` kwarg.

```
    dask : {"forbidden", "allowed", "parallelized"}, default: "forbidden"
        How to handle applying to objects containing lazy data in the form of
        dask arrays:

        - 'forbidden' (default): raise an error if a dask array is encountered.
        - 'allowed': pass dask arrays directly on to ``func``. Prefer this option if
          ``func`` natively supports dask arrays.
        - 'parallelized': automatically parallelize ``func`` if any of the
          inputs are a dask array by using :py:func:`dask.array.apply_gufunc`. Multiple output
          arguments are supported. Only use this option if ``func`` does not natively
          support dask arrays (e.g. converts them to numpy arrays).
```

We will work through the following two:

1. `dask="allowed"` Dask arrays are passed to the user function. This is a good
   choice if your function can handle dask arrays and won't compute the result unless 
   explicitly requested.
2. `dask="parallelized"`. This applies the user function over blocks of the dask
   array using `dask.array.blockwise`. This is useful when your function cannot
   handle dask arrays natively (e.g. scipy API).

In [None]:
# Expect an error here
def squared_error(x, y):
    return (x - y) ** 2


xr.apply_ufunc(squared_error, ds.air, 1)

  
A good thing to check is whether the applied function (here `squared_error`) can handle pure dask arrays. 
To do this call  `squared_error(ds.air.data, 1)` and make sure of the following:
1. That you don't see any activity on the dask dashboard
2. That the returned result is a dask array.

In [None]:
squared_error(ds.air.data, 1)

Since `squared_error` can handle dask arrays without computing them, we specify
`dask="allowed"`.

In [None]:
sqer = xr.apply_ufunc(
    squared_error,
    ds.air,
    1,
    dask="allowed",
)
sqer  # dask-backed DataArray! with nice metadata!

### Understanding what's happening

Let's again use the wrapper trick to understand what `squared_error` receives.

We see that it receives a dask array (analogous to the numpy array in the previous example).

In [None]:
def wrapper(x, y):
    print(f"received x of type {type(x)}, shape {x.shape}")
    print(f"received y of type {type(y)}")
    return squared_error(x, y)


xr.apply_ufunc(wrapper, ds.air, 1, dask="allowed")

## Reductions and core dimensions

`squared_error` operated on a per-element basis. How about a reduction like `np.mean`?

Such functions involve the concept of "core dimensions". One way to think about core dimensions is to consider the smallest dimensionality of data necessary to apply the function.

For using more complex operations that consider some array values collectively,
it’s important to understand the idea of **core dimensions**. 
Usually, they correspond to the fundamental dimensions over
which an operation is defined, e.g., the summed axis in `np.sum`. A good clue
that core dimensions are needed is the presence of an `axis` argument on the
corresponding NumPy function.

With `apply_ufunc`, core dimensions are recognized by name, and then moved to
the last dimension of any input arguments before applying the given function.
This means that for functions that accept an `axis` argument, you usually need
to set `axis=-1`

```{tip} Exercise

Use `dask.array.mean` as an example of a function that can handle dask
arrays and uses an `axis` kwarg. 
```

```{tip} Solution
:class: dropdown
```python
def time_mean(da):
    return xr.apply_ufunc(
        dask.array.mean,
        da,
        input_core_dims=[["time"]],
        dask="allowed",
        kwargs={"axis": -1},  # core dimensions are moved to the end
    )


time_mean(ds.air)
```


Again, this is identical to the built-in `mean`

In [None]:
def time_mean(da):
    return xr.apply_ufunc(
        dask.array.mean,
        da,
        input_core_dims=[["time"]],
        dask="allowed",
        kwargs={"axis": -1},  # core dimensions are moved to the end
    )


ds.air.mean("time").identical(time_mean(ds.air))

## Automatically parallelizing dask-unaware functions

A very useful `apply_ufunc` feature is the ability to apply arbitrary functions
in parallel to each block. This ability can be activated using
`dask="parallelized"`. 

Again xarray needs a lot of extra metadata, so depending
on the function, extra arguments such as `output_dtypes` and `output_sizes` may
be necessary.

We will use `scipy.integrate.trapz` as an example of a function that cannot
handle dask arrays and requires a core dimension. If we call `trapz` with a dask
array, we get a numpy array back that is, the values have been eagerly computed.
This is undesirable behaviour


In [None]:
import scipy as sp
import scipy.integrate

sp.integrate.trapz(
    ds.air.data, axis=ds.air.get_axis_num("lon")
)  # does NOT return a dask array, you should see activity on the dashboard

Let's activate automatic parallelization with `dask="parallelized"`

In [None]:
integrated = xr.apply_ufunc(
    sp.integrate.trapz,
    ds,
    input_core_dims=[["lon"]],
    kwargs={"axis": -1},
    dask="parallelized",
)
integrated

And make sure the returned data is a dask array

In [None]:
integrated.air.data

We'll define a function `integrate_lon` for later use:

In [None]:
def integrate_lon(ds):
    return xr.apply_ufunc(
        sp.integrate.trapz,
        ds,
        input_core_dims=[["lon"]],
        kwargs={"axis": -1},
        dask="parallelized",
    )

### Understanding what is happening


It is very important to understand what `dask="parallelized"` does. It is quite convenient and works well for "blockwise" or "embarrassingly parallel" operations.

These are operations where one block or chunk of the output array corresponds to one block or chunk of the input array. Specifically, the blocks or chunks of the _core dimension_ is what matters.

Let's look at the dask repr for `ds` and note chunksizes are (100,25,53) for a array with shape (2920, 25, 53). This means that each block or chunk of the array contains all `lat`, `lon` points and a subset of `time` points.

In [None]:
ds.air.data

The core dimension for `trapz` is `lon`, and there is only one chunk along `lon`. This means that integrating along `lon` is a "blockwise" or "embarrassingly parallel" operation and `dask="parallelized"` works quite well. 

```{tip} Question
Do you understand why `integrate(ds)` when `ds` has a single chunk along `lon` is a "embarassingly parallel" operation?
```

```{tip} Exercise
Apply the integrate function to `ds` after rechunking to have a different chunksize along `lon` using `ds.chunk(lon=4)` (for example). What happens?
```
```{tip} Solution
:class: dropdown

`apply_ufunc` complains that it cannot automatically parallelize because the dataset `ds` is now chunked along the core dimension `lon`. You should see the following error:

    ValueError: dimension lon on 0th function argument to apply_ufunc with dask='parallelized' 
    consists of multiple chunks, but is also a core dimension. To fix, either rechunk 
    into a single array chunk along this dimension, i.e., 
    ``.chunk(dict(lon=-1))``, or pass ``allow_rechunk=True`` in ``dask_gufunc_kwargs`` 
    but beware that this may significantly increase memory usage.

```

## Clean up the cluster

In [None]:
client.close()

## More

1. https://docs.xarray.dev/en/stable/examples/apply_ufunc_vectorize_1d.html
2. https://docs.dask.org/en/latest/array-best-practices.html
