# ETL Processes
***
## Lists of tasks:
- ### <del>Consolidating Datasets</del>
- ### Normalising/Restructuring Tables
- ### Exploratory Data Analysis
- ### Data Cleaning
- ### Package ETL.py into a Class
***

## Content:
- ### [Consumer Dataset](#Consumer-dataset)
- ### [Transaction Dataset](#Transaction-dataset)
- ### [Merchant Dataset](#Merchant-dataset)
- ### [Data Aggregations](#Aggregation)


In [1]:
import pandas as pd
import numpy as np
import os
import re

# Set working directory
if not "/data/tables" in os.getcwd():
    os.chdir("../data/tables")

from pyspark.sql import SparkSession
from pyspark.shell import spark
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
import matplotlib.pyplot as plt

spark = (
    SparkSession.builder.appName("MAST30034 Project 2")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.3.0
      /_/

Using Python version 3.9.12 (main, Apr  4 2022 05:22:27)
Spark context Web UI available at http://192.168.1.6:4040
Spark context available as 'sc' (master = local[*], app id = local-1662471150520).
SparkSession available as 'spark'.


# Consumer dataset

In [2]:
# Read csv file
consumer = spark.read.option("delimiter", "|").csv('tbl_consumer.csv', header = True)
consumer

name,address,state,postcode,gender,consumer_id
Yolanda Williams,413 Haney Gardens...,WA,6935,Female,1195503
Mary Smith,3764 Amber Oval,NSW,2782,Female,179208
Jill Jones MD,40693 Henry Greens,NT,862,Female,1194530
Lindsay Jimenez,00653 Davenport C...,NSW,2780,Female,154128
Rebecca Blanchard,9271 Michael Mano...,WA,6355,Female,712975
Karen Chapman,2706 Stewart Oval...,NSW,2033,Female,407340
Andrea Jones,122 Brandon Cliff,QLD,4606,Female,511685
Stephen Williams,6804 Wright Crest...,WA,6056,Male,448088
Stephanie Reyes,5813 Denise Land ...,NSW,2482,Female,650435
Jillian Gonzales,461 Ryan Common S...,VIC,3220,Female,1058499


In [3]:
print(f"Dataset details: \n\tNumber of rows: {consumer.count()}", \
      f"\n\tNumber of distinct Consumer ID: {consumer.select('consumer_id').distinct().count()}", \
      f"\n\tNumber of distinct Postcodes: {consumer.select('postcode').distinct().count()}")

Dataset details: 
	Number of rows: 499999 
	Number of distinct Consumer ID: 499999 
	Number of distinct Postcodes: 3167


Note: 
- The **address field is fake** and derived from USA street names. We have included it to mimic a more realistic dataset, but the streets themselves are non-existent and if there are any matches, it will be a pure coincidence. <font color='red'>**Not sure what sort of information we can extract here if they are all fake</font> 
- The **postcode field is accurate** and should be **used for aggregated analysis** for joining with other geospatial datasets for demographic information (i.e ABS datasets) <font color='red'>**Highly relevant for geospatial analysis</font> 
- There is roughly a **uniform distribution at the state level** (i.e number of consumers per state is the same for all states).

### Checking for missing values in consumer dataset

In [4]:
def missing_values_check(sdf):
    """Check missing values in each column of the spark dataframe"""
    
    missing_count = sdf.select([count(when(col(c).contains('None') | \
                                           col(c).contains('NULL') | \
                                          (col(c) == '' ) | \
                                           col(c).isNull() | \
                                           isnan(c), c))
                                     .alias(c)
                                for c, dtype in sdf.dtypes if dtype != 'date'])
    
    return missing_count


In [5]:
missing_values_check(consumer)

name,address,state,postcode,gender,consumer_id
0,0,0,0,0,0


### User detail dataset

In [21]:
user_detail = spark.read.parquet("consumer_user_details.parquet")
user_detail

user_id,consumer_id
1,1195503
2,179208
3,1194530
4,154128
5,712975
6,407340
7,511685
8,448088
9,650435
10,1058499


In [7]:
print(f"Dataset details: \n\tNumber of rows: {user_detail.count()}", \
      f"\n\tNumber of distinct User ID: {user_detail.select('user_id').distinct().count()}", \
      f"\n\tNumber of distinct Consumer ID: {user_detail.select('consumer_id').distinct().count()}")

Dataset details: 
	Number of rows: 499999 
	Number of distinct User ID: 499999 
	Number of distinct Consumer ID: 499999


### Checking for missing values in user detail dataset

In [8]:
missing_values_check(user_detail)

user_id,consumer_id
0,0


Note:
- Due to a difference between the internal system and a poor design choice (for some reason), the transaction tables use a **surrogate key** for each new user_id. <font color='red'>**Transaction dataset uses `user_id` to map customer but customer data are mapped to their own unique `customer_id` so the user detail data serves to map those two together</font> 
- However, the Consumer table has a **unique ID (some are missing on purpose)** field which will require some form of mapping between consumer_id to user_id. <font color='red'>**Might require further investigation and decide on whether it is appropriate to remove</font> 
- An additional mapping table has been provided to join the two datasets together.


# Transaction dataset

In [9]:
transaction = spark.read.parquet("transactions_20210228_20210827_snapshot/")

Py4JJavaError: An error occurred while calling o267.parquet.
: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1218)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1423)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.spark.util.HadoopFSUtils$.listLeafFiles(HadoopFSUtils.scala:225)
	at org.apache.spark.util.HadoopFSUtils$.$anonfun$parallelListLeafFilesInternal$1(HadoopFSUtils.scala:95)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFilesInternal(HadoopFSUtils.scala:85)
	at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFiles(HadoopFSUtils.scala:69)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.bulkListLeafFiles(InMemoryFileIndex.scala:158)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:131)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:94)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:66)
	at org.apache.spark.sql.execution.datasources.DataSource.createInMemoryFileIndex(DataSource.scala:567)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:409)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
	at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:562)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
	at java.base/java.lang.reflect.Method.invoke(Method.java:577)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)


<div class="alert alert-block alert-warning">
<b>Note:</b> Use the following code to load the transactions files if you have problems running the code above.
</div>

In [3]:
path = "transactions_20210228_20210827_snapshot/"
list_files = os.listdir(path)
list_files = list_files[1:(len(list_files)-1)]

# import modules
from pyspark.sql import SparkSession
import functools
 
# explicit function
def unionAll(dfs):
    return functools.reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), dfs)

# read files
file_name = os.listdir(path+ list_files[0])[1]
transaction = spark.read.parquet(path+ list_files[0] +"/" + file_name)
transaction = transaction.withColumn('order_datetime',lit(list_files[0][15:]))
for i in list_files[1:]:
    file_name = os.listdir(path + i)[1]
    tmp = spark.read.parquet(path+ i +"/" + file_name)
    tmp = tmp.withColumn('order_datetime',lit(i[15:]))
    transaction = unionAll([transaction, tmp] )

### Inspecting transaction dataset

In [4]:
transaction

user_id,merchant_abn,dollar_value,order_id,order_datetime
1,28000487688,133.22689421562643,0c37b3f7-c7f1-48c...,2021-02-28
18485,62191208634,79.13140006851712,9e18b913-0465-4fd...,2021-02-28
1,83690644458,30.441348317517228,40a2ff69-ea34-465...,2021-02-28
18488,39649557865,962.8133405407584,f4c1a5ae-5b76-40d...,2021-02-28
2,80779820715,48.12397733548124,cd09bdd6-f56d-489...,2021-02-28
18489,43186523025,98.14878546968934,9008a98e-1b02-4de...,2021-02-28
3,29566626791,46.33087226118639,26b7574e-81c2-455...,2021-02-28
18490,93558142492,232.83335268750145,2bda0665-796f-4f2...,2021-02-28
3,32361057556,87.34942171371054,633a7656-2fcc-4b8...,2021-02-28
18491,64974914166,130.12601873970038,4bc15338-83eb-43d...,2021-02-28


In [11]:
transaction.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- merchant_abn: long (nullable = true)
 |-- dollar_value: double (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_datetime: string (nullable = false)



In [11]:
min_date, max_date = transaction.select(min("order_datetime"), max("order_datetime")).first()

print(f"Dataset details: \n\tNumber of rows: {transaction.count()}", \
      f"\n\tNumber of distinct order: {transaction.select('order_id').distinct().count()}", \
      f"\n\tPeriod: {min_date} - {max_date}")

Dataset details: 
	Number of rows: 3643266 
	Number of distinct order: 3643266 
	Period: 2021-02-28 - 2021-08-27


### Checking for missing values in transaction dataset

In [110]:
missing_values_check(transaction)

user_id,merchant_abn,dollar_value,order_id
0,0,0,0


In [None]:
transaction.count()

# Merchant dataset

In [5]:
merchant = spark.read.parquet("tbl_merchants.parquet")
merchant

name,tags,merchant_abn
Felis Limited,"((furniture, home...",10023283211
Arcu Ac Orci Corp...,"([cable, satellit...",10142254217
Nunc Sed Company,"([jewelry, watch,...",10165489824
Ultricies Digniss...,"([wAtch, clock, a...",10187291046
Enim Condimentum PC,([music shops - m...,10192359162
Fusce Company,"[(gift, card, nov...",10206519221
Aliquam Enim Inco...,"[(computers, comP...",10255988167
Ipsum Primis Ltd,"[[watch, clock, a...",10264435225
Pede Ultrices Ind...,([computer progra...,10279061213
Nunc Inc.,"[(furniture, home...",10323485998


In [13]:
merchant.printSchema()

root
 |-- name: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- merchant_abn: long (nullable = true)



In [14]:
print(f"Dataset details: \n\tNumber of rows: {merchant.count()}", \
      f"\n\tNumber of distinct Merchant ABN: {merchant.select('merchant_abn').distinct().count()}")

Dataset details: 
	Number of rows: 4026 
	Number of distinct Merchant ABN: 4026


### Checking for missing values in merchant dataset

In [15]:
missing_values_check(merchant)

name,tags,merchant_abn
0,0,0


### The tags column consists of tags, revenue levels and take rate of a merchant
- **Revenue Levels**: (a, b, c, d, e) represents the **level of revenue bands** (unknown to groups). a denotes the smallest band whilst e denotes the highest revenue band. <font color='red'>**Highly relevant in ranking merchant</font> 
- **Take Rate**: the **fee charged by the BNPL firm** to a merchant on a transaction. That is, for each transaction made, a certain percentage is taken by the BNPL firm.<font color='red'>**Highly relevant in ranking merchant</font> 
- The dataset has been created to mimic a Salesforce data extract (i.e salespeople will type in tags and segments within a **free-text** field). <font color='red'>This suggests use of lemmatizating/stemming/fuzzy methods to group similar texts?</font> 
- As such, please be aware of small **human errors** when parsing the dataset.
- For Example, the tag field may have errors as they were manually input by employees.

Since the data is small, we will be using Pandas to deal with Merchant data for convenience.

In [6]:
merchant = merchant.toPandas()

In [7]:
# Display first 5 rows for "tags"
for idx, row in merchant.head(5).iterrows():
    print(row['tags'])


((furniture, home furnishings and equipment shops, and manufacturers, except appliances), (e), (take rate: 0.18))
([cable, satellite, and otHer pay television and radio services], [b], [take rate: 4.22])
([jewelry, watch, clock, and silverware shops], [b], [take rate: 4.40])
([wAtch, clock, and jewelry repair shops], [b], [take rate: 3.29])
([music shops - musical instruments, pianos, and sheet music], [a], [take rate: 6.33])


### Extract Revenue Levels and Take Rate columns

In [8]:
# Function to extract tags, revenue level and take rate from tags column
def extract_tags(arr, category='tags'):
    
    # Split tags into the three components
    arr = arr[1:-1]
    split_arr = re.split('\), \(|\], \[', arr.strip('[()]'))
    
    if category == 'take_rate':
        return re.findall('[\d\.\d]+', split_arr[2])[0]
    
    elif category == 'revenue_level':
        return split_arr[1].lower()
    
    return split_arr[0].lower()


In [9]:
# Extract all three components in tags as standalone columns
merchant['take_rate'] = merchant['tags'].apply(lambda x : extract_tags(x, 'take_rate'))
merchant['revenue_level'] = merchant['tags'].apply(lambda x : extract_tags(x, 'revenue_level'))
merchant['tags'] = merchant['tags'].apply(lambda x : extract_tags(x, 'tags'))


In [10]:
# Check if we extracted the take_rate and rev_level values correctly
print(f"Unique value in Revenue Level: {merchant['revenue_level'].unique()}")
print(f"Range of Take Rate: {merchant['take_rate'].min()} - {merchant['take_rate'].max()}")


Unique value in Revenue Level: ['e' 'b' 'a' 'c' 'd']
Range of Take Rate: 0.10 - 7.00


In [11]:
# Check data type for columns
merchant.dtypes

name             object
tags             object
merchant_abn      int64
take_rate        object
revenue_level    object
dtype: object

In [12]:
merchant['take_rate'] = pd.to_numeric(merchant['take_rate'])

In [13]:
# Check data type for columns
merchant.dtypes

name              object
tags              object
merchant_abn       int64
take_rate        float64
revenue_level     object
dtype: object

In [14]:
merchant.to_csv('../curated/clean_merchant.csv')

In [15]:
merchant

Unnamed: 0,name,tags,merchant_abn,take_rate,revenue_level
0,Felis Limited,"furniture, home furnishings and equipment shop...",10023283211,0.18,e
1,Arcu Ac Orci Corporation,"cable, satellite, and other pay television and...",10142254217,4.22,b
2,Nunc Sed Company,"jewelry, watch, clock, and silverware shops",10165489824,4.40,b
3,Ultricies Dignissim Lacus Foundation,"watch, clock, and jewelry repair shops",10187291046,3.29,b
4,Enim Condimentum PC,"music shops - musical instruments, pianos, and...",10192359162,6.33,a
...,...,...,...,...,...
4021,Elit Dictum Eu Ltd,"opticians, optical goods, and eyeglasses",99938978285,4.50,b
4022,Mollis LLP,"books, periodicals, and newspapers",99974311662,3.17,b
4023,Sociosqu Corp.,shoe shops,99976658299,6.57,a
4024,Commodo Hendrerit LLC,motor vehicle supplies and new parts,99987905597,6.82,a


# Aggregation

Here we generate various aggregate data to supplement our analyses and modelling.

### Merchant Sales

In [16]:
# Generate data which summarizes merchants' sales
merchant_sales = (transaction.groupby('merchant_abn', 'order_datetime')
                             .agg({'dollar_value':'sum',
                                   'order_id':'count'})
                             .withColumnRenamed('sum(dollar_value)', 'sales_revenue')
                             .withColumnRenamed('count(order_id)', 'no_orders'))

In [17]:
merchant_sales

merchant_abn,order_datetime,sales_revenue,no_orders
10206519221,2021-02-28,187.7153766867693,10
98416475066,2021-02-28,1058.5452757003895,12
89430765327,2021-02-28,253.9068580537148,2
76819856970,2021-02-28,9839.311194112235,16
56180588880,2021-02-28,3900.506412051464,1
13839274306,2021-02-28,1377.6882307413898,22
12438697459,2021-02-28,590.3300849013574,5
27109170541,2021-02-28,766.5064222099558,2
26505333735,2021-02-28,887.2957823944452,4
32413511882,2021-02-28,747.5319334842841,1


In [117]:
# Download data
merchant_sales.write.parquet("../curated/merchant_sales.parquet")

### Customers Purchase Behaviour

In [18]:
# Generate data which summarizes customers spendings
customer_purchases = (transaction.groupby('user_id', 'order_datetime')
                                 .agg({'dollar_value':'sum',
                                       'order_id':'count'})
                                 .withColumnRenamed('sum(dollar_value)', 'dollar_spent')
                                 .withColumnRenamed('count(order_id)', 'no_orders'))


In [19]:
customer_purchases

user_id,order_datetime,dollar_spent,no_orders
269,2021-02-28,138.55889233239787,2
18824,2021-02-28,161.30065784276258,2
18859,2021-02-28,47.75901508104701,1
723,2021-02-28,151.57254593190075,1
19588,2021-02-28,67.85442400130255,1
19752,2021-02-28,165.39506556429245,2
1240,2021-02-28,62.79844175476583,1
1546,2021-02-28,119.08585100461778,1
20617,2021-02-28,276.75110260730594,2
21065,2021-02-28,222.43537264833688,1


In [None]:
# Download data
customer_purchases.write.parquet("../curated/customer_purchase_behaviour.parquet")

### Sales by Region

In [22]:
# Join transaction data with customer data
customer_transaction = (transaction.join(user_detail, transaction.user_id == user_detail.user_id)
                                   .drop(user_detail.user_id))

customer_transaction = (customer_transaction.join(consumer, customer_transaction.consumer_id == consumer.consumer_id)
                                            .drop(consumer.consumer_id)
                                            .select(transaction['*'], consumer.postcode, consumer.state, consumer.gender))

In [23]:
customer_transaction

user_id,merchant_abn,dollar_value,order_id,order_datetime,postcode,state,gender
3698,55778594682,21.941266654463465,8a806e0d-558d-468...,2021-02-28,2299,NSW,Male
3698,10648956813,99.30549322421652,e493f287-efe0-425...,2021-03-03,2299,NSW,Male
3698,75089928159,3.363306277086005,94b7fb1a-82d1-422...,2021-03-06,2299,NSW,Male
3698,42543374304,351.2979463642349,451bdc08-cc6e-41b...,2021-03-08,2299,NSW,Male
3698,54611298155,2034.6296050908,dccaacc6-a0fd-44c...,2021-03-10,2299,NSW,Male
3698,24852446429,22.143273437576056,c6b2cfa3-2494-4fa...,2021-03-12,2299,NSW,Male
3698,63123845164,384.9929053083648,ee723c3a-ad57-4b3...,2021-03-12,2299,NSW,Male
3698,63290521567,42.178347325192256,7d5afc38-5be8-4c6...,2021-03-12,2299,NSW,Male
3698,42355028515,76.8823425915479,274dfcce-a369-46c...,2021-03-14,2299,NSW,Male
3698,65674339048,80.50821804740839,dd90c0e0-e343-40c...,2021-03-16,2299,NSW,Male


In [None]:
# Download data
customer_transaction.write.parquet("../curated/customer_join_transaction.parquet")


In [24]:
# Aggregate by state -> postcode -> date
sales_by_region = (customer_transaction.groupby('state', 'postcode', 'order_datetime')
                                       .agg({'dollar_value':'sum',
                                             'order_id':'count'})
                                       .withColumnRenamed('sum(dollar_value)', 'dollar_spent')
                                       .withColumnRenamed('count(order_id)', 'no_orders'))


In [25]:
sales_by_region

state,postcode,order_datetime,dollar_spent,no_orders
NSW,2299,2021-04-24,487.8261273032642,6
NSW,2299,2021-05-23,515.0847836543765,8
NSW,2323,2021-06-08,1347.7929991124768,9
NSW,2582,2021-07-14,1482.574981292855,7
SA,5172,2021-07-18,2837.824624108052,11
QLD,4003,2021-08-24,1057.323047286387,5
WA,6057,2021-03-06,186.83207671677712,3
WA,6057,2021-03-30,881.0567063659305,13
WA,6415,2021-03-14,2926.7564431043656,9
WA,6415,2021-05-07,1012.219799159162,10


In [None]:
# Download data
sales_by_region.write.parquet("../curated/sales_by_region.parquet")

# 0903

In [26]:
#make 'month' column extracting from 'order_datetime'
customer_transaction = customer_transaction.select(col("*"),col("order_datetime"),
          month(col("order_datetime")).alias('month')
                           )
customer_transaction

user_id,merchant_abn,dollar_value,order_id,order_datetime,postcode,state,gender,order_datetime.1,month
3698,55778594682,21.941266654463465,8a806e0d-558d-468...,2021-02-28,2299,NSW,Male,2021-02-28,2
3698,10648956813,99.30549322421652,e493f287-efe0-425...,2021-03-03,2299,NSW,Male,2021-03-03,3
3698,75089928159,3.363306277086005,94b7fb1a-82d1-422...,2021-03-06,2299,NSW,Male,2021-03-06,3
3698,42543374304,351.2979463642349,451bdc08-cc6e-41b...,2021-03-08,2299,NSW,Male,2021-03-08,3
3698,54611298155,2034.6296050908,dccaacc6-a0fd-44c...,2021-03-10,2299,NSW,Male,2021-03-10,3
3698,24852446429,22.143273437576056,c6b2cfa3-2494-4fa...,2021-03-12,2299,NSW,Male,2021-03-12,3
3698,63123845164,384.9929053083648,ee723c3a-ad57-4b3...,2021-03-12,2299,NSW,Male,2021-03-12,3
3698,63290521567,42.178347325192256,7d5afc38-5be8-4c6...,2021-03-12,2299,NSW,Male,2021-03-12,3
3698,42355028515,76.8823425915479,274dfcce-a369-46c...,2021-03-14,2299,NSW,Male,2021-03-14,3
3698,65674339048,80.50821804740839,dd90c0e0-e343-40c...,2021-03-16,2299,NSW,Male,2021-03-16,3


In [None]:
# Download data
customer_transaction.write.parquet("../curated/customer_transaction.parquet")

# Customer Profile by each merchant by Whole time period(given) 

In [27]:
customer_purchases_W = (customer_transaction.groupby('merchant_abn', 'user_id')
                                       .agg({'dollar_value':'sum',
                                             'order_id':'count'})
                                       .withColumnRenamed('sum(dollar_value)', 'dollar_spent')
                                       .withColumnRenamed('count(order_id)', 'no_orders'))
#  customer profiles by each merchant(# of orders, dollar spent,..)
customer_purchase_bymerchant = customer_purchases_W.orderBy('merchant_abn')


customer_purchase_bymerchant_sum = (customer_purchase_bymerchant.groupby('merchant_abn')
                                       .agg({'user_id':'count',
                                             'dollar_spent':'sum',
                                             'no_orders':'sum'}))


#  customer'saverage value of dollaer spent and number of orders by each merchant
customer_purchase_bymerchant_mean = (customer_purchase_bymerchant.groupby('merchant_abn')
                                       .agg({'user_id':'count',
                                             'dollar_spent':'mean',
                                             'no_orders':'mean'}))
customer_purchase_bymerchant_mean

merchant_abn,count(user_id),avg(no_orders),avg(dollar_spent)
10023283211,810,1.0185185185185186,212.61634618932624
10142254217,751,1.0266311584553929,41.106075595661295
10187291046,88,1.0,110.32619101999413
10192359162,107,1.0,451.1432080236007
10206519221,2303,1.05601389491967,40.37604885055831
10255988167,218,1.0,389.55526545205015
10264435225,1241,1.0273972602739727,117.2157978515124
10279061213,131,1.0,312.349403146513
10323485998,2483,1.058799838904551,136.54531611591233
10342410215,238,1.0,378.0169713940926


In [None]:
# Download data
customer_purchase_bymerchant_mean.write.parquet("../curated/customer_purchase_bymerchant_mean.parquet")

## state diversity by each merchant

In [29]:
customer_state_bymerchant = customer_transaction.select("merchant_abn",'gender','state')
customer_state_bymerchant = customer_state_bymerchant.orderBy('merchant_abn')
customer_state_bymerchant

merchant_abn,gender,state
10023283211,Female,VIC
10023283211,Male,VIC
10023283211,Male,QLD
10023283211,Male,WA
10023283211,Female,NSW
10023283211,Male,NSW
10023283211,Male,VIC
10023283211,Male,VIC
10023283211,Undisclosed,VIC
10023283211,Male,WA


In [None]:
# Download data
customer_state_bymerchant.write.parquet("../curated/customer_state_bymerchant.parquet")

In [51]:
# number of transaction occurred in each state by merchant
from pyspark.sql import SQLContext

customer_state_bymerchant.createOrReplaceTempView('customer_state_bymerchant')

state_merchant_count =  spark.sql(
"""
Select merchant_abn,state,
    count(state) as N

from customer_state_bymerchant
group by merchant_abn,state
order by merchant_abn, state
""")
state_merchant_count

merchant_abn,state,N
10023283211,ACT,6
10023283211,NSW,215
10023283211,NT,11
10023283211,QLD,134
10023283211,SA,92
10023283211,TAS,36
10023283211,VIC,189
10023283211,WA,142
10142254217,ACT,4
10142254217,NSW,218


In [None]:
# Download data
state_merchant_count.write.parquet("../curated/state_merchant_count.parquet")

# Monthly Customer Profile by each merchant

In [32]:
# customer profile by each month in each merchant
customer_purchases_month = (customer_transaction.groupby('merchant_abn', 'user_id','month')
                                       .agg({'dollar_value':'sum',
                                             'order_id':'count'})
                                       .withColumnRenamed('sum(dollar_value)', 'dollar_spent')
                                       .withColumnRenamed('count(order_id)', 'no_orders'))

customer_purchase_bymerchant_month_mean = (customer_purchases_month.groupby('merchant_abn','month')
                                       .agg({'user_id':'count',
                                             'dollar_spent':'mean',
                                             'no_orders':'mean'})
                                       .withColumnRenamed('count(user_id)', 'user_N')
                                       .withColumnRenamed('avg(dollar_spent)', 'dollar_spent_monthly')
                                       .withColumnRenamed('avg(no_orders)', 'no_orders_monthly'))


customer_purchase_bymerchant_month_mean = customer_purchase_bymerchant_month_mean.orderBy('merchant_abn','month')
customer_purchase_bymerchant_month_mean

merchant_abn,month,user_N,no_orders_monthly,dollar_spent_monthly
10023283211,2,3,1.0,233.85554607960853
10023283211,3,111,1.0,221.9310405054564
10023283211,4,129,1.0,214.1266880162709
10023283211,5,142,1.0140845070422535,212.05551592195565
10023283211,6,145,1.0,198.55471923470589
10023283211,7,153,1.0,192.3577249024277
10023283211,8,139,1.0071942446043165,222.5031371574782
10142254217,2,1,1.0,10.88692600276833
10142254217,3,120,1.0,38.578921812952856
10142254217,4,116,1.0,37.29687171920041


In [None]:
# Download data
customer_purchase_bymerchant_month_mean.write.parquet("../curated/customer_purchase_bymerchant_month_mean.parquet")

In [47]:
from pyspark.sql.functions import when

def mk_month_data(i):
    merchant_abn_n = "merchant_abn_" + str(i) 
    user_N_n = "user_N_" + str(i)
    no_orders_monthly_n = "no_orders_monthly_" + str(i)
    dollar_spent_monthly_n = 'dollar_spent_monthly_' + str(i)
    month = customer_purchase_bymerchant_month_mean.filter(col("month") == i).withColumnRenamed('merchant_abn', merchant_abn_n).withColumnRenamed('user_N', user_N_n).withColumnRenamed('no_orders_monthly', no_orders_monthly_n).withColumnRenamed('dollar_spent_monthly', dollar_spent_monthly_n).drop("month")
    return(month)


In [48]:
month_2 = mk_month_data(2)
month_3 = mk_month_data(3)

In [49]:
# Feb + March
month_all = month_2.join(month_3,month_2["merchant_abn_2"] == month_3["merchant_abn_3"],"outer")
month_all = month_all.withColumn("merchant_abn",when(month_all.merchant_abn_2.isNull() ,month_all.merchant_abn_3)
                                 .otherwise(month_all.merchant_abn_2))
month_all = month_all.drop("merchant_abn_2","merchant_abn_3")

for i in range(4,9):
    month =mk_month_data(i)
    merchant_abn_n = "merchant_abn_" + str(i)
    
    month_all = month_all.join(month,month_all["merchant_abn"] == month[merchant_abn_n],"outer")
    month_all = month_all.withColumnRenamed(merchant_abn_n, "tmp")
    month_all = month_all.withColumn("merchant_abn",when(month_all.merchant_abn.isNull() ,month_all.tmp)
                                     .otherwise(month_all.merchant_abn))
    month_all = month_all.drop(merchant_abn_n,"tmp")
month_all

user_N_2,no_orders_monthly_2,dollar_spent_monthly_2,user_N_3,no_orders_monthly_3,dollar_spent_monthly_3,merchant_abn,user_N_4,no_orders_monthly_4,dollar_spent_monthly_4,user_N_5,no_orders_monthly_5,dollar_spent_monthly_5,user_N_6,no_orders_monthly_6,dollar_spent_monthly_6,user_N_7,no_orders_monthly_7,dollar_spent_monthly_7,user_N_8,no_orders_monthly_8,dollar_spent_monthly_8
3.0,1.0,233.85554607960853,111.0,1.0,221.9310405054564,10023283211,129.0,1.0,214.1266880162709,142.0,1.0140845070422535,212.0555159219556,145.0,1.0,198.55471923470589,153.0,1.0,192.3577249024277,139.0,1.0071942446043165,222.50313715747825
1.0,1.0,10.88692600276833,120.0,1.0,38.57892181295287,10142254217,116.0,1.0,37.29687171920041,131.0,1.0076335877862594,43.13398261474645,139.0,1.014388489208633,42.613813056750764,145.0,1.006896551724138,39.692417410622326,115.0,1.0,39.7790943251901
,,,16.0,1.0,107.72361075937582,10187291046,19.0,1.0,119.01709223111595,15.0,1.0,88.61273189667673,16.0,1.0,110.08781285928356,9.0,1.0,124.45328364595768,13.0,1.0,116.39436524661224
,,,12.0,1.0,534.7979863110331,10192359162,16.0,1.0,626.603918306726,14.0,1.0,318.22757436618195,26.0,1.0,371.8861967387389,15.0,1.0,431.2986305997255,24.0,1.0,468.140754773151
10.0,1.0,18.77153766867693,373.0,1.0053619302949062,38.600205239802,10206519221,387.0,1.0025839793281657,43.28378954029646,441.0,1.002267573696145,37.90847624764909,383.0,1.0130548302872062,36.67461845535125,397.0,1.0100755667506298,38.999943862474346,423.0,1.011820330969267,36.4128307626313
,,,40.0,1.0,370.18030304127865,10255988167,27.0,1.0,364.2554792577587,37.0,1.0,452.9652645793652,36.0,1.0,317.63375442095355,41.0,1.0,344.2160346549832,37.0,1.0,485.7716334457072
8.0,1.0,135.69085104347488,193.0,1.005181347150259,114.1983718871035,10264435225,204.0,1.0049019607843137,117.7236448382106,210.0,1.0047619047619047,113.39824828783368,223.0,1.0044843049327354,113.46894444356228,220.0,1.0045454545454546,107.02970272739216,211.0,1.004739336492891,121.60960972885104
,,,17.0,1.0,321.22735301358034,10279061213,19.0,1.0,307.65563671213323,18.0,1.0,293.4111782823476,27.0,1.0,325.52495716064726,30.0,1.0,317.6128360509162,20.0,1.0,300.62447897423084
12.0,1.0,131.31077307033766,379.0,1.0026385224274406,127.26761157158916,10323485998,389.0,1.0051413881748072,129.90312988335592,436.0,1.006880733944954,133.81992118362578,445.0,1.0179775280898875,130.12091222234585,503.0,1.0059642147117296,128.72656142256034,443.0,1.0112866817155757,130.25010464594826
1.0,1.0,546.8300161135974,29.0,1.0,350.19521887080884,10342410215,49.0,1.0,411.4055327273485,46.0,1.0,345.10032987257193,49.0,1.0,397.9043129159006,36.0,1.0,372.52578049920976,28.0,1.0,368.7079328499261


In [None]:
# Download data
month_all.write.parquet("../curated/month_all.parquet")

In [52]:
month_all_pandas = month_all.select("*").toPandas()
month_all_pandas

Unnamed: 0,user_N_2,no_orders_monthly_2,dollar_spent_monthly_2,user_N_3,no_orders_monthly_3,dollar_spent_monthly_3,merchant_abn,user_N_4,no_orders_monthly_4,dollar_spent_monthly_4,...,dollar_spent_monthly_5,user_N_6,no_orders_monthly_6,dollar_spent_monthly_6,user_N_7,no_orders_monthly_7,dollar_spent_monthly_7,user_N_8,no_orders_monthly_8,dollar_spent_monthly_8
0,3.0,1.000000,233.855546,111.0,1.000000,221.931041,10023283211,129.0,1.000000,214.126688,...,212.055516,145.0,1.000000,198.554719,153.0,1.000000,192.357725,139.0,1.007194,222.503137
1,1.0,1.000000,10.886926,120.0,1.000000,38.578922,10142254217,116.0,1.000000,37.296872,...,43.133983,139.0,1.014388,42.613813,145.0,1.006897,39.692417,115.0,1.000000,39.779094
2,,,,16.0,1.000000,107.723611,10187291046,19.0,1.000000,119.017092,...,88.612732,16.0,1.000000,110.087813,9.0,1.000000,124.453284,13.0,1.000000,116.394365
3,,,,12.0,1.000000,534.797986,10192359162,16.0,1.000000,626.603918,...,318.227574,26.0,1.000000,371.886197,15.0,1.000000,431.298631,24.0,1.000000,468.140755
4,10.0,1.000000,18.771538,373.0,1.005362,38.600205,10206519221,387.0,1.002584,43.283790,...,37.908476,383.0,1.013055,36.674618,397.0,1.010076,38.999944,423.0,1.011820,36.412831
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4354,19.0,1.000000,29.100052,671.0,1.010432,30.706334,99938978285,641.0,1.007800,30.916451,...,30.535293,772.0,1.019430,31.686462,775.0,1.012903,29.040884,713.0,1.014025,30.048953
4355,,,,5.0,1.000000,301.093134,99974311662,5.0,1.000000,171.788718,...,556.590131,8.0,1.000000,369.765453,6.0,1.000000,341.297195,6.0,1.000000,248.971769
4356,31.0,1.032258,162.800652,903.0,1.015504,147.241897,99976658299,901.0,1.014428,153.506541,...,155.687732,978.0,1.015337,150.091754,1053.0,1.022792,155.285307,980.0,1.023469,156.425385
4357,,,,4.0,1.000000,288.776125,99987905597,9.0,1.000000,413.783283,...,234.248764,11.0,1.000000,369.063765,7.0,1.000000,425.081275,5.0,1.000000,377.143481


# Check Stability of merchant

In [36]:
month_all_dollar_spent = month_all_pandas[["merchant_abn", "dollar_spent_monthly_2", "dollar_spent_monthly_3",\
                 "dollar_spent_monthly_4", "dollar_spent_monthly_5",\
                 "dollar_spent_monthly_6", "dollar_spent_monthly_7",\
                 "dollar_spent_monthly_8"]]

month_all_dollar_spent["dollar_spent_std"] = month_all_dollar_spent.iloc[:,1:].std(axis = 1)
month_all_dollar_spent

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  month_all_dollar_spent["dollar_spent_std"] = month_all_dollar_spent.iloc[:,1:].std(axis = 1)


Unnamed: 0,merchant_abn,dollar_spent_monthly_2,dollar_spent_monthly_3,dollar_spent_monthly_4,dollar_spent_monthly_5,dollar_spent_monthly_6,dollar_spent_monthly_7,dollar_spent_monthly_8,dollar_spent_std
0,10023283211,233.855546,221.931041,214.126688,212.055516,198.554719,192.357725,222.503137,14.371337
1,10142254217,10.886926,38.578922,37.296872,43.133983,42.613813,39.692417,39.779094,11.266106
2,10187291046,,107.723611,119.017092,88.612732,110.087813,124.453284,116.394365,12.546764
3,10192359162,,534.797986,626.603918,318.227574,371.886197,431.298631,468.140755,111.437402
4,10206519221,18.771538,38.600205,43.283790,37.908476,36.674618,38.999944,36.412831,7.849015
...,...,...,...,...,...,...,...,...,...
4354,99938978285,29.100052,30.706334,30.916451,30.535293,31.686462,29.040884,30.048953,0.966990
4355,99974311662,,301.093134,171.788718,556.590131,369.765453,341.297195,248.971769,130.743210
4356,99976658299,162.800652,147.241897,153.506541,155.687732,150.091754,155.285307,156.425385,4.961597
4357,99987905597,,288.776125,413.783283,234.248764,369.063765,425.081275,377.143481,74.754914


In [37]:
# excluded Feb data, because there are only one data(02-28) /  this makes std value so high
month_all_user_N = month_all_pandas[["merchant_abn", "user_N_3",\
                 "user_N_4", "user_N_5",\
                 "user_N_6", "user_N_7",\
                 "user_N_8"]]

month_all_user_N["user_N_std"] = month_all_user_N.iloc[:,1:].std(axis = 1)
month_all_user_N

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  month_all_user_N["user_N_std"] = month_all_user_N.iloc[:,1:].std(axis = 1)


Unnamed: 0,merchant_abn,user_N_3,user_N_4,user_N_5,user_N_6,user_N_7,user_N_8,user_N_std
0,10023283211,111.0,129.0,142.0,145.0,153.0,139.0,14.747881
1,10142254217,120.0,116.0,131.0,139.0,145.0,115.0,12.612163
2,10187291046,16.0,19.0,15.0,16.0,9.0,13.0,3.386247
3,10192359162,12.0,16.0,14.0,26.0,15.0,24.0,5.741661
4,10206519221,373.0,387.0,441.0,383.0,397.0,423.0,26.089589
...,...,...,...,...,...,...,...,...
4354,99938978285,671.0,641.0,773.0,772.0,775.0,713.0,58.523215
4355,99974311662,5.0,5.0,3.0,8.0,6.0,6.0,1.643168
4356,99976658299,903.0,901.0,1079.0,978.0,1053.0,980.0,73.853007
4357,99987905597,4.0,9.0,9.0,11.0,7.0,5.0,2.664583


In [38]:
month_all_no_orders_monthly = month_all_pandas[["merchant_abn", "no_orders_monthly_2", "no_orders_monthly_3",\
                 "no_orders_monthly_4", "no_orders_monthly_5",\
                 "no_orders_monthly_6", "no_orders_monthly_7",\
                 "no_orders_monthly_8"]]

month_all_no_orders_monthly["no_orders_monthly_std"] = month_all_no_orders_monthly.iloc[:,1:].std(axis = 1)
month_all_no_orders_monthly

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  month_all_no_orders_monthly["no_orders_monthly_std"] = month_all_no_orders_monthly.iloc[:,1:].std(axis = 1)


Unnamed: 0,merchant_abn,no_orders_monthly_2,no_orders_monthly_3,no_orders_monthly_4,no_orders_monthly_5,no_orders_monthly_6,no_orders_monthly_7,no_orders_monthly_8,no_orders_monthly_std
0,10023283211,1.000000,1.000000,1.000000,1.014085,1.000000,1.000000,1.007194,0.005559
1,10142254217,1.000000,1.000000,1.000000,1.007634,1.014388,1.006897,1.000000,0.005677
2,10187291046,,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,0.000000
3,10192359162,,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,0.000000
4,10206519221,1.000000,1.005362,1.002584,1.002268,1.013055,1.010076,1.011820,0.005177
...,...,...,...,...,...,...,...,...,...
4354,99938978285,1.000000,1.010432,1.007800,1.021992,1.019430,1.012903,1.014025,0.007338
4355,99974311662,,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,0.000000
4356,99976658299,1.032258,1.015504,1.014428,1.028730,1.015337,1.022792,1.023469,0.007035
4357,99987905597,,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,0.000000


# merchant_category

In [39]:
merchant_category = pd.read_csv("merchant_category.csv")
merchant_category

FileNotFoundError: [Errno 2] No such file or directory: 'merchant_category.csv'

In [None]:
categories = [tag for tag in merchant_category if tag not in ('name','merchant_abn','revenue_level','take_rate')]
merchants_by_categories = {}
for category in categories:
    merchants_by_categories[category]=merchant_category[merchant_category[category] != 0]['merchant_abn'].unique()

In [None]:
# tag별로 판매이력이 있는 상점 목록
merchants_by_categories