In [None]:
pip install pyspark



In [None]:
# Mount Google Drive (Colab-only) so the dataset under /content/drive/MyDrive is readable.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
from pyspark.ml.classification import LogisticRegression

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import StringIndexer

from pyspark.sql import SparkSession

from pyspark.ml import Pipeline

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# One SparkSession for the whole notebook; getOrCreate() reuses an existing session on re-run.
# The app name labels the job in the Spark UI. This is a classification task
# (LogisticRegression + MulticlassClassificationEvaluator), not a regression.
session = SparkSession.builder.appName("AirlineSatisfactionClassification").getOrCreate()

In [None]:
import os

# Location of the airline customer-satisfaction CSV. The default points at the mounted
# Google Drive; set AIRLINE_DATA_PATH to run the notebook against a copy elsewhere.
data_path = os.environ.get(
    "AIRLINE_DATA_PATH",
    "/content/drive/MyDrive/Colab Notebooks/Airline_customer_satisfaction.csv",
)

In [None]:
# Read the CSV: the first row supplies column names and Spark infers column types.
data = (
    session.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(data_path)
)
data.show(n=5)

+------------+--------------+---+---------------+--------+---------------+------------+---------------------------------+--------------+-------------+---------------------+----------------------+--------------+----------------------+----------------+----------------+----------------+---------------+-----------+---------------+--------------------------+------------------------+
|satisfaction| Customer Type|Age| Type of Travel|   Class|Flight Distance|Seat comfort|Departure/Arrival time convenient|Food and drink|Gate location|Inflight wifi service|Inflight entertainment|Online support|Ease of Online booking|On-board service|Leg room service|Baggage handling|Checkin service|Cleanliness|Online boarding|Departure Delay in Minutes|Arrival Delay in Minutes|
+------------+--------------+---+---------------+--------+---------------+------------+---------------------------------+--------------+-------------+---------------------+----------------------+--------------+----------------------+-----

In [None]:
# Discard every row that has a null in any column.
data = data.dropna()

In [None]:
from pyspark.sql.functions import when, count, isnull

# Count nulls per column; after the na.drop() step every count should be zero.
cols = data.columns
null_counts = data.select([count(when(isnull(c), c)).alias(c) for c in cols])
null_counts.show()

# The table above is too wide to read reliably, so also verify it programmatically.
remaining_nulls = {c: n for c, n in null_counts.first().asDict().items() if n}
assert not remaining_nulls, f"Null values remain after dropping nulls: {remaining_nulls}"

+------------+-------------+---+--------------+-----+---------------+------------+---------------------------------+--------------+-------------+---------------------+----------------------+--------------+----------------------+----------------+----------------+----------------+---------------+-----------+---------------+--------------------------+------------------------+
|satisfaction|Customer Type|Age|Type of Travel|Class|Flight Distance|Seat comfort|Departure/Arrival time convenient|Food and drink|Gate location|Inflight wifi service|Inflight entertainment|Online support|Ease of Online booking|On-board service|Leg room service|Baggage handling|Checkin service|Cleanliness|Online boarding|Departure Delay in Minutes|Arrival Delay in Minutes|
+------------+-------------+---+--------------+-----+---------------+------------+---------------------------------+--------------+-------------+---------------------+----------------------+--------------+----------------------+----------------+---

In [None]:
# List the distinct values of the target column.
data.select("satisfaction").dropDuplicates().collect()

[Row(satisfaction='dissatisfied'), Row(satisfaction='satisfied')]

Encode the string-typed columns as numeric indices: one `StringIndexer` per categorical column, chained together in a pipeline.

In [None]:
# String columns to encode as numeric indices. `satisfaction` becomes the label;
# the other three are categorical features.
categorical_columns = ['satisfaction', 'Customer Type', 'Type of Travel', 'Class']

# Unfitted StringIndexers (estimators): the outer pipeline fits them on the training
# split only, so the test split does not influence the learned index mapping.
# A plain list (not a set) keeps the stage order deterministic across runs.
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index")
    for column in categorical_columns
]
indexers_pipeline = Pipeline(stages=indexers)

Assemble the feature columns into a single vector column.

In [None]:
# Feature columns: the indexed categorical features plus the numeric survey/delay columns.
# `satisfaction_index` is the LABEL and must not appear here. Including it lets the model
# read the answer directly (label leakage), which is the likely cause of the 1.0 accuracy
# previously reported.
# NOTE(review): *_index columns are ordinal codes from StringIndexer; if `Class` has more
# than two categories, consider OneHotEncoder to avoid imposing an arbitrary order.
input_features = [
    'Age',
    'Customer Type_index',
    'Type of Travel_index',
    'Class_index',
    'Flight Distance',
    'Seat comfort',
    'Departure/Arrival time convenient',
    'Food and drink',
    'Gate location',
    'Inflight wifi service',
    'Inflight entertainment',
    'Online support',
    'Ease of Online booking',
    'On-board service',
    'Leg room service',
    'Baggage handling',
    'Checkin service',
    'Cleanliness',
    'Online boarding',
    'Departure Delay in Minutes',
    'Arrival Delay in Minutes',
]
assert 'satisfaction_index' not in input_features, "label column leaked into the features"
vectorAssembler = VectorAssembler(inputCols=input_features, outputCol="features")

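Scale the features to unit standard deviation (`StandardScaler` defaults: `withStd=True`, `withMean=False`, so features are not mean-centered).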

In [None]:
# Rescale each feature to unit standard deviation; means are left as-is (no centering).
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withMean=False,
    withStd=True,
)

Define the logistic regression classifier

In [None]:
# Logistic regression predicting the indexed satisfaction label from the scaled feature vector.
classifier = LogisticRegression(
    labelCol="satisfaction_index",
    featuresCol="scaledFeatures",
)

Define the pipeline containing all the steps

In [None]:
# Chain indexing -> assembly -> scaling -> classification so one fit() call covers every stage.
pipeline = Pipeline().setStages([indexers_pipeline, vectorAssembler, scaler, classifier])

Split data into train and test sets

In [None]:
# 70/30 train/test split; the fixed seed keeps the split repeatable for the same input.
trainingData, testData = data.randomSplit(weights=[0.7, 0.3], seed=42)

Train the model

In [None]:
# Fit all pipeline stages on the training split only.
model = pipeline.fit(dataset=trainingData)

Make predictions for the test set

In [None]:
# Apply the fitted pipeline to the held-out rows; adds a `prediction` column.
predictions = model.transform(dataset=testData)

Evaluate the accuracy of predictions

In [None]:
# Accuracy = fraction of test rows whose predicted label equals the true label.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="satisfaction_index",
)
accuracy = evaluator.evaluate(predictions)
print("Accuracy =", accuracy)

Accuracy = 1.0


Stop the PySpark session

In [None]:
# Shut down the Spark session; re-run the session-creation cell before using Spark again.
session.stop()