# Real-time inference of airline passenger satisfaction with Spark Structured Streaming

Next, we use the classifier that achieved the best performance (**Random Forest**) to predict in real time how satisfied the passengers in the test set are with the airline.

In [None]:
!pip install pyspark

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("airline satisfaction streaming").getOrCreate()

## Applying the data preparation steps and the model to the static dataset

In [None]:
from pyspark.sql.functions import when, col

airline = spark.read.csv("/content/drive/MyDrive/Proiect Big Data/airline_satisfaction.csv", header=True, inferSchema=True) \
               .drop("_c0", "id").na.drop().withColumn("label", when(col("satisfaction") == "satisfied", 1).otherwise(0))
airline.show(5)

+------+-----------------+---+---------------+--------+---------------+---------------------+---------------------------------+----------------------+-------------+--------------+---------------+------------+----------------------+----------------+----------------+----------------+---------------+----------------+-----------+--------------------------+------------------------+--------------------+-----+
|Gender|    Customer Type|Age| Type of Travel|   Class|Flight Distance|Inflight wifi service|Departure/Arrival time convenient|Ease of Online booking|Gate location|Food and drink|Online boarding|Seat comfort|Inflight entertainment|On-board service|Leg room service|Baggage handling|Checkin service|Inflight service|Cleanliness|Departure Delay in Minutes|Arrival Delay in Minutes|        satisfaction|label|
+------+-----------------+---+---------------+--------+---------------+---------------------+---------------------------------+----------------------+-------------+--------------+-------

In [None]:
# split the data into training and test sets
train_airline_data, test_airline_data = airline.randomSplit([0.7, 0.3], seed=22)

In [None]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.classification import RandomForestClassifier

# transform the categorical variables
gender_indexer = StringIndexer(inputCol="Gender", outputCol="Gender Index")
customer_type_indexer = StringIndexer(inputCol="Customer Type", outputCol="Customer Type Index")
travel_type_indexer = StringIndexer(inputCol="Type of Travel", outputCol="Travel Type Index")
class_indexer = StringIndexer(inputCol="Class", outputCol="Class Index")
ohe = OneHotEncoder(inputCols=["Gender Index", "Customer Type Index", "Travel Type Index", "Class Index"], \
                    outputCols=["Gender OHE", "Customer Type OHE", "Travel Type OHE", "Class OHE"])

# select and assemble the feature columns
cols = [c for c in airline.columns if c not in ["Gender", "Customer Type", "Type of Travel", "Class", "satisfaction", "label"]]
cols.extend(["Gender OHE", "Customer Type OHE", "Travel Type OHE", "Class OHE"])
assembler = VectorAssembler(inputCols=cols, outputCol="features")

# instantiate the model
rf = RandomForestClassifier(featuresCol="features", labelCol="label", seed=100, bootstrap=True, maxDepth=15, numTrees=100)

In [None]:
# define the pipeline
from pyspark.ml import Pipeline

rf_pipeline = Pipeline(stages=[gender_indexer, customer_type_indexer, travel_type_indexer, class_indexer, ohe, assembler, rf])

In [None]:
# train the model
rf_fit = rf_pipeline.fit(train_airline_data)

In [None]:
# make predictions on the test set
pred_rf = rf_fit.transform(test_airline_data)

In [None]:
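# evaluate the model with the Area Under the Curve metric
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Note: rawPredictionCol='prediction' feeds the hard 0/1 predictions to the evaluator,
# so the reported value is the area under a single-threshold ROC curve.
# Pass 'rawPrediction' instead to evaluate the model's raw scores.
rf_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label')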
rf_eval.evaluate(pred_rf)

0.956871632231871
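The evaluator above is given the `prediction` column, so the ROC curve is built from hard 0/1 labels and has a single interior point. In that case the area equals the mean of the true positive rate and the true negative rate. The following pure-Python sketch (not Spark's implementation) shows how that value can differ from an AUC computed on continuous scores. The helper `pairwise_auc` and the toy labels and scores are assumptions made only for illustration.

```python
def pairwise_auc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly.

    Ties count as half a correct pair, which matches the trapezoidal ROC area.
    """
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Toy data (hypothetical, not taken from the airline dataset).
labels = [1, 1, 1, 0, 0, 0]
scores = [0.9, 0.8, 0.4, 0.6, 0.3, 0.1]

# Thresholding at 0.5 mimics what a `prediction` column contains.
hard = [1.0 if s >= 0.5 else 0.0 for s in scores]

auc_from_scores = pairwise_auc(labels, scores)
auc_from_hard = pairwise_auc(labels, hard)

# Balanced accuracy of the hard predictions, for comparison.
tpr = sum(1 for y, h in zip(labels, hard) if y == 1 and h == 1.0) / labels.count(1)
tnr = sum(1 for y, h in zip(labels, hard) if y == 0 and h == 0.0) / labels.count(0)
balanced_accuracy = (tpr + tnr) / 2

print(auc_from_scores, auc_from_hard, balanced_accuracy)
```

On this toy data the score-based AUC is higher than the one computed from hard labels, and the latter coincides with the balanced accuracy.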

## Using Structured Streaming for real-time prediction

We split the test set into 10 partitions and save the resulting files in a separate folder.

We then read the data as a stream, apply the trained model to it, and evaluate its performance. To make the streaming process visible, we wait a few seconds between successive reads of the predictions and print the number of rows in the DataFrame each time.

In [None]:
# repartition the test data into 10 partitions and save the resulting files
test_data = test_airline_data.repartition(10)

import os
import shutil

dir_path = "/content/drive/MyDrive/Proiect Big Data/streaming/"

if os.path.exists(dir_path):
  shutil.rmtree(dir_path, ignore_errors=True)

test_data.write.format("CSV").option("header", True).save(dir_path)
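Spark typically writes one `part-*.csv` file per non-empty partition, alongside a `_SUCCESS` marker and hidden `.crc` checksum files. Since each data file later becomes one micro-batch, it can be useful to check how many were written. Below is a minimal sketch, where `list_part_files` is a hypothetical helper and the temporary folder with fake file names stands in for `dir_path`.

```python
import glob
import os
import tempfile
from typing import List


def list_part_files(folder: str) -> List[str]:
    """Return the CSV data files in a Spark output folder, sorted by name.

    The pattern skips the _SUCCESS marker and hidden .crc checksum files.
    """
    return sorted(glob.glob(os.path.join(folder, "part-*.csv")))


# Stand-in for the real output folder: a temp dir with Spark-like file names.
with tempfile.TemporaryDirectory() as demo_dir:
    for name in ["part-00000-demo.csv", "part-00001-demo.csv",
                 "_SUCCESS", ".part-00000-demo.csv.crc"]:
        open(os.path.join(demo_dir, name), "w").close()
    found = [os.path.basename(p) for p in list_part_files(demo_dir)]

print(found)
```

In the notebook, `len(list_part_files(dir_path))` would be expected to return 10 after the write, assuming none of the partitions is empty.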

In [None]:
# define the schema for the streaming process
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

schema = StructType([StructField("Gender", StringType(), True),
                     StructField("Customer Type", StringType(), True),
                     StructField("Age", IntegerType(), True),
                     StructField("Type of Travel", StringType(), True),
                     StructField("Class", StringType(), True),
                     StructField("Flight Distance", IntegerType(), True),
                     StructField("Inflight wifi service", IntegerType(), True),
                     StructField("Departure/Arrival time convenient", IntegerType(), True),
                     StructField("Ease of Online booking", IntegerType(), True),
                     StructField("Gate location", IntegerType(), True),
                     StructField("Food and drink", IntegerType(), True),
                     StructField("Online boarding", IntegerType(), True),
                     StructField("Seat comfort", IntegerType(), True),
                     StructField("Inflight entertainment", IntegerType(), True),
                     StructField("On-board service", IntegerType(), True),
                     StructField("Leg room service", IntegerType(), True),
                     StructField("Baggage handling", IntegerType(), True),
                     StructField("Checkin service", IntegerType(), True),
                     StructField("Inflight service", IntegerType(), True),
                     StructField("Cleanliness", IntegerType(), True),
                     StructField("Departure Delay in Minutes", IntegerType(), True),
                     StructField("Arrival Delay in Minutes", DoubleType(), True),
                     StructField("satisfaction", StringType(), True),
                     StructField("label", IntegerType(), True)
                     ])

In [None]:
sourceStream = spark.readStream.schema(schema) \
                               .option("maxFilesPerTrigger", 1) \
                               .csv(dir_path, header=True)

streamingPredictions = rf_fit.transform(sourceStream)

streamingPredictions.isStreaming

True

In [None]:
# make predictions on the stream
query = streamingPredictions.writeStream.format("memory").queryName("predictions").start()

In [None]:
from pyspark.sql.functions import monotonically_increasing_id
import time

time.sleep(10)

pred = spark.sql("SELECT * FROM predictions")

pred.withColumn("index", monotonically_increasing_id()) \
    .orderBy(col("index").desc()).drop("index") \
    .select(["Gender", "Customer Type", "Age", "Type of Travel", "Class", "label", "probability", "prediction"]).show(5)

print(f"\nNumber of lines: {pred.count()}\nCurrent AUC: {rf_eval.evaluate(pred)}")

+------+--------------+---+---------------+--------+-----+--------------------+----------+
|Gender| Customer Type|Age| Type of Travel|   Class|label|         probability|prediction|
+------+--------------+---+---------------+--------+-----+--------------------+----------+
|Female|Loyal Customer| 34|Business travel|Business|    1|[0.00675710154727...|       1.0|
|  Male|Loyal Customer| 35|Business travel|     Eco|    1|[0.15491418905770...|       1.0|
|Female|Loyal Customer| 49|Personal Travel|     Eco|    0|[0.99935691604687...|       0.0|
|  Male|Loyal Customer| 28|Personal Travel|Business|    0|[0.95385580357144...|       0.0|
|Female|Loyal Customer| 52|Personal Travel|Business|    0|[0.96886090550094...|       0.0|
+------+--------------+---+---------------+--------+-----+--------------------+----------+
only showing top 5 rows


Number of lines: 6240
Current AUC: 0.9582375174930883


In [None]:
time.sleep(10)

pred = spark.sql("SELECT * FROM predictions")

pred.withColumn("index", monotonically_increasing_id()) \
    .orderBy(col("index").desc()).drop("index") \
    .select(["Gender", "Customer Type", "Age", "Type of Travel", "Class", "label", "probability", "prediction"]).show(5)

print(f"\nNumber of lines: {pred.count()}\nCurrent AUC: {rf_eval.evaluate(pred)}")

+------+--------------+---+---------------+--------+-----+--------------------+----------+
|Gender| Customer Type|Age| Type of Travel|   Class|label|         probability|prediction|
+------+--------------+---+---------------+--------+-----+--------------------+----------+
|Female|Loyal Customer| 79|Business travel|Business|    0|[0.95080963005910...|       0.0|
|Female|Loyal Customer| 34|Personal Travel|     Eco|    0|[0.98937353239300...|       0.0|
|Female|Loyal Customer| 33|Business travel|Business|    1|[0.00400980220227...|       1.0|
|Female|Loyal Customer| 15|Business travel|Business|    0|[0.91237485695802...|       0.0|
|  Male|Loyal Customer|  8|Personal Travel|Business|    1|[0.55988486601351...|       0.0|
+------+--------------+---+---------------+--------+-----+--------------------+----------+
only showing top 5 rows


Number of lines: 15601
Current AUC: 0.9555831224815547


In [None]:
time.sleep(10)

pred = spark.sql("SELECT * FROM predictions")

pred.withColumn("index", monotonically_increasing_id()) \
    .orderBy(col("index").desc()).drop("index") \
    .select(["Gender", "Customer Type", "Age", "Type of Travel", "Class", "label", "probability", "prediction"]).show(5)

print(f"\nNumber of lines: {pred.count()}\nCurrent AUC: {rf_eval.evaluate(pred)}")

+------+-----------------+---+---------------+--------+-----+--------------------+----------+
|Gender|    Customer Type|Age| Type of Travel|   Class|label|         probability|prediction|
+------+-----------------+---+---------------+--------+-----+--------------------+----------+
|  Male|   Loyal Customer| 34|Personal Travel|     Eco|    0|[0.99901953384427...|       0.0|
|  Male|   Loyal Customer| 40|Business travel|Business|    0|[0.64287888443651...|       0.0|
|  Male|disloyal Customer| 26|Business travel|     Eco|    0|[0.89145069444397...|       0.0|
|Female|   Loyal Customer| 49|Personal Travel|     Eco|    0|[0.98666903019781...|       0.0|
|  Male|   Loyal Customer| 57|Business travel|     Eco|    0|[0.98354241106963...|       0.0|
+------+-----------------+---+---------------+--------+-----+--------------------+----------+
only showing top 5 rows


Number of lines: 21841
Current AUC: 0.9567474969906583
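The three polling cells above repeat the same steps: wait, read the in-memory table, then print the row count and the AUC. They could be folded into one loop. The sketch below assumes a hypothetical helper `poll_snapshots` that takes any zero-argument `fetch` callable. Here `fetch` replays the values recorded above so that the example runs without Spark.

```python
import time
from typing import Callable, Iterator, Tuple


def poll_snapshots(fetch: Callable[[], Tuple[int, float]],
                   rounds: int,
                   interval_s: float) -> Iterator[Tuple[int, int, float]]:
    """Call `fetch` `rounds` times, sleeping `interval_s` seconds before each call.

    Yields (round number, row count, AUC) for every snapshot.
    """
    for i in range(1, rounds + 1):
        time.sleep(interval_s)
        rows, auc = fetch()
        yield i, rows, auc


# Stand-in for the Spark query: replays the values printed in the cells above.
recorded = iter([(6240, 0.9582375174930883),
                 (15601, 0.9555831224815547),
                 (21841, 0.9567474969906583)])

snapshots = list(poll_snapshots(lambda: next(recorded), rounds=3, interval_s=0.0))
for i, rows, auc in snapshots:
    print(f"Snapshot {i}: rows={rows}, AUC={auc}")
```

In the notebook, `fetch` would instead read `spark.sql("SELECT * FROM predictions")` and return `(pred.count(), rf_eval.evaluate(pred))`, with `interval_s=10` to match the cells above.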


In [None]:
query.stop()