## Group 8 Assignment Phase 3

In [1]:
! hadoop fs -chmod -R 777 hdfs://localhost:9000/tmp
! hadoop fs -rm    -r  hdfs://localhost:9000/tmp/flightData_*
! hadoop fs -mkdir -p  hdfs://localhost:9000/tmp/flightData_in
! hadoop fs -put   -p  flightDelay.parquet hdfs://localhost:9000/tmp/flightData_in
! hadoop fs -ls        hdfs://localhost:9000/tmp/flightData_in/

20/06/07 00:15:45 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.


Deleted hdfs://localhost:9000/tmp/flightData_in


Found 1 items


drwxrwxrwx   - root root          0 2020-06-06 01:25 hdfs://localhost:9000/tmp/flightData_in/flightDelay.parquet




In [2]:
!hdfs getconf -confKey fs.defaultFS

hdfs://localhost:9000



In [3]:
//Start a simple Spark Session
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._

//Feature Processing Classes
import org.apache.spark.ml.feature.{VectorAssembler,StringIndexer,VectorIndexer,OneHotEncoder, PCA}

//Linear Algebra Data Structures
import org.apache.spark.ml.linalg.{Vector,Vectors}

//Model Building Pipeline
import org.apache.spark.ml.{Pipeline, PipelineStage, PipelineModel}

//Binary Classification
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel,
                                           RandomForestClassifier, GBTClassifier,
                                           DecisionTreeClassifier, DecisionTreeClassificationModel}
//Model Training
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, 
                                   ParamGridBuilder, TrainValidationSplit}

//Model Evaluation
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator,MulticlassClassificationEvaluator}

//Optional: Use the following code below to set the Error reporting
import org.apache.log4j._
Logger.getLogger("org").setLevel(Level.ERROR)


//For Cleaning
//import scala.util.matching.Regex

val spark = SparkSession.builder().appName("Group 8 ML Phase 3").getOrCreate()

Intitializing Scala interpreter ...

Spark Web UI available at http://1c0c4af6b969:4042
SparkContext available as 'sc' (version = 2.4.5, master = local[*], app id = local-1591488959724)
SparkSession available as 'spark'


import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer, VectorIndexer, OneHotEncoder, PCA}
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.{Pipeline, PipelineStage, PipelineModel}
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel, RandomForestClassifier, GBTClassifier, DecisionTreeClassifier, DecisionTreeClassificationModel}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, Multic...

## Read in a parquet file of flight delay, fuel-price and meteorological data

In [4]:
val flights = (spark
            .read.parquet("flightDelay.parquet")
            .withColumn("Month_Num1", $"Month_Num" cast "Int")
            //convert month and year to integer index starting Jan 2004
            .withColumn("Date_Num",  ($"Year"-2004)*12 + $"Month_Num1")
            .drop("Sectors_Flown", "Month_Num1", "Change")
            .withColumnRenamed("Departures_Delayed","label")
            .withColumnRenamed("Price","Fuel_Price")
            //drop NA's even though none were found!
            .na.drop()
            //.cache
              )

flights.printSchema()

root
 |-- Departing_Port: string (nullable = true)
 |-- Arriving_Port: string (nullable = true)
 |-- Airline: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month_Num: string (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- Departing_Port_station_ID: string (nullable = true)
 |-- Departing_Port_station_name: string (nullable = true)
 |-- Arriving_Port_station_ID: string (nullable = true)
 |-- Arriving_Port_station_name: string (nullable = true)
 |-- Mean_3pm_cloud_cover_oktas_Depart: double (nullable = true)
 |-- Mean_3pm_dew_point_temperature_Degrees_C_Depart: double (nullable = true)
 |-- Mean_3pm_relative_humidity_%_Depart: double (nullable = true)
 |-- Mean_3pm_temperature_Degrees_C_Depart: double (nullable = true)
 |-- Mean_3pm_wet_bulb_temperature_Degrees_C_Depart: double (nullable = true)
 |-- Mean_3pm_wind_speed_km/h_Depart: double (nullable = true)
 |-- Mean_9am_cloud_cover_okas_Depart: double (nullable 

flights: org.apache.spark.sql.DataFrame = [Departing_Port: string, Arriving_Port: string ... 72 more fields]


## Take a look at the degree of imbalance in the dataset

In [5]:
val counts = flights.groupBy("label").count()

println("proportion of lates (label=1) in the sample")
counts.show()

proportion of lates (label=1) in the sample
+-----+-------+
|label|  count|
+-----+-------+
|    1|1072071|
|    0|5224826|
+-----+-------+



counts: org.apache.spark.sql.DataFrame = [label: int, count: bigint]


## Split The Data into training and testing dataframes

In [6]:
//Filter out the most recent 12 months of flight data as the test dataset
//Dates after March 2019 have Date_Num > 183
val rawTesting = flights.filter($"Date_Num"> 183)
println(s"Test Set of the Most Recent 12 Months has ${rawTesting.count()} records")

//Filter out rows prior to the most recent 12 months of flight data as the training dataset
val rawTraining = flights.filter($"Date_Num" < 184)

//val Array(imbalancedTraining, testing) = flights.randomSplit(Array(0.7, 0.3), seed = 12345)

Test Set of the Most Recent 12 Months has 435479 records


rawTesting: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Departing_Port: string, Arriving_Port: string ... 72 more fields]
rawTraining: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Departing_Port: string, Arriving_Port: string ... 72 more fields]


## Down sample the Ontime Departures To Balance The Training data

In [7]:
val ontimeTrainingFlights = rawTraining.filter($"label"===0)
println(s"On time Training Flights: ${ontimeTrainingFlights.count()}")

val delayedTrainingFlights = rawTraining.filter($"label"===1)
println(s"Delayed Training Flights: ${delayedTrainingFlights.count()}")

val sampledOntimeTrainingFlights = ontimeTrainingFlights.sample(false, 0.2)  

println(s"Down Sampled ontime Training Flights: ${sampledOntimeTrainingFlights.count()}")


val sampleFraction = 0.1
//Concatenate rows of ontimeTrainingFlights and delayedTrainingFlights
val training = (sampledOntimeTrainingFlights
                .union(delayedTrainingFlights)
                .sample(false, sampleFraction))

val testing = rawTesting.sample(false, 0.1)
               
val resampledCounts = training.groupBy("label").count()
println("proportion of lates (label=1) in the sample")
resampledCounts.show()

On time Training Flights: 4884963
Delayed Training Flights: 976455
Down Sampled ontime Training Flights: 976479
proportion of lates (label=1) in the sample
+-----+-----+
|label|count|
+-----+-----+
|    1|97279|
|    0|97701|
+-----+-----+



ontimeTrainingFlights: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Departing_Port: string, Arriving_Port: string ... 72 more fields]
delayedTrainingFlights: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Departing_Port: string, Arriving_Port: string ... 72 more fields]
sampledOntimeTrainingFlights: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Departing_Port: string, Arriving_Port: string ... 72 more fields]
sampleFraction: Double = 0.1
training: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Departing_Port: string, Arriving_Port: string ... 72 more fields]
testing: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Departing_Port: string, Arriving_Port: string ... 72 more fields]
resampledCounts: org.apache.spark.sql.DataFrame = [l...

## Model Assessment via a Confusion Matrix Method

In [8]:
def getConfusionMatrix(predictionDF: DataFrame): Unit = {
    val eval = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction")
    println(s"Accuracy: ${eval.setMetricName("accuracy").evaluate(predictionDF)}")
    println(s"Weighted Precision: ${eval.setMetricName("weightedPrecision").evaluate(predictionDF)}")
    println(s"Weighted Recall: ${eval.setMetricName("weightedRecall").evaluate(predictionDF)}")
    println(s"F1: ${eval.setMetricName("f1").evaluate(predictionDF)}")

    val TP = predictionDF.select("label", "prediction").filter("label = 1 and prediction = 1").count
    val TN = predictionDF.select("label", "prediction").filter("label = 0 and prediction = 0").count
    val FP = predictionDF.select("label", "prediction").filter("label = 0 and prediction = 1").count
    val FN = predictionDF.select("label", "prediction").filter("label = 1 and prediction = 0").count
    val total = predictionDF.select("label").count.toDouble
    // Unweighted Metrics
    val accuracy    = (TP + TN) / total
    val precision   = TP / (TP+FP).toDouble
    val recall      = TP / (TP+FN).toDouble
    val F1 = 2/(1/precision + 1/recall)
    println(s"Accuracy: ${accuracy}")
    println(s"Precision: ${precision}")
    println(s"Recall: ${recall}")
    println(s"F1: ${F1}")

    // Confusion matrix
    printf(s"""|=================== Confusion Matrix ==========================
           |##########| %-15s                     %-15s
           |----------+----------------------------------------------------
           |Actual = 0| %-15d                     %-15d
           |Actual = 1| %-15d                     %-15d
           |===============================================================
         """.stripMargin, "Predicted = 0", "Predicted = 1", TN, FP, FN, TP)


}


getConfusionMatrix: (predictionDF: org.apache.spark.sql.DataFrame)Unit


## Set up the ML Pipleline Logistic Regression

In [9]:
//////////////////////////////////////////////////
//// Setting Up DataFrame for Machine Learning ///
//////////////////////////////////////////////////

// Deal with Categorical Columns
val categoricalVariables = Array(
    "Departing_Port", "Arriving_Port", "Airline")
val categoricalIndexers = categoricalVariables
  .map(i => new StringIndexer().setInputCol(i).setOutputCol(i+"_Index"))
val categoricalEncoders = categoricalVariables
  .map(e => new OneHotEncoder().setInputCol(e + "_Index").setOutputCol(e + "_Vec"))


// columns that need to be added to the features vector
val cols = Array("Date_Num",  "Airline_Vec", "Fuel_Price",
    "Departing_Port_Vec", "Mean_daily_wind_run_km_Depart", "Mean_rainfall_mm_Depart",
    "Mean_number_of_days_of_rain_Depart","Mean_number_of_days_>_40_Degrees_C_Depart",
    "Arriving_Port_Vec")

// Assemble everything together to be ("label","features") format
val assembler = (new VectorAssembler()
                 .setInputCols(cols)
                 .setOutputCol("indexedFeatures"))

// Choose linear combinations of explanatory variables that explain the most variance in the training data
val pca = new PCA()
    .setInputCol(assembler.getOutputCol)
        //"indexedFeatures")
    .setOutputCol("features").setK(9)

categoricalVariables: Array[String] = Array(Departing_Port, Arriving_Port, Airline)
categoricalIndexers: Array[org.apache.spark.ml.feature.StringIndexer] = Array(strIdx_3a39d6d84025, strIdx_78428a9cdbb9, strIdx_f6be01bdc8e7)
categoricalEncoders: Array[org.apache.spark.ml.feature.OneHotEncoder] = Array(oneHot_94946b4dedcc, oneHot_57f0d0c82872, oneHot_53f5f0f32577)
cols: Array[String] = Array(Date_Num, Airline_Vec, Fuel_Price, Departing_Port_Vec, Mean_daily_wind_run_km_Depart, Mean_rainfall_mm_Depart, Mean_number_of_days_of_rain_Depart, Mean_number_of_days_>_40_Degrees_C_Depart, Arriving_Port_Vec)
assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_7b3e1d11a772
pca: org.apache.spark.ml.feature.PCA = pca_58286315bfac


In [10]:
//Create a random forest classifier

val randomForestClassifier = new RandomForestClassifier()
  .setImpurity("gini")
  .setMaxDepth(3)
  .setNumTrees(10)
  .setFeatureSubsetStrategy("auto")
  .setSeed(12345)

// Print out the parameters, documentation, and any default values.
println(s"Random Forest parameters:\n ${randomForestClassifier.explainParams()}\n")

Random Forest parameters:
 cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. (default: false)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext (default: 10)
featureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: auto, all, onethird, sqrt, log2, (0.0-1.0], [1-n]. (default: auto, current: auto)
featuresCol: features column name (default: features)
impurity: Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini (default: gini, current: gini)
labelCol: label column name (default: label)
maxBins: Max number of bins for discretizing continuous feat

randomForestClassifier: org.apache.spark.ml.classification.RandomForestClassifier = rfc_a489eacb088b


In [11]:
// Initialise stages for the pipline
val stages: Array[PipelineStage] = categoricalIndexers ++ categoricalEncoders ++ Array(assembler, pca, randomForestClassifier)

// build the pipeline
val pipeline = new Pipeline().setStages(stages)

stages: Array[org.apache.spark.ml.PipelineStage] = Array(strIdx_3a39d6d84025, strIdx_78428a9cdbb9, strIdx_f6be01bdc8e7, oneHot_94946b4dedcc, oneHot_57f0d0c82872, oneHot_53f5f0f32577, vecAssembler_7b3e1d11a772, pca_58286315bfac, rfc_a489eacb088b)
pipeline: org.apache.spark.ml.Pipeline = pipeline_d20db20c2824


In [12]:
//val rfpipemodel = pipeline.fit(training)

In [13]:
// // test the model with test data
// val rfpipeprediction = rfpipemodel.transform(testing)

// rfpipeprediction.select ("features", "probability", "prediction", "label").show(10)

// // Measure the quality of the predictions
// getConfusionMatrix(rfpipeprediction)

## Train the Logistic Regression Pipeline using Cross Validation

In [14]:
// We use a ParamGridBuilder to construct a grid of parameters to search over.
val paramGrid = new ParamGridBuilder()
  .addGrid(randomForestClassifier.maxBins, Array(3,5))
  .addGrid(randomForestClassifier.maxDepth, Array(4,5))
  .addGrid(randomForestClassifier.impurity, Array("entropy", "gini"))
  .build()

//Unable to go to the extend required on current machine - ideally this would be the CV
// val paramGrid = new ParamGridBuilder()
//   .addGrid(randomForestClassifier.maxBins, Array(20,30,50))
//   .addGrid(randomForestClassifier.maxDepth, Array(3,6,9))
//   .addGrid(randomForestClassifier.impurity, Array("entropy", "gini"))
//   .build()

paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	rfc_a489eacb088b-impurity: entropy,
	rfc_a489eacb088b-maxBins: 3,
	rfc_a489eacb088b-maxDepth: 4
}, {
	rfc_a489eacb088b-impurity: entropy,
	rfc_a489eacb088b-maxBins: 5,
	rfc_a489eacb088b-maxDepth: 4
}, {
	rfc_a489eacb088b-impurity: gini,
	rfc_a489eacb088b-maxBins: 3,
	rfc_a489eacb088b-maxDepth: 4
}, {
	rfc_a489eacb088b-impurity: gini,
	rfc_a489eacb088b-maxBins: 5,
	rfc_a489eacb088b-maxDepth: 4
}, {
	rfc_a489eacb088b-impurity: entropy,
	rfc_a489eacb088b-maxBins: 3,
	rfc_a489eacb088b-maxDepth: 5
}, {
	rfc_a489eacb088b-impurity: entropy,
	rfc_a489eacb088b-maxBins: 5,
	rfc_a489eacb088b-maxDepth: 5
}, {
	rfc_a489eacb088b-impurity: gini,
	rfc_a489eacb088b-maxBins: 3,
	rfc_a489eacb088b-maxDepth: 5
}, {
	rfc_a489eacb088b-impurity: g...

In [15]:
// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
// This will allow us to jointly choose parameters for all Pipeline stages.
// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
// Note that the evaluator here is a BinaryClassificationEvaluator and its default metric
// is areaUnderROC.

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(5)  // Use 3+ in practice
  .setParallelism(2)  // Evaluate up to 2 parameter settings in parallel

cv: org.apache.spark.ml.tuning.CrossValidator = cv_d77c502e3b99


In [16]:
// Run cross-validation, and choose the best set of parameters.
val model = cv.fit(training)

2020-06-07 00:18:54,768 WARN  [CrossValidator-thread-pool-1] netlib.LAPACK (LAPACK.java:<clinit>(61)) - Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
2020-06-07 00:18:54,769 WARN  [CrossValidator-thread-pool-1] netlib.LAPACK (LAPACK.java:<clinit>(61)) - Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK


model: org.apache.spark.ml.tuning.CrossValidatorModel = cv_d77c502e3b99


In [17]:
// Print the coefficients and intercept for logistic regression
//println(s"*******************************************\nCoefficients: ${model.coefficients} Intercept: ${model.intercept}")
// Since model is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this LogisticRegression instance.

println(s"ParamMap: ${model.parent.extractParamMap}")

//Get Coeffs of the Best Logistic Regression Model
val bestModel = model.bestModel match {
  case pm: PipelineModel => Some(pm)
  case _ => None
}

val ml = bestModel
  .map(_.stages.collect { case ml: RandomForestClassifier => ml })
  .flatMap(_.headOption)

// Get best CV Model Parameters
print("Max Bins: ")
println(ml.map(m => (m.maxBins)).get)
print("Max Depth: ")
println(ml.map(m => (m.maxDepth)).get)
print("Impurity: ")
println(ml.map(m => m.impurity).get)

ParamMap: {
	cv_d77c502e3b99-collectSubModels: false,
	cv_d77c502e3b99-estimator: pipeline_d20db20c2824,
	cv_d77c502e3b99-estimatorParamMaps: [Lorg.apache.spark.ml.param.ParamMap;@37c3165c,
	cv_d77c502e3b99-evaluator: binEval_1b45d4d593ea,
	cv_d77c502e3b99-numFolds: 5,
	cv_d77c502e3b99-parallelism: 2,
	cv_d77c502e3b99-seed: -1191137437
}
Max Bins: 

<console>: 61: warning: fruitless type test: a value of type org.apache.spark.ml.Transformer cannot also be a org.apache.spark.ml.classification.RandomForestClassifier

In [18]:
// Get Results on Test Set
val results = model.transform(testing)

// Take a look at the predictions
results.select ("features", "probability", "prediction", "label").show(10)

// Measure the quality of the predictions
getConfusionMatrix(results)

+--------------------+--------------------+----------+-----+
|            features|         probability|prediction|label|
+--------------------+--------------------+----------+-----+
|[228.390185669663...|[0.45034270041792...|       1.0|    1|
|[228.390790219345...|[0.45034270041792...|       1.0|    1|
|[228.390790219345...|[0.45034270041792...|       1.0|    1|
|[228.390790219345...|[0.45034270041792...|       1.0|    1|
|[228.390790219345...|[0.45034270041792...|       1.0|    1|
|[213.818565039766...|[0.45731710200474...|       1.0|    1|
|[213.819169589448...|[0.45731710200474...|       1.0|    1|
|[213.819169589448...|[0.45731710200474...|       1.0|    1|
|[214.837604819055...|[0.46170091161084...|       1.0|    1|
|[214.837604819055...|[0.46170091161084...|       1.0|    1|
+--------------------+--------------------+----------+-----+
only showing top 10 rows

Accuracy: 0.42503046607344047
Weighted Precision: 0.6878150962030158
Weighted Recall: 0.4250304660734405
F1: 0.456710822

results: org.apache.spark.sql.DataFrame = [Departing_Port: string, Arriving_Port: string ... 83 more fields]


## Store The Best CV Trained Logistic Model for Reuse

In [19]:
//Persist the Model

model
    .write
    .overwrite()
    .save("./flightDelayModel/")

val results: DataFrame = CrossValidatorModel
.load("./flightDelayModel/")
.transform(testing)
.select(
    col("features"),
    col("label"),
    col("prediction")
)

//results.show()

results: org.apache.spark.sql.DataFrame = [features: vector, label: int ... 1 more field]
