# Jupyter with large datasets

* [Reading Data](#Reading-data)
* [Dask](#Dask)
* [Processing 60 GB](#Processing-60-GB)
* [Saving Data](#Saving-Data)
* [Processing 3 TB](#Processing-3-TB)
* [Parallel Processing](#Parallel-Processing)

## Our objective

The target today is to calculate a daily wind magnitude field over Tasmania from ERA5 data

## Start a notebook

https://github.com/coecms/nci_scripts

* `gadi_jupyter` submits a job to the queue, and costs NCI resources. Try to stick to 4 or fewer cpus
* `vdi_jupyter.py` connects to the VDI desktop, free but can get congested

Both can access storage on /g/data

`gadi_jupyter` prints out some help on how to start up a Dask cluster when the notebook starts, we'll get back to that later.

To start with we need to find the data. There's a whole bunch of different datasets pre-downloaded at NCI, we want to use these to avoid filling up disk space unneccessarily. You can find information on these datasets on the CMS wiki http://climate-cms.wikis.unsw.edu.au (search google for 'climate cms wiki' if you don't remember the link)

## Reading Data

We'll be using the surface u and v wind fields, which the wiki says are under `/g/data/ub4/era5/netcdf/surface`.



In [None]:
ls /g/data/ub4/era5/netcdf/surface/10U/2019

These are netcdf format files, they include both arrays with the field's data as well as metadata about coordinates and where the data is from. There's a number of ways to read netcdf data, my favourite is xarray

Xarray lets you open multiple files at once to create a time series, using the function xarray.open_mfdataset. There are two ways to do this, specified with the 'combine' parameter. It can concatenate files in the order that their names are in, or it can open up the file and look at the coordinate values to sort the files.

For the most part published datasets will be well structured, with their files named using ISO timestamps. This means we can use the quicker 'nested' combining to merge the files together.

In [None]:
import xarray

ds = xarray.open_dataset('/g/data/ub4/era5/netcdf/surface/10U/2019/10U_era5_global_20190101_20190131.nc')
ds

In [None]:
ds = xarray.open_mfdataset('/g/data/ub4/era5/netcdf/surface/10U/2019/10U_era5_global_*.nc',
                           combine='nested', concat_dim='time')
ds

Note that in the second version the time axis covers the whole year.

ERA5 is a big dataset - let's check how big this field is now that we've loaded it

In [None]:
ds.u10.nbytes / 1024 ** 3 # Convert bytes to GB

This is larger than the available memory on VDI - 32 GB, so how is this possible?

Data in a Netcdf file is 'lazily loaded' - it only actually gets read when we read the values, either by printing, plotting or saving them to a new file

In [None]:
ds.u10.sel(latitude=147.3272, longitude=-42.8821, method='nearest').plot()

With xarray you can call `.load()` on a dataset to actually try and load the whole thing - most of the time you don't want to do this

In [None]:
# ds.u10.load()

## Dask

In addition to the files themselves being lazy there is a second layer at work as well, Dask. If we look at the data inside the `u10` variable we can see that it's made up of multiple 'chunks' - 12 in fact, one for each month's files

In [None]:
ds.u10.data

Dask arrays work just like numpy arrays, however rather than storing values directly, they store how to calculate the array's values

In [None]:
import dask.array

a = dask.array.zeros((10,10), chunks=(5,5))
a

Here the array `a` is broken up into four 5x5 chunks, and each chunk is created using the 'zeros' function

In [None]:
a.visualize()

Breaking the data into chunks helps for large datasets, as you only need to work on one chunk at a time, and can caluclate multiple chunks in parallel.

Otherwise you can do pretty much everything you can do with a Numpy array using a Dask array

In [None]:
a = dask.array.random.random((10,10), chunks=(5,5))
b = dask.array.random.random((10,10), chunks=(5,5))

a + b

As you work on a Dask array it will build up a graph of operations required to create the output, but it won't actually run any calculations

In [None]:
(a + b).visualize()

If you only need part of an array Dask will only use that part of the graph

In [None]:
(a + b)[1,6].visualize()

You can convert a Dask array to a Numpy array using the `.compute()` function (or `.load()` on a Xarray DataArray). Only the neccessary parts of the graph will be computed.

In [None]:
(a + b)[:,6].compute()

More complex operations will create a more complex graph, here's a matrix multiply

In [None]:
import numpy
numpy.matmul(a, b).visualize()

With large datasets there's a tradeoff between chunk size (bigger chunks > more memory used) and graph size (smaller chunks > more complex graph is slower to process)

## Processing 60 GB

It's not just Dask that breaks up arrays into chunks - this can be done within a NetCDF file as well

In [None]:
ds.u10.encoding

It's a good idea for Dask chunks to be a multiple of the NetCDF chunk size, but do experiment to see what works best for your use case.

You can specify Dask chunks when opening a NetCDF file using the `chunks` parameter

In [None]:
ds = xarray.open_mfdataset('/g/data/ub4/era5/netcdf/surface/10U/2019/10U_era5_global_*.nc',
                           combine='nested', concat_dim='time', chunks={'longitude':93*2,'latitude':91*2})
ds.u10.data

Just like with plain Dask arrays you can do normal Numpy-style operations on files you open with Xarray, and it will build up a graph of operations

In [None]:
u = ds.u10

ds_v = xarray.open_mfdataset('/g/data/ub4/era5/netcdf/surface/10V/2019/10V_era5_global_*.nc',
                           combine='nested', concat_dim='time', chunks={'longitude':93*2,'latitude':91*2})
v = ds_v.v10

wind = numpy.sqrt(u**2 + v**2)
wind.data

The ERA5 data is hourly, we want daily data for our output

In [None]:
wind.time

To convert the hourly ERA data to daily we can do a resample, getting the mean of each day's data

In [None]:
wind_daily = wind.resample(time='D').mean()
wind_daily

This creates a lot of chunks though - one for each day, so it doesn't work well on large datasets. Try to avoid big jumps in the number of tasks or chunks

In [None]:
wind_daily.data

The 'climtas' library I've been developing has some routines to improve chunking performance for resampling and grouping by day of year. They're less flexible than the standard Xarray functions, but handy to have for large datasets as they keep the original chunking

https://climtas.readthedocs.io/

In [None]:
import climtas.blocked

wind_daily = climtas.blocked.blocked_resample(wind, time=24).mean()
wind_daily.data

## Saving Data

Before saving to file check how big the data is - remember disk quota is a resource shared between all members of a project, so don't fill up space you don't need to.

It's also a good idea to make a quick plot to make sure you have the right area and the values are reasonable

In [None]:
import matplotlib.pyplot as plt
import cartopy.crs as ccrs

ax = plt.axes(projection=ccrs.PlateCarree())
ax.coastlines('10m')

wind_daily.sel(longitude=slice(144,149), latitude=slice(-40, -44)).isel(time=0).plot(ax=ax)

When saving your data try as much as possible to use compression, it can save a lot of space. This can be set up using the `encoding` parameter of `.to_netcdf`.

In [None]:
import dask.diagnostics

wind_daily.name = 'wind'

encoding = {
    'wind': { # Variable name
        'zlib': True, # Turn on compression
        'shuffle': True, # Turn on shuffle filter
        'complevel': 4, # Compression amount (0-9), 4 is a good choice
    }
}

# Show a progress bar - doesn't work with 'distributed' unfortunately
with dask.diagnostics.ProgressBar():
    (wind_daily
     .sel(longitude=slice(144,149), latitude=slice(-40, -44))
     .to_netcdf('/g/data/w35/saw562/analysis_example.nc', encoding=encoding))

You can see the chunking and compression of a file using the `-s` option of `ncdump`

In [None]:
! ncdump -hs /g/data/w35/saw562/analysis_example.nc

## Processing 3 TB

The same process works on the entire ERA5 timeseries. The full timeseries of a 2d file is 1.5 TB though, so it might take a while!

In [None]:
ds_u = xarray.open_mfdataset('/g/data/ub4/era5/netcdf/surface/10U/*/10U_era5_global_*.nc',
                           combine='nested', concat_dim='time', chunks={'longitude':93*2,'latitude':91*2})

ds_v = xarray.open_mfdataset('/g/data/ub4/era5/netcdf/surface/10V/*/10V_era5_global_*.nc',
                           combine='nested', concat_dim='time', chunks={'longitude':93*2,'latitude':91*2})

ds_u.u10.data

In [None]:
wind = numpy.sqrt(ds_u.u10**2 + ds_v.v10**2)

wind_daily = climtas.blocked.blocked_resample(wind, time=24).mean()
wind_daily.name = 'wind'

wind_daily.data

When processing a lot of data climtas' throttled save function can be helpful, it limits how much data Dask will read at once so there's less chance of running out of memory during the processing. It also automatically sets up NetCDF compression and chunking.

Before doing a calculation like this, consider if you need the whole dataset or just a subset, and get in touch with us at cws_help@nci.org.au to see if the output can be stored centrally

In [None]:
import climtas.io

# climtas.io.to_netcdf_throttled(wind_daily, '/g/data/w35/saw562/analysis_example.nc')

## Parallel Processing

To speed this up a bit we can try running in parallel on Gadi.

There is a limit to how parallel you can make this - writing data to the file can't happen in parallel, each process must wait for its turn. Also with a lot of processes Dask will spend time shuffling data between the processes.

Use the code that `gadi_jupyter` prints out to start a parallel Dask cluster with the number of CPUs requested by your job - it can't use more than one node's worth of CPUs (so keep the number of cpus under 48)

It's important to set the `memory_limit` and `local_directory` options, so you don't run out of memory and don't fill up your home directory

```python
import os
import dask.distributed

# Edit as desired
threads_per_worker = 1

try:
    c # Already running
except NameError:
    c = dask.distributed.Client(
        n_workers=int(os.environ['PBS_NCPUS'])//threads_per_worker,
        threads_per_worker=threads_per_worker,
        memory_limit=f'{3.9*threads_per_worker}gb',
        local_directory=os.path.join(os.environ['PBS_JOBFS'],
                                     'dask-worker-space')
    )
c
```

In [None]:
import xarray
import numpy
import climtas

ds_u = xarray.open_mfdataset('/g/data/ub4/era5/netcdf/surface/10U/*/10U_era5_global_*.nc',
                           combine='nested', concat_dim='time', chunks={'longitude':93*2,'latitude':91*2})

ds_v = xarray.open_mfdataset('/g/data/ub4/era5/netcdf/surface/10V/*/10V_era5_global_*.nc',
                           combine='nested', concat_dim='time', chunks={'longitude':93*2,'latitude':91*2})

wind = numpy.sqrt(ds_u.u10**2 + ds_v.v10**2)

wind_daily = climtas.blocked.blocked_resample(wind, time=24).mean()
wind_daily.name = 'wind'

#for year, data in wind_daily.groupby('time.year'):
#    climtas.io.to_netcdf_throttled(data, f'/g/data/w35/saw562/analysis_example_{year}.nc')

## Bonus

Climtas also has an optimised function for generating climatologies that might be handy

In [None]:
climtas.blocked.blocked_groupby(wind_daily.sel(time=slice('1980','2019')), time='monthday').percentile(q=90)

In [None]:
climtas.blocked.blocked_groupby(wind_daily.sel(time=slice('1980','2019')), time='dayofyear').percentile(q=90)