# Join customer/merchant/transaction data

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
import pandas as pd
import requests

In [2]:
# Create a spark session
spark = (
    SparkSession.builder.appName("Data Joining")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.driver.memory", "9g") 
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.network.timeout", "600s")
    .getOrCreate()
)

24/10/05 04:34:03 WARN Utils: Your hostname, codespaces-c6855a resolves to a loopback address: 127.0.0.1; using 10.0.0.128 instead (on interface eth0)
24/10/05 04:34:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/05 04:34:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/10/05 04:34:04 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 49590)
Traceback (most recent call last):
  File "/usr/local/python/3.12.1/lib/python3.12/socketserver.py", line 318, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/local/python/3.12.1/lib/python3.12/socketserver.py", line 349, in process_request
    self.finish_request(request, client_address)
  File "/usr/local/python/3.12.1/lib/python3.12/socketserver.py", line 362, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/local/python/3.12.1/lib/python3.12/socketserver.py", line 761, in __init__
    self.handle()
  File "/usr/local/python/3.12.1/lib/python3.12/site-packages/pyspark/accumulators.py", line 295, in handle
    poll(accum_updates)
  File "/usr/local/python/3.12.1/lib/python3.12/site-packages/pyspark/accumulators.py", line 267, in poll
    if self.rfile in r and func():
                    

In [3]:
# Load in datasets
# Load in merchant data (parquet)
merchant = spark.read.parquet("../data/curated/part_1/clean_merchant.parquet")

# Load in merchant fraud (csv)
merchant_fp = pd.read_csv("../data/tables/part_1/merchant_fraud_probability.csv")
merchant_fp = spark.createDataFrame(merchant_fp)

# Load in consumer list (csv)
consumer_cid = pd.read_csv("../data/tables/part_1/tbl_consumer.csv", delimiter="|")
consumer_cid = spark.createDataFrame(consumer_cid)

# Load in consumer fraud (csv)
consumer_fp = pd.read_csv("../data/tables/part_1/consumer_fraud_probability.csv")
consumer_fp = spark.createDataFrame(consumer_fp)

consumer_ud = spark.read.parquet("../data/tables/part_1/consumer_user_details.parquet")

24/10/05 04:34:21 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


**Join customer data**

In [4]:
# Joining user id to customers
consumer_cid = consumer_cid.withColumn("postcode", consumer_cid.postcode.cast('string')) # cast postcode to string
consumer = consumer_cid.join(consumer_ud, on = "consumer_id", how = 'left')
consumer_list = consumer.select("user_id", "postcode")
consumer.show(5)

**Join customers and transaction data**

In [5]:
# Read transaction dataset
transaction1 = spark.read.parquet("../data/tables/part_2")
transaction2 = spark.read.parquet("../data/tables/part_3")
transaction3 = spark.read.parquet("../data/tables/part_4")

transaction = transaction1.union(transaction2).union(transaction3)

                                                                                

In [6]:
# Join customers to transactions
transaction_consumer = transaction.join(consumer_list, on='user_id', how='left')
transaction_consumer.limit(3)

In [None]:
consumer_no_transaction = consumer_list.join(transaction, on='user_id', how='left_anti')
print(f"Number of consumers that have not made a transaction: {consumer_no_transaction.count():,}")

# Joining customer transaction to merchant 

In [7]:
# Correct for any multiple entries in consumer_fp
consumer_fp = consumer_fp.groupBy('order_datetime', 'user_id').agg(F.max('fraud_probability').alias('fraud_probability'))

# Add consumer fraud to transactions
final_df = transaction_consumer.join(consumer_fp, on=['order_datetime', 'user_id'], how='left').withColumnRenamed('fraud_probability', 'consumer_fraud')
no_fraud = final_df.filter(F.col("consumer_fraud").isNull()).count()
print(f"Number of transactions with no consumer fraud: {no_fraud:,}")

In [8]:
# Correct for any multiple entries in merchant_fp
merchant_fp = merchant_fp.groupBy('order_datetime', 'merchant_abn').agg(F.max('fraud_probability').alias('fraud_probability'))

# Add merchant fraud to transactions by merchant and date
final_df = final_df.join(merchant_fp, on=['merchant_abn','order_datetime'], how = 'left').withColumnRenamed('fraud_probability', 'merchant_fraud')
no_fraud = final_df.filter(F.col("merchant_fraud").isNull()).count()
print(f"Number of transactions with no merchant fraud: {no_fraud:,}")

In [9]:
# Impute all null fraud probabilities as 0
final_df = final_df.fillna(0, subset=['merchant_fraud', 'consumer_fraud'])
no_fraud = final_df.filter((final_df["consumer_fraud"]==0) & (final_df["merchant_fraud"]==0)).count()
print(f"Number of transactions with no merchant fraud or consumer fraud: {no_fraud:,}")

In [10]:
# Save transaction-merchant-consumer data to file
final_df.write.mode('overwrite').parquet('../data/curated/fraud_watch/')

24/10/05 04:35:43 WARN TaskSetManager: Stage 12 contains a task of very large size (6923 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

# Join external datasets
Here, we estimate weekly disposible income based on the difference between total_personal_income and the average spent on rent or morgage repayments per week. The calculation uses weekly variables as follows.
$$\text{weekly disposible income} = \text{total personal income} - (\text{median rent} \times \text{proportion of renters}) - (\text{median morgage repayment} \times \text{proportion of mortgage holders})$$

In [11]:
# Download housing data
file_path = '../data/tables/sa2_dataset/main/C21_G37_SA2.csv'
url = "https://api.data.abs.gov.au/data/C21_G37_SA2/1+2+R_T+_T...SA2..2021.?detail=full"
headers = {'accept': 'text/csv'}

# Request data
response = requests.get(url, headers=headers, stream=True)

with open(file_path, 'wb') as file:
    for chunk in response.iter_content(chunk_size=8192):
        if chunk:
            file.write(chunk)

# Read in data and cast types
data = pd.read_csv(file_path, dtype={"REGION": object})

variables = {"R_T":'renting',
             "2":'owned_mortgage',
             "1": 'owned_outright',
             "_T":'total_responses',
             "REGION": 'sa2_code'}

# Aggregate to ignore the 'dwelling type' feature
tenure_data = data.groupby(['REGION', 'TENLLD']).agg('sum').reset_index()[['REGION', 'TENLLD', 'OBS_VALUE']]
tenure_data = tenure_data.pivot(index='REGION', columns='TENLLD', values='OBS_VALUE').reset_index()
tenure_data = tenure_data.rename(variables, axis=1)
tenure_data.head(5)

TENLLD,sa2_code,owned_outright,owned_mortgage,renting,total_responses
0,101021007,1707,1092,497,3438
1,101021008,1789,2469,1756,6150
2,101021009,2220,3098,4238,9842
3,101021010,1040,1426,1973,4549
4,101021012,2455,4055,1972,8608


In [12]:
# Apply calculations
tenure_data['percent_mortgage'] = tenure_data['owned_mortgage'] / tenure_data['total_responses']
tenure_data['percent_rent'] = tenure_data['renting'] / tenure_data['total_responses']

# Investigate number of records with missing data
zero_responses = tenure_data[tenure_data.isna().any(axis=1)]
print('Number of SA2 zones with no data from ABS: ', len(zero_responses))
zero_responses.head(5)

# Handle missing null values by setting to zero
percentage_tenure = tenure_data.fillna(0, axis=1).iloc[:,[0, -1, -2]]

Number of SA2 zones with no data from ABS:  67


In [13]:
# Read in dataset with median statistics
variables = {1: "median_age", 
             2: "median_total_personal_income",
             3: "median_total_family_income",
             4: "median_total_household_income",
             5: "median_mortgage_repayment",
             6: "median_rent",
             7: "avg_people_per_bedroom",
             8: "avg_household_size"}

# Read in data
medians = pd.read_csv("../data/curated/sa2_dataset/C21_G02_SA2_clean.csv")

# Restructure table
medians = medians.pivot(index='sa2_code', columns=['type_of_value_code'], values='obs_value').reset_index().rename(columns=variables)
medians.columns.name = None
medians['sa2_code'] = medians.sa2_code.astype(str)
medians.head(3)

Unnamed: 0,sa2_code,median_age,median_total_personal_income,median_total_family_income,median_total_household_income,median_mortgage_repayment,median_rent,avg_people_per_bedroom,avg_household_size
0,101021007,51.0,760.0,1886.0,1429.0,1732.0,330.0,0.8,2.2
1,101021008,38.0,975.0,2334.0,1989.0,1950.0,350.0,0.8,2.6
2,101021009,37.0,996.0,2233.0,1703.0,1700.0,330.0,0.9,2.1


Before joining housing data to this table of medians, we investigate the records and notice that there are records that have null median summaries.

In [14]:
# Read in list of SA2 codes and associated names
col_types = {"POSTCODE": str, "SA2_CODE_2021":str, "RATIO_FROM_TO": float}
sa2_names = pd.read_excel("../data/tables/correspondence/CG_POSTCODE_2021_SA2_2021.xlsx", converters=col_types)[['SA2_CODE_2021', 'SA2_NAME_2021', 'POSTCODE']]

# Find records with null columns
null_regions = medians[medians.isna().any(axis=1)]
null_regions = null_regions.merge(sa2_names, left_on='sa2_code', right_on='SA2_CODE_2021')

# Show the name of regions associated with null values 
null_regions.iloc[:,-2:]

Unnamed: 0,SA2_NAME_2021,POSTCODE
0,No usual address (NSW),
1,No usual address (Vic.),
2,No usual address (Qld),
3,No usual address (SA),
4,No usual address (WA),
5,No usual address (Tas.),
6,No usual address (NT),
7,No usual address (ACT),
8,No usual address (OT),


We see that the external dataset is only missing values for SA2 regions marked as having "no usual address", and this is ok since they are special purpose codes that don't correspond with a SA2 zone.

In [15]:
WEEKS_IN_MONTH = 4.345

# Add engineered housing features to ABS demographic data
abs_df = medians.merge(percentage_tenure, on='sa2_code')

# Calculate average weekly spending on housing per SA2 zone
# note: monthly mortgage repayment converted to weekly by dividing by # weeks in a month
abs_df['avg_housing_weekly'] = abs_df.median_rent*abs_df.percent_rent + abs_df.median_mortgage_repayment*(abs_df.percent_mortgage/WEEKS_IN_MONTH)
abs_df['weekly_personal_disposable'] = abs_df.median_total_personal_income - abs_df.avg_housing_weekly

# Associate postcodes to SA2 data using "correspondence.parquet"
sa2_names = pd.read_parquet('../data/curated/correspondence.parquet')
abs_df = sa2_names.merge(abs_df, on='sa2_code', how='left')

# Remove unecessary columns
abs_df = abs_df.drop(['median_mortgage_repayment', 'median_rent', 'percent_rent', 
                      'percent_mortgage', 'avg_housing_weekly', 
                      'median_total_personal_income', 'avg_people_per_bedroom'], axis=1)
abs_df.head(4)

Unnamed: 0,postcode,sa2_code,sa2_name,median_age,median_total_family_income,median_total_household_income,avg_household_size,weekly_personal_disposable
0,800,701011002,Darwin City,33.0,2403.0,2151.0,2.0,836.772739
1,810,701021021,Lyons (NT),30.0,2981.0,2965.0,3.3,980.881015
2,812,701021019,Karama,35.0,2021.0,1783.0,2.9,547.723664
3,820,701011008,Stuart Park,34.0,2682.0,2278.0,2.3,890.395423


In [16]:
# Save to file
abs_df.to_parquet('../data/curated/sa2_dataset/abs_medians.parquet')

# Join ABS and customer data

In [17]:
# Read in abs demographic data and customer/transaction data
abs_df = spark.read.parquet("../data/curated/sa2_dataset/abs_medians.parquet")

try:
    final_df = spark.read.parquet("../data/curated/fraud_watch")
except:
    final_df = final_df

# Join ABS data to transactions records
customer_details_abs = final_df.join(abs_df, on='postcode', how='left')

# Check NULL values for transaction data
customer_details_abs.summary('count').toPandas().to_dict(orient='records')[0]

                                                                                

{'summary': 'count',
 'postcode': '14195505',
 'merchant_abn': '14195505',
 'user_id': '14195505',
 'dollar_value': '14195505',
 'order_id': '14195505',
 'consumer_fraud': '14195505',
 'merchant_fraud': '14195505',
 'sa2_code': '11715485',
 'sa2_name': '11715485',
 'median_age': '11715485',
 'median_total_family_income': '11715485',
 'median_total_household_income': '11715485',
 'avg_household_size': '11715485',
 'weekly_personal_disposable': '11715485'}

We see that there are some customer records that are to locations that have no population data. Many of these postcodes are for PO boxes and LVR addresses. For this, we create another field `is_po_box` to help filter out transactions to users delivering to special postcodes.

The ranges for LVRs and PO Boxes were sourced from [Matthew Procter's Australian Postcodes website](https://www.matthewproctor.com/australian_postcodes#downloadlinks).

In [18]:
# Deine PO/LVR range
po_box_codes = [("1000", "1999"), ("0200", "0299"), ("8000", "8999"),
                ("9000", "9999"), ("5800", "5999"), ("6800", "6999"),
                ("7800", "7999"), ("0900", "0999")]

# Determine if postcode is in a PO box/LVR range
po_box_condition = F.lit(False)
for lower, upper in po_box_codes:
    po_box_condition = po_box_condition | ((F.col("postcode") >= lower) & (F.col("postcode") <= upper))

# Apply condition
customer_details_abs = customer_details_abs.withColumn('is_po_box', po_box_condition)

# Confirm that transactions with no associated ABS data delivers to a PO box
customer_details_abs.filter("sa2_code IS NOT NULL AND is_po_box IS NULL").count()

0

In [19]:
# Specify column order to see features better
col_order = ["order_id", "user_id","merchant_abn", "order_datetime", "dollar_value", 
              "postcode", "merchant_fraud", "consumer_fraud","weekly_personal_disposable",
              "median_total_household_income", "median_total_family_income"]

# Save changes to "all_details"
customer_details_abs.select(*col_order, *(set(customer_details_abs.columns) ^ set(col_order)))\
    .write.mode('overwrite').parquet('../data/curated/all_details/')

                                                                                

In [20]:
temp = spark.read.parquet('../data/curated/all_details/')

# view sample of a random user's transaction history
temp.filter(F.col('user_id')==22957).orderBy('dollar_value', ascending=False).limit(5)

order_id,user_id,merchant_abn,order_datetime,dollar_value,postcode,merchant_fraud,consumer_fraud,weekly_personal_disposable,median_total_household_income,median_total_family_income,avg_household_size,sa2_code,sa2_name,is_po_box,median_age
45030f06-09fc-4f3...,22957,89109402284,2022-02-12,43415.16340467725,3178,29.07408314204997,82.79065699075498,559.9299353115772,2083.0,2305.0,2.8,211011256,Rowville - Central,False,41.0
57ed6f9a-1b54-466...,22957,75454398468,2022-03-01,3052.23803054996,3178,0.0,0.0,559.9299353115772,2083.0,2305.0,2.8,211011256,Rowville - Central,False,41.0
aea838f0-5f39-487...,22957,24015576448,2022-06-08,2976.0698327959976,3178,0.0,0.0,559.9299353115772,2083.0,2305.0,2.8,211011256,Rowville - Central,False,41.0
4616274a-a016-43e...,22957,11848576000,2022-10-11,2832.197778546993,3178,0.0,0.0,559.9299353115772,2083.0,2305.0,2.8,211011256,Rowville - Central,False,41.0
74038823-d581-4e6...,22957,35906534450,2022-04-16,2135.9475370548244,3178,0.0,0.0,559.9299353115772,2083.0,2305.0,2.8,211011256,Rowville - Central,False,41.0
