# Code zu der Arbeit:
# "Comparitve Study von Machine Learning Modellen zur Erkennung von Web Schwachstellen"
## von Nils Pudenz

In [None]:

%pip install kaggle scikit-learn xgboost catboost tabpfn pandas numpy matplotlib seaborn -q
%pip install --quiet scikit-learn xgboost catboost tabpfn chardet
%pip install -U scikit-learn
# in deiner (Conda/venv) Umgebung
%pip install --upgrade "torch==2.*" --index-url https://download.pytorch.org/whl/cu121
%pip install --upgrade xgboost catboost scikit-learn pandas scipy tabpfn
##wenn es komplikationen mit torch gibt, deiinstallieren und neu installieren
%pip uninstall torch
%pip install torch --index-url https://download.pytorch.org/whl/cu121 --upgrade
%pip install openpyxl
%pip install XlsxWriter
%pip install openpyxl

## Check ob GPU verwendet werden kann

In [None]:
print(sys.version)
print("Python:", sys.executable)
print("Torch:", torch.__version__, "| CUDA build:", torch.version.cuda)
print("CUDA available:", torch.cuda.is_available())

## Dowload Kaggle Datasets
Requires Kaggle API credentials ('~/.kaggle/kaggle.json') für API-Token, um zugriff auf die Datenbanken über das Kaggle Konto zu bekommen

In [None]:
DATA_DIR = Path("data")
DATA_DIR.mkdir(exist_ok=True)

In [None]:
#Dowload der Datasets von Kaggle, Output =1 erfoglgreich, Output = 0 fehlerhaft
os.system("kaggle datasets download -d syedsaqlainhussain/sql-injection-dataset -p data --unzip --quiet")
os.system("kaggle datasets download -d syedsaqlainhussain/cross-site-scripting-xss-dataset-for-deep-learning -p data --unzip --quiet")

In [None]:
def kaggle_download(dataset, path="data", unzip=True):
    api = KaggleApi()
    api.authenticate() #nutzt ~/.kaggle/kaggle.json für Authentifizierung oder Environment-Variablen
    api.dataset_download_files(dataset, path=path, unzip=unzip)
    print(f"Downloaded {dataset}")

kaggle_download("syedsaqlainhussain/sql-injection-dataset")#, path=DATA_DIR, unzip=True)
kaggle_download("syedsaqlainhussain/cross-site-scripting-xss-dataset-for-deep-learning")#, path=DATA_DIR, unzip=True)

In [None]:
sql_df = pd.read_csv("data/SQLiV3.csv", encoding="utf-8", low_memory=False)
xss_df = pd.read_csv("data/XSS_dataset.csv", encoding="utf-8", low_memory=False)

## Infos über den Datensatz

In [None]:
# Spalten ansehen
print(sql_df.columns.tolist())

# Typische Index-/Hilfsspalten loswerden
sql_df = sql_df.loc[:, ~sql_df.columns.str.contains(r"^Unnamed|^index$", case=False)]

# Auf die Kernspalten reduzieren (falls etwas anderes drin ist)
sql_df = sql_df[["Sentence", "Label"]].copy()

# Optional: Duplikate auf Satzebene entfernen (falls noch nicht passiert)
sql_df = sql_df.drop_duplicates(subset=["Sentence"]).reset_index(drop=True)

print("clean shape:", sql_df.shape)  # Erwartung: (30873, 2)
print(sql_df["Label"].value_counts(normalize=True))

In [None]:
for name, df in {"SQL": sql_df, "XSS": xss_df}.items():
    print(f"{name} dataset shape: {df.shape}")
    display(df.head(20))
    display(df.describe())
    display(df.info())

In [None]:
def class_distribution(df, label_col="Label"):
    """Zeigt die Klassenverteilung in einem DataFrame."""
    counts = df[label_col].value_counts(normalize=True)
    print("Klassenverteilung:")
    print(counts)
    plt.figure(figsize=(8, 4))
    counts.plot(kind="bar")
    plt.title("Klassenverteilung")
    plt.xlabel(label_col)
    plt.ylabel("Anteil")
    plt.xticks(rotation=0)
    plt.show()

print("SQL Distribution:")
print(class_distribution(sql_df, "Label"))

print("XSS Distribution:")
print(class_distribution(xss_df, "Label"))

## Basic Cleaning
* Drop Duplicate rows
* Handle missing values (simple fill-na)

In [None]:
for df in (sql_df, xss_df):
    df.drop_duplicates(inplace=True)
    df.fillna(0, inplace=True)

In [None]:
def preprocess_xy(df: pd.DataFrame,
                  label_candidates=("label", "class", "target"),
                  label_map=None):
    if label_map is None:
        label_map = {
            "0": "0", "1": "1",
            "benign": "0", "normal": "0", "legitimate": "0", "safe": "0",
            "attack": "1", "malicious": "1", "sql injection": "1",
            "sql-injection": "1", "xss": "1"
        }

    # Zielspalte finden (im *übergebenen* df!)
    cols_lower = {c.lower(): c for c in df.columns}
    target_col = next((cols_lower[c] for c in label_candidates if c in cols_lower), None)
    if target_col is None:
        raise ValueError(f"Keine Label-Spalte gefunden. Kandidaten: {label_candidates}")

    # Labels normieren -> nur 0/1 behalten
    y_str = df[target_col].astype(str).str.strip().str.lower()
    y_map = y_str.map(label_map)
    mask = y_map.notna()
    y = pd.to_numeric(y_map[mask]).astype(int).to_numpy()

    # Rohtext aus allen Nicht-Label-Spalten zusammenbauen
    feat_cols = [c for c in df.columns if c != target_col]
    X_raw = df.loc[mask, feat_cols].astype(str).agg(" ".join, axis=1)

    return X_raw, y, target_col


# Globale Settings

In [None]:

#Deterministische Ausgabe generieren, um die Reproduzierbarkeit zu gewährleisten
RANDOM_STATE = 42

os.environ["OMP_NUM_THREADS"] = "8"
os.environ["OPENBLAS_NUM_THREADS"] = "8"
torch.set_num_threads(8)

USE_CUDA = torch.cuda.is_available()  # für TabPFN & XGBoost

# Hilfsfunktionen 

In [None]:
def binary_metrics(y_true, y_pred) -> dict:
    """Präzision/Recall/F1 & Raten (FPR/FNR) für binäre Klassifikation."""
    y_pred = np.asarray(y_pred).ravel()
    p  = precision_score(y_true, y_pred)
    r  = recall_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred)
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
    fpr = float(fp / (fp + tn)) if (fp + tn) else 0.0
    fnr = float(fn / (fn + tp)) if (fn + tp) else 0.0
    return dict(Precision=p, Recall=r, F1=f1, FPR=fpr, FNR=fnr)

def evaluate_model(model, X_test, y_test, name, use_batches=False, batch_size=256) -> dict: #Dictionary für Evaluierungsmetriken
    """Zeitmessung + Vorhersage (optional in Batches) + Metriken."""
    print(f"→ Evaluate {name} on X_test={getattr(X_test,'shape',None)}")
    t0 = time.perf_counter()
    if use_batches:
        y_pred = predict_in_batches(model, X_test, batch_size=batch_size, verbose=True)
    else:
        y_pred = model.predict(X_test)
    pred_s = time.perf_counter() - t0
    m = binary_metrics(y_test, y_pred)
    res = dict(Model=name, Pred_s=pred_s, **m)
    return res

In [None]:
def plot_confusion_matrix(y_true, y_pred, labels=None, title="Confusion Matrix"):
    """Zeigt eine Konfusionsmatrix an."""
    cm = confusion_matrix(y_true, y_pred, labels=labels)
    plt.figure(figsize=(8, 6))
    plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(labels))
    plt.xticks(tick_marks, labels, rotation=45)
    plt.yticks(tick_marks, labels)

    thresh = cm.max() / 2
    for i in range(len(labels)):
        for j in range(len(labels)):
            plt.text(j, i, cm[i, j], horizontalalignment='center',
                     color='white' if cm[i, j] > thresh else 'black')

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()
    plt.show()

## Preprocessing

### Batchweise Vorhersage für TabPFN

In [None]:
def predict_in_batches(model, X, batch_size=256, verbose=False):
    """Vorhersage in Batches (schont RAM/VRAM; wichtig für TabPFN)."""
    n = X.shape[0]
    out = []
    total = math.ceil(n / batch_size)
    for b, i in enumerate(range(0, n, batch_size), start=1):
        j = min(i + batch_size, n)
        t1 = time.perf_counter()
        with torch.inference_mode():
            out.append(model.predict(X[i:j]))
        dt = time.perf_counter() - t1
        if verbose:
            print(f"   [predict] batch {b:>3}/{total} ({j-i} rows) in {dt:.2f}s")
        if USE_CUDA:
            torch.cuda.synchronize()
    return np.concatenate(out)

### Label bereinigung der Datensets

In [None]:
def clean_labels(df, label_col="Label", text_cols=("Sentence",)):
    """Bringt Labels robust auf {0,1} und gibt (X_raw, y) zurück."""
    df = df.copy()
    
    # Labels in {0,1} umwandeln
    y_raw = df[label_col].astype(str).str.strip().str.lower()
    map01 = {
        "0": "0", "1": "1", "benign": "0", "normal": "0", "legitimate": "0", "safe": "0",
        "attack": "1", "malicious": "1", "sql injection": "1", "sql-injection": "1", "xss": "1"
    }
    y_map = y_raw.map(map01)
    mask = y_map.notna()
    y = pd.to_numeric(y_map[mask]).astype(int).to_numpy()
    
    # Explizit nur angegebene Textspalten verwenden!
    X_raw = df.loc[mask, text_cols].astype(str).agg(" ".join, axis=1)
    
    return X_raw, y

## Split

In [None]:
# Falls clean_labels(X) bereits (Rohtext, y) zurückgibt:
X_txt_sql, y_sql = clean_labels(sql_df, label_col="Label", text_cols=("Sentence",))
X_train_sql, X_test_sql, y_train_sql, y_test_sql = train_test_split(
    X_txt_sql, y_sql, test_size=0.2, stratify=y_sql, random_state=RANDOM_STATE
)

X_txt_xss, y_xss = clean_labels(xss_df, label_col="Label", text_cols=("Sentence",))
X_train_xss, X_test_xss, y_train_xss, y_test_xss = train_test_split(
    X_txt_xss, y_xss, test_size=0.2, stratify=y_xss, random_state=RANDOM_STATE
)

In [None]:
def plot_class_distribution(y, title="Label-Verteilung"):
    """Zeigt die Label-Verteilung in einem Balkendiagramm."""
    counts = pd.Series(y).value_counts(normalize=True)
    plt.figure(figsize=(8, 4))
    counts.plot(kind="bar")
    plt.title(title)
    plt.xlabel("Label (0=benign, 1=attack)")
    plt.ylabel("Anzahl")
    plt.xticks(rotation=0)
    plt.show()

plot_class_distribution(y_train_sql, "SQL Train Label-Verteilung")
plot_class_distribution(y_test_sql, "SQL Test Label-Verteilung")

plot_class_distribution(y_train_xss, "XSS Train Label-Verteilung")
plot_class_distribution(y_test_xss, "XSS Test Label-Verteilung")

# Modelle

In [None]:
models = {
    "RandomForest": RandomForestClassifier(n_estimators=300, n_jobs=-1, random_state=RANDOM_STATE),
    "MLP": MLPClassifier(hidden_layer_sizes=(256,128), activation="relu",
                         early_stopping=True, n_iter_no_change=5, max_iter=200,
                         random_state=RANDOM_STATE),
    "XGBoost": XGBClassifier(
        n_estimators=500, max_depth=6, learning_rate=0.1,
        subsample=0.9, colsample_bytree=0.8,
        tree_method="hist", device=("cuda" if USE_CUDA else "cpu"),
        random_state=RANDOM_STATE
    ),
    "CatBoost": CatBoostClassifier(
        iterations=400, depth=8, learning_rate=0.1,
        loss_function="Logloss", random_seed=RANDOM_STATE, verbose=False,
        task_type="CPU"   # stabil über Pool + Sparse; bei GPU: task_type="GPU" und ggf. Dense verwenden
    ),
    "TabPFN": TabPFNClassifier(
        device=("cuda" if USE_CUDA else "cpu"),
        ignore_pretraining_limits=True
    )
}

# Pipeline

In [None]:
#TDF-IDF Einstellungen
VEC_ARGS = dict(analyzer="char", ngram_range=(3,5), min_df=3, max_features=50_000, sublinear_tf=True, lowercase=False, dtype=np.float32)


TABPFN_MAX_SAMPLES = 4000     # starte konservativ; später 6000/8000 testen
TABPFN_N_COMPONENTS = 150     # <=500, kleiner = schneller/ram-sparender
TABPFN_BATCH = 128            # Batch für Predict; 128/64 bei RAM-Engpässen

results = []

In [None]:
## Pipeline je Datensatz (TF-IDF; TabPFN: SVD + batched predict)
from sklearn.base import clone
from sklearn.model_selection import StratifiedShuffleSplit, cross_val_score
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import TruncatedSVD
from catboost import Pool
import numpy as np, time, torch


splits = {
    "SQL": (X_train_sql, X_test_sql, y_train_sql, y_test_sql),
    "XSS": (X_train_xss, X_test_xss, y_train_xss, y_test_xss),
}

for ds_name, (X_train_txt, X_test_txt, y_train, y_test) in splits.items():
    print("\n" + "="*70)
    print(f"DATASET: {ds_name} | train={len(y_train)} test={len(y_test)} (pos_rate_train={np.mean(y_train):.3f})")

    #TF-IDF nur auf Train fitten (kein Leakage)
    vec = TfidfVectorizer(**VEC_ARGS)
    t0 = time.perf_counter()
    X_train_vec = vec.fit_transform(X_train_txt)
    #Feature anzeigen
    feature_names = vec.get_feature_names_out()
    print(f"TF-IDF Features: {len(feature_names)}")
    print("Beispiel-Feature:", feature_names[:10])  # Zeigt das
    X_test_vec  = vec.transform(X_test_txt)
    print(f"[VEC] TF-IDF train={X_train_vec.shape}, test={X_test_vec.shape} in {time.perf_counter()-t0:.2f}s")

    # Füge K-fold Cross Validation auf den Trainingsdaten hinzu:
    cv = StratifiedKFold(n_splits=10, shuffle=True, random_state=RANDOM_STATE)
    print(f"\nPerforming {cv.get_n_splits()}‑Fold CV on the training data:\n")

    # Beispielweise mit F1-Score – beachte, dass manche Modelle (z.B. CatBoost, TabPFN) evtl. spezielle Datenformate erwarten:
    for name, base_model in models.items():
        model = clone(base_model)
        # Falls das Modell direkt den Sparse Input akzeptiert, kannst du CV nutzen:
        if name not in ["CatBoost", "TabPFN"]:
            scores = cross_val_score(model, X_train_vec, y_train, cv=cv, scoring="f1")
            print(f"[{name}] CV F1 Score: {scores.mean():.4f} (± {scores.std():.4f})")
        else:
            print(f"[{name}] CV abgebrochen, da spezielles Preprocessing erforderlich ist.")

    #Modelle trainieren/evaluieren (jeweils frischen Klon verwenden)
    for name, base_model in models.items():
        model = clone(base_model)
        print("-"*50 + f"\nMODEL: {name} on {ds_name}")

        if name == "CatBoost":
            # CatBoost: Pool nutzen (sparse ok)
            pool_train = Pool(X_train_vec, y_train)
            t0 = time.perf_counter()
            model.fit(pool_train)
            print(f"[{name}] fit in {time.perf_counter()-t0:.2f}s")

            # Test direkt auf X_test_vec (CatBoost kann CSR); alternativ: Pool(X_test_vec, y_test)
            res = evaluate_model(model, X_test_vec, y_test, f"{name}-{ds_name}")
            results.append(res)
            print(res)

        elif name == "TabPFN":
            # TabPFN: ggf. stratifiziertes Subset (Limits), danach SVD -> dichte float32
            X_tab, y_tab = X_train_vec, y_train
            if X_train_vec.shape[0] > TABPFN_MAX_SAMPLES:
                sss = StratifiedShuffleSplit(n_splits=1, train_size=TABPFN_MAX_SAMPLES, random_state=RANDOM_STATE)
                idx, _ = next(sss.split(np.zeros(len(y_train)), y_train))
                X_tab = X_train_vec[idx]
                y_tab = np.asarray(y_train)[idx]
            print(f"[{name}] train subset: {X_tab.shape[0]} rows")

            t0 = time.perf_counter()
            svd = TruncatedSVD(n_components=TABPFN_N_COMPONENTS, random_state=RANDOM_STATE)
            X_train_svd = svd.fit_transform(X_tab).astype("float32", copy=False)
            X_test_svd  = svd.transform(X_test_vec).astype("float32", copy=False)
            print(f"[{name}] SVD train={X_train_svd.shape}, test={X_test_svd.shape} in {time.perf_counter()-t0:.2f}s")

            # Gerät einstellen & fitten
            if hasattr(model, "set_params"):
                model.set_params(device=("cuda" if USE_CUDA else "cpu"), ignore_pretraining_limits=True)
            t0 = time.perf_counter()
            model.fit(X_train_svd, y_tab)
            if USE_CUDA:
                torch.cuda.synchronize()
            print(f"[{name}] fit in {time.perf_counter()-t0:.2f}s (device={'cuda' if USE_CUDA else 'cpu'})")

            # Evaluieren (wenn deine evaluate_model batched predict unterstützt, ansonsten normal)
            try:
                res = evaluate_model(model, X_test_svd, y_test, f"{name}-{ds_name}",
                                     use_batches=True, batch_size=TABPFN_BATCH)
            except TypeError:
                res = evaluate_model(model, X_test_svd, y_test, f"{name}-{ds_name}")
            results.append(res)
            print(res)

        else:
            # RF / MLP / XGBoost: direkt auf sparse TF-IDF
            t0 = time.perf_counter()
            model.fit(X_train_vec, y_train)
            print(f"[{name}] fit in {time.perf_counter()-t0:.2f}s")
            res = evaluate_model(model, X_test_vec, y_test, f"{name}-{ds_name}")
            results.append(res)
            print(res)



# Ergebnisse für Modelle

In [None]:
# Ergebnis-Tabelle ausgeben
df_results = pd.DataFrame(results)
display(df_results.sort_values(["Model"]).reset_index(drop=True))
df_results.to_csv("results_all.csv", index=False)
with pd.ExcelWriter("results_all.xlsx") as w:
    df_results.to_excel(w, sheet_name="All", index=False)
print(" Ergebnisse gespeichert: results_all.csv / results_all.xlsx")

# Hyperparameter-Tuning & Cross-Validation

## Feature und Modelle

In [None]:
# Feature-Builder (schnell & schlank)

def make_tfidf(max_features=20_000):
    return TfidfVectorizer(
        analyzer="char", ngram_range=(3,5),
        min_df=3, max_features=max_features,
        sublinear_tf=True, lowercase=False,
        dtype=np.float32
    )

# Cache, damit TF-IDF/SVD nicht in jedem Kandidaten neu gelernt werden müssen
memory = Memory(location="skcache", verbose=0)


#schlanke Suchräume
def pos_weight(y):
    pos = max(1, int(np.sum(y))); neg = max(1, int(len(y)-pos))
    return neg / pos

def grids(y):
    return {
        "RF": {
            "svd__n_components": [120, 150, 200],
            "clf__n_estimators": [300, 600],
            "clf__max_depth": [None, 20, 40],
            "clf__min_samples_split": [2, 5, 10],
            "clf__max_features": ["sqrt", None],
        },
        "MLP": {
            "svd__n_components": [120, 150, 200],
            "clf__hidden_layer_sizes": [(256,128), (512,256)],
            "clf__alpha": np.logspace(-5, -3, 3),
            "clf__learning_rate_init": [1e-4, 5e-4, 1e-3],
            "clf__batch_size": [64, 128],
        },
        "XGB": {
            "tfidf__max_features": [15_000, 20_000],
            "clf__n_estimators": [300, 500],
            "clf__max_depth": [4, 6, 8],
            "clf__learning_rate": [0.05, 0.1],
            "clf__subsample": [0.7, 1.0],
            "clf__colsample_bytree": [0.6, 1.0],
            "clf__reg_lambda": [0, 5],
            "clf__gamma": [0, 1],
            "clf__scale_pos_weight": [pos_weight(y)],
        },
        "CAT": {
            "tfidf__max_features": [15_000, 20_000],
            "clf__iterations": [400, 800],
            "clf__depth": [6, 8],
            "clf__learning_rate": [0.05, 0.1],
            "clf__l2_leaf_reg": [3, 5],
        }
    }


## HPO-Pipeline

In [None]:
def plot_results(results_df, title="Modellevaluation", metric="F1"):
    """Zeigt die Ergebnisse als Balkendiagramm an."""
    plt.figure(figsize=(12, 6))
    results_df.sort_values(by=metric, ascending=False).plot(
        x="Model", y=metric, kind="bar", legend=False, color="skyblue"
    )
    plt.title(title)
    plt.xlabel("Modelle")
    plt.ylabel(metric)
    plt.xticks(rotation=45)
    plt.grid(axis='y')
    plt.tight_layout()
    plt.show()

In [None]:
def get_hpo_subset(X, y, n=8000):
    """Erzeugt ein stratifiziertes Subset für HPO."""
    if len(y) <= n:
        return X, y
    sss = StratifiedShuffleSplit(n_splits=1, train_size=n, random_state=RANDOM_STATE)
    idx, _ = next(sss.split(np.zeros(len(y)), y))
    if hasattr(X, "iloc"):  # DataFrame oder Series
        return X.iloc[idx], np.asarray(y)[idx]
    else:
        return X[idx], np.asarray(y)[idx]
    


In [None]:
#Pipelines
def pipe_rf_svd(max_features=20_000, n_comp=150):
    return Pipeline([
        ("tfidf", make_tfidf(max_features)),
        ("svd", TruncatedSVD(n_components=n_comp, random_state=RANDOM_STATE)),
        ("clf", RandomForestClassifier(n_jobs=-1, random_state=RANDOM_STATE))
    ], memory=memory)

def pipe_mlp_svd(max_features=20_000, n_comp=150):
    return Pipeline([
        ("tfidf", make_tfidf(max_features)),
        ("svd", TruncatedSVD(n_components=n_comp, random_state=RANDOM_STATE)),
        ("scaler", StandardScaler(with_mean=True)),
        ("clf", MLPClassifier(
            early_stopping=True, n_iter_no_change=8, max_iter=200,
            random_state=RANDOM_STATE
        ))
    ], memory=memory)

def pipe_xgb_sparse(max_features=20_000):
    return Pipeline([
        ("tfidf", make_tfidf(max_features)),
        ("clf", XGBClassifier(
            tree_method="hist",
            device=("cuda" if USE_CUDA else "cpu"),
            n_estimators=400,
            random_state=RANDOM_STATE,
            n_jobs=1  # Parallelisierung über CV, nicht im Estimator
        ))
    ], memory=memory)

def pipe_cat_sparse(max_features=20_000):
    return Pipeline([
        ("tfidf", make_tfidf(max_features)),
        ("clf", CatBoostClassifier(
            loss_function="Logloss",
            verbose=False,
            random_seed=RANDOM_STATE
        ))
    ], memory=memory)

In [None]:
# Gemeinsames CV-Objekt für faire Vergleiche
cv_shared = StratifiedKFold(n_splits=10, shuffle=True, random_state=RANDOM_STATE)

# Hilfsfunktion für Hyperparameter-Suche
def make_search(estimator, param_distributions, y_train, use_halving=True, cv=None):
    if cv is None:
        cv = StratifiedKFold(n_splits=10, shuffle=True, random_state=RANDOM_STATE)
    scorer = make_scorer(f1_score)
    if use_halving:
        n = len(y_train)
        min_res = max(200, int(0.15 * n))
        return HalvingRandomSearchCV(
            estimator=estimator,
            param_distributions=param_distributions,
            resource="n_samples",
            min_resources=min_res,
            max_resources="auto",
            factor=3,
            scoring=scorer,
            cv=cv,
            random_state=RANDOM_STATE,
            n_jobs=2,
            verbose=2
        )
    else:
        return RandomizedSearchCV(
            estimator=estimator,
            param_distributions=param_distributions,
            n_iter=10,
            scoring=scorer,
            cv=cv,
            random_state=RANDOM_STATE,
            n_jobs=2,
            verbose=2
        )

# Training + Suche + Refit + Threshold
def tune_one(name, X_train_txt, y_train, cv):
    if name == "RF":   pipe = pipe_rf_svd()
    elif name == "MLP": pipe = pipe_mlp_svd()
    elif name == "XGB": pipe = pipe_xgb_sparse()
    elif name == "CAT": pipe = pipe_cat_sparse()
    else: raise ValueError(name)

    # Teilmenge für HPO
    X_hpo, y_hpo = get_hpo_subset(X_train_txt, y_train, n=8000)
    space = grids(y_hpo)[name]
    search = make_search(pipe, space, y_hpo, use_halving=True, cv=cv)

    t0 = time.perf_counter()
    search.fit(X_hpo, y_hpo)
    dt = time.perf_counter() - t0
    print(f"[{name}] best F1 (CV): {search.best_score_:.4f} in {dt/60:.1f} min")
    print(f"[{name}] best params:", search.best_params_)

    best = clone(search.best_estimator_)
    best.fit(X_train_txt, y_train)
    return best, search.best_params_, search.best_score_, dt

# Threshold-Optimierung + Testauswertung
def eval_on_test(name, est, X_train_txt, y_train, X_test_txt, y_test, tune_threshold=True):
    thr = 0.5
    if tune_threshold and hasattr(est, "predict_proba"):
        X_tr2, X_val, y_tr2, y_val = train_test_split(X_train_txt, y_train, test_size=0.15, stratify=y_train, random_state=RANDOM_STATE)
        est_for_thr = est
        est_for_thr.fit(X_tr2, y_tr2)
        p = est_for_thr.predict_proba(X_val)[:, 1]
        grid = np.linspace(0.1, 0.9, 81)
        thr = float(grid[int(np.argmax([f1_score(y_val, (p >= t).astype(int)) for t in grid]))])

    y_pred = (est.predict_proba(X_test_txt)[:,1] >= thr).astype(int) if hasattr(est, "predict_proba") else est.predict(X_test_txt)
    plot_confusion_matrix(y_test, y_pred, labels=[0, 1], title=f"Confusion Matrix {name} (Thr={thr:.2f})")
    p, r, f1, _ = precision_recall_fscore_support(y_test, y_pred, average="binary", zero_division=0)
    tn, fp, fn, tp = confusion_matrix(y_test, y_pred).ravel()
    fpr = float(fp / (fp + tn)) if (fp + tn) else 0.0
    fnr = float(fn / (fn + tp)) if (fn + tp) else 0.0
    return dict(Model=name, Thr=thr, Precision=p, Recall=r, F1=f1, FPR=fpr, FNR=fnr)

# Ausführung mit übergebenem Split
def run_fast_from_split(X_train_txt, X_test_txt, y_train, y_test, ds_name, cv):
    results = []
    for short, label in [("RF", "RandomForest"), ("MLP", "MLP"), ("XGB", "XGBoost"), ("CAT", "CatBoost")]:
        print("\n" + "=" * 60)
        print(f"TUNING: {label} ({ds_name})")
        best_est, params, best_cv, dt = tune_one(short, X_train_txt, y_train, cv=cv)
        res = eval_on_test(f"{label}-{ds_name}", best_est, X_train_txt, y_train, X_test_txt, y_test)
        print("TEST:", res)
        results.append(res)
    return pd.DataFrame(results)

# Anwendung auf beide Datensätze
sql_res = run_fast_from_split(X_train_sql, X_test_sql, y_train_sql, y_test_sql, ds_name="SQL", cv=cv_shared)
xss_res = run_fast_from_split(X_train_xss, X_test_xss, y_train_xss, y_test_xss, ds_name="XSS", cv=cv_shared)

display(sql_res)
display(xss_res)
plot_results(sql_res, title="SQL Modellevaluation", metric="F1")
plot_results(xss_res, title="XSS Modellevaluation", metric="F1")

In [None]:
# Combine results and tag with dataset names
sql_res["Dataset"] = "SQL"
xss_res["Dataset"] = "XSS"

df_all = pd.concat([sql_res, xss_res], ignore_index=True)

# Anzeigen
display(df_all)

# LLM-Vergleich

In [None]:
#%pip show openai
#%pip install openai==0.28
#%pip install --upgrade openai
#%pip install python-dotenv
#%pip install mistralai tenacity
#%pip install mistralai 
#%pip show mistralai

In [None]:
import numpy as np
import pandas as pd

def _safe_div(num, den):
    return float(num/den) if den else 0.0

def compute_binary_metrics(y_true, y_pred):
    """Berechnet binäre Klassifikationsmetriken für positive Klasse (1 = malicious)."""
    y_true = np.asarray(y_true, dtype=int)
    y_pred = np.asarray(y_pred, dtype=int)

    TP = int(np.sum((y_true == 1) & (y_pred == 1)))
    TN = int(np.sum((y_true == 0) & (y_pred == 0)))
    FP = int(np.sum((y_true == 0) & (y_pred == 1)))
    FN = int(np.sum((y_true == 1) & (y_pred == 0)))

    precision = _safe_div(TP, TP + FP)
    recall    = _safe_div(TP, TP + FN)
    f1        = _safe_div(2*precision*recall, precision + recall) if (precision+recall) else 0.0
    fpr       = _safe_div(FP, FP + TN)
    specificity = _safe_div(TN, TN + FP)
    accuracy  = _safe_div(TP + TN, TP + TN + FP + FN)

    return {
        "N_valid": int(len(y_true)),
        "Support_pos(1)": int(np.sum(y_true == 1)),
        "Support_neg(0)": int(np.sum(y_true == 0)),
        "TP": TP, "FP": FP, "TN": TN, "FN": FN,
        "Accuracy": accuracy,
        "Precision": precision,
        "Recall": recall,
        "F1": f1,
        "FPR": fpr,
        "Specificity": specificity,
    }

def confusion_matrix_df(y_true, y_pred):
    """Erstellt eine Pandas-Confusion-Matrix."""
    y_true = np.asarray(y_true, dtype=int)
    y_pred = np.asarray(y_pred, dtype=int)
    cm = pd.crosstab(pd.Series(y_true, name="True"),
                     pd.Series(y_pred, name="Pred"),
                     dropna=False)
    for cls in [0,1]:
        if cls not in cm.index: cm.loc[cls] = 0
        if cls not in cm.columns: cm[cls] = 0
    return cm.sort_index().sort_index(axis=1)

def metrics_for_dataset(df: pd.DataFrame, name: str):
    """Wrapper: zieht y_true / y_pred aus df und berechnet Metriken + Confusion Matrix."""
    valid = df.dropna(subset=["Pred"]).copy()
    if valid.empty:
        return name, pd.DataFrame([{"Dataset": name, "N_valid": 0}]), pd.DataFrame()
    y_true = valid["True"].astype(int).values
    y_pred = valid["Pred"].astype(int).values
    m = compute_binary_metrics(y_true, y_pred)
    m["Dataset"] = name
    cm = confusion_matrix_df(y_true, y_pred)
    return name, pd.DataFrame([m]), cm


In [None]:

name_sql, metrics_sql_df, cm_sql = metrics_for_dataset(df_llm_sql, "SQL")
name_xss, metrics_xss_df, cm_xss = metrics_for_dataset(df_llm_xss, "XSS")

print("=== Metrics: SQL ===")
display(metrics_sql_df.style.format({
    "Accuracy":"{:.4f}", "Precision":"{:.4f}", "Recall":"{:.4f}", "F1":"{:.4f}",
    "FPR":"{:.4f}", "Specificity":"{:.4f}"
}))
print("Confusion Matrix (SQL):")
display(cm_sql)

print("\n=== Metrics: XSS ===")
display(metrics_xss_df.style.format({
    "Accuracy":"{:.4f}", "Precision":"{:.4f}", "Recall":"{:.4f}", "F1":"{:.4f}",
    "FPR":"{:.4f}", "Specificity":"{:.4f}"
}))
print("Confusion Matrix (XSS):")
display(cm_xss)
