# HNN for the KdV equation

We train a Hamiltonian neural network (HNN) model to learn the Korteweg–de Vries (KdV) equation, given by
\begin{equation}
u_t + \eta u u_x + \gamma^2 u_{xxx} = 0, \tag{1}
\end{equation}
where $\eta, \gamma \in \mathbb{R}$. The Hamiltonian
$$
\begin{align*}
\mathcal{H}[u] &= \int_\mathbb{R} \left(-\frac{\eta}{6} u^3 + \frac{\gamma^2}{2}u_x^2 \right)\, dx
\end{align*}
$$
represents the energy, and is conserved, i.e. constant over time.

The variational derivative of the Hamiltonian is
$$
\begin{align*}
\frac{\delta\mathcal{H}}{\delta u}[u] &= - \frac{\eta}{2} u^2 - \gamma^2 u_{xx},
\end{align*}
$$
and the KdV equation can be written as the skew-symmetric operator $\frac{\partial}{\partial x}$ applied to this variational derivative. That is,
\begin{equation*}
u_t = - \frac{\partial}{\partial x} \left( \frac{\eta}{2} u^2 + \gamma^2 u_{xx} \right),
\end{equation*}
which we see is equivalent to (1).
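
For readers who want the intermediate step, the variational derivative can be obtained by perturbing $u$ by a small variation $\delta u$ and integrating by parts. This is a sketch that assumes the boundary terms vanish (decaying or periodic $u$):
$$
\begin{align*}
\delta\mathcal{H} &= \int_\mathbb{R} \left(-\frac{\eta}{2} u^2\, \delta u + \gamma^2 u_x\, \delta u_x \right) dx
= \int_\mathbb{R} \left(-\frac{\eta}{2} u^2 - \gamma^2 u_{xx} \right) \delta u \, dx .
\end{align*}
$$
Applying $\frac{\partial}{\partial x}$ to the bracket in the last integral gives $-\eta u u_x - \gamma^2 u_{xxx}$, which is exactly $u_t$ in the KdV equation.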

#### Exercises:
* Change `N_TRAIN`, `TRAIN_TSTEP` and `TMAX_TRAIN` to experiment with different numbers of training data points, time steps and end times for each evolution
* Finish setting up the HNN by specifying the kernel size of the first convolutional layer

In [None]:
try:
    import matplotlib.pyplot as plt
    import numpy as np
    import torch
    import torch.nn as nn
    from tqdm import trange
    import seaborn as sns
    import phlearn.phsystems.pde as phsys
    import phlearn.phnns as phnn
    from scipy.sparse import spdiags
except ModuleNotFoundError:
    import os
    if not os.path.exists("requirements.txt"):
        print("Downloading requirements.txt from GitHub...")
        import urllib.request
        url = "https://raw.githubusercontent.com/SINTEF-Digital-Analytics-and-AI/NLDL-tutorial/main/requirements.txt"
        urllib.request.urlretrieve(url, "requirements.txt")
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-r", "requirements.txt"])
    import matplotlib.pyplot as plt
    import numpy as np
    import torch
    import torch.nn as nn
    from tqdm import trange
    import seaborn as sns
    import phlearn.phsystems.pde as phsys
    import phlearn.phnns as phnn
    from scipy.sparse import spdiags
if int(np.__version__.split('.')[0]) >= 2:
    import subprocess
    import sys
    print("NumPy version >= 2 detected. Downgrading to a compatible version...")
    subprocess.check_call([sys.executable, "-m", "pip", "install", "numpy<2"])
    print("Please restart the kernel and rerun the script.")

np.random.seed(1)
torch.random.manual_seed(1)

plt.rcParams['font.size'] = 12
plt.rcParams['lines.markersize'] = 10
plt.rcParams['legend.fontsize'] = 10
colors = sns.color_palette([(0.6,0.8,.8), (1,0.7,0.3), (0.2,0.7,0.2), (0.8,0,0.2), (0,0.4,1), (0.6,0.5,.9), (0.5,0.3,.5)])

### Generate training data

We use the [phlearn](https://github.com/SINTEF/pseudo-hamiltonian-neural-networks) package to set up the system and generate training data.

In [None]:
eta = 6.
gamma = 1.
period = 20
spatial_points = 100
x = np.linspace(0, period-period/spatial_points, spatial_points)

def setup_KdV_system(x=x, eta=6., gamma=1.):
    
    M = x.size
    dx = x[-1]/(M-1)
    e = np.ones(M)
    Dp = 1/dx*spdiags([e,-e,e], np.array([-M+1,0,1]), M, M).toarray() # Forward difference matrix
    D1 = .5/dx*spdiags([e,-e,e,-e], np.array([-M+1,-1,1,M-1]), M, M).toarray() # Central difference matrix
    D2 = 1/dx**2*spdiags([e,e,-2*e,e,e], np.array([-M+1,-1,0,1,M-1]), M, M).toarray() # 2nd order central difference matrix

    def hamiltonian(u):
        return np.sum(-1/6*eta*u**3 + (.5*gamma**2*(np.matmul(Dp,u.T))**2).T, axis=-1)

    def hamiltonian_grad(u):
        return -.5*eta*u**2 - (gamma**2 * u @ D2)
    
    def initial_condition():
        P = (x[-1]-x[0])*M/(M-1)
        sech = lambda a: 1/np.cosh(a)
        def sampler(rng):
            k1, k2 = rng.uniform(0.5, 1., 2)
            d1, d2 = rng.uniform(0., 1., 1), rng.uniform(0., 1., 1)
            u0 = 0
            u0 += (-6./-eta)*2 * k1**2 * sech(np.abs(k1 * ((x+P/2-P*d1) % P - P/2)))**2
            u0 += (-6./-eta)*2 * k2**2 * sech(np.abs(k2 * ((x+P/2-P*d2) % P - P/2)))**2
            u0 = np.concatenate([u0[M:], u0[:M]], axis=-1)
            return u0
        return sampler

    KdV_system = phsys.PseudoHamiltonianPDESystem(
        nstates=M,
        skewsymmetric_matrix=D1,
        hamiltonian=hamiltonian,
        grad_hamiltonian=hamiltonian_grad,
        init_sampler=initial_condition()
    )

    return KdV_system


KdV_system = setup_KdV_system(eta=eta, gamma=gamma)
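
As a sanity check on the discretization, the following sketch rebuilds the three periodic difference matrices with plain NumPy and verifies the properties the system relies on. It is an illustration under stated assumptions, not part of phlearn: the helper `periodic_shift` and the test state `u` are hypothetical, and the matrices are built with `np.roll` instead of `spdiags`.

```python
import numpy as np

M, period = 100, 20.0
dx = period / M
eta, gamma = 6.0, 1.0

def periodic_shift(k):
    # Hypothetical helper: matrix P with (P @ u)[i] = u[(i + k) % M]
    return np.roll(np.eye(M), k, axis=1)

I = np.eye(M)
Dp = (periodic_shift(1) - I) / dx                               # forward difference
D1 = (periodic_shift(1) - periodic_shift(-1)) / (2 * dx)        # central difference
D2 = (periodic_shift(1) - 2 * I + periodic_shift(-1)) / dx**2   # second difference

def hamiltonian_grad(u):
    return -0.5 * eta * u**2 - gamma**2 * (D2 @ u)

x = np.arange(M) * dx
u = 1.0 + 0.5 * np.cos(2 * np.pi * x / period)  # arbitrary smooth periodic state

# D1 is skew-symmetric: D1.T == -D1
skew_error = np.abs(D1 + D1.T).max()

# Dp.T @ Dp == -D2, so the D2 term in hamiltonian_grad is the exact gradient
# of the forward-difference term 0.5 * gamma**2 * (Dp @ u)**2 in the Hamiltonian
grad_identity_error = np.abs(Dp.T @ Dp + D2).max()

# Along u_t = D1 @ grad H, the energy changes at rate grad H . (D1 @ grad H) = 0
g = hamiltonian_grad(u)
energy_rate = g @ (D1 @ g)

print(skew_error, grad_identity_error, energy_rate)
```

All three printed numbers should be zero up to round-off, which is the discrete counterpart of the energy conservation stated at the top of the notebook.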

In [None]:
def get_training_data(system, data_points=5, dt=.02, tmax=.02, x=x):
    nt = round(tmax / dt)
    t_axis = np.linspace(0, tmax, nt + 1)
    ntrajectories_train = int(np.ceil(data_points / nt))
    traindata = phnn.generate_dataset(system, ntrajectories_train, t_axis, xspatial=x)
    return traindata, t_axis, ntrajectories_train, nt

**Exercise:** Experiment with different numbers of training data points and different end times for each evolution. Together with the time step, the end time determines how many separate evolutions are generated, via `ntrajectories_train = int(np.ceil(data_points / nt))`.
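
To see how these settings interact before generating any data, the bookkeeping in `get_training_data` can be reproduced in a few lines. This is a sketch only: the function name `trajectory_layout` is hypothetical, and it assumes each trajectory contributes `nt` training states, as the indexing used later in the notebook suggests.

```python
import math

def trajectory_layout(data_points, dt, tmax):
    """Hypothetical helper mirroring the arithmetic in get_training_data."""
    nt = round(tmax / dt)                          # time steps per trajectory
    ntrajectories = math.ceil(data_points / nt)    # trajectories needed to reach data_points
    return nt, ntrajectories, nt * ntrajectories   # last entry: states actually generated

for data_points, dt, tmax in [(1, 0.02, 0.02), (5, 0.02, 0.02), (25, 0.02, 0.2)]:
    nt, ntraj, total = trajectory_layout(data_points, dt, tmax)
    print(f"N_TRAIN={data_points}, TMAX_TRAIN={tmax}: "
          f"{ntraj} trajectories x {nt} steps = {total} states")
```

Because the trajectory count is rounded up, the number of generated states can exceed `N_TRAIN` (30 rather than 25 in the last case).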

In [None]:
N_TRAIN = 1 # Number of training states (one data point is the solution on the whole spatial grid at a single time)
TRAIN_TSTEP = 0.02 # Time step
TMAX_TRAIN = 0.02 # End time of each evolution in the training data

In [None]:
traindata, t_axis, ntrajectories_train, nt = get_training_data(KdV_system, data_points=N_TRAIN, dt=TRAIN_TSTEP, tmax=TMAX_TRAIN)

# Extracting the necessary data and reshaping the arrays:
u_start = traindata[0][0]
u_end = traindata[0][1]
u_midpoint = (u_start+u_end)/2
dudt = traindata[1]
x = x.reshape((1,-1))
t = t_axis.reshape((1,-1))[:,:-1]
u_exact = u_start.squeeze(1).detach().numpy()
dx = (x[..., 1] - x[..., 0])[0]

Plot some training data:

In [None]:
time_fractions = [1/4, 1/2, 3/4, 1-1/nt]
max_plots = min(nt, len(time_fractions))

for fraction in time_fractions[:max_plots]:
    fig = plt.figure(figsize=(10, 3))
    i_time = int(round(nt * fraction))
    for i in range(min(ntrajectories_train, 5)):
        plt.plot(x[0, :], u_exact[i_time + i * nt, :], linewidth=2, label=f'Evolution {i+1}')
    plt.xlabel('$x$')
    plt.ylabel('$u(t,x)$')
    plt.title(f'$t = {t[0, i_time]:.2f}$')
    plt.legend()
    plt.show()

### Set up the HNN model

We impose the periodic boundary conditions by padding the solution vectors.

**Exercise:** Set the size of the first convolutional kernel so that we can learn finite difference operators of the order necessary to approximate the spatial derivatives we wish to model. Note that it should correspond to the padding.
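
The link between padding and kernel size can be illustrated outside PyTorch. The sketch below uses NumPy with an illustrative padding of two points (deliberately not the notebook's setting) and a fixed three-point one-sided stencil standing in for a learned kernel. A "valid" cross-correlation, which is what a `Conv1d` layer without padding computes, then returns exactly one value per grid point.

```python
import numpy as np

M, period = 100, 2 * np.pi
dx = period / M
x = np.arange(M) * dx
u = np.sin(x)

padding_size = 2                                   # illustrative value only
u_padded = np.concatenate([u, u[:padding_size]])   # periodic wrap-around on the right

# One-sided second-order stencil for u_x: (-3 u_i + 4 u_{i+1} - u_{i+2}) / (2 dx)
kernel = np.array([-3.0, 4.0, -1.0]) / (2 * dx)

# Output length is (M + padding_size) - len(kernel) + 1
ux = np.correlate(u_padded, kernel, mode="valid")

max_error = np.abs(ux - np.cos(x)).max()
print(ux.shape, max_error)
```

With a kernel one element longer than the padding, the output has the same length as the unpadded input, and the stencil reaches across the periodic boundary without special cases.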

In [None]:
class HNN(nn.Module):
    def __init__(self, conv_kernel_size=SET_KERNEL_SIZE):
        super().__init__()
        self.padding_size = 1
        self.hamiltonian_net = nn.Sequential(
            nn.Conv1d(1, 100, kernel_size=conv_kernel_size),
            nn.Tanh(),
            nn.Conv1d(100, 100, kernel_size=1),
            nn.Tanh(),
            nn.Conv1d(100, 100, kernel_size=1, bias=False)
        )

    def forward_padding(self, x):
        return torch.cat([x, x[..., :self.padding_size]], dim=-1)

    def summation(self, x):
        return x.sum(dim=tuple(range(1, x.ndim)), keepdim=True)

    def hamiltonian(self, u):
        u_padded = self.forward_padding(u)
        H = self.hamiltonian_net(u_padded)
        return self.summation(H)

    def forward(self, u, dx):
        H = self.hamiltonian(u)
        dH = torch.autograd.grad(H.sum(), u, create_graph=True)[0]
        dH_padded = torch.cat([dH[..., u.shape[-1] - 1 :], dH, dH[..., :1]], dim=-1)
        S = (torch.tensor([-1., 0., 1.], dtype=torch.float32) / (2 * dx)).reshape(1, 1, 3).to(u.device)
        return torch.nn.functional.conv1d(dH_padded, S)

In [None]:
def train(model, x, dxdt, nepochs=10000, learning_rate=1e-3, **kwargs):

    # Move model to device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    # Convert inputs to float32 tensors on the device; x must track gradients for autograd
    x = torch.as_tensor(x, dtype=torch.float32).clone().detach().to(device).requires_grad_(True)
    dxdt = torch.as_tensor(dxdt, dtype=torch.float32).detach().to(device)
    
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    losses = []
    
    with trange(nepochs, desc="Training the model") as pbar:
        for epoch in range(nepochs):
            optimizer.zero_grad()
            
            # Compute loss by comparing the left-hand side (LHS) and the right-hand side (RHS) of the discretized PDE:
            rhs = model(x, **kwargs)
            loss = torch.mean((dxdt - rhs) ** 2)
            
            # Backpropagation and optimization step:
            loss.backward(retain_graph=True)
            optimizer.step()
            
            # Log the loss value:
            losses.append(loss.item())
            if epoch % 100 == 0 or epoch == nepochs - 1:
                pbar.set_postfix(loss=loss.item())
            pbar.update(1)
    
    # Plot the loss curve
    plt.figure(figsize=(7, 4))
    plt.plot(losses)
    plt.yscale('log')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training Loss')
    plt.show()


### Instantiate and train the model

In [None]:
model = HNN()
train(model, u_midpoint, dudt, nepochs=2000, dx=dx)

### Integrate the learned flow and compare to exact solution

We want to integrate the learned model and compare to integration of the discretized true system. First, we set up the vector field that denotes the right-hand side of the spatially discretized PDE.

In [None]:
M = x.size
e = np.ones(M)
D1 = .5/dx*spdiags([e,-e,e,-e], np.array([-M+1,-1,1,M-1]), M, M).toarray() # Central difference matrix
D2 = 1/dx**2*spdiags([e,e,-2*e,e,e], np.array([-M+1,-1,0,1,M-1]), M, M).toarray() # 2nd order central difference matrix

def true_f(u):
    dH = -.5*eta*u**2 - (gamma**2 * u @ D2)
    SdH = D1 @ dH
    return SdH

def learned_hnn_f(u):
    u = torch.tensor(u.reshape(1,1,-1), requires_grad=True, dtype=torch.float32)
    H = model.hamiltonian(u)
    dH = torch.autograd.grad(H.sum(), u, retain_graph=False, create_graph=False)[0]
    SdH = torch.tensor(D1, dtype=torch.float32) @ dH.reshape(-1,1)
    return SdH.flatten().detach().numpy()

Decide on an initial state for the test:

In [None]:
k1, k2 = 0.6, 0.8
d1, d2 = 0.3, 0.8
M = x.size
P = (x[0,-1]-x[0,0])*M/(M-1)
sech = lambda a: 1/np.cosh(a)
u0 = 0
u0 += (-6./-eta)*2 * k1**2 * sech(np.abs(k1 * ((x+P/2-P*d1) % P - P/2)))**2
u0 += (-6./-eta)*2 * k2**2 * sech(np.abs(k2 * ((x+P/2-P*d2) % P - P/2)))**2
u0 = np.concatenate([u0[M:], u0[:M]], axis=0)[0,:]

# u0 = u_exact[0,:] # If testing with initial condition from training data
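
The initial state is a sum of two $\mathrm{sech}^2$ profiles. On the real line and for $\gamma = 1$, a single profile $u = \frac{6}{\eta}\, 2k^2\, \mathrm{sech}^2\big(k(x - 4k^2 t)\big)$ is a soliton solution of the KdV equation. The sketch below checks this numerically with finite differences on a fine grid; the grid, the step `h` and the helper `shift` are assumptions made for the illustration. On the periodic domain with two overlapping profiles the statement holds only approximately.

```python
import numpy as np

eta, k = 6.0, 0.8
c = 4 * k**2                      # soliton speed, assuming gamma = 1
h = 0.01
x = np.arange(-20.0, 20.0, h)
u = (6 / eta) * 2 * k**2 / np.cosh(k * x) ** 2

def shift(v, n):
    # v evaluated at x + n*h; wrap-around is harmless because the tails are ~1e-13
    return np.roll(v, -n)

# Second-order central differences for the first and third derivatives
u_x = (shift(u, 1) - shift(u, -1)) / (2 * h)
u_xxx = (shift(u, 2) - 2 * shift(u, 1) + 2 * shift(u, -1) - shift(u, -2)) / (2 * h**3)

# A travelling wave u(x - c t) has u_t = -c u_x, so the KdV residual is
residual = -c * u_x + eta * u * u_x + u_xxx
max_residual = np.abs(residual).max()
print(max_residual)
```

The residual is at the level of the finite-difference truncation error, far below the size of the individual terms.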

Integrate with the classic Runge–Kutta method:

In [None]:
def rk4(f, u, t_end, dt):
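    n_steps = int(round(t_end / dt))
    t_steps = np.linspace(0, t_end, n_steps + 1)  # avoids the floating-point off-by-one of np.arange(0, t_end + dt, dt)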
    u_steps = np.zeros((len(t_steps),) + np.shape(u))
    
    u_steps[0] = u
    for i in range(1, len(t_steps)):
        k1 = dt * f(u)
        k2 = dt * f(u + .5*k1)
        k3 = dt * f(u + .5*k2)
        k4 = dt * f(u + k3)
        u = u + (k1 + 2*k2 + 2*k3 + k4) / 6
        u_steps[i] = u
        
    return t_steps, u_steps

ts, u_true = rk4(true_f, u0, 4, .001)
ts, u_learned = rk4(learned_hnn_f, u0, 4, .001)
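
To build confidence in the integrator itself, the same Runge-Kutta update can be tested on a problem with a known solution. The sketch below uses a scalar variant, `rk4_scalar` (a hypothetical name, not the notebook's `rk4`), on $u' = -u$ and checks that halving the step size reduces the error by roughly $2^4 = 16$, as expected for a fourth-order method.

```python
import math

def rk4_scalar(f, u, t_end, n_steps):
    """Classic fourth-order Runge-Kutta for an autonomous ODE u' = f(u)."""
    dt = t_end / n_steps
    for _ in range(n_steps):
        k1 = dt * f(u)
        k2 = dt * f(u + 0.5 * k1)
        k3 = dt * f(u + 0.5 * k2)
        k4 = dt * f(u + k3)
        u = u + (k1 + 2 * k2 + 2 * k3 + k4) / 6
    return u

f = lambda u: -u                 # exact solution: u(t) = exp(-t)
exact = math.exp(-1.0)

err_coarse = abs(rk4_scalar(f, 1.0, 1.0, 10) - exact)   # dt = 0.1
err_fine = abs(rk4_scalar(f, 1.0, 1.0, 20) - exact)     # dt = 0.05
ratio = err_coarse / err_fine
print(err_coarse, err_fine, ratio)
```

Passing the number of steps rather than the step size also sidesteps floating-point surprises in the length of the time grid.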

In [None]:
figsize = (10, 3)
indices = [
    int(round((u_learned.shape[0] - 1) * 1/4)),
    int(round((u_learned.shape[0] - 1) * 1/2)),
    int(round((u_learned.shape[0] - 1) * 3/4)),
    int(round((u_learned.shape[0] - 1)))
]

for i, idx in enumerate(indices):
    fig = plt.figure(figsize=figsize)
    plt.plot(x[0, :], u_true[idx, :], 'k-', label='Integrated true flow')
    plt.plot(x[0, :], u_learned[idx, :], 'g-', label='Integrated learned flow')
    plt.xlabel('$x$')
    plt.ylabel('$u(t,x)$')
    plt.title('$t = %.2f$' % ts[idx])
    if i == 0:
        plt.legend()
    plt.show()


### Further modelling

The [phlearn](https://github.com/SINTEF/pseudo-hamiltonian-neural-networks) package can be used to set up pseudo-Hamiltonian neural networks (PHNNs) that learn PDEs with damping (e.g. due to viscosity) and with external forces acting on the system. [This notebook](https://github.com/SINTEF/pseudo-hamiltonian-neural-networks/blob/main/example_scripts/kdv_example.ipynb) shows how to train a PHNN model for the forced KdV-Burgers equation, i.e.
$$
u_t + \eta u u_x - \nu u_{xx} + \gamma^2 u_{xxx} = f(x, t).
$$
[This notebook](https://github.com/SINTEF/pseudo-hamiltonian-neural-networks/blob/main/example_scripts/phnn_pde_examples.ipynb) is set up to learn several different pseudo-Hamiltonian PDEs. In these notebooks you can also compare the PHNN models to baseline models that do not assume a pseudo-Hamiltonian structure.