In [1]:
import os
import requests
import datetime

import pandas as pd
from tqdm import tqdm
from dotenv import load_dotenv
from pydantic import TypeAdapter
from sqlmodel import SQLModel, create_engine
from sqlalchemy.orm import sessionmaker

from heatmap.conversion import ng_to_core__power_carbon_intensity, elexon_to_core__power_market_index
from heatmap.definitions.elexon import MarketIndex
from heatmap.definitions.ng_carbon_intensity import Response as NgCarbonIntensityResponse
from heatmap.definitions.timeseries import PowerCarbonIntensity, PowerPrice, GasPrice

In [2]:
load_dotenv('../.env')
DB_CONN_STR = os.environ['DB_CONN_STR'].replace('sqlite:///', 'sqlite:///../')

engine = create_engine(url=DB_CONN_STR)
bound_sessionmaker = sessionmaker(engine)
SQLModel.metadata.create_all(engine, tables=[
    PowerCarbonIntensity.__table__, 
    PowerPrice.__table__, 
    GasPrice.__table__
])

In [None]:
start_date = datetime.datetime(2021, 1, 1, 0, 0)
end_date = datetime.datetime(2024, 6, 16, 23, 59)

In [3]:
def batch_start_end_dates(start_date: datetime.datetime, end_date: datetime.datetime, freq: str = '7d') -> list[tuple[datetime.datetime, datetime.datetime]]:
    dt_rng = pd.date_range(start_date, end_date, freq=freq)
    return list(zip([dt+datetime.timedelta(minutes=30) for dt in dt_rng[:-1]], dt_rng[1:]))

In [4]:
download_power_carbon_intensities = False

if download_power_carbon_intensities:
    power_carbon_intensities = []

    for batch_start_date, batch_end_date in tqdm(batch_start_end_dates(start_date, end_date)):
        r = requests.get(f'https://api.carbonintensity.org.uk/intensity/{batch_start_date.strftime("%Y-%m-%dT%H:%MZ")}/{batch_end_date.strftime("%Y-%m-%dT%H:%MZ")}', headers={'Accept': 'application/json'})
        ng_carbon_intensity_response = NgCarbonIntensityResponse.model_validate(r.json())
        power_carbon_intensities.extend(ng_to_core__power_carbon_intensity(ng_carbon_intensity_response))

    # there's a single duplicated datetime that the below two lines will remove
    PowerCarbonIntensityList = TypeAdapter(list[PowerCarbonIntensity])
    power_carbon_intensities = PowerCarbonIntensityList.validate_python(pd.DataFrame([pci.model_dump() for pci in power_carbon_intensities]).drop_duplicates(subset=['start_time']).to_dict(orient='records'))

    with bound_sessionmaker() as session:
        session.bulk_save_objects(power_carbon_intensities)
        session.commit()

In [5]:
download_power_carbon_intensities = False

if download_power_carbon_intensities:
    url = 'https://data.elexon.co.uk/bmrs/api/v1/balancing/pricing/market-index'

    raw_data = []

    for batch_start_date, batch_end_date in tqdm(batch_start_end_dates(start_date, end_date)):
        params = {
            'from': batch_start_date.strftime('%Y-%m-%dT%H:%MZ'),
            'to': batch_end_date.strftime('%Y-%m-%dT%H:%MZ'),
            'dataProviders': 'APXMIDP'
        }
        
        r = requests.get(url, params=params, headers={'Accept': 'application/json'})
        raw_data.extend(r.json()['data'])

    MarketIndexList = TypeAdapter(list[MarketIndex])
    market_index_records = MarketIndexList.validate_python(raw_data)

    cleaned_power_price_records = elexon_to_core__power_market_index(market_index_records)

    with bound_sessionmaker() as session:
        session.bulk_save_objects(cleaned_power_price_records)
        session.commit()

  0%|          | 0/180 [00:00<?, ?it/s]

100%|██████████| 180/180 [00:14<00:00, 12.26it/s]
