# Machine Learning Models for DDoS Detection
This notebook covers the full workflow: data loading, model training (Logistic Regression, XGBoost, GRU, LSTM, VAE + LSTM) and hyperparameter optimization with Optuna.

In [None]:
!pip install optuna

In [None]:
!pip install optuna-integration[tfkeras]

In [None]:
import os
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from google.colab import drive

from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (accuracy_score, precision_score, recall_score,
                             f1_score, confusion_matrix, classification_report)
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler, MinMaxScaler

import tensorflow as tf
from tensorflow.keras import Model, Sequential  # Model is needed by the VAE + LSTM section
from tensorflow.keras import layers, regularizers, activations, backend as K
from tensorflow.keras.layers import Dense, LSTM, Dropout, BatchNormalization, GRU, Flatten
from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical

import xgboost as xgb
import optuna
from optuna.integration import TFKerasPruningCallback

In [None]:
drive.mount('/content/drive')
!pwd

In [None]:
BASE_FOLDER = '/content/drive/MyDrive/TFM_Jesus_Morato/VAE-LSTM/'
os.makedirs(BASE_FOLDER, exist_ok=True)
os.chdir(BASE_FOLDER)
!pwd

In [None]:
final_columns = [' Source IP', ' Source Port', ' Destination IP', ' Destination Port',
       ' Protocol', 'Total Length of Fwd Packets', ' Fwd Packet Length Min',
       ' Bwd Packet Length Min', 'Flow Bytes/s', ' Flow Packets/s',
       ' Flow IAT Min', 'Bwd IAT Total', ' Bwd IAT Mean', 'Fwd PSH Flags',
       ' Bwd Header Length', 'Fwd Packets/s', ' Bwd Packets/s',
       ' Min Packet Length', ' Packet Length Mean', 'FIN Flag Count',
       ' SYN Flag Count', ' RST Flag Count', ' PSH Flag Count',
       ' ACK Flag Count', ' URG Flag Count', ' ECE Flag Count',
       ' Down/Up Ratio', ' Avg Fwd Segment Size', ' Avg Bwd Segment Size',
       'Init_Win_bytes_forward', ' Init_Win_bytes_backward',
       ' act_data_pkt_fwd', ' min_seg_size_forward', ' Active Std',
       ' Active Min', ' Idle Std', ' Idle Min']

## Models on the Multiclass Dataset

This section evaluates the different Machine Learning models on the multiclass dataset, which was preprocessed and saved beforehand.

### Logistic Regression

Logistic regression model, varying the parameter C, the inverse of the regularization strength (smaller values mean a stronger penalty).

In [None]:
X = pd.read_csv('X_train_multiclass.csv')
y = pd.read_csv('y_train_multiclass.csv')

In [None]:
X = X[final_columns]

In [None]:
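X_train, X_test, y_train, y_test = train_test_split(
    X, y.values.ravel(), test_size=0.2, random_state=42, shuffle=True)

In [None]:
# Fit the scaler on the training split only, then apply it to both splits
# (the original cell computed X_scaled but then split the unscaled X)
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)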
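
Scaling statistics are usually estimated on the training rows only and then reused for the held-out rows. The following is a minimal NumPy sketch of that idea on synthetic data; `X_demo` and the 80/20 positional split are assumptions for illustration and are not part of the notebook's dataset. Like `StandardScaler`, it uses the population standard deviation.

```python
import numpy as np

rng = np.random.default_rng(0)
# Synthetic stand-in for the flow features (assumption, not the real dataset)
X_demo = rng.normal(loc=5.0, scale=3.0, size=(1000, 4))

# Hold out the last 20% of the rows as a test split
n_train_demo = int(0.8 * len(X_demo))
X_tr, X_te = X_demo[:n_train_demo], X_demo[n_train_demo:]

# Estimate mean and standard deviation on the training rows only
mu = X_tr.mean(axis=0)
sigma = X_tr.std(axis=0)

X_tr_scaled = (X_tr - mu) / sigma
X_te_scaled = (X_te - mu) / sigma  # test rows reuse the training statistics

print(X_tr_scaled.mean(axis=0).round(6))
print(X_te_scaled.mean(axis=0).round(3))
```

The training columns end up with zero mean and unit variance, while the test columns are only approximately centred, which is the expected behaviour when no test information leaks into the scaler.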

In [None]:
c_values = [0.0001, 0.001, 0.01, 0.1, 1, 10, 100, 1000]
results = []

for c in c_values:
    model = LogisticRegression(C=c, max_iter=1000)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred, average='weighted')
    recall = recall_score(y_test, y_pred, average='weighted')
    f1 = f1_score(y_test, y_pred, average='weighted')
    results.append([c, accuracy, precision, recall, f1])

results_df = pd.DataFrame(results, columns=["C", "Accuracy", "Precision", "Recall", "F1-score"])
print(results_df)

### XGBoost

Hyperparameter search with Optuna for the XGBoost model, followed by an evaluation of the metrics obtained with the best parameters found.

In [None]:
X = pd.read_csv('X_train_multiclass.csv')
y = pd.read_csv('y_train_multiclass.csv')

In [None]:
X = X[final_columns]

In [None]:
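X_train, X_test, y_train, y_test = train_test_split(
    X, y.values.ravel(), test_size=0.2, random_state=42, shuffle=True)

X_train, X_valid, y_train, y_valid = train_test_split(
    X_train, y_train, test_size=0.2, random_state=42, shuffle=True)

In [None]:
# Fit the scaler on the training split only; DataFrames are kept so that
# XGBoost can still report feature names in the importance plot
scaler = StandardScaler()
X_train = pd.DataFrame(scaler.fit_transform(X_train), columns=final_columns)
X_valid = pd.DataFrame(scaler.transform(X_valid), columns=final_columns)
X_test = pd.DataFrame(scaler.transform(X_test), columns=final_columns)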
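
Two chained 80/20 splits leave roughly 64% of the rows for training, 16% for validation and 20% for testing. The sketch below works through that arithmetic; `split_sizes` is a hypothetical helper, and it assumes the held-out size is rounded up, as scikit-learn does for a fractional `test_size`.

```python
import math

def split_sizes(n_samples, test_size=0.2, valid_size=0.2):
    """Hypothetical helper: sizes produced by two chained hold-out splits."""
    n_test = math.ceil(test_size * n_samples)      # first split: hold out the test set
    n_remaining = n_samples - n_test
    n_valid = math.ceil(valid_size * n_remaining)  # second split: hold out validation rows
    n_train = n_remaining - n_valid
    return n_train, n_valid, n_test

n_train_rows, n_valid_rows, n_test_rows = split_sizes(100_000)
print(n_train_rows, n_valid_rows, n_test_rows)  # 64000 16000 20000
```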

In [None]:
# Objective function for the hyperparameter optimization
def objective(trial):
    params = {
        "objective": "multi:softmax",
        "num_class": 7,
        "eval_metric": "mlogloss",
        "n_estimators": trial.suggest_int("n_estimators", 50, 300),
        "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3),
        "max_depth": trial.suggest_int("max_depth", 3, 12),
        "subsample": trial.suggest_float("subsample", 0.5, 1.0),
        "colsample_bytree": trial.suggest_float("colsample_bytree", 0.5, 1.0),
        "min_child_weight": trial.suggest_int("min_child_weight", 1, 10),
        "gamma": trial.suggest_float("gamma", 0, 5),
        "reg_alpha": trial.suggest_float("reg_alpha", 0, 5),
        "reg_lambda": trial.suggest_float("reg_lambda", 0, 5)
    }

    # use_label_encoder is deprecated in recent XGBoost releases, so it is not passed here
    model = xgb.XGBClassifier(**params)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_valid)
    accuracy = accuracy_score(y_valid, y_pred)
    return accuracy

study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=30)

In [None]:
# Configuration with the best hyperparameters found
best_params = {
    'n_estimators': 172,
    'learning_rate': 0.12678245336468844,
    'max_depth': 11,
    'subsample': 0.7898323873624815,
    'colsample_bytree': 0.848728009323304,
    'min_child_weight': 10,
    'gamma': 0.7875794378542267,
    'reg_alpha': 0.5787478600366702,
    'reg_lambda': 1.1732832093441035,
    'objective': 'multi:softmax',
    'num_class': 7,
    'eval_metric': 'mlogloss',
    'use_label_encoder': False
}

model = xgb.XGBClassifier(**best_params)
model.fit(X_train, y_train)

# Predict on the test set
y_pred = model.predict(X_test)

accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred, average='macro')  # Macro averaging weights every class equally
recall = recall_score(y_test, y_pred, average='macro')
f1 = f1_score(y_test, y_pred, average='macro')

# Print the results of the best XGBoost model
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-score: {f1:.4f}")


In [None]:
from optuna.visualization import plot_optimization_history, plot_param_importances, plot_parallel_coordinate

plot_optimization_history(study).show()

In [None]:
plot_param_importances(study).show()

In [None]:
# best_params already contains use_label_encoder, so passing it again would raise a TypeError
best_model = xgb.XGBClassifier(**best_params)

best_model.fit(X_train, y_train)

best_model.save_model("xgboost.model")

In [None]:
xgb.plot_importance(best_model, max_num_features=20)
plt.show()

In [None]:
# Finally, compute the confusion matrix
conf_matrix = confusion_matrix(y_test, y_pred)

class_names = ['Benign', 'Syn', 'LDAP', 'UDP', 'SSDP', 'MSSQL', 'DNS']

# Plot the confusion matrix with seaborn
plt.figure(figsize=(10, 7))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', cbar=False,
            xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix')
plt.tight_layout()
plt.show()
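
Per-class precision and recall can be read directly off a confusion matrix, which also shows how macro and weighted averages differ. The sketch below uses a made-up 3-class matrix (not results from this notebook), with rows as true classes and columns as predictions, matching the `confusion_matrix` convention.

```python
import numpy as np

# Toy confusion matrix (assumption for illustration): rows = true class, columns = predicted class
cm = np.array([[50, 2, 3],
               [4, 30, 6],
               [1, 2, 7]])

tp = np.diag(cm).astype(float)   # correctly classified samples per class
support = cm.sum(axis=1)         # true samples per class
predicted = cm.sum(axis=0)       # predictions per class

recall_per_class = tp / support
precision_per_class = tp / predicted

macro_recall = recall_per_class.mean()                           # every class counts equally
weighted_recall = np.average(recall_per_class, weights=support)  # classes weighted by support
overall_accuracy = tp.sum() / cm.sum()

print(recall_per_class.round(3), macro_recall.round(3), weighted_recall.round(3))
```

Support-weighted recall always equals accuracy, so the macro average is the more informative figure when minority attack classes matter.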


## Time-Series Models

From this point on, the models that analyze time series are trained: LSTM, GRU and VAE + LSTM.


### LSTM

As before, an objective function is defined for this model so that Optuna can search for the best architecture configurations.

In [None]:
X = pd.read_csv('X_train_multiclass.csv')
y = pd.read_csv('y_train_multiclass.csv')

In [None]:
std = StandardScaler()
X = std.fit_transform(X)

In [None]:
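def create_windows(X, y, window_size=10):
    # Accept DataFrames or arrays; labels are flattened to a 1-D vector
    X, y = np.asarray(X), np.asarray(y).ravel()
    X_windows, y_windows = [], []
    for i in range(len(X) - window_size):
        X_windows.append(X[i:i+window_size])
        y_windows.append(y[i+window_size])  # Use the label of the next time step
    return np.array(X_windows), np.array(y_windows)

# Chronological split (shuffle=False) so that each window holds consecutive flows,
# mirroring the split used in the GRU section
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

X_train_win, y_train_win = create_windows(X_train, y_train, window_size=10)
X_test_win, y_test_win = create_windows(X_test, y_test, window_size=10)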
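
To make the windowing concrete, here is a small self-contained sketch on toy arrays. `make_windows` is a hypothetical stand-in that repeats the logic of `create_windows` so the snippet runs on its own; the 12-row input is made up.

```python
import numpy as np

def make_windows(X, y, window_size):
    """Sliding windows; each window is labelled with the sample that follows it."""
    X, y = np.asarray(X), np.asarray(y).ravel()
    n_windows = len(X) - window_size
    Xw = np.array([X[i:i + window_size] for i in range(n_windows)])
    yw = np.array([y[i + window_size] for i in range(n_windows)])
    return Xw, yw

X_toy = np.arange(24).reshape(12, 2)  # 12 time steps, 2 features
y_toy = np.arange(12)                 # label i belongs to time step i

Xw, yw = make_windows(X_toy, y_toy, window_size=3)
print(Xw.shape, yw.shape)  # (9, 3, 2) (9,)
print(yw[0])               # 3: the first window covers steps 0-2 and takes the label of step 3
```

With `N` rows and a window of `w` steps, the result has `N - w` windows, so the last `w` labels of the first row positions are never used as targets for an earlier window.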

In [None]:
y_train_cat = to_categorical(y_train_win, num_classes=7)
y_test_cat = to_categorical(y_test_win, num_classes=7)

In [None]:
# A set keeps track of the configurations already evaluated so that
# no hyperparameter combination is trained twice
tested_configs = set()

# Objective function for the hyperparameter optimization. The tunable parameters are:
# num_layers, units, dropout_rate and learning_rate
def objective(trial):
    global tested_configs

    num_layers = trial.suggest_int("num_layers", 1, 3)
    units = tuple(trial.suggest_categorical(f"units_{i}", [32, 64, 128]) for i in range(num_layers))
    dropout_rate = trial.suggest_categorical("dropout", [0.01, 0.05, 0.1, 0.2, 0.3])
    learning_rate = trial.suggest_categorical("learning_rate", [1e-4, 5e-4, 1e-3, 5e-3])

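    config = (num_layers, units, dropout_rate, learning_rate)
    if config in tested_configs:
        # Duplicate configuration: skip training and rank it last
        trial.set_user_attr("precision", np.nan)
        trial.set_user_attr("recall", np.nan)
        return float("-inf")
    tested_configs.add(config)

    model = Sequential()
    # Declare the input shape once instead of passing input_shape=None to the deeper layers
    model.add(tf.keras.Input(shape=(X_train_win.shape[1], X_train_win.shape[2])))
    for i in range(num_layers):
        # Every layer except the last returns the full sequence for the next LSTM layer
        model.add(LSTM(units=units[i], return_sequences=i < num_layers - 1))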
    model.add(Dropout(dropout_rate))
    model.add(Dense(7, activation='softmax'))

    model.compile(optimizer=Adam(learning_rate=learning_rate),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    pruning_cb = TFKerasPruningCallback(trial, "val_accuracy")

    model.fit(X_train_win, y_train_cat,
              validation_data=(X_test_win, y_test_cat),
              epochs=15, batch_size=1024, verbose=2,
              callbacks=[pruning_cb])

    y_pred = np.argmax(model.predict(X_test_win), axis=1)
    y_true = np.argmax(y_test_cat, axis=1)

    acc = accuracy_score(y_true, y_pred)

    if trial.should_prune():
        trial.set_user_attr("precision", np.nan)
        trial.set_user_attr("recall", np.nan)
        raise optuna.TrialPruned()

    trial.set_user_attr("precision", precision_score(y_true, y_pred, average='weighted'))
    trial.set_user_attr("recall", recall_score(y_true, y_pred, average='weighted'))

    return acc


In [None]:
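# Run 20 trainings with different hyperparameter configurations and rank them by accuracy;
# the 5 best LSTM trainings are the ones of interest
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=20)

# Rank only completed trials with a finite score (pruned and duplicate trials are skipped)
completed = [t for t in study.trials
             if t.state == optuna.trial.TrialState.COMPLETE and np.isfinite(t.value)]
top_trials = sorted(completed, key=lambda t: t.value, reverse=True)[:20]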

results = []
for trial in top_trials:
    num_layers = trial.params["num_layers"]
    units = [trial.params[f"units_{i}"] for i in range(num_layers)]

    results.append({
        "Num Layers": num_layers,
        "LSTM Units": units,
        "Dropout": trial.params["dropout"],
        "Learning Rate": trial.params["learning_rate"],
        "Accuracy": trial.value,
        "Precision": trial.user_attrs["precision"],
        "Recall": trial.user_attrs["recall"]
    })

results_df_results = pd.DataFrame(results)

In [None]:
# Show the results of the best models in a more readable format for the written report
trial_data = []
for trial in top_trials:
    params = trial.params
    trial_data.append({
        "Trial": trial.number,
        "Accuracy": round(trial.values[0], 5),
        "Precision": round(trial.user_attrs.get("precision", 0), 5),
        "Recall": round(trial.user_attrs.get("recall", 0), 5),
        "Num Layers": params["num_layers"],
        "Units": [params.get(f"units_{i}", None) for i in range(params["num_layers"])],
        "Dropout": params["dropout"],
        "Learning Rate": params["learning_rate"],
    })

df_trials = pd.DataFrame(trial_data).sort_values(by="Accuracy", ascending=False)
print(df_trials)
df_trials.to_csv("optuna_trials_results.csv", index=False)


### GRU


In [None]:
X = pd.read_csv('X_train_multiclass.csv')
y = pd.read_csv('y_train_multiclass.csv')

In [None]:
std = StandardScaler()
X = std.fit_transform(X)

In [None]:
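X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, shuffle=False)

In [None]:
# Assumption: the GRU reuses the 10-step sliding windows from the LSTM section,
# since recurrent layers expect 3-D input (samples, timesteps, features)
X_train, y_train = create_windows(X_train, y_train.values.ravel(), window_size=10)
X_test, y_test = create_windows(X_test, y_test.values.ravel(), window_size=10)
y_train = to_categorical(y_train, num_classes=7)
y_test = to_categorical(y_test, num_classes=7)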

In [None]:
tested_configs = set()

def objective(trial):
    global tested_configs

    num_layers = trial.suggest_int("num_layers", 1, 3)
    fixed_units = [32, 64, 128]
    units = tuple(trial.suggest_categorical(f"units_{i}", fixed_units) for i in range(num_layers))

    dropout_rate = trial.suggest_categorical("dropout", [0.01, 0.05, 0.1, 0.2, 0.3])
    learning_rate = trial.suggest_categorical("learning_rate", [1e-4, 5e-4, 1e-3, 5e-3])

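    config = (num_layers, units, dropout_rate, learning_rate)
    if config in tested_configs:
        # Duplicate configuration: skip training and rank it last
        trial.set_user_attr("precision", np.nan)
        trial.set_user_attr("recall", np.nan)
        return float("-inf")
    tested_configs.add(config)

    model = Sequential()
    # Declare the input shape once instead of passing input_shape=None to the deeper layers
    model.add(tf.keras.Input(shape=(X_train.shape[1], X_train.shape[2])))
    for i in range(num_layers):
        return_sequences = i < num_layers - 1
        model.add(GRU(units=units[i], return_sequences=return_sequences))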
    model.add(Dropout(dropout_rate))
    model.add(Dense(units=7, activation='softmax'))

    model.compile(optimizer=Adam(learning_rate=learning_rate),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    pruning_callback = TFKerasPruningCallback(trial, "val_accuracy")
    model.fit(X_train, y_train,
              validation_data=(X_test, y_test),
              epochs=15, batch_size=512,
              verbose=2, callbacks=[pruning_callback])

    y_pred = np.argmax(model.predict(X_test), axis=1)
    y_true = np.argmax(y_test, axis=1)
    accuracy = accuracy_score(y_true, y_pred)

    if trial.should_prune():
        trial.set_user_attr("precision", np.nan)
        trial.set_user_attr("recall", np.nan)
        raise optuna.TrialPruned()

    precision = precision_score(y_true, y_pred, average='weighted')
    recall = recall_score(y_true, y_pred, average='weighted')
    trial.set_user_attr("precision", precision)
    trial.set_user_attr("recall", recall)

    return accuracy

In [None]:
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=20)

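# Rank only completed trials with a finite score (pruned and duplicate trials are skipped)
completed = [t for t in study.trials
             if t.state == optuna.trial.TrialState.COMPLETE and np.isfinite(t.value)]
top_trials = sorted(completed, key=lambda t: t.value, reverse=True)[:20]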

results = []
for trial in top_trials:
    num_layers = trial.params["num_layers"]
    units = [trial.params[f"units_{i}"] for i in range(num_layers)]

    results.append({
        "Num Layers": num_layers,
        "GRU Units": units,
        "Dropout": trial.params["dropout"],
        "Learning Rate": trial.params["learning_rate"],
        "Accuracy": trial.value,
        "Precision": trial.user_attrs["precision"],
        "Recall": trial.user_attrs["recall"]
    })

results_df = pd.DataFrame(results)
print(results_df)

In [None]:
trial_data = []
for trial in top_trials:
    params = trial.params
    trial_data.append({
        "Trial": trial.number,
        "Accuracy": round(trial.values[0], 5),
        "Precision": round(trial.user_attrs.get("precision", 0), 5),
        "Recall": round(trial.user_attrs.get("recall", 0), 5),
        "Num Layers": params["num_layers"],
        "Units": [params.get(f"units_{i}", None) for i in range(params["num_layers"])],
        "Dropout": params["dropout"],
        "Learning Rate": params["learning_rate"],
    })

df_trials = pd.DataFrame(trial_data).sort_values(by="Accuracy", ascending=False)
df_trials.to_csv("optuna_gru_trials_results.csv", index=False)

### Binary Runs


In [None]:
X = pd.read_csv('X_train_binary_v2.csv')
y = pd.read_csv('y_train_binary_v2.csv')

In [None]:
X = X[final_columns]

In [None]:
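X_train, X_test, y_train, y_test = train_test_split(
    X, y.values.ravel(), test_size=0.3, random_state=42, shuffle=True)

In [None]:
# Fit the scaler on the training split only, then apply it to both splits
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)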

#### Logistic Regression


In [None]:
# Evaluate the model with C=1, which gave the best results on the multiclass dataset
model = LogisticRegression(C=1, max_iter=1000)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)

accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred, average='weighted')
recall = recall_score(y_test, y_pred, average='weighted')
f1 = f1_score(y_test, y_pred, average='weighted')

print(f"Accuracy: {accuracy:.5f}")
print(f"Precision: {precision:.5f}")
print(f"Recall: {recall:.5f}")
print(f"F1-score: {f1:.5f}")

results_df = pd.DataFrame({
    "C": [1],
    "Accuracy": [accuracy],
    "Precision": [precision],
    "Recall": [recall],
    "F1-score": [f1]
})

print(results_df)

#### LSTM and GRU

Here a generic function is used to evaluate both models with the best hyperparameter configuration obtained in the previous section.

In [None]:
# Function to evaluate a recurrent model (either LSTM or GRU)
def evaluate(model_type, params):
    # Assumption: each flow is fed as a one-step sequence (timesteps=1), as in the
    # VAE + LSTM section, because the binary split above is shuffled
    X_tr = np.asarray(X_train, dtype="float32")
    X_te = np.asarray(X_test, dtype="float32")
    X_tr = X_tr.reshape((-1, 1, X_tr.shape[1]))
    X_te = X_te.reshape((-1, 1, X_te.shape[1]))
    # One-hot training labels to match the 2-unit softmax output
    y_tr = to_categorical(np.asarray(y_train).ravel(), num_classes=2)
    y_true = np.asarray(y_test).ravel().astype(int)

    layer = LSTM if model_type == "LSTM" else GRU
    model = Sequential()
    model.add(tf.keras.Input(shape=(X_tr.shape[1], X_tr.shape[2])))
    for i in range(params["num_layers"]):
        return_seq = i < params["num_layers"] - 1
        model.add(layer(params["units"][i], return_sequences=return_seq))
    model.add(Dropout(params["dropout"]))
    model.add(Dense(2, activation="softmax"))

    model.compile(optimizer=Adam(learning_rate=params.get("learning_rate", 0.001)), loss="categorical_crossentropy", metrics=["accuracy"])
    model.fit(X_tr, y_tr, epochs=15, batch_size=512, validation_split=0.2, verbose=2)

    y_pred = np.argmax(model.predict(X_te), axis=1)

    return {
        "Accuracy": accuracy_score(y_true, y_pred),
        "Precision": precision_score(y_true, y_pred, average="weighted"),
        "Recall": recall_score(y_true, y_pred, average="weighted"),
        "F1-score": f1_score(y_true, y_pred, average="weighted")
    }

In [None]:
# Best configurations (lstm_params has no learning_rate, so evaluate() falls back to 0.001)
lstm_params = {
    "num_layers": 2,
    "units": [32, 128],
    "dropout": 0.2
}

gru_params = {
    "num_layers": 3,
    "units": [32, 128, 128],
    "dropout": 0.2,
    "learning_rate": 0.005
}

The best LSTM configuration from the multiclass dataset is now evaluated on the binarized dataset.


In [None]:
# Evaluation
print("LSTM results:")
print(evaluate("LSTM", lstm_params))

The best GRU configuration from the multiclass dataset is now evaluated on the binarized dataset.

In [None]:
print("GRU results:")
print(evaluate("GRU", gru_params))

#### XGBoost

The XGBoost results are now checked on the binarized dataset.

In [None]:
# Configuration with the best hyperparameters found
best_params = {
    'n_estimators': 172,
    'learning_rate': 0.12678245336468844,
    'max_depth': 11,
    'subsample': 0.7898323873624815,
    'colsample_bytree': 0.848728009323304,
    'min_child_weight': 10,
    'gamma': 0.7875794378542267,
    'reg_alpha': 0.5787478600366702,
    'reg_lambda': 1.1732832093441035,
    # The binarized labels call for a binary objective instead of the 7-class softmax
    'objective': 'binary:logistic',
    'eval_metric': 'logloss',
    'use_label_encoder': False
}

model = xgb.XGBClassifier(**best_params)
model.fit(X_train, y_train)

# Predict on the test set
y_pred = model.predict(X_test)

accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred, average='macro')  # Macro averaging weights every class equally
recall = recall_score(y_test, y_pred, average='macro')
f1 = f1_score(y_test, y_pred, average='macro')

# Print the results of the best XGBoost model
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-score: {f1:.4f}")


#### VAE + LSTM

The proposed VAE + LSTM model is evaluated for anomaly detection.

In [None]:
X = pd.read_csv('X_train_binary_v2.csv')
y = pd.read_csv('y_train_binary_v2.csv')

First, the effect of the latent space dimension on the performance of the VAE + LSTM is explored.

In [None]:

def sampling(args, latent_dim):
    z_mean, z_log_var = args
    epsilon = K.random_normal(shape=(K.shape(z_mean)[0], latent_dim))
    return z_mean + K.exp(0.5 * z_log_var) * epsilon

def train_and_evaluate_vae_latent_dim(X, y, latent_dims, percentile=96, epochs=50, batch_size=64):
    results = []

    for latent_dim in latent_dims:
        print(f"\nEvaluating VAE with latent_dim = {latent_dim}")

        # Split the dataset into benign samples and DDoS attacks
        # (the label frame is flattened so that the boolean mask selects rows)
        y_arr = np.asarray(y).ravel()
        X_benign = X[y_arr == 0]
        X_attack = X[y_arr == 1]
        X_benign_train, X_benign_test = train_test_split(X_benign, test_size=0.3, random_state=42)
        X_test_combined = np.vstack([X_benign_test, X_attack])
        y_test_combined = np.hstack([np.zeros(len(X_benign_test)), np.ones(len(X_attack))])

        # Scale the data using statistics from the benign training samples only
        scaler = StandardScaler()
        X_benign_train_scaled = scaler.fit_transform(X_benign_train)
        X_test_scaled = scaler.transform(X_test_combined)

        timesteps = 1
        features = X.shape[1]
        X_benign_train_reshaped = X_benign_train_scaled.reshape((-1, timesteps, features))
        X_test_reshaped = X_test_scaled.reshape((-1, timesteps, features))

        # VAE + LSTM
        inputs = tf.keras.Input(shape=(timesteps, features))
        x = layers.LSTM(64)(inputs)
        z_mean = layers.Dense(latent_dim)(x)
        z_log_var = layers.Dense(latent_dim)(x)
        z = layers.Lambda(lambda args: sampling(args, latent_dim))([z_mean, z_log_var])

        decoder_input = layers.RepeatVector(timesteps)(z)
        decoder_lstm = layers.LSTM(64, return_sequences=True)(decoder_input)
        decoder_output = layers.TimeDistributed(layers.Dense(features))(decoder_lstm)

        vae = tf.keras.Model(inputs, decoder_output)

        # Loss definition: MSE reconstruction error plus the KL divergence to a standard normal prior
        reconstruction_loss = tf.keras.losses.mse(inputs, decoder_output)
        reconstruction_loss = tf.reduce_mean(reconstruction_loss)

        kl_loss = -0.5 * tf.reduce_mean(1 + z_log_var - K.square(z_mean) - K.exp(z_log_var))
        vae_loss = reconstruction_loss + kl_loss
        vae.add_loss(vae_loss)
        vae.compile(optimizer='adam')

        vae.fit(X_benign_train_reshaped, X_benign_train_reshaped, epochs=epochs, batch_size=batch_size, validation_split=0.2, verbose=0)

        X_reconstructed = vae.predict(X_test_reshaped, verbose=0)
        reconstruction_error = np.mean((X_test_reshaped - X_reconstructed) ** 2, axis=(1, 2))

        threshold = np.percentile(reconstruction_error[:len(X_benign_test)], percentile)
        y_pred = (reconstruction_error > threshold).astype(int)

        acc = accuracy_score(y_test_combined, y_pred)
        prec = precision_score(y_test_combined, y_pred)
        rec = recall_score(y_test_combined, y_pred)
        f1 = f1_score(y_test_combined, y_pred)

        results.append({
            "latent_dim": latent_dim,
            "threshold": threshold,
            "accuracy": acc,
            "precision": prec,
            "recall": rec,
            "f1": f1
        })

    # Note: reconstruction_error comes from the last latent_dim evaluated in the loop
    return pd.DataFrame(results), reconstruction_error, X_benign_test, y_test_combined
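
The sampling step and the KL term in the cell above can be checked numerically outside Keras. The following NumPy sketch is illustrative only: `sample_latent` and `kl_term` are hypothetical names, and the KL term is averaged over batch and latent dimensions in the same way as `kl_loss` above.

```python
import numpy as np

rng = np.random.default_rng(0)

def sample_latent(z_mean, z_log_var, rng):
    """Reparameterization trick: z = mean + std * eps with eps ~ N(0, I)."""
    eps = rng.standard_normal(z_mean.shape)
    return z_mean + np.exp(0.5 * z_log_var) * eps

def kl_term(z_mean, z_log_var):
    """KL divergence to N(0, I), averaged over batch and latent dimensions."""
    return -0.5 * np.mean(1 + z_log_var - z_mean**2 - np.exp(z_log_var))

z_mean_demo = np.zeros((4, 8))      # batch of 4, latent_dim of 8
z_log_var_demo = np.zeros((4, 8))   # log-variance 0 means unit variance

kl_at_prior = kl_term(z_mean_demo, z_log_var_demo)        # encoder equals the prior
kl_shifted = kl_term(z_mean_demo + 1.0, z_log_var_demo)   # mean shifted by 1 in every dimension
z_demo = sample_latent(z_mean_demo, z_log_var_demo, rng)

print(kl_at_prior, kl_shifted, z_demo.shape)
```

The penalty vanishes when the encoder output matches the standard normal prior and grows as the mean or variance drifts away from it.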

In [None]:
latent_dims_to_test = [4, 8, 16, 32, 64, 128, 256]
results_df, reconstruction_error, X_benign_test, y_test_combined = train_and_evaluate_vae_latent_dim(X, y, latent_dims_to_test, percentile=96, epochs=50)

print(results_df)


Next, we check which percentile gives the best model performance.

In [None]:
percentiles = np.arange(80, 100, 1)
results = []

for p in percentiles:
    threshold = np.percentile(reconstruction_error[:len(X_benign_test)], p)
    y_pred = (reconstruction_error > threshold).astype(int)

    accuracy = accuracy_score(y_test_combined, y_pred)
    precision = precision_score(y_test_combined, y_pred)
    recall = recall_score(y_test_combined, y_pred)
    f1 = f1_score(y_test_combined, y_pred)

    results.append({
        "percentile": p,
        "threshold": threshold,
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1": f1
    })

results_df = pd.DataFrame(results)
print(results_df.sort_values(by="f1", ascending=False))
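
Thresholding at a percentile of the benign reconstruction error fixes the benign false-positive rate by construction: a 96th-percentile threshold flags about 4% of benign flows. The sketch below illustrates this on synthetic gamma-distributed errors; the distributions and the `+ 0.5` shift for attacks are assumptions, not values measured in this notebook.

```python
import numpy as np

rng = np.random.default_rng(42)
# Synthetic reconstruction errors (assumption): attacks are shifted upward
benign_err = rng.gamma(shape=2.0, scale=0.05, size=5000)
attack_err = rng.gamma(shape=2.0, scale=0.05, size=5000) + 0.5

errors = np.concatenate([benign_err, attack_err])
labels = np.concatenate([np.zeros(len(benign_err)), np.ones(len(attack_err))])

# Threshold at the 96th percentile of the benign errors
demo_threshold = np.percentile(benign_err, 96)
y_hat = (errors > demo_threshold).astype(int)

false_positive_rate = y_hat[labels == 0].mean()  # share of benign flows flagged
attack_recall = y_hat[labels == 1].mean()        # share of attacks flagged

print(round(false_positive_rate, 4), round(attack_recall, 4))
```

Raising the percentile lowers the false-positive rate but risks missing attacks whose error sits close to the benign range, which is the trade-off the F1 sweep above explores.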

In [None]:
# Plot of the F1-score as a function of the percentile

highlight = results_df[results_df['percentile'] == 96].iloc[0]

# Base plot
plt.figure(figsize=(8, 5))
plt.plot(results_df['percentile'], results_df['f1'], marker='o', color='green', label='F1-score')

# Highlight the point for the 96th percentile
plt.scatter(highlight['percentile'], highlight['f1'], color='red', s=100, label='Percentile 96')
plt.text(highlight['percentile']+0.7, highlight['f1'],
         f"96 → {highlight['f1']:.4f}", color='red')

plt.xlabel('Percentile')
plt.ylabel('F1-score')
plt.title('F1-score vs Percentile')
plt.grid(True)
plt.legend()
plt.show()
