# Feature Engineering and Selection

In this notebook, we are working through the engineering of all of our features and then creating parquet versions of the file for ease of use in our main notebook. The approach is described in detail in our main notebook

## Setup

In [1]:
# imports
import re
import ast
import time
import shutil
import os
import copy
import numpy as np
import seaborn as sns
import networkx as nx
import matplotlib.pyplot as plt
from IPython.display import display

from pyspark.sql import Window, Row
from pyspark.sql.functions import col, desc, mean, isnan, when, count, isnull, rank, sum, countDistinct, avg, stddev, round, lit, rand, broadcast, udf, log, monotonically_increasing_id
from pyspark.sql.types import LongType, IntegerType, StringType, DoubleType, ArrayType, FloatType
from pyspark.sql.window import Window

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, OneHotEncoderEstimator, VectorAssembler, MinMaxScaler, Imputer
from pyspark.ml.stat import Correlation
from pyspark.mllib.stat import Statistics

from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import warnings
warnings.filterwarnings('ignore')

In [2]:
%reload_ext autoreload
%autoreload 2

In [3]:
# store path to notebook
PWD = !pwd
PWD = PWD[0]

In [4]:
# start Spark Session
from pyspark.sql import SparkSession
app_name = "final_project"
master = "local[*]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .getOrCreate()
sc = spark.sparkContext
spark

REMINDER: If you are running this notebook on the course docker container, you can monitor the progress of your jobs using the Spark UI at: http://localhost:4040/jobs/

In [5]:
sc = spark.sparkContext

In [6]:
# read parquet file
# will focus on 10,000 for most of our EDA
start = time.time()
train_1_million_df = spark.read.parquet("data/train_1_million.parquet")
train_100000_df    = spark.read.parquet("data/train_100000.parquet")
train_10000_df     = spark.read.parquet("data/train_10000.parquet")
train_1000_df      = spark.read.parquet("data/train_1000.parquet")
print(f'... completed job in {time.time() - start} seconds.')

... completed job in 3.08327054977417 seconds.


In [7]:
# Splitting train and test dataframe
splits = train_1_million_df.randomSplit([0.8, 0.2], seed = 2019)
df = splits[0]
test_df = splits[1]

<br>

## Functions

In [8]:
def cat_recoding(column):
    if column > 0.6:
        return 'H6'
    if column > 0.1:
        return 'H1'
    return 'L'

func_udf = udf(cat_recoding, StringType())

In [9]:
def null_as_Missing(x):
    return when(col(x).isNotNull(), col(x)).otherwise('L')

In [10]:
def null_as_Missing_2(x):
    return when(col(x).isNotNull(), col(x)).otherwise('M')

In [11]:
def parquet_files(directory_name, df, col_1, col_2):
    start = time.time()
    
    # delete parquet directory if exist
    if os.path.exists(directory_name):
        print('\ndeleting', directory_name)
        shutil.rmtree(directory_name)

    # First, we are creating few partition buckets
    df.write.partitionBy(col_1, col_2).parquet(directory_name)
    
    print(f'... completed {directory_name} parquet file job in {time.time() - start} seconds.')

<br>

## Numeric Transformations

### Train

In [12]:
df_dep = df.select(df.columns[0])

In [13]:
# making all columns floats and taking the log transformations
df_num = df.select(*(log(col(i) + 1).cast("float").alias(i) for i in df.columns[1:14]))

In [14]:
# imputing the mean for the null values
imputer = Imputer(strategy = "mean", inputCols = df_num.columns, 
                  outputCols = [i for i in df_num.columns])
df_num_mean_fit = imputer.fit(df_num)
df_num_mean = df_num_mean_fit.transform(df_num)

In [15]:
# creating a MinMax scaler
assembler_mean = VectorAssembler(inputCols = df_num_mean.columns, outputCol = "features")
transformed_mean = assembler_mean.transform(df_num_mean)
scaler_mean = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel_mean = scaler_mean.fit(transformed_mean.select("features"))
scaledData_mean = scalerModel_mean.transform(transformed_mean).drop("features")
df_num_mean = scaledData_mean.select(scaledData_mean.columns[13:])

In [16]:
df_num_zero = df_num.fillna(0)

In [17]:
# creating a MinMax scaler
assembler_zero = VectorAssembler(inputCols = df_num_zero.columns, outputCol = "features")
transformed_zero = assembler_zero.transform(df_num_zero)
scaler_zero = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel_zero = scaler_zero.fit(transformed_zero.select("features"))
scaledData_zero = scalerModel_zero.transform(transformed_zero).drop("features")
df_num_zero = scaledData_zero.select(scaledData_zero.columns[13:])

### Test

In [18]:
test_df_dep = test_df.select(test_df.columns[0])

In [19]:
# making all columns floats
test_df_num = test_df.select(*(log(col(i)).cast("float").alias(i) for i in test_df.columns[1:14]))

In [20]:
# imputing the train numeric mean for the null test values
test_df_num_mean = df_num_mean_fit.transform(test_df_num)

In [21]:
# transforming the min max scaler for the test set
assembler_mean_test = VectorAssembler(inputCols = test_df_num_mean.columns, outputCol = "features")
transformed_mean_test = assembler_mean_test.transform(test_df_num_mean)

scaledData_mean_test = scalerModel_mean.transform(transformed_mean_test).drop("features")
test_df_num_mean = scaledData_mean_test.select(scaledData_mean_test.columns[13:])

In [22]:
test_df_num_zero = test_df_num.fillna(0)

In [23]:
# transforming the min max scaler for the test set
assembler_zero_test = VectorAssembler(inputCols = test_df_num_zero.columns, outputCol = "features")
transformed_zero_test = assembler_zero_test.transform(test_df_num_zero)

scaledData_zero_test = scalerModel_zero.transform(transformed_zero_test).drop("features")
test_df_num_zero = scaledData_zero_test.select(scaledData_zero_test.columns[13:])

<br>

## Categorical Transformations

### Recode Categories as Average of Dependent Variable for Train

In [24]:
df_cat = df

In [25]:
start = time.time()

frames = {}
for i in df_cat.columns[14:]:
    recode_df = df_cat.groupby(i, '_c0').count().sort(desc('count'))
    recode_df = recode_df.groupBy(i).agg((sum(recode_df['_c0'] * recode_df['count'])/
                                          sum(recode_df['count'])).alias(i + "_wv")).sort(i)
    frames['frame{}'.format(i)] = recode_df
    
print(f'... completed job in {time.time() - start} seconds.')

... completed job in 0.8594028949737549 seconds.


In [26]:
# Showing one example
start = time.time()
frames['frame{}'.format('_c14')].show()
print(f'... completed job in {time.time() - start} seconds.')

+--------+-------+
|    _c14|_c14_wv|
+--------+-------+
|000d72dd|   0.25|
|003f4253|    1.0|
|00c2e152|    0.0|
|010e5266|    0.0|
|0121ecd4|    0.0|
|0158c7d2|    0.0|
|024d3a13|    0.0|
|0284361c|    0.5|
|02f0e0c9|    0.0|
|02f970ca|    0.8|
|037aa84d|    0.0|
|0434607f|    0.4|
|04eb6da9|    1.0|
|04f97c29|    0.0|
|05048b8c|    0.0|
|054f9f17|    0.0|
|0550a183|    0.5|
|058a7626|    1.0|
|05930803|    0.0|
|05c26cc9|    0.5|
+--------+-------+
only showing top 20 rows

... completed job in 6.966044902801514 seconds.


In [27]:
start = time.time()

# The columns will get longer and longer without this specification
recode_cols = df_cat.columns[14:]

for i in recode_cols:
    df_cat = df_cat.join(frames['frame{}'.format(i)], on = i, how = "left")

# only keeping the weighted value categories
df_cat = df_cat.select(df_cat.columns[40:])

print(f'... completed job in {time.time() - start} seconds.')

... completed job in 0.7008023262023926 seconds.


In [28]:
start = time.time()

# imputing the mean for the null values
imputer = Imputer(strategy = "mean", inputCols = df_cat.columns, 
                  outputCols = [i for i in df_cat.columns])

df_cat_wv_fit = imputer.fit(df_cat)
df_cat_wv = df_cat_wv_fit.transform(df_cat)

print(f'... completed job in {time.time() - start} seconds.')

... completed job in 190.8936688899994 seconds.


### Recode Categories as Average of Dependent Variable for Test

In [29]:
test_df_cat = test_df

In [30]:
start = time.time()

# including the weighted value training data transformations on our test data
for i in recode_cols:
    test_df_cat = test_df_cat.join(frames['frame{}'.format(i)], on = i, how = "left")
    
# only keeping the weighted value categories
test_df_cat = test_df_cat.select(test_df_cat.columns[40:])

print(f'... completed job in {time.time() - start} seconds.')

... completed job in 0.6154842376708984 seconds.


In [31]:
# imputing the train categorical mean for the null test values
test_df_cat_wv = df_cat_wv_fit.transform(test_df_cat)

<br>

### Recode Categories as Relative Categories to Dependent Variable for Train

In [32]:
df_cat = df

In [33]:
start = time.time()

frames = {}
for i in df_cat.columns[14:]:
    recode_df = df_cat.groupby(i, '_c0').count().sort(desc('count'))
    recode_df = recode_df.groupBy(i).agg((sum(recode_df['_c0'] * recode_df['count'])/
                                          sum(recode_df['count'])).alias(i+"_p")).sort(desc(i+"_p"))
    recode_df = recode_df.withColumn(i+'_wgt', when(col(i+"_p")> 0.2, 'H').otherwise('L'))
    frames['frame{}'.format(i)] = recode_df.select([i,i+'_wgt'])
    
print(f'... completed job in {time.time() - start} seconds.')

# taking the average
# this replaces the first and second recode_df
# recode_df = df_cat.groupBy(i).agg((mean(df_cat['_c0'])).alias(i+"_p")).sort(desc(i+"_p"))

... completed job in 1.2210311889648438 seconds.


In [34]:
# Showing one example
start = time.time()
frames['frame{}'.format('_c30')].show()
print(f'... completed job in {time.time() - start} seconds.')

+--------+--------+
|    _c30|_c30_wgt|
+--------+--------+
|8efede7f|       H|
|e5ba7672|       H|
|27c07bd6|       H|
|3486227d|       H|
|07c540c4|       H|
|d4bb7bd8|       L|
|1e88c74f|       L|
|776ce399|       L|
|2005abd1|       L|
|af5d780c|       L|
+--------+--------+

... completed job in 5.990954875946045 seconds.


In [35]:
start = time.time()

# The columns will get longer and longer without this specification
recode_cols = df_cat.columns[14:]

for i in recode_cols:
    df_cat = df_cat.join(frames['frame{}'.format(i)], on = i, how = "left")
    
# only keeping the transformed categories
df_cat_wgt_HL = df_cat.select(df_cat.columns[40:])

print(f'... completed job in {time.time() - start} seconds.')

... completed job in 0.5451486110687256 seconds.


In [36]:
# change null to 'L' category
for i in df_cat_wgt_HL.columns:
    df_cat_wgt_HL = df_cat_wgt_HL.withColumn(i, null_as_Missing(i))

In [37]:
# see the recoded variables
start = time.time()
print("Counts for Each Category:")
for i in df_cat_wgt_HL.columns[:1]: 
    df_cat_wgt_HL.groupby(i).count().sort(desc('count')).show()
print(f'... completed job in {time.time() - start} seconds.')

Counts for Each Category:
+--------+------+
|_c14_wgt| count|
+--------+------+
|       H|607941|
|       L|  2426|
+--------+------+

... completed job in 157.56113076210022 seconds.


<br>

### Recode Categories as Relative Categories to Dependent Variable for Test

In [38]:
test_df_cat = test_df

In [39]:
start = time.time()

# The columns will get longer and longer without this specification
recode_cols = test_df_cat.columns[14:]

for i in recode_cols:
    test_df_cat = test_df_cat.join(frames['frame{}'.format(i)], on = i, how = "left")
    
# only keeping the transformed categories
test_df_cat_wgt_HL = test_df_cat.select(test_df_cat.columns[40:])

print(f'... completed job in {time.time() - start} seconds.')

... completed job in 0.5343544483184814 seconds.


In [40]:
# change null to 'L' category (same as the train set)
for i in test_df_cat_wgt_HL.columns:
    test_df_cat_wgt_HL = test_df_cat_wgt_HL.withColumn(i, null_as_Missing(i))

In [41]:
# see the recoded variables
start = time.time()
print("Counts for Each Category:")
for i in test_df_cat_wgt_HL.columns[:1]: 
    test_df_cat_wgt_HL.groupby(i).count().sort(desc('count')).show()
print(f'... completed job in {time.time() - start} seconds.')

Counts for Each Category:
+--------+------+
|_c14_wgt| count|
+--------+------+
|       H|152429|
|       L|   644|
+--------+------+

... completed job in 163.7044665813446 seconds.


<br>

### Recode Categories as Relative Categories to Dependent Variable for Train with M

In [42]:
df_cat = df

In [43]:
start = time.time()

frames = {}
for i in df_cat.columns[14:]:
    recode_df = df_cat.groupby(i, '_c0').count().sort(desc('count'))
    recode_df = recode_df.groupBy(i).agg((sum(recode_df['_c0'] * recode_df['count'])/
                                          sum(recode_df['count'])).alias(i+"_p")).sort(desc(i+"_p"))
    recode_df = recode_df.withColumn(i+'_wgt', func_udf(col(i+"_p")))
    frames['frame{}'.format(i)] = recode_df.select([i,i+'_wgt'])
    
print(f'... completed job in {time.time() - start} seconds.')

# taking the average
# this replaces the first and second recode_df
# recode_df = df_cat.groupBy(i).agg((mean(df_cat['_c0'])).alias(i+"_p")).sort(desc(i+"_p"))

... completed job in 1.1528358459472656 seconds.


In [44]:
# Showing one example
start = time.time()
frames['frame{}'.format('_c30')].show()
print(f'... completed job in {time.time() - start} seconds.')

+--------+--------+
|    _c30|_c30_wgt|
+--------+--------+
|8efede7f|      H1|
|e5ba7672|      H1|
|27c07bd6|      H1|
|3486227d|      H1|
|07c540c4|      H1|
|d4bb7bd8|      H1|
|1e88c74f|      H1|
|776ce399|      H1|
|2005abd1|      H1|
|af5d780c|       L|
+--------+--------+

... completed job in 5.765534400939941 seconds.


In [45]:
start = time.time()

# The columns will get longer and longer without this specification
recode_cols = df_cat.columns[14:]

for i in recode_cols:
    df_cat = df_cat.join(frames['frame{}'.format(i)], on = i, how = "left")
print(f'... completed job in {time.time() - start} seconds.')

... completed job in 0.5545010566711426 seconds.


In [46]:
# only keeping the transformed categories
df_cat_wgt_HLM = df_cat.select(df_cat.columns[40:])

In [47]:
# change null to 'M' category
for i in df_cat_wgt_HLM.columns:
    df_cat_wgt_HLM = df_cat_wgt_HLM.withColumn(i, null_as_Missing_2(i))

In [48]:
# see the recoded variables
start = time.time()
print("Counts for Each Category:")
for i in df_cat_wgt_HLM.columns[:1]: 
    df_cat_wgt_HLM.groupby(i).count().sort(desc('count')).show()
print(f'... completed job in {time.time() - start} seconds.')

Counts for Each Category:
+--------+------+
|_c14_wgt| count|
+--------+------+
|      H1|609243|
|       L|   925|
|      H6|   199|
+--------+------+

... completed job in 161.1223657131195 seconds.


<br>

### Recode Categories as Relative Categories to Dependent Variable for Test with M

In [49]:
test_df_cat = test_df

In [50]:
start = time.time()

# The columns will get longer and longer without this specification
recode_cols = test_df_cat.columns[14:]

for i in recode_cols:
    test_df_cat = test_df_cat.join(frames['frame{}'.format(i)], on = i, how = "left")
print(f'... completed job in {time.time() - start} seconds.')

... completed job in 0.49120020866394043 seconds.


In [51]:
# only keeping the transformed categories
test_df_cat_wgt_HLM = test_df_cat.select(df_cat.columns[40:])

In [52]:
# change null to 'L' category (same as the train set)
for i in test_df_cat_wgt_HLM.columns:
    test_df_cat_wgt_HLM = test_df_cat_wgt_HLM.withColumn(i, null_as_Missing_2(i))

In [53]:
# see the recoded variables
start = time.time()
print("Counts for Each Category:")
for i in test_df_cat_wgt_HLM.columns[:1]: 
    test_df_cat_wgt_HLM.groupby(i).count().sort(desc('count')).show()
print(f'... completed job in {time.time() - start} seconds.')

Counts for Each Category:
+--------+------+
|_c14_wgt| count|
+--------+------+
|      H1|152717|
|       L|   226|
|       M|    85|
|      H6|    45|
+--------+------+

... completed job in 158.9292495250702 seconds.


<br>

# Combining Files

## Add indices to each dataframe for joining

### Train

In [54]:
df_dep = df_dep.withColumn("index", monotonically_increasing_id())
df_num_zero = df_num_zero.withColumn("index", monotonically_increasing_id())
df_num_mean = df_num_mean.withColumn("index", monotonically_increasing_id())
df_cat_wv = df_cat_wv.withColumn("index", monotonically_increasing_id())
df_cat_wgt_HL = df_cat_wgt_HL.withColumn("index", monotonically_increasing_id())
df_cat_wgt_HLM = df_cat_wgt_HLM.withColumn("index", monotonically_increasing_id())

In [55]:
start = time.time()

df_zero_wv = df_num_zero.join(df_cat_wv, on = "index", how = "left").join(df_dep, on = "index", how = "left")
df_mean_wv = df_num_mean.join(df_cat_wv, on = "index", how = "left").join(df_dep, on = "index", how = "left")
df_zero_wgt_HL = df_num_zero.join(df_cat_wgt_HL, on = "index", how = "left").join(df_dep, on = "index", how = "left")
df_mean_wgt_HL = df_num_mean.join(df_cat_wgt_HL, on = "index", how = "left").join(df_dep, on = "index", how = "left")
df_zero_wgt_HLM = df_num_zero.join(df_cat_wgt_HLM, on = "index", how = "left").join(df_dep, on = "index", how = "left")
df_mean_wgt_HLM = df_num_mean.join(df_cat_wgt_HLM, on = "index", how = "left").join(df_dep, on = "index", how = "left")

print(f'... completed job in {time.time() - start} seconds.')

... completed job in 0.16069340705871582 seconds.


### Test

In [56]:
test_df_dep = test_df_dep.withColumn("index", monotonically_increasing_id())
test_df_num_zero = test_df_num_zero.withColumn("index", monotonically_increasing_id())
test_df_num_mean = test_df_num_mean.withColumn("index", monotonically_increasing_id())
test_df_cat_wv = test_df_cat_wv.withColumn("index", monotonically_increasing_id())
test_df_cat_wgt_HL = test_df_cat_wgt_HL.withColumn("index", monotonically_increasing_id())
test_df_cat_wgt_HLM = test_df_cat_wgt_HLM.withColumn("index", monotonically_increasing_id())

In [57]:
start = time.time()

test_df_zero_wv = test_df_num_zero.join(test_df_cat_wv, on = "index", how = "left").join(test_df_dep, on = "index", how = "left")
test_df_mean_wv = test_df_num_mean.join(test_df_cat_wv, on = "index", how = "left").join(test_df_dep, on = "index", how = "left")
test_df_zero_wgt_HL = test_df_num_zero.join(test_df_cat_wgt_HL, on = "index", how = "left").join(test_df_dep, on = "index", how = "left")
test_df_mean_wgt_HL = test_df_num_mean.join(test_df_cat_wgt_HL, on = "index", how = "left").join(test_df_dep, on = "index", how = "left")
test_df_zero_wgt_HLM = test_df_num_zero.join(test_df_cat_wgt_HLM, on = "index", how = "left").join(test_df_dep, on = "index", how = "left")
test_df_mean_wgt_HLM = test_df_num_mean.join(test_df_cat_wgt_HLM, on = "index", how = "left").join(test_df_dep, on = "index", how = "left")

print(f'... completed job in {time.time() - start} seconds.')

... completed job in 0.12122201919555664 seconds.


<br>

## Parquet all of the feature files

The dependent variable will not be on these files because the dependent variable will always be the same and it required no featuring engineering

In [58]:
# parquet all train files
parquet_files("data/df_zero_wv.parquet", df_zero_wv, "_c0", "_c22_wv")
parquet_files("data/df_mean_wv.parquet", df_mean_wv, "_c0", "_c22_wv")
parquet_files("data/df_zero_wgt_HL.parquet", df_zero_wgt_HL, "_c0", "_c22_wgt")
parquet_files("data/df_mean_wgt_HL.parquet", df_mean_wgt_HL, "_c0", "_c22_wgt")
parquet_files("data/df_zero_wgt_HLM.parquet", df_zero_wgt_HLM, "_c0", "_c22_wgt")
parquet_files("data/df_mean_wgt_HLM.parquet", df_mean_wgt_HLM, "_c0", "_c22_wgt")


deleting data/df_zero_wv.parquet
... completed data/df_zero_wv.parquet parquet file job in 207.8179919719696 seconds.

deleting data/df_mean_wv.parquet
... completed data/df_mean_wv.parquet parquet file job in 204.47486424446106 seconds.

deleting data/df_zero_wgt_HL.parquet
... completed data/df_zero_wgt_HL.parquet parquet file job in 181.21654510498047 seconds.

deleting data/df_mean_wgt_HL.parquet
... completed data/df_mean_wgt_HL.parquet parquet file job in 183.55237793922424 seconds.

deleting data/df_zero_wgt_HLM.parquet
... completed data/df_zero_wgt_HLM.parquet parquet file job in 202.77417302131653 seconds.

deleting data/df_mean_wgt_HLM.parquet
... completed data/df_mean_wgt_HLM.parquet parquet file job in 214.07915925979614 seconds.


In [59]:
# parquet all test files
parquet_files("data/test_df_zero_wv.parquet", test_df_zero_wv, "_c0", "_c22_wv")
parquet_files("data/test_df_mean_wv.parquet", test_df_mean_wv, "_c0", "_c22_wv")
parquet_files("data/test_df_zero_wgt_HL.parquet", test_df_zero_wgt_HL, "_c0", "_c22_wgt")
parquet_files("data/test_df_mean_wgt_HL.parquet", test_df_mean_wgt_HL, "_c0", "_c22_wgt")
parquet_files("data/test_df_zero_wgt_HLM.parquet", test_df_zero_wgt_HLM, "_c0", "_c22_wgt")
parquet_files("data/test_df_mean_wgt_HLM.parquet", test_df_mean_wgt_HLM, "_c0", "_c22_wgt")


deleting data/test_df_zero_wv.parquet
... completed data/test_df_zero_wv.parquet parquet file job in 198.25913763046265 seconds.

deleting data/test_df_mean_wv.parquet
... completed data/test_df_mean_wv.parquet parquet file job in 198.1466989517212 seconds.

deleting data/test_df_zero_wgt_HL.parquet
... completed data/test_df_zero_wgt_HL.parquet parquet file job in 176.74510288238525 seconds.

deleting data/test_df_mean_wgt_HL.parquet
... completed data/test_df_mean_wgt_HL.parquet parquet file job in 176.47251987457275 seconds.

deleting data/test_df_zero_wgt_HLM.parquet
... completed data/test_df_zero_wgt_HLM.parquet parquet file job in 195.98881149291992 seconds.

deleting data/test_df_mean_wgt_HLM.parquet
... completed data/test_df_mean_wgt_HLM.parquet parquet file job in 198.45913743972778 seconds.
