## Problem Statement

Consider a nonlinear dynamical system characterized by its state $x \in \mathbb{R}^n$ and the corresponding vector field $F(x) \in \mathbb{R}^n$. Given a set of $N$ snapshots of the system's state, denoted as $\{x_1, x_2, \dots, x_N\}$, along with their associated time derivatives $\{\dot{x}_1, \dot{x}_2, \dots, \dot{x}_N\}$, where $\dot{x}_i = F(x_i)$, we seek to learn a mapping $\alpha(x): \mathbb{R}^n \rightarrow \mathbb{R}^m$, a linear transformation matrix $A \in \mathbb{R}^{m \times m}$, and an offset vector $b \in \mathbb{R}^m$ such that the relationship

\begin{equation}
J_\alpha(x_i) \cdot \dot{x}_i = A \cdot \alpha(x_i) + b
\end{equation}

holds for all $i = 1, 2, \dots, N$. Here, $J_\alpha(x)$ denotes the Jacobian matrix of the mapping $\alpha(x)$ with respect to the state $x$.
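
The form of this constraint follows from the chain rule. As a short derivation, assume $x(t)$ is a trajectory of $\dot{x} = F(x)$ and write $z(t) = \alpha(x(t))$ for the mapped state (the symbol $z$ is introduced here only for illustration):

\begin{equation}
\dot{z} = \frac{d}{dt}\,\alpha(x(t)) = J_\alpha(x)\,\dot{x} = J_\alpha(x)\,F(x).
\end{equation}

Requiring $J_\alpha(x)\,F(x) = A\,\alpha(x) + b$ is therefore the same as requiring $\dot{z} = A z + b$, so the mapped state evolves under affine dynamics wherever the constraint holds.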

This constraint can be approximated by minimizing the mean squared error between the left-hand side and the right-hand side of the equation across all training examples. The optimization objective is therefore defined as

\begin{equation}
\min_{\alpha, A, b} \frac{1}{N} \sum_{i=1}^N \left\| J_\alpha(x_i) \cdot \dot{x}_i - \left( A \cdot \alpha(x_i) + b \right) \right\|_2^2,
\end{equation}

where $\|\cdot\|_2$ denotes the Euclidean norm. The learned transformation $\alpha(x)$ aims to map the original nonlinear dynamics into a space where they can be approximated by an affine system.
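
As a sanity check on the objective, the sketch below evaluates it on a small toy system for which an exact solution is known in closed form. Everything in it is an assumption made for illustration: the two-dimensional system `F_toy`, the constants `MU` and `LAM`, the hand-picked mapping `alpha_toy`, and the helper name `objective` are not part of this notebook. The product $J_\alpha(x_i) \cdot \dot{x}_i$ is computed with `torch.autograd.functional.jvp`, which avoids building the full Jacobian; this gives per-sample products only when the mapping treats samples independently (no batch normalization in training mode).

```python
import torch
from torch.autograd.functional import jvp

MU, LAM = -0.1, -1.0  # illustrative constants (assumption, not from the text)
DTYPE = torch.float64


def F_toy(x):
    """Toy vector field: x1' = MU * x1, x2' = LAM * (x2 - x1**2)."""
    x1, x2 = x[:, 0], x[:, 1]
    return torch.stack([MU * x1, LAM * (x2 - x1 ** 2)], dim=1)


def alpha_toy(x):
    """Hand-picked mapping alpha(x) = (x1, x2, x1**2), so n = 2 and m = 3."""
    x1, x2 = x[:, 0], x[:, 1]
    return torch.stack([x1, x2, x1 ** 2], dim=1)


# A and b for which J_alpha(x) F(x) = A alpha(x) + b holds exactly on this toy system
A_toy = torch.tensor([[MU, 0.0, 0.0],
                      [0.0, LAM, -LAM],
                      [0.0, 0.0, 2.0 * MU]], dtype=DTYPE)
b_toy = torch.zeros(3, dtype=DTYPE)


def objective(alpha, A, b, x, x_dot):
    """(1/N) * sum_i || J_alpha(x_i) x_dot_i - (A alpha(x_i) + b) ||_2^2."""
    # jvp returns the pair (alpha(x), J_alpha(x) applied to x_dot)
    a, lhs = jvp(alpha, x, x_dot, create_graph=True)
    rhs = a @ A.T + b
    return ((lhs - rhs) ** 2).sum(dim=1).mean()


torch.manual_seed(0)
x = torch.randn(16, 2, dtype=DTYPE)
loss = objective(alpha_toy, A_toy, b_toy, x, F_toy(x))
print(f"objective at the exact solution: {loss.item():.3e}")
```

The printed value should be zero up to floating-point rounding. Note that `nn.MSELoss` with its default reduction averages over all $N \cdot m$ entries, so it equals this objective divided by $m$; the minimizer is the same.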

## Architecture

The architecture has two components: a neural network that learns the mapping $\alpha(x)$ from the original state space $\mathbb{R}^n$ to a higher-dimensional space $\mathbb{R}^m$ (with $m > n$), and a learned linear operator $\mathbf{A}$ with an offset vector $\mathbf{b}$ that acts in this higher-dimensional space.

### Mapping Network


The mapping network $\alpha(x)$ is a feedforward neural network that transforms the input state vector $x \in \mathbb{R}^n$ into a higher-dimensional space $\mathbb{R}^m$. The network consists of four fully connected layers:

- **Layer 1**: A fully connected layer with 128 neurons, followed by a ReLU activation function.
- **Layer 2**: A fully connected layer with 64 neurons, followed by a ReLU activation function.
- **Layer 3**: A fully connected layer with 64 neurons, followed by a ReLU activation function.
- **Output Layer**: A fully connected layer that outputs the final mapping vector $\alpha(x) \in \mathbb{R}^m$.

The input is first normalized with batch normalization (`nn.BatchNorm1d`), and each hidden layer uses a ReLU activation to introduce nonlinearity. The output layer has no activation function, so $\alpha(x)$ can take any value in $\mathbb{R}^m$.

### Linear Operator and Offset

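The linear operator $\mathbf{A} \in \mathbb{R}^{m \times m}$ and the offset $\mathbf{b} \in \mathbb{R}^m$ are learned parameters that act on the mapped state $\alpha(x)$ and model its time derivative. This transformation is: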

\begin{equation}
\mathbf{A} \alpha(x) + \mathbf{b}
\end{equation}
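
For a batch of mapped states stored as rows, the same affine map can be written in several equivalent ways. The sketch below, with illustrative sizes and random values that are assumptions rather than anything from this notebook, checks that the column-vector form used in `LinearParams.forward` agrees with the row-vector form and with PyTorch's built-in `torch.nn.functional.linear`.

```python
import torch

torch.manual_seed(0)
m, batch = 4, 5                      # illustrative sizes (assumption)
A = torch.randn(m, m)
b = torch.randn(m)
a = torch.randn(batch, m)            # each row plays the role of alpha(x_i)

# Column-vector form applied to the whole batch, then transposed back
out_matmul = torch.matmul(A, a.T).T + b
# Equivalent row-vector form
out_rowvec = a @ A.T + b
# Same map via the built-in affine function, which computes a @ A.T + b
out_linear = torch.nn.functional.linear(a, A, b)

print(torch.allclose(out_matmul, out_rowvec, atol=1e-5),
      torch.allclose(out_matmul, out_linear, atol=1e-5))
```

All three produce a tensor of shape `(batch, m)`, one row per sample.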

### Asymmetric Learning

Training is asymmetric: the mapping network is updated more often than the linear operator and offset. In the training loop below, the mapping network receives `mapping_steps = 5` optimizer steps per epoch, whereas $\mathbf{A}$ and $\mathbf{b}$ receive one. The mapping network gets the larger share because it has the harder task: transforming the nonlinear dynamics into a space where they can be modeled as an affine system.
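
The update schedule alone can be sketched on a toy problem. In the sketch below, `net` and `lin` are small stand-in modules and `toy_loss` is a placeholder objective; all three are assumptions for illustration and do not reproduce the Jacobian-based loss of this notebook. Only the pattern of several steps for one optimizer followed by one step for the other is the point.

```python
import torch
import torch.nn as nn
import torch.optim as optim

torch.manual_seed(0)
net = nn.Linear(3, 8)        # stand-in for the mapping network
lin = nn.Linear(8, 8)        # stand-in for A and b
opt_net = optim.Adam(net.parameters(), lr=1e-4)
opt_lin = optim.Adam(lin.parameters(), lr=1e-4)
mapping_steps = 5


def toy_loss(x):
    # Placeholder loss (assumption); it only needs to depend on both modules
    z = net(x)
    return ((lin(z) - z) ** 2).mean()


steps = {"net": 0, "lin": 0}
for epoch in range(3):
    x = torch.randn(32, 3)
    w_before = lin.weight.detach().clone()
    for _ in range(mapping_steps):
        opt_net.zero_grad()
        toy_loss(x).backward()
        opt_net.step()       # updates only the parameters given to opt_net
        steps["net"] += 1
    lin_unchanged = torch.equal(w_before, lin.weight)
    # Gradients accumulated on lin during the inner loop are cleared here
    opt_lin.zero_grad()
    toy_loss(x).backward()
    opt_lin.step()
    steps["lin"] += 1

print(steps, lin_unchanged)
```

Because each optimizer only holds its own parameters, the inner loop leaves `lin` untouched even though gradients flow through it.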


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim


def F(state):
    """
    The Lorenz attractor nonlinear flow F(x)

    TODO: use autokoopman for integration
    """
    sigma = 10.0
    rho = 28.0
    beta = 8.0 / 3.0
    
    x = state[:, 0]
    y = state[:, 1]
    z = state[:, 2]
    
    dxdt = sigma * (y - x)
    dydt = x * (rho - z) - y
    dzdt = x * y - beta * z
    
    return torch.stack([dxdt, dydt, dzdt], dim=1)

    
class MappingNet(nn.Module):
    """
    The neural network for a(x) with high-dimensional output
    """
    def __init__(self, input_dim, output_dim=32):  # 32-dimensional a(x)
        super(MappingNet, self).__init__()
        self.normalizer = nn.BatchNorm1d(input_dim)  # Input normalizer
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 64)
        self.fco = nn.Linear(64, output_dim)
        
    def forward(self, x):
        x = self.normalizer(x)  # Normalize input
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.relu(self.fc3(x))
        a = self.fco(x)
        return a


class LinearParams(nn.Module):
    """
    The learned linear operator A and offset b
    """
    def __init__(self, input_dim):  # A is (input_dim, input_dim) and b is (input_dim,); both act on a(x)
        super(LinearParams, self).__init__()
        self.A = nn.Parameter(torch.randn(input_dim, input_dim))
        self.b = nn.Parameter(torch.randn(input_dim))
        
    def forward(self, a):
        return torch.matmul(self.A, a.T).T + self.b


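def compute_jacobian(network, x):
    """
    Batched Jacobian Ja(x) of network(x) with respect to x.

    Returns a tensor of shape (batch_size, a_dim, input_dim). The entries are
    per-sample Jacobians only if the network treats samples independently;
    BatchNorm in training mode couples the samples within a batch.
    """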
    # Make sure x requires gradients
    x = x.clone().detach().requires_grad_(True)
    
    # Forward pass to get the output a(x)
    a = network(x)
    
    # Initialize an empty list to store the Jacobian
    jacobian = []
    
    # Compute the Jacobian for each output with respect to the input
    for i in range(a.shape[1]):
        # Create a tensor of the same shape as a, filled with zeros except for the i-th column
        grad_output = torch.zeros_like(a)
        grad_output[:, i] = 1.0
        
        # Compute the gradient of the i-th output w.r.t. x
        jac_i = torch.autograd.grad(outputs=a, inputs=x,
                                    grad_outputs=grad_output, create_graph=True)[0]
        
        jacobian.append(jac_i)
    
    # Stack the computed gradients along a new dimension to form the Jacobian
    return torch.stack(jacobian, dim=1)  # Shape: (batch_size, a_dim, input_dim)

In [None]:
# Uncomment the two lines below to install TensorBoard and load its notebook extension (the %tensorboard magic needs it)
#!pip install tensorboard
#%load_ext tensorboard

In [None]:
%tensorboard --logdir=runs
from torch.utils.tensorboard import SummaryWriter
#from torch.utils.tensorboard import notebook

# Initialize the writer
writer = SummaryWriter()

# Initialize dimensions
input_dim = 3  # Lorenz system state: (x, y, z)
a_dim = 64    # High-dimensional a(x)
output_dim = 3  # State dimension; not used below, since A maps a(x) to a(x)

# Initialize models
mapping_net = MappingNet(input_dim, output_dim=a_dim)
linear_params = LinearParams(input_dim=a_dim)

# Optimizers for both parts
optimizer_a = optim.Adam(mapping_net.parameters(), lr=1e-4)
optimizer_linear = optim.Adam(linear_params.parameters(), lr=1e-4)

# Loss function
mse_loss = nn.MSELoss()

# Training loop
num_epochs = 3000
mapping_steps = 5  # Number of mapping optimizer steps per epoch

for epoch in range(num_epochs):

    # Sample a fresh batch of random states for this epoch
    x = torch.randn((256, input_dim), requires_grad=True)  # More states (x, y, z) samples
    
    for _ in range(mapping_steps):
        # Step 1: Optimize the mapping network a(x)
        optimizer_a.zero_grad()
        
        # Forward pass through the network
        a = mapping_net(x)
        
        # Recalculate the Jacobian of a(x)
        #Ja = compute_jacobian(a, x)
        Ja = compute_jacobian(mapping_net, x)
        
        # Recalculate the left and right sides
        left_side = torch.einsum('bij,bj->bi', Ja, F(x))
        right_side = linear_params(a)
        #left_side = F(x)
        #right_side =  torch.einsum('bij,bj->bi', pinv(Ja), linear_params(mapping_net(x)))
        
        # Compute the loss and backpropagate; the graph is rebuilt on every
        # iteration, so retain_graph is not needed
        loss = mse_loss(left_side, right_side)
        loss.backward()
        optimizer_a.step()
    
    # Step 2: Optimize A and b
    optimizer_linear.zero_grad()
    
    # Forward pass through the network
    a = mapping_net(x)
    
    # Recalculate the Jacobian of a(x)
    #Ja = compute_jacobian(a, x)
    Ja = compute_jacobian(mapping_net, x)
    
    # Recalculate the left and right sides
    left_side = torch.einsum('bij,bj->bi', Ja, F(x))
    right_side = linear_params(a)
    #left_side = F(x)
    #right_side =  torch.einsum('bij,bj->bi', pinv(Ja), linear_params(mapping_net(x)))
    
    # Compute the loss for A and b; the graph is not reused afterwards,
    # so retain_graph is not needed
    loss_a = mse_loss(left_side, right_side)
    loss_a.backward()
    optimizer_linear.step()

    #Javi = pinv(compute_jacobian(mapping_net(xv), xv))
    #left_side = torch.einsum('bij,bj->bi', Javi, mapping_net(xv))

    #print(left_side, F(xv))
    
    # Print loss every 100 epochs
    #if epoch % 100 == 0:
    #    print(f'Epoch {epoch}, Loss (a): {loss_a.item()}')

    writer.add_scalar('Loss/train', loss_a.item(), epoch)

writer.close()

In [None]:
# Residual Ja(x) F(x) - (A a(x) + b) on fresh random states; entries near zero indicate a good fit
x = torch.randn((256, input_dim), requires_grad=True)
res =  torch.einsum('bij,bj->bi', (compute_jacobian(mapping_net, x)), F(x)) - linear_params(mapping_net(x))
res