In [None]:
"""
Reproducible demo of ADL-WAF (layer 1: tree-based anomaly detector; layer 2: TF-IDF + SVM threat classifier).
- Runs on a local CSV dataset, or on a synthetic demo dataset when no real data is available.
- When using a real dataset, adapt the column mapping at the point where the CSV is loaded (the real-data cell and `main()`).
"""

import os
import random
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import precision_score, recall_score, accuracy_score, confusion_matrix, classification_report
import joblib
import urllib.parse


# Seed the global Python and NumPy generators so that code drawing from
# them (e.g. the synthetic payload generators, which use `random`) yields
# the same sequence on every run.
random.seed(42)
np.random.seed(42)

In [None]:
# --------------------------
# Utilidades: generación demo (si no hay datos reales)
# --------------------------
def make_benign_payload():
    """Build a benign-looking payload of 3 to 8 space-separated words.

    Words are drawn with replacement from a fixed vocabulary using the
    module-level ``random`` generator.
    """
    vocabulary = [
        "user", "name", "profile", "home", "index", "view",
        "param", "id", "login", "order", "item", "product",
    ]
    # Draw the length first, then the words (same RNG consumption order
    # as evaluating the keyword argument before the call).
    word_count = random.randint(3, 8)
    chosen = random.choices(vocabulary, k=word_count)
    return " ".join(chosen)

def make_sql_injection():
    """Return one SQL-injection style payload picked at random.

    The first candidate embeds a random integer id in [1, 100]; the id is
    drawn on every call, even when a different candidate ends up chosen.
    """
    numeric_id = random.randint(1, 100)
    candidates = (
        f"SELECT * FROM users WHERE id = {numeric_id} OR 1=1;",
        "' OR '1'='1' -- ",
        "'; DROP TABLE users; --",
        "admin'--",
        "UNION SELECT username, password FROM users --",
    )
    return random.choice(candidates)

def make_xss():
    """Return one cross-site-scripting style payload picked at random."""
    candidates = (
        "<script>alert('XSS')</script>",
        "<img src=x onerror=alert(1)>",
        "<svg/onload=alert(1)>",
        "<iframe src='javascript:alert(1)'></iframe>",
    )
    picked = random.choice(candidates)
    return picked

def make_path_traversal():
    """Return one path-traversal style payload picked at random."""
    candidates = (
        "../../etc/passwd",
        "/../../../../windows/win.ini",
        "../config.php",
    )
    picked = random.choice(candidates)
    return picked

def inject_special_chars(base):
    """Append one to four random special tokens to *base*.

    The tokens are drawn with replacement and joined by single spaces; a
    single space also separates them from *base*.
    """
    tokens = ["&&", "||", "%3C", "%3E", "<", ">", "=", "%27", "%22", "../", "/*"]
    how_many = random.randint(1, 4)
    suffix = " ".join(random.choices(tokens, k=how_many))
    return base + " " + suffix

def make_synthetic_dataset(N=5000):
    """Generate a shuffled synthetic dataset of *N* labelled payloads.

    Each row has a ``payload`` string plus integer ``anomaly_label`` and
    ``threat_label`` columns. About 70% of the rows start out benign; the
    rest are attacks (SQLi, XSS, path traversal, or a benign payload with
    special tokens appended) labelled as both anomalous and threatening.
    A benign row has a 2% chance of receiving special tokens, which makes
    it anomalous but not a threat.

    Returns:
        A ``pandas.DataFrame`` shuffled with a fixed ``random_state`` and a
        fresh 0..N-1 index.
    """
    attack_kinds = ["sqli", "xss", "path", "other"]
    attack_weights = [0.4, 0.3, 0.2, 0.1]
    builders = {
        "sqli": make_sql_injection,
        "xss": make_xss,
        "path": make_path_traversal,
    }

    records = []
    for _ in range(N):
        benign = random.random() < 0.70
        if benign:
            text = make_benign_payload()
        else:
            kind = random.choices(attack_kinds, weights=attack_weights)[0]
            builder = builders.get(kind)
            if builder is None:
                # "other": a benign payload decorated with special tokens
                text = inject_special_chars(make_benign_payload())
            else:
                text = builder()

        anomaly = 0 if benign else 1
        threat = anomaly

        # Small chance of a benign-but-anomalous row (label noise); the
        # extra random draw only happens for benign rows.
        if benign and random.random() < 0.02:
            text = inject_special_chars(text)
            anomaly = 1
            threat = 0

        records.append({"payload": text, "anomaly_label": anomaly, "threat_label": threat})

    frame = pd.DataFrame(records)
    shuffled = frame.sample(frac=1, random_state=42)
    return shuffled.reset_index(drop=True)


In [None]:
# --------------------------
# Feature engineering (capa 1)
# --------------------------
def alnum_ratio(s):
    """Return the percentage (0-100) of characters in *s* that are alphanumeric.

    Falsy input (empty string or ``None``) yields ``0.0``.
    """
    if not s:
        return 0.0
    alnum_count = len([ch for ch in s if ch.isalnum()])
    return alnum_count / len(s) * 100.0

def badwords_ratio(s):
    """Return suspicious-token occurrences per whitespace-separated word, times 100.

    Matching is case-insensitive and substring-based: every non-overlapping
    occurrence of each token is counted, so the result can exceed 100.
    Falsy input (empty string or ``None``) yields ``0.0``, consistent with
    the other layer-1 feature functions.

    NOTE(review): because matching is by substring, short tokens such as
    "or", "and" and "etc" also hit inside harmless words ("order", "world").
    Left as is because changing it would alter the feature values that
    trained models depend on.
    """
    if not s:
        return 0.0
    badwords = (
        "select", "union", "drop", "alert", "script", "onerror",
        "../", "etc", "passwd", "--", "or", "and", "1=1",
    )
    lowered = s.lower()
    hits = sum(lowered.count(word) for word in badwords)
    # max(1, ...) avoids division by zero for whitespace-only payloads
    return (hits / max(1, len(s.split()))) * 100.0

def special_char_ratio(s):
    """Return the percentage (0-100) of characters in *s* that are special.

    A character is special when it is neither alphanumeric nor whitespace.
    Falsy input (empty string or ``None``) yields ``0.0``.
    """
    if not s:
        return 0.0
    special_count = sum(1 for ch in s if not (ch.isalnum() or ch.isspace()))
    return special_count / len(s) * 100.0

def illegal_special_ratio(s):
    """Return distinct "illegal" tokens present in *s* per special character, times 100.

    Each token is counted at most once (presence, not occurrences), and
    tokens may overlap (``"%3C"`` matches both ``"%"`` and ``"%3C"``), so
    the result can exceed 100. Returns ``0.0`` when *s* is falsy (empty
    string or ``None``) or contains no special characters, consistent with
    the other layer-1 feature functions.
    """
    if not s:
        return 0.0
    illegal_tokens = ('%', '%3C', '%3E', '..', '&', '|', ';', '--', '/*', '*/')
    # special = neither alphanumeric nor whitespace
    total_special = sum(1 for ch in s if not (ch.isalnum() or ch.isspace()))
    if total_special == 0:
        return 0.0
    found = sum(1 for token in illegal_tokens if token in s)
    return found / total_special * 100.0

def add_layer1_features(df):
    """Return a copy of *df* with the four layer-1 feature columns added.

    Each feature is computed from the ``payload`` column; the input frame
    is not modified.
    """
    extractors = (
        ('alnum_ratio', alnum_ratio),
        ('badwords_ratio', badwords_ratio),
        ('special_char_ratio', special_char_ratio),
        ('illegal_special_ratio', illegal_special_ratio),
    )
    enriched = df.copy()
    payloads = enriched['payload']
    for column, extractor in extractors:
        enriched[column] = payloads.apply(extractor)
    return enriched


In [None]:
def crear_anomalias_benignas(
    df,
    payload_col="payload",
    anomaly_col="anomaly_label",
    threat_col="threat_label",
    ratio=0.15,
    random_state=42
):
    """Turn a fraction of the normal requests into benign anomalies.

    A random subset of the rows labelled normal (anomaly 0 and threat 0)
    gets its payload perturbed and is relabelled as anomalous but not
    threatening. The input frame is left untouched.

    Args:
        df: source DataFrame.
        payload_col: column holding the HTTP text / payload (strings).
        anomaly_col: anomaly label column (0/1).
        threat_col: threat label column (0/1).
        ratio: fraction of normal rows to convert (e.g. 0.15 = 15%);
            values outside [0, 1] are clamped to the available rows.
        random_state: seed for the selection and the perturbations.

    Returns:
        A modified copy of *df*.
    """
    # A private generator gives the same reproducibility as seeding while
    # leaving the global `random` state untouched for the rest of the program.
    rng = random.Random(random_state)

    df = df.copy()

    # Only plain normal traffic is eligible for conversion
    normal_idx = df[
        (df[anomaly_col] == 0) &
        (df[threat_col] == 0)
    ].index

    # Clamp so that ratio > 1 cannot ask for more rows than exist and a
    # negative ratio converts nothing.
    n_convert = min(len(normal_idx), int(len(normal_idx) * ratio))
    if n_convert <= 0:
        return df

    selected_idx = rng.sample(list(normal_idx), n_convert)

    transformations = (
        lambda s: s + "&&",
        lambda s: s.replace("=", "==", 1),
        lambda s: s + "%20",
        lambda s: urllib.parse.quote(s, safe="=&?"),
        lambda s: s + "@@",
        lambda s: s + "??",
        lambda s: s.replace("&", "&&"),
        lambda s: s + "%3D",
    )

    def perturb_payload(payload):
        # Some transformations are no-ops for a given payload (e.g. the
        # replace-based ones when there is no "=" or "&"). Choose only among
        # those that really change the text, so no row is relabelled as an
        # anomaly while keeping its original payload. The appending
        # transformations always change it, so the pool is never empty.
        candidates = [transform(payload) for transform in transformations]
        effective = [text for text in candidates if text != payload]
        return rng.choice(effective)

    for idx in selected_idx:
        df.at[idx, payload_col] = perturb_payload(df.at[idx, payload_col])
        df.at[idx, anomaly_col] = 1
        df.at[idx, threat_col] = 0

    return df


In [None]:
real_data_path="../data/test_dataset/HttpParamsDataset/payload_full.csv"

# 1) Load the real dataset
df = pd.read_csv(real_data_path)

# 2) Derive the binary labels used by both layers.
# NOTE(review): any `label` value other than "norm"/"anom" maps to NaN and
# would break the stratified split below -- confirm the CSV only has these two.
df["anomaly_label"] = df["label"].map({
    "norm": 0,
    "anom": 1
})

df["threat_label"] = df["attack_type"].apply(
    lambda x: 0 if x == "norm" else 1
)

# Create benign anomalies (perturbed normal requests)
df = crear_anomalias_benignas(df, ratio=0.15)

# Compute the layer-1 features only AFTER the payloads were perturbed;
# computing them earlier would leave the converted rows with features of
# their original, unperturbed payload.
df = add_layer1_features(df)
feature_cols = ['alnum_ratio','badwords_ratio','special_char_ratio','illegal_special_ratio']

# 3) Split (stratified on the anomaly label)
train_df, test_df = train_test_split(df, test_size=0.3, random_state=42, stratify=df['anomaly_label'])




In [None]:
# 4) Train layer1 (Random Forest; the variable keeps the name `dt` because
#    later cells and the saved model file refer to it)

from sklearn.ensemble import (
    RandomForestClassifier,
    IsolationForest
)

dt = RandomForestClassifier(
    random_state=42,
    n_estimators=100,
    class_weight='balanced_subsample',  # reweight classes to counter class imbalance
    max_depth=None,
    min_samples_split=5,
    min_samples_leaf=2
)
dt.fit(train_df[feature_cols], train_df['anomaly_label'])

# 5) Evaluate layer1 on the held-out split
pred_l1_test = dt.predict(test_df[feature_cols])
# The label names the model that is actually trained above.
print("\nLayer1 - Anomaly detection (Random Forest) - report:")
print(classification_report(test_df['anomaly_label'], pred_l1_test, digits=4))


In [None]:
 # 6) Prepare layer2 training using only anomalous samples from training
# Layer 2 only sees the rows labelled as anomalous in each split.
train_l2 = train_df.loc[train_df['anomaly_label'] == 1].copy()
test_l2 = test_df.loc[test_df['anomaly_label'] == 1].copy()

# Show the threat/non-threat balance of both layer-2 subsets.
train_l2_counts = train_l2["threat_label"].value_counts()
print("TRAIN L2:")
print(train_l2_counts)

test_l2_counts = test_l2["threat_label"].value_counts()
print("\nTEST L2:")
print(test_l2_counts)



In [None]:
# if len(anom_norm) == 0:
#     # degradar algunos norm a anomalía benigna
#     benign_norm = df[df["anomaly_label"] == 0].sample(200, random_state=42)
#     benign_norm["anomaly_label"] = 1
#     benign_norm["threat_label"] = 0

#     train_l2 = pd.concat([train_l2, benign_norm])

# anom_norm = train_l2[
#     (train_l2["anomaly_label"] == 1) &
#     (train_l2["attack_type"] == "norm")
# ]
# print(len(anom_norm))


In [None]:

# Character n-grams (1 to 4, within word boundaries) feed the layer-2 classifier.
vectorizer = TfidfVectorizer(analyzer='char_wb', ngram_range=(1, 4))

# Layer 2 cannot be trained without anomalous samples.
if train_l2.shape[0] == 0:
    raise RuntimeError("No hay anomalías en el conjunto de entrenamiento para entrenar layer2 (aumente dataset o cambie split).")

payload_texts = train_l2['payload']
X_l2_train = vectorizer.fit_transform(payload_texts)
y_l2_train = train_l2['threat_label']

# 7) Train the SVM (fixed hyper-parameters, no grid search)
svc = SVC(C=10, kernel='rbf', gamma='scale', probability=False, random_state=42)
svc.fit(X_l2_train, y_l2_train)

In [None]:
# Threat/non-threat balance of the layer-2 train and test subsets.
for header, subset in (("TRAIN L2:", train_l2), ("\nTEST L2:", test_l2)):
    print(header)
    print(subset["threat_label"].value_counts())

In [None]:
# 8) Evaluate layer2 (only on the anomalous rows of the test split)
if len(test_l2) == 0:
    print("No hay anomalías en el conjunto de test para evaluar layer2.")
else:
    X_l2_test = vectorizer.transform(test_l2['payload'])
    y_l2_test = test_l2['threat_label']
    pred_l2 = svc.predict(X_l2_test)
    layer2_report = classification_report(y_l2_test, pred_l2, digits=4)
    print("\nLayer2 - Threat detection (SVM) - report (solo anomalías):")
    print(layer2_report)


In [None]:
def _adl_predict_batch(frame):
    """Return the combined ADL-WAF decision (0 or 1) for every row of *frame*.

    Layer 1 (`dt`) runs on the feature columns; rows it labels 0 are final.
    The remaining rows go through the TF-IDF vectorizer and layer 2 (`svc`),
    whose prediction becomes the decision. Predicting in batch avoids one
    model call per row and passes layer 1 a DataFrame with the same column
    names it was fitted on.
    """
    layer1 = np.asarray(dt.predict(frame[feature_cols].astype(float)))
    decisions = np.zeros(len(frame), dtype=int)
    flagged = layer1 != 0
    if flagged.any():
        tfidf = vectorizer.transform(frame.loc[flagged, 'payload'])
        decisions[flagged] = np.asarray(svc.predict(tfidf)).astype(int)
    return decisions


def adl_decision_row(row):
    """Return the combined ADL-WAF decision (0 or 1) for a single row (Series)."""
    return int(_adl_predict_batch(row.to_frame().T)[0])


# 9) Combined ADL-WAF evaluation on the test split
test_df = test_df.reset_index(drop=True)
test_df['adl_pred'] = _adl_predict_batch(test_df)
y_true = test_df['threat_label'].values
y_pred = test_df['adl_pred'].values

acc = accuracy_score(y_true, y_pred)
prec = precision_score(y_true, y_pred, zero_division=0)
rec = recall_score(y_true, y_pred, zero_division=0)
print("\nADL-WAF combined - métricas sobre conjunto de test (threat_label):")
print(f"Accuracy: {acc:.4f}, Precision: {prec:.4f}, Recall: {rec:.4f}")
# Fixing labels=[0, 1] always yields a 2x2 matrix, so the flattened output
# really is (TN, FP, FN, TP) even when a class is missing from the data.
cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
print("Confusion matrix (TN, FP, FN, TP):", cm.ravel())
print("\nEjemplo de decisiones (muestra):")
# Never request more sample rows than the test split holds
n_examples = min(6, len(test_df))
print(test_df[['payload','anomaly_label','threat_label','adl_pred']].sample(n_examples, random_state=42).to_string(index=False))

# 10) Save the models
os.makedirs("models", exist_ok=True)
joblib.dump(dt, "models/adl_dt_layer1.joblib")
joblib.dump(svc, "models/adl_svm_layer2.joblib")
joblib.dump(vectorizer, "models/adl_tfidf_vectorizer.joblib")
print("\nModelos guardados en carpeta ./models/")


In [None]:
# Per-class report of the combined two-layer decision against the threat labels.
final_report = classification_report(test_df["threat_label"], test_df["adl_pred"])
print("ADL-WAF final:")
print(final_report)


In [None]:
# --------------------------
# Main pipeline
# --------------------------
def main(use_real_data=False, real_data_path=None):
    """Run the whole ADL-WAF demo: load data, train both layers, evaluate, save.

    Args:
        use_real_data: when true, read the dataset from *real_data_path*;
            otherwise generate a 5000-row synthetic dataset.
        real_data_path: path to a CSV file; required when *use_real_data*
            is true.

    Raises:
        ValueError: *use_real_data* is true but the path is missing or
            does not exist.
        RuntimeError: the training split has no anomalous rows, so layer 2
            cannot be trained.

    Side effects:
        Prints the evaluation reports and writes the three fitted objects
        under ``./models/``.
    """
    # 1) Load data
    if use_real_data:
        if real_data_path is None or not os.path.exists(real_data_path):
            raise ValueError("Si use_real_data=True, proporcione real_data_path con un CSV válido.")
        df = pd.read_csv(real_data_path)
        # The rest of the pipeline reads the columns 'payload',
        # 'anomaly_label' and 'threat_label'; if the CSV uses other names,
        # map them here.
    else:
        print("No dataset real provisto: generando dataset sintético para demostración...")
        df = make_synthetic_dataset(N=5000)

    # 2) Feature engineering
    df = add_layer1_features(df)
    feature_cols = ['alnum_ratio','badwords_ratio','special_char_ratio','illegal_special_ratio']

    # 3) Split (stratified on the anomaly label)
    train_df, test_df = train_test_split(df, test_size=0.3, random_state=42, stratify=df['anomaly_label'])

    # 4) Train layer1 (Decision Tree)
    dt = DecisionTreeClassifier(random_state=42, max_depth=8)
    dt.fit(train_df[feature_cols], train_df['anomaly_label'])

    # 5) Evaluate layer1
    pred_l1_test = dt.predict(test_df[feature_cols])
    print("\nLayer1 - Anomaly detection (Decision Tree) - report:")
    print(classification_report(test_df['anomaly_label'], pred_l1_test, digits=4))

    # 6) Layer 2 is trained and evaluated only on rows labelled anomalous
    train_l2 = train_df[train_df['anomaly_label'] == 1].copy()
    test_l2 = test_df[test_df['anomaly_label'] == 1].copy()
    vectorizer = TfidfVectorizer(ngram_range=(1,4), analyzer='char_wb')

    if len(train_l2) == 0:
        raise RuntimeError("No hay anomalías en el conjunto de entrenamiento para entrenar layer2 (aumente dataset o cambie split).")

    X_l2_train = vectorizer.fit_transform(train_l2['payload'])
    y_l2_train = train_l2['threat_label']

    # 7) Train the SVM (fixed hyper-parameters, no grid search)
    svc = SVC(kernel='rbf', C=10, gamma='scale', probability=False, random_state=42)
    svc.fit(X_l2_train, y_l2_train)

    # 8) Evaluate layer2 (on the anomalous rows of the test split)
    if len(test_l2) > 0:
        X_l2_test = vectorizer.transform(test_l2['payload'])
        y_l2_test = test_l2['threat_label']
        pred_l2 = svc.predict(X_l2_test)
        print("\nLayer2 - Threat detection (SVM) - report (solo anomalías):")
        print(classification_report(y_l2_test, pred_l2, digits=4))
    else:
        print("No hay anomalías en el conjunto de test para evaluar layer2.")

    # 9) Combined ADL-WAF evaluation
    def adl_predict(frame):
        # Batch version of the two-layer decision: rows layer 1 labels 0
        # are final; the others take layer 2's prediction. One call per
        # model replaces one call per row, and layer 1 receives a DataFrame
        # with the column names it was fitted on.
        layer1 = np.asarray(dt.predict(frame[feature_cols]))
        decisions = np.zeros(len(frame), dtype=int)
        flagged = layer1 != 0
        if flagged.any():
            tfidf = vectorizer.transform(frame.loc[flagged, 'payload'])
            decisions[flagged] = np.asarray(svc.predict(tfidf)).astype(int)
        return decisions

    test_df = test_df.reset_index(drop=True)
    test_df['adl_pred'] = adl_predict(test_df)
    y_true = test_df['threat_label'].values
    y_pred = test_df['adl_pred'].values

    acc = accuracy_score(y_true, y_pred)
    prec = precision_score(y_true, y_pred, zero_division=0)
    rec = recall_score(y_true, y_pred, zero_division=0)
    print("\nADL-WAF combined - métricas sobre conjunto de test (threat_label):")
    print(f"Accuracy: {acc:.4f}, Precision: {prec:.4f}, Recall: {rec:.4f}")
    # labels=[0, 1] forces a 2x2 matrix, so the flattened output really is
    # (TN, FP, FN, TP) even when a class is missing from the data.
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
    print("Confusion matrix (TN, FP, FN, TP):", cm.ravel())
    print("\nEjemplo de decisiones (muestra):")
    # Never request more sample rows than the test split holds
    n_examples = min(6, len(test_df))
    print(test_df[['payload','anomaly_label','threat_label','adl_pred']].sample(n_examples, random_state=42).to_string(index=False))

    # 10) Save the models
    os.makedirs("models", exist_ok=True)
    joblib.dump(dt, "models/adl_dt_layer1.joblib")
    joblib.dump(svc, "models/adl_svm_layer2.joblib")
    joblib.dump(vectorizer, "models/adl_tfidf_vectorizer.joblib")
    print("\nModelos guardados en carpeta ./models/")
