In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
import time
import os

# Cleanup: stop a Spark session left active by a previous run of this script so
# the builder below starts from a clean state.
#
# The earlier version also ran `pkill -f 'pyspark.*'`. That matches on the full
# command line, so it can terminate every PySpark-related process on the host
# (other notebooks, and typically the JVM gateway this interpreter would reuse
# for the next session). Stopping the active session is sufficient here.
try:
    active_session = SparkSession.getActiveSession()
    if active_session is not None:
        active_session.stop()
except Exception as exc:
    # Best-effort: a failed cleanup must not prevent the pipeline from starting,
    # but the reason is reported instead of being silently discarded.
    print(f"Cleanup skipped ({exc.__class__.__name__}: {exc})")

# Spark Session: build (or reuse) the session for this pipeline.
# Options are kept in one table so they are easy to scan and extend; they are
# applied to the builder in the order listed.
session_options = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.driver.memory": "4g",
}

session_builder = SparkSession.builder.appName("FraudEndToEnd")
for option_name, option_value in session_options.items():
    session_builder = session_builder.config(option_name, option_value)

spark = session_builder.getOrCreate()

# Only errors are logged from here on; WARN/INFO output is suppressed.
spark.sparkContext.setLogLevel("ERROR")

print("Division 1: Data Acquisition & Preprocessing")
# Read the raw CSV with a header row and let Spark infer column types.
df = spark.read.option("header", "true").csv("creditcard.csv", inferSchema=True)

# The row counts are diagnostics only. A failure is reported with its real
# cause (instead of a made-up figure) and the pipeline carries on; unlike a
# bare `except:`, KeyboardInterrupt/SystemExit still stop the run.
try:
    print(f"Original: {df.count():,} rows")
except Exception as exc:
    print(f"Original: row count unavailable ({exc.__class__.__name__})—proceeding")

# Drop rows that are exact duplicates across all columns.
df_clean = df.dropDuplicates()
try:
    print(f"Clean: {df_clean.count():,} rows")
except Exception as exc:
    print(f"Clean: row count unavailable ({exc.__class__.__name__})—proceeding")

# Derived columns, added to df_clean in the order listed:
#   Log_Amount      - log(1 + Amount)
#   Hour_of_Day     - hour component of "Time" read as seconds since the Unix epoch
#                     NOTE(review): from_unixtime formats in the session time zone,
#                     so this value presumably shifts with that setting - confirm.
#   Amount_Category - "Low" when Amount < 50, otherwise "High"
derived_columns = [
    ("Log_Amount", log1p(col("Amount"))),
    ("Hour_of_Day", hour(from_unixtime(col("Time")))),
    ("Amount_Category", when(col("Amount") < 50, "Low").otherwise("High")),
]
for column_name, column_expr in derived_columns:
    df_clean = df_clean.withColumn(column_name, column_expr)

# Scaling (smaller for stability): replicate the cleaned data `scale_factor`
# times by unioning it with itself, then tag every row with a unique id.
#
# NOTE(review): the replicas are exact copies, so the random train/test split
# performed later in this script can place copies of the same transaction on
# both sides of the split - keep that in mind when reading the metrics.
scale_factor = 5
df_scaled = df_clean
for _ in range(scale_factor - 1):
    df_scaled = df_scaled.union(df_clean)

# Ids are unique and increasing but not consecutive.
df_scaled = df_scaled.withColumn("row_id", monotonically_increasing_id())

# Diagnostic count only; report the real failure rather than a hard-coded
# estimate that would be wrong as soon as scale_factor changes.
try:
    print(f"Scaled: {df_scaled.count():,} rows (~{scale_factor}x)")
except Exception as exc:
    print(f"Scaled: row count unavailable ({exc.__class__.__name__}), target ~{scale_factor}x—proceeding")

# Persist the scaled data as snappy-compressed parquet. First attempt writes
# at most 5 output files; if that fails, retry once without coalescing. A
# failure of the retry propagates, since later stages read this output.
try:
    df_scaled.coalesce(5).write.mode("overwrite").option("compression", "snappy").parquet("outputs/cleaned_fraud_data.parquet")
except Exception as exc:
    # Say why the fallback ran instead of switching strategy silently.
    print(f"Coalesced write failed ({exc.__class__.__name__}); retrying without coalesce")
    df_scaled.write.mode("overwrite").option("compression", "snappy").parquet("outputs/cleaned_fraud_data.parquet")
print("Div1 complete")

print("Division 3: Feature Engineering")
# Reload the Division 1 output and spread it over 10 partitions.
df = spark.read.parquet("outputs/cleaned_fraud_data.parquet")
df = df.repartition(10)  # Small for stability

# Diagnostic count only: report the actual error and continue. Interrupts
# (KeyboardInterrupt/SystemExit) are no longer swallowed.
try:
    print(f"Loaded: {df.count():,} rows")
except Exception as exc:
    print(f"Loaded: row count unavailable ({exc.__class__.__name__})—proceeding")

# Bucket rows into consecutive one-hour bins of the "Time" column (seconds).
df = df.withColumn("hour_bin", floor(col("Time") / 3600).cast("long"))

# Per-hour-bin aggregates attached to every row of the bin.
window_agg = Window.partitionBy("hour_bin")
df_featured = (
    df.withColumn("tx_velocity", count("row_id").over(window_agg))  # rows in the bin
    .withColumn("amt_per_hour", avg("Amount").over(window_agg))  # mean Amount in the bin
    # NOTE(review): despite its name this flag is derived from the hour of day
    # (1 when the hour, modulo 24, is 0, 6, 12 or 18), not from the day of the
    # week. Kept as-is because the column feeds the model feature list.
    .withColumn("is_weekend", ((floor(col("Time") / 3600) % 24).isin([0, 6, 12, 18])).cast("int"))
    .withColumn("v_sum", col("V1") + col("V2") + col("V3"))
)

# Global z-score of Amount. The mean and sample standard deviation are computed
# once as a single-row aggregate and broadcast to every row. The previous
# Window.partitionBy(lit(1)) produced the same values but forced the whole
# dataset into one partition to do so.
amount_stats = df.agg(
    avg("Amount").alias("_amount_mean"),
    stddev("Amount").alias("_amount_std"),
)
df_featured = (
    df_featured.crossJoin(broadcast(amount_stats))
    .withColumn(
        "amount_zscore",
        # The small epsilon guards against division by zero when Amount is constant.
        (col("Amount") - col("_amount_mean")) / (col("_amount_std") + lit(1e-6)),
    )
    .drop("_amount_mean", "_amount_std")
)

# Diagnostic only: report the real failure instead of hard-coded figures.
try:
    print(f"Div3 complete: {len(df_featured.columns)} cols; {df_featured.count():,} rows")
except Exception as exc:
    print(f"Div3 summary unavailable ({exc.__class__.__name__})—proceeding")

# Persist the engineered features as snappy-compressed parquet. First attempt
# writes at most 5 output files; if that fails, retry once without coalescing.
# A failure of the retry propagates, since Division 4 reads this output.
try:
    df_featured.coalesce(5).write.mode("overwrite").option("compression", "snappy").parquet("outputs/featured_fraud_data.parquet")
except Exception as exc:
    # Say why the fallback ran instead of switching strategy silently.
    print(f"Coalesced write failed ({exc.__class__.__name__}); retrying without coalesce")
    df_featured.write.mode("overwrite").option("compression", "snappy").parquet("outputs/featured_fraud_data.parquet")
print("Div3 complete")

print("Division 4: Model Building & Training")
df_featured = spark.read.parquet("outputs/featured_fraud_data.parquet")

# Every column except the label, the synthetic row id and the string-valued
# amount category becomes a model input.
excluded_cols = ["Class", "row_id", "Amount_Category"]
feature_cols = [name for name in df_featured.columns if name not in excluded_cols]

# Pack the inputs into one vector column; rows with invalid values are skipped.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features", handleInvalid="skip")
assembled = assembler.transform(df_featured)

# Seeded 80/20 split.
train_df, test_df = assembled.randomSplit([0.8, 0.2], seed=42)

# Rows with Class == 1 get weight 100, all others weight 1. The same expression
# is attached to both splits.
class_weight = when(col("Class") == 1, lit(100.0)).otherwise(lit(1.0))
train_df = train_df.withColumn("weights", class_weight)
test_df = test_df.withColumn("weights", class_weight)

# Simplified models: both use the "weights" column so the weighted class
# counts more during fitting.
lr = LogisticRegression(labelCol="Class", featuresCol="features", weightCol="weights", regParam=0.01)
rf = RandomForestClassifier(labelCol="Class", featuresCol="features", weightCol="weights", seed=42, numTrees=50, maxDepth=10)

# Train (tiny sample for stability). The 1% sample is cached because it is
# consumed three times below (count, LR fit, RF fit); without caching each use
# re-reads the parquet input and re-runs the assembler, split and sample.
train_sample = train_df.sample(0.01, seed=42).cache()

# Diagnostic count only: report the actual error and continue. Interrupts
# (KeyboardInterrupt/SystemExit) are no longer swallowed.
try:
    print(f"Training on sample: {train_sample.count():,} rows")
except Exception as exc:
    print(f"Training sample count unavailable ({exc.__class__.__name__})—proceeding")

# Wall-clock timing of each fit.
# NOTE(review): lr_model is trained and timed but only rf_model is used for
# predictions later in this script.
start = time.time()
lr_model = lr.fit(train_sample)
print(f"LR trained in {time.time() - start:.2f}s")

start = time.time()
rf_model = rf.fit(train_sample)
print(f"RF trained in {time.time() - start:.2f}s")

# Predict & Save: score a seeded 1% sample of the test split with the random
# forest and keep only the columns Division 5 reads back.
test_sample = test_df.sample(0.01, seed=42)
rf_preds = rf_model.transform(test_sample)
final_preds = rf_preds.select("Class", "prediction", "probability")

# First attempt writes a single output file; if that fails, retry once without
# coalescing. A failure of the retry propagates, since Division 5 reads this
# output.
try:
    final_preds.coalesce(1).write.mode("overwrite").option("compression", "snappy").parquet("outputs/model_predictions.parquet")
except Exception as exc:
    # Say why the fallback ran instead of switching strategy silently.
    print(f"Coalesced write failed ({exc.__class__.__name__}); retrying without coalesce")
    final_preds.write.mode("overwrite").option("compression", "snappy").parquet("outputs/model_predictions.parquet")

# Area under the ROC curve, computed from the model's rawPrediction column on
# the scored sample (not from the saved parquet, which omits that column).
evaluator = BinaryClassificationEvaluator(labelCol="Class", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(rf_preds)
print(f"RF AUC-ROC: {auc:.4f}")

print("Div4 complete")

print("Division 5: Evaluation")
# Evaluate from the predictions persisted by Division 4.
preds = spark.read.parquet("outputs/model_predictions.parquet")

# Diagnostic count only: report the actual error and continue.
try:
    print(f"Eval on {preds.count():,} preds")
except Exception as exc:
    print(f"Eval: prediction count unavailable ({exc.__class__.__name__})—proceeding")

# NOTE(review): plain accuracy - presumably dominated by the majority class on
# this data (Class 1 is up-weighted 100x at training time); read it together
# with the confusion matrix below.
mce = MulticlassClassificationEvaluator(labelCol="Class", predictionCol="prediction")
accuracy = mce.evaluate(preds, {mce.metricName: "accuracy"})
print(f"Accuracy: {accuracy:.4f}")

# The confusion matrix is optional: it needs scikit-learn and collects the
# predictions to the driver. Failures are reported with their cause rather
# than hidden behind a bare `except:`.
try:
    from sklearn.metrics import confusion_matrix

    conf_pd = preds.select("Class", "prediction").toPandas()
    # Cast both columns to int and pin the label set so the matrix is always
    # 2x2 (rows = actual 0/1, columns = predicted 0/1), even when the small
    # evaluation sample happens to contain only one class.
    cm = confusion_matrix(
        conf_pd["Class"].astype(int),
        conf_pd["prediction"].astype(int),
        labels=[0, 1],
    )
    print("Confusion Matrix:\n", cm)
except ImportError as exc:
    print(f"Confusion skipped (scikit-learn unavailable: {exc})")
except Exception as exc:
    print(f"Confusion skipped ({exc.__class__.__name__}: {exc})")

print("Div5 complete")

# Release the Spark session and its resources.
spark.stop()
print("Full Pipeline Complete!")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
  from pandas.core.computation.check import NUMEXPR_INSTALLED
  from pandas.core import (
25/12/06 20:12:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/12/06 20:12:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/12/06 20:12:28 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/12/06 20:12:28 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


Division 1: Data Acquisition & Preprocessing


                                                                                

Original: 284,807 rows


                                                                                

Clean: 283,726 rows


                                                                                

Scaled: 1,418,630 rows (~5x)


                                                                                

Div1 complete
Division 3: Feature Engineering
Loaded: 1,418,630 rows
Div3 complete: 41 cols; 1,418,630 rows


                                                                                

Div3 complete
Division 4: Model Building & Training


                                                                                

Training on sample: 11,183 rows


                                                                                

LR trained in 22.20s


                                                                                

RF trained in 38.80s


                                                                                

RF AUC-ROC: 0.6606
Div4 complete
Division 5: Evaluation
Eval on 2,905 preds
Accuracy: 0.9993
Confusion Matrix:
 [[2902    0]
 [   2    1]]
Div5 complete
Full Pipeline Complete!
