
# RAW → PROCESSED Pipeline
This notebook reads the raw banking transaction dataset and produces the following files:

- cleaned_transactions.csv
- transactions_feature_engineered.csv
- customer_rfm_summary.csv
- customer_features_for_model.csv
- fraud_model_dataset.csv
- churn_model_dataset.csv
- segmentation_dataset.csv


## Libraries


In [6]:
import os
from pathlib import Path
from typing import List, Tuple
import numpy as np
import pandas as pd

## Paths

In [7]:
# Directory layout, expressed relative to the notebook's working directory.
PROJECT_ROOT = Path("../")
DATA_RAW_DIR = PROJECT_ROOT.joinpath("data", "raw")
DATA_PROCESSED_DIR = PROJECT_ROOT.joinpath("data", "processed")

# Point this at your raw file if it is named differently.
RAW_FILE = DATA_RAW_DIR.joinpath("banking_transactions_2023_2024.csv")

# Create the output directory up front (no-op when it already exists).
DATA_PROCESSED_DIR.mkdir(parents=True, exist_ok=True)

# Echo the resolved absolute locations so a wrong working directory is obvious.
print(
    "\n".join(
        f"{label} {location.resolve()}"
        for label, location in (
            ("RAW DIR      :", DATA_RAW_DIR),
            ("PROCESSED DIR:", DATA_PROCESSED_DIR),
            ("RAW FILE     :", RAW_FILE),
        )
    )
)

RAW DIR      : /Users/goureshmadye/Documents/DataAnalyst/Integrated-Banking-Insights-Risk-Customer-Analytics/data/raw
PROCESSED DIR: /Users/goureshmadye/Documents/DataAnalyst/Integrated-Banking-Insights-Risk-Customer-Analytics/data/processed
RAW FILE     : /Users/goureshmadye/Documents/DataAnalyst/Integrated-Banking-Insights-Risk-Customer-Analytics/data/raw/banking_transactions_2023_2024.csv


## Helper Functions

In [8]:
def load_raw_transactions(raw_file: Path) -> pd.DataFrame:
    """Read the raw transaction CSV into a DataFrame and report its size.

    Args:
        raw_file: Location of the CSV file, passed straight to ``pd.read_csv``.

    Returns:
        The parsed DataFrame, unmodified.
    """
    frame = pd.read_csv(raw_file)
    row_count, column_count = frame.shape
    print(f"Loaded raw data: {row_count:,} rows, {column_count} columns")
    return frame


def standardize_column_names(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` whose column names use underscores as separators.

    Leading/trailing whitespace is stripped, then every space, hyphen and
    forward slash is replaced by an underscore. Letter case is left untouched.

    Args:
        df: Input frame; it is not modified.

    Returns:
        A copy of ``df`` with the cleaned column labels.
    """
    df = df.copy()
    # regex=False makes each replacement a literal substitution regardless of
    # the pandas version's default for ``regex``.
    df.columns = (
        df.columns
        .str.strip()
        .str.replace(" ", "_", regex=False)
        .str.replace("-", "_", regex=False)
        .str.replace("/", "_", regex=False)
    )
    return df


# Case-folded, whitespace-stripped spellings treated as a positive fraud flag.
_FRAUD_TRUE_TOKENS = frozenset({"yes", "y", "true", "t", "1"})


def _fraud_flag_to_binary(value: object) -> int:
    """Map one raw Fraud_Flag cell to 1 (flagged) or 0 (anything else)."""
    if isinstance(value, str):
        return int(value.strip().lower() in _FRAUD_TRUE_TOKENS)
    if isinstance(value, (bool, np.bool_)):
        return int(value)
    if isinstance(value, (int, float, np.integer, np.floating)):
        # NaN compares unequal to 1, so missing numeric cells fall through to 0.
        return int(value == 1)
    return 0


def convert_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    """Convert common columns to appropriate data types.

    * ``Transaction_Date`` / ``transaction_date`` -> datetime (unparseable -> NaT)
    * known numeric columns -> numeric (unparseable -> NaN)
    * ``Fraud_Flag`` -> integer 0/1

    The fraud flag is matched case-insensitively and ignoring surrounding
    whitespace, so spellings such as "yes", "TRUE", " Y " or "1" count as
    fraud instead of silently becoming 0. Missing or unrecognised values are
    mapped to 0.

    Args:
        df: Input frame; it is not modified.

    Returns:
        A converted copy of ``df``. Columns that are absent are skipped.
    """
    df = df.copy()

    # Date columns
    for col in ("Transaction_Date", "transaction_date"):
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors="coerce")

    # Numeric columns (adjust if your dataset has slightly different names)
    numeric_cols = (
        "Transaction_Amount",
        "Customer_Age",
        "Customer_Income",
        "Account_Balance",
        "Discount_Applied",
        "Loyalty_Points_Earned",
    )
    for col in numeric_cols:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")

    # Fraud flag to binary 0/1 if present
    if "Fraud_Flag" in df.columns:
        df["Fraud_Flag"] = df["Fraud_Flag"].map(_fraud_flag_to_binary).astype(int)

    return df


def handle_missing_values(df: pd.DataFrame) -> pd.DataFrame:
    """Fill missing values column by column.

    - numeric columns -> the column median
    - object / categorical columns -> the literal ``'Unknown'``

    Columns of any other dtype (e.g. datetimes, booleans) are left untouched.

    Args:
        df: Input frame; it is not modified.

    Returns:
        A copy of ``df`` with the gaps filled.
    """
    df = df.copy()

    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    categorical_cols = df.select_dtypes(include=["object", "category"]).columns.tolist()

    # Assign the filled column back instead of calling fillna(inplace=True) on
    # the `df[col]` selection: that chained in-place form does not update `df`
    # when pandas Copy-on-Write is active.
    for col in numeric_cols:
        df[col] = df[col].fillna(df[col].median())

    for col in categorical_cols:
        column = df[col]
        needs_new_category = (
            isinstance(column.dtype, pd.CategoricalDtype)
            and column.isna().any()
            and "Unknown" not in column.cat.categories
        )
        if needs_new_category:
            # A categorical column rejects fill values outside its categories.
            column = column.cat.add_categories(["Unknown"])
        df[col] = column.fillna("Unknown")

    return df


def cap_outliers(
    df: pd.DataFrame,
    cols: List[str],
    lower_quantile: float = 0.01,
    upper_quantile: float = 0.99,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Clip the given columns to their own lower/upper quantiles.

    Args:
        df: Input frame; it is not modified.
        cols: Columns to cap. Names absent from ``df`` are ignored.
        lower_quantile: Quantile used as the floor.
        upper_quantile: Quantile used as the ceiling.

    Returns:
        ``(capped, caps)`` where ``capped`` is the clipped copy of ``df`` and
        ``caps`` has one row per capped column with ``lower`` / ``upper``
        holding the bounds that were applied.
    """
    capped = df.copy()
    bounds = {}

    present = [name for name in cols if name in capped.columns]
    for name in present:
        series = capped[name]
        floor = series.quantile(lower_quantile)
        ceiling = series.quantile(upper_quantile)
        bounds[name] = {"lower": floor, "upper": ceiling}
        capped[name] = series.clip(lower=floor, upper=ceiling)

    return capped, pd.DataFrame(bounds).T


def add_time_features(df: pd.DataFrame) -> pd.DataFrame:
    """Derive calendar features from the ``Transaction_Date`` column.

    Adds ``transaction_year``, ``transaction_month`` (monthly period rendered
    as a string), ``transaction_day``, ``transaction_hour``, ``day_of_week``
    (Monday=0 ... Sunday=6) and ``is_weekend`` (1 for Saturday/Sunday).

    Args:
        df: Input frame; it is not modified.

    Returns:
        A copy of ``df`` with the extra columns, or an unchanged copy (after
        printing a warning) when ``Transaction_Date`` is absent.
    """
    enriched = df.copy()

    if "Transaction_Date" not in enriched.columns:
        print("WARNING: Transaction_Date column not found. Skipping time features.")
        return enriched

    stamps = enriched["Transaction_Date"].dt
    weekday = stamps.dayofweek  # Monday=0, Sunday=6

    enriched["transaction_year"] = stamps.year
    enriched["transaction_month"] = stamps.to_period("M").astype(str)
    enriched["transaction_day"] = stamps.day
    enriched["transaction_hour"] = stamps.hour
    enriched["day_of_week"] = weekday
    enriched["is_weekend"] = weekday.isin([5, 6]).astype(int)

    return enriched


def compute_account_behavior_features(df: pd.DataFrame) -> pd.DataFrame:
    """Aggregate transaction rows into one behavioural feature row per account.

    Output columns (each emitted only when its source column exists):

    - ``txn_count``: non-null ``Transaction_ID`` count, or the group size when
      ``Transaction_ID`` is absent
    - ``total_spend`` / ``avg_spend`` / ``std_spend``: sum, mean and standard
      deviation of ``Transaction_Amount``
    - ``failed_txn_count``: rows whose ``Transaction_Status`` equals "Failed"
    - ``online_txn_count``: rows whose ``Payment_Method`` equals "Online"
    - ``Discount_Applied_mean``, ``Loyalty_Points_Earned_sum``
    - ``online_txn_ratio`` / ``failed_txn_ratio``: counts divided by
      ``txn_count`` (NaN when ``txn_count`` is 0)

    Args:
        df: Transaction-level frame; it is not modified.

    Returns:
        One row per ``Account_Number``, or an empty DataFrame (after printing
        a warning) when ``Account_Number`` is absent.
    """
    df = df.copy()
    if "Account_Number" not in df.columns:
        print("WARNING: Account_Number column not found. Skipping behavior features.")
        return pd.DataFrame()

    # Helper flags; a constant 0 keeps the counts defined when the source
    # column is absent.
    df["is_failed"] = df["Transaction_Status"].eq("Failed") if "Transaction_Status" in df.columns else 0
    df["is_online"] = df["Payment_Method"].eq("Online") if "Payment_Method" in df.columns else 0

    # Named aggregation fixes the output names up front, so they do not depend
    # on which optional columns happen to be present.
    if "Transaction_ID" in df.columns:
        count_spec = ("Transaction_ID", "count")
    else:
        # Fall back to the group size, read off a column that always exists.
        count_spec = ("is_failed", "size")

    named_aggs = {"txn_count": count_spec}
    if "Transaction_Amount" in df.columns:
        named_aggs["total_spend"] = ("Transaction_Amount", "sum")
        named_aggs["avg_spend"] = ("Transaction_Amount", "mean")
        named_aggs["std_spend"] = ("Transaction_Amount", "std")
    named_aggs["failed_txn_count"] = ("is_failed", "sum")
    named_aggs["online_txn_count"] = ("is_online", "sum")
    if "Discount_Applied" in df.columns:
        named_aggs["Discount_Applied_mean"] = ("Discount_Applied", "mean")
    if "Loyalty_Points_Earned" in df.columns:
        named_aggs["Loyalty_Points_Earned_sum"] = ("Loyalty_Points_Earned", "sum")

    acc = df.groupby("Account_Number").agg(**named_aggs)

    # Derive ratios; a zero count becomes NaN rather than dividing by zero.
    denominator = acc["txn_count"].replace(0, np.nan)
    acc["online_txn_ratio"] = acc["online_txn_count"] / denominator
    acc["failed_txn_ratio"] = acc["failed_txn_count"] / denominator

    return acc.reset_index()


def compute_rfm(df: pd.DataFrame) -> pd.DataFrame:
    """Compute RFM metrics per Account_Number.

    - Recency: ``recency_days``, days between the account's last transaction
      and the latest transaction date in ``df``
    - Frequency: ``frequency``, non-null ``Transaction_ID`` count (or non-null
      date count when ``Transaction_ID`` is absent)
    - Monetary: ``monetary`` (sum) and ``avg_monetary`` (mean) of
      ``Transaction_Amount``

    ``customer_age``, ``customer_income`` and ``account_balance`` are added
    only when their source column exists; a missing source column is never
    replaced by an unrelated statistic. ``account_balance`` is the balance on
    the account's most recent dated transaction.

    Args:
        df: Transaction-level frame with a datetime ``Transaction_Date``
            column; it is not modified.

    Returns:
        One row per ``Account_Number``, or an empty DataFrame (after printing
        a warning) when ``Account_Number`` or ``Transaction_Date`` is absent.
    """
    df = df.copy()
    if "Account_Number" not in df.columns:
        print("WARNING: Account_Number column not found. Skipping RFM.")
        return pd.DataFrame()

    date_col = "Transaction_Date"
    if date_col not in df.columns:
        print("WARNING: Transaction_Date column not found. Skipping RFM.")
        return pd.DataFrame()

    max_date = df[date_col].max()
    print("Max transaction date (for recency):", max_date)

    named_aggs = {
        "last_txn_date": (date_col, "max"),
        "frequency": ("Transaction_ID", "count") if "Transaction_ID" in df.columns else (date_col, "count"),
    }
    if "Transaction_Amount" in df.columns:
        named_aggs["monetary"] = ("Transaction_Amount", "sum")
        named_aggs["avg_monetary"] = ("Transaction_Amount", "mean")
    else:
        print("WARNING: Transaction_Amount column not found. Skipping monetary metrics.")
    if "Customer_Age" in df.columns:
        named_aggs["customer_age"] = ("Customer_Age", "max")
    if "Customer_Income" in df.columns:
        named_aggs["customer_income"] = ("Customer_Income", "max")

    rfm = df.groupby("Account_Number").agg(**named_aggs).reset_index()

    if "Account_Balance" in df.columns:
        # "last" follows row order, so order rows by date first. The stable
        # sort keeps file order for equal timestamps, and undated rows go
        # first so they never win over a dated transaction.
        latest_balance = (
            df.sort_values(date_col, kind="mergesort", na_position="first")
            .groupby("Account_Number")["Account_Balance"]
            .last()
        )
        rfm["account_balance"] = rfm["Account_Number"].map(latest_balance)

    rfm["recency_days"] = (max_date - rfm["last_txn_date"]).dt.days

    return rfm


def compute_category_share(df: pd.DataFrame) -> pd.DataFrame:
    """Compute each account's share of spend per Category.

    The result is a wide table with one row per Account_Number and one column
    per category, named like:
    - cat_share_Grocery
    - cat_share_Entertainment
    Each value is that category's summed Transaction_Amount divided by the
    account's total; an account whose total is 0 gets NaN shares.

    Returns an empty DataFrame (after printing a warning) when Account_Number
    or Category is absent.
    """
    required = ("Account_Number", "Category")
    if any(name not in df.columns for name in required):
        print("WARNING: Account_Number or Category column not found. Skipping category share.")
        return pd.DataFrame()

    spend_by_category = df.pivot_table(
        index="Account_Number",
        columns="Category",
        values="Transaction_Amount",
        aggfunc="sum",
        fill_value=0,
    )

    # A zero total becomes NaN so the division yields NaN instead of inf.
    account_totals = spend_by_category.sum(axis=1)
    account_totals = account_totals.replace(0, np.nan)
    shares = spend_by_category.div(account_totals, axis=0)

    shares.columns = ["cat_share_" + str(label) for label in shares.columns]
    return shares.reset_index()


def build_fraud_dataset(df: pd.DataFrame) -> pd.DataFrame:
    """Select the transaction-level rows and columns used for fraud modeling.

    Rows are kept only where ``Fraud_Flag`` is 0 or 1. Columns are restricted
    to a fixed candidate list (in that order), skipping any that are absent.
    Returns an empty DataFrame (after printing a warning) when ``Fraud_Flag``
    is missing.
    """
    if "Fraud_Flag" not in df.columns:
        print("WARNING: Fraud_Flag not found. Fraud dataset may be empty.")
        return pd.DataFrame()

    # Candidate model inputs, with the target last.
    wanted = (
        "Transaction_ID",
        "Account_Number",
        "Transaction_Amount",
        "Transaction_Type",
        "Category",
        "Payment_Method",
        "Transaction_Status",
        "Customer_Age",
        "Customer_Income",
        "Account_Balance",
        "transaction_year",
        "transaction_month",
        "transaction_day",
        "transaction_hour",
        "day_of_week",
        "is_weekend",
        "Fraud_Flag",
    )

    has_known_label = df["Fraud_Flag"].isin([0, 1])
    labelled = df.loc[has_known_label].copy()

    selected = [name for name in wanted if name in labelled.columns]
    return labelled[selected]


def build_churn_dataset(rfm_df: pd.DataFrame, churn_days: int = 90) -> pd.DataFrame:
    """Label each RFM row as churned (1) or active (0).

    An account is churned when its ``recency_days`` is strictly greater than
    ``churn_days``. When ``recency_days`` is absent a warning is printed and
    every row is labelled 0. The input frame is not modified.
    """
    labelled = rfm_df.copy()

    if "recency_days" in labelled.columns:
        is_churned = labelled["recency_days"].gt(churn_days)
        labelled["churn_label"] = is_churned.astype(int)
        return labelled

    print("WARNING: recency_days not in RFM. Cannot build churn dataset correctly.")
    labelled["churn_label"] = 0
    return labelled


def build_segmentation_dataset(
    rfm_df: pd.DataFrame,
    cat_share_df: pd.DataFrame,
) -> pd.DataFrame:
    """
    Assemble the clustering input: RFM metrics, demographics and category shares.

    Category shares are left-joined onto the RFM rows by Account_Number (skipped
    when ``cat_share_df`` is empty). The result keeps the known RFM/demographic
    columns that exist, followed by every ``cat_share_*`` column. An empty
    ``rfm_df`` yields an empty DataFrame.
    """
    if rfm_df.empty:
        return pd.DataFrame()

    if cat_share_df.empty:
        merged = rfm_df.copy()
    else:
        merged = rfm_df.merge(cat_share_df, on="Account_Number", how="left")

    # Core columns used for clustering, in output order.
    base_columns = (
        "Account_Number",
        "recency_days",
        "frequency",
        "monetary",
        "avg_monetary",
        "customer_age",
        "customer_income",
        "account_balance",
    )
    selected = [name for name in base_columns if name in merged.columns]
    selected.extend(name for name in merged.columns if name.startswith("cat_share_"))

    return merged[selected]

## Run Pipeline

In [9]:
def run_pipeline():
    """Run the RAW → PROCESSED pipeline end to end and write every output CSV.

    Reads ``RAW_FILE``, cleans it, derives transaction- and account-level
    features and saves each non-empty result under ``DATA_PROCESSED_DIR``.
    Progress is reported with ``print``; nothing is returned.
    """

    def _save(frame, filename, label, lead=""):
        # Write `frame` under the processed directory and report where it went.
        destination = DATA_PROCESSED_DIR / filename
        frame.to_csv(destination, index=False)
        print(f"{lead}Saved {label} → {destination}")

    # 1-4. Load raw, standardize column names, convert dtypes, fill gaps
    transactions = load_raw_transactions(RAW_FILE)
    transactions = standardize_column_names(transactions)
    transactions = convert_dtypes(transactions)
    transactions = handle_missing_values(transactions)

    # 5. Cap outliers for key numeric columns
    transactions, caps_df = cap_outliers(
        transactions,
        ["Transaction_Amount", "Customer_Income", "Account_Balance"],
    )
    print("\nOutlier caps used:")
    print(caps_df)

    # Save cleaned base
    _save(transactions, "cleaned_transactions.csv", "cleaned transactions", lead="\n")

    # 6. Add time-based features and save the transaction-level table
    featured = add_time_features(transactions)
    _save(featured, "transactions_feature_engineered.csv", "feature-engineered transactions")

    # 7. Account-level behavior features
    acc_behavior = compute_account_behavior_features(featured)
    if acc_behavior.empty:
        print("No customer behavior features computed.")
    else:
        _save(acc_behavior, "customer_features_for_model.csv", "customer behavior features")

    # 8. RFM metrics
    rfm = compute_rfm(featured)
    if rfm.empty:
        print("RFM summary is empty; check your data.")
    else:
        _save(rfm, "customer_rfm_summary.csv", "RFM summary")

    # 9. Category share & segmentation dataset
    cat_share = compute_category_share(featured)
    seg = build_segmentation_dataset(rfm, cat_share)
    if seg.empty:
        print("Segmentation dataset is empty; check your data.")
    else:
        _save(seg, "segmentation_dataset.csv", "segmentation dataset")

    # 10. Fraud modeling dataset (behavior features joined in when available)
    # NOTE(review): build_fraud_dataset keeps a fixed column list that has none
    # of the behavior feature names, so the joined columns look unused —
    # confirm whether they were meant to be included.
    if acc_behavior.empty:
        fraud_input = featured
    else:
        fraud_input = featured.merge(acc_behavior, on="Account_Number", how="left")
    fraud_df = build_fraud_dataset(fraud_input)
    if fraud_df.empty:
        print("Fraud model dataset is empty; check Fraud_Flag column.")
    else:
        _save(fraud_df, "fraud_model_dataset.csv", "fraud model dataset")

    # 11. Churn modeling dataset from RFM
    churn_df = build_churn_dataset(rfm, churn_days=90)
    if churn_df.empty:
        print("Churn model dataset is empty; check RFM computations.")
    else:
        _save(churn_df, "churn_model_dataset.csv", "churn model dataset")

    print("\nPipeline completed successfully ✅")



## Run the pipeline


In [10]:
# Execute the full pipeline; each non-empty result is written as a CSV under DATA_PROCESSED_DIR.
run_pipeline()

Loaded raw data: 5,389 rows, 20 columns

Outlier caps used:
                         lower        upper
Transaction_Amount     51.3780    4958.1344
Customer_Income     21273.8512  148589.8880
Account_Balance       258.1616   19781.6228

Saved cleaned transactions → ../data/processed/cleaned_transactions.csv
Saved feature-engineered transactions → ../data/processed/transactions_feature_engineered.csv
Saved customer behavior features → ../data/processed/customer_features_for_model.csv
Max transaction date (for recency): 2025-01-20 12:21:18
Saved RFM summary → ../data/processed/customer_rfm_summary.csv
Saved segmentation dataset → ../data/processed/segmentation_dataset.csv
Saved fraud model dataset → ../data/processed/fraud_model_dataset.csv
Saved churn model dataset → ../data/processed/churn_model_dataset.csv

Pipeline completed successfully ✅
