# Template for autoencoder prototyping

In [None]:
import torch
import numpy as np
from matplotlib.colors import LinearSegmentedColormap
import matplotlib.pyplot as plt
import matplotlib.animation as animation
%matplotlib inline

# Name for the case
name = 'template'

## Hyper-parameters

In [None]:
dt = 1.0e-5
ae_weight = 1e0
sindy_weight = 1.e-6
coef_weight= 1.e-9
lr = 1e-5

## Training data

- Consider we only have one time-series simulation.
- suppose we have grid of size (20x30) points.
- solution is a 2-dimensional vector (on each grid point)
- A simulation of 50 time steps.
- the first index of snapshot array should be timestep.
- we consider dof as solution dimension, and number of particles as grid size.

Can load other data files for your custom autoencoder.

In [None]:
ntrain = 1
sol_dim = 2
grid_dim = [20,30]
nt = 50

# number of cases x number of time step x solution dimension x grid size
snapshots = np.random.rand(ntrain, nt, sol_dim, grid_dim[0], grid_dim[1])
X_train = torch.Tensor(snapshots)

## Setting up a physics wrapper.

You can choose from `lasdi.physics` module, or create a custom physics model. Following snippet is an example with some necessary specifications.

In [None]:
from lasdi.physics import Physics

class CustomPhysicsModel(Physics):
    def __init__(self):
        self.dim = 2
        self.nt = nt
        self.dt = dt
        self.qdim = sol_dim
        self.grid_size = grid_dim
        self.qgrid_size = [sol_dim] + grid_dim

        ''' Set up a grid as you see fit. '''
        self.x_grid = np.zeros([self.dim] + self.grid_size)
        xg = np.linspace(0., 1., grid_dim[1])
        yg = np.linspace(0., 1., grid_dim[0])
        self.x_grid[0], self.x_grid[1] = np.meshgrid(xg, yg)
        return
    
    ''' See lasdi.physics.Physics class for necessary subroutines '''

physics = CustomPhysicsModel()

## Setting up autoencoder

You can choose from `lasdi.latent_space` module, or create a custom autoencoder. The snippet below uses a built-in `Autoencoder` class.

In [None]:
from lasdi.latent_space import Autoencoder

n_train = 1
hidden_units = [3000, 300, 300, 100, 100, 30]
# latent space dimension
n_z = 5
print(physics.grid_size, hidden_units, n_z)

# I think these options are straightforward.
ae_cfg = {'hidden_units': hidden_units, 'latent_dimension': n_z, 'activation': 'softplus'}

ae = Autoencoder(physics, ae_cfg)
best_loss = np.Inf

## Setting up latent dynamics

Similarly, we either choose a latent dynamics model from `lasdi.latent_dynamics` module or create a custom model. Here we use `SINDy` class.

In [None]:
from lasdi.latent_dynamics.sindy import SINDy

sindy_options = {'sindy': {'fd_type': 'sbp12'}} # finite-difference operator for computing time derivative of latent trajectory.
ld = SINDy(ae.n_z, physics.nt, sindy_options)

## Training: (1) Running a custom optimization process

Set up optimizer and loss term

In [None]:
optimizer = torch.optim.Adam(ae.parameters(), lr = lr)
MSE = torch.nn.MSELoss()

Set up device: `'cuda'`, `'mps'` (apple) or `'cpu'`

In [None]:
device = 'cpu'
d_ae = ae.to(device)
d_Xtrain = X_train.to(device)

This is a part of GPLaSDI training procedure, where the training is executed without on-the-fly sampling.

In [None]:
n_iter = 10
save_interval = 1
hist_file = '%s.loss_history.txt' % name

loss_hist = np.zeros([n_iter, 4])
grad_hist = np.zeros([n_iter, 4])
for iter in range(n_iter):
    optimizer.zero_grad()
    d_ae = ae.to(device)
    d_Z = d_ae.encoder(d_Xtrain)
    d_Xpred = d_ae.decoder(d_Z)
    Z = d_Z.cpu()

    loss_ae = MSE(d_Xtrain, d_Xpred)
    coefs, loss_sindy, loss_coef = ld.calibrate(Z, physics.dt, compute_loss=True, numpy=True)

    max_coef = np.abs(np.array(coefs)).max()
    loss = ae_weight * loss_ae + sindy_weight * loss_sindy / n_train + coef_weight * loss_coef / n_train

    loss_hist[iter] = [loss.item(), loss_ae.item(), loss_sindy.item(), loss_coef.item()]

    loss.backward()

    optimizer.step()

    if ((loss.item() < best_loss) and (iter % save_interval == 0)):
        torch.save(d_ae.cpu().state_dict(), './%s_checkpoint.pt' % name)
        best_loss = loss.item()

    # print("Iter: %05d/%d, Loss: %.5e" % (iter + 1, n_iter, loss.item()))
    print("Iter: %05d/%d, Loss: %.5e, Loss AE: %.5e, Loss SI: %.5e, Loss COEF: %.5e, max|c|: %.5e"
            % (iter + 1, n_iter, loss.item(), loss_ae.item(), loss_sindy.item(), loss_coef.item(), max_coef))



Save the history and the trained parameters.

In [None]:
np.savetxt('%s.loss_history.txt' % name, loss_hist)

if (loss.item() < best_loss):
    torch.save(d_ae.cpu().state_dict(), './%s_checkpoint.pt' % name)
    best_loss = loss.item()

Load a checkpoint and re-evaluate the losses.

In [None]:
state_dict = torch.load('%s_checkpoint.pt' % name)
ae.load_state_dict(state_dict)
ae.eval()
d_ae = ae.to(device)
d_Z = d_ae.encoder(d_Xtrain)
d_Xpred = d_ae.decoder(d_Z)
Z = d_Z.cpu()
print(Z.shape)

loss_ae = MSE(d_Xtrain, d_Xpred)
coefs, loss_sindy, loss_coef = ld.calibrate(Z, physics.dt, compute_loss=True, numpy=True)

max_coef = np.abs(np.array(coefs)).max()
loss = loss_ae + sindy_weight * loss_sindy / n_train + coef_weight * loss_coef / n_train
print(loss.item())
print(loss_ae.item())
print((sindy_weight * loss_sindy / n_train).item())
print((coef_weight * loss_coef / n_train).item())

Visualize the loss history

In [None]:
loss_hist = np.loadtxt('%s.loss_history.txt' % name)

plt.figure(1)
plt.loglog(loss_hist[:, 0])
plt.xlabel('iteration')
plt.ylabel('loss')

plt.rcParams.update({'font.size': 15})
plt.figure(2, figsize=(8, 6))
plt.loglog(loss_hist[:, 1:])
plt.xlabel('iteration')
plt.ylabel('loss')
plt.legend(["ae", "sindy", "coef"])

Obtain the predicted solution

In [None]:
tgrid = np.linspace(0, (nt-1)*dt, nt)

d_Z = d_ae.encoder(d_Xtrain)
Z = d_Z.cpu().detach().numpy()

Zpred = np.zeros([n_train, physics.nt, ae.n_z])
for case_idx in range(len(coefs)):
    Zpred[case_idx] = ld.simulate(coefs[case_idx], Z[case_idx, 0], tgrid)

d_Zpred = torch.Tensor(Zpred).to(device)
X_pred = d_ae.decoder(d_Zpred).cpu().detach().numpy()

Plot the latent variable evolution and compare the prediction with the truth

In [None]:
colors = ['r','g','b','c','m']

for case_idx in range(len(coefs)):
    plt.figure(1+case_idx, figsize=(8,6))
    for k, color in enumerate(colors):
        plt.plot(Zpred[case_idx][:, k],'-'+color)
        plt.plot(Z[case_idx][:, k],'--'+color)
    plt.ylabel('$Z$')
    plt.xlabel('$t$ ($\\times\Delta t$)')
    plt.legend(['pred','truth'])

Visualization of full-order model solution.
Feel free to change as you see fit.

In [None]:
%matplotlib inline

case_idx = 0
time_idx = 14
var_idx = 0

truth = snapshots[case_idx, time_idx, var_idx]
pred = X_pred[case_idx, time_idx, var_idx]

plt.figure(1)
plt.contourf(physics.x_grid[0], physics.x_grid[1], truth, 200)

plt.figure(2)
plt.contourf(physics.x_grid[0], physics.x_grid[1], pred, 200)

## Training: (2) Using built-in GPLaSDI trainer

**Work in progress**

In [None]:
from lasdi.gplasdi import BayesianGLaSDI

trainer_options = {'n_samples': 20,         # number of samples when choosing a new parameter point. Not used in this case
                   'lr': lr,                # learning rate
                   'n_iter': 10000,         # number of iteration in training
                   'max_greedy_iter': 0,    # maximum iteration to perform greedy sampling. Not used in this case
                   'n_greedy': -1,          # frequency of greedy sampling. Not used in this case
                   'sindy_weight': sindy_weight, # weight for latent dynamics loss
                   'coef_weight': coef_weight,   # weight for latent dynamics coefficient regularization
                   'device': 'cpu',              # device to perform training (cpu, cuda, mps)
                   'path_checkpoint': 'checkpoints', # directory to save training checkpoint files
                   'path_results': 'results',        # directory to save training results
                   }

trainer = BayesianGLaSDI(physics, ae, ld, trainer_options)
trainer.X_train = X_train

trainer.train()