## Titanic Dataset
### In which we explore Disasters, Trees, Classification & the Kaggle Competition

1. Visit Kaggle and download the data from http://www.kaggle.com/c/titanic-gettingStarted
2. Read Titanic Data
3. Transform and select features
4. Create a simple model & Predict
5. Submit to Kaggle & check out the leaderboard
6. Decision Tree Model, Predict & Submit
7. Random Forest Model, Predict & Submit
8. Discussion

In [None]:
# Notebook banner: print the run timestamp (US/Pacific), the Spark version of
# the active context, and the Spark configuration.
# print(...) with a single argument behaves the same on Python 2 and Python 3
# and matches the call form already used later in this notebook (print(model)).
import datetime
from pytz import timezone
print("Last run @%s" % (datetime.datetime.now(timezone('US/Pacific'))))
#
from pyspark.context import SparkContext
# NOTE(review): `sc` is not created in this file; presumably it is the
# SparkContext pre-defined by the PySpark shell / notebook kernel - confirm.
print("Running Spark Version %s" % (sc.version))
#
from pyspark.conf import SparkConf
# NOTE(review): this is a freshly constructed SparkConf, not necessarily the
# configuration of the running `sc` - confirm that is what should be shown.
conf = SparkConf()
print(conf.toDebugString())

# Read Titanic Data
### The Data is part of the Kaggle Competition "Titanic: Machine Learning from Disaster"
### Download data from http://www.kaggle.com/c/titanic-gettingStarted

In [None]:
# Load the Kaggle train and test CSV files as DataFrames through the
# spark-csv data source, with the header option switched on.
train = (sqlContext.read
         .format('com.databricks.spark.csv')
         .options(header='true')
         .load('titanic-r/train.csv'))
test = (sqlContext.read
        .format('com.databricks.spark.csv')
        .options(header='true')
        .load('titanic-r/test.csv'))

In [None]:
# Column names and data types of the raw training DataFrame.
train.dtypes

In [None]:
# Summary statistics of the raw training data, as produced by describe().
train.describe().show()

In [None]:
# Display two rows of the raw training data.
train.show(2)

In [None]:
# Build the modelling view of the training data: cast the numeric columns and
# encode Sex as Gender (1 = female, 0 = anything else).
# The column ORDER matters: parse_passenger_list reads rows by position, i.e.
#   0 PassengerId, 1 Survived (label), 2 Pclass, 3 Gender, 4 Age,
#   5 SibSp, 6 Parch, 7 Fare.
import pyspark.sql.functions as F
train_1 = train.select(train['PassengerId'], 
                 train['Survived'].cast("integer").alias("Survived"),
                 train['Pclass'].cast("integer").alias("Pclass"),
                 F.when(train['Sex'] == 'female', 1).otherwise(0).alias("Gender"), 
                 train['Age'].cast("integer").alias("Age"),
                 train['SibSp'].cast("integer").alias("SibSp"),
                 train['Parch'].cast("integer").alias("Parch"),
                 train['Fare'].cast("float").alias("Fare"))

In [None]:
# Row count of the raw training data ...
train.count()

In [None]:
# ... and of the transformed view, for comparison.
train_1.count()

In [None]:
# Display two rows of the transformed training data.
train_1.show(2)

In [None]:
# Summary statistics of the transformed (now numeric) columns.
train_1.describe().show()

In [None]:
# Do we have nulls ?
# Display up to 40 rows whose Age is null (nothing is replaced in this cell).
train_1.filter(train_1['Age'].isNull()).show(40)

In [None]:
# Preview only: show the data with null Age replaced by 30 (result is not kept)
train_1.na.fill(30,'Age').show(40)

In [None]:
# Replace null Age by 30 and keep the result as train_2
train_2 = train_1.na.fill(30,'Age')

In [None]:
# Contingency table of Gender (1 = female, 0 = otherwise) against Survived.
train_2.crosstab("Gender","Survived").show()

In [None]:
# Survival rate, in percent, for females (F) and males (M).
# NOTE(review): the counts are hard-coded, presumably copied from the
# Gender x Survived crosstab output - re-check them if the data changes.
# print(...) with a single argument works on both Python 2 and Python 3.
print("F = %3.2f%% M = %3.2f%%" % ( (100*233.0/(233+81)), (100*109.0/(109+468)) ))

### Dick, The butcher to Jack Cade
### Dick: The first thing we do, let's kill all the men.
### Cade: Nay, that I mean to do.
#### Ref : http://www.enotes.com/shakespeare-quotes/lets-kill-all-lawyers

In [None]:
#
# 1 : Simple Model (female => Survived, everyone else => not survived)
#
test.show(2)

In [None]:
# Predict Survived = 1 when Sex is 'female' and 0 otherwise.
out = test.select(test['PassengerId'], 
                 F.when(test['Sex'] == 'female', 1).otherwise(0).alias("Survived"))

In [None]:
out.show(2)

In [None]:
# Write the submission with a header row. coalesce(1) reduces the DataFrame
# to a single partition before writing; mode('overwrite') replaces any
# existing output at this path.
out.coalesce(1).write.mode('overwrite').format('com.databricks.spark.csv')\
.options(header='true').save('titanic-r/spark-sub-01.csv')

In [None]:
# Submit
# Rank : 2586 Score : 0.76555

In [None]:
#
# Would age be a better predictor ?
#
# Drop rows that contain nulls, then tabulate Age against Survived.
train_1.na.drop().crosstab("Age","Survived").show()

In [None]:
#
# *** Homework : See if Pclass, SibSp or Parch is a better indicator and change the survival prediction accordingly
#

In [None]:
from pyspark.mllib.regression import LabeledPoint
def parse_passenger_list(r):
    """Convert one row of the prepared training data into a LabeledPoint.

    Position 1 of ``r`` is used as the label and positions 2 through 7,
    in order, as the feature values. Position 0 is not used.
    """
    label = r[1]
    features = [r[idx] for idx in range(2, 8)]
    return LabeledPoint(label, features)

In [None]:
# Convert every Row of the training DataFrame into a LabeledPoint.
# Go through `.rdd` explicitly: DataFrame.map was only a shorthand for
# DataFrame.rdd.map in Spark 1.x and is not available on Spark 2.x
# DataFrames, whereas `.rdd.map` works on both.
train_rdd = train_2.rdd.map(parse_passenger_list)

In [None]:
# Number of LabeledPoints produced from the training data.
train_rdd.count()

In [None]:
# Inspect the first LabeledPoint to check label / feature layout.
train_rdd.first()

In [None]:
from pyspark.mllib.tree import DecisionTree
# Train a two-class decision tree on the LabeledPoint RDD. The empty
# categoricalFeaturesInfo declares no feature as categorical.
model = DecisionTree.trainClassifier(train_rdd, numClasses=2,categoricalFeaturesInfo={})

In [None]:
# Print the model object; uncomment the next line for its debug string.
print(model)
# print(model.toDebugString())

In [None]:
# Transform test and predict
# Same casts / Gender encoding as for the training data, but there is no
# Survived column here, so positions are shifted by one relative to train_1:
#   0 PassengerId, 1 Pclass, 2 Gender, 3 Age, 4 SibSp, 5 Parch, 6 Fare.
# parse_test reads positions 1..6 of these rows.
import pyspark.sql.functions as F
test_1 = test.select(test['PassengerId'], 
                 test['Pclass'].cast("integer").alias("Pclass"),
                 F.when(test['Sex'] == 'female', 1).otherwise(0).alias("Gender"), 
                 test['Age'].cast("integer").alias("Age"),
                 test['SibSp'].cast("integer").alias("SibSp"),
                 test['Parch'].cast("integer").alias("Parch"),
                 test['Fare'].cast("float").alias("Fare"))

In [None]:
# Display two rows of the transformed test data.
test_1.show(2)

In [None]:
# Do we have nulls ?
# Display up to 40 test rows whose Age is null.
test_1.filter(test_1['Age'].isNull()).show(40)

In [None]:
# Average Age over the whole test set (no grouping columns).
test_1.groupBy().avg('Age').show()

In [None]:
# Replace null Age by 30. The mean was noted as 30.24, but the value actually
# filled in is the integer 30 (Age was cast to integer above).
# NOTE(review): only Age is imputed here; nulls in any other column
# (e.g. Fare) are passed through unchanged - confirm that is acceptable.
test_2 = test_1.na.fill(30,'Age')

In [None]:
# parse test data for predictions
from pyspark.mllib.regression import LabeledPoint
def parse_test(r):
    """Return the feature values of one prepared test row as a 6-tuple.

    Takes positions 1 through 6 of ``r``, in order; position 0 is skipped.
    """
    return tuple(r[idx] for idx in range(1, 7))

In [None]:
# Convert every Row of the test DataFrame into a plain feature tuple.
# Go through `.rdd` explicitly: DataFrame.map was only a shorthand for
# DataFrame.rdd.map in Spark 1.x and is not available on Spark 2.x
# DataFrames, whereas `.rdd.map` works on both.
test_rdd = test_2.rdd.map(parse_test)

In [None]:
# Number of feature tuples to score.
test_rdd.count()

In [None]:
# Score the whole test RDD with the decision tree.
predictions = model.predict(test_rdd)

In [None]:
# Inspect the first prediction.
predictions.first()

In [None]:
# Pair each PassengerId (position 0 of the test rows) with its prediction.
# `.rdd.map` replaces DataFrame.map, which was only a Spark 1.x shorthand for
# the same call and does not exist on Spark 2.x DataFrames.
# NOTE(review): RDD.zip expects both RDDs to have the same number of
# partitions and elements per partition - presumably true here because
# `predictions` is derived from the same test rows; confirm.
out_rdd = test_2.rdd.map(lambda x: x[0]).zip(predictions)

In [None]:
# Inspect the first (PassengerId, prediction) pair.
out_rdd.first()

In [None]:
# Turn the pairs into a two-column DataFrame.
out_df = out_rdd.toDF(['PassengerId','Survived'])

In [None]:
out_df.show(2)

In [None]:
# Cast the prediction to integer for the submission file.
out_1 = out_df.select(out_df['PassengerId'],
                      out_df['Survived'].cast('integer').alias('Survived'))

In [None]:
out_1.show(2)

In [None]:
# Write the decision-tree submission with a header row, from a single
# partition, replacing any existing output at this path.
out_1.coalesce(1).write.mode('overwrite').format('com.databricks.spark.csv')\
.options(header='true').save('titanic-r/spark-sub-02.csv')

In [None]:
# Submit
# Rank : 2038 +549 Score : 0.77512

In [None]:
from pyspark.mllib.tree import RandomForest
# Train a two-class random forest of 42 trees on the same LabeledPoint RDD;
# as with the decision tree, no feature is declared categorical.
model_rf = RandomForest.trainClassifier(train_rdd, numClasses=2,categoricalFeaturesInfo={},numTrees=42)

In [None]:
# Print the model object; uncomment the next line for its debug string.
print(model_rf)
#print(model_rf.toDebugString())

In [None]:
# Score the test RDD with the random forest and reduce the result to a
# single partition (the ids are coalesced the same way before zipping).
pred_rf = model_rf.predict(test_rdd).coalesce(1)

In [None]:
# Inspect the first prediction.
pred_rf.first()

In [None]:
# Pair each PassengerId (position 0 of the test rows) with its random-forest
# prediction; both sides are reduced to a single partition before zipping.
# `.rdd.map` replaces DataFrame.map, which was only a Spark 1.x shorthand for
# the same call and does not exist on Spark 2.x DataFrames.
out_rf = test_2.rdd.map(lambda x: x[0]).coalesce(1).zip(pred_rf)

In [None]:
# Inspect the first (PassengerId, prediction) pair.
out_rf.first()

In [None]:
# Turn the pairs into a two-column DataFrame.
out_df_rf = out_rf.toDF(['PassengerId','Survived'])

In [None]:
# Cast the prediction to integer for the submission file.
out_2 = out_df_rf.select(out_df_rf['PassengerId'],
                      out_df_rf['Survived'].cast('integer').alias('Survived'))

In [None]:
# Write the random-forest submission with a header row, from a single
# partition, replacing any existing output at this path.
out_2.coalesce(1).write.mode('overwrite').format('com.databricks.spark.csv')\
.options(header='true').save('titanic-r/spark-sub-03.csv')

In [None]:
# Submit
# Rank : 1550 +488 Score : 0.78469

In [None]:
# Looks like we are on a roll ! Let us try SVM !

In [None]:
from pyspark.mllib.classification import SVMWithSGD
# Train an SVM with SGD for 100 iterations on the same LabeledPoint RDD that
# was used for the tree models (features are passed in as they are).
model_svm = SVMWithSGD.train(train_rdd, iterations=100)

In [None]:
# Score the test RDD with the SVM, pair each PassengerId (position 0 of the
# test rows) with its prediction, and build a two-column DataFrame.
# Both sides are reduced to a single partition before zipping.
# `.rdd.map` replaces DataFrame.map, which was only a Spark 1.x shorthand for
# the same call and does not exist on Spark 2.x DataFrames.
pred_svm = model_svm.predict(test_rdd).coalesce(1)
out_svm = test_2.rdd.map(lambda x: x[0]).coalesce(1).zip(pred_svm)
out_df_svm = out_svm.toDF(['PassengerId','Survived'])

In [None]:
# Cast the prediction to integer for the submission file.
out_3 = out_df_svm.select(out_df_svm['PassengerId'],
                      out_df_svm['Survived'].cast('integer').alias('Survived'))

In [None]:
# Write the SVM submission with a header row, from a single partition,
# replacing any existing output at this path.
out_3.coalesce(1).write.mode('overwrite').format('com.databricks.spark.csv')\
.options(header='true').save('titanic-r/spark-sub-04.csv')

In [None]:
# Not good. Only 0.39713 !

#### Did Random Forest or SVM do Better ? 
#### Why ? Why Not ?

#### Data Science Folk Wisdom
http://www.slideshare.net/ksankar/data-science-folk-knowledge