In [1]:
# importing necessary dependancies
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
import import_ipynb
from helpers import train_and_validate, count_parameters, plot_loss_comparison, plot_test_predictions, compare_mse_loss
import time

importing Jupyter notebook from helpers.ipynb


In [2]:
# use NVIDIA Geforce GTX 1650
device = "cuda" if torch.cuda.is_available() else "cpu"
device

'cuda'

In [3]:
# hyperparameters
batch_size = 100
input_size = 2
hidden_size = 100
num_classes = 1
lstmnet_learning_rate = 0.001
customnet_learning_rate = 0.001
momentum = 0.8
use_nesterov = True
max_norm = 1.0
epochs = 1000

In [4]:
def adding_problem_generator(N, seq_len=8, high=1):
    """ A data generator for adding problem.

    A single entry has a 2D vector with two rows of the same length. 
    The first row has random numbers, and the second row contains a 
    binary mask with ones at only two positions. The label for this 
    entry is the sum of the numbers from the first row where the mask 
    is one.

     input          label
     -----          -----
    1 4 5 3  ----->   9 (4 + 5)
    0 1 1 0

    N: the number of the entries.
    seq_len: the length of a single sequence.
    high: the random data is sampled from a [0, high] uniform distribution.
    return: (X, Y), X the data, Y the label.
    """
    X_num = np.round(np.random.uniform(low=0, high=high, size=(N, seq_len, 1)), 4)
    X_mask = np.zeros((N, seq_len, 1))
    Y = np.ones((N, 1))
    for i in range(N):
        # Default uniform distribution on position sampling
        positions = np.random.choice(seq_len, size=2, replace=False)
        X_mask[i, positions] = 1
        Y[i, 0] = np.sum(X_num[i, positions])
    X = np.append(X_num, X_mask, axis=2)
    return X, Y

In [5]:
# creating the train and test datasets
X_train, Y_train = adding_problem_generator(10000, 8, 1)
X_test, Y_test = adding_problem_generator(100, 8, 1)
X_train = torch.tensor(X_train).float()
Y_train = torch.tensor(Y_train).float()
X_test = torch.tensor(X_test).float()
Y_test = torch.tensor(Y_test).float()

In [6]:
print(X_train.shape, Y_train.shape)
print(X_val.shape, Y_val.shape)
print(X_test.shape, Y_test.shape)

torch.Size([10000, 8, 2]) torch.Size([10000, 1])
torch.Size([1000, 8, 2]) torch.Size([1000, 1])
torch.Size([100, 8, 2]) torch.Size([100, 1])


In [7]:
class AddingProblemDataset(Dataset):
    def __init__(self, X, Y):
        self.X = X
        self.Y = Y
        
    def __len__(self):
        return len(self.X)
        
    def __getitem__(self, i):
        return self.X[i], self.Y[i]

train_dataset = AddingProblemDataset(X_train, Y_train)
val_dataset = AddingProblemDataset(X_val, Y_val)

In [8]:
train_loader = DataLoader(train_dataset, batch_size = batch_size, shuffle = False)

In [9]:
class LSTMNet(nn.Module):

    def __init__(self, input_size: int, hidden_size: int, num_classes: int):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_classes = num_classes
        self.lstm = nn.LSTM(input_size, hidden_size)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        x_, (h_t, c_t) = self.lstm(x)
        out = F.relu(self.fc(x_[:, -1, :]))
        return out

In [10]:
class CustomUnit(nn.Module):

    def __init__(self, input_size: int, hidden_size: int):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size

        # input weight
        self.W_i = nn.Parameter(torch.Tensor(input_size, hidden_size), requires_grad = True)

        # forget gate
        self.W_f = nn.Parameter(torch.Tensor(hidden_size, hidden_size), requires_grad = True)
        self.U_f = nn.Parameter(torch.Tensor(hidden_size, hidden_size), requires_grad = True)
        self.b_f = nn.Parameter(torch.Tensor(hidden_size), requires_grad = True)

        self.init_weights()

    def init_weights(self):
        for weight in self.parameters():
            if weight.data.ndimension() < 2:  # Bias or 1D weights
                nn.init.zeros_(weight)
            else:
                nn.init.xavier_uniform_(weight)



    def forward(self, x, hidden = None):

        bs, seq, ip = x.size()
        hidden_seq = []

        if hidden is None:
            h_prev, c_prev = (
                torch.zeros(bs, self.hidden_size).to(device),
                torch.zeros(bs, self.hidden_size).to(device),
            )
        else:
            h_prev, c_prev = hidden

        for t in range(seq):
            x_t = x[:, t, :]
            x_t = torch.tanh(x_t @ self.W_i)
            f_t = torch.sigmoid(x_t @ self.W_f + h_prev @ self.U_f + self.b_f)
            h_t = (f_t * h_prev) + ((1 - f_t) * x_t)
            c_t = c_prev
            hidden_seq.append(h_t.unsqueeze(0))

        hidden_seq = torch.cat(hidden_seq, dim = 0)
        hidden_seq = hidden_seq.transpose(0, 1).contiguous()
        return hidden_seq, (h_t, c_t)


class CustomNet(nn.Module):

    def __init__(self, input_size: int, hidden_size: int, num_classes: int):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_classes = num_classes
        self.custom_layer = CustomUnit(input_size, hidden_size)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        x_, (h_t, c_t) = self.custom_layer(x)
        out = F.relu(self.fc(x_[:, -1, :]))
        return out   

In [11]:
lstmnet = LSTMNet(input_size, hidden_size, num_classes).to(device)
customnet= CustomNet(input_size, hidden_size, num_classes).to(device)

In [12]:
total_lstmnet_parameters = count_parameters(lstmnet)
total_customnet_parameters = count_parameters(customnet)

In [13]:
print(f"Total LSTMNet Parameters: {total_lstmnet_parameters}")
print(f"Total CustomNet Parameters: {total_customnet_parameters}")

Total LSTMNet Parameters: 41701
Total CustomNet Parameters: 20401


In [14]:
criterion = nn.MSELoss()
lstmnet_optimizer = torch.optim.Adam(lstmnet.parameters(), lr = lstmnet_learning_rate)
customnet_optimizer = torch.optim.Adam(customnet.parameters(), lr = customnet_learning_rate)

In [15]:
lstmnet_train_loss_array = []
customnet_train_loss_array = []

In [None]:
lstmnet_start_time = time.time()
train_and_validate(epochs, device, lstmnet, train_loader, criterion, lstmnet_optimizer, lstmnet_train_loss_array)
lstmnet_end_time = time.time()
lstmnet_total_training_time = lstmnet_end_time - lstmnet_start_time
lstmnet_avg_training_time_per_epoch = lstmnet_total_training_time / epochs

Epoch no.: 1 | Training Loss: 1.170299892425537 | Validation Loss: 1.228244459629059
Epoch no.: 2 | Training Loss: 1.170299892425537 | Validation Loss: 1.228244459629059
Epoch no.: 3 | Training Loss: 1.170299892425537 | Validation Loss: 1.228244459629059
Epoch no.: 4 | Training Loss: 1.170299892425537 | Validation Loss: 1.228244459629059
Epoch no.: 5 | Training Loss: 1.170299892425537 | Validation Loss: 1.228244459629059
Epoch no.: 6 | Training Loss: 1.170299892425537 | Validation Loss: 1.228244459629059
Epoch no.: 7 | Training Loss: 1.170299892425537 | Validation Loss: 1.228244459629059
Epoch no.: 8 | Training Loss: 1.170299892425537 | Validation Loss: 1.228244459629059
Epoch no.: 9 | Training Loss: 1.170299892425537 | Validation Loss: 1.228244459629059
Epoch no.: 10 | Training Loss: 1.170299892425537 | Validation Loss: 1.228244459629059
Epoch no.: 11 | Training Loss: 1.170299892425537 | Validation Loss: 1.228244459629059
Epoch no.: 12 | Training Loss: 1.170299892425537 | Validation L

In [None]:
customnet_start_time = time.time()
train_and_validate(epochs, device, customnet, train_loader, criterion, customnet_optimizer, customnet_train_loss_array)
customnet_end_time = time.time()
customnet_total_training_time = customnet_end_time - customnet_start_time
customnet_avg_training_time_per_epoch = customnet_total_training_time / epochs

Epoch no.: 1 | Training Loss: 1.170299892425537 | Validation Loss: 1.228244459629059
Epoch no.: 2 | Training Loss: 1.170299892425537 | Validation Loss: 1.228244459629059
Epoch no.: 3 | Training Loss: 1.170299892425537 | Validation Loss: 1.228244459629059
Epoch no.: 4 | Training Loss: 1.170299892425537 | Validation Loss: 1.228244459629059


In [None]:
lstmnet_predictions = lstmnet(X_test.to(device)).cpu().detach().numpy()
customnet_predictions = customnet(X_test.to(device)).cpu().detach().numpy()

In [None]:
# FINAL REPORT PRINT
print()
print()
print("------------------------------------------------------------")
print(f"Total Parameters:     LSTMNet: {total_lstmnet_parameters}  |  CustomNet: {total_customnet_parameters}")
print()
print(f"Total Training Time:     LSTMNet: {lstmnet_total_training_time}s  |  CustomNet: {customnet_total_training_time}s")
print()
print(f"Average Training Time Per Epoch:     LSTMNet: {lstmnet_avg_training_time_per_epoch}s  |  CustomNet: {customnet_avg_training_time_per_epoch}s")

print("------------------------------------------------------------")

In [None]:
plot_loss_comparison(epochs, lstmnet_train_loss_array, customnet_train_loss_array, "LSTM Training Loss", "CustomNet Training Loss", "Training Loss Comparison")

In [None]:
plot_test_predictions(Y_test, lstmnet_predictions, customnet_predictions)

In [None]:
compare_mse_loss(Y_test, lstmnet_predictions, customnet_predictions)