# A gentle introduction

`map_blocks` is inspired by the `dask.array` function of the same name and lets
you map a function over the blocks of an xarray object (including Datasets!).

At _compute_ time, your function will receive a chunk of an xarray object with concrete
(computed) values along with appropriate metadata. This function should return
an xarray object.
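To see what the function actually receives, here is a minimal sketch on a small synthetic Dataset. The variable name `air`, the sizes and the chunks are illustrative assumptions, and dask must be installed. The synchronous scheduler runs every task in the current process, so the list filled in by the function is visible afterwards.

```python
import numpy as np
import xarray as xr

# Hypothetical stand-in for real data: a small dask-backed Dataset with 4 blocks.
ds_small = xr.Dataset(
    {"air": (("time", "lat", "lon"), np.arange(96, dtype=float).reshape(4, 6, 4))},
    coords={"time": np.arange(4), "lat": np.arange(6), "lon": np.arange(4)},
).chunk({"time": -1, "lat": 3, "lon": 2})

received = []  # (type of the data, shape) for every call of the function


def inspect_block(obj: xr.Dataset) -> xr.Dataset:
    # Record what the function is given: a plain NumPy array, not a dask array.
    received.append((type(obj["air"].data), obj["air"].shape))
    return obj  # map_blocks expects an xarray object back


# The synchronous scheduler runs each task in this process, so `received` is filled here.
result = ds_small.map_blocks(inspect_block).compute(scheduler="synchronous")
```

Besides the four calls with blocks of shape `(4, 3, 2)`, `received` holds one extra entry with a zero-sized shape: that is the object `map_blocks` passes to the function up front to infer what the result looks like.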


## Setup

```python
import dask
import numpy as np
import xarray as xr
```

First, let's set up a `LocalCluster` using [dask.distributed](https://distributed.dask.org/).

You can use any kind of dask cluster. This step is completely independent of
xarray. While not strictly necessary, the dashboard provides a nice learning
tool.
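Because the scheduler is independent of xarray, a cluster is optional. A minimal sketch, assuming only that dask is installed, computes a chunked DataArray with dask's built-in threaded scheduler selected through `dask.config.set` instead of a `Client` (the array and its chunks are illustrative):

```python
import dask
import numpy as np
import xarray as xr

# A small chunked DataArray: 10 x 4 = 40 chunks of shape (100, 1).
arr = xr.DataArray(np.ones((1000, 4)), dims=("x", "y")).chunk({"x": 100, "y": 1})

# Select dask's threaded scheduler for this block only; no Client or cluster involved.
with dask.config.set(scheduler="threads"):
    total = arr.sum().compute()

print(total.item())  # 4000.0
```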


```python
from dask.distributed import Client

client = Client()
client
```

Output: a `Client` connected to a `distributed.LocalCluster` with 4 worker processes (4 threads and 3.37 GiB of memory each; 16 threads and 13.47 GiB in total). Dashboard: http://127.0.0.1:8787/status


👆 Click the Dashboard link above. Or click the "Search" button in the dashboard.

Let's test that the dashboard is working.


```python
import dask.array

dask.array.ones((1000, 4), chunks=(2, 1)).compute()  # should see activity in dashboard
```

array([[1., 1., 1., 1.],
       [1., 1., 1., 1.],
       [1., 1., 1., 1.],
       ...,
       [1., 1., 1., 1.],
       [1., 1., 1., 1.],
       [1., 1., 1., 1.]], shape=(1000, 4))

Let's open a dataset. We specify `chunks` so that the data variables are backed by dask arrays.

Setting the chunks correctly is vital, and the right choice depends on the function applied to each block. Our goal is to compute the mean along the time dimension, so we do not chunk the time dimension at all (indicated by `"time": -1`). We deliberately set the `lat` and `lon` chunks to something smaller than the size of their respective dimensions (otherwise we might end up with a single big chunk for the entire `ds`).
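The chunk count reported for the dataset follows directly from these choices: 25 / 5 = 5 chunks along `lat` and ⌈53 / 10⌉ = 6 along `lon`, i.e. 30 chunks of at most (2920, 5, 10) values. A sketch using a placeholder dask array of the same shape (an assumption: it holds zeros, not the tutorial data):

```python
import dask.array as da

# Placeholder with the same shape and chunking as the tutorial data variable.
x = da.zeros((2920, 25, 53), chunks=(-1, 5, 10))  # -1: one chunk spanning all of `time`

print(x.chunks[1])  # (5, 5, 5, 5, 5)
print(x.chunks[2])  # (10, 10, 10, 10, 10, 3)
print(x.numblocks)  # (1, 5, 6)
print(x.npartitions)  # 30
print(x.blocks[0, 0, 0].nbytes)  # 1168000 bytes of float64, about 1.11 MiB

# Chunking `time` as well (as in Exercise 1) multiplies the number of blocks.
y = da.zeros((2920, 25, 53), chunks=(100, 5, 10))
print(y.numblocks)  # (30, 5, 6)
```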

```python
ds = xr.tutorial.open_dataset("air_temperature", chunks={"time": -1, "lat": 5, "lon": 10})
ds
```

| | Array | Chunk |
|---|---|---|
| Bytes | 29.52 MiB | 1.11 MiB |
| Shape | (2920, 25, 53) | (2920, 5, 10) |
| Dask graph | 30 chunks in 2 graph layers | |
| Data type | float64 numpy.ndarray | |


## Simple example

Here is an example that computes the time mean of each block:

```python
def time_mean(obj: xr.Dataset):
    # use xarray's convenient API here
    # you could convert to a pandas dataframe and use pandas' extensive API
    # or use .plot() and plt.savefig to save visualizations to disk in parallel.
    return obj.mean("time")

ds.map_blocks(time_mean)  # this is lazy!
```

| | Array | Chunk |
|---|---|---|
| Bytes | 10.35 kiB | 400 B |
| Shape | (25, 53) | (5, 10) |
| Dask graph | 30 chunks in 4 graph layers | |
| Data type | float64 numpy.ndarray | |


```python
# this triggers the actual computation
ds.map_blocks(time_mean).compute()
```

```python
# this will calculate values and will return True if the computation works as expected
ds.map_blocks(time_mean).equals(ds.mean("time"))
```

True

### Exercises


::::{admonition} Exercise 1
:class: tip

When opening the dataset, set the chunk size of the `time` dimension to anything smaller than its length (< 2920), e.g., `"time": 100`, and keep the other chunk sizes the same:

```python
ds = xr.tutorial.open_dataset(
    "air_temperature",
    chunks={"time": 100, "lat": 5, "lon": 10},
)
```

Now run the notebook again. The result of `ds.map_blocks(time_mean)` is no longer equivalent to `ds.mean("time")`. Why does `ds.map_blocks(time_mean)` return a different result this time?

:::{admonition} Solution
:class: dropdown

Quoting from the documentation of `map_blocks`: _The function will receive a subset or ‘block’ of obj (see below), corresponding to one chunk along each chunked dimension._

`ds.mean("time")` computes the mean over the entire time dimension. In our example, `ds.map_blocks(time_mean)` passes individual blocks of `ds` to `time_mean`. Once the time dimension is split into several chunks, each block covers only one of them, so every call to `time_mean` computes the mean over a single time chunk rather than over the entire time dimension. Therefore we do not get an identical result.

You can also modify the function to show the shape of the chunks passed to `time_mean`. Compare the output of the modified function with `ds.chunks` to find out how they relate to each other!

```python
def time_mean(obj: xr.Dataset):
    print(f"received obj of type {type(obj)}")
    print("obj contains the following data variables:")
    for data_var in obj.data_vars:
        print(f"'{data_var}' with shape {obj[data_var].shape}")

    return obj.mean("time")
```

:::
::::
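Following the hint in the solution, a sketch that records the shape of every block on a small synthetic Dataset with a chunked `time` dimension (the names, sizes and chunks are illustrative assumptions) shows that each block corresponds to one combination of entries from `ds.chunks`:

```python
import itertools

import numpy as np
import xarray as xr

# Hypothetical small Dataset whose time dimension IS chunked, as in Exercise 1.
ds_small = xr.Dataset(
    {"air": (("time", "lat", "lon"), np.random.default_rng(0).random((10, 6, 4)))},
    coords={"time": np.arange(10), "lat": np.arange(6), "lon": np.arange(4)},
).chunk({"time": 4, "lat": 3, "lon": 2})

block_shapes = []


def record_shape(obj: xr.Dataset) -> xr.Dataset:
    # Skip the zero-sized object that map_blocks uses to infer the output.
    if obj["air"].size > 0:
        block_shapes.append(obj["air"].shape)
    return obj


ds_small.map_blocks(record_shape).compute(scheduler="synchronous")

# Every block is one combination of chunk sizes along (time, lat, lon).
expected = list(itertools.product(*(ds_small.chunks[d] for d in ("time", "lat", "lon"))))
print(sorted(block_shapes) == sorted(expected))  # True
```

Each of these blocks contains at most 4 time steps, so a function such as `time_mean` only ever averages over one time chunk.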


::::{admonition} Exercise 2
:class: tip  

Try applying the following function with `map_blocks`. Specify `scale` as an
argument and `offset` as a kwarg.

The docstring should help:
https://docs.xarray.dev/en/stable/generated/xarray.map_blocks.html

```python
def time_mean_scaled(obj, scale, offset):
    return obj.mean("time") * scale + offset
```

::::
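One possible answer to Exercise 2, sketched on a small synthetic Dataset (the data and chunk sizes are illustrative assumptions). Positional arguments are passed through `args` and keyword arguments through `kwargs`; this version of `time_mean_scaled` averages over the unchunked `time` dimension, matching its name:

```python
import numpy as np
import xarray as xr

# Hypothetical small Dataset; `time` is a single chunk, `lat` and `lon` are chunked.
ds_small = xr.Dataset(
    {"air": (("time", "lat", "lon"), np.arange(96, dtype=float).reshape(4, 6, 4))},
    coords={"time": np.arange(4), "lat": np.arange(6), "lon": np.arange(4)},
).chunk({"time": -1, "lat": 3, "lon": 2})


def time_mean_scaled(obj, scale, offset):
    return obj.mean("time") * scale + offset


# `scale` goes in `args` (a sequence), `offset` in `kwargs` (a dict).
scaled = ds_small.map_blocks(time_mean_scaled, args=[2.0], kwargs={"offset": 10.0})

# Compare against applying the same operation to the whole Dataset.
xr.testing.assert_allclose(scaled.compute(), ds_small.mean("time") * 2.0 + 10.0)
```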

### More advanced functions

`map_blocks` needs to know _exactly_ what the returned object looks like. It
finds out by passing a zero-sized xarray object to the function and examining the
result. This approach cannot work in all cases. For such advanced use cases,
`map_blocks` accepts a `template` kwarg. See
https://docs.xarray.dev/en/stable/user-guide/dask.html#map-blocks for more details.
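A sketch of a case where `template` is needed, using a hypothetical `first_time_step` function on a small synthetic Dataset. Selecting position 0 along `time` fails on the zero-sized object used for inference, so `map_blocks` cannot work out the result on its own. Passing `template` (here a lazily indexed copy of the input, which already has the expected variables, dimensions and `lat`/`lon` chunks) skips the inference step:

```python
import numpy as np
import xarray as xr

# Hypothetical small Dataset; `time` is a single chunk, `lat` and `lon` are chunked.
ds_small = xr.Dataset(
    {"air": (("time", "lat", "lon"), np.arange(96, dtype=float).reshape(4, 6, 4))},
    coords={"time": np.arange(4), "lat": np.arange(6), "lon": np.arange(4)},
).chunk({"time": -1, "lat": 3, "lon": 2})


def first_time_step(obj: xr.Dataset) -> xr.Dataset:
    # On the zero-sized inference object there is no position 0 along `time`.
    return obj.isel(time=0, drop=True)


try:
    ds_small.map_blocks(first_time_step)
    inference_failed = False
except Exception:  # xarray asks for a `template` when inference fails
    inference_failed = True

# The template is lazy and describes the output: variables, dims and chunks.
template = ds_small.isel(time=0, drop=True)
first = ds_small.map_blocks(first_time_step, template=template)

print(inference_failed)  # True
xr.testing.assert_equal(first.compute(), template.compute())
```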


```python
client.close()
```