<a href="https://colab.research.google.com/github/ContiPaolo/ModellingFromMeasurements/blob/main/Tutorial_Dynamics_Identification_(trace).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Dynamics identification from data

Perform all necessary imports

In [None]:
########################### INSTALL VINDy PACKAGE  ###########################
from IPython.display import clear_output
!pip install vindy
clear_output()
print("VENI-VINDy-VICI ready to go :)")

import tensorflow as tf
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.models import Model
import keras
import os
import scipy
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

import pysindy as ps
import matplotlib.cm as cm
from vindy import SindyNetwork
from vindy.libraries import PolynomialLibrary
from vindy.layers import SindyLayer, VindyLayer
from vindy.distributions import Gaussian, Laplace
from vindy.callbacks import (
    SaveCoefficientsCallback,
)
from vindy.utils import add_lognormal_noise

seed = 29 # random seed

sindy_type = "vindy"  # "sindy" or "vindy", if you either want a deterministic or probabilistic model for the dynamics, respectively.
model_name = "roessler"

VENI-VINDy-VICI ready to go :)


## Roessler System
The Roessler system is a set of three ordinary differential equations describing a simple chaotic dynamical system. The equations take the form:

$$
\begin{align}
\dot{\mathbf{x}} = \mathbf{f}(\mathbf{x})
\end{align}
$$

where $\mathbf{x} = [x, y, z]^\top$ and $\mathbf{f}$ is given by:

$$
\begin{align}
&\dot{x} = -y - z,  \\
&\dot{y} = x + a y, \\
&\dot{z} = b + z(x-c),
\end{align}
$$

where $a = 0.2, b = 0.2, c = 5.7$.

Our goal is to learn the dynamics $\mathbf{f}$ of the system directly from measurements of the system's states.

In [None]:
def roessler(t, x0, a=0.2, b=0.2, c=5.7):
    #TODO
    return #TODO

In [None]:
# Roessler system parameters
a = #TODO
b = #TODO
c = #TODO

# initial conditions
x0 = -5
y0 = -5
z0 = 0
ic = [x0, y0, z0]
dim = 3
var_names = ["z_1", "z_2", "z_3"]

# time vector
t0, T, nt = 0, 24, 2000
t = np.linspace(t0, T, nt)
dt = t[1] - t[0]

## 1. Approximation of the dynamics with neural networks
One approach is to approximate the *map* $\mathbf{F}$ $-$ which is the discrete counterpart of $\mathbf{f}$ $-$ using a neural network $\mathbf{F}_\texttt{NN}$, such that

$$
\begin{align}
  \mathbf{x}_{t+1} = \mathbf{F}(\mathbf{x}_t) \approx \mathbf{F}_\texttt{NN}(\mathbf{x}_t) =: \hat{\mathbf{x}}_{t+1}
  \end{align}
$$
This map takes the system's current state as input and provides the state at the next time step as output.



### Generate data:
First, we will generate the data for the Roessler system. We will use the `scipy.integrate.odeint` function to solve the ODEs and generate the data.

In [None]:
n_sim = 20

# Generate initial conditions and parameters
np.random.seed(seed)
x0 = np.random.normal(ic, 2, size=(n_sim, 3))

# Generate data
x = []
nn_input = []
nn_output = []

np.random.seed(seed)
for x0_ in x0:
  x_sol = #TODO
  x.append(x_sol)
  nn_input.append(#TODO)
  nn_output.append(#TODO)

# Plot the data
ig,ax = plt.subplots(1,1,subplot_kw={'projection': '3d'})
for n, x_sol in enumerate(x):
  ax.plot(x_sol[:,0], x_sol[:,1], x_sol[:,2],linewidth=1)

  ax.set_xlabel('x', labelpad = 10)
  ax.set_ylabel('y', labelpad = 10)
  ax.set_zlabel('z', labelpad = 10)
  ax.scatter(x_sol[0,0], x_sol[0,1], x_sol[0,2] ,color='r', s=10)

### Train the neural network:

In [None]:
# Set seed for reproducibility
np.random.seed(seed)
#TODO: split in train and test using sklearn.model_selection.train_test_split
x_train, x_test, y_train, y_test = #TODO
n_train = len(x_train)
n_test = len(x_test)

x_train = np.array(x_train)
x_test = np.array(x_test)
y_train = np.array(y_train)
y_test = np.array(y_test)

# Reshape train data to feed the NN
x_train_nn = #TODO
y_train_nn = #TODO

# Build model
inputs = #TODO
x = Dense(32, activation='tanh')(inputs)
x = Dense(32, activation='tanh')(x)
outputs = #TODO

# Compile model
nn_model = Model(inputs, outputs)
opt = tf.keras.optimizers.Adam(learning_rate=1e-3)
nn_model.compile(loss='mse', optimizer=opt)

# Fit
tf.random.set_seed(seed)
batch_size=150

History = nn_model.fit(x_train_nn, y_train_nn, epochs=500, verbose = 1, batch_size=batch_size, callbacks=keras.callbacks.EarlyStopping(monitor='loss', patience=20))

### 3. Test the performance

We approximate the system trajectories $\{\mathbf{x}_{t_1}, \mathbf{x}_{t_2}, \ldots, \mathbf{x}_{T}\}$ as:

$${\mathbf{x}}_{t+1}\approx\hat{\mathbf{x}}_{t+1} = \mathbf{F}_\texttt{NN}(\mathbf{x}_t).$$

In [None]:
ind_test = 0

In [None]:
y_test_pred = #TODO
print('Mean Squared Error:', np.mean(np.square(y_test[ind_test] - y_test_pred)))

In [None]:
#Plot trajectory
ig,ax = plt.subplots(1,1,subplot_kw={'projection': '3d'})
ax.plot(y_test[ind_test][:,0], y_test[ind_test][:,1], y_test[ind_test][:,2], 'k-', linewidth=2, label = 'Exact traj.')
ax.plot(y_test_pred[:,0], y_test_pred[:,1], y_test_pred[:,2], 'r--',  linewidth=2, label = 'Predicted traj.')

ax.set_xlabel('x', labelpad = 10)
ax.set_ylabel('y', labelpad = 10)
ax.set_zlabel('z', labelpad = 10)
ax.scatter(y_test[ind_test][0,0], y_test[ind_test][0,1], y_test[ind_test][0,2] ,color='g', s=30)
plt.legend()

# Plot evolution of each state
plt.figure()
plt.subplot(3,1,1)
plt.plot(t[:-1], y_test[ind_test][:,0], 'k-', linewidth = 2, label = 'x')
plt.plot(t[:-1], y_test_pred[:,0], 'r--',  linewidth=2, label = ' x pred')
plt.legend(loc = 'lower right')

plt.subplot(3,1,2)
plt.plot(t[:-1], y_test[ind_test][:,1], 'k-', linewidth = 2, label = 'y')
plt.plot(t[:-1], y_test_pred[:,1], 'r--',  linewidth=2, label = 'y pred')
plt.legend(loc = 'lower right')

plt.subplot(3,1,3)
plt.plot(t[:-1], y_test[ind_test][:,2], 'k-', linewidth = 2, label = 'z')
plt.plot(t[:-1], y_test_pred[:,2], 'r--',  linewidth=2, label = 'z pred')
plt.legend(loc = 'lower right')
plt.show()

### 3. Test the performance (reality)

In practice, we typically only know $\mathbf{x}_0$ and we do not have access to $\mathbf{x}_t$, at any other $t$. Therefore, trajectories are typically computed recursively as:

$$\hat{\mathbf{x}}_{t+1} = \mathbf{F}_\texttt{NN}(\hat{\mathbf{x}}_t)= \mathbf{F}_\texttt{NN}(\mathbf{F}_\texttt{NN}(\hat{\mathbf{x}}_{t-1})) = \cdots = \mathbf{F}_\texttt{NN}(\mathbf{F}_\texttt{NN}(\cdots(\mathbf{F}_\texttt{NN}(\mathbf{x}_0))).$$


In [None]:
nt_ = nt - 1
y_test_pred_rec = np.zeros((nt_, 3))
y_test_pred_rec[0] = #TODO

for time in range(nt_-1):
  if time%100==0:
    print('time = ', time)
  #TODO: update y_test_pred_rec

print('Mean Squared Error:', np.mean(np.square(y_test[ind_test] - y_test_pred_rec)))

In [None]:
#Plot trajectory
ig,ax = plt.subplots(1,1,subplot_kw={'projection': '3d'})
ax.plot(y_test[ind_test][:,0], y_test[ind_test][:,1], y_test[ind_test][:,2], 'k-', linewidth=2, label = 'Exact traj.')
ax.plot(y_test_pred_rec[:,0], y_test_pred_rec[:,1], y_test_pred_rec[:,2], 'r--',  linewidth=2, label = 'Predicted traj.')

ax.set_xlabel('x', labelpad = 10)
ax.set_ylabel('y', labelpad = 10)
ax.set_zlabel('z', labelpad = 10)
ax.scatter(y_test[ind_test][0,0], y_test[ind_test][0,1], y_test[ind_test][0,2] ,color='g', s=30)
plt.legend()

# Plot evolution of each state
plt.figure()
plt.subplot(3,1,1)
plt.plot(t[:-1], y_test[ind_test][:,0], 'k-', linewidth = 2, label = 'x')
plt.plot(t[:-1], y_test_pred_rec[:,0], 'r--',  linewidth=2, label = ' x pred')
plt.legend(loc = 'lower right')

plt.subplot(3,1,2)
plt.plot(t[:-1], y_test[ind_test][:,1], 'k-', linewidth = 2, label = 'y')
plt.plot(t[:-1], y_test_pred_rec[:,1], 'r--',  linewidth=2, label = 'y pred')
plt.legend(loc = 'lower right')

plt.subplot(3,1,3)
plt.plot(t[:-1], y_test[ind_test][:,2], 'k-', linewidth = 2, label = 'z')
plt.plot(t[:-1], y_test_pred_rec[:,2], 'r--',  linewidth=2, label = 'z pred')
plt.legend(loc = 'lower right')
plt.show()


## 1. Approximation of the dynamics with SINDy
We use SINDy (Sparse Identification of Nonlinear Dynamics) to approximate the function $\mathbf{f}$ as a linear combination of candidate features as follows

$$
\dot{\mathbf{x}} = \mathbf{f} \approx \mathbf{\Theta}(\mathbf{x})\mathbf{\Xi},
$$
where $\mathbf{\Theta}$ is the library of prescribed features which can potentially describe the dynamics of the system and $\mathbf{\Xi}$ are corresponding multiplicative coefficients.

For instance, in a one-dimensional case:
$$ \dot{z} = f(z) \approx  \xi_0 + \xi_1 z + \xi_2 z^2 + \ldots = \mathbf{\Theta}(z)\cdot \mathbf{\xi}$$

In [None]:
X_sindy = #TODO
dX_sindy = #TODO
######################      CREATE SINDy model       ######################
model = ps.SINDy(feature_names  = ['x','y','z'],
                 feature_library= #TODO
                 optimizer= #TODO
)

#TODO: Fit the model

model.print()

In [None]:
ind_test = 0

In [None]:
y_pred_sindy = #TODO
print('Mean Squared Error:', np.mean(np.square(y_test[ind_test] - y_pred_sindy)))

In [None]:
#Plot trajectory
ig,ax = plt.subplots(1,1,subplot_kw={'projection': '3d'})
ax.plot(y_test[ind_test][:,0], y_test[ind_test][:,1], y_test[ind_test][:,2], 'k-', linewidth=2, label = 'Exact traj.')
ax.plot(y_pred_sindy[:,0], y_pred_sindy[:,1], y_pred_sindy[:,2], 'r--',  linewidth=2, label = 'Predicted traj.')

ax.set_xlabel('x', labelpad = 10)
ax.set_ylabel('y', labelpad = 10)
ax.set_zlabel('z', labelpad = 10)
ax.scatter(y_test[ind_test][0,0], y_test[ind_test][0,1], y_test[ind_test][0,2] ,color='g', s=30)
plt.legend()

# Plot evolution of each state
plt.figure()
plt.subplot(3,1,1)
plt.plot(t[:-1], y_test[ind_test][:,0], 'k-', linewidth = 2, label = 'x')
plt.plot(t[:-1], y_pred_sindy[:,0], 'r--',  linewidth=2, label = ' x pred')
plt.legend(loc = 'lower right')

plt.subplot(3,1,2)
plt.plot(t[:-1], y_test[ind_test][:,1], 'k-', linewidth = 2, label = 'y')
plt.plot(t[:-1], y_pred_sindy[:,1], 'r--',  linewidth=2, label = 'y pred')
plt.legend(loc = 'lower right')

plt.subplot(3,1,3)
plt.plot(t[:-1], y_test[ind_test][:,2], 'k-', linewidth = 2, label = 'z')
plt.plot(t[:-1], y_pred_sindy[:,2], 'r--',  linewidth=2, label = 'z pred')
plt.legend(loc = 'lower right')
plt.show()


In [None]:
measurement_noise_factor = 0.1 # measurement noise factor
model_noise_factor = 0.2 # model noise factor

# Random model coefficients: model noise
np.random.seed(seed)
a_samples = #TODO
b_samples = #TODO
c_samples = #TODO

# Generate data
x_train_noise = []
for i in range(n_train):
  x_sol_noise = #TODO
  x_train_noise.append(x_sol_noise)

# add measurement noise
x_train_noise = np.array([add_lognormal_noise(x_, measurement_noise_factor)[0] for x_ in x_train_noise])

# calculate time derivatives
dx_train_noise = np.array([np.gradient(traj, dt, axis=0) for traj in x_train_noise])

In [None]:
# three-dimensional plot of roessler attractor
fig = plt.figure()
ax = fig.add_subplot(projection="3d")
for i, x_ in enumerate(x_train_noise):
    if i == 0:
        ax.plot(x_[:, 0], x_[:, 1], x_[:, 2], c="gray", label="Training data")
    else:
        ax.plot(x_[:, 0], x_[:, 1], x_[:, 2], c="gray")
for i, x_ in enumerate(x_test):
    if i == 0:
        ax.plot(x_[:, 0], x_[:, 1], x_[:, 2], c="red", label="Test data")
    else:
        ax.plot(x_[:, 0], x_[:, 1], x_[:, 2], c="red")
plt.xlabel("$z_1$")
plt.ylabel("$z_2$")
ax.set_zlabel("$z_3$")
plt.legend()
plt.tight_layout()

# # Plot the training data
# Number of trajectories
num_trajectories = len(x_train_noise)

# Colormap
cmap_blue = cm.Blues(np.linspace(0., 1, num_trajectories))
cmap_red  = cm.Reds(np.linspace(0., 1, num_trajectories))
cmap_green  = cm.Greens(np.linspace(0., 1, num_trajectories))

# Plot the training data
fig, axs = plt.subplots(2, 3, figsize=(18, 8))

# Plotting each trajectory for states and velocities
for i in range(num_trajectories):
    # States
    axs[0, 0].plot(t, x_train_noise[i][:, 0], color=cmap_blue[i])
    axs[0, 1].plot(t, x_train_noise[i][:, 1], color=cmap_red[i])
    axs[0, 2].plot(t, x_train_noise[i][:, 2], color=cmap_green[i])

    # Velocities
    axs[1, 0].plot(t, dx_train_noise[i][:, 0], color=cmap_blue[i])
    axs[1, 1].plot(t, dx_train_noise[i][:, 1], color=cmap_red[i])
    axs[1, 2].plot(t, dx_train_noise[i][:, 2], color=cmap_green[i])

# Setting labels and titles for states
axs[0, 0].set_title("$x$", fontsize=20)
axs[0, 1].set_title("$y$", fontsize=20)
axs[0, 2].set_title("$z$", fontsize=20)

# Setting labels
axs[1, 0].set_title("$\dot{x}$", fontsize=20)
axs[1, 1].set_title("$\dot{y}$", fontsize=20)
axs[1, 2].set_title("$\dot{z}$", fontsize=20)

for ax in axs[1, :]:
    ax.set_xlabel("t", fontsize=14)

plt.tight_layout()
plt.show()


In [None]:
X_sindy_noise = [traj for traj in x_train_noise]
dX_sindy_noise = [traj_deriv for traj_deriv in dx_train_noise]

######################      CREATE SINDy model       ######################
model_noise = ps.SINDy(feature_names  = ['x','y','z'],
                 feature_library= ps.PolynomialLibrary(degree = 2, include_bias=True),
                 optimizer=ps.STLSQ(threshold=0.1))

model_noise.fit(x=X_sindy_noise, t=dt, x_dot = dX_sindy_noise, multiple_trajectories=True)

model_noise.print()

In [None]:
ind_test = 0

In [None]:
y_pred_sindy = model_noise.simulate(x_test[ind_test][0], t[:-1])
print('Mean Squared Error:', np.mean(np.square(y_test[ind_test] - y_pred_sindy)))

#Plot trajectory
ig,ax = plt.subplots(1,1,subplot_kw={'projection': '3d'})
ax.plot(y_test[ind_test][:,0], y_test[ind_test][:,1], y_test[ind_test][:,2], 'k-', linewidth=2, label = 'Exact traj.')
ax.plot(y_pred_sindy[:,0], y_pred_sindy[:,1], y_pred_sindy[:,2], 'r--',  linewidth=2, label = 'Predicted traj.')

ax.set_xlabel('x', labelpad = 10)
ax.set_ylabel('y', labelpad = 10)
ax.set_zlabel('z', labelpad = 10)
ax.scatter(y_test[ind_test][0,0], y_test[ind_test][0,1], y_test[ind_test][0,2] ,color='g', s=30)
plt.legend()

# Plot evolution of each state
plt.figure()
plt.subplot(3,1,1)
plt.plot(t[:-1], y_test[ind_test][:,0], 'k-', linewidth = 2, label = 'x')
plt.plot(t[:-1], y_pred_sindy[:,0], 'r--',  linewidth=2, label = ' x pred')
plt.legend(loc = 'lower right')

plt.subplot(3,1,2)
plt.plot(t[:-1], y_test[ind_test][:,1], 'k-', linewidth = 2, label = 'y')
plt.plot(t[:-1], y_pred_sindy[:,1], 'r--',  linewidth=2, label = 'y pred')
plt.legend(loc = 'lower right')

plt.subplot(3,1,3)
plt.plot(t[:-1], y_test[ind_test][:,2], 'k-', linewidth = 2, label = 'z')
plt.plot(t[:-1], y_pred_sindy[:,2], 'r--',  linewidth=2, label = 'z pred')
plt.legend(loc = 'lower right')
plt.show()

## Problem Setup
Let's define some general script parameter.

In [None]:
sindy_type = "vindy"  # "sindy" or "vindy", if you either want a deterministic or probabilistic model for the dynamics, respectively.
model_name = "roessler"
seed = 29 # random seed
random_IC = True # use random initial conditions
random_coeff = True # use random parameters (model noise)
measurement_noise_factor = 0.01 # measurement noise factor
model_noise_factor = 0.1 # model noise factor
n_train = 30 # number of training trajectories
n_test = 4 # number of test trajectories

Let's define directories for saving the results

In [None]:
def generate_directories(model_name, sindy_type, scenario_info, outdir):
    # noise before derivative, model error, seed
    outdir = os.path.join(outdir, f"{model_name}", f"{sindy_type}")
    figdir = os.path.join(outdir, "figures", f"{scenario_info}")
    log_dir = os.path.join(
        outdir,
        '{model_name}', 'log', f'{scenario_info}',
    )
    weights_dir = os.path.join(outdir, "weights", f"{scenario_info}")
    # save figure
    for dir in [outdir, figdir, log_dir, weights_dir]:
        if not os.path.isdir(dir):
            os.makedirs(dir)

    return outdir, figdir, log_dir, weights_dir

scenario_info = f"{sindy_type}_nbd__me_{random_coeff}_{model_noise_factor}_seed_{seed}_noise_{measurement_noise_factor}"
_, _, _, weights_dir = generate_directories(model_name, sindy_type, scenario_info, "results")

 Let's generate data and plot them to see what it looks like

In [None]:
# Generate initial conditions and parameters
if random_IC:
    np.random.seed(seed)
    x0 = np.concatenate(
        [np.random.normal(ic_, scale=2, size=(n_train + n_test, 1)) for ic_ in ic],
        axis=1,
    )
else:
    x0 = np.repeat(np.array([ic])[:, :, np.newaxis], n_train + 1, axis=2)

if random_coeff:
    np.random.seed(seed)
    a_samples = np.random.normal(a, a * model_noise_factor, size=n_train)
    b_samples = np.random.normal(b, b * model_noise_factor, size=n_train)
    c_samples = np.random.normal(c, c * model_noise_factor, size=n_train)
else:
    a_samples = [a]
    b_samples = [b]
    c_samples = [c]

# Generate data
x = np.array(
    [
        scipy.integrate.odeint(lambda x, t: roessler(t, x, a=a_, b=b_, c=c_), x0_, t)
        for i, (a_, b_, c_, x0_) in enumerate(
            zip(a_samples, b_samples, c_samples, x0[:n_train])
        )
    ]
)

# add measurement noise
x = np.array([add_lognormal_noise(x_, measurement_noise_factor)[0] for x_ in x])
x_test = np.array(
    [
        scipy.integrate.odeint(lambda x, t: roessler(t, x, a=a, b=b, c=c), x0_, t)
        for x0_ in x0[n_train:]
    ]
)

# calculate time derivatives
dxdt = [np.array(np.gradient(x_, dt, axis=0)) for x_ in x]
dxdt_test = [np.array(np.gradient(x_, dt, axis=0)) for x_ in x_test]

## Model Generation

Now, we will define the VINDy model and train it on the generated data to learn the Roessler system. We will use the `VariationalSindyLayer` to define the model. The `VariationalSindyLayer` is a Bayesian version of the SINDy model that uses a variational inference approach to learn the model coefficients. We will use the `Laplacian` priors for the coefficients and use a polynomial library of degree 2 to learn the model.

In [None]:
# reshape data to fit the model
x_train = np.concatenate(x, axis=0)
dxdt_train = np.concatenate(dxdt, axis=0)
x_test = np.concatenate(x_test, axis=0)
dxdt_test = np.concatenate(dxdt_test, axis=0)

# model parameters
libraries = [
    PolynomialLibrary(2, include_bias=True),
]
dt = t[1] - t[0]

# create sindy layer
layer_params = dict(
    state_dim=3,
    param_dim=0,
    feature_libraries=libraries,
    second_order=False,
    mask=None,
    kernel_regularizer=tf.keras.regularizers.L1L2(l1=0, l2=0),
)
if sindy_type == "vindy":
    sindy_layer = VindyLayer(
        beta=1e-3,
        priors=Laplace(0.0, 1.0),
        **layer_params,
    )
elif sindy_type == "sindy":
    sindy_layer = SindyLayer(
        **layer_params,
    )
else:
    raise ValueError(f"Unknown SINDy type: {sindy_type}")

# create autoencoder sindy model
model = SindyNetwork(
    sindy_layer=sindy_layer,
    x=x_train,
    l_dz=1e0,
    dt=dt,
    second_order=False,
)
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), loss="huber")

Let's train the VINDy model on the (noisy) Roessler data.
We aim to minimize the loss:
$$
\min_{\mathbf{W}_\mathbf{\Xi}} \frac{1}{n_s} \sum_{i=1}^{n_s} \lambda_1 ||\dot{\mathbf{x}} - \mathbf{\Theta}(\mathbf{x})\mathbf{\Xi}||_2^2+\lambda_2 \mathcal{KL}(q(\mathbf{\Xi}) || p(\mathbf{\Xi})),
$$

where

*   $\mathbf{x}=[x,y,z]^\top$ is the vector of the system states and $\dot{\mathbf{x}}$ its time derivatives.
*   $n_s$ is the total number of samples.
*   $\mathbf{\Theta}$ is the library of candidate features for describing the system dynamics.
*   $\mathbf{\Xi}\sim q(\Xi)=q(\Xi|\mathbf{x};\mathbf{W}_\mathbf{\Xi})$ is a matrix of random variables with posterior $q$ parametrized by $\mathbf{W}_\mathbf{\Xi}$. Priors on the entries of the matrix are set to be Laplacian distribution, i.e. $W_{ij}\sim \mathcal{L}(0,1)$.
*   $\lambda_1=1.0$, $\lambda_2=10^{-3}$ are the loss weighting coefficients.







In [None]:
weights_path = os.path.join(weights_dir, ".weights.h5")

callbacks = [
    tf.keras.callbacks.ModelCheckpoint(
        filepath=os.path.join(weights_path),
        save_weights_only=True,
        save_best_only=True,
        monitor="loss",
        verbose=0,
    ),
    SaveCoefficientsCallback()
]

trainhist = model.fit(
    x=[x_train, dxdt_train],
    callbacks=callbacks,
    y=None,
    epochs=500,
    batch_size=256,
    verbose=1,
)
# load best weights
model.load_weights(os.path.join(weights_path))
# apply pdf threshold
threshold = 0.5
sindy_layer.pdf_thresholding(threshold=threshold)

Let's plot the training history of the VINDy model and check how the coefficients evolved during training.
We plot the coefficients of the VINDy model to see which terms are learned by the model.

In [None]:
# plot training history
plt.figure()
plt.title("Loss over epochs")
plt.semilogy(trainhist.history["loss"])
plt.semilogy(trainhist.history["dz"])
plt.semilogy(trainhist.history["kl_sindy"])
plt.legend(["total loss", "dz", "kl_sindy"])
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.show()

plt.figure()
plt.title("VINDy Coefficients over epochs")
plt.plot(np.array(trainhist.history["coeffs_mean"]).squeeze())
plt.legend()
plt.xlabel("Epoch")
plt.ylabel("Coefficient")
plt.show()

equation = sindy_layer.model_equation_to_str(z=var_names, precision=3)
sindy_layer.visualize_coefficients(x_range=[-1.6, 1.6], z=var_names, mu=None)
plt.suptitle(equation)
plt.tight_layout()

We can now use the trained VINDy model to predict the future states of the Roessler system. We will use the `scipy.integrate.solve_ivp` function to integrate the ODEs using the predicted time derivatives.

In [None]:
# %% integration
nt = t.shape[0]
i_test = 0
# integrate the model
t_0 = i_test * int(nt)
sol = model.integrate(x_test[t_0 : t_0 + 1].squeeze(), t.squeeze(), mu=None)
t_pred = sol.t
x_pred = sol.y

fig, axs = plt.subplots(dim, 1, figsize=(8, 5))
fig.suptitle(f"Integrated Test Trajectory")
t_0 = i_test * int(nt)
for i in range(dim):
    axs[i].plot(
        t, x_test[t_0 : t_0 + nt, i], label=f"${var_names[i]}$", color="black"
    )
    axs[i].plot(
        t_pred,
        x_pred[i],
        label=f"${var_names[i]}$ pred",
        linestyle="--",
        color="orange",
    )
    axs[i].set_xlabel("$t$")
    axs[i].legend(loc = 'upper right')
plt.tight_layout()

## Uncertainty quantification
Instead of only using the mean coefficients for a single prediction, we can also sample various models and use them to predict the future states of the Roessler system. This will give us an idea of the uncertainty in the model predictions.
Forward UQ:
* We sample SINDy coefficients from the predicted posterior distribution
* We integrate the ODE with the sampled coefficients and collect the trajectories

In [None]:
n_traj = 10
# Store the original coefficients
kernel_orig, kernel_scale_orig = sindy_layer.kernel, sindy_layer.kernel_scale

t_preds = []
x_preds = []
t_0 = i_test * int(nt)
print(f"test trajectory {i_test}")
# List to store the solution trajectories in latent space
for traj in range(n_traj):
    print(f"\t sample {traj+1} out of {n_traj}")
    # Sample from the posterior distribution of the coefficients
    sampled_coeff, f_, ff_ = sindy_layer._coeffs
    # only take non-zero coefficients
    sampled_coeff = sampled_coeff.numpy()
    sampled_coeff = sampled_coeff[sampled_coeff != 0]

    # Assign the sampled coefficients to the SINDy layer
    sindy_layer.kernel = tf.reshape(sampled_coeff, (-1, 1))
    sol = model.integrate(x_test[t_0:t_0 + 1].squeeze(),
                          t.squeeze())
    t_preds.append(sol.t)
    x_preds.append(sol.y)
# restore original coefficients
sindy_layer.kernel, sindy_layer.kernel_scale = kernel_orig, kernel_scale_orig
# calculate mean and variance of the trajectories
x_uq = np.array(x_preds)
x_uq_mean_sampled = np.mean(x_uq, axis=0)
x_uq_std = np.std(x_uq, axis=0)

# UQ plot
fig, axs = plt.subplots(dim, 1, figsize=(10, 6), sharex=True)
fig.suptitle(f"Integrated Test Trajectories")
t_0 = i_test * int(nt)
axs[0].set_title(f"Test Trajectory {i_test + 1}")

for i in range(dim):
    axs[i].fill_between(
        t,
        x_uq_mean_sampled[i] - 3 * x_uq_std[i],
        x_uq_mean_sampled[i] + 3 * x_uq_std[i],
        color="grey",
        alpha=0.3,
        label="UQ bounds ($\pm 3$ std)"
    )
    axs[i].plot(t, x_test[t_0: t_0 + nt, i], color="black", label=f"${var_names[i]}$ true")
    axs[i].plot(t, x_pred[i], color="orange", linestyle="--", label=f"${var_names[i]}$ pred mean")
    # Adjust the legend to be outside the plot
    axs[i].legend(loc='upper left', bbox_to_anchor=(1, 1))

plt.tight_layout(rect=[0, 0, 0.8, 1])  # Adjust the layout to make space for the legends
plt.show()