# In [None]:
# anomaly_eval_explain.py
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix
from scipy.stats import zscore

# Input CSV of features; run() reads it through load_features().
FEATURES_CSV = "features_stress.csv"
# Output CSV that run() writes the scored and explained rows to.
OUT_RESULTS = "anomaly_results_stress.csv"
# Seed handed to IsolationForest as random_state in train_iforest().
RANDOM_STATE = 42
# Passed by run() to train_iforest() as the IsolationForest contamination.
CONTAMINATION = 0.02   # set to expected anomaly fraction used in generator
# Number of highest-contribution features kept per row in run()'s explanation.
TOP_K = 3

def load_features(path):
    """Read the feature CSV at *path* and split off the label column.

    The first CSV column is used as the row index. If an ``is_attack``
    column is present it is removed from the feature table and returned
    separately; otherwise the labels are ``None``.

    Returns:
        A ``(X, y_true)`` tuple: the feature DataFrame and either the
        ``is_attack`` Series or ``None``.
    """
    label_col = "is_attack"
    table = pd.read_csv(path, index_col=0)
    # Unlabelled input: the whole table is the feature matrix.
    if label_col not in table.columns:
        return table, None
    return table.drop(columns=[label_col]), table[label_col]

def train_iforest(X, contamination=0.02):
    """Standardize *X* and fit an IsolationForest on the scaled data.

    Args:
        X: Feature table to fit the scaler and the forest on.
        contamination: Value forwarded to ``IsolationForest(contamination=...)``.

    Returns:
        A ``(model, scaler, Xs)`` tuple: the fitted forest, the fitted
        StandardScaler, and the scaled training matrix.
    """
    standardizer = StandardScaler().fit(X)
    scaled = standardizer.transform(X)
    # fit() returns the estimator itself, so the chained call yields the model.
    forest = IsolationForest(
        n_estimators=200,
        contamination=contamination,
        random_state=RANDOM_STATE,
    ).fit(scaled)
    return forest, standardizer, scaled

def get_scores_preds(model, scaler, X):
    """Score *X* with a fitted model and turn its predictions into 0/1 flags.

    Args:
        model: Fitted estimator exposing ``decision_function`` and ``predict``.
        scaler: Fitted transformer exposing ``transform``.
        X: Feature table to score.

    Returns:
        A ``(scores, y_pred, Xs)`` tuple: the decision-function scores
        (higher => more normal), an array with 1 where the model predicted
        -1 (anomaly) and 0 elsewhere, and the scaled feature matrix.
    """
    scaled = scaler.transform(X)
    decision = model.decision_function(scaled)
    raw_labels = model.predict(scaled)  # -1 anomaly, 1 normal
    is_anomaly = raw_labels == -1
    flags = np.where(is_anomaly, 1, 0)
    return decision, flags, scaled

def ablation_contrib(model, scaler, X, Xs):
    """Estimate per-feature contributions by median ablation.

    For each feature column, every row's value is replaced by that column's
    median, the rows are re-scaled and re-scored, and the change in
    ``decision_function`` is recorded. A positive entry means that putting
    the feature at its median raised the score for that row (with
    IsolationForest, a higher decision_function value means more normal).

    Args:
        model: Fitted estimator exposing ``decision_function``.
        scaler: Fitted transformer exposing ``transform``.
        X: Unscaled feature DataFrame (numeric columns).
        Xs: Scaled version of *X*, used for the baseline scores.

    Returns:
        A ``(original_scores, contrib)`` tuple where ``contrib`` has shape
        ``(n_rows, n_features)`` and holds ``masked_score - original_score``.
    """
    original_scores = model.decision_function(Xs)
    medians = X.median(axis=0).to_numpy(dtype=float)
    # Convert once to a writable float copy. Using the frame's own dtype would
    # truncate a fractional median in an all-integer frame, and ``.values``
    # can be a read-only view under pandas copy-on-write.
    base = X.to_numpy(dtype=float, copy=True)
    n_rows, n_features = base.shape
    contrib = np.zeros((n_rows, n_features))
    for j in range(n_features):
        masked = base.copy()
        masked[:, j] = medians[j]
        masked_scores = model.decision_function(scaler.transform(masked))
        contrib[:, j] = masked_scores - original_scores
    return original_scores, contrib

def _strength_tag(zval):
    """Return "strong" for |z| >= 2, "moderate" for |z| >= 1, else ""."""
    magnitude = abs(zval)
    if magnitude >= 2.0:
        return "strong"
    if magnitude >= 1.0:
        return "moderate"
    return ""


def run():
    """Train, score, explain and (when labels exist) evaluate the detector.

    Steps:
      1. Load features from ``FEATURES_CSV`` and fit the scaler and forest.
      2. Score every row and compute median-ablation contributions.
      3. For each row, keep the ``TOP_K`` features with the largest
         contribution and annotate them with their z-score.
      4. Write the combined table to ``OUT_RESULTS``.
      5. If the input had an ``is_attack`` column, print a classification
         report and confusion matrix.
      6. Print the ten rows with the lowest anomaly score.

    Inputs without an ``is_attack`` column are supported: the
    ``true_attack`` column is left out of the output and of the printed
    summary.
    """
    X, y_true = load_features(FEATURES_CSV)
    feature_names = list(X.columns)
    print("Features:", feature_names)

    model, scaler, _ = train_iforest(X, contamination=CONTAMINATION)
    _, y_pred, Xs = get_scores_preds(model, scaler, X)
    orig_scores, contribs = ablation_contrib(model, scaler, X, Xs)

    has_labels = y_true is not None
    # Positional label array: avoids per-row .loc lookups, which are slow and
    # return a Series instead of a scalar when the index is not unique.
    true_labels = y_true.astype(int).to_numpy() if has_labels else None

    # build results frame
    df = X.copy()
    df["anomaly_score"] = orig_scores
    df["predicted_attack"] = y_pred
    if has_labels:
        df["true_attack"] = true_labels

    # compute z-scores per feature (for explanation); row/column order matches X
    z_values = np.asarray(zscore(X.fillna(0), nan_policy="omit"))

    # top contributing features for each user
    rows = []
    for i, user in enumerate(X.index):
        row = {
            "user": user,
            "anomaly_score": float(orig_scores[i]),
            "predicted_attack": int(y_pred[i]),
        }
        if has_labels:
            row["true_attack"] = int(true_labels[i])
        # (feature, contribution, z) triples, largest contribution first
        feats = [
            (fname, float(contribs[i, j]), float(z_values[i, j]))
            for j, fname in enumerate(feature_names)
        ]
        top_feats = sorted(feats, key=lambda t: t[1], reverse=True)[:TOP_K]
        row["top_features"] = "; ".join(
            f"{name}({c:.4f}, z={zval:.2f})" for name, c, zval in top_feats
        )
        # textual short explanation
        row["explanation"] = " | ".join(
            f"{name}: contrib={c:.4f}, z={zval:.2f} {_strength_tag(zval)}".strip()
            for name, c, zval in top_feats
        )
        rows.append(row)

    res = pd.DataFrame(rows).set_index("user")
    # Only request true_attack when labels exist; selecting a missing column
    # would raise KeyError on unlabelled input.
    label_cols = ["true_attack"] if has_labels else []
    summary_cols = (
        ["anomaly_score", "predicted_attack"]
        + label_cols
        + ["top_features", "explanation"]
    )
    # join with df; columns present on both sides get the "_expl" suffix
    out = df.join(res[summary_cols], how="left", rsuffix="_expl")
    out.to_csv(OUT_RESULTS)
    print("Wrote results to", OUT_RESULTS)

    # evaluation
    if "true_attack" in out.columns:
        print("\nClassification report (predicted vs true):")
        print(classification_report(out["true_attack"], out["predicted_attack"], zero_division=0))
        print("\nConfusion matrix (rows=true, cols=pred):")
        print(confusion_matrix(out["true_attack"], out["predicted_attack"]))
    # show most anomalous rows (lowest anomaly_score)
    print("\nTop 10 most anomalous (lowest anomaly_score):")
    most_anomalous = out.sort_values("anomaly_score").head(10)
    pd.set_option("display.max_colwidth", 200)
    print(most_anomalous[summary_cols])

# Run the full pipeline only when executed as a script, not on import.
if __name__ == "__main__":
    run()
