## Kaggle Categorical Feature Encoding Challenge II

This Kaggle competition is based on an artificially created dataset in which every feature is categorical. The dataset, which can be downloaded from

https://www.kaggle.com/c/cat-in-the-dat-ii/data

consists of 5 binary features, 10 low- and high-cardinality nominal features, 6 low- and high-cardinality ordinal features and 2 potentially cyclic features. This is a Playground competition that lets participants build machine learning skills by trying different encoding schemes, trying different ways of imputing missing values and comparing the performance of different algorithms. The target variable is binary.
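
The notebook below simply indexes the two cyclic features (day and month) as ordinary categories. As an aside, one encoding scheme that is often tried for cyclic features maps each value to a point on the unit circle, so that the last and the first value of the cycle end up next to each other. The following is only a minimal sketch of that idea; the function names are hypothetical and a period of 12 is assumed for month.

```python
import math

def cyclic_encode(value, period):
    """Map a 1-based cyclic value (e.g. month 1..12) to a point on the unit circle."""
    angle = 2.0 * math.pi * (value - 1) / period
    return math.sin(angle), math.cos(angle)

def circle_distance(a, b, period):
    """Euclidean distance between the encodings of two values."""
    (s1, c1), (s2, c2) = cyclic_encode(a, period), cyclic_encode(b, period)
    return math.hypot(s1 - s2, c1 - c2)

# December and January end up as close to each other as January and February ...
print(circle_distance(12, 1, 12), circle_distance(1, 2, 12))
# ... while months half a year apart are at the maximum possible distance.
print(circle_distance(1, 7, 12))
```

Each cyclic feature then contributes two numeric columns (sine and cosine) instead of one index column.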

The purpose of the project presented in this notebook is to do predictive modelling with the PySpark machine learning library rather than scikit-learn. While using PySpark is hardly justified for relatively small datasets (the train and test sets are 83MB and 55MB respectively), it is interesting to see how accurate the predictions can be when one uses the built-in classifiers from the pyspark.ml module.

## Reading and inspecting the data 

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

#create the spark session
spark = SparkSession.builder.appName('CFEC_II').getOrCreate()

In [2]:
#import the garbage collection module

import gc
gc.enable()

In [3]:
#import the module that shows the memory usage

import os, psutil

def usage():
    # resident set size (RSS) of the current process, in MiB
    process = psutil.Process(os.getpid())
    return process.memory_info().rss / float(2 ** 20)
    
usage()

62.76953125

In [4]:
train_df = spark.read.csv('../data/train.csv', header = True, inferSchema = True)
train_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- bin_0: double (nullable = true)
 |-- bin_1: double (nullable = true)
 |-- bin_2: double (nullable = true)
 |-- bin_3: string (nullable = true)
 |-- bin_4: string (nullable = true)
 |-- nom_0: string (nullable = true)
 |-- nom_1: string (nullable = true)
 |-- nom_2: string (nullable = true)
 |-- nom_3: string (nullable = true)
 |-- nom_4: string (nullable = true)
 |-- nom_5: string (nullable = true)
 |-- nom_6: string (nullable = true)
 |-- nom_7: string (nullable = true)
 |-- nom_8: string (nullable = true)
 |-- nom_9: string (nullable = true)
 |-- ord_0: double (nullable = true)
 |-- ord_1: string (nullable = true)
 |-- ord_2: string (nullable = true)
 |-- ord_3: string (nullable = true)
 |-- ord_4: string (nullable = true)
 |-- ord_5: string (nullable = true)
 |-- day: double (nullable = true)
 |-- month: double (nullable = true)
 |-- target: integer (nullable = true)



In [5]:
test_df = spark.read.csv('../data/test.csv', header = True, inferSchema = True)
test_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- bin_0: double (nullable = true)
 |-- bin_1: double (nullable = true)
 |-- bin_2: double (nullable = true)
 |-- bin_3: string (nullable = true)
 |-- bin_4: string (nullable = true)
 |-- nom_0: string (nullable = true)
 |-- nom_1: string (nullable = true)
 |-- nom_2: string (nullable = true)
 |-- nom_3: string (nullable = true)
 |-- nom_4: string (nullable = true)
 |-- nom_5: string (nullable = true)
 |-- nom_6: string (nullable = true)
 |-- nom_7: string (nullable = true)
 |-- nom_8: string (nullable = true)
 |-- nom_9: string (nullable = true)
 |-- ord_0: double (nullable = true)
 |-- ord_1: string (nullable = true)
 |-- ord_2: string (nullable = true)
 |-- ord_3: string (nullable = true)
 |-- ord_4: string (nullable = true)
 |-- ord_5: string (nullable = true)
 |-- day: double (nullable = true)
 |-- month: double (nullable = true)



In [6]:
train_df.show(10)

+---+-----+-----+-----+-----+-----+-----+---------+-------+----------+--------+---------+---------+---------+---------+---------+-----+-----------+-----------+-----+-----+-----+---+-----+------+
| id|bin_0|bin_1|bin_2|bin_3|bin_4|nom_0|    nom_1|  nom_2|     nom_3|   nom_4|    nom_5|    nom_6|    nom_7|    nom_8|    nom_9|ord_0|      ord_1|      ord_2|ord_3|ord_4|ord_5|day|month|target|
+---+-----+-----+-----+-----+-----+-----+---------+-------+----------+--------+---------+---------+---------+---------+---------+-----+-----------+-----------+-----+-----+-----+---+-----+------+
|  0|  0.0|  0.0|  0.0|    F|    N|  Red|Trapezoid|Hamster|    Russia| Bassoon|de4c57ee2|a64bc7ddf|598080a91|0256c7a4b|02e7c8990|  3.0|Contributor|        Hot|    c|    U|   Pw|6.0|  3.0|     0|
|  1|  1.0|  1.0|  0.0|    F|    Y|  Red|     Star|Axolotl|      null|Theremin|2bb3c3e5c|3a3a936e8|1dddb8473|52ead350c|f37df64af|  3.0|Grandmaster|       Warm|    e|    X|   pE|7.0|  7.0|     0|
|  2|  0.0|  1.0|  0.0|  

In [7]:
test_df.show(10)

+------+-----+-----+-----+-----+-----+-----+---------+-------+----------+--------+---------+---------+---------+---------+---------+-----+-----------+-----------+-----+-----+-----+---+-----+
|    id|bin_0|bin_1|bin_2|bin_3|bin_4|nom_0|    nom_1|  nom_2|     nom_3|   nom_4|    nom_5|    nom_6|    nom_7|    nom_8|    nom_9|ord_0|      ord_1|      ord_2|ord_3|ord_4|ord_5|day|month|
+------+-----+-----+-----+-----+-----+-----+---------+-------+----------+--------+---------+---------+---------+---------+---------+-----+-----------+-----------+-----+-----+-----+---+-----+
|600000|  0.0|  0.0|  0.0|    F|    Y| Blue|  Polygon|Axolotl|   Finland|   Piano|52f6dd16c|147d704e4|8d857a0a1|ca9ad1d4b|fced9e114|  3.0|     Novice|Boiling Hot|    f|    U|   oU|3.0|  9.0|
|600001|  0.0|  0.0|  0.0|    F|    Y|  Red|   Circle|   Lion|    Russia| Bassoon|691ebeae8|8653dcc2e|67a8d4ebb|060a21580|7ca8775da|  1.0|     Novice|       Cold|    n|    N| null|2.0|  8.0|
|600002|  0.0|  0.0|  0.0|    F|    Y| Blue| 

We see that both train_df and test_df contain missing values. Let's count how many values are missing in each column. As the counts below show, 'id' and 'target' are the only columns that don't have missing values.

In [8]:
train_df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in train_df.columns]).show()

+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+
| id|bin_0|bin_1|bin_2|bin_3|bin_4|nom_0|nom_1|nom_2|nom_3|nom_4|nom_5|nom_6|nom_7|nom_8|nom_9|ord_0|ord_1|ord_2|ord_3|ord_4|ord_5|  day|month|target|
+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+
|  0|17894|18003|17930|18014|18047|18252|18156|18035|18121|18035|17778|18131|18003|17755|18073|18288|18041|18075|17916|17930|17713|17952|17988|     0|
+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+



In [9]:
test_df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in test_df.columns]).show()

+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
| id|bin_0|bin_1|bin_2|bin_3|bin_4|nom_0|nom_1|nom_2|nom_3|nom_4|nom_5|nom_6|nom_7|nom_8|nom_9|ord_0|ord_1|ord_2|ord_3|ord_4|ord_5|  day|month|
+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|  0|11901|12038|11972|11951|11951|12062|11947|12179|12176|11993|11912|12012|12003|11956|12060|11893|12167|12105|12053|11933|12047|12025|11984|
+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
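
The per-column counts above can be turned into a rough estimate of how many rows contain at least one missing value. The sketch below is only a back-of-the-envelope calculation: it assumes that values are missing independently across columns (which the notebook does not verify) and takes 600,000 as the number of rows in train_df.

```python
# Missing-value counts of the 23 feature columns of train_df, copied from the output above.
missing_counts = [17894, 18003, 17930, 18014, 18047, 18252, 18156, 18035,
                  18121, 18035, 17778, 18131, 18003, 17755, 18073, 18288,
                  18041, 18075, 17916, 17930, 17713, 17952, 17988]
n_rows = 600_000  # assumed number of rows in train_df

# Under the independence assumption, the share of complete rows is the product
# of the per-column shares of non-missing values.
share_complete = 1.0
for count in missing_counts:
    share_complete *= 1.0 - count / n_rows

share_with_missing = 1.0 - share_complete
print(round(share_with_missing, 3))
```

Under this assumption roughly half of the training rows contain at least one missing value, so simply dropping incomplete rows would discard a large part of the data.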



We also see below that both dataframes contain categorical features (nominal and ordinal) whose cardinality varies significantly. Some high-cardinality columns (nom_5, nom_6 and nom_9) have more distinct values in the train set than in the test set, so the train set contains values that are not present in the test set.

In [10]:
train_df.select([F.countDistinct(F.col(c)).alias(c) for c in train_df.columns]).show()

+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+---+-----+------+
|    id|bin_0|bin_1|bin_2|bin_3|bin_4|nom_0|nom_1|nom_2|nom_3|nom_4|nom_5|nom_6|nom_7|nom_8|nom_9|ord_0|ord_1|ord_2|ord_3|ord_4|ord_5|day|month|target|
+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+---+-----+------+
|600000|    2|    2|    2|    2|    2|    3|    6|    6|    6|    4| 1220| 1519|  222|  222| 2218|    3|    5|    6|   15|   26|  190|  7|   12|     2|
+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+---+-----+------+



In [11]:
test_df.select([F.countDistinct(F.col(c)).alias(c) for c in test_df.columns]).show()

+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+---+-----+
|    id|bin_0|bin_1|bin_2|bin_3|bin_4|nom_0|nom_1|nom_2|nom_3|nom_4|nom_5|nom_6|nom_7|nom_8|nom_9|ord_0|ord_1|ord_2|ord_3|ord_4|ord_5|day|month|
+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+---+-----+
|400000|    2|    2|    2|    2|    2|    3|    6|    6|    6|    4| 1219| 1517|  222|  222| 2216|    3|    5|    6|   15|   26|  190|  7|   12|
+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+---+-----+



Let's take a closer look at what the 'nom_5', 'nom_6', 'nom_7', 'nom_8' and 'nom_9' columns contain.

In [12]:
train_df.select('nom_5', 'nom_6','nom_7','nom_8','nom_9').show(5)

+---------+---------+---------+---------+---------+
|    nom_5|    nom_6|    nom_7|    nom_8|    nom_9|
+---------+---------+---------+---------+---------+
|de4c57ee2|a64bc7ddf|598080a91|0256c7a4b|02e7c8990|
|2bb3c3e5c|3a3a936e8|1dddb8473|52ead350c|f37df64af|
|b574c9841|708248125|5ddc9a726|745b909d1|     null|
|673bdf1f6|23edb8da3|3a33ef960|bdaa56dd1|f9d456e57|
|777d1ac2c|3a7975e46|bc9cc2a94|     null|c5361037c|
+---------+---------+---------+---------+---------+
only showing top 5 rows



In [13]:
test_df.select('nom_5', 'nom_6','nom_7','nom_8','nom_9').show(5)

+---------+---------+---------+---------+---------+
|    nom_5|    nom_6|    nom_7|    nom_8|    nom_9|
+---------+---------+---------+---------+---------+
|52f6dd16c|147d704e4|8d857a0a1|ca9ad1d4b|fced9e114|
|691ebeae8|8653dcc2e|67a8d4ebb|060a21580|7ca8775da|
|81f792c16|6cdda499e|69403e18c|165e81a00|5940334c9|
|c9134205b|acbca4827|cb681246b|77d41330d|6fbdeefc8|
|f0f100f57|6f800b9af|cd9feb5c6|2218d9dfe|2a27c8fde|
+---------+---------+---------+---------+---------+
only showing top 5 rows



We see that the above columns contain combinations of letters and digits (all the entries shown are 9 characters long). It is interesting to see that no value is shared between neighbouring columns among 'nom_5', 'nom_6', 'nom_7', 'nom_8' and 'nom_9', as the check below shows. All they have in common is the missing values, which an inner join never matches.

In [14]:
print( train_df.select('nom_5').distinct().\
       join(train_df.select('nom_6').distinct(), 
       train_df.nom_5 == train_df.nom_6, how = 'inner').count(),
      
       train_df.select('nom_6').distinct().\
       join(train_df.select('nom_7').distinct(), 
       train_df.nom_6 == train_df.nom_7, how = 'inner').count(),
      
       train_df.select('nom_7').distinct().\
       join(train_df.select('nom_8').distinct(), 
       train_df.nom_7 == train_df.nom_8, how = 'inner').count(),
      
       train_df.select('nom_8').distinct().\
       join(train_df.select('nom_9').distinct(), 
       train_df.nom_8 == train_df.nom_9, how = 'inner').count() )

0 0 0 0
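
The joins above compare only neighbouring columns (nom_5 with nom_6, nom_6 with nom_7, and so on). A check over every pair of columns could be sketched as follows. This is a plain-Python illustration on toy values, with a hypothetical helper name; in the notebook the sets of distinct values would have to be collected from train_df first.

```python
from itertools import combinations

def shared_value_counts(columns):
    """Return {(name_a, name_b): number of shared non-missing values} for every pair of columns."""
    distinct = {name: {v for v in values if v is not None}
                for name, values in columns.items()}
    return {(a, b): len(distinct[a] & distinct[b])
            for a, b in combinations(distinct, 2)}

# Toy stand-in for the real columns (hypothetical data).
toy = {
    'nom_5': ['de4c57ee2', '2bb3c3e5c', None],
    'nom_6': ['a64bc7ddf', '3a3a936e8', None],
    'nom_7': ['598080a91', '1dddb8473', 'de4c57ee2'],  # last value deliberately reused from nom_5
}
overlaps = shared_value_counts(toy)
print(overlaps)
```

Missing values are dropped before the comparison, which mirrors the fact that nulls do not match each other in an inner join.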


Let's count how often each value occurs in several relatively low-cardinality features. The number of missing values is shown as well.

In [15]:
for col in [col for col in train_df.columns if 'bin' in col]:

    print( f'Values in the column {col} are (train set)', 
    train_df.groupBy(col).count().orderBy('count').toPandas().set_index(col).to_dict().values() )

Values in the column bin_0 are (train set) dict_values([{nan: 17894, 1.0: 53729, 0.0: 528377}])
Values in the column bin_1 are (train set) dict_values([{nan: 18003, 1.0: 107979, 0.0: 474018}])
Values in the column bin_2 are (train set) dict_values([{nan: 17930, 1.0: 162225, 0.0: 419845}])
Values in the column bin_3 are (train set) dict_values([{None: 18014, 'T': 215774, 'F': 366212}])
Values in the column bin_4 are (train set) dict_values([{None: 18047, 'Y': 269609, 'N': 312344}])


In [16]:
for col in [col for col in test_df.columns if 'bin' in col]:

    print( f'Values in the column {col} are (test set)', 
    test_df.groupBy(col).count().orderBy('count').toPandas().set_index(col).to_dict().values() )

Values in the column bin_0 are (test set) dict_values([{nan: 11901, 1.0: 36322, 0.0: 351777}])
Values in the column bin_1 are (test set) dict_values([{nan: 12038, 1.0: 72609, 0.0: 315353}])
Values in the column bin_2 are (test set) dict_values([{nan: 11972, 1.0: 108030, 0.0: 279998}])
Values in the column bin_3 are (test set) dict_values([{None: 11951, 'T': 143957, 'F': 244092}])
Values in the column bin_4 are (test set) dict_values([{None: 11951, 'Y': 179622, 'N': 208427}])


In [17]:
for col in ['nom_0','nom_1', 'nom_2','nom_3','nom_4']:

    print( f'Values in the column {col} are (train set)', 
    train_df.groupBy(col).count().orderBy('count').toPandas().set_index(col).to_dict().values() )

Values in the column nom_0 are (train set) dict_values([{None: 18252, 'Green': 52601, 'Blue': 205861, 'Red': 323286}])
Values in the column nom_1 are (train set) dict_values([{'Star': 14155, None: 18156, 'Square': 26503, 'Circle': 104995, 'Trapezoid': 119438, 'Polygon': 152563, 'Triangle': 164190}])
Values in the column nom_2 are (train set) dict_values([{'Snake': 14144, None: 18035, 'Cat': 26276, 'Dog': 104825, 'Lion': 119504, 'Axolotl': 152319, 'Hamster': 164897}])
Values in the column nom_3 are (train set) dict_values([{'China': 14317, None: 18121, 'Canada': 26425, 'Finland': 104601, 'Russia': 119840, 'Costa Rica': 151827, 'India': 164869}])
Values in the column nom_4 are (train set) dict_values([{None: 18035, 'Piano': 26709, 'Oboe': 49996, 'Bassoon': 196639, 'Theremin': 308621}])


In [18]:
for col in ['nom_0','nom_1', 'nom_2','nom_3','nom_4']:

    print( f'Values in the column {col} are (test set)', 
    test_df.groupBy(col).count().orderBy('count').toPandas().set_index(col).to_dict().values() )

Values in the column nom_0 are (test set) dict_values([{None: 12062, 'Green': 34894, 'Blue': 136592, 'Red': 216452}])
Values in the column nom_1 are (test set) dict_values([{'Star': 9523, None: 11947, 'Square': 17398, 'Circle': 70076, 'Trapezoid': 80025, 'Polygon': 101389, 'Triangle': 109642}])
Values in the column nom_2 are (test set) dict_values([{'Snake': 9416, None: 12179, 'Cat': 17641, 'Dog': 69927, 'Lion': 79702, 'Axolotl': 101836, 'Hamster': 109299}])
Values in the column nom_3 are (test set) dict_values([{'China': 9401, None: 12176, 'Canada': 17619, 'Finland': 69587, 'Russia': 80093, 'Costa Rica': 101447, 'India': 109677}])
Values in the column nom_4 are (test set) dict_values([{None: 11993, 'Piano': 17673, 'Oboe': 33332, 'Bassoon': 131465, 'Theremin': 205537}])


In [19]:
for col in ['ord_0', 'ord_1','ord_2','ord_3']:

    print( f'Values in the column {col} are (train set)', 
    train_df.groupBy(col).count().orderBy('count').toPandas().set_index(col).to_dict().values() )

Values in the column ord_0 are (train set) dict_values([{nan: 18288, 2.0: 155997, 3.0: 197798, 1.0: 227917}])
Values in the column ord_1 are (train set) dict_values([{None: 18041, 'Master': 75998, 'Grandmaster': 95866, 'Contributor': 109821, 'Expert': 139677, 'Novice': 160597}])
Values in the column ord_2 are (train set) dict_values([{None: 18075, 'Lava Hot': 64840, 'Hot': 67508, 'Boiling Hot': 84790, 'Cold': 97822, 'Warm': 124239, 'Freezing': 142726}])
Values in the column ord_3 are (train set) dict_values([{'l': 2835, 'j': 3639, 'g': 6180, None: 17916, 'f': 29450, 'd': 30634, 'i': 34763, 'k': 38718, 'e': 38904, 'b': 44795, 'o': 45464, 'h': 55744, 'c': 56675, 'm': 57980, 'a': 65321, 'n': 70982}])


In [20]:
for col in ['ord_0', 'ord_1','ord_2','ord_3']:

    print( f'Values in the column {col} are (test set)', 
    test_df.groupBy(col).count().orderBy('count').toPandas().set_index(col).to_dict().values() )

Values in the column ord_0 are (test set) dict_values([{nan: 11893, 2.0: 104146, 3.0: 132302, 1.0: 151659}])
Values in the column ord_1 are (test set) dict_values([{None: 12167, 'Master': 51086, 'Grandmaster': 63986, 'Contributor': 73069, 'Expert': 92863, 'Novice': 106829}])
Values in the column ord_2 are (test set) dict_values([{None: 12105, 'Lava Hot': 43493, 'Hot': 44509, 'Boiling Hot': 56624, 'Cold': 65042, 'Warm': 82940, 'Freezing': 95287}])
Values in the column ord_3 are (test set) dict_values([{'l': 1957, 'j': 2452, 'g': 4203, None: 12053, 'f': 19771, 'd': 20552, 'i': 23453, 'k': 25600, 'e': 25628, 'b': 29456, 'o': 30484, 'h': 36974, 'c': 37888, 'm': 38372, 'a': 43625, 'n': 47532}])


Finally, we count the values of the target column of train_df. We see that our dataset is rather imbalanced: only about 18.7% of the rows belong to the positive class.

In [21]:
print( f'Values in the target column are (train set)', 
          train_df.groupBy('target').count().orderBy('count').\
      toPandas().set_index('target').to_dict().values() )

Values in the target column are (train set) dict_values([{1: 112323, 0: 487677}])
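
The degree of imbalance can be quantified directly from these counts. The sketch below also derives "balanced" class weights, under which both classes carry the same total weight. The notebook itself does not reweight the classes; this is only an illustration of one common option.

```python
# Target counts of train_df, copied from the output above.
counts = {1: 112323, 0: 487677}
n_rows = sum(counts.values())

positive_share = counts[1] / n_rows

# Balanced weights: n_rows / (number of classes * class count).
weights = {label: n_rows / (len(counts) * count) for label, count in counts.items()}

print(round(positive_share, 4))
print({label: round(weight, 3) for label, weight in weights.items()})
```

Whether such weights can be passed to a given pyspark.ml classifier (for example through a weight column) depends on the Spark version, so this should be checked against the installed release.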


We then check that columns 'ord_4' and 'ord_5' in train_df and test_df dataframes 
contain the same set of values.

In [22]:
set(train_df.select('ord_4').distinct().toPandas().ord_4) ==\
set(test_df.select('ord_4').distinct().toPandas().ord_4)

True

In [23]:
set(train_df.select('ord_5').distinct().toPandas().ord_5) ==\
set(test_df.select('ord_5').distinct().toPandas().ord_5)

True

## Predictive modelling

In this section, we first use a Pipeline to encode the features. The process consists of indexing the categories and then assembling the encoded features into a single vector. Missing values are handled during category indexing: with handleInvalid="keep", StringIndexer puts them into an additional category. After these preprocessing steps, the Gradient Boosting Tree and Random Forest classifiers are fitted to the training data using 3-fold cross-validation. Finally, the GBT classifier is used to predict the target variable on the test set.

In [24]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

In [25]:
# low cardinality columns
cols1 = ['bin_0', 'bin_1', 'bin_2', 'bin_3', 'bin_4', 'nom_0', 'nom_1', 'nom_2', 
         'nom_3', 'nom_4', 'ord_0', 'ord_1', 'ord_2']

#high cardinality columns
cols2 = ['nom_5', 'nom_6', 'nom_7', 'nom_8', 'nom_9', 'ord_3', 'ord_4', 'ord_5', 'day', 'month']

cols = cols1 + cols2

#stages for the pipeline
stages = []

In [26]:
#treat one column at a time
for col in cols:
    strInd = StringIndexer(inputCol = col, outputCol = col + '_idx', handleInvalid="keep")
    
    stages += [strInd]


#do the vector assembling
assemblerInputs = [col + '_idx' for col in cols] 
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol='features')
stages += [assembler]
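
To make the indexing step more concrete, here is a simplified plain-Python imitation of frequency-based indexing with an extra bucket for missing or unseen labels. It is a sketch of the behaviour we expect from StringIndexer with handleInvalid="keep", not the Spark implementation; the function names are hypothetical, and breaking ties alphabetically is an assumption.

```python
from collections import Counter

def fit_indexer(values):
    """Map each non-missing label to an index, most frequent label first."""
    counts = Counter(v for v in values if v is not None)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}

def transform(mapping, values):
    """Send missing or unseen labels to one extra bucket after the known labels."""
    return [mapping.get(v, len(mapping)) for v in values]

mapping = fit_indexer(['Red', 'Blue', 'Red', None, 'Green', 'Red', 'Blue'])
print(mapping)
print(transform(mapping, ['Blue', None, 'Purple']))
```

With three known labels, both the missing value and the unseen label 'Purple' receive index 3, so a feature with k distinct values can take up to k + 1 index values.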

Before applying the classification algorithms, we must run both train_df and test_df through the pipeline to perform the preparatory steps described above. We also extract the 'id' column from the test data.

In [27]:
#extract id's from the test data
ids = test_df.select("id").rdd.map(lambda x: int(x[0]))

In [28]:
%%time

pipeline = Pipeline(stages = stages)

pipelineModel = pipeline.fit(train_df)
train_df = pipelineModel.transform(train_df)

selectedCols = ['features','target'] + cols
train_df = train_df.select(selectedCols)

train_df.printSchema()
train_df.select('features').show(5, truncate=False)

root
 |-- features: vector (nullable = true)
 |-- target: integer (nullable = true)
 |-- bin_0: double (nullable = true)
 |-- bin_1: double (nullable = true)
 |-- bin_2: double (nullable = true)
 |-- bin_3: string (nullable = true)
 |-- bin_4: string (nullable = true)
 |-- nom_0: string (nullable = true)
 |-- nom_1: string (nullable = true)
 |-- nom_2: string (nullable = true)
 |-- nom_3: string (nullable = true)
 |-- nom_4: string (nullable = true)
 |-- ord_0: double (nullable = true)
 |-- ord_1: string (nullable = true)
 |-- ord_2: string (nullable = true)
 |-- nom_5: string (nullable = true)
 |-- nom_6: string (nullable = true)
 |-- nom_7: string (nullable = true)
 |-- nom_8: string (nullable = true)
 |-- nom_9: string (nullable = true)
 |-- ord_3: string (nullable = true)
 |-- ord_4: string (nullable = true)
 |-- ord_5: string (nullable = true)
 |-- day: double (nullable = true)
 |-- month: double (nullable = true)

+-----------------------------------------------------------------

In [29]:
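# reuse the pipeline model fitted on train_df so that every category gets the same
# index in both sets; refitting StringIndexer on test_df would assign indices from
# the test-set frequencies instead. Categories unseen in training fall into the
# extra bucket created by handleInvalid="keep".
test_df = pipelineModel.transform(test_df)

selectedCols = ['features'] + cols
test_df = test_df.select(selectedCols)

#test_df.printSchema()
#test_df.select('features').show(5, truncate=False)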
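
A frequency-based indexer assigns indices from whatever data it is fitted on, so the same category can receive different indices from two separate fits. The toy sketch below (plain Python, hypothetical data and function name) illustrates why an encoder fitted on the training set is normally the one applied to the test set.

```python
from collections import Counter

def frequency_index(values):
    """Index labels by descending frequency (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}

train_values = ['a', 'a', 'a', 'b', 'b', 'c']
test_values = ['c', 'c', 'c', 'b', 'b', 'a']

train_index = frequency_index(train_values)
test_index = frequency_index(test_values)
print(train_index)
print(test_index)
```

Here category 'a' is index 0 in one fit and index 2 in the other, so a model trained on the first encoding would misread data encoded with the second.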

### Gradient Boosting Tree Classifier 

We first use the Gradient Boosting Tree Classifier to train the model. We run a grid search with 3-fold cross-validation, choosing the best model among the models with maxIter = 20 and stepSize equal to 0.1, 0.15 and 0.2. Note that the maxBins parameter must be at least 2219, the number of categories in the column with the highest cardinality once the extra category for missing values is counted. The evaluator measures the area under the ROC curve. The seed parameter is 789.
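
The value 2219 follows from the distinct-value counts reported earlier. The short calculation below reproduces it for the high-cardinality group of columns and lists the features that would not fit into 32 bins, which is the default value of maxBins in the Spark versions we are aware of (an assumption worth checking against the installed release).

```python
# Distinct non-missing values per column in train_df (from the countDistinct output).
distinct_counts = {'nom_5': 1220, 'nom_6': 1519, 'nom_7': 222, 'nom_8': 222,
                   'nom_9': 2218, 'ord_3': 15, 'ord_4': 26, 'ord_5': 190,
                   'day': 7, 'month': 12}

EXTRA_BUCKET = 1        # extra category for missing or unseen values
ASSUMED_DEFAULT = 32    # assumed default of maxBins

required_bins = max(distinct_counts.values()) + EXTRA_BUCKET
too_large = sorted(name for name, count in distinct_counts.items()
                   if count + EXTRA_BUCKET > ASSUMED_DEFAULT)

print(required_bins)
print(too_large)
```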

In [30]:
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [31]:
%%time

# initialize the GBT classifier
gbtcl = GBTClassifier(labelCol="target", featuresCol="features", 
                      predictionCol='prediction', maxDepth=5, maxIter=20, maxBins = 2219,
                      seed = 789)

evaluator = BinaryClassificationEvaluator(labelCol = 'target')

# Create a parameter grid builder
params = ParamGridBuilder()

# Add grid points
params = params.addGrid(gbtcl.stepSize, [0.1, 0.15, 0.2])
# Construct the grid
params = params.build()


# 3-fold cross validation
cv = CrossValidator(estimator=gbtcl, estimatorParamMaps=params, evaluator=evaluator, numFolds=3)

#fit the model on train data
gbtModel = cv.fit(train_df)


Wall time: 28min 1s


We see that the best model is the model with stepSize parameter equal to 0.2.

In [32]:
gbtModel.bestModel.explainParam('stepSize')

'stepSize: Step size (a.k.a. learning rate) in interval (0, 1] for shrinking the contribution of each estimator. (default: 0.1, current: 0.2)'

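The area under the ROC curve obtained as a result of cross-validation is 0.6782 for the first grid point (stepSize = 0.1). Note that avgMetrics holds one averaged score per grid point, in grid order, so the score of the selected model (stepSize = 0.2) is max(gbtModel.avgMetrics), which is at least as high.

In [33]:
# display the CV score of the first grid point (stepSize = 0.1)
auc_roc = gbtModel.avgMetrics[0]
print('The area under ROC curve is:', auc_roc)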

The area under ROC curve is: 0.6782561292772246
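
Because avgMetrics holds one score per grid point, it is convenient to pair each score with the parameter value that produced it. The helper below is a hypothetical plain-Python sketch, and the scores in it are made up for illustration only; they are not results from this notebook.

```python
def best_grid_point(grid_values, avg_metrics):
    """Return (value, score) of the grid point with the highest averaged metric."""
    if len(grid_values) != len(avg_metrics):
        raise ValueError('one metric is expected per grid point')
    best = max(range(len(avg_metrics)), key=avg_metrics.__getitem__)
    return grid_values[best], avg_metrics[best]

step_sizes = [0.1, 0.15, 0.2]
hypothetical_scores = [0.61, 0.63, 0.62]   # made-up numbers, NOT notebook results

print(best_grid_point(step_sizes, hypothetical_scores))
```

In the notebook, the same pairing could presumably be obtained by passing [0.1, 0.15, 0.2] and gbtModel.avgMetrics to such a helper.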


### Random Forest Classifier 

Next we use the Random Forest Classifier for training. We again run a grid search with 3-fold cross-validation, choosing the best model among the models with maxDepth = 5 and numTrees equal to 20, 50 and 80. The same evaluator, the area under the ROC curve, is used.
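
As a reminder of what this metric measures, the area under the ROC curve equals the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The following plain-Python sketch computes it by brute force over all pairs; it is meant for illustration on tiny inputs, not as a replacement for the Spark evaluator.

```python
def roc_auc(labels, scores):
    """AUC as the share of (positive, negative) pairs ranked correctly; ties count one half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError('both classes are needed')
    wins = sum((p > n) + 0.5 * (p == n) for p in positives for n in negatives)
    return wins / (len(positives) * len(negatives))

# Three of the four (positive, negative) pairs are ordered correctly.
print(roc_auc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]))  # 0.75
```

A value of 0.5 corresponds to random ranking, which puts the scores of about 0.63 to 0.68 reported in this notebook into perspective.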

In [34]:
%%time

# initialize the Random Forest Classifier
rfcl = RandomForestClassifier(labelCol="target", featuresCol="features", 
                    predictionCol='prediction', maxDepth=5, subsamplingRate=1.0,
                    maxBins = 2219, seed = 789)

evaluator = BinaryClassificationEvaluator(labelCol = 'target')

# Create a parameter grid builder
params = ParamGridBuilder()

# Add grid points
params = params.addGrid(rfcl.numTrees, [20, 50, 80])
# Construct the grid
params = params.build()


# 3-fold cross validation
cv = CrossValidator(estimator=rfcl, estimatorParamMaps=params, evaluator=evaluator, numFolds=3)

#fit the model on train data
rfModel = cv.fit(train_df)

Wall time: 17min 49s


The best model is the model with numTrees parameter equal to 80.

In [35]:
rfModel.bestModel.explainParam('numTrees')

'numTrees: Number of trees to train (>= 1) (default: 20, current: 80)'

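The area under the ROC curve is only 0.6324 for the first grid point (numTrees = 20). As before, avgMetrics[0] is the score of the first grid point rather than of the selected model, whose score is max(rfModel.avgMetrics).

In [36]:
# display the CV score of the first grid point (numTrees = 20)
auc_roc = rfModel.avgMetrics[0]
print('The area under ROC curve is:', auc_roc)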

The area under ROC curve is: 0.6324591567908788


### Predictions 

Since the GBT classifier gives the better cross-validation score, we will use it to make predictions on the test data set. For the submission, we must extract the probability of the positive class, which is the second element of the probability vector.

In [37]:
predictions = gbtModel.transform(test_df)

# extract the probability of the second class
targets = predictions.select("probability").rdd.map(lambda x: float(x[0][1]))

To prepare the submission, we create numpy arrays out of ids and targets, create the corresponding Pandas dataframe and write it to a csv file.

In [38]:
from numpy import array

#create numpy arrays
ids = array(ids.collect())
targets = array(targets.collect())

print(len(ids), len(targets))

400000 400000


In [39]:
%%time

from pandas import DataFrame


subm = DataFrame({'id': ids, 'target': targets })
                    
subm.to_csv('submission3.csv', index=False)

Wall time: 2.66 s
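
The submission above relies on two separate collect() calls returning rows in the same order. A more defensive variant keeps each id together with its probability until the file is written. The sketch below shows only the writing step in plain Python, with hypothetical pairs and a hypothetical function name; in the notebook it would require carrying the 'id' column through the pipeline alongside the features.

```python
import csv
import io

def write_submission(rows, handle):
    """Write (id, probability) pairs as a two-column CSV with the header id,target."""
    writer = csv.writer(handle, lineterminator='\n')
    writer.writerow(['id', 'target'])
    for row_id, probability in rows:
        writer.writerow([int(row_id), float(probability)])

# Hypothetical pairs standing in for the collected predictions.
buffer = io.StringIO()
write_submission([(600000, 0.25), (600001, 0.5)], buffer)
print(buffer.getvalue())
```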


Finally, we look at the table of relative feature importances obtained as a result of training. We see that the high-cardinality nominal features play the major role in the classification decisions. A feature importance table is useful when looking for actionable insights.

In [40]:

feature_imp = DataFrame( sorted(list(zip(pipelineModel.stages[-1].getInputCols(), 
                gbtModel.bestModel.featureImportances)), key = lambda t: t[1], reverse =True),
                       columns = ['feature', 'weight'])

feature_imp

   feature    weight
0  nom_6_idx  0.2601
1  nom_9_idx  0.258527
2  nom_5_idx  0.228987
3  ord_3_idx  0.045862
4  nom_8_idx  0.027048
5  nom_7_idx  0.026544
6  ord_0_idx  0.025208
7  month_idx  0.025045
8  ord_5_idx  0.023894
9  ord_2_idx  0.017023
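
How dominant the leading features are can be read off by summing the listed weights. The calculation below uses only the ten rows shown above and assumes that the importances of all features are normalized to sum to one, which the listed values are consistent with.

```python
# The ten largest importances, copied from the table above.
importances = {
    'nom_6_idx': 0.2601, 'nom_9_idx': 0.258527, 'nom_5_idx': 0.228987,
    'ord_3_idx': 0.045862, 'nom_8_idx': 0.027048, 'nom_7_idx': 0.026544,
    'ord_0_idx': 0.025208, 'month_idx': 0.025045, 'ord_5_idx': 0.023894,
    'ord_2_idx': 0.017023,
}

ranked = sorted(importances.items(), key=lambda item: item[1], reverse=True)
top3_share = sum(weight for _, weight in ranked[:3])
listed_total = sum(importances.values())

print([name for name, _ in ranked[:3]])
print(round(top3_share, 4), round(listed_total, 4))
```

Under that assumption, the three nominal features nom_6, nom_9 and nom_5 account for about three quarters of the total importance.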


In [41]:
gc.collect()

340

In [42]:
spark.stop()

## Conclusions

The 'submission3.csv' file was scored by Kaggle at 0.63203 for the area under the ROC curve on the test set. This is a modest result, given that a standard implementation of the xgboost classifier reached 0.77 on the same metric.

The purpose of this project was not to achieve the best score by using PySpark on a single machine, but rather to try out methods built into the PySpark ML library, which is better suited to very large datasets trained on a cluster. In our experiments, the Gradient Boosting Tree classifier performed better than Random Forest, the other tree-based method from the pyspark.ml library that we tried, and handling missing values as part of category indexing proved very convenient when fitting and transforming with the whole machine learning pipeline.