In [1]:
import numpy as np
import random
from scipy.io import loadmat
from scipy.special import ellipj, ellipk

import torch
from torch.utils.data import DataLoader, Dataset

import matplotlib.pyplot as plt

In [2]:
noise = 0.03
theta = 2.4

In [3]:
def pendulum(noise, theta=2.4):
    
    np.random.seed(1)

    def sol(t,theta0):
        S = np.sin(0.5*(theta0) )
        K_S = ellipk(S**2)
        omega_0 = np.sqrt(9.81)
        sn,cn,dn,ph = ellipj( K_S - omega_0*t, S**2 )
        theta = 2.0*np.arcsin( S*sn )
        d_sn_du = cn*dn
        d_sn_dt = -omega_0 * d_sn_du
        d_theta_dt = 2.0*S*d_sn_dt / np.sqrt(1.0-(S*sn)**2)
        return np.stack([theta, d_theta_dt],axis=1)
    
    
    anal_ts = np.arange(0, 2200*0.1, 0.1)
    X = sol(anal_ts, theta)
    
    X = X.T
    Xclean = X.copy()
    X += np.random.standard_normal(X.shape) * noise
    
    
    # Rotate to high-dimensional space
    Q = np.random.standard_normal((64,2))
    Q,_ = np.linalg.qr(Q)
    
    X = X.T.dot(Q.T) # rotate
    Xclean = Xclean.T.dot(Q.T)
    
    # scale 
    X = 2 * (X - np.min(X)) / np.ptp(X) - 1
    Xclean = 2 * (Xclean - np.min(Xclean)) / np.ptp(Xclean) - 1

    
    # split into train and test set 
    X_train = X[0:600]   
    X_test = X[600:]

    X_train_clean = Xclean[0:600]   
    X_test_clean = Xclean[600:]     
    
    #******************************************************************************
    # Return train and test set
    #******************************************************************************
    return X_train, X_test, X_train_clean, X_test_clean, 64, 1

In [4]:
import torch
import torch.nn as nn
import pandas as pd
import numpy as np
from torch.utils.data import DataLoader, TensorDataset

class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size, steps):
        super(LSTM, self).__init__()
        self.steps = steps
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, mode = "forward"):
        # h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        # c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
      
    
        q= x
        output_list= []
        output_back_list = []
        if mode == 'forward':
            h = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
            c = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
            for _ in range(self.steps):
                # print("q",q.shape)
                out, _ = self.lstm(q,(h,c))
                out = self.fc(out)
                q =out
                output_list.append(q)
            output_list.append(x) 
        
        return output_list,output_back_list

In [5]:
X_train, X_test, Xtrain_clean, Xtest_clean, m, n = pendulum(noise=noise, theta=theta)


X_train = X_train.reshape( X_train.shape[0], 1, X_train.shape[1])
X_train = torch.from_numpy(X_train).float().contiguous()

print (X_train.shape)
trainDat = []
start = 0
steps = 6

for i in np.arange(steps,-1, -1):
    if i == 0:
        trainDat.append(X_train[start:])
    else:
        trainDat.append(X_train[start:-i])
    start += 1

train_data = torch.utils.data.TensorDataset(*trainDat)
# train_data = torch.tensor(trainDat)
del(trainDat)

train_loader = DataLoader(dataset = train_data,batch_size = 64,shuffle = True)

torch.Size([600, 1, 64])


In [6]:

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define the model
model = LSTM(input_size=m, hidden_size=50, num_layers=2, output_size=m, steps = steps).cuda()

# Define the loss function and the optimizer
criterion = nn.MSELoss()  # or another appropriate loss function
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)  # adjust learning rate as needed

num_epochs = 500
# Training loop
for epoch in range(num_epochs):
    model.train()
    loss_epoch =0
    for i, data_list in enumerate(train_loader):
        
        # Forward pass
        out, out_back= model(data_list[0].to(device))
        for k in range(steps):
            # print(k,out[k].shape,data_list[k+1].shape)
            if k == 0:
                loss_fwd = criterion(out[k], data_list[k+1].to(device))
            else:
                loss_fwd += criterion(out[k], data_list[k+1].to(device))
        

        # Backward and optimize
        optimizer.zero_grad()
        loss_fwd.backward()
        optimizer.step()
        loss_epoch+=loss_fwd.item()

    if (epoch+1) % 2 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(train_loader)}], Loss: {loss_epoch}')

#     # Validation loop
#     model.eval()
#     with torch.no_grad():
#         total = 0
#         correct = 0
#         for images, labels in val_loader:
#             images = images.to(device)
#             labels = labels.to(device)
#             outputs = model(images)
#             _, predicted = torch.max(outputs.data, 1)
#             total += labels.size(0)
#             correct += (predicted == labels).sum().item()
        
#         print(f'Validation Accuracy of the model on the test images: {100 * correct / total}%')


Epoch [2/500], Step [10/10], Loss: 3.8542795181274414
Epoch [4/500], Step [10/10], Loss: 3.5896878242492676
Epoch [6/500], Step [10/10], Loss: 2.2423147708177567
Epoch [8/500], Step [10/10], Loss: 1.4777722209692001
Epoch [10/500], Step [10/10], Loss: 1.3206848502159119
Epoch [12/500], Step [10/10], Loss: 1.2234707549214363
Epoch [14/500], Step [10/10], Loss: 1.161753587424755
Epoch [16/500], Step [10/10], Loss: 1.0547170042991638
Epoch [18/500], Step [10/10], Loss: 0.8735829889774323
Epoch [20/500], Step [10/10], Loss: 0.27444878220558167
Epoch [22/500], Step [10/10], Loss: 0.08601407334208488
Epoch [24/500], Step [10/10], Loss: 0.031617759028449655
Epoch [26/500], Step [10/10], Loss: 0.014394201920367777
Epoch [28/500], Step [10/10], Loss: 0.008381827967241406
Epoch [30/500], Step [10/10], Loss: 0.005927443620748818
Epoch [32/500], Step [10/10], Loss: 0.005023412843002006
Epoch [34/500], Step [10/10], Loss: 0.004448940570000559
Epoch [36/500], Step [10/10], Loss: 0.004040821484522894

In [7]:





# Define Loss, Optimizer
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training Loop
for epoch in range(100): # you can adjust this
    for inputs, targets in train_loader:
        inputs, targets = inputs.cuda(), targets.cuda()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, targets)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print ('Epoch [{}/{}], Loss: {:.4f}'.format(epoch+1, 100, loss.item()))

# Testing Loop
model.eval()
with torch.no_grad():
    for inputs, targets in test_loader:
        inputs, targets = inputs.cuda(), targets.cuda()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        print('Test Loss: {:.4f}'.format(loss.item()))


ValueError: too many values to unpack (expected 2)