# Add External Data to Transaction Data 

In [1]:
import pandas as pd
import numpy as np
import geopandas as gpd
import os
import re
import glob
import math
import matplotlib.pyplot as plt
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType, DoubleType
from datetime import datetime, timedelta

In [2]:
spark = (
    SparkSession.builder.appName('Specific Analysis')
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config('spark.driver.memory', '4g')
    .config('spark.executor.memory', '2g')
    .getOrCreate()
)

your 131072x1 screen size is bogus. expect trouble
24/10/13 22:13:52 WARN Utils: Your hostname, DESKTOP-F216TKE resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/10/13 22:13:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/13 22:13:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
consumer_details = spark.read.parquet(".././data/curated/consumer_details")
transactions = spark.read.parquet(".././data/curated/total_transactions")
merchant_fraud = spark.read.parquet(".././data/curated/merchant_fraud")
tbl_consumer = spark.read.parquet(".././data/curated/tbl_consumer")
tbl_merchant = spark.read.parquet(".././data/curated/tbl_merchants")

                                                                                

In [4]:
tbl_merchant.show()

+--------------------+------------+--------------------+----+---------+
|       merchant_name|merchant_abn|                tags|type|take_rate|
+--------------------+------------+--------------------+----+---------+
|       Felis Limited| 10023283211|furniture, home f...|   e|     0.18|
|Arcu Ac Orci Corp...| 10142254217|cable, satellite,...|   b|     4.22|
|    Nunc Sed Company| 10165489824|jewelry, watch, c...|   b|      4.4|
|Ultricies Digniss...| 10187291046|watch, clock, jew...|   b|     3.29|
| Enim Condimentum PC| 10192359162|music, musical in...|   a|     6.33|
|       Fusce Company| 10206519221|gift, card, novel...|   a|     6.34|
|Aliquam Enim Inco...| 10255988167|computers, comput...|   b|     4.32|
|    Ipsum Primis Ltd| 10264435225|watch, clock, jew...|   c|     2.39|
|Pede Ultrices Ind...| 10279061213|computer programm...|   a|     5.71|
|           Nunc Inc.| 10323485998|furniture, home f...|   a|     6.61|
|Facilisis Facilis...| 10342410215|computers, comput...|   a|   

In [5]:
consumer_fraud = spark.read.parquet(".././data/curated/consumer_fraud")

In [6]:
consumer_fraud.show()

+-------+--------------+-----------------+
|user_id|order_datetime|fraud_probability|
+-------+--------------+-----------------+
|   6228|    2021-12-19| 97.6298077657765|
|  21419|    2021-12-10|99.24738020302328|
|   5606|    2021-10-17|84.05825045251777|
|   3101|    2021-04-17|91.42192091901347|
|  22239|    2021-10-19|94.70342477508035|
|  16556|    2022-02-20|89.65663294494827|
|  10278|    2021-09-28|83.59136689427714|
|  15790|    2021-12-30|71.77065889280253|
|   5233|    2021-08-29|85.87123303878818|
|    230|    2021-08-28|86.28328808934151|
|  13601|    2021-12-26|83.13696487489679|
|   6383|    2021-09-15| 66.2676451623754|
|   3513|    2022-02-27|75.16981192247916|
|  18658|    2021-10-19|82.98609082999361|
|   5965|    2021-11-14|69.37164467869053|
|  18714|    2021-11-14|83.78813794627237|
|  22957|    2022-02-12|82.79065699075498|
|  20118|    2021-09-05|80.34030486265003|
|   6436|    2021-12-24|84.81618344606828|
|  17900|    2022-02-25|92.73262811161372|
+-------+--

In [7]:
merchant_fraud = merchant_fraud.withColumnRenamed('fraud_probability', 'merchant_fraud_probability')

In [8]:
tbl_merchant.printSchema()

root
 |-- merchant_name: string (nullable = true)
 |-- merchant_abn: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- type: string (nullable = true)
 |-- take_rate: double (nullable = true)



In [9]:
merchant_fraud.printSchema()

root
 |-- merchant_abn: string (nullable = true)
 |-- order_datetime: date (nullable = true)
 |-- merchant_fraud_probability: string (nullable = true)



In [10]:
merchant_fraud = merchant_fraud.withColumn('merchant_fraud_probability', F.col('merchant_fraud_probability').cast(DoubleType()))

In [11]:
consumer_fraud.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- order_datetime: date (nullable = true)
 |-- fraud_probability: double (nullable = true)



In [12]:
tbl_consumer.printSchema()

root
 |-- consumer_name: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postcode: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- consumer_id: string (nullable = true)



In [13]:
transactions.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- merchant_abn: string (nullable = true)
 |-- dollar_value: double (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_datetime: date (nullable = true)



In [14]:
full_transactions = transactions.join(consumer_details, on="user_id",how="left") \
                   .join(tbl_consumer, on="consumer_id",how="left") \
                   .join(consumer_fraud, on=["user_id", "order_datetime"],how="left") \
                   .join(tbl_merchant, on="merchant_abn",how="left") \
                   .join(merchant_fraud, on=["order_datetime", "merchant_abn"], how="left")

In [15]:
full_transactions.show()

+--------------+------------+-------+-----------+------------------+--------------------+------------------+-----+--------+------+-----------------+--------------------+--------------------+----+---------+--------------------------+
|order_datetime|merchant_abn|user_id|consumer_id|      dollar_value|            order_id|     consumer_name|state|postcode|gender|fraud_probability|       merchant_name|                tags|type|take_rate|merchant_fraud_probability|
+--------------+------------+-------+-----------+------------------+--------------------+------------------+-----+--------+------+-----------------+--------------------+--------------------+----+---------+--------------------------+
|    2022-08-25| 63290521567|  11138|     401483|41.239626303220014|09f6132a-a6a8-47a...|   Veronica Nelson|   WA|    6519|Female|             NULL|Vehicula Pellente...|artist supply, craft|   a|     6.48|                      NULL|
|    2022-08-25| 63465140133|  22285|     416278| 8.815286645396842|

In [16]:
#full_transactions.write.mode('overwrite').parquet('.././data/curated/full_transactions')

Add ShapeFile Information

In [17]:
postcodes = gpd.read_file(".././data/landing/POA_2021_AUST_GDA2020_SHP")
postcodes = postcodes.to_crs(epsg='4326')

postcodes['lon'] = postcodes.geometry.centroid.x
postcodes['lat'] = postcodes.geometry.centroid.y
postcodes = postcodes.drop(columns='geometry')
postcodes = postcodes.loc[:, ['POA_CODE21', 'AREASQKM21', 'SHAPE_Leng', 'SHAPE_Area', 'lon', 'lat']]
postcodes = postcodes.rename(columns={'POA_CODE21': 'postcode'})

postcodes = spark.createDataFrame(postcodes)

full_transactions = full_transactions.join(postcodes, on='postcode')
full_transactions.show(truncate=False)


  postcodes['lon'] = postcodes.geometry.centroid.x

  postcodes['lat'] = postcodes.geometry.centroid.y

+--------+--------------+------------+-------+-----------+------------------+------------------------------------+-----------------+-----+-----------+-----------------+------------------------------+----------------------------------------------------------------+----+---------+--------------------------+----------+--------------+----------------+------------------+-------------------+
|postcode|order_datetime|merchant_abn|user_id|consumer_id|dollar_value      |order_id                            |consumer_name    |state|gender     |fraud_probability|merchant_name                 |tags                                                            |type|take_rate|merchant_fraud_probability|AREASQKM21|SHAPE_Leng    |SHAPE_Area      |lon               |lat                |
+--------+--------------+------------+-------+-----------+------------------+------------------------------------+-----------------+-----+-----------+-----------------+------------------------------+-----------------------

                                                                                

Add Census Data

In [18]:
# Combine the census data into a single dataframe, by postcode
# https://www.abs.gov.au/census/find-census-data/datapacks?release=2021&product=GCP&geography=SA2&header=S
TABLES = [
    '2021Census_G02_AUST_POA.csv',
    '2021Census_G04A_AUST_POA.csv',
    '2021Census_G04B_AUST_POA.csv'
]
POSTCODE_COLUMN = 'POA_CODE_2021'
SELECTED_COLUMNS = [
    POSTCODE_COLUMN,
    'Median_age_persons', 
    'Median_tot_fam_inc_weekly', 
    'Median_tot_hhd_inc_weekly', 
    'Average_household_size', 
    'Tot_M', 
    'Tot_P', 
    'Tot_F'
]

census_df = None
for table in TABLES:
    df = spark.read.options(header=True) \
        .csv(f".././data/landing/2021_GCP_POA_for_AUS_short-header/2021 Census GCP Postal Areas for AUS/{table}")

    if census_df is None:
        census_df = df
    else:
        census_df = census_df.join(df, POSTCODE_COLUMN).drop(df[POSTCODE_COLUMN])

census_df = census_df.withColumn(POSTCODE_COLUMN, F.regexp_replace(F.col(POSTCODE_COLUMN), "(\D)", ""))
census_df = census_df.select(*SELECTED_COLUMNS)
for column in SELECTED_COLUMNS[1:]:
    census_df = census_df.withColumn(column, F.col(column).cast(DoubleType()))
census_df = census_df.withColumnRenamed("POA_CODE_2021", "postcode")
census_df.show(truncate=False)

full_transactions = full_transactions.join(census_df, on='postcode', how="left")
full_transactions.show(truncate=False)

+--------+------------------+-------------------------+-------------------------+----------------------+-------+-------+-------+
|postcode|Median_age_persons|Median_tot_fam_inc_weekly|Median_tot_hhd_inc_weekly|Average_household_size|Tot_M  |Tot_P  |Tot_F  |
+--------+------------------+-------------------------+-------------------------+----------------------+-------+-------+-------+
|2000    |32.0              |2367.0                   |2225.0                   |2.1                   |14223.0|27936.0|13713.0|
|2007    |30.0              |2197.0                   |1805.0                   |2.1                   |3763.0 |7410.0 |3644.0 |
|2008    |28.0              |2453.0                   |1746.0                   |1.9                   |5315.0 |10400.0|5083.0 |
|2009    |37.0              |3035.0                   |2422.0                   |2.1                   |6391.0 |12658.0|6267.0 |
|2010    |36.0              |3709.0                   |2297.0                   |1.7             

24/10/13 22:14:42 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------+--------------+------------+-------+-----------+------------------+------------------------------------+------------------+-----+------+-----------------+---------------------------------+-----------------------------------------------------+----+---------+--------------------------+----------+--------------+----------------+------------------+-------------------+------------------+-------------------------+-------------------------+----------------------+-------+-------+-------+
|postcode|order_datetime|merchant_abn|user_id|consumer_id|dollar_value      |order_id                            |consumer_name     |state|gender|fraud_probability|merchant_name                    |tags                                                 |type|take_rate|merchant_fraud_probability|AREASQKM21|SHAPE_Leng    |SHAPE_Area      |lon               |lat                |Median_age_persons|Median_tot_fam_inc_weekly|Median_tot_hhd_inc_weekly|Average_household_size|Tot_M  |Tot_P  |Tot_F  |
+--------+

Add Retail Sales Data 

In [19]:
retail_df = pd.read_excel("../data/landing/850103.xlsx", sheet_name="Data1", index_col=0)

states = ["NSW", "VIC", "QLD", "SA", "WA", "TAS", "NT", "ACT", "Total"]
trend_types = ["original", "seasonal", "trend"]
retail_columns = []
for trend_type in trend_types:
    for state in states:
        retail_columns.append(f"{state}_{trend_type}")

# Add date column
retail_df.columns = retail_columns
retail_df = retail_df.iloc[11:,:]
retail_df.index = pd.to_datetime(retail_df.index).strftime("%Y-%m")
retail_df['date'] = retail_df.index
retail_df.to_parquet("../data/raw/retail_sales.parquet", index=False)
retail_df

Unnamed: 0,NSW_original,VIC_original,QLD_original,SA_original,WA_original,TAS_original,NT_original,ACT_original,Total_original,NSW_seasonal,...,NSW_trend,VIC_trend,QLD_trend,SA_trend,WA_trend,TAS_trend,NT_trend,ACT_trend,Total_trend,date
1982-06,1234.2,887,494.1,277.2,301.5,77.5,,55.3,3357.8,1309.8,...,1305.1,946.3,515.7,291.8,317.8,81.9,,57.7,3547,1982-06
1982-07,1265,921.3,515.6,296.1,316.4,82.7,,56.3,3486.8,1291.9,...,1302.4,954.3,517.3,296.3,319.1,82.2,,58.2,3560.6,1982-07
1982-08,1217.6,883.2,501.4,288.4,300.5,78.1,,55.4,3355.9,1314.4,...,1298.1,963.6,518,301.1,320.4,82.4,,58.9,3573.6,1982-08
1982-09,1244.9,917.9,517.7,293,312.3,79.1,,57.5,3454.3,1292.4,...,1293.1,974.1,517.9,306.2,321.6,82.4,,60,3586.7,1982-09
1982-10,1264.2,983.3,504.2,307.9,318.7,78.7,,61.9,3551.5,1289.4,...,1293.2,985.5,518.1,311.3,322.6,82.5,,61.2,3605.8,1982-10
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2024-04,10639.8,8820.1,6907.1,2218.8,3800.2,682.8,311,641.2,34020.9,11180.9,...,11165.7,9231.7,7380.5,2320.2,4058.4,720.4,331.4,674.9,35883.3,2024-04
2024-05,11090.6,9144.7,7206.6,2298.7,4004.9,707.1,335.3,667.6,35455.4,11181.1,...,11189.4,9264.5,7408.5,2322.8,4088.1,720.6,333.4,677.2,36004.6,2024-05
2024-06,10777.4,8937.4,7163.6,2229,3973.2,678.1,346.4,661.2,34766.2,11228.9,...,11214.5,9300.7,7438.9,2324.5,4114.9,721.1,335.3,679.1,36129.1,2024-06
2024-07,10772.9,8957.7,7424.2,2253.4,4040.8,693.2,372.7,654.4,35169.4,11217.1,...,11239.8,9337.7,7469.3,2325.4,4139.1,721.6,337.2,680.7,36250.9,2024-07


In [20]:
retail_df = spark.read.parquet(".././data/raw/retail_sales.parquet")
retail_df = retail_df.withColumn("month", F.month("date"))
retail_df = retail_df.withColumn("year", F.year("date"))
retail_df = retail_df.drop('date')
retail_df = retail_df.select([col for col in retail_df.columns if not col.endswith('trend')])
retail_df.show(truncate=False)

full_transactions = full_transactions.withColumn("month", F.month("order_datetime"))
full_transactions = full_transactions.withColumn("year", F.year("order_datetime"))
full_transactions = full_transactions.join(retail_df, on=['month', 'year'], how='left')
full_transactions.show(truncate=False)

+------------+------------+------------+-----------+-----------+------------+-----------+------------+--------------+------------+------------+------------+-----------+-----------+------------+-----------+------------+--------------+-----+----+
|NSW_original|VIC_original|QLD_original|SA_original|WA_original|TAS_original|NT_original|ACT_original|Total_original|NSW_seasonal|VIC_seasonal|QLD_seasonal|SA_seasonal|WA_seasonal|TAS_seasonal|NT_seasonal|ACT_seasonal|Total_seasonal|month|year|
+------------+------------+------------+-----------+-----------+------------+-----------+------------+--------------+------------+------------+------------+-----------+-----------+------------+-----------+------------+--------------+-----+----+
|1234.2      |887.0       |494.1       |277.2      |301.5      |77.5        |NULL       |55.3        |3357.8        |1309.8      |952.8       |518.6       |288.9      |321.2      |81.0        |NULL       |58.3        |3561.5        |6    |1982|
|1265.0      |921.3 

                                                                                

+-----+----+--------+--------------+------------+-------+-----------+------------------+------------------------------------+------------------+-----+------+-----------------+---------------------------------+-----------------------------------------------------+----+---------+--------------------------+----------+--------------+----------------+------------------+-------------------+------------------+-------------------------+-------------------------+----------------------+-------+-------+-------+------------+------------+------------+-----------+-----------+------------+-----------+------------+--------------+------------+------------+------------+-----------+-----------+------------+-----------+------------+--------------+
|month|year|postcode|order_datetime|merchant_abn|user_id|consumer_id|dollar_value      |order_id                            |consumer_name     |state|gender|fraud_probability|merchant_name                    |tags                                                 

In [21]:
full_transactions.show(truncate=False)

+-----+----+--------+--------------+------------+-------+-----------+------------------+------------------------------------+------------------+-----+------+-----------------+---------------------------------+-----------------------------------------------------+----+---------+--------------------------+----------+--------------+----------------+------------------+-------------------+------------------+-------------------------+-------------------------+----------------------+-------+-------+-------+------------+------------+------------+-----------+-----------+------------+-----------+------------+--------------+------------+------------+------------+-----------+-----------+------------+-----------+------------+--------------+
|month|year|postcode|order_datetime|merchant_abn|user_id|consumer_id|dollar_value      |order_id                            |consumer_name     |state|gender|fraud_probability|merchant_name                    |tags                                                 

In [22]:
full_transactions.write.mode('overwrite').parquet('.././data/curated/transaction_external')

                                                                                