# Commerce clickstream ML prediction
####Dataset download > [Ad impressions with clicks dataset](https://www.kaggle.com/c/avazu-ctr-prediction/data)

The use case here I am taking is of a Commerce company that has an ecommerce website as well as traditional retail stores. They want to analyse the online clickstream data to better understand their customers. We will use a sample clickstream dataset from the data science website Kaggle.  We will start with the Ingest and Exploration of data. Next we create features and train and evaluate the ML model. We will join this data with Dynamics products table to try to analyse if products influence the ML model result. The goal of this workflow is to create a machine learning model that, given a new ad impression, predicts whether or not there will be a click. We will also do features exploration to see what features influence the prediction most. We have a big dataset so we will go with supervised learning which relies on historicl data to build a model to predict the result of the next observation.

Clickstream data is data about how users interact with your ecommerce websites, what ads they click, what products they view, which pages they spend most time on. It is behavioural data that can give you insights into your products and customers so you can better market to your customer base.

The notebook is written in PySpark and executed on Databricks.

Note- In the dataset download from Kaggle, train.csv given is 40 million rows, a 6 GB uncompressed file! Excel only shows 1 million, and since i wanted to add a product column, i saved excel as a smaller set of 1 million rows. I filled with some random product numbers taken from Dynamics to be able to make joins.

In [0]:
# Reading clicks csv files in a dataframe
file_path = "dbfs:/mnt/commercedata/clickstream-ad-ML/adtech/impression/csv/train_1M_p.csv"
df_clicks = spark.read.csv(file_path, header=True, inferSchema=True)
display(df_clicks.limit(10))

id,click,hour,C1,banner_pos,site_id,site_domain,site_category,product,app_id,app_domain,app_category,device_id,device_ip,device_model,device_type,device_conn_type,C14,C15,C16,C17,C18,C19,C20,C21
1e+18,0,14102100,1005,0,1fbe01fe,f3845767,28905ebd,M0010,ecad2386,7801e8d9,07d7df22,a99f214a,ddd2926e,44956a24,1,2,15706,320,50,1722,0,35,-1,79
1e+19,0,14102100,1005,0,1fbe01fe,f3845767,28905ebd,M0010,ecad2386,7801e8d9,07d7df22,a99f214a,96809ac8,711ee120,1,0,15704,320,50,1722,0,35,100084,79
1e+19,0,14102100,1005,0,1fbe01fe,f3845767,28905ebd,M0010,ecad2386,7801e8d9,07d7df22,a99f214a,b3cf8def,8a4875bd,1,0,15704,320,50,1722,0,35,100084,79
1e+19,0,14102100,1005,0,1fbe01fe,f3845767,28905ebd,M0010,ecad2386,7801e8d9,07d7df22,a99f214a,e8275b8f,6332421a,1,0,15706,320,50,1722,0,35,100084,79
1e+19,0,14102100,1005,1,fe8cc448,9166c161,0569f928,M0008,ecad2386,7801e8d9,07d7df22,a99f214a,9644d0bf,779d90c2,1,0,18993,320,50,2161,0,35,-1,157
1e+19,0,14102100,1005,0,d6137915,bb1ef334,f028772b,D0002,ecad2386,7801e8d9,07d7df22,a99f214a,05241af0,8a4875bd,1,0,16920,320,50,1899,0,431,100077,117
1e+19,0,14102100,1005,0,8fda644b,25d4cfcd,f028772b,D0002,ecad2386,7801e8d9,07d7df22,a99f214a,b264c159,be6db1d7,1,0,20362,320,50,2333,0,39,-1,157
1e+19,0,14102100,1005,1,e151e245,7e091613,f028772b,D0002,ecad2386,7801e8d9,07d7df22,a99f214a,e6f67278,be74e6fe,1,0,20632,320,50,2374,3,39,-1,23
1e+19,1,14102100,1005,0,1fbe01fe,f3845767,28905ebd,M0010,ecad2386,7801e8d9,07d7df22,a99f214a,37e8da74,5db079b5,1,2,15707,320,50,1722,0,35,-1,79
1e+19,0,14102100,1002,0,84c7ba46,c4e18dd6,50e219e0,M0001,ecad2386,7801e8d9,07d7df22,c357dbff,f1ac7184,373ecbe6,0,0,21689,320,50,2496,3,167,100191,23


In [0]:
df_clicks.count()

In [0]:
df_clicks.printSchema()

In [0]:
display(df_clicks.describe())

summary,id,click,hour,C1,banner_pos,site_id,site_domain,site_category,product,app_id,app_domain,app_category,device_id,device_ip,device_model,device_type,device_conn_type,C14,C15,C16,C17,C18,C19,C20,C21
count,1048575.0,1048575.0,1048575.0,1048575.0,1048575.0,1048575,1048575,1048575,1048575,1048575,1048575,1048575,1048575,1048575,1048575,1048575.0,1048575.0,1048575.0,1048575.0,1048575.0,1048575.0,1048575.0,1048575.0,1048575.0,1048575.0
mean,9.625753648290011e+18,0.1597386929881029,14102102.69341058,1005.0856181007558,0.2299806880766755,Infinity,Infinity,Infinity,,Infinity,Infinity,,Infinity,Infinity,Infinity,1.0241222611639609,0.2262184393104928,18291.969587297044,318.96731087428174,56.529631166106384,2044.9395245929,1.4729075173449682,190.75181746656176,45400.48523376964,69.428166797797
stddev,5.325281666798175e+18,0.3663637140262458,1.544329711959354,1.1502600956085731,0.4644457006515146,,,,,,,,,,,0.4487517255174427,0.6708972312755451,3528.096967822448,19.5208167602368,36.6609825097274,443.6807942965722,1.3633189585312968,273.99519872324294,49834.27263423589,38.42995991531878
min,9980000000000.0,0.0,14102100.0,1001.0,0.0,00255fb4,005b495a,0569f928,D0001,000d6291,002e4064,07d7df22,0.00E+00,00001c3b,0009f4d7,0.0,0.0,375.0,120.0,20.0,112.0,0.0,33.0,-1.0,13.0
max,1.84e+19,1.0,14102105.0,1012.0,7.0,fffe8e1c,fff602a2,f66779e6,M0017,fff4213a,fd68cbd8,fc6fa53d,ffffb0fc,fffffaa3,ffeafe15,5.0,5.0,21705.0,1024.0,1024.0,2497.0,3.0,1835.0,100248.0,195.0


In [0]:
# taking too long to execute, rewrite
""" 
numeric_data = df_clicks.select(numeric_features).toPandas()
axs = pd.plotting.scatter_matrix(numeric_data, figsize=(8, 8));
n = len(numeric_data.columns)

for i in range(n):
    v = axs[i, 0]
    v.yaxis.label.set_rotation(0)
    v.yaxis.label.set_ha('right')
    v.set_yticks(())
    h = axs[n-1, i]
    h.xaxis.label.set_
"""

In [0]:
# create a sql view
df_clicks.createOrReplaceTempView("vw_clicks")

In [0]:
%sql describe vw_clicks

col_name,data_type,comment
id,double,
click,int,
hour,int,
C1,int,
banner_pos,int,
site_id,string,
site_domain,string,
site_category,string,
product,string,
app_id,string,


In [0]:
display(dbutils.fs.ls("dbfs:/mnt/dynamics365-financeandoperations/d365commerce.sandbox.operations.dynamics.com/Tables/SupplyChain/ProductInformationManagement/Main/EcoResProduct"))

path,name,size,modificationTime
dbfs:/mnt/dynamics365-financeandoperations/d365commerce.sandbox.operations.dynamics.com/Tables/SupplyChain/ProductInformationManagement/Main/EcoResProduct/ECORESPRODUCT_00001.csv,ECORESPRODUCT_00001.csv,1392132,1645207720000
dbfs:/mnt/dynamics365-financeandoperations/d365commerce.sandbox.operations.dynamics.com/Tables/SupplyChain/ProductInformationManagement/Main/EcoResProduct/index.json,index.json,155,1645207720000


In [0]:
# Reading product csv files in a dataframe
df_product= spark.read.format("csv").option("header",False).load("dbfs:/mnt/dynamics365-financeandoperations/d365commerce.sandbox.operations.dynamics.com/Tables/SupplyChain/ProductInformationManagement/Main/EcoResProduct/ECORESPRODUCT_00001.csv")

display(df_product.limit(10))

_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10,_c11,_c12,_c13,_c14,_c15,_c16,_c17,_c18,_c19,_c20,_c21,_c22,_c23,_c24,_c25
22565421183,,,2022-02-03T02:00:46.9157225Z,22565421183,,,,,,,0,D0001,13678,0,1,MidRangeSpeaker,,0,0,5637144576,0,,0,0,0
22565421184,,,2022-02-03T02:00:46.9157463Z,22565421184,,,,,,,0,D0002,13678,0,1,Cabinet,,0,0,5637144576,0,,0,0,0
22565421185,,,2022-02-03T02:00:46.9157627Z,22565421185,,,,,,,0,D0003,13678,0,1,StandardSpeaker,,0,0,5637144576,0,,0,0,0
22565421187,,,2022-02-03T02:00:46.9157762Z,22565421187,,,,,,,0,L0001,13678,0,1,MidRangeSpeaker2,,0,0,5637144576,0,,0,0,0
22565421188,,,2022-02-03T02:00:46.9157886Z,22565421188,,,,,,,0,M0001,13678,0,1,WiringHarness,,0,0,5637144576,0,,0,0,0
22565421189,,,2022-02-03T02:00:46.9158037Z,22565421189,,,,,,,0,M0002,13678,0,1,MidRangeSpeakerUnit,,0,0,5637144576,0,,0,0,0
22565421190,,,2022-02-03T02:00:46.9158168Z,22565421190,,,,,,,0,M0003,13678,0,1,TweeterSpeakerUnit,,0,0,5637144576,0,,0,0,0
22565421191,,,2022-02-03T02:00:46.9158307Z,22565421191,,,,,,,0,M0004,13678,0,1,Crossover,,0,0,5637144576,0,,0,0,0
22565421192,,,2022-02-03T02:00:46.9158424Z,22565421192,,,,,,,0,M0005,13678,0,1,Enclosure,,0,0,5637144576,0,,0,0,0
22565421193,,,2022-02-03T02:00:46.9158548Z,22565421193,,,,,,,0,M0006,13678,0,1,BindingPosts,,0,0,5637144576,0,,0,0,0


In [0]:
# select only relevant columns and create a new dataframe
df_productSmall =  df_product.selectExpr(
    '_c12 AS ProductId',
    '_c16 AS ProductName')

display(df_productSmall.limit(10))

ProductId,ProductName
D0001,MidRangeSpeaker
D0002,Cabinet
D0003,StandardSpeaker
L0001,MidRangeSpeaker2
M0001,WiringHarness
M0002,MidRangeSpeakerUnit
M0003,TweeterSpeakerUnit
M0004,Crossover
M0005,Enclosure
M0006,BindingPosts


In [0]:
# create a view
df_productSmall.createOrReplaceTempView("vw_Products")

In [0]:
%sql
select * from vw_Products limit 10

ProductId,ProductName
D0001,MidRangeSpeaker
D0002,Cabinet
D0003,StandardSpeaker
L0001,MidRangeSpeaker2
M0001,WiringHarness
M0002,MidRangeSpeakerUnit
M0003,TweeterSpeakerUnit
M0004,Crossover
M0005,Enclosure
M0006,BindingPosts


In [0]:
%sql
-- join clicks and products view
select s.*, p.ProductName as product_name
                     from vw_clicks s 
                     left join vw_Products p on s.product = p.ProductId limit 10

id,click,hour,C1,banner_pos,site_id,site_domain,site_category,product,app_id,app_domain,app_category,device_id,device_ip,device_model,device_type,device_conn_type,C14,C15,C16,C17,C18,C19,C20,C21,product_name
1e+18,0,14102100,1005,0,1fbe01fe,f3845767,28905ebd,M0010,ecad2386,7801e8d9,07d7df22,a99f214a,ddd2926e,44956a24,1,2,15706,320,50,1722,0,35,-1,79,CarAudioUnit
1e+19,0,14102100,1005,0,1fbe01fe,f3845767,28905ebd,M0010,ecad2386,7801e8d9,07d7df22,a99f214a,96809ac8,711ee120,1,0,15704,320,50,1722,0,35,100084,79,CarAudioUnit
1e+19,0,14102100,1005,0,1fbe01fe,f3845767,28905ebd,M0010,ecad2386,7801e8d9,07d7df22,a99f214a,b3cf8def,8a4875bd,1,0,15704,320,50,1722,0,35,100084,79,CarAudioUnit
1e+19,0,14102100,1005,0,1fbe01fe,f3845767,28905ebd,M0010,ecad2386,7801e8d9,07d7df22,a99f214a,e8275b8f,6332421a,1,0,15706,320,50,1722,0,35,100084,79,CarAudioUnit
1e+19,0,14102100,1005,1,fe8cc448,9166c161,0569f928,M0008,ecad2386,7801e8d9,07d7df22,a99f214a,9644d0bf,779d90c2,1,0,18993,320,50,2161,0,35,-1,157,HighEndCabinet
1e+19,0,14102100,1005,0,d6137915,bb1ef334,f028772b,D0002,ecad2386,7801e8d9,07d7df22,a99f214a,05241af0,8a4875bd,1,0,16920,320,50,1899,0,431,100077,117,Cabinet
1e+19,0,14102100,1005,0,8fda644b,25d4cfcd,f028772b,D0002,ecad2386,7801e8d9,07d7df22,a99f214a,b264c159,be6db1d7,1,0,20362,320,50,2333,0,39,-1,157,Cabinet
1e+19,0,14102100,1005,1,e151e245,7e091613,f028772b,D0002,ecad2386,7801e8d9,07d7df22,a99f214a,e6f67278,be74e6fe,1,0,20632,320,50,2374,3,39,-1,23,Cabinet
1e+19,1,14102100,1005,0,1fbe01fe,f3845767,28905ebd,M0010,ecad2386,7801e8d9,07d7df22,a99f214a,37e8da74,5db079b5,1,2,15707,320,50,1722,0,35,-1,79,CarAudioUnit
1e+19,0,14102100,1002,0,84c7ba46,c4e18dd6,50e219e0,M0001,ecad2386,7801e8d9,07d7df22,c357dbff,f1ac7184,373ecbe6,0,0,21689,320,50,2496,3,167,100191,23,WiringHarness


Next lets do some Exploratory Data Analysis, that is analyse relationships between features, to get a sense of what could be influencing someone clicking an ad.

In [0]:
%sql 
-- different banner positions of ads. Where they are placed on a page. We can see 8 types
select banner_pos, count(1)
from vw_clicks
group by 1 order by 1

banner_pos,count(1)
0,815396
1,230987
2,349
3,2
4,50
5,1638
7,153


In [0]:
%sql 
-- total number of clicks vs no clicks for each banner pos
select banner_pos,
sum(case when click = 1 then 1 else 0 end) as click,
sum(case when click = 0 then 1 else 0 end) as no_click
from vw_clicks group by 1 order by 1

banner_pos,click,no_click
0,123062,692334
1,44231,186756
2,49,300
3,0,2
4,7,43
5,139,1499
7,10,143


In [0]:
%sql 
-- CTR is the number of clicks that your ad receives divided by the number of times that your ad is shown: clicks ÷ impressions = CTR
-- CTR value for each banner pos. Number 3 is empty which means that position is never clicked. It could be faulty data too. Number is haighest CTR, so that is a popular one.
select banner_pos,
sum(case when click = 1 then 1 else 0 end) / (count(1) * 1.0) as CTR
from vw_clicks group by 1 order by 1

banner_pos,CTR
0,0.1509229871130101
1,0.1914869667989973
2,0.1404011461318051
3,0.0
4,0.14
5,0.0848595848595848
7,0.065359477124183


In [0]:
%sql 
-- different kinds of devices used
-- Device type 1 is most used by people who visit the site. 
select device_type, count(1)
from vw_clicks
group by 1 order by 1

device_type,count(1)
0,32620
1,997101
4,17502
5,1352


In [0]:
%sql 
-- total number of clicks vs no clicks for each device
-- though device 1 is most used but has highest no clicks too
select device_type,
sum(case when click = 1 then 1 else 0 end) as click,
sum(case when click = 0 then 1 else 0 end) as no_click
from vw_clicks group by 1 order by 1

device_type,click,no_click
0,7507,25113
1,158570,838531
4,1309,16193
5,112,1240


In [0]:
%sql 
-- CTR value for each device type. Number 4 is least and Number 0 is highest, highest chances are with device 0. For number 4, maybe company should stop showing ads and save some money.
select device_type,
sum(case when click = 1 then 1 else 0 end) / (count(1) * 1.0) as CTR
from vw_clicks group by 1 order by 1

device_type,CTR
0,0.2301348865726548
1,0.1590310309587494
4,0.0747914524054393
5,0.0828402366863905


In [0]:
%sql 
-- product M0001 is really popular in this clickstream dataset. So customers are spending lot of time looking at that product. Next are D0002 and M0010.
select product, count(1) as count
from vw_clicks
group by 1 having count > 200 order by count desc

product,count
M0001,381761
D0002,273460
M0010,271621
M0007,77926
D0003,9833
M0002,7993
M0003,7431
L0001,4980
M0008,4731
M0013,4185


In [0]:
%sql 
-- total number of clicks vs no clicks for each product
-- M0010 gets the highest of clicks
select product,
sum(case when click = 1 then 1 else 0 end) as click,
sum(case when click = 0 then 1 else 0 end) as no_click
from vw_clicks group by 1 order by 3 desc

product,click,no_click
M0001,40736,341025
D0002,50178,223282
M0010,55883,215738
M0007,17439,60487
D0003,224,9609
M0002,307,7686
M0003,795,6636
M0008,138,4593
L0001,641,4339
M0013,672,3513


In [0]:
%sql 
-- CTR of different products. M0006 has the highest CTR. % wise this product gets most clicks, 30%
select product,
sum(case when click = 1 then 1 else 0 end) / (count(1) * 1.0) as CTR
from vw_clicks group by 1 order by 2 desc

product,CTR
M0006,0.3076923076923076
M0007,0.2237892359417909
M0010,0.2057388788053942
D0002,0.183493015431873
M0013,0.1605734767025089
M0009,0.158894645941278
L0001,0.128714859437751
M0005,0.12
M0015,0.1174377224199288
M0014,0.1139601139601139


In [0]:
%sql 
select substr(hour, 7) as hour, 
count(1)
from vw_clicks 
group by 1 order by 1

hour,count(1)
0,119006
1,137442
2,207471
3,193355
4,264711
5,126590


In [0]:
%sql
-- total number of clicks vs no clicks for hour of day
select substr(hour, 7) as hour,
sum(case when click = 1 then 1 else 0 end) as click,
sum(case when click = 0 then 1 else 0 end) as no_click
from vw_clicks group by 1 order by 1

hour,click,no_click
0,20792,98214
1,23873,113569
2,31265,176206
3,32830,160525
4,40026,224685
5,18712,107878


In [0]:
%sql 
select substr(hour, 7) as hour,
sum(case when click = 1 then 1 else 0 end) / (count(1) * 1.0) as CTR
from vw_clicks group by 1 order by 1

hour,CTR
0,0.1747138799724383
1,0.1736950859271547
2,0.1506957598893339
3,0.1697913164903933
4,0.1512064100094064
5,0.1478157832372225


In [0]:
%sql 

select count(1) as total,

count(distinct C1) as C1,
count(distinct banner_pos) as banner_pos,
count(distinct site_id) as site_id,
count(distinct site_domain) as site_domain,
count(distinct site_category) as site_category,
count(distinct product) as product,
count(distinct app_id) as app_id,
count(distinct app_domain) as app_domain,
count(distinct app_category) as app_category,
count(distinct device_id) as device_id,
count(distinct device_ip) as device_ip,
count(distinct device_model) as device_model,
count(distinct device_type) as device_type,
count(distinct device_conn_type) as device_conn_type,
count(distinct C14) as C14,
count(distinct C15) as C15,
count(distinct C16) as C16,
count(distinct C17) as C17,
count(distinct C18) as C18,
count(distinct C19) as C19,
count(distinct C20) as C20,
count(distinct C21) as C21

from vw_clicks

total,C1,banner_pos,site_id,site_domain,site_category,product,app_id,app_domain,app_category,device_id,device_ip,device_model,device_type,device_conn_type,C14,C15,C16,C17,C18,C19,C20,C21
1048575,7,7,2106,2064,21,21,2360,159,23,87398,325275,4619,4,4,608,8,9,162,4,41,162,35


In [0]:
display(df_clicks.describe())

summary,id,click,hour,C1,banner_pos,site_id,site_domain,site_category,product,app_id,app_domain,app_category,device_id,device_ip,device_model,device_type,device_conn_type,C14,C15,C16,C17,C18,C19,C20,C21
count,1048575.0,1048575.0,1048575.0,1048575.0,1048575.0,1048575,1048575,1048575,1048575,1048575,1048575,1048575,1048575,1048575,1048575,1048575.0,1048575.0,1048575.0,1048575.0,1048575.0,1048575.0,1048575.0,1048575.0,1048575.0,1048575.0
mean,9.625753648290015e+18,0.1597386929881029,14102102.69341058,1005.0856181007558,0.2299806880766755,Infinity,Infinity,Infinity,,Infinity,Infinity,,Infinity,Infinity,Infinity,1.0241222611639609,0.2262184393104928,18291.969587297044,318.96731087428174,56.529631166106384,2044.9395245929,1.4729075173449682,190.75181746656176,45400.48523376964,69.428166797797
stddev,5.325281666798175e+18,0.3663637140262458,1.5443297118823776,1.1502600956085722,0.4644457006515146,,,,,,,,,,,0.4487517255174427,0.6708972312755451,3528.096967822448,19.520816760236805,36.6609825097274,443.6807942965722,1.363318958531297,273.99519872324294,49834.27263423589,38.42995991531878
min,9980000000000.0,0.0,14102100.0,1001.0,0.0,00255fb4,005b495a,0569f928,D0001,000d6291,002e4064,07d7df22,0.00E+00,00001c3b,0009f4d7,0.0,0.0,375.0,120.0,20.0,112.0,0.0,33.0,-1.0,13.0
max,1.84e+19,1.0,14102105.0,1012.0,7.0,fffe8e1c,fff602a2,f66779e6,M0017,fff4213a,fd68cbd8,fc6fa53d,ffffb0fc,fffffaa3,ffeafe15,5.0,5.0,21705.0,1024.0,1024.0,2497.0,3.0,1835.0,100248.0,195.0


In [0]:
# Drop site_category column
# we have 1 to 1 mapping with our product column so its highly correlated. We want to avoid correlation and use features that have no bearing on each other to get the best prediction.
df_clicks1 = df_clicks.drop('site_category')
df_clicks1.printSchema()

In [0]:
# extract exact hour from hour column into a new hr column
# we will add hr as a new feature
df_clicks1 =  df_clicks1.selectExpr("*",
    'substr(hour, 7) as hr')

display(df_clicks1.limit(10))

id,click,hour,C1,banner_pos,site_id,site_domain,product,app_id,app_domain,app_category,device_id,device_ip,device_model,device_type,device_conn_type,C14,C15,C16,C17,C18,C19,C20,C21,hr,hr.1
1e+18,0,14102100,1005,0,1fbe01fe,f3845767,M0010,ecad2386,7801e8d9,07d7df22,a99f214a,ddd2926e,44956a24,1,2,15706,320,50,1722,0,35,-1,79,0,0
1e+19,0,14102100,1005,0,1fbe01fe,f3845767,M0010,ecad2386,7801e8d9,07d7df22,a99f214a,96809ac8,711ee120,1,0,15704,320,50,1722,0,35,100084,79,0,0
1e+19,0,14102100,1005,0,1fbe01fe,f3845767,M0010,ecad2386,7801e8d9,07d7df22,a99f214a,b3cf8def,8a4875bd,1,0,15704,320,50,1722,0,35,100084,79,0,0
1e+19,0,14102100,1005,0,1fbe01fe,f3845767,M0010,ecad2386,7801e8d9,07d7df22,a99f214a,e8275b8f,6332421a,1,0,15706,320,50,1722,0,35,100084,79,0,0
1e+19,0,14102100,1005,1,fe8cc448,9166c161,M0008,ecad2386,7801e8d9,07d7df22,a99f214a,9644d0bf,779d90c2,1,0,18993,320,50,2161,0,35,-1,157,0,0
1e+19,0,14102100,1005,0,d6137915,bb1ef334,D0002,ecad2386,7801e8d9,07d7df22,a99f214a,05241af0,8a4875bd,1,0,16920,320,50,1899,0,431,100077,117,0,0
1e+19,0,14102100,1005,0,8fda644b,25d4cfcd,D0002,ecad2386,7801e8d9,07d7df22,a99f214a,b264c159,be6db1d7,1,0,20362,320,50,2333,0,39,-1,157,0,0
1e+19,0,14102100,1005,1,e151e245,7e091613,D0002,ecad2386,7801e8d9,07d7df22,a99f214a,e6f67278,be74e6fe,1,0,20632,320,50,2374,3,39,-1,23,0,0
1e+19,1,14102100,1005,0,1fbe01fe,f3845767,M0010,ecad2386,7801e8d9,07d7df22,a99f214a,37e8da74,5db079b5,1,2,15707,320,50,1722,0,35,-1,79,0,0
1e+19,0,14102100,1002,0,84c7ba46,c4e18dd6,M0001,ecad2386,7801e8d9,07d7df22,c357dbff,f1ac7184,373ecbe6,0,0,21689,320,50,2496,3,167,100191,23,0,0


In [0]:
from pyspark.sql.functions import *

strCols = map(lambda t: t[0], __builtin__.filter(lambda t: t[1] == 'string', df_clicks1.dtypes))
intCols = map(lambda t: t[0], __builtin__.filter(lambda t: t[1] == 'int', df_clicks1.dtypes))

# [row_idx][json_idx]
strColsCount = sorted(map(lambda c: (c, df_clicks1.select(countDistinct(c)).collect()[0][0]), strCols), key=lambda x: x[1], reverse=True)
intColsCount = sorted(map(lambda c: (c, df_clicks1.select(countDistinct(c)).collect()[0][0]), intCols), key=lambda x: x[1], reverse=True)

In [0]:
# distinct counts for str columns
display(strColsCount)

_1,_2
device_ip,325275
device_id,87398
device_model,4619
app_id,2360
site_id,2106
site_domain,2064
app_domain,159
app_category,23
product,21
hr,6


In [0]:
# distinct counts for int columns
display(intColsCount)

_1,_2
C14,608
C17,162
C20,162
C19,41
C21,35
C16,9
C15,8
C1,7
banner_pos,7
hour,6


Below code is taken from databricks’ official site and it indexes each categorical column using the StringIndexer, then converts the indexed categories into one-hot encoded variables. The resulting output has the binary vectors appended to the end of each row. We use the StringIndexer again to encode our labels to label indices. Next, we use the VectorAssembler to combine all the feature columns into a single vector column.

Once we have familiarized ourselves with our data, we proceed to the machine learning phase, where we convert our data into features for input to a machine learning algorithm and produce a trained model with which we can predict. Because Spark MLlib algorithms take a column of feature vectors of doubles as input, a typical feature engineering workflow includes:

1. Identifying numeric and categorical features
2. String indexing
3. Assembling them all into a sparse vector

In our use of GBTClassifer, while we use string indexer but we are not applying One Hot Encoder (OHE).

When using StringIndexer, categorical features are kept as k-ary categorical features. A tree node will test if feature X has a value in {subset of categories}. With both StringIndexer + OHE: Your categorical features are turned into a bunch of binary features. A tree node will test if feature X = category a vs. all the other categories (one vs. rest test).

When using only StringIndexer, the benefits include:

1. There are fewer features to choose
2. Each node’s test is more expressive than with binary 1-vs-rest features

Therefore, for tree based methods, it is preferable to not use OHE as it is a less expressive test and it takes up more space. But for non-tree-based algorithms such as like linear regression, you must use OHE or else the model will impose a false and misleading ordering on categories.

In [0]:
# Include PySpark Feature Engineering methods
from pyspark.ml.feature import StringIndexer, VectorAssembler

# All of the columns (string or integer) are categorical columns
#  except for the [click] column
maxBins = 70
categorical = list(map(lambda c: c[0], __builtin__.filter(lambda c: c[1] <= maxBins, strColsCount)))
categorical += list(map(lambda c: c[0], __builtin__.filter(lambda c: c[1] <= maxBins, intColsCount)))
categorical.remove('click')

# Apply string indexer to all of the categorical columns
#  And add _idx to the column name to indicate the index of the categorical value
stringIndexers = list(map(lambda c: StringIndexer(inputCol = c, outputCol = c + "_idx"), categorical))

# Assemble the put as the input to the VectorAssembler 
#   with the output being our features
assemblerInputs = list(map(lambda c: c + "_idx", categorical))
vectorAssembler = VectorAssembler(inputCols = assemblerInputs, outputCol = "features")

# The [click] column is our label 
labelStringIndexer = StringIndexer(inputCol = "click", outputCol = "label")

# The stages of our ML pipeline 
stages = stringIndexers + [vectorAssembler, labelStringIndexer]

We use Pipeline to chain multiple Transformers and Estimators together to specify our machine learning workflow. A Pipeline’s stages are specified as an ordered array.

In [0]:
from pyspark.ml import Pipeline

# Create our pipeline
pipeline = Pipeline(stages = stages)

# create transformer to add features
featurizer = pipeline.fit(df_clicks1)

# dataframe with feature and intermediate transformation columns appended
featurizedClicks = featurizer.transform(df_clicks1)

selectedCols = ['label', 'features'] + df_clicks1.columns
featurizedClicks = featurizedClicks.select(selectedCols)
featurizedClicks.printSchema()


In [0]:
pd.DataFrame(featurizedClicks.take(5), columns=featurizedClicks.columns).transpose()

Unnamed: 0,0,1,2,3,4
label,0.0,0.0,0.0,0.0,0.0
features,"(0.0, 2.0, 5.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 2.0, 5.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 2.0, 5.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 2.0, 5.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 8.0, 5.0, 0.0, 4.0, 0.0, 0.0, 0.0, 1.0, ..."
id,1000000000000000000.0,10000000000000000000.0,10000000000000000000.0,10000000000000000000.0,10000000000000000000.0
click,0,0,0,0,0
hour,14102100,14102100,14102100,14102100,14102100
C1,1005,1005,1005,1005,1005
banner_pos,0,0,0,0,1
site_id,1fbe01fe,1fbe01fe,1fbe01fe,1fbe01fe,fe8cc448
site_domain,f3845767,f3845767,f3845767,f3845767,9166c161
product,M0010,M0010,M0010,M0010,M0008


As you can see, we now have 'features' column and 'label' column.

In [0]:
display(featurizedClicks.select('features', 'label').limit(10))

features,label
"Map(vectorType -> sparse, length -> 13, indices -> List(1, 2, 9, 11), values -> List(2.0, 5.0, 5.0, 1.0))",0.0
"Map(vectorType -> sparse, length -> 13, indices -> List(1, 2, 9), values -> List(2.0, 5.0, 5.0))",0.0
"Map(vectorType -> sparse, length -> 13, indices -> List(1, 2, 9), values -> List(2.0, 5.0, 5.0))",0.0
"Map(vectorType -> sparse, length -> 13, indices -> List(1, 2, 9), values -> List(2.0, 5.0, 5.0))",0.0
"Map(vectorType -> sparse, length -> 13, indices -> List(1, 2, 4, 8, 9), values -> List(8.0, 5.0, 4.0, 1.0, 5.0))",0.0
"Map(vectorType -> sparse, length -> 13, indices -> List(1, 2, 3, 4, 9), values -> List(1.0, 5.0, 8.0, 6.0, 5.0))",0.0
"Map(vectorType -> sparse, length -> 13, indices -> List(1, 2, 3, 4, 9), values -> List(1.0, 5.0, 1.0, 4.0, 5.0))",0.0
"Map(vectorType -> sparse, length -> 13, indices -> List(1, 2, 3, 4, 8, 9, 12), values -> List(1.0, 5.0, 1.0, 2.0, 1.0, 5.0, 1.0))",0.0
"Map(vectorType -> sparse, length -> 13, indices -> List(1, 2, 9, 11), values -> List(2.0, 5.0, 5.0, 1.0))",1.0
"Map(vectorType -> sparse, length -> 13, indices -> List(2, 3, 4, 7, 9, 10, 12), values -> List(5.0, 3.0, 2.0, 1.0, 5.0, 1.0, 1.0))",0.0


In [0]:
train, test = featurizedClicks \
  .select(["label", "features", "hr"]) \
  .randomSplit([0.7, 0.3], 42)
train.cache()
test.cache()

print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

We will take Gradient Boosting Tree classifier for our ML as that is a popular one. There are others you can try, XGBoost, Random forest etc. The exact nature of these models is outstide the scope for our demo.

In [0]:
from pyspark.ml.classification import GBTClassifier

# Train our GBTClassifier model 
classifier = GBTClassifier(labelCol="label", featuresCol="features", maxBins=maxBins, maxDepth=10, maxIter=10)
model = classifier.fit(train)

In [0]:
# Execute our predictions
predictions = model.transform(test)

predictions.select('hr', 'label', 'rawPrediction', 'prediction', 'probability').show(10)

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate our GBTClassifier model using BinaryClassificationEvaluator()
ev = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")

print("Test Area Under ROC: " + str(ev.evaluate(predictions)))

An ROC curve (receiver operating characteristic curve) is a graph showing the performance of a classification model at all classification thresholds. 
AUC stands for "Area under the ROC Curve." That is, AUC measures the entire two-dimensional area underneath the entire ROC curve.
With our predictions, we can evaluate the model according to an evaluation metric, like area under the ROC curve, which in this case is 72%

In [0]:
#exaplanation of all parameters available
print(classifier.explainParams())

In [0]:
import json
features = map(lambda c: str(json.loads(json.dumps(c))['name']), \
               list(predictions.schema['features'].metadata.get('ml_attr').get('attrs').values())[0])
# convert numpy.float64 to str for spark.createDataFrame()

weights=map(lambda w: '%.10f' % w, model.featureImportances)
weightedFeatures = sorted(zip(weights, features), key=lambda x: x[1], reverse=True)

spark.createDataFrame(weightedFeatures).toDF("weight", "feature").createOrReplaceTempView('wf')

In [0]:
%sql 
select feature, weight 
from wf 
order by weight desc

feature,weight
C21_idx,0.5258602402
C19_idx,0.1332534227
product_idx,0.0983216862
app_category_idx,0.0674902803
hr_idx,0.0617483513
device_conn_type_idx,0.034707685
banner_pos_idx,0.0294980743
C16_idx,0.0177438182
C1_idx,0.0121301829
C18_idx,0.007666283


#### Product feature has 10% weight on the prediction. Its not very high. So it does not impact heavily on the result, whether a customer clicks an ad or not. The feature C21 though is a different story, 53%. We should dig more into what that is and why it is influencing the result so much.

In [0]:
#create a sql view
predictions.createOrReplaceTempView("predictions")

In [0]:
%sql 
describe predictions

col_name,data_type,comment
label,double,
features,vector,
hr,string,
rawPrediction,vector,
probability,vector,
prediction,double,


In [0]:
%sql 
select sum(case when prediction = label then 1 else 0 end) / (count(1) * 1.0) as accuracy
from predictions

accuracy
0.8423606448882709


### The AUC for our model is 72% and accuracy is 84%. Both are high enough.
We evaluated two metrics, AUC and Accuracy. There are other metrics too like Precision, Recall, F1 score. Choosing the right metric needs some thinking. Sometimes it depends on the dataset, whether its balanced or not, or what kind of problem you are solving or what kind of ML model you are using. Again something outside the scope of this notebook.

Product feature has 10% weight on the prediction. Its not very high, which means it does not impact heavily on the result. What product a customer sees has no effect on the probability an ad will be clicked.

The feature C21 though is a different story, 53%. We should dig more into what that is and why it is influencing the result so much.

Hope you got a taste of what kind of data analysis and ML models we can build on clickstream data and Dynamics data.

Thank you.