# Predicting Delayed Flights in the U.S.

- This workbook will explore the "[`Flights Dataset`](http://stat-computing.org/dataexpo/2009/the-data.html)" and predicts delayed flights at airports located in the United States bases on previous flight records. The dataset consists of flight information such as arrival and departure times. Additionally, the .dat file for plotting the geographical location of airports in the U.S. can be found [`here`](https://github.com/jpatokal/openflights/blob/master/data/airports.dat). Jupyter Notebook and Spark is utilized to explore, analyze and visualize the results. This dataset examines flights in 2008 which is roughly 7 million flights! Apache Spark (locally) will be utilized to analyze this dataset. 

In [1]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import Row
import sys
import os
import numpy as np
import pandas as pd
import time
import datetime
import matplotlib.pyplot as plt
import os.path
%matplotlib inline
import sqlite3
plt.style.use('ggplot')

In [6]:
try:
    db = sqlite3.connect('Flights')
    c = db.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS flight_info(Year INT,
        Month INT,DayofMonth INT, DayOfWeek INT, DepTime INT, CRSDepTime INT, ArrTime INT,
        CRSArrTime INT, UniqueCarrier TEXT, FlightNum INT, TailNum CHAR(8), ActualElapsedTime INT,
        CRSElapsedTime INT, AirTime INT, ArrDelay INT, DepDelay INT,
        Origin TEXT, Dest TEXT, Distance INT,TaxiIn INT, TaxiOut INT,Cancelled INT,CancellationCode TEXT,
        Diverted INT,CarrierDelay INT,WeatherDelay INT,NASDelay INT,SecurityDelay INT,LateAircraftDelay INT)''')
    db.commit()
    db.close()
except Exception as e:
    print(str(e))
    
try:
    db = sqlite3.connect('Flights')
    c = db.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS airports(AirportID INT, Name TEXT, City TEXT, 
                Country TEXT, FAACODE TEXT, ICAO TEXT, LATITUDE INT, LONGITUDE INT, 
                ALTITUDE INT, TimeZ INT, DST TEXT, TZ TEXT)''')
    db.commit()
    db.close()
except Exception as e:
    print(str(e))

In [7]:
db = sqlite3.connect('Flights')
c = db.cursor()
df = pd.read_csv('2008.csv',sep=',')
%time df.to_sql('flight_info',db, if_exists='append', index=False)

CPU times: user 1min 51s, sys: 7.75 s, total: 1min 58s
Wall time: 2min


In [8]:
db = sqlite3.connect('Flights')
c = db.cursor()
df_a = pd.read_csv('airport_location.csv',sep=',',names = ['AirportID','Name','City','Country','FAACode',
                        'ICAO','Latitude','Longitude','Altitude','TimeZ','DST','TZ'])

%time df_a.to_sql('airports',db,if_exists='append', index=False)

CPU times: user 59.2 ms, sys: 6.6 ms, total: 65.8 ms
Wall time: 68.9 ms


In [9]:
%time c.execute('''SELECT * FROM flight_info''')

CPU times: user 597 µs, sys: 828 µs, total: 1.42 ms
Wall time: 961 µs


<sqlite3.Cursor at 0x1101d73b0>

In [10]:
%time c.execute('''SELECT * FROM airports''')

CPU times: user 234 µs, sys: 453 µs, total: 687 µs
Wall time: 700 µs


<sqlite3.Cursor at 0x1101d73b0>

## Apache Spark

In [2]:
airport_loc_df = pd.read_csv('airport_location.csv',index_col=0,
        names = ['name', 'city', 'country','faa_code','ICAO','lat',
        'lng','alt','TZone','DST','Tz'], header=0)
airport_loc_df.head()

Unnamed: 0,name,city,country,faa_code,ICAO,lat,lng,alt,TZone,DST,Tz
2,Madang,Madang,Papua New Guinea,MAG,AYMD,-5.207083,145.7887,20,10.0,U,Pacific/Port_Moresby
3,Mount Hagen,Mount Hagen,Papua New Guinea,HGU,AYMH,-5.826789,144.295861,5388,10.0,U,Pacific/Port_Moresby
4,Nadzab,Nadzab,Papua New Guinea,LAE,AYNZ,-6.569828,146.726242,239,10.0,U,Pacific/Port_Moresby
5,Port Moresby Jacksons Intl,Port Moresby,Papua New Guinea,POM,AYPY,-9.443383,147.22005,146,10.0,U,Pacific/Port_Moresby
6,Wewak Intl,Wewak,Papua New Guinea,WWK,AYWK,-3.583828,143.669186,19,10.0,U,Pacific/Port_Moresby


In [3]:
try:
    data = sc.textFile("2008.csv")
    count_data = data.count()
    print "Total Number of Rows: {}".format(count_data)
except Exception as e:
    print str(e)

Total Number of Rows: 7009729


#### Preprocessing Data: 

In [4]:
#transformation
split_data = data.map(lambda line: line.split(',')) 
#action
header = split_data.first()
print header

[u'Year', u'Month', u'DayofMonth', u'DayOfWeek', u'DepTime', u'CRSDepTime', u'ArrTime', u'CRSArrTime', u'UniqueCarrier', u'FlightNum', u'TailNum', u'ActualElapsedTime', u'CRSElapsedTime', u'AirTime', u'ArrDelay', u'DepDelay', u'Origin', u'Dest', u'Distance', u'TaxiIn', u'TaxiOut', u'Cancelled', u'CancellationCode', u'Diverted', u'CarrierDelay', u'WeatherDelay', u'NASDelay', u'SecurityDelay', u'LateAircraftDelay']


In [5]:
def parse(r):
    try:
        x = Row(Year=int(r[0]), Month=int(r[1]),DayofMonth=int(r[2]),\
            DayOfWeek=int(r[3]),DepTime=int(float(r[4])),CRSDepTime=int(r[5]),\
            ArrTime=int(float(r[6])),CRSArrTime=int(r[7]),UniqueCarrier=r[8],\
            ArrDelay=int(float(r[14])),DepDelay=int(float(r[15])),Origin=r[16],Dest=r[17],Distance=int(float(r[18])))  
    except:
        x=None  
    return x

textRDD = split_data.filter(lambda r: r != header)
rowRDD = textRDD.map(lambda r: parse(r)).filter(lambda r:r != None)
dataframe = sqlContext.createDataFrame(rowRDD)
dataframe.show(5)

+--------+-------+----------+----------+---------+----------+--------+-------+----+--------+-----+------+-------------+----+
|ArrDelay|ArrTime|CRSArrTime|CRSDepTime|DayOfWeek|DayofMonth|DepDelay|DepTime|Dest|Distance|Month|Origin|UniqueCarrier|Year|
+--------+-------+----------+----------+---------+----------+--------+-------+----+--------+-----+------+-------------+----+
|     -14|   2211|      2225|      1955|        4|         3|       8|   2003| TPA|     810|    1|   IAD|           WN|2008|
|       2|   1002|      1000|       735|        4|         3|      19|    754| TPA|     810|    1|   IAD|           WN|2008|
|      14|    804|       750|       620|        4|         3|       8|    628| BWI|     515|    1|   IND|           WN|2008|
|      -6|   1054|      1100|       930|        4|         3|      -4|    926| BWI|     515|    1|   IND|           WN|2008|
|      34|   1959|      1925|      1755|        4|         3|      34|   1829| BWI|     515|    1|   IND|           WN|2008|


- Adding binary labels to **DepDelayed** columns as this will become the "target" column for the predicitons. 
    - **True** for flights that have more than 15 minute delay (t>15)
    - **False** for flights that are less than or equal to 15 minute delay (<=15)

In [11]:
#Adding column 'DepDelayed' and assigning True/False if >15 mins
dataframe = dataframe.withColumn('DepDelayed', dataframe['DepDelay']>15)

In [7]:
%time df = dataframe.toPandas()

In [None]:
fig = plt.figure(figsize=(15,10))
ax = fig.add_subplot(111)
df['DayOfMonth'].plot(kind='bar')
plt.title('Total Number of Flights by Day of Month',fontsize=20,y=1.03)
plt.show()

In [None]:
fig = plt.figure(figsize=(15,10))
ax = fig.add_subplot(111)
df['DayOfWeek'].plot(kind='bar')
plt.title('Total Number of Flights by Day of Week',fontsize=20,y=1.03)
plt.show()

In [None]:
%time df = dataframe.toPandas()
fig = plt.figure(figsize=(15,10))
ax = fig.add_subplot(111)
df['DepDelayed'].plot(kind='bar',color='Navy')
plt.title('Delayed Flights Breakdown',fontsize=20,y=1.03)
plt.show()

CPU times: user 1min 11s, sys: 5.86 s, total: 1min 17s
Wall time: 2min 17s


- In the following script, a new column **Hour** is added to determine what is the hour of flight (0 to 24)

In [20]:
def hour(x):
    hour = int(str(int(x)).zfill(4)[:2])
    return hour
from pyspark.sql.functions import udf

In [21]:
# register as a UDF 
f = udf(hour, IntegerType())
#CRSDepTime: scheduled departure time (local, hhmm)
dataframe = dataframe.withColumn('hour', f(dataframe.CRSDepTime))
dataframe.registerTempTable("dataframe")

In [22]:
dataframe.select('CRSDepTime','hour').show(5)

+----------+----+
|CRSDepTime|hour|
+----------+----+
|      1955|  19|
|       735|   7|
|       620|   6|
|       930|   9|
|      1755|  17|
+----------+----+
only showing top 5 rows



### Exploring the Data:

In [23]:
# Which airport has the most delays??
# Write SQL query to group delays 

delays = sqlContext.sql("SELECT Origin, count(*) Num_Flights,avg(DepDelay) Delay FROM dataframe GROUP BY Origin")

origin_df = delays.toPandas() #convert to dataframe
origin_df.head()

Unnamed: 0,Origin,Num_Flights,Delay
0,BGM,699,5.915594
1,PSE,742,0.057951
2,DLG,111,16.495495
3,INL,71,-4.802817
4,MSY,38510,8.891587


In [None]:
#Join origin_df with airports
df_airports = pd.merge(origin_df,airport_loc_df,left_on = 'Origin',right_on = 'faa_code')
df_airports.head()

In [None]:
from mpl_toolkits.basemap import Basemap
import matplotlib.pyplot as pl
from pylab import rcParams
%matplotlib inline

def zscore(x):
    return (x-np.average(x))/np.std(x)

fig = plt.figure(figsize=(20,20))
map = Basemap(projection = 'merc',area_thresh = 4000,resolution ='i',
              rsphere=6371200.,llcrnrlon=-130, 
              llcrnrlat=21,urcrnrlon=-62, urcrnrlat=52)

map.drawcoastlines()
map.drawcountries()
map.fillcontinents(color = 'white',alpha=0.3)
map.drawstates()
map.shadedrelief()

#Plot Airport Delay
countrange=max(df_airports['Num_Flights'])-min(df_airports['Num_Flights'])
standarize = (zscore(df_airports['Delay']))
x,y = map(np.asarray(df_airports['lng']),np.asarray(df_airports['lat']))
volume=df_airports['Num_Flights']*4000.0/countrange

#Add color to map:
color = pl.get_cmap('coolwarm')(np.linspace(0.0,1.0,70))
color = np.flipud(color)
map.scatter(x, y,  marker='o', s= volume, linewidths=1.5,
    edgecolors='white', alpha = .7, color=color[(standarize*10)])

#Add Labels to Map
df_text=df_airports[(df_airports['Num_Flights']>70000) \
                    & (df_airports['faa_code'] != 'HNL')] #ignoring Hawaii
xtext,ytext = map(np.asarray(df_text['lng']), np.asarray(df_text['lat']))
txt=np.asarray(df_text['faa_code'])
zp=zip(xtext,ytext,txt)
for row in zp:
    pl.text(row[0],row[1],row[2], fontsize=16, color='blue',)

plt.title("U.S. Airport Delays", fontsize = 42)    
plt.show()