### Preliminaries
Import the necessary libraries, create the SparkContext, and load the training data into an RDD.

In [1]:
from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.classification import SVMWithSGD

# Location of the training CSV in Azure Blob Storage
fileNameTrain = 'wasb://kaggle@criteo.blob.core.windows.net/train.csv'

# Driver-side entry point to Spark for this notebook
sc = SparkContext(appName="TitanicLR")

# RDD of raw text lines, one element per line of the training file
points = sc.textFile(fileNameTrain)

Create a function to parse CSV lines and convert them to LabeledPoint objects. LabeledPoint is the class that the MLlib machine learning algorithms used below expect as input.

In [2]:
def parsePoint(line):
    """
    Parse one CSV line of the training file into an MLlib LabeledPoint.

    The line is decoded with the csv module, so commas inside the quoted name
    field (e.g. "Braund, Mr. Owen Harris") stay inside that field instead of
    shifting the position of every later column.

    Columns used, counted from 0 after CSV decoding:
        1             -> label
        2, 5, 6, 7, 9 -> the five features, in that order
    NOTE(review): in the Kaggle Titanic train.csv layout these would be
    Survived and Pclass, Age, SibSp, Parch, Fare -- confirm against the header.

    Empty fields are treated as 0.

    :param line: one data line (not the header) of the training CSV
    :return: LabeledPoint(label, [five float features])
    :raises ValueError: if a used field is non-empty and not numeric
    :raises IndexError: if the line has fewer than 10 columns
    """
    import csv  # function-scope import: this cell does not rely on another cell importing it

    if not isinstance(line, str):
        # Python 2 only: the line arrives as `unicode`, which the Python 2 csv
        # module cannot read reliably, so hand it UTF-8 bytes instead.
        line = line.encode('utf-8')
    values = next(csv.reader([line]))

    def number(index):
        # Missing values are imputed as 0, matching the original behaviour.
        field = values[index]
        return float(field) if field != '' else 0.0

    return LabeledPoint(number(1), [number(i) for i in (2, 5, 6, 7, 9)])

Skip the header information - there are certainly other (more efficient) ways to do it but this also works

In [4]:
#skip header
# Start again from the source file instead of filtering `points` in place:
# if this cell is re-run, an in-place filter would treat the first data row
# as the header and silently drop it.
rawPoints = sc.textFile(fileNameTrain)
header = rawPoints.first() #extract header
points = rawPoints.filter(lambda x: x != header) #filter out header

# print(...) with a single argument behaves the same on Python 2 and Python 3
print(points.first())


1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S


Apply transformation on every line of the CSV

In [5]:
# Parse every CSV line into a LabeledPoint. The result is cached because it is
# reused by training and by several later actions; without caching, each use
# re-reads and re-parses the file.
labeledPoints = points.map(parsePoint).cache()

Train an MLlib logistic regression model on the training data

In [7]:
# Fit a logistic-regression classifier with MLlib's SGD trainer; no
# hyper-parameters are passed, so the library defaults apply.
model = LogisticRegressionWithSGD.train(labeledPoints)

# Report the fitted parameters.
learnedWeights, learnedIntercept = model.weights, model.intercept
print("Final weights: " + str(learnedWeights))
print("Final intercept: " + str(learnedIntercept))

Final weights: [-3.18506726732,-0.729241975158,-1.71365575134,-0.384488786453,0.403867774938]
Final intercept: 0.0


Perform a basic evaluation of the model - the fraction of training examples it misclassifies (the training error)

In [8]:
# evaluate the model
# Build (true label, predicted label) pairs for every training example.
labelsAndPreds = labeledPoints.map(lambda point: (int(point.label),
        model.predict(point.features)))

# Evaluating the model on training data
# Index into the pair instead of using `lambda (v, p): ...`: tuple-parameter
# unpacking is Python-2-only syntax, while indexing runs on Python 2 and 3.
misclassified = labelsAndPreds.filter(lambda pair: pair[0] != pair[1]).count()
# float(...) keeps this a true division on Python 2 as well.
trainErr = misclassified / float(labeledPoints.count())
print("Training Error = " + str(trainErr))

Training Error = 0.334455667789


Now prepare the testing data for prediction

In [10]:
from pyspark.mllib.linalg import Vectors

def parseTestPoint(line):
    """
    Parse one CSV line of the test file into an MLlib dense feature vector.

    Unlike the training file, the sample test rows carry no label column, so
    the result is a plain vector rather than a LabeledPoint.

    The line is decoded with the csv module, so commas inside the quoted name
    field (e.g. "Kelly, Mr. James") stay inside that field instead of shifting
    the position of every later column.

    Columns used, counted from 0 after CSV decoding: 1, 4, 5, 6, 8, in that
    order.
    NOTE(review): in the Kaggle Titanic test.csv layout these would be Pclass,
    Age, SibSp, Parch, Fare -- confirm against the header and against the
    feature order used for training.

    Empty fields are treated as 0.

    :param line: one data line (not the header) of the test CSV
    :return: Vectors.dense([five float features])
    :raises ValueError: if a used field is non-empty and not numeric
    :raises IndexError: if the line has fewer than 9 columns
    """
    import csv  # function-scope import: this cell does not rely on another cell importing it

    if not isinstance(line, str):
        # Python 2 only: the line arrives as `unicode`, which the Python 2 csv
        # module cannot read reliably, so hand it UTF-8 bytes instead.
        line = line.encode('utf-8')
    values = next(csv.reader([line]))

    def number(index):
        # Missing values are imputed as 0, matching the original behaviour.
        field = values[index]
        return float(field) if field != '' else 0.0

    return Vectors.dense([number(i) for i in (1, 4, 5, 6, 8)])

# Location of the test CSV in Azure Blob Storage
fileNameTest = 'wasb://kaggle@criteo.blob.core.windows.net/test.csv'
testPoints = sc.textFile(fileNameTest)

#skip header
headerTest = testPoints.first() #extract header
testPoints = testPoints.filter(lambda x: x != headerTest) #filter out header
# print(...) with a single argument behaves the same on Python 2 and Python 3
print(testPoints.take(2))


[u'892,3,"Kelly, Mr. James",male,34.5,0,0,330911,7.8292,,Q', u'893,3,"Wilkes, Mrs. James (Ellen Needs)",female,47,1,0,363272,7,,S']


Make a prediction

In [11]:
# Parse into a separately named RDD instead of overwriting `testPoints`, so that
# re-running this cell does not try to parse vectors that were already parsed.
testFeatures = testPoints.map(parseTestPoint)

# Passing an RDD of feature vectors to predict yields an RDD of predictions.
predictions = model.predict(testFeatures)
# print(...) with a single argument behaves the same on Python 2 and Python 3
print(predictions.take(125))

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0]


Prepare .CSV for submission - grab Ids from the first column

In [12]:
#write out predictions to .CSV file
# Re-read the test file and keep only the first comma-separated field of each
# line (the id column).
submissionIds = sc.textFile(fileNameTest).map(lambda x: x.split(',')[0])

#skip header
headerSubmission = submissionIds.first() #extract header
submissionIds = submissionIds.filter(lambda x: x != headerSubmission) #filter out header
# print(...) with a single argument behaves the same on Python 2 and Python 3
print(submissionIds.take(3))

[u'892', u'893', u'894']


Combine Ids with predictions

In [14]:
# Pair each id with its prediction, element by element, giving (id, prediction) tuples.
# NOTE(review): RDD.zip expects both RDDs to have the same number of partitions
# and the same number of elements in each partition. Both sides here are derived
# from sc.textFile(fileNameTest) via per-element steps -- confirm the alignment
# still holds if either pipeline is changed.
submission = submissionIds.zip(predictions)

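Finally, write out the CSV file to Azure storage. Notice the coalesce(1,True), which shuffles all rows into a single partition so that Spark writes a single part file. Note that saveAsTextFile creates a directory with the given name and places the part file inside it.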

In [15]:
# Render each (id, prediction) pair as one "id,prediction" text line.
submissionLines = submission.map(lambda idAndPred: str(idAndPred[0]) + "," + str(idAndPred[1]))

# Collapse to one partition (second argument True = shuffle) so that a single
# part file is written.
singlePartition = submissionLines.coalesce(1, True)
singlePartition.saveAsTextFile('wasb://criteo@criteo.blob.core.windows.net/kaggle/svcc_test1.csv')