In [45]:
# install pyspark
#pip install pyspark
#pip install findspark
#import findspark
#findspark.init()

### import neccessary libraries

In [97]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer,StandardScaler
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier

In [98]:
# Create a SparkSession
spark = SparkSession.builder.appName("AirlineRecommendationPredictor").master("local").getOrCreate()

In [99]:
# Load the dataset
data = spark.read.csv("C:/Users/EZENWAJIAKU CHINEDU/Desktop/Course Work/Big Data/Book.csv", header=True, inferSchema=True)

In [100]:
import pyspark.sql.functions as fc
print((data.count(), len(data.columns)))
#data.describe().show()


# finding null values in each column
data_null = data.agg(*[fc.count(fc.when(fc.isnull(c), c)).alias(c) for c in data.columns])
#data_null.show() 
# no null values

(8365, 17)


In [101]:
# Drop columns not neccessary for classification
airline_data = data.drop('Name', 'Review Date', 'Month Flown')

In [102]:
airline_data.dtypes
#airline_data.summary
#airline_data.describe().show()

[('Title', 'string'),
 ('Airline', 'string'),
 ('Verified', 'string'),
 ('Reviews', 'string'),
 ('Type of Traveller', 'string'),
 ('Route', 'string'),
 ('Class', 'string'),
 ('Seat Comfort', 'string'),
 ('Staff Service', 'string'),
 ('Food & Beverages', 'string'),
 ('Inflight Entertainment', 'string'),
 ('Value For Money', 'string'),
 ('Overall Rating', 'string'),
 ('Recommended', 'string')]

In [103]:
# converting the interger columns to intergers

from pyspark.sql.types import IntegerType
from pyspark.sql.types import BooleanType

airline_data = airline_data.withColumn("Seat Comfort", airline_data["Seat Comfort"].cast(IntegerType()))
airline_data = airline_data.withColumn("Staff Service", airline_data["Staff Service"].cast(IntegerType()))
airline_data = airline_data.withColumn("Food & Beverages", airline_data["Food & Beverages"].cast(IntegerType()))
airline_data = airline_data.withColumn("Inflight Entertainment", airline_data["Inflight Entertainment"].cast(IntegerType()))
airline_data = airline_data.withColumn("Value For Money", airline_data["Value For Money"].cast(IntegerType()))
airline_data = airline_data.withColumn("Overall Rating", airline_data["Overall Rating"].cast(IntegerType()))
airline_data = airline_data.withColumn("Verified", airline_data["Verified"].cast(BooleanType()))

In [104]:
airline_data.dtypes

[('Title', 'string'),
 ('Airline', 'string'),
 ('Verified', 'boolean'),
 ('Reviews', 'string'),
 ('Type of Traveller', 'string'),
 ('Route', 'string'),
 ('Class', 'string'),
 ('Seat Comfort', 'int'),
 ('Staff Service', 'int'),
 ('Food & Beverages', 'int'),
 ('Inflight Entertainment', 'int'),
 ('Value For Money', 'int'),
 ('Overall Rating', 'int'),
 ('Recommended', 'string')]

In [105]:
# Converting the target variable column from string no and yes to integer 0 and 1
from pyspark.sql.functions import when

airline_data = airline_data.withColumn('Recommended_new', when(airline_data.Recommended=='yes', 1).otherwise(0))
airline_data = airline_data.drop("Recommended")

In [106]:
# Converting the target variable column from string no and yes to integer 0 and 1
from pyspark.sql.functions import when

airline_data = airline_data.withColumn('Verified_new', when(airline_data.Verified=='True', 1).otherwise(0))
airline_data = airline_data.drop("Verified")

In [107]:
airline_data=airline_data.dropna()

In [108]:
airline_data.groupBy('Recommended_new').count().orderBy('count').show()

+---------------+-----+
|Recommended_new|count|
+---------------+-----+
|              0| 3408|
|              1| 3948|
+---------------+-----+



In [109]:
print((airline_data.count(), len(airline_data.columns)))

(7356, 14)


In [110]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

#label encoding of categorical columns
categorical_cols = ['Title','Airline','Reviews','Type of Traveller','Route','Class','Seat Comfort','Staff Service','Food & Beverages','Value For Money','Overall Rating','Inflight Entertainment','Verified_new']
label_encoders = [StringIndexer(inputCol=col, outputCol=col + "_encoded").fit(airline_data) for col in categorical_cols]
pipeline = Pipeline(stages=label_encoders)
airline_data = pipeline.fit(airline_data).transform(airline_data)

In [111]:
airline_data.dtypes

[('Title', 'string'),
 ('Airline', 'string'),
 ('Reviews', 'string'),
 ('Type of Traveller', 'string'),
 ('Route', 'string'),
 ('Class', 'string'),
 ('Seat Comfort', 'int'),
 ('Staff Service', 'int'),
 ('Food & Beverages', 'int'),
 ('Inflight Entertainment', 'int'),
 ('Value For Money', 'int'),
 ('Overall Rating', 'int'),
 ('Recommended_new', 'int'),
 ('Verified_new', 'int'),
 ('Title_encoded', 'double'),
 ('Airline_encoded', 'double'),
 ('Reviews_encoded', 'double'),
 ('Type of Traveller_encoded', 'double'),
 ('Route_encoded', 'double'),
 ('Class_encoded', 'double'),
 ('Seat Comfort_encoded', 'double'),
 ('Staff Service_encoded', 'double'),
 ('Food & Beverages_encoded', 'double'),
 ('Value For Money_encoded', 'double'),
 ('Overall Rating_encoded', 'double'),
 ('Inflight Entertainment_encoded', 'double'),
 ('Verified_new_encoded', 'double')]

In [112]:
airline_data = airline_data.drop('Title','Airline','Reviews','Verified_new','Type of Traveller','Route','Class','Seat Comfort','Staff Service','Food & Beverages','Value For Money','Overall Rating','Inflight Entertainment','Recommended')

In [113]:
airline_data.dtypes

[('Recommended_new', 'int'),
 ('Title_encoded', 'double'),
 ('Airline_encoded', 'double'),
 ('Reviews_encoded', 'double'),
 ('Type of Traveller_encoded', 'double'),
 ('Route_encoded', 'double'),
 ('Class_encoded', 'double'),
 ('Seat Comfort_encoded', 'double'),
 ('Staff Service_encoded', 'double'),
 ('Food & Beverages_encoded', 'double'),
 ('Value For Money_encoded', 'double'),
 ('Overall Rating_encoded', 'double'),
 ('Inflight Entertainment_encoded', 'double'),
 ('Verified_new_encoded', 'double')]

In [114]:
# show the first five rows
airline_data.show(5)

+---------------+-------------+---------------+---------------+-------------------------+-------------+-------------+--------------------+---------------------+------------------------+-----------------------+----------------------+------------------------------+--------------------+
|Recommended_new|Title_encoded|Airline_encoded|Reviews_encoded|Type of Traveller_encoded|Route_encoded|Class_encoded|Seat Comfort_encoded|Staff Service_encoded|Food & Beverages_encoded|Value For Money_encoded|Overall Rating_encoded|Inflight Entertainment_encoded|Verified_new_encoded|
+---------------+-------------+---------------+---------------+-------------------------+-------------+-------------+--------------------+---------------------+------------------------+-----------------------+----------------------+------------------------------+--------------------+
|              1|        788.0|            3.0|         1617.0|                      0.0|         13.0|          1.0|                 1.0|       

In [115]:
print((airline_data.count(), len(airline_data.columns)))

(7356, 14)


In [116]:
# seperating target variable from other features
features = airline_data.drop("Recommended_new")

# putting all the other features as one
features_col = features.columns
print(features_col)
assembler = VectorAssembler(inputCols=features_col, outputCol="Vfeatures")
airline_data = assembler.transform(airline_data)
#airline_data.show(2)
airline_data = airline_data.select("Vfeatures", "Recommended_new")

['Title_encoded', 'Airline_encoded', 'Reviews_encoded', 'Type of Traveller_encoded', 'Route_encoded', 'Class_encoded', 'Seat Comfort_encoded', 'Staff Service_encoded', 'Food & Beverages_encoded', 'Value For Money_encoded', 'Overall Rating_encoded', 'Inflight Entertainment_encoded', 'Verified_new_encoded']


In [20]:
#airline_data = airline_data.select("Vfeatures", "Recommended_encoded")

In [117]:
airline_data.show(5)

+--------------------+---------------+
|           Vfeatures|Recommended_new|
+--------------------+---------------+
|[788.0,3.0,1617.0...|              1|
|[5465.0,3.0,7321....|              0|
|[825.0,3.0,1379.0...|              1|
|(13,[0,1,2,4,10],...|              1|
|(13,[0,1,2,3,4,10...|              1|
+--------------------+---------------+
only showing top 5 rows



### Data Scaling

##### Feature scaling is done to ensure that the age column is not contributing or giving too much importance in the training because it has large numbers

In [22]:
#airline_data.select("Vfeatures").show()

In [118]:
scaled_data = StandardScaler(inputCol="Vfeatures", outputCol="features")
airline_data = scaled_data.fit(airline_data).transform(airline_data)

In [123]:
#renaming the target column to label
airline_data = airline_data.select("features", "Recommended_new")
airline_data = airline_data.withColumnRenamed("Recommended_new","label")

In [124]:
#splitting into test and train data
train_data, test_data = airline_data.randomSplit([0.8, 0.2], seed=42)

In [126]:
# Logistic Regression
log_reg=LogisticRegression().fit(train_data)

#Get Predictions for Logistic Regression Model
predictions = log_reg.transform(test_data)
multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label")

#Metrics for evaluation
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
auc = evaluator.evaluate(predictions)
f1_Score = (2*precision*recall)/(precision+recall)

print("AUC-ROC: ", auc)
print("Accuracy: ", accuracy)
print("Precision: ", precision)
print("Recall: ", recall)
print("F-Score: ",f1_Score)

AUC-ROC:  0.801257937952514
Accuracy:  0.7144886363636364
Precision:  0.7142703553986419
Recall:  0.7144886363636364
F-Score:  0.7143794792070255


In [127]:
#Display the Logistic Regresssion predictions
predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(13,[0,1,2,3,4,5,...|    1|[-1.9934682617580...|[0.11989042018618...|       1.0|
|(13,[0,1,2,3,4,5,...|    1|[-2.4910545809389...|[0.07648767131528...|       1.0|
|(13,[0,1,2,3,4,5,...|    1|[-1.8251995944161...|[0.13881113179141...|       1.0|
|(13,[0,1,2,3,4,5,...|    1|[-1.9759665654852...|[0.12174946148467...|       1.0|
|(13,[0,1,2,3,4,6,...|    0|[1.09666177081946...|[0.74963409959556...|       0.0|
|(13,[0,1,2,3,4,6,...|    0|[0.18514436647063...|[0.54615432536921...|       0.0|
|(13,[0,1,2,3,4,6,...|    0|[2.01302723018359...|[0.88215808275607...|       0.0|
|(13,[0,1,2,3,4,6,...|    1|[-0.7945885943192...|[0.31118425941051...|       1.0|
|(13,[0,1,2,3,4,6,...|    1|[-2.0604753438748...|[0.11299817778541...|       1.0|
|(13,[0,1,2,3,4,

In [128]:
# Random Forest classification

random_forest = RandomForestClassifier(labelCol="label", featuresCol="features")
model = random_forest.fit(train_data)

#Get predictions for Randomforest Boost model
predictionRDF = model.transform(test_data)

# metrics evaluation
recall = multi_evaluator.evaluate(predictionRDF, {multi_evaluator.metricName: "weightedRecall"})
accuracy = multi_evaluator.evaluate(predictionRDF, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictionRDF, {multi_evaluator.metricName: "weightedPrecision"})
auc = evaluator.evaluate(predictionRDF)
f1_Score = (2*precision*recall)/(precision+recall)

print("AUC-ROC: ", auc)
print("Accuracy: ", accuracy)
print("Precision: ", precision)
print("Recall: ", recall)
print("F-Score: ",f1_Score)

AUC-ROC:  0.977222626703884
Accuracy:  0.9183238636363636
Precision:  0.9213888251183401
Recall:  0.9183238636363636
F-Score:  0.9198537912643187


In [129]:
predictionRDF.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(13,[0,1,2,3,4,5,...|    1|[0.53628103845633...|[0.02681405192281...|       1.0|
|(13,[0,1,2,3,4,5,...|    1|[1.00972453633475...|[0.05048622681673...|       1.0|
|(13,[0,1,2,3,4,5,...|    1|[0.43833938117239...|[0.02191696905861...|       1.0|
|(13,[0,1,2,3,4,5,...|    1|[0.43833938117239...|[0.02191696905861...|       1.0|
|(13,[0,1,2,3,4,6,...|    0|[15.8198974975502...|[0.79099487487751...|       0.0|
|(13,[0,1,2,3,4,6,...|    0|[19.0769803686714...|[0.95384901843357...|       0.0|
|(13,[0,1,2,3,4,6,...|    0|[16.4688017039836...|[0.82344008519918...|       0.0|
|(13,[0,1,2,3,4,6,...|    1|[0.67325604713817...|[0.03366280235690...|       1.0|
|(13,[0,1,2,3,4,6,...|    1|[1.44644341758519...|[0.07232217087925...|       1.0|
|(13,[0,1,2,3,4,

In [130]:
# Decision Tree classification

Decision_Tree = DecisionTreeClassifier(labelCol="label", featuresCol="features")
model = Decision_Tree.fit(train_data)

#Get predictions for Decision Tree model
predictionDT = model.transform(test_data)

# metrics evaluation
recall = multi_evaluator.evaluate(predictionDT, {multi_evaluator.metricName: "weightedRecall"})
accuracy = multi_evaluator.evaluate(predictionDT, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictionDT, {multi_evaluator.metricName: "weightedPrecision"})
auc = evaluator.evaluate(predictionDT)
f1_Score = (2*precision*recall)/(precision+recall)

print("AUC-ROC: ", auc)
print("Accuracy: ", accuracy)
print("Precision: ", precision)
print("Recall: ", recall)
print("F-Score: ",f1_Score)

AUC-ROC:  0.9453494721514379
Accuracy:  0.9161931818181818
Precision:  0.919355231165893
Recall:  0.9161931818181819
F-Score:  0.9177714829038154


In [131]:
predictionDT.show()

+--------------------+-----+-------------+--------------------+----------+
|            features|label|rawPrediction|         probability|prediction|
+--------------------+-----+-------------+--------------------+----------+
|(13,[0,1,2,3,4,5,...|    1| [7.0,1675.0]|[0.00416171224732...|       1.0|
|(13,[0,1,2,3,4,5,...|    1| [7.0,1675.0]|[0.00416171224732...|       1.0|
|(13,[0,1,2,3,4,5,...|    1| [7.0,1675.0]|[0.00416171224732...|       1.0|
|(13,[0,1,2,3,4,5,...|    1| [7.0,1675.0]|[0.00416171224732...|       1.0|
|(13,[0,1,2,3,4,6,...|    0|[482.0,343.0]|[0.58424242424242...|       0.0|
|(13,[0,1,2,3,4,6,...|    0|[1446.0,10.0]|[0.99313186813186...|       0.0|
|(13,[0,1,2,3,4,6,...|    0| [569.0,39.0]|[0.93585526315789...|       0.0|
|(13,[0,1,2,3,4,6,...|    1| [7.0,1675.0]|[0.00416171224732...|       1.0|
|(13,[0,1,2,3,4,6,...|    1| [7.0,1675.0]|[0.00416171224732...|       1.0|
|(13,[0,1,2,3,4,6,...|    1| [7.0,1675.0]|[0.00416171224732...|       1.0|
|(13,[0,1,2,3,4,6,...|   