In [None]:
import csv
import math
import os
import random
from typing import Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import torch
from torch import nn, Tensor
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm

import hyperparameters
import utils
from model import TimeSeriesTransformer
from psd import power_spectrum_error, compute_power_spectrum



In [None]:
class TransformerDataset(Dataset):
    """Windowed dataset yielding ``(src, trg, trg_y)`` triples for an
    encoder-decoder model.

    Each entry of ``indices`` is a ``(start_idx, end_idx)`` pair. The window
    ``data[start_idx:end_idx]`` is split into the encoder input, the decoder
    input (the target shifted one step back) and the target itself.
    """

    def __init__(self,
                 data: torch.Tensor,
                 indices: list,
                 enc_seq_len: int,
                 dec_seq_len: int,
                 target_seq_len: int
                 ) -> None:
        """
        Args:
            data: The full series; it is sliced along its first dimension.
            indices: List of ``(start_idx, end_idx)`` pairs, one per sample.
                Every window must span exactly
                ``enc_seq_len + target_seq_len`` steps.
            enc_seq_len: Number of leading steps of a window used as
                encoder input.
            dec_seq_len: Stored and forwarded to ``get_src_trg``, which does
                not currently use it.
            target_seq_len: Number of trailing steps of a window used as
                the target.
        """
        super().__init__()
        self.indices = indices
        self.data = data
        self.enc_seq_len = enc_seq_len
        self.dec_seq_len = dec_seq_len
        self.target_seq_len = target_seq_len

    def __len__(self):
        """Return the number of windows (one per entry in ``indices``)."""
        return len(self.indices)

    def __getitem__(self, index):
        """
        Returns a tuple with 3 elements:
        1) src (the encoder input)
        2) trg (the decoder input)
        3) trg_y (the target)
        """
        # Each entry of self.indices is a (start, end) pair delimiting one window.
        start_idx = self.indices[index][0]
        end_idx = self.indices[index][1]

        sequence = self.data[start_idx:end_idx]

        src, trg, trg_y = self.get_src_trg(
            sequence=sequence,
            enc_seq_len=self.enc_seq_len,
            dec_seq_len=self.dec_seq_len,
            target_seq_len=self.target_seq_len
        )

        return src, trg, trg_y

    def get_src_trg(
            self,
            sequence: torch.Tensor,
            enc_seq_len: int,
            dec_seq_len: int,
            target_seq_len: int
    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """Split one window into encoder input, decoder input and target.

        Args:
            sequence: One window of length ``enc_seq_len + target_seq_len``.
            enc_seq_len: Length of the encoder input.
            dec_seq_len: Unused; kept so the signature stays compatible.
            target_seq_len: Length of the decoder input and of the target.

        Returns:
            ``(src, trg, trg_y)`` where ``src`` is the first ``enc_seq_len``
            steps, ``trg_y`` is the remaining ``target_seq_len`` steps and
            ``trg`` is ``trg_y`` shifted one step back (it starts at the last
            element of ``src``).

        Raises:
            AssertionError: If the window length or the resulting slice
                lengths do not match the requested lengths.
        """
        assert len(
            sequence) == enc_seq_len + target_seq_len, "Sequence length does not equal (input length + target length)"

        # Encoder input: the leading part of the window.
        src = sequence[:enc_seq_len]

        # Decoder input: starts at the last encoder step and stops one step
        # before the end of the window, i.e. the target shifted back by one.
        trg = sequence[enc_seq_len - 1:len(sequence) - 1]
        assert len(trg) == target_seq_len, "Length of trg does not match target sequence length"

        # The target sequence against which the model output is compared.
        # Slice from enc_seq_len rather than with a negative index:
        # sequence[-0:] would return the whole window when target_seq_len == 0.
        trg_y = sequence[enc_seq_len:]
        assert len(trg_y) == target_seq_len, "Length of trg_y does not match target sequence length"

        return src, trg, trg_y

Next, we define the `TimeSeriesTransformer` model.

In [None]:
class TimeSeriesTransformer(nn.Module):
    """Encoder-decoder transformer for sequence-to-sequence forecasting.

    The encoder input is projected to ``d_model``, given a sinusoidal
    positional encoding and passed through a stack of encoder layers. The
    decoder input is projected to ``d_model`` and decoded against the encoder
    output; a final linear layer maps back to ``num_predicted_features``.
    All layers are built with ``batch_first=True``.
    """

    def __init__(self,
                 input_size: int,
                 dec_seq_len: int,
                 d_model: int = 512,
                 n_encoder_layers: int = 4,
                 n_decoder_layers: int = 4,
                 dropout: float = 0.2,
                 max_seq_len: int = 512,
                 dim_feedforward_encoder: int = 2048,
                 n_heads: int = 8,
                 dim_feedforward_decoder: int = 2048,
                 num_predicted_features: int = 3
                 ):
        """
        Args:
            input_size: Number of features per step of the encoder input.
            dec_seq_len: Stored on the module; not used by ``forward``.
            d_model: Width of the internal representation.
            n_encoder_layers: Number of stacked encoder layers.
            n_decoder_layers: Number of stacked decoder layers.
            dropout: Dropout probability, used after the positional encoding
                and inside every encoder/decoder layer.
            max_seq_len: Number of positions in the positional-encoding
                table; encoder inputs must not be longer than this.
            dim_feedforward_encoder: Feed-forward width of the encoder layers.
            n_heads: Number of attention heads (encoder and decoder).
            dim_feedforward_decoder: Feed-forward width of the decoder layers.
            num_predicted_features: Number of features per step of the
                decoder input and of the model output.
        """
        super().__init__()

        # Dropout applied right after the positional encoding is added.
        self.dropout = nn.Dropout(p=dropout)

        # Sinusoidal positional encoding (as in the PyTorch transformer
        # tutorial): even channels hold sines, odd channels hold cosines.
        position_counter = torch.arange(max_seq_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        angles = position_counter * div_term

        pos_encoding = torch.zeros(1, max_seq_len, d_model)
        pos_encoding[0, :, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd channel than even channel,
        # so trim the cosine columns; for even d_model the slice is a no-op.
        pos_encoding[0, :, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Registered as a buffer: saved with the module and moved between
        # devices with it, but not a trainable parameter.
        self.register_buffer('pe', pos_encoding)

        # Projects the encoder input features to the model width.
        self.encoder_input_layer = nn.Linear(
            in_features=input_size,
            out_features=d_model
        )

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=n_heads,
            dim_feedforward=dim_feedforward_encoder,
            dropout=dropout,
            batch_first=True
        )

        # Stack the encoder layers to obtain the encoder.
        self.encoder = nn.TransformerEncoder(
            encoder_layer=encoder_layer,
            num_layers=n_encoder_layers,
            norm=None
        )

        self.dec_seq_len = dec_seq_len

        # Projects the decoder input features to the model width.
        self.decoder_input_layer = nn.Linear(
            in_features=num_predicted_features,
            out_features=d_model,
        )

        decoder_layer = nn.TransformerDecoderLayer(
            d_model=d_model,
            nhead=n_heads,
            dim_feedforward=dim_feedforward_decoder,
            dropout=dropout,
            batch_first=True
        )

        # Stack the decoder layers to obtain the decoder.
        self.decoder = nn.TransformerDecoder(
            decoder_layer=decoder_layer,
            num_layers=n_decoder_layers,
            norm=None
        )

        # Maps the decoder output back to the predicted feature dimension.
        self.linear_mapping = nn.Linear(
            in_features=d_model,
            out_features=num_predicted_features
        )

    def pos_encoding(self, x: Tensor) -> Tensor:
        """Add the positional encoding to ``x`` and apply dropout.

        Args:
            x: Tensor, shape [batch_size, seq_len, d_model]; ``seq_len`` must
                not exceed the ``max_seq_len`` the table was built with.

        Returns:
            Tensor of the same shape as ``x``.
        """
        # Only the first seq_len positions of the table are needed.
        x = x + self.pe[:, :x.size(1)]

        return self.dropout(x)

    def forward(self,
                src: Tensor,
                tgt: Tensor,
                src_mask: Optional[Tensor] = None,
                tgt_mask: Optional[Tensor] = None) -> Tensor:
        """Run the encoder-decoder on a batch.

        Args:
            src: Encoder input, shape [batch_size, src_len, input_size].
            tgt: Decoder input, shape
                [batch_size, tgt_len, num_predicted_features].
            src_mask: Optional mask handed to the decoder as ``memory_mask``
                (attention from decoder positions to encoder outputs). The
                encoder itself is run without a mask.
            tgt_mask: Optional mask for the decoder self-attention.

        Returns:
            Tensor, shape [batch_size, tgt_len, num_predicted_features].
        """
        src = self.encoder_input_layer(src)

        # Positional information is added to the encoder input only.
        # NOTE(review): the decoder input is not positionally encoded;
        # confirm this is intentional.
        pos_encoded_src = self.pos_encoding(src)

        encoder_output = self.encoder(
            src=pos_encoded_src
        )

        tgt = self.decoder_input_layer(tgt)

        decoder_output = self.decoder(
            tgt=tgt,
            memory=encoder_output,
            tgt_mask=tgt_mask,
            memory_mask=src_mask
        )

        decoder_output = self.linear_mapping(decoder_output)

        return decoder_output

Next, we define the training procedure for the network.

In [None]:
def train_TimeSeriesTransformer(data_path, args):
    """Train a ``TimeSeriesTransformer`` and save the model, loss plot and
    hyperparameters.

    Args:
        data_path: Passed to ``utils.read_data`` to load the series. The
            loaded object must expose ``shape[1]`` (number of features).
        args: Mapping of hyperparameters. Required keys: ``epochs``,
            ``batch_size``, ``dec_seq_len``, ``enc_seq_len``,
            ``output_seq_len``, ``window_size``, ``step_size``, ``dim_val``,
            ``n_encoder_layers``, ``n_decoder_layers``, ``max_seq_len``,
            ``in_features_encoder_linear_layer``, ``n_heads``,
            ``in_features_decoder_linear_layer`` and ``model_name``.
            Optional key: ``dropout`` (defaults to 0.2).

    Side effects:
        Writes ``models/{model_name}.pth`` (state dict),
        ``plots/training_{model_name}.png`` (per-step training loss) and
        ``models/{model_name}.csv`` (the contents of ``args``).

    Returns:
        None.
    """
    # Training parameters
    epochs = args["epochs"]
    batch_size = args["batch_size"]

    # Sequence / windowing parameters
    dec_seq_len = args["dec_seq_len"]
    enc_seq_len = args["enc_seq_len"]
    output_seq_len = args["output_seq_len"]
    window_size = args["window_size"]
    step_size = args["step_size"]

    # Resolve the output locations before training so that a missing key or
    # an unwritable directory fails immediately, not after the final epoch.
    model_name = args["model_name"]
    model_path = f"models/{model_name}.pth"
    plot_path = f"plots/training_{model_name}.png"
    hyperparams_path = f"models/{model_name}.csv"
    for output_path in (model_path, plot_path, hyperparams_path):
        os.makedirs(os.path.dirname(output_path), exist_ok=True)

    # Initialize data
    data = utils.read_data(data_path)
    num_features = data.shape[1]

    # presumably a list of (start, end) windows of length window_size taken
    # every step_size steps -- verify against utils. The dataset requires
    # each window to span enc_seq_len + output_seq_len steps.
    training_indices = utils.get_indices_entire_sequence(
        data=data,
        window_size=window_size,
        step_size=step_size)

    training_set = TransformerDataset(data=data,
                                      indices=training_indices,
                                      enc_seq_len=enc_seq_len,
                                      dec_seq_len=dec_seq_len,
                                      target_seq_len=output_seq_len)

    training_loader = DataLoader(training_set, batch_size, shuffle=True)

    model = TimeSeriesTransformer(
        input_size=num_features,
        dec_seq_len=dec_seq_len,
        d_model=args["dim_val"],
        n_encoder_layers=args["n_encoder_layers"],
        n_decoder_layers=args["n_decoder_layers"],
        dropout=args.get("dropout", 0.2),
        max_seq_len=args["max_seq_len"],
        dim_feedforward_encoder=args["in_features_encoder_linear_layer"],
        n_heads=args["n_heads"],
        dim_feedforward_decoder=args["in_features_decoder_linear_layer"],
        # Decoder inputs and targets are slices of the same series as the
        # encoder input, so they carry the same number of features.
        num_predicted_features=num_features
    )

    optimizer = torch.optim.Adam(params=model.parameters())
    criterion = torch.nn.HuberLoss()

    # Mask for the decoder's attention over the encoder output.
    # NOTE(review): assumed to have shape (output_seq_len, enc_seq_len) --
    # confirm against utils.generate_square_subsequent_mask.
    src_mask = utils.generate_square_subsequent_mask(
        dim1=output_seq_len,
        dim2=enc_seq_len
        )

    # Mask for the decoder's self-attention.
    # NOTE(review): assumed to have shape (output_seq_len, output_seq_len) --
    # confirm against utils.generate_square_subsequent_mask.
    tgt_mask = utils.generate_square_subsequent_mask(
        dim1=output_seq_len,
        dim2=output_seq_len
        )

    # One entry per optimizer step (i.e. per batch), not per epoch.
    losses = []

    for epoch in tqdm(range(epochs)):

        for src, tgt, tgt_y in training_loader:
            # zero the parameter gradients
            optimizer.zero_grad()

            # Make forecasts
            prediction = model(src=src, tgt=tgt, src_mask=src_mask, tgt_mask=tgt_mask)

            # Compute and backprop loss (input first, target second).
            loss = criterion(prediction, tgt_y)
            # Store a plain float so the history holds no tensors.
            losses.append(loss.item())

            loss.backward()

            # Take optimizer step
            optimizer.step()

        # TODO: no validation pass is implemented yet.

    torch.save(model.state_dict(), model_path)

    # Draw on a fresh figure so repeated calls do not overlay their curves.
    fig, ax = plt.subplots()
    ax.plot(range(len(losses)), losses)
    ax.set_ylabel("loss")
    ax.set_xlabel("training step")
    fig.savefig(plot_path)

    # Save hyperparameters as CSV
    with open(hyperparams_path, "w", newline="") as file:
        writer = csv.writer(file)
        writer.writerow(["Name", "Value"])  # Column headers
        for key, value in args.items():
            writer.writerow([key, value])