### 1. Construcción de la muestra M.

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [None]:
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In [None]:
path_file = 'sample_data/Combined_Flights_2022.csv'

In [None]:
df = spark.read.csv(path_file, header=True, sep=",", inferSchema=True)
df.show(5)

+----------+--------------------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+--------------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+-------------------+---------+-------------+-------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+
|FlightDate|             Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime|DepTime|DepDelayMinutes|De

In [None]:
print("Número de registros: " + str(df.count()))
print("Número de columnas: " + str(len(df.columns)))

Número de registros: 190348
Número de columnas: 61


In [None]:
#Se visualiza esquema del archivo importado a DataFrame
df.printSchema()

root
 |-- FlightDate: string (nullable = true)
 |-- Airline: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Cancelled: boolean (nullable = true)
 |-- Diverted: boolean (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- DepDelayMinutes: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- ArrTime: double (nullable = true)
 |-- ArrDelayMinutes: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Quarter: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- Marketing_Airline_Network: string (nullable = true)
 |-- Operated_or_Branded_Code_Share_Partners: string (nullable = true)


## Proceso de limpieza
Se descarta, ya que desde el inicio se buscó ontener un dataset de calidad , como esto me refiero a que no existieran datos faltantes o
nulos, y no existen registros duplicados, por mencionar algunas acciones que se tuvieran que aplicar

In [None]:
# Limpieza de datos

#Se eliminan columnas con valores nulos/faltantes
df_clean = df.na.drop()

In [None]:
#Empaquetado de descriptores y generación de columna de vector con RFormula
#haciendo uso de la variable de caracterización: Airline como objetivo de aprendizaje
#Se generan nuevas columnas: feature  y label (columna de aprendizaje)
from pyspark.ml.feature import RFormula
rForm = RFormula(formula="Airline ~ .")
#Se hace uso del módulo fit para obtener el ensamble
fittedRF = rForm.fit(df_clean)
preparedDF_a = fittedRF.transform(df_clean)
preparedDF_a.show(5)

+----------+--------------------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+--------------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+-------------------+---------+-------------+-------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+--------------------+-----+
|FlightDate|             Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime

In [None]:
#Se visualiza plan de transformación
preparedDF_a.explain()

== Physical Plan ==
*(1) Project [FlightDate#16, Airline#17, Origin#18, Dest#19, Cancelled#20, Diverted#21, CRSDepTime#22, DepTime#23, DepDelayMinutes#24, DepDelay#25, ArrTime#26, ArrDelayMinutes#27, AirTime#28, CRSElapsedTime#29, ActualElapsedTime#30, Distance#31, Year#32, Quarter#33, Month#34, DayofMonth#35, DayOfWeek#36, Marketing_Airline_Network#37, Operated_or_Branded_Code_Share_Partners#38, DOT_ID_Marketing_Airline#39, ... 39 more fields]
+- *(1) Project [FlightDate#16, Airline#17, Origin#18, Dest#19, Cancelled#20, Diverted#21, CRSDepTime#22, DepTime#23, DepDelayMinutes#24, DepDelay#25, ArrTime#26, ArrDelayMinutes#27, AirTime#28, CRSElapsedTime#29, ActualElapsedTime#30, Distance#31, Year#32, Quarter#33, Month#34, DayofMonth#35, DayOfWeek#36, Marketing_Airline_Network#37, Operated_or_Branded_Code_Share_Partners#38, DOT_ID_Marketing_Airline#39, ... 38 more fields]
   +- *(1) Project [FlightDate#16, Airline#17, Origin#18, Dest#19, Cancelled#20, Diverted#21, CRSDepTime#22, DepTime#23

In [None]:
#Se genera muestra representativa de la población bajo el muestreo estratificado
# Se añade una nueva columna:stratum que combina los valores de concatenación de las variables de caracterización: Airline y DepDel15
from pyspark.sql.functions import concat, lit
preparedDF_a_stratum = preparedDF_a.withColumn("stratum", concat(preparedDF_a["Airline"], lit("_"), preparedDF_a["DepDel15"]))
preparedDF_a_stratum.show(5)

+----------+--------------------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+--------------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+-------------------+---------+-------------+-------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+--------------------+-----+--------------------+
|FlightDate|             Airline|Origin|Dest|Cancelle

In [None]:
#Se imprime esquema para visualizar la nueva columna y el tipo de dato
preparedDF_a_stratum.printSchema()

root
 |-- FlightDate: string (nullable = true)
 |-- Airline: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Cancelled: boolean (nullable = true)
 |-- Diverted: boolean (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- DepDelayMinutes: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- ArrTime: double (nullable = true)
 |-- ArrDelayMinutes: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Quarter: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- Marketing_Airline_Network: string (nullable = true)
 |-- Operated_or_Branded_Code_Share_Partners: string (nullable = true)


In [None]:
# Se calcula la cantidad de instancias por cada valor diferente de las variables Airline, DepDel15 y combinada
stratum_counts_airline = preparedDF_a_stratum.groupBy("Airline").count().collect()
stratum_counts_depDel15 = preparedDF_a_stratum.groupBy("DepDel15").count().collect()
stratum_counts_combined_a = preparedDF_a_stratum.groupBy("Airline", "DepDel15").count().collect()
stratum_counts_combined = preparedDF_a_stratum.groupBy("stratum").count().collect()

print(stratum_counts_airline)
print(stratum_counts_depDel15)
print(stratum_counts_combined_a)
print(stratum_counts_combined)

[Row(Airline='GoJet Airlines, LLC d/b/a United Express', count=394), Row(Airline='Endeavor Air Inc.', count=19999), Row(Airline='Air Wisconsin Airlines Corp', count=774), Row(Airline='Southwest Airlines Co.', count=100763), Row(Airline='Commutair Aka Champlain Enterprises, Inc.', count=642), Row(Airline='Mesa Airlines Inc.', count=10235), Row(Airline='American Airlines Inc.', count=24443), Row(Airline='Republic Airlines', count=29264)]
[Row(DepDel15=0.0, count=139889), Row(DepDel15=1.0, count=46625)]
[Row(Airline='GoJet Airlines, LLC d/b/a United Express', DepDel15=0.0, count=298), Row(Airline='Republic Airlines', DepDel15=0.0, count=24443), Row(Airline='Commutair Aka Champlain Enterprises, Inc.', DepDel15=0.0, count=513), Row(Airline='Southwest Airlines Co.', DepDel15=1.0, count=32816), Row(Airline='Air Wisconsin Airlines Corp', DepDel15=1.0, count=70), Row(Airline='Mesa Airlines Inc.', DepDel15=1.0, count=1432), Row(Airline='Southwest Airlines Co.', DepDel15=0.0, count=67947), Row(Ai

In [None]:
# Se calcula la probabilidad del conjunto: test de cada bin de acuerdo al porcentaje de division a usar (70 - 30)
total_count = preparedDF_a_stratum.count()
# Se calcula la fracción del test
stratum_fractions = {row["stratum"]: 0.3 * (row["count"] / total_count)
                     for row in startum_count_clean}
print(stratum_fractions)

{'Southwest Airlines Co._1.0': 0.05278316909186442, 'Mesa Airlines Inc._1.0': 0.0023033123518877937, 'American Airlines Inc._1.0': 0.0077222085205400125, 'American Airlines Inc._0.0': 0.03159333883783522, 'Southwest Airlines Co._0.0': 0.10928991925539101, 'Air Wisconsin Airlines Corp_1.0': 0.00011259208424032512, 'Commutair Aka Champlain Enterprises, Inc._0.0': 0.000825139131646954, 'Republic Airlines_0.0': 0.03931554735837524, 'Air Wisconsin Airlines Corp_0.0': 0.0011323546757884128, 'Endeavor Air Inc._1.0': 0.003956807531874283, 'Republic Airlines_1.0': 0.00775437768746582, 'Endeavor Air Inc._0.0': 0.0282107509355866, 'Mesa Airlines Inc._0.0': 0.014159258822394029, 'GoJet Airlines, LLC d/b/a United Express_1.0': 0.00015441200124387445, 'GoJet Airlines, LLC d/b/a United Express_0.0': 0.00047932058719452693, 'Commutair Aka Champlain Enterprises, Inc._1.0': 0.0002074911266714563}


### 2. Construcción del conjunto de entrenamiento y prueba.

In [None]:
# Se generan los conjuntos a partir de muestreo estratificado
test_data_stratum = preparedDF_a_stratum.sampleBy("stratum", fractions=stratum_fractions, seed=42)
train_data_stratum = preparedDF_a_stratum.exceptAll(test_data_stratum)

print(f"""Existen {train_data_stratum.count()} instancias en el conjunto train, y {test_data_stratum.count()} en el conjunto test""")

train_data_stratum.show(5)
test_data_stratum.show(5)

Existen 175186 instancias en el conjunto train, y 11328 en el conjunto test
+----------+--------------------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+----------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+--------------+---------+-------------+-------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+--------------------+-----+--------

### 3. Selección de métricas para medir calidad de resultados

Con el objetivo de medir y determinar la eficacia y la idoneidad del modelo entrenado, se seleccionaron las siguientes metricas de evaluación para el clasificador multiclase que es el tema de estudio a predecir:



1. Precisión: Para determininar la proporción de predicciones correctas donde el módelo predice verdaderos positivos.
2. Accuracy: Porcentaje de predicciones correctas para los datos de prueba, se calcula como la proporción de verdaderos positivos y verdaderos negativos y la suma del número total de predicciones.

### 4. Entrenamiento de Modelos de Aprendizaje

In [None]:
# Modelo de regresión logística,
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=30)


In [None]:
from pyspark.ml.tuning import ParamGridBuilder
# Se define la rejilla para la búsqueda de parametros con Regresión Logística
params = ParamGridBuilder() \
    .addGrid(modelLR.regParam, [0.1, 2.0]) \
    .addGrid(modelLR.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

In [None]:
#Se entrena el modelo con el conjunto de datos de entrenamiento
modelLR = lr.fit(train_data_stratum)

In [None]:
#Evaluación del módelo

#Evaluator for Multiclass Classification
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#Evaluación del módelo haciendo uso de las métricas definidas en apartado 3
#Precision
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
presicion = evaluator.evaluate(modelLR.transform(test_data_stratum))

print("Accuracy for train_data_stratum: %.3f" % presicion)

Accuracy for train_data_stratum: 1.000


In [None]:
#Evaluación del módelo haciendo uso de las métricas definidas en apartado 3
#Accurancy

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(modelLR.transform(test_data_stratum))

print("Accuracy for train_data_stratum: %.3f" % accuracy)

Accuracy for train_data_stratum: 1.000


### 5. Análisis de resultados

Según los resultados obtendidos para las metricas utlizadas: accurancy y presición en las cuales se obtuvo el valor de 1.0, se realizará una interpretación enfocada al valor obtenido  y al tema de estudio que es predecir los retrasos de los vuelos para las diferentes aerolineas en Estados Unidos

Al observar la métrica: accurancy con su respectivo resultado, podemos deducir que el modelo predice correctamente los retrasos de las aerolineas, aunado a esta métrica también se evalúo la métrica: precisión, para la cual se obtuvo un valor equivalente a 1.0 y refleja que todas las predicciones clasificadas como positivas fueron verdaderos positivos, lo que significa que no hubo falsos positivos.

Como conclusión el procesamiento de los datos, la limpieza de los mismos, la selección de la muestra estratificada  así como el modelo seleccionado para este caso de estudio, refleja un modelo entrenado capaz de predecir de manera óptima con un buen desempeño.