# General pipeline for binary classification tasks in Spark ML

$\color{blue}{\text{Covering major components in real life scenarios for binary classification tasks using structured datasets.}}$

Modelling and data transformation are done mostly in pure Spark; exploration and metric plotting may use pandas.

The toolkits are in the lib/ folder and cover the following topics:

0. summary of transformers, estimators, and pipelines
1. conversion between Spark and pandas dataframes, with tips on converting data types and assigning the correct schema
2. typical UDFs for transforming columns
3. exploratory analysis on Spark dataframes
4. encoding methods for categorical variables, including some advanced encodings
5. feature selection methods in Spark ML (model-based selection, lasso, ...)
6. handling skewed datasets and highly imbalanced labels (up/down sampling, SMOTE in Spark)
7. modelling toolkits: common classifiers with tuning guidance, and use of XGBoost in Spark
8. metrics plotting tools for common metrics after training

### Spark ml structure:
Key components:
1. Transformer
2. Estimator
3. Pipeline

$\textbf{Transformer}$ converts one df into another, usually by appending new columns to the original df. E.g., an ML model is a Transformer that turns a DataFrame with features into a DataFrame with predictions. It exposes a .transform() method that takes a df as input. Trained models and fitted encoders are both Transformers.

$\textbf{Estimator}$ is an algorithm that is fit on a df to produce a Transformer. E.g., a learning algorithm is an Estimator that trains on a df and produces a model. Likewise, a one-hot encoder (OneHotEncoderEstimator) is an Estimator object: we .fit() it on a df, where it reads the configured input columns, to obtain a Transformer. The output of a fitted/trained Estimator is always a Transformer.

$\textbf{Pipeline}$ chains multiple Transformers and Estimators together to specify an ML workflow. A Pipeline is specified as a sequence of stages, and each stage is either a Transformer or an Estimator. The stages run in the order given, and the input DataFrame is transformed as it passes through each stage; what Spark calls on each stage depends on whether you invoked .fit() or .transform() on the pipeline. During pipeline.fit(), Transformer stages have transform() called on the DataFrame, while Estimator stages have fit() called to produce a Transformer (which becomes part of the PipelineModel, or fitted Pipeline) that then transforms the DataFrame for the following stages. A Pipeline is itself an Estimator: pipeline.fit() returns a PipelineModel, a Transformer ready to apply .transform() at test time.
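The fit/transform contract described above can be illustrated without Spark. The following is a toy sketch in plain Python: the class names (`MeanCenterer`, `Doubler`, `ToyPipeline`, ...) are hypothetical stand-ins, not Spark classes, and rows are plain dicts rather than DataFrames. It only shows how fitting a pipeline walks the stages in order and returns an object made of fitted Transformers.

```python
# Toy, pure-Python stand-ins for the three concepts; these are NOT Spark classes.

class MeanCenterer:                       # Estimator: has fit(), returns a Transformer
    def fit(self, rows):
        mean = sum(r["x"] for r in rows) / len(rows)
        return MeanCentererModel(mean)

class MeanCentererModel:                  # Transformer: has transform(), appends a column
    def __init__(self, mean):
        self.mean = mean
    def transform(self, rows):
        return [dict(r, x_centered=r["x"] - self.mean) for r in rows]

class Doubler:                            # a stateless Transformer (nothing to fit)
    def transform(self, rows):
        return [dict(r, x_doubled=r["x_centered"] * 2) for r in rows]

class ToyPipeline:                        # an Estimator made of ordered stages
    def __init__(self, stages):
        self.stages = stages
    def fit(self, rows):
        fitted = []
        for stage in self.stages:         # stages run strictly in list order
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            rows = model.transform(rows)  # feed the transformed rows to the next stage
            fitted.append(model)
        return ToyPipelineModel(fitted)

class ToyPipelineModel:                   # a Transformer holding only fitted stages
    def __init__(self, stages):
        self.stages = stages
    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows

train = [{"x": 1.0}, {"x": 3.0}]
test = [{"x": 5.0}]
model = ToyPipeline([MeanCenterer(), Doubler()]).fit(train)
test_out = model.transform(test)          # reuses the mean learned from train (2.0)
```

The point to notice is that the test row is centered with the statistics learned on the training rows, which is why the fitted pipeline, not a freshly fitted one, should be applied at test time.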

In [146]:
import os
import random
import pandas as pd
pd.options.display.max_columns=None
pd.options.display.max_rows=None

#import toolkits
from lib import util
from lib import logger

def initialize_spark(app_name='spark_pipeline'):
    import findspark
    #spark path using default value
    findspark.init()

    import pyspark
    import pyarrow
    from pyspark.sql import SQLContext
    
    #broadcastTimeout is deliberately set large because development happens on a single machine
    conf = pyspark.SparkConf()\
        .setAppName(app_name)\
        .setMaster('local')\
        .set('spark.driver.memory', '8g')\
        .set('spark.executor.memory', '8g')\
        .set('spark.executor.instances', 4)\
        .set('spark.executor.cores', 4)\
        .set('spark.driver.maxResultSize', '8g')\
        .set('spark.sql.shuffle.partitions', 100)\
        .set('spark.default.parallelism', 200)\
        .set('spark.sql.broadcastTimeout', 36000)\
        .set('spark.kryoserializer.buffer.max', '1024m')\
        .set('spark.sql.execution.arrow.enabled', 'false')\
        .set('spark.dynamicAllocation.enabled', "False")\
        .set('spark.port.maxRetries',30) 

    sc = pyspark.SparkContext.getOrCreate(conf)
    spark = pyspark.sql.SparkSession(sc)
    sqlContext = SQLContext.getOrCreate(sc)    
    return sc,spark,sqlContext

from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler, StringIndexer, VectorAssembler
# OneHotEncoderEstimator is the Spark 2.x name; from Spark 3.0 the class is called OneHotEncoder
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.sql.types import IntegerType,DecimalType
from pyspark.sql.functions import when, lit
from distutils.version import LooseVersion
from importlib import reload
import pyspark.sql.functions as func
import pyspark.sql.types as typ

In [2]:
sc,spark,sqlContext = initialize_spark()

## Loading data into spark dataframe

In [6]:
df = pd.read_csv('datasets/adult.csv')
# if we read directly with spark.read.csv('datasets/adult.csv',header=True) and neither pass a schema
# nor set inferSchema=True, all columns are interpreted as string type, which is troublesome for later processing
dataset = util.pandas_to_spark(sqlContext,df)
dataset = dataset.withColumn('income', when(dataset.income=='<=50K', lit(0)).otherwise(1))
cols = dataset.columns

### 1. Random downsampling

In [23]:
from lib import imbalance_handler as imbalance_handle
import lib.feature_selection as fs
reload(fs)
reload(imbalance_handle)

<module 'lib.imbalance_handler' from '/Users/hwang/Desktop/spark_pipelines/lib/imbalance_handler.py'>

In [None]:
down_sampled_df = imbalance_handle.spark_df_down_sampling(dataset, 2, 'income', major_class_val = 0)

In [10]:
down_sampled_df.select('income').toPandas().income.value_counts()

0    23373
1    11687
Name: income, dtype: int64
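The counts above come out close to 2:1, which suggests that the second argument of `spark_df_down_sampling` is the target majority-to-minority ratio; the helper's implementation is not shown here, so that reading is an assumption. A minimal plain-Python sketch of random downsampling under that assumption (the function name `downsample_majority` and the synthetic labels are hypothetical):

```python
import random

def downsample_majority(labels, ratio, major_class_val, seed=0):
    """Keep every minority row and a random subset of majority rows so that
    majority:minority is approximately `ratio`. Returns (kept indices, fraction)."""
    rng = random.Random(seed)
    n_major = sum(1 for y in labels if y == major_class_val)
    n_minor = len(labels) - n_major
    # Fraction of majority rows to keep. In Spark the same number could be
    # passed as the `fraction` argument of DataFrame.sample on the majority rows.
    fraction = min(1.0, ratio * n_minor / n_major)
    kept = [i for i, y in enumerate(labels)
            if y != major_class_val or rng.random() < fraction]
    return kept, fraction

labels = [0] * 9000 + [1] * 1000          # synthetic 9:1 imbalance
kept, fraction = downsample_majority(labels, ratio=2, major_class_val=0, seed=48)
n_major_kept = sum(1 for i in kept if labels[i] == 0)
n_minor_kept = sum(1 for i in kept if labels[i] == 1)
```

Because each majority row is kept independently with probability `fraction`, the resulting ratio is only approximately the requested one.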

In [67]:
# get num_cols and cat_cols from spark df
num_cols, cat_cols = util.get_num_cat_feat(dataset)

All columns are been covered.


In [14]:
min_cat = 2
max_cat = 20

In [14]:
cat_coverage_df,no_info_col,cols_high_cardinality = fs.cat_col_cardinality_test(dataset,cat_cols,min_cat,max_cat)

Start the count computation for categorical features...
The no. of categorical features: 8
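The body of `fs.cat_col_cardinality_test` is not shown, so the following is only a guess at the kind of check it performs, based on its arguments and return names: count the distinct values per categorical column, then flag columns with fewer than `min_cat` values as carrying no information and columns with more than `max_cat` values as high-cardinality. The function name `cardinality_report` and the sample rows are hypothetical.

```python
def cardinality_report(rows, cat_cols, min_cat, max_cat):
    """Hypothetical sketch: count distinct values per column and flag
    columns whose cardinality falls outside [min_cat, max_cat]."""
    distinct = {c: len({r[c] for r in rows}) for c in cat_cols}
    no_info = [c for c in cat_cols if distinct[c] < min_cat]
    high_cardinality = [c for c in cat_cols if distinct[c] > max_cat]
    return distinct, no_info, high_cardinality

# 25 rows with a unique country each, plus one extra row
rows = [
    {"gender": "Male", "country": "c%d" % i, "constant": "x"}
    for i in range(25)
] + [{"gender": "Female", "country": "c0", "constant": "x"}]

distinct, no_info, high_card = cardinality_report(
    rows, ["gender", "country", "constant"], min_cat=2, max_cat=20)
```

With `min_cat = 2` and `max_cat = 20` as set above, a constant column is flagged as uninformative and a column with 25 distinct values is flagged as high-cardinality.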


In [41]:
# find highly correlated columns
fs.num_cols_correlation_test(dataset,num_cols,0.1)

['capital-loss', 'capital-gain', 'income', 'educational-num', 'hours-per-week']
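The exact rule used by `fs.num_cols_correlation_test` is not shown. One plausible reading, offered here as an assumption, is that it returns every column that takes part in at least one pair whose absolute Pearson correlation exceeds the threshold. A plain-Python sketch of that rule (the function names and toy columns are hypothetical; in Spark a pairwise coefficient is available through `DataFrame.stat.corr`):

```python
from itertools import combinations
from math import sqrt

def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long sequences."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    vx = sum((x - mx) ** 2 for x in xs)
    vy = sum((y - my) ** 2 for y in ys)
    return cov / sqrt(vx * vy)

def correlated_columns(columns, threshold):
    """Return names appearing in at least one pair with |r| > threshold."""
    flagged = set()
    for a, b in combinations(columns, 2):
        if abs(pearson(columns[a], columns[b])) > threshold:
            flagged.update((a, b))
    return sorted(flagged)

columns = {
    "a": [1.0, 2.0, 3.0, 4.0],
    "b": [2.0, 4.0, 6.0, 8.0],     # perfectly correlated with a
    "c": [1.0, -1.0, -1.0, 1.0],   # uncorrelated with a and b
}
flagged = correlated_columns(columns, threshold=0.5)
```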

### 2. SMOTE: encoding, preprocessing, and normalization should happen before SMOTE

In [16]:
from config.conf_template import Struct as Section

In [17]:
conf = Section("smote_config")
conf.seed = 48
conf.bucketLength = 100
conf.k = 4
conf.multiplier = 3

In [126]:
vectorized = imbalance_handle.pre_smote_df_process(dataset,num_cols,cat_cols,'income')
res = imbalance_handle.smote(vectorized,conf)

generating batch 0 of synthetic instances
generating batch 1 of synthetic instances
generating batch 2 of synthetic instances
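For readers unfamiliar with SMOTE, the core step is interpolation between a minority point and one of its nearest minority neighbours. The sketch below is a plain-Python illustration of that step, not the code in `imbalance_handle.smote`. It assumes that `conf.k` is the number of neighbours and that `conf.multiplier` is the number of synthetic points per minority point (the three "generating batch" lines are consistent with that, but it is a guess). It uses brute-force neighbour search, whereas the `bucketLength` setting hints that the library may use an LSH-based approximate search.

```python
import random

def smote_sketch(minority, k, multiplier, seed):
    """Generate `multiplier` synthetic points per minority point by
    interpolating towards one of its k nearest minority neighbours."""
    rng = random.Random(seed)
    synthetic = []
    for _ in range(multiplier):                      # one pass ("batch") per multiplier unit
        for i, x in enumerate(minority):
            # brute-force neighbours, ordered by squared Euclidean distance
            others = [p for j, p in enumerate(minority) if j != i]
            others.sort(key=lambda p: sum((a - b) ** 2 for a, b in zip(x, p)))
            neighbour = rng.choice(others[:k])
            gap = rng.random()                       # position on the segment x -> neighbour
            synthetic.append([a + gap * (b - a) for a, b in zip(x, neighbour)])
    return synthetic

minority = [[0.0, 0.0], [1.0, 0.0], [0.0, 1.0], [1.0, 1.0], [0.5, 0.5]]
synthetic = smote_sketch(minority, k=4, multiplier=3, seed=48)
```

Interpolation only makes sense on numeric, comparably scaled features, which is why encoding and normalization need to happen before SMOTE.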


In [127]:
# cache() is persist() with the default storage level, so a single call is enough
res.cache()

DataFrame[features: vector, workclass_index: double, native-country_index: double, marital-status_index: double, gender_index: double, race_index: double, relationship_index: double, education_index: double, occupation_index: double, label: int]

In [155]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator

In [157]:
res_restored = restore_smoted_df(num_cols,res,'features')

## Start encoding selected categorical cols

StringIndex all categorical columns -> train/test split -> SMOTE on the train set -> restore the SMOTEd train set to the original columns -> train models -> transform the test set

In [160]:
import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from distutils.version import LooseVersion
from pyspark.ml.feature import StandardScaler,MinMaxScaler

def one_hot_encode_cat_cols(sdf,cat_cols,require_str_index,cat_encode_suffix):
    '''
    build one-hot encoding stages for cat_cols
    input:
    * sdf: spark df (not used here; stages are independent of the dataframe)
    * cat_cols: categorical columns
    * require_str_index: bool, whether the columns still need string indexing before encoding
    * cat_encode_suffix: suffix appended to each column name to form the encoded output column
    output:
    * stages: list of encoder stages for a spark Pipeline
    '''
    stages = [] # stages in our Pipeline

    for categoricalCol in cat_cols:
        # StringIndexer (not applied in this function) maps categories to numeric indices by frequency;
        # the most frequent category gets index 0.
        # When a fitted indexer meets a label it has not seen, it raises an error by default;
        # StringIndexer(..., handleInvalid="skip") instead drops every row it cannot encode.
        # stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")

        # The one-hot encoder converts an indexed column into binary SparseVectors,
        # e.g. (2,[0],[1.0]) is a vector of length 2 with 1.0 at position 0 and 0 elsewhere.
        # By default the last category is dropped (encoded as all zeros) so that the
        # encoded columns are not perfectly collinear; pass dropLast=False to keep it.

        if not require_str_index:
            # column is already indexed, with suffix _index as default
            if LooseVersion(pyspark.__version__) < LooseVersion("3.0"):
                from pyspark.ml.feature import OneHotEncoderEstimator
                encoder = OneHotEncoderEstimator(inputCols=[categoricalCol], outputCols=[categoricalCol + cat_encode_suffix])
            else:
                from pyspark.ml.feature import OneHotEncoder
                encoder = OneHotEncoder(inputCols=[categoricalCol], outputCols=[categoricalCol + cat_encode_suffix])
            # Add stages.  These are not run here, but will run all at once later on.
            stages += [encoder]
        else:
            # column is not indexed yet, we need to stringIndex first, with suffix being _index
            print("not yet implemented")
            
    return stages

def str_index_cat_cols(sdf,cat_cols,cat_cols_affix):
    '''
    build StringIndexer stages for cat_cols (indices assigned by item frequency), no encoding applied
    input:
        * sdf: spark df (not used here; stages are independent of the dataframe)
        * cat_cols: cat cols to be string indexed
        * cat_cols_affix: suffix appended to each column name to form the indexed output column
    output:
        * stages: list of StringIndexer stages for a spark pipeline
    '''
    stages = []
    
    for categoricalCol in cat_cols:
        stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + cat_cols_affix)
        stages+=[stringIndexer]
        
    return stages
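The two encoding steps named in the comments above (frequency-based string indexing, then one-hot encoding with the last category dropped) can be traced by hand on a tiny column. This is a plain-Python sketch of the idea, not Spark's implementation; the helper names are hypothetical, and breaking frequency ties alphabetically is a choice made for this sketch.

```python
from collections import Counter

def fit_string_index(values):
    """Map each label to an index by descending frequency (ties broken
    alphabetically in this sketch), so the most frequent label gets 0."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: idx for idx, label in enumerate(ordered)}

def one_hot(index, n_categories, drop_last=True):
    """Return a (size, indices, values) triple, mirroring how a sparse
    vector is printed. With drop_last the final category is all zeros."""
    size = n_categories - 1 if drop_last else n_categories
    if index < size:
        return (size, [index], [1.0])
    return (size, [], [])

values = ["Private", "Private", "Private", "Self-emp", "Self-emp", "Gov"]
mapping = fit_string_index(values)
encoded = {label: one_hot(idx, len(mapping)) for label, idx in mapping.items()}
```

With three categories and the last one dropped, each encoded vector has length 2, and the least frequent category is represented by the all-zeros vector.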

In [None]:
vectorized = imbalance_handle.pre_smote_df_process(trainingData,num_cols,cat_cols,'income')
res = imbalance_handle.smote(vectorized,conf)

In [169]:
new_cat_cols = ['workclass_index','native-country_index','marital-status_index',
'gender_index','race_index','relationship_index','education_index','occupation_index']

encoded_outcols = [a+'classVec' for a in new_cat_cols]


In [165]:
stages = one_hot_encode_cat_cols(res,new_cat_cols,False,'classVec') # new_cat_cols: the already indexed *_index columns

In [170]:
encoded_outcols

['workclass_indexclassVec',
 'native-country_indexclassVec',
 'marital-status_indexclassVec',
 'gender_indexclassVec',
 'race_indexclassVec',
 'relationship_indexclassVec',
 'education_indexclassVec',
 'occupation_indexclassVec']

In [167]:
num_cols

['age',
 'fnlwgt',
 'educational-num',
 'capital-gain',
 'capital-loss',
 'hours-per-week']

In [168]:
assemblerInputs = num_cols+encoded_outcols
#assemblerInputs stores all necessary (transformed) columns after all the stages
#VectorAssembler only applied to numerical or transformed categorical columns
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="all_features")
stages += [assembler]

In [171]:
#the stages are collected in a list; at execution they run in list order,
#with .fit() called on Estimator stages and .transform() on Transformer stages
partialPipeline = Pipeline().setStages(stages) #a Pipeline: independent of any dataframe, defined only by its stages
pipelineModel = partialPipeline.fit(res_restored) #a PipelineModel: the stages fitted on the dataframe
preppedDataDF = pipelineModel.transform(res_restored) #transformed dataframe: keeps all original columns and adds the indexed/encoded/assembled columns

In [35]:
trainingData, testData = dataset.randomSplit([0.7, 0.3], seed=100)

print(trainingData.count())
print(testData.count())

34294
14548


In [422]:
#stages = one_hot_encode_cat_cols(trainingData,cat_cols,'classVec')
#stages_2 = assemble_into_features(trainingData,num_cols,cat_cols,stages,'classVec')

def get_cat_col_cardinality(sdf,cat_cols):
    '''
    count the distinct values of each categorical feature
    input: 
    * sdf: spark df
    * cat_cols: list of str
    output:
    * dict mapping the position of each column in cat_cols to its cardinality
    func is pyspark.sql.functions
    '''
    #cat_info_dict to store index of cat feature and its cardinality, for modelling input
    cat_info_dict = {}

    print('generating cardinality map for cat cols')
    for i, col in enumerate(cat_cols):
        cat_info_dict[i] = sdf.select(func.countDistinct(col).alias("distinct_count_%s"%col)).collect()[0][0]
    
    return cat_info_dict

#d = get_cat_col_cardinality(dataset,cat_cols)

def assemble_into_features(sdf,num_cols,cat_cols,stages,cat_cols_affix):
    '''
    append assembling and scaling stages that combine all features into one vector
    input:
    * sdf: spark df (not used here; stages are independent of the dataframe)
    * num_cols: numerical columns
    * cat_cols: categorical columns (names without the affix)
    * stages: list of earlier pipeline stages, extended in place
    * cat_cols_affix: affix of the processed (encoded) cat cols
    output:
    * stages: the input list plus a VectorAssembler and a StandardScaler
    '''
    # Combine the numeric columns and the one-hot encoded vector columns
    # into a single vector column using VectorAssembler.
    # The numeric columns come first, so they occupy indices 0..len(num_cols)-1 of the vector.
    assemblerInputs = num_cols+[c + cat_cols_affix for c in cat_cols]
    #VectorAssembler only accepts numerical or already transformed categorical columns
    assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
    stages += [assembler] 

    # then we apply scaling on the vectorized features, 2 additional params are:
    # withStd: True by default. Scales the data to unit standard deviation.
    # withMean: False by default. Centers the data with mean before scaling.
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features",withMean=True)
    #scaler = MinMaxScaler(min=0, max=1, inputCol='features', outputCol='features_minmax')

    stages += [scaler] 
    return stages
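To make the two scaler flags concrete, here is the arithmetic for a single feature column in plain Python. It is a sketch of the formula only: the function name `standard_scale` is hypothetical, and it uses the sample (n-1) standard deviation and maps zero-variance columns to 0.0.

```python
from statistics import mean, stdev

def standard_scale(column, with_mean=True, with_std=True):
    """Scale one feature column: optionally subtract the mean, then
    optionally divide by the sample (n-1) standard deviation."""
    mu = mean(column) if with_mean else 0.0
    sigma = stdev(column) if with_std else 1.0
    if sigma == 0:
        # constant column: emit zeros rather than dividing by zero
        return [0.0 for _ in column]
    return [(x - mu) / sigma for x in column]

column = [1.0, 2.0, 3.0]
centered_and_scaled = standard_scale(column, with_mean=True)    # like withMean=True
scaled_only = standard_scale(column, with_mean=False)           # like the default withMean=False
```

Centering turns zeros into non-zero values, so with `withMean=True` the sparse one-hot vectors become dense; with many encoded categories this can increase memory use noticeably.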

In [423]:
#the stages are collected in a list; at execution they run in list order,
#with .fit() called on Estimator stages and .transform() on Transformer stages
partialPipeline = Pipeline().setStages(stages) #a Pipeline: independent of any dataframe, defined only by its stages

pipelineModel = partialPipeline.fit(trainingData) #a PipelineModel: the stages fitted on the training dataframe

preppedDataDF = pipelineModel.transform(trainingData) #transformed dataframe: keeps all original columns and adds the indexed/encoded/assembled columns


In [424]:
preppedDataDF = preppedDataDF.withColumnRenamed("income","label")

In [173]:
preppedDataDF_test = pipelineModel.transform(testData)
preppedDataDF_test = preppedDataDF_test.withColumnRenamed("income","label")

IllegalArgumentException: 'Field "workclass_index" does not exist.\nAvailable fields: age, workclass, fnlwgt, education, educational-num, marital-status, occupation, relationship, race, gender, capital-gain, capital-loss, hours-per-week, native-country, income'

In [426]:
# now to train on the train set
from pyspark.ml.classification import RandomForestClassifier
# Create an initial RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
# Train model with Training Data
rfModel = rf.fit(preppedDataDF)

In [427]:
# Make predictions on test data using the transform() method.
# The fitted random forest model's transform() reads only the 'features' column.
predictions = rfModel.transform(preppedDataDF_test)

In [428]:
# Evaluate the model's predictions on the test set
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# The evaluator reads the rawPrediction column and, by default, a label column named "label"
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

0.8883475483195341

In [415]:
evaluator.getMetricName()

'areaUnderROC'
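As a reminder of what `areaUnderROC` measures: it equals the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one, with ties counted as one half. The plain-Python sketch below computes it exactly by comparing all pairs; the function name and the toy scores are hypothetical, and Spark's evaluator derives the value from a ROC curve instead, so its result may differ slightly on real data.

```python
def area_under_roc(labels, scores):
    """Probability that a random positive is scored above a random negative
    (ties count one half); equals the area under the exact ROC curve."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
auc = area_under_roc(labels, scores)   # 3 of the 4 positive/negative pairs are ordered correctly
```

The metric depends only on the ordering of the scores, not on any classification threshold.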

In [438]:
# the assembled vector's metadata lists every slot with its index ("idx") and name
attrs = preppedDataDF_test.schema["features"].metadata["ml_attr"]["attrs"]
pandasDF = pd.DataFrame(attrs["binary"] + attrs["numeric"]).sort_values("idx")

In [440]:
feature_dict = dict(zip(pandasDF["idx"],pandasDF["name"])) 

feature_dict_broad = sc.broadcast(feature_dict)

In [463]:
col_importance_val = []
for i,importance in enumerate(importances):
    col_importance_val.append([i,importance])

final_sorted_importance = sorted(col_importance_val, key=lambda x: x[1], reverse =True)

In [466]:
top_feature_index = [a[0] for a in final_sorted_importance[:15]]

In [451]:
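# featureImportances is an ML Vector; toArray() gives a numpy array without needing to import numpy here
importances = rfModel.featureImportances.toArray().tolist()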

In [471]:
res = []
for i,importance in enumerate(importances):
    feature_nm = feature_dict[i]
    res.append([feature_nm,importance])
    
sorted_important_fs = sorted(res, key=lambda x: x[1], reverse =True)
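The cells above build the ranked list in several steps, and `importances` has to be computed before the loops that use it. As a compact illustration of the same idea, the sketch below pairs each importance with its feature name and keeps the top k. The function name and the numbers are hypothetical and for illustration only; they are not results from this model.

```python
def top_features(importances, index_to_name, k):
    """Pair each importance with its feature name and return the k largest."""
    named = [(index_to_name[i], imp) for i, imp in enumerate(importances)]
    return sorted(named, key=lambda pair: pair[1], reverse=True)[:k]

# Hypothetical values for illustration only
importances_demo = [0.05, 0.40, 0.15, 0.30, 0.10]
index_to_name = {0: "age", 1: "capital-gain", 2: "hours-per-week",
                 3: "educational-num", 4: "fnlwgt"}
top3 = top_features(importances_demo, index_to_name, k=3)
```

Note that after one-hot encoding, each category of a categorical column has its own slot and its own importance, so a column's total contribution is spread over several entries.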

In [368]:
# Doing cross validation and params tuning

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [2, 4, 6])
             .addGrid(rf.maxBins, [20, 60])
             .addGrid(rf.numTrees, [5, 20])
             .build())
# paramGrid contains 3*2*2 = 12 parameter combinations
# with 5 folds, 12*5 = 60 models are trained during the search, plus one final refit of the best combination

# Create 5-fold CrossValidator, input is an estimator (rf classifier e.g.)
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
# Run cross validations
cvModel = cv.fit(trainingData)
# this will likely take a fair amount of time because of the amount of models that we're creating and testing
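The model count quoted in the comments can be checked by enumerating the grid by hand. This is a plain-Python sketch that mirrors the three `addGrid` calls with `itertools.product`; it does not use Spark, and the dictionary keys simply reuse the parameter names from the cell above.

```python
from itertools import product

grid = {
    "maxDepth": [2, 4, 6],
    "maxBins": [20, 60],
    "numTrees": [5, 20],
}
num_folds = 5

# every combination of parameter values, one dict per candidate model
param_maps = [dict(zip(grid, values)) for values in product(*grid.values())]
fits_during_cv = len(param_maps) * num_folds
```

Since the cost grows multiplicatively with every added parameter list, it is worth keeping each list short or searching in stages.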

In [369]:
# Use the test set to measure the model's performance (areaUnderROC) on new data
predictions = cvModel.transform(testData)

In [370]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions)

0.8999505914844388

In [202]:
# vector assembler can have inputs as: numeric,bool,vector
# output will be a flattened vector (even if input could have vector)

In [371]:
bestModel = cvModel.bestModel

In [372]:
# Generate predictions for entire dataset
finalPredictions = bestModel.transform(dataset)
# Evaluate best model
evaluator.evaluate(finalPredictions)

In [None]:
'''
from pyspark.ml.feature import StringIndexer, IndexToString
labelReverse = IndexToString().setInputCol("race_index").setOutputCol("recover")
labelReverse.transform(vectorized).select("race_index","recover").show()
'''