# Tarea 3

Autor: Fabian Morera Gutierrez

Profesor: Dr. Juan Manuel Esquivel Rodriguez

Curso: Big Data

# Entrenamiento de un modelo de clasificación binaria.

El presente programa tiene como finalidad la creación de un modelo de clasificaciónbinaria, de principio a fin, utilizando el framwork Apache Spark. Para tal fin, se utilizará una fuente de datos en formato CSV,la cual se describirá con mayor detalle más adelante. 

Para lograr el objetivo, se iniciará desde la lectura de los datos, limpieza de los mismos, análisis descriptivo, almacenamiento intermedio y entrenamiento de un modelo de regresión.

Para esta tarea se utiliza un conjunto de datos relacionado con la predicción binaria de la satisfacción de los clientes de una aerolínera ficticia llamada Invistico, la cual se puede descargar desde https://www.kaggle.com/sjleshrac/airlines-customer-satisfaction.

# Dataset: Invistico Airlines

Este dataset incluye información relacionada con la retroalimentación de clientes pasados, que volaron con dicha compañía, así como de información específica del vuelo en cuestión de cada entrevistado.

El objetivo principal de este set de datos, es el de lograr predecir si un futuro cliente de la aerolínea se mostrará satisfecho con el servicio proveído, o no, dados los valores de los demás parámetros.

Los parámetros originales del dataset son los siguientes:

Nota: Los nombres se encuentran tal y como estan escritos en el archivo original. Igualmete, dado el contexto, los términos pasajero, cliente e usuario son equivalentes.

- satisfaction: String. Estado de satisfacción del usuario, con las opciones "satisfied" y "dissatisfied". 
- Gender: String. Género del usuario.
- Customer Type: String. Categoría del usuario, con las opciones binarias "Loyal Customer" y "disloyal Customer".
- Age: Integer. Edad completa del usuario volando.
- Type of Travel: String. Tipo de vuelo realizado, ya sea "Business travel" o "Personal Travel".
- Class: String. Clase de asiento o tiquete del pasajaro. (Por ejemplo, "Business", "Eco", etc.).
- Flight Distance: Integer. Distancia total del vuelo.
- Seat comfort: Integer (0-5). Grado de confort del asiento.
- Departure/Arrival time convenient: Integer (0-5). Grado de conveniencia de las horas de despegue y aterrizaje según el pasajero y sus necesidades.
- Food and Drink: Integer (0-5). Nivel de agrado con la calidad y cantidad de alimentación y bebidas otorgadas al usuario.
- Gate location: Integer (0-5). Nivel de satisfacción del pasajero con la ubicación de ls puerta de abordaje dentro del aeropuerto.
- Inflight wifi service: Integer (0-5). Nivel de satisfacción del pasajero con la disponibilidad y/o calidad del servicio de wifi dentro del avión.
- Inflight entertainment: Integer (0-5). Nivel de satisfacción del pasajero con las opciones y/o calidad del entretenimiento disponible a bordo del avión.
- Online support: Integer (0-5). Nivel de satisfacción con el soporte al cliente via online.
- Ease of Online bookin: Integer (0-5). Nivel de satisfacción del cliente con el sistema de reserva online para el vuelo en cuestión.
- On-board service: Integer (0-5). Grado de satisfacción del pasajero con el servicio de abordaje.
- Leg room service: Integer (0-5). Grado de satisfacción del cliente con el espacio disponible para las piernas a bordo del avión.
- Baggage handling: Integer (0-5). Nivel de satisfacción del cliente con el manejo de sus maletas durante todo el proceso.
- Checkin service: Integer (0-5). Grado de satiscacción del usuario con el servicio de "Check-In" proporsionado para el vuelo.
- Cleanliness: Integer (0-5).Nivel de satisfacción del pasajero respecto a la limpieza del avión.
- Online boarding: Integer (0-5). Grado de satisfacción del usuario respecto al servicio de abordaje online.
- Departure Delay in Minutes: Integer. Cantidad de minutos de retraso del despegue, según la hora establecida originalmente.
- Arrival Delay in Minutes: Integer. Cantidad de minutos de retraso del aterrizaje, según la hora establecida originalmente.

## Variable a predecir: satisfaction. 
(Traducida eventualmente a una variable binaria "isSatisfied", la cual denota si el pasajero se encuentra satisfecho, o no, con el servicio otorgado).


## Dependencias
Importamos dependencias requeridas a lo largo del programa aquí.

In [None]:
# Numpy y Matplotlib.
import numpy as np
import matplotlib.mlab as mlab
import matplotlib.pyplot as plt

# Others.
import findspark
import tempfile
import random

# Pyspark main.
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Pyspark machine learning.
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import CrossValidatorModel, CrossValidator, ParamGridBuilder
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler, HashingTF, Tokenizer, StandardScaler
from pyspark.ml import Pipeline

from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils

# Seaborn.
import seaborn as sns

print("Dependencies loaded.")

# Lectura y limpieza 
Una de las tareas más comunes al procesar datos es el ajuste y limpieza de los datos. En las celdas siguientes leeremos los datos del archivo Invistico_Airline.csv y realizaremos diferentes pasos para poder cargar los datos a una tabla en PostgreSQL que almacenará el conjunto de datos en su forma deseada para abordar el análisis.

In [None]:
findspark.init('/usr/lib/python3.7/site-packages/pyspark')

# Cargar el conjunto de datos completo. Este paso no realiza ningún ajuste; simplemente lectura
spark = SparkSession \
    .builder \
    .appName("Basic JDBC pipeline") \
    .config("spark.driver.extraClassPath", "postgresql-42.2.14.jar") \
    .config("spark.executor.extraClassPath", "postgresql-42.2.14.jar") \
    .getOrCreate()

# Lectura y definición del schema para los datos iniciales.
flights_df = spark \
    .read \
    .format("csv") \
    .option("path", "../resources/Invistico_Airline.csv") \
    .option("header", False) \
    .schema(StructType([
        StructField("satisfaction", StringType()),
        StructField("gender", StringType()),
        StructField("customer_type", StringType()),
        StructField("age", IntegerType()),
        StructField("type_of_travel", StringType()),
        StructField("class", StringType()),
        StructField("flight_distance", IntegerType()),
        StructField("seat_comfort", IntegerType()),
        StructField("departure_arrival_time_convenient", IntegerType()),
        StructField("food_drink", IntegerType()),
        StructField("gate_location", IntegerType()),
        StructField("inflight_wifi_service", IntegerType()),
        StructField("inflight_entertainment", IntegerType()),
        StructField("online_support", IntegerType()),
        StructField("ease_of_online_booking", IntegerType()),
        StructField("onboard_service", IntegerType()),
        StructField("leg_room_service", IntegerType()),
        StructField("baggage_handling", IntegerType()),
        StructField("checkin_service", IntegerType()),
        StructField("cleanliness", IntegerType()),
        StructField("online_boarding", IntegerType()),
        StructField("departure_delay_minutes", IntegerType()),
        StructField("arrival_delay_minutes", IntegerType())])) \
    .load()

flights_df.printSchema()
flights_df.show()

root
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- price: string (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: float (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: integer (nullable = true)
 |-- floors: string (nullable = true)
 |-- waterfront: integer (nullable = true)
 |-- view: integer (nullable = true)
 |-- condition: integer (nullable = true)
 |-- grade: integer (nullable = true)
 |-- sqft_above: integer (nullable = true)
 |-- sqft_basement: integer (nullable = true)
 |-- yr_built: integer (nullable = true)
 |-- yr_renovated: integer (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- lat: float (nullable = true)
 |-- long: float (nullable = true)
 |-- sqft_living15: integer (nullable = true)
 |-- sqft_lot15: integer (nullable = true)

+----------+---------------+----------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------

## Ajuste de variables.
Algunas variables tienen que ser reinterpretadas como valores numéricos.

In [None]:
# Ajusta tipo de columnas para que sean númericas.
flights_df = flights_df.na.drop()

correct_types_df = flights_df \
    .withColumn('satisfaction', F.upper(F.col('satisfaction'))) \
    .withColumn("satisfaction", F.trim(F.col("satisfaction"))) \
    .withColumn('customer_type', F.upper(F.col('customer_type'))) \
    .withColumn("customer_type", F.trim(F.col("customer_type"))) \
    .withColumn('gender', F.upper(F.col('gender'))) \
    .withColumn("gender", F.trim(F.col("gender"))) \
    .withColumn('type_of_travel', F.upper(F.col('type_of_travel'))) \
    .withColumn("type_of_travel", F.trim(F.col("type_of_travel"))) \
    .withColumn('class', F.upper(F.col('class'))) \
    .withColumn("class", F.trim(F.col("class"))) \
    .withColumn('isSatisfied',
                F.when(F.col('satisfaction') == "SATISFIED", 1)
                .when(F.col('satisfaction') == "DISSATISFIED", 0)) \
    .withColumn('isMale',
                F.when(F.col("gender") == "MALE", 1)
                .otherwise(0)) \
    .withColumn('isFemale', 
                F.when(F.col("gender") == "FEMALE", 1)
                .otherwise(0)) \
    .withColumn('isLoyalCustomer', 
                F.when(F.col("customer_type") == "LOYAL CUSTOMER", 1)
                .otherwise(0)) \
    .withColumn('isDisloyalCustomer', 
                F.when(F.col("customer_type") == "DISLOYAL CUSTOMER", 1)
                .otherwise(0)) \
    .withColumn('isBusinessTravel', 
                F.when(F.col("type_of_travel") == "BUSINESS TRAVEL", 1)
                .otherwise(0)) \
    .withColumn('isPersonalTravel', 
                F.when(F.col("type_of_travel") == "PERSONAL TRAVEL", 1)
                .otherwise(0)) \
     .withColumn('isBusinessClass', 
                F.when(F.col("class") == "BUSINESS", 1)
                .otherwise(0)) \
    .withColumn('isEcoClass', 
                F.when(F.col("class") == "ECO", 1)
                .otherwise(0)) \
    .withColumn('isOtherClass', 
                F.when((F.col("class") != "ECO") & (F.col("class") != "BUSINESS"), 1)
                .otherwise(0))

# Eliminar las columnas que ya no se necesitan dado a los cambios anteriores.
correct_types_df = correct_types_df \
    .drop(F.col("satisfaction")) \
    .drop(F.col("gender")) \
    .drop(F.col("customer_type")) \
    .drop(F.col("type_of_travel")) \
    .drop(F.col("class")) \

# Print.
print("Corrected Types:\n")
correct_types_df.printSchema()
correct_types_df.show()

## Almacenar en Base de Datos.
Se almacena el conjunto de datos limpios en la tabla "flights" dentro de un contenedor de postgres.

In [None]:
# Almacenar el conjunto de datos limpio en la base de datos, la cual debe correr en 
# un contenedor aparte al momento de ejecutarse.

# Utilizamos el método de sobreescribir.
OVERWRITE_MODE = 'overwrite'

correct_types_df \
    .write \
    .format("jdbc") \
    .mode(OVERWRITE_MODE) \
    .option("url", "jdbc:postgresql://host.docker.internal:5433/postgres") \
    .option("user", "postgres") \
    .option("password", "testPassword") \
    .option("dbtable", "tarea3") \
    .save()

print("Clean flights data saved!\n")

# Inspección de datos
Previo a entrenar el modelo es común que se realice algún tipo de descripción de los datos, para tener una idea del tipo de problema con el que nos enfrentamos. A continuación, algunas observaciones interesantes:

In [None]:
# Cargar el conjunto de datos. Esta vez desde la base de datos
clear_flights_df = spark \
    .read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://host.docker.internal:5433/postgres") \
    .option("user", "postgres") \
    .option("password", "testPassword") \
    .option("dbtable", "tarea3") \
    .load()

clear_flights_df.show()

## Información Descriptiva de los Datos.
Se muestran histogramas y resúmenes descriptivos de algunas de las variables disponibles.

In [None]:
# Información descriptiva de alguno valores interesantes del dataframe.
clear_flights_df.describe([
    'age', 
    'flight_distance', 
    'departure_delay_minutes',
    'arrival_delay_minutes']).show()

In [None]:
# Creación de histogramas.
def print_hist(rdd_histogram_data):
    heights = np.array(rdd_histogram_data[1])
    full_bins = rdd_histogram_data[0]
    mid_point_bins = full_bins[:-1]
    widths = [abs(i - j) for i, j in zip(full_bins[:-1], full_bins[1:])]
    plt.bar(mid_point_bins, heights, width=widths, color='b')
    plt.show()

# Obtener histograma, con los parámetros de rango determinados
# y para la propiedad especificada.
def create_histogram(h_start=0, h_stop=100, h_step=5, h_prop="age"):
    """
    Params:
        h_start: Histogram start value.
        h_stop: Histogram end value.
        h_step: Iteration size.
        h_prop: Name of property to create Histogram for.
    """
    buckets = np.arange(
        start=h_start, 
        stop=h_stop, 
        step=h_step).tolist()

    rdd_histogram_data = clear_flights_df\
        .select(h_prop)\
        .rdd\
        .flatMap(lambda x: x)\
        .histogram(buckets)

    print_hist(rdd_histogram_data)

# Age Histogram.
create_histogram(0,100,5,'age')

# Flight_distance Histogram.
create_histogram(50,6951,50,'flight_distance')

# Departure Delay in Minutes Histogram.
create_histogram(0,500,20,'departure_delay_minutes')

# Arrival Delay in Minutes Histogram.
create_histogram(0,500,20,'arrival_delay_minutes')

In [None]:
# Para realizar operaciones más detalladas es necesario expresar las filas originales en vectores
assembler = VectorAssembler(
    inputCols=[
        'age', 
        'flight_distance', 
        'seat_comfort',
        'departure_arrival_time_convenient',
        'food_drink',
        'gate_location',
        'inflight_wifi_service',
        'inflight_entertainment',
        'online_support',
        'ease_of_online_booking',
        'onboard_service',
        'leg_room_service',
        'baggage_handling',
        'checkin_service',
        'cleanliness',
        'online_boarding',
        'departure_delay_minutes',
        'arrival_delay_minutes',
        'isMale',
        'isFemale',
        'isLoyalCustomer',
        'isDisloyalCustomer',
        'isBusinessTravel',
        'isPersonalTravel',
        'isBusinessClass',
        'isEcoClass',
        'isOtherClass'],
    outputCol='features')

vector_df = assembler.transform(clear_flights_df)
vector_df = vector_df.select(['features', 'isSatisfied'])
vector_df.show()

In [None]:
# Con la representación de vectores podemos calcular correlaciones
pearson_matrix = Correlation.corr(vector_df, 'features').collect()[0][0]

sns.heatmap(pearson_matrix.toArray(), annot=True, fmt=".2f", cmap='viridis')

# Estandarización
Como recordamos de los módulos anteriores es deseable que los datos se encuentren estandarizados o normalizados, para evitar que la magnitud de ciertos atributos dominen el proceso de entrenamiento. 

In [None]:
standard_scaler = StandardScaler(inputCol='features', outputCol='scaled')
scale_model = standard_scaler.fit(vector_df)

scaled_df = scale_model.transform(vector_df)
scaled_df = scaled_df.withColumnRenamed("isSatisfied", "label")
scaled_df.show()

# Clasificación Binaria (Usando K Fold Cross Validation)
Para entrenar los modelos, utilizaremos los algoritmos de Regresión Lineal

In [None]:
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = scaled_df.randomSplit([0.7, 0.3])

print("Training Data Count: ", trainingData.count())
print("Test Data Count:", testData.count())

In [None]:
# Modelo 1.

# Utilizamos regersión logística para predicción binaria.
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[lr])
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()

evaluator = BinaryClassificationEvaluator()

cv = CrossValidator(
    estimator=lr, 
    estimatorParamMaps=grid, 
    evaluator=evaluator,
    parallelism=3)

# Run cross-validation, and choose the best set of parameters.
# Modelo 1 a utilizar.
cvModel = cv.fit(trainingData)

trainingSummary = cvModel.bestModel.summary
print("Modelo 1: Regresión Logística\n")
print("Total Iterations: ", trainingSummary.totalIterations)
print("Objective History: ", trainingSummary.objectiveHistory)

numFolds = cvModel.getNumFolds()
print("Number of folds: ", numFolds)

avgMetricsTest = cvModel.avgMetrics[0]
print("Average metrics Test (First position): ", avgMetricsTest)

evaluation = evaluator.evaluate(cvModel.transform(testData))

print('Evaluator for training data: ', evaluation)

In [None]:
# Modelo 2.

from pyspark.mllib import linalg as mllib_linalg
from pyspark.ml import linalg as ml_linalg

def as_old(v):
    if isinstance(v, ml_linalg.SparseVector):
        return mllib_linalg.SparseVector(v.size, v.indices, v.values)
    if isinstance(v, ml_linalg.DenseVector):
        return mllib_linalg.DenseVector(v.values)
    raise ValueError("Unsupported type {0}".format(type(v)))

parsedData = trainingData.rdd.map(lambda row: LabeledPoint(row.label, as_old(row.features)))

# Train a DecisionTree model.
# Empty categoricalFeaturesInfo indicates all features are continuous.
model = DecisionTree.trainClassifier(
    parsedData, 
    numClasses=2, 
    categoricalFeaturesInfo={},
    impurity='gini', 
    maxDepth=5,
    maxBins=32)

# Evaluate model on test instances and compute test error
predictions = model.predict(testData.rdd.map(lambda x: x.features))