# Pre-Processing

## Given Data

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import * 
from urllib.request import urlretrieve
from owslib.wfs import WebFeatureService
from dotenv import load_dotenv
import pandas as pd
from pyspark.sql.types import *

# Build (or reuse) the Spark session shared by every cell in this notebook
spark_settings = {
    "spark.sql.repl.eagerEval.enabled": True,
    "spark.sql.parquet.cacheMetadata": "true",
    "spark.sql.session.timeZone": "Etc/UTC",
    "spark.driver.memory": "2g",
    "spark.executor.memory": "6g",
}

session_builder = SparkSession.builder.appName("preprocessing")
for setting_name, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)

spark = session_builder.getOrCreate()

22/10/15 15:50:27 WARN Utils: Your hostname, AryansLaptop resolves to a loopback address: 127.0.1.1; using 172.29.56.17 instead (on interface eth0)
22/10/15 15:50:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/15 15:50:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Stack the three transaction snapshots into a single dataframe

RELATIVE_DIR = "../data/"


def _load_snapshot(period):
    """Read the transactions snapshot folder for one date-range suffix."""
    return spark.read.parquet(
        f"{RELATIVE_DIR}tables/transactions_{period}_snapshot/"
    )


transactiondf1 = _load_snapshot("20210228_20210827")
transactiondf2 = _load_snapshot("20210828_20220227")
transactiondf3 = _load_snapshot("20220228_20220828")

# Append the snapshots one after another, then preview a handful of rows
transactiondf12 = transactiondf1.union(transactiondf2)
transactiondf = transactiondf12.union(transactiondf3)
transactiondf.limit(5)

                                                                                

user_id,merchant_abn,dollar_value,order_id,order_datetime
18478,62191208634,63.255848959735246,949a63c8-29f7-4ab...,2021-08-20
2,15549624934,130.3505283105634,6a84c3cf-612a-457...,2021-08-20
18479,64403598239,120.15860593212784,b10dcc33-e53f-425...,2021-08-20
3,60956456424,136.6785200286976,0f09c5a5-784e-447...,2021-08-20
18479,94493496784,72.96316578355305,f6c78c1a-4600-4c5...,2021-08-20


In [3]:
# Persist the combined transaction dataset for further analysis

transactions_out = f'{RELATIVE_DIR}curated/transactiondf.parquet'
transactiondf.write.parquet(transactions_out, mode="overwrite")

22/10/15 15:50:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 96.51% for 14 writers
22/10/15 15:50:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 90.08% for 15 writers
22/10/15 15:50:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 84.45% for 16 writers


[Stage 12:>                                                       (0 + 16) / 48]

22/10/15 15:50:40 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 90.08% for 15 writers
22/10/15 15:50:40 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 96.51% for 14 writers




22/10/15 15:50:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 96.51% for 14 writers
22/10/15 15:50:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 90.08% for 15 writers
22/10/15 15:50:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 84.45% for 16 writers
22/10/15 15:50:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 90.08% for 15 writers
22/10/15 15:50:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 96.51% for 14 writers




22/10/15 15:50:42 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 96.51% for 14 writers
22/10/15 15:50:42 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 90.08% for 15 writers
22/10/15 15:50:42 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 84.45% for 16 writers
22/10/15 15:50:43 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 90.08% for 15 writers
22/10/15 15:50:43 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 96.51% for 14 writers


                                                                                

In [4]:
# Load the user details table (user_id -> consumer_id) and preview it

user_details_path = f"{RELATIVE_DIR}tables/consumer_user_details.parquet"
userdf = spark.read.parquet(user_details_path)
userdf.limit(5)

user_id,consumer_id
1,1195503
2,179208
3,1194530
4,154128
5,712975


In [5]:
# Load the pipe-delimited consumer table, rename its `name` column and preview it

consumerdf = (
    spark.read
    .csv(f"{RELATIVE_DIR}tables/tbl_consumer.csv", sep="|", header=True)
    .withColumnRenamed("name", "customer_name")
)
consumerdf.limit(5)

customer_name,address,state,postcode,gender,consumer_id
Yolanda Williams,413 Haney Gardens...,WA,6935,Female,1195503
Mary Smith,3764 Amber Oval,NSW,2782,Female,179208
Jill Jones MD,40693 Henry Greens,NT,862,Female,1194530
Lindsay Jimenez,00653 Davenport C...,NSW,2780,Female,154128
Rebecca Blanchard,9271 Michael Mano...,WA,6355,Female,712975


In [6]:
# Split the packed `tags` string of the merchant table into three columns:
# the descriptive tags, the revenue band and the take rate.

merchantdf = (
    spark.read.parquet(f"{RELATIVE_DIR}tables/tbl_merchants.parquet")
    .withColumnRenamed("name", "company_name")
)

# Replace all square brackets with round brackets so that both bracket
# styles can be split with the same "),"-separator below
merchantdf = (
    merchantdf
    .withColumn('tags', regexp_replace('tags', '\\[', '\\('))
    .withColumn('tags', regexp_replace('tags', '\\]', '\\)'))
)

# Splitting on ")," yields, in order: tags, revenue band, take rate
tag_parts = split(col("tags"), "\\),")


def _clean_tag_part(part):
    """Remove leftover round brackets and surrounding spaces from one part.

    The split keeps whatever follows each ")," separator, so a part may
    start with a space; trimming stops that space leaking into the values.
    """
    return trim(regexp_replace(part, '[()]', ''))


merchantdf = (
    merchantdf
    # Extract take rate into separate column (drop its "take rate: " label)
    .withColumn(
        "take_rate",
        _clean_tag_part(
            regexp_replace(tag_parts.getItem(2), 'take rate: ', '')
        ),
    )
    # Extract revenue band
    .withColumn("revenue_band", _clean_tag_part(tag_parts.getItem(1)))
    # Extract the tags themselves LAST: this overwrites `tags`, which the two
    # columns above still need in its packed form. Runs of spaces are
    # collapsed and the text is lower-cased.
    .withColumn(
        "tags",
        lower(regexp_replace(_clean_tag_part(tag_parts.getItem(0)), ' +', ' ')),
    )
)

merchantdf.limit(5)

company_name,tags,merchant_abn,take_rate,revenue_band
Felis Limited,"furniture, home f...",10023283211,0.18,e
Arcu Ac Orci Corp...,"cable, satellite,...",10142254217,4.22,b
Nunc Sed Company,"jewelry, watch, c...",10165489824,4.4,b
Ultricies Digniss...,"watch, clock, and...",10187291046,3.29,b
Enim Condimentum PC,music shops - mus...,10192359162,6.33,a


In [7]:
# Persist the parsed merchant dataframe for later use

merchants_out = f'{RELATIVE_DIR}curated/merchantdf.parquet'
merchantdf.write.parquet(merchants_out, mode="overwrite")

In [8]:
# Combines all datasets and checks for loss of rows.
# Every count() is a Spark action that re-executes the joins built so far,
# so each count is computed once and reused in the following print.

# Check no rows dropped when combining transactions with user
transaction_count = transactiondf.count()
print(transaction_count, userdf.count())
mergedf = transactiondf.join(userdf, "user_id")
user_join_count = mergedf.count()
print(user_join_count)
print("\n")

# Check no rows dropped when combining with consumer
print(user_join_count, consumerdf.count())
mergedf = mergedf.join(consumerdf, "consumer_id")
consumer_join_count = mergedf.count()
print(consumer_join_count)
print("\n")

# Check no rows dropped when combining with merchant
print(consumer_join_count, merchantdf.count())
mergedf = mergedf.join(merchantdf, "merchant_abn")
merchant_join_count = mergedf.count()
print(merchant_join_count)

14195505 499999


                                                                                

14195505


14195505 499999
14195505


14195505 4026




13614675


                                                                                

We can see that the number of rows in the dataset has decreased from 14195505 to 13614675, a loss of 580830 transactions. Since this reduction occurred when joining on merchant_abn, it means that a number of transactions refer to a merchant ABN that has no record in merchantdf, so the (inner) join dropped those transactions.

In [9]:
# Persist the merged version of all given datasets for later use

merged_out = f'{RELATIVE_DIR}curated/mergedftemp.parquet'
mergedf.write.parquet(merged_out, mode="overwrite")

[Stage 73:>                                                       (0 + 16) / 17]

22/10/15 15:51:01 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 96.51% for 14 writers
22/10/15 15:51:01 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 90.08% for 15 writers
22/10/15 15:51:01 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 84.45% for 16 writers
22/10/15 15:51:07 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 90.08% for 15 writers


[Stage 73:===>                                                    (1 + 16) / 17]

22/10/15 15:51:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 96.51% for 14 writers


                                                                                

## Population Data

Population data is an xlsx file whose age-group columns are read in with the field names 'no.', 'no..1', 'no..2', ... These fields represent age groups (years): '0-4', '5-9', ..., each spanning five years, followed by a final '85+' group and a total. <br>
When reading the population file, only the 'no.' fields are retrieved, and hence these fields need to be assigned appropriate names.

In [10]:
# Spreadsheet rows to skip (header and footer rows, chosen by inspecting the file)
skip = [*range(7), 8, *range(2481, 2490), 2480]

# The age-group columns are named 'no.', 'no..1', ..., 'no..18';
# build the numbered names in one go
AGE_FIELDS_COUNT = 18
numbered_fields = [f'no..{n}' for n in range(1, AGE_FIELDS_COUNT + 1)]

# Columns that get a new name, and every column that is kept
fields_2b_renamed = ['S/T name', 'no.'] + numbered_fields
field_names = ['S/T name', 'SA2 code', 'SA2 name', 'no.'] + numbered_fields

# New names, following the file's heading for each 'no.' field:
# 'age 0-4', 'age 5-9', ..., 'age 80-84', 'age 85+'
AGE_UB = 85
AGE_RANGE = 4
age_labels = [
    f"age {lower}+" if lower == AGE_UB else f"age {lower}-{lower + AGE_RANGE}"
    for lower in range(0, AGE_UB + 1, AGE_RANGE + 1)
]
rename_to = ['State/Terr', *age_labels, 'Total']

# Old name -> new name, paired up by position
rename_cols = dict(zip(fields_2b_renamed, rename_to))

In [11]:
# Read the population sheet, keep only the required fields and rename them.
# The columns are selected with .loc rather than .get: .get returns None when
# a column is missing, which would surface as an unhelpful AttributeError on
# .rename, whereas .loc raises a KeyError that names the missing columns.

pop_df = (
    pd.read_excel(
        f'{RELATIVE_DIR}tables/population.xlsx',
        sheet_name='Table 3',
        skiprows=skip,
    )
    .loc[:, field_names]
    .rename(columns=rename_cols)
)

pop_df

Unnamed: 0,State/Terr,SA2 code,SA2 name,age 0-4,age 5-9,age 10-14,age 15-19,age 20-24,age 25-29,age 30-34,...,age 45-49,age 50-54,age 55-59,age 60-64,age 65-69,age 70-74,age 75-79,age 80-84,age 85+,Total
0,New South Wales,101021007.0,Braidwood,220.0,253.0,237.0,166.0,116.0,175.0,204.0,...,300.0,352.0,362.0,424.0,328.0,293.0,237.0,123.0,82.0,4330.0
1,New South Wales,101021008.0,Karabar,543.0,539.0,575.0,500.0,532.0,642.0,644.0,...,556.0,590.0,580.0,516.0,412.0,320.0,216.0,159.0,90.0,8546.0
2,New South Wales,101021009.0,Queanbeyan,684.0,591.0,477.0,439.0,707.0,1164.0,1258.0,...,696.0,682.0,664.0,574.0,499.0,405.0,278.0,259.0,354.0,11370.0
3,New South Wales,101021010.0,Queanbeyan - East,334.0,254.0,216.0,190.0,351.0,572.0,537.0,...,297.0,363.0,315.0,295.0,208.0,172.0,122.0,67.0,57.0,5093.0
4,New South Wales,101021012.0,Queanbeyan West - Jerrabomberra,870.0,926.0,976.0,934.0,748.0,665.0,853.0,...,1032.0,1114.0,987.0,646.0,442.0,358.0,197.0,126.0,69.0,12743.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2466,Australian Capital Territory,801101139.0,Wright,289.0,274.0,217.0,141.0,295.0,495.0,553.0,...,207.0,171.0,140.0,82.0,69.0,42.0,18.0,8.0,4.0,3806.0
2467,Australian Capital Territory,801101145.0,Molonglo - East,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,3.0,0.0,0.0,0.0,0.0,0.0,0.0,3.0
2468,Australian Capital Territory,801101146.0,Whitlam,3.0,0.0,0.0,0.0,0.0,3.0,2.0,...,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0
2469,Australian Capital Territory,801111140.0,ACT - South West,45.0,58.0,34.0,35.0,29.0,48.0,33.0,...,37.0,31.0,28.0,35.0,15.0,13.0,5.0,0.0,0.0,554.0


Create custom categories for better interpretation (the boundaries follow the 5-year age bands of the source data):
- old: 60+
- middle age: 35-59
- young adult: 20-34
- adolescent: 10-19
- under 10: 0-9

Note: retirement age in Australia is 66

In [12]:
# Collect the names of every 'age ...' column so they can be dropped later

drop_cols = [name for name in pop_df.columns if name.startswith('age')]

In [13]:
# Add up the 5-year age-band columns that fall inside each custom category.
# Each entry is [first lower bound, exclusive upper bound] in years.

groups = {
    'Under 10': [0, 10],
    'Adolescent': [10, 20],
    'Young adult': [20, 35],
    'Middle age': [35, 60],
    'Old': [60, 86]
}


def _age_band_label(lower):
    """Return the column name of the age band that starts at `lower`."""
    return f"age {lower}+" if lower == 85 else f"age {lower}-{lower + 4}"


for group, (first_lower, upper_bound) in groups.items():
    band_labels = [
        _age_band_label(lower) for lower in range(first_lower, upper_bound, 5)
    ]
    # Element-wise total of the member bands, stored as a new column
    pop_df[group] = sum(pop_df[label] for label in band_labels)

# Drop all columns containing age
pop_df_mod = pop_df.drop(columns=drop_cols)

In [14]:
# Type cast all age fields to integer type
pop_df_mod = pop_df_mod.convert_dtypes()

# Show the resulting dtypes and the rows that contain NA values before they
# are removed. A bare expression is only displayed when it is the last
# statement of a cell, so both are printed explicitly.
print(pop_df_mod.dtypes)
na_rows = pop_df_mod[pop_df_mod.isnull().any(axis=1)]
print(f"Dropping {len(na_rows)} rows containing NA values")
print(na_rows)

pop_df_mod = pop_df_mod.dropna()

The rows containing the 21 cells with NA values were dropped here, since they were either total rows or rows made up entirely of null values.

Convert population pandas dataframe to spark dataframe for later integration.

In [15]:
# Schema of the population dataframe: three text fields followed by the
# integer population counts

text_fields = ("State/Terr", "SA2 code", "SA2 name")
count_fields = (
    "Total", "Under 10", "Adolescent", "Young adult", "Middle age", "Old"
)

mySchema = StructType(
    [StructField(field, StringType()) for field in text_fields]
    + [StructField(field, IntegerType()) for field in count_fields]
)

In [16]:
# Turn the pandas population table into a Spark dataframe and preview it

pop_sdf = spark.createDataFrame(pop_df_mod, schema=mySchema)
pop_sdf.limit(5)

                                                                                

State/Terr,SA2 code,SA2 name,Total,Under 10,Adolescent,Young adult,Middle age,Old
New South Wales,101021007,Braidwood,4330,473,403,495,1472,1487
New South Wales,101021008,Karabar,8546,1082,1075,1818,2858,1713
New South Wales,101021009,Queanbeyan,11370,1275,916,3129,3681,2369
New South Wales,101021010,Queanbeyan - East,5093,588,406,1460,1718,921
New South Wales,101021012,Queanbeyan West -...,12743,1796,1910,2266,4933,1838


In [17]:
# Persist the new population dataframe

population_out = f'{RELATIVE_DIR}curated/pop_sdf.parquet'
pop_sdf.write.parquet(population_out, mode="overwrite")

## Postcode Ratio Data

In [18]:
# Read the postcode ratio sheet, keeping only the necessary fields and
# reading the postcode and sa2_maincode fields as strings

skip = [*range(5), 6]

postcode_ratio_raw = pd.read_excel(
    f'{RELATIVE_DIR}tables/1270055006_CG_POSTCODE_2011_SA2_2011.xls',
    sheet_name='Table 3',
    skiprows=skip,
    converters={'POSTCODE': str, 'SA2_MAINCODE_2011': str},
)

# Dropping rows with any NA removes the footer only
# (no NA values present in the dataset itself)
postcode_ratio_df = (
    postcode_ratio_raw
    .drop(columns=['POSTCODE.1', 'PERCENTAGE'])
    .dropna()
)


In [19]:
# Display the cleaned postcode ratio table (postcode, SA2 code, SA2 name, ratio)
postcode_ratio_df

Unnamed: 0,POSTCODE,SA2_MAINCODE_2011,SA2_NAME_2011,RATIO
0,0800,701011002,Darwin City,1.000000
1,0810,701021010,Alawa,0.071997
2,0810,701021013,Brinkin - Nakara,0.096392
3,0810,701021016,Coconut Grove,0.096494
4,0810,701021018,Jingili,0.061562
...,...,...,...,...
5983,7466,604031097,West Coast (Tas.),1.000000
5984,7467,604031097,West Coast (Tas.),1.000000
5985,7468,604031097,West Coast (Tas.),1.000000
5986,7469,604031097,West Coast (Tas.),1.000000


In [20]:
# Convert the postcode ratio table to a Spark dataframe with lower-case
# field names, then preview it.
# NOTE(review): postcodes here come from the Excel file as strings such as
# "0800", while the consumer table shows values like "862" - confirm that both
# sides use the same format before joining on postcode.

postcode_fields = [
    ("postcode", StringType()),
    ("sa2_code", StringType()),
    ("sa2_name", StringType()),
    ("ratio", FloatType()),
]
mySchema = StructType(
    [StructField(field, field_type) for field, field_type in postcode_fields]
)

postcode_ratio_sdf = spark.createDataFrame(postcode_ratio_df, schema=mySchema)
postcode_ratio_sdf.limit(5)

postcode,sa2_code,sa2_name,ratio
800,701011002,Darwin City,1.0
810,701021010,Alawa,0.0719971
810,701021013,Brinkin - Nakara,0.0963918
810,701021016,Coconut Grove,0.0964936
810,701021018,Jingili,0.061562


In [21]:
# Persist the postcode ratio dataframe

postcode_ratio_out = f'{RELATIVE_DIR}curated/postcode_ratio_sdf.parquet'
postcode_ratio_sdf.write.parquet(postcode_ratio_out, mode="overwrite")

22/10/15 15:51:13 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 96.51% for 14 writers


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 36056)
Traceback (most recent call last):
  File "/usr/lib/python3.10/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.10/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.10/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.10/socketserver.py", line 747, in __init__
    self.handle()
  File "/home/ashahi/.local/lib/python3.10/site-packages/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/home/ashahi/.local/lib/python3.10/site-packages/pyspark/accumulators.py", line 253, in poll
    if func():
  File "/home/ashahi/.local/lib/python3.10/site-packages/pyspark/accumulators.py", line 257, in accum_updates
    num_updates = re