In [1]:
# === ONE-CELL NOTEBOOK: DE-LU Day-Ahead (DA) 15-min Price Forecast → two CSVs ===
%pip install -q pandas numpy requests pytz python-dateutil scikit-learn lightgbm matplotlib seaborn pyarrow joblib entsoe-py

import os, json, warnings
warnings.filterwarnings("ignore")

import pandas as pd
import numpy as np
import requests
from datetime import timedelta, date
from time import sleep
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
import lightgbm as lgb
import joblib

print("✅ Notebook boot OK.")

# -------------------------- Configuration --------------------------
# ENTSO-E token (Transparency Platform REST API).
# Ref: entsoe-py usage & tz requirements: https://pypi.org/project/entsoe-py/  [1](https://github.com/nager/Nager.Date)
os.environ["ENTSOE_API_TOKEN"] = "c75a0111-6841-4300-a9d8-e3746d32ac5e"  # <-- replace if needed
ENTSOE_TOKEN = os.getenv("ENTSOE_API_TOKEN") or os.getenv("ENTSOE_API") or ""
print("✅ ENTSO-E token detected." if ENTSOE_TOKEN else "⚠️ ENTSO-E token NOT detected")

ZONE_CODE_ENTSOE = "DE_LU"     # Germany-Luxembourg bidding zone in entsoe-py  [1](https://github.com/nager/Nager.Date)
TZ_LOCAL   = "Europe/Berlin"   # local output tz (DST-safe)
TZ_ENTSOE  = "Europe/Brussels" # entsoe-py expects Brussels tz  [1](https://github.com/nager/Nager.Date)

FEATURE_STORE_DIR = "./feature_store"; os.makedirs(FEATURE_STORE_DIR, exist_ok=True)
MODEL_DIR         = "./models";       os.makedirs(MODEL_DIR, exist_ok=True)
ARTIFACTS_DIR     = "./artifacts";    os.makedirs(ARTIFACTS_DIR, exist_ok=True)

# Training window (spans DA 15-min from 2025-10-01 onward; normalize earlier data to 15-min)
TRAIN_START = pd.Timestamp("2024-10-01", tz=TZ_LOCAL)
TRAIN_END   = pd.Timestamp.today(tz=TZ_LOCAL) - pd.Timedelta(days=5)

# Forecast days (local)
TOMORROW_LOCAL    = (pd.Timestamp.today(tz=TZ_LOCAL) + pd.Timedelta(days=1)).normalize()
TODAY_LOCAL       = pd.Timestamp.today(tz=TZ_LOCAL).normalize()
PREV_LOCAL_DATE   = TODAY_LOCAL - pd.Timedelta(days=1)   # for accuracy check

# Fallback open sources:
SMARD_BASE         = "https://www.smard.de/app/chart_data"   # DA 15-min since Oct 1, 2025  [2](https://www.entsoe.eu/network_codes/bzr/)
ENERGY_CHARTS_BASE = "https://api.energy-charts.info"        # public API (no token)        [3](http://www.world-timedate.com/dst/daylight_saving_time.php?city_id=162)

print("✅ Config loaded.")

# -------------------------- Helpers --------------------------
def make_qh_index(start_ts, end_ts, tzname=TZ_LOCAL):
    """DST-safe quarter-hour index."""
    return pd.date_range(start=start_ts, end=end_ts, freq="15min", tz=tzname)

def to_brussels(ts):
    """Ensure tz-aware Timestamp in Europe/Brussels for entsoe-py."""
    ts = pd.Timestamp(ts)
    return ts.tz_localize(TZ_ENTSOE) if ts.tz is None else ts.tz_convert(TZ_ENTSOE)

# -------------------------- ENTSO-E client --------------------------
from entsoe import EntsoePandasClient
pc = EntsoePandasClient(api_key=ENTSOE_TOKEN)

# -------------------------- DA series (ENTSO-E → SMARD → Energy-Charts) --------------------------
# SMARD fallback: 15-min DA since Oct 1, 2025.  [2](https://www.entsoe.eu/network_codes/bzr/)
def smard_index(filter_id, resolution="quarterhour", region="DE"):
    r = requests.get(f"{SMARD_BASE}/{filter_id}/{region}/index_{resolution}.json", timeout=30)
    r.raise_for_status(); return r.json()

def smard_series(filter_id, ts_token, resolution="quarterhour", region="DE"):
    r = requests.get(f"{SMARD_BASE}/{filter_id}/{region}/{filter_id}_{region}_{resolution}_{ts_token}.json", timeout=30)
    r.raise_for_status(); return r.json()

def get_smard_da_qh():
    for fid in [4169, 8004169]:  # community-known ids (may change)
        try:
            idx = smard_index(fid, "quarterhour", "DE")
            if idx.get("timestamps"):
                token = idx["timestamps"][-1]
                data = smard_series(fid, token, "quarterhour", "DE")
                df = pd.DataFrame(data.get("series", []), columns=["unix_ms", "eur_per_mwh"])
                if not df.empty:
                    df["timestamp"] = pd.to_datetime(df["unix_ms"], unit="ms", utc=True).dt.tz_convert(TZ_LOCAL)
                    return df.set_index("timestamp")[["eur_per_mwh"]].rename(columns={"eur_per_mwh":"da_eur_per_mwh"}).sort_index()
        except Exception: pass
    return pd.DataFrame(columns=["da_eur_per_mwh"])

# Energy-Charts fallback: public API (no token).  [3](http://www.world-timedate.com/dst/daylight_saving_time.php?city_id=162)
def get_energy_charts_da(country="DE", year=None):
    if year is None: year = date.today().year
    for url in [f"{ENERGY_CHARTS_BASE}/price_day_ahead?country={country}&year={year}",
                f"{ENERGY_CHARTS_BASE}/price_spot?country={country}&year={year}"]:
        try:
            r = requests.get(url, timeout=30)
            if r.ok:
                js = r.json(); vals = js.get("price", js.get("values"))
                if js.get("unix_seconds") and vals:
                    ts = pd.to_datetime(js["unix_seconds"], unit="s", utc=True).tz_convert(TZ_LOCAL)
                    return pd.DataFrame({"da_eur_per_mwh": vals}, index=ts).sort_index()
        except Exception: pass
    return pd.DataFrame(columns=["da_eur_per_mwh"])

def get_da_series(start_local, end_local):
    """Get DA series from ENTSO-E; fallback to SMARD/Energy-Charts; normalize to 15-min."""
    try:
        s_br, e_br = to_brussels(start_local), to_brussels(end_local)
        da = pc.query_day_ahead_prices(country_code=ZONE_CODE_ENTSOE, start=s_br, end=e_br) # DIRECT CALL
        da = da.rename("da_eur_per_mwh").to_frame()
    except Exception as e:
        print("ENTSO-E DA failed:", repr(e))
        da = get_smard_da_qh()
        if da.empty:
            da = get_energy_charts_da("DE")
    idx_qh = make_qh_index(start_local, end_local)
    return da.resample("15min").mean().ffill().bfill().reindex(idx_qh)

# -------------------------- DA-related features (ENTSO-E forecasts, calendar) --------------------------
def get_load_forecast(start_local, end_local):
    """Robust: returns Series or empty on failure."""
    try:
        s_br, e_br = to_brussels(start_local), to_brussels(end_local)
        ser = pc.query_load_forecast(country_code=ZONE_CODE_ENTSOE, start=s_br, end=e_br) # DIRECT CALL
        return ser.rename("load_fcst_mw")
    except Exception as e:
        print("ENTSO-E load forecast failed:", repr(e))
        return pd.Series(dtype=float, name="load_fcst_mw")

def get_wind_solar_forecast(start_local, end_local):
    """Robust: returns RES forecast proxy or empty on failure."""
    try:
        s_br, e_br = to_brussels(start_local), to_brussels(end_local)
        df = pc.query_wind_and_solar_forecast(country_code=ZONE_CODE_ENTSOE, start=s_br, end=e_br) # DIRECT CALL
        if isinstance(df, pd.DataFrame) and not df.empty:
            ser = df.select_dtypes(include=[np.number]).sum(axis=1)
            ser.name = "res_fcst_mw"
            return ser
        elif isinstance(df, pd.Series):
            return df.rename("res_fcst_mw")
    except Exception as e:
        print("ENTSO-E wind/solar forecast failed:", repr(e))
    return pd.Series(dtype=float, name="res_fcst_mw")

def build_feature_store(start_local=TRAIN_START, end_local=TRAIN_END):
    idx_qh = make_qh_index(start_local, end_local)
    da = get_da_series(start_local, end_local)
    load_fcst = get_load_forecast(start_local, end_local)
    res_fcst  = get_wind_solar_forecast(start_local, end_local)

    def to_qh(df_or_ser):
        if df_or_ser is None or len(df_or_ser)==0:
            if isinstance(df_or_ser, pd.Series) and df_or_ser.name:
                df = pd.DataFrame(index=idx_qh, columns=[df_or_ser.name])
                return df.astype(float)
            return pd.DataFrame(index=idx_qh)
        df = df_or_ser if isinstance(df_or_ser, pd.DataFrame) else df_or_ser.to_frame()
        return df.resample("15min").mean().ffill().bfill().reindex(idx_qh)

    X = to_qh(da)
    X = X.join(to_qh(load_fcst))
    X = X.join(to_qh(res_fcst))

    # Calendar features
    X["hour"] = X.index.hour
    X["quarter"] = (X.index.minute // 15)
    X["dow"] = X.index.dayofweek
    X["month"] = X.index.month
    X["is_weekend"] = X["dow"].isin([5,6]).astype(int)

    # Lags (yesterday & weekly)
    for lag_qh in [1, 4, 96, 96*7]:  # 15m, 1h, 1 day, 1 week
        X[f"da_lag_{lag_qh}"] = X["da_eur_per_mwh"].shift(lag_qh)

    # Ensure expected feature columns exist even if forecasts failed
    expected_cols = ["load_fcst_mw","res_fcst_mw","hour","quarter","dow","month","is_weekend",
                     "da_lag_1","da_lag_4","da_lag_96","da_lag_672"]
    for c in expected_cols:
        if c not in X.columns:
            X[c] = 0.0

    path = f"{FEATURE_STORE_DIR}/da_features_qh.parquet"
    X.to_parquet(path)
    print(f"✅ DA feature store saved: {path} (shape={X.shape})")
    return X

features = build_feature_store()

# -------------------------- Train DA model (quantile LightGBM) --------------------------
def make_supervised_da(df, target_col="da_eur_per_mwh"):
    base = df.dropna(subset=[target_col]).copy()
    y = base[target_col]
    X = base.drop(columns=[target_col])
    num_cols = X.select_dtypes(include=[np.number]).columns
    return X[num_cols], y

X_da, y_da = make_supervised_da(features)

# Temporal split: last 15% validation
# Correction: Convert X_tr and X_va to NumPy arrays (.values)
cut = int(0.85 * len(X_da))
# X_tr and X_va are now NumPy arrays (no feature names)
X_tr, X_va = X_da.iloc[:cut].values, X_da.iloc[cut:].values
y_tr, y_va = y_da.iloc[:cut], y_da.iloc[cut:]

def train_q_model(Xtr, ytr, alpha):
    params = dict(objective="quantile", alpha=alpha, n_estimators=500, learning_rate=0.05, num_leaves=63)
    model = lgb.LGBMRegressor(**params)
    pipe = Pipeline([("scaler", StandardScaler(with_mean=False)), ("lgb", model)])
    pipe.fit(Xtr, ytr)
    return pipe

mdl_p50 = train_q_model(X_tr, y_tr, 0.5)
mdl_p10 = train_q_model(X_tr, y_tr, 0.1)
mdl_p90 = train_q_model(X_tr, y_tr, 0.9)

def pinball_loss(y_true, y_pred, q):
    e = y_true - y_pred
    return np.mean(np.maximum(q*e, (q-1)*e))

p50_va = mdl_p50.predict(X_va)
p10_va = mdl_p10.predict(X_va)
p90_va = mdl_p90.predict(X_va)

print("Validation MAE (p50):", float(np.mean(np.abs(y_va - p50_va))))
print("Pinball@10:", float(pinball_loss(y_va, p10_va, 0.1)),
      "Pinball@50:", float(pinball_loss(y_va, p50_va, 0.5)),
      "Pinball@90:", float(pinball_loss(y_va, p90_va, 0.9)))

joblib.dump({"p50": mdl_p50, "p10": mdl_p10, "p90": mdl_p90,
             "cols": X_da.columns.tolist()}, f"{MODEL_DIR}/da_models_qh.joblib")
print("✅ DA models saved.")

# -------------------------- Forecast for a given delivery day --------------------------
def build_features_for_day(delivery_local_date: pd.Timestamp):
    start_t = delivery_local_date
    end_t   = delivery_local_date + pd.Timedelta(days=1)
    idx_qh  = make_qh_index(start_t, end_t, TZ_LOCAL)

    # Forecast drivers for that delivery day
    load_fcst_t = get_load_forecast(start_t, end_t)
    res_fcst_t  = get_wind_solar_forecast(start_t, end_t)

    # Yesterday DA lags relative to delivery day (align by +1 day)
    y_start = start_t - pd.Timedelta(days=1)
    y_end   = end_t   - pd.Timedelta(days=1)
    da_yday = get_da_series(y_start, y_end)
    yday_shifted = da_yday["da_eur_per_mwh"].copy()
    yday_shifted.index = yday_shifted.index + pd.Timedelta(days=1)

    def to_qh(df_or_ser):
        if df_or_ser is None or len(df_or_ser)==0:
            if isinstance(df_or_ser, pd.Series) and df_or_ser.name:
                df = pd.DataFrame(index=idx_qh, columns=[df_or_ser.name])
                return df.astype(float)
            return pd.DataFrame(index=idx_qh)
        df = df_or_ser if isinstance(df_or_ser, pd.DataFrame) else df_or_ser.to_frame()
        return df.resample("15min").mean().ffill().bfill().reindex(idx_qh)

    X = pd.DataFrame(index=idx_qh)
    X = X.join(to_qh(load_fcst_t).rename(columns={"load_fcst_mw":"load_fcst_mw"}))
    X = X.join(to_qh(res_fcst_t).rename(columns={"res_fcst_mw":"res_fcst_mw"}))

    # Calendar features
    X["hour"] = X.index.hour
    X["quarter"] = (X.index.minute // 15)
    X["dow"] = X.index.dayofweek
    X["month"] = X.index.month
    X["is_weekend"] = X["dow"].isin([5,6]).astype(int)

    # Lags aligned
    lag_base = yday_shifted.reindex(idx_qh)
    X["da_lag_96"]  = lag_base
    X["da_lag_1"]   = lag_base.shift(1)
    X["da_lag_4"]   = lag_base.shift(4)
    X["da_lag_672"] = lag_base.shift(96*7)

  # Ensure all training columns exist
    cols = joblib.load(f"{MODEL_DIR}/da_models_qh.joblib")["cols"]
    for c in cols:
        if c not in X.columns:
            X[c] = 0.0
    # Line 405 correction: Use ffill() and bfill() directly
    X = X[cols].ffill().bfill().fillna(0.0) 
    return X

def forecast_for_day(delivery_local_date: pd.Timestamp):
    models = joblib.load(f"{MODEL_DIR}/da_models_qh.joblib")
    X_day = build_features_for_day(delivery_local_date)
    p50 = models["p50"].predict(X_day[models["cols"]].values) 
    p10 = models["p10"].predict(X_day[models["cols"]].values)
    p90 = models["p90"].predict(X_day[models["cols"]].values)

    out = pd.DataFrame({
        "timestamp_local": X_day.index,
        "price_p10_eur_per_mwh": np.round(p10, 2),
        "price_p50_eur_per_mwh": np.round(p50, 2),
        "price_p90_eur_per_mwh": np.round(p90, 2)
    }).set_index("timestamp_local")
    return out

# -------------------------- Accuracy: previous day actual vs predicted --------------------------
def prev_day_actual_vs_pred(prev_local_date: pd.Timestamp):
    # Actual DA for prev day (open-data chain)
    actual_df = get_da_series(prev_local_date, prev_local_date + pd.Timedelta(days=1))
    actual_df = actual_df.rename(columns={"da_eur_per_mwh":"actual_da_eur_per_mwh"})

    # Predicted for prev day
    pred_df = forecast_for_day(prev_local_date).rename(columns={
        "price_p50_eur_per_mwh":"pred_p50_eur_per_mwh",
        "price_p10_eur_per_mwh":"pred_p10_eur_per_mwh",
        "price_p90_eur_per_mwh":"pred_p90_eur_per_mwh"
    })

    # Join and compute errors
    comp = pred_df.join(actual_df, how="left")
    comp["abs_error"]  = np.abs(comp["actual_da_eur_per_mwh"] - comp["pred_p50_eur_per_mwh"])
    comp["sq_error"]   = (comp["actual_da_eur_per_mwh"] - comp["pred_p50_eur_per_mwh"])**2
    comp["in_band"]    = ((comp["actual_da_eur_per_mwh"] >= comp["pred_p10_eur_per_mwh"]) &
                          (comp["actual_da_eur_per_mwh"] <= comp["pred_p90_eur_per_mwh"])).astype(int)

    mae  = float(comp["abs_error"].mean())
    rmse = float(np.sqrt(comp["sq_error"].mean()))
    coverage = float(100.0 * comp["in_band"].mean())
    print(f"✅ Prev-day metrics → MAE: {mae:.3f} EUR/MWh | RMSE: {rmse:.3f} EUR/MWh | Coverage (p10–p90): {coverage:.2f}%")
    return comp

# -------------------------- Two separate CSV outputs --------------------------
# 1) Forecast for TOMORROW → CSV
forecast_tomorrow_df = forecast_for_day(TOMORROW_LOCAL)
csv_forecast_path = f"{ARTIFACTS_DIR}/DA_forecast_DE-LU_{TOMORROW_LOCAL.date().isoformat()}.csv"
forecast_tomorrow_df.to_csv(csv_forecast_path, index=True)
print(f"✅ Saved DA forecast CSV (tomorrow): {csv_forecast_path} (rows={len(forecast_tomorrow_df)})")

# 2) Previous day actual vs predicted → CSV
prev_comp_df = prev_day_actual_vs_pred(PREV_LOCAL_DATE)
csv_accuracy_path = f"{ARTIFACTS_DIR}/DA_prev_day_actual_vs_pred_DE-LU_{PREV_LOCAL_DATE.date().isoformat()}.csv"
prev_comp_df.to_csv(csv_accuracy_path, index=True)


Note: you may need to restart the kernel to use updated packages.
✅ Notebook boot OK.
✅ ENTSO-E token detected.
✅ Config loaded.
ENTSO-E load forecast failed: TypeError("'str' object is not callable")
✅ DA feature store saved: ./feature_store/da_features_qh.parquet (shape=(39771, 12))
[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.001430 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 1325
[LightGBM] [Info] Number of data points in the train set: 33805, number of used features: 10
[LightGBM] [Info] Start training from score 95.599998


  updated_mean = (last_sum + new_sum) / updated_sample_count
  T = new_sum / new_sample_count
  new_unnormalized_variance -= correction**2 / new_sample_count
  updated_mean = (last_sum + new_sum) / updated_sample_count
  T = new_sum / new_sample_count
  new_unnormalized_variance -= correction**2 / new_sample_count


[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.001500 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 1325
[LightGBM] [Info] Number of data points in the train set: 33805, number of used features: 10
[LightGBM] [Info] Start training from score 3.900000


  updated_mean = (last_sum + new_sum) / updated_sample_count
  T = new_sum / new_sample_count
  new_unnormalized_variance -= correction**2 / new_sample_count


[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.002355 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 1325
[LightGBM] [Info] Number of data points in the train set: 33805, number of used features: 10
[LightGBM] [Info] Start training from score 144.610001




Validation MAE (p50): 8.869997884428805
Pinball@10: 3.181057764373718 Pinball@50: 4.434998942214403 Pinball@90: 3.5834322034595587
✅ DA models saved.
ENTSO-E load forecast failed: NoMatchingDataError()
ENTSO-E wind/solar forecast failed: NoMatchingDataError()




✅ Saved DA forecast CSV (tomorrow): ./artifacts/DA_forecast_DE-LU_2025-11-25.csv (rows=97)
ENTSO-E load forecast failed: TypeError("'str' object is not callable")
✅ Prev-day metrics → MAE: 19.053 EUR/MWh | RMSE: 21.944 EUR/MWh | Coverage (p10–p90): 9.28%


