<a href="https://colab.research.google.com/github/PharahMain/Flight-Delay-Predictor/blob/master/flightstats_client.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz

In [3]:
# Install Spark-related dependencies for Python
!pip install -q findspark
!pip install pyspark

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/87/21/f05c186f4ddb01d15d0ddc36ef4b7e3cedbeb6412274a41f26b55a650ee5/pyspark-2.4.4.tar.gz (215.7MB)
     |████████████████████████████████| 215.7MB 62kB/s
Collecting py4j==0.10.7
  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
     |████████████████████████████████| 204kB 56.7MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216130387 sha256=74b47eb209cb1b717a59a95e68e791812f4eb2d0f565b21711b8294bfbf4a724
  Stored in directory: /root/.cache/pip/wheels/ab/09/4d/0d184230058e654eb1b04467dbc1292f00eaa186544604b471
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.4


In [0]:
# Set up required environment variables

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

In [33]:
# Point Colaboratory to Google Drive

from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [0]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.functions import month, dayofweek, weekofyear, hour, minute
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import DateType, TimestampType

from datetime import datetime, date, timedelta
import time

In [0]:
# Daily files are named by date; the commented-out lines load today's file.
# A fixed date is used here for testing.
#ARR_JSON_PATH = f"/content/gdrive/My Drive/Colab Datasets/flights_data/arrivals/{date.today()}.json"
ARR_JSON_PATH = "/content/gdrive/My Drive/Colab Datasets/flights_data/arrivals/2020-01-17.json"

#DEP_JSON_PATH = f"/content/gdrive/My Drive/Colab Datasets/flights_data/departures/{date.today()}.json"
DEP_JSON_PATH = "/content/gdrive/My Drive/Colab Datasets/flights_data/departures/2020-01-17.json"

#WEATH_JSON_PATH = f"/content/gdrive/My Drive/Colab Datasets/flights_data/weather/{date.today()}.json"
WEATH_JSON_PATH = "/content/gdrive/My Drive/Colab Datasets/flights_data/weather/2020-01-17.json"

APP_NAME = 'Flight Delay Predictor'
SPARK_URL = 'local[*]'
RANDOM_SEED = 263646
TRAINING_DATA_RATIO = 0.7
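
The three paths above differ only in the dataset folder and the date, so a small helper can build them. This is a minimal sketch, not part of the original notebook: `BASE_DIR` and `dataset_path` are hypothetical names, and the folder layout is assumed to be exactly the one used in the constants above.

```python
from datetime import date

# Assumed base folder, copied from the path constants above.
BASE_DIR = "/content/gdrive/My Drive/Colab Datasets/flights_data"


def dataset_path(kind, day=None):
    """Return the JSON path for one dataset kind: 'arrivals', 'departures' or 'weather'."""
    day = day or date.today()
    return f"{BASE_DIR}/{kind}/{day.isoformat()}.json"


# Pin the date while testing, or omit it to pick up today's file.
ARR_JSON_PATH = dataset_path("arrivals", date(2020, 1, 17))
print(ARR_JSON_PATH)
```

Switching between the test date and today's date then only means changing one argument rather than commenting lines in and out.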

In [0]:
spark = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).getOrCreate()

In [82]:
df_arr = spark.read.json(ARR_JSON_PATH)
df_arr.show(5)

+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+----------------------+--------------------+---------------------+--------------------+------------------+----------+------------+--------------------+--------------------+------+
|    airportResources|arrivalAirportFsCode|         arrivalDate|carrierFsCode|          codeshares|              delays|departureAirportFsCode|       departureDate|divertedAirportFsCode|     flightDurations|   flightEquipment|  flightId|flightNumber|    operationalTimes|            schedule|status|
+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+----------------------+--------------------+---------------------+--------------------+------------------+----------+------------+--------------------+--------------------+------+
|      [, TBIT,, C9,]|                 LAX|[2020-01-17T13:10...|           QR|     [[3515, UL, L]]| 

Let's look at the data in a little more detail.

In [83]:
df_arr.head(5)

[Row(airportResources=Row(arrivalGate=None, arrivalTerminal='TBIT', baggage=None, departureGate='C9', departureTerminal=None), arrivalAirportFsCode='LAX', arrivalDate=Row(dateLocal='2020-01-17T13:10:00.000', dateUtc='2020-01-17T21:10:00.000Z'), carrierFsCode='QR', codeshares=[Row(flightNumber='3515', fsCode='UL', relationship='L')], delays=Row(arrivalGateDelayMinutes=None, arrivalRunwayDelayMinutes=13, departureGateDelayMinutes=52, departureRunwayDelayMinutes=60), departureAirportFsCode='DOH', departureDate=Row(dateLocal='2020-01-17T07:45:00.000', dateUtc='2020-01-17T04:45:00.000Z'), divertedAirportFsCode=None, flightDurations=Row(blockMinutes=None, scheduledAirMinutes=1021, scheduledBlockMinutes=985, scheduledTaxiInMinutes=None, scheduledTaxiOutMinutes=10, taxiOutMinutes=18), flightEquipment=Row(actualEquipmentIataCode='77L', scheduledEquipmentIataCode='77L', tailNumber='A7-BBA'), flightId=1028199581, flightNumber='739', operationalTimes=Row(actualGateArrival=None, actualGateDeparture

Let's see how many flights arrived at LAX in the past 24 hours.

In [84]:
df_arr.count()

337

Let's first drop all the unnecessary columns.

In [85]:
df_arr = df_arr.drop('airportResources', 'codeshares', 'departureDate',
            'divertedAirportFsCode', 'operationalTimes', 'schedule', 'status',
            'flightId', 'flightNumber')
df_arr.show(5)

+--------------------+--------------------+-------------+--------------------+----------------------+--------------------+------------------+
|arrivalAirportFsCode|         arrivalDate|carrierFsCode|              delays|departureAirportFsCode|     flightDurations|   flightEquipment|
+--------------------+--------------------+-------------+--------------------+----------------------+--------------------+------------------+
|                 LAX|[2020-01-17T13:10...|           QR|      [, 13, 52, 60]|                   DOH|[, 1021, 985,, 10...|[77L, 77L, A7-BBA]|
|                 LAX|[2020-01-17T13:15...|           QF|[205, 247, 226, 240]|                   SYD|[, 763, 830, 57, ...|[388, 388, VH-OQJ]|
|                 LAX|[2020-01-17T13:50...|           SU|          [14, 23,,]|                   SVO|[, 745, 770, 15, ...|[332, 332, VQ-BBE]|
|                 LAX|[2020-01-17T14:00...|           NZ|       [, 7, 17, 25]|                   AKL|[, 681, 725, 34, ...|[77W, 77W, ZK-OKS]|
|     

In [86]:
df_arr.printSchema()

root
 |-- arrivalAirportFsCode: string (nullable = true)
 |-- arrivalDate: struct (nullable = true)
 |    |-- dateLocal: string (nullable = true)
 |    |-- dateUtc: string (nullable = true)
 |-- carrierFsCode: string (nullable = true)
 |-- delays: struct (nullable = true)
 |    |-- arrivalGateDelayMinutes: long (nullable = true)
 |    |-- arrivalRunwayDelayMinutes: long (nullable = true)
 |    |-- departureGateDelayMinutes: long (nullable = true)
 |    |-- departureRunwayDelayMinutes: long (nullable = true)
 |-- departureAirportFsCode: string (nullable = true)
 |-- flightDurations: struct (nullable = true)
 |    |-- blockMinutes: long (nullable = true)
 |    |-- scheduledAirMinutes: long (nullable = true)
 |    |-- scheduledBlockMinutes: long (nullable = true)
 |    |-- scheduledTaxiInMinutes: long (nullable = true)
 |    |-- scheduledTaxiOutMinutes: long (nullable = true)
 |    |-- taxiOutMinutes: long (nullable = true)
 |-- flightEquipment: struct (nullable = true)
 |    |-- actualEq

Let's explore the dataframe a little bit here.

In [87]:
df_arr.groupBy('carrierFsCode').count().orderBy('count', ascending=False).show()

+-------------+-----+
|carrierFsCode|count|
+-------------+-----+
|           DL|   48|
|           AA|   45|
|           OO|   44|
|           WN|   42|
|           CP|   30|
|           UA|   25|
|           AS|   24|
|           NK|    6|
|           B6|    6|
|           QX|    5|
|           AC|    4|
|           Y4|    3|
|           BA|    3|
|           WS|    3|
|           QF|    3|
|           CZ|    2|
|           BR|    2|
|           1I|    2|
|          BAI|    2|
|           OZ|    2|
+-------------+-----+
only showing top 20 rows



Next, let's see what the busiest time of day is at LAX. Before we do that, I'm going to extract the arrival time from 'arrivalDate'.

In [88]:
# first need to extract the date info from the arrivalDate struct
df_arr = df_arr.withColumn('localDate', col('arrivalDate').getField('dateLocal'))
#df_arr.select('localDate').show(5)

df_arr = df_arr.withColumn('localTimeStamp', df_arr['localDate'].cast(TimestampType()))
df_arr.select('localTimeStamp').show(5)

+-------------------+
|     localTimeStamp|
+-------------------+
|2020-01-17 13:10:00|
|2020-01-17 13:15:00|
|2020-01-17 13:50:00|
|2020-01-17 14:00:00|
|2020-01-17 14:40:00|
+-------------------+
only showing top 5 rows



Let's give each time component (date, hour, minute) its own column.

In [89]:
# Preview only: the new columns are not assigned back to df_arr
(df_arr
    .withColumn('date', col('localTimeStamp').cast('date'))
    .withColumn('hour', hour(col('localTimeStamp')))
    .withColumn('minute', minute(col('localTimeStamp')))
    .show(5))

+--------------------+--------------------+-------------+--------------------+----------------------+--------------------+------------------+--------------------+-------------------+----------+----+------+
|arrivalAirportFsCode|         arrivalDate|carrierFsCode|              delays|departureAirportFsCode|     flightDurations|   flightEquipment|           localDate|     localTimeStamp|      date|hour|minute|
+--------------------+--------------------+-------------+--------------------+----------------------+--------------------+------------------+--------------------+-------------------+----------+----+------+
|                 LAX|[2020-01-17T13:10...|           QR|      [, 13, 52, 60]|                   DOH|[, 1021, 985,, 10...|[77L, 77L, A7-BBA]|2020-01-17T13:10:...|2020-01-17 13:10:00|2020-01-17|  13|    10|
|                 LAX|[2020-01-17T13:15...|           QF|[205, 247, 226, 240]|                   SYD|[, 763, 830, 57, ...|[388, 388, VH-OQJ]|2020-01-17T13:15:...|2020-01-17 13:

Since travel patterns change with the time of year, let's also add weekOfYear and dayOfWeek.

In [90]:
# Preview only: the new columns are not assigned back to df_arr
(df_arr
    .withColumn('weekOfYear', weekofyear(col('localTimeStamp')))
    .withColumn('dayOfWeek', dayofweek(col('localTimeStamp')))
    .show(5))

+--------------------+--------------------+-------------+--------------------+----------------------+--------------------+------------------+--------------------+-------------------+----------+---------+
|arrivalAirportFsCode|         arrivalDate|carrierFsCode|              delays|departureAirportFsCode|     flightDurations|   flightEquipment|           localDate|     localTimeStamp|weekOfYear|dayOfWeek|
+--------------------+--------------------+-------------+--------------------+----------------------+--------------------+------------------+--------------------+-------------------+----------+---------+
|                 LAX|[2020-01-17T13:10...|           QR|      [, 13, 52, 60]|                   DOH|[, 1021, 985,, 10...|[77L, 77L, A7-BBA]|2020-01-17T13:10:...|2020-01-17 13:10:00|         3|        6|
|                 LAX|[2020-01-17T13:15...|           QF|[205, 247, 226, 240]|                   SYD|[, 763, 830, 57, ...|[388, 388, VH-OQJ]|2020-01-17T13:15:...|2020-01-17 13:15:00|  
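
The values 3 and 6 in the table are easy to misread, so here is a plain-Python cross-check of the conventions involved. Spark's `weekofyear` follows ISO 8601 week numbering, and `dayofweek` counts from 1 = Sunday to 7 = Saturday. The two helper names below are hypothetical and only mirror those conventions; they are not Spark functions.

```python
from datetime import date


def spark_day_of_week(d):
    """Map a date to Spark's dayofweek convention (1 = Sunday ... 7 = Saturday)."""
    # isoweekday() is 1 = Monday ... 7 = Sunday, so shift by one day and wrap.
    return d.isoweekday() % 7 + 1


def iso_week_of_year(d):
    """ISO 8601 week number, the convention Spark's weekofyear follows."""
    return d.isocalendar()[1]


d = date(2020, 1, 17)  # a Friday, the date of the sample data
print(iso_week_of_year(d), spark_day_of_week(d))  # prints "3 6"
```

So a dayOfWeek of 6 means Friday here, not Saturday, which matters if the column is later interpreted as weekday versus weekend.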

The delays column, which contains our target data, is a struct of four values. Each one is the number of minutes of delay at a different stage of the flight. Since travelers care about how long a delay is rather than where it occurs, let's sum the four values to get a single total delay in minutes.

In [91]:
# a df with just the delay data
delay_df = df_arr.select('delays.*').na.fill(0)
delay_df.show(5)

+-----------------------+-------------------------+-------------------------+---------------------------+
|arrivalGateDelayMinutes|arrivalRunwayDelayMinutes|departureGateDelayMinutes|departureRunwayDelayMinutes|
+-----------------------+-------------------------+-------------------------+---------------------------+
|                      0|                       13|                       52|                         60|
|                    205|                      247|                      226|                        240|
|                     14|                       23|                        0|                          0|
|                      0|                        7|                       17|                         25|
|                      0|                        0|                        0|                          0|
+-----------------------+-------------------------+-------------------------+---------------------------+
only showing top 5 rows



In [92]:
# add a totalDelay column to the delay df, plus an id column to join on later
# (the loop variable is named c so it does not shadow pyspark's col function)
delay_df = delay_df.withColumn('totalDelay', sum(delay_df[c] for c in delay_df.columns))
delay_df = delay_df.withColumn('d_id', monotonically_increasing_id())

delay_df.show(5)


+-----------------------+-------------------------+-------------------------+---------------------------+----------+----+
|arrivalGateDelayMinutes|arrivalRunwayDelayMinutes|departureGateDelayMinutes|departureRunwayDelayMinutes|totalDelay|d_id|
+-----------------------+-------------------------+-------------------------+---------------------------+----------+----+
|                      0|                       13|                       52|                         60|       125|   0|
|                    205|                      247|                      226|                        240|       918|   1|
|                     14|                       23|                        0|                          0|        37|   2|
|                      0|                        7|                       17|                         25|        49|   3|
|                      0|                        0|                        0|                          0|         0|   4|
+-----------------------

Now let's merge this table with the main table.

In [93]:
# To join the two dataframes, we need an index column on each side to join on
df_arr = df_arr.withColumn('id', monotonically_increasing_id())

df_arr = df_arr.join(delay_df, col('id') == col('d_id'), how='leftouter').drop('d_id')
df_arr.show(5)

+--------------------+--------------------+-------------+--------------------+----------------------+--------------------+------------------+--------------------+-------------------+---+-----------------------+-------------------------+-------------------------+---------------------------+----------+
|arrivalAirportFsCode|         arrivalDate|carrierFsCode|              delays|departureAirportFsCode|     flightDurations|   flightEquipment|           localDate|     localTimeStamp| id|arrivalGateDelayMinutes|arrivalRunwayDelayMinutes|departureGateDelayMinutes|departureRunwayDelayMinutes|totalDelay|
+--------------------+--------------------+-------------+--------------------+----------------------+--------------------+------------------+--------------------+-------------------+---+-----------------------+-------------------------+-------------------------+---------------------------+----------+
|                 LAX|[2020-01-17T13:10...|           QR|      [, 13, 52, 60]|                

Now let's do the same for flightDurations and flightEquipment.

In [0]:
df_weath = spark.read.json(WEATH_JSON_PATH)
df_weath.show(5)

+-------------------+----------+--------+--------+-------------------+-----+---------------+-----------------+----------+--------+-------------------+-----------+----------+-------+----------+-----------+--------+---------+
|apparentTemperature|cloudCover|dewPoint|humidity|               icon|ozone|precipIntensity|precipProbability|precipType|pressure|            summary|temperature|      time|uvIndex|visibility|windBearing|windGust|windSpeed|
+-------------------+----------+--------+--------+-------------------+-----+---------------+-----------------+----------+--------+-------------------+-----------+----------+-------+----------+-----------+--------+---------+
|              53.33|      0.93|   48.75|    0.84|               rain|306.5|         0.0474|             0.87|      rain|  1020.0|         Light Rain|      53.33|1579248000|      0|      4.94|        118|    9.85|     7.89|
|               53.2|      0.88|   49.85|    0.88|               rain|303.5|         0.0262|            
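
The `time` column in the weather data looks like Unix epoch seconds, which will need converting before it can be lined up with the flights' local arrival times. Here is a minimal plain-Python sketch of that conversion. Two assumptions are baked in: that `time` really is epoch seconds, and that LAX is on Pacific Standard Time (UTC-8) on this date, which agrees with the dateLocal and dateUtc values in the arrivals data. `PST` and `epoch_to_local` are hypothetical names.

```python
from datetime import datetime, timedelta, timezone

# Assumption: LAX observes Pacific Standard Time (UTC-8) on this January date.
PST = timezone(timedelta(hours=-8), "PST")


def epoch_to_local(epoch_seconds, tz=PST):
    """Convert Unix epoch seconds to a timezone-aware local datetime."""
    return datetime.fromtimestamp(epoch_seconds, tz=tz)


first_obs = epoch_to_local(1579248000)  # first value in the time column above
print(first_obs.isoformat())  # 2020-01-17T00:00:00-08:00
```

Under these assumptions the first weather row corresponds to local midnight on 2020-01-17.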