In [None]:
!pip3 install git+https://github.com/Centre-automatique-et-systemes/learn_observe_KKL.git gwpy &> /dev/null
!pip3 install git+https://github.com/aliutkus/torchinterp1d.git gwpy &> /dev/null


In [1]:
import sys ; sys.path.append('../')
import torch.optim as optim
import torch
import seaborn as sb
import pytorch_lightning as pl
import numpy as np

from sklearn.model_selection import train_test_split
from pytorch_lightning.loggers import TensorBoardLogger
from pytorch_lightning.callbacks import ModelCheckpoint

from learn_KKL.luenberger_observer import LuenbergerObserver
from learn_KKL.system import RevDuffing
from learn_KKL.learner import Learner

sb.set_style('whitegrid')

In [2]:
# Generate the data
system = RevDuffing()
data = system.generate_mesh(np.array([[-1., 1.], [-1., 1.]]), 72000)
data, val_data = train_test_split(data, test_size=0.3, shuffle=True)

# Create the observer (autoencoder design
observer = LuenbergerObserver(dim_x=2, dim_y=1, method="Autoencoder")
observer.set_dynamics(system)

# Train using pytorch-lightning and the learner class
# Options for training
trainer_options={'max_epochs': 2}
optimizer_options = {'weight_decay': 1e-6}
scheduler_options = {'mode': 'min', 'factor': 0.1, 'patience': 3,
                     'threshold': 5e-4, 'verbose': True}
stopper = pl.callbacks.early_stopping.EarlyStopping(
    monitor='val_loss', min_delta=5e-4, patience=3, verbose=False, mode='min')
# Instantiate learner
learner = Learner(observer=observer, system=system, training_data=data,
                  validation_data=val_data, method='Autoencoder',
                  batch_size=10, lr=5e-4, optimizer=optim.Adam,
                  optimizer_options=optimizer_options,
                  scheduler=optim.lr_scheduler.ReduceLROnPlateau,
                  scheduler_options=scheduler_options)
# Define logger and checkpointing
logger = TensorBoardLogger(save_dir=learner.results_folder + '/tb_logs')
checkpoint_callback = ModelCheckpoint(monitor='val_loss')
trainer = pl.Trainer(
    callbacks=[stopper, checkpoint_callback], **trainer_options, logger=logger,
    log_every_n_steps=1, check_val_every_n_epoch=3)


GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs


Results saved in in /Users/mona/PhD_code/learn_observe_KKL/src/jupyter_notebooks/runs/RevDuffing/Autoencoder/exp_18


In [None]:
# To see logger in tensorboard, copy the following output name_of_folder
print(f'Logs stored in {learner.results_folder}/tb_logs')
# which should be similar to jupyter_notebooks/runs/method/exp_0/tb_logs/
# Then type this in terminal:
# tensorboard --logdir=name_of_folder --port=8080

# Train and save results
trainer.fit(learner)
learner.save_results(limits=np.array([[-1, 1.], [-1., 1.]]), nb_trajs=10,
                     tsim=(0, 60), dt=1e-2,
                     checkpoint_path=checkpoint_callback.best_model_path)


  | Name  | Type               | Params
---------------------------------------------
0 | model | LuenbergerObserver | 26.1 K
---------------------------------------------
26.1 K    Trainable params
0         Non-trainable params
26.1 K    Total params
0.104     Total estimated model params size (MB)


Logs stored in /Users/mona/PhD_code/learn_observe_KKL/src/jupyter_notebooks/runs/RevDuffing/Autoencoder/exp_18/tb_logs


Validation sanity check: 0it [00:00, ?it/s]

  rank_zero_warn(
  rank_zero_warn(
  rank_zero_warn(


Training: -1it [00:00, ?it/s]

In [None]:
# Simulate one test trajectory
import torch 
tsim = (0, 60)
dt = 1e-2
tq, simulation = system.simulate(torch.tensor([[0.5],[0.5]]), tsim, dt)
measurement = observer.h(simulation[:, :observer.dim_x, 0].T).T
y = torch.cat((tq.unsqueeze(1), measurement), dim=1)

# Predict from measurement
estimation = observer.predict(y, tsim, dt)

In [None]:
# Plot ground truth and state estimation
import matplotlib.pyplot as plt
for i in range(simulation.shape[1]):
        plt.plot(tq, simulation[:, i, 0], label=rf'$x_{i+1}$')
        plt.plot(tq, estimation[:, i].detach().numpy(),
                 label=rf'$\hat{{x}}_{i+1}$')
        plt.legend()
        plt.show()

In [None]:

# Extra tests
import os
from smt.sampling_methods import LHS
from learn_KKL.utils import RMSE, StandardScaler
num_samples = 50000
sampling = LHS(xlimits=np.array([[-2.5, 2.5], [-2.5, 2.5]]))
mesh = torch.as_tensor(sampling(num_samples))
_, mesh_hat = learner.model('Autoencoder', mesh)
error = RMSE(mesh, mesh_hat, dim=1)
for i in range(1, mesh.shape[1]):
    name = 'Invertibility_heatmap' + str(i) + '.pdf'
    plt.scatter(mesh[:, i - 1], mesh[:, i], cmap='jet',
                c=np.log(error.detach().numpy()))
    cbar = plt.colorbar()
    cbar.set_label('Log estimation error')
    plt.title('Log of RMSE between ' + r'$x$' + ' and '
              + r'$\hat{'r'x}$')
    plt.xlabel(rf'$x_{i}$')
    plt.ylabel(rf'$x_{i + 1}$')
    plt.legend()
    plt.show()

In [None]:
num_samples = 10000
sampling = LHS(xlimits=np.array([[-1., 1.], [-1., 1.]]))
mesh = torch.as_tensor(sampling(num_samples))
x = mesh[:, :learner.model.dim_x]
z = mesh[:, learner.model.dim_x:]
z_hat, x_hat = learner.model('Autoencoder', mesh)
# loss, loss1, loss2 = learner.model.loss(learner.method, mesh, x_hat, z_hat)
from learn_KKL.utils import RMSE
loss_1 = learner.model.recon_lambda * RMSE(x, x_hat, dim=1)
# Compute gradients of T_u with respect to inputs
dTdh = torch.autograd.functional.jacobian(
    learner.model.encoder, x, create_graph=False, strict=False, vectorize=False)
dTdx = torch.zeros((num_samples, learner.model.dim_z, learner.model.dim_x))
for i in range(dTdh.shape[0]):
    for j in range(dTdh.shape[1]):
        dTdx[i, j, :] = dTdh[i, j, i, :]
# lhs = dTdx * f(x)
lhs = torch.zeros((learner.model.dim_z, num_samples))
for i in range(num_samples):
    lhs[:, i] = torch.matmul(dTdx[i], learner.model.f(x.T).T[i]).T
# rhs = D * z + F * h(x)
D = learner.model.D
F = learner.model.F
h_x = learner.model.h(x.T)
rhs = torch.matmul(D, z_hat.T) + torch.matmul(F, h_x)
# PDE loss MSE(lhs, rhs)
print(lhs.shape, rhs.shape)
loss_2 = RMSE(lhs, rhs, dim=1)

In [None]:
loss_2 = RMSE(lhs, rhs, dim=0)
print(loss_2.shape, loss_1.shape)
print(x.shape, z.shape)
for i in range(1, x.shape[1]):
    name = r'$Loss_1$' + str(i) + '.pdf'
    plt.scatter(x[:, i - 1], x[:, i], cmap='jet',
                c=np.log(loss_1.detach().numpy()))
    cbar = plt.colorbar()
    cbar.set_label('Log loss')
    plt.title('Log of invertibility loss')
    plt.xlabel(rf'$x_{i}$')
    plt.ylabel(rf'$x_{i + 1}$')
    plt.legend()
    plt.show()
for i in range(1, x.shape[1]):
    name = r'$Loss_2$' + str(i) + '.pdf'
    plt.scatter(x[:, i - 1], x[:, i], cmap='jet',
                c=np.log(loss_2.detach().numpy()))
    cbar = plt.colorbar()
    cbar.set_label('Log loss')
    plt.title('Log of PDE loss')
    plt.xlabel(rf'$x_{i}$')
    plt.ylabel(rf'$x_{i + 1}$')
    plt.legend()
    plt.show()