# Case Study: Advertising Analytics - Click Prediction

### Lulu Zhu 17FALL BIA

Advertising teams need to analyze large and varied stores of data, which requires a scalable, extensible, and elastic platform. Advanced analytics, including but not limited to classification, clustering, recognition, prediction, and recommendations, allow these organizations to gain deeper insights from their data and drive business outcomes. As data of various types grows in volume, Apache Spark provides an API and a distributed compute engine to process data easily and in parallel, thereby decreasing time to value.

Let’s look at a concrete example with the Click-Through Rate Prediction dataset of ad impressions and clicks from the data science website Kaggle. The goal of this workflow is to create a machine learning model that predicts whether an ad impression will be clicked.

To build our advanced analytics workflow, let’s focus on the three main steps:   
- Building the ETL process  
- Exploratory Data Analysis   
- Advanced Analytics / Machine Learning

**Data Source:** https://www.kaggle.com/c/avazu-ctr-prediction/data  
**Note:** This notebook is only a simplified introduction to PySpark. To learn more, please refer to the official documentation.  
Also, we use only a sample of the data in this notebook to reduce processing time.

### BUILDING THE ETL PROCESS 

The cell below starts a Spark session and loads the sample CSV into a Spark DataFrame: an immutable, tabular, distributed data structure on a Spark cluster.  

In [1]:
import pyspark
from pyspark.sql import SparkSession

# start a spark session 
spark = SparkSession.builder.master("local").appName("CTR Models").config(
    "spark.executor.memory", "8g").config(
    "spark.driver.memory", "15g").getOrCreate()

# load data with inferred schema 
df = spark.read.load("./sample_data.csv", 
                     format="csv", inferSchema="true", header="true")

# The inferred schema can be seen using .printSchema().
df.printSchema()

root
 |-- id: decimal(20,0) (nullable = true)
 |-- click: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- C1: integer (nullable = true)
 |-- banner_pos: integer (nullable = true)
 |-- site_id: string (nullable = true)
 |-- site_domain: string (nullable = true)
 |-- site_category: string (nullable = true)
 |-- app_id: string (nullable = true)
 |-- app_domain: string (nullable = true)
 |-- app_category: string (nullable = true)
 |-- device_id: string (nullable = true)
 |-- device_ip: string (nullable = true)
 |-- device_model: string (nullable = true)
 |-- device_type: integer (nullable = true)
 |-- device_conn_type: integer (nullable = true)
 |-- C14: integer (nullable = true)
 |-- C15: integer (nullable = true)
 |-- C16: integer (nullable = true)
 |-- C17: integer (nullable = true)
 |-- C18: integer (nullable = true)
 |-- C19: integer (nullable = true)
 |-- C20: integer (nullable = true)
 |-- C21: integer (nullable = true)



### EXPLORATORY DATA ANALYSIS

In this section, we will use two methods to explore the DataFrame:   
- Built-in Spark DataFrame methods. See all of them at https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame
- Querying data using SQL  


### 1. Using Spark DataFrame Methods 

In [2]:
# select columns 
df.select("click").show(3)

+-----+
|click|
+-----+
|    0|
|    1|
|    0|
+-----+
only showing top 3 rows



In [3]:
# count the number of distinct values in one column
df.select("click").distinct().count()

2

In [4]:
# count the occurrences of each distinct value
df.groupBy("click").count().show()

+-----+-----+
|click|count|
+-----+-----+
|    1| 6905|
|    0|33348|
+-----+-----+
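The counts above show that the label is imbalanced. A quick plain-Python calculation (no Spark needed) of the overall CTR, and of the accuracy a trivial model that always predicts "no click" would reach, shows why plain accuracy is a weak yardstick here and is one motivation for ranking by AUC later. The numbers are copied from the output above.

```python
# Counts taken from the groupBy("click").count() output above.
clicks = 6905
non_clicks = 33348
impressions = clicks + non_clicks

# Overall click-through rate: clicked impressions / all impressions.
ctr = clicks / impressions

# Accuracy of a trivial model that always predicts "no click" (label 0).
always_zero_accuracy = non_clicks / impressions

print(f"impressions={impressions}, CTR={ctr:.4f}, always-0 accuracy={always_zero_accuracy:.4f}")
```

Roughly 17% of the sampled impressions are clicks, so a model that never predicts a click would already be about 83% accurate.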



In [5]:
# get statistics by 'describe' (mean and stddev are not meaningful for a string column like app_id)
df.describe(["app_id"]).show()

+-------+--------+
|summary|  app_id|
+-------+--------+
|  count|   40253|
|   mean|Infinity|
| stddev|     NaN|
|    min|000d6291|
|    max|ffc6ffd0|
+-------+--------+



In [6]:
# get statistics by 'summary' after selection
# Available statistics: count, mean, stddev, min, max, and arbitrary
# approximate percentiles specified as a percentage (e.g., "75%")
df.select("C14","C15","C16","C17").summary("count", "min","25%", "max").show()

+-------+-----+-----+-----+-----+
|summary|  C14|  C15|  C16|  C17|
+-------+-----+-----+-----+-----+
|  count|40253|40253|40253|40253|
|    min|  375|  120|   20|  112|
|    25%|16920|  320|   50| 1863|
|    max|24041| 1024| 1024| 2756|
+-------+-----+-----+-----+-----+



In [7]:
# select rows by condition, where() is an alias for filter().
df.filter(df.click == 1).show(3)
# df.where(df.click == 1).show(3)

+--------------------+-----+--------+----+----------+--------+-----------+-------------+--------+----------+------------+---------+---------+------------+-----------+----------------+-----+---+---+----+---+---+------+---+
|                  id|click|    hour|  C1|banner_pos| site_id|site_domain|site_category|  app_id|app_domain|app_category|device_id|device_ip|device_model|device_type|device_conn_type|  C14|C15|C16| C17|C18|C19|   C20|C21|
+--------------------+-----+--------+----+----------+--------+-----------+-------------+--------+----------+------------+---------+---------+------------+-----------+----------------+-----+---+---+----+---+---+------+---+
|10114646856300029525|    1|14102100|1005|         0|1fbe01fe|   f3845767|     28905ebd|ecad2386|  7801e8d9|    07d7df22| a99f214a| 093fa194|    5096d134|          1|               0|15701|320| 50|1722|  0| 35|100084| 79|
|10313349640527177600|    1|14102100|1005|         0|5b08c53b|   7687a86e|     3e814130|ecad2386|  7801e8d9|    

### 2. Using Spark SQL 

In [8]:
# create a temporary view; createTempView raises an error if the view name
# already exists, while createOrReplaceTempView replaces it
df.createTempView("test")

# calculate the click-through rate (CTR) by banner position
spark.sql("""
    select banner_pos,
           sum(case when click = 1 then 1 else 0 end) / (count(1) * 1.0) as CTR
    from test
    group by 1
    order by 1
""").show()

# drop the temporary view
spark.catalog.dropTempView("test")

+----------+-------------------+
|banner_pos|                CTR|
+----------+-------------------+
|         0|0.16690831262082298|
|         1|0.18319657509810917|
|         2|0.16666666666666667|
|         3|              0E-17|
|         4|0.20000000000000000|
|         5|              0E-17|
|         7|0.26666666666666667|
+----------+-------------------+
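The SQL query computes, for each banner_pos group, the number of clicks divided by the number of impressions. The same aggregation in plain Python over a few hypothetical rows (not taken from the dataset) makes the formula concrete; ctr_by_group is an illustrative helper, not a Spark API.

```python
from collections import defaultdict

def ctr_by_group(rows, key, label="click"):
    """Return {group_value: CTR}, mirroring
    sum(case when click = 1 then 1 else 0 end) / count(1) ... group by key order by key."""
    clicks = defaultdict(int)
    counts = defaultdict(int)
    for row in rows:
        group = row[key]
        counts[group] += 1          # count(1)
        if row[label] == 1:
            clicks[group] += 1      # sum(case when click = 1 then 1 else 0 end)
    # sorted() mirrors ORDER BY on the grouping column
    return {g: clicks[g] / counts[g] for g in sorted(counts)}

# Hypothetical toy impressions (not from the dataset).
toy_rows = [
    {"banner_pos": 0, "click": 0},
    {"banner_pos": 0, "click": 1},
    {"banner_pos": 0, "click": 0},
    {"banner_pos": 0, "click": 0},
    {"banner_pos": 1, "click": 1},
    {"banner_pos": 1, "click": 0},
    {"banner_pos": 7, "click": 0},
]
print(ctr_by_group(toy_rows, "banner_pos"))  # {0: 0.25, 1: 0.5, 7: 0.0}
```

Because the ratio ignores group size, a CTR computed from only a few impressions can be noisy; adding count(1) to the SELECT would show how many impressions each value is based on.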



### PREDICT THE CLICKS

In this section, we carry out three common machine learning steps with Spark:
- Data Enrichment 
- Feature Engineering Pipeline 
- Machine Learning Model & Evaluation


### 1. Data Enrichment - Date
New columns can often be derived from existing ones, and such derived columns are sometimes more relevant to the target than the originals. In this part, we first generate a "date" column of timestamp type from the original "hour" column, which encodes time as yyMMddHH (for example, 14102100 is 2014-10-21 00:00). Then we derive new features such as day_of_week and hour_of_day from the "date" column. 

In [9]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import to_timestamp, date_format,hour

# withColumn(): Returns a new DataFrame by adding a column or replacing the existing column that has the same name.

# 1. Create a string column 'date' from 'hour'
df = df.withColumn("date", df["hour"].cast(StringType()))

# 2. Parse the 'date' string as a timestamp using the pattern yyMMddHH
df = df.withColumn("date", to_timestamp("date", "yyMMddHH"))

# 3. Create a new column 'day_of_week' from 'date'
df = df.withColumn("day_of_week", date_format("date", "E"))

# 4. Create a new column 'hour_of_day' from 'date'
df = df.withColumn("hour_of_day", hour("date"))
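The pattern "yyMMddHH" means two-digit year, month, day, and hour. The same conversion in plain Python's datetime module (whose format codes differ from Spark's pattern letters) lets you check it by hand on the first row's hour value, 14102100. parse_hour is a hypothetical helper used only for this illustration.

```python
from datetime import datetime

def parse_hour(hour_value):
    """Parse an Avazu-style 'hour' integer in yyMMddHH form, e.g. 14102100."""
    # %y = two-digit year, %m = month, %d = day, %H = hour (00-23)
    return datetime.strptime(str(hour_value), "%y%m%d%H")

ts = parse_hour(14102100)
day_of_week = ts.strftime("%a")  # abbreviated weekday name, similar to Spark's "E"
hour_of_day = ts.hour

print(ts, day_of_week, hour_of_day)
```

This gives 2014-10-21 00:00 on a Tuesday with hour 0, matching the date, day_of_week, and hour_of_day values Spark produces for the first row. Note that strftime("%a") is locale-dependent.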

In [10]:
df.show(2)

+--------------------+-----+--------+----+----------+--------+-----------+-------------+--------+----------+------------+---------+---------+------------+-----------+----------------+-----+---+---+----+---+---+------+---+-------------------+-----------+-----------+
|                  id|click|    hour|  C1|banner_pos| site_id|site_domain|site_category|  app_id|app_domain|app_category|device_id|device_ip|device_model|device_type|device_conn_type|  C14|C15|C16| C17|C18|C19|   C20|C21|               date|day_of_week|hour_of_day|
+--------------------+-----+--------+----+----------+--------+-----------+-------------+--------+----------+------------+---------+---------+------------+-----------+----------------+-----+---+---+----+---+---+------+---+-------------------+-----------+-----------+
|10092095701542991716|    0|14102100|1005|         0|85f751fd|   c4e18dd6|     50e219e0|f888bf4c|  5b9c592b|    0f2161f8| 33d33eb8| 15fb30ea|    be6db1d7|          1|               0|18987|320| 50|2158|

### 2. Feature Engineering Pipeline 
Once we have familiarized ourselves with our data, we can proceed to the machine learning phase, where we convert our data into features for input to a machine learning algorithm and produce a trained model with which we can predict. Because Spark ML algorithms take **a column of feature vectors of doubles as input**, a typical feature engineering workflow includes:  
- Identifying numeric and categorical features   
- String indexing  
- Assembling them all into a single feature vector  

#### 2.1 Identifying numeric features and categorical features with fewer than 70 categories 

In [11]:
maxBins = 70
str_featues = []
num_features = []

for feature in df.dtypes:
    
    # print unique value and data type for each feature
    feature_name = str(feature[0])
    feature_type = str(feature[1])
    uni = df.select(feature_name).distinct().count()
    print("unique number of {} ({}):".format(feature_name, feature_type),uni)
    
    # collect integer (and decimal) columns as numeric features
    if feature_type == "int" or feature_type == "decimal(20,0)":
        
        num_features.append(feature_name)
        
    # collect string columns with fewer than maxBins distinct values as categorical features
    elif feature_type == "string" and uni < maxBins:
        
        str_featues.append(feature_name)

unique number of id (decimal(20,0)): 40253
unique number of click (int): 2
unique number of hour (int): 240
unique number of C1 (int): 7
unique number of banner_pos (int): 7
unique number of site_id (string): 1054
unique number of site_domain (string): 904
unique number of site_category (string): 18
unique number of app_id (string): 877
unique number of app_domain (string): 68
unique number of app_category (string): 20
unique number of device_id (string): 6858
unique number of device_ip (string): 33788
unique number of device_model (string): 2378
unique number of device_type (int): 4
unique number of device_conn_type (int): 4
unique number of C14 (int): 1445
unique number of C15 (int): 7
unique number of C16 (int): 8
unique number of C17 (int): 380
unique number of C18 (int): 4
unique number of C19 (int): 63
unique number of C20 (int): 149
unique number of C21 (int): 59
unique number of date (timestamp): 240
unique number of day_of_week (string): 7
unique number of hour_of_day (int): 2

In [12]:
print("There are {} numerical features in total: {}".format(len(num_features), num_features))
print("There are {} string features in total: {}".format(len(str_featues), str_featues))

There are 16 numerical features in total: ['id', 'click', 'hour', 'C1', 'banner_pos', 'device_type', 'device_conn_type', 'C14', 'C15', 'C16', 'C17', 'C18', 'C19', 'C20', 'C21', 'hour_of_day']
There are 4 string features in total: ['site_category', 'app_domain', 'app_category', 'day_of_week']


In [13]:
# remove id, the label (click), and the raw hour column from the features
num_features.remove("id")
num_features.remove("click")
num_features.remove("hour")

#### 2.2 String indexing

StringIndexer encodes a string column of labels to a column of label indices. The indices are in [0, numLabels), and by default they are ordered by label frequency, so the most frequent label gets index 0.
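A plain-Python sketch of the default frequency ordering, on hypothetical category values; string_index is an illustrative helper, not part of Spark. Breaking ties alphabetically is an assumption based on recent Spark versions and may differ in older releases.

```python
from collections import Counter

def string_index(values):
    """Map each distinct string to a double index in [0, numLabels).

    The most frequent label gets 0.0; ties are broken alphabetically
    (assumed to match StringIndexer's default 'frequencyDesc' ordering).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    mapping = {label: float(i) for i, label in enumerate(ordered)}
    return mapping, [mapping[v] for v in values]

# Hypothetical category values (not from the dataset).
mapping, indexed = string_index(["news", "games", "news", "sports", "games", "news"])
print(mapping)   # {'news': 0.0, 'games': 1.0, 'sports': 2.0}
print(indexed)   # [0.0, 1.0, 0.0, 2.0, 1.0, 0.0]
```

As in Spark's output (for example site_category_index=0.0 in the first row), the indices are doubles.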

In [14]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, ChiSqSelector

indexers = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in str_featues]

**StringIndexer vs. One-Hot Encoder**

You may have noticed that we use StringIndexer but do not apply a One-Hot Encoder (OHE). With StringIndexer, categorical features are kept as k-ary categorical features: a tree node tests whether feature X has a value in {subset of categories}. With OHE, categorical features are turned into a set of binary features: a tree node tests whether feature X = category a vs. all the other categories (a one-vs-rest test). Spark's tree learners can treat the indexed columns as categorical because StringIndexer attaches nominal metadata to them, which VectorAssembler carries over into the features column.  

When using only StringIndexer, the benefits include:  
- There are fewer features to choose from  
- Each node’s test is more expressive than with binary 1-vs-rest features  

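Therefore, for tree-based methods it is preferable not to use OHE, since it gives a less expressive test and takes up more space. For non-tree-based algorithms such as linear or logistic regression, however, you should use OHE; otherwise the model will impose a false and misleading ordering on the categories. Keeping categories as indices is also why maxBins is set to 70 above: Spark's tree learners require maxBins to be at least the number of categories of every categorical feature, so only string features with fewer than 70 distinct values are kept.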
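To make the contrast concrete, here is a plain-Python sketch on hypothetical categories; index_encode and one_hot_encode are illustrative helpers, not Spark APIs. Spark's own OneHotEncoder also drops the last category by default (dropLast=True), which this sketch does not do.

```python
def index_encode(value, categories):
    """StringIndexer-style: a single column holding the category's index."""
    return [float(categories.index(value))]

def one_hot_encode(value, categories):
    """One-hot: k binary columns, 1.0 for the matching category (no column dropped)."""
    return [1.0 if value == c else 0.0 for c in categories]

# Hypothetical categories, listed in index order.
categories = ["news", "games", "sports"]
for v in categories:
    print(v, index_encode(v, categories), one_hot_encode(v, categories))

# With index encoding, a linear term w * x forces equally spaced scores, as if
# "games" sat halfway between "news" and "sports", an order the data never had.
w = 0.7
scores = {v: w * index_encode(v, categories)[0] for v in categories}
print(scores)
```

A tree can split the single indexed column on any subset of categories, while a linear model can only scale it, which is where the false ordering comes from.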

#### 2.3 Assembling them all into a feature vector

VectorAssembler is a transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by different feature transformers into a single feature vector, in order to train ML models like logistic regression and decision trees. 

VectorAssembler accepts the following input column types: all numeric types, boolean type, and vector type. In each row, the values of the input columns will be concatenated into a vector in the specified order.

In [15]:
index_names = ["{}_index".format(x) for x in str_featues] + num_features

assembler = VectorAssembler(inputCols= index_names, outputCol="features")

#### 2.4 Building the Pipeline

In [16]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=indexers+[assembler])

data = pipeline.fit(df).transform(df)

In [17]:
data.head(1)

[Row(id=Decimal('10092095701542991716'), click=0, hour=14102100, C1=1005, banner_pos=0, site_id='85f751fd', site_domain='c4e18dd6', site_category='50e219e0', app_id='f888bf4c', app_domain='5b9c592b', app_category='0f2161f8', device_id='33d33eb8', device_ip='15fb30ea', device_model='be6db1d7', device_type=1, device_conn_type=0, C14=18987, C15=320, C16=50, C17=2158, C18=3, C19=291, C20=100190, C21=61, date=datetime.datetime(2014, 10, 21, 0, 0), day_of_week='Tue', hour_of_day=0, site_category_index=0.0, app_domain_index=13.0, app_category_index=1.0, day_of_week_index=0.0, features=DenseVector([0.0, 13.0, 1.0, 0.0, 1005.0, 0.0, 1.0, 0.0, 18987.0, 320.0, 50.0, 2158.0, 3.0, 291.0, 100190.0, 61.0, 0.0]))]
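VectorAssembler concatenates its input columns in inputCols order. As a check, the plain-Python sketch below rebuilds the features vector of the row above from its individual column values (copied from that row). assemble is a hypothetical helper that handles scalar columns only; the real VectorAssembler also flattens vector-typed inputs.

```python
# Values copied from the first row shown above.
row = {
    "site_category_index": 0.0, "app_domain_index": 13.0,
    "app_category_index": 1.0, "day_of_week_index": 0.0,
    "C1": 1005, "banner_pos": 0, "device_type": 1, "device_conn_type": 0,
    "C14": 18987, "C15": 320, "C16": 50, "C17": 2158, "C18": 3,
    "C19": 291, "C20": 100190, "C21": 61, "hour_of_day": 0,
}

# Same order as index_names: the four *_index columns, then num_features
# (after id, click, and hour were removed).
input_cols = [
    "site_category_index", "app_domain_index", "app_category_index", "day_of_week_index",
    "C1", "banner_pos", "device_type", "device_conn_type",
    "C14", "C15", "C16", "C17", "C18", "C19", "C20", "C21", "hour_of_day",
]

def assemble(row, input_cols):
    """Concatenate scalar columns into one list of doubles, in input_cols order."""
    return [float(row[c]) for c in input_cols]

features = assemble(row, input_cols)
print(features)
```

The result matches the 17-element DenseVector in the row above element for element, which confirms the column order the model sees.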

In [18]:
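# DenseVector vs SparseVector: a SparseVector stores only its size plus the
# indices and values of the non-zero entries; toArray() expands it to a dense array
from pyspark.ml.linalg import SparseVector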
a = SparseVector(5, {0: 1.0, 2: 2.0})
a.toArray()

array([1., 0., 2., 0., 0.])
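Why did the assembled row come out as a DenseVector? VectorAssembler returns whichever representation is more compact. The plain-Python sketch below mimics both representations and a size rule; the threshold in prefers_sparse is recalled from the Scala implementation of Spark's Vector.compressed and should be treated as an assumption. The helper names are hypothetical.

```python
def to_sparse(values):
    """Keep only (index, value) pairs for non-zero entries, plus the size."""
    return len(values), {i: v for i, v in enumerate(values) if v != 0.0}

def to_array(size, nonzeros):
    """Expand a (size, {index: value}) pair back into a full list of doubles."""
    return [nonzeros.get(i, 0.0) for i in range(size)]

def prefers_sparse(values):
    """Assumed Spark rule (Vector.compressed): sparse only if 1.5 * (nnz + 1) < size."""
    size, nonzeros = to_sparse(values)
    return 1.5 * (len(nonzeros) + 1.0) < size

small = [1.0, 0.0, 2.0, 0.0, 0.0]  # same values as SparseVector(5, {0: 1.0, 2: 2.0})
size, nonzeros = to_sparse(small)
print(size, nonzeros)              # 5 {0: 1.0, 2: 2.0}
print(to_array(size, nonzeros))    # [1.0, 0.0, 2.0, 0.0, 0.0]
print(prefers_sparse(small))       # True: 1.5 * 3 = 4.5 < 5

# Features of the first row from data.head(1) above (12 non-zeros out of 17).
first_row = [0.0, 13.0, 1.0, 0.0, 1005.0, 0.0, 1.0, 0.0, 18987.0,
             320.0, 50.0, 2158.0, 3.0, 291.0, 100190.0, 61.0, 0.0]
print(prefers_sparse(first_row))   # False: 1.5 * 13 = 19.5 >= 17, so dense
```

Under this assumed rule, the first row has too many non-zeros for sparse storage to pay off, which is consistent with the DenseVector shown above.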

### 3. Modeling - GBT

#### 3.1 Splitting train and test sets 

In [19]:
# keep only the feature vector and the label
input_data = data.select(["features","click"])

# split dataset into train set and test set 
train_data, test_data = input_data.randomSplit([.8,.2])
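randomSplit([.8,.2]) does not produce exactly 80/20 sets: each row is assigned independently, so the sizes are only approximately proportional to the weights, and without a seed the split changes from run to run (DataFrame.randomSplit accepts an optional seed argument). The plain-Python sketch below illustrates the idea; random_split is a hypothetical helper, not Spark's implementation, which samples within each partition.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row independently to one split.

    Weights are normalized to sum to 1; one uniform draw per row picks the
    split whose cumulative range contains it, so split sizes are only
    approximately proportional to the weights.
    """
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            if u < b:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against float rounding at the top end
    return splits

train, test = random_split(range(10_000), [0.8, 0.2], seed=42)
print(len(train), len(test))  # close to, but usually not exactly, 8000 and 2000
```

Fixing the seed makes the split, and therefore the reported evaluation score, reproducible across runs.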

#### 3.2 The GBT model 

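Gradient boosting is a machine learning technique for regression and classification problems that produces a prediction model in the form of an ensemble of weak prediction models, typically decision trees. In Spark ML, GBTClassifier supports binary classification, which fits this click/no-click task. 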
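To show the core loop, here is a minimal plain-Python sketch of gradient boosting for 0/1 labels with log loss, using one-split regression stumps on 1-D hypothetical toy data. It is not Spark's implementation (Spark uses deeper trees, maxDepth=5 by default, and its own label encoding); n_trees=20 and learning_rate=0.1 were chosen to mirror GBTClassifier's default maxIter and stepSize.

```python
import math

def sigmoid(z):
    """Logistic function mapping a raw score to a probability."""
    return 1.0 / (1.0 + math.exp(-z))

def fit_stump(xs, targets):
    """Fit a least-squares regression stump (one split) on 1-D inputs."""
    best = None
    for t in sorted(set(xs)):
        left = [r for x, r in zip(xs, targets) if x <= t]
        right = [r for x, r in zip(xs, targets) if x > t]
        if not left or not right:
            continue
        lm = sum(left) / len(left)
        rm = sum(right) / len(right)
        sse = sum((r - lm) ** 2 for r in left) + sum((r - rm) ** 2 for r in right)
        if best is None or sse < best[0]:
            best = (sse, t, lm, rm)
    if best is None:
        # all inputs identical: no split possible, predict the mean
        mean = sum(targets) / len(targets)
        return lambda x: mean
    _, threshold, left_value, right_value = best
    return lambda x: left_value if x <= threshold else right_value

def gradient_boost(xs, ys, n_trees=20, learning_rate=0.1):
    """Boost stumps on log loss for 0/1 labels; return a probability function."""
    stumps = []
    scores = [0.0] * len(xs)  # initial raw score F_0(x) = 0, i.e. p = 0.5
    for _ in range(n_trees):
        # pseudo-residuals: negative gradient of log loss w.r.t. the raw score
        residuals = [y - sigmoid(f) for y, f in zip(ys, scores)]
        stump = fit_stump(xs, residuals)
        stumps.append(stump)
        # take a small step in the direction the new weak learner suggests
        scores = [f + learning_rate * stump(x) for f, x in zip(scores, xs)]
    return lambda x: sigmoid(sum(learning_rate * s(x) for s in stumps))

# Hypothetical 1-D toy data: label is 1 for larger x.
xs = [1, 2, 3, 4, 5, 6, 7, 8]
ys = [0, 0, 0, 0, 1, 1, 1, 1]
predict_proba = gradient_boost(xs, ys)
print(round(predict_proba(1), 3), round(predict_proba(8), 3))
```

Each weak learner only nudges the scores by learning_rate, so many trees are combined; this is why the trained model below reports 20 trees.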

In [20]:
from pyspark.ml.classification import GBTClassifier

# define the classifier 
gbt = GBTClassifier(labelCol="click", featuresCol="features", maxBins=maxBins)

# train the classifier with train data 
GBT = gbt.fit(train_data)

# predict test data 
predictions = GBT.transform(test_data)

In [21]:
# check out some characteristics of the model 
GBT.featureImportances

SparseVector(17, {0: 0.0831, 1: 0.2354, 2: 0.0835, 3: 0.0791, 4: 0.0291, 5: 0.0126, 7: 0.0137, 8: 0.1002, 9: 0.0018, 10: 0.0165, 11: 0.0292, 12: 0.0529, 13: 0.0715, 14: 0.0386, 15: 0.0643, 16: 0.0884})

In [22]:
GBT.getNumTrees

20

#### 3.3 Choosing an evaluation method 

In [23]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# use the area under the ROC curve (AUC) as the evaluation metric
evaluator = BinaryClassificationEvaluator(labelCol="click", metricName='areaUnderROC')

# evaluate() returns the AUC, not accuracy
auc = evaluator.evaluate(predictions)

In [24]:
print(auc)

0.6752495349310862
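areaUnderROC can be read as the probability that a randomly chosen clicked impression receives a higher score than a randomly chosen non-clicked one, where 0.5 corresponds to random guessing. Under that reading, the score above means the model ranks a clicked impression above a non-clicked one about 67.5% of the time on this test split. The plain-Python sketch below computes AUC directly from that pairwise definition (ties count as half); roc_auc is a hypothetical helper, not Spark's implementation, which integrates the ROC curve built from the rawPrediction scores and may differ slightly.

```python
def roc_auc(labels, scores):
    """AUC as P(score of a random positive > score of a random negative).

    Ties count as half a win. O(P * N) pairs, fine for small examples.
    """
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

# Hypothetical labels and model scores.
labels = [1, 0, 1, 0, 1]
scores = [0.9, 0.7, 0.8, 0.3, 0.4]
print(roc_auc(labels, scores))  # 5 of 6 positive/negative pairs ranked correctly
```

Because AUC only depends on how scores rank positives against negatives, it is not inflated by the class imbalance the way plain accuracy is.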


### Future Work
**1. Unbalanced Data**: resampling...  
**2. More Feature Engineering**: feature hashing...  
**3. Feature Selection**: ChiSqSelector ...   
**4. Parameter Optimization**: Grid Search ...

### Reference 
- Four Real-Life Machine Learning Use Cases, Databricks    
- Apache Spark 2.4.1 Python API documentation: https://spark.apache.org/docs/latest/api/python/index.html  
- Gradient boosting, Wikipedia: https://en.wikipedia.org/wiki/Gradient_boosting