## Install pyspark

In [None]:
!pip install pyspellchecker
!pip install pyspark
!pip install openpyxl
import warnings
warnings.filterwarnings("ignore")

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
from pyspark import SparkContext, SparkConf
# NOTE: the SparkSession is created before this cell, so a standalone SparkConf
# is never picked up by it. `conf` is kept for reference, and the setting is also
# applied through the session's runtime config so that automatic broadcast joins
# are actually disabled.
conf = SparkConf()
conf.set('spark.sql.autoBroadcastJoinThreshold', -1)
spark.conf.set('spark.sql.autoBroadcastJoinThreshold', '-1')

## Import dataset

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
df = pd.read_excel('/kaggle/input/flight-fare-prediction-mh/Data_Train.xlsx')

In [None]:
from pyspark.sql.types import *
# Every column is declared as a nullable string except Price, which is a nullable integer
string_columns = ['Airline', 'Date_of_Journey', 'Source', 'Destination', 'Route', 'Dep_Time',
                  'Arrival_Time', 'Duration', 'Total_Stops', 'Additional_Info']
schema_fields = [StructField(name, StringType(), True) for name in string_columns]
schema_fields.append(StructField('Price', IntegerType(), True))
df_schema = StructType(schema_fields)

# Convert the pandas frame into a Spark DataFrame using the explicit schema
data = spark.createDataFrame(df,schema=df_schema)

In [None]:
data.printSchema()

In [None]:
type(data)

In [None]:
#data.show()
data.toPandas()

#### Check null values and remove them

In [None]:
from pyspark.sql.functions import col,isnan, when, count

In [None]:
# data.select([count(when((col(c) == '') |isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]).show()
data.select([count(when((col(c) == '') |isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]).toPandas()

In [None]:
# data.filter("Route == 'NaN'").show()
data.filter("Route == 'NaN'").toPandas()

In [None]:
df = data.where(data.Route != 'NaN').where(data.Total_Stops !='NaN')
# df.show()
df.toPandas()

In [None]:
df.count()

In [None]:
# df.select([count(when((col(c) == '') |isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()
df.select([count(when((col(c) == '') |isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).toPandas()

In [None]:
from pyspark.sql.functions import *
from datetime import datetime
import pyspark.sql.functions as sqlFunc
import pyspark.sql.types as types
from pyspark.sql import functions as F
from pyspark.sql.functions import to_timestamp

## Handle Numeric Data

#### Handle Dep_Time column








In [None]:
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
#df.select('Date_of_Journey', from_unixtime(unix_timestamp('Date_of_Journey', 'dd/MM/yyyy')))

In [None]:
df.select('Dep_Time').show()

In [None]:
# Split Hour from Dep_Time column
df = df.withColumn('Dep_Hour',hour('Dep_Time'))

In [None]:
# Split minute from Dep_Time column
df = df.withColumn('Dep_Minute',minute('Dep_Time'))

In [None]:
df.select('Dep_Hour','Dep_Minute').show()

In [None]:
df = df.drop('Dep_Time')

#### Handle Date_of_Journey column

In [None]:
# convert data to date
#df = df.withColumn("Date_of_Journey",to_date(col("Date_of_Journey"),"dd/MM/yyyy"))
df = df.withColumn('Date_of_Journey',to_timestamp(col('Date_of_Journey'), 'dd/MM/yyyy'))

In [None]:
# Journey Month
df = df.withColumn('Journey_Month',month('Date_of_Journey'))

In [None]:
# Journey Day
df = df.withColumn('Journey_Day', dayofmonth('Date_of_Journey'))

In [None]:
df = df.drop('Date_of_Journey')

In [None]:
df.select('Journey_Day','Journey_Month').show()

In [None]:
df.printSchema()

#### Handle Arrival_Time column

In [None]:
df =df.withColumn('Arrival_Time', split(col('Arrival_Time'), ' ').getItem(0))

In [None]:
df.select('Arrival_Time').show()

In [None]:
df = df.withColumn('Arrival_Hour', hour('Arrival_Time'))

In [None]:
df = df.withColumn('Arrival_Minute', minute('Arrival_Time'))

In [None]:
df.select('Arrival_Hour','Arrival_Minute').show()

In [None]:
df = df.drop('Arrival_Time')

#### Handle Duration column

In [None]:
df.select('Duration').show()

In [None]:
# Split hour and minute from duration column
# Pull the "<n>h" and "<n>m" tokens out by pattern instead of by position, so a
# minutes-only duration such as "5m" is not mistaken for the hour part.
# A missing token is left as null (when() without otherwise()), which the
# null-handling steps that follow replace with '0h' / '0m'.
hour_token = regexp_extract(df['Duration'], r'(\d+h)', 1)
minute_token = regexp_extract(df['Duration'], r'(\d+m)', 1)
df = df.withColumn('Duration_Hour', when(hour_token != '', hour_token)).withColumn('Duration_Minute', when(minute_token != '', minute_token))

In [None]:
df.select('Duration_Hour',"Duration_Minute").show()

In [None]:
# Handle null values in Duration_Hour 
df = df.withColumn("Duration_Hour", expr("CASE WHEN Duration_Hour IS NULL THEN '0h' " +  "ELSE Duration_Hour END"))

In [None]:
# Handle null values in Duration_Minute
df = df.withColumn("Duration_Minute", expr("CASE WHEN Duration_Minute IS NULL THEN '0m' " +  "ELSE Duration_Minute END"))

In [None]:
df.select('Duration_Hour','Duration_Minute').show()

In [None]:
# get values minute 
df = df.withColumn("Duration_Minute", regexp_extract("Duration_Minute", r'(\d+)m' , 1 ))

In [None]:
# get values hour
df = df.withColumn("Duration_Hour", regexp_extract("Duration_Hour", r'(\d+)h' , 1 ))

In [None]:
df.select('Duration_Hour','Duration_Minute').show()

In [None]:
df = df.withColumn('Duration_Hour', df.Duration_Hour.cast(IntegerType()))

In [None]:
# Cast the minute column from its own values; casting Duration_Hour here would
# overwrite the minutes with a copy of the hours.
df = df.withColumn('Duration_Minute', df.Duration_Minute.cast(IntegerType()))

In [None]:
df= df.drop('Duration')

In [None]:
df.select('Price','Journey_Day','Journey_Month','Dep_Hour','Dep_Minute','Arrival_Hour',
                       'Arrival_Minute','Duration_Hour','Duration_Minute').show()

## Handling Categorical Data

In [None]:
df.select('Airline', 'Source', 'Destination', 'Route', 'Total_Stops', 'Additional_Info').show()

#### Handle Airline column

In [None]:
df.select(countDistinct('Airline')).show()

In [None]:
df.select('Airline').distinct().collect()

In [None]:
df.groupBy('Airline').count().alias('Count').sort(col('Count').desc()).show()

In [None]:
data = df.toPandas()

In [None]:
plt.figure(figsize=(15,8))
sns.boxplot(y='Price',x='Airline',data=data.sort_values('Price',ascending=False));
plt.xticks(rotation = 45);

In [None]:
plt.figure(figsize=(15,8))
sns.boxplot(y='Price',x='Total_Stops',data=data.sort_values('Price',ascending=False));

In [None]:
import pyspark.sql.functions as F 
# One-hot encode Airline: one 0/1 indicator column per distinct value, named after the value
categ_air = [row[0] for row in df.select('Airline').distinct().collect()]
exprs_air = []
for cat in categ_air:
    airline_flag = F.when(F.col('Airline') == cat,1).otherwise(0)
    exprs_air.append(airline_flag.alias(str(cat)))
# Indicator columns go first, followed by all existing columns
df = df.select(exprs_air + df.columns)

In [None]:
test = df.select('Air India','GoAir','IndiGo','Jet Airways','Jet Airways Business','Multiple carriers',
                    'Multiple carriers Premium economy','SpiceJet','Trujet','Vistara','Vistara Premium economy') # Air Asia

In [None]:
test.toPandas().head()

In [None]:
df = df.drop('Air Asia').drop('Airline')

#### Handle Source column

In [None]:
df.select(countDistinct('Source')).show()

In [None]:
df.select('Source').distinct().collect()

In [None]:
df.groupBy('Source').count().alias('Count').sort(col('Count').desc()).show()

In [None]:
plt.figure(figsize=(15,8));
sns.catplot(y='Price',x='Source',data=data.sort_values('Price',ascending=False),kind='boxen');
plt.xticks(rotation = 0);

In [None]:
# One-hot encode Source: one 0/1 indicator column per distinct value, suffixed with '_Sour'
categ_sou = [row[0] for row in df.select('Source').distinct().collect()]
exprs_sou = []
for cat in categ_sou:
    source_flag = F.when(F.col('Source') == cat,1).otherwise(0)
    exprs_sou.append(source_flag.alias(str(cat)+'_Sour'))
# Indicator columns go first, followed by all existing columns
df = df.select(exprs_sou+df.columns)

In [None]:
df.select('Chennai_Sour','Delhi_Sour','Kolkata_Sour','Mumbai_Sour').show()

In [None]:
df = df.drop('Banglore_Sour').drop('Source')

In [None]:
df.printSchema()

#### Handle Destination column

In [None]:
df.select(countDistinct('Destination')).show()

In [None]:
df.select('Destination').distinct().collect()

In [None]:
df.groupBy('Destination').count().alias('Count').sort(col('Count').desc()).show()

In [None]:
plt.figure(figsize=(15,8));
sns.catplot(y='Price',x='Destination',data=data.sort_values('Price',ascending=False),kind='boxen');
plt.xticks(rotation = 45);

In [None]:
# One-hot encode Destination: one 0/1 indicator column per distinct value, suffixed with '_Des'
categ_des = [row[0] for row in df.select('Destination').distinct().collect()]
exprs_des = []
for cat in categ_des:
    destination_flag = F.when(F.col('Destination') == cat,1).otherwise(0)
    exprs_des.append(destination_flag.alias(str(cat)+'_Des'))
# Indicator columns go first, followed by all existing columns
df = df.select(exprs_des +df.columns)

In [None]:
df = df.drop('Banglore_Des').drop('Destination')

#### Handle Route column

In [None]:
df.select('Route').show()

In [None]:
# Split the route on the arrow into up to five legs (Route_1 .. Route_5).
# Each leg is trimmed: splitting on '→' alone keeps the surrounding spaces, so
# the same airport code would otherwise appear as different strings depending
# on whether it is a first, middle or last leg. Legs beyond the end of a
# shorter route come out as null.
route_legs = split(df['Route'],'→')
for leg_index in range(5):
    df = df.withColumn('Route_' + str(leg_index + 1), trim(route_legs.getItem(leg_index)))

In [None]:
df = df.drop('Route').drop('Additional_Info')

In [None]:
df.select('Route_1','Route_2','Route_3','Route_4','Route_5').show()

In [None]:
df = df.na.fill("None",subset=['Route_1','Route_2','Route_3','Route_4','Route_5'])

In [None]:
df.select('Route_1','Route_2','Route_3','Route_4','Route_5').show()

In [None]:
e = df.select('Route_1','Route_2','Route_3','Route_4','Route_5')

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

In [None]:
for i in e.columns:
    indexer = StringIndexer(inputCol=i, outputCol=i+ "_Label")
    df = indexer.fit(df).transform(df)

In [None]:
df.printSchema()

In [None]:
df.select('Route_1_Label','Route_2_Label','Route_3_Label','Route_4_Label','Route_5_Label').show()

In [None]:
# Drop Route_1, Route_2, Route_3, Route_4, Route_5 columns
df = df.drop('Route_1').drop('Route_2').drop('Route_3').drop('Route_4').drop('Route_5')

In [None]:
# Convert the Route_1_Label through Route_5_Label columns from DoubleType to IntegerType
for label_column in ['Route_1_Label', 'Route_2_Label', 'Route_3_Label', 'Route_4_Label', 'Route_5_Label']:
    df = df.withColumn(label_column, col(label_column).cast(IntegerType()))

In [None]:
df.select('Route_1_Label','Route_2_Label','Route_3_Label','Route_4_Label','Route_5_Label').show()

In [None]:
df.printSchema()

#### Handle Total_Stops column

In [None]:
df.select(countDistinct('Total_Stops')).show()

In [None]:
df.select('Total_Stops').distinct().collect()

In [None]:
df.groupBy('Total_Stops').count().alias('Count').sort(col('Count').desc()).show()

In [None]:
plt.figure(figsize=(15,8));
sns.catplot(y='Price',x='Total_Stops',data=data.sort_values('Price',ascending=False),kind='boxen');
plt.xticks(rotation = 45);

In [None]:
df = df.withColumn('Total_Stops',when(col('Total_Stops')=='1 stop',1)
                                                    .when(col('Total_Stops')=='2 stops',2)
                                                    .when(col('Total_Stops')=='3 stops',3)
                                                    .when(col('Total_Stops')=='4 stops',4)
                                                    .otherwise(0))

In [None]:
df.select('Total_Stops').show()

In [None]:
df.printSchema()

## Outlier Detection

In [None]:
def plot(df,col):
    """Draw the distribution of one column above its box plot.

    df  -- object indexed as df[col] to obtain the values to plot
    col -- key of the column to plot
    """
    # Two stacked axes: distribution on top, box plot underneath
    _, (dist_ax, box_ax) = plt.subplots(2,1)
    column_values = df[col]
    # NOTE(review): distplot is deprecated in recent seaborn releases — confirm the installed version
    sns.distplot(column_values,ax=dist_ax)
    sns.boxplot(column_values,ax=box_ax)

In [None]:
# Collect the Spark DataFrame to pandas once and reuse it for plotting;
# calling toPandas() a second time would re-run the whole Spark job.
dataPlot = df.toPandas()
plt.figure(figsize=(30,20));
plot(dataPlot,'Price');

In [None]:
dataPlot['Price']=np.where(dataPlot['Price']>=40000,dataPlot['Price'].mean(),dataPlot['Price'])

In [None]:
# Mean of Price over the current DataFrame, collected to the driver as a Python number
price_mean = df.select(F.mean(F.col('Price')).alias('mean')).collect()[0]['mean']
# Replacement value: the mean rounded and cast to an integer
rounded_mean = F.round(F.lit(price_mean)).cast('Integer')
# Prices of 40000 or more are replaced by the rounded mean; all others are kept
df = df.withColumn('Price', F.when((df['Price'] >=40000), rounded_mean).otherwise(F.col('Price')))

In [None]:
plt.figure(figsize=(30,20));
plot(dataPlot,'Price') ; 

In [None]:
# Split data to Data Train and Data Test 
(trainDF, testDF) = df.randomSplit([.8, .2], seed=1)

In [None]:
from pyspark.ml.feature import VectorAssembler

# Columns packed into the 'features' vector, in this order
feature_columns = ['Total_Stops', 'Route_1_Label', 'Route_2_Label', 'Route_3_Label', 'Route_4_Label',
                   'Route_5_Label','Air India', 'GoAir', 'IndiGo', 'Jet Airways', 'Jet Airways Business',
                   'Multiple carriers', 'Multiple carriers Premium economy', 'SpiceJet',
                   'Trujet', 'Vistara', 'Vistara Premium economy','Kolkata_Des','Delhi_Des',
                   'Cochin_Des','New Delhi_Des','Hyderabad_Des','Chennai_Sour','Mumbai_Sour','Kolkata_Sour',
                   'Delhi_Sour', 'Journey_Day', 'Journey_Month', 'Dep_Hour',
                   'Dep_Minute', 'Arrival_Hour', 'Arrival_Minute',
                   'Duration_Hour', 'Duration_Minute']
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

# "skip" is set on the assembler itself before transforming the training split
assembler_train = assembler.setHandleInvalid("skip").transform(trainDF)

# Keep only the assembled vector and the label, and preview three rows
final_train = assembler_train.select('features','Price')
final_train.show(3)

## Build model and pipeline

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer, VectorIndexer, IndexToString

In [None]:
from pyspark.ml.regression import RandomForestRegressor
rfr = RandomForestRegressor(featuresCol = 'features', labelCol='Price', maxDepth=5, numTrees=100)

In [None]:
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol = 'features', labelCol='Price')

In [None]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol = 'features', labelCol='Price')

In [None]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='Price')

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
stages_ = [dt, rfr, gbt, lr]

In [None]:
# The evaluator does not depend on the fitted model, so build it once for all estimators
evaluator = RegressionEvaluator(labelCol="Price", predictionCol="prediction", metricName="rmse")

for stage in stages_:

    # Build and fit one assembler + estimator pipeline per regression method
    pipeline = Pipeline(stages=[assembler, stage])
    model = pipeline.fit(trainDF)

    # Predict on the held-out split
    predictions = model.transform(testDF)

    # Label the output so the metrics printed below can be attributed to a model
    print("Model: %s" % type(stage).__name__)
    predictions.select("prediction","Price", "features").show(5)

    # RMSE
    rmse = evaluator.evaluate(predictions)
    print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

    # R squared: same evaluator, with the metric overridden for this call only
    r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
    print("R Squared (R^2) on test data = %g" % r2)

## Hyperparameter tuning

In [None]:
# Candidate tree depths 1..9 used by the hyperparameter grids
maxDepth = list(range(1, 10))

### Decision tree

In [None]:
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder

# Grid over the decision tree depth, using the maxDepth candidate list
paramGrid_dt = (ParamGridBuilder().addGrid(dt.maxDepth, maxDepth).build())
# Score each candidate by RMSE on the Price label
evaluator = RegressionEvaluator(labelCol="Price", predictionCol="prediction", metricName='rmse')

# Feature assembly followed by the decision tree regressor
pipeline = Pipeline(stages=[assembler, dt])

# 3-fold cross-validation over the grid, with a fixed seed
cv = CrossValidator(estimator=pipeline,evaluator = evaluator,
                    estimatorParamMaps = paramGrid_dt,
                    numFolds=3,seed=42)
  
# fit on TrainDF
# maxBins is set on the same dt object that the pipeline above holds
dt.setMaxBins(40)

Model = cv.fit(trainDF)

# One row per parameter map with its average cross-validation metric, lowest RMSE first
print(pd.DataFrame(list(zip(Model.getEstimatorParamMaps(), Model.avgMetrics)) ,
                     columns=['Regression','RMSE']).sort_values(by="RMSE"))

### Random forest 

In [None]:
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder

# Grid over the number of trees (10 or 100) and the tree depth (maxDepth candidate list)
paramGrid_rfr = ParamGridBuilder().addGrid(rfr.numTrees, [10,100]) \
    .addGrid(rfr.maxDepth, maxDepth) \
    .build()
# Score each candidate by RMSE on the Price label
evaluator = RegressionEvaluator(labelCol="Price", predictionCol="prediction", metricName='rmse')

# Feature assembly followed by the random forest regressor
pipeline = Pipeline(stages=[assembler, rfr])

# 3-fold cross-validation over the grid, with a fixed seed
cv = CrossValidator(estimator=pipeline,evaluator = evaluator,
                    estimatorParamMaps = paramGrid_rfr,
                    numFolds=3,seed=42)
  
# fit on TrainDF
# maxBins is set on the same rfr object that the pipeline above holds
rfr.setMaxBins(40)

Model = cv.fit(trainDF)

# One row per parameter map with its average cross-validation metric, lowest RMSE first
print(pd.DataFrame(list(zip(Model.getEstimatorParamMaps(), Model.avgMetrics)) ,
                     columns=['Regression','RMSE']).sort_values(by="RMSE"))

### Linear regression

In [None]:
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder

# Grid over the regularisation strength (0.1, 0.01) and the elastic-net mixing parameter (0, 1)
paramGrid_lr = ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01]).addGrid(lr.elasticNetParam, [0, 1]).build()
# Score each candidate by RMSE on the Price label
evaluator = RegressionEvaluator(labelCol="Price", predictionCol="prediction", metricName='rmse')

# Feature assembly followed by the linear regression estimator
pipeline = Pipeline(stages=[assembler, lr])

# 3-fold cross-validation over the grid, with a fixed seed
cv = CrossValidator(estimator=pipeline,evaluator = evaluator,
                    estimatorParamMaps = paramGrid_lr,
                    numFolds=3,seed=42)
  
# fit on TrainDF

Model = cv.fit(trainDF)

# One row per parameter map with its average cross-validation metric, lowest RMSE first
print(pd.DataFrame(list(zip(Model.getEstimatorParamMaps(), Model.avgMetrics)) ,
                     columns=['Regression','RMSE']).sort_values(by="RMSE"))

### Gradient boosted tree

In [None]:
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder

# Grid over the tree depth (maxDepth candidate list) and the number of boosting iterations (10 or 100)
paramGrid_gbt = (ParamGridBuilder().addGrid(gbt.maxDepth,maxDepth).addGrid(gbt.maxIter, [10, 100]).build())
    
# Score each candidate by RMSE on the Price label
evaluator_gbt = RegressionEvaluator(labelCol="Price", predictionCol="prediction", metricName='rmse')

# Feature assembly followed by the gradient boosted tree regressor
pipeline = Pipeline(stages=[assembler, gbt])

# 3-fold cross-validation over the grid, with a fixed seed and parallelism set to 4
cv = CrossValidator(estimator=pipeline,evaluator = evaluator_gbt,
                    estimatorParamMaps = paramGrid_gbt,
                    numFolds=3,
                    parallelism=4, seed=42)
  
# fit on TrainDF
# maxBins is set on the same gbt object that the pipeline above holds
gbt.setMaxBins(40)

Model = cv.fit(trainDF)

# One row per parameter map with its average cross-validation metric, lowest RMSE first
print(pd.DataFrame(list(zip(Model.getEstimatorParamMaps(), Model.avgMetrics)) ,
                     columns=['Regression','RMSE']).sort_values(by="RMSE"))

In [None]:
bestPipeline = Model.bestModel 
bestModel = bestPipeline.stages[1]

In [None]:
print('numTrees - ', bestModel.getNumTrees)
print('maxDepth - ', bestModel.getOrDefault('maxDepth'))

In [None]:
# Score the held-out split once and reuse it for both the RMSE and the scatter
# plot, so the plotted points are the same rows the RMSE in the title refers to
# (plotting the full DataFrame would mix in the training rows).
Pred = Model.transform(testDF)
rmse = Model.getEvaluator().evaluate(Pred)
Result = Pred.toPandas()

# Actual price on the x axis against predicted price on the y axis
plt.plot(Result.Price, Result.prediction, 'go')
plt.xlabel('Price')
plt.ylabel('Prediction')
plt.suptitle("Model Performance RMSE: %f" % rmse)
plt.show()

In [None]:
# Importance of each assembled feature according to the best model
importances = bestModel.featureImportances
x_values = list(range(len(importances)))
plt.figure(figsize=(20,10));
# Hand matplotlib a plain array rather than the Spark vector object
plt.bar(x_values, importances.toArray(), orientation = 'vertical')
# Take the tick labels from the assembler so they always match the order of the
# feature vector instead of relying on a second hand-maintained copy of the list
feature_list = list(assembler.getInputCols())
plt.xticks(x_values, feature_list, rotation=90)
plt.ylabel('Importance')
plt.xlabel('Feature')
plt.title('Feature Importances');

In [None]:
featuresDF = pd.DataFrame(list(zip(assembler.getInputCols(), bestModel.featureImportances)), 
                          columns=["feature", "importance"])

In [None]:
featuresDF.sort_values(["importance"], ascending=False)