# Loading data

In [63]:
import os

can_flight=sc.textFile('CancelledFlights.csv')
can_flight=can_flight.map(lambda a:a.split(','))
#First Row
can_flight.first()

[u'1', u'12', u'814', u'UA', u'134', u'0', u'0', u'679']

##### Exploring Unique Carrier distinct names

In [64]:
can_flight.map(lambda a:a[3]).distinct().collect()

[u'AA', u'DL', u'UA']

In [65]:
can_flight.toDF().describe

<bound method DataFrame.describe of DataFrame[_1: string, _2: string, _3: string, _4: string, _5: string, _6: string, _7: string, _8: string]>

##### Transforming categorical to integers

In [66]:
def preProc(x):
    dic={'AA':0,'DL':1,'UA':2}
    return map(lambda a:float(a),x[0:3]+x[4:]+[dic[x[3]]])

In [67]:
proc_data=can_flight.map(preProc)

In [68]:
proc_data.take(3)

[[1.0, 12.0, 814.0, 134.0, 0.0, 0.0, 679.0, 2.0],
 [1.0, 12.0, 830.0, 90.0, 0.0, 0.0, 214.0, 1.0],
 [1.0, 1.0, 1835.0, 213.0, 0.0, 0.0, 1605.0, 2.0]]

##### Transforming rows to LabeledPoints. LabeledPoints is a data structure in Spark which contains of (label, feature) pairs.

In [69]:
from pyspark.mllib.classification import LabeledPoint
proc_data=proc_data.map(lambda a:LabeledPoint(a[0],a[1:]))
proc_data.take(2)

[LabeledPoint(1.0, [12.0,814.0,134.0,0.0,0.0,679.0,2.0]),
 LabeledPoint(1.0, [12.0,830.0,90.0,0.0,0.0,214.0,1.0])]

##### Splitting the data into training, validation and testing

In [70]:
weights=[0.9,.05,.05]
seed=1992
train, validation, test = proc_data.randomSplit(weights,seed)

# Support Vector Classifier

In [71]:
from pyspark.mllib.classification import SVMWithSGD

In [43]:
svmFit=SVMWithSGD()
svmModel=svmFit.train(train)
svmLabelsAndPredsTrain = validation.map(lambda p: (p.label, svmModel.predict(p.features)))
#Overall True Prediction Rate
#print svmLabelsAndPredsTrain.filter(lambda (a,b):a==b).count()/float(validation.count())
from pyspark.mllib.evaluation import BinaryClassificationMetrics
testErr = svmLabelsAndPredsTrain.filter(lambda (v, p): v != p).count() / float(validation.count())
print('Validation Accuracy = ' + str(testErr))
print 'Accuracy of Canceled Predictions',svmLabelsAndPredsTrain.filter(lambda (a,b):a==1).filter(lambda (a,b):a==b).count()/float(svmLabelsAndPredsTrain.filter(lambda (a,b):a==1).count())
print 'Accuracy of Not Canceled Predictions',svmLabelsAndPredsTrain.filter(lambda (a,b):a==0).filter(lambda (a,b):a==b).count()/float(svmLabelsAndPredsTrain.filter(lambda (a,b):a==0).count())

0.842465753425
Validation Error = 0.157534246575
Accuracy of Canceled Predictions 0.0
Accuracy of Not Canceled Predictions 1.0


### Accuracy for prediction cancellations is 0, for SVC

# Random Forest

In [41]:
from pyspark.mllib.tree import RandomForest

##### Random Forest with tree size 10

In [42]:
rfFit=RandomForest
rfModel=rfFit.trainClassifier(train,numClasses=2,categoricalFeaturesInfo={},numTrees=10)
#svmLabelsAndPredsTrain = validation.map(lambda p: (p.label, rfModel.predict(p.features)))
#print svmLabelsAndPredsTrain.filter(lambda (a,b):a==b).count()/float(validation.count())
predictions = rfModel.predict(validation.map(lambda x: x.features))
labelsAndPredictions = validation.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(validation.count())
print('Validation Error = ' + str(testErr))
print 'Accuracy of Canceled Predictions',labelsAndPredictions.filter(lambda (a,b):a==1).filter(lambda (a,b):a==b).count()/float(labelsAndPredictions.filter(lambda (a,b):a==1).count())
print 'Accuracy of Not Canceled Predictions',labelsAndPredictions.filter(lambda (a,b):a==0).filter(lambda (a,b):a==b).count()/float(labelsAndPredictions.filter(lambda (a,b):a==0).count())

Validation Error = 0.00684931506849
Accuracy of Canceled Predictions 1.0
Accuracy of Not Canceled Predictions 0.991869918699


##### Random Forest with tree size 50

In [44]:
rfFit=RandomForest
rfModel=rfFit.trainClassifier(train,numClasses=2,categoricalFeaturesInfo={},numTrees=50)
#svmLabelsAndPredsTrain = validation.map(lambda p: (p.label, rfModel.predict(p.features)))
#print svmLabelsAndPredsTrain.filter(lambda (a,b):a==b).count()/float(validation.count())
predictions = rfModel.predict(validation.map(lambda x: x.features))
labelsAndPredictions = validation.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(validation.count())
print('Validation Error = ' + str(testErr))

print 'Accuracy of Canceled Predictions',labelsAndPredictions.filter(lambda (a,b):a==1).filter(lambda (a,b):a==b).count()/float(labelsAndPredictions.filter(lambda (a,b):a==1).count())
print 'Accuracy of Not Canceled Predictions',labelsAndPredictions.filter(lambda (a,b):a==0).filter(lambda (a,b):a==b).count()/float(labelsAndPredictions.filter(lambda (a,b):a==0).count())

Validation Error = 0.00342465753425
Accuracy of Canceled Predictions 1.0
Accuracy of Not Canceled Predictions 0.99593495935


##### Random Forest with tree size 200

In [72]:
rfFit=RandomForest
rfModel=rfFit.trainClassifier(train,numClasses=2,categoricalFeaturesInfo={},numTrees=200)
#svmLabelsAndPredsTrain = validation.map(lambda p: (p.label, rfModel.predict(p.features)))
#print svmLabelsAndPredsTrain.filter(lambda (a,b):a==b).count()/float(validation.count())
predictions = rfModel.predict(validation.map(lambda x: x.features))
labelsAndPredictions = validation.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(validation.count())
print('Validation Error = ' + str(testErr))

print 'Accuracy of Canceled Predictions',labelsAndPredictions.filter(lambda (a,b):a==1).filter(lambda (a,b):a==b).count()/float(labelsAndPredictions.filter(lambda (a,b):a==1).count())
print 'Accuracy of Not Canceled Predictions',labelsAndPredictions.filter(lambda (a,b):a==0).filter(lambda (a,b):a==b).count()/float(labelsAndPredictions.filter(lambda (a,b):a==0).count())

Validation Error = 0.00342465753425
Accuracy of Canceled Predictions 1.0
Accuracy of Not Canceled Predictions 0.99593495935


##### From results of Random Forest, we can see that the accuracy achieved when tree size is 50 is greater than that when tree size is 10 and equal to the accuracy achieved when tree size is 200. Therefore, we perform our final prediction using Random Forest with 50 number of trees

# Final Prediction

In [None]:
rfFit=RandomForest
rfModel=rfFit.trainClassifier(train,numClasses=2,categoricalFeaturesInfo={},numTrees=50)
predictions = rfModel.predict(test.map(lambda x: x.features))
labelsAndPredictions = test.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(test.count())

In [82]:
print('Final Testing Error = ' + str(testErr))
print 'Accuracy of Canceled Predictions',labelsAndPredictions.filter(lambda (a,b):a==1).filter(lambda (a,b):a==b).count()/float(labelsAndPredictions.filter(lambda (a,b):a==1).count())
print 'Accuracy of Not Canceled Predictions',labelsAndPredictions.filter(lambda (a,b):a==0).filter(lambda (a,b):a==b).count()/float(labelsAndPredictions.filter(lambda (a,b):a==0).count())

Final Testing Error = 0.0034965034965
Accuracy of Canceled Predictions 1.0
Accuracy of Not Canceled Predictions 0.995762711864


##### Misclassified Labels:

In [84]:
labelsAndPredictions.filter(lambda (a,b):a==0).filter(lambda (a,b):a!=b).collect()

[(0.0, 1.0)]

##### One label, which is not cancelled is predicted as cancelled

#### We get almost 100% testing accuracy with Random Forest.