In [387]:
#---------------Import & Check Data------------------------------
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

In [388]:
from pyspark.shell import spark

In [389]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

In [390]:
from functools import reduce

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, countDistinct, explode, array, lit, isnan, when, count
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import FMClassifier


from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [391]:
ad_feature_df = spark.read.format("csv") \
  .option("inferSchema", True) \
  .option("header", True) \
  .option("sep", ",") \
  .load('ad_feature.csv')
print("There are", ad_feature_df.count(),"rows", len(ad_feature_df.columns),"columns", "in the data.")

                                                                                

There are 846811 rows 6 columns in the data.


In [392]:
ad_feature_df.show(10)

+----------+-------+-----------+--------+------+-----+
|adgroup_id|cate_id|campaign_id|customer| brand|price|
+----------+-------+-----------+--------+------+-----+
|     63133|   6406|      83237|       1| 95471|170.0|
|    313401|   6406|      83237|       1| 87331|199.0|
|    248909|    392|      83237|       1| 32233| 38.0|
|    208458|    392|      83237|       1|174374|139.0|
|    110847|   7211|     135256|       2|145952|32.99|
|    607788|   6261|     387991|       6|207800|199.0|
|    375706|   4520|     387991|       6|  NULL| 99.0|
|     11115|   7213|     139747|       9|186847| 33.0|
|     24484|   7207|     139744|       9|186847| 19.0|
|     28589|   5953|     395195|      13|  NULL|428.0|
+----------+-------+-----------+--------+------+-----+
only showing top 10 rows



In [393]:
display(ad_feature_df)

DataFrame[adgroup_id: int, cate_id: int, campaign_id: int, customer: int, brand: string, price: double]

In [394]:
user_profile_df = spark.read.format("csv") \
  .option("inferSchema", True) \
  .option("header", True) \
  .option("sep", ",") \
  .load('user_profile.csv')
print("There are", user_profile_df.count(),"rows", len(user_profile_df.columns),"columns", "in the data.")

                                                                                

There are 1061768 rows 9 columns in the data.


In [395]:
user_profile_df.show(10)

+------+---------+------------+-----------------+---------+------------+--------------+----------+-----------+
|userid|cms_segid|cms_group_id|final_gender_code|age_level|pvalue_level|shopping_level|occupation|city_level |
+------+---------+------------+-----------------+---------+------------+--------------+----------+-----------+
|   234|        0|           5|                2|        5|        null|             3|         0|          3|
|   523|        5|           2|                2|        2|           1|             3|         1|          2|
|   612|        0|           8|                1|        2|           2|             3|         0|       null|
|  1670|        0|           4|                2|        4|        null|             1|         0|       null|
|  2545|        0|          10|                1|        4|        null|             3|         0|       null|
|  3644|       49|           6|                2|        6|           2|             3|         0|          2|
|

In [396]:
user_profile_df.select('userid').describe().show() # There are 1061768 individual users in total.

[Stage 12464:>                                                      (0 + 1) / 1]

+-------+------------------+
|summary|            userid|
+-------+------------------+
|  count|           1061768|
|   mean| 571310.4088755735|
| stddev|329502.06203730527|
|    min|                 1|
|    max|           1141729|
+-------+------------------+



                                                                                

In [397]:
display(user_profile_df)

DataFrame[userid: int, cms_segid: int, cms_group_id: int, final_gender_code: int, age_level: int, pvalue_level: int, shopping_level: int, occupation: int, city_level : int]

In [398]:
df0 = spark.read.format("csv") \
  .option("inferSchema", True) \
  .option("header", True) \
  .option("sep", ",") \
  .load('raw_sample.csv')
print("There are", df0.count(),"rows", len(df0.columns),"columns", "in the data.")



There are 26557961 rows 6 columns in the data.




In [399]:
df0.show(10)

+------+----------+----------+-----------+------+---+
|  user|time_stamp|adgroup_id|        pid|nonclk|clk|
+------+----------+----------+-----------+------+---+
|581738|1494137644|         1|430548_1007|     1|  0|
|449818|1494638778|         3|430548_1007|     1|  0|
|914836|1494650879|         4|430548_1007|     1|  0|
|914836|1494651029|         5|430548_1007|     1|  0|
|399907|1494302958|         8|430548_1007|     1|  0|
|628137|1494524935|         9|430548_1007|     1|  0|
|298139|1494462593|         9|430539_1007|     1|  0|
|775475|1494561036|         9|430548_1007|     1|  0|
|555266|1494307136|        11|430539_1007|     1|  0|
|117840|1494036743|        11|430548_1007|     1|  0|
+------+----------+----------+-----------+------+---+
only showing top 10 rows



In [400]:
display(df0)

DataFrame[user: int, time_stamp: int, adgroup_id: int, pid: string, nonclk: int, clk: int]

In [401]:
df = df0.join(user_profile_df, df0.user == user_profile_df.userid,how='left').drop(user_profile_df.userid)
df = df.join(ad_feature_df, df.adgroup_id == ad_feature_df.adgroup_id,how='left').drop(ad_feature_df.adgroup_id)

df.show(10)

[Stage 12478:>                                                      (0 + 1) / 1]

+-----+----------+----------+-----------+------+---+---------+------------+-----------------+---------+------------+--------------+----------+-----------+-------+-----------+--------+------+------+
| user|time_stamp|adgroup_id|        pid|nonclk|clk|cms_segid|cms_group_id|final_gender_code|age_level|pvalue_level|shopping_level|occupation|city_level |cate_id|campaign_id|customer| brand| price|
+-----+----------+----------+-----------+------+---+---------+------------+-----------------+---------+------------+--------------+----------+-----------+-------+-----------+--------+------+------+
|39521|1494337568|    161713|430548_1007|     1|  0|       31|           4|                2|        4|           1|             3|         0|          4|    392|     261349|  227829|439805|  55.0|
|24973|1494413617|    315556|430539_1007|     1|  0|        0|           4|                2|        4|        null|             3|         0|          3|   1665|     129652|    5521| 88597| 498.0|
|45048|149

                                                                                

In [402]:
print("There are", df.count(),"rows", len(df.columns),"columns", "in the data.")



There are 26557961 rows 19 columns in the data.


                                                                                

In [403]:
#df=df.limit(1000000)
#print("There are", df.count(),"rows", len(df.columns),"columns", "in the data.")

In [404]:
df.select('user').describe().show()



+-------+------------------+
|summary|              user|
+-------+------------------+
|  count|          26557961|
|   mean| 568205.4524721984|
| stddev|329750.23491822765|
|    min|                 1|
|    max|           1141729|
+-------+------------------+



                                                                                

In [405]:
df.select('clk','final_gender_code','age_level','pvalue_level','shopping_level','occupation','city_level ','price').describe().toPandas()

                                                                                

Unnamed: 0,summary,clk,final_gender_code,age_level,pvalue_level,shopping_level,occupation,city_level,price
0,count,26557961.0,25029435.0,25029435.0,12005163.0,25029435.0,25029435.0,18333132.0,26557961.0
1,mean,0.0514367801052196,1.7250850448681725,3.386655072317853,1.7316214698625916,2.806960924207837,0.0544937590480967,2.553724262717358,747.3390698450776
2,stddev,0.2208869384884038,0.4464714218634672,1.1893886683133303,0.5878902699964927,0.4951439286351354,0.226989407971931,0.9255699614604562,132365.1141789427
3,min,0.0,1.0,0.0,1.0,1.0,0.0,1.0,0.01
4,max,1.0,2.0,6.0,3.0,3.0,1.0,4.0,99999999.0


In [406]:
#Get unique number of all the variables
expression = [countDistinct(c).alias(c) for c in df.columns]
unique_number_table = df.select(*expression).toPandas()
unique_number_table

                                                                                

Unnamed: 0,user,time_stamp,adgroup_id,pid,nonclk,clk,cms_segid,cms_group_id,final_gender_code,age_level,pvalue_level,shopping_level,occupation,city_level,cate_id,campaign_id,customer,brand,price
0,1141729,662061,846811,2,2,2,97,13,2,7,3,3,2,4,6769,423436,255875,99815,14861


In [407]:
#Get missing values of all the variables
expression = [count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]
null_number_table = df.select(*expression).toPandas()
null_number_table

                                                                                

Unnamed: 0,user,time_stamp,adgroup_id,pid,nonclk,clk,cms_segid,cms_group_id,final_gender_code,age_level,pvalue_level,shopping_level,occupation,city_level,cate_id,campaign_id,customer,brand,price
0,0,0,0,0,0,0,1528526,1528526,1528526,1528526,14552798,1528526,1528526,8224829,0,0,0,0,0


In [408]:
#Dealing with missing values in Categorial Missing Vaules

print('Total rows with missing value on variable "cms_group_id", "final_gender_code","age_level","shopping_level" and "occupation"')
print(df.where((df.final_gender_code.isNull() == True) & 
               (df.age_level.isNull() == True) &
               (df.shopping_level.isNull() == True) &
               (df.occupation.isNull() == True)&
               (df.cms_group_id.isNull() == True)
                     ).count())
#These missing values are all from the same rows.

Total rows with missing value on variable "cms_group_id", "final_gender_code","age_level","shopping_level" and "occupation"




1528526


                                                                                

In [409]:
#Delete Rows with Null on variable "cms_group_id", "final_gender_code","age_level","shopping_level" and "occupation" 
df_clean = df.filter(df.final_gender_code. isNotNull())
df_clean = df_clean.filter(df_clean.age_level. isNotNull())
df_clean = df_clean.filter(df_clean.shopping_level. isNotNull())
df_clean = df_clean.filter(df_clean.occupation. isNotNull())
df_clean = df_clean.filter(df_clean.cms_group_id. isNotNull())

print("There are", df_clean.count(),"rows", len(df_clean.columns),"columns", "in the data.")



There are 25029435 rows 19 columns in the data.




In [410]:
df_clean = df_clean.filter(df_clean.pvalue_level. isNotNull())


In [411]:
#Delete Columns
df_clean = df_clean.drop("pvalue_level") # Too many Nulls
df_clean = df_clean.drop("adgroup_id") # Too many Categories
df_clean = df_clean.drop("brand") # Too many Categories
df_clean = df_clean.drop('cate_id') # Too many Categories
df_clean = df_clean.drop('campaign_id') # Too many Categories

In [412]:
#Remove Useless Columns
df_clean = df_clean.drop('cms_segid')
df_clean = df_clean.drop('customer')
df_clean = df_clean.drop('nonclk')

In [413]:
#Dealing with city_value (Categorical variable with too many nulls)
df_clean.groupBy('city_level ').count().orderBy('count').show()



+-----------+-------+
|city_level |  count|
+-----------+-------+
|          1|1041473|
|       null|1779301|
|          4|2090168|
|          3|2584769|
|          2|4509452|
+-----------+-------+



                                                                                

In [414]:
df_clean= df_clean.fillna(0)

In [415]:
expression = [count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_clean.columns]
null_number_table = df_clean.select(*expression).toPandas()
null_number_table

                                                                                

Unnamed: 0,user,time_stamp,pid,clk,cms_group_id,final_gender_code,age_level,shopping_level,occupation,city_level,price
0,0,0,0,0,0,0,0,0,0,0,0


In [416]:
pd.DataFrame(df_clean.dtypes, columns = ['Column Name','Data type'])

Unnamed: 0,Column Name,Data type
0,user,int
1,time_stamp,int
2,pid,string
3,clk,int
4,cms_group_id,int
5,final_gender_code,int
6,age_level,int
7,shopping_level,int
8,occupation,int
9,city_level,int


In [417]:
df_clean.show(n=2, truncate=False, vertical=True)



-RECORD 0------------------------
 user              | 7           
 time_stamp        | 1494421153  
 pid               | 430548_1007 
 clk               | 0           
 cms_group_id      | 8           
 final_gender_code | 1           
 age_level         | 2           
 shopping_level    | 3           
 occupation        | 0           
 city_level        | 1           
 price             | 280.0       
-RECORD 1------------------------
 user              | 7           
 time_stamp        | 1494024156  
 pid               | 430548_1007 
 clk               | 0           
 cms_group_id      | 8           
 final_gender_code | 1           
 age_level         | 2           
 shopping_level    | 3           
 occupation        | 0           
 city_level        | 1           
 price             | 108.0       
only showing top 2 rows



                                                                                

In [418]:
#--------------------------EDA--------------------------------------------
freq_table = df_clean.select(col("clk").cast("int")).groupBy("clk").count().toPandas()
freq_table
#It is imbalanced data

                                                                                

Unnamed: 0,clk,count
0,1,613297
1,0,11391866


In [419]:
gender_table = df_clean.select(col("final_gender_code").cast("int")).groupBy("final_gender_code").count().toPandas()
gender_table

                                                                                

Unnamed: 0,final_gender_code,count
0,1,3148437
1,2,8856726


In [560]:
import plotly.graph_objects as go

labels = ["Male Customers", "Female Customers"]
values = [6880966,18148469]

# Create subplots: use 'domain' type for Pie subplot
fig = go.Figure(data=[go.Pie(labels=labels, values=values)])

# Use `hole` to create a donut-like pie chart
fig.update_traces(hole=.5, hoverinfo="label+percent")

fig.update_layout(
    title_text="Gender Distribution",
    # Add annotations in the center of the donut pies.
    annotations=[dict(text='Mostly Female<br />Customers', x=0.5, y=0.5, font_size=20, showarrow=False)],
    font=dict(size=20))
fig.show()

In [561]:
fig.write_html("Gender_Distribution.html")

In [422]:
gender_table = df_clean.select(col("shopping_level").cast("int")).groupBy("shopping_level").count().toPandas()
gender_table

                                                                                

Unnamed: 0,shopping_level,count
0,1,176996
1,3,11042300
2,2,785867


In [562]:
labels = ["Shallow Customers", "Moderate Customers", "Deep Customers"]
values = [1118722,2594215,21316498]

# Create subplots: use 'domain' type for Pie subplot
fig = go.Figure(data=[go.Pie(labels=labels, values=values)])

# Use `hole` to create a donut-like pie chart
fig.update_traces(hole=.5, hoverinfo="label+percent")

fig.update_layout(
    title_text="Shopping Level Distribution",
    # Add annotations in the center of the donut pies.
    annotations=[dict(text='Mostly Deep<br />Customers', x=0.5, y=0.5, font_size=20, showarrow=False)],
    font=dict(size=20))
fig.show()

In [563]:
fig.write_html("Shopping_Level_Distribution.html")

In [425]:
age_table = df_clean.select(col("age_level").cast("int")).groupBy("age_level").count().toPandas()
age_table

                                                                                

Unnamed: 0,age_level,count
0,1,533971
1,6,181309
2,3,3724433
3,5,2083894
4,4,2937965
5,2,2539499
6,0,4092


In [564]:
import plotly.express as px
fig=px.bar(x=age_table["age_level"], y=age_table["count"],text_auto='.2s')
fig.update_layout(
    title="Age Level Distribution",
    xaxis_title="Age Level",
    yaxis_title="Frequency",
    font=dict(size=16))

fig.show()

In [565]:
fig.write_html("Age_Level_Distribution.html")

In [428]:
city_table = df_clean.select(col("city_level ").cast("int")).groupBy("city_level ").count().toPandas()
city_table

                                                                                

Unnamed: 0,city_level,count
0,1,1041473
1,3,2584769
2,4,2090168
3,2,4509452
4,0,1779301


In [566]:
fig=px.bar(x=city_table["city_level "], y=city_table["count"],text_auto='.2s')
fig.update_layout(
    title="City Level Distribution",
    xaxis_title="City Level",
    yaxis_title="Frequency",
    font=dict(size=16))

fig.show()

In [567]:
fig.write_html("City_Level_Distribution.html")

In [431]:
#--------------------Data Preprocessing--------------------------------------

In [432]:
#Random Oversampling for imbalanced data (CLK)
major_df = df_clean.filter(col("clk") == 0)
minor_df = df_clean.filter(col("clk") == 1)
ratio = int(major_df.count()/minor_df.count())
print("ratio: {}".format(ratio))



ratio: 18


                                                                                

In [433]:
# duplicate the minority rows
oversampled_df = minor_df.withColumn("dummy", explode(array([lit(x) for x in range(ratio)]))).drop('dummy')

# combine both oversampled minority rows and previous majority rows 
combined_df = major_df.unionAll(oversampled_df)
combined_df.groupBy("clk").count().show()



+---+--------+
|clk|   count|
+---+--------+
|  0|11391866|
|  1|11039346|
+---+--------+



                                                                                

In [434]:
combined_df= combined_df.fillna(0)

In [435]:
combined_df.groupBy("city_level ").count().show()



+-----------+-------+
|city_level |  count|
+-----------+-------+
|          1|1936744|
|          3|4797608|
|          4|3907400|
|          2|8435551|
|          0|3353909|
+-----------+-------+



                                                                                

In [436]:
combined_df.createOrReplaceTempView('ml_df')

In [437]:
# Split into train and test set
# Use the first 7 days as the Train dataset and the 8th day as the Test dataset.
train = spark.sql(
    '''
    SELECT *
    FROM ml_df
    WHERE FROM_UNIXTIME(time_stamp) <= "2017-05-12"
    ''')
train = train.drop(train.time_stamp)
print((train.count(), len(train.columns)))



(17895054, 10)


                                                                                

In [438]:
train.show(30)

[Stage 12658:>                                                      (0 + 1) / 1]

+----+-----------+---+------------+-----------------+---------+--------------+----------+-----------+-----+
|user|        pid|clk|cms_group_id|final_gender_code|age_level|shopping_level|occupation|city_level |price|
+----+-----------+---+------------+-----------------+---------+--------------+----------+-----------+-----+
|   7|430548_1007|  0|           8|                1|        2|             3|         0|          1|280.0|
|   7|430548_1007|  0|           8|                1|        2|             3|         0|          1|108.0|
|  30|430548_1007|  0|           4|                2|        4|             3|         0|          2| 48.0|
|   7|430548_1007|  0|           8|                1|        2|             3|         0|          1|198.0|
|   7|430548_1007|  0|           8|                1|        2|             3|         0|          1| 11.8|
|   7|430548_1007|  0|           8|                1|        2|             3|         0|          1|275.0|
|   7|430548_1007|  0|      

                                                                                

In [439]:
#Indexer+OneHotEncoder+Vector Ensemble
categoricalColumns = ['pid','cms_group_id','final_gender_code', 'age_level', 'shopping_level','occupation','city_level ']
stages = []

for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index').setHandleInvalid("keep")
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

label_stringIdx = StringIndexer(inputCol = 'clk', outputCol = 'label')
stages += [label_stringIdx]

numericCols = ['price']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [440]:
#Build Pipeline for Train set
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(train)
train_merged_processed = pipelineModel.transform(train)
cols = train.columns
selectedCols = ['label', 'features'] + cols
train_merged_processed = train_merged_processed.select(selectedCols)
train_merged_processed.printSchema()

                                                                                

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- user: integer (nullable = true)
 |-- pid: string (nullable = true)
 |-- clk: integer (nullable = true)
 |-- cms_group_id: integer (nullable = true)
 |-- final_gender_code: integer (nullable = true)
 |-- age_level: integer (nullable = true)
 |-- shopping_level: integer (nullable = true)
 |-- occupation: integer (nullable = true)
 |-- city_level : integer (nullable = true)
 |-- price: double (nullable = false)



In [441]:
#Build Test Dataset
test = spark.sql(
    '''
    SELECT *
    FROM ml_df
    WHERE FROM_UNIXTIME(time_stamp) > "2017-05-12"
    ''')
test = test.drop(test.time_stamp)
print((test.count(), len(test.columns)))



(4536158, 10)


                                                                                

In [442]:
#Build Pipeline for Test set
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(test)
test_merged_processed = pipelineModel.transform(test)
cols = test.columns
selectedCols = ['label', 'features'] + cols
test_merged_processed = test_merged_processed.select(selectedCols)
test_merged_processed.printSchema()



root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- user: integer (nullable = true)
 |-- pid: string (nullable = true)
 |-- clk: integer (nullable = true)
 |-- cms_group_id: integer (nullable = true)
 |-- final_gender_code: integer (nullable = true)
 |-- age_level: integer (nullable = true)
 |-- shopping_level: integer (nullable = true)
 |-- occupation: integer (nullable = true)
 |-- city_level : integer (nullable = true)
 |-- price: double (nullable = false)



                                                                                

In [443]:
#--------------------Machine Learning------------------------------------------

In [444]:
#------------------Logistic Regression----------------------------------------
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10)
lrModel = lr.fit(train_merged_processed)

22/06/10 18:38:46 WARN MemoryStore: Not enough space to cache rdd_18379_5 in memory! (computed 65.0 MiB so far)
22/06/10 18:38:46 WARN BlockManager: Persisting block rdd_18379_5 to disk instead.
22/06/10 18:39:01 WARN MemoryStore: Not enough space to cache rdd_18379_5 in memory! (computed 113.0 MiB so far)
22/06/10 18:39:06 WARN MemoryStore: Not enough space to cache rdd_18379_6 in memory! (computed 65.0 MiB so far)
22/06/10 18:39:06 WARN BlockManager: Persisting block rdd_18379_6 to disk instead.
22/06/10 18:39:20 WARN MemoryStore: Not enough space to cache rdd_18379_6 in memory! (computed 113.0 MiB so far)
22/06/10 18:39:27 WARN MemoryStore: Not enough space to cache rdd_18379_0 in memory! (computed 65.0 MiB so far)
22/06/10 18:39:28 WARN MemoryStore: Not enough space to cache rdd_18379_1 in memory! (computed 65.0 MiB so far)
22/06/10 18:39:29 WARN MemoryStore: Not enough space to cache rdd_18379_2 in memory! (computed 65.0 MiB so far)
22/06/10 18:39:30 WARN MemoryStore: Not enough s

22/06/10 18:40:23 WARN MemoryStore: Not enough space to cache rdd_18379_1 in memory! (computed 65.0 MiB so far)
22/06/10 18:40:23 WARN MemoryStore: Not enough space to cache rdd_18379_2 in memory! (computed 65.0 MiB so far)
22/06/10 18:40:24 WARN MemoryStore: Not enough space to cache rdd_18379_3 in memory! (computed 65.0 MiB so far)
22/06/10 18:40:25 WARN MemoryStore: Not enough space to cache rdd_18379_5 in memory! (computed 65.0 MiB so far)
22/06/10 18:40:25 WARN MemoryStore: Not enough space to cache rdd_18379_6 in memory! (computed 65.0 MiB so far)
[Stage 12898:>                                                      (0 + 1) / 1]

In [None]:
trainingSummary = lrModel.summary
roc = trainingSummary.roc.toPandas()
plt.plot(roc['FPR'],roc['TPR'])
plt.ylabel('False Positive Rate')
plt.xlabel('True Positive Rate')
plt.title('ROC Curve')
plt.show()
print('Training set areaUnderROC: ' + str(trainingSummary.areaUnderROC))

In [None]:
predictions = lrModel.transform(test_merged_processed)
evaluator = BinaryClassificationEvaluator()
print('Test Area Under ROC', evaluator.evaluate(predictions))

In [None]:
#Confusion Matrix
# Create a confusion matrix
predictions.groupBy('label', 'prediction').count().show()

# Calculate the elements of the confusion matrix
TN = predictions.filter('prediction = 0 AND label = prediction').count()
TP = predictions.filter('prediction = 1 AND label = prediction').count()
FN = predictions.filter('prediction = 0 AND label != prediction').count()
FP = predictions.filter('prediction = 1 AND label != prediction').count()

# Accuracy measures the proportion of correct predictions
accuracy = (TN + TP) / (TN + TP + FN + FP)
print(accuracy)

In [448]:
#-----------------------Decision Tree-----------------------------------
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 3,
                           minInstancesPerNode = 20, impurity = 'gini')
dtModel = dt.fit(train_merged_processed)
predictions = dtModel.transform(test_merged_processed)
predictions.groupBy("clk").count().show()

22/06/10 19:08:40 WARN MemoryStore: Not enough space to cache rdd_18951_0 in memory! (computed 328.4 MiB so far)
22/06/10 19:08:40 WARN BlockManager: Persisting block rdd_18951_0 to disk instead.
22/06/10 19:08:53 WARN MemoryStore: Not enough space to cache rdd_18951_0 in memory! (computed 362.4 MiB so far)
22/06/10 19:09:07 WARN MemoryStore: Not enough space to cache rdd_18951_1 in memory! (computed 328.4 MiB so far)
22/06/10 19:09:07 WARN BlockManager: Persisting block rdd_18951_1 to disk instead.
22/06/10 19:09:19 WARN MemoryStore: Not enough space to cache rdd_18951_1 in memory! (computed 362.4 MiB so far)
22/06/10 19:09:34 WARN MemoryStore: Not enough space to cache rdd_18951_2 in memory! (computed 328.4 MiB so far)
22/06/10 19:09:34 WARN BlockManager: Persisting block rdd_18951_2 to disk instead.
22/06/10 19:09:46 WARN MemoryStore: Not enough space to cache rdd_18951_2 in memory! (computed 362.4 MiB so far)
22/06/10 19:09:59 WARN MemoryStore: Not enough space to cache rdd_18951_4

+---+-------+
|clk|  count|
+---+-------+
|  0|2337818|
|  1|2198340|
+---+-------+



                                                                                

In [449]:
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))



Test Area Under ROC: 0.4847432824594339


                                                                                

In [None]:
#Confusion Matrix
# Create a confusion matrix
predictions.groupBy('label', 'prediction').count().show()

# Calculate the elements of the confusion matrix
TN = predictions.filter('prediction = 0 AND label = prediction').count()
TP = predictions.filter('prediction = 1 AND label = prediction').count()
FN = predictions.filter('prediction = 0 AND label != prediction').count()
FP = predictions.filter('prediction = 1 AND label != prediction').count()

# Accuracy measures the proportion of correct predictions
accuracy = (TN + TP) / (TN + TP + FN + FP)
print(accuracy)

In [452]:
#-----------------------------------SVM---------------------------------

In [453]:
#SVM
svm = LinearSVC(maxIter=5, regParam=0.01)
svmModel = svm.fit(train_merged_processed)


22/06/10 19:35:14 WARN MemoryStore: Not enough space to cache rdd_19426_5 in memory! (computed 65.0 MiB so far)
22/06/10 19:35:14 WARN BlockManager: Persisting block rdd_19426_5 to disk instead.
22/06/10 19:35:27 WARN MemoryStore: Not enough space to cache rdd_19426_5 in memory! (computed 113.0 MiB so far)
22/06/10 19:35:32 WARN MemoryStore: Not enough space to cache rdd_19426_6 in memory! (computed 65.0 MiB so far)
22/06/10 19:35:32 WARN BlockManager: Persisting block rdd_19426_6 to disk instead.
22/06/10 19:35:45 WARN MemoryStore: Not enough space to cache rdd_19426_6 in memory! (computed 113.0 MiB so far)
22/06/10 19:35:51 WARN MemoryStore: Not enough space to cache rdd_19426_0 in memory! (computed 65.0 MiB so far)
22/06/10 19:35:52 WARN MemoryStore: Not enough space to cache rdd_19426_1 in memory! (computed 65.0 MiB so far)
22/06/10 19:35:52 WARN MemoryStore: Not enough space to cache rdd_19426_2 in memory! (computed 65.0 MiB so far)
22/06/10 19:35:53 WARN MemoryStore: Not enough s

22/06/10 19:36:26 WARN MemoryStore: Not enough space to cache rdd_19426_1 in memory! (computed 65.0 MiB so far)
22/06/10 19:36:27 WARN MemoryStore: Not enough space to cache rdd_19426_2 in memory! (computed 65.0 MiB so far)
22/06/10 19:36:27 WARN MemoryStore: Not enough space to cache rdd_19426_3 in memory! (computed 65.0 MiB so far)
22/06/10 19:36:28 WARN MemoryStore: Not enough space to cache rdd_19426_5 in memory! (computed 65.0 MiB so far)
22/06/10 19:36:28 WARN MemoryStore: Not enough space to cache rdd_19426_6 in memory! (computed 65.0 MiB so far)
22/06/10 19:36:29 WARN MemoryStore: Not enough space to cache rdd_19426_0 in memory! (computed 65.0 MiB so far)
22/06/10 19:36:29 WARN MemoryStore: Not enough space to cache rdd_19426_1 in memory! (computed 65.0 MiB so far)
22/06/10 19:36:30 WARN MemoryStore: Not enough space to cache rdd_19426_2 in memory! (computed 65.0 MiB so far)
22/06/10 19:36:31 WARN MemoryStore: Not enough space to cache rdd_19426_3 in memory! (computed 65.0 MiB 

22/06/10 19:37:04 WARN MemoryStore: Not enough space to cache rdd_19426_2 in memory! (computed 65.0 MiB so far)
22/06/10 19:37:04 WARN MemoryStore: Not enough space to cache rdd_19426_3 in memory! (computed 65.0 MiB so far)
22/06/10 19:37:05 WARN MemoryStore: Not enough space to cache rdd_19426_5 in memory! (computed 65.0 MiB so far)
22/06/10 19:37:05 WARN MemoryStore: Not enough space to cache rdd_19426_6 in memory! (computed 65.0 MiB so far)
22/06/10 19:37:06 WARN MemoryStore: Not enough space to cache rdd_19426_0 in memory! (computed 65.0 MiB so far)
22/06/10 19:37:07 WARN MemoryStore: Not enough space to cache rdd_19426_1 in memory! (computed 65.0 MiB so far)
22/06/10 19:37:07 WARN MemoryStore: Not enough space to cache rdd_19426_2 in memory! (computed 65.0 MiB so far)
22/06/10 19:37:08 WARN MemoryStore: Not enough space to cache rdd_19426_3 in memory! (computed 65.0 MiB so far)
22/06/10 19:37:08 WARN MemoryStore: Not enough space to cache rdd_19426_5 in memory! (computed 65.0 MiB 

22/06/10 19:37:44 WARN MemoryStore: Not enough space to cache rdd_19426_3 in memory! (computed 65.0 MiB so far)
22/06/10 19:37:45 WARN MemoryStore: Not enough space to cache rdd_19426_5 in memory! (computed 65.0 MiB so far)
22/06/10 19:37:45 WARN MemoryStore: Not enough space to cache rdd_19426_6 in memory! (computed 65.0 MiB so far)
22/06/10 19:37:46 WARN MemoryStore: Not enough space to cache rdd_19426_0 in memory! (computed 65.0 MiB so far)
22/06/10 19:37:47 WARN MemoryStore: Not enough space to cache rdd_19426_1 in memory! (computed 65.0 MiB so far)
22/06/10 19:37:47 WARN MemoryStore: Not enough space to cache rdd_19426_2 in memory! (computed 65.0 MiB so far)
22/06/10 19:37:48 WARN MemoryStore: Not enough space to cache rdd_19426_3 in memory! (computed 65.0 MiB so far)
22/06/10 19:37:48 WARN MemoryStore: Not enough space to cache rdd_19426_5 in memory! (computed 65.0 MiB so far)
22/06/10 19:37:49 WARN MemoryStore: Not enough space to cache rdd_19426_6 in memory! (computed 65.0 MiB 

In [None]:
predictions = svmModel.transform(test_merged_processed)
evaluator = BinaryClassificationEvaluator()
evaluation = evaluator.evaluate(predictions)
print("evaluation (area under ROC): %f" % evaluation)

In [455]:
#----------------------Random Forest-----------------------------------

In [456]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
rfModel = rf.fit(train_merged_processed)

22/06/10 19:51:40 WARN MemoryStore: Not enough space to cache rdd_19841_0 in memory! (computed 360.0 MiB so far)
22/06/10 19:51:40 WARN BlockManager: Persisting block rdd_19841_0 to disk instead.
22/06/10 19:51:58 WARN MemoryStore: Not enough space to cache rdd_19841_0 in memory! (computed 360.0 MiB so far)
22/06/10 19:52:16 WARN MemoryStore: Not enough space to cache rdd_19841_1 in memory! (computed 360.0 MiB so far)
22/06/10 19:52:16 WARN BlockManager: Persisting block rdd_19841_1 to disk instead.
22/06/10 19:52:34 WARN MemoryStore: Not enough space to cache rdd_19841_1 in memory! (computed 360.0 MiB so far)
22/06/10 19:52:53 WARN MemoryStore: Not enough space to cache rdd_19841_2 in memory! (computed 360.0 MiB so far)
22/06/10 19:52:53 WARN BlockManager: Persisting block rdd_19841_2 to disk instead.
22/06/10 19:53:11 WARN MemoryStore: Not enough space to cache rdd_19841_2 in memory! (computed 360.0 MiB so far)
22/06/10 19:53:25 WARN MemoryStore: Not enough space to cache rdd_19841_3

In [457]:
predictions = rfModel.transform(test_merged_processed)


In [None]:
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

In [459]:
#-------------------------Factorization machines-------------------------------------

In [460]:
#fm = FMClassifier(featuresCol = 'features', labelCol = 'label', stepSize=0.001)
#fmModel = fm.fit(train_merged_processed)
#predictions = dtModel.transform(test_merged_processed)

In [461]:
#evaluation = evaluator.evaluate(predictions)
#print("evaluation (area under ROC): %f" % evaluation)

In [462]:
#Confusion Matrix
# Create a confusion matrix
predictions.groupBy('label', 'prediction').count().show()

# Calculate the elements of the confusion matrix
#TN = predictions.filter('prediction = 0 AND label = prediction').count()
#TP = predictions.filter('prediction = 1 AND label = prediction').count()
#FN = predictions.filter('prediction = 0 AND label != prediction').count()
#FP = predictions.filter('prediction = 1 AND label != prediction').count()

# Accuracy measures the proportion of correct predictions
#accuracy = (TN + TP) / (TN + TP + FN + FP)
#print(accuracy)

22/06/10 20:08:35 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.

+-----+----------+-------+
|label|prediction|  count|
+-----+----------+-------+
|  0.0|       1.0| 973623|
|  0.0|       0.0|1364195|
|  1.0|       1.0| 993438|
|  1.0|       0.0|1204902|
+-----+----------+-------+



                                                                                

In [463]:
#Feature Importance of Decision Tree
#dtModel.featureImportances

In [464]:
predictions.select('probability', 'prediction').show(5)


[Stage 13922:>                                                      (0 + 1) / 1]

+--------------------+----------+
|         probability|prediction|
+--------------------+----------+
|[0.48837534351880...|       1.0|
|[0.48837534351880...|       1.0|
|[0.53494553709699...|       0.0|
|[0.52645676390686...|       0.0|
|[0.52645676390686...|       0.0|
+--------------------+----------+
only showing top 5 rows



[Stage 13929:>                                                      (0 + 1) / 1]                                                                                

In [465]:
predictions.select('probability', 'shopping_level').show(5)


[Stage 13945:>                                                      (0 + 1) / 1]

+--------------------+--------------+
|         probability|shopping_level|
+--------------------+--------------+
|[0.48837534351880...|             3|
|[0.48837534351880...|             3|
|[0.53494553709699...|             3|
|[0.52645676390686...|             3|
|[0.52645676390686...|             3|
+--------------------+--------------+
only showing top 5 rows



                                                                                

In [512]:
pre_results = predictions.select('probability', 'pid','cms_group_id','final_gender_code', 'age_level', 'shopping_level','occupation','city_level ','price').limit(10000).toPandas()

                                                                                

In [469]:
pre_results.head()

Unnamed: 0,probability,pid,cms_group_id,final_gender_code,age_level,shopping_level,occupation,city_level,price
0,"[0.4883753435188057, 0.5116246564811943]",430539_1007,3,2,3,3,0,4,65.0
1,"[0.4883753435188057, 0.5116246564811943]",430539_1007,3,2,3,3,0,2,65.0
2,"[0.5349455370969991, 0.465054462903001]",430548_1007,8,1,2,3,0,4,75.0
3,"[0.526456763906867, 0.473543236093133]",430548_1007,11,1,5,3,0,2,176.0
4,"[0.526456763906867, 0.473543236093133]",430548_1007,11,1,5,3,0,2,118.0


In [489]:
pre_results.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 10 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   probability        10000 non-null  object 
 1   pid                10000 non-null  object 
 2   cms_group_id       10000 non-null  int32  
 3   final_gender_code  10000 non-null  int32  
 4   age_level          10000 non-null  int32  
 5   shopping_level     10000 non-null  int32  
 6   occupation         10000 non-null  int32  
 7   city_level         10000 non-null  int32  
 8   price              10000 non-null  float64
 9   clk_p              0 non-null      float64
dtypes: float64(2), int32(6), object(2)
memory usage: 547.0+ KB


In [None]:
#---------------------------Insights------------------------------

In [520]:
pre_results['clk_p']= pre_results['probability'].astype(str).str.split(',').str[1]

In [524]:
pre_results['clk_p']= pre_results['clk_p'].str.replace("]"," ").astype(float)


The default value of regex will change from True to False in a future version. In addition, single character regular expressions will *not* be treated as literal strings when regex=True.



In [None]:
df["column"] = df["column"].str.replace(",","").astype(float)

In [525]:
pre_results.head()

Unnamed: 0,probability,pid,cms_group_id,final_gender_code,age_level,shopping_level,occupation,city_level,price,clk_p
0,"[0.4883753435188057, 0.5116246564811943]",430539_1007,3,2,3,3,0,4,65.0,0.511625
1,"[0.4883753435188057, 0.5116246564811943]",430539_1007,3,2,3,3,0,2,65.0,0.511625
2,"[0.5349455370969991, 0.465054462903001]",430548_1007,8,1,2,3,0,4,75.0,0.465054
3,"[0.526456763906867, 0.473543236093133]",430548_1007,11,1,5,3,0,2,176.0,0.473543
4,"[0.526456763906867, 0.473543236093133]",430548_1007,11,1,5,3,0,2,118.0,0.473543


In [526]:
pre_results.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 10 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   probability        10000 non-null  object 
 1   pid                10000 non-null  object 
 2   cms_group_id       10000 non-null  int32  
 3   final_gender_code  10000 non-null  int32  
 4   age_level          10000 non-null  int32  
 5   shopping_level     10000 non-null  int32  
 6   occupation         10000 non-null  int32  
 7   city_level         10000 non-null  int32  
 8   price              10000 non-null  float64
 9   clk_p              10000 non-null  float64
dtypes: float64(2), int32(6), object(2)
memory usage: 547.0+ KB


In [552]:
fig = px.box(pre_results, x="city_level ", y="clk_p")

fig.update_layout(
    title="CTR Probability Distributed by City Level",
    xaxis_title="City Level",
    yaxis_title="CTR Probability",
    font=dict(size=16))


fig.show()

In [553]:
fig.write_html("City_Level_p.html")

In [554]:
fig = px.box(pre_results, x="final_gender_code", y="clk_p",color="occupation")
fig.update_layout(
    title="CTR Probability Distributed by Gender",
    xaxis_title="Gender",
    yaxis_title="CTR Probability",
    font=dict(size=16))
fig.show()

In [555]:
fig.write_html("Gender_p.html")

In [556]:
fig = px.box(pre_results, x="age_level", y="clk_p",color="shopping_level")
fig.update_layout(
    title="CTR Probability Distributed by Age",
    xaxis_title="Age",
    yaxis_title="CTR Probability",
    font=dict(size=16))
fig.show()

In [557]:
fig.write_html("Age_p.html")

In [558]:
fig = px.box(pre_results, x="cms_group_id", y="clk_p",color="pid")

fig.update_layout(
    title="CTR Probability Distributed by Online Community",
    xaxis_title="Online Community",
    yaxis_title="CTR Probability",
    font=dict(size=16))


fig.show()


In [559]:
fig.write_html("Community_p.html")