In [2]:
#!pip install optuna tensorboard

In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import TransformerEncoder, TransformerEncoderLayer, TransformerDecoder, TransformerDecoderLayer
import pandas as pd
import numpy as np

In [4]:
def create_data_sequence(X_data, Y_data, lookback):
        """
        Creates input-output sequences from raw data arrays based on the lookback period.

        :param X_data: Array of input data (features).
        :param Y_data: Array of output data (targets).
        :param lookback: The lookback window for creating sequences.
        :return: Sequences of inputs and outputs.
        """
        X_sequences, y_sequences = [], []
        for i in range(lookback, len(X_data)):
            X_sequences.append(X_data[i - lookback:i])
            y_sequences.append(Y_data[i - lookback:i])

        return np.array(X_sequences), np.array(y_sequences)

In [5]:
#load data for training and testing
import pandas as pd
import numpy as np
train_df = pd.read_csv("Combined_Training31-Aug-2023.csv")
test_df = pd.read_csv("Combined_Testing31-Aug-2023.csv")
X_data_train = train_df[['SOC', 'Current', 'Temp']].values
y_data_train =  train_df['Voltage'].values
X_sequences_train, y_sequences_train = create_data_sequence(X_data_train, y_data_train, 400)

In [8]:
X_sequences_train.shape, y_sequences_train.shape

((761802, 400, 3), (761802, 400))

In [6]:
X_tensor = torch.tensor(X_sequences_train, dtype=torch.float32)
y_tensor = torch.tensor(y_sequences_train, dtype=torch.float32)

In [7]:
torch.save(X_tensor, 'X_tensor.pt')
torch.save(y_tensor, 'y_tensor.pt')

In [9]:
from torch.utils.data import DataLoader, TensorDataset, SubsetRandomSampler

# Define parameters
batch_size = 64
train_split = 0.7
seed = 42  # for reproducibility

# Create a TensorDataset
dataset = TensorDataset(X_tensor, y_tensor)
dataset_size = len(dataset)
indices = list(range(dataset_size))

# Calculate train and validation sizes
train_size = int(dataset_size * train_split)
valid_size = dataset_size - train_size

# Shuffle the indices
np.random.seed(seed)
np.random.shuffle(indices)

# Split indices into training and validation sets
train_indices, valid_indices = indices[:train_size], indices[train_size:]

# Create samplers for training and validation
train_sampler = SubsetRandomSampler(train_indices)
valid_sampler = SubsetRandomSampler(valid_indices)

# Create DataLoaders with samplers
train_loader = DataLoader(dataset, batch_size=batch_size, sampler=train_sampler, drop_last=True)
val_loader = DataLoader(dataset, batch_size=batch_size, sampler=valid_sampler, drop_last=True)

In [16]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import TransformerEncoder, TransformerEncoderLayer, TransformerDecoder, TransformerDecoderLayer

class TransformerModel(nn.Module):
    def __init__(self, input_size=3, output_size=1, embed_size=32, hidden_size=64, e_num_layers=1, d_num_layers=1, num_heads=4, dropout_prob=0.1, device="cpu"):
        super(TransformerModel, self).__init__()
        self.device = device

        # Embedding layer for combined input (SOC, current, temperature)
        self.embedding = nn.Linear(input_size, embed_size).to(self.device)

        # Embedding layer for decoder input (voltage)
        self.dec_embedding = nn.Linear(output_size, embed_size).to(self.device)

        # Transformer encoder for input sequence
        self.encoder = TransformerEncoder(
            TransformerEncoderLayer(
                d_model=embed_size,
                nhead=num_heads,
                dim_feedforward=hidden_size,
                dropout=dropout_prob,
                batch_first=True
            ),
            num_layers=e_num_layers
        ).to(self.device)

        # Transformer decoder for autoregressive prediction
        self.decoder = TransformerDecoder(
            TransformerDecoderLayer(
                d_model=embed_size,
                nhead=num_heads,
                dim_feedforward=hidden_size,
                dropout=dropout_prob,
                batch_first=True
            ),
            num_layers=d_num_layers
        ).to(self.device)

        # Output layer to predict voltage at each timestep
        self.output_layer = nn.Linear(embed_size, output_size).to(self.device)

    def forward(self, X, dec_input):
        """
        X: Input sequence containing (SOC, Current, Temperature), shape: (batch_size, seq_len, input_size)
        dec_input: Voltage sequence for the decoder, shape: (batch_size, seq_len, output_size)
        """
        
        # Check input shapes
        print("Shape of X before embedding:", X.shape)  # Expected: (batch_size, seq_len, input_size)
        print("Shape of dec_input before embedding:", dec_input.shape)  # Expected: (batch_size, seq_len, output_size)
        
        # Embedding the combined input
        X = self.embedding(X.to(self.device))  # Shape should be (batch_size, seq_len, embed_size)
        print("Shape of X after embedding:", X.shape)  # Expected: (batch_size, seq_len, embed_size)

        # Encoder pass
        encoder_output = self.encoder(X)  # Shape should be (batch_size, seq_len, embed_size)
        print("Shape of encoder output:", encoder_output.shape)  # Expected: (batch_size, seq_len, embed_size)

        # Embedding the decoder input
        dec_input = dec_input.unsqueeze(-1)
        dec_input = self.dec_embedding(dec_input.to(self.device))  # Shape should be (batch_size, seq_len, embed_size)
        print("Shape of dec_input after embedding:", dec_input.shape)  # Expected: (batch_size, seq_len, embed_size)

        # Create a mask for the decoder
        tgt_mask = torch.triu(torch.ones(dec_input.size(1), dec_input.size(1)), diagonal=1).bool().to(self.device)
        
        # Decoder pass
        decoder_output = self.decoder(dec_input, encoder_output, tgt_mask=tgt_mask)
        print("Shape of decoder output:", decoder_output.shape)  # Expected: (batch_size, seq_len, embed_size)

        # Output layer
        output = self.output_layer(decoder_output)  # Final shape should be (batch_size, seq_len, output_size)
        print("Shape of output after output layer:", output.shape)  # Expected: (batch_size, seq_len, output_size)

        return output


    def training_step(self, X, y_input, y_target, optimizer):
        '''Training step for the transformer model'''
    
        # Move data to device
        X, y_input, y_target = X.to(self.device), y_input.to(self.device), y_target.to(self.device)
    
        # Forward pass
        output = self(X, y_input)  # X is the combined input; y_input is the decoder input
    
        # Compute loss between model output and target
        loss = F.mse_loss(output, y_target)
        
        # Backpropagation and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
        return loss.item()
    
    def validation_step(self, X, y_input, y_target):
        '''Validation step for the transformer model'''
    
        # Move data to device
        X, y_input, y_target = X.to(self.device), y_input.to(self.device), y_target.to(self.device)
    
        # Forward pass without gradient computation
        with torch.no_grad():
            output = self(X, y_input)  # X is the combined input; y_input is the decoder input
        
        # Compute validation loss
        loss = F.mse_loss(output, y_target)
    
        return loss.item()

In [17]:
def build_transformer(input_size=3, embed_size=32, hidden_size=64, e_num_layers=1, d_num_layers=1, num_heads=4, dropout_prob=0.1, device="cpu"):
    return TransformerModel(
        input_size=input_size,
        embed_size=embed_size,
        hidden_size=hidden_size,
        e_num_layers=e_num_layers,
        d_num_layers=d_num_layers,
        num_heads=num_heads,
        dropout_prob=dropout_prob,
        device=device
    )

In [18]:
device = "cuda" if torch.cuda.is_available() else "cpu"
model = build_transformer(input_size=3, embed_size=16, hidden_size=64, e_num_layers=2, d_num_layers=2, num_heads=4, dropout_prob=0.1, device=device)

In [19]:
# caluclate the number of parameters
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f'The model has {count_parameters(model):,} trainable parameters')

The model has 15,473 trainable parameters


In [14]:
# Fetch a single batch from train_loader
X_batch, y_batch = next(iter(train_loader))

# Check the shape directly from the DataLoader
X_batch, y_batch = next(iter(train_loader))
print("X_batch shape:", X_batch.shape)  # Expected: (64, 400, 3)
print("y_batch shape:", y_batch.shape)  # Expected: (64, 400)

X_batch shape: torch.Size([64, 400, 3])
y_batch shape: torch.Size([64, 400])


In [20]:
# Forward pass through the model
output = model(X_batch, y_batch)

# Check output shapes
print("Output shape:", output.shape)  # Expected shape: (batch_size, seq_len, 1)
print("Target shape:", y_batch.shape)  # Expected shape: (batch_size, seq_len, 1)

Shape of X before embedding: torch.Size([64, 400, 3])
Shape of dec_input before embedding: torch.Size([64, 400])
Shape of X after embedding: torch.Size([64, 400, 16])
Shape of encoder output: torch.Size([64, 400, 16])
Shape of dec_input after embedding: torch.Size([64, 400, 16])
Shape of decoder output: torch.Size([64, 400, 16])
Shape of output after output layer: torch.Size([64, 400, 1])
Output shape: torch.Size([64, 400, 1])
Target shape: torch.Size([64, 400])


In [None]:
import optuna
import numpy as np

def objective(trial):
    # Define the hyperparameters to be optimized
    embed_size = trial.suggest_int("embed_size", 16, 64)
    hidden_size = trial.suggest_int("hidden_size", 64, 256)
    e_num_layers = trial.suggest_int("e_num_layers", 1, 4)
    d_num_layers = trial.suggest_int("d_num_layers", 1, 4)
    num_heads = trial.suggest_int("num_heads", 2, 8)
    dropout_prob = trial.suggest_float("dropout_prob", 0.1, 0.3)
    learning_rate = trial.suggest_float("lr", 1e-5, 1e-3)

    # Initialize the Transformer model with suggested hyperparameters
    model = TransformerModel(
        input_size=3,
        output_size=1,
        embed_size=embed_size,
        hidden_size=hidden_size,
        e_num_layers=e_num_layers,
        d_num_layers=d_num_layers,
        num_heads=num_heads,
        dropout_prob=dropout_prob,
        device=device
    )
    model.to(device)

    # Define optimizer
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # Training loop (for a few epochs to get an idea of performance)
    num_epochs = 5  # You can increase this for more thorough tuning
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0
        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            loss = model.training_step(X_batch, y_batch, optimizer)
            train_loss += loss * X_batch.size(0)
        
        avg_train_loss = train_loss / len(train_loader.dataset)

        # Validation loop
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for X_val, y_val in val_loader:
                X_val, y_val = X_val.to(device), y_val.to(device)
                loss = model.validation_step(X_val, y_val)
                val_loss += loss * X_val.size(0)
        
        avg_val_loss = val_loss / len(val_loader.dataset)
        
        # Report validation loss to Optuna
        trial.report(avg_val_loss, epoch)

        # Prune trial if it performs poorly
        if trial.should_prune():
            raise optuna.TrialPruned()
    
    return avg_val_loss