In [1]:
import pyspark  
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.tree import DecisionTree

In [2]:
sc = pyspark.SparkContext()

In [6]:
raw_rdd = sc.textFile("titanic.csv")

In [7]:
raw_rdd.count()

1317

In [8]:
raw_rdd

titanic.csv MapPartitionsRDD[5] at textFile at NativeMethodAccessorImpl.java:0

In [9]:
raw_rdd.take(5)

[u'"","class","age","sex","survived"',
 u'"1","1st class","adults","man","yes"',
 u'"2","1st class","adults","man","yes"',
 u'"3","1st class","adults","man","yes"',
 u'"4","1st class","adults","man","yes"']

In [10]:
header = raw_rdd.first()
data_rdd = raw_rdd.filter(lambda line: line != header)

In [11]:
data_rdd.takeSample(False, 5, 0)

[u'"116","1st class","adults","man","no"',
 u'"26","1st class","adults","man","yes"',
 u'"393","2nd class","adults","man","no"',
 u'"774","3rd class","adults","man","no"',
 u'"799","3rd class","adults","man","no"']

In [12]:
def row_to_labeled_point(line):
    '''
    Builds a LabelPoint consisting of:
    
    survival (truth): 0=no, 1=yes
    ticket class: 0=1st class, 1=2nd class, 2=3rd class
    age group: 0=child, 1=adults
    gender: 0=man, 1=woman
    '''
    passenger_id, klass, age, sex, survived = [segs.strip('"') for segs in line.split(',')]
    klass = int(klass[0]) - 1
    
    if (age not in ['adults', 'child'] or 
        sex not in ['man', 'women'] or
        survived not in ['yes', 'no']):
        raise RuntimeError('unknown value')
    
    features = [
        klass,
        (1 if age == 'adults' else 0),
        (1 if sex == 'women' else 0)
    ]
    return LabeledPoint(1 if survived == 'yes' else 0, features)

In [13]:
labeled_points_rdd = data_rdd.map(row_to_labeled_point)

In [14]:
labeled_points_rdd.takeSample(False, 5, 0)

[LabeledPoint(0.0, [0.0,1.0,0.0]),
 LabeledPoint(1.0, [0.0,1.0,0.0]),
 LabeledPoint(0.0, [1.0,1.0,0.0]),
 LabeledPoint(0.0, [2.0,1.0,0.0]),
 LabeledPoint(0.0, [2.0,1.0,0.0])]

In [15]:
training_rdd, test_rdd = labeled_points_rdd.randomSplit([0.7, 0.3], seed = 0)

In [16]:
training_count = training_rdd.count()
test_count = test_rdd.count()

In [17]:
model = DecisionTree.trainClassifier(training_rdd, 
                                     numClasses=2, 
                                     categoricalFeaturesInfo={
                                        0: 3,
                                        1: 2,
                                        2: 2
                                     })

In [18]:
predictions_rdd = model.predict(test_rdd.map(lambda x: x.features))

In [19]:
truth_and_predictions_rdd = test_rdd.map(lambda lp: lp.label).zip(predictions_rdd)

In [20]:
accuracy = truth_and_predictions_rdd.filter(lambda v_p: v_p[0] == v_p[1]).count() / float(test_count)
print('Accuracy =', accuracy)
print(model.toDebugString())

('Accuracy =', 0.79)
DecisionTreeModel classifier of depth 4 with 21 nodes
  If (feature 2 in {0.0})
   If (feature 1 in {0.0})
    If (feature 0 in {0.0,2.0})
     If (feature 0 in {0.0})
      Predict: 1.0
     Else (feature 0 not in {0.0})
      Predict: 0.0
    Else (feature 0 not in {0.0,2.0})
     Predict: 1.0
   Else (feature 1 not in {0.0})
    If (feature 0 in {1.0})
     Predict: 0.0
    Else (feature 0 not in {1.0})
     If (feature 0 in {0.0})
      Predict: 0.0
     Else (feature 0 not in {0.0})
      Predict: 0.0
  Else (feature 2 not in {0.0})
   If (feature 0 in {2.0})
    If (feature 1 in {0.0})
     Predict: 0.0
    Else (feature 1 not in {0.0})
     Predict: 0.0
   Else (feature 0 not in {2.0})
    If (feature 0 in {1.0})
     If (feature 1 in {0.0})
      Predict: 1.0
     Else (feature 1 not in {0.0})
      Predict: 1.0
    Else (feature 0 not in {1.0})
     Predict: 1.0



In [21]:
model = LogisticRegressionWithSGD.train(training_rdd)

  "Deprecated in 2.0.0. Use ml.classification.LogisticRegression or "


In [22]:
predictions_rdd = model.predict(test_rdd.map(lambda x: x.features))

In [23]:
labels_and_predictions_rdd = test_rdd.map(lambda lp: lp.label).zip(predictions_rdd)

In [24]:
accuracy = labels_and_predictions_rdd.filter(lambda v_p: v_p[0] == v_p[1]).count() / float(test_count)
print('Accuracy =', accuracy)

('Accuracy =', 0.7825)
