<h1> Predicting airline delays with Spark and ML-Lib using pySpark </h1>
<br>
Adapted from http://nbviewer.ipython.org/github/ofermend/IPython-notebooks/blob/master/blog-part-2.ipynb

<h3>Pre-processing with PySpark</h3>

In [1]:
import os.path

AIRPORT = 'LAX'

base_dir = os.path.join('data')
input_path_2007 = os.path.join('flights', '2007.csv')
input_path_2008 = os.path.join('flights', '2008.csv')
input_path_LAX = os.path.join('flights', 'LAX.csv')

file_name_2007 = os.path.join(base_dir, input_path_2007)
file_name_2008 = os.path.join(base_dir, input_path_2008)
file_name_LAX = os.path.join(base_dir, input_path_LAX)

raw_data_2007 = sc.textFile(file_name_2007)
raw_data_2008 = sc.textFile(file_name_2008)
weather_data_LAX = sc.textFile(file_name_LAX)

header = raw_data_2007.take(1) 

filtered_data_2007 = (raw_data_2007
                        # filter on Airport
                        .filter(lambda line: ',' + AIRPORT + ',' in line)
                        # filter out cancelled flights
                        .filter(lambda line: ',,' in line)
                        .filter(lambda line: 'Year' not in line))
filtered_data_2008 = (raw_data_2008
                        # filter on Airport
                        .filter(lambda line: ',' + AIRPORT + ',' in line)
                        # filter out cancelled flights
                        .filter(lambda line: ',,' in line)
                        .filter(lambda line: 'Year' not in line))

# CRS = Computer Reservation System
# scheduled time as opposed to the actual time
print header
print filtered_data_2007.take(1)
print filtered_data_2008.take(1)

[u'Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay']
[u'2007,1,1,1,1446,1445,1631,1700,WN,732,N483,225,255,209,-29,1,STL,LAX,1593,7,9,0,,0,0,0,0,0,0']
[u'2008,1,3,4,1738,1715,1838,1820,WN,82,N499WN,60,65,42,18,23,LAS,LAX,236,6,12,0,,0,0,0,0,12,6']


In [2]:
holidays = ['01/01/2007', '01/15/2007', '02/19/2007', '05/28/2007', '06/07/2007', '07/04/2007',
      '09/03/2007', '10/08/2007' ,'11/11/2007', '11/22/2007', '12/25/2007',
      '01/01/2008', '01/21/2008', '02/18/2008', '05/22/2008', '05/26/2008', '07/04/2008',
      '09/01/2008', '10/13/2008' ,'11/11/2008', '11/27/2008', '12/25/2008']

In [3]:
import datetime

def make_weather_dict():
    weather_dict = {}
    weather_lines = weather_data_LAX.collect()
    for line in weather_lines:
        all_vals = line.split(',')
        # key: date / values: wind speed, max temp, min temp, precipitation
        weather_dict[all_vals[2]] = [all_vals[3], all_vals[5], all_vals[6], all_vals[7]]
    
    return weather_dict
    
def days_from_nearest_holiday(year, month, day):
    diffs = []
    sample_date = datetime.date(year, month, day)
    for holiday in holidays:
        dt = datetime.datetime.strptime(holiday, '%m/%d/%Y').date()
        td = dt - sample_date
        diffs.append(abs(td.days))

    return min(diffs) * 1.0

def make_features(flight, weather_dict):
    try:
        flight_data = flight.split(',')
        date_string = str(flight_data[0]) + str(flight_data[1]).zfill(2) + str(flight_data[2]).zfill(2)
        weather_data = weather_dict[date_string]
        features = []
        features.append(float(flight_data[15]))        # DepDelay
        features.append(float(flight_data[1]))         # Month
        features.append(float(flight_data[2]))         # DayofMonth
        features.append(float(flight_data[3]))         # DayOfWeek
        features.append(float(flight_data[5]) / 100)   # CRSDepTime
        #features.append(float(flight_data[18]))        # Distance
        features.append(days_from_nearest_holiday(int(flight_data[0]), int(flight_data[1]), int(flight_data[2]))) 
        features.append(float(weather_data[0]))        # wind speed
        features.append(float(weather_data[1]) / 10.0) # max temp
        features.append(float(weather_data[2]) / 10.0) # min temp
        features.append(float(weather_data[3]))        # precipitation

        return (date_string, np.array(features))
    except:
        return (date_string, [i for i in range(11)])

In [4]:
weather_dict = make_weather_dict()

features_2007 = filtered_data_2007.map(lambda line: make_features(line, weather_dict))
features_2008 = filtered_data_2008.map(lambda line: make_features(line, weather_dict))

print features_2007.take(1)
print features_2008.take(1)

[('20070101', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])]
[('20080103', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])]


<h3>Modeling with Spark and ML-Lib</h3>

In [5]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.feature import StandardScaler
import numpy as np

def rescale(feat):
    m = feat.mean()
    s = np.std(feat)
    r = []
    for x in feat:
        r.append((x - m) / s)

    return np.array(r)

train_data = features_2007.map(lambda t: LabeledPoint(1.0, t[1]) if t[1][0] >= 15 else LabeledPoint(0.0, t[1]))
train_data.cache()
scaled_train_data = train_data.map(lambda lp: LabeledPoint(lp.label, rescale(lp.features)))
scaled_train_data.cache()

test_data = features_2008.map(lambda t: LabeledPoint(1.0, t[1]) if t[1][0] >= 15 else LabeledPoint(0.0, t[1]))
test_data.cache()
scaled_test_data = train_data.map(lambda lp: LabeledPoint(lp.label, rescale(lp.features)))
scaled_test_data.cache()

print scaled_train_data.take(1)
print scaled_test_data.take(1)

[LabeledPoint(0.0, [-0.694398239158,-0.694398239158,-0.694398239158,-0.694398239158,1.00836499087,-0.820997735814,-0.820997735814,1.35651360667,0.0905186401091,1.96419119062])]
[LabeledPoint(0.0, [-0.694398239158,-0.694398239158,-0.694398239158,-0.694398239158,1.00836499087,-0.820997735814,-0.820997735814,1.35651360667,0.0905186401091,1.96419119062])]


In [6]:
def eval_metrics(lbl_pred):
    tp = float(lbl_pred.filter(lambda lp: lp[0]==1.0 and lp[1]==1.0).count())
    tn = float(lbl_pred.filter(lambda lp: lp[0]==0.0 and lp[1]==0.0).count())
    fp = float(lbl_pred.filter(lambda lp: lp[0]==1.0 and lp[1]==0.0).count())
    fn = float(lbl_pred.filter(lambda lp: lp[0]==0.0 and lp[1]==1.0).count())
    
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    F_measure = 2 * precision * recall / (precision + recall)
    accuracy = (tp + tn) / (tp + tn + fp + fn)
    return([tp, tn, fp, fn], [precision, recall, F_measure, accuracy])

In [50]:
late = test_data.filter(lambda lp: lp.label == 1.0).count() * 1.0
on_time = test_data.filter(lambda lp: lp.label == 0.0).count() * 1.0
tot = test_data.count()

print('Delayed : %i' % late)
print('On time : %i' % on_time)
print('Percent delays : %.2f pct' % round((late / tot) * 100, 2))

Delayed : 36178
On time : 148417
Percent delays : 19.60 pct


In [7]:
from pyspark.mllib.classification import LogisticRegressionWithSGD

model_lr = LogisticRegressionWithSGD.train(scaled_train_data, iterations=100)
labels_and_predictions = scaled_test_data.map(lambda lp: (model_lr.predict(lp.features), lp.label))
metrics = eval_metrics(labels_and_predictions)

print('Precision : %.2f' % round(metrics[1][0], 2))
print('Recall : %.2f' % round(metrics[1][1], 2))
print('F1 : %.2f' % round(metrics[1][2], 2))
print('Accuracy : %.2f' % round(metrics[1][3], 2))

Precision : 0.99
Recall : 0.84
F1 : 0.91
Accuracy : 0.97


In [8]:
from pyspark.mllib.classification import SVMWithSGD

model_svm = SVMWithSGD.train(scaled_train_data, iterations=100, step=1.0, regParam=0.01)
labels_and_predictions = scaled_test_data.map(lambda lp: (model_svm.predict(lp.features), lp.label))
metrics = eval_metrics(labels_and_predictions)

print('Precision : %.2f' % round(metrics[1][0], 2))
print('Recall : %.2f' % round(metrics[1][1], 2))
print('F1 : %.2f' % round(metrics[1][2], 2))
print('Accuracy : %.2f' % round(metrics[1][3], 2))

Precision : 0.98
Recall : 0.92
F1 : 0.95
Accuracy : 0.98
