In [None]:
from pyspark import SparkContext

# Inicialización del Contexto (si no estás en la shell interactiva)
sc = SparkContext("local[*]", "Sd_P1")
file_name = "calidad_aire_datos_meteo_mes.csv"

# Constantes Simbólicas del Dominio
MAG_TEMP = 83  # Magnitud Temperatura 
MAG_PREC = 89  # Magnitud Precipitación 
FLAG_VALID = 'V' # Bandera de validación [cite: 81]

# Carga del archivo (Asumiendo que lo subiste a Jupyter)
raw_rdd = sc.textFile(f"file:///home/jovyan/Sd_P1/{file_name}")

# Función para limpiar y estructurar una línea del CSV
def parse_line(line):
    try:
        parts = line.split(';')
        
        # 1. Filtro de integridad básica
        # Si la línea no tiene ni siquiera los metadatos básicos, la ignoramos.
        if len(parts) < 9: 
            return None
        
        # Extracción de metadatos
        municipio = parts[1]
        estacion = parts[2]
        magnitud = int(parts[3])
        
        # Construcción de Fecha
        ano = parts[6]
        mes = parts[7]
        dia = parts[8]
        fecha = f"{ano}-{mes.zfill(2)}-{dia.zfill(2)}"
        
        valores_validos = []
        
        # 2. Iteración segura
        base_idx = 9
        for i in range(24):
            val_idx = base_idx + (i * 2)
            flag_idx = base_idx + (i * 2) + 1
            
            # CRÍTICO: Verificamos que el índice del flag (que es el mayor)
            # esté dentro de los límites de la lista 'parts'.
            if flag_idx >= len(parts): 
                break
            
            val_str = parts[val_idx]
            flag = parts[flag_idx]
            
            if flag == 'V':
                # Conversión segura a float
                valor = float(val_str.replace(',', '.'))
                valores_validos.append(valor)
                
        return {
            'fecha': fecha,
            'muni': municipio,
            'est': estacion,
            'mag': magnitud,
            'vals': valores_validos
        }
    except Exception:
        # Si cualquier cosa falla en esta línea específica (formato, índices, conversión),
        # devolvemos None para que el filtro posterior la elimine limpiamente
        # sin abortar el trabajo de Spark.
        return None

# --- RECARGA DEL RDD CON LA NUEVA FUNCIÓN ---

# Asegúrate de volver a ejecutar la creación del RDD filtrado
data_rdd = raw_rdd.map(parse_line).filter(lambda x: x is not None)

temp_rdd = data_rdd.filter(lambda x: x['mag'] == MAG_TEMP)

# Prueba de nuevo el conteo
print(f"Total registros válidos: {data_rdd.count()}")



# Ejercicio 2
# 1. Mapeamos a (Fecha, Max_Temperatura_Del_Registro)
# Usamos max(x['vals']) porque la lista ya está filtrada y convertida a float
# Filtramos primero para asegurar que 'vals' no esté vacío y evitar errores
daily_max_rdd = temp_rdd.filter(lambda x: len(x['vals']) > 0) \
                        .map(lambda x: (x['fecha'], max(x['vals'])))

# Nota: Si hay múltiples estaciones midiendo el mismo día y quieres 
# EL MÁXIMO de la Comunidad ese día, hacemos una reducción:
final_daily_max = daily_max_rdd.reduceByKey(lambda a, b: max(a, b)).sortByKey()

# Salida
print("Fecha\t\tTemp_Max")
for date, t_max in final_daily_max.collect():
    print(f"{date}\t{t_max}")


#Ejercicio 3
# 1. Filtramos por Precipitación y calculamos la suma diaria por estación
precip_rdd = data_rdd.filter(lambda x: x['mag'] == MAG_PREC) \
                     .map(lambda x: (
                         x['fecha'], 
                         (x['muni'], x['est'], sum(x['vals'])) # Payload
                     ))

# PARTE A: Máximo por día
# ReduceByKey: Comparamos dos estaciones del mismo día y nos quedamos con la de mayor lluvia
# x e y son tuplas (muni, est, total). Comparamos x[2] con y[2]
max_precip_por_dia = precip_rdd.reduceByKey(lambda x, y: x if x[2] >= y[2] else y) \
                               .sortByKey()

print("--- Máximos por día ---")
# Output: Fecha, Muni, Est, Total [cite: 90]
for date, data in max_precip_por_dia.collect():
    print(f"Fecha: {date}, Muni: {data[0]}, Est: {data[1]}, Precip: {data[2]}")

# PARTE B: Máximo Absoluto del periodo [cite: 91]
# Buscamos en el RDD el registro con el valor [2] (total) más alto
# Usamos una función key para indicarle a max() qué valor comparar
max_absoluto = max_precip_por_dia.max(key=lambda x: x[1][2])

print("\n--- Máximo Absoluto ---")
print(f"Fecha: {max_absoluto[0]}")
print(f"Municipio: {max_absoluto[1][0]}, Estación: {max_absoluto[1][1]}")
print(f"Precipitación Total: {max_absoluto[1][2]}")

# Ejercicio 4
def get_daily_avg(record):
    if len(record['vals']) == 0: return None
    avg = sum(record['vals']) / len(record['vals'])
    return (record['fecha'], avg)

# 1. Creamos RDDs separados para cada estación específica [cite: 96]
# Referencia: Municipio 6, Estación 4
ref_station = temp_rdd.filter(lambda x: int(x['muni']) == 6 and int(x['est']) == 4) \
                      .map(get_daily_avg) \
                      .filter(lambda x: x is not None)

# Comparada: Municipio 5, Estación 2
comp_station = temp_rdd.filter(lambda x: int(x['muni']) == 5 and int(x['est']) == 2) \
                       .map(get_daily_avg) \
                       .filter(lambda x: x is not None)

# 2. Unimos los datos por Fecha (Inner Join)
# Resultado estructura: (Fecha, (Media_Ref, Media_Comp))
joined_data = ref_station.join(comp_station)

# 3. Calculamos el porcentaje
# (Fecha, (Ref, Comp)) -> (Fecha, Porcentaje)
percentage_rdd = joined_data.mapValues(lambda x: (x[1] / x[0]) * 100).sortByKey()

print("--- Comparativa (Porcentaje) ---")
for date, pct in percentage_rdd.collect():
    print(f"Fecha: {date}, Porcentaje: {percentage_rdd:.2f}%")

Total registros válidos: 1568
Fecha		Temp_Max
--- Máximos por día ---
Fecha: 2-01-0,0, Muni: 102, Est: 1, Precip: 0
Fecha: 2-02-0,0, Muni: 106, Est: 1, Precip: 0
Fecha: 2-02-0,2, Muni: 14, Est: 2, Precip: 0
Fecha: 2-02-0,3, Muni: 120, Est: 1, Precip: 0
Fecha: 2-02-0,4, Muni: 123, Est: 2, Precip: 0
Fecha: 2-02-0,7, Muni: 6, Est: 4, Precip: 0
Fecha: 2-02-0,8, Muni: 134, Est: 2, Precip: 0
Fecha: 2-02-1,0, Muni: 16, Est: 1, Precip: 0
Fecha: 2-02-1,1, Muni: 80, Est: 3, Precip: 0
Fecha: 2-02-1,2, Muni: 102, Est: 1, Precip: 0
Fecha: 2-02-1,8, Muni: 133, Est: 2, Precip: 0
Fecha: 2-02-2,2, Muni: 127, Est: 4, Precip: 0
Fecha: 2-02-3,0, Muni: 115, Est: 3, Precip: 0
Fecha: 2-03-0,0, Muni: 102, Est: 1, Precip: 0
Fecha: 2-03-0,1, Muni: 120, Est: 1, Precip: 0
Fecha: 2-03-0,2, Muni: 123, Est: 2, Precip: 0
Fecha: 2-03-0,4, Muni: 133, Est: 2, Precip: 0
Fecha: 2-03-1,0, Muni: 127, Est: 4, Precip: 0
Fecha: 2-04-0,6, Muni: 45, Est: 2, Precip: 0
Fecha: 2-04-0,7, Muni: 16, Est: 1, Precip: 0
Fecha: 2-04-0,8, 