# Installing PySpark 

In [1]:
# Install PySpark into the kernel's environment only when it is missing.
# Kept commented out so "Run All" does not reinstall; pinned to the Spark
# version this notebook reports further down (3.5.0). Use %pip, not !pip,
# so the package lands in the kernel's own environment.
# %pip install pyspark==3.5.0


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.1.2[0m[39;49m -> [0m[32;49m23.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip3 install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


# Importing Libraries

In [2]:
import numpy as np
import pandas as pd 
import os
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

from pyspark.sql.functions import round
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# Creating A Spark Session

In [3]:
# Start (or reuse) a local Spark session that runs on every available core.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('Flight Predict Project')
    .getOrCreate()
)

# Report the Spark version backing this session
print(spark.version)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/17 22:03:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


3.5.0


# Reading the dataset

In [4]:
# Load the flight records. Literal 'NA' cells become nulls, the first row
# supplies the column names, and Spark infers each column's type.
csv_read_options = dict(sep=',', header=True, inferSchema=True, nullValue='NA')
flights_Predict = spark.read.csv('flights-larger.csv', **csv_read_options)

# Printing Out the Dataset

In [6]:
# Getting The number of records
print("The dataset contain %d records." % flights_Predict.count())
# Printing first ten rows of the dataset
flights_Predict.show(10)

The dataset contain 275000 records.
+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 10| 10|  1|     OO|  5836|ORD| 157|  8.18|      51|   27|
|  1|  4|  1|     OO|  5866|ORD| 466|  15.5|     102| NULL|
| 11| 22|  1|     OO|  6016|ORD| 738|  7.17|     127|  -19|
|  2| 14|  5|     B6|   199|JFK|2248| 21.17|     365|   60|
|  5| 25|  3|     WN|  1675|SJC| 386| 12.92|      85|   22|
|  3| 28|  1|     B6|   377|LGA|1076| 13.33|     182|   70|
|  5| 28|  6|     B6|   904|ORD| 740|  9.58|     130|   47|
|  1| 19|  2|     UA|   820|SFO| 679| 12.75|     123|  135|
|  8|  5|  5|     US|  2175|LGA| 214|  13.0|      71|  -10|
|  5| 27|  5|     AA|  1240|ORD|1197| 14.42|     195|  -11|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 10 rows



# Checking the Data Type of Each Column

In [7]:
# (column name, Spark type) pairs as inferred when the CSV was read
column_types = flights_Predict.dtypes
print(column_types)

[('mon', 'int'), ('dom', 'int'), ('dow', 'int'), ('carrier', 'string'), ('flight', 'int'), ('org', 'string'), ('mile', 'int'), ('depart', 'double'), ('duration', 'int'), ('delay', 'int')]


# Data Pre-processing

In [8]:
# Removing the 'flight' column
flights_Predict =  flights_Predict.drop('flight')

# Removing records with missing values 
flights_Predict = flights_Predict.dropna()
print(flights_Predict.count())

258289


# Converting Miles to Kilometers and Creating the Delay Label

In [10]:
# Conversion From 'mile' to 'km' 
flights_Predict_km = flights_Predict.withColumn('km', round(flights_Predict.mile * 1.60934, 0)) \
                    .drop('mile')

# Generating a 'label' column to indicate the delay status of a flight.
flights_Predict_km = flights_Predict_km.withColumn('label', (flights_Predict_km.delay >= 15).cast('integer'))

# Printing the ten records
flights_Predict_km.show(10)

+---+---+---+-------+---+------+--------+-----+------+-----+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|
+---+---+---+-------+---+------+--------+-----+------+-----+
| 10| 10|  1|     OO|ORD|  8.18|      51|   27| 253.0|    1|
| 11| 22|  1|     OO|ORD|  7.17|     127|  -19|1188.0|    0|
|  2| 14|  5|     B6|JFK| 21.17|     365|   60|3618.0|    1|
|  5| 25|  3|     WN|SJC| 12.92|      85|   22| 621.0|    1|
|  3| 28|  1|     B6|LGA| 13.33|     182|   70|1732.0|    1|
|  5| 28|  6|     B6|ORD|  9.58|     130|   47|1191.0|    1|
|  1| 19|  2|     UA|SFO| 12.75|     123|  135|1093.0|    1|
|  8|  5|  5|     US|LGA|  13.0|      71|  -10| 344.0|    0|
|  5| 27|  5|     AA|ORD| 14.42|     195|  -11|1926.0|    0|
|  8| 20|  6|     B6|JFK| 14.67|     198|   20|1902.0|    1|
+---+---+---+-------+---+------+--------+-----+------+-----+
only showing top 10 rows



# Indexing Categorical Columns (StringIndexer, not one-hot encoding)

In [11]:
# Generate an indexer to identify categories and create a new column with numeric index values
# Map each categorical string column to a numeric index column named
# '<column>_idx'. StringIndexer assigns indices by label frequency.
flights_indexed = flights_Predict_km
for categorical_col in ('carrier', 'org'):
    indexer = StringIndexer(inputCol=categorical_col, outputCol=f'{categorical_col}_idx')
    flights_indexed = indexer.fit(flights_indexed).transform(flights_indexed)

flights_indexed.show(10)

+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_idx|org_idx|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
| 10| 10|  1|     OO|ORD|  8.18|      51|   27| 253.0|    1|        2.0|    0.0|
| 11| 22|  1|     OO|ORD|  7.17|     127|  -19|1188.0|    0|        2.0|    0.0|
|  2| 14|  5|     B6|JFK| 21.17|     365|   60|3618.0|    1|        4.0|    2.0|
|  5| 25|  3|     WN|SJC| 12.92|      85|   22| 621.0|    1|        3.0|    5.0|
|  3| 28|  1|     B6|LGA| 13.33|     182|   70|1732.0|    1|        4.0|    3.0|
|  5| 28|  6|     B6|ORD|  9.58|     130|   47|1191.0|    1|        4.0|    0.0|
|  1| 19|  2|     UA|SFO| 12.75|     123|  135|1093.0|    1|        0.0|    1.0|
|  8|  5|  5|     US|LGA|  13.0|      71|  -10| 344.0|    0|        6.0|    3.0|
|  5| 27|  5|     AA|ORD| 14.42|     195|  -11|1926.0|    0|        1.0|    0.0|
|  8| 20|  6|     B6|JFK| 14

# Feature Engineering

In [12]:
# Instantiate an assembler object
assembler = VectorAssembler(inputCols=['mon', 'dom', 'dow', 'carrier_idx', 'org_idx', 'km', 'depart', 'duration'], outputCol='features')

# Combine predictor columns
flights_assembled = assembler.transform(flights_indexed)

# Examine the resulting column
flights_assembled.select('features', 'delay').show(5, truncate=False)

+-----------------------------------------+-----+
|features                                 |delay|
+-----------------------------------------+-----+
|[10.0,10.0,1.0,2.0,0.0,253.0,8.18,51.0]  |27   |
|[11.0,22.0,1.0,2.0,0.0,1188.0,7.17,127.0]|-19  |
|[2.0,14.0,5.0,4.0,2.0,3618.0,21.17,365.0]|60   |
|[5.0,25.0,3.0,3.0,5.0,621.0,12.92,85.0]  |22   |
|[3.0,28.0,1.0,4.0,3.0,1732.0,13.33,182.0]|70   |
+-----------------------------------------+-----+
only showing top 5 rows



# Splitting into Training and Test Sets

In [13]:
# Divide the dataset into training and testing sets with an 80:20 ratio
flights_train, flights_test = flights_assembled.randomSplit([0.8, 0.2], seed=42)

# Verify that the training set constitutes approximately 80% of the records
training_ratio = flights_train.count() / flights_assembled.count()
print(training_ratio)

                                                                                

0.7998753334443202


# Decision Tree Model

In [14]:
# Create a DT classifier object and fit to the training data
tree = DecisionTreeClassifier()
tree_model = tree.fit(flights_train)
# Create predictions on test data
prediction = tree_model.transform(flights_test)
prediction.select('label', 'prediction', 'probability').show(5, False)

                                                                                

+-----+----------+----------------------------------------+
|label|prediction|probability                             |
+-----+----------+----------------------------------------+
|0    |1.0       |[0.32147118544723785,0.6785288145527621]|
|1    |0.0       |[0.5346618613525546,0.4653381386474454] |
|1    |0.0       |[0.5346618613525546,0.4653381386474454] |
|1    |1.0       |[0.32147118544723785,0.6785288145527621]|
|1    |1.0       |[0.32147118544723785,0.6785288145527621]|
+-----+----------+----------------------------------------+
only showing top 5 rows



# Confusion Matrix

In [16]:
# Create a confusion matrix
prediction.groupBy('label', 'prediction').count().show()

# Calculate the elements of the confusion matrix
TN = prediction.filter('prediction = 0 AND label = prediction').count()
TP = prediction.filter('prediction = 1 AND label = prediction').count()
FN = prediction.filter('prediction = 0 AND label != prediction').count()
FP = prediction.filter('prediction = 1 AND label != prediction').count()

# Accuracy measures the proportion of correct predictions
accuracy = (TN + TP) / (TN + TP + FN + FP)
print(accuracy)

                                                                                

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 9251|
|    0|       0.0|16079|
|    1|       1.0|16764|
|    0|       1.0| 9596|
+-----+----------+-----+

0.6353840201199459


# Logistic Regression

In [17]:
# Create a classifier object and train on training data
logistic = LogisticRegression().fit(flights_train)
# Create predictions for the testing data and show confusion matrix
prediction = logistic.transform(flights_test)
prediction.groupBy('label', 'prediction').count().show()

23/12/17 22:16:10 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 9471|
|    0|       0.0|14934|
|    1|       1.0|16544|
|    0|       1.0|10741|
+-----+----------+-----+



In [18]:
# Calculate precision and recall
precision = TP / (TP + FP)
recall = TP / (TP + FN)
print('precision = {:.2f}\nrecall    = {:.2f}'.format(precision, recall))

# Find weighted precision
multi_evaluator = MulticlassClassificationEvaluator()
weighted_precision = multi_evaluator.evaluate(prediction, {multi_evaluator.metricName: "weightedPrecision"})

# Find AUC
binary_evaluator = BinaryClassificationEvaluator()
auc = binary_evaluator.evaluate(prediction, {binary_evaluator.metricName: "areaUnderROC"})

precision = 0.64
recall    = 0.64


# Naive Bayes

In [19]:
from pyspark.ml.classification import NaiveBayes

# Reuse the 'features' vector assembled earlier: it already holds exactly these
# eight columns (mon, dom, dow, carrier_idx, org_idx, km, depart, duration).
# Re-assembling a duplicate column into flights_train/flights_test mutated the
# shared splits and made this cell fail on re-run, because VectorAssembler
# refuses to write an output column that already exists.
# NOTE: multinomial Naive Bayes requires non-negative feature values; the
# previewed rows suggest all eight inputs are non-negative - confirm if inputs change.
naive_bayes = NaiveBayes(smoothing=1.0, modelType="multinomial", labelCol="label", featuresCol="features")

# Train on the training split
naive_bayes_model = naive_bayes.fit(flights_train)

# Score the held-out test split
predictions = naive_bayes_model.transform(flights_test)

# Accuracy on the test split
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# Confusion matrix
predictions.groupBy('label', 'prediction').count().show()

                                                                                

Accuracy: 0.557109692396982
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|13343|
|    0|       0.0|16125|
|    1|       1.0|12672|
|    0|       1.0| 9550|
+-----+----------+-----+



# Random Forest

In [21]:
from pyspark.ml.classification import RandomForestClassifier

# Reuse the 'features' vector assembled earlier (same eight input columns, same
# order) rather than re-assembling a copy into flights_train/flights_test. The
# copy mutated the shared splits and broke re-runs, because VectorAssembler
# rejects an output column that already exists.
random_forest = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10)

# Train on the training split
random_forest_model = random_forest.fit(flights_train)

# Score the held-out test split
predictions = random_forest_model.transform(flights_test)

# Accuracy on the test split
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# Preview label, predicted label and class probabilities
predictions.select('label', 'prediction', 'probability').show(5, False)

                                                                                

Accuracy: 0.6344360611336816
+-----+----------+----------------------------------------+
|label|prediction|probability                             |
+-----+----------+----------------------------------------+
|0    |1.0       |[0.35090104797685606,0.6490989520231438]|
|1    |1.0       |[0.49068345563733534,0.5093165443626646]|
|1    |1.0       |[0.49068345563733534,0.5093165443626646]|
|1    |1.0       |[0.3548775052248382,0.6451224947751618] |
|1    |1.0       |[0.3548775052248382,0.6451224947751618] |
+-----+----------+----------------------------------------+
only showing top 5 rows



In [22]:
# Confusion matrix for the random-forest predictions
rf_confusion = predictions.groupBy('label', 'prediction').count()
rf_confusion.show()

# Collect the cells in a single job instead of one filter+count job per cell.
# A combination absent from the groupBy result counts as 0.
rf_counts = {
    (int(row['label']), int(row['prediction'])): row['count']
    for row in rf_confusion.collect()
}
TN = rf_counts.get((0, 0), 0)  # label 0, predicted 0
TP = rf_counts.get((1, 1), 0)  # label 1, predicted 1
FN = rf_counts.get((1, 0), 0)  # label 1, predicted 0
FP = rf_counts.get((0, 1), 0)  # label 0, predicted 1

# Accuracy: proportion of correct predictions
total = TN + TP + FN + FP
accuracy = (TN + TP) / total if total else 0.0
print(f"Accuracy: {accuracy}")

# Precision: share of positive predictions that were correct (0.0 if no positives predicted)
precision = TP / (TP + FP) if (TP + FP) else 0.0
print(f"Precision: {precision}")

# Recall (sensitivity): share of actual positives correctly predicted (0.0 if none exist)
recall = TP / (TP + FN) if (TP + FN) else 0.0
print(f"Recall: {recall}")

# F1 score: harmonic mean of precision and recall (0.0 when both are 0)
f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) else 0.0
print(f"F1 Score: {f1_score}")

                                                                                

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 7432|
|    0|       0.0|14211|
|    1|       1.0|18583|
|    0|       1.0|11464|
+-----+----------+-----+



                                                                                

Accuracy: 0.6344360611336816
Precision: 0.6184644057643026
Recall: 0.7143186623102057
F1 Score: 0.6629445970532625


# Support Vector Machines (SVM)

In [23]:
from pyspark.ml.classification import LinearSVC

# 'label' is binary (0/1, built from delay >= threshold), which LinearSVC handles
# directly. The previous OneVsRest wrapper was added "for multiclass", but with
# two classes it only trained two redundant one-vs-rest SVMs.
# The existing 'features' vector (same eight inputs) is reused, so the cell is
# re-runnable and no longer mutates flights_train/flights_test.
svm = LinearSVC(maxIter=10, regParam=0.1, labelCol="label", featuresCol="features")

# Train on the training split
svm_model = svm.fit(flights_train)

# Score the held-out test split
predictions = svm_model.transform(flights_test)

# Accuracy on the test split
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# Confusion matrix
predictions.groupBy('label', 'prediction').count().show()

                                                                                

Accuracy: 0.603869220352099




+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 9028|
|    0|       0.0|14227|
|    1|       1.0|16987|
|    0|       1.0|11448|
+-----+----------+-----+



                                                                                

# Stopping the Spark Session

In [24]:
# Stop the Spark session created at the top of the notebook and release its
# resources; no further Spark operations can run after this cell.
spark.stop()