## Buy Now, Pay Later Project
### MAST30034: Applied Data Science 
#### Notebook 2: Preprocessing Data 2: Data Aggregation

In [2]:
# create modeling spark session
from pyspark.sql import SparkSession
import pandas as pd

# Build (or reuse) the notebook's Spark session, applying each setting from the
# table below to the builder in turn.
spark_builder = SparkSession.builder.appName('Project 2 test')
for option_name, option_value in (
    # eager evaluation renders a DataFrame when it is a cell's last expression
    ('spark.sql.repl.eagerEval.enabled', True),
    ('spark.sql.parquet.cacheMetadata', 'true'),
    ("spark.sql.session.timeZone", "Etc/UTC"),
    ("spark.executor.memory", "8g"),
    ("spark.driver.memory", "8g"),
):
    spark_builder = spark_builder.config(option_name, option_value)
spark = spark_builder.getOrCreate()

In [3]:
# Read the curated parquet dataset from disk, then preview it.
full = (
    spark.read
    .parquet('../data/curated/clean_full_dataset/')
)
full

postcode,user_id,merchant_abn,dollar_value,order_datetime,name,business_area,revenue_level,take_rate,consumer_id,state,gender,Median_age_persons,Median_mortgage_repay_monthly,Median_tot_prsnl_inc_weekly,Median_rent_weekly,Median_tot_fam_inc_weekly,Average_num_psns_per_bedroom,Median_tot_hhd_inc_weekly,Average_household_size,business_area_type
3332,14,68004106739,35.066583865041444,2021-08-08,Nec Ante Ltd,"cable, satellite,...",a,5.61,1343547,VIC,Male,38,1733,797,350,2096,0.8,1955,2.9,Retail_trade
3432,4943,67609108741,87.09715299772967,2022-03-05,Metus Sit Amet In...,"cable, satellite,...",e,0.38,533495,VIC,Male,51,2000,791,550,2374,0.8,1792,2.7,Retail_trade
3332,14,70052129860,10.624562993647425,2021-07-25,Donec Tempus Lore...,"cable, satellite,...",b,3.37,1343547,VIC,Male,38,1733,797,350,2096,0.8,1955,2.9,Retail_trade
3432,4943,19237425345,36.91374841278192,2022-03-08,A Scelerisque Ass...,"cable, satellite,...",c,2.04,533495,VIC,Male,51,2000,791,550,2374,0.8,1792,2.7,Retail_trade
3332,14,79645157255,21.83858073111989,2021-04-16,Consectetuer Maur...,"cable, satellite,...",a,6.46,1343547,VIC,Male,38,1733,797,350,2096,0.8,1955,2.9,Retail_trade
3432,4943,67979471799,9.354001353615072,2022-03-16,Laoreet Ipsum Corp.,"cable, satellite,...",d,1.04,533495,VIC,Male,51,2000,791,550,2374,0.8,1792,2.7,Retail_trade
3332,14,21439773999,57.50723650600155,2021-06-01,Mauris Non Institute,"cable, satellite,...",a,6.1,1343547,VIC,Male,38,1733,797,350,2096,0.8,1955,2.9,Retail_trade
3432,4943,68501926042,72.64904714060505,2022-03-23,Dolor Corp.,"cable, satellite,...",a,5.78,533495,VIC,Male,51,2000,791,550,2374,0.8,1792,2.7,Retail_trade
3332,14,66370248931,24.001048123394057,2021-05-04,Morbi Non PC,"cable, satellite,...",b,3.15,1343547,VIC,Male,38,1733,797,350,2096,0.8,1955,2.9,Retail_trade
2082,4949,44454840859,34.990386491563186,2021-08-27,Erat Eget Ipsum PC,"cable, satellite,...",a,5.62,801928,NSW,Undisclosed,42,2500,977,580,2804,0.8,2449,2.9,Retail_trade


### Calculating the Gender Percentage for Consumers

Note that "male_percentage"/"female_percentage"/"undisclosed_percentage" here refer to the number of transactions made by male/female/undisclosed consumers at a merchant, divided by that merchant's total number of transactions. A consumer who made several transactions is counted once per transaction, so these values are shares of transactions rather than shares of distinct consumers.

In [None]:
# Number of rows in `full` (i.e. transactions, not distinct consumers) for every
# (merchant, gender) pair, ordered by merchant.
gender_count_sdf = (
    full
    .groupBy("merchant_abn", "gender")
    .count()
    .sort("merchant_abn")
)
gender_count_sdf

In [None]:
# Total number of transactions per merchant: add up the per-gender counts.
# The resulting column is named "sum(count)".
total_count = (
    gender_count_sdf
    .groupBy('merchant_abn')
    .agg({'count': 'sum'})
    .sort("merchant_abn")
)
total_count

In [None]:
# Attach each merchant's overall transaction total to its per-gender rows.
#
# This cell reassigns `gender_count_sdf` from itself, so it must be safe to
# re-run: any "total_transactions_count" column left by a previous run is
# dropped first (drop() is a no-op when the column does not exist). Without
# that, a second run would join `total_count` again and leave two columns with
# the same name, making later references to it ambiguous.
gender_count_sdf = (
    gender_count_sdf
    .drop("total_transactions_count")
    .join(
        total_count.withColumnRenamed("sum(count)", "total_transactions_count"),
        on='merchant_abn',
    )
    .sort("merchant_abn")
)
gender_count_sdf


In [None]:
# Share of each merchant's transactions that belongs to each gender, stored in
# "gender_percentage" as a fraction between 0 and 1 (count / merchant total),
# not multiplied by 100.
from pyspark.sql import functions as F
gender_count_sdf = gender_count_sdf.withColumn(
    "gender_percentage",
    gender_count_sdf["count"] / gender_count_sdf["total_transactions_count"],
)
gender_count_sdf

In [None]:
# Split the long-format shares into one DataFrame per gender value. Each result
# holds merchant_abn plus that gender's share under its own column name.
male_percentage = (
    gender_count_sdf
    .where(F.col("gender") == "Male")
    .select("merchant_abn", F.col("gender_percentage").alias("male_consumer_percentage"))
)
female_percentage = (
    gender_count_sdf
    .where(F.col("gender") == "Female")
    .select("merchant_abn", F.col("gender_percentage").alias("female_consumer_percentage"))
)
undisclosed_percentage = (
    gender_count_sdf
    .where(F.col("gender") == "Undisclosed")
    .select("merchant_abn", F.col("gender_percentage").alias("undisclosed_consumer_percentage"))
)

In [None]:
# Preview one of the per-gender DataFrames as a sanity check; it should hold
# the columns merchant_abn and male_consumer_percentage.
male_percentage

In [None]:
# Combine the three per-gender shares and the total transaction count into one
# row per merchant.
#
# The joins start from `total_count`, which has a row for every merchant, and
# use how="left". The default inner join would silently drop any merchant that
# has no transactions for at least one of Male / Female / Undisclosed, because
# that merchant is then absent from the corresponding per-gender DataFrame.
gender_share_columns = [
    "male_consumer_percentage",
    "female_consumer_percentage",
    "undisclosed_consumer_percentage",
]
agg_df = (
    total_count
    .join(male_percentage, on="merchant_abn", how="left")
    .join(female_percentage, on="merchant_abn", how="left")
    .join(undisclosed_percentage, on="merchant_abn", how="left")
    # a gender with no transactions at a merchant has a share of 0, not null
    .fillna(0.0, subset=gender_share_columns)
    # rename the total transaction count column
    .withColumnRenamed("sum(count)", "total_transactions_count")
    # keep the column order the inner-join version produced
    .select("merchant_abn", *gender_share_columns, "total_transactions_count")
)
agg_df

In [None]:
# Per-merchant means of three demographic columns, previewed for 10 merchants.
# NOTE(review): the means are taken over the rows of `full`, so a consumer with
# several rows at a merchant contributes several times; the rows presumably
# correspond to individual transactions (confirm).
(
    full
    .groupBy("merchant_abn")
    .agg(
        F.mean("Median_age_persons").alias("ave_age"),
        F.mean("Median_tot_prsnl_inc_weekly").alias("ave_income"),
        F.mean("Median_rent_weekly").alias("ave_rent"),
    )
    .limit(10)
)