<a href="https://colab.research.google.com/github/ZeusGav/aml_xai_ba_test/blob/main/aml_xai_ba_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# AML XAI Pipeline (Colab)

Minimal, end-to-end pipeline to test SHAP-guided perturbations on RandomForest and LightGBM.

#GitHub connection

Purpose: clone & authenticate to your GitHub repo using a secret token stored in Colab.

In [25]:
from google.colab import userdata
import os, shutil, subprocess

# --- EDIT THESE ---
REPO_URL = "https://github.com/ZeusGav/aml_xai_ba_test.git"
GITHUB_EMAIL = "gavras.alexios@gmail.com"
GITHUB_USERNAME = "ZeusGav"
# ------------------

REPO_NAME = REPO_URL.split('/')[-1].replace(".git", "")
REPO_DIR = f"/content/{REPO_NAME}"

# Pull your personal access token securely from Colab Secrets
GITHUB_TOKEN = userdata.get("GITHUB_TOKEN")
assert GITHUB_TOKEN, "❌ Missing GITHUB_TOKEN secret. Add it in Colab (Runtime → Secrets → Add secret)."

# Configure Git identity
_ = subprocess.run(["git", "config", "--global", "user.email", GITHUB_EMAIL], check=False)
_ = subprocess.run(["git", "config", "--global", "user.name", GITHUB_USERNAME], check=False)

# Clone if needed; otherwise pull
if not os.path.exists(REPO_DIR):
    print(f"Cloning into {REPO_DIR} ...")
    # Use token in the URL for authenticated clone
    authed_url = REPO_URL.replace("https://", f"https://{GITHUB_USERNAME}:{GITHUB_TOKEN}@")
    _ = subprocess.run(["git", "clone", authed_url, REPO_DIR], check=True)
else:
    print(f"Repo exists at {REPO_DIR}. Pulling latest changes...")
    _ = subprocess.run(["git", "-C", REPO_DIR, "pull"], check=False)

# Change working directory to repo
os.chdir(REPO_DIR)
print("✅ GitHub repo ready and authenticated at:", REPO_DIR)

# Ensure outputs dir exists and is gitignored
os.makedirs("notebooks_outputs", exist_ok=True)
gitignore_path = os.path.join(REPO_DIR, ".gitignore")
try:
    if os.path.exists(gitignore_path):
        with open(gitignore_path, "r", encoding="utf-8") as f:
            gi = f.read()
    else:
        gi = ""
    needed = ["notebooks_outputs/", "data/", "*.parquet", "*.csv", "*.pkl", "*.pickle"]
    updated = False
    for pat in needed:
        if pat not in gi:
            gi += ("" if gi.endswith("\n") else "\n") + pat + "\n"
            updated = True
    if updated:
        with open(gitignore_path, "w", encoding="utf-8") as f:
            f.write(gi)
        print("Updated .gitignore with output/data patterns.")
    else:
        print(".gitignore already contains output/data patterns.")
except Exception as e:
    print("Note: could not update .gitignore automatically:", e)


Repo exists at /content/aml_xai_ba_test. Pulling latest changes...
✅ GitHub repo ready and authenticated at: /content/aml_xai_ba_test
.gitignore already contains output/data patterns.


#Setup & installs

In [5]:
!pip -q install lightgbm shap scikit-learn numpy pandas matplotlib

import os, sys, random, json, time, math
import numpy as np
import pandas as pd
from pathlib import Path

# Reproducibility
SEED = 42
np.random.seed(SEED)
random.seed(SEED)

# Paths (inside the repo directory)
OUTPUT_DIR = Path("notebooks_outputs")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

print("Setup complete. OUTPUT_DIR =", OUTPUT_DIR.resolve())

Setup complete. OUTPUT_DIR = /content/aml_xai_ba_test/notebooks_outputs


# Imports (ML/SHAP) & helpers

In [7]:
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, confusion_matrix
from sklearn.ensemble import RandomForestClassifier

import lightgbm as lgb
import shap
import warnings
warnings.filterwarnings("ignore")

def set_all_seeds(seed: int = 42):
    np.random.seed(seed)
    random.seed(seed)

def metrics_full(y_true, y_score, threshold=0.5):
    """Return AUC, recall for positive class (fraud=1), and FPR at a fixed threshold."""
    auc = roc_auc_score(y_true, y_score)
    y_pred = (y_score >= threshold).astype(int)
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
    recall_pos = tp / (tp + fn + 1e-12)
    fpr = fp / (fp + tn + 1e-12)
    return {"AUC": auc, "Recall_fraud": recall_pos, "FPR": fpr}

def bootstrap_delta_auc_pvalue(y_true, scores_a, scores_b, n_boot=1000, seed=42):
    """Bootstrap test for ΔAUC significance (A vs B)."""
    rng = np.random.default_rng(seed)
    n = len(y_true)
    auc_diffs = []
    for _ in range(n_boot):
        idx = rng.integers(0, n, size=n)
        auc_a = roc_auc_score(y_true[idx], scores_a[idx])
        auc_b = roc_auc_score(y_true[idx], scores_b[idx])
        auc_diffs.append(auc_b - auc_a)
    auc_diffs = np.array(auc_diffs)
    p = 2 * min((auc_diffs <= 0).mean(), (auc_diffs >= 0).mean())
    return float(p)

# Synthetic dataset with tags

In [9]:
def make_synthetic_aml(n_samples=6000, n_features=12, frac_fraud=0.12, seed=42):
    """Create a toy AML-like dataset with 'behavioral' and 'identity' tags."""
    set_all_seeds(seed)
    X, y = make_classification(
        n_samples=n_samples,
        n_features=n_features,
        n_informative=6,
        n_redundant=2,
        n_repeated=0,
        n_classes=2,
        weights=[1-frac_fraud, frac_fraud],
        flip_y=0.01,
        class_sep=1.2,
        random_state=seed,
    )
    df = pd.DataFrame(X, columns=[f"f{i}" for i in range(n_features)])
    df["label"] = y

    identity = ["f0", "f1", "f2"]  # identity-like (not perturbed)
    behavioral = [c for c in df.columns if c.startswith("f") and c not in identity]

    # Make first two behavioral features integer-like (counts)
    for c in behavioral[:2]:
        df[c] = np.clip((df[c] * 3 + 5), 0, None)
        df[c] = np.round(df[c]).astype(int)

    tags = {"behavioral": behavioral, "identity": identity}
    return df, tags

df, TAGS = make_synthetic_aml()
X = df.drop(columns=["label"])
y = df["label"].values

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, stratify=y, random_state=42
)

print("Synthetic dataset ready.")
print("Behavioral features:", TAGS["behavioral"][:5], "...")
print("Identity features:", TAGS["identity"])
print("Fraud rate (test):", y_test.mean())

Synthetic dataset ready.
Behavioral features: ['f3', 'f4', 'f5', 'f6', 'f7'] ...
Identity features: ['f0', 'f1', 'f2']
Fraud rate (test): 0.12277777777777778


#Train RandomForest and compute SHAP

In [14]:
def train_rf(X_tr, y_tr, seed=42):
    clf = RandomForestClassifier(
        n_estimators=300,
        max_depth=None,
        min_samples_split=2,
        min_samples_leaf=1,
        random_state=seed,
        n_jobs=-1
    )
    clf.fit(X_tr, y_tr)
    return clf

def shap_values_tree_model(model, X_background, X_eval):
    """TreeExplainer for tree models (RF/LightGBM).
    Works with ndarray or Explanation; returns (exp, mean|SHAP| for positive class)."""
    import shap, numpy as np
    explainer = shap.TreeExplainer(model, feature_names=list(X_eval.columns))
    out = explainer(X_eval)                           # Explanation or list[Explanation]
    exp = out[1] if isinstance(out, list) else out    # if list -> class 1; else Explanation
    vals = exp.values if hasattr(exp, "values") else exp
    vals = np.asarray(vals, dtype=float)

    # If SHAP returned per-class at last axis, select positive class (index 1)
    if vals.ndim == 3 and vals.shape[-1] == 2:
        vals = vals[:, :, 1]                          # (n_samples, n_features)
    elif vals.ndim != 2:
        # Fallback: collapse any extra axes except features
        vals = np.reshape(vals, (vals.shape[0], -1))  # keep samples, flatten rest

    mean_abs = np.mean(np.abs(vals), axis=0).astype(float)  # (n_features,)
    return exp, mean_abs


rf_shap, rf_meanabs = shap_values_tree_model(rf, X_train, X_test)
imp = pd.Series(rf_meanabs, index=X_test.columns).sort_values(ascending=False).head(5)
print("RF mean|SHAP| (top5):")
for name, val in imp.items():
    print(f"  {name}: {val:.4f}")

RF mean|SHAP| (top5):
  f3: 0.0956
  f7: 0.0410
  f0: 0.0365
  f4: 0.0315
  f10: 0.0157


# Added: Perturbation utilities

In [18]:
# Targeted fraud-only 1-feature perturbation and bounds
def empirical_bounds(X, q_low=0.01, q_high=0.99):
    low = X.quantile(q_low)
    high = X.quantile(q_high)
    return low, high

def perturb_1feature_targeted(
    X_df, y_true, feature, eps=0.25, mask_positive_only=True, integer_safe=False, seed=42
):
    import numpy as np
    import pandas as pd
    np.random.seed(seed)
    X = X_df.copy()
    mask = (y_true == 1) if mask_positive_only else np.ones_like(y_true, dtype=bool)
    std = X[feature].std(ddof=0)
    delta = eps * std
    X.loc[mask, feature] = X.loc[mask, feature] + delta

    lo, hi = empirical_bounds(X)
    X[feature] = np.clip(X[feature], lo[feature], hi[feature])

    if integer_safe:
        X[feature] = np.round(X[feature]).astype(int)

    diff = (X[feature] - X_df[feature]).values
    l2 = float(np.sqrt((diff**2).sum()))
    linf = float(np.abs(diff).max())
    l2_attacked = float(np.sqrt((diff[mask]**2).sum()))
    linf_attacked = float(np.abs(diff[mask]).max())

    mags = {
        "L2": l2, "Linf": linf,
        "L2_attacked": l2_attacked, "Linf_attacked": linf_attacked,
        "n_attacked": int(mask.sum())
    }
    return X, mags


#Added LightGBM + attack

In [19]:
# LightGBM train, SHAP, and targeted attack
import lightgbm as lgb

def train_lgbm(X_tr, y_tr, seed=42):
    params = dict(
        objective="binary",
        metric="auc",
        learning_rate=0.05,
        num_leaves=31,
        feature_fraction=0.9,
        bagging_fraction=0.9,
        bagging_freq=1,
        min_data_in_leaf=20,
        verbosity=-1,
        seed=seed,
        n_jobs=-1
    )
    dtrain = lgb.Dataset(X_tr, label=y_tr, feature_name=list(X_tr.columns))
    model = lgb.train(params, dtrain, num_boost_round=300)
    return model

# Train LGBM
lgbm = train_lgbm(X_train, y_train, seed=42)
lgbm_scores = lgbm.predict(X_test)
lgbm_metrics_pre = metrics_full(y_test, lgbm_scores)
lgbm_shap, lgbm_meanabs = shap_values_tree_model(lgbm, X_train, X_test)

# Pick top behavioral feature (fallback to global top)
top_feats_lgbm = [X_test.columns[i] for i in np.argsort(-lgbm_meanabs)]
top_behavioral_lgbm = [f for f in top_feats_lgbm if f in TAGS.get("behavioral", list(X_test.columns))]
attack_feature_lgbm = top_behavioral_lgbm[0] if top_behavioral_lgbm else X_test.columns[np.argmax(lgbm_meanabs)]
integer_safe_lgbm = pd.api.types.is_integer_dtype(X_test[attack_feature_lgbm].dtype)

# Apply targeted perturbation
X_test_pert_lgbm, mags_lgbm = perturb_1feature_targeted(
    X_test, y_test, feature=attack_feature_lgbm, eps=0.25,
    mask_positive_only=True, integer_safe=integer_safe_lgbm, seed=42
)

# Evaluate deltas
lgbm_scores_post = lgbm.predict(X_test_pert_lgbm)
lgbm_metrics_post = metrics_full(y_test, lgbm_scores_post)

# Bootstrap p-value
def bootstrap_delta_auc_pvalue(y_true, scores_a, scores_b, n_boot=1000, seed=42):
    rng = np.random.default_rng(seed)
    n = len(y_true)
    auc_diffs = []
    for _ in range(n_boot):
        idx = rng.integers(0, n, size=n)
        auc_a = roc_auc_score(y_true[idx], scores_a[idx])
        auc_b = roc_auc_score(y_true[idx], scores_b[idx])
        auc_diffs.append(auc_b - auc_a)
    auc_diffs = np.array(auc_diffs)
    p = 2 * min((auc_diffs <= 0).mean(), (auc_diffs >= 0).mean())
    return float(p)

pval_lgbm = bootstrap_delta_auc_pvalue(y_test, lgbm_scores, lgbm_scores_post, n_boot=1000, seed=42)

print(f"LGBM attack on feature '{attack_feature_lgbm}' (integer_safe={integer_safe_lgbm})")
print("Pre:", lgbm_metrics_pre)
print("Post:", lgbm_metrics_post)
print(f"ΔAUC: {lgbm_metrics_post['AUC'] - lgbm_metrics_pre['AUC']:.4f} (p≈{pval_lgbm:.3f})")

LGBM attack on feature 'f3' (integer_safe=True)
Pre: {'AUC': np.float64(0.9807312606925169), 'Recall_fraud': np.float64(0.8416289592760143), 'FPR': np.float64(0.006966434452184923)}
Post: {'AUC': np.float64(0.9841356720990145), 'Recall_fraud': np.float64(0.8733031674208106), 'FPR': np.float64(0.006966434452184923)}
ΔAUC: 0.0034 (p≈0.000)


# LightGBM train, SHAP, and targeted attack

In [20]:
import lightgbm as lgb

def train_lgbm(X_tr, y_tr, seed=42):
    params = dict(
        objective="binary",
        metric="auc",
        learning_rate=0.05,
        num_leaves=31,
        feature_fraction=0.9,
        bagging_fraction=0.9,
        bagging_freq=1,
        min_data_in_leaf=20,
        verbosity=-1,
        seed=seed,
        n_jobs=-1
    )
    dtrain = lgb.Dataset(X_tr, label=y_tr, feature_name=list(X_tr.columns))
    model = lgb.train(params, dtrain, num_boost_round=300)
    return model

# Train LGBM
lgbm = train_lgbm(X_train, y_train, seed=42)
lgbm_scores = lgbm.predict(X_test)
lgbm_metrics_pre = metrics_full(y_test, lgbm_scores)
lgbm_shap, lgbm_meanabs = shap_values_tree_model(lgbm, X_train, X_test)

# Pick top behavioral feature (fallback to global top)
top_feats_lgbm = [X_test.columns[i] for i in np.argsort(-lgbm_meanabs)]
top_behavioral_lgbm = [f for f in top_feats_lgbm if f in TAGS.get("behavioral", list(X_test.columns))]
attack_feature_lgbm = top_behavioral_lgbm[0] if top_behavioral_lgbm else X_test.columns[np.argmax(lgbm_meanabs)]
integer_safe_lgbm = pd.api.types.is_integer_dtype(X_test[attack_feature_lgbm].dtype)

# Apply targeted perturbation
X_test_pert_lgbm, mags_lgbm = perturb_1feature_targeted(
    X_test, y_test, feature=attack_feature_lgbm, eps=0.25,
    mask_positive_only=True, integer_safe=integer_safe_lgbm, seed=42
)

# Evaluate deltas
lgbm_scores_post = lgbm.predict(X_test_pert_lgbm)
lgbm_metrics_post = metrics_full(y_test, lgbm_scores_post)

# Bootstrap p-value
def bootstrap_delta_auc_pvalue(y_true, scores_a, scores_b, n_boot=1000, seed=42):
    rng = np.random.default_rng(seed)
    n = len(y_true)
    auc_diffs = []
    for _ in range(n_boot):
        idx = rng.integers(0, n, size=n)
        auc_a = roc_auc_score(y_true[idx], scores_a[idx])
        auc_b = roc_auc_score(y_true[idx], scores_b[idx])
        auc_diffs.append(auc_b - auc_a)
    auc_diffs = np.array(auc_diffs)
    p = 2 * min((auc_diffs <= 0).mean(), (auc_diffs >= 0).mean())
    return float(p)

pval_lgbm = bootstrap_delta_auc_pvalue(y_test, lgbm_scores, lgbm_scores_post, n_boot=1000, seed=42)

print(f"LGBM attack on feature '{attack_feature_lgbm}' (integer_safe={integer_safe_lgbm})")
print("Pre:", lgbm_metrics_pre)
print("Post:", lgbm_metrics_post)
print(f"ΔAUC: {lgbm_metrics_post['AUC'] - lgbm_metrics_pre['AUC']:.4f} (p≈{pval_lgbm:.3f})")

LGBM attack on feature 'f3' (integer_safe=True)
Pre: {'AUC': np.float64(0.9807312606925169), 'Recall_fraud': np.float64(0.8416289592760143), 'FPR': np.float64(0.006966434452184923)}
Post: {'AUC': np.float64(0.9841356720990145), 'Recall_fraud': np.float64(0.8733031674208106), 'FPR': np.float64(0.006966434452184923)}
ΔAUC: 0.0034 (p≈0.000)


# Added: Results logging

In [21]:
# Save quick results row to notebooks_outputs/
from pathlib import Path
OUTPUT_DIR = Path("notebooks_outputs")
OUTPUT_DIR.mkdir(exist_ok=True, parents=True)

delta_auc_rf = None
try:
    delta_auc_rf = rf_metrics_post["AUC"] - rf_metrics_pre["AUC"]
except Exception:
    pass

row = {
    "timestamp": pd.Timestamp.utcnow().isoformat(),
    "dataset": "synthetic",
    "seed": 42,
    "attack_mask": "fraud-only",
    "eps": 0.25,
    "feature_rf": globals().get("attack_feature", None),
    "feature_lgbm": globals().get("attack_feature_lgbm", None),
    "AUC_pre_RF": globals().get("rf_metrics_pre", {}).get("AUC", None),
    "AUC_post_RF": globals().get("rf_metrics_post", {}).get("AUC", None),
    "Delta_AUC_RF": delta_auc_rf,
    "AUC_pre_LGBM": lgbm_metrics_pre["AUC"],
    "AUC_post_LGBM": lgbm_metrics_post["AUC"],
    "Delta_AUC_LGBM": lgbm_metrics_post["AUC"] - lgbm_metrics_pre["AUC"],
}

df_out = pd.DataFrame([row])
csv_path = OUTPUT_DIR / f"synthetic_minimal_results_seed{42}.csv"
try:
    if csv_path.exists():
        df_prev = pd.read_csv(csv_path)
        df_all = pd.concat([df_prev, df_out], ignore_index=True)
        df_all.to_csv(csv_path, index=False)
    else:
        df_out.to_csv(csv_path, index=False)
    print("Saved:", csv_path.resolve())
except Exception as e:
    print("Could not save results:", e)

df_out

Saved: /content/aml_xai_ba_test/notebooks_outputs/synthetic_minimal_results_seed42.csv


Unnamed: 0,timestamp,dataset,seed,attack_mask,eps,feature_rf,feature_lgbm,AUC_pre_RF,AUC_post_RF,Delta_AUC_RF,AUC_pre_LGBM,AUC_post_LGBM,Delta_AUC_LGBM
0,2025-11-08T16:35:36.534933+00:00,synthetic,42,fraud-only,0.25,,f3,0.980347,,,0.980731,0.984136,0.003404


# Git commit & push helper

In [29]:
# Git commit & push helper (run when you want to save to GitHub)
import subprocess

def git_status():
    print(subprocess.run(["git", "status", "--porcelain"], capture_output=True, text=True).stdout)

def git_add_commit_push(message="update: synthetic RF+LGBM run"):
    print("Adding files...")
    subprocess.run(["git", "add", "-A"], check=False)
    print("Committing...")
    subprocess.run(["git", "commit", "-m", message], check=False)
    print("Pushing...")
    subprocess.run(["git", "push", "origin", "main"], check=False)
    print("✅ Done.")

print("Run git_status(); then git_add_commit_push(\"your message\").")


Run git_status(); then git_add_commit_push("your message").


In [33]:
git_status()

 M .gitignore



In [34]:
git_add_commit_push("update: synthetic RF+LGBM run")

Adding files...
Committing...
Pushing...
✅ Done.


# Added: IBM hi_small scaffold

In [None]:
# IBM hi_small scaffold (fill once dataset is mounted)
# Steps:
# 1) Place the dataset at data/hi_small.csv (or adjust path).
# 2) Identify label column and identity columns.
# 3) Behavioral = all other non-label, non-identity columns.
# 4) Repeat RF/LGBM + SHAP + targeted perturbation and append results to notebooks_outputs/.

# Example:
# df_ibm = pd.read_csv('data/hi_small.csv')
# label_col = 'label'   # <-- adjust
# identity_cols = ['customer_id','account_id']  # <-- adjust
# behavioral_cols = [c for c in df_ibm.columns if c not in identity_cols + [label_col]]
# TAGS_IBM = {'behavioral': behavioral_cols, 'identity': identity_cols}
#
# X_ibm = df_ibm.drop(columns=[label_col])
# y_ibm = df_ibm[label_col].values
#
# from sklearn.model_selection import train_test_split
# X_tr, X_te, y_tr, y_te = train_test_split(X_ibm, y_ibm, test_size=0.3, stratify=y_ibm, random_state=42)
# rf_ibm = train_rf(X_tr, y_tr, seed=42)
# rf_scores_pre = rf_ibm.predict_proba(X_te)[:,1]
# rf_shap_ibm, rf_meanabs_ibm = shap_values_tree_model(rf_ibm, X_tr, X_te)
# top_beh_ibm = [X_te.columns[i] for i in np.argsort(-rf_meanabs_ibm) if X_te.columns[i] in TAGS_IBM['behavioral']][0]
# X_te_pert, mags_ibm = perturb_1feature_targeted(X_te, y_te, feature=top_beh_ibm, eps=0.25, mask_positive_only=True,
#     integer_safe=pd.api.types.is_integer_dtype(X_te[top_beh_ibm].dtype), seed=42)
# rf_scores_post = rf_ibm.predict_proba(X_te_pert)[:,1]
# # ... mirror the synthetic logging here
print("IBM scaffold appended.")

In [43]:
!find /content -name "*.ipynb"