In [None]:
import sys
sys.path.append("..")
from numpy import sin, cos
import numpy as np
import matplotlib.pyplot as plt
import scipy.integrate as integrate
import matplotlib.animation as animation
import random
import tqdm
import pickle
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
import math
import tqdm
import timeit
import random
from torch.utils.data import Subset, DataLoader, Dataset
from models import *
#from torch.utils.tensorboard import SummaryWriter

## Define Pendulum Parameters and Equation 

In [None]:
G = 9.8  # acceleration due to gravity, in m/s^2
L1 = 1.0  # length of pendulum 1 in m
L2 = 1.0  # length of pendulum 2 in m
M1 = 1.0  # mass of pendulum 1 in kg
M2 = 1.0  # mass of pendulum 2 in kg

In [None]:
#Derivatives for the double pendulum
def derivs(state, t):

    dydx = np.zeros_like(state)
    dydx[0] = state[1]

    del_ = state[2] - state[0]
    den1 = (M1 + M2)*L1 - M2*L1*cos(del_)*cos(del_)
    dydx[1] = (M2*L1*state[1]*state[1]*sin(del_)*cos(del_) +
               M2*G*sin(state[2])*cos(del_) +
               M2*L2*state[3]*state[3]*sin(del_) -
               (M1 + M2)*G*sin(state[0]))/den1

    dydx[2] = state[3]

    den2 = (L2/L1)*den1
    dydx[3] = (-M2*L2*state[3]*state[3]*sin(del_)*cos(del_) +
               (M1 + M2)*G*sin(state[0])*cos(del_) -
               (M1 + M2)*L1*state[1]*state[1]*sin(del_) -
               (M1 + M2)*G*sin(state[2]))/den2

    return dydx

## Create Trajectories

In [None]:
Y = []

for i in tqdm.tqdm(range(100)):
    #Time to integrate solution
    dt = 0.1
    t = np.arange(0.0, 3, dt)

    #Initial values
    th1 = np.random.uniform(-90,90)
    w1 = np.random.uniform(-90,90)
    th2 = np.random.uniform(-90,90)
    w2 = np.random.uniform(-90,90)

    #Integrate
    # initial state
    state = np.radians([th1, w1, th2, w2])

    # integrate your ODE using scipy.integrate.
    y = integrate.odeint(derivs, state, t)
    Y.append(y)

#     #Get euclidean values from solution
#     x1 = L1*sin(y[:, 0])
#     y1 = -L1*cos(y[:, 0])

#     x2 = L2*sin(y[:, 2]) + x1
#     y2 = -L2*cos(y[:, 2]) + y1

In [None]:
with open("90angle_90veloc_1k_3s.data", "wb") as f:
    pickle.dump(Y, f)

## Load Trajectories and create Dataset

In [None]:
with open("90angle_90veloc_1k_3s.data", "rb") as f:
    Y = pickle.load(f)

In [None]:
class PendulumDataset(Dataset):

    def __init__(self, dataset_list):
        self.label_array = dataset_list
    def __getitem__(self, idx):
        return self.label_array[idx]
    def __len__(self):
        return len(self.label_array)

In [None]:
dataset = PendulumDataset(Y)

In [None]:
trainset, testset = torch.utils.data.random_split(dataset, [90, 10])

In [None]:
dataloader = DataLoader(trainset, batch_size=8, shuffle=False)
dataloader_test = DataLoader(testset, batch_size=8, shuffle=False)

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device

In [None]:
##Train Steps to work with Dataloader

def train_step(y, hidden_size, net):
    """
    x: List of float values of the time series
    y: List of float values of the target time series
    hidden_size: hidden size of the recurrent layers
    """
    net.zero_grad()
    loss = 0
    top_t0 = torch.ones(1, hidden_size).to(device)
    bottom_t0 = torch.ones(1, hidden_size).to(device)
    inp = torch.ones(1,4).to(device)

    
    for i in range(len(y[0])-1):
        #out, bottom_t0, top_t0 = net(out_d, bottom_t0, top_t0)
        if i==0:
            out, bottom_t0, top_t0 = net(y[:,i,:].float().to(device), bottom_t0.to(device), top_t0.to(device))
        else:
            out, bottom_t0, top_t0 = net(inp, bottom_t0, top_t0)
            #out, bottom_t0, top_t0 = net(y[:,i,:].float(), bottom_t0, top_t0)


        l = criterion(out, torch.tensor(y[:,i+1,:]).float())
        loss += l
    loss.backward()
    optimizer.step()
    return out, loss.item()

def val_step(y, hidden_size, net):
    """
    x: List of float values of the time series
    y: List of float values of the target time series
    hidden_size: hidden size of the recurrent layers
    """
    net.zero_grad()
    top_t0 = torch.ones(1, hidden_size).to(device)
    bottom_t0 = torch.ones(1, hidden_size).to(device)
    inp = torch.ones(1,4).to(device)


    loss = 0
    Outs = []
    for i in range(len(y[0])-1):
        if i==0:
            out, bottom_t0, top_t0 = net(y[:,i,:].float(), bottom_t0, top_t0)
        else:
            out, bottom_t0, top_t0 = net(inp, bottom_t0, top_t0)
            #out, bottom_t0, top_t0 = net(y[:,i,:].float(), bottom_t0, top_t0)


        l = criterion(out, torch.tensor(y[:, i+1, :]).float())
        loss += l
        Outs.append(out.cpu().detach().numpy())
    return Outs, loss.item()


def train_step_ARNN(y, hidden_size, net):
    """
    x: List of float values of the time series
    y: List of float values of the target time series
    hidden_size: hidden size of the recurrent layers
    """
    net.zero_grad()
    loss = 0
    top_t0 = torch.ones(1, hidden_size).to(device)
    bottom_t0 = torch.ones(1, hidden_size).to(device)
    inp = torch.ones(1,4).to(device)

    
    for i in range(len(y[0])-1):
        #out, bottom_t0, top_t0 = net(out_d, bottom_t0, top_t0)
        if i==0:
            out, bottom_t0= net(y[:,i,:].float().to(device), bottom_t0.to(device))
        else:
            out, bottom_t0= net(inp, bottom_t0)
            #out, bottom_t0, top_t0 = net(y[:,i,:].float(), bottom_t0, top_t0)


        l = criterion(out, torch.tensor(y[:,i+1,:]).float())
        loss += l
    loss.backward()
    optimizer.step()
    return out, loss.item()

def val_step_ARNN(y, hidden_size, net):
    """
    x: List of float values of the time series
    y: List of float values of the target time series
    hidden_size: hidden size of the recurrent layers
    """
    net.zero_grad()
    top_t0 = torch.ones(1, hidden_size).to(device)
    bottom_t0 = torch.ones(1, hidden_size).to(device)
    inp = torch.ones(1,4).to(device)


    loss = 0
    Outs = []
    for i in range(len(y[0])-1):
        if i==0:
            out, bottom_t0= net(y[:,i,:].float(), bottom_t0)
        else:
            out, bottom_t0 = net(inp, bottom_t0)
            #out, bottom_t0, top_t0 = net(y[:,i,:].float(), bottom_t0, top_t0)


        l = criterion(out, torch.tensor(y[:, i+1, :]).float())
        loss += l
        Outs.append(out.cpu().detach().numpy())
    return Outs, loss.item()

def test_step(y, hidden_size, net, leng):
    """
    y: Initial values for the pendulum 
    hidden_size: hidden size of the recurrent layers
    leng: How long to predict(in 0.1s unit)
    """
    net.zero_grad()
    top_t0 = torch.ones(1, hidden_size)
    bottom_t0 = torch.ones(1, hidden_size)
    inp = torch.ones(1,4)


    loss = 0
    Outs = []
    for i in range(leng):
        if i==0:
            out, bottom_t0, top_t0 = net(y, bottom_t0, top_t0)
        else:
            out, bottom_t0, top_t0 = net(inp, bottom_t0, top_t0)
            #out, bottom_t0, top_t0 = net(y[:,i,:].float(), bottom_t0, top_t0)


        Outs.append(out.detach().numpy())
    return Outs

## Training for FRNN 

In [None]:
#Train Procedure

hidden_size = 100
#net = FRNN_AS_SC(4, hidden_size, 4, device, 0.1, 0.15)
net = FRNN_SC(4, hidden_size, 4, device, 0.1, 0.15)
#net = TLRNN_AS_SC(4, hidden_size, device, 4, 0.1, 0.15)


net.to(device)


criterion = torch.nn.MSELoss(reduction='sum')
optimizer = torch.optim.Adam(net.parameters(), lr=1e-3)
epochs = 1000
Losses = []
Val_Losses = []


for e in (range(epochs)):
    epoch_loss = 0
    epoch_val_loss = 0
    for idx, batch in enumerate(dataloader):
        batch = batch.to(device)
        #Train step
        out, loss = train_step(batch, hidden_size, net)
        epoch_loss += loss
        
    for idx, batch in enumerate(dataloader_test):
        batch = batch.to(device)
        #Train step
        out, loss = val_step(batch, hidden_size, net)
        epoch_val_loss += loss
    
    Losses.append(epoch_loss/len(dataloader))
    Val_Losses.append(epoch_val_loss/len(dataloader_test))

    print("Loss: ", epoch_loss/len(dataloader))
    print("Val Loss: ", epoch_val_loss/len(dataloader_test))


    
#Visualisation
O, Y = val_step(batch, hidden_size, net)

In [None]:
Losses_Frnn = Losses
ValLosses_Frnn = Val_Losses

In [None]:
plt.xlabel("Epochs")
plt.ylabel("Train_Loss")
plt.plot(Losses)
plt.savefig("Train_Loss_Frnn.png")

In [None]:
np.amin(Losses)

In [None]:
plt.xlabel("Epochs")
plt.ylabel("Val_Loss")
plt.plot(Val_Losses[:])
plt.savefig("Val_Loss_Frnn.png")

In [None]:
np.amin(Val_Losses)

In [None]:
np.argmin(Val_Losses)