In [1]:
import os
import re
import xarray as xr
import rioxarray
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from collections import defaultdict
from pathlib import Path
import time
from tqdm import tqdm

In [None]:
### funciones

def start_of_epi_year(Y: int) -> pd.Timestamp:
    dec29 = pd.Timestamp(Y - 1, 12, 29)
    jan4  = pd.Timestamp(Y, 1, 4)
    w = dec29.weekday()
    offset = (6 - w) % 7
    first_sunday = dec29 + pd.Timedelta(days=offset)
    if first_sunday > jan4:
        first_sunday -= pd.Timedelta(days=7)
    return first_sunday



def fecha_a_nombre(fecha: pd.Timestamp) -> str:
    """Convierte fecha calendario a nombre YYYYDDD.nc"""
    doy = fecha.timetuple().tm_yday
    return f"{fecha.year}{doy:03d}.nc"
def get_epi_year_week(date: pd.Timestamp) -> tuple[int, int]:
    date = pd.Timestamp(date)
    # ‚Äúdomingo epidemiol√≥gico‚Äù de la fecha
    offset = (date.weekday() + 1) % 7          # 0 = domingo
    sunday = date - pd.Timedelta(days=offset)

    next_start = start_of_epi_year(sunday.year + 1)
    epi_year   = sunday.year if sunday < next_start else sunday.year + 1

    start_year = start_of_epi_year(epi_year)
    epi_week   = ((sunday - start_year).days // 7) + 1
    return epi_year, epi_week


def process_mswep(path: str,
                  start_date: str | pd.Timestamp,
                  end_date: str | pd.Timestamp,
                  path_out: str,
                  var_name: str = "precipitation") -> xr.Dataset:
    """
    Procesa archivos NetCDF diarios MSWEP ya recortados en path,
    entre start_date y end_date (ej: "2013-06-01" a "2025-01-31").

    - Ignora archivos da√±ados o con coordenadas duplicadas.
    - Combina archivos v√°lidos.
    - Acumula por semana epidemiol√≥gica (coord: epi_week).
    - Guarda un NetCDF comprimido en path_out con engine="h5netcdf".

    Devuelve un Dataset con dims ("epi_week", "y", "x").
    """

    start_time = time.time()

    # --- Convertir fechas a rango ---
    fechas = pd.date_range(start=start_date, end=end_date, freq="D")
    expected_files = {fecha_a_nombre(dt): dt for dt in fechas}
    print(f"üìÖ Procesando rango {start_date} ‚Üí {end_date} ({len(fechas)} d√≠as)")

    # --- Buscar archivos existentes ---
    available = {f: os.path.join(path, f) for f in os.listdir(path) if f.endswith((".nc",".nc4"))}
    target_files, missing_files = [], []

    for fname, dt in expected_files.items():
        if fname in available:
            target_files.append(available[fname])
        else:
            missing_files.append(fname)

    print(f"‚úÖ {len(target_files)} archivos encontrados, ‚ùå {len(missing_files)} faltantes")

    # --- Validar archivos (descartar duplicados/da√±ados) ---
    valid_files, bad_files = [], []
    for f in target_files:
        try:
            with xr.open_dataset(f, engine="h5netcdf") as ds:
                if "lon" in ds.coords and len(np.unique(ds["lon"])) < ds.sizes["lon"]:
                    bad_files.append((f, "lon duplicado"))
                elif "lat" in ds.coords and len(np.unique(ds["lat"])) < ds.sizes["lat"]:
                    bad_files.append((f, "lat duplicado"))
                else:
                    valid_files.append(f)
        except Exception as e:
            bad_files.append((f, f"error: {str(e)}"))

    print(f"üìÇ Usando {len(valid_files)} archivos v√°lidos, ignorados {len(bad_files)}")
    if bad_files:
        for f, reason in bad_files:
            print(f"   ‚ö†Ô∏è {os.path.basename(f)} ‚Üí {reason}")

    if not valid_files:
        raise RuntimeError("No se encontr√≥ ning√∫n archivo v√°lido para procesar.")

    # --- Preproceso individual ---
    def preprocess(ds: xr.Dataset) -> xr.Dataset:
        rn = {}
        for d in ds.dims:
            dl = d.lower()
            if dl in ("longitude", "x"): rn[d] = "x"
            if dl in ("latitude",  "y"): rn[d] = "y"
        if rn:
            ds = ds.rename(rn)

        v = var_name if var_name in ds.data_vars else list(ds.data_vars)[0]
        ds = ds[[v]]

        # Garantizar 'time'
        if "time" not in ds.dims:
            src = ds.encoding.get("source", None)
            m = re.search(r'(\d{4})(\d{3})\.nc', os.path.basename(src)) if src else None
            if m:
                year, doy = int(m.group(1)), int(m.group(2))
                t = pd.to_datetime(f"{year}{doy:03d}", format="%Y%j")
                ds = ds.expand_dims(time=[t])
            else:
                raise ValueError(f"No se pudo inferir 'time' en {src}")
        return ds.transpose("time", ...)

    # --- Abrir todos los archivos v√°lidos ---
    ds_all = xr.open_mfdataset(
        valid_files,
        engine="h5netcdf",
        preprocess=preprocess,
        combine="nested",
        concat_dim="time",
        parallel=False,
        chunks={"time": 50},
        data_vars="minimal",
        coords="minimal",
        compat="override",
    ).sortby("time")

    # --- Ordenar dims ---
    for v in ds_all.data_vars:
        dims = ds_all[v].dims
        order = [d for d in ("time", "y", "x") if d in dims] + [d for d in dims if d not in ("time","y","x")]
        ds_all[v] = ds_all[v].transpose(*order)

    # --- Semana epidemiol√≥gica ---
    times = pd.to_datetime(ds_all.time.values)
    pairs = np.array([get_epi_year_week(t) for t in times])
    labels = np.array([f"{y}-{w:02d}" for y, w in pairs])
    ds_epi = ds_all.assign_coords(epi_week=("time", labels))

    # --- Acumulado semanal ---
    weekly_sum = ds_epi.groupby("epi_week").sum("time", skipna=True)

    if isinstance(weekly_sum, xr.DataArray):
        ds_final = weekly_sum.to_dataset(name=var_name)
    else:
        ds_final = weekly_sum.rename({list(weekly_sum.data_vars)[0]: var_name})

    # --- Nombre salida ---
    epi_labels = sorted(set(labels))
    epi_min, epi_max = epi_labels[0], epi_labels[-1]
    filename = os.path.join(path_out, f"mswep_{epi_min}_{epi_max}.nc")

    comp = dict(zlib=True, complevel=4)
    encoding = {v: comp for v in ds_final.data_vars}
    ds_final.to_netcdf(filename, format="NETCDF4", engine="h5netcdf", encoding=encoding)
    elapsed = time.time() - start_time
    print(f"\n‚úÖ Archivo guardado: {filename}")
    print(f"üïí Tiempo total: {elapsed:.2f} segundos ({elapsed/60:.2f} minutos)")
    return ds_final

def process_c3s(
    path: str,
    start_date: str | pd.Timestamp,
    end_date: str | pd.Timestamp,
    vars: dict[str, str],           # {'tmean': '2M_TEMPERATURE_MEAN', ...}
    path_out: str,
    crs: str = "EPSG:4326",
) -> xr.Dataset:
    """
    Une d√≠a por d√≠a los NetCDF de C3S que est√°n separados por carpeta/variable.
    Devuelve un Dataset con todas las variables concatenadas a lo largo de 'time'.
    """

    start_time = time.time()
    path = Path(path)
    fechas = pd.date_range(pd.Timestamp(start_date), pd.Timestamp(end_date), freq="D")

    diarios = []
    print(f"üì• Procesando {len(fechas)} fechas entre {start_date} y {end_date} desde: {path}")
    
    for dt in tqdm(fechas, desc="Procesando archivos C3S", unit="d√≠a"):
        fecha_str = dt.strftime("%Y%m%d")
        datasets_del_dia = []

        for alias, carpeta in vars.items():
            folder = path / carpeta
            patron = f"*{fecha_str}*"
            archivos = list(folder.rglob(patron))

            if not archivos:
                raise FileNotFoundError(
                    f"‚õî No se encontr√≥ archivo con fecha {fecha_str} en {folder}"
                )

            ds = xr.open_dataset(archivos[0], decode_coords="all")
            var_name = list(ds.data_vars)[0]
            ds = ds.rename({var_name: alias})

            if "time" not in ds.dims:
                ds = ds.expand_dims(time=[dt])
            else:
                ds = ds.assign_coords(time=("time", [dt]))

            datasets_del_dia.append(ds)

        ds_diario = xr.merge(datasets_del_dia, combine_attrs="override")
        diarios.append(ds_diario)

    datos = xr.concat(diarios, dim="time").sortby("time")

    # Asignar semanas epidemiol√≥gicas
    epi_weeks = []
    for ts in datos.time.dt.floor("D").values:
        epi_year, epi_week = get_epi_year_week(ts)
        epi_weeks.append(f"{epi_year}-{epi_week:02d}")

    datos_epi = datos.assign_coords(epi_week=("time", epi_weeks))
    c3s_mean = datos_epi.groupby(["epi_week"]).mean("time")

    if crs:
        try:
            c3s_mean = c3s_mean.rio.write_crs(crs, inplace=False)
        except ImportError:
            pass

    # Obtener rango de semanas para el nombre del archivo
    epi_tuples = pd.Series(epi_weeks).str.extract(r'(?P<year>\d{4})-(?P<week>\d{2})').astype(int)
    epi_tuples_list = list(zip(epi_tuples['year'], epi_tuples['week']))
    epi_tuples_sorted = sorted(epi_tuples_list)
    epi_min = f"{epi_tuples_sorted[0][0]}-{epi_tuples_sorted[0][1]:02d}"
    epi_max = f"{epi_tuples_sorted[-1][0]}-{epi_tuples_sorted[-1][1]:02d}"
    
    filename = os.path.join(path_out, f"c3s_{epi_min}_{epi_max}.nc") 
    c3s_mean.to_netcdf(filename, format="NETCDF4", engine="netcdf4")

    elapsed = time.time() - start_time
    print(f"\n‚úÖ Archivo guardado: {filename}")
    print(f"üïí Tiempo total: {elapsed:.2f} segundos ({elapsed/60:.2f} minutos)")

    return c3s_mean


def process_gee(
    path: Path,
    files: list[Path],
    shape: tuple[int, int],
    crs: str = "EPSG:4326"
) -> xr.DataArray:
    """
    Procesa una lista de archivos GeoTIFF diarios (NDVI, NDWI, etc.) provenientes de GEE.
    Agrega coordenadas temporales y CRS, luego agrega semanalmente por semana epidemiol√≥gica.
    
    Par√°metros:
    - path: Carpeta contenedora (solo usada para nombrar la variable final).
    - files: Lista de archivos GeoTIFF (uno por d√≠a).
    - shape: Dimensiones esperadas de los datos (alto, ancho).
    - crs: Sistema de referencia de coordenadas a usar.

    Retorna:
    - xr.DataArray con dimensi√≥n 'epi_week' y nombre igual al nombre de la carpeta.
    """
    start_time = time.time()

    if not files:
        raise ValueError(f"No se proporcionaron archivos para {path}")

    print(f"üìÇ Procesando {len(files)} archivos desde: {path}")

    # Obtener shape y coordenadas de referencia
    ref_coords_found = False
    for f_ref in files:
        try:
            ds_ref = rioxarray.open_rasterio(f_ref).squeeze("band", drop=True)
            if ds_ref.shape == shape:
                ref_x, ref_y = ds_ref.x.values, ds_ref.y.values
                ref_coords_found = True
                break
        except Exception as e:
            print(f"‚ö†Ô∏è Error leyendo {f_ref.name}: {e}")

    if not ref_coords_found:
        raise RuntimeError("No se encontr√≥ archivo con shape v√°lido para usar como referencia.")

    datasets = []
    archivos_excluidos = []

    for f in tqdm(files, desc="Procesando GeoTIFFs GEE", unit="archivo"):
        fecha = extraer_fecha_individual(f)
        try:
            ds = rioxarray.open_rasterio(f, chunks={"y": 200, "x": 200}).squeeze("band", drop=True)
            if ds.shape == shape:
                ds = ds.reset_coords(drop=True)
                ds = ds.assign_coords({"x": ref_x, "y": ref_y})
                ds = ds.expand_dims(time=[fecha])
                datasets.append(ds)
            else:
                print(f"‚õî Excluido (dimensiones incorrectas {ds.shape}): {f.name}")
                archivos_excluidos.append(f.name)
        except Exception as e:
            print(f"‚ö†Ô∏è Error leyendo {f.name}: {e}")
            archivos_excluidos.append(f.name)

    if not datasets:
        raise RuntimeError("No se pudo cargar ning√∫n archivo con dimensiones v√°lidas.")

    da_concat = xr.concat(datasets, dim="time")

    epi_weeks = []
    for ts in da_concat.time.dt.floor("D").values:
        epi_year, epi_week = get_epi_year_week(ts)
        epi_weeks.append(f"{epi_year}-{epi_week:02d}")

    datos_epi = da_concat.assign_coords(epi_week=("time", epi_weeks))
    datos_mean = datos_epi.groupby("epi_week").mean("time")

    if crs:
        try:
            datos_mean = datos_mean.rio.write_crs(crs, inplace=False)
        except ImportError:
            pass

    variable_name = path.name
    datos_mean.name = variable_name

    elapsed = time.time() - start_time
    print(f"\n‚úÖ {variable_name}: {len(datasets)} archivos incluidos, {len(archivos_excluidos)} excluidos.")
    print(f"üïí Tiempo total: {elapsed:.2f} segundos ({elapsed/60:.2f} minutos)")

    return datos_mean


def extraer_fecha_individual(f: Path) -> pd.Timestamp:
    """
    Extrae la fecha desde el nombre del archivo, buscando un patr√≥n YYYY-MM-DD o YYYYMMDD.
    Devuelve un pd.Timestamp.
    """
    fname = f.name
    # Intenta YYYY-MM-DD
    match = re.search(r"\d{4}-\d{2}-\d{2}", fname)
    if match:
        return pd.to_datetime(match.group(0))
    
    # Intenta YYYYMMDD
    match = re.search(r"\d{8}", fname)
    if match:
        return pd.to_datetime(match.group(0), format="%Y%m%d")
    
    raise ValueError(f"No se pudo extraer fecha del nombre: {fname}")


In [3]:
# ---------- Agregaci√≥n semana epidemiologica MSWEP ----------
MSWEP_IN  = r"C:\Users\Sebasti√°n Palomino\Downloads\mswep_files_valle"
MSWEP_OUT = r"C:\Users\Sebasti√°n Palomino\Desktop\Down\data\proccesed"
mswep = process_mswep(MSWEP_IN, "2013-06-01", "2025-01-31", MSWEP_OUT)
print(mswep)

NameError: name 'fecha_a_nombre' is not defined

In [None]:
# ---------- Agregaci√≥n semana epidemiologica Copernicus ----------
C3S_VARIABLES = {
    'tmax' : '2M_TEMPERATURE_MAXIMUM',
    'tmin' : '2M_TEMPERATURE_MINIMUM',
    'humidity': '2M_RELATIVE_HUMIDITY',
    'windspeed': '10M_WIND_SPEED'}
C3S_INPUT = "D:/OneDrive - CGIAR/Desktop/codigos_dengue/climate_data_downloader/data/processed/copernicus"
C3S_OUTPUT= "D:/OneDrive - CGIAR/Desktop/downscaling/data/preprocessed/copernicus"
c3s  = process_c3s(C3S_INPUT,"2013-06-01", "2025-01-31",C3S_VARIABLES,C3S_OUTPUT)
print(c3s)

üì• Procesando 4263 fechas entre 2013-06-01 y 2025-01-31 desde: D:\OneDrive - CGIAR\Desktop\codigos_dengue\climate_data_downloader\data\processed\copernicus


Procesando archivos C3S: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 4263/4263 [13:50<00:00,  5.13d√≠a/s]



‚úÖ Archivo guardado: D:/OneDrive - CGIAR/Desktop/downscaling/processed/copernicus\c3s_2013-22_2025-05.nc
üïí Tiempo total: 901.42 segundos (15.02 minutos)
<xarray.Dataset> Size: 3MB
Dimensions:      (epi_week: 610, lat: 20, lon: 17)
Coordinates:
  * lon          (lon) float64 136B -77.4 -77.3 -77.2 ... -76.0 -75.9 -75.8
  * lat          (lat) float64 160B 5.0 4.9 4.8 4.7 4.6 ... 3.5 3.4 3.3 3.2 3.1
  * epi_week     (epi_week) object 5kB '2013-22' '2013-23' ... '2025-05'
    spatial_ref  int64 8B 0
Data variables:
    tmax         (epi_week, lat, lon) float32 830kB 301.0 302.7 ... 287.0 290.3
    tmin         (epi_week, lat, lon) float32 830kB 296.5 295.1 ... 280.5 282.8
    humidity     (epi_week, lat, lon) float32 830kB 82.11 86.37 ... 92.78 91.18
    windspeed    (epi_week, lat, lon) float32 830kB 2.703 1.769 ... 1.564 1.246
Attributes:
    CDI:          Climate Data Interface version 1.9.2 (http://mpimet.mpg.de/...
    history:      Fri Mar 12 15:16:22 2021: cdo splitday /archive

In [None]:
# ---------- Agregaci√≥n semana epidemiologica Google Earth Engine ----------

GEE_NDVI = Path(r"D:/OneDrive - CGIAR/Desktop/codigos_dengue/climate_data_downloader/data/processed/gee/NDVI")
GEE_NDWI = Path(r"D:/OneDrive - CGIAR/Desktop/codigos_dengue/climate_data_downloader/data/processed/gee/NDWI")
GEE_OUTPUT= "D:/OneDrive - CGIAR/Desktop/downscaling/data/preprocessed/gee"
shape = (433, 422) ## primero Y y luego X

files_ndvi = list(GEE_NDVI.glob("*.tif"))
files_ndwi = list(GEE_NDWI.glob("*.tif"))


NDVI = process_gee(GEE_NDVI, files_ndvi, shape)
NDWI = process_gee(GEE_NDWI, files_ndwi, shape)

# Unir datasets en uno solo
combined_dataset = xr.merge([NDVI, NDWI])
print(combined_dataset)

epi_weeks = combined_dataset.epi_week.values

# Convertir a tuplas de enteros: (a√±o, semana)
epi_tuples = pd.Series(epi_weeks).str.extract(r'(?P<year>\d{4})-(?P<week>\d{2})').astype(int)
epi_tuples_list = list(zip(epi_tuples['year'], epi_tuples['week']))

# Ordenar por a√±o y semana
epi_tuples_sorted = sorted(epi_tuples_list)

# Obtener m√≠nimo y m√°ximo como strings 'YYYY-WW'
epi_min = f"{epi_tuples_sorted[0][0]}-{epi_tuples_sorted[0][1]:02d}"
epi_max = f"{epi_tuples_sorted[-1][0]}-{epi_tuples_sorted[-1][1]:02d}"

filename = os.path.join(GEE_OUTPUT, f"gee_{epi_min}_{epi_max}.nc") 

combined_dataset.to_netcdf(filename, format="NETCDF4", engine="netcdf4")

print(f"‚úÖguardada: {filename}")

üìÇ Procesando 4242 archivos desde: D:\OneDrive - CGIAR\Desktop\codigos_dengue\climate_data_downloader\data\processed\gee\NDVI


Procesando GeoTIFFs GEE:  46%|‚ñà‚ñà‚ñà‚ñà‚ñå     | 1943/4242 [00:35<00:41, 55.78archivo/s]

‚õî Excluido (dimensiones incorrectas (433, 421)): NDVI_MODIS_2018-09-27.tif


Procesando GeoTIFFs GEE: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 4242/4242 [01:19<00:00, 53.67archivo/s]



‚úÖ NDVI: 4241 archivos incluidos, 1 excluidos.
üïí Tiempo total: 82.48 segundos (1.37 minutos)
üìÇ Procesando 4241 archivos desde: D:\OneDrive - CGIAR\Desktop\codigos_dengue\climate_data_downloader\data\processed\gee\NDWI


Procesando GeoTIFFs GEE:  46%|‚ñà‚ñà‚ñà‚ñà‚ñå     | 1946/4241 [00:35<00:38, 59.79archivo/s]

‚õî Excluido (dimensiones incorrectas (433, 421)): NDWI_MODIS_2018-09-27.tif


Procesando GeoTIFFs GEE: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 4241/4241 [01:17<00:00, 55.05archivo/s]



‚úÖ NDWI: 4240 archivos incluidos, 1 excluidos.
üïí Tiempo total: 79.42 segundos (1.32 minutos)
<xarray.Dataset> Size: 889MB
Dimensions:      (x: 422, y: 433, epi_week: 608)
Coordinates:
  * x            (x) float64 3kB -8.601e+06 -8.6e+06 ... -8.391e+06 -8.39e+06
  * y            (y) float64 3kB 5.598e+05 5.593e+05 ... 3.443e+05 3.438e+05
  * epi_week     (epi_week) object 5kB '2013-22' '2013-23' ... '2025-05'
    spatial_ref  int64 8B 0
Data variables:
    NDVI         (epi_week, y, x) float32 444MB dask.array<chunksize=(1, 200, 200), meta=np.ndarray>
    NDWI         (epi_week, y, x) float32 444MB dask.array<chunksize=(1, 200, 200), meta=np.ndarray>
Attributes:
    TIFFTAG_XRESOLUTION:     1
    TIFFTAG_YRESOLUTION:     1
    TIFFTAG_RESOLUTIONUNIT:  1 (unitless)
    AREA_OR_POINT:           Area
    _FillValue:              0.0
    scale_factor:            1.0
    add_offset:              0.0
‚úÖguardada: D:/OneDrive - CGIAR/Desktop/downscaling/processed/gee\gee_2013-22_2025-05.n