<a href="https://colab.research.google.com/github/Joel-Mk/Fraud-Detection/blob/main/Fraud_Detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:


import os
import math
import warnings
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.metrics import (
    roc_auc_score, average_precision_score, precision_recall_curve,
    roc_curve, classification_report, confusion_matrix
)
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC, OneClassSVM
from sklearn.calibration import CalibratedClassifierCV

import matplotlib.pyplot as plt



In [None]:
# imbalanced-learn is an optional dependency: SMOTE oversampling is only
# offered when it can be imported. Catch ImportError specifically so that
# KeyboardInterrupt / SystemExit are never swallowed by this probe.
try:
    from imblearn.over_sampling import SMOTE
    IMBLEARN_AVAILABLE = True
except ImportError:
    IMBLEARN_AVAILABLE = False


def load_creditcard_csv(path):
    """
    Load a labelled fraud dataset from a CSV file.

    Expects Kaggle Credit Card Fraud (2013) format with columns:
    V1..V28 (numeric PCA comps), 'Time', 'Amount', 'Class' (0/1).
    Only the presence of 'Class' is actually checked here.

    Parameters:
        path: anything accepted by ``pandas.read_csv``.

    Returns:
        (X, y): X is the DataFrame without the 'Class' column, y is the
        'Class' column cast to int.

    Raises:
        ValueError: if the CSV has no 'Class' column.
    """
    df = pd.read_csv(path)
    # Explicit raise instead of `assert`: asserts are stripped under `python -O`,
    # which would turn a clear validation error into a late KeyError.
    if 'Class' not in df.columns:
        raise ValueError("CSV must contain 'Class' as target")
    y = df['Class'].astype(int)
    X = df.drop(columns=['Class'])
    return X, y


In [None]:
def make_synthetic(n=100_000, fraud_rate=0.005, random_state=42):
    """
    Generate a reproducible, heavily imbalanced toy fraud dataset.

    Mixes numeric and simple categorical features. Rows are labelled fraud
    either by a Bernoulli draw with probability `fraud_rate` or by landing in
    the top 0.5% of a noisy risk score, so the features carry real signal.

    Returns (DataFrame of features, Series named 'Class' with 0/1 labels).
    """
    gen = np.random.default_rng(random_state)

    # NOTE: the order of the random draws below determines the output for a
    # given seed; do not reorder them.

    # --- numeric features ---
    base_amount = np.abs(gen.normal(50, 30, n))   # skew-ish amounts
    hour = gen.integers(0, 24, n)                 # hour of day, 0..23
    tenure_days = gen.exponential(90, n)

    # --- categorical features ---
    countries = gen.choice(
        ['US', 'IN', 'GB', 'DE', 'BR', 'ZA', 'SG'],
        size=n,
        p=[0.35, 0.25, 0.1, 0.1, 0.1, 0.05, 0.05],
    )
    channels = gen.choice(['web', 'app', 'ivr'], size=n, p=[0.6, 0.35, 0.05])
    devices = gen.choice(['ios', 'android', 'desktop'], size=n, p=[0.35, 0.45, 0.2])

    # --- latent fraud flag ---
    seeded_fraud = (gen.random(n) < fraud_rate).astype(int)

    # --- inject signal: fraud rows get a larger amount ---
    amount = base_amount + seeded_fraud * gen.normal(80, 50, n)

    # Binary risk indicators: odd hours, selected countries / channel / device.
    is_odd_hour = np.logical_or(hour < 4, hour > 22).astype(int)
    is_risky_country = np.isin(countries, ['BR', 'SG']).astype(int)
    is_risky_channel = (channels == 'web').astype(int)
    is_risky_device = (devices == 'android').astype(int)

    # --- engineered features ---
    per_day = amount / (1 + tenure_days)
    weighted_signal = (
        0.02 * amount
        + 0.6 * is_odd_hour
        + 0.7 * is_risky_country
        + 0.3 * is_risky_channel
        + 0.3 * is_risky_device
    )
    risk = weighted_signal + gen.normal(0, 0.3, n)

    # Final label: seeded fraud OR strictly above the 99.5th percentile of risk.
    cutoff = np.quantile(risk, 0.995)
    labels = np.logical_or(risk > cutoff, seeded_fraud == 1).astype(int)

    features = {
        'amount': amount,
        'time_of_day': hour,
        'days_since_signup': tenure_days,
        'country': countries,
        'channel': channels,
        'device': devices,
        'amount_per_day': per_day,
        'risk_hint': risk,
    }
    return pd.DataFrame(features), pd.Series(labels, name='Class')


In [None]:
def make_splits(X, y, test_size=0.2, stratified=True, random_state=42):
    """
    Split (X, y) into train and validation parts via `train_test_split`.

    With `stratified=True` the split is stratified on `y`. Otherwise the data
    is split without shuffling (`shuffle=False`), i.e. in its existing order.
    Returns whatever `train_test_split` returns for the two inputs.
    """
    split_options = {"test_size": test_size, "random_state": random_state}
    if stratified:
        split_options["stratify"] = y
    else:
        # fall back to simple split without stratification
        split_options["shuffle"] = False
    return train_test_split(X, y, **split_options)

In [None]:
def build_preprocessor(X):
    """
    Build an (unfitted) preprocessor for the columns of `X`.

    `X` is only inspected for dtypes: numeric columns are scaled, all other
    columns are one-hot encoded. The returned object exposes `fit`,
    `transform` and `fit_transform`.
    """
    num_cols = X.select_dtypes(include=[np.number]).columns.tolist()
    cat_cols = X.select_dtypes(exclude=[np.number]).columns.tolist()

    # Lightweight preprocessor with two steps:
    # 1) pandas.get_dummies for categoricals
    # 2) StandardScaler (no centering) for the original numeric columns
    class PandasDummiesScaler:
        """One-hot encode `cat_cols` and scale `num_cols`, returning a float array."""

        def __init__(self, num_cols, cat_cols):
            self.num_cols = num_cols
            self.cat_cols = cat_cols
            self.columns_ = None  # output column layout, learned in fit()
            self.scaler_ = StandardScaler(with_mean=False)  # scale only, no centering

        def fit(self, X, y=None):
            """Learn the output column layout and the numeric scaling from `X`."""
            encoded = pd.get_dummies(X, columns=self.cat_cols, drop_first=True)
            self.columns_ = encoded.columns
            if self.num_cols:
                self.scaler_.fit(encoded[self.num_cols])
            return self

        def transform(self, X):
            """Encode and scale `X` using the layout learned in `fit`."""
            # Encode WITHOUT drop_first: the baseline level to drop was decided at
            # fit time. Re-applying drop_first here would drop whichever level
            # sorts first in *this* batch, silently encoding it as the baseline
            # whenever the batch lacks the training baseline category.
            encoded = pd.get_dummies(X, columns=self.cat_cols, dtype=float)
            # Align to the fitted layout: levels missing from this batch become 0,
            # the fitted baseline level and unseen levels are dropped.
            encoded = encoded.reindex(columns=self.columns_, fill_value=0.0).astype(float)
            if self.num_cols:
                encoded[self.num_cols] = self.scaler_.transform(encoded[self.num_cols])
            return encoded.to_numpy()

        def fit_transform(self, X, y=None):
            """Fit on `X`, then return the transformed `X`."""
            return self.fit(X, y).transform(X)

    return PandasDummiesScaler(num_cols, cat_cols)

In [None]:
def build_logistic(C=1.0, class_weight="balanced", max_iter=2000):
    """Return an unfitted LogisticRegression using the liblinear solver."""
    settings = {
        "solver": "liblinear",  # robust for small/medium data and class_weight
        "C": C,
        "class_weight": class_weight,
        "max_iter": max_iter,
        "n_jobs": None,
    }
    return LogisticRegression(**settings)

def build_linear_svm(C=1.0, class_weight="balanced"):
    """
    Return an unfitted LinearSVC wrapped in sigmoid calibration (3-fold CV).

    The wrapper supplies `predict_proba`, which is needed for PR/ROC scoring.
    """
    return CalibratedClassifierCV(
        LinearSVC(C=C, class_weight=class_weight),
        method="sigmoid",
        cv=3,
    )

def build_oneclass_svm(nu=0.01, gamma='scale'):
    """
    Return an unfitted OneClassSVM with the given `nu` and `gamma`.

    Intended as an unsupervised anomaly detector whose scores are thresholded
    by the caller.
    """
    detector = OneClassSVM(gamma=gamma, nu=nu)
    return detector


In [None]:
def maybe_smote(X, y, use_smote=True, random_state=42):
    """
    Oversample the minority class with SMOTE when requested and available.

    Returns (X, y) untouched if `use_smote` is falsy or imblearn could not be
    imported; otherwise returns the resampled pair.
    """
    if not (use_smote and IMBLEARN_AVAILABLE):
        return X, y
    # sampling_strategy=0.1: oversample minority to 10%
    sampler = SMOTE(random_state=random_state, sampling_strategy=0.1)
    resampled_X, resampled_y = sampler.fit_resample(X, y)
    return resampled_X, resampled_y


In [None]:
def fit_predict(model, preprocessor, X_train, y_train, X_valid):
    """
    Fit the preprocessor and model on the training data and score `X_valid`.

    Scores come from, in order of preference:
      * `predict_proba` (column 1),
      * `decision_function`, min-max scaled to 0-1 over the validation batch,
      * `predict` as a last resort.
    """
    preprocessor.fit(X_train, y_train)
    train_matrix = preprocessor.transform(X_train)
    valid_matrix = preprocessor.transform(X_valid)
    model.fit(train_matrix, y_train)

    if hasattr(model, "predict_proba"):
        return model.predict_proba(valid_matrix)[:, 1]

    if not hasattr(model, "decision_function"):
        # fallback: hard predictions
        return model.predict(valid_matrix)

    # Scale raw margins to 0-1 via min-max for comparability
    raw = model.decision_function(valid_matrix)
    lowest = raw.min()
    span = raw.max() - raw.min() + 1e-9
    return (raw - lowest) / span

In [None]:
def evaluate(y_true, y_score, threshold=None, label="Model"):
    """
    Print ranking and classification metrics for a set of scores.

    Parameters:
        y_true: ground-truth binary labels.
        y_score: scores where higher means more likely positive.
        threshold: decision threshold; when None, the threshold that
            maximizes F2 on the given data is chosen.
        label: name printed in the report header.

    Returns:
        The threshold that was used (given or chosen).
    """
    roc = roc_auc_score(y_true, y_score)
    pr_auc = average_precision_score(y_true, y_score)
    print(f"== {label} ==")
    print(f"ROC-AUC: {roc:.4f} | PR-AUC: {pr_auc:.4f}")

    if threshold is None:
        # Choose threshold that maximizes F2 on validation
        precision, recall, thr = precision_recall_curve(y_true, y_score)
        beta = 2.0
        f2 = (1 + beta**2) * (precision * recall) / (beta**2 * precision + recall + 1e-12)
        best_idx = int(np.nanargmax(f2))
        # precision[i] / recall[i] describe predictions `score >= thr[i]`, so the
        # F2-optimal threshold is thr[best_idx] itself. Only the final
        # (precision=1, recall=0) point has no threshold; fall back to 0.5 there.
        threshold = float(thr[best_idx]) if best_idx < len(thr) else 0.5
        print(f"Chosen threshold (F2-optimal): {threshold:.4f}")

    # np.asarray so plain lists are accepted as scores too
    y_pred = (np.asarray(y_score) >= threshold).astype(int)
    print("\nClassification report:")
    print(classification_report(y_true, y_pred, digits=4))
    print("Confusion matrix:")
    print(confusion_matrix(y_true, y_pred))
    return threshold


In [None]:
def plot_curves(y_true, y_score, title_suffix=""):
    """
    Show two matplotlib figures for the given scores: the ROC curve (with a
    dashed diagonal reference line) followed by the precision-recall curve.
    `title_suffix` is appended to both figure titles.
    """

    def _label_and_show(x_label, y_label, title):
        # Shared finishing steps for both figures.
        plt.xlabel(x_label)
        plt.ylabel(y_label)
        plt.title(title)
        plt.legend()
        plt.show()

    # ROC
    false_pos_rate, true_pos_rate, _ = roc_curve(y_true, y_score)
    plt.figure(figsize=(6, 4))
    plt.plot(false_pos_rate, true_pos_rate, label="ROC")
    plt.plot([0, 1], [0, 1], linestyle="--")
    _label_and_show("False Positive Rate", "True Positive Rate", f"ROC Curve {title_suffix}")

    # PR
    prec, rec, _ = precision_recall_curve(y_true, y_score)
    plt.figure(figsize=(6, 4))
    plt.plot(rec, prec, label="PR")
    _label_and_show("Recall", "Precision", f"Precision-Recall Curve {title_suffix}")

In [None]:
def run_pipeline(
    data_source="synthetic",
    data_path=None,
    test_size=0.2,
    use_smote=False,
    model_name="logreg",
    random_state=42
):
    """
    Train and evaluate one fraud model end to end.

    Parameters:
        data_source: "csv" loads `data_path`; anything else uses synthetic data.
        data_path: path to the CSV file (required in "csv" mode).
        test_size: fraction of rows held out for validation.
        use_smote: oversample the training set with SMOTE (supervised models
            only; skipped with a message when imblearn is not installed).
        model_name: "logreg" | "linsvm" | "oneclass".
        random_state: seed for data generation, splitting and SMOTE.

    Returns:
        dict with "model", "preprocessor" and "threshold". For "oneclass" the
        dict additionally carries "score_min" / "score_max", the range of the
        validation anomaly scores that the threshold was chosen on, so the
        same normalization can be reproduced at inference time.

    Raises:
        ValueError: unknown `model_name`, or missing/invalid `data_path` in
            "csv" mode.
    """
    # Fail fast, before any data is generated or loaded.
    if model_name not in ("logreg", "linsvm", "oneclass"):
        raise ValueError("Unknown model_name")

    # 1) Load data
    if data_source == "csv":
        # Explicit raise rather than `assert`, which is stripped under `python -O`.
        if not data_path or not os.path.exists(data_path):
            raise ValueError("Provide valid data_path for CSV mode")
        X, y = load_creditcard_csv(data_path)
    else:
        X, y = make_synthetic(n=120_000, fraud_rate=0.006, random_state=random_state)

    # 2) Split
    X_train, X_valid, y_train, y_valid = make_splits(
        X, y, test_size=test_size, stratified=True, random_state=random_state
    )

    # 3) Preprocessor (X is only inspected for column dtypes here)
    pre = build_preprocessor(X)

    # 4a) One-class baseline: trained on the "normal" class only
    if model_name == "oneclass":
        normal_mask = (y_train == 0)
        model = build_oneclass_svm(nu=0.01)
        pre.fit(X_train[normal_mask], y_train[normal_mask])
        model.fit(pre.transform(X_train[normal_mask]))
        # Negate so that higher = more anomalous
        scores = -model.decision_function(pre.transform(X_valid))
        score_min, score_max = float(scores.min()), float(scores.max())
        y_score = (scores - score_min) / (score_max - score_min + 1e-9)
        print("== One-Class SVM (unsupervised baseline) ==")
        threshold = evaluate(y_valid, y_score, threshold=None, label="OneClassSVM")
        plot_curves(y_valid, y_score, title_suffix="(OneClassSVM)")
        return {
            "model": model,
            "preprocessor": pre,
            "threshold": threshold,
            "score_min": score_min,
            "score_max": score_max,
        }

    # 4b) Supervised models
    if model_name == "logreg":
        model = build_logistic(C=1.0, class_weight="balanced")
    else:
        model = build_linear_svm(C=1.0, class_weight="balanced")

    # 5) Fit and score; optional SMOTE on the training split only
    if use_smote and not IMBLEARN_AVAILABLE:
        print("imblearn not available; proceeding without SMOTE.")

    if use_smote and IMBLEARN_AVAILABLE:
        pre.fit(X_train, y_train)
        Xtr = np.asarray(pre.transform(X_train), dtype=float)
        X_res, y_res = maybe_smote(Xtr, y_train, use_smote=True, random_state=random_state)
        model.fit(X_res, y_res)
        # Score with the model trained on the resampled data. Do NOT go through
        # fit_predict here: it would refit on the original, un-resampled split
        # and throw the SMOTE fit away.
        Xva = np.asarray(pre.transform(X_valid), dtype=float)
        y_score = model.predict_proba(Xva)[:, 1]
    else:
        y_score = fit_predict(model, pre, X_train, y_train, X_valid)

    # 6) Evaluate + plots
    label = "LogisticRegression" if model_name == "logreg" else "LinearSVM (calibrated)"
    threshold = evaluate(y_valid, y_score, threshold=None, label=label)
    plot_curves(y_valid, y_score, title_suffix=f"({label})")

    return {
        "model": model,
        "preprocessor": pre,
        "threshold": threshold
    }

In [None]:
def predict_proba_pipeline(pipeline_dict, X_new_df):
    """
    Score new rows with a trained pipeline.

    Parameters:
        pipeline_dict: dict with "model", "preprocessor" and "threshold".
            If it also contains "score_min" / "score_max", those are used to
            normalize decision-function scores instead of the range of the
            current batch.
        X_new_df: rows to score, in the format the preprocessor expects.

    Returns:
        (p, pred): fraud scores (higher = more likely fraud) and the binary
        predictions `p >= threshold`.
    """
    model = pipeline_dict["model"]
    pre = pipeline_dict["preprocessor"]
    thr = pipeline_dict["threshold"]

    Xn = pre.transform(X_new_df)
    if hasattr(model, "predict_proba"):
        p = model.predict_proba(Xn)[:, 1]
    elif hasattr(model, "decision_function"):
        scores = model.decision_function(Xn)
        if isinstance(model, OneClassSVM):
            # OneClassSVM scores inliers positive and outliers negative; flip so
            # that higher = more anomalous, matching how the threshold was chosen.
            scores = -scores
        # NOTE: without a stored range the scaling is relative to this batch only,
        # so a threshold chosen on another batch is not strictly comparable.
        low = pipeline_dict.get("score_min")
        high = pipeline_dict.get("score_max")
        if low is None or high is None:
            low, high = scores.min(), scores.max()
        p = (scores - low) / (high - low + 1e-9)
    else:
        p = model.predict(Xn)

    pred = (p >= thr).astype(int)
    return p, pred


In [None]:
# Demo entry point: train one model, then score a handful of fresh rows.
if __name__ == "__main__":
    # Option A: CSV (Kaggle creditcard.csv) -- uncomment to train on a local file
    # PIPE = run_pipeline(data_source="csv", data_path="creditcard.csv", model_name="logreg", use_smote=False)

    # Option B: Synthetic (works out of the box)
    # PIPE holds the dict returned by run_pipeline (model, preprocessor, threshold).
    PIPE = run_pipeline(data_source="synthetic", model_name="logreg", use_smote=False)

    # Example: inference on 5 new samples (synthetic)
    # The generated labels are discarded; only the features are scored.
    X_new, _ = make_synthetic(n=5, fraud_rate=0.2, random_state=7)  # higher fraud-rate for demo
    probs, preds = predict_proba_pipeline(PIPE, X_new)
    print("\nSample inference:")
    # One row per new sample: its fraud score and the thresholded 0/1 prediction.
    print(pd.DataFrame({"prob_fraud": probs, "pred": preds}))