# In [None]:
"""
credit_card_fraud.py
====================
Detect fraudulent credit-card transactions on a highly-imbalanced data set.

Supported workflows
-------------------
1. Supervised classifiers + imbalance handling
   • Logistic Regression          (plain / SMOTE / Random-Under-Sampling)
   • Random Forest                (plain / SMOTE / RUS)

2. Unsupervised anomaly detectors
   • Isolation Forest
   • Local Outlier Factor (LOF)

Quick examples
--------------
# Train Logistic-Regression + SMOTE, evaluate, save model
python credit_card_fraud.py --data data/creditcard.csv --target Class \
       --model lr_smote --test_size 0.2

# Train Isolation Forest (unsupervised) on *all* data
python credit_card_fraud.py --data data/creditcard.csv --model iso_forest

# Predict on one new transaction (JSON), using a saved model
python credit_card_fraud.py --predict sample_tx.json \
       --model_path models/fraud_model.pkl
"""
from __future__ import annotations

import argparse, json, sys
from pathlib import Path
from typing import Tuple, Dict

import joblib
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.metrics import (
    precision_recall_fscore_support,
    roc_auc_score,
    average_precision_score,
)
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler
from imblearn.pipeline import Pipeline as ImbPipeline

# Seed passed to the seeded estimators, the resamplers and the train/test
# split in this file so that repeated runs are reproducible.
RANDOM_STATE = 42
# ---------------------------------------------------------------------
# Utility helpers
# ---------------------------------------------------------------------
def load(path: str | Path) -> pd.DataFrame:
    """Read the CSV at ``path`` into a DataFrame with pandas' default parsing."""
    frame = pd.read_csv(path)
    return frame

def split_X_y(df: pd.DataFrame, target: str | None) -> Tuple[pd.DataFrame, pd.Series | None]:
    """Separate the feature columns from the label column.

    Parameters
    ----------
    df : pd.DataFrame
        Full data set.
    target : str or None
        Name of the label column, or ``None`` when no label is wanted.

    Returns
    -------
    tuple
        ``(features, labels)``. When ``target`` is ``None`` the input frame is
        returned as-is and ``labels`` is ``None``.

    Raises
    ------
    ValueError
        If ``target`` is given but is not a column of ``df``.
    """
    if target is not None:
        if target in df.columns:
            features = df.drop(columns=[target])
            labels = df[target]
            return features, labels
        raise ValueError(f"Target column '{target}' not found")
    # No label requested: hand the frame back untouched.
    return df, None

def num_preprocessor(cols) -> ColumnTransformer:
    """Build the numeric preprocessing stage for the given columns.

    The listed columns are median-imputed and then standardised; the two
    steps are chained in a scikit-learn ``Pipeline`` that is registered in the
    returned ``ColumnTransformer`` under the name ``"num"``.
    """
    impute_then_scale = Pipeline(
        steps=[
            ("imp", SimpleImputer(strategy="median")),
            ("sc", StandardScaler()),
        ]
    )
    transformers = [("num", impute_then_scale, cols)]
    return ColumnTransformer(transformers)

def get_supervised(name: str):
    """Return an unfitted supervised base classifier for a short model key.

    Parameters
    ----------
    name : str
        Model key, matched case-insensitively: ``"lr"`` or ``"rf"``.

    Returns
    -------
    estimator
        ``LogisticRegression`` (``max_iter=1000``) for ``"lr"`` or
        ``RandomForestClassifier`` (500 trees) for ``"rf"``; both are created
        with ``n_jobs=-1`` and seeded with ``RANDOM_STATE``.

    Raises
    ------
    ValueError
        If ``name`` is not a known key. The message names the rejected key
        and the accepted ones so a mistyped ``--model`` is easy to diagnose.
    """
    key = name.lower()
    if key == "lr":
        return LogisticRegression(max_iter=1000, n_jobs=-1, random_state=RANDOM_STATE)
    if key == "rf":
        return RandomForestClassifier(
            n_estimators=500, n_jobs=-1, random_state=RANDOM_STATE
        )
    # Keep the original message prefix so existing matches on it still work.
    raise ValueError(
        f"Unknown supervised base model '{name}' (expected 'lr' or 'rf')"
    )

def build_pipeline(key: str, X: pd.DataFrame):
    """Build the unfitted estimator or pipeline selected by a model key.

    Parameters
    ----------
    key : str
        ``"iso_forest"`` or ``"lof"`` (matched exactly) for an unsupervised
        detector, or a supervised base key (``"lr"`` / ``"rf"``) optionally
        followed by ``"_smote"`` or ``"_rus"``. Supervised keys are matched
        case-insensitively, in line with ``get_supervised``.
    X : pd.DataFrame
        Feature frame; all of its columns are routed through the numeric
        preprocessor.

    Returns
    -------
    estimator
        A bare ``IsolationForest`` / ``LocalOutlierFactor``, or an imblearn
        pipeline of preprocessing -> optional resampler -> classifier.

    Raises
    ------
    ValueError
        Propagated from ``get_supervised`` when the base key is unknown.
    """
    # Unsupervised detectors are returned bare (no imputation or scaling step).
    if key == "iso_forest":
        return IsolationForest(contamination=0.001, random_state=RANDOM_STATE)
    if key == "lof":
        # novelty=True lets the fitted detector score records it was not fit on.
        return LocalOutlierFactor(n_neighbors=20, novelty=True)

    # Supervised + imbalance strategies.
    # Strip only the trailing suffix: str.replace would remove every
    # occurrence and silently accept malformed keys such as "rf_rus_rus".
    base_key = key.lower()
    imb = None
    for suffix in ("_smote", "_rus"):
        if base_key.endswith(suffix):
            base_key = base_key[: -len(suffix)]
            imb = suffix[1:]
            break

    clf = get_supervised(base_key)
    pre = num_preprocessor(X.columns.tolist())

    steps = [("pre", pre)]
    if imb == "smote":
        steps.append(("smote", SMOTE(random_state=RANDOM_STATE)))
    elif imb == "rus":
        steps.append(("rus", RandomUnderSampler(random_state=RANDOM_STATE)))
    steps.append((base_key, clf))
    return ImbPipeline(steps)

def metrics(y_true, y_pred, y_prob) -> Dict[str, float]:
    """Compute the evaluation scores for one set of predictions.

    Parameters
    ----------
    y_true, y_pred
        True and predicted labels; precision, recall and F1 are computed with
        ``average="binary"`` and ``zero_division=0``.
    y_prob
        Scores for the positive class, or ``None``. When ``None`` the two
        ranking metrics are reported as NaN.

    Returns
    -------
    dict
        Keys ``precision``, ``recall``, ``f1``, ``roc_auc`` and ``pr_auc``.
    """
    prec, rec, f_score, _support = precision_recall_fscore_support(
        y_true, y_pred, average="binary", zero_division=0
    )

    if y_prob is None:
        # Without scores the ranking metrics cannot be computed.
        auc_roc = np.nan
        auc_pr = np.nan
    else:
        auc_roc = roc_auc_score(y_true, y_prob)
        auc_pr = average_precision_score(y_true, y_prob)

    return {
        "precision": prec,
        "recall": rec,
        "f1": f_score,
        "roc_auc": auc_roc,
        "pr_auc": auc_pr,
    }

# ---------------------------------------------------------------------
# CLI main
# ---------------------------------------------------------------------
def main():
    """Command-line entry point: score one record, or train and save a model.

    Inference mode (``--predict`` with ``--model_path``) loads the saved
    model, scores the single JSON record, prints the result and exits with
    status 0. ``--predict`` without ``--model_path`` is rejected as a usage
    error instead of silently falling through to training.

    Otherwise the CSV given by ``--data`` is loaded and the model selected by
    ``--model`` is built. Unsupervised detectors are fit on every row;
    supervised pipelines are fit on a stratified train split and evaluated on
    the hold-out split. The fitted model is written to
    ``<save_dir>/fraud_model.pkl``.
    """
    p = argparse.ArgumentParser(description="Credit-Card Fraud Detection")
    p.add_argument("--data", type=str, help="CSV dataset path")
    p.add_argument(
        "--target",
        type=str,
        default="Class",
        help="Fraud label column (1=fraud, 0=legit). Omit for unsupervised",
    )
    p.add_argument(
        "--model",
        # Lower-case the key once here so the unsupervised check below and the
        # pipeline builder agree on case.
        type=str.lower,
        default="lr_smote",
        help="lr, rf, lr_smote, lr_rus, rf_smote, rf_rus, iso_forest, lof",
    )
    p.add_argument("--test_size", type=float, default=0.2, help="Hold-out fraction")
    p.add_argument("--save_dir", type=str, default="models")
    p.add_argument("--predict", type=str, help="JSON file for single inference")
    p.add_argument("--model_path", type=str, help="Saved model for --predict")
    args = p.parse_args()

    # Inference-only path
    if args.predict:
        if not args.model_path:
            p.error("--predict requires --model_path")
        # NOTE: joblib.load unpickles arbitrary objects and can execute code;
        # only point --model_path at model files from a trusted source.
        model = joblib.load(args.model_path)
        # Context manager so the JSON file handle is closed deterministically.
        with open(args.predict, encoding="utf-8") as fh:
            record = pd.DataFrame([json.load(fh)])
        if hasattr(model, "predict_proba"):
            prob = model.predict_proba(record)[0][1]
            print(f"Fraud probability: {prob:.4f}")
        else:
            # Negated so that larger printed values mean "more anomalous".
            score = -model.decision_function(record)[0]
            print(f"Anomaly score: {score:.4f}  (higher = more anomalous)")
        sys.exit(0)

    # -----------------------------------------------------------------
    # Training / evaluation path
    # -----------------------------------------------------------------
    if not args.data:
        p.error("--data is required for training")

    df = load(args.data)
    if args.model in {"iso_forest", "lof"}:
        X, y = split_X_y(df, None)
        # Keep the label out of the detector's features: leaving it in leaks
        # the answer and makes the saved model expect a label at inference.
        if args.target in X.columns:
            X = X.drop(columns=[args.target])
    else:
        X, y = split_X_y(df, args.target)

    model = build_pipeline(args.model, X)

    if y is None:
        # Unsupervised: fit on entire data
        model.fit(X)
    else:
        X_train, X_test, y_train, y_test = train_test_split(
            X,
            y,
            test_size=args.test_size,
            stratify=y,
            random_state=RANDOM_STATE,
        )
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        y_prob = (
            model.predict_proba(X_test)[:, 1] if hasattr(model, "predict_proba") else None
        )
        m = metrics(y_test, y_pred, y_prob)
        print(
            "Precision: {precision:.3f}  Recall: {recall:.3f}  "
            "F1: {f1:.3f}  ROC_AUC: {roc_auc:.3f}  PR_AUC: {pr_auc:.3f}".format(**m)
        )

    # Save model
    Path(args.save_dir).mkdir(parents=True, exist_ok=True)
    fp = Path(args.save_dir) / "fraud_model.pkl"
    joblib.dump(model, fp)
    print(f"Model saved ➜ {fp.resolve()}")

# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
