In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import isnan, when, count, col
from pyspark.ml.feature import VectorAssembler
import pandas as pd
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [2]:
# Local Spark session using every available core.
spark = (
    SparkSession.builder
    .appName("myapp")
    .master("local[*]")
    .getOrCreate()
)

24/12/03 15:02:06 WARN Utils: Your hostname, MacBook-Pro-di-Lorenzo-2.local resolves to a loopback address: 127.0.0.1; using 192.168.1.166 instead (on interface en0)
24/12/03 15:02:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/03 15:02:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load the preprocessed dataset: the header row supplies column names and
# column types are inferred from the data.
DATA_PATH = 'dataset/preprocessed_data/preprocessedData_SI.csv'
df = spark.read.options(header=True, inferSchema=True).csv(DATA_PATH)

                                                                                

In [4]:
# Show the column names and the types inferred by the CSV reader.
df.printSchema()

root
 |-- fraud_bool: double (nullable = true)
 |-- income: double (nullable = true)
 |-- name_email_similarity: double (nullable = true)
 |-- prev_address_months_count: double (nullable = true)
 |-- current_address_months_count: double (nullable = true)
 |-- customer_age: double (nullable = true)
 |-- days_since_request: double (nullable = true)
 |-- intended_balcon_amount: double (nullable = true)
 |-- zip_count_4w: double (nullable = true)
 |-- velocity_6h: double (nullable = true)
 |-- velocity_24h: double (nullable = true)
 |-- velocity_4w: double (nullable = true)
 |-- bank_branch_count_8w: double (nullable = true)
 |-- date_of_birth_distinct_emails_4w: double (nullable = true)
 |-- credit_risk_score: double (nullable = true)
 |-- email_is_free: double (nullable = true)
 |-- phone_home_valid: double (nullable = true)
 |-- phone_mobile_valid: double (nullable = true)
 |-- bank_months_count: double (nullable = true)
 |-- has_other_cards: double (nullable = true)
 |-- proposed_credi

In [5]:
# Number of rows in the loaded dataset (shown as the cell output).
df.count()

24/12/03 15:02:23 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

1000000

In [6]:
# The classifiers below are configured with labelCol="label", so rename the
# fraud_bool target column accordingly.
df_transformed = df.withColumnRenamed(existing="fraud_bool", new="label")

# Classification with GBTClassifier on the base dataset

In [18]:
# Feature columns: every column of the dataset except the target "label".
columns = [name for name in df_transformed.columns if name != "label"]

In [19]:
# Pack all feature columns into a single vector column named "features".
assembler = VectorAssembler(outputCol='features', inputCols=columns)
data_transformed = assembler.transform(dataset=df_transformed)

In [20]:
# Keep only what the classifier consumes: the feature vector and the target.
dataset = data_transformed.select(F.col('features'), F.col('label'))

In [21]:
# 80/20 train/test split with a fixed seed for reproducibility.
train_data, test_data = dataset.randomSplit(weights=[0.8, 0.2], seed=42)

## Baseline model: no class-imbalance handling and no hyperparameter tuning

In [16]:
# Baseline: GBT with default hyperparameters and no class weighting.
gbt_baseline = GBTClassifier(featuresCol="features", labelCol="label", seed=42)

In [18]:
# Train the baseline on the (imbalanced) training split.
baseline_model = gbt_baseline.fit(dataset=train_data)

24/12/03 01:25:42 WARN MemoryStore: Not enough space to cache rdd_37_1 in memory! (computed 7.9 MiB so far)
24/12/03 01:25:42 WARN MemoryStore: Not enough space to cache rdd_37_7 in memory! (computed 7.9 MiB so far)
24/12/03 01:25:42 WARN MemoryStore: Not enough space to cache rdd_37_4 in memory! (computed 5.2 MiB so far)
24/12/03 01:25:42 WARN MemoryStore: Not enough space to cache rdd_37_8 in memory! (computed 7.9 MiB so far)
24/12/03 01:25:42 WARN MemoryStore: Not enough space to cache rdd_37_3 in memory! (computed 7.9 MiB so far)
24/12/03 01:25:42 WARN MemoryStore: Not enough space to cache rdd_37_9 in memory! (computed 7.9 MiB so far)
24/12/03 01:25:42 WARN MemoryStore: Not enough space to cache rdd_37_11 in memory! (computed 5.2 MiB so far)
24/12/03 01:25:42 WARN MemoryStore: Not enough space to cache rdd_37_0 in memory! (computed 7.9 MiB so far)
24/12/03 01:25:42 WARN MemoryStore: Not enough space to cache rdd_37_10 in memory! (computed 7.9 MiB so far)
24/12/03 01:25:42 WARN Mem

In [20]:
# Score the held-out split (adds rawPrediction / probability / prediction columns).
baseline_predictions = baseline_model.transform(dataset=test_data)

In [21]:
# Evaluator for the area under the ROC curve, computed from the raw model scores.
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

# Calculate ROC AUC on the test split (this is AUC, not accuracy).
auc_baseline = evaluator.evaluate(baseline_predictions)
print(f"AUC = {auc_baseline:.2f}")

                                                                                

AUC = 0.86


In [24]:
# Confusion matrix of the baseline model on the test split.
results = baseline_predictions.withColumn("prediction", F.col("prediction").cast("double"))

is_pos = F.col("label") == 1
is_neg = F.col("label") == 0
pred_pos = F.col("prediction") == 1
pred_neg = F.col("prediction") == 0

# Count all four cells in ONE aggregation. Four separate filter().count() calls
# would each launch their own Spark job over the (uncached) predictions.
# F.count(F.when(cond, 1)) counts only rows where cond is true, and yields 0 on
# an empty input, matching filter(cond).count().
cm_counts = results.agg(
    F.count(F.when(is_pos & pred_pos, 1)).alias("tp"),
    F.count(F.when(is_neg & pred_neg, 1)).alias("tn"),
    F.count(F.when(is_neg & pred_pos, 1)).alias("fp"),
    F.count(F.when(is_pos & pred_neg, 1)).alias("fn"),
).first()
tp, tn, fp, fn = cm_counts["tp"], cm_counts["tn"], cm_counts["fp"], cm_counts["fn"]

confusion_matrix_baselinemodel = pd.DataFrame(
    [[tn, fp], [fn, tp]],
    index=["Actual Negative", "Actual Positive"],
    columns=["Predicted Negative", "Predicted Positive"]
)

print(confusion_matrix_baselinemodel)

[Stage 446:>                                                      (0 + 12) / 12]

                 Predicted Negative  Predicted Positive
Actual Negative              197501                   3
Actual Positive                2189                   7


                                                                                

In [23]:
# Recall, precision and F1 for the positive (fraud) class.
# Each ratio falls back to 0.0 when its denominator is zero. For example, a
# model that never predicts fraud has tp + fp == 0; without the guard this
# would raise ZeroDivisionError.
recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0

print(f"Recall: {recall:.2f}")
print(f"Precision: {precision:.2f}")
print(f"F1 Score: {f1_score:.2f}")

Recall: 0.00
Precision: 0.70
F1 Score: 0.01


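The baseline model is almost completely biased toward the majority class. On the test split it predicts fraud for only 10 applications and catches just 7 of the 2,196 fraudulent ones (recall ≈ 0.00). This happens because the dataset is highly imbalanced: only about 1.1% of rows are fraudulent. We address this below in two ways: by weighting the classes and by undersampling the majority class.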

## Using weights to handle class imbalance

In [29]:
# Class frequencies, gathered in a single grouped count.
label_counts = {
    row["label"]: row["count"]
    for row in df_transformed.groupBy("label").count().collect()
}
total_count = sum(label_counts.values())
pos_count = label_counts.get(1.0, 0)
neg_count = total_count - pos_count

# "Balanced" class weights: total / (2 * class_count). With these weights,
# pos_weight * pos_count == neg_weight * neg_count == total_count / 2, so both
# classes carry the same total weight during training.
pos_weight, neg_weight = (total_count / (2.0 * n) for n in (pos_count, neg_count))

# Per-row weight column consumed by GBTClassifier(weightCol="weight").
df_withWeights = df_transformed.withColumn(
    "weight",
    F.when(F.col("label") == 1, F.lit(pos_weight)).otherwise(F.lit(neg_weight)),
)

                                                                                

In [30]:
# Features: every column except the target and the per-row training weight.
columns = [name for name in df_withWeights.columns if name not in ("label", "weight")]

assembler = VectorAssembler(outputCol="features", inputCols=columns)
data_transformed = assembler.transform(dataset=df_withWeights)

# Keep the weight column alongside features/label so the classifier can use it.
dataset = data_transformed.select(["features", "label", "weight"])

# Same 80/20 split and seed as the baseline.
train_data, test_data = dataset.randomSplit(weights=[0.8, 0.2], seed=42)

In [31]:
# Weighted GBT. The hyperparameters below are fixed; per the original note, the
# grid search that explored them was already run. It is expensive, so it is
# disabled by default. Set RUN_CV = True to repeat it on this training split.
RUN_CV = False

gbt_withWeights = GBTClassifier(featuresCol="features", labelCol="label", weightCol="weight", maxIter=30, maxDepth=3, stepSize=0.3, seed=42)

# Defined here so the cell does not depend on an evaluator from another cell.
# No weightCol: weights affect training only; the test AUC is unweighted.
evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"  # Use AUC as the metric
)

if RUN_CV:
    # 3 x 3 x 3 grid with 3-fold cross-validation, selected by ROC AUC.
    paramGrid = (
        ParamGridBuilder()
        .addGrid(gbt_withWeights.maxDepth, [3, 5, 7])
        .addGrid(gbt_withWeights.maxIter, [10, 20, 30])
        .addGrid(gbt_withWeights.stepSize, [0.1, 0.2, 0.3])
        .build()
    )
    crossval = CrossValidator(
        estimator=gbt_withWeights,
        estimatorParamMaps=paramGrid,
        evaluator=evaluator,
        numFolds=3,  # Use 3-fold cross-validation
        parallelism=2  # Number of models trained in parallel
    )
    cvModel = crossval.fit(train_data)
    bestModel = cvModel.bestModel
    print(f"Best maxDepth: {bestModel.getMaxDepth()}")
    print(f"Best maxIter: {bestModel.getMaxIter()}")
    print(f"Best stepSize: {bestModel.getStepSize()}")

model_withWeights = gbt_withWeights.fit(train_data)

predictions_withWeights = model_withWeights.transform(test_data)
roc_auc_weights = evaluator.evaluate(predictions_withWeights)
print(f"Test AUC: {roc_auc_weights}")

24/12/03 02:05:26 WARN MemoryStore: Not enough space to cache rdd_1077_11 in memory! (computed 5.2 MiB so far)
24/12/03 02:05:26 WARN MemoryStore: Not enough space to cache rdd_1077_6 in memory! (computed 5.2 MiB so far)
24/12/03 02:05:26 WARN MemoryStore: Not enough space to cache rdd_1077_1 in memory! (computed 5.2 MiB so far)
24/12/03 02:05:26 WARN MemoryStore: Not enough space to cache rdd_1077_5 in memory! (computed 5.2 MiB so far)
24/12/03 02:05:26 WARN MemoryStore: Not enough space to cache rdd_1077_0 in memory! (computed 7.9 MiB so far)
24/12/03 02:05:26 WARN MemoryStore: Not enough space to cache rdd_1077_3 in memory! (computed 5.2 MiB so far)
24/12/03 02:05:26 WARN MemoryStore: Not enough space to cache rdd_1077_7 in memory! (computed 5.2 MiB so far)
24/12/03 02:05:26 WARN MemoryStore: Not enough space to cache rdd_1077_8 in memory! (computed 7.9 MiB so far)
24/12/03 02:05:26 WARN MemoryStore: Not enough space to cache rdd_1077_10 in memory! (computed 7.9 MiB so far)
24/12/03

Test AUC: 0.8855868299215742


In [32]:
# Confusion matrix of the class-weighted model on the test split.
results = predictions_withWeights.withColumn("prediction", F.col("prediction").cast("double"))

is_pos = F.col("label") == 1
is_neg = F.col("label") == 0
pred_pos = F.col("prediction") == 1
pred_neg = F.col("prediction") == 0

# Count all four cells in ONE aggregation. Four separate filter().count() calls
# would each launch their own Spark job over the (uncached) predictions.
# F.count(F.when(cond, 1)) counts only rows where cond is true, and yields 0 on
# an empty input, matching filter(cond).count().
cm_counts = results.agg(
    F.count(F.when(is_pos & pred_pos, 1)).alias("tp"),
    F.count(F.when(is_neg & pred_neg, 1)).alias("tn"),
    F.count(F.when(is_neg & pred_pos, 1)).alias("fp"),
    F.count(F.when(is_pos & pred_neg, 1)).alias("fn"),
).first()
tp, tn, fp, fn = cm_counts["tp"], cm_counts["tn"], cm_counts["fp"], cm_counts["fn"]

confusion_matrix_weighted = pd.DataFrame(
    [[tn, fp], [fn, tp]],
    index=["Actual Negative", "Actual Positive"],
    columns=["Predicted Negative", "Predicted Positive"]
)

print(confusion_matrix_weighted)

[Stage 671:====>                                                  (1 + 11) / 12]

                 Predicted Negative  Predicted Positive
Actual Negative              160007               37497
Actual Positive                 441                1755


                                                                                

In [33]:
# Recall, precision and F1 for the positive (fraud) class.
# Each ratio falls back to 0.0 when its denominator is zero. For example, a
# model that never predicts fraud has tp + fp == 0; without the guard this
# would raise ZeroDivisionError.
recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0

print(f"Recall: {recall:.2f}")
print(f"Precision: {precision:.2f}")
print(f"F1 Score: {f1_score:.2f}")

Recall: 0.80
Precision: 0.04
F1 Score: 0.08


## Training the model with random undersampling

In [23]:
# Random undersampling: keep every fraud row plus a random sample of the
# non-fraud rows whose expected size matches the fraud count. The sample size
# is only approximate.
seed = 42  # reproducible sampling

minority_class = df_transformed.filter(df_transformed['label'] == 1)
majority_class = df_transformed.filter(df_transformed['label'] == 0)
keep_fraction = minority_class.count() / majority_class.count()

majority_downsampled = majority_class.sample(withReplacement=False, fraction=keep_fraction, seed=seed)
df_undersampled = majority_downsampled.union(minority_class)

# Resulting class balance (fraud vs. non-fraud row counts).
df_undersampled.groupBy('label').count().show()

24/12/03 15:08:17 WARN MemoryStore: Not enough space to cache rdd_645_11 in memory! (computed 5.2 MiB so far)
24/12/03 15:08:17 WARN MemoryStore: Not enough space to cache rdd_645_3 in memory! (computed 7.9 MiB so far)
24/12/03 15:08:17 WARN MemoryStore: Not enough space to cache rdd_645_6 in memory! (computed 7.9 MiB so far)
24/12/03 15:08:17 WARN MemoryStore: Not enough space to cache rdd_645_10 in memory! (computed 5.2 MiB so far)
24/12/03 15:08:17 WARN MemoryStore: Not enough space to cache rdd_645_8 in memory! (computed 7.9 MiB so far)
24/12/03 15:08:17 WARN MemoryStore: Not enough space to cache rdd_645_0 in memory! (computed 5.2 MiB so far)
24/12/03 15:08:17 WARN MemoryStore: Not enough space to cache rdd_645_1 in memory! (computed 7.9 MiB so far)
24/12/03 15:08:17 WARN MemoryStore: Not enough space to cache rdd_645_5 in memory! (computed 7.9 MiB so far)
24/12/03 15:08:17 WARN MemoryStore: Not enough space to cache rdd_645_2 in memory! (computed 5.2 MiB so far)
24/12/03 15:08:17

+-----+-----+
|label|count|
+-----+-----+
|  0.0|11122|
|  1.0|11029|
+-----+-----+


In [None]:
# Same feature list as the weighted model, packed into one "features" vector.
assembler = VectorAssembler(outputCol='features', inputCols=columns)
data_transformed = assembler.transform(dataset=df_undersampled)

In [None]:
# Keep only the feature vector and the target column.
dataset = data_transformed.select(['features', 'label'])

In [None]:
# Split the ORIGINAL (imbalanced) data first, then undersample only the training
# part. Splitting the already-undersampled `dataset` would yield a roughly 50/50
# test set. Precision/F1/accuracy on it would then not be comparable with the
# baseline and weighted models, whose test sets keep the natural ~1.1% fraud
# rate. Keeping test rows out of the sampling also guarantees no overlap.
full_dataset = assembler.transform(df_transformed).select("features", "label")
train_full, test_data = full_dataset.randomSplit([0.8, 0.2], seed=42)

train_majority = train_full.filter(col("label") == 0)
train_minority = train_full.filter(col("label") == 1)
train_data = train_majority.sample(
    False, train_minority.count() / train_majority.count(), seed=42
).union(train_minority)

In [None]:
# GBT trained on undersampled data, with the same hyperparameters as the
# weighted model (maxIter=30, maxDepth=3, stepSize=0.3). The grid search that
# explores them is expensive and was already run, so it is disabled by
# default. Set RUN_CV = True to repeat it on this training split.
RUN_CV = False

gbt_undersampling = GBTClassifier(featuresCol="features", labelCol="label", maxIter=30, maxDepth=3, stepSize=0.3, seed=42)

evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"  # Use AUC as the metric
)

if RUN_CV:
    # 3 x 3 x 3 grid with 3-fold cross-validation, selected by ROC AUC.
    paramGrid = (
        ParamGridBuilder()
        .addGrid(gbt_undersampling.maxDepth, [3, 5, 7])
        .addGrid(gbt_undersampling.maxIter, [10, 20, 30])
        .addGrid(gbt_undersampling.stepSize, [0.1, 0.2, 0.3])
        .build()
    )
    crossval = CrossValidator(
        estimator=gbt_undersampling,
        estimatorParamMaps=paramGrid,
        evaluator=evaluator,
        numFolds=3,  # Use 3-fold cross-validation
        parallelism=2  # Number of models trained in parallel
    )
    cvModel = crossval.fit(train_data)
    bestModel = cvModel.bestModel
    print(f"Best maxDepth: {bestModel.getMaxDepth()}")
    print(f"Best maxIter: {bestModel.getMaxIter()}")
    print(f"Best stepSize: {bestModel.getStepSize()}")

model_undersampling = gbt_undersampling.fit(train_data)

predictions_undersampling = model_undersampling.transform(test_data)

undersampled_auc = evaluator.evaluate(predictions_undersampling)

print(f"Test AUC: {undersampled_auc}")

Same hyperparameters as the weighted model, but trained on randomly undersampled data.

In [None]:
# Confusion matrix of the undersampled model on the test split.
results = predictions_undersampling.withColumn("prediction", F.col("prediction").cast("double"))

is_pos = F.col("label") == 1
is_neg = F.col("label") == 0
pred_pos = F.col("prediction") == 1
pred_neg = F.col("prediction") == 0

# Count all four cells in ONE aggregation. Four separate filter().count() calls
# would each launch their own Spark job over the (uncached) predictions.
# F.count(F.when(cond, 1)) counts only rows where cond is true, and yields 0 on
# an empty input, matching filter(cond).count().
cm_counts = results.agg(
    F.count(F.when(is_pos & pred_pos, 1)).alias("tp"),
    F.count(F.when(is_neg & pred_neg, 1)).alias("tn"),
    F.count(F.when(is_neg & pred_pos, 1)).alias("fp"),
    F.count(F.when(is_pos & pred_neg, 1)).alias("fn"),
).first()
tp, tn, fp, fn = cm_counts["tp"], cm_counts["tn"], cm_counts["fp"], cm_counts["fn"]

confusion_matrix_undersampling = pd.DataFrame(
    [[tn, fp], [fn, tp]],
    index=["Actual Negative", "Actual Positive"],
    columns=["Predicted Negative", "Predicted Positive"]
)

print(confusion_matrix_undersampling)

In [None]:
# Recall, precision and F1 for the positive (fraud) class, plus overall accuracy.
# Each ratio falls back to 0.0 when its denominator is zero. For example, a
# model that never predicts fraud has tp + fp == 0; without the guard this
# would raise ZeroDivisionError.
recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0

print(f"Recall: {recall:.2f}")
print(f"Precision: {precision:.2f}")
print(f"F1 Score: {f1_score:.2f}")

#Accuracy
n_total = tp + tn + fp + fn
accuracy = (tp + tn) / n_total if n_total > 0 else 0.0
print(f"Accuracy: {accuracy:.2f}")

In [22]:
# Re-score the test split with the undersampled model and report ROC AUC.
# NOTE(review): this repeats the evaluation already done in the training cell.
evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
predictions_undersampling = model_undersampling.transform(test_data)
undersampled_auc = evaluator.evaluate(predictions_undersampling)
print(f"Test AUC: {undersampled_auc}")

ERROR:root:KeyboardInterrupt while sending command.               (0 + 12) / 12]
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
# Per-feature importance of the undersampled GBT. Position i in the importance
# vector corresponds to columns[i], the list given to the VectorAssembler.
feature_importance = model_undersampling.featureImportances

for pos, name in enumerate(columns):
    score = feature_importance[pos]
    print(f"{name}: {score}")