In [None]:
# (opcional) crear venv/conda
pip install -U pandas numpy matplotlib scikit-learn catboost geopandas shap folium pyproj rtree fiona


In [None]:
# Crear carpetas de salida si no existen
from pathlib import Path
for p in ["reports/figures","reports/tables"]:
    Path(p).mkdir(parents=True, exist_ok=True)


In [1]:
import pandas as pd, numpy as np
from pathlib import Path

PATH_PARQUET = Path("data/processed/ven_rainfall_adm2.parquet")
PATH_MAP_BEST = Path("data/external/adm2_pcode_to_gid2_BEST.csv")  # si existe
assert PATH_PARQUET.exists(), "No encuentro data/processed/ven_rainfall_adm2.parquet"

df = pd.read_parquet(PATH_PARQUET)

# Normalizar columnas mínimas esperadas
df.columns = [c.strip() for c in df.columns]
if "date" in df.columns:
    df["date"] = pd.to_datetime(df["date"], errors="coerce")
elif "Date" in df.columns:
    df["date"] = pd.to_datetime(df["Date"], errors="coerce")
else:
    raise ValueError("No encuentro columna de fecha ('date' o 'Date').")

# Asegurar ADM2_PCODE
if "ADM2_PCODE" not in df.columns:
    raise ValueError("Falta columna ADM2_PCODE en el parquet.")

# Variables candidatas presentes
cands_rain = [c for c in ["rfh","r1h","r3h"] if c in df.columns]
cands_pct  = [c for c in ["r1q","r3q","rfq"] if c in df.columns]
assert len(cands_rain)>0, "No encuentro columnas de lluvia (rfh/r1h/r3h)."

# Por defecto trabajaremos con 'rfh' si existe; si no, con la primera disponible.
y_base = "rfh" if "rfh" in cands_rain else cands_rain[0]
print("Usando variable de lluvia base para EDA y target:", y_base)

# (Opcional) cargar mapping ADM2→Estado/Municipio si está disponible
map_df = None
if PATH_MAP_BEST.exists():
    map_df = pd.read_csv(PATH_MAP_BEST)
    # Intenta detectar nombres de columnas de estado/municipio
    # (ajusta estos alias si tus nombres reales difieren)
    aliases_estado = [c for c in map_df.columns if c.lower() in {"estado","state","adm1_name"}]
    aliases_muni   = [c for c in map_df.columns if c.lower() in {"municipio","municipality","adm2_name"}]
    col_estado = aliases_estado[0] if aliases_estado else None
    col_muni   = aliases_muni[0]   if aliases_muni   else None
    print("Mapping cargado. Estado:", col_estado, "| Municipio:", col_muni)


Usando variable de lluvia base para EDA y target: rfh


In [2]:
import matplotlib.pyplot as plt
figpath = Path("reports/figures")

ts = (df
      .dropna(subset=["date", y_base])
      .groupby("date", as_index=False)[y_base].mean()
      .sort_values("date"))
ts["rolling"] = ts[y_base].rolling(6, min_periods=1).mean()  # media móvil ~60 días

plt.figure(figsize=(10,4))
plt.plot(ts["date"], ts[y_base], label="Media decádica nacional")
plt.plot(ts["date"], ts["rolling"], linewidth=2, label="Media móvil (6 decádicas)")
plt.title(f"Evolución decádica nacional de precipitación ({y_base})")
plt.xlabel("Fecha"); plt.ylabel("mm/decádica"); plt.legend(); plt.tight_layout()
plt.savefig(figpath/"eda_serie_nacional.png", dpi=200)
plt.close()


In [3]:
if map_df is not None and col_estado is not None:
    df_eda = df.copy()
    df_eda = df_eda.merge(map_df[["ADM2_PCODE", col_estado]], on="ADM2_PCODE", how="left")
    last10 = df_eda["date"] >= (df_eda["date"].max() - pd.DateOffset(years=10))
    bx = df_eda.loc[last10 & df_eda[y_base].notna(), [col_estado, y_base]].dropna()
    top_estados = (bx.groupby(col_estado)[y_base].count()
                     .sort_values(ascending=False).head(16).index.tolist())
    bx = bx[bx[col_estado].isin(top_estados)]
    # Boxplot con matplotlib puro
    data = [bx.loc[bx[col_estado]==st, y_base].values for st in top_estados]
    plt.figure(figsize=(12,5))
    plt.boxplot(data, labels=top_estados, vert=True, showfliers=False)
    plt.xticks(rotation=45, ha="right")
    plt.title(f"{y_base}: distribución por Estado (últimos 10 años)")
    plt.ylabel("mm/decádica"); plt.tight_layout()
    plt.savefig(figpath/"eda_boxplot_estado_ult10y.png", dpi=200)
    plt.close()


In [4]:
if len(cands_pct)>0:
    plt.figure(figsize=(8,4))
    vals = pd.concat([df[c].dropna() for c in cands_pct], axis=0)
    vals = vals[(vals>=0)&(vals<=100)]
    plt.hist(vals, bins=20)
    plt.title("Distribución de percentiles de precipitación (r1q/r3q/rfq)")
    plt.xlabel("Percentil [0-100]"); plt.ylabel("Frecuencia")
    plt.tight_layout()
    plt.savefig(figpath/"eda_hist_percentiles.png", dpi=200)
    plt.close()


In [5]:
def add_lags(g, cols, lags=(1,2,7,14)):
    g = g.sort_values("date").copy()
    for col in cols:
        for L in lags:
            g[f"{col}_lag{L}"] = g[col].shift(L)
    return g

# Construimos features de lags para variables disponibles
lag_cols = list(set(cands_rain + cands_pct))
df_lag = (df.groupby("ADM2_PCODE", group_keys=False)
            .apply(lambda g: add_lags(g, lag_cols, lags=(1,2,7,14))))

# Partes temporales
df_lag["month"] = df_lag["date"].dt.month
df_lag["month_sin"] = np.sin(2*np.pi*df_lag["month"]/12)
df_lag["month_cos"] = np.cos(2*np.pi*df_lag["month"]/12)

# Selección de filas válidas (sin NaN en features por los lags iniciales)
min_lag = 14
df_lag = df_lag[df_lag["date"] >= (df_lag["date"].min() + pd.Timedelta(days=10*min_lag))].copy()

# Construcción del objetivo desplazado por horizonte h (en decádicas)
def make_supervised(data, y_col, horizon_decads=1):
    def shift_target(g):
        g = g.sort_values("date").copy()
        g[f"{y_col}_tplus{horizon_decads}"] = g[y_col].shift(-horizon_decads)
        return g
    out = (data.groupby("ADM2_PCODE", group_keys=False)
                 .apply(shift_target)
                 .dropna(subset=[f"{y_col}_tplus{horizon_decads}"]))
    return out

ds_t1 = make_supervised(df_lag, y_base, horizon_decads=1)
ds_t3 = make_supervised(df_lag, y_base, horizon_decads=3)

# Definir features X y target y
def get_Xy(ds, y_col, horizon_decads):
    target = f"{y_col}_tplus{horizon_decads}"
    # Features: todos los lags + month_sin/cos + month (evita usar el y_base sin lag!)
    feats = [c for c in ds.columns if ("_lag" in c) or (c in ["month","month_sin","month_cos"])]
    X = ds[feats].astype("float32")
    y = ds[target].astype("float32")
    return X, y, feats

X1, y1, feats1 = get_Xy(ds_t1, y_base, 1)
X3, y3, feats3 = get_Xy(ds_t3, y_base, 3)

# Split temporal 80/20 por fecha
def temporal_split(ds, frac=0.8):
    cutoff = ds["date"].quantile(frac)
    train_idx = ds["date"] <= cutoff
    test_idx  = ds["date"]  > cutoff
    return train_idx, test_idx, pd.to_datetime(cutoff)

tr1, te1, cutoff1 = temporal_split(ds_t1, 0.8)
tr3, te3, cutoff3 = temporal_split(ds_t3, 0.8)

print("Cortes temporales -> t+1:", cutoff1.date(), "| t+3:", cutoff3.date())


  .apply(lambda g: add_lags(g, lag_cols, lags=(1,2,7,14))))
  .apply(shift_target)
  .apply(shift_target)


Cortes temporales -> t+1: 2016-08-21 | t+3: 2016-08-03


In [9]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.metrics import mean_absolute_error, mean_squared_error
from math import sqrt

def eval_model(model, X_train, y_train, X_test, y_test, label):
    model.fit(X_train, y_train)
    pred = model.predict(X_test)
    mae = mean_absolute_error(y_test, pred)
    rmse = sqrt(mean_squared_error(y_test, pred))
    print(f"{label} -> MAE: {mae:.2f} | RMSE: {rmse:.2f}")
    return mae, rmse

rf = Pipeline([
    ("imp", SimpleImputer(strategy="median")),
    ("rf", RandomForestRegressor(
        n_estimators=180, max_depth=12, min_samples_leaf=2,
        bootstrap=True, max_samples=0.6, n_jobs=-1, random_state=42))
])

# CatBoost (tree boosting) sin one-hot y sin GPU requerido
from catboost import CatBoostRegressor
cb_params = dict(
    depth=8, learning_rate=0.1, iterations=700,
    loss_function="RMSE", random_seed=42, verbose=False
)
cb_t1 = Pipeline([("imp", SimpleImputer(strategy="median")),
                  ("cb", CatBoostRegressor(**cb_params))])
cb_t3 = Pipeline([("imp", SimpleImputer(strategy="median")),
                  ("cb", CatBoostRegressor(**cb_params))])

# t+1
mae_rf1, rmse_rf1 = eval_model(rf, X1[tr1], y1[tr1], X1[te1], y1[te1], "RF t+1")
mae_cb1, rmse_cb1 = eval_model(cb_t1, X1[tr1], y1[tr1], X1[te1], y1[te1], "CatBoost t+1")

# t+3
mae_rf3, rmse_rf3 = eval_model(rf, X3[tr3], y3[tr3], X3[te3], y3[te3], "RF t+3")
mae_cb3, rmse_cb3 = eval_model(cb_t3, X3[tr3], y3[tr3], X3[te3], y3[te3], "CatBoost t+3")

# Guardar tabla comparativa
cmp = pd.DataFrame({
    "Modelo":["RF","CatBoost","RF","CatBoost"],
    "Horizonte":["t+1","t+1","t+3","t+3"],
    "MAE":[mae_rf1, mae_cb1, mae_rf3, mae_cb3],
    "RMSE":[rmse_rf1, rmse_cb1, rmse_rf3, rmse_cb3],
    "Corte":[cutoff1.date(), cutoff1.date(), cutoff3.date(), cutoff3.date()]
})
cmp.to_csv("reports/tables/model_compare.csv", index=False)
print(cmp)


RF t+1 -> MAE: 15.04 | RMSE: 22.07
CatBoost t+1 -> MAE: 15.16 | RMSE: 22.17
RF t+3 -> MAE: 15.82 | RMSE: 22.97
CatBoost t+3 -> MAE: 15.69 | RMSE: 22.82
     Modelo Horizonte        MAE       RMSE       Corte
0        RF       t+1  15.044909  22.065927  2016-08-21
1  CatBoost       t+1  15.161029  22.169002  2016-08-21
2        RF       t+3  15.815195  22.965524  2016-08-03
3  CatBoost       t+3  15.686285  22.824318  2016-08-03


In [10]:
import matplotlib.pyplot as plt
fig, ax = plt.subplots(1,2, figsize=(10,4))
subset_t1 = cmp[cmp["Horizonte"]=="t+1"]
subset_t3 = cmp[cmp["Horizonte"]=="t+3"]

ax[0].bar(subset_t1["Modelo"], subset_t1["MAE"])
ax[0].set_title("MAE por modelo (t+1)"); ax[0].set_ylabel("MAE (mm)")
ax[1].bar(subset_t3["Modelo"], subset_t3["MAE"])
ax[1].set_title("MAE por modelo (t+3)")
plt.tight_layout()
plt.savefig("reports/figures/compare_models_mae.png", dpi=200); plt.close()

fig, ax = plt.subplots(1,2, figsize=(10,4))
ax[0].bar(subset_t1["Modelo"], subset_t1["RMSE"])
ax[0].set_title("RMSE por modelo (t+1)"); ax[0].set_ylabel("RMSE (mm)")
ax[1].bar(subset_t3["Modelo"], subset_t3["RMSE"])
ax[1].set_title("RMSE por modelo (t+3)")
plt.tight_layout()
plt.savefig("reports/figures/compare_models_rmse.png", dpi=200); plt.close()


In [11]:
# Reentrenar RF en t+1 para extraer importancias (sobre train completo)
rf.fit(X1[tr1], y1[tr1])
importances = rf.named_steps["rf"].feature_importances_
imp_df = pd.DataFrame({"feature": feats1, "importance": importances}).sort_values("importance", ascending=False)
imp_df.head(20).to_csv("reports/tables/feature_importances_t1.csv", index=False)

plt.figure(figsize=(8,6))
topk = imp_df.head(15).iloc[::-1]  # invertimos para barra horizontal
plt.barh(topk["feature"], topk["importance"])
plt.title("Importancias de variables (RF, t+1)"); plt.tight_layout()
plt.savefig("reports/figures/feature_importances_t1.png", dpi=200); plt.close()


In [40]:
# ====== IMPORTS ======
from pathlib import Path
import numpy as np
import pandas as pd
import geopandas as gpd
import matplotlib.pyplot as plt

# ====== PATHS ======
PATH_PARQUET = Path("data/processed/ven_rainfall_adm2.parquet")   # histórico CHIRPS limpio
PATH_GEO_IN  = Path("data/external/ven_adm2_with_preds_ONEPERMUNI.geojson")  # tu GeoJSON con Pred_mm
PATH_GEO_OUT = Path("data/external/ven_adm2_with_preds_ONEPERMUNI_ENRIQUECIDO.geojson")

Path("reports/figures").mkdir(parents=True, exist_ok=True)
Path("reports/tables").mkdir(parents=True, exist_ok=True)

assert PATH_PARQUET.exists(), f"No existe {PATH_PARQUET}"
assert PATH_GEO_IN.exists(), f"No existe {PATH_GEO_IN}"

# ====== 1) CARGA ======
df = pd.read_parquet(PATH_PARQUET)  # histórico
gdf = gpd.read_file(PATH_GEO_IN)    # predicciones actuales

# Normaliza nombres mínimos
df.columns = [c.strip() for c in df.columns]
gdf.columns = [c.strip() for c in gdf.columns]

# Fecha en ambos
if "date" in df.columns:
    df["date"] = pd.to_datetime(df["date"], errors="coerce")
elif "Date" in df.columns:
    df["date"] = pd.to_datetime(df["Date"], errors="coerce")
else:
    raise ValueError("No encuentro columna de fecha en el parquet (date/Date).")

if "Fecha_pred" not in gdf.columns:
    raise ValueError("El GeoJSON no tiene 'Fecha_pred'. Añádela antes de enriquecer.")
gdf["Fecha_pred"] = pd.to_datetime(gdf["Fecha_pred"], errors="coerce")

# Comprobaciones mínimas
for col in ["ADM2_PCODE"]:
    if col not in df.columns:
        raise ValueError(f"Falta {col} en parquet.")
    if col not in gdf.columns:
        raise ValueError(f"Falta {col} en GeoJSON.")

# Variable de lluvia base para climatología (ajusta si quieres otra)
# Usaremos rfh (mm por decádica) si existe; si no, r1h/r3h
cands_rain = [c for c in ["rfh", "r1h", "r3h"] if c in df.columns]
if not cands_rain:
    raise ValueError("No encuentro columnas de lluvia (rfh/r1h/r3h) en el parquet.")
rain_col = "rfh" if "rfh" in cands_rain else cands_rain[0]
print("Climatología basada en:", rain_col)

# ====== 2) CLIMATOLOGÍA (ADM2 x MES) ======
df["month"] = df["date"].dt.month

def safe_percentile(x, q):
    # evita problemas con NaN y series cortas
    x = x.dropna().values
    if len(x) == 0:
        return np.nan
    return float(np.percentile(x, q))

agg = (df
       .groupby(["ADM2_PCODE", "month"], as_index=False)
       .agg(
           Media_hist_mm=(rain_col, "mean"),
           P05=(rain_col, lambda s: safe_percentile(s, 5)),
           P10=(rain_col, lambda s: safe_percentile(s, 10)),
           P20=(rain_col, lambda s: safe_percentile(s, 20)),
           P80=(rain_col, lambda s: safe_percentile(s, 80)),
           P90=(rain_col, lambda s: safe_percentile(s, 90)),
           P95=(rain_col, lambda s: safe_percentile(s, 95)),
           Sigma=(rain_col, "std")
       ))

# ====== 3) FUSIÓN GEOJSON + CLIMATOLOGÍA POR MES ======
gdf["month"] = gdf["Fecha_pred"].dt.month
enriq = gdf.merge(agg, on=["ADM2_PCODE","month"], how="left")

# ====== 4) CALCULAR ÍNDICES Y ALERTA ======
# Diff_pct
enriq["Diff_pct"] = np.where(
    enriq["Media_hist_mm"].gt(0),
    100.0 * (enriq["Pred_mm"] - enriq["Media_hist_mm"]) / enriq["Media_hist_mm"],
    np.nan
)

# SPI-like
enriq["SPI_like"] = np.where(
    enriq["Sigma"].fillna(0).abs() > 1e-9,
    (enriq["Pred_mm"] - enriq["Media_hist_mm"]) / enriq["Sigma"],
    np.nan
)

# Categorías por percentiles
def clasificar_alerta(row):
    p05, p10, p20 = row["P05"], row["P10"], row["P20"]
    p80, p90, p95 = row["P80"], row["P90"], row["P95"]
    pred = row["Pred_mm"]

    if pd.isna(pred) or pd.isna(p05) or pd.isna(p95):
        return "Sin dato"

    # Precedencia: extremos -> intensos/severos -> moderados -> normal
    if pred <= p05:
        return "Sequía extrema"
    if pred >= p95:
        return "Lluvia extrema"
    if not pd.isna(p10) and pred <= p10:
        return "Sequía severa"
    if not pd.isna(p90) and pred >= p90:
        return "Lluvia intensa"
    if not pd.isna(p20) and not pd.isna(p80) and (pred <= p20 or pred >= p80):
        # si cae fuera de [P20, P80] sin llegar a severa/intensa
        # distinguimos por el lado, solo informativo (puedes poner "Moderada")
        return "Sequía moderada" if pred <= p20 else "Lluvia moderada"
    return "Normal"

enriq["Alerta"] = enriq.apply(clasificar_alerta, axis=1)

# ====== 5) GUARDAR GEOJSON ENRIQUECIDO ======
# Conserva la geometría y escribe las nuevas properties
enriq_gdf = gpd.GeoDataFrame(enriq, geometry="geometry", crs=gdf.crs)
enriq_gdf.to_file(PATH_GEO_OUT, driver="GeoJSON")
print(f"OK -> GeoJSON enriquecido: {PATH_GEO_OUT}")

# ====== 6) TABLA DE ALERTAS POR ESTADO + GRÁFICO ======
# Detecta columna de Estado en el GeoJSON
cols_lower = {c.lower(): c for c in enriq_gdf.columns}
col_estado = None
for c in ["estado", "state", "adm1_name", "adm1", "name_1"]:
    if c in cols_lower: 
        col_estado = cols_lower[c]; break
if col_estado is None:
    # en tus columnas viene "Estado"
    if "Estado" in enriq_gdf.columns:
        col_estado = "Estado"
    else:
        raise ValueError("No encuentro columna de Estado en el GeoJSON.")

tabla = (
    enriq_gdf[[col_estado, "Alerta"]]
    .dropna()
    .groupby([col_estado, "Alerta"]).size()
    .unstack(fill_value=0)
)

# Orden sugerido de columnas
orden = ["Sequía extrema","Sequía severa","Sequía moderada","Normal","Lluvia moderada","Lluvia intensa","Lluvia extrema"]
cols_presentes = [c for c in orden if c in tabla.columns]
otras = [c for c in tabla.columns if c not in cols_presentes]
tabla = tabla.reindex(columns=cols_presentes + otras, fill_value=0)

tabla["Total"] = tabla.sum(axis=1)
tabla = tabla.sort_values("Total", ascending=False)

tabla.reset_index().rename(columns={col_estado:"Estado"}).to_csv(
    "reports/tables/alertas_por_estado.csv", index=False, encoding="utf-8-sig"
)
print("OK -> CSV: reports/tables/alertas_por_estado.csv")

# Gráfico apilado Top-12
top_states = tabla.head(12).copy()
cats = [c for c in top_states.columns if c != "Total"]

plt.figure(figsize=(12,5))
bottom = np.zeros(len(top_states))
for cat in cats:
    plt.bar(top_states.index.astype(str), top_states[cat].values, bottom=bottom, label=str(cat))
    bottom += top_states[cat].values
plt.xticks(rotation=45, ha="right")
plt.title("Alertas por Estado (top 12)")
plt.ylabel("Nº de municipios")
plt.legend(ncol=3, fontsize=8)
plt.tight_layout()
plt.savefig("reports/figures/alertas_por_estado_stacked.png", dpi=200)
plt.close()
print("OK -> PNG: reports/figures/alertas_por_estado_stacked.png")


Climatología basada en: rfh
OK -> GeoJSON enriquecido: data\external\ven_adm2_with_preds_ONEPERMUNI_ENRIQUECIDO.geojson
OK -> CSV: reports/tables/alertas_por_estado.csv
OK -> PNG: reports/figures/alertas_por_estado_stacked.png


In [21]:
import requests, pandas as pd
url = "https://data.humdata.org/dataset/c136cc6e-573e-43b3-b7db-6e7e9648fd60/resource/8ca18230-5d6c-477c-a5a5-6b9ceebc6a5f/download/ven-rainfall-subnat-full.csv"
r = requests.get(url)
open("data/raw/ven_rainfall_latest.csv","wb").write(r.content)

df_new = pd.read_csv("data/raw/ven_rainfall_latest.csv")


  df_new = pd.read_csv("data/raw/ven_rainfall_latest.csv")


In [5]:
# tfm_pipeline.py
# -*- coding: utf-8 -*-
"""
TFM – Predicción de precipitación y alertas por municipio (ADM2) en Venezuela
- Descarga incremental desde HDX (CSV URL)
- Parquet histórico deduplicado
- RandomForest t+1 y t+3 con validación temporal
- Predicciones + Climatología mensual + SPI-like + Alertas
- Mapa Folium (JSON/Pandas) garantizando TODAS las geometrías del GADM base
"""

from __future__ import annotations

import re, json, argparse, warnings, glob
from pathlib import Path
from datetime import datetime
import unicodedata
import numpy as np
import pandas as pd

# ML
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestRegressor
from sklearn.impute import SimpleImputer
from sklearn.metrics import mean_absolute_error
import joblib

# Folium (solo para render del HTML)
try:
    import folium
    HAS_FOLIUM = True
except Exception:
    HAS_FOLIUM = False

warnings.filterwarnings("ignore", category=FutureWarning)
pd.set_option("display.max_columns", 120)

# =========================
# CONFIG
# =========================
CFG = {
    "source_url": "https://data.humdata.org/dataset/c136cc6e-573e-43b3-b7db-6e7e9648fd60/resource/8ca18230-5d6c-477c-a5a5-6b9ceebc6a5f/download/ven-rainfall-subnat-full.csv",
    "dirs": {
        "raw":       "data/raw",
        "processed": "data/processed",
        "external":  "data/external",
        "models":    "models",
        "reports":   "reports",
        "figures":   "reports/figures",
        "tables":    "reports/tables",
    },
    "files": {
        "parquet":        "data/processed/ven_rainfall_adm2.parquet",
        "predictions":    "data/processed/predicciones.csv",
        "preds_with_meta":"data/processed/predicciones_enriquecidas.csv",
        "model_t1":       "models/rf_t+1.joblib",
        "model_t3":       "models/rf_t+3.joblib",

        # Geo base (GADM) y mapping opcional
        "gadm_json": "data/external/gadm41_VEN_2.json",
        "mapping_csv": "data/external/adm2_pcode_to_gid2.csv",

        # RAW opcional para medias históricas
        "raw_hist_csv": "data/raw/ven-rainfall-adm2-full.csv",

        # GeoJSON de salida (siempre con TODAS las geometrías)
        "geojson_out": "data/external/ven_adm2_with_preds.geojson",

        # Mapa Folium final
        "map_html":"reports/figures/mapa_predicciones.html",
    },
    "random_state": 42,
    "train_subsample": 250_000,
}

FEATURE_BASES = ["rfh","rfh_avg","r1h","r1h_avg","r3h","r3h_avg","rfq","r1q","r3q"]
LAGS = [1, 2, 7, 14]
CALENDAR_COLS = ["month","doy","year"]

# =========================
# UTILIDADES
# =========================
def ensure_dirs():
    for d in CFG["dirs"].values():
        Path(d).mkdir(parents=True, exist_ok=True)

def _print_h(msg: str):
    print("\n=== " + msg + " ===")

def normalize_code(x):
    if pd.isna(x):
        return np.nan
    s = str(x).strip().upper()
    s = re.sub(r"\s+", "", s)
    return s

def normalize_name(s):
    if pd.isna(s):
        return np.nan
    s = str(s).strip().upper()
    s = unicodedata.normalize("NFKD", s).encode("ascii", "ignore").decode("ascii")
    s = re.sub(r"\s+", " ", s)
    return s

def coerce_num(s):
    if pd.isna(s):
        return np.nan
    try:
        return pd.to_numeric(str(s).replace(",", "."), errors="coerce")
    except Exception:
        return np.nan

def _to_dt(s, utc: bool = False, fmt: str | None = None):
    if fmt:
        return pd.to_datetime(s, format=fmt, errors="coerce", utc=utc)
    return pd.to_datetime(s, errors="coerce", utc=utc)

def _rmse(y_true, y_pred) -> float:
    y_true = np.asarray(y_true, dtype="float64")
    y_pred = np.asarray(y_pred, dtype="float64")
    return float(np.sqrt(np.mean((y_true - y_pred) ** 2)))

# =========================
# INGESTA / ACTUALIZACIÓN
# =========================
def download_latest() -> Path:
    ensure_dirs()
    import requests
    url = CFG["source_url"]
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    out = Path(CFG["dirs"]["raw"]) / f"ven_rainfall_hdx_{ts}.csv"

    _print_h("Descargando CSV desde HDX (requests)")
    headers = {"User-Agent": "TFM-DS-UCM/1.0 (ingestion)"}
    r = requests.get(url, headers=headers, timeout=180)
    r.raise_for_status()
    out.write_bytes(r.content)
    print(f"Guardado: {out}")
    return out

def update_parquet(raw_csv: Path) -> Path:
    ensure_dirs()
    pq = Path(CFG["files"]["parquet"])

    _print_h("Actualizando parquet histórico")
    df = pd.read_csv(raw_csv, low_memory=False)

    # Fecha
    if "#date" in df.columns:
        df = df.rename(columns={"#date": "date"})
    elif "date" not in df.columns:
        date_alias = [c for c in df.columns if str(c).lower() in {"date","fecha"}]
        if date_alias:
            df = df.rename(columns={date_alias[0]: "date"})
        else:
            raise ValueError("No encuentro columna de fecha en el CSV.")
    df["date"] = _to_dt(df["date"], fmt="%Y-%m-%d")
    df = df[df["date"].notna()].copy()

    # ADM2_PCODE
    if "ADM2_PCODE" not in df.columns:
        cand = [c for c in df.columns if "ADM2" in c.upper() and "PCODE" in c.upper()]
        if cand:
            df = df.rename(columns={cand[0]: "ADM2_PCODE"})
            print(f"[info] Renombrada clave '{cand[0]}' -> 'ADM2_PCODE'")
        elif "PCODE" in df.columns:
            df = df.rename(columns={"PCODE": "ADM2_PCODE"})
            print("[info] Renombrada clave 'PCODE' -> 'ADM2_PCODE'")
        else:
            cand = [c for c in df.columns if str(c).lower() in {"adm2_code","adm2_pcod","pcode"}]
            if cand:
                df = df.rename(columns={cand[0]: "ADM2_PCODE"})
                print(f"[info] Renombrada clave '{cand[0]}' -> 'ADM2_PCODE'")
            else:
                raise ValueError("Falta ADM2_PCODE en el CSV.")
    df["ADM2_PCODE"] = df["ADM2_PCODE"].apply(normalize_code)

    # Númericas típicas
    for c in FEATURE_BASES + ["n_pixels"]:
        if c in df.columns:
            df[c] = df[c].apply(coerce_num).astype("float32")

    if "n_pixels" in df.columns:
        df = df[df["n_pixels"] > 0].copy()

    # Append incremental + deduplicado
    if pq.exists():
        old = pd.read_parquet(pq)
        old["ADM2_PCODE"] = old["ADM2_PCODE"].apply(normalize_code)
        old["date"] = _to_dt(old["date"])
        all_df = pd.concat([old, df], ignore_index=True)
    else:
        all_df = df.copy()

    all_df = (all_df.sort_values(["ADM2_PCODE","date"])
                    .drop_duplicates(subset=["ADM2_PCODE","date"], keep="last"))

    # Evitar problemas Arrow con object
    for c in all_df.columns:
        if all_df[c].dtype == "object":
            all_df[c] = all_df[c].astype(str)

    all_df.to_parquet(pq, index=False)
    print(f"Parquet actualizado -> {pq} (filas: {len(all_df):,})")
    return pq

# =========================
# FEATURES / MODELOS
# =========================
def _add_group_lags(g: pd.DataFrame, bases: list[str], lags: list[int]) -> pd.DataFrame:
    g = g.sort_values("date").copy()
    for b in bases:
        if b in g.columns:
            for L in lags:
                g[f"{b}_lag{L}"] = g[b].shift(L)
    return g

def _choose_rain_var(df: pd.DataFrame) -> str:
    for v in ["rfh","rfh_avg","r1h_avg","r3h_avg","r1h","r3h"]:
        if v in df.columns:
            return v
    raise ValueError("No encuentro variable de lluvia.")

def _build_feature_table(df: pd.DataFrame) -> pd.DataFrame:
    gcols = ["ADM2_PCODE","date"]
    use_cols = [c for c in df.columns if c in set(FEATURE_BASES + gcols)]
    base = df[use_cols].copy()
    base["month"] = base["date"].dt.month.astype("int16")
    base["year"]  = base["date"].dt.year.astype("int16")
    base["doy"]   = base["date"].dt.dayofyear.astype("int16")
    base = (base.groupby("ADM2_PCODE", group_keys=False)
                  .apply(lambda g: _add_group_lags(g, FEATURE_BASES, LAGS)))
    return base

def _select_available_features(df: pd.DataFrame) -> list[str]:
    feats = []
    for b in FEATURE_BASES:
        for L in LAGS:
            col = f"{b}_lag{L}"
            if col in df.columns:
                feats.append(col)
    for c in CALENDAR_COLS:
        if c in df.columns:
            feats.append(c)
    return feats

def _train_rf(X, y) -> Pipeline:
    rf = RandomForestRegressor(
        n_estimators=180, max_depth=12, min_samples_leaf=2,
        bootstrap=True, max_samples=0.6, random_state=CFG["random_state"], n_jobs=-1
    )
    pipe = Pipeline([("imp", SimpleImputer(strategy="median")),
                     ("rf", rf)])
    pipe.fit(X, y)
    return pipe

def _temporal_split(df: pd.DataFrame, test_frac=0.2):
    cutoff = df["date"].quantile(1 - test_frac)
    mask_tr = df["date"] <= cutoff
    mask_te = ~mask_tr
    return mask_tr, mask_te, cutoff

def _features_path(suffix: str) -> Path:
    Path(CFG["dirs"]["models"]).mkdir(parents=True, exist_ok=True)
    return Path(CFG["dirs"]["models"]) / f"features_{suffix}.json"

def _save_feature_spec(suffix: str, feature_cols: list[str]) -> None:
    with open(_features_path(suffix), "w", encoding="utf-8") as f:
        json.dump({"features": feature_cols}, f, ensure_ascii=False, indent=2)

def _load_feature_spec(suffix: str) -> list[str] | None:
    p = _features_path(suffix)
    if p.exists():
        with open(p, "r", encoding="utf-8") as f:
            return json.load(f).get("features")
    return None

def _align_features_for_inference(X: pd.DataFrame, expected_cols: list[str]) -> pd.DataFrame:
    X = X.copy()
    for c in expected_cols:
        if c not in X.columns:
            X[c] = np.nan
    return X[expected_cols]

def train_models(parquet_path: Path) -> tuple[Pipeline, Pipeline, list[str]]:
    _print_h("Entrenamiento modelos t+1 y t+3")
    df = pd.read_parquet(parquet_path)
    df["date"] = _to_dt(df["date"])
    df = df.sort_values(["ADM2_PCODE","date"]).reset_index(drop=True)

    feat_df = _build_feature_table(df)
    feature_cols = _select_available_features(feat_df)

    y_base = _choose_rain_var(df)
    feat_df["y_t1"] = feat_df.groupby("ADM2_PCODE")[y_base].shift(-1)
    feat_df["y_t3"] = feat_df.groupby("ADM2_PCODE")[y_base].shift(-3)

    cols = ["ADM2_PCODE","date"] + feature_cols + ["y_t1","y_t3"]
    train_df = feat_df[cols].dropna(subset=feature_cols + ["y_t1"]).copy()

    tr, te, cutoff = _temporal_split(train_df)

    rs = CFG["random_state"]
    tr_idx = train_df[tr].index
    if len(tr_idx) > CFG["train_subsample"]:
        tr_idx = np.random.RandomState(rs).choice(tr_idx, size=CFG["train_subsample"], replace=False)

    X_tr = train_df.loc[tr_idx, feature_cols].astype("float32")
    y_tr = train_df.loc[tr_idx, "y_t1"].astype("float32")
    X_te = train_df.loc[te, feature_cols].astype("float32")
    y_te = train_df.loc[te, "y_t1"].astype("float32")

    rf_t1 = _train_rf(X_tr, y_tr)
    y_hat = rf_t1.predict(X_te)
    mae = mean_absolute_error(y_te, y_hat)
    rmse = _rmse(y_te, y_hat)
    print(f"[t+1] cutoff={cutoff.date()}  MAE={mae:.2f}  RMSE={rmse:.2f}  n_train={len(X_tr):,}  n_test={len(X_te):,}")

    mask3_tr = ~train_df.loc[tr_idx, "y_t3"].isna()
    mask3_te = ~train_df.loc[te, "y_t3"].isna()
    rf_t3 = _train_rf(X_tr[mask3_tr], train_df.loc[tr_idx, "y_t3"].astype("float32")[mask3_tr])
    y_hat3 = rf_t3.predict(X_te[mask3_te])
    mae3 = mean_absolute_error(train_df.loc[te, "y_t3"][mask3_te], y_hat3)
    rmse3 = _rmse(train_df.loc[te, "y_t3"][mask3_te], y_hat3)
    print(f"[t+3] cutoff={cutoff.date()}  MAE={mae3:.2f}  RMSE={rmse3:.2f}  n_train={mask3_tr.sum():,}  n_test={mask3_te.sum():,}")

    Path(CFG["dirs"]["models"]).mkdir(parents=True, exist_ok=True)
    joblib.dump(rf_t1, CFG["files"]["model_t1"])
    joblib.dump(rf_t3, CFG["files"]["model_t3"])
    _save_feature_spec("t1", feature_cols)
    _save_feature_spec("t3", feature_cols)
    return rf_t1, rf_t3, feature_cols

def _load_or_train_models(parquet_path: Path, *, force_retrain: bool = False) -> tuple[Pipeline, Pipeline, list[str]]:
    f1, f3 = Path(CFG["files"]["model_t1"]), Path(CFG["files"]["model_t3"])
    spec_t1 = _load_feature_spec("t1")
    spec_t3 = _load_feature_spec("t3")
    have_models = f1.exists() and f3.exists()

    if not have_models and not force_retrain:
        raise FileNotFoundError(
            "No existen modelos entrenados en 'models/'. "
            "Ejecuta primero con --retrain para entrenar y guardarlos."
        )
    if force_retrain or not have_models or (spec_t1 is None or spec_t3 is None):
        return train_models(parquet_path)

    _print_h("Cargando modelos desde disco")
    return joblib.load(f1), joblib.load(f3), (spec_t1 or spec_t3 or [])

def predict_latest(parquet_path: Path, *, force_retrain: bool = False) -> pd.DataFrame:
    _print_h("Predicción t+1/t+3")
    df = pd.read_parquet(parquet_path)
    df["date"] = _to_dt(df["date"])
    df = df.sort_values(["ADM2_PCODE","date"]).reset_index(drop=True)

    feat_df = _build_feature_table(df)
    candidate_cols = _select_available_features(feat_df)

    idx_last = feat_df.groupby("ADM2_PCODE")["date"].idxmax()
    last = feat_df.loc[idx_last, ["ADM2_PCODE","date"] + candidate_cols].copy()

    rf_t1, rf_t3, spec_cols = _load_or_train_models(parquet_path, force_retrain=force_retrain)

    if spec_cols:
        X_last = _align_features_for_inference(last[candidate_cols], spec_cols).astype("float32")
    else:
        X_last = last[candidate_cols].astype("float32")

    last["Pred_t1_mm"] = rf_t1.predict(X_last).astype("float32")
    last["Pred_t3_mm"] = np.nan
    try:
        last.loc[:, "Pred_t3_mm"] = rf_t3.predict(X_last).astype("float32")
    except Exception as e:
        print(f"[warn] t+3 no pudo predecir en todos los ADM2: {e}")

    out = last.rename(columns={"date":"Fecha_base"}).copy()
    out["Fecha_pred"] = out["Fecha_base"] + pd.Timedelta(days=10)
    out.rename(columns={"Pred_t1_mm":"Pred_mm"}, inplace=True)
    out = out[["ADM2_PCODE","Fecha_pred","Pred_mm","Pred_t3_mm","Fecha_base"]]
    out.to_csv(CFG["files"]["predictions"], index=False, encoding="utf-8-sig")
    print(f"Guardado -> {CFG['files']['predictions']}  ({len(out):,} filas)")
    return out

# =========================
# CLIMATOLOGÍA / ALERTAS
# =========================
def compute_climatology(parquet_path: Path) -> pd.DataFrame:
    _print_h("Climatología mensual")
    df = pd.read_parquet(parquet_path)
    df["date"] = _to_dt(df["date"])
    df["month"] = df["date"].dt.month.astype("int16")
    var = _choose_rain_var(df)
    tmp = df[["ADM2_PCODE","month",var]].dropna().copy()

    def _agg(g):
        x = g[var].astype(float)
        return pd.Series({
            "Media_hist_mm": x.mean(),
            "P05": np.percentile(x, 5),
            "P10": np.percentile(x,10),
            "P20": np.percentile(x,20),
            "P80": np.percentile(x,80),
            "P90": np.percentile(x,90),
            "P95": np.percentile(x,95),
            "Std": x.std(ddof=0)
        })

    clim = tmp.groupby(["ADM2_PCODE","month"], as_index=False).apply(_agg)
    return clim.reset_index(drop=True)

def enrich_predictions_with_climatology(pred_df: pd.DataFrame, clim_df: pd.DataFrame,
                                        parquet_path: Path) -> pd.DataFrame:
    pred = pred_df.copy()
    pred["month"] = pd.to_datetime(pred["Fecha_pred"]).dt.month.astype("int16")
    enr = pred.merge(clim_df, on=["ADM2_PCODE","month"], how="left")

    enr["SPI_like"] = np.where(
        enr["Std"].fillna(0) > 0,
        (enr["Pred_mm"] - enr["Media_hist_mm"]) / enr["Std"],
        np.nan
    )
    enr["Diff_pct"] = np.where(
        enr["Media_hist_mm"].fillna(0) > 0,
        100.0 * (enr["Pred_mm"] - enr["Media_hist_mm"]) / enr["Media_hist_mm"],
        np.nan
    )

    hist = pd.read_parquet(parquet_path)
    meta_cols = [c for c in ["Estado","Municipio"] if c in hist.columns]
    if meta_cols:
        ids = (hist.sort_values("date")
                    .groupby("ADM2_PCODE", as_index=False).tail(1)[["ADM2_PCODE"] + meta_cols]
                    .drop_duplicates("ADM2_PCODE"))
        enr = enr.merge(ids, on="ADM2_PCODE", how="left")

    def _alerta(row):
        v, p05, p10, p20, p80, p90, p95 = (
            row["Pred_mm"], row["P05"], row["P10"], row["P20"], row["P80"], row["P90"], row["P95"]
        )
        if pd.isna(v) or pd.isna(p05) or pd.isna(p95):
            return "Sin dato"
        if v >= p95: return "Lluvia extrema"
        if v >= p90: return "Lluvia intensa"
        if v >  p80: return "Lluvia moderada"
        if v <= p05: return "Sequía extrema"
        if v <= p10: return "Sequía severa"
        if v <= p20: return "Sequía moderada"
        return "Normal"

    enr["Alerta"] = enr.apply(_alerta, axis=1)

    cols_out = ["ADM2_PCODE","Fecha_pred","Pred_mm","Media_hist_mm","Diff_pct","SPI_like","Alerta","month","Pred_t3_mm"]
    cols_out += [c for c in ["Estado","Municipio"] if c in enr.columns]
    enr_out = enr[cols_out].copy()

    Path(CFG["dirs"]["processed"]).mkdir(parents=True, exist_ok=True)
    enr_out.to_csv(CFG["files"]["preds_with_meta"], index=False, encoding="utf-8-sig")
    print(f"Guardado -> {CFG['files']['preds_with_meta']}  ({len(enr_out):,} filas)")
    return enr_out

# =========================
# MAPA (100% JSON/Pandas: garantiza TODAS las geometrías)
# =========================
def _gid2_state_id(gid2: str):
    if not isinstance(gid2, str): return None
    m = re.match(r"^VEN\.(\d+)\.", gid2.upper().strip())
    return int(m.group(1)) if m else None

def _load_mapping() -> pd.DataFrame | None:
    mp = Path(CFG["files"]["mapping_csv"])
    if not mp.exists():
        return None
    try:
        m = pd.read_csv(mp)
        m.columns = [c.strip().upper() for c in m.columns]
        assert "ADM2_PCODE" in m.columns and "GID_2" in m.columns
        m["ADM2_PCODE"] = m["ADM2_PCODE"].astype(str).str.strip().str.upper()
        m["GID_2"] = m["GID_2"].astype(str).str.strip().str.upper()
        return m.rename(columns={"ADM2_PCODE":"ADM2_PCODE", "GID_2":"GID_2"})
    except Exception as e:
        print("[map] Aviso: mapping inválido:", e)
        return None

def _row_norm_names(p: dict) -> tuple[str,str]:
    n1 = p.get("NAME_1")
    n2 = p.get("NAME_2")
    return normalize_name(n1), normalize_name(n2)

def _find_pred_row(code: str, gid2: str, n1n: str, n2n: str, lut_code, lut_name, map_g2c):
    # 1) Por ADM2_PCODE directo
    if code and code in lut_code:
        return lut_code[code]
    # 2) Por GID_2 -> ADM2_PCODE
    if gid2:
        code2 = map_g2c.get(gid2)
        if code2 and code2 in lut_code:
            return lut_code[code2]
    # 3) Por (NAME_1_n, NAME_2_n)
    if n1n and n2n:
        key = (n1n, n2n)
        if key in lut_name:
            return lut_name[key]
    return None

def build_map_geojson(enr_df: pd.DataFrame) -> Path | None:
    """
    Ensambla SIEMPRE todas las geometrías del GADM base y les inyecta Pred_mm/Media/Diff_pct.
    No usa GeoPandas; no depende de cobertura de mapping para incluir geometrías.
    """
    _print_h("Construyendo GeoJSON con TODAS las geometrías (JSON/Pandas)")

    # 1) Cargar GADM base
    gadm_path = Path(CFG["files"]["gadm_json"])
    if not gadm_path.exists():
        print(f"[map] No existe {gadm_path}.")
        return None
    with open(gadm_path, "r", encoding="utf-8") as f:
        fc = json.load(f)
    feats = fc.get("features", [])
    if not feats:
        print("[map] GeoJSON base sin features.")
        return None

    # 2) Predicciones enriquecidas
    preds_path = Path(CFG["files"]["preds_with_meta"])
    if not preds_path.exists():
        print(f"[map] No existe {preds_path}.")
        return None
    preds = pd.read_csv(preds_path, low_memory=False)
    # normaliza
    preds["ADM2_PCODE"] = preds["ADM2_PCODE"].astype(str).str.strip().str.upper()
    # nombres normalizados para join por nombre si estuvieran
    for col_src, col_norm in [("Estado","NAME_1_n"), ("Municipio","NAME_2_n")]:
        if col_src in preds.columns and col_norm not in preds.columns:
            preds[col_norm] = preds[col_src].apply(normalize_name)
    if "NAME_1_n" not in preds.columns:
        preds["NAME_1_n"] = np.nan
    if "NAME_2_n" not in preds.columns:
        preds["NAME_2_n"] = np.nan

    # 3) Mapping opcional GID_2<->ADM2_PCODE
    mapping = _load_mapping()
    map_g2c = {}
    if mapping is not None:
        map_g2c = dict(zip(mapping["GID_2"], mapping["ADM2_PCODE"]))

    # 4) Luts para búsqueda rápida
    # por código
    lut_code = {r.ADM2_PCODE: r for r in preds.itertuples(index=False)}
    # por nombre
    lut_name = {(r.NAME_1_n, r.NAME_2_n): r for r in preds.itertuples(index=False)
                if isinstance(r.NAME_1_n, str) and isinstance(r.NAME_2_n, str)}

    # 5) Recorremos TODAS las features y rellenamos properties
    for feat in feats:
        p = feat.get("properties", {})
        # claves base
        gid2 = str(p.get("GID_2","") or "").strip().upper()
        # si trae ADM2_PCODE en el GADM base, normalizarla
        code = normalize_code(p.get("ADM2_PCODE")) if p.get("ADM2_PCODE") else None
        n1n, n2n = _row_norm_names(p)

        # si no hay ADM2_PCODE en el GADM, intenta via mapping por GID_2
        if not code and gid2 and gid2 in map_g2c:
            code = map_g2c[gid2]

        # elegimos la fila de predicción
        row = _find_pred_row(code, gid2, n1n, n2n, lut_code, lut_name, map_g2c)

        # aseguramos ADM2_PCODE en properties (clave del choropleth)
        if code:
            p["ADM2_PCODE"] = code
        elif row is not None:
            p["ADM2_PCODE"] = row.ADM2_PCODE
        else:
            # última opción: sintetizar por nombres (igual al código que te funcionaba)
            p["ADM2_PCODE"] = f"{n1n}|{n2n}"

        # métricas
        if row is not None:
            for k in ["Fecha_pred","Pred_mm","Media_hist_mm","Diff_pct","SPI_like","Alerta"]:
                if k in preds.columns:
                    val = getattr(row, k, None)
                    if pd.notna(val):
                        if isinstance(val, (np.floating, float)):
                            p[k] = float(val)
                        else:
                            p[k] = val
        # deja NAME_1/NAME_2 (útil para tooltip)
        if "NAME_1" in p and "Estado" not in p:
            p["Estado"] = p["NAME_1"]
        if "NAME_2" in p and "Municipio" not in p:
            p["Municipio"] = p["NAME_2"]

    # 6) DataFrame de props para completar medias/fills como en tu script
    props_df = pd.DataFrame([f["properties"] for f in feats])
    # normalización
    props_df["ADM2_PCODE"] = props_df["ADM2_PCODE"].astype(str).str.strip().str.upper()
    props_df["Pred_mm"] = pd.to_numeric(props_df.get("Pred_mm", np.nan), errors="coerce")
    props_df["Media_hist_mm"] = pd.to_numeric(props_df.get("Media_hist_mm", np.nan), errors="coerce")
    props_df["GID_2"] = props_df.get("GID_2", np.nan)
    props_df["STATE_ID"] = props_df["GID_2"].apply(_gid2_state_id)

    # 6.a) Completar Media_hist_mm con RAW opcional
    raw_path = Path(CFG["files"]["raw_hist_csv"])
    if raw_path.exists():
        try:
            h = pd.read_csv(raw_path, usecols=["ADM2_PCODE","r1q"], low_memory=False)
            h["ADM2_PCODE"] = h["ADM2_PCODE"].astype(str).str.strip().str.upper()
            h["r1q"] = pd.to_numeric(h["r1q"], errors="coerce")
            hist_mean = h.groupby("ADM2_PCODE", as_index=False)["r1q"].mean().rename(columns={"r1q":"Media_hist_mm_from_hist"})
            props_df = props_df.merge(hist_mean, on="ADM2_PCODE", how="left")
            props_df["Media_hist_mm"] = np.where(props_df["Media_hist_mm"].notna(),
                                                 props_df["Media_hist_mm"], props_df["Media_hist_mm_from_hist"])
            props_df.drop(columns=[c for c in props_df.columns if c.endswith("_from_hist")], inplace=True)
        except Exception as e:
            print("[map-post] Aviso RAW->media falló:", e)

    # 6.b) FILL A: usar Media_hist_mm cuando falte Pred_mm
    mask = props_df["Pred_mm"].isna() & props_df["Media_hist_mm"].notna()
    props_df.loc[mask, "Pred_mm"] = props_df.loc[mask, "Media_hist_mm"]

    # 6.c) FILL B: mediana por estado
    state_med = props_df.groupby("STATE_ID")["Pred_mm"].median()
    mask = props_df["Pred_mm"].isna() & props_df["STATE_ID"].notna()
    props_df.loc[mask, "Pred_mm"] = props_df.loc[mask, "STATE_ID"].map(state_med)

    # 6.d) FILL C: mediana nacional
    nat_med = float(props_df["Pred_mm"].median(skipna=True)) if props_df["Pred_mm"].notna().any() else 0.0
    props_df["Pred_mm"] = props_df["Pred_mm"].fillna(nat_med)

    # 6.e) Diff_pct
    props_df["Diff_pct"] = np.where(
        props_df["Media_hist_mm"].notna() & props_df["Pred_mm"].notna() & (props_df["Media_hist_mm"]!=0),
        100.0*(props_df["Pred_mm"] - props_df["Media_hist_mm"]) / props_df["Media_hist_mm"],
        np.nan
    )

    pred_ok = int(props_df["Pred_mm"].notna().sum())
    print(f"[map-post] ✅ Pred_mm cubierto: {pred_ok}/{len(props_df)}")

    # 7) Escribir de vuelta en el GeoJSON
    lut = props_df.set_index("ADM2_PCODE")
    for feat in feats:
        p = feat["properties"]
        code = str(p.get("ADM2_PCODE","")).strip().upper()
        if code in lut.index:
            row = lut.loc[code]
            # escribe métricas
            p["Pred_mm"] = float(row["Pred_mm"]) if pd.notna(row["Pred_mm"]) else None
            p["Media_hist_mm"] = float(row["Media_hist_mm"]) if pd.notna(row["Media_hist_mm"]) else None
            p["Diff_pct"] = float(row["Diff_pct"]) if pd.notna(row["Diff_pct"]) else None

    out_geo = Path(CFG["files"]["geojson_out"])
    out_geo.parent.mkdir(parents=True, exist_ok=True)
    with open(out_geo, "w", encoding="utf-8") as f:
        json.dump(fc, f, ensure_ascii=False)
    print(f"[map] GeoJSON listo con todas las geometrías -> {out_geo}")

    # 8) Render Folium desde ese mismo GeoJSON (choropleth por ADM2_PCODE)
    if HAS_FOLIUM:
        try:
            rows = [{"ADM2_PCODE": r["ADM2_PCODE"], "Pred_mm": r["Pred_mm"]} for r in props_df.to_dict("records")]
            vals = pd.DataFrame(rows)
            vals["ADM2_PCODE"] = vals["ADM2_PCODE"].astype(str).str.strip().str.upper()
            vals["Pred_mm"] = pd.to_numeric(vals["Pred_mm"], errors="coerce")

            m = folium.Map(location=[7.5, -66.0], zoom_start=5, tiles="cartodbpositron")

            folium.Choropleth(
                geo_data=str(out_geo),
                name="Predicción (mm)",
                data=vals,
                columns=["ADM2_PCODE", "Pred_mm"],
                key_on="feature.properties.ADM2_PCODE",
                fill_opacity=0.7,
                line_opacity=0.2,
                legend_name="Predicción (mm)"
            ).add_to(m)

            # Tooltip robusto: usa Estado/Municipio si están, si no NAME_1/NAME_2
            with open(out_geo, "r", encoding="utf-8") as f2:
                fc2 = json.load(f2)
            available = set(fc2["features"][0]["properties"].keys())
            campo_estado    = "Estado"    if "Estado"    in available else ("NAME_1" if "NAME_1" in available else None)
            campo_municipio = "Municipio" if "Municipio" in available else ("NAME_2" if "NAME_2" in available else None)
            tooltip_fields = [c for c in [campo_estado, campo_municipio, "ADM2_PCODE", "Fecha_pred", "Pred_mm", "Media_hist_mm", "Diff_pct"] if c and c in available]
            aliases = []
            for f in tooltip_fields:
                if f in ("Estado","NAME_1"): aliases.append("Estado")
                elif f in ("Municipio","NAME_2"): aliases.append("Municipio")
                elif f=="ADM2_PCODE": aliases.append("Código")
                elif f=="Fecha_pred": aliases.append("Fecha pred.")
                elif f=="Pred_mm": aliases.append("Predicción (mm)")
                elif f=="Media_hist_mm": aliases.append("Media hist. (mm)")
                elif f=="Diff_pct": aliases.append("Dif. vs media (%)")
                else: aliases.append(f)

            folium.GeoJson(
                str(out_geo),
                name="Detalle",
                tooltip=folium.GeoJsonTooltip(fields=tooltip_fields, aliases=aliases, localize=True, sticky=True),
                style_function=lambda x: {"weight": 0.2, "color": "black", "fillOpacity": 0.0}
            ).add_to(m)

            folium.LayerControl().add_to(m)
            out_html = Path(CFG["files"]["map_html"])
            out_html.parent.mkdir(parents=True, exist_ok=True)
            m.save(out_html.as_posix())
            print(f"[map] 🗺️  Mapa guardado en: {out_html}")
        except Exception as e:
            print(f"[map] Advertencia al renderizar mapa: {e}")
    else:
        print("[map] Folium no disponible: se omite render HTML.")

    return out_geo

# =========================
# RESÚMENES
# =========================
def export_state_summary(enr_df: pd.DataFrame):
    Path(CFG["dirs"]["tables"]).mkdir(parents=True, exist_ok=True)
    if "Estado" not in enr_df.columns:
        return
    tab = (enr_df[["Estado","Alerta"]]
           .dropna()
           .groupby(["Estado","Alerta"]).size()
           .unstack(fill_value=0)
           .sort_index())
    tab["Total"] = tab.sum(axis=1)
    tab = tab.sort_values("Total", ascending=False)
    out = Path(CFG["dirs"]["tables"]) / "alertas_por_estado.csv"
    tab.to_csv(out, encoding="utf-8-sig")
    print(f"Resumen por Estado -> {out}")

# =========================
# RUN
# =========================
def run(download: bool, predict: bool, build_map: bool, *, force_retrain: bool = False):
    ensure_dirs()
    pq_final = Path(CFG["files"]["parquet"])

    # 1) Descarga + actualización parquet
    if download:
        raw = download_latest()
        pq_final = update_parquet(raw)
    else:
        if not pq_final.exists():
            raise FileNotFoundError(
                f"No hay parquet histórico en {pq_final}. Ejecuta con --download la primera vez."
            )

    # 2) Predicción
    if predict:
        preds_df = predict_latest(pq_final, force_retrain=force_retrain)
    else:
        if Path(CFG["files"]["predictions"]).exists():
            preds_df = pd.read_csv(
                CFG["files"]["predictions"],
                parse_dates=["Fecha_pred","Fecha_base"],
                dayfirst=False, encoding="utf-8-sig"
            )
        else:
            preds_df = predict_latest(pq_final, force_retrain=force_retrain)

    # 3) Climatología + alertas
    clim_df = compute_climatology(pq_final)
    enr_df = enrich_predictions_with_climatology(preds_df, clim_df, pq_final)
    export_state_summary(enr_df)

    # 4) Mapa (si se pide)
    if build_map:
        build_map_geojson(enr_df)

    _print_h("Ejecución completa ✔")

# =========================
# CLI
# =========================
def parse_args(argv=None):
    p = argparse.ArgumentParser(
        description="TFM – Pipeline precipitación Venezuela (HDX -> parquet -> RF -> predicciones -> alertas -> mapa)",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    p.add_argument("--download", action="store_true", help="Descargar y actualizar parquet histórico")
    p.add_argument("--predict",  action="store_true", help="Generar/actualizar predicciones")
    p.add_argument("--map",      action="store_true", help="Construir GeoJSON y mapa Folium")
    p.add_argument("--retrain",  action="store_true", help="Entrenar/actualizar modelos (no automático)")
    if argv is None:
        args, _ = p.parse_known_args()
    else:
        args = p.parse_args(argv)
    return args

if __name__ == "__main__":
    args = parse_args()
    do_download = args.download or (not any([args.download, args.predict, args.map]))
    do_predict  = args.predict  or (not any([args.download, args.predict, args.map]))
    do_map      = args.map      or (not any([args.download, args.predict, args.map]))
    run(download=do_download, predict=do_predict, build_map=do_map, force_retrain=args.retrain)



=== Descargando CSV desde HDX (requests) ===
Guardado: data\raw\ven_rainfall_hdx_20250910_082128.csv

=== Actualizando parquet histórico ===
[info] Renombrada clave 'PCODE' -> 'ADM2_PCODE'
Parquet actualizado -> data\processed\ven_rainfall_adm2.parquet (filas: 572,804)

=== Predicción t+1/t+3 ===

=== Cargando modelos desde disco ===
Guardado -> data/processed/predicciones.csv  (356 filas)

=== Climatología mensual ===
Guardado -> data/processed/predicciones_enriquecidas.csv  (356 filas)

=== Construyendo GeoJSON con TODAS las geometrías (JSON/Pandas) ===
[map-post] ✅ Pred_mm cubierto: 338/338
[map] GeoJSON listo con todas las geometrías -> data\external\ven_adm2_with_preds.geojson
[map] 🗺️  Mapa guardado en: reports\figures\mapa_predicciones.html

=== Ejecución completa ✔ ===
