 # Question 2 - sub question 1, 4 and 5

In [8]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

# load training and test data
def loadData():
    X_train = np.load('X_train.npy',allow_pickle=True)
    y_train = np.load('y_train.npy',allow_pickle=True)
    X_test = np.load('X_test.npy',allow_pickle=True)
    y_test = np.load('y_test.npy',allow_pickle=True)

    X_train = [torch.Tensor(x) for x in X_train]  # List of Tensors (SEQ_LEN[i],INPUT_DIM) i=0..NUM_SAMPLES-1
    X_test = [torch.Tensor(x) for x in X_test]  # List of Tensors (SEQ_LEN[i],INPUT_DIM)
    y_train = torch.Tensor(y_train) # (NUM_SAMPLES,1)
    y_test = torch.Tensor(y_test) # (NUM_SAMPLES,1)

    return X_train, X_test, y_train, y_test


# Define a Vanilla RNN layer by hand
class RNNLayer(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(RNNLayer, self).__init__()
        self.hidden_size = hidden_size
        self.input_size = input_size
        self.W_xh = nn.Linear(input_size, hidden_size)
        self.W_hh = nn.Linear(hidden_size, hidden_size)
        self.activation = nn.Tanh()

    def forward(self, x, hidden):
        combined = self.W_xh(x) + self.W_hh(hidden)
        hidden = self.activation(combined)
        return hidden

# Define a sequence prediction model using the Vanilla RNN
class SequenceModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(SequenceModel, self).__init__()
        self.hidden_size = hidden_size
        self.rnn = RNNLayer(input_size, hidden_size)
        self.linear = nn.Linear(hidden_size, output_size)

    def forward(self, input_seq, seq_lengths):
        batch_size = len(input_seq)
        last_hidden = torch.zeros(batch_size, self.hidden_size).to(device)

        for b in range(batch_size):
            hidden = torch.zeros(1, self.hidden_size).to(device)

            seq_length = seq_lengths[b]
            for t in range(seq_length):
                hidden = self.rnn(input_seq[b][t], hidden)

            # Store the last hidden state in the output tensor
            last_hidden[b] = hidden

        output = self.linear(last_hidden)
        return output

# Define hyperparameters and other settings
input_size = 10  
hidden_size = 64
output_size = 1
num_epochs = 10
learning_rate = 0.001
batch_size = 32

# Load data 
X_train, X_test, y_train, y_test = loadData()
device = y_train.device

# Create the model using min length input
seq_lengths = [seq.shape[0] for seq in X_train]

# Create the model
model = SequenceModel(input_size, hidden_size, output_size).to(device)

# Training loop
def train(model, num_epochs, lr, batch_size, X_train, y_train, seq_lengths):
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    for epoch in range(num_epochs):
        for i in range(0, len(X_train), batch_size):
            inputs = X_train[i:i+batch_size]
            targets = y_train[i:i+batch_size]
            lengths = seq_lengths[i:i+batch_size]

            optimizer.zero_grad()
            outputs = model(inputs, lengths)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

        print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {loss.item()}')

    return model

# initialize and train Vanilla RNN
trained_model = train(model, num_epochs, learning_rate, batch_size, X_train, y_train, seq_lengths)

# Evaluate the trained model
def evaluate(model, X, y):
    model.eval()
    with torch.no_grad():
        predictions = model(X, [seq.shape[0] for seq in X])
    mse = nn.MSELoss()(predictions, y)
    return mse.item()

train_mse = evaluate(trained_model, X_train, y_train)
test_mse = evaluate(trained_model, X_test, y_test)

print(f"Training MSE: {train_mse}")
print(f"Test MSE: {test_mse}")


Epoch 1/10, Loss: 0.02664298191666603
Epoch 2/10, Loss: 0.004362978041172028
Epoch 3/10, Loss: 0.00336488988250494
Epoch 4/10, Loss: 0.002841379027813673
Epoch 5/10, Loss: 0.0024992539547383785
Epoch 6/10, Loss: 0.002202842151746154
Epoch 7/10, Loss: 0.0019441695185378194
Epoch 8/10, Loss: 0.0017336936434730887
Epoch 9/10, Loss: 0.0015591580886393785
Epoch 10/10, Loss: 0.001412449637427926
Training MSE: 0.0011888565495610237
Test MSE: 0.002450475934892893


# Question 2 - sub question 2, 4 and 5

In [5]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

# Load training and test data
def loadData():
    X_train = np.load('X_train.npy', allow_pickle=True)
    y_train = np.load('y_train.npy', allow_pickle=True)
    X_test = np.load('X_test.npy', allow_pickle=True)
    y_test = np.load('y_test.npy', allow_pickle=True)

    X_train = [torch.Tensor(x) for x in X_train]
    X_test = [torch.Tensor(x) for x in X_test]
    y_train = torch.Tensor(y_train)
    y_test = torch.Tensor(y_test)

    return X_train, X_test, y_train, y_test

# Defining a simple RNN-based sequence prediction model with fixed time steps
class SequenceModelFixedTimeSteps(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, seq_length):
        super(SequenceModelFixedTimeSteps, self).__init__()
        self.hidden_dim = hidden_dim
        self.seq_length = seq_length
        self.rnn = nn.RNN(input_size=input_dim, hidden_size=hidden_dim, num_layers=1, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        # Initialize hidden state with zeros
        h0 = torch.zeros(1, x.size(0), self.hidden_dim).to(x.device)
        out, _ = self.rnn(x, h0)
        out = self.fc(out[:, -1, :])  # Use the output from the last time step
        return out

# Define hyperparameters and other settings
input_dim = 10  
hidden_dim = 64
output_dim = 1
num_epochs = 10
learning_rate = 0.001
batch_size = 32

# Load data
X_train, X_test, y_train, y_test = loadData()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Determine the minimum sequence length
min_seq_length = min(len(seq) for seq in X_train)

# Trim or pad sequences to the minimum length
X_train = [seq[:min_seq_length] for seq in X_train]
X_test = [seq[:min_seq_length] for seq in X_test]

# Convert the data to tensors and create dataloaders
X_train = torch.stack(X_train)
X_test = torch.stack(X_test)
y_train = y_train.view(-1, 1)  # Ensure the correct shape for the labels

# Initialize the model
model = SequenceModelFixedTimeSteps(input_dim, hidden_dim, output_dim, min_seq_length).to(device)

# Training loop
def train(model, num_epochs, lr, batch_size, X_train, y_train):
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    for epoch in range(num_epochs):
        for i in range(0, len(X_train), batch_size):
            inputs = X_train[i:i+batch_size]
            targets = y_train[i:i+batch_size]
            
            
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item()}')
    return model

# initialize and train Sequential NN fixing #timesteps to the minimum sequence length
trained_model = train(model, num_epochs, learning_rate, batch_size, X_train, y_train)

# -----  Question 2_Sub question 5 -----#

# Evaluate the trained model
def evaluate(model, X, y):
    model.eval()
    with torch.no_grad():
        predictions = model(X)
    mse = nn.MSELoss()(predictions, y)
    return mse.item()

train_mse = evaluate(trained_model, X_train, y_train)
test_mse = evaluate(trained_model, X_test, y_test)

print(f"Training MSE: {train_mse}")
print(f"Test MSE: {test_mse}")


Epoch [1/10], Loss: 0.009028473868966103
Epoch [2/10], Loss: 0.009373912587761879
Epoch [3/10], Loss: 0.008757087402045727
Epoch [4/10], Loss: 0.008849642239511013
Epoch [5/10], Loss: 0.008843314833939075
Epoch [6/10], Loss: 0.008863409049808979
Epoch [7/10], Loss: 0.00888094399124384
Epoch [8/10], Loss: 0.008892207406461239
Epoch [9/10], Loss: 0.008898316882550716
Epoch [10/10], Loss: 0.008899688720703125
Training MSE: 0.009202002547681332
Test MSE: 0.009862154722213745


# Question 2 - sub question 3, 4 and 5

In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

# Load training and test data
def loadData():
    X_train = np.load('X_train.npy',allow_pickle=True)
    y_train = np.load('y_train.npy',allow_pickle=True)
    X_test = np.load('X_test.npy',allow_pickle=True)
    y_test = np.load('y_test.npy',allow_pickle=True)

    X_train = [torch.Tensor(x) for x in X_train]  # List of Tensors (SEQ_LEN[i],INPUT_DIM) i=0..NUM_SAMPLES-1
    X_test = [torch.Tensor(x) for x in X_test]  # List of Tensors (SEQ_LEN[i],INPUT_DIM)
    y_train = torch.Tensor(y_train) # (NUM_SAMPLES,1)
    y_test = torch.Tensor(y_test) # (NUM_SAMPLES,1)

    return X_train, X_test, y_train, y_test


# Define a Vanilla RNN layer by hand
class RNNLayer(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(RNNLayer, self).__init__()
        self.hidden_size = hidden_size
        self.input_size = input_size
        self.W_xh = nn.Linear(input_size, hidden_size)
        self.W_hh = nn.Linear(hidden_size, hidden_size)
        self.activation = nn.Tanh()

    def forward(self, x, hidden):
        hidden = self.activation(self.W_xh(x) + self.W_hh(hidden))
        return hidden

# Define a sequence prediction model for variable length sequences, NO SHARED WEIGHTS
class SequenceModelVarLen(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(SequenceModelVarLen, self).__init__()
        self.hidden_size = hidden_size
        self.rnn_layer = RNNLayer(input_size, hidden_size)
        self.linear = nn.Linear(hidden_size, output_size)

    def forward(self, input_seq, seq_lengths):
        batch_size = input_seq.size(0)
        max_seq_length = input_seq.size(1)
        
        # Initialize hidden state for each sequence in the batch
        hidden = torch.zeros(batch_size, self.hidden_size).to(input_seq.device)

        for t in range(max_seq_length):
            # Apply RNN layer at each time step
            hidden = self.rnn_layer(input_seq[:, t, :], hidden)
        output = self.linear(hidden)
        return output

# Define hyperparameters and other settings
input_size = 10  
hidden_size = 64
output_size = 1
num_epochs = 10
learning_rate = 0.001
batch_size = 32

# Load data
X_train, X_test, y_train, y_test = loadData()
device = X_train[0].device

# Create the model
model = SequenceModelVarLen(input_size, hidden_size, output_size).to(device)

# Training loop
def train(model, num_epochs, lr, batch_size, X_train, y_train):
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    for epoch in range(num_epochs):
        for i in range(0, len(X_train), batch_size):
            inputs = X_train[i:i+batch_size]
            targets = y_train[i:i+batch_size]
            
            # Pad sequences to the same length (maximum sequence length in the batch)
            max_seq_length = max([seq.size(0) for seq in inputs])
            padded_inputs = torch.zeros(batch_size, max_seq_length, input_size).to(device)
            for j, seq in enumerate(inputs):
                seq_len = seq.size(0)
                padded_inputs[j, :seq_len, :] = seq

            optimizer.zero_grad()
            outputs = model(padded_inputs, None)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item()}')
    return model

# initialize and train Sequential NN fixing #timesteps to the maximum sequence length
trained_model = train(model, num_epochs, learning_rate, batch_size, X_train, y_train)

# -----  Question 2_Sub question 5 -----#

# Evaluate the trained model
def evaluate(model, X, y):
    model.eval()
    with torch.no_grad():
        mse_sum = 0.0
        for i in range(len(X)):
            max_seq_length = X[i].size(0)
            outputs = model(X[i].unsqueeze(0), [max_seq_length])
            mse_sum += nn.MSELoss()(outputs, y[i:i+1])  # Calculate MSE for each sequence individually
        mse = mse_sum / len(X)
    return mse.item()

train_mse = evaluate(trained_model, X_train, y_train)
test_mse = evaluate(trained_model, X_test, y_test)

print(f"Training MSE: {train_mse}")
print(f"Test MSE: {test_mse}")

Epoch [1/10], Loss: 0.014521529898047447
Epoch [2/10], Loss: 0.007491786032915115
Epoch [3/10], Loss: 0.006498521659523249
Epoch [4/10], Loss: 0.006131777539849281
Epoch [5/10], Loss: 0.005994045175611973
Epoch [6/10], Loss: 0.006073987111449242
Epoch [7/10], Loss: 0.0072938185185194016
Epoch [8/10], Loss: 0.007871345616877079
Epoch [9/10], Loss: 0.007722165901213884
Epoch [10/10], Loss: 0.007495713885873556
Training MSE: 0.00991019792854786
Test MSE: 0.011621176265180111
