# Data Chunking with Dask

In this notebook we demonstrate:

* Xarray + Dask
* NetCDF file Chunks versus Dask Chunks
* chunk shapes

The following material uses Coupled Model Intercomparison Project (CMIP6) collections. Please visit data collection [catalogue](https://geonetwork.nci.org.au/geonetwork/srv/eng/catalog.search#/metadata/f6600_2266_8675_3563) and [CMIP6 terms of use](https://pcmdi.llnl.gov/CMIP6/TermsOfUse/TermsOfUse6-1.html) for more information. 

---

- Authors: NCI Virtual Research Environment Team
- Keywords: CMIP6, Xarray, Dask, Diagnostics
- Create Date: 2019-June; Update Date: 2020-May

### Prerequisite

You need to run this notebook on Gadi/VDI (recommended), or on your local computer if CMIP6 data can be accessed from remote links via data services. The following modules are needed:

* Xarray
* Dask
* NetCDF

You also need to be a member of the following data project to access the data or through [NCI's data service](http://dapds00.nci.org.au/thredds/catalog.html) if the data is published:
* oi10

You can request to join the project through [NCI's user account management system](https://my.nci.org.au). 

### Load the required modules

In [1]:
import xarray as xr
import netCDF4 as nc
import time
%matplotlib inline

Starting the Dask Client is optional. It will provide a dashboard which is useful to gain insight on the computation.

The link to the dashboard will become visible when you create the client below. We recommend having it open on one side of your screen while using your notebook on the other side. This can take some effort to arrange your windows, but seeing them both at the same is very useful when learning.

In [2]:
# Create cluster
from dask.distributed import Client,LocalCluster
client = Client(scheduler_file='scheduler.json')
print(client)

<Client: 'tcp://10.6.75.59:8710' processes=24 threads=24, memory=103.08 GB>



numpy
+------------------------+---------+
|                        | version |
+------------------------+---------+
| client                 | 1.18.4  |
| scheduler              | 1.18.4  |
| tcp://10.6.75.59:32779 | 1.17.2  |
| tcp://10.6.75.59:32971 | 1.17.2  |
| tcp://10.6.75.59:33113 | 1.17.2  |
| tcp://10.6.75.59:33537 | 1.17.2  |
| tcp://10.6.75.59:35163 | 1.17.2  |
| tcp://10.6.75.59:36413 | 1.17.2  |
| tcp://10.6.75.59:36643 | 1.17.2  |
| tcp://10.6.75.59:38199 | 1.17.2  |
| tcp://10.6.75.59:38745 | 1.17.2  |
| tcp://10.6.75.59:39025 | 1.17.2  |
| tcp://10.6.75.59:39703 | 1.17.2  |
| tcp://10.6.75.59:40069 | 1.17.2  |
| tcp://10.6.75.59:40307 | 1.17.2  |
| tcp://10.6.75.59:40577 | 1.17.2  |
| tcp://10.6.75.59:40759 | 1.17.2  |
| tcp://10.6.75.59:41563 | 1.17.2  |
| tcp://10.6.75.59:41763 | 1.17.2  |
| tcp://10.6.75.59:42289 | 1.17.2  |
| tcp://10.6.75.59:42361 | 1.17.2  |
| tcp://10.6.75.59:42725 | 1.17.2  |
| tcp://10.6.75.59:43589 | 1.17.2  |
| tcp://10.6.75.59:43981 | 1.17

### Data

We will use percipitation data update of RCP8.5 based on SSP5 from GFDL-CM4 ensembles in this example, let's have a look at the data.

In [14]:
!ncdump -hst '/g/data/oi10/replicas/CMIP6/ScenarioMIP/NOAA-GFDL/GFDL-CM4/ssp585/r1i1p1f1/day/pr/gr1/v20180701/pr_day_GFDL-CM4_ssp585_r1i1p1f1_gr1_20150101-20341231.nc'

netcdf pr_day_GFDL-CM4_ssp585_r1i1p1f1_gr1_20150101-20341231 {
dimensions:
	lat = 180 ;
	bnds = 2 ;
	lon = 288 ;
	time = UNLIMITED ; // (7300 currently)
variables:
	double lat(lat) ;
		lat:long_name = "latitude" ;
		lat:units = "degrees_north" ;
		lat:axis = "Y" ;
		lat:bounds = "lat_bnds" ;
		lat:standard_name = "latitude" ;
		lat:cell_methods = "time: point" ;
		lat:_Storage = "chunked" ;
		lat:_ChunkSizes = 180 ;
		lat:_DeflateLevel = 2 ;
		lat:_Shuffle = "true" ;
		lat:_Endianness = "little" ;
	double lat_bnds(lat, bnds) ;
		lat_bnds:long_name = "latitude bounds" ;
		lat_bnds:units = "degrees_north" ;
		lat_bnds:axis = "Y" ;
		lat_bnds:_Storage = "chunked" ;
		lat_bnds:_ChunkSizes = 180, 2 ;
		lat_bnds:_DeflateLevel = 2 ;
		lat_bnds:_Shuffle = "true" ;
		lat_bnds:_Endianness = "little" ;
	double lon(lon) ;
		lon:long_name = "longitude" ;
		lon:units = "degrees_east" ;
		lon:axis = "X" ;
		lon:bounds = "lon_bnds" ;
		lon:standard_name = "longitude" ;
		lon:cell_methods = "time: poin

### Xarray + Dask

Xarray can automatically wrap its data in dask arrays. This capability turns xarray into an extremely powerful tool for Big Data earth science

To see this in action, we will download a fairly large dataset to analyze. We use xarray's `open_mfdataset` allows multiple files to be opened simultaneously.

In [15]:
!ls /g/data/oi10/replicas/CMIP6/ScenarioMIP/NOAA-GFDL/GFDL-CM4/ssp585/r1i1p1f1/day/pr/gr1/v20180701
path = '/g/data/oi10/replicas/CMIP6/ScenarioMIP/NOAA-GFDL/GFDL-CM4/ssp585/r1i1p1f1/day/pr/gr1/v20180701/*'

pr_day_GFDL-CM4_ssp585_r1i1p1f1_gr1_20150101-20341231.nc
pr_day_GFDL-CM4_ssp585_r1i1p1f1_gr1_20350101-20541231.nc
pr_day_GFDL-CM4_ssp585_r1i1p1f1_gr1_20550101-20741231.nc
pr_day_GFDL-CM4_ssp585_r1i1p1f1_gr1_20750101-20941231.nc
pr_day_GFDL-CM4_ssp585_r1i1p1f1_gr1_20950101-21001231.nc


In [16]:
f_ssp585 = xr.open_mfdataset(path, combine='by_coords')
f_ssp585

Unnamed: 0,Array,Chunk
Bytes,90.40 MB,21.02 MB
Shape,"(31390, 180, 2)","(7300, 180, 2)"
Count,20 Tasks,5 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 90.40 MB 21.02 MB Shape (31390, 180, 2) (7300, 180, 2) Count 20 Tasks 5 Chunks Type float64 numpy.ndarray",2  180  31390,

Unnamed: 0,Array,Chunk
Bytes,90.40 MB,21.02 MB
Shape,"(31390, 180, 2)","(7300, 180, 2)"
Count,20 Tasks,5 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,144.65 MB,33.64 MB
Shape,"(31390, 288, 2)","(7300, 288, 2)"
Count,20 Tasks,5 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 144.65 MB 33.64 MB Shape (31390, 288, 2) (7300, 288, 2) Count 20 Tasks 5 Chunks Type float64 numpy.ndarray",2  288  31390,

Unnamed: 0,Array,Chunk
Bytes,144.65 MB,33.64 MB
Shape,"(31390, 288, 2)","(7300, 288, 2)"
Count,20 Tasks,5 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,6.51 GB,1.51 GB
Shape,"(31390, 180, 288)","(7300, 180, 288)"
Count,15 Tasks,5 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 6.51 GB 1.51 GB Shape (31390, 180, 288) (7300, 180, 288) Count 15 Tasks 5 Chunks Type float32 numpy.ndarray",288  180  31390,

Unnamed: 0,Array,Chunk
Bytes,6.51 GB,1.51 GB
Shape,"(31390, 180, 288)","(7300, 180, 288)"
Count,15 Tasks,5 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,502.24 kB,116.80 kB
Shape,"(31390, 2)","(7300, 2)"
Count,15 Tasks,5 Chunks
Type,object,numpy.ndarray
"Array Chunk Bytes 502.24 kB 116.80 kB Shape (31390, 2) (7300, 2) Count 15 Tasks 5 Chunks Type object numpy.ndarray",2  31390,

Unnamed: 0,Array,Chunk
Bytes,502.24 kB,116.80 kB
Shape,"(31390, 2)","(7300, 2)"
Count,15 Tasks,5 Chunks
Type,object,numpy.ndarray


<div class="alert alert-warning">
<b>NOTE:</b> the values are not displayed, since that would trigger computation.
</div>

### Chunks

Notice that it says:`pr(time, lat, lon) float32 dask.array<chunksize=(7300, 180, 288), meta=np.ndarray>`. There is now the `chunksize` component. The data array also become an dask array.

The chunking of the array comes from the integration of Dask with xarray. Dask divides the data array into small pieces called "chunks", with each chunk designed to be small enough to fit into memory. 

The file itself may be already chunked. Filesystem chunking is available in netCDF-4 and HDF5 datasets. CMIP6 data should all be netCDF-4 and include some form of chunking on the file.

In [17]:
!ncdump -hst '/g/data/oi10/replicas/CMIP6/ScenarioMIP/NOAA-GFDL/GFDL-CM4/ssp585/r1i1p1f1/day/pr/gr1/v20180701/pr_day_GFDL-CM4_ssp585_r1i1p1f1_gr1_20150101-20341231.nc'

netcdf pr_day_GFDL-CM4_ssp585_r1i1p1f1_gr1_20150101-20341231 {
dimensions:
	lat = 180 ;
	bnds = 2 ;
	lon = 288 ;
	time = UNLIMITED ; // (7300 currently)
variables:
	double lat(lat) ;
		lat:long_name = "latitude" ;
		lat:units = "degrees_north" ;
		lat:axis = "Y" ;
		lat:bounds = "lat_bnds" ;
		lat:standard_name = "latitude" ;
		lat:cell_methods = "time: point" ;
		lat:_Storage = "chunked" ;
		lat:_ChunkSizes = 180 ;
		lat:_DeflateLevel = 2 ;
		lat:_Shuffle = "true" ;
		lat:_Endianness = "little" ;
	double lat_bnds(lat, bnds) ;
		lat_bnds:long_name = "latitude bounds" ;
		lat_bnds:units = "degrees_north" ;
		lat_bnds:axis = "Y" ;
		lat_bnds:_Storage = "chunked" ;
		lat_bnds:_ChunkSizes = 180, 2 ;
		lat_bnds:_DeflateLevel = 2 ;
		lat_bnds:_Shuffle = "true" ;
		lat_bnds:_Endianness = "little" ;
	double lon(lon) ;
		lon:long_name = "longitude" ;
		lon:units = "degrees_east" ;
		lon:axis = "X" ;
		lon:bounds = "lon_bnds" ;
		lon:standard_name = "longitude" ;
		lon:cell_methods = "time: poin

#### In this case the file is chunked such that  `pr:_ChunkSizes = 1, 180, 288 ;`

Here we see that the data is chunked in time, where one chunk is one time-step and all points in lat-lon.

![](images/Chunks.png)
image source: https://www.unidata.ucar.edu/blogs/developer/en/entry/chunking_data_why_it_matters

Consider 2 types of data access
1. Accessing a 2D lat-lon slice in time (RHS figure)
2. Accessing a time series at a single lat-lon point (LHS figure)

With the chunking above, the first type of data access only needs to access a single chunk, while the second type needs to access ALL the chunks of the data array regardless. This dataset will be fastest for 2D lat-lon single time-step data access.

In general, even without chunking - when the data is accessed contiguously (by index order) - time is the slowest variable to access, then y, with x being the fastest. With the chunking method of this CMIP6 dataset, time still remains the slowest variable. For more uniform variable access speeds more evenly spaced chunks would be needed, spacing the chunks in time, lat, and lon.

### Exercise

Time how long it takes to load the precipitation data at `time='2015-01-01'` and then time how long it takes to load the data at `lat=0` and `lon=180` (remember to use `method='nearest'` for the latter case). How much difference is there in the different access methods?


In [18]:
%%time
f_ssp585.pr.sel(time='2015-01-01').load()

CPU times: user 11.8 ms, sys: 1.49 ms, total: 13.3 ms
Wall time: 37.4 ms


In [19]:
%%time
f_ssp585.pr.sel(lat=0,lon=180,method='nearest').load()

CPU times: user 29.7 ms, sys: 0 ns, total: 29.7 ms
Wall time: 5.95 s


<div class="alert alert-info">
<b>NOTE:</b> If you look at the dashboard, the tast stream actually shows that the most time consuming part is data concatenation. 
</div>

### Approximately the same amount of data took tens-to-hundreds times longer to load.

The spatial dataset contains 51840 data-points and took order 100ms to load. The time-series dataset has 31390 data-points and took order 10,000 ms to load.

Chunking and the ways in which the data is read is important in considering both how you access the data and if you wish to parallelise your code.


### NetCDF file Chunks versus Dask Chunks

Keep in mind, dask chunking is different to chunking of the stored data. As we saw, the stored data is chunked with chunks of size (1,180,288). The Dask array was chunked with size (7300, 180, 288). You can change the chunking in the dask array. In the below example we are specifying that there be 730 chunks in time.

In [20]:
f_ssp585 = xr.open_mfdataset(path,chunks={'time':730}, combine='by_coords')

f_ssp585

Unnamed: 0,Array,Chunk
Bytes,90.40 MB,21.02 MB
Shape,"(31390, 180, 2)","(7300, 180, 2)"
Count,20 Tasks,5 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 90.40 MB 21.02 MB Shape (31390, 180, 2) (7300, 180, 2) Count 20 Tasks 5 Chunks Type float64 numpy.ndarray",2  180  31390,

Unnamed: 0,Array,Chunk
Bytes,90.40 MB,21.02 MB
Shape,"(31390, 180, 2)","(7300, 180, 2)"
Count,20 Tasks,5 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,144.65 MB,33.64 MB
Shape,"(31390, 288, 2)","(7300, 288, 2)"
Count,20 Tasks,5 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 144.65 MB 33.64 MB Shape (31390, 288, 2) (7300, 288, 2) Count 20 Tasks 5 Chunks Type float64 numpy.ndarray",2  288  31390,

Unnamed: 0,Array,Chunk
Bytes,144.65 MB,33.64 MB
Shape,"(31390, 288, 2)","(7300, 288, 2)"
Count,20 Tasks,5 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,6.51 GB,151.37 MB
Shape,"(31390, 180, 288)","(730, 180, 288)"
Count,91 Tasks,43 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 6.51 GB 151.37 MB Shape (31390, 180, 288) (730, 180, 288) Count 91 Tasks 43 Chunks Type float32 numpy.ndarray",288  180  31390,

Unnamed: 0,Array,Chunk
Bytes,6.51 GB,151.37 MB
Shape,"(31390, 180, 288)","(730, 180, 288)"
Count,91 Tasks,43 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,502.24 kB,11.68 kB
Shape,"(31390, 2)","(730, 2)"
Count,91 Tasks,43 Chunks
Type,object,numpy.ndarray
"Array Chunk Bytes 502.24 kB 11.68 kB Shape (31390, 2) (730, 2) Count 91 Tasks 43 Chunks Type object numpy.ndarray",2  31390,

Unnamed: 0,Array,Chunk
Bytes,502.24 kB,11.68 kB
Shape,"(31390, 2)","(730, 2)"
Count,91 Tasks,43 Chunks
Type,object,numpy.ndarray


### How big do you make your chunks?

The rule of thumb for dask chunks is that you should "create arrays with a minimum chunksize of at least one million elements":  http://xarray.pydata.org/en/stable/dask.html#chunking-and-performance

netCDF4 chunks are often a lot smaller than dask array chunks. The minimum chunksize exists because if you have too many chunks, then queuing of operations when parallising will be slow, and if they are too big computation and memory can be wasted. The default chunks from dask gave us chunks of size: (7300, 180, 288) or nearly 400 million elements so we could try reducing those chunks if needed. The larger the array, the larger the cost of queueing and the larger chunks may be needed.

#### IMPORTANT: Whatever dask array chunks you use, make sure they align with the netCDF4 file chunks!!

So far our chunks have been in time, and the netCDF4 file is also chunked in time. If we tried to use dask chunks to optimise the time-series loading of data, it will not help! 

#### Exercise

Try it, load the data with chunks size `(31390,180,1)` (i.e. chunked in lon) and name that file `f_bad_chunk`. Try re-loading the time series of pr at `lat=0` and `lon=180` and time how long it takes.

In [10]:
f_bad_chunk = xr.open_mfdataset(path,chunks={'time':31390,'lat':180,'lon':1}, combine='by_coords')

In [11]:
%%time
f_bad_chunk.pr.sel(lat=0,lon=180,method='nearest').load()

CPU times: user 29.4 ms, sys: 481 µs, total: 29.8 ms
Wall time: 6.35 s


What if we add some extra cores to the computation?

If you already use dask.distributed.Client and dask array calculations will be run in parallel automatically.

Check whether you have used a dask distributed client.

In [21]:
client

0,1
Client  Scheduler: tcp://10.6.75.59:8710  Dashboard: http://10.6.75.59:8752/status,Cluster  Workers: 24  Cores: 24  Memory: 103.08 GB


#### Exercise

Try running your previous code for `f_bad_chunk` again loading the time series of pr at `lat=0` and `lon=180` and time how long it takes if you scale up or down of the number of workers. 

Do the same with the original chunking method of `f_ssp585` and see if there is a difference.

In [None]:
%%time
f_bad_chunk.pr.sel(lat=0,lon=180,method='nearest').load()

In [None]:
%%time
f_ssp585.pr.sel(lat=0,lon=180,method='nearest').load()

### Close the client

Before moving on to the next exercise, make sure to close your client or stop this kernel.

In [22]:
client.close()

### Poor chunking with dask can make your performance worse!

Both the size of the chunks and the alignment of the chunks with the filesystem chunks are imporant to keep in mind when creating dask chunks. 

### Summary

This example shows how to make data chunking with dask. 

For further information regarding Dask please see: https://docs.dask.org/en/latest/