
# Tesis — Hybrid **CNN–LSTM** para DDoS Detection

Este notebook es una plantilla **DL** para tu tesis. Incluye:
- **Construcción de secuencias** a partir de un CSV tabular (ventaneo temporal simple).
- **Modelo híbrido CNN–LSTM** en Keras.
- **Entrenamiento** con `EarlyStopping` y `ReduceLROnPlateau`.
- **Curvas de aprendizaje** desde el `history`.
- **Calibración** de probabilidades con **Temperature Scaling** (post-hoc).
- **Confiabilidad** (reliability curve), **Brier Score**, **ECE**.
- **Interpretabilidad** con **Integrated Gradients** (y SHAP Deep opcional si está disponible).



## 0) Requisitos

```bash
pip install tensorflow scikit-learn matplotlib numpy pandas
# Opcional para interpretabilidad
pip install shap
```


In [None]:

# === Configuración inicial ===
from pathlib import Path

DATA_PATH  = Path("/mnt/data/tu_dataset.csv")   # base tabular CSV
TARGET_COL = "label"                             # binary target column (0/1)
FEATURES   = None   # None -> numeric columns are auto-inferred; or give an explicit list: ["f1","f2",...]
SEQ_LEN    = 20      # window length (rows per sequence)
SEQ_STRIDE = 5       # step between consecutive window starts
GROUP_BY   = None    # flow/session ID column, if any; None means global temporal order.
                     # NOTE(review): no cell visible in this review reads GROUP_BY — confirm it is wired in.

VAL_SIZE   = 0.2     # fraction of all sequences held out for validation
TEST_SIZE  = 0.2     # fraction of all sequences held out for test
RANDOM_STATE = 42    # seed used for reproducibility

print("Ruta de datos:", DATA_PATH)



## 1) Carga y construcción de secuencias

Estrategia simple y general:
1. Ordenamos el dataset por tiempo si existe columna de tiempo (si no, por índice).
2. Seleccionamos **FEATURES** numéricas.
3. Construimos ventanas deslizantes (`SEQ_LEN`, `SEQ_STRIDE`) sobre las filas.
4. El **label** de cada ventana será el de la última fila de la ventana (puedes ajustar a mayoría/voto si prefieres).


In [None]:

import pandas as pd
import numpy as np

df = pd.read_csv(DATA_PATH)

if TARGET_COL not in df.columns:
    raise ValueError(f"No encuentro '{TARGET_COL}'. Columnas disponibles: {list(df.columns)[:20]} ...")

# Temporal ordering: use the first column whose name contains "time"
# (this also matches "timestamp"); otherwise keep the file's row order.
time_col = next((c for c in df.columns if "time" in c.lower()), None)

if time_col is not None:
    # mergesort is stable: rows sharing a timestamp keep their original order,
    # so the windows built later are reproducible.
    df = df.sort_values(by=time_col, kind="mergesort").reset_index(drop=True)
else:
    df = df.reset_index(drop=True)

# Feature selection
if FEATURES is None:
    num_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    # Exclude the target and the ordering column: a numeric timestamp is not a
    # traffic feature and would let the model key on capture time.
    FEATURES = [c for c in num_cols if c not in (TARGET_COL, time_col)]

# Rows with NaN/±inf in any feature (or a missing label) would turn the
# training loss into NaN, so they are dropped and the count is reported.
feat_frame = df[FEATURES].astype(float).replace([np.inf, -np.inf], np.nan)
valid_rows = feat_frame.notna().all(axis=1) & df[TARGET_COL].notna()
n_dropped = int((~valid_rows).sum())
if n_dropped:
    print(f"Filas descartadas por NaN/inf: {n_dropped}")
    df = df.loc[valid_rows].reset_index(drop=True)
    feat_frame = feat_frame.loc[valid_rows].reset_index(drop=True)

X_tab = feat_frame.values
y_all = df[TARGET_COL].astype(int).values

print("Total filas:", len(df), "| Features:", len(FEATURES))


In [None]:

# Función de ventaneo
def build_sequences(X, y, seq_len=20, stride=5):
    """Slice row-aligned features and labels into fixed-length sliding windows.

    Args:
        X: array-like whose first axis indexes rows, e.g. (n_rows, n_features).
        y: array-like with one label per row of ``X``.
        seq_len: number of consecutive rows per window (must be > 0).
        stride: offset between the starts of consecutive windows (must be > 0).

    Returns:
        Tuple ``(xs, ys)``. ``xs`` has shape ``(n_windows, seq_len, *X.shape[1:])``
        and ``ys`` has shape ``(n_windows,)``; each window is labelled with the
        label of its last row. When there are fewer than ``seq_len`` rows both
        arrays are empty, but ``xs`` keeps its full rank so downstream
        ``.shape[1]`` / ``.shape[-1]`` lookups still work.

    Raises:
        ValueError: if ``seq_len`` or ``stride`` is not positive, or if ``X``
            and ``y`` have different lengths.
    """
    X = np.asarray(X)
    y = np.asarray(y)
    if seq_len <= 0 or stride <= 0:
        raise ValueError(f"seq_len and stride must be positive, got {seq_len} and {stride}")
    if len(X) != len(y):
        raise ValueError(f"X and y must have the same length, got {len(X)} and {len(y)}")

    starts = np.arange(0, len(X) - seq_len + 1, stride)
    if starts.size == 0:
        empty_x = np.empty((0, seq_len) + X.shape[1:], dtype=X.dtype)
        return empty_x, np.empty((0,), dtype=y.dtype)

    # Row indices of every window at once: shape (n_windows, seq_len)
    window_idx = starts[:, None] + np.arange(seq_len)
    return X[window_idx], y[starts + seq_len - 1]

# Window the tabular data using the notebook-level SEQ_LEN / SEQ_STRIDE settings
X_seq, y_seq = build_sequences(X_tab, y_all, seq_len=SEQ_LEN, stride=SEQ_STRIDE)
print("Secuencias:", X_seq.shape, "| Labels:", y_seq.shape)



## 2) Partición Train/Val/Test


In [None]:

from sklearn.model_selection import train_test_split

# Chronological split: train | val | test, in the order the windows were built.
# A shuffled split would scatter overlapping windows (SEQ_STRIDE < SEQ_LEN means
# neighbours share rows) across partitions and leak test rows into training.
n_seq = len(X_seq)
n_test = int(round(n_seq * TEST_SIZE))
n_val = int(round(n_seq * VAL_SIZE))

# Windows i and j share rows while (j - i) * SEQ_STRIDE < SEQ_LEN, so this many
# windows are discarded at each boundary to keep the partitions row-disjoint.
purge = max(0, -(-SEQ_LEN // SEQ_STRIDE) - 1)

test_start = n_seq - n_test
val_end = test_start - purge
val_start = val_end - n_val
train_end = val_start - purge
if n_test < 1 or n_val < 1 or train_end < 1:
    raise ValueError(
        f"Secuencias insuficientes ({n_seq}) para particionar con "
        f"VAL_SIZE={VAL_SIZE}, TEST_SIZE={TEST_SIZE} y purga={purge}."
    )

# Train+val pool, kept under the same names the shuffled split exposed
X_temp, y_temp = X_seq[:val_end], y_seq[:val_end]
X_train, y_train = X_seq[:train_end], y_seq[:train_end]
X_val, y_val = X_seq[val_start:val_end], y_seq[val_start:val_end]
X_test, y_test = X_seq[test_start:], y_seq[test_start:]

# Without stratification a partition can end up with a single class
# (e.g. attacks concentrated in one time span); surface that immediately.
for part_name, part_y in (("train", y_train), ("val", y_val), ("test", y_test)):
    if np.unique(part_y).size < 2:
        print(f"AVISO: la partición '{part_name}' contiene una sola clase; "
              "las métricas sobre ella no serán informativas.")

print("Train:", X_train.shape, "Val:", X_val.shape, "Test:", X_test.shape)
class_ratio = y_train.mean()
print(f"Positives ratio (train): {class_ratio:.4f}")



## 3) Modelo Hybrid CNN–LSTM (Keras)
Arquitectura típica para secuencias numéricas: `(Conv1D → BatchNorm → ReLU) ×2 → LSTM → Dropout → Dense`.


In [None]:

import tensorflow as tf
from tensorflow.keras import layers, models

tf.random.set_seed(RANDOM_STATE)

n_features = X_train.shape[-1]

# Two identical Conv1D -> BatchNorm -> ReLU stages, then an LSTM that returns
# only its last state, dropout, and a single sigmoid output unit.
inputs = layers.Input(shape=(X_train.shape[1], n_features))
x = inputs
for _ in range(2):
    x = layers.Conv1D(filters=64, kernel_size=3, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
x = layers.LSTM(64, return_sequences=False)(x)
x = layers.Dropout(0.3)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)

model = models.Model(inputs, outputs)

tracked_metrics = [
    tf.keras.metrics.AUC(name="auc"),
    tf.keras.metrics.Precision(name="precision"),
    tf.keras.metrics.Recall(name="recall"),
]
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    loss="binary_crossentropy",
    metrics=tracked_metrics,
)
model.summary()



## 4) Entrenamiento con callbacks y class weights


In [None]:

from sklearn.utils import class_weight
import numpy as np

# Class weights (useful under class imbalance): "balanced" weights come from
# scikit-learn and are computed from the label frequencies in y_train.
classes = np.unique(y_train)
cw = class_weight.compute_class_weight(
    class_weight="balanced",
    classes=classes,
    y=y_train
)
# Key the weights by the labels actually present instead of assuming both 0 and
# 1 exist; a single-class training set no longer raises IndexError here.
class_weights = {int(c): float(w) for c, w in zip(classes, cw)}
class_weights


In [None]:

# Both callbacks track validation AUC (higher is better): stop after 10 epochs
# without improvement and restore the best weights; halve the LR after 5.
early_stop = tf.keras.callbacks.EarlyStopping(
    monitor="val_auc", mode="max", patience=10, restore_best_weights=True
)
lr_on_plateau = tf.keras.callbacks.ReduceLROnPlateau(
    monitor="val_auc", mode="max", factor=0.5, patience=5, min_lr=1e-5
)
callbacks = [early_stop, lr_on_plateau]

history = model.fit(
    x=X_train,
    y=y_train,
    validation_data=(X_val, y_val),
    epochs=60,
    batch_size=256,
    class_weight=class_weights,
    callbacks=callbacks,
    verbose=1,
)



## 5) Curvas de aprendizaje (history)


In [None]:

import matplotlib.pyplot as plt

def plot_history(hist, keys=("loss","auc")):
    """Plot train/validation learning curves, one figure per metric.

    Args:
        hist: object exposing a ``history`` dict that maps metric names to
            per-epoch value lists (as returned by ``model.fit``).
        keys: metric names to plot; the validation curve is looked up under
            ``"val_" + key``.

    A metric missing from ``hist.history`` is reported and skipped instead of
    raising ``KeyError``, and the legend only lists curves that were drawn.
    """
    for k in keys:
        train_curve = hist.history.get(k)
        val_curve = hist.history.get("val_" + k)
        if not train_curve and not val_curve:
            print(f"plot_history: métrica '{k}' ausente en history; se omite.")
            continue
        plt.figure()
        if train_curve:
            plt.plot(train_curve, label="train")
        if val_curve:
            plt.plot(val_curve, label="val")
        plt.xlabel("Epoch"); plt.ylabel(k)
        plt.title(k)
        plt.legend()
        plt.tight_layout()
        plt.show()

plot_history(history, keys=("loss","auc","precision","recall"))



## 6) Evaluación en Test y calibración (Temperature Scaling)


In [None]:

from sklearn.metrics import roc_auc_score, average_precision_score, classification_report, confusion_matrix, brier_score_loss
import numpy as np

# Uncalibrated scores on the test set: ranking metrics first, then the
# thresholded (0.5) report, the confusion matrix and the Brier score.
proba_test = model.predict(X_test).ravel()
roc_auc, pr_auc = (
    roc_auc_score(y_test, proba_test),
    average_precision_score(y_test, proba_test),
)
print(f"Test ROC AUC: {roc_auc:.4f} | PR AUC: {pr_auc:.4f}")

pred_test = (proba_test >= 0.5).astype(int)
report = classification_report(y_test, pred_test, digits=4)
print("\nClassification report:\n", report)

cm = confusion_matrix(y_test, pred_test)
fig = plt.figure(figsize=(4,4))
plt.imshow(cm, interpolation="nearest")
plt.title("Matriz de confusión — Test")
plt.colorbar()
plt.xticks([0,1],[0,1])
plt.yticks([0,1],[0,1])
plt.xlabel("Predicho")
plt.ylabel("Real")
# Write each count in its cell (x = predicted column, y = true row)
for (i, j), count in np.ndenumerate(cm):
    plt.text(j, i, count, ha="center", va="center")
plt.tight_layout()
plt.show()

brier = brier_score_loss(y_test, proba_test)
print("Brier (sin calibrar):", brier)


In [None]:

# Temperature Scaling (optimiza T en validación para minimizar NLL)
import tensorflow as tf

# The model ends in a sigmoid, so logits are recovered from its probabilities.
# Probabilities are clipped symmetrically to keep the logits finite when the
# float32 output saturates at exactly 0 or 1.
_EPS = 1e-7

def _prob_to_logit(p):
    """Return float32 logits log(p / (1 - p)) for probabilities ``p``."""
    p = np.clip(np.asarray(p, dtype=np.float64), _EPS, 1.0 - _EPS)
    return tf.constant(np.log(p) - np.log1p(-p), dtype=tf.float32)

proba_val = model.predict(X_val).ravel()  # run inference on validation once
val_logits = _prob_to_logit(proba_val)
test_logits = _prob_to_logit(proba_test)
val_labels = tf.constant(y_val, dtype=tf.float32)

# Optimise log(T) rather than T: T = exp(log T) stays strictly positive, so an
# optimiser step can never flip the sign of the logits or divide by zero.
log_T = tf.Variable(0.0, dtype=tf.float32)
optimizer = tf.keras.optimizers.Adam(learning_rate=0.05)

def nll_with_temperature(T):
    """Mean binary cross-entropy on validation with logits divided by ``T``."""
    logits = val_logits / T
    return tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(labels=val_labels, logits=logits))

for _ in range(200):
    with tf.GradientTape() as tape:
        T = tf.exp(log_T)
        loss = nll_with_temperature(T)
    grads = tape.gradient(loss, [log_T])
    optimizer.apply_gradients(zip(grads, [log_T]))

T_opt = float(tf.exp(log_T).numpy())
print("Temperatura óptima:", T_opt)

# Calibrate test probabilities with the temperature fitted on validation
cal_proba_test = tf.sigmoid(test_logits / T_opt).numpy()



## 7) Confiabilidad, Brier y ECE (antes vs después)


In [None]:

from sklearn.calibration import calibration_curve

def expected_calibration_error(y_true, y_prob, n_bins=15):
    """Expected Calibration Error over equal-width probability bins.

    Args:
        y_true: binary labels (0/1), one per sample.
        y_prob: predicted probability of the positive class, one per sample.
        n_bins: number of equal-width bins on [0, 1].

    Returns:
        Sum over non-empty bins of |mean(y_true) - mean(y_prob)| weighted by
        the fraction of samples in the bin; 0.0 for empty input.

    Bins are half-open ``[l, r)`` except the last, which is closed ``[l, r]``
    so that predictions equal to 1.0 are counted rather than dropped.
    """
    y_true = np.asarray(y_true, dtype=float).ravel()
    y_prob = np.asarray(y_prob, dtype=float).ravel()
    total = len(y_true)
    if total == 0:
        return 0.0

    bins = np.linspace(0, 1, n_bins + 1)
    ece = 0.0
    for i in range(n_bins):
        lo, hi = bins[i], bins[i + 1]
        if i == n_bins - 1:
            mask = (y_prob >= lo) & (y_prob <= hi)
        else:
            mask = (y_prob >= lo) & (y_prob < hi)
        if np.any(mask):
            acc = y_true[mask].mean()
            conf = y_prob[mask].mean()
            ece += abs(acc - conf) * (mask.sum() / total)
    return float(ece)

# Calibration quality on the test set, before vs. after temperature scaling
from sklearn.metrics import brier_score_loss

ece_before = expected_calibration_error(y_test, proba_test, n_bins=15)
ece_after = expected_calibration_error(y_test, cal_proba_test, n_bins=15)
brier_before = brier_score_loss(y_test, proba_test)
brier_after = brier_score_loss(y_test, cal_proba_test)
print(f"ECE antes: {ece_before:.6f} | ECE después: {ece_after:.6f}")
print(f"Brier antes: {brier_before:.6f} | Brier después: {brier_after:.6f}")

# Reliability diagram: the dashed diagonal marks perfect calibration
fig = plt.figure(figsize=(5,5))
plt.plot([0,1],[0,1],"--")
for name, p in (("Original", proba_test), ("TempScaled", cal_proba_test)):
    frac_pos, mean_pred = calibration_curve(y_test, p, n_bins=15, strategy="uniform")
    plt.plot(mean_pred, frac_pos, marker="o", label=name)
plt.xlabel("Probabilidad promedio por bin")
plt.ylabel("Fracción de positivos")
plt.title("Curva de confiabilidad — Test")
plt.legend()
plt.tight_layout()
plt.show()



## 8) Interpretabilidad: Integrated Gradients (básico) y SHAP Deep (opcional)

> Nota: IG aquí se hace **por paso temporal** y luego se promedia a nivel de feature. Para SHAP, si está instalado, se ejecuta con `DeepExplainer` sobre un *subset* pequeño.


In [None]:

# Integrated Gradients (sencillo)
import tensorflow as tf
import numpy as np

def integrated_gradients(inputs, model, baseline=None, steps=50):
    """Approximate Integrated Gradients attributions for one sample.

    Args:
        inputs: a single un-batched sample, e.g. an array of shape (seq, feat).
        model: callable Keras model; it is invoked with ``training=False`` on a
            batch of ``steps + 1`` interpolated inputs.
        baseline: reference input with the same shape as ``inputs``;
            defaults to all zeros.
        steps: number of intervals along the straight path from baseline to
            input (must be >= 1).

    Returns:
        Array with the same shape as ``inputs``: (inputs - baseline) times the
        trapezoidal average of the gradients along the path.

    Raises:
        ValueError: if ``steps`` is smaller than 1.
    """
    if steps < 1:
        raise ValueError(f"steps must be >= 1, got {steps}")

    inputs = np.asarray(inputs, dtype=np.float32)
    if baseline is None:
        baseline = np.zeros_like(inputs)
    else:
        baseline = np.asarray(baseline, dtype=np.float32)
    delta = inputs - baseline

    # Interpolation coefficients, shaped to broadcast over the sample's axes
    alphas = np.linspace(0.0, 1.0, steps + 1, dtype=np.float32)
    alphas = alphas.reshape((-1,) + (1,) * inputs.ndim)

    # GradientTape.watch only accepts tensors/variables, not NumPy arrays
    interpolated = tf.convert_to_tensor(baseline[None] + alphas * delta)
    with tf.GradientTape() as tape:
        tape.watch(interpolated)
        preds = model(interpolated, training=False)
    grads = tape.gradient(preds, interpolated).numpy()

    # Trapezoidal rule: average the gradients at adjacent path points
    avg_grads = (grads[:-1] + grads[1:]) / 2.0
    return delta * avg_grads.mean(axis=0)

# Example: attribute the first positive test sample, if there is one
idx = np.flatnonzero(y_test == 1)
if idx.size == 0:
    print("No hay positivos en test para ejemplo de IG.")
else:
    ex = X_test[idx[0]]  # (seq, feat)
    ig = integrated_gradients(ex, model, baseline=None, steps=32)  # (seq, feat)
    # Average over the time axis, then rank features by absolute attribution
    feat_scores = ig.mean(axis=0)
    top_idx = np.argsort(-np.abs(feat_scores))[:20]
    print("Top features por |IG|:", top_idx)


In [None]:

# SHAP Deep (optional). Best-effort cell: any failure (shap not installed,
# explainer error, ...) is printed and the notebook carries on.
try:
    import shap
    import numpy as np, matplotlib.pyplot as plt

    background = X_train[:200]
    explainer = shap.DeepExplainer(model, background)
    shap_values = explainer.shap_values(X_test[:100])  # small subset: watch memory

    # Some shap versions return a list (one array per output); use the first
    if isinstance(shap_values, list):
        sv = shap_values[0]
    else:
        sv = shap_values

    # Mean |SHAP| over time steps, then over samples -> one score per feature
    sv_feat = np.abs(sv).mean(axis=1).mean(axis=0)  # (n_features,)
    top = np.argsort(-sv_feat)[:20]
    positions = range(len(top))

    fig = plt.figure(figsize=(8,5))
    plt.bar(positions, sv_feat[top])
    plt.xticks(positions, [str(i) for i in top], rotation=90)
    plt.title("SHAP Deep — Importancia promedio por feature (agregada en tiempo)")
    plt.tight_layout()
    plt.show()
except Exception as e:
    print("SHAP Deep no disponible o falló:", e)
