# Pseudo-Hamiltonian neural networks for PDEs

In this notebook, we will give an example of how to setup and train a neural network on to learn the KdV–Burgers equation using `phlearn`. We will also demonstrate how to add an external force to the system, which can also be learnt by the pseudo-Hamiltonian framework. 

For details, see (Eidnes and Lye, 2023).

In [None]:
# Uncomment for local install: 
%pip install -e ../phlearn 

In [None]:
import numpy as np
import torch
import phlearn.phsystems as phsys
import phlearn.phnns as phnn
import matplotlib.pyplot as plt
from phlearn.utils import create_video
from scipy.sparse import diags, spdiags

ttype = torch.float32
torch.set_default_dtype(ttype)

In [None]:
make_videos = False

### Learning a KdV system

We show first how to learn a system governed by the Korteweg–de Vries (KdV) equation
$$
u_t + \eta u u_x + \gamma^2 u_{xxx} = 0,
$$

with initial condition $u(x,0) = u_0(x)$ and periodic boundary conditions $u(P,t) = u(0,t)$.

#### Set up the system

Below is an example of how to set up a Hamiltonian PDE system using the ConservativeDissipativeSystem() class. The below block sets up the differential equation that will be used to generate the data.

In [None]:
period = 20
spatial_points = 100
x = np.linspace(0, period-period/spatial_points, spatial_points)

def setup_KdV_system(x=x, eta=6., gamma=1., nu=0.):
    
    M = x.size
    dx = x[-1]/(M-1)
    e = np.ones(M)
    Dp = 1/dx*spdiags([e,-e,e], np.array([-M+1,0,1]), M, M).toarray() # Forward difference matrix
    D1 = .5/dx*spdiags([e,-e,e,-e], np.array([-M+1,-1,1,M-1]), M, M).toarray() # Central difference matrix
    D2 = 1/dx**2*spdiags([e,e,-2*e,e,e], np.array([-M+1,-1,0,1,M-1]), M, M).toarray() # 2nd order central difference matrix

    def hamiltonian(u):
        return np.sum(-1/6*eta*u**3 + (.5*gamma**2*(np.matmul(Dp,u.T))**2).T, axis=-1)

    def hamiltonian_grad(u):
        return -.5*eta*u**2 - (gamma**2 * u @ D2)
    
    def initial_condition():
        P = (x[-1]-x[0])*M/(M-1)
        sech = lambda a: 1/np.cosh(a)
        def sampler(rng):
            k1, k2 = rng.uniform(0.5, 2.0, 2)
            d1, d2 = rng.uniform(0., 1., 1), rng.uniform(0., 1., 1)
            u0 = 0
            u0 += (-6./-eta)*2 * k1**2 * sech(np.abs(k1 * ((x+P/2-P*d1) % P - P/2)))**2
            u0 += (-6./-eta)*2 * k2**2 * sech(np.abs(k2 * ((x+P/2-P*d2) % P - P/2)))**2
            u0 = np.concatenate([u0[M:], u0[:M]], axis=-1)
            return u0
        return sampler

    KdV_system = phsys.ConservativeDissipativeSystem(
        nstates=M,
        skewsymmetric_matrix=D1,
        hamiltonian=hamiltonian,
        grad_hamiltonian=hamiltonian_grad,
        init_sampler=initial_condition()
    )

    return KdV_system


KdV_system = setup_KdV_system()

#### Generate training data

Use the `KdV_system` instance to generate training data, which are numerical solutions to the exact PDE.  

In [None]:
def get_training_data(system, data_points=20, dt=.05, tmax=.05, x=x):
    nt = round(tmax / dt)
    t_axis = np.linspace(0, tmax, nt + 1)
    ntrajectories_train = int(np.ceil(data_points / nt))
    traindata = phnn.generate_dataset(system, ntrajectories_train, t_axis, xspatial=x)
    return traindata, t_axis

traindata, t_axis = get_training_data(KdV_system)

#### Set up the pseudo-Hamiltonian neural network
We set the kernel sizes of the operators applied to the left-hand side of the PDE, the variational derivative of the Hamiltonian, and the variational derivative of the dissipation integral, respectively.

We will allow additional keyword arguments to be passed to ConservativeDissipativeNN() so we have the option of adding a dissipative term and an external force to the model.

In [None]:
def setup_pseudo_hamiltonian_nn(kernel_sizes, **kwargs):
    phmodel = phnn.ConservativeDissipativeNN(
        spatial_points,
        kernel_sizes,
        **kwargs,
    )
    return phmodel

kernel_sizes = [1, 3, 0, 0]
phmodel = setup_pseudo_hamiltonian_nn(kernel_sizes)

#### Setup a baseline model
To compare against ConservativeDissiaptiveNN() , we will create a baseline model which will approximate the dynamics using a standard fully connected multilayer perceptron. 

In [None]:
def setup_baseline_nn(**kwargs):
    baseline_nn = phnn.PDEBaselineNN(spatial_points, **kwargs)
    basemodel = phnn.DynamicSystemNN(spatial_points, baseline_nn)
    return basemodel

basemodel = setup_baseline_nn()

#### Train the models

In [None]:
def train_models(*models, epochs=200, batch_size=32, **kwargs):
    for model in models:
        model, _ = phnn.train(
            model,
            integrator="midpoint",
            traindata=traindata,
            epochs=epochs,
            batch_size=batch_size,
            **kwargs
        )
    return models

phmodel, basemodel = train_models(phmodel, basemodel, epochs=1000)

#### Plot the results

We compare the learned model against the true PDE by integrating from an initial condition not in the training data and on a longer time period thain in the training data.

In [None]:
def get_solutions(system, phmodel, basemodel, u0, t_axis):
    u_exact, *_ = system.sample_trajectory(t_axis, x0=u0)
    u_phnn, _ = phmodel.simulate_trajectory(integrator=False, t_sample=t_axis, x0=u0, xspatial=x)
    if basemodel is not None:
        u_baseline, _ = basemodel.simulate_trajectory(
            integrator=False, t_sample=t_axis, x0=u0, xspatial=x
        )
    else:
        u_baseline = None
    return u_exact, u_phnn, u_baseline


def plot_solutions(u_exact, u_model, t_axis, model='', y=None):
    N = u_exact.shape[0]
    fig = plt.figure(figsize=(7,4))
    lw = 2
    colors = [(0,0.4,1),(1,0.7,0.3),(0.2,0.7,0.2),(0.8,0,0.2),(0.5,0.3,.9)]
    if N > 1:
        plt.plot(x, u_exact[0,:], 'k--', linewidth=lw, label='t = 0')
        plt.plot(x, u_exact[int(N/4),:], color = colors[1], linestyle='--', linewidth=lw, label=f't = {1/4*t_axis[-1]}, true')  
        plt.plot(x, u_exact[int(N/2),:], color = colors[2], linestyle='--', linewidth=lw, label=f't = {1/2*t_axis[-1]}, true')  
        plt.plot(x, u_exact[int(3*N/4),:], color = colors[3], linestyle='--', linewidth=lw, label=f't = {3/4*t_axis[-1]}, true')  
        plt.plot(x, u_exact[-1,:], color = colors[4], linestyle='--', linewidth=lw, label=f't = {t_axis[-1]}, true')  
        plt.plot(x, u_model[int(N/4),:], color = colors[1], linestyle='-', linewidth=lw, label=f't = {1/4*t_axis[-1]}, model')  
        plt.plot(x, u_model[int(N/2),:], color = colors[2], linestyle='-', linewidth=lw, label=f't = {1/2*t_axis[-1]}, model')  
        plt.plot(x, u_model[int(3*N/4),:], color = colors[3], linestyle='-', linewidth=lw, label=f't = {3/4*t_axis[-1]}, model')  
        plt.plot(x, u_model[-1,:], color = colors[4], linestyle='-', linewidth=lw, label=f't = {t_axis[-1]}, model')
    else:
        plt.plot(x, u_exact[0,:], 'k--', linewidth=lw, label='True')
        plt.plot(x, u_model[0,:], color = colors[4], linestyle='-', linewidth=lw, label='Model')
    plt.xlabel('$x$', fontsize=12)
    plt.ylabel('$u$' if y is None else y, fontsize=12)
    plt.title(model+' model vs. ground truth', fontsize=14)
    plt.legend()
    plt.show()

k1, k2 = 1., .75
d1, d2 = .25, .5
eta = 6.
P = (x[-1]-x[0])*x.size/(x.size-1)
u0 = (-6./-eta)*2*k1**2 * 1/np.cosh(k1*(x-P*d1))**2
u0 += (-6./-eta)*2*k2**2 * 1/np.cosh(k2*(x-P*d2))**2
t_test = np.linspace(0, 2, 201)

u_exact, u_phnn, u_baseline = get_solutions(KdV_system, phmodel, basemodel, u0, t_test)
plot_solutions(u_exact, u_phnn, t_test, 'PHNN')
plot_solutions(u_exact, u_baseline, t_test, 'Baseline')

In [None]:
if make_videos:
    create_video([u_exact, u_phnn, u_baseline], ['Ground truth', 'PHNN', 'Baseline'], x_axis=x,
                 file_name='pure_kdv.gif', output_format='GIF')

### Learning a KdV-Burgers system with an external force

We now test the PHNN model on a KdV–Burgers system, i.e. a KdV system with a viscosity term, with a space-dependent external force acting on the system:
$$
u_t + \eta u u_x - \nu u_{xx} + \gamma^2 u_{xxx} = f(x, t).
$$

In [None]:
nu = 0.3
def F(u, t):
    return .6*np.sin(2*2*np.pi/period*x)
# def F(u, t):
#     t = np.reshape(t,(-1,1))
#     return .6*np.sin(2*2*np.pi/period*x-t).reshape(u.shape)
kernel_sizes = [1, 3, 1, 1]

In [None]:
def setup_KdV_Burgers_system(x=x, eta=6., gamma=1., nu=0, force=None):
    
    M = x.size
    dx = x[-1]/(M-1)
    e = np.ones(M)
    Dp = 1/dx*spdiags([e,-e,e], np.array([-M+1,0,1]), M, M).toarray() # Forward difference matrix
    D1 = .5/dx*spdiags([e,-e,e,-e], np.array([-M+1,-1,1,M-1]), M, M).toarray() # Central difference matrix
    D2 = 1/dx**2*spdiags([e,e,-2*e,e,e], np.array([-M+1,-1,0,1,M-1]), M, M).toarray() # 2nd order central difference matrix

    def hamiltonian(u):
        return np.sum(-1/6*eta*u**3 + (.5*gamma**2*(np.matmul(Dp,u.T))**2).T, axis=-1)
    
    def dissintegral(u):
        return np.sum(.5*nu*(np.matmul(Dp,u.T)**2).T, axis=-1)

    def hamiltonian_grad(u):
        return -.5*eta*u**2 - (gamma**2 * u @ D2)
    
    def dissintegral_grad(u):
        return -nu*u @ D2
    
    def initial_condition():
        P = (x[-1]-x[0])*M/(M-1)
        sech = lambda a: 1/np.cosh(a)
        def sampler(rng):
            k1, k2 = rng.uniform(0.5, 2.0, 2)
            d1, d2 = rng.uniform(0., 1., 1), rng.uniform(0., 1., 1)
            u0 = 0
            u0 += (-6./-eta)*2 * k1**2 * sech(np.abs(k1 * ((x+P/2-P*d1) % P - P/2)))**2
            u0 += (-6./-eta)*2 * k2**2 * sech(np.abs(k2 * ((x+P/2-P*d2) % P - P/2)))**2
            u0 = np.concatenate([u0[M:], u0[:M]], axis=-1)
            return u0
        return sampler

    KdV_Burgers_system = phsys.ConservativeDissipativeSystem(
        nstates=M,
        skewsymmetric_matrix=D1,
        hamiltonian=hamiltonian,
        grad_hamiltonian=hamiltonian_grad,
        dissintegral=dissintegral,
        grad_dissintegral=dissintegral_grad,
        external_forces=force,
        init_sampler=initial_condition()
    )

    return KdV_Burgers_system

KdV_Burgers_system = setup_KdV_Burgers_system(nu=nu, force=F)

In [None]:
ext_forces_nn = phnn.PDEExternalForcesNN(spatial_points, hidden_dim=100,
                                    timedependent=True, spacedependent=True, statedependent=False,
                                    period=period)
phmodel = setup_pseudo_hamiltonian_nn(kernel_sizes, external_forces_est=ext_forces_nn)
basemodel = setup_baseline_nn(spacedependent=True, period=period)
traindata, t_axis = get_training_data(KdV_Burgers_system)
phmodel, basemodel = train_models(phmodel, basemodel, epochs=1000)

In [None]:
u_exact, u_phnn, u_baseline = get_solutions(KdV_Burgers_system, phmodel, basemodel, u0, t_test)
plot_solutions(u_exact, u_phnn, t_test, 'PHNN')
plot_solutions(u_exact, u_baseline, t_test, 'Baseline')

In [None]:
if make_videos:
    create_video([u_exact, u_phnn, u_baseline], ['Ground truth', 'PHNN', 'Baseline'], x_axis=x,
                 file_name='forced_kdv_burgers.gif', output_format='GIF')

In [None]:
F_exact = KdV_Burgers_system.external_forces(u_exact, t_test).reshape(1,-1)
F_phnn = phmodel.external_forces(torch.tensor(u_phnn.reshape(-1,1,u_phnn.shape[-1]), dtype=ttype),
                               torch.tensor(t_test.reshape(-1,1,1),dtype=ttype),
                               torch.tensor(np.tile(x.reshape(-1, 1, 1), u_phnn.shape[0]).T, dtype=ttype)
                              ).detach().numpy().reshape(u_phnn.shape)
if kernel_sizes[0] > 1:
    d = int((kernel_sizes[0]-1)/2)
    M = x.size
    D_flat = phmodel.D_flat().detach().numpy()
    diagonals = np.concatenate([D_flat[0,:,(d+1):], D_flat[0], D_flat[0,:,:-(d+1)]], axis=1).T.repeat(M, axis=1)
    offsets = np.concatenate([np.arange(-M+1,-M+1+d),np.arange(-d,d+1),np.arange(M-d,M)])
    D = diags(diagonals, offsets, (M,M)).toarray()
    DDinvF_phnn = np.matmul(KdV_Burgers_system.lhs_matrix, np.linalg.solve(D, F_phnn.T)).T
    plot_solutions(F_exact, DDinvF_phnn, t_test, 'PHNN', 'External force')
else:
    plot_solutions(F_exact, F_phnn, t_test, 'PHNN', 'External force')

#### Removing the forces

Since we learned the external forces by a separate neural network in the PHNN model, we can remove this from the model and compare to the exact system without forces:

In [None]:
phmodel_no_force = setup_pseudo_hamiltonian_nn(kernel_sizes=kernel_sizes[:3]+[0],
                                               structure_matrix=phmodel.S(),
                                               dissipation_matrix=phmodel.P(),
                                               symmetric_matrix=phmodel.D_flat(),
                                               hamiltonian_true=phmodel.hamiltonian,
                                               grad_hamiltonian_true=phmodel.dH,
                                               dissintegral_true=phmodel.dissintegral,
                                               grad_dissintegral_true=phmodel.dV
                                              )

KdV_Burgers_system = setup_KdV_Burgers_system(nu=nu, force=None)
u_exact, u_phnn, _ = get_solutions(KdV_Burgers_system, phmodel_no_force, None, u0, t_test)
plot_solutions(u_exact, u_phnn, t_test, 'PHNN')

In [None]:
if make_videos:
    create_video([u_exact, u_phnn], ['Ground truth', 'PHNN'],
                 x_axis=x, file_name='kdv_burgers.gif', output_format='GIF')

We can also remove the dissipation term an obtain a model for the system without viscosity, which corresponds to a KdV system:

In [None]:
phmodel_no_force_or_visc = setup_pseudo_hamiltonian_nn(kernel_sizes=kernel_sizes[:2]+[0,0],
                                                       structure_matrix=phmodel.S(),
                                                       symmetric_matrix=phmodel.D_flat(),
                                                       hamiltonian_true=phmodel.hamiltonian,
                                                       grad_hamiltonian_true=phmodel.dH,
                                                      )

u_exact, u_phnn, _ = get_solutions(KdV_system, phmodel_no_force_or_visc, None, u0, t_test)
plot_solutions(u_exact, u_phnn, t_test, 'PHNN')

In [None]:
if make_videos:
    create_video([u_exact, u_phnn], ['Ground truth', 'PHNN'],
                 x_axis=x, file_name='kdv.gif', output_format='GIF')