Create a logistic regression and a random forest classifier to predict whether passengers would survive the demise of the Titantic

In [1]:
# intiate a sparkcontext to run operation
import pyspark
from pyspark import SparkContext
sc =SparkContext()

In [2]:
from pyspark.sql import Row
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)



In [3]:
#load dataset
df = sqlContext.read.option("inferSchema",True).option("header",True).csv("titanic_dataset.csv")

#print schema
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [4]:
#see data 
df.show(5, truncate = True)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

Convert continous columns

In [5]:
# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 

# List of continuous features
CONTI_FEATURES  = ['Survived', 'Pclass','Age', 'Sibsp', 'Parch', 'Fare']

# Convert the type
df = convertColumn(df, CONTI_FEATURES, FloatType())

# Check the dataset
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: float (nullable = true)
 |-- Pclass: float (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: float (nullable = true)
 |-- Sibsp: float (nullable = true)
 |-- Parch: float (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: float (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [7]:
#print shape
print((df.count(), len(df.columns)))

(891, 12)


### Data exploration

In [8]:
from pyspark.sql.functions import *

#check for null values
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|Sibsp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|177|    0|    0|     0|   0|  687|       2|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



In [9]:
#statistical describe for Age
df.describe('Age').show()

+-------+------------------+
|summary|               Age|
+-------+------------------+
|  count|               714|
|   mean| 29.69911764704046|
| stddev|14.526497332370992|
|    min|              0.42|
|    max|              80.0|
+-------+------------------+



In [10]:
#statistical describe for Pclass
df.describe('Pclass').show()

+-------+------------------+
|summary|            Pclass|
+-------+------------------+
|  count|               891|
|   mean| 2.308641975308642|
| stddev|0.8360712409770491|
|    min|               1.0|
|    max|               3.0|
+-------+------------------+



In [11]:
#statistical describe for Sibsp
df.describe('Sibsp').show()

+-------+------------------+
|summary|             Sibsp|
+-------+------------------+
|  count|               891|
|   mean|0.5230078563411896|
| stddev|1.1027434322934315|
|    min|               0.0|
|    max|               8.0|
+-------+------------------+



In [12]:
#statistical describe for Parch
df.describe('Parch').show()

+-------+-------------------+
|summary|              Parch|
+-------+-------------------+
|  count|                891|
|   mean|0.38159371492704824|
| stddev| 0.8060572211299488|
|    min|                0.0|
|    max|                6.0|
+-------+-------------------+



In [13]:
#statistical describe for Fare
df.describe('Fare').show()

+-------+-----------------+
|summary|             Fare|
+-------+-----------------+
|  count|              891|
|   mean|32.20420804114722|
| stddev|49.69342916316158|
|    min|              0.0|
|    max|         512.3292|
+-------+-----------------+



In [14]:
#count occurances of Sex
df.groupBy("Sex").count().sort("count",ascending=True).show()

+------+-----+
|   Sex|count|
+------+-----+
|female|  314|
|  male|  577|
+------+-----+



In [15]:
#count occurances of PClass
df.groupBy("Pclass").count().sort("count",ascending=True).show()

+------+-----+
|Pclass|count|
+------+-----+
|   2.0|  184|
|   1.0|  216|
|   3.0|  491|
+------+-----+



In [16]:
#count occurances of embarked
df.groupBy("Embarked").count().sort("count",ascending=True).show()

+--------+-----+
|Embarked|count|
+--------+-----+
|    null|    2|
|       Q|   77|
|       C|  168|
|       S|  644|
+--------+-----+



In [17]:
#count occurances of Parch
df.groupBy("Parch").count().sort("count",ascending=True).show()

+-----+-----+
|Parch|count|
+-----+-----+
|  6.0|    1|
|  4.0|    4|
|  5.0|    5|
|  3.0|    5|
|  2.0|   80|
|  1.0|  118|
|  0.0|  678|
+-----+-----+



In [18]:
#count occurances of Sibsp
df.groupBy("Sibsp").count().sort("count",ascending=True).show()

+-----+-----+
|Sibsp|count|
+-----+-----+
|  5.0|    5|
|  8.0|    7|
|  3.0|   16|
|  4.0|   18|
|  2.0|   28|
|  1.0|  209|
|  0.0|  608|
+-----+-----+



In [19]:
#count occurances of Survived
df.groupBy("Survived").count().sort("count",ascending=True).show()

+--------+-----+
|Survived|count|
+--------+-----+
|     1.0|  342|
|     0.0|  549|
+--------+-----+



In [20]:
#stats by group
df.groupby('Sex').agg({'Survived': 'mean'}).show()

+------+-------------------+
|   Sex|      avg(Survived)|
+------+-------------------+
|female| 0.7420382165605095|
|  male|0.18890814558058924|
+------+-------------------+



In [21]:
#Number of people above 40
print('Number of people above 30:',df.filter(df.Age > 30).count())
print('Number of people below 20:',df.filter(df.Age < 20).count())

Number of people above 30: 305
Number of people below 20: 164


In [22]:
#Number of people in 20s
y = 891-305-164
y

422

### Drop columns and replace null values

In [23]:
#dropping passenger id and cabin and name
cols = ("PassengerId", "Cabin", "Name")
new_df = df.drop(*cols)
new_df.printSchema()

root
 |-- Survived: float (nullable = true)
 |-- Pclass: float (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: float (nullable = true)
 |-- Sibsp: float (nullable = true)
 |-- Parch: float (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: float (nullable = true)
 |-- Embarked: string (nullable = true)



In [30]:
#check for null values
new_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in new_df.columns]).show()

+--------+------+---+---+-----+-----+------+----+--------+
|Survived|Pclass|Sex|Age|Sibsp|Parch|Ticket|Fare|Embarked|
+--------+------+---+---+-----+-----+------+----+--------+
|       0|     0|  0|177|    0|    0|     0|   0|       2|
+--------+------+---+---+-----+-----+------+----+--------+



In [32]:
#mean age is 29.69
#replace Age null with mean age

new_df = new_df.fillna(value = 29.69, subset=['Age'])

In [33]:
#check for null values
new_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in new_df.columns]).show()

+--------+------+---+---+-----+-----+------+----+--------+
|Survived|Pclass|Sex|Age|Sibsp|Parch|Ticket|Fare|Embarked|
+--------+------+---+---+-----+-----+------+----+--------+
|       0|     0|  0|  0|    0|    0|     0|   0|       2|
+--------+------+---+---+-----+-----+------+----+--------+



In [34]:
#drop 2 embarked
new_df = new_df.na.drop("any")

#check for null values
new_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in new_df.columns]).show()

+--------+------+---+---+-----+-----+------+----+--------+
|Survived|Pclass|Sex|Age|Sibsp|Parch|Ticket|Fare|Embarked|
+--------+------+---+---+-----+-----+------+----+--------+
|       0|     0|  0|  0|    0|    0|     0|   0|       0|
+--------+------+---+---+-----+-----+------+----+--------+



## Build Pipelines

In [35]:
new_df.printSchema()

root
 |-- Survived: float (nullable = true)
 |-- Pclass: float (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: float (nullable = false)
 |-- Sibsp: float (nullable = true)
 |-- Parch: float (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: float (nullable = true)
 |-- Embarked: string (nullable = true)



In [39]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

Encode categorical data

In [45]:
CONTI_FEATURES  = ['Pclass','Age', 'Sibsp', 'Parch', 'Fare']


CATE_FEATURES = ['Sex', 'Ticket', 'Embarked']
stages = [] # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

In [46]:
# Convert label into label indices using the StringIndexer
label_stringIdx =  StringIndexer(inputCol="Survived", outputCol="Survivor")
stages += [label_stringIdx]

In [47]:
#add continous variables
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

In [48]:
#assemble steps
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [49]:
#push the steps into a pipeline
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(new_df)
model = pipelineModel.transform(new_df)

In [50]:
model.take(1)

[Row(Survived=0.0, Pclass=3.0, Sex='male', Age=22.0, Sibsp=1.0, Parch=0.0, Ticket='A/5 21171', Fare=7.25, Embarked='S', SexIndex=0.0, SexclassVec=SparseVector(1, {0: 1.0}), TicketIndex=558.0, TicketclassVec=SparseVector(679, {558: 1.0}), EmbarkedIndex=0.0, EmbarkedclassVec=SparseVector(2, {0: 1.0}), Survivor=0.0, features=SparseVector(687, {0: 1.0, 559: 1.0, 680: 1.0, 682: 3.0, 683: 22.0, 684: 1.0, 686: 7.25}))]

### Prep

In [77]:
#make computation faster
from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["Survivor"], DenseVector(x["features"])))

In [78]:
#create train data
df_train = sqlContext.createDataFrame(input_data, ["Survivor", "features"])
df_train.show(2)

+--------+--------------------+
|Survivor|            features|
+--------+--------------------+
|     0.0|[1.0,0.0,0.0,0.0,...|
|     1.0|[0.0,0.0,0.0,0.0,...|
+--------+--------------------+
only showing top 2 rows



In [79]:
# Split the data into train and test sets
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

In [80]:
#how many survived in train data
train_data.groupby('Survivor').agg({'Survivor': 'count'}).show()

+--------+---------------+
|Survivor|count(Survivor)|
+--------+---------------+
|     0.0|            429|
|     1.0|            278|
+--------+---------------+



In [81]:
#how many survived in test data
test_data.groupby('Survivor').agg({'Survivor': 'count'}).show()

+--------+---------------+
|Survivor|count(Survivor)|
+--------+---------------+
|     0.0|            120|
|     1.0|             62|
+--------+---------------+



## Build Logistic regressor

In [82]:
# Import `LinearRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="Survivor",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)

In [83]:
# Print the coefficients and intercept for logistic regression
#print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))

Intercept: 0.6925964066702734


#### Train and evaluate the model

In [84]:
# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)

In [85]:
predictions.printSchema()

root
 |-- Survivor: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [86]:
#create dataframe of survivor, prediction and probability
selected = predictions.select("Survivor", "prediction", "probability")
selected.show(20)

+--------+----------+--------------------+
|Survivor|prediction|         probability|
+--------+----------+--------------------+
|     0.0|       0.0|[0.57526965729419...|
|     0.0|       1.0|[0.37649414402381...|
|     0.0|       0.0|[0.57021204667659...|
|     0.0|       0.0|[0.59180326907963...|
|     0.0|       0.0|[0.58205234522138...|
|     0.0|       0.0|[0.53410073567563...|
|     0.0|       0.0|[0.53437789306623...|
|     0.0|       0.0|[0.70868208982623...|
|     0.0|       0.0|[0.72214160016881...|
|     0.0|       0.0|[0.69971336307594...|
|     0.0|       0.0|[0.67350520555913...|
|     0.0|       0.0|[0.73298606039628...|
|     0.0|       0.0|[0.73701987370741...|
|     0.0|       0.0|[0.76842747879293...|
|     0.0|       0.0|[0.70783781321456...|
|     0.0|       0.0|[0.63346050359251...|
|     0.0|       0.0|[0.71989498250997...|
|     0.0|       0.0|[0.66148929499494...|
|     0.0|       0.0|[0.74829461058103...|
|     0.0|       0.0|[0.73982194726142...|
+--------+-

In [87]:
#create dataframe with labeled data
cm = predictions.select("Survivor", "prediction")

In [88]:
#check number survived
cm.groupby('Survivor').agg({'Survivor': 'count'}).show()

+--------+---------------+
|Survivor|count(Survivor)|
+--------+---------------+
|     0.0|            120|
|     1.0|             62|
+--------+---------------+



In [89]:
#group by prediction
cm.groupby('prediction').agg({'prediction': 'count'}).show()

+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|              135|
|       1.0|               47|
+----------+-----------------+



In [91]:
#define function to find accuracy
def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("Survivor", "prediction")
    acc = cm.filter(cm.Survivor == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 

#find accuracy
accuracy_m(model = linearModel)

Model accuracy: 79.670%


In [104]:
### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",
                                         labelCol="Survivor")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.8687500000000002
areaUnderROC


### Tune Log Regression

In [93]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())


In [105]:
#evaluate model with the crodd valudation method with 5 folds

from time import *
start_time = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

Time to train model: 78.312 seconds


In [106]:
#test ccuracy
accuracy_m(model = cvModel)

Model accuracy: 83.516%


Regularization param of 0.01 was best

In [107]:
#check best model
bestModel = cvModel.bestModel
bestModel.extractParamMap()

{Param(parent='LogisticRegression_a62b1da01a9e', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LogisticRegression_a62b1da01a9e', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
 Param(parent='LogisticRegression_a62b1da01a9e', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto',
 Param(parent='LogisticRegression_a62b1da01a9e', name='featuresCol', doc='features column name.'): 'features',
 Param(parent='LogisticRegression_a62b1da01a9e', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LogisticRegression_a62b1da01a9e', name='labelCol', doc='label column name.'): 'Survivor',
 Param(parent='LogisticRegression_a62b1da01a9e', name='maxBlockSizeInMB', doc='maximum memory in MB fo

## Random Forest Classifier

In [95]:
from pyspark.ml.classification import RandomForestClassifier

#initialize rf
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'Survivor')
rfModel = rf.fit(train_data)

In [97]:
# Make predictions on test data using the transform() method.
predictions_rf = rfModel.transform(test_data)

In [98]:
predictions_rf.printSchema()

root
 |-- Survivor: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [99]:
#new dataframe looking at survivor, prediciton and probability
selected = predictions_rf.select("Survivor", "prediction", "probability")
selected.show(20)

+--------+----------+--------------------+
|Survivor|prediction|         probability|
+--------+----------+--------------------+
|     0.0|       0.0|[0.62602284325254...|
|     0.0|       1.0|[0.43656030430593...|
|     0.0|       0.0|[0.62602284325254...|
|     0.0|       0.0|[0.59488564597625...|
|     0.0|       0.0|[0.62602284325254...|
|     0.0|       0.0|[0.58428226069235...|
|     0.0|       0.0|[0.58428226069235...|
|     0.0|       0.0|[0.59488564597625...|
|     0.0|       0.0|[0.51428498632561...|
|     0.0|       0.0|[0.51428498632561...|
|     0.0|       0.0|[0.58256202365262...|
|     0.0|       0.0|[0.59188403686787...|
|     0.0|       0.0|[0.57881896178124...|
|     0.0|       0.0|[0.67150879350483...|
|     0.0|       0.0|[0.57072670284347...|
|     0.0|       0.0|[0.57712811602354...|
|     0.0|       0.0|[0.54745532541760...|
|     0.0|       0.0|[0.59068840812437...|
|     0.0|       0.0|[0.64405484842096...|
|     0.0|       0.0|[0.71227034913516...|
+--------+-

In [100]:
#create new dataframe
cm_rf = predictions_rf.select("Survivor", "prediction")

In [101]:
#group by survivor
cm_rf.groupby('Survivor').agg({'Survivor': 'count'}).show()

+--------+---------------+
|Survivor|count(Survivor)|
+--------+---------------+
|     0.0|            120|
|     1.0|             62|
+--------+---------------+



In [102]:
#groupby prediction
cm_rf.groupby('prediction').agg({'prediction': 'count'}).show()

+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|              145|
|       1.0|               37|
+----------+-----------------+



In [103]:
#Check Accuracy
accuracy_m(model=rfModel)

Model accuracy: 75.275%


In [108]:
### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator_rf = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",
                                         labelCol="Survivor")
print(evaluator_rf.evaluate(predictions_rf))
print(evaluator_rf.getMetricName())

0.8442876344086022
areaUnderROC


### Tune Forest Classifier

In [110]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid_rf = (ParamGridBuilder()
             .addGrid(rf.numTrees, [10,20,50])
             .addGrid(rf.maxDepth, [5,10,15])
             .addGrid(rf.maxBins, [5,10,15])
             .build())


In [112]:
# Create 5-fold CrossValidator
rfcv = CrossValidator(estimator = rf,
                      estimatorParamMaps = paramGrid_rf,
                      evaluator = evaluator_rf,
                      numFolds = 5)

In [113]:
# Run cross validations
rfcvModel = rfcv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

Time to train model: 839.147 seconds


In [114]:
#test Accuracy
accuracy_m(model = rfcvModel)

Model accuracy: 81.868%


In [116]:
# Use test set here so we can measure the accuracy of our model on new data
rfpredictions = rfcvModel.transform(test_data)

# cvModel uses the best model found from the Cross Validation
# Evaluate best model
print('RMSE:', evaluator_rf.evaluate(rfpredictions))

RMSE: 0.865725806451613


Best params were 50 for number of trees, 15 for maxdepth and 15 for maxbins

In [117]:
#Check best params
rfbestModel = rfcvModel.bestModel
rfbestModel.extractParamMap()

{Param(parent='RandomForestClassifier_ebb8eeabdbff', name='bootstrap', doc='Whether bootstrap samples are used when building trees.'): True,
 Param(parent='RandomForestClassifier_ebb8eeabdbff', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'): False,
 Param(parent='RandomForestClassifier_ebb8eeabdbff', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10,
 Param(parent='RandomForestClassifier_ebb8eeabdbff', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supp