In [1]:
import csv

import findspark
findspark.init()

import pyspark
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.tree import DecisionTree

In [2]:
# getOrCreate() returns the already-running SparkContext if there is one, so
# re-running this cell does not fail with
# "Cannot run multiple SparkContexts at once".
sc = pyspark.SparkContext.getOrCreate()

In [3]:
sc  # display the SparkContext repr to confirm the context is up

<pyspark.context.SparkContext at 0x7f55b4715b00>

In [4]:
# One RDD element per line of the CSV; the header line is included.
raw_rdd = sc.textFile(name="data/titanic.csv")
raw_rdd.count()

1317

In [5]:
raw_rdd.take(num=5)  # header line followed by the first data rows

['"","class","age","sex","survived"',
 '"1","1st class","adults","man","yes"',
 '"2","1st class","adults","man","yes"',
 '"3","1st class","adults","man","yes"',
 '"4","1st class","adults","man","yes"']

In [6]:
# The first line holds the column names; keep only lines that differ from it.
header = raw_rdd.first()
data_rdd = raw_rdd.filter(lambda row: row != header)

In [7]:
data_rdd.takeSample(withReplacement=False, num=5, seed=0)

['"256","1st class","adults","women","yes"',
 '"25","1st class","adults","man","yes"',
 '"1064","3rd class","adults","man","no"',
 '"867","3rd class","adults","man","no"',
 '"392","2nd class","adults","man","no"']

We'll train two classifiers, a decision tree and a logistic regression, to predict which passengers survived. We use the classic Titanic dataset and run Apache Spark in local mode inside this notebook.

Let's turn each passenger's attributes into an MLlib `LabeledPoint`. A `LabeledPoint` pairs a feature vector (class, age group, sex) with the ground-truth label (survived or not).

In [8]:
def row_to_labeled_point(line):
    """Parse one data row of titanic.csv into an MLlib LabeledPoint.

    A row has five quoted, comma-separated fields, e.g.
    '"1","1st class","adults","man","yes"'  (id, class, age, sex, survived).

    Features (all categorical, matching the ``categoricalFeaturesInfo``
    passed to ``DecisionTree.trainClassifier``):
      0: passenger class -- '1st class' -> 0, '2nd class' -> 1, '3rd class' -> 2
      1: age group       -- 1 for 'adults', 0 for 'child'
      2: sex             -- 1 for 'women', 0 for 'man'

    Label: 1 if survived == 'yes', else 0.

    Raises:
        ValueError: if the row does not contain exactly five fields.
        RuntimeError: if class, age, sex or survived has an unexpected value.
    """
    # csv.reader handles quoting properly (including commas inside quotes),
    # unlike a plain split(',') followed by strip('"').
    passenger_id, klass, age, sex, survived = next(csv.reader([line]))

    class_index = {'1st class': 0, '2nd class': 1, '3rd class': 2}
    checks = (
        ('class', klass, tuple(class_index)),
        ('age', age, ('adults', 'child')),
        ('sex', sex, ('man', 'women')),
        ('survived', survived, ('yes', 'no')),
    )
    unknown = [(name, value) for name, value, allowed in checks
               if value not in allowed]
    if unknown:
        raise RuntimeError('unknown value(s) %r in row %r' % (unknown, line))

    features = [
        class_index[klass],
        (1 if age == 'adults' else 0),
        (1 if sex == 'women' else 0)
    ]
    return LabeledPoint(1 if survived == 'yes' else 0, features)

In [9]:
# Parse every data row into a LabeledPoint via row_to_labeled_point.
labeled_points_rdd = data_rdd.map(f=row_to_labeled_point)

In [10]:
labeled_points_rdd.takeSample(withReplacement=False, num=5, seed=0)

[LabeledPoint(1.0, [0.0,1.0,1.0]),
 LabeledPoint(1.0, [0.0,1.0,0.0]),
 LabeledPoint(0.0, [2.0,1.0,0.0]),
 LabeledPoint(0.0, [2.0,1.0,0.0]),
 LabeledPoint(0.0, [1.0,1.0,0.0])]

Let's split the data into a training set (70%) and a test set (30%), using a fixed seed for reproducibility:

In [11]:
# 70/30 split with a fixed seed so the split is reproducible. Both halves are
# used by several actions below (counting, training, predicting, zipping), so
# cache them instead of re-reading and re-parsing the CSV for each action.
training_rdd, test_rdd = (
    split.cache() for split in labeled_points_rdd.randomSplit([0.7, 0.3], seed=0)
)

In [12]:
training_count, test_count = training_rdd.count(), test_rdd.count()
training_count, test_count

(922, 394)

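Now let's train a binary `DecisionTree` classifier (two outcomes: died or survived). All three features are categorical, so we tell the tree how many distinct values each one takes via `categoricalFeaturesInfo`: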

In [13]:
# Number of distinct values of each categorical feature (see row_to_labeled_point):
# feature 0 = passenger class (3), feature 1 = age group (2), feature 2 = sex (2).
model = DecisionTree.trainClassifier(
    training_rdd,
    numClasses=2,  # label 0 = died, 1 = survived
    categoricalFeaturesInfo={0: 3, 1: 2, 2: 2},
)

In [14]:
# Predict on the whole RDD of test feature vectors in one call.
predictions_rdd = model.predict(test_rdd.map(lambda point: point.features))

In [15]:
# Pair each true label with its prediction; predictions_rdd was mapped
# element-for-element from test_rdd, which is what makes zip line up.
truth_and_predictions_rdd = test_rdd.map(lambda point: point.label).zip(predictions_rdd)

In [16]:
# Fraction of test passengers whose predicted label equals the true label.
accuracy = (
    truth_and_predictions_rdd
    .filter(lambda truth_pred: truth_pred[0] == truth_pred[1])
    .count()
) / float(test_count)
print('Accuracy =', accuracy)

Accuracy = 0.8020304568527918


In [17]:
# print() (rather than a bare expression) so the multi-line tree description
# renders with real line breaks.
print(model.toDebugString())

DecisionTreeModel classifier of depth 4 with 21 nodes
  If (feature 2 in {0.0})
   If (feature 1 in {0.0})
    If (feature 0 in {0.0,1.0})
     Predict: 1.0
    Else (feature 0 not in {0.0,1.0})
     Predict: 0.0
   Else (feature 1 not in {0.0})
    If (feature 0 in {1.0})
     Predict: 0.0
    Else (feature 0 not in {1.0})
     If (feature 0 in {0.0})
      Predict: 0.0
     Else (feature 0 not in {0.0})
      Predict: 0.0
  Else (feature 2 not in {0.0})
   If (feature 0 in {2.0})
    If (feature 1 in {0.0})
     Predict: 0.0
    Else (feature 1 not in {0.0})
     Predict: 0.0
   Else (feature 0 not in {2.0})
    If (feature 0 in {1.0})
     If (feature 1 in {0.0})
      Predict: 1.0
     Else (feature 1 not in {0.0})
      Predict: 1.0
    Else (feature 0 not in {1.0})
     If (feature 1 in {0.0})
      Predict: 1.0
     Else (feature 1 not in {0.0})
      Predict: 1.0



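We've computed the test accuracy (the fraction of test passengers whose predicted outcome matches the actual outcome). We've also printed the learned decision tree for good measure.

Next, for comparison, let's train a logistic regression model with the L-BFGS optimizer on the same training set: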

In [18]:
# Note: this rebinds `model`, replacing the decision tree trained above.
# Caveat: feature 0 (passenger class encoded as 0/1/2) enters the logistic
# regression as a single numeric feature, i.e. the model assumes a linear,
# ordinal effect of class; one-hot encoding it would relax that assumption.
model = LogisticRegressionWithLBFGS.train(training_rdd)

In [19]:
# Predict on the whole RDD of test feature vectors in one call.
predictions_rdd = model.predict(test_rdd.map(lambda point: point.features))

In [20]:
# Pair each true label with its prediction; predictions_rdd was mapped
# element-for-element from test_rdd, which is what makes zip line up.
labels_and_predictions_rdd = test_rdd.map(lambda point: point.label).zip(predictions_rdd)

In [21]:
# Fraction of test passengers whose predicted label equals the true label.
accuracy = (
    labels_and_predictions_rdd
    .filter(lambda label_pred: label_pred[0] == label_pred[1])
    .count()
) / float(test_count)
print('Accuracy =', accuracy)

Accuracy = 0.7842639593908629


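Both classifiers reach similar test accuracy: about 80.2% for the decision tree versus 78.4% for logistic regression. On 394 test passengers that gap amounts to only 7 predictions. With a single random split, it should not be read as a meaningful difference between the models.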