# Lazy evaluation on Dask arrays

If you are unfamiliar with Dask, read [Parallel computing with Dask](http://xarray.pydata.org/en/stable/dask.html) in Xarray documentation first. The current version only supports dask arrays on a single machine. Support of [Dask.distributed](https://distributed.dask.org) is in roadmap.

xESMF's Dask support is mostly for [lazy evaluation](https://en.wikipedia.org/wiki/Lazy_evaluation) and [out-of-core computing](https://en.wikipedia.org/wiki/External_memory_algorithm), to allow processing large volumes of data with limited memory. You might also get moderate speed-up on a multi-core machine by [choosing proper chunk sizes](http://xarray.pydata.org/en/stable/dask.html#chunking-and-performance), but that generally won't help your entire pipeline too much, because the read-regrid-write pipeline is severely I/O limited (see [this issue](https://github.com/pangeo-data/pangeo/issues/334) for more discussions). On a single machine, the disk bandwidth is typically limited to ~500 MB/s, and you cannot process data faster than such rate. If you need much faster data processing rate, you should resort to parallel file systems on HPC clusters or distributed storage on public cloud platforms. Please refer to the [Pangeo project](http://pangeo.io/) for more information.

In [1]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import dask.array as da  # need to have dask.array installed, although not directly using it here.
import xarray as xr
import xesmf as xe

## A simple example

### Prepare input data

In [2]:
ds = xr.tutorial.open_dataset('air_temperature', chunks={'time': 500})
ds

Unnamed: 0,Array,Chunk
Bytes,15.48 MB,2.65 MB
Shape,"(2920, 25, 53)","(500, 25, 53)"
Count,7 Tasks,6 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 15.48 MB 2.65 MB Shape (2920, 25, 53) (500, 25, 53) Count 7 Tasks 6 Chunks Type float32 numpy.ndarray",53  25  2920,

Unnamed: 0,Array,Chunk
Bytes,15.48 MB,2.65 MB
Shape,"(2920, 25, 53)","(500, 25, 53)"
Count,7 Tasks,6 Chunks
Type,float32,numpy.ndarray


In [3]:
ds.chunks

Frozen(SortedKeysDict({'time': (500, 500, 500, 500, 500, 420), 'lat': (25,), 'lon': (53,)}))

In [4]:
ds['air'].data

Unnamed: 0,Array,Chunk
Bytes,15.48 MB,2.65 MB
Shape,"(2920, 25, 53)","(500, 25, 53)"
Count,7 Tasks,6 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 15.48 MB 2.65 MB Shape (2920, 25, 53) (500, 25, 53) Count 7 Tasks 6 Chunks Type float32 numpy.ndarray",53  25  2920,

Unnamed: 0,Array,Chunk
Bytes,15.48 MB,2.65 MB
Shape,"(2920, 25, 53)","(500, 25, 53)"
Count,7 Tasks,6 Chunks
Type,float32,numpy.ndarray


### Build regridder

In [5]:
ds_out = xr.Dataset({'lat': (['lat'], np.arange(16, 75, 1.0)),
                     'lon': (['lon'], np.arange(200, 330, 1.5)),
                    }
                   )

regridder = xe.Regridder(ds, ds_out, 'bilinear')
regridder

xESMF Regridder 
Regridding algorithm:       bilinear 
Input grid shape:           (25, 53) 
Output grid shape:          (59, 87) 
Output grid dimension name: ('lat', 'lon') 
Periodic in longitude?      False

### Apply to xarray Dataset/DataArray

In [6]:
# only build the dask graph; actual computation happens later when calling compute()
%time ds_out = regridder(ds)
ds_out

using dimensions ('lat', 'lon') from data variable air as the horizontal dimensions for this dataset.
CPU times: user 2.09 ms, sys: 0 ns, total: 2.09 ms
Wall time: 2.06 ms


Unnamed: 0,Array,Chunk
Bytes,119.91 MB,20.53 MB
Shape,"(2920, 59, 87)","(500, 59, 87)"
Count,13 Tasks,6 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 119.91 MB 20.53 MB Shape (2920, 59, 87) (500, 59, 87) Count 13 Tasks 6 Chunks Type float64 numpy.ndarray",87  59  2920,

Unnamed: 0,Array,Chunk
Bytes,119.91 MB,20.53 MB
Shape,"(2920, 59, 87)","(500, 59, 87)"
Count,13 Tasks,6 Chunks
Type,float64,numpy.ndarray


In [7]:
ds_out['air'].data  # chunks are preserved

Unnamed: 0,Array,Chunk
Bytes,119.91 MB,20.53 MB
Shape,"(2920, 59, 87)","(500, 59, 87)"
Count,13 Tasks,6 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 119.91 MB 20.53 MB Shape (2920, 59, 87) (500, 59, 87) Count 13 Tasks 6 Chunks Type float64 numpy.ndarray",87  59  2920,

Unnamed: 0,Array,Chunk
Bytes,119.91 MB,20.53 MB
Shape,"(2920, 59, 87)","(500, 59, 87)"
Count,13 Tasks,6 Chunks
Type,float64,numpy.ndarray


In [8]:
%time result = ds_out['air'].compute()  # actually applies regridding

CPU times: user 298 ms, sys: 212 ms, total: 510 ms
Wall time: 154 ms


In [9]:
type(result.data), result.data.shape

(numpy.ndarray, (2920, 59, 87))

## Invalid chunk sizes to avoid

Dask support relies on `xarray.apply_ufunc`, which requires only chunking over extra/broadcasting dimensions (like `time` and `lev`), not horizontal/core dimensions (`lat`, `lon`, or `x`, `y`).

In [10]:
# xESMF doesn't like chunking over horizontal dimensions
ds_bad = ds.chunk({'lat': 10, 'lon': 10, 'time': None})
ds_bad

Unnamed: 0,Array,Chunk
Bytes,15.48 MB,1.17 MB
Shape,"(2920, 25, 53)","(2920, 10, 10)"
Count,44 Tasks,18 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 15.48 MB 1.17 MB Shape (2920, 25, 53) (2920, 10, 10) Count 44 Tasks 18 Chunks Type float32 numpy.ndarray",53  25  2920,

Unnamed: 0,Array,Chunk
Bytes,15.48 MB,1.17 MB
Shape,"(2920, 25, 53)","(2920, 10, 10)"
Count,44 Tasks,18 Chunks
Type,float32,numpy.ndarray


In [11]:
# regridder(ds_bad)  # uncomment this line to see the error message

In [12]:
# besides rechunking data properly, another simple fix is to convert to numpy array without chunking
regridder(ds_bad.load())

using dimensions ('lat', 'lon') from data variable air as the horizontal dimensions for this dataset.
