In [None]:

import pyspark

if "sc" in globals ():
    sc.stop()

#Inicializar Spark
sc = pyspark.SparkContext('local[*]')

#Cargar datos archivo
data = sc.textFile('calidad_aire_datos_meteo_mes.csv')

header = data.first()
data_no_header = data.filter(lambda row: row != header)

magnitud = 3

parded_data = data_no_header.map(lambda line:line.split(';'))
       

rdd_temp = parded_data.filter(lambda cols: cols[magnitud] == "83")


def tiene_dato_valido(cols):
    return 'V' in cols

rdd_filtrado = rdd_temp.filter(tiene_dato_valido)

# 5. Contar los registros resultantes 
total_registros_validos = rdd_filtrado.count()

print(f"Registros diarios de temperatura con al menos un dato válido: {total_registros_validos}")




Registros diarios de temperatura con al menos un dato válido: 224


In [7]:
import pyspark

# 1. Inicializar Spark (reiniciando si ya existe)
if "sc" in globals():
    sc.stop()
sc = pyspark.SparkContext('local[*]')

# 2. Cargar datos y quitar cabecera
data = sc.textFile('calidad_aire_datos_meteo_mes.csv')
header = data.first()
rdd_datos = data.filter(lambda row: row != header).map(lambda line: line.split(';'))

# 3. Filtrar solo la Magnitud 83 (Temperatura)
rdd_temp = rdd_datos.filter(lambda cols: cols[3] == "83")

# 4. Función simplificada para extraer (fecha, temperatura) de cada hora válida
def extraer_temperaturas(cols):
    # Formatear la fecha como YYYY-MM-DD (índices 5=Año, 6=Mes, 7=Día)
    fecha = f"{cols[5]}-{cols[6].zfill(2)}-{cols[7].zfill(2)}"
    
    # Recorrer las 24 horas. Los valores van del índice 8 al 54 saltando de 2 en 2
    for i in range(8, 56, 2):
        valor = cols[i]
        validacion = cols[i+1]
        
        # Si la hora es válida ('V') y no está vacía, devolvemos el par (fecha, temperatura)
        if validacion == 'V' and valor.strip() != '':
            temp_float = float(valor.replace(',', '.'))
            yield (fecha, temp_float)  #Yield lo pone en formato listas expulsando los datos no validos

# 5. Aplicar flatMap para obtener una lista gigante de todas las (fecha, temperatura) válidas
rdd_pares = rdd_temp.flatMap(extraer_temperaturas)  #FlatMap se encarga él solo de coger todas esas listitas de cada estación y juntarlas en uno solo

# 6. Agrupar por fecha y quedarse con la temperatura máxima de cada día
rdd_max_diaria = rdd_pares.reduceByKey(max).sortByKey()

# 7. Mostrar resultados
for fecha, temp_max in rdd_max_diaria.collect():
    print(f"{fecha}: {temp_max} °C")

sc.stop()


2026-02-01: 11.7 °C
2026-02-02: 12.2 °C
2026-02-03: 9.8 °C
2026-02-04: 11.3 °C
2026-02-05: 15.5 °C
2026-02-06: 10.3 °C
2026-02-07: 9.0 °C
2026-02-08: 12.7 °C


In [9]:
import pyspark

# 1. Inicializar Spark
if "sc" in globals():
    sc.stop()
sc = pyspark.SparkContext('local[*]')

# 2. Cargar datos y quitar cabecera
data = sc.textFile('calidad_aire_datos_meteo_mes.csv')
header = data.first()
rdd_datos = data.filter(lambda row: row != header).map(lambda line: line.split(';'))

# 3. Filtrar solo la Magnitud 89 (Precipitación)
rdd_precip = rdd_datos.filter(lambda cols: cols[3] == "89")


# 4. Función para calcular la suma de precipitación de una estación
def calcular_precipitacion_diaria(cols):
    municipio = cols[1]
    estacion = cols[2]
    # Formateamos la fecha a YYYY-MM-DD
    fecha = f"{cols[5]}-{cols[6].zfill(2)}-{cols[7].zfill(2)}"
    
    suma_diaria = 0.0

    # Recorremos las 24 horas (valores y validaciones)
    for i in range(8, 56, 2):
        valor = cols[i]
        validacion = cols[i+1]
        
        # Solo sumamos si el dato es válido ('V') y no está vacío
        if validacion == 'V' and valor.strip() != '':
            suma_diaria += float(valor.replace(',', '.'))
            
    # Usamos yield para expulsar una tupla: (Clave, Valor)
    # Clave -> fecha
    # Valor -> (municipio, estacion, suma_diaria)
    yield (fecha, (municipio, estacion, suma_diaria))



# 5. Aplicar flatMap (al usar yield, flatMap "desempaqueta" lo que el generador produce)
rdd_totales_diarios = rdd_precip.flatMap(calcular_precipitacion_diaria)


# 6. Agrupar por fecha y quedarse con la estación que tenga la suma mayor ese día
# Comparamos el índice 2 de los valores (que corresponde a suma_diaria)
rdd_max_diaria = rdd_totales_diarios.reduceByKey(lambda a, b: a if a[2] >= b[2] else b)

# Ordenamos por fecha
rdd_max_diaria_ordenado = rdd_max_diaria.sortByKey()

# 7. Recoger y mostrar el primer resultado
resultados = rdd_max_diaria_ordenado.collect()

print("ESTACIÓN CON MAYOR PRECIPITACIÓN POR DÍA:")

for fecha, datos in resultados:
    municipio, estacion, precipitacion = datos
    print(f"{fecha} | Municipio: {municipio} | Estación: {estacion} | Total: {round(precipitacion, 2)} mm")

# 8. Calcular el récord absoluto de todo el periodo
# La función max de Spark busca el máximo evaluando la función que le pasemos.
# En este caso le decimos que mire en x[1][2], que es el total de precipitación.
record_absoluto = rdd_max_diaria.max(key=lambda x: x[1][2])

fecha_rec = record_absoluto[0]
muni_rec, est_rec, prec_rec = record_absoluto[1]

print("RÉCORD ABSOLUTO DEL PERIODO:")
print(f"Fecha: {fecha_rec} | Municipio: {muni_rec} | Estación: {est_rec} | Total: {round(prec_rec, 2)} mm")

sc.stop()





ESTACIÓN CON MAYOR PRECIPITACIÓN POR DÍA:
2026-02-01 | Municipio: 120 | Estación: 1 | Total: 20.8 mm
2026-02-02 | Municipio: 161 | Estación: 1 | Total: 18.4 mm
2026-02-03 | Municipio: 127 | Estación: 4 | Total: 7.6 mm
2026-02-04 | Municipio: 45 | Estación: 2 | Total: 11.6 mm
2026-02-05 | Municipio: 115 | Estación: 3 | Total: 30.8 mm
2026-02-06 | Municipio: 67 | Estación: 1 | Total: 6.3 mm
2026-02-07 | Municipio: 115 | Estación: 3 | Total: 21.8 mm
2026-02-08 | Municipio: 120 | Estación: 1 | Total: 25.1 mm
RÉCORD ABSOLUTO DEL PERIODO:
Fecha: 2026-02-05 | Municipio: 115 | Estación: 3 | Total: 30.8 mm


In [10]:
import pyspark

# 1. Inicializar Spark
if "sc" in globals():
    sc.stop()
sc = pyspark.SparkContext('local[*]')

# 2. Cargar datos y quitar cabecera
data = sc.textFile('calidad_aire_datos_meteo_mes.csv')
header = data.first()
rdd_datos = data.filter(lambda row: row != header).map(lambda line: line.split(';'))

# 3. Filtrar Magnitud 83 (Temperatura)
rdd_temp = rdd_datos.filter(lambda cols: cols[3] == "83")

# 4. Función para extraer las medias de las dos estaciones concretas
def extraer_medias_estaciones(cols):
    # Convertimos a entero para evitar problemas con ceros a la izquierda (ej. "006" vs "6")
    mun = int(cols[1])
    est = int(cols[2])
    
    # Identificamos de qué estación se trata
    if mun == 6 and est == 4:
        tipo = 'referencia'
    elif mun == 5 and est == 2:
        tipo = 'comparada'
    else:
        # Si no es ninguna de las dos, devolvemos lista vacía para que flatMap la ignore
        return []
        
    fecha = f"{cols[5]}-{cols[6].zfill(2)}-{cols[7].zfill(2)}"
    
    suma_temp = 0.0
    horas_validas = 0
    
    # Recorremos las horas
    for i in range(8, 56, 2):
        valor = cols[i]
        validacion = cols[i+1]
        
        if validacion == 'V' and valor.strip() != '':
            suma_temp += float(valor.replace(',', '.'))
            horas_validas += 1
            
    # Solo si hubo alguna hora válida ese día calculamos la media
    if horas_validas > 0:
        media_diaria = suma_temp / horas_validas
        # Devolvemos la fecha, el tipo de estación y su media
        return [(fecha, (tipo, media_diaria))]
    
    return []

# 5. Aplicar la función a todo el RDD
rdd_medias = rdd_temp.flatMap(extraer_medias_estaciones)

# 6. Separar en dos RDDs diferentes
# Nos quedamos con tuplas de (Fecha, Media) para cada RDD
rdd_referencia = rdd_medias.filter(lambda x: x[1][0] == 'referencia').map(lambda x: (x[0], x[1][1]))
rdd_comparada  = rdd_medias.filter(lambda x: x[1][0] == 'comparada').map(lambda x: (x[0], x[1][1]))

# 7. Unir ambos RDDs por la fecha usando JOIN
# El resultado será: (Fecha, (Media_Referencia, Media_Comparada))
rdd_unido = rdd_referencia.join(rdd_comparada)

# 8. Calcular el porcentaje y formatear
def calcular_porcentaje(datos_unidos):
    fecha = datos_unidos[0]
    media_ref = datos_unidos[1][0]
    media_comp = datos_unidos[1][1]
    
    # Evitamos dividir por cero por si la media de referencia fuera exactamente 0.0
    if media_ref != 0:
        porcentaje = (media_comp / media_ref) * 100
    else:
        porcentaje = 0.0
        
    return (fecha, porcentaje)

# Aplicamos el cálculo y ordenamos por fecha
rdd_resultado = rdd_unido.map(calcular_porcentaje).sortByKey()

# 9. Recoger y mostrar resultados
resultados = rdd_resultado.collect()

print("COMPARATIVA DE TEMPERATURAS MEDIAS DIARIAS")
print("(Referencia: Mun 6 / Est 4  |  Comparada: Mun 5 / Est 2)")
print("-" * 60)
for fecha, porcentaje in resultados:
    print(f"Fecha: {fecha} | Porcentaje: {round(porcentaje, 2)} %")

# Detener Spark
sc.stop()

COMPARATIVA DE TEMPERATURAS MEDIAS DIARIAS
(Referencia: Mun 6 / Est 4  |  Comparada: Mun 5 / Est 2)
------------------------------------------------------------
Fecha: 2026-02-01 | Porcentaje: 129.04 %
Fecha: 2026-02-02 | Porcentaje: 113.45 %
Fecha: 2026-02-03 | Porcentaje: 114.98 %
Fecha: 2026-02-04 | Porcentaje: 114.01 %
Fecha: 2026-02-05 | Porcentaje: 112.09 %
Fecha: 2026-02-06 | Porcentaje: 116.52 %
Fecha: 2026-02-07 | Porcentaje: 131.91 %
Fecha: 2026-02-08 | Porcentaje: 116.34 %
