# Read & Proprseccing

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


spark = SparkSession \
        .builder \
        .appName("FraudModel") \
        .getOrCreate()

In [42]:
train_df = spark.read.csv('/home/jovyan/data/fraudTrain.csv', sep=',', encoding='UTF-8', header=True, inferSchema=True)

test_df = spark.read.csv('/home/jovyan/data/fraudTest.csv', sep=',', encoding='UTF-8', header=True, inferSchema=True)

In [43]:
train_df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- trans_date_trans_time: timestamp (nullable = true)
 |-- cc_num: double (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)



In [44]:
columns_to_drop = ['_c0', 'merchant', 'gender', 'category', 'cc_num', 'first', 'last', 
                   'street', 'city', 'state', 'zip', 'dob', 'trans_num', 'trans_date_trans_time','job']

train_df = train_df.drop(*columns_to_drop).na.drop()
test_df = test_df.drop(*columns_to_drop).na.drop()

In [45]:
train_df.printSchema()

root
 |-- amt: double (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)



In [32]:
train_df.groupBy("is_fraud").count().show()

+--------+-------+
|is_fraud|  count|
+--------+-------+
|       1|   6006|
|       0|1042569|
+--------+-------+



In [41]:
test_df.groupBy("is_fraud").count().show()

+--------+------+
|is_fraud| count|
+--------+------+
|       1|  2145|
|       0|553574|
+--------+------+



### So this data is imbalanced.

In [46]:
fraud_count = train_df.filter(col("is_fraud") == 1).count()
non_fraud_count = train_df.filter(col("is_fraud") == 0).count()

print(f"Fraud Count: {fraud_count}, Non-Fraud Count: {non_fraud_count}")

fraud_df = train_df.filter(col("is_fraud") == 1)
non_fraud_df = train_df.filter(col("is_fraud") == 0).sample(withReplacement=False, fraction=fraud_count / non_fraud_count, seed=42)

balanced_train_df = fraud_df.union(non_fraud_df)

balanced_train_df.groupBy("is_fraud").count().show()

Fraud Count: 6006, Non-Fraud Count: 1042569
+--------+-----+
|is_fraud|count|
+--------+-----+
|       1| 6006|
|       0| 6035|
+--------+-----+



In [47]:
balanced_train_df.show()

+-------+-------+---------+--------+----------+---------+-----------+--------+
|    amt|    lat|     long|city_pop| unix_time|merch_lat| merch_long|is_fraud|
+-------+-------+---------+--------+----------+---------+-----------+--------+
| 281.06|35.9946| -81.7266|     885|1325466397|36.430124| -81.179483|       1|
|  11.52|  29.44|  -98.459| 1595797|1325468849|29.819364| -99.142791|       1|
| 276.31|  29.44|  -98.459| 1595797|1325473523|29.273085|  -98.83636|       1|
|   7.03|35.9946| -81.7266|     885|1325475483|35.909292|  -82.09101|       1|
| 275.73|  29.44|  -98.459| 1595797|1325476547|29.786426|  -98.68341|       1|
|  844.8|35.9946| -81.7266|     885|1325511488|35.987802| -81.254332|       1|
| 843.91|35.9946| -81.7266|     885|1325548328|35.985612| -81.383306|       1|
|  10.76|  29.44|  -98.459| 1595797|1325552727|28.856712| -97.794207|       1|
| 332.35|  29.44|  -98.459| 1595797|1325554552|29.320662| -97.937219|       1|
| 315.34|  29.44|  -98.459| 1595797|1325560656|28.95

# Model

In [48]:
feature_cols = ["amt", "lat", "long", "city_pop", "unix_time", "merch_lat", "merch_long"]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

rf = RandomForestClassifier(labelCol="is_fraud", featuresCol="features", numTrees=100)

In [49]:
pipeline = Pipeline(stages=[assembler, rf])

In [50]:
model = pipeline.fit(balanced_train_df)

predictions = model.transform(test_df)

In [51]:
evaluator = MulticlassClassificationEvaluator(labelCol="is_fraud", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy:.4f}")

predictions.select("is_fraud", "prediction", "features").show(10, truncate=False)

Test Accuracy: 0.9751
+--------+----------+-----------------------------------------------------------------------------+
|is_fraud|prediction|features                                                                     |
+--------+----------+-----------------------------------------------------------------------------+
|0       |0.0       |[2.86,33.9659,-80.9355,333497.0,1.371816865E9,33.986391,-81.200714]          |
|0       |0.0       |[29.84,40.3207,-110.436,302.0,1.371816873E9,39.450497999999996,-109.960431]  |
|0       |0.0       |[41.28,40.6729,-73.5365,34496.0,1.371816893E9,40.49581,-74.196111]           |
|0       |0.0       |[60.05,28.5697,-80.8191,54767.0,1.371816915E9,28.812397999999998,-80.883061] |
|0       |0.0       |[3.19,44.2529,-85.01700000000001,1126.0,1.371816917E9,44.959148,-85.884734]  |
|0       |0.0       |[19.55,42.1939,-76.7361,520.0,1.371816937E9,41.747157,-77.584197]            |
|0       |0.0       |[133.93,40.507,-123.9743,1139.0,1.371816944E9,41.49945800

In [52]:
conf_matrix = predictions.groupBy("is_fraud", "prediction").count().orderBy("is_fraud", "prediction")
conf_matrix.show()

+--------+----------+------+
|is_fraud|prediction| count|
+--------+----------+------+
|       0|       0.0|540332|
|       0|       1.0| 13242|
|       1|       0.0|   577|
|       1|       1.0|  1568|
+--------+----------+------+



In [54]:
# Save the trained model
model_path = "/home/jovyan/data/random_forest_model"
model.write().overwrite().save(model_path)

In [None]:
# to use the model again
from pyspark.ml.classification import RandomForestClassificationModel

model_path = "/home/jovyan/data/random_forest_model"
rf_model_loaded = RandomForestClassificationModel.load(model_path)

predictions = rf_model_loaded.transform(test_df)