# Test mit den Modellen einzeln

### Imports

In [None]:
import numpy as np
from sklearn.preprocessing import StandardScaler
import pandas as pd
from sklearn.cluster import DBSCAN
from sklearn.metrics import classification_report
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from matplotlib import pyplot as plt
import tensorflow as tf
import logging
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from sklearn.metrics import matthews_corrcoef
from sklearn.metrics import balanced_accuracy_score
import pandas as pd
import os
import sys
import logging
from sklearn.decomposition import PCA
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
import matplotlib.pyplot as plt
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, roc_auc_score, precision_recall_curve, average_precision_score

### Logging

In [None]:
# Open the log file that collects all output of this notebook.
# It is opened in "w" mode, so an existing file is overwritten.
logfile_two = open("output_log_isolated.txt", "w")

# From here on, print() output and tracebacks go to the log file
# instead of the notebook/console.
sys.stdout = logfile_two
sys.stderr = logfile_two

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    stream=logfile_two,
    force=True  # in case logging was already configured earlier
)

# Smoke test: one line via print and one via logging should appear in the file.
print("Das ist eine Print-Ausgabe.")
logging.info("Das ist eine Log-Nachricht.")

### Logs laden

In [None]:
# Load the train / test / validation logs from JSON files into DataFrames.
# lines=False: each file is parsed as one JSON document, not as JSON Lines.
data_train = pd.read_json("train_logs_isolated.json", lines=False)
data_test = pd.read_json("test_logs.json", lines=False)
data_val = pd.read_json("val_logs.json", lines=False)

In [None]:
# Feature frames: everything except the ground-truth column. Keeping "label"
# in here would let the encoders turn it into a feature, leaking the target
# into the models' input. errors="ignore" keeps this working for files that
# have no "label" column.
X_train_full = data_train.drop(columns=["label"], errors="ignore")
X_test_full = data_test.drop(columns=["label"], errors="ignore")
X_val_full = data_val.drop(columns=["label"], errors="ignore")

# Ground-truth labels used only for evaluation.
y_test_full = data_test["label"]
y_val_full = data_val["label"]

### Daten in numerisch/kategorisch unterteilen

In [None]:
import numpy as np
from sklearn.preprocessing import LabelEncoder, OneHotEncoder

# Placeholders substituted for missing values before encoding.
_MISSING_TOKEN = "__MISSING__"  # missing string / list item
_MISSING_NUMBER = -9999         # missing number


def is_missing(val):
    """Return True if *val* is ``None`` or a float NaN."""
    return val is None or (isinstance(val, float) and np.isnan(val))


def _is_number(val):
    """Return True for int/float values; bools are deliberately excluded."""
    return isinstance(val, (int, float)) and not isinstance(val, bool)


def _column_kind(logs, key, onehot_encoders):
    """Classify the column *key* as "number", "list" or "other".

    The first non-missing value in *logs* decides. If every value is missing,
    the column counts as numeric only when a one-hot encoder for it already
    exists; otherwise it is treated like a string column.
    """
    for log in logs:
        val = log.get(key)
        if is_missing(val):
            continue
        if _is_number(val):
            return "number"
        if isinstance(val, list):
            return "list"
        return "other"
    return "number" if key in onehot_encoders else "other"


def _label_encode(encoded, name, value, label_encoders, fit, collect_values):
    """Write the label-encoded *value* into ``encoded[name]``.

    A LabelEncoder for *name* is fitted lazily on ``collect_values()`` when
    *fit* is true. Values unknown to the encoder become -1. Without an encoder
    (``fit=False`` and none supplied) nothing is written.
    """
    if name not in label_encoders and fit:
        le = LabelEncoder()
        le.fit(list(collect_values()))
        label_encoders[name] = le

    le = label_encoders.get(name)
    if le is None:
        return
    if value in le.classes_:
        encoded[name] = le.transform([value])[0]
    else:
        encoded[name] = -1  # unknown category


def auto_encode_features(logs, one_hot_numeric=False, 
                         label_encoders=None, onehot_encoders=None, fit=True):
    """Encode a list of log records (dicts) into dicts of numeric features.

    Only the keys of the first record are considered. Per value:

    * numbers are kept as is, or one-hot encoded into ``<key>_<i>`` features
      when *one_hot_numeric* is true;
    * strings are label-encoded into ``<key>``;
    * lists are label-encoded item by item into ``<key>_<i>``;
    * other types (e.g. bool, dict) are skipped.

    Missing values (``None`` / NaN) are replaced by a placeholder that matches
    the column's type: -9999 in numeric columns, ``"__MISSING__"`` in string
    columns, and an empty list in list columns. Previously every missing value
    became the string placeholder, so missing numbers were routed into the
    string path and the -9999 branch could never run.

    Args:
        logs: list of dicts, or any object with ``to_dict(orient='records')``.
        one_hot_numeric: one-hot encode numeric values instead of passing
            them through.
        label_encoders: dict of already fitted LabelEncoders (updated in place).
        onehot_encoders: dict of already fitted OneHotEncoders (updated in place).
        fit: if true, fit encoders for features that have none yet; if false,
            features without an encoder are omitted.

    Returns:
        ``(encoded_logs, label_encoders, onehot_encoders)``.
    """
    if label_encoders is None:
        label_encoders = {}
    if onehot_encoders is None:
        onehot_encoders = {}

    if logs is None or len(logs) == 0:
        # Hand back the caller's encoders instead of discarding them.
        return [], label_encoders, onehot_encoders

    if hasattr(logs, "to_dict"):
        logs = logs.to_dict(orient='records')

    all_keys = list(logs[0].keys())
    kinds = {key: _column_kind(logs, key, onehot_encoders) for key in all_keys}
    encoded_logs = []

    for log in logs:
        encoded = {}
        for key in all_keys:
            val = log.get(key)

            # Missing value -> placeholder of the column's own type.
            if is_missing(val):
                kind = kinds[key]
                if kind == "number":
                    val = _MISSING_NUMBER
                elif kind == "list":
                    val = []
                else:
                    val = _MISSING_TOKEN

            # Numeric
            if _is_number(val):
                if one_hot_numeric:
                    if key not in onehot_encoders and fit:
                        values = np.array(
                            [[_MISSING_NUMBER if is_missing(l.get(key)) else l.get(key)]
                             for l in logs]
                        )
                        encoder = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
                        encoder.fit(values)
                        onehot_encoders[key] = encoder

                    if key in onehot_encoders:
                        enc = onehot_encoders[key].transform([[val]])[0]
                        for i, v in enumerate(enc):
                            encoded[f"{key}_{i}"] = v
                else:
                    encoded[key] = val

            # String (categorical)
            elif isinstance(val, str):
                _label_encode(
                    encoded, key, val, label_encoders, fit,
                    lambda key=key: {
                        _MISSING_TOKEN if is_missing(l.get(key)) else l.get(key)
                        for l in logs
                    },
                )

            # Lists: one feature per position, all positions share the
            # vocabulary of every item seen under this key.
            elif isinstance(val, list):
                for i, item in enumerate(val):
                    if is_missing(item):
                        item = _MISSING_TOKEN
                    _label_encode(
                        encoded, f"{key}_{i}", item, label_encoders, fit,
                        lambda key=key: {
                            _MISSING_TOKEN if is_missing(itm) else itm
                            for l in logs
                            # Skip records whose value is missing / not a list.
                            if isinstance(l.get(key), list)
                            for itm in l.get(key)
                        },
                    )

        encoded_logs.append(encoded)

    return encoded_logs, label_encoders, onehot_encoders

### Zusammenfügen

In [None]:
def dicts_to_feature_matrix(encoded_logs, feature_names=None):
    """Stack feature dicts into a dense float32 matrix.

    Args:
        encoded_logs: list of ``{feature_name: value}`` dicts.
        feature_names: optional column order to use. If omitted, the sorted
            union of all keys in *encoded_logs* is used. Passing the names
            obtained from the training data gives other data sets the same
            column layout; keys not listed are ignored.

    Returns:
        ``(X, feature_names)`` where ``X[i, j]`` holds the value of
        ``feature_names[j]`` in record ``i`` and 0 where the record has no
        such feature.
    """
    if feature_names is None:
        feature_names = sorted({key for d in encoded_logs for key in d})
    else:
        feature_names = list(feature_names)

    # Name -> column index, so each record is visited once instead of being
    # probed for every feature name.
    column_of = {name: j for j, name in enumerate(feature_names)}
    X = np.zeros((len(encoded_logs), len(feature_names)), dtype=np.float32)

    for i, row in enumerate(encoded_logs):
        for name, value in row.items():
            j = column_of.get(name)
            if j is not None:
                X[i, j] = value

    return X, feature_names

### Daten unterteilen

In [None]:
logs_train = X_train_full.to_dict(orient='records')
logs_test = X_test_full.to_dict(orient='records')
logs_val = X_val_full.to_dict(orient='records')

# Fit all encoders on the training logs only.
encoded_logs, label_encoders, onehot_encoders = auto_encode_features(
    logs_train, one_hot_numeric=True, fit=True
)
X_train, feature_names = dicts_to_feature_matrix(encoded_logs)


def _matrix_in_train_layout(encoded):
    """Build a float32 matrix whose columns follow the training feature_names.

    Test/validation records can contain a different set of features than the
    training records (e.g. shorter lists). Deriving the columns from each data
    set on its own would shift or drop columns relative to what the scaler and
    the models were fitted on. Features absent in a record are 0; features
    unknown to the training data are ignored.
    """
    column_of = {name: j for j, name in enumerate(feature_names)}
    X = np.zeros((len(encoded), len(feature_names)), dtype=np.float32)
    for i, row in enumerate(encoded):
        for name, value in row.items():
            j = column_of.get(name)
            if j is not None:
                X[i, j] = value
    return X


# Encode test and validation logs with the already fitted encoders (fit=False).
encoded_test_logs, _, _ = auto_encode_features(
    logs_test, one_hot_numeric=True,
    label_encoders=label_encoders, onehot_encoders=onehot_encoders, fit=False
)
X_test = _matrix_in_train_layout(encoded_test_logs)

encoded_val_logs, _, _ = auto_encode_features(
    logs_val, one_hot_numeric=True,
    label_encoders=label_encoders, onehot_encoders=onehot_encoders, fit=False
)
X_val = _matrix_in_train_layout(encoded_val_logs)

### Skalieren

In [None]:
# Standardize the features. The scaler is fitted on the training matrix only
# and then applied unchanged to the test and validation matrices.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
X_val_scaled = scaler.transform(X_val)

### Test mit IF

In [None]:
# Fit an Isolation Forest with default parameters on the scaled training data.
# NOTE(review): no random_state is passed, so results may differ between runs.
iforest = IsolationForest()
iforest.fit(X_train_scaled)

# Continuous scores (kept for the ROC / PR curves) and hard predictions.
test_scores = iforest.decision_function(X_test_scaled)
test_preds = iforest.predict(X_test_scaled)
# predict() marks outliers with -1; map that to 1 = anomaly, 0 = normal.
y_test_pred = (test_preds == -1).astype(int)
logging.info("Test Classification Report:\n" + classification_report(y_test_full, y_test_pred))

### Test IF MCC

In [None]:
# Matthews correlation coefficient of the Isolation Forest test predictions.
mcc = matthews_corrcoef(y_test_full, y_test_pred)
logging.info("Matthews Correlation Coefficient: %f", mcc)

### Test IF Balanced Accuracy

In [None]:
# Balanced accuracy of the Isolation Forest test predictions.
balanced_acc = balanced_accuracy_score(y_test_full, y_test_pred)
logging.info("Balanced Accuracy: %f", balanced_acc)

### Test IF AUC-Kurven

In [None]:
# Invert the decision-function scores so that higher means "more anomalous".
anomaly_scores = -test_scores

# ROC curve and AUC
fpr, tpr, _ = roc_curve(y_test_full, anomaly_scores)
roc_auc = roc_auc_score(y_test_full, anomaly_scores)

# Precision-recall curve and average precision (PR AUC)
precision, recall, _ = precision_recall_curve(y_test_full, anomaly_scores)
pr_auc = average_precision_score(y_test_full, anomaly_scores)

# Both curves go side by side into ONE figure. The figure is shown only once
# at the end: calling plt.show() after the first panel would finish the figure
# and the second panel would land on a new, default-sized one.
plt.figure(figsize=(12, 5))

# Left panel: ROC curve
plt.subplot(1, 2, 1)
plt.plot(fpr, tpr, label=f"AUC = {roc_auc:.3f}")
plt.plot([0, 1], [0, 1], linestyle="--", color="gray")  # chance diagonal
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve")
plt.legend()

# Right panel: precision-recall curve
plt.subplot(1, 2, 2)
plt.plot(recall, precision, label=f"AP = {pr_auc:.3f}")
plt.xlabel("Recall")
plt.ylabel("Precision")
plt.title("Precision-Recall Curve")
plt.legend()

plt.tight_layout()
plt.show()

### Test DBSCAN 

In [None]:
# DBSCAN is clustered directly on the scaled test data (fit_predict);
# no model is fitted on the training data here.
dbscan = DBSCAN(eps = 0.05, min_samples = 40)
dbscan_labels_test = dbscan.fit_predict(X_test_scaled)
# Points labelled -1 (noise, i.e. not in any cluster) count as anomalies (1).
dbscan_anomaly_test = (dbscan_labels_test == -1).astype(int)
# zero_division=0: report 0 instead of warning when a class is never predicted.
logging.info("Test Classification Report:\n" + classification_report(y_test_full, dbscan_anomaly_test, zero_division=0))

### Test DBSCAN MCC

In [None]:
# Matthews correlation coefficient of the DBSCAN noise-as-anomaly predictions.
mcc = matthews_corrcoef(y_test_full, dbscan_anomaly_test)
logging.info("Matthews Correlation Coefficient: %f", mcc)

### Test DBSCAN Balanced Accuracy

In [None]:
# Balanced accuracy of the DBSCAN noise-as-anomaly predictions.
balanced_acc = balanced_accuracy_score(y_test_full, dbscan_anomaly_test)
logging.info("Balanced Accuracy: %f", balanced_acc)

### Test DBSCAN AUC-Kurven

In [None]:
# Dummy scores: anomaly = 1.0, normal = 0.0. DBSCAN yields no continuous
# score here, so the curves are built from these two values only (not ideal!).
anomaly_scores = (dbscan_labels_test == -1).astype(float)

fpr, tpr, _ = roc_curve(y_test_full, anomaly_scores)
roc_auc = roc_auc_score(y_test_full, anomaly_scores)

precision, recall, _ = precision_recall_curve(y_test_full, anomaly_scores)
pr_auc = average_precision_score(y_test_full, anomaly_scores)

# Both curves go side by side into ONE figure. The figure is shown only once
# at the end: calling plt.show() after the first panel would finish the figure
# and the second panel would land on a new, default-sized one.
plt.figure(figsize=(12, 5))

# Left panel: ROC curve
plt.subplot(1, 2, 1)
plt.plot(fpr, tpr, label=f"AUC = {roc_auc:.3f}")
plt.plot([0, 1], [0, 1], linestyle="--", color="gray")  # chance diagonal
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve")
plt.legend()

# Right panel: precision-recall curve
plt.subplot(1, 2, 2)
plt.plot(recall, precision, label=f"AP = {pr_auc:.3f}")
plt.xlabel("Recall")
plt.ylabel("Precision")
plt.title("Precision-Recall Curve")
plt.legend()

plt.tight_layout()
plt.show()

### Test OCSVM

In [None]:
# Fit a One-Class SVM on the scaled training data.
ocsvm = OneClassSVM(nu=0.005, gamma=50)
ocsvm.fit(X_train_scaled)

y_pred_test = ocsvm.predict(X_test_scaled)
# predict() marks outliers with -1; map that to 1 = anomaly, 0 = normal.
anomaly_test = (y_pred_test == -1).astype(int)
logging.info("Test Report:\n" + classification_report(y_test_full, anomaly_test))

### OCSVM MCC

In [None]:
# Matthews correlation coefficient of the One-Class SVM test predictions.
mcc = matthews_corrcoef(y_test_full, anomaly_test)
logging.info("Matthews Correlation Coefficient: %f", mcc)

### OCSVM Balanced Accuracy

In [None]:
# Balanced accuracy of the One-Class SVM test predictions.
balanced_acc = balanced_accuracy_score(y_test_full, anomaly_test)
logging.info("Balanced Accuracy: %f", balanced_acc)

### OCSVM AUC-Kurven

In [None]:
# Score: decision function, negated so that a higher score means "more anomalous".
scores = -ocsvm.decision_function(X_test_scaled)

# ROC
fpr, tpr, _ = roc_curve(y_test_full, scores)
roc_auc = roc_auc_score(y_test_full, scores)

# Precision-recall
precision, recall, _ = precision_recall_curve(y_test_full, scores)
pr_auc = average_precision_score(y_test_full, scores)

# Both curves go side by side into ONE figure. The figure is shown only once
# at the end: calling plt.show() after the first panel would finish the figure
# and the second panel would land on a new, default-sized one.
plt.figure(figsize=(12, 5))

# Left panel: ROC curve
plt.subplot(1, 2, 1)
plt.plot(fpr, tpr, label=f"AUC = {roc_auc:.3f}")
plt.plot([0, 1], [0, 1], linestyle="--", color="gray")  # chance diagonal
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve")
plt.legend()

# Right panel: precision-recall curve
plt.subplot(1, 2, 2)
plt.plot(recall, precision, label=f"AP = {pr_auc:.3f}")
plt.xlabel("Recall")
plt.ylabel("Precision")
plt.title("Precision-Recall Curve")
plt.legend()

plt.tight_layout()
plt.show()