In [238]:
# Initialize a Spark Session
from pyspark.sql import SparkSession
# Specify the number of available cores in .master()
spark = SparkSession.builder.master('local[4]').appName('Weighted Random Forest with Spark 3').getOrCreate()

# Get the data here : https://www.kaggle.com/mlg-ulb/creditcardfraud
csv_file = ".../creditcard.csv"
df = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load(csv_file)

In [239]:
print(df)

DataFrame[Time: double, V1: double, V2: double, V3: double, V4: double, V5: double, V6: double, V7: double, V8: double, V9: double, V10: double, V11: double, V12: double, V13: double, V14: double, V15: double, V16: double, V17: double, V18: double, V19: double, V20: double, V21: double, V22: double, V23: double, V24: double, V25: double, V26: double, V27: double, V28: double, Amount: double, Class: int]


The 'Time' column is to be deleted since it 'contains the seconds elapsed between each transaction and the first transaction' : irrelevant in my opinion. We also rename columns for our taste :)

In [240]:
df = df.drop('Time').withColumnRenamed('Amount', 'amount').withColumnRenamed('Class', 'outcome')

In [241]:
# Check imbalance and compute weights
import pandas as pd
counts = df.groupBy('outcome').count().toPandas()
print(counts)

   outcome   count
0        1     492
1        0  284315


We only have 492 frauds out of 284807 transactions. A rather imbalanced dataset indeed. This is the reason why we compute a weight for each observation, according to its class (i.e fraud / not fraud). We will use the following popular method, even though there seems to be no strong consensus at the moment among the ML community regarding this subject : $$w_i := \frac{n}{n_i * C}$$

Where $C$ is the number of classes (today, $C = 2$), $i \in {1...C}$, $n$ is the total number of observations and $n_i$ the number of observations of class $i$.

In [242]:
# Counts
count_fraud = counts[counts['outcome']==1]['count'].values[0]
count_total = counts['count'].sum()

# Weights
c = 2
weight_fraud = count_total / (c * count_fraud)
weight_no_fraud = count_total / (c * (count_total - count_fraud))

# Append weights to the dataset
from pyspark.sql.functions import col
from pyspark.sql.functions import when

df = df.withColumn("weight", when(col("outcome") ==1, weight_fraud).otherwise(weight_no_fraud))

In [243]:
# Check everything seems ok
df.select('outcome', 'weight').where(col('outcome')==1).show(3)

+-------+-----------------+
|outcome|           weight|
+-------+-----------------+
|      1|289.4380081300813|
|      1|289.4380081300813|
|      1|289.4380081300813|
+-------+-----------------+
only showing top 3 rows



In [244]:
df.select('outcome', 'weight').where(col('outcome')==0).show(3)

+-------+------------------+
|outcome|            weight|
+-------+------------------+
|      0|0.5008652375006595|
|      0|0.5008652375006595|
|      0|0.5008652375006595|
+-------+------------------+
only showing top 3 rows



In [280]:
df.describe().toPandas()

Unnamed: 0,summary,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V22,V23,V24,V25,V26,V27,V28,amount,outcome,weight
0,count,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0,...,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0
1,mean,2.235360406313924e-15,6.865749819392767e-17,-5.824710544452282e-15,2.011824365682532e-15,3.704311530463074e-15,1.1400338072201013e-15,-1.1496139232471606e-16,-2.9538691083434004e-16,-2.082078549880969e-15,...,-7.664092821647739e-16,2.9538691083434004e-16,4.454953538333313e-15,1.034652530922445e-15,1.8082469001075134e-15,-3.523686426202886e-16,-1.1336470632020615e-16,88.3496192509521,0.00172748563062,1.0000000000007845
2,stddev,1.9586958038574904,1.6513085794769995,1.5162550051777732,1.415868574940927,1.380246734031437,1.3322710897575714,1.2370935981826632,1.1943529026692048,1.0986320892243222,...,0.7257015604409107,0.6244602955949898,0.6056470678271603,0.5212780705409427,0.4822270132610566,0.4036324949650313,0.3300832641602503,250.1201092401885,0.0415271896354649,11.998747525629687
3,min,-56.407509631329,-72.7157275629303,-48.3255893623954,-5.68317119816995,-113.743306711146,-26.1605059358433,-43.5572415712451,-73.2167184552674,-13.4340663182301,...,-10.933143697655,-44.8077352037913,-2.83662691870341,-10.2953970749851,-2.60455055280817,-22.5656793207827,-15.4300839055349,0.0,0.0,0.5008652375006595
4,max,2.45492999121121,22.0577289904909,9.38255843282114,16.8753440335975,34.8016658766686,73.3016255459646,120.589493945238,20.0072083651213,15.5949946071278,...,10.5030900899454,22.5284116897749,4.58454913689817,7.51958867870916,3.5173456116238,31.6121981061363,33.8478078188831,25691.16,1.0,289.4380081300813


In [245]:
# Split the dataset into train and test subsets
train, test = df.randomSplit([.8, .2], seed = 0)

In [246]:
print(f"""There are {train.count()} rows in the train set, and {test.count()} in the test set""")

There are 227805 rows in the train set, and 57002 in the test set


In [247]:
test.where(col('outcome')==1).count()

101

In [248]:
df.schema.names

['V1',
 'V2',
 'V3',
 'V4',
 'V5',
 'V6',
 'V7',
 'V8',
 'V9',
 'V10',
 'V11',
 'V12',
 'V13',
 'V14',
 'V15',
 'V16',
 'V17',
 'V18',
 'V19',
 'V20',
 'V21',
 'V22',
 'V23',
 'V24',
 'V25',
 'V26',
 'V27',
 'V28',
 'amount',
 'outcome',
 'weight']

In [249]:
# Format the data for MLlib models
from pyspark.ml.feature import VectorAssembler
vector_assembler = VectorAssembler(inputCols=df.schema.names[:-2], outputCol="features")
train = vector_assembler.transform(train)
test = vector_assembler.transform(test)

In [250]:
train.select('features', 'weight', 'outcome').show(3)

+--------------------+------------------+-------+
|            features|            weight|outcome|
+--------------------+------------------+-------+
|[-56.407509631329...|0.5008652375006595|      0|
|[-36.802319908874...|0.5008652375006595|      0|
|[-34.148233651352...|0.5008652375006595|      0|
+--------------------+------------------+-------+
only showing top 3 rows



In [265]:
from pyspark.ml.classification import RandomForestClassifier

# Random Forest without weights
rf = RandomForestClassifier(numTrees = 200, featuresCol='features', labelCol='outcome', seed=0)
rf = rf.fit(train)

In [252]:
# Random Forest with weights
rfw = RandomForestClassifier(numTrees = 200, featuresCol='features', labelCol='outcome', weightCol='weight', seed=0)
rfw = rfw.fit(train)

In [253]:
# Logistic Regression without weights
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol='features', labelCol='outcome')
lr = lr.fit(train)

In [254]:
# Logistic Regression with weights
lrw = LogisticRegression(featuresCol='features', labelCol='outcome', weightCol='weight')
lrw = lrw.fit(train)

In [255]:
# Predict the outcome for the test set using the four different models computed above
res_rf = rf.transform(test)
res_rfw = rfw.transform(test)
res_lr = lr.transform(test)
res_lrw = lrw.transform(test)

In [256]:
# Let us have a look at the confusion matrices on the test set

# Random Forest without weights
res_rf.groupBy('outcome', 'prediction').count().show()

+-------+----------+-----+
|outcome|prediction|count|
+-------+----------+-----+
|      1|       0.0|   22|
|      0|       0.0|56891|
|      1|       1.0|   79|
|      0|       1.0|   10|
+-------+----------+-----+



In [257]:
# Random Forest with weights
res_rfw.groupBy('outcome', 'prediction').count().show()

+-------+----------+-----+
|outcome|prediction|count|
+-------+----------+-----+
|      1|       0.0|   11|
|      0|       0.0|56790|
|      1|       1.0|   90|
|      0|       1.0|  111|
+-------+----------+-----+



In [258]:
# Logistic Regression without weights
res_lr.groupBy('outcome', 'prediction').count().show()

+-------+----------+-----+
|outcome|prediction|count|
+-------+----------+-----+
|      1|       0.0|   35|
|      0|       0.0|56890|
|      1|       1.0|   66|
|      0|       1.0|   11|
+-------+----------+-----+



In [259]:
# Logistic Regression with weights
res_lrw.groupBy('outcome', 'prediction').count().show()

+-------+----------+-----+
|outcome|prediction|count|
+-------+----------+-----+
|      1|       0.0|    7|
|      0|       0.0|55539|
|      1|       1.0|   94|
|      0|       1.0| 1362|
+-------+----------+-----+



In [260]:
# Compute the area under the PR curve
from pyspark.ml.evaluation import BinaryClassificationEvaluator

pr = BinaryClassificationEvaluator(rawPredictionCol = 'prediction', labelCol="outcome", metricName="areaUnderPR")
pr.evaluate(res_rf)

0.7911597128082142

In [261]:
pr.evaluate(res_rfw)

0.42347464655392536

In [262]:
pr.evaluate(res_lr)

0.7089350124294008

In [263]:
pr.evaluate(res_lrw)

0.062384597959713274