### Variable descriptions of original data set
|Item|Name|Description|
|:--:|:--|:--|
|1|	Year	|1987-2008|
|2|	Month	|1-12|
|3|	DayofMonth	|1-31|
|4|	DayOfWeek	|1 (Monday) - 7 (Sunday)|
|5|	DepTime	|actual departure time (local, hhmm)|
|6|	CRSDepTime	|scheduled departure time (local, hhmm)|
|7|	ArrTime	|actual arrival time (local, hhmm)|
|8|	CRSArrTime	|scheduled arrival time (local, hhmm)|
|9|	UniqueCarrier	|unique carrier code|
|10|	FlightNum	|flight number|
|11|	TailNum	|plane tail number|
|12|	ActualElapsedTime	|in minutes|
|13|	CRSElapsedTime	|in minutes|
|14|	AirTime	|in minutes|
|15|	ArrDelay	|arrival delay, in minutes|
|16|	DepDelay	|departure delay, in minutes|
|17|	Origin	|origin IATA airport code|
|18|	Dest	|destination IATA airport code|
|19|	Distance	|in miles|
|20|	TaxiIn	|taxi in time, in minutes|
|21|	TaxiOut	|taxi out time in minutes|
|22|	Cancelled	|was the flight cancelled?|
|23|	CancellationCode	|reason for cancellation (A = carrier, B = weather, C = NAS, D = security)|
|24|	Diverted	|1 = yes, 0 = no|
|25|	CarrierDelay	|in minutes|
|26|	WeatherDelay	|in minutes|
|27|	NASDelay	|in minutes|
|28|	SecurityDelay	|in minutes|
|29|	LateAircraftDelay	|in minutes|

In [1]:
# general stuff
import locale
# Select the US locale for the locale-aware number formatting (grouping=True) used in later cells.
# NOTE(review): 'en_US' may not be an installed locale name on every platform - confirm.
locale.setlocale(locale.LC_ALL, 'en_US')

import numpy as np

import pandas as pd
# display every column when a frame is shown
pd.set_option('display.max_columns', None)
# turn on the pandas 'compute.use_bottleneck' and 'compute.use_numexpr' options
pd.set_option('compute.use_bottleneck', True)
pd.set_option('compute.use_numexpr', True)

import os

from matplotlib import pyplot as plt
%matplotlib inline
plt.style.use('ggplot')

import dask.dataframe as dd
from dask import compute, persist
from dask.distributed import Client, progress

# show the working directory; relative paths such as 'data' are resolved against it
!pwd

/Users/mattbaldree/Google Drive/smu/quantifying/QTW_14/QTW_14


In [2]:
# flags

# When True, later cells delete the local 'data' directory and download the yearly files again.
PURGE_DATA = True

In [3]:
# start Dask distributed client and print out stats

# Client() is created with no arguments.
c = Client()
# last expression of the cell, so the client summary is displayed
c

0,1
Client  Scheduler: tcp://127.0.0.1:55360  Dashboard: http://127.0.0.1:8787,Cluster  Workers: 4  Cores: 4  Memory: 10.31 GB


In [4]:
# When data is being reloaded, start from an empty data directory.

if PURGE_DATA:
    from shutil import rmtree

    path = 'data'

    # remove whatever an earlier run left behind
    if os.path.exists(path):
        rmtree(path)

    # recreate the directory so the download cell has somewhere to write
    if not os.path.exists(path):
        os.mkdir(path)

In [None]:
# if loading data, download files and decompress them in parallel.

# When True only a single year is fetched (quick test); when False every year is fetched.
DOWNLOAD_ONE_FILE_ONLY = False

if PURGE_DATA:
    # modules needed only by the download code in this cell
    import urllib.request
    import shutil
    import bz2
    
    def download_file(baseurl, yr, chunk_size=1024 * 1024):
        """Download one year's bz2-compressed CSV and store it uncompressed.

        The archive is read and decompressed in chunks, so neither the
        compressed nor the uncompressed file has to fit in memory at once.

        Parameters
        ----------
        baseurl : str
            URL template containing one ``%d`` placeholder for the year.
        yr : int
            Year to download; the output is written to ``data/<yr>.csv``.
        chunk_size : int, optional
            Number of compressed bytes read from the response per iteration.

        Returns
        -------
        tuple
            ``(file_name, size)`` where ``size`` is the number of
            uncompressed bytes written.
        """
        url_of_data_file = baseurl%(yr)
        file_name = 'data/%d.csv'%(yr)
        size = 0

        print('downloading', url_of_data_file, 'to', file_name)
        decompressor = bz2.BZ2Decompressor()

        # download file and decompress it piece by piece
        with urllib.request.urlopen(url_of_data_file) as response, open(file_name, 'wb') as out_file:
            # stop at the end of the bz2 stream; feeding a finished decompressor raises EOFError
            while not decompressor.eof:
                chunk = response.read(chunk_size)
                if not chunk:
                    break
                data = decompressor.decompress(chunk)
                out_file.write(data)
                size += len(data)

        # locale.format() was removed in Python 3.12; format_string() takes the same arguments here
        print('file size (MB)', locale.format_string('%.1f', size/1000000, grouping=True))

        return(file_name, size)
    
    def print_files(files):
        """Print the name and size of every downloaded file, then the totals.

        Parameters
        ----------
        files : sequence of (file_name, size_in_bytes)
            Pairs in the shape returned by ``download_file``. Sizes are
            reported in MB (bytes / 1,000,000).
        """
        total_size = 0
        for f in files:
            size = f[1]/1000000
            total_size += size
            # locale.format() was removed in Python 3.12; format_string() is the supported equivalent
            print('downloaded file:', f[0], ', of size (MB):',
                  locale.format_string('%.1f', size, grouping=True))

        print('Number of files downloaded:', len(files), 'for a total size (MB):',
              locale.format_string('%.1f', total_size, grouping=True))


    if DOWNLOAD_ONE_FILE_ONLY:
        # testing
        download_file('http://stat-computing.org/dataexpo/2009/%d.csv.bz2', 1988)
    else:    
        # download airline data from 1987 to 2009
        yrs = range(1987, 2009)
        baseurl = 'http://stat-computing.org/dataexpo/2009/%d.csv.bz2'

        from dask import delayed
        download_file = delayed(download_file)

        files = [download_file(baseurl, yr) for yr in yrs]
        files = delayed(files)

        %time files = files.compute()   
      
        print_files(files)

**Click on Dashboard URL printed out above to see the distributed work.**

In [None]:
# print the head of a csv file

print('csv file format')
# Shell out to show the first lines of the 1987 file.
# NOTE(review): the single-file test branch of the download cell fetches 1988, so this file
# is only present after the full download - confirm.
!head data/1987.csv

In [None]:
# load csv files into a dataframe in parallel

LOAD_ONE_FILE_ONLY = False

if LOAD_ONE_FILE_ONLY:
    filename = os.path.join('data', '2000.csv')
else:
    filename = os.path.join('data', '*.csv')
print('Loading', filename, 'files')

import dask.dataframe as dd
%time df_csv = dd.read_csv(filename, assume_missing=True, \
                           dtype={'TailNum':np.object, 'CancellationCode':np.object}, \
                           storage_options={'anon': True}).rename(columns=str.lower)

print(df_csv.dtypes)
df_csv.head()

In [None]:
# remove the columns we don't need or want
unused_columns = [
    'tailnum', 'actualelapsedtime', 'crselapsedtime', 'airtime',
    'taxiin', 'taxiout', 'cancelled', 'cancellationcode',
    'diverted', 'carrierdelay', 'weatherdelay', 'nasdelay',
    'securitydelay', 'lateaircraftdelay',
]
df_csv = df_csv.drop(unused_columns, axis=1)

In [None]:
# dtypes of the columns currently in df_csv
df_csv.dtypes

In [None]:
# number of rows in the dataframe

number_of_items = len(df_csv)
# locale.format() was removed in Python 3.12; format_string() is the supported equivalent
locale.format_string('%d', number_of_items, grouping=True)

In [None]:
# Ten most frequent origin airports

origin_counts = df_csv['origin'].value_counts().head(10)
print('Top origin airports')
print(origin_counts)

origin_counts.plot.barh(figsize=(8,4), title='Top Origin Airports')

## Q1. What airports have the most delayed departures and arrivals?

A flight is delayed if it leaves or arrives more than 15 minutes after its scheduled time.

In [None]:
# Departures that left more than 15 minutes late, counted per origin airport

late_departure = df_csv['depdelay'] > 15
df_delayed_departure = df_csv[late_departure]
delayed_counts = df_delayed_departure['origin'].value_counts().head(10)
print('Top delayed origin airports')
print(delayed_counts)

delayed_counts.plot.barh(figsize=(8,4), title='Top Delayed Origin Airports')

In [None]:
# Arrivals that landed more than 15 minutes late, counted per destination airport

late_arrival = df_csv['arrdelay'] > 15
df_delayed_arrival = df_csv[late_arrival]
delayed_counts = df_delayed_arrival['dest'].value_counts().head(10)
print('Top delayed destination airports')
print(delayed_counts)

delayed_counts.plot.barh(figsize=(8,4), title='Top Delayed Destination Airports')

## Q2. What flights are most frequently delayed with same origin and destination?

A flight is delayed if it leaves or arrives more than 15 minutes after its scheduled time.

In [None]:
# Filter the dataset to delayed flights

# A flight counts as delayed when either delay exceeds 15 minutes. The two conditions are
# combined element-wise with `|`; Python's `or` asks for the truth value of a whole Series,
# which raises instead of producing a row mask.
is_delayed = (df_csv.arrdelay > 15) | (df_csv.depdelay > 15)

# compute() materialises the filtered rows so the following cells work on the result directly
df_filtered = df_csv.loc[is_delayed].compute()
df_filtered.head()

In [None]:
# Count delayed flights per (origin, dest, flightnum), largest count first

route_groups = df_filtered.groupby(['origin', 'dest', 'flightnum'])
grp = (
    route_groups.flightnum.count()
    .reset_index(name='count')
    .sort_values(['count'], ascending=False)
)

grp.head(15)

In [None]:
# Bar chart of the 15 most frequently delayed flights, labelled by flight number

top_flights = grp.head(15).set_index('flightnum')
top_flights.plot(kind='barh', figsize=(8,4),
                 title='Top Delayed Flights with Same Origin and Destination Airports')

## Q3. Can you predict a flight's delayed minutes?

Create a prediction model to predict flight delays. Features such as weather were not added because the weather on the day of a flight is not known accurately in advance. A new feature named `hdays` was added to indicate the number of days between the flight date and the nearest holiday, since holidays have a significant impact on travel. Other key features might be helpful, but time did not permit further exploration.

The work is borrowed from `https://jessesw.com/Air-Delays/`,  `https://gist.github.com/mrocklin/19c89d78e34437e061876a9872f4d2df`, and `https://github.com/dmlc/xgboost/blob/master/demo/guide-python/sklearn_examples.py`.

In [None]:
# Function to determine the difference between flight date and nearest holiday.
# see https://jessesw.com/Air-Delays/

from pandas.tseries.holiday import USFederalHolidayCalendar
cal = USFederalHolidayCalendar()
# The window extends into January 2009 so that flights at the very end of 2008 are
# measured against New Year's Day 2009 instead of only against earlier holidays.
holidays = cal.holidays(start='1987-01-01', end='2009-01-31').to_pydatetime()

from datetime import date, datetime, timedelta

def days_until_holiday(d):
    """Return the distance in whole days from `d` to the nearest holiday.

    The distance is absolute: holidays before and after `d` are both considered.

    Parameters
    ----------
    d : datetime.datetime
        Date to measure from.

    Returns
    -------
    float
        Number of days to the closest entry of the module-level `holidays`;
        100000.0 when `holidays` is empty.
    """
    nearest = min((abs(h - d) for h in holidays), default=timedelta(100000))
    return float(nearest.days)

# sanity checks: a holiday itself, and the last day of the data set (one day before New Year's Day)
assert days_until_holiday(datetime(2001, 1, 1)) == 0
assert days_until_holiday(datetime(2008, 12, 31)) == 1

In [None]:
# Using apply is slow. Need to see if there is a faster way.

# create days from nearest holiday column
# The row-wise apply returns one float per row, so meta is given as (name, dtype) for a
# float64 Series; a bare `float` does not describe a named Series.
df_csv['hdays'] = df_csv.apply(lambda r: days_until_holiday(datetime(int(r.year), int(r.month), int(r.dayofmonth))),
                               meta=('hdays', 'f8'), axis=1)
df_csv.head()

In [None]:
# remove the columns we don't need or want
model_unused_columns = ['deptime', 'arrtime', 'flightnum']
df_csv = df_csv.drop(model_unused_columns, axis=1)

In [None]:
# random sample of data (features); the fixed seed makes the sample repeatable across runs
df = df_csv.sample(frac=0.05, random_state=1234)

# drop rows with NaN
prev_count = len(df)
df = df.dropna()
number_of_items = len(df)
# share of sampled rows that were removed; guarded so an empty sample does not divide by zero
dropped_pct = 100 * (prev_count - number_of_items) / prev_count if prev_count else 0.0
# locale.format() was removed in Python 3.12; format_string() is the supported equivalent
print('dropped %s percent of rows with NA' % locale.format_string('%.2f', dropped_pct))

# target data
y = df.depdelay

df = df.drop(['depdelay'], axis=1)

# keep features and target on the cluster and show progress while they are computed
df, y = persist(df, y)
progress(df, y)

In [None]:
# number of rows in the sampled feature frame
number_of_items = len(df)
# locale.format() was removed in Python 3.12; format_string() is the supported equivalent
locale.format_string('%d', number_of_items, grouping=True)

In [None]:
# categorize appropriate columns

# The conversion is applied to `df`, the sampled feature frame that is one-hot encoded
# below. Converting `df_csv` instead would leave `df` untouched, so the calendar columns
# (year, month, dayofmonth, dayofweek) would stay numeric and never be encoded.
object_columns = ['uniquecarrier', 'origin', 'dest', 'year', 'month', 'dayofmonth', 'dayofweek']
for i in object_columns:
    df[i] = df[i].astype('category')

# scan the data once so every categorical column has known categories
df = df.categorize()

In [None]:
# dtypes of the feature frame
df.dtypes

In [None]:
# first rows of the feature frame
df.head()

In [None]:
# first values of the target (depdelay)
y.head()

## One hot encode

In [None]:
# categorize the feature frame, then expand its categorical columns into indicator columns
df_categorized = df.categorize()
X = dd.get_dummies(df_categorized)

In [None]:
# summary statistics of the encoded features; compute() evaluates them
X.describe().compute()

In [None]:
# number of columns after one-hot encoding
len(X.columns)

In [None]:
# first rows of the encoded feature frame
X.head()

## Split and Train

In [None]:
# 90/10 split; features and target are split separately using the same fractions and seed
split_fractions = [0.9, 0.1]
split_seed = 1234
data_train, data_test = X.random_split(split_fractions, random_state=split_seed)
labels_train, labels_test = y.random_split(split_fractions, random_state=split_seed)

## Model

In [None]:
from sklearn.linear_model import Lasso

# Lasso regression fitted on the training split
lasso_alpha = 0.1
model = Lasso(alpha=lasso_alpha)
model.fit(data_train, labels_train)

In [None]:
from sklearn.metrics import mean_absolute_error

# predictions for the held-out split, compared against the true departure delays
y_true, y_pred = labels_test, model.predict(data_test)

print('Mean absolute error of regression was:')
# locale.format() was removed in Python 3.12; format_string() is the supported equivalent
print(locale.format_string('%.1f', mean_absolute_error(y_true, y_pred), grouping=True))