## Data processing Stage

In [1]:
import os
os.environ["PYARROW_IGNORE_TIMEZONE"]='1'

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .master("local") \
    .appName("Airlines") \
    .getOrCreate()

import pyspark.sql.functions as F

In [2]:
data = spark.read.csv(".//archive/train.csv", sep=',', encoding="UTF-8", header=True, inferSchema=True)
data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Customer Type: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Type of Travel: string (nullable = true)
 |-- Class: string (nullable = true)
 |-- Flight Distance: integer (nullable = true)
 |-- Inflight wifi service: integer (nullable = true)
 |-- Departure/Arrival time convenient: integer (nullable = true)
 |-- Ease of Online booking: integer (nullable = true)
 |-- Gate location: integer (nullable = true)
 |-- Food and drink: integer (nullable = true)
 |-- Online boarding: integer (nullable = true)
 |-- Seat comfort: integer (nullable = true)
 |-- Inflight entertainment: integer (nullable = true)
 |-- On-board service: integer (nullable = true)
 |-- Leg room service: integer (nullable = true)
 |-- Baggage handling: integer (nullable = true)
 |-- Checkin service: integer (nullable = true)
 |-- Inflight service: integer (nullable = true)
 |-- Cleanline

In [3]:
data = data.drop('_c0')
data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Customer Type: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Type of Travel: string (nullable = true)
 |-- Class: string (nullable = true)
 |-- Flight Distance: integer (nullable = true)
 |-- Inflight wifi service: integer (nullable = true)
 |-- Departure/Arrival time convenient: integer (nullable = true)
 |-- Ease of Online booking: integer (nullable = true)
 |-- Gate location: integer (nullable = true)
 |-- Food and drink: integer (nullable = true)
 |-- Online boarding: integer (nullable = true)
 |-- Seat comfort: integer (nullable = true)
 |-- Inflight entertainment: integer (nullable = true)
 |-- On-board service: integer (nullable = true)
 |-- Leg room service: integer (nullable = true)
 |-- Baggage handling: integer (nullable = true)
 |-- Checkin service: integer (nullable = true)
 |-- Inflight service: integer (nullable = true)
 |-- Cleanliness: integer (nullable = true)
 |-- D

In [4]:
data = data.withColumnRenamed("satisfaction", "Satisfaction")

In [5]:
data.show(1, vertical=True)

-RECORD 0-------------------------------------------------
 id                                | 70172                
 Gender                            | Male                 
 Customer Type                     | Loyal Customer       
 Age                               | 13                   
 Type of Travel                    | Personal Travel      
 Class                             | Eco Plus             
 Flight Distance                   | 460                  
 Inflight wifi service             | 3                    
 Departure/Arrival time convenient | 4                    
 Ease of Online booking            | 3                    
 Gate location                     | 1                    
 Food and drink                    | 5                    
 Online boarding                   | 3                    
 Seat comfort                      | 5                    
 Inflight entertainment            | 5                    
 On-board service                  | 4                  

In [6]:
data.select(F.col('id'), 'Type of Travel', 'Gender', 'Satisfaction').show(5)

+------+---------------+------+--------------------+
|    id| Type of Travel|Gender|        Satisfaction|
+------+---------------+------+--------------------+
| 70172|Personal Travel|  Male|neutral or dissat...|
|  5047|Business travel|  Male|neutral or dissat...|
|110028|Business travel|Female|           satisfied|
| 24026|Business travel|Female|neutral or dissat...|
|119299|Business travel|  Male|           satisfied|
+------+---------------+------+--------------------+
only showing top 5 rows



In [7]:
data.summary().pandas_api()

Unnamed: 0,summary,id,Gender,Customer Type,Age,Type of Travel,Class,Flight Distance,Inflight wifi service,Departure/Arrival time convenient,Ease of Online booking,Gate location,Food and drink,Online boarding,Seat comfort,Inflight entertainment,On-board service,Leg room service,Baggage handling,Checkin service,Inflight service,Cleanliness,Departure Delay in Minutes,Arrival Delay in Minutes,Satisfaction
0,count,103904.0,103904,103904,103904.0,103904,103904,103904.0,103904.0,103904.0,103904.0,103904.0,103904.0,103904.0,103904.0,103904.0,103904.0,103904.0,103904.0,103904.0,103904.0,103904.0,103904.0,103594.0,103904
1,mean,64924.21050200185,,,39.37970626732368,,,1189.4483754234675,2.72968316907915,3.06029604250077,2.7569006005543577,2.9768825069294733,3.2021288882044963,3.250375346473668,3.4393959809054517,3.358157530027718,3.382362565445026,3.3510548198336925,3.6318332306744687,3.304290498922082,3.6404277024946095,3.2863508623344626,14.815618263012013,15.178678301832152,
2,stddev,37463.8122515513,,,15.114963699737896,,,997.1472805289563,1.3278294712362229,1.5250751972834864,1.398929472659151,1.2776210103002337,1.329532710858201,1.349508953713335,1.319087518617336,1.332990714627123,1.2883543609017745,1.315604619325567,1.180903411075977,1.2653958269270715,1.1756630338758984,1.3122728470665397,38.23090058414181,38.6986820209665,
3,min,1.0,Female,Loyal Customer,7.0,Business travel,Business,31.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,neutral or dissatisfied
4,25%,32537.0,,,27.0,,,414.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,3.0,3.0,3.0,2.0,0.0,0.0,
5,50%,64858.0,,,40.0,,,843.0,3.0,3.0,3.0,3.0,3.0,3.0,4.0,4.0,4.0,4.0,4.0,3.0,4.0,3.0,0.0,0.0,
6,75%,97363.0,,,51.0,,,1743.0,4.0,4.0,4.0,4.0,4.0,4.0,5.0,4.0,4.0,4.0,5.0,4.0,5.0,4.0,12.0,13.0,
7,max,129880.0,Male,disloyal Customer,85.0,Personal Travel,Eco Plus,4983.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,1592.0,1584.0,satisfied


In [8]:
(data
    .groupBy('Type of Travel')
    .agg(F.count('Type of Travel').alias('Count'),
         F.avg('Arrival Delay in Minutes').alias('Avg Delay')
    )
    .orderBy(F.col('Count').desc())
    .show(5)
)
data.select('Type of Travel', 'Class').distinct().show()

+---------------+-----+------------------+
| Type of Travel|Count|         Avg Delay|
+---------------+-----+------------------+
|Business travel|71655|15.326145665710488|
|Personal Travel|32249|14.850664508699307|
+---------------+-----+------------------+

+---------------+--------+
| Type of Travel|   Class|
+---------------+--------+
|Business travel|Business|
|Business travel|Eco Plus|
|Personal Travel|     Eco|
|Personal Travel|Business|
|Personal Travel|Eco Plus|
|Business travel|     Eco|
+---------------+--------+



In [9]:
(data.groupBy('Customer Type', 'Gender')
     .agg(F.count('Customer Type').alias('Count'))
     .show()
)

(data.groupBy('Gender')
     .agg(F.min('Age').alias('Min'),
          F.max('Age').alias('Max'),
          F.avg('Age').alias('Avg'))
     .show()
)

data.groupBy("Class").pivot("Customer Type").count().show()

+-----------------+------+-----+
|    Customer Type|Gender|Count|
+-----------------+------+-----+
|disloyal Customer|  Male| 8712|
|   Loyal Customer|  Male|42465|
|   Loyal Customer|Female|42458|
|disloyal Customer|Female|10269|
+-----------------+------+-----+

+------+---+---+------------------+
|Gender|Min|Max|               Avg|
+------+---+---+------------------+
|Female|  7| 85|  39.2467616211808|
|  Male|  7| 85|39.516677413681926|
+------+---+---+------------------+

+--------+--------------+-----------------+
|   Class|Loyal Customer|disloyal Customer|
+--------+--------------+-----------------+
|Eco Plus|          6779|              715|
|Business|         42309|             7356|
|     Eco|         35835|            10910|
+--------+--------------+-----------------+



In [12]:
num_col = [name[0] for name in data.dtypes if name[1] != 'string']
datacorr = data.select(num_col)

In [11]:
datacorr.pandas_api().corr().style.background_gradient(cmap='coolwarm').format(precision=2)

Unnamed: 0,id,Age,Flight Distance,Inflight wifi service,Departure/Arrival time convenient,Ease of Online booking,Gate location,Food and drink,Online boarding,Seat comfort,Inflight entertainment,On-board service,Leg room service,Baggage handling,Checkin service,Inflight service,Cleanliness,Departure Delay in Minutes,Arrival Delay in Minutes
id,1.0,0.02,0.1,-0.02,-0.0,0.01,-0.0,0.0,0.06,0.05,0.0,0.06,0.04,0.07,0.08,0.08,0.02,-0.02,-0.04
Age,0.02,1.0,0.1,0.02,0.04,0.02,-0.0,0.02,0.21,0.16,0.08,0.06,0.04,-0.05,0.04,-0.05,0.05,-0.01,-0.01
Flight Distance,0.1,0.1,1.0,0.01,-0.02,0.07,0.0,0.06,0.21,0.16,0.13,0.11,0.13,0.06,0.07,0.06,0.09,0.0,-0.0
Inflight wifi service,-0.02,0.02,0.01,1.0,0.34,0.72,0.34,0.13,0.46,0.12,0.21,0.12,0.16,0.12,0.04,0.11,0.13,-0.02,-0.02
Departure/Arrival time convenient,-0.0,0.04,-0.02,0.34,1.0,0.44,0.44,0.0,0.07,0.01,-0.0,0.07,0.01,0.07,0.09,0.07,0.01,0.0,-0.0
Ease of Online booking,0.01,0.02,0.07,0.72,0.44,1.0,0.46,0.03,0.4,0.03,0.05,0.04,0.11,0.04,0.01,0.04,0.02,-0.01,-0.01
Gate location,-0.0,-0.0,0.0,0.34,0.44,0.46,1.0,-0.0,0.0,0.0,0.0,-0.03,-0.01,0.0,-0.04,0.0,-0.0,0.01,0.01
Food and drink,0.0,0.02,0.06,0.13,0.0,0.03,-0.0,1.0,0.23,0.57,0.62,0.06,0.03,0.03,0.09,0.03,0.66,-0.03,-0.03
Online boarding,0.06,0.21,0.21,0.46,0.07,0.4,0.0,0.23,1.0,0.42,0.29,0.16,0.12,0.08,0.2,0.07,0.33,-0.02,-0.02
Seat comfort,0.05,0.16,0.16,0.12,0.01,0.03,0.0,0.57,0.42,1.0,0.61,0.13,0.11,0.07,0.19,0.07,0.68,-0.03,-0.03


In [17]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

In [34]:
categoricalColumns = [name[0] for name in data.dtypes if name[1] == 'string']
categoricalColumns

['Gender', 'Customer Type', 'Type of Travel', 'Class', 'Satisfaction']

In [22]:
stages = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol+' Index', handleInvalid='keep')
    stages += [stringIndexer]

In [26]:
pipeline = Pipeline(stages=stages)
model = pipeline.fit(data)
data_indexed = model.transform(data).drop(*categoricalColumns)

In [266]:
data_indexed.printSchema()
data_indexed.summary().pandas_api()

root
 |-- id: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Flight Distance: integer (nullable = true)
 |-- Inflight wifi service: integer (nullable = true)
 |-- Departure/Arrival time convenient: integer (nullable = true)
 |-- Gate location: integer (nullable = true)
 |-- Food and drink: integer (nullable = true)
 |-- Online boarding: integer (nullable = true)
 |-- On-board service: integer (nullable = true)
 |-- Leg room service: integer (nullable = true)
 |-- Baggage handling: integer (nullable = true)
 |-- Checkin service: integer (nullable = true)
 |-- Inflight service: integer (nullable = true)
 |-- Cleanliness: integer (nullable = true)
 |-- Departure Delay in Minutes: integer (nullable = true)
 |-- Gender Index: byte (nullable = true)
 |-- Customer Type Index: byte (nullable = true)
 |-- Type of Travel Index: byte (nullable = true)
 |-- Class Index: byte (nullable = true)
 |-- Satisfaction Index: byte (nullable = true)



Unnamed: 0,summary,id,Age,Flight Distance,Inflight wifi service,Departure/Arrival time convenient,Gate location,Food and drink,Online boarding,On-board service,Leg room service,Baggage handling,Checkin service,Inflight service,Cleanliness,Departure Delay in Minutes,Gender Index,Customer Type Index,Type of Travel Index,Class Index,Satisfaction Index
0,count,103904.0,103904.0,103904.0,103904.0,103904.0,103904.0,103904.0,103904.0,103904.0,103904.0,103904.0,103904.0,103904.0,103904.0,103904.0,103904.0,103904.0,103904.0,103904.0,103904.0
1,mean,64924.21050200185,39.37970626732368,1189.4483754234675,2.72968316907915,3.06029604250077,2.9768825069294733,3.2021288882044963,3.250375346473668,3.382362565445026,3.3510548198336925,3.6318332306744687,3.304290498922082,3.6404277024946095,3.2863508623344626,14.815618263012013,0.4925411918694179,0.1826782414536495,0.3103730366492147,0.5941349707422235,0.4333326917154296
2,stddev,37463.8122515513,15.114963699737896,997.1472805289563,1.3278294712362229,1.5250751972834864,1.2776210103002337,1.329532710858201,1.349508953713335,1.2883543609017745,1.315604619325567,1.180903411075977,1.2653958269270715,1.1756630338758984,1.3122728470665397,38.23090058414181,0.4999467689025778,0.3864043717876392,0.4626485434800056,0.620798561128819,0.4955379231911788
3,min,1.0,7.0,31.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,25%,32537.0,27.0,414.0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,3.0,3.0,3.0,2.0,0.0,0.0,0.0,0.0,0.0,0.0
5,50%,64858.0,40.0,843.0,3.0,3.0,3.0,3.0,3.0,4.0,4.0,4.0,3.0,4.0,3.0,0.0,0.0,0.0,0.0,1.0,0.0
6,75%,97363.0,51.0,1743.0,4.0,4.0,4.0,4.0,4.0,4.0,4.0,5.0,4.0,5.0,4.0,12.0,1.0,0.0,1.0,1.0,1.0
7,max,129880.0,85.0,4983.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,1592.0,1.0,1.0,1.0,2.0,1.0


In [29]:
data_indexed.pandas_api().corr().style.background_gradient(cmap='coolwarm').format(precision=2)

Unnamed: 0,id,Age,Flight Distance,Inflight wifi service,Departure/Arrival time convenient,Ease of Online booking,Gate location,Food and drink,Online boarding,Seat comfort,Inflight entertainment,On-board service,Leg room service,Baggage handling,Checkin service,Inflight service,Cleanliness,Departure Delay in Minutes,Arrival Delay in Minutes,Gender Index,Customer Type Index,Type of Travel Index,Class Index,Satisfaction Index
id,1.0,0.02,0.1,-0.02,-0.0,0.01,-0.0,0.0,0.06,0.05,0.0,0.06,0.04,0.07,0.08,0.08,0.02,-0.02,-0.04,-0.0,0.0,0.0,-0.1,0.01
Age,0.02,1.0,0.1,0.02,0.04,0.02,-0.0,0.02,0.21,0.16,0.08,0.06,0.04,-0.05,0.04,-0.05,0.05,-0.01,-0.01,0.01,-0.28,-0.05,-0.12,0.14
Flight Distance,0.1,0.1,1.0,0.01,-0.02,0.07,0.0,0.06,0.21,0.16,0.13,0.11,0.13,0.06,0.07,0.06,0.09,0.0,-0.0,0.01,-0.23,-0.27,-0.43,0.3
Inflight wifi service,-0.02,0.02,0.01,1.0,0.34,0.72,0.34,0.13,0.46,0.12,0.21,0.12,0.16,0.12,0.04,0.11,0.13,-0.02,-0.02,0.01,-0.01,-0.11,-0.02,0.28
Departure/Arrival time convenient,-0.0,0.04,-0.02,0.34,1.0,0.44,0.44,0.0,0.07,0.01,-0.0,0.07,0.01,0.07,0.09,0.07,0.01,0.0,-0.0,0.01,-0.21,0.26,0.09,-0.05
Ease of Online booking,0.01,0.02,0.07,0.72,0.44,1.0,0.46,0.03,0.4,0.03,0.05,0.04,0.11,0.04,0.01,0.04,0.02,-0.01,-0.01,0.01,-0.02,-0.13,-0.09,0.17
Gate location,-0.0,-0.0,0.0,0.34,0.44,0.46,1.0,-0.0,0.0,0.0,0.0,-0.03,-0.01,0.0,-0.04,0.0,-0.0,0.01,0.01,0.0,0.01,-0.03,-0.0,0.0
Food and drink,0.0,0.02,0.06,0.13,0.0,0.03,-0.0,1.0,0.23,0.57,0.62,0.06,0.03,0.03,0.09,0.03,0.66,-0.03,-0.03,0.01,-0.06,-0.06,-0.08,0.21
Online boarding,0.06,0.21,0.21,0.46,0.07,0.4,0.0,0.23,1.0,0.42,0.29,0.16,0.12,0.08,0.2,0.07,0.33,-0.02,-0.02,-0.04,-0.19,-0.22,-0.3,0.5
Seat comfort,0.05,0.16,0.16,0.12,0.01,0.03,0.0,0.57,0.42,1.0,0.61,0.13,0.11,0.07,0.19,0.07,0.68,-0.03,-0.03,-0.03,-0.16,-0.12,-0.21,0.35


In [31]:
data_indexed = data_indexed.drop('Seat comfort', 'Inflight entertainment', 'Arrival Delay in Minutes')
data_indexed.pandas_api().corr().style.background_gradient(cmap='coolwarm').format(precision=2)

Unnamed: 0,id,Age,Flight Distance,Inflight wifi service,Departure/Arrival time convenient,Ease of Online booking,Gate location,Food and drink,Online boarding,On-board service,Leg room service,Baggage handling,Checkin service,Inflight service,Cleanliness,Departure Delay in Minutes,Gender Index,Customer Type Index,Type of Travel Index,Class Index,Satisfaction Index
id,1.0,0.02,0.1,-0.02,-0.0,0.01,-0.0,0.0,0.06,0.06,0.04,0.07,0.08,0.08,0.02,-0.02,-0.0,0.0,0.0,-0.1,0.01
Age,0.02,1.0,0.1,0.02,0.04,0.02,-0.0,0.02,0.21,0.06,0.04,-0.05,0.04,-0.05,0.05,-0.01,0.01,-0.28,-0.05,-0.12,0.14
Flight Distance,0.1,0.1,1.0,0.01,-0.02,0.07,0.0,0.06,0.21,0.11,0.13,0.06,0.07,0.06,0.09,0.0,0.01,-0.23,-0.27,-0.43,0.3
Inflight wifi service,-0.02,0.02,0.01,1.0,0.34,0.72,0.34,0.13,0.46,0.12,0.16,0.12,0.04,0.11,0.13,-0.02,0.01,-0.01,-0.11,-0.02,0.28
Departure/Arrival time convenient,-0.0,0.04,-0.02,0.34,1.0,0.44,0.44,0.0,0.07,0.07,0.01,0.07,0.09,0.07,0.01,0.0,0.01,-0.21,0.26,0.09,-0.05
Ease of Online booking,0.01,0.02,0.07,0.72,0.44,1.0,0.46,0.03,0.4,0.04,0.11,0.04,0.01,0.04,0.02,-0.01,0.01,-0.02,-0.13,-0.09,0.17
Gate location,-0.0,-0.0,0.0,0.34,0.44,0.46,1.0,-0.0,0.0,-0.03,-0.01,0.0,-0.04,0.0,-0.0,0.01,0.0,0.01,-0.03,-0.0,0.0
Food and drink,0.0,0.02,0.06,0.13,0.0,0.03,-0.0,1.0,0.23,0.06,0.03,0.03,0.09,0.03,0.66,-0.03,0.01,-0.06,-0.06,-0.08,0.21
Online boarding,0.06,0.21,0.21,0.46,0.07,0.4,0.0,0.23,1.0,0.16,0.12,0.08,0.2,0.07,0.33,-0.02,-0.04,-0.19,-0.22,-0.3,0.5
On-board service,0.06,0.06,0.11,0.12,0.07,0.04,-0.03,0.06,0.16,1.0,0.36,0.52,0.24,0.55,0.12,-0.03,0.01,-0.06,-0.06,-0.21,0.32


In [32]:
data_indexed = data_indexed.drop('Ease of Online booking')
data_indexed.pandas_api().corr().style.background_gradient(cmap='coolwarm').format(precision=2)

Unnamed: 0,id,Age,Flight Distance,Inflight wifi service,Departure/Arrival time convenient,Gate location,Food and drink,Online boarding,On-board service,Leg room service,Baggage handling,Checkin service,Inflight service,Cleanliness,Departure Delay in Minutes,Gender Index,Customer Type Index,Type of Travel Index,Class Index,Satisfaction Index
id,1.0,0.02,0.1,-0.02,-0.0,-0.0,0.0,0.06,0.06,0.04,0.07,0.08,0.08,0.02,-0.02,-0.0,0.0,0.0,-0.1,0.01
Age,0.02,1.0,0.1,0.02,0.04,-0.0,0.02,0.21,0.06,0.04,-0.05,0.04,-0.05,0.05,-0.01,0.01,-0.28,-0.05,-0.12,0.14
Flight Distance,0.1,0.1,1.0,0.01,-0.02,0.0,0.06,0.21,0.11,0.13,0.06,0.07,0.06,0.09,0.0,0.01,-0.23,-0.27,-0.43,0.3
Inflight wifi service,-0.02,0.02,0.01,1.0,0.34,0.34,0.13,0.46,0.12,0.16,0.12,0.04,0.11,0.13,-0.02,0.01,-0.01,-0.11,-0.02,0.28
Departure/Arrival time convenient,-0.0,0.04,-0.02,0.34,1.0,0.44,0.0,0.07,0.07,0.01,0.07,0.09,0.07,0.01,0.0,0.01,-0.21,0.26,0.09,-0.05
Gate location,-0.0,-0.0,0.0,0.34,0.44,1.0,-0.0,0.0,-0.03,-0.01,0.0,-0.04,0.0,-0.0,0.01,0.0,0.01,-0.03,-0.0,0.0
Food and drink,0.0,0.02,0.06,0.13,0.0,-0.0,1.0,0.23,0.06,0.03,0.03,0.09,0.03,0.66,-0.03,0.01,-0.06,-0.06,-0.08,0.21
Online boarding,0.06,0.21,0.21,0.46,0.07,0.0,0.23,1.0,0.16,0.12,0.08,0.2,0.07,0.33,-0.02,-0.04,-0.19,-0.22,-0.3,0.5
On-board service,0.06,0.06,0.11,0.12,0.07,-0.03,0.06,0.16,1.0,0.36,0.52,0.24,0.55,0.12,-0.03,0.01,-0.06,-0.06,-0.21,0.32
Leg room service,0.04,0.04,0.13,0.16,0.01,-0.01,0.03,0.12,0.36,1.0,0.37,0.15,0.37,0.1,0.01,0.03,-0.05,-0.14,-0.2,0.31


In [70]:
data_indexed.pandas_api().head(5)

Unnamed: 0,id,Age,Flight Distance,Inflight wifi service,Departure/Arrival time convenient,Gate location,Food and drink,Online boarding,On-board service,Leg room service,Baggage handling,Checkin service,Inflight service,Cleanliness,Departure Delay in Minutes,Gender Index,Customer Type Index,Type of Travel Index,Class Index,Satisfaction Index
0,70172,13,460,3,4,1,5,3,4,3,4,4,5,5,25,1,0,1,2,0
1,5047,25,235,3,2,3,1,3,1,5,3,1,4,1,1,1,1,0,0,0
2,110028,26,1142,2,2,2,5,5,4,3,4,4,4,5,0,0,0,0,0,1
3,24026,25,562,2,5,5,2,2,2,5,3,1,4,2,11,0,0,0,0,0
4,119299,61,214,3,3,3,4,5,3,4,4,3,3,3,0,1,0,0,0,1


## ML Stage

In [47]:
#
#categoricalColumns = [name[0] for name in data.dtypes if name[1] == 'string']

#for categoricalCol in categoricalColumns:
#    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index', handleInvalid = 'keep')
#    encoder = OneHotEncoder(inputCol = stringIndexer.getOutputCol(), outputCol = categoricalCol + "classVec")
#    stages += [stringIndexer, encoder]

In [144]:
numericColumns = [name[0] for name in data_indexed.dtypes if name[1] != 'string']
numericColumns = numericColumns[1:-1]
numericColumns

['Age',
 'Flight Distance',
 'Inflight wifi service',
 'Departure/Arrival time convenient',
 'Gate location',
 'Food and drink',
 'Online boarding',
 'On-board service',
 'Leg room service',
 'Baggage handling',
 'Checkin service',
 'Inflight service',
 'Cleanliness',
 'Departure Delay in Minutes',
 'Gender Index',
 'Customer Type Index',
 'Type of Travel Index',
 'Class Index']

In [145]:
stages = []
assembler = VectorAssembler(inputCols=numericColumns, outputCol="features")
stages += [assembler]

In [146]:
pipeline = Pipeline(stages=stages)
model = pipeline.fit(data_indexed)
ml_data = model.transform(data_indexed)

In [147]:
ml_data.printSchema()
ml_data.pandas_api().head(5)

root
 |-- id: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Flight Distance: integer (nullable = true)
 |-- Inflight wifi service: integer (nullable = true)
 |-- Departure/Arrival time convenient: integer (nullable = true)
 |-- Gate location: integer (nullable = true)
 |-- Food and drink: integer (nullable = true)
 |-- Online boarding: integer (nullable = true)
 |-- On-board service: integer (nullable = true)
 |-- Leg room service: integer (nullable = true)
 |-- Baggage handling: integer (nullable = true)
 |-- Checkin service: integer (nullable = true)
 |-- Inflight service: integer (nullable = true)
 |-- Cleanliness: integer (nullable = true)
 |-- Departure Delay in Minutes: integer (nullable = true)
 |-- Gender Index: byte (nullable = true)
 |-- Customer Type Index: byte (nullable = true)
 |-- Type of Travel Index: byte (nullable = true)
 |-- Class Index: byte (nullable = true)
 |-- Satisfaction Index: byte (nullable = true)
 |-- features: vector (nullable = true

Unnamed: 0,id,Age,Flight Distance,Inflight wifi service,Departure/Arrival time convenient,Gate location,Food and drink,Online boarding,On-board service,Leg room service,Baggage handling,Checkin service,Inflight service,Cleanliness,Departure Delay in Minutes,Gender Index,Customer Type Index,Type of Travel Index,Class Index,Satisfaction Index,features
0,70172,13,460,3,4,1,5,3,4,3,4,4,5,5,25,1,0,1,2,0,"[13.0, 460.0, 3.0, 4.0, 1.0, 5.0, 3.0, 4.0, 3...."
1,5047,25,235,3,2,3,1,3,1,5,3,1,4,1,1,1,1,0,0,0,"[25.0, 235.0, 3.0, 2.0, 3.0, 1.0, 3.0, 1.0, 5...."
2,110028,26,1142,2,2,2,5,5,4,3,4,4,4,5,0,0,0,0,0,1,"[26.0, 1142.0, 2.0, 2.0, 2.0, 5.0, 5.0, 4.0, 3..."
3,24026,25,562,2,5,5,2,2,2,5,3,1,4,2,11,0,0,0,0,0,"[25.0, 562.0, 2.0, 5.0, 5.0, 2.0, 2.0, 2.0, 5...."
4,119299,61,214,3,3,3,4,5,3,4,4,3,3,3,0,1,0,0,0,1,"[61.0, 214.0, 3.0, 3.0, 3.0, 4.0, 5.0, 3.0, 4...."


In [148]:
ml_data.select('features').show(10, truncate=False)

+-----------------------------------------------------------------------------+
|features                                                                     |
+-----------------------------------------------------------------------------+
|[13.0,460.0,3.0,4.0,1.0,5.0,3.0,4.0,3.0,4.0,4.0,5.0,5.0,25.0,1.0,0.0,1.0,2.0]|
|[25.0,235.0,3.0,2.0,3.0,1.0,3.0,1.0,5.0,3.0,1.0,4.0,1.0,1.0,1.0,1.0,0.0,0.0] |
|[26.0,1142.0,2.0,2.0,2.0,5.0,5.0,4.0,3.0,4.0,4.0,4.0,5.0,0.0,0.0,0.0,0.0,0.0]|
|[25.0,562.0,2.0,5.0,5.0,2.0,2.0,2.0,5.0,3.0,1.0,4.0,2.0,11.0,0.0,0.0,0.0,0.0]|
|[61.0,214.0,3.0,3.0,3.0,4.0,5.0,3.0,4.0,4.0,3.0,3.0,3.0,0.0,1.0,0.0,0.0,0.0] |
|[26.0,1180.0,3.0,4.0,1.0,1.0,2.0,3.0,4.0,4.0,4.0,4.0,1.0,0.0,0.0,0.0,1.0,1.0]|
|[47.0,1276.0,2.0,4.0,3.0,2.0,2.0,3.0,3.0,4.0,3.0,5.0,2.0,9.0,1.0,0.0,1.0,1.0]|
|[52.0,2035.0,4.0,3.0,4.0,5.0,5.0,5.0,5.0,5.0,4.0,5.0,4.0,4.0,0.0,0.0,0.0,0.0]|
|[41.0,853.0,1.0,2.0,2.0,4.0,3.0,1.0,2.0,1.0,4.0,1.0,2.0,0.0,0.0,0.0,0.0,0.0] |
|[20.0,1061.0,3.0,3.0,4.0,2.0,3.0,2.0,3.

In [242]:
from pyspark.ml.classification import DecisionTreeClassifier

# Оценка качества модели
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [287]:
train_data, test_data = ml_data.randomSplit([0.9, 0.1])

In [288]:
# labelCol – target, featuresCol – признаки, predictionCol – название колонки с результатом
classifier_tree = DecisionTreeClassifier(labelCol="Satisfaction Index", featuresCol="features", impurity="gini", maxDepth=18, maxBins=30, minInfoGain=0.0)

# Тренируем модель
model_first_tree = classifier_tree.fit(train_data)

# Делаем предсказания на тестовой выборке
predictions_first_tree = model_first_tree.transform(test_data)

evaluator = MulticlassClassificationEvaluator(labelCol="Satisfaction Index", predictionCol="prediction", metricName="accuracy")
evaluator.evaluate(predictions_first_tree)

0.943668799689742

In [240]:
#print(model_first_tree.toDebugString)

In [241]:
predictions_first_tree.pandas_api().head(5)

Unnamed: 0,id,Age,Flight Distance,Inflight wifi service,Departure/Arrival time convenient,Gate location,Food and drink,Online boarding,On-board service,Leg room service,Baggage handling,Checkin service,Inflight service,Cleanliness,Departure Delay in Minutes,Gender Index,Customer Type Index,Type of Travel Index,Class Index,Satisfaction Index,features,rawPrediction,probability,prediction
0,1,48,821,3,3,3,5,3,3,2,5,4,5,5,2,1,1,0,0,0,"[48.0, 821.0, 3.0, 3.0, 3.0, 5.0, 3.0, 3.0, 2....","[19.0, 0.0]","[1.0, 0.0]",0.0
1,4,50,1905,2,2,2,4,4,5,5,5,3,5,4,0,1,0,0,0,1,"[50.0, 1905.0, 2.0, 2.0, 2.0, 4.0, 4.0, 5.0, 5...","[0.0, 10395.0]","[0.0, 1.0]",1.0
2,8,60,853,4,3,4,4,4,3,4,3,3,3,4,0,0,0,0,0,1,"[60.0, 853.0, 4.0, 3.0, 4.0, 4.0, 4.0, 3.0, 4....","[335.0, 615.0]","[0.3526315789473684, 0.6473684210526316]",1.0
3,28,58,2867,0,5,4,5,5,3,3,3,4,3,5,0,1,0,0,0,1,"[58.0, 2867.0, 0.0, 5.0, 4.0, 5.0, 5.0, 3.0, 3...","[0.0, 3458.0]","[0.0, 1.0]",1.0
4,40,51,212,2,2,2,2,4,5,4,4,3,5,5,0,1,0,0,0,1,"[51.0, 212.0, 2.0, 2.0, 2.0, 2.0, 4.0, 5.0, 4....","[0.0, 10395.0]","[0.0, 1.0]",1.0


In [154]:
# labelCol – target, featuresCol – признаки, predictionCol – название колонки с результатом
classifier_tree = DecisionTreeClassifier(labelCol="Satisfaction Index", featuresCol="features")
stages = [classifier_tree]
pipeline_with_optimization = Pipeline(stages=stages)

In [99]:
from pyspark.ml.tuning import ParamGridBuilder

In [182]:
# Определяю гиперпараметры
paramGrid_with_optimization = (ParamGridBuilder()
    .addGrid(classifier_tree.impurity, ["gini", "entropy"])
    .addGrid(classifier_tree.maxDepth, [10, 15, 20, 25])
    .addGrid(classifier_tree.maxBins, [10, 15, 20, 25, 30])
    .addGrid(classifier_tree.minInfoGain, [0.0])
    .build()
)
# impurity - Присмесь
# maxDepth - Максимальная глубина дерева
# maxBins - Максимальное количество бинов (розвилок) в дереве
# minInfoGain- Минимальный прирост информации

In [101]:
# Построение модели (которая будет искать самый оптимальный вариант)
from pyspark.ml.tuning import TrainValidationSplit

In [188]:
# estimator - контейнер с логичкой построения модели
# evaluator - по какой метрике мы будем сравнивать модели
# estimatorParamMaps - гиперпараметры, которые будем варьировать
# trainRatio - с каким соотношением разобьёться train выборка во время обучения
validator = TrainValidationSplit(estimator=pipeline_with_optimization, evaluator=multiClassEval, estimatorParamMaps=paramGrid_with_optimization, trainRatio=0.9)
validator_with_optimization = validator.fit(ml_data)

In [189]:
metrics = validator_with_optimization.validationMetrics #оценки
params = validator_with_optimization.getEstimatorParamMaps() #гиперпараметры
metrics_and_params = list(zip(metrics, params))
metrics_and_params.sort(key=lambda x: x[0], reverse=True)

metrics_and_params[:5]

[(0.9478663834717127,
  {Param(parent='DecisionTreeClassifier_c71b69b31981', name='impurity', doc='Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini'): 'entropy',
   Param(parent='DecisionTreeClassifier_c71b69b31981', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 15,
   Param(parent='DecisionTreeClassifier_c71b69b31981', name='maxBins', doc='Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature.'): 30,
   Param(parent='DecisionTreeClassifier_c71b69b31981', name='minInfoGain', doc='Minimum information gain for a split to be considered at a tree node.'): 0.0}),
 (0.9474802085344661,
  {Param(parent='DecisionTreeClassifier_c71b69b31981', name='impurity', doc='Criterion used for information gain calculation (case-insensitive). Supported options: entropy

In [258]:
# Результат лучшей модели на тестовой выборке
best_model = validator_with_optimization.bestModel
evaluator.evaluate(best_model.transform(test_data))

0.9684482264078784

In [259]:
evaluator.evaluate(best_model.transform(train_data))

0.968644657285034

In [273]:
test_csv = spark.read.csv(".//archive/test.csv", sep=',', encoding="UTF-8", header=True, inferSchema=True)
test_csv = test_csv.drop('_c0', 'Arrival Delay in Minutes', 'Seat comfort', 'Inflight entertainment', 'Ease of Online booking').withColumnRenamed('satisfaction', 'Satisfaction')

testCategoricalColumns = [name[0] for name in test_csv.dtypes if name[1] == 'string']
stages = []
for categoricalCol in testCategoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol+' Index', handleInvalid='keep')
    stages += [stringIndexer]

pipeline = Pipeline(stages=stages)
model = pipeline.fit(test_csv)
test_csv_data = model.transform(test_csv).drop(*categoricalColumns)
    
test_csv_data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Flight Distance: integer (nullable = true)
 |-- Inflight wifi service: integer (nullable = true)
 |-- Departure/Arrival time convenient: integer (nullable = true)
 |-- Gate location: integer (nullable = true)
 |-- Food and drink: integer (nullable = true)
 |-- Online boarding: integer (nullable = true)
 |-- On-board service: integer (nullable = true)
 |-- Leg room service: integer (nullable = true)
 |-- Baggage handling: integer (nullable = true)
 |-- Checkin service: integer (nullable = true)
 |-- Inflight service: integer (nullable = true)
 |-- Cleanliness: integer (nullable = true)
 |-- Departure Delay in Minutes: integer (nullable = true)
 |-- Gender Index: double (nullable = false)
 |-- Customer Type Index: double (nullable = false)
 |-- Type of Travel Index: double (nullable = false)
 |-- Class Index: double (nullable = false)
 |-- Satisfaction Index: double (nullable = false)



In [274]:
numericColumns = [name[0] for name in test_csv_data.dtypes if name[1] != 'string']
numericColumns = numericColumns[1:-1]
numericColumns

['Age',
 'Flight Distance',
 'Inflight wifi service',
 'Departure/Arrival time convenient',
 'Gate location',
 'Food and drink',
 'Online boarding',
 'On-board service',
 'Leg room service',
 'Baggage handling',
 'Checkin service',
 'Inflight service',
 'Cleanliness',
 'Departure Delay in Minutes',
 'Gender Index',
 'Customer Type Index',
 'Type of Travel Index',
 'Class Index']

In [276]:
assembler = VectorAssembler(inputCols=numericColumns, outputCol="features")
test_data = assembler.transform(test_csv_data)
test_data.show(1, truncate=False, vertical=True)

-RECORD 0----------------------------------------------------------------------------------------------------------
 id                                | 19556                                                                         
 Age                               | 52                                                                            
 Flight Distance                   | 160                                                                           
 Inflight wifi service             | 5                                                                             
 Departure/Arrival time convenient | 4                                                                             
 Gate location                     | 4                                                                             
 Food and drink                    | 3                                                                             
 Online boarding                   | 4                                  

## Оценки моделей с предобработанными данными

In [277]:
f1Evaluator = MulticlassClassificationEvaluator(labelCol="Satisfaction Index", predictionCol="prediction", metricName="f1")
f1Evaluator.evaluate(best_model.transform(test_data))

0.952434406094673

In [278]:
evaluator.evaluate(best_model.transform(test_data))

0.9524946104096089

## Если данные не предобрабатывать

In [269]:
data_raw = spark.read.csv(".//archive/train.csv", sep=',', encoding="UTF-8", header=True, inferSchema=True)
data_raw = data_raw.drop('_c0').withColumnRenamed('satisfaction', 'Satisfaction')

rawCategoricalColumns = [name[0] for name in data_raw.dtypes if name[1] == 'string']

stages = []
for categoricalCol in rawCategoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol+' Index', handleInvalid='keep')
    stages += [stringIndexer]

pipeline = Pipeline(stages=stages)
model = pipeline.fit(data_raw)
data_raw = model.transform(data_raw).drop(*rawCategoricalColumns)
    
data_raw.printSchema()

numericColumns = [name[0] for name in test_csv.dtypes if name[1] != 'string']
numericColumns = numericColumns[1:-1]

assembler = VectorAssembler(inputCols=numericColumns, outputCol="features")
data_raw = assembler.transform(data_raw)
data_raw.show(1, truncate=False, vertical=True)

root
 |-- id: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Flight Distance: integer (nullable = true)
 |-- Inflight wifi service: integer (nullable = true)
 |-- Departure/Arrival time convenient: integer (nullable = true)
 |-- Ease of Online booking: integer (nullable = true)
 |-- Gate location: integer (nullable = true)
 |-- Food and drink: integer (nullable = true)
 |-- Online boarding: integer (nullable = true)
 |-- Seat comfort: integer (nullable = true)
 |-- Inflight entertainment: integer (nullable = true)
 |-- On-board service: integer (nullable = true)
 |-- Leg room service: integer (nullable = true)
 |-- Baggage handling: integer (nullable = true)
 |-- Checkin service: integer (nullable = true)
 |-- Inflight service: integer (nullable = true)
 |-- Cleanliness: integer (nullable = true)
 |-- Departure Delay in Minutes: integer (nullable = true)
 |-- Arrival Delay in Minutes: double (nullable = true)
 |-- Gender Index: double (nullable = false)
 |-- Custome

In [279]:
train_data_raw, test_data_raw = data_raw.randomSplit([0.85, 0.15])

raw_classifier_tree = DecisionTreeClassifier(labelCol="Satisfaction Index", featuresCol="features", impurity="entropy", maxDepth=18, maxBins=30, minInfoGain=0.0)

# Тренируем модель
model_raw_tree = raw_classifier_tree.fit(train_data_raw)

# Делаем предсказания на тестовой выборке
predictions_raw_tree = model_raw_tree.transform(test_data_raw)

f1Evaluator.evaluate(predictions_raw_tree)

0.9470069349498844

In [280]:
evaluator.evaluate(predictions_raw_tree)

0.9470546759823089

In [285]:
data_raw.groupBy('Satisfaction Index').count().show()
predictions_raw_tree_train = model_raw_tree.transform(data_raw)
predictions_raw_tree_train.groupBy('prediction').count().show()

+------------------+-----+
|Satisfaction Index|count|
+------------------+-----+
|               0.0|58879|
|               1.0|45025|
+------------------+-----+

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|59640|
|       1.0|44264|
+----------+-----+



## Random Forest

In [289]:
from pyspark.ml.classification import RandomForestClassifier

In [290]:
# Поменяем модель с дерева на лес
classifier_forest = RandomForestClassifier(labelCol="Satisfaction Index", featuresCol="features")

stages_new = [classifier_forest]

pipeline_forest = Pipeline(stages=stages_new)

In [307]:
paramGrid_forest = (ParamGridBuilder()
    .addGrid(classifier_forest.impurity, ['entropy', "gini"])
    .addGrid(classifier_forest.maxDepth, [10, 15, 18, 20])
    .addGrid(classifier_forest.maxBins, [25, 30])
    .addGrid(classifier_forest.minInfoGain, [0.0])
    .addGrid(classifier_forest.numTrees, [14, 16])
    .build()
)

In [308]:
# Построение модели (которая будет искать самый оптимальный вариант)
validator = TrainValidationSplit(estimator=classifier_forest, evaluator=f1Evaluator, estimatorParamMaps=paramGrid_forest, trainRatio=0.9)
validator_forest = validator.fit(ml_data)

# Извлекаю  RandomForestClassifier() из PipelineModel
best_model_forest = validator_forest.bestModel

In [309]:
evaluator.evaluate(best_model_forest.transform(test_data))

0.9852627496606554

In [310]:
f1Evaluator.evaluate(best_model_forest.transform(test_data))

0.9852491838919855

In [311]:
metrics = validator_forest.validationMetrics #оценки
params = validator_forest.getEstimatorParamMaps() #гиперпараметры
metrics_and_params = list(zip(metrics, params))
metrics_and_params.sort(key=lambda x: x[0], reverse=True)

metrics_and_params[0]

(0.9567377768957863,
 {Param(parent='RandomForestClassifier_8dd99bd30fcf', name='impurity', doc='Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini'): 'gini',
  Param(parent='RandomForestClassifier_8dd99bd30fcf', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 18,
  Param(parent='RandomForestClassifier_8dd99bd30fcf', name='maxBins', doc='Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature.'): 25,
  Param(parent='RandomForestClassifier_8dd99bd30fcf', name='minInfoGain', doc='Minimum information gain for a split to be considered at a tree node.'): 0.0,
  Param(parent='RandomForestClassifier_8dd99bd30fcf', name='numTrees', doc='Number of trees to train (>= 1).'): 14})

In [None]:
spark.stop()