# Advertising Analytics Click Prediction: Spark ML
### Ad Impressions with click dataset

In [2]:
# Load the impressions data and derive an 'hr' column: the characters of
# `hour` from position 7 onward (presumably the HH part of a YYMMDDHH stamp
# -- TODO confirm against the dataset description).
# NOTE(review): the path ends in '.csv' but is read with the parquet reader;
# confirm the files stored at this location really are parquet.
df = (
    spark.read
    .format('parquet')
    .load('/project2/train.csv')
    .selectExpr('*', 'substr(hour, 7) as hr')
    .repartition(32)
)

In [3]:
# Show (column name, Spark type) pairs; later cells use these types to split
# the columns into string and int groups.
df.dtypes

In [4]:
from pyspark.sql.functions import *

# NOTE: the wildcard import above rebinds some Python builtins to Spark column
# functions (e.g. `filter` is a pyspark.sql.functions function from Spark 3.1
# on), so calling `filter(lambda ..., df.dtypes)` no longer reaches the builtin.
# Comprehensions avoid the name clash and produce reusable lists rather than
# one-shot iterators.
strCols = [name for name, dtype in df.dtypes if dtype == 'string']
intCols = [name for name, dtype in df.dtypes if dtype == 'int']


def _distinct_counts(frame, columns):
    """Return [(column, distinct_count), ...] ordered by count, largest first.

    One Spark job is run per column. The count is read from the single result
    row: collect()[0] is that row and [0] its only field.
    """
    counts = [(c, frame.select(countDistinct(c)).collect()[0][0]) for c in columns]
    # list.sort is stable, so columns with equal counts keep their schema order.
    counts.sort(key=lambda pair: pair[1], reverse=True)
    return counts


strColsCount = _distinct_counts(df, strCols)
intColsCount = _distinct_counts(df, intCols)



In [5]:
# Number of distinct values in each string column, largest first
display(strColsCount)

_1,_2
device_ip,261706
device_id,64742
device_model,4380
app_id,2241
site_id,2225
site_domain,2188
app_domain,143
app_category,27
hr,24
site_category,22


In [6]:
# Number of distinct values in each int column, largest first
display(intColsCount)

_1,_2
_c0,402586
C14,2088
C17,411
hour,240
C20,161
C19,65
C21,60
C16,9
C15,8
C1,7


In [7]:
#PySpark feature engineering methods
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Every column is treated as categorical except 'click', which is the label.
# A column is kept as a feature only if it has at most maxBins distinct
# values; the same maxBins is later passed to the tree classifier.
maxBins = 70

# NOTE: an earlier cell does `from pyspark.sql.functions import *`, which can
# rebind the builtin `filter` to the Spark column function (Spark >= 3.1).
# Comprehensions avoid depending on the shadowed builtin. The label is excluded
# by condition, so a missing 'click' entry no longer raises ValueError as
# `list.remove` did.
categorical = [
    name
    for name, distinctCount in list(strColsCount) + list(intColsCount)
    if distinctCount <= maxBins and name != 'click'
]

# One StringIndexer per categorical column; the indexed output column gets an
# '_idx' suffix appended to the original column name.
stringIndexers = [
    StringIndexer(inputCol=name, outputCol=name + '_idx') for name in categorical
]

# Assemble all indexed columns into a single 'features' vector column.
assemblerInputs = [name + '_idx' for name in categorical]
vectorAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol='features')

# Column 'click' is the label. StringIndexer orders indices by descending
# frequency by default, so label 1.0 corresponds to click=1 only while clicks
# are the minority class (NOTE(review): confirm for other datasets).
labelStringIndexer = StringIndexer(inputCol='click', outputCol='label')

# The stages of the ML pipeline, in execution order.
stages = stringIndexers + [vectorAssembler, labelStringIndexer]


In [8]:
from pyspark.ml import Pipeline

# Chain all indexing / assembling stages into a single estimator.
pipeline = Pipeline().setStages(stages)

# Fit the stages on df; the fitted model is used below to append the feature
# and label columns.
featurizer = pipeline.fit(df)

In [9]:
# Apply the fitted pipeline: the result keeps every original column and adds
# the per-column '*_idx' columns plus 'features' and 'label'.
featurizedDf = featurizer.transform(df)

In [10]:
# Peek at ten (features, label) pairs to sanity-check the pipeline output.
display(featurizedDf.select('features','label').take(10))

features,label
"List(0, 12, List(1, 2, 3, 4, 8), List(10.0, 1.0, 10.0, 3.0, 1.0))",1.0
"List(0, 12, List(2, 3, 4, 11), List(2.0, 5.0, 9.0, 2.0))",0.0
"List(0, 12, List(2, 3, 4, 11), List(2.0, 5.0, 9.0, 2.0))",0.0
"List(0, 12, List(2, 3, 4, 11), List(2.0, 5.0, 9.0, 2.0))",0.0
"List(0, 12, List(2, 3, 4, 11), List(2.0, 5.0, 9.0, 2.0))",0.0
"List(0, 12, List(2, 3, 4, 11), List(2.0, 5.0, 9.0, 2.0))",0.0
"List(0, 12, List(2, 3, 4, 11), List(2.0, 5.0, 9.0, 2.0))",0.0
"List(0, 12, List(0, 1, 3, 4), List(1.0, 7.0, 7.0, 1.0))",0.0
"List(0, 12, List(2, 3, 4, 8, 11), List(12.0, 14.0, 13.0, 1.0, 3.0))",0.0
"List(0, 12, List(1, 2, 4, 8), List(2.0, 1.0, 2.0, 1.0))",0.0


In [11]:
# Show ten full rows: original columns, intermediate '*_idx' columns,
# and the assembled 'features' / 'label' columns.
display(featurizedDf.take(10))

_c0,id,click,hour,C1,banner_pos,site_id,site_domain,site_category,app_id,app_domain,app_category,device_id,device_ip,device_model,device_type,device_conn_type,C14,C15,C16,C17,C18,C19,C20,C21,hr,app_category_idx,hr_idx,site_category_idx,C19_idx,C21_idx,C16_idx,C15_idx,C1_idx,banner_pos_idx,device_type_idx,device_conn_type_idx,C18_idx,features,label
25126377,5.811234063447005e+18,0,14102709,1005,0,85f751fd,c4e18dd6,50e219e0,e2fcccd2,5c5a694b,0f2161f8,a99f214a,b6bd2423,1f0bc64f,1,0,6559,320,50,571,2,39,-1,32,9,1.0,1.0,0.0,1.0,7.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0,"List(0, 12, List(0, 1, 3, 4, 11), List(1.0, 1.0, 1.0, 7.0, 2.0))",0.0
28296714,1.0126316670208449e+19,1,14102808,1005,1,d9750ee7,98572c79,f028772b,ecad2386,7801e8d9,07d7df22,a99f214a,d2a673f0,a5bce124,1,0,17753,320,50,1993,2,1063,100083,33,8,0.0,5.0,1.0,12.0,8.0,0.0,0.0,0.0,1.0,0.0,0.0,2.0,"List(0, 12, List(1, 2, 3, 4, 8, 11), List(5.0, 1.0, 12.0, 8.0, 1.0, 2.0))",1.0
30897380,1.5390115192076245e+19,0,14102816,1005,0,6399eda6,968765cd,f028772b,ecad2386,7801e8d9,07d7df22,a99f214a,199f8548,b45673ff,1,0,19772,320,50,2227,0,935,100077,48,16,0.0,9.0,1.0,10.0,3.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,"List(0, 12, List(1, 2, 3, 4), List(9.0, 1.0, 10.0, 3.0))",0.0
22875403,1.1240748033462151e+19,0,14102616,1005,0,85f751fd,c4e18dd6,50e219e0,e2fcccd2,5c5a694b,0f2161f8,60c58e05,2b237a09,fce66524,1,0,4687,320,50,423,2,39,100148,32,16,1.0,9.0,0.0,1.0,7.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0,"List(0, 12, List(0, 1, 3, 4, 11), List(1.0, 9.0, 1.0, 7.0, 2.0))",0.0
25054897,1.457605447424814e+19,0,14102709,1005,0,1fbe01fe,f3845767,28905ebd,ecad2386,7801e8d9,07d7df22,a99f214a,00fa23ec,6332421a,1,0,20108,320,50,2299,2,1327,100084,52,9,0.0,1.0,2.0,5.0,9.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0,"List(0, 12, List(1, 2, 3, 4, 11), List(1.0, 2.0, 5.0, 9.0, 2.0))",0.0
25017042,1.0473046185841204e+18,0,14102709,1005,1,d9750ee7,98572c79,f028772b,ecad2386,7801e8d9,07d7df22,a99f214a,83518c23,9e3836ff,1,0,17753,320,50,1993,2,1063,100084,33,9,0.0,1.0,1.0,12.0,8.0,0.0,0.0,0.0,1.0,0.0,0.0,2.0,"List(0, 12, List(1, 2, 3, 4, 8, 11), List(1.0, 1.0, 12.0, 8.0, 1.0, 2.0))",0.0
26081347,7.09631819755699e+18,0,14102716,1005,0,29229f8e,a10eb148,f028772b,ecad2386,7801e8d9,07d7df22,a99f214a,db1ef0f5,68b6db2c,1,0,6563,320,50,572,2,39,-1,32,16,0.0,9.0,1.0,1.0,7.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0,"List(0, 12, List(1, 2, 3, 4, 11), List(9.0, 1.0, 1.0, 7.0, 2.0))",0.0
21861636,4.1762386136346527e+18,0,14102611,1005,0,85f751fd,c4e18dd6,50e219e0,77a5cd0d,7801e8d9,0f2161f8,0bd9f41d,8d65fdd2,5db079b5,1,2,11189,320,50,1149,3,47,-1,23,11,1.0,7.0,0.0,4.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,"List(0, 12, List(0, 1, 3, 10, 11), List(1.0, 7.0, 4.0, 1.0, 1.0))",0.0
30895872,1.5299886467163593e+19,1,14102816,1005,0,85f751fd,c4e18dd6,50e219e0,e2fcccd2,5c5a694b,0f2161f8,a99f214a,d0b2accf,be6db1d7,1,0,4687,320,50,423,2,39,100148,32,16,1.0,9.0,0.0,1.0,7.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0,"List(0, 12, List(0, 1, 3, 4, 11), List(1.0, 9.0, 1.0, 7.0, 2.0))",1.0
26530004,8.416655672130896e+18,0,14102718,1005,0,85f751fd,c4e18dd6,50e219e0,51cedd4e,aefc06bd,0f2161f8,a99f214a,7a500e1a,fd235d3a,1,0,16685,320,50,1092,3,811,100156,61,18,1.0,14.0,0.0,33.0,5.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,"List(0, 12, List(0, 1, 3, 4, 11), List(1.0, 14.0, 33.0, 5.0, 1.0))",0.0


In [12]:
# Keep only the columns needed downstream and split roughly 70/30 with a
# fixed seed (1).
train, test = (
    featurizedDf
    .select('label', 'features', 'hr')
    .randomSplit([0.7, 0.3], seed=1)
)

# Both splits are reused by later cells (fit / transform), so mark them cached.
train.cache()
test.cache()


In [13]:
from pyspark.ml.classification import GBTClassifier

# Gradient-boosted trees on the assembled features. maxBins reuses the same
# threshold that was used to select the categorical columns.
classifier = GBTClassifier(
    featuresCol='features',
    labelCol='label',
    maxDepth=10,
    maxIter=10,
    maxBins=maxBins,
)

# Train on the training split only.
model = classifier.fit(train)

In [14]:
# Display the fitted model's repr as the cell output.
model

In [15]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluator configured for area under the ROC curve, computed from the
# model's 'rawPrediction' column.
evaluator = BinaryClassificationEvaluator(
    metricName='areaUnderROC',
    rawPredictionCol='rawPrediction',
)

# Score the held-out split with the trained model.
predictions = model.transform(test)

print(evaluator.evaluate(predictions))

In [16]:
# Inspect the first ten scored rows (label, features, hr and the model's
# rawPrediction / probability / prediction columns).
display(predictions.head(10))

label,features,hr,rawPrediction,probability,prediction
0.0,"List(0, 12, List(0, 1, 3, 4), List(1.0, 1.0, 7.0, 22.0))",9,"List(1, 2, List(), List(0.837384872590142, -0.837384872590142))","List(1, 2, List(), List(0.842210716408703, 0.15778928359129696))",0.0
0.0,"List(0, 12, List(0, 1, 3, 4), List(1.0, 1.0, 24.0, 1.0))",9,"List(1, 2, List(), List(0.971340613896726, -0.971340613896726))","List(1, 2, List(), List(0.8746464089000109, 0.12535359109998911))",0.0
0.0,"List(0, 12, List(0, 1, 3, 4), List(1.0, 1.0, 24.0, 1.0))",9,"List(1, 2, List(), List(0.971340613896726, -0.971340613896726))","List(1, 2, List(), List(0.8746464089000109, 0.12535359109998911))",0.0
0.0,"List(0, 12, List(0, 1, 3, 4), List(1.0, 1.0, 24.0, 1.0))",9,"List(1, 2, List(), List(0.971340613896726, -0.971340613896726))","List(1, 2, List(), List(0.8746464089000109, 0.12535359109998911))",0.0
0.0,"List(0, 12, List(0, 1, 3, 4), List(1.0, 1.0, 24.0, 1.0))",9,"List(1, 2, List(), List(0.971340613896726, -0.971340613896726))","List(1, 2, List(), List(0.8746464089000109, 0.12535359109998911))",0.0
0.0,"List(0, 12, List(0, 1, 3, 4), List(1.0, 2.0, 2.0, 1.0))",12,"List(1, 2, List(), List(0.58941164602503, -0.58941164602503))","List(1, 2, List(), List(0.7647361625846725, 0.23526383741532753))",0.0
0.0,"List(0, 12, List(0, 1, 3, 4), List(1.0, 2.0, 11.0, 3.0))",12,"List(1, 2, List(), List(0.8729720737492569, -0.8729720737492569))","List(1, 2, List(), List(0.8514405098561602, 0.14855949014383985))",0.0
0.0,"List(0, 12, List(0, 1, 3, 4), List(1.0, 2.0, 14.0, 24.0))",12,"List(1, 2, List(), List(1.1738478002400186, -1.1738478002400186))","List(1, 2, List(), List(0.9127508872387476, 0.08724911276125236))",0.0
0.0,"List(0, 12, List(0, 1, 3, 4), List(1.0, 3.0, 11.0, 3.0))",14,"List(1, 2, List(), List(0.7678970063635673, -0.7678970063635673))","List(1, 2, List(), List(0.8228524663786096, 0.17714753362139035))",0.0
0.0,"List(0, 12, List(0, 1, 3, 4), List(1.0, 4.0, 2.0, 1.0))",10,"List(1, 2, List(), List(0.6408612795713152, -0.6408612795713152))","List(1, 2, List(), List(0.7827428514345975, 0.21725714856540246))",0.0


In [17]:
# Register the scored DataFrame as a temp view so the %sql cells can query it.
predictions.createOrReplaceTempView('predictions_t')

In [18]:
%sql
-- List the columns and data types of the predictions temp view.
describe predictions_t

col_name,data_type,comment
label,double,
features,vector,
hr,string,
rawPrediction,vector,
probability,vector,
prediction,double,


In [19]:
%sql
-- Accuracy: fraction of rows whose predicted class equals the label.
-- The "* 1.0" forces non-integer division.
-- NOTE(review): compare against the majority-class baseline before reading
-- much into this number.
select sum( case when prediction=label then 1 else 0 end ) * 1.0/ count(*) as accuracy
from predictions_t

accuracy
0.8301834032403981
