This notebook trains a fraud detection model on six months of simulated credit card transaction history. Training a model to detect fraudulent transactions is usually quite difficult because of the low number of confirmed instances of fraudulent behaviour.

In [0]:
import os
import random
import sys
import time
import numpy as np
import pandas as pd
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import dense_rank, desc
from pyspark.sql.types import (
    DoubleType,
    LongType,
    StringType,
    StructField,
    StructType,
    TimestampType,
    IntegerType,
)
from pyspark.sql.window import Window
from sklearn.model_selection import train_test_split
from pyspark.sql.functions import round

### DATA AGGREGATION
The following block implements a couple of window functions that compute, per credit card, the average amount and the number of transactions over the preceding 10 minutes and over the preceding week. The 10-minute average amount and number of transactions are then normalized by dividing them by the corresponding weekly average amount and number of transactions.

In addition to the above, the aggregated features are grouped by credit card number and selected features are written to a file. (We are exploring the possibility of using an online feature store such as Feast to store the selected features; more information on this can be found in the experimental section of the repository.)

In [0]:
def aggregate_features(spark, input_path='dbfs:/FileStore/tables/transactions.csv'):
    """Compute rolling per-card spending features for every transaction.

    Reads a header-less transactions CSV with the columns ``txn_id``,
    ``cc_num``, ``ts``, ``amt`` and ``label``, registers it as the temporary
    view ``transactions_df`` and runs a Spark SQL query that adds, for each
    row, aggregates over two windows partitioned by ``cc_num`` and ordered
    by ``ts``:

    * ``num_trans_last_10m`` / ``avg_amt_last_10m`` - count and mean amount
      over the 10-minute range window ``w1``.
    * ``num_trans_last_1w`` / ``avg_amt_last_1w`` - count and mean amount
      over the 1-week range window ``w2``.

    and three ratios derived from them:

    * ``amt_ratio1``  = ``avg_amt_last_10m / avg_amt_last_1w``
    * ``amt_ratio2``  = ``amt / avg_amt_last_1w``
    * ``count_ratio`` = ``num_trans_last_10m / num_trans_last_1w``

    Args:
        spark: Active ``SparkSession`` used to read the file and run the query.
        input_path: Location of the transactions CSV. Defaults to the path
            this notebook has always read from, so existing callers are
            unaffected.

    Returns:
        A Spark ``DataFrame`` with the original columns plus the seven
        derived columns listed above.
    """
    schema = StructType([
        StructField('txn_id', StringType(), True),
        StructField('cc_num', LongType(), True),
        StructField('ts', TimestampType(), True),
        StructField('amt', DoubleType(), True),
        StructField('label', DoubleType(), True),
    ])
    transactions_df = spark.read.csv(input_path, header=False, schema=schema)

    # The query below refers to the data by this view name, so register it
    # before the SQL is executed.
    transactions_df.createOrReplaceTempView('transactions_df')

    # Inner SELECT: rolling aggregates per card. Outer SELECT: ratios of the
    # short-window values to the weekly values.
    query = """
    SELECT *, \
           avg_amt_last_10m/avg_amt_last_1w AS amt_ratio1, \
           amt/avg_amt_last_1w AS amt_ratio2, \
           num_trans_last_10m/num_trans_last_1w AS count_ratio \
    FROM \
        ( \
        SELECT *, \
               COUNT(*) OVER w1 as num_trans_last_10m, \
               AVG(amt) OVER w1 as avg_amt_last_10m, \
               COUNT(*) OVER w2 as num_trans_last_1w, \
               AVG(amt) OVER w2 as avg_amt_last_1w \
        FROM transactions_df \
        WINDOW \
               w1 AS (PARTITION BY cc_num order by cast(ts AS timestamp) RANGE INTERVAL 10 MINUTE PRECEDING), \
               w2 AS (PARTITION BY cc_num order by cast(ts AS timestamp) RANGE INTERVAL 1 WEEK PRECEDING) \
        ) 
    """
    return spark.sql(query)

def group_by_cc(aggregated_features):
    """Return one row per credit card holding its most recent feature values.

    Rows are ranked within each ``cc_num`` by ``ts`` descending and only the
    top-ranked row is kept. ``row_number`` is used rather than ``dense_rank``
    because ``dense_rank`` assigns rank 1 to *every* row that shares the
    latest timestamp, which would emit several rows for the same card.

    Args:
        aggregated_features: Spark ``DataFrame`` containing at least the
            columns ``ts``, ``cc_num``, ``num_trans_last_10m``,
            ``avg_amt_last_10m``, ``num_trans_last_1w`` and
            ``avg_amt_last_1w``.

    Returns:
        A Spark ``DataFrame`` with exactly those six columns, one row per
        ``cc_num``, with ``avg_amt_last_1w`` rounded to 2 decimal places.
    """
    # Imported locally because the notebook's import cell does not provide it.
    from pyspark.sql.functions import row_number

    latest_first = Window.partitionBy('cc_num').orderBy(desc('ts'))
    ranked_df = aggregated_features.withColumn('rank', row_number().over(latest_first))
    latest_df = ranked_df.filter(ranked_df['rank'] == 1).drop('rank')

    sliced_df = latest_df.select(
        'ts',
        'cc_num',
        'num_trans_last_10m',
        'avg_amt_last_10m',
        'num_trans_last_1w',
        'avg_amt_last_1w',
    )
    # `round` here is pyspark.sql.functions.round (imported at file level),
    # which operates on a column, not Python's builtin.
    return sliced_df.withColumn('avg_amt_last_1w', round('avg_amt_last_1w', 2))

In [0]:
# Remove the outputs of any previous run (recursively) before writing new ones.
dbutils.fs.rm('/FileStore/tables/aggregated_output.csv', True)
dbutils.fs.rm('/FileStore/tables/aggregated_output.parquet', True)

spark = SparkSession.builder.appName('SparkFraudAnalysis').getOrCreate()

# Per-transaction rolling features, then the latest feature row per card.
df = aggregate_features(spark)
sliced_df = group_by_cc(df)

# Collapse to a single partition and persist the same data in both formats.
single_partition_df = sliced_df.coalesce(1)
single_partition_df.write.csv('dbfs:/FileStore/tables/aggregated_output.csv')
single_partition_df.write.parquet('dbfs:/FileStore/tables/aggregated_output.parquet')

Create the training and testing datasets (the last couple of weeks are used for this).

In [0]:
# Remove the test-set output of any previous run (recursively).
dbutils.fs.rm('/FileStore/tables/transactions_test.csv', True)

# Time-based split: transactions strictly between `start` and `end` are used
# for training; everything from `end` onwards is held out for testing.
start = '2021-08-15'
end = '2022-01-15'
train_df = df.filter((df.ts > start) & (df.ts < end))
# `>=` (not `>`) so that rows stamped exactly at `end` fall into the test set
# instead of being dropped from both splits.
test_df = df.filter(df.ts >= end)

# Persist the held-out transactions as a single CSV part file.
test_df.coalesce(1).write.csv('dbfs:/FileStore/tables/transactions_test.csv')

In [0]:
from pyspark.sql.functions import round

# Target column and the model inputs, in the order they are selected.
_LABEL_COLUMN = 'label'
_FEATURE_COLUMNS = ['amt', 'amt_ratio1', 'amt_ratio2', 'count_ratio']


def _select_and_round(frame):
    """Keep the label and feature columns, rounding each feature to 2 decimals."""
    narrowed = frame.select(_LABEL_COLUMN, *_FEATURE_COLUMNS)
    for feature in _FEATURE_COLUMNS:
        # pyspark's round() works on the named column; the label is left as-is.
        narrowed = narrowed.withColumn(feature, round(feature, 2))
    return narrowed


train_df = _select_and_round(train_df)
test_df = _select_and_round(test_df)

### TRAINING MODEL
Once the raw dataset has been transformed to extract spending patterns, we use the normalized ratios as features to train a model with XGBoost. We evaluate model performance using two metrics: area under the ROC curve and area under the Precision-Recall curve. Additionally, MLflow is used to track and compare metrics between different runs.

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area-under-curve metrics need a continuous score so the curve can be traced
# across thresholds. Scoring on the hard 0/1 "prediction" column collapses the
# curve to a single operating point, so the evaluators read "rawPrediction"
# instead - the same (default) column the cross-validation evaluator uses.
evaluatorPR = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderPR",
)
evaluatorAUC = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from sparkdl.xgboost import XgboostClassifier
from pyspark.ml import Pipeline

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Every column of the training frame except the target is a model input.
featuresCols = [name for name in train_df.columns if name != 'label']

# Stage 1: pack the input columns into a single "features" vector column.
vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="features")

# Stage 2: the XGBoost classifier trained on that vector.
xgb = XgboostClassifier(num_workers=3, labelCol='label', missing=0.0)

# Model selection is driven by area under the ROC curve.
evaluator = BinaryClassificationEvaluator(labelCol='label', metricName='areaUnderROC')

pipeline = Pipeline(stages=[vectorAssembler, xgb])

# 2 x 2 grid over tree depth and number of trees.
param_grid = (
    ParamGridBuilder()
    .addGrid(xgb.max_depth, [2, 5])
    .addGrid(xgb.n_estimators, [10, 100])
    .build()
)

# 3-fold cross-validation over the grid.
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3,
)

In [0]:
import mlflow
import mlflow.spark

with mlflow.start_run():
    # Fit the cross-validated pipeline on the training split.
    cvModel = cv.fit(train_df)

    # Score the held-out split once and reuse the result for both metrics.
    scored_test_df = cvModel.transform(test_df)
    test_metric_PR = evaluatorPR.evaluate(scored_test_df)
    test_metric_AUC = evaluatorAUC.evaluate(scored_test_df)

    # Same for the training split, to compare against the test metrics.
    scored_train_df = cvModel.transform(train_df)
    train_metric_PR = evaluatorPR.evaluate(scored_train_df)
    train_metric_AUC = evaluatorAUC.evaluate(scored_train_df)

    # Metric keys are a split/metric prefix followed by the evaluator's metric name.
    pr_name = evaluatorPR.getMetricName()
    auc_name = evaluatorAUC.getMetricName()
    mlflow.log_metric('test_PR_' + pr_name, test_metric_PR)
    mlflow.log_metric('test_AUC_' + auc_name, test_metric_AUC)
    mlflow.log_metric('train_PR_' + pr_name, train_metric_PR)
    mlflow.log_metric('train_AUC_' + auc_name, train_metric_AUC)

    # Store the best pipeline found by cross-validation with the run.
    mlflow.spark.log_model(spark_model=cvModel.bestModel, artifact_path='best_model_xgb')