In [1]:
import nbimporter
from sklearn.metrics import accuracy_score
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from typing import Tuple, Callable
from skopt import gp_minimize
from skopt.space import Integer, Categorical, Real
from dataloader import load_dataset

In [2]:
X_train_tensor, X_test_tensor, y_train_tensor, y_test_tensor = load_dataset()

In [3]:
print(f"X train shape:{X_train_tensor.shape}")
print(f"X test shape:{X_test_tensor.shape}")
print(f"y train shape:{y_train_tensor.shape}")
print(f"y test shape:{y_test_tensor.shape}")

X train shape:(36776, 20, 8)
X test shape:(15762, 20, 8)
y train shape:(36776, 1)
y test shape:(15762, 1)


In [4]:
# Create a function to time our experiments
from timeit import default_timer as timer
def print_train_time(start: float, end: float, device: torch.device = None):
  total_time = end - start
  print(f"Train time on {device}: {total_time:.3f} seconds")
  return total_time

In [5]:
class AttentionLSTM(nn.Module):
    def __init__(self, input_size: int, hidden_size: int, num_layers: int, dropout_prob: float = 0.2):
        super(AttentionLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # LSTM layer
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout_prob
        )

        # Attention mechanism
        self.attn_weight = nn.Linear(hidden_size, 1)

        # Output layer
        self.fc = nn.Sequential(
            nn.Linear(hidden_size, 1),  
            nn.Sigmoid()  
        )

        self.dropout = nn.Dropout(dropout_prob)

    def forward(self, x):

        device = x.device 

        # Initialize hidden state
        h_0 = torch.zeros(self.num_layers, x.shape[0], self.hidden_size).to(device)
        c_0 = torch.zeros(self.num_layers, x.shape[0], self.hidden_size).to(device)

        # LSTM output
        lstm_out, _ = self.lstm(x, (h_0, c_0))  # [batch, seq_len, hidden_size]

        # Apply attention: calculate attention weights
        attn_scores = torch.tanh(self.attn_weight(lstm_out))  # [batch, seq_len, 1]
        attn_weights = F.softmax(attn_scores, dim=1)  # Normalize over seq_len

        # Weighted sum of LSTM outputs using attention weights
        context_vector = torch.sum(attn_weights * lstm_out, dim=1)  # [batch, hidden_size]

        # Dropout and output
        out = self.dropout(context_vector)
        out = self.fc(out)  # [batch, 1]
        return out


In [6]:
torch.manual_seed(42)
# Instantiate a sample model for test
model = AttentionLSTM(input_size=8, hidden_size=33, num_layers=3, dropout_prob=0.02)

In [7]:
loss_fn = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr= 0.0003)

In [8]:
def binary_accuracy_fn(y_true: torch.Tensor, y_pred: torch.Tensor) -> float:
    """Calculate binary accuracy with threshold 0.5"""
    y_pred_label = (y_pred > 0.5).float()
    correct = (y_pred_label == y_true).sum().item()
    return correct / len(y_true) * 100

In [9]:
def train_model(model: torch.nn.Module,
                X_train: torch.Tensor,
                y_train: torch.Tensor,
                loss_fn: torch.nn.Module,
                optimizer: torch.optim.Optimizer,
                device: torch.device,
                epochs: int = 100):

    # Move model to device
    model = model.to(device)

    for epoch in range(epochs):
        model.train()

        # Move data to device
        X_train = X_train.to(device)
        y_train = y_train.to(device)

        # Forward pass
        y_pred = model(X_train).squeeze()  # shape: [batch_size]
        loss = loss_fn(y_pred, y_train)

        # Backward
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Calculate accuracy
        acc = binary_accuracy_fn(y_train, y_pred)

        # Logging
        if epoch % 20 == 0 or epoch == epochs - 1:
            print(f"Epoch [{epoch+1}/{epochs}] - Loss: {loss.item():.5f} | Accuracy: {acc:.2f}%")

    return model


In [10]:
import torch
from typing import Tuple, Callable
import matplotlib.pyplot as plt


def recursive_forecast(
    model: torch.nn.Module,
    initial_input: torch.Tensor,
    forecast_steps: int,
    lookback: int,
    device: torch.device
) -> torch.Tensor:
    """
    Recursively forecasts future values using the trained LSTM model.
    
    Args:
        model: Trained LSTM model
        initial_input: Tensor of shape [1, lookback, num_features]
        forecast_steps: Number of future steps to predict
        lookback: Length of input window
        device: CPU or CUDA
    
    Returns:
        forecast: Tensor of shape [forecast_steps, output_dim]
    """
    model.eval()
    forecast = []

    current_input = initial_input.clone().to(device)

    with torch.inference_mode():
        for _ in range(forecast_steps):
            pred = model(current_input)  # [1, 1]
            forecast.append(pred.squeeze(0))  # remove batch dim

            # Update input for next prediction
            last_features = current_input[:, -1, :].clone()
            last_features[:, 0] = pred.squeeze(1)
            new_step = last_features.unsqueeze(1)
            current_input = torch.cat([current_input[:, 1:], new_step], dim=1)

    return torch.stack(forecast)  # [forecast_steps, output_dim]


def evaluate_forecast(
    forecast: torch.Tensor,
    ground_truth: torch.Tensor,
    loss_fn: torch.nn.Module,
    metrics_fn: Callable[[torch.Tensor, torch.Tensor], float] = None
) -> Tuple[float, float]:
    """
    Compares forecast with ground truth and computes evaluation metrics.

    Args:
        forecast: Tensor of predicted values [forecast_steps, output_dim]
        ground_truth: Tensor of true values [forecast_steps, output_dim]
        loss_fn: Loss function (e.g. MSE)
        metrics_fn: Optional function to compute additional metrics like R^2

    Returns:
        loss_value, metrics_value
    """
    loss_value = loss_fn(forecast, ground_truth).item()
    metrics_value = metrics_fn(forecast, ground_truth).item() if metrics_fn else None
    return loss_value, metrics_value



In [None]:
from sklearn.metrics import r2_score
import torch.nn as nn

def regression_r2(y_true: torch.Tensor, y_pred: torch.Tensor) -> torch.Tensor:
    """R² score metric for regression"""
    return torch.tensor(r2_score(y_true.cpu().numpy(), y_pred.cpu().numpy()))

def run_training_and_testing(
    model: torch.nn.Module,
    X_train: torch.Tensor,
    y_train: torch.Tensor,
    X_test: torch.Tensor,
    y_test: torch.Tensor,
    lookback: int,
    device: torch.device,
    epochs: int = 100,
    lr: float = 0.001,
):
    # Define loss and optimizer
    loss_fn = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)


    # Begin training loop
    for epoch in range(1, epochs + 1):
        model.train()

        # Forward pass
        y_pred = model(X_train).squeeze()
        loss = loss_fn(y_pred, y_train)

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Evaluate every 20 epochs
        if epoch % 20 == 0 or epoch == epochs:
            # Forecast using recursive strategy
            initial_input = X_test[0].unsqueeze(0)  # [1, lookback, features]
            forecast_steps = len(y_test)
            forecast = recursive_forecast(model, initial_input, forecast_steps, lookback, device)

            # Evaluate forecast
            test_loss, test_r2 = evaluate_forecast(forecast, y_test[:forecast_steps], loss_fn, regression_r2)

            print(f"Epoch [{epoch}/{epochs}]")
            print(f"  Train Loss: {loss.item():.5f}")
            print(f"  Test Loss:  {test_loss:.5f} | R² Score: {test_r2:.4f}")

    return model


In [None]:
print(x.size)

NameError: name 'x' is not defined

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

trained_model = run_training_and_testing(
    model=AttentionLSTM(input_size=8, hidden_size=64, num_layers=2),
    X_train=X_train_tensor,
    y_train=y_train_tensor,
    X_test=X_test_tensor,
    y_test=y_test_tensor,
    lookback=20,
    device=device,
    epochs=100
)


TypeError: 'int' object is not callable