### PySpark initialization and data loading

In [1]:
from pyspark.sql.types import FloatType
from pyspark.sql.functions import *
from pyspark.ml.classification import NaiveBayes
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#import spark
from pyspark.sql import SparkSession
# Build (or reuse, via getOrCreate) the SparkSession that runs every Spark job below.
# - eagerEval renders a DataFrame as a table when it is the last expression of a cell
# - session.timeZone pins timestamp handling to UTC regardless of the host's zone
_spark_settings = {
    "spark.sql.repl.eagerEval.enabled": True,
    "spark.sql.parquet.cacheMetadata": "true",
    "spark.sql.session.timeZone": "Etc/UTC",
}
_builder = SparkSession.builder.appName("MAST30034 ass2 BNPL group 28")
for _key, _value in _spark_settings.items():
    _builder = _builder.config(_key, _value)
spark = _builder.getOrCreate()

22/09/29 19:06:44 WARN Utils: Your hostname, 威猛先生沃 resolves to a loopback address: 127.0.1.1; using 172.24.189.237 instead (on interface eth0)
22/09/29 19:06:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/09/29 19:06:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/09/29 19:06:46 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/09/29 19:06:46 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [2]:
# Load the curated transactions plus the merchant- and consumer-level fraud probability
# tables. header=True takes column names from the first row; no schema is inferred,
# so columns arrive as strings.
curated_csv, merchant_fraud, consumer_fraud = (
    spark.read.csv(path, header=True)
    for path in (
        '../../data/curated/full_data.csv',
        '../../data/tables/merchant_fraud_probability.csv',
        '../../data/tables/consumer_fraud_probability.csv',
    )
)

                                                                                

22/09/30 13:45:24 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 48734987 ms exceeds timeout 120000 ms
22/09/30 13:45:24 WARN NettyRpcEnv: Ignored message: true
22/09/30 13:45:24 WARN SparkContext: Killing executors is not supported by current scheduler.


In [20]:
# Inspect the merchant fraud table's column types (read without schema inference)
merchant_fraud.printSchema()

root
 |-- merchant_abn: string (nullable = true)
 |-- order_datetime: string (nullable = true)
 |-- fraud_probability: string (nullable = true)



In [21]:
# Inspect the consumer fraud table's column types (read without schema inference)
consumer_fraud.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- order_datetime: string (nullable = true)
 |-- fraud_probability: string (nullable = true)



### Add an `is_fraud` label marking whether each transaction is treated as fraudulent

In [22]:
## Keep only records whose fraud probability is at least FRAUD_PROB_THRESHOLD;
## those records are treated as fraud records. fraud_probability is read as a
## string, so cast it to float before comparing.
## NOTE(review): the threshold assumes fraud_probability is on a 0-100 (percent)
## scale, i.e. 30 means 30% -- confirm against the source tables.
FRAUD_PROB_THRESHOLD = 30

consumer_fraud = (
    consumer_fraud
    .withColumn("fraud_probability", col("fraud_probability").cast(FloatType()))
    .filter(col("fraud_probability") >= FRAUD_PROB_THRESHOLD)
)
merchant_fraud = (
    merchant_fraud
    .withColumn("fraud_probability", col("fraud_probability").cast(FloatType()))
    .filter(col("fraud_probability") >= FRAUD_PROB_THRESHOLD)
)

In [23]:
# Sanity check: how many transactions match a kept consumer fraud record
# on both user_id and order_datetime?
curated_csv.join(
    consumer_fraud,
    on=(curated_csv.user_id == consumer_fraud.user_id)
    & (curated_csv.order_datetime == consumer_fraud.order_datetime),
    how="inner",
).count()

                                                                                

4977

In [24]:
## join fraud data with main dataset: a transaction is labelled fraud when its
## (user_id, order_datetime) or (merchant_abn, order_datetime) pair matches one of
## the high-probability fraud records kept above


def attach_fraud_prob(transactions, fraud_records, key, prob_name):
    """Left-join fraud_records onto transactions on (key, order_datetime).

    The duplicate join columns coming from fraud_records are dropped and its
    fraud_probability column is renamed to prob_name. Every transaction row is
    kept; rows with no matching fraud record get NULL in prob_name.

    NOTE(review): if fraud_records holds several rows for the same
    (key, order_datetime) pair, the join duplicates the matching transactions.
    """
    return (
        transactions
        .join(
            fraud_records,
            (transactions[key] == fraud_records[key])
            & (transactions.order_datetime == fraud_records.order_datetime),
            "leftouter",
        )
        .drop(fraud_records[key])
        .drop(fraud_records.order_datetime)
        .withColumnRenamed("fraud_probability", prob_name)
    )


curated_csv = attach_fraud_prob(curated_csv, consumer_fraud, "user_id", "consumer_fraud_prob")
curated_csv = attach_fraud_prob(curated_csv, merchant_fraud, "merchant_abn", "merchant_fraud_prob")

# A left-outer join leaves the probability NULL (not NaN) when nothing matched,
# so test for null directly rather than comparing against NaN.
curated_csv = (
    curated_csv
    .withColumn(
        "is_fraud",
        when(col("consumer_fraud_prob").isNotNull() | col("merchant_fraud_prob").isNotNull(), "True")
        .otherwise("False"),
    )
    .drop("consumer_fraud_prob", "merchant_fraud_prob")
)

In [25]:
# Preview the first 5 rows of the labelled dataset (rendered as a table because
# spark.sql.repl.eagerEval.enabled is set on the session).
# NOTE(review): the preview includes user_name and address columns -- redact before sharing.
curated_csv.limit(5)

user_id,merchant_abn,dollar_value,order_id,order_datetime,consumer_id,merchant_name,user_name,address,state,postcode,gender,field,revenue_level,take_rate,is_fraud
13153,72472909171,112.17,baa29c79-6969-416...,2021-08-21,1018,Nullam Consulting,Kelly Frey,98937 Brian Passage,NSW,2058,Female,digital goods: bo...,a,6.33,False
13153,23338656015,324.21,463dd933-d5b3-441...,2022-02-03,1018,Iaculis LLC,Kelly Frey,98937 Brian Passage,NSW,2058,Female,"watch, clock, and...",b,3.25,False
13153,13093581573,117.23,9bd2915e-c3b5-47a...,2021-08-22,1018,Dis Parturient Co...,Kelly Frey,98937 Brian Passage,NSW,2058,Female,"gift, card, novel...",e,0.24,False
13153,89726005175,24.57,cd1007fb-2336-44f...,2022-02-03,1018,Est Nunc Consulting,Kelly Frey,98937 Brian Passage,NSW,2058,Female,tent and awning s...,a,6.01,False
13153,40590483301,26.08,a3e457c3-c0bb-467...,2021-08-15,1018,Donec Tempus Corp...,Kelly Frey,98937 Brian Passage,NSW,2058,Female,"cable, satellite,...",c,3.1,False


In [26]:
## number of transactions in each class ('True' = fraud, 'False' = normal)
curated_csv.groupBy('is_fraud').count().show()



+--------+-------+
|is_fraud|  count|
+--------+-------+
|   False|8145570|
|    True|   6014|
+--------+-------+



                                                                                

In [27]:
## preview transactions recognised as fraud (is_fraud is stored as the string 'True')
curated_csv.filter(curated_csv['is_fraud'] == 'True').limit(5)

user_id,merchant_abn,dollar_value,order_id,order_datetime,consumer_id,merchant_name,user_name,address,state,postcode,gender,field,revenue_level,take_rate,is_fraud
1405,90568944804,515.04,c3229118-d39e-4c2...,2021-11-29,15233,Diam Eu Dolor LLC,Angela Nelson,877 Denise Island...,WA,6966,Female,tent and awning s...,b,4.1,True
22064,14827550074,16059.5,84dcf104-beb7-4a5...,2021-10-08,19667,,Alicia Howard,4411 Elizabeth Lo...,TAS,7301,Female,,,,True
3101,91880575299,105193.88,2ab65c8f-11b2-41c...,2021-04-17,27622,At Foundation,Christopher Roberson,6757 Reginald Haven,WA,6484,Male,antique shops - s...,b,3.4,True
529,31334588839,19238.04,db703b4f-cb1b-412...,2021-12-18,31180,Lacus Aliquam Cor...,Ashlee Jones,018 Lewis Squares,NSW,2721,Female,antique shops - s...,b,4.22,True
21596,80518954462,215.5,2431b7a6-7c41-411...,2021-11-27,33015,Neque Sed Dictum ...,John Johnson,56517 Bradley Road,QLD,4313,Male,"computers, comput...",b,3.49,True


### Naive Bayes model to detect fraud in future transactions

In [28]:
## candidate features for identifying fraud records, plus the is_fraud target.
## dollar_value is read as a string, so it is cast to float here.
## NOTE(review): `model` holds a DataFrame, not a fitted model; later cells refer
## to it by this name.
model = curated_csv.select(
    'user_id',
    'merchant_abn',
    curated_csv['dollar_value'].cast(FloatType()).alias('dollar_value'),
    'is_fraud',
)

In [29]:
## map each categorical string column to a numeric index; is_fraud becomes 'label'
## (StringIndexer orders values by frequency by default, so the majority 'False' -> 0.0)
indexers = [
    StringIndexer(inputCol=source, outputCol=target)
    for source, target in (
        ("user_id", "user_id_index"),
        ("merchant_abn", "merchant_abn_index"),
        ("is_fraud", "label"),
    )
]

In [30]:
## fit every StringIndexer on the selected data, then append the *_index and label columns
pipeline = Pipeline(stages=indexers)
indexed_model = pipeline.fit(model).transform(model)
indexed_model.show(n=5, truncate=False)

                                                                                

22/09/19 18:00:36 WARN DAGScheduler: Broadcasting large task binary with size 1048.2 KiB
+-------+------------+------------+--------+-------------+------------------+-----+
|user_id|merchant_abn|dollar_value|is_fraud|user_id_index|merchant_abn_index|label|
+-------+------------+------------+--------+-------------+------------------+-----+
|13153  |72472909171 |112.17      |False   |12788.0      |14.0              |0.0  |
|13153  |23338656015 |324.21      |False   |12788.0      |45.0              |0.0  |
|13153  |13093581573 |117.23      |False   |12788.0      |443.0             |0.0  |
|13153  |89726005175 |24.57       |False   |12788.0      |6.0               |0.0  |
|13153  |40590483301 |26.08       |False   |12788.0      |467.0             |0.0  |
+-------+------------+------------+--------+-------------+------------------+-----+
only showing top 5 rows



In [31]:
## pack the two index columns and the transaction amount into a single 'features' vector
## NOTE(review): user_id_index / merchant_abn_index are arbitrary category codes, yet
## they enter the vector as plain numbers; consider one-hot encoding them.
vecAssembler = VectorAssembler(
    inputCols=['user_id_index', 'merchant_abn_index', 'dollar_value'],
    outputCol="features",
)
vindexed_model = vecAssembler.transform(indexed_model)
vindexed_model.show(n=5, truncate=False)

22/09/19 18:00:36 WARN DAGScheduler: Broadcasting large task binary with size 1058.3 KiB
+-------+------------+------------+--------+-------------+------------------+-----+----------------------------------+
|user_id|merchant_abn|dollar_value|is_fraud|user_id_index|merchant_abn_index|label|features                          |
+-------+------------+------------+--------+-------------+------------------+-----+----------------------------------+
|13153  |72472909171 |112.17      |False   |12788.0      |14.0              |0.0  |[12788.0,14.0,112.16999816894531] |
|13153  |23338656015 |324.21      |False   |12788.0      |45.0              |0.0  |[12788.0,45.0,324.2099914550781]  |
|13153  |13093581573 |117.23      |False   |12788.0      |443.0             |0.0  |[12788.0,443.0,117.2300033569336] |
|13153  |89726005175 |24.57       |False   |12788.0      |6.0               |0.0  |[12788.0,6.0,24.56999969482422]   |
|13153  |40590483301 |26.08       |False   |12788.0      |467.0             |0

In [46]:
## train/test split: 70% training, 30% test.
## A fixed seed makes the split -- and every metric computed below -- reproducible
## after a kernel restart; without one each run produces a different split.
## NOTE(review): the StringIndexers above were fit on the full dataset before this split.
SPLIT_SEED = 42
(trainingData, testData) = vindexed_model.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

print(trainingData.count())
print(testData.count())

22/09/19 18:02:21 WARN DAGScheduler: Broadcasting large task binary with size 1088.9 KiB


                                                                                

5706313
22/09/19 18:02:27 WARN DAGScheduler: Broadcasting large task binary with size 1088.9 KiB




2445271


                                                                                

In [47]:
# Gaussian Naive Bayes classifier over the assembled feature vector.
# featuresCol / labelCol are spelled out for clarity; they equal the estimator defaults.
# NOTE(review): smoothing=1.0 is kept as before -- check whether it has any effect
# when modelType="gaussian".
nb = NaiveBayes(smoothing=1.0, modelType="gaussian", featuresCol="features", labelCol="label")

# Fit on the training split only
nbmodel = nb.fit(trainingData)

# Score the held-out test split; this appends rawPrediction, probability and
# prediction columns
predictions_df = nbmodel.transform(testData)
predictions_df.show(5, True)

22/09/19 18:02:33 WARN DAGScheduler: Broadcasting large task binary with size 1094.7 KiB




22/09/19 18:02:39 WARN DAGScheduler: Broadcasting large task binary with size 1108.2 KiB


                                                                                

22/09/19 18:02:39 WARN DAGScheduler: Broadcasting large task binary with size 1110.8 KiB


[Stage 212:>                                                        (0 + 1) / 1]

+-------+------------+------------+--------+-------------+------------------+-----+--------------------+--------------------+--------------------+----------+
|user_id|merchant_abn|dollar_value|is_fraud|user_id_index|merchant_abn_index|label|            features|       rawPrediction|         probability|prediction|
+-------+------------+------------+--------+-------------+------------------+-----+--------------------+--------------------+--------------------+----------+
|      1| 16629601490|      388.44|   False|       2820.0|            1319.0|  0.0|[2820.0,1319.0,38...|[-23.796344042140...|[0.99994713313389...|       0.0|
|      1| 17324645993|        3.52|   False|       2820.0|              25.0|  0.0|[2820.0,25.0,3.51...|[-22.238121402710...|[0.99999288820892...|       0.0|
|      1| 17488304283|       249.7|   False|       2820.0|              61.0|  0.0|[2820.0,61.0,249....|[-22.148838414742...|[0.99999324190829...|       0.0|
|      1| 19933438190|       64.54|   False|       2

                                                                                

In [48]:
## show performance of this model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
nbaccuracy = evaluator.evaluate(predictions_df)
print("Test accuracy = " + str(nbaccuracy))

## Accuracy alone is misleading on this data. In the run shown, only 6014 of ~8.15M
## transactions are labelled fraud (~0.07%), so always predicting "not fraud" would
## score ~99.9%, which is higher than this model's accuracy. Also report precision and
## recall for the fraud class, which StringIndexer mapped to label 1.0 (the minority
## value 'True').
FRAUD_LABEL = 1.0
fraud_precision = evaluator.evaluate(
    predictions_df, {evaluator.metricName: "precisionByLabel", evaluator.metricLabel: FRAUD_LABEL}
)
fraud_recall = evaluator.evaluate(
    predictions_df, {evaluator.metricName: "recallByLabel", evaluator.metricLabel: FRAUD_LABEL}
)
print("Fraud precision = " + str(fraud_precision))
print("Fraud recall = " + str(fraud_recall))

22/09/19 18:02:41 WARN DAGScheduler: Broadcasting large task binary with size 1115.2 KiB




Test accuracy = 0.9878295698104628


                                                                                

In [49]:
## true positives: transactions labelled as fraud AND predicted as fraud.
## Filter predictions_df directly instead of overwriting it with a narrower projection,
## so the frame displayed earlier stays valid and this cell is safe to re-run.
predictions_df.where((col('label') == 1) & (col('prediction') == 1)).count()

22/09/19 18:02:48 WARN DAGScheduler: Broadcasting large task binary with size 1112.8 KiB


                                                                                

739

In [43]:
## total transactions the model predicted as fraud (prediction == 1.0)
## NOTE(review): cells In [43]-In [45] were last executed before the split in In [46],
## so their saved counts do not correspond to the current test set; re-run them.
predictions_df.filter(predictions_df['prediction'] == 1).count()

22/09/19 18:02:02 WARN DAGScheduler: Broadcasting large task binary with size 1109.1 KiB


                                                                                

29554

In [44]:
## true negatives: transactions labelled as normal AND predicted as normal.
## Filter predictions_df directly instead of overwriting it with a narrower projection,
## so this cell is safe to re-run.
predictions_df.where((col('label') == 0) & (col('prediction') == 0)).count()

22/09/19 18:02:08 WARN DAGScheduler: Broadcasting large task binary with size 1112.8 KiB


                                                                                

2414438

In [45]:
## total transactions the model predicted as normal (prediction == 0.0)
predictions_df.filter(predictions_df['prediction'] == 0).count()

22/09/19 18:02:15 WARN DAGScheduler: Broadcasting large task binary with size 1109.1 KiB


                                                                                

2415560