Items for ablation study:

Shrink the model (Change ```dims``` of ```ODEFunc```)

Train for fewer epochs (Edit ```num_epochs``` variable)

Add varied levels of noise (Add noise after grabbing the batch)

Use fewer training trajectories (???)

# Imports + Necessary Functions

In [None]:
!pip install torchdiffeq

In [None]:
import sys
sys.path.append(sys.path[0]+"\\\\..")  # assuming the first element of sys.path is the path to the scripts folder, this allows imports from within double-pendulum

import numpy as np
import torch
import torch.nn as nn
from torchdiffeq import odeint
from torch.utils.data.dataloader import DataLoader
import argparse
from tqdm import tqdm
from scipy.integrate import solve_ivp
from torch.utils.data.dataloader import Dataset
import matplotlib.pyplot as plt

In [None]:
class FeedForward(nn.Module):
    """Plain fully connected network mapping 8 input features to 4 outputs.

    Three ``Linear`` layers of width 30, each followed by ReLU, feed a final
    ``Linear(30, 4)`` layer. The result is clamped to ``[-1e6, 1e6]``.
    """

    def __init__(self):
        super().__init__()
        hidden_width = 30
        # Attribute names are kept as linear1..linear4 so state_dict keys stay stable.
        self.linear1 = nn.Linear(8, hidden_width)
        self.linear2 = nn.Linear(hidden_width, hidden_width)
        self.linear3 = nn.Linear(hidden_width, hidden_width)
        self.linear4 = nn.Linear(hidden_width, 4)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Apply the network to ``x`` (last dimension of size 8) and clamp the output."""
        for hidden_layer in (self.linear1, self.linear2, self.linear3):
            x = self.relu(hidden_layer(x))
        return torch.clamp(self.linear4(x), -1e6, 1e6)


class Pendulum_Data(Dataset):
    """Dataset that simulates a fresh random double-pendulum trajectory per item.

    Every ``__getitem__`` call draws random initial angles, angular velocities,
    lengths and masses, integrates the equations of motion with ``solve_ivp``
    and returns consecutive-sample pairs ``(state_k, state_{k+1})``.

    Each sample row has 9 columns: ``l1, l2, m1, m2, theta1, z1, theta2, z2, t``
    where ``z1``/``z2`` are the angular velocities.

    Args:
        min_length: minimum number of transitions per trajectory (inclusive).
        max_length: maximum number of transitions per trajectory (inclusive).
        G: gravitational acceleration used by the simulation.
        delta_t: spacing between consecutive samples.
        size: value reported by ``len()``; items are generated on demand.
    """

    def __init__(self, min_length=1, max_length=1, G=9.81, delta_t=0.005, size=2**15):
        # Explicit assignments: ``self.__dict__.update(locals())`` would also
        # store ``self`` on the instance, creating a reference cycle.
        self.min_length = min_length
        self.max_length = max_length
        self.G = G
        self.delta_t = delta_t
        self.size = size

    @staticmethod
    def derivatives(t, state, params, G=9.81):
        """Right-hand side of the double-pendulum ODE in the form ``solve_ivp`` expects.

        Args:
            t: current time (unused; the system is autonomous).
            state: ``(theta1, z1, theta2, z2)``.
            params: ``(L1, L2, m1, m2)``.
            G: gravitational acceleration.

        Returns:
            ``np.ndarray`` holding ``(dtheta1/dt, dz1/dt, dtheta2/dt, dz2/dt)``.
        """
        L1, L2, m1, m2 = params
        theta1, z1, theta2, z2 = state
        delta = theta2 - theta1

        denominator1 = (m1 + m2) * L1 - m2 * L1 * np.cos(delta) ** 2
        denominator2 = (L2 / L1) * denominator1

        dtheta1_dt = z1
        dz1_dt = (
                (m2 * L1 * z1 ** 2 * np.sin(delta) * np.cos(delta)
                 + m2 * G * np.sin(theta2) * np.cos(delta)
                 + m2 * L2 * z2 ** 2 * np.sin(delta)
                 - (m1 + m2) * G * np.sin(theta1))
                / denominator1
        )
        dtheta2_dt = z2
        dz2_dt = (
                (-m2 * L2 * z2 ** 2 * np.sin(delta) * np.cos(delta)
                 + (m1 + m2) * G * np.sin(theta1) * np.cos(delta)
                 - (m1 + m2) * L1 * z1 ** 2 * np.sin(delta)
                 - (m1 + m2) * G * np.sin(theta2))
                / denominator2
        )

        return np.array([dtheta1_dt, dz1_dt, dtheta2_dt, dz2_dt])

    def run_simulation(self, theta1_init, theta2_init, l1, l2, m1, m2, v1, v2, t_eval):
        """Integrate one trajectory and return it as a ``(len(t_eval), 9)`` array.

        Columns are ``l1, l2, m1, m2, theta1, z1, theta2, z2, t``.

        Raises:
            RuntimeError: if ``solve_ivp`` reports that the integration failed.
        """
        state_0 = [theta1_init, v1, theta2_init, v2]
        # Forward the instance's G; otherwise ``derivatives`` silently falls
        # back to its default and the constructor argument has no effect.
        solution = solve_ivp(
            Pendulum_Data.derivatives, (0, t_eval[-1]), state_0, t_eval=t_eval,
            args=((l1, l2, m1, m2), self.G),
        )
        if not solution.success:
            raise RuntimeError(f"Pendulum integration failed: {solution.message}")

        # Repeat the constant physical parameters on every row next to the state and time.
        constants = np.tile(np.array([l1, l2, m1, m2], dtype=float), (len(t_eval), 1))
        return np.column_stack([constants, solution.y.T, t_eval])

    def __getitem__(self, i):
        """Simulate a new random trajectory; the index ``i`` is ignored.

        Returns:
            ``(inputs, targets)`` where ``targets[k]`` is the sample one
            ``delta_t`` after ``inputs[k]``; both have shape ``(num_steps, 9)``.
        """
        num_steps = np.random.randint(self.min_length, self.max_length + 1)
        # Build the time grid from an integer range: ``np.arange`` with a float
        # step can produce one sample too many because of rounding.
        t_eval = np.arange(num_steps + 1) * self.delta_t
        theta1_init, theta2_init = np.random.uniform(-np.pi, np.pi, 2)
        l1, l2 = np.clip(np.random.normal(1, .5, 2), 0.1, 3)
        m1, m2 = np.clip(np.random.normal(1, .5, 2), 0.1, 3)
        v1, v2 = np.random.normal(size=2)
        theta = self.run_simulation(theta1_init=theta1_init, theta2_init=theta2_init, l1=l1, l2=l2,
                                    m1=m1, m2=m2, v1=v1, v2=v2, t_eval=t_eval)
        return theta[:-1], theta[1:]

    def __len__(self):
        """Return the nominal dataset size given at construction."""
        return self.size

In [None]:
class ODEFunc(nn.Module):
    """Tanh MLP used as the derivative function ``f(t, y)`` handed to ``odeint``.

    The network maps a 5-component state (theta1, w1, theta2, w2, t) to a
    5-component output. ``dims`` lists the hidden-layer widths in order and
    must contain at least one entry.
    """

    def __init__(self, dims):
        super().__init__()
        self.num_calls = 0  # number of times forward() has been evaluated

        # Input layer, then one Linear+Tanh pair per additional hidden width,
        # then the projection back to the 5-component state.
        layers = [nn.Linear(5, dims[0]), nn.Tanh()]
        for width_in, width_out in zip(dims, dims[1:]):
            layers.append(nn.Linear(width_in, width_out))
            layers.append(nn.Tanh())
        layers.append(nn.Linear(dims[-1], 5))
        self.net = nn.Sequential(*layers)

    def forward(self, t, y):
        """Evaluate the network on ``y`` (last dimension 5); ``t`` is not used."""
        self.num_calls = self.num_calls + 1
        return self.net(y)

# Training

In [None]:
def train_model(num_epochs, dims, std_dev):
    """Train an ``ODEFunc`` on one-step double-pendulum transitions.

    Each sample from ``Pendulum_Data`` is a pair (state at t_k, state at
    t_k + delta_t). The model is integrated with ``odeint`` over a single
    ``delta_t`` starting from every input state and compared to the matching
    target with MSE.

    Args:
        num_epochs: number of passes over the DataLoader. The dataset size
            equals the batch size, so one epoch is one batch.
        dims: hidden-layer widths forwarded to ``ODEFunc``.
        std_dev: standard deviation of the zero-mean Gaussian noise added to
            the training inputs; ``0`` disables the noise. Validation inputs
            are never perturbed.

    Returns:
        ``(train_losses, val_losses)``: mean training loss per epoch, and mean
        validation loss recorded once every ``val_every`` epochs.
    """
    min_steps = 1
    max_steps = 1
    G = 9.81
    delta_t = 0.005
    lr = 1e-3
    weight_decay = 1e-4
    batch_size = 1
    val_every = 25
    val_size = 100

    # Set model, optimizer and loss
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model = ODEFunc(dims).to(device=device)
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

    # Samples are simulated on the fly, so training and validation batches are
    # always freshly generated trajectories.
    data = Pendulum_Data(min_steps, max_steps, G, delta_t, batch_size)
    dl = DataLoader(data, batch_size=batch_size)

    # Targets are exactly one delta_t ahead of the inputs, so the integration
    # grid is always [0, delta_t], independent of the batch size.
    t_span = torch.tensor([0.0, delta_t], device=device)

    def one_step_loss(X_batch, y_batch, noise_std=0.0):
        """Return the MSE between the one-step ODE prediction and the target."""
        X_batch = X_batch.to(device=device, dtype=torch.float)
        y_batch = y_batch.to(device=device, dtype=torch.float)
        if noise_std:
            # Noise is applied to every input column (as before), but generated
            # with torch so it matches the tensor's dtype and device.
            X_batch = X_batch + noise_std * torch.randn_like(X_batch)
        # Columns 0-3 hold lengths/masses; the model only sees the state and time.
        y_pred = odeint(model, X_batch[:, :, 4:], t_span)[-1]
        return criterion(y_pred, y_batch[:, :, 4:])

    train_losses = []
    val_losses = []

    for i in tqdm(range(num_epochs)):
        train_losses_local = []
        # each batch is as long as the dataset, so an epoch is one batch
        for X_batch, y_batch in dl:
            optimizer.zero_grad()
            loss = one_step_loss(X_batch, y_batch, std_dev)
            loss.backward()
            optimizer.step()
            train_losses_local.append(loss.item())
        train_losses.append(np.mean(train_losses_local))

        if (i + 1) % val_every == 0:
            model.eval()
            val_losses_local = []
            # run val_size passes over the loader; no gradients are needed here
            with torch.no_grad():
                for _ in range(val_size):
                    for X_batch, y_batch in dl:
                        val_losses_local.append(one_step_loss(X_batch, y_batch).item())
            val_losses.append(np.mean(val_losses_local))
            model.train()

    # NOTE: periodic checkpointing is not implemented; the previous commented-out
    # torch.save calls depended on an undefined `path`.
    return train_losses, val_losses

In [2]:
# Ablation sweep: train one model per (architecture, noise level, epoch count)
# combination and plot its training and validation loss curves.
epochs = [200, 500, 1000]
dim_sets = [[64, 128, 64], [64, 64], [32]]
noise = [0, 0.05, 0.5]  # standard deviations passed to train_model as std_dev

for dims in dim_sets:
    print("ODEFunc dimensions:", dims)
    for std_dev in noise:
        # The value is a standard deviation, not a variance.
        print("Noise std dev:", std_dev)
        for num_epochs in epochs:
            print("Epochs:", num_epochs)
            train_losses, val_losses = train_model(num_epochs, dims, std_dev)
            plt.subplot(121)
            plt.plot(train_losses)
            plt.title("Training Loss")
            plt.xlabel("Epoch")
            plt.subplot(122)
            plt.plot(val_losses)
            plt.title("Validation Loss")
            plt.xlabel("Validation check")
            plt.tight_layout()
            plt.show()
            print('\n')

ODEFunc dimensions: [64, 128, 64]
Noise variance: 0
Epochs: 200


NameError: name 'train_model' is not defined