# MARBL — UK Balancing & Wholesale (with BE/NL backups) + Spike Analysis

Purpose
- **Primary:** UK Balancing (Elexon, keyless) and UK Wholesale Day-Ahead (ENTSO-E token).
- **Backup:** Belgium Balancing (Elia Opendatasoft) + Day-Ahead (ENTSO-E), Netherlands Day-Ahead (ENTSO-E), NL Balancing stub (TenneT when key is approved).
- **Outputs:** Idempotent CSVs, robust availability snapshot, and **spike analysis** (incl. Brexit-related windows).

Tokens / Keys
- ENTSO-E: set `ENTSOE_TOKEN` (required for day-ahead prices).
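- TenneT (NL Balancing): set `TENNET_API_KEY` (optional; only needed once access is approved). The request header name defaults to `Ocp-Apim-Subscription-Key` and can be overridden with `TENNET_KEY_HEADER`.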

In [None]:
# Switches (enable/disable data sources)
# Each flag is checked by the "WHOLESALE runs" / "BALANCING runs" cells before
# the corresponding fetch is attempted.
RUN_GB_BAL = True    # Elexon system prices (balancing, 30-min)
RUN_GB_DA  = True    # ENTSO-E Day-Ahead (GB zone)

RUN_BE_BAL = True    # Elia ods133 (imbalance price, 1-min)
RUN_BE_DA  = True    # ENTSO-E Day-Ahead (BE zone)

# NOTE(review): RUN_NL_BAL is not read anywhere in the visible code — the NL
# balancing path is only a header stub so far.
RUN_NL_BAL = False   # TenneT (requires key; leave False until approved)
RUN_NL_DA  = True    # ENTSO-E Day-Ahead (NL zone)

# Deps
try:
    import requests  # noqa: F401
except ModuleNotFoundError:
    import sys, subprocess
    subprocess.check_call([sys.executable, "-m", "pip", "install", "requests", "--quiet"])
    import requests  # noqa: F401

from pathlib import Path
from datetime import datetime, timedelta, timezone
import pandas as pd
import os
import io

# Output layout: ./data/<dataset>/ — every folder is created up front, and
# re-running the cell is harmless because existing folders are accepted.
DATA_DIR = Path("./data")
DATA_DIR.mkdir(parents=True, exist_ok=True)

GB_BAL_DIR = DATA_DIR.joinpath("gb_bal")
GB_BAL_DIR.mkdir(exist_ok=True)
GB_DA_DIR = DATA_DIR.joinpath("gb_da")
GB_DA_DIR.mkdir(exist_ok=True)

BE_BAL_DIR = DATA_DIR.joinpath("be_bal")
BE_BAL_DIR.mkdir(exist_ok=True)
BE_DA_DIR = DATA_DIR.joinpath("be_da")
BE_DA_DIR.mkdir(exist_ok=True)

NL_BAL_DIR = DATA_DIR.joinpath("nl_bal")
NL_BAL_DIR.mkdir(exist_ok=True)
NL_DA_DIR = DATA_DIR.joinpath("nl_da")
NL_DA_DIR.mkdir(exist_ok=True)

# HTTP session
import requests
# One shared session so every request carries the same User-Agent.
SESSION = requests.Session()
SESSION.headers["User-Agent"] = "MARBL-smoketest/4.0"
DEFAULT_TIMEOUT = 45  # passed as `timeout` to SESSION.get when the caller gives none

# Tokens — read from the environment; a missing variable becomes "".
ENTSOE_TOKEN = os.environ.get("ENTSOE_TOKEN", "").strip()
TENNET_API_KEY = os.environ.get("TENNET_API_KEY", "").strip()
# Name of the HTTP header that carries the TenneT key (alt: X-API-Key).
TENNET_KEY_HEADER = os.environ.get("TENNET_KEY_HEADER", "Ocp-Apim-Subscription-Key")

def log(msg: str) -> None:
    """Print *msg* to stdout, prefixed with the ``[MARBL]`` tag."""
    print("[MARBL]", msg)

# Report only whether each credential is present (bool), never the secret value itself.
print(f"[TOKENS] ENTSOE_TOKEN set: {bool(ENTSOE_TOKEN)} | TENNET_API_KEY set: {bool(TENNET_API_KEY)}")

In [None]:
# Time window (UTC). Use explicit dates or LAST_N_DAYS.
FROM_DATE = None     # e.g., "2019-12-01"
TO_DATE   = None     # e.g., "2020-02-01"
LAST_N_DAYS = 5      # used if FROM/TO are not set


def compute_window(from_date=None, to_date=None, last_n_days=None):
    """Return the ``(start, end)`` UTC datetimes of the analysis window.

    Args:
        from_date: ISO date string for the first day; defaults to ``FROM_DATE``.
        to_date: ISO date string for the last day (inclusive); defaults to
            ``TO_DATE``.
        last_n_days: Look-back length used when the explicit dates are not
            both set; defaults to ``LAST_N_DAYS``.

    Returns:
        A pair of timezone-aware datetimes. With explicit dates, ``end`` is
        midnight UTC of the day *after* ``to_date`` so the whole last day is
        covered by a half-open ``[start, end)`` interval. Otherwise ``end`` is
        the current UTC time truncated to the minute and ``start`` lies
        ``last_n_days`` days before it.
    """
    from_date = FROM_DATE if from_date is None else from_date
    to_date = TO_DATE if to_date is None else to_date
    last_n_days = LAST_N_DAYS if last_n_days is None else last_n_days

    # Explicit dates are only honoured when both are given; a single bound
    # falls through to the rolling look-back.
    if from_date and to_date:
        start = datetime.fromisoformat(from_date).replace(tzinfo=timezone.utc)
        end = datetime.fromisoformat(to_date).replace(tzinfo=timezone.utc) + timedelta(days=1)
        return start, end

    # datetime.replace() has no `micro_ok` keyword; zero the sub-minute fields
    # explicitly so file-name stamps derived from the window stay stable.
    end = datetime.now(timezone.utc).replace(second=0, microsecond=0)
    return end - timedelta(days=last_n_days), end

# Resolve the window once when this cell runs; the DA and BE balancing cells
# below reuse these two bounds for their requests and file-name stamps.
WINDOW_START, WINDOW_END = compute_window()
log(f"Window: {WINDOW_START.isoformat()} .. {WINDOW_END.isoformat()}")

In [None]:
def http_get(url: str, params=None, headers=None, timeout=DEFAULT_TIMEOUT):
    """GET *url* through the shared ``SESSION`` and return the response.

    ``params``, ``headers`` and ``timeout`` are forwarded unchanged to
    ``SESSION.get``. ``raise_for_status()`` is called on the response before
    it is handed back, so error statuses surface as exceptions.
    """
    response = SESSION.get(url, params=params, headers=headers, timeout=timeout)
    response.raise_for_status()
    return response

def to_utc(ts):
    """Coerce *ts* to a timezone-aware datetime.

    * ``None`` is passed through.
    * ``datetime`` objects that already carry a tzinfo are returned as-is
      (no conversion); naive ones are tagged as UTC.
    * Anything else is handed to ``pd.to_datetime(..., utc=True,
      errors="coerce")`` and converted with ``to_pydatetime()``; if that
      raises, ``None`` is returned.
    """
    if ts is None:
        return None

    if isinstance(ts, datetime):
        if ts.tzinfo:
            return ts
        return ts.replace(tzinfo=timezone.utc)

    try:
        parsed = pd.to_datetime(ts, utc=True, errors="coerce")
        return parsed.to_pydatetime()
    except Exception:
        return None

def write_csv_no_dups(df: pd.DataFrame, path: Path, key_cols):
    """Merge *df* into the CSV at *path*, dropping duplicate key rows.

    Args:
        df: New rows to persist. ``None`` or an empty frame writes nothing.
        path: Target CSV; its parent directory is created if needed.
        key_cols: Column names that identify a row. Names that are absent
            from the data are ignored. Key columns whose name contains
            ``"ts"`` are parsed as UTC timestamps so that rows read back from
            disk compare equal to freshly fetched ones.

    Returns:
        ``(written, removed)`` — whether a file was written, and how many
        rows were dropped as duplicates (existing + new rows combined).
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    if df is None or df.empty:
        return False, 0

    def _parse_ts_keys(frame: pd.DataFrame) -> pd.DataFrame:
        # Normalise timestamp-like key columns in place and hand the frame back.
        for col in key_cols:
            if col in frame.columns and "ts" in col:
                frame[col] = pd.to_datetime(frame[col], utc=True, errors="coerce")
        return frame

    merged = _parse_ts_keys(df.copy())
    if path.exists():
        try:
            merged = pd.concat([_parse_ts_keys(pd.read_csv(path)), merged], ignore_index=True)
        except Exception as e:
            # Best effort: an unreadable old file is replaced by the new rows.
            log(f"WARN: reading existing {path.name} failed: {e}")

    before = len(merged)
    # Sort and de-duplicate on the keys that actually exist; sorting on a
    # missing column would raise KeyError.
    present = [c for c in key_cols if c in merged.columns]
    if present:
        merged = merged.drop_duplicates(subset=present).sort_values(present)
    else:
        merged = merged.drop_duplicates()
    removed = before - len(merged)

    # Write to a sibling temp file first, then swap it in, so a failed write
    # does not leave a truncated CSV behind.
    tmp = path.with_suffix(path.suffix + ".tmp")
    merged.to_csv(tmp, index=False)
    tmp.replace(path)
    return True, removed

def qc_print(df: pd.DataFrame, label: str, val_col: str | None):
    if df is None or df.empty:
        print(f"{label}: empty")
        return
    dups = df.duplicated(subset=[c for c in ["ts_utc", val_col] if c in df.columns]).sum()
    nas  = df[val_col].isna().sum() if val_col and val_col in df.columns else "NA"
    tmin = df["ts_utc"].min() if "ts_utc" in df.columns else None
    tmax = df["ts_utc"].max() if "ts_utc" in df.columns else None
    print(f"{label}: rows={len(df)} dups={dups} na={nas} ts=[{tmin} .. {tmax}]")
    display(df.head(8))

def scan_dir_safe(dir_path: Path, label: str, ts_candidates=("ts_utc","start_time","time")) -> pd.DataFrame:
    """Summarise every ``*.csv`` in *dir_path* without ever raising on a bad file.

    Args:
        dir_path: Folder to scan (non-recursive).
        label: Value stored in the ``dataset`` column of every row.
        ts_candidates: Column names tried, in order, as the timestamp column.

    Returns:
        One row per file with columns ``dataset, file, rows, min_ts, max_ts``.
        Files that cannot be read keep ``rows``/``min_ts``/``max_ts`` empty;
        files without a timestamp column keep only the timestamps empty.
        The columns are present even when the folder holds no CSV, so callers
        can sort or group the result unconditionally.
    """
    columns = ["dataset", "file", "rows", "min_ts", "max_ts"]
    rows = []
    for p in sorted(dir_path.glob("*.csv")):
        entry = {"dataset": label, "file": p.name, "rows": None, "min_ts": None, "max_ts": None}
        try:
            df = pd.read_csv(p)
            entry["rows"] = int(len(df))
            tcol = next((c for c in ts_candidates if c in df.columns), None)
            if tcol:
                stamps = pd.to_datetime(df[tcol], utc=True, errors="coerce")
                entry["min_ts"], entry["max_ts"] = stamps.min(), stamps.max()
        except Exception:
            # Unreadable file: still list it, but with no statistics at all.
            entry.update(rows=None, min_ts=None, max_ts=None)
        rows.append(entry)
    return pd.DataFrame(rows, columns=columns)

In [None]:
# GB Balancing — Elexon system prices (30-min), keyless
ELEXON_JSON = "https://data.elexon.co.uk/bmrs/api/v1/system-price"
ELEXON_CSV  = "https://data.elexon.co.uk/bmrs/api/v1/system-price/csv"


def _first_present(mapping: dict, keys):
    """Return the first value among *keys* that is not ``None`` (0 counts as present)."""
    for key in keys:
        value = mapping.get(key)
        if value is not None:
            return value
    return None


def gb_system_prices_day(day: str) -> pd.DataFrame:
    """Fetch one day of GB system prices, JSON first and CSV as fallback.

    Args:
        day: Calendar day as ``YYYY-MM-DD``; the request spans
            ``00:00:00Z`` .. ``23:59:59Z`` of that day.

    Returns:
        A frame with ``ts_utc`` and ``sp_gbp_mwh`` sorted by time. An empty
        frame is returned when both endpoints fail; errors are logged, never
        raised.
    """
    start = f"{day}T00:00:00Z"; end = f"{day}T23:59:59Z"
    try:
        r = http_get(ELEXON_JSON, params={"from": start, "to": end}, timeout=30)
        if "application/json" in r.headers.get("content-type","").lower():
            js = r.json()
            # The payload may be a bare list or a dict wrapping the records.
            items = js
            if isinstance(js, dict):
                items = js.get("data") or js.get("items") or js
                if isinstance(items, dict):
                    items = items.get("results", [])
            rows = []
            for it in (items if isinstance(items, list) else []):
                if not isinstance(it, dict):
                    continue
                ts = _first_present(it, ("settlementDateTime", "time", "timestamp"))
                # Explicit None test: a price of exactly 0 is valid data and
                # must not be skipped the way an `or` chain would skip it.
                sp = _first_present(it, ("systemPrice", "system_price", "price"))
                if ts is None or sp is None:
                    continue
                rows.append({"ts_utc": to_utc(ts), "sp_gbp_mwh": float(sp)})
            if rows:
                df = pd.DataFrame(rows).dropna(subset=["ts_utc"]).sort_values("ts_utc")
                if not df.empty:
                    return df
    except Exception as e:
        log(f"GB JSON error: {e}")
    # JSON gave nothing usable -> try the CSV variant of the same endpoint.
    try:
        r = http_get(ELEXON_CSV, params={"from": start, "to": end}, timeout=30)
        df = pd.read_csv(io.StringIO(r.text))
        cols = {c.lower(): c for c in df.columns}
        # Unknown headers fall back to first column = time, last column = price.
        ts_col = cols.get("settlementdatetime") or cols.get("time") or cols.get("timestamp") or list(df.columns)[0]
        price_col = cols.get("systemprice") or cols.get("price") or list(df.columns)[-1]
        out = pd.DataFrame({
            "ts_utc": pd.to_datetime(df[ts_col], utc=True, errors="coerce"),
            "sp_gbp_mwh": pd.to_numeric(df[price_col], errors="coerce")
        }).dropna(subset=["ts_utc"]).sort_values("ts_utc")
        return out
    except Exception as e:
        log(f"GB CSV error: {e}")
        return pd.DataFrame()

In [None]:
# ENTSO-E Day-Ahead (Wholesale) — requires ENTSOE_TOKEN
from xml.etree import ElementTree as ET

# Endpoint queried by entsoe_da_prices().
ENTSOE_BASE = "https://web-api.tp.entsoe.eu/api"
# Zone short name -> area code of the kind passed as in_Domain/out_Domain.
BZN = {"GB": "10YGB----------A", "BE": "10YBE----------2", "NL": "10YNL----------L"}

def _ti(dt: datetime) -> str:
    """Format *dt* as ``YYYYMMDDHHMM`` for the ``timeInterval`` parameter.

    Timezone-aware values are converted to UTC first, so an aware non-UTC
    datetime no longer leaks its local wall-clock time into the request.
    Naive values are formatted unchanged (they are taken to be UTC already).
    """
    if dt.tzinfo is not None and dt.utcoffset() is not None:
        dt = dt.astimezone(timezone.utc)
    return dt.strftime("%Y%m%d%H%M")

def _resolution_step(text: str) -> pd.Timedelta:
    """Turn a Period ``resolution`` (ISO-8601 duration, e.g. ``PT15M``) into a step; default 1 h."""
    default = pd.Timedelta(hours=1)
    if not text:
        return default
    try:
        step = pd.Timedelta(text)
    except (ValueError, TypeError):
        return default
    if pd.isna(step) or step <= pd.Timedelta(0):
        return default
    return step


def _parse_da_document(xml_text: str) -> pd.DataFrame:
    """Extract ``ts_utc`` / ``da_price`` rows from a price document."""
    root = ET.fromstring(xml_text)
    rows = []
    # "{*}" matches the tag in any namespace (or none), so namespaced and
    # plain documents are handled by the same paths.
    for series in root.iterfind(".//{*}TimeSeries"):
        for period in series.iterfind(".//{*}Period"):
            start_ts = pd.to_datetime(
                period.findtext("./{*}timeInterval/{*}start", default=""),
                utc=True, errors="coerce",
            )
            if pd.isna(start_ts):
                continue
            # Points are spaced by the Period's own resolution, not always 1 h.
            step = _resolution_step(period.findtext("./{*}resolution", default=""))
            for point in period.iterfind("./{*}Point"):
                pval = point.findtext("./{*}price.amount", default="")
                if not pval:
                    continue
                try:
                    offset = int(point.findtext("./{*}position", default="")) - 1
                except (TypeError, ValueError):
                    # A point without a usable position cannot be placed in time.
                    continue
                rows.append({"ts_utc": start_ts + offset * step,
                             "da_price": pd.to_numeric(pval, errors="coerce")})
    if not rows:
        return pd.DataFrame(columns=["ts_utc", "da_price"])
    return pd.DataFrame(rows).dropna(subset=["ts_utc"]).sort_values("ts_utc")


def entsoe_da_prices(bzn: str, start_dt: datetime, end_dt: datetime) -> pd.DataFrame:
    """Fetch day-ahead prices (documentType A44) for one area code.

    Args:
        bzn: Area code used for both ``in_Domain`` and ``out_Domain``.
        start_dt: Start of the requested interval.
        end_dt: End of the requested interval.

    Returns:
        Frame with ``ts_utc`` and ``da_price`` sorted by time. Empty (with
        those two columns) when no token is configured or when the response
        holds no price points.

    Raises:
        Whatever ``http_get`` raises on transport/HTTP errors, and
        ``xml.etree.ElementTree.ParseError`` on a malformed body.
    """
    if not ENTSOE_TOKEN:
        log("ENTSOE_TOKEN not set -> skip DA")
        return pd.DataFrame(columns=["ts_utc", "da_price"])
    params = {
        "securityToken": ENTSOE_TOKEN, "documentType": "A44",
        "in_Domain": bzn, "out_Domain": bzn,
        "timeInterval": f"{_ti(start_dt)}-{_ti(end_dt)}",
    }
    r = http_get(ENTSOE_BASE, params=params, timeout=60)
    return _parse_da_document(r.text)

In [None]:
# BE Balancing — Elia ods133 (fast: v2.1 where + CSV fallback + daily chunking)
# Two access paths to the same dataset: the records endpoint is read as JSON,
# the download endpoint as CSV.
ELIA_V21 = "https://opendata.elia.be/api/explore/v2.1/catalog/datasets/ods133/records"
ELIA_CSV = "https://opendata.elia.be/explore/dataset/ods133/download/"
# Lower bound applied to every BE request: earlier start times are clamped to it.
# NOTE(review): presumably the first timestamp the dataset offers — confirm on the portal.
ELIA_EARLIEST = datetime(2024, 5, 22, tzinfo=timezone.utc)

def _isoz(dt: datetime) -> str:
    """Render *dt*, converted to UTC, as ``YYYY-MM-DDTHH:MM:SSZ``."""
    as_utc = dt.astimezone(timezone.utc)
    return f"{as_utc:%Y-%m-%dT%H:%M:%S}Z"

# Limits of the Opendatasoft /records endpoint (see its API documentation):
# `limit` may not exceed 100 and offset + limit must stay within 10 000.
_ODS_MAX_LIMIT = 100
_ODS_MAX_WINDOW = 10_000


def be_ods133_v21_range(start_dt: datetime, end_dt: datetime, page_rows=10000, timeout=60) -> pd.DataFrame:
    """Page through the v2.1 records endpoint for ``[start_dt, end_dt)``.

    Args:
        start_dt: Inclusive lower bound; clamped to ``ELIA_EARLIEST``.
        end_dt: Exclusive upper bound.
        page_rows: Requested page size; capped at the endpoint maximum, so
            larger values (including the default) are accepted but reduced.
        timeout: Per-request timeout forwarded to ``http_get``.

    Returns:
        Frame with ``ts_utc`` and ``price_eur_mwh``, de-duplicated on
        ``ts_utc`` and sorted. Always carries both columns, even when empty.

    Raises:
        RuntimeError: The range holds more rows than the endpoint can page
            through; raised instead of silently truncating so the caller can
            switch to the CSV export.
        Exception: Anything ``http_get`` or JSON decoding raises.
    """
    empty = pd.DataFrame(columns=["ts_utc","price_eur_mwh"])
    start_dt = max(start_dt, ELIA_EARLIEST)
    if start_dt >= end_dt:
        return empty

    limit = max(1, min(int(page_rows), _ODS_MAX_LIMIT))
    where = f'datetime >= "{_isoz(start_dt)}" AND datetime < "{_isoz(end_dt)}"'
    rows, offset = [], 0
    while True:
        if offset + limit > _ODS_MAX_WINDOW:
            raise RuntimeError(
                f"ods133 range {_isoz(start_dt)}..{_isoz(end_dt)} exceeds the "
                f"{_ODS_MAX_WINDOW}-row paging window of the records endpoint"
            )
        r = http_get(ELIA_V21, params={"where": where, "order_by": "datetime", "limit": limit,
                                       "offset": offset, "timezone": "UTC"}, timeout=timeout)
        batch = r.json().get("results", [])
        if not batch:
            break
        for it in batch:
            ts = it.get("datetime"); px = it.get("imbalance_price_eur_per_mwh")
            if ts is None or px is None:
                continue
            rows.append({"ts_utc": pd.to_datetime(ts, utc=True, errors="coerce"),
                         "price_eur_mwh": pd.to_numeric(px, errors="coerce")})
        if len(batch) < limit:
            break  # short page -> last page
        offset += limit

    if not rows:
        return empty
    return (pd.DataFrame(rows)
              .dropna(subset=["ts_utc"])
              .drop_duplicates(subset=["ts_utc"])
              .sort_values("ts_utc")
              .reset_index(drop=True))

def be_ods133_csv_range(start_dt: datetime, end_dt: datetime, timeout=90) -> pd.DataFrame:
    """Download ``[start_dt, end_dt)`` from the CSV export endpoint.

    Args:
        start_dt: Inclusive lower bound; clamped to ``ELIA_EARLIEST``.
        end_dt: Exclusive upper bound.
        timeout: Request timeout forwarded to ``http_get``.

    Returns:
        Frame with ``ts_utc`` and ``price_eur_mwh`` sorted by time and
        restricted to the requested range. Always carries both columns.

    Raises:
        Exception: Anything ``http_get`` or the CSV parser raises.
    """
    empty = pd.DataFrame(columns=["ts_utc","price_eur_mwh"])
    start_dt = max(start_dt, ELIA_EARLIEST)
    if start_dt >= end_dt:
        return empty
    params = {"format":"csv","timezone":"UTC","use_labels_for_header":"true","rows":"-1","where": f'datetime >= "{_isoz(start_dt)}" AND datetime < "{_isoz(end_dt)}"'}
    r = http_get(ELIA_CSV, params=params, timeout=timeout)
    text = r.text
    if not text.strip():
        return empty

    # The export may be ';'- or ','-separated; decide from the header line
    # instead of assuming a comma.
    header = text.lstrip().splitlines()[0]
    sep = ";" if header.count(";") > header.count(",") else ","
    raw = pd.read_csv(io.StringIO(text), sep=sep)
    if len(raw.columns) == 0:
        return empty

    cols = {c.lower(): c for c in raw.columns}
    # Unknown headers fall back to first column = time, last column = price.
    ts_col = cols.get("datetime") or cols.get("date_time") or list(raw.columns)[0]
    price_col = cols.get("imbalance price eur per mwh") or cols.get("price_eur_mwh") or list(raw.columns)[-1]
    out = pd.DataFrame({"ts_utc": pd.to_datetime(raw[ts_col], utc=True, errors="coerce"),
                        "price_eur_mwh": pd.to_numeric(raw[price_col], errors="coerce")}).dropna(subset=["ts_utc"]).sort_values("ts_utc")

    # Enforce the range locally as well, in case the server ignored the filter.
    in_range = (out["ts_utc"] >= pd.Timestamp(start_dt)) & (out["ts_utc"] < pd.Timestamp(end_dt))
    return out.loc[in_range]

def be_ods133_between_fast(start_dt: datetime, end_dt: datetime, chunk="D") -> pd.DataFrame:
    """Fetch ``[start_dt, end_dt)`` chunk by chunk, v2.1 first with CSV fallback.

    Args:
        start_dt: Inclusive lower bound (timezone-aware); clamped to
            ``ELIA_EARLIEST``.
        end_dt: Exclusive upper bound (timezone-aware).
        chunk: pandas frequency string that sets the chunk size.

    Returns:
        Frame with ``ts_utc`` and ``price_eur_mwh``, de-duplicated on
        ``ts_utc`` and sorted. A chunk for which both sources fail is logged
        and skipped, so one bad chunk no longer discards the data already
        fetched for the rest of the window.
    """
    empty = pd.DataFrame(columns=["ts_utc","price_eur_mwh"])
    cur_start = max(start_dt, ELIA_EARLIEST); cur_end = end_dt
    if cur_start >= cur_end:
        return empty

    lo = pd.Timestamp(cur_start).tz_convert("UTC")
    hi = pd.Timestamp(cur_end).tz_convert("UTC")
    # Chunk boundaries aligned to `chunk`; the first and last chunk are
    # trimmed back to the requested bounds inside the loop.
    edges = pd.date_range(lo.floor(chunk), hi.ceil(chunk), freq=chunk, tz="UTC")

    frames = []
    for left, right in zip(edges[:-1], edges[1:]):
        s = max(lo, left).to_pydatetime()
        e = min(hi, right).to_pydatetime()
        if s >= e:
            continue
        try:
            df = be_ods133_v21_range(s, e)
        except Exception as ex:
            log(f"[BE v2.1 chunk fail] {s}..{e}: {ex} -> CSV fallback")
            try:
                df = be_ods133_csv_range(s, e)
            except Exception as ex_csv:
                log(f"[BE CSV chunk fail] {s}..{e}: {ex_csv} -> chunk skipped")
                continue
        if not df.empty:
            frames.append(df)

    if not frames:
        return empty
    return (pd.concat(frames, ignore_index=True)
              .drop_duplicates(subset=["ts_utc"])
              .sort_values("ts_utc")
              .reset_index(drop=True))

In [None]:
# NL Balancing — TenneT readiness stub
TENNET_BASE = "https://developer.tennet.eu"


def tennet_headers():
    """Return the request headers for TenneT, or ``{}`` when no key is configured.

    With a key, the dict carries it under the header named by
    ``TENNET_KEY_HEADER`` plus an ``Accept: application/json`` entry.
    """
    if TENNET_API_KEY:
        return {TENNET_KEY_HEADER: TENNET_API_KEY, "Accept": "application/json"}
    return {}

In [None]:
# WHOLESALE runs (DA via ENTSO-E)
def _run_da_window(zone: str, out_dir: Path) -> pd.DataFrame:
    """Fetch, QC-print and persist the day-ahead window for one zone.

    Args:
        zone: Key into ``BZN`` ("GB", "BE" or "NL"); also used in log lines
            and in the output file name.
        out_dir: Folder that receives ``<zone>_DA_window_<stamp>.csv``.

    Returns:
        The fetched frame; empty when the token is missing or the fetch
        failed. Failures are logged instead of raised so that the remaining
        zones still run.
    """
    if not ENTSOE_TOKEN:
        log(f"{zone} DA skipped (ENTSOE_TOKEN missing).")
        return pd.DataFrame()
    try:
        df = entsoe_da_prices(BZN[zone], WINDOW_START, WINDOW_END)
    except Exception as ex:
        # HTTP error messages can embed the request URL, which contains the
        # token as a query parameter — mask it before logging.
        log(f"{zone} DA failed: {str(ex).replace(ENTSOE_TOKEN, '***')}")
        return pd.DataFrame()
    qc_print(df, f"{zone} — ENTSOE DA (window)", "da_price")
    if not df.empty:
        stamp = f"{WINDOW_START:%Y%m%d%H%M}_{WINDOW_END:%Y%m%d%H%M}"
        write_csv_no_dups(df, out_dir / f"{zone}_DA_window_{stamp}.csv", key_cols=["ts_utc","da_price"])
    return df


if RUN_GB_DA:
    gb_da_win = _run_da_window("GB", GB_DA_DIR)

if RUN_BE_DA:
    be_da_win = _run_da_window("BE", BE_DA_DIR)

if RUN_NL_DA:
    nl_da_win = _run_da_window("NL", NL_DA_DIR)

In [None]:
# BALANCING runs
# GB: always the previous UTC calendar day, independent of the analysis window.
if not RUN_GB_BAL:
    log("GB Balancing disabled.")
else:
    yday = (datetime.now(timezone.utc) - timedelta(days=1)).date().isoformat()
    gb_bal_day = gb_system_prices_day(yday)
    qc_print(gb_bal_day, "GB — Elexon Balancing (yday)", "sp_gbp_mwh")
    if not gb_bal_day.empty:
        write_csv_no_dups(
            gb_bal_day,
            GB_BAL_DIR / f"GB_bal_day_{yday.replace('-','')}.csv",
            key_cols=["ts_utc","sp_gbp_mwh"],
        )

# BE: the full analysis window, fetched in daily chunks.
if not RUN_BE_BAL:
    log("BE Balancing disabled.")
else:
    be_bal_win = be_ods133_between_fast(WINDOW_START, WINDOW_END, chunk="D")
    qc_print(be_bal_win, "BE — Elia ods133 (window, fast)", "price_eur_mwh")
    if not be_bal_win.empty:
        stamp = f"{WINDOW_START:%Y%m%d%H%M}_{WINDOW_END:%Y%m%d%H%M}"
        write_csv_no_dups(
            be_bal_win,
            BE_BAL_DIR / f"BE_bal_window_{stamp}.csv",
            key_cols=["ts_utc","price_eur_mwh"],
        )

In [None]:
# Availability snapshot
# One scan per dataset folder, in a fixed order.
frames = [
    scan_dir_safe(GB_DA_DIR,  "GB_DA"),
    scan_dir_safe(GB_BAL_DIR, "GB_BAL"),
    scan_dir_safe(BE_DA_DIR,  "BE_DA"),
    scan_dir_safe(BE_BAL_DIR, "BE_BAL"),
    scan_dir_safe(NL_DA_DIR,  "NL_DA"),
    scan_dir_safe(NL_BAL_DIR, "NL_BAL"),
]
_non_empty = [f for f in frames if f is not None and not f.empty]
if _non_empty:
    avail = (pd.concat(_non_empty, ignore_index=True)
               .sort_values(["dataset","file"])
               .reset_index(drop=True))
else:
    # Nothing on disk yet (fresh checkout / all fetches skipped): keep the
    # expected columns so sorting and coverage_summary() do not hit KeyError.
    avail = pd.DataFrame(columns=["dataset", "file", "rows", "min_ts", "max_ts"])
display(avail)

def coverage_summary(df: pd.DataFrame) -> pd.DataFrame:
    """Collapse a per-file availability table into one row per dataset.

    For every ``dataset`` the result holds the number of files, the summed
    row count and the overall earliest/latest timestamp. An empty input
    yields an empty frame that still carries the five output columns.
    """
    if df.empty:
        return pd.DataFrame(columns=["dataset","files","total_rows","min_ts","max_ts"])

    aggregations = {
        "files": ("file", "count"),
        "total_rows": ("rows", "sum"),
        "min_ts": ("min_ts", "min"),
        "max_ts": ("max_ts", "max"),
    }
    per_dataset = df.groupby("dataset").agg(**aggregations)
    return per_dataset.reset_index()

# Per-dataset roll-up of the per-file table built above.
cov = coverage_summary(avail)
display(cov)

In [None]:
# Spike detection utilities
def load_all(dir_path: Path, ts_col: str, val_col: str) -> pd.DataFrame:
    """Load and merge ``ts_col`` / ``val_col`` from every ``*.csv`` in *dir_path*.

    Args:
        dir_path: Folder to read (non-recursive, files in sorted name order).
        ts_col: Timestamp column; parsed as UTC.
        val_col: Value column.

    Returns:
        Two-column frame without NA rows, de-duplicated on ``ts_col`` (the
        first occurrence in file order wins) and sorted by time. Files that
        lack either column are ignored; files that cannot be read are logged
        and skipped rather than dropped silently.
    """
    frames = []
    for p in sorted(dir_path.glob("*.csv")):
        try:
            df = pd.read_csv(p)
            if ts_col not in df.columns or val_col not in df.columns:
                continue
            df[ts_col] = pd.to_datetime(df[ts_col], utc=True, errors="coerce")
        except Exception as ex:
            log(f"WARN: skipping unreadable {p.name}: {ex}")
            continue
        frames.append(df[[ts_col, val_col]])

    if not frames:
        return pd.DataFrame(columns=[ts_col, val_col])
    return (pd.concat(frames, ignore_index=True)
              .dropna()
              .drop_duplicates(subset=[ts_col])
              .sort_values(ts_col))

def detect_spikes(df: pd.DataFrame, ts_col: str, val_col: str, baseline_days=90, q=0.99):
    """Flag values that exceed a trailing rolling quantile.

    Args:
        df: Frame holding at least ``ts_col`` (datetime-like) and ``val_col``
            (numeric).
        ts_col: Timestamp column the time-based window is computed on.
        val_col: Value column to test.
        baseline_days: Length of the trailing window in days.
        q: Quantile (0..1) used as the threshold.

    Returns:
        For non-empty input: a time-sorted frame with ``ts_col``, ``val_col``,
        ``thr`` (the rolling quantile, which includes the current row) and
        boolean ``is_spike`` (value strictly above ``thr``). For empty input:
        *df* with an empty ``is_spike`` column added.
    """
    if df.empty:
        return df.assign(is_spike=pd.Series(dtype=bool))
    s = df[[ts_col, val_col]].copy().sort_values(ts_col).reset_index(drop=True)
    # `on=` only works on a DataFrame, so roll the frame and select the value
    # column afterwards; the caller's quantile `q` is honoured.
    window = s.rolling(f"{baseline_days}D", on=ts_col)[val_col]
    s["thr"] = window.quantile(q, interpolation="nearest")
    s["is_spike"] = s[val_col] > s["thr"]
    return s

def window_stats(spike_df: pd.DataFrame, ts_col: str, val_col: str, start_dt: datetime, end_dt: datetime):
    """Count spikes inside and outside the half-open window ``[start_dt, end_dt)``.

    Returns a dict with ``rows`` (total rows), ``spikes_win``, ``spikes_out``
    and ``ratio`` (``spikes_win / spikes_out``; ``None`` when there is no
    spike outside the window or the frame is empty). ``val_col`` is accepted
    for signature symmetry but not used.
    """
    if spike_df.empty:
        return {"rows": 0, "spikes_win": 0, "spikes_out": 0, "ratio": None}

    stamps = spike_df[ts_col]
    inside = (stamps >= start_dt) & (stamps < end_dt)
    flags = spike_df["is_spike"]

    spikes_win = int(flags[inside].sum())
    spikes_out = int(flags[~inside].sum())
    if spikes_out:
        ratio = spikes_win / spikes_out
    else:
        ratio = None

    return {
        "rows": int(len(spike_df)),
        "spikes_win": spikes_win,
        "spikes_out": spikes_out,
        "ratio": ratio,
    }

In [None]:
# Event windows (incl. Brexit) & analysis
# Each entry is a named date range given as ISO dates. Both bounds are parsed
# as midnight UTC and the end bound is compared with `<` in window_stats(),
# so the "end" date itself is not part of the window.
EVENTS = [
    {"name": "Brexit Referendum", "start": "2016-06-20", "end": "2016-06-27"},
    {"name": "Brexit Day",        "start": "2020-01-29", "end": "2020-02-03"},
    {"name": "Transition End",    "start": "2020-12-28", "end": "2021-01-03"},
    {"name": "Gas Crisis 2021",   "start": "2021-09-01", "end": "2021-10-01"},
]

def to_utc_dt(s: str) -> datetime:
    """Parse an ISO date/datetime string and stamp it as UTC.

    The tzinfo is set with ``replace``, i.e. the wall-clock fields are kept
    and any offset contained in *s* is overwritten, not converted.
    """
    parsed = datetime.fromisoformat(s)
    return parsed.replace(tzinfo=timezone.utc)

# Everything stored on disk so far, one frame per dataset.
gb_bal_all = load_all(GB_BAL_DIR, "ts_utc", "sp_gbp_mwh")
gb_da_all  = load_all(GB_DA_DIR,  "ts_utc", "da_price")
be_bal_all = load_all(BE_BAL_DIR, "ts_utc", "price_eur_mwh")
be_da_all  = load_all(BE_DA_DIR,  "ts_utc", "da_price")
nl_da_all  = load_all(NL_DA_DIR,  "ts_utc", "da_price")

# Same spike settings for every dataset.
_SPIKE_KW = {"baseline_days": 90, "q": 0.99}
gb_bal_sp = detect_spikes(gb_bal_all, "ts_utc", "sp_gbp_mwh", **_SPIKE_KW)
gb_da_sp  = detect_spikes(gb_da_all,  "ts_utc", "da_price", **_SPIKE_KW)
be_bal_sp = detect_spikes(be_bal_all, "ts_utc", "price_eur_mwh", **_SPIKE_KW)
be_da_sp  = detect_spikes(be_da_all,  "ts_utc", "da_price", **_SPIKE_KW)
nl_da_sp  = detect_spikes(nl_da_all,  "ts_utc", "da_price", **_SPIKE_KW)

# (label, spike frame, value column) in the order rows are emitted per event.
_spike_sets = [
    ("GB_BAL", gb_bal_sp, "sp_gbp_mwh"),
    ("GB_DA",  gb_da_sp,  "da_price"),
    ("BE_BAL", be_bal_sp, "price_eur_mwh"),
    ("BE_DA",  be_da_sp,  "da_price"),
    ("NL_DA",  nl_da_sp,  "da_price"),
]

rows = []
for ev in EVENTS:
    sdt, edt = to_utc_dt(ev["start"]), to_utc_dt(ev["end"])
    for ds_name, spikes, value_col in _spike_sets:
        if spikes.empty:
            continue  # datasets without data contribute no row
        stats = window_stats(spikes, "ts_utc", value_col, sdt, edt)
        rows.append({"event": ev["name"], "dataset": ds_name, **stats})

event_summary = pd.DataFrame(rows)
display(event_summary)

In [None]:
# Final narrative summary
lines = []


def add(s):
    """Append *s* to ``lines`` and echo it to stdout."""
    lines.append(s)
    print(s)


add("Summary — Data readiness & spikes")
add(f"- ENTSO-E token set: {bool(ENTSOE_TOKEN)}.")
add("- UK Balancing (Elexon) is keyless; BE Balancing (Elia) is open; NL Balancing awaits TenneT key.")
add("- Availability and coverage are above; DA 'how far back' appears after backfilling months.")

# The closing line depends on whether the event cell produced any rows.
_have_events = 'event_summary' in globals() and not event_summary.empty
add(
    "Event spike ratios (inside vs outside window) shown above per dataset; ratio>1 implies elevated spikes near the event."
    if _have_events
    else "No event-window results yet — ensure DA and/or Balancing data exist for the chosen periods."
)