# Flight Delay Prediction Demo Using SystemML

This notebook is based on datascientistworkbench.com's tutorial notebook for predicting flight delays.

## Loading SystemML 

Requirements to run this notebook:
  1. Spark version 2.x
  2. SystemML version 0.14 or above
  3. Scala kernel (we have tested with Toree kernel 0.2.0)
  4. The SystemML jar added through the jars sub-option of the spark_opts option when configuring the Toree Scala kernel

### Display SystemML version information

In [None]:
import org.apache.sysml.api.mlcontext.MLContext
import org.apache.sysml.api.mlcontext.ScriptFactory.dml
import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder().master("local").appName("Tutorial").getOrCreate()
val ml = new MLContext(sparkSession)

print ("Spark Version: " + sc.version)
print ("\nSystemML Version: " + ml.version())
print ("\nBuild Time: " + ml.buildTime())

## Import Data

Download the 2007 airline dataset from stat-computing.org if it has not already been downloaded.

In [None]:
import sys.process._
import java.net.URL
import java.io.File

val url = "http://stat-computing.org/dataexpo/2009/2007.csv.bz2"
val localFilePath = "airline2007.csv.bz2"
// Download only if the file is not already present locally
if (!new File(localFilePath).exists) {
    (new URL(url) #> new File(localFilePath)).!!
}

Load the dataset into a DataFrame using Spark's CSV reader, replacing "NA" entries with "0.0".

In [None]:
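import org.apache.spark.sql.SparkSession

// getOrCreate reuses the session created earlier if one already exists
val sparkSession = SparkSession.builder.master("local").appName("spark session example").getOrCreate()
// Read the compressed CSV with a header row and replace "NA" with "0.0" in all string columns
val airline = sparkSession.read.option("header","true").option("inferSchema","true").csv(localFilePath).na.replace("*", Map("NA" -> "0.0"))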

In [None]:
airline.printSchema

## Data Exploration
Which origin airports have the highest average departure delay?

In [None]:
airline.createOrReplaceTempView("airline") // registerTempTable is deprecated in Spark 2.x
sparkSession.sql("""SELECT Origin, count(*) conFlight, avg(DepDelay) delay
                    FROM airline
                    GROUP BY Origin
                    ORDER BY delay DESC""").show

## Modeling: Logistic Regression

Predict whether flights departing from JFK are delayed by more than 15 minutes.

In [None]:
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel
import sparkSession.implicits._


// Label 1.0 = departure delay above 15 minutes, 2.0 = otherwise; unparseable values fall back to 1.0
sparkSession.udf.register("checkDelay", (depDelay:String) => try { if(depDelay.toDouble > 15) 1.0 else 2.0 } catch { case e:Exception => 1.0 })
val tempSmallAirlineData = sparkSession.sql("SELECT *, checkDelay(DepDelay) label FROM airline WHERE Origin = 'JFK'").persist(StorageLevel.MEMORY_AND_DISK)
// Keep only destinations with more than 1000 flights from JFK
val popularDest = tempSmallAirlineData.select("Dest").map(y => (y.get(0).toString, 1)).rdd.reduceByKey(_ + _).filter(_._2 > 1000).collect.toMap
sparkSession.udf.register("onlyUsePopularDest", (x:String) => popularDest.contains(x))
tempSmallAirlineData.createOrReplaceTempView("tempAirline") // registerTempTable is deprecated in Spark 2.x
val smallAirlineData = sparkSession.sql("SELECT * FROM tempAirline WHERE onlyUsePopularDest(Dest)")

val datasets = smallAirlineData.randomSplit(Array(0.7, 0.3))
val trainDataset = datasets(0).cache
val testDataset = datasets(1).cache
print ("Training datasize = " + trainDataset.count)
print ("\nTest datasize = " + testDataset.count)
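The labeling rule applied by the `checkDelay` UDF above can be sketched in plain Scala, without Spark. The names `labelDelay` and `thresholdMinutes` are hypothetical and introduced only for this illustration, and the sample inputs are made up. The sketch mirrors the UDF's choices: labels are 1.0 and 2.0 rather than 0 and 1, and input that cannot be parsed falls back to 1.0.

```scala
import scala.util.Try

// Hypothetical helper mirroring the checkDelay UDF:
// 1.0 = delayed by more than thresholdMinutes, 2.0 = otherwise.
// Input that cannot be parsed as a number falls back to 1.0,
// like the catch branch of the UDF.
def labelDelay(depDelay: String, thresholdMinutes: Double = 15.0): Double =
  Try(depDelay.toDouble)
    .map(d => if (d > thresholdMinutes) 1.0 else 2.0)
    .getOrElse(1.0)

// Made-up inputs, for illustration only.
val sampleDelays = Seq("42", "15", "-3", "0.0", "not a number")
val sampleDelayLabels = sampleDelays.map(d => labelDelay(d))
sampleDelayLabels.foreach(println)
```

Note that a delay of exactly 15 minutes counts as not delayed, because the comparison is strict.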

### Feature selection

Encode the destination using one-hot encoding and include the columns Year, Month, DayofMonth, DayOfWeek, and Distance as features.

In [None]:
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}

val indexer = new StringIndexer().setInputCol("Dest").setOutputCol("DestIndex").setHandleInvalid("skip") // Only works on Spark 1.6 or later
val encoder = new OneHotEncoder().setInputCol("DestIndex").setOutputCol("DestVec")
val assembler = new VectorAssembler().setInputCols(Array("Year","Month","DayofMonth","DayOfWeek","Distance","DestVec")).setOutputCol("features")
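To make the indexing and one-hot steps above more concrete, here is a minimal plain-Scala sketch of the idea. It is not how Spark implements these stages: `buildIndex` and `oneHot` are hypothetical helpers, the destinations are made up, and ties are broken alphabetically purely as an assumption of this sketch. Like `StringIndexer`, it gives the most frequent value index 0. Unlike Spark's `OneHotEncoder`, which by default drops the last category and emits sparse vectors, it keeps every category and uses dense arrays.

```scala
// Index categories by descending frequency, so the most frequent value gets 0.
// Ties are broken alphabetically (an assumption of this sketch).
def buildIndex(values: Seq[String]): Map[String, Int] =
  values.groupBy(identity).toSeq
    .map { case (value, occurrences) => (value, occurrences.size) }
    .sortBy { case (value, count) => (-count, value) }
    .map(_._1)
    .zipWithIndex
    .toMap

// One-hot encode a category as a dense array with a single 1.0.
// Values missing from the index map to an all-zero array.
def oneHot(index: Map[String, Int], value: String): Array[Double] = {
  val vec = Array.fill(index.size)(0.0)
  index.get(value).foreach(i => vec(i) = 1.0)
  vec
}

// Made-up destinations, for illustration only.
val sampleDests = Seq("LAX", "SFO", "LAX", "BOS", "LAX", "SFO")
val sampleDestIndex = buildIndex(sampleDests)
val encodedSfo = oneHot(sampleDestIndex, "SFO")
println(sampleDestIndex)
println(encodedSfo.mkString("[", ", ", "]"))
```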

### Build the model using SystemML's MLPipeline wrapper

This wrapper invokes MultiLogReg.dml (for training) and GLM-predict.dml (for prediction). These DML algorithms are available at https://github.com/apache/incubator-systemml/tree/master/scripts/algorithms

In [None]:
import org.apache.spark.ml.Pipeline
import org.apache.sysml.api.ml.LogisticRegression
import org.apache.spark.sql.types.DoubleType

val lr = new LogisticRegression("log", sparkSession.sparkContext).setRegParam(1e-4).setTol(1e-2).setMaxInnerIter(0).setMaxOuterIter(100)
val pipeline = new Pipeline().setStages(Array(indexer, encoder, assembler, lr))
val model = pipeline.fit(trainDataset)

### Evaluate the model 

Output the root-mean-square (RMS) error on the test data.

In [None]:
val predictions = model.transform(testDataset.withColumnRenamed("label", "OriginalLabel"))
predictions.select("prediction", "OriginalLabel").show
sparkSession.udf.register("square", (x:Double) => Math.pow(x, 2.0))

In [None]:
predictions.createOrReplaceTempView("predictions") // registerTempTable is deprecated in Spark 2.x
sparkSession.sql("SELECT sqrt(avg(square(OriginalLabel - prediction))) FROM predictions").show
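The SQL expression `sqrt(avg(square(OriginalLabel - prediction)))` above is the usual root-mean-square error. As a minimal sketch, the same quantity can be computed on in-memory sequences; `rmse` is a hypothetical helper and the sample values are made up.

```scala
// Root-mean-square error between labels and predictions of equal length.
def rmse(labels: Seq[Double], predictions: Seq[Double]): Double = {
  require(labels.nonEmpty && labels.size == predictions.size,
    "labels and predictions must be non-empty and of equal length")
  val meanSquaredError = labels.zip(predictions)
    .map { case (y, yHat) => math.pow(y - yHat, 2.0) }
    .sum / labels.size
  math.sqrt(meanSquaredError)
}

// Made-up labels and predictions using the notebook's 1.0/2.0 encoding.
val actual = Seq(1.0, 2.0, 2.0, 1.0)
val predicted = Seq(1.0, 2.0, 1.0, 1.0)
val sampleRmse = rmse(actual, predicted)
println(sampleRmse) // prints 0.5
```

When both labels and predictions take only the values 1.0 and 2.0, each squared error is either 0 or 1, so the RMS error equals the square root of the misclassification rate (here one wrong prediction out of four).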

### Perform k-fold cross-validation to tune the hyperparameters

Use cross-validation to tune the regularization parameter (regParam) of the logistic regression model.

In [None]:
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}

val crossval = new CrossValidator().setEstimator(pipeline).setEvaluator((new BinaryClassificationEvaluator).setRawPredictionCol("prediction"))
val paramGrid = new ParamGridBuilder().addGrid(lr.regParam, Array(0.1, 1e-3, 1e-6)).build()
crossval.setEstimatorParamMaps(paramGrid)
crossval.setNumFolds(2) // Setting k = 2
val cvmodel = crossval.fit(trainDataset)
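For readers new to k-fold cross-validation, the splitting that `CrossValidator` performs internally can be sketched roughly as follows. This is an illustration under simplifying assumptions, not Spark's implementation: `kFoldSplits` is a hypothetical helper that works on row indices, and the seed is arbitrary. Each fold serves once as the validation set while the remaining folds form the training set, so with k = 2 and the three `regParam` values above, six candidate models are trained before the best setting is selected.

```scala
// Split row indices 0 until n into k folds and return, for each fold,
// the pair (training indices, validation indices).
def kFoldSplits(n: Int, k: Int, seed: Long = 42L): Seq[(Seq[Int], Seq[Int])] = {
  require(k >= 2 && k <= n, "need 2 <= k <= n")
  // Shuffle so that folds do not depend on the original row order.
  val shuffled = new scala.util.Random(seed).shuffle((0 until n).toList)
  // Assign every k-th shuffled row to the same fold.
  val folds = (0 until k).map { f =>
    shuffled.zipWithIndex.collect { case (row, pos) if pos % k == f => row }
  }
  folds.indices.map { f =>
    val validation = folds(f)
    val training = folds.indices.filter(_ != f).flatMap(i => folds(i))
    (training, validation)
  }
}

// Usage with made-up sizes: 10 rows, 2 folds.
val sampleSplits = kFoldSplits(n = 10, k = 2)
sampleSplits.foreach { case (training, validation) =>
  println(s"train=${training.size} validation=${validation.size}")
}
```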

### Evaluate the cross-validated model

In [None]:
val cvpredictions = cvmodel.transform(testDataset.withColumnRenamed("label", "OriginalLabel"))
cvpredictions.createOrReplaceTempView("cvpredictions") // registerTempTable is deprecated in Spark 2.x
sparkSession.sql("SELECT sqrt(avg(square(OriginalLabel - prediction))) FROM cvpredictions").show

## Homework ;)

Read http://apache.github.io/incubator-systemml/algorithms-classification.html#multinomial-logistic-regression and perform cross-validation on other hyperparameters, for example icpt, tol, maxOuterIter, and maxInnerIter.