In [None]:
# ============================================================
# ENVIRONMENT VALIDATION & REPRODUCIBILITY CONTROL
# ============================================================

# Display Python version for experiment traceability
!python --version 
# Use Python 3.9.12 throughout the environment to obtain results comparable to ours.

# For reproducibility:
# Recommended stable versions (TensorFlow 2.14 + TF Privacy 0.9)
# Uncomment only if environment setup is required

# !pip uninstall tensorflow -y
# !pip uninstall tensorflow-privacy -y
# !pip install tensorflow==2.14.0
# !pip install tensorflow-privacy==0.9.0
# !pip install deap imbalanced-learn seaborn

# ------------------------------------------------------------
# Set deterministic seeds for full reproducibility
# ------------------------------------------------------------
import os
import random
import numpy as np
import tensorflow as tf

# Single seed shared by every random number generator used in this notebook.
SEED = 42

# NOTE: the interpreter reads PYTHONHASHSEED at start-up, so assigning it here
# only reaches processes spawned from this kernel; export it before launching
# Jupyter if hash randomization of the kernel itself must be fixed.
os.environ['PYTHONHASHSEED'] = str(SEED)

# Seed Python's, NumPy's and TensorFlow's global generators, in that order.
for _seed_fn in (random.seed, np.random.seed, tf.random.set_seed):
    _seed_fn(SEED)

print("Deterministic seed set to:", SEED)


In [None]:
# ============================================================
# LIBRARY IMPORTS
# ============================================================

# ---------------------------
# Standard Library
# ---------------------------
import warnings
warnings.filterwarnings("ignore", category=UserWarning)

# ---------------------------
# Data & Numerical
# ---------------------------
import pandas as pd
import numpy as np

# ---------------------------
# Visualization
# ---------------------------
import matplotlib.pyplot as plt
import seaborn as sns

# ---------------------------
# Machine Learning & Performance Metrics
# ---------------------------
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    confusion_matrix,
    precision_recall_curve,
    auc
)

# ---------------------------
# Deep Learning
# ---------------------------
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.utils import to_categorical

# ---------------------------
# Differential Privacy
# ---------------------------
import tensorflow_privacy
from tensorflow_privacy.privacy.analysis.compute_dp_sgd_privacy import compute_dp_sgd_privacy

# ---------------------------
# Genetic Algorithm
# ---------------------------
from deap import base, creator, tools


In [None]:
# ============================================================
# DATA LOADING
# ============================================================

# IMPORTANT:
# Keep DATA_PATH relative to the notebook so the run is reproducible on
# other machines.
DATA_PATH = "BotNeTIoT-L01_label_NoDuplicates.csv"

# The first CSV column is used as the row index.
data = pd.read_csv(DATA_PATH, index_col=0)

target_column = "label"

print("Unique Labels:", data[target_column].unique())
print("Number of Classes:", data[target_column].nunique())

# ------------------------------------------------------------
# Feature / Target Split
# ------------------------------------------------------------
X = data.drop(columns=[target_column])
y = data[target_column]

# ------------------------------------------------------------
# Train / Validation / Test Split
# 60% train | 15% validation | 25% test
#
# The previous test_size values (0.25, then 0.4 of the remainder) actually
# produced 75% / 15% / 10%. The fractions below implement the documented
# ratios: hold out 40% first, then give 0.25 / 0.40 of that hold-out to test.
# ------------------------------------------------------------
VALID_FRACTION = 0.15
TEST_FRACTION = 0.25
HOLDOUT_FRACTION = VALID_FRACTION + TEST_FRACTION

X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=HOLDOUT_FRACTION, stratify=y, random_state=SEED
)

X_valid, X_test, y_valid, y_test = train_test_split(
    X_temp, y_temp,
    test_size=TEST_FRACTION / HOLDOUT_FRACTION,
    stratify=y_temp,
    random_state=SEED
)

# ------------------------------------------------------------
# Standard Scaling (fit ONLY on training set)
# Prevents data leakage
# ------------------------------------------------------------
scaler = StandardScaler()

X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_test_scaled  = scaler.transform(X_test)

print("Data preprocessing complete.")


In [None]:
# ============================================================
# GENETIC ALGORITHM SEARCH SPACE DEFINITION
# ============================================================

# Candidate layer widths: the powers of two from 16 to 256.
# LSTM and Dense layers get separate list objects with the same values.
LSTM_UNITS = [2 ** exponent for exponent in range(4, 9)]
DENSE_UNITS = list(LSTM_UNITS)

# Activation functions the search may pick for Dense layers.
ACTIVATIONS = ['relu', 'gelu', 'swish']

# Optimizer and loss are single-valued: training goes through DP-SGD, and the
# two-unit softmax output is paired with categorical cross-entropy.
OPTIMIZERS = ['sgd']
LOSSES = ['categorical_crossentropy']

# Training-schedule grids.
LEARNING_RATES = [0.001, 0.0001]
BATCH_SIZES = [64, 128, 256]
EPOCHS = [20, 30, 50, 100]

# Upper bounds on the number of stacked LSTM / Dense layers.
MAX_LSTM_LAYERS, MAX_DENSE_LAYERS = 3, 4

# Differential-privacy defaults: Gaussian noise multiplier and target delta.
DEFAULT_NOISE_MULTIPLIER, DELTA = 1.3, 1e-5

# Switch for running the genetic-algorithm search.
gene_alg = True


In [None]:
# ============================================================
# RANDOM HYPERPARAMETER SAMPLER
# ============================================================

def create_hyperparameter_set_dp():
    """
    Sample one random hyperparameter configuration for the DP-LSTM search.

    Returns:
        dict with the keys "lstm_units", "dense_units", "dropouts",
        "activation", "optimizer", "losses", "learning_rate", "batch_size",
        "epochs", "l2_norm_clip" and "noise_multiplier". "dropouts" holds one
        rate per LSTM layer followed by one per Dense layer.
    """

    # The draws below are made in a fixed order; with a seeded `random`
    # module, reordering them would change every sampled configuration.
    lstm_depth = random.randint(1, MAX_LSTM_LAYERS)
    dense_depth = random.randint(1, MAX_DENSE_LAYERS)

    config = {}

    # Architecture: a width per layer, then a dropout rate per layer.
    config["lstm_units"] = [random.choice(LSTM_UNITS) for _ in range(lstm_depth)]
    config["dense_units"] = [random.choice(DENSE_UNITS) for _ in range(dense_depth)]
    config["dropouts"] = [
        random.uniform(0.1, 0.5) for _ in range(lstm_depth + dense_depth)
    ]
    config["activation"] = random.choice(ACTIVATIONS)

    # Optimizer and loss are fixed rather than sampled (constrained for DP).
    config["optimizer"] = "sgd"
    config["losses"] = "categorical_crossentropy"

    # Training schedule.
    config["learning_rate"] = random.choice(LEARNING_RATES)
    config["batch_size"] = random.choice(BATCH_SIZES)
    config["epochs"] = random.choice(EPOCHS)

    # Differential-privacy knobs: clipping norm and noise multiplier.
    config["l2_norm_clip"] = random.uniform(0.5, 2.0)
    config["noise_multiplier"] = random.uniform(0.8, 1.5)

    return config


In [None]:
# ============================================================
# FITNESS FUNCTION: DP-LSTM WITH STRATIFIED 5-FOLD CV
# ============================================================

def evaluate_lstm_with_dp(hyperparams):
    """
    Evaluates a DP-LSTM configuration using 5-fold stratified
    cross-validation on the scaled training set.

    Fitness Objective:
        Maximize (Mean F1 Score - Epsilon)

    This encourages privacy-utility trade-off optimization.

    Parameters:
        hyperparams: dict with the keys "lstm_units", "dense_units",
            "dropouts", "learning_rate", "batch_size", "epochs",
            "l2_norm_clip" and "noise_multiplier".

    Returns:
        One-element tuple (fitness,).

    NOTE(review): hyperparams["activation"] is sampled by the search but is
    not forwarded to the model builder by this function, so the sampled
    value has no effect on the models trained here.
    """

    # ---------------------------
    # Extract Hyperparameters
    # ---------------------------
    lstm_units       = hyperparams["lstm_units"]
    dense_units      = hyperparams["dense_units"]
    dropouts         = hyperparams["dropouts"]
    learning_rate    = hyperparams["learning_rate"]
    batch_size       = hyperparams["batch_size"]
    epochs           = hyperparams["epochs"]
    l2_norm_clip     = hyperparams["l2_norm_clip"]
    noise_multiplier = hyperparams["noise_multiplier"]

    # ---------------------------
    # Add a length-1 time axis for the LSTM
    # Shape: (samples, timesteps=1, features)
    # ---------------------------
    X_seq = np.expand_dims(X_train_scaled, axis=1)
    input_dim = X_train_scaled.shape[1]

    kfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=SEED)
    f1_scores = []
    fold_train_sizes = []

    for train_idx, val_idx in kfold.split(X_seq, y_train):

        # IMPORTANT: rebuild model each fold (prevents weight leakage)
        model = build_dp_lstm_model(
            input_dim=input_dim,
            lstm_units=lstm_units,
            dense_units=dense_units,
            dropouts=dropouts,
            learning_rate=learning_rate,
            l2_norm_clip=l2_norm_clip,
            noise_multiplier=noise_multiplier
        )

        X_tr, X_val = X_seq[train_idx], X_seq[val_idx]
        y_tr, y_val = y_train.iloc[train_idx], y_train.iloc[val_idx]

        # Only the training labels need one-hot encoding; validation is
        # scored on integer class ids.
        y_tr_cat = tf.keras.utils.to_categorical(y_tr, 2)

        model.fit(
            X_tr,
            y_tr_cat,
            epochs=epochs,
            batch_size=batch_size,
            verbose=0
        )

        y_pred = model.predict(X_val, verbose=0).argmax(axis=1)

        f1_scores.append(f1_score(y_val, y_pred))
        fold_train_sizes.append(len(train_idx))

    # ---------------------------
    # Privacy Accounting
    # ---------------------------
    # Each fold's model is trained on that fold's training portion only, not
    # on the whole of X_train. Account with the smallest portion: it gives
    # the largest batch_size / n sampling ratio, hence the most conservative
    # epsilon across folds.
    epsilon, _ = compute_dp_sgd_privacy(
        n=min(fold_train_sizes),
        batch_size=batch_size,
        noise_multiplier=noise_multiplier,
        epochs=epochs,
        delta=DELTA
    )

    # ---------------------------
    # GA Fitness Objective: mean F1 - epsilon (both terms unweighted)
    # ---------------------------
    fitness = np.mean(f1_scores) - epsilon

    return (fitness,)


In [None]:
# ============================================================
# FUNCTION: Build Differentially Private LSTM Model
# ============================================================

def build_dp_lstm_model(input_dim,
                        lstm_units,
                        dense_units,
                        dropouts,
                        learning_rate,
                        l2_norm_clip,
                        noise_multiplier,
                        activation="relu"):

    """
    Constructs a Differentially Private LSTM model.

    The network is a stack of LSTM layers followed by Dense layers, each
    followed by Dropout, and ends in a 2-unit softmax output.

    Parameters:
        input_dim: number of input features
        lstm_units: non-empty list of LSTM layer sizes
        dense_units: list of Dense layer sizes
        dropouts: list of dropout values, one per LSTM layer followed by
            one per Dense layer (extra trailing values are ignored)
        learning_rate: optimizer learning rate
        l2_norm_clip: gradient clipping threshold (DP parameter)
        noise_multiplier: Gaussian noise scale (DP parameter)
        activation: activation used by the hidden Dense layers; defaults to
            "relu", the value that was previously hard-coded

    Returns:
        Compiled Keras model

    Raises:
        ValueError: if lstm_units is empty or dropouts has fewer entries
            than there are LSTM + Dense layers.
    """

    n_lstm = len(lstm_units)
    n_hidden = n_lstm + len(dense_units)

    # Fail early with a clear message instead of an IndexError part-way
    # through model construction.
    if n_lstm == 0:
        raise ValueError("lstm_units must contain at least one layer size")
    if len(dropouts) < n_hidden:
        raise ValueError(
            f"dropouts has {len(dropouts)} entries but {n_hidden} are "
            "required (one per LSTM and Dense layer)"
        )

    model = Sequential()

    # LSTM stack. The first layer declares the expected input shape
    # (timesteps=1, features); callers must supply data already shaped
    # (samples, 1, input_dim). Every LSTM except the last returns the full
    # sequence so the next LSTM receives 3-D input.
    for i, units in enumerate(lstm_units):
        layer_kwargs = {"return_sequences": i < n_lstm - 1}
        if i == 0:
            layer_kwargs["input_shape"] = (1, input_dim)
        model.add(LSTM(units, **layer_kwargs))
        model.add(Dropout(dropouts[i]))

    # Dense layers; their dropout rates follow the LSTM ones in `dropouts`.
    for j, units in enumerate(dense_units):
        model.add(Dense(units, activation=activation))
        model.add(Dropout(dropouts[n_lstm + j]))

    # Output layer (Binary classification)
    model.add(Dense(2, activation='softmax'))

    # Differentially Private Optimizer
    # NOTE(review): num_microbatches=1 treats each batch as one microbatch,
    # so clipping presumably applies to the batch-level gradient rather than
    # to per-example gradients. Confirm this matches the assumptions of the
    # epsilon computed elsewhere with compute_dp_sgd_privacy.
    optimizer = tensorflow_privacy.DPKerasSGDOptimizer(
        l2_norm_clip=l2_norm_clip,
        noise_multiplier=noise_multiplier,
        num_microbatches=1,
        learning_rate=learning_rate
    )

    model.compile(
        optimizer=optimizer,
        loss="categorical_crossentropy",
        metrics=["accuracy"]
    )

    return model


In [None]:
# ============================================================
# CROSS-VALIDATION (Correctly Reinitialize Model Each Fold)
# ============================================================

def cross_validate_model(params):
    """
    Runs 5-fold stratified cross-validation of a DP-LSTM configuration on
    the scaled training set and returns the mean F1 score.

    A fresh model is built for every fold so no weights carry over between
    folds.

    Parameters:
        params: dict with the keys "lstm_units", "dense_units", "dropouts",
            "learning_rate", "l2_norm_clip", "noise_multiplier", "epochs"
            and "batch_size".

    Returns:
        Mean of the per-fold F1 scores.
    """

    # Add a length-1 time axis: (samples, timesteps=1, features).
    X_seq = np.expand_dims(X_train_scaled, axis=1)
    input_dim = X_train_scaled.shape[1]

    kfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=SEED)
    f1_scores = []

    for train_idx, val_idx in kfold.split(X_seq, y_train):

        model = build_dp_lstm_model(
            input_dim=input_dim,
            lstm_units=params["lstm_units"],
            dense_units=params["dense_units"],
            dropouts=params["dropouts"],
            learning_rate=params["learning_rate"],
            l2_norm_clip=params["l2_norm_clip"],
            noise_multiplier=params["noise_multiplier"]
        )

        X_tr, X_val = X_seq[train_idx], X_seq[val_idx]
        y_tr, y_val = y_train.iloc[train_idx], y_train.iloc[val_idx]

        # Only the training labels are one-hot encoded; the validation
        # labels are compared as integer class ids in f1_score below.
        y_tr_cat = to_categorical(y_tr, 2)

        model.fit(X_tr, y_tr_cat,
                  epochs=params["epochs"],
                  batch_size=params["batch_size"],
                  verbose=0)

        y_pred = model.predict(X_val, verbose=0).argmax(axis=1)

        f1_scores.append(f1_score(y_val, y_pred))

    return np.mean(f1_scores)


In [None]:
# ============================================================
# PRIVACY BUDGET COMPUTATION
# ============================================================

def compute_privacy_budget(sample_size, batch_size,
                           noise_multiplier, epochs,
                           delta=1e-5):

    """
    Returns the epsilon reported by compute_dp_sgd_privacy for a DP-SGD run.

    Parameters:
        sample_size: number of training examples (forwarded as `n`)
        batch_size: training batch size
        noise_multiplier: Gaussian noise scale used by the DP optimizer
        epochs: number of training epochs
        delta: target delta of the (epsilon, delta) guarantee

    Returns:
        The first element of the accountant's result (epsilon); the second
        element is discarded.
    """

    accountant_args = dict(
        n=sample_size,
        batch_size=batch_size,
        noise_multiplier=noise_multiplier,
        epochs=epochs,
        delta=delta,
    )

    eps, _unused = compute_dp_sgd_privacy(**accountant_args)
    return eps


In [None]:
# ============================================================
# FINAL MODEL TRAINING (FALLBACK HYPERPARAMETERS)
# ============================================================

# Hand-picked configuration for the final model: one LSTM layer, one Dense
# layer, and one dropout rate for each of them.
best_params = {
    "lstm_units": [64],
    "dense_units": [128],
    "dropouts": [0.3, 0.3],
    "learning_rate": 0.001,
    "l2_norm_clip": 1.0,
    "noise_multiplier": 1.2,
    "epochs": 30,
    "batch_size": 64
}

# Add a length-1 time axis: (samples, timesteps=1, features).
X_train_seq = np.expand_dims(X_train_scaled, axis=1)
X_valid_seq = np.expand_dims(X_valid_scaled, axis=1)
X_test_seq  = np.expand_dims(X_test_scaled, axis=1)

model = build_dp_lstm_model(
    input_dim=X_train_scaled.shape[1],
    lstm_units=best_params["lstm_units"],
    dense_units=best_params["dense_units"],
    dropouts=best_params["dropouts"],
    learning_rate=best_params["learning_rate"],
    l2_norm_clip=best_params["l2_norm_clip"],
    noise_multiplier=best_params["noise_multiplier"]
)

y_train_cat = to_categorical(y_train, 2)
y_valid_cat = to_categorical(y_valid, 2)
y_test_cat  = to_categorical(y_test, 2)

# Validate on the dedicated hold-out split rather than validation_split=0.2.
# With validation_split the model was fitted on only 80% of X_train while the
# privacy report below accounted for len(X_train) examples; now the model is
# fitted on every row of X_train, so the two counts agree.
history = model.fit(
    X_train_seq, y_train_cat,
    epochs=best_params["epochs"],
    batch_size=best_params["batch_size"],
    validation_data=(X_valid_seq, y_valid_cat),
    verbose=1
)

# ------------------------------------------------------------
# Evaluation
# ------------------------------------------------------------
y_pred_test = model.predict(X_test_seq).argmax(axis=1)

print("Accuracy:", accuracy_score(y_test, y_pred_test))
print("Precision:", precision_score(y_test, y_pred_test))
print("Recall:", recall_score(y_test, y_pred_test))
print("F1 Score:", f1_score(y_test, y_pred_test))

# ------------------------------------------------------------
# Privacy Report
# ------------------------------------------------------------
# DELTA is passed explicitly and echoed in the report so the printed delta
# is always the one used for the computation.
epsilon = compute_privacy_budget(
    sample_size=len(X_train),
    batch_size=best_params["batch_size"],
    noise_multiplier=best_params["noise_multiplier"],
    epochs=best_params["epochs"],
    delta=DELTA
)

print(f"(ε, δ)-DP Guarantee: ε = {epsilon:.4f}, δ = {DELTA:g}")
