# Experiment of Hybrid Ensemble Malware Detection (GWO + PHMM + DAE + Stacked Classifier) - 
<em> Reference: https://github.com/delphi20/Malware-Detection-with-ML-DL-/blob/main </em>

##### 
### Importing the dataset

In [44]:
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
from sklearn.metrics import make_scorer, f1_score
from sklearn.preprocessing import StandardScaler
from mealpy import FloatVar
from mealpy.swarm_based import GWO

# ---- STEP 1: Load & prepare your dataset ----

# Load labeled malware and benign datasets
benign_df = pd.read_csv('benign_samples.csv')
malware_df = pd.read_csv('malware_samples.csv')

# Label data
benign_df['label'] = 0
malware_df['label'] = 1

# Merge and shuffle
df = pd.concat([benign_df, malware_df], axis=0).sample(frac=1).reset_index(drop=True)

# Select features and label
X = df.iloc[:, 3:-2].values  # Adjust based on real feature layout
y = df['label'].values

# Scale features
scaler = StandardScaler()
X = scaler.fit_transform(X)

n_features = X.shape[1]

# ---- STEP 2: Define your GWO-compatible objective function ----

def objective_function(solution):
    binary_mask = [1 if x > 0 else 0 for x in solution]
    
    # Avoid empty feature subset
    if sum(binary_mask) == 0:
        return 1.0

    selected_X = X[:, binary_mask]
    
    # You can switch classifiers (RandomForest, LogisticRegression, etc.)
    clf = RandomForestClassifier(random_state=42)
    
    f1 = cross_val_score(clf, selected_X, y, cv=3, scoring=make_scorer(f1_score)).mean()
    
    return 1 - f1  # because GWO minimizes the objective

# ---- STEP 3: Define the Mealpy problem ----

bounds = [FloatVar(lb=-10., ub=10., name=f"x{i}") for i in range(n_features)]

problem = {
    "bounds": bounds,
    "minmax": "min",
    "obj_func": objective_function
}

# ---- STEP 4: Run GWO ----

model = GWO.OriginalGWO(epoch=100, pop_size=30)  # You can increase epochs for better results
model.solve(problem)

# ---- STEP 5: Extract best solution ----

if model.g_best is not None and hasattr(model.g_best, "solution"):
    binary_solution = [1 if val > 0 else 0 for val in model.g_best.solution]
    selected_features = [i for i, val in enumerate(binary_solution) if val == 1]

    print("Selected feature indices:", selected_features)
    print("Fitness (1 - F1 score):", model.g_best.target)
    print("Estimated F1 score:", 1 - model.g_best.target.fitness)
else:
    print("No valid solution found.")


2025/05/31 11:36:33 PM, INFO, mealpy.swarm_based.GWO.OriginalGWO: OriginalGWO(epoch=100, pop_size=30)
2025/05/31 11:48:20 PM, INFO, mealpy.swarm_based.GWO.OriginalGWO: >>>Problem: P, Epoch: 1, Current best: 0.08208439471223039, Global best: 0.08208439471223039, Runtime: 368.57790 seconds
2025/05/31 11:53:58 PM, INFO, mealpy.swarm_based.GWO.OriginalGWO: >>>Problem: P, Epoch: 2, Current best: 0.08208439471223039, Global best: 0.08208439471223039, Runtime: 338.40540 seconds


KeyboardInterrupt: 

In [None]:

import pandas as pd
import torch


# Step 1: Load the data
benign_df = pd.read_csv('benign_samples.csv')   # CSV with benign samples
malware_df = pd.read_csv('malware_samples.csv') # CSV with malware samples

# Step 2: Label the data
benign_df['label'] = 0  # Label all benign samples as 0
malware_df['label'] = 1  # Label all malware samples as 1

In [43]:
import pandas as pd
from sklearn.utils import shuffle
from sklearn.preprocessing import StandardScaler

# Merge and suffle the data
df = pd.concat([malware_df, benign_df], axis=0).reset_index(drop=True)
df = df.sample(frac=1).reset_index(drop=True)

# Split features and target
X = df.iloc[:, 3:-2].values  # adjust based on your actual feature columns
y = df['label'].values

# Scale features
scaler = StandardScaler()
X = scaler.fit_transform(X)

n_features = X.shape[1]


### Add Feature Selection - GWO for optimizing weights before discretization


In [None]:
# from sklearn.ensemble import RandomForestClassifier
# from sklearn.model_selection import cross_val_score
# def fitness(solution):
#     binary_solution = [1 if val > 0 else 0 for val in solution]
#     selected_idx = [i for i, bit in enumerate(binary_solution) if bit == 1]

#     if not selected_idx:  # avoid empty feature set
#         return 1.0

#     X_selected = X[:, selected_idx]
#     clf = RandomForestClassifier(n_estimators=50, random_state=42)
#     score = cross_val_score(clf, X_selected, y, cv=3, scoring='f1', n_jobs=-1).mean()

#     return 1 - score  # because GWO minimizes




### Add Feature Selection - GWO for optimizing weights before discretization


In [None]:
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
from sklearn.metrics import make_scorer, f1_score

def objective_function(solution):
    binary_mask = [1 if x > 0 else 0 for x in solution]
    
    # Avoid empty feature subset
    if sum(binary_mask) == 0:
        return 1.0

    selected_X = X[:, binary_mask]
    
    # You can switch classifiers (RandomForest, LogisticRegression, etc.)
    clf = RandomForestClassifier(random_state=42)
    
    f1 = cross_val_score(clf, selected_X, y, cv=3, scoring=make_scorer(f1_score)).mean()
    
    return 1 - f1  # because GWO minimizes the objective


In [None]:
import numpy as np
from mealpy import FloatVar
from mealpy.swarm_based import GWO
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
from sklearn.metrics import f1_score

# def objective_function(solution):
#     return np.sum(solution**2)

# X, y = load_breast_cancer(return_X_y=True)  # Replace with your dataset

def objective_function(solution):
    # Convert float solution to binary mask
    mask = [1 if x > 0 else 0 for x in solution]
    if sum(mask) == 0:  # Avoid empty feature set
        return 1.0
    X_selected = X[:, mask]
    clf = RandomForestClassifier()
    f1 = cross_val_score(clf, X_selected, y, scoring='f1', cv=3).mean()
    return 1 - f1


bounds = [FloatVar(lb=-10., ub=10., name=f"x{i}") for i in range(30)] 

problem_dict = {
    "bounds": bounds,
    "minmax": "min",
    "obj_func": objective_function
}

model = GWO.OriginalGWO(epoch=1000, pop_size=50)
result = model.solve(problem_dict)
if model.g_best is not None:
    if hasattr(model.g_best, "solution") and hasattr(model.g_best, "target"):
        print(f"Solution: {model.g_best.solution}, Fitness: {model.g_best.target}")
        # Get binary solution
        binary_solution = [1 if val > 0 else 0 for val in model.g_best.solution]
        selected_features = [i for i, val in enumerate(binary_solution) if val == 1]

        print("Selected feature indices:", selected_features)
        print("Fitness (1 - F1 score):", model.g_best.target)
        print("Estimated F1 score:", model.g_best.target.fitness)
    else:
        print("Best agent attributes:", dir(model.g_best))
        print("Best agent object:", model.g_best)
else:
    print("No best solution found. model.g_best is None.")





2025/05/31 11:28:36 PM, INFO, mealpy.swarm_based.GWO.OriginalGWO: OriginalGWO(epoch=1000, pop_size=50)
2025/05/31 11:28:36 PM, INFO, mealpy.swarm_based.GWO.OriginalGWO: >>>Problem: P, Epoch: 1, Current best: 499.2756586709646, Global best: 499.2756586709646, Runtime: 0.02000 seconds
2025/05/31 11:28:36 PM, INFO, mealpy.swarm_based.GWO.OriginalGWO: >>>Problem: P, Epoch: 2, Current best: 357.8251815085439, Global best: 357.8251815085439, Runtime: 0.01751 seconds
2025/05/31 11:28:36 PM, INFO, mealpy.swarm_based.GWO.OriginalGWO: >>>Problem: P, Epoch: 3, Current best: 295.3785289691553, Global best: 295.3785289691553, Runtime: 0.01806 seconds
2025/05/31 11:28:36 PM, INFO, mealpy.swarm_based.GWO.OriginalGWO: >>>Problem: P, Epoch: 4, Current best: 200.01863026453, Global best: 200.01863026453, Runtime: 0.01842 seconds
2025/05/31 11:28:36 PM, INFO, mealpy.swarm_based.GWO.OriginalGWO: >>>Problem: P, Epoch: 5, Current best: 172.57887533305563, Global best: 172.57887533305563, Runtime: 0.01657 se

Solution: [-4.37671881e-41 -5.06958735e-41 -4.43309035e-41  5.20654847e-41
 -4.91545644e-41 -4.51668720e-41  4.77423809e-41  4.93216332e-41
  5.06159788e-41 -4.88116518e-41  4.63810631e-41  4.83756035e-41
  4.36395603e-41 -4.69264913e-41  4.35354648e-41  4.67328861e-41
 -3.65246785e-41 -5.35504554e-41 -4.76840052e-41 -4.97336978e-41
 -4.65047126e-41 -4.31271532e-41 -5.24411888e-41 -4.76555201e-41
  4.70971112e-41  5.12496077e-41  4.94588921e-41 -4.70240050e-41
 -5.02893941e-41 -4.59519367e-41], Fitness: Objectives: [6.80869987e-80], Fitness: 6.808699872126948e-80
Selected feature indices: [3, 6, 7, 8, 10, 11, 12, 14, 15, 24, 25, 26]
Fitness (1 - F1 score): Objectives: [6.80869987e-80], Fitness: 6.808699872126948e-80
Estimated F1 score: 6.808699872126948e-80


### Discretising to create observation sequences

In [None]:
from sklearn.cluster import KMeans

features_malware = malware_df.iloc[:,3:-2]


kmeans = KMeans(n_clusters=5)

df_discretized = features_malware.apply(lambda x: kmeans.fit_predict(x.values.reshape(-1, 1)))

malware_df['Observation_Sequence'] = df_discretized.apply(lambda row: ','.join(map(str, row)), axis=1)

malware_df.head()


features_benign = benign_df.iloc[:,3:-2]
features_benign.head()

kmeans_b = KMeans(n_clusters=5)

df_discretized = features_benign.apply(lambda x: kmeans_b.fit_predict(x.values.reshape(-1, 1)))

benign_df['Observation_Sequence'] = df_discretized.apply(lambda row: ','.join(map(str, row)), axis=1)

benign_df.head()

### Training seperate markov models

Malware samples markov model

In [None]:
import numpy as np
from hmmlearn import hmm

# Extract sequences of observations for each sample
sequences_malware = [list(map(int, seq.split(','))) for seq in malware_df['Observation_Sequence']]
lengths = [len(seq) for seq in sequences_malware]

# Train HMM (use GaussianHMM if continuous or MultinomialHMM if discrete)
model_malware = hmm.MultinomialHMM(n_components=3, n_iter=100)  # 3 hidden states
X = np.concatenate(sequences_malware).reshape(-1, 1)

# Train the HMM on the observation sequences
model_malware.fit(X, lengths)

Benign samples model

In [None]:
# Extract sequences of observations for each sample
sequences_benign = [list(map(int, seq.split(','))) for seq in benign_df['Observation_Sequence']]
lengths = [len(seq) for seq in sequences_benign]

# Train HMM (use GaussianHMM if continuous or MultinomialHMM if discrete)
model_benign = hmm.MultinomialHMM(n_components=3, n_iter=100)  # 3 hidden states
X = np.concatenate(sequences_benign).reshape(-1, 1)

# Train the HMM on the observation sequences
model_benign.fit(X, lengths)

Metrics, change model var to display as necessary

In [None]:
# log_likelihood = model.score(X)
# print(f"Log Likelihood: {log_likelihood}")


# print("Number of hidden states:", model.n_components)


# print("Maximum iterations:", model.n_iter)
# print("Did the model converge?", model.monitor_.converged, "\n\n")

# print("Transition matrix (A):")
# print(model.transmat_)

# print("\n\nEmission probabilities (B):")
# print(model.emissionprob_)


# print("\n\nInitial state probabilities (π):")
# print(model.startprob_)

In [None]:
# params = model.get_params()
# print("HMM Parameters:")
# print(params)

Adding the scores to the dataframes

In [None]:
def add_hmm_scores(df_m, df_b, benign_hmm, malware_hmm):
    # Convert sequences to proper format for HMM
    sequences_m = [list(map(int, seq.split(','))) for seq in df_m['Observation_Sequence']]
    sequences_b = [list(map(int, seq.split(','))) for seq in df_b['Observation_Sequence']]

    # Calculate scores for each sequence
    malware_scores = []
    for seq in sequences_m:
        # Reshape the sequence and compute the score
        score = malware_hmm.score(np.array(seq).reshape(-1, 1))
        malware_scores.append(score)

    benign_scores = []
    for seq in sequences_b:
        # Reshape the sequence and compute the score
        score = benign_hmm.score(np.array(seq).reshape(-1, 1))
        benign_scores.append(score)

    # Add scores as new columns
    df_m['malware_hmm_score'] = malware_scores
    df_b['benign_hmm_score'] = benign_scores

    return df_m, df_b

# Add scores to train and test dataframes
malware_df, benign_df = add_hmm_scores(malware_df, benign_df, model_benign, model_malware)

Check dataframes

In [None]:
malware_df.head()

Reorder columns

In [None]:
# Swap the last two columns with the two columns before them
cols = malware_df.columns.tolist()
# Swap the last two with the two before them
cols[-4:-2], cols[-2:] = cols[-2:], cols[-4:-2]

# Reorder the DataFrame using the new column order
malware_df = malware_df[cols]

malware_df.head()

# Swap the last two columns with the two columns before them
cols = benign_df.columns.tolist()
# Swap the last two with the two before them
cols[-4:-2], cols[-2:] = cols[-2:], cols[-4:-2]

# Reorder the DataFrame using the new column order
benign_df = benign_df[cols]

malware_df.head()

Temporal test train splitting

In [None]:
# Step 3: Concatenate both datasets
combined_df = pd.concat([benign_df, malware_df])

# Step 4: Shuffle the combined data
shuffled_df = shuffle(combined_df, random_state=42)
shuffled_df["Year "].isnull().sum()

# fill nan values
cols = shuffled_df[-2:]
shuffled_df= shuffled_df.fillna(0)


train_df = shuffled_df[shuffled_df['Year ']<2020]
test_df = shuffled_df[shuffled_df['Year ']>=2020]

train_df.tail()

code that prints number of samples that are past the year 2020, and all that are before

In [None]:
# Samples with Year > 2020
count_after_2020 = (shuffled_df["Year "] > 2020).sum()
print("Samples after 2020:", count_after_2020)

# Samples with Year <= 2020
count_before_or_in_2020 = (shuffled_df["Year "] <= 2020).sum()
print("Samples in or before 2020:", count_before_or_in_2020)

Label encode the month column

In [None]:
train_df_features = train_df.iloc[:, 1:-3]
train_df_labels = train_df.iloc[:,-2]

test_df_features = test_df.iloc[:,1:-3]
test_df_labels = test_df.iloc[:,-2]

train_df_labels.head()

In [None]:
print(train_df.iloc[:, 1:-5].index.equals(train_df.iloc[:, -2:].index))

Fixing messed up values in month column

In [None]:
train_df_features['Month'].unique()

In [None]:
import pandas as pd
from sklearn.impute import SimpleImputer

# Define a mapping for misspelled and inconsistent month values
month_corrections = {
    'January': 'January', 'Jan': 'January',
    'February': 'February', 'February ': 'February',
    'March': 'March', 'Marh': 'March', 'March ': 'March',
    'April': 'April',
    'May': 'May', ' May': 'May', 'mAY': 'May',
    'June': 'June', 'June ': 'June',
    'July': 'July', 'July ': 'July',
    'August': 'August', 'August ': 'August',
    'September': 'September', 'September ': 'September', 'September  ': 'September',
    'October': 'October', 'October ': 'October',
    'November': 'November', 'November ': 'November', 'Novembber': 'November', 'november ': 'November',
    'December': 'December', 'December ': 'December', 'Dececmber ': 'December', 'Devember': 'December', 'Dember ': 'December'
}

# Clean the column by stripping extra spaces, converting to title case, and mapping values
train_df_features['Month'] = train_df_features['Month'].str.strip().str.title()  # Remove extra spaces and ensure proper case
train_df_features['Month'] = train_df_features['Month'].map(month_corrections)  # Map corrected values

imputer = SimpleImputer(strategy='most_frequent')

# Fix: Extract the first column (ravel()) from the 2D array returned by fit_transform
train_df_features['Month'] = imputer.fit_transform(train_df_features[['Month']]).ravel()

# Verify unique values after cleaning
print(train_df_features['Month'].unique())
print(train_df_features['Month'].isna().sum())

# Apply same to test features dataset
test_df_features['Month'] = test_df_features['Month'].str.strip().str.title()
test_df_features['Month'] = test_df_features['Month'].map(month_corrections)

# For the test data, we should use transform() instead of fit_transform() to use the same imputer
test_df_features['Month'] = imputer.transform(test_df_features[['Month']]).ravel()

print(test_df_features['Month'].unique())
print(test_df_features['Month'].isna().sum())

Label encoding the month columns

In [None]:
from sklearn.preprocessing import LabelEncoder

label_encoder = LabelEncoder()

# Transform the datasets
train_df_features['Month'] = label_encoder.fit_transform(train_df_features['Month'])
test_df_features['Month'] = label_encoder.fit_transform(test_df_features['Month'])

train_df_features.head()


Applying standard scaling to the datasets

In [None]:
from sklearn.preprocessing import StandardScaler

sc = StandardScaler()


train_df_features.iloc[:,:-2] = sc.fit_transform(train_df_features.iloc[:,:-2])
test_df_features.iloc[:,:-2] = sc.fit_transform(test_df_features.iloc[:,:-2])

train_df_features = train_df_features.iloc[:,:-2]
test_df_features = test_df_features.iloc[:,:-2]

test_df_features.head()

Converting the dataframes to pytorch tensors

In [None]:
# train features and labels
train_features = train_df_features.values
train_labels = train_df_labels.values

# test features and labels
test_features = test_df_features.values
test_labels = test_df_labels.values

# print(train_df_features[].dtypes)
print(train_df_labels.dtypes)


train_features_tensor = torch.tensor(train_features, dtype=torch.float32)
train_labels_tensor = torch.tensor(train_labels, dtype=torch.int)

test_features_tensor = torch.tensor(test_features, dtype=torch.float32)
test_labels_tensor = torch.tensor(test_labels, dtype=torch.int)

train_labels_tensor.shape

checking data ranges and ensuring no null values in tensors

In [None]:
print(torch.isnan(test_features_tensor).sum())  # Count NaN values
print(torch.isinf(test_features_tensor).sum())  # Count Inf values
print(test_features_tensor.max(), test_features_tensor.min())  # Check for extreme values

In [None]:
print(torch.unique(train_labels_tensor))

Building the autoencoder

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

import time
import psutil
import os
import torch.cuda

# First, check the actual input dimension
actual_input_dim = train_features_tensor.shape[1]  # Should be 57 based on the error
print(f"Actual input dimension: {actual_input_dim}")

# Define the Autoencoder model with correct input dimension
class DeepAutoencoder(nn.Module):
    def __init__(self, input_dim):
        super(DeepAutoencoder, self).__init__()
        
        # Encoder
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 50),
            nn.BatchNorm1d(50),
            nn.ReLU(),
            nn.Linear(50, 30),
            nn.BatchNorm1d(30),
            nn.ReLU(),
            nn.Linear(30, 60),
            nn.BatchNorm1d(60),
            nn.ReLU(),
            nn.Linear(60, 12)  # Latent layer
        )
        
        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(12, 60),
            nn.BatchNorm1d(60),
            nn.ReLU(),
            nn.Linear(60, 30),
            nn.BatchNorm1d(30),
            nn.ReLU(),
            nn.Linear(30, 50),
            nn.BatchNorm1d(50),
            nn.ReLU(),
            nn.Linear(50, input_dim),
            nn.Sigmoid()  # Output layer with Sigmoid
        )

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

# Hyperparameters
learning_rate = 0.001
epochs = 30
batch_size = 20

# Instantiate the model with the correct input dimension
model = DeepAutoencoder(actual_input_dim)
criterion = nn.MSELoss()  # Mean Squared Error as loss
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Prepare data
train_loader = DataLoader(TensorDataset(train_features_tensor), batch_size=batch_size, shuffle=True)

# Optional: warm-up or dummy pass (to trigger memory allocation)
with torch.no_grad():
    dummy_input = next(iter(train_loader))[0]
    _ = model(dummy_input)


# Now start timer and memory tracking
process = psutil.Process(os.getpid())
start_time = time.time()
start_memory = process.memory_info().rss / (1024 ** 2)  # MB
if torch.cuda.is_available():
    torch.cuda.reset_peak_memory_stats()

epoch_losses = []  # <- Add this before the training loop
total_memory = 0

# Training loop
for epoch in range(epochs):
    running_loss = 0.0
    for batch in train_loader:
        inputs, = batch  # Unpack tuple from DataLoader
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, inputs)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
    
    avg_loss = running_loss / len(train_loader)
    epoch_losses.append(avg_loss)  # <- Store the average loss for this epoch
    print(f"Epoch [{epoch+1}/{epochs}], Loss: {avg_loss:.4f}")
    current_memory = process.memory_info().rss / (1024 ** 2)
    total_memory += current_memory
    
#  End Timer and memory tracking
end_time = time.time()
total_time = end_time - start_time

print(f"\nTotal training time: {total_time:.2f} seconds")
print(f"CPU memory used: {total_memory:.2f} MB")

# If using GPU
if torch.cuda.is_available():
    gpu_memory = torch.cuda.max_memory_allocated() / (1024 ** 2)
    print(f"Peak GPU memory allocated: {gpu_memory:.2f} MB")

# After training, keep only the encoder for feature extraction
trained_encoder = model.encoder

Extract latent features from the hidden layer

In [None]:
import numpy as np

# Extract latent features using the trained encoder
def extract_latent_features(encoder, dataloader):
    encoder.eval()  # Set encoder to evaluation mode
    latent_features = []

    with torch.no_grad():  # Disable gradient calculation
        for batch in dataloader:
            inputs, = batch  
            latent = encoder(inputs)  
            latent_features.append(latent.cpu().numpy())

    return np.concatenate(latent_features, axis=0)

# Create a DataLoader for the train dataset
train_full_loader = DataLoader(TensorDataset(train_features_tensor), batch_size=batch_size, shuffle=False)
# DataLoader for the test dataset
test_full_loader = DataLoader(TensorDataset(test_features_tensor), batch_size=batch_size, shuffle=False) 

# Extract latent features from the encoder
train_latent_features = extract_latent_features(trained_encoder, train_full_loader)
test_latent_features = extract_latent_features(trained_encoder, test_full_loader)
# Now latent_features contains the compressed representations of the data
print("Latent train features shape:", train_latent_features.shape)
print("Latent test features shape:", test_latent_features.shape)

print("latent train labels shape:", train_labels.shape)

Building the stacked ensemble

In [None]:
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression, SGDClassifier
from sklearn.model_selection import train_test_split
from catboost import CatBoostClassifier
from sklearn.metrics import classification_report, roc_curve, auc
import numpy as np


# Use latent features and their corresponding labels
X_train_latent = train_latent_features
X_test_latent = test_latent_features
y_train_latent = train_labels
y_test_latent = test_labels

print(y_train_latent.shape)

# Base Classifiers
base_clf1 = SVC(probability=True, random_state=42)  # Support Vector Machine
base_clf2 = LogisticRegression(random_state=42)    # Logistic Regression
base_clf3 = SGDClassifier(loss="log_loss", random_state=42)  # Stochastic Gradient Descent with log loss

# Train the base classifiers on latent features
base_clf1.fit(X_train_latent, y_train_latent)
base_clf2.fit(X_train_latent, y_train_latent)
base_clf3.fit(X_train_latent, y_train_latent)

# Generate predictions (probabilities) for the meta-classifier
train_meta_features = np.column_stack([
    base_clf1.predict_proba(X_train_latent)[:, 1],
    base_clf2.predict_proba(X_train_latent)[:, 1],
    base_clf3.predict_proba(X_train_latent)[:, 1]
])

# Meta-Classifier: CatBoost
meta_clf = CatBoostClassifier(iterations=100, learning_rate=0.1, depth=4, random_state=42, verbose=0)
meta_clf.fit(train_meta_features, y_train_latent)

# Measure full ensemble inference time
start_time = time.time()

# Base classifiers: get test set probabilities (part of inference!)
test_meta_features = np.column_stack([
    base_clf1.predict_proba(X_test_latent)[:, 1],
    base_clf2.predict_proba(X_test_latent)[:, 1],
    base_clf3.predict_proba(X_test_latent)[:, 1]
])

# Meta-classifier makes final prediction
y_pred = meta_clf.predict(test_meta_features)
end_time = time.time()
inference_time = end_time - start_time
# inference_formatted = f"{(inference_time)*1000:.2f}"
inference_formatted = round(inference_time*1000,2)

average_time_per_sample = inference_formatted / len(test_meta_features)
# print(len(test_meta_features))

print(f"Inference time (total): {inference_formatted} Ms")
print(f"Average inference time per sample: {average_time_per_sample:.2f} Ms")



# Predictions and Evaluation
# y_pred = meta_clf.predict(test_meta_features)
print("Classification Report:\n", classification_report(y_test_latent, y_pred))

Final model evaluation metrics

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score

accuracy = accuracy_score(y_test_latent, y_pred)
precision = precision_score(y_test_latent, y_pred)
recall = recall_score(y_test_latent, y_pred)
f1 = f1_score(y_test_latent, y_pred)
roc_auc = roc_auc_score(y_test_latent, meta_clf.predict_proba(test_meta_features)[:, 1])

print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-Score: {f1:.4f}")
print(f"ROC-AUC: {roc_auc:.4f}")

### Comparative Analysis

In [None]:
from catboost import train
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from matplotlib.ticker import FuncFormatter



def show_float(x, pos):
    return f'{x:.2f}'  # Adjust .4f to control decimal places


# Metrics for comparison (sample values, replace with your actual results)
models = ['Autoencoder + Ensemble', 'ResNet + Ensemble']
accuracy = [0.9986, 0.9998]
f1_score = [0.9986, 0.9998]
training_times = [total_time, 127.75]  # seconds (replace with actual training time for ResNet)
inference_times = [inference_time, 555.62]   # seconds

# Set seaborn style
sns.set(style="whitegrid")

# Bar Chart: Accuracy, F1, Training, Inference Time
fig, axes = plt.subplots(2, 2, figsize=(12, 8))
metrics = [accuracy, f1_score, training_times ,inference_times]
titles = ['Accuracy', 'F1 Score', 'Training Time (s)', 'Inference Time (Ms)']
colors = ['skyblue', 'salmon']

for ax, metric, title in zip(axes.flatten(), metrics, titles):
    sns.barplot(x=models, y=metric, palette=colors, ax=ax)
    ax.set_title(title)
    ax.set_ylim(0, max(metric)*1.2)

plt.tight_layout()
plt.show()

# Line Plot: Loss per Epoch (Replace with your actual epoch loss values)
epochs = list(range(1, 31))
loss_autoencoder = epoch_losses
# loss_resnet = [0.48, 0.40, 0.34, 0.29, 0.25, 0.22, 0.20, 0.19, 0.18, 0.17]

plt.figure(figsize=(8, 5))
plt.plot(epochs, loss_autoencoder, label='Autoencoder', marker='o')
# plt.plot(epochs, loss_resnet, label='ResNet', marker='s')
plt.xlabel('Epoch')
plt.ylabel('Training Loss')
plt.title('Loss Convergence per Epoch')
plt.legend()
plt.grid(True)
plt.show()


Plotting ROC curve

In [None]:
import matplotlib.pyplot as plt

fpr, tpr, _ = roc_curve(y_test_latent, meta_clf.predict_proba(test_meta_features)[:, 1])
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, label=f"ROC Curve (AUC = {roc_auc:.4f})")
plt.plot([0, 1], [0, 1], linestyle='--', color='gray')
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("Receiver Operating Characteristic (ROC) Curve")
plt.legend()
plt.grid()
plt.show()