# Actividad 2 - Base de datos de Big Data
## **Datos**
#### **Equipo: 79**
#### **Nombre y Matrícula**
* A01746998 - Alexys Martín Coate Reyes             
* A00821663 - Manuel Gerardo López Garza            
* A01795975 - Diego Andres Bernal Diaz              
* A00571041 - Annette Cristina Narváez Andrade		

## Parte 1 - Caracterización de la población

In [1]:
# Librerias
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, countDistinct, isnan, when, desc, avg, min, max, round, sum, lit, row_number
from functools import reduce
from pyspark.sql.window import Window

In [2]:
# Crear sesión Spark
spark = SparkSession.builder \
    .appName("EDA_Vuelos") \
    .getOrCreate()

# Leer el CSV
df = spark.read.csv("Airline_Delay_2016-2018.csv", header=True, inferSchema=True)

# Mostrar esquema de datos
df.printSchema()

# Número total de registros
total_registros = df.count()
print(f"Número total de registros: {total_registros}")

root
 |-- FL_DATE: date (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- DEP_TIME: double (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- TAXI_OUT: double (nullable = true)
 |-- WHEELS_OFF: double (nullable = true)
 |-- WHEELS_ON: double (nullable = true)
 |-- TAXI_IN: double (nullable = true)
 |-- CRS_ARR_TIME: integer (nullable = true)
 |-- ARR_TIME: double (nullable = true)
 |-- ARR_DELAY: double (nullable = true)
 |-- CANCELLED: double (nullable = true)
 |-- DIVERTED: double (nullable = true)
 |-- CRS_ELAPSED_TIME: double (nullable = true)
 |-- ACTUAL_ELAPSED_TIME: double (nullable = true)
 |-- AIR_TIME: double (nullable = true)
 |-- DISTANCE: double (nullable = true)

Número total de registros: 18505725


In [3]:
# Lista de columnas
columnas = df.columns

# Mapear tipos de datos
tipo_columnas = dict(df.dtypes)
# Conteo de CANCELLED
print("=== Conteo de CANCELLED ===")
df.groupBy("CANCELLED").count().show()

# Conteo de DIVERTED
print("=== Conteo de DIVERTED ===")
df.groupBy("DIVERTED").count().show()

=== Conteo de CANCELLED ===
+---------+--------+
|CANCELLED|   count|
+---------+--------+
|      0.0|18240587|
|      1.0|  265138|
+---------+--------+

=== Conteo de DIVERTED ===
+--------+--------+
|DIVERTED|   count|
+--------+--------+
|     0.0|18461684|
|     1.0|   44041|
+--------+--------+



In [4]:
# Analizar cada columna
for columna in columnas:
    print(f"\n=== Análisis de columna: {columna} ===")
    
    tipo_dato = tipo_columnas[columna]
    
    if tipo_dato in ['double', 'float']:
        # Nulos y NaNs para numéricas flotantes
        nulos = df.filter((col(columna).isNull()) | (isnan(col(columna)))).count()
    else:
        # Solo nulos (y vacíos para strings) para otros tipos
        nulos = df.filter((col(columna).isNull()) | (col(columna) == "")).count()
    
    print(f"Valores nulos o vacíos: {nulos}")
    
    # Número de valores únicos
    distintos = df.select(columna).distinct().count()
    print(f"Valores únicos: {distintos}")
    
    # Análisis según tipo de dato
    if tipo_dato in ['int', 'bigint', 'double', 'float']:
        resumen = df.select(
            min(columna).alias("Minimo"),
            max(columna).alias("Maximo"),
            avg(columna).alias("Promedio")
        ).collect()[0]
        print(f"Min: {resumen['Minimo']}, Max: {resumen['Maximo']}, Promedio: {resumen['Promedio']:.2f}")
    
    elif tipo_dato in ['string', 'date', 'timestamp']:
        print("Top 5 valores más frecuentes:")
        top5 = df.groupBy(columna).count().orderBy(desc("count")).limit(5)
        top5.show(truncate=False)


=== Análisis de columna: FL_DATE ===
Valores nulos o vacíos: 0
Valores únicos: 1096
Top 5 valores más frecuentes:
+----------+-----+
|FL_DATE   |count|
+----------+-----+
|2018-11-25|22160|
|2018-07-13|22022|
|2018-07-20|22002|
|2018-07-27|21997|
|2018-08-03|21990|
+----------+-----+


=== Análisis de columna: OP_CARRIER ===
Valores nulos o vacíos: 0
Valores únicos: 18
Top 5 valores más frecuentes:
+----------+-------+
|OP_CARRIER|count  |
+----------+-------+
|WN        |3981440|
|DL        |2795589|
|AA        |2727661|
|OO        |2086597|
|UA        |1751113|
+----------+-------+


=== Análisis de columna: OP_CARRIER_FL_NUM ===
Valores nulos o vacíos: 0
Valores únicos: 7119
Min: 1, Max: 8402, Promedio: 2304.64

=== Análisis de columna: ORIGIN ===
Valores nulos o vacíos: 0
Valores únicos: 362
Top 5 valores más frecuentes:
+------+-------+
|ORIGIN|count  |
+------+-------+
|ATL   |1139076|
|ORD   |843495 |
|DEN   |685290 |
|DFW   |656555 |
|LAX   |648766 |
+------+-------+


=== Aná

## Parte 2 - Particionamiento

In [5]:
# Imprimiendo los 3 primeros rengloes del dataframe dataframe
df.show(3)

+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+--------+----------------+-------------------+--------+--------+
|   FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|DEST|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CANCELLED|DIVERTED|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|DISTANCE|
+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+--------+----------------+-------------------+--------+--------+
|2016-01-01|        DL|             1248|   DTW| LAX|        1935|  1935.0|      0.0|    23.0|    1958.0|   2107.0|   13.0|        2144|  2120.0|    -24.0|      0.0|     0.0|           309.0|              285.0|   249.0|  1979.0|
|2016-01-01|        DL|             1251|   ATL| GRR|        2125|  2130.0|     

In [6]:
# Imprime el total de columnas (total de variables de caracterización) 
print(df.columns)
print(len(df.columns))

['FL_DATE', 'OP_CARRIER', 'OP_CARRIER_FL_NUM', 'ORIGIN', 'DEST', 'CRS_DEP_TIME', 'DEP_TIME', 'DEP_DELAY', 'TAXI_OUT', 'WHEELS_OFF', 'WHEELS_ON', 'TAXI_IN', 'CRS_ARR_TIME', 'ARR_TIME', 'ARR_DELAY', 'CANCELLED', 'DIVERTED', 'CRS_ELAPSED_TIME', 'ACTUAL_ELAPSED_TIME', 'AIR_TIME', 'DISTANCE']
21


In [7]:
# Variables seleccionadas
vars_particion = ["OP_CARRIER", "ORIGIN", "DEST", "CANCELLED", "DIVERTED"]

# Imrpimiendo la cantidad de valores únicos que se tiene por las columnas seleccionadas
for col in vars_particion:
    print(col, df.select(col).distinct().count())

OP_CARRIER 18
ORIGIN 362
DEST 360
CANCELLED 2
DIVERTED 2


In [11]:
from pyspark.sql.functions import col, count, round
# Calcular frecuencia de combinaciones
combinaciones = df.groupBy(vars_particion).count()

# Total de registros
total = df.count()

# Agregar probabilidad
combinaciones = combinaciones.withColumn("probabilidad", col("count") / total)

combinaciones.show(truncate=False)

+----------+------+----+---------+--------+-----+---------------------+
|OP_CARRIER|ORIGIN|DEST|CANCELLED|DIVERTED|count|probabilidad         |
+----------+------+----+---------+--------+-----+---------------------+
|EV        |ATL   |AGS |0.0      |0.0     |2912 |1.573567098830227E-4 |
|EV        |DTW   |BWI |0.0      |0.0     |23   |1.2428586288837644E-6|
|EV        |BRO   |IAH |0.0      |0.0     |3054 |1.6503001098308766E-4|
|F9        |DEN   |MIA |0.0      |0.0     |1089 |5.884665421106171E-5 |
|WN        |BDL   |RSW |0.0      |0.0     |735  |3.971743879258986E-5 |
|WN        |BWI   |MEM |0.0      |0.0     |1348 |7.284232311892671E-5 |
|WN        |DEN   |BOS |0.0      |0.0     |2467 |1.3331009728070637E-4|
|WN        |LAX   |LAS |0.0      |0.0     |10922|5.901957367247162E-4 |
|WN        |LAX   |SAT |0.0      |0.0     |2056 |1.1110075395587041E-4|
|WN        |MCO   |CMH |0.0      |0.0     |3558 |1.922648261551493E-4 |
|WN        |PIT   |DEN |0.0      |0.0     |1307 |7.0626792519612

In [None]:
# Escribe en un archivo csv las reglas de partición 
#combinaciones_filtradas.write.csv("../Dataset/reglas_particion.csv", header=True)

In [12]:
# Sumando todas las probabilidades (Total cercano a 1)
suma_probabilidad = combinaciones.select(sum("probabilidad"))
suma_probabilidad.show()

# Calculando la media de todas las probabilidades
media_probabilidad = combinaciones.select(avg("probabilidad"))
media_probabilidad.show()

+------------------+
| sum(probabilidad)|
+------------------+
|0.9999999999999842|
+------------------+

+--------------------+
|   avg(probabilidad)|
+--------------------+
|2.989000478240029...|
+--------------------+



In [13]:
# Asignando los valores de la tabla
suma_probabilidad_value = suma_probabilidad.collect()[0][0]
media_probabilidad_value = media_probabilidad.collect()[0][0]

In [14]:
# Filtrando las probabilidades más relevantes
#combinaciones_filtradas = combinaciones.filter(col("probabilidad") > media_probabilidad_value)
#combinaciones_filtradas.show(truncate=False)

In [15]:
combs = combinaciones.select(vars_particion).collect()

for row in combs:
    cond = [df[c] == row[c] for c in vars_particion]
    particion = df.filter(reduce(lambda x, y: x & y, cond))

In [16]:
# Imprime las primeras 10 reglas de particionamiento
combs[:10]

[Row(OP_CARRIER='EV', ORIGIN='ATL', DEST='AGS', CANCELLED=0.0, DIVERTED=0.0),
 Row(OP_CARRIER='EV', ORIGIN='DTW', DEST='BWI', CANCELLED=0.0, DIVERTED=0.0),
 Row(OP_CARRIER='EV', ORIGIN='BRO', DEST='IAH', CANCELLED=0.0, DIVERTED=0.0),
 Row(OP_CARRIER='F9', ORIGIN='DEN', DEST='MIA', CANCELLED=0.0, DIVERTED=0.0),
 Row(OP_CARRIER='WN', ORIGIN='BDL', DEST='RSW', CANCELLED=0.0, DIVERTED=0.0),
 Row(OP_CARRIER='WN', ORIGIN='BWI', DEST='MEM', CANCELLED=0.0, DIVERTED=0.0),
 Row(OP_CARRIER='WN', ORIGIN='DEN', DEST='BOS', CANCELLED=0.0, DIVERTED=0.0),
 Row(OP_CARRIER='WN', ORIGIN='LAX', DEST='LAS', CANCELLED=0.0, DIVERTED=0.0),
 Row(OP_CARRIER='WN', ORIGIN='LAX', DEST='SAT', CANCELLED=0.0, DIVERTED=0.0),
 Row(OP_CARRIER='WN', ORIGIN='MCO', DEST='CMH', CANCELLED=0.0, DIVERTED=0.0)]

In [17]:
# Cantidad de reglas de particionamiento
len(combs)

33456

In [18]:
# Cantidad de renglones / filas de la tabla de probabilidades de ocurrencia
combinaciones.count()

33456

## Parte 3 - Verificación de reglas de particionamiento

In [19]:
# Filtrado y visualización de cada submuestra
for i, regla in enumerate(combs[:10]):
    filtro = df.filter(
        (col("OP_CARRIER") == regla["OP_CARRIER"]) &
        (col("ORIGIN") == regla["ORIGIN"]) &
        (col("DEST") == regla["DEST"]) &
        (col("CANCELLED") == regla["CANCELLED"]) &
        (col("DIVERTED") == regla["DIVERTED"])
    )
    print(f"\n=== Submuestra {i+1} ===")
    filtro.show(5)  # Muestra las primeras 5 filas de la submuestra


=== Submuestra 1 ===
+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+--------+----------------+-------------------+--------+--------+
|   FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|DEST|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CANCELLED|DIVERTED|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|DISTANCE|
+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+--------+----------------+-------------------+--------+--------+
|2016-01-01|        EV|             4960|   ATL| AGS|        1914|  1909.0|     -5.0|    14.0|    1923.0|   1955.0|    3.0|        2010|  1958.0|    -12.0|      0.0|     0.0|            56.0|               49.0|    32.0|   143.0|
|2016-01-01|        EV|             5107|   ATL| AGS|     

## Parte 4 - Técnica de muestreo a aplicar por partición

In [20]:
# Definir tamaño total de la muestra (por ejemplo, 1% del total)
tamaño_muestra_total = int(total * 0.01)

# Establecer un mínimo de registros por partición
minimo_por_particion = 0

# Calcular el tamaño de muestra por partición según su probabilidad
combinaciones_con_tamaño = combinaciones.withColumn(
    "tamaño_muestra",
    round(
        when(
            (col("probabilidad") * tamaño_muestra_total) < minimo_por_particion,
            lit(minimo_por_particion)
        ).otherwise(col("probabilidad") * tamaño_muestra_total)
    ).cast("int")
)

# Ordenar para visualizar mejor
combinaciones_con_tamaño = combinaciones_con_tamaño.orderBy(col("tamaño_muestra").desc())

combinaciones_con_tamaño.show(100, truncate=False)

# Calcular el tamaño final total de la muestra
tamaño_muestra_final = combinaciones_con_tamaño.agg({"tamaño_muestra": "sum"}).collect()[0][0]

print(f"Tamaño total esperado de la muestra final: {tamaño_muestra_final}")

+----------+------+----+---------+--------+-----+---------------------+--------------+
|OP_CARRIER|ORIGIN|DEST|CANCELLED|DIVERTED|count|probabilidad         |tamaño_muestra|
+----------+------+----+---------+--------+-----+---------------------+--------------+
|HA        |OGG   |HNL |0.0      |0.0     |28891|0.0015611925498730799|289           |
|HA        |HNL   |OGG |0.0      |0.0     |28830|0.0015578962726399534|288           |
|HA        |KOA   |HNL |0.0      |0.0     |21350|0.0011536970315942769|213           |
|HA        |HNL   |KOA |0.0      |0.0     |20673|0.0011171137580397417|207           |
|HA        |HNL   |LIH |0.0      |0.0     |19946|0.0010778286179006767|199           |
|HA        |LIH   |HNL |0.0      |0.0     |19901|0.0010753969379745998|199           |
|WN        |DAL   |HOU |0.0      |0.0     |19109|0.0010325993712756458|191           |
|WN        |HOU   |DAL |0.0      |0.0     |19131|0.0010337881925728389|191           |
|DL        |MCO   |ATL |0.0      |0.0     |

In [21]:
# Unir el tamaño de muestra a cada combinación en el dataset original
df_con_muestra = df.join(
    combinaciones_con_tamaño.select(*vars_particion, "tamaño_muestra"),
    on=vars_particion,
    how="inner"
)

# Crear ventana ordenada por fecha y hora dentro de cada combinación
ventana = Window.partitionBy(*vars_particion).orderBy("FL_DATE", "CRS_DEP_TIME")

# Enumerar vuelos por combinación (ordenados por tiempo)
df_con_muestra = df_con_muestra.withColumn("row_num", row_number().over(ventana))

# Filtrar solo los primeros N registros por combinación
muestra_final = df_con_muestra.filter(col("row_num") <= col("tamaño_muestra"))

# Mostrar la muestra final
muestra_final.show(100, truncate=False)

print(f"Tamaño total de la muestra: {muestra_final.count()}")


+----------+------+----+---------+--------+----------+-----------------+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+----------------+-------------------+--------+--------+--------------+-------+
|OP_CARRIER|ORIGIN|DEST|CANCELLED|DIVERTED|FL_DATE   |OP_CARRIER_FL_NUM|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|DISTANCE|tamaño_muestra|row_num|
+----------+------+----+---------+--------+----------+-----------------+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+----------------+-------------------+--------+--------+--------------+-------+
|9E        |ATL   |AGS |0.0      |0.0     |2018-01-01|3296             |855         |854.0   |-1.0     |12.0    |906.0     |939.0    |3.0    |953         |942.0   |-11.0    |58.0            |48.0               |33.0    |143.0   |10          