# Demo2: Flight delay prediction - Submitting to the Spark Sandbox cluster

__Warning__: Do not run this notebook locally! Read the instructions below for how to submit this notebook to a remote Spark cluster. Running this notebook locally will take a long time and run out of available memory.

In the previous tutorial, we used the __Flight Dataset__ to analyze and predict flight delays based on past flights. We showed how you can use Jupyter Notebook and Spark to read, explore, analyze and visualize your results. The dataset we used in that tutorial contained the flights from 2007, about 7 million flights.

Can we improve the results?

Yes, the prediction accuracy can be improved by using a much bigger data set. In this tutorial you will learn how to __submit a Jupyter notebook__ for execution on a remote Spark cluster. Because the data set is large, this is a batch execution rather than an interactive demo.

- We will use the [Flight Dataset](http://stat-computing.org/dataexpo/2009/the-data.html), which is already accessible in a shared space. We will build a classification model to predict flight delays from historical flight data. This dataset is 5 GB and contains 52 million flight records. Processing such a large data set requires a Spark cluster.

- This notebook operates on the data set stored in the HDFS file system of our Spark Sandbox cluster.

First, we import the Python packages that we need for this use case.

In [None]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import udf
from pyspark.mllib.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.param import Param, Params
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.stat import Statistics
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.feature import VectorAssembler
import sys
import numpy as np
import pandas as pd
import time
import datetime

###  Getting the data and creating the RDD
As mentioned, we will use the complete dataset, containing more than 50 million flights. The data is provided as a __.csv file__ that we can access through DSWB (the Data Scientist Workbench); the file is 5 GB. We read it from __HDFS__ into an __RDD__.

This data is already preloaded into HDFS on the Spark Sandbox cluster.

In [None]:
textFile = sc.textFile("hdfs://spark.datascientistworkbench.com:9000/sample_data/2001-2008-merged.csv")

### Cleaning and Caching
In this section, we split each line on commas and remove the header row of the file.

In [None]:
textFileRDD=textFile.map(lambda x: x.split(','))
header = textFileRDD.first()
textRDD = textFileRDD.filter(lambda r: r != header)
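The cleaning step can be tried out locally with plain Python lists standing in for the RDD. This is a sketch, not Spark code: the sample lines are hypothetical and only the column names follow the Data Expo header. Because the filter compares every row with the header by value, it also removes any repeated header lines, which would matter if a merged file contained more than one header (we have not checked whether `2001-2008-merged.csv` does).

```python
# Hypothetical sample lines; only the column names come from the Data Expo header.
lines = [
    "Year,Month,DayofMonth,DayOfWeek",
    "2007,12,31,1",
    "Year,Month,DayofMonth,DayOfWeek",  # a repeated header, as could appear in concatenated files
    "2008,1,1,2",
    "2008,1,2,3",
]

# Split every line on commas (textFile.map(lambda x: x.split(',')))
rows = [line.split(',') for line in lines]

# The first row is the header (textFileRDD.first())
header = rows[0]

# Keep only rows that differ from the header (textFileRDD.filter(lambda r: r != header))
data = [r for r in rows if r != header]

print(len(data))  # 3
```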

### Creating the Dataframe from RDD
A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R or Python (pandas), but with richer optimizations under the hood.

In [None]:
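def parse(r):
    # Convert one split CSV line into a Row. Column positions follow the
    # Data Expo 2009 layout (15 = DepDelay, 16 = Origin, 17 = Dest, 18 = Distance).
    try:
        x = Row(Year=int(r[0]),
                Month=int(r[1]),
                DayofMonth=int(r[2]),
                DayOfWeek=int(r[3]),
                DepTime=int(float(r[4])),
                CRSDepTime=int(r[5]),
                ArrTime=int(float(r[6])),
                CRSArrTime=int(r[7]),
                UniqueCarrier=r[8],
                DepDelay=int(float(r[15])),
                Origin=r[16],
                Dest=r[17],
                Distance=int(float(r[18])))
    except (ValueError, IndexError):
        # Skip records with "NA" fields (e.g. cancelled flights) or missing columns
        x = None
    return x

# Parse every line, drop the records that could not be parsed, and build the DataFrame
rowRDD = textRDD.map(parse).filter(lambda r: r is not None)
airline_df = sqlContext.createDataFrame(rowRDD)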
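`parse()` reads fields by fixed position and silently drops any record it cannot convert. The sketch below is a pure-Python stand-in that returns a dict instead of a `Row` and catches only `ValueError` and `IndexError`, the two errors a malformed record can raise here. The three input records are hypothetical: one complete, one with `NA` as the departure time (as for a cancelled flight), and one cut short.

```python
def parse(r):
    """Pure-Python stand-in for the notebook's parse(); returns a dict instead of a Row."""
    try:
        return {
            "Year": int(r[0]),
            "Month": int(r[1]),
            "DayofMonth": int(r[2]),
            "DayOfWeek": int(r[3]),
            "DepTime": int(float(r[4])),
            "CRSDepTime": int(r[5]),
            "ArrTime": int(float(r[6])),
            "CRSArrTime": int(r[7]),
            "UniqueCarrier": r[8],
            "DepDelay": int(float(r[15])),
            "Origin": r[16],
            "Dest": r[17],
            "Distance": int(float(r[18])),
        }
    except (ValueError, IndexError):
        # "NA" in a numeric field raises ValueError; a truncated line raises IndexError
        return None


# Hypothetical records laid out like the first 19 Data Expo columns
good = ["2008", "1", "3", "4", "1005", "1000", "1330", "1325", "AA", "100", "N000AA",
        "385", "385", "350", "5", "5", "JFK", "LAX", "2475"]
cancelled = good[:4] + ["NA"] + good[5:]   # no actual departure time
truncated = good[:10]                      # line cut off after FlightNum

parsed = [parse(r) for r in (good, cancelled, truncated)]
kept = [p for p in parsed if p is not None]
print(len(kept), kept[0]["Origin"], kept[0]["DepDelay"])  # 1 JFK 5
```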

In this section, we add a new column to the data frame that separates delayed flights from non-delayed ones. We later use this column as the target (label) column for classification. The binary variable __DepDelayed__ is __True__ for flights whose departure delay is more than 15 minutes, and __False__ otherwise.

In [None]:
airline_df=airline_df.withColumn('DepDelayed',airline_df['DepDelay']>15)
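The label uses a strict comparison, so a departure exactly 15 minutes late counts as not delayed, and early departures (negative `DepDelay`) are never delayed. A tiny pure-Python check of the same comparison at the boundary (the minute values are arbitrary examples):

```python
def dep_delayed(dep_delay):
    # Same comparison as airline_df['DepDelay'] > 15 (strictly greater than 15 minutes)
    return dep_delay > 15


for minutes in (-3, 0, 15, 16, 45):
    print(minutes, dep_delayed(minutes))
```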

In [None]:
# define hour function to obtain hour of day
def hour_ex(x): 
    h=int(str(int(x)).zfill(4)[:2])
    return h
# register as a UDF 
f = udf(hour_ex, IntegerType())
#CRSDepTime: scheduled departure time (local, hhmm)
airline_df=airline_df.withColumn('hour', f(airline_df.CRSDepTime))
airline_df.registerTempTable("airlineDF")
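`CRSDepTime` is stored as an `hhmm` integer, so early-morning times have fewer than four digits (for example `5` means 00:05). `hour_ex` pads to four digits before taking the first two. The sketch below checks it against plain integer division by 100, which gives the same hour for every value from 0000 to 2400:

```python
def hour_ex(x):
    # Pad hhmm to four digits and keep the first two: 5 -> "0005" -> 0, 930 -> "0930" -> 9
    return int(str(int(x)).zfill(4)[:2])


def hour_div(x):
    # Arithmetic alternative: integer division by 100 drops the minutes
    return int(x) // 100


# The two agree for every hhmm value from 0000 to 2400
assert all(hour_ex(t) == hour_div(t) for t in range(0, 2401))

for t in (5, 930, 1745, 2359):
    print(t, hour_ex(t))
```

Inside Spark, the arithmetic form could be written as a column expression such as `(airline_df.CRSDepTime / 100).cast('int')`, which avoids the overhead of calling a Python UDF on every row; this is a suggested alternative, not what the notebook does.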

## Modeling: Logistic Regression

### Preprocessing: Airport selection
In the following cell we select the origin airport whose departing flights we want to predict delays for. To keep things simple, we build a supervised learning model only for flights leaving JFK.

In [None]:
Origin_Airport="JFK"

In [None]:
df_ORG =sqlContext.sql("SELECT * from airlineDF WHERE Origin='"+ Origin_Airport+"'")
df_ORG.registerTempTable("df_ORG")

### Preprocessing: Feature selection
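In the next two cells we prepare the features needed to build the model: the first encodes the categorical __Origin__ column as a one-hot vector, and the second assembles all features into a single vector.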

In [None]:
df_model=df_ORG
stringIndexer1 = StringIndexer(inputCol="Origin", outputCol="originIndex")
model_stringIndexer = stringIndexer1.fit(df_model)
indexedOrigin = model_stringIndexer.transform(df_model)
encoder1 = OneHotEncoder(dropLast=False, inputCol="originIndex", outputCol="originVec")
df_model = encoder1.transform(indexedOrigin)
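A pure-Python sketch of what the two stages do. `StringIndexer` by default gives index 0 to the most frequent label; the alphabetical tie-breaking below is an assumption of this sketch. `OneHotEncoder` with `dropLast=False` keeps one slot per category (Spark emits a sparse vector; dense lists are used here). The airport lists are hypothetical. Note the last case: because `df_ORG` holds only JFK departures, `originVec` ends up identical on every row.

```python
from collections import Counter


def string_index(values):
    # Most frequent label -> 0.0, next -> 1.0, ... (StringIndexer's default ordering).
    # Ties are broken alphabetically here; that tie rule is an assumption of this sketch.
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot(index, size):
    # dropLast=False keeps one slot per category, so no category maps to all zeros
    vec = [0.0] * size
    vec[int(index)] = 1.0
    return vec


# Several origins: JFK is most frequent, EWR and LGA tie and are ordered alphabetically
many = ["JFK", "LGA", "JFK", "EWR"]
idx = string_index(many)
print(idx)                            # {'JFK': 0.0, 'EWR': 1.0, 'LGA': 2.0}
print(one_hot(idx["LGA"], len(idx)))  # [0.0, 0.0, 1.0]

# Only JFK, as in df_ORG: the encoded vector is the same for every row
jfk_only = ["JFK", "JFK", "JFK"]
idx_jfk = string_index(jfk_only)
print([one_hot(idx_jfk[o], len(idx_jfk)) for o in jfk_only])  # [[1.0], [1.0], [1.0]]
```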

We combine the feature columns into a single vector with a __VectorAssembler__, then wrap each row in a __labeled point__: a local vector, either dense or sparse, associated with a label/response. In MLlib, labeled points are used in supervised learning algorithms, and the label is stored as a double. For binary classification, a label should be either 0 (negative) or 1 (positive).

In [None]:
assembler = VectorAssembler(
    inputCols=['Year','Month','DayofMonth','DayOfWeek','hour','Distance','originVec'],
    outputCol="features")
output = assembler.transform(df_model)
# DataFrame -> RDD of LabeledPoint: DepDelayed True -> 1.0 (delayed), False -> 0.0
airlineRDD = output.rdd.map(lambda row: LabeledPoint(1.0 if row['DepDelayed'] else 0.0, row['features']))
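Conceptually, `VectorAssembler` concatenates the listed columns, in `inputCols` order, into one flat numeric vector, expanding vector columns such as `originVec` in place. The sketch below mimics that with a dict for the row and a `(label, features)` tuple standing in for `LabeledPoint`; the row values are hypothetical, and the real assembler may produce a sparse vector rather than a dense list.

```python
def assemble(row, input_cols):
    # Concatenate scalar columns and vector (list) columns in inputCols order
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, list):
            features.extend(float(v) for v in value)
        else:
            features.append(float(value))
    return features


def to_label(dep_delayed):
    # DepDelayed True -> 1.0, False -> 0.0, as in the notebook's LabeledPoint call
    return 1.0 if dep_delayed else 0.0


input_cols = ['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'hour', 'Distance', 'originVec']
row = {'Year': 2008, 'Month': 1, 'DayofMonth': 3, 'DayOfWeek': 4, 'hour': 10,
       'Distance': 2475, 'originVec': [1.0], 'DepDelayed': True}  # hypothetical row
point = (to_label(row['DepDelayed']), assemble(row, input_cols))
print(point)  # (1.0, [2008.0, 1.0, 3.0, 4.0, 10.0, 2475.0, 1.0])
```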

### Preprocessing: Splitting the dataset into training and test datasets

In [None]:
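# Randomly assign ~70% of the labeled points to training and ~30% to testing
trainRDD, testRDD = airlineRDD.randomSplit([0.7, 0.3])
# L-BFGS makes many passes over the training data, so keep it in memory
trainRDD.cache()
# Train a binary logistic regression model with the L-BFGS optimizer
model = LogisticRegressionWithLBFGS.train(trainRDD)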
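`randomSplit([0.7, 0.3])` does not cut the data at exactly 70%. Conceptually, the weights are normalized into cumulative ranges ([0, 0.7) and [0.7, 1.0)), each record gets one independent random draw, and it lands in whichever range the draw falls into, so the split sizes are only approximately 70/30. A pure-Python sketch of that idea (the helper name `random_split` is hypothetical):

```python
import random


def random_split(items, weights, seed=None):
    # Normalize weights into cumulative boundaries, e.g. [0.7, 0.3] -> [0.7, 1.0]
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for item in items:
        u = rng.random()  # one independent draw per element
        for i, b in enumerate(bounds):
            if u < b:
                splits[i].append(item)
                break
        else:
            splits[-1].append(item)  # guard against floating-point rounding in the last bound
    return splits


train, test = random_split(range(10000), [0.7, 0.3], seed=42)
print(len(train), len(test))
```

Spark's `randomSplit` also accepts a `seed` argument; passing one makes the split, and therefore the reported accuracy, reproducible between runs.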

## Model Evaluation

In [None]:
# Evaluating the model on testing data
labelsAndPreds = testRDD.map(lambda p: (p.label, model.predict(p.features)))
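To our understanding, MLlib's binary `LogisticRegressionModel.predict` computes the margin w·x + intercept, turns it into a probability with the logistic function, and returns 1 if that probability exceeds the threshold (0.5 by default); calling `model.clearThreshold()` makes `predict` return the raw probability instead. A pure-Python sketch of that decision rule, with hypothetical weights:

```python
import math


def predict(weights, intercept, features, threshold=0.5):
    # Linear margin w.x + b
    margin = sum(w * x for w, x in zip(weights, features)) + intercept
    # Numerically stable logistic function
    if margin > 0:
        prob = 1.0 / (1.0 + math.exp(-margin))
    else:
        e = math.exp(margin)
        prob = e / (1.0 + e)
    # Binary decision: 1 only when the probability exceeds the threshold
    return 1 if prob > threshold else 0


w, b = [0.5, -1.0], 0.0           # hypothetical model parameters
print(predict(w, b, [2.0, 0.5]))  # margin 0.5 -> prob ~0.62 -> 1
print(predict(w, b, [0.0, 1.0]))  # margin -1.0 -> prob ~0.27 -> 0
```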

In [None]:
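def conf(r):
    # Map a (label, prediction) pair to its confusion-matrix cell
    label, pred = r
    if label == 1 and pred == 1: return 'TP'
    if label == 0 and pred == 0: return 'TN'
    if label == 1 and pred == 0: return 'FN'
    return 'FP'  # label == 0 and pred == 1
# Count the test points for each distinct (label, prediction) pair; there are at most four
acc1 = labelsAndPreds.map(lambda lp: (lp, 1)).reduceByKey(lambda a, b: a + b).collect()
acc = [(conf(x[0]), x[1]) for x in acc1]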
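The map/reduceByKey step is a distributed count of each (label, prediction) pair. The same tally on a small local list, using `collections.Counter`, looks like this; the pairs are hypothetical, with labels as doubles and predictions as ints, as they come out of MLlib.

```python
from collections import Counter


def conf(pair):
    # Name the confusion-matrix cell for a (label, prediction) pair
    label, pred = pair
    if label == 1 and pred == 1:
        return 'TP'
    if label == 0 and pred == 0:
        return 'TN'
    if label == 1 and pred == 0:
        return 'FN'
    return 'FP'  # label 0, prediction 1


# Hypothetical (label, prediction) pairs
pairs = [(1.0, 1), (0.0, 0), (0.0, 0), (1.0, 0), (0.0, 1), (0.0, 0)]
counts = Counter(conf(p) for p in pairs)
print(sorted(counts.items()))  # [('FN', 1), ('FP', 1), ('TN', 3), ('TP', 1)]
```

On the RDD itself, `labelsAndPreds.countByValue()` performs the same counting and returns a dictionary keyed by (label, prediction) to the driver.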

In [None]:
TP=TN=FP=FN=0.0
for x in acc: 
    if x[0]=='TP': TP= x[1]
    if x[0]=='TN': TN= x[1]
    if x[0]=='FP': FP= x[1]
    if x[0]=='FN': FN= x[1]
eps=sys.float_info.epsilon
Accuracy= (TP+TN) / (TP + TN+ FP+FN+eps) 
print("Model Accuracy for %s: %1.2f %%" % (Origin_Airport, Accuracy * 100))
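The accuracy computed above is the fraction of test flights classified correctly. The machine epsilon (`sys.float_info.epsilon`, about 2.2e-16) in the denominator only guards against division by zero when all counts are zero:

$$
\text{Accuracy} = \frac{TP + TN}{TP + TN + FP + FN + \varepsilon}
$$

For example, with hypothetical counts TP = 2, TN = 6, FP = 1 and FN = 1 (the epsilon is negligible):

$$
\text{Accuracy} = \frac{2 + 6}{2 + 6 + 1 + 1} = \frac{8}{10} = 0.80
$$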

## Submit to Spark Cluster

When you create a notebook in the Data Scientist Workbench, you have full access to a local Spark instance. Local Spark is very useful for building code and working with small datasets.

When working with large datasets, you will need the power of a Spark cluster to finish your jobs in a reasonable time.

To submit your Python notebook to a *Spark cluster* for execution, expand the twistie next to the notebook name in the sidebar and click the *Submit to Spark Cluster* menu item, as shown in the screenshot below.

<img src="https://ibm.box.com/shared/static/2hh6wr5o03sldyt3k33h2zxf1a3ekxnb.png"/>

### Warning
Clicking *Submit to Spark Cluster* opens a new notebook named "Submit-to-Spark-ClusterX". Follow the instructions in that new notebook to run this notebook on the remote Spark cluster.



## Want to learn more?

<a href="http://bigdatauniversity.com/courses/spark-fundamentals/?utm_source=tutorial-flight-demo-2&utm_medium=dswb&utm_campaign=bdu"><img src = "https://ibm.box.com/shared/static/r3pj5oo2ivnzqar0poj2eexiqrnvq6vy.png"> </a>

<a href="http://bigdatauniversity.com/courses/advanced-classification-and-prediction/?utm_source=tutorial-flight-demo-2&utm_medium=dswb&utm_campaign=bdu"><img src = "https://ibm.box.com/shared/static/u7iyiej98gb971gmjqvfsveqz3ik4fxj.png"> </a>


<h3>Authors:</h3>
<article class="teacher">
<div class="teacher-image" style="    float: left;
    width: 115px;
    height: 115px;
    margin-right: 10px;
    margin-bottom: 10px;
    border: 1px solid #CCC;
    padding: 3px;
    border-radius: 3px;
    text-align: center;"><img class="alignnone wp-image-2258 " src="https://ibm.box.com/shared/static/tyd41rlrnmfrrk78jx521eb73fljwvv0.jpg" alt="Saeed Aghabozorgi" width="178" height="178" /></div>
<h4>Saeed Aghabozorgi</h4>
<p><a href="https://ca.linkedin.com/in/saeedaghabozorgi">Saeed Aghabozorgi</a>, PhD is a Data Scientist at IBM with a track record of developing enterprise-level applications that substantially increase clients’ ability to turn data into actionable knowledge. He is a researcher in the data mining field and an expert in developing advanced analytic methods such as machine learning and statistical modelling on large datasets.</p>
</article>
<article class="teacher">
<div class="teacher-image" style="    float: left;
    width: 115px;
    height: 115px;
    margin-right: 10px;
    margin-bottom: 10px;
    border: 1px solid #CCC;
    padding: 3px;
    border-radius: 3px;
    text-align: center;"><img class="alignnone size-medium wp-image-2177" src="https://ibm.box.com/shared/static/2ygdi03ahcr97df2ofrr6cf8knq4kodd.jpg" alt="Polong Lin" width="300" height="300" /></div>
<h4>Polong Lin</h4>
<p>
<a href="https://ca.linkedin.com/in/polonglin">Polong Lin</a> is a Data Scientist at IBM in Canada. In the Emerging Technologies division, Polong is responsible for educating the next generation of data scientists through Big Data University. Polong is a regular speaker at conferences and meetups, and holds an M.Sc. in Cognitive Psychology.</p>
</article>

Created by: <a href="https://bigdatauniversity.com/?utm_source=bducreatedbylink&utm_medium=dswb&utm_campaign=bdu">The Cognitive Class Team</a>