# ETL Processes
***
## List of tasks:
- ### <del>Consolidating Datasets</del>
- ### Normalising/Restructuring Tables
- ### Exploratory Data Analysis
- ### Data Cleaning
- ### Package ETL.py into a Class
***

## Content:
- ### [Consumer Dataset](#Consumer-dataset)
- ### [Transaction Dataset](#Transaction-dataset)
- ### [Merchant Dataset](#Merchant-dataset)
- ### [Data Aggregations](#Aggregation)


In [1]:
import pandas as pd
import numpy as np
import os
import re

# Set working directory.
# The guard is built from the platform's path separator so that it also matches on
# Windows; without a match, re-running this cell would attempt a second relative chdir.
tables_dir = os.sep + os.path.join("data", "tables")
if tables_dir not in os.path.normpath(os.getcwd()):
    os.chdir(os.path.join("..", "data", "tables"))

from pyspark.sql import SparkSession
from pyspark.shell import spark
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
import matplotlib.pyplot as plt

# Session options for this notebook, applied to the builder in the order listed.
# NOTE(review): `pyspark.shell` is imported above and has already started a session
# (see the banner in this cell's output), so getOrCreate() may return that session;
# startup-only settings such as spark.driver.memory might then not apply - confirm.
spark_options = [
    ("spark.sql.repl.eagerEval.enabled", True),
    ("spark.sql.parquet.cacheMetadata", "true"),
    ("spark.sql.session.timeZone", "Etc/UTC"),
    ("spark.driver.memory", "4g"),
    ("spark.sql.broadcastTimeout", -1),
]

session_builder = SparkSession.builder.appName("MAST30034 Project 2")
for option_key, option_value in spark_options:
    session_builder = session_builder.config(option_key, option_value)

spark = session_builder.getOrCreate()

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/

Using Python version 3.7.4 (default, Aug 13 2019 15:17:50)
Spark context Web UI available at http://169.254.224.157:4040
Spark context available as 'sc' (master = local[*], app id = local-1662463139952).
SparkSession available as 'spark'.


# Consumer dataset

In [2]:
# Load the pipe-delimited consumer table; the first row holds the column names
consumer = spark.read.csv('tbl_consumer.csv', sep='|', header=True)
consumer

name,address,state,postcode,gender,consumer_id
Yolanda Williams,413 Haney Gardens...,WA,6935,Female,1195503
Mary Smith,3764 Amber Oval,NSW,2782,Female,179208
Jill Jones MD,40693 Henry Greens,NT,862,Female,1194530
Lindsay Jimenez,00653 Davenport C...,NSW,2780,Female,154128
Rebecca Blanchard,9271 Michael Mano...,WA,6355,Female,712975
Karen Chapman,2706 Stewart Oval...,NSW,2033,Female,407340
Andrea Jones,122 Brandon Cliff,QLD,4606,Female,511685
Stephen Williams,6804 Wright Crest...,WA,6056,Male,448088
Stephanie Reyes,5813 Denise Land ...,NSW,2482,Female,650435
Jillian Gonzales,461 Ryan Common S...,VIC,3220,Female,1058499


In [3]:
# Distinct consumer IDs as a NumPy array. Only the ID column is selected before the
# conversion so the whole table is not collected onto the driver.
consumer_id = consumer.select('consumer_id').toPandas()['consumer_id'].unique()

In [4]:
len(consumer_id)

499999

In [5]:
# Row count alongside the cardinality of the ID and postcode columns
n_consumer_rows = consumer.count()
n_consumer_ids = consumer.select('consumer_id').distinct().count()
n_postcodes = consumer.select('postcode').distinct().count()

print(
    f"Dataset details: \n\tNumber of rows: {n_consumer_rows}",
    f"\n\tNumber of distinct Consumer ID: {n_consumer_ids}",
    f"\n\tNumber of distinct Postcodes: {n_postcodes}",
)

Dataset details: 
	Number of rows: 499999 
	Number of distinct Consumer ID: 499999 
	Number of distinct Postcodes: 3167


Note: 
- The **address field is fake** and derived from USA street names. We have included it to mimic a more realistic dataset, but the streets themselves are non-existent and if there are any matches, it will be a pure coincidence. <font color='red'>**Not sure what sort of information we can extract here if they are all fake**</font> 
- The **postcode field is accurate** and should be **used for aggregated analysis** for joining with other geospatial datasets for demographic information (i.e. ABS datasets). <font color='red'>**Highly relevant for geospatial analysis**</font> 
- There is roughly a **uniform distribution at the state level** (i.e number of consumers per state is the same for all states).

### Checking for missing values in consumer dataset

In [6]:
def missing_values_check(sdf):
    """Count the missing values in each column of a Spark DataFrame.

    What counts as missing depends on the column's type:

    - string: null, NaN, the empty string, or any value containing the
      substring 'None' or 'NULL'
    - float / double: null or NaN
    - every other type (integers, dates, timestamps, booleans, ...): null

    Parameters
    ----------
    sdf : pyspark.sql.DataFrame
        The DataFrame to inspect. It is not modified.

    Returns
    -------
    pyspark.sql.DataFrame
        A single-row DataFrame with the same column names as ``sdf``, where
        each value is the number of missing entries in that column.
    """

    col_summary = []
    for c, dtype in sdf.dtypes:
        if dtype == 'string':
            is_missing = (
                col(c).contains('None')
                | col(c).contains('NULL')
                | (col(c) == '')
                | col(c).isNull()
                | isnan(c)
            )
        elif dtype in ('float', 'double'):
            # Only floating-point columns can hold NaN
            is_missing = col(c).isNull() | isnan(c)
        else:
            # Nulls are tested directly rather than through a sentinel fill value:
            # a genuine value can never be miscounted as missing, and types that
            # `fillna` cannot fill (e.g. timestamps) are still covered.
            is_missing = col(c).isNull()

        # `when` yields a non-null literal only for matching rows and `count`
        # skips nulls, so this counts exactly the rows flagged as missing.
        col_summary.append(count(when(is_missing, c)).alias(c))

    return sdf.select(col_summary)
                               


In [7]:
missing_values_check(consumer)

name,address,state,postcode,gender,consumer_id
0,0,0,0,0,0


### Check whether any invalid postcodes exist

In [11]:
# Rows whose postcode falls below 800 or above 8000.
# NOTE(review): the rows returned are 8xxx/9xxx codes - confirm that these are
# genuinely invalid before dropping them.
postcode = col('postcode')
consumer.where((postcode < 800) | (postcode > 8000))


name,address,state,postcode,gender,consumer_id
Mrs. Cynthia Cook,483 Caitlyn Valley,VIC,8070,Female,216120
Jennifer Wood,166 Miller Pass S...,VIC,8070,Female,1421589
Christopher Johnson,586 Kimberly Pine,QLD,9005,Undisclosed,747767
Natalie Stewart,06565 Bullock For...,VIC,8009,Female,1019378
Heather Walker,57761 Erin Square,VIC,8011,Female,175536
Nicole Aguirre,25436 Shepherd St...,VIC,8001,Female,506458
Kathleen Garcia,369 Cannon Parkways,VIC,8120,Female,805144
Wendy Shepard,4120 Christopher ...,VIC,8111,Female,1242001
Joyce Wilson DDS,10840 David Trail,VIC,8002,Female,940835
Alyssa Harris,617 Carrie Track,VIC,8010,Undisclosed,89478


### User detail dataset

In [None]:
# Load the mapping table that links user_id to consumer_id
user_detail = spark.read.parquet("consumer_user_details.parquet")
user_detail

In [None]:
len(consumer_id)

In [None]:
# NOTE: this reuses the name `consumer_id`, but from here on it holds the distinct
# *user* IDs of the mapping table (they are compared with the transaction user IDs
# further down). Only the ID column is selected so the whole table is not collected
# onto the driver.
consumer_id = user_detail.select('user_id').toPandas()['user_id'].unique()

In [None]:
# Row count alongside the cardinality of both ID columns in the mapping table
n_mapping_rows = user_detail.count()
n_user_ids = user_detail.select('user_id').distinct().count()
n_mapped_consumer_ids = user_detail.select('consumer_id').distinct().count()

print(
    f"Dataset details: \n\tNumber of rows: {n_mapping_rows}",
    f"\n\tNumber of distinct User ID: {n_user_ids}",
    f"\n\tNumber of distinct Consumer ID: {n_mapped_consumer_ids}",
)

### Checking for missing values in user detail dataset

In [None]:
missing_values_check(user_detail)

Note:
- Due to a difference between the internal system and a poor design choice (for some reason), the transaction tables use a **surrogate key** for each new user_id. <font color='red'>**The transaction dataset identifies customers by `user_id`, whereas the consumer data is keyed by its own unique `consumer_id`, so the user detail data serves to map the two together**</font> 
- However, the Consumer table has a **unique ID (some are missing on purpose)** field which will require some form of mapping between consumer_id and user_id. <font color='red'>**Might require further investigation to decide whether it is appropriate to remove**</font> 
- An additional mapping table has been provided to join the two datasets together.


# Transaction dataset

In [None]:
# Read the whole snapshot directory as a single Spark DataFrame
transaction = spark.read.parquet("transactions_20210828_20220227_snapshot/")

<div class="alert alert-block alert-warning">
<b>Note:</b> Use the following code to load the transactions files if you have problems running the code above.
</div>

In [None]:
# NOTE(review): this snapshot folder differs from the one read in the cell above -
# confirm which period the fallback loader is meant to cover.
path = "transactions_20210228_20210827_snapshot/"

# Keep only the sub-directories that hold data. os.listdir() returns entries in
# arbitrary order, so bookkeeping entries (hidden files, `_SUCCESS`-style markers)
# are excluded by name instead of by their position in the listing.
list_files = sorted(
    entry
    for entry in os.listdir(path)
    if os.path.isdir(os.path.join(path, entry)) and not entry.startswith((".", "_"))
)
list_files

In [None]:
# import modules
from pyspark.sql import SparkSession
import functools
 
# Row-wise union of several DataFrames
def unionAll(dfs):
    """Stack the given DataFrames on top of each other.

    Each DataFrame after the first is re-ordered to the column order of the
    result accumulated so far before being appended.
    """
    def _append(stacked, following):
        return stacked.union(following.select(stacked.columns))

    return functools.reduce(_append, dfs)

# Read every listed sub-directory in one call instead of picking a single file out of
# each by its position in os.listdir(), whose ordering is arbitrary.
# `basePath` anchors partition discovery at the snapshot root so that a column encoded
# in the directory names is kept as a regular column.
# NOTE(review): assumes the sub-directories follow Spark's `key=value` naming, e.g.
# one per order date - confirm against the folder listing above.
partition_dirs = [os.path.join(path, dir_name) for dir_name in list_files]
transaction = spark.read.option("basePath", path).parquet(*partition_dirs)

### Inspecting transaction dataset

In [None]:
transaction

In [None]:
# Persist the combined transactions next to the other curated outputs. A relative path
# keeps the notebook runnable on any machine, and overwrite mode keeps the cell
# re-runnable once the output already exists.
transaction.write.mode("overwrite").parquet("../curated/transaction.parquet")

In [None]:
transaction.printSchema()

In [None]:
# `min` / `max` here are the Spark SQL aggregates brought in by the star import at the
# top of the notebook, not the Python builtins.
date_bounds = transaction.select(min("order_datetime"), max("order_datetime")).first()
min_date, max_date = date_bounds

n_transaction_rows = transaction.count()
n_distinct_orders = transaction.select('order_id').distinct().count()

print(
    f"Dataset details: \n\tNumber of rows: {n_transaction_rows}",
    f"\n\tNumber of distinct order: {n_distinct_orders}",
    f"\n\tPeriod: {min_date} - {max_date}",
)

### Checking for missing values in transaction dataset

In [None]:
missing_values_check(transaction)

In [None]:
# Distinct user IDs that appear in the transactions, collected to the driver as a NumPy array
transaction_cust_id = transaction.select('user_id').toPandas()['user_id'].unique()

In [None]:
# Convert both ID collections to sets so they can be intersected.
# At this point `consumer_id` holds the user IDs taken from the user detail table.
consumer_id = set(consumer_id)
transaction_cust_id = set(transaction_cust_id)

In [None]:
len(transaction_cust_id)

In [None]:
len(transaction_cust_id.intersection(consumer_id))

In [None]:
transaction.count()

# Merchant dataset

In [None]:
# Load the merchant table as a Spark DataFrame
merchant = spark.read.parquet("tbl_merchants.parquet")
merchant

In [None]:
merchant.printSchema()

In [None]:
# Row count alongside the number of distinct merchant ABNs
n_merchant_rows = merchant.count()
n_merchant_abns = merchant.select('merchant_abn').distinct().count()

print(
    f"Dataset details: \n\tNumber of rows: {n_merchant_rows}",
    f"\n\tNumber of distinct Merchant ABN: {n_merchant_abns}",
)

### Checking for missing values in merchant dataset

In [None]:
missing_values_check(merchant)

### The tags column consists of tags, revenue levels and take rate of a merchant
- **Revenue Levels**: (a, b, c, d, e) represents the **level of revenue bands** (unknown to groups). a denotes the smallest band whilst e denotes the highest revenue band. <font color='red'>**Highly relevant in ranking merchants**</font> 
- **Take Rate**: the **fee charged by the BNPL firm** to a merchant on a transaction. That is, for each transaction made, a certain percentage is taken by the BNPL firm. <font color='red'>**Highly relevant in ranking merchants**</font> 
- The dataset has been created to mimic a Salesforce data extract (i.e. salespeople will type in tags and segments within a **free-text** field). <font color='red'>This suggests using lemmatization/stemming/fuzzy-matching methods to group similar texts?</font> 
- As such, please be aware of small **human errors** when parsing the dataset.
- For Example, the tag field may have errors as they were manually input by employees.

Since the data is small, we will be using Pandas to deal with Merchant data for convenience.

In [None]:
# Convert the Spark DataFrame to pandas. The guard keeps the cell safe to re-run:
# once `merchant` is a pandas DataFrame it no longer has a `toPandas` method.
if not isinstance(merchant, pd.DataFrame):
    merchant = merchant.toPandas()

In [None]:
# Display first 5 rows for "tags"
for tag_text in merchant['tags'].head(5):
    print(tag_text)


### Extract Revenue Levels and Take Rate columns

In [None]:
# Preview the raw tag strings. `extract_tags` works on one tag string at a time and is
# only defined in the next cell, so it is not called on the whole DataFrame here.
merchant['tags'].head()

In [None]:
# Function to extract tags, revenue level and take rate from tags column
def extract_tags(arr, category='tags'):
    """Pull one component out of a merchant's raw tag string.

    The raw string holds three bracketed groups inside an outer pair of
    brackets, in the order tags, revenue level, take rate, for example
    ``((furniture shops), (e), (take rate: 0.18))``. Round and square
    brackets are both accepted, including a mix of the two.

    Parameters
    ----------
    arr : str
        The raw tag string of a single merchant.
    category : str, default 'tags'
        Which component to return: 'tags', 'revenue_level' or 'take_rate'.
        Any other value behaves like 'tags'.

    Returns
    -------
    str
        The tag text or revenue level in lower case, or the take rate as the
        text of the number (it is not converted to a numeric type here).

    Raises
    ------
    IndexError
        If the requested component is absent from the string.
    """

    # Drop the outer pair of brackets, then the remaining leading/trailing ones
    inner = arr[1:-1].strip('[()]')

    # Split on "<closing bracket>, <opening bracket>"; either bracket style may
    # appear on either side because the field was typed in by hand
    split_arr = re.split(r'[)\]], [(\[]', inner)

    if category == 'take_rate':
        # First run of digits and dots in the third group
        return re.findall(r'[\d.]+', split_arr[2])[0]

    if category == 'revenue_level':
        return split_arr[1].lower()

    # by default return tags
    return split_arr[0].lower()


In [None]:
# Extract all three components in tags as standalone columns.
# The raw column is captured first because 'tags' itself is overwritten last.
raw_tags = merchant['tags']
for component in ('take_rate', 'revenue_level', 'tags'):
    merchant[component] = raw_tags.apply(extract_tags, category=component)


In [None]:
# Check if we extracted the take_rate and rev_level values correctly.
# take_rate is extracted as text, so it is converted before taking the range;
# min/max on strings would compare them lexicographically.
take_rate_values = pd.to_numeric(merchant['take_rate'])
print(f"Unique value in Revenue Level: {merchant['revenue_level'].unique()}")
print(f"Range of Take Rate: {take_rate_values.min()} - {take_rate_values.max()}")


In [None]:
# Check data type for columns
merchant.dtypes

In [None]:
# take_rate was extracted as text; convert it to a numeric dtype
merchant['take_rate'] = pd.to_numeric(merchant['take_rate'])

In [None]:
# Check data type for columns
merchant.dtypes

In [None]:
# Save the cleaned merchant table.
# NOTE(review): with to_csv's defaults the DataFrame index is written as an extra
# unnamed first column - confirm that downstream readers expect it.
merchant.to_csv('../curated/clean_merchant.csv')

# Aggregation

Here we generate various aggregate data to supplement our analyses and modelling.

### Merchant Sales

In [None]:
# Per merchant and order date: total dollar value and number of orders
sales_aggregations = {'dollar_value': 'sum', 'order_id': 'count'}
sales_per_merchant_date = (
    transaction
    .groupby('merchant_abn', 'order_datetime')
    .agg(sales_aggregations)
)

# Replace Spark's generated aggregate names with descriptive ones
merchant_sales = sales_per_merchant_date.withColumnRenamed('sum(dollar_value)', 'sales_revenue')
merchant_sales = merchant_sales.withColumnRenamed('count(order_id)', 'no_orders')

In [None]:
merchant_sales

In [None]:
# Save the aggregate; overwrite mode keeps the cell re-runnable once the output exists
merchant_sales.write.mode("overwrite").parquet("../curated/merchant_sales.parquet")

### Customers Purchase Behaviour

In [None]:
transaction.columns

In [None]:
# Second name for the full transaction table; no subsetting happens here despite the name
half_transaction = transaction

In [None]:
half_transaction.count()

In [None]:
# Per user and order date: total dollar value and number of orders
purchase_aggregations = {'dollar_value': 'sum', 'order_id': 'count'}
purchases_per_user_date = (
    transaction
    .groupby('user_id', 'order_datetime')
    .agg(purchase_aggregations)
)

# Replace Spark's generated aggregate names with descriptive ones
customer_purchases = purchases_per_user_date.withColumnRenamed('sum(dollar_value)', 'dollar_spent')
customer_purchases = customer_purchases.withColumnRenamed('count(order_id)', 'no_orders')


In [None]:
customer_purchases

In [None]:
# Save the aggregate; overwrite mode keeps the cell re-runnable once the output exists
customer_purchases.write.mode("overwrite").parquet("../curated/customer_purchase_behaviour.parquet")

### Sales by Region

In [None]:
# Join transaction data with customer data

# Step 1: attach consumer_id to every transaction through the user_id -> consumer_id
# mapping table. A left join keeps transactions that have no mapping.
transaction_with_consumer = (half_transaction.join(
                                 user_detail,
                                 half_transaction.user_id == user_detail.user_id,
                                 how = 'left_outer'
                             ).drop(
                                 user_detail.user_id
                             ))

# Step 2: attach the consumer attributes that the regional aggregation below groups by
# (state, postcode). The key is renamed on the consumer side so that the join
# condition is unambiguous, and the duplicate key is dropped afterwards.
consumer_attributes = consumer.select(
    col('consumer_id').alias('consumer_key'), 'postcode', 'state', 'gender'
)

customer_transaction = (
    transaction_with_consumer
    .join(
        consumer_attributes,
        col('consumer_id') == col('consumer_key'),
        how = 'left_outer'
    )
    .drop('consumer_key')
)

In [None]:
missing_values_check(customer_transaction)

In [None]:
customer_transaction.describe()

In [None]:
# Save the joined table; overwrite mode keeps the cell re-runnable once the output exists
customer_transaction.write.mode("overwrite").parquet("../curated/customer_join_transaction.parquet")


In [None]:
# Aggregate by state -> postcode -> date: total dollar value and number of orders
region_aggregations = {'dollar_value': 'sum', 'order_id': 'count'}
sales_per_region_date = (
    customer_transaction
    .groupby('state', 'postcode', 'order_datetime')
    .agg(region_aggregations)
)

# Replace Spark's generated aggregate names with descriptive ones
sales_by_region = sales_per_region_date.withColumnRenamed('sum(dollar_value)', 'dollar_spent')
sales_by_region = sales_by_region.withColumnRenamed('count(order_id)', 'no_orders')


In [None]:
sales_by_region

In [None]:
# Save the aggregate; overwrite mode keeps the cell re-runnable once the output exists
sales_by_region.write.mode("overwrite").parquet("../curated/sales_by_region.parquet")