In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName('Flight Delay Prediction') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/12 18:23:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
airports_df = (spark.read
               .format("csv")
               .option('header', 'true')
               .load("airports.csv"))

In [3]:
airports_df.describe()

                                                                                

DataFrame[summary: string, IATA_CODE: string, AIRPORT: string, CITY: string, STATE: string, COUNTRY: string, LATITUDE: string, LONGITUDE: string]

In [4]:
airlines_df = (spark.read
               .format("csv")
               .option('header', 'true')
               .load("airlines.csv"))

In [5]:
airlines_df.describe()

DataFrame[summary: string, IATA_CODE: string, AIRLINE: string]

In [6]:
flights_df = (spark.read
              .format("csv")
              .option('header', 'true')
              .load("flights.csv"))

In [7]:
flights_df.describe()

                                                                                

DataFrame[summary: string, YEAR: string, MONTH: string, DAY: string, DAY_OF_WEEK: string, AIRLINE: string, FLIGHT_NUMBER: string, TAIL_NUMBER: string, ORIGIN_AIRPORT: string, DESTINATION_AIRPORT: string, SCHEDULED_DEPARTURE: string, DEPARTURE_TIME: string, DEPARTURE_DELAY: string, TAXI_OUT: string, WHEELS_OFF: string, SCHEDULED_TIME: string, ELAPSED_TIME: string, AIR_TIME: string, DISTANCE: string, WHEELS_ON: string, TAXI_IN: string, SCHEDULED_ARRIVAL: string, ARRIVAL_TIME: string, ARRIVAL_DELAY: string, DIVERTED: string, CANCELLED: string, CANCELLATION_REASON: string, AIR_SYSTEM_DELAY: string, SECURITY_DELAY: string, AIRLINE_DELAY: string, LATE_AIRCRAFT_DELAY: string, WEATHER_DELAY: string]

In [8]:
flights_df = flights_df.drop(*['YEAR', 'FLIGHT_NUMBER', 'AIRLINE', 'DISTANCE', 'TAIL_NUMBER', 'TAXI_OUT',
                              'SCHEDULED_TIME', 'DEPARTURE_TIME', 'WHEELS_OFF', 'ELAPSED_TIME',
                              'AIR_TIME', 'WHEELS_ON', 'DAY_OF_WEEK', 'TAXI_IN', 'CANCELLATION_REASON'])

In [9]:
flights_df.head()

Row(MONTH='1', DAY='1', ORIGIN_AIRPORT='ANC', DESTINATION_AIRPORT='SEA', SCHEDULED_DEPARTURE='0005', DEPARTURE_DELAY='-11', SCHEDULED_ARRIVAL='0430', ARRIVAL_TIME='0408', ARRIVAL_DELAY='-22', DIVERTED='0', CANCELLED='0', AIR_SYSTEM_DELAY=None, SECURITY_DELAY=None, AIRLINE_DELAY=None, LATE_AIRCRAFT_DELAY=None, WEATHER_DELAY=None)

In [10]:
flights_df = flights_df.withColumn('AIR_SYSTEM_DELAY', flights_df['AIR_SYSTEM_DELAY'].cast('double'))
flights_df = flights_df.withColumn('SECURITY_DELAY', flights_df['SECURITY_DELAY'].cast('double'))
flights_df = flights_df.withColumn('AIRLINE_DELAY', flights_df['AIRLINE_DELAY'].cast('double'))
flights_df = flights_df.withColumn('LATE_AIRCRAFT_DELAY', flights_df['LATE_AIRCRAFT_DELAY'].cast('double'))
flights_df = flights_df.withColumn('WEATHER_DELAY', flights_df['WEATHER_DELAY'].cast('double'))


In [11]:
from pyspark.sql.functions import avg


def fill_with_mean(df, exclude=set()):
    stats = df.agg(*(
        avg(c).alias(c) for c in df.columns if c not in exclude
    ))
    return df.na.fill(stats.first().asDict())

fill_with_mean(flights_df, ["MONTH", "DAY", "ORIGIN_AIRPORT", "DESTINATION_AIRPORT", "SCHEDULED_DEPARTURE", "SCHEDULED_ARRIVAL", "ARRIVAL_TIME", "DIVERTED", "CANCELLED"])

                                                                                

DataFrame[MONTH: string, DAY: string, ORIGIN_AIRPORT: string, DESTINATION_AIRPORT: string, SCHEDULED_DEPARTURE: string, DEPARTURE_DELAY: string, SCHEDULED_ARRIVAL: string, ARRIVAL_TIME: string, ARRIVAL_DELAY: string, DIVERTED: string, CANCELLED: string, AIR_SYSTEM_DELAY: double, SECURITY_DELAY: double, AIRLINE_DELAY: double, LATE_AIRCRAFT_DELAY: double, WEATHER_DELAY: double]

In [12]:
from pyspark.sql.functions import udf

flights_15_delayed = udf(lambda row: 1 if int('0' if row is None else row) > 15 else 0)

flights_df = flights_df.withColumn('result', flights_15_delayed(flights_df['ARRIVAL_DELAY']))
flights_df.show()

[Stage 16:>                                                         (0 + 1) / 1]

+-----+---+--------------+-------------------+-------------------+---------------+-----------------+------------+-------------+--------+---------+----------------+--------------+-------------+-------------------+-------------+------+
|MONTH|DAY|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_DELAY|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|result|
+-----+---+--------------+-------------------+-------------------+---------------+-----------------+------------+-------------+--------+---------+----------------+--------------+-------------+-------------------+-------------+------+
|    1|  1|           ANC|                SEA|               0005|            -11|             0430|        0408|          -22|       0|        0|            null|          null|         null|               null|         null|     0|
|    1|  1|           LAX|                PBI|               001

                                                                                

In [13]:
flights_df = flights_df.drop(*['ORIGIN_AIRPORT','DESTINATION_AIRPORT','ARRIVAL_TIME','ARRIVAL_DELAY'])

In [14]:
flights_df

DataFrame[MONTH: string, DAY: string, SCHEDULED_DEPARTURE: string, DEPARTURE_DELAY: string, SCHEDULED_ARRIVAL: string, DIVERTED: string, CANCELLED: string, AIR_SYSTEM_DELAY: double, SECURITY_DELAY: double, AIRLINE_DELAY: double, LATE_AIRCRAFT_DELAY: double, WEATHER_DELAY: double, result: string]

In [19]:
from pyspark.mllib.feature import StandardScaler

sc = StandardScaler()
X = flights_df.drop(*['result'])
Y = flights_df['result']
X = sc.fit(X)

ModuleNotFoundError: No module named 'numpy'

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
from sklearn.model_selection import train_test_split

X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=42)
clf = DecisionTreeClassifier()
clf.fit(X_train, Y_train)

In [None]:
clf.score(X_test, Y_test)

In [None]:
grid_param = {
    'criterion': ['gini', 'entropy'],
    'max_depth': range(30, 31, 1),
    'min_samples_leaf': range(35, 38, 1),
    'min_samples_split': range(35, 38, 1),
    'splitter': ['best', 'random']
}

In [None]:
from sklearn.model_selection import GridSearchCV

grid_search = GridSearchCV(estimator=clf,
                           param_grid=grid_param,
                           cv=5,
                           n_jobs=-1)

In [None]:
grid_search.fit(X_train, Y_train)

In [None]:
clf = DecisionTreeClassifier(criterion = 'entropy', max_depth=30, min_samples_leaf=35, min_samples_split=35, splitter='best')
clf.fit(X_train, Y_train)

In [None]:
clf.score(X_test, Y_test)

In [None]:
import pickle

filename = 'finalized_model.sav'
pickle.dump(clf, open(filename, 'wb'))

In [None]:
loaded_model = pickle.load(open(filename, 'rb'))
prediction = loaded_model.predict([[1,1,5,-11,430,0,0,13,0,18,22,3]])
print(prediction)