# Titanic : Machine Learning From Disaster | Kaggle
* Resources: https://www.kaggle.com/c/titanic
* Data: https://www.kaggle.com/c/titanic/data

# Objective
* Use machine learning to estimate the probability that a passenger died, based on gender, social class, age and other factors.

In [3]:
%sh
# Download the Titanic training CSV and save it as titanic_train.csv in the
# shell's current working directory (presumably /databricks/driver, the path
# the next cell reads from -- confirm).
wget "https://dl.dropboxusercontent.com/s/ijmml65yga12urf/train.csv?dl=0"  -O titanic_train.csv

In [4]:
# Load the downloaded CSV from the driver's local filesystem. The first line
# is treated as the header and column types are inferred from the data.
train_df = (
    spark.read
    .format('com.databricks.spark.csv')
    .options(header='true', inferSchema='true')
    .load('file:/databricks/driver/titanic_train.csv')
)
# Render the loaded DataFrame as a table in the notebook.
display(train_df)

In [5]:
# Expose the training DataFrame to the %sql cells under the name Titanic_train.
# createOrReplaceTempView replaces registerTempTable, which has been deprecated
# since Spark 2.0; it also makes re-running this cell safe.
train_df.createOrReplaceTempView("Titanic_train")

In [6]:
%sql
-- Browse the raw training rows through the Titanic_train temp table.
select * from Titanic_train

In [7]:
%sql
-- NOTE(review): same query as the previous cell; presumably kept for a second
-- table/chart view of the data -- confirm, otherwise remove.
select * from Titanic_train

In [8]:
%sql
-- Passenger counts per (Survived, Sex) group, bucketed into 20-year age bands.
-- ELSE 0 makes an empty band report 0 instead of NULL.
-- Rows with a NULL Age match no band and are counted in none of the columns.
SELECT Survived, Sex,
  sum(CASE WHEN Age >= 0  AND Age < 20 THEN 1 ELSE 0 END) AS A0to20,
  sum(CASE WHEN Age >= 20 AND Age < 40 THEN 1 ELSE 0 END) AS A20to40,
  sum(CASE WHEN Age >= 40 AND Age < 60 THEN 1 ELSE 0 END) AS A40to60,
  sum(CASE WHEN Age >= 60 AND Age < 80 THEN 1 ELSE 0 END) AS A60to80,
  sum(CASE WHEN Age >= 80 THEN 1 ELSE 0 END) AS A80
FROM Titanic_train
GROUP BY Survived, Sex


In [9]:
 from pyspark.ml import Pipeline
 from pyspark.ml.classification import LogisticRegression
 from pyspark.ml.feature import HashingTF, Tokenizer
 from pyspark.sql import Row
 from pyspark.sql.functions import UserDefinedFunction
 from pyspark.sql.types import *

In [10]:
# Convert results for working with MLlib input, which requires labels as a float
def labelForResults(s):
    """Convert a raw ``Survived`` value into a float label.

    Args:
        s: Raw value of the ``Survived`` column; ``None`` for a missing entry.

    Returns:
        ``None`` when ``s`` is ``None``, ``0.0`` when ``s`` equals 0, and
        ``1.0`` for any other value.
    """
    if s is None:
        # Keep missing values missing so a null filter can drop them,
        # instead of silently labelling them as 1.0.
        return None
    return 0.0 if s == 0 else 1.0
# Wrap the Python converter as a Spark UDF producing a double column.
label = UserDefinedFunction(labelForResults, DoubleType())
# Keep only the derived label and the Sex column.
labeledData = train_df.select(
    label(train_df['Survived']).alias('label'),
    train_df['Sex']
)
# Peek at the first row as a sanity check.
labeledData.take(1)

# Data Transformation

In [12]:
# Drop rows with a missing label or a missing Sex value. Sex is the string
# column tokenized by the pipeline, and a null there would fail in that stage.
labeledData = labeledData.filter(
    labeledData.label.isNotNull() & labeledData.Sex.isNotNull()
).cache()
# 90/10 train/test split with a fixed seed.
train, test = labeledData.randomSplit([0.9, 0.1], seed=12345)

# Configuration of Pipeline

In [14]:
# Three-stage ML pipeline: tokenize the Sex string, hash the tokens into a
# "features" vector, then fit a logistic regression.
tokenizer = Tokenizer(outputCol="w", inputCol="Sex")
hashingTF = HashingTF(
    outputCol="features",
    inputCol=tokenizer.getOutputCol()
)
lr = LogisticRegression(regParam=0.01, maxIter=10)
pipeline = Pipeline(
    stages=[
        tokenizer,
        hashingTF,
        lr
    ]
)

# Train all stages on the training split.
model = pipeline.fit(train)

# Making predictions on test data

In [16]:
# Score the held-out test split with the fitted pipeline.
predictionsDf = model.transform(test)
# Expose the predictions to SQL under the name Predictions.
# createOrReplaceTempView replaces registerTempTable, which has been deprecated
# since Spark 2.0; it also makes re-running this cell safe.
predictionsDf.createOrReplaceTempView('Predictions')
display(predictionsDf)

# Checking model accuracy

In [18]:
# A prediction counts as a success when it matches the label (both 0 or both 1).
numSuccesses = predictionsDf.where("(label = 0 AND prediction = 0) OR  (label = 1 AND prediction = 1)").count()
numInspections = predictionsDf.count()

# Single-argument print(...) produces the same text on Python 2 and Python 3;
# the original print statements are a SyntaxError on Python 3.
print("There were {} inspections and there were {} successful predictions".format(numInspections, numSuccesses))
if numInspections:
    successRate = (float(numSuccesses) / float(numInspections)) * 100
    print("This is a {}% success rate".format(successRate))
else:
    # An empty test split would otherwise raise ZeroDivisionError.
    print("No rows were scored, so the success rate is undefined")

# Visualization

In [20]:
# Confusion-matrix counts: one filtered count per (label, prediction) pair.
# count() already returns a Python integer, so no int() wrapper is needed.
truePositive = predictionsDf.where("(label = 1 AND prediction = 1)").count()
trueNegative = predictionsDf.where("(label = 0 AND prediction = 0)").count()
falsePositive = predictionsDf.where("(label = 0 AND prediction = 1)").count()
falseNegative = predictionsDf.where("(label = 1 AND prediction = 0)").count()

# Build the rows once so the printed list and the DataFrame cannot drift apart.
confusionRows = [['TP', truePositive], ['TN', trueNegative], ['FP', falsePositive], ['FN', falseNegative]]

# print(...) with one argument works on Python 2 and Python 3; the original
# `print [...]` form fails on Python 3.
print(confusionRows)
resultDF = sqlContext.createDataFrame(confusionRows, ['metric', 'value'])
display(resultDF)