Preliminaries
Add the necessary libraries and create Spark Context and create an RDD

In [3]:
from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import RandomForest
import time

sc = SparkContext(appName="MNISTDigitsDT")
fileNameTrain = 'wasb://datasets@chuvyrov.blob.core.windows.net/trainingsample.csv'
fileNameTest = 'wasb://datasets@chuvyrov.blob.core.windows.net/validationsample.csv'

mnist_train = sc.textFile(fileNameTrain)
mnist_test = sc.textFile(fileNameTest)


Create a function to parse CSV lines and convert them to LabeledPoint. LabeledPoint is the class that our Machine Learning algorithms expects (down below). Note the additional values we are extracting from the data - 'gender' and 'embarked.'

In [4]:
def parsePoint(line):
    """
    Parse a line of text into a set of MLlib LabeledPoint object.
    """
    values = line.split(',')
    values = [0 if e == '' else int(e) for e in values]
    return LabeledPoint(int(values[0]), values[1:])

In [5]:
#skip header
header = mnist_train.first() #extract header
mnist_train = mnist_train.filter(lambda x:x !=header) #filter out header

print mnist_train.first()


1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,188,255,94,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,191,250,253,93,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,123,248,253,167,10,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,80,247,253,208,13,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,29,207,253,235,77,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,54,209,253,253,88,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,93,254,253,238,170,17,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,23,210,254,253,159,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,16,209,253,254,240,81,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,27,253,253,254,13,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,20,206,254,254,198,7,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,168,253,253,196,7,0,0,0,0,0,0,0,0,0,0,0,0,0,0

Split the data into training and test set - much better to test on unseen data

In [6]:
labeledPoints = mnist_train.map(parsePoint)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = labeledPoints.randomSplit([0.7, 0.3])

Train Random Forest classifier

In [7]:
depthLevel = 4
treeLevel = 3
#start timer
start_time = time.time()

model = RandomForest.trainClassifier(trainingData, numClasses=10, categoricalFeaturesInfo={},
                                             numTrees=treeLevel, featureSubsetStrategy="auto",
                                             impurity='gini', maxDepth=depthLevel, maxBins=32)  

print("Training time --- %s seconds ---" % (time.time() - start_time))

Training time --- 21.2118899822 seconds ---


And make predictions

In [9]:
# Evaluate model on test instances and compute test error

#start timer
start_time = time.time()

predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())

print('Test Error = ' + str(testErr))
print("Prediction time --- %s seconds ---" % (time.time() - start_time))

#print('Learned classification tree model:')
#print(model.toDebugString())

Test Error = 0.452333990796
Prediction time --- 6.6730120182 seconds ---
Learned classification tree model:
TreeEnsembleModel classifier with 3 trees

  Tree 0:
    If (feature 511 <= 1.0)
     If (feature 350 <= 132.0)
      If (feature 542 <= 0.0)
       If (feature 402 <= 0.0)
        Predict: 7.0
       Else (feature 402 > 0.0)
        Predict: 9.0
      Else (feature 542 > 0.0)
       If (feature 216 <= 0.0)
        Predict: 6.0
       Else (feature 216 > 0.0)
        Predict: 8.0
     Else (feature 350 > 132.0)
      If (feature 515 <= 1.0)
       If (feature 597 <= 9.0)
        Predict: 1.0
       Else (feature 597 > 9.0)
        Predict: 3.0
      Else (feature 515 > 1.0)
       If (feature 375 <= 15.0)
        Predict: 1.0
       Else (feature 375 > 15.0)
        Predict: 8.0
    Else (feature 511 > 1.0)
     If (feature 385 <= 14.0)
      If (feature 318 <= 5.0)
       If (feature 630 <= 154.0)
        Predict: 2.0
       Else (feature 630 > 154.0)
        Predict: 3.0
      

Perform grid search over parameters to improve accuracy

In [None]:
bestModel = None
bestTestErr = 100

maxDepths = range(4,10)
maxTrees = range(3,10)
for depthLevel in maxDepths:
    for treeLevel in maxTrees:
        
        #start timer
        start_time = time.time()
        model = RandomForest.trainClassifier(trainingData, numClasses=10, categoricalFeaturesInfo={},
                                             numTrees=treeLevel, featureSubsetStrategy="auto",
                                             impurity='gini', maxDepth=depthLevel, maxBins=32)        

        predictions = model.predict(testData.map(lambda x: x.features))
        labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
        testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())
        
        print ('\maxDepth = {0:.1f}, trees = {1:.1f}: trainErr = {2:.5f}'
               .format(depthLevel, treeLevel, testErr))
        print("Prediction time --- %s seconds ---" % (time.time() - start_time))
        if (testErr < bestTestErr):
            bestModel = model
            bestTestErr = testErr
            
print ('Best Test Error: = {0:.3f}\n'
       .format(bestTestErr))


\maxDepth = 4.0, trees = 3.0: trainErr = 0.41749
Prediction time --- 15.9577219486 seconds ---
\maxDepth = 4.0, trees = 4.0: trainErr = 0.41683
Prediction time --- 14.4539039135 seconds ---
\maxDepth = 4.0, trees = 5.0: trainErr = 0.38067
Prediction time --- 14.2586538792 seconds ---
\maxDepth = 4.0, trees = 6.0: trainErr = 0.37278
Prediction time --- 14.1658930779 seconds ---
\maxDepth = 4.0, trees = 7.0: trainErr = 0.31755
Prediction time --- 14.6696748734 seconds ---
\maxDepth = 4.0, trees = 8.0: trainErr = 0.32347
Prediction time --- 14.2442209721 seconds ---
\maxDepth = 4.0, trees = 9.0: trainErr = 0.33662
Prediction time --- 14.6478559971 seconds ---
\maxDepth = 5.0, trees = 3.0: trainErr = 0.40763
Prediction time --- 14.5690767765 seconds ---
\maxDepth = 5.0, trees = 4.0: trainErr = 0.30769
Prediction time --- 14.6435680389 seconds ---
\maxDepth = 5.0, trees = 5.0: trainErr = 0.31098

Sample code for making predictions.

In [7]:
from pyspark.mllib.linalg import Vectors

def parseTestPoint(line):
    """
    Parse a line of text into an MLlib LabeledPoint object.
    """
    values = line.split(',')
    values = [0 if e == '' else e for e in values]
    gender = abs(hash(values[4]))
    embarked = abs(hash(values[11]))
    return Vectors.dense([float(values[1]),gender,float(values[5]),float(values[6]),float(values[7]),float(values[9]),embarked])

fileNameTest = 'wasb://kaggle@criteo.blob.core.windows.net/test.csv'
testPoints = sc.textFile(fileNameTest)

#skip header
headerTest = testPoints.first() #extract header
testPoints = testPoints.filter(lambda x:x !=headerTest) #filter out header
print testPoints.take(2)

testPoints = testPoints.map(parseTestPoint)

predictions = model.predict(testPoints)
print predictions.take(125)

[u'892,3,"Kelly, Mr. James",male,34.5,0,0,330911,7.8292,,Q', u'893,3,"Wilkes, Mrs. James (Ellen Needs)",female,47,1,0,363272,7,,S']
[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0]


Sample code for creating a submission file for kaggle.com

In [8]:
#write out predictions to .CSV file
submissionIds = sc.textFile(fileNameTest).map(lambda x: x.split(',')[0])

#skip header
headerSubmission = submissionIds.first() #extract header
submissionIds = submissionIds.filter(lambda x:x !=headerSubmission) #filter out header
print submissionIds.take(3)

[u'892', u'893', u'894']


In [14]:
submission = submissionIds.zip(predictions.map(lambda x: int(x)))

In [16]:
submission.map(lambda a: str(a[0]) + "," + str(a[1])).coalesce(1,True).saveAsTextFile('wasb://criteo@criteo.blob.core.windows.net/kaggle/svcc_test2.csv')