In [None]:
import xarray as xr
import os
import hvplot.xarray
from distributed import Client
import numpy as np
import fsspec
import pandas as pd
from scipy.stats.mstats import plotting_positions
import panel as pn
from holoviews import opts
from dask.distributed import progress
import s3fs
import dask

import warnings
warnings.filterwarnings('ignore')

# Client Dask

In [None]:
client = Client()
client

# Débits stochastiques laminés

In [None]:
bucket = 's3://prsim/laminage/09995'
storage_options = {'endpoint_url': 'https://s3.us-east-1.wasabisys.com'}
fs = s3fs.S3FileSystem(anon=True,
                       client_kwargs= {'endpoint_url': 'https://s3.us-east-1.wasabisys.com'})

ds = xr.open_zarr(fsspec.get_mapper(bucket,
                                    client_kwargs=storage_options,
                                    anon=True),
                     consolidated=True)

Les simulations stochastiques ont été laminées par le logiciel HEC ResSim. Au total, 171 500 séries contenant 362 jours ont été calculés pour 34 réservoirs et 3 variables ont été extraites. Ceci implique environ 6,33 milliards de lignes de données (171 500 x 362 x 3 x 34). Le format zarr permet d'interagir efficacement avec cette quantité importante de données de manière optimale et avec une compression raisonnable pour limiter les besoins en bande passante (relativement à la quantité de données).

In [None]:
ds

In [None]:
reservoir_list = ['ANGLIERS','BARK LAKE','BASKATONG','BRYSON',
                  'CABONGA','CARILLON','CEDAR RAPID','CHAT FALLS',
                  'CHELSEA','CHENAUX','CHUTE BELL','DES JOACHIMS',
                  'DOZOIS','FARMERS','HIGH FALLS','HULL 2','KAMANISKEG',
                  'KIAMIKA','KIPAWA','LAC VICTORIA','LADY EVELYN',
                  'LOWER NOTCH','MISTINIKON','MITCHINAMECUS',
                  'MOUNTAIN CHUTE','OTTO HOLDEN','PAUGAN','PREMIERE CHUTE',
                  'RABBIT LAKE','RAPIDE 15','RAPIDE 2','RAPIDE 7','RAPIDE DES ILES',
                  'TEMISCAMINGUE']
freq_list = [0.9, 0.95, 0.98, 0.99, 0.999, 0.9999]

@dask.delayed
def get_frequencies_index(reservoir_id, variable_type):
    df = ds.sel(variable_type=variable_type, 
                reservoir_id=reservoir_id)\
       .max('date')\
       .value\
       .to_dataframe()\
       .dropna()\
       .sort_values('value')

    empirical_probability = plotting_positions(range(0,df.shape[0]), alpha=0.4, beta=0.4)

    indexes = [np.argmin(np.abs(np.array(empirical_probability)-freq)) for freq in freq_list]
    return pd.DataFrame(list(df.iloc[indexes].index),
                        columns=[reservoir_id],
                        index = [str(x).split('.')[0] 
                                 for x in list(np.round(1/(1 - np.array(freq_list))))]).T.round(2)



In [None]:
variable_type='FLOW-IN'
filename='s3://prsim/laminage/09995-settings/index_flow_in.csv'
override = False

if fs.exists(filename) and override == False:
    with fs.open(filename) as f:
        basins_freq_indexes_flow_in = pd.read_csv(f, index_col=0)
else:
    results=[]
    for bv in reservoir_list:
        results.append(get_frequencies_index(bv, variable_type))
    results = dask.delayed(pd.concat)(results)
    basins_freq_indexes_flow_in = client.persist(results).compute()

    with fs.open(filename, 'w') as f:
        basins_freq_indexes_flow_in.to_csv(f, index=True)

In [None]:
variable_type='ELEV'
filename='s3://prsim/laminage/09995-settings/index_elev.csv'

if fs.exists(filename) and override == False:
    with fs.open(filename) as f:
        basins_freq_indexes_elev = pd.read_csv(f, index_col=0)
else:
    results=[]
    for bv in reservoir_list:
        results.append(get_frequencies_index(bv, variable_type))
    results = dask.delayed(pd.concat)(results)
    basins_freq_indexes_elev = client.persist(results).compute()

    with fs.open(filename, 'w') as f:
        basins_freq_indexes_elev.to_csv(f, index=True)

# Analyse fréquentielle des séries stochastiques

## Tableaux

In [None]:
def frequency_values(ds,
                     reservoir_id,
                     variable_type,
                     basins_freq_indexes):
    """

    """

    da = ds.sel(variable_type=variable_type,
                reservoir_id=reservoir_id)\
           .where(ds.member_id.isin(basins_freq_indexes.loc[reservoir_id]),
                  drop=True).max('date')
    
    return pd.DataFrame(np.array(sorted(da.value.values)),
                        columns=[reservoir_id],
                        index = [str(x).split('.')[0] 
                                 for x in list(np.round(1/(1 - np.array(freq_list))))]).T.round(2)

### a) Débit entrants

In [None]:
%%time

variable_type = 'FLOW-IN'

df = pd.concat([frequency_values(ds,
                 reservoir_id,
                 variable_type,
                 basins_freq_indexes_flow_in) for reservoir_id in reservoir_list])
df

### b) Débit sortants

In [None]:
variable_type = 'FLOW-OUT'

df = pd.concat([frequency_values(ds,
                 reservoir_id,
                 variable_type,
                 basins_freq_indexes_flow_in) for reservoir_id in reservoir_list])
df

### c) Niveaux

In [None]:
variable_type = 'ELEV'

df = pd.concat([frequency_values(ds,
                 reservoir_id,
                 variable_type,
                 basins_freq_indexes_flow_in) for reservoir_id in reservoir_list])
df

## Graphiques

In [None]:
opts.defaults(
    opts.Curve(
        active_tools=['wheel_zoom','pan'],
))


def plot_hydrographs(ds,
                     reservoir_id,
                     freq_value,
                     variable_type_select,
                     basins_freq_indexes):
    """

    """
        
    da = ds.sel(reservoir_id=reservoir_id)\
           .where(ds.member_id.isin(basins_freq_indexes.loc[reservoir_id, str(freq_value)]),
                  drop=True)
    
    da['member_id'] = np.array([freq_value])
    da = da.rename({'member_id':'periode_retour'}).value[:,0,:]
    return (da.sel(variable_type='ELEV')\
              .hvplot(x='date',
                      grid=True,
                      by='variable_type',
                      width=850) +\
            da.where(da.variable_type.isin(['FLOW-IN','FLOW-OUT']), 
                     drop=True)\
              .hvplot(x='date',
                      grid=True,
                      by='variable_type',
                      width=850))\
            .opts(shared_axes=False, 
                  title=reservoir_id).cols(1)

### a) Crues fréquentielles basées sur le débit entrant

In [None]:

x_d = pn.widgets.Select(name='RESERVOIR', options=reservoir_list)
y_d = pn.widgets.Select(name='PERIODE DE RETOUR (basé sur le débit entrant maximum atteint)',
                      options=[10, 20, 50, 100, 1000, 10000], value=10000, width=400)

plot_d = plot_hydrographs(ds, x_d.value, y_d.value, 'FLOW-IN', basins_freq_indexes_flow_in)

layout_d = pn.Column(pn.WidgetBox(x_d),
                     pn.WidgetBox(y_d), 
                     plot_d)

def update(event):
    layout_d[2] = plot_hydrographs(ds, x_d.value, y_d.value, 'FLOW-IN', basins_freq_indexes_flow_in)

x_d.param.watch(update, 'value')
y_d.param.watch(update, 'value')

layout_d

### b) Crues fréquentielles basées sur le niveau maximal atteint

In [None]:
x_n = pn.widgets.Select(name='RESERVOIR', options=reservoir_list)
y_n = pn.widgets.Select(name='PERIODE DE RETOUR (basé sur le niveau maximal atteint)',
                      options=[10, 20, 50, 100, 1000, 10000], value=10000, width=400)

plot_n = plot_hydrographs(ds, x_n.value, y_n.value, 'ELEV', basins_freq_indexes_elev)

layout_n = pn.Column(pn.WidgetBox(x_n),
                     pn.WidgetBox(y_n), 
                     plot_n)

def update_n(event):
    layout_n[2] = plot_hydrographs(ds, x_n.value, y_n.value, 'ELEV', basins_freq_indexes_elev)

x_n.param.watch(update_n, 'value')
y_n.param.watch(update_n, 'value')

layout_n