# Filtrado por Particionamiento en Retrasos Aéreos

Descripción de la base de datos D
La base de datos contiene información sobre vuelos realizados en los Estados Unidos. Entre sus atributos se encuentran el mes del vuelo (Month), la aerolínea (Airline), y si el vuelo tuvo un retraso de 15 minutos o más en su salida (DepDel15). Esta base se utilizará para analizar la probabilidad de retrasos y construir conjuntos de entrenamiento y prueba basados en reglas de particionamiento.

##**Introducción**
###**Aprendizaje Supervisado**

El aprendizaje supervisado es un tipo de aprendizaje automático donde un modelo es entrenado utilizando un conjunto de datos etiquetado. El objetivo es predecir la salida de datos nuevos basándose en las etiquetas aprendidas. Algunos algoritmos representativos incluyen:

**-Decision Tree:** Un modelo de árbol que toma decisiones en función de características.

**-Random Forest:** Un conjunto de árboles de decisión que mejora la precisión y evita el sobreajuste.

**Gradient Boosted Trees (GBT):** Un algoritmo que construye árboles de decisión en secuencia, corrigiendo errores de árboles anteriores.

**Multilayer Perceptron:** Una red neuronal de múltiples capas que puede aprender relaciones complejas.

Todos estos algoritmos están disponibles en el módulo **pyspark.ml.classification.**

###**Aprendizaje No Supervisado**


El aprendizaje no supervisado es un tipo de aprendizaje automático donde el modelo es entrenado con datos que no están etiquetados. El objetivo es encontrar patrones o agrupaciones en los datos. Algunos algoritmos representativos incluyen:

**-K-Means:** Un algoritmo de agrupamiento que divide los datos en K grupos.

**-Gaussian Mixture Model:** Un modelo probabilístico que asume que los datos provienen de una mezcla de distribuciones Gaussianas.

**Principal Component Analysis (PCA):** Un método para reducir la dimensionalidad de los datos.

Estos algoritmos están disponibles en el módulo **pyspark.ml.clustering**


### Cargando el entorno de PySpark en Goolge Colab

In [None]:
#import os
#os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
#os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [9]:
!pip install findspark
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession
#spark = SparkSession.builder.master("local[*]").getOrCreate()
spark = SparkSession.builder \
                    .master("local[*]") \
                    .appName("Ejemplo") \
                    .getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark



## Creando un Dataframe a partir de un archivo "csv" de entrada

In [10]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [11]:
df = spark.read.csv('/content/drive/MyDrive/Big Data/Copia de Combined_Flights_2022.csv', header=True, sep=",", inferSchema=True)
#Visualización limitada a los primeros 10 registros
df.show(10)

+----------+--------------------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+--------------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+-------------------+---------+-------------+-------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+
|FlightDate|             Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime|DepTime|DepDelayMinutes|De

In [12]:
# Imprimiendo en número de registros en el Dataframe
df.count()

4078318

In [13]:
# Imprimiendo el número de columnas en el Dataframe
print(len(df.columns))

61


#### Imprimiendo información sobre el tipo de datos y esquema del Dataframe

In [14]:
# Imprimiendo los tipos de datos del Dataframe
df.dtypes

[('FlightDate', 'date'),
 ('Airline', 'string'),
 ('Origin', 'string'),
 ('Dest', 'string'),
 ('Cancelled', 'boolean'),
 ('Diverted', 'boolean'),
 ('CRSDepTime', 'int'),
 ('DepTime', 'double'),
 ('DepDelayMinutes', 'double'),
 ('DepDelay', 'double'),
 ('ArrTime', 'double'),
 ('ArrDelayMinutes', 'double'),
 ('AirTime', 'double'),
 ('CRSElapsedTime', 'double'),
 ('ActualElapsedTime', 'double'),
 ('Distance', 'double'),
 ('Year', 'int'),
 ('Quarter', 'int'),
 ('Month', 'int'),
 ('DayofMonth', 'int'),
 ('DayOfWeek', 'int'),
 ('Marketing_Airline_Network', 'string'),
 ('Operated_or_Branded_Code_Share_Partners', 'string'),
 ('DOT_ID_Marketing_Airline', 'int'),
 ('IATA_Code_Marketing_Airline', 'string'),
 ('Flight_Number_Marketing_Airline', 'int'),
 ('Operating_Airline', 'string'),
 ('DOT_ID_Operating_Airline', 'int'),
 ('IATA_Code_Operating_Airline', 'string'),
 ('Tail_Number', 'string'),
 ('Flight_Number_Operating_Airline', 'int'),
 ('OriginAirportID', 'int'),
 ('OriginAirportSeqID', 'int'),

In [15]:
# Imprimiendo el esquema del Dataframe
df.printSchema()

root
 |-- FlightDate: date (nullable = true)
 |-- Airline: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Cancelled: boolean (nullable = true)
 |-- Diverted: boolean (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- DepDelayMinutes: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- ArrTime: double (nullable = true)
 |-- ArrDelayMinutes: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Quarter: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- Marketing_Airline_Network: string (nullable = true)
 |-- Operated_or_Branded_Code_Share_Partners: string (nullable = true)
 |

## Encontrando Probabilidades en Conjunto

In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, lit, round, sum as spark_sum

# Start Spark session
spark = SparkSession.builder.appName("JointProbabilityPivot").getOrCreate()

# STEP 1: Count combinations of Airline and DepDel15
joint_counts = df.groupBy("Month", "DepDel15").count()

# STEP 2: Get total number of rows
total_count = df.count()

# STEP 3: Calculate joint probability
joint_prob = joint_counts.withColumn("probability", col("count") / lit(total_count))

# STEP 4: Pivot the table so each DepDel15 value is a column
pivoted_prob = joint_prob.groupBy("Month").pivot("DepDel15").agg(round(spark_sum("probability"), 6))

# Rename columns for clarity
pivoted_prob = pivoted_prob \
    .withColumnRenamed("0.0", "P(DepDel15=0)") \
    .withColumnRenamed("1.0", "P(DepDel15=1)")

sorted_result = pivoted_prob.filter(col("P(DepDel15=1)").isNotNull()) \
                            .orderBy(col("P(DepDel15=1)").desc())
sorted_result.show(n = 1000,truncate=False)




+-----+--------+-------------+-------------+
|Month|null    |P(DepDel15=0)|P(DepDel15=1)|
+-----+--------+-------------+-------------+
|7    |0.002653|0.113488     |0.035586     |
|6    |0.004378|0.108008     |0.035238     |
|4    |0.003198|0.108382     |0.030706     |
|5    |0.002842|0.114389     |0.030612     |
|3    |0.002154|0.112252     |0.030394     |
|1    |0.008652|0.1041       |0.025476     |
|2    |0.005668|0.097827     |0.023996     |
+-----+--------+-------------+-------------+



In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, lit, round, sum as spark_sum

# Start Spark session
spark = SparkSession.builder.appName("JointProbabilityPivot").getOrCreate()

# STEP 1: Count combinations of Airline and DepDel15
joint_counts = df.groupBy("Airline", "DepDel15").count()

# STEP 2: Get total number of rows
total_count = df.count()

# STEP 3: Calculate joint probability
joint_prob = joint_counts.withColumn("probability", col("count") / lit(total_count))

# STEP 4: Pivot the table so each DepDel15 value is a column
pivoted_prob = joint_prob.groupBy("Airline").pivot("DepDel15").agg(round(spark_sum("probability"), 6))

# Rename columns for clarity
pivoted_prob = pivoted_prob \
    .withColumnRenamed("0.0", "P(DepDel15=0)") \
    .withColumnRenamed("1.0", "P(DepDel15=1)")

sorted_result = pivoted_prob.filter(col("P(DepDel15=1)").isNotNull()) \
                            .orderBy(col("P(DepDel15=1)").desc())
sorted_result.show(n = 1000,truncate=False)




+-----------------------------------------+--------+-------------+-------------+
|Airline                                  |null    |P(DepDel15=0)|P(DepDel15=1)|
+-----------------------------------------+--------+-------------+-------------+
|Southwest Airlines Co.                   |0.004423|0.12355      |0.051494     |
|American Airlines Inc.                   |0.004476|0.091394     |0.025737     |
|Delta Air Lines Inc.                     |0.00249 |0.100681     |0.022399     |
|United Air Lines Inc.                    |0.00194 |0.066127     |0.018246     |
|SkyWest Airlines Inc.                    |0.002233|0.088733     |0.017119     |
|JetBlue Airways                          |0.001827|0.023981     |0.012638     |
|Republic Airlines                        |0.00272 |0.037218     |0.008582     |
|Spirit Air Lines                         |0.001005|0.022854     |0.007823     |
|Frontier Airlines Inc.                   |5.94E-4 |0.014144     |0.006486     |
|Comair Inc.                

In [18]:
# Caso 2: agrupando valores a partir de dos columnas
df_grouped = df.groupBy('Airline', 'Month', 'DepDel15').count()
df_grouped.show(n = 1000,truncate=False)

+-----------------------------------------+-----+--------+-----+
|Airline                                  |Month|DepDel15|count|
+-----------------------------------------+-----+--------+-----+
|Envoy Air                                |4    |0.0     |18082|
|Envoy Air                                |4    |1.0     |3372 |
|Endeavor Air Inc.                        |4    |NULL    |200  |
|Endeavor Air Inc.                        |4    |0.0     |17575|
|JetBlue Airways                          |4    |NULL    |2125 |
|Alaska Airlines Inc.                     |4    |0.0     |14662|
|Mesa Airlines Inc.                       |4    |0.0     |8824 |
|American Airlines Inc.                   |4    |NULL    |1293 |
|GoJet Airlines, LLC d/b/a United Express |4    |0.0     |3500 |
|Frontier Airlines Inc.                   |4    |0.0     |7356 |
|Allegiant Air                            |4    |NULL    |309  |
|Delta Air Lines Inc.                     |4    |1.0     |13296|
|Alaska Airlines Inc.    

## Revisión de Particionamiento

In [19]:
def filtrar_por_mes_y_aerolinea(df, mes=None, aerolinea=None):
    if mes is not None:
        df = df.filter(col("Month") == mes)
    if aerolinea is not None:
        df = df.filter(col("Airline") == aerolinea)
    return df

In [20]:
# Obtener los valores únicos de mes y aerolínea
meses = [row['Month'] for row in df.select("Month").distinct().collect()]
aerolineas = [row['Airline'] for row in df.select("Airline").distinct().collect()]

In [21]:
# Iterar sobre todas las combinaciones
for mes in meses:
    print(f"\n🔎 Filtrando: Mes = {mes}")
    df_filtrado = filtrar_por_mes_y_aerolinea(df, mes=mes, aerolinea=None)
    df_filtrado.show(5, truncate=False)


🔎 Filtrando: Mes = 4
+----------+-----------------------------------------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+------------------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+-------------------+---------+-------------+-------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+
|FlightDate|Airline                                  |

In [22]:
for aerolinea in aerolineas:
    print(f"\n🔎 Filtrando:  Aerolínea = {aerolinea}")
    df_filtrado = filtrar_por_mes_y_aerolinea(df, mes=None, aerolinea=aerolinea)
    df_filtrado.show(5, truncate=False)


🔎 Filtrando:  Aerolínea = GoJet Airlines, LLC d/b/a United Express
+----------+----------------------------------------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+--------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+----------------+---------+-------------+--------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+
|FlightDate|Airline  

In [23]:
# Iterar sobre todas las combinaciones
for mes in meses:
    for aerolinea in aerolineas:
        print(f"\n🔎 Filtrando: Mes = {mes}, Aerolínea = {aerolinea}")
        df_filtrado = filtrar_por_mes_y_aerolinea(df, mes=mes, aerolinea=aerolinea)
        df_filtrado.show(5, truncate=False)


🔎 Filtrando: Mes = 4, Aerolínea = GoJet Airlines, LLC d/b/a United Express
+----------+----------------------------------------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+--------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+----------------+---------+-------------+--------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+
|FlightDate|A

#"Selección de los Datos
Para esta sección, aplicaremos muestreo estratificado proporcional basado en las variables "Airline" y "DepDel15".

In [26]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from functools import reduce # Import the standard library reduce

# Crear una sesión de Spark
spark = SparkSession.builder.appName("StratifiedSampling").getOrCreate()

# Cargar datos
# Change the file path to the correct CSV file
data = spark.read.csv("/content/drive/MyDrive/Big Data/Copia de Combined_Flights_2022.csv", header=True, inferSchema=True)

# Calcular el número total de filas en el conjunto de datos
total_count = data.count()

# Contar el número de filas por aerolínea y retraso
strata_counts = data.groupBy("Airline", "DepDel15").count()

# Definir el porcentaje de muestreo
sampling_fraction = 0.1  # 10% de muestreo

# Calcular el número de muestras que se deben tomar de cada estrato
strata_counts = strata_counts.withColumn("sample_size", (F.col("count") * sampling_fraction).cast("int"))

# Función para muestrear desde cada estrato
def stratified_sample(df, strata_cols, sample_size_col):
    strata_values = df.select(strata_cols).distinct().collect()
    sampled_dfs = []

    for row in strata_values:
        # Correctly combine filters using the bitwise AND operator
        filters = [df[col] == row[col] for col in strata_cols]
        # Use the standard library's reduce to combine the filter conditions
        combined_filter = reduce(lambda x, y: x & y, filters)
        strata_df = df.filter(combined_filter)

        # Ensure sample_size is not greater than the actual number of rows in the stratum
        current_strata_count = strata_df.count()
        # Find the sample size for the current stratum from strata_counts DataFrame
        sample_size_row = strata_counts.filter(combined_filter).select(sample_size_col).first()
        # Handle cases where the stratum might not be in strata_counts (though unlikely with distinct values)
        sample_size = sample_size_row[0] if sample_size_row else 0

        sample_size = min(sample_size, current_strata_count) # Added this line

        # Adjust fraction calculation to handle potential sample_size being 0 or greater than available rows
        fraction_to_sample = sample_size / current_strata_count if current_strata_count > 0 else 0.0
        # Ensure fraction is not greater than 1.0 (although min(1.0, ...) handles this)
        # Added .limit(sample_size) as an alternative way to sample a fixed number of rows
        sampled_strata = strata_df.sample(fraction=min(1.0, fraction_to_sample), seed=42).limit(sample_size)

        sampled_dfs.append(sampled_strata)

    # Handle the case where sampled_dfs might be empty
    if not sampled_dfs:
        return spark.createDataFrame([], df.schema) # Return an empty DataFrame with the same schema

    # Union all the sampled DataFrames
    # Use reduce with unionAll for cleaner code
    return reduce(lambda df1, df2: df1.unionAll(df2), sampled_dfs)


# Aplicar el muestreo estratificado
sampled_data = stratified_sample(data, ["Airline", "DepDel15"], "sample_size")

# Mostrar la muestra final
sampled_data.show()

+----------+---------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+--------------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+--------------------+---------+-------------+-------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+
|FlightDate|  Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime|DepTime|DepDelayMinutes|DepDelay|ArrTime|ArrDel

##Preparación de los Datos

En esta etapa, preprocesamos la muestra M para corregir valores nulos y transformar tipos de datos:

In [27]:
# Corrección de valores nulos
cleaned_data = sampled_data.na.fill({"DepDelayMinutes": 0})

# Identificación y eliminación de valores atípicos
cleaned_data = cleaned_data.filter(cleaned_data['Distance'] <= 3000)

# Transformación de tipos de datos
from pyspark.sql.types import IntegerType

cleaned_data = cleaned_data.withColumn("DepDelayMinutes", cleaned_data["DepDelayMinutes"].cast(IntegerType()))

##Preparación del Conjunto de Entrenamiento y Prueba
Dividimos la muestra M en conjuntos de entrenamiento y prueba usando una división 80/20:


In [None]:
# División de los datos en conjunto de entrenamiento y prueba
train_data, test_data = cleaned_data.randomSplit([0.8, 0.2], seed=42)
print("Entrenamiento:", train_data.count(), "Prueba:", test_data.count())

##Construcción de Modelos de Aprendizaje Supervisado y No Supervisado
Aprendizaje Supervisado
Entrenamos un modelo RandomForestClassifier:

In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler

# Preparar los datos de entrada
feature_columns = ['Distance', 'DayOfWeek', 'CRSDepTime']
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
train_data = assembler.transform(train_data)

# Entrenar el modelo
rf = RandomForestClassifier(labelCol="DepDel15", featuresCol="features")
model = rf.fit(train_data)

# Predicciones
predictions = model.transform(test_data)
predictions.select("DepDel15", "prediction").show()

##Aprendizaje No Supervisado
Entrenamos un modelo KMeans:

In [None]:
from pyspark.ml.clustering import KMeans

# Preparar los datos de entrada
kmeans_data = assembler.transform(cleaned_data)

# Entrenar el modelo KMeans
kmeans = KMeans(k=3, seed=1)
model = kmeans.fit(kmeans_data)

# Predicciones
predictions = model.transform(kmeans_data)
predictions.select("features", "prediction").show()

## Conclusiones

La actividad de aplicar algoritmos de aprendizaje supervisado y no supervisado utilizando PySpark ha sido exitosa, logrando implementar un modelo de Random Forest para predecir retrasos significativos en vuelos y un modelo de K-Means para identificar patrones de agrupamiento entre aerolíneas. A través del muestreo estratificado proporcional, se aseguró una representación adecuada de las diferentes categorías de aerolíneas y clases de retraso, lo que minimizó sesgos en el análisis. La preparación de datos, que incluyó la limpieza y transformación de tipos, mejoró la calidad del conjunto analizado, lo que se tradujo en resultados más fiables. Los modelos desarrollados no solo proporcionaron insights valiosos sobre el comportamiento de los vuelos, sino que también establecieron una base sólida para análisis futuros, destacando la importancia de la documentación y el ajuste de modelos para optimizar el rendimiento en aplicaciones prácticas.