# w261 Final Project - Clickthrough Rate Prediction


In [1]:
# imports
import re
import ast
import time
import numpy as np
import pandas as pd
# import seaborn as sns
# import networkx as nx
import matplotlib.pyplot as plt
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import sys
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
import pyspark.sql.functions as F
from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import StandardScaler
from pyspark.sql import SparkSession
from pyspark.ml.feature import OneHotEncoderEstimator, OneHotEncoderModel
from pyspark.ml.feature import OneHotEncoder, StringIndexer, StandardScaler, Imputer, VectorAssembler, SQLTransformer
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
# from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics

from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils

from pyspark.ml.classification import RandomForestClassifier as RF
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [2]:
# Reload imported modules before each cell runs so edits to them take effect
# without restarting the kernel (autoreload mode 2 = reload all modules).
%reload_ext autoreload
%autoreload 2

In [3]:
# Store the kernel's current working directory (normally the folder the
# notebook was launched from). `!pwd` captures shell output as a list of
# lines, so keep only the first one.
# NOTE(review): PWD is not referenced by any visible cell — confirm it is still needed.
PWD = !pwd
PWD = PWD[0]

In [4]:
import pyspark

# Start the SparkContext with 10g for both driver and executor memory. These
# settings have to be on the conf before the context is created; the legacy
# SQLContext below is what the parquet reads in this notebook go through.
SPARK_MEMORY = '10g'
conf = (pyspark.SparkConf()
        .set('spark.executor.memory', SPARK_MEMORY)
        .set('spark.driver.memory', SPARK_MEMORY))
sc = pyspark.SparkContext(conf=conf)
sqlContext = SQLContext(sc)

In [5]:
# Attach a SparkSession. A SparkContext already exists at this point, so
# getOrCreate() reuses it — `master` and `appName` here may not take effect.
app_name = "final_project_notebook"
master = "local[*]"
spark = (SparkSession.builder
         .master(master)
         .appName(app_name)
         .getOrCreate())

In [6]:
# Show the active SparkSession (rich notebook display of the session).
spark

In [None]:
# List every (key, value) setting of the running SparkContext. Uses the public
# getConf() (a copy of the context's conf) instead of the private `_conf`.
sc.getConf().getAll()

# Skip the cells below and jump to the next section once the imputed Parquet files have been written

In [None]:
# Load the raw train/test splits from Parquet; each read yields a DataFrame.
trainDF = sqlContext.read.parquet('train.parquet')
testDF = sqlContext.read.parquet('test.parquet')

In [None]:
# Column layout: HEADER[0] is the label column, HEADER[1:14] are the integer
# features, and everything from HEADER[14] on is categorical. Note that
# INTEGER_FEATURES therefore starts with the label.
HEADER = trainDF.columns
INTEGER_FEATURES, CATEGORICAL_FEATURES = HEADER[:14], HEADER[14:]

In [None]:
# Treat the integer features as categorical: cast them to strings and fill
# missing values with '' so imputeValues can handle them like every other
# categorical column. The test set gets the identical transformation —
# otherwise it keeps numeric types and the later `== ''` / `isin` checks
# against string training values depend on Spark's implicit type coercion.
def _integerFeaturesToString(frame):
    """Return `frame` with INTEGER_FEATURES[1:] cast to string and NULL -> ''."""
    for feature in INTEGER_FEATURES[1:]:  # [1:] skips the label column
        frame = frame.withColumn(feature, frame[feature].cast(StringType()))
    return frame.fillna({feature: '' for feature in INTEGER_FEATURES[1:]})

trainDF = _integerFeaturesToString(trainDF)
testDF = _integerFeaturesToString(testDF)

In [None]:
# From here on every column (label, integer-as-string, categorical) is handled
# as categorical. Built from HEADER rather than from CATEGORICAL_FEATURES
# itself so re-running the cell does not keep prepending INTEGER_FEATURES.
CATEGORICAL_FEATURES = INTEGER_FEATURES + HEADER[14:]

In [None]:
# Columns excluded from the model, kept as a set for cheap membership checks.
featuresToDrop = {'x11', 'x13', 'x14', 'x18', 'x21'}

In [None]:
def keepTopK(df, dftest, K, categoricalColumnstoImpute):
    """Collapse rare categories into a single "RECODED" bucket.

    For each column the K most frequent groups are ranked on `df` only (the
    training set); every other value in both `df` and `dftest` is replaced by
    the literal "RECODED". NULL counts as one of the K groups when ranking, but
    NULL cells themselves are left as NULL (imputeValues fills them later).

    Args:
        df: training DataFrame used to rank value frequencies.
        dftest: test DataFrame, recoded with the training vocabulary.
        K: number of most frequent groups to keep per column.
        categoricalColumnstoImpute: names of the string columns to recode.

    Returns:
        Tuple `(df, dftest)` with the recoded columns.
    """
    for col in categoricalColumnstoImpute:
        topRows = (df.groupBy(col).count()
                   .orderBy('count', ascending=False)
                   .limit(K)
                   .collect())

        # NULL must not go into the IN-list: `x IN (..., NULL)` evaluates to
        # NULL (not false) for any x outside the list, so `~isin` would never be
        # true and rare values would silently survive.
        mostCommonSet = {row[0] for row in topRows if row[0] is not None}

        df = df.withColumn(col, F.when(~df[col].isin(mostCommonSet), "RECODED")
                                 .otherwise(df[col]))

        dftest = dftest.withColumn(col, F.when(~dftest[col].isin(mostCommonSet), "RECODED")
                                         .otherwise(dftest[col]))

    print("Successfully Recoded Top K Categorical Values")

    return (df, dftest)

In [None]:
# Keep only the 10,000 most frequent values of the selected columns; anything
# rarer becomes "RECODED".
TOP_K = 10000
TOP_K_COLUMNS = ['x5', 'x16', 'x17', 'x25', 'x29', 'x34', 'x37']
start = time.time()
trainDF, testDF = keepTopK(trainDF, testDF, TOP_K, TOP_K_COLUMNS)
print(f"\n... Executed in {time.time() - start} seconds")

In [None]:
# For every column, collect the set of distinct training values; these sets are
# used later to flag test values never seen in training.
start = time.time()
distinct = [
    set(trainDF.select(col).distinct().rdd.map(lambda row: row[0]).collect())
    for col in CATEGORICAL_FEATURES
]
print(f"\n... Executed in {time.time() - start} seconds")

In [None]:
# Report the cardinality of each column.
for feature, values in zip(CATEGORICAL_FEATURES, distinct):
    print(f"Feature: {feature} had {len(values)} unique values")

In [None]:
# Column name -> set of distinct training values.
distinctDict = dict(zip(CATEGORICAL_FEATURES, distinct))

In [None]:
def _mostCommonValue(df, col):
    """Return the most frequent value of `col` in `df` (may be None for NULL).

    Ties are broken by whatever order Spark returns; raises IndexError on an
    empty DataFrame.
    """
    return (df.groupBy(col).count()
            .orderBy('count', ascending=False)
            .limit(1).collect()[0][0])


def _countMissing(df, cols):
    """Count NULL and empty-string cells per column in one Spark job.

    Returns a dict mapping "<col>__null" and "<col>__empty" to counts.
    """
    aggregates = []
    for c in cols:
        # when() without otherwise() yields NULL for non-matching rows, and
        # count() ignores NULLs, so this counts only the matching rows.
        aggregates.append(F.count(F.when(df[c].isNull(), 1)).alias(c + "__null"))
        aggregates.append(F.count(F.when(df[c] == '', 1)).alias(c + "__empty"))
    return df.agg(*aggregates).collect()[0].asDict()


def imputeValues(df, dftest):
    """Fill missing categorical values with the training set's most common value.

    Works on the global CATEGORICAL_FEATURES[1:] (index 0, the label, is
    skipped) and uses the global distinctDict (distinct training values per
    column) to detect test values that never occurred in training.

    For every column:
      * the most frequent training value becomes the fill value; if that value
        is itself missing ('' or NULL) the token "EMPTY" is used instead;
      * in `df`, NULL and '' cells are replaced by the fill value;
      * in `dftest`, NULL, '' and values unseen in training are replaced.

    Args:
        df: training DataFrame.
        dftest: test DataFrame.

    Returns:
        Tuple `(df, dftest)` with imputed columns.

    Raises:
        AssertionError: if any NULL or empty string survives imputation.
    """
    categoricalColumnstoImpute = CATEGORICAL_FEATURES[1:]

    # Impute categorical features
    for col in categoricalColumnstoImpute:
        mostCommon = _mostCommonValue(df, col)
        # NULL can be the most frequent group; filling with it would leave the
        # NULLs in place and fail the checks below, so treat it like ''.
        if mostCommon is None or mostCommon == "":
            mostCommon = "EMPTY"

        print(f"Column {col} has most common {mostCommon}")

        # Exclude NULL from the IN-list: `x IN (..., NULL)` is NULL rather than
        # false for unseen x, which would stop `~isin` from flagging them.
        seenInTrain = [v for v in distinctDict[col] if v is not None]

        trainMissing = df[col].isNull() | (df[col] == '')
        df = df.withColumn(col, F.when(trainMissing, mostCommon).otherwise(df[col]))

        testMissing = (dftest[col].isNull() | (dftest[col] == '')
                       | ~dftest[col].isin(seenInTrain))
        dftest = dftest.withColumn(col, F.when(testMissing, mostCommon)
                                         .otherwise(dftest[col]))
    print("Successfully Imputed Categorical Values")

    # Assure there are no missing values: one aggregation job per DataFrame
    # instead of four full-lineage count() jobs per column.
    for frame in (df, dftest):
        missing = _countMissing(frame, categoricalColumnstoImpute)
        for col in categoricalColumnstoImpute:
            assert missing[col + "__null"] == 0, f"Column {col} contains NULL value(s)"
            assert missing[col + "__empty"] == 0, f"Column {col} contains empty string(s)"

    print("Successfully Imputed All Values and Passed Tests")
    return (df, dftest)

In [None]:
# Impute missing / unseen categorical values in both splits and time it.
start = time.time()
trainDF, testDF = imputeValues(trainDF, testDF)
elapsed = time.time() - start
print(f"\n... Executed in {elapsed} seconds")

In [None]:
# Persist the imputed splits so later sessions can start from them. The
# default save mode raises if the output path already exists.
start = time.time()
for frame, path in ((trainDF, 'trainImputed.parquet'), (testDF, 'testImputed.parquet')):
    frame.write.parquet(path)
print(f"\n... Executed in {time.time() - start} seconds")

# Start here once the imputed Parquet files have been written

In [7]:
# Load the imputed train/test splits written by the previous section.
trainDF = sqlContext.read.parquet('trainImputed.parquet')
testDF = sqlContext.read.parquet('testImputed.parquet')

In [None]:
# trainDF.take(1)

In [8]:
# After imputation every column except the label (HEADER[0]) is categorical.
HEADER = trainDF.columns
CATEGORICAL_FEATURES = HEADER[1:]

In [None]:
# INTEGER_FEATURES

In [None]:
# CATEGORICAL_FEATURES

In [9]:
# Columns to leave out of the model (set literal; order is irrelevant).
featuresToDrop = {'x11', 'x13', 'x14', 'x18', 'x21'}

In [None]:
# featuresToDrop = set()

In [None]:
# start = time.time()

# assembler = VectorAssembler(inputCols = INTEGER_FEATURES[1:], outputCol = 'integerFeatures')
# trainDF = assembler.transform(trainDF)
# testDF = assembler.transform(testDF)

# scaler = StandardScaler(inputCol="integerFeatures", outputCol="scaledFeatures",
#                         withStd=True, withMean=False)

# scalerModel = scaler.fit(trainDF)

# trainDF = scalerModel.transform(trainDF)
# testDF = scalerModel.transform(testDF)
# print(f"\n... Executed in {time.time() - start} seconds")

In [None]:
# featuresToDrop = featuresToDrop.union(set(INTEGER_FEATURES[1:]))
# featuresToDrop.add('integerFeatures')

In [10]:
# Drop the columns listed in featuresToDrop from both splits, keeping the
# remaining columns in their original order.
def _withoutDroppedFeatures(frame):
    return frame.select([name for name in frame.columns if name not in featuresToDrop])

trainDF = _withoutDroppedFeatures(trainDF)
testDF = _withoutDroppedFeatures(testDF)

In [None]:
# Peek at the first row after dropping columns.
trainDF.head(1)

# Apply One-Hot Encoding

In [None]:
# OHE_CATEGORICAL_FEATURES = set(CATEGORICAL_FEATURES) #-featuresToDrop # Remaining Categorical Features for the One hot encoding

In [11]:
# Code help from https://www.youtube.com/watch?v=CdHuLGuU2c4

# One-hot encode every remaining feature column. Both lists are derived (from
# CATEGORICAL_FEATURES minus featuresToDrop and the label) so the inputs and
# outputs stay paired and in sync with the drop list, instead of two
# hand-typed copies that can silently drift apart.
cols_in = [c for c in CATEGORICAL_FEATURES if c not in featuresToDrop and c != "y"]
cols_out = [c + "_OHE" for c in cols_in]

# String indexing for categorical features (x -> x_tmp)
indexers = [StringIndexer(inputCol=c, outputCol=c + "_tmp") for c in cols_in]

# One-hot encoding of each index, keeping every level (x_tmp -> x_OHE)
encoders = [OneHotEncoder(dropLast=False, inputCol=c + "_tmp", outputCol=o)
            for c, o in zip(cols_in, cols_out)]

# Interleave the stages: indexer_1, encoder_1, indexer_2, encoder_2, ...
tmp = [stage for pair in zip(indexers, encoders) for stage in pair]


In [None]:
# Peek at the first row (stage construction does not touch the data).
trainDF.head(1)

In [12]:
# Concatenate all one-hot vectors into a single "features" column.
assembler = VectorAssembler(outputCol="features", inputCols=cols_out)

In [13]:
# Index the raw label `y` into "label"; by default StringIndexer assigns index
# 0.0 to the most frequent value.
labelIndexer = StringIndexer(outputCol="label", inputCol="y")

In [14]:
# Full stage list: interleaved indexers/encoders, then the assembler and the
# label indexer. Rebuilt from its parts (instead of `tmp += ...`) so re-running
# this cell does not append the assembler and label indexer a second time.
tmp = [stage for pair in zip(indexers, encoders) for stage in pair] + [assembler, labelIndexer]

In [15]:
# Chain every feature/label stage into one Pipeline.
pipeline = Pipeline().setStages(tmp)

In [None]:
# Fit the pipeline on the training data, then featurize that same data.
pipelineModel = pipeline.fit(trainDF)
allData = pipelineModel.transform(trainDF)

In [None]:
# alltestData = pipeline.fit(testDF).transform(testDF)

In [17]:
# Keep the featurized data around (MEMORY_AND_DISK, the same level cache()
# uses); it feeds the split and training below.
allData.persist()

DataFrame[y: float, x1: string, x2: string, x3: string, x4: string, x5: string, x6: string, x7: string, x8: string, x9: string, x10: string, x12: string, x15: string, x16: string, x17: string, x19: string, x20: string, x22: string, x23: string, x24: string, x25: string, x26: string, x27: string, x28: string, x29: string, x30: string, x31: string, x32: string, x33: string, x34: string, x35: string, x36: string, x37: string, x38: string, x39: string, x1_tmp: double, x1_OHE: vector, x2_tmp: double, x2_OHE: vector, x3_tmp: double, x3_OHE: vector, x4_tmp: double, x4_OHE: vector, x5_tmp: double, x5_OHE: vector, x6_tmp: double, x6_OHE: vector, x7_tmp: double, x7_OHE: vector, x8_tmp: double, x8_OHE: vector, x9_tmp: double, x9_OHE: vector, x10_tmp: double, x10_OHE: vector, x12_tmp: double, x12_OHE: vector, x15_tmp: double, x15_OHE: vector, x16_tmp: double, x16_OHE: vector, x17_tmp: double, x17_OHE: vector, x19_tmp: double, x19_OHE: vector, x20_tmp: double, x20_OHE: vector, x22_tmp: double, x22_

In [18]:
# 80/20 train/validation split of the featurized data, with a fixed seed.
trainData, validData = allData.randomSplit(weights=[0.8, 0.2], seed=1)

## Train and predict

In [None]:
# trainData, validData = allData.randomSplit([0.8,0.2], seed=1)
# # print("Distribution of Positive and Negative in trainData is: ", trainData.groupBy("label").count().take(3))

In [19]:
# Random forest with 100 trees on the assembled features.
randforest = RF(numTrees=100, labelCol="label", featuresCol="features")

In [None]:
# Train the forest on the training split.
rf_fit = randforest.fit(dataset=trainData)

In [None]:
# Score the validation split (adds prediction/probability columns).
transformed = rf_fit.transform(dataset=validData)

## Check Performance

In [None]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric

In [None]:
# Keep only what the AUC computation needs.
results = transformed.select("probability", "label")

In [None]:
# Bring every (probability, label) row back to the driver.
# NOTE(review): on the full validation split this materializes all rows in
# driver memory — consider computing the metric on the distributed data instead.
results_collect = results.collect()

In [None]:
# One (score, label) pair per row, both referring to label index 0: the score is
# P(index 0) from the probability vector, and the label is 1.0 when the indexed
# label is 0.0 (labels are 0.0/1.0). Scoring and labelling the same class keeps
# the AUC valid.
results_list = [(float(prob[0]), 1.0 - float(label)) for prob, label in results_collect]

In [None]:
# Re-distribute the (score, label) pairs as an RDD, the input BinaryClassificationMetrics expects.
score = sc.parallelize(results_list)

In [None]:
# `metric` is pyspark.mllib's BinaryClassificationMetrics (aliased on import).
metrics = metric(score)

In [None]:
# Report area under the ROC curve. The tree count is read from the estimator
# so the label cannot drift from the value actually used for training.
print(f"The ROC score is (numTrees={randforest.getNumTrees()}): ", metrics.areaUnderROC)

# Run the pipeline on a small sample to test the code

In [17]:
# Draw a 0.01% sample without replacement (roughly 4,500 records per the
# original note) so the pipeline can be exercised quickly.
sampleDF = trainDF.sample(withReplacement=False, fraction=0.0001, seed=1234)

In [None]:
# Peek at the first sampled row.
sampleDF.head(1)

In [18]:
# Fit the feature pipeline on the sample alone and featurize it.
samplePipelineModel = pipeline.fit(sampleDF)
sampleData = samplePipelineModel.transform(sampleDF)

In [19]:
# 80/20 split of the featurized sample. NOTE: this rebinds trainData/validData,
# replacing any split made from the full data earlier in the notebook.
trainData, validData = sampleData.randomSplit(weights=[0.8, 0.2], seed=1)

## Train and predict

In [20]:
# Same 100-tree random forest configuration, for the sample run.
randforest = RF(numTrees=100, labelCol="label", featuresCol="features")

In [21]:
# Train on the sample's training split.
rf_fit = randforest.fit(dataset=trainData)

In [22]:
# Score the sample's validation split.
transformed = rf_fit.transform(dataset=validData)

## Check Performance

In [23]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric

In [24]:
# Keep only the probability vector and indexed label for scoring.
results = transformed.select("probability", "label")

In [25]:
# Bring the (probability, label) rows to the driver — cheap here because the
# input is the small sample.
results_collect = results.collect()

In [26]:
# (score, label) pairs for label index 0: score = P(index 0) and label = 1.0
# when the indexed label is 0.0 (labels are 0.0/1.0), so the score and label
# describe the same class.
results_list = [(float(prob[0]), 1.0 - float(label)) for prob, label in results_collect]

In [27]:
# Turn the (score, label) pairs into the RDD that BinaryClassificationMetrics consumes.
score = sc.parallelize(results_list)

In [28]:
# `metric` is pyspark.mllib's BinaryClassificationMetrics (aliased on import).
metrics = metric(score)

In [29]:
# Report area under the ROC curve for the sample run; the tree count comes from
# the estimator instead of a hard-coded label.
print(f"The ROC score is (numTrees={randforest.getNumTrees()}): ", metrics.areaUnderROC)

The ROC score is (numTrees=100):  0.6816261914088002
