In this approach, I use an LSTM because it is more reliable and effective than a vanilla RNN and scales more easily than transformers, with faster training and inference times.

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
import numpy as np

In [6]:
# Locations of the preprocessed sequential features and their labels.
features_path = 'preprocessed_data/50_data_pts_features_sequential.npy'
labels_path = 'preprocessed_data/50_data_pts_labels_sequential.npy'

# Load both arrays into memory; X holds the model inputs, y the targets.
X = np.load(features_path)
y = np.load(labels_path)

In [None]:
# Hold out 30% of the samples for evaluation. shuffle=False makes the split
# a single cut in the original row order (no random assignment of rows).
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.3,
    shuffle=False,
)

# Shuffle each partition only AFTER the split, so rows never cross the
# train/test boundary. One permutation is applied to both the features and
# the labels of a partition so every sample stays paired with its label.
train_permutation = np.random.permutation(len(X_train))
X_train, y_train = X_train[train_permutation], y_train[train_permutation]

test_permutation = np.random.permutation(len(X_test))
X_test, y_test = X_test[test_permutation], y_test[test_permutation]

# Quick visual sanity check of the shuffled partitions.
for header, values in (
    ("X_train:\n", X_train),
    ("y_train:\n", y_train),
    ("X_test:\n", X_test),
):
    print(header, values)

In [8]:
# Display the shapes of all four partitions (train/test features and labels).
tuple(part.shape for part in (X_train, y_train, X_test, y_test))

((348, 43, 18), (348,), (150, 43, 18), (150,))

In [None]:
from sklearn.preprocessing import StandardScaler

# StandardScaler expects 2-D input, so every time step of every sample is
# flattened into its own row: (samples, timesteps, features) ->
# (samples * timesteps, features). The feature count is read from the data
# instead of being hard-coded, so the cell keeps working if the dataset size,
# sequence length, feature count or split ratio changes.
n_features = X_train.shape[-1]
X_train_reshaped = X_train.reshape(-1, n_features)
X_val_reshaped = X_test.reshape(-1, n_features)  # --> For normalization

scaler = StandardScaler()

# Fit the per-feature mean/std on the training rows only, then reuse those
# statistics for the held-out rows so no information from them leaks into
# the scaler.
X_train_normalized = scaler.fit_transform(X_train_reshaped)
X_val_normalized = scaler.transform(X_val_reshaped)  # Only transform on validation data, no fitting

# Reshape back to the original 3D shape of each partition
X_train_normalized = X_train_normalized.reshape(X_train.shape)
X_val_normalized = X_val_normalized.reshape(X_test.shape)

# Convert to PyTorch tensors
X_train_tensor = torch.tensor(X_train_normalized, dtype=torch.float32)
X_val_tensor = torch.tensor(X_val_normalized, dtype=torch.float32)

print("Training data shape after normalization:", X_train_tensor.shape)
print("Validation data shape after normalization:", X_val_tensor.shape)

Training data shape after normalization: torch.Size([348, 43, 18])
Validation data shape after normalization: torch.Size([150, 43, 18])


In [12]:
# StandardScaler needs 2-D input, so view each label vector as one column.
y_train_column = y_train.reshape(-1, 1)
y_val_column = y_test.reshape(-1, 1)

# Standardise the targets with statistics fitted on the training labels only;
# the held-out labels are transformed with those same statistics.
target_scaler = StandardScaler()
y_train_normalized = target_scaler.fit_transform(y_train_column).ravel()
y_val_normalized = target_scaler.transform(y_val_column).ravel()

# Convert to float32 tensors of shape (N, 1)
y_train_tensor = torch.tensor(y_train_normalized, dtype=torch.float32).unsqueeze(1)
y_val_tensor = torch.tensor(y_val_normalized, dtype=torch.float32).unsqueeze(1)

print("Training targets shape after normalization:", y_train_tensor.shape)
print("Validation targets shape after normalization:", y_val_tensor.shape)

Training targets shape after normalization: torch.Size([348, 1])
Validation targets shape after normalization: torch.Size([150, 1])


# LSTM

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim


class LSTMModel(nn.Module):
    """Stacked LSTM followed by a linear head applied to the last time step.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the LSTM hidden state.
        output_size: Number of values produced per sequence.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        super().__init__()

        # Recurrent encoder; batch_first=True means inputs are laid out as
        # (batch, seq_len, input_size).
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )

        # Linear head mapping a hidden state to the prediction.
        self.fc = nn.Linear(in_features=hidden_size, out_features=output_size)

    def forward(self, x):
        """Return one prediction per sequence in the batch.

        Args:
            x: Tensor of shape (batch, seq_len, input_size).

        Returns:
            Tensor of shape (batch, output_size).
        """
        # The final (h_n, c_n) state tuple is not needed; only the per-step
        # outputs of the top LSTM layer are used.
        sequence_out, _ = self.lstm(x)
        # Keep only the top layer's output at the final time step.
        final_step = sequence_out[:, -1, :]
        return self.fc(final_step)

# Model hyperparameters
input_size = 18  # Number of features (dimensionality of each time step)
hidden_size = 64  # Number of LSTM units (hidden state dimension)
output_size = 1  # One regression value per sequence
num_layers = 2  # Number of LSTM layers

model = LSTMModel(input_size=input_size, hidden_size=hidden_size, output_size=output_size, num_layers=num_layers)

# Mean Squared Error loss for regression: it gives a bigger penalty to large
# errors, which we want to prevent for this task, otherwise we run larger
# risks of losing revenue.
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)


# Train the model
num_epochs = 100  # Set the number of epochs

for epoch in range(num_epochs):
    # One full-batch optimisation step: the whole training tensor goes
    # through the model in a single forward/backward pass per epoch.
    model.train()
    optimizer.zero_grad()
    outputs = model(X_train_tensor)
    train_loss = criterion(outputs, y_train_tensor)
    train_loss.backward()
    optimizer.step()

    # Evaluate on the held-out data with gradient tracking disabled.
    model.eval()
    with torch.no_grad():
        val_outputs = model(X_val_tensor)
        val_loss = criterion(val_outputs, y_val_tensor)

    # Report progress every 10 epochs.
    if (epoch+1) % 10 == 0:
        print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss.item():.4f}, Val Loss: {val_loss.item():.4f}")


Epoch [10/100], Train Loss: 1.0025, Val Loss: 0.3170
Epoch [20/100], Train Loss: 1.0006, Val Loss: 0.3281
Epoch [30/100], Train Loss: 1.0000, Val Loss: 0.3221
Epoch [40/100], Train Loss: 1.0000, Val Loss: 0.3228
Epoch [50/100], Train Loss: 1.0000, Val Loss: 0.3242
Epoch [60/100], Train Loss: 1.0000, Val Loss: 0.3228
Epoch [70/100], Train Loss: 1.0000, Val Loss: 0.3236
Epoch [80/100], Train Loss: 1.0000, Val Loss: 0.3233
Epoch [90/100], Train Loss: 1.0000, Val Loss: 0.3233
Epoch [100/100], Train Loss: 1.0000, Val Loss: 0.3234


Given that these are only test runs, it is not appropriate to draw conclusions about which method is more effective. However, assuming these results are legitimate, the LSTM would be the better choice, since its loss on the test set is lower than that of the non-sequential models. A much more rigorous testing and validation process, such as K-fold cross-validation, would still be needed.