# Ensemble classifier on the Titanic Kaggle dataset with PySpark

## Objectives: a full PySpark notebook

The primary goal of this notebook is a deep dive into the Spark ML module:  
- using the well-known Kaggle Titanic dataset (already explored in a sklearn notebook)  
- performing feature extraction and preprocessing with pipelines of custom Transformers/Estimators  
- combining trained models into a custom majority-vote ensemble  
- preparing the ground for intensive hyperparameter tuning on a Google Cloud (GCP) Hadoop cluster  


## Note  
I opted for custom Estimators and Models, especially for:  
- the imputation of missing _"Age"_ values via a linear regression on the features _"Pclass"_ and _"Sex"_  
- the creation of a majority-vote ensemble classifier  
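The Estimator/Model split used for these custom stages (an Estimator learns some state in `fit` and returns a Model that only applies it in `transform`) can be illustrated without Spark. The classes below are hypothetical plain-Python stand-ins, not the `pyspark.ml` API; they mirror what `HandleMissingEmbarked` and `HandleMissingEmbarkedModel` do in the feature-extraction cell.

```python
from collections import Counter


class ModeImputerModel:
    """Hypothetical fitted model: holds the learned value and only applies it."""

    def __init__(self, column, fill_value):
        self.column = column
        self.fill_value = fill_value

    def transform(self, rows):
        # return new rows where a missing value (None) is replaced by the learned mode
        return [
            {**row, self.column: self.fill_value if row[self.column] is None else row[self.column]}
            for row in rows
        ]


class ModeImputer:
    """Hypothetical estimator: learns the most frequent non-missing value in fit()."""

    def __init__(self, column):
        self.column = column

    def fit(self, rows):
        counts = Counter(row[self.column] for row in rows if row[self.column] is not None)
        mode, _ = counts.most_common(1)[0]
        return ModeImputerModel(self.column, mode)


train_rows = [{"Embarked": "S"}, {"Embarked": "C"}, {"Embarked": "S"}, {"Embarked": None}]
test_rows = [{"Embarked": None}, {"Embarked": "Q"}]

model = ModeImputer("Embarked").fit(train_rows)  # state learned on the training rows only
print(model.fill_value)                          # S
print(model.transform(test_rows))                # [{'Embarked': 'S'}, {'Embarked': 'Q'}]
```

Keeping the learned value on the model is what lets the same training-set statistic be applied unchanged to the test set.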

**Feedback is welcome!**

# Dataset acquisition

In [1]:
# Let's visualize the csv dataset format
!powershell Get-Content "../data/train.csv" -Head 3

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C
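These sample rows show why the CSV reader has to honor quoting: the `Name` field contains a comma, and `Cabin` is empty for the first passenger (Spark shows it as `null` in the output below). A quick check with Python's standard `csv` module on the three lines printed above:

```python
import csv
import io

# the three lines printed by Get-Content above
sample = '''PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C
'''

rows = list(csv.DictReader(io.StringIO(sample)))

print(len(rows[0]))            # 12 columns, despite the comma inside the quoted Name
print(rows[0]["Name"])         # Braund, Mr. Owen Harris
print(repr(rows[0]["Cabin"]))  # '' (empty field)
```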


In [2]:
from pyspark.sql import SparkSession, DataFrame, Column
from pyspark.sql.functions import count, when, col

spark = ( SparkSession.builder
    .master("local[*]")
    .appName("votingclassifier-titanic")
    .getOrCreate()
)

train_df = spark.read.csv(
    "../data/train.csv", 
    header = True,
    inferSchema = True
).cache()

test_df = spark.read.csv(
    "../data/test.csv", 
    header = True,
    inferSchema = True
).cache()

def display_missing_values(df):
    df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

print("train dataset - quick viz")
train_df.printSchema()
train_df.show(5)

print("train dataset - nb missing values per column")
display_missing_values(train_df)

print("test dataset - nb missing values per column")
display_missing_values(test_df)

train dataset - quick viz
root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|f
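`display_missing_values` relies on two SQL semantics: `when(cond, value)` without `otherwise` yields null when the condition is false, and `count` skips nulls, so each column's count equals its number of missing values. The same logic in plain Python, on hypothetical rows where `None` plays the role of null:

```python
def count_missing(rows, columns):
    """Mimic count(when(col(c).isNull(), c)) for each column c."""
    result = {}
    for c in columns:
        # when(isNull, c): a non-null marker for missing values, None otherwise
        markers = [c if row[c] is None else None for row in rows]
        # count(...) ignores nulls
        result[c] = sum(1 for m in markers if m is not None)
    return result


rows = [
    {"Age": 22.0, "Cabin": None, "Embarked": "S"},
    {"Age": None, "Cabin": "C85", "Embarked": "C"},
    {"Age": None, "Cabin": None, "Embarked": None},
]
print(count_missing(rows, ["Age", "Cabin", "Embarked"]))
# {'Age': 2, 'Cabin': 2, 'Embarked': 1}
```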

# Feature extraction

We already performed the feature engineering work with pandas in the sklearn notebook.  
Here we apply those findings with PySpark.  
Since PySpark does not provide a regression-based imputer (its `Imputer` fills missing values with a column statistic such as the mean or median), we create a custom one that regresses the missing "Age" values on "Pclass" and "Sex".
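The idea behind the custom Age imputer can be sketched without Spark: fit ordinary least squares on one-hot Pclass and Sex using only the rows where Age is known, then predict Age for the rows where it is missing. This is a plain-Python illustration on made-up data with hypothetical helper names; unlike the notebook's `LinearRegression`, it applies no `regParam = 0.3` penalty. Because the inputs are only Pclass and Sex, every passenger with the same (Pclass, Sex) pair gets the same imputed age, which is what the repeated `Age_imputed` values in the output table below show.

```python
def one_hot(pclass, sex):
    # intercept + dummies for Pclass 2, Pclass 3 and male (Pclass 1 / female are the baseline)
    return [1.0, float(pclass == 2), float(pclass == 3), float(sex == "male")]


def solve(a, b):
    """Solve the square linear system a x = b (Gaussian elimination, partial pivoting)."""
    n = len(b)
    m = [list(row) + [b[i]] for i, row in enumerate(a)]  # augmented matrix
    for j in range(n):
        pivot = max(range(j, n), key=lambda r: abs(m[r][j]))
        m[j], m[pivot] = m[pivot], m[j]
        for r in range(j + 1, n):
            factor = m[r][j] / m[j][j]
            for k in range(j, n + 1):
                m[r][k] -= factor * m[j][k]
    x = [0.0] * n
    for i in reversed(range(n)):
        x[i] = (m[i][n] - sum(m[i][k] * x[k] for k in range(i + 1, n))) / m[i][i]
    return x


def fit_ols(features, targets):
    # normal equations: (X^T X) w = X^T y
    p = len(features[0])
    xtx = [[sum(f[i] * f[j] for f in features) for j in range(p)] for i in range(p)]
    xty = [sum(f[i] * y for f, y in zip(features, targets)) for i in range(p)]
    return solve(xtx, xty)


def impute_age(rows):
    known = [r for r in rows if r["Age"] is not None]  # fit only on known ages
    weights = fit_ols([one_hot(r["Pclass"], r["Sex"]) for r in known],
                      [r["Age"] for r in known])
    result = []
    for r in rows:
        predicted = sum(w * x for w, x in zip(weights, one_hot(r["Pclass"], r["Sex"])))
        age = r["Age"] if r["Age"] is not None else predicted  # known ages are kept as-is
        result.append({**r, "Age_imputed": age})
    return result


# made-up rows following Age = 38 - 8*[Pclass==2] - 14*[Pclass==3] + 3*[male]
rows = [
    {"Pclass": 1, "Sex": "female", "Age": 38.0},
    {"Pclass": 1, "Sex": "male", "Age": 41.0},
    {"Pclass": 2, "Sex": "female", "Age": 30.0},
    {"Pclass": 2, "Sex": "male", "Age": 33.0},
    {"Pclass": 3, "Sex": "female", "Age": 24.0},
    {"Pclass": 3, "Sex": "male", "Age": 27.0},
    {"Pclass": 3, "Sex": "male", "Age": None},
    {"Pclass": 1, "Sex": "female", "Age": None},
]
for r in impute_age(rows)[-2:]:
    print(r["Pclass"], r["Sex"], round(r["Age_imputed"], 6))
# 3 male 27.0
# 1 female 38.0
```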

In [3]:
import re
from pyspark.sql.functions import regexp_extract, udf
from pyspark.ml.feature import OneHotEncoder, StringIndexer, StandardScaler, VectorAssembler, Imputer
from pyspark.ml import Pipeline, Transformer, Estimator, Model
from pyspark.sql.types import IntegerType, DoubleType, StringType
from pyspark.ml.regression import LinearRegression
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import ParamGridBuilder

# Accompanied = binary feature = "has at least one sibling, spouse, parent or child aboard"
# stateless transformation => use a Transformer
# custom stages override _transform/_fit: the public transform()/fit() handle params, then call them
class ExtractAccompaniedFeature(Transformer):
    def _transform(self, dataset):
        return dataset.withColumn(
            "Accompanied", 
            (dataset.SibSp + dataset.Parch >= 1).cast(IntegerType()) 
        )

# Impute missing Embarked with the most frequent value
# stateful transformation (the mode is learned on the training set) => use an Estimator
class HandleMissingEmbarked(Estimator):
    def _fit(self, dataset):
        mostFrequentValue = (dataset.where(col("Embarked").isNotNull())  # do not count the null group
                             .groupby("Embarked")
                             .count()
                             .orderBy("count", ascending=False)
                             .first()
                             .Embarked
                            )
        return HandleMissingEmbarkedModel(mostFrequentValue)
        
class HandleMissingEmbarkedModel(Model):
    
    def __init__(self, mostFrequentValue):
        super().__init__()  # sets up the uid and param maps expected by Params
        self.mostFrequentValue = mostFrequentValue
        
    def _transform(self, dataset):
        return dataset.fillna(self.mostFrequentValue, "Embarked")

# Title regex processing: map rare titles onto Mrs. / Miss. / Mr.
# (raw strings avoid invalid escape sequence warnings for "\.")
@udf(returnType=StringType())
def replace_title(s):
    mrs_pattern = r"(Mme\.|Ms\.|Countess\.|Lady\.)"
    miss_pattern = r"(Mlle\.)"
    mr_pattern = r"(Don\.|Major\.|Sir\.|Col\.|Capt\.)"
    if re.search(mrs_pattern, s):
        return re.sub(mrs_pattern, "Mrs.", s)
    if re.search(miss_pattern, s):
        return re.sub(miss_pattern, "Miss.", s)
    if re.search(mr_pattern, s):
        return re.sub(mr_pattern, "Mr.", s)
    return s


@udf
def replace_empty(s):
    if s == "":
        return "No-Title"
    return s

# extraction of Title feature from Name
class ExtractTitle(Transformer):
    def _transform(self, dataset):
        titles_extract_pattern = r'(Mr\.|Mrs\.|Miss\.|Master\.|Dr\.|Rev\.)'
        return ( dataset.withColumn("Title", regexp_extract("Name", titles_extract_pattern, 1))
                .withColumn("Title", replace_empty("Title"))
               )
    
# Imputing missing "Age" by linear regression on Pclass and Sex
class HandleMissingAge(Estimator):
    def __init__(self):
        # initialize Params properly (uid, param maps) instead of setting private attributes by hand
        super().__init__()
        vect = VectorAssembler(
            inputCols = ["Pclass_encoded", "Sex_encoded"], 
            outputCol='features_class_sex'
        )
        
        lr = LinearRegression(
            featuresCol="features_class_sex",
            labelCol='Age',
            predictionCol='Age_imputed',
            regParam = 0.3
        )

        self.pipe = Pipeline(
            stages = [
                vect,
                lr
            ])


    def _fit(self, dataset):
        # the regressor is trained only on rows where Age is known
        dataset_without_missing = dataset.where(col("Age").isNotNull())
        ageRegressor = self.pipe.fit(dataset_without_missing)
        return HandleMissingAgeModel(ageRegressor)

    
class HandleMissingAgeModel(Model):
    
    def __init__(self, ageRegressor):
        super().__init__()
        self.ageRegressor = ageRegressor
        
    def _transform(self, dataset):
        null_age_df = dataset.where(col("Age").isNull())
        not_null_age_df = dataset.where(col("Age").isNotNull())
        
        # predict Age only for the rows where it is missing
        null_age_df = (
            self.ageRegressor
            .transform(null_age_df)
            .drop("features_class_sex")
            .cache()
        )
        
        # known ages are copied as-is; unionByName matches columns by name, not by position
        return null_age_df.unionByName(
            not_null_age_df.withColumn("Age_imputed", col("Age"))
        )
    
pipe_extractFeatures = Pipeline(
    stages = [
        ExtractAccompaniedFeature(),
        ExtractTitle(),
        HandleMissingEmbarked(),
        
        # one-hot encode Pclass & Sex: they are the inputs of the Age imputation
        StringIndexer(inputCol = "Pclass", outputCol='Pclass_indexed', handleInvalid='keep'),
        StringIndexer(inputCol = "Sex", outputCol='Sex_indexed', handleInvalid='keep'),
        OneHotEncoder(inputCol = "Pclass_indexed", outputCol='Pclass_encoded', handleInvalid='keep'),
        OneHotEncoder(inputCol = "Sex_indexed", outputCol='Sex_encoded', handleInvalid='keep'),
        
        HandleMissingAge(),
        Imputer(inputCol = "Fare", outputCol='Fare_imputed')       
])

keep_features = ["PassengerId", "Survived", "Pclass_encoded", "Title", "Sex_encoded", 
                 "Age_imputed", "Fare_imputed", "Embarked", "Accompanied"]

pipe_extractFeatures_fitted = pipe_extractFeatures.fit(train_df)

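def extract_features(df):
    # apply the fitted feature pipeline, then keep only the selected columns
    # ("Survived" is absent from the test set, so only existing columns are kept)
    transformed = pipe_extractFeatures_fitted.transform(df)
    return transformed.select([c for c in transformed.columns if c in keep_features])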


train = extract_features(train_df).cache()
test = extract_features(test_df).cache()
train_df.unpersist()
test_df.unpersist()

print("train dataset - features extracted")
train.show(5)

print("train dataset - nb missing values per column")
display_missing_values(train)

print("test dataset - features extracted")
test.show(5)

print("test dataset - nb missing values per column")
display_missing_values(test)

train dataset - features extracted
+-----------+--------+--------+-----------+-----+--------------+-------------+------------------+------------+
|PassengerId|Survived|Embarked|Accompanied|Title|Pclass_encoded|  Sex_encoded|       Age_imputed|Fare_imputed|
+-----------+--------+--------+-----------+-----+--------------+-------------+------------------+------------+
|          6|       0|       Q|          0|  Mr.| (4,[0],[1.0])|(3,[0],[1.0])|26.497742339152797|      8.4583|
|         18|       1|       S|          0|  Mr.| (4,[2],[1.0])|(3,[0],[1.0])| 31.81582649258425|        13.0|
|         20|       1|       C|          0| Mrs.| (4,[0],[1.0])|(3,[1],[1.0])|21.982979570371622|       7.225|
|         27|       0|       C|          0|  Mr.| (4,[0],[1.0])|(3,[0],[1.0])|26.497742339152797|       7.225|
|         29|       1|       Q|          0|Miss.| (4,[0],[1.0])|(3,[1],[1.0])|21.982979570371622|      7.8792|
+-----------+--------+--------+-----------+-----+--------------+-------------
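The title logic can be checked outside Spark with Python's `re` module. The sketch below mimics `regexp_extract(Name, pattern, 1)`, which returns an empty string when nothing matches, followed by `replace_empty`; `extract_title` and `normalize_title` are hypothetical helpers, the latter repeating the mapping of the `replace_title` UDF, and the names are made up. Note that `replace_title` is defined in the cell above but is not applied inside `ExtractTitle`; the last line shows the effect it would have if it were applied to `Name` first.

```python
import re

TITLES = r"(Mr\.|Mrs\.|Miss\.|Master\.|Dr\.|Rev\.)"


def normalize_title(name):
    # same mapping as the replace_title UDF: rare titles -> Mrs. / Miss. / Mr.
    for pattern, replacement in [
        (r"(Mme\.|Ms\.|Countess\.|Lady\.)", "Mrs."),
        (r"(Mlle\.)", "Miss."),
        (r"(Don\.|Major\.|Sir\.|Col\.|Capt\.)", "Mr."),
    ]:
        if re.search(pattern, name):
            return re.sub(pattern, replacement, name)
    return name


def extract_title(name):
    # first match of the title group, or "No-Title" when regexp_extract would return ""
    match = re.search(TITLES, name)
    return match.group(1) if match else "No-Title"


print(extract_title("Braund, Mr. Owen Harris"))           # Mr.
print(extract_title("Doe, Mme. Jane"))                    # No-Title
print(extract_title(normalize_title("Doe, Mme. Jane")))   # Mrs.
```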

# Model Selection

## Training of RandomForest, GBT and MLP classifiers

We train these three models using 3-fold cross-validation.  
Hyperparameter tuning is set aside for now; we will perform it later on a cloud Hadoop cluster (see the GCP section).
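The value printed below as `avgMetrics[0]` is the mean of the evaluation metric over the folds: each fold is held out once for validation while the model is fit on the remaining folds. A stdlib sketch of that loop with accuracy as the metric; the round-robin fold assignment and the toy majority-class "model" are assumptions for illustration (Spark's `CrossValidator` assigns rows to folds randomly).

```python
from collections import Counter


def accuracy(y_true, y_pred):
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)


def fit_majority_class(labels):
    # toy "model": always predicts the most frequent training label
    majority = Counter(labels).most_common(1)[0][0]
    return lambda n: [majority] * n


def k_fold_average_accuracy(labels, k=3):
    folds = [list(range(i, len(labels), k)) for i in range(k)]  # round-robin assignment
    scores = []
    for held_out in folds:
        held = set(held_out)
        train_labels = [y for i, y in enumerate(labels) if i not in held]
        validation_labels = [labels[i] for i in held_out]
        predict = fit_majority_class(train_labels)
        scores.append(accuracy(validation_labels, predict(len(validation_labels))))
    return sum(scores) / k  # average over the k folds


labels = [0, 0, 0, 0, 1, 0, 0, 1, 0]
print(k_fold_average_accuracy(labels, k=3))  # (1 + 1/3 + 1) / 3, about 0.778
```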

In [4]:
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier, MultilayerPerceptronClassifier
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

preprocess = Pipeline(
    stages = [
        StringIndexer(inputCol = "Embarked", outputCol='Embarked_indexed', handleInvalid='keep'),
        StringIndexer(inputCol = "Title", outputCol='Title_indexed', handleInvalid='keep'),
        OneHotEncoder(inputCol = "Embarked_indexed", outputCol='Embarked_encoded', handleInvalid='keep'),
        OneHotEncoder(inputCol = "Accompanied", outputCol='Accompanied_encoded', handleInvalid='keep'),
        OneHotEncoder(inputCol = "Title_indexed", outputCol='Title_encoded', handleInvalid='keep'),
        VectorAssembler(inputCols = ["Fare_imputed"], outputCol='Fare_vect'),
        StandardScaler(withMean = True, inputCol = "Fare_vect", outputCol='Fare_std'),
        VectorAssembler(inputCols = ["Age_imputed"], outputCol='Age_vect'),
        StandardScaler(withMean = True, inputCol = "Age_vect", outputCol='Age_std'),        
])

inputCols = [
    "Pclass_encoded", 
    "Sex_encoded", 
    "Embarked_encoded", 
    "Accompanied_encoded", 
    "Title_encoded",
    "Age_std", 
    "Fare_std"]

gbt = GBTClassifier(
    featuresCol='features',
    labelCol = "Survived")

rf = RandomForestClassifier(
    featuresCol='features',
    labelCol = "Survived")

va = VectorAssembler(
    inputCols = inputCols,
    outputCol='features'
)

# here we have 23 input features: the 7 (already encoded) columns of inputCols, once assembled
# can be checked with: 
# transformed = preprocess.fit(train).transform(train)
# va.transform(transformed).schema["features"].metadata["ml_attr"]
layers = [23, 100, 2]

mlp = MultilayerPerceptronClassifier(
    featuresCol='features',
    labelCol = "Survived",
    layers=layers
)

# Yes, accuracy is rather a poor evaluation metric
# but it is the metric defined in the Titanic Kaggle competition
# the only way to get an accuracy evaluator seems to be MulticlassClassificationEvaluator
accuracyEvaluator = MulticlassClassificationEvaluator(labelCol="Survived", metricName="accuracy")
    

trainedModels = dict()
for classifier, classifier_name in zip( [mlp, rf, gbt], ["mlp", "rf", "gbt"]):
    
    clf = Pipeline(
        stages = [
            preprocess,
            va,
            classifier
        ])

    # no parameter tuning for now, will perform it later on a GCP Hadoop cluster
    paramGrid = ParamGridBuilder().build()

    # example of parameter tuning for GBT:
    # paramGrid = (ParamGridBuilder()
    #     .addGrid(gbt.stepSize, [0.001, 0.03, 0.1, 0.3])
    #     .addGrid(gbt.maxDepth, list(range(3, 10)))
    #     .build())

    cv = CrossValidator(
        estimator=clf, 
        estimatorParamMaps=paramGrid, 
        evaluator=accuracyEvaluator, 
        numFolds=3, 
        parallelism=2)

    cv_model = cv.fit(train)
    trainedModels[classifier_name] = cv_model.bestModel
    print(f"[{classifier_name}] avg accuracy:\n{cv_model.avgMetrics[0]}\n")


[mlp] avg accuracy:
0.8020948608210172

[rf] avg accuracy:
0.8196452797511349

[gbt] avg accuracy:
0.8174916361138569
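The first entry of `layers` in the MLP must equal the length of the assembled `features` vector. The arithmetic below shows one way to arrive at 23, consistent with the vector sizes visible in the feature-extraction output (4 slots for 3 Pclass values, 3 slots for 2 sexes): each `StringIndexer` + `OneHotEncoder` pair with `handleInvalid='keep'` yields one slot per observed value plus one. The counts of observed values (3 ports, 7 title values including "No-Title") and the 2 slots for `Accompanied`, which is one-hot encoded without a `StringIndexer`, are assumptions; the metadata check in the code comment of the cell above is the authoritative way to confirm them.

```python
# number of distinct observed values per indexed categorical column (assumed, see above)
observed_values = {"Pclass": 3, "Sex": 2, "Embarked": 3, "Title": 7}

slots = {name: n + 1 for name, n in observed_values.items()}  # StringIndexer + OneHotEncoder, 'keep'
slots["Accompanied"] = 2  # 0/1 integer column one-hot encoded directly (assumption)
slots["Age_std"] = 1      # scalar scaled by StandardScaler
slots["Fare_std"] = 1

n_features = sum(slots.values())
layers = [n_features, 100, 2]  # input size, one hidden layer, 2 output classes (Survived 0/1)
print(slots)
print(layers)  # [23, 100, 2]
```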



In [5]:
# print(gbt.explainParams())

## Majority Vote Ensemble Estimator

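Here we implement a PySpark equivalent of sklearn.ensemble.VotingClassifier with hard (majority) voting.  
The goal of such an ensemble is to obtain better overall performance and to reduce overfitting: combining models whose errors are only partly correlated tends to lower the variance of the predictions.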
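With hard voting, each model contributes its predicted label and the ensemble returns the label chosen by most models. For binary 0/1 labels this reduces to the test used by the `majority_vote_prediction` UDF in the next cell, mean of the votes >= 0.5 (with an even number of models, a tie resolves to 1). With three models, a passenger is misclassified only if at least two of them are wrong. A plain-Python sketch with made-up predictions:

```python
from statistics import mean


def majority_vote(votes):
    # votes: 0/1 predictions from the individual models for one passenger
    return float(mean(votes) >= 0.5)


# hypothetical per-passenger predictions from (mlp, rf, gbt)
per_passenger_votes = [(0, 0, 1), (1, 1, 0), (1, 1, 1), (0, 0, 0)]
print([majority_vote(v) for v in per_passenger_votes])  # [0.0, 1.0, 1.0, 0.0]
print(majority_vote((0, 1)))                            # 1.0: ties go to class 1
```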

In [6]:
import numpy as np
from pyspark.sql.functions import array

@udf(returnType = DoubleType())
def majority_vote_prediction(predictions):
    return float(np.mean(predictions) >= 0.5)

@udf(returnType = DoubleType())
def majority_vote_proba(predictions, probas):
    prediction = int(np.mean(predictions) >= 0.5)
    if prediction == 0:
        # high probability for the class [Survived == 0]
        return float(np.max(probas))
    else:
        return float(np.min(probas))
    
# Custom implementation of a majority vote Ensemble of trained ML models,
# intended to mirror scikit-learn's VotingClassifier with voting="hard"
# TODO: make it more generic to fit any ensemble of fitted models: currently tied to the Titanic column names
# It only combines models that are already fitted, hence a Model (i.e. a Transformer) rather than an Estimator

class VotingClassifier(Model):
    
    def __init__(self, fittedModels):
        super().__init__()
        self.fittedModels = fittedModels
    
    @staticmethod
    def concatenate(final_df, current_df):
        kept = ["PassengerId", "prediction", "probability"]
        if final_df is None:
            # first model: also keep the "Survived" label column (absent from the test set)
            return current_df.select([c for c in current_df.columns if c in kept + ["Survived"]])
        return final_df.join(current_df.select(kept), "PassengerId")
        
    def _transform(self, dataset):
        all_predictions_df = None
        for model_name, model in self.fittedModels.items():
            # join each model's prediction/probability on PassengerId, renamed after the model
            all_predictions_df = (
                VotingClassifier.concatenate(all_predictions_df, model.transform(dataset))
                .withColumnRenamed("prediction", f"{model_name}_prediction")
                .withColumnRenamed("probability", f"{model_name}_probability")
            )
        
        predictionCols = [c for c in all_predictions_df.columns if c.endswith("_prediction")]
        probabilityCols = [c for c in all_predictions_df.columns if c.endswith("_probability")]
        return (
            all_predictions_df
            .withColumn(
                "prediction", 
                majority_vote_prediction(array([col(c) for c in predictionCols])))
            .withColumn(
                "probability", 
                majority_vote_proba(array([col(c) for c in predictionCols]), array([col(c) for c in probabilityCols])))
        )
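scikit-learn's `VotingClassifier` also offers `voting='soft'`, which averages the models' class-probability vectors and picks the class with the highest average instead of counting votes. The class above does not do this; the sketch below, on hypothetical probability vectors `[P(Survived=0), P(Survived=1)]` for a single passenger, shows how the two rules can disagree.

```python
from statistics import mean


def hard_vote(probas):
    # each model votes for its most probable class; ties between models go to class 1
    votes = [max(range(len(p)), key=lambda k: p[k]) for p in probas]
    return int(mean(votes) >= 0.5)


def soft_vote(probas):
    # average each class probability over the models, then take the argmax
    n_classes = len(probas[0])
    averaged = [mean(p[k] for p in probas) for k in range(n_classes)]
    return max(range(n_classes), key=lambda k: averaged[k]), averaged


# hypothetical outputs of three models for one passenger
probas = [[0.55, 0.45], [0.52, 0.48], [0.05, 0.95]]
print(hard_vote(probas))  # 0: two of the three models lean towards class 0
label, averaged = soft_vote(probas)
print(label)              # 1: the confident third model dominates the average
```

In PySpark, one possible route (not used in this notebook) is to turn each `*_probability` vector column into an array with `pyspark.ml.functions.vector_to_array` (Spark 3.0+) and average the elements with column arithmetic.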


## Make predictions with the ensemble model 

In [None]:
votingClf = VotingClassifier(trainedModels)
final_prediction = votingClf.transform(test).cache()
final_prediction.show(5)
    
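# store final prediction for Kaggle submission
# sort first, then coalesce(1): the single part file is written in PassengerId order
# (orderBy after coalesce(1) reshuffles the data into new partitions, which can yield several part files)
(
    final_prediction
    .select(col("PassengerId"), col("prediction").cast(IntegerType()).alias("Survived"))
    .orderBy("PassengerId")
    .coalesce(1)
    .write.csv(
        "../submissions/titanic_ensemble_rf_gb_mlp_spark.csv",
        mode = "overwrite",
        header = True
    )
)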
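`DataFrame.write.csv` writes a directory (here named `titanic_ensemble_rf_gb_mlp_spark.csv`) containing a `part-*.csv` file plus marker files such as `_SUCCESS`, whereas a Kaggle submission is a single CSV file. A stdlib sketch that copies the single part file to a flat file; `collect_single_csv` and the target file name are hypothetical, and it assumes `coalesce(1)` produced exactly one part file.

```python
import glob
import os
import shutil


def collect_single_csv(spark_output_dir, target_path):
    """Copy the only part-*.csv file written by Spark into a standalone CSV file."""
    parts = sorted(glob.glob(os.path.join(spark_output_dir, "part-*.csv")))
    if len(parts) != 1:
        raise ValueError(f"expected exactly one part file, found {len(parts)}")
    shutil.copyfile(parts[0], target_path)
    return target_path


# hypothetical usage, next to the notebook's output directory:
# collect_single_csv("../submissions/titanic_ensemble_rf_gb_mlp_spark.csv",
#                    "../submissions/submission.csv")
```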