# Prepare Spark

Upload the fraud data to Google Colab, then unzip it with the command below.

In [None]:
!unzip fraud\ test.csv.zip

Archive:  fraud test.csv.zip
  inflating: fraud test.csv          


Install Spark

In [None]:
!apt-get update -qq > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.4.2/spark-3.4.2-bin-hadoop3.tgz
!tar xf spark-3.4.2-bin-hadoop3.tgz
!pip install -q findspark

After installing Spark and Java, set the environment variables so that PySpark can run in the Colab environment.

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.2-bin-hadoop3"

Then start a Spark session, since we use Spark SQL and data frames to preprocess and visualize the data.

In [None]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
import random

spark = SparkSession.builder.appName("YourTest").master("local[2]").config('spark.ui.port', random.randrange(4000,5000)).getOrCreate()

# Data Preprocessing and Visualization

## Preprocess the Data

Read the data into a Spark data frame, rename some columns, and check the schema.

In [None]:
fraud_raw = spark.read.csv("fraud test.csv", header = True, inferSchema = True)

fraud = fraud_raw.withColumnRenamed("_c0", "instance_id")\
                 .withColumnRenamed("cc_num", "cust_id")\
                 .withColumnRenamed("trans_num", "trans_id")

fraud.show()
fraud.schema

+-----------+---------------------+----------+--------------------+--------------+------+---------+--------+------+--------------------+-------------+-----+-----+-------+---------+--------+--------------------+----------+--------------------+----------+---------+-----------+--------+
|instance_id|trans_date_trans_time|   cust_id|            merchant|      category|   amt|    first|    last|gender|              street|         city|state|  zip|    lat|     long|city_pop|                 job|       dob|            trans_id| unix_time|merch_lat| merch_long|is_fraud|
+-----------+---------------------+----------+--------------------+--------------+------+---------+--------+------+--------------------+-------------+-----+-----+-------+---------+--------+--------------------+----------+--------------------+----------+---------+-----------+--------+
|          0|     21/06/2020 12:14|2.29116E15|fraud_Kirlin and ...| personal_care|  2.86|     Jeff| Elliott|     M|   351 Darlene Green|     Colu

StructType([StructField('instance_id', IntegerType(), True), StructField('trans_date_trans_time', StringType(), True), StructField('cust_id', DoubleType(), True), StructField('merchant', StringType(), True), StructField('category', StringType(), True), StructField('amt', DoubleType(), True), StructField('first', StringType(), True), StructField('last', StringType(), True), StructField('gender', StringType(), True), StructField('street', StringType(), True), StructField('city', StringType(), True), StructField('state', StringType(), True), StructField('zip', IntegerType(), True), StructField('lat', DoubleType(), True), StructField('long', DoubleType(), True), StructField('city_pop', IntegerType(), True), StructField('job', StringType(), True), StructField('dob', StringType(), True), StructField('trans_id', StringType(), True), StructField('unix_time', IntegerType(), True), StructField('merch_lat', DoubleType(), True), StructField('merch_long', DoubleType(), True), StructField('is_fraud'

The data has a column called "unix_time" holding the Unix time of each transaction. We pass it to the Spark SQL function "from_unixtime", which by default returns a timestamp string in the format "yyyy-MM-dd HH:mm:ss". Spark SQL date functions such as MONTH and HOUR can read this standard format directly, whereas the "dd/MM/yyyy HH:mm" format in the trans_date_trans_time column is non-standard and more awkward to work with in SQL or Python.
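To make the difference between the two formats concrete, here is a minimal plain-Python sketch (not Spark). The timestamp value is an illustrative assumption, and the conversion assumes UTC, whereas from_unixtime uses the Spark session time zone.

```python
from datetime import datetime, timezone

# Hypothetical Unix timestamp (seconds since 1970-01-01 UTC), for illustration only.
unix_time = 1371816865

# Format the timestamp in the "yyyy-MM-dd HH:mm:ss" layout.
standard = datetime.fromtimestamp(unix_time, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
print(standard)

# The raw column stores day-first text, which needs an explicit pattern to parse.
raw = "21/06/2020 12:14"
parsed = datetime.strptime(raw, "%d/%m/%Y %H:%M")
print(parsed.month, parsed.hour)
```

Once the value is in the standard layout, extracting the month or hour needs no custom pattern, which is what the later SQL queries rely on.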

In [None]:
from pyspark.sql import functions as f

# Drop unix time since we used it and no longer need to use it again
fraud = fraud.withColumn("trans_date_trans_time", f.from_unixtime("unix_time"))\
             .drop("unix_time").cache()

fraud.show()

+-----------+---------------------+----------+--------------------+--------------+------+---------+--------+------+--------------------+-------------+-----+-----+-------+---------+--------+--------------------+----------+--------------------+---------+-----------+--------+
|instance_id|trans_date_trans_time|   cust_id|            merchant|      category|   amt|    first|    last|gender|              street|         city|state|  zip|    lat|     long|city_pop|                 job|       dob|            trans_id|merch_lat| merch_long|is_fraud|
+-----------+---------------------+----------+--------------------+--------------+------+---------+--------+------+--------------------+-------------+-----+-----+-------+---------+--------+--------------------+----------+--------------------+---------+-----------+--------+
|          0|  2013-06-21 12:14:25|2.29116E15|fraud_Kirlin and ...| personal_care|  2.86|     Jeff| Elliott|     M|   351 Darlene Green|     Columbia|   SC|29209|33.9659| -80.935

## Visualizations

### Visualizing Fraud over Months

Since the transaction dates all fall within one year, what matters is the trend across months, so we use Spark SQL to analyze fraud within each month.

In [None]:
fraud.createOrReplaceTempView("fraud")

# Get number of transactions per month
fraud_per_month_1 = spark.sql("SELECT M.Month, COUNT(M.Month) AS num_transactions \
FROM (SELECT MONTH(trans_date_trans_time) as Month FROM fraud) M GROUP BY M.Month")

# Get percentage of fraud within each month
fraud_per_month_2 = spark.sql("SELECT M.Month, AVG(M.is_fraud)*100 as \
fraud_percentage FROM (SELECT *, MONTH(trans_date_trans_time) \
AS Month FROM fraud) M GROUP BY M.Month")

# Get total fraud amount within each month
fraud_per_month_3 = spark.sql("SELECT Q.Month, SUM(Q.amt) AS total_fraud_amt \
FROM (SELECT MONTH(P.trans_date_trans_time) AS Month, P.amt FROM \
(SELECT * FROM fraud WHERE is_fraud = 1) P) Q GROUP BY Q.Month")

# Now join the 3 summaries of fraud above to one data frame and
# get a new variable called avg_fraud_per_transaction which gives what the
# name says within each month. It is total_fraud_amt/num_transactions
fraud_per_month = fraud_per_month_1.join(fraud_per_month_2, on = "Month")\
                                   .join(fraud_per_month_3, on = "Month")\
                                   .withColumn("avg_fraud_per_transaction",\
          fraud_per_month_3.total_fraud_amt/fraud_per_month_1.num_transactions)\
          .select("Month", "fraud_percentage", "avg_fraud_per_transaction")\
          .sort("Month", ascending = True)
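The three monthly summaries can be illustrated in plain Python on a handful of made-up rows. This is only a sketch of the arithmetic, not the Spark query itself; the rows and the names `rows` and `summary` are hypothetical.

```python
from collections import defaultdict

# Hypothetical (month, amt, is_fraud) rows, for illustration only.
rows = [
    (6, 10.0, 0), (6, 200.0, 1), (6, 30.0, 0), (6, 5.0, 0),
    (7, 50.0, 0), (7, 400.0, 1), (7, 100.0, 1), (7, 20.0, 0), (7, 30.0, 0),
]

num_transactions = defaultdict(int)   # all transactions per month
num_fraud = defaultdict(int)          # fraudulent transactions per month
total_fraud_amt = defaultdict(float)  # summed amount of fraudulent transactions

for month, amt, is_fraud in rows:
    num_transactions[month] += 1
    if is_fraud == 1:
        num_fraud[month] += 1
        total_fraud_amt[month] += amt

summary = {}
for month in sorted(num_transactions):
    n = num_transactions[month]
    summary[month] = {
        # AVG(is_fraud) * 100
        "fraud_percentage": 100 * num_fraud[month] / n,
        # total_fraud_amt / num_transactions
        "avg_fraud_per_transaction": total_fraud_amt[month] / n,
    }

print(summary)
```

Note that avg_fraud_per_transaction divides by all transactions in the month, not only the fraudulent ones, so it reflects both how often fraud happens and how large it is.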

In [None]:
spark.sql("SELECT is_fraud, count(is_fraud) FROM fraud GROUP BY is_fraud ").show()

+--------+---------------+
|is_fraud|count(is_fraud)|
+--------+---------------+
|       1|           2145|
|       0|         553574|
+--------+---------------+



Now we display two variables for each month: the fraud percentage (i.e. the percentage of that month's transactions that are fraudulent) and the average fraud amount per transaction.

In [None]:
fraud_per_month.show()

+-----+------------------+-------------------------+
|Month|  fraud_percentage|avg_fraud_per_transaction|
+-----+------------------+-------------------------+
|    6|0.4424778761061947|       2.4377846164082775|
|    7|0.3739166899636567|       1.8482607632093928|
|    8|0.4675582194481686|       2.3522733469281993|
|    9|0.4889764572217508|       2.9151768225159307|
|   10|0.5537290188613947|        2.820167416508044|
|   11|0.4047635437461279|       2.1089308184759425|
|   12|0.1848958706588886|       1.0114712838079947|
+-----+------------------+-------------------------+



Now we use a Python plotting library in the Jupyter notebook for visualization. We use plotly to draw a line plot of fraud_percentage and avg_fraud_per_transaction for each month.

Since fraud_percentage and avg_fraud_per_transaction are on quite different scales, we first min-max normalize them so that the plot is easier to read and interpret.

Then we use toPandas() to convert the Spark data frame to a pandas data frame, which plotly can plot directly. The pandas data frame is tiny, with one row for each of the 7 months, so there is no memory issue for the driver program. With bigger data the same procedure applies: summarize the fraud data per month in Spark first, and convert to pandas only once the result has been reduced to a few rows.

In [None]:
import pandas as pd
import plotly.graph_objects as go

# Minimum and Maximum values are given in the table above

fraud_plot = fraud_per_month.withColumn("normalize_fraud_percent",
                           (fraud_per_month.fraud_percentage - 0.1848958706588886)/\
                            (0.5537290188613947-0.1848958706588886))\
                            .withColumn("normalize_avg_fraud_per_tran", \
                             (fraud_per_month.avg_fraud_per_transaction - 1.0114712838079947)/\
                              (2.9151768225159307-1.0114712838079947))\
                              .toPandas()

# Plot the multiple line plot
data = [go.Scatter(x=fraud_plot.Month, y = fraud_plot.normalize_fraud_percent,\
  name = "% of Fraud during <br> Current Month", text = fraud_plot.fraud_percentage),\
        go.Scatter(x=fraud_plot.Month, y = fraud_plot.normalize_avg_fraud_per_tran,\
                   name = "Average Fraud Amount <br> per Transaction", text = fraud_plot.avg_fraud_per_transaction)]

fig = go.Figure(data, layout=go.Layout(title=go.layout.Title(text="Quantity of Fraud over Months"), xaxis_title = "Month"))
fig.update_layout(width=1000, height=600)
fig.show()
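The minimum and maximum in the cell above are copied by hand from the printed table. As a hedged alternative, the sketch below computes them from the values themselves, so the normalization keeps working if the data changes. It is plain Python on the fraud_percentage column shown earlier; the helper name `min_max` is hypothetical.

```python
# fraud_percentage for months 6 to 12, copied from the table above.
fraud_percentage = [
    0.4424778761061947, 0.3739166899636567, 0.4675582194481686,
    0.4889764572217508, 0.5537290188613947, 0.4047635437461279,
    0.1848958706588886,
]

def min_max(values):
    """Rescale values linearly so the smallest maps to 0 and the largest to 1."""
    lo, hi = min(values), max(values)
    return [(v - lo) / (hi - lo) for v in values]

normalized = min_max(fraud_percentage)
print(normalized)
```

October (the fifth entry) maps to 1 and December (the last entry) maps to 0, matching the highest and lowest points of the plotted line.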

The interactive plot above shows the relationship between the percentage of fraud cases and the average fraud amount per transaction in each month. A high fraud percentage does not necessarily correspond to a high fraud amount per transaction. For example, month 10 has the highest fraud percentage, but its average fraud amount per transaction is lower than in month 9. So although month 9 had a lower fraud percentage, its individual fraud amounts were large enough to give it a higher average fraud amount per transaction.

The purpose of this visualization is to justify the choice of test set that we live stream and predict on later. The plot shows that December has both the lowest fraud percentage and the lowest average fraud amount per transaction (i.e. both the red and blue lines dip lowest there). Fraud is therefore lowest in December: the proportion of fraud cases is low, and so is the fraud amount per transaction. Most of the fraud occurs from June to November.

To make the situation more realistic, we choose June to November as the training set, since it contains most of the fraud, whereas December contains the least. In reality we would have a lot of fraud data to learn from, so training on the months with the most fraud makes sense. Picking December as the test set also makes sense because it comes after the training period (June to November).

### Visualizing Number of Distinct Merchants Transacted

In [None]:
import plotly.express as px

# Get number of distinct merchants visited for each unique customer
num_unique_merch = spark.sql("SELECT cust_id, COUNT(DISTINCT(merchant)) as num_unique_merchant \
FROM fraud GROUP BY cust_id ORDER BY num_unique_merchant DESC").select("num_unique_merchant").toPandas()

# Plot the Histogram
fig = px.histogram(num_unique_merch, "num_unique_merchant", \
                   title = "Distribution of Distinct Merchants Visited for each Unique Customer",\
                   labels = {"num_unique_merchant": "Number of Distinct Merchants visited"})
fig.show()

Nearly every customer has transacted with more than 100 different merchants. The histogram has multiple spikes, and the frequency decreases after each one. To analyze this pattern further, we plot a heat map of the total number of distinct merchants visited within each month and hour.

In [None]:
# Get the number of distinct merchants visited by all customers within each month
# and hour
Month_hour_merch = spark.sql("SELECT M.Month, M.Hour, COUNT(DISTINCT(M.merchant)) AS num_unique_merch \
FROM (SELECT merchant, MONTH(trans_date_trans_time) AS Month, HOUR(trans_date_trans_time) \
AS Hour FROM fraud) M GROUP BY M.Month, M.Hour").toPandas()

# Plot the Heatmap
df = Month_hour_merch.pivot(index = "Month", columns = "Hour")["num_unique_merch"]

fig = px.imshow(df, x=df.columns, y = df.index, \
                labels = dict(color = "Number of Distinct <br> Merchants Transacted"),\
                title = "Number of Distinct Merchants <br> Transacted during different <br>  Months & Hours")
fig.update_layout(width=1000, height=400)
fig.show()

The heatmap shows a clear split: the number of distinct merchants transacted with jumps sharply at hour 12 and stays high afterwards, regardless of month. The solid colors before (purple) and after (yellow) hour 12 show that the number of distinct merchants stays roughly constant across hours and depends only on whether the hour is before or after 12. Before hour 12 there are around 400 different merchants transacted with, regardless of month (except June, which has very few transactions), and from hour 12 onward there are around 500. Since about 100 more merchants are available to transact with from hour 12 onward, the expected payout of fraud could increase.

To check whether exposure to more merchants increases the expected payout of fraud, we plot histograms of the expected fraud amount before and after hour 12. The expected fraud amount for each merchant is the probability of fraud times the total amount of its fraudulent transactions.
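The per-merchant quantity defined above can be sketched in plain Python on a few made-up transactions. The rows and merchant names are hypothetical; only the formula (probability of fraud times total fraud amount) comes from the text.

```python
from collections import defaultdict

# Hypothetical (merchant, amt, is_fraud) rows, for illustration only.
rows = [
    ("merchant_a", 100.0, 1), ("merchant_a", 20.0, 0),
    ("merchant_a", 30.0, 0), ("merchant_a", 50.0, 0),
    ("merchant_b", 300.0, 1), ("merchant_b", 10.0, 0),
    ("merchant_c", 40.0, 0),
]

count = defaultdict(int)          # transactions per merchant
fraud_count = defaultdict(int)    # fraudulent transactions per merchant
total_fraud = defaultdict(float)  # summed amount of fraudulent transactions

for merchant, amt, is_fraud in rows:
    count[merchant] += 1
    if is_fraud == 1:
        fraud_count[merchant] += 1
        total_fraud[merchant] += amt

# Only merchants with at least one fraudulent transaction get a value,
# mirroring an inner join between the fraud totals and the fraud probabilities.
expected_fraud = {
    m: (fraud_count[m] / count[m]) * total_fraud[m] for m in total_fraud
}
print(expected_fraud)
```

A merchant with no fraud, like merchant_c here, does not appear in the result at all, so the histograms describe only merchants that saw some fraud in the given time window.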

In [None]:
before_12 = spark.sql("SELECT merchant, amt, is_fraud FROM fraud WHERE HOUR(trans_date_trans_time) < 12")

after_12 = spark.sql("SELECT merchant, amt, is_fraud FROM fraud WHERE HOUR(trans_date_trans_time) >= 12")

before_12.createOrReplaceTempView("before_12")
after_12.createOrReplaceTempView("after_12")

before_12_fraud_total = spark.sql("SELECT F.merchant, SUM(F.amt) as total_fraud FROM \
(SELECT * FROM before_12 where is_fraud = 1) F GROUP BY F.merchant")

after_12_fraud_total = spark.sql("SELECT F.merchant, SUM(F.amt) as total_fraud FROM \
(SELECT * FROM after_12 where is_fraud = 1) F GROUP BY F.merchant")

before_12_prob_fraud = spark.sql("SELECT merchant, AVG(is_fraud) as prob_fraud \
FROM before_12 GROUP BY merchant")

after_12_prob_fraud = spark.sql("SELECT merchant, AVG(is_fraud) as prob_fraud \
FROM after_12 GROUP BY merchant")

before_12_pd = before_12_fraud_total.join(before_12_prob_fraud, "merchant")\
                                    .withColumn("expected_fraud", \
                                 before_12_fraud_total.total_fraud*\
                                 before_12_prob_fraud.prob_fraud)\
                                    .select("expected_fraud")\
                                    .toPandas()

after_12_pd = after_12_fraud_total.join(after_12_prob_fraud, "merchant")\
                                  .withColumn("expected_fraud", \
                                              after_12_fraud_total.total_fraud*\
                                              after_12_prob_fraud.prob_fraud)\
                                              .select("expected_fraud")\
                                              .toPandas()

data = [go.Histogram(x = before_12_pd.expected_fraud, \
                     name = "before hour 12", histnorm="probability density"), \
        go.Histogram(x = after_12_pd.expected_fraud, \
                     name = "after hour 12", opacity = 0.5, histnorm="probability density")]

fig = go.Figure(data, layout=go.Layout(barmode='overlay', \
                                       title=go.layout.Title(text="Probability Distribution of Expected Fraud Amount Before/After Hour 12"), \
                                       xaxis_title = "Expected Fraud"))

fig.update_layout(width=1000, height=600)
fig.show()

Based on the probability distribution of expected fraud amounts above, if all we know is that a transaction happens at or after hour 12, with any merchant, the expected fraud payment is much higher than if we know the transaction happens before hour 12. This shows in the red histogram above, which has probability mass even at very high expected fraud amounts.

### Visualizing total fraud amount for different categories

In [None]:
category_pd = spark.sql("SELECT F.category, SUM(F.amt) AS total_amt FROM \
(SELECT category, amt FROM fraud WHERE is_fraud = 1) F GROUP BY F.category").toPandas()

fig = px.pie(category_pd, values = "total_amt", names = "category", color = "category",\
             title = "Total Fraud Amount for Different Transaction Types")

fig.update_layout(width=800, height=500)
fig.show()

Of the total fraud amount, shopping_net accounts for 44.4%, followed by misc_net with 18.9%, shopping_pos with 16.7%, and grocery_pos with 13.4%.

### Visualizing Total Fraud Amounts from Customers with Fraudulent Transactions

In [None]:
fraud_amt = spark.sql("SELECT SUM(F.fraud_amt) AS total_fraud FROM \
(SELECT cust_id, amt AS fraud_amt FROM fraud WHERE is_fraud = 1) F \
GROUP BY F.cust_id").toPandas()

fig = px.histogram(fraud_amt, "total_fraud", \
                   title = "Distribution of Total Fraud for Customers with Fraudulent Transactions",\
                   labels = {"total_fraud": "Total Fraud"})
fig.show()

Most customers with fraudulent transactions have a total fraud amount of around 4k to 7k, while the most extreme cases exceed 14k. Hovering over the last bar in the Jupyter notebook shows that exactly one customer has more than 14k in total fraud.

# Feature Engineering

Now we add additional variables to include more realistic measures like knowledge of past transaction details. We also drop the variables that are not important or redundant like ID, names, and street addresses.


**New variables:**
*   Age of credit card holder
*   Hour of day during transaction
*   Average Spent Shopping on the Internet
*   Number of distinct merchants transacted with
*   Customer Average Transaction Amount




In [None]:
add_age = fraud.withColumn("age", f.to_date(fraud.dob, "dd/MM/yyyy"))\
               .withColumn("age", f.year("age"))

add_age = add_age.withColumn("age", 2020 - add_age.age).select("instance_id", "age")

add_hour = spark.sql("SELECT instance_id, HOUR(trans_date_trans_time) AS hour FROM fraud")

add_shop = spark.sql("SELECT cust_id, AVG(amt) as average_spent_shop \
FROM fraud WHERE category = 'shopping_net' GROUP BY cust_id")

add_merchant = spark.sql("SELECT cust_id, COUNT(DISTINCT(merchant)) as \
num_merch FROM fraud GROUP BY cust_id")

add_avgtran = spark.sql("SELECT cust_id, AVG(amt) as avg_transaction FROM \
fraud GROUP BY cust_id")

fraud_data = fraud.join(add_age, "instance_id").join(add_hour, "instance_id")\
                                  .join(add_shop, "cust_id")\
                                  .join(add_merchant, "cust_id")\
                                  .join(add_avgtran, "cust_id")\
                                  .drop("cust_id")\
                                  .drop("instance_id")\
                                  .drop("first")\
                                  .drop("last")\
                                  .drop("street")\
                                  .drop("state")\
                                  .drop("zip")\
                                  .drop("city_pop")\
                                  .drop("dob")\
                                  .drop("trans_id")\
                                  .drop("city")\
                                  .drop("merchant")\
                                  .drop("job")

fraud_data.createOrReplaceTempView("fraud_data")
fraud_data.show()

+---------------------+--------------+------+------+-------+--------+---------+----------+--------+---+----+------------------+---------+------------------+
|trans_date_trans_time|      category|   amt|gender|    lat|    long|merch_lat|merch_long|is_fraud|age|hour|average_spent_shop|num_merch|   avg_transaction|
+---------------------+--------------+------+------+-------+--------+---------+----------+--------+---+----+------------------+---------+------------------+
|  2013-06-21 12:39:56|   food_dining| 60.07|     M|44.8955|-85.4082|45.877554|-85.362472|       0| 34|  12| 271.5626086956522|      185|139.51893023255815|
|  2013-06-21 14:38:24|  shopping_pos|  3.17|     M|41.0935|-81.0425|40.859421|-80.528177|       0| 39|  14|18.358666666666668|      191| 50.83199095022624|
|  2013-06-21 18:58:27| personal_care| 70.37|     M|44.8955|-85.4082|44.706944|-84.944875|       0| 34|  18| 271.5626086956522|      185|139.51893023255815|
|  2013-06-22 04:53:44|      misc_pos| 35.55|     M|44.895

# Creating Training and Test Data

The test set, which covers the month of December, is approximately 25% of the entire data set.

To summarize, the numerical predictors are

*   amount of transaction (amt)
*   latitude of card holder's location (lat)
*   longitude of card holder's location (long)
*   latitude of merchant's location (merch_lat)
*   longitude of merchant's location (merch_long)
*   age of card holder (age)
*   hour of transaction (hour)
*   average spent on shopping online (average_spent_shop)
*   number of distinct merchants transacted with (num_merch)
*   average transaction amount (avg_transaction)

and the categorical predictors are

*   Category of transaction (category)
*   Gender (gender)



In [None]:
train_data = spark.sql("SELECT * FROM fraud_data WHERE MONTH(trans_date_trans_time) < 12")

train_data = train_data.drop("trans_date_trans_time")\
                       .withColumnRenamed("is_fraud", "label")\
                       .cache()

test_data = spark.sql("SELECT * FROM fraud_data WHERE MONTH(trans_date_trans_time) = 12")

test_data = test_data.drop("trans_date_trans_time")\
                     .withColumnRenamed("is_fraud", "label")\
                     .cache()

# is_fraud was renamed to label above, so filter on the new column name
print(train_data.filter("label = 1").count())
print(test_data.filter("label = 1").count())
print(train_data.count())
print(test_data.count())
train_data.show()


1887
258
416181
139538
+--------------+------+------+-------+--------+---------+----------+-----+---+----+------------------+---------+------------------+
|      category|   amt|gender|    lat|    long|merch_lat|merch_long|label|age|hour|average_spent_shop|num_merch|   avg_transaction|
+--------------+------+------+-------+--------+---------+----------+-----+---+----+------------------+---------+------------------+
|   food_dining| 60.07|     M|44.8955|-85.4082|45.877554|-85.362472|    0| 34|  12| 271.5626086956522|      185|139.51893023255815|
| personal_care| 70.37|     M|44.8955|-85.4082|44.706944|-84.944875|    0| 34|  18| 271.5626086956522|      185|139.51893023255815|
|      misc_pos| 35.55|     M|44.8955|-85.4082|44.647156| -85.30773|    0| 34|   4| 271.5626086956522|      185|139.51893023255815|
|      misc_net| 120.7|     M|44.8955|-85.4082|45.729853|-86.232562|    0| 34|   4| 271.5626086956522|      185|139.51893023255815|
|      misc_pos| 30.53|     M|44.8955|-85.4082|44.182
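As a quick check on the split, the counts printed above can be turned into proportions. This is a small arithmetic sketch in plain Python using only those four numbers; the variable names are hypothetical.

```python
# Counts printed by the cell above.
train_fraud, test_fraud = 1887, 258
train_rows, test_rows = 416181, 139538

# Share of all rows that fall in the December test set.
test_share = test_rows / (train_rows + test_rows)

# Fraud rate (in percent) within each split.
train_fraud_pct = 100 * train_fraud / train_rows
test_fraud_pct = 100 * test_fraud / test_rows

print(round(test_share, 3))
print(round(train_fraud_pct, 3), round(test_fraud_pct, 3))
```

The test share comes out at roughly a quarter of the data, and the December fraud rate agrees with the fraud_percentage reported for month 12 in the monthly summary.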

# Setting up the Pipeline for Machine Learning Models

The first stage in the pipeline encodes the string-typed categorical features as label indices using StringIndexer. The second stage maps those label indices to binary vectors using OneHotEncoder, which can transform multiple columns at once. This encoding lets us build models such as logistic regression, which need numeric features. The third stage uses VectorAssembler to combine the numeric features into one vector, and the fourth stage scales that vector using MinMaxScaler. The last stage uses VectorAssembler again to combine all the vectors into a single "features" column. The stages consist of what pyspark's machine learning library for data frames calls transformers and estimators.

In Spark, an estimator is a learning algorithm, such as logistic regression, that is fit on a data frame. Fitting an estimator to the training data produces a model, which is a transformer: it transforms one data frame into another. For example, a fitted model transforms the training data set into another data frame with predictions. Finally, a pipeline is a chain of estimators and transformers that specifies our machine learning workflow. In this workflow we first encode the variables and create a column "features". This column holds the feature vectors needed to run classification algorithms in Spark MLlib.
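The estimator and transformer roles can be sketched in plain Python with a toy version of the first two stages. The class and function names below are hypothetical and this is not MLlib code; it assumes the indexer orders labels by descending frequency (ties broken alphabetically) and that one-hot encoding drops the last category.

```python
from collections import Counter

class StringIndexerSketch:
    """Estimator-like object: fit() learns a mapping and returns a model."""

    def fit(self, values):
        counts = Counter(values)
        # Most frequent label gets index 0; ties are broken alphabetically.
        ordered = sorted(counts, key=lambda label: (-counts[label], label))
        return StringIndexerModelSketch({label: i for i, label in enumerate(ordered)})

class StringIndexerModelSketch:
    """Transformer-like object: transform() applies the learned mapping."""

    def __init__(self, index):
        self.index = index

    def transform(self, values):
        return [self.index[v] for v in values]

def one_hot(indices, num_labels):
    """One binary vector per index; the last category maps to all zeros."""
    return [[1.0 if i == j else 0.0 for j in range(num_labels - 1)] for i in indices]

categories = ["misc_net", "shopping_net", "misc_net", "food_dining"]
model = StringIndexerSketch().fit(categories)   # estimator -> fitted model
encoded = model.transform(categories)           # model acts as a transformer
vectors = one_hot(encoded, len(model.index))
print(encoded)
print(vectors)
```

The point of the split is that anything learned from data (here the label ordering) is learned once in fit() on the training set and then reused unchanged when transforming the test set.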

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler

step_1 = StringIndexer(inputCols = ["category", "gender"], \
                       outputCols = ["category_enc", "gender_enc"])

step_2 = OneHotEncoder(inputCols = ["category_enc", "gender_enc"],\
                       outputCols = ["category_vec", "gender_vec"])

step_3 = VectorAssembler(inputCols = ["amt", "lat", "long", "merch_lat", \
                                      "merch_long", "age", "hour", \
                                      "average_spent_shop", "num_merch", \
                                      "avg_transaction"], \
                         outputCol = "numeric_features")

step_4 = MinMaxScaler(inputCol = "numeric_features", \
                      outputCol = "numeric_features_scaled")

step_5 = VectorAssembler(inputCols = ["numeric_features_scaled", \
                                      "category_vec", "gender_vec"], \
                         outputCol = "features")


# Logistic Regression Classifier

The first machine learning model is logistic regression. Conveniently, all categorical variables are already one-hot encoded by the pipeline stages prepared above.

## Tuning

The tool we use for tuning is CrossValidator in MLlib, which requires an estimator (here our pipeline) and an evaluator that scores the held-out data using the metric we specify. It also requires a parameter grid so that it knows which parameters to tune, which we build with ParamGridBuilder. Lastly, we specify the number of folds using numFolds.

The pipeline is the one described in the last section, with the estimator LogisticRegression (maxIter of 10) added as the last step. We perform model selection over the regularization parameter. Only the logistic regression model is tuned here, even though the estimator passed to CrossValidator is the entire pipeline. In reality, the pipeline could contain different algorithms and featurizations that need to be tuned at the same time, which is where the pipeline is useful beyond giving cleaner code. For example, if the data contained missing values, we could tune the parameters of different imputation methods, but since our data set is clean from the beginning I see no need to complicate the pipeline further. The evaluator is MulticlassClassificationEvaluator, for which we specify the f-score as the evaluation metric and label 1 (fraud) as the label to calculate the f-score for. The number of folds is 3 and the parameter grid contains 3 regularization parameters.

The model selection process considers each regularization parameter in the parameter grid.

CrossValidator randomly splits the training data into 3 folds. Then, for each of the 3 regularization parameters, the following is repeated 3 times, once per fold:

  1. Hold out one fold (1/3 of the data) as test data and use the other two folds (2/3 of the data) as training data
  2. Run logistic regression on the training data to get a model
  3. Evaluate the model on the held-out fold with the f-score

Then for each regularization parameter we have 3 f-scores. Averaging them gives one metric per regularization parameter. The regularization parameter with the highest average metric is then used to fit the best model on the entire training set.

With our 3 regularization parameters in the grid and 3 folds, a total of 9 models are trained during the search, plus one final fit with the best parameter.
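The counting argument above can be sketched in plain Python. The helper `k_fold_indices` is hypothetical and assigns rows to folds by position for simplicity, whereas CrossValidator assigns them randomly; the sketch only illustrates how folds rotate and how many fits result.

```python
def k_fold_indices(n_rows, k):
    """Yield (train, test) row-index lists, holding out each fold once."""
    folds = [list(range(i, n_rows, k)) for i in range(k)]
    for held_out in range(k):
        test = folds[held_out]
        train = [i for j, fold in enumerate(folds) if j != held_out for i in fold]
        yield train, test

reg_params = [0.0, 0.01, 0.1]
num_folds = 3

models_trained = 0
splits = list(k_fold_indices(9, num_folds))
for reg in reg_params:
    for train_idx, test_idx in splits:
        # One model would be fit on train_idx and scored on test_idx here.
        models_trained += 1

print(models_trained)
```

Every row lands in the held-out fold exactly once, so each parameter is scored on all of the training data across its three fits.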

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

lr = LogisticRegression(maxIter=10)

pipeline = Pipeline(stages = [step_1, step_2, step_3, step_4, step_5, lr])

paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.0, 0.01, 0.1]).build()

crossval = CrossValidator(estimator = pipeline, estimatorParamMaps = paramGrid,\
                          evaluator = MulticlassClassificationEvaluator\
                           (metricName='fMeasureByLabel', metricLabel = 1),\
                          numFolds = 3)

cvModel = crossval.fit(train_data)

cvModel.avgMetrics

[0.20418814795082071, 0.1309417276585377, 0.01878275494881886]

The best regularization parameter for logistic regression turns out to be 0, which has the highest average f-score above. After fitting, cvModel represents the best model (i.e. the pipeline refit on the training data with regularization parameter 0). We can use the bestModel attribute of the CrossValidatorModel class to get this model. Because our estimator is the whole pipeline, bestModel is a fitted PipelineModel whose last stage is the LogisticRegressionModel. From that stage we can access all attributes of the LogisticRegressionModel and get additional information.

We tune only the regularization parameter and keep the parameter grid small so that the run does not take hours, as our data set is not small. Since we are not running on a cluster, tuning is very expensive. Additionally, since the objective is mainly learning, a simple tuning process with a small grid is enough. As we will see at the end with gradient boosted trees, simple tuning is sufficient to get good results.

## Evaluation of Training Set

Using the best model, we compute its f-score and accuracy on the training data.

In [None]:
predict_train = cvModel.transform(train_data)

# f-score evaluator
evaluator_f = MulticlassClassificationEvaluator(metricName = "fMeasureByLabel", metricLabel = 1)

# accuracy evaluator
evaluator_acc = MulticlassClassificationEvaluator(metricName = "accuracy")

# F score for the best lr model
lr_f = evaluator_f.evaluate(predict_train)

# accuracy for the best lr model
lr_acc = evaluator_acc.evaluate(predict_train)

print(f"f score = {lr_f}")
print(f"accuracy = {lr_acc}")


f score = 0.20487364620938628
accuracy = 0.995766265158669


Get recall value as well.

In [None]:
TP = predict_train.filter("label = 1 and prediction = 1").count()

FN = predict_train.filter("label = 1 and prediction = 0").count()

FP = predict_train.filter("label = 0 and prediction = 1").count()

TN = predict_train.filter("label = 0 and prediction = 0").count()

lr_recall = TP / (TP + FN)

print(TP)
print(TP + FN)
print(f"recall = {lr_recall}")

227
1887
recall = 0.12029676735559089


In [None]:
# Precision on the training data
TP/(TP + FP)

0.6899696048632219

Confusion Matrix for the Best Logistic Regression Model on Training data:

In [None]:
print(TP)
print(TN)
print(FN)
print(FP)

227
414192
1660
102
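
The counts above show why accuracy alone is misleading on such an imbalanced data set. As a minimal sketch in plain Python (no Spark needed), the snippet below recomputes the metrics from the four printed counts and compares the accuracy with a trivial baseline that labels every transaction as non-fraud. The helper name `metrics_from_counts` is hypothetical and only used for illustration.

```python
def metrics_from_counts(tp, tn, fn, fp):
    """Derive binary classification metrics from confusion matrix counts."""
    total = tp + tn + fn + fp
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    f1 = 2 * precision * recall / (precision + recall)
    accuracy = (tp + tn) / total
    # accuracy of a model that always predicts label 0 (not fraud)
    baseline_accuracy = (tn + fp) / total
    return {
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "accuracy": accuracy,
        "baseline_accuracy": baseline_accuracy,
    }

# counts printed above for logistic regression on the training data
lr_train = metrics_from_counts(tp=227, tn=414192, fn=1660, fp=102)

for name, value in lr_train.items():
    print(f"{name} = {value:.4f}")
```

The always-negative baseline already reaches an accuracy of roughly 0.9955, so the 0.9958 of logistic regression is barely an improvement. This is why the f-score and recall for the fraud class are the more informative metrics here.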


# Gradient Boosted Trees Classifier

The second and last machine learning algorithm we implement with MLlib is the gradient boosted trees classifier, which performs significantly better than logistic regression.

## Tuning

Just like for logistic regression, we perform model selection using CrossValidator with the usual pipeline, except that the last stage is now the GBTClassifier estimator. The parameter grid this time consists of the maxDepth values 3, 5, and 8. maxDepth is the GBTClassifier parameter that sets the maximum depth of each tree. The default maxDepth is 5; we try a lower value in case of overfitting and a higher value to experiment. The same MulticlassClassificationEvaluator with the f-score metric is used, as the f-score is the only metric we care about when selecting the model. The number of folds is again 3. I tried a larger grid that also tuned parameters such as the number of iterations, as well as more folds, but the run time was at least an hour on the Google Colab CPU and the results did not improve significantly. Thus, we only tune the maxDepth parameter of GBTClassifier.
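
To see why a larger grid quickly becomes expensive, it helps to count how many models cross validation has to train. The sketch below is plain Python and assumes the standard k-fold procedure: one fit per parameter combination per fold, plus one final refit of the best combination on the full training set. The function name `count_cv_fits` and the larger example grid are hypothetical.

```python
from math import prod

def count_cv_fits(grid, num_folds):
    """Number of model fits for k-fold cross validation over a parameter grid."""
    # every combination of parameter values is one candidate model
    combinations = prod(len(values) for values in grid.values())
    # each candidate is fitted once per fold, then the winner is refitted once
    return combinations * num_folds + 1

# the grid used in this notebook
grid_used = {"maxDepth": [3, 5, 8]}

# a hypothetical larger grid that also tunes the number of iterations
grid_larger = {"maxDepth": [3, 5, 8], "maxIter": [10, 20, 50]}

fits_used = count_cv_fits(grid_used, num_folds=3)
fits_larger = count_cv_fits(grid_larger, num_folds=5)

print(f"fits with the small grid and 3 folds: {fits_used}")
print(f"fits with the larger grid and 5 folds: {fits_larger}")
```

Every one of these fits trains a full pipeline on most of the training data, which is why the run time grows so quickly with the grid size and the number of folds.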

In [None]:
# Takes more than an hour to run

# Results are [0.5280599821917251, 0.694683860976658, 0.7826349553743238]
# so the best model from cross validation with 3 folds is the one with maxDepth
# 8

from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

gbt = GBTClassifier()

gbt_pipeline = Pipeline(stages = [step_1, step_2, step_3, step_4, step_5, gbt])

gbt_paramGrid = ParamGridBuilder().addGrid(gbt.maxDepth, [3, 5, 8]).build()

gbt_crossval = CrossValidator(
    estimator = gbt_pipeline,
    estimatorParamMaps = gbt_paramGrid,
    evaluator = MulticlassClassificationEvaluator(
        metricName = 'fMeasureByLabel', metricLabel = 1),
    numFolds = 3)

gbt_cvModel = gbt_crossval.fit(train_data)

gbt_cvModel.avgMetrics

[0.5280599821917251, 0.694683860976658, 0.7826349553743238]
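
The entries of avgMetrics are in the same order as the parameter grid, so the winning maxDepth can be read off by pairing the two lists. A small plain-Python sketch using the values reported above (the variable names are only illustrative):

```python
# maxDepth values in the order they were added to the parameter grid
max_depths = [3, 5, 8]

# mean f-score over the 3 folds, as reported by gbt_cvModel.avgMetrics
avg_metrics = [0.5280599821917251, 0.694683860976658, 0.7826349553743238]

# pick the (maxDepth, score) pair with the highest mean f-score
best_depth, best_f = max(zip(max_depths, avg_metrics), key=lambda pair: pair[1])

print(f"best maxDepth = {best_depth}, mean f-score = {best_f:.4f}")
```

Note that the score is still rising at the largest value tried, so a deeper tree might score even higher, at the cost of longer run time and a greater risk of overfitting.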

The best value for maxDepth turns out to be 8, so we now train a gradient boosted tree with maxDepth 8. Cross validation took more than an hour to run, and since we already know the optimal maxDepth there is no need to run it again. Instead, I trained the final model directly by fitting a pipeline whose GBTClassifier has maxDepth 8, using gbt_pipeline_final.fit(train_data). This returns a PipelineModel instead of a CrossValidatorModel as in logistic regression, but there is no practical difference when evaluating on our training set.

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

gbt_final = GBTClassifier(maxDepth = 8, seed = 28)

gbt_pipeline_final = Pipeline(stages = [step_1, step_2, step_3, step_4, step_5, gbt_final])

gbt_model = gbt_pipeline_final.fit(train_data)

## Evaluation of Training Set

Now we evaluate the best model, the gradient boosted tree trained with maxDepth 8, on the training set for f-score, accuracy, and recall.

In [None]:
# Gradient Boosted Tree Model Class
final_model = gbt_model.stages[-1]

# Metric evaluator for f score
evaluator_f = MulticlassClassificationEvaluator(metricName = "fMeasureByLabel", metricLabel = 1)

# Metric evaluator for accuracy
evaluator_acc = MulticlassClassificationEvaluator(metricName = "accuracy")

# transform training data set using gbt model
gbt_predict_train = gbt_model.transform(train_data)

Get the f1 score

In [None]:
gbt_f_score = evaluator_f.evaluate(gbt_predict_train)

print(gbt_f_score)

0.8505540581012279


Get the recall value

In [None]:
TP = gbt_predict_train.filter("label = 1 and prediction = 1").count()

FN = gbt_predict_train.filter("label = 1 and prediction = 0").count()

FP = gbt_predict_train.filter("label = 0 and prediction = 1").count()

TN = gbt_predict_train.filter("label = 0 and prediction = 0").count()

gbt_recall = TP / (TP + FN)

print(TP)
print(TP + FN)
print(gbt_recall)

1420
1887
0.7525172231054584


In [None]:
# precision of the gbt model on the training data
TP / (TP + FP)

0.977961432506887

Get Accuracy

In [None]:
gbt_acc = evaluator_acc.evaluate(gbt_predict_train)

print(gbt_acc)

0.9988010024484539


Confusion matrix for the best GBT model on the training data, printed in the order TP, TN, FN, FP:

In [None]:
print(TP)
print(TN)
print(FN)
print(FP)

1420
414262
467
32


## Get Feature Importances

Get most important features according to the best model

In [None]:
top_features = final_model.featureImportances

top_features

SparseVector(24, {0: 0.1354, 1: 0.0283, 2: 0.024, 3: 0.0207, 4: 0.0173, 5: 0.1448, 6: 0.1334, 7: 0.0934, 8: 0.037, 9: 0.1068, 10: 0.0369, 11: 0.063, 12: 0.0111, 13: 0.02, 14: 0.01, 15: 0.0231, 16: 0.0175, 17: 0.0052, 18: 0.0077, 19: 0.0033, 20: 0.0146, 21: 0.0209, 22: 0.0065, 23: 0.0191})

Amount of transaction, age of card holder and hour of transaction were important features.
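
To make the vector above easier to read, the importances can be ranked. The sketch below is plain Python and copies the values from the printed SparseVector. The indices refer to positions in the assembled feature vector; mapping them back to column names depends on the feature order defined earlier in the pipeline, so only indices are reported here.

```python
# feature index -> importance, copied from the SparseVector printed above
importances = {
    0: 0.1354, 1: 0.0283, 2: 0.024, 3: 0.0207, 4: 0.0173, 5: 0.1448,
    6: 0.1334, 7: 0.0934, 8: 0.037, 9: 0.1068, 10: 0.0369, 11: 0.063,
    12: 0.0111, 13: 0.02, 14: 0.01, 15: 0.0231, 16: 0.0175, 17: 0.0052,
    18: 0.0077, 19: 0.0033, 20: 0.0146, 21: 0.0209, 22: 0.0065, 23: 0.0191,
}

# sort by importance, largest first, and keep the five strongest features
ranked = sorted(importances.items(), key=lambda item: item[1], reverse=True)
top5 = ranked[:5]

for index, weight in top5:
    print(f"feature index {index}: {weight:.4f}")

# the importances are normalized, so this is the share held by the top five
top5_share = sum(weight for _, weight in top5)
print(f"share of the top 5 features = {top5_share:.4f}")
```

Five of the 24 features account for roughly 61% of the total importance, so the model relies on a fairly small subset of the inputs.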

# Structured Streaming

In the last section we stream in the test data, which represents the transactions during December, to do prediction. We use the gradient boosted tree model that we trained to predict whether each transaction is fraudulent. We do not consider the logistic regression model for live prediction as it performs significantly worse.

Since a file stream is consumed in batches, we first split the test data by hour, using the variable that represents the hour of the transaction. After splitting we have 24 files, one per hour from 0 to 23. This represents a more realistic scenario in which data arrives every hour and we predict fraud on each new batch.

## Splitting the test data

We loop over the hours, filter the test data frame by hour, and use write.csv to write each subset into the test_data_split directory, including the header and using "append" mode so that each write adds a new file to the directory instead of replacing its contents. The files are written in order from hour 0 to hour 23. This is what we want, since the streaming file source reads the oldest files first, so hour 0, the first file written, is also read first.

In [None]:
import shutil, os
# Remove the folder if it already exists so we don't append to old output
if os.path.isdir("test_data_split"):
    shutil.rmtree("test_data_split")

# Split the test data by hour and save each hour as one csv file
# in the folder test_data_split
for i in range(24):
    hour = test_data.filter(f"hour = {i}").repartition(1)
    hour.write.csv("test_data_split/", header = True, mode = "append")

## Create streaming data frame

We create a streaming data frame, which represents an unbounded table that is appended to as data streams in. This data frame is not receiving any data yet, as streaming has not started. We can think of this step as setting up the data stream as an unbounded table for which we have to specify options. For example, we set maxFilesPerTrigger to 1 so that only one file, representing a particular hour, is processed in every trigger.

Before creating the source for the streaming data frame, we have to get the schema from the test data, since by default Structured Streaming requires a schema to be specified for file sources. This ensures that a consistent schema is used for the streaming query, even in the case of failures.

In [None]:
# read the schema from the existing test data csv file
read_schema = spark.read.format("csv").option("inferSchema", True)\
                                      .option("header", True)\
                                      .load("test_data_split")\
                                      .limit(1).schema

# Create data frame that represents the stream of our test data
df = spark.readStream.option("maxFilesPerTrigger", 1)\
                     .csv("test_data_split", schema=read_schema, header=True)
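
The effect of maxFilesPerTrigger combined with the oldest-first read order can be pictured with a toy simulation. The sketch below is plain Python and only models the behaviour described above, it is not how Spark implements the file source. The function name `micro_batches` and the file names are hypothetical.

```python
import os
import tempfile

def micro_batches(directory, max_files_per_trigger=1):
    """Yield lists of file paths, oldest first, with at most N files per trigger."""
    paths = [os.path.join(directory, name) for name in os.listdir(directory)]
    # order the files by modification time so the oldest is processed first
    paths.sort(key=os.path.getmtime)
    for start in range(0, len(paths), max_files_per_trigger):
        yield paths[start:start + max_files_per_trigger]

with tempfile.TemporaryDirectory() as tmp:
    # write three small files standing in for the hourly csv files
    for hour in range(3):
        path = os.path.join(tmp, f"hour_{hour}.csv")
        with open(path, "w") as handle:
            handle.write("hour,amt\n")
        # give each file a distinct, increasing modification time
        os.utime(path, (1_700_000_000 + hour, 1_700_000_000 + hour))

    # one file per trigger, so every micro-batch holds exactly one hour
    batches = [
        [os.path.basename(path) for path in batch]
        for batch in micro_batches(tmp, max_files_per_trigger=1)
    ]

print(batches)
```

With one file per trigger, each micro-batch corresponds to exactly one hour of transactions, and the hours arrive in the order in which their files were written.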

## "real time" prediction

Now we create a query to run on the streaming data frame. After each trigger, new input data will be streamed into the data frame and this query is applied to it. The query will transform the streaming data set using the gradient boosted tree model that we trained previously. The transformed data frame will include the new column "prediction" to show us what we predict the test label to be.

In [None]:
# Set up the query
prediction = gbt_model.transform(df).select("label","prediction","hour","amt")


Now we use writeStream to decide what is returned as data is streamed. We have to decide on the output sink and output mode.

The output sink can be console, memory, file, or another built-in sink. I could not use the console sink because in Google Colab this requires a premium plan, and the memory sink stores the entire output in an in-memory table, which is intended for debugging. So I decided to use the file sink, which gives back 24 parquet files with the predictions for each hour, and then read these parquet files back into spark data frames.

What is written to the output sink depends on the output mode. Structured Streaming has three output modes: append, complete, and update. For our task only append and complete are relevant. In append mode, only the new rows added to the result table since the last trigger are written. This is the default, and it applies to queries that do not change existing rows, such as a select. If the query aggregates the data frame, complete mode can be used, which writes the entire result table after each trigger.

For real time prediction on the test data, we use the gradient boosted tree model that we trained. The transformation adds columns such as "prediction" and does not change existing rows of the result after each trigger. After all, we are just adding new predicted values as more data is streamed, so the old predicted values do not need to appear again. Hence, append mode is used.
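
The difference between the two relevant output modes can be illustrated with a toy simulation in plain Python. This is only a sketch of the semantics described above, not Spark code, and the rows in `batches` are made up for illustration.

```python
from collections import Counter

# hypothetical micro-batches of already predicted rows, one list per trigger
batches = [
    [{"hour": 0, "prediction": 0.0}, {"hour": 0, "prediction": 1.0}],
    [{"hour": 1, "prediction": 0.0}],
]

result_table = []            # the unbounded result table
fraud_per_hour = Counter()   # an aggregate computed over the whole table
append_outputs = []          # what append mode would write per trigger
complete_outputs = []        # what complete mode would write per trigger

for batch in batches:
    result_table.extend(batch)

    # append mode: only the rows added by this trigger are written
    append_outputs.append(list(batch))

    # complete mode: the aggregate is updated and rewritten in full
    for row in batch:
        fraud_per_hour[row["hour"]] += int(row["prediction"] == 1.0)
    complete_outputs.append(dict(fraud_per_hour))

print(append_outputs)
print(complete_outputs)
```

In the append output every row shows up exactly once, which matches the prediction task here. The complete output repeats the earlier hours on every trigger, which only makes sense for an aggregated result.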

In [None]:
if os.path.isdir("predictions"):
    shutil.rmtree("predictions")

if os.path.isdir("check_point"):
    shutil.rmtree("check_point")

prediction.writeStream.format("parquet").option("path", "predictions/")\
                                  .option("checkpointLocation","check_point/")\
                                  .outputMode("append")\
                                  .start()

<pyspark.sql.streaming.query.StreamingQuery at 0x7a95c8ad4d30>

Once the streaming query has processed all input files, we obtain 24 parquet files, each containing the predictions on the test data for one hour. We read each of these into a spark data frame and display the predictions using show(). We also sum the transaction amounts (amt) that are predicted as fraud, for each hour and in total.

In [None]:
import os

directory = 'predictions'

# running total of the transaction amounts predicted as fraud
total_fraud = 0

# iterate over the files in that directory
for filename in os.listdir(directory):

    f = os.path.join(directory, filename)

    # only read the parquet part files, skip metadata and checksum files
    if filename.startswith("part"):

        df_pred = spark.read.parquet(f)
        df_pred.sort("label", ascending = False).show()

        if df_pred.filter("prediction = 1").count() != 0:

            # sum of the amounts predicted as fraud in this hour
            total_pred_fraud = df_pred.filter("prediction = 1").select("amt").rdd\
                                      .flatMap(lambda x: x).reduce(lambda x,y:x+y)
            print(total_pred_fraud)

            total_fraud += total_pred_fraud

print(total_fraud)

+-----+----------+----+------+
|label|prediction|hour|   amt|
+-----+----------+----+------+
|    0|       0.0|   9| 44.07|
|    0|       0.0|   9| 76.84|
|    0|       0.0|   9| 37.64|
|    0|       0.0|   9|108.51|
|    0|       0.0|   9|  8.78|
|    0|       0.0|   9| 63.46|
|    0|       0.0|   9| 79.56|
|    0|       0.0|   9|  48.1|
|    0|       0.0|   9|121.15|
|    0|       0.0|   9|133.34|
|    0|       0.0|   9| 73.06|
|    0|       0.0|   9| 40.33|
|    0|       0.0|   9| 61.98|
|    0|       0.0|   9| 58.07|
|    0|       0.0|   9| 38.84|
|    0|       0.0|   9| 44.43|
|    0|       0.0|   9|232.25|
|    0|       0.0|   9|  81.1|
|    0|       0.0|   9| 26.41|
|    0|       0.0|   9| 21.07|
+-----+----------+----+------+
only showing top 20 rows

+-----+----------+----+------+
|label|prediction|hour|   amt|
+-----+----------+----+------+
|    1|       0.0|   5|798.67|
|    1|       1.0|   5| 392.0|
|    0|       0.0|   5|132.33|
|    0|       0.0|   5| 60.07|
|    0|      

# Evaluation of Test Data

f-score and accuracy for test data

In [None]:
gbt_predict_test = gbt_model.transform(test_data)

print(evaluator_f.evaluate(gbt_predict_test))
print(evaluator_acc.evaluate(gbt_predict_test))

0.7356828193832599
0.9991400192062377


Recall value for test data

In [None]:
TP = gbt_predict_test.filter("label = 1 and prediction = 1").count()

FN = gbt_predict_test.filter("label = 1 and prediction = 0").count()

FP = gbt_predict_test.filter("label = 0 and prediction = 1").count()

TN = gbt_predict_test.filter("label = 0 and prediction = 0").count()

gbt_recall = TP / (TP + FN)

print(TP)
print(TP + FN)
print(gbt_recall)

167
258
0.6472868217054264


Precision for test data

In [None]:
TP/(TP + FP)

0.8520408163265306

Confusion matrix for the GBT model on the test data, printed in the order TP, FP, FN, TN:

In [None]:
print(TP)
print(FP)
print(FN)
print(TN)

167
29
91
139251


f score for logistic regression model on test data

In [None]:
lr_predict_test = cvModel.transform(test_data)

evaluator_f.evaluate(lr_predict_test)

0.21818181818181817

recall for LR model on test data

In [None]:
TP = lr_predict_test.filter("label = 1 and prediction = 1").count()

FN = lr_predict_test.filter("label = 1 and prediction = 0").count()

FP = lr_predict_test.filter("label = 0 and prediction = 1").count()

TN = lr_predict_test.filter("label = 0 and prediction = 0").count()

lr_recall = TP / (TP + FN)

print(TP)
print(TP + FN)
print(lr_recall)

36
258
0.13953488372093023


precision for LR on test data

In [None]:
TP/(TP + FP)

0.5

accuracy for LR on test data

In [None]:
evaluator_acc.evaluate(lr_predict_test)

0.9981510412934111

Confusion matrix for LR on test data, printed in the order TP, TN, FN, FP:

In [None]:
print(TP)
print(TN)
print(FN)
print(FP)

36
139244
222
36
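
To compare both models side by side, precision, recall, and f-score can be recomputed from the confusion matrix counts reported in this notebook. The sketch below is plain Python; the helper name `precision_recall_f1` is hypothetical, and the counts are copied from the printed outputs.

```python
def precision_recall_f1(tp, fp, fn):
    """Precision, recall, and f-score for the positive (fraud) class."""
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    f1 = 2 * precision * recall / (precision + recall)
    return precision, recall, f1

# (TP, FP, FN) counts copied from the confusion matrices in this notebook
counts = {
    ("LR", "train"): (227, 102, 1660),
    ("LR", "test"): (36, 36, 222),
    ("GBT", "train"): (1420, 32, 467),
    ("GBT", "test"): (167, 29, 91),
}

summary = {key: precision_recall_f1(*value) for key, value in counts.items()}

print(f"{'model':<6}{'split':<7}{'precision':>10}{'recall':>8}{'f1':>8}")
for (model, split), (precision, recall, f1) in summary.items():
    print(f"{model:<6}{split:<7}{precision:>10.4f}{recall:>8.4f}{f1:>8.4f}")
```

The f-score of the gradient boosted trees drops from about 0.85 on the training set to about 0.74 on the test set, which suggests some overfitting, yet it remains far above the roughly 0.2 that logistic regression reaches on both sets.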


# References

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#overview

https://spark.apache.org/docs/latest/ml-features.html#vectorassembler

https://spark.apache.org/docs/latest/ml-tuning.html#cross-validation

https://www.kaggle.com/datasets/kelvinkelue/credit-card-fraud-prediction/data


