In [1]:

"""

Bank Marketing: Model Benchmark (LR, Tree, KNN, SVM-RBF)
- Auto-detects attached bank dataset (semicolon or comma separated)
- Drops 'duration' to avoid target leakage
- Preprocess: Standardize numeric, One-Hot encode categorical
- Handles class imbalance via class_weight='balanced' where applicable
- Tunes hyperparameters with 5-fold Stratified CV (scoring=ROC-AUC)
- Reports Accuracy, F1, ROC-AUC, PR-AUC; saves summary + plots

Usage:
    python bank_benchmark.py
Optional:
    python bank_benchmark.py --path /mnt/data/bank-additional-full.csv
    python bank_benchmark.py --test-size 0.2 --random-state 42
"""

from __future__ import annotations
import argparse
from pathlib import Path
import sys
import time
import warnings

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.model_selection import (
    train_test_split, GridSearchCV, StratifiedKFold
)
from sklearn.metrics import (
    accuracy_score, f1_score, roc_auc_score, average_precision_score,
    confusion_matrix, classification_report, precision_recall_curve, roc_curve, auc
)

from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC

warnings.filterwarnings("ignore", category=UserWarning)

# -------------------------
# Data utilities
# -------------------------
CANDIDATES = [
    "bank-additional-full.csv",
    "bank-full.csv",
    "bank-additional.csv",
    "bank.csv",
]

def detect_delimiter(path: Path) -> str:
    """Try detecting delimiter (';' or ',') from first line."""
    with path.open("r", encoding="utf-8", errors="ignore") as f:
        head = f.readline()
    if head.count(";") > head.count(","):
        return ";"
    return ","

def find_dataset_path(cli_path: str | None) -> Path:
    if cli_path:
        p = Path(cli_path)
        if p.exists():
            return p
        raise FileNotFoundError(f"Provided path not found: {cli_path}")
    data_root = Path("/mnt/data")
    for name in CANDIDATES:
        p = data_root / name
        if p.exists():
            return p
    # fallback: any csv in /mnt/data
    for p in data_root.glob("*.csv"):
        return p
    raise FileNotFoundError("No CSV dataset found. Provide --path explicitly.")

def load_bank_dataset(path: Path) -> pd.DataFrame:
    sep = detect_delimiter(path)
    df = pd.read_csv(path, sep=sep)
    if "y" not in df.columns:
        raise ValueError("Expected target column 'y' not found in dataset.")
    # Map y: {yes,no}->{1,0}
    df["y"] = df["y"].astype(str).str.lower().eq("yes").astype(int)
    # Remove 'duration' (leakage per authors)
    if "duration" in df.columns:
        df = df.drop(columns=["duration"])
    return df

# -------------------------
# Modeling
# -------------------------
def build_preprocessor(X: pd.DataFrame) -> ColumnTransformer:
    num_cols = X.select_dtypes(include=["number", "float", "int", "bool"]).columns.tolist()
    cat_cols = [c for c in X.columns if c not in num_cols]
    pre = ColumnTransformer(
        transformers=[
            ("num", StandardScaler(with_mean=False), num_cols),   # sparse-safe
            ("cat", OneHotEncoder(handle_unknown="ignore"), cat_cols),
        ]
    )
    return pre, num_cols, cat_cols

def bench_model(name, estimator, grid, pre, X_train, y_train, X_test, y_test):
    """Fit + CV tune; return metrics and artifacts."""
    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    pipe = Pipeline([("prep", pre), ("clf", estimator)])
    gs = GridSearchCV(
        pipe, param_grid=grid, cv=cv, scoring="roc_auc",
        n_jobs=-1, refit=True
    )
    t0 = time.perf_counter()
    gs.fit(X_train, y_train)
    t1 = time.perf_counter()

    # Predictions
    y_pred = gs.best_estimator_.predict(X_test)
    y_proba = None
    if hasattr(gs.best_estimator_["clf"], "predict_proba"):
        y_proba = gs.best_estimator_.predict_proba(X_test)[:, 1]

    metrics = {
        "model": name,
        "cv_best_score(ROC-AUC)": float(gs.best_score_),
        "test_acc": float(accuracy_score(y_test, y_pred)),
        "test_f1": float(f1_score(y_test, y_pred)),
        "test_roc_auc": float(roc_auc_score(y_test, y_proba)) if y_proba is not None else np.nan,
        "test_pr_auc": float(average_precision_score(y_test, y_proba)) if y_proba is not None else np.nan,
        "fit_time_s": float(t1 - t0),
        "best_params": gs.best_params_,
        "report": classification_report(y_test, y_pred, digits=3),
        "cm": confusion_matrix(y_test, y_pred).tolist(),
        "has_proba": y_proba is not None,
        "best_estimator": gs.best_estimator_,
    }
    return metrics

def plot_time_vs_auc(summary_df: pd.DataFrame, title: str, outpath: Path):
    plt.figure(figsize=(7, 4.5))
    plt.scatter(summary_df["fit_time_s"], summary_df["test_roc_auc"])
    for _, row in summary_df.iterrows():
        plt.annotate(row["model"], (row["fit_time_s"], row["test_roc_auc"]),
                     xytext=(5, 5), textcoords="offset points")
    plt.xlabel("Fit time (s)")
    plt.ylabel("Test ROC-AUC")
    plt.title(title)
    plt.tight_layout()
    plt.savefig(outpath, dpi=160)
    plt.close()

def plot_curves(best_estimator, X_test, y_test, name: str, outdir: Path):
    if not hasattr(best_estimator["clf"], "predict_proba"):
        return
    proba = best_estimator.predict_proba(X_test)[:, 1]
    # PR curve
    prec, rec, _ = precision_recall_curve(y_test, proba)
    plt.figure(figsize=(6, 4))
    plt.plot(rec, prec)
    plt.xlabel("Recall"); plt.ylabel("Precision"); plt.title(f"PR Curve ({name})")
    plt.tight_layout(); plt.savefig(outdir / "pr_curve.png", dpi=160); plt.close()
    # ROC curve
    fpr, tpr, _ = roc_curve(y_test, proba)
    plt.figure(figsize=(6, 4))
    plt.plot(fpr, tpr, label=f"AUC={auc(fpr, tpr):.3f}")
    plt.plot([0,1], [0,1], "--", alpha=0.5)
    plt.xlabel("FPR"); plt.ylabel("TPR"); plt.title(f"ROC Curve ({name})")
    plt.legend(); plt.tight_layout(); plt.savefig(outdir / "roc_curve.png", dpi=160); plt.close()

# -------------------------
# Main
# -------------------------
def parse_args():
    p = argparse.ArgumentParser(description="Bank Marketing Benchmark (LR, Tree, KNN, SVM-RBF)")
    p.add_argument("--path", type=str, default=None, help="Path to bank dataset CSV")
    p.add_argument("--test-size", type=float, default=0.2, help="Test set size fraction")
    p.add_argument("--random-state", type=int, default=42, help="Random state for reproducibility")
    p.add_argument("--outdir", type=str, default="bank_benchmark_outputs", help="Output directory")
    return p.parse_args()

def main():
    args = parse_args()
    outdir = Path(args.outdir)
    outdir.mkdir(parents=True, exist_ok=True)

    data_path = find_dataset_path(args.path)
    print(f"[INFO] Using dataset: {data_path}")

    df = load_bank_dataset(data_path)
    y = df["y"].values
    X = df.drop(columns=["y"])

    print(f"[INFO] Dataset shape: {df.shape} | Positive rate: {y.mean():.3f}")

    pre, num_cols, cat_cols = build_preprocessor(X)

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=args.test_size, stratify=y, random_state=args.random_state
    )

    # Define models and grids
    models = [
        ("LogisticRegression",
         LogisticRegression(max_iter=1000, class_weight="balanced"),
         {"clf__C": [0.1, 1, 10, 100]}),

        ("DecisionTree",
         DecisionTreeClassifier(random_state=42, class_weight="balanced"),
         {"clf__max_depth": [None, 6, 10, 14],
          "clf__min_samples_leaf": [1, 5, 10]}),

        ("KNN",
         KNeighborsClassifier(),
         {"clf__n_neighbors": [5, 9, 15, 25],
          "clf__weights": ["uniform", "distance"]}),

        ("SVC(RBF)",
         SVC(kernel="rbf", class_weight="balanced", probability=True),
         {"clf__C": [0.5, 1, 5, 10],
          "clf__gamma": [0.001, 0.01, 0.1]}),
    ]

    results = []
    for name, est, grid in models:
        print(f"[INFO] Tuning {name} ...")
        res = bench_model(name, est, grid, pre, X_train, y_train, X_test, y_test)
        results.append(res)
        print(f"[DONE] {name}: "
              f"ROC-AUC={res['test_roc_auc']:.3f}, PR-AUC={res['test_pr_auc']:.3f}, "
              f"ACC={res['test_acc']:.3f}, F1={res['test_f1']:.3f}, "
              f"time={res['fit_time_s']:.2f}s")

    # Summary table
    summary = pd.DataFrame([
        {k: v for k, v in r.items()
         if k in ["model", "cv_best_score(ROC-AUC)", "test_roc_auc", "test_pr_auc",
                  "test_acc", "test_f1", "fit_time_s", "best_params"]}
        for r in results
    ]).sort_values(["test_roc_auc", "test_pr_auc"], ascending=False).reset_index(drop=True)

    summary_path = outdir / "summary.csv"
    summary.to_csv(summary_path, index=False)
    print(f"\n[INFO] Saved summary -> {summary_path}\n")
    print(summary)

    # Best model details
    best_name = summary.iloc[0]["model"]
    best = next(r for r in results if r["model"] == best_name)
    print("\n[BEST MODEL]", best_name)
    print("Best params:", best["best_params"])
    print("Confusion matrix:\n", np.array(best["cm"]))
    print(best["report"])

    # Plots
    plot_time_vs_auc(summary, f"Trade-off on {data_path.name}", outdir / "time_vs_roc_auc.png")
    plot_curves(best["best_estimator"], X_test, y_test, best_name, outdir)

    print(f"\n[INFO] Saved plots to: {outdir}/")
    print(" - time_vs_roc_auc.png")
    if best.get("has_proba", False):
        print(" - pr_curve.png")
        print(" - roc_curve.png")

if __name__ == "__main__":
    sys.exit(main())



usage: ipykernel_launcher.py [-h] [--path PATH] [--test-size TEST_SIZE]
                             [--random-state RANDOM_STATE] [--outdir OUTDIR]
ipykernel_launcher.py: error: unrecognized arguments: --f=/Users/pigeoneyevideography/Library/Jupyter/runtime/kernel-v378fb4745a58c9e7730e5df876f3f0ae2690eb613.json


SystemExit: 2