In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *


import matplotlib.pyplot as plt
import seaborn as sns

from scipy import stats
from scipy.stats import norm, skew 

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler,MinMaxScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression






# Create Spark Session and Read the file

In [100]:

spark = SparkSession.builder.appName("Invistico_Airline").getOrCreate()
df = spark.read.csv('../data/Invistico_Airline.csv',inferSchema=True,header=True)


# Summary of dataset

In [103]:
df.summary

<bound method DataFrame.summary of DataFrame[satisfaction: string, Gender: string, Customer Type: string, Age: int, Type of Travel: string, Class: string, Flight Distance: int, Seat comfort: int, Departure/Arrival time convenient: int, Food and drink: int, Gate location: int, Inflight wifi service: int, Inflight entertainment: int, Online support: int, Ease of Online booking: int, On-board service: int, Leg room service: int, Baggage handling: int, Checkin service: int, Cleanliness: int, Online boarding: int, Departure Delay in Minutes: int, Arrival Delay in Minutes: int]>

In [105]:
df.printSchema()

root
 |-- satisfaction: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Customer Type: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Type of Travel: string (nullable = true)
 |-- Class: string (nullable = true)
 |-- Flight Distance: integer (nullable = true)
 |-- Seat comfort: integer (nullable = true)
 |-- Departure/Arrival time convenient: integer (nullable = true)
 |-- Food and drink: integer (nullable = true)
 |-- Gate location: integer (nullable = true)
 |-- Inflight wifi service: integer (nullable = true)
 |-- Inflight entertainment: integer (nullable = true)
 |-- Online support: integer (nullable = true)
 |-- Ease of Online booking: integer (nullable = true)
 |-- On-board service: integer (nullable = true)
 |-- Leg room service: integer (nullable = true)
 |-- Baggage handling: integer (nullable = true)
 |-- Checkin service: integer (nullable = true)
 |-- Cleanliness: integer (nullable = true)
 |-- Online boarding: integer (nullable = true)

# Checking for null values

In [104]:
from pyspark.sql.functions import col,isnan, when, count

df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()


+------------+------+-------------+---+--------------+-----+---------------+------------+---------------------------------+--------------+-------------+---------------------+----------------------+--------------+----------------------+----------------+----------------+----------------+---------------+-----------+---------------+--------------------------+------------------------+
|satisfaction|Gender|Customer Type|Age|Type of Travel|Class|Flight Distance|Seat comfort|Departure/Arrival time convenient|Food and drink|Gate location|Inflight wifi service|Inflight entertainment|Online support|Ease of Online booking|On-board service|Leg room service|Baggage handling|Checkin service|Cleanliness|Online boarding|Departure Delay in Minutes|Arrival Delay in Minutes|
+------------+------+-------------+---+--------------+-----+---------------+------------+---------------------------------+--------------+-------------+---------------------+----------------------+--------------+----------------------

In [72]:
# df['Arrival Delay in Minutes']
# df.groupBy('Arrival Delay in Minutes').count()



## Imputing null values
### Only column "Arrival Delay in Minutes" has null values

In [107]:

val = float(df.describe("Arrival Delay in Minutes").filter("summary = 'max'").select("Arrival Delay in Minutes").collect()[0].asDict()['Arrival Delay in Minutes'])


In [108]:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCol='Arrival Delay in Minutes',
                 outputCol='ADMI',
                ).setStrategy("median").setMissingValue(val)

df = imputer.fit(df).transform(df)
df = df.drop('Arrival Delay in Minutes')


In [109]:
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()



+------------+------+-------------+---+--------------+-----+---------------+------------+---------------------------------+--------------+-------------+---------------------+----------------------+--------------+----------------------+----------------+----------------+----------------+---------------+-----------+---------------+--------------------------+----+
|satisfaction|Gender|Customer Type|Age|Type of Travel|Class|Flight Distance|Seat comfort|Departure/Arrival time convenient|Food and drink|Gate location|Inflight wifi service|Inflight entertainment|Online support|Ease of Online booking|On-board service|Leg room service|Baggage handling|Checkin service|Cleanliness|Online boarding|Departure Delay in Minutes|ADMI|
+------------+------+-------------+---+--------------+-----+---------------+------------+---------------------------------+--------------+-------------+---------------------+----------------------+--------------+----------------------+----------------+----------------+-----

### split dta into test and train

In [58]:
train, test = df.randomSplit([0.77, 0.33])

In [110]:
print(type(train))

<class 'pyspark.sql.dataframe.DataFrame'>


# ML pipeline

In [111]:

categorical_features = ['satisfaction',
 'Gender',
 'Customer Type',
 'Type of Travel',
 'Class',
 'Seat comfort',
 'Departure/Arrival time convenient',
 'Food and drink',
 'Gate location',
 'Inflight wifi service',
 'Inflight entertainment',
 'Online support',
 'Ease of Online booking',
 'On-board service',
 'Leg room service',
 'Baggage handling',
 'Checkin service',
 'Cleanliness',
 'Online boarding']

continues_features = ['Age',
 'Flight Distance',
 'Departure Delay in Minutes',
 'ADMI']


pipeline_list=[]
featuress = []

# StringIndexer to convert string values into numerical
[pipeline_list.append(StringIndexer(inputCol=c, outputCol= c+'_indexed')) for c in categorical_features]
[featuress.append(c+'_indexed') for c in categorical_features]


# Feature Scaling
assembler = VectorAssembler(inputCols=continues_features, outputCol='continuous_features')
pipeline_list.append(assembler)

minmaxscalar = MinMaxScaler(inputCol='continuous_features', outputCol='continuous_features_scaled')
pipeline_list.append(minmaxscalar)
featuress.append('continuous_features_scaled')

assembler_final = VectorAssembler(inputCols=['satisfaction_indexed', 'Gender_indexed', 'Customer Type_indexed', 'Type of Travel_indexed', 'Class_indexed', 'Seat comfort_indexed', 'Departure/Arrival time convenient_indexed', 'Food and drink_indexed', 'Gate location_indexed', 'Inflight wifi service_indexed', 'Inflight entertainment_indexed', 'Online support_indexed', 'Ease of Online booking_indexed', 'On-board service_indexed', 'Leg room service_indexed', 'Baggage handling_indexed', 'Checkin service_indexed', 'Cleanliness_indexed', 'Online boarding_indexed', 'continuous_features_scaled'],
                                  outputCol='features')
pipeline_list.append(assembler_final)


label_encoder = StringIndexer(inputCol='satisfaction', outputCol= 'satisfaction_encoded')
pipeline_list.append(label_encoder)
featuress.append('satisfaction_encoded')

# ml model
model = LogisticRegression(featuresCol = 'features', labelCol = 'satisfaction_encoded', maxIter=5)
pipeline_list.append(model)

pipeline = Pipeline(stages=pipeline_list)

pipeline_model = pipeline.fit(train)
data_train = pipeline_model.transform(train)


In [114]:

predictions = pipeline_model.transform(test)
predictions.head(1)
# predictions.select('satisfaction', 'features',  'rawPrediction', 'prediction', 'probability').toPandas().head(5)

[Row(satisfaction='dissatisfied', Gender='Female', Customer Type='Loyal Customer', Age=7, Type of Travel='Personal Travel', Class='Business', Flight Distance=2295, Seat comfort=4, Departure/Arrival time convenient=4, Food and drink=4, Gate location=3, Inflight wifi service=1, Inflight entertainment=4, Online support=1, Ease of Online booking=1, On-board service=5, Leg room service=4, Baggage handling=4, Checkin service=3, Cleanliness=4, Online boarding=1, Departure Delay in Minutes=24, ADMI=10, satisfaction_indexed=1.0, Gender_indexed=0.0, Customer Type_indexed=0.0, Type of Travel_indexed=1.0, Class_indexed=0.0, Seat comfort_indexed=2.0, Departure/Arrival time convenient_indexed=0.0, Food and drink_indexed=2.0, Gate location_indexed=0.0, Inflight wifi service_indexed=4.0, Inflight entertainment_indexed=0.0, Online support_indexed=4.0, Ease of Online booking_indexed=4.0, On-board service_indexed=1.0, Leg room service_indexed=0.0, Baggage handling_indexed=0.0, Checkin service_indexed=1.0

## Prediction Schema

In [113]:
data_train.printSchema()

root
 |-- satisfaction: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Customer Type: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Type of Travel: string (nullable = true)
 |-- Class: string (nullable = true)
 |-- Flight Distance: integer (nullable = true)
 |-- Seat comfort: integer (nullable = true)
 |-- Departure/Arrival time convenient: integer (nullable = true)
 |-- Food and drink: integer (nullable = true)
 |-- Gate location: integer (nullable = true)
 |-- Inflight wifi service: integer (nullable = true)
 |-- Inflight entertainment: integer (nullable = true)
 |-- Online support: integer (nullable = true)
 |-- Ease of Online booking: integer (nullable = true)
 |-- On-board service: integer (nullable = true)
 |-- Leg room service: integer (nullable = true)
 |-- Baggage handling: integer (nullable = true)
 |-- Checkin service: integer (nullable = true)
 |-- Cleanliness: integer (nullable = true)
 |-- Online boarding: integer (nullable = true)