## About Data
- **Name** : Flight Status Prediction
- **Source** : Kaggle (https://www.kaggle.com/datasets/robikscube/flight-delay-dataset-20182022)

##### Description
- This dataset contains flight information, including cancellations and delays, by airline from January 2018 through 2022. For convenience, the Combined_Flights_XXXX.csv or Combined_Flights_XXXX.parquet files provide the combined data for an entire year.

##### Columns
- The dataset has a total of 61 columns and over 25M records.




### Importing Required Libraries

In [0]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql.functions import *

### Creating SparkSession with appName "Data603_FinalProject"

In [0]:
spark = SparkSession.builder.appName('Data603_FinalProject').getOrCreate()

### Importing Data from DBFS (the .parquet files are used because of file-size constraints)

In [0]:
df = spark.read.parquet('/FileStore/tables/*.parquet')

### Total Number of Records

In [0]:
df.count()

Out[5]: 29193782

### DataFrame Overview

In [0]:
display(df)

### Schema of the DataFrame

In [0]:
df.printSchema()

root
 |-- FlightDate: timestamp (nullable = true)
 |-- Airline: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Cancelled: boolean (nullable = true)
 |-- Diverted: boolean (nullable = true)
 |-- CRSDepTime: long (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- DepDelayMinutes: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- ArrTime: double (nullable = true)
 |-- ArrDelayMinutes: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- Year: long (nullable = true)
 |-- Quarter: long (nullable = true)
 |-- Month: long (nullable = true)
 |-- DayofMonth: long (nullable = true)
 |-- DayOfWeek: long (nullable = true)
 |-- Marketing_Airline_Network: string (nullable = true)
 |-- Operated_or_Branded_Code_Share_Partners: string (nullable = true)
 |-- DOT_ID_Mar

# Data Cleaning

#### Finding Null Values

In [0]:
def findMissing():
    # Count nulls column by column (this runs one Spark job per column)
    total_count = 0
    for column in df.columns:
        null_count = df.filter(df[column].isNull()).count()
        total_count = total_count + null_count
        print(f"Number of null or NA values in column {column}: {null_count}")

    # Express the total as a share of all cells (rows x columns), not of rows alone
    total_cells = df.count() * len(df.columns)
    print(f'Total Null Values: {total_count}, Null Values Percentage: {total_count * 100 / total_cells}')
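
A null percentage is easiest to interpret when the denominator is the number of cells (rows times columns), because the total sums nulls over every column. The following plain-Python sketch illustrates that arithmetic on a few hypothetical rows. The `rows` data and the `null_summary` helper are illustrative assumptions, not part of the notebook.

```python
# Hypothetical toy rows; None stands in for a Spark null.
rows = [
    {"DepTime": 905.0, "DepDelay": 5.0, "ArrTime": 1100.0},
    {"DepTime": None, "DepDelay": None, "ArrTime": None},
    {"DepTime": 1530.0, "DepDelay": None, "ArrTime": 1745.0},
    {"DepTime": 700.0, "DepDelay": -3.0, "ArrTime": 815.0},
]

def null_summary(rows):
    """Return per-column null counts, the total, and the percentage of null cells."""
    columns = list(rows[0].keys())
    per_column = {c: sum(1 for r in rows if r[c] is None) for c in columns}
    total_nulls = sum(per_column.values())
    total_cells = len(rows) * len(columns)
    return per_column, total_nulls, 100 * total_nulls / total_cells

per_column, total_nulls, pct = null_summary(rows)
print(per_column, total_nulls, round(pct, 2))
```

Dividing by the row count alone would let the percentage exceed 100 whenever several columns are null in the same rows.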

#### Count of Null Values by Column

In [0]:
findMissing()

Number of null or NA values in column FlightDate: 0
Number of null or NA values in column Airline: 0
Number of null or NA values in column Origin: 0
Number of null or NA values in column Dest: 0
Number of null or NA values in column Cancelled: 0
Number of null or NA values in column Diverted: 0
Number of null or NA values in column CRSDepTime: 0
Number of null or NA values in column DepTime: 761652
Number of null or NA values in column DepDelayMinutes: 763084
Number of null or NA values in column DepDelay: 763084
Number of null or NA values in column ArrTime: 786177
Number of null or NA values in column ArrDelayMinutes: 846183
Number of null or NA values in column AirTime: 852561
Number of null or NA values in column CRSElapsedTime: 22
Number of null or NA values in column ActualElapsedTime: 845637
Number of null or NA values in column Distance: 0
Number of null or NA values in column Year: 0
Number of null or NA values in column Quarter: 0
Number of null or NA values in column Month: 

###### Filling null values of DepDelayMinutes and DepDelay with 0 when the flight has a DepTime, treating a missing delay as no delay.
###### Note: The fill requires DepTime to be non-null because a cancelled flight may have no DepTime at all.

In [0]:
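# Fill DepDelayMinutes with 0 where both delay columns are null but a departure time exists
df = df.withColumn("DepDelayMinutes", when((col("DepDelay").isNull()) & (col("DepDelayMinutes").isNull()) & (col("DepTime").isNotNull()), 0).otherwise(col("DepDelayMinutes")))

# This condition is evaluated after the update above: rows that were just filled no longer
# have a null DepDelayMinutes, so their DepDelay stays null at this step
df = df.withColumn("DepDelay", when((col("DepDelay").isNull()) & (col("DepDelayMinutes").isNull()) & (col("DepTime").isNotNull()), 0).otherwise(col("DepDelay")))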

#### Null value count after replacing with 0

In [0]:
null_count_depdelay_minutes = df.where(col("DepDelayMinutes").isNull()).count()
print(f"Count of null values in 'DepDelayMinutes': {null_count_depdelay_minutes}")

null_count_depdelay = df.where(col("DepDelay").isNull()).count()
print(f"Count of null values in 'DepDelay': {null_count_depdelay}")


Count of null values in 'DepDelayMinutes': 761632
Count of null values in 'DepDelay': 763084
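
The DepDelay count is unchanged because the two fills are chained: the second condition tests DepDelayMinutes after the first fill has already replaced its nulls. The plain-Python sketch below mimics that order dependence on hypothetical rows and contrasts it with evaluating the condition once. The helper names (`needs_fill`, `fill_sequentially`, `fill_with_saved_condition`) are assumptions made for illustration.

```python
# Hypothetical toy rows; None stands in for a Spark null.
rows = [
    {"DepTime": 1530.0, "DepDelay": None, "DepDelayMinutes": None},  # departed, delay missing
    {"DepTime": None, "DepDelay": None, "DepDelayMinutes": None},    # no departure recorded
    {"DepTime": 905.0, "DepDelay": 5.0, "DepDelayMinutes": 5.0},     # complete row
]

def needs_fill(row):
    """Both delay columns are missing but a departure time exists."""
    return (row["DepDelay"] is None
            and row["DepDelayMinutes"] is None
            and row["DepTime"] is not None)

def fill_sequentially(rows):
    """Mimic two chained updates: the second test sees the first update."""
    out = [dict(r) for r in rows]
    for r in out:
        if needs_fill(r):
            r["DepDelayMinutes"] = 0
    for r in out:
        if needs_fill(r):  # DepDelayMinutes is already 0 for the filled rows
            r["DepDelay"] = 0
    return out

def fill_with_saved_condition(rows):
    """Evaluate the condition once, then fill both columns from that result."""
    out = [dict(r) for r in rows]
    for r in out:
        if needs_fill(r):
            r["DepDelayMinutes"] = 0
            r["DepDelay"] = 0
    return out

sequential = fill_sequentially(rows)
saved = fill_with_saved_condition(rows)
print(sequential[0])
print(saved[0])
```

In Spark, the same effect could be obtained by storing the condition in a helper column before either fill. That is a design option, not what the notebook does.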


#### Filling null values when DepTime is null, which means the flight was cancelled

In [0]:
df = df.withColumn("DepDelayMinutes", when((col("DepDelay").isNull()) & (col("DepDelayMinutes").isNull()) & (col("DepTime").isNull()), 0).otherwise(col("DepDelayMinutes")))

df = df.withColumn("DepDelay", when((col("DepDelay").isNull()) & (col("DepDelayMinutes") == 0) & (col("DepTime").isNull()), 0).otherwise(col("DepDelay")))

df = df.withColumn("DepTime", when((col("DepDelay") == 0) & (col("DepDelayMinutes") == 0) & (col("DepTime").isNull()), 0).otherwise(col("DepTime")))


null_count_depdelay_minutes = df.where(col("DepDelayMinutes").isNull()).count()
print(f"Count of null values in 'DepDelayMinutes': {null_count_depdelay_minutes}")

null_count_depdelay = df.where(col("DepDelay").isNull()).count()
print(f"Count of null values in 'DepDelay': {null_count_depdelay}")

null_count_deptime = df.where(col("DepTime").isNull()).count()
print(f"Count of null values in 'DepTime': {null_count_deptime}")

Count of null values in 'DepDelayMinutes': 0
Count of null values in 'DepDelay': 1452
Count of null values in 'DepTime': 0


#### Setting DepDelay to 0 where CRSDepTime == DepTime, which means the flight departed on time

In [0]:
df = df.withColumn("DepDelay", when(col("CRSDepTime") == col("DepTime"), 0).otherwise(col("DepDelay")))

# Feature Engineering

#### Creating Delayed and EarlyDep using DepDelay

In [0]:
# Delayed = 1 when the flight left after its scheduled time (DepDelay > 0)
df = df.withColumn("Delayed", when(col("DepDelay") > 0, 1).otherwise(0))
# EarlyDep = 1 when the flight left before its scheduled time (DepDelay < 0)
df = df.withColumn("EarlyDep", when(col("DepDelay") < 0, 1).otherwise(0))
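
The intended meaning of the two flags can be stated in plain Python. This is a sketch under one assumption: a positive DepDelay means a late departure, a negative one an early departure, and zero or missing means neither. The `departure_flags` helper is hypothetical.

```python
def departure_flags(dep_delay):
    """Return (delayed, early) as 0/1 flags for a DepDelay value in minutes.

    Assumption: positive = late, negative = early, zero or missing = neither.
    """
    if dep_delay is None:
        return 0, 0
    return int(dep_delay > 0), int(dep_delay < 0)

for value in (12.0, -4.0, 0.0, None):
    print(value, departure_flags(value))
```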

In [0]:
display(df)

#### Encoding Cancelled and Diverted using StringIndexer

In [0]:
df = df.withColumn("Cancelled", col("Cancelled").cast("string"))
df = df.withColumn("Diverted", col("Diverted").cast("string"))

In [0]:
from pyspark.ml.feature import StringIndexer

# Create a StringIndexer
indexer_C = StringIndexer(inputCol="Cancelled", outputCol="Cancelled_Flight")

# Fit and transform the DataFrame
df = indexer_C.fit(df).transform(df)

In [0]:
from pyspark.ml.feature import StringIndexer

# Create a StringIndexer
indexer_D = StringIndexer(inputCol="Diverted", outputCol="Diverted_Flight")

# Fit and transform the DataFrame
df = indexer_D.fit(df).transform(df)
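
By default, StringIndexer assigns indices by descending label frequency, so the most common value receives 0.0. With far more non-cancelled than cancelled flights, "false" is therefore expected to map to 0.0 and "true" to 1.0. The sketch below reproduces that ordering in plain Python. The `frequency_index` helper and the alphabetical tie-break are assumptions for illustration.

```python
from collections import Counter

def frequency_index(values):
    """Map each label to a float index, most frequent label first."""
    counts = Counter(values)
    # Descending count; ties broken alphabetically (an assumption in this sketch)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Hypothetical toy column: five flights that ran and one that was cancelled
mapping = frequency_index(["false"] * 5 + ["true"])
print(mapping)
```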

In [0]:
display(df)

#### Dropping unwanted Columns

In [0]:
df = df.drop("Cancelled")
df = df.drop("Diverted")
df = df.drop("FlightDate")

In [0]:
df = df.drop("DepDelayMinutes")
df = df.drop("DepDelay")

In [0]:
# Drop identifier and code columns that are not used in the analysis
df = df.drop(
    "Operated_or_Branded_Code_Share_Partners",
    "DOT_ID_Marketing_Airline",
    "IATA_Code_Marketing_Airline",
    "Flight_Number_Marketing_Airline",
    "Operating_Airline",
    "DOT_ID_Operating_Airline",
    "IATA_Code_Operating_Airline",
    "Tail_Number",
    "Flight_Number_Operating_Airline",
    "OriginAirportID",
    "OriginAirportSeqID",
    "OriginCityMarketID",
    "OriginStateFips",
    "OriginWac",
    "DestAirportID",
    "DestAirportSeqID",
    "DestCityMarketID",
    "DestStateFips",
    "DestWac",
    "DepTimeBlk",
    "ArrTimeBlk",
)

In [0]:
findMissing()

Number of null or NA values in column Airline: 0
Number of null or NA values in column Origin: 0
Number of null or NA values in column Dest: 0
Number of null or NA values in column CRSDepTime: 0
Number of null or NA values in column DepTime: 0
Number of null or NA values in column ArrTime: 786177
Number of null or NA values in column ArrDelayMinutes: 846183
Number of null or NA values in column AirTime: 852561
Number of null or NA values in column CRSElapsedTime: 22
Number of null or NA values in column ActualElapsedTime: 845637
Number of null or NA values in column Distance: 0
Number of null or NA values in column Year: 0
Number of null or NA values in column Quarter: 0
Number of null or NA values in column Month: 0
Number of null or NA values in column DayofMonth: 0
Number of null or NA values in column DayOfWeek: 0
Number of null or NA values in column Marketing_Airline_Network: 0
Number of null or NA values in column OriginCityName: 0
Number of null or NA values in column OriginSta

#### Creating 'AirTraffic' column: the number of flights departing from the same origin in the same hour, day, month, and year.

In [0]:
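from pyspark.sql.functions import col, floor

# DepTime is an hhmm-encoded double (e.g. 1530.0), so the hour is the integer part of DepTime / 100.
# Slicing the string form would mislabel times before 01:00 (e.g. "45.0" would give hour "4").
df = df.withColumn("hour", floor(col("DepTime") / 100).cast("string"))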
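
DepTime stores the local time as an hhmm number, so 1530.0 means 15:30 and 45.0 means 00:45. The plain-Python sketch below compares two ways of deriving the hour: integer division by 100, and slicing the leading characters of the string form. Both helper functions are hypothetical and only illustrate the difference.

```python
import math

def hour_from_hhmm(dep_time):
    """Hour of an hhmm-encoded time, e.g. 1530.0 -> 15 and 45.0 -> 0."""
    if dep_time is None:
        return None
    return int(math.floor(dep_time / 100))

def hour_from_string_slice(dep_time):
    """Take the leading characters of the string form (breaks below 100)."""
    text = str(float(dep_time))
    return text[:2] if len(text) == 6 else text[:1]

for t in (1530.0, 930.0, 45.0):
    print(t, hour_from_hhmm(t), hour_from_string_slice(t))
```

The two approaches agree for times from 01:00 onward, but the slice reads 00:45 as hour "4", which would place that flight in the wrong hourly bucket.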

In [0]:
df = df.drop("DepTime")

In [0]:
from pyspark.sql.window import Window

windowSpec = Window.partitionBy("Origin", "hour", "DayofMonth", "Month", "Year")

df = df.withColumn("AirTraffic", count("*").over(windowSpec))
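
A window count without an ordering attaches the size of each partition to every row in it. The plain-Python sketch below shows the same idea with a `Counter` keyed on the partition columns. The `flights` rows and the `add_air_traffic` helper are hypothetical.

```python
from collections import Counter

# Columns that define one "traffic bucket", mirroring the window partition
KEYS = ("Origin", "hour", "DayofMonth", "Month", "Year")

# Hypothetical toy flights
flights = [
    {"Origin": "ORD", "hour": "15", "DayofMonth": 1, "Month": 1, "Year": 2018},
    {"Origin": "ORD", "hour": "15", "DayofMonth": 1, "Month": 1, "Year": 2018},
    {"Origin": "ORD", "hour": "16", "DayofMonth": 1, "Month": 1, "Year": 2018},
    {"Origin": "ATL", "hour": "15", "DayofMonth": 1, "Month": 1, "Year": 2018},
]

def add_air_traffic(flights):
    """Attach to each flight the number of flights sharing its bucket."""
    def bucket(flight):
        return tuple(flight[k] for k in KEYS)
    counts = Counter(bucket(f) for f in flights)
    return [{**f, "AirTraffic": counts[bucket(f)]} for f in flights]

with_traffic = add_air_traffic(flights)
print([f["AirTraffic"] for f in with_traffic])
```

Unlike a `groupBy`, this keeps one output row per flight, which is what allows AirTraffic to be used as a per-flight feature.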

In [0]:
df = df.drop("DivAirportLandings")

#### Displaying Final DataFrame

In [0]:
display(df)

# Exploratory Data Analysis

#### Number of flights per year

In [0]:
#Number of flights per year
from pyspark.sql.functions import col, isnan, when,count,desc,expr,substring,length,lpad
flights_per_year = df.groupBy("year").agg(count("year").alias("num_flights")).toPandas()
flights_per_year

Unnamed: 0,year,num_flights
0,2018,5689512
1,2019,8091684
2,2020,5022397
3,2021,6311871
4,2022,4078318


In [0]:
#Plotting graph for yearly flights
import matplotlib.pyplot as plt
import seaborn as sns

plt.figure(figsize=(15,10))
sns.barplot(x = 'year', y = 'num_flights', data =flights_per_year)
plt.title("Flights on yearly basis", size = 20)

#### Number of flights per month

In [0]:
#Number of flights per month
flights_per_month = df.groupBy("Month").agg(count("month").alias("num_flights_monthly")).toPandas()
flights_per_month

Unnamed: 0,Month,num_flights_monthly
0,7,2690257
1,6,2425282
2,9,2211536
3,5,2359776
4,1,2700014
5,10,2332945
6,3,2794295
7,12,2307306
8,8,2231032
9,11,2260337


In [0]:
#Plotting graph for monthly flights
plt.figure(figsize=(15,10))
sns.barplot(x = 'Month', y = 'num_flights_monthly', data =flights_per_month)
plt.title("Flights on monthly basis", size = 20)

#### Number of flights per quarter of a year

In [0]:
#Number of flights per quarter of a year
flights_per_quarter = df.groupBy("Quarter").agg(count("Quarter").alias("num_flights_quarterly")).toPandas()
flights_per_quarter

Unnamed: 0,Quarter,num_flights_quarterly
0,1,7838605
1,3,7132825
2,2,7321764
3,4,6900588


In [0]:
#Plotting graph for Quarterly flights
plt.figure(figsize=(15,10))
sns.barplot(x = 'Quarter', y = 'num_flights_quarterly', data =flights_per_quarter)
plt.title("Flights on Quarterly basis", size = 20)

#### Number of Flights per Day of the Week

In [0]:
df_day = df.groupBy('DayOfWeek').count().sort(desc("count")).toPandas()
df_day

Unnamed: 0,DayOfWeek,count
0,1,4356643
1,5,4353468
2,4,4332718
3,7,4253915
4,3,4124239
5,2,4050008
6,6,3722791


#### Busiest Day of the Week

In [0]:
plt.figure(figsize=(10,6))
sns.barplot(x = 'DayOfWeek',y = 'count',data = df_day,color= "#04D8B2")
plt.title("Busiest Day of the Week", size = 20)

#### Flight Count Based on Origin and Destination

In [0]:
df_o = df.groupBy('Origin').count().sort(desc("count")).toPandas()
(df_o)

Unnamed: 0,Origin,count
0,ORD,1375187
1,ATL,1371796
2,DEN,1170585
3,DFW,1104266
4,CLT,907841
...,...,...
383,CDB,262
384,BFM,205
385,ILG,165
386,ROP,151


In [0]:
df_d = df.groupBy('Dest').count().sort(desc("count")).toPandas()
(df_d)

Unnamed: 0,Dest,count
0,ORD,1375012
1,ATL,1371789
2,DEN,1170597
3,DFW,1104182
4,CLT,907769
...,...,...
383,CDB,262
384,BFM,204
385,ILG,165
386,ROP,151


#### Number of Flights that are Cancelled

In [0]:
#Number of flights cancelled
df_c = df.groupBy('Cancelled_Flight').count().sort(desc("count")).toPandas()
(df_c)


Unnamed: 0,Cancelled_Flight,count
0,0.0,28416515
1,1.0,777267


#### Scatter plot between Canceled Flights and Air Traffic

In [0]:
df_pd = df.select('Cancelled_Flight', 'AirTraffic').toPandas()

# Create a scatter plot
plt.figure(figsize=(10, 6))
plt.scatter(df_pd['AirTraffic'], df_pd['Cancelled_Flight'], alpha=0.5)
plt.title('Canceled Flights vs Air Traffic')
plt.xlabel('Air Traffic')
plt.ylabel('Canceled Flights')
plt.grid(True)
plt.show()

#### Correlation heatmap of the numeric columns to determine the dependencies between features

In [0]:
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql.functions import col

numeric_cols = [col_name for col_name, col_type in df.dtypes if col_type in ['double', 'bigint', 'int']]

# Select only numeric columns
numeric = df.select(*numeric_cols).sample(False, 0.1).toPandas()

correlationMatrix = numeric.corr()

plt.figure(figsize=(20, 10))
sns.heatmap(correlationMatrix, annot=True, cmap='coolwarm', fmt='.2f', linewidths=.5)
plt.show()
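
Each cell of the heatmap is a Pearson correlation coefficient, which is the default method of pandas `DataFrame.corr()`. A minimal plain-Python sketch of that coefficient follows. The sample values are hypothetical and the `pearson` helper is an illustration, not the pandas implementation.

```python
import math

def pearson(xs, ys):
    """Pearson correlation of two equally long numeric sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# Hypothetical values: distance in miles and air time in minutes
distance = [200.0, 500.0, 900.0, 1500.0]
air_time = [45.0, 90.0, 140.0, 220.0]
print(pearson(distance, air_time))
```

Values near 1 or -1 indicate a strong linear relationship and values near 0 a weak one. The coefficient does not capture non-linear dependence.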


#### To predict cancellation we use AirTraffic as the input feature and Cancelled_Flight as the target.

# Machine Learning Models - Predicting Flight Cancellation

#### Logistic Regression

In [0]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

(train_data, test_data) = df.randomSplit([0.8, 0.2], seed=123)

assembler = VectorAssembler(inputCols=["AirTraffic"], outputCol="features")

lr = LogisticRegression(labelCol="Cancelled_Flight", featuresCol="features")

pipeline_lr = Pipeline(stages=[assembler, lr])


In [0]:
model_lr = pipeline_lr.fit(train_data)

predictions_lr_train = model_lr.transform(train_data)
predictions_lr_test = model_lr.transform(test_data)

#### Logistic Regression Evaluation

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="Cancelled_Flight", predictionCol="prediction", metricName="accuracy")
train_accuracy = evaluator.evaluate(predictions_lr_train)
test_accuracy = evaluator.evaluate(predictions_lr_test)

print("Train Accuracy:", train_accuracy)
print("Test Accuracy:", test_accuracy)

# Calculate F1-score
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="Cancelled_Flight", predictionCol="prediction", metricName="f1")
train_f1_score = evaluator_f1.evaluate(predictions_lr_train)
test_f1_score = evaluator_f1.evaluate(predictions_lr_test)

print("Train F1-score:", train_f1_score)
print("Test F1-score:", test_f1_score)


Train Accuracy: 0.977339506314514
Test Accuracy: 0.9773839436736447
Train F1-score: 0.9690792004825959
Test F1-score: 0.9691531008715702
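
As a reference point for these accuracy figures, the share of non-cancelled flights can be computed from the Cancelled_Flight counts reported earlier (28,416,515 and 777,267). A classifier that always predicts "not cancelled" would reach that accuracy on the full dataset. This sketch only performs that arithmetic. It does not re-run the model.

```python
# Counts taken from the Cancelled_Flight table in the EDA section
not_cancelled = 28_416_515
cancelled = 777_267

total = not_cancelled + cancelled
baseline_accuracy = not_cancelled / total  # always predicting "not cancelled"

print(f"Total flights: {total}")
print(f"Majority-class baseline accuracy: {baseline_accuracy:.4f}")
```

With classes this imbalanced, accuracy alone says little about how well cancellations are detected. Per-class recall or area under the precision-recall curve may be more informative.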


## Random Forest Classifier

In [0]:
rf = RandomForestClassifier(labelCol="Cancelled_Flight", featuresCol="features")

pipeline_rf = Pipeline(stages=[assembler, rf])

model_rf = pipeline_rf.fit(train_data)

predictions_rf_train = model_rf.transform(train_data)
predictions_rf_test = model_rf.transform(test_data)

#### Evaluation of Random Forest Classifier

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# One evaluator per metric, reused for the train and test predictions
evaluator_rf_acc = MulticlassClassificationEvaluator(labelCol="Cancelled_Flight", metricName="accuracy")
evaluator_rf_f1 = MulticlassClassificationEvaluator(labelCol="Cancelled_Flight", metricName="f1")

# Evaluate Random Forest model on train data
accuracy_rf_train = evaluator_rf_acc.evaluate(predictions_rf_train)
f1_score_rf_train = evaluator_rf_f1.evaluate(predictions_rf_train)

print("Random Forest - Train Data:")
print("Accuracy:", accuracy_rf_train)
print("F1 Score:", f1_score_rf_train)

# Evaluate Random Forest model on test data
accuracy_rf_test = evaluator_rf_acc.evaluate(predictions_rf_test)
f1_score_rf_test = evaluator_rf_f1.evaluate(predictions_rf_test)

print("\nRandom Forest - Test Data:")
print("Accuracy:", accuracy_rf_test)
print("F1 Score:", f1_score_rf_test)
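
The "f1" metric of MulticlassClassificationEvaluator is, to the best of our understanding, a weighted F1: the per-class F1 scores averaged with weights proportional to each class's share of the true labels. The plain-Python sketch below computes accuracy and that weighted F1 on hypothetical labels, to show how the two can diverge on imbalanced data. The helper functions are illustrative assumptions, not Spark code.

```python
def accuracy(labels, predictions):
    """Fraction of predictions that match the true labels."""
    return sum(1 for y, p in zip(labels, predictions) if y == p) / len(labels)

def weighted_f1(labels, predictions):
    """Per-class F1 averaged with weights equal to each class's label share."""
    total = len(labels)
    score = 0.0
    for c in sorted(set(labels)):
        tp = sum(1 for y, p in zip(labels, predictions) if y == c and p == c)
        fp = sum(1 for y, p in zip(labels, predictions) if y != c and p == c)
        fn = sum(1 for y, p in zip(labels, predictions) if y == c and p != c)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        score += f1 * (tp + fn) / total
    return score

# Hypothetical toy data: 9 flights ran, 1 was cancelled, model predicts "ran" for all
labels = [0] * 9 + [1]
predictions = [0] * 10
print(accuracy(labels, predictions), weighted_f1(labels, predictions))
```

In this toy case the minority class contributes an F1 of zero, so the weighted F1 falls below the accuracy even though most predictions are correct.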




## Unsupervised Learning - K-Means Clustering

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# Assemble features into a feature vector
assembler = VectorAssembler(inputCols=["AirTraffic"], outputCol="features")
df_assembled = assembler.transform(df)

# Initialize KMeans model
kmeans = KMeans(k=2, seed=1)  # Assuming 2 clusters

# Fit KMeans model
kmeans_model = kmeans.fit(df_assembled)

# Predict clusters
kmeans_predictions = kmeans_model.transform(df_assembled)
kmeans_predictions.show()
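
With a single feature, K-Means reduces to splitting values along a line. The plain-Python sketch below runs the standard assign-then-update loop on hypothetical AirTraffic values. It starts from fixed initial centers, which is an assumption made for reproducibility. Spark's KMeans chooses its initial centers differently.

```python
def nearest_center(value, centers):
    """Index of the center closest to value (squared distance)."""
    return min(range(len(centers)), key=lambda i: (value - centers[i]) ** 2)

def kmeans_1d(values, initial_centers, iterations=10):
    """Assign each value to its nearest center, then move centers to cluster means."""
    centers = list(initial_centers)
    for _ in range(iterations):
        clusters = [[] for _ in centers]
        for v in values:
            clusters[nearest_center(v, centers)].append(v)
        # Keep the old center if a cluster ends up empty
        centers = [sum(c) / len(c) if c else centers[i] for i, c in enumerate(clusters)]
    labels = [nearest_center(v, centers) for v in values]
    return centers, labels

# Hypothetical hourly traffic counts: a quiet group and a busy group
traffic = [2, 3, 4, 40, 42, 44]
centers, labels = kmeans_1d(traffic, initial_centers=[2, 44])
print(centers, labels)
```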



### Plotting the Clusters

In [0]:
import matplotlib.pyplot as plt

# Extracting the cluster centers (each center is a one-element array)
centers = [center[0] for center in kmeans_model.clusterCenters()]

# Collecting values and cluster labels in one pass so they stay aligned
rows = kmeans_predictions.select("AirTraffic", "prediction").collect()
data = [row["AirTraffic"] for row in rows]
labels = [row["prediction"] for row in rows]

# Plotting the data points
plt.scatter(data, [0] * len(data), c=labels, cmap='viridis')

# Plotting the cluster centers
plt.scatter(centers, [0]*len(centers), c='red', marker='x', label='Cluster Centers')

plt.xlabel('AirTraffic')
plt.title('KMeans Clustering')
plt.legend()
plt.show()




In [0]:
from pyspark.ml.evaluation import ClusteringEvaluator

# Compute silhouette score
evaluator = ClusteringEvaluator()
silhouette_avg = evaluator.evaluate(kmeans_predictions)
print("Silhouette Score: ", silhouette_avg)
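
The silhouette score compares, for each point, its mean distance to its own cluster (a) with its mean distance to the nearest other cluster (b), as (b - a) / max(a, b). The plain-Python sketch below computes it on hypothetical one-dimensional values using squared Euclidean distance, which we understand to be the default distance measure of ClusteringEvaluator. The `silhouette` helper is illustrative and is quadratic in the number of points, so it is suitable only for small samples.

```python
def silhouette(values, labels):
    """Mean silhouette coefficient using squared Euclidean distance."""
    def mean_dist(v, others):
        return sum((v - w) ** 2 for w in others) / len(others)

    scores = []
    for i, (v, lab) in enumerate(zip(values, labels)):
        same = [w for j, (w, l) in enumerate(zip(values, labels)) if l == lab and j != i]
        if not same:  # a singleton cluster scores 0 by convention
            scores.append(0.0)
            continue
        a = mean_dist(v, same)
        b = min(
            mean_dist(v, [w for w, l in zip(values, labels) if l == other])
            for other in set(labels) if other != lab
        )
        denom = max(a, b)
        scores.append((b - a) / denom if denom else 0.0)
    return sum(scores) / len(scores)

# Hypothetical traffic values with a good and a poor cluster assignment
traffic = [2, 3, 4, 40, 42, 44]
good = silhouette(traffic, [0, 0, 0, 1, 1, 1])
poor = silhouette(traffic, [0, 1, 0, 1, 0, 1])
print(good, poor)
```

Scores close to 1 indicate compact, well-separated clusters. Scores near 0 or below suggest overlapping or misassigned points.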



## Based on the results from the different machine learning models, the Random Forest Classifier performed most accurately, with an accuracy of 0.97 and an F1 score of 0.97 on both the train and test datasets.