### **Procesamiento y Análisis de Datos en la Nube**: Implementación de un Pipeline ETL con Apache Spark y Databricks

- **Objetivo General**: Implementación de un proceso ETL (Extract, Transform, Load) utilizando Apache Spark en el entorno de Databricks para analizar datos de retrasos de vuelos.
- **Entorno**:	Databricks Free Edition. Ejecución de código a través de Job Clusters para simular un ambiente de producción.
- **Proceso ETL**: E (Extracción): Lectura del dataset público departuredelays.csv (vuelos). T (Transformación): Limpieza de datos, creación de la columna es_retrasado y cálculo de agregaciones (retraso promedio). L (Carga): Almacenamiento de los datos limpios en formato Delta Lake (vuelos_final_delta) para asegurar la integridad (ACID).
- **Resultados y Análisis**: La visualización con Matplotlib mostró que el aeropuerto SWF presentó el mayor retraso promedio (aprox. 75 minutos). El gráfico de dispersión confirmó que la distancia del vuelo tiene poca correlación con el tiempo de retraso, sugiriendo que factores operacionales son más determinantes.

In [0]:
# Comando simple para verificar que el contexto de Spark está disponible
print("Conectando con Apache Spark...")

# Usar Spark SQL para crear un DataFrame simple
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3)]
columns = ["name", "id"]
df = spark.createDataFrame(data, columns)

# Mostrar el DataFrame (acción que forzará la inicialización del clúster del Job)
df.show()

In [0]:
# Ruta universal para un dataset CSV de ejemplo (vuelos)
csv_file_path = "/databricks-datasets/flights/departuredelays.csv"

In [0]:
# Leer el archivo CSV en un DataFrame de Spark
vuelos_df = (
  spark.read
    .format("csv")
    .option("header", "true")  # Trata la primera fila como encabezado de columna
    .option("inferSchema", "true") # Pide a Spark que intente adivinar el tipo de dato de cada columna
    .load(csv_file_path)
)

# Mostrar las primeras 5 filas y el esquema
print("Primeras 5 filas (para inspección visual):")
vuelos_df.show(5)

print("\nEsquema del DataFrame:")
vuelos_df.printSchema()

In [0]:
# 1. Registrar el DataFrame como una vista temporal para poder consultarlo con SQL
vista_temp_name = "vuelos_temporal"
vuelos_df.createOrReplaceTempView(vista_temp_name)
print(f"DataFrame registrado como vista temporal: {vista_temp_name}")

# 2. Usar Spark SQL para hacer una consulta básica (Contar filas)
# La directiva mágica '%sql' cambia el lenguaje de la celda a SQL
# Esto debe ejecutarse en una CELDA SEPARADA

In [0]:
%sql
-- Contar el número total de transacciones (filas)
SELECT count(*) AS Total_Registros
FROM vuelos_temporal;

In [0]:
%sql
-- Consultar las primeras 10 filas (SELECT TOP 10)
SELECT *
FROM vuelos_temporal
LIMIT 10;

In [0]:
# Contar registros antes de la eliminación de duplicados
total_antes = vuelos_df.count()

# Eliminar duplicados
vuelos_limpio_df = vuelos_df.dropDuplicates()

# Contar registros después de la eliminación
total_despues = vuelos_limpio_df.count()

print(f"Total de registros antes de limpieza: {total_antes}")
print(f"Total de registros después de eliminar duplicados: {total_despues}")

In [0]:
# Eliminar filas donde 'delay' o 'distance' sean nulas
# Esto se hace sobre el DataFrame ya sin duplicados
vuelos_final_df = vuelos_limpio_df.na.drop(subset=["delay", "distance"])

print(f"Registros después de manejar nulos: {vuelos_final_df.count()}")

In [0]:
from pyspark.sql.functions import when, col

# Crear una columna binaria 'es_retrasado'
vuelos_transformado_df = vuelos_final_df.withColumn(
    "es_retrasado",
    when(col("delay") > 15, 1).otherwise(0)
)

print("\nVista previa del DataFrame con la nueva columna 'es_retrasado':")
vuelos_transformado_df.select("delay", "es_retrasado", "origin").show(5)

In [0]:
# 1. Actualizar la vista temporal con el nuevo DataFrame
vuelos_transformado_df.createOrReplaceTempView("vuelos_final")

print("Vista temporal 'vuelos_final' actualizada.")

In [0]:
%sql
-- Contar cuántos vuelos fueron clasificados como "retrasados" (> 15 minutos)
SELECT
    COUNT(*) AS Total_Vuelos_Retrasados,
    MIN(delay) AS Min_Retraso,
    MAX(delay) AS Max_Retraso
FROM vuelos_final
WHERE es_retrasado = 1;

In [0]:
%sql
-- Calcular el retraso promedio y la distancia total por aeropuerto de origen
SELECT
    origin,
    AVG(delay) AS Retraso_Promedio,
    SUM(distance) AS Distancia_Total
FROM vuelos_final
GROUP BY origin
ORDER BY Retraso_Promedio DESC
LIMIT 5;

In [0]:
# Crear un DataFrame con datos de aeropuertos para simular un JOIN
from pyspark.sql import Row

aeropuertos_data = [
    Row(code="ATL", name="Atlanta Intl"),
    Row(code="DEN", name="Denver Intl"),
    Row(code="SFO", name="San Francisco Intl"),
    Row(code="LAX", name="Los Angeles Intl")
]

aeropuertos_df = spark.createDataFrame(aeropuertos_data)

# Realizar un INNER JOIN con el DataFrame de vuelos
vuelos_con_nombres_df = vuelos_transformado_df.alias("v").join(
    aeropuertos_df.alias("a"),
    col("v.origin") == col("a.code"),
    "inner"
).select(
    "v.origin",
    "a.name",  # Nombre del aeropuerto obtenido del Join
    "v.delay",
    "v.distance"
)

print("\nResultado del JOIN (vuelos con nombre de aeropuerto):")
vuelos_con_nombres_df.show(5, truncate=False)

In [0]:
# Crear el nombre de la tabla en el catálogo
delta_table_name = "vuelos_final_delta"

# Guardar el DataFrame transformado como una TABLA DELTA en el catálogo
(
  vuelos_transformado_df.write
    .format("delta")
    .mode("overwrite")  # Sobrescribe la tabla si ya existe
    .saveAsTable(delta_table_name) # usa saveAsTable
)

print(f"Datos guardados con éxito en la tabla Delta: {delta_table_name}")

In [0]:
# Leer los datos desde el catálogo (usando el nombre de la tabla)
vuelos_delta_df = spark.table(delta_table_name)

# 1. Validar el esquema (debe incluir la columna 'es_retrasado')
print("\nEsquema de la tabla Delta (validación):")
vuelos_delta_df.printSchema()

# 2. Validar el conteo de filas
conteo_delta = vuelos_delta_df.count()
print(f"\nConteo de filas de la tabla Delta: {conteo_delta}")

# 3. Mostrar las primeras filas (validación de contenido)
print("\nPrimeras 5 filas del Delta Lake:")
vuelos_delta_df.show(5)

# Opcional: Ejecutar una consulta SQL simple para validar el acceso al catálogo

In [0]:
# 1. Consulta SQL para obtener los datos de agregación
retraso_por_origen_df = spark.sql("""
    SELECT
        origin,
        AVG(delay) AS Retraso_Promedio
    FROM vuelos_final_delta
    WHERE delay > 0 -- Solo vuelos con retraso
    GROUP BY origin
    HAVING Retraso_Promedio IS NOT NULL
    ORDER BY Retraso_Promedio DESC
    LIMIT 10
""")

# 2. Convertir el DataFrame de Spark a un DataFrame de Pandas
# El método .toPandas() transfiere los datos al nodo conductor para su procesamiento local (visualización)
retraso_pandas_df = retraso_por_origen_df.toPandas()

print("Datos listos para graficar (Pandas DataFrame):")
print(retraso_pandas_df)

In [0]:
import matplotlib.pyplot as plt

# Crear la figura y los ejes para el gráfico
plt.figure(figsize=(10, 6))

# Crear el gráfico de barras
plt.bar(
    retraso_pandas_df['origin'],
    retraso_pandas_df['Retraso_Promedio'],
    color='skyblue'
)

# Añadir títulos y etiquetas
plt.title('Top 10 Aeropuertos con Mayor Retraso Promedio')
plt.xlabel('Aeropuerto de Origen')
plt.ylabel('Retraso Promedio (Minutos)')
plt.xticks(rotation=45, ha='right') # Rotar etiquetas para mejor lectura
plt.grid(axis='y', linestyle='--')
plt.tight_layout() # Ajustar el diseño

# Mostrar el gráfico
# Databricks renderizará automáticamente este gráfico en la salida del Job
plt.show()

**Conclusión**: El gráfico de barras permite ver de forma clara qué aeropuertos tienen el peor rendimiento en términos de retraso promedio. Por ejemplo, el aeropuerto SWF (Stewart - Newburgh) que muestra la barra más alta, como visto anteriormente en una de las consultas.

In [0]:
# 1. Consulta SQL para obtener los datos agregados: Retraso Promedio por Distancia
retraso_por_distancia_df = spark.sql("""
    SELECT
        distance,
        AVG(delay) AS Retraso_Promedio_Minutos
    FROM vuelos_final_delta
    WHERE delay > 0 -- Solo consideramos vuelos con algún retraso
    GROUP BY distance
    HAVING Retraso_Promedio_Minutos IS NOT NULL
    ORDER BY distance
""")

# 2. Convertir el DataFrame de Spark a un DataFrame de Pandas
retraso_distancia_pandas_df = retraso_por_distancia_df.toPandas()

print("Datos listos para el Gráfico de Dispersión (Pandas DataFrame):")
print(retraso_distancia_pandas_df.head())

In [0]:
import matplotlib.pyplot as plt

# Crear la figura y los ejes para el gráfico
plt.figure(figsize=(10, 6))

# Crear el gráfico de dispersión
plt.scatter(
    retraso_distancia_pandas_df['distance'],
    retraso_distancia_pandas_df['Retraso_Promedio_Minutos'],
    alpha=0.6, # Transparencia para ver la densidad de los puntos
    s=20 # Tamaño del marcador
)

# Añadir títulos y etiquetas
plt.title('Relación entre Distancia de Vuelo y Retraso Promedio')
plt.xlabel('Distancia (Millas)')
plt.ylabel('Retraso Promedio (Minutos)')
plt.grid(True, linestyle='--', alpha=0.6)
plt.tight_layout()

# Mostrar el gráfico
plt.show()

**Conclusión**: La distancia del vuelo (distance) tiene poca correlación directa con el tiempo de retraso (delay). Esto refuerza la idea de que los factores operativos y climáticos en el aeropuerto de origen son más importantes que la duración del vuelo.