# Click-Through Rate Example

In [1]:
# import pyspark class Row from module sql
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.conf import SparkConf

#Generate Spark Session

spark = SparkSession \
    .builder \
    .master("local[4]")\
    .appName("CTR Example") \
    .config("spark.executor.memory", "16g") \
    .config("spark.driver.memory", "16g") \
    .config("spark.driver.maxResultSize", "16g") \
    .getOrCreate()


#Read the training data. from csv
df_train = spark.read.csv('train.csv', header = True, inferSchema = True, mode="DROPMALFORMED")

#Split the data in order to fit the memory of the local machine.
(df_train, df_rest) = df_train.randomSplit([0.0025, 0.9975], 100000)

df_train.count()

100836

In [2]:
from pyspark.sql.functions import year, month, dayofmonth
from pyspark.sql.functions import unix_timestamp, from_unixtime
#InferSchema predicts the date column as int, therefore there is a need to change the data type.

#Convert date datatype from integer to String.
df_train = df_train.withColumn("hour", df_train["hour"].cast(StringType()))

#Convert date from String to TimeStamp.
df_train = df_train.withColumn("hour", unix_timestamp("hour", "yyMMddHH").cast(TimestampType()))

print(df_train.dtypes)

[('id', 'decimal(20,0)'), ('click', 'int'), ('hour', 'timestamp'), ('C1', 'int'), ('banner_pos', 'int'), ('site_id', 'string'), ('site_domain', 'string'), ('site_category', 'string'), ('app_id', 'string'), ('app_domain', 'string'), ('app_category', 'string'), ('device_id', 'string'), ('device_ip', 'string'), ('device_model', 'string'), ('device_type', 'int'), ('device_conn_type', 'int'), ('C14', 'int'), ('C15', 'int'), ('C16', 'int'), ('C17', 'int'), ('C18', 'int'), ('C19', 'int'), ('C20', 'int'), ('C21', 'int')]


In [3]:
from collections import defaultdict
#Collect the column numbers for later operations.
data_types = defaultdict(list)
for entry in df_train.schema.fields:
    data_types[str(entry.dataType)].append(entry.name)
print(data_types)

defaultdict(<class 'list'>, {'DecimalType(20,0)': ['id'], 'IntegerType': ['click', 'C1', 'banner_pos', 'device_type', 'device_conn_type', 'C14', 'C15', 'C16', 'C17', 'C18', 'C19', 'C20', 'C21'], 'TimestampType': ['hour'], 'StringType': ['site_id', 'site_domain', 'site_category', 'app_id', 'app_domain', 'app_category', 'device_id', 'device_ip', 'device_model']})


In [4]:
from pyspark.sql.functions import countDistinct, approxCountDistinct
#Extract the string data type for datatype conversion string-->numeric

counts_summary = df_train.agg(*[countDistinct(c).alias(c) for c in data_types["StringType"]])
counts_summary = counts_summary.toPandas()
import pandas as pd
counts = pd.Series(counts_summary.values.ravel())
counts.index = counts_summary.columns

#Ignore the columns that passed the hardcoded threshold in order to relax the hardware overloading problem and 
#to fasten the overall progress.

sorted_vars = counts.sort_values(ascending = False)
ignore = list((sorted_vars[sorted_vars >1000]).index)

#Print the ignored colums
print(ignore)

['device_ip', 'device_id', 'device_model', 'site_id', 'site_domain', 'app_id']


In [5]:
#Use built-in imputer for the double type features to fill mising data.
numericals = data_types["DoubleType"]
numericals = [var for var in numericals if var not in ignore]
numericals_imputed = [var + "_imputed" for var in numericals]

from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols = numericals, outputCols = numericals_imputed)
df_train = imputer.fit(df_train).transform(df_train)

In [6]:
#Cast the integer type values to double to use Imputer function again to fill missing data.
for c in data_types["IntegerType"]:
    if c != "click":
        df_train = df_train.withColumn(c+ "_cast_to_double", df_train[c].cast("double"))
cast_vars = [var for var in  df_train.columns if var.endswith("_cast_to_double")]
cast_vars_imputed  = [var+ "imputed" for var in cast_vars]

imputer_for_cast_vars = Imputer(inputCols = cast_vars, outputCols = cast_vars_imputed)
df_train = imputer_for_cast_vars.fit(df_train).transform(df_train)
#df_train.show()

In [7]:
#Find and replace missing data in case of existance for string type.
strings_used = [var for var in data_types["StringType"] if var not in ignore]
print(strings_used)
missing_data_fill = {}
for var in strings_used:
    missing_data_fill[var] = "missing"

df_train = df_train.fillna(missing_data_fill)

['site_category', 'app_domain', 'app_category']


In [8]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer
#Use one-hot encoding method to convert string values in to numeric values.
stage_string = [StringIndexer(inputCol= c, outputCol= c+"_string_encoded") for c in strings_used]
stage_one_hot = [OneHotEncoder(inputCol= c+"_string_encoded", outputCol= c+"_one_hot") for c in strings_used]

ppl = Pipeline(stages= stage_string + stage_one_hot)
df_train = ppl.fit(df_train).transform(df_train)

In [9]:
#Assemble all of the features in to one column called 'features' for further.
from pyspark.ml.feature import VectorAssembler
features = cast_vars_imputed + numericals_imputed \
              + [var + "_one_hot" for var in strings_used]

vector_assembler = VectorAssembler(inputCols = features, outputCol= "features")
data_training_and_test = vector_assembler.transform(df_train)
data_training_and_test.show(1)

+----------------+-----+-------------------+----+----------+--------+-----------+-------------+--------+----------+------------+---------+---------+------------+-----------+----------------+-----+---+---+----+---+---+---+---+-----------------+-------------------------+--------------------------+-------------------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------------------------+-------------------------+-------------------------+--------------------------------+------------------------+---------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+----------------------------+-------------------------+---------------------------+---------------------+------------------+--------------------+--------------------+
|             

In [10]:
#Now, use PCA to make dimension reduction and have the best 'k' variances.
from pyspark.ml.feature import PCA

pca_model = PCA(k = 10,inputCol = "features", outputCol = "pca_features")
model = pca_model.fit(data_training_and_test)
data_training_and_test = model.transform(data_training_and_test)
data_training_and_test.show()

+------------------+-----+-------------------+----+----------+--------+-----------+-------------+--------+----------+------------+---------+---------+------------+-----------+----------------+-----+---+---+----+---+---+------+---+-----------------+-------------------------+--------------------------+-------------------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------------------------+-------------------------+-------------------------+--------------------------------+------------------------+---------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+----------------------------+-------------------------+---------------------------+---------------------+------------------+--------------------+--------------------+----------

In [11]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
#Test the data with a Random Forest Classifier to see the accuracy result.
(training_data, test_data) = data_training_and_test.randomSplit([0.7, 0.3], 35000)
rf = RandomForestClassifier(labelCol = "click", featuresCol = "features", numTrees = 20)
rf_model_predictions = rf.fit(training_data).transform(test_data)
evaluator= BinaryClassificationEvaluator(labelCol = "click", rawPredictionCol="probability", metricName= "areaUnderROC")
accuracy = evaluator.evaluate(rf_model_predictions)
print("Test Error Before Cross Validation = " + str(1.0 - accuracy))

Test Error Before Cross Validation = 0.3705399467631496


In [12]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator as BCE
#Now use cross validation for hyper-parameter tuning to see the best results.
pca_model = PCA(inputCol = "features", outputCol = "pca_features_cv")
rf = RandomForestClassifier(labelCol = "click", featuresCol = "features")
ppl_cv = Pipeline(stages = [pca_model, rf])

paramGrid = ParamGridBuilder() \
      .addGrid(pca_model.k, [5, 10, 15, 20, 25]) \
      .addGrid(rf.numTrees, [20, 30, 50]) \
      .build()

crossval = CrossValidator(estimator = ppl_cv,
                            estimatorParamMaps=paramGrid,
                            evaluator = BCE(labelCol = "click",\
                                            rawPredictionCol = "probability",\
                                            metricName = "areaUnderROC"),
                            numFolds= 3)

cv_model = crossval.fit(training_data)

In [13]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
predictions = cv_model.transform(test_data)
#Finally Evaluate the results with a probabilistic prediction approach and use ROC as the metric.
evaluator= BinaryClassificationEvaluator(labelCol = "click", rawPredictionCol="probability", metricName= "areaUnderROC")
accuracy = evaluator.evaluate(predictions)
print("Test Error After Cross Validation = " + str(1.0 - accuracy))

Test Error After Cross Validation = 0.37073930085588924
