In [None]:
# ==================  High-throughput Elexon bulk crawler  ==================
import asyncio, aiohttp, gzip, json, datetime as dt
from pathlib import Path
import nest_asyncio, aiolimiter
nest_asyncio.apply()

In [None]:
# Root of the Elexon BMRS REST API; every template below is appended to it.
BASE_URL = "https://data.elexon.co.uk/bmrs/api/v1"

# Endpoint code -> path-and-query template.  The code doubles as the name of
# the output sub-directory.  Templates are filled with str.format() using the
# placeholders {date}, {from_ts} and {to_ts}.
ENDPOINTS = {
    # ---- Generation ----
    "GEN_PER_TYPE": "/datasets/AGPT?publishDateTimeFrom={from_ts}&publishDateTimeTo={to_ts}",
    "INTER": "/generation/outturn/interconnectors?settlementDateFrom={date}&settlementDateTo={date}",
    # The three wind-and-solar forecasts share one path; only processType differs.
    "DAYAHEAD_GEN_WIND_SOLAR": "/forecast/generation/wind-and-solar/day-ahead?from={from_ts}&to={to_ts}&processType=day%20ahead",
    "INTRADAYPROCESS_GEN_WIND_SOLAR": "/forecast/generation/wind-and-solar/day-ahead?from={from_ts}&to={to_ts}&processType=intraday%20process",
    "INTRADAYTOTAL_GEN_WIND_SOLAR": "/forecast/generation/wind-and-solar/day-ahead?from={from_ts}&to={to_ts}&processType=intraday%20total",
    "ACTUAL_GEN_WIND_SOLAR": "/datasets/AGWS?publishDateTimeFrom={from_ts}&publishDateTimeTo={to_ts}",

    # ---- Demand ----
    "DAYAHEAD_DEMAND": "/forecast/demand/day-ahead/history?publishTime={date}",
    "INDICATED_DAYAHEAD_DEMAND": "/forecast/indicated/day-ahead/history?publishTime={date}",
    "ACTUAL_DEMAND": "/demand/outturn?settlementDateFrom={date}&settlementDateTo={date}",

    # ---- Balancing ----
    "SYSTEM_PRICES": "/balancing/settlement/system-prices/{date}",
    "BSAD": "/datasets/netbsad?from={from_ts}&to={to_ts}",
    "MID": "/datasets/mid?from={from_ts}&to={to_ts}",
    "NONBM": "/datasets/NONBM?from={from_ts}&to={to_ts}",

    # ---- Transmission ----
    "LOLPDRM": "/forecast/system/loss-of-load?from={from_ts}&to={to_ts}",
}

# Crawl window; both ends are inclusive.
START_DATE = dt.date(year=2017, month=1, day=1)
END_DATE = dt.date(year=2025, month=7, day=1)

# Root folder for the raw gzipped JSON, created on cell execution.
BASE_DIR = Path("bmrs_json_raw")
BASE_DIR.mkdir(exist_ok=True)

def _ph(day: dt.date):
    """Return placeholders dict for a given date."""
    iso = day.isoformat()
    return {
        "date"   : iso,
        "from_ts": f"{iso}T00:00:00Z",
        "to_ts"  : f"{iso}T23:59:59Z",
    }

# --------------------------------------------------------------------------
# Upper bound on simultaneously open connections; passed to
# aiohttp.TCPConnector(limit=...) in main().
CONCURRENCY = 256                 # sockets
# Two rate limiters that _fetch() acquires together before each request.
# AsyncLimiter takes (max_rate, time_period in seconds): 4500 per 60 s and
# 75 per 1 s.  Both average 75 req/s; the per-second limiter additionally caps
# bursts.  NOTE(review): confirm these figures against the API's published limits.
PER_MINUTE  = aiolimiter.AsyncLimiter(4500, 60)   
PER_SECOND  = aiolimiter.AsyncLimiter(75,   1)    
# Whole-request timeout (seconds) applied to the ClientSession built in main().
TIMEOUT     = aiohttp.ClientTimeout(total=40)
# --------------------------------------------------------------------------

async def _fetch(sess: aiohttp.ClientSession, url: str, dest: Path, log: Path):
    """Download one URL and store its JSON payload gzip-compressed at ``dest``.

    Args:
        sess: Open aiohttp session used for the request.
        url:  Fully formatted request URL.
        dest: Target ``.json.gz`` file; if it already exists nothing is done.
        log:  Text file that receives ``dest.stem`` (one per line) whenever the
              response is a dict whose ``"data"`` is an empty list.

    Behaviour:
        * 404 -> return silently, nothing written.
        * 429 -> wait 5 s and retry once; a second 429 (or a 404) gives up
          silently.
        * Any other HTTP error status raises ``aiohttp.ClientResponseError``.

    The output is written to a ``.part`` sibling and renamed into place, so an
    interrupted run cannot leave a truncated file that the ``dest.exists()``
    check would later mistake for a finished download.
    """
    if dest.exists():
        return

    for attempt in range(2):
        # acquire from **both** buckets -- the retry is rate-limited as well
        async with PER_SECOND, PER_MINUTE:
            async with sess.get(url) as r:
                if r.status == 404:
                    return
                if r.status != 429:
                    r.raise_for_status()
                    payload = await r.json()
                    break
        # Throttled (429).  The response has been released at this point, so
        # the fixed back-off below does not pin a pooled connection.
        if attempt == 0:
            await asyncio.sleep(5)
    else:
        # still throttled after the single retry: give up on this URL
        return

    if isinstance(payload, dict) and payload.get("data") == []:
        # Append instead of re-reading and rewriting the whole log each time.
        with log.open("a") as fh:
            fh.write(dest.stem + "\n")
        return

    tmp = dest.with_name(dest.name + ".part")
    with gzip.open(tmp, "wt") as fh:
        json.dump(payload, fh)
    tmp.replace(dest)


async def _all_tasks():
    """Yield ``(url, dest, log)`` for every (day, endpoint) pair to download.

    Days run from ``START_DATE`` to ``END_DATE`` inclusive; for each day every
    entry of ``ENDPOINTS`` is emitted in dict order.

    Yields:
        url:  ``BASE_URL`` plus the endpoint template filled via ``_ph(day)``.
        dest: ``BASE_DIR/<code>/<YYYY-MM-DD>.json.gz``.
        log:  ``BASE_DIR/<code>/empty.log``.

    This is an async generator only because the caller consumes it with
    ``async for``; it never awaits anything itself.
    """
    # The per-endpoint directories do not depend on the day, so create each of
    # them once up front instead of once per (day, endpoint) pair.
    subdirs = {}
    for code in ENDPOINTS:
        sub = BASE_DIR / code
        sub.mkdir(exist_ok=True)
        subdirs[code] = sub

    day = START_DATE
    while day <= END_DATE:
        ph = _ph(day)
        for code, tpl in ENDPOINTS.items():
            sub   = subdirs[code]
            url   = f"{BASE_URL}{tpl.format(**ph)}"
            dest  = sub / f"{ph['date']}.json.gz"
            log   = sub / "empty.log"
            yield url, dest, log
        day += dt.timedelta(days=1)


async def main():
    """Run the bulk crawl: one ``_fetch`` task per item from ``_all_tasks()``.

    All tasks are created up front and therefore run concurrently (bounded by
    the connector limit and the rate limiters); they are then awaited in
    slices of ``CHUNK`` purely to print progress.

    Raises:
        The first collected task exception, re-raised only after every task
        has finished.  Failures are gathered with ``return_exceptions=True``
        so that one bad request neither abandons the remaining downloads nor
        closes the session underneath tasks that are still running.
    """
    connector = aiohttp.TCPConnector(limit=CONCURRENCY, ttl_dns_cache=300)
    failures = []
    async with aiohttp.ClientSession(connector=connector,
                                     timeout=TIMEOUT) as sess:
        tasks = []
        async for url, dest, log in _all_tasks():
            tasks.append(asyncio.create_task(_fetch(sess, url, dest, log)))
        CHUNK = 10_000
        for i in range(0, len(tasks), CHUNK):
            results = await asyncio.gather(*tasks[i:i+CHUNK],
                                           return_exceptions=True)
            failures.extend(r for r in results if isinstance(r, BaseException))
            print(f"✓ {min(i+CHUNK, len(tasks)):,} / {len(tasks):,} done")

    if failures:
        print(f"✗ {len(failures):,} of {len(tasks):,} downloads failed")
        raise failures[0]


In [9]:
# This cell actually runs the download.
# NOTE(review): `main` is redefined by several later cells in this notebook, so
# this call runs whichever definition was executed most recently in the kernel.
# Run it directly after the cell that defines the bulk crawler.
await main()
print("✅  completed")


✓ 10,000 / 43,456 done
✓ 20,000 / 43,456 done
✓ 30,000 / 43,456 done
✓ 40,000 / 43,456 done
✓ 43,456 / 43,456 done
✅  completed


In [None]:
import asyncio
import aiohttp
import gzip
import json
import pandas as pd
from datetime import datetime, timedelta, timezone
from aiolimiter import AsyncLimiter
from pathlib import Path

# Configuration for the wind-forecast *history* crawl.
# NOTE(review): this cell rebinds names that other cells in this notebook also
# define (BASE_URL, START_DATE, END_DATE, PER_MINUTE, PER_SECOND, TIMEOUT,
# CONCURRENCY, and later fetch_and_save / main), so results depend on cell
# execution order.
BASE_URL       = "https://data.elexon.co.uk/bmrs/api/v1"
HISTORY_EP     = "/forecast/generation/wind/history"

# Explicit mapping of local clock to settlementPeriod
# Keys are "HH:MM" publish times that fetch_and_save() appends to the date to
# form the publishTime query value; values are used only in the output file name.
# NOTE(review): fetch_and_save() suffixes these times with "Z" (UTC), which
# does not match "local clock" above -- confirm which is intended.
PUBLISH_MAP    = {
    "03:30": 6,
    "05:30": 10,
    "08:30": 16,
    "10:30": 20,
    "12:30": 24,
    "16:30": 32,
    "19:30": 38,
    "23:30": 46,
}
# Crawl window, both ends inclusive (main() loops while day <= END_DATE).
START_DATE     = datetime(2016,12,31).date()
END_DATE       = datetime(2025,6,30).date()
BASE_DIR       = Path("bmrs_json_raw")
# Output folder for this crawl; created (with parents) on cell execution.
DETAILED_DIR   = BASE_DIR / "DETAILED_WINDFOR"
DETAILED_DIR.mkdir(exist_ok=True, parents=True)

# Rate limits
# AsyncLimiter(max_rate, time_period in seconds); both limiters are acquired
# together before each request.
PER_MINUTE     = AsyncLimiter(4500, 60)
PER_SECOND     = AsyncLimiter(75,   1)
# Whole-request timeout in seconds for the ClientSession.
TIMEOUT        = aiohttp.ClientTimeout(total=40)
# Connection cap passed to aiohttp.TCPConnector(limit=...).
CONCURRENCY    = 256

async def fetch_and_save(sess: aiohttp.ClientSession, date: datetime.date, pt: str, sp: int):
    """Fetch one wind-forecast history snapshot and store it as gzipped JSON.

    Requests ``HISTORY_EP`` with ``publishTime=<date>T<pt>Z``, keeps the rows
    whose ``startTime`` lies within the 24 hours following the first row's
    ``publishTime`` (both ends inclusive) and writes them, together with the
    response ``metadata``, to ``DETAILED_DIR/<date>_<sp>.json.gz``.

    Args:
        sess: Open aiohttp session used for the request.
        date: Calendar day of the publish time; ``isoformat()`` is called on it.
        pt:   Time of day as ``"HH:MM"`` (a ``PUBLISH_MAP`` key in ``main``).
        sp:   Number paired with ``pt``; only used in the output file name.

    Returns early, writing nothing, when the output file already exists or the
    response contains no ``data`` rows.  A file IS written when rows exist but
    none survive the 24-hour filter (its ``data`` list is then empty).

    Raises:
        aiohttp.ClientResponseError: on an HTTP error status.
    """
    publish_str = f"{date.isoformat()}T{pt}Z"
    dest = DETAILED_DIR / f"{date}_{sp}.json.gz"
    if dest.exists():
        return

    async with PER_SECOND, PER_MINUTE:
        # Context-managed so the connection is released on every exit path.
        async with sess.get(f"{BASE_URL}{HISTORY_EP}",
                            params={"publishTime": publish_str}) as resp:
            resp.raise_for_status()
            payload = await resp.json()

    data = payload.get("data", [])
    if not data:
        return

    df = pd.DataFrame(data)
    df["publishTime"] = pd.to_datetime(df["publishTime"], utc=True)
    df["startTime"]   = pd.to_datetime(df["startTime"],   utc=True)

    # assumes the first row's publishTime is representative of the whole
    # response -- TODO confirm against the API
    pub_dt = df["publishTime"].iloc[0]
    window_start = pub_dt
    window_end   = pub_dt + timedelta(hours=24)
    filtered = df[(df["startTime"] >= window_start) & (df["startTime"] <= window_end)].copy()

    # format for JSON
    filtered["publishTime"] = filtered["publishTime"].dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    filtered["startTime"]   = filtered["startTime"].dt.strftime("%Y-%m-%dT%H:%M:%SZ")

    out = {"metadata": payload.get("metadata", {}), "data": filtered.to_dict(orient="records")}

    # Write to a temporary sibling and rename, so an interrupted run cannot
    # leave a truncated file that the exists() check above would skip forever.
    tmp = dest.with_name(dest.name + ".part")
    with gzip.open(tmp, "wt") as fh:
        json.dump(out, fh)
    tmp.replace(dest)

async def main():
    """Fetch every (day, publish time) wind-history snapshot concurrently.

    Creates one ``fetch_and_save`` task for each day from ``START_DATE`` to
    ``END_DATE`` (inclusive) and each ``PUBLISH_MAP`` entry, then waits for
    all of them.

    Raises:
        The first collected task exception, re-raised only after every task
        has finished.  Failures are gathered with ``return_exceptions=True``
        so that one bad request neither abandons the remaining fetches nor
        closes the session underneath tasks that are still running.
    """
    connector = aiohttp.TCPConnector(limit=CONCURRENCY, ttl_dns_cache=300)
    async with aiohttp.ClientSession(connector=connector, timeout=TIMEOUT) as sess:
        tasks = []
        day = START_DATE
        while day <= END_DATE:
            for pt, sp in PUBLISH_MAP.items():
                tasks.append(asyncio.create_task(fetch_and_save(sess, day, pt, sp)))
            day += timedelta(days=1)
        results = await asyncio.gather(*tasks, return_exceptions=True)

    failures = [r for r in results if isinstance(r, BaseException)]
    if failures:
        print(f"✗ {len(failures)} of {len(tasks)} fetches failed")
        raise failures[0]
    print(f"✅ Completed all {len(tasks)} fetches")


In [23]:
# Runs the wind-forecast history crawl.
# NOTE(review): `main` is defined in several cells of this notebook; this call
# runs whichever definition was executed last, so run it right after the
# wind-history cell.
await main()
print("✅  completed")

✅ Completed all 24832 fetches
✅  completed


In [26]:
import asyncio
import aiohttp
import gzip
import json
import pandas as pd
from datetime import datetime, date, time, timedelta, timezone
from aiolimiter import AsyncLimiter
from pathlib import Path

# Configuration for the wind-forecast *evolution* crawl.
# NOTE(review): rebinds BASE_URL, START_DATE, END_DATE, BASE_DIR and the
# rate-limit names that other cells in this notebook also define.
BASE_URL     = "https://data.elexon.co.uk/bmrs/api/v1"
EVOLUTION_EP = "/forecast/generation/wind/evolution"

# Crawl window, both ends inclusive (main() loops while day <= END_DATE).
START_DATE   = date(2017, 1, 1)
END_DATE     = date(2025, 6, 30)

BASE_DIR     = Path("bmrs_json_raw")
# Output folder for this crawl; created (with parents) on cell execution.
EVOL_DIR     = BASE_DIR / "EVOLUTION_WINDFOR"
EVOL_DIR.mkdir(exist_ok=True, parents=True)

# Rate limits
# AsyncLimiter(max_rate, time_period in seconds); both limiters are acquired
# together before each request.
PER_MINUTE   = AsyncLimiter(4500, 60)
PER_SECOND   = AsyncLimiter(75,   1)
# Whole-request timeout in seconds for the ClientSession.
TIMEOUT      = aiohttp.ClientTimeout(total=40)
# Connection cap passed to aiohttp.TCPConnector(limit=...).
CONCURRENCY  = 256

async def fetch_and_save_evolution(sess: aiohttp.ClientSession, start_dt: datetime):
    """Fetch the forecast evolution for one delivery ``startTime`` and save it.

    Requests ``EVOLUTION_EP`` for ``start_dt``, keeps only rows published at
    least one hour before ``start_dt``, retains the 8 most recent of those
    (newest first) and writes them with the response ``metadata`` to
    ``EVOL_DIR/<YYYY-MM-DD_HHMM>.json.gz``.

    Args:
        sess:     Open aiohttp session used for the request.
        start_dt: Forecast start time.  It is compared against UTC-aware
                  publish times, so it must be timezone-aware.

    Returns early, writing nothing, when the output file already exists, the
    response has no ``data`` rows, or no row is old enough to pass the cutoff.

    Raises:
        aiohttp.ClientResponseError: on an HTTP error status.
    """
    iso_start = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    stamp     = start_dt.strftime("%Y-%m-%d_%H%M")
    dest      = EVOL_DIR / f"{stamp}.json.gz"
    if dest.exists():
        return

    params = {"startTime": iso_start, "format": "json"}
    async with PER_SECOND, PER_MINUTE:
        # Context-managed so the connection is released on every exit path.
        async with sess.get(f"{BASE_URL}{EVOLUTION_EP}", params=params) as resp:
            resp.raise_for_status()
            payload = await resp.json()

    data = payload.get("data", [])
    if not data:
        return

    df = pd.DataFrame(data)
    df["publishTime"] = pd.to_datetime(df["publishTime"], utc=True)

    # filter to publishes at least 1h before the startTime
    cutoff = start_dt - timedelta(hours=1)
    df = df[df["publishTime"] <= cutoff]
    if df.empty:
        return

    # take the 8 most recent publishes
    df = df.sort_values("publishTime", ascending=False).head(8).copy()
    df["publishTime"] = df["publishTime"].dt.strftime("%Y-%m-%dT%H:%M:%SZ")

    out = {
        "metadata": payload.get("metadata", {}),
        "data": df.to_dict(orient="records")
    }

    # Write to a temporary sibling and rename, so an interrupted run cannot
    # leave a truncated file that the exists() check above would skip forever.
    tmp = dest.with_name(dest.name + ".part")
    with gzip.open(tmp, "wt") as fh:
        json.dump(out, fh)
    tmp.replace(dest)

async def main():
    """Fetch the forecast evolution for every half-hour start time in range.

    For each day from ``START_DATE`` to ``END_DATE`` (inclusive) one
    ``fetch_and_save_evolution`` task is created per 30-minute step from
    00:00 UTC up to, but excluding, 00:00 UTC of the next day.

    Raises:
        The first collected task exception, re-raised only after every task
        has finished.  Failures are gathered with ``return_exceptions=True``
        so that one bad request neither abandons the remaining fetches nor
        closes the session underneath tasks that are still running.
    """
    connector = aiohttp.TCPConnector(limit=CONCURRENCY, ttl_dns_cache=300)
    async with aiohttp.ClientSession(connector=connector, timeout=TIMEOUT) as sess:
        tasks = []
        day = START_DATE
        one_day = timedelta(days=1)
        half_hour = timedelta(minutes=30)

        while day <= END_DATE:
            # start at 00:00 UTC of this day
            current = datetime.combine(day, time(0, 0), tzinfo=timezone.utc)
            end_of_day = current + one_day

            # step in 30-minute increments
            while current < end_of_day:
                tasks.append(asyncio.create_task(fetch_and_save_evolution(sess, current)))
                current += half_hour

            day += one_day

        results = await asyncio.gather(*tasks, return_exceptions=True)

    failures = [r for r in results if isinstance(r, BaseException)]
    if failures:
        print(f"✗ {len(failures)} of {len(tasks)} evolution fetches failed")
        raise failures[0]
    print(f"✅ Completed all {len(tasks)} evolution fetches")


In [28]:
# Runs the wind-forecast evolution crawl (the `main` defined most recently in
# the kernel).
# NOTE(review): the recorded output under this cell also shows "✅  completed",
# which this cell does not print -- the saved output looks stale; re-run from a
# fresh kernel before relying on it.
await main()

✅ Completed all 148944 evolution fetches
✅  completed


In [7]:
import asyncio
import aiohttp
import gzip
import json
import pandas as pd
from datetime import datetime, timedelta, timezone
from aiolimiter import AsyncLimiter
from pathlib import Path

# Configuration for the day-ahead *demand forecast* history crawl.
# NOTE(review): rebinds HISTORY_EP, DETAILED_DIR, START_DATE, END_DATE and the
# rate-limit names that the wind-history cell of this notebook also defines;
# results depend on which cell ran last.
BASE_URL     = "https://data.elexon.co.uk/bmrs/api/v1"
HISTORY_EP   = "/forecast/demand/day-ahead/history"
# Crawl window, both ends inclusive (main() loops while day <= END_DATE).
START_DATE   = datetime(2021,7,1).date()
END_DATE     = datetime(2025,6,30).date()
BASE_DIR     = Path("bmrs_json_raw")
# Output folder for this crawl; created (with parents) on cell execution.
DETAILED_DIR = BASE_DIR / "DEMAND_FORECASTS"
DETAILED_DIR.mkdir(exist_ok=True, parents=True)

# Rate limiting
# AsyncLimiter(max_rate, time_period in seconds); both limiters are acquired
# together before each request.
PER_MINUTE   = AsyncLimiter(4500, 60)
PER_SECOND   = AsyncLimiter(75,   1)
# Whole-request timeout in seconds for the ClientSession.
TIMEOUT      = aiohttp.ClientTimeout(total=40)
# Connection cap passed to aiohttp.TCPConnector(limit=...); lower here (64)
# than the 256 used by the other crawls in this notebook.
CONCURRENCY  = 64

def _round_up_to_next_30(dt: datetime) -> datetime:
    dt0    = dt.replace(second=0, microsecond=0)
    extra  = dt0.minute % 30
    if extra == 0 and dt.second == 0 and dt.microsecond == 0:
        return dt0
    return dt0 + timedelta(minutes=(30 - extra))

async def fetch_and_save(sess: aiohttp.ClientSession, query_dt: datetime):
    """Fetch the day-ahead demand forecast as of ``query_dt`` and save it.

    Requests ``HISTORY_EP`` with ``publishTime=query_dt``, keeps the rows whose
    ``startTime`` lies between 30 minutes and 24 hours after the first row's
    ``publishTime`` (both ends inclusive) and writes them with the response
    ``metadata`` to ``DETAILED_DIR/<date>_<HHMM>.json.gz``.  The file name is
    derived from the returned ``publishTime`` rounded up to the next half
    hour, not from ``query_dt``.

    Because the file name is only known once the response has arrived, the
    request is always made; the existence check merely prevents rewriting a
    file that an earlier query with the same rounded publish time produced.

    Args:
        sess:     Open aiohttp session used for the request.
        query_dt: Point in time sent as the ``publishTime`` query value.

    Returns early, writing nothing, when the response has no ``data`` rows, no
    row falls inside the window, or the target file already exists.

    Raises:
        aiohttp.ClientResponseError: on an HTTP error status.
    """
    publish_str = query_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    async with PER_SECOND, PER_MINUTE:
        # Context-managed so the connection is released on every exit path.
        async with sess.get(f"{BASE_URL}{HISTORY_EP}",
                            params={"publishTime": publish_str}) as resp:
            resp.raise_for_status()
            payload = await resp.json()

    data = payload.get("data", [])
    if not data:
        return

    df = pd.DataFrame(data)
    df["publishTime"] = pd.to_datetime(df["publishTime"], utc=True)
    df["startTime"]   = pd.to_datetime(df["startTime"],   utc=True)

    # keep only the next 24 hours
    # assumes the first row's publishTime is representative of the whole
    # response -- TODO confirm against the API
    pub_dt = df["publishTime"].iloc[0]
    window_start = pub_dt + timedelta(minutes=30)
    window_end   = pub_dt + timedelta(hours=24)
    df = df[(df["startTime"] >= window_start)
           & (df["startTime"] <= window_end)].copy()
    if df.empty:
        return

    # re-format timestamps for JSON
    df["publishTime"] = df["publishTime"].dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    df["startTime"]   = df["startTime"].dt.strftime("%Y-%m-%dT%H:%M:%SZ")

    # determine filename by rounding *actual* publishTime up
    rounded = _round_up_to_next_30(pub_dt)
    fname   = f"{rounded.date().isoformat()}_{rounded.strftime('%H%M')}.json.gz"
    dest    = DETAILED_DIR / fname
    if dest.exists():
        return

    out = {
        "metadata": payload.get("metadata", {}),
        "data":     df.to_dict(orient="records")
    }
    # Write to a temporary sibling and rename, so an interrupted run cannot
    # leave a truncated file that the exists() check would then keep in place.
    tmp = dest.with_name(dest.name + ".part")
    with gzip.open(tmp, "wt") as fh:
        json.dump(out, fh)
    tmp.replace(dest)

async def main():
    """Query the demand-forecast history at every half hour in the date range.

    For each day from ``START_DATE`` to ``END_DATE`` (inclusive) one
    ``fetch_and_save`` call is queued per 30-minute step from midnight UTC up
    to, but excluding, the next midnight; all of them are then run together.

    Raises:
        The first collected exception, re-raised only after every fetch has
        finished.  Failures are gathered with ``return_exceptions=True`` so
        that one bad request neither abandons the remaining fetches nor
        closes the session underneath fetches that are still running.
    """
    connector = aiohttp.TCPConnector(limit=CONCURRENCY, ttl_dns_cache=300)
    async with aiohttp.ClientSession(connector=connector, timeout=TIMEOUT) as sess:
        tasks = []
        day = START_DATE
        while day <= END_DATE:
            # start at midnight UTC on 'day'
            cursor = datetime.combine(day, datetime.min.time(), tzinfo=timezone.utc)
            end_dt = cursor + timedelta(days=1)
            # step through in 30-minute increments
            while cursor < end_dt:
                # bare coroutines: gather() below schedules them as tasks
                tasks.append(fetch_and_save(sess, cursor))
                cursor += timedelta(minutes=30)
            day += timedelta(days=1)

        results = await asyncio.gather(*tasks, return_exceptions=True)

    failures = [r for r in results if isinstance(r, BaseException)]
    if failures:
        print(f"✗ {len(failures)} of {len(tasks)} fetches failed")
        raise failures[0]
    print(f"✅ Completed all {len(tasks)} fetches")


In [8]:
# Runs the demand-forecast history crawl.
# NOTE(review): `main` and `fetch_and_save` are each defined in more than one
# cell of this notebook; this call uses whichever definitions ran last.
await main()

✅ Completed all 70128 fetches
