In [30]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.ml.feature import OneHotEncoder, OneHotEncoderModel, StringIndexer, StandardScaler, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import rand
from pyspark.mllib.evaluation import MulticlassMetrics

In [4]:
# Spark configuration: application name and a local master ('local[6]')
conf = SparkConf().setAppName('Spark DL Tabular Pipeline').setMaster('local[6]')

# getOrCreate reuses the running context when this cell is executed again in the
# same kernel; constructing SparkContext(conf=conf) a second time raises
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)

# SQL entry point used by the cells below to read the CSV
sql_context = SQLContext(sc)

In [6]:
# Load the bank CSV: the first line supplies the column names and Spark infers
# each column's type from the data
reader_options = {'header': True, 'inferSchema': True}
df = sql_context.read.csv('data/bank.csv', **reader_options)

In [7]:
# Show the column names and the types Spark inferred while reading the CSV
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



In [8]:
# Pull five rows to the driver as a pandas DataFrame so the notebook renders them as a table
df.limit(5).toPandas()

Unnamed: 0,age,job,marital,education,default,balance,housing,loan,contact,day,month,duration,campaign,pdays,previous,poutcome,deposit
0,59,admin.,married,secondary,no,2343,yes,no,unknown,5,may,1042,1,-1,0,unknown,yes
1,56,admin.,married,secondary,no,45,no,no,unknown,5,may,1467,1,-1,0,unknown,yes
2,41,technician,married,secondary,no,1270,yes,no,unknown,5,may,1389,1,-1,0,unknown,yes
3,55,services,married,secondary,no,2476,yes,no,unknown,5,may,579,1,-1,0,unknown,yes
4,54,admin.,married,tertiary,no,184,no,no,unknown,5,may,673,2,-1,0,unknown,yes


In [9]:
# Remove the two calendar columns; neither appears in the feature lists used further down
calendar_columns = ['day', 'month']
df = df.drop(*calendar_columns)

In [10]:
def select_features_to_scale(df=None, lower_skew=-2, upper_skew=2, dtypes='int32', drop_cols=None):
    """Return the numeric columns whose kurtosis lies outside ``[lower_skew, upper_skew]``.

    Note on naming: the bounds are called ``*_skew`` for backward compatibility,
    but the statistic compared against them is the value returned by pandas
    ``Series.kurtosis()``, not the skewness.

    Parameters
    ----------
    df : object exposing ``toPandas()`` (e.g. a PySpark DataFrame), optional
        Data to inspect. When omitted, the module-level variable ``df`` is
        looked up at call time (previously it was frozen at definition time).
    lower_skew, upper_skew : float
        A column is selected when its kurtosis is strictly below ``lower_skew``
        or strictly above ``upper_skew``.
    dtypes : str, dtype, or list/tuple/set of them
        pandas dtype(s) passed to ``select_dtypes(include=...)`` to pick the
        candidate columns.
    drop_cols : str or iterable of str, optional
        Column names to exclude from the candidates. Names that are not among
        the candidate columns are ignored instead of raising ``KeyError``.

    Returns
    -------
    list of str
        Selected column names, in the column order of the DataFrame.

    Raises
    ------
    ValueError
        If no ``df`` is passed and no module-level ``df`` exists.
    """
    if df is None:
        # Resolve the notebook-level DataFrame lazily so a stale object captured
        # when the cell was first executed is never used.
        df = globals().get('df')
        if df is None:
            raise ValueError("select_features_to_scale requires a DataFrame: pass df=...")

    # toPandas() collects the whole DataFrame, so convert exactly once instead of
    # once per column comparison.
    pandas_df = df.toPandas()

    include = list(dtypes) if isinstance(dtypes, (list, tuple, set)) else [dtypes]

    if drop_cols is None:
        excluded = set()
    elif isinstance(drop_cols, str):
        excluded = {drop_cols}
    else:
        excluded = set(drop_cols)

    candidates = [
        column
        for column in pandas_df.select_dtypes(include=include).columns
        if column not in excluded
    ]

    selected_features = []
    for feature in candidates:
        kurtosis = pandas_df[feature].kurtosis()
        # A NaN kurtosis (too few rows) fails both comparisons and is not selected.
        if kurtosis < lower_skew or kurtosis > upper_skew:
            selected_features.append(feature)
    return selected_features

In [16]:
cat_features = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'poutcome']
num_features = ['age','balance','duration','campaign','pdays','previous']
label = 'deposit'

# Ordered list of pipeline stages; later stages consume columns produced by earlier ones
stages = []

# For every categorical column: map its strings to numeric indices, then one-hot encode the index
for cat_feature in cat_features:

    # Index the categorical column into "<name>_index"
    string_indexer = StringIndexer(inputCol=cat_feature, outputCol=cat_feature + "_index")

    # One-hot encode the index column into "<name>_class_vec"
    encoder = OneHotEncoder(inputCols=[string_indexer.getOutputCol()],
                            outputCols=[cat_feature + "_class_vec"])

    stages += [string_indexer, encoder]

# Index the target column into "label_index"
label_str_index = StringIndexer(inputCol=label, outputCol="label_index")

# Choose the numeric columns to standardize with the select_features_to_scale helper;
# 'age' is excluded from the candidates via drop_cols
unscaled_features = select_features_to_scale(df=df, lower_skew=-2, upper_skew=2, dtypes='int32', drop_cols=['age'])

unscaled_assembler = VectorAssembler(inputCols=unscaled_features, outputCol="unscaled_features")
scaler = StandardScaler(inputCol="unscaled_features", outputCol="scaled_features")

stages += [unscaled_assembler, scaler]

# Numeric columns that are NOT being scaled. Filter num_features in its declared order
# rather than taking a set difference: set iteration order for strings can change between
# interpreter runs, which would shuffle the layout of the assembled feature vector.
num_unscaled_diff_list = [feature for feature in num_features if feature not in unscaled_features]

# Concatenate the one-hot vectors with the unscaled numeric columns
assembler_inputs = [feature + "_class_vec" for feature in cat_features] + num_unscaled_diff_list

assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="assembled_inputs")

stages += [label_str_index, assembler]

# Final feature vector: scaled numeric columns followed by the assembled categorical/numeric inputs
assembler_final = VectorAssembler(inputCols=["scaled_features","assembled_inputs"], outputCol="features")

stages += [assembler_final]

In [17]:
# Display the pipeline stages in the order they will be executed
stages

[StringIndexer_9cb788fb9872,
 OneHotEncoder_0240eb97384d,
 StringIndexer_a86dc1393c96,
 OneHotEncoder_d8266ddd4d73,
 StringIndexer_02645b3df890,
 OneHotEncoder_6e1ca595d46f,
 StringIndexer_5354c908d33d,
 OneHotEncoder_f599b91031ff,
 StringIndexer_39370e8b170a,
 OneHotEncoder_92facc17da4c,
 StringIndexer_017b82f0881b,
 OneHotEncoder_0c80010f0256,
 StringIndexer_4b1323bc380c,
 OneHotEncoder_703a002ac865,
 StringIndexer_b01446a8050d,
 OneHotEncoder_7c76a57e19ef,
 VectorAssembler_58cf729d52b0,
 StandardScaler_57050db8e802,
 StringIndexer_b0ddb0ff5b88,
 VectorAssembler_bda5735ed06d,
 VectorAssembler_e4b7dcc9c25f]

In [18]:
# Wrap the ordered stage list in a single Pipeline estimator
pipeline = Pipeline(stages=stages)

# Fit every stage (indexers, encoders, scaler) on df
# NOTE(review): fitting uses the full DataFrame; if a train/test split is made later,
# the fitted statistics will already include the test rows - confirm this is intended.
pipeline_model = pipeline.fit(df)

# Apply the fitted stages to the same DataFrame to add the engineered columns
df_transform = pipeline_model.transform(df)

In [19]:
# Preview five rows of the transformed data: the original columns plus the
# index, one-hot, scaled and assembled columns added by the pipeline
df_transform.limit(5).toPandas()

Unnamed: 0,age,job,marital,education,default,balance,housing,loan,contact,duration,...,loan_class_vec,contact_index,contact_class_vec,poutcome_index,poutcome_class_vec,unscaled_features,scaled_features,label_index,assembled_inputs,features
0,59,admin.,married,secondary,no,2343,yes,no,unknown,1042,...,(1.0),1.0,"(0.0, 1.0)",0.0,"(1.0, 0.0, 0.0)","[2343.0, 1042.0, 1.0, -1.0, 0.0]","[0.7264185278681131, 3.0017712260834295, 0.367...",1.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.7264185278681131, 3.0017712260834295, 0.367..."
1,56,admin.,married,secondary,no,45,no,no,unknown,1467,...,(1.0),1.0,"(0.0, 1.0)",0.0,"(1.0, 0.0, 0.0)","[45.0, 1467.0, 1.0, -1.0, 0.0]","[0.013951700279157103, 4.226102100445672, 0.36...",1.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.013951700279157103, 4.226102100445672, 0.36..."
2,41,technician,married,secondary,no,1270,yes,no,unknown,1389,...,(1.0),1.0,"(0.0, 1.0)",0.0,"(1.0, 0.0, 0.0)","[1270.0, 1389.0, 1.0, -1.0, 0.0]","[0.39374798565621155, 4.001401375268602, 0.367...",1.0,"(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.39374798565621155, 4.001401375268602, 0.367..."
3,55,services,married,secondary,no,2476,yes,no,unknown,579,...,(1.0),1.0,"(0.0, 1.0)",0.0,"(1.0, 0.0, 0.0)","[2476.0, 579.0, 1.0, -1.0, 0.0]","[0.7676535531376218, 1.667970767660562, 0.3673...",1.0,"(0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, ...","(0.7676535531376218, 1.667970767660562, 0.3673..."
4,54,admin.,married,tertiary,no,184,no,no,unknown,673,...,(1.0),1.0,"(0.0, 1.0)",0.0,"(1.0, 0.0, 0.0)","[184.0, 673.0, 2.0, -1.0, 0.0]","[0.05704695225255348, 1.938763949284211, 0.734...",1.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.05704695225255348, 1.938763949284211, 0.734..."


In [20]:
# Check the type of the transformed result: it is still a PySpark DataFrame, not a pandas one
type(df_transform)

pyspark.sql.dataframe.DataFrame

In [21]:
# Keep just the two columns needed downstream: the assembled feature vector and the indexed label
final_columns = ['features', 'label_index']
df_transform_fin = df_transform.select(*final_columns)

# Show five rows of the reduced DataFrame
df_transform_fin.limit(5).toPandas()

Unnamed: 0,features,label_index
0,"(0.7264185278681131, 3.0017712260834295, 0.367...",1.0
1,"(0.013951700279157103, 4.226102100445672, 0.36...",1.0
2,"(0.39374798565621155, 4.001401375268602, 0.367...",1.0
3,"(0.7676535531376218, 1.667970767660562, 0.3673...",1.0
4,"(0.05704695225255348, 1.938763949284211, 0.734...",1.0
