# Run "ETL" script

In [277]:
# Open Spark session 
from pyspark.sql import SparkSession, functions as F


# Build (or reuse) the Spark session that runs every Spark job in this notebook.
spark_settings = {
    "spark.sql.repl.eagerEval.enabled": True,   # render DataFrames eagerly in the REPL
    "spark.sql.parquet.cacheMetadata": "true",
    "spark.driver.memory": "4g",
}
session_builder = SparkSession.builder.appName("Data_Explorer")
for setting_name, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)
spark = session_builder.getOrCreate()

In [278]:
# Read the files
# The transactions are stored as three snapshot folders (the date ranges are
# part of the folder names); each one is read separately and then stacked.
transactions_sample = spark.read.parquet('../data/tables/transactions_20210828_20220227_snapshot')
transactions_sample2 = spark.read.parquet('../data/tables/transactions_20210228_20210827_snapshot')
transactions_sample3 = spark.read.parquet('../data/tables/transactions_20220228_20220828_snapshot')
# Spark DataFrames are immutable: unionByName returns a NEW DataFrame, so the
# result has to be assigned back. Without the assignment only the first
# snapshot would reach the joins further down.
transactions_sample = (
    transactions_sample
    .unionByName(transactions_sample2, allowMissingColumns=True)
    .unionByName(transactions_sample3, allowMissingColumns=True)
)
consumer_details = spark.read.parquet('../data/tables/consumer_user_details.parquet')
merchants_tbl = spark.read.parquet('../data/tables/tbl_merchants.parquet')
# The consumer table is a pipe-delimited CSV with a header row.
customer_tbl = spark.read.option("delimiter", "|").option("header",True).csv('../data/tables/tbl_consumer.csv')

                                                                                

In [279]:
# Pull the merchants table into pandas so its `tags` column can be parsed with plain Python below.
merchants = merchants_tbl.toPandas()

In [280]:
import re
def tag_extract(tag_string):
    """Standardise a raw merchant ``tags`` string into its three components.

    The tag is expected to contain three bracketed sections separated by
    ``),`` once square brackets have been normalised to parentheses, e.g.
    ``"((furniture shops), (e), (take rate: 0.18))"`` or
    ``"[[furniture shops], [e], [take rate: 0.18]]"``.

    Parameters
    ----------
    tag_string : str
        Raw value of the ``tags`` attribute.

    Returns
    -------
    list
        ``[description, revenue_band, take_rate]`` where ``description`` is the
        lower-cased first section, ``revenue_band`` is the first letter found
        in the second section and ``take_rate`` is the first decimal number
        found in the third section, as a ``float``.

    Raises
    ------
    ValueError
        If the string does not contain three sections, or the band / take rate
        cannot be found in their sections.
    """
    # Lower-case and normalise square brackets to parentheses. Plain
    # str.replace avoids the invalid '\[' / '\]' escapes in non-raw patterns.
    string = tag_string.lower().replace('[', '(').replace(']', ')')
    # break the string into sections
    string_cut = string.split('),')
    if len(string_cut) < 3:
        raise ValueError(f"expected three tag sections, got {len(string_cut)}: {tag_string!r}")
    # first the description: drop the opening parentheses around it
    description = string_cut[0].strip('(')
    # second the band: first letter of the middle section
    band_match = re.search(r'[a-z]', string_cut[1])
    # finally the take rate: first decimal number of the last section
    rate_match = re.search(r'[0-9]+\.[0-9]+', string_cut[2])
    if band_match is None or rate_match is None:
        raise ValueError(f"could not find revenue band / take rate in tag: {tag_string!r}")
    return [description, band_match.group(), float(rate_match.group())]
################
# Apply tag_extract to every merchant's raw tag string, preserving row order.
tags = merchants['tags']
processed_tags = [tag_extract(raw_tag) for raw_tag in tags]

In [281]:
import pandas as pd
# Turn the parsed tags into three named columns, place them next to the
# original merchant columns (row-aligned on the index), and discard the raw
# `tags` column they were derived from.
parsed_tag_columns = pd.DataFrame(processed_tags, columns=('Description', 'Earnings_Class', 'BNPL_Fee'))
merchant_tbl = pd.concat([merchants, parsed_tag_columns], axis=1).drop(columns='tags')

In [282]:
# Convert the processed pandas merchant table back into a Spark DataFrame.
# Note this rebinds `merchants_tbl`, replacing the frame read from parquet.
merchants_tbl = spark.createDataFrame(merchant_tbl)

In [283]:
# This could be expanded further by breaking the description up into finer categories

# Standardisation of Customers 
The objective of this section is to verify if a customer's details have been recorded correctly

In [284]:
import pandas as pd
import numpy as np
# dataset link (kept for provenance; presumably the upstream source of the
# local postcode_verification.csv — TODO confirm)
link = 'https://www.matthewproctor.com/Content/postcodes/australian_postcodes.csv'
# load data
cust = customer_tbl.toPandas()
postcodes = pd.read_csv("../data/tables/postcode_verification.csv")
# Postcodes are identifiers, not quantities: compare and join them as strings.
postcodes['postcode'] = postcodes['postcode'].astype('str')
keep_columns = ['postcode', 'state', 'sa3name', 'sa4name', 'SA3_NAME_2016', 'electoraterating', 'electorate']
# Take an explicit copy of the column subset: later cells assign into this
# frame, and assigning into a bare slice raises SettingWithCopyWarning.
postcodes = postcodes[keep_columns].copy()

In [285]:
# First impute missing values: within each state, fill the gaps of every
# descriptive column with that state's most frequent value.
def _fill_with_group_mode(values):
    """Return ``values`` with missing entries replaced by its most frequent value.

    ``Series.mode`` returns a Series (one entry per tied mode, labelled 0..k-1).
    Passing that Series straight to ``fillna`` aligns on index labels and
    leaves almost every gap untouched, so the first mode is extracted as a
    scalar instead. A group with no non-null value is returned unchanged.
    """
    modes = values.mode()
    if modes.empty:
        return values
    return values.fillna(modes.iloc[0])


# 'postcode' is the lookup key and 'state' is the grouping key; neither is imputed.
for col in [name for name in postcodes.columns if name not in ('postcode', 'state')]:
    postcodes[col] = postcodes.groupby("state")[col].transform(_fill_with_group_mode)

In [286]:
def _first_mode(values):
    """Return the most frequent non-null entry of ``values``, or NaN if there is none.

    ``Series.mode`` returns every tied value; taking the first one keeps the
    aggregation scalar-valued even when several values are equally frequent.
    """
    modes = values.mode()
    return modes.iloc[0] if len(modes) > 0 else np.nan


# Collapse the reference table to one row per (state, postcode), keeping the
# most common value of each descriptive column.
postcodes_agg = postcodes.groupby(['state', 'postcode'], as_index=False).agg(
    **{
        col: pd.NamedAgg(col, _first_mode)
        for col in ['sa3name', 'sa4name', 'electoraterating', 'SA3_NAME_2016', 'electorate']
    }
)

In [287]:
# Impute: compute, per state, the most common value of each descriptive
# column. These become the fallback values (suffix `_mode`) for postcodes
# whose own value is still missing.
def _mode_or_nan(values):
    """Return the first (smallest) mode of ``values`` as a scalar, or NaN when all entries are null."""
    modes = values.mode()
    return modes.iloc[0] if len(modes) > 0 else np.nan


imputation = postcodes_agg.groupby('state', as_index=False).agg(
    **{
        f"{col}_mode": pd.NamedAgg(col, _mode_or_nan)
        for col in ['sa3name', 'sa4name', 'electoraterating', 'SA3_NAME_2016', 'electorate']
    }
)

In [288]:
# Attach each state's fallback values (the *_mode columns of `imputation`) to every postcode row of that state.
postcodes_agg = postcodes_agg.merge(imputation, on='state', how='left')

In [289]:
# Wherever a postcode-level value is missing, fall back to the state-level mode.
# The result is assigned back to the column: calling fillna(inplace=True) on a
# column accessor is chained assignment, which pandas deprecates and which
# does not update the parent frame under Copy-on-Write.
for col in ['sa3name', 'sa4name', 'electoraterating', 'SA3_NAME_2016', 'electorate']:
    postcodes_agg[col] = postcodes_agg[col].fillna(postcodes_agg[f"{col}_mode"])

In [290]:
# The state-level fallback columns have served their purpose; remove them.
fallback_columns = ['sa3name_mode', 'sa4name_mode', 'electoraterating_mode', 'SA3_NAME_2016_mode', 'electorate_mode']
postcodes_agg = postcodes_agg.drop(columns=fallback_columns)

# Tax data addition

In [299]:
# Load the per-postcode tax/income table.
tax_data = pd.read_csv("../data/tables/tax_income.csv")
tax_data['Postcode'] = tax_data['Postcode'].astype('str')
# First remove duplicate columns (columns whose values repeat an earlier one).
# A boolean mask is used instead of transposing twice so that the numeric
# columns keep their dtypes rather than all becoming `object`.
tax_data = tax_data.loc[:, ~tax_data.T.duplicated().to_numpy()]
# Every column after the first is a tax attribute (assumes 'Postcode' is the
# first column of the CSV — TODO confirm).
tax_columns = tax_data.columns[1:]
# IMPUTATION TIME
# Match tax rows to postcodes by key. DataFrame.join would align on the row
# index (lsuffix/rsuffix only rename clashing columns), pairing rows purely
# by position.
# NOTE(review): assumes at most one tax row per Postcode; duplicates would
# fan out rows here and in the customer join below.
postcodes_agg = postcodes_agg.merge(tax_data, left_on='postcode', right_on='Postcode', how='left')
for col in tax_columns:
    # Fill gaps with the mean of progressively different regional groupings.
    for agg_name in ['sa4name', 'electorate', 'electoraterating']:
        group_mean = postcodes_agg.groupby(agg_name)[col].transform('mean')
        postcodes_agg[col] = postcodes_agg[col].fillna(group_mean)
    # Last resort: the overall mean, so that no NaN reaches the integer cast.
    postcodes_agg[col] = postcodes_agg[col].fillna(postcodes_agg[col].mean())
    postcodes_agg[col] = np.ceil(postcodes_agg[col]).astype('int')
# Join customers to the enriched postcode table on (state, postcode).
# Both frames are re-indexed in place, as before.
postcodes_agg.set_index(['state', 'postcode'], inplace = True)
cust.set_index(['state', 'postcode'], inplace= True)
customer_tbl = cust.join(postcodes_agg, how='left')
customer_tbl = customer_tbl.reset_index()
# 'Postcode' (from the tax table) duplicates the 'postcode' key.
customer_tbl.drop(columns='Postcode', inplace = True)
# Build an explicit Spark schema: tax attributes are nullable integers and
# every other column is a nullable string.
from pyspark.sql.types import *
from traitlets import Integer  # NOTE(review): not used in this cell; looks like a stray auto-import
structure = [
    StructField(att_name, IntegerType() if att_name in tax_columns else StringType(), True)
    for att_name in customer_tbl.columns
]
schema = StructType(structure)
# Convert the pandas customer table back into a Spark DataFrame with that schema.
customer_tbl = spark.createDataFrame(customer_tbl, schema)

In [301]:
# Join on consumer_id (inner join, Spark's default) to add the columns of consumer_details;
# presumably this supplies the user_id used by the transaction join in the next cell — TODO confirm.
customer_tbl = customer_tbl.join(consumer_details, ['consumer_id'])

In [302]:
# Rename the merchants' `name` column before joining so it is unambiguous in the combined table.
merchants_tbl = merchants_tbl.withColumnRenamed('name','company_name')
# Enrich every transaction with its customer (via user_id) and its merchant (via merchant_abn).
full_dataset = (
    transactions_sample
    .join(customer_tbl, ['user_id'])
    .join(merchants_tbl, ['merchant_abn'])
)

In [303]:
# Derive calendar features from the order timestamp, plus the BNPL revenue of
# each transaction (the week attribute is added by the SQL projection later).
import pyspark.sql.functions as F
full_dataset = (
    full_dataset
    .withColumn('Day', F.dayofweek('order_datetime'))    # Spark numbering: 1 = Sunday ... 7 = Saturday
    .withColumn('Month', F.month('order_datetime'))
    # BNPL_Fee is treated as a percentage, hence the 0.01 factor.
    .withColumn('BNPL_Revenue', F.col('dollar_value') * 0.01 * F.col('BNPL_Fee'))
)

In [None]:
# Register the joined data as the temp view `data` so it can be queried with Spark SQL.
full_dataset.createOrReplaceTempView('data')
# Keep only the attributes listed in the query; name, location and customer ID are dropped
# for now as unnecessary attributes (company_name could also be removed).
# The query also derives `weekofyear` from order_datetime.
full_dataset = spark.sql("""
select merchant_abn, user_id, dollar_value, order_id, order_datetime, state, postcode, gender, company_name, 
        Description, Earnings_Class, BNPL_Fee, BNPL_Revenue, Day, Month, weekofyear(order_datetime) as weekofyear from data
""")

In [None]:
# now we can convert back to customer data and continue

# Aggregation statistics/features for merchants