In [1]:
from pyspark.sql import SparkSession

In [2]:
# Create the Spark session used by this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("Variable_Selection")
    .getOrCreate()
)

In [3]:
# Settings for reading the training data. Each value is handed to the Spark
# reader when the file is loaded, so the flags are kept as strings.
file_type = "csv"
file_location = "train.csv"
delimeter = ","               # field separator; name kept as-is because the load step refers to it
first_row_is_header = "True"
infer_schema = "False"        # NOTE(review): with inference off, columns are presumably all read as strings - confirm this is intended

In [4]:
# Load the training file into a Spark DataFrame using the reader settings
# (format, schema inference, header flag, separator, path) defined above.
df = (
    spark.read
    .format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimeter)
    .load(file_location)
)

In [5]:
# Print a preview of the loaded rows to check that the file was parsed as expected.
df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|  22|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|  38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|  26|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|  35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|  35|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [27]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import count, when, isnan, col

In [14]:
#1.  Identify variable types

In [18]:
def variable_type(df):
    """Split the columns of ``df`` into character and numeric variables.

    Parameters
    ----------
    df : DataFrame-like
        Any object whose ``dtypes`` attribute yields
        ``(column_name, dtype_name)`` pairs.

    Returns
    -------
    tuple of (list, list)
        ``(char_vars, num_vars)``: the names of the columns whose dtype is
        exactly ``'string'``, followed by the names of every other column.
        Both lists keep the original column order.
    """
    char_vars = []  # character variable list
    num_vars = []   # numeric variable list
    for name, dtype in df.dtypes:
        # Exact match on purpose: ``dtype in ('string')`` would be a substring
        # test, because ('string') is a plain str and not a one-element tuple.
        if dtype == 'string':
            char_vars.append(name)
        else:
            num_vars.append(name)
    return char_vars, num_vars

In [19]:
# Partition the column names by dtype.
# NOTE(review): the file is read with inferSchema="False", so presumably every
# column is a string at this point and num_vars comes back empty - confirm.
char_vars, num_vars = variable_type(df)

In [20]:
#2. Apply string indexer to character columns

In [22]:
def category_to_index(df,char_vars):
    """Add a ``<column>_index`` column for every column named in ``char_vars``.

    One StringIndexer per column (created with ``handleInvalid="keep"``) is
    chained into a single Pipeline. The pipeline is fitted on the ``char_vars``
    subset of ``df`` and then applied to the full ``df``.

    Returns
    -------
    tuple
        The transformed DataFrame and the fitted pipeline model.
    """
    string_columns = df.select(char_vars)

    stages = []
    for name in string_columns.columns:
        stages.append(
            StringIndexer(inputCol=name, outputCol=name + "_index", handleInvalid="keep")
        )

    fitted_indexers = Pipeline(stages=stages).fit(string_columns)
    indexed_df = fitted_indexers.transform(df)
    return indexed_df, fitted_indexers

In [23]:
# Index every string column; char_labels receives the fitted pipeline model
# that category_to_index returns alongside the transformed frame.
df, char_labels = category_to_index(df,char_vars)

In [24]:
# Drop the original string columns listed in char_vars; every other column
# (including the newly added "<name>_index" columns) is kept.
df = df.select(
    [column for column in df.columns if column not in char_vars]
)

In [28]:
def rename_columns(df, char_vars):
    """Rename each ``<name>_index`` column back to ``<name>``.

    Only columns whose name equals an entry of ``char_vars`` plus the
    ``"_index"`` suffix are renamed; all other columns keep their name.
    Returns the DataFrame selected with the aliased columns, in the same
    column order as ``df``.
    """
    index_to_original = {name + "_index": name for name in char_vars}

    projected = []
    for current in df.columns:
        if current in index_to_original:
            new_name = index_to_original[current]
        else:
            new_name = current
        projected.append(col(current).alias(new_name))

    return df.select(projected)

In [29]:
# Strip the "_index" suffix so the indexed columns take over the original column names.
df = rename_columns(df,char_vars)

In [30]:
#3. Assemble features

In [31]:
# Name of the label column: it is removed from the feature list and passed as
# labelCol to the chi-square selector.
target_variable_name = "Survived"

In [32]:
def assemble_vectors(df,features_list,target_variable_name):
    """Combine the ``features_list`` columns into one ``features`` vector column.

    A VectorAssembler is wrapped in a one-stage Pipeline, fitted on ``df`` and
    applied to it.

    Returns
    -------
    DataFrame
        The target column, the new ``'features'`` column and then the
        individual feature columns, in that order.
    """
    vector_stage = VectorAssembler(inputCols=features_list, outputCol='features')

    # target + assembled vector + the individual feature columns
    output_columns = [target_variable_name, 'features'] + features_list

    # run the assembler through a pipeline and keep only the wanted columns
    fitted_pipeline = Pipeline(stages=[vector_stage]).fit(df)
    return fitted_pipeline.transform(df).select(output_columns)

In [33]:
# Start from every column of the indexed frame; the target variable is
# excluded from this list in the next cell, leaving only the feature columns.
features_list =df.columns

In [34]:
# Exclude the target from the candidate features. Filtering (instead of
# list.remove) keeps this cell safe to re-run: list.remove raises ValueError
# once the target is no longer present in the list.
features_list = [name for name in features_list if name != target_variable_name]

In [35]:
# Replace df with the assembled frame: the target column, the 'features'
# vector column and the individual feature columns.
df = assemble_vectors(df, features_list, target_variable_name)

In [37]:
import pandas as pd

# The 'features' column metadata stores its attributes grouped under one key
# per attribute group. Build one table from ALL groups: assigning inside a
# loop over the groups would keep only the last group and silently drop the
# features described by the others.
attr_groups = df.schema["features"].metadata["ml_attr"]["attrs"]
features_df = (
    pd.concat(
        [pd.DataFrame(group_attrs) for group_attrs in attr_groups.values()],
        ignore_index=True,
    )
    # "idx" is the attribute's index entry in the metadata; ordering by it
    # gives one row per feature in index order regardless of group.
    .sort_values("idx")
    .reset_index(drop=True)
)


In [38]:
# Inspect the first rows of the attribute table built from the 'features' column metadata.
features_df.head()

Unnamed: 0,vals,idx,name
0,"[1, 10, 100, 101, 102, 103, 104, 105, 106, 107...",0,PassengerId
1,"[3, 1, 2, __unknown]",1,Pclass
2,"[""Andersson, Mr. August Edvard (""""Wennerstrom""...",2,Name
3,"[male, female, __unknown]",3,Sex
4,"[24, 22, 18, 19, 28, 30, 21, 25, 36, 29, 26, 2...",4,Age


In [39]:
#4. Chi-square selector

In [44]:
# features_list is rebound to the string columns here.
# NOTE(review): it is not read again in this cell - confirm whether a later
# cell depends on this rebinding.
features_list = char_vars

from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors

# Select 6 entries of the assembled 'features' vector with a chi-square
# selector, using the target column as the label.
selector = ChiSqSelector(numTopFeatures=6, featuresCol="features", outputCol="selectedFeatures",labelCol=target_variable_name)

chi_selector = selector.fit(df)

result=chi_selector.transform(df)

print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())

# Read the selected indices from the fitted model once, rather than once per
# row of features_df inside the lambda below.
selected_indices = list(chi_selector.selectedFeatures)
print("Selected Indices: ", selected_indices)

# Flag each feature: 1 when its "idx" value is among the selected indices, else 0.
features_df['chisq_importance'] = features_df["idx"].apply(lambda idx: 1 if idx in selected_indices else 0)
print(features_df)

ChiSqSelector output with top 6 features selected
Selected Indices:  [1, 3, 5, 8, 9, 10]
                                                 vals  idx         name  \
0   [1, 10, 100, 101, 102, 103, 104, 105, 106, 107...    0  PassengerId   
1                                [3, 1, 2, __unknown]    1       Pclass   
2   ["Andersson, Mr. August Edvard (""Wennerstrom"...    2         Name   
3                           [male, female, __unknown]    3          Sex   
4   [24, 22, 18, 19, 28, 30, 21, 25, 36, 29, 26, 2...    4          Age   
5                    [0, 1, 2, 4, 3, 8, 5, __unknown]    5        SibSp   
6                    [0, 1, 2, 3, 5, 4, 6, __unknown]    6        Parch   
7   [1601, 347082, CA. 2343, 3101295, 347088, CA 2...    7       Ticket   
8   [8.05, 13, 7.8958, 7.75, 26, 10.5, 7.925, 7.77...    8         Fare   
9   [B96 B98, C23 C25 C27, G6, C22 C26, D, E101, F...    9        Cabin   
10                               [S, C, Q, __unknown]   10     Embarked   

    chisq_