In [4]:
import os
import zipfile
from pathlib import Path
from tqdm import tqdm

import shutil  # local to this cell: used to discard incomplete extractions

# === Paths ===
base_dir = Path("/mnt/cephfs-mount/chenchen/ERA5_Climate_Data")
output_base = base_dir / "unzip"
output_base.mkdir(exist_ok=True)

# === Find all .zip files ===
zip_files = sorted(base_dir.glob("*.zip"))

print(f"Found {len(zip_files)} zip files.")

# === Unzip loop ===
# Each archive is extracted into "<subfolder>.partial" and only renamed to its
# final name once extraction finished. An interrupted or failed run therefore
# never leaves a half-filled folder that the skip check below would mistake
# for a completed extraction.
failed = []  # (zip file name, exception) for archives that could not be extracted
for zip_path in tqdm(zip_files, desc="Unzipping files"):
    # e.g. ERA5Land_2018_08_daily_mean.zip → ERA5Land_2018_08
    subfolder_name = "_".join(zip_path.stem.split("_")[:3])
    output_folder = output_base / subfolder_name

    # Skip if already unzipped (tqdm.write keeps the progress bar intact)
    if output_folder.exists() and any(output_folder.iterdir()):
        tqdm.write(f"⏩ Skipping {subfolder_name} (already unzipped)")
        continue

    # Drop leftovers of an earlier interrupted run before starting over
    staging_folder = output_base / f"{subfolder_name}.partial"
    shutil.rmtree(staging_folder, ignore_errors=True)

    try:
        staging_folder.mkdir(parents=True, exist_ok=True)
        with zipfile.ZipFile(zip_path, 'r') as zf:
            zf.extractall(staging_folder)
        if output_folder.exists():
            # Only reachable for an empty folder (see the skip check above)
            output_folder.rmdir()
        staging_folder.rename(output_folder)
    except (zipfile.BadZipFile, OSError) as exc:
        # Keep going with the remaining archives; report the failure at the end
        shutil.rmtree(staging_folder, ignore_errors=True)
        failed.append((zip_path.name, exc))
        tqdm.write(f"❌ Failed to unzip {zip_path.name}: {exc}")

if failed:
    print(f"⚠️ {len(failed)} of {len(zip_files)} zip files could not be extracted.")
else:
    print("✅ All files processed successfully.")

Found 83 zip files.


Unzipping files:  71%|█████████████████████████████████████████████████▊                    | 59/83 [00:00<00:00, 584.20it/s]

⏩ Skipping ERA5Land_2018_08 (already unzipped)
⏩ Skipping ERA5Land_2018_09 (already unzipped)
⏩ Skipping ERA5Land_2018_10 (already unzipped)
⏩ Skipping ERA5Land_2018_11 (already unzipped)
⏩ Skipping ERA5Land_2018_12 (already unzipped)
⏩ Skipping ERA5Land_2019_01 (already unzipped)
⏩ Skipping ERA5Land_2019_02 (already unzipped)
⏩ Skipping ERA5Land_2019_03 (already unzipped)
⏩ Skipping ERA5Land_2019_04 (already unzipped)
⏩ Skipping ERA5Land_2019_05 (already unzipped)
⏩ Skipping ERA5Land_2019_06 (already unzipped)
⏩ Skipping ERA5Land_2019_07 (already unzipped)
⏩ Skipping ERA5Land_2019_08 (already unzipped)
⏩ Skipping ERA5Land_2019_09 (already unzipped)
⏩ Skipping ERA5Land_2019_10 (already unzipped)
⏩ Skipping ERA5Land_2019_11 (already unzipped)
⏩ Skipping ERA5Land_2019_12 (already unzipped)
⏩ Skipping ERA5Land_2020_01 (already unzipped)
⏩ Skipping ERA5Land_2020_02 (already unzipped)
⏩ Skipping ERA5Land_2020_03 (already unzipped)
⏩ Skipping ERA5Land_2020_04 (already unzipped)
⏩ Skipping ER

Unzipping files: 100%|███████████████████████████████████████████████████████████████████████| 83/83 [00:05<00:00, 14.45it/s]

⏩ Skipping ERA5Land_2024_08 (already unzipped)
⏩ Skipping ERA5Land_2024_09 (already unzipped)
⏩ Skipping ERA5Land_2024_11 (already unzipped)
⏩ Skipping ERA5Land_2024_12 (already unzipped)
⏩ Skipping ERA5Land_2025_01 (already unzipped)
⏩ Skipping ERA5Land_2025_02 (already unzipped)
⏩ Skipping ERA5Land_2025_03 (already unzipped)
⏩ Skipping ERA5Land_2025_04 (already unzipped)
⏩ Skipping ERA5Land_2025_05 (already unzipped)
⏩ Skipping ERA5Land_2025_06 (already unzipped)
⏩ Skipping ERA5Land_2025_07 (already unzipped)
✅ All files processed successfully.





In [13]:
# -*- coding: utf-8 -*-
"""
ERA5-Land monthly folder inspector + optional merge
- Build a catalog of variables/files (CSV)
- Merge variables into one Dataset (lazy), with basic sanity checks
- Quick, safe stats helpers (skip heavy loads by default)
"""

from pathlib import Path
from datetime import datetime
import json
import pandas as pd
import xarray as xr

# ======== CONFIG ========
# Root of the extracted archives; each month lives in its own sub-folder.
BASE_DIR = Path("/mnt/cephfs-mount/chenchen/ERA5_Climate_Data/unzip")
MONTH_DIR = BASE_DIR.joinpath("ERA5Land_2018_08")   # <- change to target month
CATALOG_CSV = MONTH_DIR.joinpath("_catalog.csv")    # where build_catalog() writes its table
MERGED_ZARR = MONTH_DIR.joinpath("_merged.zarr")    # optional on-disk Zarr store
BUILD_ZARR = False                                  # True → persist the merged dataset as Zarr

# Filename fragment → canonical short variable name.
# A fragment is matched as a substring of the file stem.
VAR_NAME_MAP = dict([
    ("u_component_of_wind", "u10"),
    ("v_component_of_wind", "v10"),
    ("2m_dewpoint_temperature", "d2m"),
    ("2m_temperature", "t2m"),
    ("skin_temperature", "skt"),
    ("leaf_area_index_high_vegetation", "lai_hv"),
    ("leaf_area_index_low_vegetation", "lai_lv"),
    ("snow_depth", "sde"),
    ("volumetric_soil_water_layer_1", "swvl1"),
    ("volumetric_soil_water_layer_2", "swvl2"),
    ("volumetric_soil_water_layer_3", "swvl3"),
    ("volumetric_soil_water_layer_4", "swvl4"),
])

# ======== HELPERS ========
def _human_size(n):
    for u in ["B","KB","MB","GB","TB","PB"]:
        if n < 1024:
            return f"{n:.2f} {u}"
        n /= 1024
    return f"{n:.2f} EB"

def _time_coverage(ds):
    try:
        if "valid_time" in ds.coords:
            t = ds.indexes.get("valid_time", None)
            if t is not None and len(t) > 0:
                return t[0].isoformat(), t[-1].isoformat(), len(t)
        if "time" in ds.coords:
            t = ds.indexes.get("time", None)
            if t is not None and len(t) > 0:
                return t[0].isoformat(), t[-1].isoformat(), len(t)
    except Exception:
        pass
    return None, None, None

def _guess_primary_var(ds):
    # pick the first data_var (each file appears to have a single data var)
    if len(ds.data_vars) == 1:
        return list(ds.data_vars)[0]
    # fallback: prefer keys present in our map
    for k in VAR_NAME_MAP.values():
        if k in ds.data_vars:
            return k
    return list(ds.data_vars)[0] if ds.data_vars else None

def _canonical_name_from_filename(p: Path):
    stem = p.stem.replace("_0_daily-mean","")  # strip suffix
    # find the longest matching key in VAR_NAME_MAP
    best = None
    for k in VAR_NAME_MAP.keys():
        if k in stem:
            best = k
            break
    return VAR_NAME_MAP.get(best, None)

# ======== 1) BUILD CATALOG ========
def build_catalog(month_dir: Path, out_csv: Path) -> pd.DataFrame:
    """
    Build a one-row-per-file catalog of the *.nc files in `month_dir`.

    Parameters
    ----------
    month_dir : Path
        Folder scanned (non-recursively) for "*.nc" files.
    out_csv : Path
        Destination CSV. If falsy (e.g. None) no file is written.

    Returns
    -------
    pd.DataFrame
        Catalog sorted by "file". A file that cannot be opened contributes a
        row holding only "file", "path" and "open_error". A folder without
        any *.nc file yields an empty frame that still has the regular
        catalog columns.
    """
    columns = [
        "file", "path", "size_bytes", "size_human", "primary_var", "canonical_var",
        "units", "long_name", "dims", "sizes", "time_start", "time_end", "ntime",
    ]
    rows = []
    for f in sorted(month_dir.glob("*.nc")):
        try:
            size = f.stat().st_size
        except OSError:
            size = None

        try:
            # Lazy open; chunks={} enables dask if present
            ds = xr.open_dataset(f, decode_times=True, chunks={})
        except Exception as e:
            rows.append({
                "file": f.name,
                "path": str(f),
                "open_error": str(e),
            })
            continue

        # The context manager closes the file even if inspecting it raises
        with ds:
            t0, t1, ntime = _time_coverage(ds)
            var = _guess_primary_var(ds)
            can_name = _canonical_name_from_filename(f)

            has_var = bool(var) and var in ds
            attrs = ds[var].attrs if has_var else {}
            shape = dict(ds[var].sizes) if has_var else {}

            rows.append({
                "file": f.name,
                "path": str(f),
                "size_bytes": size,
                "size_human": _human_size(size) if size else "",
                "primary_var": var,
                "canonical_var": can_name or "",
                "units": attrs.get("units", ""),
                "long_name": attrs.get("long_name", ""),
                "dims": ",".join(shape.keys()) if shape else "",
                "sizes": ",".join(str(shape[d]) for d in shape) if shape else "",
                "time_start": t0 or "",
                "time_end": t1 or "",
                "ntime": ntime or 0,
            })

    if rows:
        df = pd.DataFrame(rows).sort_values("file")
    else:
        # No *.nc files: sort_values("file") would raise KeyError on a column-less frame
        df = pd.DataFrame(columns=columns)

    if out_csv:
        df.to_csv(out_csv, index=False)
        print(f"Catalog saved → {out_csv}")
    return df

# ======== 2) MERGE ALL VARIABLES (LAZY) ========
def open_and_merge(month_dir: Path) -> xr.Dataset:
    """
    Lazily open every *.nc file in `month_dir` and merge them into one Dataset.

    Each file's primary data variable is renamed to the canonical short name
    derived from the file name when both can be determined; otherwise the
    file is merged under its original variable names. The merge uses
    join="exact" with compat="override" (this presumes all files share the
    same grid — confirm for new data). A "valid_time" coordinate is renamed
    to "time" unless "time" already exists.
    """
    def _open_renamed(nc_path):
        opened = xr.open_dataset(nc_path, decode_times=True, chunks={})
        primary = _guess_primary_var(opened)
        short = _canonical_name_from_filename(nc_path)
        if not (primary and short and primary in opened):
            return opened
        return opened.rename({primary: short})

    datasets = [_open_renamed(nc_path) for nc_path in sorted(month_dir.glob("*.nc"))]

    merged = xr.merge(datasets, compat="override", join="exact")
    needs_time_rename = "valid_time" in merged.coords and "time" not in merged.coords
    return merged.rename({"valid_time": "time"}) if needs_time_rename else merged

# ======== 3) QUICK STATS (LAZY-FRIENDLY) ========
def quick_stats(ds: xr.Dataset, vars_subset=None, compute=False):
    """
    Reduce each variable over its time dimension(s) to mean, min and max.

    Parameters
    ----------
    ds : xr.Dataset
        Source dataset.
    vars_subset : sequence of str, optional
        Variables to process; when empty or None, every data variable is used.
    compute : bool
        When True the three results are loaded via .load(); otherwise they
        are returned as produced by the reductions.

    Returns
    -------
    list of (name, mean, min, max) tuples. A variable without a "time" or
    "valid_time" dimension (case-insensitive) is returned unreduced in all
    three slots.
    """
    time_like = ("time", "valid_time")
    names = vars_subset or list(ds.data_vars)
    results = []
    for name in names:
        arr = ds[name]
        over = [dim for dim in arr.dims if dim.lower() in time_like]
        if over:
            triple = (
                arr.mean(dim=over, skipna=True),
                arr.min(dim=over, skipna=True),
                arr.max(dim=over, skipna=True),
            )
        else:
            triple = (arr, arr, arr)

        if compute:
            triple = tuple(part.load() for part in triple)
        results.append((name, *triple))
    return results

# ======== 4) EXAMPLES ========
if __name__ == "__main__":
    # 1) Build & save catalog
    df = build_catalog(MONTH_DIR, CATALOG_CSV)
    # reindex() instead of [] so the preview still prints (with NaN columns)
    # when no file could be opened and some summary columns are absent.
    print(df.reindex(columns=["file","canonical_var","units","long_name","ntime"]))

    # 2) Merge everything (lazy)
    ds = open_and_merge(MONTH_DIR)
    print("\nMerged dataset variables:", list(ds.data_vars))
    print("Coords:", list(ds.coords))
    print("Dims:", dict(ds.sizes))

    # (Optional) persist as Zarr for fast later access
    if BUILD_ZARR:
        # Choose some chunking; good default: chunk over time, moderate spatial tile
        ds = ds.chunk({"time": -1, "latitude": 200, "longitude": 400})
        ds.to_zarr(MERGED_ZARR, mode="w")
        print(f"Saved merged Zarr → {MERGED_ZARR}")

    # 3) Quick, lazy stats preview (no heavy compute yet)
    wanted_vars = ["t2m","d2m","skt","lai_hv","lai_lv","sde","u10","v10","swvl1","swvl2","swvl3","swvl4"]
    # Only ask for variables that were actually merged; a month folder that
    # lacks one of the files would otherwise abort the preview with KeyError.
    available_vars = [v for v in wanted_vars if v in ds.data_vars]
    missing_vars = [v for v in wanted_vars if v not in ds.data_vars]
    if missing_vars:
        print(f"Variables not present in merged dataset (skipped): {missing_vars}")

    # An empty subset would make quick_stats fall back to *all* variables, so guard it
    stats = quick_stats(ds, vars_subset=available_vars, compute=False) if available_vars else []
    for v, mean_da, min_da, max_da in stats:
        print(f"- {v}: mean/min/max over time are DataArrays with shape {mean_da.shape} (lazy)")
        # If you want scalar previews at a single point (fast), use .isel:
        # ex: print(float(mean_da.isel(latitude=400, longitude=1800)))

Catalog saved → /mnt/cephfs-mount/chenchen/ERA5_Climate_Data/unzip/ERA5Land_2018_08/_catalog.csv
                                               file canonical_var       units  \
0           10m_u_component_of_wind_0_daily-mean.nc           u10     m s**-1   
1           10m_v_component_of_wind_0_daily-mean.nc           v10     m s**-1   
2           2m_dewpoint_temperature_0_daily-mean.nc           d2m           K   
3                    2m_temperature_0_daily-mean.nc           t2m           K   
4   leaf_area_index_high_vegetation_0_daily-mean.nc        lai_hv  m**2 m**-2   
5    leaf_area_index_low_vegetation_0_daily-mean.nc        lai_lv  m**2 m**-2   
6                  skin_temperature_0_daily-mean.nc           skt           K   
7                        snow_depth_0_daily-mean.nc           sde           m   
8     volumetric_soil_water_layer_1_0_daily-mean.nc         swvl1  m**3 m**-3   
9     volumetric_soil_water_layer_2_0_daily-mean.nc         swvl2  m**3 m**-3   
10    volume

In [7]:
# -*- coding: utf-8 -*-
"""
Fuse water-fraction CSV with ERA5-Land met + LAI by date and (sp_lon, sp_lat).
Input : CSV_PATH, e.g. /mnt/cephfs-mount/chenchen/water_body_fraction_with_landcover&slope/monthly/2018_09.csv
ERA5  : /mnt/cephfs-mount/chenchen/ERA5_Climate_Data/unzip/ERA5Land_YYYY_MM/*.nc
Output: .../with_era5/<csv_stem>_with_era5.{csv,parquet}

Fixes:
- Use xarray.interp for BOTH nearest and linear (tuple indexers are not valid for sel()).
- Provide lon/lat as DataArray with dims="points".
- Sort latitude only for linear interpolation.
- Convert pandas index to NumPy when assigning back to numpy arrays.
"""

from pathlib import Path
import sys
import numpy as np
import pandas as pd
import xarray as xr

# ================== CONFIG ==================
# Input CSV to enrich and the root folder of the extracted ERA5-Land months.
CSV_PATH = Path("/mnt/cephfs-mount/chenchen/water_body_fraction_with_landcover&slope/monthly/2018_09.csv")
UNZIP_BASE = Path("/mnt/cephfs-mount/chenchen/ERA5_Climate_Data/unzip")

# Results are written to a "with_era5" folder next to the input CSV;
# the folder is created as soon as this cell runs.
OUTPUT_DIR = CSV_PATH.parent.joinpath("with_era5")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
OUT_CSV = OUTPUT_DIR / f"{CSV_PATH.stem}_with_era5.csv"
OUT_PARQUET = OUTPUT_DIR / f"{CSV_PATH.stem}_with_era5.parquet"  # optional; comment out to skip

# Spatial sampling: "nearest" or "linear" (bilinear)
SAMPLING = "nearest"

# Canonical short names of the variables to extract
ERA5_VARS = [
    "t2m", "d2m", "skt",
    "u10", "v10",
    "sde",
    "swvl1", "swvl2", "swvl3", "swvl4",
    "lai_hv", "lai_lv",
]

# Filename fragment (matched as a substring of the file stem) → canonical name
VAR_NAME_MAP = dict([
    ("u_component_of_wind", "u10"),
    ("v_component_of_wind", "v10"),
    ("2m_dewpoint_temperature", "d2m"),
    ("2m_temperature", "t2m"),
    ("skin_temperature", "skt"),
    ("leaf_area_index_high_vegetation", "lai_hv"),
    ("leaf_area_index_low_vegetation", "lai_lv"),
    ("snow_depth", "sde"),
    ("volumetric_soil_water_layer_1", "swvl1"),
    ("volumetric_soil_water_layer_2", "swvl2"),
    ("volumetric_soil_water_layer_3", "swvl3"),
    ("volumetric_soil_water_layer_4", "swvl4"),
])

# ================== HELPERS ==================
def month_folder_from_date(dt: pd.Timestamp) -> Path:
    """Return UNZIP_BASE/ERA5Land_YYYY_MM for the year and month of `dt`."""
    year, month = dt.year, dt.month
    return UNZIP_BASE.joinpath(f"ERA5Land_{year:04d}_{month:02d}")

def canonical_from_fname(p: Path) -> str | None:
    stem = p.stem.replace("_0_daily-mean", "")
    for k, v in VAR_NAME_MAP.items():
        if k in stem:
            return v
    return None

def open_and_merge_month(month_dir: Path) -> xr.Dataset:
    """
    Open all .nc for a month, rename primary vars to canonical, merge lazily.

    Files whose name maps to no canonical variable, or that contain no data
    variable, are ignored. The first data variable of each remaining file is
    renamed to its canonical name, the files are merged (join="exact",
    compat="override"), "valid_time" is renamed to "time" when needed, and
    only the variables listed in ERA5_VARS are kept.

    Raises
    ------
    FileNotFoundError
        No usable .nc file in `month_dir`.
    ValueError
        None of the ERA5_VARS variables is present after merging.

    On any failure the files opened so far are closed before the exception
    propagates, so a caller that catches the error does not leak handles.
    """
    # File-backed datasets exactly as returned by open_dataset. Derived objects
    # (rename/merge results) are not used for cleanup; the originals are closed.
    handles = []
    try:
        ds_list = []
        for f in sorted(month_dir.glob("*.nc")):
            can = canonical_from_fname(f)
            if can is None:
                continue
            ds = xr.open_dataset(f, decode_times=True, chunks={})
            dvars = list(ds.data_vars)
            if not dvars:
                ds.close()
                continue
            handles.append(ds)
            var = dvars[0]
            if var != can and var in ds:
                ds = ds.rename({var: can})
            ds_list.append(ds)

        if not ds_list:
            raise FileNotFoundError(f"No usable .nc files in {month_dir}")

        merged = xr.merge(ds_list, compat="override", join="exact")
        if "valid_time" in merged.coords and "time" not in merged.coords:
            merged = merged.rename({"valid_time": "time"})
        # Keep only variables we need (if present)
        keep = [v for v in ERA5_VARS if v in merged.data_vars]
        if not keep:
            raise ValueError(f"No target variables found in {month_dir}")
        return merged[keep]
    except Exception:
        for handle in handles:
            handle.close()
        raise

def ensure_lon_convention(df_lons: np.ndarray, ds_lons: np.ndarray) -> np.ndarray:
    """
    Make CSV longitudes match the longitude convention of the ERA5 grid.

    Parameters
    ----------
    df_lons : np.ndarray
        Point longitudes in either convention.
    ds_lons : np.ndarray
        Longitude coordinate of the ERA5 dataset (must be non-empty).

    Returns
    -------
    np.ndarray
        Float copy of `df_lons`, wrapped to [-180, 180) when the grid contains
        negative longitudes and stays within 180, wrapped to [0, 360) when the
        grid is non-negative and extends beyond 180, unchanged otherwise (the
        grid's convention cannot be told apart in that case).
    """
    ds_min, ds_max = float(ds_lons.min()), float(ds_lons.max())
    lons = df_lons.astype(float).copy()
    # Any negative grid longitude identifies the [-180, 180) convention; the
    # grid minimum need not be exactly -180 (regional subsets, cell-centred grids).
    if ds_min < 0 and ds_max <= 180:
        lons = ((lons + 180.0) % 360.0) - 180.0
    elif ds_min >= 0 and ds_max > 180:
        lons = lons % 360.0
    return lons

def extract_for_points_on_date(ds_day: xr.Dataset, lons: np.ndarray, lats: np.ndarray, sampling: str = "nearest") -> dict:
    """
    Sample the ERA5_VARS variables of one date at the given points.

    `lons` and `lats` are wrapped into DataArrays sharing the "points"
    dimension and passed to xarray's interp, using method "linear" when
    `sampling == "linear"` and "nearest" for any other value. For linear
    sampling the dataset is first sorted by latitude unless latitude is
    already strictly increasing.

    Returns a dict {variable: values array}; empty when none of the
    ERA5_VARS variables is present in `ds_day`.
    """
    wanted = [name for name in ERA5_VARS if name in ds_day.data_vars]
    if not wanted:
        return {}

    use_linear = sampling == "linear"
    if use_linear and not np.all(np.diff(ds_day.latitude.values) > 0):
        ds_day = ds_day.sortby("latitude")

    sampled = ds_day[wanted].interp(
        longitude=xr.DataArray(lons, dims="points"),
        latitude=xr.DataArray(lats, dims="points"),
        method="linear" if use_linear else "nearest",
    )
    return {name: sampled[name].values for name in wanted}

# ================== MAIN ==================
def main():
    """
    Fuse the CSV at CSV_PATH with ERA5-Land values and write the result.

    Steps: load the CSV, floor 'date' to whole days, open one merged ERA5
    dataset per (year, month) present in the data, sample every ERA5_VARS
    variable at each row's (sp_lon, sp_lat) for that row's date, then write
    OUT_CSV and, best effort, OUT_PARQUET. Rows whose date cannot be parsed,
    whose month folder is missing/unreadable, or whose latitude lies outside
    the ERA5 grid keep NaN in the ERA5 columns.

    Exits with status 1 when CSV_PATH does not exist; raises ValueError when
    a required column ('date', 'sp_lon', 'sp_lat') is missing.
    """
    # ---- 1) Load CSV ----
    if not CSV_PATH.exists():
        print(f"ERROR: CSV not found: {CSV_PATH}", file=sys.stderr)
        sys.exit(1)

    df = pd.read_csv(CSV_PATH)
    if "date" not in df.columns or "sp_lon" not in df.columns or "sp_lat" not in df.columns:
        raise ValueError("CSV must contain 'date', 'sp_lon', and 'sp_lat' columns.")

    # Parse date (floor to date if time exists)
    df["date"] = pd.to_datetime(df["date"], errors="coerce").dt.floor("D")
    if df["date"].isna().any():
        bad = int(df["date"].isna().sum())
        print(f"WARNING: {bad} rows have non-parsable 'date' and will remain NaN after join.")

    # ---- 2) Discover month set & cache ERA5 datasets ----
    df["_year"] = df["date"].dt.year
    df["_month"] = df["date"].dt.month
    # The helper columns turn float as soon as one date is NaT, so the month
    # keys are built from the valid dates and cast to int; float keys would
    # break pd.Timestamp(year=...) and the ':02d' formatting below.
    valid_dates = df.loc[df["date"].notna(), "date"]
    months = sorted({(int(y), int(m)) for y, m in zip(valid_dates.dt.year, valid_dates.dt.month)})

    month_cache: dict[tuple[int,int], xr.Dataset] = {}
    for (y, m) in months:
        mdir = month_folder_from_date(pd.Timestamp(year=y, month=m, day=1))
        if not mdir.exists():
            print(f"WARNING: ERA5 month folder missing → {mdir}. Rows for {y}-{m:02d} will remain NaN.")
            continue
        try:
            month_cache[(y, m)] = open_and_merge_month(mdir)
        except Exception as e:
            print(f"WARNING: Failed to open/merge {mdir}: {e}. Rows for {y}-{m:02d} will remain NaN.")

    # ---- 3) Prepare output columns (default NaN) ----
    out_cols = {v: np.full(len(df), np.nan, dtype=float) for v in ERA5_VARS}

    # ---- 4) Iterate by month, then date ----
    for (y, m), ds_month in month_cache.items():
        ds_lons = ds_month.longitude.values
        ds_lats = ds_month.latitude.values
        # Latitude bounds of the grid, independent of the axis direction
        lo, hi = float(ds_lats.min()), float(ds_lats.max())

        idx_month = df.index[(df["_year"] == y) & (df["_month"] == m)]
        if len(idx_month) == 0:
            continue

        # Group by date for this month
        groups = df.loc[idx_month].groupby("date").groups
        for dt, idx_day in groups.items():
            if pd.isna(dt):
                continue
            # Index labels are also used as positions into the out_cols arrays,
            # which relies on the default RangeIndex produced by read_csv.
            idx_day_np = np.asarray(list(idx_day))  # ensure NumPy index for assignment

            # Slice exact day; fall back to nearest within this month if exact missing
            try:
                ds_day = ds_month.sel(time=pd.Timestamp(dt))
            except KeyError:
                try:
                    ds_day = ds_month.sel(time=pd.Timestamp(dt), method="nearest")
                except Exception:
                    print(f"WARNING: No ERA5 data for date {dt.date()} in ERA5Land_{y:04d}_{m:02d}")
                    continue

            # Coordinates for these rows
            lons = df.loc[idx_day_np, "sp_lon"].to_numpy(float)
            lats = df.loc[idx_day_np, "sp_lat"].to_numpy(float)

            # Match longitude convention to dataset
            lons = ensure_lon_convention(lons, ds_lons)

            # Domain guard (latitude); NaN latitudes fail both comparisons
            in_lat_mask = (lats >= lo) & (lats <= hi)

            if not in_lat_mask.any():
                continue

            valid_pos = np.where(in_lat_mask)[0]
            vals = extract_for_points_on_date(
                ds_day,
                lons[valid_pos],
                lats[valid_pos],
                sampling=SAMPLING
            )

            # Write back
            for v in ERA5_VARS:
                tmp = np.full(idx_day_np.size, np.nan, dtype=float)
                if v in vals:
                    tmp[valid_pos] = vals[v]
                out_cols[v][idx_day_np] = tmp

    # ---- 5) Attach columns & save ----
    for v, arr in out_cols.items():
        df[v] = arr

    df.drop(columns=["_year","_month"], inplace=True, errors="ignore")

    df.to_csv(OUT_CSV, index=False)
    print(f"✅ Saved CSV → {OUT_CSV}")

    # Optional Parquet for faster reloads (comment out to skip)
    try:
        df.to_parquet(OUT_PARQUET, index=False)
        print(f"✅ Saved Parquet → {OUT_PARQUET}")
    except Exception as e:
        print(f"(Note) Parquet save skipped/failed: {e}")

if __name__ == "__main__":
    main()


✅ Saved CSV → /mnt/cephfs-mount/chenchen/water_body_fraction_with_landcover&slope/monthly/with_era5/2018_09_with_era5.csv
(Note) Parquet save skipped/failed: Unable to find a usable engine; tried using: 'pyarrow', 'fastparquet'.
A suitable version of pyarrow or fastparquet is required for parquet support.
Trying to import the above resulted in these errors:
 - Missing optional dependency 'pyarrow'. pyarrow is required for parquet support. Use pip or conda to install pyarrow.
 - Missing optional dependency 'fastparquet'. fastparquet is required for parquet support. Use pip or conda to install fastparquet.


In [2]:
# -*- coding: utf-8 -*-
"""
Batch-fuse water-fraction CSVs with ERA5-Land met + LAI (date + sp_lon/sp_lat),
processing ONLY months in the range 2018-08 .. 2025-07 (inclusive; see MONTHS_TO_PROCESS).

Input  CSV dir : /mnt/cephfs-mount/chenchen/water_body_fraction_with_landcover&slope/monthly/
ERA5   month   : /mnt/cephfs-mount/chenchen/ERA5_Climate_Data/unzip/ERA5Land_YYYY_MM/*.nc
Output dir     : <input_dir>/with_era5/<csv_stem>_with_era5.{csv,parquet}

Policy:
- STRICT_ERA5_ALIGNMENT=True: if the ERA5 month folder for a CSV's months is missing,
  SKIP that CSV entirely (no partial outputs).
"""

from pathlib import Path
import sys
import re
from datetime import date
import numpy as np
import pandas as pd
import xarray as xr

# ================== CONFIG ==================
# Locations: input CSVs, extracted ERA5-Land months, and the output folder
INPUT_DIR = Path("/mnt/cephfs-mount/chenchen/water_body_fraction_with_landcover&slope/monthly")
UNZIP_BASE = Path("/mnt/cephfs-mount/chenchen/ERA5_Climate_Data/unzip")
OUTPUT_DIR = INPUT_DIR.joinpath("with_era5")

# Run options
GLOB_PATTERN = "*.csv"
SKIP_EXISTING = True
WRITE_PARQUET = False   # keep off unless you want .parquet too
SAMPLING = "nearest"    # "nearest" or "linear" (bilinear)

# STRICT alignment: if any required ERA5 month folder for a CSV is missing, skip the CSV
STRICT_ERA5_ALIGNMENT = True

# Canonical short names of the variables to extract
ERA5_VARS = [
    "t2m", "d2m", "skt",
    "u10", "v10",
    "sde",
    "swvl1", "swvl2", "swvl3", "swvl4",
    "lai_hv", "lai_lv",
]

# Filename fragment (matched as a substring of the file stem) → canonical name
VAR_NAME_MAP = dict([
    ("u_component_of_wind", "u10"),
    ("v_component_of_wind", "v10"),
    ("2m_dewpoint_temperature", "d2m"),
    ("2m_temperature", "t2m"),
    ("skin_temperature", "skt"),
    ("leaf_area_index_high_vegetation", "lai_hv"),
    ("leaf_area_index_low_vegetation", "lai_lv"),
    ("snow_depth", "sde"),
    ("volumetric_soil_water_layer_1", "swvl1"),
    ("volumetric_soil_water_layer_2", "swvl2"),
    ("volumetric_soil_water_layer_3", "swvl3"),
    ("volumetric_soil_water_layer_4", "swvl4"),
])

# ===== Month range: 2018-08 .. 2025-07 (inclusive), as defined by MONTHS_TO_PROCESS =====
# NOTE(review): earlier notes mention 2025-05 as the end; the end date used below is 2025-07.
def month_range(start: date, end: date):
    """
    Yield (year, month) tuples for every calendar month from `start` to `end`.

    Both end points are inclusive and only their year and month are used.
    Nothing is yielded when `start` lies in a later month than `end`.
    """
    first = start.year * 12 + (start.month - 1)
    last = end.year * 12 + (end.month - 1)
    for serial in range(first, last + 1):
        year, month0 = divmod(serial, 12)
        yield (year, month0 + 1)

MONTHS_TO_PROCESS = list(month_range(date(2018, 8, 1), date(2025, 7, 1)))

# ================== HELPERS ==================
def month_folder_from_date(dt: pd.Timestamp) -> Path:
    """Build the ERA5-Land month folder path (UNZIP_BASE/ERA5Land_YYYY_MM) for `dt`."""
    year, month = dt.year, dt.month
    folder_name = f"ERA5Land_{year:04d}_{month:02d}"
    return UNZIP_BASE.joinpath(folder_name)

def canonical_from_fname(p: Path) -> str | None:
    stem = p.stem.replace("_0_daily-mean", "")
    for k, v in VAR_NAME_MAP.items():
        if k in stem:
            return v
    return None

def open_and_merge_month(month_dir: Path) -> xr.Dataset:
    """
    Open all .nc for a month, rename primary vars to canonical, merge lazily.

    Files whose name maps to no canonical variable, or that hold no data
    variable, are ignored. The first data variable of every other file is
    renamed to its canonical name; the files are merged with join="exact" and
    compat="override"; "valid_time" becomes "time" when "time" is absent; and
    only variables listed in ERA5_VARS are returned.

    Raises
    ------
    FileNotFoundError
        No usable .nc file in `month_dir`.
    ValueError
        None of the ERA5_VARS variables is present after merging.

    If anything fails, the files opened so far are closed before the
    exception propagates, so callers that catch it do not leak handles.
    """
    # File-backed datasets exactly as returned by open_dataset. Derived objects
    # (rename/merge results) are not used for cleanup; the originals are closed.
    handles = []
    try:
        ds_list = []
        for f in sorted(month_dir.glob("*.nc")):
            can = canonical_from_fname(f)
            if can is None:
                continue
            ds = xr.open_dataset(f, decode_times=True, chunks={})
            dvars = list(ds.data_vars)
            if not dvars:
                ds.close()
                continue
            handles.append(ds)
            var = dvars[0]
            if var != can and var in ds:
                ds = ds.rename({var: can})
            ds_list.append(ds)

        if not ds_list:
            raise FileNotFoundError(f"No usable .nc files in {month_dir}")

        merged = xr.merge(ds_list, compat="override", join="exact")
        if "valid_time" in merged.coords and "time" not in merged.coords:
            merged = merged.rename({"valid_time": "time"})
        keep = [v for v in ERA5_VARS if v in merged.data_vars]
        if not keep:
            raise ValueError(f"No target variables found in {month_dir}")
        return merged[keep]
    except Exception:
        for handle in handles:
            handle.close()
        raise

def ensure_lon_convention(df_lons: np.ndarray, ds_lons: np.ndarray) -> np.ndarray:
    """
    Match CSV longitudes to ERA5 longitude convention.

    Parameters
    ----------
    df_lons : np.ndarray
        Point longitudes in either convention.
    ds_lons : np.ndarray
        Longitude coordinate of the ERA5 dataset (must be non-empty).

    Returns
    -------
    np.ndarray
        Float copy of `df_lons`: wrapped to [-180, 180) when the grid has
        negative longitudes and stays within 180; wrapped to [0, 360) when
        the grid is non-negative and reaches beyond 180; unchanged otherwise
        because the grid's convention is then ambiguous.
    """
    ds_min, ds_max = float(ds_lons.min()), float(ds_lons.max())
    lons = df_lons.astype(float).copy()
    # Any negative grid longitude identifies the [-180, 180) convention; the
    # grid minimum need not be exactly -180 (regional subsets, cell-centred grids).
    if ds_min < 0 and ds_max <= 180:
        lons = ((lons + 180.0) % 360.0) - 180.0
    elif ds_min >= 0 and ds_max > 180:
        lons = lons % 360.0
    return lons

def extract_for_points_on_date(ds_day: xr.Dataset, lons: np.ndarray, lats: np.ndarray, sampling: str = "nearest") -> dict:
    """
    Sample one date at many points using xarray.interp (nearest or linear).

    Only variables listed in ERA5_VARS and present in `ds_day` are sampled.
    `sampling == "linear"` selects linear interpolation, any other value
    selects "nearest". For linear sampling the dataset is sorted by latitude
    first unless latitude is already strictly increasing.

    Returns a dict {variable: values array}, or {} when no target variable
    is present.
    """
    wanted = [name for name in ERA5_VARS if name in ds_day.data_vars]
    if len(wanted) == 0:
        return {}

    method = "linear" if sampling == "linear" else "nearest"
    if method == "linear":
        lat_steps = np.diff(ds_day.latitude.values)
        if not np.all(lat_steps > 0):
            ds_day = ds_day.sortby("latitude")

    # Both coordinate arrays share the "points" dimension, so interp returns
    # one value per (lon, lat) pair instead of a lon × lat grid.
    lon_points = xr.DataArray(lons, dims="points")
    lat_points = xr.DataArray(lats, dims="points")
    sampled = ds_day[wanted].interp(longitude=lon_points, latitude=lat_points, method=method)

    values = {}
    for name in wanted:
        values[name] = sampled[name].values
    return values

def parse_month_from_filename(path: Path) -> tuple[int,int] | None:
    """Try to parse YYYY_MM from filename like '2025_07.csv'."""
    m = re.search(r"(?P<y>20\d{2}|19\d{2})[_-](?P<m>\d{2})", path.stem)
    if not m:
        return None
    y = int(m.group("y")); mo = int(m.group("m"))
    if 1 <= mo <= 12:
        return (y, mo)
    return None

def month_from_csv_dates(path: Path) -> tuple[int,int] | None:
    """Fallback: read a few rows and infer year/month from 'date' column."""
    try:
        df_head = pd.read_csv(path, nrows=1000)
        if "date" not in df_head.columns:
            return None
        s = pd.to_datetime(df_head["date"], errors="coerce").dropna()
        if s.empty:
            return None
        y = int(s.dt.year.mode(dropna=True).iloc[0])
        m = int(s.dt.month.mode(dropna=True).iloc[0])
        return (y, m)
    except Exception:
        return None

# ================== CORE PROCESSOR ==================
def process_one_csv(csv_path: Path, month_cache: dict) -> None:
    """
    Enrich one CSV with ERA5-Land values and write it to OUTPUT_DIR.

    Parameters
    ----------
    csv_path : Path
        Input CSV; must contain 'date', 'sp_lon' and 'sp_lat'.
    month_cache : dict
        Shared {(year, month): merged ERA5 dataset} cache; months opened here
        are added to it so later calls can reuse them.

    Behaviour
    ---------
    - Skipped when SKIP_EXISTING is set and the output(s) already exist, or
      when required columns are missing.
    - With STRICT_ERA5_ALIGNMENT, skipped entirely (nothing written) when an
      ERA5 month folder is missing or cannot be opened; otherwise the rows of
      that month keep NaN.
    - Rows with unparsable dates or latitudes outside the ERA5 grid keep NaN.
    - The CSV is written to a temporary name and renamed afterwards, so an
      interrupted run cannot leave a truncated file that SKIP_EXISTING would
      later accept as finished.
    """
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    out_csv = OUTPUT_DIR / (csv_path.stem + "_with_era5.csv")
    out_parquet = OUTPUT_DIR / (csv_path.stem + "_with_era5.parquet")

    if SKIP_EXISTING and out_csv.exists() and (not WRITE_PARQUET or out_parquet.exists()):
        print(f"⏩ Skip (exists): {csv_path.name}")
        return

    # Load
    df = pd.read_csv(csv_path)
    if "date" not in df.columns or "sp_lon" not in df.columns or "sp_lat" not in df.columns:
        print(f"WARNING: Missing required columns in {csv_path.name}; needs 'date','sp_lon','sp_lat'. Skipping.")
        return

    # Parse date
    df["date"] = pd.to_datetime(df["date"], errors="coerce").dt.floor("D")
    if df["date"].isna().any():
        print(f"WARNING: {int(df['date'].isna().sum())} rows have non-parsable 'date' in {csv_path.name}")

    # Month discovery for this file (should be a single month, but we support multi)
    df["_year"] = df["date"].dt.year
    df["_month"] = df["date"].dt.month
    # The helper columns turn float as soon as one date is NaT, so the month
    # keys are built from the valid dates and cast to int; float keys would
    # break the ':04d' / ':02d' formatting used for folder names below.
    valid_dates = df.loc[df["date"].notna(), "date"]
    months = sorted({(int(y), int(m)) for y, m in zip(valid_dates.dt.year, valid_dates.dt.month)})

    # STRICT check: all required ERA5 month folders must exist, else skip this CSV
    if STRICT_ERA5_ALIGNMENT:
        missing = []
        for (y, m) in months:
            mdir = UNZIP_BASE / f"ERA5Land_{y:04d}_{m:02d}"
            if not mdir.exists():
                missing.append((y, m))
        if missing:
            miss_str = ", ".join([f"{y:04d}-{m:02d}" for (y, m) in missing])
            print(f"⏭️  Skip {csv_path.name}: missing ERA5 months → {miss_str}")
            return

    # Prepare outputs
    out_cols = {v: np.full(len(df), np.nan, dtype=float) for v in ERA5_VARS}

    # Iterate per month, then date
    for (y, m) in months:
        # Open/cache ERA5 month dataset
        key = (y, m)
        mdir = UNZIP_BASE / f"ERA5Land_{y:04d}_{m:02d}"
        if key not in month_cache:
            try:
                month_cache[key] = open_and_merge_month(mdir)
            except Exception as e:
                if STRICT_ERA5_ALIGNMENT:
                    # Strict mode writes nothing, so no "rows remain NaN" message here
                    print(f"⏭️  Skip {csv_path.name}: cannot open ERA5 month {y:04d}-{m:02d} ({mdir}): {e}")
                    return
                print(f"WARNING: Failed to open/merge {mdir}: {e}. Rows for {y}-{m:02d} remain NaN in {csv_path.name}.")
                continue

        ds_month = month_cache[key]
        ds_lons = ds_month.longitude.values
        ds_lats = ds_month.latitude.values
        # Latitude bounds of the grid, independent of the axis direction
        lat_lo, lat_hi = float(ds_lats.min()), float(ds_lats.max())

        idx_month = df.index[(df["_year"] == y) & (df["_month"] == m)]
        if len(idx_month) == 0:
            continue

        groups = df.loc[idx_month].groupby("date").groups
        for dt, idx_day in groups.items():
            if pd.isna(dt):
                continue
            # Index labels are also used as positions into the out_cols arrays,
            # which relies on the default RangeIndex produced by read_csv.
            idx_day_np = np.asarray(list(idx_day))

            # Slice exact day; fallback nearest within month
            try:
                ds_day = ds_month.sel(time=pd.Timestamp(dt))
            except KeyError:
                try:
                    ds_day = ds_month.sel(time=pd.Timestamp(dt), method="nearest")
                except Exception:
                    print(f"WARNING: No ERA5 day for {dt.date()} in ERA5Land_{y:04d}_{m:02d} for {csv_path.name}")
                    continue

            # Coords for these rows
            lons = df.loc[idx_day_np, "sp_lon"].to_numpy(float)
            lats = df.loc[idx_day_np, "sp_lat"].to_numpy(float)

            # Match longitude convention
            lons = ensure_lon_convention(lons, ds_lons)

            # Lat domain guard; NaN latitudes fail both comparisons
            in_lat = (lats >= lat_lo) & (lats <= lat_hi)
            if not in_lat.any():
                continue

            valid_pos = np.where(in_lat)[0]
            vals = extract_for_points_on_date(ds_day, lons[valid_pos], lats[valid_pos], sampling=SAMPLING)

            # Assign back
            for v in ERA5_VARS:
                tmp = np.full(idx_day_np.size, np.nan, dtype=float)
                if v in vals:
                    tmp[valid_pos] = vals[v]
                out_cols[v][idx_day_np] = tmp

    # Attach columns and save
    for v, arr in out_cols.items():
        df[v] = arr
    df.drop(columns=["_year","_month"], inplace=True, errors="ignore")

    # Write under a temporary name, then rename: the final file only ever
    # appears once it is complete.
    tmp_csv = out_csv.with_name(out_csv.name + ".tmp")
    df.to_csv(tmp_csv, index=False)
    tmp_csv.replace(out_csv)
    print(f"✅ Saved CSV → {out_csv}")
    if WRITE_PARQUET:
        try:
            df.to_parquet(out_parquet, index=False)
            print(f"✅ Saved Parquet → {out_parquet}")
        except Exception as e:
            print(f"(Note) Parquet save skipped/failed for {csv_path.name}: {e}")

# ================== MAIN ==================
def main():
    """
    Pick the CSVs under INPUT_DIR whose month is listed in MONTHS_TO_PROCESS and
    run process_one_csv on each of them, sharing one ERA5 cache dict across files.

    Exits with status 1 (message on stderr) when INPUT_DIR does not exist; returns
    early with a message when nothing matches GLOB_PATTERN or no file is selected.
    """
    if not INPUT_DIR.exists():
        print(f"ERROR: Input dir not found: {INPUT_DIR}", file=sys.stderr)
        sys.exit(1)

    candidates = sorted(INPUT_DIR.glob(GLOB_PATTERN))
    if len(candidates) == 0:
        print(f"No CSVs found in {INPUT_DIR} with pattern {GLOB_PATTERN}")
        return

    # Keep only the files whose month is one of the requested months.
    targets = set(MONTHS_TO_PROCESS)
    chosen = []
    for candidate in candidates:
        fname = candidate.name
        # Helper outputs (leading underscore / schema summaries) are never inputs.
        if fname.startswith("_") or fname.endswith("_schema_summary.csv"):
            continue

        # Month comes from the filename first, then from the CSV contents.
        month_key = parse_month_from_filename(candidate)
        if month_key is None:
            month_key = month_from_csv_dates(candidate)
            if month_key is None:
                print(f"⚠️  Could not determine month for {fname}; skipping.")
                continue

        if month_key in targets:
            chosen.append(candidate)

    if len(chosen) == 0:
        print(f"No files matched target months: {sorted(targets)}")
        return

    print("Files selected:")
    for picked in chosen:
        print(" -", picked.name)

    # One ERA5 cache dict is handed to every call so months can be reused across files.
    era5_by_month: dict[tuple[int,int], xr.Dataset] = {}
    for picked in chosen:
        process_one_csv(picked, era5_by_month)

# Run the batch only when this code is executed directly (its module name is "__main__").
if __name__ == "__main__":
    main()

Files selected:
 - 2018_08.csv
 - 2018_09.csv
 - 2018_10.csv
 - 2018_11.csv
 - 2018_12.csv
 - 2019_01.csv
 - 2019_02.csv
 - 2019_03.csv
 - 2019_04.csv
 - 2019_05.csv
 - 2019_06.csv
 - 2019_07.csv
 - 2019_08.csv
 - 2019_09.csv
 - 2019_10.csv
 - 2019_11.csv
 - 2019_12.csv
 - 2020_01.csv
 - 2020_02.csv
 - 2020_03.csv
 - 2020_04.csv
 - 2020_05.csv
 - 2020_06.csv
 - 2020_07.csv
 - 2020_08.csv
 - 2020_09.csv
 - 2020_10.csv
 - 2020_11.csv
 - 2020_12.csv
 - 2021_01.csv
 - 2021_02.csv
 - 2021_03.csv
 - 2021_04.csv
 - 2021_05.csv
 - 2021_06.csv
 - 2021_07.csv
 - 2021_08.csv
 - 2021_09.csv
 - 2021_10.csv
 - 2021_11.csv
 - 2021_12.csv
 - 2022_01.csv
 - 2022_02.csv
 - 2022_03.csv
 - 2022_04.csv
 - 2022_05.csv
 - 2022_06.csv
 - 2022_07.csv
 - 2022_08.csv
 - 2022_09.csv
 - 2022_10.csv
 - 2022_11.csv
 - 2022_12.csv
 - 2023_01.csv
 - 2023_02.csv
 - 2023_03.csv
 - 2023_04.csv
 - 2023_05.csv
 - 2023_06.csv
 - 2023_07.csv
 - 2023_08.csv
 - 2023_09.csv
 - 2023_10.csv
 - 2023_11.csv
 - 2023_12.csv
 - 2024_0

  import pynvml


✅ Saved CSV → /mnt/cephfs-mount/chenchen/water_body_fraction_with_landcover&slope/monthly/with_era5/2025_06_with_era5.csv
✅ Saved CSV → /mnt/cephfs-mount/chenchen/water_body_fraction_with_landcover&slope/monthly/with_era5/2025_07_with_era5.csv


In [1]:
# -*- coding: utf-8 -*-
"""
Batch-augment daily CYGNSS CSVs with ERA5-Land meteorology + LAI
for each day (YYYYMMDD.csv) under monthly subfolders (e.g., 2024_12_v2/).

INPUT_BASE   : /mnt/cephfs-mount/chenchen/CygnssDataCsvLand/
OUTPUT_BASE  : /mnt/cephfs-mount/chenchen/CygnssDataCsvLand_full/
ERA5 month   : /mnt/cephfs-mount/chenchen/ERA5_Climate_Data/unzip/ERA5Land_YYYY_MM/*.nc

Behavior:
- Preserves monthly subfolder structure in OUTPUT_BASE.
- Writes output with the SAME FILENAME as input (e.g., 20241231.csv) into OUTPUT_BASE/<month_dir>/.
- STRICT_ERA5_ALIGNMENT=True: if the required ERA5 month folder is missing, skip that CSV
  before trying to open it. A month that cannot be opened is skipped regardless of this flag.
- SKIP_EXISTING=True: if output CSV already exists, skip processing.

Assumptions:
- Each daily CSV contains columns 'sp_lon' and 'sp_lat'. If 'date' column is missing,
  it will be created from the filename.
- ERA5 files are daily statistics with a 'time' coordinate per day.

Author: You :)
"""

from pathlib import Path
import sys
import re
from typing import Optional, Tuple
import numpy as np
import pandas as pd
import xarray as xr

# ================== CONFIG ==================
# Root of the input tree; main() scans its immediate subfolders for month-like names.
INPUT_BASE   = Path("/mnt/cephfs-mount/chenchen/CygnssDataCsvLand")
# Root of the output tree; each input month folder name is mirrored underneath it.
OUTPUT_BASE  = Path("/mnt/cephfs-mount/chenchen/CygnssDataCsvLand_full")
# Parent of the per-month ERA5 folders, addressed as UNZIP_BASE / "ERA5Land_YYYY_MM".
UNZIP_BASE   = Path("/mnt/cephfs-mount/chenchen/ERA5_Climate_Data/unzip")

# Which files/folders to process
# MONTH_DIR_REGEX is applied with .match(); the surrounding '.*' lets the 'YYYY_MM'
# token sit anywhere in the folder name.
MONTH_DIR_REGEX   = re.compile(r".*(?P<y>\d{4})_(?P<m>\d{2}).*")  # matches '2024_12' in '2024_12_v2'
# DAILY_FILE_REGEX is applied with .search(); it is anchored only at the end, so any
# file name ending in eight digits + '.csv' (case-insensitive) qualifies.
DAILY_FILE_REGEX  = re.compile(r"(?P<ymd>\d{8})\.csv$", flags=re.IGNORECASE)

# I/O options
SKIP_EXISTING     = True   # Skip if output daily CSV already exists
WRITE_PARQUET     = False  # Optional .parquet alongside .csv
PARQUET_SUFFIX    = ".parquet"
# NOTE(review): CSV_SUFFIX is not referenced anywhere in the visible part of this cell.
CSV_SUFFIX        = ".csv"

# Sampling method for xarray.interp: "nearest" or "linear" (bilinear)
# Any value other than "linear" is treated as "nearest" by extract_for_points_on_date().
SAMPLING          = "nearest"

# If True, a daily CSV whose ERA5 month folder does not exist is skipped up front.
# (A month folder that exists but cannot be opened is skipped regardless of this flag.)
STRICT_ERA5_ALIGNMENT = True

# Variables to extract (canonical short names)
# Each name becomes one output column; columns for variables absent from the ERA5
# month are still written, filled with NaN.
ERA5_VARS = ["t2m","d2m","skt","u10","v10","sde","swvl1","swvl2","swvl3","swvl4","lai_hv","lai_lv"]

# Map long stems → canonical names
# canonical_from_fname() returns the value of the first key (in this order) that
# occurs as a substring of the ERA5 file stem.
VAR_NAME_MAP = {
    "u_component_of_wind": "u10",
    "v_component_of_wind": "v10",
    "2m_dewpoint_temperature": "d2m",
    "2m_temperature": "t2m",
    "skin_temperature": "skt",
    "leaf_area_index_high_vegetation": "lai_hv",
    "leaf_area_index_low_vegetation": "lai_lv",
    "snow_depth": "sde",
    "volumetric_soil_water_layer_1": "swvl1",
    "volumetric_soil_water_layer_2": "swvl2",
    "volumetric_soil_water_layer_3": "swvl3",
    "volumetric_soil_water_layer_4": "swvl4",
}

# ================== HELPERS ==================
def canonical_from_fname(p: Path) -> Optional[str]:
    """
    Map an ERA5 file path to its canonical short variable name.

    The "_0_daily-mean" marker is stripped from the file stem, then the value of
    the first VAR_NAME_MAP key contained in what remains is returned. Returns
    None when no key matches.
    """
    cleaned_stem = p.stem.replace("_0_daily-mean", "")
    return next(
        (short for long_stem, short in VAR_NAME_MAP.items() if long_stem in cleaned_stem),
        None,
    )

def open_and_merge_month(month_dir: Path) -> xr.Dataset:
    """
    Open every recognised .nc file in an ERA5 month folder and merge them lazily.

    For each file whose name maps to a canonical variable (see canonical_from_fname),
    the first data variable is renamed to that canonical name and a 'valid_time'
    coordinate is renamed to 'time' (unless 'time' already exists), so all files
    share one time axis before they are merged with join="exact".

    Parameters
    ----------
    month_dir : Path
        Folder holding the month's .nc files.

    Returns
    -------
    xr.Dataset
        Only the ERA5_VARS that are present. Calling .close() on the result closes
        every source file opened here.

    Raises
    ------
    FileNotFoundError
        No usable .nc file was found in month_dir.
    ValueError
        None of ERA5_VARS is present after merging (xr.merge may also raise it
        when the files' coordinates do not match exactly).

    On any failure, every file opened so far is closed before the exception propagates.
    """
    opened = []   # handles exactly as returned by xr.open_dataset (these own the files)
    members = []  # renamed / normalised views that get merged
    succeeded = False
    try:
        for f in sorted(month_dir.glob("*.nc")):
            can = canonical_from_fname(f)
            if can is None:
                continue
            raw = xr.open_dataset(f, decode_times=True, chunks={})
            dvars = list(raw.data_vars)
            if not dvars:
                raw.close()
                continue
            opened.append(raw)

            ds = raw
            var = dvars[0]
            if var != can:
                ds = ds.rename({var: can})
            # Normalize the time coordinate name per file so mixed folders still align.
            if "valid_time" in ds.coords and "time" not in ds.coords:
                ds = ds.rename({"valid_time": "time"})
            members.append(ds)

        if not members:
            raise FileNotFoundError(f"No usable .nc files in {month_dir}")

        merged = xr.merge(members, compat="override", join="exact")

        keep = [v for v in ERA5_VARS if v in merged.data_vars]
        if not keep:
            raise ValueError(f"No target variables {ERA5_VARS} found in {month_dir}")
        result = merged[keep]

        def _close_sources() -> None:
            for src in opened:
                src.close()

        # Derived datasets do not carry the opener's closer; register one explicitly
        # so the caller can release the underlying files via result.close().
        result.set_close(_close_sources)

        succeeded = True
        return result
    finally:
        if not succeeded:
            for src in opened:
                src.close()

def ensure_lon_convention(df_lons: np.ndarray, ds_lons: np.ndarray) -> np.ndarray:
    """
    Match CSV longitudes to the ERA5 grid's longitude convention.

    Parameters
    ----------
    df_lons : array-like of float
        Point longitudes in either convention. Never modified in place.
    ds_lons : array-like of float
        Longitudes of the ERA5 grid (global or a regional subset).

    Returns
    -------
    np.ndarray
        A new float array:
        - grid has negative longitudes and none above 180 → wrapped to [-180, 180);
        - grid is non-negative and reaches above 180      → wrapped to [0, 360);
        - otherwise (grid empty, or it lies entirely within [0, 180], where both
          conventions agree) → values returned unchanged.
    """
    lons = np.array(df_lons, dtype=float)  # np.array always copies
    grid = np.asarray(ds_lons, dtype=float)
    if grid.size == 0:
        # min()/max() of an empty array would raise; there is nothing to align to.
        return lons

    ds_min, ds_max = float(grid.min()), float(grid.max())
    if ds_min < 0 and ds_max <= 180:
        # Any negative grid longitude means the ±180 convention. Testing for
        # `ds_min <= -180` would miss regional grids or grids starting at -179.9.
        lons = ((lons + 180.0) % 360.0) - 180.0
    elif ds_min >= 0 and ds_max > 180:
        # ERA5 grid in [0, 360)
        lons = lons % 360.0
    return lons

def extract_for_points_on_date(
    ds_day: xr.Dataset, lons: np.ndarray, lats: np.ndarray, sampling: str = "nearest"
) -> dict:
    """
    Sample a single-day dataset at many (lon, lat) points via Dataset.interp.

    Returns a dict mapping each ERA5_VARS name present in ds_day to the array of
    sampled values along a shared 'points' dimension; an empty dict when none of
    the variables is present. `sampling` of "linear" selects linear interpolation,
    any other value selects nearest-neighbour.
    """
    wanted = [name for name in ERA5_VARS if name in ds_day.data_vars]
    if len(wanted) == 0:
        return {}

    use_linear = sampling == "linear"
    if use_linear:
        # Linear mode only: put latitude in strictly ascending order first.
        lat_steps = np.diff(ds_day.latitude.values)
        if not np.all(lat_steps > 0):
            ds_day = ds_day.sortby("latitude")

    # Both coordinate arrays share the 'points' dim, so sampling is point-wise
    # rather than over the lon × lat outer product.
    point_lons = xr.DataArray(lons, dims="points")
    point_lats = xr.DataArray(lats, dims="points")
    sampled = ds_day[wanted].interp(
        longitude=point_lons,
        latitude=point_lats,
        method="linear" if use_linear else "nearest",
    )

    result = {}
    for name in wanted:
        result[name] = sampled[name].values
    return result

def parse_date_from_filename(fname: str) -> Optional[pd.Timestamp]:
    """
    Turn a daily file name such as '20241231.csv' into a Timestamp.

    Returns None when the name does not match DAILY_FILE_REGEX or when the eight
    digits cannot be parsed as a %Y%m%d date.
    """
    hit = DAILY_FILE_REGEX.search(fname)
    if hit is None:
        return None
    try:
        parsed = pd.to_datetime(hit.group("ymd"), format="%Y%m%d")
    except Exception:
        return None
    return parsed

def month_dir_year_month(name: str) -> Optional[Tuple[int, int]]:
    """
    Pull (year, month) out of a month folder name such as '2024_12_v2'.

    Returns None when MONTH_DIR_REGEX does not match or the month is outside 1..12.
    """
    hit = MONTH_DIR_REGEX.match(name)
    if hit is None:
        return None
    year, month = int(hit.group("y")), int(hit.group("m"))
    return (year, month) if 1 <= month <= 12 else None

# ================== CORE PROCESSOR ==================
def _path_for_log(path: Path) -> Path:
    """Path to show in log lines: relative to OUTPUT_BASE when possible, else unchanged."""
    try:
        return path.relative_to(OUTPUT_BASE)
    except ValueError:
        return path


def _save_augmented(df_out: pd.DataFrame, out_csv_path: Path, src_name: str, saved_label: str) -> None:
    """Write df_out to out_csv_path via a temp file + rename, plus optional Parquet."""
    out_csv_path.parent.mkdir(parents=True, exist_ok=True)

    # SKIP_EXISTING treats any existing output file as complete, so the final name
    # must never point at a half-written file: write next to it, then rename.
    tmp_csv_path = out_csv_path.with_name(out_csv_path.name + ".tmp")
    try:
        df_out.to_csv(tmp_csv_path, index=False)
        tmp_csv_path.replace(out_csv_path)
    finally:
        # Only still present if the write or the rename failed.
        if tmp_csv_path.exists():
            tmp_csv_path.unlink()
    print(f"✅ {saved_label} → {_path_for_log(out_csv_path)}")

    if WRITE_PARQUET:
        parquet_path = out_csv_path.with_suffix(PARQUET_SUFFIX)
        try:
            df_out.to_parquet(parquet_path, index=False)
            print(f"✅ Saved Parquet → {_path_for_log(parquet_path)}")
        except Exception as e:
            # Parquet is best-effort; the CSV above is the primary output.
            print(f"(Note) Parquet save skipped/failed for {src_name}: {e}")


def process_one_daily_csv(csv_path: Path, out_csv_path: Path, month_cache: dict) -> None:
    """
    Augment one daily CSV with ERA5 variables sampled at each row's (sp_lon, sp_lat).

    Steps:
    - Skip when SKIP_EXISTING is set and out_csv_path already exists.
    - Determine the day from the filename; fall back to the mode of the 'date'
      column. A missing 'date' column is created from the filename date.
    - Open the ERA5 month for that day (cached in month_cache under (year, month)).
    - Select that day from the month (nearest available time if no exact match).
    - Sample every ERA5_VARS variable at rows whose latitude lies inside the ERA5
      latitude range; other rows, and variables absent from the month, get NaN.
    - Write the result to out_csv_path (atomically) and optionally to Parquet.

    Parameters
    ----------
    csv_path : Path
        Input daily CSV; must contain 'sp_lon' and 'sp_lat'.
    out_csv_path : Path
        Destination CSV path.
    month_cache : dict
        Mutable {(year, month): xr.Dataset} cache shared across calls.

    Returns
    -------
    None. Every skip condition prints a message and returns without writing.
    """
    # Skip if exists
    if SKIP_EXISTING and out_csv_path.exists():
        print(f"⏩ Skip (exists): {_path_for_log(out_csv_path)}")
        return

    # Determine date
    day_ts = parse_date_from_filename(csv_path.name)

    # Load CSV
    df = pd.read_csv(csv_path)
    if "sp_lon" not in df.columns or "sp_lat" not in df.columns:
        print(f"WARNING: Missing 'sp_lon'/'sp_lat' in {csv_path.name}; skipping.")
        return

    if "date" in df.columns:
        # Align to day if provided
        df["date"] = pd.to_datetime(df["date"], errors="coerce").dt.floor("D")
        # If filename date is missing/ambiguous, use column's mode
        if day_ts is None:
            s = df["date"].dropna()
            if not s.empty:
                day_ts = pd.to_datetime(s.mode(dropna=True).iloc[0])
    else:
        # Create 'date' from filename date if possible
        if day_ts is not None:
            df["date"] = pd.Timestamp(day_ts).floor("D")
        else:
            print(f"⚠️  Cannot infer date for {csv_path.name}; add 'date' col or fix filename. Skipping.")
            return

    # Ensure a valid day
    if day_ts is None or pd.isna(day_ts):
        # fallback to mode of 'date' col
        s = pd.to_datetime(df["date"], errors="coerce").dropna()
        if s.empty:
            print(f"⚠️  No valid date in {csv_path.name}; skipping.")
            return
        day_ts = pd.to_datetime(s.mode(dropna=True).iloc[0]).floor("D")

    y, m = day_ts.year, day_ts.month
    era5_month_dir = UNZIP_BASE / f"ERA5Land_{y:04d}_{m:02d}"
    key = (y, m)

    # STRICT: ERA5 month must exist
    if STRICT_ERA5_ALIGNMENT and not era5_month_dir.exists():
        print(f"⏭️  Skip {csv_path.name}: missing ERA5 month → {y:04d}-{m:02d}")
        return

    # Open/cache ERA5 for the month
    if key not in month_cache:
        try:
            month_cache[key] = open_and_merge_month(era5_month_dir)
        except Exception as e:
            print(f"⏭️  Skip {csv_path.name}: cannot open ERA5 month {y:04d}-{m:02d} → {e}")
            return

    ds_month = month_cache[key]

    # Slice exact day; fallback nearest within month
    try:
        ds_day = ds_month.sel(time=pd.Timestamp(day_ts))
    except KeyError:
        try:
            ds_day = ds_month.sel(time=pd.Timestamp(day_ts), method="nearest")
        except Exception:
            print(f"WARNING: No ERA5 'time' for {day_ts.date()} in ERA5Land_{y:04d}_{m:02d} for {csv_path.name}")
            return

    # Point coordinates, with longitudes converted to the grid's convention
    lons = ensure_lon_convention(df["sp_lon"].to_numpy(float), ds_day.longitude.values)
    lats = df["sp_lat"].to_numpy(float)

    # Only rows inside the ERA5 latitude range are sampled (NaN latitudes compare False)
    ds_lats = ds_day.latitude.values
    lat_lo, lat_hi = float(ds_lats.min()), float(ds_lats.max())
    in_lat = (lats >= lat_lo) & (lats <= lat_hi)
    valid_pos = np.where(in_lat)[0]

    # With no in-range rows the sampler is not called at all; every column stays NaN.
    vals = {}
    if valid_pos.size:
        vals = extract_for_points_on_date(ds_day, lons[valid_pos], lats[valid_pos], sampling=SAMPLING)

    # Attach one column per requested variable
    df_out = df.copy()
    for v in ERA5_VARS:
        col = np.full(len(df), np.nan, dtype=float)
        if v in vals:
            col[valid_pos] = vals[v]
        df_out[v] = col

    saved_label = "Saved" if valid_pos.size else "Saved (no in-range lats)"
    _save_augmented(df_out, out_csv_path, csv_path.name, saved_label)

# ================== MAIN SCAN ==================
def main():
    """
    Walk every month-like subfolder of INPUT_BASE and augment its daily CSVs.

    Each 'YYYYMMDD.csv' in a month folder is passed to process_one_daily_csv with
    an output path of OUTPUT_BASE/<same folder name>/<same file name>.

    Exits with status 1 (message on stderr) when INPUT_BASE does not exist; returns
    early with a message when no month-like subfolder is found.

    The ERA5 month cache is trimmed whenever a folder for a different month starts
    and is closed and emptied when the scan ends, including when it ends with an error.
    """
    if not INPUT_BASE.exists():
        print(f"ERROR: Input base not found: {INPUT_BASE}", file=sys.stderr)
        sys.exit(1)

    # Parse each folder name once and keep the (folder, (year, month)) pairs.
    month_dirs = []
    for d in sorted(INPUT_BASE.iterdir()):
        if not d.is_dir():
            continue
        y_m = month_dir_year_month(d.name)
        if y_m is not None:
            month_dirs.append((d, y_m))
    if not month_dirs:
        print(f"No month-like subfolders found under: {INPUT_BASE}")
        return

    # Shared ERA5 month cache
    month_cache: dict[tuple[int, int], xr.Dataset] = {}

    print("Scanning month folders:")
    try:
        for mdir, y_m in month_dirs:
            # Collect daily CSVs
            daily_files = [p for p in sorted(mdir.glob("*.csv")) if DAILY_FILE_REGEX.search(p.name)]
            if not daily_files:
                # Name the folder; otherwise the message cannot be attributed to anything.
                print(f"   {mdir.name}: (no daily CSVs like YYYYMMDD.csv)")
                continue

            # Folders are visited in sorted order, so months other than this
            # folder's are unlikely to be needed again: close and drop them.
            for stale_key in [k for k in month_cache if k != y_m]:
                month_cache.pop(stale_key).close()

            # Output month folder mirrors input name
            out_month_dir = OUTPUT_BASE / mdir.name

            for csv_path in daily_files:
                # Out file uses same filename in mirrored folder
                out_csv_path = out_month_dir / csv_path.name
                process_one_daily_csv(csv_path, out_csv_path, month_cache)
    finally:
        # Do not leave ERA5 datasets referenced once the scan is over.
        for cached in month_cache.values():
            cached.close()
        month_cache.clear()

    print("Done.")

# Run the batch only when this code is executed directly (its module name is "__main__").
if __name__ == "__main__":
    main()


Scanning month folders:
⏩ Skip (exists): 2018_08_v2/20180801.csv
⏩ Skip (exists): 2018_08_v2/20180802.csv
⏩ Skip (exists): 2018_08_v2/20180803.csv
⏩ Skip (exists): 2018_08_v2/20180804.csv
⏩ Skip (exists): 2018_08_v2/20180805.csv
⏩ Skip (exists): 2018_08_v2/20180806.csv
⏩ Skip (exists): 2018_08_v2/20180807.csv
⏩ Skip (exists): 2018_08_v2/20180808.csv
⏩ Skip (exists): 2018_08_v2/20180809.csv
⏩ Skip (exists): 2018_08_v2/20180810.csv
⏩ Skip (exists): 2018_08_v2/20180811.csv
⏩ Skip (exists): 2018_08_v2/20180812.csv
⏩ Skip (exists): 2018_08_v2/20180813.csv
⏩ Skip (exists): 2018_08_v2/20180814.csv
⏩ Skip (exists): 2018_08_v2/20180815.csv
⏩ Skip (exists): 2018_08_v2/20180816.csv
⏩ Skip (exists): 2018_08_v2/20180817.csv
⏩ Skip (exists): 2018_08_v2/20180818.csv
⏩ Skip (exists): 2018_08_v2/20180819.csv
⏩ Skip (exists): 2018_08_v2/20180820.csv
⏩ Skip (exists): 2018_08_v2/20180821.csv
⏩ Skip (exists): 2018_08_v2/20180822.csv
⏩ Skip (exists): 2018_08_v2/20180823.csv
⏩ Skip (exists): 2018_08_v2/20180

  import pynvml


✅ Saved → 2024_07_v2/20240721.csv
✅ Saved → 2024_07_v2/20240722.csv
✅ Saved → 2024_07_v2/20240723.csv
✅ Saved → 2024_07_v2/20240724.csv
✅ Saved → 2024_07_v2/20240725.csv
✅ Saved → 2024_07_v2/20240726.csv
✅ Saved → 2024_07_v2/20240727.csv
✅ Saved → 2024_07_v2/20240728.csv
✅ Saved → 2024_07_v2/20240729.csv
✅ Saved → 2024_07_v2/20240730.csv
✅ Saved → 2024_07_v2/20240731.csv
⏩ Skip (exists): 2024_08_v2/20240801.csv
⏩ Skip (exists): 2024_08_v2/20240802.csv
⏩ Skip (exists): 2024_08_v2/20240803.csv
⏩ Skip (exists): 2024_08_v2/20240804.csv
⏩ Skip (exists): 2024_08_v2/20240805.csv
⏩ Skip (exists): 2024_08_v2/20240806.csv
⏩ Skip (exists): 2024_08_v2/20240807.csv
⏩ Skip (exists): 2024_08_v2/20240808.csv
⏩ Skip (exists): 2024_08_v2/20240809.csv
⏩ Skip (exists): 2024_08_v2/20240810.csv
⏩ Skip (exists): 2024_08_v2/20240811.csv
⏩ Skip (exists): 2024_08_v2/20240812.csv
⏩ Skip (exists): 2024_08_v2/20240813.csv
⏩ Skip (exists): 2024_08_v2/20240814.csv
⏩ Skip (exists): 2024_08_v2/20240815.csv
⏩ Skip (exi