# Credit Card Fraud Detection with PySpark
Step-by-step setup: data loading, EDA, preprocessing pipeline

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("FraudDetection") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.maxResultSize", "4g") \
    .getOrCreate()


25/09/16 08:24:24 WARN Utils: Your hostname, Swapnils-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.1.38 instead (on interface en0)
25/09/16 08:24:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/09/16 08:24:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("FraudDetectionEDA") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()

# Reduce Spark log noise
spark.sparkContext.setLogLevel("FATAL")

25/09/16 08:24:27 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


## Load CSVs

In [3]:
train_transaction_path = "../data/raw/train_transaction.csv"
train_identity_path    = "../data/raw/train_identity.csv"
test_transaction_path  = "../data/raw/test_transaction.csv"
test_identity_path     = "../data/raw/test_identity.csv"

train_transaction_df = spark.read.csv(train_transaction_path, header=True, inferSchema=True)
train_identity_df    = spark.read.csv(train_identity_path, header=True, inferSchema=True)
test_transaction_df  = spark.read.csv(test_transaction_path, header=True, inferSchema=True)
test_identity_df     = spark.read.csv(test_identity_path, header=True, inferSchema=True)

print("Train transaction rows:", train_transaction_df.count())
print("Train identity rows:", train_identity_df.count())
print("Test transaction rows:", test_transaction_df.count())
print("Test identity rows:", test_identity_df.count())

                                                                                

Train transaction rows: 590540
Train identity rows: 144233
Test transaction rows: 506691
Test identity rows: 141907


## Join transactions with identity

In [4]:
train_df = train_transaction_df.join(train_identity_df, on="TransactionID", how="left")
test_df  = test_transaction_df.join(test_identity_df, on="TransactionID", how="left")

print("Joined train rows:", train_df.count())
print("Joined test rows:", test_df.count())

                                                                                

Joined train rows: 590540


[Stage 25:>                                                         (0 + 8) / 8]

Joined test rows: 506691


                                                                                

## Basic EDA: Class balance, missing values, stats

In [5]:
# Fraud distribution
train_df.groupBy("isFraud").count().show()

# Example: Transaction amount stats
train_df.select("TransactionAmt").describe().show()

                                                                                

+-------+------+
|isFraud| count|
+-------+------+
|      1| 20663|
|      0|569877|
+-------+------+



[Stage 33:>                                                         (0 + 8) / 8]

+-------+------------------+
|summary|    TransactionAmt|
+-------+------------------+
|  count|            590540|
|   mean|135.02717637246874|
| stddev| 239.1625220137336|
|    min|             0.251|
|    max|         31937.391|
+-------+------------------+



                                                                                

## Standardize and Align Columns (fix train vs test mismatch)

In [6]:
from pyspark.sql import DataFrame

def standardize_and_align(train_df: DataFrame, test_df: DataFrame):
    # Replace '-' with '_' in both dfs
    def clean_columns(df):
        for old_col in df.columns:
            new_col = old_col.replace("-", "_").strip()
            if old_col != new_col:
                df = df.withColumnRenamed(old_col, new_col)
        return df

    train_df = clean_columns(train_df)
    test_df  = clean_columns(test_df)

    # Align schemas
    common_cols = sorted(list(set(train_df.columns).intersection(set(test_df.columns))))
    train_df = train_df.select(common_cols)
    test_df  = test_df.select(common_cols)

    print(f'✅ Standardized and aligned: Train={len(train_df.columns)}, Test={len(test_df.columns)}')
    return train_df, test_df

train_df_clean, test_df_clean = standardize_and_align(train_df, test_df)

✅ Standardized and aligned: Train=433, Test=433


## Drop High-Missing Columns (>90%)

In [7]:
from pyspark.sql.functions import col

total_count = train_df_clean.count()
missing_dict = {c: train_df_clean.filter(col(c).isNull()).count()/total_count for c in train_df_clean.columns}

drop_cols = [c for c, miss in missing_dict.items() if miss > 0.9]
train_df_clean = train_df_clean.drop(*drop_cols)
test_df_clean  = test_df_clean.drop(*drop_cols)

print(f'Dropped {len(drop_cols)} columns with >90% missing values')
print('Train shape (cols):', len(train_df_clean.columns))
print('Test shape (cols):', len(test_df_clean.columns))

                                                                                

Dropped 12 columns with >90% missing values
Train shape (cols): 421
Test shape (cols): 421


## Preprocessing Pipeline

In [8]:
from pyspark.ml.feature import Imputer, StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# --- Identify numeric and categorical columns ---
numeric_cols = [
    f.name for f in train_df_clean.schema.fields 
    if f.dataType.simpleString() in ["double", "int"] and f.name not in ["isFraud", "TransactionID"]
]
categorical_cols = [
    f.name for f in train_df_clean.schema.fields 
    if f.dataType.simpleString() == "string" and f.name != "TransactionID"
]

# --- Impute numerics ---
imputer = Imputer(
    inputCols=numeric_cols,
    outputCols=[f"{c}_imputed" for c in numeric_cols]
).setStrategy("median")

# --- Encode categoricals ---
indexers = [
    StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep") 
    for c in categorical_cols
]
encoders = [
    OneHotEncoder(inputCol=f"{c}_idx", outputCol=f"{c}_ohe") 
    for c in categorical_cols
]

# --- Assemble feature vector ---
assembler = VectorAssembler(
    inputCols=[f"{c}_imputed" for c in numeric_cols] + [f"{c}_ohe" for c in categorical_cols],
    outputCol="features"
)

# --- Build pipeline ---
pipeline = Pipeline(stages=[imputer] + indexers + encoders + [assembler])

# --- Fit + transform ---
pipeline_model = pipeline.fit(train_df_clean)
train_prepared = pipeline_model.transform(train_df_clean)
test_prepared  = pipeline_model.transform(test_df_clean)

print("✅ Preprocessing complete")

# --- Preserve labels ---
# Ensure labels are still attached
train_labels = train_df.select("TransactionID", "isFraud")  # original df before dropping
train_prepared = train_prepared.join(train_labels, on="TransactionID", how="left")

# --- Check sample ---
train_prepared.select("features", "isFraud").show(3, truncate=False)


                                                                                

✅ Preprocessing complete


[Stage 1901:>                                                       (0 + 1) / 1]

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [9]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.functions import col
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

# --- Step 1: Train/Validation Split ---
train_split, val_split = train_prepared.randomSplit([0.8, 0.2], seed=42)

# --- Step 2: Define Logistic Regression ---
lr = LogisticRegression(
    featuresCol="features",
    labelCol="isFraud",
    maxIter=10,
    regParam=0.01,
    elasticNetParam=0.0
)

# --- Step 3: Fit model ---
lr_model = lr.fit(train_split)

# --- Step 4: Predictions on validation set ---
val_predictions = lr_model.transform(val_split)

# --- Step 5: Evaluation ---
# AUC (ROC)
auc_eval = BinaryClassificationEvaluator(
    labelCol="isFraud",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)
auc = auc_eval.evaluate(val_predictions)

# F1-score
f1_eval = MulticlassClassificationEvaluator(
    labelCol="isFraud",
    predictionCol="prediction",
    metricName="f1"
)
f1 = f1_eval.evaluate(val_predictions)

print(f"✅ Logistic Regression Validation Results: AUC = {auc:.4f}, F1 = {f1:.4f}")




✅ Logistic Regression Validation Results: AUC = 0.8580, F1 = 0.9649


                                                                                

In [11]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# --- Step 8B: Random Forest Classifier ---
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="isFraud",
    numTrees=50,
    maxDepth=10,
    subsamplingRate=0.8,
    seed=42
)

# Train RF on train_prepared
rf_model = rf.fit(train_prepared)

# Predict on train_prepared (validation split would be better, but this keeps it simple for now)
rf_predictions = rf_model.transform(train_prepared)

# --- Evaluation ---
auc_eval = BinaryClassificationEvaluator(
    labelCol="isFraud",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)
auc_rf = auc_eval.evaluate(rf_predictions)

f1_eval = MulticlassClassificationEvaluator(
    labelCol="isFraud",
    predictionCol="prediction",
    metricName="f1"
)
f1_rf = f1_eval.evaluate(rf_predictions)

print(f"✅ Random Forest Results (Train Validation): AUC = {auc_rf:.4f}, F1 = {f1_rf:.4f}")




✅ Random Forest Results (Train Validation): AUC = 0.8613, F1 = 0.9646


                                                                                