# Reserve Data Exploration
Extract and examine ENTSOE reserve price data from ZIP files


In [1]:
import requests
import zipfile
import io
import xml.etree.ElementTree as ET
import pandas as pd
import numpy as np
from datetime import datetime


In [2]:
# Helper function to fetch data in chunks (avoid API limits)
def fetch_entsoe_in_chunks(base_url, start_date, end_date, chunk_days=15):
    """
    Fetch ENTSOE data in chunks to avoid API limits.
    
    Args:
        base_url: URL without periodStart/periodEnd parameters
        start_date: Start date string "YYYYMMDD0000"
        end_date: End date string "YYYYMMDD0000"
        chunk_days: Number of days per request (default 15)
    
    Returns:
        Combined response content (ZIP or XML)
    """
    from datetime import datetime, timedelta
    import time
    
    # Parse dates
    start = datetime.strptime(start_date, "%Y%m%d%H%M")
    end = datetime.strptime(end_date, "%Y%m%d%H%M")
    
    all_zips = []
    current = start
    
    while current < end:
        next_chunk = min(current + timedelta(days=chunk_days), end)
        
        period_start = current.strftime("%Y%m%d%H%M")
        period_end = next_chunk.strftime("%Y%m%d%H%M")
        
        url = f"{base_url}&periodStart={period_start}&periodEnd={period_end}"
        
        print(f"Fetching {period_start} to {period_end}...")
        
        response = requests.get(url)
        
        if response.status_code == 200:
            all_zips.append(response.content)
            print(f"  ✓ Got {len(response.content)} bytes")
        else:
            print(f"  ✗ Error {response.status_code}: {response.text[:200]}")
        
        current = next_chunk
        time.sleep(0.5)  # Rate limiting
    
    # Combine all ZIP files
    if len(all_zips) == 1:
        return all_zips[0]
    
    # Merge multiple ZIPs into one
    combined_buf = io.BytesIO()
    with zipfile.ZipFile(combined_buf, 'w', compression=zipfile.ZIP_DEFLATED) as zout:
        for idx, zip_content in enumerate(all_zips):
            try:
                ztemp = zipfile.ZipFile(io.BytesIO(zip_content))
                for name in ztemp.namelist():
                    data = ztemp.read(name)
                    out_name = f"chunk{idx}_{name}"
                    zout.writestr(out_name, data)
            except zipfile.BadZipFile:
                # Might be plain XML, wrap it
                zout.writestr(f"chunk{idx}_response.xml", zip_content)
    
    combined_buf.seek(0)
    return combined_buf.read()

print("Helper function loaded")


Helper function loaded


In [3]:
# Process Type = aFRR/A51
# Market Agreement = Daily
# Fetch one week from each season in 2024
API_KEY = "3da11da4-cb1c-41ea-8f2e-e80e188b9e4b"

# Define one week from each season in 2024
# January (winter), April (spring), June (summer), October (fall)
weeks_2024 = [
    ("202401080000", "202401150000"),  # January week (Jan 8-15)
    ("202404080000", "202404150000"),  # April week (Apr 8-15)
    ("202406100000", "202406170000"),  # June week (Jun 10-17)
    ("202410070000", "202410140000"),  # October week (Oct 7-14)
]

base_url = (
    "https://web-api.tp.entsoe.eu/api?"
    "documentType=A81&"
    "businessType=B95&"
    "processType=A51&"
    "Type_MarketAgreement.Type=A01&"
    "controlArea_Domain=10YHU-MAVIR----U&"
    f"securityToken={API_KEY}"
)

print("Fetching aFRR Daily data for 4 seasonal weeks in 2024...")
all_zip_contents = []

for i, (start, end) in enumerate(weeks_2024, 1):
    print(f"\nWeek {i}/4: {start[:8]} to {end[:8]}")
    week_content = fetch_entsoe_in_chunks(base_url, start, end, chunk_days=7)
    all_zip_contents.append(week_content)
    print(f"  ✓ Week {i} data: {len(week_content)} bytes")

# Combine all week ZIP files into one
print("\nCombining all weeks...")
import zipfile

combined_buf = io.BytesIO()
with zipfile.ZipFile(combined_buf, 'w', compression=zipfile.ZIP_DEFLATED) as zout:
    for idx, zip_content in enumerate(all_zip_contents):
        try:
            ztemp = zipfile.ZipFile(io.BytesIO(zip_content))
            for name in ztemp.namelist():
                data = ztemp.read(name)
                out_name = f"week{idx}_{name}"
                zout.writestr(out_name, data)
        except zipfile.BadZipFile:
            # Might be plain XML, wrap it
            zout.writestr(f"week{idx}_response.xml", zip_content)

combined_buf.seek(0)
afrr_d1_content = combined_buf.read()

# Create a mock response object to maintain compatibility
class MockResponse:
    def __init__(self, content):
        self.content = content
        self.status_code = 200

response_afrr_d1 = MockResponse(afrr_d1_content)
print(f"✓ Total combined aFRR Daily data: {len(afrr_d1_content)} bytes")

Fetching aFRR Daily data for 4 seasonal weeks in 2024...

Week 1/4: 20240108 to 20240115
Fetching 202401080000 to 202401150000...
  ✓ Got 6939 bytes
  ✓ Week 1 data: 6939 bytes

Week 2/4: 20240408 to 20240415
Fetching 202404080000 to 202404150000...


ConnectTimeout: HTTPSConnectionPool(host='web-api.tp.entsoe.eu', port=443): Max retries exceeded with url: /api?documentType=A81&businessType=B95&processType=A51&Type_MarketAgreement.Type=A01&controlArea_Domain=10YHU-MAVIR----U&securityToken=3da11da4-cb1c-41ea-8f2e-e80e188b9e4b&periodStart=202404080000&periodEnd=202404150000 (Caused by ConnectTimeoutError(<urllib3.connection.HTTPSConnection object at 0x000001FF48FEA850>, 'Connection to web-api.tp.entsoe.eu timed out. (connect timeout=None)'))

In [None]:
responses = [response_afrr_d1]

In [None]:
# Extract ZIP file

import io, zipfile
from pathlib import Path


zip_file1 = zipfile.ZipFile(io.BytesIO(response_afrr_d1.content))



zips = [zip_file1]  # your ZipFile objects

combined_buf = io.BytesIO()
with zipfile.ZipFile(combined_buf, 'w', compression=zipfile.ZIP_DEFLATED) as zout:
    for i, z in enumerate(zips):
        for name in z.namelist():
            data = z.read(name)
            out_name = f"{i}_{Path(name).name}"  # avoid name collisions
            zout.writestr(out_name, data)

combined_buf.seek(0)
zf = zipfile.ZipFile(combined_buf, 'r')   # merged zip
xml_files = zf.namelist()                 # flat list of all entries
print(f"Merged {len(xml_files)} XML files")

xml_files_afrr_d1 = zip_file1.namelist()                 # flat list of all entries




In [None]:
# Extract XML files into a single DataFrame (store raw XML text)
import re


rows = []

for xml_file in xml_files:
    print(f"\nProcessing {xml_file}...")

    # Read XML content and decode (handle BOM safely)
    xml_bytes = zf.read(xml_file)
    xml_text = xml_bytes.decode('utf-8-sig', errors='replace')

    # Split concatenated XML documents while preserving the declaration
    docs = [d for d in re.split(r'(?=<\?xml\s)', xml_text) if d.strip()]
    if not docs:
        docs = [xml_text]

    for i, doc in enumerate(docs):
        # Ensure declaration at start; if missing, add a minimal one
        cleaned = doc.lstrip()
        if not cleaned.startswith('<?xml'):
            cleaned = '<?xml version="1.0" encoding="UTF-8"?>\n' + cleaned

        name = f"{xml_file}_part_{i}" if len(docs) > 1 else xml_file
        rows.append({'file': name, 'xml': cleaned})

xml_docs_df = pd.DataFrame(rows)
print(f"Collected {len(xml_docs_df)} XML documents into xml_docs_df")
display(xml_docs_df.head())


In [None]:
from lxml import etree as LET
print("Imported LET from lxml.etree")


In [None]:
# Parse a single balancing XML into one DataFrame (metadata + all points)
from lxml import etree as LET
from pathlib import Path
import pandas as pd


def _lname(tag: str) -> str:
    return tag.split('}')[-1] if '}' in tag else tag


def parse_balancing_xml_to_df(xml_text: str) -> pd.DataFrame:
    root = LET.fromstring(xml_text.encode('utf-8'))

    # Document-level metadata (leaf nodes outside Point)
    meta = {}
    for el in root.iter():
        ln = _lname(el.tag)
        if ln == 'Point':
            continue
        # skip if any Point descendant
        if el.xpath('.//*[local-name()="Point"]'):
            continue
        if len(list(el)) == 0:
            val = (el.text or '').strip()
            if val:
                key = f"doc.{ln}"
                # keep first occurrence
                if key not in meta:
                    meta[key] = val
    # include root attributes
    for k, v in root.attrib.items():
        meta[f"doc.@{k}"] = v

    rows = []
    # Iterate TimeSeries → Period → Point
    for ts in root.xpath('.//*[local-name()="TimeSeries"]'):
        ts_meta = {}
        for child in ts:
            ln = _lname(child.tag)
            if ln in ('Period', 'timeInterval'):
                continue
            # capture simple leaf text under TS (one level)
            if len(list(child)) == 0:
                ts_meta[f"ts.{ln}"] = (child.text or '').strip()
        for per in ts.xpath('.//*[local-name()="Period"]'):
            # period context
            start = per.xpath('.//*[local-name()="timeInterval"]/*[local-name()="start"]/text()')
            end = per.xpath('.//*[local-name()="timeInterval"]/*[local-name()="end"]/text()')
            res = per.xpath('.//*[local-name()="resolution"]/text()')
            per_meta = {
                'per.start': start[0] if start else None,
                'per.end': end[0] if end else None,
                'per.resolution': res[0] if res else None,
            }
            for pt in per.xpath('.//*[local-name()="Point"]'):
                row = {}
                # Point fields (flatten one level deep)
                for ch in list(pt):
                    ln = _lname(ch.tag)
                    if len(list(ch)) == 0:
                        row[f"pt.{ln}"] = (ch.text or '').strip()
                    else:
                        # flatten nested elements under Point one level
                        for sub in ch:
                            row[f"pt.{ln}.{_lname(sub.tag)}"] = (sub.text or '').strip()
                # Merge: document meta + TS meta + period meta + point
                full = {**meta, **ts_meta, **per_meta, **row}
                rows.append(full)
    return pd.DataFrame(rows)

# Source XML: prefer first entry in zf/xml_files, else fallback to example file
if 'zf' in globals() and 'xml_files' in globals() and xml_files:
    xml_text_src = zf.read(xml_files[0]).decode('utf-8-sig', errors='replace')
else:
    example_path = Path('notebooks/ActivationPriceXML.txt')
    if not example_path.exists():
        example_path = Path('../notebooks/ActivationPriceXML.txt')
    xml_text_src = example_path.read_text(encoding='utf-8')

points_full_df = parse_balancing_xml_to_df(xml_text_src)
print(points_full_df.shape)
display(points_full_df.head())


In [None]:
# Parse all XML documents in xml_docs_df into one DataFrame (metadata + points)
import pandas as pd

if 'xml_docs_df' not in globals() or xml_docs_df.empty:
    raise RuntimeError('xml_docs_df is empty or not defined. Build it first.')

all_rows = []
for _, r in xml_docs_df.iterrows():
    try:
        dfi = parse_balancing_xml_to_df(str(r['xml']))
        dfi['file'] = r.get('file', None)
        all_rows.append(dfi)
    except Exception as e:
        print(f"Skip {r.get('file', '?')}: {e}")

points_full_df = pd.concat(all_rows, ignore_index=True) if all_rows else pd.DataFrame()
print(f"points_full_df combined: shape={points_full_df.shape}, columns={len(points_full_df.columns)}")
display(points_full_df.head())


In [None]:
points_full_df.columns

In [None]:
unique_counts = points_full_df.nunique(dropna=True).sort_values(ascending=False).to_frame("n_unique")
print(f"DataFrame shape: {unique_counts.shape}")
display(unique_counts)


In [None]:
points_full_df =  points_full_df[[
    "doc.process.processType",
    "ts.type_MarketAgreement.type",
    "ts.mktPSRType.psrType",
    "ts.flowDirection.direction",
    "per.start",
    "per.end",
    "per.resolution",
    "pt.position",
    "pt.quantity",
    "pt.procurement_Price.amount",
    "ts.currency_Unit.name",
]]


In [None]:
points_full_df

In [None]:
# parse UTC 'Z' strings
start_utc = pd.to_datetime(points_full_df['per.start'], utc=True, errors='coerce')
end_utc   = pd.to_datetime(points_full_df['per.end'],   utc=True, errors='coerce')

# DST-aware Hungary time (Europe/Budapest)
points_full_df['per.start_dt'] = start_utc.dt.tz_convert('Europe/Budapest')
points_full_df['per.end_dt']   = end_utc.dt.tz_convert('Europe/Budapest')

points_full_df['per.start_dt'] = start_utc.dt.tz_convert('Etc/GMT-2')
points_full_df['per.end_dt']   = end_utc.dt.tz_convert('Etc/GMT-2')

points_full_df.head()

In [None]:
# Start of the day-period in Europe/Budapest
start_dt = (
    pd.to_datetime(points_full_df['per.start'], utc=True, errors='coerce')
      .dt.tz_convert('Europe/Budapest')
)

# Parse ISO 8601 resolution (PT15M/PT30M/PT60M/PT1H...) -> minutes
res_ex = points_full_df['per.resolution'].str.upper().str.extract(r'PT(\d+)([HM])')
mins = pd.to_numeric(res_ex[0], errors='coerce')
mins = np.where(res_ex[1].eq('H'), mins * 60, mins)
mins = pd.to_numeric(mins, errors='coerce')

# Positions are 1-based; compute offset and exact time
pos = pd.to_numeric(points_full_df['pt.position'], errors='coerce')
offset_min = (pos - 1) * mins

points_full_df['time_dt'] = start_dt + pd.to_timedelta(offset_min, unit='m')
points_full_df['time_end_dt'] = points_full_df['time_dt'] + pd.to_timedelta(mins, unit='m')

points_full_df[['per.start','per.resolution','pt.position','time_dt','time_end_dt']].head()

In [None]:
points_full_df =  points_full_df[[
    'per.start_dt', 
    'per.end_dt' ,
    'time_dt',
    'time_end_dt',
    'per.resolution',
    'pt.position', 
    "doc.process.processType",
    "ts.type_MarketAgreement.type",
    "ts.mktPSRType.psrType",
    "ts.flowDirection.direction",
    "pt.quantity",
    "pt.procurement_Price.amount",
    "ts.currency_Unit.name",
]]

In [None]:
points_full_df.head()

In [None]:
unique_counts = points_full_df.nunique(dropna=True).sort_values(ascending=False).to_frame("n_unique")
print(f"DataFrame shape: {unique_counts.shape}")
display(unique_counts)


In [None]:
points_full_df_test = points_full_df[
    (points_full_df["doc.process.processType"] == "A51")
    & (points_full_df["ts.type_MarketAgreement.type"] == "A01")
    & (points_full_df["ts.mktPSRType.psrType"] == "A03")
    ] 
# aFrr and BESS & daily agreement

In [None]:
points_full_df_test.shape

In [None]:
points_full_df_test.groupby("ts.flowDirection.direction").count()

In [None]:
points_full_df_test.sort_values("time_dt", ascending=True).head(20)

In [None]:
unique_counts = points_full_df_test.nunique(dropna=True).sort_values(ascending=False).to_frame("n_unique")
print(f"DataFrame shape: {unique_counts.shape}")
display(unique_counts)
