IDS 561 Group 24 Project Code

ML Model: Random Forest

#Spark Data Frame Transformations and Actions

In [None]:
# Install a headless Java 8 JDK via apt; -qq plus the redirect to /dev/null keeps the cell output empty.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
# Download the Spark 3.0.0 (Hadoop 3.2 build) tarball from the Apache archive into the working directory.
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

In [None]:
# Unpack the downloaded Spark tarball into ./spark-3.0.0-bin-hadoop3.2
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

In [None]:
# List the working directory to confirm the tarball and the extracted Spark folder are present.
!ls

sample_data  spark-3.0.0-bin-hadoop3.2	spark-3.0.0-bin-hadoop3.2.tgz


In [None]:
# Install findspark (quiet mode); it is used below via findspark.init().
# NOTE(review): the version is not pinned — consider pinning for reproducible runs.
!pip install -q findspark

In [None]:
import os

# Export the Java and Spark install locations as environment variables
# before Spark is initialised in the next cell.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.0-bin-hadoop3.2",
})

In [None]:
import findspark

# Keep this order: findspark.init() runs before pyspark is imported.
findspark.init()
from pyspark.sql import SparkSession

# Create (or reuse) a local Spark session; "local[*]" is the master URL.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [None]:
# Mount Google Drive at /content/drive so the project datasets can be read by path.
# (Colab-only: google.colab is not available outside a Colab runtime.)
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Change the working directory to the project folder on the mounted Drive,
# then list its contents to confirm the Datasets folder is visible.
%cd /content/drive/My Drive/Big Data Project
!ls 

/content/drive/My Drive/Big Data Project
Datasets


#1. Creating the Environment

In [None]:
# For SQL-type queries (Spark)
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import udf

# For regression and other possible ML tools (Spark)
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.param import Param, Params
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.stat import Statistics

In [None]:
# Important for managing features  (Spark)
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.feature import VectorAssembler

# For displaying and other related IPython tools...
from IPython.display import display
from IPython.html.widgets import interact



In [None]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.sql.functions import isnan, when, count, col

In [None]:
# Typical Python tools
import sys
import numpy as np
import pandas as pd
import time
import datetime
import matplotlib.pyplot as plt
import os.path

#2. Load the data

In [None]:
# Read the three yearly CSV extracts (2019, 2020, 2021). Only the header
# option is set; no schema is supplied, so column types are cast later on.
path_2019 = '/content/drive/My Drive/Big Data Project/Datasets/data2019-001.csv'
path_2020 = '/content/drive/My Drive/Big Data Project/Datasets/data2020.csv'
path_2021 = '/content/drive/My Drive/Big Data Project/Datasets/data2021.csv'

df1 = spark.read.csv(path_2019, header='True')
df2 = spark.read.csv(path_2020, header='True')
df3 = spark.read.csv(path_2021, header='True')

In [None]:
# Show the (column name, type) pairs of the 2021 frame as loaded.
df3.dtypes

# Data Cleaning for Datasets

In [None]:
# Report the size of the 2019 frame before any cleaning.
# Note: count() triggers a full pass over the data.
n_rows = df1.count()
n_cols = len(df1.columns)

print(f"The shape is {n_rows:d} rows by {n_cols:d} columns.")

The shape is 7422037 rows by 110 columns.


In [None]:
# Count, per column of the 2019 frame, the entries that are NaN or NULL,
# then collect the single result row to the driver as a dict.
null_exprs_2019 = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in df1.columns
]
null_counts = df1.select(null_exprs_2019).toPandas().to_dict(orient='records')

# Sum the per-column counts into one total for the whole dataset.
total_nulls_2019 = sum(null_counts[0].values())
print(f"We have {total_nulls_2019:d} null values in this dataset.")

We have 395605008 null values in this dataset.


  df[column_name] = series


In [None]:
# Count, per column of the 2020 frame, the entries that are NaN or NULL.
# This only measures missing values; nothing is dropped from df2 here.
null_exprs_2020 = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in df2.columns
]
null_counts2 = df2.select(null_exprs_2020).toPandas().to_dict(orient='records')

# Sum the per-column counts into one total for the whole dataset.
total_nulls_2020 = sum(null_counts2[0].values())
print(f"We have {total_nulls_2020:d} null values in this dataset.")

In [None]:
# Count, per column of the 2021 frame, the entries that are NaN or NULL.
# This only measures missing values; nothing is dropped from df3 here.
null_exprs_2021 = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in df3.columns
]
null_counts3 = df3.select(null_exprs_2021).toPandas().to_dict(orient='records')

# Sum the per-column counts into one total for the whole dataset.
total_nulls_2021 = sum(null_counts3[0].values())
print(f"We have {total_nulls_2021:d} null values in this dataset.")

#Changing Data types

In [None]:
from pyspark.sql.types import (
    StringType, BooleanType, IntegerType, FloatType, DateType
)

In [None]:
# Cast the columns that feed the feature vector to integers (2019 frame).
# Each column is replaced in place under its original name.
for _int_col in ("YEAR", "MONTH", "DAY_OF_MONTH", "DAY_OF_WEEK", "CRS_DEP_TIME",
                 "CRS_ARR_TIME", "FLIGHTS", "DISTANCE", "DIVERTED"):
    df1 = df1.withColumn(_int_col, df1[_int_col].cast(IntegerType()))

#3. Running our classifier and models (FOR 2019 only)

---



In [None]:
# Show the (column name, type) pairs of the 2019 frame after the integer casts.
df1.dtypes

In [None]:
# Columns assembled into the 2019 feature vector (all cast to integer above).
selected_cols = [
    'YEAR',
    'MONTH',
    'DAY_OF_MONTH',
    'DAY_OF_WEEK',
    'CRS_DEP_TIME',
    'CRS_ARR_TIME',
    'FLIGHTS',
    'DISTANCE',
    'DIVERTED',
]

In [None]:
# Pack the selected columns into a single vector column named "features".
# df1 is rebound to the frame that carries the extra column.
assembler_2019 = VectorAssembler(inputCols=selected_cols, outputCol="features")
df1 = assembler_2019.transform(df1)

In [None]:
# Preview the first five rows of the cancellation column next to the feature vector.
df1.select(["Cancelled", "features"]).show(n=5)

+---------+--------------------+
|Cancelled|            features|
+---------+--------------------+
|      0.0|[2019.0,1.0,16.0,...|
|      0.0|[2019.0,1.0,17.0,...|
|      0.0|[2019.0,1.0,18.0,...|
|      0.0|[2019.0,1.0,19.0,...|
|      1.0|[2019.0,1.0,20.0,...|
+---------+--------------------+
only showing top 5 rows



# Set constants for the Random Forest

In [None]:
# Shared settings for the random-forest experiments below.
RANDOM_SEED = 141109           # seed value intended for reproducible runs
TRAINING_DATA_RATIO = 0.7      # fraction of rows given to the training split; the rest is the test split
RF_NUM_TREES = 8               # passed as numTrees to every RandomForestClassifier
# NOTE(review): RF_MAX_DEPTH and RF_NUM_BINS are not passed to any
# RandomForestClassifier in the visible cells, so the classifier's own
# defaults apply — confirm whether that is intended.
RF_MAX_DEPTH = 4
RF_NUM_BINS = 32

In [None]:
# Build the 2019 indexers (both are fitted on the full df1 frame).

# Label indexer: maps the CANCELLED column to the numeric column "indexedLabel".
cancel_label_stage = StringIndexer(inputCol="CANCELLED", outputCol="indexedLabel")
labelIndexer = cancel_label_stage.fit(df1)

# Feature indexer: re-indexes the "features" vector into "indexedFeatures",
# with maxCategories=4 as the categorical threshold.
feature_stage = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4)
featureIndexer = feature_stage.fit(df1)

In [None]:
# Label indexer for arrival delay: ARR_DEL15 -> "indexedLabel1", fitted on df1.
arr_label_stage = StringIndexer(inputCol="ARR_DEL15", outputCol="indexedLabel1")
labelIndexer1 = arr_label_stage.fit(df1)
# Rows with an invalid label value are skipped rather than raising at transform time.
labelIndexer1.setHandleInvalid('skip')

StringIndexerModel: uid=StringIndexer_e8610144a729, handleInvalid=error

In [None]:
# Label indexer for departure delay: DEP_DEL15 -> "indexedLabel2", fitted on df1.
dep_label_stage = StringIndexer(inputCol="DEP_DEL15", outputCol="indexedLabel2")
labelIndexer2 = dep_label_stage.fit(df1)
# Rows with an invalid label value are skipped rather than raising at transform time.
labelIndexer2.setHandleInvalid('skip')

StringIndexerModel: uid=StringIndexer_e65e7e1d8ab5, handleInvalid=error

# Training and evaluating the model with "Cancelled flight" as the label




In [None]:
# Split the 2019 data into training and test sets. The split is seeded with
# RANDOM_SEED so the reported accuracy is reproducible across re-runs.
(trainingData, testData) = df1.randomSplit(
    [TRAINING_DATA_RATIO, 1 - TRAINING_DATA_RATIO], seed=RANDOM_SEED)

# Configure the random forest (training happens when the pipeline is fitted).
# The classifier is seeded as well, since tree building is randomised.
df1_rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures",
                                numTrees=RF_NUM_TREES, seed=RANDOM_SEED)

# Chain the label indexer, feature indexer and forest into one Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, df1_rf])

In [None]:
# Fit the 2019 cancellation pipeline on its training split.

model = pipeline.fit(trainingData)

In [None]:
# Apply the fitted 2019 cancellation model to the held-out test split.

predictions = model.transform(testData)

In [None]:
# Score the test predictions: compare "prediction" with "indexedLabel"
# using the accuracy metric; test error is its complement.
# NOTE(review): accuracy alone can look optimistic if the label classes are
# imbalanced — consider reporting an additional metric.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="indexedLabel",
)
accuracy = evaluator.evaluate(predictions)
test_error = 1.0 - accuracy

print(f"Test Error = {test_error:g}")
print(f"Accuracy = {accuracy:g}")

Test Error = 0.0181524
Accuracy = 0.981848


# Training and evaluating the model with "Arrival Delay" as the label

In [None]:
# Split the 2019 data into training and test sets. The split is seeded with
# RANDOM_SEED so the reported accuracy is reproducible across re-runs.
(trainingData1, testData1) = df1.randomSplit(
    [TRAINING_DATA_RATIO, 1 - TRAINING_DATA_RATIO], seed=RANDOM_SEED)

# Configure the random forest (training happens when the pipeline is fitted).
# The classifier is seeded as well, since tree building is randomised.
df1_rf1 = RandomForestClassifier(labelCol="indexedLabel1", featuresCol="indexedFeatures",
                                 numTrees=RF_NUM_TREES, seed=RANDOM_SEED)

# Chain the arrival-delay label indexer, feature indexer and forest into one Pipeline
pipeline1 = Pipeline(stages=[labelIndexer1, featureIndexer, df1_rf1])

In [None]:
# Fit the 2019 arrival-delay pipeline on its training split.

model1 = pipeline1.fit(trainingData1)

In [None]:
# Apply the fitted 2019 arrival-delay model to the held-out test split.

predictions1 = model1.transform(testData1)

In [None]:
# Score the test predictions: compare "prediction" with "indexedLabel1"
# using the accuracy metric; test error is its complement.
evaluator1 = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="indexedLabel1",
)
accuracy = evaluator1.evaluate(predictions1)
test_error = 1.0 - accuracy

print(f"Test Error = {test_error:g}")
print(f"Accuracy = {accuracy:g}")

Test Error = 0.191325
Accuracy = 0.808675


# Training and evaluating the model with "Departure Delay" as the label

In [None]:
# Split the 2019 data into training and test sets. The split is seeded with
# RANDOM_SEED so the reported accuracy is reproducible across re-runs.
(trainingData2, testData2) = df1.randomSplit(
    [TRAINING_DATA_RATIO, 1 - TRAINING_DATA_RATIO], seed=RANDOM_SEED)

# Configure the random forest (training happens when the pipeline is fitted).
# The classifier is seeded as well, since tree building is randomised.
df1_rf2 = RandomForestClassifier(labelCol="indexedLabel2", featuresCol="indexedFeatures",
                                 numTrees=RF_NUM_TREES, seed=RANDOM_SEED)

# Chain the departure-delay label indexer, feature indexer and forest into one Pipeline
pipeline2 = Pipeline(stages=[labelIndexer2, featureIndexer, df1_rf2])

In [None]:
# Fit the 2019 departure-delay pipeline on its training split.

model2 = pipeline2.fit(trainingData2)

In [None]:
# Apply the fitted 2019 departure-delay model to the held-out test split.

predictions2 = model2.transform(testData2)

In [None]:
# Score the test predictions: compare "prediction" with "indexedLabel2"
# using the accuracy metric; test error is its complement.
evaluator2 = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="indexedLabel2",
)
accuracy = evaluator2.evaluate(predictions2)
test_error = 1.0 - accuracy

print(f"Test Error = {test_error:g}")
print(f"Accuracy = {accuracy:g}")

Test Error = 0.187006
Accuracy = 0.812994


#4. Running Our Classifier Model (For 2020 only)

In [None]:
# Show the (column name, type) pairs of the 2020 frame before the integer casts below.
df2.dtypes

In [None]:
# Cast the columns that feed the feature vector from string to integer (2020 frame).
# Each column is replaced in place under its original name.
for _int_col in ("YEAR", "MONTH", "DAY_OF_MONTH", "DAY_OF_WEEK", "CRS_DEP_TIME",
                 "CRS_ARR_TIME", "FLIGHTS", "DISTANCE", "DIVERTED"):
    df2 = df2.withColumn(_int_col, df2[_int_col].cast(IntegerType()))

In [None]:
# Columns assembled into the 2020 feature vector (all cast to integer above).
selected_cols1 = [
    'YEAR',
    'MONTH',
    'DAY_OF_MONTH',
    'DAY_OF_WEEK',
    'CRS_DEP_TIME',
    'CRS_ARR_TIME',
    'FLIGHTS',
    'DISTANCE',
    'DIVERTED',
]

In [None]:
# Pack the selected columns into a single vector column named "features1".
# df2 is rebound to the frame that carries the extra column.
assembler_2020 = VectorAssembler(inputCols=selected_cols1, outputCol="features1")
df2 = assembler_2020.transform(df2)

In [None]:
# Preview the first five rows of the departure-delay column next to the feature vector.
df2.select(["DEP_DEL15", "features1"]).show(n=5)

+---------+--------------------+
|DEP_DEL15|           features1|
+---------+--------------------+
|      1.0|[2020.0,1.0,1.0,3...|
|      0.0|[2020.0,1.0,1.0,3...|
|      0.0|[2020.0,1.0,1.0,3...|
|      0.0|[2020.0,1.0,1.0,3...|
|      0.0|[2020.0,1.0,1.0,3...|
+---------+--------------------+
only showing top 5 rows



#Perform and Evaluate the Random Forest Model

In [None]:
# Shared settings for the random-forest experiments below.
RANDOM_SEED = 141109           # seed value intended for reproducible runs
TRAINING_DATA_RATIO = 0.7      # fraction of rows given to the training split; the rest is the test split
RF_NUM_TREES = 8               # passed as numTrees to every RandomForestClassifier
# NOTE(review): RF_MAX_DEPTH and RF_NUM_BINS are not passed to any
# RandomForestClassifier in the visible cells, so the classifier's own
# defaults apply — confirm whether that is intended.
RF_MAX_DEPTH = 4
RF_NUM_BINS = 32

In [None]:
# Build the 2020 indexers (both are fitted on the full df2 frame).

# Label indexer for departure delay: DEP_DEL15 -> "indexedDel".
dep_label_stage_2020 = StringIndexer(inputCol="DEP_DEL15", outputCol="indexedDel")
DelIndexer = dep_label_stage_2020.fit(df2)
# Rows with an invalid label value are skipped rather than raising at transform time.
DelIndexer.setHandleInvalid('skip')

# Feature indexer: re-indexes the "features1" vector into "indexedFeatures1",
# with maxCategories=4 as the categorical threshold.
feature_stage_2020 = VectorIndexer(inputCol="features1", outputCol="indexedFeatures1", maxCategories=4)
featureIndexer1 = feature_stage_2020.fit(df2)

In [None]:
# Label indexer for arrival delay: ARR_DEL15 -> "indexedDel1", fitted on df2.
arr_label_stage_2020 = StringIndexer(inputCol="ARR_DEL15", outputCol="indexedDel1")
ArrIndexer = arr_label_stage_2020.fit(df2)
# Rows with an invalid label value are skipped rather than raising at transform time.
ArrIndexer.setHandleInvalid('skip')

StringIndexerModel: uid=StringIndexer_2ed5186a08e6, handleInvalid=error

In [None]:
# Label indexer for cancellations: CANCELLED -> "indexedDel2", fitted on df2.
cancel_label_stage_2020 = StringIndexer(inputCol="CANCELLED", outputCol="indexedDel2")
CancelIndexer = cancel_label_stage_2020.fit(df2)

# Training and evaluating the model with "Departure Delay" as the label

In [None]:
# Split the 2020 data into training and test sets. The split is seeded with
# RANDOM_SEED so the reported accuracy is reproducible across re-runs.
(trainingData3, testData3) = df2.randomSplit(
    [TRAINING_DATA_RATIO, 1 - TRAINING_DATA_RATIO], seed=RANDOM_SEED)

# Configure the random forest (training happens when the pipeline is fitted).
# The classifier is seeded as well, since tree building is randomised.
df2_rf = RandomForestClassifier(labelCol="indexedDel", featuresCol="indexedFeatures1",
                                numTrees=RF_NUM_TREES, seed=RANDOM_SEED)

# Chain the departure-delay label indexer, feature indexer and forest into one Pipeline
pipeline3 = Pipeline(stages=[DelIndexer, featureIndexer1, df2_rf])

In [None]:
# Fit the 2020 departure-delay pipeline on its own training split.
# trainingData3 is the split produced together with testData3 from df2;
# fitting on trainingData1 (a split of the 2019 frame) would train this
# 2020 model on the wrong year's data.

model3 = pipeline3.fit(trainingData3)

In [None]:
# Apply the fitted 2020 departure-delay model to the held-out test split.

predictions3 = model3.transform(testData3)

In [None]:
# Score the test predictions: compare "prediction" with "indexedDel"
# using the accuracy metric; test error is its complement.
evaluator3 = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="indexedDel",
)
accuracy = evaluator3.evaluate(predictions3)
test_error = 1.0 - accuracy

print(f"Test Error = {test_error:g}")
print(f"Accuracy = {accuracy:g}")

Test Error = 0.0907222
Accuracy = 0.909278


# Training and evaluating the model with "Arrival Delay" as the label

In [None]:
# Split the 2020 data into training and test sets. The split is seeded with
# RANDOM_SEED so the reported accuracy is reproducible across re-runs.
(trainingData4, testData4) = df2.randomSplit(
    [TRAINING_DATA_RATIO, 1 - TRAINING_DATA_RATIO], seed=RANDOM_SEED)

# Configure the random forest (training happens when the pipeline is fitted).
# The classifier is seeded as well, since tree building is randomised.
df2_rf1 = RandomForestClassifier(labelCol="indexedDel1", featuresCol="indexedFeatures1",
                                 numTrees=RF_NUM_TREES, seed=RANDOM_SEED)

# Chain the arrival-delay label indexer, feature indexer and forest into one Pipeline
pipeline4 = Pipeline(stages=[ArrIndexer, featureIndexer1, df2_rf1])

In [None]:
# Fit the 2020 arrival-delay pipeline on its training split.

model4 = pipeline4.fit(trainingData4)

In [None]:
# Apply the fitted 2020 arrival-delay model to the held-out test split.

predictions4 = model4.transform(testData4)

In [None]:
# Score the test predictions: compare "prediction" with "indexedDel1"
# using the accuracy metric; test error is its complement.
evaluator4 = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="indexedDel1",
)
accuracy = evaluator4.evaluate(predictions4)
test_error = 1.0 - accuracy

print(f"Test Error = {test_error:g}")
print(f"Accuracy = {accuracy:g}")

Test Error = 0.0980773
Accuracy = 0.901923


# Training and evaluating the model with "Cancelled" as the label

In [None]:
# Split the 2020 data into training and test sets. The split is seeded with
# RANDOM_SEED so the reported accuracy is reproducible across re-runs.
(trainingData5, testData5) = df2.randomSplit(
    [TRAINING_DATA_RATIO, 1 - TRAINING_DATA_RATIO], seed=RANDOM_SEED)

# Configure the random forest (training happens when the pipeline is fitted).
# The classifier is seeded as well, since tree building is randomised.
df2_rf2 = RandomForestClassifier(labelCol="indexedDel2", featuresCol="indexedFeatures1",
                                 numTrees=RF_NUM_TREES, seed=RANDOM_SEED)

# Chain the cancellation label indexer, feature indexer and forest into one Pipeline
pipeline5 = Pipeline(stages=[CancelIndexer, featureIndexer1, df2_rf2])

In [None]:
# Fit the 2020 cancellation pipeline on its training split.

model5 = pipeline5.fit(trainingData5)

In [None]:
# Apply the fitted 2020 cancellation model to the held-out test split.

predictions5 = model5.transform(testData5)

In [None]:
# Score the test predictions: compare "prediction" with "indexedDel2"
# using the accuracy metric; test error is its complement.
evaluator5 = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="indexedDel2",
)
accuracy = evaluator5.evaluate(predictions5)
test_error = 1.0 - accuracy

print(f"Test Error = {test_error:g}")
print(f"Accuracy = {accuracy:g}")

Test Error = 0.0601021
Accuracy = 0.939898


#5. Running Our Classifier Model (For 2021 only)

In [None]:
# Cast the columns that feed the feature vector from string to integer (2021 frame).
# Each column is replaced in place under its original name.
for _int_col in ("YEAR", "MONTH", "DAY_OF_MONTH", "DAY_OF_WEEK", "CRS_DEP_TIME",
                 "CRS_ARR_TIME", "FLIGHTS", "DISTANCE", "DIVERTED"):
    df3 = df3.withColumn(_int_col, df3[_int_col].cast(IntegerType()))

In [None]:
# Columns assembled into the 2021 feature vector (all cast to integer above).
selected_cols2 = [
    'YEAR',
    'MONTH',
    'DAY_OF_MONTH',
    'DAY_OF_WEEK',
    'CRS_DEP_TIME',
    'CRS_ARR_TIME',
    'FLIGHTS',
    'DISTANCE',
    'DIVERTED',
]

In [None]:
# Pack the selected columns into a single vector column named "features2".
# df3 is rebound to the frame that carries the extra column.
assembler_2021 = VectorAssembler(inputCols=selected_cols2, outputCol="features2")
df3 = assembler_2021.transform(df3)

In [None]:
# Preview the first five rows of the arrival-delay column next to the feature vector.
df3.select(["ARR_DEL15", "features2"]).show(n=5)

+---------+--------------------+
|ARR_DEL15|           features2|
+---------+--------------------+
|      0.0|[2021.0,1.0,5.0,2...|
|      0.0|[2021.0,1.0,5.0,2...|
|      0.0|[2021.0,1.0,5.0,2...|
|      0.0|[2021.0,1.0,5.0,2...|
|      0.0|[2021.0,1.0,5.0,2...|
+---------+--------------------+
only showing top 5 rows



#Perform and Evaluate the Random Forest Model

In [None]:
# Build the 2021 indexers (both are fitted on the full df3 frame).

# Label indexer for arrival delay: ARR_DEL15 -> "indexedArr".
arr_label_stage_2021 = StringIndexer(inputCol="ARR_DEL15", outputCol="indexedArr")
ArrIndexer1 = arr_label_stage_2021.fit(df3)
# Rows with an invalid label value are skipped rather than raising at transform time.
ArrIndexer1.setHandleInvalid('skip')

# Feature indexer: re-indexes the "features2" vector into "indexedFeatures2",
# with maxCategories=4 as the categorical threshold.
feature_stage_2021 = VectorIndexer(inputCol="features2", outputCol="indexedFeatures2", maxCategories=4)
featureIndexer2 = feature_stage_2021.fit(df3)

In [None]:
# Label indexer for departure delay: DEP_DEL15 -> "indexedArr1", fitted on df3.
dep_label_stage_2021 = StringIndexer(inputCol="DEP_DEL15", outputCol="indexedArr1")
DelIndexer1 = dep_label_stage_2021.fit(df3)
# Rows with an invalid label value are skipped rather than raising at transform time.
DelIndexer1.setHandleInvalid('skip')

StringIndexerModel: uid=StringIndexer_72f30991854f, handleInvalid=error

In [None]:
# Label indexer for cancellations: CANCELLED -> "indexedArr2", fitted on df3.
cancel_label_stage_2021 = StringIndexer(inputCol="CANCELLED", outputCol="indexedArr2")
CancelIndexer1 = cancel_label_stage_2021.fit(df3)
# Rows with an invalid label value are skipped rather than raising at transform time.
CancelIndexer1.setHandleInvalid('skip')

StringIndexerModel: uid=StringIndexer_d43c80d3c11a, handleInvalid=error

# Training and evaluating the model with "Arrival Delay" as the label

In [None]:
# Split the 2021 data into training and test sets. The split is seeded with
# RANDOM_SEED so the reported accuracy is reproducible across re-runs.
(trainingData6, testData6) = df3.randomSplit(
    [TRAINING_DATA_RATIO, 1 - TRAINING_DATA_RATIO], seed=RANDOM_SEED)

# Configure the random forest (training happens when the pipeline is fitted).
# The classifier is seeded as well, since tree building is randomised.
df3_rf = RandomForestClassifier(labelCol="indexedArr", featuresCol="indexedFeatures2",
                                numTrees=RF_NUM_TREES, seed=RANDOM_SEED)

# Chain the arrival-delay label indexer, feature indexer and forest into one Pipeline
pipeline6 = Pipeline(stages=[ArrIndexer1, featureIndexer2, df3_rf])

In [None]:
# Fit the 2021 arrival-delay pipeline on its training split.

model6 = pipeline6.fit(trainingData6)

In [None]:
# Apply the fitted 2021 arrival-delay model to the held-out test split.

predictions6 = model6.transform(testData6)

In [None]:
# Score the test predictions: compare "prediction" with "indexedArr"
# using the accuracy metric; test error is its complement.
evaluator6 = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="indexedArr",
)
accuracy = evaluator6.evaluate(predictions6)
test_error = 1.0 - accuracy

print(f"Test Error = {test_error:g}")
print(f"Accuracy = {accuracy:g}")

Test Error = 0.17194
Accuracy = 0.82806


# Training and evaluating the model with "Departure Delay" as the label

In [None]:
# Split the 2021 data into training and test sets. The split is seeded with
# RANDOM_SEED so the reported accuracy is reproducible across re-runs.
(trainingData7, testData7) = df3.randomSplit(
    [TRAINING_DATA_RATIO, 1 - TRAINING_DATA_RATIO], seed=RANDOM_SEED)

# Configure the random forest (training happens when the pipeline is fitted).
# The classifier is seeded as well, since tree building is randomised.
df3_rf1 = RandomForestClassifier(labelCol="indexedArr1", featuresCol="indexedFeatures2",
                                 numTrees=RF_NUM_TREES, seed=RANDOM_SEED)

# Chain the departure-delay label indexer, feature indexer and forest into one Pipeline
pipeline7 = Pipeline(stages=[DelIndexer1, featureIndexer2, df3_rf1])

In [None]:
# Fit the 2021 departure-delay pipeline on its training split.

model7 = pipeline7.fit(trainingData7)

In [None]:
# Apply the fitted 2021 departure-delay model to the held-out test split.

predictions7 = model7.transform(testData7)

In [None]:
# Score the test predictions: compare "prediction" with "indexedArr1"
# using the accuracy metric; test error is its complement.
evaluator7 = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="indexedArr1",
)
accuracy = evaluator7.evaluate(predictions7)
test_error = 1.0 - accuracy

print(f"Test Error = {test_error:g}")
print(f"Accuracy = {accuracy:g}")

Test Error = 0.173875
Accuracy = 0.826125


# Training and evaluating the model with "Cancelled" as the label

In [None]:
# Split the 2021 data into training and test sets. The split is seeded with
# RANDOM_SEED so the reported accuracy is reproducible across re-runs.
(trainingData8, testData8) = df3.randomSplit(
    [TRAINING_DATA_RATIO, 1 - TRAINING_DATA_RATIO], seed=RANDOM_SEED)

# Configure the random forest (training happens when the pipeline is fitted).
# Named df3_rf2 so it no longer overwrites df3_rf1, the departure-delay
# classifier defined in the previous section.
df3_rf2 = RandomForestClassifier(labelCol="indexedArr2", featuresCol="indexedFeatures2",
                                 numTrees=RF_NUM_TREES, seed=RANDOM_SEED)

# Chain the cancellation label indexer, feature indexer and forest into one Pipeline
pipeline8 = Pipeline(stages=[CancelIndexer1, featureIndexer2, df3_rf2])

In [None]:
# Fit the 2021 cancellation pipeline on its training split.

model8 = pipeline8.fit(trainingData8)

In [None]:
# Apply the fitted 2021 cancellation model to the held-out test split.

predictions8 = model8.transform(testData8)

In [None]:
# Score the test predictions: compare "prediction" with "indexedArr2"
# using the accuracy metric; test error is its complement.
# NOTE(review): accuracy alone can look optimistic if the label classes are
# imbalanced — consider reporting an additional metric.
evaluator8 = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="indexedArr2",
)
accuracy = evaluator8.evaluate(predictions8)
test_error = 1.0 - accuracy

print(f"Test Error = {test_error:g}")
print(f"Accuracy = {accuracy:g}")

Test Error = 0.0172961
Accuracy = 0.982704
