In [0]:
#%pip install meteostat tqdm
#%restart_python

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, DoubleType, DateType, StringType
import pandas as pd
from datetime import datetime
from meteostat import Point, Daily, Stations
from tqdm import tqdm

In [0]:
# --- CARGAR DATOS INICIALES ---
spark = SparkSession.builder.appName("ClimaEnrichment").getOrCreate()
df_con_geo = spark.table("workspace.default.union_datasets")

In [0]:
METEOSTAT_API_KEY = "8a4b224e05msh7db26cbb8f4532bp1b931djsn4a9215830d1a"
Daily.key = METEOSTAT_API_KEY
Stations.key = METEOSTAT_API_KEY

In [0]:
# --- Llamadas a la API ---
loc_dates_unicas_pd = df_con_geo.select("lat_municipio", "lon_municipio", "fecha_notificacion").distinct().toPandas().dropna()

loc_unicas = loc_dates_unicas_pd[['lat_municipio', 'lon_municipio']].drop_duplicates().to_records(index=False)
estaciones_map = {}
loc_sin_estacion = []
for lat, lon in tqdm(loc_unicas, desc="Buscando estaciones cercanas"):
    try:
        stations = Stations().nearby(lat, lon).fetch(1)
        if not stations.empty:
            estaciones_map[(lat, lon)] = stations.index[0]
        else:
            loc_sin_estacion.append((lat, lon))
    except Exception:
        loc_sin_estacion.append((lat, lon))

mapa_pd = pd.DataFrame(estaciones_map.items(), columns=['coords', 'station_id'])
mapa_pd[['lat_municipio', 'lon_municipio']] = pd.DataFrame(mapa_pd['coords'].tolist(), index=mapa_pd.index)
df_con_estaciones = pd.merge(loc_dates_unicas_pd, mapa_pd, on=['lat_municipio', 'lon_municipio'], how='left').dropna(subset=['station_id'])
consultas_por_estacion = df_con_estaciones.groupby('station_id')['fecha_notificacion'].agg(list)

lista_clima_pd = []
for station_id, fechas in tqdm(consultas_por_estacion.items(), desc="Descargando datos climáticos"):
    try:
        start_date = datetime(min(fechas).year, min(fechas).month, min(fechas).day)
        end_date = datetime(max(fechas).year, max(fechas).month, max(fechas).day)
        data = Daily(station_id, start_date, end_date).fetch()
        if not data.empty:
            data['station_id'] = station_id
            lista_clima_pd.append(data)
    except Exception as e:
        print(f"Advertencia: No se pudieron obtener datos para la estación {station_id}. Error: {e}")

In [0]:
# --- Unir datos climáticos y guardar tabla final ---
if not lista_clima_pd:
    print("Error: No se pudo obtener ningún dato climático de la API. El proceso se detendrá.")
else:
    clima_pd_completo = pd.concat(lista_clima_pd).reset_index()[['time', 'station_id', 'tavg', 'prcp']]
    clima_pd_completo.rename(columns={'time': 'fecha_notificacion', 'tavg': 'temperatura_media', 'prcp': 'precipitacion'}, inplace=True)
    
    mapa_estaciones_spark = spark.createDataFrame(mapa_pd)
    df_con_station_id = df_con_geo.join(mapa_estaciones_spark, on=['lat_municipio', 'lon_municipio'], how='left')
    
    clima_spark_df = spark.createDataFrame(clima_pd_completo)
    df_final_con_clima = df_con_station_id.join(clima_spark_df, on=['station_id', 'fecha_notificacion'], how='left')
    
    print("Guardando la tabla enriquecida en 'workspace.default.complete_dataset_con_clima'...")
    df_final_con_clima.write.mode("overwrite").option("mergeSchema", "true").saveAsTable("workspace.default.complete_dataset_con_clima")    
    
    count_final = df_final_con_clima.count()
    print(f"--- Proceso completado. Se guardaron {count_final} registros en 'complete_dataset_con_clima'. ---")