# ResOpsBR: time series
***

**Author:** Chus Casado<br>
**Date:** 18-07-2025<br>

**Introduction:**<br>
This code creates the time series for the reservoirs in ResOpsBR. The time series include records from ANA and simulations from GloFAS. For each reservoir, these time series are exported in both CSV and NetCDF formats.

Records are cleaned to avoid errors:
    * Outliers in the **storage** time series are filtered by comparison with a moving median (7-day window). If the relative difference between a given storage value and the moving median exceeds a threshold, the value is removed. This procedure is encapsulated in the function `lisfloodreservoirs.utils.timeseries.clean_storage()`.
    * Outliers in the **inflow** time series are removed using two conditions: one based on the gradient, and the other based on an inflow estimated from the water balance. A value is removed only when both conditions are met. Since inflow time series cannot contain missing values when used in the reservoir simulation, a simple linear interpolation is used to fill in gaps of up to 7 days. This procedure is encapsulated in the function `lisfloodreservoirs.utils.timeseries.clean_inflow()`.

**To do:**<br>
* [x] Plot time series
* [x] Make sure that there aren't negative values in the time series, nor zeros in storage.
* [ ] Check the quality of the data by closing the mass balance when possible. <font color='steelblue'>I've used the mass balance to identify errors in the inflow time series (function `clean_inflow`).</font>
* [ ] Fill in the inflow time series with the mass balance, if possible. <font color='steelblue'>I've filled in gaps of up to 7 days in the inflow time series with linear interpolation (function `clean_inflow`).</font>

In [1]:
import numpy as np
import pandas as pd
import xarray as xr
# from datetime import datetime, timedelta
from tqdm.auto import tqdm
# from copy import deepcopy

from lisfloodreservoirs.utils import DatasetConfig
from lisfloodreservoirs import read_attributes
from lisfloodreservoirs.utils.plots import plot_resops, reservoir_analysis, compare_flows
from lisfloodreservoirs.utils.timeseries import clean_storage, clean_inflow, time_encoding, fit_reservoir_curve, storage_from_elevation, elevation_from_storage
from lisfloodreservoirs.utils.timezone import convert_to_utc, reindex_to_00utc

from utils_br import plot_timeseries_BR

## Configuration

In [2]:
# Load the dataset configuration from its YAML file; the resulting `cfg`
# object provides the paths and options used by the rest of the notebook.
config_file = 'config_ResOpsBR_v11.yml'
cfg = DatasetConfig(config_file)
print(f'Time series will be saved in {cfg.PATH_TS}')

Time series will be saved in Z:\nahaUsers\casadje\datasets\reservoirs\ResOpsBR\v1.1\time_series


## Data

### Attributes


In [3]:
# Read the reservoir attribute tables, indexed by the GDW identifier
attributes = read_attributes(cfg.PATH_ATTRS, index_col='GDW_ID')

# Lookup table from SAR_ID (the identifier used in the observed file names)
# to GDW_ID (the index of `attributes`)
map_ana_gdw = dict(zip(attributes['SAR_ID'], attributes.index))

print(f'{len(attributes)} reservoirs in the attribute tables')

143 reservoirs in the attribute tables


### Time series
#### Observed: ANA

In [45]:
# Read, clean, convert to UTC and plot the observed time series of every
# reservoir. The processed tables are collected in `resops_ts`, keyed by
# GDW_ID, and the start/end of each series is written to `attributes`.
path_plots = cfg.PATH_TS / 'plots'
path_plots.mkdir(parents=True, exist_ok=True)

# columns kept from the raw files (keys) and the names they get here (values)
rename_cols = {
    'volume_mcm': 'storage',
    'level_m': 'elevation',
    'inflow_cms': 'inflow',
    'outflow_cms': 'outflow',
    'outflow_spillway_cms': 'spillway',
    'outflow_turbine_cms': 'turbine',
    'outflow_natural_cms': 'natural',
}
# variables used to define the period with observations
core_vars = ['storage', 'elevation', 'inflow', 'outflow']

# read time series
resops_ts = {}
for sar_id, gdw_id in tqdm(map_ana_gdw.items()):

    # load timeseries
    file = cfg.PATH_OBS_TS / f'{sar_id}.csv'
    if not file.is_file():
        print(f"{file} doesn't exist")
        continue
    series = pd.read_csv(file, parse_dates=['date'], index_col='date')
    # storage comes as a percentage; scale it with the capacity in the attributes
    series['volume_mcm'] = series.volume_pct / 100 * attributes.loc[gdw_id, 'CAP_MCM']

    # trim to GloFAS long run period
    series = series.loc[cfg.START:cfg.END, :]
    if series.empty:
        print(f'Reservoir {gdw_id} has no observations in the time period from {cfg.START} to {cfg.END}')
        continue
    # ensure there aren't gaps in the dates
    series = series.asfreq('D')
    series.index.name = 'date'

    # rename columns and keep only the renamed ones
    # (a list is used for the selection instead of a `dict_values` view)
    series = series.rename(columns=rename_cols)
    series = series[list(rename_cols.values())]

    # remove negative values
    series[series < 0] = np.nan
    # clean storage time series
    series['storage'] = clean_storage(series['storage'], w=7, error_thr=.1)
    # clean inflow time series
    # NOTE(review): the keyword `outlfow` (sic) is kept exactly as in the
    # original call; confirm it matches the signature of `clean_inflow`
    series['inflow'] = clean_inflow(
        series['inflow'],
        storage=series['storage'],
        outlfow=series['outflow'],
        grad_thr=1e4,
        balance_thr=5,
        int_method='linear'
    )

    # # trim time series to period with inflow, storage and outflow
    # mask_availability = series[['inflow', 'storage', 'outflow']].notnull().all(axis=1)
    # if mask_availability.sum() == 0:
    #     continue
    # start, end = series[mask_availability].first_valid_index(), series[mask_availability].last_valid_index()

    # trim to the period in which at least one of the core variables is available
    start = series[core_vars].first_valid_index()
    end = series[core_vars].last_valid_index()
    if start is None or end is None:
        # `first_valid_index`/`last_valid_index` return None when every value
        # is missing; skip the reservoir instead of failing in `max`/`min`
        print(f'Reservoir {gdw_id} has no valid observations after cleaning')
        continue
    start, end = max(cfg.START, start), min(cfg.END, end)
    attributes.loc[gdw_id, ['TIME_SERIES_START', 'TIME_SERIES_END']] = start, end
    series = series.loc[start:end]

    try:
        # convert time series to UTC (with offset)
        series = convert_to_utc(
            lon=attributes.loc[gdw_id, 'LON'],
            lat=attributes.loc[gdw_id, 'LAT'],
            series=series
        )
        # interpolate values to 00 UTC
        series = reindex_to_00utc(series)
    except Exception as e:
        # best effort: report the failure and carry on with the next reservoir
        print(f"Failed to convert to UTC the time series for GDW_ID {gdw_id}: {e}")
        continue

    # save in dictionary, with the index rebuilt from the calendar dates
    series.index = pd.DatetimeIndex(series.index.date, name='date')
    resops_ts[gdw_id] = series

    # plot observed time series
    plot_resops(
        series.storage,
        series.elevation,
        series.inflow,
        series.outflow,
        attributes.loc[gdw_id, ['CAP_MCM', 'SAR_VO_MAX']],
        title=gdw_id,
        save=path_plots / f'{gdw_id:04}_lineplot.jpg'
        )

print(f'{len(resops_ts)} reservoirs in ResOpsBR time series')

  0%|          | 0/143 [00:00<?, ?it/s]

Z:\nahaUsers\casadje\datasets\reservoirs\ResOpsBR\raw\time_series\SIN\19164.csv doesn't exist
142 reservoirs in ResOpsBR time series


##### **Plot timeseries**

In [6]:
# PATH_PLOTS = cfg.PATH_TS / 'plots'
# PATH_PLOTS.mkdir(exist_ok=True)

# for gdw_id, ts in tqdm(resops_ts.items()):
#     max_storage = {
#         'GDW': attributes.loc[gdw_id, 'CAP_MCM'],
#         # 'BR': 
#     }
#     max_elevation = {
#         'GDW': attributes.loc[gdw_id, 'ELEV_MASL'],
#         # 'BR': 
#     }
#     title = '{0} - {1}'.format(gdw_id, attributes.loc[gdw_id, 'DAM_NAME'])
#     plot_timeseries_BR(
#         ts.storage,
#         ts.elevation,
#         ts.outflow,
#         ts.inflow,
#         max_storage,
#         max_elevation,
#         # zlim=(attributes.loc[gdw_id, 'NAME_MASL'] - attributes.loc[gdw_id, 'DAM_HGT_M'] * 1.2, None),
#         title=title,
#         save=PATH_PLOTS / f'{gdw_id}.jpg'
#     )

  0%|          | 0/142 [00:00<?, ?it/s]

In [6]:
# gdw_id = 1363 #1349 #1347 #1333
# ts = timeseries[gdw_id]

# plot_resops(ts.storage, ts.elevation, outflow=ts.outflow,
#             capacity=attributes.loc[gdw_id, ['NAME_MCM', 'NAMO_MCM']],
#             level=attributes.loc[gdw_id, ['NAME_MASL', 'NAMO_MASL']])

# plot_resops(ts.storage, ts.area, outflow=ts.outflow,
#             capacity=attributes.loc[gdw_id, ['NAME_MCM', 'NAMO_MCM']],
#             # level=attributes.loc[gdw_id, ['NAME_MASL', 'NAMO_MASL']]
#            )

In [46]:
# Turn every per-reservoir table into an xarray.Dataset tagged with its
# GDW_ID, then stack them all along a new 'GDW_ID' dimension
xarray_list = [
    xr.Dataset.from_dataframe(table).assign_coords(GDW_ID=reservoir_id)
    for reservoir_id, table in resops_ts.items()
]
obs = xr.concat(xarray_list, dim='GDW_ID')

#### Simulated: GloFAS

In [64]:
# # import GloFAS simulation
# sim = xr.open_dataset(cfg.PATH_SIM_TS / 'dis.nc')
# sim = sim.rename({'time': 'date', 'id': 'GDW_ID', 'dis': 'inflow'})

# # bias correct
# for gdw_id in sim.GDW_ID.data:
    
#     if gdw_id not in timeseries:
#         continue
        
#     inflow = sim['inflow'].sel(GDW_ID=gdw_id).to_pandas()
#     inflow.name = 'inflow'
#     ts = timeseries[gdw_id]
    
#     # compute net inflow
#     if ('outflow' in ts.columns) & ('storage' in ts.columns):
#         ΔS = ts.storage.diff().values
#         net_inflow = ΔS * 1e6 / (24 * 3600) + ts.outflow
#         net_inflow[net_inflow < 0] = 0
#         net_inflow.name = 'net_inflow'

#     # bias correct simulated inflow
#     inflow_bc = quantile_mapping(obs=net_inflow,
#                                  sim=inflow)
#     inflow_bc.name = 'inflow_bc'
    
#     # # plot raw vs bias-corrected inflow
#     # compare_flows(ts.storage, ts.outflow, inflow, inflow_bc)
    
#     # overwrite bias-corrected inflow
#     sim['inflow'].loc[{'GDW_ID': gdw_id}] = inflow_bc.values

#### Meteorology: areal

Time series of catchment-average meteorology generated with the LISFLOOD utility `catchstats`.

In [33]:
# Directory with the catchment-average meteorology (one subfolder per variable)
path_meteo_areal = cfg.PATH_RESOPS / 'ancillary' / 'catchstats' / 'meteo'

# names found in the source files (keys) and names used in the dataset (values)
rename_vars = dict(
    id='GDW_ID',
    time='date',
    e0='evapo_areal',
    tp='precip_areal',
    ta='temp_areal',
)

# subfolders whose name is one of the keys above
variables = [
    folder.stem
    for folder in path_meteo_areal.iterdir()
    if folder.is_dir() and folder.stem in rename_vars
]

# open all the NetCDF files of each variable and keep its '<variable>_mean' field
meteo_areal = xr.Dataset({
    var: xr.open_mfdataset(f'{path_meteo_areal}/{var}/*.nc')[f'{var}_mean']
    for var in variables
})

# rename variables and coordinates
meteo_areal = meteo_areal.rename(rename_vars)

# shift the dates one day back and keep only the configured period
one_day = pd.Timedelta(days=1)
meteo_areal['date'] = meteo_areal['date'] - one_day # WARNING!! One day lag compared with LISFLOOD
meteo_areal = meteo_areal.sel(date=slice(cfg.START, cfg.END))

# keep the catchments present in the attributes and load the data into memory
IDs = list(attributes.index.intersection(meteo_areal.GDW_ID.data))
meteo_areal = meteo_areal.sel(GDW_ID=IDs).compute()

# # define attributes
# meteo_units = 'evapo_areal: catchment-average potential evaporation from open water from ERA5 [mm/d]\n' \
#     'precip_areal: catchment-average precipitation from ERA5 [mm/d]\n' \
#     'temp_areal: catchment-average air temperature from ERA5 [°C]\n'
# meteo_areal.attrs['Units'] = meteo_units
# meteo_areal.time.attrs['timezone'] = 'UTC+00'
# meteo_areal.GDW_ID.attrs['Description'] = 'The identifier of the reservoir in GRanD (Global Reservoir and Dam database)'

n_reservoirs, n_variables = len(meteo_areal.GDW_ID), len(meteo_areal)
print(f'{n_reservoirs} reservoirs and {n_variables} variables in the areal meteorological time series')

141 reservoirs and 3 variables in the areal meteorological time series


#### Meteorology: point

Time series of reservoir point meteorology extracted with the LISFLOOD utility `ncextract`.

In [34]:
# Load the meteorological time series extracted at the reservoir location
# (one subfolder per variable)
path_meteo_point = cfg.PATH_RESOPS / 'ancillary' / 'ncextract' / 'meteo'

# names found in the source files (keys) and names used in the dataset (values)
rename_vars = {
    'id': 'GDW_ID',
    'time': 'date',
    'e0': 'evapo_point',
    'tp': 'precip_point',
    'ta': 'temp_point',
}
# subfolders whose name is one of the keys above
variables = [x.stem for x in path_meteo_point.iterdir() if x.is_dir() and x.stem in rename_vars]
meteo_point = xr.Dataset({var: xr.open_mfdataset(f'{path_meteo_point}/{var}/*.nc')[var] for var in variables})

# rename variables and coordinates, and drop the spatial coordinates if present
meteo_point = meteo_point.rename(rename_vars)
meteo_point = meteo_point.drop_vars(['surface', 'lat', 'latitude', 'lon', 'longitude'], errors='ignore')

# correct and trim time
meteo_point['date'] = meteo_point['date'] - pd.Timedelta(days=1) # WARNING!! One day lag compared with LISFLOOD
# trim to the configured period, as done for the areal meteorology
meteo_point = meteo_point.sel(date=slice(cfg.START, cfg.END))

# keep catchments in the attributes
IDs = list(attributes.index.intersection(meteo_point.GDW_ID.data))
meteo_point = meteo_point.sel(GDW_ID=IDs)

# meteo_point = meteo_point.drop_vars(['lon', 'lat'], errors='ignore')

# compute
meteo_point = meteo_point.compute()

# # define attributes
# meteo_units = 'evapo_point: potential evaporation at the reservoir location from open water from ERA5 [mm/d]\n' \
#     'precip_point: precipitation at the reservoir location from ERA5 [mm/d]\n' \
#     'temp_point: air temperature  at the reservoir location from ERA5 [°C]\n'
# meteo_point.attrs['Units'] = meteo_units
# meteo_point.time.attrs['timezone'] = 'UTC+00'
# meteo_point.GDW_ID.attrs['Description'] = 'The identifier of the reservoir in GRanD (Global Reservoir and Dam database)'

# the message now names the point dataset (it used to say "areal")
print(f'{len(meteo_point.GDW_ID)} reservoirs and {len(meteo_point)} variables in the point meteorological time series')

141 reservoirs and 3 variables in the areal meteorological time series


## Prepare dataset

### Convert units

In [47]:
# Add normalised copies ('<name>_norm') of the volumetric variables, expressed
# as a fraction of the reservoir capacity. Only done if enabled in the
# configuration.
if cfg.NORMALIZE:

    # reservoir attributes used to normalize the dataset
    area_sm = xr.DataArray.from_series(attributes.AREA_SKM) * 1e6 # m2
    capacity_cm = xr.DataArray.from_series(attributes.CAP_MCM) * 1e6 # m3
    catchment_sm = xr.DataArray.from_series(attributes.CATCH_SKM) * 1e6 # m2

    # Observed timeseries
    # -------------------
    # iterate over a snapshot of the variable names, because new variables
    # are added to `obs` inside the loop
    for var in list(obs.data_vars):
        # convert variables in hm3 to fraction of reservoir capacity [-]
        if var in ['storage', 'evaporation']:
            obs[f'{var}_norm'] = obs[var] * 1e6 / capacity_cm
        # convert variables in m3/s to fraction of reservoir capacity [-]
        elif var in ['inflow', 'outflow']:
            obs[f'{var}_norm'] = obs[var] * 24 * 3600 / capacity_cm

    # # Simulated timeseries
    # # -------------------
    # for var, da in sim.items():
    #     # convert variables in hm3 to fraction of reservoir capacity [-]
    #     if var.split('_')[0] in ['storage']:
    #         sim[f'{var}_norm'] = sim[var] * 1e6 / capacity_cm
    #     # convert variables in m3/s to fraction of reservoir capacity [-]
    #     elif var.split('_')[0] in ['inflow', 'outflow']:
    #         sim[f'{var}_norm'] = sim[var] * 24 * 3600 / capacity_cm

    # Catchment meteorology
    # ---------------------
    # convert areal evaporation and precipitation from mm to fraction filled:
    # depth [mm] * catchment area [m2] * 1e-3 -> volume [m3]
    for var in ['evapo', 'precip']:
        meteo_areal[f'{var}_areal_norm'] = meteo_areal[f'{var}_areal'] * catchment_sm * 1e-3 / capacity_cm

    # Point meteorology
    # ---------------------
    # convert point evaporation and precipitation from mm to fraction filled:
    # depth [mm] * reservoir area [m2] * 1e-3 -> volume [m3]
    # NOTE(review): this block previously multiplied by the catchment area
    # (`catchment_sm`), leaving `area_sm` unused; the reservoir surface area is
    # assumed to be the intended scaling for values at the reservoir location.
    for var in ['evapo', 'precip']:
        meteo_point[f'{var}_point_norm'] = meteo_point[f'{var}_point'] * area_sm * 1e-3 / capacity_cm

### Export

In [48]:
# Export one CSV and one NetCDF file per reservoir, combining the observed
# time series with the areal and point meteorology and adding calendar
# features (raw values and their sine/cosine encodings).
path_csv = cfg.PATH_TS / 'csv'
path_csv.mkdir(parents=True, exist_ok=True)
path_nc = cfg.PATH_TS / 'netcdf'
path_nc.mkdir(parents=True, exist_ok=True)

for gdw_id in tqdm(attributes.index, desc='Exporting time series'):

    # reservoirs without observations are not exported
    if gdw_id not in obs.GDW_ID.data:
        print(f'Reservoir {gdw_id} does not have observations. Skipping to the next reservoir')
        continue

    # concatenate time series
    ds = obs.sel(GDW_ID=gdw_id).drop_vars(['GDW_ID'])
    # if gdw_id in sim.GDW_ID.data:
    #     ds = xr.merge((ds, sim.sel(GDW_ID=gdw_id).drop_vars(['GDW_ID'])))
    if gdw_id in meteo_areal.GDW_ID.data:
        ds = xr.merge((ds, meteo_areal.sel(GDW_ID=gdw_id).drop_vars(['GDW_ID'])))
    if gdw_id in meteo_point.GDW_ID.data:
        ds = xr.merge((ds, meteo_point.sel(GDW_ID=gdw_id).drop_vars(['GDW_ID'])))

    # delete empty variables (iterate over a copy of the names, as `ds` is modified)
    for var in list(ds.data_vars):
        if ds[var].isnull().all():
            del ds[var]

    # trim time series to the observed period
    start, end = attributes.loc[gdw_id, ['TIME_SERIES_START', 'TIME_SERIES_END']].values
    ds = ds.sel(date=slice(start, end))

    # create time series of temporal attributes
    # NOTE(review): `time_encoding` presumably returns the (sine, cosine) pair of
    # the value over the given period; confirm against its implementation
    ds['year'] = ds.date.dt.year
    ds['month'] = ds.date.dt.month
    ds['month_sin'], ds['month_cos'] = time_encoding(ds['month'], period=12)
    ds['weekofyear'] = ds.date.dt.isocalendar().week
    ds['woy_sin'], ds['woy_cos'] = time_encoding(ds['weekofyear'], period=52)
    ds['dayofyear'] = ds.date.dt.dayofyear
    ds['doy_sin'], ds['doy_cos'] = time_encoding(ds['dayofyear'], period=365)
    ds['dayofweek'] = ds.date.dt.dayofweek
    # the day of the week takes 7 distinct values (0-6), so the cycle length is 7;
    # a period of 6 would give Monday (0) and Sunday (6) the same encoding
    ds['dow_sin'], ds['dow_cos'] = time_encoding(ds['dayofweek'], period=7)

    # export CSV
    # ..........
    ds.to_pandas().to_csv(path_csv / f'{gdw_id}.csv')

    # export NetCDF
    # .............
    ds.to_netcdf(path_nc / f'{gdw_id}.nc')

Exporting time series:   0%|          | 0/143 [00:00<?, ?it/s]

Reservoir 40851 does not have observations. Skipping to the next reservoir
