# Data pipelines (API downloads)

This notebook fetches raw data from external APIs and writes them into the repo's folder layout:
- Fingrid: `Dataset/fin/<year>/fi_<dataset>_<year>.csv`
- eSett open data: `Dataset/esett/<year>/*.csv`
- Energi Data Service day-ahead prices:  `Dataset/energi/<year>/<zone>_prices.csv`
- ERA5 weather (GRIB): `Raw data/<year>/era5_weather_<year>_<month>.grib`, extracted to `Weather data/<year>/bidding_zone_weather_<year>_<month>.csv`

Environment: ensure `.env` exists with `FINGRID_API_KEY=<your_key>` (get one from Fingrid Open Data portal).


In [None]:
from __future__ import annotations

import os
import time
from pathlib import Path
from typing import List, Tuple

import pandas as pd
import requests
import xarray as xr
from dotenv import load_dotenv, find_dotenv
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

try:
    import cdsapi
except ImportError:
    cdsapi = None

load_dotenv()
FINGRID_API_KEY = os.getenv('FINGRID_API_KEY')

DATASET_DIR = Path('Dataset')
RAW_WEATHER_DIR = Path('Raw data')
WEATHER_OUT_DIR = Path('Weather data')

NORDIC_ZONES = ['DK1','DK2','NO1','NO2','NO3','NO4','NO5','SE1','SE2','SE3','SE4','FI']

## Fingrid (FI generation/consumption)
- Auth via `FINGRID_API_KEY` in `.env`
- Fetches monthly pages for 2023/2024
- Saves to `Dataset/fin/<year>/fi_<dataset>_<year>.csv`
Datasets: consumption (192), wind (181), nuclear (188), hydro (191).

In [2]:
FINGRID_DATASETS = {
    'consumption': 192,
    'wind': 181,
    'nuclear': 188,
    'hydro': 191,
}

BASE_URL_FINGRID = "https://data.fingrid.fi/api/datasets"

def monthly_ranges(year: int) -> List[Tuple[str, str]]:
    start = f"{year}-01-01"
    end = f"{year+1}-01-01"
    dates = pd.date_range(start=start, end=end, freq='MS')
    return [(
        dates[i].strftime('%Y-%m-%dT%H:%M:%SZ'),
        dates[i+1].strftime('%Y-%m-%dT%H:%M:%SZ')
    ) for i in range(len(dates)-1)]


def fetch_fingrid(dataset_id: int, start: str, end: str, page: int = 1, page_size: int = 20000) -> dict:
    headers = {'x-api-key': FINGRID_API_KEY}
    params = {
        'startTime': start,
        'endTime': end,
        'page': page,
        'pageSize': page_size,
        'format': 'json',
    }
    r = requests.get(f"{BASE_URL_FINGRID}/{dataset_id}/data", headers=headers, params=params, timeout=30)
    r.raise_for_status()
    return r.json()


def download_fingrid_year(dataset_id: int, name: str, year: int) -> None:
    out_dir = DATASET_DIR / 'fin' / str(year)
    out_dir.mkdir(parents=True, exist_ok=True)
    parts = []
    for start, end in monthly_ranges(year):
        page = 1
        while True:
            data_json = fetch_fingrid(dataset_id, start, end, page)
            data = data_json.get('data', [])
            if not data:
                break
            df = pd.DataFrame(data)
            df['datetime_utc'] = pd.to_datetime(df['startTime'], utc=True).dt.tz_localize(None)
            df = df[['datetime_utc','value']]
            parts.append(df)
            last_page = data_json.get('pagination', {}).get('lastPage', page)
            if page >= last_page:
                break
            page += 1
            time.sleep(0.2)
    if parts:
        final_df = pd.concat(parts).drop_duplicates().sort_values('datetime_utc')
        dest = out_dir / f"fi_{name}_{year}.csv"
        final_df.to_csv(dest, index=False)
        print(f"Saved {dest} ({len(final_df)} rows)")
    else:
        print(f"No data for {name} {year}")

RUN_FINGRID = True
if RUN_FINGRID:
    for name, dsid in FINGRID_DATASETS.items():
        for yr in [2023, 2024]:
            download_fingrid_year(dsid, name, yr)

HTTPError: 429 Client Error: Too Many Requests for url: https://data.fingrid.fi/api/datasets/192/data?startTime=2023-02-01T00%3A00%3A00Z&endTime=2023-03-01T00%3A00%3A00Z&page=1&pageSize=20000&format=json

## eSett open data (production, consumption, imbalance, balancing prices)
- Endpoints EXP16 (production), EXP15 (consumption), EXP13 (imbalance), EXP14 (balancing prices)
- Saved to `Dataset/esett/<year>/...`
- Loops zones via MBA codes and months.

In [None]:
MBA_CODES = {
    'SE1': '10Y1001A1001A44P',
    'SE2': '10Y1001A1001A45N',
    'SE3': '10Y1001A1001A46L',
    'SE4': '10Y1001A1001A47J',
    'FI':  '10YFI_1________U',
    'DK1': '10YDK-1--------W',
    'DK2': '10YDK-2--------M',
    'NO1': '10YNO_1________2',
    'NO2': '10YNO_2________T',
    'NO3': '10YNO_3________J',
    'NO4': '10YNO_4________9',
    'NO5': '10Y1001A1001A48H',
}

ESETT_BASE = "https://api.opendata.esett.com"

def month_edges(year: int):
    dates = pd.date_range(start=f"{year}-01-01", end=f"{year+1}-01-01", freq='MS')
    for i in range(len(dates)-1):
        yield dates[i], dates[i+1]


def fetch_esett(endpoint: str, mba: str, start, end) -> pd.DataFrame | None:
    params = {
        'start': start.strftime('%Y-%m-%dT00:00:00.000Z'),
        'end': end.strftime('%Y-%m-%dT00:00:00.000Z'),
        'mba': mba,
    }
    r = requests.get(f"{ESETT_BASE}{endpoint}", params=params, timeout=30)
    if r.status_code == 204:
        return None
    r.raise_for_status()
    return pd.DataFrame(r.json())


def normalize_series(df: pd.DataFrame, zone: str, value_col: str) -> pd.DataFrame | None:
    if df is None or df.empty:
        return None
    ts_col = 'timestampUTC' if 'timestampUTC' in df.columns else 'timestamp'
    if ts_col not in df.columns or value_col not in df.columns:
        return None
    out = df[[ts_col, value_col]].copy()
    out['datetime_utc'] = pd.to_datetime(out[ts_col], utc=True).dt.tz_localize(None)
    return out[['datetime_utc', value_col]].rename(columns={value_col: zone})


def download_esett(endpoint: str, out_name: str, value_col: str, years: list[int]) -> None:
    for year in years:
        dest_dir = DATASET_DIR / 'esett' / str(year)
        dest_dir.mkdir(parents=True, exist_ok=True)
        merged: pd.DataFrame | None = None
        for zone, mba in MBA_CODES.items():
            zone_parts = []
            print(f"{out_name} | {zone} | {year}")
            for start, end in month_edges(year):
                try:
                    df = fetch_esett(endpoint, mba, start, end)
                    df = normalize_series(df, zone, value_col)
                    if df is not None:
                        zone_parts.append(df)
                except Exception as e:
                    print(f"  Error {zone} {start.date()}: {e}")
                time.sleep(1)
            if zone_parts:
                zone_df = pd.concat(zone_parts).drop_duplicates().sort_values('datetime_utc')
                merged = zone_df if merged is None else merged.merge(zone_df, on='datetime_utc', how='outer')
        if merged is not None:
            dest = dest_dir / f"{out_name}_{year}.csv"
            merged.to_csv(dest, index=False)
            print(f"Saved {dest} ({merged.shape})")
        else:
            print(f"No data for {out_name} {year}")

RUN_ESETT = True
if RUN_ESETT:
    download_esett('/EXP16/Aggregate', 'production', 'total', [2023, 2024])
    download_esett('/EXP15/Aggregate', 'consumption', 'total', [2023, 2024])
    download_esett('/EXP13/Aggregate', 'imbalance', 'imbalance', [2023, 2024])
    download_esett('/EXP14/Aggregate', 'balancing_price', 'upRegPrice', [2023, 2024])


## Energi Data Service (Elspot prices)
Downloads day-ahead prices for all Nordic zones and saves to `Dataset/energi/<year>/<zone>_prices.csv`.

In [None]:
def get_elspot_prices(start: str, end: str, area: str) -> pd.DataFrame | None:
    url = 'https://api.energidataservice.dk/dataset/Elspotprices'
    params = {'start': start, 'end': end, 'filter': f'{{"PriceArea": ["{area}"]}}'}
    resp = requests.get(url, params=params, timeout=30)
    if resp.status_code != 200:
        print(f"Failed {area}: {resp.text}")
        return None
    data = resp.json().get('records', [])
    if not data:
        return None
    df = pd.DataFrame(data)
    df['datetime_utc'] = pd.to_datetime(df['HourUTC']).dt.tz_localize(None)
    return df[['datetime_utc', 'SpotPriceEUR']]


def download_elspot(year: int) -> None:
    start, end = f"{year}-01-01", f"{year+1}-01-01"
    out_dir = DATASET_DIR / 'energi' / str(year)
    out_dir.mkdir(parents=True, exist_ok=True)
    for zone in NORDIC_ZONES:
        print(f"Prices {zone} {year}")
        df = get_elspot_prices(start, end, zone)
        if df is None:
            print(f"  No data {zone} {year}")
            continue
        dest = out_dir / f"{zone.lower()}_prices.csv"
        df.to_csv(dest, index=False)
        print(f"  Saved {dest}")

RUN_ELSPOT = True
if RUN_ELSPOT:
    for yr in [2023, 2024]:
        download_elspot(yr)


## ERA5 weather (download + extract)
Downloads monthly ERA5 single-level data to GRIB and extracts per bidding zone to CSV in `Weather data/<year>/...`. Requires `cdsapi` credentials configured in your local CDS file.

In [7]:
BBOX = [72, 5, 54, 32]  # N, W, S, E
POINTS = {
    "DK1": (55.6, 9.2),
    "DK2": (55.7, 12.5),
    "NO1": (60.0, 10.0),
    "NO2": (59.0, 6.5),
    "NO3": (64.0, 11.0),
    "NO4": (69.0, 19.0),
    "NO5": (62.0, 5.5),
    "SE1": (66.0, 20.0),
    "SE2": (63.0, 17.0),
    "SE3": (59.5, 16.0),
    "SE4": (57.0, 15.0),
    "FI":  (61.5, 25.0),
}


def download_era5_month(year: int, month: int) -> Path | None:
    if cdsapi is None:
        print("cdsapi not installed; skipping download")
        return None
    raw_dir = RAW_WEATHER_DIR / str(year)
    raw_dir.mkdir(parents=True, exist_ok=True)
    grib_path = raw_dir / f"era5_weather_{year}_{month:02d}.grib"
    if grib_path.exists():
        return grib_path
    client = cdsapi.Client()
    req = {
        'product_type': 'reanalysis',
        'variable': [
            '2m_temperature','10m_u_component_of_wind','10m_v_component_of_wind',
            'mean_sea_level_pressure','total_precipitation','100m_u_component_of_wind','100m_v_component_of_wind','surface_solar_radiation_downwards'
        ],
        'year': [f"{year}"],
        'month': [f"{month:02d}"],
        'day': [f"{d:02d}" for d in range(1,32)],
        'time': [f"{h:02d}:00" for h in range(24)],
        'area': BBOX,
        'format': 'grib',
    }
    client.retrieve('reanalysis-era5-single-levels', req).download(str(grib_path))
    return grib_path


def extract_grib_to_csv(grib_path: Path) -> None:
    ds = xr.open_dataset(grib_path, engine='cfgrib', backend_kwargs={'indexpath': ''})
    year = grib_path.parent.name
    month = grib_path.stem.split('_')[-1]
    out_dir = WEATHER_OUT_DIR / str(year)
    out_dir.mkdir(parents=True, exist_ok=True)
    rows = []
    for zone, (lat, lon) in POINTS.items():
        point = ds.sel(latitude=lat, longitude=lon, method='nearest')
        df_zone = point.to_dataframe().reset_index()
        df_zone['zone'] = zone
        rows.append(df_zone)
    df_all = pd.concat(rows, ignore_index=True)
    keep = ['valid_time','zone'] + [c for c in ['t2m','u10','v10','msl','tp','ssrd','u100','v100'] if c in df_all.columns]
    df_all = df_all[keep]
    if 't2m' in df_all.columns:
        df_all['t2m'] = df_all['t2m'] - 273.15
    out_path = out_dir / f"bidding_zone_weather_{year}_{month}.csv"
    df_all.to_csv(out_path, index=False)
    print(f"Saved {out_path}")

RUN_ERA5_DOWNLOAD = True
RUN_ERA5_EXTRACT = True
YEARS_ERA5 = [2023, 2024]

if RUN_ERA5_DOWNLOAD:
    for yr in YEARS_ERA5:
        for mo in range(1,13):
            download_era5_month(yr, mo)

if RUN_ERA5_EXTRACT:
    for yr in YEARS_ERA5:
        for mo in range(1,13):
            grib = RAW_WEATHER_DIR / str(yr) / f"era5_weather_{yr}_{mo:02d}.grib"
            if grib.exists():
                extract_grib_to_csv(grib)
            else:
                print(f"Missing {grib}")

skipping variable: paramId==228 shortName='tp'
Traceback (most recent call last):
  File "c:\Users\sasha\miniconda3\envs\era5\Lib\site-packages\cfgrib\dataset.py", line 726, in build_dataset_components
    dict_merge(variables, coord_vars)
  File "c:\Users\sasha\miniconda3\envs\era5\Lib\site-packages\cfgrib\dataset.py", line 642, in dict_merge
    raise DatasetBuildError(
cfgrib.dataset.DatasetBuildError: key present and new value is different: key='time' value=Variable(dimensions=('time',), data=array([1672531200, 1672534800, 1672538400, 1672542000, 1672545600,
       1672549200, 1672552800, 1672556400, 1672560000, 1672563600,
       1672567200, 1672570800, 1672574400, 1672578000, 1672581600,
       1672585200, 1672588800, 1672592400, 1672596000, 1672599600,
       1672603200, 1672606800, 1672610400, 1672614000, 1672617600,
       1672621200, 1672624800, 1672628400, 1672632000, 1672635600,
       1672639200, 1672642800, 1672646400, 1672650000, 1672653600,
       1672657200, 1672660800

Saved Weather data\2023\bidding_zone_weather_2023_01.csv


skipping variable: paramId==228 shortName='tp'
Traceback (most recent call last):
  File "c:\Users\sasha\miniconda3\envs\era5\Lib\site-packages\cfgrib\dataset.py", line 726, in build_dataset_components
    dict_merge(variables, coord_vars)
  File "c:\Users\sasha\miniconda3\envs\era5\Lib\site-packages\cfgrib\dataset.py", line 642, in dict_merge
    raise DatasetBuildError(
cfgrib.dataset.DatasetBuildError: key present and new value is different: key='time' value=Variable(dimensions=('time',), data=array([1675209600, 1675213200, 1675216800, 1675220400, 1675224000,
       1675227600, 1675231200, 1675234800, 1675238400, 1675242000,
       1675245600, 1675249200, 1675252800, 1675256400, 1675260000,
       1675263600, 1675267200, 1675270800, 1675274400, 1675278000,
       1675281600, 1675285200, 1675288800, 1675292400, 1675296000,
       1675299600, 1675303200, 1675306800, 1675310400, 1675314000,
       1675317600, 1675321200, 1675324800, 1675328400, 1675332000,
       1675335600, 1675339200

Saved Weather data\2023\bidding_zone_weather_2023_02.csv


skipping variable: paramId==228 shortName='tp'
Traceback (most recent call last):
  File "c:\Users\sasha\miniconda3\envs\era5\Lib\site-packages\cfgrib\dataset.py", line 726, in build_dataset_components
    dict_merge(variables, coord_vars)
  File "c:\Users\sasha\miniconda3\envs\era5\Lib\site-packages\cfgrib\dataset.py", line 642, in dict_merge
    raise DatasetBuildError(
cfgrib.dataset.DatasetBuildError: key present and new value is different: key='time' value=Variable(dimensions=('time',), data=array([1677628800, 1677632400, 1677636000, 1677639600, 1677643200,
       1677646800, 1677650400, 1677654000, 1677657600, 1677661200,
       1677664800, 1677668400, 1677672000, 1677675600, 1677679200,
       1677682800, 1677686400, 1677690000, 1677693600, 1677697200,
       1677700800, 1677704400, 1677708000, 1677711600, 1677715200,
       1677718800, 1677722400, 1677726000, 1677729600, 1677733200,
       1677736800, 1677740400, 1677744000, 1677747600, 1677751200,
       1677754800, 1677758400

KeyboardInterrupt: 