#**Maestría en Inteligencia Artificial Aplicada**
##**Curso: Análisis de grandes volúmenes de datos. TC4034.10 **
###Tecnológico de Monterrey
###Prof: Dr. Iván Olmos Pineda

##Salvador D’Carlo Abad Guarro		A01017779


## **Actividad 3 | Aprendizaje supervisado y no supervisado**

# **1. Introducción:**

En esta actividad exploraremos dos enfoques clave del aprendizaje automático:

el aprendizaje supervisado y el no supervisado. El primero se basa en tener una “respuesta correcta” (etiqueta) para cada dato —como saber si un vuelo fue puntual o retrasado— y entrenar modelos para predecir esas respuestas. Ejemplos clásicos de algoritmos supervisados que usaremos son el árbol de decisión (DecisionTreeClassifier) y el bosque aleatorio (RandomForestClassifier), ambos disponibles en PySpark.

Por otro lado, el aprendizaje no supervisado no necesita etiquetas; se enfoca en descubrir patrones o grupos naturales en los datos, como encontrar tipos de vuelos con comportamientos similares. En este caso, usaremos el algoritmo KMeans, también implementado en PySpark.

La ventaja de usar PySpark es que permite trabajar con grandes volúmenes de datos sin que nuestra compu explote. Durante esta actividad, construiremos una muestra representativa, la prepararemos adecuadamente y aplicaremos ambos tipos de algoritmos para extraer valor de los datos. ¿La meta? Identificar patrones interesantes y poner en práctica todo lo aprendido de forma eficiente y escalable.


# 2. Selección de los datos

In [1]:
!apt install openjdk-11-jdk-headless -qq > /dev/null
!pip install pyspark==3.4.1 kagglehub findspark --quiet



[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
dataproc-spark-connect 0.7.3 requires pyspark[connect]>=3.5, but you have pyspark 3.4.1 which is incompatible.[0m[31m
[0m

In [62]:
import os
import sys
import findspark
import pyspark
import numpy as np
import pandas as pd
from pyspark.sql.functions import col, when, count, isnan
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [3]:
import findspark

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.apache.hadoop:hadoop-aws:3.3.4" pyspark-shell'
findspark.init()

In [4]:
from pyspark.sql import SparkSession

# Crear sesión de Spark
spark = SparkSession.builder.appName("FlightDelaysAnalysis").getOrCreate()

In [26]:
import kagglehub

path = kagglehub.dataset_download("robikscube/flight-delay-dataset-20182022")
print("Path to dataset files:", path)

Path to dataset files: /kaggle/input/flight-delay-dataset-20182022


In [27]:
from pyspark.sql.functions import col

df = spark.read.parquet(f"{path}/Combined_Flights_2022.parquet")

In [28]:
print(f"Número de filas: {df.count():,}")
print(f"Número de columnas: {len(df.columns)}")

Número de filas: 4,078,318
Número de columnas: 62


In [29]:
particiones = [
    (True, 1),
    (True, 5),
    (False, 1),
    (False, 5)
]

In [30]:
muestras = []
for cancelado, dia in particiones:
    muestra = (
        df
        .filter((df["Cancelled"] == cancelado) & (df["DayOfWeek"] == dia))
        .sample(False, 0.05, seed=42)
        .limit(1000)
    )
    muestras.append(muestra)

In [31]:
muestra_M = muestras[0]
for i in range(1, len(muestras)):
    muestra_M = muestra_M.union(muestras[i])

In [32]:
print("Número total de registros en la muestra M:", muestra_M.count())
muestra_M.groupBy("Cancelled", "DayOfWeek").count().show()

Número total de registros en la muestra M: 3874
+---------+---------+-----+
|Cancelled|DayOfWeek|count|
+---------+---------+-----+
|     true|        1|  874|
|     true|        5| 1000|
|    false|        1| 1000|
|    false|        5| 1000|
+---------+---------+-----+



# 3. Preparación de los datos

In [38]:
muestra_M.printSchema()

root
 |-- FlightDate: timestamp_ntz (nullable = true)
 |-- Airline: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Cancelled: boolean (nullable = true)
 |-- Diverted: boolean (nullable = true)
 |-- CRSDepTime: long (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- DepDelayMinutes: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- ArrTime: double (nullable = true)
 |-- ArrDelayMinutes: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- Year: long (nullable = true)
 |-- Quarter: long (nullable = true)
 |-- Month: long (nullable = true)
 |-- DayofMonth: long (nullable = true)
 |-- DayOfWeek: long (nullable = true)
 |-- Marketing_Airline_Network: string (nullable = true)
 |-- Operated_or_Branded_Code_Share_Partners: string (nullable = true)
 |-- DOT_ID

In [35]:
columnas_clave = ["CRSDepTime", "CRSElapsedTime", "Distance", "DayOfWeek", "DepTimeBlk", "Cancelled"]

In [36]:
nulos = muestra_M.select([count(when(col(c).isNull(), c)).alias(c) for c in columnas_clave])
print("Valores nulos por columna:")
nulos.show()

Valores nulos por columna:
+----------+--------------+--------+---------+----------+---------+
|CRSDepTime|CRSElapsedTime|Distance|DayOfWeek|DepTimeBlk|Cancelled|
+----------+--------------+--------+---------+----------+---------+
|         0|             0|       0|        0|         0|        0|
+----------+--------------+--------+---------+----------+---------+



No hay valores nulos en las columnas seleccionadas

In [40]:
indexador_horas = StringIndexer(inputCol="DepTimeBlk", outputCol="DepTimeBlk_index")
muestra_M = indexador_horas.fit(muestra_M).transform(muestra_M)

In [44]:
muestra_M.select("CRSDepTime", "CRSElapsedTime", "Distance", "DayOfWeek", "DepTimeBlk_index", "Cancelled").show(5)
df_muestra = muestra_M.select("CRSDepTime", "CRSElapsedTime", "Distance", "DayOfWeek", "DepTimeBlk_index", "Cancelled")

+----------+--------------+--------+---------+----------------+---------+
|CRSDepTime|CRSElapsedTime|Distance|DayOfWeek|DepTimeBlk_index|Cancelled|
+----------+--------------+--------+---------+----------------+---------+
|      1600|         135.0|   696.0|        1|            10.0|     true|
|      1850|         135.0|   849.0|        1|             4.0|     true|
|      2135|          55.0|   192.0|        1|            15.0|     true|
|      1830|         125.0|   666.0|        1|             4.0|     true|
|      1855|         170.0|   955.0|        1|             4.0|     true|
+----------+--------------+--------+---------+----------------+---------+
only showing top 5 rows



In [45]:
df_muestra.show(5)

+----------+--------------+--------+---------+----------------+---------+
|CRSDepTime|CRSElapsedTime|Distance|DayOfWeek|DepTimeBlk_index|Cancelled|
+----------+--------------+--------+---------+----------------+---------+
|      1600|         135.0|   696.0|        1|            10.0|     true|
|      1850|         135.0|   849.0|        1|             4.0|     true|
|      2135|          55.0|   192.0|        1|            15.0|     true|
|      1830|         125.0|   666.0|        1|             4.0|     true|
|      1855|         170.0|   955.0|        1|             4.0|     true|
+----------+--------------+--------+---------+----------------+---------+
only showing top 5 rows



In [46]:
df_muestra = df_muestra.withColumn("label", when(col("Cancelled") == True, 1).otherwise(0))
df_muestra.show(5)

+----------+--------------+--------+---------+----------------+---------+-----+
|CRSDepTime|CRSElapsedTime|Distance|DayOfWeek|DepTimeBlk_index|Cancelled|label|
+----------+--------------+--------+---------+----------------+---------+-----+
|      1600|         135.0|   696.0|        1|            10.0|     true|    1|
|      1850|         135.0|   849.0|        1|             4.0|     true|    1|
|      2135|          55.0|   192.0|        1|            15.0|     true|    1|
|      1830|         125.0|   666.0|        1|             4.0|     true|    1|
|      1855|         170.0|   955.0|        1|             4.0|     true|    1|
+----------+--------------+--------+---------+----------------+---------+-----+
only showing top 5 rows



In [47]:
ensamblador = VectorAssembler(
    inputCols=["CRSDepTime", "CRSElapsedTime", "Distance", "DayOfWeek", "DepTimeBlk_index"],
    outputCol="features"
)
df_final = ensamblador.transform(df_muestra)

In [49]:
df_final= df_final.select("features", "label")
df_final.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[1600.0,135.0,696...|    1|
|[1850.0,135.0,849...|    1|
|[2135.0,55.0,192....|    1|
|[1830.0,125.0,666...|    1|
|[1855.0,170.0,955...|    1|
+--------------------+-----+
only showing top 5 rows



In [55]:
df_final.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|    1| 1874|
|    0| 2000|
+-----+-----+



# 4. Preparación del conjunto de entrenamiento y prueba

In [50]:
train_data, test_data = df_final.randomSplit([0.8, 0.2], seed=42)

In [51]:
print("Tamaño del conjunto de entrenamiento:", train_data.count())
print("Tamaño del conjunto de prueba:", test_data.count())

Tamaño del conjunto de entrenamiento: 3136
Tamaño del conjunto de prueba: 738


Se eligió una división del 80% para entrenamiento y 20% para prueba porque la muestra contiene registros limitados (~3,800), y es importante asegurar que el modelo tenga suficiente información para aprender sin sacrificar evaluaciones confiables. Dado que el evento de cancelación es poco frecuente en la base general, esta proporción permite mantener ejemplos positivos y negativos en ambas particiones. Además, usar randomSplit con semilla fija asegura reproducibilidad y evita sesgos por ordenamiento accidental en los datos.

# 5. Construcción de modelos de aprendizaje supervisado y no supervisado

# 5a. Construcción de modelos de aprendizaje supervisado

In [57]:
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100, maxDepth=5, seed=42)

In [58]:
modelo_rf = rf.fit(train_data)

In [59]:
predicciones = modelo_rf.transform(test_data)

In [60]:
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
roc_auc = evaluator.evaluate(predicciones)

In [61]:
print(f"AUC ROC: {roc_auc:.4f}")

AUC ROC: 0.6892


El modelo fue entrenado con 3,136 registros y evaluado sobre 738 vuelos. Usando RandomForestClassifier se logró un AUC-ROC que indica qué tan bien el modelo separa los vuelos cancelados de los no cancelados. Esta métrica es adecuada porque no se ve afectada por un leve desbalance. Dado que los datos tienen suficiente diversidad en variables horarias, duración del vuelo y día de la semana, el modelo logra captar patrones que anticipan cancelaciones.

El resultado fue de 69% la cual es una métrica regular lo cual indica que al haber un vuelo, el modelo puede predecir el resultado correctamente con tan solo un 69% de eficacia, esta métrica pudo haber mejorado si no se descartaban tantas columnas del df original, sin embargo para fines prácticos se busco utilizar menos columnas para eficientizar recursos computacionales.

# 5b. Construcción de modelos de aprendizaje no supervisado

In [63]:
kmeans = KMeans(featuresCol='features', k=3, seed=42)

In [64]:
modelo_kmeans = kmeans.fit(df_final)

In [65]:
predicciones_kmeans = modelo_kmeans.transform(df_final)

In [66]:
evaluador = ClusteringEvaluator()
silhouette = evaluador.evaluate(predicciones_kmeans)

In [67]:
print(f"Silhouette Score: {silhouette:.4f}")

Silhouette Score: 0.6311


El modelo no supervisado se entrenó con los 3,874 vuelos de la muestra M usando el algoritmo KMeans, que agrupa los vuelos en diferentes grupos según sus características. No usamos la columna de cancelación, solo datos como la hora de salida, duración del vuelo, distancia y el día de la semana, lo cual permite encontrar vuelos que se parecen entre sí.

El resultado fue un Silhouette Score de 0.6311, lo cual indica que los grupos que se formaron están bien separados y que los vuelos dentro de cada grupo son parecidos entre ellos. Aunque no se usó la variable de cancelación, este análisis puede ayudar a encontrar grupos de vuelos que tengan más riesgo de ser cancelados. Se usaron pocas columnas para que el análisis fuera más rápido y ligero, aunque eso pudo afectar un poco la precisión de los grupos.