### Implementing a Balanced Oversampling Technique with a 1:1 Target Label Ratio

In [1]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import regexp_replace, col, when, substring, concat, lit, to_date, date_format

from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, VectorIndexer
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.linalg import Vectors

#### Start a simple Spark Session

In [2]:
spark = SparkSession.builder.appName('fraud_detection').master("local[*]").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/11 21:29:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
col_names = ['User', 'Card', 'Year', 'Month', 'Day', 'Time', 'Amount', 'Use Chip', 'Merchant Name', 'Merchant City', 
'Merchant State', 'MCC', 'Errors?', 'Is Fraud?', 'Hour', 'Minute', 'Date', 'Day_of_Week']

df = spark.read.option("delimiter", "|").csv('credit_card_transactions.csv', header=None, inferSchema=True).toDF(*col_names)
df = df.repartition(10)

24/06/11 21:29:49 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

In [4]:
df.printSchema()

root
 |-- User: integer (nullable = true)
 |-- Card: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Time: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Use Chip: string (nullable = true)
 |-- Merchant Name: long (nullable = true)
 |-- Merchant City: string (nullable = true)
 |-- Merchant State: string (nullable = true)
 |-- MCC: integer (nullable = true)
 |-- Errors?: string (nullable = true)
 |-- Is Fraud?: integer (nullable = true)
 |-- Hour: double (nullable = true)
 |-- Minute: double (nullable = true)
 |-- Date: date (nullable = true)
 |-- Day_of_Week: string (nullable = true)



##### Encoding Some Categorical Fields for Spark Pipeline & Create Stages

In [5]:
stages = []
categorical_cols = ['Use Chip', 'Day_of_Week']
numerical_cols = ['Card', 'Year', 'Month', 'Day', 'Amount', 'MCC', 'Hour', 'Minute']

# Indexers for categorical columns
indexers = [StringIndexer(inputCol=col, outputCol=col+'_indexed') for col in categorical_cols]
# Encoders for categorical columns
encoders = [OneHotEncoder(inputCol=col+'_indexed', outputCol=col+'_OHE') for col in categorical_cols]

assembler = VectorAssembler(inputCols=(numerical_cols + ['Use Chip_OHE', 'Day_of_Week_OHE']),
                            outputCol='features')

lr_model = LogisticRegression(featuresCol='features', labelCol='Is Fraud?')

stages = indexers + encoders + [assembler, lr_model]

pipeline = Pipeline(stages=stages)

####  Oversampling Minority Class

In [6]:
def oversample_minority(df):
    
    num_limit = 300000

    fraction = num_limit / df.filter(col('Is Fraud?') == 0).count()

    df_normal = df.filter(col('Is Fraud?')==0).sample(withReplacement=False, fraction=fraction, seed=42)
    
    fraud_count = df.filter(col('Is Fraud?')==1).count()

    balance_ratio = num_limit / fraud_count
    
    oversampled_minority = df.filter(col('Is Fraud?')==1).sample(withReplacement=True, fraction=(balance_ratio), seed=42)
    oversampled_df = df_normal.union(oversampled_minority)
    
    return oversampled_df

In [7]:
oversample_balanced_df = oversample_minority(df)

                                                                                

In [8]:
oversample_balanced_df.groupBy('Is Fraud?').count().show()



+---------+------+
|Is Fraud?| count|
+---------+------+
|        0|300444|
|        1|299918|
+---------+------+



                                                                                

In [9]:
train_oversampled, test_oversampled = oversample_balanced_df.randomSplit([0.8, 0.2], seed=42)

In [10]:
lr_oversampled_model = pipeline.fit(train_oversampled)

24/06/11 21:33:16 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/06/11 21:33:16 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS

In [11]:
predictions = lr_oversampled_model.transform(test_oversampled)

In [12]:
my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Is Fraud?')
AUC = my_eval.evaluate(predictions)
AUC

                                                                                

0.7624068112917932

In [13]:
evaluator = MulticlassClassificationEvaluator(labelCol="Is Fraud?", predictionCol="prediction")

# Compute metrics
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})


evaluator_precision = MulticlassClassificationEvaluator(labelCol="Is Fraud?", predictionCol="prediction", metricName="precisionByLabel")
precision_label_0 = evaluator_precision.evaluate(predictions, {evaluator_precision.metricLabel: 0.0})
precision_label_1 = evaluator_precision.evaluate(predictions, {evaluator_precision.metricLabel: 1.0})

evaluator_recall = MulticlassClassificationEvaluator(labelCol="Is Fraud?", predictionCol="prediction", metricName="recallByLabel")
recall_label_0 = evaluator_recall.evaluate(predictions, {evaluator_recall.metricLabel: 0.0})
recall_label_1 = evaluator_recall.evaluate(predictions, {evaluator_recall.metricLabel: 1.0})


f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

# Print the metrics
print(f"Accuracy: {accuracy}")
print(f"F1 Score: {f1}")

print(f"Precision label 1: {precision_label_1}")
print(f"Recall label 1: {recall_label_1}")

print(f"Precision label 0: {precision_label_0}")
print(f"Recall label 0: {recall_label_0}")




Accuracy: 0.7622171039455238
F1 Score: 0.759422484500134
Precision label 1: 0.8354007532075151
Recall label 1: 0.6541543793005782
Precision label 0: 0.7149918306262272
Recall label 0: 0.8706592432830081


                                                                                

In [14]:
# Calculate confusion matrix manually
confusion_matrix = predictions.groupBy("Is Fraud?").pivot("prediction").count().na.fill(0).orderBy("Is Fraud?")
confusion_matrix.show()



+---------+-----+-----+
|Is Fraud?|  0.0|  1.0|
+---------+-----+-----+
|        0|52075| 7736|
|        1|20758|39263|
+---------+-----+-----+



                                                                                