# Predict Survival of the Titanic Passengers

Dataset Source: [https://ww2.amstat.org/publications/jse/v3n3/datasets.dawson.html](https://ww2.amstat.org/publications/jse/v3n3/datasets.dawson.html)

## Table of Contents
- [Load Libraries](#load_libraries)
- [Access Data](#access_data)
- [Split Data into Training and Test set](#training_test)
- [Build Naive Bayes Model](#build_model)
- [Predict for Test data](#test_data)
- [Evaluate the Model](#evaluate_model)

<a id="load_libraries"></a>
## Load Libraries

The Spark and Python libraries that you need are preinstalled in the notebook environment and only need to be loaded.

Run the following cell to load the libraries you will work with in this notebook:

In [16]:
# PySpark Machine Learning Library
from pyspark.ml import Pipeline
from pyspark.ml.classification import  NaiveBayes, MultilayerPerceptronClassifier
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row, SQLContext

import os
import sys
from pyspark import SparkConf, SparkContext
from pyspark.sql.types import *

from pyspark.mllib.regression import LabeledPoint
from numpy import array

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Library for confusion matrix, precision, test error
from pyspark.mllib.evaluation import MulticlassMetrics
# Library For Area under ROC curve and Area under precision-recall curve
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# Create a SQLContext from the notebook's existing SparkContext (sc)
sqlContext = SQLContext(sc)

In [17]:
# The data will be loaded into an array.
# This is the summary of the data structure, including the column position and name.
# The first field starts at position 0.

# 0 Name    -  Passenger first and last name.
# 1 PClass  -  Passenger class (1st, 2nd, or 3rd)
# 2 Age
# 3 Sex
# 4 Survived -  1 if the passenger survived;  0 if the passenger did not survive
# 5 PersonID

# label is the target variable. PersonInfo holds the independent variables
# (PClass, Age, Sex) as one space-separated string. PersonID is the unique identifier.

LabeledDocument = Row("PersonID", "PersonInfo", "label")

# Define a function that parses the raw CSV file and returns an object of type LabeledDocument

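def parseDocument(line):
    # Split the CSV line into its six fields (positions listed above)
    values = line.split(',')
    # Survived is '0' or '1'; the label is stored as a float
    if int(values[4]) > 0:
        Survived = 1.0
    else:
        Survived = 0.0

    # Join PClass, Age and Sex into one space-separated string for the Tokenizer
    textValue = values[1] + " " + values[2] + " " + values[3]
    return LabeledDocument(values[5], textValue, Survived)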
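
The parsing step can be checked without a Spark cluster. The following is a minimal sketch, assuming the six-column layout listed above. The names `parse_document` and the `namedtuple` stand-in for `Row` are illustrative and not part of the notebook, and the `int()` conversion assumes `Survived` is always `0` or `1`.

```python
import csv
from collections import namedtuple

# Stand-in for pyspark.sql.Row so the sketch runs without Spark (assumption).
LabeledDocument = namedtuple("LabeledDocument", ["PersonID", "PersonInfo", "label"])

def parse_document(line):
    """Parse one CSV line into (PersonID, PersonInfo, label)."""
    # csv.reader handles the comma splitting; the header layout is
    # Name, PClass, Age, Sex, Survived, PersonID.
    name, pclass, age, sex, survived, person_id = next(csv.reader([line]))
    label = 1.0 if int(survived) > 0 else 0.0
    # PClass, Age and Sex become one space-separated string.
    return LabeledDocument(person_id, " ".join([pclass, age, sex]), label)

doc = parse_document("Allen Miss Elisabeth Walton,1st,29,female,1,1")
print(doc)
```

The passenger name is read but deliberately left out of `PersonInfo`, matching the notebook's choice of features.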

<a id="access_data"></a>
## Access Data
Before you can access the data file in Object Storage, you must set up the Spark configuration with your Object Storage credentials.

To do this, click on the cell below and select the **Insert to code > Insert Spark Session DataFrame** function from the Files tab below the data file you want to work with.

<div class="alert alert-block alert-info">The following code contains the credentials for a file in your IBM Cloud Object Storage. Before you run the cell, delete the inserted code from the `from pyspark.sql import SparkSession` line onward.</div>

In [18]:
# Object Storage Credentials
import ibmos2spark

# @hidden_cell
credentials = {
    'endpoint': 'https://s3-api.us-geo.objectstorage.service.networklayer.com',
    'api_key': 'lue6np0RwcvhARfJIQNtvVTUt3I45m5qk9UHM6LFTc3B',
    'service_id': 'iam-ServiceId-3ac1bdb0-c8a1-4ae8-abe0-2dd53ae4c6e9',
    'iam_service_endpoint': 'https://iam.ng.bluemix.net/oidc/token'}

configuration_name = 'os_9d550c7f6655453f915511631f5b077f_configs'
cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name, 'bluemix_cos')

Now let's load the data into a Spark RDD and output the number of rows and the first 5 rows.
Each project you create has a bucket in your object storage. You can get the bucket name from the project Settings page. In the cell below, replace the bucket name (the second argument to `cos.url`) with your own bucket name.

In [19]:
data = sc.textFile(cos.url('Titanic.csv', 'assignment331c5c83422a7434ab113135bf62f5fb0'))
print "Total records in the data set:", data.count()
print "The first 5 rows"
data.take(5)

Total records in the data set: 757
The first 5 rows


[u'Name,PClass,Age,Sex,Survived,PersonID',
 u'Allen Miss Elisabeth Walton,1st,29,female,1,1',
 u'Allison Miss Helen Loraine,1st,2,female,0,2',
 u'Allison Mr Hudson Joshua Creighton,1st,30,male,0,3',
 u'Allison Mrs Hudson JC (Bessie Waldo Daniels),1st,25,female,0,4']

Create a DataFrame from the RDD

In [20]:
#Load the data into a dataframe, parse it using the function above
documents = data.filter(lambda s: "Name" not in s).map(parseDocument)
TitanicData = documents.toDF() # ToDataFrame
print "Number of records: " + str(TitanicData.count())
print "First 5 records: "
TitanicData.take(5)

Number of records: 756
First 5 records: 


[Row(PersonID=u'1', PersonInfo=u'1st 29 female', label=1.0),
 Row(PersonID=u'2', PersonInfo=u'1st 2 female', label=0.0),
 Row(PersonID=u'3', PersonInfo=u'1st 30 male', label=0.0),
 Row(PersonID=u'4', PersonInfo=u'1st 25 female', label=0.0),
 Row(PersonID=u'5', PersonInfo=u'1st 0.92 male', label=1.0)]

<a id="training_test"></a>
## Split Data into Training and Test set

We divide the data into a training set and a test set. The training set is used to build the model, and the test set, which the model never sees during training, is used to evaluate it.

In [21]:
# Divide the data into training and test set
(train, test) = TitanicData.randomSplit([0.8, 0.2])
print "Number of records in the training set: " + str(train.count())
print "Number of records in the test set: " + str(test.count())
# Output first 20 records in the training set
print "First 20 records in the training set: "
train.show()

Number of records in the training set: 610
Number of records in the test set: 146
First 20 records in the training set: 
+--------+-------------+-----+
|PersonID|   PersonInfo|label|
+--------+-------------+-----+
|       1|1st 29 female|  1.0|
|      10|  1st 71 male|  0.0|
|     100|  1st 46 male|  0.0|
|     102|1st 21 female|  1.0|
|     103|  1st 48 male|  1.0|
|     104|1st 49 female|  1.0|
|     106|1st 36 female|  1.0|
|     107|  1st 55 male|  0.0|
|     108|1st 52 female|  1.0|
|      11|  1st 47 male|  0.0|
|     110|1st 16 female|  1.0|
|     111|1st 44 female|  1.0|
|     112|1st 51 female|  1.0|
|     114|1st 35 female|  1.0|
|     117|1st 35 female|  1.0|
|     118|1st 50 female|  0.0|
|     119|  1st 49 male|  1.0|
|     120|  1st 46 male|  0.0|
|     121|  1st 58 male|  0.0|
|     122|  1st 41 male|  0.0|
+--------+-------------+-----+
only showing top 20 rows



<a id="build_model"></a>
## Build Naive Bayes Model

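We use a Spark ML `Pipeline` to build the Naive Bayes model. The pipeline tokenizes `PersonInfo`, hashes the tokens into a term-frequency vector, and fits a multinomial Naive Bayes classifier on that vector.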

In [22]:
# set up Naive Bayes using Pipeline of SparkML
tokenizer = Tokenizer(inputCol="PersonInfo", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
nb = NaiveBayes(labelCol="label", featuresCol="features", predictionCol="prediction", smoothing=1.0, modelType="multinomial")
pipeline = Pipeline(stages=[tokenizer, hashingTF, nb])

In [23]:
# Fit the pipeline on the training set
# The stages (tokenizer, hashingTF, nb) are executed in order
model = pipeline.fit(train)
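
To make the three pipeline stages concrete, here is a simplified pure-Python sketch of tokenizing, feature hashing, and multinomial Naive Bayes with additive smoothing. It is not Spark's implementation: the hash function (`zlib.crc32`), the bucket count, the unsmoothed class prior, and all function names are assumptions made for illustration. The toy rows are taken from the training output shown earlier.

```python
import math
import zlib
from collections import defaultdict

NUM_FEATURES = 1 << 18  # bucket count assumed for this sketch

def hashing_tf(text, num_features=NUM_FEATURES):
    """Lower-case, split on whitespace, and count tokens per hashed bucket."""
    counts = defaultdict(float)
    for token in text.lower().split():
        # crc32 is a stand-in hash, chosen because it is stable across runs.
        bucket = (zlib.crc32(token.encode("utf-8")) & 0xFFFFFFFF) % num_features
        counts[bucket] += 1.0
    return counts

def train_multinomial_nb(rows, smoothing=1.0, num_features=NUM_FEATURES):
    """Collect per-class log priors and token counts from (text, label) rows."""
    class_rows = defaultdict(float)
    class_counts = defaultdict(lambda: defaultdict(float))
    for text, label in rows:
        class_rows[label] += 1.0
        for bucket, count in hashing_tf(text, num_features).items():
            class_counts[label][bucket] += count
    total_rows = sum(class_rows.values())
    model = {}
    for label in class_rows:
        total_tokens = sum(class_counts[label].values())
        model[label] = {
            "log_prior": math.log(class_rows[label] / total_rows),
            "log_denominator": math.log(total_tokens + smoothing * num_features),
            "counts": dict(class_counts[label]),
        }
    return model

def predict(model, text, smoothing=1.0, num_features=NUM_FEATURES):
    """Return the label with the highest log posterior score."""
    features = hashing_tf(text, num_features)
    scores = {}
    for label, params in model.items():
        score = params["log_prior"]
        for bucket, count in features.items():
            smoothed = params["counts"].get(bucket, 0.0) + smoothing
            score += count * (math.log(smoothed) - params["log_denominator"])
        scores[label] = score
    return max(scores, key=scores.get)

toy_rows = [
    ("1st 29 female", 1.0), ("1st 71 male", 0.0), ("1st 46 male", 0.0),
    ("1st 21 female", 1.0), ("1st 48 male", 1.0), ("1st 49 female", 1.0),
    ("1st 55 male", 0.0), ("1st 47 male", 0.0),
]
toy_model = train_multinomial_nb(toy_rows)
print(predict(toy_model, "1st 30 female"))
print(predict(toy_model, "1st 60 male"))
```

Because every age is treated as a separate token, an age that never appeared in training contributes the same smoothed likelihood to both classes. In this toy data the decision is therefore driven by the `female` and `male` tokens.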

<a id="test_data"></a>
## Predict for Test data

In [24]:
# Make predictions on test documents and print columns of interest
prediction = model.transform(test)
selected = prediction.select("PersonInfo", "prediction", "probability")
for row in selected.collect():
    print row
#for row in prediction.collect():
#    print row

Row(PersonInfo=u'1st 25 male', prediction=0.0, probability=DenseVector([0.9003, 0.0997]))
Row(PersonInfo=u'1st 45 male', prediction=0.0, probability=DenseVector([0.7303, 0.2697]))
Row(PersonInfo=u'1st 24 female', prediction=1.0, probability=DenseVector([0.3208, 0.6792]))
Row(PersonInfo=u'1st 42 male', prediction=0.0, probability=DenseVector([0.8939, 0.1061]))
Row(PersonInfo=u'1st 35 male', prediction=0.0, probability=DenseVector([0.828, 0.172]))
Row(PersonInfo=u'1st 38 male', prediction=0.0, probability=DenseVector([0.9059, 0.0941]))
Row(PersonInfo=u'1st 19 female', prediction=1.0, probability=DenseVector([0.2394, 0.7606]))
Row(PersonInfo=u'1st 55 female', prediction=1.0, probability=DenseVector([0.2957, 0.7043]))
Row(PersonInfo=u'1st 15 female', prediction=1.0, probability=DenseVector([0.073, 0.927]))
Row(PersonInfo=u'1st 30 male', prediction=0.0, probability=DenseVector([0.9074, 0.0926]))
Row(PersonInfo=u'1st 65 male', prediction=0.0, probability=DenseVector([0.8784, 0.1216]))
Row(Pe

In [25]:
#Tabulate the predicted outcome
prediction.select("prediction").groupBy("prediction").count().show(truncate=False)

+----------+-----+
|prediction|count|
+----------+-----+
|0.0       |119  |
|1.0       |27   |
+----------+-----+



In [26]:
#Tabulate the actual outcome
prediction.select("label").groupBy("label").count().show(truncate=False)

+-----+-----+
|label|count|
+-----+-----+
|0.0  |85   |
|1.0  |61   |
+-----+-----+



In [27]:
# This table shows:
# 1. The number of survived passengers predicted as survived
# 2. The number of survived passengers predicted as not survived
# 3. The number of not survived passengers predicted as survived
# 4. The number of not survived passengers predicted as not survived

prediction.crosstab('label', 'prediction').show()

+----------------+---+---+
|label_prediction|0.0|1.0|
+----------------+---+---+
|             1.0| 36| 25|
|             0.0| 83|  2|
+----------------+---+---+



<a id="evaluate_model"></a>
## Evaluate the Model

We evaluate the model on both the training set and the test set. The training metrics show how well the model fits the data it was built on, and the test metrics estimate its predictive accuracy on new data.

In [28]:
# Evaluate the Naive Bayes model on a training set
# Select (prediction, true label) and compute training error
pred_nb=model.transform(train).select("prediction", "label")
eval_nb=MulticlassClassificationEvaluator (
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy_nb=eval_nb.evaluate(pred_nb)
# create RDD
predictionAndLabels_nb=pred_nb.rdd
metrics_nb=MulticlassMetrics(predictionAndLabels_nb)
precision_nb=metrics_nb.precision(1.0)
recall_nb=metrics_nb.recall(1.0)
f1Measure_nb = metrics_nb.fMeasure(1.0, 1.0)
print("F1 Measure = %s" % f1Measure_nb)
print ("Training Accuracy = %s" %accuracy_nb)
print ("Training Error = %s" % (1-accuracy_nb))
print ("Precision = %s" %precision_nb)
print ("Recall = %s" %recall_nb)

F1 Measure = 0.68671679198
Training Accuracy = 0.795081967213
Training Error = 0.204918032787
Precision = 0.931972789116
Recall = 0.543650793651


In [29]:
# Evaluate the Naive Bayes model on a test set
# Select (prediction, true label) and compute test error
pred_nb=model.transform(test).select("prediction", "label")
eval_nb=MulticlassClassificationEvaluator (
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy_nb=eval_nb.evaluate(pred_nb)
# create RDD
predictionAndLabels_nb=pred_nb.rdd
metrics_nb=MulticlassMetrics(predictionAndLabels_nb)
precision_nb=metrics_nb.precision(1.0)
recall_nb=metrics_nb.recall(1.0)
f1Measure_nb = metrics_nb.fMeasure(1.0, 1.0)
print("F1 Measure = %s" % f1Measure_nb)
print ("Test Accuracy = %s" %accuracy_nb)
print ("Test Error = %s" % (1-accuracy_nb))
print ("Precision = %s" %precision_nb)
print ("Recall = %s" %recall_nb)

F1 Measure = 0.568181818182
Test Accuracy = 0.739726027397
Test Error = 0.260273972603
Precision = 0.925925925926
Recall = 0.409836065574
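
The test metrics above can be recomputed by hand from the crosstab of `label` against `prediction` shown earlier, treating `1.0` (survived) as the positive class. The sketch below does that arithmetic; the variable names are illustrative.

```python
# Counts read from the crosstab of label vs. prediction on the test set.
tp = 25.0  # survived, predicted survived
fn = 36.0  # survived, predicted not survived
fp = 2.0   # not survived, predicted survived
tn = 83.0  # not survived, predicted not survived

accuracy = (tp + tn) / (tp + fn + fp + tn)
precision = tp / (tp + fp)
recall = tp / (tp + fn)
f1 = 2 * precision * recall / (precision + recall)

print("Accuracy  = %.6f" % accuracy)   # 0.739726
print("Precision = %.6f" % precision)  # 0.925926
print("Recall    = %.6f" % recall)     # 0.409836
print("F1        = %.6f" % f1)         # 0.568182
```

The high precision and low recall say the same thing as the crosstab: the model rarely predicts survival, is usually right when it does, and misses 36 of the 61 survivors in the test set.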


In [30]:
bin_nb=BinaryClassificationMetrics(predictionAndLabels_nb)

# Area under precision-recall curve
print("Area under PR = %s" % bin_nb.areaUnderPR)
# Area under ROC curve
print("Area under ROC = %s" % bin_nb.areaUnderROC)

Area under PR = 0.791168666983
Area under ROC = 0.693153326905
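
Note that `predictionAndLabels_nb` holds hard 0/1 predictions rather than probabilities, so each curve has a single operating point between its endpoints. The sketch below rebuilds both areas from the test crosstab with the trapezoidal rule. Starting the precision-recall curve at (0, 1) is an assumption; it happens to reproduce the reported value, and other Spark versions may use a different first point.

```python
def trapezoid_area(points):
    """Area under a piecewise-linear curve given as (x, y) points sorted by x."""
    area = 0.0
    for (x0, y0), (x1, y1) in zip(points, points[1:]):
        area += (x1 - x0) * (y0 + y1) / 2.0
    return area

# Counts from the crosstab of label vs. prediction on the test set.
tp, fn, fp, tn = 25.0, 36.0, 2.0, 83.0

recall = tp / (tp + fn)                      # true positive rate
false_positive_rate = fp / (fp + tn)
precision = tp / (tp + fp)
prevalence = (tp + fn) / (tp + fn + fp + tn)  # precision when everything is predicted positive

# ROC curve: (0, 0) -> single operating point -> (1, 1)
roc_points = [(0.0, 0.0), (false_positive_rate, recall), (1.0, 1.0)]
# PR curve: assumed start at (0, 1) -> operating point -> (1, prevalence)
pr_points = [(0.0, 1.0), (recall, precision), (1.0, prevalence)]

area_under_roc = trapezoid_area(roc_points)
area_under_pr = trapezoid_area(pr_points)
print("Area under ROC = %.6f" % area_under_roc)  # 0.693153
print("Area under PR  = %.6f" % area_under_pr)   # 0.791169
```

With one operating point, the area under the ROC curve reduces to the average of the true positive rate and the true negative rate. Passing the survival probability instead of the hard prediction would give curves with many thresholds and a more informative area.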
