Dependencies

In [None]:
import copy
import math
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import numpy as np
import optuna
import os
from pathlib import Path
import random
import re
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score
import sys
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
import time 
from torch.utils.data import TensorDataset, DataLoader

# Reproducibility: a single seed shared by the torch CPU and CUDA generators.
random_state = 42
torch.manual_seed(random_state)
torch.cuda.manual_seed_all(random_state)
torch.backends.cudnn.deterministic = True

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

# Output folders written to by later cells; created up front if missing.
for _output_dir in ('Hyperparmeter Tuning', 'Model', 'Plots'):
    os.makedirs(_output_dir, exist_ok=True)

Fake GUI

In [None]:
# ---------------------------------------------------------------------------
# Settings panel ("fake GUI"): every tunable of the notebook lives in this
# cell. Options are switched by re-ordering / commenting assignments, so the
# statement order below is significant - the LAST assignment to a name wins.
# ---------------------------------------------------------------------------
n_epochs = 500            # upper bound on training epochs (early stopping may end sooner)
split_ratio = 0.2         # consumed by later cells (not visible in this section)
batch_size = 64           # consumed by later cells (not visible in this section)
n_optuna_trials = 600     # consumed by later cells (not visible in this section)
metric = 'Accuracy'       # consumed by later cells (not visible in this section)

# Dataset selector: the second assignment overrides the first.
dataset_name = 'convexity_dataset'
dataset_name = 'firearm_dataset'

# Model selector: the second assignment overrides the first.
model_choice = 'Transformer'
model_choice = 'CNN'

# Dataset-specific dimensions. NOTE: n_samples is only defined for the
# convexity dataset, and an unrecognised dataset_name defines none of these
# names, so building generic_params below would raise NameError.
if dataset_name == 'firearm_dataset':
    n_channels = 11
    n_classes = 4
    seq_len = 300
elif dataset_name == 'convexity_dataset':
    n_samples = 10000
    n_channels = 2
    n_classes = 2
    seq_len = 100

# Hyperparameter ranges. The objective reads range[0] as the lower bound and
# range[-1] as the upper bound, so a one-element list pins the value.
CNN_model_depth_range = [1, 3]              # step: 1
CNN_base_n_kernels_range = [4, 16]          # step: 4
CNN_base_kernel_size_range = [3, 9]         # step: 2
CNN_pool_size_range = [2, 4]                # step: 1

Transformer_model_depth_range = [1, 5]      # step: 1
Transformer_embedding_dim_range = [4]    # step: 1 (d = 2^(embedding_dim))
Transformer_n_heads_range = [2]          # step: 1 (h = 2^(n_heads))

# Ranges shared by both model families.
learning_rate_range = [1e-7, 1e-1]          # log
dropout_rate_range = [0.1, 0.5]             # na
patience_range = [5, 25]                    # step: 5
delta_range = [1e-4, 1.0]                   # log

# Model-independent settings bundled into one dict.
generic_params = {
    'n_epochs': n_epochs,
    'n_channels': n_channels,
    'seq_len': seq_len,
    'n_classes': n_classes,
    'random_state': random_state
}

# Start the search space with the ranges of the selected model family only;
# an unknown model_choice starts from an empty dict.
match model_choice:
    case 'CNN':
        search_space = {
            'CNN_model_depth_range': CNN_model_depth_range,
            'CNN_base_n_kernels_range': CNN_base_n_kernels_range,
            'CNN_base_kernel_size_range': CNN_base_kernel_size_range,
            'CNN_pool_size_range': CNN_pool_size_range,
        }
    case 'Transformer':
        search_space = {
            'Transformer_model_depth_range': Transformer_model_depth_range,
            'Transformer_embedding_dim_range': Transformer_embedding_dim_range,
            'Transformer_n_heads_range': Transformer_n_heads_range
        }
    case _:
        search_space = {}

# Ranges that apply regardless of the model family.
search_space['learning_rate_range'] = learning_rate_range
search_space['dropout_rate_range'] = dropout_rate_range
search_space['patience_range'] = patience_range
search_space['delta_range'] = delta_range

In [None]:
# Display names of the four class indices (0-3), in index order. Used for the
# tick labels of the confusion-matrix plot.
class_dict = dict(enumerate([
    'Good Grip, Good Trigger',
    'Good Grip, Bad Trigger',
    'Bad Grip, Good Trigger',
    'Bad Grip, Bad Trigger',
]))

def check_for_file_duplicates(path:str) -> str:
    """Return a path that does not collide with an existing file.

    If `path` is free it is returned unchanged (as a string). Otherwise the
    candidates `<stem>_1<suffix>`, `<stem>_2<suffix>`, ... in the same
    directory are probed in order and the first one that does not exist is
    returned.

    Args:
        path: Desired file path.

    Returns:
        The first non-existing path, as a string.
    """
    requested = Path(path)
    candidate = requested
    attempt = 0

    # Keep bumping the numeric suffix until an unused name is found.
    while candidate.exists():
        attempt += 1
        candidate = requested.parent / f"{requested.stem}_{attempt}{requested.suffix}"

    return str(candidate)

def convex_generator(seq_len: int, noise_level: float = 0.1) -> torch.tensor:
    """Sample one noisy upward-opening parabola on the grid [-1, 1].

    The curve is `a * (x - b)**2 + c` with `a = |N(0, 1)|`,
    `b = 0.5 * N(0, 1)` and `c = N(0, 1)`, plus Gaussian noise scaled by
    `noise_level`.

    Args:
        seq_len: Number of grid points.
        noise_level: Multiplier applied to the standard-normal noise.

    Returns:
        Tensor of shape (1, seq_len).
    """
    # The draw order (curvature, shift, offset, noise) determines which random
    # numbers each quantity receives, so it must not be reordered.
    curvature = abs(torch.randn(1).item())
    vertex_shift = 0.5 * torch.randn(1).item()
    offset = torch.randn(1).item()

    grid = torch.linspace(-1, 1, seq_len)
    parabola = curvature * (grid - vertex_shift) ** 2 + offset
    noisy = parabola + noise_level * torch.randn(seq_len)

    return noisy.view(1, seq_len)

def concave_generator(seq_len: int, noise_level: float = 0.1) -> torch.tensor:
    """Sample one noisy downward-opening parabola on the grid [-1, 1].

    The curve is `-(a * (x - b)**2 + c)` with `a = |N(0, 1)|`,
    `b = 0.5 * N(0, 1)` and `c = N(0, 1)`, plus Gaussian noise scaled by
    `noise_level`. Note that the offset `c` is negated together with the
    quadratic term.

    Args:
        seq_len: Number of grid points.
        noise_level: Multiplier applied to the standard-normal noise.

    Returns:
        Tensor of shape (1, seq_len).
    """
    # The draw order (curvature, shift, offset, noise) determines which random
    # numbers each quantity receives, so it must not be reordered.
    curvature = abs(torch.randn(1).item())
    vertex_shift = 0.5 * torch.randn(1).item()
    offset = torch.randn(1).item()

    grid = torch.linspace(-1, 1, seq_len)
    parabola = -(curvature * (grid - vertex_shift) ** 2 + offset)
    noisy = parabola + noise_level * torch.randn(seq_len)

    return noisy.view(1, seq_len)

def _preview_two_channel_sample(sample, type1: str, type2: str, label: int, sample_idx: int) -> None:
    """Print the channel types / label of one sample and plot both channels."""
    print(f'Channel 1: {type1}')
    print(f'Channel 2: {type2}')
    curve1 = sample[0, :].cpu().numpy()  # Channel 1 values, shape: [seq_length]
    curve2 = sample[1, :].cpu().numpy()  # Channel 2 values, shape: [seq_length]
    indices = list(range(1, sample.shape[1] + 1))
    print(f'Label:     {label} (1 represents same, 0 represents different)')
    plt.figure(figsize=(10, 6))
    plt.plot(indices, curve1, label='Channel 1', color='blue', linewidth=2)
    plt.plot(indices, curve2, label='Channel 2', color='red', linewidth=2)
    plt.xlabel('Index')
    plt.ylabel('Value')
    plt.title(f'Two Channel Dataset Instance {sample_idx}')
    plt.legend()
    plt.grid(True)
    plt.show()


def create_two_channel_dataset(n_samples:int, seq_len:int, n_preview:int = 5) -> tuple[torch.Tensor, torch.Tensor]:
    """Generate a synthetic "same convexity?" two-channel dataset.

    Each sample stacks two independently drawn curves, each either convex or
    concave. The label is 1 when both channels have the same type and 0
    otherwise.

    Args:
        n_samples: Number of samples to generate.
        seq_len: Length of every channel.
        n_preview: How many of the first samples are printed and plotted.
            Defaults to 5 (the previous hard-coded value); pass 0 to generate
            silently, e.g. in a headless run.

    Returns:
        `(features, labels)`: features of shape (n_samples, 2, seq_len) and
        int64 labels of shape (n_samples, 1). This is a plain tuple of
        tensors, not a TensorDataset.
    """
    generators = [(convex_generator, "convex"), (concave_generator, "concave")]

    data = []
    labels = []

    for sample_idx in range(n_samples):
        # Both type choices are drawn before either curve is generated.
        gen1, type1 = random.choice(generators)
        gen2, type2 = random.choice(generators)

        sample = torch.cat([gen1(seq_len), gen2(seq_len)], dim=0)
        label = 1 if type1 == type2 else 0

        data.append(sample)
        labels.append(label)

        if sample_idx < n_preview:
            _preview_two_channel_sample(sample, type1, type2, label, sample_idx)

    if not data:
        # torch.stack rejects an empty list; return correctly shaped empties.
        return (torch.empty((0, 2, seq_len)), torch.empty((0, 1), dtype=torch.long))

    features_tensor = torch.stack(data, dim=0)
    labels_tensor = torch.tensor(labels, dtype=torch.long).view(-1, 1)

    return (features_tensor, labels_tensor)

def generate_convexity_dataset(file_name, num_samples:int, seq_len:int) -> None:
    """Create `<file_name>.npz` with a fresh convexity dataset unless it exists.

    Args:
        file_name: Target path without the `.npz` extension.
        num_samples: Number of samples to generate.
        seq_len: Length of every channel.
    """
    archive_path = f'{file_name}.npz'

    # Never overwrite an existing archive; just report it.
    if os.path.exists(archive_path):
        print(f"Dataset already exists at {file_name}.")
        return

    features, labels = create_two_channel_dataset(num_samples, seq_len)
    np.savez(archive_path, features=features.numpy(), labels=labels.numpy())
    print(f"Dataset saved to {file_name}.")

def load_dataset(file_name, n_channels) -> TensorDataset:
    """Load `Segmented Dataset/<file_name>.npz` as a TensorDataset.

    The archive must contain the arrays `features` and `labels`. Features
    are converted to float32 and cut down to their first `n_channels`
    entries along axis 1; labels are converted to int64.

    Args:
        file_name: Archive name without directory or `.npz` extension.
        n_channels: Number of leading channels to keep.

    Returns:
        TensorDataset of `(features, labels)`.
    """
    archive_path = f'Segmented Dataset/{file_name}.npz'

    with np.load(archive_path) as archive:
        all_channels = torch.tensor(archive['features'], dtype=torch.float32)
        features = all_channels[:, :n_channels, :]
        print(f'Shape of features: {features.shape}')
        labels = torch.tensor(archive['labels'], dtype=torch.long)

    dataset = TensorDataset(features, labels)
    print(f"Loaded dataset with {len(dataset)} samples.")
    return dataset

def plot_confusion_matrix(predictions: np.array, labels: np.array, dataloader_name: str='given dataloader'):
    """Plot, save and show the confusion matrix of `predictions` vs `labels`.

    Rows are the ground-truth classes and columns the predicted classes,
    matching the axis titles. The figure is written to
    `Plots/<model_choice>_<dataset prefix>_confusion_matrix_<loader prefix>.png`.

    Args:
        predictions: Predicted class indices, one per sample.
        labels: Ground-truth class indices, one per sample.
        dataloader_name: Name shown in the title; its first word (up to the
            first underscore) is used in the file name.
    """
    # Use the classes that actually occur so the matrix and its tick labels
    # always have the same size (e.g. 2 classes for the convexity dataset).
    class_ids = np.unique(np.concatenate([np.ravel(labels), np.ravel(predictions)]))

    # scikit-learn expects (y_true, y_pred): rows = ground truth, cols = prediction.
    cm = confusion_matrix(labels, predictions, labels=class_ids)
    accuracy = accuracy_score(labels, predictions)

    # NOTE: the names in class_dict describe grip/trigger classes; indices
    # without an entry fall back to the bare class index.
    tick_labels = [class_dict.get(int(class_id), str(int(class_id))) for class_id in class_ids]

    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=tick_labels,
                yticklabels=tick_labels)
    plt.xlabel('Model Prediction', fontsize=10)
    plt.ylabel('Ground Truth', fontsize=10)
    plt.xticks(rotation=45, ha='center', rotation_mode='default', fontsize=7)
    plt.yticks(rotation=45, fontsize=7)
    # The heatmap's colour bar is the last axes of the figure; counts are integers.
    colorbar = plt.gcf().axes[-1]
    colorbar.yaxis.set_major_locator(ticker.MaxNLocator(integer=True))
    plt.title(f'Confusion Matrix on {dataloader_name} \n ({len(predictions)} samples, {accuracy * 100:.2f}% accuracy)', fontsize = 14)
    plt.tight_layout()

    # Built outside the f-string so the code also parses before Python 3.12.
    dataset_tag = dataset_name.split('_')[0]
    loader_tag = dataloader_name.split()[0].split('_')[0]
    plt.savefig(f'Plots/{model_choice}_{dataset_tag}_confusion_matrix_{loader_tag}.png', dpi=600)
    plt.show()

def plot_train_history(train_history:list):
    """Plot loss and accuracy curves side by side, then save and show them.

    Args:
        train_history: `((train_losses, train_accuracies),
            (val_losses, val_accuracies))`, one value per epoch. The epoch
            axis of all four curves is derived from `len(train_losses)`.

    The figure is written to
    `Plots/<model_choice>_<dataset prefix>_train_history.png`.
    """
    (train_losses, train_accuracies), (val_losses, val_accuracies) = train_history
    epochs = range(1, len(train_losses) + 1)

    # (subplot index, train curve, val curve, train label, val label, y label, title)
    panels = (
        (1, train_losses, val_losses,
         'Train Loss', 'Validation Loss', 'Loss', 'Training and Validation Loss'),
        (2, train_accuracies, val_accuracies,
         'Train Accuracy', 'Validation Accuracy', 'Accuracy (%)', 'Training and Validation Accuracy'),
    )

    plt.figure(figsize=(12, 5))

    for position, train_curve, val_curve, train_label, val_label, y_label, title in panels:
        plt.subplot(1, 2, position)
        plt.plot(epochs, train_curve, label=train_label, marker='o')
        plt.plot(epochs, val_curve, label=val_label, marker='o')
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        plt.title(title)
        plt.legend()
        plt.grid(True)

    plt.tight_layout()
    dataset_tag = dataset_name.split('_')[0]
    plt.savefig(f'Plots/{model_choice}_{dataset_tag}_train_history.png', dpi=600)
    plt.show()

def scan_best_hyperparams(optuna_file_path: str):
    with open(optuna_file_path, "r") as f:
        content = f.read()
    
    start_index = content.index("Optuna Tuning:")
    optuna_tuning = content[start_index:]
    trials = re.findall(r'Best parameters: \{(.*?)\}', optuna_tuning)

    best_hyperparams = {}  

    for params in trials:
        for param in params.split(", "):
            inner_key, val = param.split(": ")
            inner_key = inner_key.strip("'")
            if val.isdigit():
                best_hyperparams[inner_key] = float(val)
            else:
                best_hyperparams[inner_key] = val.strip("'")

    match model_choice:
        case 'CNN':
            best_hyperparams['model_depth'] = int(best_hyperparams['model_depth'])
            best_hyperparams['base_n_kernels'] = int(best_hyperparams['base_n_kernels'])
            best_hyperparams['base_kernel_size'] = int(best_hyperparams['base_kernel_size'])
            best_hyperparams['pool_size'] = int(best_hyperparams['pool_size'])
            best_hyperparams['dropout_rate'] = float(best_hyperparams['dropout_rate'])
            best_hyperparams['learning_rate'] = float(best_hyperparams['learning_rate'])
            best_hyperparams['patience'] = int(best_hyperparams['patience'])
            best_hyperparams['delta'] = float(best_hyperparams['delta'])
        case 'Transformer':
            best_hyperparams['model_depth'] = int(best_hyperparams['model_depth'])
            best_hyperparams['embedding_dim'] = int(best_hyperparams['embedding_dim'])
            best_hyperparams['n_heads'] = int(best_hyperparams['n_heads'])
            best_hyperparams['learning_rate'] = float(best_hyperparams['learning_rate'])
            best_hyperparams['patience'] = int(best_hyperparams['patience'])
            best_hyperparams['delta'] = float(best_hyperparams['delta'])

    return best_hyperparams

In [None]:
class EarlyStopping:
    """Track the best validation loss and signal when training should stop.

    An epoch counts as an improvement when its validation loss is at least
    `delta` below the best loss seen so far. On an improvement the model
    weights are snapshotted; after `patience` non-improving epochs since the
    last improvement `early_stop` becomes True.

    Attributes set by calls:
        best_loss: Lowest validation loss accepted so far (None before the
            first call).
        counter: Non-improving epochs since the last improvement.
        early_stop: True once `counter` has reached `patience`.
        optimal_val_accuracy / optimal_epoch: Accuracy and epoch recorded at
            the best loss.
        best_state_dict: Deep copy of the model's state_dict at the best loss.
    """

    def __init__(self, patience=10, delta=0.001):
        self.patience = patience
        self.delta = delta
        self.best_loss = None
        self.counter = 0
        self.early_stop = False
        self.optimal_val_accuracy = 0.0
        self.optimal_epoch = 0
        self.best_state_dict = None

    def __call__(self, val_loss, val_accuracy, epoch, model):
        """Record one epoch's validation result.

        Args:
            val_loss: Validation loss of the epoch.
            val_accuracy: Validation accuracy of the epoch.
            epoch: Epoch number to remember if this is the best epoch.
            model: Model whose `state_dict()` is copied on improvement.
        """
        first_call = self.best_loss is None

        # A NaN loss (diverged training) must never count as an improvement:
        # every comparison with NaN is False, so without this check a NaN
        # would replace the best checkpoint and reset the counter each epoch.
        stalled = not first_call and (
            math.isnan(val_loss) or val_loss > self.best_loss - self.delta
        )

        if stalled:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
            return

        self.best_loss = val_loss
        self.optimal_val_accuracy = val_accuracy
        self.optimal_epoch = epoch
        # Deep copy: state_dict() returns references to the live parameters.
        self.best_state_dict = copy.deepcopy(model.state_dict())
        if not first_call:
            self.counter = 0

In [None]:
class CNN1D(nn.Module):
    """1-D convolutional classifier with built-in training and inference loops.

    The network is `model_depth` repetitions of Conv1d -> ReLU -> MaxPool1d,
    followed by dropout and one linear layer that outputs class logits.
    Layer `i` (0-based) has `base_n_kernels * (i + 1)` output channels.

    `set_Adam_learning_rate` must be called before `train_model`, because it
    is what creates `self.optimizer`. Batches are moved to the module-level
    `device`; the class never moves the model itself.
    """

    def __init__(self,
                n_epochs: int,
                input_channels: int,
                seq_length:int,
                num_classes: int,
                model_depth: int,
                base_n_kernels: int,
                base_kernel_size: int,
                pool_size: int,
                dropout_rate: float,
                random_state:int = 42
        ):
        """Build the layers.

        Args:
            n_epochs: Maximum number of epochs run by `train_model`.
            input_channels: Channels of the input sequences.
            seq_length: Length of the input sequences; used to size the
                final linear layer.
            num_classes: Number of output logits.
            model_depth: Number of conv/ReLU/pool stages.
            base_n_kernels: Output channels of the first stage.
            base_kernel_size: Kernel size of every convolution.
            pool_size: Kernel size (and stride) of every max-pool.
            dropout_rate: Dropout probability applied before the linear layer.
            random_state: Seed applied to `random`, NumPy and torch.
        """
        # Seed every RNG before any layer is created so that weight
        # initialisation is reproducible for a given random_state.
        random.seed(random_state)
        np.random.seed(random_state)
        torch.manual_seed(random_state)
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(random_state)
            torch.backends.cudnn.deterministic = True
            torch.backends.cudnn.benchmark = False

        super().__init__()

        self.model_depth = model_depth
        self.n_epochs = n_epochs
        self.criterion = nn.CrossEntropyLoss()
        self.conv_layers = nn.ModuleList()
        self.relu_layers = nn.ModuleList()
        self.pool_layers = nn.ModuleList()

        for layer_idx in range(model_depth):
            in_channels = input_channels if layer_idx == 0 else base_n_kernels * layer_idx
            self.conv_layers.append(nn.Conv1d(in_channels=in_channels,
                                              out_channels=base_n_kernels * (layer_idx + 1),
                                              kernel_size=base_kernel_size,
                                              padding=1))
            self.relu_layers.append(nn.ReLU())
            self.pool_layers.append(nn.MaxPool1d(kernel_size=pool_size))

        self.dropout = nn.Dropout(dropout_rate)

        def calc_output_size(input_size, kernel_size, stride, padding):
            return (input_size + 2 * padding - kernel_size) // stride + 1

        # Replay the length arithmetic of every stage (conv with padding=1,
        # then a pool whose stride equals its kernel size) to size the
        # flattened input of the linear layer.
        tensor_size = seq_length
        for _ in range(model_depth):
            tensor_size = calc_output_size(tensor_size, kernel_size=base_kernel_size, stride=1, padding=1)
            tensor_size = calc_output_size(tensor_size, kernel_size=pool_size, stride=pool_size, padding=0)
        self.fc = nn.Linear(base_n_kernels * model_depth * tensor_size, num_classes)
        # Not applied in forward(): CrossEntropyLoss consumes raw logits.
        self.softmax = nn.Softmax(dim=1)

    def set_Adam_learning_rate(self, lr: float):
        """Create the Adam optimizer over all parameters with learning rate `lr`."""
        self.optimizer = optim.Adam(self.parameters(), lr=lr)

    def forward(self, x):
        """Return class logits for a batch shaped (batch, input_channels, seq_length)."""
        for conv, relu, pool in zip(self.conv_layers, self.relu_layers, self.pool_layers):
            x = pool(relu(conv(x)))
        x = self.dropout(x)
        x = x.view(x.size(0), -1)  # flatten channels x length for the linear layer
        x = self.fc(x)

        return x

    def train_model(self, train_loader: DataLoader, val_loader: DataLoader, patience: int, delta: float, verbose: bool=False, save: bool=False):
        """Train with early stopping on the validation loss.

        After training, the weights of the epoch with the best validation
        loss are restored into the model.

        Args:
            train_loader: Batches of `(features, labels)` used for training.
            val_loader: Batches of `(features, labels)` used for validation.
            patience: Non-improving epochs tolerated before stopping.
            delta: Minimum decrease of the validation loss that counts as an
                improvement.
            verbose: Print per-epoch metrics and plot the history up to the
                best epoch.
            save: Write the restored weights to
                `Model/<model_choice>_<dataset prefix>.pt`.

        Returns:
            Validation accuracy (in percent) at the best epoch.
        """
        train_losses = []
        train_accuracies = []
        val_losses = []
        val_accuracies = []

        early_stopping = EarlyStopping(patience=patience, delta=delta)

        for epoch in range(1, self.n_epochs + 1):

            self.train()
            train_loss = 0.0
            train_correct = 0
            train_total = 0

            for features, labels in train_loader:
                features, labels = features.to(device), labels.to(device)

                # Drop a trailing size-1 label dimension for CrossEntropyLoss.
                labels = labels.squeeze(-1)

                self.optimizer.zero_grad()
                outputs = self(features)
                loss = self.criterion(outputs, labels)
                loss.backward()
                self.optimizer.step()

                train_loss += loss.item()
                _, predictions = torch.max(outputs.data, 1)
                train_total += labels.size(0)
                train_correct += (predictions == labels).sum().item()

            # Loss is averaged over batches, accuracy over samples.
            train_loss = train_loss / len(train_loader)
            train_accuracy = 100 * train_correct / train_total
            train_losses.append(train_loss)
            train_accuracies.append(train_accuracy)

            self.eval()

            val_loss = 0.0
            val_correct = 0
            val_total = 0

            with torch.no_grad():
                for features, labels in val_loader:
                    features, labels = features.to(device), labels.to(device)
                    outputs = self(features)
                    labels = labels.squeeze(-1)

                    loss = self.criterion(outputs, labels)

                    val_loss += loss.item()
                    _, predictions = torch.max(outputs.data, dim=1)

                    val_total += labels.size(0)
                    val_correct += (predictions == labels).sum().item()

            val_loss = val_loss / len(val_loader)
            val_accuracy = 100 * val_correct / val_total
            val_losses.append(val_loss)
            val_accuracies.append(val_accuracy)

            if verbose:
                print(f'Epoch [{epoch}/{self.n_epochs}], '
                    f'Train Loss: {train_loss:.4f}, Train Acc: {train_accuracy:.2f}%, '
                    f'Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.2f}%')

            early_stopping(val_loss=val_loss, val_accuracy=val_accuracy, epoch=epoch, model=self)
            if early_stopping.early_stop:
                break

        if verbose:
            print(f'EarlyStopping optimal epoch: {early_stopping.optimal_epoch}')
            # Only plot up to the best epoch; later epochs are discarded
            # when the best weights are restored below.
            best_epoch = early_stopping.optimal_epoch
            train_history = ((train_losses[:best_epoch], train_accuracies[:best_epoch]),
                             (val_losses[:best_epoch], val_accuracies[:best_epoch]))
            plot_train_history(train_history=train_history)

        # No snapshot exists if the epoch loop never ran (n_epochs < 1).
        if early_stopping.best_state_dict is not None:
            self.load_state_dict(early_stopping.best_state_dict)

        if save:
            # Outer double quotes keep this f-string valid before Python 3.12.
            torch.save(self.state_dict(), f"Model/{model_choice}_{dataset_name.split('_')[0]}.pt")

        return early_stopping.optimal_val_accuracy

    def perform_inference(self, any_loader: DataLoader, dataloader_name: str='given loader', output:str = None):
        """Predict every sample of `any_loader` and print the accuracy.

        Args:
            any_loader: Batches of `(features, labels)`.
            dataloader_name: Name used in the printed accuracy line.
            output: One of 'numpy', 'np', 'np array', 'nparray' to receive
                NumPy arrays; anything else returns torch tensors.

        Returns:
            `(predictions, labels)` covering all batches of the loader, in
            loader order.

        Raises:
            ValueError: If the loader yields no batches.
        """
        self.eval()
        correct = 0
        total = 0

        all_predictions = []
        all_labels = []

        with torch.no_grad():
            for features, labels in any_loader:
                features, labels = features.to(device), labels.to(device)
                outputs = self(features)
                labels = labels.squeeze(-1)

                _, predictions = torch.max(outputs.data, dim=1)
                total += labels.size(0)
                correct += (predictions == labels).sum().item()

                # Collect inside the loop: appending after it kept only the
                # final batch.
                all_predictions.append(predictions)
                all_labels.append(labels)

        if not all_predictions:
            raise ValueError(f'No batches to run inference on in {dataloader_name}.')

        all_predictions_tensor = torch.cat(all_predictions, dim=0)
        all_labels_tensor = torch.cat(all_labels, dim=0)

        print(f'Inference accuracy on {dataloader_name}: {100 * correct / total:.2f}%')

        if output in ['numpy', 'np', 'np array', 'nparray']:
            return all_predictions_tensor.cpu().numpy(), all_labels_tensor.cpu().numpy()

        return (all_predictions_tensor, all_labels_tensor)

In [None]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a batch-first sequence.

    Even feature indices hold `sin(pos * w_k)` and odd indices
    `cos(pos * w_k)` with `w_k = 10000 ** (-2k / embedding_dim)`. The table
    is stored as a buffer of shape (1, seq_len, embedding_dim), i.e. it is
    not a trainable parameter.
    """

    def __init__(self, embedding_dim: int, seq_len: int, dropout_rate: float):
        """Precompute the encoding table.

        Args:
            embedding_dim: Size of the feature dimension (even or odd).
            seq_len: Longest sequence length that can be encoded.
            dropout_rate: Dropout probability applied after the addition.
        """
        super().__init__()
        self.dropout = nn.Dropout(p=dropout_rate)

        position = torch.arange(0, seq_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, embedding_dim, 2).float() * (-math.log(10000.0) / embedding_dim))
        angles = position * div_term

        pe = torch.zeros(seq_len, embedding_dim)
        pe[:, 0::2] = torch.sin(angles)
        # For an odd embedding_dim there is one cosine column fewer than sine
        # columns; trimming avoids the shape mismatch the full table caused.
        pe[:, 1::2] = torch.cos(angles[:, : embedding_dim // 2])

        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return `dropout(x + pe)` for `x` shaped (batch, length, embedding_dim).

        `length` must not exceed the `seq_len` given at construction.
        """
        x = x + self.pe[:, :x.size(1), :].to(x.device)
        return self.dropout(x)

class Transformer(nn.Module):
    """Transformer-encoder classifier with built-in training and inference loops.

    Each time step is projected from `n_channels` to `embedding_dim`,
    positional encodings are added, the sequence passes through
    `model_depth` encoder layers, and the encoding of the last time step is
    mapped to class logits.

    `set_Adam_learning_rate` must be called before `train_model`, because it
    is what creates `self.optimizer`. Batches are moved to the module-level
    `device`; the class never moves the model itself.
    """

    def __init__(self,
                n_epochs: int,
                n_channels: int,
                n_classes: int,
                seq_len: int,
                model_depth: int,
                embedding_dim: int,
                n_heads: int,
                dropout_rate: float,
                random_state: int=42):
        """Build the layers.

        Args:
            n_epochs: Maximum number of epochs run by `train_model`.
            n_channels: Channels of the input sequences.
            n_classes: Number of output logits.
            seq_len: Longest sequence length (sizes the positional table).
            model_depth: Number of encoder layers.
            embedding_dim: Model width of the encoder.
            n_heads: Attention heads per encoder layer.
            dropout_rate: Dropout probability of the positional encoding.
            random_state: Seed applied to `random`, NumPy and torch.
        """
        # Seed every RNG before any layer is created so that weight
        # initialisation is reproducible for a given random_state.
        random.seed(random_state)
        np.random.seed(random_state)
        torch.manual_seed(random_state)
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(random_state)
            torch.backends.cudnn.deterministic = True
            torch.backends.cudnn.benchmark = False

        super().__init__()

        self.n_epochs = n_epochs
        self.criterion = nn.CrossEntropyLoss()

        self.embedding = nn.Linear(n_channels, embedding_dim)
        self.pos_encoder = PositionalEncoding(embedding_dim, seq_len, dropout_rate)
        # batch_first=True: forward() feeds (batch, seq, embedding) tensors.
        # With the default (sequence-first) layout the encoder would treat the
        # batch axis as time and attend across different samples.
        # NOTE: dropout_rate is not forwarded here, so the encoder layers keep
        # PyTorch's default dropout.
        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(embedding_dim, n_heads, batch_first=True), model_depth
        )
        self.fc = nn.Linear(embedding_dim, n_classes)

    def set_Adam_learning_rate(self, lr: float):
        """Create the Adam optimizer over all parameters with learning rate `lr`."""
        self.optimizer = optim.Adam(self.parameters(), lr=lr)

    def forward(self, x):
        """Return class logits for a batch shaped (batch, n_channels, seq_len)."""
        x = x.transpose(1, 2)    # -> (batch, seq_len, n_channels)
        x = self.embedding(x)    # -> (batch, seq_len, embedding_dim)
        x = self.pos_encoder(x)
        x = self.transformer(x)
        x = x[:, -1, :]          # last time step summarises the sequence
        x = self.fc(x)

        return x

    def train_model(self, train_loader: DataLoader, val_loader: DataLoader, patience: int, delta: float, verbose: bool=False, save: bool=False):
        """Train with early stopping on the validation loss.

        After training, the weights of the epoch with the best validation
        loss are restored into the model.

        Args:
            train_loader: Batches of `(features, labels)` used for training.
            val_loader: Batches of `(features, labels)` used for validation.
            patience: Non-improving epochs tolerated before stopping.
            delta: Minimum decrease of the validation loss that counts as an
                improvement.
            verbose: Print per-epoch metrics and plot the history up to the
                best epoch.
            save: Write the restored weights to
                `Model/<model_choice>_<dataset prefix>.pt`.

        Returns:
            Validation accuracy (in percent) at the best epoch.
        """
        train_losses = []
        train_accuracies = []
        val_losses = []
        val_accuracies = []

        early_stopping = EarlyStopping(patience=patience, delta=delta)

        for epoch in range(1, self.n_epochs + 1):

            self.train()
            train_loss = 0.0
            train_correct = 0
            train_total = 0

            for features, labels in train_loader:
                features, labels = features.to(device), labels.to(device)

                # Drop a trailing size-1 label dimension for CrossEntropyLoss.
                labels = labels.squeeze(-1)

                self.optimizer.zero_grad()
                outputs = self(features)
                loss = self.criterion(outputs, labels)
                loss.backward()
                self.optimizer.step()

                train_loss += loss.item()
                _, predictions = torch.max(outputs.data, 1)

                train_total += labels.size(0)
                train_correct += (predictions == labels).sum().item()

            # Loss is averaged over batches, accuracy over samples.
            train_loss = train_loss / len(train_loader)
            train_accuracy = 100 * train_correct / train_total
            train_losses.append(train_loss)
            train_accuracies.append(train_accuracy)

            self.eval()

            val_loss = 0.0
            val_correct = 0
            val_total = 0

            with torch.no_grad():
                for features, labels in val_loader:
                    features, labels = features.to(device), labels.to(device)
                    outputs = self(features)
                    labels = labels.squeeze(-1)

                    loss = self.criterion(outputs, labels)

                    val_loss += loss.item()
                    _, predictions = torch.max(outputs.data, dim=1)

                    val_total += labels.size(0)
                    val_correct += (predictions == labels).sum().item()

            val_loss = val_loss / len(val_loader)
            val_accuracy = 100 * val_correct / val_total
            val_losses.append(val_loss)
            val_accuracies.append(val_accuracy)

            if verbose:
                print(f'Epoch [{epoch}/{self.n_epochs}], '
                    f'Train Loss: {train_loss:.4f}, Train Acc: {train_accuracy:.2f}%, '
                    f'Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.2f}%')

            early_stopping(val_loss=val_loss, val_accuracy=val_accuracy, epoch=epoch, model=self)
            if early_stopping.early_stop:
                break

        if verbose:
            print(f'EarlyStopping optimal epoch: {early_stopping.optimal_epoch}')
            # Only plot up to the best epoch; later epochs are discarded
            # when the best weights are restored below.
            best_epoch = early_stopping.optimal_epoch
            train_history = ((train_losses[:best_epoch], train_accuracies[:best_epoch]),
                             (val_losses[:best_epoch], val_accuracies[:best_epoch]))
            plot_train_history(train_history=train_history)

        # No snapshot exists if the epoch loop never ran (n_epochs < 1).
        if early_stopping.best_state_dict is not None:
            self.load_state_dict(early_stopping.best_state_dict)

        if save:
            # Outer double quotes keep this f-string valid before Python 3.12.
            torch.save(self.state_dict(), f"Model/{model_choice}_{dataset_name.split('_')[0]}.pt")

        return early_stopping.optimal_val_accuracy

    def perform_inference(self, any_loader: DataLoader, dataloader_name: str='given loader', output:str = None):
        """Predict every sample of `any_loader` and print the accuracy.

        Args:
            any_loader: Batches of `(features, labels)`.
            dataloader_name: Name used in the printed accuracy line.
            output: One of 'numpy', 'np', 'np array', 'nparray' to receive
                NumPy arrays; anything else returns torch tensors.

        Returns:
            `(predictions, labels)` covering all batches of the loader, in
            loader order.

        Raises:
            ValueError: If the loader yields no batches.
        """
        self.eval()
        correct = 0
        total = 0

        all_predictions = []
        all_labels = []

        with torch.no_grad():
            for features, labels in any_loader:
                features, labels = features.to(device), labels.to(device)
                outputs = self(features)
                labels = labels.squeeze(-1)

                _, predictions = torch.max(outputs.data, dim=1)
                total += labels.size(0)
                correct += (predictions == labels).sum().item()

                # Collect inside the loop: appending after it kept only the
                # final batch.
                all_predictions.append(predictions)
                all_labels.append(labels)

        if not all_predictions:
            raise ValueError(f'No batches to run inference on in {dataloader_name}.')

        all_predictions_tensor = torch.cat(all_predictions, dim=0)
        all_labels_tensor = torch.cat(all_labels, dim=0)

        print(f'Inference accuracy on {dataloader_name}: {100 * correct / total:.2f}%')

        if output in ['numpy', 'np', 'np array', 'nparray']:
            return all_predictions_tensor.cpu().numpy(), all_labels_tensor.cpu().numpy()

        return (all_predictions_tensor, all_labels_tensor)

In [None]:
def _suggest_param(trial, name, config):
    """Sample one hyperparameter from ``trial`` according to ``config``.

    ``config['type']`` selects ``suggest_int`` (``'int'``), ``suggest_float``
    (``'float'``) or, for anything else, ``suggest_categorical``. The first
    and last entries of ``config['range']`` are used as the bounds.
    """
    bounds = config['range']
    use_log = config.get('log', False)
    if config['type'] == 'int':
        return trial.suggest_int(name, bounds[0], bounds[-1], step=config.get('step', 1), log=use_log)
    if config['type'] == 'float':
        return trial.suggest_float(name, bounds[0], bounds[-1], log=use_log)
    return trial.suggest_categorical(name, bounds)


def objective(trial, search_space, generic_params, train_loader, val_loader):
    """Optuna objective: build, train and score one model configuration.

    The architecture is selected by the module-level ``model_choice``.

    Args:
        trial: Optuna trial used to sample the hyperparameters.
        search_space: Mapping holding the ``*_range`` entries read below.
        generic_params: Mapping with ``n_epochs``, ``n_channels``,
            ``seq_len``, ``n_classes`` and ``random_state``.
        train_loader: Loader passed to ``train_model`` for training.
        val_loader: Loader passed to ``train_model`` for validation.

    Returns:
        The value returned by ``model.train_model`` (used as the score to
        optimise).

    Raises:
        ValueError: If ``model_choice`` is neither ``'CNN'`` nor
            ``'Transformer'``.
    """
    if model_choice not in ('CNN', 'Transformer'):
        raise ValueError(f'Unsupported model_choice: {model_choice!r}')

    # Architecture-specific part of the search space.
    if model_choice == 'CNN':
        param_space = {
            'model_depth': {'type': 'int', 'range': search_space['CNN_model_depth_range'], 'step': 1},
            'base_n_kernels': {'type': 'int', 'range': search_space['CNN_base_n_kernels_range'], 'step': 4},
            'base_kernel_size': {'type': 'int', 'range': search_space['CNN_base_kernel_size_range'], 'step': 2},
            'pool_size': {'type': 'int', 'range': search_space['CNN_pool_size_range'], 'step': 1},
        }
    else:
        param_space = {
            'model_depth': {'type': 'int', 'range': search_space['Transformer_model_depth_range'], 'step': 1},
            'embedding_dim': {'type': 'int', 'range': search_space['Transformer_embedding_dim_range'], 'step': 1},
            'n_heads': {'type': 'int', 'range': search_space['Transformer_n_heads_range'], 'step': 1},
        }

    # Training hyperparameters shared by both architectures. They are added
    # after the architecture-specific ones so the sampling order is unchanged.
    param_space.update({
        'dropout_rate': {'type': 'float', 'range': search_space['dropout_rate_range']},
        'learning_rate': {'type': 'float', 'range': search_space['learning_rate_range'], 'log': True},
        'patience': {'type': 'int', 'range': search_space['patience_range'], 'step': 5},
        'delta': {'type': 'float', 'range': search_space['delta_range'], 'log': True},
    })

    params = {name: _suggest_param(trial, name, config) for name, config in param_space.items()}

    if model_choice == 'CNN':
        model = CNN1D(n_epochs=generic_params['n_epochs'],
                      input_channels=generic_params['n_channels'],
                      seq_length=generic_params['seq_len'],
                      num_classes=generic_params['n_classes'],
                      model_depth=params['model_depth'],
                      base_n_kernels=params['base_n_kernels'],
                      base_kernel_size=params['base_kernel_size'],
                      pool_size=params['pool_size'],
                      dropout_rate=params['dropout_rate'],
                      random_state=generic_params['random_state'])
    else:
        # embedding_dim and n_heads are sampled as exponents of two (see the
        # 2** below). When the head exponent exceeds the embedding exponent
        # the head exponent is forced to 1.
        # NOTE(review): this override is local; the value stored on the trial
        # (and later reported in study.best_params) is presumably still the
        # originally sampled n_heads - confirm before relying on it.
        if params['embedding_dim'] < params['n_heads']:
            print(f"n_heads exponent {params['n_heads']} exceeds embedding_dim exponent "
                  f"{params['embedding_dim']}; using n_heads exponent 1")
            params['n_heads'] = 1
        model = Transformer(n_epochs=generic_params['n_epochs'],
                            n_channels=generic_params['n_channels'],
                            seq_len=generic_params['seq_len'],
                            n_classes=generic_params['n_classes'],
                            model_depth=params['model_depth'],
                            embedding_dim=2**params['embedding_dim'],
                            n_heads=2**params['n_heads'],
                            dropout_rate=params['dropout_rate'],
                            random_state=generic_params['random_state'])
    model.to(device)

    model.set_Adam_learning_rate(lr=params['learning_rate'])

    best_accuracy = model.train_model(train_loader=train_loader, val_loader=val_loader, patience=params['patience'], delta=params['delta'], verbose=0)

    return best_accuracy

In [None]:
if __name__ == "__main__":

    print(f'model_choice: {model_choice}')

    generate_convexity_dataset(file_name=dataset_name, num_samples=n_samples, seq_len=seq_len) if dataset_name == 'convexity_dataset' else None
        # print(f'Shape of features: {f['features'].shape}')
        
    dataset = load_dataset(file_name=dataset_name, n_channels=n_channels)
    features, labels = dataset.tensors

    train_val_indices, test_indices = train_test_split(
        range(len(dataset)),
        test_size=split_ratio,
        stratify=labels,
        random_state=random_state
    )

    train_indices, val_indices = train_test_split(
        train_val_indices,
        test_size=split_ratio / (1 - split_ratio),
        stratify=[labels[i] for i in train_val_indices],
        random_state=random_state
    )

    train_dataset = TensorDataset(features[train_indices], labels[train_indices])
    val_dataset = TensorDataset(features[val_indices], labels[val_indices])
    test_dataset = TensorDataset(features[test_indices], labels[test_indices])

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    start_time = time.perf_counter()

    if not os.path.exists(f'Hyperparmeter Tuning/{model_choice}_{dataset_name.split('_')[0]}_optuna.txt'):
        study = optuna.create_study(direction='maximize', sampler=optuna.samplers.TPESampler(seed=random_state))
        study.optimize(lambda trial: objective(trial, search_space=search_space, generic_params=generic_params, train_loader=train_loader, val_loader=val_loader), n_optuna_trials)
        with open(check_for_file_duplicates(f'Hyperparmeter Tuning/{model_choice}_{dataset_name.split('_')[0]}_optuna.txt'), 'w') as file:

            file.write("Optuna Tuning:")
            file.write("\n\n")
            file.write(f"Best value ({metric}): {study.best_value}")
            file.write("\n")
            file.write(f"Best parameters: {study.best_params}")

            if study.best_trial.user_attrs:
                file.write("\n\nAdditional Trial Attributes:")
            for key, value in study.best_trial.user_attrs.items():
                file.write(f"\n{key}: {value}")
            
        print(f'Optuna files for model {model_choice} saved successfuly.')

    best_hyperparams = scan_best_hyperparams(f'Hyperparmeter Tuning/{model_choice}_{dataset_name.split('_')[0]}_optuna.txt')
    
    match model_choice:
        case 'CNN':
            model = CNN1D(n_epochs=n_epochs,
                        input_channels=n_channels, 
                        seq_length=seq_len, 
                        num_classes=n_classes, 
                        model_depth=int(best_hyperparams['model_depth']),
                        base_n_kernels=best_hyperparams['base_n_kernels'], 
                        base_kernel_size=best_hyperparams['base_kernel_size'], 
                        pool_size=best_hyperparams['pool_size'], 
                        dropout_rate=best_hyperparams['dropout_rate'],
                        random_state=random_state)
        case 'Transformer':
            model = Transformer(n_epochs=n_epochs, 
                                n_channels=n_channels, 
                                seq_len=seq_len, 
                                n_classes=n_classes,
                                model_depth=int(best_hyperparams['model_depth']), 
                                embedding_dim=2**int(best_hyperparams['embedding_dim']), 
                                n_heads=2**int(best_hyperparams['n_heads']), 
                                dropout_rate=float(best_hyperparams['dropout_rate']),
                                random_state=random_state)

    model.to(device)

    if not os.path.exists(f'Model/{model_choice}_{dataset_name.split('_')[0]}.pt'):
        model.set_Adam_learning_rate(lr=float(best_hyperparams['learning_rate']))
        best_accuracy = model.train_model(train_loader=train_loader, val_loader=val_loader, patience=int(best_hyperparams['patience']), delta=float(best_hyperparams['delta']), verbose=1, save=True)
        train_predictions, train_labels = model.perform_inference(any_loader=train_loader, dataloader_name='Train Loader', output='np')
        val_predictions, val_labels = model.perform_inference(any_loader=val_loader, dataloader_name='Validation Loader', output='np')
        test_predictions, test_labels = model.perform_inference(any_loader=test_loader, dataloader_name='Test Loader', output='np')
    else:
        model.load_state_dict(torch.load(f'Model/{model_choice}_{dataset_name.split('_')[0]}.pt', map_location=device))
        train_predictions, train_labels = model.perform_inference(any_loader=train_loader, dataloader_name='Train Loader', output='np')
        val_predictions, val_labels = model.perform_inference(any_loader=val_loader, dataloader_name='Validation Loader', output='np')
        test_predictions, test_labels = model.perform_inference(any_loader=test_loader, dataloader_name='Test Loader', output='np')

    plot_confusion_matrix(predictions=train_predictions, labels=train_labels, dataloader_name='Train Loader')
    plot_confusion_matrix(predictions=val_predictions, labels=val_labels, dataloader_name='Validation Loader')
    plot_confusion_matrix(predictions=test_predictions, labels=test_labels, dataloader_name='Test Loader')

    end_time = time.perf_counter()
    elapsed_time = end_time - start_time
    print(f"Elapsed time for ML pipeline: {elapsed_time:.6f} seconds")