In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, to_timestamp, hour, dayofweek,
    to_date, datediff, floor,
    radians, cos, sin, atan2, sqrt,
    sum, count, when, lit
)
from pyspark.sql.types import DoubleType
from pyspark.sql.window import Window

from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Obtain (or reuse) the Spark session that drives the whole notebook.
spark = (
    SparkSession.builder
    .appName("FraudDetectionPipeline")
    .getOrCreate()
)

# Load the raw training transactions; the first row is the header and column
# types are inferred from the data.
df = spark.read.csv(path="fraudTrain.csv", header=True, inferSchema=True)

# Window covering every transaction made with the same card on the same
# calendar day. It has no ordering, so each row receives the aggregate over
# the entire day (including transactions that happen later that day).
window_spec_daily = Window.partitionBy("cc_num", "trans_date_only")

df = (
    df
    # Date/Time Features
    .withColumn(
        "trans_date_trans_time",
        to_timestamp(col("trans_date_trans_time"), "yyyy-MM-dd HH:mm:ss"),
    )
    .withColumn("hour", hour(col("trans_date_trans_time")))
    .withColumn("day_of_week", dayofweek(col("trans_date_trans_time")))
    # Daily Spending/Transactions (Window functions)
    .withColumn("trans_date_only", to_date(col("trans_date_trans_time")))
    .withColumn("daily_spending", sum("amt").over(window_spec_daily))
    .withColumn("daily_transactions", count("cc_num").over(window_spec_daily))
    # The helper date column was only needed to define the daily window.
    .drop("trans_date_only")
)

# Haversine Distance
def haversine(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometres between two points (haversine formula).

    The function only composes the trigonometric helpers that are in scope
    (``radians``, ``sin``, ``cos``, ``sqrt``, ``atan2``), so when those are the
    ``pyspark.sql.functions`` versions it builds a Column expression that Spark
    evaluates per row.

    Args:
        lat1: Latitude of the first point, in degrees.
        lon1: Longitude of the first point, in degrees.
        lat2: Latitude of the second point, in degrees.
        lon2: Longitude of the second point, in degrees.

    Returns:
        The distance in kilometres, using an Earth radius of 6371 km.
    """
    earth_radius_km = 6371

    phi1, phi2 = radians(lat1), radians(lat2)
    half_dlat = (phi2 - phi1) / 2
    half_dlon = (radians(lon2) - radians(lon1)) / 2

    # Haversine term: computed once and reused for both atan2 arguments.
    a = sin(half_dlat) ** 2 + cos(phi1) * cos(phi2) * sin(half_dlon) ** 2

    return 2 * earth_radius_km * atan2(sqrt(a), sqrt(1 - a))

# Distance in km between the cardholder's location and the merchant's location.
df = df.withColumn("distance", haversine(col("lat"), col("long"), col("merch_lat"), col("merch_long")))

# Age Calculation: whole years between date of birth and the transaction date
# (days / 365.25, rounded down). The two helper date columns are temporary.
df = df.withColumn("dob_date", to_date(col("dob")))
df = df.withColumn("transaction_date", to_date(col("trans_date_trans_time")))
df = df.withColumn("age", floor(datediff(col("transaction_date"), col("dob_date")) / 365.25))
df = df.drop("dob_date", "transaction_date")

# Class Weight (for handling imbalance)
# Count both classes in a single aggregation job instead of two full scans.
# Note: row["count"] must use item access; row.count is the tuple method.
label_counts = {
    row["is_fraud"]: row["count"]
    for row in df.groupBy("is_fraud").count().collect()
}
is_fraud_count = label_counts.get(1, 0)
is_not_fraud_count = label_counts.get(0, 0)

# Without any fraud rows the ratio is undefined and a binary classifier cannot
# be trained; fail with an explicit message rather than a ZeroDivisionError.
if is_fraud_count == 0:
    raise ValueError(
        "No rows with is_fraud == 1 were found; cannot compute the fraud class weight."
    )

# Fraud rows are weighted by the non-fraud/fraud ratio so that both classes
# carry the same total weight; non-fraud rows keep weight 1.0.
class_weight_for_fraud = is_not_fraud_count / is_fraud_count

df = df.withColumn("class_weight", when(col("is_fraud") == 1, lit(class_weight_for_fraud)).otherwise(lit(1.0)))

# Label preparation (cast to DoubleType, required for MLlib)
df = df.withColumn("indexedLabel", col("is_fraud").cast(DoubleType()))

# --- Define Pipeline ---
categorical_cols = ["category", "gender", "merchant", "job"]

# Categorical Feature Processing: for each column, index the string values and
# then one-hot encode the index. handleInvalid="keep" lets the indexer assign
# an extra bucket to values it did not see while fitting instead of failing.
stages = []
for name in categorical_cols:
    index_col, encoded_col = f"{name}_index", f"{name}_encoded"
    stages += [
        StringIndexer(inputCol=name, outputCol=index_col, handleInvalid="keep"),
        OneHotEncoder(inputCol=index_col, outputCol=encoded_col),
    ]

# Numeric inputs first, followed by the one-hot encoded categorical columns.
numeric_cols = [
    "amt", "lat", "long", "city_pop", "merch_lat", "merch_long",
    "hour", "day_of_week", "daily_spending", "daily_transactions",
    "distance", "age",
]
feature_cols = numeric_cols + [f"{name}_encoded" for name in categorical_cols]

# Assemble all features into a single vector column.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Gradient-boosted trees, weighting each row by its class_weight column.
gbt = GBTClassifier(
    labelCol="indexedLabel",
    featuresCol="features",
    weightCol="class_weight",
    maxDepth=5,
    maxIter=20,
)

stages += [assembler, gbt]

# --- Create the Pipeline ---
pipeline = Pipeline(stages=stages)

# 80/20 train/test split with a fixed seed.
trainingData, testData = df.randomSplit([0.8, 0.2], seed=42)

print("Fitting the PySpark ML Pipeline...")
pipeline_model = pipeline.fit(trainingData)
print("Pipeline fitting complete.")

Fitting the PySpark ML Pipeline...
Pipeline fitting complete.


In [2]:
# --- Save the Trained Pipeline ---
# Use the writer with overwrite() so that re-running this cell replaces a
# previously saved model instead of failing because the path already exists.
model_path = "./kafka/Pipeline"
pipeline_model.write().overwrite().save(model_path)
print(f"Pipeline model saved to: {model_path}")

Pipeline model saved to: ./kafka/Pipeline


In [4]:
# Shut down the Spark session created at the top of the notebook; any later
# cell that needs Spark must create a new session first.
spark.stop()