In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier, LogisticRegression
from pyspark.sql.functions import col, round, when

In [None]:
# Start (or reuse) the notebook's Spark session.
spark = (
    SparkSession.builder
    .appName("EnsembleModel")
    .getOrCreate()
)

# Load the modelling dataset: the first CSV row supplies the column names and
# Spark infers each column's type from the data.
MODEL_DATA_PATH = "../../../data/model_data.csv"
data = spark.read.csv(MODEL_DATA_PATH, header=True, inferSchema=True)
data.show(5)

In [None]:
# Pack every column except the "isFraud" label into one vector column,
# "rawFeatures". The comprehension variable is deliberately not named `col`,
# so it cannot be confused with the imported pyspark `col` function.
# NOTE(review): `data` is overwritten in place, so re-running this cell on its
# own would re-assemble already-derived columns and most likely fail because
# "rawFeatures" already exists - re-run from the CSV-loading cell instead.
feature_cols = [name for name in data.columns if name != "isFraud"]
assembler = VectorAssembler(outputCol="rawFeatures", inputCols=feature_cols)
data = assembler.transform(data)
data.show(5)

In [None]:
# Rescale each feature in the assembled "rawFeatures" vector to [0, 1] and
# store the result as "scaledFeatures". The scaler reads "rawFeatures" directly;
# wrapping that single vector column in another VectorAssembler would only copy
# it into an identical column.
# NOTE(review): the scaler is fitted on the full dataset, before the train/test
# split further down, so the test rows' min/max influence "scaledFeatures".
# Fitting the scaler on the training split only avoids that leakage.
scaler = MinMaxScaler(inputCol="rawFeatures", outputCol="scaledFeatures", min=0.0, max=1.0)
scalerModel = scaler.fit(data)

data = scalerModel.transform(data)
data.show(5)

In [None]:
# Per-row sample weights that counter the class imbalance. The two literals come
# from sklearn's compute_class_weight as used for the other models; they appear
# consistent with the "balanced" rule n_samples / (n_classes * n_in_class).
# Any isFraud value other than 0 (null included) falls through to FRAUD_WEIGHT.
NON_FRAUD_WEIGHT = 0.5006457829418431
FRAUD_WEIGHT = 387.6269799825936

is_non_fraud = col("isFraud") == 0
data = data.withColumn("weight", when(is_non_fraud, NON_FRAUD_WEIGHT).otherwise(FRAUD_WEIGHT))
data.show(5)

In [None]:
# Hold out roughly 20% of the rows for evaluation; the fixed seed lets the same
# split be reproduced on a re-run.
SPLIT_WEIGHTS = [0.8, 0.2]
SPLIT_SEED = 1
train, test = data.randomSplit(SPLIT_WEIGHTS, seed=SPLIT_SEED)

In [None]:
# Ensemble of three independently trained classifiers. Each writes to
# model-prefixed output columns so all three sets of predictions can live side
# by side in one DataFrame, and each uses the per-row "weight" column as its
# class weighting.
#
# The first stage fits MinMaxScaler on "rawFeatures" inside the pipeline, so
# `pipeline.fit(train)` learns the min/max from training rows only and the
# fitted model re-applies those training statistics to the test rows. This
# keeps test-set information out of the features the models are trained on.
# It writes to a new column because a transformer's output column must not
# already exist in the input DataFrame.
RF_NUM_TREES = 64
GBT_MAX_ITER = 20
MODEL_SEED = 1

train_scaler = MinMaxScaler(
    inputCol="rawFeatures", outputCol="trainScaledFeatures", min=0.0, max=1.0
)
rf = RandomForestClassifier(
    labelCol="isFraud",
    featuresCol="trainScaledFeatures",
    numTrees=RF_NUM_TREES,
    rawPredictionCol="rf_rawPrediction",
    predictionCol="rf_prediction",
    probabilityCol="rf_probability",
    seed=MODEL_SEED,
    weightCol="weight",
)
gbt = (
    GBTClassifier(
        labelCol="isFraud",
        featuresCol="trainScaledFeatures",
        maxIter=GBT_MAX_ITER,
        predictionCol="gbt_prediction",
        seed=MODEL_SEED,
        weightCol="weight",
    )
    # Prefix GBT's raw/probability outputs as well (instead of the generic
    # default "rawPrediction"/"probability"), matching the rf_/lr_ columns.
    .setRawPredictionCol("gbt_rawPrediction")
    .setProbabilityCol("gbt_probability")
)
lr = LogisticRegression(
    labelCol="isFraud",
    featuresCol="trainScaledFeatures",
    rawPredictionCol="lr_rawPrediction",
    predictionCol="lr_prediction",
    probabilityCol="lr_probability",
    weightCol="weight",
)

pipeline = Pipeline(stages=[train_scaler, rf, gbt, lr])

In [None]:
# Fit every pipeline stage, in order, on the training split.
model = pipeline.fit(dataset=train)

In [None]:
# Score the held-out rows with every fitted stage. `predictions` feeds several
# later actions (previews, the ensemble vote, per-model metric counts). A Spark
# DataFrame is lazy, so without caching each of those actions would re-run all
# three models over the test split.
predictions = model.transform(test).cache()
predictions.show(5)

In [None]:
# Majority vote over the base models' 0/1 predictions: a row is flagged (1.0)
# when at least half of the voters predict 1 - for the three models here that
# means at least 2 of 3. Ties (only possible with an even number of voters)
# resolve to 1.0, the same as rounding the mean vote half-up. A null vote makes
# the ensemble prediction null.
ENSEMBLE_VOTE_COLS = ["lr_prediction", "gbt_prediction", "rf_prediction"]

vote_count = sum(col(vote_col) for vote_col in ENSEMBLE_VOTE_COLS)
is_majority = vote_count * 2 >= len(ENSEMBLE_VOTE_COLS)
predictions = predictions.withColumn("ensemble_prediction", is_majority.cast("double"))
predictions.show(5)

In [None]:
# Accuracy / precision / recall for the majority-vote ensemble and for each base
# model, with fraud (isFraud == 1) as the positive class.


def binary_classification_metrics(df, prediction_col, label_col="isFraud"):
    """Compute accuracy, precision and recall for one binary prediction column.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Scored rows containing both ``prediction_col`` and ``label_col``.
    prediction_col : str
        Column holding the model's 0/1 predictions.
    label_col : str, default "isFraud"
        Column holding the true 0/1 labels.

    Returns
    -------
    dict
        ``{"accuracy": ..., "precision": ..., "recall": ...}``. A metric is
        ``nan`` when its denominator is zero (e.g. the model never predicts the
        positive class), rather than raising ZeroDivisionError.
    """
    # A single grouped aggregation yields the whole confusion matrix, instead of
    # one Spark job per filter/count.
    confusion = {
        (row[prediction_col], row[label_col]): row["count"]
        for row in df.groupBy(prediction_col, label_col).count().collect()
    }
    total = sum(confusion.values())
    # Rows with a null prediction or label never count as correct, mirroring
    # the SQL semantics of `prediction == label`.
    correct = sum(
        n
        for (pred, label), n in confusion.items()
        if pred is not None and label is not None and pred == label
    )
    # Spark ML prediction columns are doubles (1.0) while the label is presumably
    # an int (1); Python treats (1.0, 1) and (1, 1) as the same dict key.
    tp = confusion.get((1, 1), 0)
    fp = confusion.get((1, 0), 0)
    fn = confusion.get((0, 1), 0)

    def ratio(numerator, denominator):
        # nan marks an undefined metric instead of crashing the cell
        return numerator / denominator if denominator else float("nan")

    return {
        "accuracy": ratio(correct, total),
        "precision": ratio(tp, tp + fp),
        "recall": ratio(tp, tp + fn),
    }


# Display name -> prediction column; each label matches the column it evaluates.
MODEL_PREDICTION_COLS = {
    "Ensemble (majority vote)": "ensemble_prediction",
    "Random forest": "rf_prediction",
    "Gradient-boosted trees": "gbt_prediction",
    "Logistic regression": "lr_prediction",
}

model_metrics = {
    model_name: binary_classification_metrics(predictions, prediction_col)
    for model_name, prediction_col in MODEL_PREDICTION_COLS.items()
}

for model_name, metrics in model_metrics.items():
    print(f"{model_name} metrics")
    print("Accuracy:", metrics["accuracy"])
    print("Precision:", metrics["precision"])
    print("Recall:", metrics["recall"])
    print()