### Import modules and verify they work? 

In [1]:
# general python
import warnings
warnings.filterwarnings("ignore", category=UserWarning)
import numpy as np
import os
from pathlib import Path
import yaml
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime
import scipy
import xarray as xr
from tqdm import tqdm

In [2]:
# general eWC
import ewatercycle
import ewatercycle.models

Download plugin model

In [3]:
# pip install ewatercycle-HBV==1.3.0

#### set up paths

In [4]:
path = Path.cwd()
forcing_path = path / "Forcing"
observations_path = path / "Observations"
figure_path = path / "Figures"
output_path = path / "Output"
forcing_path

PosixPath('/home/davidhaasnoot/eWaterCycle-WSL-WIP/Forcing')

#### add parameter info

In [5]:
## Array of initial storage terms - we keep these constant for now 
##              Si,  Su, Sf, Ss
s_0 = np.array([0,  100,  0,  5])

## Array of parameters min/max bounds as a reference
##                      Imax,  Ce,  Sumax, beta,  Pmax,  T_lag,   Kf,   Ks
p_min_initial= np.array([0,   0.2,  40,    .5,   .001,   1,     .01,  .0001])
p_max_initial = np.array([8,    1,  800,   4,    .3,     10,    .1,   .01])
p_names = ["$I_{max}$",  "$C_e$",  "$Su_{max}$", "β",  "$P_{max}$",  "$T_{lag}$",   "$K_f$",   "$K_s$"]
S_names = ["Interception storage", "Unsaturated Rootzone Storage", "Fastflow storage", "Groundwater storage"]
param_names = ["Imax","Ce",  "Sumax", "Beta",  "Pmax",  "Tlag",   "Kf",   "Ks"]
stor_names = ["Si", "Su", "Sf", "Ss"]

# set initial as mean of max,min
par_0 = (p_min_initial + p_max_initial)/2

In [6]:
experiment_start_date = "1997-10-01T00:00:00Z"
experiment_end_date = "1999-12-01T00:00:00Z"
HRU_id = 1620500
alpha = 1.26

In [7]:
from ewatercycle_HBV.forcing import HBVForcing

In [8]:
# to do: get alpha correctly

In [9]:
test_forcing = HBVForcing(start_time = experiment_start_date,
                          end_time = experiment_end_date,
                          directory = forcing_path,
                          camels_file = f'0{HRU_id}_lump_cida_forcing_leap.txt',
                          alpha = alpha
                          )

#### import model

In [10]:
from ewatercycle.models import HBV

In [11]:
da_type = "EnFK"

# run EnFK

n = number of elements in state vector: parameters + states \
N = number of particles \
m = number of observations \
D

![Algorithm_6_Subspace_EnKF_update_Data_Assimilation_Fundamentals_DOI_978-3-030-96709-3_8.png](Figures/NB_Figures/Algorithm_6_Subspace_EnKF_update_Data_Assimilation_Fundamentals_DOI_978-3-030-96709-3_8.png)

In [12]:
import random
rng = np.random.default_rng() # Initiate a Random Number Generator

In [13]:
def setup_EnFK(n_particles, p_max_initial, p_min_initial, alpha, s_0):
    # for initial run sample a set of parameters - start with simpel linear random:  min+[0,1] * (max-min) -> easily changes after
    array_random_num = np.array([[np.random.random() for i in range(len(p_max_initial))] for i in range(n_particles)])
    p_initial = p_min_initial + array_random_num * (p_max_initial-p_min_initial)
    s_initial = np.array([s_0  for i in range(n_particles)]) # for now constant
    prior_state_vector_z = np.hstack((p_initial, s_initial)).T # combine to a state vector: shape 12 x n_particles
    
    ensemble = []
    for i in range(n_particles):
        ensemble.append(HBV(forcing=test_forcing))
        
    for index, ensembleMember in enumerate(ensemble):
        config_file, _ = ensembleMember.setup(
                                              parameters=','.join([str(p) for p in p_initial[index]]), 
                                              initial_storage=','.join([str(s) for s in s_initial[index]]),
                                              alpha=alpha
                                             )
        ensembleMember.initialize(config_file)

    
    return ensemble, prior_state_vector_z

# def generate_weights(Q_m_at_ts, obs):
#     "Takes the enseble and observations and returns the posterior"
#     prior = Q_m_at_ts # take last observation
#     innov2 = (obs - prior)
#     like_sigma = 0.05  # In [m]; so 5 mm
#     unnormalised_log_weights = scipy.stats.norm.logpdf(innov2, loc=0, scale=like_sigma)
#     normalised_weights = np.exp(unnormalised_log_weights - scipy.special.logsumexp(unnormalised_log_weights))
#     return normalised_weights

def add_noise(like_sigma):
    return rng.normal(loc=0, scale=like_sigma)   # log normal so can't go to 0 ? 

def calc_NSE(Qo, Qm):
    QoAv  = np.mean(Qo)
    ErrUp = np.sum((Qm - Qo)**2)
    ErrDo = np.sum((Qo - QoAv)**2)
    return 1 - (ErrUp / ErrDo)

In [14]:
n_particles = 100 # 50 seems max for current setup :P

###### 
#### if fails, run in cmd:
[link1](https://stackoverflow.com/questions/65272764/ports-are-not-available-listen-tcp-0-0-0-0-50070-bind-an-attempt-was-made-to)
[link2](https://asheroto.medium.com/docker-error-an-attempt-was-made-to-access-a-socket-in-a-way-forbidden-by-its-access-permissions-15a444ab217b)
```bash
net stop winnat
netsh int ipv4 set dynamic tcp start=49152 num=16384
netsh int ipv6 set dynamic tcp start=49152 num=16384
net start winnat
````

In [15]:
# # # #### run if fails 
# for index, ensembleMember in enumerate(ensemble):
#     ensembleMember.finalize()

In [16]:
# create list of ensemble members
ensemble, prior_state_vector_z  = setup_EnFK(n_particles, p_max_initial, p_min_initial, alpha, s_0)
ref_model = ensemble[0]

APIError: 500 Server Error for http+docker://localhost/v1.44/containers/b6bdee55fb1aca61f4ed4f14b1cec9f16e742e75b5c58a25811edee9bfbdc0ff/start: Internal Server Error ("Ports are not available: exposing port TCP 0.0.0.0:50399 -> 0.0.0.0:0: listen tcp 0.0.0.0:50399: bind: An attempt was made to access a socket in a way forbidden by its access permissions.")

### import observations

In [None]:
ds = xr.open_dataset(forcing_path / ref_model.forcing.pr)

In [None]:
observations = observations_path / f'0{HRU_id}_streamflow_qc.txt'

In [None]:
cubic_ft_to_cubic_m = 0.0283168466 

In [None]:
new_header = ['GAGEID','Year','Month', 'Day', 'Streamflow(cubic feet per second)','QC_flag']
new_header_dict = dict(list(zip(range(len(new_header)),new_header)))

df_Q = pd.read_fwf(observations,delimiter=' ',encoding='utf-8',header=None)
df_Q = df_Q.rename(columns=new_header_dict)
df_Q['Streamflow(cubic feet per second)'] = df_Q['Streamflow(cubic feet per second)'].apply(lambda x: np.nan if x==-999.00 else x)
df_Q['Q (m3/s)'] = df_Q['Streamflow(cubic feet per second)'] * cubic_ft_to_cubic_m
df_Q['Q'] = df_Q['Q (m3/s)'] / ds.attrs['area basin(m^2)'] * 3600 * 24 * 1000 # m3/s -> m/s ->m/d -> mm/d
df_Q.index = df_Q.apply(lambda x: pd.Timestamp(f'{int(x.Year)}-{int(x.Month)}-{int(x.Day)}'),axis=1)
df_Q.index.name = "time"
df_Q.drop(columns=['Year','Month', 'Day','Streamflow(cubic feet per second)'],inplace=True)
df_Q = df_Q.dropna(axis=0)

In [None]:
df_Q['Q'].plot()
# ax.set_xlim((pd.Timestamp('1997-08-01'),pd.Timestamp('1998-06-01')))

In [None]:
n_timesteps = len(ds.time.values) - 1
# n_timesteps  = 300

In [None]:
n_timesteps

In [None]:
# extrsa for EnKF
measurement_vector_D = np.matrix(np.zeros((n_timesteps, n_particles))) # m x N = n_timsteps x n_particles
PI = np.matrix((np.identity(n_particles) - (( np.ones(n_particles) @ np.ones(n_particles).T ) / n_particles) ) / (np.sqrt(n_particles - 1)))
A_cross_A = np.matrix((np.identity(n_particles) - (( np.ones(n_particles) @ np.ones(n_particles).T ) / n_particles) ))

# set up arrays _ same as PF
n_storage_terms = len(stor_names)
n_param_terms = len(param_names)
Q_m_arr = np.zeros((n_particles, n_timesteps))
storage_terms_arr = np.zeros((n_particles, n_timesteps, n_storage_terms))
parameter_terms_arr = np.zeros((n_particles, n_timesteps, n_param_terms))
t_lag_max = 100 # for now? 
index_t_lag = 5 # fith parameter (base 0)
print(f'{index_t_lag}th index is {param_names[index_t_lag]}')
lag_vector_memory_arr = np.zeros((n_particles, n_timesteps,t_lag_max))

In [None]:
print(param_names)
print(stor_names)

In [None]:
start = datetime.now()
time = []
Q_obs_t_lst = []
t_index = 0 
sigma_obs = 1e-5
## running for whole timeseries takes long, just do first couple of months to see inpact
# while ref_model.time < ref_model.start_time + 6* ref_model.time_step:
while ref_model.time < ref_model.end_time:

    time.append(pd.Timestamp(ref_model.time_as_datetime.date()))
    print(f'{t_index}/{n_timesteps}',end="\r")
    # run model forward
    for m_index, ensembleMember in tqdm(enumerate(ensemble),disable=True):
        ensembleMember.update()
        # get model, storage and paramers
        Q_m_arr[m_index, t_index] = ensembleMember.get_value("Q_m")[0] # 1 observation per ensemble member - TODO: would be great to just do this in one go!
        storage_terms_arr[m_index, t_index] = np.array([ensembleMember.get_value(storage) for storage in stor_names]).flatten().copy()
        parameter_terms_arr[m_index, t_index] = np.array([ensembleMember.get_value(parameter) for parameter in param_names]).flatten().copy()


        # get the memory vector
        t_lag = int(parameter_terms_arr[m_index, t_index, index_t_lag])
        lag_vector_memory_arr[m_index, t_index, :t_lag] = np.array([ensembleMember.get_value(f"memory_vector{i}") for i in range(t_lag)]).flatten()
    

    prior_state_vector_Z = np.hstack((parameter_terms_arr[:,t_index], storage_terms_arr[:,t_index])).T
    # if t_index % 1 == 0:
    ### resample
    Q_obs_t_lst.append(pd.Timestamp(ref_model.time_as_datetime.date()))

    # obtain pertubed measurements
    measurement_d  = df_Q.loc[pd.Timestamp(ref_model.time_as_datetime.date()),"Q"]
    measurement_pertubation_matrix_E = np.array([add_noise(sigma_obs) for x in range(n_particles)])
    peturbed_measurements_D = measurement_d * np.ones(n_particles).T + np.sqrt(n_particles - 1) * measurement_pertubation_matrix_E
    measurement_vector_D[t_index, :] = peturbed_measurements_D

    # retrieve predicted values from model 
    predicted_measurements_Ypsilon = Q_m_arr[:, t_index]

    # algorithm:

    # reporject
    E = np.matrix(peturbed_measurements_D) * PI
    Y = np.matrix(predicted_measurements_Ypsilon) * PI
    if prior_state_vector_Z.shape[0] < n_particles - 1:
        Y = Y * A_cross_A
    S = Y
    D_tilde = np.matrix(peturbed_measurements_D - predicted_measurements_Ypsilon)
    
    W = S.T * np.linalg.inv(S * S.T + E * E.T) * D_tilde
    T = np.identity(n_particles) + (W / np.sqrt(n_particles-1))

    post_state_vector_Z = prior_state_vector_Z * T
    
    # resample
    new_parameters = post_state_vector_Z.T[:,:n_param_terms]
    new_storage    = post_state_vector_Z.T[:,n_param_terms:]
    new_lag        = lag_vector_memory_arr[:,t_index].copy() # dont have a way to set new lag function as of yet    
    
    
    # update the parameters & states
    for index, ensembleMember in tqdm(enumerate(ensemble),disable=True):
        # TODO: adjust so that tLag cant go to 0
        new_parameters[index, index_t_lag] = 1 # set t_lag to 1
        [ensembleMember.set_value(parameter, np.array([new_parameters[index, p_index]])) for p_index, parameter in enumerate(param_names)]
        [ensembleMember.set_value(storage, np.array([new_storage[index, s_index]])) for s_index, storage in enumerate(stor_names)]
        new_tlag = new_parameters[index, index_t_lag]
        # new_tlag = 1 #? for now??
        [ensembleMember.set_value(f"memory_vector{mem_index}", np.array([new_lag[index, mem_index]])) for mem_index in range(int(new_tlag))]

    # advance the index
    t_index+=1 

print('Finalising')
# end model - IMPORTANT! when working with dockers
for index, ensembleMember in tqdm(enumerate(ensemble),disable=True):
    ensembleMember.finalize()


end = datetime.now()
run = end - start
print(f'{run.seconds//60}min,{run.seconds%60}sec')

### process the numpy data into easily acessed data types

In [None]:
save, load = False, False
current_time = str(datetime.now())[:-10].replace(":","_")

In [None]:
if not load:
    df_ensemble = pd.DataFrame(data=Q_m_arr[:,:len(time)].T,index=time,columns=[f'particle {n}' for n in range(n_particles)])

In [None]:
plt.plot(df_ensemble);

In [None]:
df_first_n = df_ensemble.iloc[50:80]
ax=df_first_n.plot(legend=False, zorder=-1)
df_Q.loc[df_first_n.index,'Q'].plot(ax=ax,marker="*",color="k")

### process states and parameters into xarrys

In [None]:
##### Save? 
if save:
    df_ensemble.to_feather(output_path /f'df_ensemble_{da_type}_{current_time}.feather')
if load:
    df_ensemble = pd.read_feather(glob.glob(str(output_path/'df_ensemble_*.feather'))[-1]) # read last

In [None]:
# if load:
# TODO: obtain from model 
units= {"Imax":"mm",
        "Ce": "-",
        "Sumax": "mm",
        "Beta": "-",
        "Pmax": "mm",
        "Tlag": "d",
        "Kf": "-",
        "Ks": "-",
        "Si": "mm",
        "Su": "mm",
        "Sf": "mm",
        "Ss": "mm",
        "Ei_dt": "mm/d",
        "Ea_dt": "mm/d",
        "Qs_dt": "mm/d",
        "Qf_dt": "mm/d",
        "Q_tot_dt": "mm/d",
        "Q_m": "mm/d"}

In [None]:
if not load:
    # combine the 3D numpy arr into one dataArray
    storage_terms_ds = xr.DataArray(storage_terms_arr,dims=["EnsembleMember","time","storage"],
                                    coords=[np.arange(n_particles),df_ensemble.index,stor_names],
                                    attrs={"title": f"HBV storage terms data over time for {n_particles} particles ", 
                                           "history": f"Storage term results from ewatercycle_HBV.model",
                                        "description":"Moddeled storage",
                                             "units": "mm"})
    
    parameter_terms_ds = xr.DataArray(parameter_terms_arr,dims=["EnsembleMember","time","parameter"],
                                coords=[np.arange(n_particles),df_ensemble.index,param_names],
                                    attrs={"title": f"HBV storage terms data over time for {n_particles} particles ", 
                                           "history": f"Storage terms results from ewatercycle_HBV.model",})
    
    # even better is to combine into one Data_set_- this is most intuitive way for me - probably more ways
    data_vars = {}
    for i, name in enumerate(stor_names):
        storage_terms_i = xr.DataArray(storage_terms_arr[:,:,i],
                                       name=name,
                                       dims=["EnsembleMember","time"],
                                      coords=[np.arange(n_particles),df_ensemble.index],
                                      attrs={"title": f"HBV storage terms data over time for {n_particles} particles ", 
                                               "history": f"Storage term results from ewatercycle_HBV.model",
                                            "description":"Moddeled storage",
                                                 "units": "mm"})
        data_vars[name] = storage_terms_i
    for i, name in enumerate(param_names):
        storage_terms_i = xr.DataArray(parameter_terms_arr[:,:,i],
                                       name=name,
                                       dims=["EnsembleMember","time"],
                                      coords=[np.arange(n_particles),df_ensemble.index],
                                      attrs={"title": f"HBV storage terms data over time for {n_particles} particles ", 
                                               "history": f"Storage term results from ewatercycle_HBV.model",
                                            "description":"Moddeled storage",
                                                 "units": units[name]})
        data_vars[name] = storage_terms_i

    ds_combined = xr.Dataset(data_vars,
                             attrs={"title": f"HBV storage terms data over time for {n_particles} particles ", 
                                    "history": f"Storage term results from ewatercycle_HBV.model",}
                              )

In [None]:
##### Save? 
if save:
    storage_terms_ds.to_netcdf(output_path / f'storage_terms_ds_{da_type}_{current_time}.nc')
    parameter_terms_ds.to_netcdf(output_path / f'parameter_terms_ds_{da_type}_{current_time}.nc')
    ds_combined.to_netcdf(output_path / f'combined_ds_{da_type}_{current_time}.nc')
    
if load:
    storage_terms_ds = xr.open_dataarray(glob.glob(str(output_path / r'storage_terms_ds_*.nc'))[-1])
    parameter_terms_ds = xr.open_dataarray(glob.glob(str(output_path / 'parameter_terms_ds_*.nc'))[-1])
    ds_combined = xr.open_dataset(glob.glob(str(output_path / 'combined_ds_*.nc'))[-1])

# Plotting

In [None]:
# df_ensemble.plot()
fig, ax = plt.subplots(1,1,figsize=(12,5))
# ax.plot(ds.time.values[:n_days],ds['Q'].values[:n_days],lw=0,marker="*",ms=2.5,zorder=0,label="Observations",color="k")
# ax.plot(df.index, Q_m_in_ref[1:],label="Modelled reference Q");
df_Q.loc[Q_obs_t_lst,"Q"].plot(ax=ax,lw=0,marker="*",ms=2.5,zorder=0,label="Observations",color='k')
ax.legend(bbox_to_anchor=(1,1))
df_ensemble.plot(ax=ax,alpha=0.5,zorder=-1,legend=False)
ax.set_ylabel("Q [mm]")
ax.set_title(f"Run ensemble of {n_particles} particles");
ax.set_xlim((pd.Timestamp('1997-08-01'),pd.Timestamp('1998-12-01')))
# ax.set_ylim((0,10))
if save:
    fig.savefig(figure_path / f"ensemble_run_for_{n_particles}_members_{da_type}_{current_time}.png")

Can calculate the mean of 50 particles as a reference

In [None]:
mean_ensemble = df_ensemble.T.mean()
Q_obs_t_lst_1 = Q_obs_t_lst[:-1]
NSE_mean_ens = calc_NSE(df_Q.loc[Q_obs_t_lst_1,"Q"],mean_ensemble.loc[Q_obs_t_lst_1])

In [None]:
# df_ensemble.plot()
fig, ax = plt.subplots(1,1,figsize=(12,5))
# ax.plot(ds.time.values[:n_days],ds['Q'].values[:n_days],lw=0,marker="*",ms=2.5,zorder=0,label="Observations",color="k")
# ax.plot(df.index, Q_m_in_ref[1:],label="Modelled reference Q");
df_Q.loc[Q_obs_t_lst,"Q"].plot(ax=ax,lw=0,marker="*",ms=2.0,zorder=0,label="Observations",color='k')

ax.plot(mean_ensemble,color="C1",lw=0.5,label=f"mean={NSE_mean_ens:.2f}",zorder=-1)
ax.fill_between(df_ensemble.index,df_ensemble.T.min(),df_ensemble.T.max(),color="C0", alpha=0.35,zorder=-10,label="bounds")
ax.legend(bbox_to_anchor=(1.25,1))
ax.set_ylabel("Q [mm]")
ax.set_title(f"Run ensemble of {n_particles} memebers {da_type} - No lag");
# ax.set_xlim((pd.Timestamp('1997-08-01'),pd.Timestamp('1998-06-01')))
if save:
    fig.savefig(figure_path / f"ensemble_run_for_{n_particles}_members_{da_type}_{current_time}.png",bbox_inches="tight",dpi=400);

In [None]:
# df_ensemble.plot()
fig, ax = plt.subplots(1,1,figsize=(12,5))
# ax.plot(ds.time.values[:n_days],ds['Q'].values[:n_days],lw=0,marker="*",ms=2.5,zorder=0,label="Observations",color="k")
# ax.plot(df.index, Q_m_in_ref[1:],label="Modelled reference Q");
df_Q.loc[Q_obs_t_lst,"Q"].plot(ax=ax,lw=0,marker="*",ms=2.0,zorder=0,label="Observations",color='k')

ax_pr = ax.twinx()
ax_pr.invert_yaxis()
ax_pr.set_ylabel(f"P [mm]")
ax_pr.bar(df_ensemble.index,ds['pr'].values[:n_timesteps],zorder=-15,label="Precipitation",color="grey")
ax_pr.legend(bbox_to_anchor=(1.25,0.8))

ax.plot(mean_ensemble,color="C1",lw=0.5,label=f"mean",zorder=-1)
ax.fill_between(df_ensemble.index,df_ensemble.T.min(),df_ensemble.T.max(),color="C0", alpha=0.35,zorder=-10,label="bounds")
ax.legend(bbox_to_anchor=(1.25,1))
ax.set_ylabel("Q [mm]")
ax.set_title(f"Run ensemble of {n_particles} particles");

if save:
    fig.savefig(figure_path / f"ensemble_run_for_{n_particles}_members_{da_type}_bounds_P_{current_time}.png",bbox_inches="tight",dpi=400);

In [None]:
n=5
fig, axs = plt.subplots(n,1,figsize=(12,n*2),sharex=True)

ax = axs[0]
df_Q.loc[Q_obs_t_lst,"Q"].plot(ax=ax,lw=0,marker="*",ms=2.5,zorder=0,label="Observations",color='k')
ax_pr = ax.twinx()
ax_pr.invert_yaxis()
ax_pr.set_ylabel(f"P [mm]")
ax_pr.bar(df_ensemble.index,ds['pr'].values[:n_timesteps],zorder=-10,label="Precipitation",color="grey")

ax.plot(mean_ensemble,color="C1",lw=0.5,label=f"mean",zorder=-1)
ax.fill_between(df_ensemble.index,df_ensemble.T.min(),df_ensemble.T.max(),color="C0", alpha=0.5,zorder=-10,label="bounds")
ax.legend(bbox_to_anchor=(1.25,1))
ax.set_ylabel("Q [mm]")

ax.set_title(f"Run ensemble of {n_particles} particles");

for i, S_name in enumerate(S_names):
    for j in range(n_particles):
        storage_terms_ds.isel(storage=i,EnsembleMember=j).plot(ax=axs[i+1],color=f"C{i}",alpha=0.5)
        axs[i+1].set_title(S_name)

# remove all unncecearry xlabels
[ax.set_xlabel(None) for ax in axs[:-1]]
[ax.set_ylabel("S [mm]") for ax in axs[1:]]

if save:
    fig.savefig(figure_path / f"ensemble_run_for__{n_particles}_members_{da_type}_storages_{current_time}.png",bbox_inches="tight",dpi=400)

In [None]:
fig, ax = plt.subplots(1,1)
for i in range(n_particles):
    storage_terms_ds.sel(storage="Ss").isel(EnsembleMember=i).plot(ax=ax)

In [None]:
fig, ax = plt.subplots(1,1)

parameter="Tlag"
for i in range(n_particles):
    parameter_terms_ds.sel(parameter=parameter).isel(EnsembleMember=i).plot(ax=ax)
ax.set_title(f'parameter={parameter} for {n_particles} Ensemble Members')

In [None]:
fig, axs = plt.subplots(2,4,figsize=(25,10),sharex=True)
axs = axs.flatten()
for j, parameter in enumerate(param_names):
    ax = axs[j]
    for i in range(n_particles):
        parameter_terms_ds.sel(parameter=parameter).isel(EnsembleMember=i).plot(ax=ax,alpha=0.3)
    ax.set_title(f'parameter={parameter}')# for {n_particles} Ensemble Members')
    ax.set_ylabel(f'[{units[parameter]}]')
if save:
    fig.savefig(figure_path /  f"ensemble_run_for__{n_particles}_members_{da_type}_parameters_{current_time}.png",bbox_inches="tight",dpi=400)