#  Large data

> questions:
> - "How do I work with multiple CMIP files that won't fit in memory?"
>
> objectives:
> - "Import the dask library and start a client with parallel workers."
> - "Inspect netCDF chunking."
> - "Calculate and plot an ocean temperature climatology."
>
> keypoints:
> - "Libraries such as dask and xarray can make loading, processing and visualising netCDF data much easier."
> - "Dask can massively speed up processing through parallelism but care may be needed particularly with data chunking."

> **Data used**
>
>If teaching this lesson in a classroom, a copy of the dataset should be on hand on external media such as a USB drive in case of wifi limitations. For remote teaching, please note the data being used is quite large, if network issues arise, the participant should instead use the smaller precipitation data files from the previous chapters.

So far we've been working with small, individual data files that can be comfortably read into memory on a modern laptop. What if we wanted to process a larger dataset that consists of many files and/or much larger file sizes? For instance, let's say we are studying the influence of sea surface temperature (SST) on precipitation. Now that we've plotted a global map showing the ACCESS-CM2 precipitation climatology, our next step is to create a similar global map showing the daily maximum SST over the 1850-2014 period.

Rather than download all the ACCESS-CM2 daily SST files to our laptop, for the purposes of this lesson we are going to make use of the fact that the National Computational Infrastructure (NCI) in Canberra has made their archive of CMIP6 data remotely available via a "THREDDS" (or TDS) server. A THREDDS server provides access to OPeNDAP, which is a protocol to remotely access netCDF data over a network as though it were a local file on your computer.

> TODO: On the instructor notes page maybe we should encourage instructors who aren't in Australia to use an alternative ESGF node (if the others nodes have THREDDS servers too)?

In Python, we use the `siphon` library to query THREDDS catalogues to find available files.  

In [2]:
from siphon.catalog import TDSCatalog

In [3]:
cat = TDSCatalog("http://dapds00.nci.org.au/thredds/catalog/fs38/publications/CMIP6/CMIP/CSIRO-ARCCSS/ACCESS-CM2/historical/r1i1p1f1/Oday/tos/gn/latest/catalog.xml")

In [4]:
print("\n".join(cat.datasets.keys()))

tos_Oday_ACCESS-CM2_historical_r1i1p1f1_gn_18500101-18591231.nc
tos_Oday_ACCESS-CM2_historical_r1i1p1f1_gn_18600101-18691231.nc
tos_Oday_ACCESS-CM2_historical_r1i1p1f1_gn_18700101-18791231.nc
tos_Oday_ACCESS-CM2_historical_r1i1p1f1_gn_18800101-18891231.nc
tos_Oday_ACCESS-CM2_historical_r1i1p1f1_gn_18900101-18991231.nc
tos_Oday_ACCESS-CM2_historical_r1i1p1f1_gn_19000101-19091231.nc
tos_Oday_ACCESS-CM2_historical_r1i1p1f1_gn_19100101-19191231.nc
tos_Oday_ACCESS-CM2_historical_r1i1p1f1_gn_19200101-19291231.nc
tos_Oday_ACCESS-CM2_historical_r1i1p1f1_gn_19300101-19391231.nc
tos_Oday_ACCESS-CM2_historical_r1i1p1f1_gn_19400101-19491231.nc
tos_Oday_ACCESS-CM2_historical_r1i1p1f1_gn_19500101-19591231.nc
tos_Oday_ACCESS-CM2_historical_r1i1p1f1_gn_19600101-19691231.nc
tos_Oday_ACCESS-CM2_historical_r1i1p1f1_gn_19700101-19791231.nc
tos_Oday_ACCESS-CM2_historical_r1i1p1f1_gn_19800101-19891231.nc
tos_Oday_ACCESS-CM2_historical_r1i1p1f1_gn_19900101-19991231.nc
tos_Oday_ACCESS-CM2_historical_r1i1p1f1_

We can see that daily time scale SST data is spread across 17 data files. To access those files, we just need to append the appropriate URL to the file names:

In [8]:
file_list=list(cat.datasets.keys())
DAProot='https://esgf.nci.org.au/thredds/dodsC/master/CMIP6/CMIP/CSIRO-ARCCSS/ACCESS-CM2/historical/r1i1p1f1/Oday/tos/gn/v20191108/'
accesscm2_tos_file_list = [ DAProot+f for f in file_list ]
print(accesscm2_tos_file_list)

['https://esgf.nci.org.au/thredds/dodsC/master/CMIP6/CMIP/CSIRO-ARCCSS/ACCESS-CM2/historical/r1i1p1f1/Oday/tos/gn/v20191108/tos_Oday_ACCESS-CM2_historical_r1i1p1f1_gn_18500101-18591231.nc', 'https://esgf.nci.org.au/thredds/dodsC/master/CMIP6/CMIP/CSIRO-ARCCSS/ACCESS-CM2/historical/r1i1p1f1/Oday/tos/gn/v20191108/tos_Oday_ACCESS-CM2_historical_r1i1p1f1_gn_18600101-18691231.nc', 'https://esgf.nci.org.au/thredds/dodsC/master/CMIP6/CMIP/CSIRO-ARCCSS/ACCESS-CM2/historical/r1i1p1f1/Oday/tos/gn/v20191108/tos_Oday_ACCESS-CM2_historical_r1i1p1f1_gn_18700101-18791231.nc', 'https://esgf.nci.org.au/thredds/dodsC/master/CMIP6/CMIP/CSIRO-ARCCSS/ACCESS-CM2/historical/r1i1p1f1/Oday/tos/gn/v20191108/tos_Oday_ACCESS-CM2_historical_r1i1p1f1_gn_18800101-18891231.nc', 'https://esgf.nci.org.au/thredds/dodsC/master/CMIP6/CMIP/CSIRO-ARCCSS/ACCESS-CM2/historical/r1i1p1f1/Oday/tos/gn/v20191108/tos_Oday_ACCESS-CM2_historical_r1i1p1f1_gn_18900101-18991231.nc', 'https://esgf.nci.org.au/thredds/dodsC/master/CMIP6/CM

Now that we've got our list of files, we can use `xarray` to open a "multifile" dataset as though it were a single file.

Recall that when we first open data in `xarray` it simply ("lazily") loads the metadata associated with the data and shows summary information about the contents of the dataset (i.e. it doesn't read the actual data into memory).
**This may take a little time for a large multifile dataset!**

In [9]:
import xarray as xr

dset = xr.open_mfdataset(accesscm2_tos_file_list, combine='by_coords', chunks={'time':'100MB'})
print(dset)

<xarray.Dataset>
Dimensions:             (bnds: 2, i: 360, j: 300, time: 60265, vertices: 4)
Coordinates:
  * time                (time) datetime64[ns] 1850-01-01T12:00:00 ... 2014-12...
  * j                   (j) int32 0 1 2 3 4 5 6 ... 293 294 295 296 297 298 299
  * i                   (i) int32 0 1 2 3 4 5 6 ... 353 354 355 356 357 358 359
    latitude            (j, i) float64 dask.array<chunksize=(300, 360), meta=np.ndarray>
    longitude           (j, i) float64 dask.array<chunksize=(300, 360), meta=np.ndarray>
Dimensions without coordinates: bnds, vertices
Data variables:
    time_bnds           (time, bnds) datetime64[ns] dask.array<chunksize=(3652, 2), meta=np.ndarray>
    vertices_latitude   (time, j, i, vertices) float64 dask.array<chunksize=(3652, 300, 360, 4), meta=np.ndarray>
    vertices_longitude  (time, j, i, vertices) float64 dask.array<chunksize=(3652, 300, 360, 4), meta=np.ndarray>
    tos                 (time, j, i) float32 dask.array<chunksize=(166, 300, 360), 

We can see that our `dset` object is an `xarray.Dataset`, but notice now that each variable has type `dask.array`, meaning that `xarray` is aware of the netCDF "chunks" (how the data is packed in the files). We'll be able to parallelise our data processing across these chunks if we need/want to. To keep the chunks relatively small (to avoid THREDDS download limits), when calling `open_mfdataset` we set a limit on the size of the chunks with `chunks={'time':'100MB'}`.

In this case, we are interested in the SST (`tos`) variable contained within that `xarray` Dataset:

In [10]:
print(dset['tos'])

<xarray.DataArray 'tos' (time: 60265, j: 300, i: 360)>
dask.array<concatenate, shape=(60265, 300, 360), dtype=float32, chunksize=(231, 300, 360), chunktype=numpy.ndarray>
Coordinates:
  * time       (time) datetime64[ns] 1850-01-01T12:00:00 ... 2014-12-31T12:00:00
  * j          (j) int32 0 1 2 3 4 5 6 7 8 ... 292 293 294 295 296 297 298 299
  * i          (i) int32 0 1 2 3 4 5 6 7 8 ... 352 353 354 355 356 357 358 359
    latitude   (j, i) float64 dask.array<chunksize=(300, 360), meta=np.ndarray>
    longitude  (j, i) float64 dask.array<chunksize=(300, 360), meta=np.ndarray>
Attributes:
    standard_name:  sea_surface_temperature
    long_name:      Sea Surface Temperature
    comment:        Temperature of upper boundary of the liquid ocean, includ...
    units:          degC
    cell_methods:   area: mean where sea time: mean
    cell_measures:  area: areacello
    history:        2019-11-08T18:23:54Z altered by CMOR: replaced missing va...
    _ChunkSizes:    [  1 300 360]


Notice that we now have an attribute `_ChunkSizes` listed. This has shape `[1 300 360]`, while the `dask.array` itself has shape (60265, 300, 360), and chunksize (231, 300, 360). This means that the underlying data is structured to be most efficiently accessed for the whole lat/lon range at each time step, but dask will load up 231 of these "slices" at once, for a combined dataset size of 60265 timesteps.

> **Changing chunks**
>
> If we decide to change the chunking to improve performance, note we
> can control the size of dask chunks used, but they *must* align
> with the netCDF file chunks or we will certainly make performance worse!
{: .callout}

Now that we understand the chunking information contained in the metadata, let's go ahead and calculate the dailiy maximum SST.

In [12]:
tos_max = dset['tos'].max('time', keep_attrs=True)
print(tos_max)

<xarray.DataArray 'tos' (j: 300, i: 360)>
dask.array<nanmax-aggregate, shape=(300, 360), dtype=float32, chunksize=(300, 360), chunktype=numpy.ndarray>
Coordinates:
  * j          (j) int32 0 1 2 3 4 5 6 7 8 ... 292 293 294 295 296 297 298 299
  * i          (i) int32 0 1 2 3 4 5 6 7 8 ... 352 353 354 355 356 357 358 359
    latitude   (j, i) float64 dask.array<chunksize=(300, 360), meta=np.ndarray>
    longitude  (j, i) float64 dask.array<chunksize=(300, 360), meta=np.ndarray>
Attributes:
    standard_name:  sea_surface_temperature
    long_name:      Sea Surface Temperature
    comment:        Temperature of upper boundary of the liquid ocean, includ...
    units:          degC
    cell_methods:   area: mean where sea time: mean
    cell_measures:  area: areacello
    history:        2019-11-08T18:23:54Z altered by CMOR: replaced missing va...
    _ChunkSizes:    [  1 300 360]


It seems like the calculation happened instataneously, but it's actually just another "lazy" feature of `xarray`. It's showing us what the output of the calculation would look like (i.e. a 300 by 360 array), but `xarray` won't actually do the computation until the data is actually needed (e.g. to create a plot or write to a netCDF file). 

To force `xarray` to do the computation we can use `.compute()` with the `%%time` jupyter notebook command to record how long it takes:

In [13]:
%%time
tos_max.compute()

  return func(*args, **kwargs)
  return func(*(_execute_task(a, cache) for a in args))


CPU times: user 2min 6s, sys: 2min 10s, total: 4min 17s
Wall time: 1h 12min 20s


It took a little over an hour to compute the daily maximum SST (this might be different on your laptop). As shown below, the data array (26GB) we are working with is larger than the memory available on our laptop (~17GB), so by processing the data one (231, 300, 36) chunk at a time we avoided causing a memory error. In other words, the calculation wouldn't have been possible without chunking.

In [11]:
dset['tos'].data

Unnamed: 0,Array,Chunk
Bytes,26.03 GB,99.79 MB
Shape,"(60265, 300, 360)","(231, 300, 360)"
Count,659 Tasks,321 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 26.03 GB 99.79 MB Shape (60265, 300, 360) (231, 300, 360) Count 659 Tasks 321 Chunks Type float32 numpy.ndarray",360  300  60265,

Unnamed: 0,Array,Chunk
Bytes,26.03 GB,99.79 MB
Shape,"(60265, 300, 360)","(231, 300, 360)"
Count,659 Tasks,321 Chunks
Type,float32,numpy.ndarray


Part of the reason the calculation took over an hour is that it was all done on one core. We can try and speed things up by using a `dask` "client" to run the calculation in parallel across multiple cores:

In [15]:
from dask.distributed import Client
client = Client()
client

0,1
Client  Scheduler: tcp://127.0.0.1:59152  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 4  Memory: 17.18 GB


(Click on the dashboard link to watch what's happening on each core.)

In [16]:
%%time
tos_max.compute()

CPU times: user 42.9 s, sys: 6.64 s, total: 49.5 s
Wall time: 24min 51s


FIXME: The values are all nan.

By distributing the calculation across all four cores the processing time has dropped to 24 minutes. This is close to but not quite a quarter of the original 1 hour and 12 minutes, because there's a time cost associated with setting up and coordinating jobs across all the cores.

> **Parallel isn't always best**
>
> For smaller or very complex data processing tasks,
> the time associated with setting up and coordinating jobs across multiple cores
> can mean it takes longer to run in parallel than on just one core.

In [20]:
## TODO: Create the plot of daily maximum SST

> **Direction matters**
>
> Calculating the daily maximum at each spatial (j, i) point requires taking information (i.e. the daily SST values) from each of the 321 chunks (because the the chunking was done along the time axis). Calculations like this that go across chunks are much slower than those long chunks. For instance, calculating the global maximum SST at each time step is very fast: 

In [19]:
%%time
spatial_max = dset['tos'].max(['j', 'i'], keep_attrs=True)

CPU times: user 6.54 ms, sys: 901 µs, total: 7.44 ms
Wall time: 8.57 ms


> It's worth being aware of the speed difference between along and across chunk calculations, but unfortunately we can't do much about it. The netCDF files are chunked by time, and changing the dask chunking to be inconsistent with the netCDF chunking (i.e. along a spatial axis) would make things even slower.

TODO: Talk about writing your own dask aware functions?