In [1]:
from pyspark import SparkContext

#Used for features and labels in any supervised learning
from pyspark.mllib.regression import LabeledPoint

#Used for storing a set of features in the LabeledPoint
from pyspark.mllib.linalg import Vectors

#Used to compute model performance metrics
from pyspark.mllib.evaluation import MulticlassMetrics

#Used to build the decisionTree model
from pyspark.mllib.tree import DecisionTree

In [2]:
def title(s):
    """Print `s` wrapped in a dashed banner line."""
    banner = "---- %s -----" % s
    print(banner)
    
def see(s, v):
    """Print a dashed banner for the label `s`, then the value `v` on its own line."""
    header = "---- %s -----" % s
    print(header, v, sep="\n")


In [3]:
# Reuse the already-running SparkContext if there is one, otherwise create it.
sc = SparkContext.getOrCreate()

In [4]:
# Lazily read the HR dataset as an RDD of raw text lines (header line included).
raw_data = sc.textFile("../assignment-data/HR_comma_sep.csv")

# Drop the header line (the only one starting with "satisfaction") and split
# every remaining line into its comma-separated fields.
# Note: no `count()` here -- its result was discarded and it only forced an
# extra full pass over the file.
analysisRDD = (
    raw_data
    .filter(lambda line: not line.startswith("satisfaction"))
    .map(lambda line: line.split(","))
)

In [5]:
# Peek at the data: the first three raw text lines (header + two records),
# then the first two records after header removal and splitting into fields.
see("row data",raw_data.take(3))
see(" before encoding analysisRDD",analysisRDD.take(2))

---- row data -----
['satisfaction_level,last_evaluation,number_project,average_montly_hours,time_spend_company,Work_accident,left,promotion_last_5years,sales,salary', '0.38,0.53,2,157,3,0,1,0,sales,low', '0.8,0.86,5,262,6,0,1,0,sales,medium']
----  before encoding analysisRDD -----
[['0.38', '0.53', '2', '157', '3', '0', '1', '0', 'sales', 'low'], ['0.8', '0.86', '5', '262', '6', '0', '1', '0', 'sales', 'medium']]


## Categorical features

In [6]:
# Collect the distinct values of the two categorical columns that need encoding:
# column 8 (department) and column 9 (salary band).
def _distinct_column_values(rdd, column):
    """Return the distinct values found at index `column` of every row in `rdd`."""
    keyed = rdd.map(lambda fields: (fields[column], 0))
    return keyed.distinct().keys().collect()

depclasses = _distinct_column_values(analysisRDD, 8)
salaryclasses = _distinct_column_values(analysisRDD, 9)
depclasses, salaryclasses

(['hr',
  'technical',
  'support',
  'management',
  'sales',
  'accounting',
  'IT',
  'product_mng',
  'marketing',
  'RandD'],
 ['low', 'medium', 'high'])

## Encoding the data

In [11]:
def preprocessing(row):
    """Convert one parsed CSV record into a LabeledPoint with one-hot categoricals.

    row -- list of the 10 string fields of a record, in CSV column order.
           Index 6 is the `left` flag (used as the label), index 8 the
           department and index 9 the salary band.

    Returns a LabeledPoint whose features are the 7 numeric columns (indices
    0-5 and 7), followed by one 0/1 indicator per entry of `depclasses`, then
    one 0/1 indicator per entry of `salaryclasses`.

    The input list is left untouched, so the function is safe to call more
    than once on the same row.
    """
    dep = row[8]
    salary = row[9]
    label = float(row[6])

    # Numeric columns: everything before the two categoricals, minus the label.
    features = [float(value) for idx, value in enumerate(row[:8]) if idx != 6]

    # One-hot encode the categoricals using the class lists collected earlier.
    features.extend(1.0 if cls == dep else 0.0 for cls in depclasses)
    features.extend(1.0 if cls == salary else 0.0 for cls in salaryclasses)

    return LabeledPoint(label, Vectors.dense(features))
encodedRDD = analysisRDD.map(preprocessing)

In [13]:
# Sanity-check the encoding by looking at the first five LabeledPoints.
encodedRDD.take(5)

[LabeledPoint(1.0, [0.38,0.53,2.0,157.0,3.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0]),
 LabeledPoint(1.0, [0.8,0.86,5.0,262.0,6.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0]),
 LabeledPoint(1.0, [0.11,0.88,7.0,272.0,4.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0]),
 LabeledPoint(1.0, [0.72,0.87,5.0,223.0,5.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0]),
 LabeledPoint(1.0, [0.37,0.52,2.0,159.0,3.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0])]

## Splitting and Training

In [25]:
# Split into 80% training, 10% validation and 10% test data; the seed is fixed
# so the split is the same on every run.
(trainData, cvData, testData) = encodedRDD.randomSplit(weights=[0.8, 0.1, 0.1],seed=17)
# Cache the three splits: each one is reused by several training/evaluation
# jobs in the cells below.
trainData.cache()
cvData.cache()
testData.cache()


PythonRDD[684] at RDD at PythonRDD.scala:48

In [26]:
def buildDecisionTreeClassifier(trainData):
    model = DecisionTree.trainClassifier(trainData,numClasses=2, categoricalFeaturesInfo={}, impurity="gini", maxDepth=3, maxBins=100)
    return  model
%time m1 =  buildDecisionTreeClassifier(trainData)

CPU times: user 20 ms, sys: 8 ms, total: 28 ms
Wall time: 985 ms


## Calculating Accuracy

In [27]:
def getMetrics(model, data):
    labels = data.map(lambda d: d.label)
    features = data.map(lambda d: d.features)
    predictions = model.predict(features)
    predictionsAndLabels = predictions.zip(labels)
    return MulticlassMetrics(predictionsAndLabels)
metrics = getMetrics(m1, cvData)
%time accuracy = metrics.accuracy
see("accuracy", accuracy)

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 717 ms
---- accuracy -----
0.9546632124352331


# cross validation

In [28]:
def evaluate(trainData, cvData, numClasses=2):
    """Grid-search decision-tree hyperparameters against a held-out validation set.

    One classifier is trained on `trainData` for every combination of
    impurity ("gini", "entropy"), maxDepth (1, 20) and maxBins (10, 300),
    and its accuracy is measured on `cvData`.

    trainData  -- RDD of LabeledPoint used for fitting.
    cvData     -- RDD of LabeledPoint used only for scoring.
    numClasses -- number of distinct label values. Defaults to 2 because the
                  label is the binary `left` flag, matching the baseline
                  classifier trained earlier in this notebook.

    Returns a list of ((impurity, depth, bins), accuracy) tuples, in the
    order the combinations were tried.
    """
    evaluations = []
    for impurity in ["gini", "entropy"]:
        for depth in [1, 20]:
            for bins in [10, 300]:
                model = DecisionTree.trainClassifier(
                    trainData,
                    numClasses=numClasses,
                    categoricalFeaturesInfo={},
                    impurity=impurity,
                    maxDepth=depth,
                    maxBins=bins,
                )
                accuracy = getMetrics(model, cvData).accuracy
                evaluations.append(((impurity, depth, bins), accuracy))

    return evaluations
evaluations = evaluate(trainData, cvData)
evaluations

[(('gini', 1, 10), 0.7875647668393783),
 (('gini', 1, 300), 0.8134715025906736),
 (('gini', 20, 10), 0.9656735751295337),
 (('gini', 20, 300), 0.977979274611399),
 (('entropy', 1, 10), 0.7875647668393783),
 (('entropy', 1, 300), 0.8134715025906736),
 (('entropy', 20, 10), 0.9682642487046632),
 (('entropy', 20, 300), 0.9766839378238342)]

In [29]:
# Rank the hyperparameter combinations by validation accuracy, best first.
sortedEvals = list(evaluations)
sortedEvals.sort(key=lambda entry: entry[1], reverse=True)

title("Sorted Evaluations")
for e in sortedEvals:
    print(e)

# The (impurity, depth, bins) tuple of the top-ranked combination.
bestParams = sortedEvals[0][0]


---- Sorted Evaluations -----
(('gini', 20, 300), 0.977979274611399)
(('entropy', 20, 300), 0.9766839378238342)
(('entropy', 20, 10), 0.9682642487046632)
(('gini', 20, 10), 0.9656735751295337)
(('gini', 1, 300), 0.8134715025906736)
(('entropy', 1, 300), 0.8134715025906736)
(('gini', 1, 10), 0.7875647668393783)
(('entropy', 1, 10), 0.7875647668393783)


In [30]:
# Unpack the winning (impurity, maxDepth, maxBins) combination.
(i, d, b) = bestParams

# Refit on training + validation data with the best hyperparameters.
model = DecisionTree.trainClassifier(
    trainData.union(cvData),
    numClasses=2,  # the `left` label is binary; same value as the baseline classifier
    categoricalFeaturesInfo={},
    impurity=i,
    maxDepth=d,
    maxBins=b)

# Unbiased estimate: testData was never used for training or model selection.
see("testData accuracy",getMetrics(model, testData).accuracy)
# NOTE(review): cvData is part of this model's training set, so the figure
# below is optimistic and must not be read as a generalization estimate.
see("testData+cvData accuracy", getMetrics(model, testData.union(cvData)).accuracy)

---- testData accuracy -----
0.9813019390581718
---- testData+cvData accuracy -----
0.9909638554216867


In [31]:
# Release the cached splits now that training and evaluation are finished.
trainData.unpersist()
cvData.unpersist()
testData.unpersist()

PythonRDD[684] at RDD at PythonRDD.scala:48