# Code zu der Bachelorarbeit:
# "Comparitve Study von Machine Learning Modellen zur Erkennung von Web Schwachstellen"
## von Nils Pudenz, 2735230

# Importe

In [1]:

#%pip install kaggle scikit-learn xgboost catboost tabpfn pandas numpy matplotlib seaborn -q
#%pip install --quiet scikit-learn xgboost catboost tabpfn chardet
#%pip install -U scikit-learn
## in deiner (Conda/venv) Umgebung
#%pip install --upgrade "torch==2.*" --index-url https://download.pytorch.org/whl/cu121
#%pip install --upgrade xgboost catboost scikit-learn pandas scipy tabpfn




In [22]:
import os
import zipfile
import random
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os, joblib
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.metrics import (precision_score, recall_score, f1_score,
                             confusion_matrix)
from sklearn.preprocessing import StandardScaler
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import make_pipeline
from sklearn.decomposition import TruncatedSVD
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier
from xgboost import XGBClassifier
from catboost import CatBoostClassifier, Pool
from tabpfn import TabPFNClassifier
from sklearn.model_selection import RandomizedSearchCV
from sklearn.utils import resample
from sklearn.model_selection import StratifiedShuffleSplit




In [3]:
#Deterministische Ausgabe generieren, um die Reproduzierbarkeit zu gewährleisten
RANDOM_STATE = 42
np.random.seed(RANDOM_STATE)
random.seed(RANDOM_STATE)

## Dowload Kaggle Datasets
Requires Kaggle API credentials ('~/.kaggle/kaggle.json') für API-Token, um zugriff auf die Datenbanken über das Kaggle Konto zu bekommen

In [4]:
DATA_DIR = Path("data")
DATA_DIR.mkdir(exist_ok=True)

In [5]:
#Dowload der Datasets von Kaggle
os.system("kaggle datasets download -d syedsaqlainhussain/sql-injection-dataset -p data --unzip --quiet")
os.system("kaggle datasets download -d syedsaqlainhussain/cross-site-scripting-xss-dataset-for-deep-learning -p data --unzip --quiet")
#KAGGLE_DATASETS = { #gleich wie oben nur renaming auf sql & xss
#    "sql": "syedsaqlainhussain/sql-injection-dataset",
#    "xss": "syedsaqlainhussain/cross-site-scripting-xss-dataset-for-deep-learning"
#}

0

## Load and Inspect Data

In [6]:
SQL_CSV = next(DATA_DIR.glob("**/sql*/*.csv"), None) or next(DATA_DIR.glob("**/*SQL*.csv"), None)
XSS_CSV = next(DATA_DIR.glob("**/xss*/*.csv"), None) or next(DATA_DIR.glob("**/*XSS*.csv"), None)


In [7]:
#csv to dataframe
sql_df = pd.read_csv(SQL_CSV, encoding="utf-16", sep=",", low_memory=False) #utf-8 Fehler
xss_df = pd.read_csv(XSS_CSV)
##

In [8]:
for name, df in {"SQL": sql_df, "XSS": xss_df}.items():
    print(f"{name} dataset shape: {df.shape}")
    display(df.head())

SQL dataset shape: (4200, 2)


Unnamed: 0,Sentence,Label
0,a,1
1,a',1
2,a' --,1
3,a' or 1 = 1; --,1
4,@,1


XSS dataset shape: (13686, 3)


Unnamed: 0.1,Unnamed: 0,Sentence,Label
0,0,"<li><a href=""/wiki/File:Socrates.png"" class=""i...",0
1,1,"<tt onmouseover=""alert(1)"">test</tt>",1
2,2,"\t </span> <span class=""reference-text"">Steeri...",0
3,3,"\t </span> <span class=""reference-text""><cite ...",0
4,4,"\t </span>. <a href=""/wiki/Digital_object_iden...",0


## Basic Cleaning
* Drop Duplicate rows
* Handle missing values (simple fill-na)

In [9]:
for df in (sql_df, xss_df):
    df.drop_duplicates(inplace=True)
    df.fillna(0, inplace=True)

In [10]:
def preprocess_xy(df: pd.DataFrame):
    # Zielspalte finden und Features extrahieren
    #Split in Features (roher Text) und Label-Vector.
    target_col = next(c for c in sql_df.columns if c.lower() in {"label", "class", "target"})
    X_raw = df.drop(columns=[target_col]).astype(str).agg(" ".join, axis=1)
    y = df[target_col].values
    print("Target column assumed:", target_col)
    FEATURES = [c for c in sql_df.columns if c != target_col]
    return X_raw, y, target_col

    

In [11]:
vectorizer = TfidfVectorizer(
    analyzer="char", ngram_range=(3,5), min_df=2, max_features=50000
)

## Splitting & Measure-Metrics

In [12]:
def split(df):
    X = df[FEATURES].values
    y = df[target_col].astype(int).values
    return train_test_split(X, y, test_size=0.2, stratify=y, random_state=RANDOM_STATE)

Erst den Datensatz splitten, um Data Leakage vorzubeugen, Wujek et al. (2016)

In [13]:
def evaluate_model(model, X_test, y_test, name):

    """Evaluiert das Modell und gibt ein dic mit den Metriken zurück."""
    y_pred = model.predict(X_test)
    p = precision_score(y_test, y_pred)
    r = recall_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)
    tn, fp, fn, tp = confusion_matrix(y_test, y_pred).ravel()
    fpr = fp / (fp + tn)
    fnr = fn / (fn + tp)
    return dict(Model=name, Precision=p, Recall=r, F1=f1, FPR=fpr, FNR=fnr)

Dictionary für die Evaulierungsmetriken

## Modeldefinition

In [14]:
'''# Parallelisierung der Modelle für bessere Laufzeit

# scikit-learn / joblib
os.environ["OMP_NUM_THREADS"] = "8"         # OpenMP
os.environ["OPENBLAS_NUM_THREADS"] = "8"    # NumPy / SciPy
joblib.parallel_backend("loky", n_jobs=-1)  # überall -1 = alle Kerne
'''

'# Parallelisierung der Modelle für bessere Laufzeit\n\n# scikit-learn / joblib\nos.environ["OMP_NUM_THREADS"] = "8"         # OpenMP\nos.environ["OPENBLAS_NUM_THREADS"] = "8"    # NumPy / SciPy\njoblib.parallel_backend("loky", n_jobs=-1)  # überall -1 = alle Kerne\n'

In [15]:
'''
xgb = XGBClassifier(
    tree_method="hist",        # CPU-Optimierung
    n_estimators=400,
    max_depth=6,
    learning_rate=0.1,
    subsample=0.9,
    colsample_bytree=0.8,
    n_jobs=-1,
    random_state=42
)

from catboost import CatBoostClassifier
cat = CatBoostClassifier(
    iterations=400,
    depth=8,
    learning_rate=0.1,
    random_seed=42,
    loss_function="Logloss",
    task_type="CPU",
    thread_count=8,
    od_type="Iter",            # early stopping
    od_wait=30,
    verbose=False
)
'''

'\nxgb = XGBClassifier(\n    tree_method="hist",        # CPU-Optimierung\n    n_estimators=400,\n    max_depth=6,\n    learning_rate=0.1,\n    subsample=0.9,\n    colsample_bytree=0.8,\n    n_jobs=-1,\n    random_state=42\n)\n\nfrom catboost import CatBoostClassifier\ncat = CatBoostClassifier(\n    iterations=400,\n    depth=8,\n    learning_rate=0.1,\n    random_seed=42,\n    loss_function="Logloss",\n    task_type="CPU",\n    thread_count=8,\n    od_type="Iter",            # early stopping\n    od_wait=30,\n    verbose=False\n)\n'

In [16]:
#Speicher optimieren
'''
vec = TfidfVectorizer(
    ngram_range=(3,5),
    max_features=50_000,
    sublinear_tf=True,
    lowercase=False
).fit(X_train_raw)           # nur einmal fitten
'''

'\nvec = TfidfVectorizer(\n    ngram_range=(3,5),\n    max_features=50_000,\n    sublinear_tf=True,\n    lowercase=False\n).fit(X_train_raw)           # nur einmal fitten\n'

In [17]:
#kleine Hyperparamsuche statt Grid-Overkill
'''
search_space = {
    "max_depth": [4, 6, 8],
    "learning_rate": [0.05, 0.1, 0.2],
    "n_estimators": [200, 400, 600]
}
randcv = RandomizedSearchCV(
    xgb,
    search_space,
    n_iter=10,            # statt 3×3×3 = 27
    scoring="f1",
    cv=3,
    n_jobs=-1
)
randcv.fit(X_train_vec, y_train)
'''

'\nsearch_space = {\n    "max_depth": [4, 6, 8],\n    "learning_rate": [0.05, 0.1, 0.2],\n    "n_estimators": [200, 400, 600]\n}\nrandcv = RandomizedSearchCV(\n    xgb,\n    search_space,\n    n_iter=10,            # statt 3×3×3 = 27\n    scoring="f1",\n    cv=3,\n    n_jobs=-1\n)\nrandcv.fit(X_train_vec, y_train)\n'

In [18]:
models = {

"RandomForest": RandomForestClassifier(n_estimators=300, max_depth=None, n_jobs=-1, random_state=RANDOM_STATE),
"MLP": MLPClassifier(hidden_layer_sizes=(512, 256), activation="relu", alpha= 1e-4, learning_rate_init=1e-3, early_stopping=True, random_state=RANDOM_STATE, max_iter=30),
"XGBoost": XGBClassifier(n_estimators=500, max_depth=10, learning_rate=0.1, subsample=0.8, colsample_bytree=0.8, objective="binary:logistic", eval_metric="logloss", tree_method="hist", random_state=RANDOM_STATE, n_jobs=1),
"CatBoost": CatBoostClassifier(iterations=400, depth=8, learning_rate=0.1, loss_function="Logloss", random_seed=RANDOM_STATE, verbose=False),
"TabPFN": TabPFNClassifier(device="cpu")# , ignore_pretraining_limits=True) # ingorieren der 500/10 000-Grenzen -> führt zu Absturz

#-----GPU--------

#'''XXXX---- 
#'''
}

## Training & Evaluation

In [19]:
results = []
for df, ds_name in [(sql_df, "SQL"), (xss_df, "XSS")]:
    X_raw, y, target_col = preprocess_xy(df) 
    X_train, X_test, y_train, y_test = train_test_split(
        X_raw,
        y,
        test_size=0.2,
        stratify=y,
        random_state=RANDOM_STATE
    )

Target column assumed: Label
Target column assumed: Label


In [20]:
#Gemeinsmaer TF-IDF-Vectorizer (fit nur auf Train, wegen Oversampling)
vec = vectorizer.fit(X_train)
X_train_vec = vec.transform(X_train)
X_test_vec = vec.transform(X_test)

In [None]:




for name, model in models.items():
    '''if name in {"CatBoost", "TabPFN"}:
        # Dichte der Matrix erforderlich
        X_tr, X_te = X_train_vec.toarray(), X_test_vec.toarray()'''
    if name == "CatBoost":
        # CatBoost benötigt Pool-Objekte, Sparse-Pool speicherschondender statt .toarray()
        # CatBoost kann auch Sparse-Matrizen verarbeiten, aber Pool ist effizienter
        train_pool = Pool(X_train_vec, y_train)
        #test_pool = Pool(X_test_vec, y_test) nicht nötig, da CatBoost selbst Testdaten verarbeitet
        model.fit(train_pool)
        res = evaluate_model(model, X_test_vec, y_test, f"{name}-{ds_name}")
        
    elif name in {"TabPFN"}:
        # Subsampling, stratifiziert auf 10.000 Trainingszeilen reduziert
        MAX_SAMPLES = 10_000
        if X_train_vec.shape[0] > MAX_SAMPLES:
            # Resampling, um die Trainingsdaten auf 10.000 zu reduzieren
            sss= StratifiedShuffleSplit(n_splits=1, test_size=MAX_SAMPLES, random_state=RANDOM_STATE)
            idx_sub, _ = next(sss.split(X_train_vec, y_train))
            X_train_vec_tabpfn = X_train_vec[idx_sub]
            y_train_tabpfn = y_train[idx_sub]
        else:
            X_train_vec_tabpfn = X_train_vec
            y_train_tabpfn = y_train

        # Truncated SVD reduziert TF-IDF-Matrix auf 300 komponenten
        svd = TruncatedSVD(n_components=300, random_state=RANDOM_STATE)
        X_train_svd = svd.fit_transform(X_train_vec_tabpfn) #fit nur auf dem Subset
        X_test_svd = svd.transform(X_test_vec)
        model.fit(X_train_svd, y_train_tabpfn)  # TabPFN benötigt dichte Matrizen
        res = evaluate_model(model, X_test_svd, y_test, f"{name}-{ds_name}")

    else:
        X_tr, X_te = X_train_vec, X_test_vec
        
        #Zwei Argumente: Features + Train-Labels    
        model.fit(X_train_vec, y_train) #dense oder sparse Matrix, je nach Modell
        res = evaluate_model(model, X_test_vec, y_test, f"{name}-{ds_name}")

    
    results.append(res)
    print(res)

{'Model': 'RandomForest-XSS', 'Precision': 1.0, 'Recall': 0.9952542372881356, 'F1': 0.9976214746856948, 'FPR': np.float64(0.0), 'FNR': np.float64(0.0047457627118644066)}
{'Model': 'MLP-XSS', 'Precision': 1.0, 'Recall': 1.0, 'F1': 1.0, 'FPR': np.float64(0.0), 'FNR': np.float64(0.0)}
{'Model': 'XGBoost-XSS', 'Precision': 1.0, 'Recall': 0.9938983050847457, 'F1': 0.9969398163889833, 'FPR': np.float64(0.0), 'FNR': np.float64(0.006101694915254237)}
{'Model': 'CatBoost-XSS', 'Precision': 1.0, 'Recall': 0.9945762711864407, 'F1': 0.9972807613868117, 'FPR': np.float64(0.0), 'FNR': np.float64(0.005423728813559322)}


NameError: name 'StratifiedShuffleSplit' is not defined

## Ergebnistabelle

In [None]:
results_df = pd-DataFrame(results)
results_df.sort_values(["Model"], inplace=True)
print("\n Gesamt Ergebnisse: \n", results_df)

results_df.to_csv("results.csv", index=False)

In [None]:

'''
# Variante A – wenn df_results bereits eine 'Dataset'-Spalte hat
if "Dataset" in df_results.columns:
    df_sql = df_results[df_results["Dataset"] == "SQL"].copy()
    df_xss = df_results[df_results["Dataset"] == "XSS"].copy()
# Variante B – Spalte fehlt -> am Modell-Namen aufteilen
else:
    df_sql = df_results[df_results["Model"].str.contains("-SQL")].copy()
    df_xss = df_results[df_results["Model"].str.contains("-XSS")].copy()

# ------------------------------------------------------------
# 2) zwei CSV-Dateien 
# ------------------------------------------------------------
df_sql.to_csv("results_sql.csv", index=False, sep=";")
df_xss.to_csv("results_xss.csv", index=False, sep=";")

# ------------------------------------------------------------
# Excel-Workbook mit zwei Sheets
# ------------------------------------------------------------
with pd.ExcelWriter("results_by_dataset.xlsx") as writer:
    df_sql.to_excel(writer, sheet_name="SQL", index=False)
    df_xss.to_excel(writer, sheet_name="XSS", index=False)

print( "Dateien wurden exportiert: results_sql.csv, results_xss.csv, results_by_dataset.xlsx")
'''

## Hyperparameter-Tuning

## K-Fold-Cross Validation