<img src="http://imgur.com/1ZcRyrc.png" style="float: left; margin: 20px; height: 55px">

# Predicting Flight Delays with Spark ML 

---

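This notebook uses features that have already been prepared with PySpark to predict flight delays. The cells below frame the problem as classification: arrival delays are grouped into four buckets, and a random forest is trained to predict the bucket.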

<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#First-unzip-the-data" data-toc-modified-id="First-unzip-the-data-1">First unzip the data</a></span></li><li><span><a href="#Import-the-required-modules" data-toc-modified-id="Import-the-required-modules-2">Import the required modules</a></span></li><li><span><a href="#Load-and-Inspect-our-JSON-Training-Data-(should-take-around-a-min-with-Spark):" data-toc-modified-id="Load-and-Inspect-our-JSON-Training-Data-(should-take-around-a-min-with-Spark):-3">Load and Inspect our JSON Training Data (should take around a min with Spark):</a></span></li><li><span><a href="#Extract-the-head-of-the-dataframe" data-toc-modified-id="Extract-the-head-of-the-dataframe-4">Extract the head of the dataframe</a></span></li><li><span><a href="#Determine-the-number-of-rows-in-the-dataframe" data-toc-modified-id="Determine-the-number-of-rows-in-the-dataframe-5">Determine the number of rows in the dataframe</a></span></li><li><span><a href="#Check-for-null-values-in-the-features-before-using-Spark-ML" data-toc-modified-id="Check-for-null-values-in-the-features-before-using-Spark-ML-6">Check for null values in the features before using Spark ML</a></span></li><li><span><a href="#Add-a-Route-variable-to-replace-FlightNum" data-toc-modified-id="Add-a-Route-variable-to-replace-FlightNum-7">Add a Route variable to replace FlightNum</a></span></li><li><span><a href="#Categorize-or-&quot;bucketize&quot;-the-arrival-delay-field-using-a-DataFrame-UDF" data-toc-modified-id="Categorize-or-&quot;bucketize&quot;-the-arrival-delay-field-using-a-DataFrame-UDF-8">Categorize or "bucketize" the arrival delay field using a DataFrame UDF</a></span></li><li><span><a href="#Use-pyspark.ml.feature.Bucketizer-to-bucketize-ArrDelay" data-toc-modified-id="Use-pyspark.ml.feature.Bucketizer-to-bucketize-ArrDelay-9">Use <code>pyspark.ml.feature.Bucketizer</code> to bucketize <code>ArrDelay</code></a></span></li><li><span><a 
href="#Turn-categorical-fields-into-categorical-feature-vectors" data-toc-modified-id="Turn-categorical-fields-into-categorical-feature-vectors-10">Turn categorical fields into categorical feature vectors</a></span></li><li><span><a href="#Cross-validate,-train-and-evaluate-a-classifier-of-your-choice-(from-MLlib)" data-toc-modified-id="Cross-validate,-train-and-evaluate-a-classifier-of-your-choice-(from-MLlib)-11">Cross validate, train and evaluate a classifier of your choice (from MLlib)</a></span></li></ul></div>

### First unzip the data

In [1]:
!unzip ../flight_delay_sample.jsonl.zip

Archive:  ../flight_delay_sample.jsonl.zip
  inflating: flight_delay_sample.jsonl  


### Import the required modules

In [2]:
import findspark
findspark.init('/usr/local/spark')

import pyspark as ps    # for the pyspark suite
import warnings         # for displaying warnings
from pyspark.sql import SQLContext
from datetime import datetime
import numpy as np

In [3]:
try:
    # try to create a SparkContext that runs locally with 4 worker threads
    sc = ps.SparkContext('local[4]')
    sqlContext = SQLContext(sc)
    print("Just created a SparkContext")
except ValueError:
    # give a warning if SparkContext already exists (for use inside pyspark)
    warnings.warn("SparkContext already exists in this scope")

Just created a SparkContext


### Load and Inspect our JSON Training Data (should take around a min with Spark):

In [4]:
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType, DateType, TimestampType
from pyspark.sql.types import StructType, StructField

schema = StructType([
  StructField("ArrDelay", DoubleType(), True),     # "ArrDelay":5.0
  StructField("CRSArrTime", TimestampType(), True),    # "CRSArrTime":"2015-12-31T03:20:00.000-08:00"
  StructField("CRSDepTime", TimestampType(), True),    # "CRSDepTime":"2015-12-31T03:05:00.000-08:00"
  StructField("Carrier", StringType(), True),     # "Carrier":"WN"
  StructField("DayOfMonth", IntegerType(), True), # "DayOfMonth":31
  StructField("DayOfWeek", IntegerType(), True),  # "DayOfWeek":4
  StructField("DayOfYear", IntegerType(), True),  # "DayOfYear":365
  StructField("DepDelay", DoubleType(), True),     # "DepDelay":14.0
  StructField("Dest", StringType(), True),        # "Dest":"SAN"
  StructField("Distance", DoubleType(), True),     # "Distance":368.0
  StructField("FlightDate", DateType(), True),    # "FlightDate":"2015-12-30T16:00:00.000-08:00"
  StructField("FlightNum", StringType(), True),   # "FlightNum":"6109"
  StructField("Origin", StringType(), True),      # "Origin":"TUS"
])

In [5]:
features = sqlContext.read.json(
  "flight_delay_sample.jsonl",
  schema=schema
)

### Extract the head of the dataframe

In [6]:
# Return the first five rows as a list of Row objects
features.take(5)

[Row(ArrDelay=13.0, CRSArrTime=datetime.datetime(2015, 1, 1, 18, 10), CRSDepTime=datetime.datetime(2015, 1, 1, 15, 30), Carrier='AA', DayOfMonth=1, DayOfWeek=4, DayOfYear=1, DepDelay=14.0, Dest='DFW', Distance=569.0, FlightDate=datetime.date(2014, 12, 31), FlightNum='1024', Origin='ABQ'),
 Row(ArrDelay=36.0, CRSArrTime=datetime.datetime(2015, 1, 1, 11, 45), CRSDepTime=datetime.datetime(2015, 1, 1, 9, 0), Carrier='AA', DayOfMonth=1, DayOfWeek=4, DayOfYear=1, DepDelay=-2.0, Dest='DFW', Distance=569.0, FlightDate=datetime.date(2014, 12, 31), FlightNum='336', Origin='ABQ'),
 Row(ArrDelay=-21.0, CRSArrTime=datetime.datetime(2015, 1, 1, 19, 30), CRSDepTime=datetime.datetime(2015, 1, 1, 17, 55), Carrier='AA', DayOfMonth=1, DayOfWeek=4, DayOfYear=1, DepDelay=-1.0, Dest='DFW', Distance=731.0, FlightDate=datetime.date(2014, 12, 31), FlightNum='125', Origin='ATL'),
 Row(ArrDelay=-14.0, CRSArrTime=datetime.datetime(2015, 1, 1, 10, 25), CRSDepTime=datetime.datetime(2015, 1, 1, 8, 55), Carrier='AA',

### Determine the number of rows in the dataframe

In [7]:
features.count()

250761

### Check for null values in the features before using Spark ML

In [8]:
from pyspark.sql.functions import isnan

In [9]:
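# isnan() only flags floating-point NaN values; it does not detect nulls.
# It cannot be applied to timestamp or date columns, so those raise and are reported as 'nan'.
for col in features.columns:
    try:
        print(col,':\t', features.where(isnan(col)).count())
    except Exception:
        print(col,':\t', 'nan')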

ArrDelay :	 0
CRSArrTime :	 nan
CRSDepTime :	 nan
Carrier :	 0
DayOfMonth :	 0
DayOfWeek :	 0
DayOfYear :	 0
DepDelay :	 0
Dest :	 0
Distance :	 0
FlightDate :	 nan
FlightNum :	 0
Origin :	 0
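
The check above counts NaN values only. `isnan` does not report missing (null) entries, which in Spark would be found with `Column.isNull()`. The difference can be sketched in plain Python, assuming rows are represented as dicts (the `rows` sample and the `count_missing` helper are hypothetical and not part of the notebook):

```python
import math

def count_missing(rows, column):
    """Return (null_count, nan_count) for one column of a list of dict rows."""
    nulls = 0
    nans = 0
    for row in rows:
        value = row.get(column)
        if value is None:
            nulls += 1      # missing value: what isNull() would flag
        elif isinstance(value, float) and math.isnan(value):
            nans += 1       # not-a-number: what isnan() would flag
    return nulls, nans

# Hypothetical sample with one null and one NaN arrival delay
rows = [
    {"ArrDelay": 13.0},
    {"ArrDelay": None},
    {"ArrDelay": float("nan")},
    {"ArrDelay": -21.0},
]
null_count, nan_count = count_missing(rows, "ArrDelay")
print("nulls:", null_count, "NaNs:", nan_count)
```

A column can therefore show zero NaN values and still contain nulls, so both counts are worth checking before training.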


### Add a Route variable to replace FlightNum

Take the origin and the destination and create a column `Route` as the concatenation of both, separated by a hyphen.

In [10]:
from pyspark.sql.functions import lit, concat

In [11]:
features_with_route = features.withColumn(
  'Route',
  concat(
    features.Origin,
    lit('-'),
    features.Dest
  )
)
features_with_route.select("Origin", "Dest", "Route").show(5)

+------+----+-------+
|Origin|Dest|  Route|
+------+----+-------+
|   ABQ| DFW|ABQ-DFW|
|   ABQ| DFW|ABQ-DFW|
|   ATL| DFW|ATL-DFW|
|   ATL| DFW|ATL-DFW|
|   ATL| DFW|ATL-DFW|
+------+----+-------+
only showing top 5 rows



### Categorize or "bucketize" the arrival delay field using a DataFrame UDF

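Create a label from 0 to 3 that characterizes the arrival delay: 0 for at least 15 minutes early, 1 for less than 15 minutes early or on time, 2 for up to 30 minutes late, and 3 for more than 30 minutes late.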

In [12]:
# Wrap the function in pyspark.sql.functions.udf (Spark user-defined function)
# and declare its return type with a class from pyspark.sql.types

from pyspark.sql.functions import udf

In [13]:
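def bucketize_arr_delay(arr_delay):
    """Map an arrival delay in minutes to a bucket label from 0.0 to 3.0."""
    if arr_delay is None:
        return None      # leave missing delays as null
    bucket = None
    if arr_delay <= -15.0:
        bucket = 0.0     # at least 15 minutes early
    elif arr_delay <= 0.0:
        bucket = 1.0     # less than 15 minutes early, or on time
    elif arr_delay <= 30.0:
        bucket = 2.0     # up to 30 minutes late
    elif arr_delay > 30.0:
        bucket = 3.0     # more than 30 minutes late
    return bucket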

In [14]:
# The function returns floats, so declare the return type as DoubleType (imported above)
bucketize_arr_delay_udf = udf(bucketize_arr_delay, DoubleType())

# Add a categorical column via pyspark.sql.DataFrame.withColumn
manual_bucketized_features = features_with_route.withColumn(
  "ArrDelayBucket",
  bucketize_arr_delay_udf(features_with_route['ArrDelay'])
)
manual_bucketized_features.select("ArrDelay", "ArrDelayBucket").show()

+--------+--------------+
|ArrDelay|ArrDelayBucket|
+--------+--------------+
|    13.0|           2.0|
|    36.0|           3.0|
|   -21.0|           0.0|
|   -14.0|           1.0|
|    16.0|           2.0|
|    13.0|           2.0|
|    25.0|           2.0|
|    14.0|           2.0|
|   -29.0|           0.0|
|    -3.0|           1.0|
|    -8.0|           1.0|
|    -1.0|           1.0|
|    18.0|           2.0|
|   -15.0|           0.0|
|   -11.0|           1.0|
|   -16.0|           0.0|
|    16.0|           2.0|
|    42.0|           3.0|
|    25.0|           2.0|
|    -4.0|           1.0|
+--------+--------------+
only showing top 20 rows



### Use `pyspark.ml.feature.Bucketizer` to bucketize `ArrDelay`

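Same as before, but this time with `pyspark.ml.feature.Bucketizer`. One difference: each `Bucketizer` bucket includes its lower bound and excludes its upper bound, so a boundary value such as -15.0 falls into bucket 1 here, whereas the UDF above assigned it to bucket 0.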

In [15]:
from pyspark.ml.feature import Bucketizer

In [16]:
splits = [-float("inf"), -15.0, 0, 30.0, float("inf")]

bucketizer = Bucketizer(
  splits=splits,
  inputCol="ArrDelay",
  outputCol="ArrDelayBucket"
)
ml_bucketized_features = bucketizer.transform(features_with_route)

# Check the bucket's output
ml_bucketized_features.select("ArrDelay", "ArrDelayBucket").show()

+--------+--------------+
|ArrDelay|ArrDelayBucket|
+--------+--------------+
|    13.0|           2.0|
|    36.0|           3.0|
|   -21.0|           0.0|
|   -14.0|           1.0|
|    16.0|           2.0|
|    13.0|           2.0|
|    25.0|           2.0|
|    14.0|           2.0|
|   -29.0|           0.0|
|    -3.0|           1.0|
|    -8.0|           1.0|
|    -1.0|           1.0|
|    18.0|           2.0|
|   -15.0|           1.0|
|   -11.0|           1.0|
|   -16.0|           0.0|
|    16.0|           2.0|
|    42.0|           3.0|
|    25.0|           2.0|
|    -4.0|           1.0|
+--------+--------------+
only showing top 20 rows
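
The two bucketing approaches disagree only on boundary values. A plain-Python sketch makes the boundary behaviour explicit. It is not Spark code: `bucket_left_inclusive` is a hypothetical helper that mimics the `[x, y)` bucket rule with `bisect`.

```python
from bisect import bisect_right

# Same split points as the Bucketizer cell above
SPLITS = [-float("inf"), -15.0, 0.0, 30.0, float("inf")]

def bucket_left_inclusive(value, splits=SPLITS):
    """Return the index of the bucket [splits[i], splits[i+1]) containing value."""
    # bisect_right counts the splits that are <= value; subtracting 1 gives the bucket index.
    index = bisect_right(splits, value) - 1
    # The last bucket also includes its upper bound, so clamp to the last index.
    return float(min(index, len(splits) - 2))

for delay in [-16.0, -15.0, 0.0, 30.0, 42.0]:
    print(delay, "->", bucket_left_inclusive(delay))
```

Under this rule -15.0, 0.0 and 30.0 each land one bucket higher than with the UDF, which uses `<=` on the upper bound of each range.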



### Turn categorical fields into categorical feature vectors

In [17]:
from pyspark.ml.feature import StringIndexer, VectorAssembler

In [18]:
# Index each categorical field with StringIndexer; the index columns are dropped after vector assembly
for column in ["Carrier", "DayOfMonth", "DayOfWeek", "DayOfYear",
               "Origin", "Dest", "Route"]:
    string_indexer = StringIndexer(
        inputCol=column,
        outputCol=column + "_index"
    )
    ml_bucketized_features = string_indexer.fit(ml_bucketized_features)\
        .transform(ml_bucketized_features)

# Check out the indices
ml_bucketized_features.show(6)

+--------+--------------------+--------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+--------------+-------------+----------------+---------------+---------------+------------+----------+-----------+
|ArrDelay|          CRSArrTime|          CRSDepTime|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|  Route|ArrDelayBucket|Carrier_index|DayOfMonth_index|DayOfWeek_index|DayOfYear_index|Origin_index|Dest_index|Route_index|
+--------+--------------------+--------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+--------------+-------------+----------------+---------------+---------------+------------+----------+-----------+
|    13.0|2015-01-01 18:10:...|2015-01-01 15:30:...|     AA|         1|        4|        1|    14.0| DFW|   569.0|2014-12-31|     1024|   ABQ|ABQ-DFW|           2.0|          4.0|             2.0|         
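
By default `StringIndexer` assigns index 0.0 to the most frequent label, 1.0 to the next most frequent, and so on. A plain-Python sketch of that ordering follows. The sample carriers and the `fit_string_index` helper are hypothetical, and the alphabetical tie-break is an assumption of this sketch.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(values)
    # Sort by descending frequency; break ties alphabetically (sketch assumption).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

carriers = ["WN", "AA", "WN", "DL", "WN", "AA"]   # hypothetical sample
mapping = fit_string_index(carriers)
indexed = [mapping[c] for c in carriers]
print(mapping)
print(indexed)
```

The resulting numbers are identifiers ordered by frequency, not measurements, which is worth keeping in mind when choosing a model.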

In [19]:
# Combine the numeric fields and the categorical index columns into one feature vector
numeric_columns = ["DepDelay", "Distance"]
index_columns = ["Carrier_index", "DayOfMonth_index",
                 "DayOfWeek_index", "DayOfYear_index", "Origin_index",
                 "Dest_index", "Route_index"]
vector_assembler = VectorAssembler(
    inputCols=numeric_columns + index_columns,
    outputCol="Features_vec"
)
final_vectorized_features = vector_assembler.transform(ml_bucketized_features)

In [20]:
# Drop the index columns
for column in index_columns:
    final_vectorized_features = final_vectorized_features.drop(column)

# Check out the features
final_vectorized_features.show(5)

+--------+--------------------+--------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+--------------+--------------------+
|ArrDelay|          CRSArrTime|          CRSDepTime|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|  Route|ArrDelayBucket|        Features_vec|
+--------+--------------------+--------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+--------------+--------------------+
|    13.0|2015-01-01 18:10:...|2015-01-01 15:30:...|     AA|         1|        4|        1|    14.0| DFW|   569.0|2014-12-31|     1024|   ABQ|ABQ-DFW|           2.0|[14.0,569.0,4.0,2...|
|    36.0|2015-01-01 11:45:...|2015-01-01 09:00:...|     AA|         1|        4|        1|    -2.0| DFW|   569.0|2014-12-31|      336|   ABQ|ABQ-DFW|           3.0|[-2.0,569.0,4.0,2...|
|   -21.0|2015-01-01 19:30:...|2015-01-01 17:55:...|     AA|     

### Cross validate, train and evaluate a classifier of your choice (from MLlib)

In [21]:
# Train/test split
training_data, test_data = final_vectorized_features.randomSplit([0.7, 0.3])

# Instantiate and fit a random forest classifier
from pyspark.ml.classification import RandomForestClassifier
rfc = RandomForestClassifier(
  featuresCol="Features_vec",
  labelCol="ArrDelayBucket",
  maxBins=4657,  # must be at least the number of categories in the largest categorical feature
  maxMemoryInMB=1024
)
model = rfc.fit(training_data)

In [22]:
# Evaluate the model using test data
predictions = model.transform(test_data)

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="ArrDelayBucket", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = {}".format(accuracy))

# Check a sample
predictions.sample(False, 0.001, 18).orderBy("CRSDepTime").show(6)

Accuracy = 0.5914904310333426
+--------+--------------------+--------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+--------------+--------------------+--------------------+--------------------+----------+
|ArrDelay|          CRSArrTime|          CRSDepTime|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|  Route|ArrDelayBucket|        Features_vec|       rawPrediction|         probability|prediction|
+--------+--------------------+--------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+--------------+--------------------+--------------------+--------------------+----------+
|     5.0|2015-01-01 17:15:...|2015-01-01 15:05:...|     WN|         1|        4|        1|     6.0| TPA|   837.0|2014-12-31|      474|   IND|IND-TPA|           2.0|[6.0,837.0,0.0,2....|[2.22270705952843...|[0.11113535297642...|       2.0|
|    36.0|
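
The cells above train and evaluate on a single train/test split. One way to add the cross-validation step is Spark ML's `CrossValidator`; the following is a minimal sketch, not the notebook's own method. The `PARAM_GRID` values, the `build_cross_validator` helper, the fold count and the seed are all assumptions chosen for illustration.

```python
# Hypothetical search space: two values each for two RandomForestClassifier params
PARAM_GRID = {"numTrees": [10, 20], "maxDepth": [5, 10]}

def build_cross_validator(classifier, label_col="ArrDelayBucket", num_folds=3, seed=42):
    """Wrap a Spark ML classifier in a CrossValidator over PARAM_GRID."""
    # Imported here so the definition loads even where PySpark is not installed.
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

    # Build every combination of the parameter values in PARAM_GRID.
    builder = ParamGridBuilder()
    for name, values in PARAM_GRID.items():
        builder = builder.addGrid(classifier.getParam(name), values)

    evaluator = MulticlassClassificationEvaluator(
        labelCol=label_col, metricName="accuracy"
    )
    return CrossValidator(
        estimator=classifier,
        estimatorParamMaps=builder.build(),
        evaluator=evaluator,
        numFolds=num_folds,
        seed=seed,
    )

# Usage inside the notebook (requires the Spark context and the rfc object from above):
# cv_model = build_cross_validator(rfc).fit(training_data)
# print(cv_model.avgMetrics)   # mean accuracy for each parameter combination
```

With three folds and four parameter combinations this fits the forest twelve times plus one final refit on the full training data, so it can take considerably longer than the single fit above.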

In [23]:
sc

<pyspark.context.SparkContext at 0x10d9f88d0>

In [24]:
# remove the unzipped file once you do not need it any more
!rm flight_delay_sample.jsonl