# Create Training Data for Fraud Predictor Models

In [1]:
# import libraries
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from statistics import mean, stdev, pstdev
import os

In [2]:
# setup spark
spark = (
    SparkSession.builder.appName("aggregate data for first 3 final model variables")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.driver.memory", "15g")
    .getOrCreate()
)

22/09/18 17:30:02 WARN Utils: Your hostname, modaxuexiweiyuanzhangde-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.13.64.116 instead (on interface en0)
22/09/18 17:30:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/09/18 17:30:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/09/18 17:30:04 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# create directory persona if does not exist
dirs_to_create = ['../data/curated/fraud', '../data/curated/fraud/input', '../data/curated/fraud/output']

def create_dirs(dirs_to_create):
    # check if it exists as it makedir will raise an error if it does exist
    for dir_to_create in dirs_to_create:
        if not os.path.exists(dir_to_create):
            os.makedirs(dir_to_create)

create_dirs(dirs_to_create)

## Read in files

In [4]:
# import transactions data
data1 = spark.read.parquet("../data/tables/transactions_20210228_20210827_snapshot/")
data2 = spark.read.parquet("../data/tables/transactions_20210828_20220227_snapshot/")
data3 = spark.read.parquet("../data/tables/transactions_20220228_20220828_snapshot/")

data = data1.union(data2)
data = data.union(data3)

                                                                                

In [5]:
# read both fraud data
merchant_fraud = spark.read.option("header",True).csv('../data/tables/merchant_fraud_probability.csv', )
consumer_fraud = spark.read.option("header",True).csv('../data/tables/consumer_fraud_probability.csv', )

In [6]:
# join up the fraud data with merchant data so every transaction made on the day with fraud label is linked
merchant_fraud_join_data = merchant_fraud.join(data, [merchant_fraud.merchant_abn == data.merchant_abn, \
    merchant_fraud.order_datetime == data.order_datetime], 'left')

consumer_fraud_join_data = consumer_fraud.join(data, [consumer_fraud.user_id == data.user_id, \
    consumer_fraud.order_datetime == data.order_datetime], 'left')

In [7]:
# create pandas dataframe
merchant_fraud_join_data_df = merchant_fraud_join_data.toPandas()
consumer_fraud_join_data_df = consumer_fraud_join_data.toPandas()

                                                                                

In [8]:
# drop duplicate columns from previous join
merchant_fraud_join_data_df = merchant_fraud_join_data_df.iloc[:, 2:]
consumer_fraud_join_data_df = consumer_fraud_join_data_df.iloc[:, 2:]

In [9]:
# create exxtra column for taking aggregation
merchant_fraud_join_data_df['dollar_value2'] = merchant_fraud_join_data_df['dollar_value']
consumer_fraud_join_data_df['dollar_value2'] = consumer_fraud_join_data_df['dollar_value']

In [10]:
# take aggregation
merchant_fraud_instance_agg = merchant_fraud_join_data_df.groupby(['merchant_abn', 'order_datetime']).agg({'dollar_value': mean, \
    'dollar_value2': pstdev, 'order_id':'count', 'user_id':'nunique', 'fraud_probability': 'first'})

consumer_fraud_instance_agg = consumer_fraud_join_data_df.groupby(['user_id', 'order_datetime']).agg({'dollar_value': mean, \
    'dollar_value2': pstdev, 'order_id':'count', 'merchant_abn':'nunique', 'fraud_probability': 'first'})

In [11]:
# previously used population standard deviation to avoid dividng by 0 error. 
# Now use liottle trick to get sample sd and setting samples with only one obs to 0
tmp = merchant_fraud_instance_agg['order_id']
tmp2 = [x-1 if x != 1 else 1 for x in tmp ]
merchant_fraud_instance_agg['count**'] = tmp2
merchant_fraud_instance_agg['sd_transact'] = merchant_fraud_instance_agg['dollar_value2']* \
    merchant_fraud_instance_agg['order_id']/merchant_fraud_instance_agg['count**']
merchant_fraud_instance_agg = merchant_fraud_instance_agg.drop(['dollar_value2', 'count**'], axis=1)

tmp = consumer_fraud_instance_agg['order_id']
tmp2 = [x-1 if x != 1 else 1 for x in tmp ]
consumer_fraud_instance_agg['count**'] = tmp2
consumer_fraud_instance_agg['sd_transact'] = consumer_fraud_instance_agg['dollar_value2']* \
    consumer_fraud_instance_agg['order_id']/consumer_fraud_instance_agg['count**']
consumer_fraud_instance_agg = consumer_fraud_instance_agg.drop(['dollar_value2', 'count**'], axis=1)

In [12]:
# rename for more accurate feature descriptions
merchant_fraud_instance_agg = merchant_fraud_instance_agg.rename(columns = {'order_id': '#daily_orders', 'dollar_value': 'transact_amount_perOrder', 'user_id': '#distinct_customers', 'sd_transact': 'transact_amount_perOrder_sd'})
consumer_fraud_instance_agg = consumer_fraud_instance_agg.rename(columns = {'order_id': '#daily_orders', 'dollar_value': 'transact_amount_perOrder', 'merchant_abn': '#distinct_merchants', 'sd_transact': 'transact_amount_perOrder_sd'})

In [13]:
# change it into correct data type of float
merchant_fraud_instance_agg['fraud_probability'] = merchant_fraud_instance_agg['fraud_probability'].astype(float)
consumer_fraud_instance_agg['fraud_probability'] = consumer_fraud_instance_agg['fraud_probability'].astype(float)

## Analysis

In [14]:
merchant_fraud_instance_agg.describe()

Unnamed: 0,transact_amount_perOrder,#daily_orders,#distinct_customers,fraud_probability,transact_amount_perOrder_sd
count,114.0,114.0,114.0,114.0,114.0
mean,16658.823701,35.605263,35.412281,40.419335,5477.762207
std,19284.02638,84.515341,83.795348,17.187745,7431.335945
min,28.891633,1.0,1.0,18.210891,0.0
25%,763.176082,2.0,2.0,28.992765,158.578334
50%,10875.642509,5.0,5.0,32.692032,1620.766619
75%,25253.562493,10.75,10.75,48.39526,8923.734614
max,105193.885789,535.0,528.0,94.1347,42347.488441


In [15]:
consumer_fraud_instance_agg.describe()

Unnamed: 0,transact_amount_perOrder,#daily_orders,#distinct_merchants,fraud_probability,transact_amount_perOrder_sd
count,34765.0,34765.0,34765.0,34765.0,34765.0
mean,2687.717829,2.317273,2.307925,14.94585,2714.708273
std,3052.406638,1.19144,1.177942,9.397401,3735.733129
min,188.135142,1.0,1.0,8.287144,0.0
25%,1134.143002,1.0,1.0,9.630652,0.0
50%,1896.602843,2.0,2.0,11.718317,1906.195963
75%,3081.780009,3.0,3.0,16.153476,3413.690131
max,105193.885789,12.0,11.0,99.24738,59050.181658


In [16]:
consumer_fraud_instance_agg

Unnamed: 0_level_0,Unnamed: 1_level_0,transact_amount_perOrder,#daily_orders,#distinct_merchants,fraud_probability,transact_amount_perOrder_sd
user_id,order_datetime,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
1,2022-02-20,2479.076338,1,1,9.805431,0.000000
2,2021-08-30,710.667418,3,3,9.599514,1023.234306
2,2021-09-25,2236.461666,1,1,10.069851,0.000000
3,2021-11-03,2334.493717,1,1,8.300636,0.000000
4,2021-10-09,775.013087,3,3,9.633302,1106.645341
...,...,...,...,...,...,...
24079,2021-10-12,4708.648444,1,1,14.948165,0.000000
24079,2021-11-08,1408.125536,2,2,8.940524,2185.655120
24079,2021-11-26,696.038011,4,4,8.838622,1016.552311
24081,2021-10-08,2160.357725,2,2,14.343772,4241.359534


# Get all merchant/consumer's daily activity - to take ratio (more feature engineering)

In [17]:
# First aggregate by day
full_merchant_daily = data.groupBy("merchant_abn", "order_datetime").agg(F.countDistinct("user_id").alias('distinct_users'),
                                                     F.count("order_id").alias('count'),
                                                     F.mean("dollar_value").alias('mean transact'),
                                                     F.stddev("dollar_value").alias('stdev transact'))

# Then aggregate by unit
full_merchant_daily_agg = full_merchant_daily.groupBy("merchant_abn")\
    .agg(F.mean('distinct_users').alias('mean_#distinct_customers'), \
        F.mean('count').alias('mean_#daily_orders'),\
        F.mean("mean transact").alias('mean_transact_amount_perOrder'),\
        F.mean('stdev transact').alias('mean_transact_amount_perOrder_sd'))

# turn into Pandas dataframe
full_merchant_daily_agg_df = full_merchant_daily_agg.toPandas()

                                                                                

In [18]:
# First aggregate by day
full_consumer_daily = data.groupBy("user_id", "order_datetime").agg(F.countDistinct("merchant_abn").alias('distinct_merchants'),
                                                     F.count("order_id").alias('count'),
                                                     F.mean("dollar_value").alias('mean transact'),
                                                     F.stddev("dollar_value").alias('stdev transact'))

# Then aggregate by unit
full_consumer_daily_agg = full_consumer_daily.groupBy("user_id")\
    .agg(F.mean('distinct_merchants').alias('mean_#distinct_merchants'), \
        F.mean('count').alias('mean_#daily_orders'),\
        F.mean("mean transact").alias('mean_transact_amount_perOrder'),\
        F.mean('stdev transact').alias('mean_transact_amount_perOrder_sd'))

# turn into Pandas dataframe
full_consumer_daily_agg_df = full_consumer_daily_agg.toPandas()                                      

                                                                                

In [19]:
# create ratio variables after merging fraud set with all merchant's individual averagehistorical activity
merchant_fraud_df = merchant_fraud_instance_agg.merge(full_merchant_daily_agg_df, on = 'merchant_abn', how = 'left')

merchant_fraud_df['#daily_orders_ratio'] = merchant_fraud_df['#daily_orders']/merchant_fraud_df['mean_#daily_orders']
merchant_fraud_df['#distinct_customers_ratio'] = merchant_fraud_df['#distinct_customers']/merchant_fraud_df['mean_#distinct_customers']
merchant_fraud_df['transact_amount_perOrder_sd_ratio'] = merchant_fraud_df['transact_amount_perOrder_sd']/merchant_fraud_df['mean_transact_amount_perOrder_sd']
merchant_fraud_df['transact_amount_perOrder_ratio'] = merchant_fraud_df['transact_amount_perOrder']/merchant_fraud_df['mean_transact_amount_perOrder']

merchant_fraud_df = merchant_fraud_df.drop(['mean_#daily_orders', 'mean_#distinct_customers', 'mean_transact_amount_perOrder_sd', \
    'mean_transact_amount_perOrder'], axis=1)

merchant_fraud_df = merchant_fraud_df.fillna(0)

In [20]:
# create ratio variables after merging fraud set with all user's average historical activity 
consumer_fraud_df = consumer_fraud_instance_agg.merge(full_consumer_daily_agg_df, on = 'user_id', how = 'left')

consumer_fraud_df['#daily_orders_ratio'] = consumer_fraud_df['#daily_orders']/consumer_fraud_df['mean_#daily_orders']
consumer_fraud_df['#distinct_merchants_ratio'] = consumer_fraud_df['#distinct_merchants']/consumer_fraud_df['mean_#distinct_merchants']
consumer_fraud_df['transact_amount_perOrder_sd_ratio'] = consumer_fraud_df['transact_amount_perOrder_sd']/consumer_fraud_df['mean_transact_amount_perOrder_sd']
consumer_fraud_df['transact_amount_perOrder_ratio'] = consumer_fraud_df['transact_amount_perOrder']/consumer_fraud_df['mean_transact_amount_perOrder']

consumer_fraud_df = consumer_fraud_df.drop(['mean_#daily_orders', 'mean_#distinct_merchants', 'mean_transact_amount_perOrder_sd', 'mean_transact_amount_perOrder'], axis=1)

consumer_fraud_df = consumer_fraud_df.fillna(0)

In [21]:
consumer_fraud_df.describe()

Unnamed: 0,user_id,transact_amount_perOrder,#daily_orders,#distinct_merchants,fraud_probability,transact_amount_perOrder_sd,#daily_orders_ratio,#distinct_merchants_ratio,transact_amount_perOrder_sd_ratio,transact_amount_perOrder_ratio
count,34765.0,34765.0,34765.0,34765.0,34765.0,34765.0,34765.0,34765.0,34765.0,34765.0
mean,12059.19255,2687.717829,2.317273,2.307925,14.94585,2714.708273,1.465361,1.462653,14.126294,15.308155
std,6962.389886,3052.406638,1.19144,1.177942,9.397401,3735.733129,0.751907,0.745055,15.722238,14.260001
min,1.0,188.135142,1.0,1.0,8.287144,0.0,0.58277,0.583893,0.0,0.941424
25%,6063.0,1134.143002,1.0,1.0,9.630652,0.0,0.654045,0.655172,0.0,6.984808
50%,12071.0,1896.602843,2.0,2.0,11.718317,1906.195963,1.27551,1.27787,11.094978,11.513097
75%,18092.0,3081.780009,3.0,3.0,16.153476,3413.690131,1.898276,1.901193,19.454004,18.203966
max,24081.0,105193.885789,12.0,11.0,99.24738,59050.181658,7.641892,6.875,147.443528,239.265478


In [22]:
merchant_fraud_df.describe()

Unnamed: 0,merchant_abn,transact_amount_perOrder,#daily_orders,#distinct_customers,fraud_probability,transact_amount_perOrder_sd,#daily_orders_ratio,#distinct_customers_ratio,transact_amount_perOrder_sd_ratio,transact_amount_perOrder_ratio
count,114.0,114.0,114.0,114.0,114.0,114.0,114.0,114.0,114.0,114.0
mean,54407380000.0,16658.823701,35.605263,35.412281,40.419335,5477.762207,2.246425,2.244831,1.541706,1.92071
std,31726770000.0,19284.02638,84.515341,83.795348,17.187745,7431.335945,0.940527,0.939643,1.26438,1.24808
min,11149060000.0,28.891633,1.0,1.0,18.210891,0.0,0.836735,0.836735,0.0,0.820279
25%,21731510000.0,763.176082,2.0,2.0,28.992765,158.578334,1.673469,1.673469,0.914321,1.123448
50%,49424970000.0,10875.642509,5.0,5.0,32.692032,1620.766619,2.196464,2.196464,1.187441,1.499756
75%,84917120000.0,25253.562493,10.75,10.75,48.39526,8923.734614,2.776873,2.767263,2.381298,2.288433
max,99989040000.0,105193.885789,535.0,528.0,94.1347,42347.488441,5.855323,5.855323,7.45396,9.63713


In [23]:
# Export
merchant_fraud_df.to_csv('../data/curated/fraud/input/merchant_fraud_model_building_data.csv', index = False)

consumer_fraud_df.to_csv('../data/curated/fraud/input/consumer_fraud_model_building_data.csv', index = False)

# Aggregate data of all consumers for running fraud detection on (only take consumer data because failed to build model for merchants)

In [24]:
# get all consumer data which will be used in 
full_consumer_daily_df = full_consumer_daily.toPandas()

                                                                                

In [25]:
# create ratio variables after merging fraud set with all user's average historical activity 
full_consumer_fraud_df = full_consumer_daily_df.merge(full_consumer_daily_agg_df, on = 'user_id', how = 'left')

full_consumer_fraud_df['#daily_orders_ratio'] = full_consumer_fraud_df['count']/full_consumer_fraud_df['mean_#daily_orders']
full_consumer_fraud_df['#distinct_merchants_ratio'] = full_consumer_fraud_df['distinct_merchants']/full_consumer_fraud_df['mean_#distinct_merchants']
full_consumer_fraud_df['transact_amount_perOrder_sd_ratio'] = full_consumer_fraud_df['stdev transact']/full_consumer_fraud_df['mean_transact_amount_perOrder_sd']
full_consumer_fraud_df['transact_amount_perOrder_ratio'] = full_consumer_fraud_df['mean transact']/full_consumer_fraud_df['mean_transact_amount_perOrder']

full_consumer_fraud_df = full_consumer_fraud_df.drop(['mean_#daily_orders', 'mean_#distinct_merchants', 'mean_transact_amount_perOrder_sd', 'mean_transact_amount_perOrder'], axis=1)

full_consumer_fraud_df = full_consumer_fraud_df.fillna(0)

full_consumer_fraud_df = full_consumer_fraud_df.rename(columns={'distinct_merchants': 'mean_#distinct_merchants', 'count': 'mean_#daily_orders',\
    'mean transact': 'mean_transact_amount_perOrder', 'stdev transact': 'mean_transact_amount_perOrder_sd'})

Unnamed: 0,user_id,order_datetime,mean_#distinct_merchants,mean_#daily_orders,mean_transact_amount_perOrder,mean_transact_amount_perOrder_sd,#daily_orders_ratio,#distinct_merchants_ratio,transact_amount_perOrder_sd_ratio,transact_amount_perOrder_ratio
0,13898,2021-08-21,2,2,48.024644,33.236330,1.293532,1.295681,0.267929,0.360271
1,16746,2021-08-19,1,1,65.185294,0.000000,0.600649,0.600649,0.000000,0.324533
2,4455,2021-08-14,3,3,211.379209,300.708717,1.971284,1.971284,1.509501,1.261312
3,7195,2021-07-15,1,1,42.044419,0.000000,0.629005,0.630068,0.000000,0.203176
4,13205,2021-08-16,2,2,29.326597,32.856032,1.280000,1.282230,0.197515,0.184522
...,...,...,...,...,...,...,...,...,...,...
8976952,12433,2022-04-06,1,1,36.923377,0.000000,0.648276,0.650519,0.000000,0.209799
8976953,4471,2022-03-04,1,1,11.379185,0.000000,0.617124,0.619125,0.000000,0.062164
8976954,17824,2022-03-30,1,1,4.038201,0.000000,0.638079,0.639175,0.000000,0.027947
8976955,12031,2022-03-10,1,1,23.498661,0.000000,0.619454,0.622642,0.000000,0.146306


In [28]:
full_consumer_fraud_df.to_csv('../data/curated/fraud/input/full_consumer_behaviour_data_for_fraud_prediction.csv')

22/09/18 20:26:41 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 453057 ms exceeds timeout 120000 ms
22/09/18 20:26:41 WARN SparkContext: Killing executors is not supported by current scheduler.
22/09/19 10:41:09 WARN TransportChannelHandler: Exception in connection from /10.13.64.116:61067
java.io.IOException: Operation timed out
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
	at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:258)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
	at io.netty.channel.