# Preprocessing Pipeline - Experiment Documentation

## Class Imbalance Handling Strategy

This notebook handles the severe class imbalance (0.58% fraud rate) through **cost-sensitive learning** rather than synthetic oversampling.

---

## Experiments Conducted

### Experiment 1: SMOTE Oversampling (REJECTED)

**Status:** Tested and rejected - code preserved below (commented out)

**Configuration Tested:**
```python
# SMOTE(sampling_strategy=0.1, k_neighbors=5, random_state=42)
# Result: 9.09% fraud rate in training data
```

**Results:**
- Validation F1: 0.53
- Test F1: **0.008** (catastrophic failure)
- Reason: Distribution shift between synthetic training data and real test data

---

### Experiment 2: SMOTE Variants Testing (ALL REJECTED)

We tested multiple SMOTE variants and sampling strategies:

| Variant | Sampling | Test F1 | Verdict |
|---------|----------|---------|---------|
| No SMOTE (Baseline) | 0.58% | **0.290** | **BEST** |
| SMOTE | 1% | 0.141 | Rejected |
| SMOTE | 2% | 0.248 | Rejected |
| SMOTE | 5% | 0.196 | Rejected |
| SMOTE | 10% | 0.183 | Rejected |
| BorderlineSMOTE | 5% | 0.095 | Rejected |
| ADASYN | 5% | 0.088 | Rejected |
| SMOTEENN | 5% | 0.265 | Rejected |
| SMOTETomek | 5% | 0.182 | Rejected |

**Conclusion:** All oversampling methods degraded test performance.

---

### Experiment 3: Natural Distribution + Class Weights (ADOPTED)

**Current Implementation:**

- Training data uses **natural distribution** (0.58% fraud)
- Class imbalance handled via:
  - **XGBoost:** `scale_pos_weight = (1 - fraud_rate) / fraud_rate`
  - **Deep Learning:** Focal Loss or Weighted BCE with `pos_weight`
- Threshold optimized on validation set using PR curve

---

## Code Organization

The SMOTE experiment code is preserved below (Section 9) but **commented out**. The active code uses natural distribution with class weights.



# Local Setup Instructions

## Prerequisites Checklist

Before running this notebook, ensure you have completed the following setup:

- [ ] **Java 11 installed** and `JAVA_HOME` configured
  - macOS: `brew install openjdk@11`
  - Set: `export JAVA_HOME=$(/usr/libexec/java_home -v 11)`
- [ ] **Conda environment `fraud-shield` created and activated**
  - Create: `conda env create -f environment.yml`
  - Activate: `conda activate fraud-shield`
- [ ] **PySpark verified working**
  - Test: `python -c "from pyspark.sql import SparkSession; print('PySpark OK')"`
- [ ] **Data directories created**
  - `data/checkpoints/` - for EDA checkpoints
  - `data/processed/` - for preprocessed data
  - `models/` - for saved preprocessors
- [ ] **EDA checkpoint available**
  - Run `01-local-fraud-detection-eda.ipynb` first
  - Checkpoint should exist: `data/checkpoints/section8_geographic_features.parquet`

## Environment Activation

```bash
conda activate fraud-shield
```

## Checkpoint Requirements

This notebook requires the Section 8 checkpoint from the EDA notebook:
- `data/checkpoints/section8_geographic_features.parquet`

**Checkpoint & Parquet Update Behavior:** Processed parquet files (train/val/test) and saved pipelines are updated when the notebook runs the relevant sections and the save path executes. They do not auto-update on a schedule. If you change the EDA checkpoint or preprocessing code, re-run the affected sections to refresh outputs.

**Note:** This is a local execution version configured for the `fraud-shield` conda environment on your local machine.

# Spark MLlib Preprocessing Pipeline for Fraud Detection

**Notebook:** 02-local-preprocessing.ipynb
**Objective:** Production-grade preprocessing using Spark MLlib for scalability

**Architecture:**
- Training data: Loaded from EDA Section 8 checkpoint (geographic features, 35 columns)
- Test data: Loaded separately from `fraudTest.csv` (never used for fitting)
- Preprocessing: Spark MLlib Pipeline (StringIndexer, VectorAssembler, StandardScaler, Imputer)
- Class Imbalance: Handled via cost-sensitive learning (NOT SMOTE - see experiment documentation)
- Output: Both Spark MLlib and sklearn pipelines saved for flexibility

**Pipeline Order:**
1. Load section8 checkpoint (row-level geographic + temporal features only)
2. Compute row-level features before split (day_of_week, peak flags, risk category)
3. Time-aware train/val split (80% / 10%)
4. Compute card-level + velocity features after split using point-in-time windows
5. Derive interaction and risk score features
6. Feature selection (~30 features including amt, city_pop, velocity)
7. Fit StandardScaler on training data only
8. Transform train / val / test consistently

**Data Leakage Safeguards:**
1. Card-level features (transaction_count, card_age_days) computed via cumulative backward-only windows per split
2. Velocity features (txn_count_last_1h, txn_count_last_24h) use backward time-range windows
3. Validation features computed on train+val union so val rows see training history (past = correct) without future leakage
4. Test data loaded separately, never used for fitting
5. Preprocessor fitted only on training data
6. Time-aware split prevents future data leakage

**Class Imbalance Strategy:**
- **SMOTE:** Tested and rejected (see experiment log above)
- **Adopted:** Natural distribution + class weights (scale_pos_weight, Focal Loss)
- **Time-aware Split:** train = first 80% of period, val = next 10%; test = separate file

In [1]:
# ============================================================
# GLOBAL IMPORTS & DEPENDENCIES
# ============================================================

import os
import shutil
import subprocess
import sys
from pathlib import Path
from typing import Tuple, List, Dict, Optional
import warnings
warnings.filterwarnings('ignore')

# Data Processing
import pandas as pd
import numpy as np

# PySpark & Spark MLlib
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.utils import AnalysisException
from pyspark.sql.window import Window
from pyspark.sql.functions import (
    col, to_timestamp, when, isnan, isnull,
    min as spark_min, max as spark_max,
    dayofweek, hour, month, datediff, lit,
    sum as spark_sum, count, floor, broadcast, first, trim,
    avg as spark_avg, coalesce, monotonically_increasing_id,
    sin, cos, sqrt, radians, atan2,
)
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler, Imputer
from pyspark.ml.functions import vector_to_array
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.types import StructType, StructField, IntegerType

# Sklearn (for pipeline conversion and inference)
from sklearn.preprocessing import StandardScaler as SklearnStandardScaler
from sklearn.preprocessing import LabelEncoder as SklearnLabelEncoder
from sklearn.impute import SimpleImputer as SklearnSimpleImputer
from sklearn.pipeline import Pipeline as SklearnPipeline
from sklearn.compose import ColumnTransformer

# Utilities
import pickle
import joblib
from datetime import datetime

print("All dependencies loaded successfully")

All dependencies loaded successfully


## Section 0: Setup & Configuration

Configure project paths, directories, and Spark session. All paths are resolved relative to the project root for consistency.

In [2]:
# ============================================================
# CONFIGURATION & PATHS
# ============================================================

NOTEBOOK_DIR = Path.cwd()
if NOTEBOOK_DIR.name == "local_notebooks":
    PROJECT_ROOT = NOTEBOOK_DIR.parent
else:
    PROJECT_ROOT = NOTEBOOK_DIR

os.chdir(PROJECT_ROOT)

DATA_DIR = PROJECT_ROOT / "data"
CHECKPOINT_DIR = DATA_DIR / "checkpoints"
INPUT_DIR = DATA_DIR / "input"
MODELS_DIR = PROJECT_ROOT / "models"
PROCESSED_DATA_DIR = DATA_DIR / "processed"

MODELS_DIR.mkdir(exist_ok=True)
PROCESSED_DATA_DIR.mkdir(exist_ok=True)

# Data paths
CHECKPOINT_SECTION8 = CHECKPOINT_DIR / 'section8_geographic_features.parquet'
TEST_DATA_PATH = INPUT_DIR / 'fraudTest.csv'

# Output paths
PREPROCESSED_TRAIN_PATH = PROCESSED_DATA_DIR / 'train_preprocessed.parquet'
PREPROCESSED_VAL_PATH = PROCESSED_DATA_DIR / 'val_preprocessed.parquet'
PREPROCESSED_TEST_PATH = PROCESSED_DATA_DIR / 'test_preprocessed.parquet'
SPARK_PREPROCESSER_PATH = MODELS_DIR / 'spark_preprocessor.pkl'
SKLEARN_PREPROCESSER_PATH = MODELS_DIR / 'sklearn_preprocessor.pkl'
FEATURE_NAMES_PATH = MODELS_DIR / 'feature_names.pkl'

print(f"Project root: {PROJECT_ROOT}")
print(f"Data directory: {DATA_DIR}")
print(f"Checkpoint directory: {CHECKPOINT_DIR}")
print(f"Input directory: {INPUT_DIR}")
print(f"Models directory: {MODELS_DIR}")
print(f"Processed data directory: {PROCESSED_DATA_DIR}")

Project root: /home/alireza/Desktop/projects/fraud-shield-ai
Data directory: /home/alireza/Desktop/projects/fraud-shield-ai/data
Checkpoint directory: /home/alireza/Desktop/projects/fraud-shield-ai/data/checkpoints
Input directory: /home/alireza/Desktop/projects/fraud-shield-ai/data/input
Models directory: /home/alireza/Desktop/projects/fraud-shield-ai/models
Processed data directory: /home/alireza/Desktop/projects/fraud-shield-ai/data/processed


In [3]:
python_exec = sys.executable
print(python_exec)

/home/alireza/anaconda3/envs/fraud-shield/bin/python


In [4]:
# ============================================================
# SET JAVA_HOME FROM CONDA ENVIRONMENT
# ============================================================

# Stop any existing Spark session first
try:
    spark_existing = SparkSession.getActiveSession()
    if spark_existing:
        print("Stopping existing Spark session...")
        spark_existing.stop()
except:
    pass

# Auto-detect JAVA_HOME from conda environment
if 'JAVA_HOME' not in os.environ or not os.environ.get('JAVA_HOME'):
    # Method 1: Try conda environment (check sys.executable path)
    python_exec = sys.executable
    if 'envs' in python_exec:
        # Extract conda env path from Python executable
        env_path = python_exec.split('envs/')[0] + 'envs/' + python_exec.split('envs/')[1].split('/')[0]
        java_bin = os.path.join(env_path, 'bin', 'java')
        if os.path.exists(java_bin):
            os.environ['JAVA_HOME'] = env_path
            print(f"✓ JAVA_HOME set from conda environment: {env_path}")
        else:
            # Method 2: Try CONDA_PREFIX if available
            conda_prefix = os.environ.get('CONDA_PREFIX')
            if conda_prefix:
                java_bin = os.path.join(conda_prefix, 'bin', 'java')
                if os.path.exists(java_bin):
                    os.environ['JAVA_HOME'] = conda_prefix
                    print(f"✓ JAVA_HOME set from CONDA_PREFIX: {conda_prefix}")
                else:
                    # Method 3: Find Java via which
                    java_path = shutil.which('java')
                    if java_path and os.path.exists(java_path):
                        java_home = os.path.dirname(os.path.dirname(java_path))
                        os.environ['JAVA_HOME'] = java_home
                        print(f"✓ JAVA_HOME set from system Java: {java_home}")
                    else:
                        raise RuntimeError(
                            "Java not found. Please install Java 11:\n"
                            "  conda install -c conda-forge openjdk=11\n"
                            "Then restart the Jupyter kernel."
                        )
            else:
                # Method 3: Find Java via which
                java_path = shutil.which('java')
                if java_path and os.path.exists(java_path):
                    java_home = os.path.dirname(os.path.dirname(java_path))
                    os.environ['JAVA_HOME'] = java_home
                    print(f"✓ JAVA_HOME set from system Java: {java_home}")
                else:
                    raise RuntimeError(
                        "Java not found. Please install Java 11:\n"
                        "  conda install -c conda-forge openjdk=11\n"
                        "Then restart the Jupyter kernel."
                    )
    else:
        # Not in conda, try system Java
        java_path = shutil.which('java')
        if java_path and os.path.exists(java_path):
            java_home = os.path.dirname(os.path.dirname(java_path))
            os.environ['JAVA_HOME'] = java_home
            print(f"✓ JAVA_HOME set from system Java: {java_home}")
        else:
            raise RuntimeError(
                "Java not found. Please install Java 11:\n"
                "  conda install -c conda-forge openjdk=11\n"
                "Then restart the Jupyter kernel."
            )
else:
    print(f"✓ JAVA_HOME already set: {os.environ['JAVA_HOME']}")

# Verify Java is accessible
java_home = os.environ.get('JAVA_HOME')
if java_home:
    java_exe = os.path.join(java_home, 'bin', 'java')
    if not os.path.exists(java_exe):
        print(f"⚠ Warning: Java executable not found at {java_exe}")
    else:
        # Test Java version
        try:
            result = subprocess.run([java_exe, '-version'], capture_output=True, text=True, stderr=subprocess.STDOUT, timeout=5)
            print(f"✓ Java verified: {result.stdout.split(chr(10))[0] if result.stdout else 'Java is working'}")
        except:
            print("✓ Java path set (version check skipped)")

# ============================================================
# INITIALIZE SPARK SESSION
# ============================================================

spark = SparkSession.builder \
    .appName("FraudDetectionPreprocessing") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.sql.execution.arrow.maxRecordsPerBatch", "10000") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

print("Spark session initialized with optimized memory settings")
print(f"Spark version: {spark.version}")

✓ JAVA_HOME already set: /home/alireza/anaconda3/envs/fraud-shield/lib/jvm
✓ Java path set (version check skipped)


26/02/06 22:38:20 WARN Utils: Your hostname, zanganeh-ai resolves to a loopback address: 127.0.1.1; using 192.168.86.248 instead (on interface wlp129s0)
26/02/06 22:38:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


26/02/06 22:38:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark session initialized with optimized memory settings
Spark version: 3.5.0


In [5]:
# ============================================================
# TIMEZONE PIPELINE SETUP
# ============================================================

# Try to import from scripts (local/production)
try:
    import sys
    if Path.cwd().name == "local_notebooks":
        sys.path.insert(0, str(Path.cwd().parent))
    from scripts.timezone_pipeline import TimezonePipeline
    print("✓ Imported timezone pipeline from scripts/")
except ImportError:
    # Fallback: define inline (for Kaggle/Colab)
    print("⚠️ Could not import from scripts/, defining inline...")
    # [Copy class definitions here if needed for Kaggle/Colab]
    print("✓ Timezone pipeline defined inline")

# Build ZIP reference table (same as EDA notebook)
GRID_SIZE = 0.5  # ~50km grid resolution
full_zip_path = os.path.join(INPUT_DIR, "uszips.csv")

if os.path.exists(full_zip_path):
    zip_ref_df = (
        spark.read.csv(full_zip_path, header=True, inferSchema=True)
        .withColumnRenamed("zip", "zip_ref")
        .withColumn("lat_grid", floor(col("lat") / GRID_SIZE))
        .withColumn("lng_grid", floor(col("lng") / GRID_SIZE))
        .select("lat_grid", "lng_grid", "timezone")
        .filter(
            (trim(col("timezone")) != "") &
            (col("timezone") != "FALSE") &
            col("timezone").rlike("^[A-Za-z_/]+$")
        )
        .distinct()
        .groupBy("lat_grid", "lng_grid")
        .agg(first("timezone").alias("timezone"))
        .cache()
    )
    
    print(f"✓ ZIP reference table created: {zip_ref_df.count():,} unique grid cells")
    
    # Create TimezonePipeline instance
    timezone_pipeline = TimezonePipeline(
        zip_ref_df=zip_ref_df,
        grid_size=GRID_SIZE
    )
    print("✓ TimezonePipeline instance created")
else:
    print(f"⚠️ Warning: uszips.csv not found at {full_zip_path}")
    print("  Timezone conversion will be skipped. Ensure EDA notebook has been run first.")
    timezone_pipeline = None

✓ Imported timezone pipeline from scripts/


✓ ZIP reference table created: 3,197 unique grid cells
✓ TimezonePipeline instance created


## Section 1: Data Loading

Load training data from the Section 8 EDA checkpoint (geographic features, 35 columns) and test data from CSV. Training and test data are kept separate to prevent data leakage.

In [6]:
# ============================================================
# DATA LOADING
# ============================================================

def load_training_data() -> DataFrame:
    """Load training data from EDA Section 8 checkpoint."""
    if not CHECKPOINT_SECTION8.exists():
        raise FileNotFoundError(
            f"EDA checkpoint not found: {CHECKPOINT_SECTION8}\n"
            "Please run the EDA notebook (01-local-fraud-detection-eda.ipynb) first."
        )

    spark.catalog.clearCache()
    try:
        spark.catalog.refreshByPath(str(CHECKPOINT_SECTION8))
    except (AnalysisException, AttributeError):
        pass

    print(f"Loading training data from checkpoint: {CHECKPOINT_SECTION8}")
    spark_df = spark.read.parquet(str(CHECKPOINT_SECTION8)).cache()
    total_rows = spark_df.count()

    print(f"  Rows: {total_rows:,}")
    print(f"  Columns: {len(spark_df.columns)}")

    fraud_yes = spark_df.filter(col("is_fraud") == 1).count()
    fraud_rate = fraud_yes / total_rows if total_rows else 0.0
    print(f"  Fraud rate: {fraud_rate:.4%}")

    return spark_df


def load_test_data() -> DataFrame:
    """Load test data from CSV. Never used for fitting."""
    if not TEST_DATA_PATH.exists():
        raise FileNotFoundError(
            f"Test data not found: {TEST_DATA_PATH}\n"
            "Please ensure fraudTest.csv is in the data/input/ directory."
        )

    print(f"Loading test data from: {TEST_DATA_PATH}")
    spark_df = spark.read.csv(str(TEST_DATA_PATH), header=True, inferSchema=True)
    print(f"  Rows: {spark_df.count():,}, Columns: {len(spark_df.columns)}")

    if "is_fraud" in spark_df.columns:
        fraud_rate = spark_df.filter(col("is_fraud") == 1).count() / spark_df.count()
        print(f"  Fraud rate: {fraud_rate:.4%}")

    return spark_df


train_raw = load_training_data()
test_df_raw = load_test_data()

Loading training data from checkpoint: /home/alireza/Desktop/projects/fraud-shield-ai/data/checkpoints/section8_geographic_features.parquet


26/02/06 22:38:24 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.






  Rows: 1,296,675
  Columns: 35
  Fraud rate: 0.5789%
Loading test data from: /home/alireza/Desktop/projects/fraud-shield-ai/data/input/fraudTest.csv


  Rows: 555,719, Columns: 23
  Fraud rate: 0.3860%


## Section 2: Row-Level Features (Before Split)

Compute features that depend only on the current row. These are safe to compute before the train/val split because they introduce no cross-row information.

In [7]:
# ============================================================
# ROW-LEVEL FEATURES (safe before split)
# ============================================================

def add_row_level_features(df: DataFrame) -> DataFrame:
    """Add features derived from a single row only (no aggregation across rows)."""

    # Ensure timestamp column exists
    if "merchant_local_time" not in df.columns and "trans_date_trans_time" in df.columns:
        df = df.withColumn("merchant_local_time", to_timestamp(col("trans_date_trans_time")))

    if "trans_date_trans_time" in df.columns:
        df = df.withColumn("trans_date_trans_time", to_timestamp(col("trans_date_trans_time")))

    # Temporal
    if "hour" not in df.columns:
        df = df.withColumn("hour", hour(to_timestamp(col("merchant_local_time"))))
    if "day_of_week" not in df.columns:
        df = df.withColumn("day_of_week", dayofweek(to_timestamp(col("merchant_local_time"))))
    if "month" not in df.columns:
        df = df.withColumn("month", month(to_timestamp(col("merchant_local_time"))))
    if "time_bin" not in df.columns:
        df = df.withColumn(
            "time_bin",
            when(col("hour").between(6, 11), "Morning")
            .when(col("hour").between(12, 17), "Afternoon")
            .when(col("hour").between(18, 23), "Evening")
            .otherwise("Night"),
        )

    # Peak flags
    df = df.withColumn("is_peak_fraud_hour", when(col("hour").between(18, 23), 1).otherwise(0))
    df = df.withColumn("is_peak_fraud_day", when(col("day_of_week").isin([4, 5, 6]), 1).otherwise(0))
    df = df.withColumn("is_peak_fraud_season", when(col("month").isin([1, 2]), 1).otherwise(0))

    # High-risk category flag
    high_risk_cats = ["grocery_pos", "gas_transport", "shopping_pos", "misc_pos", "grocery_net"]
    df = df.withColumn("is_high_risk_category", when(col("category").isin(high_risk_cats), 1).otherwise(0))

    # Amount bin / city_size (needed later for interaction features)
    if "amount_bin" not in df.columns and "amt" in df.columns:
        df = df.withColumn(
            "amount_bin",
            when(col("amt") > 1000, ">$1000")
            .when(col("amt") > 500, "$500-$1000")
            .when(col("amt") > 300, "$300-$500")
            .when(col("amt") > 100, "$100-$300")
            .when(col("amt") > 50, "$50-$100")
            .otherwise("<$50"),
        )
    if "city_size" not in df.columns and "city_pop" in df.columns:
        df = df.withColumn(
            "city_size",
            when(col("city_pop") > 1000000, "Metropolitan")
            .when(col("city_pop") > 500000, "Large City")
            .when(col("city_pop") > 100000, "Medium City")
            .when(col("city_pop") > 10000, "Small City")
            .otherwise("Small Town"),
        )

    # Haversine distance between customer and merchant
    if "customer_merchant_distance_km" not in df.columns:
        if all(c in df.columns for c in ["lat", "long", "merch_lat", "merch_long"]):
            lat1 = radians(col("lat"))
            lon1 = radians(col("long"))
            lat2 = radians(col("merch_lat"))
            lon2 = radians(col("merch_long"))
            dlat = lat2 - lat1
            dlon = lon2 - lon1
            a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
            c = 2 * atan2(sqrt(a), sqrt(1 - a))
            df = df.withColumn("customer_merchant_distance_km", lit(6371.0) * c)

    return df


train_with_row_feats = add_row_level_features(train_raw)
print(f"After row-level features: {len(train_with_row_feats.columns)} columns")

After row-level features: 40 columns


## Section 3: Train/Validation Split

Perform time-aware splitting to respect temporal order. Train = first 80% of the date range, Validation = next 10%. Test is loaded separately.

In [8]:
# ============================================================
# TIME-AWARE DATA SPLITTING (Spark SQL)
# ============================================================

def split_data_time_aware(
    spark_df: DataFrame,
    date_col: str = 'merchant_local_time',
    train_frac: float = 0.8,
    val_frac: float = 0.9,
) -> Tuple[DataFrame, DataFrame]:
    """
    Split data into train and validation using temporal ordering.
    Dates are derived from the actual data range (no hardcoded dates).
    """
    date_col_found = None
    if date_col in spark_df.columns:
        date_col_found = date_col
    else:
        for col_name in ['merchant_local_time', 'customer_local_time', 'trans_date_trans_time']:
            if col_name in spark_df.columns:
                date_col_found = col_name
                print(f"  Using date column: {col_name}")
                break

    if date_col_found is None:
        date_like = [c for c in spark_df.columns if 'date' in c.lower() or 'time' in c.lower()]
        raise ValueError(f"Date column '{date_col}' not found. Available: {date_like}")

    date_col = date_col_found
    spark_df = spark_df.withColumn(date_col, to_timestamp(col(date_col)))

    actual_min_date = spark_df.select(spark_min(col(date_col))).collect()[0][0]
    actual_max_date = spark_df.select(spark_max(col(date_col))).collect()[0][0]
    if actual_min_date is None or actual_max_date is None:
        raise ValueError(f"Date column '{date_col}' contains no valid dates")

    actual_min_dt = pd.to_datetime(actual_min_date)
    actual_max_dt = pd.to_datetime(actual_max_date)

    if not 0 < train_frac < val_frac <= 1.0:
        raise ValueError("Require 0 < train_frac < val_frac <= 1.0")

    data_span_days = (actual_max_dt - actual_min_dt).days
    train_end_dt = actual_min_dt + pd.Timedelta(days=int(data_span_days * train_frac))
    val_end_dt = actual_min_dt + pd.Timedelta(days=int(data_span_days * val_frac))

    print(f"  Data date range: {actual_min_dt.date()} to {actual_max_dt.date()}")
    print(f"  Train: up to {train_end_dt.date()}")
    print(f"  Val:   up to {val_end_dt.date()}")

    train_df = spark_df.filter(col(date_col) <= lit(train_end_dt.strftime('%Y-%m-%d %H:%M:%S')))
    val_df = spark_df.filter(
        (col(date_col) > lit(train_end_dt.strftime('%Y-%m-%d %H:%M:%S')))
        & (col(date_col) <= lit(val_end_dt.strftime('%Y-%m-%d %H:%M:%S')))
    )

    train_count = train_df.count()
    val_count = val_df.count()
    print(f"  Train: {train_count:,} rows")
    print(f"  Val:   {val_count:,} rows")

    return train_df, val_df


print("Performing time-aware split...")
train_split, val_split = split_data_time_aware(train_with_row_feats)

Performing time-aware split...


  Data date range: 2018-12-31 to 2020-06-21
  Train: up to 2020-03-04
  Val:   up to 2020-04-27
  Train: 1,034,987 rows
  Val:   122,480 rows


## Section 4: Card-Level & Velocity Features (After Split)

These features aggregate across rows for the same card (`cc_num`). To prevent data leakage they are computed with **point-in-time backward-only windows**: each row only sees transactions at or before its own timestamp.

- **Train:** features computed on training data alone.
- **Validation:** train + val are unioned, features are computed with point-in-time windows, then only val rows are kept. This lets val rows see card history from the training period (past data) without future leakage.

In [9]:
# ============================================================
# CARD-LEVEL & VELOCITY FEATURES (point-in-time, after split)
# ============================================================

def add_card_velocity_features(df: DataFrame) -> DataFrame:
    """
    Add card-level and velocity features using backward-only windows.
    Every aggregation is cumulative up to (and including) the current row's timestamp,
    so no future information leaks into any row.
    """
    time_ordered = Window.partitionBy("cc_num").orderBy("unix_time")
    cumulative_window = time_ordered.rowsBetween(Window.unboundedPreceding, 0)
    expanding_excl = time_ordered.rowsBetween(Window.unboundedPreceding, -1)

    # Card age: days since the card's first known transaction (backward)
    df = df.withColumn(
        "first_txn_time",
        spark_min(col("trans_date_trans_time")).over(
            Window.partitionBy("cc_num").orderBy("unix_time").rowsBetween(Window.unboundedPreceding, 0)
        ),
    )
    df = df.withColumn("card_age_days", datediff(col("trans_date_trans_time"), col("first_txn_time")))
    df = df.drop("first_txn_time")

    # Cumulative transaction count (not total across all time)
    df = df.withColumn("transaction_count", count("*").over(cumulative_window))

    # Velocity: transactions in last 1h / 24h (backward time-range windows)
    window_1h = Window.partitionBy("cc_num").orderBy("unix_time").rangeBetween(-3600, 0)
    window_24h = Window.partitionBy("cc_num").orderBy("unix_time").rangeBetween(-86400, 0)
    df = df.withColumn("txn_count_last_1h", count("*").over(window_1h))
    df = df.withColumn("txn_count_last_24h", count("*").over(window_24h))

    # Amount relative to card's historical average (exclude current row)
    df = df.withColumn(
        "amt_relative_to_avg",
        col("amt") / coalesce(spark_avg("amt").over(expanding_excl), lit(1.0)),
    )

    return df


# Tag rows so we can filter val back out after union
_TAG = "__split_tag__"

train_tagged = train_split.withColumn(_TAG, lit("train"))
val_tagged = val_split.withColumn(_TAG, lit("val"))

# Train features: compute on train alone
print("Computing card-level features for train...")
train_df = add_card_velocity_features(train_tagged).filter(col(_TAG) == "train").drop(_TAG)

# Val features: union train+val, compute, keep val rows only
print("Computing card-level features for val (with train history)...")
union_df = train_tagged.unionByName(val_tagged)
val_df = add_card_velocity_features(union_df).filter(col(_TAG) == "val").drop(_TAG)

print(f"  Train rows: {train_df.count():,}, Val rows: {val_df.count():,}")

Computing card-level features for train...
Computing card-level features for val (with train history)...


  Train rows: 1,034,987, Val rows: 122,480


## Section 5: Interaction & Risk Features

Derive interaction features (e.g. evening + high amount) and composite risk scores from the card-level and row-level features computed above.

In [10]:
# ============================================================
# INTERACTION & RISK FEATURES
# ============================================================

def add_interaction_risk_features(df: DataFrame) -> DataFrame:
    """Add interaction features and composite risk scores."""
    # Bins derived from card-level features
    df = df.withColumn(
        "card_age_bin",
        when(col("card_age_days") < 7, "<7 days")
        .when(col("card_age_days") < 30, "7-30 days")
        .when(col("card_age_days") < 90, "30-90 days")
        .when(col("card_age_days") < 180, "90-180 days")
        .otherwise("180+ days"),
    )
    df = df.withColumn(
        "transaction_count_bin",
        when(col("transaction_count") <= 5, "1-5")
        .when(col("transaction_count").between(6, 20), "6-20")
        .when(col("transaction_count").between(21, 100), "21-100")
        .otherwise("100+"),
    )
    df = df.withColumn("is_new_card", when(col("card_age_days") <= 30, 1).otherwise(0))
    df = df.withColumn("is_low_volume_card", when(col("transaction_count") <= 5, 1).otherwise(0))

    # Interaction features
    high_amount_bins = ["$300-$500", "$500-$1000", ">$1000"]
    df = df.withColumn(
        "evening_high_amount",
        when((col("time_bin") == "Evening") & col("amount_bin").isin(high_amount_bins), 1).otherwise(0),
    )
    df = df.withColumn(
        "evening_online_shopping",
        when((col("time_bin") == "Evening") & col("category").isin(["shopping_net", "misc_net"]), 1).otherwise(0),
    )
    df = df.withColumn(
        "large_city_evening",
        when((col("city_size") == "Large City") & (col("time_bin") == "Evening"), 1).otherwise(0),
    )
    df = df.withColumn(
        "new_card_evening",
        when((col("is_new_card") == 1) & (col("time_bin") == "Evening"), 1).otherwise(0),
    )
    df = df.withColumn(
        "high_amount_online",
        when(col("amount_bin").isin(high_amount_bins) & col("category").isin(["shopping_net", "misc_net"]), 1).otherwise(0),
    )

    # Risk scores
    df = df.withColumn(
        "temporal_risk_score",
        col("is_peak_fraud_hour").cast("double") * 0.4
        + col("is_peak_fraud_day").cast("double") * 0.3
        + col("is_peak_fraud_season").cast("double") * 0.3,
    )
    df = df.withColumn(
        "geographic_risk_score",
        when(col("city_pop") < 10000, 0.3)
        .when(col("city_pop") < 50000, 0.2)
        .when(col("city_pop") < 100000, 0.1)
        .otherwise(0.0),
    )
    df = df.withColumn(
        "card_risk_score",
        col("is_new_card").cast("double") * 0.5
        + col("is_low_volume_card").cast("double") * 0.3
        + when(col("card_age_days") < 7, 0.2).otherwise(0.0),
    )
    df = df.withColumn(
        "total_risk",
        col("temporal_risk_score") + col("geographic_risk_score") + col("card_risk_score"),
    )
    df = df.withColumn(
        "risk_tier",
        when(col("total_risk") >= 0.8, "High")
        .when(col("total_risk") >= 0.4, "Medium")
        .otherwise("Low"),
    )
    df = df.drop("total_risk")

    return df


train_df = add_interaction_risk_features(train_df)
val_df = add_interaction_risk_features(val_df)
print(f"After interaction/risk features  train cols: {len(train_df.columns)}, val cols: {len(val_df.columns)}")

After interaction/risk features  train cols: 58, val cols: 58


## Section 6: Feature Selection

Select the final feature set for modeling. Includes critical, high-priority, interaction, enriched (risk), and newly-added raw + velocity features (~30 total).

In [11]:
# ============================================================
# FEATURE SELECTION
# ============================================================

CRITICAL_FEATURES = [
    'transaction_count_bin',
    'card_age_bin',
    'hour',
    'time_bin',
    'is_peak_fraud_hour',
    'is_new_card',
    'is_low_volume_card',
]

HIGH_PRIORITY_FEATURES = [
    'category',
    'day_of_week',
    'month',
    'is_peak_fraud_day',
    'is_peak_fraud_season',
    'is_high_risk_category',
    'card_age_days',
    'transaction_count',
]

INTERACTION_FEATURES = [
    'evening_high_amount',
    'evening_online_shopping',
    'large_city_evening',
    'new_card_evening',
    'high_amount_online',
]

ENRICHED_FEATURES = [
    'temporal_risk_score',
    'geographic_risk_score',
    'card_risk_score',
    'risk_tier',
]

RAW_AND_VELOCITY_FEATURES = [
    'amt',
    'city_pop',
    'customer_merchant_distance_km',
    'txn_count_last_1h',
    'txn_count_last_24h',
    'amt_relative_to_avg',
]

SELECTED_FEATURES = (
    CRITICAL_FEATURES
    + HIGH_PRIORITY_FEATURES
    + INTERACTION_FEATURES
    + ENRICHED_FEATURES
    + RAW_AND_VELOCITY_FEATURES
)

available_features = [f for f in SELECTED_FEATURES if f in train_df.columns]
missing_features = [f for f in SELECTED_FEATURES if f not in train_df.columns]

print("Feature Selection Summary:")
print(f"  Critical:          {len([f for f in CRITICAL_FEATURES if f in train_df.columns])}/{len(CRITICAL_FEATURES)}")
print(f"  High priority:     {len([f for f in HIGH_PRIORITY_FEATURES if f in train_df.columns])}/{len(HIGH_PRIORITY_FEATURES)}")
print(f"  Interaction:       {len([f for f in INTERACTION_FEATURES if f in train_df.columns])}/{len(INTERACTION_FEATURES)}")
print(f"  Enriched (risk):   {len([f for f in ENRICHED_FEATURES if f in train_df.columns])}/{len(ENRICHED_FEATURES)}")
print(f"  Raw + velocity:    {len([f for f in RAW_AND_VELOCITY_FEATURES if f in train_df.columns])}/{len(RAW_AND_VELOCITY_FEATURES)}")
print(f"  Total selected:    {len(available_features)} features")

if missing_features:
    print(f"  Missing (skipped): {missing_features}")

feature_names = available_features.copy()

Feature Selection Summary:
  Critical:          7/7
  High priority:     8/8
  Interaction:       5/5
  Enriched (risk):   4/4
  Raw + velocity:    6/6
  Total selected:    30 features


## Section 7: Spark MLlib Preprocessing Pipeline

Create and fit a Spark MLlib Pipeline (StringIndexer + Imputer + VectorAssembler + StandardScaler). The pipeline is fitted **only on training data** and then applied to all splits.

In [12]:
# ============================================================
# SPARK MLlib PREPROCESSOR CLASS
# ============================================================

class SparkMLlibPreprocessor:
    """
    Production-grade preprocessing using Spark MLlib Pipeline.
    Handles categorical encoding, feature assembly, scaling, and imputation.
    """

    def __init__(self, feature_names: List[str]):
        self.feature_names = feature_names
        self.categorical_features: List[str] = []
        self.numerical_features: List[str] = []
        self.pipeline: Optional[Pipeline] = None
        self.is_fitted = False

    def _identify_feature_types(self, spark_df: DataFrame) -> None:
        self.categorical_features = []
        self.numerical_features = []

        missing = [f for f in self.feature_names if f not in spark_df.columns]
        if missing:
            print(f"  WARNING: Features not in DataFrame (skipped): {missing}")

        schema = spark_df.schema
        for feat in self.feature_names:
            if feat not in spark_df.columns:
                continue
            field_type = str(schema[feat].dataType)
            if 'StringType' in field_type or 'String' in field_type:
                self.categorical_features.append(feat)
            else:
                self.numerical_features.append(feat)

    def fit(self, spark_df: DataFrame) -> 'SparkMLlibPreprocessor':
        self._identify_feature_types(spark_df)

        stages = []
        indexed_categorical = []
        imputed_numerical = []

        for feat in self.categorical_features:
            indexer = StringIndexer(inputCol=feat, outputCol=f"{feat}_indexed", handleInvalid="keep")
            stages.append(indexer)
            indexed_categorical.append(f"{feat}_indexed")

        if len(self.numerical_features) > 0:
            imputer = Imputer(
                inputCols=self.numerical_features,
                outputCols=[f"{f}_imputed" for f in self.numerical_features],
                strategy="mean",
            )
            stages.append(imputer)
            imputed_numerical = [f"{f}_imputed" for f in self.numerical_features]

        assembler_inputs = indexed_categorical + imputed_numerical
        assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features_raw", handleInvalid="skip")
        stages.append(assembler)

        scaler = StandardScaler(inputCol="features_raw", outputCol="features", withMean=True, withStd=True)
        stages.append(scaler)

        self.pipeline = Pipeline(stages=stages)
        self.pipeline_model = self.pipeline.fit(spark_df)
        self.is_fitted = True
        return self

    def transform(self, spark_df: DataFrame) -> DataFrame:
        if not self.is_fitted:
            raise ValueError("Preprocessor must be fitted before transform")
        return self.pipeline_model.transform(spark_df)

    def get_feature_names(self) -> List[str]:
        return self.feature_names.copy()


preprocessor = SparkMLlibPreprocessor(feature_names)
preprocessor.fit(train_df)

print("Spark MLlib Preprocessor fitted on training data")
print(f"  Categorical features: {len(preprocessor.categorical_features)}")
print(f"  Numerical features:   {len(preprocessor.numerical_features)}")
print(f"  Total features:       {len(preprocessor.get_feature_names())}")

[Stage 79:>                                                       (0 + 24) / 26][Stage 79:==>                                                     (1 + 24) / 26]



Spark MLlib Preprocessor fitted on training data
  Categorical features: 5
  Numerical features:   25
  Total features:       30


                                                                                

## Section 8: Engineer Test Features & Apply Pipeline

Build the same feature set for test data (self-contained, no train/val history needed) and then transform all three splits through the fitted pipeline.

In [13]:
# ============================================================
# ENGINEER FEATURES FOR TEST DATA
# ============================================================

def engineer_test_features(test_df: DataFrame) -> DataFrame:
    """
    Replicate the full feature engineering pipeline for test data.
    Test data is self-contained: card-level and velocity features are computed
    within the test set using point-in-time backward windows.
    """
    df = test_df

    # --- Row-level features ---
    df = add_row_level_features(df)

    # --- Card-level + velocity (point-in-time within test) ---
    df = add_card_velocity_features(df)

    # --- Interaction + risk ---
    df = add_interaction_risk_features(df)

    return df


print("Engineering features for test data...")
test_df_engineered = engineer_test_features(test_df_raw)
print(f"  Test columns after engineering: {len(test_df_engineered.columns)}")

# ============================================================
# APPLY PREPROCESSING PIPELINE
# ============================================================

print("\nTransforming datasets with fitted preprocessor...")

print("  Transforming training data...")
train_transformed = preprocessor.transform(train_df)
print(f"    Train samples: {train_transformed.count():,}")

print("  Transforming validation data...")
val_transformed = preprocessor.transform(val_df)
print(f"    Validation samples: {val_transformed.count():,}")

print("  Transforming test data...")
test_transformed = preprocessor.transform(test_df_engineered)
print(f"    Test samples: {test_transformed.count():,}")

print("\nAll datasets transformed successfully")

Engineering features for test data...
  Test columns after engineering: 53

Transforming datasets with fitted preprocessor...
  Transforming training data...


    Train samples: 1,034,987
  Transforming validation data...


    Validation samples: 122,480
  Transforming test data...


    Test samples: 555,719

All datasets transformed successfully


### Distribution Comparison Diagnostic

Compare the distributions of train, validation, and test sets to detect potential data drift that could cause poor model generalization.

In [14]:
# ============================================================
# DISTRIBUTION COMPARISON ANALYSIS
# ============================================================
print("=" * 80)
print("DISTRIBUTION COMPARISON: Train vs Validation vs Test")
print("=" * 80)

# 1. Fraud Rate Comparison (compute fresh)
print("\n1. FRAUD RATE COMPARISON:")
print("-" * 40)
train_fraud_rate = train_transformed.select("is_fraud").toPandas()["is_fraud"].mean()
val_fraud_rate = val_transformed.select("is_fraud").toPandas()["is_fraud"].mean()
test_fraud_rate = test_transformed.select("is_fraud").toPandas()["is_fraud"].mean()

print(f"   Train:      {train_fraud_rate:.4%}")
print(f"   Validation: {val_fraud_rate:.4%}")
print(f"   Test:       {test_fraud_rate:.4%}")

# Flag if train/test fraud rate differs significantly
if abs(train_fraud_rate - test_fraud_rate) / max(train_fraud_rate, test_fraud_rate) > 0.2:
    print("   NOTE: Train and test fraud rates differ by >20% - this is expected from different time periods")
else:
    print("   Train and test fraud rates are similar")

# 2. Sample Size Comparison
print("\n2. SAMPLE SIZE COMPARISON:")
print("-" * 40)
train_count = train_transformed.count()
val_count = val_transformed.count()
test_count = test_transformed.count()
print(f"   Train:      {train_count:,} samples")
print(f"   Validation: {val_count:,} samples")
print(f"   Test:       {test_count:,} samples")
print(f"   Total:      {train_count + val_count + test_count:,} samples")

# 3. Feature Vector Statistics (sample-based for efficiency)
print("\n3. FEATURE STATISTICS (sample of 10000 per split):")
print("-" * 40)

def get_feature_stats(df, name, sample_size=10000):
    """Get feature statistics from a sample of the DataFrame."""
    sample_df = df.select("features").sample(False, min(1.0, sample_size / df.count()), seed=42).toPandas()
    if len(sample_df) == 0:
        return None
    features = np.array([row.toArray() for row in sample_df["features"]])
    return {
        'name': name,
        'mean': np.mean(features),
        'std': np.std(features),
        'min': np.min(features),
        'max': np.max(features)
    }

train_stats = get_feature_stats(train_transformed, "Train")
val_stats = get_feature_stats(val_transformed, "Validation")
test_stats = get_feature_stats(test_transformed, "Test")

print(f"   {'Split':<12} {'Mean':>10} {'Std':>10} {'Min':>10} {'Max':>10}")
print(f"   {'-'*52}")
for stats in [train_stats, val_stats, test_stats]:
    if stats:
        print(f"   {stats['name']:<12} {stats['mean']:>10.4f} {stats['std']:>10.4f} {stats['min']:>10.4f} {stats['max']:>10.4f}")

# 4. Check for significant distribution drift
print("\n4. DISTRIBUTION DRIFT CHECK:")
print("-" * 40)
if train_stats and test_stats:
    mean_diff = abs(train_stats['mean'] - test_stats['mean'])
    std_diff = abs(train_stats['std'] - test_stats['std'])
    
    if mean_diff > 0.5 or std_diff > 0.5:
        print("   NOTE: Some distribution drift detected (mean diff: {:.4f}, std diff: {:.4f})".format(mean_diff, std_diff))
    else:
        print("   Train and test feature distributions are aligned")
        print(f"      Mean difference: {mean_diff:.4f}")
        print(f"      Std difference: {std_diff:.4f}")

print("\n" + "=" * 80)
print("Distribution comparison complete.")
print("=" * 80)


DISTRIBUTION COMPARISON: Train vs Validation vs Test

1. FRAUD RATE COMPARISON:
----------------------------------------




   Train:      0.5757%
   Validation: 0.5250%
   Test:       0.3860%
   NOTE: Train and test fraud rates differ by >20% - this is expected from different time periods

2. SAMPLE SIZE COMPARISON:
----------------------------------------


   Train:      1,034,987 samples
   Validation: 122,480 samples
   Test:       555,719 samples
   Total:      1,713,186 samples

3. FEATURE STATISTICS (sample of 10000 per split):
----------------------------------------






   Split              Mean        Std        Min        Max
   ----------------------------------------------------
   Train           -0.0037     0.9831    -2.6052    67.7234
   Validation      -0.0093     0.9490    -2.5516    47.1240
   Test             0.0645     1.1564    -2.5572    48.9739

4. DISTRIBUTION DRIFT CHECK:
----------------------------------------
   Train and test feature distributions are aligned
      Mean difference: 0.0682
      Std difference: 0.1734

Distribution comparison complete.


In [15]:
# ============================================================
# CLASS IMBALANCE HANDLING
# ============================================================

# Convert training data to pandas
print("Converting training data to pandas...")
train_pd = train_transformed.select("features", "is_fraud").toPandas()

X_train = np.array([row.toArray() for row in train_pd["features"]])
y_train = train_pd["is_fraud"].values

print(f"Training data: {X_train.shape[0]:,} samples, {X_train.shape[1]} features")
print(f"  Fraud cases: {y_train.sum():,} ({y_train.mean():.4%})")
print(f"  Non-fraud cases: {(y_train == 0).sum():,} ({(y_train == 0).mean():.4%})")

# ============================================================
# SMOTE EXPERIMENTS (ALL REJECTED - PRESERVED FOR DOCUMENTATION)
# ============================================================
# See Cell 0 for full experiment results.
# All SMOTE variants degraded test performance due to temporal distribution shift.
# ============================================================

# Adopted approach: natural distribution + class weights
X_train_resampled = X_train
y_train_resampled = y_train

print(f"\nUsing natural distribution (no SMOTE)")
print(f"  Class weight ratio: {(1 - y_train_resampled.mean()) / y_train_resampled.mean():.1f}:1")
print("  Imbalance handled via cost-sensitive learning in downstream models")

train_resampled_pd = pd.DataFrame(X_train_resampled)
train_resampled_pd['is_fraud'] = y_train_resampled

Converting training data to pandas...


[Stage 157:>                                                      (0 + 24) / 26]





                                                                                

Training data: 1,034,987 samples, 30 features
  Fraud cases: 5,958 (0.5757%)
  Non-fraud cases: 1,029,029 (99.4243%)

Using natural distribution (no SMOTE)
  Class weight ratio: 172.7:1
  Imbalance handled via cost-sensitive learning in downstream models


## Section 9: Pipeline Conversion & Saving

Convert the fitted Spark MLlib pipeline into a sklearn-compatible pipeline for lightweight inference, then persist preprocessed datasets and pipeline artifacts.

## Section Summary

**Data Loading:**
- Training data: Loaded from EDA Section 8 checkpoint (35 columns, geographic features)
- Test data: Loaded separately from CSV (never used for fitting)

**Row-Level Features (before split):**
- day_of_week, is_peak_fraud_hour, is_peak_fraud_day, is_peak_fraud_season, is_high_risk_category
- amount_bin, city_size, time_bin

**Train/Validation Split:**
- Time-aware split: train = first 80%, val = next 10%

**Card-Level & Velocity Features (after split, point-in-time):**
- card_age_days, transaction_count (cumulative backward windows)
- txn_count_last_1h, txn_count_last_24h (backward time-range windows)
- amt_relative_to_avg (expanding mean excluding current row)
- Validation computed on train+val union to see training history

**Interaction & Risk Features:**
- evening_high_amount, evening_online_shopping, large_city_evening, new_card_evening, high_amount_online
- temporal_risk_score, geographic_risk_score, card_risk_score, risk_tier

**Feature Selection:**
- ~30 features across 5 groups (Critical, High Priority, Interaction, Enriched, Raw + Velocity)

**Preprocessing Pipeline:**
- Spark MLlib Pipeline with StringIndexer, Imputer, VectorAssembler, StandardScaler
- Fitted only on training data, applied to all splits

**Artifacts Saved:**
- Preprocessed data: train, validation, test (parquet)
- Spark MLlib pipeline, sklearn pipeline, feature names

In [16]:
# ============================================================
# PIPELINE CONVERSION & SAVING
# ============================================================

def convert_spark_to_sklearn_pipeline(
    spark_preprocessor: SparkMLlibPreprocessor,
) -> SklearnPipeline:
    """Convert Spark MLlib pipeline to sklearn pipeline for fast inference."""
    transformers = []

    stage_idx = 0
    for feat in spark_preprocessor.categorical_features:
        indexer_model = spark_preprocessor.pipeline_model.stages[stage_idx]
        labels = indexer_model.labels
        le = SklearnLabelEncoder()
        le.classes_ = np.array(labels)
        transformers.append((f"label_encoder_{feat}", le, [feat]))
        stage_idx += 1

    if len(spark_preprocessor.numerical_features) > 0:
        imputer_model = spark_preprocessor.pipeline_model.stages[stage_idx]
        imputer = SklearnSimpleImputer(strategy="mean")
        try:
            stats_row = imputer_model.surrogateDF.collect()[0]
            stats_values = []
            for feat in spark_preprocessor.numerical_features:
                if feat in stats_row.asDict():
                    stats_values.append(float(stats_row[feat]))
                else:
                    stats_values.append(0.0)
            imputer.statistics_ = np.array(stats_values)
        except Exception as e:
            print(f"    Warning: Could not extract Imputer statistics: {e}")
            imputer.statistics_ = np.zeros(len(spark_preprocessor.numerical_features))
        transformers.append(("imputer", imputer, spark_preprocessor.numerical_features))
        stage_idx += 1

    scaler_model = spark_preprocessor.pipeline_model.stages[-1]
    scaler = SklearnStandardScaler(with_mean=True, with_std=True)
    scaler.mean_ = np.array(scaler_model.mean.toArray())
    scaler.scale_ = np.array(scaler_model.std.toArray())

    column_transformer = ColumnTransformer(transformers, remainder="passthrough")
    sklearn_pipeline = SklearnPipeline([
        ("preprocessor", column_transformer),
        ("scaler", scaler),
    ])
    return sklearn_pipeline


print("Converting Spark MLlib pipeline to sklearn pipeline...")
try:
    sklearn_preprocessor = convert_spark_to_sklearn_pipeline(preprocessor)
    print("  Pipeline conversion complete")
except Exception as e:
    print(f"  Pipeline conversion failed: {e}")
    sklearn_preprocessor = None

# ============================================================
# SAVE PREPROCESSED DATA
# ============================================================
print("\nSaving preprocessed data...")

train_resampled_pd.to_parquet(PREPROCESSED_TRAIN_PATH, index=False)
print(f"  Train: {PREPROCESSED_TRAIN_PATH} ({len(train_resampled_pd):,} samples)")

val_pd = val_transformed.select("features", "is_fraud").toPandas()
val_features = np.array([row.toArray() for row in val_pd["features"]])
val_preprocessed = pd.DataFrame(val_features)
val_preprocessed['is_fraud'] = val_pd['is_fraud'].values
val_preprocessed.to_parquet(PREPROCESSED_VAL_PATH, index=False)
print(f"  Validation: {PREPROCESSED_VAL_PATH} ({len(val_preprocessed):,} samples)")

test_pd = test_transformed.select("features", "is_fraud").toPandas()
test_features = np.array([row.toArray() for row in test_pd["features"]])
test_preprocessed = pd.DataFrame(test_features)
test_preprocessed['is_fraud'] = test_pd['is_fraud'].values
test_preprocessed.to_parquet(PREPROCESSED_TEST_PATH, index=False)
print(f"  Test: {PREPROCESSED_TEST_PATH} ({len(test_preprocessed):,} samples)")

# Save Spark MLlib pipeline
spark_pipeline_dir = str(SPARK_PREPROCESSER_PATH).replace('.pkl', '')
preprocessor.pipeline_model.write().overwrite().save(spark_pipeline_dir)
print(f"\nSpark MLlib pipeline saved: {spark_pipeline_dir}")

if sklearn_preprocessor is not None:
    joblib.dump(sklearn_preprocessor, SKLEARN_PREPROCESSER_PATH)
    print(f"Sklearn pipeline saved: {SKLEARN_PREPROCESSER_PATH}")

with open(FEATURE_NAMES_PATH, 'wb') as f:
    pickle.dump(preprocessor.get_feature_names(), f)
print(f"Feature names saved: {FEATURE_NAMES_PATH}")

print("\n" + "=" * 60)
print("Preprocessing pipeline complete!")
print("=" * 60)
print(f"  Selected features: {len(preprocessor.get_feature_names())}")
print(f"  Train: {len(train_resampled_pd):,} samples")
print(f"  Validation: {len(val_preprocessed):,} samples")
print(f"  Test: {len(test_preprocessed):,} samples")
print("=" * 60)

Converting Spark MLlib pipeline to sklearn pipeline...
  Pipeline conversion complete

Saving preprocessed data...


  Train: /home/alireza/Desktop/projects/fraud-shield-ai/data/processed/train_preprocessed.parquet (1,034,987 samples)


  Validation: /home/alireza/Desktop/projects/fraud-shield-ai/data/processed/val_preprocessed.parquet (122,480 samples)




  Test: /home/alireza/Desktop/projects/fraud-shield-ai/data/processed/test_preprocessed.parquet (555,719 samples)



Spark MLlib pipeline saved: /home/alireza/Desktop/projects/fraud-shield-ai/models/spark_preprocessor
Sklearn pipeline saved: /home/alireza/Desktop/projects/fraud-shield-ai/models/sklearn_preprocessor.pkl
Feature names saved: /home/alireza/Desktop/projects/fraud-shield-ai/models/feature_names.pkl

Preprocessing pipeline complete!
  Selected features: 30
  Train: 1,034,987 samples
  Validation: 122,480 samples
  Test: 555,719 samples
