#### ============  Ashesi University
#### ============  Department of Computer Science and Information Systems
#### ============  "A Novel Cascading Method for Threats Detection Using Deep Learning Models"
#### ============  Clovis Mushagalusa CIRUBAKADERHA

### =================  Importing Necessary Libraries  =================

In [None]:
import os
import warnings
from absl import logging

# Set TensorFlow-related environment variables and quieten logging/warnings for the session.
# NOTE(review): TF_* variables presumably only take effect if set before TensorFlow is imported — confirm.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' 
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'
# `logging` here is absl's logging module (see the import above), not the standard-library one.
logging.set_verbosity(logging.ERROR)
# Suppress every Python warning from this point on.
warnings.filterwarnings("ignore")

In [None]:
import time
import json
import csv
import torch
import joblib
import itertools
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import torch.nn as nn
import torch.nn.functional as F
from datetime import datetime
from itertools import permutations
from sklearn.metrics import (accuracy_score, f1_score, precision_score, recall_score, 
                            confusion_matrix, classification_report, roc_curve, auc, 
                            roc_auc_score, top_k_accuracy_score, ConfusionMatrixDisplay)
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from scipy.stats import wilcoxon
from torch.utils.data import DataLoader, TensorDataset
from typing import List, Tuple, Union, Optional
from collections import defaultdict

### ================  Exploratory Data Analysis (EDA)  =================

==================================  Loading the dataset  ================================== 

In [None]:
# Load "DrDoS_DNS.csv" (path relative to the working directory) into a DataFrame.
drdos_dns = pd.read_csv("DrDoS_DNS.csv")

In [None]:
# Load "DrDoS_LDAP.csv" (path relative to the working directory) into a DataFrame.
drdos_ldap = pd.read_csv("DrDoS_LDAP.csv")

In [None]:
# Load "DrDoS_MSSQL.csv" (path relative to the working directory) into a DataFrame.
drdos_mssql = pd.read_csv("DrDoS_MSSQL.csv")

In [None]:
# Load "DrDoS_NTP.csv" (path relative to the working directory) into a DataFrame.
drdos_ntp = pd.read_csv("DrDoS_NTP.csv")

In [None]:
# Load "DrDoS_SSDP.csv" (path relative to the working directory) into a DataFrame.
drdos_ssdp = pd.read_csv("DrDoS_SSDP.csv")

In [None]:
# Load "DrDoS_UDP.csv" (path relative to the working directory) into a DataFrame.
drdos_udp = pd.read_csv("DrDoS_UDP.csv")

In [None]:
# Load "MSSQL.csv" (path relative to the working directory) into a DataFrame.
mssql = pd.read_csv("MSSQL.csv")

In [None]:
# Load "NetBIOS.csv" (path relative to the working directory) into a DataFrame.
netbios = pd.read_csv("NetBIOS.csv")

In [None]:
# Load "Portmap.csv" (path relative to the working directory) into a DataFrame.
portmap = pd.read_csv("Portmap.csv")

In [None]:
# Load "Syn.csv" (path relative to the working directory) into a DataFrame.
syn = pd.read_csv("Syn.csv")

In [None]:
# Load "UDP.csv" (path relative to the working directory) into a DataFrame.
udp = pd.read_csv("UDP.csv")

In [None]:
# Load "UDPLag.csv" (path relative to the working directory) into a DataFrame.
udplag = pd.read_csv("UDPLag.csv")

=================================  Processing the dataset  ================================= 

In [None]:
# Names (as strings) of the module-level DataFrame variables loaded above.
# Later cells resolve these names through globals(), so each entry must match a variable name exactly.
list_of_full_datasets = ["drdos_dns", "drdos_ldap", "drdos_mssql", "drdos_ntp", "drdos_ssdp", "drdos_udp", "mssql", "netbios", "portmap", "syn", "udp", "udplag"]

In [None]:
# Stripping the dataset features

def strip_column_names(dataset):
    """
    Removes leading and trailing whitespace from every column name of a DataFrame.

    The DataFrame is modified in place and is also returned, so the call can be
    used either for its side effect or on the right-hand side of an assignment.
    """
    cleaned_names = dataset.columns.str.strip()
    dataset.columns = cleaned_names
    return dataset

# Looping through the list and updating each dataset in globals()

# Rebind each listed global to its whitespace-stripped version; names that are
# not defined at module level are silently skipped.
for data in list_of_full_datasets:
    if data not in globals():
        continue
    globals()[data] = strip_column_names(globals()[data])

=======================  Handling the missing and infinite values in the datasets  =======================

In [None]:
def handle_missing_and_infinite_values(dataset):
    """
    Fills missing and infinite values, column by column:
    - Infinite values (+inf / -inf) are first converted to NaN.
    - Numerical columns (any NumPy numeric dtype, e.g. int32/int64/float32/float64): NaN -> column mean.
    - All other columns (object/string/bool/extension dtypes): NaN -> mode (most frequent value),
      left untouched if the column has no mode (e.g. it is entirely missing).

    The DataFrame is modified in place (columns are reassigned) and also returned.

    Arguments:
        dataset (pd.DataFrame): Dataset to clean.

    Returns:
        pd.DataFrame: The same DataFrame object with missing/infinite values filled.
    """
    for feature in dataset.columns:
        # Work on an explicit Series and assign it back: calling replace/fillna with
        # inplace=True on `dataset[feature]` is a chained operation that does not
        # update the parent DataFrame when pandas Copy-on-Write is active.
        column = dataset[feature].replace([np.inf, -np.inf], np.nan)

        if not column.isnull().any():
            # No NaN after the replacement means there was no inf either: nothing to write back.
            continue

        # Only plain NumPy numeric dtypes take the mean; bool and extension dtypes use the mode.
        is_numeric = isinstance(column.dtype, np.dtype) and np.issubdtype(column.dtype, np.number)

        if is_numeric:
            column = column.fillna(column.mean())
        else:
            most_frequent = column.mode()
            if not most_frequent.empty:
                column = column.fillna(most_frequent.iloc[0])

        dataset[feature] = column

    return dataset

# Applying the function to each dataset

for data in list_of_full_datasets:
    if data in globals():
        globals()[data] = handle_missing_and_infinite_values(globals()[data])
        # Report success only after the cleaning call has returned without raising.
        print(f"✅ Handled ..... {data}")

======================  Creating a combined dataset for a better model building  ======================

In [None]:
def create_a_combined_threats_dataset(datasets_names: list) -> pd.DataFrame:
    """
    Stacks several module-level DataFrames row-wise into a single DataFrame.

    Each entry of `datasets_names` is looked up in this module's global scope.
    Names that are missing, or that refer to something other than a DataFrame,
    are reported with a printed message and skipped.

    Arguments:
        datasets_names (list): Variable names (as strings) of the DataFrames to combine.

    Returns:
        pd.DataFrame: Row-wise concatenation with a fresh 0..n-1 index.

    Raises:
        ValueError: If none of the names resolves to a DataFrame.
    """
    module_scope = globals()
    frames = []

    for name in datasets_names:
        if name not in module_scope:
            print(f"❌ Dataset '{name}' not found in global scope. ❌")
            continue

        candidate = module_scope[name]
        if not isinstance(candidate, pd.DataFrame):
            print(f"⚠️ Variable '{name}' is not a DataFrame. Skipping. ⚠️")
            continue

        frames.append(candidate)

    if len(frames) == 0:
        raise ValueError("No valid datasets found in the list.")

    final_dataset = pd.concat(frames, axis=0, ignore_index=True)
    print(f"✅ The new dataset shape: {final_dataset.shape}")

    return final_dataset

In [None]:
# Concatenate every DataFrame named in list_of_full_datasets into one combined DataFrame.
combined_threats_dataset = create_a_combined_threats_dataset(list_of_full_datasets) 

================================  Checking the new datasets  ================================

In [None]:
# Bare expression: shows the combined DataFrame as the notebook cell output.
combined_threats_dataset

In [None]:
# Print the column/dtype/non-null summary of the combined DataFrame.
combined_threats_dataset.info()

In [None]:
# Count the distinct values present in the "Label" column.
number_of_unique_labels = combined_threats_dataset["Label"].nunique()

In [None]:
# Report the distinct-label count computed in the previous cell.
print(f"✅ We have {number_of_unique_labels} unique labels in the combined dataset.")

==============================  Threats to be used for modeling  ==============================

In [None]:
# "Label" values of the threat classes kept for modeling (BENIGN rows are always kept by reduce_dataset).
list_of_modeling_threats = ["DrDoS_DNS", "DrDoS_NTP", "Syn", "UDP"]

=================================  Reducing the dataset  =================================

In [None]:
def reduce_dataset(data: pd.DataFrame, label_column: str = "Label", threats: list = None, max_per_threat: int = 500_000):
    """
    Builds a smaller modeling dataset: every BENIGN row plus, for each requested
    threat label, at most `max_per_threat` rows (randomly sampled with a fixed seed
    when the class is larger than the cap).

    Arguments:
        data (pd.DataFrame): The whole dataset containing all classes.
        label_column (str): The name of the label column.
        threats (list): Threat labels to include; labels with no rows are reported and skipped.
        max_per_threat (int): Maximum number of rows kept per threat label.

    Returns:
        pd.DataFrame: The selected rows, shuffled (seed 42) and re-indexed from 0.

    Raises:
        KeyError: If `label_column` is not a column of `data`.
        ValueError: If `threats` is None or empty.
    """
    if label_column not in data.columns:
        raise KeyError(f"❌ The column '{label_column}' was not found in the dataset. ❌")

    if threats is None or not len(threats):
        raise ValueError("Please provide a non-empty list of threat labels to include.")

    labels = data[label_column]

    # All benign traffic is retained without sampling.
    benign_data = data[labels == "BENIGN"]

    # One (possibly down-sampled) frame per threat class that actually occurs.
    sampled_threats = []
    for threat in threats:
        threat_subset = data[labels == threat]

        if threat_subset.empty:
            print(f"⚠️ Warning: No data found for threat '{threat}' ⚠️")
            continue

        over_cap = len(threat_subset) > max_per_threat
        sampled_threats.append(
            threat_subset.sample(n=max_per_threat, random_state=42) if over_cap else threat_subset
        )

    modeling_data = pd.concat([benign_data] + sampled_threats, ignore_index=True)

    print(f"✅ Reduced dataset contains: {len(modeling_data)} observations across {len(sampled_threats)} threats. ✅")

    shuffled = modeling_data.sample(frac=1.0, random_state=42)
    return shuffled.reset_index(drop=True)

In [None]:
# Keep all BENIGN rows plus at most 2,000,000 rows for each label in list_of_modeling_threats.
modeling_dataset = reduce_dataset(data=combined_threats_dataset, label_column="Label", threats=list_of_modeling_threats, max_per_threat=2_000_000)

In [None]:
# Report the (rows, columns) shape of the reduced dataset.
print(f"✅ Reduced dataset shape: {modeling_dataset.shape}")

==============================  Dropping Unnecessary columns  ==============================

In [None]:
# Columns to be dropped before modeling.
# NOTE(review): 'SimillarHTTP' is presumably spelled this way to match the CSV header — confirm before "fixing" it.

columns_to_drop = ['Unnamed: 0', 'Flow ID', 'Source IP', 'Destination IP', 
                   'Timestamp', 'Fwd Header Length.1', 'SimillarHTTP']

In [None]:
def drop_columns(data: pd.DataFrame, columns_to_drop: list) -> pd.DataFrame:
    """
    Returns a copy of the dataset without the requested columns.

    Requested names that are not columns of `data` are ignored, so the same
    list can be applied to frames with slightly different schemas.

    Arguments:
        data (pd.DataFrame): Input dataset (left unmodified).
        columns_to_drop (list): Column names to remove.

    Returns:
        pd.DataFrame: New DataFrame with the matching columns removed.

    Raises:
        TypeError: If `columns_to_drop` is not a list.
    """
    if not isinstance(columns_to_drop, list):
        raise TypeError("columns_to_drop must be a list of column names.")

    # Restrict the request to names that really exist, so drop() cannot fail on a missing label.
    existing_columns = data.columns
    removable = [name for name in columns_to_drop if name in existing_columns]

    cleaned_data = data.drop(columns=removable)

    print(f"✅ Handled ✅")

    return cleaned_data

In [None]:
# Remove the columns listed in columns_to_drop from the reduced modeling dataset.
cleaned_combined_threats_dataset = drop_columns(modeling_dataset, columns_to_drop)

=================================== Shuffling the data  ===================================

In [None]:
def shuffle_dataset_in_batches(dataset: pd.DataFrame, batch_size: int = 100_000, random_seed: int = 42) -> pd.DataFrame:
    """
    Returns the rows of a DataFrame in a seeded random order.

    A single permutation of all row positions is drawn, the rows are gathered
    `batch_size` positions at a time, and the pieces are concatenated back
    together with a fresh 0..n-1 index.

    Arguments:
        dataset (pd.DataFrame): Input dataset to shuffle.
        batch_size (int): Number of permuted positions gathered per chunk.
        random_seed (int): Seed for reproducibility.

    Returns:
        pd.DataFrame: Shuffled dataset.

    Raises:
        ValueError: If `dataset` is empty.
    """
    if dataset.empty:
        raise ValueError("The input DataFrame is empty.")

    total_rows = len(dataset)
    row_order = np.random.default_rng(seed=random_seed).permutation(total_rows)

    # Gather the permuted rows one window at a time.
    pieces = []
    for start in range(0, total_rows, batch_size):
        stop = min(start + batch_size, total_rows)
        pieces.append(dataset.iloc[row_order[start:stop]])

    shuffled_dataframe = pd.concat(pieces, ignore_index=True)

    print(f"✅ Handled ✅")

    return shuffled_dataframe

In [None]:
# Shuffle the cleaned dataset (default seed 42), gathering rows in chunks of 100,000.
shuffled_combined_threats_dataset = shuffle_dataset_in_batches(cleaned_combined_threats_dataset, batch_size=100_000)

In [None]:
# Bare expression: shows the shuffled DataFrame as the notebook cell output.
shuffled_combined_threats_dataset

===============================  Encoding the "Label" feature  ===============================

In [None]:
def encode_labels_one_hot_and_save(dataset: pd.DataFrame, label_column: str = "Label", output_path: str = "Encoded_modeling_dataset.csv"):
    """
    Replaces the label column by its one-hot (dummy) columns and writes the result to CSV.

    The feature columns keep their original order; the one-hot columns (one per
    distinct label value, as produced by `pd.get_dummies`) are appended after them.

    Arguments:
        dataset (pd.DataFrame): DataFrame with features and the label column.
        label_column (str): Name of the label column.
        output_path (str): Path of the CSV file to write (without the index).

    Returns:
        pd.DataFrame: Features followed by the one-hot encoded label columns.

    Raises:
        KeyError: If `label_column` is not a column of `dataset`.
    """
    if label_column not in dataset.columns:
        raise KeyError(f"❌ The column '{label_column}' not found in the dataset. ❌")

    # Encode the labels, then attach them to the right of the remaining columns.
    one_hot_labels = pd.get_dummies(dataset[label_column])
    features_only = dataset.drop(columns=[label_column])
    final_dataframe = pd.concat([features_only, one_hot_labels], axis=1)

    # Persist the encoded dataset.
    final_dataframe.to_csv(output_path, index=False)

    print(f"✅ Encoded dataset saved to: {output_path}")

    return final_dataframe

In [None]:
# One-hot encode "Label" and write the result to the default path "Encoded_modeling_dataset.csv".
final_encoded_dataset = encode_labels_one_hot_and_save(shuffled_combined_threats_dataset)

In [None]:
# Splitting the combined DataFrame into features and labels
# The names below are hard-coded and must match the one-hot columns created from "Label";
# their order defines the class index used later when argmax is taken over y_labels.

label_columns = ["BENIGN", "DrDoS_DNS", "DrDoS_NTP", "Syn", "UDP"]

X_features = final_encoded_dataset.drop(columns=label_columns)
y_labels = final_encoded_dataset[label_columns]

==================================  Splitting the dataset  ==================================  

In [None]:
def split_and_scale_dataset(features: pd.DataFrame, labels: pd.DataFrame, test_size: float = 0.2, 
                            val_size: float = 0.0, scale_features: bool = True, random_seed: int = 42):
    """
    Performs a stratified train/test (and optional validation) split and optionally standardizes the features.

    Stratification uses the class index obtained with argmax over the one-hot label columns.
    When scaling is enabled the StandardScaler is fitted on the training features only and
    then applied to the test (and validation) features.

    Arguments:
        features (pd.DataFrame): Feature matrix (X).
        labels (pd.DataFrame): One-hot encoded labels (y).
        test_size (float): Proportion of the data for testing (default: 0.2).
        val_size (float): Proportion of the remaining training set reserved for validation (default: 0.0).
        scale_features (bool): Whether to apply StandardScaler normalization to features (default: True).
        random_seed (int): Random seed for reproducibility.

    Returns:
        dict: Dictionary with keys:
            - 'X_train', 'X_test', ('X_val' if val_size > 0)
            - 'y_train', 'y_test', ('y_val' if val_size > 0), as NumPy arrays
            - 'scaler' (only if scale_features is True)
    """
    with_validation = val_size > 0

    # First cut: hold out the test set, preserving class proportions.
    X_train, X_test, y_train, y_test = train_test_split(
        features, labels, test_size=test_size,
        stratify=labels.values.argmax(axis=1), random_state=random_seed,
    )

    # Second cut (optional): carve the validation set out of the training part.
    if with_validation:
        X_train, X_val, y_train, y_val = train_test_split(
            X_train, y_train, test_size=val_size,
            stratify=y_train.values.argmax(axis=1), random_state=random_seed,
        )

    # Standardize using statistics of the training features only.
    scaler = None
    if scale_features:
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)
        if with_validation:
            X_val = scaler.transform(X_val)

    data_splits = {
        "X_train": X_train,
        "X_test": X_test,
        "y_train": y_train.values,
        "y_test": y_test.values,
    }
    if with_validation:
        data_splits.update(X_val=X_val, y_val=y_val.values)
    if scale_features:
        data_splits["scaler"] = scaler

    print(f"✅ Handled ✅")

    return data_splits

In [None]:
# Test dataset: 20 percent of the original dataset.

# Validation dataset: 10 percent of the original dataset.
# (val_size=0.125 is applied to the 80 percent left after the test split: 0.125 * 0.8 = 0.10.)

splitted_dataset = split_and_scale_dataset(features=X_features, labels=y_labels, test_size=0.2,
                                           val_size=0.125, scale_features=True, random_seed=42)

# Features come back already standardized because scale_features=True.
X_train = splitted_dataset["X_train"]
X_val = splitted_dataset["X_val"]
X_test = splitted_dataset["X_test"]

# Labels come back as one-hot NumPy arrays.
y_train = splitted_dataset["y_train"]
y_val = splitted_dataset["y_val"]
y_test = splitted_dataset["y_test"]

===================================  Splitting Report  ===================================

In [None]:
def report_split_statistics(y_train, y_val=None, y_test=None, label_names=None):
    """
    Prints, for each provided split, the per-class sample count and percentage.

    Counts are obtained by summing each label column, so the labels are expected
    to be two-dimensional (one column per class).

    Arguments:
        y_train (np.ndarray or pd.DataFrame): One-hot encoded training labels.
        y_val (np.ndarray or pd.DataFrame, optional): Same for validation; skipped when None.
        y_test (np.ndarray or pd.DataFrame, optional): Same for testing; skipped when None.
        label_names (list, optional): Class names used for NumPy inputs; DataFrame inputs
                                      use their own column names, and NumPy inputs without
                                      names fall back to "Class_<i>".

    Raises:
        ValueError: If a split is neither a NumPy array nor a DataFrame.
    """
    def summarize(y, name):
        print(f"\n📊 Class Distribution in {name.upper()} Set:")

        if isinstance(y, pd.DataFrame):
            y_array, columns = y.values, y.columns
        elif isinstance(y, np.ndarray):
            y_array = y
            columns = label_names if label_names else [f"Class_{i}" for i in range(y.shape[1])]
        else:
            raise ValueError("Unsupported label type. Provide a NumPy array or DataFrame.")

        # Column sums of a one-hot matrix are the per-class sample counts.
        class_counts = y_array.sum(axis=0)
        total = class_counts.sum()

        for label, count in zip(columns, class_counts):
            percent = (count / total) * 100
            print(f" - {label}: {int(count)} ({percent:.2f}%)")

        print(f"➡️ Total samples: {int(total)}")

    # The training split is always reported; the other two only when supplied.
    summarize(y_train, "train")
    for split_name, split_labels in (("val", y_val), ("test", y_test)):
        if split_labels is not None:
            summarize(split_labels, split_name)

In [None]:
# Print the class distribution of the three splits, using the one-hot column names as class labels.
report_split_statistics(y_train=splitted_dataset["y_train"], y_val=splitted_dataset["y_val"], 
                        y_test=splitted_dataset["y_test"], label_names=y_labels.columns.tolist())

=================================  Normalizing the dataset  =================================

In [None]:
def normalize_features(X_train: pd.DataFrame, X_test: pd.DataFrame, 
                       X_val: Optional[pd.DataFrame] = None) -> Tuple[np.ndarray, np.ndarray, Optional[np.ndarray], StandardScaler]:
    """
    Standardizes the feature sets with a StandardScaler fitted on the training data only.

    Arguments:
        X_train (pd.DataFrame): Training features (used to fit the scaler).
        X_test (pd.DataFrame): Testing features.
        X_val (Optional[pd.DataFrame]): Validation features; skipped when None.

    Returns:
        Tuple containing:
            - X_train_scaled (np.ndarray): Scaled training data.
            - X_test_scaled (np.ndarray): Scaled test data.
            - X_val_scaled (np.ndarray or None): Scaled validation data if provided.
            - scaler (StandardScaler): The fitted scaler.
    """
    scaler = StandardScaler()

    # Fit on the training split, then reuse the same statistics for the other splits.
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    X_val_scaled = None
    if X_val is not None:
        X_val_scaled = scaler.transform(X_val)

    summary = f"✅ Normalization completed: Train shape {X_train_scaled.shape}, Test shape {X_test_scaled.shape}"
    if X_val_scaled is not None:
        summary += f", Val shape {X_val_scaled.shape}"
    print(summary)

    return X_train_scaled, X_test_scaled, X_val_scaled, scaler

In [None]:
# NOTE(review): X_train / X_test / X_val were already standardized inside split_and_scale_dataset
# (scale_features=True), so this fits a second StandardScaler on already-scaled data. The tensor
# conversion cells below pass X_train / X_val / X_test rather than these *_scaled arrays — confirm
# whether this cell is still needed.
X_train_scaled, X_test_scaled, X_val_scaled, scaler = normalize_features(X_train=pd.DataFrame(X_train), 
                                                                         X_test=pd.DataFrame(X_test),
                                                                         X_val=pd.DataFrame(X_val))

==================================  Converting to tensor  ==================================

In [None]:
def convert_to_tensor(X: Union[np.ndarray, pd.DataFrame], y: Union[np.ndarray, pd.Series, pd.DataFrame],
                      one_hot_labels: bool = True) -> Tuple[torch.FloatTensor, torch.Tensor]:
    """
    Turns a feature matrix and its labels into PyTorch tensors.

    Arguments:
        X: Feature matrix (NumPy array or DataFrame).
        y: Labels; with one_hot_labels=False they must be two-dimensional, because the
           class index is taken with argmax along axis 1.
        one_hot_labels: If True, the labels are returned unchanged as a float32 tensor
                        (e.g. for BCEWithLogitsLoss). If False, they are reduced to class
                        indices and returned as an int64 tensor (e.g. for CrossEntropyLoss).

    Returns:
        Tuple:
            - X_tensor (torch.FloatTensor): float32 tensor of features.
            - y_tensor (torch.FloatTensor or torch.LongTensor): Labels tensor.
    """
    pandas_types = (pd.DataFrame, pd.Series)

    # Unwrap pandas containers so torch receives plain NumPy arrays.
    features = X.values if isinstance(X, pandas_types) else X
    targets = y.values if isinstance(y, pandas_types) else y

    X_tensor = torch.tensor(features, dtype=torch.float32)

    if not one_hot_labels:
        class_indices = np.argmax(targets, axis=1)
        y_tensor = torch.tensor(class_indices, dtype=torch.long)
    else:
        y_tensor = torch.tensor(targets, dtype=torch.float32)

    print(f"✅ Converted to tensors | X: {X_tensor.shape}, y: {y_tensor.shape}")

    return X_tensor, y_tensor

In [None]:
# Training
# one_hot_labels=False: the one-hot labels are reduced to int64 class indices via argmax.

X_train_tensor, y_train_tensor = convert_to_tensor(X_train, y_train, one_hot_labels=False)

In [None]:
# Testing
# one_hot_labels=False: the one-hot labels are reduced to int64 class indices via argmax.

X_test_tensor, y_test_tensor = convert_to_tensor(X_test, y_test, one_hot_labels=False)

In [None]:
# Validation
# one_hot_labels=False: the one-hot labels are reduced to int64 class indices via argmax.

X_val_tensor, y_val_tensor = convert_to_tensor(X_val, y_val, one_hot_labels=False)

### ====================  Building The Models  ====================

In [None]:
# Wrap training, validation, and test tensors into TensorDataset objects 
# to enable efficient batching and iteration using DataLoader during training and evaluation.
# Each dataset item is a (features, class_index) pair.

train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)

In [None]:
# Create DataLoaders for training, validation, and testing.
# - `batch_size=128` defines the number of samples per batch.
# - `shuffle=True` is used for training to ensure data is randomly sampled each epoch,
#   while `shuffle=False` is used for validation and testing to maintain consistent ordering.
# The last batch of each loader may be smaller than 128 because `drop_last` is not set.

train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)
val_loader = DataLoader(val_dataset, batch_size=128, shuffle=False)

In [None]:
# Class index -> label name. The indices follow the column order of `label_columns`
# (the order argmax is taken over), so the five classes occupy indices 0..4.
label_mapping = {
    0: "BENIGN",
    1: "DrDoS_DNS",
    2: "DrDoS_NTP",
    3: "Syn",
    4: "UDP"
}

In [None]:
# Model sizes derived from the data: number of feature columns in the training tensor,
# and number of one-hot label columns in y_labels.
input_dimension = X_train_tensor.shape[1]           
number_of_classes = y_labels.shape[1] 

#### =======================  Building Individual Models  =======================

##### ==========================  Convolutional Neural Network - (CNN)  ==========================

In [None]:
def build_intrusion_detection_cnn(input_dimension: int, number_of_classes: int = None, return_extractor: bool = False) -> nn.Module:
    """
    Builds a 1-D CNN for intrusion detection and moves it to CUDA when available (CPU otherwise).

    The network is three Conv1d -> BatchNorm -> GELU stages (dropout after the first two)
    followed by global average pooling, which yields a 128-dimensional feature vector,
    and an optional three-layer dense classifier on top.

    Arguments:
        input_dimension (int): Number of input features. Not used to size any layer:
                               the convolutions and the adaptive pooling are length-agnostic.
        number_of_classes (int, optional): Number of output classes.
                                           If None, the model has no classifier and its forward pass returns features.
        return_extractor (bool, optional): If True, returns only the `feature_extractor` nn.Sequential.
                                           Unlike the full model it does not add the channel dimension
                                           or squeeze the pooled output itself.

    Returns:
        nn.Module: CNN feature extractor if return_extractor=True, otherwise full CNN model.
    """

    def conv_stage(in_channels, out_channels, kernel_size, dropout=None):
        # Conv1d -> BatchNorm1d -> GELU (-> Dropout) as a flat layer list; "same" padding for odd kernels.
        stage = [
            nn.Conv1d(in_channels, out_channels, kernel_size=kernel_size, padding=kernel_size // 2),
            nn.BatchNorm1d(out_channels),
            nn.GELU(),
        ]
        if dropout is not None:
            stage.append(nn.Dropout(dropout))
        return stage

    def dense_stage(in_features, out_features, dropout):
        # Linear -> BatchNorm1d -> GELU -> Dropout as a flat layer list.
        return [
            nn.Linear(in_features, out_features),
            nn.BatchNorm1d(out_features),
            nn.GELU(),
            nn.Dropout(dropout),
        ]

    class CNNModel(nn.Module):
        def __init__(self, input_dimension, number_of_classes=None):
            super().__init__()
            self.hidden_dim = 128

            # Layers are unpacked into one flat Sequential so that the sub-module
            # indices (and therefore the state_dict keys) stay 0..11.
            self.feature_extractor = nn.Sequential(
                *conv_stage(1, 64, kernel_size=5, dropout=0.25),
                *conv_stage(64, 128, kernel_size=3, dropout=0.3),
                *conv_stage(128, self.hidden_dim, kernel_size=3),
                nn.AdaptiveAvgPool1d(1),
            )

            if number_of_classes is None:
                self.classifier = None
            else:
                self.classifier = nn.Sequential(
                    nn.Flatten(),
                    *dense_stage(self.hidden_dim, 256, dropout=0.5),
                    *dense_stage(256, 128, dropout=0.3),
                    nn.Linear(128, number_of_classes),
                )

        def forward(self, x, return_features=False):
            # assumes x is (batch, features) — the features become a length axis with one channel.
            signal = x.unsqueeze(1)
            pooled = self.feature_extractor(signal)
            features = pooled.squeeze(-1)

            wants_features = return_features or self.classifier is None
            return features if wants_features else self.classifier(features)

    target_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = CNNModel(input_dimension, number_of_classes)

    selected = model.feature_extractor if return_extractor else model
    return selected.to(target_device)

In [None]:
# Full CNN classifier sized for the training features and the number of one-hot classes.
cnn_model = build_intrusion_detection_cnn(input_dimension, number_of_classes)

##### ===========================  Long Short-Term Memory  - (LSTM)  ===========================

In [None]:
def build_intrusion_detection_lstm(input_dimension: int, number_of_classes: int = None, return_extractor: bool = False) -> nn.Module:
    """
    Builds a 2-layer LSTM model for intrusion detection and moves it to CUDA when available (CPU otherwise).

    Each input feature is fed to the LSTM as one time step with a single channel.
    The output at the last time step is projected to a 128-dimensional feature
    vector, on top of which an optional dense classifier is applied.

    Arguments:
        input_dimension (int): Number of input features (sequence length). Not used to size any layer.
        number_of_classes (int, optional): Number of output classes. If None, the model has no
                                           classifier and its forward pass returns features.
        return_extractor (bool, optional): If True, returns only the `feature_proj` nn.Sequential
                                           (the projection head, without the LSTM itself).

    Returns:
        nn.Module: `feature_proj` if return_extractor=True, otherwise the full LSTM model.
    """

    def dense_stage(in_features, out_features, dropout):
        # Linear -> BatchNorm1d -> GELU -> Dropout as a flat layer list.
        return [
            nn.Linear(in_features, out_features),
            nn.BatchNorm1d(out_features),
            nn.GELU(),
            nn.Dropout(dropout),
        ]

    class LSTMModel(nn.Module):
        def __init__(self, input_dimension, number_of_classes=None):
            super().__init__()

            self.hidden_dim = 128
            self.num_layers = 2
            self.feature_dim = 128

            self.lstm = nn.LSTM(
                input_size=1,
                hidden_size=self.hidden_dim,
                num_layers=self.num_layers,
                batch_first=True,
                dropout=0.3,
            )

            self.feature_proj = nn.Sequential(
                *dense_stage(self.hidden_dim, self.feature_dim, dropout=0.3)
            )

            # Layers are unpacked into one flat Sequential so the state_dict keys stay 0..8.
            if number_of_classes is None:
                self.classifier = None
            else:
                self.classifier = nn.Sequential(
                    *dense_stage(self.feature_dim, 256, dropout=0.5),
                    *dense_stage(256, 128, dropout=0.3),
                    nn.Linear(128, number_of_classes),
                )

        def forward(self, x, return_features=False):
            # assumes x is (batch, features) — each feature becomes a 1-channel time step.
            sequence = x.unsqueeze(-1)
            outputs, _ = self.lstm(sequence)
            final_step = outputs[:, -1, :]
            features = self.feature_proj(final_step)

            wants_features = return_features or self.classifier is None
            return features if wants_features else self.classifier(features)

    target_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = LSTMModel(input_dimension, number_of_classes)

    selected = model.feature_proj if return_extractor else model
    return selected.to(target_device)

In [None]:
# Full LSTM classifier sized for the training features and the number of one-hot classes.
lstm_model = build_intrusion_detection_lstm(input_dimension, number_of_classes)

##### ========================  Transformer Neural Network  - (Transformer)  ========================

In [None]:
def build_intrusion_detection_transformer(input_dimension: int, number_of_classes: int = None, return_extractor: bool = False) -> nn.Module:
    """
    Builds a Transformer-encoder model for tabular intrusion detection and moves it to
    CUDA when available (CPU otherwise).

    Each input feature is treated as one token: it is embedded from a scalar to a
    128-dimensional vector, a learned positional encoding is added, the sequence goes
    through a 2-layer Transformer encoder, and the token outputs are mean-pooled into a
    128-dimensional feature vector. An optional dense classifier is applied on top.

    Arguments:
        input_dimension (int): Number of input features (tokens). Sizes the learned positional encoding.
        number_of_classes (int, optional): Number of output classes. If None, the model has no
                                           classifier and its forward pass returns features.
        return_extractor (bool, optional): If True, returns only the `embedding` nn.Linear layer
                                           (not the encoder stack).

    Returns:
        nn.Module: `embedding` if return_extractor=True, otherwise the full Transformer model.
    """

    class TransformerModel(nn.Module):
        def __init__(self, input_dimension, number_of_classes=None):
            super(TransformerModel, self).__init__()

            self.embedding_dim = 128
            self.num_heads = 4
            self.num_layers = 2
            self.dropout_rate = 0.3

            self.embedding = nn.Linear(1, self.embedding_dim)

            # Registered at construction time (not lazily in forward) so that it is part of
            # model.parameters() when the optimizer is created, follows model.to(device),
            # and is present in the state_dict of a freshly built model.
            self.positional_encoding = nn.Parameter(
                torch.randn(1, input_dimension, self.embedding_dim)
            )

            encoder_layer = nn.TransformerEncoderLayer(
                d_model=self.embedding_dim,
                nhead=self.num_heads,
                dim_feedforward=256,
                dropout=self.dropout_rate,
                activation='gelu',
                batch_first=True
            )
            self.transformer_encoder = nn.TransformerEncoder(
                encoder_layer,
                num_layers=self.num_layers
            )

            if number_of_classes is not None:
                self.classifier = nn.Sequential(
                    nn.Linear(self.embedding_dim, 256),
                    nn.BatchNorm1d(256),
                    nn.GELU(),
                    nn.Dropout(self.dropout_rate),

                    nn.Linear(256, 128),
                    nn.BatchNorm1d(128),
                    nn.GELU(),
                    nn.Dropout(self.dropout_rate),

                    nn.Linear(128, number_of_classes)
                )
            else:
                self.classifier = None

        def forward(self, x, return_features=False):
            # assumes x is (batch, features) — each feature becomes a 1-dimensional token.
            x = x.unsqueeze(-1)
            x = self.embedding(x)

            if self.positional_encoding.size(1) != x.size(1):
                # Backward-compatible fallback for inputs whose length differs from
                # `input_dimension`: re-create the encoding for the new length, as the
                # previous lazy implementation did. An optimizer built earlier does not
                # track the new parameter, hence the warning.
                warnings.warn(
                    f"Input has {x.size(1)} features but the model was built for "
                    f"{self.positional_encoding.size(1)}; the positional encoding is re-initialised "
                    "and will not be updated by an optimizer created before this call.",
                    RuntimeWarning,
                )
                self.positional_encoding = nn.Parameter(
                    torch.randn(1, x.size(1), self.embedding_dim, device=x.device)
                )

            x = x + self.positional_encoding
            x = self.transformer_encoder(x)
            # Mean over the token axis gives one feature vector per sample.
            x = x.mean(dim=1)

            if return_features or self.classifier is None:
                return x
            return self.classifier(x)

    model = TransformerModel(input_dimension, number_of_classes)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    if return_extractor:
        return model.embedding.to(device)
    else:
        return model.to(device)

In [None]:
# Full Transformer classifier sized for the training features and the number of one-hot classes.
transformer_model = build_intrusion_detection_transformer(input_dimension, number_of_classes)

##### =============================  Graph Neural Network  - (GNN)  =============================

In [None]:
def build_intrusion_detection_gnn(input_dimension: int, number_of_classes: int = None, return_extractor: bool = False) -> nn.Module:
    """
    Builds a GNN-inspired model for tabular intrusion detection using multihead attention,
    and moves it to CUDA when available (CPU otherwise).

    The input is projected to a 128-dimensional vector by two dense stages, passed through
    a 4-head self-attention layer as a sequence of length one, and optionally classified
    by a dense head.

    Arguments:
        input_dimension (int): Number of input features (sizes the first linear layer).
        number_of_classes (int, optional): Number of output classes. If None, the model has no
                                           classifier and its forward pass returns features.
        return_extractor (bool, optional): Accepted for signature parity with the other builders
                                           but currently ignored: the full model is always returned.
                                           Use `model(x, return_features=True)` to obtain features.

    Returns:
        nn.Module: The full model (feature extractor plus classifier when `number_of_classes` is given).
    """

    def dense_stage(in_features, out_features, dropout):
        # Linear -> BatchNorm1d -> GELU -> Dropout as a flat layer list.
        return [
            nn.Linear(in_features, out_features),
            nn.BatchNorm1d(out_features),
            nn.GELU(),
            nn.Dropout(dropout),
        ]

    class GNNModel(nn.Module):
        def __init__(self, input_dimension, number_of_classes=None):
            super().__init__()

            self.hidden_dim = 128
            self.output_dim = 128
            self.dropout_rate = 0.3

            # Flat Sequentials keep the sub-module indices (state_dict keys) unchanged.
            self.feature_proj = nn.Sequential(
                *dense_stage(input_dimension, self.hidden_dim, self.dropout_rate),
                *dense_stage(self.hidden_dim, self.output_dim, self.dropout_rate),
            )

            self.attention = nn.MultiheadAttention(
                embed_dim=self.output_dim,
                num_heads=4,
                batch_first=True,
            )

            if number_of_classes is None:
                self.classifier = None
            else:
                self.classifier = nn.Sequential(
                    *dense_stage(self.output_dim, 256, dropout=0.5),
                    *dense_stage(256, 128, dropout=0.3),
                    nn.Linear(128, number_of_classes),
                )

        def forward(self, x, return_features=False):
            # A 3-D input is first averaged over its middle axis, then handled like a 2-D input.
            if x.dim() == 3:
                x = x.mean(dim=1)
            if x.dim() == 2:
                # Project, then add a length-1 sequence axis for the attention layer.
                x = self.feature_proj(x).unsqueeze(1)

            attended, _ = self.attention(x, x, x)
            features = attended.squeeze(1)

            wants_features = return_features or self.classifier is None
            return features if wants_features else self.classifier(features)

    target_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = GNNModel(input_dimension, number_of_classes)

    return model.to(target_device)

In [None]:
# Full attention-based ("GNN") classifier sized for the training features and the number of one-hot classes.
gnn_model = build_intrusion_detection_gnn(input_dimension, number_of_classes)

#### ========================  Training Individual Models  ========================

In [None]:
def train_model(model, train_loader, val_loader, num_epochs=50, learning_rate=1e-3,
                weight_decay=1e-4, device=None, save_best_model_path=None, use_scheduler=True,):
    """
    Trains a PyTorch model and evaluates on the validation set at the end of each epoch.

    The weights of the epoch with the highest validation accuracy are snapshotted
    (and written to disk when a path is given) and restored before returning.

    Arguments:
        model (nn.Module): PyTorch neural network model.
        train_loader (DataLoader): Training data loader.
        val_loader (DataLoader): Validation data loader.
        num_epochs (int): Number of training epochs.
        learning_rate (float): Optimizer learning rate.
        weight_decay (float): L2 regularization term.
        device (torch.device or str): 'cuda' or 'cpu' device target. Auto-selected when None.
        save_best_model_path (str): Optional path to save the best-performing weights.
        use_scheduler (bool): Whether to apply learning rate decay on the plateau.

    Returns:
        model (nn.Module): The model loaded with its best validation accuracy weights.
    """

    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model.to(device)
    print(f"🚀 Training started on device: {device}")

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

    # The scheduler's `verbose` flag is deprecated in recent PyTorch releases, so
    # learning-rate changes are reported explicitly after each scheduler step instead.
    scheduler = (
        torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="max", patience=3)
        if use_scheduler else None
    )

    best_val_acc = 0.0
    best_model_state = None
    start_time = time.time()

    for epoch in range(num_epochs):
        model.train()
        epoch_train_loss = 0
        y_true_train, y_pred_train = [], []

        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)

            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()

            epoch_train_loss += loss.item()
            y_pred_train.extend(outputs.argmax(dim=1).detach().cpu().numpy())
            y_true_train.extend(y_batch.detach().cpu().numpy())

        train_acc = accuracy_score(y_true_train, y_pred_train)
        train_loss_avg = epoch_train_loss / len(train_loader)

        # ==============================  Validation  ============================== #

        model.eval()
        val_loss = 0
        y_true_val, y_pred_val = [], []

        with torch.no_grad():
            for X_val, y_val in val_loader:
                X_val, y_val = X_val.to(device), y_val.to(device)
                outputs = model(X_val)
                loss = criterion(outputs, y_val)
                val_loss += loss.item()
                y_pred_val.extend(outputs.argmax(dim=1).cpu().numpy())
                y_true_val.extend(y_val.cpu().numpy())

        val_acc = accuracy_score(y_true_val, y_pred_val)
        val_f1 = f1_score(y_true_val, y_pred_val, average="weighted")
        val_loss_avg = val_loss / len(val_loader)

        print(
            f"Epoch [{epoch + 1}/{num_epochs}] "
            f"Train Loss: {train_loss_avg:.4f} | Train Acc: {train_acc:.4f} "
            f"| Val Loss: {val_loss_avg:.4f} | Val Acc: {val_acc:.4f} | Val F1: {val_f1:.4f}"
        )

        # Scheduler step

        if scheduler is not None:
            lr_before = optimizer.param_groups[0]["lr"]
            scheduler.step(val_acc)
            lr_after = optimizer.param_groups[0]["lr"]
            if lr_after != lr_before:
                print(f"🔧 Learning rate reduced: {lr_before:.2e} -> {lr_after:.2e}")

        # Tracking (and optionally saving) the best model

        if best_model_state is None or val_acc > best_val_acc:
            best_val_acc = val_acc
            # state_dict() returns references to the live parameters, which later
            # optimizer steps overwrite in place; clone them to keep a real snapshot.
            best_model_state = {
                key: value.detach().clone() for key, value in model.state_dict().items()
            }
            if save_best_model_path:
                torch.save(best_model_state, save_best_model_path)
                print(f"✅ New best model saved at {save_best_model_path}")

    print(f"\n✅ Training complete in {time.time() - start_time:.2f} seconds.")
    print(f"🥇 Best Validation Accuracy: {best_val_acc:.4f}")

    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    return model

##### =======================  Training Convolutional Neural Network - (CNN)  =======================

In [None]:
# Train the CNN for 10 epochs; weights are checkpointed to
# "cnn_threats_detection_model.pt" whenever validation accuracy improves.
trained_cnn_model = train_model(
    model=cnn_model,
    train_loader=train_loader,
    val_loader=val_loader,
    num_epochs=10,
    learning_rate=1e-3,
    weight_decay=1e-5,
    save_best_model_path="cnn_threats_detection_model.pt",
)

##### =========================  Training Long Short-Term Memory - (LSTM)  =========================

In [None]:
# Train the LSTM for 10 epochs; weights are checkpointed to
# "lstm_threats_detection_model.pt" whenever validation accuracy improves.
trained_lstm_model = train_model(
    model=lstm_model,
    train_loader=train_loader,
    val_loader=val_loader,
    num_epochs=10,
    learning_rate=1e-3,
    weight_decay=1e-5,
    save_best_model_path="lstm_threats_detection_model.pt",
)

##### ======================  Training Transformer Neural Network - (Transformer)  ======================

In [None]:
# Train the Transformer for 10 epochs; weights are checkpointed to
# "transformer_threats_detection_model.pt" whenever validation accuracy improves.
trained_transformer_model = train_model(
    model=transformer_model,
    train_loader=train_loader,
    val_loader=val_loader,
    num_epochs=10,
    learning_rate=1e-3,
    weight_decay=1e-5,
    save_best_model_path="transformer_threats_detection_model.pt",
)

##### ==========================  Training Graph Neural Network - (GNN)  ==========================

In [None]:
# Train the GNN for 10 epochs; weights are checkpointed to
# "gnn_threats_detection_model.pt" whenever validation accuracy improves.
trained_gnn_model = train_model(
    model=gnn_model,
    train_loader=train_loader,
    val_loader=val_loader,
    num_epochs=10,
    learning_rate=1e-3,
    weight_decay=1e-5,
    save_best_model_path="gnn_threats_detection_model.pt",
)

#### =======================  Evaluating Individual Models  =======================

In [None]:
def evaluate_model(model, data_loader, model_name="Model", device=None, class_names=None, verbose=True):
    """
    Evaluates a trained PyTorch model and visualizes both raw and normalized confusion matrices.

    Arguments:
        model (nn.Module): Trained model to evaluate.
        data_loader (DataLoader): Evaluation DataLoader.
        model_name (str): Name of the model (e.g., 'CNN', 'LSTM') for labeling purposes.
        device (torch.device or str): Evaluation device. Auto-selected when None.
        class_names (list): Optional list of class names; entry i is used as the display
            name of encoded class i, and the reports then always cover every listed class.
        verbose (bool): If True, prints the classification report and plots the matrices.

    Returns:
        dict: Evaluation metrics including confusion matrices.

    Raises:
        ValueError: If `class_names` is given and a label or prediction falls outside
            the range 0..len(class_names) - 1.
    """

    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model.to(device)
    model.eval()
    all_preds, all_labels = [], []

    with torch.no_grad():
        for X_batch, y_batch in data_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            outputs = model(X_batch)
            preds = outputs.argmax(dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(y_batch.cpu().numpy())

    # Pin the label set to the provided class names so that a class missing from this
    # split cannot shift or shrink the report rows / matrix axes relative to the names.
    label_ids = None
    if class_names:
        label_ids = list(range(len(class_names)))
        observed = {int(value) for value in all_labels} | {int(value) for value in all_preds}
        unexpected = sorted(observed - set(label_ids))
        if unexpected:
            raise ValueError(
                f"Found class ids {unexpected} outside the range covered by "
                f"class_names (0..{len(class_names) - 1})."
            )

    # Computing the evaluation metrics

    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, average='weighted', zero_division=0)
    recall = recall_score(all_labels, all_preds, average='weighted', zero_division=0)
    f1 = f1_score(all_labels, all_preds, average='weighted', zero_division=0)

    if verbose:
        print(f"\n📊 Classification Report for {model_name} Model:\n")
        print(classification_report(all_labels, all_preds, labels=label_ids,
                                    target_names=class_names if class_names else None,
                                    zero_division=0))

    # Raw and normalized confusion matrices

    cm = confusion_matrix(all_labels, all_preds, labels=label_ids)
    # A class with no true samples has a zero row sum; leave that row at 0.0
    # instead of producing NaN through a division by zero.
    row_totals = cm.sum(axis=1, keepdims=True)
    cm_normalized = np.divide(cm, row_totals, out=np.zeros(cm.shape, dtype=float),
                              where=row_totals != 0)

    if verbose:
        fig, axs = plt.subplots(1, 2, figsize=(18, 6))

        sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
                    xticklabels=class_names if class_names else "auto",
                    yticklabels=class_names if class_names else "auto", ax=axs[0])
        axs[0].set_title(f"{model_name} - Raw Confusion Matrix")
        axs[0].set_xlabel("Predicted Label")
        axs[0].set_ylabel("True Label")

        sns.heatmap(cm_normalized, annot=True, fmt=".2f", cmap="YlGnBu",
                    xticklabels=class_names if class_names else "auto",
                    yticklabels=class_names if class_names else "auto", ax=axs[1])
        axs[1].set_title(f"{model_name} - Normalized Confusion Matrix")
        axs[1].set_xlabel("Predicted Label")
        axs[1].set_ylabel("True Label")

        plt.tight_layout()
        plt.show()

    return {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1_score": f1,
        "confusion_matrix": cm.tolist(),
        "normalized_confusion_matrix": cm_normalized.tolist(),
        "model_name": model_name
    }

In [None]:
# Display names for the reports and plots, taken from the values of `label_mapping`.
# NOTE(review): assumes the mapping's values are the class-name strings, ordered by
# encoded class id — confirm where `label_mapping` is built.
class_names = list(label_mapping.values())

##### =======================  Evaluating Convolutional Neural Network - (CNN)  =======================

In [None]:
# Score the trained CNN on the test loader and show its report and confusion matrices.
cnn_metrics = evaluate_model(
    model=trained_cnn_model,
    data_loader=test_loader,
    model_name="CNN",
    class_names=class_names,
    verbose=True,
)

##### ========================  Evaluating Long Short-Term Memory - (LSTM)  ========================

In [None]:
# Score the trained LSTM on the test loader and show its report and confusion matrices.
lstm_metrics = evaluate_model(
    model=trained_lstm_model,
    data_loader=test_loader,
    model_name="LSTM",
    class_names=class_names,
    verbose=True,
)

##### =====================  Evaluating Transformer Neural Network - (Transformer)  =====================

In [None]:
# Score the trained Transformer on the test loader and show its report and confusion matrices.
transformer_metrics = evaluate_model(
    model=trained_transformer_model,
    data_loader=test_loader,
    model_name="Transformer",
    class_names=class_names,
    verbose=True,
)

##### =========================  Evaluating Graph Neural Network - (GNN)  =========================

In [None]:
# Score the trained GNN on the test loader and show its report and confusion matrices.
gnn_metrics = evaluate_model(
    model=trained_gnn_model,
    data_loader=test_loader,
    model_name="GNN",
    class_names=class_names,
    verbose=True,
)

#### ======================== Saving the Models Metrics  ========================

In [None]:
# Test-set metric dictionaries of the four standalone models, keyed by model name.
best_standalone_metrics = {
    "CNN": cnn_metrics,
    "LSTM": lstm_metrics,
    "Transformer": transformer_metrics,
    "GNN": gnn_metrics
}

In [None]:
def export_best_standalone_metrics_to_csv(metrics_dict: dict, save_path: str):
    """
    Exports the headline metrics of each standalone model to timestamped CSV and Excel files.

    The CSV is written first so that it is still produced when no Excel writer
    engine is installed; in that case the Excel export is skipped with a notice.

    Arguments:
        metrics_dict (dict): Maps a model name to a metrics dictionary containing the
            keys 'accuracy', 'precision', 'recall' and 'f1_score'.
        save_path (str): Base output path; its extension is dropped and
            "_<YYYYmmdd_HHMMSS>.csv" / ".xlsx" are appended.
    """

    # Explicit columns keep the header row even when `metrics_dict` is empty.
    columns = ["Model", "Accuracy", "Precision", "Recall", "F1 Score"]
    rows = [{
        "Model": model_name,
        "Accuracy": metrics['accuracy'],
        "Precision": metrics['precision'],
        "Recall": metrics['recall'],
        "F1 Score": metrics['f1_score']
    } for model_name, metrics in metrics_dict.items()]
    dataframe = pd.DataFrame(rows, columns=columns)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    base_path = os.path.splitext(save_path)[0]
    excel_path = f"{base_path}_{timestamp}.xlsx"
    csv_path = f"{base_path}_{timestamp}.csv"

    dataframe.to_csv(csv_path, index=False)

    try:
        dataframe.to_excel(excel_path, index=False)
    except ImportError as error:
        # pandas needs an optional Excel engine; do not lose the CSV export over it.
        print(f"⚠️ Excel export skipped (no Excel writer engine available): {error}")
        print(f"📊 Best standalone model metrics exported to:\n- CSV: {csv_path}")
        return

    print(f"📊 Best standalone model metrics exported to:\n- Excel: {excel_path}\n- CSV: {csv_path}")

In [None]:
# Write the standalone metrics table; the exporter appends a timestamp to the base file name.
export_best_standalone_metrics_to_csv(best_standalone_metrics, "best_standalone_models_metrics.csv")

#### ======================  Building the Cascading Model  ======================

In [None]:
def build_intrusion_cascading_model(model_sequence: list, input_dimension: int, feature_dimension: int,
                                    number_of_classes: int, freeze_extractors: bool = True, verbose: bool = False) -> nn.Module:
    """
    Constructs a cascading model composed of four sequential deep learning models.

    The first three models are called with `return_features=True` and each must emit
    tensors whose dim 1 equals `feature_dimension`; the fourth model is called with
    `return_features=False` and its output is returned as the classification result.

    Arguments:
        model_sequence (list): Exactly four modules whose forward accepts `return_features`.
        input_dimension (int): Accepted for interface compatibility; not used here.
        feature_dimension (int): Required size of dim 1 of every intermediate output.
        number_of_classes (int): Accepted for interface compatibility; not used here.
        freeze_extractors (bool): If True, disables gradients for the first three models.
        verbose (bool): Stored on the model; not otherwise used here.

    Returns:
        nn.Module: A ready-to-train cascading model, moved to CUDA when available, otherwise CPU.

    Raises:
        ValueError: If `model_sequence` does not contain exactly four models.
    """

    class CascadingModel(nn.Module):
        def __init__(self, models, feature_dimension, freeze_extractors, verbose):
            super().__init__()
            # Explicit check instead of `assert`, which is stripped under `python -O`.
            if len(models) != 4:
                raise ValueError(f"Exactly four models must be provided, got {len(models)}.")

            # Attribute names are part of the state_dict keys of saved checkpoints.
            self.model1, self.model2, self.model3, self.model4 = models
            self.feature_dim = feature_dimension
            self.verbose = verbose

            if freeze_extractors:
                # NOTE(review): this only disables gradients; layers such as BatchNorm
                # still update their running statistics while in train mode.
                for model in [self.model1, self.model2, self.model3]:
                    for param in model.parameters():
                        param.requires_grad = False

        def forward(self, x):
            device = next(self.parameters()).device
            x = x.to(device)

            for stage_name, stage in (("Model 1", self.model1),
                                      ("Model 2", self.model2),
                                      ("Model 3", self.model3)):
                x = self._pass_model(stage, x, stage_name)

            # Final model directly performs classification
            return self.model4(x, return_features=False)

        def _pass_model(self, model, x, model_name="Model"):
            """Runs one feature-extraction stage and validates the width of its output."""
            if not hasattr(model, "forward"):
                raise AttributeError(f"{model_name} does not implement a forward method.")

            try:
                x = model(x, return_features=True)
            except TypeError as e:
                # Chain the original error so the real failure location stays visible.
                raise TypeError(f"{model_name} must implement 'forward(x, return_features=True)': {e}") from e

            # Enforcing feature dimension consistency
            if x.shape[1] != self.feature_dim:
                raise ValueError(
                    f"{model_name} output shape mismatch: expected ({x.shape[0]}, {self.feature_dim}), got {x.shape}"
                )
            return x

    model = CascadingModel(models=model_sequence, feature_dimension=feature_dimension,
                           freeze_extractors=freeze_extractors, verbose=verbose)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    return model.to(device)

==============================  Loading the feature extractor  ==============================

In [None]:
def extract_and_save_feature_extractors(input_dimension: int, number_of_classes: int, save_dir: str = "."):
    """
    Loads trained full models, extracts feature layers, saves the extractors,
    and returns them as a list.

    For each of CNN, LSTM, Transformer and GNN (in that order) the full model is rebuilt,
    its checkpoint "<name>_threats_detection_model.pt" is loaded from `save_dir`, and the
    matching weights are copied into a model built with `return_extractor=True`, which is
    then saved as "<name>_model_feature_extractor.pt".

    Arguments:
        input_dimension (int): Number of input features.
        number_of_classes (int): Number of output classes.
        save_dir (str): Directory containing trained models and where to save feature extractors.

    Returns:
        list[nn.Module]: Extracted feature extractors.
    """

    os.makedirs(save_dir, exist_ok=True)

    # Insertion order defines the order of the returned extractors.
    builders = {
        "CNN": build_intrusion_detection_cnn,
        "LSTM": build_intrusion_detection_lstm,
        "Transformer": build_intrusion_detection_transformer,
        "GNN": build_intrusion_detection_gnn,
    }

    extractors = []

    for model_name, builder in builders.items():
        prefix = model_name.lower()
        checkpoint_path = os.path.join(save_dir, f"{prefix}_threats_detection_model.pt")
        extractor_path = os.path.join(save_dir, f"{prefix}_model_feature_extractor.pt")
        print(f" Processing {model_name.upper()}...")

        # Loading trained full models. map_location lets a checkpoint written on a GPU
        # host load on a CPU-only one; load_state_dict copies values onto the model's device.
        full_trained_models = builder(input_dimension, number_of_classes)
        trained_weights = torch.load(checkpoint_path, map_location="cpu")
        # NOTE(review): strict=False silently ignores missing/unexpected keys, so a
        # mismatched checkpoint would go unnoticed here — confirm this is intended.
        full_trained_models.load_state_dict(trained_weights, strict=False)

        # Building and loading the feature extractors. strict=False is needed here
        # because the two models may not share every key.
        feature_extractor = builder(input_dimension, return_extractor=True)
        feature_extractor.load_state_dict(full_trained_models.state_dict(), strict=False)

        # Saving and appending

        torch.save(feature_extractor.state_dict(), extractor_path)
        extractors.append(feature_extractor)

        print(f"✅ Saved {model_name.upper()} feature extractor to {extractor_path}")

    return extractors

In [None]:
# Load the trained CNN, LSTM, Transformer and GNN checkpoints from the current directory,
# derive their feature extractors (returned in that order) and save them for the cascade.

feature_extractor_models = extract_and_save_feature_extractors(input_dimension, number_of_classes)

In [None]:
# Building the cascading model from the four pretrained feature extractors.
# freeze_extractors=False leaves every stage trainable.
# NOTE(review): the final stage is invoked with return_features=False; whether an
# extractor built with return_extractor=True still produces class logits depends on
# the builders — confirm.

cascading_model = build_intrusion_cascading_model(model_sequence=feature_extractor_models, input_dimension=input_dimension,
                                                  feature_dimension=128, number_of_classes=number_of_classes,
                                                  freeze_extractors=False, verbose=True)

In [None]:
class FinalClassifierWrapper(nn.Module):
    """
    Pairs a feature extractor with a freshly initialised classification head.

    The wrapped model is always queried with `return_features=True`; its output is
    validated against the expected width and then either returned as-is or passed
    through the head.
    """

    def __init__(self, feature_model: nn.Module, feature_dimension: int, number_of_classes: int):
        """
        Arguments:
            feature_model (nn.Module): The feature extractor module (must support return_features=True).
            feature_dimension (int): The expected dimension of extracted features.
            number_of_classes (int): The number of target output classes.
        """
        super().__init__()
        self.feature_model = feature_model
        self.feature_dim = feature_dimension

        hidden_width = 256
        head_layers = [
            nn.BatchNorm1d(feature_dimension),
            nn.GELU(),
            nn.Linear(feature_dimension, hidden_width),
            nn.BatchNorm1d(hidden_width),
            nn.GELU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_width, number_of_classes),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, x, return_features: bool = False):
        """Returns the extracted features, or the class logits computed from them."""
        features = self.feature_model(x, return_features=True)

        # Fail early when the extractor does not emit the agreed feature width.
        width_matches = features.shape[1] == self.feature_dim
        if not width_matches:
            raise ValueError(f"Expected feature dim {self.feature_dim}, but got {features.shape}")

        return features if return_features else self.classifier(features)

============================  Generating the cascading sequences  ============================

In [None]:
def generate_model_sequence(perm, model_builders, input_dimension, feature_dimension, number_of_classes):
    """
    Builds the ordered list of stage models for one cascading configuration.

    The first stage is built for `input_dimension`; every later stage is built for
    `feature_dimension`. Each builder receives `feature_dimension` as its second
    positional argument, and the last stage is wrapped in `FinalClassifierWrapper`.

    Arguments:
        perm (tuple): Ordered names of model types.
        model_builders (dict): Dictionary mapping model names to builder functions.
        input_dimension (int): Original input feature size.
        feature_dimension (int): Unified feature size for cascading.
        number_of_classes (int): Number of output classes.

    Returns:
        list[nn.Module]: The stage models, with the last wrapped in FinalClassifierWrapper.

    Raises:
        ValueError: If a name in `perm` has no entry in `model_builders`.
    """
    cascade = []

    for position, stage_name in enumerate(perm):
        if stage_name not in model_builders:
            raise ValueError(f"Model '{stage_name}' not found in model_builders.")

        build_stage = model_builders[stage_name]
        # Only the leading stage sees the raw input width.
        stage_input_dimension = feature_dimension if position else input_dimension
        cascade.append(build_stage(stage_input_dimension, feature_dimension))

    # Wrapping the last model in a classification head
    cascade[-1] = FinalClassifierWrapper(cascade[-1], feature_dimension, number_of_classes)

    return cascade

#### =======================  Training the Cascading Models  ======================

In [None]:
def train_cascading_model(model, train_loader, val_loader, num_epochs=50, learning_rate=1e-3,
                weight_decay=1e-4, device=None, save_best_model_path=None, use_scheduler=True,):
    """
    Trains the model and evaluates on the validation set at the end of each epoch.

    The weights of the epoch with the highest validation accuracy are snapshotted
    (and written to disk when a path is given) and restored before returning.

    Arguments:
        model (nn.Module): PyTorch neural network model.
        train_loader (DataLoader): Training data loader.
        val_loader (DataLoader): Validation data loader.
        num_epochs (int): Number of training epochs.
        learning_rate (float): Optimizer learning rate.
        weight_decay (float): L2 regularization term.
        device (torch.device or str): 'cuda' or 'cpu' device target. Auto-selected when None.
        save_best_model_path (str): Optional path to save the best-performing weights.
        use_scheduler (bool): Whether to apply learning rate decay on the plateau.

    Returns:
        model (nn.Module): The model loaded with its best validation accuracy weights.
    """

    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model.to(device)
    print(f"🚀 Training started on device: {device}")

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

    # The scheduler's `verbose` flag is deprecated in recent PyTorch releases, so
    # learning-rate changes are reported explicitly after each scheduler step instead.
    scheduler = (
        torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="max", patience=3)
        if use_scheduler else None
    )

    best_val_acc = 0.0
    best_model_state = None
    start_time = time.time()

    for epoch in range(num_epochs):
        model.train()
        epoch_train_loss = 0
        y_true_train, y_pred_train = [], []

        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)

            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()

            epoch_train_loss += loss.item()
            y_pred_train.extend(outputs.argmax(dim=1).detach().cpu().numpy())
            y_true_train.extend(y_batch.detach().cpu().numpy())

        train_acc = accuracy_score(y_true_train, y_pred_train)
        train_loss_avg = epoch_train_loss / len(train_loader)

        # ==============================  Validation  ============================== #

        model.eval()
        val_loss = 0
        y_true_val, y_pred_val = [], []

        with torch.no_grad():
            for X_val, y_val in val_loader:
                X_val, y_val = X_val.to(device), y_val.to(device)
                outputs = model(X_val)
                loss = criterion(outputs, y_val)
                val_loss += loss.item()
                y_pred_val.extend(outputs.argmax(dim=1).cpu().numpy())
                y_true_val.extend(y_val.cpu().numpy())

        val_acc = accuracy_score(y_true_val, y_pred_val)
        val_f1 = f1_score(y_true_val, y_pred_val, average="weighted")
        val_loss_avg = val_loss / len(val_loader)

        print(
            f"Epoch [{epoch + 1}/{num_epochs}] "
            f"Train Loss: {train_loss_avg:.4f} | Train Acc: {train_acc:.4f} "
            f"| Val Loss: {val_loss_avg:.4f} | Val Acc: {val_acc:.4f} | Val F1: {val_f1:.4f}"
        )

        # Scheduler step

        if scheduler is not None:
            lr_before = optimizer.param_groups[0]["lr"]
            scheduler.step(val_acc)
            lr_after = optimizer.param_groups[0]["lr"]
            if lr_after != lr_before:
                print(f"🔧 Learning rate reduced: {lr_before:.2e} -> {lr_after:.2e}")

        # Tracking (and optionally saving) the best model. The first epoch always
        # produces a snapshot, so a checkpoint file exists whenever a path is given.

        if best_model_state is None or val_acc > best_val_acc:
            best_val_acc = val_acc
            # state_dict() returns references to the live parameters, which later
            # optimizer steps overwrite in place; clone them to keep a real snapshot.
            best_model_state = {
                key: value.detach().clone() for key, value in model.state_dict().items()
            }
            if save_best_model_path:
                torch.save(best_model_state, save_best_model_path)
                print(f"✅ New best model saved at {save_best_model_path}")

    print(f"\n✅ Training complete in {time.time() - start_time:.2f} seconds.")
    print(f"🥇 Best Validation Accuracy: {best_val_acc:.4f}")

    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    return model

#### ======================  Evaluating the Cascading Models  =====================

In [None]:
def evaluate_cascading_model(model, data_loader, model_name="Model", device=None, class_names=None, verbose=True):
    """
    Evaluates the trained model and visualizes both raw and normalized confusion matrices.

    Arguments:
        model (nn.Module): Trained model to evaluate.
        data_loader (DataLoader): Evaluation DataLoader.
        model_name (str): Name of the model (e.g., 'CNN', 'LSTM') for labeling purposes.
        device (torch.device or str): Evaluation device. Auto-selected when None.
        class_names (list): Optional list of class names; entry i is used as the display
            name of encoded class i, and the reports then always cover every listed class.
        verbose (bool): If True, prints the classification report and plots the matrices.

    Returns:
        dict: Evaluation metrics including confusion matrices.

    Raises:
        ValueError: If `class_names` is given and a label or prediction falls outside
            the range 0..len(class_names) - 1.
    """

    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model.to(device)
    model.eval()
    all_preds, all_labels = [], []

    with torch.no_grad():
        for X_batch, y_batch in data_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            outputs = model(X_batch)
            preds = outputs.argmax(dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(y_batch.cpu().numpy())

    # Pin the label set to the provided class names so that a class missing from this
    # split cannot shift or shrink the report rows / matrix axes relative to the names.
    label_ids = None
    if class_names:
        label_ids = list(range(len(class_names)))
        observed = {int(value) for value in all_labels} | {int(value) for value in all_preds}
        unexpected = sorted(observed - set(label_ids))
        if unexpected:
            raise ValueError(
                f"Found class ids {unexpected} outside the range covered by "
                f"class_names (0..{len(class_names) - 1})."
            )

    # Computing the evaluation metrics

    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, average='weighted', zero_division=0)
    recall = recall_score(all_labels, all_preds, average='weighted', zero_division=0)
    f1 = f1_score(all_labels, all_preds, average='weighted', zero_division=0)

    if verbose:
        print(f"\n📊 Classification Report for {model_name} Model:\n")
        print(classification_report(all_labels, all_preds, labels=label_ids,
                                    target_names=class_names if class_names else None,
                                    zero_division=0))

    # Raw and normalized confusion matrices

    cm = confusion_matrix(all_labels, all_preds, labels=label_ids)
    # A class with no true samples has a zero row sum; leave that row at 0.0
    # instead of producing NaN (which would also end up in the exported JSON).
    row_totals = cm.sum(axis=1, keepdims=True)
    cm_normalized = np.divide(cm, row_totals, out=np.zeros(cm.shape, dtype=float),
                              where=row_totals != 0)

    if verbose:
        fig, axs = plt.subplots(1, 2, figsize=(18, 6))

        sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
                    xticklabels=class_names if class_names else "auto",
                    yticklabels=class_names if class_names else "auto", ax=axs[0])
        axs[0].set_title(f"{model_name} - Raw Confusion Matrix")
        axs[0].set_xlabel("Predicted Label")
        axs[0].set_ylabel("True Label")

        sns.heatmap(cm_normalized, annot=True, fmt=".2f", cmap="YlGnBu",
                    xticklabels=class_names if class_names else "auto",
                    yticklabels=class_names if class_names else "auto", ax=axs[1])
        axs[1].set_title(f"{model_name} - Normalized Confusion Matrix")
        axs[1].set_xlabel("Predicted Label")
        axs[1].set_ylabel("True Label")

        plt.tight_layout()
        plt.show()

    return {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1_score": f1,
        "confusion_matrix": cm.tolist(),
        "normalized_confusion_matrix": cm_normalized.tolist(),
        "model_name": model_name
    }

#### ====================  Saving the Cascading Model Summary  ====================

In [None]:
# Registry mapping each architecture name to its builder function; the dictionary's
# key order determines the order in which permutations are enumerated in the benchmark.
model_builders = {
    "CNN": build_intrusion_detection_cnn,
    "LSTM": build_intrusion_detection_lstm,
    "Transformer": build_intrusion_detection_transformer,
    "GNN": build_intrusion_detection_gnn
}

In [None]:
def plot_confusion_matrix(model, dataloader, device=None, class_names=None, normalize=True, title="Confusion Matrix", cmap="Blues"):
    """
    Plots the confusion matrix of a trained model on a given dataset.

    Arguments:
        model (nn.Module): Trained model to evaluate.
        dataloader (DataLoader): DataLoader with samples to evaluate.
        device (torch.device, optional): Computation device. If None, auto-selects.
        class_names (list, optional): List of class labels to show on axes; entry i is
            used for encoded class i. When omitted, the observed class ids are shown.
        normalize (bool): If True, normalize values by row (per class).
        title (str): Title of the plot.
        cmap (str): Matplotlib colormap to use.

    Raises:
        ValueError: If `class_names` is given and a label or prediction falls outside
            the range 0..len(class_names) - 1.
    """
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model.to(device)
    model.eval()
    all_preds, all_labels = [], []

    with torch.no_grad():
        for X_batch, y_batch in dataloader:
            X_batch = X_batch.to(device)
            outputs = model(X_batch)
            preds = outputs.argmax(dim=1)
            all_preds.extend(preds.cpu().tolist())
            # .cpu() keeps this working when the loader yields labels on another device.
            all_labels.extend(y_batch.cpu().tolist())

    # Keep the matrix axes aligned with the supplied names even when a class is
    # absent from this dataset; otherwise fall back to the observed class ids.
    label_ids = None
    if class_names is None:
        class_names = [str(c) for c in sorted(set(all_labels) | set(all_preds))]
    else:
        label_ids = list(range(len(class_names)))
        unexpected = sorted((set(all_labels) | set(all_preds)) - set(label_ids))
        if unexpected:
            raise ValueError(
                f"Found class ids {unexpected} outside the range covered by "
                f"class_names (0..{len(class_names) - 1})."
            )

    # Generating the confusion matrix

    cm = confusion_matrix(all_labels, all_preds, labels=label_ids)
    if normalize:
        # Rows without any true samples stay at 0.0 instead of becoming NaN.
        row_totals = cm.sum(axis=1, keepdims=True)
        cm = np.divide(cm, row_totals, out=np.zeros(cm.shape, dtype=float),
                       where=row_totals != 0)

    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
    disp.plot(cmap=cmap, values_format=".2f" if normalize else "d")
    plt.title(title)
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

==============================  Cascading Models Benchmark  =========================

In [None]:
def benchmark_all_permutations_model(model_builders: dict, input_dimension: int, feature_dimension: int, number_of_classes: int,
                                     train_loader, val_loader, test_loader, num_epochs: int = 50, 
                                     save_dir: str = "./permutation_models", verbose: bool = False):
    """
    Benchmarks every ordering of the models in `model_builders` in a cascading architecture
    (24 permutations for the four CNN, LSTM, Transformer, GNN builders).

    Trains, evaluates, and records test metrics for each permutation, writes them to
    "<save_dir>/benchmark_results.json", then plots the confusion matrix of the best
    model and the F1 score distribution grouped by the leading model.

    Arguments:
        model_builders (dict): Maps model names to builder functions. The cascading model
            requires exactly four stages, so four builders are expected.
        input_dimension (int): Input feature size for the first stage.
        feature_dimension (int): Unified feature size passed between stages.
        number_of_classes (int): Number of output classes.
        train_loader, val_loader, test_loader (DataLoader): Data splits.
        num_epochs (int): Training epochs per permutation.
        save_dir (str): Directory for per-permutation checkpoints and the JSON summary.
        verbose (bool): Forwarded to the cascading model builder.

    Returns:
        tuple: (best_model_info, results) where each entry holds the permutation,
        its metrics dictionary and its checkpoint path.
    """

    os.makedirs(save_dir, exist_ok=True)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model_names = list(model_builders.keys())
    if not model_names:
        raise ValueError("model_builders must contain at least one model builder.")

    # Materialise the orderings so progress messages report the real total
    # instead of a hard-coded 24.
    orderings = list(permutations(model_names))
    total = len(orderings)
    class_labels = [str(index) for index in range(number_of_classes)]

    best_f1 = -1.0
    best_model_info = {}
    results = []
    grouped_results = {name: [] for name in model_names}

    print(f"\n📊 Starting {total}-model permutation benchmark...\n")

    for i, models in enumerate(orderings, 1):
        print(f"\n🔁 Permutation {i}/{total}: {models}")

        # Generating the model sequence
        model_sequence = generate_model_sequence(perm=models, model_builders=model_builders, input_dimension=input_dimension,
                                                 feature_dimension=feature_dimension, number_of_classes=number_of_classes)

        # Building the full cascading model
        cascading_model = build_intrusion_cascading_model(model_sequence=model_sequence, input_dimension=input_dimension,
                                                          feature_dimension=feature_dimension, number_of_classes=number_of_classes, freeze_extractors=False, verbose=verbose)

        model_name = "_".join(models)
        model_path = os.path.join(save_dir, f"{model_name}.pt")

        # Training the model
        trained_cascading_model = train_cascading_model(model=cascading_model, train_loader=train_loader, val_loader=val_loader,
                                    num_epochs=num_epochs, learning_rate=1e-3, weight_decay=1e-5, 
                                    save_best_model_path=model_path, use_scheduler=True, device=device)

        # Loading best weights from the checkpoint written during training
        trained_cascading_model.load_state_dict(torch.load(model_path, map_location=device))

        # Evaluating the model
        metrics = evaluate_cascading_model(model=trained_cascading_model, data_loader=test_loader, model_name=model_name, 
                                 device=device, class_names=class_labels, verbose=False)

        print(f"✅ Permutation {i} completed. Test F1: {metrics['f1_score']:.4f}")

        results.append({
            "permutation": models,
            "metrics": metrics,
            "model_path": model_path
        })

        grouped_results[models[0]].append(metrics['f1_score'])

        # NOTE(review): the winner is selected on test-set F1, so the reported best
        # score is optimistically biased; consider selecting on validation data.
        if metrics['f1_score'] > best_f1:
            best_f1 = metrics['f1_score']
            best_model_info = {
                "permutation": models,
                "metrics": metrics,
                "model_path": model_path
            }

    # Saving all results
    summary_path = os.path.join(save_dir, "benchmark_results.json")
    with open(summary_path, "w") as f:
        json.dump(results, f, indent=4)

    print(f"\n🏆 Best permutation: {best_model_info['permutation']} | F1-score: {best_model_info['metrics']['f1_score']:.4f}")
    print(f"📁 Saved best model at: {best_model_info['model_path']}")
    print(f"🗂 Benchmark results saved to: {summary_path}")

    # Rebuilding and reloading the best model
    best_model_sequence = generate_model_sequence(perm=best_model_info['permutation'], model_builders=model_builders,
                                                  input_dimension=input_dimension, feature_dimension=feature_dimension, number_of_classes=number_of_classes)

    best_model = build_intrusion_cascading_model(model_sequence=best_model_sequence, input_dimension=input_dimension, 
                                                 feature_dimension=feature_dimension, number_of_classes=number_of_classes, freeze_extractors=False, verbose=False)

    best_model.load_state_dict(torch.load(best_model_info['model_path'], map_location=device))

    # Plotting the confusion matrix for the best model
    plot_confusion_matrix(model=best_model, dataloader=test_loader, device=device,
                          class_names=class_labels, title="Best Cascading Model Confusion Matrix")

    # Visualizing F1 distribution by leading model
    leading_models = list(grouped_results.keys())
    plt.figure(figsize=(10, 6))
    sns.boxplot(data=[grouped_results[k] for k in leading_models],
                palette="coolwarm")
    plt.xticks(ticks=range(len(leading_models)), labels=leading_models)
    plt.title("📈 F1 Score Distribution by Leading Model")
    plt.ylabel("F1 Score")
    plt.xlabel("Leading Model")
    plt.tight_layout()
    plt.show()

    return best_model_info, results

In [None]:
# Use the notebook's input width and class count (as every other cell does) instead of
# hard-coded 128 / 5, so the benchmark stays consistent with the data loaders.
best_model_information, full_results = benchmark_all_permutations_model(
    model_builders=model_builders,
    input_dimension=input_dimension,
    feature_dimension=128,
    number_of_classes=number_of_classes,
    train_loader=train_loader,
    val_loader=val_loader,
    test_loader=test_loader,
    num_epochs=10,
    save_dir="./permutation_models",
    verbose=True,
)

========================  Saving the Cascading Permutations Models Metrics  ========================

In [None]:
def export_cascading_metrics_to_csv(results: list, save_path: str):
    """
    Writes the per-permutation benchmark metrics to timestamped Excel and CSV files.

    Arguments:
        results (list): Entries holding 'permutation' (sequence of model names),
            'metrics' (dict with 'accuracy', 'precision', 'recall', 'f1_score')
            and 'model_path'.
        save_path (str): Target path; its extension is dropped and replaced by
            "_<YYYYmmdd_HHMMSS>.xlsx" and "_<YYYYmmdd_HHMMSS>.csv".
    """
    rows = []
    for entry in results:
        label = " > ".join(entry['permutation'])
        scores = entry['metrics']
        rows.append({
            "Permutation": label,
            "Accuracy": scores['accuracy'],
            "Precision": scores['precision'],
            "Recall": scores['recall'],
            "F1 Score": scores['f1_score'],
            "Model Path": entry['model_path'],
        })
    table = pd.DataFrame(rows)

    # Both outputs share one timestamp so the pair can be matched up later
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    stem, _ = os.path.splitext(save_path)
    excel_path = f"{stem}_{stamp}.xlsx"
    csv_path = f"{stem}_{stamp}.csv"

    table.to_excel(excel_path, index=False)
    table.to_csv(csv_path, index=False)

    print(f"📊 Cascading benchmark results exported to:\n- Excel: {excel_path}\n- CSV: {csv_path}")

In [None]:
# Persist the benchmark results returned by benchmark_all_permutations_model
export_cascading_metrics_to_csv(results=full_results,
                                save_path="All_cascading_permutations_metrics_results.csv")

========================  Building the Cascading Models Manually  ========================

In [None]:
# Use CUDA when it is available, otherwise run on the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
# Cascade order: CNN -> LSTM -> Transformer -> GNN (freeze_extractors=False).
cltg_model = build_intrusion_cascading_model(
    model_sequence=generate_model_sequence(
        perm=("CNN", "LSTM", "Transformer", "GNN"), model_builders=model_builders,
        input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes),
    input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes,
    freeze_extractors=False, verbose=False)

In [None]:
 # Training the cascading model

# Train the CLTG cascade built in the previous cell. The model passed here must be
# cltg_model: tlgc_model belongs to the Transformer -> LSTM -> GNN -> CNN run and is
# only created much further down the notebook.
trained_cltg_model = train_cascading_model(model=cltg_model, train_loader=train_loader, val_loader=val_loader,
                                    num_epochs=10, learning_rate=1e-3, weight_decay=1e-5,
                                    save_best_model_path="./permutations/CNN_LSTM_Transformer_GNN.pt", use_scheduler=True, device=device)

In [None]:
# Evaluating the cascading model

# CLTG metrics on test_loader; collected later into cascading_metrics.
cltg_metrics = evaluate_model(
    model=trained_cltg_model, data_loader=test_loader, model_name="CLTG",
    class_names=class_names, verbose=True)

In [None]:
# Cascade order: CNN -> LSTM -> GNN -> Transformer (freeze_extractors=False).
clgt_model = build_intrusion_cascading_model(
    model_sequence=generate_model_sequence(
        perm=("CNN", "LSTM", "GNN", "Transformer"), model_builders=model_builders,
        input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes),
    input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes,
    freeze_extractors=False, verbose=False)

In [None]:
 # Training the cascading model

# CLGT: same epochs, learning rate, weight decay and scheduler flag as every other manual run.
trained_clgt_model = train_cascading_model(
    model=clgt_model, train_loader=train_loader, val_loader=val_loader, device=device,
    num_epochs=10, learning_rate=1e-3, weight_decay=1e-5, use_scheduler=True,
    save_best_model_path="./permutations/CNN_LSTM_GNN_Transformer.pt")

In [None]:
# Evaluating the cascading model

# CLGT metrics on test_loader; collected later into cascading_metrics.
clgt_metrics = evaluate_model(
    model=trained_clgt_model, data_loader=test_loader, model_name="CLGT",
    class_names=class_names, verbose=True)

In [None]:
# Cascade order: CNN -> Transformer -> LSTM -> GNN (freeze_extractors=False).
ctlg_model = build_intrusion_cascading_model(
    model_sequence=generate_model_sequence(
        perm=("CNN", "Transformer", "LSTM", "GNN"), model_builders=model_builders,
        input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes),
    input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes,
    freeze_extractors=False, verbose=False)

In [None]:
 # Training the cascading model

# CTLG: same epochs, learning rate, weight decay and scheduler flag as every other manual run.
trained_ctlg_model = train_cascading_model(
    model=ctlg_model, train_loader=train_loader, val_loader=val_loader, device=device,
    num_epochs=10, learning_rate=1e-3, weight_decay=1e-5, use_scheduler=True,
    save_best_model_path="./permutations/CNN_Transformer_LSTM_GNN.pt")

In [None]:
# Evaluating the cascading model

# CTLG metrics on test_loader; collected later into cascading_metrics.
ctlg_metrics = evaluate_model(
    model=trained_ctlg_model, data_loader=test_loader, model_name="CTLG",
    class_names=class_names, verbose=True)

In [None]:
# Cascade order: CNN -> Transformer -> GNN -> LSTM (freeze_extractors=False).
ctgl_model = build_intrusion_cascading_model(
    model_sequence=generate_model_sequence(
        perm=("CNN", "Transformer", "GNN", "LSTM"), model_builders=model_builders,
        input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes),
    input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes,
    freeze_extractors=False, verbose=False)

In [None]:
# Training the cascading model

# CTGL: same epochs, learning rate, weight decay and scheduler flag as every other manual run.
trained_ctgl_model = train_cascading_model(
    model=ctgl_model, train_loader=train_loader, val_loader=val_loader, device=device,
    num_epochs=10, learning_rate=1e-3, weight_decay=1e-5, use_scheduler=True,
    save_best_model_path="./permutations/CNN_Transformer_GNN_LSTM.pt")

In [None]:
# Evaluating the cascading model

# CTGL metrics on test_loader; collected later into cascading_metrics.
ctgl_metrics = evaluate_model(
    model=trained_ctgl_model, data_loader=test_loader, model_name="CTGL",
    class_names=class_names, verbose=True)

In [None]:
# Cascade order: CNN -> GNN -> LSTM -> Transformer (freeze_extractors=False).
cglt_model = build_intrusion_cascading_model(
    model_sequence=generate_model_sequence(
        perm=("CNN", "GNN", "LSTM", "Transformer"), model_builders=model_builders,
        input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes),
    input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes,
    freeze_extractors=False, verbose=False)

In [None]:
# Training the cascading model

# CGLT: same epochs, learning rate, weight decay and scheduler flag as every other manual run.
trained_cglt_model = train_cascading_model(
    model=cglt_model, train_loader=train_loader, val_loader=val_loader, device=device,
    num_epochs=10, learning_rate=1e-3, weight_decay=1e-5, use_scheduler=True,
    save_best_model_path="./permutations/CNN_GNN_LSTM_Transformer.pt")

In [None]:
# Evaluating the cascading model

# CGLT metrics on test_loader; collected later into cascading_metrics.
cglt_metrics = evaluate_model(
    model=trained_cglt_model, data_loader=test_loader, model_name="CGLT",
    class_names=class_names, verbose=True)

In [None]:
# Cascade order: CNN -> GNN -> Transformer -> LSTM (freeze_extractors=False).
cgtl_model = build_intrusion_cascading_model(
    model_sequence=generate_model_sequence(
        perm=("CNN", "GNN", "Transformer", "LSTM"), model_builders=model_builders,
        input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes),
    input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes,
    freeze_extractors=False, verbose=False)

In [None]:
# Training the cascading model

# CGTL: same epochs, learning rate, weight decay and scheduler flag as every other manual run.
trained_cgtl_model = train_cascading_model(
    model=cgtl_model, train_loader=train_loader, val_loader=val_loader, device=device,
    num_epochs=10, learning_rate=1e-3, weight_decay=1e-5, use_scheduler=True,
    save_best_model_path="./permutations/CNN_GNN_Transformer_LSTM.pt")

In [None]:
# Evaluating the cascading model

# CGTL metrics on test_loader; collected later into cascading_metrics.
cgtl_metrics = evaluate_model(
    model=trained_cgtl_model, data_loader=test_loader, model_name="CGTL",
    class_names=class_names, verbose=True)

In [None]:
# Cascade order: LSTM -> CNN -> Transformer -> GNN (freeze_extractors=False).
lctg_model = build_intrusion_cascading_model(
    model_sequence=generate_model_sequence(
        perm=("LSTM", "CNN", "Transformer", "GNN"), model_builders=model_builders,
        input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes),
    input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes,
    freeze_extractors=False, verbose=False)

In [None]:
# Training the cascading model

# LCTG: same epochs, learning rate, weight decay and scheduler flag as every other manual run.
trained_lctg_model = train_cascading_model(
    model=lctg_model, train_loader=train_loader, val_loader=val_loader, device=device,
    num_epochs=10, learning_rate=1e-3, weight_decay=1e-5, use_scheduler=True,
    save_best_model_path="./permutations/LSTM_CNN_Transformer_GNN.pt")

In [None]:
# Evaluating the cascading model

# LCTG metrics on test_loader; collected later into cascading_metrics.
lctg_metrics = evaluate_model(
    model=trained_lctg_model, data_loader=test_loader, model_name="LCTG",
    class_names=class_names, verbose=True)

In [None]:
# Cascade order: LSTM -> CNN -> GNN -> Transformer (freeze_extractors=False).
lcgt_model = build_intrusion_cascading_model(
    model_sequence=generate_model_sequence(
        perm=("LSTM", "CNN", "GNN", "Transformer"), model_builders=model_builders,
        input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes),
    input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes,
    freeze_extractors=False, verbose=False)

In [None]:
# Training the cascading model

# LCGT: same epochs, learning rate, weight decay and scheduler flag as every other manual run.
trained_lcgt_model = train_cascading_model(
    model=lcgt_model, train_loader=train_loader, val_loader=val_loader, device=device,
    num_epochs=10, learning_rate=1e-3, weight_decay=1e-5, use_scheduler=True,
    save_best_model_path="./permutations/LSTM_CNN_GNN_Transformer.pt")

In [None]:
# Evaluating the cascading model

# LCGT metrics on test_loader; collected later into cascading_metrics.
lcgt_metrics = evaluate_model(
    model=trained_lcgt_model, data_loader=test_loader, model_name="LCGT",
    class_names=class_names, verbose=True)

In [None]:
# Cascade order: LSTM -> Transformer -> CNN -> GNN (freeze_extractors=False).
ltcg_model = build_intrusion_cascading_model(
    model_sequence=generate_model_sequence(
        perm=("LSTM", "Transformer", "CNN", "GNN"), model_builders=model_builders,
        input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes),
    input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes,
    freeze_extractors=False, verbose=False)

In [None]:
# Training the cascading model

# LTCG: same epochs, learning rate, weight decay and scheduler flag as every other manual run.
trained_ltcg_model = train_cascading_model(
    model=ltcg_model, train_loader=train_loader, val_loader=val_loader, device=device,
    num_epochs=10, learning_rate=1e-3, weight_decay=1e-5, use_scheduler=True,
    save_best_model_path="./permutations/LSTM_Transformer_CNN_GNN.pt")

In [None]:
# Evaluating the cascading model

# LTCG metrics on test_loader; collected later into cascading_metrics.
ltcg_metrics = evaluate_model(
    model=trained_ltcg_model, data_loader=test_loader, model_name="LTCG",
    class_names=class_names, verbose=True)

In [None]:
# Cascade order: LSTM -> Transformer -> GNN -> CNN (freeze_extractors=False).
ltgc_model = build_intrusion_cascading_model(
    model_sequence=generate_model_sequence(
        perm=("LSTM", "Transformer", "GNN", "CNN"), model_builders=model_builders,
        input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes),
    input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes,
    freeze_extractors=False, verbose=False)

In [None]:
# Training the cascading model

# LTGC: same epochs, learning rate, weight decay and scheduler flag as every other manual run.
trained_ltgc_model = train_cascading_model(
    model=ltgc_model, train_loader=train_loader, val_loader=val_loader, device=device,
    num_epochs=10, learning_rate=1e-3, weight_decay=1e-5, use_scheduler=True,
    save_best_model_path="./permutations/LSTM_Transformer_GNN_CNN.pt")

In [None]:
# Evaluating the cascading model

# LTGC metrics on test_loader; collected later into cascading_metrics.
ltgc_metrics = evaluate_model(
    model=trained_ltgc_model, data_loader=test_loader, model_name="LTGC",
    class_names=class_names, verbose=True)

In [None]:
# Cascade order: LSTM -> GNN -> CNN -> Transformer (freeze_extractors=False).
lgct_model = build_intrusion_cascading_model(
    model_sequence=generate_model_sequence(
        perm=("LSTM", "GNN", "CNN", "Transformer"), model_builders=model_builders,
        input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes),
    input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes,
    freeze_extractors=False, verbose=False)

In [None]:
# Training the cascading model

# LGCT: same epochs, learning rate, weight decay and scheduler flag as every other manual run.
trained_lgct_model = train_cascading_model(
    model=lgct_model, train_loader=train_loader, val_loader=val_loader, device=device,
    num_epochs=10, learning_rate=1e-3, weight_decay=1e-5, use_scheduler=True,
    save_best_model_path="./permutations/LSTM_GNN_CNN_Transformer.pt")

In [None]:
# Evaluating the cascading model

# LGCT metrics on test_loader; collected later into cascading_metrics.
lgct_metrics = evaluate_model(
    model=trained_lgct_model, data_loader=test_loader, model_name="LGCT",
    class_names=class_names, verbose=True)

In [None]:
# Cascade order: LSTM -> GNN -> Transformer -> CNN (freeze_extractors=False).
lgtc_model = build_intrusion_cascading_model(
    model_sequence=generate_model_sequence(
        perm=("LSTM", "GNN", "Transformer", "CNN"), model_builders=model_builders,
        input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes),
    input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes,
    freeze_extractors=False, verbose=False)

In [None]:
# Training the cascading model

# LGTC: same epochs, learning rate, weight decay and scheduler flag as every other manual run.
trained_lgtc_model = train_cascading_model(
    model=lgtc_model, train_loader=train_loader, val_loader=val_loader, device=device,
    num_epochs=10, learning_rate=1e-3, weight_decay=1e-5, use_scheduler=True,
    save_best_model_path="./permutations/LSTM_GNN_Transformer_CNN.pt")

In [None]:
# Evaluating the cascading model

# LGTC metrics on test_loader; collected later into cascading_metrics.
lgtc_metrics = evaluate_model(
    model=trained_lgtc_model, data_loader=test_loader, model_name="LGTC",
    class_names=class_names, verbose=True)

In [None]:
# Cascade order: Transformer -> CNN -> LSTM -> GNN (freeze_extractors=False).
tclg_model = build_intrusion_cascading_model(
    model_sequence=generate_model_sequence(
        perm=("Transformer", "CNN", "LSTM", "GNN"), model_builders=model_builders,
        input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes),
    input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes,
    freeze_extractors=False, verbose=False)

In [None]:
# Training the cascading model

# TCLG: same epochs, learning rate, weight decay and scheduler flag as every other manual run.
trained_tclg_model = train_cascading_model(
    model=tclg_model, train_loader=train_loader, val_loader=val_loader, device=device,
    num_epochs=10, learning_rate=1e-3, weight_decay=1e-5, use_scheduler=True,
    save_best_model_path="./permutations/Transformer_CNN_LSTM_GNN.pt")

In [None]:
# Evaluating the cascading model

# TCLG metrics on test_loader; collected later into cascading_metrics.
tclg_metrics = evaluate_model(
    model=trained_tclg_model, data_loader=test_loader, model_name="TCLG",
    class_names=class_names, verbose=True)

In [None]:
# Cascade order: Transformer -> CNN -> GNN -> LSTM (freeze_extractors=False).
tcgl_model = build_intrusion_cascading_model(
    model_sequence=generate_model_sequence(
        perm=("Transformer", "CNN", "GNN", "LSTM"), model_builders=model_builders,
        input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes),
    input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes,
    freeze_extractors=False, verbose=False)

In [None]:
# Training the cascading model

# TCGL: same epochs, learning rate, weight decay and scheduler flag as every other manual run.
trained_tcgl_model = train_cascading_model(
    model=tcgl_model, train_loader=train_loader, val_loader=val_loader, device=device,
    num_epochs=10, learning_rate=1e-3, weight_decay=1e-5, use_scheduler=True,
    save_best_model_path="./permutations/Transformer_CNN_GNN_LSTM.pt")

In [None]:
# Evaluating the cascading model

# TCGL metrics on test_loader; collected later into cascading_metrics.
tcgl_metrics = evaluate_model(
    model=trained_tcgl_model, data_loader=test_loader, model_name="TCGL",
    class_names=class_names, verbose=True)

In [None]:
# Cascade order: Transformer -> LSTM -> CNN -> GNN (freeze_extractors=False).
tlcg_model = build_intrusion_cascading_model(
    model_sequence=generate_model_sequence(
        perm=("Transformer", "LSTM", "CNN", "GNN"), model_builders=model_builders,
        input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes),
    input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes,
    freeze_extractors=False, verbose=False)

In [None]:
# Training the cascading model

# TLCG: same epochs, learning rate, weight decay and scheduler flag as every other manual run.
trained_tlcg_model = train_cascading_model(
    model=tlcg_model, train_loader=train_loader, val_loader=val_loader, device=device,
    num_epochs=10, learning_rate=1e-3, weight_decay=1e-5, use_scheduler=True,
    save_best_model_path="./permutations/Transformer_LSTM_CNN_GNN.pt")

In [None]:
# Evaluating the cascading model

# TLCG metrics on test_loader; collected later into cascading_metrics.
tlcg_metrics = evaluate_model(
    model=trained_tlcg_model, data_loader=test_loader, model_name="TLCG",
    class_names=class_names, verbose=True)

In [None]:
# Cascade order: Transformer -> LSTM -> GNN -> CNN (freeze_extractors=False).
tlgc_model = build_intrusion_cascading_model(
    model_sequence=generate_model_sequence(
        perm=("Transformer", "LSTM", "GNN", "CNN"), model_builders=model_builders,
        input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes),
    input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes,
    freeze_extractors=False, verbose=False)

In [None]:
# Training the cascading model

# TLGC: same epochs, learning rate, weight decay and scheduler flag as every other manual run.
trained_tlgc_model = train_cascading_model(
    model=tlgc_model, train_loader=train_loader, val_loader=val_loader, device=device,
    num_epochs=10, learning_rate=1e-3, weight_decay=1e-5, use_scheduler=True,
    save_best_model_path="./permutations/Transformer_LSTM_GNN_CNN.pt")

In [None]:
# Evaluating the cascading model

# TLGC metrics on test_loader; collected later into cascading_metrics.
tlgc_metrics = evaluate_model(
    model=trained_tlgc_model, data_loader=test_loader, model_name="TLGC",
    class_names=class_names, verbose=True)

In [None]:
tglc_model = build_intrusion_cascading_model(
    model_sequence=generate_model_sequence(
        perm=("Transformer", "GNN", "CNN", "LSTM"),  
        model_builders=model_builders,               
        input_dimension=input_dimension,                          
        feature_dimension=128,                       
        number_of_classes=number_of_classes                          
    ),
    input_dimension=input_dimension,
    feature_dimension=128,
    number_of_classes=number_of_classes,
    freeze_extractors=False,                        
    verbose=False
)

In [None]:
# Training the cascading model

trained_tgcl_model = train_cascading_model(model=tcgl_model, train_loader=train_loader, val_loader=val_loader,
                                    num_epochs=10, learning_rate=1e-3, weight_decay=1e-5, 
                                    save_best_model_path="./permutations/Transformer_GNN_CNN_LSTM.pt", use_scheduler=True, device=device)

In [None]:
# Evaluating the cascading model

# TGCL metrics on test_loader; collected later into cascading_metrics.
tgcl_metrics = evaluate_model(
    model=trained_tgcl_model, data_loader=test_loader, model_name="TGCL",
    class_names=class_names, verbose=True)

In [None]:
# Cascade order: Transformer -> GNN -> LSTM -> CNN (freeze_extractors=False).
tglc_model = build_intrusion_cascading_model(
    model_sequence=generate_model_sequence(
        perm=("Transformer", "GNN", "LSTM", "CNN"), model_builders=model_builders,
        input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes),
    input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes,
    freeze_extractors=False, verbose=False)

In [None]:
# Training the cascading model

# TGLC: same epochs, learning rate, weight decay and scheduler flag as every other manual run.
trained_tglc_model = train_cascading_model(
    model=tglc_model, train_loader=train_loader, val_loader=val_loader, device=device,
    num_epochs=10, learning_rate=1e-3, weight_decay=1e-5, use_scheduler=True,
    save_best_model_path="./permutations/Transformer_GNN_LSTM_CNN.pt")

In [None]:
# Evaluating the cascading model

# TGLC metrics on test_loader; collected later into cascading_metrics.
tglc_metrics = evaluate_model(
    model=trained_tglc_model, data_loader=test_loader, model_name="TGLC",
    class_names=class_names, verbose=True)

In [None]:
# Cascade order: GNN -> CNN -> LSTM -> Transformer (freeze_extractors=False).
gclt_model = build_intrusion_cascading_model(
    model_sequence=generate_model_sequence(
        perm=("GNN", "CNN", "LSTM", "Transformer"), model_builders=model_builders,
        input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes),
    input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes,
    freeze_extractors=False, verbose=False)

In [None]:
# Training the cascading model

# GCLT: same epochs, learning rate, weight decay and scheduler flag as every other manual run.
trained_gclt_model = train_cascading_model(
    model=gclt_model, train_loader=train_loader, val_loader=val_loader, device=device,
    num_epochs=10, learning_rate=1e-3, weight_decay=1e-5, use_scheduler=True,
    save_best_model_path="./permutations/GNN_CNN_LSTM_Transformer.pt")

In [None]:
# Evaluating the cascading model

# GCLT metrics on test_loader; collected later into cascading_metrics.
gclt_metrics = evaluate_model(
    model=trained_gclt_model, data_loader=test_loader, model_name="GCLT",
    class_names=class_names, verbose=True)

In [None]:
# Cascade order: GNN -> CNN -> Transformer -> LSTM (freeze_extractors=False).
gctl_model = build_intrusion_cascading_model(
    model_sequence=generate_model_sequence(
        perm=("GNN", "CNN", "Transformer", "LSTM"), model_builders=model_builders,
        input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes),
    input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes,
    freeze_extractors=False, verbose=False)

In [None]:
# Training the cascading model

# GCTL: same epochs, learning rate, weight decay and scheduler flag as every other manual run.
trained_gctl_model = train_cascading_model(
    model=gctl_model, train_loader=train_loader, val_loader=val_loader, device=device,
    num_epochs=10, learning_rate=1e-3, weight_decay=1e-5, use_scheduler=True,
    save_best_model_path="./permutations/GNN_CNN_Transformer_LSTM.pt")

In [None]:
# Evaluating the cascading model

# GCTL metrics on test_loader; collected later into cascading_metrics.
gctl_metrics = evaluate_model(
    model=trained_gctl_model, data_loader=test_loader, model_name="GCTL",
    class_names=class_names, verbose=True)

In [None]:
# Cascade order: GNN -> LSTM -> CNN -> Transformer (freeze_extractors=False).
glct_model = build_intrusion_cascading_model(
    model_sequence=generate_model_sequence(
        perm=("GNN", "LSTM", "CNN", "Transformer"), model_builders=model_builders,
        input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes),
    input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes,
    freeze_extractors=False, verbose=False)

In [None]:
# Training the cascading model

# GLCT: same epochs, learning rate, weight decay and scheduler flag as every other manual run.
trained_glct_model = train_cascading_model(
    model=glct_model, train_loader=train_loader, val_loader=val_loader, device=device,
    num_epochs=10, learning_rate=1e-3, weight_decay=1e-5, use_scheduler=True,
    save_best_model_path="./permutations/GNN_LSTM_CNN_Transformer.pt")

In [None]:
# Evaluating the cascading model

# GLCT metrics on test_loader; collected later into cascading_metrics.
glct_metrics = evaluate_model(
    model=trained_glct_model, data_loader=test_loader, model_name="GLCT",
    class_names=class_names, verbose=True)

In [None]:
# Cascade order: GNN -> LSTM -> Transformer -> CNN (freeze_extractors=False).
gltc_model = build_intrusion_cascading_model(
    model_sequence=generate_model_sequence(
        perm=("GNN", "LSTM", "Transformer", "CNN"), model_builders=model_builders,
        input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes),
    input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes,
    freeze_extractors=False, verbose=False)

In [None]:
# Training the cascading model

# GLTC: same epochs, learning rate, weight decay and scheduler flag as every other manual run.
trained_gltc_model = train_cascading_model(
    model=gltc_model, train_loader=train_loader, val_loader=val_loader, device=device,
    num_epochs=10, learning_rate=1e-3, weight_decay=1e-5, use_scheduler=True,
    save_best_model_path="./permutations/GNN_LSTM_Transformer_CNN.pt")

In [None]:
# Evaluating the cascading model

# GLTC metrics on test_loader; collected later into cascading_metrics.
gltc_metrics = evaluate_model(
    model=trained_gltc_model, data_loader=test_loader, model_name="GLTC",
    class_names=class_names, verbose=True)

In [None]:
# Cascade order: GNN -> Transformer -> CNN -> LSTM (freeze_extractors=False).
gtcl_model = build_intrusion_cascading_model(
    model_sequence=generate_model_sequence(
        perm=("GNN", "Transformer", "CNN", "LSTM"), model_builders=model_builders,
        input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes),
    input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes,
    freeze_extractors=False, verbose=False)

In [None]:
# Training the cascading model

# GTCL: same epochs, learning rate, weight decay and scheduler flag as every other manual run.
trained_gtcl_model = train_cascading_model(
    model=gtcl_model, train_loader=train_loader, val_loader=val_loader, device=device,
    num_epochs=10, learning_rate=1e-3, weight_decay=1e-5, use_scheduler=True,
    save_best_model_path="./permutations/GNN_Transformer_CNN_LSTM.pt")

In [None]:
# Evaluating the cascading model

# GTCL metrics on test_loader; collected later into cascading_metrics.
gtcl_metrics = evaluate_model(
    model=trained_gtcl_model, data_loader=test_loader, model_name="GTCL",
    class_names=class_names, verbose=True)

In [None]:
# Cascade order: GNN -> Transformer -> LSTM -> CNN (freeze_extractors=False).
gtlc_model = build_intrusion_cascading_model(
    model_sequence=generate_model_sequence(
        perm=("GNN", "Transformer", "LSTM", "CNN"), model_builders=model_builders,
        input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes),
    input_dimension=input_dimension, feature_dimension=128, number_of_classes=number_of_classes,
    freeze_extractors=False, verbose=False)

In [None]:
 # Training the cascading model

# GTLC: same epochs, learning rate, weight decay and scheduler flag as every other manual run.
trained_gtlc_model = train_cascading_model(
    model=gtlc_model, train_loader=train_loader, val_loader=val_loader, device=device,
    num_epochs=10, learning_rate=1e-3, weight_decay=1e-5, use_scheduler=True,
    save_best_model_path="./permutations/GNN_Transformer_LSTM_CNN.pt")

In [None]:
# Evaluating the cascading model

# GTLC metrics on test_loader; collected later into cascading_metrics.
gtlc_metrics = evaluate_model(
    model=trained_gtlc_model, data_loader=test_loader, model_name="GTLC",
    class_names=class_names, verbose=True)

========================  Saving the Cascading Models Metrics  ========================

In [None]:
# Metrics of every manually trained cascade, keyed by the permutation acronym.
# Each key must map to the metrics variable of the same permutation (GTCL -> gtcl_metrics).
cascading_metrics = {"CLTG": cltg_metrics, "CLGT": clgt_metrics, "CTLG": ctlg_metrics, "CTGL": ctgl_metrics,
                           "CGLT": cglt_metrics, "CGTL": cgtl_metrics, "LCTG": lctg_metrics, "LCGT": lcgt_metrics,
                           "LTCG": ltcg_metrics, "LTGC": ltgc_metrics, "LGCT": lgct_metrics, "LGTC": lgtc_metrics,
                           "TCLG": tclg_metrics, "TCGL": tcgl_metrics, "TLCG": tlcg_metrics, "TLGC": tlgc_metrics,
                           "TGCL": tgcl_metrics, "TGLC": tglc_metrics, "GCLT": gclt_metrics, "GCTL": gctl_metrics,
                           "GLCT": glct_metrics, "GLTC": gltc_metrics, "GTCL": gtcl_metrics, "GTLC": gtlc_metrics}

In [None]:
def export_the_cascading_metrics_to_csv(metrics_dict: dict, save_path: str):
    """
    Writes the metrics of the manually trained cascading models to timestamped Excel and CSV files.

    Arguments:
        metrics_dict (dict): Maps a model name to a metrics dict that provides the keys
            'accuracy', 'precision', 'recall' and 'f1_score'.
        save_path (str): Target path; its extension is dropped and replaced by
            "_<YYYYmmdd_HHMMSS>.xlsx" and "_<YYYYmmdd_HHMMSS>.csv".
    """
    dataframe = pd.DataFrame([{
        "Model": model_name,
        "Accuracy": metrics['accuracy'],
        "Precision": metrics['precision'],
        "Recall": metrics['recall'],
        "F1 Score": metrics['f1_score']
    } for model_name, metrics in metrics_dict.items()])

    # One shared timestamp keeps the Excel and CSV outputs paired and avoids overwriting earlier exports
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    base_path = os.path.splitext(save_path)[0]
    excel_path = f"{base_path}_{timestamp}.xlsx"
    csv_path = f"{base_path}_{timestamp}.csv"

    dataframe.to_excel(excel_path, index=False)
    dataframe.to_csv(csv_path, index=False)

    # The message names the cascading models, which is what this function exports
    print(f"📊 Cascading models metrics exported to:\n- Excel: {excel_path}\n- CSV: {csv_path}")

In [None]:
# Persist the metrics gathered in cascading_metrics
export_the_cascading_metrics_to_csv(metrics_dict=cascading_metrics,
                                    save_path="Cascading_permutations_metrics_results.csv")

===============================  Reloading the cascading models  ==============================  

In [152]:
def load_all_cascading_models(directory: str, verbose: bool = True,
                              map_location: Optional[Union[str, torch.device]] = None) -> list:
    """
    Loads every ".pt" file found in the directory with torch.load.

    Arguments:
        directory (str): Path to directory containing model .pt files.
        verbose (bool): When True, prints one confirmation line per loaded file.
        map_location (str | torch.device | None): Forwarded to torch.load so that
            checkpoints saved on a GPU can be remapped (e.g. "cpu") on a machine
            without that device. None keeps torch.load's default placement.

    Returns:
        list: Loaded objects, ordered by sorted file name.
    """
    cascading_models_list = []

    for file_name in sorted(os.listdir(directory)):
        # Only PyTorch checkpoint files are of interest; everything else is ignored
        if not file_name.endswith(".pt"):
            continue

        model_path = os.path.join(directory, file_name)

        # SECURITY: torch.load unpickles the file, so only load checkpoints from a trusted source.
        # NOTE(review): the benchmark code in this notebook restores checkpoints with
        # load_state_dict(torch.load(...)); if the training helper saves state dicts, the
        # objects returned here are state dicts rather than nn.Module instances - confirm.
        # NOTE(review): newer PyTorch releases default to weights_only=True, which rejects
        # fully pickled modules - confirm against the installed version.
        model = torch.load(model_path, map_location=map_location)

        cascading_models_list.append(model)

        if verbose:
            print(f"✅ Loaded full model from {file_name}")

    return cascading_models_list

In [None]:
# Directory that holds the manually trained permutation checkpoints
saving_directory = "permutations"

# Load every checkpoint found there and report how many were read
cascading_models_list = load_all_cascading_models(saving_directory, verbose=True)
print(f"Loaded {len(cascading_models_list)} models from '{saving_directory}' directory.")

#### ====================  Clovis Mushagalusa CIRUBAKADERHA  ====================