In [1]:
import json
import logging
import pickle
import sys
from pathlib import Path

import numpy as np
import pandas as pd
import xarray as xr
import yaml

In [2]:
# Send INFO-and-above records to stdout so they appear in the cell output,
# each prefixed with a timestamp, the level and the emitting file/line.
logging.basicConfig(
    handlers=[logging.StreamHandler(stream=sys.stdout)],
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s",
    level=logging.INFO,
)

# Logger used by every cell and helper function below.
LOGGER = logging.getLogger(__name__)

In [3]:
def filter_stations(targets, threshold=0.9):
    """Drop stations whose share of complete samples is not above ``threshold``.

    A sample is one (forecast_reference_time, t) combination; it counts as
    complete when none of the dataset's variables is NaN at that position.

    Args:
        targets: dataset with ``forecast_reference_time``, ``t`` and
            ``station`` available for stacking/selection.
        threshold: minimum completeness (exclusive) a station needs to be kept.

    Returns:
        ``targets`` restricted to the stations that pass the threshold.
    """
    n_stations = len(targets.station.values)

    # Collapse the two time-like dimensions into one sample axis "s" and put
    # all variables along "var" so completeness can be evaluated per sample.
    stacked = targets.stack(s=["forecast_reference_time", "t"]).to_array("var")
    nan_count_per_sample = np.isnan(stacked).sum("var")
    sample_is_complete = nan_count_per_sample == 0
    completeness = sample_is_complete.sum("s") / len(stacked.s)

    kept = targets.where(completeness > threshold, drop=True)

    n_bad_stations = n_stations - len(kept.station.values)
    LOGGER.info(f"Filtered out {n_bad_stations} out of {n_stations}")
    return kept

In [4]:
def reshape(features: xr.Dataset, targets: xr.Dataset) -> tuple[xr.DataArray, xr.DataArray]:
    """Reshape data to 2-d (sample, variable) tensors.

    Both datasets go through the same pipeline: variables are concatenated
    along a new ``var`` dimension, the ``forecast_reference_time``, ``t`` and
    ``station`` dimensions are stacked into a single sample dimension ``s``,
    and the result is transposed so ``s`` comes first and ``var`` last.

    Args:
        features: dataset holding the input variables.
        targets: dataset holding the target variables.

    Returns:
        The pair ``(x, y)`` of reshaped feature and target arrays.
    """
    dims = ["forecast_reference_time", "t", "station"]

    def _to_samples(dataset: xr.Dataset) -> xr.DataArray:
        # create_index=False is passed through unchanged: the stacked
        # dimension is requested without building an index for it.
        return (
            dataset.to_array("var")
            .stack(s=dims, create_index=False)
            .transpose("s", ..., "var")
        )

    x = _to_samples(features)
    y = _to_samples(targets)
    LOGGER.info(f"Reshaped: x -> {dict(x.sizes)} and y -> {dict(y.sizes)}")
    return x, y

In [5]:
def drop_missing(x: xr.DataArray, y: xr.DataArray) -> tuple[xr.DataArray, xr.DataArray]:
    """Only keep complete (all features and targets available) samples.

    A sample (one position along ``s``) survives when every value of ``y`` and
    every value of ``x`` at that position is finite.

    Args:
        x: feature array with a sample dimension ``s``.
        y: target array with the same sample dimension ``s``.

    Returns:
        The filtered ``(x, y)`` pair.
    """
    n_samples = len(x.s.values)
    mask_x_dims = [dim for dim in x.dims if dim != "s"]
    mask_y_dims = [dim for dim in y.dims if dim != "s"]

    # The masks are applied by positional indexing, i.e. on the leading
    # dimension -- this relies on "s" being first, as `reshape` arranges it.

    # Pass 1: drop samples with a non-finite target. The mask is computed once
    # and reused for both arrays.
    y_complete = np.isfinite(y).all(dim=mask_y_dims)
    x = x[y_complete]
    y = y[y_complete]

    # Pass 2: drop samples with a non-finite feature, evaluated on the already
    # reduced x so both arrays stay aligned.
    x_complete = np.isfinite(x).all(dim=mask_x_dims)
    y = y[x_complete]
    x = x[x_complete]

    n_incomplete_samples = n_samples - len(x.s.values)
    LOGGER.info(f"Dropped {n_incomplete_samples} incomplete samples out of {n_samples}")
    return x, y

In [10]:
# Input stores read by this notebook (update these paths for your setup).
inputs = dict(
    features="data/features.zarr",
    targets="data/targets.zarr",
)

# Files written by the final save cell.
outputs = dict(
    x="filtered_and_preprocessed_data/x.nc",
    y="filtered_and_preprocessed_data/y.nc",
    stations_list="data/stations_list.json",
)

In [12]:
# Load the variable selection from config/config.yaml. When the file is
# missing or empty, fall back to the built-in defaults below and say so in the
# log, so a run never silently uses a different configuration than expected.
try:
    with open("config/config.yaml", "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)
    if not config:
        # yaml.safe_load returns None for an empty document.
        LOGGER.warning("config/config.yaml is empty - using the built-in default configuration")
except FileNotFoundError:
    config = None
    LOGGER.warning("config/config.yaml not found - using the built-in default configuration")

if not config:
    # Manual (default) configuration
    config = {
        "features": [
            "coe_air_temperature_ensavg",
            "coe_dew_point_temperature_ensavg",
            "coe_dew_point_depression_ensavg",
            "coe_surface_air_pressure_ensavg",
            "coe_relative_humidity_ensavg",
            "coe_water_vapor_mixing_ratio_ensavg",
            "coe_leadtime",
            "time_cos_hourofday",
            "time_sin_hourofday",
            "time_cos_dayofyear",
            "time_sin_dayofyear"
        ],
        "targets": [
            "obs_air_temperature",
            "obs_dew_point_temperature",
            "obs_surface_air_pressure",
            "obs_relative_humidity",
            "obs_water_vapor_mixing_ratio"
        ]
    }

In [13]:
# Record the configured input and output locations in the run log.
for message in (f"Inputs: {inputs}", f"Outputs: {outputs}"):
    LOGGER.info(message)

2025-04-20 14:04:33 INFO     [2264915338.py:1] Inputs: {'features': 'data/features.zarr', 'targets': 'data/targets.zarr'}
2025-04-20 14:04:33 INFO     [2264915338.py:2] Outputs: {'x': 'filtered_and_preprocessed_data/x.nc', 'y': 'filtered_and_preprocessed_data/y.nc', 'stations_list': 'data/stations_list.json'}


In [16]:
# Open the feature and target stores named in `inputs`, in that order.
features, targets = (
    xr.open_zarr(inputs[store_key]) for store_key in ("features", "targets")
)

  return cls(**configuration_parsed)
  return cls(**configuration_parsed)


In [17]:
# Log the data variables and coordinates of both datasets: one line for the
# heading, one line for the list of names.
for heading, names in (
    ("Features variables:", features.data_vars),
    ("Features coords:", features.coords),
    ("Targets variables:", targets.data_vars),
    ("Targets coords:", targets.coords),
):
    LOGGER.info(heading)
    LOGGER.info(list(names))

2025-04-20 14:06:28 INFO     [1950880206.py:1] Features variables:
2025-04-20 14:06:28 INFO     [1950880206.py:2] ['coe_dew_point_depression_ensavg', 'coe_air_temperature_ensavg', 'coe_dew_point_temperature_ensavg', 'coe_dd_10m_ensavg', 'coe_ff_10m_ensavg', 'coe_dursun_ensavg', 'coe_surface_air_pressure_ensavg', 'coe_relhum_2m_ensavg', 'coe_leadtime', 'coe_water_vapor_mixing_ratio_ensavg', 'coe_t_2m_ensavg', 'coe_tot_prec_ensavg', 'coe_relative_humidity_ensavg', 'time_sin_dayofyear', 'time_cos_hourofday', 'time_cos_dayofyear', 'time_sin_hourofday']
2025-04-20 14:06:28 INFO     [1950880206.py:3] Features coords:
2025-04-20 14:06:28 INFO     [1950880206.py:4] ['forecast_reference_time', 'elevation', 'longitude', 'latitude', 'model_height_difference', 'grid_idx', 'station', 't']
2025-04-20 14:06:28 INFO     [1950880206.py:6] Targets variables:
2025-04-20 14:06:28 INFO     [1950880206.py:7] ['obs_air_temperature', 'obs_relative_humidity', 'obs_dew_point_temperature', 'obs_surface_air_press

In [18]:
# Pick up the model_height_difference coordinate, preferring the features
# dataset over the targets dataset; None when neither carries it.
if "model_height_difference" in features.coords:
    model_height_difference = features["model_height_difference"]
    LOGGER.info("model_height_difference features.coords'dan alındı")
elif "model_height_difference" in targets.coords:
    model_height_difference = targets["model_height_difference"]
    LOGGER.info("model_height_difference targets.coords'dan alındı")
else:
    model_height_difference = None

2025-04-20 14:06:37 INFO     [213958751.py:4] model_height_difference features.coords'dan alındı


In [19]:
# Resolve every configured feature name against the data variables actually
# present in `features`.
available_features = []
for feature in config["features"]:
    # Look the variable up under different spellings, in this order: as
    # configured, with "_" replaced by ":", and with ":" replaced by "_".
    candidates = (
        feature,
        feature.replace("_", ":"),
        feature.replace(":", "_"),
    )
    match = next((name for name in candidates if name in features.data_vars), None)

    # Names that only exist as coordinates are not treated as found.
    if match is None:
        LOGGER.warning(f"Feature bulunamadı: {feature}")
    else:
        available_features.append(match)

In [20]:
# Restrict `features` to the resolved variables; when nothing was resolved the
# dataset is left untouched and an error is logged.
LOGGER.info(f"Bulunan features: {available_features}")
if not available_features:
    LOGGER.error("Hiçbir feature bulunamadı!")
else:
    features = features[available_features]

2025-04-20 14:06:54 INFO     [772157482.py:1] Bulunan features: ['coe_air_temperature_ensavg', 'coe_dew_point_temperature_ensavg', 'coe_dew_point_depression_ensavg', 'coe_surface_air_pressure_ensavg', 'coe_relative_humidity_ensavg', 'coe_water_vapor_mixing_ratio_ensavg', 'coe_leadtime', 'time_cos_hourofday', 'time_sin_hourofday', 'time_cos_dayofyear', 'time_sin_dayofyear']


In [21]:
# Resolve every configured target name against the data variables actually
# present in `targets`.
available_targets = []
for target in config["targets"]:
    # Look the variable up under different spellings, in this order: as
    # configured, with "_" replaced by ":", and with ":" replaced by "_".
    candidates = (
        target,
        target.replace("_", ":"),
        target.replace(":", "_"),
    )
    match = next((name for name in candidates if name in targets.data_vars), None)

    if match is None:
        LOGGER.warning(f"Target bulunamadı: {target}")
    else:
        available_targets.append(match)

In [22]:
# Restrict `targets` to the resolved variables; when nothing was resolved the
# dataset is left untouched and an error is logged.
LOGGER.info(f"Bulunan targets: {available_targets}")
if not available_targets:
    LOGGER.error("Hiçbir target bulunamadı!")
else:
    targets = targets[available_targets]

2025-04-20 14:07:10 INFO     [4208383790.py:1] Bulunan targets: ['obs_air_temperature', 'obs_dew_point_temperature', 'obs_surface_air_pressure', 'obs_relative_humidity', 'obs_water_vapor_mixing_ratio']


In [23]:
# Attach the model_height_difference coordinate to the targets when one was
# found earlier.
if model_height_difference is not None:
    targets = targets.assign_coords(model_height_difference=model_height_difference)

# owner_id check: keep only owner_id == 1 when the coordinate exists;
# otherwise add an all-ones owner_id along "station" so everything is kept.
if "owner_id" not in targets.coords:
    LOGGER.warning("owner_id koordinatı bulunamadı - hepsi seçilecek")
    default_owner_ids = np.ones(len(targets.station), dtype=int)
    targets = targets.assign_coords(owner_id=("station", default_owner_ids)).load()
else:
    targets = targets.where(targets.owner_id == 1, drop=True).load()



In [24]:
# Drop stations with insufficient completeness and align the features to the
# remaining targets. Both results are computed first and only bound to
# `targets` / `features` when the whole step succeeded, so a failure really
# skips the step instead of leaving the two datasets out of sync.
try:
    filtered_targets = filter_stations(targets)
    filtered_features = features.reindex_like(filtered_targets).load()
except Exception as e:
    LOGGER.error(f"İstasyonları filtrelerken hata: {e}")
    LOGGER.info("Filtreleme adımı atlanıyor")
else:
    targets, features = filtered_targets, filtered_features
    # An empty station axis makes every later step produce empty outputs;
    # make that visible here rather than only in the saved files.
    if targets.sizes.get("station", 0) == 0:
        LOGGER.warning(
            "No stations left after completeness filtering - "
            "all downstream outputs will be empty"
        )

2025-04-20 14:07:31 INFO     [1796570519.py:10] Filtered out 183 out of 183


In [25]:
# Restrict both datasets to the same positions along t and to forecasts whose
# reference time has hour == 0. Each filter is applied to both datasets or to
# neither, so they cannot drift apart when one of the selections fails.
try:
    if "t" in features.dims and "t" in targets.dims:
        # Positions 3..23 along t (the slice end is exclusive).
        leadtime_slice = slice(3, 24, 1)
        features, targets = (
            features.isel(t=leadtime_slice),
            targets.isel(t=leadtime_slice),
        )
        LOGGER.info("Leadtime filtrelendi")
    else:
        LOGGER.warning("t boyutu bulunamadı - filtreleme yapılmadı")

    # Filter by hour of the forecast reference time
    if "forecast_reference_time" in features.dims:
        try:
            features_hour0 = features.where(features.forecast_reference_time.dt.hour == 0, drop=True)
            targets_hour0 = targets.where(targets.forecast_reference_time.dt.hour == 0, drop=True)
        except Exception as hour_error:
            # `Exception` instead of a bare except: KeyboardInterrupt and
            # SystemExit must not be swallowed; the cause is kept in the log.
            LOGGER.warning(f"Saat filtrelenemedi - dt.hour özelliği yok olabilir ({hour_error})")
        else:
            features, targets = features_hour0, targets_hour0
            LOGGER.info("Saat filtrelendi")
except Exception as e:
    LOGGER.error(f"Zaman dilimleri filtrelerken hata: {e}")
    LOGGER.info("Filtreleme adımı atlanıyor")

2025-04-20 14:07:42 INFO     [516163430.py:5] Leadtime filtrelendi
2025-04-20 14:07:42 INFO     [516163430.py:14] Saat filtrelendi


In [26]:
# Load both datasets before they are reshaped in the next cell.
# NOTE(review): the return values are not re-bound, so this relies on
# Dataset.load() loading the data in place (as documented by xarray) -- confirm
# for the installed version. The last expression is also what the cell displays.
features.load()
targets.load()

In [27]:
try:
    # Build the (sample, variable) arrays and pass them straight on to the
    # completeness filter.
    x, y = drop_missing(*reshape(features, targets))
except Exception as e:
    LOGGER.error(f"Reshape veya drop_missing sırasında hata: {e}")
    # Fallback on failure: keep the original dimensions and only move the
    # variables onto a "var" dimension.
    x = features.to_array("var")
    y = targets.to_array("var")
    LOGGER.warning("Basitleştirilmiş dönüşüm yapıldı")

2025-04-20 14:08:10 INFO     [2998523191.py:14] Reshaped: x -> {'s': 0, 'var': 11} and y -> {'s': 0, 'var': 5}
2025-04-20 14:08:10 INFO     [2427045864.py:11] Dropped 0 incomplete samples out of 0


In [28]:
# Station metadata coordinates of interest, and the subset of them that the
# targets dataset actually carries (original order preserved).
coord_names = ["elevation", "longitude", "latitude", "model_height_difference"]
existing_coords = list(filter(lambda name: name in targets.coords, coord_names))
LOGGER.info(f"Mevcut koordinatlar: {existing_coords}")

2025-04-20 14:08:24 INFO     [4168571450.py:3] Mevcut koordinatlar: ['elevation', 'longitude', 'latitude', 'model_height_difference']


In [29]:
# Build the station lookup table (metadata per coordinate plus an integer id
# per station), tag every sample of x with its station id, and drop the
# station metadata coordinates from x and y.
try:
    # One mapping per metadata coordinate, as produced by pandas' to_dict().
    stations_list = {
        coord: targets[coord].to_pandas().to_dict()
        for coord in existing_coords
        if coord in targets
    }

    # Station -> integer id. `.tolist()` converts numpy scalars to native
    # Python objects: json.dump rejects numpy integer dictionary keys.
    station_id_map = {s: i for i, s in enumerate(targets.station.values.tolist())}
    stations_list["id"] = station_id_map

    # When x carries a station coordinate, derive a per-sample station_id from it.
    if "station" in x.coords:
        station_id_coord = x.station.to_pandas().map(station_id_map).values
        x = x.assign_coords(station_id=("s", station_id_coord))
    else:
        LOGGER.warning("x.station bulunamadı, station_id koordinatı eklenemedi")

    # Drop owner_id and the station metadata coordinates where present.
    reset_coords_list = ["owner_id"] + existing_coords
    reset_coords_x = [coord for coord in reset_coords_list if coord in x.coords]
    reset_coords_y = [coord for coord in reset_coords_list if coord in y.coords]

    if reset_coords_x:
        x = x.reset_coords(reset_coords_x, drop=True)
    if reset_coords_y:
        y = y.reset_coords(reset_coords_y, drop=True)

except Exception as e:
    LOGGER.error(f"İstasyon ID eşleştirmesinde hata: {e}")
    # Fall back to a minimal stations_list that only holds the id mapping.
    stations_list = {"id": {s: i for i, s in enumerate(targets.station.values.tolist())}}

In [32]:

# Write the outputs: x and y as zarr (with a netCDF and finally a pickle
# fallback), then the station lookup table as JSON. The JSON step runs on its
# own so an array-format problem cannot prevent it and vice versa.


def _encode_string_variables(dataset):
    """Replace object/string-typed variables of ``dataset`` by integer codes.

    Every variable whose dtype is object or whose dtype name starts with
    ``string`` is replaced by the position of each value in the sorted unique
    values; the value -> code mapping is stored, as text, in the variable's
    ``string_mapping`` attribute. The dataset is modified and returned.
    """
    # Snapshot the names because variables are re-assigned inside the loop.
    for var in list(dataset.variables):
        dtype = dataset[var].dtype
        if np.issubdtype(dtype, np.dtype('O')) or str(dtype).startswith('string'):
            LOGGER.info(f"String değeri bulundu ve dönüştürülüyor: {var}")
            # Convert the string values to category codes
            flat_values = dataset[var].values.ravel()
            mapping = {val: i for i, val in enumerate(np.unique(flat_values))}
            numeric_values = np.array([mapping[v] for v in flat_values]).reshape(dataset[var].shape)
            dataset[var] = (dataset[var].dims, numeric_values)

            # Keep the mapping table as an attribute
            dataset[var].attrs["string_mapping"] = str(mapping)
    return dataset


try:
    # Create the parent directory of every output, not only the one of x.
    for output_path in outputs.values():
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)

    # Switch the array outputs to zarr (for string support)
    outputs["x"] = outputs["x"].replace(".nc", ".zarr")
    outputs["y"] = outputs["y"].replace(".nc", ".zarr")

    # Save as zarr
    for array_name, array in (("x", x), ("y", y)):
        if hasattr(array, 'to_dataset'):
            array.to_dataset("var").to_zarr(outputs[array_name], mode="w")
        else:
            LOGGER.warning(f"{array_name}.to_dataset metodu bulunamadı, ham veri kaydediliyor")
            xr.Dataset({array_name: array}).to_zarr(outputs[array_name], mode="w")

    LOGGER.info(f"Saved: {outputs['x']} and {outputs['y']}")
except Exception as e:
    LOGGER.error(f"Dosyaları kaydederken hata: {e}")

    # Try again as netCDF after converting string values to category codes
    try:
        LOGGER.info("String değerlerini dönüştürme deneniyor...")

        for array_name, array in (("x", x), ("y", y)):
            if hasattr(array, 'to_dataset'):
                encoded = _encode_string_variables(array.to_dataset("var"))
                encoded.to_netcdf(outputs[array_name].replace(".zarr", ".nc"))

        LOGGER.info("String değerleri dönüştürüldü ve netCDF olarak kaydedildi")
    except Exception as e2:
        LOGGER.error(f"String değerleri dönüştürürken de hata: {e2}")
        LOGGER.info("Alternatif: Veriyi pickle formatında kaydetmeyi deneyin")

        # Save as pickle (last resort).
        # NOTE: only unpickle these files from trusted locations.
        for array_name, array in (("x", x), ("y", y)):
            with open(outputs[array_name].replace(".zarr", ".pkl"), 'wb') as f:
                pickle.dump(array, f)

        LOGGER.info("Veriler pickle formatında kaydedildi")

# Station lookup table, written independently of the array outputs.
try:
    Path(outputs["stations_list"]).parent.mkdir(parents=True, exist_ok=True)
    with open(outputs["stations_list"], "w") as f:
        json.dump(stations_list, f, indent=4)
    LOGGER.info(f"Saved: {outputs['stations_list']}")
except Exception as e:
    LOGGER.error(f"Dosyaları kaydederken hata: {e}")

  return cls(**configuration_parsed)
  meta = AsyncArray._create_metadata_v3(
  return cls(**configuration_parsed)
  return cls(**configuration_parsed)
  meta = AsyncArray._create_metadata_v3(


2025-04-20 14:10:37 INFO     [3681899976.py:26] Saved: filtered_and_preprocessed_data/x.zarr, filtered_and_preprocessed_data/y.zarr and data/stations_list.json


  return cls(**configuration_parsed)
