# Chunking with xarray and dask

A basic example of loading a netCDF4 dataset using `xarray`, chunking it using `dask`, and saving it to disk in `zarr` format

Author: Charles Blackmon-Luca

## Getting started

We start by importing the necessary packages:

In [1]:
import xarray as xr
import dask
import zarr

Note that we are using developmental versions of `xarray` and `zarr` - this is so we can take advantage of `zarr`'s consolidated metadata:

In [2]:
print(xr.__version__)
print(zarr.__version__)

0.11.1+64.g612d390
2.2.1.dev140


To get a grasp of dask's functionality beyond cloud computing, we will also use a local distributed scheduler, which can be viewed by starting up a `Client` through `dask.distributed`:

In [3]:
from dask.distributed import Client, LocalCluster

client  = Client()
client

0,1
Client  Scheduler: tcp://127.0.0.1:45868  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 16  Memory: 135.44 GB


We can check the progress of this scheduler by following the link above to the Dashboard - we will need to forward port `8787` to our local machine using `ssh -L`:

```
ssh -L 8787:localhost:8787 tracmip@weather.rsmas.miami.edu
```

## Monthly data

With the dashboard open, let's try opening a random dataset - ECHAM-6.3's LandOrbit experiment with a monthly timestep. When working with data we find is commonly used for climatologies, we may prefer to chunk by time, as we'd prefer to load the entire space for any given plot or computation. When working with pressure-sensitive data, we may prefer to chunk by pressure levels. Overall, take into consideration that we want our chunks to be around 10-100 MB in size; when in doubt, use `'auto'` to quickly select chunk sizes you *think* may need to be chunked but don't necessarily know a good size for:

In [4]:
monthly = xr.open_mfdataset('/data2/tracmip/ECHAM-6.3/LandOrbit/Amon/*.nc').chunk(chunks={'time' : 'auto', 'plev' : 'auto'})
monthly

<xarray.Dataset>
Dimensions:  (lat: 96, lon: 192, plev: 17, time: 480)
Coordinates:
  * lon      (lon) float64 0.0 1.875 3.75 5.625 7.5 ... 352.5 354.4 356.2 358.1
  * lat      (lat) float64 88.57 86.72 84.86 83.0 ... -83.0 -84.86 -86.72 -88.57
  * plev     (plev) float64 1e+05 9.25e+04 8.5e+04 7e+04 ... 3e+03 2e+03 1e+03
  * time     (time) datetime64[ns] 2066-01-30T23:52:00 ... 2105-12-30T23:52:00
Data variables:
    cl       (time, plev, lat, lon) float32 dask.array<shape=(480, 17, 96, 192), chunksize=(160, 10, 96, 192)>
    cli      (time, plev, lat, lon) float32 dask.array<shape=(480, 17, 96, 192), chunksize=(160, 10, 96, 192)>
    clivi    (time, lat, lon) float32 dask.array<shape=(480, 96, 192), chunksize=(480, 96, 192)>
    clt      (time, lat, lon) float32 dask.array<shape=(480, 96, 192), chunksize=(480, 96, 192)>
    clw      (time, plev, lat, lon) float32 dask.array<shape=(480, 17, 96, 192), chunksize=(160, 10, 96, 192)>
    clwvi    (time, lat, lon) float32 dask.array<shape

Once we have the data loaded and chunked, we can convert it to `zarr`, where the chunking will be retained. This process shouldn't take very long, as the amount of data is relatively small; the progress of this conversion can be viewed from our dashboard. Note our option `consolidated=True`, which will opt to store metadata for each variable in one single file rather than in each chunk:

In [5]:
!rm -rf /data2/tracmip/zarr/test/
monthly.to_zarr('/data2/tracmip/zarr/test/', consolidated=True)

<xarray.backends.zarr.ZarrStore at 0x7f1a2b875a58>

Now that our data is saved to disk, we can inspect the chunk size in terminal to make sure it is reasonable:

In [6]:
!ls -lh /data2/tracmip/zarr/test/zg

total 292M
-rw------- 1 tracmip tracmip 59M Feb 21 15:27 0.0.0.0
-rw------- 1 tracmip tracmip 39M Feb 21 15:27 0.1.0.0
-rw------- 1 tracmip tracmip 59M Feb 21 15:27 1.0.0.0
-rw------- 1 tracmip tracmip 39M Feb 21 15:27 1.1.0.0
-rw------- 1 tracmip tracmip 59M Feb 21 15:27 2.0.0.0
-rw------- 1 tracmip tracmip 39M Feb 21 15:27 2.1.0.0


## Daily data

The monthly data is chunked well, but it is not representative of the data that would benefit from cloud computing. A better example is LandOrbit data with a daily timestep:

In [7]:
daily = xr.open_mfdataset('/data2/tracmip/ECHAM-6.3/LandOrbit/Aday/*.nc').chunk(chunks={'time' : 'auto', 'plev' : 'auto'})
daily

<xarray.Dataset>
Dimensions:  (lat: 96, lon: 192, plev: 17, time: 3600)
Coordinates:
  * lon      (lon) float64 0.0 1.875 3.75 5.625 7.5 ... 352.5 354.4 356.2 358.1
  * lat      (lat) float64 88.57 86.72 84.86 83.0 ... -83.0 -84.86 -86.72 -88.57
  * plev     (plev) float64 1e+05 9.25e+04 8.5e+04 7e+04 ... 3e+03 2e+03 1e+03
  * time     (time) datetime64[ns] 2096-01-01T23:52:00 ... 2105-12-30T23:52:00
Data variables:
    cl       (time, plev, lat, lon) float32 dask.array<shape=(3600, 17, 96, 192), chunksize=(720, 2, 96, 192)>
    cli      (time, plev, lat, lon) float32 dask.array<shape=(3600, 17, 96, 192), chunksize=(720, 2, 96, 192)>
    clivi    (time, lat, lon) float32 dask.array<shape=(3600, 96, 192), chunksize=(1800, 96, 192)>
    clt      (time, lat, lon) float32 dask.array<shape=(3600, 96, 192), chunksize=(1800, 96, 192)>
    clw      (time, plev, lat, lon) float32 dask.array<shape=(3600, 17, 96, 192), chunksize=(720, 2, 96, 192)>
    clwvi    (time, lat, lon) float32 dask.array<

This process should take notably longer, but still benefits from the `Client` we initialized above:

In [8]:
!rm -rf /data2/tracmip/zarr/test/
daily.to_zarr('/data2/tracmip/zarr/test/', consolidated=True)

<xarray.backends.zarr.ZarrStore at 0x7f1a2b4eb710>

As we can imagine, there are much more chunks:

In [9]:
!ls -lh /data2/tracmip/zarr/test/zg

total 2.3G
-rw------- 1 tracmip tracmip 56M Feb 21 15:28 0.0.0.0
-rw------- 1 tracmip tracmip 56M Feb 21 15:28 0.1.0.0
-rw------- 1 tracmip tracmip 56M Feb 21 15:28 0.2.0.0
-rw------- 1 tracmip tracmip 56M Feb 21 15:28 0.3.0.0
-rw------- 1 tracmip tracmip 56M Feb 21 15:28 0.4.0.0
-rw------- 1 tracmip tracmip 54M Feb 21 15:28 0.5.0.0
-rw------- 1 tracmip tracmip 53M Feb 21 15:28 0.6.0.0
-rw------- 1 tracmip tracmip 52M Feb 21 15:28 0.7.0.0
-rw------- 1 tracmip tracmip 26M Feb 21 15:28 0.8.0.0
-rw------- 1 tracmip tracmip 56M Feb 21 15:28 1.0.0.0
-rw------- 1 tracmip tracmip 56M Feb 21 15:28 1.1.0.0
-rw------- 1 tracmip tracmip 56M Feb 21 15:28 1.2.0.0
-rw------- 1 tracmip tracmip 56M Feb 21 15:28 1.3.0.0
-rw------- 1 tracmip tracmip 56M Feb 21 15:28 1.4.0.0
-rw------- 1 tracmip tracmip 54M Feb 21 15:28 1.5.0.0
-rw------- 1 tracmip tracmip 53M Feb 21 15:28 1.6.0.0
-rw------- 1 tracmip tracmip 52M Feb 21 15:28 1.7.0.0
-rw------- 1 tracmip tracmip 26M Feb 21 15:28 1.8.0.0
-rw------- 1 trac

Once in `zarr` format, we can interact with the datasets through `zarr`'s own interface, which is more compatible with JupyterLab:

In [10]:
zgroup = zarr.open('/data2/tracmip/zarr/test/')
zgroup.zg.info

0,1
Name,/zg
Type,zarr.core.Array
Data type,float32
Shape,"(3600, 17, 96, 192)"
Chunk shape,"(720, 2, 96, 192)"
Order,C
Read-only,False
Compressor,"Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0)"
Store type,zarr.storage.DirectoryStore
No. bytes,4512153600 (4.2G)


Here, we can get some insight on what type of data is contained, and how it is stored and compressed.