# Test Merge Forecasts Notebook

Dit Jupyter Notebook laadt en merget forecast-bestanden. De eerste cel slaat niets op; de tweede cel schrijft per (year, time) één NetCDF-bestand weg. Alle stappen worden gelogd in de console.

In [None]:
#!/usr/bin/env python3
import logging
import glob
import xarray as xr
import sys
import os

# --- Configuration ---
# Maximum number of files taken from each input directory (passed to find_files).
MAX_FILES   = 10
# First input directory that is searched for output.sfc.*.nc files.
PATH1       = "/home/jupyter-ayoub/data/EUPP"
# Second input directory that is searched for output.sfc.*.nc files.
PATH2       = "/home/jupyter-aaron/Postprocessing/PP_EUPP/data/EUPP"
# Output directory and store name; only referenced by the commented-out
# Zarr export at the end of main().
OUTPUT_DIR  = "/home/jupyter-ayoub/data/test"
OUTPUT_STORE= "combined_forecasts.zarr"

def setup_logging():
    """Route all root-logger output to STDOUT at DEBUG level.

    Any handlers already attached to the root logger are removed first, so
    calling this repeatedly (e.g. re-running a notebook cell) does not
    produce duplicate log lines.
    """
    formatter = logging.Formatter("%(asctime)s | %(levelname)-8s | %(message)s")
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(formatter)

    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    # Drop previously installed handlers before attaching the new one.
    root.handlers.clear()
    root.addHandler(stdout_handler)

def find_files(path, limit=None):
    """Return the sorted ``output.sfc.*.nc`` files found directly in `path`.

    Args:
        path: Directory to search (not recursive).
        limit: Optional slice bound; when truthy only ``files[:limit]`` is
            kept. ``None`` and ``0`` both mean "no limit".

    Returns:
        List of matching paths in lexicographic order. Every returned path
        is also logged at DEBUG level.
    """
    matches = glob.glob(os.path.join(path, "output.sfc.*.nc"))
    matches.sort()
    logging.info(f"Gevonden {len(matches)} bestanden in {path}")

    if limit:
        matches = matches[:limit]
        logging.info(f"  → Beperk tot eerste {limit} bestanden")

    for match in matches:
        logging.debug(f"    • {match}")
    return matches

def preprocess(ds):
    """Return `ds` without its 'valid_time' variable to avoid a MergeError.

    Datasets that have no 'valid_time' are accepted as well, because missing
    names are ignored rather than raised.
    """
    without_valid_time = ds.drop_vars("valid_time", errors="ignore")
    return without_valid_time

def open_group(files, name):
    """Open a set of NetCDF files as one Dataset, concatenated along 'time'.

    Args:
        files: List of NetCDF paths, concatenated in the given order.
        name: Label used only in the log messages.

    Returns:
        The combined dataset returned by ``xr.open_mfdataset``.
    """
    logging.info(f"Open groep '{name}' met {len(files)} bestanden …")

    # "minimal"/"override" keep xarray from concatenating or comparing
    # variables that do not carry the concat dimension (see the
    # open_mfdataset documentation for the exact semantics).
    mf_options = dict(
        combine="nested",
        concat_dim="time",
        preprocess=preprocess,
        parallel=False,
        data_vars="minimal",
        coords="minimal",
        compat="override",
    )
    combined = xr.open_mfdataset(files, **mf_options)

    logging.info(f"  → '{name}' dims: {combined.dims}")
    logging.info(f"  → '{name}' vars: {list(combined.data_vars.keys())}")
    return combined

def main():
    """Find, open and merge both forecast groups, then print every variable.

    Steps:
      1. Collect up to MAX_FILES files from PATH1 and PATH2.
      2. Open every non-empty group as a Dataset.
      3. Merge the opened datasets.
      4. Print one single-variable Dataset per data variable.

    Exits with status 1 when neither path contains any matching file.
    """
    setup_logging()
    logging.info("=== Merge & inspect all variables ===")

    # 1) Locate the input files
    files1 = find_files(PATH1, limit=MAX_FILES)
    files2 = find_files(PATH2, limit=MAX_FILES)
    if not (files1 or files2):
        logging.error("Geen bestanden gevonden in beide paden!")
        sys.exit(1)

    # 2) Open the groups. A group without files is skipped, because
    #    open_mfdataset cannot open an empty file list.
    datasets = []
    for files, name in ((files1, "EUPP-ayoub"), (files2, "EUPP-aaron")):
        if not files:
            logging.warning(f"Groep '{name}' heeft geen bestanden en wordt overgeslagen")
            continue
        datasets.append(open_group(files, name=name))

    # 3) Merge the opened datasets (not the lists of file paths)
    logging.info("Start merge van beide groepen…")
    ds_all = xr.merge(datasets)
    logging.info("Merge compleet!")
    logging.info(f"  dims: {ds_all.dims}")
    logging.info(f"  vars ({len(ds_all.data_vars)}): {list(ds_all.data_vars.keys())}")

    # 4) Print a separate Dataset for every variable
    for var in ds_all.data_vars:
        print("\n" + "="*80)
        print(f"Variable: {var}")
        print("="*80)
        # ds_all[[var]] yields a Dataset holding exactly that one variable
        print(ds_all[[var]])

    # 5) (Optional) write to Zarr
    #os.makedirs(OUTPUT_DIR, exist_ok=True)
    #output_path = os.path.join(OUTPUT_DIR, OUTPUT_STORE)
    #logging.info(f"Schrijf merged dataset naar Zarr: {output_path}")
    # ds_all.to_zarr(output_path, mode="w")
    #logging.info("Klaar – Zarr-store geschreven.")

# Run the workflow only when this code is executed directly (script or
# notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    main()


In [2]:
#!/usr/bin/env python3
import logging
import glob
import xarray as xr
import pandas as pd
import sys
import os
from collections import OrderedDict

# --- Configuration ---
# Maximum number of files taken from each input directory (see open_group).
MAX_FILES    = 10
# First input directory that is searched for output.sfc.*.nc files.
PATH1        = "/home/jupyter-ayoub/data/EUPP"
# Second input directory that is searched for output.sfc.*.nc files.
PATH2        = "/home/jupyter-aaron/Postprocessing/PP_EUPP/data/EUPP"
# Root of all generated output.
OUTPUT_ROOT  = "/home/jupyter-ayoub/data/test"
# Directory that receives the per-(year, time) NetCDF files.
FORECAST_DIR = os.path.join(OUTPUT_ROOT, "forecasts")

def setup_logging():
    """Send root-logger output to STDOUT at DEBUG level.

    Existing root handlers are discarded first, so re-running the cell does
    not duplicate log lines.
    """
    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setFormatter(
        logging.Formatter("%(asctime)s | %(levelname)-8s | %(message)s")
    )

    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
    # Start from a clean handler list before attaching the STDOUT handler.
    root_logger.handlers.clear()
    root_logger.addHandler(stream_handler)

def find_files(path, limit=None):
    """Return the sorted ``output.sfc.*.nc`` files found directly in `path`.

    Args:
        path: Directory to search (not recursive).
        limit: Optional slice bound; when truthy only ``files[:limit]`` is
            kept. ``None`` and ``0`` both mean "no limit".

    Returns:
        List of matching paths in lexicographic order.
    """
    found = glob.glob(os.path.join(path, "output.sfc.*.nc"))
    found.sort()
    logging.info(f"Gevonden {len(found)} bestanden in {path}")

    if not limit:
        return found

    logging.info(f"  → Gebruik eerste {limit}")
    return found[:limit]

def build_nested_file_list(files):
    """
    Group files by year and build a nested list with one entry per year.

    The year is the third dot-separated field of the base name, i.e.
    ``output.sfc.<year>.<date>.nc``.

    Args:
        files: Iterable of file paths; may be in any order.

    Returns:
        ``[[f_yr0_time0, f_yr0_time1, …], [f_yr1_time0, …], …]`` with the
        outer list ordered by ascending year and every inner list sorted
        lexicographically. An empty input yields an empty list.

    Raises:
        IndexError: A base name has fewer than three dot-separated fields.
        ValueError: The year field is not an integer.
    """
    groups = {}
    for f in files:
        year = int(os.path.basename(f).split('.')[2])
        groups.setdefault(year, []).append(f)
    # Sort inside every year as well, so the result does not depend on the
    # order in which the caller supplied the files.
    nested = [sorted(groups[y]) for y in sorted(groups)]
    logging.info(f"  → Gemaakt {len(nested)} jaar-groepen voor nested combine")
    return nested

def preprocess(ds):
    """Return `ds` with the conflicting 'valid_time' variable removed.

    A dataset that has no 'valid_time' is accepted too, because missing
    names are ignored rather than raised.
    """
    cleaned = ds.drop_vars("valid_time", errors="ignore")
    return cleaned

def open_group(path, name):
    """
    Open the forecast files in `path` as one Dataset with year and time dims.

    At most MAX_FILES files are used. They are grouped per year and combined
    with a nested concatenation: files within a year along 'time', the years
    along 'year'.

    Args:
        path: Directory holding the ``output.sfc.<year>.<date>.nc`` files.
        name: Label used in log and error messages.

    Returns:
        The combined dataset, with the known dimensions put in the preferred
        order and any other dimension kept after them.

    Raises:
        FileNotFoundError: No matching file exists in `path`.
    """
    files = find_files(path, limit=MAX_FILES)
    if not files:
        # Fail early with a message that names the directory.
        raise FileNotFoundError(
            f"Geen bestanden gevonden voor groep '{name}' in {path}"
        )
    nested = build_nested_file_list(files)
    logging.info(f"Nested combine voor groep '{name}' met dims ['year','time'] …")
    ds = xr.open_mfdataset(
        nested,
        combine="nested",
        concat_dim=["year", "time"],
        preprocess=preprocess,
        parallel=False,
        data_vars="minimal",
        coords="minimal",
        compat="override"
    )
    # Put the dimensions in the preferred order.
    desired = ["time","number","year","step","surface","latitude","longitude"]
    ordered = [d for d in desired if d in ds.dims]
    # The trailing Ellipsis keeps every dimension that is not listed in
    # `desired`; without it transpose rejects an incomplete dimension list.
    ds = ds.transpose(*ordered, ...)
    logging.info(f"  → '{name}' dims: {ds.dims}")
    logging.info(f"  → '{name}' vars: {list(ds.data_vars)}")
    return ds

def merge_groups(ds1, ds2):
    """Merge two datasets into one and log the resulting dims and variables.

    The merge uses ``compat="override"``; see the xarray.merge documentation
    for how conflicting variables are resolved in that mode.

    Args:
        ds1: First dataset.
        ds2: Second dataset.

    Returns:
        The merged dataset.
    """
    logging.info("Mergen van beide groepen…")
    merged = xr.merge([ds1, ds2], compat="override")

    merged_vars = list(merged.data_vars)
    logging.info(f"  → merged dims: {merged.dims}")
    logging.info(f"  → merged vars: {merged_vars}")
    return merged

def save_per_timestep(ds, out_dir, prefix="output.sfc"):
    """Write exactly one NetCDF file per (year, time) combination.

    Files are named ``<prefix>.<year index>.<YYYYMMDD>.nc``, where the year
    part is the position along the 'year' dimension and the date comes from
    the matching entry of ``ds.time``.

    Args:
        ds: Dataset with 'year' and 'time' dimensions.
        out_dir: Target directory; created when it does not exist.
        prefix: Leading part of every file name.
    """
    # NOTE(review): the file name carries the positional year index, not a
    # year label — confirm that this matches the naming of the input files.
    os.makedirs(out_dir, exist_ok=True)
    for year_idx in range(ds.sizes["year"]):
        for time_idx in range(ds.sizes["time"]):
            timestamp = pd.to_datetime(ds.time.values[time_idx])
            date_tag = timestamp.strftime("%Y%m%d")
            target = os.path.join(out_dir, f"{prefix}.{year_idx}.{date_tag}.nc")
            selection = ds.isel(year=year_idx, time=time_idx)
            selection.to_netcdf(target)
            logging.debug(f"Saved {target}")

def main():
    """Open both forecast groups, merge them and export one file per step.

    The merged dataset is written to FORECAST_DIR as one NetCDF file per
    (year, time) combination.
    """
    setup_logging()
    logging.info("=== Merge + export met volledige year+time dims ===")

    # Open the groups in a fixed order: PATH1 first, then PATH2.
    sources = ((PATH1, "EUPP-ayoub"), (PATH2, "EUPP-aaron"))
    opened = [open_group(src_path, label) for src_path, label in sources]

    merged = merge_groups(opened[0], opened[1])

    logging.info(f"Schrijven per (year,time) naar {FORECAST_DIR} …")
    save_per_timestep(merged, FORECAST_DIR)

    logging.info("Klaar – alle bestanden zijn weggeschreven.")

# Run the workflow only when this code is executed directly (script or
# notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    main()


2025-04-24 15:27:47,149 | INFO     | === Merge + export met volledige year+time dims ===
2025-04-24 15:27:47,164 | INFO     | Gevonden 4180 bestanden in /home/jupyter-ayoub/data/EUPP
2025-04-24 15:27:47,164 | INFO     |   → Gebruik eerste 10
2025-04-24 15:27:47,165 | INFO     |   → Gemaakt 1 jaar-groepen voor nested combine
2025-04-24 15:27:47,165 | INFO     | Nested combine voor groep 'EUPP-ayoub' met dims ['year','time'] …
2025-04-24 15:27:47,383 | INFO     |   → 'EUPP-ayoub' vars: ['q', 'tp6', 'ssr6', 'str6', 'ssrd6', 'strd6', 'ssrd6_obs']
2025-04-24 15:27:47,397 | INFO     | Gevonden 4180 bestanden in /home/jupyter-aaron/Postprocessing/PP_EUPP/data/EUPP
2025-04-24 15:27:47,397 | INFO     |   → Gebruik eerste 10
2025-04-24 15:27:47,398 | INFO     |   → Gemaakt 1 jaar-groepen voor nested combine
2025-04-24 15:27:47,398 | INFO     | Nested combine voor groep 'EUPP-aaron' met dims ['year','time'] …
2025-04-24 15:27:47,783 | INFO     |   → 'EUPP-aaron' vars: ['t2m', 'z', 't', 'u10', 'v1