# The following code is for external data processing

In [1]:
import re
import numpy as np
import pandas as pd
import re
import geopandas as gpd

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = (
    SparkSession.builder.appName("MAST30034 Project 2 Preprocessing")
    .config("spark.driver.memory", '4g')
    .config("spark.executor.memory", '8g')
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.sql.parquet.enableVectorizedReader","false")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.parquet.writeLegacyFormat", 'true')
    .getOrCreate()
)

22/09/19 10:41:53 WARN Utils: Your hostname, DESKTOP-3NQ3PQI resolves to a loopback address: 127.0.1.1; using 172.31.183.205 instead (on interface eth0)
22/09/19 10:41:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/09/19 10:41:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Mapping Postcode to ABS Postal Areas

In [24]:
# Read in data
postal_areas_gdf = gpd.read_file('../data/raw/postcodes/abs_postal_areas.zip')
consumer_details_df = pd.read_csv('../data/tables/tbl_consumer.csv', delimiter="|")
postcode_df = pd.read_csv('../data/raw/postcodes/postcodes.csv').drop_duplicates('postcode')

# Convert postcode dataframe to geodataframe
postcode_gdf = gpd.GeoDataFrame(
    postcode_df, geometry=gpd.points_from_xy(postcode_df['long'], postcode_df['lat'])
)
postcode_gdf.crs = postal_areas_gdf.crs

# Get list of postcodes not listed as abs postal areas and filter geodataframe to just these postcodes
unmapped = consumer_details_df[~consumer_details_df['postcode'].astype(str).str.zfill(4).isin(postal_areas_gdf['POA_CODE21'])]['postcode'].unique()
postcodes_gdf = postcode_gdf[postcode_gdf['postcode'].isin(unmapped)]

# Spatially join unmapped postcodes and abs postal areas
postcode_poa_gdf = postcodes_gdf.sjoin(postal_areas_gdf, how = 'inner')
postcode_poa_df = postcode_poa_gdf[['postcode', 'POA_CODE21']]
postcode_poa_df = postcode_poa_df.rename(columns = {'POA_CODE21' : 'poa'}).reset_index()
postcode_poa_df = pd.concat([postcode_poa_df,pd.DataFrame(data = {'postcode' : postal_areas_gdf['POA_CODE21'], 'poa' : postal_areas_gdf['POA_CODE21']})], ignore_index = True)

In [25]:
postcode_poa_df

Unnamed: 0,index,postcode,poa
0,0.0,200,2601
1,4308.0,2608,2601
2,4315.0,2610,2601
3,4.0,801,0800
4,5.0,804,0820
...,...,...,...
3164,,7469,7469
3165,,7470,7470
3166,,9494,9494
3167,,9797,9797


2 postcodes could not be mapped. Niether of these could be found in the Australia post website. https://postcodes-australia.com/postcodes/6958 says 6958 is a Western Australian postcode reserved for non standard use 

# Median Age

In [None]:
transactions_sdf = spark.read.parquet(
    '../data/tables/transactions_20210228_20210827_snapshot/'
).union(
    spark.read.parquet(
        '../data/tables/transactions_20210828_20220227_snapshot/'
    )
).union(
    spark.read.parquet(
        '../data/tables/transactions_20220228_20220828_snapshot/'
    )
)

ids_sdf = spark.read.parquet(
    '../data/tables/consumer_user_details.parquet'
)

consumers_sdf = spark.read.options(
    header = True, delimiter = '|'
).csv(
    '../data/tables/tbl_consumer.csv'
)

age_sdf = spark.read.options(
    header = True
).csv(
    '../data/curated/census/age_data.csv'
)

Processing on sample data

In [None]:
transactions_sdf.sample(0.01).write.parquet('../data/raw/samples/transaction_sample.parquet')

In [None]:
transactions_sdf = spark.read.parquet('../data/raw/samples/transaction_sample.parquet')

In [None]:

# Creates dataframe grouped by merchant and postcode with propn of customers for each corresponding postcode
merchants_sdf = transactions_sdf.join(
    ids_sdf,
    on = 'user_id'
).join(
    consumers_sdf,
    on = 'consumer_id'
).groupBy(
    'merchant_abn', 'postcode'
).count().join(
    transactions_sdf.groupby(
        'merchant_abn'
    ).count().withColumnRenamed(
        'count',
        'size'
    ),
    on = 'merchant_abn'
).withColumn(
    'propn',
    F.col('count')/F.col('size')
).drop(
    'count',
    'size'
)

# Joins merchant and postcode data with abs data for population by age
merchants_sdf = merchants_sdf.join(
    age_sdf,
    on = 'postcode'
)

# Creates scaled version of each population metric by age
for col in age_sdf.columns:
    if col == 'postcode':
        continue
    merchants_sdf = merchants_sdf.withColumn(
        col+'_scaled',
        F.col(col)*F.col('propn')
    )

# Removes non scaled columns (used to make the scaled columns) and calculates weighted sum of each population metric by propn of customers from that postcode
merchants_sdf = merchants_sdf.select(
    merchants_sdf.colRegex("`merchant_abn|.*_scaled`")
).groupBy(
    'merchant_abn'
).sum()

merchants_df = merchants_sdf.toPandas()

# Renames columns and sets index
merchants_df = merchants_df.drop(
    'sum(merchant_abn)',
    axis = 1
).set_index(
    'merchant_abn'
).rename(
    columns = {col : col[4:-1] for col in merchants_df.columns}
)

In [None]:
# Calculates median of each row in dataframe where each row corresponds to a count of the given column value
def get_median_col(df):
    median_count = df.sum(axis = 1)/2

    return df.cumsum(axis = 1).apply(
        lambda col : (col > median_count)
    ).idxmax(
        axis = 1
    )

# Executes get_medial_col function for males females and persons    
for person_type in ['m', 'f', 'p']:
    merchants_df[f'median_age_{person_type}'] = get_median_col(
        merchants_df.filter(
            regex = f'age_yr_(\d+|(80_84)|(85_89)|(90_94)|(95_99)|(100_yr_over))_{person_type}_scaled',
            axis = 1
        )
    ).apply(
        lambda x : re.findall('\d+', x)[0]
    )


In [None]:
merchants_df

In [None]:
external_sdf = spark.read.option("header", "true").csv("../data/raw/external/income.csv")
external_sdf= external_sdf.withColumnRenamed(
    "INCP Total Personal Income (weekly)",
    "Income"
)

In [None]:
external_sdf

In [None]:
income_factors = list(set(external_sdf.select(F.collect_list("Income")).first()[0]))

In [None]:
income_factors

As we can see, there seems to be a row per location regarding the total amount of 'Count'. We wish to extract this information and create a separate dataset for easier access to these numbers

In [None]:
location_total = external_sdf.filter(F.col("Income") == "Total")

In [None]:
location_total

In [None]:
external_sdf = external_sdf.where(F.col("Income") != "Total")

# Use regular expression to find amount range

In [None]:
temp_df = external_sdf.select('Income').toPandas()

In [None]:
from readline import append_history_file


output_col = []
pattern = "\((\$\d*,?\d+-\$\d*,*\d*)|(\$\d*,?\d* or more)\)"

for income in temp_df["Income"]:
    matched = re.findall(pattern, income)
    if len(matched) > 0:
        output_col.append(matched[0][0])
    else:
        output_col.append(income)

In [None]:
temp_df['Income Parsed'] = output_col

In [None]:
output_col[14]

In [None]:
test = spark.createDataFrame(temp_df)
test

Ben's preprocessing

In [None]:
income_df = pd.read_csv('../data/raw/external/income.csv')
income_df = income_df.drop(index=range(len(income_df) - 4, len(income_df)))

In [None]:
income_df = income_df.rename(columns = {"SA2 (UR)" : "Region", "INCP Total Personal Income (weekly)" : "Income"})

In [None]:
def convert_income(row):
    if row['Income'] in ['Negative income', 'Nil income', 'Total', 'Not stated', 'Not applicable']:
        row['weekly_income'] = row['Income']
        row['yearly_income'] = row['Income']
    else:
        matches = re.findall('(.*)\s\((.*)\)', row['Income'])
        row['weekly_income'] = matches[0][0]
        row['yearly_income'] = matches[0][1]
    return row

In [None]:
income_df = income_df.apply(convert_income, axis = 1)

    

In [None]:
income_df