In [None]:
# Predict Survival of passengers on the Titanic
# Reference: https://ww2.amstat.org/publications/jse/v3n3/datasets.dawson.html

In [None]:
# Machine Learning Library of PySpark
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row, SQLContext

import os
import sys
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint
from numpy import array

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Library for confusion matrix, precision, test error
from pyspark.mllib.evaluation import MulticlassMetrics
# Library For Area under ROC curve and Area under precision-recall curve
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# Create the Spark SQL entry point on top of the SparkContext `sc`
# (presumably provided by the notebook environment, since it is never created here).
# In Spark 1.x, constructing an SQLContext is also what makes RDD.toDF() available.
sqlContext = SQLContext(sparkContext=sc)

In [None]:
# The data will be loaded into an array.
# This is the summary of the data structure, including the column position and name.
# The first field starts at position 0.

# 0 Name    -  Passenger first and last name.
# 1 PClass  -  Passenger class (1st, 2nd, or 3rd)
# 2 Age
# 3 Sex
# 4 Survived -  1 if the passenger survived;  0 if the passenger did not survive
# 5 PersonID

# `label` is the target variable, `PersonInfo` is a space-separated string of the
# explanatory variables (class, age, sex) and `PersonID` is the unique identifier.
LabeledDocument = Row("PersonID", "PersonInfo", "label")

# Define a function that parses one line of the raw CSV file into a LabeledDocument.

def parseDocument(line):
    """Parse one CSV line of Titanic.csv into a LabeledDocument row.

    Expected column order: Name, PClass, Age, Sex, Survived, PersonID.
    Name is not used. Fields are split on ',' only, so this assumes no field
    contains an embedded comma (TODO: confirm for the Name column).

    Args:
        line: one line of the raw file (header already removed).

    Returns:
        LabeledDocument(PersonID, "<PClass> <Age> <Sex>", label), where label
        is 1.0 if Survived is a positive number and 0.0 otherwise.

    Raises:
        ValueError: if the line has fewer than 6 fields or Survived is not numeric.
    """
    # Strip whitespace (including a trailing '\r' from Windows line endings) and
    # surrounding double quotes from every field. No str() conversion: in
    # Python 2 that raises on non-ASCII unicode input.
    values = [field.strip().strip('"') for field in line.split(',')]
    if len(values) < 6:
        raise ValueError("expected at least 6 comma-separated fields, got %d: %r"
                         % (len(values), line))

    # Compare numerically. The previous string test (values[4] > '0') was true
    # for any text sorting after '0' (e.g. 'NA', 'yes', '00'), silently
    # labelling such rows as survivors.
    alive = 1.0 if float(values[4]) > 0 else 0.0

    textValue = " ".join(values[1:4])
    return LabeledDocument(values[5], textValue, alive)


# Load the raw Titanic.csv file from Object Storage; its lines are parsed with parseDocument (defined above) in the next cell.
# @hidden_cell
# This function sets up Spark's access to Object Storage. Credentials are no longer
# embedded in the notebook. The values that were previously hard-coded here were
# committed in plain text and should be treated as leaked and rotated.
def set_hadoop_config_with_credentials_11c12758b6344651a9a656f1ba0d558d(name, credentials=None, spark_context=None):
    """Set the Hadoop configuration so it is possible to
    access data from Bluemix Object Storage using Spark.

    Args:
        name: Swift service name. Every property written here is keyed as
            'fs.swift.service.<name>.*', so the same name must be used in the
            swift:// URLs read afterwards.
        credentials: optional dict supplying any of 'tenant', 'username',
            'password', 'auth_url', 'region'. Missing or empty entries fall back
            to the environment variables SWIFT_TENANT, SWIFT_USERNAME,
            SWIFT_PASSWORD, SWIFT_AUTH_URL and SWIFT_REGION. auth_url and region
            then fall back to the previous SoftLayer/Dallas defaults.
        spark_context: SparkContext to configure; defaults to the global `sc`.

    Raises:
        ValueError: if a required setting cannot be resolved. This is raised
            before any Hadoop property is written.
    """
    credentials = credentials or {}

    # (setting key, environment variable, default). Secrets have no default.
    settings = (
        ('auth_url', 'SWIFT_AUTH_URL', 'https://identity.open.softlayer.com' + '/v3/auth/tokens'),
        ('tenant', 'SWIFT_TENANT', None),
        ('username', 'SWIFT_USERNAME', None),
        ('password', 'SWIFT_PASSWORD', None),
        ('region', 'SWIFT_REGION', 'dallas'),
    )
    resolved = {}
    for key, env_var, default in settings:
        value = credentials.get(key) or os.environ.get(env_var) or default
        if not value:
            raise ValueError(
                "Object Storage setting %r is missing: pass it in `credentials` "
                "or set the %s environment variable" % (key, env_var))
        resolved[key] = value

    context = spark_context if spark_context is not None else sc
    prefix = 'fs.swift.service.' + name
    hconf = context._jsc.hadoopConfiguration()
    hconf.set(prefix + '.auth.url', resolved['auth_url'])
    hconf.set(prefix + '.auth.endpoint.prefix', 'endpoints')
    hconf.set(prefix + '.tenant', resolved['tenant'])
    hconf.set(prefix + '.username', resolved['username'])
    hconf.set(prefix + '.password', resolved['password'])
    hconf.setInt(prefix + '.http.port', 8080)
    hconf.set(prefix + '.region', resolved['region'])
    hconf.setBoolean(prefix + '.public', False)

# The service name is arbitrary, but the same name has to be used both when
# configuring the Swift connector and in the swift:// URL below.
name = 'keystone'
set_hadoop_config_with_credentials_11c12758b6344651a9a656f1ba0d558d(name)

# Read Titanic.csv from the "Default" container as an RDD of text lines and
# peek at the first five of them.
data = sc.textFile("swift://Default.%s/Titanic.csv" % name)
data.take(5)


In [None]:
# Load the data into a DataFrame.
# Drop the header by exact match on the first line (only if it looks like the
# header). The previous substring test ("Name" not in s) also discarded any data
# row that merely contained the text "Name". Blank lines are skipped as well,
# since parseDocument cannot parse them.
header = data.first()
rows = data.filter(lambda line: line != header) if "Name" in header else data
documents = rows.filter(lambda line: line.strip()).map(parseDocument)
training = documents.toDF()  # RDD of LabeledDocument rows -> DataFrame
training.take(5)

In [None]:
# Build the Spark ML pipeline: split PersonInfo into words, hash the words into
# a term-frequency feature vector, then fit a regularised logistic regression.
tokenizer = Tokenizer().setInputCol("PersonInfo").setOutputCol("words")
hashingTF = HashingTF().setInputCol(tokenizer.getOutputCol()).setOutputCol("features")
lr = LogisticRegression().setMaxIter(10).setRegParam(0.01)
pipeline = Pipeline().setStages([tokenizer, hashingTF, lr])

In [None]:
# Fit the whole pipeline on the labelled data. Stages run in list order
# (tokenizer -> hashingTF -> lr), and the result is a fitted PipelineModel.
model = pipeline.fit(dataset=training)

In [None]:
# Sanity check: display the leading rows of the training DataFrame
# (PersonID, PersonInfo, label) as a table.
training.show(n=20)

In [None]:
# Unseen passengers to score. PersonInfo uses the same "<pclass> <age> <sex>"
# layout that parseDocument builds for the training rows. Each
# (PersonID, PersonInfo) tuple is unpacked into a Document row.
Document = Row("PersonID", "PersonInfo")
test = (
    sc.parallelize([
        (757, "1st 20 female"),
        (758, "3rd 15 male"),
        (759, "2nd 16 female"),
        (760, "1st 22 male"),
        (761, "3rd 17 female"),
        (762, "1st 7 male"),
    ])
    .map(lambda pair: Document(*pair))
    .toDF()
)

In [None]:
# Make predictions on test documents and print columns of interest
prediction = model.transform(test)
selected = prediction.select("PersonInfo", "prediction", "probability")
for row in selected.collect():
    print row

In [None]:
# Evaluate the logistic regression model.
# NOTE: `training` is the same data the model was fitted on, so every number
# below is a training-set (resubstitution) estimate and will be optimistic.
# It must not be reported as test performance. Hold out data (e.g. with
# training.randomSplit) before fitting to get genuine test metrics.
pred_lr = model.transform(training).select("prediction", "label")

# NOTE(review): metricName="precision" is the Spark 1.x overall precision, which
# equals accuracy for single-label classification. Spark 2.x replaced it with
# "accuracy" — confirm against the cluster's Spark version.
eval_lr = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="precision")
accuracy_lr = eval_lr.evaluate(pred_lr)

# MulticlassMetrics expects an RDD of (prediction, label) pairs; the selected
# DataFrame rows are already in that order.
predictionAndLabels_lr = pred_lr.rdd
metrics_lr = MulticlassMetrics(predictionAndLabels_lr)

# Report precision/recall/F1 for the positive class (label 1.0 = survived).
# Without a label argument these overall figures all collapse to accuracy.
precision_lr = metrics_lr.precision(1.0)
recall_lr = metrics_lr.recall(1.0)
f1Measure_lr = metrics_lr.fMeasure(1.0)

print("F1 Measure (survived) = %s" % f1Measure_lr)
print("Training Accuracy = %s" % accuracy_lr)
print("Training Error = %s" % (1 - accuracy_lr))
print("Precision (survived) = %s" % precision_lr)
print("Recall (survived) = %s" % recall_lr)

In [None]:
# Confusion matrix of the (training-set) predictions evaluated above, converted
# from Spark's matrix type to an array so the notebook renders it as the cell output.
confusion_lr = metrics_lr.confusionMatrix()
confusion_lr.toArray()

In [None]:
# BinaryClassificationMetrics ranks examples by a continuous score. Feeding it
# hard 0/1 predictions collapses the ROC and PR curves to a single operating
# point, so the areas become meaningless. Use the predicted probability of the
# positive class (index 1 of the probability vector) as the score instead.
# As in the cell above, these are training-set figures, not test figures.
scoreAndLabels_lr = (
    model.transform(training)
    .select("probability", "label")
    .rdd
    .map(lambda row: (float(row.probability[1]), float(row.label)))
)
bin_lr = BinaryClassificationMetrics(scoreAndLabels_lr)

# Area under precision-recall curve
print("Area under PR = %s" % bin_lr.areaUnderPR)
# Area under ROC curve
print("Area under ROC = %s" % bin_lr.areaUnderROC)