In [1]:
sc

In [2]:
# python dependencies
import pandas as pd
import re
from matplotlib import pyplot as plt

pd.set_option('max_columns', None)
pd.set_option('max_colwidth', -1)

%matplotlib inline

In [38]:
# pyspark dependencies
from pyspark.sql import functions as func
from pyspark.sql.functions import regexp_extract
from pyspark.sql.functions import col

from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier

from pyspark.ml.evaluation import BinaryClassificationEvaluator

from pyspark.ml import Pipeline

### 0. Reading the dataset: Training and Test Dataset

In [4]:
# reading files
train_path = 'titanic_train.csv'
test_path  = 'titanic_test.csv'

train_df = spark.read.format('csv').option('header', 'true').load(train_path)
test_df  = spark.read.format('csv').option('header', 'true').load(test_path)

train_df.show(5)
test_df.show(5)

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male| 22|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female| 35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male| 35|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+---+-----+-----+---------------

In [5]:
# print schema for both train and test dataset
train_df.printSchema()
test_df.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

root
 |-- PassengerId: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



### 1. Preprocessing

In [6]:
# Drop Ticket and Cabin columns
train_droppedCT = train_df.drop('Ticket', 'Cabin')
test_droppedCT  = test_df.drop('Ticket', 'Cabin')

# Verify columns are dropped by printing the schema
train_droppedCT.printSchema()
test_droppedCT.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Embarked: string (nullable = true)

root
 |-- PassengerId: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [7]:
# Cast numeric types
train_fare = train_droppedCT.withColumn('Fare', train_droppedCT['Fare'].cast('double'))
test_fare  = test_droppedCT.withColumn('Fare', test_droppedCT['Fare'].cast('double'))

train_sibsp = train_fare.withColumn('SibSp', train_fare['SibSp'].cast('int'))
test_sibsp  = test_fare.withColumn('SibSp', test_fare['SibSp'].cast('int'))

train_parch = train_sibsp.withColumn('Parch', train_sibsp['Parch'].cast('int'))
test_parch  = test_sibsp.withColumn('Parch', test_sibsp['Parch'].cast('int'))

train_casted = train_parch.withColumn('PassengerId', train_droppedCT['PassengerId'].cast('int'))
test_casted  = test_parch.withColumn('PassengerId', test_droppedCT['PassengerId'].cast('int'))

train_agecasted = train_casted.withColumn('AgeC', train_droppedCT['Age'].cast('double')).drop('Age')
test_agecasted  = test_casted.withColumn('AgeC', test_droppedCT['Age'].cast('double')).drop('Age')

train_agecasted.printSchema()
test_agecasted.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- AgeC: double (nullable = true)

root
 |-- PassengerId: integer (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- AgeC: double (nullable = true)



In [8]:
# Fill null values for Embarked and AgeC
train_filled = train_agecasted.na.fill({'Embarked':'S', 'AgeC':-0.5})
test_filled  = test_agecasted.na.fill({'Embarked':'S', 'AgeC':-0.5})

train_filled.show(5)
test_filled.show(5)

+-----------+--------+------+--------------------+------+-----+-----+-------+--------+----+
|PassengerId|Survived|Pclass|                Name|   Sex|SibSp|Parch|   Fare|Embarked|AgeC|
+-----------+--------+------+--------------------+------+-----+-----+-------+--------+----+
|          1|       0|     3|Braund, Mr. Owen ...|  male|    1|    0|   7.25|       S|22.0|
|          2|       1|     1|Cumings, Mrs. Joh...|female|    1|    0|71.2833|       C|38.0|
|          3|       1|     3|Heikkinen, Miss. ...|female|    0|    0|  7.925|       S|26.0|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|    1|    0|   53.1|       S|35.0|
|          5|       0|     3|Allen, Mr. Willia...|  male|    0|    0|   8.05|       S|35.0|
+-----------+--------+------+--------------------+------+-----+-----+-------+--------+----+
only showing top 5 rows

+-----------+------+--------------------+------+-----+-----+-------+--------+----+
|PassengerId|Pclass|                Name|   Sex|SibSp|Parch|   F

#### Extract Prefix in Names

In [9]:
# Extract Prefix from Names
train_prefix = train_filled.withColumn('Prefix', regexp_extract(col('Name'), ' ([A-Za-z]+)\.', 1))
test_prefix = test_filled.withColumn('Prefix', regexp_extract(col('Name'), ' ([A-Za-z]+)\.', 1))

# Prefix cleaning
to_replace = {'Capt' : 'Rare',
              'Col' :'Rare',
              'Don' : 'Rare',
              'Dr' : 'Rare',
              'Major' : 'Rare',
              'Rev' : 'Rare',
              'Jonkheer' : 'Rare',
              'Dona' : 'Rare',
              'Countess' : 'Royal',
              'Lady' : 'Royal',
              'Sir' : 'Royal',
              'Mlle' : 'Miss',
              'Ms' : 'Miss',
              'Mme' : 'Mrs'}

train_prefix_changed = train_prefix.na.replace(to_replace, 1, 'Prefix')
test_prefix_changed  = test_prefix.na.replace(to_replace, 1, 'Prefix')

train_prefix_changed.groupBy('Prefix').count().show()
test_prefix_changed.groupBy('Prefix').count().show()



+------+-----+
|Prefix|count|
+------+-----+
|  Miss|  185|
|Master|   40|
|    Mr|  517|
| Royal|    3|
|   Mrs|  126|
|  Rare|   20|
+------+-----+

+------+-----+
|Prefix|count|
+------+-----+
|  Miss|   79|
|Master|   21|
|    Mr|  240|
|   Mrs|   72|
|  Rare|    6|
+------+-----+



#### Impute Age based on Average Age for each Prefix

In [10]:
# Average Age per Prefix
avePrefix = train_prefix_changed.groupBy('Prefix').agg(func.avg('AgeC')).withColumnRenamed('avg(AgeC)', 'AgeImpute')
avePrefix.show()

+------+------------------+
|Prefix|         AgeImpute|
+------+------------------+
|  Miss|17.497297297297298|
|Master| 4.066750000000001|
|    Mr|24.802707930367504|
| Royal|43.333333333333336|
|   Mrs|30.892857142857142|
|  Rare|            43.575|
+------+------------------+



In [11]:
train_prefix_changed.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = false)
 |-- AgeC: double (nullable = false)
 |-- Prefix: string (nullable = true)



In [12]:
# impute age for null ages
train_imputedAges = train_prefix_changed.filter(train_prefix_changed.AgeC == -0.5)\
                                        .join(avePrefix, "Prefix")\
                                        .drop('AgeC')\
                                        .withColumnRenamed('AgeImpute','AgeC')
test_imputedAges  = test_prefix_changed.filter(test_prefix_changed.AgeC == -0.5)\
                                        .join(avePrefix, "Prefix")\
                                        .drop('AgeC')\
                                        .withColumnRenamed('AgeImpute','AgeC')

train_imputedAges.printSchema()
test_imputedAges.printSchema()

root
 |-- Prefix: string (nullable = true)
 |-- PassengerId: integer (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = false)
 |-- AgeC: double (nullable = true)

root
 |-- Prefix: string (nullable = true)
 |-- PassengerId: integer (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = false)
 |-- AgeC: double (nullable = true)



In [13]:
# union all: imputedAges and nonImputedAges
train_arrange = train_imputedAges.select('PassengerId', 'Prefix', 'Name', 'AgeC', \
                                        'Sex', 'SibSp', 'Parch', 'Fare', \
                                        'Embarked', 'Pclass', 'Survived')
train_non_null_age = train_prefix_changed.filter(train_prefix_changed.AgeC!=-0.5).select('PassengerId', 'Prefix', 'Name', 'AgeC', \
                                        'Sex', 'SibSp', 'Parch', 'Fare', \
                                        'Embarked', 'Pclass', 'Survived')

test_arrange = test_imputedAges.select('PassengerId', 'Prefix', 'Name', 'AgeC', \
                                        'Sex', 'SibSp', 'Parch', 'Fare', \
                                        'Embarked', 'Pclass')
test_non_null_age = test_prefix_changed.filter(test_prefix_changed.AgeC!=-0.5).select('PassengerId', 'Prefix', 'Name', 'AgeC', \
                                        'Sex', 'SibSp', 'Parch', 'Fare', \
                                        'Embarked', 'Pclass')

train_df1 = train_arrange.union(train_non_null_age).orderBy('PassengerId')
test_df1  = test_arrange.union(test_non_null_age).orderBy('PassengerId')

train_df1.show(5)
test_df1.show(5)

+-----------+------+--------------------+----+------+-----+-----+-------+--------+------+--------+
|PassengerId|Prefix|                Name|AgeC|   Sex|SibSp|Parch|   Fare|Embarked|Pclass|Survived|
+-----------+------+--------------------+----+------+-----+-----+-------+--------+------+--------+
|          1|    Mr|Braund, Mr. Owen ...|22.0|  male|    1|    0|   7.25|       S|     3|       0|
|          2|   Mrs|Cumings, Mrs. Joh...|38.0|female|    1|    0|71.2833|       C|     1|       1|
|          3|  Miss|Heikkinen, Miss. ...|26.0|female|    0|    0|  7.925|       S|     3|       1|
|          4|   Mrs|Futrelle, Mrs. Ja...|35.0|female|    1|    0|   53.1|       S|     1|       1|
|          5|    Mr|Allen, Mr. Willia...|35.0|  male|    0|    0|   8.05|       S|     3|       0|
+-----------+------+--------------------+----+------+-----+-----+-------+--------+------+--------+
only showing top 5 rows

+-----------+------+--------------------+----+------+-----+-----+-------+--------+--

#### One Hot Encoder for all categorical columns

In [14]:
def OHE(df, catcol):
    strIndexer = StringIndexer(inputCol=catcol, outputCol=catcol+'Index')
    model = strIndexer.fit(df)
    indexed = model.transform(df)
    
    encoder = OneHotEncoder(inputCol=catcol+'Index', outputCol=catcol+'Vec')
    encoded = encoder.transform(indexed)
    return encoded

In [15]:
categorical      = ['Prefix', 'Sex', 'Embarked', 'Pclass']
train_ohe_prefix = OHE(train_df1, 'Prefix')
train_ohe_prefix.show(5)

+-----------+------+--------------------+----+------+-----+-----+-------+--------+------+--------+-----------+-------------+
|PassengerId|Prefix|                Name|AgeC|   Sex|SibSp|Parch|   Fare|Embarked|Pclass|Survived|PrefixIndex|    PrefixVec|
+-----------+------+--------------------+----+------+-----+-----+-------+--------+------+--------+-----------+-------------+
|          1|    Mr|Braund, Mr. Owen ...|22.0|  male|    1|    0|   7.25|       S|     3|       0|        0.0|(5,[0],[1.0])|
|          2|   Mrs|Cumings, Mrs. Joh...|38.0|female|    1|    0|71.2833|       C|     1|       1|        2.0|(5,[2],[1.0])|
|          3|  Miss|Heikkinen, Miss. ...|26.0|female|    0|    0|  7.925|       S|     3|       1|        1.0|(5,[1],[1.0])|
|          4|   Mrs|Futrelle, Mrs. Ja...|35.0|female|    1|    0|   53.1|       S|     1|       1|        2.0|(5,[2],[1.0])|
|          5|    Mr|Allen, Mr. Willia...|35.0|  male|    0|    0|   8.05|       S|     3|       0|        0.0|(5,[0],[1.0])|


In [16]:
train_ohe_sex      = OHE(train_ohe_prefix, 'Sex')
train_ohe_embarked = OHE(train_ohe_sex, 'Embarked')
train_ohe_pclass   = OHE(train_ohe_embarked, 'Pclass')

In [17]:
train_ohe_pclass.show(5)

+-----------+------+--------------------+----+------+-----+-----+-------+--------+------+--------+-----------+-------------+--------+-------------+-------------+-------------+-----------+-------------+
|PassengerId|Prefix|                Name|AgeC|   Sex|SibSp|Parch|   Fare|Embarked|Pclass|Survived|PrefixIndex|    PrefixVec|SexIndex|       SexVec|EmbarkedIndex|  EmbarkedVec|PclassIndex|    PclassVec|
+-----------+------+--------------------+----+------+-----+-----+-------+--------+------+--------+-----------+-------------+--------+-------------+-------------+-------------+-----------+-------------+
|          1|    Mr|Braund, Mr. Owen ...|22.0|  male|    1|    0|   7.25|       S|     3|       0|        0.0|(5,[0],[1.0])|     0.0|(1,[0],[1.0])|          0.0|(2,[0],[1.0])|        0.0|(2,[0],[1.0])|
|          2|   Mrs|Cumings, Mrs. Joh...|38.0|female|    1|    0|71.2833|       C|     1|       1|        2.0|(5,[2],[1.0])|     1.0|    (1,[],[])|          1.0|(2,[1],[1.0])|        1.0|(2,[1

In [18]:
train_ohe_pclass.columns

['PassengerId',
 'Prefix',
 'Name',
 'AgeC',
 'Sex',
 'SibSp',
 'Parch',
 'Fare',
 'Embarked',
 'Pclass',
 'Survived',
 'PrefixIndex',
 'PrefixVec',
 'SexIndex',
 'SexVec',
 'EmbarkedIndex',
 'EmbarkedVec',
 'PclassIndex',
 'PclassVec']

In [19]:
# Create a StringIndexer for the target variable
labelIndexer  = StringIndexer(inputCol='Survived', outputCol='SurvivedNum')
labelIndmodel = labelIndexer.fit(train_ohe_pclass)
labelIndexed  = labelIndmodel.transform(train_ohe_pclass)

In [20]:
# create an Assembler
numeric   = ['AgeC', 'SibSp', 'Parch', 'Fare']
asseInp   = [c + 'Vec' for c in categorical] + numeric
assembler = VectorAssembler(inputCols=asseInp, outputCol='features')
assembtra = assembler.transform(labelIndexed)

In [21]:
assembtra.show()

+-----------+------+--------------------+------------------+------+-----+-----+-------+--------+------+--------+-----------+-------------+--------+-------------+-------------+-------------+-----------+-------------+-----------+--------------------+
|PassengerId|Prefix|                Name|              AgeC|   Sex|SibSp|Parch|   Fare|Embarked|Pclass|Survived|PrefixIndex|    PrefixVec|SexIndex|       SexVec|EmbarkedIndex|  EmbarkedVec|PclassIndex|    PclassVec|SurvivedNum|            features|
+-----------+------+--------------------+------------------+------+-----+-----+-------+--------+------+--------+-----------+-------------+--------+-------------+-------------+-------------+-----------+-------------+-----------+--------------------+
|          1|    Mr|Braund, Mr. Owen ...|              22.0|  male|    1|    0|   7.25|       S|     3|       0|        0.0|(5,[0],[1.0])|     0.0|(1,[0],[1.0])|          0.0|(2,[0],[1.0])|        0.0|(2,[0],[1.0])|        0.0|(14,[0,5,6,8,10,1...|
|   

In [22]:
# select columns to use
selected = ['PassengerId', 'SurvivedNum', 'features']
dataset  = assembtra.select(selected)
dataset.count()

891

In [23]:
# RandomSplit for training and Validation
training, validation = dataset.randomSplit([0.7, 0.3], seed=100)
print training.count()
print validation.count()

610
281


In [24]:
training.show(5)

+-----------+-----------+--------------------+
|PassengerId|SurvivedNum|            features|
+-----------+-----------+--------------------+
|          1|        0.0|(14,[0,5,6,8,10,1...|
|          2|        1.0|(14,[2,7,9,10,11,...|
|          3|        1.0|(14,[1,6,8,10,13]...|
|          4|        1.0|(14,[2,6,9,10,11,...|
|          6|        0.0|(14,[0,5,8,10,13]...|
+-----------+-----------+--------------------+
only showing top 5 rows



In [25]:
validation.show(5)

+-----------+-----------+--------------------+
|PassengerId|SurvivedNum|            features|
+-----------+-----------+--------------------+
|          5|        0.0|(14,[0,5,6,8,10,1...|
|         11|        1.0|(14,[1,6,8,10,11,...|
|         13|        0.0|(14,[0,5,6,8,10,1...|
|         15|        0.0|(14,[1,6,8,10,13]...|
|         16|        1.0|(14,[2,6,10,13],[...|
+-----------+-----------+--------------------+
only showing top 5 rows



### Logistic Regression

In [26]:
lr      = LogisticRegression(labelCol='SurvivedNum', featuresCol='features', maxIter=10)
lrmodel = lr.fit(training)
print "Coefficients : " + str(lrmodel.coefficients)
print "Intercept : " + str(lrmodel.intercept)

Coefficients : [-0.998675305167,1.57041700576,2.36220663072,3.46972644055,-0.947308320335,-0.521249601961,0.408436694135,0.783011031359,-0.969890573489,0.874650131679,-0.0106414947916,-0.658283083915,-0.433255618467,0.00429524460515]
Intercept : -0.0387130998414


In [27]:
# make predictions for validation set
val_pred = lrmodel.transform(validation)
val_pred.printSchema()
val_pred.show(5)

root
 |-- PassengerId: integer (nullable = true)
 |-- SurvivedNum: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)

+-----------+-----------+--------------------+--------------------+--------------------+----------+
|PassengerId|SurvivedNum|            features|       rawPrediction|         probability|prediction|
+-----------+-----------+--------------------+--------------------+--------------------+----------+
|          5|        0.0|(14,[0,5,6,8,10,1...|[2.45796748495796...|[0.92114214869829...|       0.0|
|         11|        1.0|(14,[1,6,8,10,11,...|[0.09212407007966...|[0.52301474294346...|       0.0|
|         13|        0.0|(14,[0,5,6,8,10,1...|[2.29834506308375...|[0.90873988505028...|       0.0|
|         15|        0.0|(14,[1,6,8,10,13]...|[-0.8550048096577...|[0.29838403947695...|       1.0|
|         16|        1.0|(14,[2,6,10,

In [28]:
# evaluate
evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='SurvivedNum')

In [29]:
evaluator.evaluate(val_pred)

0.7875369822485206

In [30]:
# accuracy
val_pred_diff = val_pred.withColumn('Diff', (col('SurvivedNum') - col('prediction'))**2)
val_pred_diff.show(5)

+-----------+-----------+--------------------+--------------------+--------------------+----------+----+
|PassengerId|SurvivedNum|            features|       rawPrediction|         probability|prediction|Diff|
+-----------+-----------+--------------------+--------------------+--------------------+----------+----+
|          5|        0.0|(14,[0,5,6,8,10,1...|[2.45796748495796...|[0.92114214869829...|       0.0| 0.0|
|         11|        1.0|(14,[1,6,8,10,11,...|[0.09212407007966...|[0.52301474294346...|       0.0| 1.0|
|         13|        0.0|(14,[0,5,6,8,10,1...|[2.29834506308375...|[0.90873988505028...|       0.0| 0.0|
|         15|        0.0|(14,[1,6,8,10,13]...|[-0.8550048096577...|[0.29838403947695...|       1.0| 1.0|
|         16|        1.0|(14,[2,6,10,13],[...|[-2.2153719251562...|[0.09837855317847...|       1.0| 0.0|
+-----------+-----------+--------------------+--------------------+--------------------+----------+----+
only showing top 5 rows



In [44]:
accuracy = 1- val_pred_diff.select('Diff').groupBy().sum().rdd.map(lambda x: x[0]).collect()[0]/281
accuracy

0.8078291814946619

### ParamGridBuilder, CrossValidator

In [32]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .addGrid(lr.elasticNetParam, [0.0, 0.5])
             .addGrid(lr.maxIter, [1, 5])
             .build())

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(training)
# this will likely take a fair amount of time because of the amount of models that we're creating and testing

# Use test set to measure the accuracy of our model on new data
predictions = cvModel.transform(validation)

# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions) 



+-----------+-----------+--------------------+--------------------+--------------------+----------+----+
|PassengerId|SurvivedNum|            features|       rawPrediction|         probability|prediction|Diff|
+-----------+-----------+--------------------+--------------------+--------------------+----------+----+
|          5|        0.0|(14,[0,5,6,8,10,1...|[2.22505960051157...|[0.90247740864142...|       0.0| 0.0|
|         11|        1.0|(14,[1,6,8,10,11,...|[-0.0609064977009...|[0.48477808088830...|       1.0| 0.0|
|         13|        0.0|(14,[0,5,6,8,10,1...|[2.28881957979900...|[0.90794683878190...|       0.0| 0.0|
|         15|        0.0|(14,[1,6,8,10,13]...|[-0.4941598988515...|[0.37891409258227...|       1.0| 1.0|
|         16|        1.0|(14,[2,6,10,13],[...|[-1.6328337468435...|[0.16344253602646...|       1.0| 0.0|
+-----------+-----------+--------------------+--------------------+--------------------+----------+----+
only showing top 5 rows



0.19217081850533807

In [45]:
# accuracy
best_pred_diff = predictions.withColumn('Diff', (col('SurvivedNum') - col('prediction'))**2)
best_pred_diff.show(5)
accuracy_best = 1 - best_pred_diff.select('Diff').groupBy().sum().rdd.map(lambda x: x[0]).collect()[0]/281
accuracy_best

+-----------+-----------+--------------------+--------------------+--------------------+----------+----+
|PassengerId|SurvivedNum|            features|       rawPrediction|         probability|prediction|Diff|
+-----------+-----------+--------------------+--------------------+--------------------+----------+----+
|          5|        0.0|(14,[0,5,6,8,10,1...|[2.22505960051157...|[0.90247740864142...|       0.0| 0.0|
|         11|        1.0|(14,[1,6,8,10,11,...|[-0.0609064977009...|[0.48477808088830...|       1.0| 0.0|
|         13|        0.0|(14,[0,5,6,8,10,1...|[2.28881957979900...|[0.90794683878190...|       0.0| 0.0|
|         15|        0.0|(14,[1,6,8,10,13]...|[-0.4941598988515...|[0.37891409258227...|       1.0| 1.0|
|         16|        1.0|(14,[2,6,10,13],[...|[-1.6328337468435...|[0.16344253602646...|       1.0| 0.0|
+-----------+-----------+--------------------+--------------------+--------------------+----------+----+
only showing top 5 rows



0.8078291814946619

### Decision Tree Classifier

In [34]:
dtc = DecisionTreeClassifier(labelCol='SurvivedNum', featuresCol='features', maxDepth=2)

In [35]:
dtcmodel = dtc.fit(training)
dtc_pred = dtcmodel.transform(validation)
dtc_pred.show()

+-----------+-----------+--------------------+-------------+--------------------+----------+
|PassengerId|SurvivedNum|            features|rawPrediction|         probability|prediction|
+-----------+-----------+--------------------+-------------+--------------------+----------+
|          5|        0.0|(14,[0,5,6,8,10,1...| [257.0,30.0]|[0.89547038327526...|       0.0|
|         11|        1.0|(14,[1,6,8,10,11,...|  [59.0,52.0]|[0.53153153153153...|       0.0|
|         13|        0.0|(14,[0,5,6,8,10,1...| [257.0,30.0]|[0.89547038327526...|       0.0|
|         15|        0.0|(14,[1,6,8,10,13]...|  [59.0,52.0]|[0.53153153153153...|       0.0|
|         16|        1.0|(14,[2,6,10,13],[...| [14.0,124.0]|[0.10144927536231...|       1.0|
|         18|        1.0|(14,[0,5,6,10,13]...| [257.0,30.0]|[0.89547038327526...|       0.0|
|         26|        1.0|(14,[2,6,8,10,11,...|  [59.0,52.0]|[0.53153153153153...|       0.0|
|         28|        0.0|(14,[0,5,6,9,10,1...|  [50.0,24.0]|[0.6756756

In [46]:
dtc_diff = dtc_pred.withColumn('Diff', (col('SurvivedNum')-col('prediction'))**2)
dtc_accuracy = 1 - dtc_diff.select('Diff').groupBy().sum().rdd.map(lambda x: x[0]).collect()[0]/281
dtc_accuracy

0.7580071174377224

### Random Forest Classifier

In [47]:
rfc = RandomForestClassifier(numTrees=3, maxDepth=2, labelCol='SurvivedNum', featuresCol='features', seed=42)
rfc_model = rfc.fit(training)
rfc_pred  = rfc_model.transform(validation)
rfc_diff  = rfc_pred.withColumn('Diff', (col('SurvivedNum')-col('prediction'))**2)
rfc_accuracy = 1 - rfc_diff.select('Diff').groupBy().sum().rdd.map(lambda x: x[0]).collect()[0]/281
rfc_accuracy

0.7651245551601423

In [48]:
# print 'Accuracy:'
print 'Logistic Regression: ' + `accuracy`
print 'Best LR: ' + `accuracy_best`
print 'Random Forest: ' + `rfc_accuracy`
print 'Decision Trees: ' + `dtc_accuracy`

Logistic Regression: 0.8078291814946619
Best LR: 0.8078291814946619
Random Forest: 0.7651245551601423
Decision Trees: 0.7580071174377224
