In [7]:
# ---------------------------------------------------------
# SCRIPT: 02_Silver_Layer (MEJORADO)
# DESCRIPCI√ìN: Lectura de Bronze, Joins (Enriquecimiento)
# y Feature Engineering (Limpieza de Fechas/Horas) para consumo final.
# ---------------------------------------------------------

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, floor, format_string, when, length

# --- 1. CONFIGURACI√ìN ---
print("üîå Iniciando sesi√≥n de Spark...")
spark = SparkSession.builder \
    .appName("SkyTracker_Silver_ETL") \
    .master("local[*]") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

# --- 2. LECTURA ---
print("üìÇ Cargando datos desde Bronze...")
df_vuelos = spark.read.parquet("s3a://bronze/formatted/flights")
df_airlines = spark.read.parquet("s3a://bronze/formatted/airlines")
df_airports = spark.read.parquet("s3a://bronze/formatted/airports")

print("üßπ AUDITOR√çA: Eliminando c√≥digos de aeropuerto corruptos (num√©ricos)...")
conteo_inicial = df_vuelos.count()

# Solo nos quedamos con filas donde Origen Y Destino tengan 3 letras (ej: 'JFK')
df_vuelos = df_vuelos.filter(
    (length(col("ORIGIN_AIRPORT")) == 3) & 
    (length(col("DESTINATION_AIRPORT")) == 3)
)

conteo_final = df_vuelos.count()
eliminados = conteo_inicial - conteo_final
print(f"   üìâ Filas iniciales: {conteo_inicial}")
print(f"   ‚úÖ Filas limpias:   {conteo_final}")
print(f"   üóëÔ∏è Filas eliminadas (Basura): {eliminados}")

# --- 3. TRANSFORMACI√ìN: JOINS ---
print("üîÑ Ejecutando cruces de datos (Joins)...")

# A) Aerol√≠neas
df_step1 = df_vuelos.join(
    df_airlines, 
    df_vuelos["AIRLINE"] == df_airlines["AIRLINE_ID"], 
    "left"
)

# B) Origen
print("   üìç Cruzando Origen...")
df_step2 = df_step1.join(
    df_airports.alias("origen"), 
    col("ORIGIN_AIRPORT") == col("origen.IATA_CODE"),
    "left"
)

# C) Destino
print("   üìç Cruzando Destino...")
df_silver_raw = df_step2.join(
    df_airports.alias("destino"), 
    col("DESTINATION_AIRPORT") == col("destino.IATA_CODE"),
    "left"
)

# --- 4. LIMPIEZA Y FEATURE ENGINEERING (TU CAMBIO) ---
print("üßπ Aplicando l√≥gica de negocio (D√≠as y Horas)...")

df_master = df_silver_raw.select(
    # --- Tiempo (Originales) ---
    col("YEAR"), col("MONTH"), col("DAY"), 
    
    # --- NUEVO: Nombre del D√≠a (Lunes, Martes...) ---
    col("DAY_OF_WEEK"), # Mantenemos el n√∫mero por si acaso
    when(col("DAY_OF_WEEK") == 1, "1. Lunes")
        .when(col("DAY_OF_WEEK") == 2, "2. Martes")
        .when(col("DAY_OF_WEEK") == 3, "3. Mi√©rcoles")
        .when(col("DAY_OF_WEEK") == 4, "4. Jueves")
        .when(col("DAY_OF_WEEK") == 5, "5. Viernes")
        .when(col("DAY_OF_WEEK") == 6, "6. S√°bado")
        .when(col("DAY_OF_WEEK") == 7, "7. Domingo")
        .otherwise("Desconocido")
        .alias("DAY_NAME"),
    
    # --- Aerol√≠nea ---
    col("AIRLINE").alias("AIRLINE_CODE"),
    col("AIRLINE_NAME"),
    
    # --- Origen ---
    col("ORIGIN_AIRPORT").alias("ORIGIN_CODE"),
    col("origen.AIRPORT").alias("ORIGIN_NAME"),
    col("origen.CITY").alias("ORIGIN_CITY"),
    col("origen.STATE").alias("ORIGIN_STATE"),
    col("origen.LATITUDE").alias("ORIGIN_LAT"),
    col("origen.LONGITUDE").alias("ORIGIN_LONG"),
    
    # --- Destino ---
    col("DESTINATION_AIRPORT").alias("DEST_CODE"),
    col("destino.AIRPORT").alias("DEST_NAME"),
    col("destino.CITY").alias("DEST_CITY"),
    col("destino.STATE").alias("DEST_STATE"),
    col("destino.LATITUDE").alias("DEST_LAT"),
    col("destino.LONGITUDE").alias("DEST_LONG"),
    
    # --- M√©tricas y Tiempos ---
    col("SCHEDULED_DEPARTURE"), # Hora original (ej: 1445)
    
    # --- NUEVO: Hora Formateada (ej: "14:00") ---
    format_string("%02d:00", floor(col("SCHEDULED_DEPARTURE") / 100)).alias("DEPARTURE_HOUR"),
    
    col("DEPARTURE_TIME"),
    col("DEPARTURE_DELAY"),
    col("ARRIVAL_DELAY"),
    col("DISTANCE"),
    col("AIR_TIME"),
    col("CANCELLED"),
    col("CANCELLATION_REASON")
)

# --- 5. ESCRITURA ---
ruta_destino = "s3a://silver/master_flights"
print(f"üíæ Guardando Tabla Maestra MEJORADA en: {ruta_destino}")
df_master.write.mode("overwrite").parquet(ruta_destino)
print("‚úÖ ¬°SILVER LAYER ACTUALIZADA CON √âXITO!")


üîå Iniciando sesi√≥n de Spark...
üìÇ Cargando datos desde Bronze...
üîÑ Ejecutando cruces de datos (Joins)...
   üìç Cruzando Origen...
   üìç Cruzando Destino...
üßπ Aplicando l√≥gica de negocio (D√≠as y Horas)...
üíæ Guardando Tabla Maestra MEJORADA en: s3a://silver/master_flights
‚úÖ ¬°SILVER LAYER ACTUALIZADA CON √âXITO!
