Understand Apache Spark with Titanic data analysis.

Reference: https://try.jupyter.org

In [1]:
import pyspark  
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.tree import DecisionTree

In [2]:
from pyspark import SparkContext

In [3]:
# Create the SparkContext, or reuse the one already running in this kernel.
# A bare SparkContext() raises when this cell is re-run, because only one
# context may be active at a time; getOrCreate() keeps the cell idempotent.
sc = SparkContext.getOrCreate()

In [4]:
# Load the CSV as an RDD of text lines (one element per line of the file).
raw_rdd = sc.textFile("titanic.csv")

In [5]:
# Total number of lines in the file; the header line is still included here.
raw_rdd.count()

1317

In [6]:
# The first line of the file is the column header; drop every line equal to it.
header = raw_rdd.first()
data_rdd = raw_rdd.filter(lambda row: row != header)
# Peek at five rows, sampled without replacement using a fixed seed.
data_rdd.takeSample(withReplacement=False, num=5, seed=0)

[u'"116","1st class","adults","man","no"',
 u'"26","1st class","adults","man","yes"',
 u'"393","2nd class","adults","man","no"',
 u'"774","3rd class","adults","man","no"',
 u'"799","3rd class","adults","man","no"']

Defining a function to turn the passenger attributes into structured LabeledPoint objects.

In [7]:
def row_to_labeled_point(line):
    '''
    Convert one CSV data row into a LabeledPoint.

    The row is expected to hold five comma-separated, optionally
    double-quoted fields: passenger id, ticket class, age group, sex and
    survival flag. The passenger id is not used.

    Label:
        survival (truth): 0=no, 1=yes

    Features, in order:
        ticket class: 0=1st class, 1=2nd class, 2=3rd class
        age group: 0=child, 1=adults
        gender: 0=man, 1=women

    Raises:
        RuntimeError: if the ticket class does not start with 1, 2 or 3, or
            if the age group, sex or survival flag is not one of the values
            listed above.
        ValueError: if the row does not split into exactly five fields.
    '''
    passenger_id, _class, age, sex, survived = [segs.strip('"') for segs in line.split(',')]

    # Only the leading character of the ticket class is used
    # ("1st class" -> '1'). Slicing instead of indexing keeps an empty
    # field from raising IndexError before the validation below runs.
    class_digit = _class[:1]

    # Reject anything outside the documented encodings. The ticket class is
    # checked too, so a value such as "4th class" cannot silently produce a
    # category index beyond the three classes the features are meant to hold.
    if (class_digit not in ('1', '2', '3') or
        age not in ['adults', 'child'] or
        sex not in ['man', 'women'] or
        survived not in ['yes', 'no']):
        raise RuntimeError('unknown value')

    features = [
        int(class_digit) - 1,
        (1 if age == 'adults' else 0),
        (1 if sex == 'women' else 0)
    ]
    return LabeledPoint(1 if survived == 'yes' else 0, features)

In [8]:
# Parse every data row into a LabeledPoint (label = survival, 3 features).
labeled_points_rdd = data_rdd.map(row_to_labeled_point)
# Show five parsed points, sampled without replacement using a fixed seed.
labeled_points_rdd.takeSample(withReplacement=False, num=5, seed=0)

[LabeledPoint(0.0, [0.0,1.0,0.0]),
 LabeledPoint(1.0, [0.0,1.0,0.0]),
 LabeledPoint(0.0, [1.0,1.0,0.0]),
 LabeledPoint(0.0, [2.0,1.0,0.0]),
 LabeledPoint(0.0, [2.0,1.0,0.0])]

Splitting the data for training and testing. 

Training data - 70%

Testing data - 30%

In [9]:
# Random 70% / 30% split with a fixed seed; the resulting sizes are approximate.
training_data, test_data = labeled_points_rdd.randomSplit(weights=[0.7, 0.3], seed=0)
# Row counts of each split; test_count is reused later as the accuracy denominator.
training_count, test_count = training_data.count(), test_data.count()

In [10]:
# Number of rows that landed in the training split.
training_count

916

In [11]:
# Number of rows that landed in the test split.
test_count

400

Training a decision tree classifier

In [12]:
# categoricalFeaturesInfo maps feature index -> number of categories, matching
# the encoding in row_to_labeled_point: 3 ticket classes, 2 age groups, 2 genders.
model = DecisionTree.trainClassifier(
    training_data,
    numClasses=2,
    categoricalFeaturesInfo={0: 3, 1: 2, 2: 2}
)

Applying the trained model to the test data.

In [13]:
# Predict on the feature vectors only; the true labels are stripped off here.
predictions_data = model.predict(test_data.map(lambda point: point.features))

In [14]:
# Pair each true label with its prediction as (label, prediction) tuples.
# NOTE(review): RDD.zip expects both RDDs to have the same partitioning and
# element counts; both sides here derive from test_data — confirm this holds
# for the predict() output.
truth_and_predictions_data = test_data.map(lambda lp: lp.label).zip(predictions_data)

Computing the test data accuracy.

In [16]:
# Fraction of test rows whose predicted label equals the true label.
accuracy = truth_and_predictions_data.filter(lambda v_p: v_p[0] == v_p[1]).count() / float(test_count)
# One print call on a single formatted string runs under both Python 2 and 3;
# the former `print accuracy` statement is a SyntaxError on Python 3.
# The two spaces after the colon reproduce the originally recorded output.
print("Accuracy:  %s" % accuracy)

Accuracy:  0.79


Displaying the decision tree.

In [17]:
# Dump the learned tree as nested if/else rules over the feature indices.
print("Decision tree:")
print(model.toDebugString())

Decision tree:
DecisionTreeModel classifier of depth 4 with 21 nodes
  If (feature 2 in {0.0})
   If (feature 1 in {0.0})
    If (feature 0 in {0.0,2.0})
     If (feature 0 in {0.0})
      Predict: 1.0
     Else (feature 0 not in {0.0})
      Predict: 0.0
    Else (feature 0 not in {0.0,2.0})
     Predict: 1.0
   Else (feature 1 not in {0.0})
    If (feature 0 in {1.0})
     Predict: 0.0
    Else (feature 0 not in {1.0})
     If (feature 0 in {0.0})
      Predict: 0.0
     Else (feature 0 not in {0.0})
      Predict: 0.0
  Else (feature 2 not in {0.0})
   If (feature 0 in {2.0})
    If (feature 1 in {0.0})
     Predict: 0.0
    Else (feature 1 not in {0.0})
     Predict: 0.0
   Else (feature 0 not in {2.0})
    If (feature 0 in {1.0})
     If (feature 1 in {0.0})
      Predict: 1.0
     Else (feature 1 not in {0.0})
      Predict: 1.0
    Else (feature 0 not in {1.0})
     Predict: 1.0



Training and testing a logistic regression classifier.

In [18]:
# Train a logistic regression model on the same training split.
# NOTE: this trainer emits a "Deprecated in 2.0.0" warning (see the output
# below), which points to ml.classification.LogisticRegression as a replacement.
model2 = LogisticRegressionWithSGD.train(training_data)

  "Deprecated in 2.0.0. Use ml.classification.LogisticRegression or "


In [20]:
# Predict with the logistic regression model on the test feature vectors.
predictions_data2 = model2.predict(test_data.map(lambda point: point.features))

In [21]:
# Pair each true label with the logistic regression prediction as
# (label, prediction) tuples.
labels_and_predictions_data2 = test_data.map(lambda lp: lp.label).zip(predictions_data2)

Calculating accuracy.

In [22]:
# Fraction of test rows where the logistic regression prediction matches the
# true label. This overwrites the decision-tree accuracy computed earlier.
accuracy = (
    labels_and_predictions_data2
    .filter(lambda pair: pair[0] == pair[1])
    .count()
    / float(test_count)
)

In [23]:
# One print call on a single formatted string runs under both Python 2 and 3;
# the former `print accuracy` statement is a SyntaxError on Python 3.
# The two spaces after the colon reproduce the originally recorded output.
print("Accuracy:  %s" % accuracy)

Accuracy:  0.7825
