# ERA5-Land Forcing for Wflow SBM: An Expert Workflow

**Objective:** Acquire, validate, and process high-resolution climate reanalysis data to drive a physically-based hydrological model (Wflow SBM) for the Upper Niger Basin.

## Scientific Rationale

To accurately simulate discharge in the Upper Niger (a complex monsoon-driven system), we require forcing data that represents:

1.  **Spatial Heterogeneity:** Convective rainfall systems in West Africa are localized. ERA5-Land's **9km resolution** captures this better than global ERA5 (31km).
2.  **Physical Consistency:** We use ERA5-Land because its land surface model (HTESSEL) closes the water and energy balance at the surface, so its **Evaporation** estimates are physically consistent with the **Precipitation** that forces the model.
3.  **Temporal Dynamics:** The **hourly** resolution allows us to capture diurnal cycles (e.g., peak afternoon convection). The hourly fields are then aggregated to daily values for the Wflow SBM timestep.

## Methodology

1.  **Acquisition:** Parallel retrieval from the Copernicus Climate Data Store (CDS) with optimized unarchived requests.
2.  **Processing:** Physical unit conversion (SI $\to$ Hydrological) and temporal aggregation (Flux summation, State averaging).
3.  **Validation:** Statistical checks and interactive visualization to ensure data integrity before modeling.

---

**Author:** Expert Hydrologist & Data Scientist  
**Target:** Wflow SBM / HydroMT Integration

# 1. Environment Setup

Ensuring reproducible execution with minimal dependencies.

In [None]:
import sys
import os
from pathlib import Path
import warnings

# Core Data Science
import numpy as np
import pandas as pd
import xarray as xr

# Visualization
import folium
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import matplotlib.pyplot as plt

# Data Acquisition
import cdsapi
import zipfile
from concurrent.futures import ThreadPoolExecutor, as_completed

# Configuration
warnings.filterwarnings('ignore')  # Suppress generic warnings for clean output

print(f"Python {sys.version.split()[0]}")
print(f"Xarray {xr.__version__} | Pandas {pd.__version__}")

# 2. Configuration & Study Area

Defining the spatiotemporal domain. The **Upper Niger Basin** is defined by a bounding box capturing the headwaters in Guinea/Mali.

**Note:** We use `grid=[0.1, 0.1]` to explicitly request the native 9km resolution, avoiding server-side interpolation artifacts.

In [None]:
# ==========================================
# EXPERT CONFIGURATION
# ==========================================

# 1. Spatiotemporal Domain
#    Upper Niger Basin [North, West, South, East]
AREA = [12.5, -10.0, 10.0, -7.0]
YEARS = [2019, 2020]

# 2. Variable Selection (Wflow SBM Requirements)
#    - 2m_temperature: Controls snowmelt (if any) and ET processes (State)
#    - total_precipitation: Mass input to the catchment (Flux)
#    - potential_evaporation: Driving force for actual ET (Flux)
#    - volumetric_soil_water_layer_1: Initial conditions / Antecedent moisture (State)
VARIABLES = [
    "2m_temperature",
    "total_precipitation",
    "potential_evaporation",
    "volumetric_soil_water_layer_1"
]

# 3. Output Strategy
OUTPUT_DIR = Path("era5_niger")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
GRID = [0.1, 0.1]  # Native ERA5-Land resolution
MAX_WORKERS = 3    # Respect CDS fair-usage limits

# 4. Auth Check
RC_PATH = Path.home() / ".cdsapirc"
if not RC_PATH.exists() and Path(".cdsapirc").exists():
    RC_PATH = Path(".cdsapirc")
    os.environ["CDSAPI_RC"] = str(RC_PATH.resolve())

print(f"Target: {OUTPUT_DIR.resolve()}")
print(f"Domain: {AREA} @ {GRID[0]}° resolution")
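
As a quick sanity check on the configuration above, the sketch below estimates how many grid cells and hourly time steps a single monthly request covers. The helper names `grid_shape` and `hours_in_month` are illustrative and not part of the original notebook, and the cell count assumes that the bounding-box edges fall exactly on 0.1° grid nodes, which the CDS may not guarantee.

```python
import calendar

# Values copied from the configuration cell
AREA = [12.5, -10.0, 10.0, -7.0]   # [North, West, South, East]
GRID = [0.1, 0.1]                  # same spacing in both directions


def grid_shape(area, step):
    """Return (n_lat, n_lon), assuming the box edges lie on grid nodes."""
    north, west, south, east = area
    n_lat = round((north - south) / step) + 1
    n_lon = round((east - west) / step) + 1
    return n_lat, n_lon


def hours_in_month(year, month):
    """Number of hourly time steps in a calendar month."""
    return calendar.monthrange(year, month)[1] * 24


n_lat, n_lon = grid_shape(AREA, GRID[0])
print(f"Grid: {n_lat} x {n_lon} = {n_lat * n_lon} cells")
print(f"Feb 2020: {hours_in_month(2020, 2)} hourly steps per variable")
```

Comparing these numbers with `ds.sizes` after loading is a cheap way to spot a truncated download or an unexpected regridding.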

# 3. Domain Visualization

Verifying the bounding box against the physical geography of the Niger River.

In [None]:
center_lat = (AREA[0] + AREA[2]) / 2
center_lon = (AREA[1] + AREA[3]) / 2

m = folium.Map(location=[center_lat, center_lon], zoom_start=7, tiles="CartoDB positron")

# Add bounding box (folium expects [[south, west], [north, east]])
folium.Rectangle(
    bounds=[[AREA[2], AREA[1]], [AREA[0], AREA[3]]],
    color="#2563eb",
    weight=3,
    fill=True,
    fill_opacity=0.15,
    popup="<b>ERA5-Land Request Domain</b><br>Upper Niger Basin"
).add_to(m)

# Add context marker
folium.Marker(
    [center_lat, center_lon],
    popup="Basin Center",
    icon=folium.Icon(color="blue", icon="info-sign")
).add_to(m)

m

# 4. High-Performance Data Acquisition

**Engineering Insight:**

- We use `download_format: 'unarchived'` to request raw NetCDF files directly, bypassing the time-consuming server-side ZIP compression where possible.
- A `ThreadPoolExecutor` manages parallel requests, keeping up to `MAX_WORKERS` requests in flight at once while staying within CDS usage limits.
- Robust ZIP handling is included as a fallback, as CDS may enforce archiving for larger requests.

In [None]:
client = cdsapi.Client()

def download_month(year: int, month: int) -> Path:
    """
    Robust download function with 'unarchived' optimization and ZIP fallback.
    Returns path to the valid NetCDF file.
    """
    target = OUTPUT_DIR / f"era5_{year}_{month:02d}.nc"
    if target.exists():
        return target  # Idempotency: Skip if exists

    # Temp path for download
    temp = OUTPUT_DIR / f"temp_{year}_{month:02d}.download"

    try:
        client.retrieve(
            "reanalysis-era5-land",
            {
                "variable": VARIABLES,
                "year": str(year),
                "month": f"{month:02d}",
                "day": [f"{d:02d}" for d in range(1, 32)],
                "time": [f"{h:02d}:00" for h in range(24)],
                "area": AREA,
                "grid": GRID,
                "format": "netcdf",
                "download_format": "unarchived"  # Optimization
            },
            str(temp)
        )

        # Check if CDS returned a ZIP (despite our request) or a NetCDF
        if zipfile.is_zipfile(temp):
            with zipfile.ZipFile(temp, "r") as z:
                # Find the .nc file inside
                nc_name = [n for n in z.namelist() if n.endswith('.nc')][0]
                z.extract(nc_name, OUTPUT_DIR)
                (OUTPUT_DIR / nc_name).rename(target)
            temp.unlink()
        else:
            # It's already a NetCDF
            temp.rename(target)

        print(f"✓ {year}-{month:02d}: Success")
        return target

    except Exception as e:
        print(f"✗ {year}-{month:02d}: Failed ({e})")
        if temp.exists():
            temp.unlink()
        raise

# Parallel Execution
# ------------------
RUN_DOWNLOAD = True

if RUN_DOWNLOAD:
    print(f"Starting parallel download with {MAX_WORKERS} workers...")
    tasks = [(y, m) for y in YEARS for m in range(1, 13)]

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = {executor.submit(download_month, y, m): (y, m) for y, m in tasks}

        for future in as_completed(futures):
            try:
                future.result()
            except Exception:
                pass  # Error logged in function

    print(f"\nPipeline complete. Files: {len(list(OUTPUT_DIR.glob('era5_*.nc')))}")
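
The download loop above logs a failed month and moves on, so a transient CDS error leaves a gap until the cell is re-run. One possible hardening, not part of the original notebook, is to wrap each call in a retry with exponential backoff. In the sketch below, `with_retries` and `flaky_download` are hypothetical names, and the failing function only simulates a CDS error so that the example runs without network access.

```python
import time


def with_retries(func, *args, attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call func(*args); after a failure wait base_delay * 2**n seconds and retry.

    Re-raises the last exception once `attempts` calls have failed.
    """
    for n in range(attempts):
        try:
            return func(*args)
        except Exception:
            if n == attempts - 1:
                raise
            sleep(base_delay * 2 ** n)


# Stand-in for download_month: fails twice, then succeeds (simulated behaviour)
calls = []

def flaky_download(year, month):
    calls.append((year, month))
    if len(calls) < 3:
        raise ConnectionError("simulated transient CDS error")
    return f"era5_{year}_{month:02d}.nc"


waits = []  # record the delays instead of actually sleeping
result = with_retries(flaky_download, 2019, 1, sleep=waits.append)
print(result, waits)
```

In the notebook this could be used as `executor.submit(with_retries, download_month, y, m)`; sensible values for `attempts` and `base_delay` depend on CDS queue times and are left as assumptions here.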

# 5. Data Loading & Physical Inspection

Using `xarray` with Dask to lazily load the multi-file dataset. This handles memory efficiently.

In [None]:
files = sorted(OUTPUT_DIR.glob("era5_????_??.nc"))
if not files:
    raise FileNotFoundError("No NetCDF files found. Check download step.")

# Combine by coordinates (time)
ds = xr.open_mfdataset(
    files,
    combine="by_coords",
    parallel=True,
    chunks={"time": "auto"}  # Dask chunking
).sortby("time")

# Quick Inspection
print(f"Dimensions: {ds.sizes}")
print(f"Variables: {list(ds.data_vars)}")
ds.head()
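
Because failed months are skipped silently during download, it can help to confirm that the loaded time axis has no holes before any aggregation. The following is a minimal sketch of such a check using only the standard library; `find_missing_hours` is a hypothetical helper and the toy axis is made up for illustration.

```python
from datetime import datetime, timedelta


def find_missing_hours(timestamps, start, end):
    """Return the hourly stamps in [start, end] that are absent from `timestamps`."""
    present = set(timestamps)
    missing = []
    current = start
    while current <= end:
        if current not in present:
            missing.append(current)
        current += timedelta(hours=1)
    return missing


# Toy axis: one full day with 05:00 and 17:00 removed
start = datetime(2019, 1, 1, 0)
end = datetime(2019, 1, 1, 23)
stamps = [start + timedelta(hours=h) for h in range(24) if h not in (5, 17)]

gaps = find_missing_hours(stamps, start, end)
print([g.strftime("%H:%M") for g in gaps])
```

For the notebook's dataset, the stamps could be obtained with `pd.to_datetime(ds.time.values).to_pydatetime()`, assuming the time coordinate is named `time` as in the cells above.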

# 6. Physical Transformation & Harmonization

Hydrological models require specific units. We apply rigorous physical conversions:

| Variable | Raw Unit | Target Unit | Transformation Logic |
|----------|----------|-------------|----------------------|
| **Temp** | $K$ | $^\circ C$ | $T_c = T_k - 273.15$ (Standard) |
| **Precip** | $m$ | $mm$ | $P_{mm} = P_m \times 1000$ (Depth conversion) |
| **PET** | $m$ | $mm$ | $PET_{mm} = \lvert PET_m \rvert \times 1000$ (ERA5 uses negative for evaporation) |
| **Soil** | $m^3/m^3$ | $m^3/m^3$ | None (Ratio) |

In [None]:
ds_hydro = xr.Dataset()

# 1. Temperature (State)
ds_hydro["temp"] = ds["t2m"] - 273.15
ds_hydro["temp"].attrs = {"units": "degC", "long_name": "Air Temperature"}

# 2. Precipitation (Flux)
ds_hydro["precip"] = ds["tp"] * 1000.0
ds_hydro["precip"].attrs = {"units": "mm", "long_name": "Total Precipitation"}

# 3. Potential Evapotranspiration (Flux)
#    Note: 'pev' is potential evaporation. We use abs() because ERA5 treats it as a loss (negative).
ds_hydro["pet"] = abs(ds["pev"]) * 1000.0
ds_hydro["pet"].attrs = {"units": "mm", "long_name": "Potential Evapotranspiration"}

# 4. Soil Moisture (State)
ds_hydro["soil_moisture"] = ds["swvl1"]
ds_hydro["soil_moisture"].attrs = {"units": "m3/m3", "long_name": "Volumetric Soil Moisture"}

# Copy coords
ds_hydro = ds_hydro.assign_coords(ds.coords)
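
The `abs()` in the PET conversion deserves a second look. ECMWF's convention treats downward fluxes as positive, so `pev` is negative for evaporation but can be slightly positive when condensation is diagnosed, and `abs()` would count that condensation as evaporative demand. The sketch below compares the notebook's conversion with a sign-aware alternative on made-up sample values; `pet_mm_signed` is an illustrative assumption, not the author's method.

```python
def pet_mm_abs(pev_m):
    """Conversion used in the cell above: magnitude only, in mm."""
    return abs(pev_m) * 1000.0


def pet_mm_signed(pev_m):
    """Alternative (assumption): negate, then clip condensation (positive pev) to zero."""
    return max(-pev_m, 0.0) * 1000.0


# Made-up pev values in metres: two evaporation cases and one condensation case
samples_m = [-0.0005, -0.0001, 0.00002]

for value in samples_m:
    print(f"pev={value:+.5f} m  abs={pet_mm_abs(value):.3f} mm  signed={pet_mm_signed(value):.3f} mm")
```

The two functions agree for negative inputs and differ only for the positive sample, where the signed version returns zero. Whether that difference matters for the Upper Niger depends on how often positive `pev` values occur in the data, which is worth checking before choosing either form.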

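# 7. Temporal Aggregation (Hourly $\to$ Daily)

**Hydrological Correctness:**

- **Fluxes (Precip, PET):** Must be **SUMMED** to get total daily volume.
- **States (Temp, Soil):** Must be **AVERAGED** to get representative daily condition.

*Incorrect aggregation (e.g., averaging precip) is a common error that destroys water balance.*

**Caveat:** ERA5-Land stores `tp` and `pev` as accumulations since 00 UTC, not as hourly increments. The hourly values must be de-accumulated before a daily sum is meaningful; summing the raw fields overstates the daily total several times over.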

In [None]:
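def deaccumulate(acc):
    """Hourly increments from an ERA5-Land flux, which is accumulated since 00 UTC."""
    step = acc - acc.shift(time=1)
    # The 01 UTC value is already the first increment of its day
    inc = xr.where(acc.time.dt.hour == 1, acc, step)
    # Stamps mark the end of each hour: move them back one hour so the 00 UTC
    # value counts toward the day it closes, then drop the undefined first step.
    # The final day therefore misses its last hour unless the next 00 UTC is loaded.
    inc = inc.assign_coords(time=inc.time - np.timedelta64(1, "h"))
    return inc.isel(time=slice(1, None))

ds_daily = xr.Dataset()

# Aggregation Rules: sum de-accumulated fluxes, average states
ds_daily["precip"] = deaccumulate(ds_hydro["precip"]).resample(time="1D").sum()
ds_daily["pet"]    = deaccumulate(ds_hydro["pet"]).resample(time="1D").sum()
ds_daily["temp"]   = ds_hydro["temp"].resample(time="1D").mean()
ds_daily["soil_moisture"] = ds_hydro["soil_moisture"].resample(time="1D").mean()

# Update Metadata for Daily Timestep
ds_daily["precip"].attrs = {"units": "mm/day", "long_name": "Daily Total Precipitation"}
ds_daily["pet"].attrs    = {"units": "mm/day", "long_name": "Daily Total PET"}
ds_daily["temp"].attrs   = {"units": "degC",   "long_name": "Daily Mean Temperature"}
ds_daily["soil_moisture"].attrs = {"units": "m3/m3", "long_name": "Daily Mean Volumetric Soil Moisture"}

print(f"Daily dataset shape: {ds_daily.sizes}")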
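
ERA5-Land documents its flux variables as accumulations since 00 UTC, which changes what "summing to daily" means. The toy example below uses plain Python and a made-up day of steady rain to show how hourly increments are recovered from accumulated values and how far a naive sum of the raw values overshoots. `hourly_increments` is an illustrative helper that assumes hourly, gap-free, time-ordered input.

```python
from datetime import datetime, timedelta


def hourly_increments(stamps, accumulated):
    """Recover per-hour amounts from values accumulated since 00 UTC.

    The 01 UTC value is the first increment of its day. Every later stamp,
    including 00 UTC of the next day (which holds the full 24 h total),
    is differenced against its predecessor.
    """
    increments = []
    for i, (t, value) in enumerate(zip(stamps, accumulated)):
        if t.hour == 1:
            increments.append(value)
        elif i > 0:
            increments.append(value - accumulated[i - 1])
        else:
            increments.append(None)  # no predecessor: increment unknown
    return increments


# Steady rain of 0.5 mm per hour, stamped 01 UTC .. 00 UTC of the next day
day = datetime(2019, 7, 1)
stamps = [day + timedelta(hours=h) for h in range(1, 25)]
accumulated = [0.5 * h for h in range(1, 25)]  # 0.5, 1.0, ..., 12.0

inc = hourly_increments(stamps, accumulated)
true_total = sum(inc)
naive_total = sum(accumulated)
print(f"true daily total: {true_total} mm, naive sum of raw values: {naive_total} mm")
```

The true total equals the value stored at 00 UTC of the following day, which gives an easy cross-check on any de-accumulation applied to the real dataset.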

# 8. Interactive Exploration: The Hydro-Climate Pulse

Visualizing the spatially-averaged time series to validate the seasonal signal (West African Monsoon).

- **Expectation:** Sharp rainfall peaks Jun-Sep, correlated with temperature drops (cloud cover/evaporative cooling).

In [None]:
# Calculate Spatial Means for plotting
ts_precip = ds_daily["precip"].mean(dim=["latitude", "longitude"])
ts_temp   = ds_daily["temp"].mean(dim=["latitude", "longitude"])
ts_pet    = ds_daily["pet"].mean(dim=["latitude", "longitude"])

# Plotly Interactive Chart
fig = make_subplots(specs=[[{"secondary_y": True}]])

# Precip Bars (Blue)
fig.add_trace(
    go.Bar(x=ts_precip.time, y=ts_precip.values, name="Precipitation (mm)", marker_color="#3b82f6", opacity=0.6),
    secondary_y=False
)

# Temp Line (Red)
fig.add_trace(
    go.Scatter(x=ts_temp.time, y=ts_temp.values, name="Temperature (°C)", line=dict(color="#ef4444", width=2)),
    secondary_y=True
)

# PET Line (Orange - Dotted)
fig.add_trace(
    go.Scatter(x=ts_pet.time, y=ts_pet.values, name="PET (mm)", line=dict(color="#f97316", width=2, dash='dot')),
    secondary_y=False
)

fig.update_layout(
    title="<b>Upper Niger Hydro-Climatology</b><br><i>ERA5-Land Daily Forcing (Spatially Averaged)</i>",
    template="plotly_white",
    height=500,
    legend=dict(orientation="h", y=-0.15),
    hovermode="x unified"
)

fig.update_yaxes(title_text="Fluxes (mm/day)", secondary_y=False)
fig.update_yaxes(title_text="Temperature (°C)", secondary_y=True)

fig.show()
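
The Jun-Sep expectation can also be checked numerically rather than by eye. The sketch below computes the share of total precipitation that falls in the monsoon months; `monsoon_fraction` is a hypothetical helper and the synthetic year uses made-up rainfall rates purely to exercise it.

```python
from datetime import date, timedelta


def monsoon_fraction(dates, precip_mm, months=(6, 7, 8, 9)):
    """Share of total precipitation that falls in the given months."""
    total = sum(precip_mm)
    if total == 0:
        return 0.0
    wet = sum(p for d, p in zip(dates, precip_mm) if d.month in months)
    return wet / total


# Synthetic year: 8 mm/day in Jun-Sep, 1 mm/day otherwise (made-up numbers)
dates = [date(2019, 1, 1) + timedelta(days=i) for i in range(365)]
precip = [8.0 if d.month in (6, 7, 8, 9) else 1.0 for d in dates]

print(f"Jun-Sep share of annual precipitation: {monsoon_fraction(dates, precip):.2f}")
```

With the notebook's series, the inputs could be `pd.to_datetime(ts_precip.time.values)` and `ts_precip.values`. What share counts as plausible for the Upper Niger is a judgment to make against independent climatology, not something this sketch decides.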

# 9. Spatial Pattern Verification

Checking mean fields to ensure no artifacts (e.g., missing tiles, grid errors).

In [None]:
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
plt.suptitle("Mean Spatial Patterns (2019-2020)", fontsize=16)

# Precip
ds_daily["precip"].mean("time").plot(ax=axes[0,0], cmap="Blues", cbar_kwargs={'label': 'mm/day'})
axes[0,0].set_title("Mean Precipitation")

# Temp
ds_daily["temp"].mean("time").plot(ax=axes[0,1], cmap="RdYlBu_r", cbar_kwargs={'label': '°C'})
axes[0,1].set_title("Mean Temperature")

# PET
ds_daily["pet"].mean("time").plot(ax=axes[1,0], cmap="Oranges", cbar_kwargs={'label': 'mm/day'})
axes[1,0].set_title("Mean PET")

# Soil Moisture
ds_daily["soil_moisture"].mean("time").plot(ax=axes[1,1], cmap="YlGnBu", cbar_kwargs={'label': 'm³/m³'})
axes[1,1].set_title("Mean Soil Moisture")

plt.tight_layout()
plt.show()

# 10. Export for Modeling

Saving the validated, Wflow-ready dataset. We use **NetCDF4 with compression** to optimize storage.

In [None]:
# Global Metadata (CF-Conventions)
# Note: the CF attribute name is "Conventions" with a capital C
ds_daily.attrs = {
    "title": "ERA5-Land Forcing for Wflow SBM",
    "institution": "Expert Hydrology Lab",
    "source": "ECMWF ERA5-Land via Copernicus CDS",
    "history": "Processed via Python xarray pipeline",
    "Conventions": "CF-1.6"
}

# Output path
out_file = OUTPUT_DIR / "era5_niger_daily_wflow.nc"

# Encoding (Compression)
encoding = {var: {"zlib": True, "complevel": 4} for var in ds_daily.data_vars}

# Write
ds_daily.to_netcdf(out_file, encoding=encoding)

print(f"✓ Success! Exported to: {out_file}")
print(f"  Size: {out_file.stat().st_size / 1024 / 1024:.2f} MB")
print("\nNext Step: Configure 'hydromt_data.yml' to point to this file.")