In [1]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim

In [2]:
# Define the model
class FiniteDifferenceOperator(nn.Module):
    def __init__(self, Nleft, Nright, num_layers, hidden_dim):
        super(FiniteDifferenceOperator, self).__init__()
        # Number of total nodes in finite difference stencils
        self.Nstencil = Nleft + Nright + 1
        self.Nleft = Nleft
        self.Nright = Nright

        # The learnable part of the stencil
        self.stencil = torch.nn.Parameter(torch.randn(self.Nstencil))
        # The diffusion parameter. To enforce positivity, we evolve the log of the magnitude
        self.logstabilizer = torch.nn.Parameter(torch.tensor(0.0, dtype=torch.float64))

        # Construct nonlinear network
        # -- The first layer scales up from Nstencil point to a hidden dimension (Input layer)
        # -- Next, hidden layers repeatedly alternate between linear transforms and nonlinear activations (Hidden layers)
        # -- Finally, a linear layer maps back down to Nstencil outputs
        # We'll use skip connections in the hidden layer to improve numerical stability

        layers = []

        # Input layer
        layers.append(nn.Linear(self.Nstencil, hidden_dim, dtype=torch.float64))
        layers.append(nn.Tanh())  # Activation function

        # Hidden layers
        # First make a list of hidden layers
        hiddenlayers = []
        for _ in range(num_layers):
            block = nn.Sequential(
                nn.Linear(hidden_dim, hidden_dim, dtype=torch.float64), nn.Tanh()
            )
            hiddenlayers.append(block)

        # Next apply skip connections
        class SkipConnectionBlock(nn.Module):
            def __init__(self, block):
                super(SkipConnectionBlock, self).__init__()
                self.block = block

            def forward(self, x):
                return x + self.block(x)

        skip_connection_layers = [SkipConnectionBlock(block) for block in hiddenlayers]
        hidden_layers = nn.Sequential(*skip_connection_layers)
        layers.extend(hidden_layers)

        # Output layer
        output_layer = nn.Linear(hidden_dim, self.Nstencil, dtype=torch.float64)
        output_layer.weight.data.fill_(0.0)  # Set weights to zero
        output_layer.bias.data.fill_(0.0)  # Set bias to zero (if any)
        layers.append(output_layer)

        self.nonlinear_network = nn.Sequential(*layers)

    def forward(self, x, h):
        # Apply the finite difference stencil to the gridfunction x, assuming periodic BC
        N_nodes = x.shape[0]
        f_out = torch.zeros_like(x)
        for i in range(N_nodes):
            # Wrap indices periodically using the modulo operator (%)
            indices = [
                (i + j - self.Nleft) % (N_nodes - 1) for j in range(self.Nstencil)
            ]

            # Grab solution at indices
            xstencil = x[indices]

            # Hard coded diffusion operator
            D2x = h ** (-2) * (
                x[(i - 1) % (N_nodes - 1)] - 2 * x[i] + x[(i + 1) % (N_nodes - 1)]
            )

            # Apply neural network to xstencil
            nonlinear_output = self.nonlinear_network(xstencil)

            # Add constraint that sum of stencil coefficient = 0
            nonlinear_output[-1] = -torch.sum(nonlinear_output[:-1])

            # Apply learned stencil to xstencil, including the viscosity term
            # f_out[i] = torch.sum(self.stencil * xstencil) + torch.exp(self.logstabilizer)*D2x
            f_out[i] = (
                torch.sum(nonlinear_output * xstencil)
                + torch.exp(self.logstabilizer) * D2x
            )

        # Return stencil applied to current state consisting of nonlinearity and stabilizing diffusion
        return f_out

In [3]:
class HamiltonianNet(nn.Module):
    def __init__(self, hidden_dim=32):
        super().__init__()

        # either use learned T (self.T) or true T (self.true_T)
        # Kinetic energy network T(p)
        self.Tnet = nn.Sequential(
            nn.Linear(2, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 1),
        )

        # Potential energy network V(q)
        self.Vnet = nn.Sequential(
            nn.Linear(2, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 1),
        )

    def T(self, p):
        """Kinetic energy T(p)"""
        return 0.5 * p.norm(dim=-1) ** 2
        # return self.Tnet(p)

    def V(self, q):
        """Potential energy V(q)"""
        return self.Vnet(q)

    def hamiltonian(self, q, p):
        """Compute Hamiltonian H = T(p) + V(q)"""
        return self.T(p) + self.V(q)

    def forward(self, q, p, dt):
        """Leapfrog integration step"""
        # Set requires_grad=True for q and p so that we can take derivatives of the Hamiltonian
        q = q.requires_grad_(True)
        p = p.requires_grad_(True)

        # Half step in momentum
        dV = torch.autograd.grad(self.V(q).sum(), q, create_graph=True)[0]
        p_half = p - 0.5 * dt * dV

        # Full step in position
        dT = torch.autograd.grad(self.T(p_half).sum(), p_half, create_graph=True)[0]
        q_new = q + dt * dT

        # Half step in momentum
        dV = torch.autograd.grad(self.V(q_new).sum(), q_new, create_graph=True)[0]
        p_new = p_half - 0.5 * dt * dV

        return [q_new, p_new]