# Preliminary Analysis

This notebook involves the preliminary analysis and preprocessing on all datasets.

---

BNPL Data timeline: 2021-2-28 to 2022-10-26

In [25]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import* 
from pyspark.sql.functions import regexp_replace, col, trim, split
import pandas as pd
import geopandas as gpd

In [26]:
spark = (
    SparkSession.builder.appName("Preliminary Analysis")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.driver.memory","4G")
    .config("spark.executor.memory","4G")
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .getOrCreate()
)

# BNPL dataset

## Table 1

### Merchant datasets

In [27]:
merchant_fraud = spark.read.csv('../data/tables/tables 1/merchant_fraud_probability.csv', header=True, inferSchema=True)
tbl_merchants = spark.read.parquet('../data/tables/tables 1/tbl_merchants.parquet')

In [28]:
print(merchant_fraud.select('merchant_abn').distinct().count())
tbl_merchants.select('merchant_abn').distinct().count()

61


4026

### Join Merchant datasets

In [29]:
merchant_table = merchant_fraud.join(tbl_merchants, on="merchant_abn", how="right")
merchant_table.show(10)

+------------+--------------+-----------------+--------------------+--------------------+
|merchant_abn|order_datetime|fraud_probability|                name|                tags|
+------------+--------------+-----------------+--------------------+--------------------+
| 10023283211|          NULL|             NULL|       Felis Limited|((furniture, home...|
| 10142254217|          NULL|             NULL|Arcu Ac Orci Corp...|([cable, satellit...|
| 10165489824|          NULL|             NULL|    Nunc Sed Company|([jewelry, watch,...|
| 10187291046|          NULL|             NULL|Ultricies Digniss...|([wAtch, clock, a...|
| 10192359162|          NULL|             NULL| Enim Condimentum PC|([music shops - m...|
| 10206519221|          NULL|             NULL|       Fusce Company|[(gift, card, nov...|
| 10255988167|          NULL|             NULL|Aliquam Enim Inco...|[(computers, comP...|
| 10264435225|          NULL|             NULL|    Ipsum Primis Ltd|[[watch, clock, a...|
| 10279061

In [30]:
merchant_table.count()

4073

In [31]:
# convert all string to lowercase
merchant_table = merchant_table.withColumn("name", lower(col("name"))) \
                               .withColumn("tags", lower(col("tags")))

In [32]:
# convert all brackets to []

# replace '(' with '['
merchant_table = merchant_table.withColumn("tags_converted", regexp_replace(col("tags"), r'\(', '['))

# replace ')' with ']'
merchant_table = merchant_table.withColumn("tags_converted", regexp_replace(col("tags_converted"), 
                                                                            r'\)', ']'))


In [33]:
# split the elements by '], [' to get the three parts
split_col = split(col("tags_converted"), r'\], \[')

# clean up each part and assign them to separate columns
merchant_table = merchant_table.withColumn("category", trim(regexp_replace(split_col.getItem(0), r'^\[|\]$', ''))) \
                               .withColumn("revenue_level", trim(regexp_replace(split_col.getItem(1), r'^\[|\]$', ''))) \
                               .withColumn("take_rate", trim(regexp_replace(split_col.getItem(2), r'^\[take rate: |\]$', '')))

# keep only numeric values
merchant_table = merchant_table.withColumn("category", regexp_replace(col("category"), r'^\[|\]$', ''))
merchant_table = merchant_table.withColumn("take_rate", regexp_replace(col("take_rate"), r'[^\d.]+', ''))

merchant_table = merchant_table.drop('tags', 'tags_converted')
merchant_table.show(5)

+------------+--------------+-----------------+--------------------+--------------------+-------------+---------+
|merchant_abn|order_datetime|fraud_probability|                name|            category|revenue_level|take_rate|
+------------+--------------+-----------------+--------------------+--------------------+-------------+---------+
| 10023283211|          NULL|             NULL|       felis limited|furniture, home f...|            e|     0.18|
| 10142254217|          NULL|             NULL|arcu ac orci corp...|cable, satellite,...|            b|     4.22|
| 10165489824|          NULL|             NULL|    nunc sed company|jewelry, watch, c...|            b|     4.40|
| 10187291046|          NULL|             NULL|ultricies digniss...|watch, clock, and...|            b|     3.29|
| 10192359162|          NULL|             NULL| enim condimentum pc|music shops - mus...|            a|     6.33|
+------------+--------------+-----------------+--------------------+--------------------

In [34]:
merchant_table.printSchema()

root
 |-- merchant_abn: long (nullable = true)
 |-- order_datetime: date (nullable = true)
 |-- fraud_probability: double (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- revenue_level: string (nullable = true)
 |-- take_rate: string (nullable = true)



In [35]:
# convert the `take_rate` column from string type to float type
merchant_table = merchant_table.withColumn("take_rate", col("take_rate").cast("float"))
merchant_table.printSchema()

root
 |-- merchant_abn: long (nullable = true)
 |-- order_datetime: date (nullable = true)
 |-- fraud_probability: double (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- revenue_level: string (nullable = true)
 |-- take_rate: float (nullable = true)



In [36]:
# check for invalid fraud probabilities - none
invalid_fraud_prob = merchant_table.filter((col("fraud_probability") < 0) | (col("fraud_probability") > 100))
invalid_fraud_prob.show(truncate=False)

+------------+--------------+-----------------+----+--------+-------------+---------+
|merchant_abn|order_datetime|fraud_probability|name|category|revenue_level|take_rate|
+------------+--------------+-----------------+----+--------+-------------+---------+
+------------+--------------+-----------------+----+--------+-------------+---------+



In [37]:
# check missing values
# all columns in the DataFrame
columns = merchant_table.columns

# count missing values for each column
missing_values_expr = [sum(col(c).isNull().cast("int")).alias(c) for c in columns]
missing_counts = merchant_table.select(*missing_values_expr).collect()[0].asDict()

# missing values count for each column
for column, count in missing_counts.items():
    print(f"There are {count} missing values in the '{column}' column.")

There are 0 missing values in the 'merchant_abn' column.
There are 3978 missing values in the 'order_datetime' column.
There are 3978 missing values in the 'fraud_probability' column.
There are 0 missing values in the 'name' column.
There are 0 missing values in the 'category' column.
There are 0 missing values in the 'revenue_level' column.
There are 0 missing values in the 'take_rate' column.


In [38]:
merchant_table.describe()

summary,merchant_abn,fraud_probability,name,category,revenue_level,take_rate
count,4073,95.0,4073,4073,4073,4073.0
mean,5.443029800201424E10,39.51738670731344,,,,4.394797444297836
stddev,2.605832899933888...,16.617441988139745,,,,1.776735642951219
min,10023283211,18.21089142894488,a aliquet ltd,antique shops - ...,a,0.1
max,99990536339,91.09606847149963,vulputate velit inc.,"watch, clock, and...",e,7.0


In [39]:
merchant_table.select('merchant_abn').distinct().count()

4026

In [40]:
# save the DataFrame as a Parquet file
merchant_table.write.parquet('../data/curated/merchant', mode='overwrite')

## Consumer datasets

In [41]:
consumer_fraud = spark.read.csv('../data/tables/tables 1/consumer_fraud_probability.csv', header=True, inferSchema=True)
consumer_user_details = spark.read.parquet('../data/tables/tables 1/consumer_user_details.parquet')
tbl_consumer = spark.read.csv('../data/tables/tables 1/tbl_consumer.csv', header=True, inferSchema=True)

In [42]:
# split tbl_consumer table
# single column into multiple columns
split_col = split(tbl_consumer['name|address|state|postcode|gender|consumer_id'], r'\|')

# create separate columns for each part
tbl_consumer = tbl_consumer.withColumn('name', split_col.getItem(0)) \
                           .withColumn('address', split_col.getItem(1)) \
                           .withColumn('state', split_col.getItem(2)) \
                           .withColumn('postcode', split_col.getItem(3)) \
                           .withColumn('gender', split_col.getItem(4)) \
                           .withColumn('consumer_id', split_col.getItem(5))

tbl_consumer = tbl_consumer.drop('name|address|state|postcode|gender|consumer_id')
tbl_consumer = tbl_consumer.drop('address')
tbl_consumer.show(3)

+----------------+-----+--------+------+-----------+
|            name|state|postcode|gender|consumer_id|
+----------------+-----+--------+------+-----------+
|Yolanda Williams|   WA|    6935|Female|    1195503|
|      Mary Smith|  NSW|    2782|Female|     179208|
|   Jill Jones MD|   NT|     862|Female|    1194530|
+----------------+-----+--------+------+-----------+
only showing top 3 rows



In [43]:
print(consumer_fraud.select('user_id').distinct().count())
print(consumer_user_details.select('user_id').distinct().count())
print(consumer_user_details.select('consumer_id').distinct().count())
tbl_consumer.select('consumer_id').distinct().count()

20128
499999
499999


                                                                                

499999

### Join cosumer tables

In [44]:
consumer_table = consumer_fraud.join(consumer_user_details, on="user_id", how="right")
consumer_table.select('user_id').distinct().count()
consumer_table = consumer_table.join(tbl_consumer, on="consumer_id", how="inner")
consumer_table.select('consumer_id').distinct().count()

                                                                                

499999

In [45]:
consumer_table.show(10)

                                                                                

+-----------+-------+--------------+-----------------+---------------+-----+--------+-----------+
|consumer_id|user_id|order_datetime|fraud_probability|           name|state|postcode|     gender|
+-----------+-------+--------------+-----------------+---------------+-----+--------+-----------+
|          7| 371406|          NULL|             NULL| James Williams|  TAS|    7248|       Male|
|         19|  92127|          NULL|             NULL| Dennis Ramirez|  QLD|    4406|       Male|
|         22| 166164|          NULL|             NULL|  Joseph Turner|  NSW|    2281|       Male|
|         26| 476081|          NULL|             NULL|  Chloe Walters|  VIC|    3026|     Female|
|         29| 286075|          NULL|             NULL|      Evan Pope|  QLD|    4706|Undisclosed|
|         54| 352250|          NULL|             NULL|  Rebecca Lyons|  QLD|    4516|     Female|
|         77|  37181|          NULL|             NULL| Donald Burgess|   WA|    6017|       Male|
|         94|  77872

In [46]:
consumer_table.printSchema()

root
 |-- consumer_id: long (nullable = true)
 |-- user_id: long (nullable = true)
 |-- order_datetime: date (nullable = true)
 |-- fraud_probability: double (nullable = true)
 |-- name: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postcode: string (nullable = true)
 |-- gender: string (nullable = true)



In [47]:
# convert the `postcode` column from string type to long type
consumer_table = consumer_table.withColumn("postcode", col("postcode").cast("long"))
consumer_table.printSchema()

root
 |-- consumer_id: long (nullable = true)
 |-- user_id: long (nullable = true)
 |-- order_datetime: date (nullable = true)
 |-- fraud_probability: double (nullable = true)
 |-- name: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postcode: long (nullable = true)
 |-- gender: string (nullable = true)



### Consumer table preprocessing

In [48]:
# check for invalid fraud probabilities - none
invalid_fraud_prob = consumer_table.filter((col("fraud_probability") < 0) | (col("fraud_probability") > 100))
invalid_fraud_prob.show(truncate=False)

+-----------+-------+--------------+-----------------+----+-----+--------+------+
|consumer_id|user_id|order_datetime|fraud_probability|name|state|postcode|gender|
+-----------+-------+--------------+-----------------+----+-----+--------+------+
+-----------+-------+--------------+-----------------+----+-----+--------+------+



                                                                                

In [49]:
# check missing values
# all columns in the DataFrame
columns = consumer_table.columns

# count missing values for each column
missing_values_expr = [sum(col(c).isNull().cast("int")).alias(c) for c in columns]
missing_counts = consumer_table.select(*missing_values_expr).collect()[0].asDict()

# missing values count for each column
for column, count in missing_counts.items():
    print(f"There are {count} missing values in the '{column}' column.")



There are 0 missing values in the 'consumer_id' column.
There are 0 missing values in the 'user_id' column.
There are 479871 missing values in the 'order_datetime' column.
There are 479871 missing values in the 'fraud_probability' column.
There are 0 missing values in the 'name' column.
There are 0 missing values in the 'state' column.
There are 0 missing values in the 'postcode' column.
There are 0 missing values in the 'gender' column.


                                                                                

In [50]:
consumer_table.describe()

                                                                                

summary,consumer_id,user_id,fraud_probability,name,state,postcode,gender
count,514735.0,514735.0,34864.0,514735,514735,514735.0,514735
mean,750893.6460508805,243188.9465064548,15.120090644154695,,,4037.528333997105,
stddev,433192.7318899152,147689.9105190419,9.946084849578078,,,1791.152468501352,
min,4.0,1.0,8.287143531552802,Aaron Acevedo,ACT,200.0,Female
max,1499995.0,499999.0,99.24738020302328,Zoe Wright,WA,9999.0,Undisclosed


In [51]:
# save the DataFrame as a Parquet file
consumer_table.write.parquet('../data/curated/consumer', mode='overwrite')

                                                                                

## Table 2 3 4 - transaction tables

In [52]:
# 3 transactions tables
tables_2 = spark.read.parquet('../data/tables/tables 2')
tables_3 = spark.read.parquet('../data/tables/tables 3')
tables_4 = spark.read.parquet('../data/tables/tables 4')

                                                                                

In [53]:
print('number of transactions in table 2 3 4: ', tables_2.count(), tables_3.count(), tables_4.count())


number of transactions in table 2 3 4:  3643266 4508106 6044133


In [54]:
# combine all transactions - 14195505 transactions with no duplicate record
transaction_table = tables_2.union(tables_3).union(tables_4)

In [55]:
transaction_table.show(5)

+-------+------------+------------------+--------------------+--------------+
|user_id|merchant_abn|      dollar_value|            order_id|order_datetime|
+-------+------------+------------------+--------------------+--------------+
|  18478| 62191208634|63.255848959735246|949a63c8-29f7-4ab...|    2021-08-20|
|      2| 15549624934| 130.3505283105634|6a84c3cf-612a-457...|    2021-08-20|
|  18479| 64403598239|120.15860593212783|b10dcc33-e53f-425...|    2021-08-20|
|      3| 60956456424| 136.6785200286976|0f09c5a5-784e-447...|    2021-08-20|
|  18479| 94493496784| 72.96316578355305|f6c78c1a-4600-4c5...|    2021-08-20|
+-------+------------+------------------+--------------------+--------------+
only showing top 5 rows



In [56]:
transaction_table.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- merchant_abn: long (nullable = true)
 |-- dollar_value: double (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_datetime: date (nullable = true)



In [57]:
# check duplicate transaction records

# group by all columns and count occurrences
duplicates = transaction_table.groupBy(transaction_table.columns).count()

# keep only duplicate records
duplicates = duplicates.filter(col("count") > 1)

# duplicate row
duplicates.show()



+-------+------------+------------+--------+--------------+-----+
|user_id|merchant_abn|dollar_value|order_id|order_datetime|count|
+-------+------------+------------+--------+--------------+-----+
+-------+------------+------------+--------+--------------+-----+



                                                                                

In [58]:
# check missing values
# all columns in the DataFrame
columns = transaction_table.columns

# count missing values for each column
missing_values_expr = [sum(col(c).isNull().cast("int")).alias(c) for c in columns]
missing_counts = transaction_table.select(*missing_values_expr).collect()[0].asDict()

# missing values count for each column
for column, count in missing_counts.items():
    print(f"There are {count} missing values in the '{column}' column.")



There are 0 missing values in the 'user_id' column.
There are 0 missing values in the 'merchant_abn' column.
There are 0 missing values in the 'dollar_value' column.
There are 0 missing values in the 'order_id' column.
There are 0 missing values in the 'order_datetime' column.


                                                                                

In [59]:
# save the DataFrame as a Parquet file
transaction_table.write.parquet('../data/curated/transaction', mode='overwrite')

                                                                                

# External dataset

### Location datasets (SA2 information and postcode)
1. SA2 information and its geometry: https://www.abs.gov.au/statistics/standards/australian-statistical-geography-standard-asgs-edition-3/jul2021-jun2026/access-and-downloads/digital-boundary-files/SA2_2021_AUST_SHP_GDA2020.zip
2. SA2 information and postcode lookup table: https://data.gov.au/data/dataset/6cd8989d-4aca-46b7-b93e-77befcffa0b6/resource/cb659d81-5bd2-41f5-a3d0-67257c9a5893/download/asgs2021codingindexs.zip

In [60]:
# read SA2 name and its geometry
gda = gpd.read_file('../data/tables/SA2_2021_AUST_SHP_GDA2020/SA2_2021_AUST_GDA2020.shp')
gda = gda[['SA2_NAME21', 'geometry']]

# read the SA2 name and postcode lookup table 
postcode_sa2 = pd.read_csv('../data/tables/asgs2021codingindexs/2023 Locality to 2021 SA2 Coding Index.csv')
postcode_sa2 = postcode_sa2[['SA2_NAME_2021', 'SA2_MAINCODE_2021', 'POSTCODE', 'STATE']]
postcode_sa2 = postcode_sa2.dropna(subset=['POSTCODE'])
postcode_sa2['POSTCODE'] = postcode_sa2['POSTCODE'].astype(int)
postcode_sa2 = postcode_sa2.rename(columns={'POSTCODE': 'postcode'})
postcode_sa2 = postcode_sa2.rename(columns={'STATE': 'state'})

In [61]:
gda.head()

Unnamed: 0,SA2_NAME21,geometry
0,Braidwood,"POLYGON ((149.58424 -35.44426, 149.58444 -35.4..."
1,Karabar,"POLYGON ((149.21899 -35.36738, 149.218 -35.366..."
2,Queanbeyan,"POLYGON ((149.21326 -35.34325, 149.21619 -35.3..."
3,Queanbeyan - East,"POLYGON ((149.24034 -35.34781, 149.24024 -35.3..."
4,Queanbeyan West - Jerrabomberra,"POLYGON ((149.19572 -35.36126, 149.1997 -35.35..."


In [62]:
postcode_sa2.head()

Unnamed: 0,SA2_NAME_2021,SA2_MAINCODE_2021,postcode,state
0,Black Mountain,801051123,2600,ACT
1,Parkes (ACT) - South,801061068,2600,ACT
2,Barton,801061129,2600,ACT
3,Deakin,801061062,2600,ACT
4,Parkes (ACT) - South,801061068,2600,ACT


In [63]:
postcode_sa2_geo = gda.merge(postcode_sa2, left_on='SA2_NAME21', right_on='SA2_NAME_2021', how='inner')
postcode_sa2_geo = postcode_sa2_geo.drop_duplicates()
postcode_sa2_geo = postcode_sa2_geo.drop(columns=['SA2_NAME_2021'])
postcode_sa2_geo = postcode_sa2_geo.rename(columns={'SA2_NAME21': 'SA2_name'})
postcode_sa2_geo.head()

Unnamed: 0,SA2_name,geometry,SA2_MAINCODE_2021,postcode,state
0,Braidwood,"POLYGON ((149.58424 -35.44426, 149.58444 -35.4...",101021007,2580,NSW
5,Braidwood,"POLYGON ((149.58424 -35.44426, 149.58444 -35.4...",101021007,2622,NSW
53,Karabar,"POLYGON ((149.21899 -35.36738, 149.218 -35.366...",101021008,2620,NSW
56,Queanbeyan,"POLYGON ((149.21326 -35.34325, 149.21619 -35.3...",101021009,2620,ACT
57,Queanbeyan,"POLYGON ((149.21326 -35.34325, 149.21619 -35.3...",101021009,2620,NSW


In [64]:
# check the data type
print('postcode_sa2_geo:')
print(postcode_sa2_geo.dtypes)

postcode_sa2_geo:
SA2_name               object
geometry             geometry
SA2_MAINCODE_2021       int64
postcode                int64
state                  object
dtype: object


In [65]:
# number of unique postcode
unique_postcodes_count = len(postcode_sa2_geo['postcode'].unique())
print(f"Number of unique postcodes: {unique_postcodes_count}")

Number of unique postcodes: 2643


In [66]:
postcode_sa2_geo.shape

(4841, 5)

In [67]:
# save the DataFrame as a CSV file
postcode_sa2_geo.to_file('../data/curated/postcode_sa2_geo.shp')

  postcode_sa2_geo.to_file('../data/curated/postcode_sa2_geo.shp')
  ogr_write(


### Census dataset
https://www.abs.gov.au/census/find-census-data/datapacks/download/2021_GCP_SA2_for_AUS_short-header.zip

In [68]:
census_df = pd.read_csv('../data/tables/2021_GCP_SA2_for_AUS_short-header/2021 Census GCP Statistical Area 2 for AUS/2021Census_G02_AUST_SA2.csv')
census_df.head()

Unnamed: 0,SA2_CODE_2021,Median_age_persons,Median_mortgage_repay_monthly,Median_tot_prsnl_inc_weekly,Median_rent_weekly,Median_tot_fam_inc_weekly,Average_num_psns_per_bedroom,Median_tot_hhd_inc_weekly,Average_household_size
0,101021007,51,1732,760,330,1886,0.8,1429,2.2
1,101021008,38,1950,975,350,2334,0.8,1989,2.6
2,101021009,37,1700,996,330,2233,0.9,1703,2.1
3,101021010,36,1700,1104,310,2412,0.9,1796,2.1
4,101021012,37,2300,1357,430,3332,0.8,3014,2.9


In [69]:
census_df.dtypes

SA2_CODE_2021                      int64
Median_age_persons                 int64
Median_mortgage_repay_monthly      int64
Median_tot_prsnl_inc_weekly        int64
Median_rent_weekly                 int64
Median_tot_fam_inc_weekly          int64
Average_num_psns_per_bedroom     float64
Median_tot_hhd_inc_weekly          int64
Average_household_size           float64
dtype: object

In [70]:
# count the number of null in each column
null_counts = census_df.isnull().sum()
for column, count in null_counts.items():
    if count > 0:
        print(f"Column '{column}' has {count} null values.")
    else:
        print(f"Column '{column}' has 0 null values.")

Column 'SA2_CODE_2021' has 0 null values.
Column 'Median_age_persons' has 0 null values.
Column 'Median_mortgage_repay_monthly' has 0 null values.
Column 'Median_tot_prsnl_inc_weekly' has 0 null values.
Column 'Median_rent_weekly' has 0 null values.
Column 'Median_tot_fam_inc_weekly' has 0 null values.
Column 'Average_num_psns_per_bedroom' has 0 null values.
Column 'Median_tot_hhd_inc_weekly' has 0 null values.
Column 'Average_household_size' has 0 null values.


In [71]:
# count the number of 0 in each column
zero_counts = (census_df == 0).sum()
for column, count in zero_counts.items():
    print(f"Column '{column}' has {count} zero values.")

Column 'SA2_CODE_2021' has 0 zero values.
Column 'Median_age_persons' has 38 zero values.
Column 'Median_mortgage_repay_monthly' has 111 zero values.
Column 'Median_tot_prsnl_inc_weekly' has 42 zero values.
Column 'Median_rent_weekly' has 80 zero values.
Column 'Median_tot_fam_inc_weekly' has 69 zero values.
Column 'Average_num_psns_per_bedroom' has 55 zero values.
Column 'Median_tot_hhd_inc_weekly' has 58 zero values.
Column 'Average_household_size' has 55 zero values.


In [72]:
# delete the rows that are all 0 except the `SA2_CODE_2021` column
census_df = census_df[(census_df.iloc[:, 1:] != 0).any(axis=1)]

In [73]:
census_df.describe()

Unnamed: 0,SA2_CODE_2021,Median_age_persons,Median_mortgage_repay_monthly,Median_tot_prsnl_inc_weekly,Median_rent_weekly,Median_tot_fam_inc_weekly,Average_num_psns_per_bedroom,Median_tot_hhd_inc_weekly,Average_household_size
count,2434.0,2434.0,2434.0,2434.0,2434.0,2434.0,2434.0,2434.0,2434.0
mean,314236900.0,40.349219,1827.395234,848.050534,362.711586,2140.251849,0.820912,1783.120789,2.538127
std,194594400.0,7.098079,663.84346,268.597468,123.711599,713.285625,0.148683,594.407149,0.465065
min,101021000.0,10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
25%,127021500.0,36.0,1470.5,681.0,295.0,1678.0,0.8,1355.0,2.3
50%,303021100.0,40.0,1820.0,803.0,369.0,2087.0,0.8,1742.0,2.5
75%,405031100.0,44.0,2167.0,962.0,430.0,2481.0,0.9,2144.5,2.8
max,901041000.0,91.0,9999.0,3250.0,1200.0,7000.0,2.0,7000.0,6.0


In [74]:
census_df.shape

(2434, 9)

In [75]:
# save the DataFrame as parquet file
census = spark.createDataFrame(census_df)
census.write.parquet('../data/curated/census', mode='overwrite')

                                                                                

### Population dataset
https://www.abs.gov.au/statistics/people/population/regional-population/2021-22/32180DS0003_2001-22r.xlsx

In [76]:
# read the first Excel sheet
population_pandas = pd.read_excel('../data/tables/32180DS0003_2001-22r.xlsx', sheet_name=1, header=[6, 7])
population_pandas.head()

Unnamed: 0_level_0,Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Unnamed: 3_level_0,Unnamed: 4_level_0,Unnamed: 5_level_0,Unnamed: 6_level_0,Unnamed: 7_level_0,Unnamed: 8_level_0,Unnamed: 9_level_0,...,2013,2014,2015,2016,2017,2018,2019,2020,2021,2022
Unnamed: 0_level_1,S/T code,S/T name,GCCSA code,GCCSA name,SA4 code,SA4 name,SA3 code,SA3 name,SA2 code,SA2 name,...,no.,no.,no.,no.,no.,no.,no.,no.,no.,no.
0,,,,,,,,,,,...,,,,,,,,,,
1,1.0,New South Wales,12.0,Rest of NSW,101.0,Capital Region,10102.0,Queanbeyan,101021007.0,Braidwood,...,3685.0,3762.0,3849.0,3950.0,4041.0,4145.0,4218.0,4282.0,4332.0,4366.0
2,1.0,New South Wales,12.0,Rest of NSW,101.0,Capital Region,10102.0,Queanbeyan,101021008.0,Karabar,...,8848.0,8731.0,8603.0,8531.0,8530.0,8516.0,8500.0,8535.0,8548.0,8528.0
3,1.0,New South Wales,12.0,Rest of NSW,101.0,Capital Region,10102.0,Queanbeyan,101021009.0,Queanbeyan,...,11050.0,11199.0,11213.0,11230.0,11362.0,11460.0,11468.0,11460.0,11375.0,11390.0
4,1.0,New South Wales,12.0,Rest of NSW,101.0,Capital Region,10102.0,Queanbeyan,101021010.0,Queanbeyan - East,...,4983.0,4967.0,4961.0,4970.0,5016.0,5079.0,5126.0,5089.0,5097.0,5090.0


In [99]:
population_pandas.columns

MultiIndex([('Unnamed: 0_level_0',   'S/T code'),
            ('Unnamed: 1_level_0',   'S/T name'),
            ('Unnamed: 2_level_0', 'GCCSA code'),
            ('Unnamed: 3_level_0', 'GCCSA name'),
            ('Unnamed: 4_level_0',   'SA4 code'),
            ('Unnamed: 5_level_0',   'SA4 name'),
            ('Unnamed: 6_level_0',   'SA3 code'),
            ('Unnamed: 7_level_0',   'SA3 name'),
            ('Unnamed: 8_level_0',   'SA2 code'),
            ('Unnamed: 9_level_0',   'SA2 name'),
            (                2001,        'no.'),
            (                2002,        'no.'),
            (                2003,        'no.'),
            (                2004,        'no.'),
            (                2005,        'no.'),
            (                2006,        'no.'),
            (                2007,        'no.'),
            (                2008,        'no.'),
            (                2009,        'no.'),
            (                2010,        'no.'),


In [78]:
# columns of interest
columns_of_interest = [
    ('Unnamed: 9_level_0', 'SA2 name'),
    (2021, 'no.'),
    (2022, 'no.')
]

# selected columns
population_selected = population_pandas.loc[:, columns_of_interest]

# rename the columns
population_selected.columns = ['SA2_name', '2021', '2022']

# remove the specific row with all NaNs
population_selected_cleaned = population_selected[population_selected.notna().all(axis=1)]

# reset the index to maintain a clean index
population_selected_cleaned = population_selected_cleaned.reset_index(drop=True)
population_selected_cleaned.head()

Unnamed: 0,SA2_name,2021,2022
0,Braidwood,4332.0,4366.0
1,Karabar,8548.0,8528.0
2,Queanbeyan,11375.0,11390.0
3,Queanbeyan - East,5097.0,5090.0
4,Queanbeyan West - Jerrabomberra,12748.0,12779.0


In [79]:
population_selected_cleaned.dtypes

SA2_name     object
2021        float64
2022        float64
dtype: object

In [80]:
# remaining NaN values
missing_value = population_selected_cleaned.isna().sum()
print("NaN values summary:")
print(missing_value)

NaN values summary:
SA2_name    0
2021        0
2022        0
dtype: int64


In [81]:
# check if '2021' and '2022' columns are present
if '2021' in population_selected_cleaned.columns and '2022' in population_selected_cleaned.columns:
    
    # create a new column 'average_population' that averages the '2021' and '2022' columns
    population_selected_cleaned['average_population'] = population_selected_cleaned[['2021', '2022']].mean(axis=1)
    
    # drop the '2021' and '2022' columns
    population_selected_cleaned = population_selected_cleaned.drop(columns=['2021', '2022'])
    
else:
    print("The columns '2021' and '2022' are not present in the DataFrame.")

# dataFrame with the average column
print("DataFrame after averaging population of 2021 and 2022:")
population_selected_cleaned.head()

DataFrame after averaging population of 2021 and 2022:


Unnamed: 0,SA2_name,average_population
0,Braidwood,4349.0
1,Karabar,8538.0
2,Queanbeyan,11382.5
3,Queanbeyan - East,5093.5
4,Queanbeyan West - Jerrabomberra,12763.5


In [82]:
# convert Pandas DataFrame to Spark DataFrame
population = spark.createDataFrame(population_selected_cleaned)

# save as a Parquet file
population.write.parquet('../data/curated/population', mode='overwrite')

### Unemployment dataset
https://explore.data.abs.gov.au/vis?tm=unemployment%20sa2&pg=0&df[ds]=C21_ASGS&df[id]=C21_G43_SA2&df[ag]=ABS&df[vs]=1.0.0&pd=2021%2C&dq=LFS_P1.3...&ly[rs]=REGION

In [83]:
# read the CSV file
unemployment = pd.read_csv('../data/tables/ABS_C21_G43_SA2_1.0.0_LFS_P1.3....csv')
unemployment.head(5)

Unnamed: 0,DATAFLOW,LFEMP: Selected Labour Force/Education/Migration characteristic,SEXP: Sex,REGION: Region,REGION_TYPE: Region Type,STATE: State,TIME_PERIOD: Time Period,OBS_VALUE
0,ABS:C21_G43_SA2(1.0.0),LFS_P1: Labour force status: % Unemployment,3: Persons,101021611: Queanbeyan Surrounds,SA2: Statistical Area Level 2,1: New South Wales,2021,2.6
1,ABS:C21_G43_SA2(1.0.0),LFS_P1: Labour force status: % Unemployment,3: Persons,107011134: Unanderra - Mount Kembla,SA2: Statistical Area Level 2,1: New South Wales,2021,4.2
2,ABS:C21_G43_SA2(1.0.0),LFS_P1: Labour force status: % Unemployment,3: Persons,204031069: Bright - Mount Beauty,SA2: Statistical Area Level 2,2: Victoria,2021,1.8
3,ABS:C21_G43_SA2(1.0.0),LFS_P1: Labour force status: % Unemployment,3: Persons,211051280: Montrose,SA2: Statistical Area Level 2,2: Victoria,2021,3.1
4,ABS:C21_G43_SA2(1.0.0),LFS_P1: Labour force status: % Unemployment,3: Persons,213051468: Werribee - West,SA2: Statistical Area Level 2,2: Victoria,2021,6.2


In [84]:
# select and rename the columns
selected_unemp = unemployment[['REGION: Region', 'OBS_VALUE']]
selected_unemp.columns = ['SA2_name', 'unemployment_rate']

# selected columns
selected_unemp.head()

Unnamed: 0,SA2_name,unemployment_rate
0,101021611: Queanbeyan Surrounds,2.6
1,107011134: Unanderra - Mount Kembla,4.2
2,204031069: Bright - Mount Beauty,1.8
3,211051280: Montrose,3.1
4,213051468: Werribee - West,6.2


In [85]:
# remove numbers and colon from the 'SA2 name' column
selected_unemp.loc[:, 'SA2_name'] = selected_unemp['SA2_name'].str.replace(r'^\d+:\s*', '', regex=True)

# display the DataFrame with the cleaned 'SA2_name'
selected_unemp.head()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  selected_unemp.loc[:, 'SA2_name'] = selected_unemp['SA2_name'].str.replace(r'^\d+:\s*', '', regex=True)


Unnamed: 0,SA2_name,unemployment_rate
0,Queanbeyan Surrounds,2.6
1,Unanderra - Mount Kembla,4.2
2,Bright - Mount Beauty,1.8
3,Montrose,3.1
4,Werribee - West,6.2


In [86]:
# convert Pandas DataFrame to Spark DataFrame
unemployment = spark.createDataFrame(selected_unemp)

# save the DataFrame as a Parquet file
unemployment.write.parquet('../data/curated/unemployment', mode='overwrite')

# Merge all external datasets

In [87]:
# merge census and location
census_sa2_df = pd.merge(census_df, postcode_sa2_geo, left_on='SA2_CODE_2021', right_on='SA2_MAINCODE_2021', how='inner')
census_sa2_df = census_sa2_df.drop(['SA2_CODE_2021', 'SA2_MAINCODE_2021'], axis=1)

# merge the above with and population
census_sa2_pop_df = pd.merge(population_selected_cleaned, census_sa2_df, on='SA2_name', how='inner')

# merge the above with unemployment rate
census_sa2_pop_unemp_df = pd.merge(census_sa2_pop_df, selected_unemp, on='SA2_name', how='inner')

In [88]:
census_sa2_pop_unemp_df.shape

(4969, 14)

In [89]:
census_sa2_pop_unemp_df.dtypes

SA2_name                           object
average_population                float64
Median_age_persons                  int64
Median_mortgage_repay_monthly       int64
Median_tot_prsnl_inc_weekly         int64
Median_rent_weekly                  int64
Median_tot_fam_inc_weekly           int64
Average_num_psns_per_bedroom      float64
Median_tot_hhd_inc_weekly           int64
Average_household_size            float64
geometry                         geometry
postcode                            int64
state                              object
unemployment_rate                 float64
dtype: object

In [90]:
# count the number of null in each column
null_counts = census_sa2_pop_unemp_df.isnull().sum()
for column, count in null_counts.items():
    if count > 0:
        print(f"Column '{column}' has {count} null values.")
    else:
        print(f"Column '{column}' has 0 null values.")

Column 'SA2_name' has 0 null values.
Column 'average_population' has 0 null values.
Column 'Median_age_persons' has 0 null values.
Column 'Median_mortgage_repay_monthly' has 0 null values.
Column 'Median_tot_prsnl_inc_weekly' has 0 null values.
Column 'Median_rent_weekly' has 0 null values.
Column 'Median_tot_fam_inc_weekly' has 0 null values.
Column 'Average_num_psns_per_bedroom' has 0 null values.
Column 'Median_tot_hhd_inc_weekly' has 0 null values.
Column 'Average_household_size' has 0 null values.
Column 'geometry' has 0 null values.
Column 'postcode' has 0 null values.
Column 'state' has 0 null values.
Column 'unemployment_rate' has 0 null values.


In [91]:
# count the number of 0 in each column
zero_counts = (census_sa2_pop_unemp_df == 0).sum()
for column, count in zero_counts.items():
    print(f"Column '{column}' has {count} zero values.")

Column 'SA2_name' has 0 zero values.
Column 'average_population' has 4 zero values.
Column 'Median_age_persons' has 0 zero values.
Column 'Median_mortgage_repay_monthly' has 58 zero values.
Column 'Median_tot_prsnl_inc_weekly' has 3 zero values.
Column 'Median_rent_weekly' has 33 zero values.
Column 'Median_tot_fam_inc_weekly' has 22 zero values.
Column 'Average_num_psns_per_bedroom' has 9 zero values.
Column 'Median_tot_hhd_inc_weekly' has 12 zero values.
Column 'Average_household_size' has 9 zero values.
Column 'geometry' has 0 zero values.
Column 'postcode' has 0 zero values.
Column 'state' has 0 zero values.
Column 'unemployment_rate' has 57 zero values.


In [92]:
# delete the rows that are all 0 except the `SA2_name`, `geometry`, `postcode`, and `state` column
columns_to_check = census_sa2_pop_unemp_df.columns.difference(['SA2_name', 'geometry', 'postcode', 'state'])
census_sa2_pop_unemp_df_cleaned = census_sa2_pop_unemp_df[(census_sa2_pop_unemp_df[columns_to_check] != 0).any(axis=1)]
census_sa2_pop_unemp_df_cleaned.shape

(4969, 14)

In [93]:
census_sa2_pop_unemp_df_cleaned.describe()

Unnamed: 0,average_population,Median_age_persons,Median_mortgage_repay_monthly,Median_tot_prsnl_inc_weekly,Median_rent_weekly,Median_tot_fam_inc_weekly,Average_num_psns_per_bedroom,Median_tot_hhd_inc_weekly,Average_household_size,postcode,unemployment_rate
count,4969.0,4969.0,4969.0,4969.0,4969.0,4969.0,4969.0,4969.0,4969.0,4969.0,4969.0
mean,9765.708593,42.477963,1641.460052,797.392634,316.111491,1988.73556,0.800584,1638.860938,2.497786,4030.357818,4.670759
std,6035.311035,6.900428,622.554219,227.784335,121.914507,608.287984,0.125917,522.250357,0.37333,1529.038199,2.756597
min,0.0,10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,800.0,0.0
25%,5065.0,38.0,1213.0,654.0,230.0,1579.0,0.7,1245.0,2.3,2734.0,3.3
50%,8194.5,42.0,1600.0,759.0,310.0,1891.0,0.8,1501.0,2.5,3852.0,4.1
75%,13466.5,47.0,2000.0,892.0,400.0,2284.0,0.8,1982.0,2.7,5109.0,5.3
max,28645.5,91.0,9999.0,3250.0,1200.0,7000.0,2.0,4799.0,6.0,7470.0,66.7


In [94]:
census_sa2_pop_unemp_df_cleaned.head()

Unnamed: 0,SA2_name,average_population,Median_age_persons,Median_mortgage_repay_monthly,Median_tot_prsnl_inc_weekly,Median_rent_weekly,Median_tot_fam_inc_weekly,Average_num_psns_per_bedroom,Median_tot_hhd_inc_weekly,Average_household_size,geometry,postcode,state,unemployment_rate
0,Braidwood,4349.0,51,1732,760,330,1886,0.8,1429,2.2,"POLYGON ((149.58424 -35.44426, 149.58444 -35.4...",2580,NSW,3.5
1,Braidwood,4349.0,51,1732,760,330,1886,0.8,1429,2.2,"POLYGON ((149.58424 -35.44426, 149.58444 -35.4...",2622,NSW,3.5
2,Karabar,8538.0,38,1950,975,350,2334,0.8,1989,2.6,"POLYGON ((149.21899 -35.36738, 149.218 -35.366...",2620,NSW,4.3
3,Queanbeyan,11382.5,37,1700,996,330,2233,0.9,1703,2.1,"POLYGON ((149.21326 -35.34325, 149.21619 -35.3...",2620,ACT,4.2
4,Queanbeyan,11382.5,37,1700,996,330,2233,0.9,1703,2.1,"POLYGON ((149.21326 -35.34325, 149.21619 -35.3...",2620,ACT,3.1


In [95]:
# save the DataFrame as a CSV file
census_sa2_pop_unemp_df_cleaned.to_csv('../data/curated/merged_external.csv', index=False)

In [96]:
# drop the geometry column
census_sa2_pop_unemp_df_cleaned = census_sa2_pop_unemp_df_cleaned.drop(columns=['geometry'])

# convert the DataFrame as a Parquet file
merged_external = spark.createDataFrame(census_sa2_pop_unemp_df_cleaned)

# save the DataFrame as a Parquet file
merged_external.write.parquet('../data/curated/merged_external', mode='overwrite')