In [None]:
// Load the three source tables from the dedicated SQL pool into DataFrames.
// The helper only forwards the fully qualified table name to the connector.
def loadSqlPoolTable(qualifiedName: String) = spark.read.sqlanalytics(qualifiedName)

val dfFlight = loadSqlPoolTable("edw.dbo.hyssh_flighttransaction")
val dfweather = loadSqlPoolTable("edw.dbo.hyssh_weather")
val dfairport = loadSqlPoolTable("edw.dbo.hyssh_airport")

In [None]:
// Create the working database if it is missing, then make it the session default so the
// unqualified table names used by later cells resolve against it.
// "##REPLACETHIS##" is a placeholder that has to be substituted before running.
val flightDbName = "##REPLACETHIS##_flight_db"
spark.sql(s"CREATE DATABASE IF NOT EXISTS $flightDbName")
spark.catalog.setCurrentDatabase(flightDbName)

In [None]:
// Persist each source DataFrame as a parquet-backed table in the current database,
// replacing any existing table of the same name. Order matches the original writes.
Seq(
  dfFlight -> "flight_history",
  dfweather -> "weather_history_curated",
  dfairport -> "airport_master"
).foreach { case (sourceDf, tableName) =>
  sourceDf.write.format("parquet").mode("overwrite").saveAsTable(tableName)
}

In [None]:
%%sql

-- Preview of the joined flight / airport / weather rows.
-- Each flight is matched to its origin airport (amo) and destination airport (amd), and to
-- the weather row that shares the origin airport's coordinates and the flight's
-- day, month and scheduled departure hour. Flights without a dep_delay_15 value are dropped.
SELECT DISTINCT
    fhc.dep_delay_15,
    amo.airport_id            AS origin_airport_id,
    fhc.origin_airport_cd,
    amo.display_airport_name  AS origin_airport_nm,
    amo.latitude              AS origin_airport_latitude,
    amo.longitude             AS origin_airport_longitude,
    fhc.month                 AS flight_month,
    fhc.day_of_month          AS flight_day_of_month,
    fhc.day_of_week           AS flight_day_of_week,
    fhc.crs_dep_tm            AS flight_dep_hour,
    fhc.carrier_cd,
    amd.airport_id            AS dest_airport_id,
    fhc.dest_airport_cd,
    amd.display_airport_name  AS dest_airport_nm,
    amd.latitude              AS dest_airport_latitude,
    amd.longitude             AS dest_airport_longitude,
    whc.wind_speed,
    whc.sea_level_pressure,
    whc.hourly_precip
FROM flight_history fhc
LEFT OUTER JOIN airport_master amo
    ON fhc.origin_airport_cd = amo.airport
LEFT OUTER JOIN airport_master amd
    ON fhc.dest_airport_cd = amd.airport
LEFT OUTER JOIN weather_history_curated whc
    ON  whc.latitude     = amo.latitude
    AND whc.longitude    = amo.longitude
    AND fhc.day_of_month = whc.day
    AND fhc.month        = whc.month
    AND fhc.crs_dep_tm   = whc.hour
WHERE fhc.dep_delay_15 IS NOT NULL

In [None]:
// Query behind the "materialized view": flights joined to their origin (amo) and
// destination (amd) airport rows and to the weather row matching the origin airport's
// coordinates plus the flight's day, month and scheduled departure hour.
// Only flights with a non-null dep_delay_15 are kept.
val matViewQuery = """
select distinct  
fhc.dep_delay_15,
amo.airport_id as origin_airport_id,
fhc.origin_airport_cd,
amo.display_airport_name as origin_airport_nm,
amo.latitude as origin_airport_latitude,
amo.longitude as origin_airport_longitude,
fhc.month as flight_month,
fhc.day_of_month as flight_day_of_month,
fhc.day_of_week as flight_day_of_week,
fhc.crs_dep_tm as flight_dep_hour, 
fhc.carrier_cd,
amd.airport_id as dest_airport_id,
fhc.dest_airport_cd,
amd.display_airport_name as dest_airport_nm,
amd.latitude as dest_airport_latitude,
amd.longitude as dest_airport_longitude,
whc.wind_speed,
whc.sea_level_pressure,
whc.hourly_precip
from 
flight_history fhc
left outer join
airport_master amo
on (fhc.origin_airport_cd=amo.airport)
left outer join
airport_master amd
on (fhc.dest_airport_cd=amd.airport)
left outer join
weather_history_curated whc
on (whc.latitude=amo.latitude AND
    whc.longitude=amo.longitude AND
    fhc.day_of_month=whc.day AND
    fhc.month=whc.month AND
    fhc.crs_dep_tm=whc.hour)
WHERE fhc.dep_delay_15 is not null
"""

// Run the query and mark the result for caching so later actions can reuse it.
val matViewDF = spark.sql(matViewQuery).cache()

In [None]:
// Store the joined result as a parquet table, overwriting any previous copy.
// The database part of the name carries the same "##REPLACETHIS##" placeholder used above.
val matViewTable = "##REPLACETHIS##_flight_db.hyssh_materialized_view"
matViewDF.write.format("parquet").mode("overwrite").saveAsTable(matViewTable)

In [None]:
import org.apache.spark.mllib.regression.LabeledPoint 
import org.apache.spark.mllib.linalg.Vectors 
import org.apache.spark.mllib.util.MLUtils 
import org.apache.spark.ml.feature.StringIndexer 
import org.apache.spark.ml.feature.VectorAssembler 
import org.apache.spark.mllib.linalg.Vectors 
import org.apache.spark.storage.StorageLevel 

In [None]:
// Build the model input: the label (dep_delay_15) and the numeric features, all cast to
// double, de-duplicated, with any row containing a null dropped, and cached for reuse.
//
// The source table is `hyssh_materialized_view`, the table this notebook writes the joined
// result to; the unqualified name resolves against the current database set earlier.
// (The previous name, `materialized_view`, is not created anywhere in this notebook.)
//
// Explicit aliases pin the output column names, which the feature-assembly step looks up
// by name, instead of relying on how Spark names a bare cast(...) expression.
val modelInputDF = spark.sql("""
select distinct
cast(dep_delay_15 as double) as dep_delay_15,
cast(origin_airport_id as double) as origin_airport_id,
cast(flight_month as double) as flight_month,
cast(flight_day_of_month as double) as flight_day_of_month,
cast(flight_day_of_week as double) as flight_day_of_week,
cast(flight_dep_hour as double) as flight_dep_hour,
cast(dest_airport_id as double) as dest_airport_id,
cast(wind_speed as double) as wind_speed,
cast(sea_level_pressure as double) as sea_level_pressure,
cast(hourly_precip as double) as hourly_precip
from hyssh_materialized_view
""").na.drop().cache()

In [None]:
// Input columns packed into the "features" vector, in this order.
val cols = Array(
  "origin_airport_id", "flight_month", "flight_day_of_month", "flight_day_of_week",
  "flight_dep_hour", "dest_airport_id", "wind_speed", "sea_level_pressure", "hourly_precip"
)

// Assemble the feature columns into a single vector column.
val featureAssembler = new VectorAssembler().setInputCols(cols).setOutputCol("features")
val datawithFeatures = featureAssembler.transform(modelInputDF)

// Index dep_delay_15 into the "label" column expected by the classifier.
val labelIndexer = new StringIndexer().setInputCol("dep_delay_15").setOutputCol("label")
val labelDf = labelIndexer.fit(datawithFeatures).transform(datawithFeatures)
labelDf.show()

In [None]:
import org.apache.spark.mllib.regression.LabeledPoint 
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.ml.tuning.{ParamGridBuilder,CrossValidator}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator


In [None]:
// 75% / 25% train/test split with a fixed seed passed to randomSplit.
val splitWeights = Array(0.75, 0.25)
val Array(trainingDataset, testDataset) = labelDf.randomSplit(splitWeights, seed = 1234)

In [None]:
// Random forest configuration: 100 trees, depth limit 10, up to 160 bins per feature.
// Trailing dots keep the chain a single statement even where the notebook backend
// feeds code to the REPL line by line.
val randForest = new RandomForestClassifier().
  setLabelCol("label").
  setFeaturesCol("features").
  setNumTrees(100).
  setMaxDepth(10).
  setMaxBins(160)

// Train on the training split.
val model = randForest.fit(trainingDataset.toDF())

In [None]:
// Score the test split with the trained forest.
val predictions = model.transform(testDataset)

// Show the first five scored rows: predicted class, true label, class probabilities, features.
val predictionPreview = predictions.select("prediction", "label", "probability", "features")
predictionPreview.show(5)

In [None]:
// Compare predicted and true labels on the test split and report the error rate
// (1 - accuracy).
val evaluator = new MulticlassClassificationEvaluator().
  setLabelCol("label").
  setPredictionCol("prediction").
  setMetricName("accuracy")

val accuracy = evaluator.evaluate(predictions)
println(s"Test Error = ${1.0 - accuracy}")
// Value recorded from an earlier run:
//0.794150404516767

In [None]:
// Binary classification evaluation metrics: area under the ROC curve on the test split,
// scored from the "probability" column.
val evaluator = new BinaryClassificationEvaluator().
  setLabelCol("label").
  setRawPredictionCol("probability").
  setMetricName("areaUnderROC")

val ROC = evaluator.evaluate(predictions)
println(s"ROC on test data = $ROC")
// Value recorded from an earlier run:
//0.7348870570394918

In [None]:
// Print the textual dump of the trained forest.
val forestDescription = model.toDebugString
println(s"Learned classification forest model:\n$forestDescription")

In [None]:
// Save the trained model to the storage account, replacing any model already at that path.
// "REPLACETHIS" in the URI is a placeholder for the storage account name.
val modelOutputPath = "abfss://sampledataset@REPLACETHIS.dfs.core.windows.net/model/hyssh/flightdelay"
model.write.overwrite().save(modelOutputPath)

In [None]:
//FEATURE IMPORTANCE
// Print each importance next to the feature it belongs to. The names come from `cols`,
// the same array the VectorAssembler used to build the "features" vector, so the labels
// stay in sync with the trained model instead of relying on a hand-maintained legend.
val namedImportances = cols.zip(model.featureImportances.toArray)
namedImportances.zipWithIndex.foreach { case ((feature, importance), idx) =>
  println(s"$idx - $feature - $importance")
}

// Keep the raw importance vector as the cell result.
model.featureImportances

// NOTE(review): the listing below was recorded by hand and has 10 entries, including
// `carrier_indx`. The current `cols` has 9 features and no `carrier_indx`, so entries 5-9
// do not line up with the current model. It appears to come from an earlier run with a
// different feature list; confirm and refresh from the printed output above.
//0 - origin_airport_id - 0.061617093986384194
//1 - flight_month - 0.04194211627845236
//2 - flight_day_of_month - 0.11509911942769702
//3 - flight_day_of_week - 0.061531660019990865
//4 - flight_dep_hour - 0.4371281475075368
//5 - carrier_indx - 0.05770819357125712
//6 - dest_airport_id - 0.01769814715174663
//7 - wind_speed - 0.02170460496570623
//8 - sea_level_pressure - 0.12510959002658545
//9 - hourly_precip - 0.060461327064643425