# Daily Aggregations using Xarray
*(author: Grant Buster)*

Requirements:

- Install dask-distributed: `pip install distributed --upgrade`
- Initialize a dask client for parallel processing
- Set dask compute chunks appropriately


In this notebook, we demonstrate how you can use ``xarray`` to quickly and easily compute daily statistics for a year of sup3rCC data. (If it seems strange to you that we're aggregating a dataset intended to improve resolution - don't worry, we think it's a bit strange too).


### Helpful tips

- Performance is really sensitive (perhaps unsurprisingly) to the dask compute chunk size you specify. The h5 chunks on disk are way too small and result in way too many operations, adding too much overhead to performance. You can get much worse performance with xarray+dask vs. a serial read when working on a small set (~1e3) locations if your compute chunks are too small. Here, we find that `(8760, 10000)` is a good compute chunk size.

- Too large of compute chunks will violate the requested Dask memory limit. If you are memory constrained, try setting a memory limit and reduce the compute chunk size.

- Setting up the full aggregate dataset and then doing one `.compute()` call resulted in chaos and memory errors. The `Client` memory limit doesn't seem to do anything

In [None]:
import numpy as np
import pandas as pd
import xarray as xr
from dask.distributed import Client

In [None]:
# Running on HPC node, so can use 100 processes and high memory limit
client = Client(n_workers=100, memory_limit='100GB')
client

In [None]:
%%time

year = 2015
scenario = 'ecearth3cc_ssp245_r1i1p1f1'

fp_base = '/datasets/sup3rcc/conus_{scenario}/v0.2.2_beta/sup3rcc_conus_{scenario}_{group}_{year}.h5'
fp_pr = fp_base.replace('v0.2.2_beta', 'v0.2.2_beta/daily')

kwargs = dict(engine="rex", chunks={'time_index': 8784, 'gid': 100000})
xds_trh = xr.open_mfdataset(fp_base.format(scenario=scenario, group='trh', year=year), **kwargs)
xds_wind = xr.open_mfdataset(fp_base.format(scenario=scenario, group='wind', year=year), **kwargs)
xds_pr = xr.open_mfdataset(fp_pr.format(scenario=scenario, group='pr', year=year), **kwargs)

In [None]:
%%time
da = xds_trh['temperature_2m'].groupby("time.date").max("time").astype(np.float32).compute()
da['date'] = pd.to_datetime(da['date'].values).astype(int)
da = da.rename({'date': 'time'})

ds_out = da.to_dataset()

In [None]:
%%time
da = xds_trh['relativehumidity_2m'].groupby("time.date").min("time").astype(np.float32).compute()
da['date'] = pd.to_datetime(da['date'].values).astype(int)
da = da.rename({'date': 'time'})
ds_out['relativehumidity_2m'] = da

In [None]:
%%time
da = xds_wind['windspeed_10m'].groupby("time.date").mean("time").astype(np.float32).compute()
da['date'] = pd.to_datetime(da['date'].values).astype(int)
da = da.rename({'date': 'time'})
ds_out['windspeed_10m'] = da

In [None]:
%%time
xds_pr['time'] = pd.to_datetime(xds_pr['time'].values - pd.Timedelta('12hr')).astype(int)
ds_out['pr'] = xds_pr['pr'].compute()

In [None]:
%%time
ds_out['temperature_2m'].attrs['aggregation'] = 'Daily maximum'
ds_out['relativehumidity_2m'].attrs['aggregation'] = 'Daily minimum'
ds_out['windspeed_10m'].attrs['aggregation'] = 'Daily average'
ds_out['pr'].attrs['aggregation'] = 'Daily average'

encoding = {"temperature_2m": {"dtype": "int16", "scale_factor": 0.01, '_FillValue': 100},
            "windspeed_10m": {"dtype": "int16", "scale_factor": 0.01, '_FillValue': 120},
            "relativehumidity_2m": {"dtype": "uint16", "scale_factor": 0.01, '_FillValue': 101},
           }

ds_out = ds_out.drop_vars('time_index', errors='ignore')
ds_out.to_netcdf(f'sup3rcc_test_daily_{scenario}_{year}.nc', format='NETCDF4', engine="h5netcdf", encoding=encoding)

In [None]:
ds_out