In [42]:
# Loading library
from functools import reduce
from pyspark.sql import SparkSession, DataFrame, functions as F
from pyspark.sql.types import DoubleType, StringType, IntegerType
import zipfile
import os

# Create a Spark Session
# getOrCreate() reuses an already-running session if one exists, so re-running
# this cell is safe.
spark = (
    SparkSession.builder.appName("ETL Pipeline")
    # Render a DataFrame as a table when it is the last expression of a cell.
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.parquet.cacheMetadata", "true")
    # Fix the session time zone to UTC rather than inheriting the machine's local zone.
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.driver.memory", "4g")
    # Key was previously misspelled as "spark.execturo.memory", which Spark
    # accepts silently but never applies.
    .config("spark.executor.memory", "2g")
    .getOrCreate()
)

# Extract

As we can't use `urlretrieve` to get the data from Canvas, please download it to your local machine and move it to `data/tables`. Then run the code below (uncomment the loop first) to unzip the files.

In [16]:
# Assign data path
# Root of the data directory, relative to this notebook. The cells below read
# their inputs from f"{raw_path}/tables".
raw_path = "../data"

# One-off extraction step, kept commented out. When enabled it walks every
# entry in f"{raw_path}/tables" (skipping the ".gitkeep" placeholder), extracts
# the archive into raw_path, and then deletes the archive. Because of the
# os.remove call it is destructive.
# for file in os.listdir(f"{raw_path}/tables"):
#     if file == ".gitkeep":
#         continue
#     with zipfile.ZipFile(f"{raw_path}/tables/{file}", "r") as zip_ref:
#         zip_ref.extractall(f"{raw_path}/")
#     os.remove(f"{raw_path}/tables/{file}")

# Transform

The system uses `user_id` as the key for identifying customers in the transaction records and fraud probability tables. However, there is also a key-value map from `user_id` to `consumer_id`. We will use `consumer_id` as the only customer ID. Thus, we map `user_id` in each table to `consumer_id` and drop the former.


In [24]:
def replace_id(map_df, target_df):
    """Attach the columns of ``map_df`` to ``target_df`` and discard ``user_id``.

    Args:
        map_df: DataFrame that shares a ``user_id`` column with ``target_df``
            and carries the replacement identifier.
        target_df: DataFrame currently keyed by ``user_id``.

    Returns:
        ``target_df`` inner-joined with ``map_df`` on ``user_id``, with the
        ``user_id`` column removed. Because the join is an inner join, rows of
        ``target_df`` whose ``user_id`` is absent from ``map_df`` are dropped.
    """
    return target_df.join(map_df, on="user_id", how="inner").drop('user_id')

def merchant_info_clean(df):
    """Split the raw ``tags`` string of the merchant table into separate columns.

    The ``tags`` value is treated as a bracketed sequence of three bracketed
    groups, where each bracket may be round or square. The groups are parsed
    into:

    * ``category``: first group, brackets stripped, lower-cased;
    * ``revenue_level``: second group, brackets stripped;
    * ``take_rate``: the decimal number following ``"take rate: "`` in the
      third group, cast to double.

    Args:
        df: DataFrame with a string column named ``tags``.

    Returns:
        DataFrame without ``tags``, with the three columns above added, and
        restricted to rows whose ``revenue_level`` is one of ``a``-``e``.
    """
    # Matches one opening bracket at the start or one closing bracket at the end.
    bracket_edges = r"^[\(\[]|[\)\]]$"

    # Remove the outermost bracket
    df = df.withColumn("tags", F.regexp_replace("tags", bracket_edges, ""))
    # Replace the comma that separates each tuple/list with "|", so commas
    # inside a group do not take part in the split below
    df = df.withColumn("tags", F.regexp_replace("tags", r"[\)\]],\s*[\(\[]", r")\|("))

    # Split accordingly. The pattern is a raw string: "\|" in a normal string
    # literal is an invalid escape sequence.
    df = df.withColumn("tags", F.split("tags", r"\|"))
    df = df.withColumns({"category": F.regexp_replace(F.col("tags").getItem(0), bracket_edges, ""),
                         "revenue_level": F.regexp_replace(F.col("tags").getItem(1), bracket_edges, ""),
                         "take_rate": F.regexp_extract(F.col("tags").getItem(2), r"take rate: (\d+\.\d+)", 1).cast(DoubleType())
                        })

    df = df.withColumn("category", F.lower(F.col("category")))

    df = df.drop("tags")

    # Keep only rows with a recognised revenue level
    df = df.filter(F.col("revenue_level").isin("a", "b", "c", "d", "e"))

    return df

In [34]:
# Load consumer info - a key : value map for user_id to consumer_id
consumer_map_path = f"{raw_path}/tables/consumer_user_details.parquet"
consumer_info = spark.read.parquet(consumer_map_path)

# Number of rows in the map
consumer_info.count()

499999

In [28]:
# Load the consumer fraud probabilities and swap user_id for consumer_id in one step
consumer_fraud_rate = replace_id(
    consumer_info,
    spark.read.csv(f"{raw_path}/tables/consumer_fraud_probability.csv", header=True, inferSchema=True),
)

In [None]:
# Load transaction data for user_id replacement
# The three snapshot folders, in chronological order.
snapshot_paths = [
    f"{raw_path}/tables/transactions_20210228_20210827_snapshot",
    f"{raw_path}/tables/transactions_20210828_20220227_snapshot",
    f"{raw_path}/tables/transactions_20220228_20220828_snapshot",
]

# Read each snapshot and replace its user_id with consumer_id.
transaction_p1, transaction_p2, transaction_p3 = (
    replace_id(consumer_info, spark.read.parquet(snapshot_path))
    for snapshot_path in snapshot_paths
)

# Stack the three periods into a single table.
transaction_records = reduce(DataFrame.unionAll, [transaction_p1, transaction_p2, transaction_p3])

# Total dollar value per merchant, displayed as the cell output.
transaction_records.groupBy("merchant_abn").agg(
    F.sum(F.col("dollar_value")).alias("total_dollar_value")
)

Now that `user_id` has been replaced with `consumer_id`, load all the other data.

In [None]:
# Load merchant fraud probability
merchant_fraud_rate = spark.read.csv(f"{raw_path}/tables/merchant_fraud_probability.csv", header=True, inferSchema=True)

# Sanity check: every order_datetime should look like YYYY-MM-DD.
date_pattern = r"^\d{4}-\d{2}-\d{2}$"

# "is_valid_date" holds the matched text: the date itself when it fits the
# pattern, "" when it does not, and null when order_datetime is null.
test = merchant_fraud_rate.withColumn("is_valid_date", F.regexp_extract(F.col("order_datetime"), date_pattern, 0))

# Comparing null with "" evaluates to null, which filter() treats as false, so
# missing dates have to be selected explicitly to be reported as invalid.
invalid_dates = test.filter(F.col("is_valid_date").isNull() | (F.col("is_valid_date") == ""))
invalid_dates

In [161]:
# Preview the merchant fraud probability table (rendered as a table because
# eager evaluation is enabled on the Spark session).
merchant_fraud_rate

merchant_abn,order_datetime,fraud_probability
19492220327,2021-11-28,44.40365864749536
31334588839,2021-10-02,42.75530083865367
19492220327,2021-12-22,38.867790051131095
82999039227,2021-12-19,94.1347004808891
90918180829,2021-09-02,43.32551731714902
31334588839,2021-12-26,38.36165958070444
23686790459,2021-12-10,79.4543441508535
14827550074,2021-11-26,46.45775596795885
31334588839,2021-11-26,36.20971272078342
19492220327,2021-12-18,33.819672154331755


Cleaning `tbl_merchants.parquet`. The feature `tags` is a string that represents either a tuple or a list containing 3 elements:
* Items that are being sold
* Revenue levels
* Commission rate
Each element is written as either a list, a tuple, or a combination of both (e.g. it starts with `[` and ends with `)`, or vice versa). These inconsistencies are mostly due to human error. Thus, we need to take these inconsistencies into account when splitting the values of the feature `tags` into separate columns.

In [38]:
# Load merchant's info and clean it
merchant_info = merchant_info_clean(
    spark.read.parquet(f"{raw_path}/tables/tbl_merchants.parquet")
)

# Average take rate for each revenue level
merchant_info.groupBy(F.col("revenue_level")).agg(F.avg(F.col("take_rate")))

revenue_level,avg(take_rate)
e,0.3147169811320755
d,0.9912244897959184
c,2.2512039045553167
b,4.094056254626199
a,6.232297128589269


In [39]:
# merchant_info.select("merchant_abn").distinct().count()

4026

In [12]:
# Load consumer's info and reformat
# NOTE: this rebinds `consumer_info`, which the earlier cells use as the
# user_id -> consumer_id map passed to replace_id(). Run those cells before this one.
#
# The file is read with the default delimiter, so each record arrives as a
# single column named after the whole "|"-separated header. That column is
# split manually below.
raw_consumer_col = "name|address|state|postcode|gender|consumer_id"

consumer_info = spark.read.csv(f"{raw_path}/tables/tbl_consumer.csv", header=True, inferSchema=True)
# Raw string for the split pattern: "\|" in a normal string literal is an
# invalid escape sequence.
consumer_info = consumer_info.withColumn("info", F.split(F.col(raw_consumer_col), r"\|")).drop(F.col(raw_consumer_col))
# Positions follow the header order; address (1) and state (2) are not kept.
consumer_info = consumer_info.withColumns({"consumer_id": F.col("info").getItem(5),
                                           "name": F.col("info").getItem(0),
                                           "postcode": F.col("info").getItem(3).cast(IntegerType()),
                                           "gender": F.col("info").getItem(4)}).drop(F.col("info"))
# Roughly the same proportion of female and male customers; only a small
# percentage did not provide their gender
consumer_info.groupBy("gender").count()

                                                                                

gender,count
Undisclosed,50074
Female,224946
Male,224979
