# Apache Spark y ML: Predicción de retrasos aéreos

### Introducción al Proyecto

Este proyecto tiene como objetivo principal explorar y familiarizarse con el uso de Apache Spark como herramienta clave para el procesamiento masivo de datos, así como con el desarrollo y despliegue de proyectos en entornos de nube, específicamente utilizando Google Cloud Platform. Además, se aborda todo el proceso necesario para llevar a cabo un análisis de datos y construir un modelo de aprendizaje automático, siguiendo las buenas prácticas de la ciencia de datos, con el **objetivo** de predecir si un vuelo llegára con retraso o no.

**Objetivos del Proyecto**:
1. **Explorar Apache Spark**: Comprender y aplicar las funcionalidades de Spark para el manejo de grandes volúmenes de datos y el desarrollo de pipelines de machine learning.
2. **Desarrollo en la Nube**: Utilizar entornos como Google Cloud para gestionar y procesar los datos, demostrando la escalabilidad de las herramientas cloud en proyectos reales.
3. **Proceso de Ciencia de Datos**:
   - Preprocesar y transformar los datos, incluyendo manejo de variables categóricas, normalización y ensamblaje de características.
   - Construir y evaluar un modelo de clasificación binaria, concretamente un árbol de decisión, para predecir si un vuelo llegará con retraso o no.
   - Analizar el rendimiento del modelo mediante herramientas como la matriz de confusión.

Este proyecto integra tanto aspectos técnicos como analíticos, ofreciendo una experiencia práctica en el diseño y despliegue de proyectos que combinan Big Data, aprendizaje automático y computación en la nube.


### Leemos el fichero desde Google Cloud Storage 

- **Inicializa una sesión de Spark** en Dataproc.
- **Carga un archivo CSV** (`flights.csv`) desde un bucket de Google Cloud Storage como un DataFrame de Spark.

---

In [2]:
# Obtener la sesión de Spark preconfigurada en Dataproc
from pyspark.sql import SparkSession

# Obtener la sesión de Spark preconfigurada en Dataproc
spark = SparkSession.builder \
    .appName("Dataproc HDFS Example") \
    .getOrCreate()


# Cargado el archivo flights.csv desde google storage 

ruta_hdfs = "gs://antonio-simon-bucket-1/notebooks/jupyter/flights.csv" # Reemplaza esto por la ruta correcta del fichero flights.csv 


# Descomentar estas líneas
flightsDF = spark.read\
             .option("header", "true")\
             .option("inferSchema", "true")\
             .csv(ruta_hdfs)

:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-7d6be03e-6dd8-4afe-b73f-a1d4542cac0b;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.3 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.3 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.1.3/spark-sql-kafka-0-10_2.12-3.1.3.jar ...
	[SUCCESSFUL ] org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.3!spark-sql-

Imprimimos el esquema para comprobar qué tipo de dato ha inferido en cada columna

In [3]:
flightsDF.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)



Mostramos el número de filas que tiene el DataFrame para hacernos una idea de su tamaño:

In [4]:
flightsDF.count()

                                                                                

162049

Vemos que tenemos 162049 filas. Si imprimimos por pantalla las 5 primeras filas, veremos qué tipos parecen tener y en qué columnas no coincide el tipo que podríamos esperar con el tipo que ha inferido Spark.

In [5]:
flightsDF.show(5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|    1|  1|       1|       96|     235|       70|     AS| N508AS|   145|   PDX| ANC|     194|    1542|   0|     1|
|2014|    1|  1|       4|       -6|     738|      -23|     US| N195UW|  1830|   SEA| CLT|     252|    2279|   0|     4|
|2014|    1|  1|       8|       13|     548|       -4|     UA| N37422|  1609|   PDX| IAH|     201|    1825|   0|     8|
|2014|    1|  1|      28|       -2|     800|      -23|     US| N547UW|   466|   PDX| CLT|     251|    2282|   0|    28|
|2014|    1|  1|      34|       44|     325|       43|     AS| N762AS|   121|   SEA| ANC|     201|    1448|   0|    34|
+----+-----+---+--------+---------+-----

La causa del problema es que en muchas columnas existe un valor faltante llamado "NA". Spark no reconoce ese valor como
*no disponible* ni nada similar, sino que lo considera como un string de texto normal, y por tanto, asigna a toda la columna
el tipo de dato string (cadena de caracteres). Concretamente, las siguientes columnas deberían ser de tipo entero pero Spark
las muestra como string:
<ul>
 <li>dep_time: string (nullable = true)
 <li>dep_delay: string (nullable = true)
 <li>arr_time: string (nullable = true)
 <li>arr_delay: string (nullable = true)
 <li>air_time: string (nullable = true)
 <li>hour: string (nullable = true)
 <li>minute: string (nullable = true)    
</ul>


Vamos a averiguar cuántas filas tienen el valor "NA" (como string) en la columna dep_time:

In [6]:
from pyspark.sql import functions as F
cuantos_NA = flightsDF\
                .where(F.col("dep_time") == "NA")\
                .count()
cuantos_NA

                                                                                

857

Por tanto, hay 857 filas que no tienen un dato válido en esa columna. Hay distintas maneras de trabajar con los valores faltantes, como por ejemplo imputarlos (reemplazarlos por un valor generado por nosotros según cierta lógica, por ejemplo la media de esa columna, etc). Lo más sencillo es quitar toda la fila, aunque esto depende de si nos lo podemos permitir en base
a la cantidad de datos que tenemos. En nuestro caso, como tenemos un número considerable de filas, vamos a quitar todas las filas donde hay un NA en cualquiera de las columnas.

In [7]:
columnas_limpiar = ["dep_time", "dep_delay", "arr_time", "arr_delay", "air_time", "hour", "minute"]

flightsLimpiado = flightsDF
for nombreColumna in columnas_limpiar:  # para cada columna, nos quedamos con las filas que no tienen NA en esa columna
    flightsLimpiado = flightsLimpiado.where(F.col(nombreColumna) != "NA")

flightsLimpiado.cache()

DataFrame[year: int, month: int, day: int, dep_time: string, dep_delay: string, arr_time: string, arr_delay: string, carrier: string, tailnum: string, flight: int, origin: string, dest: string, air_time: string, distance: int, hour: string, minute: string]

Si ahora mostramos el número de filas que tiene el DataFrame `flightsLimpiado` tras eliminar todas esas filas, vemos que ha disminuido ligeramente
pero sigue siendo un número considerable como para realizar analítica y sacar conclusiones sobre estos datos

In [8]:
flightsLimpiado.count()

                                                                                

160748

Una vez que hemos eliminado los NA, vamos a convertir a tipo entero cada una de esas columnas que eran de tipo string. 
Ahora no debe haber problema ya que todas las cadenas de texto contienen dentro un número que puede ser convertido de texto a número. Vamos también a convertir la columna `arr_delay` de tipo entero a número real, necesario para los pasos posteriores donde ajustaremos un modelo predictivo.

In [9]:
from pyspark.sql.types import IntegerType, DoubleType

flightsConvertido = flightsLimpiado

for c in columnas_limpiar:
    # método que crea una columna o reemplaza una existente
    flightsConvertido = flightsConvertido.withColumn(c, F.col(c).cast(IntegerType())) 

flightsConvertido = flightsConvertido.withColumn("arr_delay", F.col("arr_delay").cast(DoubleType()))
flightsConvertido.cache()

DataFrame[year: int, month: int, day: int, dep_time: int, dep_delay: int, arr_time: int, arr_delay: double, carrier: string, tailnum: string, flight: int, origin: string, dest: string, air_time: int, distance: int, hour: int, minute: int]

In [10]:
flightsConvertido.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: integer (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_time: integer (nullable = true)
 |-- arr_delay: double (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)



Vamos a volver a mostrar las 5 primeras filas del DataFrame limpio. Aparentemente son iguales a las que ya teníamos, pero ahora
Spark sí está tratando como enteros las columnas que deberían serlo, y si queremos podemos hacer operaciones aritméticas
con ellas.

In [11]:
flightsConvertido.show(5)

[Stage 12:>                                                         (0 + 1) / 1]

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|    1|  1|       1|       96|     235|     70.0|     AS| N508AS|   145|   PDX| ANC|     194|    1542|   0|     1|
|2014|    1|  1|       4|       -6|     738|    -23.0|     US| N195UW|  1830|   SEA| CLT|     252|    2279|   0|     4|
|2014|    1|  1|       8|       13|     548|     -4.0|     UA| N37422|  1609|   PDX| IAH|     201|    1825|   0|     8|
|2014|    1|  1|      28|       -2|     800|    -23.0|     US| N547UW|   466|   PDX| CLT|     251|    2282|   0|    28|
|2014|    1|  1|      34|       44|     325|     43.0|     AS| N762AS|   121|   SEA| ANC|     201|    1448|   0|    34|
+----+-----+---+--------+---------+-----

                                                                                

### Análisis de Aeropuertos y Rutas Únicas

Para comprender mejor la estructura del conjunto de datos de vuelos almacenado en `flightsConvertido`realizamos un análisis de los aeropuertos de origen y las rutas disponibles. Los pasos seguidos son los siguientes:

1. **Identificación de Aeropuertos de Origen Únicos**:
   Es fundamental conocer cuántos aeropuertos distintos aparecen en los datos como puntos de origen. Para ello, se generará un DataFrame con los aeropuertos únicos presentes en la columna `origin` y, posteriormente, se calculará su cantidad total.

2. **Determinación de Rutas Únicas**:
   Comprender la diversidad de rutas aéreas es clave para analizar las conexiones disponibles. Para esto, se identifican las combinaciones únicas entre las columnas `origin` (aeropuerto de origen) y `dest` (aeropuerto de destino), lo que nos permitirá conocer cuántas rutas distintas existen.

Estos pasos son fundamentales para explorar y estructurar mejor el análisis del tráfico aéreo representado en este conjunto de datos.

---


In [12]:
# Identificación de aeropuertos de origen únicos
aeropuertosOrigenDF = flightsConvertido.select("origin").distinct()

# Cálculo de la cantidad total de aeropuertos de origen únicos
n_origen = aeropuertosOrigenDF.count()

# Identificación de rutas únicas entre origen y destino
rutasDistintasDF = flightsConvertido.select("origin", "dest").distinct()

# Cálculo del número total de rutas únicas
n_rutas = rutasDistintasDF.count()

# Salida de los resultados:
print(f"Número de aeropuertos de origen únicos: {n_origen}")
print(f"Número de rutas únicas: {n_rutas}")

                                                                                

Número de aeropuertos de origen únicos: 2
Número de rutas únicas: 115


### Análisis de Retrasos Medios en Aeropuertos de Destino

Para analizar los patrones de retraso en los vuelos, se planteó calcular el retraso medio de llegada en cada aeropuerto de destino, considerando únicamente los vuelos que llegan con un retraso positivo. Este análisis puede ayudar a identificar qué aeropuertos tienen, en promedio, mayores problemas con la puntualidad.

Los pasos seguidos para realizar este análisis son los siguientes:

1. **Filtrar los vuelos con retraso positivo**:
   Se consideraron únicamente los vuelos que tienen valores positivos en la columna `arr_delay`, descartando aquellos que llegaron puntuales o adelantados.

2. **Calcular el retraso medio por aeropuerto de destino**:
   Agrupando los datos por la columna `dest` (aeropuerto de destino), se calculó el promedio de los retrasos en la columna `arr_delay`. La nueva columna, que contiene el retraso medio, se denominó `retraso_medio`.

3. **Ordenar los resultados**:
   Para facilitar la interpretación, los resultados se ordenaron de mayor a menor según el valor de `retraso_medio`.

4. **Encapsular el cálculo en una función**:
   Con el objetivo de que el análisis sea reutilizable y más claro, todo el proceso fue encapsulado en una función llamada `retrasoMedio`. Esta función toma como entrada un DataFrame de Spark y devuelve un nuevo DataFrame con los cálculos realizados.

Este enfoque no solo permite identificar los aeropuertos con mayores problemas de retraso, sino que también facilita la integración de esta funcionalidad en futuros análisis.

---

In [13]:
# Definir la función para calcular el retraso medio
def retrasoMedio(df):
    """
    Calcula el retraso medio de llegada para cada aeropuerto de destino,
    considerando únicamente los vuelos con retraso positivo.
    
    :param df: DataFrame de Spark con información de vuelos.
    :return: DataFrame con el retraso medio por aeropuerto de destino, ordenado de mayor a menor.
    """
    # Filtrar los vuelos con retraso positivo
    df_retraso_positivo = df.filter(F.col("arr_delay") > 0)
    
    # Calcular el retraso medio agrupado por destino
    df_resultado = df_retraso_positivo.groupBy("dest") \
        .agg(F.avg("arr_delay").alias("retraso_medio")) \
        .orderBy(F.col("retraso_medio").desc())
    
    return df_resultado

# Aplicar la función al DataFrame flightsConvertido
retrasoMedioDF = retrasoMedio(flightsConvertido)

# Se muestran los resultados
retrasoMedioDF.show()

+----+------------------+
|dest|     retraso_medio|
+----+------------------+
| BOI|             64.75|
| HDN|              46.8|
| SFO|41.193768844221104|
| CLE| 35.74193548387097|
| SBA|35.391752577319586|
| COS| 35.05607476635514|
| BWI|34.585798816568044|
| EWR| 33.52972258916777|
| DFW| 33.27519181585678|
| MIA| 32.66187050359712|
| ORD| 32.47909024211299|
| BNA| 31.94871794871795|
| JFK|31.255884586180713|
| JAC|             30.25|
| PHL|29.245989304812834|
| OGG|27.511111111111113|
| IAD|27.430875576036865|
| HOU| 27.33009708737864|
| LGB| 27.07634730538922|
| FAT|26.852589641434264|
+----+------------------+
only showing top 20 rows



Ahora invocamos a nuestra función `retrasoMedio` pasándole como argumento `flightsConvertido`, y se muestran los tres aeropuertos con mayor retraso en minutos.

In [14]:
# Obtenemos los tres primeros aeropuertos con mayor retraso medio
top3 = retrasoMedioDF.take(3)

# Mostramos ahora el resultado de los tres aeropuertos y sus retrasos en minutos
for fila in top3:
    print(f"Aeropuerto: {fila['dest']}, Retraso medio: {fila['retraso_medio']:.2f} minutos")

Aeropuerto: BOI, Retraso medio: 64.75 minutos
Aeropuerto: HDN, Retraso medio: 46.80 minutos
Aeropuerto: SFO, Retraso medio: 41.19 minutos


## Modelo de Clasificación para Predecir Retrasos

El objetivo es ajustar un modelo de clasificación binaria utilizando un árbol de decisión (`DecisionTreeClassifier`) en Spark para predecir si un vuelo llegará con retraso. Como variables predictoras se utilizan:

- El mes (`month`).
- El día del mes (`day`).
- La hora de partida (`dep_time`).
- La hora de llegada (`arr_time`).
- El tipo de avión (`carrier`).
- La distancia (`distance`).
- El tiempo en el aire (`air_time`).

---

In [15]:
# Imports
from pyspark.ml.feature import StringIndexer 
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Binarizer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel

### Preparación de Variables Categóricas para el Modelo de Clasificación

En el conjunto de datos que se utilizará para ajustar un modelo de clasificación binaria, es importante distinguir entre variables numéricas y categóricas. Aunque algunas variables, como el mes (`month`) y el tipo de avión (`carrier`), están representadas como números, en realidad son variables categóricas que representan categorías discretas. Para que Spark pueda procesarlas correctamente en un modelo de aprendizaje automático, deben ser indexadas.

#### Conversión de Variables Categóricas
1. **Motivación**:
   - Las variables categóricas deben transformarse en índices numéricos para que los modelos de Spark las interpreten correctamente.
   - Esto se hace utilizando la clase `StringIndexer` de la librería `pyspark.ml.feature`.

2. **Definición del Proceso**:
   - Se crea un `StringIndexer` para la columna `month`, generando una nueva columna llamada `monthIndexed` con los valores indexados.
   - De forma similar, se crea otro `StringIndexer` para la columna `carrier`, generando la columna `carrierIndexed`.

Estas transformaciones son fundamentales para garantizar que el modelo pueda interpretar correctamente las variables categóricas durante el proceso de entrenamiento.

---


In [16]:
# Crear un StringIndexer para la columna 'month'
indexerMonth = StringIndexer(inputCol="month", outputCol="monthIndexed")

# Crear un StringIndexer para la columna 'carrier'
indexerCarrier = StringIndexer(inputCol="carrier", outputCol="carrierIndexed")

# Indagar mas en los indexed

### Creación de la Columna de Características para el Modelo

Para entrenar un modelo de clasificación en Spark, es necesario que todas las variables predictoras estén combinadas en una única columna de tipo vector. Este formato es requerido por los algoritmos de aprendizaje automático de Spark. 

#### Proceso de Ensamblado de Características
1. **Motivación**:
   - Las variables predictoras deben fusionarse en una única representación vectorial para facilitar su uso por el modelo.
   - Este paso también asegura que las columnas categóricas indexadas (`monthIndexed` y `carrierIndexed`) y las numéricas originales se combinen correctamente.

2. **Definición del Proceso**:
   - Se crea una lista con los nombres de las columnas que se utilizarán como características predictoras, excluyendo la variable objetivo `arr_delay`.
   - En esta lista se incluyen las columnas indexadas (`monthIndexed` y `carrierIndexed`) en lugar de las columnas originales (`month` y `carrier`).
   - Con la lista de columnas definida, se utiliza la clase `VectorAssembler` para ensamblar las características en una nueva columna llamada `features`.

Este paso garantiza que las variables estén listas para ser procesadas por el modelo, respetando el formato requerido por Spark.

---

In [17]:
# Definir las columnas a ensamblar como características predictoras
columnas_ensamblar = ["monthIndexed", "day", "dep_time", "arr_time", "distance", "air_time", "carrierIndexed"]

# Crear el VectorAssembler para generar la columna 'features'
vectorAssembler = VectorAssembler(inputCols=columnas_ensamblar, outputCol="features")


### Conversión de la Variable Objetivo a un Formato Binario

En este análisis, la columna `arr_delay`, que representa el retraso en minutos de cada vuelo, es una variable continua. Sin embargo, para abordar el problema como una clasificación binaria, es necesario convertir esta columna en una variable binaria que indique si un vuelo es considerado retrasado o no.

#### Proceso de Binarización
1. **Motivación**:
   - Un vuelo se considerará **retrasado** si tiene más de 15 minutos de retraso en la llegada (`arr_delay > 15`).
   - Caso contrario, será clasificado como **no retrasado**.
   - Este enfoque permite que la columna `arr_delay_binary` sea utilizada como la variable objetivo (`target`) en el modelo de clasificación.

2. **Definición del Proceso**:
   - Se utiliza la clase `Binarizer` de Spark para transformar la columna continua `arr_delay` en la columna binaria `arr_delay_binary`.
   - El umbral (`threshold`) se establece en 15 minutos.
   - Esta nueva columna será interpretada como la variable objetivo del modelo, y por ello no se incluyó en las características (`features`) ensambladas previamente.

Este paso asegura que la variable objetivo sea adecuada para el problema de clasificación binaria, alineándose con el objetivo del análisis.

---

In [18]:
# Crear el Binarizer con umbral de 15 minutos
delayBinarizer = Binarizer(inputCol="arr_delay", outputCol="arr_delay_binary", threshold=15)


### Configuración del Modelo de Clasificación

Una vez preparadas las características predictoras y la variable objetivo, el siguiente paso es definir el modelo de clasificación que se utilizará para predecir si un vuelo llegará con retraso o no. Para ello, se selecciona un árbol de decisión, un algoritmo interpretable y eficiente para problemas de clasificación.

#### Proceso de Configuración del Modelo
1. **Selección del Modelo**:
   - Se utiliza el `DecisionTreeClassifier` de la librería `pyspark.ml.classification`, que implementa árboles de decisión para clasificación binaria.

2. **Definición de las Columnas**:
   - La columna de entrada (`featuresCol`) será `features`, creada previamente con el `VectorAssembler` y que contiene todas las características predictoras combinadas.
   - La columna objetivo (`labelCol`) será `arr_delay_binary`, obtenida tras binarizar la variable continua `arr_delay`.

3. **Motivación**:
   - Los árboles de decisión son ideales para explorar patrones en los datos y entender las decisiones que toma el modelo debido a su naturaleza interpretativa.
   - Este enfoque permite abordar el problema de clasificación de forma estructurada y utilizando herramientas que maximizan la utilidad del conjunto de datos preparado.

Con esta configuración, el modelo queda listo para ser entrenado utilizando los datos procesados.

---

In [20]:
# Configurar el modelo de clasificación
decisionTree = DecisionTreeClassifier(featuresCol="features", labelCol="arr_delay_binary")

### Creación y Entrenamiento de un Pipeline de Clasificación

Para simplificar y automatizar el proceso completo de preprocesamiento, modelado y predicción, se decidió encapsular todas las etapas previas en un único pipeline de Spark. Esto permite realizar todas las transformaciones y entrenar el modelo de forma estructurada y reproducible.

#### Proceso de Creación y Entrenamiento del Pipeline
1. **Motivación**:
   - Un pipeline permite definir de manera clara y ordenada todas las etapas del flujo de trabajo, desde el preprocesamiento de datos hasta la generación del modelo.
   - Este enfoque asegura que cada etapa se aplique en el orden correcto, reduciendo el riesgo de errores y facilitando la reutilización del modelo.

2. **Definición del Pipeline**:
   - Las etapas del pipeline incluyen:
     - **Indexación de Variables Categóricas**: Uso de `indexerMonth` y `indexerCarrier` para convertir variables categóricas en índices numéricos.
     - **Ensamblaje de Características**: Uso de `vectorAssembler` para combinar las columnas predictoras en una única columna de tipo vector.
     - **Binarización de la Variable Objetivo**: Uso de `delayBinarizer` para convertir la variable continua `arr_delay` en una columna binaria `arr_delay_binary`.
     - **Modelo de Clasificación**: Uso de un árbol de decisión `decisionTree` para predecir si un vuelo tendrá retraso o no.

3. **Entrenamiento del Pipeline**:
   - El pipeline es entrenado con los datos disponibles (`flightsConvertido`), generando un modelo entrenado que combina todas las etapas.

4. **Predicción sobre los Datos de Entrenamiento**:
   - Por simplicidad, se aplica el pipeline entrenado al mismo conjunto de datos utilizado para el entrenamiento, generando predicciones que se almacenan en un nuevo DataFrame.

Este pipeline representa un flujo de trabajo automatizado y eficiente que facilita el preprocesamiento y el modelado en Spark.

---

In [22]:
# Crear el pipeline con las etapas definidas previamente
pipeline = Pipeline(stages=[indexerMonth, indexerCarrier, vectorAssembler, delayBinarizer, decisionTree])

# Entrenar el pipeline con los datos disponibles
pipelineModel = pipeline.fit(flightsConvertido)

# Aplicar el pipeline entrenado para realizar predicciones
flightsPredictions = pipelineModel.transform(flightsConvertido)

# Verificar las predicciones 
flightsPredictions.select("features", "arr_delay_binary", "prediction").show(10)


                                                                                

+--------------------+----------------+----------+
|            features|arr_delay_binary|prediction|
+--------------------+----------------+----------+
|[10.0,1.0,1.0,235...|             1.0|       1.0|
|[10.0,1.0,4.0,738...|             0.0|       0.0|
|[10.0,1.0,8.0,548...|             0.0|       0.0|
|[10.0,1.0,28.0,80...|             0.0|       0.0|
|[10.0,1.0,34.0,32...|             1.0|       1.0|
|[10.0,1.0,37.0,74...|             1.0|       0.0|
|[10.0,1.0,346.0,9...|             1.0|       0.0|
|[10.0,1.0,526.0,1...|             0.0|       0.0|
|[10.0,1.0,527.0,9...|             1.0|       0.0|
|[10.0,1.0,536.0,1...|             0.0|       0.0|
+--------------------+----------------+----------+
only showing top 10 rows



### Evaluación del Modelo: Matriz de Confusión

Para evaluar el rendimiento del modelo de clasificación, se decidió construir una matriz de confusión. Esta matriz permite comparar las predicciones generadas por el modelo con las clases verdaderas, lo que facilita identificar los aciertos y los errores cometidos.

#### Proceso de Construcción de la Matriz de Confusión
1. **Agrupación por Clases Verdaderas y Predichas**:
   - Se agrupan los datos por las columnas `arr_delay_binary` (clase verdadera) y `prediction` (clase predicha).
   - Para cada combinación, se calcula el número de casos.

2. **Interpretación**:
   - Cada fila de la matriz representa una combinación de la clase verdadera y la predicha.
   - Los casos en que la clase verdadera coincide con la predicha se consideran aciertos.
   - Las discrepancias reflejan los errores del modelo.

Esta matriz es una herramienta clave para analizar el desempeño del modelo y determinar posibles áreas de mejora.

---

In [28]:
flightsPredictions.groupBy("arr_delay_binary", "prediction").count().show()



+----------------+----------+------+
|arr_delay_binary|prediction| count|
+----------------+----------+------+
|             1.0|       1.0|  1646|
|             0.0|       1.0|   919|
|             1.0|       0.0| 22603|
|             0.0|       0.0|135580|
+----------------+----------+------+



                                                                                

### Conclusión

Este proyecto ha permitido desarrollar y evaluar un modelo de clasificación binaria para predecir si un vuelo llegará con retraso o no, utilizando un pipeline de Spark que integra todo el flujo de preprocesamiento y modelado. 

**Principales Resultados**:
- **Precisión Global**: El modelo tiene un rendimiento aceptable, con una precisión global del 85.35%.
- **Fortalezas**:
  - Identifica con alta precisión los vuelos que no llegan con retraso (99.32% de precisión en esta categoría).
- **Debilidades**:
  - Presenta dificultades para detectar vuelos retrasados, con una cobertura baja (6.78%) en esta clase.

**Áreas de Mejora**:
1. Balancear el conjunto de datos para reducir el sesgo hacia la clase mayoritaria (vuelos no retrasados).
2. Explorar algoritmos más avanzados, como Random Forest o Gradient Boosted Trees, para mejorar la capacidad predictiva.
3. Considerar la inclusión de nuevas variables predictoras o transformaciones que aporten más información al modelo.

En general, el proyecto proporciona una base sólida para analizar patrones de retraso en vuelos y explorar mejoras adicionales en futuras iteraciones.

---