## Group 8 Assignment Phase 3

In [1]:
//Start a simple Spark Session
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._

//Feature Processing Classes
import org.apache.spark.ml.feature.{VectorAssembler,StringIndexer,VectorIndexer,OneHotEncoder, PCA}

//Linear Algebra Data Structures
import org.apache.spark.ml.linalg.{Vector,Vectors}

//Model Building Pipeline
import org.apache.spark.ml.{Pipeline, PipelineStage, PipelineModel}

//Binary Classification
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel,
                                           RandomForestClassifier, GBTClassifier,
                                           DecisionTreeClassifier, DecisionTreeClassificationModel}
//Model Training
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, 
                                   ParamGridBuilder, TrainValidationSplit}

//Model Evaluation
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator,MulticlassClassificationEvaluator}

// Optional: raise the level of the "org" logger hierarchy to ERROR to cut down log noise
import org.apache.log4j._
Logger.getLogger("org").setLevel(Level.ERROR)


//For Cleaning
//import scala.util.matching.Regex

// Obtain the SparkSession for this notebook under the application name below
val spark = SparkSession
  .builder()
  .appName("Group 8 ML Phase 3")
  .getOrCreate()

Intitializing Scala interpreter ...

Spark Web UI available at http://imac.modem:4041
SparkContext available as 'sc' (version = 2.4.5, master = local[*], app id = local-1591169107792)
SparkSession available as 'spark'


import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer, VectorIndexer, OneHotEncoder, PCA}
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.{Pipeline, PipelineStage, PipelineModel}
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel, RandomForestClassifier, GBTClassifier, DecisionTreeClassifier, DecisionTreeClassificationModel}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, Multic...

## Read in a parquet file of flight delay, fuel-price and meteorological data

In [2]:
val flights = {
  // Month index counted from the start of 2004: Jan 2004 -> 1, Feb 2004 -> 2, ...
  val monthsSince2004 = ($"Year" - 2004) * 12 + $"Month_Num1"

  spark.read
    .parquet("flightDelay.parquet")
    // Month_Num is stored as a string, so cast it to an integer helper column first
    .withColumn("Month_Num1", $"Month_Num".cast("Int"))
    .withColumn("Date_Num", monthsSince2004)
    // remove the helper column together with the columns that are not used for modelling
    .drop("Sectors_Flown", "Month_Num1", "Change")
    .withColumnRenamed("Departures_Delayed", "label")
    .withColumnRenamed("Price", "Fuel_Price")
    // drop rows containing nulls (none were found in this dataset)
    .na.drop()
    //.cache
}

flights.printSchema()

root
 |-- Departing_Port: string (nullable = true)
 |-- Arriving_Port: string (nullable = true)
 |-- Airline: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month_Num: string (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- Departing_Port_station_ID: string (nullable = true)
 |-- Departing_Port_station_name: string (nullable = true)
 |-- Arriving_Port_station_ID: string (nullable = true)
 |-- Arriving_Port_station_name: string (nullable = true)
 |-- Mean_3pm_cloud_cover_oktas_Depart: double (nullable = true)
 |-- Mean_3pm_dew_point_temperature_Degrees_C_Depart: double (nullable = true)
 |-- Mean_3pm_relative_humidity_%_Depart: double (nullable = true)
 |-- Mean_3pm_temperature_Degrees_C_Depart: double (nullable = true)
 |-- Mean_3pm_wet_bulb_temperature_Degrees_C_Depart: double (nullable = true)
 |-- Mean_3pm_wind_speed_km/h_Depart: double (nullable = true)
 |-- Mean_9am_cloud_cover_okas_Depart: double (nullable 

flights: org.apache.spark.sql.DataFrame = [Departing_Port: string, Arriving_Port: string ... 72 more fields]


## Take a look at the degree of imbalance in the dataset

In [None]:
// Number of rows per label over the whole (not yet split or resampled) dataset.
// Comparing the two counts shows how imbalanced the classes are.
val counts = flights.groupBy("label").count()

// The previous message talked about a "proportion ... in the sample", but this cell
// shows raw counts for the full dataset, so the label now says what is printed.
println("number of flights per label (label=1 is a delayed departure) in the full dataset")
counts.show()

## Split The Data into training and testing dataframes

In [3]:
// Date_Num counts months from the start of 2004, so March 2019 is 183.
// Hold out everything after March 2019 (Date_Num > 183) as the test dataset.
val testing = flights.where($"Date_Num" > 183)
println(s"Test Set of the Most Recent 12 Months has ${testing.count()} records")

// Everything up to and including March 2019 (Date_Num < 184) is the raw training dataset.
val rawTraining = flights.where($"Date_Num" < 184)

//val Array(imbalancedTraining, testing) = flights.randomSplit(Array(0.7, 0.3), seed = 12345)

Test Set of the Most Recent 12 Months has 435479 records


testing: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Departing_Port: string, Arriving_Port: string ... 72 more fields]
rawTraining: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Departing_Port: string, Arriving_Port: string ... 72 more fields]


## Down sample the Ontime Departures To Balance The Training data

In [4]:
// Fixed seed for both sampling steps: without it every run draws a different training
// set, so model results cannot be reproduced or compared between runs.
val samplingSeed = 12345L

val ontimeTrainingFlights = rawTraining.filter($"label" === 0)
println(s"On time Training Flights: ${ontimeTrainingFlights.count()}")

val delayedTrainingFlights = rawTraining.filter($"label" === 1)
println(s"Delayed Training Flights: ${delayedTrainingFlights.count()}")

// Keep roughly 20% of the on-time flights so that their number is close to the number
// of delayed flights (the delayed class is about one fifth the size of the on-time class).
val sampledOntimeTrainingFlights =
  ontimeTrainingFlights.sample(withReplacement = false, fraction = 0.2, seed = samplingSeed)

println(s"Down Sampled ontime Training Flights: ${sampledOntimeTrainingFlights.count()}")

// Fraction of the balanced data that is actually used for training, to keep fitting fast
val sampleFraction = 0.1
// Concatenate the down-sampled on-time rows with all delayed rows, then thin the result
val training = (sampledOntimeTrainingFlights
                .union(delayedTrainingFlights)
                .sample(withReplacement = false, fraction = sampleFraction, seed = samplingSeed))

val resampledCounts = training.groupBy("label").count()
println("proportion of lates (label=1) in the sample")
resampledCounts.show()

On time Training Flights: 4884963
Delayed Training Flights: 976455
Down Sampled ontime Training Flights: 976650
proportion of lates (label=1) in the sample
+-----+-----+
|label|count|
+-----+-----+
|    1|97602|
|    0|97793|
+-----+-----+



ontimeTrainingFlights: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Departing_Port: string, Arriving_Port: string ... 72 more fields]
delayedTrainingFlights: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Departing_Port: string, Arriving_Port: string ... 72 more fields]
sampledOntimeTrainingFlights: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Departing_Port: string, Arriving_Port: string ... 72 more fields]
sampleFraction: Double = 0.1
training: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Departing_Port: string, Arriving_Port: string ... 72 more fields]
resampledCounts: org.apache.spark.sql.DataFrame = [label: int, count: bigint]


## Model Assessment via a Confusion Matrix Method

In [5]:
/**
 * Prints evaluation metrics and a 2x2 confusion matrix for a set of binary predictions.
 *
 * First the weighted metrics reported by `MulticlassClassificationEvaluator` are printed
 * (accuracy, weighted precision, weighted recall, F1). Then accuracy, precision, recall
 * and F1 for the positive class (`label = 1`) are derived from the confusion-matrix
 * counts, followed by the confusion matrix itself.
 *
 * A ratio whose denominator is zero (for example precision when nothing was predicted
 * positive) is reported as 0.0 instead of NaN.
 *
 * @param predictionDF DataFrame with a "label" column and a "prediction" column;
 *                     rows where either is null are ignored by the derived metrics
 */
def getConfusionMatrix(predictionDF: DataFrame): Unit = {
    val eval = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction")
    println(s"Accuracy: ${eval.setMetricName("accuracy").evaluate(predictionDF)}")
    println(s"Weighted Precision: ${eval.setMetricName("weightedPrecision").evaluate(predictionDF)}")
    println(s"Weighted Recall: ${eval.setMetricName("weightedRecall").evaluate(predictionDF)}")
    println(s"F1: ${eval.setMetricName("f1").evaluate(predictionDF)}")

    // One aggregation yields every (actual, predicted) cell; previously five separate
    // count jobs each re-ran the whole prediction pipeline.
    val cellCounts: Map[(Int, Int), Long] = predictionDF
        .selectExpr("cast(label as int) as actual", "cast(prediction as int) as predicted")
        .na.drop()
        .groupBy("actual", "predicted")
        .count()
        .collect()
        .map(row => (row.getInt(0), row.getInt(1)) -> row.getLong(2))
        .toMap
        .withDefaultValue(0L)

    val TP = cellCounts((1, 1))
    val TN = cellCounts((0, 0))
    val FP = cellCounts((0, 1))
    val FN = cellCounts((1, 0))
    val total = cellCounts.values.sum

    // Guarded division so an empty denominator gives 0.0 rather than NaN
    def ratio(numerator: Long, denominator: Long): Double =
        if (denominator == 0L) 0.0 else numerator.toDouble / denominator

    // Metrics for the positive class (label = 1)
    val accuracy    = ratio(TP + TN, total)
    // precision: share of predicted positives that really are positive
    val precision   = ratio(TP, TP + FP)
    // recall: share of actual positives that were predicted positive
    val recall      = ratio(TP, TP + FN)
    // harmonic mean of precision and recall
    val F1 = if (precision + recall == 0.0) 0.0 else 2 * precision * recall / (precision + recall)
    println(s"Accuracy: ${accuracy}")
    println(s"Precision: ${precision}")
    println(s"Recall: ${recall}")
    println(s"F1: ${F1}")

    // Confusion matrix
    printf("""|=================== Confusion Matrix ==========================
           |##########| %-15s                     %-15s
           |----------+----------------------------------------------------
           |Actual = 0| %-15d                     %-15d
           |Actual = 1| %-15d                     %-15d
           |===============================================================
         """.stripMargin, "Predicted = 0", "Predicted = 1", TN, FP, FN, TP)


}


getConfusionMatrix: (predictionDF: org.apache.spark.sql.DataFrame)Unit


## Set up the ML Pipeline for Logistic Regression

In [6]:
//////////////////////////////////////////////////
//// Setting Up DataFrame for Machine Learning ///
//////////////////////////////////////////////////

// Categorical columns: index each string column, then one-hot encode the index
val categoricalVariables = Array(
    "Departing_Port", "Arriving_Port", "Airline")
// handleInvalid = "keep" puts values that were not seen while fitting (e.g. a port that is
// absent from the down-sampled training data or from one cross-validation fold) into an
// extra bucket. With the default ("error") such a row makes transform() throw.
val categoricalIndexers = categoricalVariables
  .map(name => new StringIndexer()
    .setInputCol(name)
    .setOutputCol(name + "_Index")
    .setHandleInvalid("keep"))
val categoricalEncoders = categoricalVariables
  .map(name => new OneHotEncoder().setInputCol(name + "_Index").setOutputCol(name + "_Vec"))


// Columns that are combined into the raw feature vector
val cols = Array("Date_Num",  "Airline_Vec", "Fuel_Price",
    "Departing_Port_Vec", "Mean_daily_wind_run_km_Depart", "Mean_rainfall_mm_Depart",
    "Mean_number_of_days_of_rain_Depart","Mean_number_of_days_>_40_Degrees_C_Depart",
    "Arriving_Port_Vec")

// Assemble the columns above into a single vector column, "indexedFeatures".
// This is the input to PCA; the final "features" column is produced by the PCA stage.
val assembler = (new VectorAssembler()
                 .setInputCols(cols)
                 .setOutputCol("indexedFeatures"))

// Project the assembled vector onto its first 9 principal components
val pca = new PCA()
    .setInputCol(assembler.getOutputCol)
    .setOutputCol("features")
    .setK(9)

// Logistic regression on the PCA output
val lr = new LogisticRegression()
        .setStandardization(true)
        .setLabelCol("label")
        .setFeaturesCol("features")

// Print out the parameters, documentation, and any default values.
println(s"LogisticRegression parameters:\n ${lr.explainParams()}\n")

//////////////////////////////////////////////
//   Define and construct the ML Pipeline  ///
//////////////////////////////////////////////

// Stage order: index -> one-hot encode -> assemble -> PCA -> logistic regression
val stages: Array[PipelineStage] = categoricalIndexers ++ categoricalEncoders ++ Array(assembler, pca, lr)
//val stages: Array[PipelineStage] = categoricalIndexers ++ categoricalEncoders ++ Array(assembler, lr)

// build the pipeline
val pipeline = new Pipeline().setStages(stages)


LogisticRegression parameters:
 aggregationDepth: suggested depth for treeAggregate (>= 2) (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial. (default: auto)
featuresCol: features column name (default: features, current: features)
fitIntercept: whether to fit an intercept term (default: true)
labelCol: label column name (default: label, current: label)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. (undefined)
maxIter: maximum number of iterations (>= 0) (default: 100)
predictionCol: prediction column name (default: prediction)
probabilityCol

categoricalVariables: Array[String] = Array(Departing_Port, Arriving_Port, Airline)
categoricalIndexers: Array[org.apache.spark.ml.feature.StringIndexer] = Array(strIdx_5fed8308a072, strIdx_1ca06f8e2bb6, strIdx_fcaded19f502)
categoricalEncoders: Array[org.apache.spark.ml.feature.OneHotEncoder] = Array(oneHot_01b0873b8d15, oneHot_197ce11a5b17, oneHot_1941a45672db)
cols: Array[String] = Array(Date_Num, Airline_Vec, Fuel_Price, Departing_Port_Vec, Mean_daily_wind_run_km_Depart, Mean_rainfall_mm_Depart, Mean_number_of_days_of_rain_Depart, Mean_number_of_days_>_40_Degrees_C_Depart, Arriving_Port_Vec)
assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_8a50dea4dfeb
pca: org.apache.spark.ml.feature.PCA = pca_ffb7754cf879
lr: org.apache.spark.ml.classification.LogisticRegressi...

## Train the Logistic Regression Pipeline using Cross Validation

In [None]:
// We use a ParamGridBuilder to construct a grid of parameters to search over.
// lr.threshold is a DoubleParam, so its grid must be an Array[Double]; the previous
// Array[Float] (i.toFloat / 100) does not match any addGrid overload.
// NOTE(review): the evaluator below scores areaUnderROC, which is computed from the raw
// prediction scores and therefore does not depend on the threshold. The 11 threshold
// values multiply the training cost without changing the metric - consider tuning the
// threshold separately.
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.01))
  .addGrid(lr.threshold, (45 to 55).map(_ / 100.0).toArray)
  .addGrid(lr.tol, Array(0.000001))
  .addGrid(lr.elasticNetParam, Array(0.0))
  .build()

// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
// This will allow us to jointly choose parameters for all Pipeline stages.
// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
// Note that the evaluator here is a BinaryClassificationEvaluator and its default metric
// is areaUnderROC.
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(10)  // Use 3+ in practice
  //.setParallelism(2)  // Evaluate up to 2 parameter settings in parallel


// Run cross-validation, and choose the best set of parameters.
val model = cv.fit(training)

// Since model is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
println(s"ParamMap: ${model.parent.extractParamMap}")

// Locate the fitted logistic regression stage inside the best pipeline model
val bestModel = model.bestModel match {
  case pm: PipelineModel => Some(pm)
  case _ => None
}

val ml = bestModel.flatMap(_.stages.collectFirst { case m: LogisticRegressionModel => m })

// Report the parameters of the best CV model. A missing stage is reported explicitly
// instead of failing with NoSuchElementException from Option.get.
ml match {
  case Some(bestLr) =>
    println(s"Intercept: ${bestLr.intercept}")
    println(s"Coefficients: ${bestLr.coefficients}")
    println(s"ElasticNetParam: ${bestLr.getElasticNetParam}")
    println(s"RegParam: ${bestLr.getRegParam}")
    println(s"Threshold: ${bestLr.getThreshold}")
  case None =>
    println("No LogisticRegressionModel stage found in the best model")
}

// Get Results on Test Set
val results = model.transform(testing)

// Take a look at the predictions
results.select("features", "probability", "prediction", "label").show(10)

// Measure the quality of the predictions
getConfusionMatrix(results)

## Store The Best CV Trained Logistic Model for Reuse

In [None]:
//Persist the Model
//model.write.overwrite().save("./flightDelayModel/")
//val results: DataFrame = CrossValidatorModel
//  .load("./flightDelayModel/")
//  .transform(testing)
//  .select(
//    col("features"),
//    col("label"),
//    col("prediction")
//  )

//results.show()

## Train the Logistic Regression Pipeline using a Train-Validation Split

In [None]:
// Parameter grid for the train/validation search.
// lr.threshold is a DoubleParam, so its grid must be an Array[Double]; the previous
// Array[Float] (i.toFloat / 100) does not match any addGrid overload.
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.01))
  .addGrid(lr.threshold, (45 to 55).map(_ / 100.0).toArray)
  .addGrid(lr.tol, Array(0.000001))
  .addGrid(lr.elasticNetParam, Array(0.0, 0.1))
  .build()

// Each parameter combination is fitted once on 75% of the training data and scored on
// the remaining 25% with the evaluator below.
val tvs = new TrainValidationSplit()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setTrainRatio(0.75)

//Train the Model
val model = tvs.fit(training)

println(s"***model was fit using parameters: ${model.parent.extractParamMap}")



In [None]:
// Score a 10% sample of the held-out test set. The seed fixes which rows are drawn,
// so the reported metrics are the same every time the cell is run.
val results = model.transform(
  testing.sample(withReplacement = false, fraction = 0.1, seed = 12345L))

// Keep only the columns needed to assess the predictions
val predictions = results.select("features", "label", "prediction")


getConfusionMatrix(predictions)
predictions.show(10)

// Locate the fitted logistic regression stage inside the best pipeline model
val bestModel = model.bestModel match {
  case pm: PipelineModel => Some(pm)
  case _ => None
}

val ml = bestModel.flatMap(_.stages.collectFirst { case m: LogisticRegressionModel => m })

// Report the parameters of the best model. A missing stage is reported explicitly
// instead of failing with NoSuchElementException from Option.get.
ml match {
  case Some(bestLr) =>
    println(s"Intercept: ${bestLr.intercept}")
    println(s"Coefficients: ${bestLr.coefficients}")
    println(s"ElasticNetParam: ${bestLr.getElasticNetParam}")
    println(s"RegParam: ${bestLr.getRegParam}")
    println(s"Threshold: ${bestLr.getThreshold}")
  case None =>
    println("No LogisticRegressionModel stage found in the best model")
}

In [None]:
//import spark.implicits._
//import org.apache.spark.sql._
//case class cls_Employee(name:String, sector:String, age:Int)
//val df = Seq(
//    cls_Employee("Andy","aaa", 20), 
//    cls_Employee("Berta","bbb", 30), 
//    cls_Employee("Joe","ccc", 40)
//).toDF()

//df.as[cls_Employee]
//    .take(df.count.toInt)
//    .foreach(t => println(s"name=${t.name}, sector=${t.sector}, age=${t.age}"))

## Prototype a Decision Tree Classifier

In [None]:
// Decision tree limited to depth 5, reading the "features" column and the "label" column
val dt = new DecisionTreeClassifier()
        .setMaxDepth(5)
        .setFeaturesCol("features")
        .setLabelCol("label")
println(s"DecisionTree parameters:\n ${dt.explainParams()}\n")

// Same preprocessing stages as before, with the decision tree as the final stage
val stages: Array[PipelineStage] = categoricalIndexers ++ categoricalEncoders ++ Array(assembler, pca, dt)
val dtPipeline = new Pipeline().setStages(stages)

// Fit on the training data, then score the full test set
val dtModel = dtPipeline.fit(training)
val dtPredictions = dtModel.transform(testing)
dtPredictions.select("prediction", "label", "features").show(20)
getConfusionMatrix(dtPredictions)

//https://docs.databricks.com/applications/machine-learning/mllib/decision-trees.html

In [None]:
// Import the ML algorithms we will use.
import org.apache.spark.ml.classification.{DecisionTreeClassifier, DecisionTreeClassificationModel}
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.Pipeline
// StringIndexer: Read input column "label" (digits) and annotate them as categorical values.
val indexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel")
// DecisionTreeClassifier: Learn to predict column "indexedLabel" using the "features" column.
val dtc = new DecisionTreeClassifier().setLabelCol("indexedLabel")
// The tree reads a "features" column, which only exists after the encoding, assembling
// and PCA stages have run, so those stages must come before the indexer and the tree.
// NOTE: `pipeline` and `model` rebind names used by the logistic regression cells;
// re-run those cells before reusing them there.
val pipeline = {
  val dtcStages: Array[PipelineStage] =
    categoricalIndexers ++ categoricalEncoders ++ Array(assembler, pca, indexer, dtc)
  new Pipeline().setStages(dtcStages)
}
val model = pipeline.fit(training)
// Pick the fitted tree out of the pipeline stages (pattern match instead of a blind cast).
val tree = model.stages
  .collectFirst { case fittedTree: DecisionTreeClassificationModel => fittedTree }
  .getOrElse(sys.error("No DecisionTreeClassificationModel stage in the fitted pipeline"))

val variedMaxDepthModels = (0 until 8).map { maxDepth =>
  // Fit a copy of the pipeline with maxDepth overridden; the shared `dtc` is left untouched.
  pipeline.fit(training, ParamMap(dtc.maxDepth -> maxDepth))
}
// Evaluation metric: plain accuracy, matching the "accuracy" column reported below
// (the previous "weightedPrecision" metric is not the same thing as accuracy).
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val evaluator = new MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setMetricName("accuracy")
// For each maxDepth setting, make predictions on the test data, and compute the accuracy metric.
val accuracies = (0 until 8).zip(variedMaxDepthModels).map { case (maxDepth, fittedPipeline) =>
  // transform() runs the fitted pipeline on the held-out test set
  // (`testing`; the previously referenced `test` is not defined anywhere).
  val predictions = fittedPipeline.transform(testing)
  (maxDepth, evaluator.evaluate(predictions))
}.toDF("maxDepth", "accuracy")

accuracies.show()

## GBT Model

In [None]:
// Gradient-boosted trees classifier with 10 boosting iterations
val gbt = new GBTClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setMaxIter(10)
  .setFeatureSubsetStrategy("auto")

// The heading previously said "DecisionTree parameters" (copied from the cell above)
println(s"GBT parameters:\n ${gbt.explainParams()}\n")

// Same preprocessing stages as before, with the GBT classifier as the final stage
val stages: Array[PipelineStage] = categoricalIndexers ++ categoricalEncoders ++ Array(assembler, pca, gbt)
// build the pipeline
val gbtPipeline = new Pipeline().setStages(stages)

// Fit on the training data, then score a 10% sample of the test set.
// The seed fixes which rows are drawn, so the metrics are the same on every run.
val gbtModel = gbtPipeline.fit(training)
val gbtPredictions = gbtModel.transform(
  testing.sample(withReplacement = false, fraction = 0.1, seed = 12345L))
gbtPredictions.select("prediction", "label", "features").show(20)
getConfusionMatrix(gbtPredictions)

## TPR_FPR DataFrame

In [None]:
// The decision threshold only affects how a fitted model turns its scores into
// predictions, not the fitting itself. The pipeline is therefore trained ONCE and a
// copy with a different threshold is made per ROC point (previously the identical
// pipeline was refitted 21 times and the shared `lr` was left mutated).
val variedThresholdModels = {
    val rocStages: Array[PipelineStage] = categoricalIndexers ++ categoricalEncoders ++ Array(assembler, pca, lr)
    val fittedOnce = new Pipeline().setStages(rocStages).fit(training)
    (0 to 20).map { threshold =>
        // copy(...) applies the override to the fitted logistic regression stage only
        fittedOnce.copy(ParamMap(lr.threshold -> (threshold * 0.05)))
    }
}

// Score every threshold on the SAME seeded 10% sample of the test set; a fresh random
// sample per threshold would make the ROC points incomparable.
val rocTestSample = testing.sample(withReplacement = false, fraction = 0.1, seed = 12345L).cache()

// For each threshold, make predictions on the test sample and derive TPR / FPR.
val accuracies = (0 to 20).map { threshold =>
    val predictions = variedThresholdModels(threshold).transform(rocTestSample)
    // One aggregation per threshold gives all four confusion-matrix cells
    val cellCounts: Map[(Int, Int), Long] = predictions
        .selectExpr("cast(label as int) as actual", "cast(prediction as int) as predicted")
        .na.drop()
        .groupBy("actual", "predicted")
        .count()
        .collect()
        .map(row => (row.getInt(0), row.getInt(1)) -> row.getLong(2))
        .toMap
        .withDefaultValue(0L)
    val TP = cellCounts((1, 1))
    val TN = cellCounts((0, 0))
    val FP = cellCounts((0, 1))
    val FN = cellCounts((1, 0))
    // true-positive rate = TP / actual positives; false-positive rate = FP / actual negatives
    val TPR = TP / (1D * (TP + FN))
    val FPR = FP / (1D * (FP + TN))
    (threshold * 0.05, TP, TN, FP, FN, FPR, TPR)
}.toDF("threshold", "TP", "TN", "FP", "FN", "FPR", "TPR")

// All counts have been collected, so the cached sample is no longer needed
rocTestSample.unpersist()

accuracies.show()

//Write ROC data to file (Spark creates a directory of part files at this path)
accuracies
    .coalesce(1)
    .write
    .option("header","true")
    .option("sep",",")
    .mode("overwrite")
    .csv("/Users/tod/Documents/_GDDS/_FIT5202/big_data/FlightData/ROC.txt")