In [1]:
import os
import sys

# Configure the environment.
# SPARK_HOME must point at the root of the Spark installation. Fail with an
# explicit message instead of a bare KeyError('SPARK_HOME') when it is
# missing or empty.
if not os.environ.get('SPARK_HOME'):
    raise KeyError("SPARK_HOME is not set; point it at the Spark installation directory")

# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']

print("SPARK HOME ==>  ",SPARK_HOME)

# Add PySpark/py4j to the Python path.
spark_python_lib = os.path.join(SPARK_HOME, "python", "lib")
sys.path.insert(0, spark_python_lib)
# Spark distributions bundle py4j as a zip archive inside python/lib. A
# directory entry on sys.path does not make archives stored in it importable,
# so every py4j archive found there is added explicitly.
if os.path.isdir(spark_python_lib):
    for archive in sorted(os.listdir(spark_python_lib)):
        if archive.startswith("py4j") and archive.endswith(".zip"):
            sys.path.insert(0, os.path.join(spark_python_lib, archive))
sys.path.insert(0, os.path.join(SPARK_HOME, "python"))


SPARK HOME ==>   D:\Spark1.6


In [2]:
import pyspark
from pyspark import SparkContext, SparkConf
# Application name "MyApp", running on the "local" master.
conf = SparkConf().setAppName("MyApp").setMaster("local")
# getOrCreate returns the already-running context when there is one, so
# re-running this cell no longer fails with "Cannot run multiple
# SparkContexts at once". Note that `conf` is only applied when a new
# context actually has to be created.
sc = SparkContext.getOrCreate(conf=conf)

In [3]:
import pyspark  
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.tree import DecisionTree

In [4]:
# Read the Titanic CSV as an RDD of text lines (path is relative to the
# notebook's working directory).
raw_data_rdd = sc.textFile("../datasets/titanic.csv")

In [5]:
# count() is the number of lines in the file (the header row is still
# included at this point), not a size in bytes.
print("Total File Size ==>  ", raw_data_rdd.count())

Total File Size ==>   1317


In [7]:
# Fetch the first 500 lines of the raw file.
# NOTE: the returned list is not assigned and the trailing ';' suppresses its
# display, so this cell does not restrict the dataset; the following cells
# still operate on the full raw_data_rdd.

raw_data_rdd.take(500);

In [8]:
# Drop the CSV header: remember the first line of the file and keep only the
# lines that differ from it.

csv_header = raw_data_rdd.first()
data_rdd = raw_data_rdd.filter(lambda row: row != csv_header)

In [30]:
# Draw a random sample of 500 data rows without replacement (seed 0).
# NOTE: the sample is neither stored nor displayed (trailing ';'), so this
# cell has no effect on the cells that follow.

data_rdd.takeSample(False, 500, 0);

In [9]:
#function to create labeled point data vectors

def row_to_labeled_point(line):
    '''
    Builds a LabeledPoint from one data row of the Titanic CSV.

    The row must hold five comma-separated fields (surrounding double quotes
    are stripped from each): passenger id, class, age, sex, survived.
    The passenger id is ignored.

    Label:
        survival (truth): 0=no, 1=yes

    Features, in this order:
        ticket class: 0=1st class, 1=2nd class, 2=3rd class
        age group: 0=child, 1=adults
        gender: 0=man, 1=women

    Raises RuntimeError when the row does not have exactly five fields or
    when a field holds a value outside the sets listed above. The message
    names the offending field, its value and the full row.
    '''
    fields = [segs.strip('"') for segs in line.split(',')]
    if len(fields) != 5:
        raise RuntimeError('unknown value: expected 5 fields, got %d in row %r'
                           % (len(fields), line))
    # The passenger id is not used as a feature.
    _passenger_id, klass, age, sex, survived = fields

    # Only the leading digit of the class field is used ("1st class" -> "1").
    # Anything outside 1..3 would produce a feature value that contradicts
    # the three categories documented above.
    if klass[:1] not in ('1', '2', '3'):
        raise RuntimeError('unknown value for class: %r in row %r' % (klass, line))

    for field_name, value, allowed in (('age', age, ('adults', 'child')),
                                       ('sex', sex, ('man', 'women')),
                                       ('survived', survived, ('yes', 'no'))):
        if value not in allowed:
            raise RuntimeError('unknown value for %s: %r in row %r'
                               % (field_name, value, line))

    features = [
        int(klass[0]) - 1,
        (1 if age == 'adults' else 0),
        (1 if sex == 'women' else 0)
    ]
    return LabeledPoint(1 if survived == 'yes' else 0, features)

In [10]:
# Convert every data row into a LabeledPoint via row_to_labeled_point.
labeled_points_rdd = data_rdd.map(row_to_labeled_point)

In [12]:
# Draw 10 labeled points at random without replacement (seed 0).
# NOTE: the result is neither stored nor displayed (trailing ';').
labeled_points_rdd.takeSample(False,10, 0);

In [13]:
# Randomly split the labeled points into training and test sets using
# weights 0.7 and 0.3. The split is random, so the resulting sizes are only
# approximately 70% / 30%; the fixed seed keeps the split repeatable.
training_rdd, test_rdd = labeled_points_rdd.randomSplit([0.7, 0.3], seed = 0)

In [14]:
# Count the rows of each split; test_count is reused further down as the
# denominator of the accuracy computations.
training_count, test_count = training_rdd.count(), test_rdd.count()

print("Training Dataset Count ==> ", training_count)
print("Test Dataset Count ==> ", test_count)

Training Dataset Count ==>  922
Test Dataset Count ==>  394


In [15]:
# Train a decision tree classifier on the training split.
# categoricalFeaturesInfo maps a feature index to its number of categories;
# the indices follow the feature order built by row_to_labeled_point.
model = DecisionTree.trainClassifier(
    training_rdd,
    numClasses=2,  # label is 0 (did not survive) or 1 (survived)
    categoricalFeaturesInfo={
        0: 3,  # ticket class: three categories
        1: 2,  # age group: two categories
        2: 2,  # gender: two categories
    },
)

In [16]:
# Predict an outcome for every test row from its feature vector alone.
predictions_rdd = model.predict(test_rdd.map(lambda x: x.features))

In [17]:
# Pair the true label of each test passenger with the model's prediction.
# zip() matches elements by position, so both RDDs must have the same
# partitioning and the same number of elements per partition; that holds
# here because predictions_rdd was derived from test_rdd.
truth_and_predictions_rdd = test_rdd.map(lambda lp: lp.label).zip(predictions_rdd)

In [43]:
# Accuracy = share of test rows whose prediction equals the true label.
accuracy = (
    truth_and_predictions_rdd
    .filter(lambda pair: pair[0] == pair[1])
    .count()
    / float(test_count)
)
print('Model Accuracy => ', accuracy)
# Dump the learned tree structure as text.
print(model.toDebugString())

Model Accuracy =>  0.8020304568527918
DecisionTreeModel classifier of depth 4 with 21 nodes
  If (feature 2 in {0.0})
   If (feature 1 in {0.0})
    If (feature 0 in {0.0,1.0})
     Predict: 1.0
    Else (feature 0 not in {0.0,1.0})
     Predict: 0.0
   Else (feature 1 not in {0.0})
    If (feature 0 in {1.0})
     Predict: 0.0
    Else (feature 0 not in {1.0})
     If (feature 0 in {0.0})
      Predict: 0.0
     Else (feature 0 not in {0.0})
      Predict: 0.0
  Else (feature 2 not in {0.0})
   If (feature 0 in {2.0})
    If (feature 1 in {0.0})
     Predict: 0.0
    Else (feature 1 not in {0.0})
     Predict: 0.0
   Else (feature 0 not in {2.0})
    If (feature 0 in {1.0})
     If (feature 1 in {0.0})
      Predict: 1.0
     Else (feature 1 not in {0.0})
      Predict: 1.0
    Else (feature 0 not in {1.0})
     If (feature 1 in {0.0})
      Predict: 1.0
     Else (feature 1 not in {0.0})
      Predict: 1.0



In [21]:
# Train a logistic regression classifier (SGD) on the training split.
# NOTE: train() is called with its defaults and no intercept term is
# requested, which is consistent with the intercept of 0.0 printed further
# down in this notebook.
logit_model = LogisticRegressionWithSGD.train(training_rdd)

In [22]:
# Predict an outcome for every test row with the logistic model.
# NOTE: this rebinds predictions_rdd, which held the decision tree
# predictions until now.
predictions_rdd = logit_model.predict(test_rdd.map(lambda point: point.features))

In [23]:
# Pair each true test label with the logistic model's prediction, by position.
labels_and_predictions_rdd = test_rdd.map(lambda point: point.label).zip(predictions_rdd)

In [24]:
# Accuracy of the logistic model: matching (label, prediction) pairs divided
# by the size of the test set. This overwrites the decision tree's `accuracy`.
accuracy = (
    labels_and_predictions_rdd
    .filter(lambda pair: pair[0] == pair[1])
    .count()
    / float(test_count)
)
print('Logit Model Accuracy => ', accuracy)

Logit Model Accuracy =>  0.7918781725888325


In [25]:
# Show the fitted intercept of the logistic model.
print("Logit Model Intercept => ", logit_model.intercept)

Logit Model Intercept =>  0.0


In [26]:
# Show the fitted weights, one per feature in the order
# ticket class, age group, gender.
print("Logit Model Weights => ", logit_model.weights)

Logit Model Weights =>  [-0.626119570298,-0.154014997299,1.23177368167]


In [27]:
# Shut down the Spark instance.
sc.stop()