Importing libraries

In [None]:
import arff
import ast
from catboost import CatBoostClassifier, Pool
import copy
from dataclasses import dataclass
import datetime
import glob
import graphviz
# from datetime import datetime
from itertools import product
from imblearn.metrics import specificity_score
from itertools import combinations, chain
import joblib
import json
import matplotlib.pyplot as plt
from matplotlib.colors import LinearSegmentedColormap
import numpy as np
import optuna
from optuna.importance import get_param_importances
from optuna.visualization import plot_optimization_history, plot_param_importances, plot_parallel_coordinate
import os
import pandas as pd
from pathlib import Path
import pydot
from pytorch_tabnet.tab_model import TabNetClassifier
import random
import re
import seaborn as sns
import shap
import statistics
from scipy.io import arff
import sklearn
import sklearn.utils
from sklearn import tree
from sklearn.metrics import (
    accuracy_score,
    f1_score,
    fbeta_score,
    precision_score,
    recall_score
)
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_curve, auc, confusion_matrix, classification_report, f1_score, precision_score, recall_score, fbeta_score, roc_auc_score, accuracy_score
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.svm import SVC
from sklearn.tree import export_graphviz
from sklearn.neighbors import KNeighborsClassifier
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader, random_split
#from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm
from typing import Generator, Tuple, TypedDict, ClassVar, Union, List, Optional
import xgboost as xgb
from xgboost import XGBClassifier

device = torch.device("cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu")
print(device)
random_state = 42
torch.manual_seed(random_state)
torch.cuda.manual_seed_all(random_state)
np.random.seed(random_state)
random.seed(random_state)
torch.backends.cudnn.deterministic = True

print(sklearn.__version__)
print(torch.__version__)
print(xgb.__version__)

pd.set_option('future.no_silent_downcasting', True)

Tuning Workflow

In [None]:
notebook_path = os.path.abspath(os.getcwd())
notebook_name = os.path.basename(notebook_path)

try:
    file_idx = int(notebook_name.split('_')[-1])
    print(f"The integer at the end of the file name is: {file_idx}")
except ValueError:
    try:
        file_idx = int(notebook_name.split('_')[-2])
        print(f"The integer at the end of the file name is: {file_idx}")
    except ValueError:
        print("The file name does not contain a valid integer at the end.")

combinations_1 = [[i] for i in range(1, 6)]
combinations_2 = [list(c) for c in combinations(range(1, 6), 2)]
combinations_3 = [list(c) for c in combinations(range(1, 6), 3)]
combinations_4 = [list(c) for c in combinations(range(1, 6), 4)]
combinations_5 = [[1, 2, 3, 4, 5]]

all_combinations = sorted(combinations_1 + combinations_2 + combinations_3 + combinations_4 + combinations_5)
# all_combinations = [[1, 2, 3, 4, 5]]
# all_combinations = all_combinations[0:0]
print(all_combinations)

Predefined Dictionaries and Classes

In [None]:
replacement_rules_feature = {
    "M": 1,
    "F": 0,
    "yes": 1,
    "no":0
}

replacement_rules_label = {
    "yes": 1,
    "no": 0,
    "1": 1,
    "2": 0,
    "-1": 0,
    # "Male": -1,
    "Male": 0,
    "Female": 1
}

torch.serialization.add_safe_globals([nn.CrossEntropyLoss])
loss_function_dict = {
    # 'L1':  nn.L1Loss(),
    # 'MSE': nn.MSELoss(),
    'cross entropy': nn.CrossEntropyLoss()
    # 'NLL': nn.NLLLoss(),
    # 'CTC': nn.CTCLoss(),
    # 'KL divergence': nn.KLDivLoss(),
    # 'BCE logit': nn.BCEWithLogitsLoss()
}

index_val_count_dict = {
    'index': 0, 
    'value': float('inf'), 
    'no_improvement_count': 0,
    'model_to_save': None
}

class ModuleResults:

    def __init__(
        self, 
        trained_models, 
        train_accuracy, 
        val_accuracy,
        train_f1=None,
        val_f1=None,
        val_fb=None,
        train_precision=None,
        val_precision=None,
        train_recall=None,
        val_recall=None,
        val_specificity=None,
        train_auc=None,
        val_auc=None,
        stopping_epoch=None
    ) -> None:

        self.trained_models = trained_models
        self.train_accuracy = train_accuracy
        self.val_accuracy = val_accuracy
        self.train_f1 = train_f1
        self.val_f1 = val_f1
        self.val_fb = val_fb
        self.train_precision = train_precision
        self.val_precision = val_precision
        self.train_recall = train_recall
        self.val_recall = val_recall
        self.val_specificity = val_specificity
        self.train_auc = train_auc
        self.val_auc = val_auc
        self.stopping_epoch=stopping_epoch

class ValLossIndexAndValue(TypedDict):
    index: int
    value: float
    no_improvement_count: int

def create_dataframe(feature_combination_list, module_results_list):
    data = [
        {
            "Feature Combination": feature_combination,
            "Accuracy": metrics.accuracy_val,
            "F1 Score": metrics.f1_val,
            "F-beta Score": metrics.fb_val,
            "Precision": metrics.precision_val,
            "Recall": metrics.recall_val,
            "Specificity": metrics.specificity_val,
            "Area Under Curve": metrics.auc_val,
            "Stopping_Epoch": metrics.stopping_epoch
        }
        for feature_combination, metrics in zip(feature_combination_list, module_results_list) 
    ]
    return pd.DataFrame(data)

class ModelConfig:
    def __init__(
        self,
        model_type:str,
        search_space:object,
        name:str=None,
        ensemble_group:str=None
    ):
        self.model_type = model_type       
        self.search_space = search_space       
        self.ensemble_group = ensemble_group     
        self.name = name                   
        self.model_instance = None            

In [None]:
def combine_list_elements(list_to_combine:list[int]) -> str:
    return ''.join(map(str, list_to_combine))

def check_for_file_duplicates(directory:str, base_name:str, file_type:str) -> str:
    
    file_name = os.path.join(directory, base_name + "." + file_type)
    
    counter = 1
    while os.path.exists(file_name):
        file_name = os.path.join(directory, base_name + f" ({counter}).{file_type}")
        counter += 1

    return(file_name)

def check_for_file_duplicates_short(path:str, file_type:str) -> str:
    
    counter = 1
    file_name = path + f"_{counter}{file_type}"

    while os.path.exists(file_name):
        file_name = path + f"_{counter}{file_type}"
        counter += 1

    return(file_name)

def check_for_file_duplicates_simple(path:str) -> str:

    path = Path(path)

    if not path.exists():
        return str(path)

    file_name = path.stem
    file_extension = path.suffix
    path_directory = path.parent

    counter = 1
    while True:
        new_name = f"{file_name}_{counter}{file_extension}"
        new_path = path_directory / new_name
        
        if not new_path.exists():
            return str(new_path)
        
        counter += 1

def custom_json_serializer(obj):
    if isinstance(obj, datetime):
        return obj.isoformat()
    else:
        return str(obj)

def sort_keys(key):
    integers = [int(x) for x in re.findall(r'\d+', str(key))]
    return (len(integers), integers)

def extract_integers(string):
    # Use the re.findall() function to find all the integers in the string
    integers = [int(x) for x in re.findall(r'\d+', string)]
    return integers

def get_latest_base_folder(directory: str) -> str:
    if not os.path.isdir(directory):
        raise ValueError(f"The directory '{directory}' does not exist or is not a directory.")

    entries = os.listdir(directory)
    
    base_folders = [
        entry for entry in entries 
        if os.path.isdir(os.path.join(directory, entry)) and entry.startswith('base10000_')
    ]
    
    if not base_folders:
        raise ValueError(f"No folders starting with 'base10000_' found in '{directory}'.")
        return None
    
    timestamp_pattern = r'base10000_(\d{4})_(\d{2})_(\d{2})__(\d{2})_(\d{2})_(\d{2})'
    
    latest_folder = None
    latest_timestamp = None
    
    for folder in base_folders:
        match = re.match(timestamp_pattern, folder)
        if match:
            year, month, day, hour, minute, second = map(int, match.groups())
            
            try:
                folder_timestamp = datetime.datetime(year, month, day, hour, minute, second)
            except ValueError as e:
                print(f"Invalid timestamp in folder '{folder}': {e}")
                continue
            
            if latest_timestamp is None or folder_timestamp > latest_timestamp:
                latest_timestamp = folder_timestamp
                latest_folder = folder
        else:
            print(f"Folder '{folder}' does not match the expected timestamp format.")
    
    if latest_folder is None:
        print("No folders with valid timestamps found.")
        return None
    
    return os.path.join(directory, latest_folder)

def tensor_2_np(loader):
    X, y = [], []
    for i in range(len(loader.dataset)):
        x, target = loader.dataset[i]
        X.append(x)
        y.append(target)
    return torch.stack(X).numpy(), torch.stack(y).numpy()

def numpy_2_dataloader(X: np.ndarray, Y: np.ndarray, batch_size: int) -> tuple[DataLoader, DataLoader]:

    X_tensor = torch.tensor(X, dtype=torch.float32, device=device)
    Y_tensor = torch.tensor(Y, dtype=torch.long, device=device)

    dataset = TensorDataset(X_tensor, Y_tensor)
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    return loader

def majority_vote(predictions: List[np.ndarray]) -> np.ndarray:
    pred_array = np.vstack(predictions)
    votes_for_1 = np.sum(pred_array, axis=0)
    n_models = len(predictions)
    return (votes_for_1 > n_models / 2).astype(int)

def check_tune_consistency(tuned_performance, actual_performance):
    
    print('\n')
    print(f'Tuned: {tuned_performance}')
    print(f'Actual: {actual_performance}')
    print('\n')
    if tuned_performance == actual_performance:
        print(f'Successful tuning!')
    else: 
        print(f'Failed tuning!')

def string_to_list(s):
    try:
        # Parse string to list
        result = ast.literal_eval(s)
        # Ensure all elements are integers
        if isinstance(result, list):
            return [int(x) for x in result]
        else:
            raise ValueError("Input is not a list")
    except (ValueError, SyntaxError) as e:
        print(f"Error parsing string {s}: {e}")
        return []

Define Modality class

In [None]:
class Modality:

    def __init__(self, 
                mode:str,
                model_choice:str, 
                metric:str,
                is_developer_test_mode:bool, 
                uses_K_Fold:bool, 
                uses_existing_model:bool, 
                saves_plot:bool,
                is_verbose:bool,
                standardizes_input:bool,
                uses_early_stopping:bool,
                bagging_strategy:str,
                random_state:int) -> None:
        
        self.mode = mode
        self.model_choice = model_choice
        self.metric = metric
        self.is_developer_test_mode = is_developer_test_mode
        self.uses_K_Fold = uses_K_Fold
        self.uses_existing_model = uses_existing_model
        self.saves_plot = saves_plot
        self.is_verbose = is_verbose
        self.standardizes_input = standardizes_input
        self.uses_early_stopping = uses_early_stopping
        self.bagging_strategy = bagging_strategy
        self.random_state = random_state

    def __str__(self) -> str:
        lines = ["Modality =============================:"]        
        for name, value in vars(self).items():
            lines.append(f"       - {name}: {value}")
        return "\n".join(lines)

Define data class

In [None]:
class Data:
    
    def __init__(self, 
                raw_dataset_name:str, 
                train_ratio:float, 
                n_classes:int,
                columns_to_keep:list[int],
                data_split_random_seed_modfier:int) -> None:
        
        self.raw_dataset_name = raw_dataset_name
        self.dataset_name = None
        self.batch_sizes:list[int] = None
        
        self.columns_to_keep = columns_to_keep
        self.data_split_random_seed_modfier = data_split_random_seed_modfier
        
        self.n_classes = n_classes
        
        self.train_ratio = train_ratio
        self.validation_ratio:float = 1 - self.train_ratio
        self.k_folds:int = int(1 / self.validation_ratio)
        
        self.df:pd.DataFrame = None
        
        self.columns_all:list[int] = None
        self.columns_remaining:list[int] = None
        self.columns_remaining_str:str = None

        self.X:torch.Tensor = None
        self.Y:torch.Tensor = None

        self.X_train_val:torch.Tensor = None
        self.Y_train_val:torch.Tensor = None

        self.X_train:torch.Tensor = None
        self.Y_train:torch.Tensor = None

        self.X_val:torch.Tensor = None
        self.Y_val:torch.Tensor = None
        
        self.X_test:torch.Tensor = None
        self.Y_test:torch.Tensor = None

        self.train_val_loader:torch.Tensor = None
        self.test_loader:torch.Tensor = None
            
    def __str__(self) -> str:
        lines = ["Dataset =============================:"]        
        for name, value in vars(self).items():
            lines.append(f"       - {name}: {value}")
        return "\n".join(lines)
    

Define searcg space classes

In [None]:
@dataclass
class MLP_SearchSpace:
    max_epoch_range: int
    batch_size_range: List[int]
    model_depth_range: List[int]
    model_width_range: List[int]
    model_lr_range: List[float]
    model_dr_range: List[float]
    activation_function_range: List[str]
    weight_decay_range: List[float]
    criterion_range: List[str]
    num_optuna_trials: int
    # warm_up_range: int
    patience_range: int
    # moving_average_range: int
    l1_weight_range: List[float]
    l2_weight_range: List[float]
    num_optuna_trials:int

    optuna_step_for_batch_size:int = 32
    optuna_step_for_model_depth:int = 1
    optuna_step_for_model_width:int = 1
    optuna_step_for_lr:float = None
    optuna_step_for_dr:float = None
    optuna_step_for_weight_decay:float = None
    optuna_step_for_warm_up:int = 5
    optuna_step_for_patience:int = 5
    optuna_step_for_moving_average:int = 5

    def __str__(self) -> str:
        lines = ["MLP Search Space =============================:"]
        for name, value in vars(self).items():
            lines.append(f"       - {name}: {value}")
        return "\n".join(lines)

@dataclass
class TAB_SearchSpace: 
    batch_size_range: List[int]
    width_prediction_and_attention_range: List[int]
    n_step_range: List[int]
    gamma_range: List[float]
    cat_idx_range: List[int]
    cat_dim_range: List[int]
    n_independent_range: List[int]
    n_shared_range: List[int]
    momentum_range: List[float]
    lambda_sparse_range: List[float]
    mask_type_range: List[str]
    # optimizer_param_range: List[float]
    # max_epoch_range: List[int]
    # patience_range: List[int]
    # max_epoch_range: List[int]
    num_optuna_trials:int

    optuna_step_for_batch_size:int = 32

    def __str__(self) -> str:
        lines = ["TAB Search Space =============================:"]
        for name, value in vars(self).items():
            lines.append(f"       - {name}: {value}")
        return "\n".join(lines)

@dataclass
class SVM_SearchSpace:
    batch_size_range: List[int]
    c_range: List[float]
    kernel_range: List[str]
    gamma_range: List[float]
    tol_range: List[float]
    max_iter_range: List[int]
    degree_range: List[int]
    num_optuna_trials: int

    optuna_step_for_batch_size:int = 32

    def __str__(self) -> str:
        lines = ["SVM Search Space =============================:"]
        for name, value in vars(self).items():
            lines.append(f"       - {name}: {value}")
        return "\n".join(lines)

@dataclass
class KNN_SearchSpace:
    batch_size_range: List[int]
    n_neighbor_range: List[int]
    weight_range: List[str]
    algorithm_range: List[str]
    leaf_size_range: List[int]
    p_range: List[float]
    metric_range: List[str]
    num_optuna_trials: int

    optuna_step_for_batch_size:int = 32
    optuna_step_for_n_neighbor:int = 1
    optuna_step_for_leaf_size:int = 1

    def __str__(self) -> str:
        lines = ["KNN Search Space =============================:"]
        for name, value in vars(self).items():
            lines.append(f"       - {name}: {value}")
        return "\n".join(lines)


@dataclass
class RF_SearchSpace:
    batch_size_range: List[int]
    num_tree_range: List[int]
    criterion_range: List[str]
    max_depth_range: List[Optional[int]]
    min_sample_split_range: List[Union[int, float]]
    min_sample_leaf_range: List[int]
    max_feature_range: List[Optional[str]]
    max_leaf_node_range: List[Optional[int]]
    bootstrap_range: List[bool]
    min_impurity_decrease_range: List[float]
    ccp_alpha_range: List[float]
    max_sample_range: List[float]
    num_optuna_trials: int

    optuna_step_for_batch_size:int = 32
    optuna_step_for_num_tree:int = 1
    optuna_step_for_max_depth:int = 1
    optuna_step_for_max_leaf_node:int = 1
    optuna_step_for_min_sample_split:float = None
    optuna_step_for_min_smaple_leaf:float = None

    def __str__(self) -> str:
        lines = ["RF Search Space =============================:"]
        for name, value in vars(self).items():
            lines.append(f"       - {name}: {value}")
        return "\n".join(lines)


@dataclass
class XGB_SearchSpace:
    batch_size_range: List[int]
    eta_range: List[float]
    gamma_range: List[float]
    max_depth_range: List[int]
    min_child_weight_range: List[float]
    subsample_range: List[float]
    sampling_method_range: List[str]
    reg_lambda_range: List[float]
    reg_alpha_range: List[float]
    early_stopping_rounds_range: List[int]
    grow_policy_range: List[str]
    max_leaf_range: List[int]
    num_optuna_trials: int

    optuna_step_for_batch_size:int = 32
    optuna_step_for_max_depth:int = 1
    optuna_step_for_max_leaf:int = 1
    
    def __str__(self) -> str:
        lines = ["XGB Search Space =============================:"]
        for name, value in vars(self).items():
            lines.append(f"       - {name}: {value}")
        return "\n".join(lines)


@dataclass
class CAT_SearchSpace:
    batch_size_range: List[int]
    cat_features_range: List[int]
    early_stopping_rounds_range: List[int]
    learning_rate_range: List[float]
    depth_range: List[int]
    l2_leaf_reg_range: List[float]
    random_strength_range: List[float]
    colsample_range: List[float]
    bagging_temperature_range: List[float]
    border_count_range: List[int]
    num_optuna_trials: int

    optuna_step_for_batch_size:int = 32
    optuna_step_for_iteration:int = 5
    optuna_step_for_depth:int = 1
    optuna_step_for_early_stopping_round:int = 1

    def __str__(self) -> str:
        lines = ["CAT Search Space =============================:"]
        for name, value in vars(self).items():
            lines.append(f"       - {name}: {value}")
        return "\n".join(lines)

Define trial information class

In [None]:
class TrialInformation:

    def __init__(self, time_stamp:str, trial_name:str):

        self.time_stamp:str = time_stamp
        self.trial_name:str = trial_name

        self.model_save_path:str = None
        self.hyperparam_save_path:str = None
        self.summary_save_path:str = None
        self.candidate_list_save_path:str = None

    def __str__(self) -> str:
        lines = ["Trial Information =============================:"]        
        for name, value in vars(self).items():
            lines.append(f"       - {name}: {value}")
        return "\n".join(lines)

Define MLP Model

In [None]:
class myMlpModel(nn.Module):
    
    activation_function_dict = {
        'relu': nn.ReLU(),
        'tanh': nn.Tanh(),
        'elu': nn.ELU(),
        'leaky relu': nn.LeakyReLU(),
        'log sigmoid': nn.LogSigmoid(),
        'continuous relu': nn.CELU(),
        'relu 6': nn.ReLU6(),
        'gaussian relu': nn.GELU(),
        'sigmoid': nn.Sigmoid(),
        'sigmoid relu': nn.SiLU()
    }
    
    def __init__(self, 
                max_epoch:int,
                batch_size: int,
                device:torch.device, 
                input_dim:int, 
                n_classes:int, 
                model_depth:int, 
                model_width:int,
                criterion, 
                model_dr:float,
                model_lr:float, 
                l1_weight:float,
                l2_weight:float,
                weight_decay:float,
                activation_function:str,
                # warm_up:int,
                patience:int,
                # moving_average:int,
                random_state:int,
                feature_combination_str: str = None):
        
        super(myMlpModel, self).__init__()

        torch.manual_seed(random_state)
        np.random.seed(random_state)
        random.seed(random_state)

        for name, value in locals().items():
            if name != 'self': 
                setattr(self, name, value)

        self.fc_layers = nn.ModuleList()
        self.batchnorm_layers = nn.ModuleList()
        self.dropout_layers = nn.ModuleList()
        self.activation_layers = nn.ModuleList()

        for i in range(model_depth):
            self.fc_layers.append(nn.Linear(input_dim if i == 0 else model_width, model_width, dtype=torch.float32).to(self.device))
            self.batchnorm_layers.append(nn.BatchNorm1d(model_width).to(self.device))
            self.dropout_layers.append(nn.Dropout(model_dr).to(self.device))
            self.activation_layers.append(self.activation_function_dict[activation_function].to(self.device))
        
        self.optimizer = optim.Adam(self.parameters(), lr=model_lr, weight_decay=weight_decay)

        self.fc_last = nn.Linear(model_width, n_classes, dtype=torch.float32).to(self.device)
        self.softmax = nn.Softmax(dim=1).to(self.device)
        
    def forward(self, x_initial:torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        x = x_initial.to(self.device).float()
        for i in range(self.model_depth):
            residual = x

            x = self.activation_layers[i](self.fc_layers[i](x))

            if self.training:
                x = self.dropout_layers[i](x)
    
            if (i != 0):
                x = residual + x
            x = self.batchnorm_layers[i](x)

        x = self.fc_last(x)
        x_soft = self.softmax(x)
        return x_soft
    

    def calculate_l1_loss(self):
        l1 = 0.0
        for param in self.parameters():
            l1 += torch.sum(torch.abs(param))
        return self.l1_weight * l1

    def calculate_l2_loss(self):
        l2 = 0.0
        for param in self.parameters():
            l2 += torch.sum(param ** 2)
        return self.l2_weight * l2

    def predict(self, loader, _=None):
        self.eval()
        predictions = []
        with torch.no_grad():
            for features, _ in loader:
                features = features.to(device, dtype=torch.float32)
                outputs = self(features)  
                predicted_class = torch.argmax(outputs, dim=1)
                predictions.append(predicted_class.cpu())
        return torch.cat(predictions).numpy()  

    def predict_proba(self, loader, _=None):
        self.eval()
        probas = []
        with torch.no_grad():
            for features, _ in loader:
                features = features.to(device, dtype=torch.float32)
                outputs = self(features)  
                probas.append(outputs.cpu())
        return torch.cat(probas).numpy()  

    def score(self, loader, _=None):
        
        self.eval()
        correct = 0
        total = 0

        with torch.no_grad():
            for features, targets in loader:
                features = features.to(device, dtype=torch.float32)
                targets = targets.to(device, dtype=torch.long)

                softmax = self(features)

                predicted_class = torch.max(softmax, 1).indices
                correct += (predicted_class == targets).sum().item()
                total += targets.size(0)

        accuracy = correct / total

        return accuracy

    def predict_and_score(self, loader):
        
        self.eval()
        correct = 0
        total = 0
        total_loss = 0

        with torch.no_grad():
            for features, targets in loader:
                features = features.to(device, dtype=torch.float32)
                targets = targets.to(device, dtype=torch.long)

                softmax = self(features)
                loss_nominal = self.criterion(softmax, targets)  
                
                l1_loss = self.calculate_l1_loss()
                l2_loss = self.calculate_l2_loss()

                total_loss += loss_nominal.item()

                predicted_class = torch.max(softmax, 1).indices
                correct += (predicted_class == targets).sum().item()
                total += targets.size(0)

        accuracy = correct / total
        avg_loss = total_loss / len(loader)

        return accuracy, avg_loss

    def train_epoch(self, train_loader, val_loader=None):
        
        self.train()
        correct = 0
        total = 0
        total_loss = 0
        total_loss_nominal = 0

        for batch_idx, (features, targets) in enumerate(train_loader) :

            self.zero_grad()
            self.optimizer.zero_grad()
            features, targets = features.to(device, dtype=torch.float32), targets.to(device, dtype=torch.long)
            
            softmax = self(features)
            loss_nominal = self.criterion(softmax, targets)

            l1_loss = self.calculate_l1_loss()
            l2_loss = self.calculate_l2_loss()

            batch_loss = loss_nominal + l1_loss + l2_loss
            batch_loss.backward()
            self.optimizer.step()
            
            total_loss += batch_loss.item()
            total_loss_nominal += loss_nominal.item()
            
            predicted_class = torch.max(softmax, 1).indices
            correct += (predicted_class == targets).sum().item()
            total += targets.size(0)
        
        train_accuracy = correct / total
        train_loss = total_loss / len(train_loader)
        train_loss_nominal = total_loss_nominal / len(train_loader)

        val_accuracy, val_loss = None, None
        if val_loader is not None:
            val_accuracy, val_loss = self.predict_and_score(val_loader)

        return train_accuracy, val_accuracy, val_loss
    
    def fit(self, train_loader, val_loader=None):

        best_val_loss = float('inf')
        patience_counter = 0
        best_model_state = None

        for epoch in range(self.max_epoch):
            train_accuracy, val_accuracy, val_loss = self.train_epoch(train_loader, val_loader)
            if val_loader is not None: 
                if val_loss < best_val_loss:
                    best_val_loss = val_loss
                    best_model_state = self.state_dict()  
                    best_epoch = epoch + 1
                    patience_counter = 0 
                else:
                    patience_counter += 1
                    if patience_counter >= self.patience:
                        break
            else:
                best_model_state = self.state_dict()
                best_epoch = epoch + 1
        
        self.load_state_dict(best_model_state)
        self.best_iteration = best_epoch

        return self

Get model ROC and AUC

In [None]:
def get_model_AUC_and_ROC(model:nn.Module, validate_dataloader:DataLoader):
    model.eval()
    with torch.no_grad():
        Y_pred = []
        Y_true = []
        for batch_idx, (features, targets) in enumerate(validate_dataloader) :
            features, targets = features.to(device, dtype=torch.float32), targets.to(device, dtype=torch.long)
            softmax = model(features)
            Y_pred.extend(softmax.detach().cpu().numpy()[:, 1])
            Y_true.extend(targets.detach().cpu().numpy())
            predicted_class = torch.max(softmax, 1).indices
            fpr, tpr, thresholds = roc_curve(Y_true, Y_pred) 
            roc_auc = auc(fpr, tpr)

            plt.figure(figsize=(4, 4))
            plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (AUC = %0.4f)' % roc_auc)
            plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
            plt.xlim([0.0, 1.0])
            plt.ylim([0.0, 1.05])
            plt.xlabel('False Positive Rate')
            plt.ylabel('True Positive Rate')
            plt.title('Receiver Operating Characteristic (ROC) Curve')
            plt.legend(loc="lower right")
            plt.show()

Defining pipeline class

In [None]:
class Pipeline:
    
    def __init__(self) -> None:
        
        self.modality:Modality = None
        self.data:Data = None
        self.model_configs: list[ModelConfig] = []
        self._registry = {
            "MLP": MLP_SearchSpace,
            "TAB": TAB_SearchSpace,
            "SVM": SVM_SearchSpace,
            "RF": RF_SearchSpace,
            "KNN": KNN_SearchSpace,
            "XGB": XGB_SearchSpace,
            "CAT": CAT_SearchSpace
        }
        self.trial_info:TrialInformation = None
        self.n_base_models:list[int] = None

    def add_model_config(self, model_type:str, ensemble_group:str=None, name:str=None, **kwargs) -> None:
        if model_type not in self._registry:
            raise ValueError(f"Unsupported model type: {model_type}")

        SearchSpaceClass = self._registry[model_type]
        search_space = SearchSpaceClass(**kwargs)

        config = ModelConfig(
            model_type=model_type,
            search_space=search_space,
            ensemble_group=ensemble_group,
            name=name
        )
        self.model_configs.append(config)

    def get_configs_by_group(self, group_name:str) -> list[ModelConfig]:
        return [config for config in self.model_configs if config.ensemble_group == group_name]

    def list_models(self) -> None:
        for cfg in self.model_configs:
            print(f"Model: {cfg.name}, Type: {cfg.model_type}, Group: {cfg.ensemble_group}")

    def set_n_base_model_range(self, range:list[int]) -> None:
        self.n_base_models = range

    def set_Modality(self, 
        mode:str, 
        model_choice:str,
        metric:str,
        is_developer_test_mode:bool=False, 
        uses_K_Fold:bool=True, 
        uses_existing_model:bool=False, 
        saves_plot:bool=False,
        is_verbose:bool=False,
        standardizes_input:bool=True,
        uses_early_stopping:bool=True,
        bagging_strategy:str='None',
        random_state:int=42) -> None:
        
        self.modality = Modality(
            mode=mode, 
            model_choice=model_choice,
            metric=metric,
            is_developer_test_mode=is_developer_test_mode, 
            uses_K_Fold=uses_K_Fold, 
            uses_existing_model=uses_existing_model, 
            saves_plot=saves_plot, 
            is_verbose=is_verbose,
            standardizes_input=standardizes_input,
            uses_early_stopping=uses_early_stopping,
            bagging_strategy=bagging_strategy,
            random_state=random_state)

    def set_Data(self, 
        raw_dataset_name:str='tibial_slope', 
        train_ratio:float=0.9, 
        n_classes:int=2, 
        columns_to_keep:list[int]=[], 
        data_split_random_seed_modfier:int=0) -> None:
        
        self.data = Data(
            raw_dataset_name=raw_dataset_name, 
            train_ratio=train_ratio, 
            n_classes=n_classes, 
            columns_to_keep=columns_to_keep,
            data_split_random_seed_modfier=data_split_random_seed_modfier)

    def set_TrialInformation(self) -> None:
        
        timestamp = datetime.datetime.now().strftime('%Y_%m_%d__%H_%M_%S')
        trial_name = f"{self.modality.model_choice}_{self.modality.metric}_{self.data.columns_remaining}"
        self.trial_info = TrialInformation(timestamp, trial_name)

        current_dir = os.getcwd()
        trial_base_name = f'{self.trial_info.trial_name}'

        if self.modality.bagging_strategy == 'None':
            trial_sub_folder_upper = f'{self.modality.model_choice}'
        elif self.modality.bagging_strategy == 'single':
            trial_sub_folder_upper = f'{self.modality.model_choice}_ensemble'
        elif self.modality.bagging_strategy == 'diverse': 
            trial_sub_folder_upper = f'ensemble_diverse'

        trial_sub_folder = f'{trial_sub_folder_upper}/{self.data.columns_remaining}/{self.modality.metric}/seed_{self.modality.random_state+self.data.data_split_random_seed_modfier}/{self.trial_info.time_stamp}'

        summary_save_directory = f'../Model Evaluation/{self.data.raw_dataset_name}/{trial_sub_folder}'
        os.makedirs(summary_save_directory, exist_ok=True) if self.modality.mode == 'train' else None
        model_save_path = os.path.join(summary_save_directory, f'{trial_base_name}.pth')

        hyperparam_save_directory = f'../Hyperparameter tuning/Optuna/{self.data.raw_dataset_name}/{trial_sub_folder}'
        os.makedirs(hyperparam_save_directory, exist_ok=True) if self.modality.mode == 'optuna' else None
        hyperparam_save_path = os.path.join(hyperparam_save_directory, f'{trial_base_name}.txt')

        summary_save_directory = f'../Model Evaluation/Test Performance Summary/{self.data.raw_dataset_name}/{self.modality.metric}/seed_{self.modality.random_state+self.data.data_split_random_seed_modfier}'
        os.makedirs(summary_save_directory, exist_ok=True) if self.modality.mode == 'eval' else None
        summary_save_path = os.path.join(summary_save_directory, f'metric_summary_{self.modality.metric}_{self.data.raw_dataset_name}.csv')
        candidate_list_save_path = os.path.join(summary_save_directory, f'candidate_list_{self.modality.metric}_{self.data.raw_dataset_name}.csv')

        self.trial_info.model_save_path = model_save_path
        self.trial_info.hyperparam_save_path = hyperparam_save_path
        self.trial_info.summary_save_path = summary_save_path
        self.trial_info.candidate_list_save_path = candidate_list_save_path

    def load_dataset(self) -> None:

        current_dir = os.getcwd()

        dataset_dir = os.path.join(os.path.dirname(current_dir), "Datasets")
        os.chdir(dataset_dir)

        files = [f for f in os.listdir('.') if f.startswith(self.data.raw_dataset_name)]
        if not files:
            raise ValueError(f"No files found in the current directory starting with '{self.data.raw_dataset_name}'.")
        
        file = files[0]
        self.data.dataset_name = file
        try:
            if file.endswith('.csv'):
                self.data.df = pd.read_csv(file, header=0)
            elif file.endswith('.arff'):
                with open(file, 'r') as f:
                    data, meta = arff.loadarff(f)
                    data_list = [dict(zip(meta.names(), row)) for row in data]
                    for row in data_list:
                        for col, value in row.items():
                            if isinstance(value, bytes):
                                row[col] = value.decode('utf-8')
                    self.data.df = pd.DataFrame(data_list)
            else:
                with open(file, 'r') as f:
                    data, meta = arff.loadarff(f)
                    data_list = [dict(zip(meta.names(), row)) for row in data]
                    for row in data_list:
                        for col, value in row.items():
                            if isinstance(value, bytes):
                                row[col] = value.decode('utf-8')
                    self.data.df = pd.DataFrame(data_list)
        except Exception as e:
            raise ValueError(f"Unable to load the dataset file: {e}")
        
        if self.modality.is_verbose:
            corr_matrix = pd.DataFrame(self.data.df[self.data.df.columns.tolist()[1 :]], columns=self.data.df.columns.tolist()[1 :]).corr()
            sns.heatmap(corr_matrix, cmap='coolwarm', center=0, annot=True, fmt='.2f')
            plt.show()

        self.data.columns_all = list(range(1, len(self.data.df.columns) - 1))
        os.chdir(current_dir)

    def modify_dataset(self) -> None:

        columns_to_discard = list(set(self.data.columns_all) - set(self.data.columns_to_keep))
        self.data.columns_remaining = list(set(self.data.columns_all) - set(columns_to_discard))
        self.data.columns_remaining_str = combine_list_elements(self.data.columns_remaining)

    def split_dataset(self) -> None:

        # the replace() invoked triggers an additional datatype evaluation and does not do anything else
        self.data.X = self.data.df.iloc[:, 0:-1].iloc[:, self.data.columns_remaining].replace(replacement_rules_feature).astype(float).values 
        self.data.Y = self.data.df.iloc[:, -1].replace(replacement_rules_label).values
        self.data.X_train_val, self.data.X_test, self.data.Y_train_val, self.data.Y_test = train_test_split(self.data.X, self.data.Y, test_size=0.15, random_state=self.modality.random_state+self.data.data_split_random_seed_modfier, stratify=self.data.Y, shuffle=True)
        
    def kfold_loaders(self, model_type: str = None) -> Generator[tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray], None, None]:

        def replace_func(X):

            vectorized_replace = np.vectorize(lambda x: replacement_rules_feature.get(x, 0))
            return vectorized_replace(X).astype(np.float32).reshape(-1, 1)

        def standardize(X_train, X_other):
            
            X_train_main, X_train_last = X_train[:, :-1], X_train[:, -1:]
            X_other_main, X_other_last = X_other[:, :-1], X_other[:, -1:]

            if model_type not in ['CAT', 'TAB', 'TABold']:

                X_train_main = np.hstack((X_train_main, X_train_last))
                X_other_main = np.hstack((X_other_main, X_other_last))

            means = np.mean(X_train_main, axis=0)
            stds = np.std(X_train_main, axis=0)

            X_train_standardized_portion = (X_train_main - means) / stds
            X_other_standardized_portion = (X_other_main - means) / stds

            if model_type in ['CAT', 'TAB', 'TABold']:

                X_train = np.hstack((X_train_standardized_portion, X_train_last))
                X_other = np.hstack((X_other_standardized_portion, X_other_last))

            else:

                X_train = X_train_standardized_portion
                X_other = X_other_standardized_portion

            return X_train, X_other
        
        if self.modality.mode in ['train', 'eval', 'feature', 'deploy']:
            X_train = self.data.X_train_val
            X_test = self.data.X_test
            Y_train = self.data.Y_train_val
            Y_test = self.data.Y_test

            if self.modality.mode in ['train', 'eval', 'feature']:
                X_train, X_test = standardize(X_train, X_test)
                yield X_train, Y_train, X_test, Y_test
            elif self.modality.mode in ['deploy']:
                X_full = np.concatenate((X_train, X_val), dim=0)
                Y_full = np.concatenate((Y_train, Y_val), dim=0)
                yield None, None, X_full, Y_full 

        else:
            X = self.data.X_train_val
            Y = self.data.Y_train_val
            skf = StratifiedKFold(n_splits=self.data.k_folds, shuffle=True, random_state=self.modality.random_state)

            for train_index, test_index in skf.split(X, Y):
                X_train = X[train_index]
                X_val = X[test_index]
                Y_train = Y[train_index]
                Y_val = Y[test_index]

                X_train, X_val = standardize(X_train, X_val)

                yield X_train, Y_train, X_val, Y_val
            
    def fit_and_compute_metrics(self, models, model_type, X_train, Y_train, X_val, Y_val, metrics, **kwargs):

        Y_pred_train = []
        Y_pred_val = []
        Y_pred_val_proba = []
        Y_pred_train_proba = []

        val_loader = numpy_2_dataloader(X=X_val,  Y=Y_val, batch_size=128)

        for base_model_idx, base_model in enumerate(models):
            
            if self.modality.bagging_strategy == 'None':
                X_train_bagged, Y_train_bagged = (X_train, Y_train)
            elif self.modality.bagging_strategy == 'single':
                X_train_bagged, Y_train_bagged = sklearn.utils.resample(X_train, Y_train, replace=True, n_samples=len(X_train), random_state=self.modality.random_state+base_model_idx)

            # print(f'X_train_bagged: {X_train_bagged}')
            train_loader = numpy_2_dataloader(X=X_train_bagged,  Y=Y_train_bagged, batch_size=128)

            if model_type == 'MLP':
                # print(train_loader[0].shape)
                X_train_bagged = train_loader
                X_val = val_loader
                val_loader_early_stopping = val_loader

            if model_type in ['TAB', 'TABold']:
                patience = 25 if self.modality.mode in ['optuna', 'check'] else 0
                eval_set = [(X_train, Y_train), (X_val, Y_val)]
                # print(f'train:\n{X_train[0:5,:]}\n\n val:\n{X_val[0:5,:]}')
                eval_name = ['train', 'test']

            if model_type == 'CAT':

                cat_features = [self.data.columns_remaining.index(5)] if '5' in self.data.columns_remaining_str else []

                if cat_features:  # If there are categorical features
                    cat_idx = cat_features[0]  # Index of the categorical column (e.g., '5')
                    # Ensure categorical column is str for CatBoost
                    X_train_bagged = X_train_bagged.copy().astype(object)  # Avoid modifying original array
                    X_val = X_val.copy().astype(object)
                    X_train_bagged[:, cat_idx] = X_train_bagged[:, cat_idx].astype(np.int64)
                    X_val[:, cat_idx] = X_val[:, cat_idx].astype(np.int64)
        
                train_pool = Pool(X_train_bagged, Y_train_bagged, cat_features=cat_features)
                val_pool = Pool(X_val, Y_val, cat_features=cat_features)
            
            if model_type == 'XGB':
                eval_set = [(X_val, Y_val)]

            if self.modality.mode == 'train':
                val_loader_early_stopping = None
                val_pool = None
                eval_set = None
                eval_name = None

            if self.modality.mode not in ['eval', 'deploy', 'eval_val']:
                if model_type in ['MLP']:
                    base_model.fit(train_loader, val_loader_early_stopping)
                elif model_type in ['TAB', 'TABold']:
                    base_model.fit(X_train=X_train, 
                                y_train=Y_train, 
                                eval_set=eval_set, 
                                eval_name=eval_name, 
                                eval_metric=['accuracy', 'logloss'], 
                                max_epochs=kwargs.get('max_epoch', 200), 
                                patience=patience, 
                                batch_size=X_train.shape[0]) 
                elif model_type in ['XGB']:
                    base_model.fit(X_train_bagged, Y_train_bagged, eval_set=eval_set, verbose=False)
                elif model_type in ['CAT']:
                    base_model.fit(train_pool, eval_set=val_pool)
                elif model_type in ['SVM', 'RF', 'KNN']:
                    base_model.fit(X_train_bagged, Y_train_bagged)

            Y_pred_train.append(base_model.predict(X_train_bagged))
            Y_pred_train_proba.append(base_model.predict_proba(X_train_bagged)[:, 1])
            Y_pred_val.append(base_model.predict(X_val))
            Y_pred_val_proba.append(base_model.predict_proba(X_val)[:, 1])

        Y_pred_train = majority_vote(Y_pred_train)
        Y_pred_train_proba = np.mean(Y_pred_train_proba, axis=0)
        Y_pred_val = majority_vote(Y_pred_val)
        Y_pred_val_proba = np.mean(Y_pred_val_proba, axis=0)

        # print(f"Y_pred_train: {Y_pred_train}")
        metrics['train_accuracy'].append(accuracy_score(Y_train_bagged, Y_pred_train))
        metrics['train_f1'].append(f1_score(Y_train_bagged, Y_pred_train))
        metrics['train_precision'].append(precision_score(Y_train_bagged, Y_pred_train, zero_division=0))
        metrics['train_recall'].append(recall_score(Y_train_bagged, Y_pred_train))
        metrics['val_auc'].append(roc_auc_score(Y_train, Y_pred_train_proba))

        metrics['val_accuracy'].append(accuracy_score(Y_val, Y_pred_val))
        metrics['val_f1'].append(f1_score(Y_val, Y_pred_val))
        metrics['val_fb'].append(fbeta_score(Y_val, Y_pred_val, beta=2))
        metrics['val_precision'].append(precision_score(Y_val, Y_pred_val, zero_division=0))
        metrics['val_recall'].append(recall_score(Y_val, Y_pred_val))
        metrics['val_specificity'].append(specificity_score(Y_val, Y_pred_val))
        metrics['val_auc'].append(roc_auc_score(Y_val, Y_pred_val_proba))
        if self.modality.mode not in ['train', 'eval', 'feature', 'deploy', 'eval_val']:
            if model_type in ['MLP', 'XGB']:
                metrics['stopping_epoch'].append(base_model.best_iteration)
            elif model_type in ['TAB', 'TABold']:
                best_epoch = np.argmax(base_model.history['test_logloss'])
                metrics['stopping_epoch'].append(best_epoch)
            elif model_type == 'CAT':
                metrics['stopping_epoch'].append(base_model.best_iteration_)
            else:
                pass

        return models

    def average_metrics_across_folds(self, metrics):
        return {key: statistics.mean(values) for key, values in metrics.items() if values}

    def train_and_metric_module(self,
        batch_size: int,
        model_type: str,
        feature_combination_str: str = None,
        **kwargs) -> ModuleResults:

        metrics = {
            'train_accuracy': [], 'val_accuracy': [],
            'train_f1': [], 'val_f1': [],
            'val_fb': [],
            'train_precision': [], 'val_precision': [],
            'train_recall': [], 'val_recall': [],
            'val_specificity': [],
            'val_auc': [],
            'stopping_epoch':[]
        }

        extra_fitting_params = {}

        if model_type =='MLP':
            model = myMlpModel(
                max_epoch=kwargs.get('max_epoch'),
                batch_size=kwargs.get('batch_size'),
                device=kwargs.get('device'),
                input_dim=kwargs.get('input_dim'),
                n_classes=kwargs.get('n_classes'),
                model_depth=kwargs.get('model_depth'),
                model_width=kwargs.get('model_width'),
                criterion=kwargs.get('criterion'),
                model_dr=kwargs.get('model_dr'),
                model_lr=kwargs.get('model_lr'),
                l1_weight=kwargs.get('l1_weight'),
                l2_weight=kwargs.get('l2_weight'),
                weight_decay=kwargs.get('weight_decay'),
                activation_function=kwargs.get('activation_function'),
                patience=kwargs.get('patience'),
                random_state=self.modality.random_state
            )
        elif model_type in ['TAB', 'TABold']:
            # print(f'{kwargs.get('cat_idxs')}, {kwargs.get('cat_dims')}')

            model = TabNetClassifier(
                n_d = kwargs.get('width_prediction_and_attention'),
                n_a = kwargs.get('width_prediction_and_attention'),
                n_steps = kwargs.get('n_steps'),
                gamma = kwargs.get('gamma'),
                cat_idxs = kwargs.get('cat_idxs'),
                cat_dims = kwargs.get('cat_dims'),
                n_independent = kwargs.get('n_independent'),
                n_shared = kwargs.get('n_shared'),
                momentum = kwargs.get('momentum'),
                lambda_sparse = kwargs.get('lambda_sparse'),
                mask_type = kwargs.get('mask_type'),
                seed=self.modality.random_state,
                verbose=0,
                device_name=device
            )
        elif model_type == "SVM":
            model = SVC(
                probability=True,
                C=kwargs.get('c'),
                kernel=kwargs.get('kernel'),
                gamma=kwargs.get('gamma'),
                tol=kwargs.get('tol'),
                max_iter=kwargs.get('max_iter'),
                degree=kwargs.get('degree'),
                random_state=self.modality.random_state
            )
        elif model_type == "KNN": #* no random state required as it is deterministic
            model = KNeighborsClassifier(
                n_neighbors=kwargs.get('n_neighbors'),
                weights=kwargs.get('weights'),
                algorithm=kwargs.get('algorithm'),
                leaf_size=kwargs.get('leaf_size'),
                p=kwargs.get('p'),
                metric=kwargs.get('metric'),
            )
        elif model_type == "RF":
            model = RandomForestClassifier(
                n_estimators=kwargs.get('num_tree'),
                criterion=kwargs.get('criterion'),
                max_depth=kwargs.get('max_depth'),
                min_samples_split=kwargs.get('min_sample_split'),
                min_samples_leaf=kwargs.get('min_sample_leaf'),
                max_features=kwargs.get('max_feature'),
                max_leaf_nodes=kwargs.get('max_leaf_node'),
                bootstrap=kwargs.get('bootstrap'),
                min_impurity_decrease=kwargs.get('min_impurity_decrease'),
                ccp_alpha=kwargs.get('ccp_alpha'),
                max_samples=kwargs.get('max_samples'),
                random_state=self.modality.random_state
            )
        elif model_type == "XGB":
            model = XGBClassifier(
                n_estimators=kwargs.get('n_estimators'),
                eta=kwargs.get('eta'),
                gamma=kwargs.get('gamma'),
                max_depth=kwargs.get('max_depth'),
                min_child_weight=kwargs.get('min_child_weight'),
                subsample=kwargs.get('subsample'),
                sampling_method=kwargs.get('sampling_method'),
                reg_lambda=kwargs.get('reg_lambda'),
                reg_alpha=kwargs.get('reg_alpha'),
                grow_policy=kwargs.get('grow_policy'),
                max_leaves=kwargs.get('max_leaves'),
                early_stopping_rounds=kwargs.get('early_stopping_rounds'),
                verbosity=0,
                random_state=self.modality.random_state
            )
        elif model_type == "CAT":
            print(f'Number of iterations set: {kwargs.get('iterations')}')
            model = CatBoostClassifier(
                iterations=kwargs.get('iterations'),
                learning_rate=kwargs.get('learning_rate'),
                depth=kwargs.get('depth'),
                l2_leaf_reg=kwargs.get('l2_leaf_reg'),
                early_stopping_rounds=kwargs.get('early_stopping_rounds'),
                random_strength=kwargs.get('random_strength'),
                colsample_bylevel=kwargs.get('colsample_bylevel'),
                bagging_temperature=kwargs.get('bagging_temperature'),
                border_count=kwargs.get('border_count'),
                cat_features=kwargs.get('cat_features'),
                eval_metric='AUC',
                verbose=False,
                random_seed=self.modality.random_state
            )
        else:
            raise ValueError("model_type must be 'RF', 'KNN', 'XGB', or 'SVM'")
        
        n_base_models = kwargs.get('n_base_models')

        for fold, (X_train, Y_train, X_val, Y_val) in enumerate(self.kfold_loaders(model_type=model_type)):
            models = []
            for base_model_idx in range(n_base_models):
                base_model = copy.deepcopy(model)
                models.append(base_model)
            models = self.fit_and_compute_metrics(models, model_type, X_train, Y_train, X_val, Y_val, metrics, **extra_fitting_params)

        average_metrics = self.average_metrics_across_folds(metrics)

        module_results = ModuleResults(
            trained_models=models,
            **average_metrics
        )

        return module_results
    
    def check_folder_structure(self, check_target:str):
        dataset_len = 14
        model_len = 3
        feature_combination_len = 15
        trial_len = 26
        file_len = 29
        
        dataset = self.data.raw_dataset_name

        dataset_folder = Path(f'{check_target}/{dataset}')

        model_class_folders = [f for f in dataset_folder.iterdir() if f.is_dir()]
        if len(model_class_folders) != 7:
            print(f'Error: Folder {dataset} contains {len(model_class_folders)} subfolders, expected 7')
            return False
        
        for model_class_folder in model_class_folders:

            if model_class_folder.name in ['CAT', 'TAB']:
                continue

            feature_combination_folders = [f for f in model_class_folder.iterdir() if f.is_dir()]
        
            if model_class_folder.name not in ['TAB', 'TABold'] and len(feature_combination_folders) != 31:
                print(f'Error: Folder {model_class_folder} contains {len(feature_combination_folders)} subfolders, expected 31')
                return False
            elif model_class_folder.name in ['TAB', 'TABold'] and len(feature_combination_folders) != 1:
                print(f'Error: Folder {model_class_folder} contains {len(feature_combination_folders)} subfolders, expected 1')
                return False
            
            for feature_combination_folder in feature_combination_folders:
                metric_folders = [f for f in feature_combination_folder.iterdir() if f.is_dir()]

                for metric_folder in metric_folders:
                    
                    if metric_folder.name == self.modality.metric:
                        seed_folders = [f for f in metric_folder.iterdir() if f.is_dir()]

                        for seed_folder in seed_folders:
                            
                            if seed_folder.name != f'seed_{self.modality.random_state+self.data.data_split_random_seed_modfier}':
                                continue

                            trial_folders = [f for f in seed_folder.iterdir() if f.is_dir()]
                            
                            if len(trial_folders) != 1:
                                print(f'Error: Folder {seed_folder} contains {len(trial_folders)} folders, expected 1')
                                return False

                            for trial_folder in trial_folders:
                                
                                files = [f for f in trial_folder.iterdir() if f.is_file()]
                                
                                if len(files) != 1:
                                    print(f"Error: Folder {feature_combination_folder} contains {len(files)} .txt files, expected 1")
                                    return False

                                # for file in files:
                                #     print(f'{dataset:<{dataset_len}} | {model_class_folder.name:<{model_len}} | {feature_combination_folder.name:<{feature_combination_len}} | {trial_folder.name:<{trila_len}} | {file.name:<{file_len}}')
                                
                                if trial_folder.name.startswith('final_'): 
                                    new_name = trial_folder.parent / f"final_{trial_folder.name}".strip('final_')
                                    try:
                                        trial_folder.rename(new_name)
                                    except FileExistsError:
                                        print(f"Error: Cannot rename {trial_folder} to {new_name} because the target file already exists")
                                        return False
                                    except PermissionError:
                                        print(f"Error: Permission denied when renaming {trial_folder} to {new_name}")
                                        return False

        return True

    def finalize_trials(self, check_target:str):

        models_stored = []

        dataset_folder = Path(f'{check_target}/{self.data.raw_dataset_name}')
        model_class_folders = [f for f in dataset_folder.iterdir() if f.is_dir()]

        for model_class_folder in model_class_folders:
            
            if model_class_folder.name in ['CAT', 'TAB']:
                continue
            
            feature_combination_folders = [f for f in model_class_folder.iterdir() if f.is_dir()]
        
            for feature_combination_folder in feature_combination_folders:
                metric_folders = [f for f in feature_combination_folder.iterdir() if f.is_dir()]
                
                for metric_folder in metric_folders:
                    if metric_folder.name == self.modality.metric:

                        seed_folders = [f for f in metric_folder.iterdir() if f.is_dir()]

                        for seed_folder in seed_folders:
                            
                            if seed_folder.name != f'seed_{self.modality.random_state+self.data.data_split_random_seed_modfier}':
                                continue

                            trial_folders = [f for f in seed_folder.iterdir() if f.is_dir()]
                            for trial_folder in trial_folders:
                                if not trial_folder.name.startswith('final_'): 
                                    new_name = trial_folder.parent / f"final_{trial_folder.name}"
                                    try:
                                        trial_folder.rename(new_name)
                                    except FileExistsError:
                                        print(f"Error: Cannot rename {trial_folder} to {new_name} because the target file already exists")
                                    except PermissionError:
                                        print(f"Error: Permission denied when renaming {trial_folder} to {new_name}")
                                    trial_folder = new_name
                                # print(trial_folder)
                                files = [f for f in trial_folder.iterdir() if f.is_file()]
                                model_and_info = {
                                    'dataset': self.data.raw_dataset_name,
                                    'model class': model_class_folder.name,
                                    'feature combination': feature_combination_folder.name,
                                    'model': files[0]
                                }
                                models_stored.append(model_and_info)

        return models_stored if check_target == '../Model Evaluation' else []
    
    def scan_best_accuracy(self, root_path, model_type, feature_combination):
        pattern = rf"Best value \({self.modality.metric}\):\s*([0-1]?\.\d+)"
        dataset = self.data.raw_dataset_name
        dataset_folder = Path(f'{root_path}/{dataset}')
        model_class_folders = [f for f in dataset_folder.iterdir() if f.is_dir()]

        for model_class_folder in model_class_folders:
            feature_combination_folders = [f for f in model_class_folder.iterdir() if f.is_dir()]
        
            for feature_combination_folder in feature_combination_folders:
                metric_folders = [f for f in feature_combination_folder.iterdir() if f.is_dir()]
                
                for metric_folder in metric_folders:
                    if metric_folder.name == self.modality.metric:
                        seed_folders = [f for f in metric_folder.iterdir() if f.is_dir()]

                        for seed_folder in seed_folders:
                            
                            if seed_folder.name != f'seed_{self.modality.random_state+self.data.data_split_random_seed_modfier}':
                                continue

                            trial_folders = [f for f in seed_folder.iterdir() if f.is_dir()]

                            for trial_folder in trial_folders:
                                files = [f for f in trial_folder.iterdir() if f.is_file()]

                                if model_class_folder.name == model_type and feature_combination_folder.name == feature_combination:
                                    try:
                                        with open(files[0], 'r', encoding='utf-8') as file:
                                            content = file.read()
                                            # Search for the float value after 'Best value (Acc):'
                                            match = re.search(pattern, content)
                                            if match:
                                                accuracy = float(match.group(1))
                                                results = accuracy
                                            else:
                                                results = None
                                    except Exception as e:
                                        print(files)
                                        results = f"Error reading file: {str(e)}"
                                        print(f"Error in file {files[0]}: {str(e)}")
                                    return results
        return float('-inf')

    def load_models_by_type(self, models_to_load): #todo: maybe make device a pipelin attribute as well?
        
        for model_to_load in models_to_load:
        
            print(f'model_to_load: {model_to_load}')

            self.data.columns_to_keep = string_to_list(model_to_load['feature combination'])
            self.load_dataset()
            self.modify_dataset()
            self.split_dataset()
            self.set_TrialInformation()

            if model_to_load['model class'] == 'MLP':
                checkpoint = torch.load(model_to_load['model'], map_location=device)
                base_model = myMlpModel(
                    max_epoch=checkpoint['max_epoch'],
                    batch_size=128,
                    device=torch.device(device),
                    input_dim=checkpoint['input_dim'],
                    n_classes=checkpoint['n_classes'],
                    model_depth=checkpoint['model_depth'],
                    model_width=checkpoint['model_width'],
                    model_dr=checkpoint['dropout_rate'],
                    criterion=loss_function_dict['cross_entropy'] if checkpoint['criterion'] == 'cross_entropy' else checkpoint['criterion'],          # improve this
                    model_lr=checkpoint['model_lr'],
                    l1_weight=checkpoint['l1_weight'],
                    l2_weight=checkpoint['l2_weight'],
                    weight_decay=checkpoint['weight_decay'],
                    patience=checkpoint.get('patience'),
                    activation_function=checkpoint['activation_function'],
                    random_state=self.modality.random_state
                )
                try:
                    base_model.load_state_dict(checkpoint['model_state_dict'])
                    base_model.to(device)
                    base_model.eval()
                except Exception as e:
                    print(f"Error loading MLP model {model_to_load['model']}: {e}")
            elif model_to_load['model class'] in ['TAB', 'TABold']:
                try: 
                    categorical_idxs = [self.data.columns_remaining.index(5)] if '5' in self.data.columns_remaining_str else []
                    categorical_dims = [2] if categorical_idxs else []
                    base_model = TabNetClassifier(cat_idxs=categorical_idxs, cat_dims=categorical_dims)
                    base_model.load_model(model_to_load['model'])
                    base_model.network.eval()
                except Exception as e:
                    print(f"Error loading TabNet model {model_to_load['model']}: {e}")
            elif model_to_load['model class'] in ['SVM', 'KNN', 'RF']:
                try:
                    base_model = joblib.load(model_to_load['model'])
                except Exception as e:
                    print(f"Error loading sklearn model {model_to_load['model']}: {e}")
            elif model_to_load['model class'] == 'XGB':
                try:
                    base_model = xgb.XGBClassifier()
                    base_model.load_model(model_to_load['model'])
                except Exception as e:
                    print(f"Error loading XGB model {model_to_load['model']}: {e}")
            elif model_to_load['model class'] == 'CAT':
                try:
                    base_model = CatBoostClassifier()
                    base_model.load_model(model_to_load['model'], format='cbm')
                except Exception as e:
                    print(f"Error loading CAT model {model_to_load['model']}: {e}")
    
            for fold, (X_train, Y_train, X_val, Y_val) in enumerate(self.kfold_loaders(model_type=model_to_load['model class'])):
                
                metrics = {
                    'train_accuracy': [], 'val_accuracy': [],
                    'train_f1': [], 'val_f1': [],
                    'val_fb': [],
                    'train_precision': [], 'val_precision': [],
                    'train_recall': [], 'val_recall': [],
                    'val_specificity': [],
                    'val_auc': [],
                    'stopping_epoch':[]
                }
                self.fit_and_compute_metrics(models=[base_model], model_type=model_to_load['model class'], X_train=X_train, Y_train=Y_train, X_val=X_val, Y_val=Y_val, metrics=metrics)
            
            average_metrics = self.average_metrics_across_folds(metrics)
            module_results = ModuleResults(
                trained_models=None,
                **average_metrics
            )

            if self.modality.mode in ['eval_val', 'eval']:

                set_name = 'Test' if self.modality.mode == 'eval' else 'Val' if self.modality.mode == 'eval_val' else None
                
                model_to_load[f'Val {self.modality.metric}'] = self.scan_best_accuracy('../Hyperparameter tuning/Optuna', model_to_load['model class'], model_to_load['feature combination'])
                model_to_load[f'{set_name} Acc'] = average_metrics['val_accuracy']
                model_to_load[f'{set_name} F1'] = average_metrics['val_f1']
                model_to_load[f'{set_name} F2'] = average_metrics['val_fb']
                model_to_load[f'{set_name} AUC'] = average_metrics['val_auc']

            elif self.modality.mode == 'deploy':

                model_to_load[f'{self.data.raw_dataset_name} Acc'] = average_metrics['val_accuracy']
                model_to_load[f'{self.data.raw_dataset_name} F1'] = average_metrics['val_f1']
                model_to_load[f'{self.data.raw_dataset_name} F2'] = average_metrics['val_fb']
                model_to_load[f'{self.data.raw_dataset_name} AUC'] = average_metrics['val_auc']

        return models_to_load

Defining Optuna Objective function

In [None]:
def objective(trial, pipeline:Pipeline, model_config_idx:int):
    
    model_config = pipeline.model_configs[model_config_idx]
    search_space = model_config.search_space
    model_type = model_config.model_type

    param_space = {}
    if model_type == "MLP":
        param_space = {
            'batch_size': {'type': 'int', 'range': search_space.batch_size_range, 'step': search_space.optuna_step_for_batch_size},
            'model_lr': {'type': 'float', 'range': search_space.model_lr_range, 'log': True},
            'model_dr': {'type': 'float', 'range': search_space.model_dr_range, 'log': False},
            'l1_weight': {'type': 'float', 'range': search_space.l1_weight_range, 'log': True},
            'l2_weight': {'type': 'float', 'range': search_space.l2_weight_range, 'log': True},
            'model_depth': {'type': 'int', 'range': search_space.model_depth_range, 'step': search_space.optuna_step_for_model_depth},
            'model_width': {'type': 'int', 'range': search_space.model_width_range, 'step': search_space.optuna_step_for_model_width},
            'activation_function': {'type': 'categorical', 'range': search_space.activation_function_range},
            'weight_decay': {'type': 'float', 'range': search_space.weight_decay_range, 'log': False},
            'criterion': {'type': 'categorical', 'range': search_space.criterion_range},
            'patience': {'type': 'int', 'range': search_space.patience_range, 'step': search_space.optuna_step_for_patience},
            'max_epoch': {'type': 'categorical', 'range': [search_space.max_epoch_range]},
            'device': {'type': 'categorical', 'range': [device]}, #todo: check if needed
            'input_dim': {'type': 'categorical', 'range': [pipeline.data.X.shape[1]]},
            'n_classes': {'type': 'categorical', 'range': [pipeline.data.n_classes]},
        }
    elif model_type == "TAB":
        param_space = {
            'batch_size': {'type': 'int', 'range': search_space.batch_size_range, 'step': search_space.optuna_step_for_batch_size},
            'width_prediction_and_attention': {'type': 'int', 'range': search_space.width_prediction_and_attention_range},
            'n_steps': {'type': 'int', 'range': search_space.n_step_range},
            'gamma': {'type': 'float', 'range': search_space.gamma_range, 'log': False},
            'n_independent': {'type': 'int', 'range': search_space.n_independent_range},
            'n_shared': {'type': 'int', 'range': search_space.n_shared_range},
            'momentum': {'type': 'float', 'range': search_space.momentum_range, 'log': True},
            'lambda_sparse': {'type': 'float', 'range': search_space.lambda_sparse_range, 'log': True},
            'mask_type': {'type': 'categorical', 'range': search_space.mask_type_range},
        }
    elif model_type == "SVM":
        param_space = {
            'batch_size': {'type': 'int', 'range': search_space.batch_size_range, 'step': search_space.optuna_step_for_batch_size},
            'c': {'type': 'float', 'range': search_space.c_range, 'log': True},
            'kernel': {'type': 'categorical', 'range': search_space.kernel_range},
            'gamma': {'type': 'float', 'range': search_space.gamma_range, 'log': True},
            'tol': {'type': 'float', 'range': search_space.tol_range, 'log': True},
            'max_iter': {'type': 'int', 'range': search_space.max_iter_range}
        }
    elif model_type == "KNN":
        param_space = {
            'batch_size': {'type': 'int', 'range': search_space.batch_size_range, 'step': search_space.optuna_step_for_batch_size},
            'n_neighbors': {'type': 'int', 'range': search_space.n_neighbor_range, 'step': search_space.optuna_step_for_n_neighbor},
            'weights': {'type': 'categorical', 'range': search_space.weight_range},
            'algorithm': {'type': 'categorical', 'range': search_space.algorithm_range},
            'leaf_size': {'type': 'int', 'range': search_space.leaf_size_range, 'step': search_space.optuna_step_for_leaf_size, 'log': True},
            # 'p': {'type': 'float', 'range': search_space.p_range},
            'metric': {'type': 'categorical', 'range': search_space.metric_range}
        }
    elif model_type == "RF":
        param_space = {
            'batch_size': {'type': 'int', 'range': search_space.batch_size_range, 'step': search_space.optuna_step_for_batch_size},
            'num_tree': {'type': 'int', 'range': search_space.num_tree_range, 'step': search_space.optuna_step_for_num_tree},
            'criterion': {'type': 'categorical', 'range': search_space.criterion_range},
            'max_depth': {'type': 'categorical', 'range': search_space.max_depth_range},
            'min_sample_split': {'type': 'float', 'range': search_space.min_sample_split_range, 'log': True},
            'min_sample_leaf': {'type': 'int', 'range': search_space.min_sample_leaf_range},
            'max_feature': {'type': 'categorical', 'range': search_space.max_feature_range},
            'max_leaf_node': {'type': 'categorical', 'range': search_space.max_leaf_node_range},
            'bootstrap': {'type': 'categorical', 'range': search_space.bootstrap_range},
            'min_impurity_decrease': {'type': 'float', 'range': search_space.min_impurity_decrease_range, 'log': True},
            'ccp_alpha': {'type': 'float', 'range': search_space.ccp_alpha_range, 'log': True}
        }
    elif model_type == "XGB":
        param_space = {
            'batch_size': {'type': 'int', 'range': search_space.batch_size_range, 'step': search_space.optuna_step_for_batch_size},
            'eta': {'type': 'float', 'range': search_space.eta_range, 'log': False},
            'gamma': {'type': 'float', 'range': search_space.gamma_range, 'log': True},
            'max_depth': {'type': 'int', 'range': search_space.max_depth_range, 'step': search_space.optuna_step_for_max_depth, 'log': True},
            'min_child_weight': {'type': 'float', 'range': search_space.min_child_weight_range, 'log': True},
            'subsample': {'type': 'float', 'range': search_space.subsample_range}, 
            'sampling_method': {'type': 'categorical', 'range': search_space.sampling_method_range},
            'reg_lambda': {'type': 'float', 'range': search_space.reg_lambda_range, 'log': True},
            'reg_alpha': {'type': 'float', 'range': search_space.reg_alpha_range, 'log': True},
            'early_stopping_rounds' : {'type': 'int', 'range': search_space.early_stopping_rounds_range, 'log': True},
            'grow_policy': {'type': 'categorical', 'range': search_space.grow_policy_range},
            'max_leaves': {'type': 'int', 'range': search_space.max_leaf_range, 'step': search_space.optuna_step_for_max_depth, 'log': True}
        }
    elif model_type == "CAT":
        param_space = {
            'batch_size': {'type': 'int', 'range': search_space.batch_size_range, 'step': search_space.optuna_step_for_batch_size},
            'learning_rate': {'type': 'float', 'range': search_space.learning_rate_range, 'log': True},
            'depth': {'type': 'int', 'range': search_space.depth_range, 'step': search_space.optuna_step_for_depth},
            'l2_leaf_reg': {'type': 'float', 'range': search_space.l2_leaf_reg_range, 'log': True},
            'early_stopping_rounds': {'type': 'int', 'range': search_space.early_stopping_rounds_range, 'step': search_space.optuna_step_for_early_stopping_round},
            'random_strength': {'type': 'float', 'range': search_space.random_strength_range},
            'colsample_bylevel': {'type': 'float', 'range': search_space.colsample_range}, 
            'bagging_temperature': {'type': 'float', 'range': search_space.bagging_temperature_range},
            'border_count': {'type': 'int', 'range': search_space.border_count_range}, 
        }
        if '5' in pipeline.data.columns_remaining_str:
            param_space['cat_features'] = {'type': 'categorical', 'range': [[pipeline.data.columns_remaining.index(5)]]} 
    if pipeline.modality.bagging_strategy == 'single':
        param_space['n_base_models'] = {'type': 'int', 'range': pipeline.n_base_models}
    if pipeline.modality.bagging_strategy == 'None':
        param_space['n_base_models'] = {'type': 'int', 'range': [1]}

    params = {}
    for name, config in param_space.items():
        params[name] = (
            trial.suggest_int(name, config['range'][0], config['range'][-1], step=config.get('step', 1), log=config.get('log', False)) if config['type'] == 'int' else
            trial.suggest_float(name, config['range'][0], config['range'][-1], log=config.get('log', False)) if config['type'] == 'float' else
            trial.suggest_categorical(name, config['range'])
        )

    if pipeline.modality.bagging_strategy !='single':     # todo: improve logic here
        params['n_base_models'] = 1
    if model_type == 'MLP':
        params['criterion'] = loss_function_dict[params['criterion']]
    elif model_type in ['TAB', 'TABold']:
        params['cat_idxs'] = [pipeline.data.columns_remaining.index(5)] if '5' in pipeline.data.columns_remaining_str else []
        params['cat_dims'] = [2] if 'cat_idxs' in params else []
    elif model_type == "RF":
        params['max_samples'] = (
            trial.suggest_float('max_samples', search_space.max_sample_range[0], search_space.max_sample_range[-1])
            if params['bootstrap'] else None
        )
    elif model_type == "KNN":
        params['p'] = (
            trial.suggest_float('p', search_space.p_range[0], search_space.p_range[-1])
            if params['metric'] == 'minkowski' else 2
        )    
    elif model_type == "SVM":
        params['degree'] = (
            trial.suggest_int('degree', search_space.degree_range[0], search_space.degree_range[-1], step=1)
            if params['kernel'] == 'poly' else 1
        )

    module_results = pipeline.train_and_metric_module(model_type=model_type, **params)

    if model_type in ['MLP', 'TAB', 'TABold', 'CAT', 'XGB']:
        trial.set_user_attr('stopping_epoch', module_results.stopping_epoch)

    if pipeline.modality.metric == 'Acc':
        return module_results.val_accuracy
    elif pipeline.modality.metric == 'F1':
        return module_results.val_f1
    elif pipeline.modality.metric == 'Fb':
        return module_results.val_fb

Hyper-parameters and user inputs

In [None]:
# UI

# The lists in the moodel section can each take 1 or more elements (arbitrary).
# In training mode, only the 1st element in each list will be used.
# In tuning, multiple elements will be used.
#   - In optuna tuning, only the 1st and last element in the lists are taken and used to define the search range.
#   - In grid searching, all elements are passed as search coordinates.

# ---------------------------------- dataset --------------------------------- #

GUI_raw_dataset_name = 'phpAmSP4g'
GUI_raw_dataset_name = 'kc2'
GUI_raw_dataset_name = 'gender'
GUI_raw_dataset_name = 'tibial_slope_1'
GUI_raw_dataset_name = 'tibial_slope_2'

GUI_random_seed_modifier = 20
GUI_random_seed_modifier = 0
GUI_random_seed_modifier = 10

GUI_train_ratio = 0.8
GUI_n_classes = 2
GUI_warm_up = 10
GUI_patience = 10
GUI_moving_average_range = 10000
GUI_num_test = 10
GUI_n_epochs = 500

GUI_num_optuna_trials = 10000
# ----------------------------------- modes ---------------------------------- #

GUI_is_verbose = False

GUI_saves_plot = True
GUI_saves_plot = False

GUI_uses_K_Fold = False
GUI_uses_K_Fold = True

GUI_is_developoer_test_mode = True

GUI_uses_existing_model = False
GUI_uses_existing_model = True

GUI_uses_early_stopping = False 
GUI_uses_early_stopping = True

GUI_standardizes_input = False
GUI_standardizes_input = True

GUI_bagging_strategy = 'diverse'
GUI_bagging_strategy = 'single'
GUI_bagging_strategy = 'None'
    
GUI_model_choice = 'TAB'
GUI_model_choice = 'CAT'
GUI_model_choice = 'XGB'
GUI_model_choice = 'SVM'
GUI_model_choice = 'MLP'
GUI_model_choice = 'RF'
GUI_model_choice = 'KNN'  

GUI_mode = None 
GUI_mode = 'feature'  
GUI_mode = 'deploy'     #! make sure only running on one feature combination
GUI_mode = 'optuna'
GUI_mode = 'check'
GUI_mode = 'train' 
GUI_mode = 'eval'       #! make sure only running on one feature combination
all_combinations =[[1, 2, 3, 4, 5]] if (GUI_model_choice in ['TAB', 'TABold'] or GUI_mode in ['deploy', 'eval']) else all_combinations

GUI_metric = 'Fb'
GUI_metric = 'Acc'
GUI_metric = 'F1'

GUI_reg = 'dropout'
GUI_reg = 'L1_L2'

device = 'cuda'
device = 'cpu'

# ----------------------------------- model ---------------------------------- #

GUI_batch_sizes = [128, 128]
GUI_n_base_models = [2, 10]
GUI_drop_columns = False

GUI_mlp_num_steps_width = 20
GUI_mlp_step_size_width = 1
GUI_mlp_lrs = [1e-7, 1]
GUI_mlp_weight_decays = [0, 2]
GUI_mlp_activation_functions = ['relu', 'tanh', 'elu', 'leaky relu', 'log sigmoid', 'continuous relu', 'relu 6', 'gaussian relu', 'sigmoid', 'sigmoid relu']
GUI_mlp_activation_functions = ['relu 6', 'tanh', 'relu']
GUI_mlp_criterions = ['L1', 'MSE', 'cross entropy', 'NLL', 'CTC', 'KL divergence', 'BCE logit']
GUI_mlp_criterions = ['cross entropy']
GUI_mlp_depths = [1, 3]
GUI_mlp_widths = [GUI_mlp_step_size_width + 1, GUI_mlp_num_steps_width * GUI_mlp_step_size_width + 1]
GUI_mlp_patience = [1, 100]

GUI_tab_width_prediction_and_attention = [8, 64]
GUI_tab_n_steps = [3, 10]
GUI_tab_gamma = [1.0, 2.0]
GUI_tab_cat_dims = [4]
GUI_tab_cat_dims = [2]
GUI_tab_n_independent = [1, 5]
GUI_tab_n_shared = [1, 5]
GUI_tab_momentum = [1e-2, 0.4]
GUI_tab_lambda_sparse = [1e-5, 1e-1]
GUI_tab_mask_type = ['sparsemax', 'entmax']

GUI_rf_num_trees = [2, 100]
GUI_rf_criterions = ['gini']
GUI_rf_max_depths = list(range(1, 21)) + [None]
GUI_rf_min_sample_split = [0.02, 1.0]
GUI_rf_min_sample_leaf = [1, 10]  
GUI_rf_max_features = ['sqrt', 'log2', None]
GUI_rf_max_leaf_nodes = list(range(2, 101)) + [None]
GUI_rf_bootstraps = [True, False]
GUI_rf_min_impurity_decrease = [1e-7, 1e7]
GUI_rf_ccp_alpha = [1e-7, 1e7]
GUI_rf_max_samples = [0.1, 1.0]
GUI_mlp_drs = [0, 1]
GUI_mlp_l1_weight = [1e-7, 1e2]
GUI_mlp_l2_weight = [1e-7, 1e2]

GUI_knn_n_neighbors = [1, 15]
GUI_knn_weights = ['uniform', 'distance']
GUI_knn_algorithms = ['kd_tree', 'brute']
GUI_knn_leaf_sizes = [1, 100]
GUI_knn_ps = [1, 5]
GUI_metrics = ['minkowski']

GUI_xgb_etas = [0, 1]
GUI_xgb_gammas = [1e-7, 10000]
GUI_xgb_max_depths = [1, 100]
GUI_xgb_min_child_weights = [1e-7, 10000]
GUI_xgb_subsamples = [0.1, 1.0]
GUI_xgb_sampling_methods = ['uniform']
GUI_xgb_reg_lambdas = [1e-7, 10000]
GUI_xgb_reg_alphas = [1e-7, 10000]
GUI_grow_policies = ['depthwise', 'lossguide']
GUI_xgb_max_leaves = [1, 10000]
GUI_xgb_early_stopping_rounds = [1e1, 1e2]

GUI_svm_c = [1e-5, 1e5]
GUI_svm_kernel = ['linear', 'poly', 'rbf', 'sigmoid']
GUI_svm_gamma = [1e-5, 1e5]
GUI_svm_tol = [1e-6, 1e-2]
GUI_svm_max_iter = [1e1, 1e6]
GUI_svm_degree = [2, 5]

GUI_cat_eval_metric = ['Accuracy']
GUI_cat_features = [[4]]
GUI_cat_early_stopping_rounds = [1e1, 1e2]
GUI_cat_learning_rate = [1e-3, 0.5]
GUI_cat_depth = [4, 10]
GUI_cat_l2_leaf_reg = [1e-2, 1e2]
GUI_cat_random_strength = [0, 10]
GUI_cat_colsample = [0.4, 1.0]
GUI_cat_bagging_temperature = [0, 1]
GUI_cat_border_count = [32, 128]

In [None]:
model_search_space_registry = {
    "mlp": MLP_SearchSpace,
    "tab": TAB_SearchSpace,
    "rf": RF_SearchSpace,
    "svm": SVM_SearchSpace,
    "xgb": XGB_SearchSpace,
    "knn": KNN_SearchSpace,
    "cat": CAT_SearchSpace
}

user_defined_search_spaces = {
    "MLP": {
        "max_epoch_range": GUI_n_epochs,
        "batch_size_range": GUI_batch_sizes, 
        "model_depth_range": GUI_mlp_depths, 
        "model_width_range": GUI_mlp_widths, 
        "model_lr_range": GUI_mlp_lrs, 
        "model_dr_range": GUI_mlp_drs, 
        "activation_function_range": GUI_mlp_activation_functions, 
        "weight_decay_range": GUI_mlp_weight_decays, 
        "criterion_range": GUI_mlp_criterions,
        "num_optuna_trials": GUI_num_optuna_trials, # todo: check if the num_optuna_trials in MLP and other models are actually needed
        "patience_range": GUI_mlp_patience,
        "l1_weight_range": GUI_mlp_l1_weight,
        "l2_weight_range": GUI_mlp_l2_weight
    },
    "TAB": {
        "batch_size_range": GUI_batch_sizes,
        "width_prediction_and_attention_range": GUI_tab_width_prediction_and_attention,
        "n_step_range": GUI_tab_n_steps,
        "gamma_range": GUI_tab_gamma,
        "cat_idx_range": GUI_tab_cat_dims,
        "cat_dim_range": GUI_tab_cat_dims,
        "n_independent_range": GUI_tab_n_independent,
        "n_shared_range": GUI_tab_n_shared,
        "momentum_range": GUI_tab_momentum,
        "lambda_sparse_range": GUI_tab_lambda_sparse,
        "mask_type_range": GUI_tab_mask_type,
        "num_optuna_trials": GUI_num_optuna_trials
        # "patience_range": List[int]
        # "optimizer_param_range": List[float]
        # "max_epoch_range": List[int]
        # "max_epoch_range": List[int]
    },
    "SVM": {
        "batch_size_range": GUI_batch_sizes,
        "c_range": GUI_svm_c,
        "kernel_range": GUI_svm_kernel,
        "gamma_range": GUI_svm_gamma,
        "tol_range": GUI_svm_gamma,
        "max_iter_range": GUI_svm_max_iter,
        "degree_range": GUI_svm_degree,
        "num_optuna_trials": GUI_num_optuna_trials
    },
    "KNN": {
        "batch_size_range": GUI_batch_sizes,
        "n_neighbor_range": GUI_knn_n_neighbors,
        "weight_range": GUI_knn_weights,
        "algorithm_range": GUI_knn_algorithms,
        "leaf_size_range": GUI_knn_leaf_sizes,
        "p_range": GUI_knn_ps,
        "metric_range": GUI_metrics,
        "num_optuna_trials": GUI_num_optuna_trials
    },
    "RF": {
        "batch_size_range": GUI_batch_sizes,
        "num_tree_range": GUI_rf_num_trees,
        "criterion_range": GUI_rf_criterions,
        "max_depth_range": GUI_rf_max_depths,
        "min_sample_split_range": GUI_rf_min_sample_split,
        "min_sample_leaf_range": GUI_rf_min_sample_leaf,
        "max_feature_range": GUI_rf_max_features,
        "max_leaf_node_range": GUI_rf_max_leaf_nodes,
        "bootstrap_range": GUI_rf_bootstraps,
        "min_impurity_decrease_range": GUI_rf_min_impurity_decrease,
        "ccp_alpha_range": GUI_rf_ccp_alpha,
        "max_sample_range": GUI_rf_max_samples,
        "num_optuna_trials": GUI_num_optuna_trials
    },
    "XGB": {
        "batch_size_range": GUI_batch_sizes,
        "eta_range": GUI_xgb_etas,
        "gamma_range": GUI_xgb_gammas,
        "max_depth_range": GUI_xgb_max_depths,
        "min_child_weight_range": GUI_xgb_min_child_weights,
        "subsample_range": GUI_xgb_subsamples,
        "sampling_method_range": GUI_xgb_sampling_methods,
        "reg_lambda_range": GUI_xgb_reg_lambdas,
        "reg_alpha_range": GUI_xgb_reg_alphas,
        "early_stopping_rounds_range": GUI_xgb_early_stopping_rounds,
        "grow_policy_range": GUI_grow_policies,
        "max_leaf_range": GUI_xgb_max_leaves,
        "num_optuna_trials": GUI_num_optuna_trials
    },
    "CAT": {
        "batch_size_range": GUI_batch_sizes,
        "cat_features_range": GUI_cat_features,
        "early_stopping_rounds_range": GUI_cat_early_stopping_rounds,
        "learning_rate_range": GUI_cat_learning_rate,
        "depth_range": GUI_cat_depth,
        "l2_leaf_reg_range": GUI_cat_l2_leaf_reg,
        "random_strength_range": GUI_cat_random_strength,
        "colsample_range": GUI_cat_colsample,
        "bagging_temperature_range": GUI_cat_bagging_temperature,
        "border_count_range": GUI_cat_border_count,
        "num_optuna_trials": GUI_num_optuna_trials
    }
}

Main code

In [None]:
if __name__ == "__main__":

    start_time = time.time()

    for GUI_columns_to_keep in all_combinations:

        pipeline = Pipeline()
        pipeline.set_Modality(
            mode=GUI_mode, 
            model_choice=GUI_model_choice,
            metric=GUI_metric,
            is_developer_test_mode=GUI_is_developoer_test_mode, 
            uses_K_Fold=GUI_uses_K_Fold, 
            uses_existing_model=GUI_uses_existing_model, 
            saves_plot=GUI_saves_plot, 
            is_verbose=GUI_is_verbose,
            standardizes_input=GUI_standardizes_input,
            uses_early_stopping=GUI_uses_early_stopping,
            bagging_strategy=GUI_bagging_strategy,
            random_state=random_state
        )
        pipeline.set_Data(
            raw_dataset_name=GUI_raw_dataset_name, 
            train_ratio=GUI_train_ratio, 
            n_classes=GUI_n_classes, 
            columns_to_keep=GUI_columns_to_keep,
            data_split_random_seed_modfier=GUI_random_seed_modifier,
        )
        
        pipeline.load_dataset()
        pipeline.modify_dataset()
        pipeline.split_dataset()
        pipeline.set_TrialInformation()
        pipeline.set_n_base_model_range(GUI_n_base_models)
        
        print(f'Dataset: {pipeline.data.raw_dataset_name}')
        print(pipeline.modality)
        print(pipeline.trial_info)

        if pipeline.modality.bagging_strategy == 'single': 
            bagged_models = [pipeline.modality.model_choice]
        elif pipeline.modality.bagging_strategy == 'diverse':
            bagged_models = ['MLP', 'TAB', 'SVM', 'KNN', 'RF', 'XGB', 'CAT', 'TABold']
        elif pipeline.modality.bagging_strategy == 'None':
            bagged_models = [pipeline.modality.model_choice]
        else:
            raise ValueError("Invalid value for pipeline.modality.model_choice. It should be 'single', 'diverse', or 'None'.")

        for i, model_type in enumerate(bagged_models):
            config = user_defined_search_spaces[model_type] 
            pipeline.add_model_config(
                model_type=model_type,
                ensemble_group="group_1",  
                name=f"Model_{i+1}",  
                **config
            )
        
        print('All Models to Tune:')
        pipeline.list_models()
            
        if pipeline.modality.mode == 'optuna':

            print("\nOptuna Tuning:\n")

            for model_config_idx, model_config in enumerate(pipeline.model_configs):
                
                model_name = f'Bag model {model_config.name}'
                print(f'Tuning {model_config.name}: {model_config.model_type}')
                search_space = model_config.search_space

                study = optuna.create_study(direction='maximize', sampler=optuna.samplers.TPESampler(seed=random_state))
                study.optimize(lambda trial: objective(trial, pipeline, model_config_idx), search_space.num_optuna_trials)

                with open(check_for_file_duplicates_simple(pipeline.trial_info.hyperparam_save_path.replace(pipeline.modality.model_choice, model_config.model_type)), 'w') as file:

                    if pipeline.modality.model_choice == 'MLP':
                        file.write(f"Regularization Option: {GUI_reg}\n\n")

                    file.write(f'Dataset: {pipeline.data.raw_dataset_name}')
                    file.write(str(pipeline.modality))
                    file.write("\n\n")
                    
                    file.write(str(search_space))
                    file.write("\n\n")

                    file.write(str(pipeline.trial_info))
                    file.write("\n\n")

                    file.write(f'Feature_used (columns): {str(pipeline.data.columns_remaining)}')
                    file.write("\n")
                    file.write(f'Feature_used (compact): {pipeline.data.columns_remaining_str}')
                    file.write("\n\n")

                    file.write("Optuna Tuning:")
                    file.write("\n\n")
                    file.write(f"Best value ({pipeline.modality.metric}): {study.best_value}")
                    file.write("\n")
                    file.write(f"Best parameters: {study.best_params}")

                    if study.best_trial.user_attrs:
                        file.write("\n\nAdditional Trial Attributes:")
                    for key, value in study.best_trial.user_attrs.items():
                        file.write(f"\n{key}: {value}")
                    
                print(f'Optuna files for model {pipeline.model_configs[model_config_idx].name} saved successfuly.')

        elif pipeline.modality.mode in ['check', 'train']:
            
            optimal_hyperparam_directory = Path(pipeline.trial_info.hyperparam_save_path).parent.parent
            
            matching_folders = [p for p in Path(optimal_hyperparam_directory).iterdir() if p.is_dir() and p.name.startswith('final_')]
            # matching_folders = [p for p in Path(optimal_hyperparam_directory).iterdir() if p.is_dir() and p.name.startswith('try_')]
            
            if not matching_folders:
                raise FileNotFoundError(f"No folder starting with 'final_' found in {optimal_hyperparam_directory}")
            
            target_folder = matching_folders[0]

            optimal_hyperparam_path = os.path.join(target_folder, f'{pipeline.modality.model_choice}_{pipeline.modality.metric}_{pipeline.data.columns_remaining}.txt')

            with open(optimal_hyperparam_path, "r") as f:
                content = f.read()

            start_index = content.index("Optuna Tuning:")
            optuna_tuning = content[start_index:]
            start_index = content.index(f"{pipeline.modality.model_choice} Search Space")
            optuna_search_sapce = content[start_index:]

            trials = re.findall(r'Best parameters: \{(.*?)\}', optuna_tuning)

            best_hyper_params = {}  

            for params in trials:
                for param in params.split(", "):
                    inner_key, val = param.split(": ")
                    inner_key = inner_key.strip("'")
                    if val.isdigit():
                        best_hyper_params[inner_key] = float(val)
                    else:
                        best_hyper_params[inner_key] = val.strip("'")

            best_hyper_params['n_base_models'] = 1 if best_hyper_params.get("n_base_models", None) is None else int(best_hyper_params.get('n_base_models', None))
            best_tuned_performance = re.search(rf'Best value \({pipeline.modality.metric}\): (\d+(?:\.\d+)?)', optuna_tuning)
            rounds_tuning = re.search(r'num_optuna_trials: \d+', optuna_search_sapce)
            
            if pipeline.modality.model_choice in ['MLP', 'TAB', 'TABold', 'CAT', 'XGB']:
                stopping_epoch = re.search(r'stopping_epoch: (\d+(?:\.\d+)?)', optuna_tuning)
                stopping_epoch = round(float(stopping_epoch.group(1))) + 1
                
            if pipeline.modality.model_choice == 'MLP':
                best_hyper_params['batch_size'] = int(best_hyper_params['batch_size'])
                best_hyper_params['model_lr'] = float(best_hyper_params['model_lr'])
                best_hyper_params['model_dr'] = float(best_hyper_params['model_dr'])
                best_hyper_params['l1_weight'] = float(best_hyper_params['l1_weight'])
                best_hyper_params['l2_weight'] = float(best_hyper_params['l2_weight'])
                best_hyper_params['model_depth'] = int(best_hyper_params['model_depth'])
                best_hyper_params['model_width'] = int(best_hyper_params['model_width'])
                best_hyper_params['activation_function'] = best_hyper_params['activation_function'].strip("'")
                best_hyper_params['weight_decay'] = float(best_hyper_params['weight_decay'])
                best_hyper_params['criterion'] = loss_function_dict[best_hyper_params['criterion'].strip("'")]
                # best_hyper_params['patience'] = int(best_hyper_params['patience']) if pipeline.modality.mode == 'check' else None
                best_hyper_params['max_epoch'] = stopping_epoch if pipeline.modality.mode == 'train' else 500 #! hard coded
                best_hyper_params['input_dim'] = int(best_hyper_params['input_dim'])
                best_hyper_params['n_classes'] = int(best_hyper_params['n_classes'])
            elif pipeline.modality.model_choice in ['TAB', 'TABold']:
                best_hyper_params['batch_size'] = int(best_hyper_params['batch_size'])
                # best_hyper_params['width_prediction'] = int(best_hyper_params['width_prediction_and_attention'])
                # best_hyper_params['width_attention'] = int(best_hyper_params['width_prediction_and_attention'])
                best_hyper_params['width_prediction_and_attention'] = int(best_hyper_params['width_prediction_and_attention'])
                best_hyper_params['n_steps'] = int(best_hyper_params['n_steps'])
                best_hyper_params['gamma'] = float(best_hyper_params['gamma'])
                best_hyper_params['n_independent'] = int(best_hyper_params['n_independent'])
                best_hyper_params['n_shared'] = int(best_hyper_params['n_shared'])
                best_hyper_params['momentum'] = float(best_hyper_params['momentum'])
                best_hyper_params['lambda_sparse'] = float(best_hyper_params['lambda_sparse'])
                best_hyper_params['mask_type'] = best_hyper_params['mask_type'].strip("'")
                best_hyper_params['max_epoch'] = stopping_epoch if pipeline.modality.mode == 'train' else 200 #! hard coded
                best_hyper_params['cat_idxs'] = [pipeline.data.columns_remaining.index(5)] if '5' in pipeline.data.columns_remaining_str else []
                best_hyper_params['cat_dims'] = [2] if best_hyper_params['cat_idxs'] else []
            elif pipeline.modality.model_choice == 'SVM':
                best_hyper_params['batch_size'] = int(best_hyper_params['batch_size'])
                best_hyper_params['c'] = float(best_hyper_params['c'])
                best_hyper_params['kernel'] = (best_hyper_params['kernel']).strip("'")
                best_hyper_params['gamma'] = float(best_hyper_params['gamma'])
                best_hyper_params['tol'] = float(best_hyper_params['tol'])
                best_hyper_params['max_iter'] = int(best_hyper_params['max_iter'])
                best_hyper_params['degree'] = 1 if best_hyper_params.get("degree", None) is None else int(best_hyper_params.get("degree", None))
            elif pipeline.modality.model_choice == 'KNN':
                best_hyper_params['batch_size'] = int(best_hyper_params['batch_size'])
                best_hyper_params['n_neighbors'] = int(best_hyper_params['n_neighbors'])
                best_hyper_params['weights'] = (best_hyper_params['weights']).strip("'")
                best_hyper_params['algorithm'] = (best_hyper_params['algorithm']).strip("'")
                best_hyper_params['leaf_size'] = int(best_hyper_params['leaf_size'])
                best_hyper_params['p'] = float(best_hyper_params['p'])
                best_hyper_params['metric'] = (best_hyper_params['metric']).strip("'")
            elif pipeline.modality.model_choice == 'RF':
                best_hyper_params['batch_size'] = int(best_hyper_params['batch_size'])
                best_hyper_params['num_tree'] = int(best_hyper_params['num_tree'])
                best_hyper_params['criterion'] = (best_hyper_params['criterion']).strip("'")
                best_hyper_params['max_depth'] = None if best_hyper_params["max_depth"] == 'None' else int(best_hyper_params['max_depth'])
                best_hyper_params['min_sample_split'] = float(best_hyper_params['min_sample_split'])
                best_hyper_params['min_sample_leaf'] = int(best_hyper_params['min_sample_leaf'])
                best_hyper_params['max_feature'] = None if best_hyper_params['max_feature'] == 'None' else best_hyper_params['max_feature'].strip("'")
                best_hyper_params['max_leaf_node'] = None if best_hyper_params['max_leaf_node'] == 'None' else int(best_hyper_params['max_leaf_node'])
                best_hyper_params['bootstrap'] = ast.literal_eval(best_hyper_params['bootstrap'])
                best_hyper_params['min_impurity_decrease'] = float(best_hyper_params['min_impurity_decrease'])
                best_hyper_params['ccp_alpha'] = float(best_hyper_params['ccp_alpha'])
                best_hyper_params['max_samples'] = None if best_hyper_params.get("max_sample", None) is None else float(best_hyper_params.get('max_sample', None))
            elif pipeline.modality.model_choice == 'XGB':
                best_hyper_params['batch_size'] = int(best_hyper_params['batch_size'])
                best_hyper_params['eta'] = float(best_hyper_params['eta'])
                best_hyper_params['gamma'] = float(best_hyper_params['gamma'])
                best_hyper_params['max_depth'] = int(best_hyper_params['max_depth'])
                best_hyper_params['min_child_weight'] = float(best_hyper_params['min_child_weight'])
                best_hyper_params['subsample'] = float(best_hyper_params['subsample'])
                best_hyper_params['sampling_method'] = (best_hyper_params['sampling_method']).strip("'")
                best_hyper_params['reg_lambda'] = float(best_hyper_params['reg_lambda'])
                best_hyper_params['reg_alpha'] = float(best_hyper_params['reg_alpha'])
                best_hyper_params['grow_policy'] = (best_hyper_params['grow_policy']).strip("'")
                best_hyper_params['max_leaves'] = int(best_hyper_params['max_leaves'])
                best_hyper_params['n_estimators'] = stopping_epoch if pipeline.modality.mode == 'train' else None
                best_hyper_params['early_stopping_rounds'] = int(best_hyper_params['early_stopping_rounds']) if pipeline.modality.mode == 'check' else None
            elif pipeline.modality.model_choice == 'CAT':
                best_hyper_params['batch_size'] = int(best_hyper_params['batch_size'])
                best_hyper_params['learning_rate'] = float(best_hyper_params['learning_rate'])
                best_hyper_params['depth'] = int(best_hyper_params['depth'])
                best_hyper_params['l2_leaf_reg'] = float(best_hyper_params['l2_leaf_reg'])
                best_hyper_params['random_strength'] = float(best_hyper_params['random_strength'])
                best_hyper_params['colsample_bylevel'] = float(best_hyper_params['colsample_bylevel'])
                best_hyper_params['bagging_temperature'] = float(best_hyper_params['bagging_temperature'])
                best_hyper_params['border_count'] = int(best_hyper_params['border_count'])
                best_hyper_params['iterations'] = stopping_epoch if pipeline.modality.mode == 'train' else None
                best_hyper_params['early_stopping_rounds'] = int(best_hyper_params['early_stopping_rounds']) if pipeline.modality.mode == 'check' else None
                best_hyper_params['cat_features'] = ast.literal_eval(best_hyper_params['cat_features']) if 'cat_features' in best_hyper_params else []
                best_hyper_params['cat_features'] = [int(i) for i in best_hyper_params['cat_features']]

            # print(best_hyper_params)
            print(f'\nA total of {best_hyper_params['n_base_models']} models read, tuned with {rounds_tuning.group(0)} trials.')
            
            module_results = pipeline.train_and_metric_module(model_type=pipeline.modality.model_choice, **best_hyper_params)
            
            print(f'\n')
            # print(f'Train Accuracy: {module_results.train_accuracy:.4f}')
            # print(f'Train F1: {module_results.train_f1:.4f}')
            # print(f'Train AUC: {module_results.train_auc:.4f}')
            print(f'Test Accuracy: {module_results.val_accuracy:.4f}')
            print(f'Test F1: {module_results.val_f1:.4f}')
            print(f'Test AUC: {module_results.val_auc:.4f}')
            
            if pipeline.modality.mode == 'check':
                
                actual_performance = module_results.val_accuracy if pipeline.modality.metric == 'Acc' else module_results.val_f1 if pipeline.modality.metric == 'F1' else module_results.val_fb
                check_tune_consistency(tuned_performance=float(best_tuned_performance.group(1)), actual_performance=actual_performance) 
            
            elif pipeline.modality.mode == 'train':
                
                for model_idx, base_model in enumerate(module_results.trained_models):

                    if pipeline.modality.model_choice == 'MLP':
                        model_info = {
                            'model_state_dict': base_model.state_dict(),
                                
                            'optimizer_state_dict': base_model.optimizer.state_dict(),
                            
                            'input_dim': base_model.fc_layers[0].in_features,
                            'n_classes': base_model.fc_last.out_features,
                            'model_width': base_model.fc_layers[0].out_features,
                            'model_depth': len(base_model.fc_layers),
                            'dropout_rate': base_model.dropout_layers[0].p,
                            'criterion': base_model.criterion,
                            'activation_function': base_model.activation_function,
                            
                            'model_lr': base_model.model_lr,
                            'weight_decay': base_model.weight_decay,
                            'l1_weight': base_model.l1_weight,
                            'l2_weight': base_model.l2_weight,
                            'random_state': base_model.random_state,
                            
                            'max_epoch': base_model.max_epoch,
                        }
                        torch.save(model_info, check_for_file_duplicates_short(os.path.splitext(pipeline.trial_info.model_save_path)[0], os.path.splitext(pipeline.trial_info.model_save_path)[1]))
                    elif pipeline.modality.model_choice in ['TAB', 'TABold']:
                        # torch.save(base_model.network, check_for_file_duplicates_short(os.path.splitext(pipeline.trial_info.model_save_path)[0], os.path.splitext(pipeline.trial_info.model_save_path)[1]))
                        base_model.save_model(check_for_file_duplicates_short(os.path.splitext(pipeline.trial_info.model_save_path)[0], os.path.splitext(pipeline.trial_info.model_save_path)[1]))
                    elif pipeline.modality.model_choice in ['SVM', 'KNN', 'RF']:
                        joblib.dump(base_model, check_for_file_duplicates_short(os.path.splitext(pipeline.trial_info.model_save_path)[0], '.joblib'))
                    elif pipeline.modality.model_choice == 'XGB':
                        base_model.save_model(check_for_file_duplicates_short(os.path.splitext(pipeline.trial_info.model_save_path)[0], '.json'))
                    elif pipeline.modality.model_choice == 'CAT':
                        base_model.save_model(check_for_file_duplicates_short(os.path.splitext(pipeline.trial_info.model_save_path)[0], '.cbm'))
                    print(f'Final Model {pipeline.modality.model_choice} trained and saved successfully. ({model_idx + 1} / {len(module_results.trained_models)})')

        elif pipeline.modality.mode == 'eval':
            
            for check_target in ['../Hyperparameter tuning/Optuna', '../Model Evaluation']:
                if pipeline.check_folder_structure(check_target):
                    models_to_load = pipeline.finalize_trials(check_target)
        
            models_to_load_list = []
            for sub_mode in ['eval']:  
                pipeline.modality.mode = sub_mode
                models_to_load = pipeline.load_models_by_type(models_to_load)
                print(f'models_to_load: {models_to_load}')
                models_to_load_df = pd.DataFrame(models_to_load)
                # models_to_load_df = models_to_load_df.iloc[:, -4:] if sub_mode == 'eval' else models_to_load_df
                models_to_load_list.append(models_to_load_df)

            metric_summary = pd.concat(models_to_load_list, axis=1)
            metric_summary = metric_summary.drop(columns='model')
            print(f'metric_summary: {metric_summary}')
            metric_summary.to_csv(pipeline.trial_info.summary_save_path, index=False)

            # * 
            df = metric_summary
            # heatmap_cols = ['Val Accuracy', 'Test Accuracy', 'Test F1', 'Test F2', 'Test AUC']
            # for col in heatmap_cols:
            #     df[col] = pd.to_numeric(df[col], errors='coerce')

            # # Debug: Check data types
            # print("Data types before heatmap:")
            # print(df[heatmap_cols].dtypes)

            # # Get unique model classes
            # model_classes = df['model class'].unique()

            # # Get all unique feature combinations for consistent y-axis
            # all_features = sorted(df['feature combination'].unique())

            # # Create a figure with subplots (one per model class)
            # fig, axes = plt.subplots(1, len(model_classes), figsize=(len(model_classes) * 5, max(8, len(all_features) * 0.4)),
            #                         sharey=True, gridspec_kw={'wspace': 0.1})

            # # Ensure axes is a list for single model case
            # if len(model_classes) == 1:
            #     axes = [axes]

            # # Create heatmap for each model class
            # for idx, model in enumerate(model_classes):
            #     # Filter data for the current model
            #     model_data = df[df['model class'] == model]
                
            #     # Create a DataFrame with all feature combinations, filling missing with NaN
            #     heatmap_data = pd.DataFrame(index=all_features, columns=heatmap_cols)
            #     for feature in all_features:
            #         if feature in model_data['feature combination'].values:
            #             row = model_data[model_data['feature combination'] == feature][heatmap_cols].iloc[0]
            #             heatmap_data.loc[feature] = row
            #         else:
            #             heatmap_data.loc[feature] = [float('nan')] * len(heatmap_cols)
                
            #     # Convert heatmap_data to numeric to ensure float type
            #     heatmap_data = heatmap_data.astype(float)
                
            #     # Debug: Check heatmap_data for non-numeric values
            #     print(f"\nHeatmap data for {model}:")
            #     print(heatmap_data)
                
            #     # Plot heatmap
            #     sns.heatmap(
            #         heatmap_data,
            #         annot=True,  # Show values in cells
            #         cmap='YlGnBu',  # Color scheme
            #         fmt='.3f',  # Format floats to 3 decimal places
            #         cbar=(idx == len(model_classes) - 1),  # Colorbar only on last subplot
            #         cbar_kws={'label': 'Score'} if idx == len(model_classes) - 1 else None,
            #         ax=axes[idx],
            #         yticklabels=all_features if idx == 0 else [],  # Y-labels only on first subplot
            #     )
            #     axes[idx].set_title(f'{model} Metrics')
            #     axes[idx].set_xlabel('Metrics')
            #     if idx == 0:
            #         axes[idx].set_ylabel('Feature Combination')
            #     else:
            #         axes[idx].set_ylabel('')

            # # Adjust layout
            # plt.tight_layout()

            # # Save the heatmap
            # plt.savefig(f'../Model Evaluation/performance_heatmap_subplots_{pipeline.data.raw_dataset_name}.png')
            # plt.close()

            # ! only accounts for accuracy it seems like
            # Create Table of Highest Val Accuracy per Model
            highest_val_acc = df.loc[df.groupby('model class')[f'Val {pipeline.modality.metric}'].idxmax()]
            # table_cols = ['dataset', 'model class', 'feature combination', 'Val Accuracy', 'Test Accuracy', 'Test F1', 'Test F2', 'Test AUC']
            # highest_val_acc_table = highest_val_acc[table_cols]
            highest_val_acc_table = highest_val_acc
            highest_val_acc_table.to_csv(pipeline.trial_info.candidate_list_save_path, index=False)

            # Print the table
            print("\nHighest Val Accuracy per Model:")
            print(highest_val_acc_table.to_string(index=False))
            # * 

        elif pipeline.modality.mode == 'deploy':
        
            # model_candidate_summary_path = f'../Model Evaluation/highest_val_accuracy_per_model_{pipeline.data.raw_dataset_name}.csv'
            # best_model_candidates_df = pd.read_csv(model_candidate_summary_path, header=0)
            # print(best_model_candidates_df)

            # models_to_load = []

            # for best_model_candidate_idx in range(len(best_model_candidates_df)):
            #     best_model_candidate_type = best_model_candidates_df.loc[best_model_candidate_idx, 'model class']
            #     best_model_candidate_features = best_model_candidates_df.loc[best_model_candidate_idx, 'feature combination']
            #     print(best_model_candidate_path)
            #     best_model_candidate_path = f'../Model Evaluation/{pipeline.data.raw_dataset_name}/{best_model_candidate_type}/{best_model_candidate_features}/final*/{best_model_candidate_type}*'
            #     model_and_info = {
            #         'dataset': pipeline.data.raw_dataset_name,
            #         'model class': best_model_candidate_type,
            #         'feature combination': best_model_candidate_features,
            #         'model': best_model_candidate_path
            #     }
            #     models_to_load.append(model_and_info)

            # for dataset in ['tibial_slope_1', 'tibial_slope_2']:
            #     pipeline.data.raw_dataset_name = dataset
            #     models_to_load = pipeline.load_models_by_type

            # metric_summary = pd.DataFrame(models_to_load)
            # metric_summary = metric_summary.drop(columns='model')
            # print(metric_summary)
            # metric_summary.to_csv('../cross_test/cross_dataset_performance.csv', index=False)
            pass

    end_time = time.time()

    execution_time = end_time - start_time
    print(f"\nTotal Execution Time: {execution_time:.2f} seconds")