In [None]:
! pip install scipy -q
! pip install tensorflow==2.12.0 -q
! pip install keras==2.12.0 -q
! pip install pydantic -q
! pip install matplotlib -q
! pip install pandas -q
! pip install seaborn -q

In [40]:
import tqdm
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import tensorflow as tf
from scipy.integrate import odeint, solve_ivp

## Data Generation


In [27]:
class DoublePendulum:
    def __init__(self, m1=None, L1=None, m2=None, L2=None, g=None, initial_conditions=None, T=None, N_t=None):
        self.m1, self.L1, self.m2, self.L2, self.g = m1, L1, m2, L2, g
        self.initial_conditions, self.T, self.N_t = initial_conditions, T, N_t

    def energy_kinetic(self, theta1, theta1_dot, theta2, theta2_dot):
        return 0.5 * self.m1 * self.L1**2 * theta1_dot**2 + 0.5 * self.m2 * (
            self.L1**2 * theta1_dot**2 + self.L2**2 * theta2_dot**2 +
            2 * self.L1 * self.L2 * theta1_dot * theta2_dot * np.cos(theta1 - theta2))

    def energy_potential(self, theta1, theta2):
        return (
            self.m1 * (self.L1 * (1.0 - np.cos(theta1)) + self.L2) +
            self.m2 * (self.L1 * (1.0 - np.cos(theta1)) + self.L2 * (1.0 - np.cos(theta2)))
        ) * self.g

    def equation_motion(self, u, t):
        c, s = np.cos(u[0] - u[2]), np.sin(u[0] - u[2])
        return [
            u[1],
            (self.m2 * self.g * np.sin(u[2]) * c - self.m2 * s * (self.L1 * c * u[1] ** 2 + self.L2 * u[3] ** 2) -
             (self.m1 + self.m2) * self.g * np.sin(u[0])) / (self.L1 * (self.m1 + self.m2 * s**2)),
            u[3],
            ((self.m1 + self.m2) * (self.L1 * u[1] ** 2 * s - self.g * np.sin(u[2]) +
             self.g * np.sin(u[0]) * c) + self.m2 * self.L2 * u[3] ** 2 * s * c) /
            (self.L2 * (self.m1 + self.m2 * s**2))
        ]

    def gen_sol_df(self):
        sol = odeint(func=self.equation_motion, y0=self.initial_conditions, t=np.linspace(0, self.T, self.N_t))
        
        df = pd.DataFrame(sol, columns=["theta1", "theta1_dot", "theta2", "theta2_dot"])
        df["x1"] = np.sin(df["theta1"]) * self.L1
        df["y1"] = -np.cos(df["theta1"]) * self.L1
        df["x2"] = df["x1"] + np.sin(df["theta2"]) * self.L2
        df["y2"] = df["y1"] - np.cos(df["theta2"]) * self.L2
        df["time_step"] = np.linspace(0, self.T, self.N_t)
        df["energy_kinetic"] = self.energy_kinetic(df["theta1"], df["theta1_dot"], df["theta2"], df["theta2_dot"])
        df["energy_potential"] = self.energy_potential(df["theta1"], df["theta2"])
        df["energy_total"] = df["energy_kinetic"] + df["energy_potential"]
        return df

    def create_generalized_coord_momenta(self, df):
        theta1, theta2, theta1_dot, theta2_dot = df["theta1"], df["theta2"], df["theta1_dot"], df["theta2_dot"]
        c, s = np.cos(theta1 - theta2), np.sin(theta1 - theta2)
        df["q1"], df["q2"] = theta1, theta2
        df["p1"] = (self.m1 + self.m2) * self.L1**2 * theta1_dot + self.m2 * self.L1 * self.L2 * theta2_dot * c
        df["p2"] = self.m2 * self.L2**2 * theta2_dot + self.m2 * self.L1 * self.L2 * theta1_dot * c
        p1, p2 = df["p1"], df["p2"]
        h1 = p1 * p2 * s / (self.L1 * self.L2 * (self.m1 + self.m2 * s**2))
        h2 = (self.m2 * self.L2**2 * p1**2 + (self.m1 + self.m2) * self.L1**2 * p2**2 -
              2 * self.m2 * self.L1 * self.L2 * p1 * p2 * c) / (2 * (self.L1 * self.L2 * (self.m1 + self.m2 * s**2))**2)
        df["dq1dt"] = (self.L2 * p1 - self.L1 * p2 * c) / (self.L1**2 * self.L2 * (self.m1 + self.m2 * s**2))
        df["dq2dt"] = (-self.m2 * self.L2 * p1 * c + (self.m1 + self.m2) * self.L1 * p2) / \
                      (self.m2 * self.L1 * self.L2**2 * (self.m1 + self.m2 * s**2))
        df["dp1dt"] = -(self.m1 + self.m2) * self.g * self.L1 * np.sin(theta1) - h1 + h2 * np.sin(2 * (theta1 - theta2))
        df["dp2dt"] = -self.m2 * self.g * self.L2 * np.sin(theta2) + h1 - h2 * np.sin(2 * (theta1 - theta2))
        return df


In [28]:
config = {
    "m1": 1,
    "L1": 1,  
    "m2": 1,  
    "L2": 1,  
    "g": 9.8,  # gravitational acceleration
    "initial_conditions": [-20, 0, 35, 0],
    "T": 200,   # time span
    "N_t": 2001  # number of grid for time span
}

X_vars = ["q1", "q2", "p1", "p2"]
y_vars = ["dq1dt", "dq2dt", "dp1dt", "dp2dt"]
rtol = 1e-6
p = DoublePendulum(**config)
df = p.gen_sol_df()
df = p.create_generalized_coord_momenta(df)
mlg = [p.m1, p.L1, p.m2, p.L2, p.g]

In [29]:
df.head()

Unnamed: 0,theta1,theta1_dot,theta2,theta2_dot,x1,y1,x2,y2,time_step,energy_kinetic,energy_potential,energy_total,q1,q2,p1,p2,dq1dt,dq2dt,dp1dt,dp2dt
0,-20.0,0.0,35.0,0.0,-0.912945,-0.408082,-1.341128,0.49561,0.0,0.0,40.057775,40.057775,-20.0,35.0,0.0,0.0,0.0,0.0,17.893727,4.19619
1,-19.955676,0.882606,35.020983,0.439254,-0.893967,-0.448133,-1.341016,0.446376,0.1,0.874995,39.18278,40.057775,-19.955676,35.020983,1.764679,0.438184,0.882606,0.439254,17.134061,4.76877
2,-19.825217,1.711499,35.095412,1.103267,-0.828073,-0.560621,-1.3404,0.298169,0.2,3.429801,36.627974,40.057775,-19.825217,35.095412,3.35988,1.00535,1.711499,1.103267,14.345075,6.905958
3,-19.618611,2.378301,35.2545,2.147032,-0.695457,-0.718568,-1.337362,0.048216,0.3,7.427228,32.630548,40.057776,-19.618611,35.2545,4.532089,1.898336,2.378301,2.147032,8.552658,11.368967
4,-19.363187,2.629912,35.536829,3.541836,-0.491343,-0.870966,-1.321456,-0.31337,0.4,12.464273,27.593503,40.057776,-19.363187,35.536829,4.984353,3.337291,2.629912,3.541836,0.343828,17.421606


# Double Pendlum


### Train


In [30]:
def get_params(df, X_vars, train_ratio= 0.8):
    """Get parameters for training."""
    N_train = int(df.shape[0] * train_ratio)
    t_eval = df["time_step"].iloc[N_train:].values
    t_span = [t_eval[0], t_eval[-1]]
    y0 = df[X_vars].iloc[N_train].values
    return N_train, t_eval, t_span, y0

N_train, t_eval, t_span, y0 = get_params(df=df, X_vars=X_vars)
X_train = df[X_vars].iloc[:N_train].to_numpy()
y_train = df[y_vars].iloc[:N_train].to_numpy()

In [31]:
class HNN(tf.keras.Model):
    """Hamiltonian Neural Networks."""

    def __init__(self, input_dim, hidden_dim):
        super().__init__()
        
        self.feature_extractor = tf.keras.Sequential(
            [
            tf.keras.Input(shape=(input_dim,)),
             tf.keras.layers.Dense(hidden_dim, activation="tanh"),
             tf.keras.layers.Dense(hidden_dim, activation="tanh"),
             tf.keras.layers.Dense(hidden_dim, activation="tanh"),
             tf.keras.layers.Dense(hidden_dim, activation="tanh"),
            ]
        )
        
        self.last_layer = tf.keras.layers.Dense(1)
        self.input_dim = input_dim
        self.matrix = tf.constant(np.eye(self.input_dim)[::-1], dtype=tf.float64)
        
    def call(self, x):
        """Call."""
        features = self.feature_extractor(x)
        outputs = self.last_layer(features)
        return outputs

    def forward(self, x):
        """Forward."""
        with tf.GradientTape() as tape:
            features = self.feature_extractor(x)
            outputs = self.last_layer(features)
        return tape.gradient(outputs, x) @ self.matrix

In [53]:
def calculate_double_pendulum_hamiltonian(mlg, x):
    m1, L1, m2, L2, g = mlg
    q1, q2, p1, p2 = x
    sin_q1q2 = np.sin(q1 - q2)
    cos_q1q2 = np.cos(q1 - q2)
    kinetic_term = (m2 * L2**2 * p1**2 + (m1 + m2) * L1**2 * p2**2 - 2 * m2 * L1 * L2 * p1 * p2 * cos_q1q2) / (
        2 * m2 * L1**2 * L2**2 * (m1 + m2 * sin_q1q2**2)
    )
    potential_term = -(m1 + m2) * g * L1 * np.cos(q1) - m2 * g * L2 * np.cos(q2)
    return kinetic_term + potential_term


def get_loss(model, x, y, ham0, mlg, penalty_lamb):
    predictions = model.forward(tf.Variable(tf.stack(x)))
    ham_new = calculate_double_pendulum_hamiltonian(mlg, x.T)
    physics_embedded_penalty = tf.reduce_mean(tf.square(ham0 - ham_new))
    return tf.reduce_mean(tf.square(predictions - tf.Variable(tf.stack(y))))+ penalty_lamb * physics_embedded_penalty
    
    
def get_grad(model, optimizer, x, y, ham0, mlg, penalty_lamb):
    with tf.GradientTape() as tape:
        loss = get_loss(model, x, y, ham0, mlg, penalty_lamb)
    gradients = tape.gradient(loss, model.trainable_variables, unconnected_gradients=tf.UnconnectedGradients.ZERO)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    return loss, optimizer

def train_hnn(x, y, config):
    mlg = config["mlg"]
    ham0 = calculate_double_pendulum_hamiltonian(mlg, x[0].T)
    model = HNN(input_dim=x.shape[1], hidden_dim=config["hidden_dim"])
    optimizer = tf.keras.optimizers.Adam(learning_rate=config["learning_rate"])

    losses = []
    with tqdm.tqdm(total=range(1, config["epochs"])) as pbar:
        for itr in epochs:
            loss, optimizer = get_grad(model, optimizer, x, y, ham0, mlg, config["penalty_lamb"])
            losses.append(loss.numpy())
            pbar.set_postfix({'Loss': loss.numpy()})
            pbar.update(1)

    plt.plot(epochs, losses)
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Training Loss')
    plt.show()
    return model

In [52]:
config = {
    "mlg":mlg,
    "learning_rate":0.01,
    "hidden_dim":400,
    "epochs":2000,
    "penalty_lamb":0.2
}

hnn = train_hnn(X_train, y_train, config)

TypeError: unsupported operand type(s) for +: 'range' and 'float'

In [49]:
def integratation(model, t_span, y0, t_eval=t_eval, rtol=rtol):
    def fun(t, y):
        y = tf.Variable(tf.reshape(y, (1, len(y0))), dtype="double")
        dx = model.forward(y)
        return dx
    return solve_ivp(fun=fun, t_span=t_span, y0=y0, t_eval=t_eval, rtol=rtol)


hnn_pred = integratation(model=hnn, t_span=t_span, y0=y0, t_eval=t_eval, rtol=rtol)
y_predicted = hnn_pred["y"].T
y_actual = df.iloc[N_train:][X_vars].values

AttributeError: 'NoneType' object has no attribute 'forward'

In [39]:
def rollout_error(predicted, actual):
    """
    Calculate the Rollout Error between the predicted and actual trajectories.

    Args:
    predicted (np.ndarray): Predicted trajectory with shape (time_steps, 4).
    actual (np.ndarray): Actual trajectory with shape (time_steps, 4).

    Returns:
    float: Rollout Error.
    """
    error = np.linalg.norm(predicted - actual, axis=1)
    norm_sum = np.linalg.norm(predicted, axis=1) + np.linalg.norm(actual, axis=1)
    return np.mean(error / norm_sum)


re_hnn = rollout_error(y_predicted, y_actual)
print(f'Rollout Error for HNN: {re_hnn}')

Rollout Error for HNN: 0.533142859309086
