<CENTER>

#  Prediction of Airline Delay and Cancellation Data from 2014 - 2022 using machine leaning models

</CENTER>

## 1. Imports and Initialization

In [29]:
import os

# Define the path to the directory containing your datasets
dataset_path = "/content/drive/MyDrive/Airline Delay and Cancellation Data from 2014 to  2022"

# Use os.listdir() to get a list of files and directories
datasets = os.listdir(dataset_path)

data = sorted(datasets)

# Print the list of datasets
print(data)

['2014.csv', '2015.csv', '2016.csv', '2017.csv', '2018.csv', '2019.csv', '2021.csv', '2022.csv']


In [30]:
pip install pyspark



In [31]:
from pyspark import SparkContext, SparkConf


from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier

from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

import warnings
warnings.filterwarnings('ignore')

In [32]:
# initialize sparkSession
#setting the log level to control the verbosity of the logging output.
spark = SparkSession.builder.config("spark.executor.memory","2g").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

In [33]:
#removes all cached data for all tables in the catalog.
spark.catalog.clearCache()

## 2. Loading and Cleaning the Data

In [34]:
file_names_range = list(range(2014, 2019)) + list(range(2021, 2022))
file_paths = [f'/content/drive/MyDrive/Airline Delay and Cancellation Data from 2014 to  2022' for year in file_names_range]




"""
So, the file_paths list will contain paths like:

'/content/drive/MyDrive/Airline Delay and Cancellation Data from 2014 to 2022/2014'
'/content/drive/MyDrive/Airline Delay and Cancellation Data from 2014 to 2022/2015'
...
'/content/drive/MyDrive/Airline Delay and Cancellation Data from 2014 to 2022/2021'
"""

"\nSo, the file_paths list will contain paths like:\n\n'/content/drive/MyDrive/Airline Delay and Cancellation Data from 2014 to 2022/2014'\n'/content/drive/MyDrive/Airline Delay and Cancellation Data from 2014 to 2022/2015'\n...\n'/content/drive/MyDrive/Airline Delay and Cancellation Data from 2014 to 2022/2021'\n"

In [35]:
schema = T.StructType([
    T.StructField("FL_DATE", T.TimestampType(), nullable=True),
    T.StructField("OP_CARRIER", T.StringType(), nullable=True),
    T.StructField("OP_CARRIER_FL_NUM", T.IntegerType(), nullable=True),
    T.StructField("ORIGIN", T.StringType(), nullable=True),
    T.StructField("DEST", T.StringType(), nullable=True),
    T.StructField("CRS_DEP_TIME", T.DoubleType(), nullable=True),
    T.StructField("DEP_TIME", T.DoubleType(), nullable=True),
    T.StructField("DEP_DELAY", T.DoubleType(), nullable=True),
    T.StructField("TAXI_OUT", T.DoubleType(), nullable=True),
    T.StructField("WHEELS_OFF", T.DoubleType(), nullable=True),
    T.StructField("WHEELS_ON", T.DoubleType(), nullable=True),
    T.StructField("TAXI_IN", T.DoubleType(), nullable=True),
    T.StructField("CRS_ARR_TIME", T.DoubleType(), nullable=True),
    T.StructField("ARR_TIME",T.DoubleType(), nullable=True),
    T.StructField("ARR_DELAY", T.DoubleType(), nullable=True),
    T.StructField("CANCELLED", T.DoubleType(), nullable=True),
    T.StructField("CANCELLATION_CODE", T.StringType(), nullable=True),
    T.StructField("DIVERTED", T.DoubleType(), nullable=True),
    T.StructField("CRS_ELAPSED_TIME", T.DoubleType(), nullable=True),
    T.StructField("ACTUAL_ELAPSED_TIME", T.DoubleType(), nullable=True),
    T.StructField("AIR_TIME", T.DoubleType(), nullable=True),
    T.StructField("DISTANCE", T.DoubleType(), nullable=True),
    T.StructField("CARRIER_DELAY", T.DoubleType(), nullable=True),
    T.StructField("WEATHER_DELAY", T.DoubleType(), nullable=True),
    T.StructField("NAS_DELAY", T.DoubleType(), nullable=True),
    T.StructField("SECURITY_DELAY", T.DoubleType(), nullable=True),
    T.StructField("LATE_AIRCRAFT_DELAY", T.DoubleType(), nullable=True),
    T.StructField("Unnamed: 27", T.StringType(), nullable=True)
])

In [36]:
df = spark.read.schema(schema).format("csv").option("header", "true").load(file_paths)

In [37]:
# remove null values from the cols used for classification:
df = df.dropna(subset= [
    'FL_DATE',
 'OP_CARRIER',
 'OP_CARRIER_FL_NUM',
 'ORIGIN',
 'DEST',
 'CRS_DEP_TIME',
 'CRS_ARR_TIME',
 'CANCELLED',
 'DIVERTED',
 'CRS_ELAPSED_TIME',
 'DISTANCE'])

# save df for analysis
analysis_df = df

In [38]:
# drop the cols who indirectly indicate if a flight is canelled or not (apart from the column CANCELLED)
# most of those cols contain null values, if the flight is cancelled

classify_df = df.drop("Unnamed: 27",
                        "CARRIER_DELAY",
                        "WEATHER_DELAY",
                        "NAS_DELAY",
                        "SECURITY_DELAY",
                        "LATE_AIRCRAFT_DELAY",
                        "CANCELLATION_CODE",
                        "DEP_TIME",
                        "DEP_DELAY",
                        "TAXI_OUT",
                        "WHEELS_OFF",
                        "WHEELS_ON",
                        "TAXI_IN",
                        "ARR_TIME",
                        "ARR_DELAY",
                        "ACTUAL_ELAPSED_TIME",
                        "AIR_TIME")

In [39]:
# numerical timestamp column
classify_df = classify_df.withColumn("FL_DATE", F.unix_timestamp("FL_DATE"))


"""
Before Conversion:
+-------------------+
|         event_time|
+-------------------+
|2023-01-01 12:30:45|
+-------------------+

After Conversion:
+-------------------+------------------+
|         event_time|   event_time_unix|
+-------------------+------------------+
|2023-01-01 12:30:45|1641079845.0      |
+-------------------+------------------+
"""

'\nBefore Conversion:\n+-------------------+\n|         event_time|\n+-------------------+\n|2023-01-01 12:30:45|\n+-------------------+\n\nAfter Conversion:\n+-------------------+------------------+\n|         event_time|   event_time_unix|\n+-------------------+------------------+\n|2023-01-01 12:30:45|1641079845.0      |\n+-------------------+------------------+\n'

In [40]:
classify_df.columns

['FL_DATE',
 'OP_CARRIER',
 'OP_CARRIER_FL_NUM',
 'ORIGIN',
 'DEST',
 'CRS_DEP_TIME',
 'CRS_ARR_TIME',
 'CANCELLED',
 'DIVERTED',
 'CRS_ELAPSED_TIME',
 'DISTANCE']

In [41]:
# Take a subset: either balanced (with subsampling) or unbalanced
# we take a subset, because of memory limitations

# select subsample of positive samples
pos_df = classify_df.filter(F.col('CANCELLED').isin(1)).sample(fraction=0.1)
# select an equal amount of negative samples (number of neg samples == number of pos samples)
neg_df = classify_df.filter(F.col('CANCELLED').isin(0)).orderBy(F.rand()).limit(pos_df.count())


# balanced df - a subset - around 141k
classify_df = pos_df.union(neg_df).sample(fraction=1.0).cache()

# unbalanced df - but a subset - around 215k
#classify_df = classify_df.sample(fraction=0.005).cache()

## 3. Analysis (on the analysis_df)

In [42]:
# get the most present flight carriers
carriers_flight_count_df = analysis_df.groupBy(F.col('OP_CARRIER')).count().orderBy(F.col('count').desc())
top_10 = carriers_flight_count_df.limit(10).toPandas()
top_10 = top_10.rename(columns={'OP_CARRIER':'Carrier'})
top_10

Unnamed: 0,Carrier,count
0,WN,59018034
1,DL,40070868
2,AA,33522552
3,OO,30683742
4,EV,24626412
5,UA,24626208
6,MQ,14054196
7,B6,12522108
8,US,11021874
9,AS,8313600


In [43]:
import altair as alt

# visualisation
chart = alt.Chart(top_10).mark_arc(outerRadius=260, innerRadius=75).encode(
    theta = alt.Theta(field="count", type="quantitative", stack=True),
    color = alt.Color('Carrier:N', scale=alt.Scale(scheme='category20'), legend=None),
).properties(
    title='Top 10 Carriers by amount of flights',
    width=600,
    height=300
)

pie = chart.mark_arc(outerRadius=350)
value_text = pie.mark_text(radius=300, size=15).encode(text=alt.Text('count:Q'))

pie2 = chart.mark_arc(outerRadius=250)
text = pie2.mark_text(radius=200, size=15).encode(
    text=alt.Text('Carrier:N'),
    color=alt.value("#000000")
)

(chart + text + value_text).configure_view(
    strokeWidth=0
).configure_title(
    fontSize=18
)

In [44]:
# count number of cancellations per code/reason
#This filters out rows where the 'CANCELLATION_CODE' column is not null. This is done to exclude flights that were not canceled.
carriers_flight_count_df = analysis_df.filter(F.col('CANCELLATION_CODE').isNotNull()).groupBy(F.col('CANCELLATION_CODE')).count()
cancellation_reasons = carriers_flight_count_df.toPandas()
cancellation_reasons

Unnamed: 0,CANCELLATION_CODE,count
0,B,2299086
1,D,3894
2,C,907284
3,A,1425474


In [45]:
# rename col values
cancellation_reasons['CANCELLATION_CODE'][cancellation_reasons['CANCELLATION_CODE'] == 'A'] = 'By carrier'
cancellation_reasons['CANCELLATION_CODE'][cancellation_reasons['CANCELLATION_CODE'] == 'B'] = 'Due to weather'
cancellation_reasons['CANCELLATION_CODE'][cancellation_reasons['CANCELLATION_CODE'] == 'C'] = 'By national air system'
cancellation_reasons['CANCELLATION_CODE'][cancellation_reasons['CANCELLATION_CODE'] == 'D'] = 'For security'
cancellation_reasons = cancellation_reasons.rename(columns={'CANCELLATION_CODE':'Reason'})

In [46]:
cancellation_reasons

Unnamed: 0,Reason,count
0,Due to weather,2299086
1,For security,3894
2,By national air system,907284
3,By carrier,1425474


In [47]:
# visualisation of calcellation reasons
chart = alt.Chart(cancellation_reasons).mark_arc(outerRadius=180, innerRadius=50).encode(
    theta = alt.Theta(field="count", type="quantitative", stack=True),
    color = alt.Color('Reason:N', scale=alt.Scale(scheme='category20'), legend=None),
).properties(
    title='Reasons for flight cancellations',
    width=600,
    height=300
)

pie = chart.mark_arc(outerRadius=250)
value_text = pie.mark_text(radius=220, size=15).encode(text=alt.Text('count:Q'))

pie2 = chart.mark_arc(outerRadius=150)
text = pie2.mark_text(radius=120, size=12).encode(
    text=alt.Text('Reason:N'),
    color=alt.value("#000000")
)

(chart + text + value_text).configure_view(
    strokeWidth=0
).configure_title(
    fontSize=18
)

# **4. Preprocessing**

In [48]:
# define StringIndexer: categorical (string) cols -> to column indices,
# each category gets a integer based on their frequency (start from 0)

carrier_indexer = StringIndexer(inputCol="OP_CARRIER", outputCol="OP_CARRIER_Index")
origin_indexer = StringIndexer(inputCol="ORIGIN", outputCol="ORIGIN_Index")
dest_indexer = StringIndexer(inputCol="DEST", outputCol="DEST_Index")

In [49]:
# define onehotencoder for a index columns
onehotencoder_carrier_vector = OneHotEncoder(inputCol="OP_CARRIER_Index", outputCol="OP_CARRIER_vec")
onehotencoder_origin_vector = OneHotEncoder(inputCol="ORIGIN_Index", outputCol="ORIGIN_vec")
onehotencoder_dest_vector = OneHotEncoder(inputCol="DEST_Index", outputCol="DEST_vec")

In [51]:
# Pipelining the preprocessing stages defined above
pipeline = Pipeline(stages=[carrier_indexer, origin_indexer, dest_indexer,
                            onehotencoder_carrier_vector, onehotencoder_origin_vector,
                            onehotencoder_dest_vector])

transformed_df = pipeline.fit(classify_df).transform(classify_df)

In [55]:
# select columns that are combined to one feature column
feature_columns = transformed_df.columns

# remove cols that whould not be in our feature cols (label col, intermediate preprocessing cols)
for item in ["CANCELLED", "ORIGIN", "DEST", "OP_CARRIER", "OP_CARRIER_Index", "ORIGIN_Index", "DEST_Index"]:
    feature_columns.remove(item)


assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# build feature col
assembled_df = assembler.transform(transformed_df)



In [56]:
# select only feature and label column
final_classify_df = assembled_df.select("features", F.col("CANCELLED").alias("label"))

In [57]:
final_classify_df.printSchema()


root
 |-- features: vector (nullable = true)
 |-- label: double (nullable = true)



In [58]:
train, test = final_classify_df.randomSplit([.7, .3], seed=9) # 70, 30 split on balanced set or on subset of samples

In [59]:
#spark.catalog.clearCache()
# caching data into memory - models run quicker
train = train.repartition(32).cache()
test = test.repartition(32).cache()

# **5. Training Models (on balanced and unbalanced data)**

In [60]:
# define the models
log_regress = LogisticRegression(labelCol = 'label', featuresCol = 'features')
decision_tree = DecisionTreeClassifier(labelCol = 'label', featuresCol = 'features')
rand_forest = RandomForestClassifier(labelCol = 'label', featuresCol = 'features')
gbt = GBTClassifier(labelCol = 'label', featuresCol = 'features')

In [61]:
log_regress_model = log_regress.fit(train)

In [62]:
decision_tree_model = decision_tree.fit(train)

In [63]:
rand_forest_model = rand_forest.fit(train)

In [64]:
gbt_model = gbt.fit(train)

# **6. Evaluation**

In [65]:
# Predications on test set
log_regress_predictions = log_regress_model.transform(test)
decision_tree_predictions = decision_tree_model.transform(test)
rand_forest_predictions = rand_forest_model.transform(test)
gbt_predictions = gbt_model.transform(test)


In [66]:
# Define metrics to evaluate the models
# ROC = areaUnderROC = Area under the Receiver Operating Characteristic (ROC) curve,
# A curve that plots the TPR against the FPR.
# The area under the ROC curve represents the probability that the model correctly ranks a randomly chosen positive instance higher than a randomly chosen negative instance.
# A higher value of areaUnderROC indicates better model performance, with 1.0 being the maximum achievable value.
evaluator_ROC = BinaryClassificationEvaluator(labelCol='label', metricName='areaUnderROC')

# PR = areaUnderPR = Area Under the Precision-Recall curve
# A curve that plots the precision (positive predictive value) against the recall (sensitivity).
# The area under the precision-recall curve represents the trade-off between precision and recall.
# A higher value of areaUnderPR indicates better model performance, with 1.0 being the maximum achievable value.
evaluator_PR = BinaryClassificationEvaluator(labelCol='label', metricName='areaUnderPR')

# Accuracy
# in pyspark accuracy metrics is for multiclass-classification
evaluator_Acc = MulticlassClassificationEvaluator(labelCol='label', metricName='accuracy')


In [67]:
# set evaluations

log_regress_ROC = evaluator_ROC.evaluate(log_regress_predictions)
decision_tree_ROC = evaluator_ROC.evaluate(decision_tree_predictions)
rand_forest_ROC = evaluator_ROC.evaluate(rand_forest_predictions)
gbt_ROC = evaluator_ROC.evaluate(gbt_predictions)

log_regress_PR = evaluator_PR.evaluate(log_regress_predictions)
decision_tree_PR = evaluator_PR.evaluate(decision_tree_predictions)
rand_forest_PR = evaluator_PR.evaluate(rand_forest_predictions)
gbt_PR = evaluator_PR.evaluate(gbt_predictions)

log_regress_Acc = evaluator_Acc.evaluate(log_regress_predictions)
decision_tree_Acc = evaluator_Acc.evaluate(decision_tree_predictions)
rand_forest_Acc = evaluator_Acc.evaluate(rand_forest_predictions)
gbt_Acc = evaluator_Acc.evaluate(gbt_predictions)

In [68]:
# Print the metrics of each model - unbalanced dataset
print('Metric esults:')
print('Area under Receiver Operating Characteristic curve:')
print("Logistic Regression ROC: {:.4f}".format(log_regress_ROC))
print("Decision Tree ROC: {:.4f}".format(decision_tree_ROC))
print("Random Forest ROC: {:.4f}".format(rand_forest_ROC))
print("Gradient Boosted Trees ROC: {:.4f}".format(gbt_ROC))

print('Area under Precision Recall curve:')
print("Logistic Regression PR: {:.4f}".format(log_regress_PR))
print("Decision Tree PR: {:.4f}".format(decision_tree_PR))
print("Random Forest PR: {:.4f}".format(rand_forest_PR))
print("Gradient Boosted Trees PR: {:.4f}".format(gbt_PR))

print('Accuracy:')
print("Logistic Regression PR: {:.4f}".format(log_regress_Acc))
print("Decision Tree PR: {:.4f}".format(decision_tree_Acc))
print("Random Forest PR: {:.4f}".format(rand_forest_Acc))
print("Gradient Boosted Trees PR: {:.4f}".format(gbt_Acc))

Metric esults:
Area under Receiver Operating Characteristic curve:
Logistic Regression ROC: 0.7100
Decision Tree ROC: 0.4639
Random Forest ROC: 0.6687
Gradient Boosted Trees ROC: 0.7181
Area under Precision Recall curve:
Logistic Regression PR: 0.6940
Decision Tree PR: 0.4655
Random Forest PR: 0.6629
Gradient Boosted Trees PR: 0.7074
Accuracy:
Logistic Regression PR: 0.6536
Decision Tree PR: 0.6265
Random Forest PR: 0.6220
Gradient Boosted Trees PR: 0.6523
