# PSTAT 194 Final Project: Predicting Flight Delays
## Team: Walk in the Spark
* Andrew Zhang
* Wendy Gao
* Alex Wu
* Shon Inouye

## Initializing SQL Dataframe

In [4]:
import pandas as pd
import numpy as np

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler 
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.types import DoubleType
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder


sc = SparkContext.getOrCreate()
sqlCtx = SQLContext(sc)

### Flight Data

In [5]:
# Read in data from parquet file
flight_df = sqlCtx.read.parquet("/mnt/data/flight_df.parquet")

In [7]:
flight_df.show(4)

+-----+---+-----------+-------+--------------+-------------------+--------------+--------+-----------------+---------------+-----+
|MONTH|DAY|DAY_OF_WEEK|AIRLINE|ORIGIN_AIRPORT|SCHEDULED_DEPARTURE|SCHEDULED_TIME|DISTANCE|SCHEDULED_ARRIVAL|DEPARTURE_DELAY|DELAY|
+-----+---+-----------+-------+--------------+-------------------+--------------+--------+-----------------+---------------+-----+
|    4| 13|          1|      9|             6|               1327|         134.0|     867|             1641|          -10.0|    0|
|    1| 13|          2|      5|            49|               1235|          65.0|     223|             1340|            0.0|    0|
|    1| 11|          7|      9|            72|               1950|          74.0|     228|             2104|           13.0|    1|
|    9|  2|          3|      1|            10|               1835|         285.0|    1947|             2020|            8.0|    1|
+-----+---+-----------+-------+--------------+-------------------+--------------+--

## Prepare data for modeling

In [8]:
# Use OneHotEncoder to map categorical variables to binary vectors
cat_columns = ['MONTH','DAY','DAY_OF_WEEK','AIRLINE','ORIGIN_AIRPORT']
encoders = [OneHotEncoder(inputCol=column, outputCol=column+"_vec") for column in cat_columns]
pipelineOHE = Pipeline(stages=encoders)
flight_df2 = pipelineOHE.fit(flight_df).transform(flight_df)

In [9]:
flight_df2.show(2)

+-----+---+-----------+-------+--------------+-------------------+--------------+--------+-----------------+---------------+-----+--------------+---------------+---------------+--------------+------------------+
|MONTH|DAY|DAY_OF_WEEK|AIRLINE|ORIGIN_AIRPORT|SCHEDULED_DEPARTURE|SCHEDULED_TIME|DISTANCE|SCHEDULED_ARRIVAL|DEPARTURE_DELAY|DELAY|     MONTH_vec|        DAY_vec|DAY_OF_WEEK_vec|   AIRLINE_vec|ORIGIN_AIRPORT_vec|
+-----+---+-----------+-------+--------------+-------------------+--------------+--------+-----------------+---------------+-----+--------------+---------------+---------------+--------------+------------------+
|    4| 13|          1|      9|             6|               1327|         134.0|     867|             1641|          -10.0|    0|(12,[4],[1.0])|(31,[13],[1.0])|  (7,[1],[1.0])|(14,[9],[1.0])|   (323,[6],[1.0])|
|    1| 13|          2|      5|            49|               1235|          65.0|     223|             1340|            0.0|    0|(12,[1],[1.0])|(31,[13

In [10]:
# Create a vector of features
assembler = VectorAssembler(inputCols=['MONTH_vec', 'DAY_vec', 'DAY_OF_WEEK_vec','AIRLINE_vec',
                                       'SCHEDULED_DEPARTURE', 'ORIGIN_AIRPORT_vec', 'SCHEDULED_TIME', 'DISTANCE', 
                                       'SCHEDULED_ARRIVAL'], outputCol="features")

In [11]:
# Apply vector assembler to data
transformed = assembler.transform(flight_df2)

In [12]:
transformed.select(['DELAY', 'features']).show(5)

+-----+--------------------+
|DELAY|            features|
+-----+--------------------+
|    0|(391,[4,25,44,59,...|
|    0|(391,[1,25,45,55,...|
|    1|(391,[1,23,59,64,...|
|    1|(391,[9,14,46,51,...|
|    1|(391,[4,15,48,64,...|
+-----+--------------------+
only showing top 5 rows



In [13]:
# Convert to RDD
dataRDD = transformed.select(['DELAY','features']).rdd.map(tuple)

In [14]:
# Map label to binary values, then convert to LabeledPoint
lp = dataRDD.map(lambda row : (0 if row[0] == 0 else 1, Vectors.dense(row[1])))    \
            .map(lambda row : LabeledPoint(row[0], row[1]))

In [15]:
lp.take(2)

[LabeledPoint(0.0, [0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1327.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0

### Split data approximately into training (80%) and test (20%)

In [16]:
# Split data
split = lp.randomSplit([0.8, 0.2], 314)
training = split[0]
test = split[1]

# Modeling

## Logistic Regression

In [19]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel

In [20]:
# Build model
LR_model = LogisticRegressionWithLBFGS.train(training)

In [21]:
# Evaluate model on training data
LR_LAPtrain = training.map(lambda lp: (float(LR_model.predict(lp.features)), lp.label))

In [22]:
# Print training accuracy
LR_accTrain = 1.0 * LR_LAPtrain.filter(lambda x:x[0] == x[1]).count()/training.count()
print(LR_accTrain)

0.620115374674


In [23]:
# Evaluate model on test data
LR_LAP = test.map(lambda lp: (float(LR_model.predict(lp.features)), lp.label))

In [24]:
# Print test accuracy
LR_acc = 1.0 * LR_LAP.filter(lambda x:x[0] == x[1]).count()/test.count()
print(LR_acc)

0.617390688116


## Random Forest

In [26]:
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import RandomForestClassifier

In [27]:
# Build model
RF_model = RandomForest.trainClassifier(training, numClasses = 2,
                                       categoricalFeaturesInfo = {}, 
                                       numTrees = 5, featureSubsetStrategy = "auto", 
                                       impurity = 'gini', maxDepth = 4, maxBins = 32)

In [28]:
# Evaluate model on training data
RF_predtrain = RF_model.predict(training.map(lambda x: x.features))
RF_LAPtrain = training.map(lambda lp: lp.label).zip(RF_predtrain)

In [30]:
# Print training accuracy
RF_trainAcc = RF_LAPtrain.filter(lambda x: x[0] == x[1]).count() / float(training.count())
print(RF_trainAcc)

0.528185727377


In [32]:
# Evaluate model on test data
RF_pred = RF_model.predict(test.map(lambda x: x.features))
RF_LAP = test.map(lambda lp: lp.label).zip(RF_pred)

In [34]:
# Print test accuracy
RF_testAcc = RF_LAP.filter(lambda x: x[0] == x[1]).count() / float(test.count())
print(RF_testAcc)

0.536012094583


## Decision Tree

In [37]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel

In [39]:
# Build model
DT_model = DecisionTree.trainClassifier(training, numClasses=2, categoricalFeaturesInfo={},
                                     impurity='gini', maxDepth=5, maxBins=32)

In [40]:
# Evaluate model on training data
DT_predtrain = DT_model.predict(training.map(lambda x: x.features))
DT_LAPtrain = training.map(lambda lp: lp.label).zip(DT_predtrain)

In [41]:
# Print training accuracy
DT_trainAcc = DT_LAPtrain.filter(lambda x: x[0] == x[1]).count() / float(training.count())
print(DT_trainAcc)

0.608178443029


In [43]:
# Evaluate model on test data
DT_pred = DT_model.predict(test.map(lambda x: x.features))
DT_LAP = test.map(lambda lp: lp.label).zip(DT_pred)

In [44]:
cDT_testAcc = DT_LAP.filter(lambda x: x[0] == x[1]).count() / float(test.count())
print(DT_testAcc)

0.603878770699


## Gradient Boosted Trees

In [53]:
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel

In [54]:
# Build model
GBT_model = GradientBoostedTrees.trainClassifier(training,
                                             categoricalFeaturesInfo={}, numIterations=5)

In [55]:
# Evaluate model on training data
GBT_predtrain = GBT_model.predict(training.map(lambda x: x.features))
GBT_LAPtrain = training.map(lambda lp: lp.label).zip(GBT_predtrain)

In [57]:
GBT_trainErr = GBT_LAPtrain.filter(
    lambda lp: lp[0] == lp[1]).count() / float(training.count())

In [58]:
# Print training accuracy
print('Training Accuracy = ' + str(GBT_trainErr))

Training Accuracy = 0.611186173837


In [59]:
# Evaluate model on test data
GBT_predtest = GBT_model.predict(test.map(lambda x: x.features))
GBT_LAPtest = test.map(lambda lp: lp.label).zip(GBT_predtest)

In [60]:
# Print test accuracy
GBT_testErr = GBT_LAPtest.filter(
    lambda lp: lp[0] == lp[1]).count() / float(test.count())
print('Test Accuracy = ' + str(GBT_testErr))

Test Accuracy = 0.608957550847


# K-Fold Cross Validation

In [61]:
# Prepare data for modeling
flight_cv = transformed.select(['DELAY', 'features'])
flight_cv = flight_cv.withColumnRenamed('DELAY', 'label')
flight_cv = flight_cv.select(flight_cv.label.cast(DoubleType()).alias('label'), 
                                 'features')
flight_cv.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(391,[4,25,44,59,...|
|  0.0|(391,[1,25,45,55,...|
|  1.0|(391,[1,23,59,64,...|
|  1.0|(391,[9,14,46,51,...|
|  1.0|(391,[4,15,48,64,...|
+-----+--------------------+
only showing top 5 rows



### Split data approximately into training (80%) and test (20%)

In [62]:
train_cv, test_cv = flight_cv.randomSplit([0.8, 0.2], 314)

## Logistic Regression

In [63]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [64]:
# Build model
lr_k = LogisticRegression()

In [65]:
# Create grid of parameters
grid_k = ParamGridBuilder().addGrid(lr_k.maxIter, [0, 1, 5, 10, 25]) \
                           .addGrid(lr_k.regParam, [0.1,0.01]) \
                           .addGrid(lr_k.fitIntercept, [False, True])\
                           .addGrid(lr_k.elasticNetParam, [0.0,0.3, 0.5,0.8, 1.0])\
                           .build()

In [66]:
evaluator_k = BinaryClassificationEvaluator()

In [67]:
cv_lr = CrossValidator(estimator = lr_k, estimatorParamMaps = grid_k, evaluator = evaluator_k)

In [68]:
# Run cross-validation
cvmodel_lr = cv_lr.fit(train_cv)

In [69]:
# Evaluate tuned model on training data
evaluator_k.evaluate(cvmodel_lr.transform(train_cv))

0.6659110102881041

In [70]:
# Evaluate tuned model on test data
evaluator_k.evaluate(cvmodel_lr.transform(test_cv))

0.6683368558266156

## Random Forest

In [74]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorIndexer, IndexToString

In [75]:
labelIndexer = StringIndexer(inputCol = "label", 
                             outputCol = "indexedLabel").fit(flight_cv)

In [76]:
featureIndexer = VectorIndexer(inputCol="features", 
                              outputCol="indexedFeatures", 
                              maxCategories=4).fit(flight_cv)

In [77]:
labelConverter = IndexToString(inputCol="prediction",
                               outputCol="predictedLabel", 
                               labels=labelIndexer.labels)

In [78]:
rf_k = RandomForestClassifier(labelCol = "indexedLabel", 
                              featuresCol = "indexedFeatures")

In [79]:
evaluator_rf = MulticlassClassificationEvaluator(labelCol="indexedLabel",
                                                 predictionCol="prediction",
                                                metricName="accuracy")
numFolds = 5

In [80]:
# Create grid of parameters
grid_k_rf = ParamGridBuilder().addGrid(rf_k.numTrees, [5,10,25])\
                           .addGrid(rf_k.maxDepth, [3, 5,10,15])\
                           .addGrid(rf_k.maxBins, [5, 10, 20, 30])\
                           .build()

In [81]:
# Create pipeline of transformers and estimators
pipeline_rf = Pipeline(stages=[labelIndexer, 
                               featureIndexer,
                               rf_k,
                               labelConverter])

In [82]:
# Treat pipeline as estimator in a CrossValidator instance.
cv_rf = CrossValidator(estimator = pipeline_rf, 
                       estimatorParamMaps = grid_k_rf, 
                       evaluator = evaluator_rf, 
                       numFolds = numFolds)

In [83]:
# Run cross-validation
cvmodel_rf = cv_rf.fit(train_cv)

In [85]:
# Evaluate tuned model on training data
predictions_rf_train = cvmodel_rf.transform(train_cv)
evaluator_rf.evaluate(predictions_rf_train)

0.6441231233354545

In [86]:
# Evaluate tuned model on test data
predictions_rf = cvmodel_rf.transform(test_cv)

In [87]:
predictions_rf.select("predictedLabel", "label", "features").show(5)

+--------------+-----+--------------------+
|predictedLabel|label|            features|
+--------------+-----+--------------------+
|           0.0|  0.0|(391,[1,13,47,51,...|
|           0.0|  0.0|(391,[1,13,47,52,...|
|           0.0|  0.0|(391,[1,13,47,54,...|
|           1.0|  0.0|(391,[1,13,47,55,...|
|           0.0|  0.0|(391,[1,13,47,58,...|
+--------------+-----+--------------------+
only showing top 5 rows



In [88]:
evaluator_rf.evaluate(predictions_rf)

0.6260473800910258

## Decision Tree

In [93]:
from pyspark.ml.classification import DecisionTreeClassifier

In [94]:
# Build model
dt_k = DecisionTreeClassifier(labelCol = "indexedLabel",
                              featuresCol = "indexedFeatures")

In [95]:
# Create grid of parameters
grid_k_dt = ParamGridBuilder().addGrid(dt_k.maxDepth, [3, 5,10,15])\
                           .addGrid(dt_k.maxBins, [5, 10, 20, 30, 35])\
                           .build()

In [96]:
# Create pipeline of transformers and estimators
pipeline_dt = Pipeline(stages=[labelIndexer, 
                               featureIndexer, 
                               dt_k])

In [97]:
# Treat pipeline as estimator in a CrossValidator instance.
cv_dt = CrossValidator(estimator = pipeline_dt, 
                       estimatorParamMaps = grid_k_dt, 
                       evaluator = evaluator_rf, 
                       numFolds = numFolds)

In [98]:
# Run cross-validation
cvmodel_dt = cv_dt.fit(train_cv)

In [None]:
# Evaluate tuned model on training data
predictions_dt_train = cvmodel_dt.transform(train_cv)
evaluator_rf.evaluate(predictions_dt_train)

In [99]:
# Evaluate tuned model on test data
predictions_dt = cvmodel_dt.transform(test_cv)

In [100]:
predictions_dt.select("prediction", "indexedLabel", "features").show(5)

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       0.0|         0.0|(391,[1,13,47,51,...|
|       0.0|         0.0|(391,[1,13,47,52,...|
|       1.0|         0.0|(391,[1,13,47,54,...|
|       1.0|         0.0|(391,[1,13,47,55,...|
|       0.0|         0.0|(391,[1,13,47,58,...|
+----------+------------+--------------------+
only showing top 5 rows



In [101]:
evaluator_rf.evaluate(predictions_dt)

0.6143307270393278

## Gradient Boosted Trees

In [103]:
from pyspark.ml.classification import GBTClassifier

In [104]:
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(flight_cv)


In [105]:
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(flight_cv)

In [106]:
evaluator_gbt = MulticlassClassificationEvaluator(labelCol="indexedLabel", 
                                                  predictionCol="prediction", 
                                                  metricName="accuracy")
numFolds = 5

In [107]:
# Build model
gbt_k = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)

In [108]:
# Create pipeline of transformers and estimators
pipeline_gbt = Pipeline(stages=[labelIndexer, 
                                featureIndexer, 
                                gbt_k])


In [109]:
# Create grid of parameters
grid_k_gbt = ParamGridBuilder().addGrid(gbt_k.maxIter, [5,10,20])\
                           .addGrid(gbt_k.maxDepth, [3,5,10,15])\
                           .addGrid(gbt_k.maxBins, [5, 10, 20])\
                           .build()

In [110]:
# Treat pipeline as estimator in a CrossValidator instance.
cv_gbt = CrossValidator(estimator = pipeline_gbt, 
                       estimatorParamMaps = grid_k_gbt, 
                       evaluator = evaluator_gbt, 
                       numFolds = numFolds)

In [111]:
# Run cross-validation
cvmodel_gbt = cv_gbt.fit(train_cv)

In [112]:
# Evaluate tuned model on training data
predictions_gbt_train = cvmodel_gbt.transform(train_cv)

accuracy_gbt_train = evaluator_gbt.evaluate(predictions_gbt_train)
print("Training Accuracy = %g" % (accuracy_gbt_train))


Training Accuracy = 0.670167


In [113]:
# Evaluate tuned model on test data
predictions_gbt_test = cvmodel_gbt.transform(test_cv)

accuracy_gbt_test = evaluator_gbt.evaluate(predictions_gbt_test)
print("Test Accuracy = %g" % (accuracy_gbt_test))

Training Accuracy = 0.630412


## Model Results

In [21]:
results = pd.DataFrame(data={'Logistic Regression': [0.620115374674,0.617390688116],
                             'Random Forests': [0.528185727377,0.536012094583],
                             'Decision Trees': [0.608178443029,0.603878770699],
                             'Gradient Boosted Trees': [0.611186173837,0.608957550847]},
                       index={'Training Accuracy',
                              'Test Accuracy'})
results

Unnamed: 0,Decision Trees,Gradient Boosted Trees,Logistic Regression,Random Forests
Training Accuracy,0.608178,0.611186,0.620115,0.528186
Test Accuracy,0.603879,0.608958,0.617391,0.536012


## K-Fold Cross Validation Model Results

In [23]:
results_kfold = pd.DataFrame(data={'Logistic Regression': [0.6659110102881041,0.6683368558266156],
                             'Random Forests': [0.6441231233354545,0.6260473800910258],
                             'Decision Trees': [0.6265408093520941,0.6143307270393278],
                             'Gradient Boosted Trees': [0.670167,0.630412]},
                       index={'Training Accuracy',
                              'Test Accuracy'})
results_kfold

Unnamed: 0,Decision Trees,Gradient Boosted Trees,Logistic Regression,Random Forests
Training Accuracy,0.626541,0.670167,0.665911,0.644123
Test Accuracy,0.614331,0.630412,0.668337,0.626047


### Our best overall model is Cross-Validated Logistic Regression with an accuracy of 66.83%.