# Flight Delay Predictive Analysis - Phase IV Notebook
### 261 Final Project
#### Team 4-1: Austin Chen: auschen@berkeley.edu, Casey Hsiung: hsiungc@berkeley.edu (Phase Leader), Cinthya Rosales: crosales@berkeley.edu, Saurabh Narain: snarain@berkeley.edu

<div>
<img src="https://i.imgur.com/z7ll7aY.jpeg", width=400 />
</div>

#### Credit Assignment Plan

![](https://i.imgur.com/McjrqZz.png)

### Business Case
Flight delays are a significant challenge for airlines and airports, causing inconvenience to passengers and economic losses. Most importantly, flight delays decrease customer satisfaction and affects customer retention. To address this issue, we aim to predict flight delays in the next two hours using data that contains flight information and weather data.  

Since the primary customer is the consumer, we will focus on predicting departure delays two hours ahead. A delay will be defined as a 15 minute delay. By informing passengers of delays, airlines can better manage operations and also increase customer satisfaction at the same time. The project’s success will be measured by evaulating our final model’s metrics.

### Data 
The data we have used so far for EDA and modeling are the following, The Flights Data contains 6 months of flights data during the first and second quarters of 2015. It is a subset of the passenger flight's on-time performance data. It includes columns such as the flight data and distance of the flight.

The Weather Data contains 6 months of weather information during the first and second quarters of 2015. It is a subset of the weather information data from 2015-2021. It includes columns such as hourly precipitation and hourly wind speed

The Weather Stations Data contains weather station information from 2015-2021. It includes columns such as latitude, longitude, and distance to its neighbor stations. 

The airport codes refer to the IATA airport code, a three-letter code which is used in passenger reservation, ticketing and baggage-handling systems, or the ICAO airport code which is a four letter code used by ATC systems and for airports that do not have an IATA airport code. 

The OTPW dataset is a joined dataset of both the flights and weather data.

### Project Abstract

This project addresses the critical issue of flight delays, which impose substantial costs and disruptions on airlines and passengers alike. The project's primary objective is to predict flight delays accurately, thereby enabling proactive planning and enhancing travel experiences. Initially, the project leveraged historical flight data and long-term weather information to develop a robust machine-learning model for delay prediction.

In prior phases, various machine learning models, including Decision Tree, Random Forest, and K-Nearest Neighbors (KNN), were trained using the 60m OTPW dataset. These models were compared against the baseline Logistic Regression model. To enhance model performance, new features were incorporated into each model, accompanied by meticulous hyperparameter tuning. The phase also explored diverse model families, encompassing Decision Trees, Random Forests, and KNN, with the aim of identifying the most effective one. Augmenting the models with additional features further bolstered their predictive capabilities.

Following extensive experimentation and evaluation, the Random Forest classifier emerged as the top-performing model, boasting a test accuracy of 0.75, test precision of 0.72, and test recall of 0.75. This underscored its proficiency in making accurate predictions and effectively recognizing true positive cases. Building upon these achievements, the subsequent phase introduced the Multi Layer Perceptron model, which outshined all others. It achieved remarkable results with a test accuracy of 0.91, test precision of 0.93, and test recall of 0.91, thereby establishing its superiority in prediction accuracy and overall performance.

### Project Description

This notebook contains the Phase IV framework for Team 4-1's flight delay analysis. The focus of the following sections in the fourth phase is to conduct feature engineering, build more advanced models to predict flights delays, discuss experiments and results, and select our final model.

After training the models, we analyze the results of the models in terms of accuracy, precision, and recall to determine which model is most well-rounded.

![Name of the image](https://i.imgur.com/OVhObhg.png)

### Cluster Information

The below are the details of the team 4-1 cluster used for this project:

- **Cluster Size:** 1 driver node and 1 to 8 worker nodes
- **Driver Node:** Standard_DS3_V2, 14 GB Memory, 4 CPU Cores
- **Worker Nodes:** Standard_DS3_V2, 14 GB to 112 GB Memory (per node), 4 to 32 CPU Cores (per node)
- **Runtime:** Databricks Runtime 13.2.x with Scala 2.12 and CPU-optimized machine learning libraries

### Notebook Setup

In [None]:

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, MultilayerPerceptronClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler, OneHotEncoder, PCA, MinMaxScaler
from pyspark.ml.linalg import DenseVector, VectorUDT, Vectors
from pyspark.ml.stat import ChiSquareTest
from pyspark.ml.tuning import CrossValidator

from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.functions import col,max,isnan,when,count,regexp_replace,length,udf,lag,lead,round,split,rand,udf,sum,month,dayofmonth,collect_list,log,monotonically_increasing_id,row_number
from pyspark.sql.types import FloatType, StructType, StructField, StringType, IntegerType, DateType, ArrayType, DoubleType
from pyspark.sql.window import Window

from sklearn.base import clone
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.preprocessing import MinMaxScaler as skmm

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, LSTM

from xgboost.spark import SparkXGBRegressor

import mlflow.pyspark.ml

from hyperopt import fmin, tpe, hp, Trials
from imblearn.over_sampling import ADASYN
import gc
import math
import numpy as np
import pandas as pd
from statsmodels.stats.weightstats import ztest as ztest

import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import seaborn as sns



In [None]:
# Init Script
blob_container = "261container" # The name of your container created in https://portal.azure.com
storage_account = "261project" # The name of your Storage account created in https://portal.azure.com
secret_scope = "261-fp-project" # The name of the scope created in your local computer using the Databricks CLI
secret_key = "project-key" # saskey The name of the secret key created in your local computer using the Databricks CLI 
blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"
mount_path = "/mnt/mids-w261"

In [None]:
# Set up SAS token
spark.conf.set(
  f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
  dbutils.secrets.get(scope = secret_scope, key = secret_key)
)

### Data Processing

The flight prediction datasets were each converted into Parquet format before being written into the blob storage.

In [None]:
df_flights_6m = spark.read.parquet(f"{blob_url}/df_flights_6m")
df_weather_6m = spark.read.parquet(f"{blob_url}/df_weather_6m")
df_stations = spark.read.parquet(f"{blob_url}/df_stations")
df_airport_codes = spark.read.parquet(f"{blob_url}/df_airport_codes")

# df_otpw = spark.read.parquet(f"{blob_url}/OTPW_3m")
# OTPW_12m = spark.read.parquet(f"{blob_url}/OTPW_12m")
df_otpw = spark.read.parquet(f"{blob_url}/OTPW_60m")

### Data Sanitization

In our initial data cleaning phase for the combined dataset, we faced issues with duplicate entries and missing data. To ensure the uniqueness of each observation, we identified and eliminated duplicate flight records. Furthermore, we removed any rows entirely populated by null values, as they offer no substantial value to our dataset or model. Rows containing invalid characters, specifically within ID fields, were also discarded to maintain the integrity of our data. We recognized that several columns related to weather data were represented in a non-numerical fashion. Accordingly, we converted numerical data stored as strings to float types, enhancing our ability to perform subsequent data cleansing strategies.

In an attempt to refine the dataset further, we addressed the missing weather values through interpolation, selecting a three-hour offset based on initial assessments. However, this approach may warrant a more nuanced investigation in the subsequent phase of our project. Particularly for regions with temperate weather, we do not anticipate severe weather shifts within a six-hour window—three hours before and three hours after the hour of interest. Yet, for states with volatile weather patterns, such as Florida, conditions can change drastically within a single hour.

To ensure that the label was properly sanitized, imputation was also performed on certain arrival and departure delay features. This is a critical step in the machine learning process, as values such as NULLs are not accepted into modeling.

Moving forward, our approach to handling missing values will likely evolve to consider both seasonal and geographical variations. By tailoring our data cleaning process to the unique characteristics of each region and time of year, we can more accurately fill missing values and enhance the predictive power of our model

#### High-Level Cleaning

High-level cleaning dropped NULLs, duplicates, unecessary columns, and invalid characters.

In [None]:
print(f"OTPW Rows: {df_otpw.count()}")

OTPW Rows: 31673119


In [None]:
# Drop duplicate flights
df_otpw_dedupe = df_otpw.dropDuplicates(subset=["FL_DATE", "OP_UNIQUE_CARRIER", "TAIL_NUM","ORIGIN_AIRPORT_ID", "OP_CARRIER_FL_NUM", "CRS_DEP_TIME"])
print(f"df_otpw Rows: {df_otpw_dedupe.count()}")

df_otpw Rows: 31673116


In [None]:
# Initial eyeball list of columns for features and target variable
# Can cut down - this helps with data cleaning section by eliminating more unecessary rows
df_otpw_feat = df_otpw_dedupe.select('QUARTER', 'YEAR', 'MONTH', 'DAY_OF_MONTH', 'DAY_OF_WEEK', 'FL_DATE', 'OP_UNIQUE_CARRIER', 'ORIGIN_AIRPORT_ID', 'ORIGIN_CITY_MARKET_ID', 'ORIGIN_STATE_FIPS', 'DEST_AIRPORT_ID', 'DEST_CITY_MARKET_ID', 'DEST_STATE_FIPS', 'CRS_DEP_TIME', 'DEP_DELAY_GROUP', 'DEP_TIME_BLK', 'CRS_ARR_TIME', 'ARR_TIME', 'ARR_DELAY', 'ARR_DELAY_GROUP','ARR_DEL15', 'ARR_TIME_BLK', 'CANCELLED', 'CANCELLATION_CODE', 'DIVERTED', 'CRS_ELAPSED_TIME', 'ACTUAL_ELAPSED_TIME', 'AIR_TIME', 'DISTANCE', 'DISTANCE_GROUP', 'CARRIER_DELAY', 'WEATHER_DELAY', 'NAS_DELAY', 'SECURITY_DELAY', 'LATE_AIRCRAFT_DELAY', 'sched_depart_date_time', 'two_hours_prior_depart_UTC', 'dest_airport_lon', 'dest_airport_lat', 'STATION', 'DATE', 'LATITUDE', 'LONGITUDE', 'ELEVATION', 'HourlyPrecipitation', 'HourlyPresentWeatherType', 'HourlySkyConditions', 'HourlyVisibility', 'HourlyPressureChange', 'HourlyPressureTendency', 'HourlyStationPressure', 'HourlyWindDirection', 'HourlyWindGustSpeed', 'HourlyWindSpeed','DEP_TIME', 'DEP_DELAY', 'DEP_DEL15')

# List of column names
col_list = [col for col in df_otpw_feat.columns]

In [None]:
# Drop rows with only NULL values
df_otpw_dropna = df_otpw_feat.dropna(how="all")
print(f"df_otpw Rows: {df_otpw_dropna.count()}")

df_otpw Rows: 31673116


In [None]:
# Check rows w/ invalid characters
pattern = "[^a-zA-Z0-9\\s]"
invalid_list = ["OP_UNIQUE_CARRIER", "ORIGIN_AIRPORT_ID", "ORIGIN_CITY_MARKET_ID"]

# Drop rows with invalid characters (IDs) and NULL values
OP_UNIQUE_CARRIER_df = df_otpw_dropna.withColumn(f"cleaned_{invalid_list[0]}", regexp_replace(f"{invalid_list[0]}", f"{pattern}", ""))
OTPW_stage = OP_UNIQUE_CARRIER_df.filter(length(f"cleaned_{invalid_list[0]}") == length(f"{invalid_list[0]}"))

ORIGIN_AIRPORT_ID_df = OTPW_stage.withColumn(f"cleaned_{invalid_list[1]}", regexp_replace(f"{invalid_list[1]}", f"{pattern}", ""))
OTPW_stage = ORIGIN_AIRPORT_ID_df.filter(length(f"cleaned_{invalid_list[1]}") == length(f"{invalid_list[1]}"))

ORIGIN_CITY_MARKET_ID_df = OTPW_stage.withColumn(f"cleaned_{invalid_list[2]}", regexp_replace(f"{invalid_list[2]}", f"{pattern}", ""))
df_otpw = ORIGIN_CITY_MARKET_ID_df.filter(length(f"cleaned_{invalid_list[2]}") == length(f"{invalid_list[2]}"))

print(f"Cleaned OTPW Rows: {df_otpw.count()}")

Cleaned OTPW Rows: 31673116


In [None]:
# Dataframe clean up: drop staging columns and rename, round values
df_otpw = df_otpw.drop("OP_UNIQUE_CARRIER", "ORIGIN_AIRPORT_ID", "ORIGIN_CITY_MARKET_ID")

df_otpw = df_otpw.withColumnRenamed("cleaned_OP_UNIQUE_CARRIER", "OP_UNIQUE_CARRIER")
df_otpw = df_otpw.withColumnRenamed("cleaned_ORIGIN_AIRPORT_ID", "ORIGIN_AIRPORT_ID")
df_otpw = df_otpw.withColumnRenamed("cleaned_ORIGIN_CITY_MARKET_ID", "ORIGIN_CITY_MARKET_ID")

In [None]:
# Count the number of null values in each column
display(df_otpw.select([count(when(df_otpw[c].isNull(), c)).alias(c) for c in df_otpw.columns]))

QUARTER,YEAR,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,FL_DATE,ORIGIN_STATE_FIPS,DEST_AIRPORT_ID,DEST_CITY_MARKET_ID,DEST_STATE_FIPS,CRS_DEP_TIME,DEP_DELAY_GROUP,DEP_TIME_BLK,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,ARR_DELAY_GROUP,ARR_DEL15,ARR_TIME_BLK,CANCELLED,CANCELLATION_CODE,DIVERTED,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,DISTANCE,DISTANCE_GROUP,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,sched_depart_date_time,two_hours_prior_depart_UTC,dest_airport_lon,dest_airport_lat,STATION,DATE,LATITUDE,LONGITUDE,ELEVATION,HourlyPrecipitation,HourlyPresentWeatherType,HourlySkyConditions,HourlyVisibility,HourlyPressureChange,HourlyPressureTendency,HourlyStationPressure,HourlyWindDirection,HourlyWindGustSpeed,HourlyWindSpeed,DEP_TIME,DEP_DELAY,DEP_DEL15,OP_UNIQUE_CARRIER,ORIGIN_AIRPORT_ID,ORIGIN_CITY_MARKET_ID
0,0,0,0,0,0,0,0,0,0,0,475787,0,0,500367,568976,568976,568976,0,0,31184700,0,164,566378,566378,0,0,25889894,25889894,25889894,25889894,25889894,0,0,0,0,0,0,0,0,31673116,28321039,20814886,3047654,335865,20814886,82467,98260,27515586,90702,31615176,470813,475787,475787,0,0,0


#### Imputation

Missing data was addressed primarily for the weather, arrival/departure delay, and DEP_DEL15 variable.

##### Weather Data

In [None]:
# Convert columns to float
cols_conv = ['HourlyPressureChange', 'HourlyPressureTendency', 'HourlyWindDirection', 'HourlyWindDirection']

for cols in cols_conv:
    df_otpw = df_otpw.withColumn(cols, col(cols).cast('float'))

# columns dropped for now: 'HourlyPrecipitation'

In [None]:
weather_cols = ['HourlyVisibility', 'HourlyStationPressure', 'HourlyWindGustSpeed', 'HourlyWindSpeed', 'HourlyPressureChange', 'HourlyPressureTendency', 'HourlyWindDirection']

for col_name in weather_cols:
    window_spec = Window.orderBy('MONTH')
    
    lag_sum = 0
    lead_sum = 0
    
    for i in range(1, 13):
        lag_sum = lag_sum + F.lag(F.col(col_name), offset=i).over(window_spec)
        lead_sum = lead_sum + F.lead(F.col(col_name), offset=i).over(window_spec)
    
    lag_avg = lag_sum / 12
    lead_avg = lead_sum / 12

    # Define the interpolation logic for general weather columns
    interpolated_value = (F.when((lag_avg.isNotNull()) & (lead_avg.isNotNull()), (lag_avg + lead_avg) / 2.0)
                          .when(lag_avg.isNotNull(), lag_avg)
                          .when(lead_avg.isNotNull(), lead_avg)
                          .otherwise(None))

    # Custom logic for HourlyPrecipitation
    if col_name == 'HourlyPrecipitation':
        interpolated_value = (F.when((lag_avg.isNotNull()) & (lead_avg.isNotNull()), (lag_avg + lead_avg) / 2.0)
                              .otherwise(0.0))


    # Apply the interpolation logic to the DataFrame
    df_otpw = df_otpw.withColumn(col_name, F.when(F.col(col_name).isNull(), interpolated_value).otherwise(F.col(col_name)))




In [None]:
# Impute missing labels and departure & arrivals delay information
df_otpw.createOrReplaceTempView("df_otpw")

df_otpw_weath = spark.sql("""
                            SELECT 
                                o.*,
                                CASE WHEN HourlyPrecipitation IS NULL THEN AVG(HourlyPrecipitation) OVER (
                                    PARTITION BY FL_DATE,
                                                    DEP_TIME_BLK,
                                                    STATION,
                                                    LATITUDE,
                                                    LONGITUDE,
                                                    ELEVATION
                                                    ) ELSE 
                                                        HourlyPrecipitation  END AS HourlyPrecipitation_IMP,
                                CASE WHEN HourlyVisibility IS NULL THEN AVG(HourlyVisibility) OVER (
                                    PARTITION BY FL_DATE,
                                                    DEP_TIME_BLK,
                                                    STATION,
                                                    LATITUDE,
                                                    LONGITUDE,
                                                    ELEVATION
                                                    ) ELSE 
                                                        HourlyVisibility  END AS HourlyVisibility_IMP,
                                CASE WHEN HourlyPresentWeatherType IS NULL THEN AVG(HourlyPresentWeatherType) OVER (
                                    PARTITION BY FL_DATE,
                                                    DEP_TIME_BLK,
                                                    STATION,
                                                    LATITUDE,
                                                    LONGITUDE,
                                                    ELEVATION
                                                    ) ELSE 
                                                        HourlyPresentWeatherType END AS HourlyPresentWeatherType_IMP,
                                CASE WHEN HourlySkyConditions IS NULL THEN AVG(HourlySkyConditions) OVER (
                                    PARTITION BY FL_DATE,
                                                    DEP_TIME_BLK,
                                                    STATION,
                                                    LATITUDE,
                                                    LONGITUDE,
                                                    ELEVATION
                                                    ) ELSE 
                                                        HourlySkyConditions END AS HourlySkyConditions_IMP,
                                CASE WHEN HourlyPressureChange IS NULL THEN AVG(HourlyPressureChange) OVER (
                                    PARTITION BY FL_DATE,
                                                    DEP_TIME_BLK,
                                                    STATION,
                                                    LATITUDE,
                                                    LONGITUDE,
                                                    ELEVATION
                                                    ) ELSE 
                                                        HourlyPressureChange END AS HourlyPressureChange_IMP,
                                CASE WHEN HourlyPressureTendency IS NULL THEN AVG(HourlyPressureTendency) OVER (
                                    PARTITION BY FL_DATE,
                                                DEP_TIME_BLK,
                                                STATION,
                                                LATITUDE,
                                                LONGITUDE,
                                                ELEVATION
                                                ) ELSE 
                                                    HourlyPressureTendency END AS HourlyPressureTendency_IMP, 
                                CASE WHEN HourlyStationPressure IS NULL THEN AVG(HourlyStationPressure) OVER (
                                    PARTITION BY FL_DATE,
                                                    DEP_TIME_BLK,
                                                    STATION,
                                                    LATITUDE,
                                                    LONGITUDE,
                                                    ELEVATION
                                                    ) ELSE 
                                                        HourlyStationPressure END AS HourlyStationPressure_IMP,
                                CASE WHEN HourlyWindDirection IS NULL THEN AVG(HourlyWindDirection) OVER (
                                    PARTITION BY FL_DATE,
                                                    DEP_TIME_BLK,
                                                    STATION,
                                                    LATITUDE,
                                                    LONGITUDE,
                                                    ELEVATION
                                                    ) ELSE 
                                                        HourlyWindDirection END AS HourlyWindDirection_IMP,
                                CASE WHEN HourlyWindGustSpeed IS NULL THEN AVG(HourlyWindGustSpeed) OVER (
                                    PARTITION BY FL_DATE,
                                                    DEP_TIME_BLK,
                                                    STATION,
                                                    LATITUDE,
                                                    LONGITUDE,
                                                    ELEVATION
                                                    ) ELSE 
                                                        HourlyWindGustSpeed END AS HourlyWindGustSpeed_IMP,
                            CASE WHEN HourlyWindSpeed IS NULL THEN AVG(HourlyWindSpeed) OVER (
                                PARTITION BY FL_DATE,
                                                DEP_TIME_BLK,
                                                STATION,
                                                LATITUDE,
                                                LONGITUDE,
                                                ELEVATION
                                                ) ELSE 
                                                    HourlyWindSpeed END AS HourlyWindSpeed_IMP 
                            FROM df_otpw o
                          """)

In [None]:
# Impute missing labels and departure & arrivals delay information
df_otpw_weath.createOrReplaceTempView("df_otpw_weath")

df_otpw_weath = spark.sql("""
                            SELECT 
                                w.*,
                                CASE WHEN HourlyPrecipitation_IMP IS NULL THEN AVG(HourlyPrecipitation_IMP) OVER (
                                    PARTITION BY FL_DATE,
                                                STATION,
                                                LATITUDE,
                                                LONGITUDE,
                                                ELEVATION
                                                ) ELSE 
                                                    HourlyPrecipitation_IMP  END AS HourlyPrecipitation_FINAL,
                                CASE WHEN HourlyVisibility_IMP IS NULL THEN AVG(HourlyVisibility_IMP) OVER (
                                    PARTITION BY FL_DATE,
                                                STATION,
                                                LATITUDE,
                                                LONGITUDE,
                                                ELEVATION
                                                ) ELSE 
                                                    HourlyVisibility_IMP  END AS HourlyVisibility_FINAL,
                                CASE WHEN HourlyPresentWeatherType_IMP IS NULL THEN AVG(HourlyPresentWeatherType_IMP) OVER (    
                                    PARTITION BY FL_DATE,
                                                STATION,
                                                LATITUDE,
                                                LONGITUDE,
                                                ELEVATION
                                                ) ELSE 
                                                    HourlyPresentWeatherType_IMP END AS HourlyPresentWeatherType_FINAL,
                                CASE WHEN HourlySkyConditions_IMP IS NULL THEN AVG(HourlySkyConditions_IMP) OVER (
                                    PARTITION BY FL_DATE,
                                                STATION,
                                                LATITUDE,
                                                LONGITUDE,
                                                ELEVATION
                                                ) ELSE 
                                                    HourlySkyConditions_IMP END AS HourlySkyConditions_FINAL,
                                CASE WHEN HourlyPressureChange_IMP IS NULL THEN AVG(HourlyPressureChange_IMP) OVER (
                                    PARTITION BY FL_DATE,
                                                STATION,
                                                LATITUDE,
                                                LONGITUDE,
                                                ELEVATION
                                                ) ELSE 
                                                    HourlyPressureChange_IMP END AS HourlyPressureChange_FINAL,
                                CASE WHEN HourlyPressureTendency_IMP IS NULL THEN AVG(HourlyPressureTendency_IMP) OVER (        
                                    PARTITION BY FL_DATE,
                                                STATION,
                                                LATITUDE,
                                                LONGITUDE,
                                                ELEVATION
                                                ) ELSE 
                                                    HourlyPressureTendency_IMP END AS HourlyPressureTendency_FINAL, 
                                CASE WHEN HourlyStationPressure_IMP IS NULL THEN AVG(HourlyStationPressure_IMP) OVER (
                                    PARTITION BY FL_DATE,
                                                STATION,
                                                LATITUDE,
                                                LONGITUDE,
                                                ELEVATION
                                                ) ELSE 
                                                    HourlyStationPressure_IMP END AS HourlyStationPressure_FINAL,
                                CASE WHEN HourlyWindDirection_IMP IS NULL THEN AVG(HourlyWindDirection_IMP) OVER (
                                    PARTITION BY FL_DATE,
                                                STATION,
                                                LATITUDE,
                                                LONGITUDE,
                                                ELEVATION
                                                ) ELSE 
                                                    HourlyWindDirection_IMP END AS HourlyWindDirection_FINAL,
                                CASE WHEN HourlyWindGustSpeed_IMP IS NULL THEN AVG(HourlyWindGustSpeed_IMP) OVER (
                                    PARTITION BY FL_DATE,
                                                STATION,
                                                LATITUDE,
                                                LONGITUDE,
                                                ELEVATION
                                                ) ELSE 
                                                    HourlyWindGustSpeed_IMP END AS HourlyWindGustSpeed_FINAL,
                            CASE WHEN HourlyWindSpeed_IMP IS NULL THEN AVG(HourlyWindSpeed_IMP) OVER (
                                PARTITION BY FL_DATE,
                                            STATION,
                                            LATITUDE,
                                            LONGITUDE,
                                            ELEVATION
                                            ) ELSE 
                                                HourlyWindSpeed_IMP END AS HourlyWindSpeed_FINAL 
                            FROM df_otpw_weath w
                          """)

In [None]:
# Dataframe clean up: drop staging columns and rename, round values
df_otpw_weath = df_otpw_weath.drop("HourlyPrecipitation", "HourlyVisibility", "HourlyStationPressure", "HourlyPresentWeatherType", "HourlySkyConditions", "HourlyPressureChange", "HourlyPressureTendency",
                                         "HourlyWindDirection", "HourlyWindGustSpeed", "HourlyWindSpeed", 
                                         "HourlyPrecipitation_IMP", "HourlyVisibility_IMP", "HourlyVisibility_IMP", "HourlyStationPressure_IMP", "HourlyPresentWeatherType_IMP", "HourlySkyConditions_IMP", "HourlyPressureChange_IMP", "HourlyPressureTendency_IMP",
                                         "HourlyWindDirection_IMP", "HourlyWindGustSpeed_IMP", "HourlyWindSpeed_IMP")

df_otpw_weath = df_otpw_weath.withColumnRenamed("HourlyPrecipitation_FINAL","HourlyPrecipitation")
df_otpw_weath = df_otpw_weath.withColumnRenamed("HourlyVisibility_FINAL","HourlyVisibility")
df_otpw_weath = df_otpw_weath.withColumnRenamed("HourlyStationPressure_FINAL","HourlyStationPressure")
df_otpw_weath = df_otpw_weath.withColumnRenamed("HourlyPresentWeatherType_FINAL","HourlyPresentWeatherType")
df_otpw_weath = df_otpw_weath.withColumnRenamed("HourlySkyConditions_FINAL","HourlySkyConditions")
df_otpw_weath = df_otpw_weath.withColumnRenamed("HourlyPressureChange_FINAL","HourlyPressureChange")
df_otpw_weath = df_otpw_weath.withColumnRenamed("HourlyPressureTendency_FINAL","HourlyPressureTendency")
df_otpw_weath = df_otpw_weath.withColumnRenamed("HourlyWindDirection_FINAL","HourlyWindDirection")
df_otpw_weath = df_otpw_weath.withColumnRenamed("HourlyWindGustSpeed_FINAL","HourlyWindGustSpeed")
df_otpw = df_otpw_weath.withColumnRenamed("HourlyWindSpeed_FINAL","HourlyWindSpeed")

In [None]:
# Drop additional weather rows with null values
df_otpw = df_otpw.filter(df_otpw.HourlyVisibility.isNotNull())
df_otpw = df_otpw.filter(df_otpw.HourlyPressureTendency.isNotNull())
df_otpw = df_otpw.filter(df_otpw.HourlyStationPressure.isNotNull())
df_otpw = df_otpw.filter(df_otpw.HourlyWindGustSpeed.isNotNull())

##### Departures, Arrivals & Labels Data

In [None]:
# Impute missing labels and departure & arrivals delay information
df_otpw.createOrReplaceTempView("df_otpw")

df_otpw_label = spark.sql("""
                            SELECT 
                                o.*,
                                CASE WHEN ARR_DELAY IS NULL THEN AVG(ARR_DELAY) OVER (PARTITION BY FL_DATE,
                                                                                                    OP_UNIQUE_CARRIER,
                                                                                                    DEST_AIRPORT_ID,
                                                                                                    ARR_TIME_BLK) ELSE 
                                                                                                                    ARR_DELAY END AS ARR_DELAY_IMP,
                                CASE WHEN ARR_DEL15 IS NULL THEN AVG(ARR_DEL15) OVER (PARTITION BY FL_DATE,
                                                                                                    OP_UNIQUE_CARRIER,
                                                                                                    DEST_AIRPORT_ID,
                                                                                                    ARR_TIME_BLK) ELSE 
                                                                                                                    ARR_DEL15 END AS ARR_DEL15_IMP,
                                CASE WHEN DEP_DELAY IS NULL THEN AVG(DEP_DELAY) OVER (PARTITION BY FL_DATE,
                                                                                                    OP_UNIQUE_CARRIER,
                                                                                                    ORIGIN_AIRPORT_ID,
                                                                                                    DEP_TIME_BLK) ELSE 
                                                                                                                    DEP_DELAY END AS DEP_DELAY_IMP,
                                CASE WHEN DEP_DEL15 IS NULL THEN AVG(DEP_DEL15) OVER (PARTITION BY FL_DATE,
                                                                                                    OP_UNIQUE_CARRIER,
                                                                                                    ORIGIN_AIRPORT_ID,
                                                                                                    DEP_TIME_BLK) ELSE 
                                                                                                                    DEP_DEL15 END AS DEP_DEL15_IMP
                            FROM df_otpw o
                          """)

print(df_otpw_label.filter(df_otpw_label.DEP_DEL15_IMP.isNull()).count())

184252


In [None]:
df_otpw_label.createOrReplaceTempView("df_otpw_label")

df_otpw_label = spark.sql("""
                            SELECT 
                                l.*,
                                CASE WHEN ARR_DELAY_IMP IS NULL THEN AVG(ARR_DELAY) OVER (PARTITION BY FL_DATE,
                                                                                                        OP_UNIQUE_CARRIER,
                                                                                                        DEST_AIRPORT_ID,
                                                                                                        ARR_TIME_BLK) ELSE 
                                                                                                                ARR_DELAY_IMP END AS ARR_DELAY_FINAL, 
                                CASE WHEN ARR_DEL15_IMP IS NULL THEN AVG(ARR_DEL15) OVER (PARTITION BY FL_DATE,
                                                                                                        OP_UNIQUE_CARRIER,
                                                                                                        DEST_AIRPORT_ID,
                                                                                                        ARR_TIME_BLK) ELSE 
                                                                                                                ARR_DEL15_IMP END AS ARR_DEL15_FINAL, 
                                CASE WHEN DEP_DELAY_IMP IS NULL THEN AVG(DEP_DELAY) OVER (PARTITION BY DAY_OF_WEEK,
                                                                                                        OP_UNIQUE_CARRIER,
                                                                                                        ORIGIN_AIRPORT_ID,
                                                                                                        DEP_TIME_BLK) ELSE 
                                                                                                                DEP_DELAY_IMP END AS DEP_DELAY_FINAL,
                                CASE WHEN DEP_DEL15_IMP IS NULL THEN AVG(DEP_DEL15) OVER (PARTITION BY DAY_OF_WEEK,
                                                                                                        OP_UNIQUE_CARRIER,
                                                                                                        ORIGIN_AIRPORT_ID,
                                                                                                        DEP_TIME_BLK) ELSE 
                                                                                                                DEP_DEL15_IMP END AS DEP_DEL15_FINAL
                            FROM df_otpw_label l
                          """)

In [None]:
# Check and eliminate last remaining null values
print(f"Remaining null values: {df_otpw_label.filter(df_otpw_label.DEP_DEL15_FINAL.isNull()).count()}")
df_otpw_label = df_otpw_label.filter(df_otpw_label.DEP_DEL15_FINAL.isNotNull())

Remaining null values: 225


In [None]:
# Dataframe clean up: drop staging columns and rename, round values
df_otpw_label = df_otpw_label.drop("ARR_DELAY", "ARR_DEL15", "ARR_DELAY_IMP", "ARR_DEL15_IMP", 
                                         "DEP_DELAY", "DEP_DEL15", "DEP_DELAY_IMP", "DEP_DEL15_IMP")

df_otpw_label = df_otpw_label.withColumnRenamed("ARR_DEL15_FINAL","ARR_DEL15")
df_otpw_label = df_otpw_label.withColumnRenamed("ARR_DELAY_FINAL","ARR_DELAY")
df_otpw_label = df_otpw_label.withColumnRenamed("DEP_DEL15_FINAL","DEP_DEL15")
df_otpw_label = df_otpw_label.withColumnRenamed("DEP_DELAY_FINAL","DEP_DELAY")

df_otpw_label = df_otpw_label.withColumn("ARR_DEL15", round(df_otpw_label.ARR_DEL15,0))
df_otpw_label = df_otpw_label.withColumn("ARR_DELAY", round(df_otpw_label.ARR_DELAY,0))
df_otpw_label = df_otpw_label.withColumn("DEP_DEL15", round(df_otpw_label.DEP_DEL15,0))
df_otpw_label = df_otpw_label.withColumn("DEP_DELAY", round(df_otpw_label.DEP_DELAY,0))

#### Dataset Checkpoint (Data Sanitization/Imputation)

In [None]:
# Write data frame into blob storage
df_otpw_label.write.mode("overwrite").parquet(f"{blob_url}/OTPW_60m_clean")

In [None]:
# Load checkpointed dataframe
df_otpw = spark.read.parquet(f"{blob_url}/OTPW_60m_clean")

### Target Variable Creation

The purpose of the flight delay prediction was to predict whether a flight will be delayed two hours ahead of time. The OTPW dataset contained labels reflecting delay information at the existing timeframe with no futurization.

To properly capture the objective, a new target variable `TWO_HR_DELAY` was created, which was a binary variable based on the probability of a flight being delayed in two hours. The probability was calculated using a window function average of the `DEP_DEL15` labels, partitioned by date, carrier, airport, and 2-hour future time block. Future time blocks took the current time block with two hours added.

In [None]:
# Breakdown timeblocks to have more granularity
df_otpw.createOrReplaceTempView("df_otpw")

df_otpw_blk_det = spark.sql("""
                            SELECT 
                                a.*, 
                                (
                                    CASE WHEN DEP_TIME_BLK == "0001-0559" AND DEP_TIME BETWEEN 1 AND 59 THEN "0000-0059" 
                                        WHEN DEP_TIME_BLK == "0001-0559" AND DEP_TIME BETWEEN 100 AND 159 THEN "0100-0159"
                                        WHEN DEP_TIME_BLK == "0001-0559" AND DEP_TIME BETWEEN 200 AND 259 THEN "0200-0259"
                                        WHEN DEP_TIME_BLK == "0001-0559" AND DEP_TIME BETWEEN 300 AND 359 THEN "0300-0359"
                                        WHEN DEP_TIME_BLK == "0001-0559" AND DEP_TIME BETWEEN 300 AND 359 THEN "0400-0459"
                                        WHEN DEP_TIME_BLK == "0001-0559" AND DEP_TIME BETWEEN 300 AND 359 THEN "0500-0559"
                                        ELSE DEP_TIME_BLK
                                    END
                                ) AS DEP_TIME_BLK_DET
                            FROM df_otpw a
                        """)

In [None]:
# Convert timeblocks to blocks at the top of each hour
df_otpw_blk_det.createOrReplaceTempView("df_otpw_blk_det")

df_otpw_delay = spark.sql("""
                            SELECT 
                                b.*, 
                                (
                                    CASE WHEN DEP_TIME_BLK_DET == "0000-0059" THEN 0 
                                        WHEN DEP_TIME_BLK_DET == "0100-0159" THEN 100 
                                        WHEN DEP_TIME_BLK_DET == "0200-0259" THEN 200
                                        WHEN DEP_TIME_BLK_DET == "0300-0359" THEN 300 
                                        WHEN DEP_TIME_BLK_DET == "0400-0459" THEN 400 
                                        WHEN DEP_TIME_BLK_DET == "0500-0559" THEN 500 
                                        WHEN DEP_TIME_BLK_DET == "0600-0659" THEN 600 
                                        WHEN DEP_TIME_BLK_DET == "0700-0759" THEN 700 
                                        WHEN DEP_TIME_BLK_DET == "0800-0859" THEN 800 
                                        WHEN DEP_TIME_BLK_DET == "0900-0959" THEN 900 
                                        WHEN DEP_TIME_BLK_DET == "1000-1059" THEN 1000 
                                        WHEN DEP_TIME_BLK_DET == "1100-1159" THEN 1100 
                                        WHEN DEP_TIME_BLK_DET == "1200-1259" THEN 1200 
                                        WHEN DEP_TIME_BLK_DET == "1300-1359" THEN 1300 
                                        WHEN DEP_TIME_BLK_DET == "1400-1459" THEN 1400 
                                        WHEN DEP_TIME_BLK_DET == "1500-1559" THEN 1500 
                                        WHEN DEP_TIME_BLK_DET == "1600-1659" THEN 1600 
                                        WHEN DEP_TIME_BLK_DET == "1700-1759" THEN 1700 
                                        WHEN DEP_TIME_BLK_DET == "1800-1859" THEN 1800 
                                        WHEN DEP_TIME_BLK_DET == "1900-1959" THEN 1900 
                                        WHEN DEP_TIME_BLK_DET == "2000-2059" THEN 2000 
                                        WHEN DEP_TIME_BLK_DET == "2100-2159" THEN 2100 
                                        WHEN DEP_TIME_BLK_DET == "2200-2259" THEN 2200 
                                        WHEN DEP_TIME_BLK_DET == "2300-2359" THEN 2300 
                                    END
                                ) AS HOUR_BLK
                            FROM df_otpw_blk_det b
                        """)

In [None]:
# Take hour blocks and add two hours for futurization
df_otpw_delay.createOrReplaceTempView("df_otpw_delay")

df_otpw_blk = spark.sql("""
                            SELECT 
                                d.*,
                                (
                                    CASE WHEN HOUR_BLK = 2200 THEN 0
                                        WHEN HOUR_BLK = 2300 THEN 100 
                                        ELSE HOUR_BLK + 200 
                                    END
                                ) AS HOUR_BLK_2
                            FROM df_otpw_delay d
                        """)

In [None]:
# Calculate delay probability using window function
df_otpw_blk.createOrReplaceTempView("df_otpw_blk")

df_otpw_prob = spark.sql("""
                            SELECT 
                                l.*,
                                AVG(DEP_DEL15) OVER (PARTITION BY FL_DATE, 
                                                                  OP_UNIQUE_CARRIER,
                                                                  DEST_AIRPORT_ID,
                                                                  HOUR_BLK_2) AS TWO_HR_PROB
                            FROM df_otpw_blk l
                          """)

In [None]:
# Create 2-hour delay new target variable based on threshold
df_otpw_prob.createOrReplaceTempView("df_otpw_prob")

df_otpw = spark.sql("""
                            SELECT 
                                p.*,
                                (
                                    CASE WHEN TWO_HR_PROB >= 0.7 THEN 1
                                        ELSE 0
                                    END
                                ) AS TWO_HR_DELAY
                            FROM df_otpw_prob p
                          """)

#### Dataset Checkpoint (New Label Creation)

In [None]:
# Write data frame into blob storage
df_otpw.write.mode("overwrite").parquet(f"{blob_url}/OTPW_60m_new_label")

In [None]:
# Load checkpointed dataframe
df_otpw = spark.read.parquet(f"{blob_url}/OTPW_60m_new_label")

### Exploratory Data Analysis

In phase IV, we did not conduct expansive EDA, since it has been included in the previous phases. In the previous phases we made exploratory visualizations using the flights and weather datasets. The visualizations helped us determine which features we should be including in the model and variables that are correlated with the target variable. The necessary EDA in the previous phases helped us in the model building process.

**(The below charts and tables were not displayed to reduce the notebook download size)**

#### Flights Data 6m

The Flights Data contains 6 months of flights data during the firsta and second quarters of 2015. It is a subset of the passenger flight's on-time performance data. The data dictionary can be found here https://www.transtats.bts.gov/Fields.asp?gnoyr_VQ=FGJ . 

There are 5779024 rows, 60 numerical columns, and 49 categorical columns in the dataset.

In [None]:
df_flights_6m_pd = df_flights_6m.select(['FL_DATE', 'DEP_DEL15']).toPandas() 

In [None]:
df_flights_6m_pd.groupby(by="FL_DATE")['DEP_DEL15'].sum().plot()
plt.xlabel("Date")
plt.ylabel("Delayed Frequency")
plt.title("Delayed Frequency by Data")
plt.xticks(rotation=45)

The time series plot above contains the number of delayed flights for a given day. It can be observed that the lowest delayed flights for a given day occurs in the end of January, and the highest number of delayed flights occurs in the beginning of January. This makes sense, since many people travel after New Year's Day.

In [None]:
# df_flights_6m_pd_corr = df_flights_6m.limit(1000000).toPandas() 
# correlation_matrix = df_flights_6m_pd_corr.corr()
# correlation_matrix.style.background_gradient(cmap='coolwarm')

Since there are many columns in the dataset, we will only talk about the highly correlated features with DEP_DEL15, which indicates if the flight had a delay in departure. There seems to only be highly correlated features with columns related to delay such as DEP_DELAY. DEP_DELAY is the difference in minutes between scheduled and actual departure time, so this will cause a multicollinearity issue as the columns are related.

#### Weather Data 6m

The Weather Data contains 6 months of weather information during the first and second quarters of 2015. It is a subset of the weather information data from 2015-2021. The data dictionary can be found here https://www.ncei.noaa.gov/data/global-hourly/doc/isd-format-document.pdf on pages 8-12. 

There are 61204666 rows, 1 numerical column, and 123 categorical columns in the dataset. It appears that we will need to convert some columns to float type since some numerical columns are string types. We will do the cleaning in the OTPW dataset since that is where we will build our model. We will also build visualizations and correlation matrices with that dataset.

In [None]:
df_weather_6m.count()

61204666

In [None]:
df_year = df_weather_6m.groupBy("HourlyDewPointTemperature").count()
df_year_pd = df_year.toPandas()
plt.figure(figsize=(35, 6))
sns.barplot(x="HourlyDewPointTemperature", y="count", data = df_year_pd )
plt.xticks(rotation = 90, fontsize = 5)
plt.xlabel("HourlyDewPointTemperature")
plt.ylabel("Count")
plt.title("Histogram of HourlyDewPointTemperature")

The most common hourly dewpoint is 9

#### Stations Data

The Weather Stations Data contains weather station information from 2015-2021. 

**Data Dictionary:**

- **lat:** Latitude (Num)
- **lon:** Longitude (Num)
- **neighbor_lat:** neighbor latitude (Num)
- **neighbor_lon:** neighbor longitude (Num)
- **distance_to_neighbor:** Distance away from neighbor (Num)
- **usaf:** United States Airforce (Str)
- **wban:** Weather Bureau Army Navy (Str)
- **station_id:** Station ID (Str)
- **neighbor_id:** Neighbor ID (Str)
- **neighbor_name:** Neighbor Name (Str)
- **neighbor_state:** Neighbor State (Str)
- **neighbor_call:** Neighbor Call (Str)

There are 5004169 rows, 5 numerical columns, and 7 categorical columns in the dataset.

In [None]:
df_stations.count()

5004169

In [None]:
stations_df = df_stations.toPandas()

plt.figure(figsize=(10, 8))
sns.histplot(data=stations_df, x="lon", y="lat", bins=100, cmap="Blues")
plt.title("Density Map of Stations")
plt.xlabel("Longitude")
plt.ylabel("Latitude")
plt.show()

It looks like weather stations are located mostly located close to each other, and it seems that there are two regions where there are many weather stations,

#### OTPW 3m Data

In [None]:
state_counts_df = df_otpw.groupBy("ORIGIN_STATE_NM").count().orderBy("ORIGIN_STATE_NM")
state_counts_df = state_counts_df.withColumn("ORIGIN_STATE_NM", col("ORIGIN_STATE_NM").cast("string"))
state_counts_pandas = state_counts_df.toPandas()

In [None]:
plt.figure(figsize=(12, 6))
sns.barplot(x="ORIGIN_STATE_NM", y="count", data=state_counts_pandas)
plt.xticks(rotation=90)
plt.xlabel("States")
plt.ylabel("Count")
plt.title("Histogram of Origin States")

plt.show()

In [None]:
state_counts_df = df_otpw.groupBy("DEST_STATE_NM").count().orderBy("DEST_STATE_NM")
state_counts_df = state_counts_df.withColumn("DEST_STATE_NM", col("DEST_STATE_NM").cast("string"))
state_counts_pandas = state_counts_df.toPandas()

In [None]:
plt.figure(figsize=(12, 6))
sns.barplot(x="DEST_STATE_NM", y="count", data=state_counts_pandas)
plt.xticks(rotation=90)
plt.xlabel("States")
plt.ylabel("Count")
plt.title("Histogram of Destination States")

plt.show()

#### Flight Codes Data

In [None]:
display(df_airport_codes.summary())

summary,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates,longitude,latitude
count,57421,57421,57421,49608.0,57421,57421,57421,51527,41561,9225,30030,57421,57421.0,57421.0
mean,2.3873375337777779E8,,,1253.371774713756,,,,,2.3350040754347825E8,,,,-34.63428714679448,25.530823541517584
stddev,9.492375382267495E8,,,1615.4825921097702,,,,,9.393019718598665E8,,,,79.8753094290784,27.306638654766694
min,00A,balloonport,"""""""Der Dingel"""" Airfield""",-1266.0,AF,AD,AD-04,"""""""Big"""" Rock Flat""",00A,-,-,"-0.004722000099718571, 9.425000190734863",-179.876998901,-90.0
25%,31.0,,,208.0,,,,,22.0,0.0,30.0,,-92.02359771728516,7.5
50%,79.0,,,722.0,,,,,70.0,0.0,79.0,,-71.437722,36.20019912719727
75%,7000.0,,,1519.0,,,,,200.0,0.0,1063.0,,14.50030040741,43.753056
max,spgl,small_airport,Å½ocenes lidlauks,29977.0,SA,ZW,ZW-U-A,Å½ocene,ZYYY,ZZV,ZZV,"99.9555969238, 8.47115039825",179.999894,90.0


In [None]:
display(df_airport_codes.dtypes)

_1,_2
ident,string
type,string
name,string
elevation_ft,int
continent,string
iso_country,string
iso_region,string
municipality,string
gps_code,string
iata_code,string


In [None]:
df_airport_codes = df_airport_codes.withColumn("longitude", split(col("coordinates"), ",").getItem(0).cast("double"))
df_airport_codes = df_airport_codes.withColumn("latitude", split(col("coordinates"), ",").getItem(1).cast("double"))

In [None]:
display(df_airport_codes)

ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates,longitude,latitude
00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125",-74.93360137939453,40.07080078125
00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022",-101.473911,38.704022
00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968",-151.695999146,59.94919968
00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172",-86.77030181884766,34.86479949951172
00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087",-91.254898,35.6087
00AS,small_airport,Fulton Airport,1100.0,,US,US-OK,Alex,00AS,,00AS,"-97.8180194, 34.9428028",-97.8180194,34.9428028
00AZ,small_airport,Cordes Airport,3810.0,,US,US-AZ,Cordes,00AZ,,00AZ,"-112.16500091552734, 34.305599212646484",-112.16500091552734,34.305599212646484
00CA,small_airport,Goldstone /Gts/ Airport,3038.0,,US,US-CA,Barstow,00CA,,00CA,"-116.888000488, 35.350498199499995",-116.888000488,35.350498199499995
00CL,small_airport,Williams Ag Airport,87.0,,US,US-CA,Biggs,00CL,,00CL,"-121.763427, 39.427188",-121.763427,39.427188
00CN,heliport,Kitchen Creek Helibase Heliport,3350.0,,US,US-CA,Pine Valley,00CN,,00CN,"-116.4597417, 32.7273736",-116.4597417,32.7273736


In [None]:
pandas_df = df_airport_codes.toPandas()

plt.figure(figsize=(10, 8))
sns.histplot(data=pandas_df, x="longitude", y="latitude", bins=100, cmap="Blues")
plt.title("Density Map of Airports")
plt.xlabel("Longitude")
plt.ylabel("Latitude")
plt.show()

In [None]:
# Print summaries for all columns
for i in range(0, len(col_list), 5):
    df_otpw.select(col_list[i:i+5]).summary().show()

## Modeling Pipeline

![Name of the image](https://i.imgur.com/OVhObhg.png)

### Feature Analysis

The feature analysis determined which features to implement in modeling and experiments.
 
The below lists the families of the features that were being considered:
| Family  | Features | Count Per Family |
|----------|----------|:----------:|
| Time | QUARTER, YEAR, MONTH, DAY_OF_MONTH, DAY_OF_WEEK, FL_DATE, sched_depart_date_time, two_hours_prior_depart_UTC | 8 |
| Flight Info | OP_UNIQUE_CARRIER, OP_CARRIER_AIRLINE_ID, CRS_ELAPSED_TIME, ACTUAL_ELAPSED_TIME, AIR_TIME, FLIGHTS, DISTANCE, DISTANCE_GROUP, TAXI_OUT, WHEELS_OFF, WHEELS_ON, TAXI_IN | 12 |
| Origin Location | ORIGIN_AIRPORT_ID, origin_airport_name, ORIGIN_AIRPORT_SEQ_ID, ORIGIN_CITY_MARKET_ID, ORIGIN, ORIGIN_CITY_NAME, ORIGIN_STATE_ABR, ORIGIN_STATE_FIPS, ORIGIN_STATE_NM | 9 |
| Destination Location | DEST_AIRPORT_ID, DEST_AIRPORT_SEQ_ID, dest_airport_lon, dest_airport_lat, DEST_CITY_MARKET_ID, DEST, DEST_CITY_NAME, DEST_STATE_ABR, DEST_STATE_FIPS, DEST_STATE_NM | 10 |
| Departure Time | CRS_DEP_TIME, DEP_TIME, DEP_DELAY, DEP_DELAY_GROUP, DEP_TIME_BLK, HOUR_BLK, TWO_HOUR_BLK | 7 |
| Arrival Time | CRS_ARR_TIME, ARR_TIME, ARR_DELAY, ARR_DEL15, ARR_DELAY_GROUP, ARR_TIME_BLK | 6 |
| Cancelled/Diverted | CANCELLED, CANCELLATION_CODE, DIVERTED | 3 |
| Delay Details | CARRIER_DELAY, WEATHER_DELAY, NAS_DELAY, SECURITY_DELAY, LATE_AIRCRAFT_DELAY | 6 |
| Station | STATION, DATE, LATITUDE, LONGITUDE, ELEVATION | 12 |
| Hourly Weather | HourlyPrecipitation, HourlyPresentWeatherType, HourlySkyConditions, HourlyVisibility, HourlyPressureChange, HourlyPressureTendency, HourlyStationPressure, HourlyWindDirection, HourlyWindGustSpeed, HourlyWindSpeed | 10 |

#### Pearson Correlation
The below Pearson Correlation analysis was performed on continuous and other numerical features, and the assumption was made that all features were I.I.D. The results indicate that features from the same family are more likely to have higher positive and negative correlations.

In [None]:
# Perform testing on certain features
correlation_col = df_otpw.select('CRS_DEP_TIME', 'CRS_ARR_TIME', 'ARR_TIME', 'ARR_DELAY', 'CRS_ELAPSED_TIME', 'ACTUAL_ELAPSED_TIME', 'AIR_TIME', 'dest_airport_lon', 'dest_airport_lat', 'LATITUDE', 'LONGITUDE', 'ELEVATION', 'CARRIER_DELAY', 'WEATHER_DELAY', 'NAS_DELAY', 'SECURITY_DELAY', 'LATE_AIRCRAFT_DELAY', 'HourlyPrecipitation', 'HourlyVisibility', 'HourlyPressureChange', 'HourlyPressureTendency', 'HourlyStationPressure', 'HourlyWindDirection', 'HourlyWindGustSpeed', 'HourlyWindSpeed', 'DEP_TIME', 'DEP_DELAY', 'TWO_HR_DELAY')

In [None]:
correl_col = [col for col, dtype in correlation_col.dtypes if dtype == "float" or dtype == "integer" or dtype == "double" or col == "TWO_HR_DELAY"]
print(correl_col)

df_otpw_corr = df_otpw[correl_col]
df_otpw_corr = df_otpw_corr.withColumn("TWO_HOUR_DELAY", df_otpw_corr["TWO_HR_DELAY"].cast(IntegerType()))

['ARR_DELAY', 'CRS_ELAPSED_TIME', 'ACTUAL_ELAPSED_TIME', 'AIR_TIME', 'dest_airport_lon', 'dest_airport_lat', 'LATITUDE', 'LONGITUDE', 'ELEVATION', 'CARRIER_DELAY', 'WEATHER_DELAY', 'NAS_DELAY', 'SECURITY_DELAY', 'LATE_AIRCRAFT_DELAY', 'HourlyVisibility', 'HourlyPressureChange', 'HourlyPressureTendency', 'HourlyStationPressure', 'HourlyWindDirection', 'HourlyWindGustSpeed', 'HourlyWindSpeed', 'DEP_DELAY', 'TWO_HR_DELAY']


**(The below correlation matrix has been commented to reduce the notebook download size)**

In [None]:
# df_corr = df_otpw_corr.toPandas() 
# correlation_matrix = df_corr.corr()
# correlation_matrix.style.background_gradient(cmap='coolwarm')

The following features had 1) the largest positive/negative correlations with the target variable as compared with other features belonging to the same feature family, and 2) were selected over similar features that shared high positive/negative correlation.


- **Flight Info:** Actual_Elapsed_Time
- **Destination Location:** dest_airport_lon & dest_airport_lat
- **Departure Time:** DEP_DELAY
- **Arrival Time:** ARR_DELAY

Correlations between the target variable and features from the Hourly Weather and Delay Details families were extremely low. However, there are indications that these features may require further feature engineering (such as scaling, log transformations, or dimension reduction) for a more thorough analysis and determination.

#### Chi-Squared Test
The following Chi-Squared testing was done on non-ordinal categorical features against the target variable, and the assumption that features are I.I.D. was made. With more time we would use additional tests such as ANOVA or time-based cross correlation to assess relationships. Only features with no NULL values were tested.

In [None]:
# Eliminate time and continous features
cat_col = ['OP_UNIQUE_CARRIER', 'ORIGIN_AIRPORT_ID', 'ORIGIN_CITY_MARKET_ID', 'ORIGIN_STATE_FIPS', 'DEST_AIRPORT_ID',
           'DEST_CITY_MARKET_ID', 'DEST_STATE_FIPS', 'DEP_TIME_BLK', 'ARR_TIME_BLK', 'CANCELLED', 'DIVERTED', 'DISTANCE_GROUP', 'HOUR_BLK']

# Create indexed columns based on categorical features
indexers = [StringIndexer(inputCol=col, outputCol=f'i_{col}', handleInvalid='keep')
            for col in cat_col]

for indexer in indexers:
    indexer_model = indexer.fit(df_otpw)
    df_otpw = indexer_model.transform(df_otpw)

In [None]:
# Convert indexed columns to vectors, perform Chi Squared Test
indexed_columns = [f'i_{col}' for col in cat_col]

assemblers = [(VectorAssembler(inputCols=[f'{col}'], outputCol=f'v{col}'), f'v{col}') for col in 
              indexed_columns]

for assembler in assemblers:
    df_VA = assembler[0].transform(df_otpw)
    df_VA.createOrReplaceTempView(f'va_{assembler[1]}')
    print(assembler[1])
    result = ChiSquareTest.test(df_VA, f'{assembler[1]}', 'TWO_HR_DELAY')
    result.show()

vi_OP_UNIQUE_CARRIER
+-------+----------------+-------------------+
|pValues|degreesOfFreedom|         statistics|
+-------+----------------+-------------------+
|  [0.0]|            [18]|[341853.5223240425]|
+-------+----------------+-------------------+

vi_ORIGIN_AIRPORT_ID
+-------+----------------+-------------------+
|pValues|degreesOfFreedom|         statistics|
+-------+----------------+-------------------+
|  [0.0]|           [368]|[453312.0082218895]|
+-------+----------------+-------------------+

vi_ORIGIN_CITY_MARKET_ID
+-------+----------------+--------------------+
|pValues|degreesOfFreedom|          statistics|
+-------+----------------+--------------------+
|  [0.0]|           [341]|[410607.31130195945]|
+-------+----------------+--------------------+

vi_ORIGIN_STATE_FIPS
+-------+----------------+-------------------+
|pValues|degreesOfFreedom|         statistics|
+-------+----------------+-------------------+
|  [0.0]|            [52]|[263208.9869696186]|
+-------+--

In [None]:
# Clean up indexed columns and replace original column
df_otpw = df_otpw.drop('OP_UNIQUE_CARRIER', 'ORIGIN_AIRPORT_ID', 'ORIGIN_CITY_MARKET_ID', 'ORIGIN_STATE_FIPS', 
                       'DEST_AIRPORT_ID', 'DEST_CITY_MARKET_ID','DEST_STATE_FIPS', 'DEP_TIME_BLK', 'ARR_TIME_BLK', 'CANCELLED', 'DIVERTED', 'DISTANCE_GROUP')

df_otpw = df_otpw.withColumnRenamed("i_OP_UNIQUE_CARRIER","OP_UNIQUE_CARRIER")
df_otpw = df_otpw.withColumnRenamed("i_ORIGIN_AIRPORT_ID","ORIGIN_AIRPORT_ID")
df_otpw = df_otpw.withColumnRenamed("i_ORIGIN_CITY_MARKET_ID","ORIGIN_CITY_MARKET_ID")
df_otpw = df_otpw.withColumnRenamed("i_ORIGIN_STATE_FIPS","ORIGIN_STATE_FIPS")
df_otpw = df_otpw.withColumnRenamed("i_DEST_AIRPORT_ID","DEST_AIRPORT_ID")
df_otpw = df_otpw.withColumnRenamed("i_DEST_CITY_MARKET_ID","DEST_CITY_MARKET_ID")
df_otpw = df_otpw.withColumnRenamed("i_DEST_STATE_FIPS","DEST_STATE_FIPS")
df_otpw = df_otpw.withColumnRenamed("i_DEP_TIME_BLK","DEP_TIME_BLK")
df_otpw = df_otpw.withColumnRenamed("i_ARR_TIME_BLK","ARR_TIME_BLK")
df_otpw = df_otpw.withColumnRenamed("i_CANCELLED","CANCELLED")
df_otpw = df_otpw.withColumnRenamed("i_DIVERTED","DIVERTED")
df_otpw = df_otpw.withColumnRenamed("i_DISTANCE_GROUP","DISTANCE_GROUP")

##### Chi-Squared Results

Based on the test results, the Chi-Squared Test suggests a significant association between each tested feature and the target variable (with p-value = 0.05). For example, the DISTANCE_GROUP feature had a Chi-Squared score of 22,436.97 and a p-value of 0.0, which suggests a significant association between the flight carrier and having a departure delay two hours ahead.

#### Dataset Checkpoint (Feature Analysis)

In [None]:
# Write data frame into blob storage
df_otpw.write.mode("overwrite").parquet(f"{blob_url}/OTPW_60m_FA")

In [None]:
# Load checkpointed dataframe
df_otpw = spark.read.parquet(f"{blob_url}/OTPW_60m_FA")

Concluding the feature analysis, the following features underwent feature engineering:

- **Hourly Weather Family:** HourlyPrecipitation, HourlyVisibility, HourlyPressureChange, HourlyPressureTendency, HourlyStationPressure, HourlyWindDirection, HourlyWindSpeed

- dest_airport_lon, dest_airport_lat

- **Delay Details Family:** CARRIER_DELAY, WEATHER_DELAY, NAS_DELAY, SECURITY_DELAY, LATE_AIRCRAFT_DELAY, ARR_DELAY, DEP_DELAY

- CANCELLED

- DIVERTED

- **Flight Info Family:** OP_UNIQUE_CARRIER, ORIGIN_AIRPORT_ID, ORIGIN_CITY_MARKET_ID, ORIGIN_STATE_FIPS, DEST_AIRPORT_ID, DEST_CITY_MARKET_ID, DEST_STATE_FIPS

- FL_DATE

### Feature Engineering

Feature engineering is a critical step in the development of robust machine learning models. In our effort to optimize the dataset, which contains both flight information and hourly weather data, we utilized a range of techniques tailored to the unique characteristics of our data. We applied Log Transformations and Min/Max Scaling to address distribution and scale issues, and turned to Principal Component Analysis (PCA) for dimensionality reduction. Interaction Effects were incorporated to capture the combined influence of features, while Graph-Based Features tapped into structured data relationships. Additionally, One-Hot Encoding ensured we effectively processed categorical data. Each method was carefully chosen to mold our dataset into an optimized form for predictive modeling.

Below is a table containing the feature strategy by feature.

| Feature(s)  | Strategy | Reason |
|----------|----------|----------|
| Origin & Destination Families | One-Hot Encoding, Graph-Based Engineering | There was reason to indicate that the Origin & Destination families could have a relationship with the target variable, which will be confirmed with further engineering. |
| Cancelled/Diverted Family | One-Hot Encoding | There was reason to indicate that the Cancelled/Diverted family could have a relationship with the target variable, which will be confirmed with further engineering. |
| Hourly Weather Family | Log Transformation, Min/Max Scaling | There was reason to indicate that the Hourly Weather family could have a relationship with the target variable, which will be confirmed with further engineering. |
| Delay Details Family | PCA, Scaling | There was similarity and high correlation between these features and the target. Since the minutes vary greatly, scaling will also be applied. |
| dest_airport_lon, dest_airport_lat | Interaction Terms |  There was similarity and high correlation between these features and the target. |
| FL_DAT | Event-Based Feature Engineering |  A separate feature was created to take into consideration both Christmas and New Year's holidays. |

#### Log Transformation

In the course of our data preprocessing, we applied a logarithmic transformation to the weather variables within our dataset. This transformation was implemented to address the non-linearity and potential skewness present in some of the meteorological measurements. By applying the logarithm, we aimed to normalize the distribution of these features, ensuring they align better with the requirements of our predictive models.

In [None]:
# Weather data to transform
columns_to_transform = ['HourlyVisibility', 'HourlyPressureChange', 'HourlyPressureTendency', 'HourlyStationPressure', 'HourlyWindDirection', 'HourlyWindGustSpeed', 'HourlyWindSpeed']

# Log transformation
for col_name in columns_to_transform:
    # Appends new columns with log transformation data
    #df_otpw = df_otpw.withColumn(f"{col_name}_log", log(df_otpw[col_name] + 1))
    # to replace the values in the original dataset with the log transformation uncomment below
    df_otpw = df_otpw.withColumn(col_name, log(df_otpw[col_name] + 1))

#### Principal Component Analysis (PCA)

To reduce the number of dimensions of the Delay Details Family, PCA was used to create a separate PCA feature. Each Delay Details Family feature allocated the number of minutes the flight was delayed to the category of delay. For example, a flight that was delayed by 20 minutes due to weather and the carrier could split 15 minutes to `WEATHER_DELAY` and 5 minutes to `CARRIER_DELAY`.

The six features were condensed into three principal components to eliminate noise and retain only the most important information.

In [None]:
# Apply PCA to Delay Details Family
delay_list = ["CARRIER_DELAY", "WEATHER_DELAY", "NAS_DELAY", "SECURITY_DELAY", "LATE_AIRCRAFT_DELAY", "DEP_DELAY"]

df_delay_pca = df_otpw.fillna(0, subset=delay_list)

assembler = VectorAssembler(inputCols=delay_list, outputCol="delay_features", handleInvalid="skip")
scaler = StandardScaler(inputCol="delay_features", outputCol="scaled_delay_features", withStd=True, withMean=True)

In [None]:
# PCA w/ 3 principal components
k = 3
pca = PCA(k=k, inputCol="scaled_delay_features", outputCol="pca_delay_features")

pca_pipeline = Pipeline(stages=[assembler, scaler, pca])

pipeline_model = pca_pipeline.fit(df_delay_pca)
df_otpw = pipeline_model.transform(df_delay_pca)

In [None]:
# Clean up scaled columns and replace original column
df_otpw = df_otpw.drop('scaled_delay_features')

#### Scaling

When analyzing the features within the Delay Details Family, it became evident that they exhibited significant ranges and long-tail distributions. These values spanned from below zero to thousands of minutes. In order to standardize the range and distribution, each feature underwent individual scaling.

In [None]:
# Individual Scaling of each feature in Delay Details Family
stages = []

for col in delay_list:
    assembler = VectorAssembler(inputCols=[col], outputCol=f"vect_{col}", handleInvalid="keep")
    scaler = StandardScaler(inputCol=f"vect_{col}", outputCol=f"scaled_{col}")
    
    stages.extend([assembler, scaler])

pipeline = Pipeline(stages=stages)
model = pipeline.fit(df_otpw)

df_scaled = model.transform(df_otpw)

In [None]:
# Clean up scaled columns and replace original column
df_otpw = df_scaled.drop('CARRIER_DELAY', 'vect_CARRIER_DELAY', 'WEATHER_DELAY', 'vect_WEATHER_DELAY', 'NAS_DELAY', 'vect_NAS_DELAY', 'SECURITY_DELAY', 'vect_SECURITY_DELAY', 'LATE_AIRCRAFT_DELAY', 'vect_LATE_AIRCRAFT_DELAY', 'DEP_DELAY', 'vect_DEP_DELAY')

df_otpw = df_otpw.withColumnRenamed("scaled_CARRIER_DELAY","CARRIER_DELAY")
df_otpw = df_otpw.withColumnRenamed("scaled_WEATHER_DELAY","WEATHER_DELAY")
df_otpw = df_otpw.withColumnRenamed("scaled_NAS_DELAY","NAS_DELAY")
df_otpw = df_otpw.withColumnRenamed("scaled_SECURITY_DELAY","SECURITY_DELAY")
df_otpw = df_otpw.withColumnRenamed("scaled_LATE_AIRCRAFT_DELAY","LATE_AIRCRAFT_DELAY")
df_otpw = df_otpw.withColumnRenamed("scaled_DEP_DELAY","DEP_DELAY")

#### Interaction Effects

Since latitude and longitude were interacted with each other, we added both latitude and longitude to serve as one feature in the model.

In [None]:
# Sum Latitude and Longitude
df_otpw = df_otpw.withColumn("sum_lat_lon", col("dest_airport_lat") + col("dest_airport_lon"))

#### Min/Max Scaling 

To harmonize the scale of key weather attributes in our dataset, we utilized the Min/Max scaling technique. This approach linearly adjusts each feature to reside specifically within the [0, 1] range, ensuring that no single attribute exerts undue influence on the predictive model due to scale disparities. An integral part of our methodology involved managing null values by temporarily substituting them with a default value for smooth scaling operations. Once the scaling was completed, these substituted values were reverted to their original null status, preserving the integrity of our dataset. Post-processing also entailed the removal of auxiliary columns, maintaining a streamlined data structure.

In [None]:
# Define UDF 
first_element_udf = udf(lambda v: float(v[0]), DoubleType())

# List of columns to be scaled
weather_feat_col = ['HourlyVisibility', 'HourlyPressureChange', 'HourlyPressureTendency', 'HourlyStationPressure', 'HourlyWindDirection', 'HourlyWindSpeed']

# Define a default value for nulls
default_value = 0.0

for col_name in weather_feat_col:
    # Create a new column indicating whether the original value was null
    df_otpw = df_otpw.withColumn(col_name + "_was_null", col(col_name).isNull())

    # Fill null values with the default value
    df_otpw = df_otpw.fillna({col_name: default_value})

    # VectorAssembler Transformation
    assembler = VectorAssembler(inputCols=[col_name], outputCol=col_name + "_vec")

    # MinMaxScaler Transformation
    scaler = MinMaxScaler(inputCol=col_name + "_vec", outputCol=col_name + "_scaled_vec")

    # Construct pipeline
    pipeline = Pipeline(stages=[assembler, scaler])

    # Fit the pipeline model
    pipeline_model = pipeline.fit(df_otpw)

    # Use the pipeline model to transform the data
    df_otpw = pipeline_model.transform(df_otpw)

    # Where the "_was_null" column is true, replace the value with null; otherwise, use the scaled value
    df_otpw = df_otpw.withColumn(
        col_name,
        when(col(col_name + "_was_null"), None).otherwise(first_element_udf(col(col_name + "_scaled_vec")))
    )

    # Drop auxiliary columns: original, vectorized, scaled vector, and "_was_null"
    df_otpw = df_otpw.drop(col_name + "_vec").drop(col_name + "_scaled_vec").drop(col_name + "_was_null")



In [None]:
display( df_otpw.limit(10))

#### Graph-Based Feature

Graph-based approaches involve representing data as a network of interconnected nodes and edges, where nodes represent entities (in this case, airports, flights, etc.), and edges represent relationships between those entities (such as flight routes, connections, etc.). In the case of this feature, we count the number of unique outgoing flights from an origin airport. The theory is that an airport with higher counts of these flights will be more likely to have delays.

In [None]:
df_otpw.select("ORIGIN_AIRPORT_ID", "DEST_AIRPORT_ID").limit(100).show()

In [None]:
df_otpw.groupBy("ORIGIN_AIRPORT_ID", "DEST_AIRPORT_ID") \
                               .agg(F.count("*").alias("count")).show()

In [None]:
airport_counts = df_otpw.groupBy("ORIGIN_AIRPORT_ID", "DEST_AIRPORT_ID") \
                               .agg(F.count("*").alias("count"))

airport_counts = airport_counts.withColumnRenamed("count", "ORIGIN_DESTINATION_AIRPORT_COUNT")

# CHANGE HERE IF NEEDED
df_otpw = df_otpw.join(airport_counts, 
                       df_otpw.ORIGIN_AIRPORT_ID == airport_counts.ORIGIN_AIRPORT_ID, 
                       "left").drop(df_otpw.ORIGIN_AIRPORT_ID, df_otpw.DEST_AIRPORT_ID)

# display(df_otpw.limit(100))

In [None]:
exploratory_df = df_otpw.select("ORIGIN_DESTINATION_AIRPORT_COUNT", "TWO_HR_DELAY").toPandas()

In [None]:
plt.hist(exploratory_df[exploratory_df["TWO_HR_DELAY"] == 0]["ORIGIN_DESTINATION_AIRPORT_COUNT"])
plt.show()

In [None]:
plt.hist(exploratory_df[exploratory_df["TWO_HR_DELAY"] == 1]["ORIGIN_DESTINATION_AIRPORT_COUNT"])
plt.show()

In [None]:
ztest(exploratory_df[exploratory_df["TWO_HR_DELAY"] == 0]["ORIGIN_DESTINATION_AIRPORT_COUNT"], exploratory_df[exploratory_df["TWO_HR_DELAY"] == 1]["ORIGIN_DESTINATION_AIRPORT_COUNT"])

#### One-Hot Encoding

Since location-based features and features in the Cancelled/Diverted Family were both categorial and non-ordinal in nature, one-hot encoding was employed. Values were assigned to a binary matrix. One-hot encoding preserved the information of features without assuming any ordering to the values.

In [None]:
# List of categorical columns
categorical_cols = ['OP_UNIQUE_CARRIER', 'ORIGIN_AIRPORT_ID', 'ORIGIN_CITY_MARKET_ID', 'ORIGIN_STATE_FIPS', 
                    'DEST_AIRPORT_ID', 'DEST_CITY_MARKET_ID', 'DEST_STATE_FIPS', 'CANCELLED', 'DIVERTED', 'DISTANCE_GROUP']

# Create encoder for each categorical column
encoders = [OneHotEncoder(inputCol=col, outputCol=f'onehot_{col}') for col in categorical_cols]

# Apply the encoders to the DataFrame
for encoder in encoders:
    model = encoder.fit(df_otpw)
    df_otpw = model.transform(df_otpw)

#### Event-Based Feature Engineering
Since Christmas and New Years are major holidays that produce many flights, we want to create a column that indicates whether the flight is on Christmas or New Year's to add as a feature in our model.

In [None]:
# Add a new column 'is_christmas_or_newyear' to the DataFrame
df_otpw = df_otpw.withColumn("is_christmas_or_newyear",
                                  when((month(col("FL_DATE")) == 12) & (dayofmonth(col("FL_DATE")) == 25) |
                                       (month(col("FL_DATE")) == 1) & (dayofmonth(col("FL_DATE")) == 1), True)
                                  .otherwise(False))

In [None]:
# Count the number of null values in each column
display(df_otpw2.select([count(when(df_otpw2[c].isNull(), c)).alias(c) for c in df_otpw2.columns]))

QUARTER,YEAR,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,FL_DATE,CRS_DEP_TIME,DEP_DELAY_GROUP,CRS_ARR_TIME,ARR_TIME,ARR_DELAY_GROUP,CANCELLATION_CODE,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,DISTANCE,sched_depart_date_time,two_hours_prior_depart_UTC,dest_airport_lon,dest_airport_lat,STATION,DATE,LATITUDE,LONGITUDE,ELEVATION,DEP_TIME,HourlyPrecipitation,HourlyVisibility,HourlyPresentWeatherType,HourlySkyConditions,HourlyPressureChange,HourlyPressureTendency,HourlyStationPressure,HourlyWindDirection,HourlyWindGustSpeed,HourlyWindSpeed,ARR_DELAY,ARR_DEL15,DEP_DEL15,DEP_TIME_BLK_DET,HOUR_BLK,HOUR_BLK_2,TWO_HR_PROB,TWO_HR_DELAY,i_HOUR_BLK,delay_features,pca_delay_features,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,DEP_DELAY,sum_lat_lon
0,0,0,0,0,0,0,471812,0,496311,564809,31138080,163,562217,562217,0,0,0,0,0,0,0,0,0,31622477,466869,28275831,0,20780501,3041246,20773284,0,0,27471354,0,31565291,202424,202424,0,0,716854,716854,0,0,0,0,0,0,0,0,0,0,0,0


Databricks data profile. Run in Databricks to view.

#### Dataset Checkpoint (Feature Engineering)

In [None]:
# Write data frame into blob storage
df_otpw.write.mode("overwrite").parquet(f"{blob_url}/OTPW_60m_feat")

In [None]:
# Load checkpointed dataframe
df_otpw = spark.read.parquet(f"{blob_url}/OTPW_60m_feat")

### Data Splitting & Cross Validation

For data splitting, the time-based aspect of the dataset for both regular train/test splits and cross validation splits was taken into consideration. In regular train/test splitting, data from 2019 was used as the test set. For time-based cross validation, the number of folds was based on the number of years in the dataset, with each fold having a single year's data for the test set.

In the future as time permits, we would expand the dataset to include data from 2020-2022, although these years are typically considered anomalous due to the COVID-19 pandemic.

#### Traditional Train/Test Split

In [None]:
# Select features & label
feat_col = [col for col in df_otpw.columns if col != 'TWO_HR_DELAY']
label_col = 'TWO_HR_DELAY'

In [None]:
# Split data by year for cross validation and create artificial folds
split_df = df_otpw.select('YEAR').distinct().collect()
split_list = [row.YEAR for row in split_df]

modeling_df = df_otpw.select(*feat_col, label_col)

train_data = modeling_df.filter(modeling_df.YEAR != 2019).orderBy(rand())
test_data = modeling_df.filter(modeling_df.YEAR == 2019).orderBy(rand())
split_data = (train_data, test_data)

#### Cross Validation

Cross-validation was implemented with the number of folds set equal to the number of years present in the dataset. The dataset was split into equal parts based on the number of years. Each part was used as a validation set while the remaining parts were used for training. This approach helps to assess the models' performances across different years and ensures that each year's data contributes to both training and validation, leading to more robust evaluations.

In [None]:
# Split data by year for cross validation and create artificial folds
cv_split_df = df_otpw.select('YEAR').distinct().collect()
cv_split_list = [row.YEAR for row in cv_split_df]

cv_modeling_df = df_otpw.select(*feat_col, label_col)

fold_list = []
for i in cv_split_list:
    cv_train_data = cv_modeling_df.filter(cv_modeling_df.YEAR != i).orderBy(rand())
    cv_val_data = cv_modeling_df.filter(cv_modeling_df.YEAR == i).orderBy(rand())
    fold_list.append((cv_train_data, cv_val_data))

### Class Imbalance & Resampling

The flight prediction training set was analyzed for class imbalance and corrected using resampling. Both undersampling and oversampling techniques were considered.

#### Undersampling

We explored undersampling as a robust option to correct for class imbalance. In undersampling, the majority class is reduced to balance the distribution and decrease modeling bias. Since it was important to maintain the integrity of the data, the strategy applied a 9:1 ratio of non-delayed flights to delayed flights.

In [None]:
# Look at label distribution
display(train_data.groupBy("TWO_HR_DELAY").agg(count("TWO_HR_DELAY").alias("Count")))

TWO_HR_DELAY,Count
1,1757016
0,22487652


Databricks visualization. Run in Databricks to view.

In [None]:
# Perform undersampling
major_class = train_data.filter(train_data.TWO_HR_DELAY == 0)
minor_class = train_data.filter(train_data.TWO_HR_DELAY == 1)

# Check current ratio
major_count = major_class.count()
minor_count = minor_class.count()
print(f"Before Undersampling Label Ratio: {major_count / minor_count}")

# Undersample to 9:1 ratio for moderate adjustment
fraction = 9 * (minor_count / major_count)

# Perform undersampling
sample_maj = major_class.sample(withReplacement=False, fraction=fraction)
print(f"After Undersampling Label Ratio: {sample_maj.count() / minor_count}")

train_data_undersample = minor_class.union(sample_maj)

Before Undersampling Label Ratio: 12.782856000921983
After Undersampling Label Ratio: 8.998379149561632


In [None]:
# Undersampling results
display(train_data_undersample.groupBy("TWO_HR_DELAY").agg(count("TWO_HR_DELAY").alias("Count")))

TWO_HR_DELAY,Count
1,1761421
0,15850105


Databricks visualization. Run in Databricks to view.

#### Adaptive Synthetic Sampling (ADASYN)

The second resampling strategy explored was the oversampling method of Adaptive Synthetic Sampling. In this approach, ADASYN takes instances in the minority class that are considered more difficult to classify (interpreted as instances close to the decision boundary), and creates synthetic samples to balance the dataset. This helps with eliminating blind spots in data and improving model performance. ADASYN was selected over Synthetic Minority Oversampling Technique (SMOTE), as SMOTE considers all instances of the minority class equally when creating synthetic samples.

The ADASYN sampling method was based on selecting the most relevant features with no nulls, and turning them into vectors to create synthetic samples. ADASYN then uses the 'minority' sampling strategy to select instances from the minority class for resampling.

In [None]:
adasyn_feat = ['QUARTER', 'YEAR', 'MONTH', 'DAY_OF_MONTH', 'DAY_OF_WEEK', 'sum_lat_lon', 'OP_UNIQUE_CARRIER', 'ORIGIN_AIRPORT_ID', 'HOUR_BLK', 'DEP_DELAY', 'DISTANCE']

In [None]:
# Prepare data for ADASYN
assembler = VectorAssembler(inputCols=adasyn_feat, outputCol="features", handleInvalid="skip")
train_assembled = assembler.transform(train_data)
train_oversampled = train_assembled.select("features", label_col)

In [None]:
# Apply ADASYN to training data
adasyn = ADASYN(sampling_strategy="minority", random_state=42)

x_train = train_oversampled.select("features").toPandas()
y_train = train_oversampled.select(label_col).toPandas()

x_resampled, y_resampled = adasyn.fit_resample(x_train, y_train)

# Convert resampled data back to dataframe
train_data_adasyn = spark.createDataFrame(np.column_stack((x_resampled, y_resampled)), ["features", label_col])

#### Selected Class Imbalance Strategy

Ultimately, the resampling strategy we selected was undersampling, as it was simpler to implement and did not require major/direct modifications to the data that would alter individual feature distributions. ADASYN does not handle NULL values, which means complete removal or imputation. Additionally, this strategy does not allow vectorized datatypes (i.e.; one-hot encodings).

Like SMOTE, ADASYN is more complicated to parallelize and requires fitting the dataset into memory. As a result, resampling using ADASYN required a lot of time. Given more time to either hone the group of features used or create a thorough imputation strategy, we would explore ADASYN more effectively.

In [None]:
# Undersampling
train_data = train_data_undersample
split_data = (train_data_undersample, test_data)

# ADASYN
# train_data = train_data_adasyn
# split_data = (train_data_adasyn, test_data)

In [None]:
# If data needs to be checkpointed again, run this to get no nulls:
df_otpw = df_otpw.filter(df_otpw.HourlyVisibility.isNotNull())
train_data = train_data.filter(train_data.HourlyVisibility.isNotNull())
test_data = test_data.filter(test_data.HourlyVisibility.isNotNull())

In [None]:
# Write data frame into blob storage
df_otpw.write.mode("overwrite").parquet(f"{blob_url}/OTPW_60m_under")

train_data.write.mode("overwrite").parquet(f"{blob_url}/OTPW_60m_train")
test_data.write.mode("overwrite").parquet(f"{blob_url}/OTPW_60m_test")

### Hyperparameter Tuning

Hyperopt was chosen for distributed hyperparameter tuning across all models due to its scalability, automated and efficient searching, and parallelization. Instead of manually updating hyperparameters, Hyperopt iterates through a predefined search space for each model, efficiently using the Tree-Structured Parzen Estimator (TPE) algorithm. TPE is a Bayesian optimization algorithm that traverses the search space and selects hyperparameters based on past performance, making it more sophisticated than methods such as random search.

To perform hyperparameter tuning, the tuning function takes in a set of model variables and a pipeline that handles vectorized data and the model. The tuning function then utilizes the variables for the objective function and producing evaluation metrics. Hyperopt's `Trials` function then logs and identifies the best set of parameters by minimizing the objective function.

By using Hyperopt, finding the best set of parameters for each model becomes more efficient. It streamlines the tuning process and contributes to improving model performance across all models. The below cells contain a hypertuning function for a normal train/validation split, and a second cell for hypertuning with time-based cross validation.

**The below section sets the hypertuning function for use in the Experiments & Modeling phase.**

In [None]:

# Pipeline function
def pipelineFunc(model, feat_col, label_col):
    assembler = VectorAssembler(inputCols=feat_col, outputCol="vect_feat", handleInvalid="keep")
    model.setFeaturesCol("vect_feat")
    model.setLabelCol(label_col)

    pipeline = Pipeline(stages=[assembler, model])
    
    return pipeline

In [None]:
# Main tuning function w/ objective function
def hyperTuneFunc(data, pipeline, evaluator, modelStrTitle, experimentNum, searchSpace, maxEvals=10):
    with mlflow.start_run(run_name=f"{modelStrTitle}_{experimentNum}"):

        def objectiveFunc(params):
            with mlflow.start_run(nested=True):
                pipeline.getStages()[-1].setParams(**params)

                train, val = data
                model = pipeline.fit(train)
                pred = model.transform(val)
                metrics = evaluator.evaluate(pred)

                mlflow.log_metric(f"AUC", metrics)
                mlflow.log_params(params)
                                
            return -metrics
                
        trials=Trials()
        best_params = fmin(fn=objectiveFunc, 
                        space=searchSpace, 
                        algo=tpe.suggest, 
                        trials=trials, 
                        max_evals=maxEvals)

        best_auc = -trials.best_trial["result"]["loss"]
        return best_params, best_auc

#### Hypertuning with Time-Based Cross Validation

The below function requires cross validation splits using time-based folds and the pipeline function from above.

In [None]:
def cv_hyperTuneFunc(foldList, pipeline, evaluator, paramGrid, modelStrTitle, experimentNum, searchSpace, maxEvals=10):
    with mlflow.start_run(run_name=f"{modelStrTitle}_hypertuning_{experimentNum}"):
        cross_validator = CrosvsValidator(estimator=pipeline,
                                        estimatorParamMaps=paramGrid,
                                        evaluator=evaluator)

        def objectiveFunc(params):
            with mlflow.start_run(nested=True):
                pipeline.getStages()[-1].setParams(**params)

                avg_metric = []
                for fold, (train, val) in enumerate(foldList):
                    cvmodel = cross_validator.fit(train)
                    cv_pred = cvmodel.transform(val)
                    fold_metrics = evaluator.evaluate(cv_pred)

                    avg_metric.append(fold_metrics)

                    mlflow.log_metric(f"{fold}_metric_auc", fold_metrics)
                    mlflow.log_params(params)

                avg_metric = sum(avg_metric) / len(foldList)

                mlflow.log_metric("overall_auc", avg_metric)
                                
            return -avg_metric
                
        trials=Trials()
        best_params = fmin(fn=objectiveFunc, 
                        space=searchSpace, 
                        algo=tpe.suggest, 
                        trials=trials, 
                        max_evals=maxEvals)

        best_auc = -trials.best_trial["result"]["loss"]
        return best_params, best_auc

### Load Checkpointed Data for Modeling

In [None]:
# Load checkpointed dataframe
df_otpw = spark.read.parquet(f"{blob_url}/OTPW_60m_under")

train_data = spark.read.parquet(f"{blob_url}/OTPW_60m_train")
test_data = spark.read.parquet(f"{blob_url}/OTPW_60m_test")

### Experiments & Modeling

**Models**
- **Baseline:** Logistic Regression
- **Model 2:** Decision Tree
- **Model 3:** Random Forest
- **Model 4:** K-Nearest Neighbors (KNN)

**Sophisticated Models**
- **Model 5:** XGBoost
- **Model 6:** Multilayer Perceptron (MLP)
- **Model 7:** Long Short-Term Memory (LSTM)
- **Model 8:** Convolutional Neural Network (CNN)

In the first two phases of modeling, we explored several machine learning models: logistic regression, decision trees, random forests, and K-Nearest Neighbors (KNN). 

Logistic regression was employed for its simplicity and interpretability, providing insights into the significance of features. Decision trees and random forests were chosen due to their ability to handle non-linear relationships and handle feature interactions effectively. Meanwhile, KNN was tested for its suitability in capturing local patterns and considering the neighbors' labels. Given the immense size of our dataset, for KNN, we opted to work with a representative subset to efficiently identify patterns and optimize model performance.

To ensure robust evaluation, we employed cross-validation techniques, dividing the data into multiple folds, and measuring each model's performance on unseen data. Additionally, we used appropriate evaluation metrics such as accuracy, precision and recall
This phase of experiments and modeling phase will allowed us to gain valuable insights into the strengths and weaknesses of different algorithms in predicting flight delays. The results will guide our selection of the most suitable model for further optimization and deployment in the final phase of the algorithm development process.

#### Baseline Model: Logistic Regression Model

##### Picking Features for our Baseline

We wanted to include some basic feature selection in phases II and III. We took the cleaned data and created box plots comparing the various features against the two labels (DEP_DEL15). Box plots that appeared visually different were selected as features because they would serve as better diferentiators compared to other features.

**(The below box plot grid was not run to reduce the notebook download size)**

In [None]:
pandas_df = df_otpw.toPandas()

# Filter columns with data type int, float32, or float64
numerical_columns = pandas_df.select_dtypes(include=['int', 'float32', 'float64']).columns

# Create box plots for each numerical feature based on the target variable
target_col = "DEP_DEL15"

# Calculate the number of rows and columns for the subplots
num_plots = len(numerical_columns) - 1  # Exclude the target column from the plots
num_cols = 3  # Number of columns in the subplot grid
num_rows = math.ceil(num_plots / num_cols)

# Create the subplot grid
fig, axs = plt.subplots(num_rows, num_cols, figsize=(15, 5 * num_rows))

# Flatten the axis array in case we have only one row in the subplot grid
if num_rows == 1:
    axs = [axs]

# Iterate through each numerical feature column
for i, column in enumerate(numerical_columns):
    if column != target_col:  # Skip the target column
        row = i // num_cols
        col = i % num_cols
        
        axs[row][col].set_title(f"{column} vs. {target_col}")
        axs[row][col].set_xlabel(target_col)
        axs[row][col].set_ylabel(column)

        # Plot box plots for target values 0 and 1
        boxplot_data = [pandas_df[column][pandas_df[target_col] == 0],
                        pandas_df[column][pandas_df[target_col] != 0]]
        boxplot_labels = [f"{target_col}=0", f"{target_col}=1"]
        
        axs[row][col].boxplot(boxplot_data, labels=boxplot_labels)

# Remove any empty subplots
for i in range(len(numerical_columns), num_rows * num_cols):
    fig.delaxes(axs.flatten()[i])

# Adjust layout and spacing
plt.tight_layout()
plt.show()

The features used for the baseline logistic model in phases II and III to predict the target variable DEP_DEL15 are DAY_OF_MONTH, DISTANCE, and sum_lat_lon.
Below is a data dictionary for variables used. 

##### Data Dictionary:

DAY_OF_MONTH: Day of the month

DISTANCE: Distance between airports measured in miles

sum_lat_lon: An interaction feature combining Latitude of departure airport and Longitude of departure airport

DEP_DEL15: Departure Delay Indicator, 15 Minutes or More (1=Yes)

In [None]:
df_otpw_train = train_data.select(['DAY_OF_MONTH', 'DISTANCE', 'sum_lat_lon',  'DEP_DEL15'])
df_otpw_test = test_data.select(['DAY_OF_MONTH', 'DISTANCE', 'sum_lat_lon',  'DEP_DEL15'])

In [None]:
feature_columns = ['DAY_OF_MONTH', 'DISTANCE', 'sum_lat_lon']
target_label = 'DEP_DEL15'

# Prepare data
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
otpw_train_transform = assembler.transform(df_otpw_train)
otpw_test_transform = assembler.transform(df_otpw_test)

# Split the data into training,test,validation sets
# train_data, test_data, validation_data = otpw_model_transform.randomSplit([0.7, 0.15, 0.15], seed=100)

# Create a Logistic Regression model
lr = LogisticRegression(featuresCol="features", labelCol=target_label)

# Train the model on the training data
lr_model = lr.fit(otpw_train_transform)

In [None]:
summary = lr_model.summary
coefficients = lr_model.coefficients
coefficients_by_feature = [(feature_columns[i], coefficients[i]) for i in range(len(feature_columns))]

print("Coefficients:")
for feature, coeff in coefficients_by_feature:
    print(f"{feature}: {coeff}")
print("Area under ROC:", summary.areaUnderROC)
print("Accuracy:", summary.accuracy)

In [None]:
# 1. Make predictions on the test_data
predictions = lr_model.transform(otpw_test_transform)

# 2. Calculate accuracy
correct_predictions = predictions.filter(predictions['prediction'] == predictions[target_label])
accuracy = correct_predictions.count() / predictions.count()
print("Accuracy on test_data: {:.2f}%".format(accuracy * 100))

# 3. Calculate ROC and AUC
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol=target_label)
roc_auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
print("ROC AUC on test_data: {:.2f}".format(roc_auc))

##### Logistic Regression Hypertuning

In [None]:
# Set model
lr = LogisticRegression(labelCol=label_col,
                        featuresCol="vect_feat",
                        )

# Set evaluator and metrics
evaluator = BinaryClassificationEvaluator(labelCol=label_col, 
                                          metricName="areaUnderROC")

# SET SEARCH SPACE FOR HYPERTUNING
search_space = {"regParam": hp.uniform("regParam", 0, 1),
                "elasticNetParam": hp.uniform("elasticNetParam", 0, 1)
               }

# Set max evaluations
max_evals = 1

# Set model type, experiment number for logging
model_type = "lr"
experiment_num = 1

# Set pipeline
pipeline = pipelineFunc(lr, feat_col, label_col)

In [None]:
best_params, best_auc = hyperTuneFunc(split_data, pipeline, evaluator, model_type, experiment_num, lr_search_space, max_evals)

#### Model 2: Decision Tree Classifier

In [None]:
df_train_dt = train_data.withColumn("DISTANCE_GROUP", col("DISTANCE_GROUP").cast(FloatType()))
df_test_dt = test_data.withColumn("DISTANCE_GROUP", col("DISTANCE_GROUP").cast(FloatType()))

In [None]:
df_train_dt_model = df_train_dt.select(['DAY_OF_MONTH', 'MONTH', 'DISTANCE', 'sum_lat_lon', 'HourlyVisibility', 'HourlyPrecipitation', 'HourlyWindSpeed', 'ELEVATION', 'DISTANCE_GROUP', 'DEP_DEL15'])
df_test_dt_model = df_test_dt.select(['DAY_OF_MONTH', 'MONTH', 'DISTANCE', 'sum_lat_lon', 'HourlyVisibility', 'HourlyPrecipitation', 'HourlyWindSpeed', 'ELEVATION', 'DISTANCE_GROUP', 'DEP_DEL15'])
feature_columns = ['DAY_OF_MONTH', 'MONTH', 'DISTANCE', 'sum_lat_lon', 'HourlyVisibility', 'HourlyPrecipitation', 'HourlyWindSpeed', 'ELEVATION', 'DISTANCE_GROUP']
target_label = 'DEP_DEL15'

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
otpw_train_transform = assembler.transform(df_train_dt_model)
otpw_test_transform = assembler.transform(df_test_dt_model)

# train_data, test_data, validation_data = otpw_model_transform.randomSplit([0.7, 0.15, 0.15], seed=100)

In [None]:
dt_classifier = DecisionTreeClassifier(featuresCol="features", labelCol=target_label, maxBins=4, maxDepth=14)
dt_model = dt_classifier.fit(otpw_train_transform)

In [None]:
predictions = dt_model.transform(otpw_test_transform)

evaluator = BinaryClassificationEvaluator(labelCol=target_label)
accuracy = evaluator.evaluate(predictions)

multi_evaluator = MulticlassClassificationEvaluator(labelCol=target_label, metricName="accuracy")
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
# auc = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "areaUnderROC"})

print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
# print(f"AUC: {auc}")

In [None]:
predictions = dt_model.transform(otpw_test_transform)

evaluator = BinaryClassificationEvaluator(labelCol=target_label)
accuracy = evaluator.evaluate(predictions)

multi_evaluator = MulticlassClassificationEvaluator(labelCol=target_label, metricName="accuracy")
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
# auc = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "areaUnderROC"})

print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
# print(f"AUC: {auc}")

tree_model = dt_model.toDebugString
print(tree_model.split("\n")[0])

##### Decision Tree Hypertuning

In [None]:
# Set model
dt = DecisionTreeClassifier(featuresCol="vect_feat", labelCol=label_col)

# Set evaluator and metrics
evaluator = MulticlassClassificationEvaluator(labelCol=target_label, metricName="accuracy")

# SET SEARCH SPACE FOR HYPERTUNING
search_space = {
    "maxDepth": hp.choice("maxDepth", range(1, 30)),  # You can customize the range as needed
    "maxBins": hp.choice("maxBins", range(1, 30))
}

# Set max evaluations
max_evals = 1

# Set model type, experiment number for logging
model_type = "dt"
experiment_num = 1

# Set pipeline
pipeline = pipelineFunc(dt, feat_col, label_col)

In [None]:
best_params, best_acc = hyperTuneFunc(split_data, pipeline, evaluator, model_type, experiment_num, search_space, max_evals)

In [None]:
best_params 

#### Model 3: Random Forest

Random forest is an ensemble learning technique that combines multiple decision trees for prediction. Random forest works well on large datasets and when given multiple features. It understands feature importance, which makes it easy to identify which features are relevant for prediction.

##### Experiment 1

In [None]:
df_train_rf_model = train_data.select(['DAY_OF_MONTH', 'MONTH', 'DISTANCE', 'sum_lat_lon', 'HourlyVisibility', 'HourlyWindSpeed', 'ELEVATION', 'DEP_DEL15'])
df_test_rf_model = test_data.select(['DAY_OF_MONTH', 'MONTH', 'DISTANCE', 'sum_lat_lon', 'HourlyVisibility', 'HourlyWindSpeed', 'ELEVATION', 'DEP_DEL15'])
feature_columns = ['DAY_OF_MONTH', 'MONTH', 'DISTANCE', 'sum_lat_lon', 'HourlyVisibility', 'HourlyWindSpeed', 'ELEVATION' ]
target_label = 'DEP_DEL15'

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
otpw_train_transform = assembler.transform(df_train_rf_model)
otpw_test_transform = assembler.transform(df_test_rf_model)

# train_data, test_data, validation_data = otpw_model_transform.randomSplit([0.7, 0.15, 0.15], seed=100)

In [None]:
num_trees = 50
rf = RandomForestClassifier(featuresCol="features", labelCol=target_label,  numTrees=num_trees, maxDepth = 5)
rf_model = rf.fit(otpw_train_transform)

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-3984215836264415>:3[0m
[1;32m      1[0m num_trees [38;5;241m=[39m [38;5;241m50[39m
[1;32m      2[0m rf [38;5;241m=[39m RandomForestClassifier(featuresCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mfeatures[39m[38;5;124m"[39m, labelCol[38;5;241m=[39mtarget_label,  numTrees[38;5;241m=[39mnum_trees, maxDepth [38;5;241m=[39m [38;5;241m5[39m)
[0;32m----> 3[0m rf_model [38;5;241m=[39m rf[38;5;241m.[39mfit(otpw_train_transform)

File [0;32m/databricks/python_shell/dbruntime/MLWorkloadsInstrumentation/_pyspark.py:30[0m, in [0;36m_create_patch_function.<locals>.patched_method[0;34m(self, *args, **kwargs)[0m
[1;32m     28[0m call_succeeded [38;5;241m=[39m [38;5;28;01mFalse[39;00m
[1;32m     29[0m [38;5;28;01mtry[39;00m:
[0;32m---> 30[0m     result [38;5;241m

In [None]:
predictions = rf_model.transform(otpw_test_transform)

evaluator = MulticlassClassificationEvaluator(labelCol=target_label, predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

In [None]:
rf_model.featureImportances

##### Experiment 2

In [None]:
df_train_rf_model = train_data.select(['DAY_OF_MONTH', 'MONTH', 'sum_lat_lon', 'HourlyVisibility', 'ELEVATION', 'DEP_DEL15'])
df_test_rf_model = test_data.select(['DAY_OF_MONTH', 'MONTH', 'sum_lat_lon', 'HourlyVisibility', 'ELEVATION', 'DEP_DEL15'])
feature_columns = ['DAY_OF_MONTH', 'MONTH', 'sum_lat_lon', 'HourlyVisibility',  'ELEVATION' ]
target_label = 'DEP_DEL15'

# Prepare data
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
otpw_train_transform = assembler.transform(df_train_rf_model)
otpw_test_transform = assembler.transform(df_test_rf_model)

# Split the data into training,test,validation sets
# train_data, test_data, validation_data = otpw_model_transform.randomSplit([0.7, 0.15, 0.15], seed=100)
num_trees = 50
rf = RandomForestClassifier(featuresCol="features", labelCol=target_label,  numTrees=num_trees, maxDepth = 5)
rf_model = rf.fit(otpw_train_transform)

In [None]:
predictions = rf_model.transform(otpw_test_transform)

evaluator = MulticlassClassificationEvaluator(labelCol=target_label, predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

In [None]:
rf_model.featureImportances

##### Experiment 3

In [None]:
df_train_rf_model = train_data.select(['DAY_OF_MONTH', 'MONTH', 'sum_lat_lon', 'HourlyVisibility', 'ELEVATION', 'DEP_DEL15'])
df_test_rf_model = test_data.select(['DAY_OF_MONTH', 'MONTH', 'sum_lat_lon', 'HourlyVisibility', 'ELEVATION', 'DEP_DEL15'])
feature_columns = ['DAY_OF_MONTH', 'MONTH', 'sum_lat_lon', 'HourlyVisibility', 'ELEVATION' ]
target_label = 'DEP_DEL15'

# Prepare data
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
otpw_train_transform = assembler.transform(df_train_rf_model)
otpw_test_transform = assembler.transform(df_test_rf_model)

# Split the data into training,test,validation sets
# train_data, test_data, validation_data = otpw_model_transform.randomSplit([0.7, 0.15, 0.15], seed=100)
num_trees = 2
rf = RandomForestClassifier(featuresCol="features", labelCol=target_label,  numTrees=num_trees, maxDepth = 5)
rf_model = rf.fit(otpw_train_transform)

In [None]:
predictions = rf_model.transform(otpw_test_transform)

evaluator = MulticlassClassificationEvaluator(labelCol=target_label, predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)

evaluator_precision = MulticlassClassificationEvaluator(labelCol=target_label, predictionCol="prediction", metricName="weightedPrecision")
precision = evaluator_precision.evaluate(predictions)

evaluator_recall = MulticlassClassificationEvaluator(labelCol=target_label, predictionCol="prediction", metricName="weightedRecall")
recall = evaluator_recall.evaluate(predictions)
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")


In [None]:
rf_model.featureImportances

##### Random Forest Hypertuning

In [None]:
# Set model
rf = RandomForestClassifier(featuresCol="vect_feat", labelCol=label_col)

# Set evaluator and metrics
evaluator = MulticlassClassificationEvaluator(labelCol=target_label, predictionCol="prediction", metricName="accuracy")

# SET SEARCH SPACE FOR HYPERTUNING
search_space = {
    "maxDepth": hp.choice("maxDepth", range(1, 21)),  # You can customize the range as needed
    "numTrees": hp.choice("numTrees", range(10, 101, 10)),  # You can customize the range as needed
}

# Set max evaluations
max_evals = 1

# Set model type, experiment number for logging
model_type = "rf"
experiment_num = 1

# Set pipeline
pipeline = pipelineFunc(rf, feat_col, label_col)

In [None]:
# Run the function with the parameters to run experiment and produce best hyperparameters
best_params, best_auc = hyperTuneFunc(split_data, pipeline, evaluator, model_type, experiment_num, search_space, max_evals)

In [None]:
best_params

#### Model 4: K-Nearest Neighbors

k-Nearest Neighbors (KNN) is often unsuitable for large datasets due to its computational and memory intensiveness, as it requires calculating and storing distances between each pair of data points, which scales quadratically with dataset size. Applying KNN on a smaller sample of the data is a more feasible approach. By applying a KNN to a small sample of our data, we hope to gain valuable insights into local patterns.

In [None]:
print(type(X_train))

<class 'pandas.core.frame.DataFrame'>


In [None]:
# Pick Values
feature_col = ['DAY_OF_MONTH', 'MONTH', 'HourlyVisibility', 'HourlyStationPressure', 'HourlyWindGustSpeed', 'ORIGIN_AIRPORT_ID', 'DEST_AIRPORT_ID', 'TWO_HR_DELAY']

target_label = 'TWO_HR_DELAY'
  
# Filter and select necessary columns
train_data_df = train_data.select(*feature_col).filter(F.col('HourlyVisibility').isNotNull())
test_data_df = test_data.select(*feature_col).filter(F.col('HourlyVisibility').isNotNull())

# Shuffle the data and limit to 10,000 rows
train_data_sample = train_data_df.orderBy(F.rand(seed=42)).limit(10000)
test_data_sample = test_data_df.orderBy(F.rand(seed=42)).limit(10000)

# Extracting features and target
X_train = train_data_sample.drop(target_label)
y_train = train_data_sample.select(target_label).toPandas()[target_label]

X_test = test_data_sample.drop(target_label)
y_test = test_data_sample.select(target_label).toPandas()[target_label]


In [None]:
# KNN for feature selection
# Specify your feature columns and target label
feature_columns = ['DAY_OF_MONTH', 'MONTH', 'HourlyVisibility', 'HourlyStationPressure', 'HourlyWindGustSpeed', 'ORIGIN_AIRPORT_ID', 'DEST_AIRPORT_ID']

target_label = 'TWO_HR_DELAY'

# Create a KNN classifier
knn = KNeighborsClassifier(n_neighbors=5)

# Feature selection using KNN
def feature_selection_knn(X_train, y_train, X_val, y_val, model, feature_columns):
    remaining_features = list(feature_columns)
    best_score = 0
    
    while len(remaining_features) > 0:
        scores_with_feature_removed = []
        
        for feature in remaining_features:
            model_clone = clone(model)
            
            X_train_reduced = X_train.drop(feature).toPandas()
            X_val_reduced = X_val.drop(feature).toPandas()
            
            model_clone.fit(X_train_reduced, y_train.ravel())
            y_pred_val = model_clone.predict(X_val_reduced)
            score = accuracy_score(y_val.ravel(), y_pred_val)
            
            scores_with_feature_removed.append((feature, score))
        
        scores_with_feature_removed.sort(key=lambda x: x[1], reverse=True)
        best_feature_removed, best_new_score = scores_with_feature_removed[0]
        
        if best_new_score > best_score:
            print("Best Score:", best_score)
            best_score = best_new_score
            remaining_features.remove(best_feature_removed)
        else:
            break

    return remaining_features

# Run the feature selection
selected_features = feature_selection_knn(X_train, y_train, X_test, y_test, knn, feature_columns)

print("Selected features:", selected_features)


  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mo

Best Score: 0


  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mo

Selected features: ['DAY_OF_MONTH', 'HourlyVisibility', 'HourlyStationPressure', 'HourlyWindGustSpeed', 'ORIGIN_AIRPORT_ID', 'DEST_AIRPORT_ID']


In [None]:
# Define your feature columns and target label
feature_col = ['DAY_OF_MONTH', 'HourlyVisibility', 'HourlyStationPressure', 'HourlyWindGustSpeed', 'ORIGIN_AIRPORT_ID', 'DEST_AIRPORT_ID', 'TWO_HR_DELAY']
target_label = 'TWO_HR_DELAY'
  
# Filter and select necessary columns
train_data_df = train_data.select(*feature_col).filter(F.col('HourlyVisibility').isNotNull())
test_data_df = test_data.select(*feature_col).filter(F.col('HourlyVisibility').isNotNull())

# Shuffle the data and limit to 170,000 rows
train_data_sample = train_data_df.orderBy(F.rand(seed=42)).limit(170000)
test_data_sample = test_data_df.orderBy(F.rand(seed=42)).limit(170000)

# Extracting features and target
X_train = train_data_sample.drop(target_label).toPandas()
y_train = train_data_sample.select(target_label).toPandas()[target_label]

X_test = test_data_sample.drop(target_label).toPandas()
y_test = test_data_sample.select(target_label).toPandas()[target_label]

# Create a KNN classifier
knn = KNeighborsClassifier(n_neighbors=5)

# Fit the classifier to the training data
knn.fit(X_train, y_train)

# Predict the labels for the test data
y_pred = knn.predict(X_test)

# Calculate the accuracy of the model
accuracy_test = accuracy_score(y_test, y_pred)
accuracy_train = accuracy_score(y_train, knn.predict(X_train))

# Calculate precision and recall of the model
precision = precision_score(y_test, y_pred, average='binary')
recall = recall_score(y_test, y_pred, average='binary')

# Perform k-fold cross-validation
scores = cross_val_score(knn, X_train, y_train, cv=5)

print(f"Train Accuracy: {accuracy_train:.4f}")
print(f"Test Accuracy: {accuracy_test:.4f}")
print(f"Test Precision: {precision:.4f}")
print(f"Test Recall: {recall:.4f}")
print(f"Cross-validation scores: {scores}")
print(f"Mean cross-validation score: {scores.mean():.4f}")



  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)
  mode, _ = stats.mode(_y[neigh_ind, k], axis=1)


Train Accuracy: 0.9022
Test Accuracy: 0.9162
Test Precision: 0.0780
Test Recall: 0.0089
Cross-validation scores: [0.89370588 0.89305882 0.89297059 0.893      0.89391176]
Mean cross-validation score: 0.8933


In [None]:
# Change to look at different features
feature_1 = 'ORIGIN_AIRPORT_ID'
feature_2 = 'HourlyVisibility'

# Get indices of features
index_1 = feature_columns.index(feature_1)
index_2 = feature_columns.index(feature_2)

# Create a scatter plot
plt.figure(figsize=(10, 6))
scatter = plt.scatter(X_train[:, index_1], X_train[:, index_2], c=y_train)

# Add labels and title
plt.xlabel(feature_1)
plt.ylabel(feature_2)
plt.title('KNN: Distance vs Hourly Visibility')

plt.show()


#### Sophisticated Models

Predicting flight delays using flight and weather data requires understanding and discovery of underlying behaviors and complex relationships. Many times these insights are too intricate to unearth with more sophisticated models. To detect these patterns and accurately make predictions, models like XGBoost, Multilayer Perceptrons (MLP), Long Short-Term Memory (LSTM), and 1D Convolutional Neural Networks (CNN) were explored to expand the search landscape.

XGBoost was an enhancement to the decision tree/random forest modeling, effectively boosting to capture interactions between flight delays and other patterns within the dataset.

The MLP, on the other hand, was a generalizable and versatile neural network that also learned patterns in the data while establishing both linear and nonlinear relationships.

The LSTM, a type of recurrent neural network, was also used to capture features considered time-dependent, such as historical delays and seasonal trends. It is able to recognize recurrent patterns through the use of memory cells that retain information over time.

The 1D CNN was implemented to focus on local pattern recognition and extracting features from the sequential data. The goal was to have the CNN identify distinctive patterns in flight delay data that may contribute to predictive accuracy.

By exploring these advanced models, we enhance our predictive capabilities in constructing a comprehensive framework for forecasting future flight delays.

#### Model 5: XGBoost

XGBoost is specifically designed for classification and regression problems and designed for large datasets. The model is a good choice for the data, since we are trying to classify whether a flight is delayed or not delayed. We are also dealing with millions of rows in the data. 

Additionally, XGBoost utilizes decision trees as base learners. The model also provides an attribute to see which features are important for better understanding of the model.

In [None]:
df_train_xgb_model = train_data.select(['DAY_OF_MONTH', 'MONTH','ORIGIN_AIRPORT_ID', 'sum_lat_lon', 'HourlyStationPressure','HourlyWindGustSpeed', 'TWO_HR_DELAY'])
df_test_xgb_model = test_data.select(['DAY_OF_MONTH', 'MONTH','ORIGIN_AIRPORT_ID', 'sum_lat_lon', 'HourlyStationPressure','HourlyWindGustSpeed', 'TWO_HR_DELAY'])
feature_columns = ['DAY_OF_MONTH', 'MONTH','ORIGIN_AIRPORT_ID', 'sum_lat_lon', 'HourlyStationPressure','HourlyWindGustSpeed']
target_label = 'TWO_HR_DELAY'

# Prepare data
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
otpw_train_transform = assembler.transform(df_train_xgb_model)
otpw_test_transform = assembler.transform(df_test_xgb_model)

# Define XGBoost Classifier
xgb_classifier = SparkXGBRegressor(features_col="features", label_col=target_label,num_workers=2,)
xgb_model = xgb_classifier.fit(otpw_train_transform)



In [None]:
# Make predictions on the test data
predictions = xgb_model.transform(otpw_test_transform)
predictions = predictions.select("prediction", target_label)

# Set the threshold for binary predictions
threshold = 0.5

# Add a new column for binary predictions
predictions = predictions.withColumn("binary_prediction", F.when(F.col("prediction") > threshold, 1.0).otherwise(0.0))

# Calculate true positives, false positives, true negatives, false negatives
tp = predictions.filter((F.col("binary_prediction") == 1.0) & (F.col(target_label) == 1.0)).count()
fp = predictions.filter((F.col("binary_prediction") == 1.0) & (F.col(target_label) == 0.0)).count()
tn = predictions.filter((F.col("binary_prediction") == 0.0) & (F.col(target_label) == 0.0)).count()
fn = predictions.filter((F.col("binary_prediction") == 0.0) & (F.col(target_label) == 1.0)).count()

# Calculate accuracy, precision, and recall
accuracy = (tp + tn) / (tp + tn + fp + fn)
precision = tp / (tp + fp)
recall = tp / (tp + fn)

print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")

Accuracy: 0.9225314248327361
Precision: 0.3172043010752688
Recall: 0.00020608257973474728


#### Model 6: Multilayer Perceptron (MLP)

To introduce deep learning and neural network-based models, we began with the versatile multilayer perceptron model containing a single hidden layer as a base comparison.

MLPs are known for their flexibility and adaptability to a wide variety of datasets by learning which features are important. They scale well with large datasets, and can identify non-linear patterns and complexities amongst different features. MLPs are also able to generalize well over unseen data, hence having robust prediction abilities. Flight delay data changes over time (i.e.; through scheduling, demand, seasons, etc.), and MLPs can adapt accordingly to its dynamic nature.

From here we continue our exploration into deep learning with more advanced models.

##### Experiment 1

In [None]:
df_train_mlp = train_data.select(['DAY_OF_MONTH', 'MONTH', 'ORIGIN_AIRPORT_ID', 'sum_lat_lon', 'HourlyVisibility', 
                                  'HourlyStationPressure', 'HourlyWindGustSpeed', 'TWO_HR_DELAY'])
df_test_mlp = test_data.select(['DAY_OF_MONTH', 'MONTH', 'ORIGIN_AIRPORT_ID', 'sum_lat_lon', 'HourlyVisibility', 
                                 'HourlyStationPressure', 'HourlyWindGustSpeed', 'TWO_HR_DELAY'])
                                 
feature_columns = ['DAY_OF_MONTH', 'MONTH', 'ORIGIN_AIRPORT_ID', 'sum_lat_lon', 'HourlyVisibility', 
                   'HourlyStationPressure', 'HourlyWindGustSpeed']
target_label = 'TWO_HR_DELAY'

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
otpw_train_transform = assembler.transform(df_train_mlp)
otpw_test_transform = assembler.transform(df_test_mlp)

In [None]:
layers = [len(feature_columns), 4, 2]
mlp = MultilayerPerceptronClassifier(featuresCol="features", 
                                     labelCol=target_label, 
                                     layers=layers, seed=42)

model = mlp.fit(otpw_train_transform)

pred = model.transform(otpw_test_transform)

metric_list = ["accuracy", "weightedPrecision", "weightedRecall", "f1"]
for metric in metric_list:
    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", 
                                                labelCol=target_label, 
                                                metricName=metric
                                                )
    result = evaluator.evaluate(pred)

    print(f"{metric}: {result}")

accuracy: 0.9225987010506778
weightedPrecision: 0.8662997477094988
weightedRecall: 0.9225987010506778
f1: 0.8855039605891918


In [None]:
wlayers = model.layers
wlayers

Param(parent='MultilayerPerceptronClassifier_bd71658a548e', name='layers', doc='Sizes of layers from input layer to output layer E.g., Array(780, 100, 10) means 780 inputs, one hidden layer with 100 neurons and output layer of 10 neurons.')

##### Experiment 2

In [None]:
df_train_mlp = train_data.select(['DAY_OF_MONTH', 'MONTH', 'ORIGIN_AIRPORT_ID', 'sum_lat_lon', 'HourlyVisibility', 
                                  'HourlyStationPressure', 'HourlyWindGustSpeed', 'CARRIER_DELAY', 'WEATHER_DELAY', 'NAS_DELAY', 'SECURITY_DELAY', 'LATE_AIRCRAFT_DELAY', 'DEP_DELAY', 'TWO_HR_DELAY'])
df_test_mlp = test_data.select(['DAY_OF_MONTH', 'MONTH', 'ORIGIN_AIRPORT_ID', 'sum_lat_lon', 'HourlyVisibility', 
                                 'HourlyStationPressure', 'HourlyWindGustSpeed', 'CARRIER_DELAY', 'WEATHER_DELAY', 'NAS_DELAY', 'SECURITY_DELAY', 'LATE_AIRCRAFT_DELAY', 'DEP_DELAY', 'TWO_HR_DELAY'])
                                 
feature_columns = ['DAY_OF_MONTH', 'MONTH', 'ORIGIN_AIRPORT_ID', 'sum_lat_lon', 'HourlyVisibility', 
                   'HourlyStationPressure', 'HourlyWindGustSpeed', 'CARRIER_DELAY', 'WEATHER_DELAY', 'NAS_DELAY', 
                   'SECURITY_DELAY', 'LATE_AIRCRAFT_DELAY', 'DEP_DELAY']
target_label = 'TWO_HR_DELAY'

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
otpw_train_transform = assembler.transform(df_train_mlp)
otpw_test_transform = assembler.transform(df_test_mlp)

In [None]:
layers = [len(feature_columns), 4, 2]
mlp = MultilayerPerceptronClassifier(featuresCol="features", 
                                     labelCol=target_label, 
                                     layers=layers, seed=42)

model = mlp.fit(otpw_train_transform)

pred = model.transform(otpw_test_transform)

metric_list = ["accuracy", "weightedPrecision", "weightedRecall", "f1"]
for metric in metric_list:
    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", 
                                                labelCol=target_label, 
                                                metricName=metric
                                                )
    result = evaluator.evaluate(pred)

    print(f"{metric}: {result}")

accuracy: 0.9075594122862221
weightedPrecision: 0.9265980176160474
weightedRecall: 0.9075594122862221
f1: 0.9152963687591603


In [None]:
wlayers = model.layers
wlayers

Param(parent='MultilayerPerceptronClassifier_4a6c7cf7e509', name='layers', doc='Sizes of layers from input layer to output layer E.g., Array(780, 100, 10) means 780 inputs, one hidden layer with 100 neurons and output layer of 10 neurons.')

##### Experiment 3

This experiment used the features from Experiment 2, but with hypertuned parameters.

In [None]:
blockSize = 2
hidden_size_1 = 2
layers = [13, hidden_size_1, 2]
maxIter = 2
stepSize = 0.04908798897714254

layers = [len(feature_columns), 4, 2]
mlp = MultilayerPerceptronClassifier(
                                     featuresCol="features", 
                                     labelCol=target_label, 
                                     layers=layers, 
                                     blockSize=blockSize, 
                                     maxIter=maxIter, 
                                     stepSize=stepSize, 
                                     seed=42
                                    )


model = mlp.fit(otpw_train_transform)

pred = model.transform(otpw_test_transform)

metric_list = ["accuracy", "weightedPrecision", "weightedRecall", "f1"]
for metric in metric_list:
    evaluator = MulticlassClassificationEvaluator(
                                                  predictionCol="prediction", 
                                                  labelCol=target_label, 
                                                  metricName=metric
                                                  )
    result = evaluator.evaluate(pred)

    print(f"{metric}: {result}")

accuracy: 0.922628926826379
weightedPrecision: 0.8512441366167958
weightedRecall: 0.922628926826379
f1: 0.8855001864784349


##### MLP Hypertuning

In [None]:
# Set dataset
mlp_feat_col = [col for col in df_train_mlp.columns if col != "TWO_HR_DELAY"]
mlp_label_col = "TWO_HR_DELAY"
mlp_data = (df_train_mlp, df_test_mlp)

# Set model
mlp = MultilayerPerceptronClassifier(featuresCol="vect_feat", 
                                     labelCol=mlp_label_col, 
                                     seed=42)

# Set evaluator and metrics
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", 
                                              labelCol=mlp_label_col, 
                                              metricName="accuracy")

# Set layer sizes
input_size = len(mlp_feat_col)
output_size = 2
hs1 = hp.choice("hidden_size_1", [4, 5, 10])
hs2 = hp.choice("hidden_size_2", [3, 4, 6])
hs3 = hp.choice("hidden_size_3", [4, 6, 8])

# Set search space for hypertuning
mlp_search_space = {
                "layers": hp.choice("layers", [
                        [input_size, hs1, output_size],
                        [input_size, hs1, hs2, output_size],
                        [input_size, hs1, hs2, hs3, output_size],
                        ]),
                "blockSize": hp.choice("blockSize", [32, 64, 128]),
                "maxIter": hp.choice("maxIter", [10, 15, 20]),
                "stepSize": hp.uniform("stepSize", 0.01, 0.1),
                }


# Set max evaluations
max_evals = 1

# Set model type, experiment number for logging
model_type = "mlp"
experiment_num = 1

# Set pipeline
pipeline = pipelineFunc(mlp, mlp_feat_col, mlp_label_col)

In [None]:
best_params, best_acc = hyperTuneFunc(mlp_data, pipeline, evaluator, model_type, experiment_num, mlp_search_space, max_evals)

  0%|          | 0/1 [00:00<?, ?trial/s, best loss=?]100%|██████████| 1/1 [05:34<00:00, 334.17s/trial, best loss: -0.9226655230570485]100%|██████████| 1/1 [05:34<00:00, 334.17s/trial, best loss: -0.9226655230570485]


In [None]:
print(f"Best Params: {best_params}")
print(f"Best Accuracy: {best_acc}")

Best Params: {'blockSize': 2, 'hidden_size_1': 2, 'layers': 0, 'maxIter': 2, 'stepSize': 0.04908798897714254}
Best Accuracy: 0.9226655230570485


#### Model 7: Long Short-Term Memory (LSTM)

LSTM networks offer a promising avenue for predicting flight delays due to their adeptness in grasping intricate temporal connections within sequential data. These models, when applied to flight delay prediction, harness a blend of weather and flight data for precise forecasts.

These models can process varying sequence lengths adeptly, accommodating irregular intervals in flight data due to unforeseen events. However, in our endeavor to apply LSTMs within PySpark for flight delay prediction, we encountered a limitation. The scale of our Spark DataFrame was too extensive to fit into a single numpy array for training the LSTM model.

In [None]:
# Count the number of null values in each column
display(df_otpw.select([count(when(df_otpw[c].isNull(), c)).alias(c) for c in df_otpw.columns]))

QUARTER,YEAR,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,FL_DATE,CRS_DEP_TIME,DEP_DELAY_GROUP,CRS_ARR_TIME,ARR_TIME,ARR_DELAY_GROUP,CANCELLATION_CODE,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,DISTANCE,sched_depart_date_time,two_hours_prior_depart_UTC,dest_airport_lon,dest_airport_lat,STATION,DATE,LATITUDE,LONGITUDE,ELEVATION,DEP_TIME,HourlyPrecipitation,HourlyVisibility,HourlyPresentWeatherType,HourlySkyConditions,HourlyPressureChange,HourlyPressureTendency,HourlyStationPressure,HourlyWindDirection,HourlyWindGustSpeed,HourlyWindSpeed,ARR_DELAY,ARR_DEL15,DEP_DEL15,DEP_TIME_BLK_DET,HOUR_BLK,HOUR_BLK_2,TWO_HR_PROB,TWO_HR_DELAY,OP_UNIQUE_CARRIER,ORIGIN_AIRPORT_ID,ORIGIN_CITY_MARKET_ID,ORIGIN_STATE_FIPS,DEST_AIRPORT_ID,DEST_CITY_MARKET_ID,DEST_STATE_FIPS,DEP_TIME_BLK,ARR_TIME_BLK,CANCELLED,DIVERTED,DISTANCE_GROUP,i_HOUR_BLK,delay_features,pca_delay_features,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,DEP_DELAY,sum_lat_lon
0,0,0,0,0,0,0,475497,0,500080,568684,31182239,163,566086,566086,0,0,0,0,0,0,0,0,0,31670371,470530,28318649,47894,20812402,3045741,20805185,0,0,27513512,0,31613185,203939,203939,0,0,720964,720964,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [None]:
display(train_data.limit(100))

QUARTER,YEAR,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,FL_DATE,CRS_DEP_TIME,DEP_DELAY_GROUP,CRS_ARR_TIME,ARR_TIME,ARR_DELAY_GROUP,CANCELLATION_CODE,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,DISTANCE,sched_depart_date_time,two_hours_prior_depart_UTC,dest_airport_lon,dest_airport_lat,STATION,DATE,LATITUDE,LONGITUDE,ELEVATION,DEP_TIME,HourlyPrecipitation,HourlyVisibility,HourlyPresentWeatherType,HourlySkyConditions,HourlyPressureChange,HourlyPressureTendency,HourlyStationPressure,HourlyWindDirection,HourlyWindGustSpeed,HourlyWindSpeed,ARR_DELAY,ARR_DEL15,DEP_DEL15,DEP_TIME_BLK_DET,HOUR_BLK,HOUR_BLK_2,TWO_HR_PROB,OP_UNIQUE_CARRIER,ORIGIN_AIRPORT_ID,ORIGIN_CITY_MARKET_ID,ORIGIN_STATE_FIPS,DEST_AIRPORT_ID,DEST_CITY_MARKET_ID,DEST_STATE_FIPS,DEP_TIME_BLK,ARR_TIME_BLK,CANCELLED,DIVERTED,DISTANCE_GROUP,i_HOUR_BLK,delay_features,pca_delay_features,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,DEP_DELAY,sum_lat_lon,TWO_HR_DELAY
1,2015,2,9,1,2015-02-09,1154,5.0,1255,1409.0,4.0,,61.0,49.0,34.0,135.0,2015-02-09T17:54:00,72531594870,-87.9,42.0,2015-02-09T14:53:00,40.03972,-88.27778,229.8,,1320.0,,3.332204510175204,0.0,30.15,1.791759469228055,4.454347296253507,2.3978952727983707,,2.772588722239781,,74.0,1.0,1.0,1100-1159,1100,1300,0.875,8.0,211.0,190.0,4.0,1.0,0.0,4.0,4.0,11.0,0.0,0.0,3.0,4.0,"Map(vectorType -> dense, length -> 6, values -> List(0.0, 12.0, 0.0, 0.0, 62.0, 86.0))","Map(vectorType -> dense, length -> 3, values -> List(-2.407122648693878, -0.7254590396798071, -0.021467096194487355))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(1.0412069980146033))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(2.7031851371988314))","Map(vectorType -> dense, length -> 1, values -> List(1.9603658463490263))",-45.9,1
2,2017,6,1,4,2017-06-01,1659,3.0,1844,1921.0,2.0,,285.0,275.0,251.0,1919.0,2017-06-01T20:59:00,72219013874,-117.9,33.7,2017-06-01T17:52:00,33.6301,-84.4418,307.8,,1746.0,-RA:02 |RA |,4.248495242049359,,30.03,,4.382026634673881,2.3978952727983707,,2.19722457733622,,37.0,1.0,1.0,1600-1659,1600,1800,1.0,1.0,0.0,1.0,3.0,41.0,3.0,0.0,10.0,3.0,0.0,0.0,8.0,10.0,"Map(vectorType -> dense, length -> 6, values -> List(4.0, 0.0, 0.0, 0.0, 33.0, 47.0))","Map(vectorType -> dense, length -> 3, values -> List(-1.0835071803869059, 0.11717704666506264, 0.013836895149833134))","Map(vectorType -> dense, length -> 1, values -> List(0.1510457717928727))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(1.4387920891542167))","Map(vectorType -> dense, length -> 1, values -> List(1.0713627299814446))",-84.2,1
2,2015,6,9,2,2015-06-09,1220,1.0,1337,1351.0,0.0,,137.0,123.0,104.0,813.0,2015-06-09T16:20:00,72219013874,-97.7,30.2,2015-06-09T12:52:00,33.6301,-84.4418,307.8,,1248.0,,4.330733340286331,0.06,29.85,2.19722457733622,4.060443010546419,2.3978952727983707,,2.302585092994046,,14.0,0.0,1.0,1200-1259,1200,1400,1.0,1.0,0.0,1.0,3.0,32.0,25.0,1.0,5.0,8.0,0.0,0.0,2.0,5.0,"Map(vectorType -> sparse, length -> 6, indices -> List(5), values -> List(28.0))","Map(vectorType -> dense, length -> 3, values -> List(-0.07167423509719416, 0.04878841224751195, -0.007670199991821314))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.6382586476485201))",-67.5,1
4,2016,10,7,5,2016-10-07,1735,1.0,2041,2059.0,1.0,,126.0,127.0,93.0,599.0,2016-10-07T22:35:00,72530094846,-81.0,35.2,2016-10-07T18:51:00,41.96019,-87.93162,201.8,,1752.0,,3.8918202981106265,,30.18,,4.143134726391533,2.3978952727983707,,2.302585092994046,,18.0,1.0,1.0,1700-1759,1700,1900,1.0,4.0,1.0,0.0,4.0,7.0,11.0,7.0,3.0,10.0,0.0,0.0,1.0,3.0,"Map(vectorType -> dense, length -> 6, values -> List(17.0, 0.0, 1.0, 0.0, 0.0, 17.0))","Map(vectorType -> dense, length -> 3, values -> List(-0.21270747443373558, 0.3627276892397138, -0.03116979686848626))","Map(vectorType -> dense, length -> 1, values -> List(0.641944530119709))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.062468629186157115))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.38751417892945866))",-45.8,1
3,2018,8,11,6,2018-08-11,1820,11.0,2122,9.0,11.0,,122.0,111.0,90.0,723.0,2018-08-11T23:20:00,72530094846,-73.8,42.7,2018-08-11T19:51:00,41.96019,-87.93162,201.8,,2118.0,,4.204692619390966,,30.01,,3.912023005428146,2.3978952727983707,,0.0,,167.0,1.0,1.0,1800-1859,1800,2000,1.0,4.0,1.0,0.0,4.0,85.0,70.0,5.0,12.0,1.0,0.0,0.0,1.0,12.0,"Map(vectorType -> dense, length -> 6, values -> List(30.0, 0.0, 0.0, 0.0, 137.0, 178.0))","Map(vectorType -> dense, length -> 3, values -> List(-5.628125725365476, 0.6369787089450644, 0.06230187385719516))","Map(vectorType -> dense, length -> 1, values -> List(1.1328432884465454))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(5.97316715800387))","Map(vectorType -> dense, length -> 1, values -> List(4.05750140290845))",-31.100002,1
3,2017,7,24,1,2017-07-24,1240,2.0,1745,1812.0,1.0,,185.0,180.0,162.0,1371.0,2017-07-24T19:40:00,72386023169,-90.4,38.8,2017-07-24T15:56:00,36.0719,-115.1634,664.5,,1312.0,,4.276666119016055,0.02,29.74,2.19722457733622,3.4657359027997265,2.3978952727983707,,2.079441541679836,,27.0,1.0,1.0,1200-1259,1200,1400,0.7142857142857143,0.0,9.0,13.0,11.0,29.0,23.0,17.0,5.0,7.0,0.0,0.0,6.0,5.0,"Map(vectorType -> sparse, length -> 6, indices -> List(4, 5), values -> List(27.0, 32.0))","Map(vectorType -> dense, length -> 3, values -> List(-0.6601480715211258, 0.033046181756635186, 0.014561530311001807))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(1.1771935274898138))","Map(vectorType -> dense, length -> 1, values -> List(0.7294384544554516))",-51.600002,1
1,2016,1,9,6,2016-01-09,800,1.0,1050,1105.0,1.0,,170.0,159.0,144.0,1097.0,2016-01-09T13:00:00,74783012849,-73.7,41.1,2016-01-09T09:53:00,26.07875,-80.16217,3.4,,826.0,,4.290459441148391,-0.06,29.93,0.6931471805599453,4.442651256490317,2.3978952727983707,,1.3862943611198906,,15.0,1.0,1.0,0800-0859,800,1000,1.0,6.0,22.0,9.0,2.0,95.0,2.0,5.0,2.0,5.0,0.0,0.0,4.0,2.0,"Map(vectorType -> sparse, length -> 6, indices -> List(0, 5), values -> List(15.0, 26.0))","Map(vectorType -> dense, length -> 3, values -> List(-0.30364006750737116, 0.3524382416433378, -0.0281186417980711))","Map(vectorType -> dense, length -> 1, values -> List(0.5664216442232727))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.5926687442450544))",-32.6,1
1,2015,2,8,7,2015-02-08,1815,5.0,2142,2310.0,5.0,,387.0,390.0,351.0,2422.0,2015-02-08T23:15:00,74486094789,-122.3,47.4,2015-02-08T19:22:00,40.63915,-73.76401,3.4,,1940.0,,3.555348061489413,,,,4.406719247264253,2.3978952727983707,,2.302585092994046,,88.0,1.0,1.0,1800-1859,1800,2000,1.0,1.0,18.0,2.0,5.0,12.0,16.0,12.0,12.0,1.0,0.0,0.0,7.0,12.0,"Map(vectorType -> dense, length -> 6, values -> List(72.0, 0.0, 3.0, 0.0, 13.0, 85.0))","Map(vectorType -> dense, length -> 3, values -> List(-2.5346259238773126, 1.449438371495169, -0.09740881144534272))","Map(vectorType -> dense, length -> 1, values -> List(2.7188238922717085))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.18740588755847135))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.5667968836062066))","Map(vectorType -> dense, length -> 1, values -> List(1.9375708946472934))",-74.9,1
3,2018,8,6,1,2018-08-06,2045,1.0,2304,2332.0,1.0,,139.0,152.0,113.0,802.0,2018-08-07T01:45:00,72259003927,-87.9,42.0,2018-08-06T21:53:00,32.8978,-97.0189,170.7,,2100.0,,4.31748811353631,,29.98,,3.80666248977032,2.3978952727983707,3.1780538303479458,2.6390573296152584,,28.0,1.0,1.0,2000-2059,2000,2200,0.75,2.0,2.0,4.0,1.0,1.0,0.0,4.0,14.0,15.0,0.0,0.0,2.0,14.0,"Map(vectorType -> dense, length -> 6, values -> List(15.0, 0.0, 13.0, 0.0, 0.0, 15.0))","Map(vectorType -> dense, length -> 3, values -> List(-0.33489949815935083, 0.006475742952068373, -0.03382472677542814))","Map(vectorType -> dense, length -> 1, values -> List(0.5664216442232727))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.8120921794200425))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.34192427552599297))",-45.9,1
4,2016,10,31,1,2016-10-31,2320,3.0,2347,32.0,3.0,,147.0,144.0,123.0,868.0,2016-11-01T04:20:00,72259003927,-112.0,33.4,2016-11-01T00:53:00,32.8978,-97.0189,170.7,,8.0,,4.219507705176107,,29.92,,4.382026634673881,2.3978952727983707,,2.079441541679836,,45.0,1.0,1.0,2300-2359,2300,100,1.0,9.0,2.0,4.0,1.0,6.0,9.0,9.0,18.0,15.0,0.0,0.0,2.0,17.0,"Map(vectorType -> sparse, length -> 6, indices -> List(0, 5), values -> List(45.0, 48.0))","Map(vectorType -> dense, length -> 3, values -> List(-1.1772130376247891, 0.9715548125730632, -0.06939209471823596))","Map(vectorType -> dense, length -> 1, values -> List(1.699264932669818))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(1.0941576816831775))",-78.6,1


In [None]:
display(test_data.limit(1))

QUARTER,YEAR,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,FL_DATE,CRS_DEP_TIME,DEP_DELAY_GROUP,CRS_ARR_TIME,ARR_TIME,ARR_DELAY_GROUP,CANCELLATION_CODE,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,DISTANCE,sched_depart_date_time,two_hours_prior_depart_UTC,dest_airport_lon,dest_airport_lat,STATION,DATE,LATITUDE,LONGITUDE,ELEVATION,DEP_TIME,HourlyPrecipitation,HourlyPresentWeatherType,HourlySkyConditions,HourlyPressureTendency,HourlyStationPressure,HourlyWindDirection,HourlyWindGustSpeed,HourlyWindSpeed,ARR_DELAY,ARR_DEL15,DEP_DEL15,DEP_TIME_BLK_DET,HOUR_BLK,HOUR_BLK_2,TWO_HR_PROB,OP_UNIQUE_CARRIER,ORIGIN_CITY_MARKET_ID,ORIGIN_STATE_FIPS,DEST_CITY_MARKET_ID,DEST_STATE_FIPS,DEP_TIME_BLK,ARR_TIME_BLK,CANCELLED,DIVERTED,DISTANCE_GROUP,i_HOUR_BLK,delay_features,pca_delay_features,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,DEP_DELAY,sum_lat_lon,HourlyVisibility,HourlyPressureChange,ORIGIN_AIRPORT_ID,DEST_AIRPORT_ID,ORIGIN_DESTINATION_AIRPORT_COUNT,onehot_OP_UNIQUE_CARRIER,onehot_ORIGIN_AIRPORT_ID,onehot_ORIGIN_CITY_MARKET_ID,onehot_ORIGIN_STATE_FIPS,onehot_DEST_AIRPORT_ID,onehot_DEST_CITY_MARKET_ID,onehot_DEST_STATE_FIPS,onehot_CANCELLED,onehot_DIVERTED,onehot_DISTANCE_GROUP,is_christmas_or_newyear,TWO_HR_DELAY
4,2019,12,14,6,2019-12-14,735,2,917,1019,4,,102.0,122.0,65.0,404.0,2019-12-14T12:35:00,72205012815,-84.4,33.6,2019-12-14T08:53:00,28.4339,-81.325,27.4,,817,,,29.79,0.9701451561535046,0.5202228853590822,,1.791759469228055,,62.0,1.0,1.0,0700-0759,700,900,0.1555555555555555,1.0,17.0,2.0,1.0,3.0,1.0,9.0,0.0,0.0,0.0,1.0,"Map(vectorType -> dense, length -> 6, values -> List(0.0, 42.0, 20.0, 0.0, 0.0, 42.0))","Map(vectorType -> dense, length -> 3, values -> List(-1.3595004011540661, -3.1301960052237408, -0.24162238570532463))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(3.6437275851666917))","Map(vectorType -> dense, length -> 1, values -> List(1.249297282480312))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.0))","Map(vectorType -> dense, length -> 1, values -> List(0.9573419419222475))",-50.800003,0.915034374841782,,14.0,34.0,444,"Map(vectorType -> sparse, length -> 19, indices -> List(1), values -> List(1.0))","Map(vectorType -> sparse, length -> 369, indices -> List(14), values -> List(1.0))","Map(vectorType -> sparse, length -> 342, indices -> List(17), values -> List(1.0))","Map(vectorType -> sparse, length -> 53, indices -> List(2), values -> List(1.0))","Map(vectorType -> sparse, length -> 368, indices -> List(34), values -> List(1.0))","Map(vectorType -> sparse, length -> 342, indices -> List(1), values -> List(1.0))","Map(vectorType -> sparse, length -> 53, indices -> List(3), values -> List(1.0))","Map(vectorType -> sparse, length -> 2, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 2, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 11, indices -> List(0), values -> List(1.0))",False,0


In [None]:
df_train_lstm_model = train_data.select(['DAY_OF_MONTH', 'MONTH','ORIGIN_AIRPORT_ID', 'sum_lat_lon', 'HourlyStationPressure','HourlyWindGustSpeed', 'TWO_HR_DELAY'])
df_test_lstm_model = test_data.select(['DAY_OF_MONTH', 'MONTH','ORIGIN_AIRPORT_ID', 'sum_lat_lon', 'HourlyStationPressure','HourlyWindGustSpeed', 'TWO_HR_DELAY'])
feature_columns = ['DAY_OF_MONTH', 'MONTH','ORIGIN_AIRPORT_ID', 'sum_lat_lon', 'HourlyStationPressure','HourlyWindGustSpeed']
target_label = 'TWO_HR_DELAY'

# Prepare data
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
otpw_train_transform = assembler.transform(df_train_xgb_model)
otpw_test_transform = assembler.transform(df_test_xgb_model)

In [None]:
df_train_lstm_model = train_data.select(['DAY_OF_MONTH', 'MONTH', 'ORIGIN_AIRPORT_ID', 'sum_lat_lon', 'HourlyVisibility',  'HourlyStationPressure', 'HourlyWindGustSpeed', 'CARRIER_DELAY', 'WEATHER_DELAY', 'NAS_DELAY', 'SECURITY_DELAY', 'LATE_AIRCRAFT_DELAY', 'DEP_DELAY', 'TWO_HR_DELAY'])
df_test_lstm_model = test_data.select(['DAY_OF_MONTH', 'MONTH', 'ORIGIN_AIRPORT_ID', 'sum_lat_lon', 'HourlyVisibility',  'HourlyStationPressure', 'HourlyWindGustSpeed', 'CARRIER_DELAY', 'WEATHER_DELAY', 'NAS_DELAY', 'SECURITY_DELAY', 'LATE_AIRCRAFT_DELAY', 'DEP_DELAY', 'TWO_HR_DELAY'])
feature_columns = ['DAY_OF_MONTH', 'MONTH', 'ORIGIN_AIRPORT_ID', 'sum_lat_lon', 'HourlyVisibility',  'HourlyStationPressure', 'HourlyWindGustSpeed', 'CARRIER_DELAY', 'WEATHER_DELAY', 'NAS_DELAY', 'SECURITY_DELAY', 'LATE_AIRCRAFT_DELAY', 'DEP_DELAY']

target_label = 'TWO_HR_DELAY'

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
otpw_train_transform = assembler.transform(df_train_lstm_model)
otpw_test_transform = assembler.transform(df_test_lstm_model)


In [None]:
train_data = otpw_train_transform.toPandas()
test_data = otpw_test_transform.toPandas()

X_train = train_data[feature_columns].values
X_test = test_data[feature_columns].values
y_train = train_data[target_label].values
y_test = test_data[target_label].values

# Reshape the input data for LSTM (samples, time steps, features)
num_time_steps = 1  # For this example, we use each sample as a single time step
num_features = len(feature_columns)
X_train_lstm = X_train_scaled.reshape(-1, num_time_steps, num_features)
X_test_lstm = X_test_scaled.reshape(-1, num_time_steps, num_features)

model = Sequential([
    LSTM(units=50, activation='relu', input_shape=(num_time_steps, num_features)),
    Dense(units=1)
])

model.compile(optimizer='adam', loss='mean_squared_error')

model.fit(X_train_lstm, y_train, epochs=10, batch_size=32, validation_split=0.2)

loss = model.evaluate(X_test_lstm, y_test)

[0;31m---------------------------------------------------------------------------[0m
[0;31mThe Python process exited with exit code 137 (SIGKILL: Killed). This may have been caused by an OOM error. Check your command's memory usage.[0m
[0;31m[0m
[0;31m[0m
[0;31m[0m
[0;31mThe last 10 KB of the process's stderr and stdout can be found below. See driver logs for full logs.[0m
[0;31m---------------------------------------------------------------------------[0m
[0;31mLast messages on stderr:[0m
[0;31m3396)[0m
[0;31m	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)[0m
[0;31m	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)[0m
[0;31m	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)[0m
[0;31m	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3396)[0m
[0;31m	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1467)[0m
[0;31m	at 

In [None]:
train_data_lstm = otpw_train_transform.select("features").rdd.map(lambda x: np.array(x[0]))
train_labels_lstm = otpw_train_transform.select(target_label).rdd.map(lambda x: x[0])

test_data_lstm = otpw_test_transform.select("features").rdd.map(lambda x: np.array(x[0]))
test_labels_lstm = otpw_test_transform.select(target_label).rdd.map(lambda x: x[0])

In [None]:
display(train_data_lstm.first())

array([  9.        ,   2.        , 211.        , -45.90000153,
         2.39789527,   2.77258872])

In [None]:
sequence_length = 10
num_features = 9

model = Sequential()
model.add(LSTM(64, input_shape=(sequence_length, num_features)))
model.add(Dense(2, activation='softmax'))
model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

X_train = np.array(train_data.collect())
y_train = np.array(train_labels.collect())

X_test = np.array(test_data.collect())
y_test = np.array(test_labels.collect())

model.fit(X_train, y_train, epochs=10, batch_size=32)

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-3984215836272226>:12[0m
[1;32m      9[0m model[38;5;241m.[39madd(Dense([38;5;241m2[39m, activation[38;5;241m=[39m[38;5;124m'[39m[38;5;124msoftmax[39m[38;5;124m'[39m))
[1;32m     10[0m model[38;5;241m.[39mcompile(loss[38;5;241m=[39m[38;5;124m'[39m[38;5;124msparse_categorical_crossentropy[39m[38;5;124m'[39m, optimizer[38;5;241m=[39m[38;5;124m'[39m[38;5;124madam[39m[38;5;124m'[39m, metrics[38;5;241m=[39m[[38;5;124m'[39m[38;5;124maccuracy[39m[38;5;124m'[39m])
[0;32m---> 12[0m X_train [38;5;241m=[39m np[38;5;241m.[39marray(train_data[38;5;241m.[39mcollect())
[1;32m     13[0m y_train [38;5;241m=[39m np[38;5;241m.[39marray(train_labels[38;5;241m.[39mcollect())
[1;32m     15[0m X_test [38;5;241m=[39m np[38;5;241m.[39marray(test_data[38;5

#### Model 8: 1D Convolutional Neural Network

The 1D Convolutional Neural Network (1D CNN) is particularly suitable for predicting flight delays based on historical and environmental data because it excels at detecting local temporal patterns within sequences. This means that the 1D CNN can identify specific patterns, like short-term weather fluctuations or recurring congestion periods at airports, that lead up to flight delays. These networks use filters that traverse through the sequential input data, allowing them to recognize patterns regardless of their position in time, thereby providing a form of translational invariance. In essence, the 1D CNN's ability to capture and learn from these localized patterns within a large dataset can help yield accurate predictions for flight delays, harnessing information that might be missed by models not designed for sequential pattern detection.

In [None]:
X_train_spark = spark.createDataFrame(X_train)
X_test_spark = spark.createDataFrame(X_test)


def sequence_data(data, feature_columns, N):
    
    # Add an index for sequencing and ordering
    data = data.withColumn("index", monotonically_increasing_id())
    
    # Truncate data to 3,000 entries per day
    windowSpecTruncate = Window.partitionBy("DAY_OF_MONTH").orderBy("index")
    data = data.withColumn("row_num", row_number().over(windowSpecTruncate)).filter(col("row_num") <= 3000).drop("row_num")
    
    # Generate sequences for each feature
    window_spec = Window.partitionBy("DAY_OF_MONTH").orderBy("index").rowsBetween(-N+1, 0)
    for col_name in feature_columns:
        data = data.withColumn(f"{col_name}_seq", collect_list(col(col_name)).over(window_spec))
    
    # Retaining only the sequenced columns
    columns_to_retain = [f"{col_name}_seq" for col_name in feature_columns]
    data = data.select("DAY_OF_MONTH", *columns_to_retain).orderBy("DAY_OF_MONTH", "index")
    
    return data

feature_col = ['HourlyVisibility', 'HourlyStationPressure', 'HourlyWindGustSpeed', 'ORIGIN_AIRPORT_ID', 'DEST_AIRPORT_ID']
target_label = 'TWO_HR_DELAY'

# Define N
N = 30  # Updated to 30 as per our discussion

# Sequence data using the function
X_train_sequenced = sequence_data(X_train_spark, feature_col, N)
X_test_sequenced = sequence_data(X_test_spark, feature_col, N)

# Convert back to pandas
X_train_pandas = X_train_sequenced.toPandas()
X_test_pandas = X_test_sequenced.toPandas()

# Convert to numpy arrays and reshape
X_train_array = X_train_pandas.groupby('DAY_OF_MONTH').apply(lambda group: group.drop(columns=['DAY_OF_MONTH']).to_numpy()).to_numpy()
X_test_array = X_test_pandas.groupby('DAY_OF_MONTH').apply(lambda group: group.drop(columns=['DAY_OF_MONTH']).to_numpy()).to_numpy()

X_train_array = np.array([x for x in X_train_array])
X_test_array = np.array([x for x in X_test_array])

print(X_train_array.shape)
print(X_test_array.shape)

(31, 3000, 5)
(31,)


  X_test_array = np.array([x for x in X_test_array])


In [None]:
# Define the CNN model
model = Sequential()
model.add(Conv1D(filters=64, kernel_size=3, activation='relu', input_shape=(N, len(feature_columns))))
model.add(MaxPooling1D(pool_size=2))
model.add(Flatten())
model.add(Dense(50, activation='relu'))
model.add(Dense(1, activation='sigmoid'))

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

In [None]:
print(X_train_pandas.shape)
print(X_test_pandas.shape)

print(X_train_pandas.iloc[0, 0])

(93000, 6)
(92719, 6)
1


In [None]:
# Training the model
history = model.fit(X_train_array, y_train_adjusted, 
                    validation_data=(X_test_array, y_test_adjusted),
                    epochs=10, batch_size=64)



y_pred = model.predict(X_test_array)
y_pred_binary = [1 if p >= 0.5 else 0 for p in y_pred] 

test_loss, test_accuracy = model.evaluate(X_test_array, y_test_adjusted)
train_accuracy = history.history['accuracy'][-1] 
test_recall = recall_score(y_test_adjusted, y_pred_binary)
test_f1 = f1_score(y_test_adjusted, y_pred_binary)


print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")
print(f"Train Accuracy: {train_accuracy:.4f}")
print(f"Test Recall: {test_recall:.4f}")
print(f"Test F1 Score: {test_f1:.4f}")

[0;31m---------------------------------------------------------------------------[0m
[0;31mValueError[0m                                Traceback (most recent call last)
File [0;32m<command-4444148632306563>:5[0m
[1;32m      1[0m [38;5;28;01mfrom[39;00m [38;5;21;01msklearn[39;00m[38;5;21;01m.[39;00m[38;5;21;01mmetrics[39;00m [38;5;28;01mimport[39;00m recall_score, f1_score
[1;32m      4[0m [38;5;66;03m# Training the model[39;00m
[0;32m----> 5[0m history [38;5;241m=[39m model[38;5;241m.[39mfit(X_train_sequenced, y_train_adjusted, 
[1;32m      6[0m                     validation_data[38;5;241m=[39m(X_test_sequenced, y_test_adjusted),
[1;32m      7[0m                     epochs[38;5;241m=[39m[38;5;241m10[39m, batch_size[38;5;241m=[39m[38;5;241m64[39m)
[1;32m     11[0m y_pred [38;5;241m=[39m model[38;5;241m.[39mpredict(X_test_array)
[1;32m     12[0m y_pred_binary [38;5;241m=[39m [[38;5;241m1[39m [38;5;28;01mif[39;00m p [38;5;241m>[3

### Loss Functions

Listed below are the loss functions used in our modeling and experimentation.

**Logistic Regression & Convolutional Neural Network (CNN):**
$$
\text{Binary Cross Entropy} = -\frac{1}{N} \sum_{i=1}^{N} \left( y_i \log(p_i) + (1-y_i) \log(1-p_i) \right)
$$

**Decision Tree, Random Forest, XGBoost & Multilayer Perceptron (MLP):**
$$
\text{Cross Entropy Loss} = -\sum_{i=1}^{N} (y_i * \log(p_i) + (1 - y_i) * \log(1 - p_i))
$$

**Long Short-Term Memory (LSTM) Model:**
$$
\text{Sparse Categorical Cross Entropy} = -\sum_{i=1}^{N} y_i \cdot \log(p_i)
$$

### End-to-End Modeling Pipeline Discussion

#### Final Pipeline
To summarize what was discussed above, our final modeling pipeline consists of the following main points after data sanitization/imputation and EDA:

**1. Feature Engineering**
| Technique | Summary |
|------------|----------------|
| Log Transformations | Transformations were applied to hourly weather features to adjust the skewness in the data, as many values in features such as HourlyPrecipitation were largely null. |
| Dimensionality Reduction | Principal Component Analysis was used for dimensionality reduction on the Delay Details features. |
| Scaling | Individual scaling (mean and standard deviation) was applied to each delay feature for standardization. |
| Interaction Effects | Airport longitude and latitude were summed for their combined effect as a single feature. |
| Min/Max Scaling | Min/max scaling was employed on each feature in the Hourly Weather Family to share the same scale. |
| Graph-Based Features | For a graph-based feature, the number of flights was grouped and counted by origin airport. |
| One-Hot Encoding | For features that are categorical and non-ordinal (i.e.; airport or city), one-hot encoding was used. |
| Event-Based Features | An event-based feature was created to indicate whether a day was Christmas or New Year's. |

**2. Data Splitting -** Traditional train/test splitting was performed and selected in favor over a cross validation approach. The splitting was time-based, with 2019 data becoming the test data.

**3. Class Imbalance -** Both undersampling and oversampling techniques were explored. Undersampling was ultimately selected as the best strategy and a 3:1 ratio applied to non-delay/delay training data.

**4. Experiments & Modeling -** The data was then pipelined into our model building and experiments phase. Our results are discussed in the 'Discussions' section below.

* **Hyperparameter Tuning:** Hyperopt was the chosen library for hypertuning due to its parallelizable abilities and application of Tree-Structured Parzen Estimator (TPE).

**5. Model Selection -** Based on the finalized model pipeline, the best-performing model was the **Multilayer Perceptron** (thirteen features, single hidden layer). More details are given in the below sections.

#### Models (Phases II & III)

For our Logistic Regression, Decision Tree, Random Forest and KNN models, we reference our experiments and results from phases II and III.

#### Logistic Regression

**1. Number of Input Features:**
We used four input features for the logistic regression model, namely:
- DAY_OF_MONTH
- DISTANCE
- LATITUDE
- LONGITUDE

**2. Number of Experiments Conducted:**
During phase III, we conducted a total of one main experiment to build the logistic regression model for flight delay prediction. Additionally, we employed k-fold cross-validation to evaluate the model's performance and generalization.

**3. Model Building Time:**
The logistic regression model took approximately 1.5 minutes to train.

**4. Metrics and Coefficients:**
The coefficients (weights) learned by the logistic regression model for each input feature are as follows:

- DAY_OF_MONTH: -0.0236
- DISTANCE: 8.650e-05
- LATITUDE: 0.0070
- LONGITUDE: 0.0074

**5. Discussion:**
The baseline experiment involved using a logistic regression model with the four input features. During phase II, the primary focus was on understanding the importance of each input feature through their corresponding coefficients.

From the coefficients, we observe that the DAY_OF_MONTH feature has a negative coefficient, suggesting a negative correlation with flight delays. In contrast, the DISTANCE, LATITUDE, and LONGITUDE features have positive coefficients, indicating a positive correlation with flight delays.

To further improve the model's performance and overcome the challenge of predicting the majority class, phase III explored various families of models, such as random forest trees, and phas IV employed resampling techniques to address class imbalance.

The results from phase III served as a solid foundation for refining the model and developing a more accurate and robust flight delay prediction system, enhancing the overall efficiency and reliability of airline operations for both airlines and passengers.


#### Decision Tree

**1. Number of Input Features:**
We used eight input features for the decision tree model, namely:
- DAY_OF_MONTH
- MONTH
- DISTANCE
- sum_lat_lon
- HourlyVisibility
- HourlyPrecipitation
- HourlyWindSpeed
- ELEVATION

**2. Number of Experiments Conducted:**
During phase III, we conducted a total of three experiments to build the decision tree model for flight delay prediction. We also conducted hyperparameter tuning to pick the best number of bins and depth of the tree.

**3. Model Building Time:**
The decision tree model took approximate 1.5 minutes to train.

**4. Metrics and Coefficients:**
Below were the specifications of the Decision Tree:
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_cc3b63234f74, depth=14, numNodes=5523, numClasses=2, numFeatures=9

**5. Discussion:**
In phase III, the model had a low accuracy but high precision and recall, and the model learned why flights were delayed.


#### Random Forest

**1. Number of Input Features:**
We used four input features for the random forest model, namely:
- DAY_OF_MONTH
- MONTH
- SUM_LAT_LON
- HourlyVisibility
- HourlyPrecipitation
- ELEVATION

**2. Number of Experiments Conducted:**
During phase III, we conducted a total of three experiments to build the random forest model for flight delay prediction. Additionally, we employed k-fold cross-validation to evaluate the model's performance and generalization.

**3. Model Building Time:**
The random forest model took approximately 1 minute to train.

**4. Metrics and Coefficients:**
The feature importance learned by the random forest model for each input feature were as follows:

- DAY_OF_MONTH: 0.5667
- MONTH: 0.0634
- SUM_LAT_LON: 0.0002
- HourlyVisibility: 0.0127
- HourlyPrecipitation: 0.2563
- Elevation: 0.1008

**5. Discussion:**
The random forest model used six input features: DAY_OF_MONTH, MONTH, SUM_LAT_LON, HourlyVisibility, HourlyPrecipitation, Elevation. The model produced a test accuracy of 0.75, a precision of 0.72, and a recall of 0.75.

From the feature importance, we observed that day of month and hourly precipitation were most relevant. We replaced sum of latitiude and longitude and hourly visibility with other features in phase IV.

We had considered adding more features to see if there was improvement in the model's metrics, however in phase IV, we decided to explore other sophisticated algorithms. The random forest model remained a good candidate for the final model in predicting delayed flights as it performed about the same as the baseline logistic regression model.


#### KNN

**1. Number of Input Features:**
We used twenty-five input features for the logistic regression model in phase III, namely:
- MONTH
- YEAR
- MONTH
- DAY_OF_MONTH
- DAY_OF_WEEK
- DISTANCE
- dest_airport_lon
- dest_airport_lat
- LATITUDE
- LONGITUDE
- ELEVATION
- HourlyPrecipitation
- HourlyVisibility
- HourlyPressureChange
- HourlyPressureTendency
- HourlyStationPressure
- HourlyWindDirection
- HourlyWindSpeed
- ORIGIN_AIRPORT_ID
- ORIGIN_CITY_MARKET_ID
- ORIGIN_STATE_FIPS
- DEST_AIRPORT_ID
- DEP_TIME_BLK
- STATION
- sum_lat_lon

**2. Number of Experiments Conducted:**
During phase III, we conducted one experiment to build the KNN model for flight delay prediction. Additionally, we performed min-max scaling and employed k-fold cross-validation to evaluate the model's performance and generalization.

**3. Model Building Time:**
Using a subset of 50,000 samples, the KNN model took approximately 44.21 seconds to train in phase III.

**4. Metrics and Coefficients:**
In phase IV, the KNN was used for the identification of meaningful features. In the future, we propose to use the KNN as a sophisticated imputation technique. 

**5. Discussion:**
k-Nearest Neighbors (KNN) is often unsuitable for large datasets due to its computational and memory intensiveness. For our purpose we expanded upon this option during phase IV by employing it for feature selection. This model assisted in identifying good clustering to help with selecting the following features:

- DAY_OF_MONTH
- HourlyVisibility
- HourlyStationPressure
- HourlyWindGustSpeed
- ORIGIN_AIRPORT_ID
- DEST_AIRPORT_ID

#### Sophisticated Models (Phase IV)


#### XGBoost
**1. Number of Input Features:** We used six input features for the XGBoost model, namely:
- DAY_OF_MONTH
- MONTH
- ORIGIN_AIRPORT_ID
- sum_lat_lon
- HourlyStationPressure
- HourlyWindGustSpeed

**2. Number of Experiments Conducted:** During phase IV, we conducted a total of one experiment to build the XGBoost model for flight delay prediction.

**3. Model Building Time:** The XGBoost model took approximately 5.85 minutes to run.

**4. Metrics and Coefficients:** The metrics for XGBoost were accuracy, weighted precision, weighted recall. The metrics are the following: accuracy: 0.92, weighted precision: 0.32, weighted recall: 0.0002. Further discussion can be found in the 'Results and Discussion of Results' section.

**5. Discussion:** The XGBoost model was a further enhancement of the decision tree and random forest modeling performed in previous phases. The goal was to ultimately boost the results. Based on the metrics, the model did not perform as expected on predicting delayed flights. The XGBoost model had a high accuracy, but the precision and recall were significantly low. This indicated that out of the delayed flights, the XGBoost model did not predict delayed flights the majority of the time. 


#### Multilayer Perceptron (MLP)

**1. Number of Input Features:** We used thirteen input features for the best-performing MLP model, namely:

- DAY_OF_MONTH
- MONTH
- ORIGIN_AIRPORT_ID
- sum_lat_lon
- HourlyVisibility
- HourlyStationPressure
- HourlyWindGustSpeed
- CARRIER_DELAY
- WEATHER_DELAY 
- NAS_DELAY
- SECURITY_DELAY
- LATE_AIRCRAFT_DELAY
- DEP_DELAY

**2. Number of Experiments Conducted:** During phase IV, a total of three experiments were conducted to build the MLP model for flight delay prediction. The first two tested different features, while the third took the best-performing of the two models and applied tuned-hyperparameters.

**3. Model Building Time:** The MLP model took approximately 9.66 minutes to train the best-performing version (Experiment 2).

**4. Metrics and Coefficients:** The metrics for the MLP were accuracy, weighted precision, weighted recall, and F1 score. The model that performed the most well-rounded was selected with results of accuracy: 0.91, weighted precision: 0.93, weighted recall: 0.91, and f1-score: 0.92. Further discussion can be found in the 'Results and Discussion of Results' section.

**5. Discussion:** Even with a single layer, the MLP was one of the most generalizable of the models experimented with. It gave consistently high performance across different features and experiments.


#### Long Short-Term Memory Model (LSTM)

**1. Number of Input Features:** We used thirteen input features for the LSTM model, namely:

- DAY_OF_MONTH
- MONTH
- ORIGIN_AIRPORT_ID
- sum_lat_lon
- HourlyVisibility
- HourlyStationPressure
- HourlyWindGustSpeed
- CARRIER_DELAY
- WEATHER_DELAY 
- NAS_DELAY
- SECURITY_DELAY
- LATE_AIRCRAFT_DELAY
- DEP_DELAY

**2. Number of Experiments Conducted:** During phase IV, we attempted several experiments to build the LSTM model for flight delay prediction. Constraints such as time and computational resources prevented us from materializing a full implementation.

**3. Model Building Time:** See above comments.

**4. Metrics and Coefficients:** For metrics, we had planned to use the accuracy score for the LSTM model.

**5. Discussion:** As a recurrent neural network, the LSTM model was one of the more challenging to implement. Although it had the advantage of being able to capture dependencies and patterns in sequential data, it also required careful tuning of hyperparameters and a sufficient amount of training data to perform effectively. The model's scalability on Spark was limited as it was built on Tensorflow and required Tensorflow-to-Databricks cluster configuration. This resulted in long training times and fewer possible experimentations we were interested in exploring.


#### Convolutional Neural Network (CNN)

**1. Number of Input Features:** We used five input features for the CNN model, namely:

- HourlyVisibility
- HourlyStationPressure
- HourlyWindGustSpeed
- ORIGIN_AIRPORT_ID
- DEST_AIRPORT_ID

**2. Number of Experiments Conducted:** During phase IV, we attempted several experiments to build the CNN model for flight delay prediction. Constraints such as time and computational resources prevented us from materializing a full implementation.

**3. Model Building Time:** See above comments.

**4. Metrics and Coefficients:** For metrics, we had planned to report the accuracy, recall, and F1 scores for the CNN.

**5. Discussion:** The second of the two models that were more challenging to implement, the CNN used a series of sequential layers, including a single convolutional layer, to operate on the temporal flight delay data. The CNN would have learned to capture temporal patterns and relationships in the data, such as how certain weather conditions at certain times affect flight delays. The model was also built on Tensorflow and needed additional configuration with the cluster to train faster and more efficiently. Thus, long training times were also expected.

### Novel Approaches

Throughout our modeling process, we employed a diverse set of techniques ranging from traditional machine learning models to more intricate deep learning architectures. Our approach aimed to identify the most suitable model that could best harness the intricacies of our dataset while ensuring reliable predictions.

#### What Worked & What Didn't

The Logistic Regression Model provided a solid baseline, achieving an accuracy of 0.75 on both training and testing datasets. The Decision Tree Classifier, surprisingly, underperformed with a test accuracy of 0.45. However, its high precision and recall indicate potential utility in certain scenarios. Random Forest mirrored our baseline's test accuracy and provided a modest boost in precision. The K-Nearest Neighbors, despite a higher training accuracy, did not outperform our baseline in the testing phase. XGBoost, LSTM, and 1D Convolutional Neural Network, unfortunately, were not operational in our environment. However, the standout was the Multilayer Perceptron (MLP), boasting the highest accuracy of 0.9076 along with impressive precision, recall, and F1 scores.

In [None]:
#### Model Experiment Summary Table

In [None]:
model_performances = {"Model Type": ["Baseline Logistic Regression"], "Input Features": ["'DAY_OF_MONTH', 'DISTANCE', 'LATITUDE', 'LONGITUDE'"], "Train Accuracy": [0.75], "Test Accuracy": [0.75], "Test Precision": [None], "Test Recall": [None], }
results_df = pd.DataFrame(model_performances)

# Add the rest of the models here using the below line as an example
results_df.loc[len(results_df)] = ["Decision Tree Classifier", "'DAY_OF_MONTH', 'MONTH', 'DISTANCE', 'sum_lat_lon', 'HourlyVisibility', 'HourlyPrecipitation', 'HourlyWindSpeed', 'ELEVATION', 'DISTANCE_GROUP'", 0.45, 0.45, 0.70, 0.75]
results_df.loc[len(results_df)] = ["Random Forest Classifier", "'DAY_OF_MONTH', 'MONTH', 'sum_lat_lon', 'HourlyVisibility', 'HourlyPrecipitation', 'ELEVATION'", None, 0.75, 0.72, 0.75]
results_df.loc[len(results_df)] = ["KNN", "'MONTH', 'YEAR', 'MONTH', 'DAY_OF_MONTH', 'DAY_OF_WEEK', 'DISTANCE', 'dest_airport_lon', 'dest_airport_lat', 'LATITUDE', 'LONGITUDE', 'ELEVATION', 'HourlyPrecipitation', 'HourlyVisibility', 'HourlyPressureChange', 'HourlyPressureTendency', 'HourlyStationPressure', 'HourlyWindDirection', 'HourlyWindSpeed', 'ORIGIN_AIRPORT_ID', 'ORIGIN_CITY_MARKET_ID', 'ORIGIN_STATE_FIPS', 'DEST_AIRPORT_ID', 'DEP_TIME_BLK', 'STATION', 'sum_lat_lon'", 0.80, 0.73, 0.42, 0.21]
results_df.loc[len(results_df)] = ["XGBoost", "'DAY_OF_MONTH', 'MONTH','ORIGIN_AIRPORT_ID', 'sum_lat_lon', 'HourlyStationPressure','HourlyWindGustSpeed'", None, 0.92, 0.32, 0.0002]
results_df.loc[len(results_df)] = ["Multilayer Perceptron", "'DAY_OF_MONTH', 'MONTH', 'ORIGIN_AIRPORT_ID', 'sum_lat_lon', 'HourlyVisibility', 'HourlyStationPressure', 'HourlyWindGustSpeed', 'CARRIER_DELAY', 'WEATHER_DELAY', 'NAS_DELAY', 'SECURITY_DELAY', 'LATE_AIRCRAFT_DELAY', 'DEP_DELAY', 'TWO_HR_DELAY'", None, 0.91, 0.93, 0.91]

display(results_df)

print("No results for LSMT and 1D CNN ")

Model Type,Input Features,Train Accuracy,Test Accuracy,Test Precision,Test Recall
Baseline Logistic Regression,"'DAY_OF_MONTH', 'DISTANCE', 'LATITUDE', 'LONGITUDE'",0.75,0.75,,
Decision Tree Classifier,"'DAY_OF_MONTH', 'MONTH', 'DISTANCE', 'sum_lat_lon', 'HourlyVisibility', 'HourlyPrecipitation', 'HourlyWindSpeed', 'ELEVATION', 'DISTANCE_GROUP'",0.45,0.45,0.7,0.75
Random Forest Classifier,"'DAY_OF_MONTH', 'MONTH', 'sum_lat_lon', 'HourlyVisibility', 'HourlyPrecipitation', 'ELEVATION'",,0.75,0.72,0.75
KNN,"'MONTH', 'YEAR', 'MONTH', 'DAY_OF_MONTH', 'DAY_OF_WEEK', 'DISTANCE', 'dest_airport_lon', 'dest_airport_lat', 'LATITUDE', 'LONGITUDE', 'ELEVATION', 'HourlyPrecipitation', 'HourlyVisibility', 'HourlyPressureChange', 'HourlyPressureTendency', 'HourlyStationPressure', 'HourlyWindDirection', 'HourlyWindSpeed', 'ORIGIN_AIRPORT_ID', 'ORIGIN_CITY_MARKET_ID', 'ORIGIN_STATE_FIPS', 'DEST_AIRPORT_ID', 'DEP_TIME_BLK', 'STATION', 'sum_lat_lon'",0.8,0.73,0.42,0.21
XGBoost,"'DAY_OF_MONTH', 'MONTH','ORIGIN_AIRPORT_ID', 'sum_lat_lon', 'HourlyStationPressure','HourlyWindGustSpeed'",,0.92,0.32,0.0002
Multilayer Perceptron,"'DAY_OF_MONTH', 'MONTH', 'ORIGIN_AIRPORT_ID', 'sum_lat_lon', 'HourlyVisibility', 'HourlyStationPressure', 'HourlyWindGustSpeed', 'CARRIER_DELAY', 'WEATHER_DELAY', 'NAS_DELAY', 'SECURITY_DELAY', 'LATE_AIRCRAFT_DELAY', 'DEP_DELAY', 'TWO_HR_DELAY'",,0.91,0.93,0.91


No results for LSMT and 1D CNN 


#### What We Learned

Throughout the structured progression of our project, spanning four distinct phases, a multitude of insights were garnered:

**Phase 1:** By implementing the baseline logistic regression model, we established a foundational understanding of our dataset's behavior and set a benchmark for subsequent models. This phase highlighted the importance of setting a comparative standard early on, anchoring our expectations and providing direction for future phases.

**Phase 2:** As we delved into the Decision Tree, Random Forest, and KNN, it became evident that while some models, like the Decision Tree, might underperform in accuracy, they could offer valuable insights in other metrics such as precision and recall. The variances in model performance reiterated the idea that no single algorithm is universally optimal, and a nuanced approach to evaluation is essential.

**Phase 3:** Our experimentation with advanced models, including MLP, XGBoost, LSTM, and 1D CNN, provided a deeper appreciation for the intricacies of model tuning and compatibility. Notably, while some models failed to execute efficiently, the MLP emerged as a stellar performer, reinforcing the notion that while some architectures might excel in certain datasets, they may not be suited for others.

**Phase 4:** The final phase, which focused on fine-tuning feature selection for the MLP, was a testament to the significance of iterative refinement. Even with a high-performing model, the right features are paramount in unlocking its full potential. Through meticulous optimization, we were able to harness the maximum predictive power of the MLP.

In retrospect, the phased approach not only allowed for methodical exploration and optimization but also provided us with a comprehensive understanding of the interplay between data, features, and models. It underscored the principle that effective machine learning isn't just about algorithms but the synergy between data preprocessing, model selection, and continuous evaluation.

### Results and Discussion of Results

**Gap Analysis of Our Team's Performance against Other Teams**

Our flight delay prediction project has been an arduous journey of model exploration and refinement, culminating in the development of a high-performing Multi-Layer Perceptron (MLP) model. With a resounding test accuracy of 0.91, a precision of 0.93, and a recall of 0.91, our MLP model has proven its mettle as an exceptional tool for predicting flight delays accurately.

In the context of the broader competition, we undertook a comprehensive gap analysis to contextualize our achievements among our peers based on the Project Leaderboard. This analysis highlights the following key insights:

1. **Model Excellence**: Our MLP model's standout performance underscores its exceptional predictive capabilities. Achieving a remarkable test accuracy and maintaining high precision and recall rates signifies the model's robustness across various prediction scenarios.

2. **Metric Consideration**: Our choice to focus on accuracy, precision, and recall as evaluation metrics allowed us to gain a holistic understanding of our model's performance. These metrics provide insights into the model's ability to make accurate predictions, identify true positive cases, and maintain precision under different conditions.

3. **Diverse Approaches**: While our team prioritized the development and optimization of the MLP model, other teams explored a diverse array of machine learning algorithms, ranging from Random Forest and Logistic Regression to Gradient-Boosted Trees and beyond. This heterogeneity in approach reflects the multidimensionality of tackling the flight delay prediction challenge.

4. **Feature Engineering Mastery**: A key contributor to our model's success was our rigorous feature engineering process. We strived to maintain the interpretability of selected features while maximizing their predictive power. This approach contrasts with other teams that opted for techniques like Principal Component Analysis (PCA) for dimensionality reduction.

5. **Efficiency and Scalability**: The efficiency of our MLP model, both in training times and execution duration, positions it as a viable solution for real-world applications. This balance between accuracy and efficiency is crucial, particularly in domains with high data volumes and time-sensitive predictions.

6. **Future Trajectory**: Our accomplishments provide a foundation for further refinement. Incorporating insights from the strategies employed by other teams, such as leveraging Gradient-Boosted Trees and Support Vector Machines, presents opportunities to bolster our model's robustness and predictive performance.

7. **Holistic Impact**: Beyond the technical achievements, our successful model contributes to improved travel experiences by enhancing flight delay prediction accuracy. This not only benefits airlines but also empowers passengers with more informed decision-making during their journeys.

In conclusion, our gap analysis underscores the collective commitment and expertise of our team in constructing a formidable flight delay prediction model. The supremacy of our MLP model, as demonstrated by its impressive accuracy, precision, and recall, positions us among the top contenders on the Project Leaderboard. Armed with these accomplishments and informed by the strategies of our peers, we are poised to elevate our model's capabilities even further, thus advancing the accuracy of flight delay prediction and elevating travel experiences for all stakeholders.

### Leakage

Data leakage refers to a mistake made in the preprocessing of data where information from the test set unintentionally influences the training set. It often leads to overly optimistic and inaccurate model performance metrics. For instance, in the context of predicting flight delays, suppose we decide to use the average delay duration of an entire dataset, including the test set, to fill missing values in the training set. This would mean the training model is inadvertently informed by future data, resulting in leakage.

#### Potential Leakage in Our Pipeline

Upon close inspection of our data processing pipeline - from high-level data cleansing, feature analysis and engineering, data splitting, addressing class imbalance, to hyperparameter tuning - there's no evident data leakage. Every step is carefully crafted to ensure separation between training and testing datasets. Especially notable is the time-based split used, ensuring that no future data (from 2019) informs the training models.

#### Potential Violations of ML Cardinal Rules

We have been diligent to avoid common pitfalls in ML. For instance, our handling of time series data ensures temporal consistency, preventing any 'looking into the future'. Similarly, class imbalance is addressed by using undersampling, and hyperparameter tuning is automated using Hyperopt to avoid overfitting and to ensure generalization. Hence, we are not violating cardinal sins of ML in our approach.

#### Why Our Pipeline is Sound

Our pipeline is meticulously structured to prevent leakage. Features were engineered without any knowledge of the test set, and data splitting preserves the chronological order of the dataset. Additionally, undersampling was applied to tackle class imbalance without introducing biases. Hyperparameter tuning, managed by Hyperopt, guarantees a comprehensive yet efficient search without risking overfitting. With these precautions and strategies in place, our pipeline is safeguarded against both data leakage and cardinal sins of ML.

### Performance and Scalability

In the dynamic realm of flight delay prediction, achieving optimal performance and scalability is paramount for effectively handling the vast and intricate datasets while ensuring accurate forecasts. PySpark, with its robust distributed computing capabilities, provides a solid foundation for tackling these challenges. However, the integration of external machine learning (ML) frameworks beyond Spark's native MLlib introduces certain complexities that require thoughtful consideration.

PySpark's strengths lie in its inherent features:

PySpark harnesses the power of distributed computing, making it adept at processing and analyzing extensive datasets, a critical aspect when dealing with historical flight data for delay prediction. Its parallel processing capabilities significantly expedite model training, a crucial advantage for adapting predictive models promptly as new data streams in. Furthermore, PySpark's built-in MLlib offers an array of machine learning algorithms optimized for Spark's distributed environment. This integration simplifies the model development process and ensures efficiency.

Nonetheless, incorporating external ML frameworks can pose challenges:

When introducing external ML frameworks into PySpark, complexities around compatibility and integration can arise. Data transformations may be necessary to align with the external framework's requirements, leading to additional overhead in terms of time and effort. Not all external frameworks seamlessly leverage Spark's distributed architecture, potentially limiting their efficiency with large-scale data. Ensuring consistent deployment between Spark's native MLlib and external frameworks can be intricate due to differences in training behavior and prediction outcomes. Additionally, the performance of external frameworks might vary based on factors like data distribution and cluster configuration.

### Limitations, Challenges, Future Work

#### Limitations

**Data Scope:** Our dataset was primarily sourced from the US Department of Transportation and the National Oceanic and Atmospheric Administration, covering the period between 2015 and 2019. Data from 2020 onwards has been omitted, largely due to the unpredictable impacts of the COVID-19 pandemic on flight patterns.

**Feature Engineering:** While we incorporated a broad array of feature engineering techniques, there's always the potential for other, unexplored features that could enhance our predictive power. For instance, we did not delve deeply into geospatial factors or airline-specific metrics. 

**Class Imbalance:** Although undersampling was applied, we maintained a 3:1 ratio of non-delayed to delayed flights. This means our models might still be biased towards predicting flights as non-delayed.

**Computational Resources:** Our Databricks notebook environment was shared among four members, causing scheduling conflicts and resulting in time constraints. This limited the efficiency and the scope of tasks we could run simultaneously, hindering the timely progression of our analyses and experiments.

#### Challenges

**Complex Data Preprocessing:** The significant size and dimensionality of the datasets necessitated comprehensive preprocessing. Handling missing data, especially with respect to weather information, required sophisticated interpolation techniques. We also identified and and removed data that lacked utility.

**Time-based Data Splitting:** Given the time-series nature of the data, ensuring that our train/test splits and cross-validation did not introduce any temporal leakage was a challenging endeavor.
Hyperparameter Tuning: While Hyperopt streamlined the hyperparameter tuning process, determining the ideal search space and managing computational costs remained demanding tasks.

#### Future Work

**Complete Current Pipeline:** Unfortunately we were not able to problem solve and get the LSTM or 1D CNN models working. Moving forward it would be interesting to see the result of these two models

**Incorporation of Post-2019 Data:** Even though 2020-2022 are considered anomalous due to the pandemic, incorporating this data could provide insights into how major global events impact flight delays.

**Expanded Feature Engineering:** Investigate other potential features, like geospatial data or intricate airport logistical data, to refine our predictive power.

**Alternate Class Imbalance Strategies:** Beyond undersampling, techniques such as oversampling or using synthetic data with techniques like SMOTE could be explored to further mitigate the impact of class imbalance on model performance.

**Enhanced Model Architectures:** Dive deeper into the realm of deep learning, possibly exploring Transformer-based architectures or more sophisticated recurrent neural network setups.

### Conclusion
Based on the data provided, we have conducted exploratory visualizations to gain insights into the data values and structure. The data was thoroughly cleansed, removing inconsistencies, errors, and missing values to ensure a high-quality and reliable dataset. This clean dataset forms the foundation for building accurate predictive models.

By performing feature engineering utilizing log transformations, scaling, and sum of columns, we are able to create better features for predicting delayed flights.

For initial insights, we opted for a simpler logistic regression model. This model serves as a baseline to understand the data and analyze the performance of a low-effort approach. Evaluation metrics like accuracy, AUC, and ROC curve help assess the model's effectiveness in predicting flight delays.

For our more advanced models, we opted for the decision tree, random forest, KNN model, XGBoost and MLP. Decision trees, random forests, XGBoost, and MLP work well on large scale datasets so it made sense to compare the models with the baseline logistic regression model. The KNN model gives us more understanding on which features are relevant in predicting weather delay.

Hyperparameter tuning was a crucial part of the optimization process. Systematic experimentation with various hyperparameter combinations, such as learning rates, regularization parameters, and model complexity helped improve the model's performance. This iterative tuning process involved training and evaluating the model's performance.

If we had more time, we would like to experiment more with the MLP model to achieve a higher accuracy.