In [1]:
import numpy as np 
import matplotlib.pyplot as plt 
import pyspark as ps 
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
import pyspark.sql.functions as pf
import pandas as pd 
import seaborn as sns
import os 
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import accuracy_score, mean_squared_error


from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler, MinMaxScaler
from pyspark.sql.functions import col
from pyspark.sql.functions import col
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import (
    MulticlassClassificationEvaluator,
    BinaryClassificationEvaluator,
)
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml import Pipeline
from pyspark.sql.types import StringType, DateType



In [2]:
df = pd.read_csv('flight_delay_predict.csv')
df['is_delay'] = df['is_delay'].astype(int)
df

Unnamed: 0,is_delay,Year,Quarter,Month,DayofMonth,DayOfWeek,FlightDate,Reporting_Airline,Origin,OriginState,Dest,DestState,CRSDepTime,Cancelled,Diverted,Distance,DistanceGroup,ArrDelay,ArrDelayMinutes,AirTime
0,1,2014,1,1,1,3,2014-01-01,UA,LAX,CA,ORD,IL,900,0.0,0.0,1744.0,7,43.0,43.0,218.0
1,0,2014,1,1,1,3,2014-01-01,AA,IAH,TX,DFW,TX,1750,0.0,0.0,224.0,1,2.0,2.0,50.0
2,1,2014,1,1,1,3,2014-01-01,AA,LAX,CA,ORD,IL,1240,0.0,0.0,1744.0,7,26.0,26.0,220.0
3,1,2014,1,1,1,3,2014-01-01,AA,DFW,TX,LAX,CA,1905,0.0,0.0,1235.0,5,159.0,159.0,169.0
4,0,2014,1,1,1,3,2014-01-01,AA,DFW,TX,CLT,NC,1115,0.0,0.0,936.0,4,-13.0,0.0,108.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1635585,1,2018,4,12,31,1,2018-12-31,AA,DFW,TX,DEN,CO,1440,0.0,0.0,641.0,3,24.0,24.0,100.0
1635586,0,2018,4,12,31,1,2018-12-31,DL,PHX,AZ,ATL,GA,1420,0.0,0.0,1587.0,7,-14.0,0.0,179.0
1635587,1,2018,4,12,31,1,2018-12-31,AA,ORD,IL,SFO,CA,1700,0.0,0.0,1846.0,8,39.0,39.0,272.0
1635588,0,2018,4,12,31,1,2018-12-31,AA,ORD,IL,LAX,CA,720,0.0,0.0,1744.0,7,-10.0,0.0,240.0


In [3]:
spark = (
SparkSession.builder
.appName("Flight_Delay")
.config("spark.dynamicAllocation.enabled", True)
.config("spark.dynamicAllocation.minExecutors", 5)
.config("spark.dynamicAllocation.maxExecutors", 20)
.config("spark.executor.cores", 1)
.config("spark.executor.instances", 18)
.config("spark.driver.memory", "10g")
.config("spark.executor.memory", "4g")
.config("spark.sql.execution.arrow.pyspark.enabled", True)
.config("spark.memory.offHeap.enabled", "true")
.config("spark.memory.offHeap.size", "10g")
.getOrCreate()
)


24/07/03 11:41:28 WARN Utils: Your hostname, pail resolves to a loopback address: 127.0.1.1; using 10.20.34.13 instead (on interface enp68s0)
24/07/03 11:41:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/03 11:41:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Preprocessing 

In [4]:
df = spark.read.csv('flight_delay_predict.csv', header=True, inferSchema=True)

                                                                                

In [5]:
# spark_df = df.to_spark()
spark_df = df.dropna()


categorical_columns = ["Reporting_Airline", "Origin", "OriginState", "Dest", "DestState"]

# dropping datetime columns
date_cols = [
    field.name
    for field in spark_df.schema.fields
    if isinstance(field.dataType, (DateType))
]


spark_df = spark_df.drop(*date_cols)

# dropping the ArrDelay and ArrDelayMinutes columns
spark_df = spark_df.drop("ArrDelay", "ArrDelayMinutes")



# encoding categorical columns
indexers = [
    StringIndexer(inputCol=column, outputCol=column+"_idx").fit(spark_df)
    for column in categorical_columns
]

for idx in indexers:
    spark_df = idx.transform(spark_df)


# dropping the original string columns

string_cols = [
    field.name
    for field in spark_df.schema.fields
    if isinstance(field.dataType, (StringType))
]

spark_df = spark_df.drop(*string_cols)

# assembling all numerical columns into 1 vector

numerical_columns = [col for col in spark_df.columns if col != "is_delay"]


assembler = VectorAssembler(
    inputCols=[col + "_idx" for col in categorical_columns] + numerical_columns,
    outputCol="features",
)


spark_df = assembler.transform(spark_df)

                                                                                

In [6]:
spark_df.head(5)

24/07/03 11:41:43 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


[Row(is_delay=1.0, Year=2014, Quarter=1, Month=1, DayofMonth=1, DayOfWeek=3, CRSDepTime=900, Cancelled=0.0, Diverted=0.0, Distance=1744.0, DistanceGroup=7, AirTime=218.0, Reporting_Airline_idx=1.0, Origin_idx=0.0, OriginState_idx=0.0, Dest_idx=1.0, DestState_idx=2.0, features=DenseVector([1.0, 0.0, 0.0, 1.0, 2.0, 2014.0, 1.0, 1.0, 1.0, 3.0, 900.0, 0.0, 0.0, 1744.0, 7.0, 218.0, 1.0, 0.0, 0.0, 1.0, 2.0])),
 Row(is_delay=0.0, Year=2014, Quarter=1, Month=1, DayofMonth=1, DayOfWeek=3, CRSDepTime=1750, Cancelled=0.0, Diverted=0.0, Distance=224.0, DistanceGroup=1, AirTime=50.0, Reporting_Airline_idx=0.0, Origin_idx=7.0, OriginState_idx=1.0, Dest_idx=3.0, DestState_idx=1.0, features=DenseVector([0.0, 7.0, 1.0, 3.0, 1.0, 2014.0, 1.0, 1.0, 1.0, 3.0, 1750.0, 0.0, 0.0, 224.0, 1.0, 50.0, 0.0, 7.0, 1.0, 3.0, 1.0])),
 Row(is_delay=1.0, Year=2014, Quarter=1, Month=1, DayofMonth=1, DayOfWeek=3, CRSDepTime=1240, Cancelled=0.0, Diverted=0.0, Distance=1744.0, DistanceGroup=7, AirTime=220.0, Reporting_Airl

In [7]:
spark_df = spark_df.withColumn("is_delay", col('is_delay').cast('integer')) # cast label column to integer

In [8]:
spark_df.head(5)

[Row(is_delay=1, Year=2014, Quarter=1, Month=1, DayofMonth=1, DayOfWeek=3, CRSDepTime=900, Cancelled=0.0, Diverted=0.0, Distance=1744.0, DistanceGroup=7, AirTime=218.0, Reporting_Airline_idx=1.0, Origin_idx=0.0, OriginState_idx=0.0, Dest_idx=1.0, DestState_idx=2.0, features=DenseVector([1.0, 0.0, 0.0, 1.0, 2.0, 2014.0, 1.0, 1.0, 1.0, 3.0, 900.0, 0.0, 0.0, 1744.0, 7.0, 218.0, 1.0, 0.0, 0.0, 1.0, 2.0])),
 Row(is_delay=0, Year=2014, Quarter=1, Month=1, DayofMonth=1, DayOfWeek=3, CRSDepTime=1750, Cancelled=0.0, Diverted=0.0, Distance=224.0, DistanceGroup=1, AirTime=50.0, Reporting_Airline_idx=0.0, Origin_idx=7.0, OriginState_idx=1.0, Dest_idx=3.0, DestState_idx=1.0, features=DenseVector([0.0, 7.0, 1.0, 3.0, 1.0, 2014.0, 1.0, 1.0, 1.0, 3.0, 1750.0, 0.0, 0.0, 224.0, 1.0, 50.0, 0.0, 7.0, 1.0, 3.0, 1.0])),
 Row(is_delay=1, Year=2014, Quarter=1, Month=1, DayofMonth=1, DayOfWeek=3, CRSDepTime=1240, Cancelled=0.0, Diverted=0.0, Distance=1744.0, DistanceGroup=7, AirTime=220.0, Reporting_Airline_id

In [9]:
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
scaler = scaler.fit(spark_df)
scaled_df = scaler.transform(spark_df)

                                                                                

In [10]:
scaled_df.head(5)

[Row(is_delay=1, Year=2014, Quarter=1, Month=1, DayofMonth=1, DayOfWeek=3, CRSDepTime=900, Cancelled=0.0, Diverted=0.0, Distance=1744.0, DistanceGroup=7, AirTime=218.0, Reporting_Airline_idx=1.0, Origin_idx=0.0, OriginState_idx=0.0, Dest_idx=1.0, DestState_idx=2.0, features=DenseVector([1.0, 0.0, 0.0, 1.0, 2.0, 2014.0, 1.0, 1.0, 1.0, 3.0, 900.0, 0.0, 0.0, 1744.0, 7.0, 218.0, 1.0, 0.0, 0.0, 1.0, 2.0]), scaled_features=DenseVector([0.8252, 0.0, 0.0, 0.4043, 1.0347, 1461.0425, 0.9039, 0.2939, 0.1139, 1.5081, 1.8035, 0.0, 0.0, 3.2409, 3.2656, 3.435, 0.8252, 0.0, 0.0, 0.4043, 1.0347])),
 Row(is_delay=0, Year=2014, Quarter=1, Month=1, DayofMonth=1, DayOfWeek=3, CRSDepTime=1750, Cancelled=0.0, Diverted=0.0, Distance=224.0, DistanceGroup=1, AirTime=50.0, Reporting_Airline_idx=0.0, Origin_idx=7.0, OriginState_idx=1.0, Dest_idx=3.0, DestState_idx=1.0, features=DenseVector([0.0, 7.0, 1.0, 3.0, 1.0, 2014.0, 1.0, 1.0, 1.0, 3.0, 1750.0, 0.0, 0.0, 224.0, 1.0, 50.0, 0.0, 7.0, 1.0, 3.0, 1.0]), scaled_f

# Modeling

In [11]:
train_df, test_df = scaled_df.randomSplit([0.8, 0.2], seed=42)

In [12]:
evaluator = MulticlassClassificationEvaluator(labelCol='is_delay', metricName='weightedFMeasure', beta=2.0)

In [13]:
lr = LogisticRegression(featuresCol="scaled_features", labelCol="is_delay")
lr_model = lr.fit(train_df)
lr_predictions = lr_model.transform(test_df)
print(lr_predictions)
lr_fbeta = evaluator.evaluate(lr_predictions)
print(f"f-Beta measure for Logistic Regression: ", lr_fbeta)

24/07/03 11:41:49 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

+--------+----+-------+-----+----------+---------+----------+---------+--------+--------+-------------+-------+---------------------+----------+---------------+--------+-------------+--------------------+--------------------+--------------------+--------------------+----------+
|is_delay|Year|Quarter|Month|DayofMonth|DayOfWeek|CRSDepTime|Cancelled|Diverted|Distance|DistanceGroup|AirTime|Reporting_Airline_idx|Origin_idx|OriginState_idx|Dest_idx|DestState_idx|            features|     scaled_features|       rawPrediction|         probability|prediction|
+--------+----+-------+-----+----------+---------+----------+---------+--------+--------+-------------+-------+---------------------+----------+---------------+--------+-------------+--------------------+--------------------+--------------------+--------------------+----------+
|       0|2014|      1|    1|         1|        3|        59|      0.0|     0.0|  1199.0|            5|  135.0|                  2.0|       5.0|            4.0|   



f-Beta measure for Logistic Regression:  0.754712023961489


                                                                                

In [14]:
rf = RandomForestClassifier(featuresCol="scaled_features", labelCol="is_delay")
rf_model = rf.fit(train_df)
rf_predictions = rf_model.transform(test_df)
rf_fbeta = evaluator.evaluate(rf_predictions)
print(f"f-Beta measure for Random Forest: ", rf_fbeta)



f-Beta measure for Random Forest:  0.7503046353317645


                                                                                

In [15]:
acc_evaluator = MulticlassClassificationEvaluator(labelCol='is_delay', metricName='accuracy')

In [16]:

print("Accuracy for Random Forest: ", acc_evaluator.evaluate(rf_predictions))
print("Accuracy for Logistic Regression: ", acc_evaluator.evaluate(lr_predictions))


                                                                                

Accuracy for Random Forest:  0.7901566022826774
Accuracy for Logistic Regression:  0.7927529113136226


                                                                                

# Grid Search

In [17]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [18]:
rf_param_grid = ParamGridBuilder().\
    addGrid(rf.numTrees, [10, 20, 30]).\
    addGrid(rf.maxDepth, [5, 10, 15]).\
    addGrid(rf.minInfoGain, [0.0, 0.1, 0.2]).\
    build()
    
lr_param_grid = ParamGridBuilder().\
    addGrid(lr.regParam, [0.0, 0.1, 0.2]).\
    addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]).\
    addGrid(lr.fitIntercept, [False, True]).\
    build()

In [19]:
rf_crossval = CrossValidator(estimator=rf, estimatorParamMaps=rf_param_grid, evaluator=evaluator, numFolds=3)
rf_cv_model = rf_crossval.fit(train_df)



rf_cv_predictions = rf_cv_model.transform(test_df)



print("Accuracy for Random Forest after cross-validation: ", acc_evaluator.evaluate(rf_cv_predictions))


24/07/03 11:42:25 WARN DAGScheduler: Broadcasting large task binary with size 1517.9 KiB
24/07/03 11:42:30 WARN DAGScheduler: Broadcasting large task binary with size 1517.9 KiB
24/07/03 11:42:30 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
24/07/03 11:42:31 WARN DAGScheduler: Broadcasting large task binary with size 4.5 MiB
24/07/03 11:42:32 WARN DAGScheduler: Broadcasting large task binary with size 1118.7 KiB
24/07/03 11:42:32 WARN DAGScheduler: Broadcasting large task binary with size 7.6 MiB
24/07/03 11:42:33 WARN DAGScheduler: Broadcasting large task binary with size 1777.6 KiB
24/07/03 11:42:34 WARN DAGScheduler: Broadcasting large task binary with size 12.3 MiB
24/07/03 11:42:36 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
24/07/03 11:42:38 WARN DAGScheduler: Broadcasting large task binary with size 19.1 MiB
24/07/03 11:42:40 WARN DAGScheduler: Broadcasting large task binary with size 3.7 MiB
24/07/03 11:42:41 WARN DAGScheduler: Bro

Accuracy for Random Forest after cross-validation:  0.7977533231840328


                                                                                

In [20]:
print(f"Best NumTrees: {rf_cv_model.bestModel.getNumTrees}")
print(f"Best MaxDepth: {rf_cv_model.bestModel.getMaxDepth()}")
print(f"Best MinInfoGain: {rf_cv_model.bestModel.getMinInfoGain()}")

Best NumTrees: 10
Best MaxDepth: 15
Best MinInfoGain: 0.0


In [21]:
lr_crossval = CrossValidator(
    estimator=lr, estimatorParamMaps=lr_param_grid, evaluator=evaluator, numFolds=3
)
lr_cv_model = lr_crossval.fit(train_df)

lr_cv_predictions = lr_cv_model.transform(test_df)
print(
    "Accuracy for Logistic Regression after cross-validation: ",
    acc_evaluator.evaluate(lr_cv_predictions),
)

24/07/03 11:52:35 ERROR LBFGS: Failure! Resetting history: breeze.optimize.FirstOrderException: Line search failed
                                                                                

Accuracy for Logistic Regression after cross-validation:  0.7927529113136226


                                                                                

In [22]:
print(f"Best regParam: {lr_cv_model.bestModel.getRegParam()}")
print(f"Best elasticNetParam: {lr_cv_model.bestModel.getElasticNetParam()}")
print(f"Fit Intercept: {lr_cv_model.bestModel.getFitIntercept()}")

Best regParam: 0.0
Best elasticNetParam: 0.0
Fit Intercept: True


In [23]:
print(f"Best f-Beta score for Random Forest {evaluator.evaluate(rf_cv_predictions)}")
print(f"Best f-Beta score for Logistic Regression {evaluator.evaluate(lr_cv_predictions)}")

24/07/03 11:52:52 WARN DAGScheduler: Broadcasting large task binary with size 9.5 MiB
                                                                                

Best f-Beta score for Random Forest 0.7636769140071639




Best f-Beta score for Logistic Regression 0.754712023961489


                                                                                

In [24]:
from pyspark.ml.classification import FMClassifier

In [25]:
fm = FMClassifier(featuresCol="scaled_features", labelCol="is_delay")

fm_param_grid = ParamGridBuilder().\
    addGrid(fm.stepSize, [0.1, 0.01, 0.001]).\
    addGrid(fm.maxIter, [10, 20, 30]).\
    addGrid(fm.regParam, [0.1, 0.01, 0.001]).\
    build()

In [26]:
fm_crossval = CrossValidator(estimator=fm, estimatorParamMaps=fm_param_grid, evaluator=evaluator, numFolds=3)
fm_cv_model = fm_crossval.fit(train_df)

fm_cv_predictions = fm_cv_model.transform(test_df)

                                                                                

In [27]:
print(f"Accuracy for Factorization Machine after cross-validation: {acc_evaluator.evaluate(fm_cv_predictions)}")
print(f"Best f-Beta score for Factorization Machine: {evaluator.evaluate(fm_cv_predictions)}")
print(f"Best stepSize: {fm_cv_model.bestModel.getStepSize()}")
print(f"Best maxIter: {fm_cv_model.bestModel.getMaxIter()}")
print(f"Best regParam: {fm_cv_model.bestModel.getRegParam()}")

                                                                                

Accuracy for Factorization Machine after cross-validation: 0.7901566022826774




Best f-Beta score for Factorization Machine: 0.7503046353317645
Best stepSize: 0.1
Best maxIter: 10
Best regParam: 0.01


                                                                                

In [28]:
spark.stop()