In [1]:
%load_ext cudf.pandas

<img src="https://docs.xarray.dev/en/stable/_static/dataset-diagram-logo.png" align="right" width="30%">


# A gentle introduction

`map_blocks` is inspired by the `dask.array` function of the same name and lets
you apply a function to each block (chunk) of an xarray object (including Datasets!).

At _compute_ time, your function will receive an xarray object with concrete
(computed) values along with appropriate metadata. This function should return
an xarray object.
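
As a rough illustration of the paragraph above, the sketch below records what the mapped function actually receives. The array `da`, the helper `describe_block`, and the list `seen` are hypothetical names on synthetic data, not part of the tutorial dataset, and the synchronous scheduler is used only so the example runs without a cluster.

```python
import numpy as np
import xarray as xr

# Synthetic stand-in data (an assumption, not the tutorial dataset),
# split into three blocks of two time steps each.
da = xr.DataArray(
    np.arange(12.0).reshape(6, 2),
    dims=("time", "x"),
    coords={"time": np.arange(6)},
    name="demo",
).chunk({"time": 2})

seen = []  # (array type, block length along time) for every call


def describe_block(block):
    # `block` is an xarray object; record what backs it and how long it is.
    seen.append((type(block.data).__name__, block.sizes["time"]))
    return block + 1  # must return an xarray object


lazy = da.map_blocks(describe_block)  # builds the graph; blocks are not computed yet
result = lazy.compute(scheduler="synchronous")  # runs the function on each block

print(seen)
```

Entries in `seen` with a length of zero come from the structure-inference call that `map_blocks` makes before any real block is processed.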


## Setup

In [2]:
%%cudf.pandas.profile

import dask
import numpy as np
import xarray as xr

First, let's set up a `LocalCluster` using [dask.distributed](https://distributed.dask.org/).

You can use any kind of dask cluster. This step is completely independent of
xarray. While not strictly necessary, the dashboard provides a nice learning
tool.


In [3]:
%%cudf.pandas.profile

from dask.distributed import Client

client = Client()
client

| Field | Value |
|---|---|
| Connection method | Cluster object |
| Cluster type | distributed.LocalCluster |
| Dashboard | http://127.0.0.1:8787/status |
| Status | running |
| Using processes | True |
| Comm | tcp://127.0.0.1:43787 |
| Workers | 10 |
| Total threads | 80 |
| Total memory | 0.98 TiB |
| Started | Just now |

| Comm | Total threads | Memory | Dashboard | Nanny | Local directory |
|---|---|---|---|---|---|
| tcp://127.0.0.1:42169 | 8 | 100.78 GiB | http://127.0.0.1:45097/status | tcp://127.0.0.1:35467 | `/raid/charlesb/.tmp/dask-scratch-space/worker-avqa3eyo` |
| tcp://127.0.0.1:37111 | 8 | 100.78 GiB | http://127.0.0.1:34927/status | tcp://127.0.0.1:33985 | `/raid/charlesb/.tmp/dask-scratch-space/worker-cx_3f_oq` |
| tcp://127.0.0.1:39801 | 8 | 100.78 GiB | http://127.0.0.1:35107/status | tcp://127.0.0.1:41079 | `/raid/charlesb/.tmp/dask-scratch-space/worker-tuog4j_y` |
| tcp://127.0.0.1:46007 | 8 | 100.78 GiB | http://127.0.0.1:41047/status | tcp://127.0.0.1:44543 | `/raid/charlesb/.tmp/dask-scratch-space/worker-xo0bmqcu` |
| tcp://127.0.0.1:36005 | 8 | 100.78 GiB | http://127.0.0.1:35421/status | tcp://127.0.0.1:35851 | `/raid/charlesb/.tmp/dask-scratch-space/worker-4a7png1b` |
| tcp://127.0.0.1:32937 | 8 | 100.78 GiB | http://127.0.0.1:41949/status | tcp://127.0.0.1:42205 | `/raid/charlesb/.tmp/dask-scratch-space/worker-muv1ioen` |
| tcp://127.0.0.1:40869 | 8 | 100.78 GiB | http://127.0.0.1:41049/status | tcp://127.0.0.1:40307 | `/raid/charlesb/.tmp/dask-scratch-space/worker-na_ty1v1` |
| tcp://127.0.0.1:43655 | 8 | 100.78 GiB | http://127.0.0.1:46409/status | tcp://127.0.0.1:44453 | `/raid/charlesb/.tmp/dask-scratch-space/worker-37lr5zf1` |
| tcp://127.0.0.1:36239 | 8 | 100.78 GiB | http://127.0.0.1:41649/status | tcp://127.0.0.1:35157 | `/raid/charlesb/.tmp/dask-scratch-space/worker-lso5gowe` |
| tcp://127.0.0.1:35829 | 8 | 100.78 GiB | http://127.0.0.1:43471/status | tcp://127.0.0.1:43731 | `/raid/charlesb/.tmp/dask-scratch-space/worker-f8guev34` |


<p>&#128070;</p> Click the Dashboard link above. Or click the "Search" button in the dashboard.

Let's test that the dashboard is working.


In [4]:
%%cudf.pandas.profile

import dask.array

dask.array.ones((1000, 4), chunks=(2, 1)).compute()  # should see activity in dashboard

array([[1., 1., 1., 1.],
       [1., 1., 1., 1.],
       [1., 1., 1., 1.],
       ...,
       [1., 1., 1., 1.],
       [1., 1., 1., 1.],
       [1., 1., 1., 1.]])

Let's open a dataset. We specify `chunks` so that the DataArrays are backed by dask arrays.

In [5]:
%%cudf.pandas.profile

ds = xr.tutorial.open_dataset("air_temperature", chunks={"time": 100})
ds

| | Array | Chunk |
|---|---|---|
| Bytes | 14.76 MiB | 517.58 kiB |
| Shape | (2920, 25, 53) | (100, 25, 53) |
| Dask graph | 30 chunks in 2 graph layers | 30 chunks in 2 graph layers |
| Data type | float32 numpy.ndarray | float32 numpy.ndarray |
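
To see where the 30 chunks reported above come from, here is a small sketch on a synthetic array with the same shape and dtype. The name `demo` and the all-zeros data are assumptions for illustration; no tutorial data is downloaded.

```python
import numpy as np
import xarray as xr

# Synthetic array matching the reported shape (2920, 25, 53) and dtype float32.
demo = xr.DataArray(
    np.zeros((2920, 25, 53), dtype="float32"),
    dims=("time", "lat", "lon"),
    name="air",
)

# Chunk only along "time"; "lat" and "lon" each stay in a single chunk.
chunked = demo.chunk({"time": 100})

time_chunks = chunked.chunks[0]      # chunk lengths along the first dimension
n_time_chunks = len(time_chunks)     # 29 full chunks of 100 plus one remainder
last_chunk = time_chunks[-1]         # 2920 - 29 * 100 = 20

print(n_time_chunks, last_chunk)
```

Because 2920 is not a multiple of 100, the final block is shorter than the others, which is worth keeping in mind when a mapped function assumes a fixed block length.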


## Simple example

Here is an example that averages each block over the `lat` dimension:

In [6]:
%%cudf.pandas.profile

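def time_mean(obj):
    # use xarray's convenient API here
    # you could convert to a pandas dataframe and use pandas' extensive API
    # or use .plot() and plt.savefig to save visualizations to disk in parallel.
    # note: despite the function's name, this averages over "lat", a dimension
    # that is not chunked, so every block holds all the values it needs
    return obj.mean("lat")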


ds.map_blocks(time_mean)  # this is lazy!

| | Array | Chunk |
|---|---|---|
| Bytes | 604.53 kiB | 20.70 kiB |
| Shape | (2920, 53) | (100, 53) |
| Dask graph | 30 chunks in 4 graph layers | 30 chunks in 4 graph layers |
| Data type | float32 numpy.ndarray | float32 numpy.ndarray |


In [7]:
%%cudf.pandas.profile

# this computes the values; it should return True if the block-wise result matches ds.mean("lat")
ds.map_blocks(time_mean).identical(ds.mean("lat"))

2023-11-08 09:54:53,421 - distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/datasets/charlesb/miniforge3/envs/cudf-xarray-tutorial/lib/python3.10/site-packages/distributed/protocol/core.py", line 160, in loads
    return msgpack.loads(
  File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
  File "/datasets/charlesb/miniforge3/envs/cudf-xarray-tutorial/lib/python3.10/site-packages/distributed/protocol/core.py", line 152, in _decode_default
    return pickle.loads(sub_header["pickled-obj"], buffers=sub_frames)
  File "/datasets/charlesb/miniforge3/envs/cudf-xarray-tutorial/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 96, in loads
    return pickle.loads(x)
_pickle.UnpicklingError: state is not a dictionary
2023-11-08 09:54:53,431 - distributed.worker - ERROR - Scheduler was unaware of this worker 'tcp://127.0.0.1:40869'. Shutting down.
2023-11-08 09:54:53,435 - distributed.protocol.core - CRITIC

KilledWorker: Attempted to run task ('time-time_mean-7e1883d802e4b710e8261c5e360e36a3-6025c4426ac76cd4faff28449686d765', 21, 0, 0) on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:46007. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.

### Exercise

Try applying the following function with `map_blocks`. Specify `scale` as an
argument and `offset` as a kwarg.

The docstring should help:
https://docs.xarray.dev/en/stable/generated/xarray.map_blocks.html

```
def time_mean_scaled(obj, scale, offset):
    return obj.mean("lat") * scale + offset
```
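
One possible way to approach the exercise, sketched on a small synthetic dataset rather than the tutorial data: positional arguments go in `args` and keyword arguments in `kwargs`. The dataset `ds_demo` and the values `5` and `-10` are arbitrary choices for illustration.

```python
import numpy as np
import xarray as xr


def time_mean_scaled(obj, scale, offset):
    return obj.mean("lat") * scale + offset


# Small synthetic dataset (an assumption), chunked along "time" only.
ds_demo = xr.Dataset(
    {"air": (("time", "lat", "lon"), np.arange(24.0).reshape(4, 3, 2))},
    coords={"time": np.arange(4), "lat": [10.0, 20.0, 30.0], "lon": [0.0, 1.0]},
).chunk({"time": 2})

# `scale` is passed positionally through `args`, `offset` by name through `kwargs`.
scaled = ds_demo.map_blocks(time_mean_scaled, args=[5], kwargs={"offset": -10})
got = scaled.compute(scheduler="synchronous")

# Reference result computed eagerly with NumPy-backed data.
expected = ds_demo.compute().mean("lat") * 5 - 10
xr.testing.assert_allclose(got, expected)
```

The synchronous scheduler is used here only to keep the sketch independent of any running cluster; on a `Client`, a plain `.compute()` would be the usual call.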


### More advanced functions

`map_blocks` needs to know what the returned object looks like _exactly_. It
does so by passing a 0-shaped xarray object to the function and examining the
result. This approach cannot work in all cases (for example, when the function
raises an error on empty input). For such advanced use cases, `map_blocks`
accepts a `template` kwarg. See
https://docs.xarray.dev/en/stable/user-guide/dask.html#map-blocks for more details.
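
A minimal sketch of the `template` kwarg, assuming a hypothetical function `anomaly_from_block_start` that indexes the first time step of each block and therefore cannot run on a zero-sized object. The array `da` is synthetic; because the output has the same shape, coordinates, and chunking as the input, the input itself is used as the template.

```python
import numpy as np
import xarray as xr

# Synthetic data (an assumption): three blocks of two time steps each.
da = xr.DataArray(
    np.arange(12.0).reshape(6, 2),
    dims=("time", "x"),
    coords={"time": np.arange(6)},
    name="demo",
).chunk({"time": 2})


def anomaly_from_block_start(block):
    # Indexing position 0 fails when the block has no time steps at all.
    first = block.isel(time=0, drop=True)
    return block - first


# Without a template, structure inference is expected to fail on the empty object.
try:
    da.map_blocks(anomaly_from_block_start)
    inference_failed = False
except Exception:
    inference_failed = True

# With a template describing the output, no inference call is needed.
anomalies = da.map_blocks(anomaly_from_block_start, template=da).compute(
    scheduler="synchronous"
)
print(inference_failed)
```

The template must be a dask-backed xarray object whose dimensions, coordinates, and chunk sizes match what the function returns for each block; only its structure is used, not its values.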


In [8]:
%%cudf.pandas.profile

client.close()