# Titanic Survivors Machine Learning Application

#![Spark Logo](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png) + ![Python Logo](http://spark-mooc.github.io/web-assets/images/python-logo-master-v3-TM-flattened_small.png) + ![titanic](https://upload.wikimedia.org/wikipedia/commons/thumb/f/fd/RMS_Titanic_3.jpg/300px-RMS_Titanic_3.jpg)


**This notebook covers:**
* *Part 0: Import Libraries to Use for this Notebook*
* *Part 1: Import Titanic Dataset as a Spark Dataframe*
* *Part 2: Choosing Features and Cleaning Data*
* *Part 3: Working with Categorical Features and Generating Training/Testing Datasets*
* *Part 4: Train and Test a Logistic Regression Model*
* *Part 5: Train and Test a Decision Tree Model*
* *Part 6: Comparing the Logistic Regression Model to the Decision Tree Model*


## Dataset Definition
This notebook uses the Titanic dataset from [Kaggle](https://www.kaggle.com/c/titanic/data).

### Data Dictionary
* survival - Survival (0 = No, 1 = Yes)
* pclass - Ticket class (1 = 1st, 2 = 2nd, 3 = 3rd)
* sex - Sex
* age - Age in years
* sibsp - # of siblings / spouses aboard the Titanic
* parch - # of parents / children aboard the Titanic
* ticket - Ticket number
* fare - Passenger fare
* cabin - Cabin number
* embarked - Port of Embarkation (C = Cherbourg, Q = Queenstown, S = Southampton)

#### Variable Notes
* pclass: A proxy for socio-economic status (SES)
  * 1st = Upper
  * 2nd = Middle
  * 3rd = Lower
* age: Age is fractional if less than 1. If the age is estimated, it is in the form xx.5
* sibsp: The dataset defines family relations in this way...
  * Sibling = brother, sister, stepbrother, stepsister
  * Spouse = husband, wife (mistresses and fiancés were ignored)
* parch: The dataset defines family relations in this way...
  * Parent = mother, father
  * Child = daughter, son, stepdaughter, stepson
  * Some children travelled only with a nanny, therefore parch=0 for them.

# Part 0: Import Libraries to use for this Notebook

In [3]:
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType, StringType, IntegerType
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Part 1: Import Titanic Dataset as a Spark Dataframe

## Create a table named "titanic" from the train.csv file from Kaggle

In [5]:
# Read data from the newly created titanic table into a dataframe
titanic_df = sqlContext.sql("SELECT * FROM titanic")
display(titanic_df)

# Part 2: Choosing Features and Cleaning Data

## What are we predicting?
Our goal is to predict whether a passenger survived the sinking of the Titanic, given features describing that passenger. The label we are trying to predict is the __Survived__ column.

## Feature Columns
* __Pclass__: Categorical
* __Sex__: Categorical
* __Age__: Float
* __SibSp__: Categorical
* __Parch__: Categorical
* __Fare__: Float
* __Embarked__: Categorical

## Cleaning Data Before Moving On
Some of the columns we will be using contain null values. We will select only the columns used as features or as the label, cast them to the types we need, and keep only the rows that have no null values. The filtered dataframe will be called __filtered_df__.

__Note:__ In a real world application it may be desirable to transform null column values to non-null values depending on the context of the application and the data.
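
One common way to transform nulls instead of dropping rows is mean imputation. The sketch below shows the idea in plain Python on made-up values (the `ages` list and the `impute_mean` helper are illustrative assumptions, not part of this notebook). In Spark, something like `DataFrame.fillna` or `pyspark.ml.feature.Imputer` could likely play the same role.

```python
# Minimal sketch: mean imputation for a numeric column with missing values.
# `ages` is made-up illustrative data, not rows from the Titanic dataset.

def impute_mean(values):
    """Replace None entries with the mean of the non-missing entries."""
    present = [v for v in values if v is not None]
    if not present:
        raise ValueError("cannot impute: every value is missing")
    mean = sum(present) / float(len(present))
    return [mean if v is None else v for v in values]

ages = [22.0, None, 38.0, None, 30.0]
print(impute_mean(ages))
```

Whether imputation is appropriate depends on the column: filling a missing age with the mean keeps the row but also hides the fact that the value was unknown.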

In [7]:
# Select only the columns we will use for features or label
tmp_df = titanic_df.select(col('Survived').cast(IntegerType()).alias('label'), 
                           col('Pclass').cast(StringType()).alias('Pclass'), 
                           col('Sex').cast(StringType()).alias('Sex'), 
                           col('Age').cast(DoubleType()).alias('Age'),
                           col('SibSp').cast(StringType()).alias('SibSp'), 
                           col('Parch').cast(StringType()).alias('Parch'), 
                           col('Fare').cast(DoubleType()).alias('Fare'),
                           col('Embarked').cast(StringType()).alias('Embarked')
                          )

# Drop any rows from tmp_df that have a null value in any of their columns
filtered_df = tmp_df.dropna(how='any')

print("%d rows in original dataset\n%d rows in filtered dataset\n\n%d rows filtered with null values" % (titanic_df.count(), filtered_df.count(), titanic_df.count()-filtered_df.count()))

In [8]:
display(filtered_df)

# Part 3: Working with Categorical Features and Generating Training/Testing Datasets
Categorical features need special treatment because their categories have no inherent numeric order.
For example, the __Sex__ column contains the values male and female, which could be mapped to 0 and 1, but integer codes suggest an ordering and a distance between categories that do not exist. The problem is clearer for a column with more than two values, such as __Embarked__ (C, Q, S): mapping these to 0, 1, 2 would imply that Q lies between C and S.
It is better to use [one hot encoding](https://en.wikipedia.org/wiki/One-hot), which converts each feature into an _n_-dimensional vector, where _n_ is the number of different categories for the feature.

Therefore, for the __Sex__ column, one hot encoding gives:

male   -> [0,1]
female -> [1,0]
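
The mapping above can be sketched in a few lines of plain Python. The `one_hot` helper and the fixed category order are assumptions made for illustration; Spark's own encoder is used later in this notebook.

```python
# Minimal sketch of one hot encoding with a fixed, known list of categories.

def one_hot(value, categories):
    """Return a list with a 1 at the position of `value` and 0 elsewhere."""
    if value not in categories:
        raise ValueError("unknown category: %r" % (value,))
    return [1 if c == value else 0 for c in categories]

sex_categories = ['female', 'male']      # category order is an assumption
print(one_hot('male', sex_categories))
print(one_hot('female', sex_categories))

embarked_categories = ['C', 'Q', 'S']    # three categories -> 3-dimensional vector
print(one_hot('Q', embarked_categories))
```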

## One hot encoding in Spark
Let us now use Spark to apply one hot encoding on the __Sex__ column of data.

### Step 1: Get index from categorical columns
Convert each string value to a numeric index. By default, StringIndexer assigns index 0 to the most frequent value. For example:

data = ['male', 'male', 'female', 'male', 'female']

would become

indexedData = [0, 0, 1, 0, 1]
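
A rough plain-Python sketch of this indexing step is shown below. The `fit_string_index` helper is hypothetical, and the alphabetical tie-breaking is an assumption; it only mimics the frequency-based ordering described above.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct string to an index, most frequent value first."""
    counts = Counter(values)
    # Sort by descending frequency; ties are broken alphabetically (assumption)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return dict((value, i) for i, value in enumerate(ordered))

data = ['male', 'male', 'female', 'male', 'female']
mapping = fit_string_index(data)             # learned once from the data
indexed_data = [mapping[v] for v in data]    # then applied to every value
print(mapping)
print(indexed_data)
```

Note the two phases: the mapping is learned from the data first and only then applied, which is why the Spark code below calls `.fit()` before `.transform()`.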

In [10]:
# Create a StringIndexer object for the Sex column and create an indexed Sex_numeric column
indexer = StringIndexer(inputCol="Sex", outputCol="Sex_numeric").fit(filtered_df)

# Use new indexer object to transform the filtered_df dataframe, adding a new column to it
indexed_df = indexer.transform(filtered_df)

# Display only the Sex and Sex_numeric columns for clarity
display(indexed_df.select(col('Sex'), col('Sex_numeric')))

### Step 2: Get One Hot Encoding Vector for Indexed Categorical Data

We will now use the indexed categorical data to create a one hot encoding vector for the __Sex__ column. Spark stores these as sparse vectors, which record only the size of the vector plus the positions and values of its non-zero entries.

For example, as rendered by display():

[0, 0, 1, 2] -> (0, 4, [2, 3], [1, 2]) -> (vector_type, size, [non-zero_indices], [non-zero_values]), where a vector_type of 0 marks a sparse vector
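
To make the sparse layout concrete, here is a small plain-Python sketch that converts a dense list to its size, non-zero indices, and non-zero values, and back again. The helper names are illustrative assumptions; the three pieces are the same ones that `pyspark.ml.linalg.SparseVector(size, indices, values)` takes.

```python
def to_sparse(dense):
    """Return (size, indices, values), keeping only the non-zero entries."""
    indices = [i for i, v in enumerate(dense) if v != 0]
    values = [dense[i] for i in indices]
    return (len(dense), indices, values)

def to_dense(size, indices, values):
    """Rebuild the full list from its sparse description."""
    dense = [0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

sparse = to_sparse([0, 0, 1, 2])
print(sparse)
print(to_dense(*sparse))
```

The saving matters for one hot vectors, which contain at most a single non-zero entry regardless of how many categories there are.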

In [12]:
# Create a OneHotEncoder object for the Sex_numeric column as Sex_vector column
encoder = OneHotEncoder(inputCol="Sex_numeric", outputCol="Sex_vector")

# Use new encoder object to transform the indexed_df dataframe, adding a new column to it
encoded_df = encoder.transform(indexed_df)

# Display only the Sex, Sex_numeric, and Sex_vector columns for clarity
display(encoded_df.select(col('Sex'), col('Sex_numeric'), col('Sex_vector')))

### Step 3: Setup a Pipeline to Encode all Categorical Variables
Spark offers a [Pipeline workflow](http://spark.apache.org/docs/latest/ml-pipeline.html) for use with Dataframes.

#### Main Components
* Transformers: Transform a Dataframe into another Dataframe using .transform(), typically through adding a column.
* Estimators: An algorithm which can be .fit() onto a Dataframe to produce a Transformer.
* Pipeline: Object containing stages of Transformers and Estimators.

We have already used 2 different Transformers and an Estimator to get our categorical feature vector.
* __StringIndexer__ (Estimator) was fit to the filtered_df Dataframe to produce a Transformer, __indexer__.
* __indexer__ (Transformer) was used to transform the filtered_df Dataframe to produce a new Dataframe, indexed_df.
* __OneHotEncoder__ (Transformer) was used to transform the indexed_df Dataframe into the encoded_df Dataframe. (In Spark 3.0 and later, OneHotEncoder is an Estimator and must be fit before it can transform.)
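
The relationship between the three components can be sketched with toy classes. Everything below is a hypothetical stand-in written for illustration (rows are plain dicts, not a Spark Dataframe), so it shows the shape of the fit/transform contract rather than how Spark implements it.

```python
# Toy illustration of the Estimator / Transformer / Pipeline contract.

class ToyIndexerModel(object):
    """Transformer: adds an index column using an already-learned mapping."""

    def __init__(self, input_col, output_col, mapping):
        self.input_col = input_col
        self.output_col = output_col
        self.mapping = mapping

    def transform(self, rows):
        out = []
        for row in rows:
            new_row = dict(row)  # copy so the input rows are left untouched
            new_row[self.output_col] = self.mapping[row[self.input_col]]
            out.append(new_row)
        return out


class ToyIndexer(object):
    """Estimator: fit() learns a mapping and returns a Transformer."""

    def __init__(self, input_col, output_col):
        self.input_col = input_col
        self.output_col = output_col

    def fit(self, rows):
        # Alphabetical order keeps the toy simple (not frequency order)
        distinct = sorted(set(row[self.input_col] for row in rows))
        mapping = dict((value, i) for i, value in enumerate(distinct))
        return ToyIndexerModel(self.input_col, self.output_col, mapping)


class ToyPipelineModel(object):
    """Fitted pipeline: a chain of Transformers applied in order."""

    def __init__(self, transformers):
        self.transformers = transformers

    def transform(self, rows):
        for transformer in self.transformers:
            rows = transformer.transform(rows)
        return rows


class ToyPipeline(object):
    """Pipeline: fits each Estimator stage in turn on the running output."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, 'fit'):
                stage = stage.fit(rows)   # Estimator -> Transformer
            rows = stage.transform(rows)  # output feeds the next stage
            fitted.append(stage)
        return ToyPipelineModel(fitted)


rows = [{'Sex': 'male'}, {'Sex': 'female'}, {'Sex': 'male'}]
model = ToyPipeline([ToyIndexer('Sex', 'Sex_indexed')]).fit(rows)
print(model.transform(rows))
```

The point of the design is that fitting happens once, and the resulting model can then transform any dataset with the same columns, including data it was not fit on.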

In [14]:
# Columns containing categorical data
# Create binary features with OneHotEncoder for these columns
cols = ['Pclass', 'Sex', 'SibSp', 'Parch', 'Embarked']

# Create a list of indexers, 1 per categorical column
indexers = [
    StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
    for c in cols
]

# Create a list of encoders, 1 per indexer
encoders = [
    OneHotEncoder(
        inputCol=indexer.getOutputCol(),
        outputCol="{0}_encoded".format(indexer.getOutputCol())) 
    for indexer in indexers
]

# Create singular feature vector starting with numerical columns
# and appending encoded features
assembler = VectorAssembler(
    inputCols=['Age', 'Fare'] + [encoder.getOutputCol() for encoder in encoders],
    outputCol="features")

# Create pipeline with all stages
pipeline_stages = indexers + encoders + [assembler]
pipeline = Pipeline(stages=pipeline_stages)

# Create a pipeline model by fitting pipeline to filtered_df
pipeline_model = pipeline.fit(filtered_df)

# Get a new Dataframe with only the features and label columns using pipeline_model to transform filtered_df
features_label_df = pipeline_model.transform(filtered_df).select(col('features'), col('label'))

display(features_label_df)

### Step 4: Splitting the Dataframe into training and testing data
Now that we have a Dataframe with a __features__ and __label__ column, we are ready to split our data into testing and training sets, train a model using the training data, and test the model using the testing data.
We will use a 70/30 split for training/testing data. We would like to use as much data as possible to train the model while still holding out enough testing data to get reliable estimates of the model's performance. The split is random, so the actual proportions will be close to 70/30 but not exact.
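
The sketch below illustrates, in plain Python, why a random split of this kind gives approximate rather than exact proportions: each row is assigned independently. The `random_split` helper is a hypothetical illustration, not Spark's implementation. For reproducible results in Spark, `randomSplit` also accepts a `seed` argument.

```python
import random

def random_split(rows, train_fraction, seed):
    """Assign each row independently to training or testing."""
    rng = random.Random(seed)  # fixed seed makes the split repeatable
    training, testing = [], []
    for row in rows:
        if rng.random() < train_fraction:
            training.append(row)
        else:
            testing.append(row)
    return training, testing

rows = list(range(1000))
training, testing = random_split(rows, 0.7, seed=42)
print("%d training rows, %d testing rows" % (len(training), len(testing)))
```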

In [16]:
# Get training and testing data randomly
(training_df, testing_df) = features_label_df.randomSplit([0.7, 0.3])
print("%d (%f percent) training data rows" % (training_df.count(), 100.0*training_df.count()/features_label_df.count()))
print("%d (%f percent) testing data rows" % (testing_df.count(), 100.0*testing_df.count()/features_label_df.count()))

# Part 4: Train and Test a Logistic Regression Model

## Step 1: Training a Logistic Regression Model
We will use [logistic regression](https://en.wikipedia.org/wiki/Logistic_regression) for our first classifier.
Logistic regression is an algorithm which outputs a probability, between 0 and 1, that a given feature vector has a label of 1.

For example, it can predict whether a student passes an exam based purely on a single feature: the number of hours the student studied.

![example logistic regression](https://upload.wikimedia.org/wikipedia/commons/6/6d/Exam_pass_logistic_curve.jpeg)
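
The curve in the figure is the logistic (sigmoid) function applied to a weighted sum of the features. A minimal sketch follows; the weight and intercept are invented numbers chosen only to produce a curve of this shape, not values fitted to any dataset.

```python
import math

def sigmoid(z):
    """Squash any real number into the interval (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

def predict_probability(features, weights, intercept):
    """Probability of label 1: sigmoid of the weighted sum of the features."""
    z = intercept + sum(w * x for w, x in zip(weights, features))
    return sigmoid(z)

# Hypothetical model with a single feature: hours studied
weights = [1.5]     # invented for illustration
intercept = -4.0    # invented for illustration

for hours in [1.0, 2.0, 3.0, 4.0]:
    p = predict_probability([hours], weights, intercept)
    print("hours=%.1f -> P(pass)=%.3f" % (hours, p))
```

Training a logistic regression model amounts to choosing the weights and intercept that best fit the labeled training data; a hard 0/1 prediction is then obtained by thresholding the probability, commonly at 0.5.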

In [18]:
# Create a Logistic Regression Estimator with default parameters
lr = LogisticRegression()

# Fit the Logistic Regression Estimator with the training data to create a Logistic Regression Model Transformer
lr_model = lr.fit(training_df)

# Get Dataframe of predictions using the lr_model Transformer on the testing data
lr_predictions_df = lr_model.transform(testing_df)

display(lr_predictions_df)

## Step 2: Testing the Logistic Regression Model

We will evaluate the model using the area under the curve (AUC) for both the Precision-Recall (PR) curve and the Receiver Operating Characteristic (ROC) curve. For both metrics, values closer to 1 indicate a better classifier.

__References__:
* [Precision-Recall curve](https://en.wikipedia.org/wiki/Precision_and_recall)
* [Receiver Operating Characteristic curve](https://en.wikipedia.org/wiki/Receiver_operating_characteristic)
* [Connection between PR and ROC curves](http://pages.cs.wisc.edu/~jdavis/davisgoadrichcamera2.pdf)
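
One way to build intuition for AUC-ROC: it equals the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The sketch below computes it that way in plain Python on made-up labels and scores. This pairwise version is for illustration only and is an assumption about how to present the metric, not how Spark's evaluator computes it.

```python
def auc_roc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count half."""
    positives = [s for l, s in zip(labels, scores) if l == 1]
    negatives = [s for l, s in zip(labels, scores) if l == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Made-up labels and predicted probabilities
labels = [1, 0, 1, 0]
scores = [0.9, 0.8, 0.4, 0.1]
print(auc_roc(labels, scores))
```

A score of 0.5 corresponds to random ranking and 1.0 to a perfect ranking of positives above negatives.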

In [20]:
# Create an evaluation object
lr_evaluator = BinaryClassificationEvaluator()

# Get the AUC-ROC and AUC-PR values for the prediction Dataframe
lr_AUC_ROC = lr_evaluator.evaluate(lr_predictions_df, {lr_evaluator.metricName: "areaUnderROC"})
lr_AUC_PR = lr_evaluator.evaluate(lr_predictions_df, {lr_evaluator.metricName: "areaUnderPR"})

print("Area under the Precision Recall Curve: %f" % (lr_AUC_PR))
print("Area under the Receiver Operating Characteristic Curve: %f" % (lr_AUC_ROC))

# Part 5: Train and Test a Decision Tree Model

## Step 1: Training a Decision Tree Model
We will use a [decision tree](https://en.wikipedia.org/wiki/Decision_tree) for our second classifier.
A decision tree classifier assigns a hard label (0 or 1) to a given feature vector by following a sequence of tests on individual features from the root of the tree down to a leaf.

![example decision tree](https://upload.wikimedia.org/wikipedia/commons/f/f3/CART_tree_titanic_survivors.png)
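
A fitted decision tree can be read as nested if/else rules. The sketch below hand-writes a tiny tree in that form; the features tested and the thresholds are illustrative assumptions, not the tree that `DecisionTreeClassifier` learns from this data.

```python
def toy_tree_predict(passenger):
    """Return 1 (survived) or 0 (did not survive) by walking fixed rules."""
    if passenger['Sex'] == 'female':
        return 1                  # leaf
    if passenger['Age'] > 9.5:    # illustrative threshold
        return 0                  # leaf
    if passenger['SibSp'] > 2:    # illustrative threshold
        return 0                  # leaf
    return 1                      # leaf

# Made-up passengers for illustration
print(toy_tree_predict({'Sex': 'female', 'Age': 30.0, 'SibSp': 0}))
print(toy_tree_predict({'Sex': 'male', 'Age': 40.0, 'SibSp': 1}))
print(toy_tree_predict({'Sex': 'male', 'Age': 5.0, 'SibSp': 0}))
```

Training a tree means choosing which feature and threshold to test at each node so that the resulting leaves separate the labels as cleanly as possible.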

In [22]:
# Create a Decision Tree Estimator with default parameters
dt = DecisionTreeClassifier()

# Fit the Decision Tree Estimator with the training data to create a Decision Tree Model Transformer
dt_model = dt.fit(training_df)

# Get Dataframe of predictions using the dt_model Transformer on the testing data
dt_predictions_df = dt_model.transform(testing_df)

display(dt_predictions_df)

## Step 2: Testing the Decision Tree Model

We will again use the area under the curve (AUC) for both the Precision-Recall (PR) curve and the Receiver Operating Characteristic (ROC) curve.

In [24]:
# Create an evaluation object
dt_evaluator = BinaryClassificationEvaluator()

# Get the AUC-ROC and AUC-PR values for the prediction Dataframe
dt_AUC_ROC = dt_evaluator.evaluate(dt_predictions_df, {dt_evaluator.metricName: "areaUnderROC"})
dt_AUC_PR = dt_evaluator.evaluate(dt_predictions_df, {dt_evaluator.metricName: "areaUnderPR"})

print("Area under the Precision Recall Curve: %f" % (dt_AUC_PR))
print("Area under the Receiver Operating Characteristic Curve: %f" % (dt_AUC_ROC))

# Part 6: Comparing the Logistic Regression Model to the Decision Tree Model

In [26]:
# For each model, compute the percentage of actual survivors predicted as survived
# and the percentage of actual non-survivors predicted as not survived
lr_sur_perr    = (lr_predictions_df.where((col('label') == 1) & (col('prediction') == 1)).count() * 100.0 /
                  lr_predictions_df.where(col('label') == 1).count())
lr_notsur_perr = (lr_predictions_df.where((col('label') == 0) & (col('prediction') == 0)).count() * 100.0 /
                  lr_predictions_df.where(col('label') == 0).count())
dt_sur_perr    = (dt_predictions_df.where((col('label') == 1) & (col('prediction') == 1)).count() * 100.0 /
                  dt_predictions_df.where(col('label') == 1).count())
dt_notsur_perr = (dt_predictions_df.where((col('label') == 0) & (col('prediction') == 0)).count() * 100.0 /
                  dt_predictions_df.where(col('label') == 0).count())

print("Logistic Regression")
print("  AUC-PR:                          %f" % (lr_AUC_PR))
print("  AUC-ROC:                         %f" % (lr_AUC_ROC))
print("  Survived percent correct:        %f" % (lr_sur_perr))
print("  Did not survive percent correct: %f" % (lr_notsur_perr))
print("\n")
print("Decision Tree")
print("  AUC-PR:                          %f" % (dt_AUC_PR))
print("  AUC-ROC:                         %f" % (dt_AUC_ROC))
print("  Survived percent correct:        %f" % (dt_sur_perr))
print("  Did not survive percent correct: %f" % (dt_notsur_perr))
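
The per-class figures computed in this cell are the share of each actual class that the model predicted correctly. The plain-Python sketch below computes the same quantity from (label, prediction) pairs; the `pairs` list is made-up data and the helper name is an assumption for illustration.

```python
def percent_correct_by_class(pairs):
    """For each actual label, the percentage of rows predicted as that label."""
    totals = {0: 0, 1: 0}
    correct = {0: 0, 1: 0}
    for label, prediction in pairs:
        totals[label] += 1
        if label == prediction:
            correct[label] += 1
    # Skip classes that never occur to avoid dividing by zero
    return dict((c, 100.0 * correct[c] / totals[c]) for c in totals if totals[c] > 0)

# Made-up (label, prediction) pairs: 4 actual survivors, 2 actual non-survivors
pairs = [(1, 1), (1, 0), (1, 1), (1, 1), (0, 0), (0, 1)]
print(percent_correct_by_class(pairs))
```

Looking at the two classes separately is useful because a model can score well overall while doing poorly on the less common class.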