# Survival prediction using Spark

I use the following classification models:
 * Logistic Regression 
 * SVM *(there's a chance the data is simple enough to be linearly separable)*
 * Naive Bayes *(the simplest model one can think of but sometimes it shows unexpectedly good results, so I think it's interesting to see what it can do with Titanic data)*

Start with settings, then load the data

In [1]:
import os

In [2]:
os.environ["JAVA_HOME"] = '/usr/lib/jvm/java-8-oracle'
os.environ["PYSPARK_SUBMIT_ARGS"] = 'pyspark-shell'

In [3]:
os.environ["SPARK_HOME"] = '/home/asti/spark-2.2.0-bin-hadoop2.7/'

In [4]:
import sys

sys.path.append(os.environ['SPARK_HOME']+"/python")

In [5]:
sys.path.append(os.environ['SPARK_HOME']+"/python/lib/py4j-0.10.4-src.zip")

In [6]:
import py4j
from pyspark import SparkContext, SparkConf, SQLContext

In [7]:
from pyspark.mllib.regression import LabeledPoint, LinearRegressionModel, LinearRegressionWithSGD
import numpy as np

In [8]:
conf = (SparkConf().setMaster("local")
        .setAppName("Homework")
        .set("spark.executor.memory", "2g"))

In [9]:
sc = SparkContext(conf=conf)

In [10]:
sqlcontext = SQLContext(sc)

In [28]:
df = sqlcontext.read.format(
    'com.databricks.spark.csv').options(
    header='true').load('train.csv')

In [29]:
df.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



Some of the columns will go with *'string'* type just fine, but some should be casted

* Should be **integers**: ticket class (**Pclass**), survival indicator (**Survived**), number of family members onboard (**SibSp** and **Parch**)
* Should be **boolean**: gender (**Sex**)
* Will go with **float/double**: **Age**, **Fare**

# Preprocessing and feature generation

Probably an absolute value of one's age isn't as important as the age group this person belongs to. But before dividing age values into groups let's preprocess it first.

Some values are missed in *Age* column. I'm going to cast the column to double type and then fill all the gaps with the value of mean age in dataset

In [13]:
from pyspark.sql.functions import udf
from pyspark.sql import types

## Process age

In [30]:
def parse_age(str_age):
    try:
        return float(str_age)
    except:
        return None
    
change_age = udf(parse_age, types.StringType())
df = df.withColumn('Age', change_age(df['Age']))

df = df.withColumn("Age",df["Age"].cast(types.DoubleType()).alias("Age"))

In [31]:
mean_age = df.approxQuantile("Age", [0.5], 0.1)[0]

In [32]:
def fix_and_divide_age(age):
    fixed = age if age else mean_age
    # newborns and babies
    if fixed < 3.0:
        return 1
    # little children
    if 3.0 <= fixed < 10.0:
        return 2
    # teenagers
    if 10.0 <= fixed < 17.0:
        return 3
    if 17.0 <= fixed < 22.0:
        return 4
    if 22.0 <= fixed < 35.0:
        return 5
    if 35.0 <= fixed < 50.0:
        return 6
    else:
        return 7
    
fix_age = udf(fix_and_divide_age, types.IntegerType())
df = df.withColumn('Age', fix_age(df['Age']))

## Get titles from names

For this dataset it's common to extract titles from given names (e.g. Mr., Mrs.).

There are not enough data for titles like, 'Don', 'Dr', and Dutch 'Jonkheer' to have considerable impact on predictions, so I'm going to replace these with corresponding common English titles

In [35]:
title_list=['Mrs', 'Mr', 'Master', 'Miss', 'Major', 'Rev',
            'Dr', 'Ms', 'Mlle','Col', 'Capt', 'Mme',
            'Countess', 'Don', 'Jonkheer']

def replace_title(name, sex):
    def find_title(name):
        for title in title_list:
            if title in name:
                return title
        return None
    
    title = find_title(name)
    
    if title in ['Don', 'Major', 'Capt', 'Jonkheer', 'Rev', 'Col']:
        return 'Mr'
    elif title in ['Countess', 'Mme']:
        return 'Mrs'
    elif title in ['Mlle', 'Ms']:
        return 'Miss'
    elif title =='Dr':
        if sex=='male':
            return 'Mr'
        else:
            return 'Mrs'
    else:
        return title

get_title = udf(replace_title, types.StringType())
df = df.withColumn('Title', get_title(df['Name'], df['Sex']))

## Fill missed data and get info about decks

In [37]:
# fix 'Embarked' column, fill gaps with 'Unknown' value
def Embarked_transform(x):
    if x != None:
        return x
    else:
        return 'Unk'
        
get_embarked = udf(Embarked_transform, types.StringType())
df = df.withColumn('Embarked', get_embarked(df['Embarked']))

All the cabins were marked according to the deck they were on. Maybe where people were located on the ship had some impact on their survival

In [38]:
def cabin_transform(cabin):
    if cabin:
        return cabin[0]
    else:
        return 'Unk'
    
get_deck = udf(cabin_transform, types.StringType())
df = df.withColumn('Deck', get_deck(df['Cabin']))

## Encoding

In [39]:
# encode 'Embarked', 'Deck', 'Age' and 'Title' columns
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# --------
embarkedIndexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkedIndex")
e_model = embarkedIndexer.fit(df)
e_indexed = e_model.transform(df)

e_encoder = OneHotEncoder(inputCol="EmbarkedIndex", outputCol="EmbarkedVec")
df = e_encoder.transform(e_indexed)
# --------
# --------
deckIndexer = StringIndexer(inputCol='Deck', outputCol='DeckIndex')
d_model = deckIndexer.fit(df)
d_indexed = d_model.transform(df)

d_encoder = OneHotEncoder(inputCol='DeckIndex', outputCol='DeckVec')
df = d_encoder.transform(d_indexed)
# --------
# --------
age_encoder = OneHotEncoder(inputCol='Age', outputCol='AgeVec')
df = age_encoder.transform(df)
# --------
# --------
titleIndexer = StringIndexer(inputCol='Title', outputCol='TitleIndex')
t_model = titleIndexer.fit(df)
t_indexed = t_model.transform(df)

t_encoder = OneHotEncoder(inputCol='TitleIndex', outputCol='TitleVec')
df = t_encoder.transform(t_indexed)

## Finally getting data in the needed form for training

In [42]:
def transform(row):
    return LabeledPoint(
        int(row.Survived),
        [
            int(row.Pclass),
            row.Sex == 'male',        
            int(row.SibSp),
            int(row.Parch),
            float(row.Fare),
        ]
        + list(row.TitleVec.toArray())
        + list(row.AgeVec.toArray())
        + list(row.DeckVec.toArray())
        + list(row.EmbarkedVec.toArray())
    )

In [43]:
data = df.rdd.map(transform)

In [45]:
train, test = data.randomSplit([0.7, 0.3])

In [46]:
from pyspark.mllib.evaluation import MulticlassMetrics
def print_metrics(cls, test_data):
    def f1_score(metrics):
        return 2 * (metrics.recall(1.0) * metrics.precision(1.0)) / (metrics.recall(1.0) + metrics.precision(1.0))
    
    scoreAndLabels = test_data.map(lambda x : (float(cls.predict(x.features)), x.label))
    metrics = MulticlassMetrics(scoreAndLabels)
    
    print("Accuracy of the model: {}" .format(metrics.accuracy))
    print("          with recall: {}" .format(metrics.recall(1.0)))
    print("        and precision: {}" .format(metrics.precision(1.0)))
    
    print("\nF1 score of the model: {}" .format(f1_score(metrics)))

# Logistic Regression

In [47]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

lg = LogisticRegressionWithLBFGS.train(train)

In [48]:
print_metrics(lg, test)

Accuracy of the model: 0.818532818533
          with recall: 0.704761904762
        and precision: 0.822222222222

F1 score of the model: 0.758974358974


# SVM
*with SGD update*

In [49]:
from pyspark.mllib.classification import SVMWithSGD

svm = SVMWithSGD.train(train, iterations=1000, step=0.01)

In [50]:
print_metrics(svm, test)

Accuracy of the model: 0.698841698842
          with recall: 0.304761904762
        and precision: 0.864864864865

F1 score of the model: 0.450704225352


# Naive Bayes Classifier

In [51]:
from pyspark.mllib.classification import NaiveBayes

nb = NaiveBayes.train(data)

In [52]:
print_metrics(nb, test)

Accuracy of the model: 0.725868725869
          with recall: 0.485714285714
        and precision: 0.75

F1 score of the model: 0.589595375723
