# Final Project - Criteo Labs Display Advertising Challenge
__`MIDS w261: Machine Learning at Scale | UC Berkeley School of Information | Spring 2019`__

__`Team: Chi Iong Ansjory, Catherine Cao, Scott Xu`__

Table of Contents:
- [0. Background](#background)
- [1. Question Formulation](#question_formulation)
- [2. Algorithm Explanation](#algorithm_explanation)
- [3. EDA & Discussion of Challenges](#eda_challenges)
- [4. Algorithm Implementation](#algorithm_implementation)
- [5. Application of Course Concepts](#course_concepts_application)

<a id='background'></a>
# 0. Background

Criteo Labs is a leading global technology company that specializes in performance display advertising, working with over 4,000 e-commerce companies around the world. Its technology takes an algorithmic approach to deciding which users see an advertisement, when, and for which products, across billions of unique advertisements that are created and displayed at lightning-fast speed every day.

Display advertising is a billion-dollar industry and one of the central applications of machine learning on the Internet. However, its data and methods are usually kept confidential. Through this Kaggle research competition, Criteo Labs shared a week’s worth of data for participants to develop models that predict advertisement click-through rate (CTR). Given a user and the page being visited, what is the probability that the user will click on a given advertisement?

Source: https://www.kaggle.com/c/criteo-display-ad-challenge

The smaller version of the dataset is no longer available from Kaggle, so the full-size version from Criteo Labs is used instead.

Sources: https://www.kaggle.com/c/criteo-display-ad-challenge/data (smaller version, no longer available); http://labs.criteo.com/2014/02/kaggle-display-advertising-challenge-dataset/ (full-size version)

### Notebook Set-Up

In [1]:
# imports
import re
import ast
import time
import numpy as np
import pandas as pd
import seaborn as sns
import networkx as nx
import matplotlib.pyplot as plt

import os
import json

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.sql.functions import lit
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf


In [2]:
%reload_ext autoreload
%autoreload 2

In [3]:
# store path to notebook
PWD = !pwd
PWD = PWD[0]

<a id='question_formulation'></a>
# 1. Question Formulation

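The goal of this analysis is to benchmark ML algorithms for CTR estimation: given the features of an ad impression, predict the probability that the ad is clicked, and compare models by how accurately they estimate that probability on held-out data (measured here by area under the ROC curve and log loss).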
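When benchmarking CTR models, a useful reference point is a constant predictor that always outputs the click rate observed in the training labels; a model worth keeping should beat its log loss. The sketch below is a minimal pure-Python illustration on a hypothetical list of 0/1 labels; `baseline_ctr_logloss` is our own helper name, not part of the original analysis.

```python
import math

def baseline_ctr_logloss(labels):
    """Empirical CTR of 0/1 click labels, and the average log loss of a
    constant predictor that always outputs that CTR."""
    n = len(labels)
    ctr = sum(labels) / n
    if ctr in (0.0, 1.0):
        # all labels identical: the constant predictor is perfect (limit of the loss)
        return ctr, 0.0
    loss = -sum(math.log(ctr) if y == 1 else math.log(1 - ctr) for y in labels) / n
    return ctr, loss

# toy labels: 1 click out of 4 impressions
ctr, loss = baseline_ctr_logloss([1, 0, 0, 0])
print(ctr, round(loss, 4))  # prints: 0.25 0.5623
```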

<a id='algorithm_explanation'></a>
# 2. Algorithm Explanation

Random Forest is an ensemble method for classification and regression. The algorithm trains many decision trees, each of which makes its own prediction; the final prediction is the most common class across all trees (classification) or the average of their outputs (regression). To reduce the correlation between trees, each tree is trained on a bootstrap sample of the training rows (bagging), and only a random subset of the features is considered when choosing each split, so every tree sees slightly different input. Combining many decorrelated trees reduces variance, which improves performance and makes the model less prone to overfitting than a single deep tree.
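To make the two sources of randomness concrete, here is a toy sketch in pure Python, not Spark's implementation: every "tree" is a depth-1 decision stump trained on a bootstrap sample of rows and restricted to a random subset of features, and the forest predicts by majority vote. Because a stump has a single split, choosing features per tree is the same as choosing them per split. The functions `fit_stump`, `fit_forest`, and `predict` are hypothetical names used only for illustration.

```python
import random
from collections import Counter

def fit_stump(X, y, feature_ids):
    """Depth-1 tree: choose the (feature, threshold, left label) with the fewest
    training errors, searching only the given feature ids."""
    best = None
    for f in feature_ids:
        for t in sorted(set(row[f] for row in X)):
            for left_label in (0, 1):
                errors = sum(
                    (left_label if row[f] <= t else 1 - left_label) != label
                    for row, label in zip(X, y)
                )
                if best is None or errors < best[0]:
                    best = (errors, f, t, left_label)
    _, f, t, left_label = best
    return lambda row: left_label if row[f] <= t else 1 - left_label

def fit_forest(X, y, num_trees=25, num_features=1, seed=0):
    """Train stumps on bootstrap samples, each limited to a random feature subset."""
    rng = random.Random(seed)
    n, d = len(X), len(X[0])
    trees = []
    for _ in range(num_trees):
        rows = [rng.randrange(n) for _ in range(n)]   # bootstrap sample (bagging)
        feats = rng.sample(range(d), num_features)    # random feature subset
        trees.append(fit_stump([X[i] for i in rows], [y[i] for i in rows], feats))
    return trees

def predict(trees, row):
    """Majority vote over the individual trees' class predictions."""
    return Counter(tree(row) for tree in trees).most_common(1)[0][0]

# toy data: both features separate the classes, in opposite directions
X = [[i, 10 - i] for i in range(10)]
y = [0] * 5 + [1] * 5
forest = fit_forest(X, y, num_trees=25, num_features=1, seed=0)
print(predict(forest, [0, 10]), predict(forest, [9, 1]))
```

With `num_features` smaller than the number of columns, different trees split on different features, which is what decorrelates them.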

<a id='eda_challenges'></a>
# 3. EDA & Discussion of Challenges

The main challenge is that the dataset has no meaningful column labels: the 13 integer features and 26 categorical features are anonymized, and the categorical values are hashed. We therefore cannot draw on pre-existing knowledge of how online ads are served or how CTR is computed to interpret the data. Instead, we have to put extra effort into analyzing the data to understand the relationships between features and process them appropriately.
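Because the columns carry no names, a natural first EDA step is measuring how often each field is empty, which feeds directly into how missing values are imputed. Below is a minimal pure-Python sketch over a few hypothetical raw tab-separated lines; the `target`/`I`/`C` column naming follows the notebook, while `missing_rates` is a hypothetical helper. On the full 45-million-row file the same per-column counting would be done with Spark rather than in a local loop.

```python
def missing_rates(lines, header):
    """Fraction of empty fields per column in tab-separated Criteo-style rows."""
    counts = dict.fromkeys(header, 0)
    n = 0
    for line in lines:
        fields = line.rstrip("\n").split("\t")
        n += 1
        for name, value in zip(header, fields):
            if value == "":
                counts[name] += 1
    return {name: counts[name] / n for name in header}

# hypothetical rows: label, one integer feature, one hashed categorical feature
header = ["target", "I1", "C1"]
lines = ["0\t5\t68fd1e64", "1\t\t", "0\t3\t"]
print(missing_rates(lines, header))
```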

In [4]:

app_name = "final"
master = "local[*]"
MAX_MEMORY = "4g"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .config("spark.executor.memory", MAX_MEMORY) \
        .config("spark.driver.memory", MAX_MEMORY) \
        .getOrCreate()
sc = spark.sparkContext


In [16]:
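rdd = sc.textFile("./data/train.txt")

#rdd_sample = rdd.sample(fraction = 0.001, withReplacement= False).cache()

# each line: label, 13 integer features (I1-I13), 26 categorical features (C1-C26), tab-separated
all_numeric = ['I' + str(i) for i in range(1, 14)]
all_categorical = ['C' + str(i) for i in range(1, 27)]
header = ['target'] + all_numeric + all_categorical

# I12 and C22 are excluded from the model; name every column first so the rest stay aligned
numeric_features = [c for c in all_numeric if c != 'I12']
categorical_features = [c for c in all_categorical if c != 'C22']

df = rdd.map(lambda x: x.split("\t")).toDF(header).drop('I12', 'C22').cache()

for var in ['target'] + numeric_features:
    df = df.withColumn(var, df[var].cast(IntegerType()))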

In [20]:
rdd.count()

45840617

In [22]:
rdd_test = sc.textFile("./data/test.txt")
rdd_test.count()

6042135

<a id='algorithm_implementation'></a>
# 4. Algorithm Implementation

In [17]:
# treat empty strings in the categorical columns as missing values
df_impute = df.replace('', None, categorical_features)

# per-column imputation values; I12 and C22 are not used by the model
with open("imputation_int.json") as json_file:
    impute = json.load(json_file)
del impute['C22']
del impute['I12']
df_impute = df_impute.fillna(impute)

# categorical values are hexadecimal hashes; decode them to integers
hex_string = udf(lambda x: int(x, 16), IntegerType())

for col in categorical_features:
    df_impute = df_impute.withColumn(col, hex_string(col))
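The cell above fills missing values column by column and then decodes each categorical hash from hexadecimal. The same per-row logic in plain Python looks roughly like this, assuming (as a hypothetical) that the imputation file stores integers for the `I` columns and hex strings for the `C` columns; `impute_and_decode` is an illustrative helper, not part of the notebook.

```python
def impute_and_decode(row, impute, categorical):
    """Fill empty fields from `impute`, then parse integers and hex-hashed categoricals.
    `row` maps column name -> raw string from the tab-separated file."""
    out = {}
    for name, value in row.items():
        if value == "":
            value = impute[name]            # assumed: int for I*, hex string for C*
        if name in categorical:
            out[name] = int(value, 16)      # hexadecimal hash -> integer
        else:
            out[name] = int(value)
    return out

impute = {"I1": 1, "C1": "00000000"}        # hypothetical imputation values
print(impute_and_decode({"I1": "", "C1": "68fd1e64"}, impute, {"C1"}))
print(impute_and_decode({"I1": "3", "C1": ""}, impute, {"C1"}))
```

An 8-digit hexadecimal hash can be as large as 4294967295 (2^32 - 1), above the 2147483647 maximum of a signed 32-bit integer, so a 64-bit type such as Spark's `LongType` is the safer target for decoded values.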

In [18]:
def preprocess(df):
    """Assemble the numeric and categorical columns into a single 'features' vector."""
    stages = []
    # Disabled: index and one-hot encode the categorical columns
    # categorical_features_index = []
    # ['C1','C2','C5','C6','C8','C9','C11','C13','C14','C15','C17','C18','C19','C20','C22','C23','C25']
    # for categoricalCol in categorical_features:
    #     stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + 'Index')
    #     encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
    #                                      outputCols=[categoricalCol + "classVec"])
    #     #encoder = VectorIndexer(inputCol=[stringIndexer.getOutputCol()], outputCol=[categoricalCol + "classVec"])
    #     categorical_features_index += [categoricalCol + 'Index']
    #     stages += [stringIndexer, encoder]

    vector_assembler = VectorAssembler(
        inputCols=numeric_features + categorical_features,
        outputCol="features")
    stages += [vector_assembler]

    pipeline = Pipeline(stages=stages)
    pipelineModel = pipeline.fit(df)
    df_temp = pipelineModel.transform(df)

    return df_temp, pipelineModel


In [19]:
start = time.time()
df_temp, pipelineModel = preprocess(df_impute)
print("Wall time: {} seconds".format(time.time() - start))
df_temp.printSchema()

Wall time: 0.02083730697631836 seconds
root
 |-- target: integer (nullable = true)
 |-- I1: integer (nullable = false)
 |-- I2: integer (nullable = false)
 |-- I3: integer (nullable = false)
 |-- I4: integer (nullable = false)
 |-- I5: integer (nullable = false)
 |-- I6: integer (nullable = false)
 |-- I7: integer (nullable = false)
 |-- I8: integer (nullable = false)
 |-- I9: integer (nullable = false)
 |-- I10: integer (nullable = false)
 |-- I11: integer (nullable = false)
 |-- I13: integer (nullable = false)
 |-- C1: integer (nullable = true)
 |-- C2: integer (nullable = true)
 |-- C3: integer (nullable = true)
 |-- C4: integer (nullable = true)
 |-- C5: integer (nullable = true)
 |-- C6: integer (nullable = true)
 |-- C7: integer (nullable = true)
 |-- C8: integer (nullable = true)
 |-- C9: integer (nullable = true)
 |-- C10: integer (nullable = true)
 |-- C11: integer (nullable = true)
 |-- C12: integer (nullable = true)
 |-- C13: integer (nullable = true)
 |-- C14: integer (null

In [10]:
# hold out 30% of the labeled rows for evaluation
(trainingData, testData) = df_temp.randomSplit([0.7, 0.3])
start = time.time()
rf = RandomForestClassifier(labelCol="target", featuresCol="features", numTrees=100)
model = rf.fit(trainingData)

predictions = model.transform(testData)
evaluator = BinaryClassificationEvaluator(labelCol="target")

print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))
print("Wall time: {} seconds".format(time.time() - start))

Test Area Under ROC: 0.7083715760474067
Wall time: 3488.9581015110016 seconds
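An AUC of about 0.708 means that a randomly chosen clicked impression receives a higher score than a randomly chosen non-clicked impression roughly 71% of the time. The sketch below computes AUC directly from that pairwise definition on toy scores; it is quadratic in the number of rows, so it only illustrates the metric, and `roc_auc` is a hypothetical helper rather than the way `BinaryClassificationEvaluator` computes it.

```python
def roc_auc(scores, labels):
    """Area under the ROC curve: probability that a random positive is scored
    above a random negative, with ties counted as one half."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in pos:
        for q in neg:
            if p > q:
                wins += 1.0
            elif p == q:
                wins += 0.5
    return wins / (len(pos) * len(neg))

print(roc_auc([0.9, 0.8, 0.3, 0.2], [1, 0, 1, 0]))  # prints 0.75
```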


In [38]:
predictions

DataFrame[target: int, I1: int, I2: int, I3: int, I4: int, I5: int, I6: int, I7: int, I8: int, I9: int, I10: int, I11: int, I12: int, I13: int, C1: string, C2: string, C3: string, C4: string, C5: string, C6: string, C7: string, C8: string, C9: string, C10: string, C11: string, C12: string, C13: string, C14: string, C15: string, C16: string, C17: string, C18: string, C19: string, C20: string, C21: string, C22: string, C23: string, C24: string, C25: string, C26: string, C6Index: double, C6classVec: vector, C9Index: double, C9classVec: vector, C14Index: double, C14classVec: vector, C17Index: double, C17classVec: vector, C20Index: double, C20classVec: vector, C23Index: double, C23classVec: vector, C25Index: double, C25classVec: vector, features: vector, rawPrediction: vector, probability: vector, prediction: double]

In [11]:
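import math
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

EPS = 1e-15  # clip probabilities so that log(0) is never evaluated

def logloss(prob, actual):
    """Log loss of one prediction, given P(target=1) and the true label."""
    prob = min(max(prob, EPS), 1 - EPS)
    if actual == 1:
        return -math.log(prob)
    else:
        return -math.log(1 - prob)

udf_logloss = udf(logloss, FloatType())

# 'probability' is the vector [P(target=0), P(target=1)]; keep P(target=1)
firstelement = udf(lambda v: float(v[1]), FloatType())

avg_logloss = predictions.select(firstelement('probability').alias('probability'), 'target') \
    .withColumn("logloss", udf_logloss("probability", "target")) \
    .groupby().avg('logloss').collect()
print(avg_logloss)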


[Row(avg(logloss)=0.5186227341338779)]
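The cell above averages the per-row log loss over the held-out rows, where $p_i$ is the predicted probability of a click (the second entry of the `probability` vector) and $y_i \in \{0, 1\}$ is the `target` label. This is also the metric the Kaggle competition used to score submissions; lower is better, and `math.log` is the natural logarithm:

$$
\text{LogLoss} = -\frac{1}{N}\sum_{i=1}^{N}\Big[\, y_i \log p_i + (1 - y_i)\log(1 - p_i) \Big]
$$

Because the loss grows without bound when a confident prediction turns out to be wrong, $p_i$ is usually clipped away from exactly 0 and 1 before taking logarithms.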


In [20]:
rdd_test = sc.textFile("./data/test.txt")

# the test file has no label column: 13 integer and 26 categorical features
test_numeric = ['I' + str(i) for i in range(1, 14)]
test_categorical = ['C' + str(i) for i in range(1, 27)]
header_test = test_numeric + test_categorical
df_test = rdd_test.map(lambda x: x.split("\t")).toDF(header_test).cache()

for var in test_numeric:
    df_test = df_test.withColumn(var, df_test[var].cast(IntegerType()))

# same missing-value handling and imputation as for the training data
with open("imputation_int.json") as json_file:
    impute = json.load(json_file)
df_impute_test = df_test.replace('', None, test_categorical)
df_impute_test = df_impute_test.fillna(impute)

# hex_string is the UDF defined for the training data
for col in test_categorical:
    df_impute_test = df_impute_test.withColumn(col, hex_string(col))

# reuse the fitted feature pipeline and random forest model
df_temp_test = pipelineModel.transform(df_impute_test)
predictions_test = model.transform(df_temp_test)



In [25]:
# P(target=1) for every test row
pred = predictions_test.select(firstelement('probability').alias('Predicted')).toPandas()
# take the Id column from random_submission.csv; pd.concat aligns by position,
# so this assumes the predictions come back in the same row order as test.txt
submission = pd.read_csv('random_submission.csv')
submission2 = pd.concat([submission['Id'], pred['Predicted']], axis=1)
submission2.to_csv('predictions_rf3.csv', index=False)

In [26]:
submission2.head()

         Id  Predicted
0  60000000   0.179017
1  60000001   0.272029
2  60000002   0.355309
3  60000003   0.183668
4  60000004   0.333123


In [27]:
predictions.select('target', 'rawPrediction', 'prediction', 'probability').show(10)

+------+--------------------+----------+--------------------+
|target|       rawPrediction|prediction|         probability|
+------+--------------------+----------+--------------------+
|     0|[85.9569560005486...|       0.0|[0.85956956000548...|
|     0|[81.0723273102391...|       0.0|[0.81072327310239...|
|     0|[76.8602831987373...|       0.0|[0.76860283198737...|
|     0|[84.1087912534379...|       0.0|[0.84108791253437...|
|     0|[79.3108344510999...|       0.0|[0.79310834451099...|
|     0|[75.0067298049367...|       0.0|[0.75006729804936...|
|     0|[81.5320252434899...|       0.0|[0.81532025243489...|
|     0|[81.3178605316716...|       0.0|[0.81317860531671...|
|     0|[78.7006942259489...|       0.0|[0.78700694225948...|
|     0|[84.2800906835654...|       0.0|[0.84280090683565...|
+------+--------------------+----------+--------------------+
only showing top 10 rows
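In the output above, each `probability` equals the `rawPrediction` divided by 100, the value of `numTrees`. This is consistent with how, to our understanding, Spark ML's random forest classifier aggregates trees: each tree contributes the normalized class distribution of the leaf a row falls into, `rawPrediction` is the sum of those vectors, `probability` normalizes that sum, and `prediction` is the class with the largest probability, so a row is predicted as a click only when P(target=1) exceeds 0.5. The sketch below reproduces that aggregation on made-up leaf class counts; `aggregate_forest` is a hypothetical helper, not Spark's code.

```python
def aggregate_forest(leaf_class_counts):
    """Combine per-tree leaf class counts into (rawPrediction, probability, prediction)."""
    num_classes = len(leaf_class_counts[0])
    raw = [0.0] * num_classes
    for counts in leaf_class_counts:
        total = sum(counts)
        if total > 0:
            for k in range(num_classes):
                raw[k] += counts[k] / total     # each tree adds its leaf's class distribution
    raw_total = sum(raw)
    probability = [r / raw_total for r in raw]  # normalize the summed votes
    prediction = max(range(num_classes), key=lambda k: probability[k])
    return raw, probability, prediction

# three hypothetical trees, leaf counts given as [non-clicks, clicks]
raw, probability, prediction = aggregate_forest([[8, 2], [6, 4], [9, 1]])
print(raw, probability, prediction)
```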



<a id='course_concepts_application'></a>
# 5. Application of Course Concepts