In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import when

from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

In [17]:
# Obtain the Spark session used by every cell below; getOrCreate() hands back an
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("FlightDelayPrediction")
    .getOrCreate()
)

In [18]:
# Load the raw flight records from CSV.
# - header=True: the first row supplies the column names.
# - inferSchema=True: Spark scans the data to derive column types (schema printed below).
FLIGHT_DATA_PATH = "PATH_TO_FLIGHT_DATASET"  # placeholder: point this at the real dataset
df = spark.read.options(header=True, inferSchema=True).csv(FLIGHT_DATA_PATH)
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Quarter: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- FlightDate: date (nullable = true)
 |-- Marketing_Airline_Network: string (nullable = true)
 |-- Operated_or_Branded_Code_Share_Partners: string (nullable = true)
 |-- DOT_ID_Marketing_Airline: integer (nullable = true)
 |-- IATA_Code_Marketing_Airline: string (nullable = true)
 |-- Flight_Number_Marketing_Airline: integer (nullable = true)
 |-- Originally_Scheduled_Code_Share_Airline: string (nullable = true)
 |-- DOT_ID_Originally_Scheduled_Code_Share_Airline: double (nullable = true)
 |-- IATA_Code_Originally_Scheduled_Code_Share_Airline: string (nullable = true)
 |-- Flight_Num_Originally_Scheduled_Code_Share_Airline: double (nullable = true)
 |-- Operating_Airline : string (nullable = true)
 |-- DOT_ID_Operating_Airline: integer (nullable = true)
 |-- IATA_Code_Operating_Air

In [19]:
# Columns needed for the prediction task.
# NOTE: the raw CSV header spells the airline column with a trailing space
# ("Operating_Airline "), as the schema printed above shows, so it must be
# referenced with that space until it is renamed below.
requiredColumns = [
    "DayofMonth", "Month", "DayOfWeek",  # flight date details
    "Operating_Airline ",  # airline details
    "Origin", "Dest",  # airport details
    "CRSArrTime", "ArrDel15", "CRSDepTime", "DepDel15",  # time details
]

# Clean the raw frame in one chain:
#   1. drop rows missing any required value,
#   2. keep only the required columns,
#   3. give the airline / scheduled-time columns clearer names,
#   4. derive the binary label IsDelayed.
# ArrDel15 / DepDel15 are double columns, so they are compared against the numeric
# literal 1.0 (the previous comparison against the string "1.0" only matched via
# Spark's implicit string-to-double cast).
# NOTE: this cell overwrites `df`; re-running it without re-loading the data fails
# because "Operating_Airline " and the CRS* columns no longer exist afterwards.
df = (
    df.dropna(subset=requiredColumns)
    .select(requiredColumns)
    .withColumnRenamed("Operating_Airline ", "OperatingAirline")
    .withColumnRenamed("CRSDepTime", "ScheduledDepTime")
    .withColumnRenamed("CRSArrTime", "ScheduledArrTime")
    # IsDelayed = 1.0 if either the arrival or the departure flag is set, else 0.0.
    .withColumn(
        "IsDelayed",
        when((col("ArrDel15") == 1.0) | (col("DepDel15") == 1.0), 1.0).otherwise(0.0),
    )
)

In [20]:
# Turn the string-valued categorical columns into numeric index columns
# ("<name>Index"), then pack those indices into a single "features" vector.
# NOTE(review): the indexers are fitted on the full dataset, before the train/test
# split further down, so the label ordering is learned from test rows too — confirm
# this is acceptable.
categoricalCols = ["OperatingAirline", "Origin", "Dest"]

# Fit every indexer against the same frame first, then apply them in order.
indexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}Index").fit(df)
    for name in categoricalCols
]
for indexer in indexers:
    df = indexer.transform(df)

# Model inputs are exactly the index columns created above.
# NOTE(review): Month, DayOfWeek, ScheduledDepTime, etc. are kept by the cleaning
# step but are not used as features here — confirm this is intentional.
featureCols = [f"{name}Index" for name in categoricalCols]

assembler = VectorAssembler(inputCols=featureCols, outputCol="features")
df = assembler.transform(df)

df.printSchema()

root
 |-- DayofMonth: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- OperatingAirline: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- ScheduledArrTime: integer (nullable = true)
 |-- ArrDel15: double (nullable = true)
 |-- ScheduledDepTime: integer (nullable = true)
 |-- DepDel15: double (nullable = true)
 |-- IsDelayed: double (nullable = false)
 |-- OperatingAirlineIndex: double (nullable = false)
 |-- OriginIndex: double (nullable = false)
 |-- DestIndex: double (nullable = false)
 |-- features: vector (nullable = true)



In [21]:
# Keep only the feature vector and the label.
# The label column is created as "IsDelayed". It is selected by its exact name and
# aliased to "isDelayed" (the name the training/evaluation cells use) rather than
# depending on Spark's case-insensitive column resolution.
model_df = df.select("features", col("IsDelayed").alias("isDelayed"))
model_df.show(truncate=False)

+-------------+---------+
|features     |isDelayed|
+-------------+---------+
|[7.0,6.0,4.0]|1.0      |
|[7.0,6.0,4.0]|0.0      |
|[7.0,6.0,4.0]|0.0      |
|[7.0,6.0,4.0]|0.0      |
|[7.0,6.0,4.0]|1.0      |
|[7.0,6.0,4.0]|0.0      |
|[7.0,6.0,4.0]|0.0      |
|[7.0,6.0,4.0]|0.0      |
|[7.0,6.0,4.0]|1.0      |
|[7.0,6.0,4.0]|1.0      |
|[7.0,6.0,4.0]|1.0      |
|[7.0,6.0,4.0]|1.0      |
|[7.0,6.0,4.0]|1.0      |
|[7.0,6.0,4.0]|1.0      |
|[7.0,6.0,4.0]|1.0      |
|[7.0,6.0,4.0]|1.0      |
|[7.0,6.0,4.0]|1.0      |
|[7.0,6.0,4.0]|0.0      |
|[7.0,6.0,4.0]|0.0      |
|[7.0,6.0,4.0]|1.0      |
+-------------+---------+
only showing top 20 rows



In [22]:
# Split the dataset into training and testing sets
training_df, testing_df = model_df.randomSplit([0.7, 0.3])

print(training_df.count())
print(testing_df.count())

5017410
2149634


In [23]:
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [27]:
# Initialize GBTClassifier with increased maxBins
gbt = GBTClassifier(labelCol="isDelayed", featuresCol="features", maxIter=10, maxBins=700)
gbt_model = gbt.fit(training_df)


# Fit the model
train_model = gbt_model.transform(training_df)
test_model = gbt_model.transform(testing_df)

# Make predictions
predictions = gbt_model.transform(testing_df)

# Evaluate model
evaluator = MulticlassClassificationEvaluator(labelCol="isDelayed", predictionCol="prediction", metricName="accuracy")
test_accuracy = evaluator.evaluate(predictions)
print("Accuracy:", test_accuracy)


Accuracy: 0.7550294608291458
