In [1]:
# import libraries
from pathlib import Path
import pandas as pd
import numpy as np
import os
from scipy.stats import norm
import pickle

In [13]:
# Load every per-site wind-power table; each file's stem becomes the site code.
# NOTE(review): glob("*") matches every directory entry, not only *.parquet files.
dir_sfcwind = Path("../data/processed/wind_power")
files_sfcwind = [path for path in dir_sfcwind.glob("*")]
dict_wind = dict((path.stem, pd.read_parquet(path)) for path in files_sfcwind)

In [None]:

def taskW_add_window_power_all_sites(
    dic_wind: dict,
    power_col: str = "power",
    windows: list[str] = None,
    time_col: str = "time",
    agg: str = "mean", 
    min_periods: int | None = None,  
    center: bool = False             
) -> dict:
    """
    Task W: 对每个站点的 power 做 time-based rolling window 聚合，生成 power_w{window}
    Returns: dic_wind_win {code: df with added columns power_w...}
    """
    if windows is None:
        windows = ["1H", "4H", "12H", "24H", "48H", "72H", "120H"]

    out = {}

    for code, df in dic_wind.items():
        d = df.copy()
        d[time_col] = pd.to_datetime(d[time_col], errors="coerce")
        d = d.sort_values(time_col).set_index(time_col)

        for w in windows:
            expected = int(pd.Timedelta(w).total_seconds() // 3600)
            mp = expected if min_periods is None else min_periods

            out_col = f"{power_col}_w{w}"
            roll = d[power_col].rolling(window=w, min_periods=mp, center=center)

            if agg == "mean":
                d[out_col] = roll.mean()
            elif agg == "sum":
                d[out_col] = roll.sum()
            else:
                raise ValueError("agg must be 'mean' or 'sum'")

        out[code] = d.reset_index()

    return out


In [None]:
WINDOWS_WIND = [f"{hours}h" for hours in (1, 4, 12, 24, 48, 72, 120)]

# Task W: rolling mean of each site's power over every window. min_periods=None
# makes taskW require the window's full (hourly) observation count.
dict_wind_win = taskW_add_window_power_all_sites(
    dict_wind,
    "power",
    WINDOWS_WIND,
    time_col="time",
    agg="mean",
    min_periods=None,
    center=False,
)

In [None]:
def build_ecdf_model(values: np.ndarray, min_samples: int = 10) -> dict:
    """
    Build an empirical-CDF reference model from a sample.

    NaN entries are ignored. The model stores the sorted non-NaN sample, its size
    ``N`` and ``eps = 1 / (N + 1)``, the smallest non-zero plotting position
    ``rank / (N + 1)`` that ``ecdf_p`` can produce.

    Parameters
    ----------
    values : array-like
        Sample to build the ECDF from; converted to float.
    min_samples : int, default 10
        Minimum number of non-NaN values required to build a model.

    Returns
    -------
    dict
        {"x_sorted": np.ndarray, "N": int, "eps": float}

    Raises
    ------
    ValueError
        If fewer than ``min_samples`` non-NaN values are available.
    """
    v = np.asarray(values, dtype=float)
    v = v[~np.isnan(v)]
    if v.size < min_samples:
        raise ValueError(f"Not enough samples to build ECDF: {v.size}")
    x_sorted = np.sort(v)
    N = x_sorted.size
    eps = 1.0 / (N + 1)
    return {"x_sorted": x_sorted, "N": N, "eps": eps}

def ecdf_p(x: np.ndarray, model: dict, clip: bool = True) -> np.ndarray:
    """
    Map values to non-exceedance probabilities under an ECDF model.

    Uses the plotting position ``rank / (N + 1)``, where ``rank`` is the number of
    reference values <= x. With ``clip=True`` the result is bounded to
    ``[eps, 1 - eps]`` using the model's ``eps``.

    Parameters
    ----------
    x : array-like
        Values to score; converted to float.
    model : dict
        Model with keys "x_sorted", "N" and "eps" (as built by ``build_ecdf_model``).
    clip : bool, default True
        Whether to bound probabilities to ``[eps, 1 - eps]``.

    Returns
    -------
    np.ndarray
        Probabilities with the shape of ``x``; NaN where ``x`` is NaN.
    """
    xs = model["x_sorted"]
    N = model["N"]
    x_arr = np.asarray(x, dtype=float)
    p = np.searchsorted(xs, x_arr, side="right") / (N + 1)
    if clip:
        eps = model["eps"]
        p = np.clip(p, eps, 1 - eps)
    # np.searchsorted orders NaN after every number, so without this mask a NaN
    # (e.g. an incomplete rolling window) would get the maximum probability N/(N+1).
    return np.where(np.isnan(x_arr), np.nan, p)

def taskB_window_power_to_ecdf_all_sites(
    dict_wind_win: dict,
    power_col: str = "power",
    windows: list[str] = None,
) -> tuple[dict, dict]:
    """
    Task B: convert each window column into ECDF probabilities.

    For every site and every window ``w``, an ECDF model is fitted to the site's
    ``f"{power_col}_w{w}"`` column (``build_ecdf_model``), and that column is mapped
    through the model with clipping (``ecdf_p``) into a new column ``f"p_w{w}"``.

    Returns
    -------
    dict_ecdf : dict
        {site code: copy of the input frame with the p_w... columns added}
    models : dict
        {window: {site code: ECDF model dict}}

    Errors raised by ``build_ecdf_model`` (e.g. too few samples) propagate, and a
    missing window column raises KeyError.
    """
    windows = ["1H", "4H", "12H", "24H", "48H", "72H", "120H"] if windows is None else windows

    models = dict((w, {}) for w in windows)
    dict_ecdf = {}

    for code, frame in dict_wind_win.items():
        enriched = frame.copy()
        for w in windows:
            values = enriched[f"{power_col}_w{w}"].to_numpy(dtype=float)
            fitted = build_ecdf_model(values)
            models[w][code] = fitted
            enriched[f"p_w{w}"] = ecdf_p(values, fitted, clip=True)
        dict_ecdf[code] = enriched

    return dict_ecdf, models


In [32]:
# Task B: fit one ECDF per (window, site) and add the p_w{window} probability columns.
dict_wind_ecdf, wind_models = taskB_window_power_to_ecdf_all_sites(
    dict_wind_win, "power", WINDOWS_WIND
)

In [33]:
def taskC_ecdf_to_srepi_all_sites(
    dict_ecdf: dict,
    windows: list[str] = None,
    eps: float = 1e-6,
) -> dict:
    """
    Task C: turn ECDF probabilities into standardized (SREPI) values.

    For each site and window ``w``, the probabilities in ``f"p_w{w}"`` are clipped
    to ``[eps, 1 - eps]``, which keeps the inverse normal finite. They are then passed
    through the standard-normal quantile function ``norm.ppf``, and the result is
    stored in ``f"srepi_w{w}"``. NaN probabilities stay NaN.

    Returns
    -------
    dict
        {site code: copy of the input frame with the srepi_w... columns added}
    """
    if windows is None:
        windows = ["1H", "4H", "12H", "24H", "48H", "72H", "120H"]

    def _to_srepi(frame: pd.DataFrame) -> pd.DataFrame:
        # Build every standardized column first, then attach them in one (copying) assign.
        new_cols = {}
        for w in windows:
            probs = frame[f"p_w{w}"].astype(float).clip(lower=eps, upper=1 - eps)
            new_cols[f"srepi_w{w}"] = norm.ppf(probs.to_numpy())
        return frame.assign(**new_cols)

    return {code: _to_srepi(frame) for code, frame in dict_ecdf.items()}


In [34]:
# Task C: standard-normal transform of every p_w{window} column into srepi_w{window}.
dict_wind_srepi = taskC_ecdf_to_srepi_all_sites(dict_wind_ecdf, windows=WINDOWS_WIND)

In [37]:
# Persist one SREPI parquet file per site, named after the site code.
out_dir = "../data/processed/wind_srepi"
os.makedirs(out_dir, exist_ok=True)
for site, data in dict_wind_srepi.items():
    # pathlib join instead of string concatenation (portable; matches the load cell's Path usage).
    file_path = Path(out_dir) / f"{site}.parquet"
    data.to_parquet(file_path, index=False)
