In [1]:
import numpy as np 
import scipy.io as sio
import matplotlib.pyplot as plt 
import seaborn as sns
sns.set_theme()

import torch 
import torch.nn as nn 
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

torch.manual_seed(42)
torch.cuda.manual_seed_all(42)
torch.set_default_dtype(torch.float32)

%matplotlib inline

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using {device} device")

Using cuda device


In [None]:
# Simple NN
class ToyNet(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(ToyNet, self).__init__()
        self.linear1 = nn.Linear(input_size, hidden_size) 
        self.linear2 = nn.Linear(hidden_size, hidden_size)
        self.linear3 = nn.Linear(hidden_size, output_size)
        self.activation = nn.ReLU()
    
    def forward(self, x):
        out = self.activation(self.linear1(x))
        out = self.activation(self.linear2(out))
        out = self.linear3(out)
        return out
    
# Expressive LSTM v2
class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(LSTM, self).__init__()
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=2, bidirectional=True, batch_first=True)
        self.linear1 = nn.Linear(hidden_size, hidden_size) # nn.Linear(hidden_size * 2, hidden_size) for self lstm_out
        self.linear2 = nn.Linear(hidden_size, output_size)
        self.activation = nn.ReLU()
    
    def forward(self, x):
        batch_size = x.shape[0]
        # h0 = torch.zeros((4, batch_size, 256), device=device, requires_grad=True)
        # c0 = torch.zeros((4, batch_size, 256), device=device, requires_grad=True)
        out, (hn, cn) = self.lstm(x.view(batch_size, 1, -1))
        out = self.activation(self.linear1(hn[0]))
        out = self.linear2(out)
        return out

In [3]:
data = sio.loadmat("./Dataset/data_23_03.mat")["training_matrix"]

X = data[:, :-1].astype(np.float32)
y = data[:, -1].reshape(-1, 1).astype(np.float32)

# Normalization
mu = X.mean()
sigma = X.std()

# Normalized Dataset
X_norm = (X - mu) / sigma

# Inputs & Targets
X_train = torch.from_numpy(X_norm[0:25_000]).to(device)
y_train = torch.from_numpy(y[0:25_000]).to(device)

# Inputs & Targets
X_test = torch.from_numpy(X_norm[25_000:25_500]).to(device)
y_test = torch.from_numpy(y[25_000:25_500]).to(device)

X.shape, y.shape, X_train.shape, y_train.shape, X_test.shape, y_test.shape

((27378, 8),
 (27378, 1),
 torch.Size([25000, 8]),
 torch.Size([25000, 1]),
 torch.Size([500, 8]),
 torch.Size([500, 1]))

In [None]:
 # ToyNet Inputs
input_size = 2
hidden_size = 256
output_size = 1

# FNN
model = ToyNet(input_size, hidden_size, output_size).to(device)
delta_t = torch.tensor([0.05], device=device)

epochs = 150
optimizer = optim.AdamW(model.parameters(), lr = 6e-6)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size = 15, gamma = 0.1)
loss_fn = nn.MSELoss()

avg_train_loss = []
for epoch in range(epochs):
    losses_train = []
    for i in range(X_train.shape[0]):
        
        inp = X_train[i].view(4, -1)
        pred = torch.cumsum(model(inp) * delta_t, dim=0)

        loss = loss_fn(pred[-1], y_train[i])        

        optimizer.zero_grad()
        loss.backward(retain_graph=True)
        optimizer.step()        
        losses_train.append(loss.detach().cpu().numpy())
    
    print(f"Epoch: {epoch + 1}, Train Loss: {np.average(losses_train)}")

    scheduler.step()
    avg_train_loss.append(np.average(losses_train))

In [None]:
x = X_test[12].view(4, -1)

pred = model(x)

print(pred)
print(y_test[12])

### Autoregressive Approach

In [None]:
# Autoregressive RNN
class ToyRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(ToyRNN, self).__init__()
        
        self.hidden_size = hidden_size
        
        self.i2h = nn.Linear(input_size + hidden_size, hidden_size)
        self.h2o = nn.Linear(hidden_size, output_size) 
        self.activation = nn.ReLU()
        
    def forward(self, x, hidden):
        combined = torch.cat((x, hidden))
        hidden = self.activation(self.i2h(combined))
        output = self.activation(self.i2h(combined))
        output = self.h2o(output)
        return output, hidden 
    
    def initHidden(self):
        return torch.zeros(self.hidden_size, device=device)

# ToyRNN Inputs
input_size = 2 
hidden_size = 512
output_size = 1

# Autoregressive Model
model = ToyRNN(input_size, hidden_size, output_size).to(device)
optimizer = optim.AdamW(model.parameters(), lr = 6e-6)
delta_t = torch.tensor([0.05], device=device)

epochs = 240
loss_fn = nn.MSELoss()

avg_train_loss = []
for epoch in range(epochs):
    losses_train = []
    for i in range(X_train.shape[0]):
        l, k = 0, 2
        hidden = model.initHidden()
        model.zero_grad()
        
        for j in range(4):
            inp = X_train[i, l:k]
            output, hidden = model(inp, hidden)
            l += 2
            k += 2

        loss = loss_fn(output, y_train[i])        
        loss.backward()
        optimizer.step()

        losses_train.append(loss.detach().cpu().numpy())
    
    print(f"Epoch: {epoch + 1}, Train Loss: {np.average(losses_train)}")
    avg_train_loss.append(np.average(losses_train))

In [None]:
with torch.no_grad():
    model.eval()
    hidden = model.initHidden()
    
    l, k = 0, 2
    for j in range(4):
        inp = X_test[0, l:k]
        output, hidden = model(inp, hidden)
        l += 2
        k += 2
        
        print(f"For x{j+1}, Pred y{j+1}: {output.item()}\n")
        

    print(f"True y: {y_test[0].item()}")

In [5]:
### Autoregressive GRU
class ProGRU(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(ProGRU, self).__init__()
        self.hidden_size = hidden_size
        self.lstm = nn.GRU(input_size=input_size, hidden_size=hidden_size, num_layers=1, bidirectional=False, batch_first=True)
        self.linear1 = nn.Linear(hidden_size, hidden_size)
        self.linear2 = nn.Linear(hidden_size, hidden_size)
        self.linear3 = nn.Linear(hidden_size, output_size)
        self.activation = nn.PReLU()
    
    def forward(self, x, hidden):
        batch_size = x.shape[0]
        out, hn = self.lstm(x.view(batch_size, 1, -1), hidden)
        out = self.activation(self.linear1(hn[0])) 
        out = self.activation(self.linear2(out))
        out = self.linear3(out)
        return out, hn
    
    def initHidden(self):
        return torch.zeros((1, 1, self.hidden_size), device=device)
    
# ProGRU Inputs
input_size = 2 
hidden_size = 256
output_size = 1

# Autoregressive Model
model = ProGRU(input_size, hidden_size, output_size).to(device)
optimizer = optim.AdamW(model.parameters(), lr = 6e-6)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size = 10, gamma = 0.1)
loss_fn = nn.MSELoss()

epochs = 250
                           
avg_train_loss = []
for epoch in range(epochs):
    losses_train = []
    for i in range(X_train.shape[0]):
        l, k = 0, 2
        hidden = model.initHidden()
        model.zero_grad()
        
        for j in range(4):
            inp = X_train[i, l:k]
            output, hidden = model(inp.unsqueeze(0), hidden)
            l += 2
            k += 2
        
        loss = loss_fn(output.flatten(), y_train[i])        
        loss.backward()
        optimizer.step()

        losses_train.append(loss.detach().cpu().numpy())
    
    print(f"Epoch: {epoch + 1}, Train Loss: {np.average(losses_train)}")
    
    scheduler.step()
    avg_train_loss.append(np.average(losses_train))

Epoch: 1, Train Loss: 0.2773677706718445
Epoch: 2, Train Loss: 0.19978561997413635
Epoch: 3, Train Loss: 0.17691145837306976
Epoch: 4, Train Loss: 0.168458491563797
Epoch: 5, Train Loss: 0.1645454317331314
Epoch: 6, Train Loss: 0.16199879348278046
Epoch: 7, Train Loss: 0.16009503602981567
Epoch: 8, Train Loss: 0.15861491858959198
Epoch: 9, Train Loss: 0.15741877257823944
Epoch: 10, Train Loss: 0.15644396841526031
Epoch: 11, Train Loss: 0.15657185018062592
Epoch: 12, Train Loss: 0.15460681915283203
Epoch: 13, Train Loss: 0.15425047278404236
Epoch: 14, Train Loss: 0.15411032736301422
Epoch: 15, Train Loss: 0.1540076732635498
Epoch: 16, Train Loss: 0.15391629934310913
Epoch: 17, Train Loss: 0.15382935106754303
Epoch: 18, Train Loss: 0.15374493598937988
Epoch: 19, Train Loss: 0.15366394817829132
Epoch: 20, Train Loss: 0.15358316898345947
Epoch: 21, Train Loss: 0.153359055519104
Epoch: 22, Train Loss: 0.1533491164445877
Epoch: 23, Train Loss: 0.15334057807922363
Epoch: 24, Train Loss: 0.153

In [11]:
x = np.arange(0, 500)

y_pred, gt = [], []
for i in range(len(x)):
    with torch.no_grad():
        model.eval()
        hidden = model.initHidden()

        l, k = 0, 2
        for j in range(4):
            inp = X_test[i, l:k]
            output, hidden = model(inp.unsqueeze(0), hidden)
            l += 2
            k += 2

            # print(f"For x{j+1}, Pred y{j+1}: {output.item()}\n")

        y_pred.append(output.item())
        gt.append(y_test[i].item())
        # print(f"True y: {y_test[0].item()}")

In [14]:
%matplotlib qt

plt.figure( figsize=(18, 12))
plt.plot(x, gt, label="Ground Truth", color="blue")
plt.plot(x, y_pred, label="Predictions", color="orange")
plt.legend(loc=1 ,prop={'size': 12})
plt.xlim(0, 502)
# plt.ylim(-10, 10)
plt.xlabel("t")
plt.ylabel("Velocity")
# plt.savefig("GRU Predictions.png")
plt.show()

In [None]:
torch.save(model.state_dict(), './Weights/weights_regressive_gru.pth')