###  Function to mount to criteosmall public dataset bucket

In [2]:
def mount_s3_bucket(access_key, secret_key, bucket_name, mount_folder):
  ACCESS_KEY_ID = access_key
  SECRET_ACCESS_KEY = secret_key
  ENCODED_SECRET_KEY = SECRET_ACCESS_KEY.replace("/", "%2F")

  print ("Mounting", bucket_name)

  try:
    # Unmount the data in case it was already mounted.
    dbutils.fs.unmount("/mnt/%s" % mount_folder)
    
  except:
    # If it fails to unmount it most likely wasn't mounted in the first place
    print ("Directory not unmounted: ", mount_folder)
    
  finally:
    # Lastly, mount our bucket.
    dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY_ID, ENCODED_SECRET_KEY, bucket_name), "/mnt/%s" % mount_folder)
    #dbutils.fs.mount("s3a://"+ ACCESS_KEY_ID + ":" + ENCODED_SECRET_KEY + "@" + bucket_name, mount_folder)
    print ("The bucket", bucket_name, "was mounted to", mount_folder, "\n")

###  Set your access key and secret access key
> <br>

In [4]:
# Set AWS programmatic access credentials
ACCESS_KEY = ""
SECRET_ACCESS_KEY = ""

### Mount the datasets required for this course
> <br>
> * It may take 5-10 minutes. The mounting process is slow.

#### Mounting

In [7]:
from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .appName('WeCloud Spark Training') \
        .getOrCreate()
print('Session created')

In [8]:
sc = spark.sparkContext

In [9]:
criteoDF.rdd.glom().take(5)[:][0:1]

In [10]:
mount_s3_bucket(ACCESS_KEY, SECRET_ACCESS_KEY, "criteo-small", "criteo-small")

In [11]:
%fs
ls /mnt/criteo-small

path,name,size
dbfs:/mnt/criteo-small/train.tsv.gz,train.tsv.gz,4044433407


### Reading the Criteo Dataset (Tab Seperated File)

##### Creating a Schema

In [14]:
# creating my own shema
criteoFiles = "/mnt/criteo-small/train.tsv.gz"


from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType

criteoSchema = StructType([
    StructField("label", IntegerType(), True),
    StructField("i_1", IntegerType(), True),
    StructField("i_2", IntegerType(), True),
    StructField("i_3", IntegerType(), True),
    StructField("i_4", IntegerType(), True),
    StructField("i_5", IntegerType(), True),
    StructField("i_6", IntegerType(), True),
    StructField("i_7", IntegerType(), True),
    StructField("i_8", IntegerType(), True),
    StructField("i_9", IntegerType(), True),
    StructField("i_10", IntegerType(), True),
    StructField("i_11", IntegerType(), True),
    StructField("i_12", IntegerType(), True),
    StructField("i_13", IntegerType(), True),
    StructField("c_1", StringType(), True),
    StructField("c_2", StringType(), True),
    StructField("c_3", StringType(), True),
    StructField("c_4", StringType(), True),
    StructField("c_5", StringType(), True),
    StructField("c_6", StringType(), True),
    StructField("c_7", StringType(), True),
    StructField("c_8", StringType(), True), StructField("c_9", StringType(), True), StructField("c_10", StringType(), True), StructField("c_11", StringType(), True), StructField("c_12", StringType(), True), StructField("c_13", StringType(), True), StructField("c_14", StringType(), True), StructField("c_15", StringType(), True), StructField("c_16", StringType(), True), StructField("c_17", StringType(), True), StructField("c_18", StringType(), True), StructField("c_19", StringType(), True), StructField("c_20", StringType(), True), StructField("c_21", StringType(), True), StructField("c_22", StringType(), True), StructField("c_23", StringType(), True), StructField("c_24", StringType(), True), StructField("c_25", StringType(), True),StructField("c_26", StringType(), True)
]
)



##### Reading the Tab Seperated File

In [16]:
criteoSmallDF = (spark.read
    .option("header", "false")
    .option("delimiter", "\t")
    .schema(criteoSchema)
    .csv(criteoFiles)
)

In [17]:
criteoSmallDF.printSchema()

In [18]:
criteoSmallDF.count()

In [19]:
#count the number of clicked on adds and the number of non clicked on adds
#the data is highly unbalanced, the number of clicked on adds is only approximately 26% of the data while 74% of the data represents unclicked on adds

criteoSmallDF.groupBy('label').count().orderBy('label').show()

### Write the Criteo Dataset as a Parquet File

In [21]:
# save the file as parquet file

criteoOutParquet = "/tmp/criteo_small.parquet"

(criteoSmallDF.write                       # Our DataFrameWriter
  .option("delimiter", "\t")  
  .option("compression", "snappy")
  .mode("overwrite")                       # Replace existing files
  .parquet(criteoOutParquet)               # Write DataFrame to parquet files
)

### Read the Parquet file

In [23]:
# Read the parquet file
criteoOutParquet = "/tmp/criteo_small.parquet"

criteo_small_Parquet_DF = (spark.read              
  .option("delimiter", "\t")  
  .schema(criteoSchema)        # Use the specified schema
  .parquet(criteoOutParquet)   # Creates a DataFrame from Parquet after reading in the file
)

In [24]:
# Count the number of clicked on adds versus non clicked on adds
criteo_small_Parquet_DF.groupBy('label').count().orderBy('label').show()

### Count null values in each coulmn
> <br>

In [26]:
# Count the number of nulls in each coulmn regardless of the label and showing the percentage of nulls

from pyspark.sql.functions import count, col
columns= criteo_small_Parquet_DF.columns[1:]
critieo_null_percentage=dict()
for i in columns:
  critieo_null_percentage[i]=criteo_small_Parquet_DF.filter(col(i).isNull()).count() / criteo_small_Parquet_DF.count()
print(critieo_null_percentage)



### Display the null counts for each coulmn ordered descending
> <br>

In [28]:
# 15 attributes have no null values, 2 have more than 75% and 6 between 44% and 45% and 16 >0 and less than 44%
sort_critieo_null_percentage = sorted(critieo_null_percentage.items(), key=lambda x: x[1], reverse=True)
for item in sort_critieo_null_percentage:
  print(item)

### Filter the Coulmns where Null Percentage Is More Than 40%
> <br>

In [30]:
# Filter the coulmns that have null values more than 40% of their values
coulmns_to_drop=[] # coulmns to be dropped
coulmns_to_convert_to_Bool=[] # columns to convert to boolean value (True/False)
for col in critieo_null_percentage:
  if critieo_null_percentage[col] > 0.40:
    print (col, ":",critieo_null_percentage[col]) # print all the coulmn names that have more than 40% nulls
    if critieo_null_percentage[col] > 0.70:
      coulmns_to_drop.append(col)
    else:
      coulmns_to_convert_to_Bool.append(col)
    

### Dispaly the Coulmn Names That Will Be Dropped
> <br>

In [32]:
print(coulmns_to_drop)

### Dispaly the Coulmn names that will be Transformed to Boolean
> <br>

In [34]:
print(coulmns_to_convert_to_Bool)

### Balance The Data
> <br>

In [36]:
# Creating a balanced dataset
# filtering the rows into two sets clicked on adds and unclicked on adds
# from the parquet file
from pyspark.sql.functions import col
critieo_clicked=criteo_small_Parquet_DF.filter(col("label")==1)
critieo_unclicked=criteo_small_Parquet_DF.filter(col("label")==0)

      

In [37]:
# Select from the unclicked sample approximately 11745438 random records which are the number of 
# records avaialble in the data where label =1 (clicked on adds)
critieo_unclicked=critieo_unclicked.sample(False, 0.34448, 100)


In [38]:
critieo_unclicked.count()

In [39]:
critieo_clicked.count()

In [40]:
# Combine the two dataframes  critieo_clicked and critieo_unclicked and put them back in criteo_small_Parquet_DF
criteo_small_Parquet_DF=critieo_clicked.union(critieo_unclicked)

### Drop the coulmns where null count exceeds 75%
> <br>

In [42]:
# Before replacing the integer valued columns with the mean and the categorical values with the mode
# I would like to drop the coulmns that have more than 70% null values
# these coulmns are i_12,c_22
# coulmns_to_drop=["i_12","c_22"]
criteo_small_Parquet_DF=criteo_small_Parquet_DF.drop(*coulmns_to_drop)
criteo_small_Parquet_DF.columns

In [43]:
display(criteo_small_Parquet_DF)

label,i_1,i_2,i_3,i_4,i_5,i_6,i_7,i_8,i_9,i_10,i_11,i_13,c_1,c_2,c_3,c_4,c_5,c_6,c_7,c_8,c_9,c_10,c_11,c_12,c_13,c_14,c_15,c_16,c_17,c_18,c_19,c_20,c_21,c_23,c_24,c_25,c_26
1,1.0,4,2.0,0.0,0.0,0.0,1.0,0,0.0,1.0,1.0,0.0,68fd1e64,2c16a946,503b9dbc,e4dbea90,f3474129,13718bbd,38eb9cf4,1f89b562,a73ee510,547c0ffe,bc8c9f21,60ab2f07,46f42a63,07d13a8f,18231224,e6b6bdc7,e5ba7672,74ef3502,,,5316a17f,32c7478e,9117a34a,,
1,0.0,-1,,,1465.0,0.0,17.0,0,4.0,0.0,4.0,,241546e0,38a947a1,fa673455,6a14f9b9,25c83c98,fe6b92e5,1c86e0eb,1f89b562,a73ee510,e7ba2569,755e4a50,208d9687,5978055e,07d13a8f,5182f694,f8b34416,e5ba7672,e5f8f18f,,,f3ddd519,32c7478e,b34f3128,,
1,,2,11.0,5.0,10262.0,34.0,2.0,4,5.0,,1.0,5.0,be589b51,287130e0,cd7a7a22,fb7334df,25c83c98,,6cdb3998,361384ce,a73ee510,3ff10fb2,5874c9c9,976cbd4c,740c210d,1adce6ef,310d155b,07eb8110,07c540c4,891589e7,18259a83,a458ea53,a0ab60ca,32c7478e,a052b1ed,9b3e8820,8967c0d2
1,1.0,987,,2.0,105.0,2.0,1.0,2,2.0,1.0,1.0,2.0,68fd1e64,38d50e09,da603082,431a5096,43b19349,7e0ccccf,3f35b640,0b153874,a73ee510,3b08e48b,3d5fb018,6aaab577,94172618,07d13a8f,ee569ce2,2f03ef40,d4bb7bd8,582152eb,21ddcdc9,b1252a9d,3b203ca1,32c7478e,b21dc903,001f3601,aa5f0a15
1,,47,,0.0,6399.0,38.0,19.0,10,143.0,,10.0,6.0,1464facd,38a947a1,223b0e16,ca55061c,25c83c98,7e0ccccf,6933dec1,5b392875,a73ee510,3b08e48b,860c302b,156f99ef,30735474,1adce6ef,0e78291e,5fbf4a84,e5ba7672,1999bae9,,,deb9605d,32c7478e,e448275f,,
1,0.0,1,20.0,16.0,1548.0,93.0,42.0,32,912.0,0.0,15.0,16.0,8cf07265,942f9a8d,a8e40bcf,0365276a,25c83c98,7e0ccccf,3f4ec687,1f89b562,a73ee510,726f00fd,c4adf918,27c604a6,85dbe138,07d13a8f,a8e962af,c449f783,27c07bd6,1f868fdd,21ddcdc9,a458ea53,7eee76d1,32c7478e,9af06ad9,9d93af03,cdfe5ab7
1,0.0,20,2.0,2.0,7188.0,170.0,2.0,3,24.0,0.0,2.0,2.0,68fd1e64,38a947a1,ee6e4611,30d9fc77,4cf72387,7e0ccccf,bf9d4f90,0b153874,a73ee510,b7c4dad5,81cae03e,5332e3fb,d413ef3e,07d13a8f,a6d97bf2,ec676ace,3486227d,02e8d897,,,b055c31b,3a171ecb,ae2cd100,,
1,0.0,78,2.0,15.0,4311.0,85.0,4.0,18,230.0,0.0,3.0,15.0,68fd1e64,1287a654,5ed035c9,5b5365b2,4cf72387,6f6d9be8,1b1aa9ea,0b153874,a73ee510,c3e69838,7a3651f5,df8b1dea,95bc260c,b28479f6,ced5be3a,4cc0abe4,e5ba7672,df00d249,,,f520f961,32c7478e,27b60b01,,
1,3.0,0,4.0,13.0,224.0,28.0,3.0,35,27.0,1.0,1.0,13.0,05db9164,90081f33,993f507e,14a74146,25c83c98,13718bbd,dc7659bd,0b153874,a73ee510,03e48276,e51ddf94,18fe7085,3516f6e6,64c94865,98995c3b,8c48eb08,e5ba7672,7181ccc8,,,2ed6b316,3a171ecb,abf08f1b,,
1,,277,,3.0,7318.0,24.0,6.0,3,98.0,,1.0,3.0,8cf07265,9adf4cf9,2e76fb61,0b1ad9da,4cf72387,fe6b92e5,75dcaaca,0b153874,a73ee510,3b08e48b,8aabdae8,9886a0a7,edcf17ce,07d13a8f,2aaebd23,338c0d09,e5ba7672,c7dbecd5,,,60d2d691,3a171ecb,90b6276f,,


### Initialize the stages list for the pipeline
> <br>

In [45]:
stages = [] # stages in our Pipeline or transformations in dataset

### Transform the coulmns that have between 40% - 70% null values into boolean features
> <br>

In [47]:
from pyspark.ml.pipeline import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.sql.functions import col, expr, when
from pyspark.sql import functions as F
from pyspark import keyword_only


### Custom Transformer for Converting some Coulmns that have more than 70% nulls into a Boolean Data Type


> <br>

In [49]:
# Custom Transformer for converting some coulmns into boolean
class TransformIntoBoolean(Transformer, HasInputCol, HasOutputCol):
  
  @keyword_only
  def __init__(self, inputCol=None, outputCol=None):
    super(TransformIntoBoolean, self).__init__()
    kwargs = self._input_kwargs
    self.setParams(**kwargs)
     
  @keyword_only
  def setParams(self, inputCol=None, outputCol=None):
      """Get params."""
      kwargs = self._input_kwargs
      return self._set(**kwargs)  
    
  def _transform(self, dataframe):
    out_col = self.getOutputCol()
    in_col = dataframe[self.getInputCol()]
    return (dataframe
              .withColumn(out_col, (F.when(in_col.isNull() , False)
                                .otherwise(True))))


In [50]:
# Adding Transformers for Converting Columns into Boolean to stages
for cl in coulmns_to_convert_to_Bool:
  transformToBoolean = TransformIntoBoolean(inputCol=cl, outputCol=cl)
  stages += [transformToBoolean]

In [51]:
# Testing the TransformIntoBoolean custom transformer on column "i_1"
from pyspark.ml import Pipeline
transformToBooleanTest = TransformIntoBoolean(inputCol="i_1", outputCol="i_1_bool")
bool_pipeline = Pipeline(stages=[transformToBooleanTest])
bool_pipeline_model = bool_pipeline.fit(criteo_small_Parquet_DF)
data = bool_pipeline_model.transform(criteo_small_Parquet_DF)
display(data.select(["i_1","i_1_bool"]))

i_1,i_1_bool
1.0,True
0.0,True
,False
1.0,True
,False
0.0,True
0.0,True
0.0,True
3.0,True
,False


### Replace null values in the integer coulmns with the mean based on the label.
### Replace null values in the categorical coulmns with the mode based on the label.


> <br>

In [53]:
# Filtering the rows into two sets clicked on adds and unclicked on adds
# from the parquet file
from pyspark.sql.functions import col
critieo_clicked=criteo_small_Parquet_DF.filter(col("label")==1)
critieo_unclicked=criteo_small_Parquet_DF.filter(col("label")==0)

In [54]:
# Finding the mean for each coulmn twice once if the label is 1 and once when the label is 0
# Using the parquet file
# Adding the name of the coulmn as a key and the mean as the value in a dictionary mean_mode_clicked_d, mean_mode_unclicked_d
from pyspark.sql.functions import col, avg,mean
ci=["i_2","i_3","i_4","i_5","i_6","i_7","i_8","i_9","i_11","i_13"]
mean_mode_clicked_d=dict()
mean_mode_unclicked_d=dict()
for i in ci:
  mean_mode_clicked_d[i]=critieo_clicked.where(col(i).isNotNull()).select(mean(i)).first()[0] 
  mean_mode_unclicked_d[i]=critieo_unclicked.where(col(i).isNotNull()).select(mean(i)).first()[0] 

print(mean_mode_clicked_d)
print(mean_mode_unclicked_d)



In [55]:

# Finding the mode for each coulmn twice once if the label is 1 and once when the label is 0
# using the parquet file
# adding the name of the coulmn as a key and the mean as the value in a dictionary mean_mode_clicked_d, mean_mode_unclicked_d
from pyspark.sql.functions import col
cc=["c_1","c_2","c_3","c_4","c_5","c_6","c_7","c_8","c_9","c_10","c_11","c_12","c_13","c_14","c_15","c_16","c_17","c_18","c_21","c_23","c_24"] #categorical coulmns

for i in cc:
  mean_mode_clicked_d[i]=critieo_clicked.filter(col(i) !="Null").groupby(i).count().orderBy("count", ascending=False).first()[0]
  mean_mode_unclicked_d[i]=critieo_unclicked.filter(col(i) !="Null").groupby(i).count().orderBy("count", ascending=False).first()[0]
  
print(mean_mode_clicked_d)
print(mean_mode_unclicked_d)

### Custom Transformer for Replacing Null Values with Mean or Mode based on the Class Label 


> <br>

In [57]:
# Custom Transfromer for replacing null values with mean r mode based on the class label 

class ReplaceNullWithValue(Transformer, HasInputCol, HasOutputCol):
  
  @keyword_only
  def __init__(self, inputCol=None, outputCol=None,label = None , value1 = None, value2 = None):
      super(ReplaceNullWithValue, self).__init__()
      self.label = Param(self, "label", "Label coulmn of the data")
      self._setDefault(label="label")
      self.value1 = Param(self, "value1", "mean1 or mode1 for the coulmn where label =1")
      self.value2 = Param(self, "value2", "mean2 or mode2 for the coulmn where label =0")
      kwargs = self._input_kwargs
      self.setParams(**kwargs)
    
  @keyword_only
  def setParams(self, inputCol=None, outputCol=None,label = None , value1 = None, value2 = None):
      """Get params."""
      kwargs = self._input_kwargs
      print(kwargs)
      return self._set(**kwargs)  
  
  def setLabel(self, value):
      return self._set(label=value)

  def getLabel(self):
      return self.getOrDefault(self.label)

  def setValue1(self, value):
      return self._set(value1=value)

  def getValue1(self):
      return self.getOrDefault(self.value1)    

  def setValue2(self, value):
      return self._set(value2=value)

  def getValue2(self):
      return self.getOrDefault(self.value2)    
      
            
  def _transform(self, dataframe):
      out_col = self.getOutputCol()
      in_col = dataframe[self.getInputCol()]
      label_col = dataframe[self.getLabel()]
      return (dataframe.withColumn(out_col, 
                                   (F.when( ((in_col.isNull()) & (label_col == 1)) , self.getValue1()).
                                      when( ((in_col.isNull()) & (label_col == 0)) , self.getValue2()).otherwise(in_col))))

In [58]:
print(criteo_small_Parquet_DF.columns)

In [59]:
# Adding Transformers for replacing null values from categorical coulmns to mode

cc=["c_1","c_2","c_3","c_4","c_5","c_6","c_7","c_8","c_9","c_10","c_11","c_12","c_13","c_14","c_15","c_16","c_17","c_18","c_21","c_23","c_24"] #categorical coulmns
for cl in cc:
  replaceNullWithMode = ReplaceNullWithValue(inputCol=cl, outputCol=cl, label = "label", value1 = mean_mode_clicked_d[cl] ,value2 = mean_mode_unclicked_d[cl] )
  stages += [replaceNullWithMode]

In [60]:
# Adding Transformers for replacing null values from integer coulmns to mean

ci=["i_2","i_3","i_4","i_5","i_6","i_7","i_8","i_9","i_11","i_13"]
for cl in ci:
  replaceNullWithmean = ReplaceNullWithValue(inputCol=cl, outputCol=cl, label = "label", value1 = mean_mode_clicked_d[cl] ,value2 = mean_mode_unclicked_d[cl] )
  stages += [replaceNullWithmean]

In [61]:
# Testing the ReplaceNullWithMean custom transformer on column "i_2"
replaceNullWithMeanTest = ReplaceNullWithValue(inputCol="i_2", outputCol="i_2", label = "label", value1 = mean_mode_clicked_d["i_2"] ,value2 = mean_mode_unclicked_d["i_2"] )
mean_pipeline = Pipeline(stages=[replaceNullWithMeanTest])
mean_pipeline_model = mean_pipeline.fit(criteo_small_Parquet_DF)
data = mean_pipeline_model.transform(criteo_small_Parquet_DF)
display(data)

label,i_1,i_2,i_3,i_4,i_5,i_6,i_7,i_8,i_9,i_10,i_11,i_13,c_1,c_2,c_3,c_4,c_5,c_6,c_7,c_8,c_9,c_10,c_11,c_12,c_13,c_14,c_15,c_16,c_17,c_18,c_19,c_20,c_21,c_23,c_24,c_25,c_26
1,1.0,4.0,2.0,0.0,0.0,0.0,1.0,0,0.0,1.0,1.0,0.0,68fd1e64,2c16a946,503b9dbc,e4dbea90,f3474129,13718bbd,38eb9cf4,1f89b562,a73ee510,547c0ffe,bc8c9f21,60ab2f07,46f42a63,07d13a8f,18231224,e6b6bdc7,e5ba7672,74ef3502,,,5316a17f,32c7478e,9117a34a,,
1,0.0,-1.0,,,1465.0,0.0,17.0,0,4.0,0.0,4.0,,241546e0,38a947a1,fa673455,6a14f9b9,25c83c98,fe6b92e5,1c86e0eb,1f89b562,a73ee510,e7ba2569,755e4a50,208d9687,5978055e,07d13a8f,5182f694,f8b34416,e5ba7672,e5f8f18f,,,f3ddd519,32c7478e,b34f3128,,
1,,2.0,11.0,5.0,10262.0,34.0,2.0,4,5.0,,1.0,5.0,be589b51,287130e0,cd7a7a22,fb7334df,25c83c98,,6cdb3998,361384ce,a73ee510,3ff10fb2,5874c9c9,976cbd4c,740c210d,1adce6ef,310d155b,07eb8110,07c540c4,891589e7,18259a83,a458ea53,a0ab60ca,32c7478e,a052b1ed,9b3e8820,8967c0d2
1,1.0,987.0,,2.0,105.0,2.0,1.0,2,2.0,1.0,1.0,2.0,68fd1e64,38d50e09,da603082,431a5096,43b19349,7e0ccccf,3f35b640,0b153874,a73ee510,3b08e48b,3d5fb018,6aaab577,94172618,07d13a8f,ee569ce2,2f03ef40,d4bb7bd8,582152eb,21ddcdc9,b1252a9d,3b203ca1,32c7478e,b21dc903,001f3601,aa5f0a15
1,,47.0,,0.0,6399.0,38.0,19.0,10,143.0,,10.0,6.0,1464facd,38a947a1,223b0e16,ca55061c,25c83c98,7e0ccccf,6933dec1,5b392875,a73ee510,3b08e48b,860c302b,156f99ef,30735474,1adce6ef,0e78291e,5fbf4a84,e5ba7672,1999bae9,,,deb9605d,32c7478e,e448275f,,
1,0.0,1.0,20.0,16.0,1548.0,93.0,42.0,32,912.0,0.0,15.0,16.0,8cf07265,942f9a8d,a8e40bcf,0365276a,25c83c98,7e0ccccf,3f4ec687,1f89b562,a73ee510,726f00fd,c4adf918,27c604a6,85dbe138,07d13a8f,a8e962af,c449f783,27c07bd6,1f868fdd,21ddcdc9,a458ea53,7eee76d1,32c7478e,9af06ad9,9d93af03,cdfe5ab7
1,0.0,20.0,2.0,2.0,7188.0,170.0,2.0,3,24.0,0.0,2.0,2.0,68fd1e64,38a947a1,ee6e4611,30d9fc77,4cf72387,7e0ccccf,bf9d4f90,0b153874,a73ee510,b7c4dad5,81cae03e,5332e3fb,d413ef3e,07d13a8f,a6d97bf2,ec676ace,3486227d,02e8d897,,,b055c31b,3a171ecb,ae2cd100,,
1,0.0,78.0,2.0,15.0,4311.0,85.0,4.0,18,230.0,0.0,3.0,15.0,68fd1e64,1287a654,5ed035c9,5b5365b2,4cf72387,6f6d9be8,1b1aa9ea,0b153874,a73ee510,c3e69838,7a3651f5,df8b1dea,95bc260c,b28479f6,ced5be3a,4cc0abe4,e5ba7672,df00d249,,,f520f961,32c7478e,27b60b01,,
1,3.0,0.0,4.0,13.0,224.0,28.0,3.0,35,27.0,1.0,1.0,13.0,05db9164,90081f33,993f507e,14a74146,25c83c98,13718bbd,dc7659bd,0b153874,a73ee510,03e48276,e51ddf94,18fe7085,3516f6e6,64c94865,98995c3b,8c48eb08,e5ba7672,7181ccc8,,,2ed6b316,3a171ecb,abf08f1b,,
1,,277.0,,3.0,7318.0,24.0,6.0,3,98.0,,1.0,3.0,8cf07265,9adf4cf9,2e76fb61,0b1ad9da,4cf72387,fe6b92e5,75dcaaca,0b153874,a73ee510,3b08e48b,8aabdae8,9886a0a7,edcf17ce,07d13a8f,2aaebd23,338c0d09,e5ba7672,c7dbecd5,,,60d2d691,3a171ecb,90b6276f,,


In [62]:
# Testing the ReplaceNullWithMode custom transformer on column "c_1"
replaceNullWithModeTest = ReplaceNullWithValue(inputCol="c_1", outputCol="c_1", label = "label", value1 = mean_mode_clicked_d["c_1"] ,value2 = mean_mode_unclicked_d["c_1"] )
mode_pipeline = Pipeline(stages=[replaceNullWithModeTest])
mode_pipeline_model = mode_pipeline.fit(criteo_small_Parquet_DF)
data = mode_pipeline_model.transform(criteo_small_Parquet_DF)
display(data.select(["c_1"]))

c_1
68fd1e64
241546e0
be589b51
68fd1e64
1464facd
8cf07265
68fd1e64
68fd1e64
05db9164
8cf07265


### Replace outliers with the mean.


> <br>

In [64]:
# Find outliers in the data
# calculate the IQR and the upper and lower limit for each column
bounds = {
    c: dict(
        zip(["q1", "q3"], criteo_small_Parquet_DF.approxQuantile(c, [0.25, 0.75], 0))
    )
    for c in ["i_2","i_3","i_4","i_5","i_6","i_7","i_8","i_9","i_11","i_13"]
}

for c in bounds:
    iqr = bounds[c]['q3'] - bounds[c]['q1']
    bounds[c]['lower'] = bounds[c]['q1'] - (iqr * 1.5)
    bounds[c]['upper'] = bounds[c]['q3'] + (iqr * 1.5)
print(bounds)

In [65]:
# Finding the mean for the whole coulmn regardless of the label
from pyspark.sql.functions import col, avg, mean
ci=["i_2","i_3","i_4","i_5","i_6","i_7","i_8","i_9","i_11","i_13"]
mean_data=dict()
for i in ci:
  mean_data[i]=round(criteo_small_Parquet_DF.where(col(i).isNotNull()).select(mean(i)).first()[0] )

print(mean_data)

### Custom Transformer for Replacing Outliers with the Mean.


> <br>

In [67]:
# Adding a Transformer for replacing outliers with the mean
class ReplaceOutliersWithMean(Transformer, HasInputCol, HasOutputCol):
  
  @keyword_only
  def __init__(self, inputCol=None, outputCol=None, mean =None, upper = None, lower =None):
      super(ReplaceOutliersWithMean, self).__init__()
      self.mean = Param(self, "mean", "mean for the coulmn")
      self.upper = Param(self, "upper", "upper bound")
      self.lower = Param(self, "lower", "lower bound")
      kwargs = self._input_kwargs
      self.setParams(**kwargs)


  @keyword_only
  def setParams(self, inputCol=None, outputCol=None, mean =None, upper = None, lower =None):
      """Get params."""
      kwargs = self._input_kwargs
      return self._set(**kwargs)  
 
  def setMean(self, value):
      return self._set(mean=value)

  def getMean(self):
      return self.getOrDefault(self.mean)  
    
  def setUpper(self, value):
      return self._set(upper=value)

  def getUpper(self):
      return self.getOrDefault(self.upper)
  
  def setLower(self, value):
      return self._set(lower=value)

  def getLower(self):
      return self.getOrDefault(self.lower)  
    
  def _transform(self, dataframe):
      """Do transformation."""
      out_col = self.getOutputCol()
      in_col = dataframe[self.getInputCol()]
      return (dataframe
                .withColumn(out_col, (F.when( (in_col > self.getUpper())  | (in_col < self.getLower()), self.getMean())
                                      .otherwise(in_col))))

In [68]:
# Testing the ReplaceOutliersWithMean custom transformer on column "i_3"

replaceOutlierWithMeanTest = ReplaceOutliersWithMean(inputCol="i_3", outputCol= "i_3_capped", mean= mean_data["i_3"] , upper = bounds["i_3"]["upper"] , lower = bounds["i_3"]["lower"])
outlier_pipeline = Pipeline(stages=[replaceOutlierWithMeanTest])
outlier_pipeline_model = outlier_pipeline.fit(criteo_small_Parquet_DF)
data = outlier_pipeline_model.transform(criteo_small_Parquet_DF)
display(data.select(["i_3","i_3_capped"]))

i_3,i_3_capped
5.0,5.0
44.0,27.0
1.0,1.0
,
,
,
2.0,2.0
2.0,2.0
4.0,4.0
,


In [69]:
# Replacing the outliers with the mean 
from pyspark.sql.functions import col, expr, when
from pyspark.sql import functions as F
for cl in ["i_2","i_3","i_4","i_5","i_6","i_7","i_8","i_9","i_11","i_13"]:
  replaceOutlierWithMean = ReplaceOutliersWithMean(inputCol=cl, outputCol=cl, mean=mean_data[cl] , upper =bounds[cl]['upper'] , lower = bounds[cl]['lower'])
  stages += [replaceOutlierWithMean]


### Counting Distinct Values in Categorical Columns


> <br>

In [71]:
# Count the distinct values for the categorical coulmns c_1 to c_26 excluding the c22 because we dropped it
columns=["c_1","c_2","c_3","c_4","c_5","c_6","c_7","c_8","c_9","c_10","c_11","c_12","c_13","c_14","c_15","c_16","c_17","c_18","c_21","c_23","c_24"]
critieo_cat_distinct=dict()
for col in columns:
  critieo_cat_distinct[col]=criteo_small_Parquet_DF.select(col).distinct().count()
  
print(critieo_cat_distinct)  

In [72]:
# Sort the dictionary
# Count the distinct values for the categorical coulmns c_1 to c_26 excluding the c22
sort_critieo_cat_distinct = sorted(critieo_cat_distinct.items(), key=lambda x: x[1], reverse=True)
for item in sort_critieo_cat_distinct:
  print(item)

### Transform the categorical values (strings) into indexes


> <br>

In [74]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

In [75]:
# Used stringIndexer() on categorical columns (high cardinality)
cat_Col_H = ["c_1","c_2","c_3","c_4","c_5","c_7","c_8","c_10","c_11","c_12","c_13","c_15","c_16","c_18","c_21","c_24"]
indexers = [StringIndexer(inputCol=column, outputCol=column+"_indexH").fit(criteo_small_Parquet_DF) for column in cat_Col_H ]


stages += indexers



In [76]:
# Used stringIndexer() on categorical columns (low cardinality)
cat_Col_L = ["c_6","c_9","c_14","c_17","c_23"]
indexers = [StringIndexer(inputCol=column, outputCol=column+"_indexL").fit(criteo_small_Parquet_DF) for column in cat_Col_L ]
stages += indexers


In [77]:
print(criteo_small_Parquet_DF.columns)

In [78]:
criteo_small_Parquet_DF.rdd.take(1)

In [79]:
# checking the mleap version
import mleap
print (mleap.__version__)

### Z Scoring The Values of the Integer Columns


> <br>

### Custom Transformer for Z Scoring The Values of the Integer Columns


> <br>

In [82]:
from pyspark.sql.functions import col, stddev_samp,stddev,mean

class ZScoring(Transformer, HasInputCol, HasOutputCol):
  
  @keyword_only
  def __init__(self, inputCol=None, outputCol=None):
      super(ZScoring, self).__init__()
      kwargs = self._input_kwargs
      self.setParams(**kwargs)
   
  @keyword_only
  def setParams(self, inputCol=None, outputCol=None):
      """Get params."""
      kwargs = self._input_kwargs
      return self._set(**kwargs)  
    
  
  def _transform(self, dataframe):
      """Do transformation."""
      out_col = self.getOutputCol()
      in_col = dataframe[self.getInputCol()]
      col_mean = dataframe.select(mean(in_col)).first()[0]
      stdv= dataframe.agg(stddev(in_col)).first()[0]
      return (dataframe.withColumn(out_col, (in_col - col_mean) / stdv))

In [83]:
# Testing the zscoring custom transformer on column "i_2"
ZScoringTest = ZScoring(inputCol="i_2", outputCol="i_2_scaled" )
ZScoringTest_pipeline = Pipeline(stages=[ZScoringTest])
ZScoringTest_pipeline_model = ZScoringTest_pipeline.fit(criteo_small_Parquet_DF)
data = ZScoringTest_pipeline_model.transform(criteo_small_Parquet_DF)
display(data.select(["i_2","i_2_scaled"]))

i_2,i_2_scaled
1,-0.2678409108750413
0,-0.2703954645000928
0,-0.2703954645000928
893,2.010820922670842
-1,-0.2729500181251442
-1,-0.2729500181251442
1,-0.2678409108750413
4,-0.260177249999887
44,-0.1579951049978294
35,-0.1809860876232924


In [84]:
# z-scoring (scalling) the integer coulmns "i_2","i_3","i_4","i_5","i_6","i_7","i_8","i_9","i_11","i_13"
from pyspark.sql.functions import col, stddev_samp,stddev
col_to_scale=["i_2","i_3","i_4","i_5","i_6","i_7","i_8","i_9","i_11","i_13"]
for cl in col_to_scale:
  zSoring = ZScoring(inputCol=cl, outputCol=cl+"_scaled")
  stages += [zSoring]
  

In [85]:
print(criteo_small_Parquet_DF.columns)

In [86]:
criteo_small_Parquet_DF.rdd.take(1)

### Binning The Values of the Integer Columns


> <br>

In [88]:
# Binning( bucketizing the values)
from pyspark.ml.feature import Bucketizer
col_to_bin=["i_2","i_3","i_4","i_5","i_6","i_7","i_8","i_9","i_11","i_13"]
splits = [-float('inf'), -.5, 0.0, .5, float('inf')]
for cl in col_to_bin:
  bucketizer=Bucketizer(splits=splits,inputCol=cl+"_scaled",outputCol=cl+"_bucket")
  stages += [bucketizer]
  

In [89]:
criteo_small_Parquet_DF.rdd.take(1)

### One Hot Code Encoding The Low Dimensional Categorical Columns (low cardinality)


> <br>

In [91]:
# One hot encoding only the low dimentional categorical values
from pyspark.ml.feature import OneHotEncoder

for cl in ["c_6_indexL","c_9_indexL","c_14_indexL","c_17_indexL","c_23_indexL"]:
  
  OHE = (OneHotEncoder()
                .setInputCol(cl)
                .setOutputCol(cl[:cl.index("_index")]+"_OHE"))

  stages += [OHE]


In [92]:
from pyspark.sql.functions import col
criteo_small_Parquet_DF.filter(col('label')==1).count()

### Select The Relevant Columns To Use for Assembling Into a Vector


> <br>

In [94]:
print(criteo_small_Parquet_DF.columns)

In [95]:
# Select The Relevant Columns To Use for Assembling Into a Vector
catgoricalLowDimention = ["c_6_OHE","c_9_OHE","c_14_OHE","c_17_OHE","c_23_OHE"]
catgoricalHighDimention = ["c_1_indexH","c_2_indexH","c_3_indexH","c_4_indexH","c_5_indexH","c_7_indexH","c_8_indexH","c_10_indexH","c_11_indexH","c_12_indexH","c_13_indexH","c_15_indexH","c_16_indexH","c_18_indexH","c_21_indexH","c_24_indexH"]
booleanCoulmns = ['i_1', 'i_10', 'c_19', 'c_20', 'c_25', 'c_26']
bucketizedCoulmns = ["i_2_bucket","i_3_bucket","i_4_bucket","i_5_bucket","i_6_bucket","i_7_bucket","i_8_bucket","i_9_bucket","i_11_bucket","i_13_bucket"]

relevantCols = bucketizedCoulmns + booleanCoulmns + catgoricalHighDimention + catgoricalLowDimention

### Vector assembler the Relevant Columns into Features column


> <br>

In [97]:
# Let us import the vector assembler
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=relevantCols,outputCol="features")
stages += [assembler]


### Running The Vector Assembler Pipeline


> <br>

In [99]:
# Create a Pipeline and fit & transform the pipline to the dataset.
from pyspark.ml import Pipeline


pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(criteo_small_Parquet_DF)
criteo_small_Parquet_DF = pipelineModel.transform(criteo_small_Parquet_DF)


In [100]:
print(criteo_small_Parquet_DF.columns)

In [101]:
criteo_small_Parquet_DF.rdd.take(1)

In [102]:
dataset = criteo_small_Parquet_DF.select(["label","features"])
dataset.rdd.take(1)

### Split The Data into Training and Testing Set


> <br>

In [104]:
# Splitting the data into training and testing sets
criteoTest, criteoTrain = criteo_small_Parquet_DF.randomSplit([.25, .75], seed=0)

# Cache as we'll be using these several times
criteoTest.cache()
criteoTrain.cache()

print('Items in test datset: {0}'.format(criteoTest.count()))
print('Items in train dataset: {0}'.format(criteoTrain.count()))

In [105]:
# Cast the features coulmn data type to String type to be easily handeled in python (producer/consumer)
from pyspark.sql.functions import col

criteoTest=criteoTest.withColumn("features1",col("features").cast(StringType()))



In [106]:
print(criteoTest.filter(col('label')==1).count())
print(criteoTest.filter(col('label')==0).count())
print(criteoTrain.filter(col('label')==1).count())
print(criteoTrain.filter(col('label')==0).count())


### Save The Training and Testing Set


> <br>

In [108]:
# Save the train data
criteoTrainParquet = "/tmp/criteosmall_train.parquet"

(criteoTrain.write                       # Our DataFrameWriter
  .option("delimiter", "\t")  
  .option("compression", "snappy")
  .mode("overwrite")                       # Replace existing files
  .parquet(criteoTrainParquet)               # Write DataFrame to parquet files
)



criteoTestParquet = "/FileStore/test/criteosmall_test.parquet"

(criteoTest.coalesce(1).write                       # Our DataFrameWriter
  .option("delimiter", "\t")  
  .option("compression", "gzip")
  .mode("overwrite")                       # Replace existing files
  .parquet(criteoTestParquet)               # Write DataFrame to parquet files
)

In [109]:
# Save the test data
criteoTestParquet = "/FileStore/test/criteosmall_test_uncompressed.parquet"

(criteoTest.coalesce(1).write                       # Our DataFrameWriter
  .option("delimiter", "\t")  
  .option("compression", "uncompressed")
  .mode("overwrite")                       # Replace existing files
  .parquet(criteoTestParquet)               # Write DataFrame to parquet files
)

In [110]:
newTestData=criteoTest["label","features1"]
display(newTestData)

label,features1
0,"(110,[12,15,16,17,18,19,20,21,22,23,24,27,29,30,31,32,33,34,35,36,37,59,62,89,96],[5.0,954.0,4.0,1.0,5.0,1.0,1401.0,1.0,2.0,450.0,1.0,3.0,1.0,3.0,3.0,3.0,2.0,3.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0])"
0,"(110,[12,13,15,16,17,18,20,21,23,24,27,28,30,31,34,37,59,61,87,96],[5.0,1.0,1476.0,4.0,1.0,5.0,588.0,1.0,82.0,1.0,2.0,3.0,3.0,3.0,3.0,1.0,1.0,1.0,1.0,1.0])"
0,"(110,[12,13,15,16,17,18,20,21,23,24,29,30,31,33,34,36,37,59,61,87,96],[5.0,2.0,141.0,4.0,1.0,5.0,6273.0,1.0,1486.0,1.0,3.0,1.0,2.0,3.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0])"
0,"(110,[12,15,16,17,18,19,20,21,23,24,27,29,31,36,37,59,63,87,96],[6.0,65.0,5.0,1.0,8.0,2.0,977.0,1.0,687.0,1.0,1.0,1.0,3.0,1.0,1.0,1.0,1.0,1.0,1.0])"
0,"(110,[12,13,15,16,17,18,20,21,23,24,27,28,29,30,32,34,35,36,37,59,61,87,96],[6.0,13.0,238.0,5.0,1.0,8.0,335.0,1.0,82.0,1.0,3.0,3.0,3.0,1.0,2.0,1.0,2.0,3.0,1.0,1.0,1.0,1.0,1.0])"
0,"(110,[12,15,16,17,18,20,21,23,24,25,30,31,34,37,59,61,87,98],[9.0,108.0,17.0,1.0,16.0,1305.0,1.0,173.0,1.0,2.0,3.0,3.0,1.0,1.0,1.0,1.0,1.0,1.0])"
0,"(110,[12,13,15,16,17,18,20,21,22,23,24,28,29,31,36,37,59,61,94,96],[10.0,1.0,13.0,3.0,1.0,2.0,588.0,1.0,7.0,82.0,1.0,3.0,3.0,3.0,3.0,1.0,1.0,1.0,1.0,1.0])"
0,"(110,[12,15,16,17,18,19,20,21,22,23,24,30,31,32,33,34,35,37,59,62,92,96],[13.0,1026.0,7.0,1.0,4.0,1.0,5268.0,1.0,5.0,1097.0,1.0,1.0,1.0,3.0,1.0,3.0,3.0,1.0,1.0,1.0,1.0,1.0])"
0,"(110,[12,14,16,17,18,20,21,22,23,24,28,29,30,31,32,33,34,35,36,37,60,61,95,96],[14.0,1.0,29.0,1.0,10.0,3820.0,1.0,8.0,2056.0,1.0,3.0,3.0,3.0,3.0,3.0,2.0,3.0,2.0,3.0,1.0,1.0,1.0,1.0,1.0])"
0,"(110,[12,14,16,17,18,20,21,22,23,24,28,29,30,31,32,34,35,36,37,60,61,95,96],[14.0,1.0,29.0,1.0,10.0,4868.0,1.0,8.0,1921.0,1.0,3.0,3.0,3.0,3.0,3.0,3.0,2.0,3.0,1.0,1.0,1.0,1.0,1.0])"


### Save Only The Two Relevant Columns (label, features) in The Testing Set


> <br>

In [112]:
# Save the test data features1 and label only

criteoTestParquet = "/FileStore/test/criteofeatures_label_test_uncompressed.parquet"

(newTestData.coalesce(1).write                       # Our DataFrameWriter
  .option("delimiter", "\t")  
  .option("compression", "uncompressed")
  .mode("overwrite")                       # Replace existing files
  .parquet(criteoTestParquet)               # Write DataFrame to parquet files
)

### Logestic Regression Model


> <br>

In [114]:
# logestic regression model
# fit the model on the train data

from pyspark.ml.classification import LogisticRegression

lr = (LogisticRegression()
      .setFeaturesCol('features')
      .setRegParam(0.0)
      .setLabelCol('label')
      .setMaxIter(10000))

ModelLR = lr.fit(criteoTrain)


critieoTestPredictions_lr = (ModelLR
                       .transform(criteoTest)
                       .cache())


### Saving The Logestic Regression Model


> <br>

In [116]:
import mleap.pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer

ModelLR.serializeToBundle("jar:file:/tmp/LR_model.zip", critieoTestPredictions_lr)

In [117]:
dbutils.fs.cp("file:/tmp/LR_model.zip", "dbfs:/FileStore/lr_model.zip")
display(dbutils.fs.ls("dbfs:/FileStore"))

path,name,size
dbfs:/FileStore/jars/,jars/,0
dbfs:/FileStore/lr_model.zip,lr_model.zip,2152
dbfs:/FileStore/model_export/,model_export/,0
dbfs:/FileStore/tables/,tables/,0
dbfs:/FileStore/test/,test/,0


In [118]:
display(critieoTestPredictions_lr)

label,i_1,i_10,c_19,c_20,c_25,c_26,c_1_index,c_2_index,c_3_index,c_4_index,c_5_index,c_6_index,c_7_index,c_8_index,c_9_index,c_10_index,c_11_index,c_12_index,c_13_index,c_14_index,c_15_index,c_16_index,c_17_index,c_18_index,c_21_index,c_23_index,c_24_index,i_2_bucket,i_3_bucket,i_4_bucket,i_5_bucket,i_6_bucket,i_7_bucket,i_8_bucket,i_9_bucket,i_11_bucket,i_13_bucket,c_6_OHE,c_9_OHE,c_14_OHE,c_17_OHE,c_23_OHE,features,rawPrediction,probability,prediction
0,False,False,False,False,False,False,0,0,0,0,0,0,5,0,0,954,4,1,5,1,1401,1,2,450,1,0,0,3.0,0.0,1.0,3.0,3.0,3.0,2.0,3.0,2.0,1.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 26, List(1), List(1.0))","List(0, 9, List(2), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 110, List(12, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 27, 29, 30, 31, 32, 33, 34, 35, 36, 37, 59, 62, 89, 96), List(5.0, 954.0, 4.0, 1.0, 5.0, 1.0, 1401.0, 1.0, 2.0, 450.0, 1.0, 3.0, 1.0, 3.0, 3.0, 3.0, 2.0, 3.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(0.7493866476334627, -0.7493866476334627))","List(1, 2, List(), List(0.6790450380801913, 0.32095496191980866))",0.0
0,False,False,False,False,False,False,0,0,0,0,0,0,5,1,0,1476,4,1,5,0,588,1,0,82,1,0,0,2.0,3.0,0.0,3.0,3.0,0.0,0.0,3.0,0.0,0.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 26, List(0), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 110, List(12, 13, 15, 16, 17, 18, 20, 21, 23, 24, 27, 28, 30, 31, 34, 37, 59, 61, 87, 96), List(5.0, 1.0, 1476.0, 4.0, 1.0, 5.0, 588.0, 1.0, 82.0, 1.0, 2.0, 3.0, 3.0, 3.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(-0.12542943127978012, 0.12542943127978012))","List(1, 2, List(), List(0.46868368851795317, 0.5313163114820469))",1.0
0,False,False,False,False,False,False,0,0,0,0,0,0,5,2,0,141,4,1,5,0,6273,1,0,1486,1,0,0,0.0,0.0,3.0,1.0,2.0,0.0,3.0,1.0,0.0,2.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 26, List(0), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 110, List(12, 13, 15, 16, 17, 18, 20, 21, 23, 24, 29, 30, 31, 33, 34, 36, 37, 59, 61, 87, 96), List(5.0, 2.0, 141.0, 4.0, 1.0, 5.0, 6273.0, 1.0, 1486.0, 1.0, 3.0, 1.0, 2.0, 3.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(0.4055567810273656, -0.4055567810273656))","List(1, 2, List(), List(0.6000220012989006, 0.3999779987010994))",0.0
0,False,False,False,False,False,False,0,0,0,0,0,0,6,0,0,65,5,1,8,2,977,1,0,687,1,0,0,1.0,0.0,1.0,0.0,3.0,0.0,0.0,0.0,0.0,1.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 26, List(2), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 110, List(12, 15, 16, 17, 18, 19, 20, 21, 23, 24, 27, 29, 31, 36, 37, 59, 63, 87, 96), List(6.0, 65.0, 5.0, 1.0, 8.0, 2.0, 977.0, 1.0, 687.0, 1.0, 1.0, 1.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(0.6929106313128223, -0.6929106313128223))","List(1, 2, List(), List(0.6666140980950417, 0.3333859019049582))",0.0
0,False,False,False,False,False,False,0,0,0,0,0,0,6,13,0,238,5,1,8,0,335,1,0,82,1,0,0,3.0,3.0,3.0,1.0,0.0,2.0,0.0,1.0,2.0,3.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 26, List(0), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 110, List(12, 13, 15, 16, 17, 18, 20, 21, 23, 24, 27, 28, 29, 30, 32, 34, 35, 36, 37, 59, 61, 87, 96), List(6.0, 13.0, 238.0, 5.0, 1.0, 8.0, 335.0, 1.0, 82.0, 1.0, 3.0, 3.0, 3.0, 1.0, 2.0, 1.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(-0.6355677144371348, 0.6355677144371348))","List(1, 2, List(), List(0.34624914966214954, 0.6537508503378505))",1.0
0,False,False,False,False,False,False,0,0,0,0,0,0,9,0,0,108,17,1,16,0,1305,1,0,173,1,2,0,0.0,0.0,0.0,3.0,3.0,0.0,0.0,1.0,0.0,0.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 26, List(0), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 14, List(2), List(1.0))","List(0, 110, List(12, 15, 16, 17, 18, 20, 21, 23, 24, 25, 30, 31, 34, 37, 59, 61, 87, 98), List(9.0, 108.0, 17.0, 1.0, 16.0, 1305.0, 1.0, 173.0, 1.0, 2.0, 3.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(0.07549708690766888, -0.07549708690766888))","List(1, 2, List(), List(0.5188653118534058, 0.4811346881465942))",0.0
0,False,False,False,False,False,False,0,0,0,0,0,0,10,1,0,13,3,1,2,0,588,1,7,82,1,0,0,0.0,3.0,3.0,0.0,3.0,0.0,0.0,0.0,0.0,3.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 26, List(0), List(1.0))","List(0, 9, List(7), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 110, List(12, 13, 15, 16, 17, 18, 20, 21, 22, 23, 24, 28, 29, 31, 36, 37, 59, 61, 94, 96), List(10.0, 1.0, 13.0, 3.0, 1.0, 2.0, 588.0, 1.0, 7.0, 82.0, 1.0, 3.0, 3.0, 3.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(1.7219654367908346, -1.7219654367908346))","List(1, 2, List(), List(0.8483818238285464, 0.15161817617145354))",0.0
0,False,False,False,False,False,False,0,0,0,0,0,0,13,0,0,1026,7,1,4,1,5268,1,5,1097,1,0,0,0.0,0.0,0.0,1.0,1.0,3.0,1.0,3.0,3.0,0.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 26, List(1), List(1.0))","List(0, 9, List(5), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 110, List(12, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 30, 31, 32, 33, 34, 35, 37, 59, 62, 92, 96), List(13.0, 1026.0, 7.0, 1.0, 4.0, 1.0, 5268.0, 1.0, 5.0, 1097.0, 1.0, 1.0, 1.0, 3.0, 1.0, 3.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(-0.7009259566733601, 0.7009259566733601))","List(1, 2, List(), List(0.33160696329869227, 0.6683930367013077))",1.0
0,False,False,False,False,False,False,0,0,0,0,0,0,14,0,1,0,29,1,10,0,3820,1,8,2056,1,0,0,0.0,3.0,3.0,3.0,3.0,3.0,2.0,3.0,2.0,3.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(1), List(1.0))","List(0, 26, List(0), List(1.0))","List(0, 9, List(8), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 110, List(12, 14, 16, 17, 18, 20, 21, 22, 23, 24, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 60, 61, 95, 96), List(14.0, 1.0, 29.0, 1.0, 10.0, 3820.0, 1.0, 8.0, 2056.0, 1.0, 3.0, 3.0, 3.0, 3.0, 3.0, 2.0, 3.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(1.4091677448954567, -1.4091677448954567))","List(1, 2, List(), List(0.803634641969405, 0.196365358030595))",0.0
0,False,False,False,False,False,False,0,0,0,0,0,0,14,0,1,0,29,1,10,0,4868,1,8,1921,1,0,0,0.0,3.0,3.0,3.0,3.0,3.0,0.0,3.0,2.0,3.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(1), List(1.0))","List(0, 26, List(0), List(1.0))","List(0, 9, List(8), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 110, List(12, 14, 16, 17, 18, 20, 21, 22, 23, 24, 28, 29, 30, 31, 32, 34, 35, 36, 37, 60, 61, 95, 96), List(14.0, 1.0, 29.0, 1.0, 10.0, 4868.0, 1.0, 8.0, 1921.0, 1.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(1.430160582147637, -1.430160582147637))","List(1, 2, List(), List(0.806926335092231, 0.193073664907769))",0.0


### Random Forest model


> <br>

In [120]:
# Train a RandomForest model.

from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=50,maxDepth=25)

Modelrf = rf.fit(criteoTrain)


critieoTestPredictions_rf = (Modelrf
                       .transform(criteoTest)
                       .cache())

# default acc = 0.739
# numTrees=50, maxDepth=10  acc = 0.765
# numTrees=50, maxDepth=20 acc = 0.782
# numTrees=75, maxDepth=20  acc = 0.783
# numTrees=50, maxDepth=25  acc = crashed

In [121]:
# Random Forest Predictions
display(critieoTestPredictions_rf)

label,i_1,i_10,c_19,c_20,c_25,c_26,c_1_index,c_2_index,c_3_index,c_4_index,c_5_index,c_6_index,c_7_index,c_8_index,c_9_index,c_10_index,c_11_index,c_12_index,c_13_index,c_14_index,c_15_index,c_16_index,c_17_index,c_18_index,c_21_index,c_23_index,c_24_index,i_2_bucket,i_3_bucket,i_4_bucket,i_5_bucket,i_6_bucket,i_7_bucket,i_8_bucket,i_9_bucket,i_11_bucket,i_13_bucket,c_6_OHE,c_9_OHE,c_14_OHE,c_17_OHE,c_23_OHE,features,rawPrediction,probability,prediction
0,False,False,False,False,False,False,0,0,0,0,0,0,5,0,0,954,4,1,5,1,1401,1,2,450,1,0,0,3.0,0.0,1.0,3.0,3.0,3.0,2.0,3.0,2.0,1.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 26, List(1), List(1.0))","List(0, 9, List(2), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 110, List(12, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 27, 29, 30, 31, 32, 33, 34, 35, 36, 37, 59, 62, 89, 96), List(5.0, 954.0, 4.0, 1.0, 5.0, 1.0, 1401.0, 1.0, 2.0, 450.0, 1.0, 3.0, 1.0, 3.0, 3.0, 3.0, 2.0, 3.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(6.5046313078031694, 3.4953686921968314))","List(1, 2, List(), List(0.6504631307803169, 0.34953686921968313))",0.0
0,False,False,False,False,False,False,0,0,0,0,0,0,5,1,0,1476,4,1,5,0,588,1,0,82,1,0,0,2.0,3.0,0.0,3.0,3.0,0.0,0.0,3.0,0.0,0.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 26, List(0), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 110, List(12, 13, 15, 16, 17, 18, 20, 21, 23, 24, 27, 28, 30, 31, 34, 37, 59, 61, 87, 96), List(5.0, 1.0, 1476.0, 4.0, 1.0, 5.0, 588.0, 1.0, 82.0, 1.0, 2.0, 3.0, 3.0, 3.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(6.7927323159568935, 3.2072676840431074))","List(1, 2, List(), List(0.6792732315956893, 0.32072676840431075))",0.0
0,False,False,False,False,False,False,0,0,0,0,0,0,5,2,0,141,4,1,5,0,6273,1,0,1486,1,0,0,0.0,0.0,3.0,1.0,2.0,0.0,3.0,1.0,0.0,2.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 26, List(0), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 110, List(12, 13, 15, 16, 17, 18, 20, 21, 23, 24, 29, 30, 31, 33, 34, 36, 37, 59, 61, 87, 96), List(5.0, 2.0, 141.0, 4.0, 1.0, 5.0, 6273.0, 1.0, 1486.0, 1.0, 3.0, 1.0, 2.0, 3.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(7.284974480038865, 2.7150255199611353))","List(1, 2, List(), List(0.7284974480038865, 0.2715025519961135))",0.0
0,False,False,False,False,False,False,0,0,0,0,0,0,6,0,0,65,5,1,8,2,977,1,0,687,1,0,0,1.0,0.0,1.0,0.0,3.0,0.0,0.0,0.0,0.0,1.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 26, List(2), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 110, List(12, 15, 16, 17, 18, 19, 20, 21, 23, 24, 27, 29, 31, 36, 37, 59, 63, 87, 96), List(6.0, 65.0, 5.0, 1.0, 8.0, 2.0, 977.0, 1.0, 687.0, 1.0, 1.0, 1.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(6.270480783175749, 3.7295192168242517))","List(1, 2, List(), List(0.6270480783175749, 0.3729519216824252))",0.0
0,False,False,False,False,False,False,0,0,0,0,0,0,6,13,0,238,5,1,8,0,335,1,0,82,1,0,0,3.0,3.0,3.0,1.0,0.0,2.0,0.0,1.0,2.0,3.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 26, List(0), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 110, List(12, 13, 15, 16, 17, 18, 20, 21, 23, 24, 27, 28, 29, 30, 32, 34, 35, 36, 37, 59, 61, 87, 96), List(6.0, 13.0, 238.0, 5.0, 1.0, 8.0, 335.0, 1.0, 82.0, 1.0, 3.0, 3.0, 3.0, 1.0, 2.0, 1.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(6.720240092321733, 3.279759907678267))","List(1, 2, List(), List(0.6720240092321733, 0.3279759907678267))",0.0
0,False,False,False,False,False,False,0,0,0,0,0,0,9,0,0,108,17,1,16,0,1305,1,0,173,1,2,0,0.0,0.0,0.0,3.0,3.0,0.0,0.0,1.0,0.0,0.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 26, List(0), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 14, List(2), List(1.0))","List(0, 110, List(12, 15, 16, 17, 18, 20, 21, 23, 24, 25, 30, 31, 34, 37, 59, 61, 87, 98), List(9.0, 108.0, 17.0, 1.0, 16.0, 1305.0, 1.0, 173.0, 1.0, 2.0, 3.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(6.157599198311761, 3.8424008016882394))","List(1, 2, List(), List(0.6157599198311761, 0.38424008016882394))",0.0
0,False,False,False,False,False,False,0,0,0,0,0,0,10,1,0,13,3,1,2,0,588,1,7,82,1,0,0,0.0,3.0,3.0,0.0,3.0,0.0,0.0,0.0,0.0,3.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 26, List(0), List(1.0))","List(0, 9, List(7), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 110, List(12, 13, 15, 16, 17, 18, 20, 21, 22, 23, 24, 28, 29, 31, 36, 37, 59, 61, 94, 96), List(10.0, 1.0, 13.0, 3.0, 1.0, 2.0, 588.0, 1.0, 7.0, 82.0, 1.0, 3.0, 3.0, 3.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(8.110478572207516, 1.8895214277924839))","List(1, 2, List(), List(0.8110478572207516, 0.18895214277924838))",0.0
0,False,False,False,False,False,False,0,0,0,0,0,0,13,0,0,1026,7,1,4,1,5268,1,5,1097,1,0,0,0.0,0.0,0.0,1.0,1.0,3.0,1.0,3.0,3.0,0.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 26, List(1), List(1.0))","List(0, 9, List(5), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 110, List(12, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 30, 31, 32, 33, 34, 35, 37, 59, 62, 92, 96), List(13.0, 1026.0, 7.0, 1.0, 4.0, 1.0, 5268.0, 1.0, 5.0, 1097.0, 1.0, 1.0, 1.0, 3.0, 1.0, 3.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(5.753406148661773, 4.246593851338228))","List(1, 2, List(), List(0.5753406148661773, 0.42465938513382284))",0.0
0,False,False,False,False,False,False,0,0,0,0,0,0,14,0,1,0,29,1,10,0,3820,1,8,2056,1,0,0,0.0,3.0,3.0,3.0,3.0,3.0,2.0,3.0,2.0,3.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(1), List(1.0))","List(0, 26, List(0), List(1.0))","List(0, 9, List(8), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 110, List(12, 14, 16, 17, 18, 20, 21, 22, 23, 24, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 60, 61, 95, 96), List(14.0, 1.0, 29.0, 1.0, 10.0, 3820.0, 1.0, 8.0, 2056.0, 1.0, 3.0, 3.0, 3.0, 3.0, 3.0, 2.0, 3.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(9.008219923333963, 0.9917800766660381))","List(1, 2, List(), List(0.9008219923333962, 0.09917800766660381))",0.0
0,False,False,False,False,False,False,0,0,0,0,0,0,14,0,1,0,29,1,10,0,4868,1,8,1921,1,0,0,0.0,3.0,3.0,3.0,3.0,3.0,0.0,3.0,2.0,3.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(1), List(1.0))","List(0, 26, List(0), List(1.0))","List(0, 9, List(8), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 110, List(12, 14, 16, 17, 18, 20, 21, 22, 23, 24, 28, 29, 30, 31, 32, 34, 35, 36, 37, 60, 61, 95, 96), List(14.0, 1.0, 29.0, 1.0, 10.0, 4868.0, 1.0, 8.0, 1921.0, 1.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(9.306407162197923, 0.6935928378020768))","List(1, 2, List(), List(0.9306407162197923, 0.06935928378020768))",0.0


In [122]:
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
import numpy as np
from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(labelCol="label", featuresCol="features")

paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [x for x in [50, 75] ]) \
    .addGrid(rf.maxDepth, [x for x in [20, 25] ]) \
    .build()


crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=3)
cvModel = crossval.fit(criteoTrain)
predictions = cvModel.transform(criteoTest)

### Saving The Random Forest Model


> <br>

In [124]:
display(critieoTestPredictions_lr)

label,i_1,i_10,c_19,c_20,c_25,c_26,c_1_index,c_2_index,c_3_index,c_4_index,c_5_index,c_6_index,c_7_index,c_8_index,c_9_index,c_10_index,c_11_index,c_12_index,c_13_index,c_14_index,c_15_index,c_16_index,c_17_index,c_18_index,c_21_index,c_23_index,c_24_index,i_2_bucket,i_3_bucket,i_4_bucket,i_5_bucket,i_6_bucket,i_7_bucket,i_8_bucket,i_9_bucket,i_11_bucket,i_13_bucket,c_6_OHE,c_9_OHE,c_14_OHE,c_17_OHE,c_23_OHE,features,rawPrediction,probability,prediction
0,False,False,False,False,False,False,0,0,0,0,0,0,5,0,0,954,4,1,5,1,1401,1,2,450,1,0,0,3.0,0.0,1.0,3.0,3.0,3.0,2.0,3.0,2.0,1.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 26, List(1), List(1.0))","List(0, 9, List(2), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 110, List(12, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 27, 29, 30, 31, 32, 33, 34, 35, 36, 37, 59, 62, 89, 96), List(5.0, 954.0, 4.0, 1.0, 5.0, 1.0, 1401.0, 1.0, 2.0, 450.0, 1.0, 3.0, 1.0, 3.0, 3.0, 3.0, 2.0, 3.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(0.7173743127045273, -0.7173743127045273))","List(1, 2, List(), List(0.6720285609933266, 0.3279714390066733))",0.0
0,False,False,False,False,False,False,0,0,0,0,0,0,5,1,0,1476,4,1,5,0,588,1,0,82,1,0,0,2.0,3.0,0.0,3.0,3.0,0.0,0.0,3.0,0.0,0.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 26, List(0), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 110, List(12, 13, 15, 16, 17, 18, 20, 21, 23, 24, 27, 28, 30, 31, 34, 37, 59, 61, 87, 96), List(5.0, 1.0, 1476.0, 4.0, 1.0, 5.0, 588.0, 1.0, 82.0, 1.0, 2.0, 3.0, 3.0, 3.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(-0.18590880730154985, 0.18590880730154985))","List(1, 2, List(), List(0.4536561995458742, 0.5463438004541258))",1.0
0,False,False,False,False,False,False,0,0,0,0,0,0,5,2,0,141,4,1,5,0,6273,1,0,1486,1,0,0,0.0,0.0,3.0,1.0,2.0,0.0,3.0,1.0,0.0,2.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 26, List(0), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 110, List(12, 13, 15, 16, 17, 18, 20, 21, 23, 24, 29, 30, 31, 33, 34, 36, 37, 59, 61, 87, 96), List(5.0, 2.0, 141.0, 4.0, 1.0, 5.0, 6273.0, 1.0, 1486.0, 1.0, 3.0, 1.0, 2.0, 3.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(0.319686795860182, -0.319686795860182))","List(1, 2, List(), List(0.5792479200039619, 0.4207520799960381))",0.0
0,False,False,False,False,False,False,0,0,0,0,0,0,6,0,0,65,5,1,8,2,977,1,0,687,1,0,0,1.0,0.0,1.0,0.0,3.0,0.0,0.0,0.0,0.0,1.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 26, List(2), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 110, List(12, 15, 16, 17, 18, 19, 20, 21, 23, 24, 27, 29, 31, 36, 37, 59, 63, 87, 96), List(6.0, 65.0, 5.0, 1.0, 8.0, 2.0, 977.0, 1.0, 687.0, 1.0, 1.0, 1.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(0.7532738453829171, -0.7532738453829171))","List(1, 2, List(), List(0.6798916348474482, 0.32010836515255175))",0.0
0,False,False,False,False,False,False,0,0,0,0,0,0,6,13,0,238,5,1,8,0,335,1,0,82,1,0,0,3.0,3.0,3.0,1.0,0.0,2.0,0.0,1.0,2.0,3.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 26, List(0), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 110, List(12, 13, 15, 16, 17, 18, 20, 21, 23, 24, 27, 28, 29, 30, 32, 34, 35, 36, 37, 59, 61, 87, 96), List(6.0, 13.0, 238.0, 5.0, 1.0, 8.0, 335.0, 1.0, 82.0, 1.0, 3.0, 3.0, 3.0, 1.0, 2.0, 1.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(-0.7211316888704877, 0.7211316888704877))","List(1, 2, List(), List(0.32714382602375375, 0.6728561739762462))",1.0
0,False,False,False,False,False,False,0,0,0,0,0,0,9,0,0,108,17,1,16,0,1305,1,0,173,1,2,0,0.0,0.0,0.0,3.0,3.0,0.0,0.0,1.0,0.0,0.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 26, List(0), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 14, List(2), List(1.0))","List(0, 110, List(12, 15, 16, 17, 18, 20, 21, 23, 24, 25, 30, 31, 34, 37, 59, 61, 87, 98), List(9.0, 108.0, 17.0, 1.0, 16.0, 1305.0, 1.0, 173.0, 1.0, 2.0, 3.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(0.014933891483809636, -0.014933891483809636))","List(1, 2, List(), List(0.5037334034855597, 0.4962665965144402))",0.0
0,False,False,False,False,False,False,0,0,0,0,0,0,10,1,0,13,3,1,2,0,588,1,7,82,1,0,0,0.0,3.0,3.0,0.0,3.0,0.0,0.0,0.0,0.0,3.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 26, List(0), List(1.0))","List(0, 9, List(7), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 110, List(12, 13, 15, 16, 17, 18, 20, 21, 22, 23, 24, 28, 29, 31, 36, 37, 59, 61, 94, 96), List(10.0, 1.0, 13.0, 3.0, 1.0, 2.0, 588.0, 1.0, 7.0, 82.0, 1.0, 3.0, 3.0, 3.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(1.7007367632735013, -1.7007367632735013))","List(1, 2, List(), List(0.8456309359391896, 0.1543690640608103))",0.0
0,False,False,False,False,False,False,0,0,0,0,0,0,13,0,0,1026,7,1,4,1,5268,1,5,1097,1,0,0,0.0,0.0,0.0,1.0,1.0,3.0,1.0,3.0,3.0,0.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 26, List(1), List(1.0))","List(0, 9, List(5), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 110, List(12, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 30, 31, 32, 33, 34, 35, 37, 59, 62, 92, 96), List(13.0, 1026.0, 7.0, 1.0, 4.0, 1.0, 5268.0, 1.0, 5.0, 1097.0, 1.0, 1.0, 1.0, 3.0, 1.0, 3.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(-0.8172309904536585, 0.8172309904536585))","List(1, 2, List(), List(0.30635175969170136, 0.6936482403082986))",1.0
0,False,False,False,False,False,False,0,0,0,0,0,0,14,0,1,0,29,1,10,0,3820,1,8,2056,1,0,0,0.0,3.0,3.0,3.0,3.0,3.0,2.0,3.0,2.0,3.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(1), List(1.0))","List(0, 26, List(0), List(1.0))","List(0, 9, List(8), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 110, List(12, 14, 16, 17, 18, 20, 21, 22, 23, 24, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 60, 61, 95, 96), List(14.0, 1.0, 29.0, 1.0, 10.0, 3820.0, 1.0, 8.0, 2056.0, 1.0, 3.0, 3.0, 3.0, 3.0, 3.0, 2.0, 3.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(1.486832144839124, -1.486832144839124))","List(1, 2, List(), List(0.815602318693018, 0.18439768130698195))",0.0
0,False,False,False,False,False,False,0,0,0,0,0,0,14,0,1,0,29,1,10,0,4868,1,8,1921,1,0,0,0.0,3.0,3.0,3.0,3.0,3.0,0.0,3.0,2.0,3.0,"List(0, 22, List(0), List(1.0))","List(0, 2, List(1), List(1.0))","List(0, 26, List(0), List(1.0))","List(0, 9, List(8), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 110, List(12, 14, 16, 17, 18, 20, 21, 22, 23, 24, 28, 29, 30, 31, 32, 34, 35, 36, 37, 60, 61, 95, 96), List(14.0, 1.0, 29.0, 1.0, 10.0, 4868.0, 1.0, 8.0, 1921.0, 1.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(1, 2, List(), List(1.5390743043585138, -1.5390743043585138))","List(1, 2, List(), List(0.8233301160200049, 0.17666988397999497))",0.0


In [125]:
import mleap.pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer

Modelrf.serializeToBundle("jar:file:/tmp/RF_model.zip", critieoTestPredictions_rf)

In [126]:
dbutils.fs.cp("file:/tmp/RF_model.zip", "dbfs:/FileStore/random_forest_model.zip")




### linearSVM model


> <br>

In [128]:
# training the data using a linearSVM model

from pyspark.ml.classification import LinearSVC
lsvc = LinearSVC(maxIter=20, regParam=0.1)

ModelLSVM = lsvc.fit(criteoTrain)


critieoTestPredictions_LSVM = (ModelLSVM
                       .transform(criteoTest)
                       .cache())

### Saving The SVM Model


> <br>

In [130]:
import mleap.pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer
# mleap.pyspark.mleap.version
ModelLSVM.serializeToBundle("jar:file:/tmp/LSVM_model.zip", critieoTestPredictions_LSVM)

In [131]:
dbutils.fs.cp("file:/tmp/LSVM_model.zip", "dbfs:/FileStore/lsvm_model.zip")

### Calculating Model Accuracies


> <br>

In [133]:
# model accuracy for logestic regression model
from pyspark.sql.functions import col

def modelAccuracy(df):
    return (df
          .select((col('prediction') == col('label')).cast('int').alias('correct'))
          .groupBy()
          .avg('correct')
          .first()[0])

modelAccuracy = modelAccuracy(critieoTestPredictions_lr)

print('modelOneAccuracy for the logestic regression model: {0:.3f}'.format(modelAccuracy))



In [134]:
# model accuracy for random forest model
from pyspark.sql.functions import col

def modelAccuracy(df):
    return (df
          .select((col('prediction') == col('label')).cast('int').alias('correct'))
          .groupBy()
          .avg('correct')
          .first()[0])

modelAccuracy = modelAccuracy(critieoTestPredictions_rf)
print('modelOneAccuracy for the Random Forest model: {0:.3f}'.format(modelAccuracy))



In [135]:
# model accuracy for linear SVM
from pyspark.sql.functions import col

def modelAccuracy(df):
    return (df
          .select((col('prediction') == col('label')).cast('int').alias('correct'))
          .groupBy()
          .avg('correct')
          .first()[0])

modelAccuracy = modelAccuracy(critieoTestPredictions_LSVM)

print('modelOneAccuracy for the LSVM model: {0:.3f}'.format(modelAccuracy))



In [136]:
# save the predictions
criteoPredictParquet = "/tmp/criteosmall_predict.parquet"

(critieoTestPredictions.write                       # Our DataFrameWriter
  .option("delimiter", "\t")  
  .option("compression", "snappy")
  .mode("overwrite")                       # Replace existing files
  .parquet(criteoPredictParquet)               # Write DataFrame to parquet files
)

In [137]:
# from mmlspark import LightGBMClassifier
# the above name space has changed

from mmlspark.lightgbm  import LightGBMClassifier
lgb_estimator = LightGBMClassifier(learningRate=0.1, 
                                   numIterations=100,
                                   earlyStoppingRound=10,
                                   labelCol="label", timeout=3000.0)

ModelLGB = lgb_estimator.fit(criteoTest)


critieoTestPredictions_LGBM = (ModelLGB
                       .transform(criteoTest)
                       .cache())

In [138]:
# search grid for tunning the logistic parameters values
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
#logit = LogisticRegression(maxIter=10)
logit = (LogisticRegression()
      .setFeaturesCol('features')
      .setLabelCol('label')
      .setMaxIter(60))
pipeline = Pipeline(stages=[logit])
paramGrid = ParamGridBuilder() \
    .addGrid(logit.regParam, [0, 0.01, 0.05, 0.1, 0.5, 1.0,1.3,1.5,2.0]) \
    .addGrid(logit.elasticNetParam, [0.0, 0.1, 0.5, 0.8, 1]) \
    .build()
evaluator = BinaryClassificationEvaluator(metricName = 'areaUnderPR')
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)
tuned_model = crossval.fit(criteoTrain)
model = tuned_model.bestModel
critieoTestPredictions_lr = (model
                       .transform(criteoTest)
                       .cache())

In [139]:
# The End