# Data Science Project: Flight Delay Prediction: Big Data and Cloud

Note: this is part of an exercise in CME 250A: Machine Learning with Big Data.

## Stage 1: Ask a question

My objective is, again, to predict whether a flight's arrival will be delayed.

Performance is measured by AUC, since this is a binary classification problem.
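For intuition about the metric, here is a minimal pure-Python sketch of AUC using its pairwise (Mann-Whitney) definition: the probability that a randomly chosen delayed flight gets a higher score than a randomly chosen on-time flight, with ties counting one half. The function name `auc` is illustrative only; in this notebook H2O computes AUC itself, and the quadratic loop here is only meant for tiny inputs.

```python
def auc(labels, scores):
    """Area under the ROC curve via the pairwise (Mann-Whitney) formulation.

    labels: 1 for a positive example (delayed), 0 for a negative one.
    scores: the model's predicted score for each example.
    """
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0   # positive ranked above negative
            elif p == n:
                wins += 0.5   # ties count one half
    return wins / (len(pos) * len(neg))


# Two delayed flights (1) and two on-time flights (0): 3 of 4 pairs ordered correctly.
print(auc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]))  # 0.75
```

An AUC of 0.5 corresponds to random ranking and 1.0 to a perfect ranking.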

## Stage 2: Set the environment up and get data

First, set up the necessary computational tools. We use H2O, a platform for parallel (distributed) machine learning. Instructions are given along the way.

We also need Amazon Web Services (AWS). Create an account and, if you are a student, apply for AWS Educate so that you have some credit to play with. To learn how to launch virtual machines ("instances"), follow the 10-Minute Tutorial "Launch a Linux Virtual Machine": http://aws.amazon.com/s/dm/optimization/server-side-test/launch-a-virtual-machine-b/ 

Here is how to launch H2O on a single EC2 instance. Note that you will obtain the [.pem file] and the [public ip] while launching the instance.

1. Open Git Bash (right-click on the Desktop) and type $\texttt{ssh -i ~/.ssh/[.pem file] -L 55555:localhost:54321 ec2-user@[ip]}$. This logs you in to the ec2-user terminal and forwards local port 55555 to H2O's port 54321 on the instance. Then check Java by typing $\texttt{java -version}$.

2. Open another Git Bash window and upload the H2O jar with $\texttt{scp -i ~/.ssh/[.pem file] ~/h2o-3.8.2.2/h2o.jar ec2-user@[ip]:/home/ec2-user}$. Then, in the ec2-user terminal from step 1, launch H2O with $\texttt{java -jar h2o.jar}$.

3. To see the H2O web interface (Flow), go to $\texttt{localhost:55555/flow/index.html}$.
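To make the `-L` port-forwarding syntax in step 1 explicit, here is a small sketch that builds the same ssh command as an argument list. The helper name `ssh_tunnel_command`, the key file name, and the IP (a documentation-only address) are placeholders, not part of the class material. If you pass the list to `subprocess`, expand `~` first with `os.path.expanduser`, since no shell is involved to do it for you.

```python
def ssh_tunnel_command(pem_file, ip, local_port=55555, remote_port=54321, user="ec2-user"):
    """Build the argument list for the ssh command in step 1.

    "-L local_port:localhost:remote_port" forwards connections made to
    local_port on your own machine to remote_port (H2O's default 54321)
    on the EC2 instance, so Flow is reachable at localhost:local_port.
    """
    return [
        "ssh",
        "-i", pem_file,
        "-L", "%d:localhost:%d" % (local_port, remote_port),
        "%s@%s" % (user, ip),
    ]


cmd = ssh_tunnel_command("~/.ssh/my-key.pem", "203.0.113.10")
print(" ".join(cmd))
# ssh -i ~/.ssh/my-key.pem -L 55555:localhost:54321 ec2-user@203.0.113.10
```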

In this exercise, we need to use multiple instances simultaneously, joined into one H2O cluster, to handle a big data set. To launch them, we run the script h2o-cluster-launch-instances.py given in class. The scripts call $\texttt{bash}$, which is not available natively on Windows, so they must be run from a terminal that provides it, such as Git Bash. Here we integrate the script into this notebook; hence, it is important to start this notebook with the command $\texttt{ipython notebook}$ from Git Bash.


In [19]:
#!/usr/bin/env python

import os
import sys
import time
import boto
import boto.ec2

Please be aware that some variables must be set according to your AWS setup. See the details inside the cell.

In [20]:
# Environment variables you MUST set (either here or by passing them in).
# -----------------------------------------------------------------------
#
# These keys are given during the instance-launching process. It is good practice to save both the .csv file (key + secret key info)
# and the .pem file (ssh private key file) in the folder C:/Users/Admin/.ssh/
os.environ['AWS_ACCESS_KEY_ID'] = '...'
os.environ['AWS_SECRET_ACCESS_KEY'] = '...'
os.environ['AWS_SSH_PRIVATE_KEY_FILE'] = '..'

# Launch EC2 instances with an IAM role
# --------------------------------------
# 
iam_profile_resource_name = None
# or
iam_profile_name = None

# Options you MUST tailor to your own AWS account.
# ------------------------------------------------

# SSH key pair name. This should match the .pem file set above.
#keyName = 'cliff click cme250a test key' 
keyName = 'Tee-h2o'   

# AWS security group name. You need to set this up in AWS: go to the "Security Groups" tab,
# create a new group, and add two inbound rules: All TCP from Anywhere and All UDP from Anywhere.
# Note:
#     H2O uses TCP and UDP ports 54321 and 54322.
#     RStudio uses TCP port 8787.
#securityGroupName = 'h2o'
securityGroupName = 'Tee-h2o'


# Options you might want to change.
# ---------------------------------
# Number of instances and instance type for the cluster. Try something small first.

numInstancesToLaunch = 4
#numInstancesToLaunch = 1
instanceType = 'm4.2xlarge'
#instanceType = 't2.micro'
instanceNameRoot = 'h2o-instance'


# Options to help debugging.
# --------------------------

debug = 0
# debug = 1
dryRun = False
# dryRun = True


# Options you should not change unless you really mean to.
# --------------------------------------------------------

#regionName = 'us-east-1'
#amiId = 'ami-0b100e61'

regionName = 'us-west-1'
amiId = 'ami-c1afd6a1'


Now we are ready to launch a group of instances.

In [21]:


#--------------------------------------------------------------------------
# No need to change anything below here.
#--------------------------------------------------------------------------

# Note: this python script was initially developed with boto 2.13.3.
def botoVersionMismatch():
    print 'WARNING:  Unsupported boto version.  Please upgrade boto to at least 2.13.x and try again.'
    print 'Comment this out to run anyway.'
    print 'Exiting.'
    sys.exit(1)

if not 'AWS_ACCESS_KEY_ID' in os.environ:
    print 'ERROR: You must set AWS_ACCESS_KEY_ID in the environment.'
    sys.exit(1)

if not 'AWS_SECRET_ACCESS_KEY' in os.environ:
    print 'ERROR: You must set AWS_SECRET_ACCESS_KEY in the environment.'
    sys.exit(1)

if not 'AWS_SSH_PRIVATE_KEY_FILE' in os.environ:
    print 'ERROR: You must set AWS_SSH_PRIVATE_KEY_FILE in the environment.'
    sys.exit(1)

publicFileName = 'nodes-public'
privateFileName = 'nodes-private'

if not dryRun:
    fpublic = open(publicFileName, 'w')
    fprivate = open(privateFileName, 'w')

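print 'Using boto version', boto.Version
# Version components are strings; convert them to int before comparing,
# and call botoVersionMismatch() so that the check actually takes effect.
botoVersionArr = boto.Version.split(".")
if int(botoVersionArr[0]) != 2 or int(botoVersionArr[1]) < 13:
    botoVersionMismatch()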

if (debug):
    boto.set_stream_logger('h2o-ec2')
ec2 = boto.ec2.connect_to_region(regionName, debug=debug)

print 'Launching', numInstancesToLaunch, 'instances.'

reservation = ec2.run_instances(
    image_id=amiId,
    min_count=numInstancesToLaunch,
    max_count=numInstancesToLaunch,
    key_name=keyName,
    instance_type=instanceType,
    security_groups=[securityGroupName],
    instance_profile_arn=iam_profile_resource_name,
    instance_profile_name=iam_profile_name,
    dry_run=dryRun
)

for i in range(numInstancesToLaunch):
    instance = reservation.instances[i]
    print 'Waiting for instance', i+1, 'of', numInstancesToLaunch, '...'
    instance.update()
    while instance.state != 'running':
        print '    .'
        time.sleep(1)
        instance.update()
    print '    instance', i+1, 'of', numInstancesToLaunch, 'is up.'
    name = instanceNameRoot + str(i)
    instance.add_tag('Name', value=name)

print
print 'Creating output files: ', publicFileName, privateFileName
print

for i in range(numInstancesToLaunch):
    instance = reservation.instances[i]
    instanceName = ''
    if 'Name' in instance.tags:
        instanceName = instance.tags['Name']
    print 'Instance', i+1, 'of', numInstancesToLaunch
    print '    Name:   ', instanceName
    print '    PUBLIC: ', instance.public_dns_name
    print '    PRIVATE:', instance.private_ip_address
    print
    fpublic.write(instance.public_dns_name + '\n')
    fprivate.write(instance.private_ip_address + '\n')

fpublic.close()
fprivate.close()
os.system("dos2unix "+publicFileName)
os.system("dos2unix "+privateFileName)

print 'Sleeping for 60 seconds for ssh to be available...'
time.sleep(60)



Using boto version 2.40.0
Launching 4 instances.
Waiting for instance 1 of 4 ...
    .
    .
    .
    .
    .
    .
    .
    .
    .
    .
    .
    .
    .
    .
    .
    .
    .
    instance 1 of 4 is up.
Waiting for instance 2 of 4 ...
    instance 2 of 4 is up.
Waiting for instance 3 of 4 ...
    instance 3 of 4 is up.
Waiting for instance 4 of 4 ...
    instance 4 of 4 is up.

Creating output files:  nodes-public nodes-private

Instance 1 of 4
    Name:    h2o-instance0
    PUBLIC:  ec2-54-67-81-81.us-west-1.compute.amazonaws.com
    PRIVATE: 172.31.15.85

Instance 2 of 4
    Name:    h2o-instance1
    PUBLIC:  ec2-54-153-122-120.us-west-1.compute.amazonaws.com
    PRIVATE: 172.31.15.84

Instance 3 of 4
    Name:    h2o-instance2
    PUBLIC:  ec2-54-153-31-38.us-west-1.compute.amazonaws.com
    PRIVATE: 172.31.15.87

Instance 4 of 4
    Name:    h2o-instance3
    PUBLIC:  ec2-54-183-182-145.us-west-1.compute.amazonaws.com
    PRIVATE: 172.31.15.86

Sleeping for 60 seconds for s
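The launch cell above waits for each instance with a bare `while` loop, which would spin forever if an instance never reached the `running` state. A minimal sketch of the same polling idea with a timeout follows; `wait_until` is a hypothetical helper, not part of the class scripts. With boto 2 you would pass a predicate that calls `instance.update()` and then checks `instance.state == 'running'`, exactly as the original loop does.

```python
import time


def wait_until(predicate, timeout=300.0, interval=1.0):
    """Poll predicate() every `interval` seconds until it returns True.

    Returns True if the predicate succeeded, or False if `timeout`
    seconds elapsed first, so the caller can decide how to fail.
    """
    deadline = time.time() + timeout
    while True:
        if predicate():
            return True
        if time.time() >= deadline:
            return False
        time.sleep(interval)
```

For example, with a hypothetical `is_running(instance)` that refreshes and checks the state, `wait_until(lambda: is_running(instance), timeout=600)` returns False instead of hanging.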

Next, we put H2O on each instance and start the cluster. This uses several shell scripts in the EC2_Scripts folder given in class. Change the directory as appropriate.

In [22]:
#d = os.path.dirname(os.path.realpath(__file__))
#d = os.path.dirname("C:/Users/Admin/Documents/GitHub/DataSciencePortal/DataScienceTemplateAndProjects/")
d = os.path.dirname("C:/Users/Admin/Desktop/CME250A/EC2_Scripts/")
#d = os.path.dirname("C:/Users/Tee/Desktop/CME250A/EC2_Scripts/")

In [23]:
print 'Testing ssh access to instances...'
cmd = d + '/' + 'h2o-cluster-test-ssh.sh'
rv = os.system("bash "+cmd)
if rv != 0:
    print 'Failed 1.'
    sys.exit(1)

print 'Distributing flatfile...'
cmd = d + '/' + 'h2o-cluster-distribute-flatfile.sh'
rv = os.system("bash "+cmd)
if rv != 0:
    print 'Failed.'
    sys.exit(1)

print 'Distributing AWS S3 credentials...'
cmd = d + '/' + 'h2o-cluster-distribute-aws-credentials.sh'
rv = os.system("bash "+cmd)
if rv != 0:
    print 'Failed.'
    sys.exit(1)

print 'Starting h2o cluster...'
cmd = d + '/' + 'h2o-cluster-start-h2o.sh'
rv = os.system("bash "+cmd)
if rv != 0:
    print 'Failed.'
    sys.exit(1)


Testing ssh access to instances...
Distributing flatfile...
Distributing AWS S3 credentials...
Starting h2o cluster...
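The cell above runs four class scripts in order and stops at the first failure. A minimal sketch of the same pattern, assuming hypothetical names `run_steps` and `steps`: it returns the failing step instead of calling `sys.exit`, which keeps the notebook kernel alive, and it passes each command as an argument list to `subprocess.call`, which avoids quoting problems when a path (such as the Windows path in `d`) contains spaces.

```python
import subprocess


def run_steps(steps, runner=subprocess.call):
    """Run (message, command) steps in order; stop at the first failure.

    `runner` takes an argument list and returns an exit code, like
    subprocess.call. Returns the message of the step that failed, or
    None if every step succeeded.
    """
    for message, command in steps:
        print(message)
        if runner(command) != 0:
            return message
    return None
```

Usage would look like `run_steps([("Testing ssh access to instances...", ["bash", os.path.join(d, "h2o-cluster-test-ssh.sh")]), ...])`, listing the remaining scripts in the same order as the cell above.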


Next, connect to H2O using the public IP of one of the instances in the cluster. You may see an error here if the version of the H2O Python package does not match the H2O version running on the cluster; if so, install the Python package version that matches. Once connected, you can open the Flow interface at http://[ip]:54321/flow/index.html
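Since a version mismatch is the most likely failure at this step, a minimal sketch for comparing two H2O version strings component by component is shown below. The helper `same_h2o_version` is hypothetical; the client version would come from your installed package (for example `h2o.__version__`, assuming your package exposes it) and the cluster version from the "H2O cluster version" line in the connection output. It assumes purely numeric versions like `3.8.2.2`.

```python
def same_h2o_version(client_version, cluster_version):
    """Return True if two dotted numeric versions (e.g. '3.8.2.2') are equal."""
    def parse(version):
        # '3.8.2.2' -> (3, 8, 2, 2); surrounding whitespace is ignored
        return tuple(int(part) for part in version.strip().split("."))
    return parse(client_version) == parse(cluster_version)


print(same_h2o_version("3.8.2.2", "3.8.2.2"))  # True
print(same_h2o_version("3.8.2.2", "3.8.2.3"))  # False
```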

In [24]:
import h2o
# Connect to h2o
h2o.init(ip="54.67.81.81") #public IP

| Property | Value |
|---|---|
| H2O cluster uptime | 46 seconds 976 milliseconds |
| H2O cluster version | 3.8.2.2 |
| H2O cluster name | H2ODemo |
| H2O cluster total nodes | 4 |
| H2O cluster total free memory | 100.56 GB |
| H2O cluster total cores | 32 |
| H2O cluster allowed cores | 32 |
| H2O cluster healthy | True |
| H2O Connection ip | 54.67.81.81 |
| H2O Connection port | 54321 |


Next, I import the data. Note that the paths differ from the small-data version, since we now read the data from Amazon S3. 

In [25]:
# Load the flight data from the S3 bucket set up for the CME250A class.

#path for a big flight data
path = "s3n://stanford-cme250a/allyears.csv"
#path = "s3n://stanford-cme250a/allyears2k.csv"

print("Import and Parse flight data")
data = h2o.import_file(path=path)


Import and Parse flight data

Parse Progress: [##################################################] 100%


In [26]:
# Load the weather data from the S3 bucket set up for the CME250A class.

#Path for the big weather data
path = "s3n://stanford-cme250a/weather/data/"

#We only need 1987 to 2008, to match the years of the flight data.
#The header file comes first, followed by one file per year.
weather_years = [path + "Xheader.csv"] + [path + "X%d.csv" % year for year in range(1987, 2009)]

print("Import and Parse weather data")
wthr = h2o.import_file(path=weather_years)

Import and Parse weather data

Parse Progress: [##################################################] 100%


We need to reformat the airport/weather-station link file before it works with the cluster. I don't know why it did not work in the first place; it is probably related to the double-quote format of the CSV file. A quick trick is to remove everything except the two columns we need, iata_xref and maslib, and save the result as a new file called $\texttt{master-location-identifier-database-20130801-reformatted.csv}$.
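One way to do this reformatting is with Python's standard `csv` module. The sketch below assumes the original file has a header row containing `iata_xref` and `maslib`; it writes headerless two-column rows (which H2O will name C1 and C2 when parsed). Skipping rows with an empty code is an extra assumption on my part, since such rows cannot match any airport. The function name and the sample rows are made up for illustration; the script is written for Python 3.

```python
import csv
import io


def reformat_link_file(src, dst, key_col="iata_xref", value_col="maslib"):
    """Copy only two columns from a quoted CSV with a header row.

    Reads from file-like `src`, writes headerless `key,value` rows to
    file-like `dst` (quotes only where csv requires them), and returns
    the number of rows written. Rows with an empty key or value are
    skipped (an assumption: they cannot be used for the join).
    """
    reader = csv.DictReader(src)
    writer = csv.writer(dst, lineterminator="\n")
    n = 0
    for row in reader:
        key = (row.get(key_col) or "").strip()
        value = (row.get(value_col) or "").strip()
        if key and value:
            writer.writerow([key, value])
            n += 1
    return n


# Made-up sample in the quoted style of the original file.
sample = io.StringIO('name,iata_xref,maslib\n"Alpha","AAA","123456"\n"Beta","","654321"\n')
out = io.StringIO()
print(reformat_link_file(sample, out))  # 1
print(out.getvalue())                   # AAA,123456
```

For the real files, open the source with `open(path, newline="")` and the destination with `open(path, "w", newline="")`, as the `csv` module documentation recommends.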

In [27]:
# Load the link data: the mapping between an airport code and a 6-digit weather station code.
# This is a local file, downloadable from http://weather.noaa.gov/tg/site.shtml


print("Import and Parse airport/station link data")
#Upload to S3 and import
#path = "s3n://tee-cme250a/master-location-identifier-database-20130801-reformatted.csv"
#airport_weather_link = h2o.import_file(path=path)

#or upload locally
path = "C:/Users/Admin/Desktop/CME250A/weather/master-location-identifier-database-20130801-reformatted.csv"
airport_weather_link = h2o.upload_file(path=path)


Import and Parse airport/station link data

Parse Progress: [##################################################] 100%


## Stage 3: Explore the data

Explore, Visualize, Clean, Transform, Feature engineering

In this exercise, we simply follow the procedure explained in the small-data exercise. For the details of the experimentation and the choices made, see that exercise. 

### Flight data 

In [28]:
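#Year, Month, DayofMonth are integers, which is fine. Leave them as they are. 

#DayOfWeek should be treated as enum instead of int
data['DayOfWeek'] = data['DayOfWeek'].asfactor()

#Convert hhmm clock times to minutes after midnight (1 = 0*60 + 1, 2359 = 23*60 + 59 = 1439).
#DepTime and ArrTime are actual times, which reveal the delay, so drop them and keep the scheduled CRS times.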

data = data.drop("DepTime")
data['CRSDepTime'] = (data['CRSDepTime']/100).floor()*60 + (data['CRSDepTime']%100)
data = data.drop("ArrTime")
data['CRSArrTime'] = (data['CRSArrTime']/100).floor()*60 + (data['CRSArrTime']%100)

#UniqueCarrier is good.

#FlightNum should be treated as enum instead of int
data['FlightNum'] = data['FlightNum'].asfactor()

#ActualElapsedTime, CRSElapsedTime, AirTime: keep only CRSElapsedTime.
data = data.drop("ActualElapsedTime")
data = data.drop("AirTime")

#ArrDelay, DepDelay are "too good" predictors (they reveal the outcome). Drop them.
data = data.drop("ArrDelay").drop("DepDelay")

#Origin and Dest are good.

#Cancelled, CancellationCode, Diverted, CarrierDelay, WeatherDelay, NASDelay, SecurityDelay, LateAircraftDelay are "too good" predictors. Drop them.
data = data.drop("Cancelled").drop("CancellationCode").drop("Diverted").drop("CarrierDelay")\
    .drop("WeatherDelay").drop("NASDelay").drop("SecurityDelay").drop("LateAircraftDelay")
    
#TaxiIn and TaxiOut are measured during the flight itself, so they are suspicious as predictors. Drop them.
data = data.drop("TaxiIn").drop("TaxiOut")

#IsArrDelayed and IsDepDelayed should be treated as boolean; enum is fine.
#Focus only on IsArrDelayed, as in the lecture, and drop the other.
data = data.drop("IsDepDelayed")

#Construct a proper date
data["Date"] = (data["Year"]*10000) + (data["Month"]*100) + data["DayofMonth"]
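The two integer encodings used in this cell can be checked in plain Python. The sketch below mirrors the H2O expressions `floor(t/100)*60 + t%100` for hhmm times and `Year*10000 + Month*100 + DayofMonth` for dates; the helper names are illustrative only.

```python
def hhmm_to_minutes(hhmm):
    """Convert a clock time stored as an integer hhmm (e.g. 2359)
    into minutes after midnight."""
    return (hhmm // 100) * 60 + hhmm % 100


def date_key(year, month, day):
    """Pack a date into a sortable integer yyyymmdd, as done for "Date"."""
    return year * 10000 + month * 100 + day


print(hhmm_to_minutes(1))      # 1
print(hhmm_to_minutes(2359))   # 1439, i.e. 23*60 + 59
print(date_key(1987, 10, 14))  # 19871014
```

Converting to minutes makes times arithmetic-friendly: 1000 and 0959 are one minute apart, not 41 units as their raw hhmm values suggest.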

### Weather data

In [29]:
#Construct a proper date
wthr["Date"] = (wthr["Year"]*10000) + wthr["MonthDay"]

#Select only relevant columns
wthr = wthr[["Date","Station","temp","sea level pres","visibility",\
               "mean wind speed","precipitation","snow depth","fog","rain","snow","hail","thunder","tornado"]]

#Replace the sentinel codes that mean "missing" (999999.0, 9999.9, 999.9, 99.99) with NA.
wthr[wthr["Station"]==999999.0,"Station"] = None
wthr[wthr["sea level pres"]==9999.9,"sea level pres"] = None
wthr[wthr["visibility"]==999.9,"visibility"] = None
wthr[wthr["mean wind speed"]==999.9,"mean wind speed"] = None
wthr[wthr["precipitation"]==99.99,"precipitation"] = None
wthr[wthr["snow depth"]==999.9,"snow depth"] = None
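Row by row, the masked assignments above amount to the following plain-Python sketch. The sentinel table is copied from the cell; representing a row as a dict and using `None` for NA are assumptions made for illustration.

```python
# Sentinel codes for missing values, as used in the cell above.
SENTINELS = {
    "Station": 999999.0,
    "sea level pres": 9999.9,
    "visibility": 999.9,
    "mean wind speed": 999.9,
    "precipitation": 99.99,
    "snow depth": 999.9,
}


def clean_row(row, sentinels=SENTINELS):
    """Return a copy of a dict row with sentinel values replaced by None."""
    cleaned = dict(row)
    for col, code in sentinels.items():
        if cleaned.get(col) == code:
            cleaned[col] = None
    return cleaned


print(clean_row({"Station": 999999.0, "visibility": 10.0, "temp": 55.0}))
```

Exact float comparison is safe here because the sentinels are parsed from the same literal text each time.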

In [30]:
#Keep only rows that have Station info
wthr2 = wthr[ wthr["Station"] > 0]

### Airport/Weather station link data

In [31]:
#No need for this as we already have two columns of what we want.
#airport_weather_link = airport_weather_link[['iata_xref','maslib']]

### Merging data

As in the small-data version, we merge in the weather at the destination airport on the flight date.

In [32]:
#Attach the weather on the flight date at the arrival airport. The reformatted link file has no header, so its columns are C1 and C2; we know C1 is the airport code and C2 the station code because we created the file.
airport_weather_link.set_name("C1","Dest")
airport_weather_link.set_name("C2","Station")

#Merge with flight data
data_with_link = data.merge(airport_weather_link,all_x=True,all_y=False)
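The merges here are left joins: `all_x=True` keeps every row of the left frame and `all_y=False` drops unmatched rows of the right frame. H2O matches on the columns the two frames share, which is why the link columns were renamed to Dest and Station. A minimal plain-Python sketch of left-join semantics on lists of dict rows follows; it is an illustration with a hypothetical `left_join` helper, not how H2O implements merge, and it assumes the key is unique in the right-hand table (true for a lookup table like this one).

```python
def left_join(left, right, key):
    """Left outer join of two lists of dict rows on column `key`.

    Every left row is kept; right rows without a match are dropped.
    Unmatched left rows get None for the right-only columns.
    """
    lookup = {r[key]: r for r in right}
    right_cols = []
    for r in right:
        for c in r:
            if c != key and c not in right_cols:
                right_cols.append(c)
    joined = []
    for row_x in left:
        match = lookup.get(row_x[key])
        row = dict(row_x)
        for c in right_cols:
            row[c] = match[c] if match is not None else None
        joined.append(row)
    return joined


flights = [{"Dest": "AAA", "FlightNum": 1}, {"Dest": "ZZZ", "FlightNum": 2}]
link = [{"Dest": "AAA", "Station": 123456}, {"Dest": "BBB", "Station": 654321}]
print(left_join(flights, link, "Dest"))
```

Flight 2 has no station in the link table, so it keeps its row with `Station` set to None; station 654321 matches no flight and is dropped.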

In [33]:
wthr2.shape

(59819229, 14)

In [34]:
wthr_with_link = wthr2.merge(airport_weather_link,all_x=True,all_y=False)

In [35]:
wthr2 = wthr_with_link[~wthr_with_link["Dest"].isna()]
wthr2 = wthr2.drop("Dest")

In [36]:
wthr2.shape

(22198897, 14)

In [None]:
#This prunes out more than half of the weather data (59,819,229 rows down to 22,198,897).

In [37]:
#Create the feature to merge
data_with_link["DateStation"] = (data_with_link["Date"]*1000000) + data_with_link["Station"]
data_with_link = data_with_link.drop("Station") #Redundant information (similar to destination) 
data_with_link = data_with_link.drop("Date") #Redundant 

In [38]:
#Create the feature to merge
wthr2["DateStation"] = (wthr2["Date"]*1000000) + wthr2["Station"]
wthr2 = wthr2.drop("Station").drop("Date") # these columns are no longer needed once the key is built
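The merge key `Date*1000000 + Station` packs two values into one integer: the 6-digit station code occupies the last six decimal digits and the yyyymmdd date the rest, so distinct (date, station) pairs give distinct keys. A short sketch with illustrative helper names follows. It also checks that such keys stay below 2**53, so they are represented exactly even if stored as 64-bit floats.

```python
def date_station_key(date, station):
    """Combine a yyyymmdd date and a station code of at most 6 digits."""
    if not 0 <= station < 1000000:
        raise ValueError("station code must have at most 6 digits")
    return date * 1000000 + station


def split_key(key):
    """Recover (date, station) from a combined key."""
    return divmod(key, 1000000)


key = date_station_key(20081231, 123456)
print(key)             # 20081231123456
print(split_key(key))  # (20081231, 123456)
print(key < 2 ** 53)   # True: exact even as a double
```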

In [39]:
wthr2.shape, data_with_link.shape

((22198897, 12), (123534969, 14))

In [None]:
#These are big tables (about 22 million and 124 million rows), so the merge is expensive. 

In [40]:
#Merge data with DateStation key.
data_with_weather = data_with_link.merge(wthr2,all_x=True,all_y=False)
data_with_weather = data_with_weather.drop("DateStation") #no longer need this link

KeyboardInterrupt: 

It turned out that the merge took too long (it ran overnight without finishing), so I stop here. This may be due to a limitation of the merge function in H2O.

Below I outline the plan for the rest, which is similar to the small-data case. 

## Stage 4: Model the data

Preparing the data for validation (the train/test split) is done inside the function split_fit_predict defined below. It is modified so that the performance measure is AUC.

Four models are used: GBM (Gradient Boosting Machine), DRF (Distributed Random Forest), GLM (Generalized Linear Model), and Deep Learning.
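The split inside `split_fit_predict` draws one uniform random number per row and sends rows with `r < 0.7` to training and the rest to test. A plain-Python sketch of that idea follows; `random_split` is a hypothetical helper, and the fixed seed is my addition to make the split repeatable. Note the resulting split is only approximately 70/30.

```python
import random


def random_split(rows, train_fraction=0.7, seed=42):
    """Assign each row to train or test with one uniform draw per row."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        if rng.random() < train_fraction:
            train.append(row)
        else:
            test.append(row)
    return train, test


train, test = random_split(range(1000))
print(len(train), len(test))  # roughly 700 and 300
```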

In [None]:
from h2o.estimators.gbm import H2OGradientBoostingEstimator
from h2o.estimators.random_forest import H2ORandomForestEstimator
from h2o.estimators.glm import H2OGeneralizedLinearEstimator
from h2o.estimators.deeplearning import H2ODeepLearningEstimator

def split_fit_predict(data):
    global gbm0,drf0,glm0,dl0
    # Classic Test/Train split
    r = data['Year'].runif() # Random UNIForm numbers, one per row
    train = data[ r < 0.7]
    test = data[0.7 <= r]
    print("Training data has %d columns and %d rows, test has %d rows" % (train.ncol, train.nrow, test.nrow))
    # Predictors: every column except the response (copy the list before modifying it)
    flight_names_x = list(data.names)
    if "IsArrDelayed" in flight_names_x: flight_names_x.remove("IsArrDelayed")

    # Run GBM
    s = time.time()
    gbm0 = H2OGradientBoostingEstimator(ntrees=400, max_depth=6, learn_rate=0.1)
    gbm0.train(x=flight_names_x,y="IsArrDelayed",training_frame =train,validation_frame=test)
    gbm_elapsed = time.time() - s #measure elapsed time

    # Run DRF
    s = time.time()
    drf0 = H2ORandomForestEstimator(ntrees=100, max_depth=30)
    drf0.train(x=flight_names_x,y="IsArrDelayed",training_frame =train,validation_frame=test)
    drf_elapsed = time.time() - s

    # Run GLM (binomial family, i.e. logistic regression)
    s = time.time()
    glm0 = H2OGeneralizedLinearEstimator(Lambda=[1e-5], family="binomial")
    glm0.train(x=flight_names_x,y="IsArrDelayed",training_frame =train,validation_frame=test)
    glm_elapsed = time.time() - s

    # Run DL
    s = time.time()
    dl0 = H2ODeepLearningEstimator(hidden=[50,50,50,50], epochs=6)
    dl0.train(x=flight_names_x,y="IsArrDelayed",training_frame =train,validation_frame=test)
    dl_elapsed = time.time() - s

    # ----------
    # Score & report
    header = ["Model", "AUC TRAIN", "AUC TEST", "Model Training Time (s)"]
    table = [
     ["GBM", gbm0.auc(train=True), gbm0.auc(valid=True),
    round(gbm_elapsed,3)],
     ["DRF", drf0.auc(train=True), drf0.auc(valid=True),
    round(drf_elapsed,3)],
     ["GLM", glm0.auc(train=True), glm0.auc(valid=True),
    round(glm_elapsed,3)],
     ["DL ", dl0 .auc(train=True), dl0 .auc(valid=True),
    round( dl_elapsed,3)],
    ]
    h2o.display.H2ODisplay(table,header)
    # --------


In [None]:
# Split the data (into test & train), fit some models and look at the results
split_fit_predict(data)
# Explore (in Flow) the 4 models - training time, quality of fit, tendency to overfit


Next, let's look at the models WITH weather information at the destination airport.

In [None]:
split_fit_predict(data_with_weather)

By comparing the results and inspecting the models in H2O Flow (go to http://[ip]:54321/flow/index.html and choose Model > List All Models), I found that the best-performing model is xxx with weather information. Important features are Origin, Destination, and Flight number. It turns out that weather information is/is not helpful.

Validation is done implicitly through the held-out test frame: split_fit_predict reports test AUC next to training AUC, and the same comparison can be inspected for each model in H2O Flow.

## Stage 5: Communicate the data

For small data, I concluded that xxx with additional weather information is the best model with test AUC = 0.xxx. Weather information is/ is not helpful.

Here is the performance visual.

<img src="flight-delay-big-GBM-performance-1.JPG" width="500">

<img src="flight-delay-big-GBM-performance-2.JPG" width="500">




