# Titanic dataset

In this script we will train a model for the titanic dataset. This is an exercise that is done for 
many people who is introduced in the Machine Learning analysis. I have done this before with sklearn, pandas, and other common libraries for data analysis, but today I will make the same thing with spark.

In [1]:
from pyspark.sql import SparkSession
spark=SparkSession.builder\
.master("local[*]")\
.appName("Titanic_model")\
.getOrCreate()

You are probably asking what "local[*]" means. We pass this parameter to the method master, and it means that we´re Running Spark locally with as many worker threads as logical cores on your machine.

In [2]:
Titanic=spark.read.csv("Titanic.csv",header=True,inferSchema=True)

In [3]:
Titanic.show(4)

+------+--------+--------------------+------+------+-----+-----+------+--------+-------+--------+----+----+--------------------+
|pclass|survived|                name|   sex|   age|sibsp|parch|ticket|    fare|  cabin|embarked|boat|body|           home_dest|
+------+--------+--------------------+------+------+-----+-----+------+--------+-------+--------+----+----+--------------------+
|     1|       1|Allen, Miss. Elis...|female|  29.0|    0|    0| 24160|211.3375|     B5|       S|   2|null|        St Louis, MO|
|     1|       1|Allison, Master. ...|  male|0.9167|    1|    2|113781|  151.55|C22 C26|       S|  11|null|Montreal, PQ / Ch...|
|     1|       0|Allison, Miss. He...|female|   2.0|    1|    2|113781|  151.55|C22 C26|       S|null|null|Montreal, PQ / Ch...|
|     1|       0|Allison, Mr. Huds...|  male|  30.0|    1|    2|113781|  151.55|C22 C26|       S|null| 135|Montreal, PQ / Ch...|
+------+--------+--------------------+------+------+-----+-----+------+--------+-------+--------+

Fisrt we need to do a clean data work. We´re going to see the variables, null values, how we will impute them, etc.

We can see that if we select the home.dest column, we obtain an error, that implies that probably
we need to change the name of the column, let´s see.

First, we need to decide what columns mantain in the analysis, so then we won´t make work that doesnt is useful.

In [5]:
from pyspark.sql.functions import isnan, when, count, col


In [6]:
Titanic.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in Titanic.columns]).show()


+------+--------+----+---+---+-----+-----+------+----+-----+--------+----+----+---------+
|pclass|survived|name|sex|age|sibsp|parch|ticket|fare|cabin|embarked|boat|body|home_dest|
+------+--------+----+---+---+-----+-----+------+----+-----+--------+----+----+---------+
|     0|       0|   0|  0|263|    0|    0|     0|   1| 1014|       2| 823|1188|      564|
+------+--------+----+---+---+-----+-----+------+----+-----+--------+----+----+---------+



As we can see there are a lot of null values in body,boat,cabin,home_dest

In [7]:
numerical_columns=["age","sibsp","parch","fare"]

In [8]:
Titanic.select(numerical_columns).summary().show()

+-------+------------------+------------------+------------------+-----------------+
|summary|               age|             sibsp|             parch|             fare|
+-------+------------------+------------------+------------------+-----------------+
|  count|              1046|              1309|              1309|             1308|
|   mean|  29.8811345124283|0.4988540870893812|0.3850267379679144|33.29547928134572|
| stddev|14.413499699923596|1.0416583905961012|0.8655602753495143|51.75866823917421|
|    min|            0.1667|                 0|                 0|              0.0|
|    25%|              21.0|                 0|                 0|           7.8958|
|    50%|              28.0|                 0|                 0|          14.4542|
|    75%|              39.0|                 1|                 0|           31.275|
|    max|              80.0|                 8|                 9|         512.3292|
+-------+------------------+------------------+------------------

In [9]:
titanic_train,titanic_test=Titanic.randomSplit([0.7,0.3])

Let´s imputte the variables like this:
- Age

Impute the age with the mean of the pclass of each row.

- Cabin

Fill with an Unknown, and only place the letter of the cabin, since there are a lot.

- Embarked

The most frecuent value


- fare

It´s 1 value only, the mean of all the train data.



In [10]:
import pyspark.sql.functions as f
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

In [11]:
def impute_variable(data):
    Dictionary_means=data.select(["age","pclass"]).groupBy("pclass").mean()
    data=data.join(Dictionary_means.select(["pclass","avg(age)"]),on="pclass",how="left")
    # Round the avg(column)
    data=data.withColumn("avg(age)",f.round(data["avg(age)"],0))
    # Replace the null values with the mean of the class
    data=data.withColumn("age",f.when(data["age"].isNull(),data["avg(age)"]).otherwise(Titanic["age"]))
    ### Cabin
    udf=UserDefinedFunction(lambda x: x[0],StringType())
    # Fill the null values with an U string
    data=data.withColumn("cabin",f.when(data["cabin"].isNull(),"U").otherwise(data["cabin"]))
    # Select only the letter of the boat
    data=data.withColumn("cabin",udf(data["cabin"]))
    ### Embarked: Replace with the most common port of embarcation S
    data=data.withColumn("embarked",f.when(data["embarked"].isNull(),"S").otherwise(data["embarked"]))
    ## Fare
    mean_fare=data.agg(f.avg(f.col("fare"))).collect()[0][0]
    data=data.withColumn("fare",f.when(data["fare"].isNull(),mean_fare).otherwise(data["fare"]))
    # Now let´s drop the useless columns
    data=data.drop(*["body","home_dest","boat","name","ticket","avg(age)"])
    return(data)
    
    


Why did we create this function instead of apply to the full Titanic dataset?
Because we need to separate completly the data train and data test, and test data must not influence
to the train data.

In [12]:
titanic_train=impute_variable(titanic_train)
titanic_test=impute_variable(titanic_test)

In [13]:
Titanic=impute_variable(Titanic)

Show the result:

In [14]:
titanic_train.show()

+------+--------+------+----+-----+-----+--------+-----+--------+
|pclass|survived|   sex| age|sibsp|parch|    fare|cabin|embarked|
+------+--------+------+----+-----+-----+--------+-----+--------+
|     1|       0|  male|42.0|    0|    0|   26.55|    U|       S|
|     1|       0|female|25.0|    1|    2|  151.55|    C|       S|
|     1|       0|  male|71.0|    0|    0| 49.5042|    U|       C|
|     1|       0|  male|47.0|    1|    0| 227.525|    C|       C|
|     1|       0|  male|24.0|    0|    1|247.5208|    B|       C|
|     1|       0|  male|36.0|    0|    0| 75.2417|    C|       C|
|     1|       0|  male|25.0|    0|    0|    26.0|    U|       C|
|     1|       0|  male|41.0|    0|    0|    30.5|    A|       S|
|     1|       0|  male|48.0|    0|    0| 50.4958|    B|       C|
|     1|       0|  male|45.0|    0|    0|   26.55|    B|       S|
|     1|       0|  male|33.0|    0|    0|     5.0|    B|       S|
|     1|       0|  male|49.0|    0|    0|    26.0|    U|       S|
|     1|  

In [None]:
# te amo fer

Once we have this, we need to convert the categorical columms to integers and then apply OneHotEncoder.


In [15]:
from pyspark.ml.feature import StringIndexer

In [16]:
titanic_train.columns

['pclass',
 'survived',
 'sex',
 'age',
 'sibsp',
 'parch',
 'fare',
 'cabin',
 'embarked']

In [17]:
categorical_columns=["pclass","sex","cabin","embarked"]
numerical_columns=["age","sibsp","parch","fare"]

In [18]:
from pyspark.ml.feature import OneHotEncoder as OneHotEncoderEstimator
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler

We need to train the OHE with all the dataset, since then of spplitting the data in train,test
sometimes it can be that are columns with more categorical values in some of the two datasets.

In [19]:
SI=[StringIndexer(inputCol=column,outputCol=column+"_num").fit(Titanic) for column in categorical_columns]
categorical_columns_num=[column+"_num" for column in categorical_columns]
categorical_columns_OHE=[column+"_OHE" for column in categorical_columns]
OHE=OneHotEncoderEstimator(inputCols=categorical_columns_num,outputCols=categorical_columns_OHE)
VA=VectorAssembler(inputCols=numerical_columns+OHE.getOutputCols(),outputCol="features")
pipeline=Pipeline(stages=SI+[OHE,VA]).fit(Titanic)
Titanic=pipeline.transform(Titanic)
# Now transform the train and data sets
titanic_train=pipeline.transform(titanic_train)
titanic_test=pipeline.transform(titanic_test)

With this we have done all the necessary transformations that need  the Machine Learning algorithms, since we have prepared the data as the model required.

### Random Forest

In [20]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [44]:
RF=RandomForestClassifier(labelCol="survived", featuresCol="features", numTrees=10).fit(titanic_train)
predictions_RF=RF.transform(titanic_test)
evaluator=BinaryClassificationEvaluator(labelCol="survived",metricName="areaUnderROC")

In [45]:
print("Area Under ROC Curve is:",str(evaluator.evaluate(predictions_RF)))

Area Under ROC Curve is: 0.8431495130297444


In [92]:
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType

In [94]:
#important: need to cast to float type, and order by prediction, else it won't work
preds_and_labels = predictions.select(['prediction','survived']).withColumn('label', f.col('survived').cast(FloatType())).orderBy('prediction')

#select only prediction and label columns
preds_and_labels = preds_and_labels.select(['prediction','label'])

metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

#print(metrics.ConfusionMatrix().toArray())

In [98]:
metrics.confusionMatrix().toArray()

array([[187.,  37.],
       [ 45., 119.]])

### SVM

In [100]:
from pyspark.ml.classification import LinearSVC

In [122]:
SVC=LinearSVC(labelCol="survived",featuresCol="features",maxIter=10,regParam=0.5).fit(titanic_train)

In [123]:
predictions=SVC.transform(titanic_test)

In [124]:
print("Area Under ROC Curve is:",str(evaluator.evaluate(predictions)))

Area Under ROC Curve is: 0.8216599520905928


As we can see the AUC is worse than RandomForest.

### Logistic Regression

Now we will try Logistic Regression

In [28]:
from pyspark.ml.classification import LogisticRegression

In [31]:
Logistic_Regression_model=LogisticRegression(featuresCol="features",labelCol="survived",
                                            standardization=True).fit(titanic_train)

In [32]:
prediction=Logistic_Regression_model.transform(titanic_test)

In [34]:
print("Area Under ROC Curve is:",str(evaluator.evaluate(predictions)))

Area Under ROC Curve is: 0.8431495130297444


Finally we're going to implement Gradient-Boosted Trees algorithm for classfification.

### GBTClassifier

In [35]:
from pyspark.ml.classification import GBTClassifier

In [36]:
GBT_model=GBTClassifier(labelCol="survived",minInstancesPerNode=3).fit(titanic_train)

In [42]:
predictions_GBT=GBT_model.transform(titanic_test)

In [43]:
print("Area Under ROC Curve is:",str(evaluator.evaluate(predictions_GBT)))

Area Under ROC Curve is: 0.8625460647538823


Surprisingly the GBTClssifier has achieved a better AUC metric than the Random Forest algorithm, and the others did a great job too; but now we're going too choose one of the best of them by using recall(TPR or sensitivity),specificity and the F-Score metrics.

In [47]:
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType

In [77]:
def evaluate_model(predictions):
    #important: need to cast to float type, and order by prediction, else it won't work
    preds_and_labels = predictions.select(['prediction','survived']).withColumn('label', f.col('survived').cast(FloatType())).orderBy('prediction')
    #select only prediction and label columns
    preds_and_labels = preds_and_labels.select(['prediction','label'])
    metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
    U=metrics.confusionMatrix().toArray()
    Recall=U[1,1]/sum(U[1,])
    specificity=U[0,0]/sum(U[0,])
    Precision=U[1,1]/sum(U[:,1])
    print("The Recall is:")
    print(Recall)
    print("The Specificity is:")
    print(specificity)
    print("The precision is:")
    print(Precision)
    print("The F-Score is:")
    print((2*Precision*Recall/(Precision+Recall)))
    
    

In [80]:
evaluate_model(predictions_GBT)

The Recall is:
0.7022900763358778
The Specificity is:
0.9008620689655172
The precision is:
0.8
The F-Score is:
0.7479674796747967


In [79]:
evaluate_model(predictions_RF)

The Recall is:
0.5343511450381679
The Specificity is:
0.9439655172413793
The precision is:
0.8433734939759037
The F-Score is:
0.6542056074766355


Now the million question is: Wich one do you prefer?

I would say it depends. If we are more interested in identify the people who are more likely
of survived I choose the Random Forest, if it is the opposite I should prefer the GBT.
But if we take atention to the F-score, wich merge the two metric I should prefer the
GBTClassfier.