In [117]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import unix_timestamp, col
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier, DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [118]:
spark = SparkSession.builder.appName("CreditCardFraudDetection").getOrCreate()
fraudTrain = spark.read.csv("fraudTrain.csv", header=True, inferSchema=True)
fraudTest = spark.read.csv("fraudTest.csv", header=True, inferSchema=True)

In [119]:
drop_cols = ['_c0', 'cc_num', 'first', 'last', 'gender', 'street', 'city', 'state',
             'zip', 'job', 'dob', 'trans_num', 'merchant']
fraudTrain = fraudTrain.drop(*drop_cols)
fraudTest = fraudTest.drop(*drop_cols)

In [120]:
fraudTrain = fraudTrain.withColumn("trans_date_ts", unix_timestamp("trans_date_trans_time")).drop("trans_date_trans_time")
fraudTest = fraudTest.withColumn("trans_date_ts", unix_timestamp("trans_date_trans_time")).drop("trans_date_trans_time")

In [121]:
fraudTrain = fraudTrain.filter(fraudTrain["category"].isNotNull()) 
# Remove rows with null values in "category"
fraudTest = fraudTest.filter(fraudTest["category"].isNotNull())  
# Remove rows with null values in "category"

# Fill missing values in "category" with "unknown" if necessary
fraudTrain = fraudTrain.fillna({"category": "unknown"})
fraudTest = fraudTest.fillna({"category": "unknown"})


In [122]:
# Ensure "category" column is of string type
fraudTrain = fraudTrain.withColumn("category", col("category").cast("string"))
fraudTest = fraudTest.withColumn("category", col("category").cast("string"))

# String Indexing
indexer = StringIndexer(inputCol="category", outputCol="category_index", handleInvalid="skip")
indexer_model = indexer.fit(fraudTrain)
fraudTrain = indexer_model.transform(fraudTrain)
fraudTest = indexer_model.transform(fraudTest)

In [123]:
print("Schema of fraudTrain dataset:")
fraudTrain.printSchema()

print("Sample rows from fraudTrain:")
fraudTrain.show(5)

print("Summary statistics:")
fraudTrain.select("amt", "lat", "long", "city_pop", "unix_time", "merch_lat", "merch_long").describe().show()


Schema of fraudTrain dataset:
root
 |-- category: string (nullable = false)
 |-- amt: double (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)
 |-- trans_date_ts: long (nullable = true)
 |-- category_index: double (nullable = false)

Sample rows from fraudTrain:
+-------------+------+-------+---------+--------+----------+------------------+-----------+--------+-------------+--------------+
|     category|   amt|    lat|     long|city_pop| unix_time|         merch_lat| merch_long|is_fraud|trans_date_ts|category_index|
+-------------+------+-------+---------+--------+----------+------------------+-----------+--------+-------------+--------------+
|     misc_net|  4.97|36.0788| -81.1781|    3495|1325376018|         36.011293| -82.048315|       0| 

In [124]:
print("Schema of fraudTest dataset:")
fraudTest.printSchema()

print("Sample rows from fraudTrain:")
fraudTest.show(5)

print("Summary statistics:")
fraudTest.select("amt", "lat", "long", "city_pop", "unix_time", "merch_lat", "merch_long").describe().show()


Schema of fraudTest dataset:
root
 |-- category: string (nullable = false)
 |-- amt: double (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)
 |-- trans_date_ts: long (nullable = true)
 |-- category_index: double (nullable = false)

Sample rows from fraudTrain:
+--------------+-----+-------+------------------+--------+----------+------------------+-----------+--------+-------------+--------------+
|      category|  amt|    lat|              long|city_pop| unix_time|         merch_lat| merch_long|is_fraud|trans_date_ts|category_index|
+--------------+-----+-------+------------------+--------+----------+------------------+-----------+--------+-------------+--------------+
| personal_care| 2.86|33.9659|          -80.9355|  333497|1371816865|      

In [125]:
# 6. Feature Assembly
features = ['amt', 'lat', 'long', 'city_pop', 'unix_time', 'merch_lat', 'merch_long', 'trans_date_ts', 'category_index']
assembler = VectorAssembler(inputCols=features, outputCol="features")
fraudTrain = assembler.transform(fraudTrain).select("features", "is_fraud")
fraudTest = assembler.transform(fraudTest).select("features", "is_fraud")

Train Models

In [126]:
lr = LogisticRegression(labelCol="is_fraud", featuresCol="features")
lr_model = lr.fit(fraudTrain)
print("Logistic Regression model")
print("Coefficients: " + str(lr_model.coefficients))

Logistic Regression model
Coefficients: [0.00280212298826414,0.04026609966050056,0.007865715632323522,8.71519358046513e-08,-4.521840406740288e-06,-0.03181115584332793,-0.00585068198450549,4.512287057236062e-06,-0.07032644543383682]


In [127]:
rf = RandomForestClassifier(labelCol="is_fraud", featuresCol="features", numTrees=50)
rf_model = rf.fit(fraudTrain)
print("Random Forest model")
print("Feature Importances: " + str(rf_model.featureImportances))
print("Number of Trees: " + str(rf_model.getNumTrees))
print("Max Depth: " + str(rf_model.getMaxDepth))

Random Forest model
Feature Importances: (9,[0,1,2,3,4,5,6,7,8],[0.7023911121238602,0.004562638595043176,0.004332124798876209,0.019919338772241894,0.005351964862438715,0.004415761895131943,0.0046105451822584425,0.00658888309617735,0.24782763067397245])
Number of Trees: 50
Max Depth: <bound method _DecisionTreeParams.getMaxDepth of RandomForestClassificationModel: uid=RandomForestClassifier_6d7e2f05ffd0, numTrees=50, numClasses=2, numFeatures=9>


In [128]:
gbt = GBTClassifier(labelCol="is_fraud", featuresCol="features", maxIter=20)
gbt_model = gbt.fit(fraudTrain)
print("Gradient Boosted Trees model")
print("Feature Importances: " + str(gbt_model.featureImportances))
print("Number of Trees: " + str(gbt_model.getNumTrees))
print("Max Depth: " + str(gbt_model.getMaxDepth))

Gradient Boosted Trees model
Feature Importances: (9,[0,1,2,3,4,5,6,8],[0.44429708009183283,0.01654912615240762,0.017842684240530222,0.0880958532011804,0.02934285389356067,0.0014005439564304413,0.004885449859245977,0.3975864086048118])
Number of Trees: 20
Max Depth: <bound method _DecisionTreeParams.getMaxDepth of GBTClassificationModel: uid = GBTClassifier_bf98a93d79d7, numTrees=20, numClasses=2, numFeatures=9>


In [129]:
dt = DecisionTreeClassifier(labelCol="is_fraud", featuresCol="features")
dt_model = dt.fit(fraudTrain)
print("Decision Tree model")
print("Feature Importances: " + str(dt_model.featureImportances))
print("Max Depth: " + str(dt_model.getMaxDepth))
print("Max Bins: " + str(dt_model.getMaxBins)) 

Decision Tree model
Feature Importances: (9,[0,2,3,4,5,8],[0.47256114815476546,0.0010765681447851422,0.14755690887134257,0.00324538948461812,0.0011726078266356206,0.37438737751785306])
Max Depth: <bound method _DecisionTreeParams.getMaxDepth of DecisionTreeClassificationModel: uid=DecisionTreeClassifier_60a49c0fa406, depth=5, numNodes=35, numClasses=2, numFeatures=9>
Max Bins: <bound method _DecisionTreeParams.getMaxBins of DecisionTreeClassificationModel: uid=DecisionTreeClassifier_60a49c0fa406, depth=5, numNodes=35, numClasses=2, numFeatures=9>


In [130]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

# Layers: input layer (9 features), 2 hidden layers (10, 5), output (2 classes)
mlp = MultilayerPerceptronClassifier(labelCol="is_fraud", featuresCol="features",
                                     layers=[9, 10, 5, 2], blockSize=128, seed=1234, maxIter=100)
mlp_model = mlp.fit(fraudTrain)
print("Multilayer Perceptron model")
print("Weights: " + str(mlp_model.weights))
print("Number of Layers: " + str(mlp_model.getLayers))
print("Block Size: " + str(mlp_model.getBlockSize))

Multilayer Perceptron model
Weights: [0.3441669510279775,0.5335037792349232,-0.4649653548044888,-0.4236094480668578,0.6296651943077293,-0.12741048585107992,0.0005095617636667882,-0.045905564907101105,0.11947031071660863,0.2485883149683549,0.15585591205861324,-0.07123722944094106,0.047496929201056005,0.20195576205316712,-0.5665286642974072,0.27019557293097013,-0.5052809630172387,0.029538126482082944,0.14628523185909437,0.05314699227989491,-0.4471885349114361,0.5670443162514412,-0.151879115072343,-0.036047501250621984,0.6404111664720772,0.6349218121865925,0.6646456069408733,0.5606466870687726,0.08740783196628292,0.4751112307131016,-0.7379563329316778,0.4276206507691594,0.42663730231519664,0.17538638149751065,0.2602937182915073,-0.681807466962251,0.501189586992898,-0.6522285649928932,-0.6158192800855594,0.2980000990228621,-0.015371358382584965,0.22455673241012356,0.06066686709947403,-0.4366485773618715,-0.19396258428123261,0.5971140007519652,-0.7750711052584455,0.6148142856822143,-0.10619

In [131]:
from pyspark.ml.classification import LinearSVC
svm = LinearSVC(labelCol="is_fraud", featuresCol="features")
svm_model = svm.fit(fraudTrain)
print("Linear SVM model")
print("Coefficients: " + str(svm_model.coefficients))
print("Intercept: " + str(svm_model.intercept))
print("Max Iterations: " + str(svm_model.getMaxIter))
print("Reg Param: " + str(svm_model.getRegParam))

Linear SVM model
Coefficients: [4.1952552040777895e-08,-0.0001453959608717851,5.826615416237605e-09,2.3096355950573977e-13,-1.6902262282342196e-14,0.00014546223392997748,6.363476259749399e-09,-2.342326498762409e-15,-5.2402358427546885e-08]
Intercept: -1.0001217234451034
Max Iterations: <bound method HasMaxIter.getMaxIter of LinearSVCModel: uid=LinearSVC_a84789748cee, numClasses=2, numFeatures=9>
Reg Param: <bound method HasRegParam.getRegParam of LinearSVCModel: uid=LinearSVC_a84789748cee, numClasses=2, numFeatures=9>


In [132]:
# Evaluate Models
evaluator = BinaryClassificationEvaluator(labelCol="is_fraud")
print("Model Evaluation: ")
print("Logistic Regression AUC:", evaluator.evaluate(lr_model.transform(fraudTest)))
print("Random Forest AUC:", evaluator.evaluate(rf_model.transform(fraudTest)))
print("GBT AUC:", evaluator.evaluate(gbt_model.transform(fraudTest)))
print("Decision Tree AUC:", evaluator.evaluate(dt_model.transform(fraudTest)))
print("MLP AUC:", evaluator.evaluate(mlp_model.transform(fraudTest)))
print("SVM AUC:", evaluator.evaluate(svm_model.transform(fraudTest)))

Model Evaluation: 
Logistic Regression AUC: 0.8458164653013038
Random Forest AUC: 0.9165189240339087
GBT AUC: 0.9850874688650666
Decision Tree AUC: 0.6384902183794472
MLP AUC: 0.5
SVM AUC: 0.5628041815631913


In [133]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

precision_evaluator = MulticlassClassificationEvaluator(labelCol="is_fraud", predictionCol="prediction", metricName="precisionByLabel")
recall_evaluator = MulticlassClassificationEvaluator(labelCol="is_fraud", predictionCol="prediction", metricName="recallByLabel")
f1_evaluator = MulticlassClassificationEvaluator(labelCol="is_fraud", predictionCol="prediction", metricName="f1")

def evaluate_model(model, name):
    predictions = model.transform(fraudTest)
    auc = evaluator.evaluate(predictions)
    precision = precision_evaluator.evaluate(predictions)
    recall = recall_evaluator.evaluate(predictions)
    f1 = f1_evaluator.evaluate(predictions)

    print(f"\n{name} Evaluation:")
    print(f"AUC: {auc:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")

evaluate_model(lr_model, "Logistic Regression")
evaluate_model(rf_model, "Random Forest")
evaluate_model(gbt_model, "Gradient Boosted Trees")
evaluate_model(dt_model, "Decision Tree")
evaluate_model(mlp_model, "MLP Classifier")
evaluate_model(svm_model, "Linear SVM")



Logistic Regression Evaluation:
AUC: 0.8458
Precision: 0.9961
Recall: 0.9994
F1 Score: 0.9939

Random Forest Evaluation:
AUC: 0.9165
Precision: 0.9961
Recall: 1.0000
F1 Score: 0.9942

Gradient Boosted Trees Evaluation:
AUC: 0.9851
Precision: 0.9968
Recall: 1.0000
F1 Score: 0.9956

Decision Tree Evaluation:
AUC: 0.6385
Precision: 0.9978
Recall: 0.9987
F1 Score: 0.9964

MLP Classifier Evaluation:
AUC: 0.5000
Precision: 0.9961
Recall: 1.0000
F1 Score: 0.9942

Linear SVM Evaluation:
AUC: 0.5628
Precision: 0.9961
Recall: 1.0000
F1 Score: 0.9942


In [None]:
import xgboost as xgb
import pandas as pd

train_pd = fraudTrain.select("features", "is_fraud").toPandas()
test_pd = fraudTest.select("features", "is_fraud").toPandas()

import numpy as np
X_train = np.array([row.toArray() for row in train_pd['features']])
y_train = train_pd['is_fraud']
X_test = np.array([row.toArray() for row in test_pd['features']])
y_test = test_pd['is_fraud']

model = xgb.XGBClassifier(n_estimators=50, use_label_encoder=False, eval_metric='logloss')
model.fit(X_train, y_train)

from sklearn.metrics import classification_report, roc_auc_score
y_pred = model.predict(X_test)
y_prob = model.predict_proba(X_test)[:, 1]

print("XGBoost AUC:", roc_auc_score(y_test, y_prob))
print(classification_report(y_test, y_pred))


In [None]:
# from xgboost.spark import SparkXGBClassifier

# xgb = SparkXGBClassifier(label_col="is_fraud", features_col="features", num_round=20)
# xgb_model = xgb.fit(fraudTrain)
# xgb_predictions = xgb_model.transform(fraudTest)

# # Evaluate
# evaluate_model(xgb_model, "XGBoost Classifier")

User Input

In [None]:

user_input = {
    'amt': 100.0,
    'lat': 37.7749,
    'long': -122.4194,
    'city_pop': 50000,
    'unix_time': 1325376018,
    'merch_lat': 37.0,
    'merch_long': -122.0,
    'trans_date_ts': 1577836800,
    'category': 'misc_pos'
}

input_df = spark.createDataFrame([Row(**user_input)])
input_df = indexer_model.transform(input_df)
input_df = assembler.transform(input_df)


In [None]:
print("Logistic Regression Prediction:")
lr_pred=lr_model.transform(input_df).select("prediction", "probability")
lr_pred.show()

print("Random Forest Prediction:")
rf_pred=rf_model.transform(input_df).select("prediction", "probability")
rf_pred.show()

print("GBT Prediction:")
gbt_pred=gbt_model.transform(input_df).select("prediction", "probability")
gbt_pred.show()

print("Decision Tree Prediction:")
dt_pred=dt_model.transform(input_df).select("prediction", "probability")
dt_pred.show()

print("MLP Prediction:")
mlp_pred=mlp_model.transform(input_df).select("prediction", "probability")
mlp_pred.show()

print("SVM Prediction:")
svm_pred=svm_model.transform(input_df).select("prediction", "rawPrediction")
svm_pred.show()

Logistic Regression Prediction:
+----------+--------------------+
|prediction|         probability|
+----------+--------------------+
|       1.0|[2.55771086293919...|
+----------+--------------------+

Random Forest Prediction:
+----------+--------------------+
|prediction|         probability|
+----------+--------------------+
|       0.0|[0.99671160260851...|
+----------+--------------------+

GBT Prediction:
+----------+--------------------+
|prediction|         probability|
+----------+--------------------+
|       0.0|[0.95641898530100...|
+----------+--------------------+

Decision Tree Prediction:
+----------+--------------------+
|prediction|         probability|
+----------+--------------------+
|       0.0|[0.99966359847532...|
+----------+--------------------+

MLP Prediction:
+----------+--------------------+
|prediction|         probability|
+----------+--------------------+
|       0.0|[0.99421319048786...|
+----------+--------------------+

SVM Prediction:
+----------+-

Xg Boost Pred

In [None]:
user_features = np.array([row.features.toArray() for row in input_df.select("features").collect()])

xgb_user_pred = int(model.predict(user_features)[0])
xgb_user_prob = float(model.predict_proba(user_features)[0][1])

print(f"XGBoost Prediction for User Input: {xgb_user_pred}")
print(f"XGBoost Probability of no Fraud: {1 - xgb_user_prob:.4f}")

XGBoost Prediction for User Input: 0
XGBoost Probability of no Fraud: 0.9994


In [None]:
# Ensemble => for Prediction 
preds = [
    lr_pred.first()[0],
    rf_pred.first()[0],
    gbt_pred.first()[0],
    dt_pred.first()[0],
    mlp_pred.first()[0],
    svm_pred.first()[0],
    xgb_user_pred,
]
final_vote = round(sum(preds) / len(preds))
print(f"Ensembled (majority voting) prediction: {final_vote}")

Ensembled (majority voting) prediction: 0


In [None]:
from pyspark.ml.linalg import DenseVector

#all floats
preds = [
    # float(lr_pred.first()[1][0]),     
    # Logistic Regression prob
    float(rf_pred.first()[1][0]),     
    # Random Forest prob
    float(gbt_pred.first()[1][0]),    
    # Gradient Boosted Tree prob
    float(dt_model.first()[1][0]),    
    # Decision Tree prob
    float(mlp_pred.first()[1][0]),    
    # MLP prob
    float(svm_pred.first()[1][0]),    
    # SVM prob
    float(xgb_user_prob)
]

final_prob = sum(preds) / len(preds)
final_vote = round(final_prob)

print(f"Ensembled Probability of Fraud: {final_prob:.4f}")
print(f"Ensembled (Majority Voting) Prediction: {final_vote}")


Ensembled Probability of Fraud: 0.8246
Ensembled (Majority Voting) Prediction: 1


Saving the Models

In [None]:
import os
os.environ["HADOOP_HOME"] = "C:/hadoop"
os.environ["PATH"] += os.pathsep + "C:/hadoop/bin"


lr_model.write().overwrite().save("models/lr_model")

In [None]:
# import os

# # Set the HADOOP_HOME environment variable
# os.environ["HADOOP_HOME"] = "C:/hadoop"
# os.environ["PATH"] += os.pathsep + "C:/hadoop/bin"

# # Now start Spark session after setting the environment variables
# from pyspark.sql import SparkSession

# spark = SparkSession.builder.appName("CreditCardFraudDetection").getOrCreate()

# # Now you should be able to save your models
# lr_model.write().overwrite().save("models/lr_model")
# rf_model.write().overwrite().save("models/rf_model")
# gbt_model.write().overwrite().save("models/gbt_model")
# dt_model.write().overwrite().save("models/dt_model")
# mlp_model.write().overwrite().save("models/mlp_model")
# svm_model.write().overwrite().save("models/svm_model")

# # Preprocessing stages
# indexer_model.write().overwrite().save("models/indexer_model")
# assembler.write().overwrite().save("models/assembler")


In [None]:
# # Save Spark MLlib models
# lr_model.save("models/lr_model")
# rf_model.save("models/rf_model")
# gbt_model.save("models/gbt_model")
# dt_model.save("models/dt_model")
# mlp_model.save("models/mlp_model")
# svm_model.save("models/svm_model")

# # Save preprocessing stages
# indexer_model.write().overwrite().save("models/indexer_model")
# assembler.write().overwrite().save("models/assembler")

# # Save XGBoost model
# import joblib
# joblib.dump(model, "models/xgb_model.pkl")


In [None]:
spark.version

'3.5.5'

In [None]:
from pyspark.ml.linalg import DenseVector

#all floats
preds = [
    # float(lr_pred.first()[1][0]),     
    # Logistic Regression prob
    float(rf_pred.first()[1][0]),     
    # Random Forest prob
    float(gbt_pred.first()[1][0]),    
    # Gradient Boosted Tree prob
    float(dt_model.first()[1][0]),    
    # Decision Tree prob
    float(mlp_pred.first()[1][0]),    
    # MLP prob
    float(svm_pred.first()[1][0]),    
    # SVM prob
    float(xgb_user_prob)
]

final_prob = sum(preds) / len(preds)
final_vote = round(final_prob)

print(f"Ensembled Probability of Fraud: {final_prob:.4f}")
print(f"Ensembled (Majority Voting) Prediction: {final_vote}")


Ensembled Probability of Fraud: 0.8246
Ensembled (Majority Voting) Prediction: 1
