In [None]:
from pyspark import SparkConf, SparkContext 

# In Jupyter you have to stop the current context first
spark.stop()
# Create new config
conf = (SparkConf().set("spark.driver.maxResultSize", "1g"))

# Create new context
sc = SparkContext(conf=conf)

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [None]:
# install Java8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

#!apt-get update

# download spark3.0.1
!wget -q http://apache.osuosl.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz

# unzip it
!tar xf spark-3.0.1-bin-hadoop3.2.tgz

# install findspark 
!pip install -q findspark

# environmental variable of java and sp ark was set
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop3.2"

import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Flight_ticket_prediction').getOrCreate()

In [None]:
import pandas as pd

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

import pyspark.sql.functions as f

In [None]:
from google.colab import files
uploaded = files.upload()

Saving Data_Train.xlsx to Data_Train.xlsx
Saving Test_set.xlsx to Test_set.xlsx


In [None]:
read_file = pd.read_excel ("Data_Train.xlsx") 

read_file.to_csv ("Data_Train.csv", index = None, header=True) 

train_df = spark.read.csv("Data_Train.csv", inferSchema = True, header = True)
train_df.show()

+-----------------+---------------+--------+-----------+--------------------+--------+------------+--------+-----------+--------------------+-----+
|          Airline|Date_of_Journey|  Source|Destination|               Route|Dep_Time|Arrival_Time|Duration|Total_Stops|     Additional_Info|Price|
+-----------------+---------------+--------+-----------+--------------------+--------+------------+--------+-----------+--------------------+-----+
|           IndiGo|     24/03/2019|Banglore|  New Delhi|           BLR → DEL|   22:20|01:10 22 Mar|  2h 50m|   non-stop|             No info| 3897|
|        Air India|      1/05/2019| Kolkata|   Banglore|CCU → IXR → BBI →...|   05:50|       13:15|  7h 25m|    2 stops|             No info| 7662|
|      Jet Airways|      9/06/2019|   Delhi|     Cochin|DEL → LKO → BOM →...|   09:25|04:25 10 Jun|     19h|    2 stops|             No info|13882|
|           IndiGo|     12/05/2019| Kolkata|   Banglore|     CCU → NAG → BLR|   18:05|       23:30|  5h 25m|    

In [None]:
read_file = pd.read_excel ("Test_set.xlsx") 

read_file.to_csv ("Test_set.csv", index = None, header=True) 

test_df = spark.read.csv("Test_set.csv", inferSchema = True, header = True)
test_df.show()

+-----------------+---------------+--------+-----------+--------------------+--------+------------+--------+-----------+--------------------+
|          Airline|Date_of_Journey|  Source|Destination|               Route|Dep_Time|Arrival_Time|Duration|Total_Stops|     Additional_Info|
+-----------------+---------------+--------+-----------+--------------------+--------+------------+--------+-----------+--------------------+
|      Jet Airways|      6/06/2019|   Delhi|     Cochin|     DEL → BOM → COK|   17:30|04:25 07 Jun| 10h 55m|     1 stop|             No info|
|           IndiGo|     12/05/2019| Kolkata|   Banglore|     CCU → MAA → BLR|   06:20|       10:20|      4h|     1 stop|             No info|
|      Jet Airways|     21/05/2019|   Delhi|     Cochin|     DEL → BOM → COK|   19:15|19:00 22 May| 23h 45m|     1 stop|In-flight meal no...|
|Multiple carriers|     21/05/2019|   Delhi|     Cochin|     DEL → BOM → COK|   08:00|       21:00|     13h|     1 stop|             No info|
|     

In [None]:
train_df.printSchema()

root
 |-- Airline: string (nullable = true)
 |-- Date_of_Journey: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Destination: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Dep_Time: string (nullable = true)
 |-- Arrival_Time: string (nullable = true)
 |-- Duration: string (nullable = true)
 |-- Total_Stops: string (nullable = true)
 |-- Additional_Info: string (nullable = true)
 |-- Price: integer (nullable = true)



In [None]:
test_df.printSchema()

root
 |-- Airline: string (nullable = true)
 |-- Date_of_Journey: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Destination: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Dep_Time: string (nullable = true)
 |-- Arrival_Time: string (nullable = true)
 |-- Duration: string (nullable = true)
 |-- Total_Stops: string (nullable = true)
 |-- Additional_Info: string (nullable = true)



In [None]:
null_values = train_df.agg(*[f.count(f.when(f.isnull(col), col)).alias(col) for col in train_df.columns])
null_values.show()

+-------+---------------+------+-----------+-----+--------+------------+--------+-----------+---------------+-----+
|Airline|Date_of_Journey|Source|Destination|Route|Dep_Time|Arrival_Time|Duration|Total_Stops|Additional_Info|Price|
+-------+---------------+------+-----------+-----+--------+------------+--------+-----------+---------------+-----+
|      0|              0|     0|          0|    1|       0|           0|       0|          1|              0|    0|
+-------+---------------+------+-----------+-----+--------+------------+--------+-----------+---------------+-----+



In [None]:
train_df = train_df.fillna({ 'Route':0, 'Total_Stops':0 })

In [None]:
null_valuesfinal = train_df.agg(*[f.count(f.when(f.isnull(col), col)).alias(col) for col in train_df.columns])
null_valuesfinal.show()

+-------+---------------+------+-----------+-----+--------+------------+--------+-----------+---------------+-----+
|Airline|Date_of_Journey|Source|Destination|Route|Dep_Time|Arrival_Time|Duration|Total_Stops|Additional_Info|Price|
+-------+---------------+------+-----------+-----+--------+------------+--------+-----------+---------------+-----+
|      0|              0|     0|          0|    0|       0|           0|       0|          0|              0|    0|
+-------+---------------+------+-----------+-----+--------+------------+--------+-----------+---------------+-----+



In [None]:
#null_values_test = test_df.agg(*[f.count(f.when(f.isnull(col), col)).alias(col) for col in test_df.columns])
#null_values_test.show()

In [None]:
train_df.dtypes

[('Airline', 'string'),
 ('Date_of_Journey', 'string'),
 ('Source', 'string'),
 ('Destination', 'string'),
 ('Route', 'string'),
 ('Dep_Time', 'string'),
 ('Arrival_Time', 'string'),
 ('Duration', 'string'),
 ('Total_Stops', 'string'),
 ('Additional_Info', 'string'),
 ('Price', 'int')]

In [None]:

categoricalColumns = [item[0] for item in train_df.dtypes if item[1].startswith('string')]
categoricalColumns

['Airline',
 'Date_of_Journey',
 'Source',
 'Destination',
 'Route',
 'Dep_Time',
 'Arrival_Time',
 'Duration',
 'Total_Stops',
 'Additional_Info']

In [None]:
numericalColumns = [item[0] for item in train_df.dtypes if item[1].startswith('int')]
numericalColumns

['Price']

In [None]:
stages = []
for cols in categoricalColumns:
  stringIndexer = StringIndexer(inputCol= cols, outputCol = cols + '_Index')
  encoder = OneHotEncoder(inputCols = [stringIndexer.getOutputCol()], outputCols= [cols + "OHE"])
  stages += [stringIndexer, encoder]

assembly = [c + "OHE" for c in categoricalColumns] + numericalColumns
assembler = VectorAssembler(inputCols = assembly, outputCol = "features")
stages += [assembler]

In [None]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(train_df)
train_df = pipelineModel.transform(train_df)
selectedCols = ['Price', 'features'] + categoricalColumns
train_df = train_df.select(selectedCols)
train_df.printSchema()

root
 |-- Price: integer (nullable = true)
 |-- features: vector (nullable = true)
 |-- Airline: string (nullable = true)
 |-- Date_of_Journey: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Destination: string (nullable = true)
 |-- Route: string (nullable = false)
 |-- Dep_Time: string (nullable = true)
 |-- Arrival_Time: string (nullable = true)
 |-- Duration: string (nullable = true)
 |-- Total_Stops: string (nullable = false)
 |-- Additional_Info: string (nullable = true)



In [None]:
categoricalColumns_test = [item[0] for item in test_df.dtypes if item[1].startswith('string')]
categoricalColumns_test

['Airline',
 'Date_of_Journey',
 'Source',
 'Destination',
 'Route',
 'Dep_Time',
 'Arrival_Time',
 'Duration',
 'Total_Stops',
 'Additional_Info']

In [None]:
indexer_test = StringIndexer(inputCols= ['Airline',
                                        'Date_of_Journey',
                                        'Source',
                                        'Destination',
                                        'Route',
                                        'Dep_Time',
                                        'Arrival_Time',
                                        'Duration',
                                        'Total_Stops',
                                        'Additional_Info'], 
                        outputCols=['Airline_index',
                                    'Date_of_Journey_index',
                                    'Source_index',
                                    'Destination_index',
                                    'Route_index',
                                    'Dep_Time_index',
                                    'Arrival_Time_index',
                                    'Duration_index',
                                    'Total_Stops_index',
                                    'Additional_Info_index']) 

indexed = indexer_test.fit(test_df).transform(test_df) 
indexed.show()

+-----------------+---------------+--------+-----------+--------------------+--------+------------+--------+-----------+--------------------+-----------------+-----------+---------------------+--------------+--------------+-------------+---------------------+------------------+------------+-----------------+
|          Airline|Date_of_Journey|  Source|Destination|               Route|Dep_Time|Arrival_Time|Duration|Total_Stops|     Additional_Info|Total_Stops_index|Route_index|Additional_Info_index|Duration_index|Dep_Time_index|Airline_index|Date_of_Journey_index|Arrival_Time_index|Source_index|Destination_index|
+-----------------+---------------+--------+-----------+--------------------+--------+------------+--------+-----------+--------------------+-----------------+-----------+---------------------+--------------+--------------+-------------+---------------------+------------------+------------+-----------------+
|      Jet Airways|      6/06/2019|   Delhi|     Cochin|     DEL → BOM

In [None]:
new_test_df = indexed.drop(*['Airline',
                             'Date_of_Journey',
                              'Source',
                              'Destination',
                              'Route',
                              'Dep_Time',
                              'Arrival_Time',
                              'Duration',
                              'Total_Stops',
                              'Additional_Info'])


In [None]:
new_test_df.show()

+-----------------+-----------+---------------------+--------------+--------------+-------------+---------------------+------------------+------------+-----------------+
|Total_Stops_index|Route_index|Additional_Info_index|Duration_index|Dep_Time_index|Airline_index|Date_of_Journey_index|Arrival_Time_index|Source_index|Destination_index|
+-----------------+-----------+---------------------+--------------+--------------+-------------+---------------------+------------------+------------+-----------------+
|              0.0|        0.0|                  0.0|         167.0|          48.0|          0.0|                  3.0|              32.0|         0.0|              0.0|
|              0.0|       19.0|                  0.0|         222.0|          69.0|          1.0|                 18.0|             204.0|         1.0|              1.0|
|              0.0|        0.0|                  1.0|         132.0|          74.0|          0.0|                  5.0|             137.0|         0.0

In [None]:
assembler = VectorAssembler(
    inputCols=[x for x in new_test_df.columns],
    outputCol='features')

output_test_df = assembler.transform(new_test_df)
output_test_df.show()

+-----------------+-----------+---------------------+--------------+--------------+-------------+---------------------+------------------+------------+-----------------+--------------------+
|Total_Stops_index|Route_index|Additional_Info_index|Duration_index|Dep_Time_index|Airline_index|Date_of_Journey_index|Arrival_Time_index|Source_index|Destination_index|            features|
+-----------------+-----------+---------------------+--------------+--------------+-------------+---------------------+------------------+------------+-----------------+--------------------+
|              0.0|        0.0|                  0.0|         167.0|          48.0|          0.0|                  3.0|              32.0|         0.0|              0.0|(10,[3,4,6,7],[16...|
|              0.0|       19.0|                  0.0|         222.0|          69.0|          1.0|                 18.0|             204.0|         1.0|              1.0|[0.0,19.0,0.0,222...|
|              0.0|        0.0|              

In [None]:
finalized_test_data = output_test_df.select('features')
finalized_test_data.show()

+--------------------+
|            features|
+--------------------+
|(10,[3,4,6,7],[16...|
|[0.0,19.0,0.0,222...|
|(10,[2,3,4,6,7],[...|
|(10,[3,4,5,6,7],[...|
|[1.0,1.0,0.0,0.0,...|
|(10,[2,3,4,6,7],[...|
|[0.0,64.0,0.0,151...|
|[0.0,14.0,0.0,11....|
|[1.0,3.0,0.0,2.0,...|
|[0.0,2.0,0.0,85.0...|
|[2.0,39.0,1.0,295...|
|[0.0,7.0,0.0,16.0...|
|(10,[3,4,5,6,7],[...|
|[0.0,6.0,0.0,164....|
|(10,[2,3,4,6,7],[...|
|[0.0,6.0,0.0,119....|
|[0.0,7.0,0.0,87.0...|
|[0.0,5.0,0.0,51.0...|
|(10,[2,3,4,6,7],[...|
|[0.0,6.0,0.0,16.0...|
+--------------------+
only showing top 20 rows



In [None]:
train_df.show()

+-----+--------------------+-----------------+---------------+--------+-----------+--------------------+--------+------------+--------+-----------+--------------------+
|Price|            features|          Airline|Date_of_Journey|  Source|Destination|               Route|Dep_Time|Arrival_Time|Duration|Total_Stops|     Additional_Info|
+-----+--------------------+-----------------+---------------+--------+-----------+--------------------+--------+------------+--------+-----------+--------------------+
| 3897|(2136,[1,25,56,61...|           IndiGo|     24/03/2019|Banglore|  New Delhi|           BLR → DEL|   22:20|01:10 22 Mar|  2h 50m|   non-stop|             No info|
| 7662|(2136,[2,30,55,59...|        Air India|      1/05/2019| Kolkata|   Banglore|CCU → IXR → BBI →...|   05:50|       13:15|  7h 25m|    2 stops|             No info|
|13882|(2136,[0,14,54,58...|      Jet Airways|      9/06/2019|   Delhi|     Cochin|DEL → LKO → BOM →...|   09:25|04:25 10 Jun|     19h|    2 stops|        

In [None]:
finalized_train_df = train_df.select("features", "Price")
finalized_train_df.show()

+--------------------+-----+
|            features|Price|
+--------------------+-----+
|(2136,[1,25,56,61...| 3897|
|(2136,[2,30,55,59...| 7662|
|(2136,[0,14,54,58...|13882|
|(2136,[1,31,55,59...| 6218|
|(2136,[1,37,56,61...|13302|
|(2136,[4,21,55,59...| 3873|
|(2136,[0,38,56,61...|11087|
|(2136,[0,37,56,61...|22270|
|(2136,[0,38,56,61...|11087|
|(2136,[3,19,54,58...| 8625|
|(2136,[2,22,54,58...| 8907|
|(2136,[1,52,55,59...| 4174|
|(2136,[2,21,70,20...| 4667|
|(2136,[0,16,55,59...| 9663|
|(2136,[1,48,55,59...| 4804|
|(2136,[2,33,54,58...|14011|
|(2136,[4,50,54,58...| 5830|
|(2136,[0,15,54,58...|10262|
|(2136,[2,15,54,58...|13381|
|(2136,[0,19,54,58...|12898|
+--------------------+-----+
only showing top 20 rows



In [None]:
training_data, testing_data = finalized_train_df.randomSplit([0.7,0.3], seed = 2018)
print("Training Dataset Count: " + str(training_data.count()))
print("Testing Dataset Count: " + str(testing_data.count()))

Training Dataset Count: 7526
Testing Dataset Count: 3157


In [None]:
testing_data.columns

['features', 'Price']

In [None]:
from pyspark.ml.regression import RandomForestRegressor

rfr = RandomForestRegressor(labelCol="Price", featuresCol="features")
rfrModel = rfr.fit(training_data)
predictions = rfrModel.transform(testing_data)
predictions.select("features", "Price", "prediction").show()

+--------------------+-----+------------------+
|            features|Price|        prediction|
+--------------------+-----+------------------+
|(2136,[0,11,54,58...|15554|14548.097598340126|
|(2136,[0,11,54,58...|15554|14548.097598340126|
|(2136,[0,11,54,58...|12373|12190.698518329295|
|(2136,[0,11,54,58...|12373|12190.698518329295|
|(2136,[0,11,54,58...|12373|12190.698518329295|
|(2136,[0,11,54,58...|12373|12190.698518329295|
|(2136,[0,11,54,58...|15554|14548.097598340126|
|(2136,[0,11,54,58...|12373|12190.698518329295|
|(2136,[0,11,54,58...|12373|12190.698518329295|
|(2136,[0,11,54,58...|12373|12190.698518329295|
|(2136,[0,11,54,58...|12373|12190.698518329295|
|(2136,[0,11,54,58...|12373|12190.698518329295|
|(2136,[0,11,54,58...|12373|12190.698518329295|
|(2136,[0,11,54,58...|12373|12190.698518329295|
|(2136,[0,11,54,58...|12373|12190.698518329295|
|(2136,[0,11,54,58...|13029|12869.923046451207|
|(2136,[0,11,54,58...|15129|14139.478314954973|
|(2136,[0,11,54,58...|15129|14139.478314

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="Price", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 1184.32


In [None]:
from pyspark.ml.regression import GBTRegressor

gbt = GBTRegressor(featuresCol="features",labelCol="Price", maxIter=10)
gbtModel = gbt.fit(training_data)
predictions_gbt = gbtModel.transform(testing_data)

predictions_gbt.select("features", "Price", "prediction").show()

+--------------------+-----+------------------+
|            features|Price|        prediction|
+--------------------+-----+------------------+
|(2136,[0,11,54,58...|15554|15860.540874969634|
|(2136,[0,11,54,58...|15554|15860.540874969634|
|(2136,[0,11,54,58...|12373|12136.887067455067|
|(2136,[0,11,54,58...|12373|12136.887067455067|
|(2136,[0,11,54,58...|12373|12136.887067455067|
|(2136,[0,11,54,58...|12373|12136.887067455067|
|(2136,[0,11,54,58...|15554|15860.540874969634|
|(2136,[0,11,54,58...|12373|12136.887067455067|
|(2136,[0,11,54,58...|12373|12136.887067455067|
|(2136,[0,11,54,58...|12373|12136.887067455067|
|(2136,[0,11,54,58...|12373|12136.887067455067|
|(2136,[0,11,54,58...|12373|12136.887067455067|
|(2136,[0,11,54,58...|12373|12136.887067455067|
|(2136,[0,11,54,58...|12373|12136.887067455067|
|(2136,[0,11,54,58...|12373|12136.887067455067|
|(2136,[0,11,54,58...|13029|12851.156908724908|
|(2136,[0,11,54,58...|15129|14734.428095165586|
|(2136,[0,11,54,58...|15129|14734.428095

In [None]:
evaluator = RegressionEvaluator(labelCol="Price", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions_gbt)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 893.155


In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol="Price", rawPredictionCol='prediction')
print("Test_SET (Area Under ROC): " + str(evaluator.evaluate(predictions_gbt, {evaluator.metricName: "areaUnderROC"})))

Test_SET (Area Under ROC): 1.0
