# Ejercicio: Pipeline de Datos de Vuelos

Caso: Una aerolínea quiere analizar sus retrasos y puntualidad. Necesita un flujo que tome datos crudos de vuelos, los limpie, los transforme y los cargue en un formato optimizado para análisis.

## Objetivo

Construir un pipeline ETL en PySpark que:

1. Extraiga datos desde CSV/Parquet.
2. Transforme:
    - Limpie nulos.
    - Calcule si el vuelo fue puntual o retrasado (on_time = 1 si arrival_delay <= 15, else 0).
    - Agregue métricas por aerolínea y origen.
3. Cargue resultados en formato Parquet particionado por date.

Generado por: ChatGPT 5.0

In [None]:
# Importacion de librerias
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when

In [31]:
# Definicion de rutas
dataset_jan = "../data/Jan_2020_ontime.csv"
dataset_feb = "../data/Feb_2020_ontime.csv"
dataset_mar = "../data/March_2020_ontime.csv"
output_path = "../output/flights.parquet"

In [32]:
# Inicializacion de Spark
spark = (
	SparkSession.builder
	.appName("FlightPipeline")
	.getOrCreate()
)

In [33]:
# Importacion de datos fuente
df_jan = spark.read.csv(dataset_jan, header=True, inferSchema=True)
df_feb = spark.read.csv(dataset_feb, header=True, inferSchema=True)
df_mar = spark.read.csv(dataset_mar, header=True, inferSchema=True)

In [34]:
# Visualizacion de datos
df_jan.show(5)

+------------+-----------+-----------------+---------------------+----------+--------+-----------------+-----------------+---------------------+------+---------------+-------------------+----+--------+---------+------------+--------+---------+---------+--------+--------+----+
|DAY_OF_MONTH|DAY_OF_WEEK|OP_UNIQUE_CARRIER|OP_CARRIER_AIRLINE_ID|OP_CARRIER|TAIL_NUM|OP_CARRIER_FL_NUM|ORIGIN_AIRPORT_ID|ORIGIN_AIRPORT_SEQ_ID|ORIGIN|DEST_AIRPORT_ID|DEST_AIRPORT_SEQ_ID|DEST|DEP_TIME|DEP_DEL15|DEP_TIME_BLK|ARR_TIME|ARR_DEL15|CANCELLED|DIVERTED|DISTANCE|_c21|
+------------+-----------+-----------------+---------------------+----------+--------+-----------------+-----------------+---------------------+------+---------------+-------------------+----+--------+---------+------------+--------+---------+---------+--------+--------+----+
|           1|          3|               EV|                20366|        EV|  N48901|             4397|            13930|              1393007|   ORD|          11977|  

In [35]:
# Union de los datasets para obtener el dataset del primer trimestre del año 2020
df_flights = df_jan.union(df_feb).union(df_mar)
df_flights.show(5)

+------------+-----------+-----------------+---------------------+----------+--------+-----------------+-----------------+---------------------+------+---------------+-------------------+----+--------+---------+------------+--------+---------+---------+--------+--------+----+
|DAY_OF_MONTH|DAY_OF_WEEK|OP_UNIQUE_CARRIER|OP_CARRIER_AIRLINE_ID|OP_CARRIER|TAIL_NUM|OP_CARRIER_FL_NUM|ORIGIN_AIRPORT_ID|ORIGIN_AIRPORT_SEQ_ID|ORIGIN|DEST_AIRPORT_ID|DEST_AIRPORT_SEQ_ID|DEST|DEP_TIME|DEP_DEL15|DEP_TIME_BLK|ARR_TIME|ARR_DEL15|CANCELLED|DIVERTED|DISTANCE|_c21|
+------------+-----------+-----------------+---------------------+----------+--------+-----------------+-----------------+---------------------+------+---------------+-------------------+----+--------+---------+------------+--------+---------+---------+--------+--------+----+
|           1|          3|               EV|                20366|        EV|  N48901|             4397|            13930|              1393007|   ORD|          11977|  

In [36]:
def data_quality_check(df):
    # Conteo de registros
    record_count = df.count()

    # Validacion de nulos
    null_counts = df.select(*[sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in df.columns])
    expr_string = "stack({0}, {1}) as (column_name, null)".format(
    len(null_counts.columns),
    ", ".join([f"'{c}', {c}" for c in null_counts.columns])
    )
    null_counts = null_counts.selectExpr(expr_string)

    # Validacion de vacios
    blanks_counts = df.select(*[sum(when(col(c) == '', 1).otherwise(0)).alias(c) for c in df.columns])
    expr_string = "stack({0}, {1}) as (column_name, blacks)".format(
    len(blanks_counts.columns),
    ", ".join([f"'{c}', {c}" for c in blanks_counts.columns])
    )
    blanks_counts = blanks_counts.selectExpr(expr_string)

    # Validacion de duplicados
    if df.count() - df.dropDuplicates().count() == 0:
        duplicate_count = "Sin duplicados"
    else:
        duplicate_count ="Con duplicados"

    # Tipos de datos
    data_types = df = spark.createDataFrame(df.dtypes, ["column_name", "data_type"])

    # Resumen de calidad de datos
    df_quality = null_counts.join(blanks_counts, on = 'column_name', how = 'left').join(data_types, on = 'column_name', how = 'left')
    df_quality = df_quality.withColumnRenamed('column_name', 'columna').withColumn('null_percent', round((col('null')/record_count)*100,2)).withColumn('blanks_percent', round((col('blacks')/record_count)*100,2)).withColumn('duplicados', lit(duplicate_count)).withColumn('record_count', lit(record_count) )

    df_quality = df_quality.select('columna','data_type','null','null_percent','blacks','blanks_percent','duplicados','record_count').sort('columna','null')

    return df_quality.show(20)

In [37]:
# Validacion de calida de datos antes de tranformaciones
data_quality_check(df_flights)

+--------------------+---------+------+------------+------+--------------+--------------+------------+
|             columna|data_type|  null|null_percent|blacks|blanks_percent|    duplicados|record_count|
+--------------------+---------+------+------------+------+--------------+--------------+------------+
|           ARR_DEL15|   double|125259|        6.85|     0|           0.0|Sin duplicados|     1829847|
|            ARR_TIME|      int|122723|        6.71|     0|           0.0|Sin duplicados|     1829847|
|           CANCELLED|   double|     0|         0.0|     0|           0.0|Sin duplicados|     1829847|
|        DAY_OF_MONTH|      int|     0|         0.0|     0|           0.0|Sin duplicados|     1829847|
|         DAY_OF_WEEK|      int|     0|         0.0|     0|           0.0|Sin duplicados|     1829847|
|           DEP_DEL15|   double|121610|        6.65|     0|           0.0|Sin duplicados|     1829847|
|            DEP_TIME|      int|121560|        6.64|     0|           0.0

In [38]:
# Se realiza un query sql para validar una muestra de datos donde DEP_DEL15 y ARR_DEL15 son nulos
spark.sql('select DEP_TIME, DEP_DEL15, ARR_TIME, ARR_DEL15, CANCELLED, count(1) from {df_flights} where (ARR_DEL15 is null or DEP_DEL15 is null ) group by ARR_DEL15, ARR_TIME, DEP_DEL15, DEP_TIME, CANCELLED order by count(1) desc', df_flights=df_flights).show()

#Estos parecen ser irrelevantes para la tarea que buscamos

+--------+---------+--------+---------+---------+--------+
|DEP_TIME|DEP_DEL15|ARR_TIME|ARR_DEL15|CANCELLED|count(1)|
+--------+---------+--------+---------+---------+--------+
|    NULL|     NULL|    NULL|     NULL|      1.0|  121560|
|    1016|      0.0|    NULL|     NULL|      1.0|       4|
|     557|      0.0|    NULL|     NULL|      1.0|       4|
|     953|      0.0|    NULL|     NULL|      1.0|       4|
|     909|      0.0|    NULL|     NULL|      1.0|       3|
|     946|      1.0|    NULL|     NULL|      0.0|       3|
|    1307|      1.0|    NULL|     NULL|      1.0|       3|
|    1342|      0.0|    NULL|     NULL|      0.0|       3|
|    1545|      0.0|    NULL|     NULL|      1.0|       3|
|    2105|      1.0|    NULL|     NULL|      1.0|       3|
|     555|      0.0|    NULL|     NULL|      1.0|       3|
|     553|      0.0|    NULL|     NULL|      1.0|       3|
|     640|      0.0|    NULL|     NULL|      1.0|       3|
|     825|      0.0|    NULL|     NULL|      1.0|       

In [39]:
# Tranformacion: Limpieza de nulos
df_na_clean = df_flights.na.drop(subset=['ARR_DEL15','DEP_DEL15'] )
df_na_clean.show(5)

+------------+-----------+-----------------+---------------------+----------+--------+-----------------+-----------------+---------------------+------+---------------+-------------------+----+--------+---------+------------+--------+---------+---------+--------+--------+----+
|DAY_OF_MONTH|DAY_OF_WEEK|OP_UNIQUE_CARRIER|OP_CARRIER_AIRLINE_ID|OP_CARRIER|TAIL_NUM|OP_CARRIER_FL_NUM|ORIGIN_AIRPORT_ID|ORIGIN_AIRPORT_SEQ_ID|ORIGIN|DEST_AIRPORT_ID|DEST_AIRPORT_SEQ_ID|DEST|DEP_TIME|DEP_DEL15|DEP_TIME_BLK|ARR_TIME|ARR_DEL15|CANCELLED|DIVERTED|DISTANCE|_c21|
+------------+-----------+-----------------+---------------------+----------+--------+-----------------+-----------------+---------------------+------+---------------+-------------------+----+--------+---------+------------+--------+---------+---------+--------+--------+----+
|           1|          3|               EV|                20366|        EV|  N48901|             4397|            13930|              1393007|   ORD|          11977|  

In [40]:
# Tranformacion: Creacion de columna on_time
df_on_time = df_na_clean.withColumnRenamed('ARR_DEL15', 'on_time')

In [41]:
# Validacion de datos sobre el df usado para calcular metricas
data_quality_check(df_on_time)

+--------------------+---------+----+------------+------+--------------+--------------+------------+
|             columna|data_type|null|null_percent|blacks|blanks_percent|    duplicados|record_count|
+--------------------+---------+----+------------+------+--------------+--------------+------------+
|            ARR_TIME|      int|   0|         0.0|     0|           0.0|Sin duplicados|     1704588|
|           CANCELLED|   double|   0|         0.0|     0|           0.0|Sin duplicados|     1704588|
|        DAY_OF_MONTH|      int|   0|         0.0|     0|           0.0|Sin duplicados|     1704588|
|         DAY_OF_WEEK|      int|   0|         0.0|     0|           0.0|Sin duplicados|     1704588|
|           DEP_DEL15|   double|   0|         0.0|     0|           0.0|Sin duplicados|     1704588|
|            DEP_TIME|      int|   0|         0.0|     0|           0.0|Sin duplicados|     1704588|
|        DEP_TIME_BLK|   string|   0|         0.0|     0|           0.0|Sin duplicados|    

In [42]:
# Tranformacion: Calculo de metricas parte 1 (total_flights,departure_delayed_flights,arrival_delayed_flights)
df_metrics = df_on_time.groupBy('OP_CARRIER','ORIGIN','DEST').agg(
    count('*').alias('total_flights'),
    sum('DEP_DEL15').alias('departure_delayed_flights'),
    sum('on_time').alias('arrival_delayed_flights')
)
df_metrics.show(5)

+----------+------+----+-------------+-------------------------+-----------------------+
|OP_CARRIER|ORIGIN|DEST|total_flights|departure_delayed_flights|arrival_delayed_flights|
+----------+------+----+-------------+-------------------------+-----------------------+
|        WN|   HRL| AUS|           78|                      5.0|                    4.0|
|        WN|   MCI| ATL|          191|                     16.0|                   17.0|
|        WN|   MHT| MCO|          192|                      9.0|                   21.0|
|        WN|   MKE| BOS|            4|                      0.0|                    0.0|
|        MQ|   ORD| MQT|           86|                     18.0|                   20.0|
+----------+------+----+-------------+-------------------------+-----------------------+
only showing top 5 rows



In [43]:
# Tranformacion: Calculo de metricas parte 2 (porcentajes)
df_metrics = df_metrics.withColumn('departure_delay_percentage', round((col('departure_delayed_flights')/col('total_flights'))*100,2)).withColumn('arrival_delay_percentage', round((col('arrival_delayed_flights')/col('total_flights'))*100,2))
df_metrics.show(5)

+----------+------+----+-------------+-------------------------+-----------------------+--------------------------+------------------------+
|OP_CARRIER|ORIGIN|DEST|total_flights|departure_delayed_flights|arrival_delayed_flights|departure_delay_percentage|arrival_delay_percentage|
+----------+------+----+-------------+-------------------------+-----------------------+--------------------------+------------------------+
|        WN|   HRL| AUS|           78|                      5.0|                    4.0|                      6.41|                    5.13|
|        WN|   MCI| ATL|          191|                     16.0|                   17.0|                      8.38|                     8.9|
|        WN|   MHT| MCO|          192|                      9.0|                   21.0|                      4.69|                   10.94|
|        WN|   MKE| BOS|            4|                      0.0|                    0.0|                       0.0|                     0.0|
|        MQ| 

In [46]:
# Select para ordenar las columnas en un formato esperado y
# se escribe el parquet en el output esperado particionado por 'OP_CARRIER','ORIGIN','DEST' ,
# esperando que busqueda de los cliente sea como en un trayecto existen retrasos segun la aerolinea
df_output = df_metrics.select('OP_CARRIER','ORIGIN','DEST','total_flights','departure_delayed_flights','departure_delay_percentage','arrival_delayed_flights','arrival_delay_percentage')
df_output.write.mode("overwrite").partitionBy('OP_CARRIER','ORIGIN','DEST').parquet(output_path)