In [1]:
import torch
import torch.nn as nn
import copy
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import MinMaxScaler
from core.util.get_datasets import get_trefor_timeseries, get_timeseries_dataset
from sklearn.model_selection import ParameterGrid

Set global parameters.

In [2]:
param_grid = {
    "learning_rate": [0.001, 0.01],
    "batch_size": [16],
    "hidden_size": [32, 64],
    "num_stacked_layers": [1, 2],
    "epochs": [3, 10],
    "lookback": [6, 24, 168],
}

loss_function = nn.HuberLoss()

Read the preprocessed Trefor household data as a timeseries.

In [3]:
data = get_trefor_timeseries()
# data

Use CUDA (GPU) if available.

In [4]:
device = "cuda:0" if torch.cuda.is_available() else "cpu"
device

'cpu'

Normalize data.

In [5]:
scaler = MinMaxScaler(feature_range=(0, 1))
data_normalized = scaler.fit_transform(data)
# data_normalized

Split data into a training, validation, and test set. Output of the get_timeserie_dataset are tensors.

Create the datasets for train, validation and test.

In [6]:
class TreforData(Dataset):
    """Initialize Trefor dataset."""

    def __init__(self, x: torch.tensor, y: torch.tensor) -> None:
        """Initialize dataset.

        Arguments:
            x: feature as torch
            y: target as torch

        """
        self.x = x
        self.y = y

    def __len__(self) -> int:
        """Return length of dataset."""
        return len(self.x)

    def __getitem__(self, i: int) -> tuple:
        """Return tuple from dataset."""
        return self.x[i], self.y[i]

Initialize a very basic LSTM.

In [7]:
class LSTM(nn.Module):
    """Super scuffed LSTM."""

    def __init__(
        self, input_size: int, hidden_size: int, num_stacked_layers: int
    ) -> None:
        """Initialize the LSTM and its layers."""
        super().__init__()
        self.hidden_size = hidden_size
        self.num_stacked_layers = num_stacked_layers

        self.lstm = nn.LSTM(
            input_size, hidden_size, num_stacked_layers, batch_first=True
        )

        self.fc1 = nn.Linear((input_size * hidden_size), hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, 1)

    def forward(self, x: torch.tensor) -> torch.tensor:
        """Define the forward pass."""
        batch_size = x.size(0)

        h0 = torch.zeros(self.num_stacked_layers, batch_size, self.hidden_size).to(
            device
        )
        c0 = torch.zeros(self.num_stacked_layers, batch_size, self.hidden_size).to(
            device
        )

        x, _ = self.lstm(x, (h0, c0))
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x[:, -1, :])

        return x


model = LSTM(1, 15, 5)
model.to(device)
# model

LSTM(
  (lstm): LSTM(1, 15, num_layers=5, batch_first=True)
  (fc1): Linear(in_features=15, out_features=15, bias=True)
  (relu): ReLU()
  (fc2): Linear(in_features=15, out_features=1, bias=True)
)

Function for training one epoch.

In [8]:
def train_one_epoch(
    training_loader: torch.utils.data.Dataset, optimizer: torch.optim.Adam
) -> float:
    """Train one epoch."""
    running_loss = 0.0
    last_loss = 0.0

    for i, data in enumerate(training_loader):
        # Every data instance is an input + target
        inputs, target = data

        # Zero your gradients for every batch!
        optimizer.zero_grad()

        # Make predictions for this batch
        predictions = model(inputs)

        # Compute the loss and its gradients
        loss = loss_function(predictions, target)
        loss.backward()

        # Adjust learning weights
        optimizer.step()

        # Gather data and report
        running_loss += loss.item()
        if i % 100 == 99:
            last_loss = running_loss / 100  # loss per 100 batch
            # print(f'  batch {i+1} loss: {last_loss}')
            running_loss = 0.0
    return last_loss

Train a model with specified hyperparameters

In [9]:
def train_with_params(params: dict) -> (float, LSTM):
    """Train model with the specified hyperparameters."""
    # Extract hyperparameters
    learning_rate = params["learning_rate"]
    batch_size = params["batch_size"]
    hidden_size = params["hidden_size"]
    num_stacked_layers = params["num_stacked_layers"]
    epochs = params["epochs"]
    lookback = params["lookback"]

    # Split the data into training and validation
    x_train, x_val, x_test, y_train, y_val, y_test = get_timeseries_dataset(
        data_normalized, lookback
    )

    train_dataset = TreforData(x_train, y_train)
    val_dataset = TreforData(x_val, y_val)

    training_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
    validation_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    # Initialize model
    model = LSTM(
        input_size=x_train.shape[-1],
        hidden_size=hidden_size,
        num_stacked_layers=num_stacked_layers,
    ).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    loss_function = nn.HuberLoss()

    best_v_loss = float("inf")
    best_model = None

    # Train the model
    for epoch in range(epochs):
        model.train(True)
        train_one_epoch(training_loader, optimizer)

        # Evaluate on validation set
        model.eval()
        running_v_loss = 0.0
        with torch.no_grad():
            for i, v_data in enumerate(validation_loader):
                v_inputs, v_target = v_data
                v_predictions = model(v_inputs)
                v_loss = loss_function(v_predictions, v_target)
                running_v_loss += v_loss.item()

        avg_v_loss = running_v_loss / (i + 1)

        # Save the best model based on validation loss
        if avg_v_loss < best_v_loss:
            best_v_loss = avg_v_loss
            best_model = copy.deepcopy(model)

    return best_v_loss, best_model

In [10]:
best_loss = float("inf")
best_params = None
best_model = None

for params in ParameterGrid(param_grid):
    v_loss, model = train_with_params(params)
    print(params, v_loss)
    if v_loss < best_loss:
        best_loss = v_loss
        best_params = params
        best_model = model

print("Best Hyperparameters:", best_params)
print("Best Validation Loss:", best_loss)

{'EPOCHS': 3, 'batch_size': 16, 'hidden_size': 32, 'learning_rate': 0.001, 'lookback': 6, 'num_stacked_layers': 1} 0.0739361226897348
{'EPOCHS': 3, 'batch_size': 16, 'hidden_size': 32, 'learning_rate': 0.001, 'lookback': 6, 'num_stacked_layers': 2} 0.02043758769604293
{'EPOCHS': 3, 'batch_size': 16, 'hidden_size': 32, 'learning_rate': 0.001, 'lookback': 24, 'num_stacked_layers': 1} 0.08531399169428781
{'EPOCHS': 3, 'batch_size': 16, 'hidden_size': 32, 'learning_rate': 0.001, 'lookback': 24, 'num_stacked_layers': 2} 0.03238366850736466
{'EPOCHS': 3, 'batch_size': 16, 'hidden_size': 32, 'learning_rate': 0.001, 'lookback': 168, 'num_stacked_layers': 1} 0.012444816264879265
{'EPOCHS': 3, 'batch_size': 16, 'hidden_size': 32, 'learning_rate': 0.001, 'lookback': 168, 'num_stacked_layers': 2} 0.014558678612523123
{'EPOCHS': 3, 'batch_size': 16, 'hidden_size': 32, 'learning_rate': 0.01, 'lookback': 6, 'num_stacked_layers': 1} 0.0314065998504785
{'EPOCHS': 3, 'batch_size': 16, 'hidden_size': 32,