## Group 8 Assignment Phase 3 Cross Validated Random Forest Classifiers - Angus

* Do the followings in HDFS:

* Remove any folder/files in /tmp that starts with flightData_

* Create folder /tmp/flightData_in/

* Put the parquet dataset file into /tmp/flightData_in/

* Make sure put was successfull (it should have the same size as the local file)!

In [1]:
! hadoop fs -chmod -R 777 hdfs://localhost:9000/tmp
! hadoop fs -rm    -r  hdfs://localhost:9000/tmp/flightData_*
! hadoop fs -mkdir -p  hdfs://localhost:9000/tmp/flightData_in
! hadoop fs -put   -p  flightDelay.parquet hdfs://localhost:9000/tmp/flightData_in
! hadoop fs -ls        hdfs://localhost:9000/tmp/flightData_in/

20/06/08 10:28:37 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.


Deleted hdfs://localhost:9000/tmp/flightData_in


Found 1 items


drwxrwxr-x   - root root          0 2020-05-24 03:38 hdfs://localhost:9000/tmp/flightData_in/flightDelay.parquet




In [2]:
!hdfs getconf -confKey fs.defaultFS

hdfs://localhost:9000



In [3]:
//Start a simple Spark Session
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.ml.attribute._

//Feature pre-Processing Classes
import org.apache.spark.ml.feature.{VectorAssembler,StringIndexer,
                                    VectorIndexer,OneHotEncoder, PCA, Normalizer}

//Linear Algebra Data Structures
import org.apache.spark.ml.linalg.{Vector,Vectors}

//Model Building Pipeline
import org.apache.spark.ml.{Pipeline, PipelineStage, PipelineModel}

//Binary Classification
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel, BinaryLogisticRegressionSummary,
                                           RandomForestClassifier, GBTClassifier,
                                           DecisionTreeClassifier, DecisionTreeClassificationModel, MultilayerPerceptronClassifier}
//Model Training
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, 
                                   ParamGridBuilder, TrainValidationSplit}

//Model Evaluation
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator,MulticlassClassificationEvaluator}

//Optional: Use the following code below to set the Error reporting
import org.apache.log4j._
Logger.getLogger("org").setLevel(Level.ERROR)


//For Cleaning
//import scala.util.matching.Regex

val spark = SparkSession.builder().appName("Group 8 ML Phase 3").getOrCreate()

Intitializing Scala interpreter ...

Spark Web UI available at http://8c79e9d02429:4043
SparkContext available as 'sc' (version = 2.4.5, master = local[*], app id = local-1591612134983)
SparkSession available as 'spark'


import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.ml.attribute._
import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer, VectorIndexer, OneHotEncoder, PCA, Normalizer}
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.{Pipeline, PipelineStage, PipelineModel}
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel, BinaryLogisticRegressionSummary, RandomForestClassifier, GBTClassifier, DecisionTreeClassifier, DecisionTreeClassificationModel, MultilayerPerceptronClassifier}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ...

## Read in a parquet file of flight delay, fuel-price and meteorological data

In [4]:
val flights = (spark
            .read.parquet("flightDelay.parquet")
            //.read.parquet("hdfs://localhost:9000/tmp/flightData_in/flightDelay.parquet") //uncomment to load from hdfs
            .withColumn("Month_Num1", $"Month_Num" cast "Int")
            //convert month and year to integer index starting Jan 2004
            .withColumn("Date_Num",  ($"Year"-2004)*12 + $"Month_Num1")
            .drop("Sectors_Flown", "Month_Num1", "Change")
            .withColumnRenamed("Departures_Delayed","label")
            .withColumnRenamed("Price","Fuel_Price")
            //drop NA's even though none were found!
            .na.drop()
            .sample(false, 0.2) //delete this in production mode
            //.cache
              )

flights.printSchema()

root
 |-- Departing_Port: string (nullable = true)
 |-- Arriving_Port: string (nullable = true)
 |-- Airline: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month_Num: string (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- Departing_Port_station_ID: string (nullable = true)
 |-- Departing_Port_station_name: string (nullable = true)
 |-- Arriving_Port_station_ID: string (nullable = true)
 |-- Arriving_Port_station_name: string (nullable = true)
 |-- Mean_3pm_cloud_cover_oktas_Depart: double (nullable = true)
 |-- Mean_3pm_dew_point_temperature_Degrees_C_Depart: double (nullable = true)
 |-- Mean_3pm_relative_humidity_%_Depart: double (nullable = true)
 |-- Mean_3pm_temperature_Degrees_C_Depart: double (nullable = true)
 |-- Mean_3pm_wet_bulb_temperature_Degrees_C_Depart: double (nullable = true)
 |-- Mean_3pm_wind_speed_km/h_Depart: double (nullable = true)
 |-- Mean_9am_cloud_cover_okas_Depart: double (nullable 

flights: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Departing_Port: string, Arriving_Port: string ... 72 more fields]


## Split The Data into training and testing dataframes

In [5]:
//Filter out the most recent 12 months of flight data as the test dataset
val testing = flights.filter($"Date_Num"> 183).cache()
println(s"Test Set of the Most Recent 12 Months has ${testing.count()} records")

//Filter out rows prior to the most recent 12 months of flight data as the training dataset
val rawTraining = flights.filter($"Date_Num" < 184)

Test Set of the Most Recent 12 Months has 86981 records


testing: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Departing_Port: string, Arriving_Port: string ... 72 more fields]
rawTraining: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Departing_Port: string, Arriving_Port: string ... 72 more fields]


## Down sample the Ontime Departures To Balance The Training data

In [6]:
val ontimeTrainingFlights = rawTraining.filter($"label"===0)
println(s"On time Training Flights: ${ontimeTrainingFlights.count()}")

val delayedTrainingFlights = rawTraining.filter($"label"===1)
println(s"Delayed Training Flights: ${delayedTrainingFlights.count()}")

//ontime:delayed approx 5:1 so take a random sample of size fifth of the ontime departures
val downSampleFraction = 0.2
val sampledOntimeTrainingFlights = ontimeTrainingFlights.sample(false, downSampleFraction)  

println(s"Down Sampled ontime Training Flights: ${sampledOntimeTrainingFlights.count()}")

//down sample resulting training set for the purposes of local testing
val localTestingSampleFraction = 0.1
//Concatenate rows of ontimeTrainingFlights and delayedTrainingFlights
val training = (sampledOntimeTrainingFlights
                .union(delayedTrainingFlights)
                .sample(false, localTestingSampleFraction)
                .cache())
               
val resampledCounts = training.groupBy("label").count()
println("proportion of lates (label=1) in the sample")
resampledCounts.show()

On time Training Flights: 978002
Delayed Training Flights: 195310
Down Sampled ontime Training Flights: 195219
proportion of lates (label=1) in the sample
+-----+-----+
|label|count|
+-----+-----+
|    1|19378|
|    0|19522|
+-----+-----+



ontimeTrainingFlights: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Departing_Port: string, Arriving_Port: string ... 72 more fields]
delayedTrainingFlights: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Departing_Port: string, Arriving_Port: string ... 72 more fields]
downSampleFraction: Double = 0.2
sampledOntimeTrainingFlights: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Departing_Port: string, Arriving_Port: string ... 72 more fields]
localTestingSampleFraction: Double = 0.1
training: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Departing_Port: string, Arriving_Port: string ... 72 more fields]
resampledCounts: org.apache.spark.sql.DataFrame = [label: int, count: bigint]


## Model Assessment via a Confusion Matrix

In [7]:
def getConfusionMatrix(predictionDF: DataFrame): Unit = {
    
    println("========================Model Assessment Metrics==================================================\n")
    // Define Binary Classification Evaluator
    val binaryEval = new BinaryClassificationEvaluator().setLabelCol("label").setRawPredictionCol("rawPrediction")
    // Run Evaluation.  The area under the ROC curve ranges from 0.5 and 1.0 with larger values indicative of better fit
    println(s"Area under ROC: ${binaryEval.setMetricName("areaUnderROC").evaluate(predictionDF)}")
    // Define Multiclass Classification Evaluator
    val multiEval = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction")
    println(s"Accuracy: ${multiEval.setMetricName("accuracy").evaluate(predictionDF)}")
    println(s"Weighted Precision: ${multiEval.setMetricName("weightedPrecision").evaluate(predictionDF)}")
    println(s"Weighted Recall: ${multiEval.setMetricName("weightedRecall").evaluate(predictionDF)}")
    println(s"F1: ${multiEval.setMetricName("f1").evaluate(predictionDF)}")

    val TP = predictionDF.select("label", "prediction").filter("label = 1 and prediction = 1").count
    val TN = predictionDF.select("label", "prediction").filter("label = 0 and prediction = 0").count
    val FP = predictionDF.select("label", "prediction").filter("label = 0 and prediction = 1").count
    val FN = predictionDF.select("label", "prediction").filter("label = 1 and prediction = 0").count
    val total = predictionDF.select("label").count.toDouble
    // Unweighted Metrics
    val accuracy    = (TP + TN) / total
    val precision   = TP / (TP+FP).toDouble
    val recall      = TP / (TP+FN).toDouble
    val F1 = 2*precision*recall/(precision+recall)
    println(s"Accuracy: ${accuracy}")
    println(s"Precision: ${precision}")
    println(s"Recall: ${recall}")
    println(s"F1: ${F1}")

    //predictionDF.select( $"label",$"prediction" cast "Int").orderBy("label").groupBy("label").pivot("prediction",Seq("0","1")).count.show()

    // Confusion matrix
    printf(s"""|=================== Confusion Matrix ==========================
           |##########| %-15s                     %-15s
           |----------+----------------------------------------------------
           |Actual = 0| %-15d                     %-15d
           |Actual = 1| %-15d                     %-15d
           |===============================================================
         """.stripMargin, "Predicted = 0", "Predicted = 1", TN, FP, FN, TP)

    println("\n==================================================================================================")
}


getConfusionMatrix: (predictionDF: org.apache.spark.sql.DataFrame)Unit


## Set up Flight Data Feature Processing Pipleline Stages for Arbitrary Estimator

In [8]:
// Deal with Categorical Columns
val categoricalVariables = Array(
    "Departing_Port", "Arriving_Port", "Airline")
val categoricalIndexers = categoricalVariables
  .map(i => new StringIndexer().setInputCol(i).setOutputCol(i+"_Index"))
val categoricalEncoders = categoricalVariables
  .map(e => new OneHotEncoder().setInputCol(e + "_Index").setOutputCol(e + "_Vec"))


// select the flight data explanatory fields that will predict flight delay
val explanatoryFields = Array("Date_Num",  "Airline_Vec", "Fuel_Price",
    "Departing_Port_Vec", "Mean_daily_wind_run_km_Depart", "Mean_rainfall_mm_Depart",
    "Mean_number_of_days_of_rain_Depart","Mean_number_of_days_>_40_Degrees_C_Depart",
    "Arriving_Port_Vec")

// Assemble everything together to be ("label","features") format
val assembler = (new VectorAssembler()
                 .setInputCols(explanatoryFields)
                 //.setOutputCol("indexedFeatures")
                 .setOutputCol("features")
                )

val featureProcessingStages: Array[PipelineStage] = categoricalIndexers ++ categoricalEncoders ++ Array(assembler)


categoricalVariables: Array[String] = Array(Departing_Port, Arriving_Port, Airline)
categoricalIndexers: Array[org.apache.spark.ml.feature.StringIndexer] = Array(strIdx_66e3401730d4, strIdx_207284171ed0, strIdx_78feabb4d3e7)
categoricalEncoders: Array[org.apache.spark.ml.feature.OneHotEncoder] = Array(oneHot_9251aefffc36, oneHot_3d93e216ed5d, oneHot_e799ad96be47)
explanatoryFields: Array[String] = Array(Date_Num, Airline_Vec, Fuel_Price, Departing_Port_Vec, Mean_daily_wind_run_km_Depart, Mean_rainfall_mm_Depart, Mean_number_of_days_of_rain_Depart, Mean_number_of_days_>_40_Degrees_C_Depart, Arriving_Port_Vec)
assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_5d9387ee0e7b
featureProcessingStages: Array[org.apache.spark.ml.PipelineStage] = Array(strIdx_66e3401730d4, str...

In [9]:
def pipeLineCrossValidationTrainer(pipeLine: Pipeline, paramGrid: Array[ParamMap], training: DataFrame, testing: DataFrame): DataFrame = {

    //Use Cross Validation to Train the Decision Tree Models
    val cv = new CrossValidator()
      .setEstimator(pipeLine)
      .setEvaluator(new BinaryClassificationEvaluator)
      .setEstimatorParamMaps(paramGrid)
      .setNumFolds(10)  // Use 3+ in practice
      //.setParallelism(2)  // Evaluate up to 2 parameter settings in parallel


    // Run cross-validation, and choose the best set of parameters.
    val cvModel = cv.fit(training)

    // test the model
    cvModel.transform(testing)
}

pipeLineCrossValidationTrainer: (pipeLine: org.apache.spark.ml.Pipeline, paramGrid: Array[org.apache.spark.ml.param.ParamMap], training: org.apache.spark.sql.DataFrame, testing: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


## Cross Validated RF Classifier Prototype

In [10]:
// instantiate the random forest classifier
val rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setMaxMemoryInMB(4096)
  .setImpurity("gini")
  .setFeatureSubsetStrategy("auto")

println(s"Random Forest parameters:\n ${rf.explainParams()}\n")

// build the random forest pipeline
val rfPipeline = new Pipeline().setStages(featureProcessingStages ++ Array(rf))


val rfParamGrid = new ParamGridBuilder()
             .addGrid(rf.maxDepth, Array(3,6))
             .addGrid(rf.maxBins, Array(32))
             .build()

// train and test random forest classifier
val rfCVPredictions = pipeLineCrossValidationTrainer(rfPipeline, rfParamGrid, training, testing)

rfCVPredictions.select("prediction", "label", "features").show(20)
getConfusionMatrix(rfCVPredictions)

Random Forest parameters:
 cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. (default: false)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext (default: 10)
featureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: auto, all, onethird, sqrt, log2, (0.0-1.0], [1-n]. (default: auto, current: auto)
featuresCol: features column name (default: features, current: features)
impurity: Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini (default: gini, current: gini)
labelCol: label column name (default: label, current: label)
maxBins: Max number of bi

rf: org.apache.spark.ml.classification.RandomForestClassifier = rfc_1594db7ecb41
rfPipeline: org.apache.spark.ml.Pipeline = pipeline_2b54d8786791
rfParamGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	rfc_1594db7ecb41-maxBins: 32,
	rfc_1594db7ecb41-maxDepth: 3
}, {
	rfc_1594db7ecb41-maxBins: 32,
	rfc_1594db7ecb41-maxDepth: 6
})
rfCVPredictions: org.apache.spark.sql.DataFrame = [Departing_Port: string, Arriving_Port: string ... 82 more fields]
