In [27]:
from pyspark.sql import functions as F
from pyspark.sql.functions import *
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

In [28]:
from pyspark.sql import SparkSession

# Create a spark session
spark = (
    SparkSession.builder.appName("MAST30034 Project 2")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .getOrCreate()
)

# 1.Read the data in and check the presence of null value

In [29]:
# function used to check the presence of null values.
def check_null_value(dataframe):
    for columns in dataframe.columns:
        print('Number of null value in', columns, ':', dataframe.where(F.col(columns).isNull()).count())
    return

In [30]:
consumer_id = spark.read.parquet('../data/tables/consumer_user_details.parquet')
check_null_value(consumer_id)

Number of null value in user_id : 0
Number of null value in consumer_id : 0


In [31]:
# Use pandas read consumer csv
raw_consumer_df = spark.read.option("header",True).csv('../data/tables/tbl_consumer.csv')
check_null_value(raw_consumer_df)

                                                                                

Number of null value in name : 0
Number of null value in address : 0
Number of null value in state : 0
Number of null value in postcode : 0
Number of null value in gender : 0
Number of null value in consumer_id : 0


In [32]:
merchant_df = spark.read.parquet('../data/tables/tbl_merchants.parquet').withColumnRenamed('name','merchant_name')
check_null_value(merchant_df)

Number of null value in merchant_name : 0
Number of null value in tags : 0
Number of null value in merchant_abn : 0


In [33]:
path_list = []
period1 = pd.date_range('2021-2-28','2021-08-27', freq='D').strftime("%Y-%m-%d").tolist()
period2 = pd.date_range('2021-8-28','2022-02-27', freq='D').strftime("%Y-%m-%d").tolist()
period3 = pd.date_range('2022-2-28','2022-08-28', freq='D').strftime("%Y-%m-%d").tolist()
path_prefix1 = '../data/tables/transactions_20210228_20210827_snapshot/order_datetime='
path_prefix2 = '../data/tables/transactions_20210828_20220227_snapshot/order_datetime='
path_prefix3 = '../data/tables/transactions_20220228_20220828_snapshot/order_datetime='
for date in period1:
    path_list.append(path_prefix1 + date)
for date in period2:
    path_list.append(path_prefix2 + date)
for date in period3:
    path_list.append(path_prefix3 + date)

raw_transactions_df = spark.read.parquet(*path_list)

check_null_value(raw_transactions_df)

                                                                                

Number of null value in user_id : 0


                                                                                

Number of null value in merchant_abn : 0


                                                                                

Number of null value in dollar_value : 0




Number of null value in order_id : 0


                                                                                

#### No null value is found in the raw datasets.

# 2. Data aggregation

First, we aggregate the given consumer, merchant and transactions data, drop the irrelevant features for the ranking system such as consumers' id and name, transactions' order id and merchants' abn.

In [34]:
consumer_df =  consumer_id.join(raw_consumer_df, consumer_id.consumer_id == raw_consumer_df.consumer_id, 'inner')\
                            .withColumnRenamed('name','consumer_name')\
                            .withColumnRenamed('address','consumer_address')

Get the take rate of each merchant.

In [35]:
merchant_df = merchant_df.withColumn('tags_string', split(merchant_df['tags'], ':'))\
                         .withColumn('rate_string', element_at(col('tags_string'), -1))\
                         .withColumn('rate_substring', substring('rate_string', 1, 5))\
                         .withColumn('take_rate', col('rate_substring').cast('double'))\
                         .drop('rate_string', 'rate_substring', 'tags_string')

In [36]:
transactions_df = raw_transactions_df.join(merchant_df, raw_transactions_df.merchant_abn == merchant_df.merchant_abn, 'inner')\
                           .join(consumer_df, raw_transactions_df.user_id == consumer_df.user_id, 'inner')\
                           .drop('consumer_id', 'consumer_name')

In [37]:
raw_transactions_df.count(), transactions_df.count()

                                                                                

(12561377, 12047317)

Since some merchant abns from the transaction data do not match with any merchant from the merchant, we should remove them by inner join with the merchant dataset. About  5% of data are eliminated.

In [38]:
transactions_df.limit(10).select('tags')

                                                                                

tags
([florists suppli...
"[[gift, Card, nov..."
"([gifT, card, nov..."
([tent and awning...
[(digital goods: ...
"[(shoe shops), (a..."
[(computer progra...
[(digital goods: ...
"[[stationery, off..."
"[(optIcians, opti..."


We compute the total income, the number of transactions, the mean transaction amount and the profit for each merchant for the preliminary analysis.

In [39]:
merchant_detail = transactions_df.groupBy('user_id', 'merchant_abn', 'merchant_name', 'tags', 'take_rate')\
                  .agg(
                       F.round(F.sum('dollar_value'),2).alias('total_income'),
                       F.count('merchant_name').alias('total_transactions'), )\
                  .withColumn('mean_transaction_amount',  F.round(F.col('total_income')/F.col('total_transactions')))\
                  .withColumn('monthly_profit',  F.round(F.col('total_income')*F.col('take_rate')/100/18))\
                  .orderBy('monthly_profit', ascending = False)

AnalysisException: Reference 'user_id' is ambiguous, could be: user_id, user_id.

In [None]:
merchant_detail.toPandas()

In [None]:
merchant_detail.count(), merchant_df.count()

In [None]:
# Save the preprocessed data
transactions_df.write.mode('overwrite').parquet('../data/curated/transactions_detail.parquet')
merchant_detail.write.mode('overwrite').parquet('../data/curated/merchant_detail.parquet')

# 3. Preliminarily check distribution of data

In [None]:
bins, counts = merchant_detail.select('monthly_profit').rdd.flatMap(lambda x: x).histogram(200)
plt.hist(bins[:-1], bins=bins, weights=counts)
plt.ylabel('Frequency')
plt.xlabel('monthly_profit (dollar)')
plt.title('Histogram graph of monthly profit')
plt.savefig('../plots/Histogram graph of monthly profit')
plt.show()

In [None]:
bins, counts = merchant_detail.select('total_transactions').rdd.flatMap(lambda x: x).histogram(200)
plt.hist(bins[:-1], bins=bins, weights=counts)
plt.ylabel('Frequency')
plt.xlabel('transaction frequency')
plt.title('Histogram graph of total transactions')
plt.savefig('../plots/Histogram graph of total transactions')
plt.show()

In [None]:
bins, counts = merchant_detail.select('mean_transaction_amount').rdd.flatMap(lambda x: x).histogram(200)
plt.hist(bins[:-1], bins=bins, weights=counts)
plt.ylabel('Frequency')
plt.xlabel('mean transaction (dollar)')
plt.title('Histogram graph of mean transaction amount')
plt.savefig('../plots/Histogram graph of mean transaction amount')
plt.show()

There are some extremely large values, outliers elimination and data transformation may be needed.