In [198]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, FloatType
from pyspark.sql.functions import *
spark = (SparkSession.builder
         .appName("Realtime Flight")
         # .master("spark://127.0.0.1:7077")
         .master("local")
         # .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
         .config('spark.jars', 'D:\\Coding\\FlightDelaysPredict\\postgresql-42.7.1.jar')  # add postgresql driver
         .config("spark.sql.adaptive.enabled", "false")
         .getOrCreate())

In [199]:
spark.sparkContext.setLogLevel("WARN")

In [200]:
flight_schema = StructType([
    StructField("Year", IntegerType(), True),
    StructField("Month", IntegerType(), True),
    StructField("DayofMonth", IntegerType(), True),
    StructField("DayOfWeek", IntegerType(), True),
    StructField("DepTime", FloatType(), True),
    StructField("CRSDepTime", IntegerType(), True),
    StructField("ArrTime", FloatType(), True),
    StructField("CRSArrTime", IntegerType(), True),
    StructField("UniqueCarrier", StringType(), True),
    StructField("FlightNum", IntegerType(), True),
    StructField("TailNum", StringType(), True),
    StructField("ActualElapsedTime", FloatType(), True),
    StructField("CRSElapsedTime", FloatType(), True),
    StructField("AirTime", FloatType(), True),
    StructField("ArrDelay", FloatType(), True),
    StructField("DepDelay", FloatType(), True),
    StructField("Origin", StringType(), True),
    StructField("Dest", StringType(), True),
    StructField("Distance", IntegerType(), True),
    StructField("TaxiIn", FloatType(), True),
    StructField("TaxiOut", FloatType(), True),
    StructField("Cancelled", IntegerType(), True),
    StructField("CancellationCode", StringType(), True),
    StructField("Diverted", IntegerType(), True),
    StructField("CarrierDelay", FloatType(), True),
    StructField("WeatherDelay", FloatType(), True),
    StructField("NASDelay", FloatType(), True),
    StructField("SecurityDelay", FloatType(), True),
    StructField("LateAircraftDelay", FloatType(), True)
])

In [201]:
# Read from PostgreSQL
df = spark.read.format("jdbc").option("url", "jdbc:postgresql://mthanh.ddns.net:5432/flight").option("dbtable", "delayedflight").option("user", "postgres").option("password", "postgres").option("driver", "org.postgresql.Driver").load()
df.show(5)

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|year|month|dayofmonth|dayofweek|deptime|crsdeptime|arrtime|crsarrtime|uniquecarrier|flightnum|tailnum|actualelapsedtime|crselapsedtime|airtime|arrdelay|depdelay|origin|dest|distance|taxiin|taxiout|cancelled|cancellationcode|diverted|carrierdelay|weatherdelay|nasdelay|securitydelay|lateaircraftdelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2008|    1|         3|        4| 2003.0|      1955| 2211.0|      2225|           WN|      335

In [202]:
print(df.count())
print(df.dtypes)
print(df.head())

1116
[('year', 'int'), ('month', 'int'), ('dayofmonth', 'int'), ('dayofweek', 'int'), ('deptime', 'double'), ('crsdeptime', 'int'), ('arrtime', 'double'), ('crsarrtime', 'int'), ('uniquecarrier', 'string'), ('flightnum', 'int'), ('tailnum', 'string'), ('actualelapsedtime', 'double'), ('crselapsedtime', 'double'), ('airtime', 'double'), ('arrdelay', 'double'), ('depdelay', 'double'), ('origin', 'string'), ('dest', 'string'), ('distance', 'int'), ('taxiin', 'double'), ('taxiout', 'double'), ('cancelled', 'int'), ('cancellationcode', 'string'), ('diverted', 'int'), ('carrierdelay', 'double'), ('weatherdelay', 'double'), ('nasdelay', 'double'), ('securitydelay', 'double'), ('lateaircraftdelay', 'double')]
Row(year=2008, month=1, dayofmonth=3, dayofweek=4, deptime=2003.0, crsdeptime=1955, arrtime=2211.0, crsarrtime=2225, uniquecarrier='WN', flightnum=335, tailnum='N712SW', actualelapsedtime=128.0, crselapsedtime=150.0, airtime=116.0, arrdelay=-14.0, depdelay=8.0, origin='IAD', dest='TPA', d

In [203]:
# add a new column "status" to the dataframe:
# if arrdelay < -15 or depdelay < -15 status = 1 (delayed),
# if divert = 1 or cancelled = 1 status = 2 (diverted or cancelled),
# else status = 0 (on time)
df = df.withColumn("status", when((df.diverted == 1) | (df.cancelled == 1), 3).when((df.arrdelay <= -60) | (df.depdelay <= -60), 2).when((df.arrdelay <= -15) | (df.depdelay <= -15), 1).otherwise(0))

# drop columns just leave the columns: Year, Month, DayofMonth, CRSDepTime, CRSArrTime, UniqueCarrier, FlightNum, ArrDelay, DepDelay, Origin, Dest, Distance
df = df.drop("DayOfWeek", "DepTime", "ArrTime", "TailNum", "ActualElapsedTime", "CRSElapsedTime", "AirTime", "TaxiIn", "TaxiOut", "Cancelled", "CancellationCode", "Diverted", "CarrierDelay", "WeatherDelay", "NASDelay", "SecurityDelay", "LateAircraftDelay", "FlightNum", "arrdelay", "depdelay")
df = df.dropna()
df.show()
df.count()

+----+-----+----------+----------+----------+-------------+------+----+--------+------+
|year|month|dayofmonth|crsdeptime|crsarrtime|uniquecarrier|origin|dest|distance|status|
+----+-----+----------+----------+----------+-------------+------+----+--------+------+
|2008|    1|         3|      1955|      2225|           WN|   IAD| TPA|     810|     0|
|2008|    1|         3|       735|      1000|           WN|   IAD| TPA|     810|     0|
|2008|    1|         3|       620|       750|           WN|   IND| BWI|     515|     0|
|2008|    1|         3|      1755|      1925|           WN|   IND| BWI|     515|     0|
|2008|    1|         3|      1915|      2110|           WN|   IND| JAX|     688|     0|
|2008|    1|         3|      1830|      1940|           WN|   IND| LAS|    1591|     0|
|2008|    1|         3|       700|       915|           WN|   IND| MCO|     828|     0|
|2008|    1|         3|      1510|      1725|           WN|   IND| MCO|     828|     0|
|2008|    1|         3|      102

1116

In [204]:
# count missing values
column_name = df.columns
# missing_values_counts = [df.where(col(name).isNull()).count() for name in column_name]
# create a new dataframe to store the missing values counts
spark.createDataFrame([(name, df.where(col(name).isNull()).count()) for name in column_name], ["Column", "Missing Values"]).show()

+-------------+--------------+
|       Column|Missing Values|
+-------------+--------------+
|         year|             0|
|        month|             0|
|   dayofmonth|             0|
|   crsdeptime|             0|
|   crsarrtime|             0|
|uniquecarrier|             0|
|       origin|             0|
|         dest|             0|
|     distance|             0|
|       status|             0|
+-------------+--------------+



In [205]:
# index uniquecarrier, origin, dest
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="uniquecarrier", outputCol="uniquecarrier_index").fit(df).transform(df)
indexer = StringIndexer(inputCol="origin", outputCol="origin_index").fit(indexer).transform(indexer)
indexer = StringIndexer(inputCol="dest", outputCol="dest_index").fit(indexer).transform(indexer)
indexer.show(5)

+----+-----+----------+----------+----------+-------------+------+----+--------+------+-------------------+------------+----------+
|year|month|dayofmonth|crsdeptime|crsarrtime|uniquecarrier|origin|dest|distance|status|uniquecarrier_index|origin_index|dest_index|
+----+-----+----------+----------+----------+-------------+------+----+--------+------+-------------------+------------+----------+
|2008|    1|         3|      1955|      2225|           WN|   IAD| TPA|     810|     0|                0.0|        23.0|      14.0|
|2008|    1|         3|       735|      1000|           WN|   IAD| TPA|     810|     0|                0.0|        23.0|      14.0|
|2008|    1|         3|       620|       750|           WN|   IND| BWI|     515|     0|                0.0|        16.0|       4.0|
|2008|    1|         3|      1755|      1925|           WN|   IND| BWI|     515|     0|                0.0|        16.0|       4.0|
|2008|    1|         3|      1915|      2110|           WN|   IND| JAX|     

In [206]:

# aassembling the features
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["year", "month", "dayofmonth", "crsdeptime", "crsarrtime", "uniquecarrier_index", "origin_index", "dest_index", "distance"], outputCol="features")

# transform the dataframe
flight_assembled = assembler.transform(indexer)
flight_assembled.select("features","status").show(5, truncate=False)
flight_assembled.count()

+--------------------------------------------------+------+
|features                                          |status|
+--------------------------------------------------+------+
|[2008.0,1.0,3.0,1955.0,2225.0,0.0,23.0,14.0,810.0]|0     |
|[2008.0,1.0,3.0,735.0,1000.0,0.0,23.0,14.0,810.0] |0     |
|[2008.0,1.0,3.0,620.0,750.0,0.0,16.0,4.0,515.0]   |0     |
|[2008.0,1.0,3.0,1755.0,1925.0,0.0,16.0,4.0,515.0] |0     |
|[2008.0,1.0,3.0,1915.0,2110.0,0.0,16.0,54.0,688.0]|0     |
+--------------------------------------------------+------+
only showing top 5 rows



1116

First classification model:
Decision Trees: offers inherit simplicity and explanablility

In [207]:
flight_train, flight_test = flight_assembled.randomSplit([0.8, 0.2], seed=42)
# check the distribution of the status in the training and testing set
train_ratio = flight_train.count()/ flight_assembled.count()
print(f"Test set is {train_ratio*100:.2f}% of the training set")

Test set is 83.06% of the training set


In [208]:
# create a decision tree model
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="status", featuresCol="features", maxIter=10, regParam=0.3, elasticNetParam=0.8) # create a logistic regression model with the label column is "status" and the features column is "features" with 10 iterations and regularization parameter is 0.3 and elastic net parameter is 0.8
lr_model = lr.fit(flight_train) # fit the model with the training set

# predict the testing set
lr_prediction = lr_model.transform(flight_assembled)
# lr_prediction.select("status", "prediction").show(5)
# count the number of correct predictions
correct_prediction = lr_prediction.filter(lr_prediction.status == lr_prediction.prediction).count()
total_data = lr_prediction.count()
print(f"Correct prediction: {correct_prediction} out of {total_data}")

Correct prediction: 1104 out of 1116


In [209]:
lr_prediction.filter(lr_prediction.status != lr_prediction.prediction).show()
# show lr_prediction where status is not equal to prediction and status is not equal to 0
lr_prediction.filter((lr_prediction.status == lr_prediction.prediction) & (lr_prediction.status != 0)).show()

+----+-----+----------+----------+----------+-------------+------+----+--------+------+-------------------+------------+----------+--------------------+--------------------+--------------------+----------+
|year|month|dayofmonth|crsdeptime|crsarrtime|uniquecarrier|origin|dest|distance|status|uniquecarrier_index|origin_index|dest_index|            features|       rawPrediction|         probability|prediction|
+----+-----+----------+----------+----------+-------------+------+----+--------+------+-------------------+------------+----------+--------------------+--------------------+--------------------+----------+
|2008|    1|         3|       745|       955|           WN|   IND| PHX|    1489|     1|                0.0|        16.0|       1.0|[2008.0,1.0,3.0,7...|[4.62497281328427...|[0.99029126213592...|       0.0|
|2008|    1|         3|       720|      1020|           WN|   ISP| PBI|    1052|     1|                0.0|        11.0|      52.0|[2008.0,1.0,3.0,7...|[4.62497281328427...|[0.

In [210]:
flight_test.filter(flight_train.status == 1).show()

+----+-----+----------+----------+----------+-------------+------+----+--------+------+-------------------+------------+----------+--------------------+
|year|month|dayofmonth|crsdeptime|crsarrtime|uniquecarrier|origin|dest|distance|status|uniquecarrier_index|origin_index|dest_index|            features|
+----+-----+----------+----------+----------+-------------+------+----+--------+------+-------------------+------------+----------+--------------------+
|2008|    1|         3|      1310|      1440|           WN|   PHL| BNA|     675|     1|                0.0|         7.0|      15.0|[2008.0,1.0,3.0,1...|
|2008|    1|         3|      1620|      1745|           WN|   PHL| MDW|     668|     1|                0.0|         7.0|       2.0|[2008.0,1.0,3.0,1...|
|2008|    1|         3|      1700|      2230|           WN|   LAS| MDW|    1521|     1|                0.0|         1.0|       2.0|[2008.0,1.0,3.0,1...|
+----+-----+----------+----------+----------+-------------+------+----+--------+--

In [213]:
# save the model, check if models folder exists or not. If not, create it. If yes, delete the folder and create a new one
import shutil
if os.path.exists("D:\\Coding\\FlightDelaysPredict\\models\\logistic_regression_model"):
    shutil.rmtree("D:\\Coding\\FlightDelaysPredict\\models\\logistic_regression_model")
lr_model.save("D:\\Coding\\FlightDelaysPredict\\models\\logistic_regression_model")