<a href="https://colab.research.google.com/github/esb-index/Barka-AV/blob/main/Untitled0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# ==========================
# ERA5 ZIP -> MERGE -> PROCESS pipeline (Colab)
# ==========================
# Másold be egy Colab cellába és futtasd.
# Feltételezzük: a zip fájlok egy 'cds' alkönyvtárban vannak (pl. /content/cds)
# Fájlok nevei: dania1.zip ... dania4.zip, nemet1..4.zip, uk1..4.zip, tajvan1..4.zip, usa1..4.zip
!mkdir cds
!unzip cds_all.zip -d cds
!ls cds

# 0) TELEPÍTÉSEK
!pip install xarray netCDF4 cfgrib cftime dask[complete] fsspec aiohttp pandas numpy scipy tqdm

# 1) KÖNYVTÁRSTRUKTÚRA ÉS ZIP KIBONTÁSA
import os, glob, zipfile, shutil
from tqdm import tqdm

ROOT = "/content"               # Colab root - módosítsd ha Drive-ot használsz (pl. /content/drive/MyDrive/era5)
ZIPDIR = os.path.join(ROOT, "cds")
EXTRACT_DIR = os.path.join(ROOT, "era5_extracted")
os.makedirs(EXTRACT_DIR, exist_ok=True)

zip_files = sorted(glob.glob(os.path.join(ZIPDIR, "*.zip")))
print(f"Talált zip fájlok: {len(zip_files)}")

for zf in tqdm(zip_files, desc="Kibontás"):
    with zipfile.ZipFile(zf, 'r') as z:
        # Kibontás egy alfolderbe az eredeti zip név alapján
        base = os.path.splitext(os.path.basename(zf))[0]
        outdir = os.path.join(EXTRACT_DIR, base)
        os.makedirs(outdir, exist_ok=True)
        z.extractall(outdir)

print("Kibontás kész. Kimenet:", EXTRACT_DIR)

# 2) FILE FORMAT CHECK (GRIB vs NetCDF)
import pathlib

# Every path below EXTRACT_DIR (directories included), in sorted order.
all_files = sorted(glob.glob(os.path.join(EXTRACT_DIR, "**", "*"), recursive=True))
nc_files = [path for path in all_files if path.endswith(".nc")]
grib_files = [path for path in all_files if path.endswith((".grib", ".grb"))]

print(f"NetCDF fájlok: {len(nc_files)}, GRIB fájlok: {len(grib_files)}")

# 3) If there are GRIB files, convert them to NetCDF. The xarray + cfgrib route is used
# because it is simple and stays in Python; if cfgrib causes trouble, consider CDO instead.
import xarray as xr

CONVERTED_DIR = os.path.join(ROOT, "era5_nc")
os.makedirs(CONVERTED_DIR, exist_ok=True)

def grib_to_nc(inpath, outdir=CONVERTED_DIR):
    """Convert one GRIB file to NetCDF with the cfgrib engine.

    The output is named ``<parent folder>_<stem>.nc``. The parent folder is the zip
    the file was extracted from (e.g. ``dania1``). This has two effects:

    * identically named GRIB files coming from different zips no longer overwrite
      or shadow each other;
    * the region name stays in the converted path, which the path-based region
      labelling relies on.

    Args:
        inpath: path of the GRIB file.
        outdir: directory for the converted file.

    Returns:
        The NetCDF path, or None when the conversion failed. An existing output
        is reused without re-converting.
    """
    src = pathlib.Path(inpath)
    outpath = os.path.join(outdir, f"{src.parent.name}_{src.stem}.nc")
    if os.path.exists(outpath):
        return outpath
    # Write to a temporary name first: a failure half-way must not leave a truncated
    # .nc behind that the existence check above would treat as a finished conversion.
    tmppath = outpath + ".part"
    try:
        # `with` closes the GRIB handle even when to_netcdf raises.
        # Some GRIBs hold several message types per file; cfgrib may then refuse to
        # build a single dataset, which ends up in the except branch below.
        with xr.open_dataset(inpath, engine="cfgrib") as ds:
            ds.to_netcdf(tmppath)
    except Exception as e:  # best effort: report and continue with the other files
        print(f"cfgrib conversion failed for {inpath}: {e}")
        if os.path.exists(tmppath):
            os.remove(tmppath)
        return None
    os.replace(tmppath, outpath)
    return outpath

if grib_files:
    print("Konvertálás GRIB->NetCDF (cfgrib)...")
    for grib_path in tqdm(grib_files):
        grib_to_nc(grib_path)

# Refresh the NetCDF list: extracted files plus converted ones, de-duplicated and sorted.
extracted_nc = glob.glob(os.path.join(EXTRACT_DIR, "**", "*.nc"), recursive=True)
converted_nc = glob.glob(os.path.join(CONVERTED_DIR, "*.nc"))
nc_files = sorted(set(extracted_nc) | set(converted_nc))
print(f"Összes NetCDF fájl (kibontott + konvertált): {len(nc_files)}")

# 4) REGION LABELLING (based on the zip folder names)
# After extraction, the name of the source zip (e.g. 'dania1') appears in the path:
# as a folder under the extraction directory, or as the prefix of a converted file.
import re

# A region name only counts at the START of a path component and when followed by a
# digit, '_', '.', '-' or the end of the component ('dania1', 'uk3_data.nc', 'usa').
_REGION_RE = re.compile(r"^(dania|nemet|uk|tajvan|usa)(?=[0-9_.\-]|$)")


def region_from_path(p):
    """Return the region label of a data file path.

    Path components are checked from the file name upwards; the first one that
    starts with a known region name decides. A plain substring test over the whole
    path would mislabel files whose unrelated parent folders merely contain e.g.
    "uk" (".../MyDrive/Bukarest/.../tajvan1/x.nc").

    Args:
        p: file path (absolute or relative).

    Returns:
        "dania", "nemet", "uk" (per the author's note this one also holds the Dutch
        data), "tajvan", "usa", or "other" when no component matches.
    """
    for part in reversed(pathlib.PurePath(p).parts):
        match = _REGION_RE.match(part.lower())
        if match:
            return match.group(1)
    return "other"

# Group the NetCDF files by the label region_from_path assigns, then report the counts.
region_files = {}
for nc_path in nc_files:
    region_files.setdefault(region_from_path(nc_path), []).append(nc_path)

for region_name, region_paths in region_files.items():
    print(region_name, len(region_paths))

# 5) VARIABLES TO PROCESS - logical names of the most important fields (Europe / USA / Taiwan):
#   t2m   2 m temperature                 tp    total precipitation
#   u10   10 m u wind component           v10   10 m v wind component
#   msl   mean sea-level pressure         ssrd  surface solar radiation downwards
#   sd    snow depth (optional, not every region has it)
#   sst   sea-surface temperature         10fg  10 m wind gust (file names vary, aliases handled)
# NOTE: the names inside the files may differ (e.g. '2m_temperature' vs 't2m');
# processing tries the known aliases for each logical name.
variables_needed = "t2m tp u10 v10 msl ssrd sd sst 10fg".split()

# 6) HELYI SEGÉDFÜGGVÉNYEK: változó-ALIAS kezelés, winsorize, imputálás, deriváltak
import numpy as np
import pandas as pd
from scipy import stats

# Candidate in-file variable names per logical ERA5 variable, in lookup priority order.
_ERA5_ALIASES = {
    "t2m": ["t2m", "2m_temperature", "air_temperature_at_2m"],
    "tp": ["tp", "total_precipitation"],
    "u10": ["u10", "10m_u_component_of_wind"],
    "v10": ["v10", "10m_v_component_of_wind"],
    "msl": ["msl", "mean_sea_level_pressure"],
    "ssrd": ["ssrd", "surface_solar_radiation_downwards"],
    "sd": ["sd", "snow_depth"],
    "sst": ["sst", "sea_surface_temperature"],
    # "fg10" is presumably how cfgrib / the newer CDS NetCDF export name the 10fg gust
    # (verify against your files). "i10fg" is the *instantaneous* gust: kept last as a
    # fallback only, since its daily maximum is not the same quantity.
    "10fg": ["10fg", "fg10", "maximum_wind_gust_since_previous_post_processing",
             "10m_wind_gust", "gust", "i10fg"],
}


def alias_map(varname):
    """Return the candidate dataset variable names for logical ERA5 name *varname*.

    Unknown names map to ``[varname]``. A new list is returned on every call, so
    callers may modify it without affecting the shared alias table.
    """
    return list(_ERA5_ALIASES.get(varname, [varname]))

def find_variable_in_ds(ds, logical_name):
    """Return the first alias of *logical_name* present in ``ds.variables``.

    Aliases are tried in the order alias_map yields them; None when none is present.
    """
    present = ds.variables
    return next((name for name in alias_map(logical_name) if name in present), None)

def winsorize_series(arr, lower_q=0.01, upper_q=0.99):
    """Clip *arr* to its [lower_q, upper_q] quantile range, ignoring NaNs.

    Quantiles are given as fractions (0.01 = 1st percentile) and are computed over
    all elements of *arr*. NaN entries stay NaN.
    """
    low, high = np.nanpercentile(arr, [lower_q * 100, upper_q * 100])
    return np.clip(arr, low, high)

# 7) REGIONAL MERGE AND PROCESSING
OUT_DIR = os.path.join(ROOT, "era5_processed")
os.makedirs(OUT_DIR, exist_ok=True)

# Dask tuning: do not split large chunks when slicing arrays.
import dask
dask.config.set({"array.slicing.split_large_chunks": False})

# Daily aggregation rule per (logical) variable name; all other variables are averaged.
DAILY_MAX_VARS = ("10fg", "wind10", "wind100")
DAILY_SUM_VARS = ("tp",)


def _normalize_time(ds):
    """Return *ds* with a sorted, duplicate-free ``time`` dimension where possible.

    * If there is no ``time`` dimension but a ``valid_time`` one (the name newer CDS
      NetCDF exports appear to use - TODO confirm for your files), it is renamed.
    * Non-index coordinates (per-file scalar metadata) are dropped so they cannot
      conflict when several files are combined.
    * Duplicated time stamps (overlapping downloads) keep their first occurrence.
    """
    if "time" not in ds.dims and "valid_time" in ds.dims:
        ds = ds.drop_vars("time", errors="ignore").rename({"valid_time": "time"})
    ds = ds.reset_coords(drop=True)
    if "time" in ds.dims:
        ds = ds.sortby("time")
        duplicated = ds.get_index("time").duplicated()
        if duplicated.any():
            ds = ds.isel(time=~duplicated)
    return ds


def _open_per_variable(files):
    """Fallback opener: open every logical variable separately; return the datasets.

    Each file is scanned once. For every entry of ``variables_needed`` only the files
    that contain one of its aliases are combined, and the variable is renamed to its
    logical name so files using different aliases still line up.
    """
    files_by_var = {}
    for path in files:
        try:
            with xr.open_dataset(path) as tmp:
                for logical in variables_needed:
                    if find_variable_in_ds(tmp, logical):
                        files_by_var.setdefault(logical, []).append(path)
        except Exception as e:  # best effort: an unreadable file is skipped, not fatal
            print(f"Nem sikerült megnyitni, kihagyva: {path} ({e})")

    var_datasets = []
    for logical, paths in files_by_var.items():
        def preprocess(ds_in, logical=logical):  # default arg binds the current variable
            # keep only this variable (under its logical name) to reduce memory
            alias = find_variable_in_ds(ds_in, logical)
            sub = ds_in[[alias]]
            if alias != logical:
                sub = sub.rename({alias: logical})
            return _normalize_time(sub)
        try:
            var_datasets.append(
                xr.open_mfdataset(paths, preprocess=preprocess, combine="by_coords", parallel=True)
            )
        except Exception as e2:
            print(f"Változó {logical} nyitása sikertelen: {e2}")
    return var_datasets


def _open_region(files):
    """Open all NetCDF files of one region as one Dataset.

    Returns ``(ds, handles)``; ``ds`` is None when nothing could be opened and
    ``handles`` lists every opened Dataset that must be closed afterwards.
    """
    try:
        # combine="by_coords" infers the concatenation dimension itself; passing
        # concat_dim together with it raises ValueError, which made every open fail.
        ds = xr.open_mfdataset(files, combine="by_coords", parallel=True,
                               preprocess=_normalize_time)
        return ds, [ds]
    except Exception as e:
        print(f"open_mfdataset hiba: {e}\nPróbáljuk változónként összevonni.")
    var_datasets = _open_per_variable(files)
    if not var_datasets:
        print("Nincs egyetlen megnyitható változó sem, továbblépünk.")
        return None, []
    try:
        # Each part holds one distinct data variable; "override" takes shared
        # non-index variables from the first part instead of failing on them.
        return xr.merge(var_datasets, compat="override"), var_datasets
    except Exception as e:
        print(f"Változók összevonása sikertelen: {e}")
        for part in var_datasets:
            part.close()
        return None, []


def _rename_to_logical(ds):
    """Rename alias variables (e.g. '2m_temperature') to their logical names."""
    rename_map = {}
    for logical in variables_needed:
        found = find_variable_in_ds(ds, logical)
        if found and found != logical:
            rename_map[found] = logical
    if rename_map:
        ds = ds.rename(rename_map)
        print("Átnevezett változók:", rename_map)
    return ds


def _add_derived_variables(ds, z_ref=10.0, z_target=100.0, alpha=0.14):
    """Add wind10, wind100 and the MSL pressure tendency where the inputs exist.

    wind100 is extrapolated from wind10 with the power law
    U_z = U_ref * (z / z_ref) ** alpha (alpha = 0.14, the neutral default).
    """
    if "u10" in ds.variables and "v10" in ds.variables:
        ds["wind10"] = np.sqrt(ds["u10"] ** 2 + ds["v10"] ** 2)
    if "wind10" in ds.variables:
        ds["wind100"] = ds["wind10"] * (z_target / z_ref) ** alpha
    if "msl" in ds.variables:
        msl = ds["msl"]
        try:
            # 24 h change matched by time stamp: diff("time", n=24) would be the
            # 24th-order difference, not a 24 h lag. Matching by stamp also copes with
            # gaps between downloaded chunks; stamps without a partner become NaN.
            times = msl["time"].values
            earlier = msl.reindex(time=times - np.timedelta64(24, "h"))
            ds["dmsl_24h"] = msl - earlier.assign_coords(time=times)
        except (TypeError, ValueError) as e:
            print(f"dmsl_24h nem számolható ({e}); egylépéses különbség.")
            ds["dmsl_1step"] = msl.diff("time").reindex_like(msl)
    return ds


def _daily_aggregate(ds):
    """Resample every time-dependent variable to daily values.

    max for DAILY_MAX_VARS, sum for DAILY_SUM_VARS (min_count=1 keeps days without
    any data as NaN instead of a fake 0), mean for everything else.
    """
    daily_vars = {}
    for name, da in ds.data_vars.items():
        if "time" not in da.dims:
            continue
        by_day = da.resample(time="1D")
        if name in DAILY_MAX_VARS:
            daily_vars[name] = by_day.max()
        elif name in DAILY_SUM_VARS:
            daily_vars[name] = by_day.sum(min_count=1)
        else:
            daily_vars[name] = by_day.mean()
    return xr.Dataset(daily_vars)


def winsorize_dataset(ds_in, low_q=0.01, up_q=0.99):
    """Clip every data variable to its [low_q, up_q] quantiles over all cells/days.

    All-NaN variables are left unchanged. The Dataset is modified in place and returned.
    """
    for v in list(ds_in.data_vars):
        try:
            arr = ds_in[v].values
            if arr.size == 0 or np.isnan(arr).all():
                continue
            # copy(data=...) keeps the coordinates and attributes of the variable
            ds_in[v] = ds_in[v].copy(data=winsorize_series(arr, low_q, up_q))
        except Exception as e:  # best effort per variable, as before
            print(f"winsorize hiba {v}: {e}")
    return ds_in


def impute_dataset(ds_in):
    """Fill NaNs: linear interpolation/extrapolation along time, then the global median.

    NOTE(review): linear extrapolation at the series ends can produce physically
    impossible values (e.g. negative precipitation) - confirm this is acceptable.
    """
    for v in list(ds_in.data_vars):
        try:
            filled = ds_in[v].interpolate_na(dim="time", method="linear", fill_value="extrapolate")
            if np.isnan(filled.values).any():
                filled = filled.fillna(np.nanmedian(filled.values))
            ds_in[v] = filled
        except Exception as e:  # best effort per variable, as before
            print(f"Imputáció hiba {v}: {e}")
    return ds_in


def _regional_indicators(daily):
    """Return a DataFrame of spatial mean / max / p95 per daily variable (time index)."""
    frames = []
    for v in daily.data_vars:
        da = daily[v]
        spatial_dims = [d for d in da.dims if d != "time"]
        if spatial_dims:
            regional_mean = da.mean(dim=spatial_dims)
            regional_max = da.max(dim=spatial_dims)
            # quantile replaces reduce(lambda x: ...), which breaks because reduce
            # passes an `axis=` keyword the lambda does not accept.
            regional_p95 = da.quantile(0.95, dim=spatial_dims).drop_vars("quantile")
        else:
            regional_mean = regional_max = regional_p95 = da
        frames.append(pd.DataFrame({
            f"{v}_mean": regional_mean.to_series(),
            f"{v}_max": regional_max.to_series(),
            f"{v}_p95": regional_p95.to_series(),
        }))
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, axis=1)


def _process_region(region, files):
    """Open, derive, aggregate, clean and save the daily data of one region."""
    ds, handles = _open_region(files)
    if ds is None:
        return
    try:
        ds = _normalize_time(ds)
        print("Dataset megnyitva. Változók:", list(ds.variables.keys())[:30])
        ds = _rename_to_logical(ds)
        if "time" not in ds.dims:
            print("Nincs time dim, átugorva.")
            return
        ds = _add_derived_variables(ds)
        # The daily data is presumably much smaller than the hourly input: load it
        # once so the cleaning steps below do not re-read the source files repeatedly.
        daily = _daily_aggregate(ds).load()
        # NOTE(review): 1-99 % winsorizing also clips the most extreme daily gusts and
        # precipitation totals - confirm this fits the study goal.
        daily = winsorize_dataset(daily, 0.01, 0.99)
        daily = impute_dataset(daily)

        indicators = _regional_indicators(daily)
        csv_out = os.path.join(OUT_DIR, f"{region}_daily_indicators.csv")
        if indicators.empty:
            print(f"Nincs indikátor, CSV kihagyva: {csv_out}")
        else:
            indicators.to_csv(csv_out)
            print(f"Regionális indikátor CSV kimentve: {csv_out}")

        netcdf_out = os.path.join(OUT_DIR, f"{region}_daily.nc")
        try:
            daily.to_netcdf(netcdf_out)
            print(f"Regionális daily NetCDF kimenet: {netcdf_out}")
        except Exception as e:
            print(f"NetCDF mentés hiba: {e}")
    finally:
        # release file handles before the next region is opened
        for handle in handles:
            handle.close()


for region, files in region_files.items():
    if region == "other" or len(files) == 0:
        continue
    print(f"\n--- Feldolgozás: {region} ({len(files)} fájl) ---")
    _process_region(region, files)

print("\n==== Pipeline kész. Ellenőrizd az 'era5_processed' mappát. ====")


Archive:  cds_all.zip
  End-of-central-directory signature not found.  Either this file is not
  a zipfile, or it constitutes one disk of a multi-part archive.  In the
  latter case the central directory and zipfile comment will be found on
  the last disk(s) of this archive.
unzip:  cannot find zipfile directory in one of cds_all.zip or
        cds_all.zip.zip, and cannot find cds_all.zip.ZIP, period.
Collecting netCDF4
  Downloading netcdf4-1.7.3-cp311-abi3-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (1.9 kB)
Collecting cfgrib
  Downloading cfgrib-0.9.15.1-py3-none-any.whl.metadata (56 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.1/56.1 kB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting cftime
  Downloading cftime-1.6.5-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (8.7 kB)
Collecting eccodes>=0.9.8 (from cfgrib)
  Downloading eccodes-2.44.0-py3-none-any.whl.metadata (15 kB)
Collecting lz4>=4.3.2 (from dask[comp

Kibontás: 0it [00:00, ?it/s]

Kibontás kész. Kimenet: /content/era5_extracted
NetCDF fájlok: 0, GRIB fájlok: 0





Összes NetCDF fájl (kibontott + konvertált): 0

==== Pipeline kész. Ellenőrizd az 'era5_processed' mappát. ====
