# Initialize Spark-on-YARN:

In [1]:
# Execute the Spark init script inside this kernel's namespace (execfile is a
# Python 2 builtin). Per the banner printed below, this leaves a SparkContext
# bound to `sc` and a HiveContext bound to `sqlCtx`.
execfile('../init/spark_init.py')

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.3.0
      /_/

Using Python version 2.7.10 (default, Sep 15 2015 14:50:01)
SparkContext available as sc, HiveContext available as sqlCtx.


# Import Train Data and set schema:
We'll read the CSV file directly from HDFS and preview what each row looks like.

In [2]:
# Load the training CSV as an RDD of raw text lines (the header row is included).
dat_train = sc.textFile('/user/cloudera/train.csv')
# Preview the first three lines: the header followed by two passenger rows.
dat_train.take(3)

[u'PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked',
 u'1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S',
 u'2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C']

## Original Data Variable Description:
We have enough information to try to predict whether a passenger survived the sinking of the Titanic.

| Field     | Description                                                            |
|:--------- | ---------------------------------------------------------------------- |
| obs       | Observation                                                            |
| survival  | Survival (0 = No; 1 = Yes)                                             |
| pclass    | Passenger Class (1 = 1st; 2 = 2nd; 3 = 3rd)                            |
| name      | Name                                                                   |
| sex       | Sex                                                                    |
| age       | Age                                                                    |
| sibsp     | Number of Siblings/Spouses Aboard                                      |
| parch     | Number of Parents/Children Aboard                                      |
| ticket    | Ticket Number                                                          |
| fare      | Passenger Fare                                                         |
| cabin     | Cabin                                                                  |
| embarked  | Port of Embarkation (C = Cherbourg; Q = Queenstown; S = Southampton)   |

However, we will need to make the following changes to the RDD so that each row can be fed to a model in LabeledPoint form:

**survival**: Set the survival field to integer and set as label.

**pclass**: Set the pclass field to integer and set as categorical feature.

**sex**: Set the sex field to integer and set as categorical feature.

**age**: Set the age field to float and set as feature. NOTE: missing ages are set to 30.

**sibsp**: Set the sibsp field to float and set as feature.

**parch**: Set the parch field to float and set as feature.

**cabin**: Map the cabin field to categorical feature.

**embarked**: Map the embarked field to a categorical feature.

In [3]:
from pyspark.mllib.regression import LabeledPoint
def titanic_lp(x):
    """Convert one raw Titanic CSV line into a LabeledPoint.

    The line is split on every comma, so the quoted Name field
    ("Last, Title. First") is broken into a variable number of pieces.
    Columns that precede Name (PassengerId, Survived, Pclass) are read from
    the front of the split list and columns that follow it (Sex ... Embarked)
    are read from the back, so the mapping stays correct however many commas
    the name contains. This assumes no column after Name contains a comma.

    Args:
        x: one data line (not the header) laid out as
           PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked.

    Returns:
        LabeledPoint with label = Survived and features
        [pclass, sex, age, sibsp, parch, cabin, embark], where
          pclass : Pclass - 1 (0, 1, 2)
          sex    : 0 = female, 1 = male
          age    : float, 30.0 when the Age column is empty
          sibsp  : float
          parch  : float
          cabin  : 1-4 for a cabin starting with A-D, 0 for anything else
                   (including an empty Cabin column)
          embark : 1 = C, 2 = Q, 3 = S, 0 for anything else

    Raises:
        KeyError: if Sex is neither 'female' nor 'male'.
        ValueError: if Survived, Pclass, Age, SibSp or Parch is not numeric.
    """
    fld = x.split(',')

    # Columns before Name: fixed positions from the start of the line.
    survival = int(fld[1])
    pclass   = int(fld[2]) - 1

    # Columns after Name: fixed positions from the END of the line, because
    # the naive split cuts the quoted name into an unknown number of pieces.
    # Layout from the end: ... Sex(-8) Age(-7) SibSp(-6) Parch(-5) Ticket(-4)
    # Fare(-3) Cabin(-2) Embarked(-1).
    sexes    = {'female': 0, 'male': 1}
    sex      = sexes[fld[-8]]
    age      = 30.0 if fld[-7] == '' else float(fld[-7])
    sibsp    = float(fld[-6])
    parch    = float(fld[-5])

    # Only the leading deck letter of the cabin is used; [:1] yields '' for an
    # empty column, which falls through to category 0 like any unknown letter.
    cabins   = {'A':1,'B':2,'C':3,'D':4}
    cabin    = cabins.get(fld[-2][:1], 0)
    embarks  = {'C':1,'Q':2,'S':3}
    embark   = embarks.get(fld[-1], 0)

    label    = survival
    feats    = [pclass, sex, age, sibsp, parch, cabin, embark]
    return LabeledPoint(label, feats)

Since we have categorical variables, we'll need to define the Categorical Feature Info (cfi).

In [4]:
# Categorical Feature Info: feature index -> number of distinct category values.
# Indices follow the feature order built by titanic_lp:
# [pclass, sex, age, sibsp, parch, cabin, embark].
titanic_cfi = {
    0: 3,  # pclass: 0, 1, 2
    1: 2,  # sex: 0 = female, 1 = male
    5: 5,  # cabin: 0 = other/missing, 1-4 = first letter A-D
    6: 4,  # embark: 0 = other/missing, 1 = C, 2 = Q, 3 = S
}

Let's map the train data into LabeledPoint form and have a look at the top 3.

In [5]:
# Skip the header row (it starts with 'P' for "PassengerId"), convert every
# remaining line to a LabeledPoint, and cache the RDD because it is reused
# for training and for scoring below.
lp_train = (
    dat_train
    .filter(lambda line: line[0] != 'P')
    .map(titanic_lp)
    .cache()
)
lp_train.take(3)

[LabeledPoint(0.0, [2.0,1.0,22.0,1.0,0.0,0.0,3.0]),
 LabeledPoint(1.0, [0.0,0.0,38.0,1.0,0.0,3.0,1.0]),
 LabeledPoint(1.0, [2.0,0.0,26.0,0.0,0.0,0.0,3.0])]

# Train model using Training Data and cfi
We'll train our model. The fitted model can't readily be plotted as a tree diagram. However, our tree is still small enough that it can be read directly from the debug string.

In [6]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils

# Fit a shallow (depth <= 3) Gini decision-tree classifier on the training
# points, telling MLlib which feature indices are categorical via titanic_cfi.
# NOTE(review): maxBins=5 equals the largest category count in titanic_cfi
# (cabin); MLlib's guide says maxBins must be at least that value -- confirm
# before lowering it.
model = DecisionTree.trainClassifier(
    data=lp_train,
    numClasses=2,
    categoricalFeaturesInfo=titanic_cfi,
    impurity="gini",
    maxDepth=3,
    maxBins=5,
    minInfoGain=0.0
)

# Dump the learned tree as text so the splits can be read directly.
print(model.toDebugString())

DecisionTreeModel classifier of depth 3 with 15 nodes
  If (feature 1 in {1.0})
   If (feature 0 in {0.0})
    If (feature 2 <= 28.0)
     Predict: 1.0
    Else (feature 2 > 28.0)
     Predict: 0.0
   Else (feature 0 not in {0.0})
    If (feature 2 <= 20.0)
     Predict: 0.0
    Else (feature 2 > 20.0)
     Predict: 0.0
  Else (feature 1 not in {1.0})
   If (feature 0 in {2.0})
    If (feature 6 in {3.0})
     Predict: 0.0
    Else (feature 6 not in {3.0})
     Predict: 1.0
   Else (feature 0 not in {2.0})
    If (feature 6 in {0.0,1.0,2.0})
     Predict: 1.0
    Else (feature 6 not in {0.0,1.0,2.0})
     Predict: 1.0



# Calculate Model Training Data Prediction Accuracy

In [7]:
# Pair each true label with the model's prediction for the same training row.
te = lp_train.map(lambda lp: lp.label).zip(model.predict(lp_train.map(lambda lp: lp.features)))
# Fraction of rows whose prediction differs from the label. The pair is
# indexed rather than unpacked in the lambda signature: tuple parameters
# (`lambda (v, p): ...`) are Python 2-only syntax, removed by PEP 3113.
train_err = te.filter(lambda vp: vp[0] != vp[1]).count() / float(te.count())
# Training accuracy = 1 - training error (displayed as the cell output).
1-train_err

0.8148148148148149

# Predicting Survival with Test Data
Now we can predict the survivors in the test data using the model we fit from our training data:

In [8]:
# Load the test CSV and drop its header row (the line starting with 'P').
dat_test = sc.textFile('/user/cloudera/test.csv').filter(lambda line: line[0] != 'P')

# test.csv has no Survived column, so splice a placeholder ',0,' in after the
# PassengerId to give each line the layout titanic_lp expects; only the
# resulting feature vector is kept, the dummy label is discarded.
lp_test_feature = dat_test.map(
    lambda line: titanic_lp(
        line.split(',')[0] + ',0,'  + ','.join(line.split(',')[1:])
    ).features
)

# Attach each prediction to the raw line it was computed from and preview ten.
test_predict = model.predict(lp_test_feature).zip(dat_test)
test_predict.take(10)

[(0.0, u'892,3,"Kelly, Mr. James",male,34.5,0,0,330911,7.8292,,Q'),
 (0.0, u'893,3,"Wilkes, Mrs. James (Ellen Needs)",female,47,1,0,363272,7,,S'),
 (0.0, u'894,2,"Myles, Mr. Thomas Francis",male,62,0,0,240276,9.6875,,Q'),
 (0.0, u'895,3,"Wirz, Mr. Albert",male,27,0,0,315154,8.6625,,S'),
 (0.0,
  u'896,3,"Hirvonen, Mrs. Alexander (Helga E Lindqvist)",female,22,1,1,3101298,12.2875,,S'),
 (0.0, u'897,3,"Svensson, Mr. Johan Cervin",male,14,0,0,7538,9.225,,S'),
 (1.0, u'898,3,"Connolly, Miss. Kate",female,30,0,0,330972,7.6292,,Q'),
 (0.0, u'899,2,"Caldwell, Mr. Albert Francis",male,26,1,1,248738,29,,S'),
 (1.0,
  u'900,3,"Abrahim, Mrs. Joseph (Sophie Halaut Easu)",female,18,0,0,2657,7.2292,,C'),
 (0.0, u'901,3,"Davies, Mr. John Samuel",male,21,2,0,A/4 48871,24.15,,S')]