# w261 Final Project - Clickthrough Rate Prediction


[Your team number (from the spreadsheet)]   
[Your team names]   
Summer 2019, section [Your section numbers>]   

## Table of Contents

* __Section 1__ - Question Formulation
* __Section 2__ - Algorithm Explanation
* __Section 3__ - EDA & Challenges
* __Section 4__ - Algorithm Implementation
* __Section 5__ - Course Concepts

# __Section 1__ - Question Formulation

# __Section 2__ - Algorithm Explanation

# __Section 3__ - EDA & Challenges

# __Section 4__ - Algorithm Implementation

# __Section 5__ - Course Concepts

### Setup and Initiate Spark

In [31]:
import re
import ast
import time
import itertools
import numpy as np
from numpy import allclose
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql import Row
from pyspark.sql import SQLContext
import pyspark.sql.functions as F
from pyspark.ml.feature import StandardScaler
from pyspark.sql.functions import *
from pyspark.ml.classification import  RandomForestClassifier
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler, VectorSlicer
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import Imputer
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

In [32]:
%reload_ext autoreload
%autoreload 2
%matplotlib inline

In [33]:
# store path to notebook
PWD = !pwd
PWD = PWD[0]

In [34]:
# start Spark Session (RUN THIS CELL AS IS)
from pyspark.sql import SparkSession
app_name = "hw3_notebook"
master = "local[*]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

### Feature Extraction

**Setup Path and read in raw data files**

In [35]:
# Read the toy data file
toy_raw = ast.literal_eval(open("data/toy.txt", "r").read())

# Read the 10K sample data file for feature extraction and data preparation
tenK_raw = ast.literal_eval(open("data/eda.txt", "r").read())

In [36]:
def parse_raw_row(row):
    '''
    for each row in the raw data,  output is a list of label and all the features:
        - [label, feature_1, feature_1, ...]
    For first 13 features, change the data type to number.
    Remaining features will of type string.
    For null values, populate None
    '''
    row_values = row.split('\t')
    for i, value in enumerate(row_values):
        if i <14:
            row_values[i] = float(value) if value != '' else None
        else:
            row_values[i] = value if value != '' else None
    # "''"
    return row_values

In [37]:
# Calculate click through rate frequency count of each category

def BinCategoricalFeatures(tenK_df4):
    '''
    takes a spark df with numerical and categorical columns
    outputs a spark df where all the categorical features are binned using custom logic
    '''
    exclude_list = ['_20', '_31', '_37']

    tenK_click_df = tenK_df4
    for n,i in enumerate(tenK_df4.dtypes):

        if i[1]=='string':

            feature = i[0]

            # frequency count of unique categories under each feature
            cat_freqDF = tenK_df4.groupBy(feature).count()

            # click through frequency count: count of 'label = 1' for each category
            click_freqDF = tenK_df4.where("_1 == 1").groupBy(feature, "_1").count()


            ## Calculate click through frequency ratio for each category:
            ##(count of 'label = 1'/total count)

            df1 = click_freqDF.alias('df1')
            df2 = cat_freqDF.alias('df2')
            if n == 0:
                df3 = tenK_df4.alias('df3')
            else:
                df3 = tenK_click_df.alias('df3')

            tenK_click_df = df1.join(df2, [feature]).join(df3, [feature]).select(feature, 'df3.*',
                                    (df1['count']/df2['count']).alias(feature+"_click"))

            ## End of click through frequency ratio calculation
            
            ###### Bin data into binary bins based on the click through rate(ctr).
            
            if i[0] not in exclude_list:

                # if ctr == 0, value = A
                # else value = B
                # Keep null values as it is
                tenK_click_df = tenK_click_df.withColumn(feature,
                F.when(tenK_click_df[feature+'_click'] == 0, F.lit("A"))
                .otherwise(F.lit("B")))


            elif i[0] in ['_20', '_31']:
                
                max_ctr = tenK_click_df.agg({feature+"_click": "max"}).collect()[0][0]
                ctr_threshold = max_ctr/2

                # if ctr == 0, value = A
                # if ctr > 0 and <= threshhold, value = B
                # else value = C
                # Keep null values as it is
                tenK_click_df = tenK_click_df.withColumn(feature,
                F.when(tenK_click_df[feature+'_click'] == 0, F.lit("A"))
                .otherwise(
                    F.when((tenK_click_df[feature+'_click'] > ctr_threshold)|(tenK_click_df[feature+'_click'] > ctr_threshold)
                       , F.lit("B"))
                    .otherwise(F.lit("C"))))

            elif i[0] == '_37':

                max_ctr = tenK_click_df.agg({feature+"_click": "max"}).collect()[0][0]
                ctr_threshold1 = max_ctr/3
                ctr_threshold2 = 2*ctr_threshold1
                
                # if ctr == 0, value = A
                # if ctr > 0 and <= threshhold1, value = B
                # if ctr > threshhold1 and <= threshhold2, value = C
                # else value = D
                # Keep null values as it is
                
                tenK_click_df = tenK_click_df.withColumn(feature,
                F.when(tenK_click_df[feature+'_click'] == 0, F.lit("A"))
                .otherwise(
                    F.when(((tenK_click_df[feature+'_click'] > 0) 
                            & ((tenK_click_df[feature+'_click'] < ctr_threshold1) | (tenK_click_df[feature+'_click'] == ctr_threshold1)))
                           , F.lit("B"))
                    .otherwise(
                        F.when(((tenK_click_df[feature+'_click'] > ctr_threshold1) 
                            & ((tenK_click_df[feature+'_click'] < ctr_threshold2) | (tenK_click_df[feature+'_click'] == ctr_threshold2)))
                           , F.lit("C"))
                        .otherwise(F.lit("D")))))

    tenK_df5 = tenK_click_df.drop('_15_click','_16_click','_19_click','_22_click','_25_click','_27_click',
                                 '_28_click','_29_click', '_31_click', '_32_click', '_37_click', '_38_click'
                                 ,'_20_click','_23_click','_31_click', '_37_click')

    tenK_df5.cache()
    return tenK_df5

In [38]:
# FeatureScore calculation using RandomForest Ensembling

def CalFeatureScore(tenK_df5):
    '''
    Takes input as a Spark DataFrame.
    Fit and transfor using Assembler Pipeline 
    Run RandomForestClassifier to output top performing 30 features
    '''
    
    def ExtractFeatureImp(featureImp, dataset, featuresCol):
        '''
        Function to display featureImportances in human readable format
        '''
        list_extract = []
        for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
            list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
        varlist = pd.DataFrame(list_extract)
        varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
        return(varlist.sort_values('score', ascending = False))


    encoding_var = [i[0] for i in tenK_df5.dtypes if (i[1]=='string')]
    num_var = [i[0] for i in tenK_df5.dtypes if (i[1]!='string') & (i[0]!= '_1')]

    string_indexes = [StringIndexer(inputCol = c, outputCol = 'IDX_' + c, handleInvalid = 'keep')
                      for c in encoding_var]
    onehot_indexes = [OneHotEncoderEstimator(inputCols = ['IDX_' + c], outputCols = ['OHE_' + c])
                      for c in encoding_var]
    label_indexes = StringIndexer(inputCol = '_1', outputCol = 'label', handleInvalid = 'keep')
    assembler = VectorAssembler(inputCols = num_var + ['OHE_' + c for c in encoding_var]
                                , outputCol = "features")
    rf = RandomForestClassifier(labelCol="label", featuresCol="features", seed = 8464,
                                 numTrees=10, cacheNodeIds = True, subsamplingRate = 0.7)

    pipe = Pipeline(stages = string_indexes + onehot_indexes + [assembler, label_indexes, rf])
    
    ## fit into pipe

    mod = pipe.fit(tenK_df5)
    tenK_df6 = mod.transform(tenK_df5)
    
    varlist = ExtractFeatureImp(mod.stages[-1].featureImportances, tenK_df6, "features")
    top_features = [x for x in varlist['name'][0:30]]
    
    return top_features

In [39]:
#Create data frame with one-hot encoding for categorical variables

def one_hot_encode(tenK_df5, top_features):
    '''
    Create data frame with one-hot encoding for categorical variables
    Take input as Spark Data Frame
    Output Spark DataFrame with hot-encoding
    '''
    
    one_hot = tenK_df5.toPandas()
    encoding_var = [i[0] for i in tenK_df5.dtypes if (i[1]=='string')]
    for col in encoding_var:
        one_hot_pd = pd.concat([one_hot,pd.get_dummies(one_hot[col], prefix='OHE_'+col,dummy_na=False)],axis=1).drop([col],axis=1)
        one_hot = one_hot_pd

    one_hot_df = spark.createDataFrame(one_hot_pd)

    ###Keep the columns recommended by RandomForestClassifier

    curr_col = one_hot_df.columns
    col_to_drop = [x for x in curr_col if x not in top_features and x != '_1']

    tenK_df7 = one_hot_df
    for col in col_to_drop:
        tenK_df7 = tenK_df7.drop(col)
        
    return tenK_df7

In [40]:
# use average imputer for null values

def imputeNumeric(numeric_DF):
    '''
    takes a spark df with continuous numeric columns
    outputs a spark df where all null values are replaced with the column average
    
    the first column, which is the outcome values, are preserved
    '''
    outputColumns=["{}".format(c) for c in numeric_DF.columns[1:11]]
    catColumns = ["{}".format(c) for c in numeric_DF.columns[11:]]
    
    imputer = Imputer(
        inputCols=numeric_DF.columns[1:11], 
        outputCols=["{}".format(c) for c in numeric_DF.columns[1:11]]
    )

    model = imputer.fit(numeric_DF)

    imputedDF = model.transform(numeric_DF).select(['_1']+outputColumns+catColumns)

    return imputedDF

In [41]:
def scaleFeatures(inputedDF):
    '''
    inputs imputed data frame with no null values and continuous features
    transforms the data frame into 2 column data frame with first column as label and second column as dense vector of features
    scales all features using the StandardScalar
    returns 2 column dataframe with scaled features
    '''
    
    transformedImputedDF = inputedDF.rdd.map(lambda x: (x[0], Vectors.dense(x[1:11]))).toDF(['label', 'x'])
    
    
    scaler = StandardScaler(inputCol="x", 
                        outputCol="features",
                        withStd=True, withMean=True)

    scalerModel = scaler.fit(transformedImputedDF)
    scaledDF = scalerModel.transform(transformedImputedDF).select(['label', 'features'])
    
    return scaledDF

### Parse Row Into Readable formats

In [42]:
# parse raw toy data to form toyRDD
toyRDD = sc.parallelize(toy_raw).map(parse_raw_row).cache()

# parse raw 10k sample data to form tenKRDD
tenKRDD = sc.parallelize(tenK_raw).map(parse_raw_row).cache()

## Create Dataframe for Feature Engineering ##

In [43]:
#### Create SQL dataframe from RDD

# for toy data
toyFeature_df = sqlContext.createDataFrame(toyRDD)

# for 10K sample data
tenKfeature_df = sqlContext.createDataFrame(tenKRDD)

## Feature Extraction

**1: Remove features with very large number of unknown data**  

From EDA we see that the below feature columns have more than 40% null values.  
Due to this high percenage of unknown data we won't keep these features in our model. So dropping these columns from our data frame.

In [44]:
# drop features with high unknown values

toy_df1 = toyFeature_df.drop('_13','_36','_2','_11','_33','_34','_39','_40')
tenK_df1 = tenKfeature_df.drop('_13','_36','_2','_11','_33','_34','_39','_40')

tenK_df1.show(1)

+---+---+---+----+---+----+---+----+----+---+----+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+
| _1| _3| _4|  _5| _6|  _7| _8|  _9| _10|_12| _14|     _15|     _16|     _17|     _18|     _19|     _20|     _21|     _22|     _23|     _24|     _25|     _26|     _27|     _28|     _29|     _30|     _31|     _32|     _35|     _37|     _38|
+---+---+---+----+---+----+---+----+----+---+----+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+
|0.0|6.0|3.0|34.0|4.0|27.0|3.0|33.0|34.0|1.0|15.0|05db9164|028bd518|77f2f2e5|d16679b9|25c83c98|fbad5c96|2ecb612f|5b392875|a73ee510|3b08e48b|4efc1873|9f32b866|f15f3681|b28479f6|9559bea6|31ca40b6|07c540c4|2a40f0da|dfcfc3fa|32c7478e|aee52b6f|
+---+---+---+----+---+----+---+----+----

**2. Remove Categorical features with high % of Uniqueness of Categories**  

From EDA, we see that for the following categorical features uniqueness is more than 50%. When uniqueness of a feature is more than 50% it should not be having much impact on label prediction. So will remove those columns from our model.  
_17, _18, _21, _24, _26, _30, _35

In [45]:
toy_df2 = toy_df1.drop('_17','_18','_21','_24','_26','_30','_35')
tenK_df2 = tenK_df1.drop('_17','_18','_21','_24','_26','_30','_35')
tenK_df2.show(5)

+---+-----+----+----+------+-----+----+----+-----+---+----+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+
| _1|   _3|  _4|  _5|    _6|   _7|  _8|  _9|  _10|_12| _14|     _15|     _16|     _19|     _20|     _22|     _23|     _25|     _27|     _28|     _29|     _31|     _32|     _37|     _38|
+---+-----+----+----+------+-----+----+----+-----+---+----+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+
|0.0|  6.0| 3.0|34.0|   4.0| 27.0| 3.0|33.0| 34.0|1.0|15.0|05db9164|028bd518|25c83c98|fbad5c96|5b392875|a73ee510|4efc1873|f15f3681|b28479f6|9559bea6|07c540c4|2a40f0da|32c7478e|aee52b6f|
|0.0|521.0| 1.0| 2.0| 512.0| 21.0| 3.0|18.0| 44.0|1.0| 2.0|5bfa8ab5|38a947a1|25c83c98|    null|0b153874|a73ee510|b91c2548|a03da696|b28479f6|65afeec4|e5ba7672|b133fcd4|bcdee96c|8d365d3b|
|1.0|  0.0| 1.0| 1.0|4982.0| null| 0.0| 1.0| 10.0|0.0| 1.0|5a9ed9b0|0b

**3. Replace null values in numerical variables with mean**

In [46]:
##Replace null with mean for numerical features

tenK_df4 = imputeNumeric(tenK_df2)
tenK_df4.cache()
tenK_df4.show(1,False)

+---+---+---+----+---+----+---+----+----+---+----+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+
|_1 |_3 |_4 |_5  |_6 |_7  |_8 |_9  |_10 |_12|_14 |_15     |_16     |_19     |_20     |_22     |_23     |_25     |_27     |_28     |_29     |_31     |_32     |_37     |_38     |
+---+---+---+----+---+----+---+----+----+---+----+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+
|0.0|6.0|3.0|34.0|4.0|27.0|3.0|33.0|34.0|1.0|15.0|05db9164|028bd518|25c83c98|fbad5c96|5b392875|a73ee510|4efc1873|f15f3681|b28479f6|9559bea6|07c540c4|2a40f0da|32c7478e|aee52b6f|
+---+---+---+----+---+----+---+----+----+---+----+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+
only showing top 1 row



**4: Binning of Categories**  

From EDA we reviewed the pattern of the distribution of each categorical feature. Depending on the type of distribution have binned the categories

> Check frequency of unique categories of Categorical Features

In [47]:
#### Customize binning for categorical features

tenK_df5 = BinCategoricalFeatures(tenK_df4)
tenK_df5.show(20,False)

+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+-----+-----------------+-----------------+--------+------------------+----+----+------+----+-----------------+
|_38|_37|_32|_31|_29|_28|_27|_25|_23|_22|_20|_19|_16|_15|_1 |_3   |_4               |_5               |_6      |_7                |_8  |_9  |_10   |_12 |_14              |
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+-----+-----------------+-----------------+--------+------------------+----+----+------+----+-----------------+
|B  |D  |B  |B  |B  |B  |B  |B  |B  |B  |C  |B  |B  |B  |1.0|0.0  |2.0              |2.0              |13140.0 |638.0             |7.0 |20.0|600.0 |4.0 |8.0              |
|B  |D  |B  |B  |B  |B  |B  |B  |B  |B  |C  |B  |B  |B  |1.0|3.0  |1.0              |1.0              |6396.0  |11.0              |15.0|1.0 |147.0 |7.0 |1.0              |
|B  |B  |B  |C  |B  |B  |B  |B  |B  |B  |C  |B  |B  |B  |1.0|79.0 |37.0             |7.415154749199573|678.0   |111.90972894482091|0.0 |15.0

**5: Run RandomForest ensemble to check featureImportances**:

With remaining features we will run RandomForest ensemble classifier to check featureImportances matrices.  
We will extract the Features with higher featureImportances scores for our model.  


In [48]:
### Call RandomForest Classifier to retrieve top performing features
top_features = CalFeatureScore(tenK_df5)
print(top_features)

['_7', '_14', '_6', '_8', '_9', '_10', '_12', '_3', 'OHE__31_B', '_5', '_4', 'OHE__31_C', 'OHE__37_B', 'OHE__37_C', 'OHE__37_D', 'OHE__20_B', 'OHE__20_C', 'OHE__16_B', 'OHE__19_B', 'OHE__23_B', 'OHE__22_B', 'OHE__32_B', 'OHE__25_B', 'OHE__27_B', 'OHE__28_B', 'OHE__29_B', 'OHE__38_B', 'OHE__15_B']


**6. One hot-encoding of categorical variables**

In [49]:
### Call one-hot encoding

tenK_df7 = one_hot_encode(tenK_df5, top_features)
tenK_df7.show(5, False)

+---+----+-----------------+-----------------+-------+------------------+----+----+-----+----+-----------------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|_1 |_3  |_4               |_5               |_6     |_7                |_8  |_9  |_10  |_12 |_14              |OHE__38_B|OHE__37_B|OHE__37_C|OHE__37_D|OHE__32_B|OHE__31_B|OHE__31_C|OHE__29_B|OHE__28_B|OHE__27_B|OHE__25_B|OHE__23_B|OHE__22_B|OHE__20_B|OHE__20_C|OHE__19_B|OHE__16_B|OHE__15_B|
+---+----+-----------------+-----------------+-------+------------------+----+----+-----+----+-----------------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|1.0|0.0 |2.0              |2.0              |13140.0|638.0             |7.0 |20.0|600.0|4.0 |8.0              |1        

## Format Data to be used for Model Run

**Split data into numerical and categorical features**

In [50]:
### Build separate RDD for Categorical columns

catDF = tenK_df7.select([c for c in tenK_df7.columns if 'OHE' in c ])
catRDD = catDF.rdd
catRDD.take(5)

[Row(OHE__38_B=1, OHE__37_B=0, OHE__37_C=0, OHE__37_D=1, OHE__32_B=1, OHE__31_B=1, OHE__31_C=0, OHE__29_B=1, OHE__28_B=1, OHE__27_B=1, OHE__25_B=1, OHE__23_B=1, OHE__22_B=1, OHE__20_B=0, OHE__20_C=1, OHE__19_B=1, OHE__16_B=1, OHE__15_B=1),
 Row(OHE__38_B=1, OHE__37_B=0, OHE__37_C=0, OHE__37_D=1, OHE__32_B=1, OHE__31_B=1, OHE__31_C=0, OHE__29_B=1, OHE__28_B=1, OHE__27_B=1, OHE__25_B=1, OHE__23_B=1, OHE__22_B=1, OHE__20_B=0, OHE__20_C=1, OHE__19_B=1, OHE__16_B=1, OHE__15_B=1),
 Row(OHE__38_B=1, OHE__37_B=1, OHE__37_C=0, OHE__37_D=0, OHE__32_B=1, OHE__31_B=0, OHE__31_C=1, OHE__29_B=1, OHE__28_B=1, OHE__27_B=1, OHE__25_B=1, OHE__23_B=1, OHE__22_B=1, OHE__20_B=0, OHE__20_C=1, OHE__19_B=1, OHE__16_B=1, OHE__15_B=1),
 Row(OHE__38_B=1, OHE__37_B=0, OHE__37_C=1, OHE__37_D=0, OHE__32_B=1, OHE__31_B=1, OHE__31_C=0, OHE__29_B=1, OHE__28_B=1, OHE__27_B=1, OHE__25_B=1, OHE__23_B=1, OHE__22_B=1, OHE__20_B=0, OHE__20_C=1, OHE__19_B=1, OHE__16_B=1, OHE__15_B=1),
 Row(OHE__38_B=1, OHE__37_B=0, OHE__37_C

**Standardize numerical data**

In [51]:
### Standardize numerical column and Build separate RDD for Numerical columns

numericDF = scaleFeatures(tenK_df7)
numRDD = numericDF.rdd
numRDD.take(5)

[Row(label=1.0, features=DenseVector([-0.2714, -0.1977, -0.7073, -0.0452, 2.2576, -0.2357, 0.4622, 2.3658, 0.1294, 0.0155])),
 Row(label=1.0, features=DenseVector([-0.2648, -0.2069, -0.8412, -0.1495, -0.3809, -0.0832, -0.8294, 0.1707, 0.6753, -0.7331])),
 Row(label=1.0, features=DenseVector([-0.0972, 0.1248, 0.0176, -0.238, 0.0437, -0.369, 0.1223, -0.4689, -0.5983, 0.06])),
 Row(label=1.0, features=DenseVector([-0.2052, 0.0902, 0.0176, -0.1909, -0.4104, 1.4982, -0.8974, -0.1248, 1.2211, 0.06])),
 Row(label=0.0, features=DenseVector([-0.2515, -0.1977, 0.3636, -0.2291, -0.3473, -0.3309, 0.0543, -0.464, -0.4164, 0.5502]))]

In [52]:
### Combine both the RDD-s to build full data RDD

FullDataRDD = numRDD.zip(catRDD)

FullDataRDD1 =  FullDataRDD.map(lambda x: (x[0][0], np.array(x[0][1]), np.array(x[1])))\
                           .map(lambda x: (x[0], np.append(x[1], x[2])))

FullDataRDD2 = FullDataRDD1.map(lambda x: (x[0],Vectors.dense(x[1])))


In [53]:
FullDataRDD1.take(5)

[(1.0, array([-0.2713806 , -0.19770746, -0.70734428, -0.04519599,  2.25755412,
         -0.23566383,  0.46222047,  2.36579736,  0.1294362 ,  0.01552995,
          1.        ,  0.        ,  0.        ,  1.        ,  1.        ,
          1.        ,  0.        ,  1.        ,  1.        ,  1.        ,
          1.        ,  1.        ,  1.        ,  0.        ,  1.        ,
          1.        ,  1.        ,  1.        ])),
 (1.0, array([-0.26476714, -0.20692072, -0.84120987, -0.14951418, -0.38094305,
         -0.08323582, -0.82944416,  0.17074711,  0.67525364, -0.73306695,
          1.        ,  0.        ,  0.        ,  1.        ,  1.        ,
          1.        ,  0.        ,  1.        ,  1.        ,  1.        ,
          1.        ,  1.        ,  1.        ,  0.        ,  1.        ,
          1.        ,  1.        ,  1.        ])),
 (1.0, array([-0.09722618,  0.12475676,  0.01755856, -0.2379619 ,  0.04369815,
         -0.36903833,  0.12230873, -0.46887018, -0.5983204 ,  0.05997

In [54]:

FullDataRDD2.take(5)

[(1.0,
  DenseVector([-0.2714, -0.1977, -0.7073, -0.0452, 2.2576, -0.2357, 0.4622, 2.3658, 0.1294, 0.0155, 1.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 1.0, 1.0, 1.0, 1.0])),
 (1.0,
  DenseVector([-0.2648, -0.2069, -0.8412, -0.1495, -0.3809, -0.0832, -0.8294, 0.1707, 0.6753, -0.7331, 1.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 1.0, 1.0, 1.0, 1.0])),
 (1.0,
  DenseVector([-0.0972, 0.1248, 0.0176, -0.238, 0.0437, -0.369, 0.1223, -0.4689, -0.5983, 0.06, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 1.0, 1.0, 1.0, 1.0])),
 (1.0,
  DenseVector([-0.2052, 0.0902, 0.0176, -0.1909, -0.4104, 1.4982, -0.8974, -0.1248, 1.2211, 0.06, 1.0, 0.0, 1.0, 0.0, 1.0, 1.0, 0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 1.0, 1.0, 1.0, 1.0])),
 (0.0,
  DenseVector([-0.2515, -0.1977, 0.3636, -0.2291, -0.3473, -0.3309, 0.0543, -0.464, -0.4164, 0.5502, 1.0, 0.0, 1.0, 0.0, 1.0, 1.0, 0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 1.0, 1.0, 1.0, 1.0]))]

**3. Colenearity Reduction** 

EDA shows strong correlation between below numerical features:  
> _5 and _14  
> _5 and _9  
> _9 and _14  
> _8 and _12  
There is also a moderate negative correlation for feature:  
> _6 and _11  
> _7 and _11.  
To avoid co-leniarity we will remove feature _14, _9, _6, _7 and _8