In [52]:
# Cell 1
import sys, subprocess, importlib

def _ensure(pkg):
    try: importlib.import_module(pkg)
    except: subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", pkg])

# Make sure every third-party package used by the notebook can be imported.
for p in (
    "pandas",
    "numpy",
    "scikit-learn",
    "ipywidgets",
    "matplotlib",
    "jinja2",
):
    _ensure(p)


In [53]:
# Cell 2 (PATCHED)
%%writefile popper_agents.py
from __future__ import annotations
from dataclasses import dataclass, asdict
from typing import Any, Dict, List, Optional
import numpy as np, pandas as pd, os, time, json
from collections import Counter

from sklearn.model_selection import StratifiedKFold, KFold, cross_val_predict
from sklearn.metrics import (accuracy_score, roc_auc_score, f1_score, precision_score, recall_score,
                             mean_squared_error, r2_score)
from sklearn.inspection import permutation_importance
from sklearn.calibration import calibration_curve

# ---------- common utils ----------
def _as_series(y):
    return y if isinstance(y, pd.Series) else pd.Series(y)

def _task_type(y: pd.Series) -> str:
    """Guess the learning task from the label values.

    Returns:
        "binary" for numeric labels with exactly the two values 0 and 1,
        "multiclass" for non-numeric labels or numeric labels with at most 50
        distinct integer-valued entries, and "regression" otherwise.
    """
    labels = _as_series(y)
    if not pd.api.types.is_numeric_dtype(labels):
        return "multiclass"

    distinct = pd.unique(labels.dropna())
    if len(distinct) == 2 and {float(v) for v in distinct} <= {0.0, 1.0}:
        return "binary"

    looks_discrete = len(distinct) <= 50 and all(float(v).is_integer() for v in distinct)
    return "multiclass" if looks_discrete else "regression"

def _min_class_count(y):
    """Return the size of the rarest class among non-missing labels (0 if none)."""
    labels = _as_series(y).dropna()
    _, per_class = np.unique(labels, return_counts=True)
    if len(per_class) == 0:
        return 0
    return int(per_class.min())

def _cv_for_task(y, cv_splits=5, random_state=42):
    """Pick a shuffled cross-validation splitter suited to the label.

    Regression labels get a plain ``KFold`` with ``cv_splits`` folds.
    Classification labels get a ``StratifiedKFold`` whose fold count is capped
    by the rarest class size (never below 2); when some class has fewer than
    two members a 2-fold ``KFold`` is used instead.
    """
    if _task_type(y) not in ("binary", "multiclass"):
        return KFold(n_splits=cv_splits, shuffle=True, random_state=random_state)

    rarest = _min_class_count(y)
    if rarest < 2:
        # Stratification is impossible with a singleton class.
        return KFold(n_splits=2, shuffle=True, random_state=random_state)

    folds = max(2, min(cv_splits, rarest))
    return StratifiedKFold(n_splits=folds, shuffle=True, random_state=random_state)

def _finding(sev, msg, meta=None):
    return {"severity": sev, "message": msg, "meta": meta or {}}

@dataclass
class AgentResult:
    """Outcome reported by a single validation agent."""

    # Agent identifier, e.g. "performance"; used as the weight key in run_all_agents.
    agent: str
    # Score assigned by the agent (the agents in this module use a 0-100 scale).
    score: float
    # Finding records as produced by _finding().
    findings: List[Dict[str, Any]]
    # Agent-specific measurements.
    metrics: Dict[str, Any]

    def to_dict(self):
        """Return the result as a plain dictionary via ``dataclasses.asdict``."""
        return asdict(self)

# ---------- data agents ----------
def data_integrity_agent(df: pd.DataFrame, label_col: Optional[str]=None) -> AgentResult:
    """Run basic hygiene checks on the raw dataframe.

    Checks performed: per-column missingness (warning when the worst column
    exceeds 30%), fully duplicated rows, constant columns, ID-like column names
    and, when ``label_col`` is a column of ``df``, the label's cardinality.

    Args:
        df: Raw dataframe to inspect.
        label_col: Optional name of the label column.

    Returns:
        AgentResult named "data_integrity". The score starts at 100 and loses
        40 if any "error" finding exists and 20 if any "warning" finding exists.
    """
    findings, metrics = [], {}

    missing_rate = df.isna().mean().sort_values(ascending=False)
    metrics["missing_rate_top"] = {str(k): float(v) for k, v in missing_rate.head(10).items()}
    if missing_rate.max() > 0.3:
        col = missing_rate.idxmax()
        findings.append(_finding("warning", f"High missingness in '{col}' ({missing_rate.max():.1%})."))

    dups = int(df.duplicated().sum())
    if dups:
        findings.append(_finding("warning", f"Found {dups} duplicate rows."))
        metrics["duplicate_rows"] = dups

    const_cols = [c for c in df.columns if df[c].nunique(dropna=False) <= 1]
    if const_cols:
        findings.append(_finding("info", f"{len(const_cols)} constant columns detected.", {"columns": const_cols}))
        metrics["constant_columns"] = const_cols

    # Column labels are not guaranteed to be strings (e.g. integer labels from a
    # header-less file), so normalise with str() before lower-casing.
    id_tokens = ("id", "uuid", "guid", "hash")
    id_cols = [c for c in df.columns if any(t in str(c).lower() for t in id_tokens)]
    if id_cols:
        findings.append(_finding("info", "ID-like columns found (consider excluding from features).", {"columns": id_cols}))

    if label_col and label_col in df.columns:
        y = df[label_col]
        nun = int(pd.Series(y).nunique(dropna=True))
        metrics["label_unique_values"] = nun
        if nun == 1:
            findings.append(_finding("error", f"Label '{label_col}' has only one unique value."))
        if nun > 200 and not pd.api.types.is_numeric_dtype(y):
            findings.append(_finding("warning", f"Label '{label_col}' seems high-cardinality ({nun})."))

    score = 100.0
    if any(f["severity"] == "error" for f in findings):
        score -= 40
    if any(f["severity"] == "warning" for f in findings):
        score -= 20
    return AgentResult("data_integrity", score, findings, metrics)

def sampling_agent(df: pd.DataFrame, label_col: Optional[str]=None, train_frac=0.75, random_state=7) -> AgentResult:
    """Check that a random train/test split keeps the label's class ratios.

    Rows whose label is missing are excluded. Non-numeric labels are replaced
    by their categorical codes. Numeric labels that ``_task_type`` classifies
    as "regression" are skipped, because class ratios are meaningless for
    continuous targets and truncating them with ``int()`` would merge distinct
    values into bogus classes.

    Args:
        df: Raw dataframe including the label column.
        label_col: Name of the label column; when absent only an info finding
            is produced.
        train_frac: Fraction of rows kept for training (the rest is the test part).
        random_state: Seed passed to ``train_test_split``.

    Returns:
        AgentResult named "sampling"; the score is 100 minus 15 when a warning
        finding exists. A warning is raised when any class ratio differs by
        more than 0.10 between the full data and the test part.
    """
    findings, metrics = [], {}
    if not (label_col and label_col in df.columns):
        findings.append(_finding("info", "No label provided; sampling checks limited."))
        return AgentResult("sampling", 100.0, findings, metrics)

    from sklearn.model_selection import train_test_split

    # Missing labels cannot be assigned to a class (int(nan) raises), so drop them.
    labelled = df[df[label_col].notna()]
    y = labelled[label_col]
    if len(labelled) < 2:
        findings.append(_finding("info", "Too few labelled rows for sampling checks."))
        return AgentResult("sampling", 100.0, findings, metrics)

    if pd.api.types.is_numeric_dtype(y):
        if _task_type(y) == "regression":
            findings.append(_finding("info", "Label looks continuous; class-ratio sampling checks skipped."))
            return AgentResult("sampling", 100.0, findings, {"skipped": True})
    else:
        y = pd.Categorical(y).codes

    features = labelled.drop(columns=[label_col])
    strat = y if _min_class_count(y) >= 2 else None
    split_kwargs = dict(test_size=1 - train_frac, random_state=random_state)
    try:
        _, _, ytr, yte = train_test_split(features, y, stratify=strat, **split_kwargs)
    except ValueError:
        if strat is None:
            raise
        # Stratification can be infeasible on tiny data (e.g. fewer test rows
        # than classes); fall back to a plain random split.
        _, _, ytr, yte = train_test_split(features, y, stratify=None, **split_kwargs)

    base, trc, tec = Counter(y), Counter(ytr), Counter(yte)

    def _ratio(cnter):
        tot = sum(int(v) for v in cnter.values()) or 1
        return {int(k): (int(v) / tot) for k, v in cnter.items()}

    r_all, r_te = _ratio(base), _ratio(tec)
    drifts = {k: abs(r_all.get(k, 0.0) - r_te.get(k, 0.0)) for k in set(r_all) | set(r_te)}
    mx = max(drifts.values()) if drifts else 0.0

    # Cast keys and values to builtin types so the metrics stay JSON-safe.
    metrics.update({
        "class_counts_all": {int(k): int(v) for k, v in base.items()},
        "class_counts_train": {int(k): int(v) for k, v in trc.items()},
        "class_counts_test": {int(k): int(v) for k, v in tec.items()},
        "test_class_ratio_drift": float(mx)
    })
    if mx > 0.10:
        findings.append(_finding("warning", f"Class ratio drift > 0.10 between full data and test ({mx:.2f})."))

    score = 100.0 - (15.0 if any(f["severity"] == "warning" for f in findings) else 0.0)
    return AgentResult("sampling", score, findings, metrics)

def consistency_agent(df: pd.DataFrame) -> AgentResult:
    """Flag negative values in numeric columns whose name suggests a non-negative quantity.

    A numeric column is inspected when its lower-cased name contains one of
    the tokens "amount", "income", "balance", "score", "term", "tenure" or
    "credit".

    Returns:
        AgentResult named "consistency"; the score is 100, or 90 when at least
        one such column contains negatives. ``metrics["negatives_suspect"]``
        maps each offending column to its share of negative rows.
    """
    tokens = ("amount", "income", "balance", "score", "term", "tenure", "credit")
    negatives = {}
    for c in df.select_dtypes(include=[np.number]).columns:
        # str() guards against non-string column labels (e.g. integer labels).
        name = str(c).lower()
        if not any(tok in name for tok in tokens):
            continue
        neg = float((df[c] < 0).mean())
        if neg > 0:
            negatives[c] = neg

    findings = []
    if negatives:
        findings.append(_finding("warning", "Found negative values in non-negative-like fields.", {"columns": negatives}))
    return AgentResult("consistency", 100.0 - (10.0 if negatives else 0.0), findings, {"negatives_suspect": negatives})

# ---------- model agents ----------
def performance_agent(model, X: pd.DataFrame, y: pd.Series, cv_splits=5, random_state=42) -> AgentResult:
    """Score cross-validated predictive performance.

    Predictions come from ``cross_val_predict`` with the splitter chosen by
    ``_cv_for_task``. Binary tasks report accuracy, F1, precision, recall and
    (when probabilities are available) ROC AUC; multiclass tasks report
    accuracy; regression reports RMSE and R^2.

    Returns:
        AgentResult named "performance" with a 0-100 score: the mean of the
        binary metrics, the multiclass accuracy, or ``(r2 + 1) / 2`` clipped
        to [0, 1], each multiplied by 100. The single finding is "error" below
        70, "warning" below 85 and "info" otherwise.
    """
    task = _task_type(y)
    findings, metrics = [], {}
    cv = _cv_for_task(y, cv_splits=cv_splits, random_state=random_state)
    y_pred = cross_val_predict(model, X, y, cv=cv, method="predict")

    if task == "binary":
        # Probabilities are only consumed by the AUC below, so the extra
        # cross-validation pass is limited to the binary case.
        y_proba = None
        try:
            y_proba = cross_val_predict(model, X, y, cv=cv, method="predict_proba")
        except Exception:
            # Best effort: estimators without usable predict_proba simply get no AUC.
            y_proba = None

        acc = accuracy_score(y, y_pred)
        f1 = f1_score(y, y_pred, zero_division=0)
        prec = precision_score(y, y_pred, zero_division=0)
        rec = recall_score(y, y_pred, zero_division=0)
        auc = roc_auc_score(y, y_proba[:,1]) if y_proba is not None and len(np.unique(_as_series(y)))==2 else np.nan
        metrics.update(dict(task=task, accuracy=float(acc), f1=float(f1), precision=float(prec), recall=float(rec),
                            auc=(float(auc) if not np.isnan(auc) else None)))
        # When AUC is unavailable accuracy is counted a second time in its place.
        base = np.nanmean([acc, f1, prec, rec, auc if not np.isnan(auc) else acc])
    elif task == "multiclass":
        acc = accuracy_score(y, y_pred)
        metrics.update(dict(task=task, accuracy=float(acc)))
        base = acc
    else:
        rmse = float(np.sqrt(mean_squared_error(y, y_pred)))
        r2 = r2_score(y, y_pred)
        metrics.update(dict(task=task, rmse=float(rmse), r2=float(r2)))
        base = max(0.0, min(1.0, (r2+1)/2))

    score = float(np.clip(base,0,1)*100.0)
    findings.append(_finding("error" if score<70 else "warning" if score<85 else "info",
                             "Model performance is weak." if score<70 else "Model performance is moderate." if score<85 else "Strong model performance."))
    return AgentResult("performance", score, findings, metrics)

def fairness_agent(model, X: pd.DataFrame, y: pd.Series, sensitive_cols: Optional[List[str]]=None, cv_splits=5, random_state=42) -> AgentResult:
    """Compare cross-validated predictions across groups of each sensitive column.

    For every sensitive column the per-group accuracy gap is computed; for
    binary tasks the demographic-parity gap (positive prediction rate) and the
    equal-opportunity gap (true-positive rate) are added. A group without any
    positive true label has no defined TPR and is left out of the
    equal-opportunity gap instead of being counted as 0.

    Returns:
        AgentResult named "fairness". Regression is skipped (score 100) and a
        missing ``sensitive_cols`` gives score 80. Otherwise each warning
        finding (gap > 0.1 or missing column) costs 10 points, floored at 40.
    """
    findings, metrics = [], {}
    task = _task_type(y)
    if task not in ("binary","multiclass"):
        return AgentResult("fairness", 100.0, [_finding("info","Fairness skipped for regression.")], {"skipped": True})
    if not sensitive_cols:
        return AgentResult("fairness", 80.0, [_finding("warning","No sensitive columns provided; fairness limited.")], {"limited": True})

    cv = _cv_for_task(y, cv_splits=cv_splits, random_state=random_state)
    y_pred = np.asarray(cross_val_predict(model, X, y, cv=cv, method="predict"))
    # Predictions are positional, so compare against positional labels rather
    # than relying on the indexes of X and y being aligned.
    y_true = np.asarray(y)

    res = {}
    for col in sensitive_cols:
        if col not in X.columns:
            findings.append(_finding("warning", f"Sensitive column '{col}' not found."))
            continue
        groups = pd.Series(X[col]).astype("category")
        stats = {}
        for g in groups.cat.categories:
            mask = (groups == g).to_numpy()
            if mask.sum() == 0:
                continue
            yt, yp = y_true[mask], y_pred[mask]
            stats[str(g)] = dict(accuracy=float(accuracy_score(yt, yp)))
            if task == "binary":
                pr = float(np.mean(yp == 1))
                tpr = None
                if np.any(yt == 1):
                    try:
                        tpr = float(recall_score(yt, yp, pos_label=1, zero_division=0))
                    except Exception:
                        tpr = None
                stats[str(g)].update(positive_rate=pr, tpr=tpr)

        accs = [v["accuracy"] for v in stats.values()]
        ag = max(accs)-min(accs) if len(accs)>=2 else 0.0
        res[col] = dict(subgroup_accuracy_gap=float(ag))
        if task == "binary":
            prs = [v["positive_rate"] for v in stats.values() if v.get("positive_rate") is not None]
            tprs = [v["tpr"] for v in stats.values() if v.get("tpr") is not None]
            if len(prs)>=2: res[col]["demographic_parity_gap"] = float(max(prs)-min(prs))
            if len(tprs)>=2: res[col]["equal_opportunity_gap"] = float(max(tprs)-min(tprs))
        for k, v in res[col].items():
            if v > 0.1:
                findings.append(_finding("warning", f"High {k.replace('_',' ')} for {col}: {v:.3f}"))

    metrics = res
    penalty = sum(10.0 for f in findings if f["severity"]=="warning")
    score = float(max(40.0, 100.0 - penalty))
    if score>=90 and not findings:
        findings.append(_finding("info","No large fairness gaps detected."))
    return AgentResult("fairness", score, findings, metrics)

def robustness_agent(model, X: pd.DataFrame, y: pd.Series, noise_std=0.01, trials=20, random_state=42) -> AgentResult:
    """Measure how much predictions move when Gaussian noise is added to numeric features.

    The model is fitted on ``(X, y)`` (in place) and re-evaluated on ``trials``
    noisy copies of ``X``. Classification reports the average share of flipped
    predictions; regression reports the average absolute RMSE change.

    Args:
        noise_std: Standard deviation of the additive noise (same value for
            every numeric column; no per-feature scaling is applied).
        trials: Number of noisy copies; values below 1 are treated as 1.
        random_state: Seed for the noise generator.

    Returns:
        AgentResult named "robustness". Score 85 with a warning when ``X`` has
        no numeric column; otherwise ``100 * (1 - min(5 * flip_rate, 1))`` for
        classification or ``100 * (1 - min(rmse_drift, 1))`` for regression.
    """
    rng = np.random.default_rng(random_state)
    task = _task_type(y)
    findings, metrics = [], {}

    num_cols = X.select_dtypes(include=[np.number]).columns
    if len(num_cols) == 0:
        # Checked before fitting so no training time is spent on a check that cannot run.
        return AgentResult("robustness", 85.0, [_finding("warning","No numeric features for perturbation; limited robustness check.")], {"limited": True})

    trials = max(1, int(trials))  # an empty trial list would average to NaN
    base = model.fit(X, y).predict(X)

    def _noisy_copy():
        Xp = X.copy()
        Xp[num_cols] = Xp[num_cols] + rng.normal(0, noise_std, size=Xp[num_cols].shape)
        return Xp

    if task in ("binary","multiclass"):
        flips = [float(np.mean(model.predict(_noisy_copy()) != base)) for _ in range(trials)]
        fr = float(np.mean(flips))
        metrics["avg_flip_rate"] = fr
        score = float(max(0.0, 100.0*(1.0-min(fr*5,1.0))))
        findings.append(_finding("warning" if fr>0.1 else "info", f"Avg flip rate under noise: {fr:.3f}"))
    else:
        from math import sqrt
        rmse0 = sqrt(mean_squared_error(y, base))
        drifts = [abs(sqrt(mean_squared_error(y, model.predict(_noisy_copy()))) - rmse0) for _ in range(trials)]
        drift = float(np.mean(drifts))
        metrics["avg_rmse_drift"] = drift
        score = float(max(0.0, 100.0*(1.0-min(drift,1.0))))
        findings.append(_finding("warning" if drift>0.05 else "info", f"Avg RMSE drift: {drift:.3f}"))
    return AgentResult("robustness", score, findings, metrics)

def explainability_agent(model, X: pd.DataFrame, y: pd.Series, n_repeats=5, random_state=42) -> AgentResult:
    """Fit the model and compute permutation feature importance on the same data.

    Scoring is "accuracy" for classification tasks and "r2" otherwise.

    Returns:
        AgentResult named "explainability": score 100 with the mean importance
        per column in ``metrics["permutation_importance"]``, or score 70 with
        ``metrics["failed"]`` set when the importance computation raises.
    """
    findings, metrics = [], {}
    fitted = model.fit(X, y)
    is_classification = _task_type(y) in ("binary", "multiclass")
    scoring = "accuracy" if is_classification else "r2"
    try:
        pi = permutation_importance(fitted, X, y, scoring=scoring, n_repeats=n_repeats, random_state=random_state)
        mean_importance = [float(v) for v in pi.importances_mean.tolist()]
        metrics["permutation_importance"] = dict(zip(X.columns, mean_importance))
        findings.append(_finding("info", "Permutation feature importance computed."))
        score = 100.0
    except Exception as e:
        findings.append(_finding("warning", f"Permutation importance failed: {e}"))
        score = 70.0
        metrics["failed"] = True
    return AgentResult("explainability", score, findings, metrics)

def calibration_agent(model, X: pd.DataFrame, y: pd.Series) -> AgentResult:
    """Estimate probability calibration for binary tasks.

    The model is fitted on ``(X, y)`` and its positive-class probabilities on
    the same data are binned with ``calibration_curve`` (10 quantile bins).
    The reported "ECE" is the unweighted mean absolute gap between observed
    and predicted frequency per bin.

    Returns:
        AgentResult named "calibration": skipped (score 100) for non-binary
        tasks, score 80 when the model has no ``predict_proba``, otherwise
        ``100 * (1 - min(5 * ece, 1))``.
    """
    if _task_type(y) != "binary":
        return AgentResult("calibration", 100.0, [_finding("info","Calibration evaluated for binary tasks only.")], {"skipped": True})

    fitted = model.fit(X, y)
    if not hasattr(fitted, "predict_proba"):
        return AgentResult("calibration", 80.0, [_finding("warning","No predict_proba; calibration limited.")], {})

    positive_proba = fitted.predict_proba(X)[:, 1]
    frac_pos, mean_pred = calibration_curve(y, positive_proba, n_bins=10, strategy="quantile")
    ece = float(np.mean(np.abs(frac_pos - mean_pred)))
    score = float(max(0.0, 100.0*(1.0-min(ece*5,1.0))))
    severity = "warning" if ece>0.05 else "info"
    return AgentResult(
        "calibration",
        score,
        [_finding(severity, f"Calibration ECE={ece:.3f} (lower is better).")],
        {"ece": ece, "bins": int(len(frac_pos))},
    )

def falsification_agent(model, X: pd.DataFrame, y: pd.Series) -> AgentResult:
    """Corrupt a few rows of up to two numeric columns and measure the score drop.

    About 5% of the rows (at least one) have the first one or two numeric
    columns overwritten with ``mean + 10 * std``. The model is fitted and
    scored on the clean data, then fitted and scored again on the corrupted
    copy (accuracy for classification, R^2 for regression).

    NOTE(review): the corrupted score comes from a model refitted on the
    corrupted data, not from the clean-data model -- confirm this is intended.

    Returns:
        AgentResult named "falsification" with score
        ``100 - 100 * min(drop, 0.5)`` floored at 0; score 85 when ``X`` has
        no numeric column. A warning is added when the drop exceeds 0.05.
    """
    num_cols = X.select_dtypes(include=[np.number]).columns
    findings, metrics = [], {}
    if len(num_cols) == 0:
        return AgentResult("falsification", 85.0, [_finding("info","No numeric features; limited falsification tests.")], {})

    rng = np.random.default_rng(42)
    Xt = X.copy()
    # Cast the columns to be mutated to float so writing the outlier value
    # cannot hit an incompatible (e.g. integer) dtype; skip columns that refuse.
    cols = []
    for c in num_cols[:2]:
        try:
            Xt[c] = Xt[c].astype(float)
        except Exception:
            continue
        cols.append(c)

    if len(Xt) > 0 and len(cols) > 0:
        rows = rng.choice(len(Xt), max(1, len(Xt)//20), replace=False)
        for c in cols:
            sd = float(Xt[c].std())
            if not np.isfinite(sd) or sd == 0.0:
                sd = 1.0
            mu = float(Xt[c].mean())
            # `rows` holds positions, not index labels, so write positionally;
            # label-based .loc fails whenever X does not carry a 0..n-1 index.
            Xt.iloc[rows, Xt.columns.get_loc(c)] = mu + 10.0*sd

        if _task_type(y) != "regression":
            base = float(accuracy_score(y, model.fit(X, y).predict(X)))
            adv  = float(accuracy_score(y, model.fit(Xt, y).predict(Xt)))
        else:
            base = float(r2_score(y, model.fit(X, y).predict(X)))
            adv  = float(r2_score(y, model.fit(Xt, y).predict(Xt)))
        delta = float(base - adv)
        metrics["corruption_perf_drop"] = delta
        if delta > 0.05:
            findings.append(_finding("warning", f"Performance dropped by {delta:.3f} under small corruption."))

    return AgentResult("falsification", float(max(0.0, 100.0 - 100.0*min(metrics.get("corruption_perf_drop",0.0), 0.5))), findings, metrics)

# ---------- orchestrator ----------
def run_all_agents(model, X: pd.DataFrame, y: pd.Series, df_raw: pd.DataFrame, label_col: Optional[str], sensitive_cols: Optional[List[str]]=None) -> Dict[str,Any]:
    """Run every agent and combine their scores into one weighted verdict.

    Data agents receive ``df_raw``; model agents receive ``(model, X, y)``.
    The overall score is the weighted sum of agent scores; status is "pass"
    from 90, "needs_review" from 70 and "fail" below that.

    Returns:
        Dict with "overall" (rounded score and status), "agents" (each
        AgentResult as a dict) and "meta" (sample count, feature count, task).
    """
    results = [
        data_integrity_agent(df_raw, label_col),
        sampling_agent(df_raw, label_col),
        consistency_agent(df_raw),
        performance_agent(model, X, y),
        fairness_agent(model, X, y, sensitive_cols=sensitive_cols),
        robustness_agent(model, X, y),
        explainability_agent(model, X, y),
        calibration_agent(model, X, y),
        falsification_agent(model, X, y),
    ]

    weights = {
        "data_integrity": 0.10,
        "sampling": 0.07,
        "consistency": 0.08,
        "performance": 0.35,
        "fairness": 0.15,
        "robustness": 0.10,
        "explainability": 0.05,
        "calibration": 0.05,
        "falsification": 0.05,
    }
    overall = sum(weights.get(r.agent,0)*r.score for r in results)

    if overall >= 90:
        status = "pass"
    elif overall >= 70:
        status = "needs_review"
    else:
        status = "fail"

    return {
        "overall": {"score": round(overall,2), "status": status},
        "agents": [r.to_dict() for r in results],
        "meta": {"n_samples": int(len(X)), "n_features": int(X.shape[1]), "task": _task_type(y)},
    }

# ---------- robust JSON save ----------
def _json_safe(o):
    import numpy as _np, pandas as _pd, json as _json
    if isinstance(o, dict):
        # cast KEYS to str and recurse
        return {str(_json_safe(k)): _json_safe(v) for k, v in o.items()}
    if isinstance(o, (list, tuple, set)):
        return [_json_safe(x) for x in o]
    if isinstance(o, (_np.generic,)):
        return o.item()
    if isinstance(o, _np.ndarray):
        return [_json_safe(x) for x in o.tolist()]
    if isinstance(o, _pd.Series):
        return _json_safe(o.to_dict())
    if isinstance(o, _pd.DataFrame):
        return _json_safe(o.to_dict(orient="list"))
    # last resort: stringify unknowns
    try:
        _json.dumps(o)
        return o
    except Exception:
        return str(o)

def save_reports(name: str, results: Dict[str, Any], outdir="validation_runs_popper"):
    """Write the results as JSON, Markdown and HTML files and return their paths.

    Files are named ``{name}_popper_results_{unix_timestamp}.{json,md,html}``
    inside ``outdir`` (created if missing). All files are written as UTF-8
    because the report text contains non-ASCII characters (severity badges and
    an em dash), which would otherwise fail on platforms whose default
    encoding cannot represent them.

    Args:
        name: Run name used in the file names and the report title.
        results: Dict shaped like the return value of ``run_all_agents``.
        outdir: Target directory.

    Returns:
        Tuple ``(json_path, markdown_path, html_path)``.
    """
    import html as _html

    os.makedirs(outdir, exist_ok=True)
    ts = int(time.time())
    jp = os.path.join(outdir, f"{name}_popper_results_{ts}.json")
    mp = os.path.join(outdir, f"{name}_popper_results_{ts}.md")
    hp = os.path.join(outdir, f"{name}_popper_results_{ts}.html")

    # JSON (safe)
    with open(jp, "w", encoding="utf-8") as f:
        json.dump(_json_safe(results), f, indent=2)

    # Markdown (brief)
    lines = [f"# Popper Validation — {name}",
             f"**Overall:** {results['overall']['status']} (score: {results['overall']['score']})",
             f"**Samples:** {results['meta']['n_samples']} | **Features:** {results['meta']['n_features']} | **Task:** {results['meta']['task']}",
             "\n## Agents"]
    for r in results["agents"]:
        lines.append(f"### {r['agent'].replace('_',' ').title()} — score: {r['score']:.1f}")
        if r["metrics"]:
            lines.append("**Metrics:**")
            lines.append("```json\n" + json.dumps(_json_safe(r["metrics"]), indent=2) + "\n```")
        if r["findings"]:
            lines.append("**Findings:**")
            for finding in r["findings"]:
                badge = {"error":"🟥","warning":"🟨","info":"⬜"}.get(finding["severity"],"⬜")
                lines.append(f"- {badge} {finding['message']}")
        lines.append("")
    with open(mp, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))

    # HTML (simple): escape the text so column names or messages containing
    # '<', '>' or '&' cannot break or inject markup.
    body = "\n".join(f"<p>{_html.escape(ln, quote=False)}</p>" for ln in lines)
    html = "<html><head><meta charset='utf-8'><title>Popper Validation</title></head><body>" + body + "</body></html>"
    with open(hp, "w", encoding="utf-8") as f:
        f.write(html)
    return jp, mp, hp


Overwriting popper_agents.py


In [54]:
# Cell 3
import os, io, json, time
import numpy as np, pandas as pd
from collections import Counter
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, FunctionTransformer, StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction import FeatureHasher
from sklearn.model_selection import train_test_split
from sklearn.base import clone
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestRegressor

import importlib, popper_agents as P
importlib.reload(P)

# Column names tried, in this order, when guessing the label column.
CANDIDATE_LABELS = [
    "label", "target", "y",
    "approved", "default", "churn", "loan_status", "is_fraud",
    "premium", "price", "outcome", "response",
    "Loan_Status", "Outcome",
]

def infer_label_col(df: pd.DataFrame):
    """Guess which column holds the label.

    The first entry of ``CANDIDATE_LABELS`` that is a column of ``df`` wins.
    Otherwise the last column is returned unless its name looks like an
    identifier (contains "id", "uuid", "guid" or "hash").

    Returns:
        The column label, or ``None`` when nothing suitable is found
        (including when ``df`` has no columns).
    """
    for c in CANDIDATE_LABELS:
        if c in df.columns:
            return c
    if len(df.columns) == 0:
        return None
    last = df.columns[-1]
    # str() because column labels may be non-strings (e.g. integer labels).
    if not any(tok in str(last).lower() for tok in ["id","uuid","guid","hash"]):
        return last
    return None

def infer_task(y: pd.Series):
    """Return "classification" or "regression" for the given label values.

    Non-numeric labels are classification. Numeric labels are classification
    only when they have at most 10 distinct non-missing values and all of
    them are integer-valued.

    NOTE(review): ``popper_agents._task_type`` uses a cut-off of 50 distinct
    values rather than 10 -- confirm the two thresholds are meant to differ.
    """
    labels = y if isinstance(y, pd.Series) else pd.Series(y)
    if not pd.api.types.is_numeric_dtype(labels):
        return "classification"

    distinct = pd.unique(labels.dropna())
    few_integer_values = len(distinct) <= 10 and all(float(v).is_integer() for v in distinct)
    return "classification" if few_integer_values else "regression"

def infer_sensitive_cols(df: pd.DataFrame):
    """Return the known sensitive-attribute names that are columns of ``df``.

    Matching is exact (case-sensitive) and the result follows the order of the
    built-in candidate list, not the column order of ``df``.
    """
    known_sensitive = (
        "Gender", "Sex", "Married", "Education", "Property_Area",
        "race", "ethnicity", "age", "age_group",
        "zip", "zipcode", "state", "country",
    )
    found = []
    for name in known_sensitive:
        if name in df.columns:
            found.append(name)
    return found

def infer_monotonic_features(df: pd.DataFrame):
    """List numeric columns whose name contains a monotonic-looking token.

    Tokens (matched case-insensitively as substrings): "score", "amount",
    "balance", "income", "years", "tenure", "experience", "credit", "rating".
    Column labels are converted with ``str()`` first, so frames with
    non-string labels (e.g. integers) no longer raise.
    """
    tokens = ("score","amount","balance","income","years","tenure","experience","credit","rating")
    numeric_columns = df.select_dtypes(include=[np.number]).columns
    return [c for c in numeric_columns if any(t in str(c).lower() for t in tokens)]

def _extract_dt_components(s: pd.Series) -> pd.DataFrame:
    dt = pd.to_datetime(s, errors="coerce")
    return pd.DataFrame({
        f"{s.name}_year": dt.dt.year,
        f"{s.name}_month": dt.dt.month,
        f"{s.name}_day": dt.dt.day,
        f"{s.name}_dow": dt.dt.dayofweek,
        f"{s.name}_hour": dt.dt.hour,
    })

def _series_to_dicts(X: pd.DataFrame, col: str):
    return [{col: str(v)} for v in X[col].astype(str).fillna("NA").values]

def build_universal_pipeline(X: pd.DataFrame, task: str):
    """Build a preprocessing + estimator pipeline matched to the columns of ``X``.

    Column handling:
      * numeric columns: median imputation, then standard scaling;
      * non-numeric columns whose name contains "date", "time" or "_dt" and
        whose first 100 non-missing values mostly (>70%) parse as datetimes:
        year/month/day/day-of-week/hour extraction, then most-frequent imputation;
      * remaining columns with at most 200 distinct values: most-frequent
        imputation, then one-hot encoding (unknown categories ignored);
      * remaining columns with more distinct values: feature hashing (64 features).

    Args:
        X: Feature frame used only to decide how each column is treated.
        task: "regression" selects a ``RandomForestRegressor``; any other
            value selects a ``LogisticRegression``. Callers may still replace
            the final "mdl" step afterwards.

    Returns:
        An unfitted ``Pipeline`` with steps "pre" (ColumnTransformer) and "mdl".
    """
    num_cols = X.select_dtypes(include=[np.number]).columns.tolist()

    # Only name-hinted columns are probed as datetimes, to avoid parsing every text column.
    dt_cols = []
    name_hints = ("date", "time", "_dt")
    for c in X.columns:
        if c in num_cols:
            continue
        lc = str(c).lower()  # column labels may be non-strings
        if any(h in lc for h in name_hints):
            sample = pd.to_datetime(X[c].dropna().astype(str).head(100), errors="coerce")
            if sample.notna().mean() > 0.7:
                dt_cols.append(c)

    obj_cols = [c for c in X.columns if c not in num_cols and c not in dt_cols]
    low_card = [c for c in obj_cols if X[c].nunique(dropna=True) <= 200]
    high_card = [c for c in obj_cols if c not in low_card]

    num_tf = Pipeline([
        ("impute", SimpleImputer(strategy="median")),
        ("scale", StandardScaler(with_mean=True, with_std=True))
    ])

    dt_tfs = []
    for c in dt_cols:
        dt_tfs.append((f"dt_{c}", Pipeline([
            # col=c binds the current column at definition time (avoids late binding in the loop).
            ("extract", FunctionTransformer(lambda df, col=c: _extract_dt_components(df[col]), validate=False)),
            ("impute", SimpleImputer(strategy="most_frequent"))
        ]), [c]))

    low_cat_tf = Pipeline([
        ("impute", SimpleImputer(strategy="most_frequent")),
        ("ohe", OneHotEncoder(handle_unknown="ignore", sparse_output=False))
    ])

    def hasher_for(col):
        return Pipeline([
            ("to_dicts", FunctionTransformer(lambda df, c=col: _series_to_dicts(df, c), validate=False)),
            ("hash", FeatureHasher(n_features=64, input_type="dict"))
        ])

    transformers = []
    if num_cols:
        transformers.append(("num", num_tf, num_cols))
    if low_card:
        transformers.append(("lowcat", low_cat_tf, low_card))
    for c in high_card:
        transformers.append((f"hash_{c}", hasher_for(c), [c]))
    transformers += dt_tfs

    pre = ColumnTransformer(transformers=transformers, remainder="drop")

    # Honour `task` instead of always returning a classifier, so the pipeline
    # is usable for regression even if the caller does not swap the estimator.
    if task == "regression":
        estimator = RandomForestRegressor(n_estimators=300, random_state=42)
    else:
        estimator = LogisticRegression(max_iter=2000, solver="liblinear", class_weight="balanced")
    return Pipeline([("pre", pre), ("mdl", estimator)])


In [55]:
# Cell 4
import numpy as np, pandas as pd, matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, roc_curve, precision_recall_curve, roc_auc_score, average_precision_score
from sklearn.inspection import permutation_importance

def plot_confusion_matrix_binary(y_true, y_pred):
    """Draw the confusion matrix as an image with the count written in each cell."""
    cm = confusion_matrix(y_true, y_pred)
    fig, ax = plt.subplots()
    ax.imshow(cm)
    ax.set_title("Confusion Matrix")
    ax.set_xlabel("Predicted")
    ax.set_ylabel("True")
    # Rows are true classes (y axis), columns are predicted classes (x axis).
    for row, col in np.ndindex(*cm.shape):
        ax.text(col, row, str(cm[row, col]), ha="center", va="center")
    plt.show()

def plot_roc_pr(y_true, y_score):
    """Plot the ROC curve and the precision-recall curve for a binary problem.

    Prints a notice and returns without plotting when ``y_true`` does not
    contain exactly two distinct values.
    """
    n_classes = len(np.unique(y_true))
    if n_classes != 2:
        print("ROC/PR only for binary classification.")
        return

    fpr, tpr, _ = roc_curve(y_true, y_score)
    prec, rec, _ = precision_recall_curve(y_true, y_score)
    auc = roc_auc_score(y_true, y_score)
    ap = average_precision_score(y_true, y_score)

    # ROC curve with the chance diagonal for reference.
    fig, ax = plt.subplots()
    ax.plot(fpr, tpr, label=f"AUC={auc:.3f}")
    ax.plot([0,1], [0,1], linestyle="--")
    ax.set_title("ROC Curve")
    ax.set_xlabel("FPR")
    ax.set_ylabel("TPR")
    ax.legend()
    plt.show()

    # Precision-recall curve.
    fig, ax = plt.subplots()
    ax.plot(rec, prec, label=f"AP={ap:.3f}")
    ax.set_title("Precision-Recall Curve")
    ax.set_xlabel("Recall")
    ax.set_ylabel("Precision")
    ax.legend()
    plt.show()

def plot_calibration_curve(y_true, y_score, n_bins=10):
    """Plot a reliability diagram using quantile bins of the predicted scores.

    Each non-empty bin contributes one point: mean predicted score (x) against
    the observed mean of ``y_true`` (y). Bins are half-open ``[lo, hi)``
    except the last one, which also includes its upper edge, so a score that
    equals a bin boundary is counted in exactly one bin.

    Args:
        y_true: Binary outcomes (array-like, converted to float).
        y_score: Predicted probabilities (array-like, same length as ``y_true``).
        n_bins: Number of quantile bins.
    """
    # Work on plain arrays so boolean masks index positionally for any mix of
    # Series / ndarray / list inputs.
    y_true = np.asarray(y_true, dtype=float)
    y_score = np.asarray(y_score, dtype=float)
    if y_score.size == 0:
        print("No scores to plot.")
        return

    edges = np.quantile(y_score, np.linspace(0, 1, n_bins + 1))
    mids, obs = [], []
    for i in range(n_bins):
        lo, hi = edges[i], edges[i + 1]
        if i == n_bins - 1:
            in_bin = (y_score >= lo) & (y_score <= hi)
        else:
            in_bin = (y_score >= lo) & (y_score < hi)
        if not in_bin.any():
            continue
        mids.append(y_score[in_bin].mean())
        obs.append(y_true[in_bin].mean())

    fig, ax = plt.subplots()
    ax.plot([0,1], [0,1], "--")
    ax.plot(mids, obs, marker="o")
    ax.set_title("Calibration (Reliability) Diagram")
    ax.set_xlabel("Predicted probability")
    ax.set_ylabel("Observed frequency")
    plt.show()

def plot_feature_importance(model, X, y, task):
    """Bar-plot the 15 features with the largest absolute permutation importance.

    Scoring is "r2" when ``task`` is "regression" and "accuracy" otherwise.
    Any exception is caught and reported with a printed message.
    """
    try:
        scoring = "accuracy" if task!="regression" else "r2"
        pi = permutation_importance(model, X, y, n_repeats=5, scoring=scoring, random_state=7)
        means = pi.importances_mean
        # Largest magnitude first, keep the top 15.
        top = np.argsort(np.abs(means))[::-1][:15]
        names = [X.columns[i] for i in top]
        positions = range(len(names))

        fig, ax = plt.subplots()
        ax.barh(positions, means[top])
        ax.set_yticks(positions)
        ax.set_yticklabels(names)
        ax.invert_yaxis()
        ax.set_title("Top Features (Permutation Importance)")
        plt.tight_layout()
        plt.show()
    except Exception as e:
        print("Permutation importance failed:", e)

def plot_fairness_bars(Xte, yte, yhat, yscore, sensitive_cols):
    """Plot positive-prediction rate and TPR per group for each sensitive column.

    Columns missing from ``Xte`` are skipped. Undefined rates (empty group, or
    no positive true label for TPR) are drawn as bars of height 0.
    ``yscore`` is accepted but not used.
    """
    if not sensitive_cols:
        print("No sensitive columns for fairness plots.")
        return

    for col in sensitive_cols:
        if col not in Xte.columns:
            continue
        cats = pd.Series(Xte[col]).astype("category")
        groups = list(cats.cat.categories)

        prates, tprs = [], []
        for g in groups:
            member = (cats==g)
            if member.sum()==0:
                prates.append(np.nan)
                tprs.append(np.nan)
                continue
            prates.append(float(np.mean(yhat[member]==1)))
            n_pos = np.sum(yte[member]==1)
            if n_pos > 0:
                tprs.append(float(np.sum((yte[member]==1)&(yhat[member]==1))/n_pos))
            else:
                tprs.append(np.nan)

        ticks = range(len(groups))
        for title, rates in ((f"Positive Rate by {col}", prates), (f"TPR by {col}", tprs)):
            fig, ax = plt.subplots()
            ax.bar(ticks, [0 if np.isnan(v) else v for v in rates])
            ax.set_xticks(ticks)
            ax.set_xticklabels(groups, rotation=45, ha="right")
            ax.set_title(title)
            plt.tight_layout()
            plt.show()

def plot_robustness_curve(model, X, y, task, random_state=42):
    """Plot the prediction flip rate as Gaussian noise on numeric features grows.

    For each noise level the numeric columns of ``X`` receive additive
    zero-mean Gaussian noise and the share of predictions that differ from
    the noise-free predictions is recorded.

    Args:
        model: Fitted estimator exposing ``predict``.
        X: Feature frame.
        y: Unused; kept for signature compatibility.
        task: "regression" prints a notice and returns without plotting.
        random_state: Seed for the noise generator, so the curve is
            reproducible across runs.
    """
    if task == "regression":
        print("Robustness plot defined for classification here.")
        return
    noise_levels = [0.0, 0.005, 0.01, 0.02, 0.05]
    num_cols = X.select_dtypes(include=[np.number]).columns
    if len(num_cols)==0:
        print("No numeric features for noise perturbation.")
        return

    rng = np.random.default_rng(random_state)
    base_pred = model.predict(X)
    rates = []
    for s in noise_levels:
        Xp = X.copy()
        Xp[num_cols] = Xp[num_cols] + rng.normal(0, s, size=Xp[num_cols].shape)
        yhat = model.predict(Xp)
        rates.append(float(np.mean(yhat != base_pred)))

    fig, ax = plt.subplots()
    ax.plot(noise_levels, rates, marker="o")
    ax.set_title("Prediction Flip Rate vs Noise Std")
    ax.set_xlabel("Noise std added to numeric features")
    ax.set_ylabel("Flip rate")
    plt.show()


In [56]:
# Cell 5
import io, traceback
import pandas as pd
from ipywidgets import FileUpload, VBox, Label, Button, Dropdown, Text, HBox, Output, Checkbox
from IPython.display import display, clear_output

# Module-level state written by the upload callbacks below.
df, LABEL_COL = None, None

# Widgets: file picker, an area for load status, an area for run output,
# and a toggle for running immediately after an upload.
uploader = FileUpload(accept=".csv,.parquet,.xlsx,.xls", multiple=False)
status, out = Output(), Output()
autorun_cb = Checkbox(value=True, description="Run automatically after upload")

def _read_uploaded_file_any(value_obj):
    if isinstance(value_obj, dict) and value_obj:
        key = list(value_obj.keys())[0]
        item = value_obj[key]
        name = item.get("metadata", {}).get("name") or item.get("name", "uploaded_file")
        content = item.get("content")
    elif isinstance(value_obj, (list, tuple)) and len(value_obj) > 0:
        item = value_obj[0]
        name = item.get("name", "uploaded_file")
        content = item.get("content")
    else:
        raise ValueError("No file found in uploader.value")
    if content is None:
        raise ValueError("Uploaded file content is empty")
    buf = io.BytesIO(content)
    lname = name.lower()
    if lname.endswith(".csv"):
        return pd.read_csv(buf), name
    elif lname.endswith(".parquet"):
        return pd.read_parquet(buf), name
    elif lname.endswith((".xlsx", ".xls")):
        return pd.read_excel(buf), name
    else:
        raise ValueError(f"Unsupported file type: {name}")

def _build_runner_ui(local_df, fname):
    """Show a preview of the uploaded frame plus controls to launch a Popper run.

    Side effects: sets the global ``LABEL_COL`` to the inferred label, renders
    the preview and controls into the ``status`` output widget, and runs
    immediately when ``autorun_cb`` is ticked.

    Args:
        local_df: The uploaded dataframe.
        fname: Name of the uploaded file (used in printed messages only).
    """
    global LABEL_COL
    LABEL_COL = infer_label_col(local_df)
    label_options = local_df.columns.tolist()
    # Pre-select the inferred label when it is a valid option, else the first column.
    label_dd = Dropdown(options=label_options, value=LABEL_COL if LABEL_COL in label_options else label_options[0], description="Label")
    run_name = Text(value="run", description="Run name")
    run_btn = Button(description="Run Popper", button_style="success")

    def _run(_=None):
        # Button callback; the ignored argument is whatever on_click passes in.
        with out:
            clear_output()
            try:
                print(f"Loaded: {fname} | shape={local_df.shape}")
                print("Columns:", list(local_df.columns))
                # Prints the globally inferred label; the run below uses the dropdown's value.
                print("Inferred label:", LABEL_COL)
                # popper_run is not defined in the visible part of this notebook;
                # presumably a later cell provides it -- TODO confirm.
                _ = popper_run(local_df, label_col=label_dd.value, run_name=run_name.value)
                print("\nDone.")
            except Exception as e:
                print("⚠️ Error while running Popper:")
                traceback.print_exc()

    run_btn.on_click(_run)

    with status:
        clear_output()
        print(f"Loaded: {fname} | shape={local_df.shape}")
        display(local_df.head(10))
        print("Select the label column and click Run Popper:")
        display(HBox([label_dd, run_name, run_btn, autorun_cb]))

    # Autorun happens after the controls are displayed, using the pre-selected label.
    if autorun_cb.value: _run()

def _on_upload_change(change):
    """Observer for the uploader's ``value``: parse the file and build the run UI.

    On success the parsed frame is also published as the module global ``df``.
    Any failure is reported inside the ``status`` output area as a traceback
    rather than being raised.
    """
    global df
    with status:
        try:
            uploaded = uploader.value
            if uploaded:
                frame, filename = _read_uploaded_file_any(uploaded)
                df = frame
                _build_runner_ui(frame, filename)
            else:
                # Nothing selected (or the selection was cleared).
                clear_output()
                print("No file uploaded yet.")
        except Exception:
            clear_output()
            print("Failed to read file:")
            traceback.print_exc()

# Call _on_upload_change every time the uploader's `value` trait changes.
uploader.observe(_on_upload_change, names="value")
# Show the prompt label, the uploader, and the `status` / `out` areas stacked vertically.
display(VBox([Label("Upload a CSV / Parquet / Excel file:"), uploader, status, out]))


VBox(children=(Label(value='Upload a CSV / Parquet / Excel file:'), FileUpload(value={}, accept='.csv,.parquet…

In [57]:
# Cell 6
from sklearn.inspection import permutation_importance
from sklearn.metrics import accuracy_score
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestRegressor

def _safe_stratify_arg(y, task):
    if task == "regression":
        return None
    yS = y if isinstance(y, pd.Series) else pd.Series(y)
    counts = Counter(yS)
    return yS if len(counts) > 0 and min(counts.values()) >= 2 else None

def preview_and_plots(pipeline: Pipeline, X: pd.DataFrame, y, task: str, sens_cols=None, max_rows=10):
    """Fit a preview model on a 75/25 split, print sample predictions and draw plots.

    The pipeline is cloned and its last step is replaced by a
    RandomForestRegressor (``task == "regression"``) or a class-weighted
    LogisticRegression (any other task) before fitting on the training part.

    Args:
        pipeline: Pipeline whose final step is swapped for the preview model.
        X: Feature frame.
        y: Labels, positionally aligned with ``X``.
        task: ``"regression"`` or any other value for classification.
        sens_cols: Optional columns passed to ``plot_fairness_bars``.
        max_rows: Maximum number of test rows to print.

    Returns:
        The fitted clone of ``pipeline``.
    """
    is_regression = task == "regression"
    strat_arg = _safe_stratify_arg(y, task)
    Xtr, Xte, ytr, yte = train_test_split(X, y, test_size=0.25, random_state=7, stratify=strat_arg)
    model = clone(pipeline)
    # swap underlying model depending on task
    if is_regression:
        model.steps[-1] = ("mdl", RandomForestRegressor(n_estimators=300, random_state=42))
    else:
        model.steps[-1] = ("mdl", LogisticRegression(max_iter=2000, solver="liblinear", class_weight="balanced"))
    model.fit(Xtr, ytr)
    preds = model.predict(Xte)
    n_show = min(max_rows, len(Xte))

    def _true_at(i):
        # yte is a Series when y was one, otherwise a positional array/list.
        return yte.iloc[i] if isinstance(yte, pd.Series) else yte[i]

    # Defined for every task so the plotting section never has to probe locals().
    proba = None

    print("\n=== Model Preview ===")
    if not is_regression:
        if hasattr(model, "predict_proba"):
            try:
                # Last column of predict_proba; shown as p(class1) below.
                # NOTE(review): with more than two classes this is only the last
                # class's probability, yet it still feeds the binary-named plots
                # below -- confirm those helpers cope with multiclass input.
                proba = model.predict_proba(Xte)[:, -1]
            except Exception as exc:
                # Best effort: continue without probabilities, but say why.
                # (A bare `except:` here also swallowed KeyboardInterrupt.)
                print(f"(probabilities unavailable: {exc})")
                proba = None
        print(f"(showing up to {max_rows} rows)")
        for i in range(n_show):
            pb = f" | p(class1)={proba[i]:.3f}" if proba is not None else ""
            print(f"y_true={_true_at(i)}, y_pred={preds[i]}{pb}")
    else:
        for i in range(n_show):
            print(f"y_true={float(_true_at(i)):.4f}, y_pred={float(preds[i]):.4f}")

    # Graphs
    if not is_regression and proba is not None:
        plot_confusion_matrix_binary(yte, preds)
        plot_roc_pr(yte, proba)
        plot_calibration_curve(yte, proba)
    plot_feature_importance(model, Xte, yte, task)

    # Fairness plots on test slice (use raw columns)
    if not is_regression and sens_cols:
        plot_fairness_bars(X.loc[Xte.index], yte, preds, proba, sens_cols)

    # Robustness curve
    if not is_regression:
        plot_robustness_curve(model, Xte, yte, task)

    return model


In [58]:
# Cell 7
from popper_agents import run_all_agents, save_reports
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn.base import clone

def _is_id_like_column(col) -> bool:
    """Return True when a column name contains an identifier word as a whole token.

    The name is split on camelCase boundaries and on non-alphanumeric
    characters, so "Loan_ID", "userId" and "uuid" match, while names that merely
    contain the letters "id" ("width", "paid", "chlorides") do not.
    """
    import re  # local import keeps this cell self-contained

    # "userId" -> "user_Id" so the token survives lower-casing.
    spaced = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", str(col))
    tokens = re.split(r"[^a-z0-9]+", spaced.lower())
    return any(tok in {"id", "uuid", "guid", "hash"} for tok in tokens)


def popper_run(df: pd.DataFrame, label_col: str, run_name: str = "run"):
    """Run the full Popper validation flow on a frame and save the reports.

    Steps: drop all-empty columns and rows without a label, drop id-like
    feature columns, infer the task, encode non-numeric classification labels,
    build the baseline pipeline, show the preview/plots, then run all agents
    and write the reports to ``validation_runs_popper``.

    Args:
        df: Input data including the label column.
        label_col: Name of the label column.
        run_name: Name passed to ``save_reports``.

    Returns:
        The results object returned by ``run_all_agents``.

    Raises:
        KeyError: If ``label_col`` is not a column of ``df``.
    """
    if label_col not in df.columns:
        raise KeyError(f"Label column {label_col!r} not found in the data")

    # Basic hygiene
    df = df.dropna(axis=1, how="all").dropna(subset=[label_col])
    y_raw = df[label_col]
    X_raw = df.drop(columns=[label_col])

    # Drop obvious IDs (whole-token match; a plain substring test for "id"
    # would also discard real features such as "width" or "paid").
    id_like = [c for c in X_raw.columns if _is_id_like_column(c)]
    if id_like:
        X_raw = X_raw.drop(columns=id_like)
        print("Dropped id-like columns:", id_like)

    # Task + encode label if needed
    task = infer_task(y_raw)
    if task == "classification" and not pd.api.types.is_numeric_dtype(y_raw):
        # Keep the original index and name: rows were dropped above, so a fresh
        # RangeIndex would no longer line up with X_raw under label-based access.
        y = pd.Series(pd.Categorical(y_raw).codes, index=y_raw.index, name=y_raw.name)
    else:
        y = pd.Series(y_raw)

    # Build baseline pipeline
    pipe = build_universal_pipeline(X_raw, task)

    # Sensitive cols (override for common loan dataset fields if present)
    sens_cols = infer_sensitive_cols(df)
    preferred = [c for c in ["Gender", "Married", "Education", "Property_Area"] if c in df.columns]
    if preferred:
        sens_cols = preferred

    # Preview + plots
    _ = preview_and_plots(pipe, X_raw, y, task, sens_cols=sens_cols)

    # Proper model choice for agents
    model_for_agents = clone(pipe)
    if task == "regression":
        model_for_agents.steps[-1] = ("mdl", RandomForestRegressor(n_estimators=300, random_state=42))
    else:
        model_for_agents.steps[-1] = ("mdl", LogisticRegression(max_iter=2000, solver="liblinear", class_weight="balanced"))

    # Run all Popper agents and save reports (JSON, MD, HTML)
    results = run_all_agents(model_for_agents, X_raw, y, df_raw=df, label_col=label_col, sensitive_cols=sens_cols or None)
    jp, mp, hp = save_reports(run_name, results, outdir="validation_runs_popper")

    print("\n=== Popper Outputs ===")
    print("JSON report:", jp)
    print("Markdown report:", mp)
    print("HTML report:", hp)
    print("\nOverall:", results["overall"])
    return results


In [59]:
# Cell 8
def describe_dataset(df: pd.DataFrame, label_col: str):
    """Print a quick summary of a frame and, when present, of its label column.

    Always prints the shape and column list. If ``label_col`` is a column, also
    prints its dtype and number of distinct non-null values, plus a note when
    the label is not numeric.
    """
    print("Shape:", df.shape)
    print("Columns:", list(df.columns))
    if label_col not in df.columns:
        return
    label = df[label_col]
    print(f"Label '{label_col}' dtype:", label.dtype, "| unique:", label.nunique(dropna=True))
    if pd.api.types.is_numeric_dtype(label):
        return
    print("Label appears categorical; encoding will be applied.")

print("Helper ready. Use describe_dataset(df, 'YourLabel') if helpful.")


Helper ready. Use describe_dataset(df, 'YourLabel') if helpful.


In [60]:
# Cell 9
# If you cannot use the widget, set path + label and run this cell:
FILE_PATH = ""  # e.g., "/mnt/data/loan_data_set.csv"
# LABEL_COL = "Loan_Status"

if FILE_PATH:
    # Reader is chosen by extension; anything that is not CSV/Parquet goes to read_excel.
    if FILE_PATH.lower().endswith(".csv"):
        df = pd.read_csv(FILE_PATH)
    elif FILE_PATH.lower().endswith(".parquet"):
        df = pd.read_parquet(FILE_PATH)
    else:
        df = pd.read_excel(FILE_PATH)
    print(f"Loaded: {FILE_PATH}")
    # LABEL_COL can be left over from an earlier cell or upload (the upload
    # widget sets it as a global), so only trust it when it names a column of
    # the frame that was just loaded; otherwise infer it again.
    preset_label = globals().get("LABEL_COL")
    if preset_label not in df.columns:
        if preset_label is not None:
            print(f"LABEL_COL={preset_label!r} is not a column of this file; re-inferring.")
        LABEL_COL = infer_label_col(df)
        print("Inferred label:", LABEL_COL)
    describe_dataset(df, LABEL_COL)
    _ = popper_run(df, label_col=LABEL_COL, run_name="path_run")
else:
    print("Path mode idle. Use the upload widget or set FILE_PATH above.")


Path mode idle. Use the upload widget or set FILE_PATH above.


In [61]:
# Cell 10
# Usage cheat-sheet: this cell only prints text; it defines nothing and changes no state.
print("""
Notes & knobs:
- To change the model: in Cell 6/7, swap LogisticRegression for another classifier (e.g., RandomForestClassifier).
- To add/remove sensitive columns for fairness: edit `preferred` list in Cell 7.
- Reports go to: validation_runs_popper/  (JSON, MD, HTML).
- For strict date parsing, rename columns to include 'date'/'time' and keep them parseable; the pipeline auto-expands y/m/d/dow/hour.
- For extremely imbalanced labels, the pipeline uses class_weight='balanced' and safe CV folds.
- To add domain-specific falsification tests, extend falsification_agent() in popper_agents.py (Cell 2).
""")



Notes & knobs:
- To change the model: in Cell 6/7, swap LogisticRegression for another classifier (e.g., RandomForestClassifier).
- To add/remove sensitive columns for fairness: edit `preferred` list in Cell 7.
- Reports go to: validation_runs_popper/  (JSON, MD, HTML).
- For strict date parsing, rename columns to include 'date'/'time' and keep them parseable; the pipeline auto-expands y/m/d/dow/hour.
- For extremely imbalanced labels, the pipeline uses class_weight='balanced' and safe CV folds.
- To add domain-specific falsification tests, extend falsification_agent() in popper_agents.py (Cell 2).

