In [1]:
from pyspark import SparkContext

#Used for features and labels in any supervised learning
from pyspark.mllib.regression import LabeledPoint

#Used for storing a set of features in the LabeledPoint
from pyspark.mllib.linalg import Vectors

#Used to compute model performance metrics
from pyspark.mllib.evaluation import MulticlassMetrics

#Used to build the decisionTree model
from pyspark.mllib.tree import DecisionTree

In [2]:
sc = SparkContext.getOrCreate()

In [3]:
def title(s):
    print("---- %s -----" %s)    
    
def see(s, v):
    print("---- %s -----" %s)
    print(v)

In [4]:
files = "./hr.csv"
rawData = sc.textFile(files)
head = rawData.first()
header =head.split(',')
r1 = rawData.filter(lambda l: l!=head)
r2 = r1.map(lambda x: x.split(','))

In [5]:
l1 = r2.map(lambda x:x[8]).distinct().collect()
l2 = r2.map(lambda x:x[9]).distinct().collect()
r3 = r2.collect()
for i in range(0,len(l1)):
    header.insert(9+i,header[8]+"^"+l1[i])
for i in range(0,len(l2)):
    header.insert(10+i+len(l1),header[9+len(l1)]+"^"+l2[i])
header.pop(8)
header.pop(18)
for i in range(0,len(r3)):
    for j in range(0,len(l1)):
        if r3[i][8].split(',')[0]==l1[j]:
            r3[i][8] = r3[i][8] + ',1'
            #r3[i].insert(9+i,'1')
        else:
            r3[i][8] = r3[i][8] + ',0'
            #r3[i].insert(9+i,'0')
    for j in range(0,len(l2)):
        if r3[i][9].split(',')[0]==l2[j]:
            r3[i][9] = r3[i][9] + ',1'
        else:
            r3[i][9] = r3[i][9] + ',0'
    #r3[i][8] = r3[i][8].split(',')[1:]
    #r3[i][9] = r3[i][9].split(',')[1:]
    for j in r3[i][8].split(',')[1:]:
        r3[i].append(j)
    for j in r3[i][9].split(',')[1:]:
        r3[i].append(j)
    r3[i].pop(9)
    r3[i].pop(8)

In [6]:
r8 = sc.parallelize(r3).map(lambda x:[float(i) for i in x])

In [7]:
def preprocessing(line):
    values = list(map(lambda x: x, line))
    label = values.pop(6)
    featureVector = Vectors.dense(values)
    return LabeledPoint(label, featureVector)

data = r8.map(preprocessing)

In [8]:
(trainData, cvData, testData) = data.randomSplit(weights=[0.8, 0.1, 0.1])
trainData.cache()
cvData.cache()
testData.cache()

PythonRDD[17] at RDD at PythonRDD.scala:48

In [9]:
def getMetrics(model, data):
    labels = data.map(lambda d: d.label)
    features = data.map(lambda d: d.features)
    predictions = model.predict(features)
    predictionsAndLabels = predictions.zip(labels)
    return MulticlassMetrics(predictionsAndLabels)

In [10]:
def evaluate(trainData, cvData):
    evaluations = []
    for impurity in ["gini", "entropy"]:
        for depth in range(10, 22):
            for bins in [190,210,230,250,270,290,300]:
                model = DecisionTree.trainClassifier(trainData,numClasses=2, categoricalFeaturesInfo={}, impurity=impurity, maxDepth=depth, maxBins=bins)
                accuracy = getMetrics(model, cvData).accuracy
                evaluations.append(((impurity, depth, bins), accuracy))

    return evaluations
evaluations = evaluate(trainData, cvData)

In [11]:
sortedEvals = sorted(evaluations, key=lambda a:a[1], reverse=True)
title("Sorted Evaluations")
for e in sortedEvals:
    print(e)

---- Sorted Evaluations -----
(('gini', 14, 210), 0.9853917662682603)
(('gini', 14, 190), 0.9847277556440903)
(('gini', 14, 230), 0.9847277556440903)
(('gini', 14, 250), 0.9847277556440903)
(('gini', 14, 270), 0.9847277556440903)
(('gini', 14, 290), 0.9847277556440903)
(('gini', 14, 300), 0.9847277556440903)
(('gini', 15, 210), 0.9847277556440903)
(('gini', 15, 190), 0.9840637450199203)
(('gini', 15, 230), 0.9840637450199203)
(('gini', 15, 250), 0.9840637450199203)
(('gini', 15, 270), 0.9840637450199203)
(('gini', 15, 290), 0.9840637450199203)
(('gini', 15, 300), 0.9840637450199203)
(('gini', 16, 210), 0.9840637450199203)
(('gini', 18, 210), 0.9840637450199203)
(('gini', 19, 210), 0.9840637450199203)
(('gini', 20, 210), 0.9840637450199203)
(('gini', 21, 210), 0.9840637450199203)
(('entropy', 15, 210), 0.9840637450199203)
(('gini', 10, 210), 0.9833997343957503)
(('gini', 16, 190), 0.9833997343957503)
(('gini', 16, 230), 0.9833997343957503)
(('gini', 16, 250), 0.9833997343957503)
(('gini

In [12]:
bestParams = sortedEvals[0][0]
see("bestParams", bestParams)

---- bestParams -----
('gini', 14, 210)


In [13]:
(i, d, b) = bestParams
%time model = DecisionTree.trainClassifier(trainData.union(cvData),numClasses=2,categoricalFeaturesInfo={},impurity=i,maxDepth=d,maxBins=b)
see("testData accuracy",getMetrics(model, testData).accuracy)
see("testData+cvData accuracy", getMetrics(model, testData.union(cvData)).accuracy)

CPU times: user 30 ms, sys: 0 ns, total: 30 ms
Wall time: 1.44 s
---- testData accuracy -----
0.9821314613911933
---- testData+cvData accuracy -----
0.9886104783599089


In [14]:
trainData.unpersist()
cvData.unpersist()
testData.unpersist()

PythonRDD[17] at RDD at PythonRDD.scala:48