# ***External ABS data handling***

In [1]:
import os
import sys
import pandas as pd
sys.path.append("../")
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name, regexp_extract, regexp_replace, to_date, avg
from scripts.download import download_abs

## Correspondence

In [2]:
download_abs()

Successfully downloaded ABS files.


In [None]:
# Load the ABS postcode -> SA2 correspondence table and preview the first rows.
correspondence_path = "../data/tables/abs/postcode_correspondences_2021.csv"
correspondence = pd.read_csv(correspondence_path)
correspondence.head(15)

#### Postcodes may be matched to multiple SA2 regions, therefore we will choose the region with the highest ratio (percentage of population for that postcode) as representative.

In [None]:
# For every postcode, find the row label of the mapping with the largest RATIO_FROM_TO.
ratio_by_postcode = correspondence.groupby("POSTCODE")["RATIO_FROM_TO"]
max_indices = ratio_by_postcode.idxmax()

In [None]:
# Keep only the highest-ratio row per postcode and renumber the rows from zero.
highest_ratio_rows = correspondence.loc[max_indices]
correspondence_filtered = highest_ratio_rows.reset_index(drop=True)
correspondence_filtered.head(5)

#### We can then get rid of all columns except postcode and SA2 code, which will be of use for combining our ABS data with the synthetic data.

In [None]:
# Only the postcode and its SA2 code are needed for the later merge.
lookup_columns = ["POSTCODE", "SA2_CODE_2021"]
correspondence_filtered = correspondence_filtered.loc[:, lookup_columns]
correspondence_filtered.head(5)

We can now assign an SA2 code to each customer in our synthetic dataset.

In [None]:
# Load the synthetic consumer table.
# NOTE: the file is pipe-delimited ("|"), not comma-separated.
consumer_path = "../data/tables/synthetic/tbl_consumer.csv"
customers = pd.read_csv(consumer_path, sep="|")
customers

In [None]:
# Left-join the SA2 lookup onto the customers (every customer row is kept),
# then discard the lookup's POSTCODE key, which duplicates `postcode`.
customers_merged = customers.merge(
    correspondence_filtered,
    how="left",
    left_on="postcode",
    right_on="POSTCODE",
).drop(columns="POSTCODE")
customers_merged.head(5)

In [None]:
# Number of customers left without an SA2 code after the left merge
# (rows where SA2_CODE_2021 is NaN).
customers_merged["SA2_CODE_2021"].isna().sum()

83,181 customers have no SA2 code. Possible remedies: impute with state-level mean/median values when using the ABS data later, borrow the code of a similar postcode, or use latitude/longitude data.

In [None]:
# Use 0 as a sentinel for "no SA2 code found" so the column holds no NaN
# (helps with merging later).
sa2_without_nan = customers_merged["SA2_CODE_2021"].fillna(0)
customers_merged = customers_merged.assign(SA2_CODE_2021=sa2_without_nan)
customers_merged.head(5)

## ABS Data Cleaning

In [None]:
# Load the raw ABS 2021 extract and preview the first rows.
abs_path = "../data/tables/abs/ABS_2021.csv"
abs_df = pd.read_csv(abs_path)
abs_df.head(15)

In [None]:
# Before filtering:
abs_df.shape

First, let's get state-wide statistics for imputation purposes later. Since we are focusing on median personal income, we will only get each state's median personal income number.

In [None]:
stat = "2: Median total personal income ($/weekly)"


def _state_median_income(frame, statistic, region_label):
    """Return the single OBS_VALUE for `statistic` in `region_label` as a float.

    Parameters:
        frame: ABS dataframe with "MEDAVG: Median/Average", "REGION: Region"
            and "OBS_VALUE" columns.
        statistic: exact label to match in "MEDAVG: Median/Average".
        region_label: exact label to match in "REGION: Region".

    Raises:
        ValueError: if the filter does not match exactly one row.
    """
    matches = frame.loc[
        (frame["MEDAVG: Median/Average"] == statistic)
        & (frame["REGION: Region"] == region_label),
        "OBS_VALUE",
    ]
    # float(Series) is deprecated in pandas; .item() extracts the scalar and
    # still fails loudly when zero or several rows match.
    return float(matches.item())


# One value per state/territory; these are used later to impute missing incomes.
NSW_median_income = _state_median_income(abs_df, stat, "1: New South Wales")
VIC_median_income = _state_median_income(abs_df, stat, "2: Victoria")
QLD_median_income = _state_median_income(abs_df, stat, "3: Queensland")
SA_median_income = _state_median_income(abs_df, stat, "4: South Australia")
WA_median_income = _state_median_income(abs_df, stat, "5: Western Australia")
TAS_median_income = _state_median_income(abs_df, stat, "6: Tasmania")
NT_median_income = _state_median_income(abs_df, stat, "7: Northern Territory")
ACT_median_income = _state_median_income(abs_df, stat, "8: Australian Capital Territory")

Next, we keep only the SA2-level rows, and only data from 2021.

In [None]:
# Keep only SA2-level rows recorded for the 2021 time period.
is_sa2 = abs_df["REGION_TYPE: Region Type"] == "SA2: Statistical Area Level 2"
is_2021 = abs_df["TIME_PERIOD: Time Period"] == 2021
abs_filtered = abs_df[is_sa2 & is_2021]
abs_filtered.shape

In [None]:
abs_filtered.head(5)

Now, we can convert the SA2 region into only its code, and also remove unnecessary features. We will also rename the relevant columns for ease of access.

In [None]:
# change SA2 feature
# Keep only the first 9 characters of the region label and store them as an integer code.
# astype(str) makes the cell safe to re-run once the column is already numeric.
sa2_codes = abs_filtered["REGION: Region"].astype(str).str[:9].astype("int64")

# assign() returns a new frame instead of writing into a slice of abs_df,
# which avoids pandas' SettingWithCopyWarning.
abs_filtered = abs_filtered.assign(**{"REGION: Region": sa2_codes})
abs_filtered.head(5)

In [None]:
# Columns that carry no information once the frame is restricted to SA2 / 2021.
unused_columns = [
    "DATAFLOW",
    "REGION_TYPE: Region Type",
    "STATE: State",
    "TIME_PERIOD: Time Period",
]

# Shorter names for the three columns that remain.
short_names = {
    "MEDAVG: Median/Average": "statistic",
    "REGION: Region": "region",
    "OBS_VALUE": "value",
}

abs_filtered = abs_filtered.drop(columns=unused_columns).rename(columns=short_names)
abs_filtered.shape

We should pivot the table so that each category in the `statistic` column becomes its own column, with one row per region.

In [None]:
# Reshape from long to wide: one row per region, one column per statistic.
# NOTE(review): aggfunc="sum" would add values together if a (region, statistic)
# pair occurred more than once — confirm each pair is unique in the source.
abs_filtered = (
    abs_filtered
    .pivot_table(index="region", columns="statistic", values="value", aggfunc="sum")
    .reset_index()
)
abs_filtered

In [None]:
# Replace the long ABS statistic labels with short snake_case names.
# The names are applied by position, so they rely on the column order
# produced by the pivot above.
simple_names = [
    "region",
    "median_age",
    "median_personal_income",
    "median_family_income",
    "median_household_income",
    "median_mortgage",
    "median_rent",
    "avg_bedroom",
    "avg_household",
]
abs_filtered.columns = simple_names
abs_filtered.shape

In [None]:
abs_filtered.drop("region", axis=1).describe()

We should remove instances which include outlier values for some features. For example, we can see that minimum values for a lot of the statistics are zero, which doesn't make sense. We should also remove any NaN values.

In [None]:
# Every statistic must be strictly positive for a region to be kept.
must_be_positive = [
    "median_age",
    "median_personal_income",
    "median_family_income",
    "median_household_income",
    "median_mortgage",
    "median_rent",
    "avg_bedroom",
    "avg_household",
]
all_positive = (abs_filtered[must_be_positive] > 0).all(axis=1)

# Filter the rows, then drop any row that still contains a NaN.
abs_filtered = abs_filtered[all_positive].dropna()

abs_filtered.drop(columns="region").describe()

We can see that the distributions of these numeric features now look much more sensible.

In [None]:
# external dataset shape after filtering
abs_filtered.shape

In [None]:
abs_filtered

Now we can take the median personal income (our variable of interest) and merge this with our customer dataset according to SA2 region code.

In [None]:
# Income lookup keyed by SA2 region code.
median_personal_income_df = abs_filtered[["region", "median_personal_income"]]

# Cast the customer SA2 codes to integers before joining on the region code.
customers_merged["SA2_CODE_2021"] = customers_merged["SA2_CODE_2021"].astype("int")

# Left join keeps every customer; SA2_CODE_2021 is dropped afterwards because
# `region` carries the same code for matched rows.
df_merged = customers_merged.merge(
    median_personal_income_df,
    how="left",
    left_on="SA2_CODE_2021",
    right_on="region",
).drop(columns="SA2_CODE_2021")
df_merged.head(5)

Since we are missing some income numbers (see the `NaN` value), we will impute each missing value with the median income of the state where the customer is from.

In [None]:
# State abbreviation -> state-wide median personal income
# (values determined through the initial ABS dataset).
replacement_values = dict(
    NSW=NSW_median_income,
    VIC=VIC_median_income,
    QLD=QLD_median_income,
    SA=SA_median_income,
    WA=WA_median_income,
    TAS=TAS_median_income,
    NT=NT_median_income,
    ACT=ACT_median_income,
)

# Per-customer fallback value, looked up from the customer's state.
replacement_series = df_merged["state"].map(replacement_values)

# Only rows whose income is missing take the state-level fallback.
imputed_income = df_merged["median_personal_income"].fillna(replacement_series)
df_merged = df_merged.assign(median_personal_income=imputed_income)
df_merged.head(5)

In [None]:
# True if any customer still has a missing median personal income after imputation.
df_merged["median_personal_income"].isna().any()

We can see that our median personal income column now has no missing values, and hence the data has been properly imputed.

In [None]:
# define the folder path and filename
output_path = "../data/curated/"
# NOTE(review): the file is written as CSV but the name has no ".csv" suffix;
# it is left unchanged here because other code may read this exact path.
file_name = "consumers_median_income"

# create the folder if it doesn't exist; exist_ok=True replaces the separate
# os.path.exists check and avoids the check-then-create race
os.makedirs(output_path, exist_ok=True)

# save df to csv in the specified folder (index is not written)
file_path = os.path.join(output_path, file_name)
df_merged.to_csv(file_path, index=False)

## Making Dataframe

Each column: Order ID, Date, ABN, User ID, Consumer ID, Fraud Probability, Median Income

In [None]:
# Session-level Spark settings, applied in this order to the builder.
spark_settings = {
    "spark.sql.repl.eagerEval.enabled": True,
    "spark.sql.parquet.cacheMetadata": "true",
    "spark.sql.session.timeZone": "Etc/UTC",
}

spark_builder = SparkSession.builder.appName("nathan")
for setting_name, setting_value in spark_settings.items():
    spark_builder = spark_builder.config(setting_name, setting_value)

# Reuse an existing session if one is already running, otherwise start one.
spark = spark_builder.getOrCreate()

In [None]:
# Fraud probabilities are read with pandas; consumer details are read with Spark.
fraud_path = "../data/tables/synthetic/consumer_fraud_probability.csv"
details_path = "../data/tables/synthetic/consumer_user_details.parquet"

consumer_fraud = pd.read_csv(fraud_path)
consumer_details = spark.read.parquet(details_path)

In [None]:
consumer_details.limit(5)

In [None]:
transactions_path = "../data/tables/synthetic/transactions/"

# Last path component (everything after the final "/") of the file each row was read from.
source_file_name = regexp_extract(input_file_name(), r'[^/]+$', 0)

# Read all parquet files and store the source file name in `order_datetime`.
# The earlier intermediate column holding the full path was overwritten
# immediately, so the name is now written in a single step.
df = spark.read.parquet(transactions_path).withColumn("order_datetime", source_file_name)

df.limit(5)

Convert the `order_datetime` column into date format.

In [None]:
# Strip a trailing ".parquet" from the stored file name, then parse the
# remainder as a yyyy-MM-dd date.
date_text = regexp_replace("order_datetime", r'\.parquet$', '')
df = df.withColumn("order_datetime", to_date(date_text, "yyyy-MM-dd"))

df.limit(5)

Add fraud probability

In [None]:
# Convert the pandas fraud-probability table into a Spark DataFrame so it can
# be joined with the Spark transactions dataframe.
consumer_fraud_spark = spark.createDataFrame(consumer_fraud)
consumer_fraud_spark.limit(5)

There are some duplicates in the fraud probability dataframe (i.e. entries with the same `user_id` and `order_datetime`), so we will average out all the `fraud_probability` values over the duplicates, and remove any duplicate rows.

In [None]:
# Collapse repeated (user_id, order_datetime) pairs into one row holding the
# mean fraud probability.
mean_fraud_prob = avg("fraud_probability").alias("avg_fraud_prob")
fraud_by_user_and_date = consumer_fraud_spark.groupBy("user_id", "order_datetime")
consumer_fraud_spark = fraud_by_user_and_date.agg(mean_fraud_prob)

consumer_fraud_spark.limit(5)

In [None]:
# Attach the averaged fraud probability to every transaction. Transactions
# without a matching fraud record get a probability of 0.
# NOTE(review): `df.order_datetime` was produced by to_date, while the fraud
# table was loaded through pandas read_csv — confirm both sides of the join
# have compatible types.
fraud_join_keys = ["user_id", "order_datetime"]
df_fraud = (
    df.join(consumer_fraud_spark, on=fraud_join_keys, how="left")
    .fillna({"avg_fraud_prob": 0})
)

df_fraud.limit(5)

Now we can add in the consumer IDs from the `consumer_details` dataframe.

In [None]:
consumer_details.limit(5)

In [None]:
# Bring in the consumer details for each transaction; the left join keeps
# every transaction row.
df_fraud = df_fraud.join(consumer_details, on="user_id", how="left")
df_fraud.limit(5)

In [None]:
# Count transactions whose consumer_id is null after the left join on user_id.
df_fraud.filter(df_fraud["consumer_id"].isNull()).count()

We can see there are no missing values in the `consumer_id` column, so there are no issues with merging. Now, we can merge again using our `df_merged` dataset, which contains median income data.

In [None]:
# convert median income pandas df to a spark df
median_income_spark = spark.createDataFrame(df_merged[["consumer_id", "median_personal_income"]])
median_income_spark.limit(5)

In [None]:
# Attach the median personal income to each transaction through consumer_id;
# the left join keeps every transaction row.
df_final = df_fraud.join(median_income_spark, on="consumer_id", how="left")
df_final.limit(5)

In [None]:
# Count rows whose median_personal_income is null after the left join on consumer_id.
df_final.filter(df_final["median_personal_income"].isNull()).count()

We can see that our new column has no missing values, so our dataframe is complete. Next step is to add the take rate depending on `merchant_abn`, and calculate purchase power, expected revenue and expected loss for each transaction.