# Airline Data Analysis

Authors: Jose Rodrigo Flores Espinosa & Marta Napa

The project aims to analyze the US domestic flight dataset using PySpark DataFrames and to predict which flights and flight carriers are most likely to be cancelled or delayed.

## Load data

The dataset is taken from Kaggle and consists of 7 CSV files containing information about
airlines, delays, locations (origins and destinations) and cancellations (with reasons
labeled as cancellation codes) over the period from 2009 to 2015 [1].

In [None]:
%pip install pyspark


In [None]:
import numpy as np
import pandas as pd
# SparkContext represents the connection to a Spark cluster
from pyspark.context import SparkContext
# Configuration for a Spark application
from pyspark.conf import SparkConf
# The entry point to programming Spark with the Dataset and DataFrame API
from pyspark.sql.session import SparkSession

In [None]:
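conf = SparkConf().setAppName("project3")
# pass the configuration to the context (otherwise it is never used)
sc = SparkContext.getOrCreate(conf=conf)
# the SparkSession reuses the existing SparkContext and is the entry point for DataFrames below
sqlContext = SparkSession.builder.getOrCreate()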

In [None]:
# Paths of the yearly CSV files, combined into one DataFrame below
# (the list covers 2009-2018; the data analysis section filters to 2009-2015)
path = ["2009.csv", "2010.csv", "2011.csv", "2012.csv", "2013.csv", "2014.csv", "2015.csv", "2016.csv", "2017.csv", "2018.csv"]

In [None]:
# Loading the data (creating the main dataframe)
df = sqlContext.read.option("inferSchema",True).format("csv").option("header", "true").load(path)
display(df)

DataFrame[FL_DATE: string, OP_CARRIER: string, OP_CARRIER_FL_NUM: int, ORIGIN: string, DEST: string, CRS_DEP_TIME: double, DEP_TIME: double, DEP_DELAY: double, TAXI_OUT: double, WHEELS_OFF: double, WHEELS_ON: double, TAXI_IN: double, CRS_ARR_TIME: double, ARR_TIME: double, ARR_DELAY: double, CANCELLED: double, CANCELLATION_CODE: string, DIVERTED: double, CRS_ELAPSED_TIME: double, ACTUAL_ELAPSED_TIME: double, AIR_TIME: double, DISTANCE: double, CARRIER_DELAY: double, WEATHER_DELAY: double, NAS_DELAY: double, SECURITY_DELAY: double, LATE_AIRCRAFT_DELAY: double, Unnamed: 27: string]

In [None]:
df.show(3)

+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+-----------+
|   FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|DEST|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|DISTANCE|CARRIER_DELAY|WEATHER_DELAY|NAS_DELAY|SECURITY_DELAY|LATE_AIRCRAFT_DELAY|Unnamed: 27|
+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+-----------+
|

## Data preprocessing

In [None]:
# get column names
df.columns

['FL_DATE',
 'OP_CARRIER',
 'OP_CARRIER_FL_NUM',
 'ORIGIN',
 'DEST',
 'CRS_DEP_TIME',
 'DEP_TIME',
 'DEP_DELAY',
 'TAXI_OUT',
 'WHEELS_OFF',
 'WHEELS_ON',
 'TAXI_IN',
 'CRS_ARR_TIME',
 'ARR_TIME',
 'ARR_DELAY',
 'CANCELLED',
 'CANCELLATION_CODE',
 'DIVERTED',
 'CRS_ELAPSED_TIME',
 'ACTUAL_ELAPSED_TIME',
 'AIR_TIME',
 'DISTANCE',
 'CARRIER_DELAY',
 'WEATHER_DELAY',
 'NAS_DELAY',
 'SECURITY_DELAY',
 'LATE_AIRCRAFT_DELAY',
 'Unnamed: 27']

In [None]:
# drop unnamed columns
df = df.drop('Unnamed: 27')

In [None]:
df.show()

+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+
|   FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|DEST|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|DISTANCE|CARRIER_DELAY|WEATHER_DELAY|NAS_DELAY|SECURITY_DELAY|LATE_AIRCRAFT_DELAY|
+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+
|2009-01-01|        XE|             1

In [None]:
df.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 243440 entries, 0 to 243439
Data columns (total 27 columns):
 #   Column               Non-Null Count   Dtype  
---  ------               --------------   -----  
 0   FL_DATE              243440 non-null  object 
 1   OP_CARRIER           243438 non-null  object 
 2   OP_CARRIER_FL_NUM    243438 non-null  float64
 3   ORIGIN               243437 non-null  object 
 4   DEST                 243437 non-null  object 
 5   CRS_DEP_TIME         243437 non-null  float64
 6   DEP_TIME             239149 non-null  float64
 7   DEP_DELAY            239132 non-null  float64
 8   TAXI_OUT             239039 non-null  float64
 9   WHEELS_OFF           239039 non-null  float64
 10  WHEELS_ON            238815 non-null  float64
 11  TAXI_IN              238813 non-null  float64
 12  CRS_ARR_TIME         243432 non-null  float64
 13  ARR_TIME             238813 non-null  float64
 14  ARR_DELAY            238425 non-null  float64
 15  CANCELLED        

When dealing with null values, we discovered that `df.dropna(how='any')` cannot be used, as it would result in an empty dataset: every row contains at least one null, because flights that weren't cancelled have a null CANCELLATION_CODE, whereas cancelled flights have nulls in the flight information columns.

Therefore, we decided:
1. For flights that weren't cancelled, the cancellation code will be set as 'NOT CANCELLED'
2. For flights that weren't cancelled, missing numeric flight data will be filled with each column's mean value
3. For cancelled flights, missing numeric flight data will be filled with 0s

In the cells below, the columns that are null for every cancelled flight are dropped first, and all remaining numeric nulls are then filled with 0s in a single `fillna` call.
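The three rules listed above can be sketched in plain Python on a few hypothetical rows (dicts, with `None` for a missing value). This only illustrates the per-group logic and is not the Spark code used in this notebook; the rows and values are made up, and the column means are assumed to be taken over non-cancelled flights only.

```python
from statistics import mean

def fill_missing(rows, numeric_cols):
    """Apply the three null-handling rules to a list of row dicts (None = missing)."""
    # Column means are computed over non-cancelled flights that have a value.
    means = {}
    for c in numeric_cols:
        values = [r[c] for r in rows if r["CANCELLED"] == 0 and r[c] is not None]
        means[c] = mean(values) if values else 0.0
    filled = []
    for r in rows:
        r = dict(r)  # copy, so the input rows are left unchanged
        cancelled = r["CANCELLED"] == 1
        # Rule 1: non-cancelled flights get an explicit cancellation code.
        if not cancelled and r["CANCELLATION_CODE"] is None:
            r["CANCELLATION_CODE"] = "NOT CANCELLED"
        for c in numeric_cols:
            if r[c] is None:
                # Rule 2 (column mean) for non-cancelled flights, rule 3 (zero) for cancelled ones.
                r[c] = 0.0 if cancelled else means[c]
        filled.append(r)
    return filled

rows = [
    {"CANCELLED": 0, "CANCELLATION_CODE": None, "DEP_DELAY": 10.0},
    {"CANCELLED": 0, "CANCELLATION_CODE": None, "DEP_DELAY": None},
    {"CANCELLED": 0, "CANCELLATION_CODE": None, "DEP_DELAY": 20.0},
    {"CANCELLED": 1, "CANCELLATION_CODE": "B", "DEP_DELAY": None},
]
result = fill_missing(rows, ["DEP_DELAY"])
print([(r["CANCELLATION_CODE"], r["DEP_DELAY"]) for r in result])
```

In PySpark, the same per-group choice could be written per column with `when(...).otherwise(...)` from `pyspark.sql.functions`.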

In [None]:
# how many cancelled flights in dataset
df.where(df.CANCELLED == 1).count()

4448

In [None]:
# how many not cancelled flights in dataset
df.where(df.CANCELLED == 0).count()

238982

In [None]:
df.filter("CANCELLATION_CODE is NULL").show(5) 

+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+
|   FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|DEST|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|DISTANCE|CARRIER_DELAY|WEATHER_DELAY|NAS_DELAY|SECURITY_DELAY|LATE_AIRCRAFT_DELAY|
+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+
|2009-01-01|        XE|             1

In [None]:
# creating a separate df for cancellations (needed later to understand the distribution of cancellation codes)
# isNotNull() tests for missing values explicitly, instead of comparing with the string 'null'
df_cancellations = df.where(df.CANCELLATION_CODE.isNotNull())

In [None]:
# list the distinct cancellation codes among cancelled flights
df_cancellations.where(df_cancellations.CANCELLED == 1.0).select('CANCELLATION_CODE').distinct().collect()

[Row(CANCELLATION_CODE='B'),
 Row(CANCELLATION_CODE='C'),
 Row(CANCELLATION_CODE='A')]

In [None]:
df_cancellations.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4448 entries, 0 to 4447
Data columns (total 27 columns):
 #   Column               Non-Null Count  Dtype  
---  ------               --------------  -----  
 0   FL_DATE              4448 non-null   object 
 1   OP_CARRIER           4448 non-null   object 
 2   OP_CARRIER_FL_NUM    4448 non-null   int32  
 3   ORIGIN               4448 non-null   object 
 4   DEST                 4448 non-null   object 
 5   CRS_DEP_TIME         4448 non-null   float64
 6   DEP_TIME             160 non-null    float64
 7   DEP_DELAY            159 non-null    float64
 8   TAXI_OUT             52 non-null     float64
 9   WHEELS_OFF           52 non-null     float64
 10  WHEELS_ON            0 non-null      float64
 11  TAXI_IN              0 non-null      float64
 12  CRS_ARR_TIME         4448 non-null   float64
 13  ARR_TIME             0 non-null      float64
 14  ARR_DELAY            0 non-null      float64
 15  CANCELLED            4448 non-null   f

In [None]:
# drop columns that won't be needed for predictions (columns that have only nulls as values for cancelled flights)
df = df.drop('WHEELS_ON', 'TAXI_IN', 'ARR_TIME', 'ARR_DELAY', 'ACTUAL_ELAPSED_TIME', 'AIR_TIME', 'CARRIER_DELAY', 'WEATHER_DELAY', 'NAS_DELAY', 'SECURITY_DELAY', 'LATE_AIRCRAFT_DELAY')

In [None]:
df.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 243440 entries, 0 to 243439
Data columns (total 16 columns):
 #   Column             Non-Null Count   Dtype  
---  ------             --------------   -----  
 0   FL_DATE            243440 non-null  object 
 1   OP_CARRIER         243438 non-null  object 
 2   OP_CARRIER_FL_NUM  243438 non-null  float64
 3   ORIGIN             243437 non-null  object 
 4   DEST               243437 non-null  object 
 5   CRS_DEP_TIME       243437 non-null  float64
 6   DEP_TIME           239149 non-null  float64
 7   DEP_DELAY          239132 non-null  float64
 8   TAXI_OUT           239039 non-null  float64
 9   WHEELS_OFF         239039 non-null  float64
 10  CRS_ARR_TIME       243432 non-null  float64
 11  CANCELLED          243430 non-null  float64
 12  CANCELLATION_CODE  4448 non-null    object 
 13  DIVERTED           243430 non-null  float64
 14  CRS_ELAPSED_TIME   243430 non-null  float64
 15  DISTANCE           243430 non-null  float64
dtypes:

In [None]:
df.filter("DEP_DELAY is NULL").show(5) 
# here we can see that some cancelled flights didn't have DEP_TIME, DEP_DELAY, TAXI_OUT and WHEELS_OFF marked, as the flight didn't take off
# these empty values will be replaced by 0s later

+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+------------+---------+-----------------+--------+----------------+--------+
|   FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|DEST|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|CRS_ARR_TIME|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|DISTANCE|
+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+------------+---------+-----------------+--------+----------------+--------+
|2009-01-01|        YV|             7104|   DEN| CPR|      2150.0|    null|     null|    null|      null|      2312.0|      1.0|                A|     0.0|            82.0|   230.0|
|2009-01-01|        YV|             7329|   DTW| ORD|      1857.0|    null|     null|    null|      null|      1914.0|      1.0|                A|     0.0|            77.0|   235.0|
|2009-01-01|        YV|             7065|   EWR| IAD|      1013.0|    null|     null|    n

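There are no cancelled flights without a cancellation code: the number of rows with a cancellation code (4448) equals the number of cancelled flights (4448). This means that all flights without a code were not cancelled and can be marked as 'NOT CANCELLED'.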
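The assumption "cancelled if and only if a cancellation code is present" can also be checked directly by counting the rows that break it in either direction. A plain-Python sketch on hypothetical rows; in PySpark the first count would be `df.where((df.CANCELLED == 1) & df.CANCELLATION_CODE.isNull()).count()`.

```python
def code_consistency(rows):
    """Count rows that contradict 'cancelled <=> has a cancellation code'."""
    cancelled_without_code = sum(
        1 for r in rows if r["CANCELLED"] == 1 and r["CANCELLATION_CODE"] is None
    )
    code_without_cancellation = sum(
        1 for r in rows if r["CANCELLED"] != 1 and r["CANCELLATION_CODE"] is not None
    )
    return cancelled_without_code, code_without_cancellation

rows = [
    {"CANCELLED": 1, "CANCELLATION_CODE": "A"},
    {"CANCELLED": 0, "CANCELLATION_CODE": None},
    {"CANCELLED": 1, "CANCELLATION_CODE": None},  # would violate the assumption
]
print(code_consistency(rows))  # (1, 0)
```

Both counts being zero is what justifies filling every missing code with 'NOT CANCELLED'.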

In [None]:
# replacing null values in column CANCELLATION_CODE
df = df.na.fill("NOT CANCELLED", "CANCELLATION_CODE")

In [None]:
df.select('CANCELLATION_CODE').distinct().collect()

[Row(CANCELLATION_CODE='B'),
 Row(CANCELLATION_CODE='C'),
 Row(CANCELLATION_CODE='A'),
 Row(CANCELLATION_CODE='NOT CANCELLED')]

In [None]:
df.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 243440 entries, 0 to 243439
Data columns (total 16 columns):
 #   Column             Non-Null Count   Dtype  
---  ------             --------------   -----  
 0   FL_DATE            243440 non-null  object 
 1   OP_CARRIER         243438 non-null  object 
 2   OP_CARRIER_FL_NUM  243438 non-null  float64
 3   ORIGIN             243437 non-null  object 
 4   DEST               243437 non-null  object 
 5   CRS_DEP_TIME       243437 non-null  float64
 6   DEP_TIME           239149 non-null  float64
 7   DEP_DELAY          239132 non-null  float64
 8   TAXI_OUT           239039 non-null  float64
 9   WHEELS_OFF         239039 non-null  float64
 10  CRS_ARR_TIME       243432 non-null  float64
 11  CANCELLED          243430 non-null  float64
 12  CANCELLATION_CODE  243440 non-null  object 
 13  DIVERTED           243430 non-null  float64
 14  CRS_ELAPSED_TIME   243430 non-null  float64
 15  DISTANCE           243430 non-null  float64
dtypes:

In [None]:
# replace missing numeric values with 0s
df = df.fillna(0, ['OP_CARRIER_FL_NUM', 'CRS_DEP_TIME', 'DEP_TIME', 'DEP_DELAY', 'TAXI_OUT', 'WHEELS_OFF', 'CRS_ARR_TIME', 'CANCELLED', 'DIVERTED', 'CRS_ELAPSED_TIME', 'DISTANCE'])

In [None]:
df.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 243440 entries, 0 to 243439
Data columns (total 16 columns):
 #   Column             Non-Null Count   Dtype  
---  ------             --------------   -----  
 0   FL_DATE            243440 non-null  object 
 1   OP_CARRIER         243438 non-null  object 
 2   OP_CARRIER_FL_NUM  243440 non-null  int32  
 3   ORIGIN             243437 non-null  object 
 4   DEST               243437 non-null  object 
 5   CRS_DEP_TIME       243440 non-null  float64
 6   DEP_TIME           243440 non-null  float64
 7   DEP_DELAY          243440 non-null  float64
 8   TAXI_OUT           243440 non-null  float64
 9   WHEELS_OFF         243440 non-null  float64
 10  CRS_ARR_TIME       243440 non-null  float64
 11  CANCELLED          243440 non-null  float64
 12  CANCELLATION_CODE  243440 non-null  object 
 13  DIVERTED           243440 non-null  float64
 14  CRS_ELAPSED_TIME   243440 non-null  float64
 15  DISTANCE           243440 non-null  float64
dtypes:

In [None]:
# drop the remaining rows with nulls (only 3 rows, with a missing OP_CARRIER, ORIGIN or DEST)
df = df.dropna(how='any')

In [None]:
df.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 243437 entries, 0 to 243436
Data columns (total 16 columns):
 #   Column             Non-Null Count   Dtype  
---  ------             --------------   -----  
 0   FL_DATE            243437 non-null  object 
 1   OP_CARRIER         243437 non-null  object 
 2   OP_CARRIER_FL_NUM  243437 non-null  int32  
 3   ORIGIN             243437 non-null  object 
 4   DEST               243437 non-null  object 
 5   CRS_DEP_TIME       243437 non-null  float64
 6   DEP_TIME           243437 non-null  float64
 7   DEP_DELAY          243437 non-null  float64
 8   TAXI_OUT           243437 non-null  float64
 9   WHEELS_OFF         243437 non-null  float64
 10  CRS_ARR_TIME       243437 non-null  float64
 11  CANCELLED          243437 non-null  float64
 12  CANCELLATION_CODE  243437 non-null  object 
 13  DIVERTED           243437 non-null  float64
 14  CRS_ELAPSED_TIME   243437 non-null  float64
 15  DISTANCE           243437 non-null  float64
dtypes:

In [None]:
df.where(df.CANCELLED == 1).count() # how many cancelled flights in dataset

4448

In [None]:
df.where(df.CANCELLED == 0).count() # how many not cancelled flights in dataset

238989

## Data analysis

### Find the top 10 airlines with the most flight operations from 2009 to 2015

In [None]:
# explicit imports (a star import would shadow Python built-ins such as sum and max)
from pyspark.sql.functions import col, lit, to_date, asc
# filter initial dataset, so it would contain only rows from 2009 to 2015 (both bounds inclusive)
dates = ("2009-01-01", "2015-12-31")
date_from, date_to = [to_date(lit(s)) for s in dates]
df_filtered = df.where(to_date(df.FL_DATE).between(date_from, date_to))

# selecting top 10 airlines with most rows from the filtered dataset
df_filtered.groupby('OP_CARRIER').count().sort(col("count").desc()).show(10)

+----------+-----+
|OP_CARRIER|count|
+----------+-----+
|        DL|22405|
|        WN|22301|
|        AA|18960|
|        OO|13874|
|        EV|13421|
|        MQ|11987|
|        UA|10178|
|        B6| 8751|
|        US| 7833|
|        AS| 5000|
+----------+-----+
only showing top 10 rows
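The cell above filters by date, then groups, counts and sorts. The same logic in plain Python on hypothetical `(date, carrier)` pairs, with both bounds inclusive so that flights on the first and last day of the period are kept. ISO date strings (`YYYY-MM-DD`) compare correctly as text; in PySpark, `Column.between` performs the same inclusive check.

```python
from collections import Counter

def top_carriers(flights, date_from, date_to, n=10):
    """Return the n carriers with the most flights dated within [date_from, date_to]."""
    # ISO-8601 date strings sort lexicographically in date order.
    counts = Counter(
        carrier for date, carrier in flights if date_from <= date <= date_to
    )
    return counts.most_common(n)

flights = [
    ("2009-01-01", "XE"), ("2009-01-01", "DL"), ("2012-06-15", "DL"),
    ("2013-02-02", "DL"), ("2014-03-03", "WN"), ("2015-12-31", "WN"),
    ("2016-01-01", "WN"), ("2016-01-02", "WN"),
]
print(top_carriers(flights, "2009-01-01", "2015-12-31", n=2))
```

With strict `>`/`<` comparisons, the flights on 2009-01-01 and 2015-12-31 would be silently dropped.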



### Visualize the proportion of flight cancellation reasons from 2009 to 2015

In [None]:
from pyspark.sql.functions import year, to_date
# using the dataframe with cancelled flights that was saved in the data preprocessing step (both bounds inclusive)
df_cancellations_filtered = df_cancellations.where(to_date(df_cancellations.FL_DATE).between(date_from, date_to))
df_cancellations_filtered = df_cancellations_filtered.groupby(year('FL_DATE'), 'CANCELLATION_CODE').count()
df_cancellations_filtered.sort(asc('year(FL_DATE)')).show()
# A - Airline/Carrier
# B - Weather
# C - National Air System
# D - Security

+-------------+-----------------+-----+
|year(FL_DATE)|CANCELLATION_CODE|count|
+-------------+-----------------+-----+
|         2009|                A|   56|
|         2009|                C|    6|
|         2009|                B|   11|
|         2010|                C|   13|
|         2010|                B|   82|
|         2010|                A|  311|
|         2011|                B|   83|
|         2011|                A|  217|
|         2011|                C|   30|
|         2012|                C|    1|
|         2012|                A|  199|
|         2012|                B|   78|
|         2013|                A|  142|
|         2013|                C|   25|
|         2013|                B|   14|
|         2014|                A|  383|
|         2014|                C|  173|
|         2014|                B| 1048|
|         2015|                B|  428|
|         2015|                C|   74|
+-------------+-----------------+-----+
only showing top 20 rows
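The table shows raw counts; a proportion is each count divided by the total. A plain-Python sketch using the 2009 rows from the table above (A = 56, B = 11, C = 6); the same division over all years would give the shares for the whole period.

```python
def proportions(counts):
    """Map each cancellation code to its share of the total count."""
    total = sum(counts.values())
    return {code: n / total for code, n in counts.items()}

counts_2009 = {"A": 56, "B": 11, "C": 6}  # the 2009 rows of the table above
shares = proportions(counts_2009)
for code, share in sorted(shares.items()):
    print(f"{code}: {share:.1%}")
```

For 2009 this gives about 76.7% carrier (A), 15.1% weather (B) and 8.2% National Air System (C) cancellations.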



## Model prediction

### StringIndexer & OneHotEncoder

In [None]:
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer
from pyspark.ml import Pipeline

In [None]:
CATE_FEATURES = ['FL_DATE','OP_CARRIER', 'ORIGIN', 'DEST']
stages = []
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
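A plain-Python sketch of what each StringIndexer/OneHotEncoder pair computes, assuming Spark's defaults: StringIndexer orders values by descending frequency (`stringOrderType="frequencyDesc"`; the sketch breaks ties alphabetically), and OneHotEncoder uses `dropLast=True`, so the last index becomes an all-zero vector. The carrier values are hypothetical, and Spark produces sparse vectors rather than lists.

```python
from collections import Counter

def string_index(values):
    """Map each distinct value to an index, most frequent value first."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: i for i, v in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot vector; with drop_last, the last index maps to all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    return [1.0 if i == index else 0.0 for i in range(size)]

carriers = ["DL", "WN", "DL", "AA", "DL", "WN"]
mapping = string_index(carriers)
print(mapping)
for c in ["DL", "WN", "AA"]:
    print(c, one_hot(mapping[c], len(mapping)))
```

By the same frequency ordering, the label indexer maps the far more frequent `CANCELLED = 0.0` to label 0 and `1.0` to label 1.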

In [None]:
# index the target column: CANCELLED (0.0/1.0) becomes the numeric "label" column
label_stringIdx = StringIndexer(inputCol="CANCELLED", outputCol="label")
stages += [label_stringIdx]

### VectorAssembler

In [None]:
# Transform all features into a vector using VectorAssembler
numericCols = ["OP_CARRIER_FL_NUM", "CRS_DEP_TIME", "DEP_TIME", "DEP_DELAY", "TAXI_OUT", "WHEELS_OFF", "CRS_ARR_TIME", "DIVERTED", "CRS_ELAPSED_TIME", "DISTANCE"]
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
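VectorAssembler concatenates its input columns, in the order given in `inputCols`, into one feature vector: a one-hot column contributes several slots and a numeric column contributes one. A plain-Python sketch with hypothetical values for two one-hot columns and two numeric columns:

```python
def assemble(row, input_cols):
    """Concatenate list-valued (one-hot) and scalar (numeric) columns into one flat list."""
    features = []
    for c in input_cols:
        value = row[c]
        if isinstance(value, list):
            features.extend(value)         # e.g. a one-hot encoded column
        else:
            features.append(float(value))  # a numeric column
    return features

row = {
    "OP_CARRIERclassVec": [1.0, 0.0],
    "ORIGINclassVec": [0.0, 0.0, 1.0],
    "DEP_DELAY": 12,
    "DISTANCE": 230.0,
}
cols = ["OP_CARRIERclassVec", "ORIGINclassVec", "DEP_DELAY", "DISTANCE"]
print(assemble(row, cols))
```

Because FL_DATE is one-hot encoded as well, the assembled vector gets one slot per distinct flight date (minus the dropped last one), which makes it wide and sparse.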

In [None]:
# Creating a pipeline
pipeline = Pipeline().setStages(stages)
pipelineModel = pipeline.fit(df)
PreparedData = pipelineModel.transform(df)

In [None]:
PreparedData.show(n=1, truncate=False, vertical=True)

-RECORD 0-----------------------------------------------------------------------------------------------------------------------------------------------
 FL_DATE            | 2009-01-01                                                                                                                        
 OP_CARRIER         | XE                                                                                                                                
 OP_CARRIER_FL_NUM  | 1204                                                                                                                              
 ORIGIN             | DCA                                                                                                                               
 DEST               | EWR                                                                                                                               
 CRS_DEP_TIME       | 1100.0                                                      

### Splitting the data into training and test sets (70/30 ratio)

In [None]:
(trainingData, testData) = PreparedData.randomSplit([0.7, 0.3], seed=100)
print(trainingData.count())
print(testData.count())

170287
73150
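`randomSplit([0.7, 0.3], seed=100)` assigns rows to splits at random, so the sizes are only approximately 70/30 (here 170287 / 243437 ≈ 0.6995 for training). A plain-Python sketch of per-row random assignment with a fixed seed; Spark's own implementation samples per partition, so this does not reproduce its exact row assignment.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative boundaries in [0, 1], e.g. [0.7, 1.0] for weights [0.7, 0.3].
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            # The last split also catches u values lost to floating-point rounding.
            if u < b or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(range(100_000), [0.7, 0.3], seed=100)
print(len(train), len(test), round(len(train) / 100_000, 3))
```

Fixing the seed makes the split reproducible between runs, which is why the notebook passes `seed=100`.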


### Logistic regression

In [None]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.1)
lr_model = lr.fit(trainingData)
pred = lr_model.transform(testData)
pred.show(3)

+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+------------+---------+-----------------+--------+----------------+--------+------------+---------------+---------------+------------------+-----------+----------------+---------+-----------------+-----+--------------------+--------------------+--------------------+----------+
|   FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|DEST|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|CRS_ARR_TIME|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|DISTANCE|FL_DATEIndex|FL_DATEclassVec|OP_CARRIERIndex|OP_CARRIERclassVec|ORIGINIndex|  ORIGINclassVec|DESTIndex|     DESTclassVec|label|            features|       rawPrediction|         probability|prediction|
+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+------------+---------+-----------------+--------+----------------+--------+------------+---------------+---------------+--------

In [None]:
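from sklearn.metrics import confusion_matrix
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# metricName="f1" is the evaluator's default: it reports the weighted F1 score, not accuracy
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")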
acc = evaluator.evaluate(pred)

print("Prediction Accuracy: ", acc)
 
y_pred=pred.select("prediction").collect()
y_orig=pred.select("label").collect()

cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm)

Prediction Accuracy:  0.9722333529183536
Confusion Matrix:
[[71790     0]
 [ 1359     1]]

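`sklearn.metrics.confusion_matrix` puts true labels on the rows and predicted labels on the columns, in sorted label order. So `[[71790, 0], [1359, 1]]` means that 1,359 of the 1,360 cancelled test flights were predicted as not cancelled. A plain-Python sketch of the same layout for binary labels:

```python
def binary_confusion_matrix(y_true, y_pred):
    """Rows are true labels (0, 1); columns are predicted labels (0, 1)."""
    cm = [[0, 0], [0, 0]]
    for t, p in zip(y_true, y_pred):
        cm[int(t)][int(p)] += 1
    return cm

y_true = [0.0, 0.0, 0.0, 1.0, 1.0]
y_pred = [0.0, 0.0, 1.0, 0.0, 1.0]
print(binary_confusion_matrix(y_true, y_pred))  # [[2, 1], [1, 1]]
```

In Spark, `pred.groupBy("label", "prediction").count()` yields the same four counts without collecting every prediction to the driver.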

### Decision tree classifier

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

dt = DecisionTreeClassifier(featuresCol="features", labelCol="label")
dt_model = dt.fit(trainingData)
pred = dt_model.transform(testData)
pred.show(3)

+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+------------+---------+-----------------+--------+----------------+--------+------------+---------------+---------------+------------------+-----------+----------------+---------+-----------------+-----+--------------------+---------------+--------------------+----------+
|   FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|DEST|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|CRS_ARR_TIME|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|DISTANCE|FL_DATEIndex|FL_DATEclassVec|OP_CARRIERIndex|OP_CARRIERclassVec|ORIGINIndex|  ORIGINclassVec|DESTIndex|     DESTclassVec|label|            features|  rawPrediction|         probability|prediction|
+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+------------+---------+-----------------+--------+----------------+--------+------------+---------------+---------------+------------------

In [None]:
acc = evaluator.evaluate(pred)

print("Prediction Accuracy: ", acc)
 
y_pred=pred.select("prediction").collect()
y_orig=pred.select("label").collect()

cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm)

Prediction Accuracy:  0.999753215972325
Confusion Matrix:
[[71789     1]
 [   17  1343]]


### Random forest

In [None]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
rf_model = rf.fit(trainingData)
pred = rf_model.transform(testData)
pred.show(3)

+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+------------+---------+-----------------+--------+----------------+--------+------------+---------------+---------------+------------------+-----------+----------------+---------+-----------------+-----+--------------------+--------------------+--------------------+----------+
|   FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|DEST|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|CRS_ARR_TIME|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|DISTANCE|FL_DATEIndex|FL_DATEclassVec|OP_CARRIERIndex|OP_CARRIERclassVec|ORIGINIndex|  ORIGINclassVec|DESTIndex|     DESTclassVec|label|            features|       rawPrediction|         probability|prediction|
+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+------------+---------+-----------------+--------+----------------+--------+------------+---------------+---------------+--------

In [None]:
acc = evaluator.evaluate(pred)

print("Prediction Accuracy: ", acc)
 
y_pred=pred.select("prediction").collect()
y_orig=pred.select("label").collect()

cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm)

Prediction Accuracy:  0.9740456434651485
Confusion Matrix:
[[71790     0]
 [ 1304    56]]


### Gradient boosted trees

In [None]:
from pyspark.ml.classification import GBTClassifier

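gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=10)

gbt_model = gbt.fit(trainingData)
# evaluate on the held-out test set, like the other models
# (the outputs shown below were produced with trainingData, 170287 rows)
pred = gbt_model.transform(testData)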
pred.show(3)

+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+------------+---------+-----------------+--------+----------------+--------+------------+---------------+---------------+------------------+-----------+----------------+---------+----------------+-----+--------------------+--------------------+--------------------+----------+
|   FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|DEST|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|CRS_ARR_TIME|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|DISTANCE|FL_DATEIndex|FL_DATEclassVec|OP_CARRIERIndex|OP_CARRIERclassVec|ORIGINIndex|  ORIGINclassVec|DESTIndex|    DESTclassVec|label|            features|       rawPrediction|         probability|prediction|
+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+------------+---------+-----------------+--------+----------------+--------+------------+---------------+---------------+----------

In [None]:
acc = evaluator.evaluate(pred)

print("Prediction Accuracy: ", acc)
 
y_pred=pred.select("prediction").collect()
y_orig=pred.select("label").collect()

cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm)

Prediction Accuracy:  0.9997763127024091
Confusion Matrix:
[[167195      4]
 [    34   3054]]


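### Comparing the models

All four models score very high. Note that the value printed as "Prediction Accuracy" is the default metric of `MulticlassClassificationEvaluator`, the weighted F1 score, rather than plain accuracy. The high scores are largely a result of the unbalanced dataset (238,989 non-cancelled vs 4,448 cancelled flights): a model that always predicts "not cancelled" already reaches 71,790 / 73,150 ≈ 98.1% accuracy on the test set. The confusion matrices are more informative: of the 1,360 cancelled flights in the test set, logistic regression identifies 1 and random forest 56, while the decision tree identifies 1,343.
Therefore, if these results were used for making decisions, more attention should be paid to handling the unbalanced dataset to get better predictions.

By weighted F1 score, the gradient boosted trees came first, followed by the decision tree classifier, random forest and logistic regression. However, the gradient boosted trees were evaluated on the training data (their confusion matrix sums to 170,287 rows, the size of the training set), so their score is not directly comparable with the other three, which were evaluated on the test set.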
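The scores can be recomputed from the confusion matrices reported above. A plain-Python sketch, assuming the `[[TN, FP], [FN, TP]]` layout of `sklearn.metrics.confusion_matrix`: it computes accuracy, recall on cancelled flights, and the support-weighted F1 score, which is what Spark's `MulticlassClassificationEvaluator` reports with its default `metricName="f1"`. The weighted F1 values match the figures printed as "Prediction Accuracy".

```python
def metrics(cm):
    """Accuracy, recall of class 1 (cancelled) and weighted F1 from [[TN, FP], [FN, TP]]."""
    (tn, fp), (fn, tp) = cm
    total = tn + fp + fn + tp
    accuracy = (tn + tp) / total
    recall_cancelled = tp / (tp + fn)

    def f1(tp_c, fp_c, fn_c):
        # F1 = 2TP / (2TP + FP + FN); 0 when the class is never present or predicted.
        denom = 2 * tp_c + fp_c + fn_c
        return 2 * tp_c / denom if denom else 0.0

    # For class 0 the roles swap: its true positives are TN, its false positives are FN.
    f1_not_cancelled = f1(tn, fn, fp)
    f1_cancelled = f1(tp, fp, fn)
    support0, support1 = tn + fp, fn + tp
    weighted_f1 = (support0 * f1_not_cancelled + support1 * f1_cancelled) / total
    return accuracy, recall_cancelled, weighted_f1

results = {
    "Logistic regression (test)": [[71790, 0], [1359, 1]],
    "Decision tree (test)": [[71789, 1], [17, 1343]],
    "Random forest (test)": [[71790, 0], [1304, 56]],
    "Gradient boosted trees (training data)": [[167195, 4], [34, 3054]],
}
for name, cm in results.items():
    acc, rec, wf1 = metrics(cm)
    print(f"{name}: accuracy={acc:.4f}, recall(cancelled)={rec:.4f}, weighted F1={wf1:.4f}")

# Baseline that always predicts "not cancelled" on the test set
print("Majority-class accuracy:", 71790 / (71790 + 1360))
```

The recall column separates the models far more clearly than either accuracy or weighted F1.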