# w261 - Project - Criteo Clickthrough
#### Project Team:
- Yekun Wang
- Anand Eyunni
- Dinesh Achuthan
- Miroslava Walekova

#### Contents
 0. Notebook Set Up 
 1. Introduction
 2. Exploratory Data Analysis
 3. Preprocessing Numeric Variables
 4. Algorithm Theory
 5. Feature Engineering and Baseline Model
 6. Model Training and Selection 
 7. Conclusions
 8. Course Concepts
 
#### Executive Summary

Criteo is a marketing solution that enables online businesses to follow up visitors who have left a website without a transaction by using customized banners as ads. The solution operates on a pay per click/cost per click basis. Advertisers only pay for the customers that click on an ad banner. In this particular scenario, the problem we are trying to solve is to predict the likelihood that a user will click on a given ad on a page. The prediction is done based on features generated from traffic logs along with click labels.

The Criteo dataset has approximately more than 46 million records and contains 26 categorical variables and 13 numeric variables. We take the following steps to prepare our solution - data analysis, data preparation, feature engineering, model training and hyperparamter tuning. We perform exploratory data analysis on the given data and realize that there are missing, null, negative and hashed values. Without columnar description, it is hard to make meaningful assumptions about backfilling data. We perform downsampling and upsampling to maintain balance between the two outcomes. We also test for correlation between variables (and log transformed variables) and realized that multicolinearity was not an issue. We also perform imputation using column mean because it worked better than median imputation. 

We then begin baseline modeling using logistic regression and computed the confusion matrix, ROC curve, and Precision vs Recall. We also explore normalization of numeric variables in the baseline model to search for improvement in the model. For categorical variables, we impute the missing values, and perform binning, hashing, one-hot-encoding and chi-squared feature selection. There is no significant impact on Precision, Recall, AUC or Logloss. Rebalancing the training dataset has an impact on all four metrics. The next model is a Random Forest with cross validation and evaluation of model performance. It performs less well than the logistic regression model. A third model in the form of Gradient Boosted Trees also performed less well than logistic regression despite hyper parameter tuning. Eventually, we settle on the logistic regression for our click predictions based on our evaluation metrics and kaggle leaderboard logloss metric.

# 1. Notebook Set-Up

In [3]:
# IBM DagCrossValidator set up
# %sh 
# git clone https://github.com/Walekova/PipelineTuning.git

In [4]:
#%sh
#ls -alh /databricks/driver/PipelineTuning/

In [5]:
import sys

# Add the path to system
#sys.path.append('/databricks/driver/')
#sys.path.append('/databricks/driver/PipelineTuning/')
#sys.path.append('/databricks/driver/PipelineTuning/pipeline_tuning.py')

In [6]:
# Imports

# Utilities
import re
import time
import ast
import os
import tarfile
import itertools

# Transformation
import numpy as np
import pandas as pd
from functools import reduce

# Plotting
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
import seaborn as sns
%matplotlib inline

# Spark SQL Imports
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import col, count as sparkcount, when, lit
from pyspark.sql import DataFrame

# Spark ML imports - pipeline components
from pyspark.ml.feature import Imputer, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.feature import FeatureHasher, OneHotEncoderEstimator, ChiSqSelector
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Spark ML imports - modelling metrics
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import GBTClassifier
# from pyspark.mllib.evaluation import MulticlassMetrics ### reguired for F1 score calculation

# Spark ML imports - models
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier

# Custom IBM Codait pipeline tuning library
# from pipeline_tuning import DagCrossValidator ### -> build for python 2.7

In [7]:
# Get user home directory
username = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user')
userHome = 'dbfs:/user/' + username
projectPath = userHome + "/project/" 

# Extract source data from tar file
projectPathOpen = '/dbfs' + projectPath.split(':')[-1] 
dbutils.fs.mkdirs(projectPath)
source_data = '/dbfs/mnt/mids-w261/data/datasets_final_project/dac.tar.gz'
tar = tarfile.open(source_data, "r:gz")
tar.extractall(projectPathOpen)
tar.close()

# Read and convert Train Data into parquet format
dbutils.fs.rm(projectPath+"/dataTrain.parquet", recurse=True)

schema = StructType([\
      StructField("label", FloatType(), True),\
      StructField("I1", FloatType(), True),\
      StructField("I2", FloatType(), True),\
      StructField("I3", FloatType(), True),\
      StructField("I4", FloatType(), True),\
      StructField("I5", FloatType(), True),\
      StructField("I6", FloatType(), True),\
      StructField("I7", FloatType(), True),\
      StructField("I8", FloatType(), True),\
      StructField("I9", FloatType(), True),\
      StructField("I10", FloatType(), True),\
      StructField("I11", FloatType(), True),\
      StructField("I12", FloatType(), True),\
      StructField("I13", FloatType(), True),\
      StructField("C1", StringType(), True),\
      StructField("C2", StringType(), True),\
      StructField("C3", StringType(), True),\
      StructField("C4", StringType(), True),\
      StructField("C5", StringType(), True),\
      StructField("C6", StringType(), True),\
      StructField("C7", StringType(), True),\
      StructField("C8", StringType(), True),\
      StructField("C9", StringType(), True),\
      StructField("C10", StringType(), True),\
      StructField("C11", StringType(), True),\
      StructField("C12", StringType(), True),\
      StructField("C13", StringType(), True),\
      StructField("C14", StringType(), True),\
      StructField("C15", StringType(), True),\
      StructField("C16", StringType(), True),\
      StructField("C17", StringType(), True),\
      StructField("C18", StringType(), True),\
      StructField("C19", StringType(), True),\
      StructField("C20", StringType(), True),\
      StructField("C21", StringType(), True),\
      StructField("C22", StringType(), True),\
      StructField("C23", StringType(), True),\
      StructField("C24", StringType(), True),\
      StructField("C25", StringType(), True),\
      StructField("C26", StringType(), True)])      

txtTrain = spark.read.format("csv")\
.option("header", "false")\
.option("delimiter", "\t")\
.schema(schema)\
.load(projectPath+"train.txt")

txtTrain.write.format("parquet").save(projectPath+"/dataTrain.parquet")

# Read and convert Test Data into parquet format

dbutils.fs.rm(projectPath+"dataTest.parquet", recurse=True)

schema = StructType([\
      StructField("I1", FloatType(), True),\
      StructField("I2", FloatType(), True),\
      StructField("I3", FloatType(), True),\
      StructField("I4", FloatType(), True),\
      StructField("I5", FloatType(), True),\
      StructField("I6", FloatType(), True),\
      StructField("I7", FloatType(), True),\
      StructField("I8", FloatType(), True),\
      StructField("I9", FloatType(), True),\
      StructField("I10", FloatType(), True),\
      StructField("I11", FloatType(), True),\
      StructField("I12", FloatType(), True),\
      StructField("I13", FloatType(), True),\
      StructField("C1", StringType(), True),\
      StructField("C2", StringType(), True),\
      StructField("C3", StringType(), True),\
      StructField("C4", StringType(), True),\
      StructField("C5", StringType(), True),\
      StructField("C6", StringType(), True),\
      StructField("C7", StringType(), True),\
      StructField("C8", StringType(), True),\
      StructField("C9", StringType(), True),\
      StructField("C10", StringType(), True),\
      StructField("C11", StringType(), True),\
      StructField("C12", StringType(), True),\
      StructField("C13", StringType(), True),\
      StructField("C14", StringType(), True),\
      StructField("C15", StringType(), True),\
      StructField("C16", StringType(), True),\
      StructField("C17", StringType(), True),\
      StructField("C18", StringType(), True),\
      StructField("C19", StringType(), True),\
      StructField("C20", StringType(), True),\
      StructField("C21", StringType(), True),\
      StructField("C22", StringType(), True),\
      StructField("C23", StringType(), True),\
      StructField("C24", StringType(), True),\
      StructField("C25", StringType(), True),\
      StructField("C26", StringType(), True)])   

txtTest = spark.read.format("csv")\
.option("header", "false")\
.option("delimiter", "\t")\
.schema(schema)\
.load(projectPath+"test.txt")

txtTest.write.format("parquet").save(projectPath+"/dataTest.parquet")

# secondary dataframes
train_parquet = spark.read.parquet(projectPath+"dataTrain.parquet")
test_parquet = spark.read.parquet(projectPath+"dataTest.parquet")

# check directory
display(dbutils.fs.ls(projectPath))

Raw data files were provided to us as a .tar.gz package. We first extracted all files from the .tar.gz package and converted data files in to parquet files. We chose parquet as the data format because its columnar fomat results in high performance and efficiency in both storage and processing. It is especially useful for later EDA stages when we have to do a column lookup to examine each feature individually.

In [9]:
# Initialise Spark
sc = spark.sparkContext
sqlContext = SQLContext(sc)

# Clear cache - helps when rerunning the workbook 
sqlContext.clearCache()

# 2. Introduction

### 2.1 Document Purpose

Click through rate (CTR) is a metric that measures the number of clicks that a publisher receives on the ads per number of impressions. Machine learning plays a central role in computing the expected CTR of ad impressions and computing advertising campaign ROI. 

The following paper:
- outlines the methodology applied to feature pre-processing and model evaluation techniques
- defines a basic machine learning pipeline to enable execution and evaluation of individual solution experiments
- introduces several machine learning modelling options and evaluates them using pre defined metrics

### 2.2 Dataset Contents

The train and test data combined consists of more than 46 million rows. Each row corresponds to a display ad served by Criteo. Criteo is a company focused on bringing machine-learning technology, data and performance at scale together to drive measurable ROI.

Positive (clicked) and negatives (non-clicked) examples have both been subsampled at different rates in order to reduce the dataset size. The examples are chronologically ordered. The training data consists of a portion of Criteo's traffic over a period of 7 days and the test set is computed in the same way as the training set but for events on the day following the training period.

Data Fields:
- Label - Target variable that indicates if an ad was clicked (1) or not (0).
- I1-I13 - A total of 13 columns of integer features.
- C1-C26 - A total of 26 columns of categorical features hashed onto 32 bits.

### 2.3 Problem Statement

- Given the available information for a single ad impression, what is the likelihood of it results in a positive (clicked) event? 
- Which features are useful in predicting a click on a display ad?
- Which machine learning model is model suitable to predict clickthrough on an display ad for the given data scale? 

### 2.4 Evaluation of Metrics

We have decided to use a collection of metrics to evaluate our solution to the Criteo problem. This is because each of these metrics provides us with a different insight into the observed model output.

#### LogLoss
 
Our initial intention was to use logloss to calculate the binary cross entropy. Log Loss takes into account the uncertainty of the prediction based on how much it varies from the actual label - entropy. However it is sensitive to imbalanced datasets. 

$$ LogLoss = - \frac{1}{n} \sum_{i=1}^{n}[y_i \times log(\hat{p_i}) + (1 - y_i) \times log(1 - \hat{p_i})]  $$ <br>

#### Area Under the Curve (AUC)

AUC maximizes the model's ability to discriminate between classes whilst the logloss penalizes the divergency between actual and estimated probabilities. While we cannot say that a model maximizing AUC means minimized log loss. Whether a model minimizing log loss corresponds to maximized AUC will rely heavily on the context; class separability, model bias.

AUC is scale-invariant. It measures how well predictions are ranked, this makes it suitable for click advertising campaign. AUC is a good metric to use since the predictions ranked by probability is the order in which a list of adverts can be displayed.

As model maximising AUC may not lead to a model minimising Logloss we have decided to use AUC as our model evaluation metric, however also calculate Logloss as a control metric to evaluate our progress in building our model.

#### Confusion Matrix, Precision & Recall

In addition to AUC and Logloss we have decided to capture the confusion matrix, precision and recall because of their interpretability characteristics. 

#### Other Metrics Considered

Other metrics that could be used to evaluate results for a CTR modelling problem are F Score and Payoff matrix.

__Adjusted F Score__

Regular F1 score gives equal weight to to precision and recall. However in the case of click through rate this woud not be appropriate. We would need to include domain knowledge in our evaluation, to decide the appropriate weighting for recall or precision.

Weighted F1 metric, where beta manages the tradeoff between precision and recall:

$$ F_{beta} = (1 + \beta^2) \times \frac{precision \times recall}{\beta^2 * precision + recall} $$

__Payoff__

Confusion matrix can be used to calculate payoffs.

$$ Payoff = a \times TP + b \times FP + c \times FN + d \times TN  $$
a - Revenue from clicks <br>
b - Cost of clicks that do not result in revenue <br>
c - Opportunity cost <br>
d - No cost or revenue <br>

We do not however have sufficient domain knowledge and  information about the source data industry hence we are unable to make any assumptions in relation to calculation of the payoff matrix.
This is because in advertising, majority of the charging models are set at cost per impression rather than cost per click.

# 3. Exploratory Data Analysis

The following two SQL statements provide a glimpse into the raw data sets. Notice that There are many NULL values across both numeric and categorical variables.

In [13]:
# Load source data into SQL table for queries
trainPath = projectPath+'dataTrain.parquet/'
testPath = projectPath+'dataTest.parquet/'
spark.sql("DROP TABLE IF EXISTS trainTable")
spark.sql("DROP TABLE IF EXISTS testTable")
spark.sql("CREATE TEMPORARY TABLE trainTable USING parquet OPTIONS (path '{}')".format(trainPath))
spark.sql("CREATE TEMPORARY TABLE testTable USING parquet OPTIONS (path '{}')".format(testPath))

Through EDA we intend to analyze these features for:
1.	Missing values
2.	Erroneous and or outliers
3.	Understand the unit representation of the data to enable the decision on normalization or standardization in the feature engineering phase
4.	Constant features ( zero variance in a feature across the data set)

In [15]:
# View raw training data
%sql select * from trainTable limit 10;

label,I1,I2,I3,I4,I5,I6,I7,I8,I9,I10,I11,I12,I13,C1,C2,C3,C4,C5,C6,C7,C8,C9,C10,C11,C12,C13,C14,C15,C16,C17,C18,C19,C20,C21,C22,C23,C24,C25,C26
0.0,28.0,3.0,12.0,44.0,15.0,43.0,28.0,43.0,44.0,1.0,1.0,0.0,32.0,05db9164,2f659110,e5cdf789,c2fcecf6,4cf72387,,a4b6564e,1f89b562,a73ee510,3b08e48b,a04db730,72ac7802,c66b30f8,b28479f6,daf40beb,f1a11ae6,e5ba7672,c84cfab2,21ddcdc9,5840adea,f8709c5b,,32c7478e,38be899f,f55c04b6,56009c93
0.0,,-1.0,,,,,0.0,4.0,4.0,,0.0,,,be589b51,8084ee93,02cf9876,c18be181,25c83c98,7e0ccccf,08383675,5b392875,7cc72ec2,3b08e48b,727af3e2,8fe001f4,49fe3d4e,07d13a8f,422c8577,36103458,2005abd1,52e44668,,,e587c466,,be7c41b4,3b183c5c,,
0.0,1.0,260.0,128.0,1.0,0.0,0.0,8.0,11.0,11.0,1.0,6.0,,0.0,68fd1e64,38a947a1,8512d54d,7031bb66,25c83c98,7e0ccccf,c9f171f9,5b392875,a73ee510,e4706565,755e4a50,e2163351,5978055e,b28479f6,46ed0b3c,beef16d6,d4bb7bd8,2c6cb693,,,4645d72c,,bcdee96c,b258af68,,
1.0,8.0,0.0,7.0,12.0,2.0,7.0,9.0,3.0,29.0,1.0,2.0,,6.0,5bfa8ab5,287130e0,95543f64,e326e2dc,43b19349,13718bbd,6284da2d,0b153874,a73ee510,0a263d38,5874c9c9,67f9c091,740c210d,1adce6ef,310d155b,a31938ae,e5ba7672,891589e7,4a237258,b1252a9d,fc87b483,,bcdee96c,3fdb382b,ea9a246c,49d68486
0.0,0.0,44.0,56.0,1.0,1751.0,5.0,17.0,3.0,57.0,0.0,7.0,0.0,1.0,68fd1e64,287130e0,35e8c904,38c6a2ff,25c83c98,7e0ccccf,fed3cb1d,0b153874,a73ee510,175d6c71,b7094596,ca9f3db8,1f9d2c38,07d13a8f,10040656,d8daf836,e5ba7672,891589e7,21ddcdc9,5840adea,0bf4a9b7,ad3062eb,32c7478e,3fdb382b,ea9a246c,49d68486
0.0,2.0,7.0,35.0,10.0,82.0,31.0,41.0,20.0,289.0,1.0,13.0,0.0,10.0,9a89b36c,d8fc04df,b009d929,c7043c4b,25c83c98,7e0ccccf,bb3b7ab9,0b153874,a73ee510,3b08e48b,90b202b5,3563ab62,3a9dafb8,07d13a8f,33e5b3c4,b688c8cc,8efede7f,cbadff99,21ddcdc9,5840adea,2754aaf1,,c7dc6720,3b183c5c,010f6491,9a483882
0.0,,0.0,56.0,1.0,4288.0,,0.0,28.0,125.0,,0.0,,24.0,73bd393d,403ea497,2cbec47f,3e2bfbda,25c83c98,,57b4bd89,37e4aa92,a73ee510,3b08e48b,8ca164ab,21a23bfe,ddd66ce1,07d13a8f,e3209fc2,587267a3,776ce399,a78bd508,21ddcdc9,a458ea53,c2a93b37,,423fab69,1793a828,e8b83407,2fede552
1.0,3.0,1.0,,,73.0,1.0,3.0,1.0,1.0,1.0,1.0,,,05db9164,bdaedcf5,ebe4edfa,ebc42d91,25c83c98,7e0ccccf,366a171d,5b392875,a73ee510,87c7319e,1ed3ae25,e8c9d3fa,aaa80b97,07d13a8f,8165b5cf,2c52502a,07c540c4,7d461236,,,f8c88eda,c9d4222a,3a171ecb,1b256e61,,
1.0,1.0,9.0,4.0,0.0,5.0,0.0,1.0,0.0,0.0,1.0,1.0,,0.0,68fd1e64,bc478804,c746b84d,13508380,4cf72387,fe6b92e5,9c8ed289,0b153874,a73ee510,d3cf8a36,a7b606c4,bfbf389c,eae197fd,07d13a8f,0af7c64c,1e4dde7c,1e88c74f,65a2ac26,55dd3565,b1252a9d,a3e8e99b,,3a171ecb,45ab94c8,001f3601,c84c4aec
0.0,,1.0,3.0,,,,0.0,0.0,5.0,,0.0,,,05db9164,09e68b86,559d9a99,c160a680,25c83c98,,d7087b39,f504a6f4,7cc72ec2,3b08e48b,786751d8,1664a565,0b7f85d0,64c94865,91126f30,181a367a,2005abd1,5aed7436,5e1505a3,a458ea53,cc3b7265,,32c7478e,d931d13b,e8b83407,01702271


In [16]:
# view raw test data
%sql select * from testTable limit 10;

I1,I2,I3,I4,I5,I6,I7,I8,I9,I10,I11,I12,I13,C1,C2,C3,C4,C5,C6,C7,C8,C9,C10,C11,C12,C13,C14,C15,C16,C17,C18,C19,C20,C21,C22,C23,C24,C25,C26
10.0,15.0,4.0,7.0,348.0,18.0,66.0,30.0,520.0,2.0,6.0,,7.0,05db9164,1cfdf714,25b53c72,13508380,f1d40cbe,,90a2c015,0b153874,a73ee510,466a312b,66bfc5e8,4c70d733,3af886ff,07d13a8f,e439dd9b,2280219e,e5ba7672,9df49ecd,55dd3565,b1252a9d,1429d5ca,ad3062eb,3a171ecb,45ab94c8,cb079c2d,c84c4aec
,40.0,25.0,7.0,10532.0,85.0,171.0,8.0,74.0,,2.0,,7.0,5a9ed9b0,1cfdf714,e626c3c7,0cd3e43c,25c83c98,7e0ccccf,7195046d,0b153874,a73ee510,61283720,4d8549da,68f0f743,51b97b8f,b28479f6,d345b1a0,bb9de190,e5ba7672,e88ffc9d,21ddcdc9,b1252a9d,e8f657eb,,bcdee96c,544f6090,cb079c2d,f2e052c3
0.0,0.0,6.0,,7983.0,7.0,44.0,1.0,5.0,0.0,3.0,1.0,,be589b51,1cfdf714,560e18db,02ba716f,25c83c98,3bf701e7,84a150ff,0b153874,a73ee510,3b08e48b,e08f36b2,3b4996ae,422ba909,1adce6ef,f3002fbd,636770c4,27c07bd6,e88ffc9d,790f389c,a458ea53,f050a9cf,c9d4222a,423fab69,ebd002c5,cb079c2d,ed7a66ba
0.0,2.0,4.0,2.0,1481.0,69.0,3.0,44.0,228.0,0.0,2.0,0.0,2.0,75ac2fe6,1cfdf714,aa585663,c7577387,25c83c98,fe6b92e5,a3579031,a25968f2,a73ee510,1a5ba63e,1054ae5c,0ec848db,d7ce3abd,b28479f6,d345b1a0,eb8a8ef3,e5ba7672,e88ffc9d,efa3470f,a458ea53,c3ec1415,,bcdee96c,904cf83e,cb079c2d,09b3050a
0.0,1672.0,10.0,1.0,4623.0,434.0,4.0,25.0,236.0,0.0,1.0,,44.0,05db9164,8084ee93,d032c263,c18be181,25c83c98,7e0ccccf,5e64ce5f,1f89b562,a73ee510,4f1e4025,8b94178b,dfbb09fb,025225f2,b28479f6,b2ff8c6b,84898b2a,e5ba7672,52e44668,,,0014c32a,,32c7478e,3b183c5c,,
0.0,1.0,45.0,0.0,5617.0,320.0,17.0,4.0,95.0,0.0,1.0,,3.0,39af2607,4f25e98b,3fa80e7b,7c246ddc,25c83c98,fbad5c96,0b28577f,1f89b562,a73ee510,2f45a7d3,ccecf8bb,0ec9545e,e853b835,64c94865,40e29d2a,c0b3c99f,3486227d,7ef5affa,55dd3565,a458ea53,d99b40a7,,3a171ecb,f9f8a955,9d93af03,d509cc5d
1.0,-1.0,9.0,11.0,921.0,58.0,57.0,34.0,327.0,1.0,9.0,0.0,11.0,1464facd,09e68b86,0b1059dc,82aab4aa,25c83c98,,b87f4a4a,0b153874,a73ee510,e70742b0,319687c9,79323ec3,62036f49,07d13a8f,36721ddc,522dccf9,e5ba7672,5aed7436,f30f7842,a458ea53,89d51bd1,,32c7478e,e80b80d5,e8b83407,df2e3596
,-1.0,1.0,1.0,15007.0,1.0,9.0,1.0,1.0,,2.0,,1.0,5a9ed9b0,09e68b86,c94341ca,40a048a5,25c83c98,,b87f4a4a,0b153874,a73ee510,e70742b0,319687c9,85ade081,62036f49,07d13a8f,36721ddc,4629ae0b,e5ba7672,5aed7436,b8868b4a,a458ea53,44f7c960,,32c7478e,c7e31a5c,e8b83407,dcc9c5d3
0.0,0.0,2.0,,1729.0,1.0,1.0,1.0,1.0,0.0,1.0,,,05db9164,09e68b86,6bed6a88,a058fdb5,25c83c98,,f14f1abf,985e3fcb,a73ee510,1f20471e,7b5deffb,7658ab1c,269889be,07d13a8f,36721ddc,55fb7803,d4bb7bd8,5aed7436,449c7d20,a458ea53,f4b7b02b,,32c7478e,63863fe2,e8b83407,4cc52399
,7.0,4.0,1.0,11638.0,,0.0,1.0,31.0,,0.0,0.0,1.0,05db9164,1cfdf714,dfd5584d,c60f65b0,25c83c98,3bf701e7,a2bea6d8,0b153874,a73ee510,a30a7b4a,72a65bcc,6fcc4b21,b8fee572,f862f261,0e1257cc,4cbb4c83,e5ba7672,e88ffc9d,7c629f16,b1252a9d,3be3a723,ad3062eb,55dd3565,3fdb382b,cb079c2d,49d68486


In [17]:
# Separate out nummeric and categorical as iCols and cCols
# Define Column Types
iCols = [i for i in train_parquet.columns if re.search('I',i)]
cCols = [i for i in train_parquet.columns if re.search('C',i)]

In [18]:
# Get basic statistics and display as pandas dataframe
trainDF = train_parquet.select(iCols)
trainDF = trainDF.describe()
trainPDF = trainDF.toPandas().set_index("summary").transpose()
trainPDF

summary,count,mean,stddev,min,max
I1,25047061,3.5024133170754044,9.42907640710507,0.0,5775.0
I2,45840617,105.84841979766546,391.45782268707086,-3.0,257675.0
I3,36001170,26.913041020611274,397.9725830227336,0.0,65535.0
I4,35903248,7.322680248873305,8.793230712645805,0.0,969.0
I5,44657500,18538.991664871523,69394.60184622345,0.0,23159456.0
I6,35588289,116.06185085211598,382.5664493712397,0.0,431037.0
I7,43857751,16.333130032135028,66.04975524511718,0.0,56311.0
I8,45817844,12.517042137556713,16.688884567787543,0.0,6047.0
I9,43857751,106.1098234380509,220.2830939864804,0.0,29019.0
I10,25047061,0.6175294977722137,0.6840505553977029,0.0,11.0


We do not know the actual column names in these data sets. We do see columns with negative values and/or null values in numeric columns. Without knowing the actual column and what type of business data is stored, it is hard to assume to clean up the data. At this point we intend to leave the data as is if it is negative and convert all the integer column as float type. From the above table, it is clear that most or all of the integer columns have some form of standard deviation which indicates none of the columns are constant columns but there are few quasi-constant like I10, I11 which we will explore in detail in later sections

In [20]:
# Describe categorical columns
# Number of valid values in each categorical column
catNonMissing = train_parquet.agg(*(count(col(c)).alias(c) for c in cCols))

# Percent of NULL or NaN values in each categorical column
nRows = train_parquet.count()
catNullPercent = train_parquet.select([(count(when(isnan(c) | col(c).isNull(), c))/nRows).alias(c) for c in cCols])

# Number of unique values in each categorical variable:
catUnique = train_parquet.agg(*(countDistinct(col(c)).alias(c) for c in cCols))

# Explore the number of null records for categorical features
def unionAll(*dfs):
    """
    A function to join/ merge dataframes
    """
    return reduce(DataFrame.unionAll, dfs)

# Create categorical statistics and convert to pandas dataframe for viewing
CatStats = unionAll(catNonMissing, catNullPercent, catUnique)
categoricalFeatureStats = CatStats.toPandas()
categoricalFeatureStats = categoricalFeatureStats.T
categoricalFeatureStats.columns = ['Non Null Record count','Percentage of Null records', 'Number of Unique records']
categoricalFeatureStats

Unnamed: 0,Non Null Record count,Percentage of Null records,Number of Unique records
C1,45840617.0,0.0,1460.0
C2,45840617.0,0.0,583.0
C3,44281144.0,0.034019,10131226.0
C4,44281144.0,0.034019,2202607.0
C5,45840617.0,0.0,305.0
C6,40299992.0,0.120867,23.0
C7,45840617.0,0.0,12517.0
C8,45840617.0,0.0,633.0
C9,45840617.0,0.0,3.0
C10,45840617.0,0.0,93145.0


The output shows we have 26 string variables. There are missing values, negative values, hashed values and null values. All the string columns are hashed and hence no meaningful information can be extracted. At this point we are going to assume all the string variables as categorical variables. The percentage of null values across the columns ranges from 0% to 45%. We are looking at approximately 46 million records. With this sheer volume the current categorical column needs deeper analysis into how many can be truly considered as categorical. In this table we have also captured the number of unqiue values available in each categorical column. Under normal circumstancs we expect lower number of unique values for categorical columns. This criteo data set presents unique challenge with the categorical columns as we see some columns with several hundred or sometimes even thousands of unique categorical columns. We will have to further evaluate column by column on how to hanlde this type of scenario. Outcome variable based binning and variance threshold analysis are couple of things we will do next in the featture engineering to understand more about these categorical features and will make appropriate decisions.

The target variable for this data set is a binary response of value either 0 or 1. Naturally, 1 means a click upon the ad impression and a 0 means the user failed to click upon impression. It's important to notice that the two classes are not balanced. The ratio is about 1:3 between label 1 and label 0, as shown by the histogram below. We need to look into this imbalance and ensure our training data is fully balanced so that the model has equal learning capabilites to learn about both click and non-click prediction features. Later we will do down-sampling and up-sampling technique to handle this imbalance. The good news is that all the records have either 0 or 1 and there ar no missing or null labels.

In [23]:
# Describe target column
train_parquet.groupby('label').count().show()
print("label is Null count: "+ str(train_parquet.where(col("label").isNull()).count()))
print("label is NaN count: "+ str(train_parquet.where(isnan(col("label"))).count()))

In [24]:
# Distribution plot of response variable
y = train_parquet.select('label').toPandas()
display(sns.countplot(x = y['label'].to_numpy()).set_title('Response Variable in Train'))

In [25]:
# Convert continuous variables to dataframe
trainiColsDF = train_parquet.select(iCols).na.drop().sample(False, 0.05).toPandas()

# Plot continuous variables
fig = plt.subplots(figsize = (20,20))
i = 1
for header in trainiColsDF.columns:
  plt.subplot(5,3,i)
  x = trainiColsDF[[header]]
  sns.distplot(x).set_title(header)
  plt.plot()
  i +=1

With the exception of I8 and I10 Most of the numeric variables are highly right skewed. In order to satisfy linear regression's normality assumption, we decided to log transform all numeric variables except for I8 and I10.

In [27]:
# Compute the correlation matrix
corr = trainiColsDF.corr()

# Generate a mask for the upper triangle
mask = np.triu(np.ones_like(corr, dtype=np.bool))

# Set up the matplotlib figure
f, ax = plt.subplots(figsize=(11, 9))

# Generate a custom diverging colormap
cmap = sns.diverging_palette(220, 10, as_cmap=True)

# Draw the heatmap with the mask and correct aspect ratio
sns.heatmap(corr, mask=mask, cmap=cmap, vmax=.3, center=0,
            square=True, linewidths=.5, cbar_kws={"shrink": .5})

The correlation plot among numeric variables suggest that the most extreme correlation among numeric variables is around -0.40, which suggest that multicollinearity may not be such a big deal for this data set.

# 4. Feature Pre-Processing

In [30]:
trainDataset = train_parquet
testDataset = test_parquet

# Imputing numeric
num_imputer = Imputer(
    strategy='mean',
    inputCols=['I1','I2','I3', 'I4', 'I5', 'I6', 'I7', 'I8', 'I9', 'I10', 'I11', 'I12', 'I13'], 
    outputCols=['I1','I2','I3', 'I4', 'I5', 'I6', 'I7', 'I8', 'I9', 'I10', 'I11', 'I12', 'I13']
)
# Create Numerical feature Impute Transformer
model = num_imputer.fit(trainDataset)

# Impute train data 
trainDataset = model.transform(trainDataset)

# Impute test data with the output from train dataset
testDataset = model.transform(testDataset)

For numeric variables, we chose to impute NULL values with the columns mean. Mean imputation works better than median imputation because some columns could have NULL as the median value.

In [32]:
# Log transform numeric variables
# set a small epsilon to avoid log(0) singularities
epsilon = 4.0

# transform listed numeric columns one-by-one
for col_name in ['I1','I2','I3', 'I4', 'I5', 'I6', 'I7', 'I9','I11', 'I12', 'I13']:  # I8 and I10 were not transformed
    trainDataset = trainDataset.withColumn(col_name, round(log(col(col_name) + epsilon),4))
    testDataset = testDataset.withColumn(col_name, round(log(col(col_name) + epsilon),4))

In order to apply log-transformation, we had to first make sure that all numeric features are positive values. Column I2 is the only numeric feature that contains negative values and has a minimum of -3. Therefore, we applied a modified version of the log-transformation: 

$$ x_{new} = \log(x + 4)$$

In [34]:
# sanity check for null values resulting from log transformation
trainDataset.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in ['I1','I2','I3', 'I4', 'I5', 'I6', 'I7', 'I8', 'I9', 'I10', 'I11', 'I12', 'I13']]).show()

In [35]:
# Convert continuous variables to dataframe
trainiColsDF = trainDataset.select(iCols).sample(False, 0.05).toPandas()

# Plot continuous variables

fig = plt.subplots(figsize = (20,20))
i = 1
for header in trainiColsDF.columns:
  plt.subplot(5,3,i)
  x = trainiColsDF[[header]]
  sns.distplot(x).set_title(header)
  plt.plot()
  i +=1

In [36]:
# Compute the correlation matrix
corr = trainiColsDF.corr()

# Generate a mask for the upper triangle
mask = np.triu(np.ones_like(corr, dtype=np.bool))

# Set up the matplotlib figure
f, ax = plt.subplots(figsize=(11, 9))

# Generate a custom diverging colormap
cmap = sns.diverging_palette(220, 10, as_cmap=True)

# Draw the heatmap with the mask and correct aspect ratio
sns.heatmap(corr, mask=mask, cmap=cmap, vmax=.3, center=0,
            square=True, linewidths=.5, cbar_kws={"shrink": .5})

# 5. Algorithm Theory

The problem statement of predicting whether an ad impression results in a click event (positive) or a no-click event (negtive) lands itself naturally in the domain of classification. Essentially, each observation is an ad impression and the outcome of the ad impression is denoted with a 1 or a 0 for the case of positive outcome (click) or the case of negative outcome (no-click), respectively. The data set included a total of 39 features - 13 numeric and 26 categorical. Given the feature information for a single obeservation, the chosen machine learning algorithm should be able to provide a decision of whether it's likely to result in a positive outcome or negative outcome. 

To tackle this binary classification problem, the team has chosen to evaluate three machine learning classification algorithms: 
0. Logistic Regression
1. Random Forest Classification
2. Gradient Boosted Tree Classification

__Logistic Regression__

The mathematical formulation of Logistic Regression is as follows: 
$$ \log( \frac{p_i}{1-p_i} ) = \beta_0 + \beta_1 X_1 + ... + \beta_n X_n $$ <br>

Where \\(p_i\\) represents the probability of a positive outcome for the ith obeservation. The left-hand-side of the equation, \\(\log(\frac{p_i}{1-p_i})\\), also called the log-odds, is assumed to have a linear relationship with the predictors. The \\(\beta_i\\) coefficients are the parameters of the model, which will be estimated training data set. 

Intuitively speaking, the linear model is to estimate the log of the ratio between the probability resulting in a postive outcome and the probability resulting in a negative outcome. Through cross-validation on the training set, the best model parameters (\\(\beta_i\\)) will be selected to use to predict on the final test data set. During the predicting phase, an extension to the previous mathematical formulation is applied: 
$$ \hat{p_i} = \frac{b^{\beta_0 + \beta_1 X_1 + ... + \beta_n X_n}}{b^{\beta_0 + \beta_1 X_1 + ... + \beta_n X_n} + 1} = \frac{1}{1+b^{-(\beta_0 + \beta_1 X_1 + ... + \beta_n X_n)}}$$ <br>

The above equation is useful to applied the estimated model parameters from the training set to the unseen data in the test set. 

There are several assumptions associated with Logistic Regression: 
- the log-odds must be a linear combination of the features
- there must be little multicollinearity among the features
- observations to be independent from each other

Practically speaking, Logistic Regression is able to handle both numeric and categorical variables. However, the categorical variables must be translated into dummy binary variables or otherwise known as one-hot encoded. 

__Random Forest Classification__

Random Forest classification is an extension of the ordinary decision tree method. Tree based methods do not require linearity in the feature space. In addition, tree based methods generalize well to both numeric and categorical features, without the need to perform feature engineering such as one-hot-encoding. 

Random Forest model has two improvements over the ordinary decision tree model: 
0. Bagging: instead of creating a single decision tree, Random Forest creates several decision trees, each based on a random sample of the training set. The prediction is made by evaluating outcomes from all decision trees to reach an consensus. In the case of classification, the final prediction is based on the majority voting. 
1. Randomly select features to build sub-trees: this is another adaptation on top of having an ensemble of smaller trees. By randomly selecting which features to include, the model effectively decorrelate features and avoid the effect from some features dominate the model. 

The mathematical formulation of Random Forest is as follows: 

$$ \hat{f(x)} = \frac{1}{B}\sum\_{b=1}^{B}{f_b(x)} $$ <br>

Where \\(B\\) denotes the number of subtrees included in the Random Forest.

__Gradient Boosted Tree Classification__

Gradient Boosted Tree classification is yet another extension of the ordinary decision tree method. Similar to Random Forest, it does not require linearity in the feature space and can be generalized to both numeric and categorical variables. Unlike Random Forest, which builds multiple subtrees at the same time and achieves final prediction through consensus, Gradient Boosted Tree builds smaller subtrees sequentially. In other words, Gradient Boosted Tree creates a very "shallow" tree, then tries to fit another "shallow tree" on top of the difference between the predicted value from the previous tree and the true value. 

The mathematical formulation of Random Forest is as follows: 

$$ \hat{f(x)} = \sum\_{b=1}^{B}{\lambda f_b(x)} $$ <br>

Where \\(B\\) denotes the number of sequential trees build on top of each other and \\(\lambda\\) represents the shrinkage parameter which prevents the tree from becoming too big and avoids overfitting.

In the next several sections, we will explore all three options based on our training set. Depending on the model performance in both prediction efficacy and computation runtime, we may choose to hone in on a single method for further fine-tuning. 

__Train/Dev/Test Data Split__

Only the training data set is used for model selection and model tuning purpose. The test data set, given as part of the original .tar.gz file is reserved for the final stage to test the entire pipeline end-to-end. For each round of model training, we reserve 70% of the original training set for training and use the rest 30% as a dev data set. Model performance on the dev data set is used as the criteria to benchmark models against each other.

# 6. Feature Engineering Using Baseline Model

In [39]:
# Column list assignment
# In place to enable manual management of features fed through to next stages 
numericColumns = iCols
categoricalColumns = cCols

In order to facilitate our feature engineering choices we have opted to use logistic regression to confirm whether any improvement has been achieved.

We chose logistic regression as our baseline model because of its versatility and interpretability. In addition to the logistic regression assumptions mentioned earlier there are two other pre-requisites to enable us to fit a model to the data:
- No missing values
- All values have to be numeric

We address the pre-requisites first and evaluate the incremental improvement of feature engineering choice subsequently.

In [41]:
def preprocessData(sourceTrain, sourceTest, preprocessPipeDefinition):
    """
    Pipeline transformer function. Transforms train and test data using supplied transformer pipeline.
    The transformer pipeline is fitted to train data and applied to both train and test data.
    
    INPUTS:
    sourceTrain - train data, must include label - target variable
    sourceTest - test data, cannot include label - target variable
    preprocessPipeDefinition - transformer pipeline
    
    OUTPUTS:
    trainDF - transformed training data
    testDF - transformed test data
    """
    # EXECUTE Pre-Processing Pipeline
    partialPipeline = Pipeline().setStages(preprocessPipeDefinition)
    pipelineModel = partialPipeline.fit(sourceTrain)
    
    TrainDF = pipelineModel.transform(sourceTrain)
    TestDF = pipelineModel.transform(sourceTest)

    # Keep relevant columns
    trainColumns = ["label", "features"]
    testColumns = ["features"]
    
    TrainDF = TrainDF.select(trainColumns)
    TestDF = TestDF.select(testColumns)
  
    return TrainDF, TestDF
  
def splitTestDev(TrainDF):
    """
    Split train data into training and dev dataset.
    
    INPUTS:
    trainDF - transformed train data
    
    OUTPUTS:
    trainData - training part of train dataset
    devData - dev part of train dataset
    """
  ### Randomly split data into training and test sets. set seed for reproducibility
  (trainData, devData) = TrainDF.randomSplit([0.7, 0.3], seed=100)
  
  return trainData, devData

def baselineModel(trainData, devData):
  """
  Fit a logistic regression model and return metrics.
  
  INPUTS:
  trainData - training part of train dataset
  devData - dev part of train dataset
  
  OUTPUTS:
  metricAUC - AUC metric
  metricLogLoss[0][0] - logLoss metric
  metricPrecision - Precision metric
  metricRecall - Recall metric
  df_cm - confusion matrix
  lrModel - fitted logistic regression model
  """
  # Create LogisticRegression model
  lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter = 10)
  lrModel  = lr.fit(trainData)

  # Check prediction AUC
  predictions = lrModel.transform(devData)
  
  # Cache predictions
  predictions.cache()
  
  # Confusion Matrix
  TP = predictions[(predictions.label == 1) & (predictions.prediction == 1)].count()
  TN = predictions[(predictions.label == 0) & (predictions.prediction == 0)].count()
  FP = predictions[(predictions.label == 0) & (predictions.prediction == 1)].count()
  FN = predictions[(predictions.label == 1) & (predictions.prediction == 0)].count()
  array = [[TP, FP],[FN, TN]]
  df_cm = pd.DataFrame(array, range(2), range(2))
  
  # Calculate Precision and Recall
  metricPrecision = TP/(TP+FP)
  print("Precision: ", metricPrecision)
  metricRecall = TP/(TP+FN)
  print("Recall: ", metricRecall)
  
  # Calculate metrics
  metricAUC = bcEvaluator.evaluate(predictions)
  print("AUC: ", metricAUC)
  
  #metricF1 = mcEvaluator.evaluate(predictions)
  #print("mcEvaluator output: ", metricF1)
  
  metricLogLoss = evaluate_log_loss(predictions)
  print("logloss: ", metricLogLoss[0][0])
  
  # Clear predictions from cache
  predictions.unpersist()
  
  return metricAUC, metricLogLoss[0][0], metricPrecision, metricRecall, df_cm, lrModel

# Evalutation Metric 1: areaUnderROC
bcEvaluator = BinaryClassificationEvaluator() # Possible metrics: areaUnderROC (default), areaUnderPR

# Evalutation Metric 2: f1
mcEvaluator = MulticlassClassificationEvaluator() # Possible metrics: f1 (default), weightedPrecision, weightedRecall, accuracy

# Evalute model using log loss
def evaluate_log_loss(df):
    """
    Calculate logloss.
    
    INPUTS:
    df - prediction dataframe
    
    OUTPUTS:
    average log loss for the prediction data frame
    """
    # Set a small epsilon to correct for extreme singularity
    epsilon = 1e-16
    
    # Get only the first element from the probability dense vector
    firstelement = udf(lambda v:float(v[1]),FloatType())
    df = df.withColumn("p",firstelement("probability"))
    
    # Calculate log loss for each observation and taking the average
    return df.select(df.p, df.label, when(df['label'] == 1.0, -log(df['p'] + epsilon)).otherwise(-log(1 - df['p'] + epsilon)).alias('log_loss')).agg(avg(col("log_loss"))).take(1)
  
def plotLrCharts(confusionMatrix, model):
  """
  plot charts to evaluate baseline model improvement process.
    
  INPUTS:
  confusionMatrix - confusion matrix dataframe
  model - fitted logistic regression model
  """
  confusionMatrix.columns = ['P','N']
  confusionMatrix.index = ['P','N']

  fig = plt.subplots(figsize = (30,15))
 
  # Plot confusion matrix
  plt.subplot(5,5,1)
  sns.heatmap(confusionMatrix, annot=True, fmt="d", annot_kws={"size": 16})
  plt.ylabel('Predicted Value')
  plt.xlabel('Actual Value')
  plt.title('Confusion Matrix')
  
  # Beta coefficients
  beta = np.sort(model.coefficients)
  plt.subplot(5,5,2)
  plt.plot(beta)
  plt.ylabel('Beta')
  plt.title('Beta coefficients')

  # ROC Curve
  trainingSummary = model.summary
  roc = trainingSummary.roc.toPandas()
  plt.subplot(5,5,3)
  plt.plot(roc['FPR'],roc['TPR'])
  plt.ylabel('False Positive Rate')
  plt.xlabel('True Positive Rate')
  plt.title('ROC Curve')

  # Precision / Recall
  pr = trainingSummary.pr.toPandas()
  plt.subplot(5,5,4)
  plt.plot(pr['recall'],pr['precision'])
  plt.ylabel('Precision')
  plt.xlabel('Recall')
  plt.title('Precision vs Recall')

In [42]:
# Test Base Model
sqlContext.clearCache()

numDataset = trainDataset.select(['label','I1','I2','I3', 'I4', 'I5', 'I6', 'I7', 'I8', 'I9', 'I10', 'I11', 'I12', 'I13'])
numDatasetTest = testDataset.select(['I1','I2','I3', 'I4', 'I5', 'I6', 'I7', 'I8', 'I9', 'I10', 'I11', 'I12', 'I13'])

# Transform all features into a vector using VectorAssembler
baseStages = []
assembler = VectorAssembler(inputCols=iCols, outputCol="features")
baseStages += [assembler]

TrainDF, testData = preprocessData(numDataset, numDatasetTest, baseStages)  
trainData, devData = splitTestDev(TrainDF)
metricAUC, metricLogLoss, precision, recall, confusionMatrix, model = baselineModel(trainData, devData)

In [43]:
# Plot baseline model output
plotLrCharts(confusionMatrix, model)

__Baseline Model__

The initial baseline model has AUC: 0.69 and loglogg: 0.52. The winning Kaggle model adressing the same problem achieved a 0.44463 logloss. The metrics above will be used to assess the impact of the feature engineering and modelling techniques.

In [45]:
# Scale Numeric Features
numericStages = []
numeric_assembler = VectorAssembler(inputCols=numericColumns, outputCol="numeric_features")
scaler = StandardScaler(inputCol="numeric_features", outputCol="features", withStd=True, withMean=False)
numericStages += [numeric_assembler, scaler]

In [46]:
# Test Base Model
TrainDF, testData = preprocessData(trainDataset, testDataset, numericStages) 
trainData, devData = splitTestDev(TrainDF)
metricAUC, metricLogLoss, precision, recall, confusionMatrix, model = baselineModel(trainData, devData)

In [47]:
# Plot baseline model output
plotLrCharts(confusionMatrix, model)

__Normalisation__

Introducing normalisation of numeric features does not seem to improve model metrics. In fact we can observe a minor deterioration of performance, at this point however we opt to retain this tranformation in case we may want to test an algorithm that would require scaling.

One-hot encoding is generally the most common way of handling categorical variables. However several categorical features have over 1M unique values. This is prohibitive to us being able to apply a simple one-hot encoding approach.

In the following section we test three separate techniques:
- Hashing
- Binning the long tail of the categorical features and subsequent one hot encoding
- Binning the long tail of the categorical features and dimensionality reduction using chi-square selector

The hashing approach enables us to retain all of the data granularity, however we lose the subsequent interpretability of data as it is not possible to reverse engineer the hasing process. 

In order to enable the other techniques we have introduced binning. We define the percentage ratio limit for categorical feature values. Unless this limit is achieved the value is binned into 'Other' category.

In [50]:
# Update spark settings to avoid network timeout error
spark.conf.set("spark.network.timeout", 800)

In [51]:
# Imputing Categorical Variables - required for OHE and Chi Square only
trainDataset = trainDataset.fillna('Missing')
testDataset = testDataset.fillna('Missing')

In [52]:
def binCategoricalOther(trainDataset, testDataset, cCols, perVal = 0.0001):
  """
  Bin long tail of categorical features into 'Other' value. 
  Any category values that are not present in the train data at least 'perVal' % are binned to 'Other'.
  The training dataset is used to create a list of valid values that meet the threshold. 
  This list is broadcasted and both train and test data are transformed.
    
  INPUTS:
  trainDataset - train dataframe
  testDataset - test dataframe
  cCols - categorical columns to transform
  perVal - percentage threshold below which feature values are binned into 'Other' category value
    
  OUTPUTS:
  trainData - trainsformed train data
  devData - transformed test data
  """
  # Create dataframes that will be transformed
  trainDataset_v2 = trainDataset
  testDataset_v2 = testDataset
  
  # Transformation
  for c in cCols:
    # Define the list of values in a variable that meet the 'perVal' threshold
    total = trainDataset.count()
    valuesToKeep = trainDataset.groupby(c).agg((sparkcount(c)/total).alias('percent')).sort(desc("percent"))
    valuesToKeep = valuesToKeep.filter(valuesToKeep.percent > perVal ).sort(desc("percent")).select(c).toPandas().values.tolist()
    valuesToKeep = list(itertools.chain(*valuesToKeep))
    
    # Broadcast the list for transformation
    sc.broadcast(valuesToKeep)
    
    # Transform the data
    trainDataset_v2 = trainDataset_v2.withColumn(c, when(col(c).isin(valuesToKeep), col(c)).otherwise('Other'))
    testDataset_v2 = testDataset_v2.withColumn(c, when(col(c).isin(valuesToKeep), col(c)).otherwise('Other'))
    
  return trainDataset_v2, testDataset_v2                                            

In [53]:
# Pipeline stages
hashStages = [] 

# Hash categorical variables
hasher = FeatureHasher(inputCols=categoricalColumns,
                       outputCol="hashed_features")
hashStages += [hasher]

# Scale Numeric Features
numeric_assembler = VectorAssembler(inputCols=numericColumns, outputCol="numeric_features")
scaler = StandardScaler(inputCol="numeric_features", outputCol="scaled_numeric_features", withStd=True, withMean=False)
hashStages += [numeric_assembler, scaler]

# Transform all features into a vector using VectorAssembler
assemblerInputs =  ["scaled_numeric_features"] + ["hashed_features"]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
hashStages += [assembler]

In [54]:
### DEFINE PRE-PROCESSING PIPELINE 
# Pipeline stages
OHEStages = [] 

for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    
    # Add stages.  These are not run here, but will run all at once later on.
    OHEStages += [stringIndexer, encoder]

# Scale Numeric Features
numeric_assembler = VectorAssembler(inputCols=numericColumns, outputCol="numeric_features")
scaler = StandardScaler(inputCol="numeric_features", outputCol="scaled_numeric_features", withStd=True, withMean=False)
OHEStages += [numeric_assembler, scaler]

# Transform all features into a vector using VectorAssembler
assemblerInputs =  ["scaled_numeric_features"] + [c + "classVec" for c in categoricalColumns]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
OHEStages += [assembler]

In [55]:
# Pipeline stages
chiStages = [] 

for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")

    # Add stages.  These are not run here, but will run all at once later on.
    chiStages += [stringIndexer]

# Transform all features into a vector using VectorAssembler
assemblerCatInputs = [c + "Index" for c in categoricalColumns]
assemblerCat = VectorAssembler(inputCols=assemblerCatInputs, outputCol="featureVector")
chiStages += [assemblerCat]

# Select categorical features
selector = ChiSqSelector(numTopFeatures=13, featuresCol="featureVector", outputCol="selectedFeatures", labelCol="label")
chiStages += [selector]

# Scale Numeric Features
numeric_assembler = VectorAssembler(inputCols=numericColumns, outputCol="numeric_features")
scaler = StandardScaler(inputCol="numeric_features", outputCol="scaled_numeric_features", withStd=True, withMean=False)
chiStages += [numeric_assembler, scaler]

# Transform all features into a vector using VectorAssembler
assemblerInputs =  ["scaled_numeric_features"] + ["selectedFeatures"]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
chiStages += [assembler]

In [56]:
# hashing performance test
# Preprocessing stage
TrainDF, testData = preprocessData(trainDataset, testDataset, hashStages)
# Split data
trainData, devData = splitTestDev(TrainDF)
# Fit model and calculate metrics
metricAUC, metricLogLoss, precision, recall, confusionMatrix, model = baselineModel(trainData, devData)

In [57]:
# Plot baseline model output
plotLrCharts(confusionMatrix, model)

__Feature Hashing__

Except for recall, introducing hashed categorical features improves almost all metrics that we have decided to monitor. Hashing categorical features improves our 'True positives' significantly, for every 2.4 'True positives' however, the False positives increase by 1.

In [59]:
# OHE performance test
# Bin categorical variables
trainDataset_v2, testDataset_v2 = binCategoricalOther(trainDataset, testDataset, cCols, 0.0001)
# Preprocessing stage
TrainDF, testData = preprocessData(trainDataset_v2, testDataset_v2, OHEStages)
# Split data
trainData, devData = splitTestDev(TrainDF)
# Fit model and calculate metrics
metricAUC, metricLogLoss, precision, recall, confusionMatrix, model = baselineModel(trainData, devData)

In [60]:
# Plot baseline model output
plotLrCharts(confusionMatrix, model)

__One-Hot Encoding__

Like hashing, introduction of one-hot encoded categorical variables results in significant improvement of our baseline model. The impact on the monitored metrics is consistent with the hashing approach.

In [62]:
# Chi Square performance test
# Bin categorical variables
trainDataset_v2, testDataset_v2 = binCategoricalOther(trainDataset, testDataset, cCols, 0.0001)
# Preprocessing stage
TrainDF, testData = preprocessData(trainDataset_v2, testDataset_v2, chiStages)
# Split data
trainData, devData = splitTestDev(TrainDF)
# Fit model and calculate metrics
metricAUC, metricLogLoss, precision, recall, confusionMatrix, model = baselineModel(trainData, devData)

In [63]:
# Plot baseline model output
plotLrCharts(confusionMatrix, model)

__Chi-Square Selector__

In order to enable chi selector the categorical variable threshold had to be raised higher to 0.1% in comparison to One hot encoding 0.01%. In addition to the binning process we have initially selected 13 features to be selected as part of the process. While this approach may have helped us simplify our model it did not result in improvement of the baseline model and therefore we have made the decision not to incorporate and evaluate this feature engineering technique any further.

__Categorical Feature Engineering Technique Conclusion__

Out of the three techniques Hashing and One Hot Encoding resulted in significant improvement of the baseline model. The hashing technique however, while resulting in worse recall than the One Hot encoding, delivers 128K extra 'True positives'. It is therefore we have decided to move forward with the hashing technique.

Fashing has a significant impact on model interpretabillity. It is not possible to reverse engineer the impact of hashing. Should the interpretability of the feature values be important in order to help business drive commercial decisions one hot encoding would be the preferred option.

## 7. Model Training and Selection

We noted in the previous EDA section that the outcome class is not balanced for the CTR data. Recall the ratio between positive observations (11 millions) and negative observations (34 millions) is roughly 1 to 3. The class imbalance issue could impact model performance and results in poor predictive power in the minority class. In the CTR data, the issue is very relevant because positive label is the minority class. 

To correct for this issue, we tried two different methods: 

__Up Sample Minority Class__

Upward sampling is a method to sample with replacement repeatedly from the minority class to match with the total observations in the majority class. For our training set, we sampled the positive observations in the training set to bring the total number to around 34 millions. Intuitively, it means that roughly each positive observation is used at least three times in model training. 

__Down Sample Majority Class__

Downward sampling is the opposite of upward sampling - instead of adjusting minority class to match with the majority class, alternatively, we can sub-sample the majority class to match with the minority class. In this case, we sampled 11 out of 34 millions of the negative observations in the training set. Intuitively, it means that we are potentially leaving out 2/3 of the information in the negative class.

In [66]:
# Supply hashed data into the rebalancing stage
TrainDF, testData = preprocessData(trainDataset, testDataset, hashStages)

# Define class count
classCount = TrainDF.select('label').groupBy('label').count().take(3)
minorClassCount = classCount[0][1]
minorClassDF = TrainDF.filter(TrainDF['label'] == 1)

majorClassCount = classCount[1][1]
majorClassDF = TrainDF.filter(TrainDF['label'] == 0)

### Adding a classWeight column to the training data set
# Define class weights for minor and major class
majorClassWeight = 1
minorClassWeight = majorClassCount/minorClassCount

# Adding a weight columns to preppedTrainDF
randGen = lambda x: majorClassWeight if x == 0 else minorClassWeight
udfRandGen = udf(randGen, FloatType()) 
TrainDF = TrainDF.withColumn("classWeight", udfRandGen("label"))

### Create new training data sets with downsampled majority class or upsampled minority class
# Calculate the original class size
print("Distribution of 1 and 0 cases in the original training set is: \n", TrainDF.select('label').groupBy('label').count().take(3))

# Downsampling Class 0
TrainDownSampleDF = majorClassDF.sample(False, minorClassCount/majorClassCount).unionAll(minorClassDF)
print(TrainDownSampleDF.select('label').groupBy('label').count().take(3))

# Upsampling Class 1
TrainUpSampleDF = minorClassDF.sample(True, majorClassCount/minorClassCount).unionAll(majorClassDF)
print(TrainUpSampleDF.select('label').groupBy('label').count().take(3))

In [67]:
# Split data
trainData, devData = splitTestDev(TrainUpSampleDF)
# Fit model and calculate metrics
metricAUC, metricLogLoss, precision, recall, confusionMatrix, model = baselineModel(trainData, devData)

In [68]:
# Plot baseline model output
plotLrCharts(confusionMatrix, model)

In [69]:
# Split data
trainData, devData = splitTestDev(TrainDownSampleDF)
# Fit model and calculate metrics
metricAUC, metricLogLoss, precision, recall, confusionMatrix, model = baselineModel(trainData, devData)

In [70]:
# Plot baseline model output
plotLrCharts(confusionMatrix, model)

__Sampling Conclusion__

Because of the sampling approach the confusion matrix, precision and recall are not directly comparable with the previously reported metrics. 

The AUC has improved by 0.2% - Up Sampling and 0.06% - Down Sampling, however the logloss for each of the options has deteriorated and therefore we do not use this technique in our final solution.

In [72]:
# Create RandomForest model
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
rfModel = rf.fit(trainData.sample(False, 0.05)) # => update to full set
predictions = rfModel.transform(devData)

In [73]:
# Cache predictions
predictions.cache()

# Calculate metrics
metricAUC = bcEvaluator.evaluate(predictions)
print("AUC: ", metricAUC)

#metricF1 = mcEvaluator.evaluate(predictions)
#print("mcEvaluator output: ", metricF1)

metricLogLoss = evaluate_log_loss(predictions)
print("logloss: ", metricLogLoss[0][0])

# Clear predictions from cache
predictions.unpersist()

In [74]:
# Create GBT model
gbt = GBTClassifier(labelCol="label", featuresCol="features")
gbtModel = gbt.fit(trainData.sample(False, 0.05)) # => update to full set
predictions = gbtModel.transform(devData)

In [75]:
# Cache predictions
predictions.cache()

# Calculate metrics
metricAUC = bcEvaluator.evaluate(predictions)
print("AUC: ", metricAUC)

#metricF1 = mcEvaluator.evaluate(predictions)
#print("mcEvaluator output: ", metricF1)

metricLogLoss = evaluate_log_loss(predictions)
print("logloss: ", metricLogLoss[0][0])

# Clear predictions from cache
predictions.unpersist()

In [76]:
# Additional libraries required for definition of custom pipeline stage
from pyspark.ml import Transformer
from pyspark.sql import DataFrame

# Create customer Transformer to carry out log transform
class LogTransform(Transformer):
    """
    A custom Transformer which log transforms specific columns.
    """
    # Initialize transformer
    def __init__(self, inputCols):
        super(LogTransform, self).__init__()
        self.iCols = inputCols
        
    # Log transform function    
    def _transform(self, dataDF: DataFrame) -> DataFrame:
        for col_name in iCols:
            dataDF = dataDF.withColumn(col_name, round(log(col(col_name) + 4.0),4))
        
        return dataDF

# define Log stage for the pipeline
num_log = LogTransform(inputCols=['I1','I2','I3', 'I4', 'I5', 'I6', 'I7', 'I9','I11', 'I12', 'I13'])

### End to End pipeline definition ###
# Create LogisticRegression model - pipeline stage
lr = LogisticRegression(labelCol="label", featuresCol="features")

# Define final end to end pipeline stages
finalStages = []
finalStages = [num_imputer, num_log, *hashStages] 
finalStages += [lr]

# Define pipeline
pipeline = Pipeline(stages=finalStages)

In [77]:
# clear cache
sqlContext.clearCache()

# reload source train data
trainDataset = train_parquet

# Set dataframe partitions to enable efficient cross validation parallelism
# Rule of thumb  Number of Cores / # Partitions = cross validation parallelism setting
print(trainDataset.rdd.getNumPartitions())
trainDataset = trainDataset.repartition(12)
print(trainDataset.rdd.getNumPartitions())

# Cache training data
trainDataset.cache()

In [78]:
# Define Parameter Grid for CV
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0, 0.05])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [25])
             .build())

# Cross Validation
cvE2E = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=bcEvaluator, numFolds=5, parallelism = 2)
cvModelE2E = cvE2E.fit(trainDataset)

# Get predictions
predictionsE2E = cvModelE2E.transform(trainDataset)

In [79]:
predictionsE2E.cache()

# Confusion Matrix
TP = predictionsE2E[(predictionsE2E.label == 1) & (predictionsE2E.prediction == 1)].count()
TN = predictionsE2E[(predictionsE2E.label == 0) & (predictionsE2E.prediction == 0)].count()
FP = predictionsE2E[(predictionsE2E.label == 0) & (predictionsE2E.prediction == 1)].count()
FN = predictionsE2E[(predictionsE2E.label == 1) & (predictionsE2E.prediction == 0)].count()
array = [[TP, FP],[FN, TN]]
df_cm = pd.DataFrame(array, range(2), range(2))
  
# Calculate Precision and Recall
metricPrecision = TP/(TP+FP)
print("Precision: ", metricPrecision)
metricRecall = TP/(TP+FN)
print("Recall: ", metricRecall)
  
# Calculate metrics
metricAUC = bcEvaluator.evaluate(predictionsE2E)
print("AUC: ", metricAUC)
  
metricLogLoss = evaluate_log_loss(predictionsE2E)
print("logloss: ", metricLogLoss[0][0])

predictionsE2E.unpersist()

In [80]:
# get best model
bestModel = cvModelE2E.bestModel

# Plot baseline model output
plotLrCharts(df_cm, cvModelE2E)

In [81]:
# Get best parameters
bestPipeline = cvModelE2E.bestModel
bestLRModel = bestPipeline.stages[6]
bestParams = bestLRModel.extractParamMap()
bestParams

In [82]:
# Extract best hyperparameters
best_mod = cvModelE2E.bestModel
param_dict = best_mod.stages[-1].extractParamMap()

sane_dict = {}
for k, v in param_dict.items():
  sane_dict[k.name] = v

best_reg = sane_dict["regParam"]
best_elastic_net = sane_dict["elasticNetParam"]
best_max_iter = sane_dict["maxIter"]

print("Best reg: ", best_reg)
print("Best elastic net parameters: ", best_elastic_net)
print("Best max Iterations: ", best_max_iter)

In [83]:
# Extract Metrics for each cross validation
cvModelE2E.avgMetrics

In [84]:
# Parameter Grid used
paramGrid

In [85]:
# Manually Record Model Results
Models = ["Model 1",
"Model 2",
"Model 3",
"Model 4",
"Model 5",
"Model 6"]

RegParam = [0,0,0,0.5,0.5,0.5]
ElasticNetParam = [0,0.5,1.0,0,0.5,1.0]
MaxIter = [25,25,25,25,25,25]
avgAUC = [0.7846310903086459, 0.7846233955948674, 0.7844952786388535, 0.7847712584250256, 0.6950311682278928, 0.6827879374915752]

# Create a dataframe of model results
data = list(zip(Models, RegParam, ElasticNetParam, MaxIter, avgAUC))
evalDF = pd.DataFrame(data, columns = ["Model", "RegParam", "ElasticNetParam", "MaxIter","avgAUC"])

display(evalDF)

Model,RegParam,ElasticNetParam,MaxIter,avgAUC
Model 1,0.0,0.0,25,0.7846310903086459
Model 2,0.0,0.5,25,0.7846233955948674
Model 3,0.0,1.0,25,0.7844952786388535
Model 4,0.5,0.0,25,0.7847712584250256
Model 5,0.5,0.5,25,0.6950311682278928
Model 6,0.5,1.0,25,0.6827879374915752


# 8. Conclusions

In [87]:
# Generate predictions for entire dataset
# Original intention was to submit the testDataset for logloss evaluation as our Heldout Dataset, however it does not seem to be possible to submit the predictions any longer.
testDataset = test_parquet
finalPredictions = bestModel.transform(testDataset)

In this project we explored three different classification models: Logistic Regression, Random Forest, and Gradient Boosted Tree. So far we have seen the most success in Logistic Regression in terms of both computational efficiency and prediction performance on the development dataset.

Here are some basic statistics to compare the three modelling strategies: 

- Logistic Regression finished within 10 minutes on the entire training data set with hashed categorical variable and produced AUC of 0.785.
- Random Forest took 32 minutes to finish on a 5% of the training data set and produced AUC of 0.600.
- Gradient Boosted Tree took almost 8 hours to complete on a 5% sample of the training data set and produced AUC of 0.733. 

The vast computational advantage is the reason why we chose logistic regression for further development, which includes testing categorical feature engineering and hyperparameter tuning.

We suspect that logistic regression has an advantage over tree-based methods on large scale data set because of its linearity in nature. Perhaps the assumption of the linearity in feature space, as well as the linear decision boundary allows highly optimized Stochastic Descent algorithm for coefficient estimation. On the other hand, we believe Random Forest and GBT have the potential to achieve higher performance, but require much more computation resources and take much longer for fine-tuing parameters to avoid overfitting. 

In the end, our best Logistic Regression model is with L2 regularization and lambda = 0.5, which achieved 0.790 in AUC.

In [89]:
# Manually Record Model Results
Models = ["LR: Numeric Only (Baseline Model)",
"LR: Scaled Numeric",
"LR: Numeric + Hashed Categorical",
"LR: Numeric + OHE Categorical",
"LR: Numeric + Chi-Square Selection",
"LR: Numeric + Hashed Categorical + Up Sample",
"LR: Numeric + Hashed Categorical + Down Sample",
"RF: Numeric + Hashed Categorical (5% Sample)",
"GBT: Numeric + Hashed Categorical (5% Sample)",
"LR: Numeric + Hashed Categorical (Optimized) - DAG",
"LR: Numeric + Hashed Categorical (Optimized)" ]

Minutes = [1.85,2.43,9.15,13.44,10.74,13.52,12.77,32.77,477,228,631]
AUC = [0.695741696,0.695708654,0.785291347,0.773187053,0.697633833,0.78729571,0.785927647,0.600914627,0.733252808,0.78697426,0.7901983543343694]
LogLoss = [0.522797141,0.522846001,0.463562813,0.472858509,0.522085409,0.556037974,0.557642186,0.566578128,0.502643354,0.462142691,0.4608243374895706]
Precision = [0.580026904,0.579571834,0.648865248,0.647038072,0.579396153,0.716565579,0.714278264,None,None,0.655837109,0.6696567273659033]
Recall = [0.138273931,0.13842404,0.335771438,0.299276299,0.133416159,0.70631558,0.707653213,None,None,0.329772816,0.31596156737620174]

# Create a dataframe of model results
data = list(zip(Models, Minutes, AUC, LogLoss, Precision, Recall))
evalDF = pd.DataFrame(data, columns = ["Model", "Minutes", "AUC", "LogLoss","Precision","Recall"])

display(evalDF)

Model,Minutes,AUC,LogLoss,Precision,Recall
LR: Numeric Only (Baseline Model),1.85,0.695741696,0.522797141,0.580026904,0.138273931
LR: Scaled Numeric,2.43,0.695708654,0.522846001,0.579571834,0.13842404
LR: Numeric + Hashed Categorical,9.15,0.785291347,0.463562813,0.648865248,0.335771438
LR: Numeric + OHE Categorical,13.44,0.773187053,0.472858509,0.647038072,0.299276299
LR: Numeric + Chi-Square Selection,10.74,0.697633833,0.522085409,0.579396153,0.133416159
LR: Numeric + Hashed Categorical + Up Sample,13.52,0.78729571,0.556037974,0.716565579,0.70631558
LR: Numeric + Hashed Categorical + Down Sample,12.77,0.785927647,0.557642186,0.714278264,0.707653213
RF: Numeric + Hashed Categorical (5% Sample),32.77,0.600914627,0.566578128,,
GBT: Numeric + Hashed Categorical (5% Sample),477.0,0.733252808,0.502643354,,
LR: Numeric + Hashed Categorical (Optimized) - DAG,228.0,0.78697426,0.462142691,0.655837109,0.329772816


In [90]:
evalDF = evalDF.sort_values(by='AUC', ascending=True)
plt.barh(evalDF.Model, evalDF.AUC, align='center', alpha=0.5)
plt.title('Rank by AUC')

In [91]:
evalDF = evalDF.sort_values(by='LogLoss', ascending=False)
plt.barh(evalDF.Model, evalDF.LogLoss, align='center', alpha=0.5)
plt.title('Rank by LogLoss')

In [92]:
evalDF = evalDF.sort_values(by='Minutes', ascending=False)
plt.barh(evalDF.Model, evalDF.Minutes, align='center', alpha=0.5)
plt.title('Rank by Time Complexity')

## 

The best model in the kaggle leaderboard has a logloss of 0.44463. Our best model has a logloss of 0.460, a difference of approximately 0.015 compared with the leaderboard. If not for the PII obfuscation and column anonymization, we might have been able to better understand the columns and make better decisions during exploratory data analysis and data transformation, leading to better predictions.

A next step would be to extend building the models to include other models such as Support Vector Machines, Naive Bayes, and ensemble methods. Naive Bayes does not work on negative feature values. It has been hard to understand why some of the feature values are negative without knowing more about the columns themselves. It would be help to learn more about the data set schema for a more meaningful, contextual setting of the features. 

Thinking about the above model from a business impact perspective. There are a few business use cases that can leverage the above model. A few of them are listed below. 
1. Estimating the revenue potential from ads based on number of visitors to a website 
2. Prioritizing Criteo's customer businesses that have a higher potential for click through rate 
3. Dynamically pricing the cost of a click based on the click through rate. 
4. Recommending the placement of an ad based on the click through rate etc.

## 8.4 Lessons Learned

__Feature Engineering__
- Hashing is a fast and efficient way to deal with large categorical variables. It enabled us to retain all the information and led to better performance of the model both in terms of runtime as well as metrics.
- For this particular use case and the techniques we have tried feature selection did not lead to model improvement. We have used Chi Square selector to try to select features based on their explanatory relationship with the target variable, however we have found the resulting metrics worse when compared to hashing.

__Model selection__ <br>
We have used logistic regression throughout feature engineering. We have applied the preprocessing pipeline and tested random forest and gradient boosted trees, however the performance of each option has proven to be prohibitive. In the case of these two algorithms, perhaps feature reduction could lead to better and faster results.

__Model hyperparamter tuning__<br>
Logistic regression model had delivered distinctively better and faster results in comparison to random forest and gradient boosted trees. It is therefore we have selected this model for hyperparameter tuning exercise.

__Training time__<br>
One iteration of our E2E pipeline takes approximately 12min. This is consistent with crossvalidation results where one iteration takes approximately 35min. Expected runtime of a 5-fold cross validation is approximately 1hr excluding any preprocessing.
The cluster workload can have a significant impact on the pipeline runtime: 10.5hrs vs 2.4 hrs.

__Optimising the training time__<br>
We have tried a number of techinques to optimise the training time<br>
- parallelism - using the parallelism setting we were able to execute two cross validations in parallel <br>
<img src="https://github.com/Walekova/PipelineTuning/blob/master/Active%20jobs.png?raw=true"/>
- data partitioning - does not seem to have a favourable impact on training time. However the cluster was under heavy workload during training. <br>
- cache - caching training does not lead to runtime improvement. <br>
- DagCrossValidator - experimental cross validator module created by IBM. In theory this module should help spark manage memory and caching and could achieve up to 3.5 better performance by not having to redo same portions of the pipeline several times. However while it worked well for algorithm only pipelines it did not work for more complex pipelines. The module is about 2 years old and while we were able to fix some of the libraries there were other issues with deprecated python functions. <br>

__Importance of training time__<br>
Based on our research click models need to be retrained with relatively high frequency - normally weekly. This is to enable the models to capture changes in consumer behaviour on a timely basis. While it can be computationally expensive, retraining the models could be fully automated and minimise the effort required to redoply the new models. The retraining itself will require similar amount of time, unless more resources can be made available, however the resulting models can be used in a near real time architecture / scenario.

# 9. Course Concepts

__Partitions__

*Concept:* Partitioner class is used to partition data based on keys. The partition columns should be used frequently in queries for filtering and should have a small range of values with enough corresponding data to distribute the files in the directories. One wants to avoid too many small files, which make scans less efficient with excessive parallelism. You also want to avoid having too few large files, which can hurt parallelism.

*Application:* Our data has been partitioned automatically by spark into 83 partitions. We have not adjusted the partitioning of the data as all of our operations have been carried out on the entire dataset and generally the values were not important in the queries. Furthermore based on preliminary EDA all of our variables were highly skewed, which would make it difficult for us to define a good partition key.

We have tried to however repartition our data down to 12 partitions using round robin partitioning, while in both cases the data did fit into memory, by monitoring we have noted that the data split into 12 partitions is more likely to be spilled to disk. The overall impact on single two fold cross validation model runtime was: <br>
83 partitions - 35.5 min (1 set of hyperparameters), 24 min (2 set of hyperparameters)<br>
12 partitions - 49 min (1 set of hyperparameters), 50 min (2 set of hyperparameters)<br>
(note runtimes may not be directly comparable due to shared cluster resources)

The reason for repartitioning of data is a 'rule of thumb' statement from spark conference were cores / partitions = parallelism setting on crossvalidator. The cluster has 24 cores and we wanted to achieve parallelism = 2. 

It is would seem that repartitioning data may not achieve the sought benefit, however it is difficult to draw any conclusions from the empirical evidence as for all except the 83 partition - 24 min run, the cluster was under heavy workload.

__Caching__

*Concept:* Storing data sets in a cluster-wide in-memory cache. This is useful when data is accessed repeatedly, such as running an iterative algorithm. Since operations in Spark are lazy, caching can help force computation.

*Application:* We have tested caching as a way to improve performance.

1. We load the predictions into memory and then calculate a number of metrics that the BinaryClassificationEvaluator does not capture. 
2. We load the training data into memory during cross validation. However the data has not remained in memory for the entire duration of training. The cached data has been moved between memory and disk throughout the training cycle.

The empirical evidence however would suggest that:
- cashing predictions ahead of calculating all metrics improves the performance threefold.
- caching training data does not improve the model runtime, in fact we have observed minor adverse impact.

We have investigated other options of leveraging caching however we stumbled across the following: (Source: https://docs.microsoft.com/en-us/azure/hdinsight/spark/apache-spark-perf)
'Native caching is effective with small data sets as well as in ETL pipelines where you need to cache intermediate results. However, Spark native caching currently doesn't work well with partitioning, since a cached table doesn't keep the partitioning data. A more generic and reliable caching technique is storage layer caching.' As a result, we have decided not to focus on further cache related performance tuning.

__Broadcasting__

*Concept:* Broadcast variables allow for a read-only variable to be cached on each machine / executor.

*Application:* Our dataset is too large to broadcast any data. We had one instance were we were able to leverage this functionality - Feature binning. During the process of binning we analysed the train data and captured valid list of values for each feature. We have then broadcast this list and transformed the train data and test data.

__DAGs__

*Concept:* It is a scheduling layer in a spark which implements stage oriented scheduling. It converts logical execution plan to a physical execution plan. 

*Application:* We have focused on minimising any iterations and shuffles while constructing our final pipeline.

The DAG of our final pipeline: 

<img src="https://github.com/Walekova/PipelineTuning/blob/master/DAG_E2Epipeline.png?raw=true"/>

__Lazy evaluation__ 

*Concept:* An execution will not start until an action is triggered.

*Application:* actions used extensively in this paper to trigger evaluation:
- .show/ .reduce / .count <br>
- .fit <br>

__Model Assumptions__

In this project, we have tested three types of models: Logistic Regression, Random Forest Classification, and Gradient Boosted Tree Classification. 

Logistic Regression requires the log-odds to be expressed as a linear combination fo the feature space. Although normalization is not required for running Logistic Regression, some research has suggested that normalizing numeric features improve performances of Stochastic Gradient Descent by putting all features on the same standard scale, which in turn improves efficiency in model training. In addition, Logistic Regression requires that there are minimal multicollinearity among features and that observations must be independent from one and other. 

In contrast, the tree based methods make minimal assumptions on the dataset. In general tree based methods prefer categorical features over numeric features, because the model effectively is dividing up the feature spaces into partitions. Therefore, all numeric variables are effectively discretized prior to a split. In general Brieman's theorem is used to find the optimal splitting point for both numeric and categorical variables without having to evaluate all possible splitting points. Fortunately for us, the Spark's ML library provides many ready to use modules that tucks away the complexity.

__Model Complexity and Bias-Variance Trade-off__:

Bias-variance trade-off is one of the key criteria throughout our project to balance model complexity. This concept is crucial in reducing the problem of overfitting. Overfitting happens when the model is trained on very specific data sets and produces good training results, but falls short when applied to unseen data. 

In general, prediction error as a result of any machine learning model can be attributed to two sources: bias and variance. A complex model tend to produce large variance. For example, the two tree based models explored in this project have the tendency to overfit. Intuitively, overfitting in tree based method is to grow a very large and complex tree. In this case, the model results could be drastically different depending on the training data set and it does not generalize well on unseen data. In comparison, a simpler model such as the Logistic Regression tends to introduce more errors through bias. It makes rigid assumptions about the feature space: linear relationship, minimal multicollinearity, etc.

However, in practice, even Logistic Regression can result in overfitting. To correct for this, we use the cross-validation to perform regularization and fine-tune the hyperparameters for Logistic Regression. The goal is to achieve a balanced bias and variance trade-off that results in a performant model that is also highly generalizable to unseen data. 

__Regularization__

Regularization helps to control overfitting by penalizing complex models and effectively achieving feature selection by shrinking the coefficients of the not so useful features. When the coefficient becomes zero, the feature can be considered as eliminated. 

In our project, we tried Lasso, Ridge and Elasticnet as regularization method and decided to go with Elasticnet for our CTR use case. With Elasticnet we get the best out of both Lasso and Ridge which is right for our CTR use case with the available data points and ambiguities with the categorical columns. For this project the raw data set provided on Kaggle contains hashed values for categorical variables. Most of the information is not discernible to human eyes. Therefore, we decided that it is best to leave the feature selection to proven methods like Elasticnet to determine which columns in the CTR dataset explains the variance. If we had known the column names, things might have changed from the EDA.

#### Categorical Feature Engineering

__Hashing__

*Concept:* Fast and space-efficient way of vectorizing features, i.e. turning arbitrary features into indices in a vector or matrix.

*Application:* We have applied hashing to all of our categorical variables. This feature engineering has the following benefits:
- it was the fastest out of the three techniques tested
- it enable us to retain all of the granularity in the categorical variables
- it achieved the best model outcome

__Feature Binning__

*Concept:* The original data values which fall into a given small interval, a bin, are replaced by a value representative of that interval.

*Application:* We have tested the following scenarios, hashing all features and selecting features with less than 10000 values and used one hot encoding to evaluate the baseline model outcome. The hashing performed better. on the back of the first test we have worked with a hypethesis that the more granularity we can retain the better the metrics we will be able to achieve. 

It is therefore we have chosen a very simple and fast binning method where only the long tail of the feature values were binned into 'Other' category. We have searched for lowest possible % threshold to apply to enable chi square selection and one hot encoding. Unlike other binning techniques, this does not require reverse enginnering to be applied when interpreting results.

__String Index__

*Concept:*  Encoding of a string column of labels to a column of label indices.

*Application:* Pre-requisite technique to enable one hot encoding as well as Chi Square selector.

__One-Hot Encoding__

*Concept:* One-hot encoding is a methodology to convert categorical data into numeric data. One-hot encoding will create as many dummy variables as the number of categories.

*Application:* After the binning process, we were able to one hot encode all features with all values that had at least 0.01% representation in the training dataset. This enabled us to achieve model performance similar to the hashing technique. 

__Feature Selection__

*Concept:* Select features based on importance to the model.

*Application:* We tested Chi-squared technique for determining feature relevance, the numTopFeatures chooses a fixed number of top features (tested on 13) according to a chi-squared test. - percentile is similar but chooses a fraction of all features instead of a fixed number. However we have found that to enable Chi Square selection we had to bin the long tail of the categorical variables, this preparation step had an adverse impact on the model outcome. Subsequent elimination of any of the features has further deteriorated our model metrics. Therefore we have decided not to proceed with Chi Square selection or any other feature reduction technique.

#### Numeric Feature Engineering

__Normalization__

*Concept:* Method of standardizing features by removing the mean and scaling to unit variance.

*Application:* In our CTR model, we scale the mean of the training data to 0 and standard deviation of the training data to 1. The algorithms we have chosen (logistic regression, random forest, gradient boosted trees) however do not require scaling, hence we have not observed any model improvement. We have decided to retain the transformation in our preprocessing pipeline to be able to leverage the pipeline at a later stage and be able to test other algorithms that require normalization to converge.

__Log Transform__

*Concept:* The log transformation can be used to make highly skewed distributions less skewed. 

*Application:* We have applied log transform to 11 of our 13 numeric variables, we have observed minor improvement 0.2% in model metrics. We have therefore decided to retain the transformation.

#### Feature Engineering

__Vector Embeddings__ 

*Concept:* Method of translating high dimensional vectors into low dimensional space.

*Application:* Used extensively to translate categorical variable output of hashing, one hot encoding and chi square selection into low dimensional space to enable our models to process the data.

# Bibliography
https://www.youtube.com/watch?v=SLqKepl9rEoc <br>
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2799933550853697/3911469456159528/2202577924924539/latest.html <br>
https://databricks.com/session/model-parallelism-in-spark-ml-cross-validation <br>
https://github.com/BryanCutler/PipelineTuning <br>
James G., Witten Daniela, Hastie T., & Tibshirani R. (2013). An Introduction to Statstical Learning.