In [39]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import ByteType

In [40]:
spark = (
    SparkSession.builder.appName("preprocessing of taxi data")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.executor.memory", "2g")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

## Classification of merchants
### **Objective**: Predict product description, revenue level and take rate of the missing merchants
### **Classification pipeline**:
 0. Preliminary Data Analysis
 1. Data Engineering
  * Mostly done in ETL
    * Encode revenue level into integer value, e.g. 1, 2, 3, 4, 5
    * Clean the prod_desc (has been updated in ETL)
  * Need one curated dataset for modeling product description and one dataset for modeling revenue level and take rate
 2. Feature Engineering
  * Aggregate data to produce more useful features for modeling revenue level and take rate
  * Recommended features for prod_desc: dollar value, user id and order datetime
  * Recommended features for revenue level and take rate: monthly average revenue,  monthly average number of orders, monthly average number of distinct customers, average revenue per order, median revenue, variance of dollar amount
 3. Data Modeling
  * Choice of classification model: XGBClassifier, RandomForest, Naive Bayes(Last resort)
  * Choice of regression model: Linear regression, XGBregressor
  * Fitting and Tuning model to achieve optimal performance 
 4. Model Validation
 * Metrics:
    * Categorical(prod_desc and revenue_level): 
      * Accuracy
      * f1 score
    * Continuous(take_rate):
      * RMSE
 * Visualization:
    * Categorical:
      * learning curve
      * ROC curve
      * confusion matrix 
    * Continuous:
      * residuals vs. fitted value
 5. Model deployment
   * Use the prediction to impute missing information


### Preliminary Data Analysis
#### Due to the limitation of my device, the PDA is performed on transaction between 2021/02/28 and 2021/08/27

In [41]:
transaction_sdf = spark.read.parquet("../data/curated/transactions_20210228_20210827")

In [42]:
num_of_unknown_merchants = transaction_sdf.where(F.col("merchant_name").isNull()) \
                            .select(F.col("merchant_abn")).distinct().count()
num_of_order_from_unknown_merchants = transaction_sdf.where(F.col("merchant_name").isNull()) \
                            .select(F.col("merchant_abn")).count()

print(f"num_of_unknown_merchants = {num_of_unknown_merchants}" + "\n" +
      f"num_of_order_from_unknown_merchants = {num_of_order_from_unknown_merchants}")

num_of_unknown_merchants = 378
num_of_order_from_unknown_merchants = 149228


#### Checking if every missing merchant has at least one related known merchant, <u>i.e. if there is at least one customer who buys from an unknown merchant also buy from other merchants</u>

In [43]:
transaction_sdf.createOrReplaceTempView('transactions')

spark.sql("""
SELECT merchant_abn, user_id
FROM transactions
WHERE merchant_name IS NULL
""").createOrReplaceTempView('unknown_merchants')

spark.sql("""
SELECT *
FROM transactions
WHERE merchant_name IS NOT NULL
""").createOrReplaceTempView('orders_in_known_merchants')


joined_sdf = spark.sql("""
SELECT table1.merchant_abn AS unknown_merchant_abn, 
    COUNT(table1.merchant_abn) AS num_of_order_in_known_merchant,
    COUNT(DISTINCT table2.user_id) AS num_of_distinct_customers,
    COUNT(DISTINCT table2.merchant_abn) AS num_of_known_merchant,
    COUNT(DISTINCT table2.prod_desc) AS num_of_distinct_prod_desc,
    COUNT(DISTINCT table2.revenue_level) AS num_of_distinct_revenue_level
FROM unknown_merchants AS table1
LEFT JOIN orders_in_known_merchants AS table2 
ON table1.user_id=table2.user_id
GROUP BY table1.merchant_abn
""")

joined_sdf.limit(5)

                                                                                

unknown_merchant_abn,num_of_order_in_known_merchant,num_of_distinct_customers,num_of_known_merchant,num_of_distinct_prod_desc,num_of_distinct_revenue_level
24406529929,151201,1021,3233,25,5
56395390867,1287,9,587,24,5
28767881738,136,1,116,22,4
45925655949,3286,23,1000,24,5
87802246756,116776,778,3092,25,5


In [44]:
# Check if there is at least one customer for every unknown merchant who purchase from a known merchant
joined_sdf.filter(F.col("num_of_order_in_known_merchant") == 0)

                                                                                

unknown_merchant_abn,num_of_order_in_known_merchant,num_of_distinct_customers,num_of_known_merchant,num_of_distinct_prod_desc,num_of_distinct_revenue_level


In [45]:
# number of distinct product description
transaction_sdf.select(F.col("prod_desc")).distinct().orderBy(F.col("prod_desc")).count()

26

In [46]:
# take_rate is the independent of the dollar amount of an order
transaction_sdf.orderBy(F.col("merchant_abn")).limit(5)

                                                                                

user_id,merchant_abn,dollar_value,order_id,order_datetime,merchant_name,prod_desc,revenue_level,take_rate,consumer_name,address,state,postcode,gender,consumer_id
1389,10023283211,202.9580709315428,d79f7f72-1ee9-4d0...,2021-05-14,Felis Limited,"furniture, home f...",e,0.1,Riley Dominguez,99661 Bradley Unions,ACT,2614,Male,1025254
2000,10023283211,98.79635546902789,7122e4e3-1ff1-468...,2021-03-22,Felis Limited,"furniture, home f...",e,0.1,Joshua Anderson,60145 Mendoza Hig...,NSW,2442,Male,1120280
1495,10023283211,149.36202209710652,f29c1f95-491b-4b6...,2021-06-08,Felis Limited,"furniture, home f...",e,0.1,Amanda Galloway,27887 Maria Drive...,NSW,2207,Female,386014
1168,10023283211,354.4266795140579,5cfe6971-5b18-4e3...,2021-04-01,Felis Limited,"furniture, home f...",e,0.1,David Sanders,09930 Angela Cove...,NSW,2021,Undisclosed,388087
1759,10023283211,508.92006999805375,9c80bc74-e333-4e4...,2021-07-02,Felis Limited,"furniture, home f...",e,0.1,Cynthia Pierce,6474 Kristina Bur...,VIC,3024,Female,599489


In [47]:
# Take rate and revenue level are highly negatively correlated

revenue_dict = {'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5}

@F.udf(returnType=ByteType())
def revenue_level_converter(rl):
    if rl:
        return revenue_dict[rl]
    else:
        return None

pc = transaction_sdf.filter(F.col("revenue_level").isNotNull()) \
                    .withColumn("revenue_level_int", revenue_level_converter(F.col("revenue_level"))) \
                    .corr('revenue_level_int', 'take_rate')
print("The Pearson's correlation coefficient between revenue level and take rate is " + str(pc))



The Pearson's correlation coefficient between revenue level and take rate is -0.9520856896861268


                                                                                

### Modeling 

In [48]:
import pandas as pd
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import mean_squared_error, make_scorer
from sklearn.dummy import DummyClassifier, DummyRegressor
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LinearRegression
import xgboost as xgb
rmse = make_scorer(mean_squared_error, squared=False)

In [49]:
train_set = pd.read_csv("../data/curated/agg_transaction_train.csv")
pred_set = pd.read_csv("../data/curated/agg_transaction_pred.csv")

In [50]:
train_set = train_set.iloc[:, 1:] # drop merchant_abn
pred_set = pred_set.drop(columns=["merchant_abn", "take_rate", "revenue_level_int"]) # drop merchant_abn
train_set = train_set.fillna(0) # columns of stddev contains NULL if the merchant has only one transaction

In [51]:
X = train_set.drop(columns=["take_rate", "revenue_level_int"])
y_revenue_level = train_set["revenue_level_int"]
y_revenue_level = LabelEncoder().fit_transform(y_revenue_level)
y_take_rate = train_set["take_rate"]

#### Classification of revenue level

##### Baseline Classifier

In [52]:
dummy_clf = DummyClassifier(strategy="most_frequent")
cross_val_score(dummy_clf, X, y_revenue_level, cv=5).mean()

0.39884981226533167

##### XGB Classifier

In [53]:
xgb_clf = xgb.XGBClassifier(
 eta=0.1,
 n_estimators=1000,
 max_depth=3,
 min_child_weight=1,
 gamma=0.1,
 subsample=0.8,
 colsample_bytree=0.8,
 objective= 'multi:softmax',
 nthread=4,
 num_class = 5,
 seed=2022)

cross_val_score(xgb_clf, X, y_revenue_level, cv=5).mean()

0.3690904255319149

##### Random Forest Classifier

In [54]:
rf_clf = RandomForestClassifier(max_depth=5, random_state=2022)
cross_val_score(rf_clf, X, y_revenue_level, cv=5).mean()

0.4133554443053818

#### Regression of take rate

##### Baseline Regressor

In [55]:
dummy_reg = DummyRegressor(strategy="mean")
cross_val_score(dummy_reg, X, y_revenue_level, scoring = rmse, cv=5).mean()

0.9148727289998918

##### XGB regressor

In [56]:
xgb_reg = xgb.XGBRegressor(
    eta=0.1,
    n_estimators=1000,
    max_depth=3,
    min_child_weight=1,
    gamma=0.1,
    subsample=0.8,
    colsample_bytree=0.8,
    nthread=4,
    seed=2022)

cross_val_score(xgb_reg, X, y_revenue_level, scoring=rmse, cv=5).mean()


0.9839411849355182

##### Linear Regression

In [57]:
lin_reg = LinearRegression()
cross_val_score(lin_reg, X, y_revenue_level, scoring=rmse, cv=5).mean()

0.9133982666245025

#### Classification of prod_desc

In [58]:
# Naive Bayes Classification model is choosen because of its high efficiency
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, IndexToString
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [59]:
def col_indexer(cols, data):
    """
    return transformed data where col(s) is mapped to an index
    parameters:
               cols (list or other iterables): a list a columns 
               data (spark dataframe): a spark dataframe of target data
    """
    data = StringIndexer(inputCol=cols[0], outputCol=cols[0]+"_idx").fit(data).transform(data)
    if len(cols) > 1:
        return col_indexer(cols[1:], data)
    else:
        return data

def col_ohe(col, data):
    """
    return transformed data where col(s) is turned into a k-dimension vector, 
    k is the number of unique value of col
    parameters:
            cols (list or other iterables): a list a columns 
            data (spark dataframe): a spark dataframe of target data
    """
    data = OneHotEncoder(inputCol=col[0], outputCol=col[0]+"_vector").fit(data).transform(data)
    if len(col) > 1:
        return col_ohe(col[1:], data)
    else:
        return data

assembler = VectorAssembler(inputCols=[
    "user_id_vector", "dollar_value",
    "order_doy_vector",
], outputCol='features')



modified_transaction_sdf = transaction_sdf.filter(F.col("merchant_name").isNotNull()) \
                                                  .withColumn("order_doy", F.dayofyear("order_datetime"))
ohe_transaction_sdf = col_ohe(["user_id", "order_doy"], data=modified_transaction_sdf)
idx_transaction_sdf = col_indexer(["prod_desc"], data=ohe_transaction_sdf)
assembled_transaction_sdf = assembler.transform(idx_transaction_sdf).select("features", "prod_desc_idx")
assembled_transaction_sdf.limit(5)

                                                                                

22/10/04 23:26:48 WARN DAGScheduler: Broadcasting large task binary with size 1580.0 KiB
22/10/04 23:26:49 WARN DAGScheduler: Broadcasting large task binary with size 1580.0 KiB


features,prod_desc_idx
"(24321,[16,24081,...",3.0
"(24321,[16,24081,...",6.0
"(24321,[16,24081,...",10.0
"(24321,[16,24081,...",17.0
"(24321,[16,24081,...",2.0


In [60]:
train_set, test_set = assembled_transaction_sdf.randomSplit([0.8, 0.2], seed=2022)
nb = NaiveBayes(smoothing=1.0, 
                modelType="multinomial", 
                labelCol="prod_desc_idx").fit(train_set)

prediction = nb.transform(test_set)

evaluator = MulticlassClassificationEvaluator(labelCol="prod_desc_idx",
                                              predictionCol="prediction",
                                              metricName="accuracy")

accuracy = evaluator.evaluate(prediction)
print("Test set accuracy = " + str(accuracy))

22/10/04 23:26:49 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB




22/10/04 23:26:56 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB


                                                                                

22/10/04 23:26:57 WARN DAGScheduler: Broadcasting large task binary with size 7.3 MiB




Test set accuracy = 0.10486449201320018


                                                                                

### Summary
#### Since all the models have poor performance in terms of accuracy/RMSE, it is not feasible to deploy them in an imputation process. From our perspectives, tuning the model will be ultimately a waste of time because it will only refine our model instead of improving it significantly. <u>Therefore, we decide to leave these merchants as unknown for now, and we will try to use clustering method to impute their take rates in the future.</u>.