In [215]:
# imports

import os
import sys

from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.rdd import RDD
from pyspark.sql import DataFrame
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
from pyspark.sql.functions import col
from pyspark.sql.functions import count
from pyspark.sql.functions import desc
from pyspark.sql.functions import lit
from sklearn.model_selection import train_test_split

In [216]:
# spark session creation
def init_spark():
    """Return the notebook's SparkSession, building one via getOrCreate()."""
    builder = SparkSession.builder.appName("Python Spark SQL basic example")
    builder = builder.config("spark.some.config.option", "some-value")
    return builder.getOrCreate()

In [217]:
# Load the transactions CSV (first row is the header) and print the row count
spark = init_spark()
lines = spark.read.option("header", True).csv("dataset/FinalTransactions.csv")
print(lines.count())

1754155


In [218]:
# Convert the numeric columns (read from the CSV as strings) to double
for numeric_col in ("TX_FRAUD", "TX_TIME_SECONDS", "TX_TIME_DAYS", "TX_AMOUNT"):
    # Index the DataFrame directly rather than going through `col`,
    # so this cell needs no extra import to run.
    lines = lines.withColumn(numeric_col, lines[numeric_col].cast("double"))

In [219]:
# Print the schema of `lines` (column names, types and nullability)
lines.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- TRANSACTION_ID: string (nullable = true)
 |-- TX_DATETIME: string (nullable = true)
 |-- CUSTOMER_ID: string (nullable = true)
 |-- TERMINAL_ID: string (nullable = true)
 |-- TX_AMOUNT: double (nullable = true)
 |-- TX_TIME_SECONDS: double (nullable = true)
 |-- TX_TIME_DAYS: double (nullable = true)
 |-- TX_FRAUD: double (nullable = true)
 |-- TX_FRAUD_SCENARIO: string (nullable = true)



In [220]:
# Check if any column has null values
# Spark's `sum` is imported under an alias so it does not shadow Python's builtin sum()
from pyspark.sql.functions import col, count, sum as spark_sum

# One aggregate per column: isNull() cast to 0/1 and summed gives the null count
null_counts = lines.agg(*[
    spark_sum(col(c).isNull().cast("int")).alias(c)
    for c in lines.columns
])

null_counts.show()

+---+--------------+-----------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|_c0|TRANSACTION_ID|TX_DATETIME|CUSTOMER_ID|TERMINAL_ID|TX_AMOUNT|TX_TIME_SECONDS|TX_TIME_DAYS|TX_FRAUD|TX_FRAUD_SCENARIO|
+---+--------------+-----------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|  0|             0|          0|          0|          0|        0|              0|           0|       0|                0|
+---+--------------+-----------+-----------+-----------+---------+---------------+------------+--------+-----------------+



In [221]:
# Train/test split: random split of `lines` with weights 0.8 / 0.2 and a fixed seed
train_data, test_data = lines.randomSplit([0.8, 0.2], seed=0)

In [222]:
# Get class distribution in train_data

total_count = train_data.count()
class_distrib = train_data.groupby('TX_FRAUD').agg(count("*").alias("count"))
class_distrib = class_distrib.withColumn("percentage", (class_distrib["count"] / total_count) * 100)
# show() prints the table itself and returns None, so it is not wrapped in print()
class_distrib.show()

+--------+-------+------------------+
|TX_FRAUD|  count|        percentage|
+--------+-------+------------------+
|     0.0|1213927| 86.53322355641572|
|     1.0| 188918|13.466776443584289|
+--------+-------+------------------+

None


In [223]:
# Fraud scenario distribution among the fraudulent transactions of the full dataset

# TX_FRAUD holds doubles at this point, so compare against a number, not the string "0"
fraud_tx = lines.filter(lines["TX_FRAUD"] != 0)
total_count = fraud_tx.count()
csv_file = fraud_tx.groupby('TX_FRAUD_SCENARIO').agg(count("*").alias("count"))
csv_file = csv_file.withColumn("percentage", (csv_file["count"] / total_count) * 100)
# show() prints the table itself and returns None, so it is not wrapped in print()
csv_file.show()

+-----------------+------+------------------+
|TX_FRAUD_SCENARIO| count|        percentage|
+-----------------+------+------------------+
|                3|  4631| 1.962545927643038|
|                1|222261| 94.19076234590136|
|                2|  9077|3.8466917264555938|
+-----------------+------+------------------+

None


In [224]:
# Balance the train set by down-sampling the majority (non-fraud) class

fraud = train_data.filter(col('TX_FRAUD') == 1)
non_fraud = train_data.filter(col('TX_FRAUD') == 0)
fraud_count = fraud.count()
non_fraud_count = non_fraud.count()

if non_fraud_count > fraud_count:
    # Keep roughly as many non-fraud rows as there are fraud rows.
    # The fixed seed makes the balanced set the same on every run.
    keep_fraction = fraud_count / float(non_fraud_count)
    sampled_non_fraud = non_fraud.sample(withReplacement=False, fraction=keep_fraction, seed=0)
else:
    # Non-fraud is not the larger class (or is empty): nothing to down-sample,
    # and a fraction >= 1 is outside the documented range for sample()
    sampled_non_fraud = non_fraud

balanced_data = fraud.union(sampled_non_fraud)

In [225]:
# Get class distribution in the train set after down-sampling the non-fraud class

total_count = balanced_data.count()
class_distrib = balanced_data.groupby('TX_FRAUD').agg(count("*").alias("count"))
class_distrib = class_distrib.withColumn("percentage", (class_distrib["count"] / total_count) * 100)
# show() prints the table itself and returns None, so it is not wrapped in print()
class_distrib.show()

+--------+------+-----------------+
|TX_FRAUD| count|       percentage|
+--------+------+-----------------+
|     1.0|188918|49.91874773748781|
|     0.0|189533|50.08125226251219|
+--------+------+-----------------+

None


In [226]:
#Create feature set in train set

from pyspark.ml.feature import VectorAssembler, StandardScaler

# Columns that are packed into the model's input vector
feature_cols = ['TX_AMOUNT', 'TX_TIME_SECONDS', 'TX_TIME_DAYS']

# Assembler that writes the feature columns into one vector column named 'features'
assembler = VectorAssembler().setInputCols(feature_cols).setOutputCol('features')

# Add the 'features' column to the balanced training data
vector_df = assembler.transform(balanced_data)

In [227]:
# Scaler (default settings) reading 'features' and writing 'scaled_features'
scaler = StandardScaler().setInputCol('features').setOutputCol('scaled_features')

# Learn the scaling statistics from the assembled training data
scaler_model = scaler.fit(vector_df)

# Add the 'scaled_features' column to the training data
scaled_df = scaler_model.transform(vector_df)

In [228]:
# Print the first two rows of the scaled train set
print(scaled_df.take(2))

[Row(_c0='1000', TRANSACTION_ID='1000', TX_DATETIME='2023-01-01 05:31:44', CUSTOMER_ID='1889', TERMINAL_ID='5651', TX_AMOUNT=1474.26, TX_TIME_SECONDS=19904.0, TX_TIME_DAYS=0.0, TX_FRAUD=1.0, TX_FRAUD_SCENARIO='1', features=DenseVector([1474.26, 19904.0, 0.0]), scaled_features=DenseVector([0.6368, 0.0044, 0.0])), Row(_c0='100015', TRANSACTION_ID='100015', TX_DATETIME='2023-01-11 11:05:42', CUSTOMER_ID='3321', TERMINAL_ID='1656', TX_AMOUNT=1070.98, TX_TIME_SECONDS=903942.0, TX_TIME_DAYS=10.0, TX_FRAUD=1.0, TX_FRAUD_SCENARIO='1', features=DenseVector([1070.98, 903942.0, 10.0]), scaled_features=DenseVector([0.4626, 0.1982, 0.1895]))]


In [229]:
# Define the random forest classifier (label: TX_FRAUD, input: the 'features' vector).
# A fixed seed keeps the trained forest, and the metrics computed from it, reproducible.

dt = RandomForestClassifier(labelCol='TX_FRAUD', featuresCol='features', maxDepth=5, seed=0)


In [230]:
# Model training: fit the classifier `dt` on the scaled train set
dt_model = dt.fit(scaled_df)

In [231]:
# Making predictions on the test set & calculating AUC

# Label column name; the metric cells that follow read this variable too
label_col = 'TX_FRAUD'

# Apply the VectorAssembler to the test_data DataFrame to create the vector column
test_vector_df = assembler.transform(test_data)

# Apply the scaler fitted on the train set, so the test frame is built the same way
# as the frame the model was fitted on
test_scaled_df = scaler_model.transform(test_vector_df)

# The classifier reads the 'features' column, which test_scaled_df still carries
predictions = dt_model.transform(test_scaled_df)

# Evaluate the model's performance: area under the ROC curve
evaluator = BinaryClassificationEvaluator(labelCol=label_col, metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")

AUC: 0.9793500056452641


In [232]:
# Weighted precision & weighted recall of the predictions

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# One evaluator, switched from the precision metric to the recall metric in between
evaluator = MulticlassClassificationEvaluator(metricName="weightedPrecision", labelCol=label_col)
precision = evaluator.evaluate(predictions)
print(f"Precision: {precision}")

evaluator = evaluator.copy().setMetricName("weightedRecall")
recall = evaluator.evaluate(predictions)
print(f"Recall: {recall}")

Precision: 0.9946083675811852
Recall: 0.9945745922404714


In [233]:
# F1 score of the predictions

evaluator = MulticlassClassificationEvaluator(
    metricName="f1",
    predictionCol="prediction",
    labelCol=label_col,
)

# Evaluate and report with three decimals
f1_score = evaluator.evaluate(predictions)
print(f"F1 Score: {f1_score:.3f}")

F1 Score: 0.995
