# Project 2 - Group 13

## 0 - Begin Spark

In [1]:
from pyspark.sql import SparkSession

# Create a spark session (which will run spark jobs)
spark = (
    SparkSession.builder.appName("MAST30034 Project 2")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    #.config("spark.executor.memory", "2g")
    #.config("spark.driver.memory", "2g")
    .getOrCreate()
)

24/09/03 00:39:49 WARN Utils: Your hostname, MacBook.local resolves to a loopback address: 127.0.0.1; using 192.168.0.12 instead (on interface en0)
24/09/03 00:39:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/03 00:39:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## 1 - Data cleaning

### 1.1 - Consumer fraud probability

In [2]:
consumer_fraud_probability_df = spark.read.csv('../data/tables/tables/consumer_fraud_probability.csv', header = True, inferSchema = True)
consumer_fraud_probability_df = consumer_fraud_probability_df.withColumnRenamed('order_datetime', 'consumer_fraud_order_datetime')
consumer_fraud_probability_df = consumer_fraud_probability_df.withColumnRenamed('fraud_probability', 'consumer_fraud_probability')
print(f'Number of entires = {consumer_fraud_probability_df.count()}')
consumer_fraud_probability_df.limit(5)

                                                                                

Number of entires = 34864


24/09/03 00:40:06 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


user_id,consumer_fraud_order_datetime,consumer_fraud_probability
6228,2021-12-19,97.6298077657765
21419,2021-12-10,99.24738020302328
5606,2021-10-17,84.05825045251777
3101,2021-04-17,91.42192091901347
22239,2021-10-19,94.70342477508036


In [3]:
consumer_fraud_probability_df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- consumer_fraud_order_datetime: date (nullable = true)
 |-- consumer_fraud_probability: double (nullable = true)



### 1.2 - Consumer user details

In [4]:
consumer_user_details_df = spark.read.parquet('../data/tables/tables/consumer_user_details.parquet', inferSchema = True)
print(f'Number of entires = {consumer_user_details_df.count()}')
consumer_user_details_df.limit(5)

                                                                                

Number of entires = 499999


user_id,consumer_id
1,1195503
2,179208
3,1194530
4,154128
5,712975


In [5]:
consumer_user_details_df.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- consumer_id: long (nullable = true)



### 1.3 - Merchant fraud probability

In [6]:
merchant_fraud_probability_df = spark.read.csv('../data/tables/tables/merchant_fraud_probability.csv', header = True, inferSchema = True)
merchant_fraud_probability_df = merchant_fraud_probability_df.withColumnRenamed('order_datetime', 'merchant_fraud_order_datetime')
merchant_fraud_probability_df = merchant_fraud_probability_df.withColumnRenamed('fraud_probability', 'merchant_fraud_probability')
print(f'Number of entires = {merchant_fraud_probability_df.count()}')
merchant_fraud_probability_df.limit(5)

Number of entires = 114


merchant_abn,merchant_fraud_order_datetime,merchant_fraud_probability
19492220327,2021-11-28,44.40365864749536
31334588839,2021-10-02,42.75530083865367
19492220327,2021-12-22,38.867790051131095
82999039227,2021-12-19,94.1347004808891
90918180829,2021-09-02,43.32551731714902


In [7]:
merchant_fraud_probability_df.printSchema()

root
 |-- merchant_abn: long (nullable = true)
 |-- merchant_fraud_order_datetime: date (nullable = true)
 |-- merchant_fraud_probability: double (nullable = true)



### 1.4 - TBL Consumer

In [8]:
tbl_consumer_df = spark.read.option("delimiter", '|').csv('../data/tables/tables/tbl_consumer.csv', header = True, inferSchema = True)
print(f'Number of entires = {tbl_consumer_df.count()}')
tbl_consumer_df.limit(5)

                                                                                

Number of entires = 499999


name,address,state,postcode,gender,consumer_id
Yolanda Williams,413 Haney Gardens...,WA,6935,Female,1195503
Mary Smith,3764 Amber Oval,NSW,2782,Female,179208
Jill Jones MD,40693 Henry Greens,NT,862,Female,1194530
Lindsay Jimenez,00653 Davenport C...,NSW,2780,Female,154128
Rebecca Blanchard,9271 Michael Mano...,WA,6355,Female,712975


In [9]:
tbl_consumer_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postcode: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- consumer_id: integer (nullable = true)



### 1.5 - TBL merchants

In [10]:
tbl_merchants_df = spark.read.parquet('../data/tables/tables/tbl_merchants.parquet', inferSchema = True)
tbl_merchants_df = tbl_merchants_df.withColumnRenamed('name', 'merchant_name')
print(f'Number of entires = {tbl_merchants_df.count()}')
tbl_merchants_df.limit(5)

Number of entires = 4026


merchant_name,tags,merchant_abn
Felis Limited,"((furniture, home...",10023283211
Arcu Ac Orci Corp...,"([cable, satellit...",10142254217
Nunc Sed Company,"([jewelry, watch,...",10165489824
Ultricies Digniss...,"([wAtch, clock, a...",10187291046
Enim Condimentum PC,([music shops - m...,10192359162


In [11]:
tbl_merchants_df.printSchema()

root
 |-- merchant_name: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- merchant_abn: long (nullable = true)



### 1.6 - Transactions

#### 1.6.1 - Tables 2

In [12]:
tables_2_df = spark.read.parquet('../data/tables/tables 2/transactions_20210228_20210827_snapshot', inferSchema = True)
print(f'Number of entires = {tables_2_df.count()}')
tables_2_df.limit(5)

                                                                                

Number of entires = 3643266


user_id,merchant_abn,dollar_value,order_id,order_datetime
18478,62191208634,63.255848959735246,949a63c8-29f7-4ab...,2021-08-20
2,15549624934,130.3505283105634,6a84c3cf-612a-457...,2021-08-20
18479,64403598239,120.15860593212784,b10dcc33-e53f-425...,2021-08-20
3,60956456424,136.6785200286976,0f09c5a5-784e-447...,2021-08-20
18479,94493496784,72.96316578355305,f6c78c1a-4600-4c5...,2021-08-20


In [13]:
tables_3_df = spark.read.parquet('../data/tables/tables 2/transactions_20220228_20220828_snapshot', inferSchema = True)
print(f'Number of entires = {tables_3_df.count()}')
tables_3_df.limit(5)

                                                                                

Number of entires = 6044133


user_id,merchant_abn,dollar_value,order_id,order_datetime
11139,96152467973,16.213590228273233,785b0080-9e4b-471...,2022-08-20
1,98973094975,86.97955945703498,2560f7b0-ee5d-4b3...,2022-08-20
11139,56762458844,31.513502323509197,0311717b-8b5b-410...,2022-08-20
1,89502033586,124.18468694868491,f8891626-f098-45b...,2022-08-20
11139,96161808980,61.620445567668966,d90a421f-f1da-4bf...,2022-08-20


In [14]:
tables_4_df = spark.read.parquet('../data/tables/tables 2/transactions_20210828_20220227_snapshot', inferSchema = True)
print(f'Number of entires = {tables_4_df.count()}')
tables_4_df.limit(5)

                                                                                

Number of entires = 4508106


user_id,merchant_abn,dollar_value,order_id,order_datetime
14935,79417999332,136.06570809815838,23acbb7b-cf98-458...,2021-11-26
1,46451548968,72.61581642788431,76bab304-fa2d-400...,2021-11-26
14936,89518629617,3.0783487174439297,a2ae446a-2959-41c...,2021-11-26
1,49167531725,51.58228625503599,7080c274-17f7-4cc...,2021-11-26
14936,31101120643,25.2281149424178,8e301c0f-06ab-45c...,2021-11-26


In [47]:
transactions_df = tables_2_df.union(tables_3_df).union(tables_4_df)
print(f'Number of entires = {transactions_df.count()}')
transactions_df.limit(20)

                                                                                

Number of entires = 14195505


user_id,merchant_abn,dollar_value,order_id,order_datetime
18478,62191208634,63.255848959735246,949a63c8-29f7-4ab...,2021-08-20
2,15549624934,130.3505283105634,6a84c3cf-612a-457...,2021-08-20
18479,64403598239,120.15860593212784,b10dcc33-e53f-425...,2021-08-20
3,60956456424,136.6785200286976,0f09c5a5-784e-447...,2021-08-20
18479,94493496784,72.96316578355305,f6c78c1a-4600-4c5...,2021-08-20
3,76819856970,448.529684285612,5ace6a24-cdf0-4aa...,2021-08-20
18479,67609108741,86.4040605836911,d0e180f0-cb06-42a...,2021-08-20
3,34096466752,301.5793450525113,6fb1ff48-24bb-4f9...,2021-08-20
18482,70501974849,68.75486276223054,8505fb33-b69a-412...,2021-08-20
4,49891706470,48.89796461900801,ed11e477-b09f-4ae...,2021-08-20


In [16]:
transactions_df.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- merchant_abn: long (nullable = true)
 |-- dollar_value: double (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_datetime: date (nullable = true)



### 1.7 - Joining Datasets

In [23]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Join tbl_merchant based on ABN
transactions_df = transactions_df.join(F.broadcast(tbl_merchants_df),
                                       transactions_df.merchant_abn == tbl_merchants_df.merchant_abn).drop(tbl_merchants_df.merchant_abn)

# Join fraud probability based on ABN and date time to ensure correct fraud probability
joined_df = transactions_df.join(F.broadcast(
    merchant_fraud_probability_df),
    (transactions_df['merchant_abn'] == merchant_fraud_probability_df['merchant_abn']) &
    (transactions_df['order_datetime'] >= merchant_fraud_probability_df['merchant_fraud_order_datetime']),
    how='inner'
).drop(merchant_fraud_probability_df.merchant_abn)

# Use Window function to rank based on merchant_date, keeping the most recent one per order
window_spec = Window.partitionBy("user_id", "order_datetime", "merchant_abn").orderBy(F.col("merchant_fraud_order_datetime").desc())

ranked_df = joined_df.withColumn("rank", F.row_number().over(window_spec))

# Filter to keep only the most recent valid merchant_date
transactions_df = ranked_df.filter(F.col("rank") == 1).drop("rank")

# Join fraud probability based on user id and datetime
joined_df = transactions_df.join(F.broadcast(
    consumer_fraud_probability_df),
    (transactions_df['user_id'] == consumer_fraud_probability_df['user_id']) &
    (transactions_df['order_datetime'] >= consumer_fraud_probability_df['consumer_fraud_order_datetime']),
    how='inner'
).drop(consumer_fraud_probability_df.user_id)

# Use Window function to rank based on merchant_date, keeping the most recent one per order
window_spec = Window.partitionBy("user_id", "order_datetime").orderBy(F.col("consumer_fraud_order_datetime").desc())

ranked_df = joined_df.withColumn("rank", F.row_number().over(window_spec))

# Filter to keep only the most recent valid merchant_date
transactions_df = ranked_df.filter(F.col("rank") == 1).drop("rank")

# Join Consumer Details based on user id
transactions_df = transactions_df.join(F.broadcast(consumer_user_details_df),
                                       transactions_df.user_id == consumer_user_details_df.user_id).drop(consumer_user_details_df.user_id)

# Join Consumer Details based on user id
transactions_df = transactions_df.join(F.broadcast(tbl_consumer_df),
                                       transactions_df.consumer_id == tbl_consumer_df.consumer_id).drop(tbl_consumer_df.consumer_id)

print(f'Number of entires = {transactions_df.count()}')
transactions_df.limit(5)

                                                                                

Number of entires = 271038


                                                                                

user_id,merchant_abn,dollar_value,order_id,order_datetime,merchant_name,tags,merchant_fraud_order_datetime,merchant_fraud_probability,consumer_fraud_order_datetime,consumer_fraud_probability,consumer_id,name,address,state,postcode,gender
1,90568944804,811.416129781905,a43c190e-2cce-431...,2022-02-24,Diam Eu Dolor LLC,[(tent and awNing...,2021-11-29,33.172494688960434,2022-02-20,9.80543113652096,1195503,Yolanda Williams,413 Haney Gardens...,WA,6935,Female
1,94493496784,99.74630947804766,bd6a0f3b-3eb7-494...,2022-05-09,Dictum Phasellus ...,"[(gift, card, nov...",2021-11-26,30.57903215900633,2022-02-20,9.80543113652096,1195503,Yolanda Williams,413 Haney Gardens...,WA,6935,Female
1,21439773999,16.037987400475743,2bd9d9c7-8628-45e...,2022-07-12,Mauris Non Institute,"([cable, satellit...",2021-11-26,28.504479048104585,2022-02-20,9.80543113652096,1195503,Yolanda Williams,413 Haney Gardens...,WA,6935,Female
1,94493496784,173.32599490591667,1d0805b4-d4ca-451...,2022-10-15,Dictum Phasellus ...,"[(gift, card, nov...",2021-11-26,30.57903215900633,2022-02-20,9.80543113652096,1195503,Yolanda Williams,413 Haney Gardens...,WA,6935,Female
3,27093785141,162.0698770696245,8db1d586-8111-4b5...,2022-05-21,Placerat Orci Ins...,"[[stationery, off...",2021-11-29,29.520113582407653,2021-11-03,8.300636455314633,1194530,Jill Jones MD,40693 Henry Greens,NT,862,Female


### 1.7 - (RETRY) Joining Datasets

In [42]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

```
DATASET JOINING:
Define datasets as:
    1. consumer_fraud_probability_df
       [user_id, consumer_fraud_order_datetime, consumer_fraud_probability]
    2. consumer_user_details_df
       [user_id, consumer_id]
    3. merchant_fraud_probability_df
       [merchant_abn, merchant_fraud_order_datetime, merchant_fraud_probability]
    4. tbl_consumer_df
       [name, address, state, postcode, gender, consumer_id]
    5. tbl_merchant
       [merchant_name, tags, merchant_abn]
    6. transactions_df
       [user_id, merchant_abn, dollar_value, order_id, order_datetime]

Joining steps:
2 + 4 ON consumer_id -> [A]
6 + 5 ON merchant_abn -> [B]
[A] + [B] ON user_id -> [C]

For fraud dfs:
IF: consumer OR merchant has fraud on same day, tag on dataset 1 or 2
ELSE: leave fields merchant_fraud_datetime, merchant_fraud_probability, consumer… as NULL

NOTES:
Customers' and Merchants' (although doesn't appear to occur for merchants) fraud orders, will only be counted for orders on the same day.
Therefore, customers and merchants may only be 'flagged' as fraud ONCE from the merged transactions.
(Good or bad depending on how we decide our metric)
```

#### 1.7.1 - Consumer User Details + TBL Consumer

In [44]:
# merge consumer_user_details and tbl_consumer ON consumer_id
merge_df_A = consumer_user_details_df.join(F.broadcast(tbl_consumer_df),
                                           consumer_user_details_df.consumer_id == tbl_consumer_df.consumer_id,
                                           how = 'inner'
                                           ).drop(tbl_consumer_df.consumer_id)

print(f'Number of entires = {merge_df_A.count()}')
merge_df_A.limit(5)

                                                                                

Number of entires = 499999


                                                                                

user_id,consumer_id,name,address,state,postcode,gender
1,1195503,Yolanda Williams,413 Haney Gardens...,WA,6935,Female
2,179208,Mary Smith,3764 Amber Oval,NSW,2782,Female
3,1194530,Jill Jones MD,40693 Henry Greens,NT,862,Female
4,154128,Lindsay Jimenez,00653 Davenport C...,NSW,2780,Female
5,712975,Rebecca Blanchard,9271 Michael Mano...,WA,6355,Female


#### 1.7.2 - Transactions + TBL Merchants

MAY need to consider LEFT JOIN for orders unassociated with a merchant_abn we know.

In [48]:
# merge transactions and tbl_merchants ON merchant_abn
merge_df_B = transactions_df.join(F.broadcast(tbl_merchants_df),
                                  transactions_df.merchant_abn == tbl_merchants_df.merchant_abn,
                                  how = 'inner'
                                  ).drop(tbl_merchants_df.merchant_abn)

print(f'Number of entires = {merge_df_B.count()}')
merge_df_B.limit(5)

                                                                                

Number of entires = 13614675


user_id,merchant_abn,dollar_value,order_id,order_datetime,merchant_name,tags
18478,62191208634,63.255848959735246,949a63c8-29f7-4ab...,2021-08-20,Cursus Non Egesta...,"[(furniture, home..."
2,15549624934,130.3505283105634,6a84c3cf-612a-457...,2021-08-20,Commodo Associates,"[(opticians, optI..."
18479,64403598239,120.15860593212784,b10dcc33-e53f-425...,2021-08-20,Lobortis Ultrices...,((music shops - m...
3,60956456424,136.6785200286976,0f09c5a5-784e-447...,2021-08-20,Ultricies Digniss...,"([gift, card, Nov..."
18479,94493496784,72.96316578355305,f6c78c1a-4600-4c5...,2021-08-20,Dictum Phasellus ...,"[(gift, card, nov..."


#### 1.7.3 - mergeA + mergeB

In [50]:
# merge mergeA and mergeB ON user_id
merge_df_C = merge_df_B.join(F.broadcast(merge_df_A),
                             merge_df_B.user_id == merge_df_A.user_id,
                             how = 'inner'
                             ).drop(merge_df_A.user_id)

print(f'Number of entires = {merge_df_C.count()}')
merge_df_C.limit(5)

                                                                                

Number of entires = 13614675


                                                                                

user_id,merchant_abn,dollar_value,order_id,order_datetime,merchant_name,tags,consumer_id,name,address,state,postcode,gender
18478,62191208634,63.255848959735246,949a63c8-29f7-4ab...,2021-08-20,Cursus Non Egesta...,"[(furniture, home...",651338,James Smith,27393 Wiley Lane ...,TAS,7001,Male
2,15549624934,130.3505283105634,6a84c3cf-612a-457...,2021-08-20,Commodo Associates,"[(opticians, optI...",179208,Mary Smith,3764 Amber Oval,NSW,2782,Female
18479,64403598239,120.15860593212784,b10dcc33-e53f-425...,2021-08-20,Lobortis Ultrices...,((music shops - m...,467663,Sandra Schultz,86171 Coleman Sta...,TAS,7010,Female
3,60956456424,136.6785200286976,0f09c5a5-784e-447...,2021-08-20,Ultricies Digniss...,"([gift, card, Nov...",1194530,Jill Jones MD,40693 Henry Greens,NT,862,Female
18479,94493496784,72.96316578355305,f6c78c1a-4600-4c5...,2021-08-20,Dictum Phasellus ...,"[(gift, card, nov...",467663,Sandra Schultz,86171 Coleman Sta...,TAS,7010,Female


#### 1.7.4 - mergeC + Fraud

Includes frauds on the same day -> larger number of entries than mergeC.

In [55]:
# merge mergeC and consumer_fraud ON user_id and datetime
merge_df_D = merge_df_C.join(F.broadcast(consumer_fraud_probability_df),
                             (merge_df_C.user_id == consumer_fraud_probability_df.user_id) &
                             (merge_df_C.order_datetime == consumer_fraud_probability_df.consumer_fraud_order_datetime),
                             how = 'left'
                             ).drop(consumer_fraud_probability_df.user_id)

print(f'Number of entires = {merge_df_D.count()}')
merge_df_D.limit(5)

                                                                                

Number of entires = 13614854


                                                                                

user_id,merchant_abn,dollar_value,order_id,order_datetime,merchant_name,tags,consumer_id,name,address,state,postcode,gender,consumer_fraud_order_datetime,consumer_fraud_probability
18478,62191208634,63.255848959735246,949a63c8-29f7-4ab...,2021-08-20,Cursus Non Egesta...,"[(furniture, home...",651338,James Smith,27393 Wiley Lane ...,TAS,7001,Male,,
2,15549624934,130.3505283105634,6a84c3cf-612a-457...,2021-08-20,Commodo Associates,"[(opticians, optI...",179208,Mary Smith,3764 Amber Oval,NSW,2782,Female,,
18479,64403598239,120.15860593212784,b10dcc33-e53f-425...,2021-08-20,Lobortis Ultrices...,((music shops - m...,467663,Sandra Schultz,86171 Coleman Sta...,TAS,7010,Female,,
3,60956456424,136.6785200286976,0f09c5a5-784e-447...,2021-08-20,Ultricies Digniss...,"([gift, card, Nov...",1194530,Jill Jones MD,40693 Henry Greens,NT,862,Female,,
18479,94493496784,72.96316578355305,f6c78c1a-4600-4c5...,2021-08-20,Dictum Phasellus ...,"[(gift, card, nov...",467663,Sandra Schultz,86171 Coleman Sta...,TAS,7010,Female,,


In [56]:
# merge mergeD and merchant_fraud ON merchant_abn and datetime
merge_df_E = merge_df_D.join(F.broadcast(merchant_fraud_probability_df),
                             (merge_df_D.merchant_abn == merchant_fraud_probability_df.merchant_abn) &
                             (merge_df_D.order_datetime == merchant_fraud_probability_df.merchant_fraud_order_datetime),
                             how = 'left'
                             ).drop(merchant_fraud_probability_df.merchant_abn)

print(f'Number of entires = {merge_df_E.count()}')
merge_df_E.limit(5)

                                                                                

Number of entires = 13614854


                                                                                

user_id,merchant_abn,dollar_value,order_id,order_datetime,merchant_name,tags,consumer_id,name,address,state,postcode,gender,consumer_fraud_order_datetime,consumer_fraud_probability,merchant_fraud_order_datetime,merchant_fraud_probability
18478,62191208634,63.255848959735246,949a63c8-29f7-4ab...,2021-08-20,Cursus Non Egesta...,"[(furniture, home...",651338,James Smith,27393 Wiley Lane ...,TAS,7001,Male,,,,
2,15549624934,130.3505283105634,6a84c3cf-612a-457...,2021-08-20,Commodo Associates,"[(opticians, optI...",179208,Mary Smith,3764 Amber Oval,NSW,2782,Female,,,,
18479,64403598239,120.15860593212784,b10dcc33-e53f-425...,2021-08-20,Lobortis Ultrices...,((music shops - m...,467663,Sandra Schultz,86171 Coleman Sta...,TAS,7010,Female,,,,
3,60956456424,136.6785200286976,0f09c5a5-784e-447...,2021-08-20,Ultricies Digniss...,"([gift, card, Nov...",1194530,Jill Jones MD,40693 Henry Greens,NT,862,Female,,,,
18479,94493496784,72.96316578355305,f6c78c1a-4600-4c5...,2021-08-20,Dictum Phasellus ...,"[(gift, card, nov...",467663,Sandra Schultz,86171 Coleman Sta...,TAS,7010,Female,,,,


#### 1.7.5 - Saving MERGE

In [57]:
merge_df_E.write.mode("overwrite").parquet("../data/merge_df_E.parquet")

                                                                                