# Flight Delay Prediction Using Apache SystemML

## Loading SystemML

In [1]:
%AddDeps org.apache.systemml systemml 0.10.0-incubating

:: loading settings :: url = jar:file:/usr/local/spark-kernel/lib/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
:: resolving dependencies :: com.ibm.spark#spark-kernel;working [not transitive]
	confs: [default]
	found org.apache.systemml#systemml;0.10.0-incubating in central
:: resolution report :: resolve 140ms :: artifacts dl 5ms
	:: modules in use:
	org.apache.systemml#systemml;0.10.0-incubating from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: com.ibm.spark#spark-kernel
	confs: [default]
	0 artifacts copied, 1 already retrieved (0kB/6ms)


Add the spark-csv package, which is used below to load the CSV file.

In [2]:
%AddDeps com.databricks spark-csv_2.10 1.4.0

:: resolving dependencies :: com.ibm.spark#spark-kernel;working [not transitive]
	confs: [default]
	found com.databricks#spark-csv_2.10;1.4.0 in central
:: resolution report :: resolve 37ms :: artifacts dl 5ms
	:: modules in use:
	com.databricks#spark-csv_2.10;1.4.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: com.ibm.spark#spark-kernel
	confs: [default]
	0 artifacts copied, 1 already retrieved (0kB/3ms)


## Import Data

Download the 2007 airline dataset from stat-computing.org, unless it has already been downloaded.

In [3]:
import sys.process._
import java.net.URL
import java.io.File
val url = "http://stat-computing.org/dataexpo/2009/2007.csv.bz2"
val localFilePath = "airline2007.csv.bz2"
if(!new java.io.File(localFilePath).exists) {
    new URL(url) #> new File(localFilePath) !!
}

Load the dataset into a DataFrame using the spark-csv package. Every "NA" entry is replaced with "0.0".

In [4]:
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
val sqlContext = new SQLContext(sc)
val fmt = sqlContext.read.format("com.databricks.spark.csv")
val opt = fmt.options(Map("header"->"true", "inferSchema"->"true"))
val airline = opt.load(localFilePath).na.replace( "*", Map("NA" -> "0.0") )

In [5]:
airline.printSchema

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: integer (nullable = true)
 |-- TaxiOut: integer (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- C

## Data Exploration
Let's find out which origin airports have the highest average departure delay.

In [6]:
airline.registerTempTable("airline")
sqlContext.sql("""SELECT Origin, count(*) conFlight, avg(DepDelay) delay
                    FROM airline
                    GROUP BY Origin
                    ORDER BY delay DESC""").show

+------+---------+------------------+
|Origin|conFlight|             delay|
+------+---------+------------------+
|   PIR|        4|              45.5|
|   ACK|      314|45.296178343949045|
|   SOP|      195| 34.02051282051282|
|   HHH|      997| 22.58776328986961|
|   MCN|      992|22.496975806451612|
|   AKN|      235|21.123404255319148|
|   CEC|     1055|20.807582938388627|
|   GNV|     1927| 20.69797612869746|
|   EYW|     1052|20.224334600760457|
|   ACY|      735|20.141496598639456|
|   SPI|     1745|19.545558739255014|
|   GST|       90|19.233333333333334|
|   EWR|   154113|18.800853918877706|
|   BRW|      726| 18.02754820936639|
|   AGS|     2286|17.728346456692915|
|   ORD|   375784|17.695756072637472|
|   TRI|     1207| 17.63628831814416|
|   SBN|     5128|17.505850234009362|
|   FAY|     2185| 17.48970251716247|
|   PHL|   103868| 17.09455270150576|
+------+---------+------------------+
only showing top 20 rows
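
Note that several low-traffic airports rank near the top: PIR comes first with only 4 flights. One way to make such a ranking more robust is to require a minimum number of flights per airport. Below is a minimal plain-Scala sketch of the same aggregation on an in-memory collection. The names `DelayByOrigin`, `Stats`, and `rank`, as well as the `minFlights` threshold, are illustrative assumptions and not part of the notebook.

```scala
object DelayByOrigin {
  // One output row per origin airport (hypothetical helper type)
  final case class Stats(origin: String, flights: Int, avgDelay: Double)

  // rows: (origin, departure delay in minutes) pairs.
  // Returns airports with at least `minFlights` flights,
  // ordered by average delay, highest first.
  def rank(rows: Seq[(String, Double)], minFlights: Int = 1): Seq[Stats] =
    rows
      .groupBy { case (origin, _) => origin }
      .toSeq
      .map { case (origin, group) =>
        Stats(origin, group.size, group.map(_._2).sum / group.size)
      }
      .filter(_.flights >= minFlights)
      .sortWith(_.avgDelay > _.avgDelay)
}
```

For example, `DelayByOrigin.rank(rows, minFlights = 1000)` would drop airports such as PIR from the result. In the SQL query above, a `HAVING count(*) >= 1000` clause would play the same role.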



## Modeling: Logistic Regression

Let's predict whether a flight departing from JFK is delayed by more than 15 minutes.

In [8]:
// Label each flight: 1.0 = departure delay above 15 minutes, 2.0 = otherwise.
// Delays that cannot be parsed as a number are also labeled 1.0.
sqlContext.udf.register("checkDelay", (depDelay:String) => try { if(depDelay.toDouble > 15) 1.0 else 2.0 } catch { case e:Exception => 1.0 })
// Keep only flights departing from JFK
val tempSmallAirlineData = sqlContext.sql("SELECT *, checkDelay(DepDelay) label FROM airline WHERE Origin = 'JFK'").persist(StorageLevel.MEMORY_AND_DISK)
// Keep only destinations with more than 1000 flights from JFK
val popularDest = tempSmallAirlineData.select("Dest").map(y => (y.get(0).toString, 1)).reduceByKey(_ + _).filter(_._2 > 1000).collect.toMap
sqlContext.udf.register("onlyUsePopularDest", (x:String) => popularDest.contains(x))
tempSmallAirlineData.registerTempTable("tempAirline")
val smallAirlineData = sqlContext.sql("SELECT * FROM tempAirline WHERE onlyUsePopularDest(Dest)")

// Split the data randomly into 70% training and 30% test
val datasets = smallAirlineData.randomSplit(Array(0.7, 0.3))
val trainDataset = datasets(0).cache
val testDataset = datasets(1).cache
trainDataset.count
testDataset.count

35115
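
The labeling and filtering logic of this cell can be tried out without Spark. Since `DepDelay` is read as a string (see the schema above), the delay has to be parsed before it is compared with the threshold. The classes are encoded as 1.0 and 2.0 rather than 0 and 1, presumably because SystemML's multinomial logistic regression expects positive class labels; treat that explanation as an assumption. The sketch below mirrors the two UDFs on plain Scala collections. `DelayLabel`, `thresholdMinutes`, and `popular` are illustrative names, not part of the notebook.

```scala
import scala.util.Try

object DelayLabel {
  // Class labels as used in the notebook: 1.0 = delayed, 2.0 = not delayed
  val Delayed = 1.0
  val OnTime = 2.0

  // Mirrors the checkDelay UDF: a delay strictly above the threshold is
  // "delayed"; a value that cannot be parsed is also labeled "delayed".
  def checkDelay(depDelay: String, thresholdMinutes: Double = 15.0): Double =
    Try(depDelay.toDouble).toOption match {
      case Some(delay) if delay > thresholdMinutes => Delayed
      case Some(_)                                 => OnTime
      case None                                    => Delayed
    }

  // Mirrors the popularDest computation: destinations that occur
  // more than `minFlights` times in the input.
  def popular(dests: Seq[String], minFlights: Int): Set[String] =
    dests
      .groupBy(identity)
      .collect { case (dest, occurrences) if occurrences.size > minFlights => dest }
      .toSet
}
```

Note that a delay of exactly 15 minutes is labeled 2.0, because the comparison is strict.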

### Feature selection

Let's encode the destination using one-hot encoding and also include the columns Year, Month, DayofMonth, DayOfWeek, and Distance as features.

In [9]:
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}

val indexer = new StringIndexer().setInputCol("Dest").setOutputCol("DestIndex") // .setHandleInvalid("skip") // Only works on Spark 1.6 or later
val encoder = new OneHotEncoder().setInputCol("DestIndex").setOutputCol("DestVec")
val assembler = new VectorAssembler().setInputCols(Array("Year","Month","DayofMonth","DayOfWeek","Distance","DestVec")).setOutputCol("features")
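
To make the encoding step concrete, here is a minimal plain-Scala sketch of what the indexer and encoder stages compute. It assumes the Spark ML defaults as I understand them: `StringIndexer` assigns index 0 to the most frequent value, and `OneHotEncoder` drops the last category, so that category becomes the all-zeros vector. `OneHotSketch`, `fitIndex`, and `oneHot` are hypothetical names, and the alphabetical tie-breaking is a choice made here only to keep the sketch deterministic.

```scala
object OneHotSketch {
  // Assign indices by descending frequency (most frequent value gets 0);
  // ties are broken alphabetically so the result is deterministic.
  def fitIndex(values: Seq[String]): Map[String, Int] =
    values
      .groupBy(identity)
      .toSeq
      .sortBy { case (value, occurrences) => (-occurrences.size, value) }
      .map { case (value, _) => value }
      .zipWithIndex
      .toMap

  // Encode an index as a vector of length numCategories - 1;
  // the last category is represented by the all-zeros vector.
  def oneHot(index: Int, numCategories: Int): Vector[Double] =
    Vector.tabulate(numCategories - 1)(i => if (i == index) 1.0 else 0.0)
}
```

With three destinations, the most frequent one is encoded as `Vector(1.0, 0.0)` and the least frequent one as `Vector(0.0, 0.0)`. Under these assumptions, the assembled feature vector has 5 numeric entries plus one entry per destination except the last.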

### Build the model using SystemML's MLPipeline wrapper

In [10]:
import org.apache.spark.ml.Pipeline
import org.apache.sysml.api.ml.LogisticRegression

val lr = new LogisticRegression("log", sc).setRegParam(1e-4).setTol(1e-2).setMaxInnerIter(0).setMaxOuterIter(100)

val pipeline = new Pipeline().setStages(Array(indexer, encoder, assembler, lr))
val model = pipeline.fit(trainDataset)

BEGIN MULTINOMIAL LOGISTIC REGRESSION SCRIPT
Reading X...
Reading Y...
-- Initially:  Objective = 56196.214516717,  Gradient Norm = 4.4298224533061676E7,  Trust Delta = 0.001024586722033724
-- Outer Iteration 1: Had 1 CG iterations
   -- Obj.Reduction:  Actual = 9126.284435011527,  Predicted = 8784.320209556789  (A/P: 1.0389),  Trust Delta = 4.126637606584406E-4
   -- New Objective = 47069.930081705475,  Beta Change Norm = 3.965992001779065E-4,  Gradient Norm = 3433643.5832106518
 
-- Outer Iteration 2: Had 2 CG iterations
   -- Obj.Reduction:  Actual = 104.7197148608102,  Predicted = 102.99000196732598  (A/P: 1.0168),  Trust Delta = 4.126637606584406E-4
   -- New Objective = 46965.210366844665,  Beta Change Norm = 1.0262731974577981E-4,  Gradient Norm = 82609.96051937387
Termination / Convergence condition satisfied.


### Evaluate the model 

Compute the root-mean-square (RMS) error between the predicted and the original labels on the test data.

In [11]:
val predictions = model.transform(testDataset.withColumnRenamed("label", "OriginalLabel"))
predictions.select("prediction", "OriginalLabel").show
sqlContext.udf.register("square", (x:Double) => Math.pow(x, 2.0))

+----------+-------------+
|prediction|OriginalLabel|
+----------+-------------+
|       1.0|          2.0|
|       1.0|          2.0|
|       1.0|          2.0|
|       1.0|          2.0|
|       1.0|          2.0|
|       1.0|          2.0|
|       1.0|          2.0|
|       1.0|          2.0|
|       1.0|          1.0|
|       1.0|          1.0|
|       1.0|          2.0|
|       1.0|          1.0|
|       1.0|          2.0|
|       1.0|          2.0|
|       1.0|          1.0|
|       1.0|          2.0|
|       1.0|          1.0|
|       1.0|          2.0|
|       1.0|          1.0|
|       1.0|          2.0|
+----------+-------------+
only showing top 20 rows



UserDefinedFunction(<function1>,DoubleType,List())

In [12]:
predictions.registerTempTable("predictions")
sqlContext.sql("SELECT sqrt(avg(square(OriginalLabel - prediction))) FROM predictions").show

+------------------+
|               _c0|
+------------------+
|0.8573747235188472|
+------------------+
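
Because both the labels and the predictions are class codes (1.0 or 2.0), each squared difference is either 0 or 1, so the squared RMS error equals the fraction of rows where the prediction differs from the label. The sketch below checks this on the 20 rows shown in the prediction table above. `RmseSketch` and its members are illustrative names, not part of the notebook.

```scala
object RmseSketch {
  // Root-mean-square error between two non-empty sequences of equal length
  def rmse(labels: Seq[Double], predictions: Seq[Double]): Double = {
    require(labels.nonEmpty && labels.size == predictions.size,
      "inputs must be non-empty and of equal length")
    val squaredErrors = labels.zip(predictions).map { case (y, p) => math.pow(y - p, 2.0) }
    math.sqrt(squaredErrors.sum / squaredErrors.size)
  }

  // Fraction of rows whose prediction differs from the label
  def errorRate(labels: Seq[Double], predictions: Seq[Double]): Double = {
    require(labels.nonEmpty && labels.size == predictions.size,
      "inputs must be non-empty and of equal length")
    labels.zip(predictions).count { case (y, p) => y != p }.toDouble / labels.size
  }

  // OriginalLabel column of the 20 rows shown above
  val shownLabels: Seq[Double] =
    Seq(2, 2, 2, 2, 2, 2, 2, 2, 1, 1, 2, 1, 2, 2, 1, 2, 1, 2, 1, 2).map(_.toDouble)

  // Every prediction in those 20 rows is 1.0
  val shownPredictions: Seq[Double] = Seq.fill(shownLabels.size)(1.0)
}
```

On these 20 rows, 14 labels differ from the prediction, so the error rate is 0.7 and the RMS error is sqrt(0.7), about 0.837. By the same reasoning, if every value in the full prediction table is 1.0 or 2.0, the reported RMS error of about 0.857 corresponds to an error rate of about 0.857² ≈ 0.735.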



### Perform k-fold cross-validation to tune the hyperparameters

Use cross-validation to tune the regularization parameter of the logistic regression model. Three candidate values are compared using 2 folds.

In [13]:
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}

val crossval = new CrossValidator().setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator)
val paramGrid = new ParamGridBuilder().addGrid(lr.regParam, Array(0.1, 1e-3, 1e-6)).build()
crossval.setEstimatorParamMaps(paramGrid)
crossval.setNumFolds(2) // Setting k = 2
val cvmodel = crossval.fit(trainDataset)

BEGIN MULTINOMIAL LOGISTIC REGRESSION SCRIPT
Reading X...
Reading Y...
-- Initially:  Objective = 28068.301929594425,  Gradient Norm = 2.2008280854388464E7,  Trust Delta = 0.001024586722033724
-- Outer Iteration 1: Had 1 CG iterations
   -- Obj.Reduction:  Actual = 4521.557100765323,  Predicted = 4353.477350248426  (A/P: 1.0386),  Trust Delta = 4.115093750302585E-4
   -- New Objective = 23546.7448288291,  Beta Change Norm = 3.9562175519768864E-4,  Gradient Norm = 1690157.626562086
Termination / Convergence condition satisfied.
BEGIN MULTINOMIAL LOGISTIC REGRESSION SCRIPT
Reading X...
Reading Y...
-- Initially:  Objective = 28068.301929594425,  Gradient Norm = 2.2008280854388464E7,  Trust Delta = 0.001024586722033724
-- Outer Iteration 1: Had 1 CG iterations
   -- Obj.Reduction:  Actual = 4521.557100774226,  Predicted = 4353.477350256174  (A/P: 1.0386),  Trust Delta = 4.1150937503107494E-4
   -- New Objective = 23546.7448288202,  Beta Change Norm = 3.9562175519839267E-4,  Gradient Norm 
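
The procedure behind this cell can be sketched in plain Scala: split the rows into k folds, train on k - 1 folds and evaluate on the remaining one for every candidate parameter, then keep the candidate with the best mean metric. With 3 candidate values and k = 2, that is 6 fits during the search, plus one final fit of the best candidate on the whole training set. The initial objective in the log above (about 28068) is roughly half of the one from the earlier fit (about 56196), which is consistent with each fold training on about half of the rows. `KFoldSketch` and its members are hypothetical names. The round-robin split is a simplification chosen here, whereas Spark assigns rows to folds randomly. The sketch also assumes that a larger metric is better.

```scala
object KFoldSketch {
  // Split row indices 0 until n into k (training, validation) pairs,
  // assigning rows to folds round-robin.
  def folds(n: Int, k: Int): Seq[(Seq[Int], Seq[Int])] =
    (0 until k).map { fold =>
      val (validation, training) = (0 until n).partition(_ % k == fold)
      (training, validation)
    }

  // Return the candidate with the highest mean metric over all folds.
  // metric(candidate, fold) stands in for "fit on the training part,
  // evaluate on the validation part".
  def selectBest[P](grid: Seq[P], k: Int)(metric: (P, Int) => Double): P = {
    require(grid.nonEmpty && k > 0, "need at least one candidate and one fold")
    val meanScores = grid.map { candidate =>
      val scores = (0 until k).map(fold => metric(candidate, fold))
      (candidate, scores.sum / k)
    }
    meanScores.reduce((best, next) => if (next._2 > best._2) next else best)._1
  }

  // Number of model fits: one per (candidate, fold) pair plus the final refit
  def fitsRequired(gridSize: Int, k: Int): Int = gridSize * k + 1
}
```

For the grid used above, `KFoldSketch.fitsRequired(3, 2)` gives 7.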

### Evaluate the cross-validated model

In [14]:
val cvpredictions = cvmodel.transform(testDataset.withColumnRenamed("label", "OriginalLabel"))
cvpredictions.registerTempTable("cvpredictions")
sqlContext.sql("SELECT sqrt(avg(square(OriginalLabel - prediction))) FROM cvpredictions").show