In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, datediff, current_date
from pyspark.sql.types import DoubleType, IntegerType
import math
from hdfs import InsecureClient
import pandas as pd

# Start Spark (getOrCreate returns the already-running session if there is one)
spark = SparkSession.builder \
    .appName("FraudDetectionSparkML") \
    .getOrCreate()

# Load data: read the CSV through the hdfs client into a pandas DataFrame first.
client = InsecureClient('http://hadoop-namenode:9870', user='root')
with client.read('/data') as reader:
    raw_pdf = pd.read_csv(reader)

# The following cells call Spark DataFrame methods on `df` (withColumn, filter,
# sample, randomSplit), none of which exist on a pandas DataFrame, so hand the
# data over to Spark before any feature engineering happens.
df = spark.createDataFrame(raw_pdf)
    
# Haversine formula as a Spark UDF
def haversine(lat, lon, merch_lat, merch_lon):
    """Return the great-circle distance in kilometres between two points.

    Args:
        lat, lon: Latitude and longitude of the first point, in degrees.
        merch_lat, merch_lon: Latitude and longitude of the second point,
            in degrees.

    Returns:
        The haversine distance as a float (using R = 6371.0 km), or ``None``
        when any coordinate is ``None``. NaN inputs propagate to a NaN result.
    """
    # When used as a Spark UDF, null column values arrive as None; math.radians
    # would raise TypeError on them, so map a missing coordinate to a null result.
    if lat is None or lon is None or merch_lat is None or merch_lon is None:
        return None

    R = 6371.0
    lat1 = math.radians(lat)
    lon1 = math.radians(lon)
    lat2 = math.radians(merch_lat)
    lon2 = math.radians(merch_lon)
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
    # Floating-point rounding can push `a` marginally outside [0, 1] (e.g. for
    # near-antipodal points), which would make sqrt(1 - a) raise ValueError.
    # Plain comparisons are used instead of min/max so that NaN is left alone.
    if a > 1.0:
        a = 1.0
    elif a < 0.0:
        a = 0.0
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return R * c

# Spark ML stages


# Save the model


In [None]:

# Expose the pure-Python haversine function to Spark as a double-returning UDF
haversine_udf = udf(haversine, DoubleType())

# Columns that are not used as model inputs once the derived features exist
drop_cols = ["Unnamed: 0", "trans_date_trans_time", "trans_num", "dob", "unix_time",
             "lat", "long", "merch_lat", "merch_long", "first", "last"]

# Feature engineering, then drop the unused columns:
#   dob      -> cast to a date so datediff can operate on it
#   age      -> whole years between today and dob (day difference / 365.25, truncated)
#   distance -> haversine distance between the (lat, long) and (merch_lat, merch_long) pairs
df = (
    df
    .withColumn("dob", col("dob").cast("date"))
    .withColumn(
        "age",
        (datediff(current_date(), col("dob")) / 365.25).cast(IntegerType()),
    )
    .withColumn(
        "distance",
        haversine_udf(col("lat"), col("long"), col("merch_lat"), col("merch_long")),
    )
    .drop(*drop_cols)
)

# Undersampling (randomly reduce the majority class to roughly the fraud count)
fraud_df = df.filter(col("is_fraud") == 1)
nonfraud_all_df = df.filter(col("is_fraud") == 0)

fraud_count = fraud_df.count()
nonfraud_count = nonfraud_all_df.count()

# Sampling without replacement needs a fraction in [0, 1]: cap it when fraud rows
# outnumber non-fraud rows, and avoid dividing by zero when there are none at all.
if nonfraud_count:
    sample_fraction = min(1.0, fraud_count / nonfraud_count)
else:
    sample_fraction = 0.0

nonfraud_df = nonfraud_all_df.sample(fraction=sample_fraction, seed=42)
df_balanced = fraud_df.union(nonfraud_df)

# Categorical & numeric columns
# Compare on DataType.typeName() ("string" for StringType) rather than on
# str(dataType): the textual repr is not stable across PySpark releases
# ("StringType" vs "StringType()"), and a mismatch would silently classify every
# string column as numeric.
categorical_cols = [
    field.name
    for field in df_balanced.schema.fields
    if field.dataType.typeName() == "string"
]
numeric_cols = [
    field.name
    for field in df_balanced.schema.fields
    if field.dataType.typeName() != "string" and field.name != "is_fraud"
]


In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import GBTClassifier
from pyspark.ml import Pipeline

# Build one StringIndexer -> OneHotEncoder pair per categorical column.
# Labels unseen at fit time are kept (handleInvalid="keep") instead of raising.
indexers = []
encoders = []
for cat_name in categorical_cols:
    index_name = cat_name + "_index"
    indexers.append(
        StringIndexer(inputCol=cat_name, outputCol=index_name, handleInvalid="keep")
    )
    encoders.append(
        OneHotEncoder(inputCol=index_name, outputCol=cat_name + "_ohe")
    )

# Assemble the one-hot vectors followed by the numeric columns into "features"
encoded_feature_cols = [cat_name + "_ohe" for cat_name in categorical_cols]
assembler = VectorAssembler(
    inputCols=encoded_feature_cols + numeric_cols,
    outputCol="features",
)

# Classifier (can replace with xgboost4j-spark if needed)
classifier = GBTClassifier(labelCol="is_fraud", featuresCol="features", maxIter=50)

# Build pipeline: all indexers, then all encoders, then assembler and classifier
pipeline_stages = indexers + encoders + [assembler, classifier]
pipeline = Pipeline(stages=pipeline_stages)

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Train/test split: 80% / 20% with a fixed seed
split_weights = [0.8, 0.2]
train_df, test_df = df_balanced.randomSplit(split_weights, seed=42)

# Train model: fit every pipeline stage on the training split
model = pipeline.fit(train_df)

# Evaluate: score the held-out split and preview a few predictions
predictions = model.transform(test_df)
prediction_preview = predictions.select("is_fraud", "prediction", "probability")
prediction_preview.show(5, truncate=False)

# Accuracy of the predicted label against is_fraud on the test split
evaluator = MulticlassClassificationEvaluator(labelCol="is_fraud", metricName="accuracy")
test_accuracy = evaluator.evaluate(predictions)
print("Test Accuracy:", test_accuracy)

In [None]:
# Persist the fitted pipeline. A plain model.save(path) raises when the path
# already exists, which breaks every re-run of the notebook; go through the
# writer with overwrite() so the saved model is replaced instead.
model.write().overwrite().save("fraud_detection_sparkml_model")
