# Setup

## Model

In [254]:
import logging
import math

import torch
import torch.nn.functional as F
from torch import nn

#
# From StableDynamics.ipynb; Zico Kolter
#

# You can use this to compensate for numeric error:
VERIFY = False
V_SCALE = 0.01

global V_WRAP, SCALE_FX
V_WRAP = False
SCALE_FX = False

class Dynamics(nn.Module):
    def __init__(self, fhat, V, alpha=0.01):
        super().__init__()
        self.fhat = fhat
        self.V = V
        self.alpha = 0 # alpha

    def forward(self, x):
        fx = self.fhat(x)
        if SCALE_FX:
            fx = fx / fx.norm(p=2, dim=1, keepdim=True).clamp(min=1.0)

        Vx = self.V(x)
        gV = torch.autograd.grad([a for a in Vx], [x], create_graph=True, only_inputs=True)[0]
        rv = fx - gV * (F.relu((gV*fx).sum(dim=1) + self.alpha*Vx[:,0])/(gV**2).sum(dim=1))[:,None]

        if VERIFY:
            # Verify that rv has no positive component along gV.
            # This helps us catch:
            #   (1) numeric error in the symbolic gradient calculation, and
            #   (2) Violation of the Lyapunov function when Euler integration is used.
            verify = (gV * rv).sum(dim=1)
            num_violation = len([v for v in verify if v > 0]) # (1)
            new_V = self.V(x + V_SCALE * rv)
            if (new_V > Vx).any(): # (2)
                err = sorted([v for v in (new_V - Vx).detach().cpu().numpy().ravel() if v > 0], reverse=True)

        return rv

class ICNN(nn.Module):
    def __init__(self, layer_sizes, activation=F.relu_):
        super().__init__()
        self.W = nn.ParameterList([nn.Parameter(torch.Tensor(l, layer_sizes[0])) 
                                   for l in layer_sizes[1:]])
        self.U = nn.ParameterList([nn.Parameter(torch.Tensor(layer_sizes[i+1], layer_sizes[i]))
                                   for i in range(1,len(layer_sizes)-1)])
        self.bias = nn.ParameterList([nn.Parameter(torch.Tensor(l)) for l in layer_sizes[1:]])
        self.act = activation
        self.reset_parameters()

    def reset_parameters(self):
        # copying from PyTorch Linear
        for W in self.W:
            nn.init.kaiming_uniform_(W, a=5**0.5)
        for U in self.U:
            nn.init.kaiming_uniform_(U, a=5**0.5)
        for i,b in enumerate(self.bias):
            fan_in, _ = nn.init._calculate_fan_in_and_fan_out(self.W[i])
            bound = 1 / (fan_in**0.5)
            nn.init.uniform_(b, -bound, bound)

    def forward(self, x):
        z = F.linear(x, self.W[0], self.bias[0])
        z = self.act(z)

        for W,b,U in zip(self.W[1:-1], self.bias[1:-1], self.U[:-1]):
            z = F.linear(x, W, b) + F.linear(z, F.softplus(U)) / U.shape[0]
            z = self.act(z)

        return F.linear(x, self.W[-1], self.bias[-1]) + F.linear(z, F.softplus(self.U[-1])) / self.U[-1].shape[0]



class ReHU(nn.Module):
    """ Rectified Huber unit"""
    def __init__(self, d):
        super().__init__()
        self.a = 1/d
        self.b = -d/2

    def forward(self, x):
        return torch.max(torch.clamp(torch.sign(x)*self.a/2*x**2,min=0,max=-self.b),x+self.b)

class MakePSD(nn.Module):
    def __init__(self, f, n, eps=0.01, d=1.0):
        super().__init__()
        self.f = f
        self.zero = torch.nn.Parameter(f(torch.zeros(1,n)), requires_grad=False)
        self.eps = eps
        self.d = d
        self.rehu = ReHU(self.d)

    def forward(self, x):
        smoothed_output = self.rehu(self.f(x) - self.zero)
        quadratic_under = self.eps*(x**2).sum(1,keepdim=True)
        return smoothed_output + quadratic_under

class PosDefICNN(nn.Module):
    def __init__(self, layer_sizes, eps=0.1, negative_slope=0.05):
        super().__init__()
        self.W = nn.ParameterList([nn.Parameter(torch.Tensor(l, layer_sizes[0])) 
                                   for l in layer_sizes[1:]])
        self.U = nn.ParameterList([nn.Parameter(torch.Tensor(layer_sizes[i+1], layer_sizes[i]))
                                   for i in range(1,len(layer_sizes)-1)])
        self.eps = eps
        self.negative_slope = negative_slope
        self.reset_parameters()

    def reset_parameters(self):
        # copying from PyTorch Linear
        for W in self.W:
            nn.init.kaiming_uniform_(W, a=5**0.5)
        for U in self.U:
            nn.init.kaiming_uniform_(U, a=5**0.5)

    def forward(self, x):
        z = F.linear(x, self.W[0])
        F.leaky_relu_(z, negative_slope=self.negative_slope)

        for W,U in zip(self.W[1:-1], self.U[:-1]):
            z = F.linear(x, W) + F.linear(z, F.softplus(U))*self.negative_slope
            z = F.leaky_relu_(z, negative_slope=self.negative_slope)

        z = F.linear(x, self.W[-1]) + F.linear(z, F.softplus(self.U[-1]))
        return F.relu(z) + self.eps*(x**2).sum(1)[:,None]

def loss(Ypred, Yactual, X):
    # Force smoothness in V:
    # penalty for new_V being larget than old V:
    Vloss = torch.tensor(0)
    if SMOOTH_V:
        V = model.V
        # Successor to X:
        succ_X = (X + V_SCALE * Yactual).detach()
        if V_WRAP:
            while torch.any(succ_X < -math.pi):
                succ_X[succ_X < -math.pi] = succ_X[succ_X < -math.pi] + 2 * math.pi
            while torch.any(succ_X >= math.pi):
                succ_X[succ_X >= math.pi] = succ_X[succ_X >= math.pi] - 2 * math.pi
            succ_X.requires_grad_()

        Vloss = (V(succ_X) - V(X)).clamp(min=0).mean()

    l2loss = ((Ypred - Yactual)**2).mean()

    return (l2loss + SMOOTH_V * Vloss, l2loss, Vloss)

global model, SMOOTH_V
model = None
SMOOTH_V = 0

def loss_flatten(l):
    return l

def loss_labels():
    return ["loss", "l2", "V"]

def summary(*a, **kw):
    pass

### Verification

In [255]:
lsd = 2
ph_dim = 32
h_dim = 32

fhat = nn.Sequential(nn.Linear(lsd, h_dim), nn.ReLU(),
                        nn.Linear(h_dim, h_dim), nn.ReLU(),
                        nn.Linear(h_dim, lsd))

rehu = 0.005
projfn_eps = 0.01

V = MakePSD(ICNN([lsd, ph_dim, ph_dim, 1], activation=ReHU(float(0.005))), lsd, eps=projfn_eps, d=1.0)
model = Dynamics(fhat, V, alpha=0)


In [256]:
model.load_state_dict(torch.load("trained_models/checkpoint_10000.pth"))
model.eval()

Dynamics(
  (fhat): Sequential(
    (0): Linear(in_features=2, out_features=32, bias=True)
    (1): ReLU()
    (2): Linear(in_features=32, out_features=32, bias=True)
    (3): ReLU()
    (4): Linear(in_features=32, out_features=2, bias=True)
  )
  (V): MakePSD(
    (f): ICNN(
      (W): ParameterList(
          (0): Parameter containing: [torch.float32 of size 32x2]
          (1): Parameter containing: [torch.float32 of size 32x2]
          (2): Parameter containing: [torch.float32 of size 1x2]
      )
      (U): ParameterList(
          (0): Parameter containing: [torch.float32 of size 32x32]
          (1): Parameter containing: [torch.float32 of size 1x32]
      )
      (bias): ParameterList(
          (0): Parameter containing: [torch.float32 of size 32]
          (1): Parameter containing: [torch.float32 of size 32]
          (2): Parameter containing: [torch.float32 of size 1]
      )
      (act): ReHU()
    )
    (rehu): ReHU()
  )
)

## Real energy function

In [257]:
from sympy import Dummy, lambdify, srepr, symbols
from sympy.core.function import AppliedUndef
from sympy.core.sympify import sympify
from sympy.physics import mechanics
from sympy.physics.vector import Vector
from sympy.printing.printer import Printer
import numpy as np
import torch

from models import sympy2torch

def pendulum_energy(n=1, lengths=1, masses=1, include_gpe=True, include_ke=True):
    # Generalized coordinates and velocities
    # (in this case, angular positions & velocities of each mass) 
    q = mechanics.dynamicsymbols('q:{0}'.format(n))
    u = mechanics.dynamicsymbols('u:{0}'.format(n))

    # mass and length
    m = symbols('m:{0}'.format(n))
    l = symbols('l:{0}'.format(n))

    # gravity and time symbols
    g, t = symbols('g,t')

    #--------------------------------------------------
    # Step 2: build the model using Kane's Method

    # Create pivot point reference frame
    A = mechanics.ReferenceFrame('A')
    P = mechanics.Point('P')
    Origin = P
    P.set_vel(A, 0)

    gravity_direction = -A.x

    # lists to hold particles, forces, and kinetic ODEs
    # for each pendulum in the chain
    particles = []
    forces = []

    gpe = []
    ke = []

    cartVel = 0.0
    cartPos = 0.0

    for i in range(n):
        # Create a reference frame following the i^th mass
        Ai = A.orientnew('A' + str(i), 'Axis', [q[i], A.z])
        Ai.set_ang_vel(A, u[i] * A.z)

        # Create a point in this reference frame
        Pi = P.locatenew('P' + str(i), l[i] * Ai.x)
        Pi.v2pt_theory(P, A, Ai)

        # Create a new particle of mass m[i] at this point
        Pai = mechanics.Particle('Pa' + str(i), Pi, m[i])
        particles.append(Pai)

        # Calculate the cartesian position and velocity:
        # cartPos += l[i] * q[i]
        pos = Pi.pos_from(Origin)

        # This looks strange, but is verified numerically:
        ke.append(1/n * Pai.kinetic_energy(A))
        gpe.append(m[i] * g * (Pi.pos_from(Origin) & gravity_direction))

        P = Pi


    # lengths and masses
    if lengths is None:
        lengths = np.ones(n) / n
    lengths = np.broadcast_to(lengths, n)
    masses = np.broadcast_to(masses, n)

    # Fixed parameters: gravitational constant, lengths, and masses
    parameters = [g] + list(l) + list(m)
    parameter_vals = [9.81] + list(lengths) + list(masses)

    # define symbols for unknown parameters
    unknowns = [Dummy() for i in q + u]
    unknown_dict = dict(zip(q + u, unknowns))

    # create functions for numerical calculation
    total_energy = 0
    if include_gpe:
        total_energy += (sum(gpe)).subs(zip(parameters, parameter_vals))
    if include_ke:
        total_energy += (sum( ke)).subs(zip(parameters, parameter_vals))

    total_energy_func = sympy2torch.sympy2torch(total_energy)

    minimum_energy = total_energy_func(**fixvalue(n, torch.tensor([[0.]*2*n]))).detach()
    return lambda inp: (total_energy_func(**fixvalue(n, inp)) - minimum_energy.to(inp)).unsqueeze(1)

def fixvalue(n, value):
    keys = [f"q{i}_t" for i in range(n)] + [f"u{i}_t" for i in range(n)]
    rv = {}
    for i in range(2*n):
        if isinstance(value, list):
            rv[keys[i]] = value[i]
        else:
            rv[keys[i]] = value[:,i]
    return rv


In [258]:
N = 1
energy_func = pendulum_energy(N)
ke_func = pendulum_energy(N, include_gpe=False, include_ke=True)
gpe_func = pendulum_energy(N, include_gpe=True, include_ke=False)

# Load trajectory

In [259]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
from IPython.display import HTML
from scipy.integrate import odeint


FILE = "datasets/p-physics-1.npy"
N = 1

data = np.load(FILE)
print(f"Loaded data: {data.shape}")
selected_data = data[:,9,:]

# Uncomment to use the training dataset
load = np.load("datasets/p-1-test.npz")
selected_data = load["X"] 

Loaded data: (1000, 1000, 2)


In [260]:
# # Assuming trajectory is your (1000, 2) array
# # For example: trajectory = np.random.rand(1000, 2)

# split_idx = int(len(selected_data) * 0.8)

# # Split trajectory into train and test
# trajectory_train = selected_data[:split_idx]  # First 800 points
# trajectory_test = selected_data[split_idx:]   # Last 200 points

# # Training data
# X_1step_train = trajectory_train[:-1]     # Shape: (799, 2)
# Y_1step_train = trajectory_train[1:]      # Shape: (799, 2)

# X_10step_train = trajectory_train[:-10]    # Shape: (790, 2)
# Y_10step_train = trajectory_train[10:]     # Shape: (790, 2)

# # Test data
# X_1step_test = trajectory_test[:-1]       # Shape: (199, 2)
# Y_1step_test = trajectory_test[1:]        # Shape: (199, 2)

# X_10step_test = trajectory_test[:-10]      # Shape: (190, 2)
# Y_10step_test = trajectory_test[10:]       # Shape: (190, 2)

# # Save everything in one npz file
# np.savez('p-1-train.npz', 
#          X=X_1step_train, 
#          Y=Y_1step_train)

# # Save everything in one npz file
# np.savez('p-1-train.npz', 
#          X=X_1step_test, 
#          Y=Y_1step_test)

In [261]:
# import numpy as np

# first_traj = 50
# last_traj = 100

# # Select specific trajectories and transpose to get (N_selected_traj, steps, 2)
# selected_data = data[:, first_traj:last_traj, :].transpose(1, 0, 2)  # Shape: (5, 1000, 2)
# selected_data.shape

# # Reshape selected data
# X = selected_data.reshape(-1, 2)  # Shape: (5000, 2)

# # Create Y by shifting X one step forward
# Y = np.zeros_like(X)

# # For each trajectory
# for i in range(last_traj - first_traj):
#     start_idx = i * 1000
#     end_idx = (i + 1) * 1000
    
#     # Fill Y for this trajectory
#     Y[start_idx:end_idx-1] = X[start_idx+1:end_idx]
#     Y[end_idx-1] = X[end_idx-1]

# # Calculate split point (80% training)
# split_idx = int(0.8 * X.shape[0])

# # Create training and test sets
# X_train = X[:split_idx]
# Y_train = Y[:split_idx]
# X_test = X[split_idx:]
# Y_test = Y[split_idx:]

# # Save the arrays
# np.savez('p-1-train.npz', X=X_train, Y=Y_train)
# np.savez('p-1-test.npz', X=X_test, Y=Y_test)


In [262]:
X_test.shape

(10000, 2)

## Get the estimated energy values from the model on the data

In [263]:
model_evals = []

x_plot = []
y_plot = []
for i in range(len(selected_data)):
    input = torch.FloatTensor([selected_data[i]])
    input.requires_grad = True
    x_plot.append(input[0][0].item())
    y_plot.append(input[0][1].item())
    energy_val = model.V(input).detach().item()
    model_evals.append(energy_val)

## Get energy values from the data based on the real function

In [264]:
energy     = energy_func(torch.tensor(selected_data)).numpy()
energy_ke  =     ke_func(torch.tensor(selected_data)).numpy()
energy_gpe =    gpe_func(torch.tensor(selected_data)).numpy()

true_evals = []
for i in range(len(selected_data)):
    input = torch.FloatTensor([selected_data[i]])
    input.requires_grad = True
    true_evals.append(energy[i][0])

## Compute differences between estimates and the true function

In [265]:
def top_diff_indices(true_evals, model_evals):
    # Ensure the lists are of the same length
    assert len(true_evals) == len(model_evals), "Lists must be of equal length"

    # Calculate the absolute differences and store with original indices
    differences = [(i, abs(true - model)) for i, (true, model) in enumerate(zip(true_evals, model_evals))]

    # Sort the differences in descending order
    differences.sort(key=lambda x: x[1], reverse=True)

    # Calculate how many indices make up 30%
    num_indices = int(0.8 * len(differences))

    # Return the top 30% of these indices
    return [index for index, _ in differences[:num_indices]], differences

In [266]:
diff_indices, diffs = top_diff_indices(true_evals, model_evals)
diff_indices = set(diff_indices)

In [267]:
true_evals = [true_evals[i] for i in range(len(true_evals)) if i in diff_indices]
model_evals = [model_evals[i] for i in range(len(model_evals)) if i in diff_indices]
x_plot = [x_plot[i] for i in range(len(x_plot)) if i in diff_indices]
y_plot = [y_plot[i] for i in range(len(y_plot)) if i in diff_indices]

## Energy plots

In [269]:
import plotly.graph_objects as go
import numpy as np

# Assuming your x_plot, y_plot, true_evals, and model_evals are lists or numpy arrays of 199 elements each
x_plot = np.array(x_plot)
y_plot = np.array(y_plot)
true_evals = np.array(true_evals)
model_evals = np.array(model_evals)

fig = go.Figure(data=[
    go.Scatter3d(x=x_plot, y=y_plot, z=true_evals, mode='markers', name='True Values', 
                 marker=dict(color='blue', size=5)),
    go.Scatter3d(x=x_plot, y=y_plot, z=model_evals, mode='markers', name='Model Predictions', 
                 marker=dict(color='red', size=5))
])

fig.update_layout(
    title='Comparison of True Values and Model Predictions',
    autosize=False,
    width=700,
    height=700,
    margin=dict(l=65, r=50, b=65, t=90),
    scene=dict(
        xaxis_title='X',
        yaxis_title='Y',
        zaxis_title='V(x)',
        aspectmode='cube'
    ),
    legend=dict(
        yanchor="top",
        y=0.99,
        xanchor="left",
        x=0.01
    )
)

fig.show()