Setting Up the PySpark Environment

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 40 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 64.6 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=633f1230e3f9cf776ecc24804b797ecc66305e389274bf2a71b7f619e8573704
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


Connecting Google Drive to Colab

In [None]:
# Mount Google Drive so the project files under MyDrive are reachable from the VM.
from google.colab import drive

DRIVE_MOUNT_POINT = "/content/drive"
drive.mount(DRIVE_MOUNT_POINT)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
%cd /content/drive/MyDrive/IDS 561 Big Data Analytics Project

/content/drive/MyDrive/IDS 561 Big Data Analytics Project


Importing the Required Libraries

In [None]:
import numpy as np
import pandas as pd 
import os
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import round
from pyspark.ml.classification import LogisticRegression 
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.sql.functions import isnan, when, count, col

Creating a Local Spark Session

In [None]:
# Local-mode Spark session; 'local[*]' runs one worker thread per available core.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('group7')
    .getOrCreate()
)

Importing the CSV from Google Drive

In [None]:
# Location of the raw flight dataset on the mounted Drive.
DATA_PATH = '/content/drive/MyDrive/IDS 561 Big Data Analytics Project/On_Time_Prediction_Dataset.csv'

# Read the CSV with a header row and inferred column types.
# `header`/`inferSchema` take real booleans (the original passed the string 'True'),
# and the literal text 'NA' is loaded as null so dropna() can remove it later.
df = spark.read.csv(
    DATA_PATH,
    header=True,
    sep=',',
    inferSchema=True,
    nullValue='NA',
)

Removing Unnecessary Columns and Records with Missing Values

In [None]:

# Drop the 'flight' column (it is not among the model features) and discard
# every row that still contains a null in any remaining column.
df = df.drop('flight').dropna()

print(df.count())  # number of rows kept after cleaning

258289


Creating the Label (1 when Delay >= 10, else 0) and Converting Distance to Kilometres

In [None]:
# Alias kept for compatibility with any later reference to `df1`.
df1 = df

# Miles -> kilometres conversion factor for the Distance column.
MILES_TO_KM = 1.60934

# `round` here is pyspark.sql.functions.round (imported in the imports cell),
# which shadows Python's built-in round and operates on Spark columns.
flight_df = (
    df1
    .withColumn('Distance_km', round(df1['Distance'] * MILES_TO_KM, 0))
    .drop('Distance')
)

# Binary target: 1 when Delay is at least 10, otherwise 0.
# NOTE(review): Delay is presumably in minutes — confirm against the data dictionary.
flight_df = flight_df.withColumn('label', (flight_df['Delay'] >= 10).cast('integer'))

flight_df.show(5)

+-----+----+-----------+----------+------+------+---------------+-----+-----------+-----+
|Month|Date|Day of Week|Airline_ID|Origin|depart|Flight_Duration|Delay|Distance_km|label|
+-----+----+-----------+----------+------+------+---------------+-----+-----------+-----+
|   10|  10|          1|        OO|   ORD|  8.18|             51|   27|      253.0|    1|
|   11|  22|          1|        OO|   ORD|  7.17|            127|  -19|     1188.0|    0|
|    2|  14|          5|        B6|   JFK| 21.17|            365|   60|     3618.0|    1|
|    5|  25|          3|        WN|   SJC| 12.92|             85|   22|      621.0|    1|
|    3|  28|          1|        B6|   LGA| 13.33|            182|   70|     1732.0|    1|
+-----+----+-----------+----------+------+------+---------------+-----+-----------+-----+
only showing top 5 rows



Using StringIndexer to create columns with numerical index values

In [None]:
# Encode each categorical string column as a numeric index, fitting each indexer on
# the frame produced by the previous step.
# Note: despite its name, `indexer` ends up holding the transformed DataFrame.
indexer = flight_df
for source_col, index_col in (('Airline_ID', 'airline_idx'), ('Origin', 'origin_idx')):
    indexer = StringIndexer(inputCol=source_col, outputCol=index_col).fit(indexer).transform(indexer)

indexer.show(5)

+-----+----+-----------+----------+------+------+---------------+-----+-----------+-----+-----------+----------+
|Month|Date|Day of Week|Airline_ID|Origin|depart|Flight_Duration|Delay|Distance_km|label|airline_idx|origin_idx|
+-----+----+-----------+----------+------+------+---------------+-----+-----------+-----+-----------+----------+
|   10|  10|          1|        OO|   ORD|  8.18|             51|   27|      253.0|    1|        2.0|       0.0|
|   11|  22|          1|        OO|   ORD|  7.17|            127|  -19|     1188.0|    0|        2.0|       0.0|
|    2|  14|          5|        B6|   JFK| 21.17|            365|   60|     3618.0|    1|        4.0|       2.0|
|    5|  25|          3|        WN|   SJC| 12.92|             85|   22|      621.0|    1|        3.0|       5.0|
|    3|  28|          1|        B6|   LGA| 13.33|            182|   70|     1732.0|    1|        4.0|       3.0|
+-----+----+-----------+----------+------+------+---------------+-----+-----------+-----+-------

Creating the Vector Assembler

In [None]:

# Columns packed into the single 'features' vector that the models are trained on.
# NOTE(review): airline_idx / origin_idx are StringIndexer codes, so linear models
# (and Naive Bayes) treat them as ordered magnitudes; one-hot encoding may fit better.
FEATURE_COLS = [
    'Month', 'Date', 'Day of Week',
    'airline_idx', 'origin_idx',
    'Distance_km', 'depart', 'Flight_Duration',
]

assembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol='features')
flight_assembler = assembler.transform(indexer)

flight_assembler.select('features', 'Delay').show(5, truncate=False)

+-----------------------------------------+-----+
|features                                 |Delay|
+-----------------------------------------+-----+
|[10.0,10.0,1.0,2.0,0.0,253.0,8.18,51.0]  |27   |
|[11.0,22.0,1.0,2.0,0.0,1188.0,7.17,127.0]|-19  |
|[2.0,14.0,5.0,4.0,2.0,3618.0,21.17,365.0]|60   |
|[5.0,25.0,3.0,3.0,5.0,621.0,12.92,85.0]  |22   |
|[3.0,28.0,1.0,4.0,3.0,1732.0,13.33,182.0]|70   |
+-----------------------------------------+-----+
only showing top 5 rows



Building the ML Models and Using Them to Make Predictions

## Decision Tree

In [None]:
# Split into training and testing sets in a 80:20 ratio
train_data, test_data = flight_assembler.randomSplit([0.7, 0.3], seed=142)

# Check that training set has around 80% of records
training_ratio = train_data.count() / flight_assembler.count()
print(training_ratio)

In [None]:
# Create a DT classifier object and train the model
dt = DecisionTreeClassifier()
dt_model = dt.fit(train_data)

# Create predictions on test data
pred_dt = dt_model.transform(test_data)
pred_dt.select('label', 'prediction', 'probability').show(5, False)

+-----+----------+----------------------------------------+
|label|prediction|probability                             |
+-----+----------+----------------------------------------+
|0    |1.0       |[0.45795745628689166,0.5420425437131083]|
|0    |1.0       |[0.33855858846965636,0.6614414115303436]|
|1    |1.0       |[0.33855858846965636,0.6614414115303436]|
|0    |1.0       |[0.33855858846965636,0.6614414115303436]|
|1    |1.0       |[0.45795745628689166,0.5420425437131083]|
+-----+----------+----------------------------------------+
only showing top 5 rows



In [None]:
# Create a confusion matrix
pred_dt.groupBy('label', 'prediction').count().show()

# Calculate the True Negative, True Positive, False Positive, False Negative of the confusion matrix
TN = pred_dt.filter('prediction = 0 AND label = prediction').count()
TP = pred_dt.filter('prediction = 1 AND label = prediction').count()
FN = pred_dt.filter('prediction = 0 AND label != prediction').count()
FP = pred_dt.filter('prediction = 1 AND label != prediction').count()

print(TN)
print(TP)
print(FN)
print(FP)

# Calucating accuracy of the decision tree
accuracy = (TN + TP) / (TN + TP + FN + FP)
print(accuracy)

# Calculating precision, recall and F1-score
precision = TP / (TP + FP)
recall = TP / (TP + FN)
print('precision = {:.2f}\nrecall    = {:.2f}'.format(precision, recall))

F1 = 2 * (precision * recall) / (precision + recall)
print(F1)

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 8247|
|    0|       0.0|16084|
|    1|       1.0|33616|
|    0|       1.0|19652|
+-----+----------+-----+

16084
33616
8247
19652
0.6404721710331319
precision = 0.63
recall    = 0.80
0.7067307186931705


## Logistic Regression

In [None]:
# Train the model using logistic regression
logistic_model = LogisticRegression().fit(train_data)

# Printing the coefficients and intercept for logistic regression
print("Coefficients: %s" % str(logistic_model.coefficients))
print("Intercept: %s" % str(logistic_model.intercept))

# Summarizing the model over the training set and printing accuracy, precision and recall
train_Summary = logistic_model.summary
print("Accuracy: %s" % str(train_Summary.accuracy))
print("Precision: %s" % str(train_Summary.precisionByLabel))
print("Recall: %s" % str(train_Summary.recallByLabel))  

# Creating predictions on test data
pred_log_reg = logistic_model.transform(test_data)
pred_log_reg.groupBy('label', 'prediction').count().show()

Coefficients: [-0.05628186768554718,-0.0006786536847674437,0.0034686176921529042,-0.047332940615740254,-0.13122707872530115,-0.0006032439639722247,0.07582648553569797,0.009506467271411218]
Intercept: -0.8784838175296692
Accuracy: 0.6155681000608777
Precision: [0.602545739135571, 0.6233745540812081]
Recall: [0.4895503407236779, 0.7234790112802811]
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|11773|
|    0|       0.0|17346|
|    1|       1.0|30090|
|    0|       1.0|18390|
+-----+----------+-----+



In [None]:
# Calculate the True Negative, True Positive, False Positive, False Negative of the confusion matrix
TN1 = pred_log_reg.filter('prediction = 0 AND label = prediction').count()
TP1 = pred_log_reg.filter('prediction = 1 AND label = prediction').count()
FN1 = pred_log_reg.filter('prediction = 0 AND label != prediction').count()
FP1 = pred_log_reg.filter('prediction = 1 AND label != prediction').count()

# Calculate precision and recall
precision = TP1 / (TP1 + FP1)
recall = TP1 / (TP1 + FN1)
print('precision = {:.2f}\nrecall    = {:.2f}'.format(precision, recall))

# Computing accuracy and F1-score
accuracy = (TN1 + TP1) / (TN1 + TP1 + FN1 + FP1)
print('accuracy = {:.2f}'.format(accuracy))

F1 = 2 * (precision * recall) / (precision + recall)
print(F1)

# Find weighted precision
multi_evaluator = MulticlassClassificationEvaluator()
weighted_precision = multi_evaluator.evaluate(pred_log_reg, {multi_evaluator.metricName: "weightedPrecision"})
print('weighted_precision = {:.2f}'.format(weighted_precision))

# Find AUC
binary_evaluator = BinaryClassificationEvaluator()
auc = binary_evaluator.evaluate(pred_log_reg, {binary_evaluator.metricName: "areaUnderROC"})
print('AUC = {:.2f}'.format(auc))

precision = 0.62
recall    = 0.72
accuracy = 0.61
0.6661279789247645
weighted_precision = 0.61
AUC = 0.64


## Random Forest

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# The label is binary (label = 1 when Delay >= 10), so this uses a random-forest
# *classifier*, evaluated like the other models. The earlier version fit a regressor
# and reported RMSE, which is not comparable with their accuracy/precision/recall/F1.
RF_data = flight_assembler

# Automatically identify and index categorical features.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
# Fitted on the full data so every category value is known to the indexer.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(RF_data)

# Reuse the shared, seeded 70:30 split so the test set matches the other models'
# (the previous unseeded randomSplit produced a different, non-reproducible test set).
RF_train, RF_test = train_data, test_data

# Seeded so the bootstrap samples / feature subsets are reproducible.
rf = RandomForestClassifier(featuresCol="indexedFeatures", labelCol="label", seed=142)

# Chain indexer and forest in a Pipeline; fitting it also applies the indexer.
pipeline = Pipeline(stages=[featureIndexer, rf])
model = pipeline.fit(RF_train)

# Making predictions on the test split
RF_pred = model.transform(RF_test)
RF_pred.select("prediction", "label", "probability").show(5, truncate=False)

# Test-set metrics. Precision, recall and F1 refer to the positive class
# (metricLabel = 1.0), matching how the other models are reported.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricLabel=1.0)
accuracy_rf = evaluator.evaluate(RF_pred, {evaluator.metricName: "accuracy"})
precision_rf = evaluator.evaluate(RF_pred, {evaluator.metricName: "precisionByLabel"})
recall_rf = evaluator.evaluate(RF_pred, {evaluator.metricName: "recallByLabel"})
f1_rf = evaluator.evaluate(RF_pred, {evaluator.metricName: "fMeasureByLabel"})

rf_binary_evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction",
                                                    metricName="areaUnderROC")
auc_rf = rf_binary_evaluator.evaluate(RF_pred)

print('accuracy  = {:.2f}'.format(accuracy_rf))
print('precision = {:.2f}\nrecall    = {:.2f}'.format(precision_rf, recall_rf))
print('F1        = {:.2f}'.format(f1_rf))
print('AUC       = {:.2f}'.format(auc_rf))

+------------------+-----+--------------------+
|        prediction|label|            features|
+------------------+-----+--------------------+
|0.5625121603238583|    1|[0.0,1.0,2.0,1.0,...|
|0.5676846689495693|    0|[0.0,1.0,2.0,1.0,...|
|0.6493864675296191|    0|[0.0,1.0,2.0,1.0,...|
|0.7049749199684863|    1|[0.0,1.0,2.0,1.0,...|
|0.7023850773601166|    1|[0.0,1.0,2.0,1.0,...|
+------------------+-----+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 0.471753


## Naive Bayes 

In [None]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Multinomial Naive Bayes with additive smoothing of 1.0.
# NOTE(review): the multinomial model treats each feature as a non-negative count;
# continuous columns such as Distance_km or depart may suit modelType="gaussian" better.
naive_bayes = NaiveBayes(smoothing=1.0, modelType="multinomial")

# Train on the shared training split
nb_model = naive_bayes.fit(train_data)

# Score the test split and show only the columns needed to judge the predictions
# (the full frame is 16 columns wide), matching the decision-tree cell.
nb_pred = nb_model.transform(test_data)
nb_pred.select('label', 'prediction', 'probability').show(5, False)

# Computing accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(nb_pred)
print("Test set accuracy = " + str(accuracy))

+-----+----+-----------+----------+------+------+---------------+-----+-----------+-----+-----------+----------+--------------------+--------------------+--------------------+----------+
|Month|Date|Day of Week|Airline_ID|Origin|depart|Flight_Duration|Delay|Distance_km|label|airline_idx|origin_idx|            features|       rawPrediction|         probability|prediction|
+-----+----+-----------+----------+------+------+---------------+-----+-----------+-----+-----------+----------+--------------------+--------------------+--------------------+----------+
|    0|   1|          2|        AA|   JFK|   9.0|            385|  -11|     3983.0|    0|        1.0|       2.0|[0.0,1.0,2.0,1.0,...|[-1505.4802112244...|[1.08462806877391...|       1.0|
|    0|   1|          2|        AA|   JFK| 14.92|            245|    6|     2239.0|    0|        1.0|       2.0|[0.0,1.0,2.0,1.0,...|[-977.21772598987...|[0.00324534372163...|       1.0|
|    0|   1|          2|        AA|   JFK| 15.75|            185|

In [None]:
# Confusion matrix for Naive Bayes on the test split.
confusion_nb = nb_pred.groupBy('label', 'prediction').count()
confusion_nb.show()

# Bring the (label, prediction) -> count cells to the driver in a single job,
# instead of one filter().count() job per cell.
cells_nb = {
    (int(row['label']), int(row['prediction'])): row['count']
    for row in confusion_nb.collect()
}

# True Negative, True Positive, False Negative, False Positive (missing cells count as 0).
TN2 = cells_nb.get((0, 0), 0)
TP2 = cells_nb.get((1, 1), 0)
FN2 = cells_nb.get((1, 0), 0)
FP2 = cells_nb.get((0, 1), 0)

# Precision and recall for the positive class (label = 1); 0.0 on an empty denominator.
precision = TP2 / (TP2 + FP2) if (TP2 + FP2) else 0.0
recall = TP2 / (TP2 + FN2) if (TP2 + FN2) else 0.0
print('precision = {:.2f}\nrecall    = {:.2f}'.format(precision, recall))

# Accuracy and F1-score
total_nb = TN2 + TP2 + FN2 + FP2
accuracy = (TN2 + TP2) / total_nb if total_nb else 0.0
print('accuracy = {:.2f}'.format(accuracy))

F1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) else 0.0
print(F1)

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|21505|
|    0|       0.0|22197|
|    1|       1.0|20358|
|    0|       1.0|13539|
+-----+----------+-----+

precision = 0.60
recall    = 0.49
accuracy = 0.55
0.5374340021119324
