This notebook processes:
- CaMa-Flood outputs into a single zarr/netCDF store containing daily water levels (per ensemble member and experiment) and discharge (per ensemble member) at 3979 river mouth locations globally.

In [None]:
import xarray as xr
from os.path import join
from datetime import timedelta, datetime
import pandas as pd
import numpy as np

In [None]:
# import dask
# # dask.config.set({'distributed.worker.multiprocessing-method': 'fork'})
# from dask.distributed import Client, progress
# client = Client()
# client

In [None]:
# Read the river-mouth attribute table and the CaMa-Flood <-> GTSM coupling table.

ddir = r'/scratch/compound'
fn_attrs = join(ddir, 'coupling', 'rivmth+gtsm_attrs.csv')
fn_coupling = join(ddir, 'coupling', r'cmf_gtsm_75km.csv')

# analysis period; the last simulated day is disregarded because it has some weird values
t0 = datetime(1980, 1, 1)
t1 = datetime(2014, 12, 31)

# minimal long-term mean discharge [m3/s] a river mouth must exceed to be kept
qmin = 0

attrs = pd.read_csv(fn_attrs, index_col=0)
coupling = pd.read_csv(fn_coupling, index_col=0)

# report the number of river mouths before and after the discharge filter
print(len(attrs))
has_min_discharge = attrs['q_mean'] > qmin
attrs = attrs.loc[has_min_discharge]
print(len(attrs))
index = attrs.index.values

# coupled indices, in the row order of the coupling table
rivmth_idx = coupling.rivmth_idx.values
gtsm_idx = coupling.gtsm_idx.values
coupling.head()

In [None]:
import glob

# Sanity check: print every CaMa-Flood output file whose time axis does not end
# in 2015, together with its last time stamp.
# NOTE(review): the expected final year is 2015 rather than 2014, presumably
# because CaMa-Flood stamps each daily output on the next day at 00:00 - confirm.
fns = sorted(glob.glob(join(ddir, 'cmf', 'rivmth_*_mswep_*_v362_1980-2014*.nc')))
for fn in fns:
    # open every file only once and release the file handle when done
    with xr.open_dataset(fn) as ds:
        if ds.time.to_index()[-1].year != 2015:
            print(fn)
            print(ds.time.data[-1])

In [None]:


# fn_gtsm = join(ddir, 'gtsm', r'global_model_waterlevel_clim_1980_select_egm.nc')
# fn_gtsm = join(ddir, 'gtsm', r'global_model_surgelevel_1983_select.nc')

# ds_gtsm =  xr.open_dataset(fn_gtsm)
# np.all(ds_gtsm['gtsm_idx'] == coupling.gtsm_idx)

In [None]:
# check station coupling. remove invalid stations
# istation = np.where([ds_gtsm['gtsm_idx'].values != coupling.gtsm_idx.values])[0]
# ds_gtsm.isel(stations=istation).gtsm_idx

In [None]:
# Compare the station order of one GTSM file against the coupling table.
# Rows whose gtsm_idx disagrees are written to a separate "_error" csv and
# removed from `coupling`.
# NOTE: this cell overwrites `coupling`; re-run the loading cell before running it again.
fn_gtsm = join(ddir, 'gtsm', r'global_model_waterlevel_1980_select_egm.nc')
ds_gtsm =  xr.open_dataset(fn_gtsm)

# element-wise mismatch between file and table (evaluated once, reused below)
mismatch = ds_gtsm['gtsm_idx'].values != coupling.gtsm_idx.values
coupling_error = coupling[mismatch]
coupling = coupling[~mismatch]

fn_coupling_error = fn_coupling.replace('.csv','_error.csv')
coupling_error.to_csv(fn_coupling_error)
coupling_error.head()

In [None]:
# Build the sea level dataset (instantaneous level + climatology) at the coupled
# GTSM stations and write it to netcdf.

# GTSM names -> names used in the combined dataset
rm = dict(
    station_y_coordinate='gtsm_lat',
    station_x_coordinate='gtsm_lon',
    station_id='gtsm_station_id',
    stations='index',
)
drop = ['station_name']

# open all yearly files at once, then harmonize names and subset period and stations
fn_gtsm = join(ddir, '../gtsm', 'gtsm_*_select_day.nc')
ds_gtsm = xr.open_mfdataset(fn_gtsm, combine='by_coords')
ds_gtsm = ds_gtsm.rename(rm).drop(drop).sortby('time')
ds_gtsm = ds_gtsm.sel(time=slice(t0,t1), index=coupling.index.values)
# alternative: open the files year by year and merge
# ds = []
# for yr in range(1980, 2015):
#     fn_gtsm = join(ddir, '../gtsm', f'gtsm_{yr}_select_day.nc')
#     ds.append(xr.open_dataset(fn_gtsm, chunks={'time':100, 'stations': 1000}).isel(time=slice(0,-1)))
# ds_gtsm = xr.merge(ds).rename(rm).drop(drop).sortby('time').sel(
#     time=slice(t0,t1), index=coupling.index.values).chunk({'time':100, 'stations': 1000})

# make sure the stations align with the coupling table (ids compared as fixed-width strings)
station_ids_gtsm = ds_gtsm.gtsm_station_id.values.astype('<U20')
station_ids_coupling = coupling.gtsm_station_id.values.astype('<U20')
assert np.all(station_ids_gtsm == station_ids_coupling)

# pick and rename the two water level variables
wl_seasonal = ds_gtsm['wl_clim_inst'].rename('sea_water_level_climatology')
wl = ds_gtsm['wl_inst'].rename('sea_water_level')

# cast to float32 first, so the index variable added afterwards keeps its own dtype
ds_sealvl = xr.merge([wl_seasonal, wl]).astype(np.float32)
ds_sealvl['gtsm_idx'] = xr.DataArray(coupling.gtsm_idx.values, dims=['index'])
ds_sealvl = ds_sealvl.transpose('index', 'time')
ds_sealvl.to_netcdf(join(ddir, 'sealvl_inst.nc'))

## combine CaMa-Flood data

In [None]:

# Settings for combining the CaMa-Flood river mouth outputs.
# {m} = runoff model (ensemble member), {t} = experiment (scenario)
fn_format = join(ddir, 'cmf', r'rivmth_{m}_mswep_{t}_v362_1980-2014.nc')
model = ['anu', 'cnrs', 'ecmwf', 'nerc', 'jrc'] #, 'univu', 'univk']
scenarios = ['msl', 'runoff', 'surge', 'cmpnd']

# alternative settings: combine the dstmth sensitivity runs
# fn_out = join(out_dir, 'global_compound_rivmth_reanalysis_sensitivity.zarr')
# fn_format = join(ddir, r'rivmth_anu_mswep_{t}_v362_1980-2014{m}.nc')
# model = ['_dstmth8000', '', '_dstmth12000']
# dstmth = np.array([8000, 10000, 12000])
# out_chunks = {'time': -1, 'scen':-1, 'dstmth':-1, 'rivmth_id':100}

# analysis period; the last simulated day is disregarded because it has some weird values
t0 = datetime(1980, 1, 1)
t1 = datetime(2014, 12, 31)
# rename of the CaMa-Flood station dimension
rm_coords = dict(id='rivmth_idx')
# read chunks: full time series for 100 stations at a time
chunks = dict(time=-1, id=100)

# The commented code below documents how cmf.zarr was produced from the raw outputs.
# # combine cmf outputs
# ds_m = []
# for m in model:
#     ds_t = []
#     for t in scenarios:
#         ds_t.append(xr.open_dataset(fn_format.format(m=m, t=t), chunks=chunks)[['sfcelv', 'outflw']].sortby('time'))
#     ds_t = xr.concat(ds_t, dim='scen')
#     ds_t['scen'] = xr.Variable(['scen'], np.asarray(scenarios).astype(str))
#     ds_m.append(ds_t)
# ds_cmf = xr.concat(ds_m, dim='ensemble').transpose('scen', 'ensemble', 'id', 'time')
# ds_cmf['ensemble'] = xr.Variable(['ensemble'], np.asarray(model).astype(str))
# ds_cmf['time'] = ds_cmf.time.to_index() + timedelta(days=-1)
# ds_cmf = ds_cmf.rename(**rm_coords).sel(rivmth_idx=coupling.rivmth_idx.values).drop(['lat_nc', 'lon_nc'])
# ds_cmf['index'] = xr.DataArray(dims=['rivmth_idx'], data=coupling.index.values, name='index')
# ds_cmf = ds_cmf.swap_dims({'rivmth_idx': 'index'})
# # ds_cmf
# fn_out = join(ddir, 'cmf.zarr')
# chunks = {'ensemble':-1, 'scen':-1, 'time': -1, 'index':100}
# ds_cmf.chunk(chunks).to_zarr(fn_out, mode='w')

In [None]:
# Open the combined CaMa-Flood zarr store and display it.
fn_cmf = join(ddir, 'cmf.zarr')
ds_cmf = xr.open_zarr(fn_cmf)
ds_cmf

In [None]:
# River water level and discharge of the 'msl' experiment plus their monthly
# climatology. The result is read back from a previously written netcdf file;
# the commented code documents how that file was produced.
fn_month_out = join(ddir, 'cmf', 'rivmth_msl.nc')
# rm_clim = {
#     'sfcelv': 'river_water_level_climatology', 
#     'outflw': 'river_discharge_climatology',
# }
# rm = {
#     'sfcelv': 'river_water_level', 
#     'outflw': 'river_discharge',
# }
# ds_msl = ds_cmf.sel(scen='msl').drop(['scen'])
# # climatology
# ds_msl_clim = ds_msl.groupby('time.month').mean('time')
# # broadcast to middle of month
# ds_msl_month = xr.zeros_like(ds_msl).resample(time='SM').nearest()
# ds_msl_month['time'] = ds_msl_month.time.to_index() + timedelta(days=15)
# for v in ds_msl.data_vars.keys():
#     ds_msl_month[v].data = ds_msl_month[v].groupby('time.month') + ds_msl_clim[v]
# ds_msl_month = ds_msl_month.chunk({'time': -1, 'index': 100})
# ds_clim_day = ds_msl_month.interp(time=ds_msl.time, method='linear')
# ds_riv = xr.merge([
#     ds_clim_day.rename(rm_clim),
#     ds_msl.rename(rm),
# ])

# encoding = {v: {'zlib': True} for v in ds_riv.data_vars.keys()}
# ds_riv.to_netcdf(fn_month_out, encoding=encoding)

# read the stored result and cast all data variables to single precision
ds_riv = xr.open_dataset(fn_month_out)
ds_riv = ds_riv.astype(np.float32)
ds_riv

In [None]:
# Water level at the river mouth for the three driver experiments
# (the 'msl' experiment is excluded from this selection).
scenarios = ['runoff', 'surge', 'cmpnd']
rm = dict(sfcelv='water_level')

ds_rivmth = ds_cmf[['sfcelv']]
ds_rivmth = ds_rivmth.sel(scen=scenarios).rename(rm)
ds_rivmth

In [None]:
# Merge the river (MSL boundary run), sea level and river mouth water level
# datasets into one output dataset, attach metadata and write it to zarr.
scenarios = ['runoff', 'surge', 'cmpnd']

ds_out = xr.merge([
    ds_riv,
    ds_sealvl,
    ds_rivmth,
]).sel(time=slice(t0,t1))

# The per-station columns of `coupling` are attached by position further down,
# so the merged station index must match the coupling table row for row.
assert np.array_equal(ds_out['index'].values, coupling.index.values), \
    'station index of the merged dataset does not align with the coupling table'

# coordinate metadata: either a multi-model ensemble or a dstmth sensitivity run
if 'ensemble' in ds_out.coords:
    ds_out['ensemble'].attrs.update(
        long_name = "multi model ensemble",
        description = "based on runoff forcing from Earth2Observe WRR2 hydrological models"
    )
elif 'dstmth' in ds_out.coords:
    ds_out['dstmth'].attrs.update(
        long_name = "downstream_boundary_distance",
        units = 'm'
    )
# scenarios
ds_out['scen'].attrs.update(
    long_name = "scenarios",
    description = "representing different flood drivers"
)

# set cf conventions on variables
# NOTE(review): grid_mapping refers to a variable named "crs", but no such
# variable is created in this cell - confirm it exists or add it.
ds_out['gtsm_idx'].attrs = dict(
    long_name = "GTSM station index of nearest station",
    units = "-",
    grid_mapping = "crs"
)

# per-station information from the coupling table (positional, see assert above)
ds_out['rivmth_to_station_distance'] = xr.Variable(['index'], coupling['dist'].values)
ds_out['rivmth_to_station_distance'].attrs.update(
    long_name = 'horizontal distance from CaMa-Flood river mouth to the coupled GTSM station',
    units = 'm'
)
ds_out['rivmth_lat'] = xr.Variable(['index'], coupling['rivmth_lat'].values)
ds_out['rivmth_lon'] = xr.Variable(['index'], coupling['rivmth_lon'].values)

# discharge variables
qattrs = dict(
    standard_name = 'water_volume_transport_into_sea_water_from_rivers',
    units = 'm3 s-1',
    grid_mapping = "crs"
)

ds_out['river_discharge'].attrs.update(
    long_name = 'instantaneous discharge at GMT 00:00 daily',
    **qattrs
)
ds_out['river_discharge_climatology'] = ds_out['river_discharge_climatology'].astype(np.float32)
ds_out['river_discharge_climatology'].attrs.update(
    long_name = 'monthly discharge climatology', 
    **qattrs
)

# water level variables
wlattrs = dict(
    standard_name = 'sea_water_surface_height_above_geoid',
    units = 'm+egm96',
    grid_mapping = "crs"
)
ds_out['water_level'].attrs.update(
    long_name = 'instantaneous surface elevation at river mouth at GMT 00:00 daily',
    **wlattrs
)
ds_out['river_water_level'].attrs.update(
    long_name = 'instantaneous surface elevation at river mouth at GMT 00:00 daily assuming MSL boundary at 0m+egm96',
    **wlattrs
)
ds_out['river_water_level_climatology'].attrs.update(
    long_name = 'monthly river water level climatology',
    **wlattrs
)
# NOTE(review): the long_name below says "daily maximum", while the variable is
# read from 'wl_inst' in the GTSM files - confirm which of the two is correct.
ds_out['sea_water_level'] = ds_out['sea_water_level'].astype(np.float32)
ds_out['sea_water_level'].attrs.update(
    long_name = 'daily maximum nearshore still sea surface elevation',
    **wlattrs
)
ds_out['sea_water_level_climatology'] = ds_out['sea_water_level_climatology'].astype(np.float32)
ds_out['sea_water_level_climatology'].attrs.update(
    long_name = 'monthly sea water level climatology',
    **wlattrs
)

# general attributes ("Conventions" is the case-sensitive attribute name used by CF)
ds_out.attrs = dict(
    title = 'global compound flood reanalysis data',
    source = 'CaMa-Flood compound surge - discharge experiments',
    institution = 'Institute for Environmental Studies (IVM) - Vrije Universiteit Amsterdam',
    author = 'Dirk Eilander (dirk.eilander@vu.nl)',
    Conventions = "CF-1.7",
    date_created = str(datetime.now().date()),
    history = 'created using xarray v' + xr.__version__,
)

# save: full ensemble/scenario/time series per chunk of 100 stations
fn_out = join(ddir, 'gcfr.zarr')
chunks = {'ensemble':-1, 'scen':-1, 'time': -1, 'index':100}
ds_out.chunk(chunks).to_zarr(fn_out, mode='w')
# ds_cmf.chunk(out_chunks).to_netcdf(fn_out.replace('.zarr', '.nc'), mode='w')
ds_out

In [None]:
# Re-open the gcfr.zarr store from disk and display it.
fn_out = join(ddir, 'gcfr.zarr')
ds_out = xr.open_zarr(fn_out)
ds_out