# Titanic Disaster Analysis

## Imports

In [1]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.udf

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.classification.{RandomForestClassifier, BinaryLogisticRegressionTrainingSummary, RandomForestClassificationModel, LogisticRegression, LogisticRegressionModel, BinaryLogisticRegressionSummary}
import org.apache.spark.ml.feature.{StringIndexer, OneHotEncoder, VectorAssembler}
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, MulticlassClassificationEvaluator}

## Spark Init / Reading Data

In [2]:
// Create the notebook's Spark session, or reuse one that is already running.
val spark = {
  val sessionBuilder = SparkSession.builder().appName("titanic")
  sessionBuilder.getOrCreate()
}

// Load the training CSV, taking column names from its header row. No schema or
// inferSchema option is given; the cell output shows string-typed columns.
val passengers = {
  val csvReader = spark.read.option("header", "true")
  csvReader.csv("train.csv").as("Passenger")
}

// Cache the frame because later cells read it more than once.
passengers.cache()

[PassengerId: string, Survived: string ... 10 more fields]

## Count nulls, compute the average age, and replace nulls

In [3]:
// Drop PassengerId, Name and Ticket; they are not used as model features.
val passengers_reduced = passengers.drop("PassengerId", "Name", "Ticket")

// Report how many rows are missing each of the columns imputed below.
println("Cabin has null count of: " + passengers_reduced.filter("Cabin is null").count())
println("Age has null count of: " + passengers_reduced.filter("Age is null").count())
println("Embarked has null count of: " + passengers_reduced.filter("Embarked is null").count())

// Mean of the non-null ages. The single aggregate row is read back with getDouble so
// avgAge is a typed Double instead of the Any that collect()(0)(0) produces.
val avgAge: Double = passengers_reduced.filter("Age is not null").agg(avg("Age")).first().getDouble(0)
println("Average age is: " + avgAge)

// Fill null values:
//   Age      -> the mean age computed above
//   Embarked -> "U" (unknown port of embarkation)
//   Cabin    -> "U" (unknown cabin)
// The map mixes Double and String values, so its value type is spelled out as Any.
val pnc = passengers_reduced.na.fill(Map[String, Any]("Age" -> avgAge,
                                                      "Embarked" -> "U",
                                                      "Cabin" -> "U"))

Cabin has null count of: 687
Age has null count of: 177
Embarked has null count of: 2
Average age is: 29.69911764705882


## Cast Fare and Age to doubles, index the string columns, and one-hot encode them

In [4]:
// Append double-typed copies of Fare and Age (FareDbl / AgeDbl), then drop the originals.
var df = {
  val withFare = pnc.withColumn("FareDbl", pnc("Fare").cast("double"))
  val withAge = withFare.withColumn("AgeDbl", pnc("Age").cast("double"))
  withAge.drop("Fare", "Age")
}

//df.describe(df.drop("Sex", "Pclass").columns: _*).show()

// One indexer and one encoder are reconfigured for every categorical column below.
val indexer = new StringIndexer()
val encoder = new OneHotEncoder()

// Not read by the fold below, which binds its own per-column name.
var column_name = ""

// For each categorical column: index its string values into <name>_ind, one-hot encode
// that index into <name>_vec, and drop both the raw column and the intermediate index.
df = List("Sex", "Pclass", "SibSp", "Parch", "Cabin", "Embarked").foldLeft(df) { (frame, name) =>
  val indexCol = name + "_ind"
  val indexed = indexer.setInputCol(name).setOutputCol(indexCol).fit(frame).transform(frame)
  encoder.setInputCol(indexCol).setOutputCol(name + "_vec").transform(indexed).drop(name, indexCol)
}

## Assemble feature vectors and label column

In [5]:
// Combine the numeric columns and the one-hot vectors into a single "features" vector.
// Cabin_vec is not part of the input list.
val featureAssembler = new VectorAssembler().setInputCols(Array("FareDbl", "AgeDbl", "Sex_vec", "Pclass_vec", "SibSp_vec", "Parch_vec", "Embarked_vec")).setOutputCol("features")

// Build the label by casting Survived to double instead of running it through a
// StringIndexer. StringIndexer numbers values by descending frequency, so the
// label encoding would silently depend on which class is more common.
// Assumes Survived holds numeric strings (0/1) -- TODO confirm against the CSV.
df = df.withColumn("label", df("Survived").cast("double")).drop("Survived")

// Add the assembled "features" column next to the label.
df = featureAssembler.transform(df)

val lr_model = new LogisticRegression() // Logistic regression with default parameters
val rf_model = new RandomForestClassifier() // Random forest with default parameters

## Initialize and fit pipeline (Logistic Regression)

In [6]:
// Kept because a later cell still refers to generic_pipeline.
val generic_pipeline = new Pipeline()

// Give logistic regression its own Pipeline instance. setStages mutates and returns the
// pipeline it is called on, so building this from generic_pipeline would make lr_pipeline
// the same object, and any later setStages call on generic_pipeline would change it too.
val lr_pipeline = new Pipeline().setStages(Array(lr_model))

// Fit the single-stage pipeline on the prepared training frame.
val lr_fit = lr_pipeline.fit(df)

## Evaluate model performance (Logistic Regression)

In [7]:
// Evaluator that scores accuracy from the "label" and "prediction" columns.
var accuracy_evaluator = {
  val evaluator = new MulticlassClassificationEvaluator()
  evaluator.setLabelCol("label")
  evaluator.setPredictionCol("prediction")
  evaluator.setMetricName("accuracy")
  evaluator
}

// A grid with no parameters added: only the pipeline's current settings are evaluated.
var emptyGrid = new ParamGridBuilder().build()

// 5-fold cross-validation of the logistic regression pipeline.
var crossVal = {
  val validator = new CrossValidator()
  validator.setEstimator(lr_pipeline)
  validator.setEvaluator(accuracy_evaluator)
  validator.setEstimatorParamMaps(emptyGrid)
  validator.setNumFolds(5)
  validator
}

// Fit on the training frame and report the averaged metric of the grid's first entry.
var cvFit = crossVal.fit(df)
var metric = cvFit.avgMetrics(0)
println("5-fold cross-validated accuracy: " + metric)

5-fold cross-validated accuracy: 0.8068410173417301


In [8]:
// Binary-classification view of the fitted logistic regression's training summary.
var lrSummary = {
  val fittedLr = lr_fit.stages(0).asInstanceOf[LogisticRegressionModel]
  fittedLr.summary.asInstanceOf[BinaryLogisticRegressionSummary]
}

// Objective value recorded by the training summary for each iteration.
val loss = {
  val fittedLr = lr_fit.stages(0).asInstanceOf[LogisticRegressionModel]
  val trainingSummary = fittedLr.summary.asInstanceOf[BinaryLogisticRegressionTrainingSummary]
  trainingSummary.objectiveHistory
}

// ROC and precision-recall curves exposed by the summary.
var roc = lrSummary.roc
var pr = lrSummary.pr

In [20]:
// Reuse the SQLContext owned by the notebook's SparkSession instead of constructing one
// with the SQLContext constructor, which is deprecated since Spark 2.0.
val sqlContext = spark.sqlContext
import sqlContext.implicits._

// Pair every objective value with its position and expose the pairs as a two-column
// DataFrame (Loss, Iteration). The SparkContext is taken from the session, so this cell
// does not depend on a kernel-provided `sc`.
val loss_df = spark.sparkContext.parallelize(loss.zipWithIndex).toDF("Loss", "Iteration")

## Fit pipeline (Random Forest)

In [11]:
// Give the random forest its own Pipeline instance. Calling setStages on the shared
// generic_pipeline would mutate that object in place and silently change every other
// reference to it (for example a pipeline handed to an earlier CrossValidator).
val rf_pipeline = new Pipeline().setStages(Array(rf_model))

// Fit the single-stage pipeline on the prepared training frame.
val rf = rf_pipeline.fit(df)

## Evaluate model performance (Random Forest)

In [12]:
// Evaluator that scores accuracy from the "label" and "prediction" columns.
var accuracy_evaluator2 = {
  val evaluator = new MulticlassClassificationEvaluator()
  evaluator.setLabelCol("label")
  evaluator.setPredictionCol("prediction")
  evaluator.setMetricName("accuracy")
  evaluator
}

// A grid with no parameters added: only the pipeline's current settings are evaluated.
var emptyGrid2 = new ParamGridBuilder().build()

// 5-fold cross-validation of the random forest pipeline.
var crossVal2 = {
  val validator = new CrossValidator()
  validator.setEstimator(rf_pipeline)
  validator.setEvaluator(accuracy_evaluator2)
  validator.setEstimatorParamMaps(emptyGrid2)
  validator.setNumFolds(5)
  validator
}

// Fit on the training frame and report the averaged metric of the grid's first entry.
var cvFit2 = crossVal2.fit(df)
var metric2 = cvFit2.avgMetrics(0)
println("5-fold cross-validated accuracy: " + metric2)

5-fold cross-validated accuracy: 0.8139402707453315


In [13]:
// Pull the fitted random forest out of the one-stage pipeline model.
val model = {
  val fittedStage = rf.stages(0)
  fittedStage.asInstanceOf[RandomForestClassificationModel]
}

// Display the per-feature importance vector as the cell result.
model.featureImportances

(20,[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19],[0.11411160350075174,0.08753538928891272,0.41360618625197015,0.17761604620597035,0.027962032058452213,0.018279788287787,0.021293682282097118,0.004843686249781529,0.007663806223508535,0.008284514936144603,0.004745572354877171,0.030004097609481712,0.017526164974931153,0.008442423968666853,0.003178353686799715,0.0012632909760254927,0.001643775011788731,0.02745462947340188,0.01658124029527312,0.00796371636337827])

## Run test set through to generate predictions

In [14]:
// Prepare the test set with the same preparation steps as the training data so that
// the fitted model can score it.
val passengers_test = {
  val csvReader = spark.read.option("header", "true")
  csvReader.csv("test.csv").as("Passenger")
}
passengers_test.cache()

// Only Ticket is dropped here; PassengerId stays in the frame and is selected again
// when the predictions are exported.
val passengers_reduced = passengers_test.drop("Ticket")

// Report how many rows are missing Cabin, Age and Embarked.
for (nullable <- List("Cabin", "Age", "Embarked")) {
  println(nullable + " has null count of: " + passengers_reduced.filter(nullable + " is null").count())
}

// Fill null values:
//   Age      -> avgAge, the mean computed from the training data
//   Embarked -> "U" (unknown port), Cabin -> "U" (unknown cabin)
//   Fare, Pclass, SibSp, Parch -> 0
val pnc = {
  val replacements = Map[String, Any](
    "Age" -> avgAge,
    "Embarked" -> "U",
    "Cabin" -> "U",
    "Fare" -> 0,
    "Pclass" -> 0,
    "SibSp" -> 0,
    "Parch" -> 0)
  passengers_reduced.na.fill(replacements)
}

// Append double-typed copies of Fare and Age (FareDbl / AgeDbl), then drop the originals.
var df = {
  val withFare = pnc.withColumn("FareDbl", pnc("Fare").cast("double"))
  val withAge = withFare.withColumn("AgeDbl", pnc("Age").cast("double"))
  withAge.drop("Fare", "Age")
}

//df.describe(df.drop("Sex", "Pclass").columns: _*).show()

// One indexer and one encoder are reconfigured for every categorical column below.
val indexer = new StringIndexer()
val encoder = new OneHotEncoder()

// Not read by the fold below, which binds its own per-column name.
var column_name = ""

// For each categorical column: index into <name>_ind, one-hot encode into <name>_vec,
// and drop the raw column and the intermediate index.
// NOTE(review): the indexer is fit on the test frame here, so the value-to-index
// assignment (and possibly the vector widths) comes from the test data rather than from
// the data the model was trained on -- confirm the resulting features line up with the
// training features.
df = List("Sex", "Pclass", "SibSp", "Parch", "Cabin", "Embarked").foldLeft(df) { (frame, name) =>
  val indexCol = name + "_ind"
  val indexed = indexer.setInputCol(name).setOutputCol(indexCol).fit(frame).transform(frame)
  encoder.setInputCol(indexCol).setOutputCol(name + "_vec").transform(indexed).drop(name, indexCol)
}

// Assemble the "features" vector from the same column list used for training.
val featureAssembler = {
  val assembler = new VectorAssembler()
  assembler.setInputCols(Array("FareDbl", "AgeDbl", "Sex_vec", "Pclass_vec", "SibSp_vec", "Parch_vec", "Embarked_vec"))
  assembler.setOutputCol("features")
  assembler
}

df = featureAssembler.transform(df)

Cabin has null count of: 327
Age has null count of: 86
Embarked has null count of: 0


In [15]:
// Score the prepared test frame with the fitted random forest pipeline; the result
// carries the "prediction" column that the export cells select.
val test_preds = rf.transform(df)

In [16]:
// Keep only the passenger id and the predicted class, renamed for the submission file.
// The source column is referenced by its exact name, "prediction": withColumnRenamed is
// a silent no-op when the name does not resolve, so a differently-cased name only works
// while Spark's column resolution is case-insensitive.
// NOTE(review): the id column is renamed to "PassengerID" -- confirm that this is the
// header spelling the submission format expects.
var preds_kaggle = test_preds.select("PassengerId", "prediction").withColumnRenamed("prediction", "Survived").withColumnRenamed("PassengerId", "PassengerID")

In [17]:
// Discard any row that contains a null before exporting.
val n = preds_kaggle.na.drop()

// Select the id column by its exact name ("PassengerID", as it is named in preds_kaggle)
// so the lookup does not depend on case-insensitive column resolution, and turn the
// prediction into an integer.
var nnew = n.select(n("PassengerID"), n("Survived").cast("int"))

// Write a single CSV part file with a header row into the "finalpreds" directory.
nnew.coalesce(1).write.option("header", "true").csv(path="finalpreds")