In [1]:
# Imports & global config
import sys, warnings
from pathlib import Path
from collections import Counter
import numpy as np
import pandas as pd

warnings.filterwarnings("ignore", category=pd.errors.PerformanceWarning)

UTC = "UTC"
HALF_HOUR = pd.Timedelta(minutes=30)


In [3]:
# Define your data root here (adapt path if needed)
PROJECT_ROOT = Path().resolve().parent
PROCESSED = PROJECT_ROOT / "data" / "processed"

# Map tags to parquet files
FILES = {
    "INTRADAY": PROCESSED / "intraday_trades_raw.parquet",
    "IMBALANCE": PROCESSED / "imbalance_prices.parquet",
    "DEMAND": PROCESSED / "forecast_actual.parquet",
    "FORECAST": PROCESSED / "demand_forecast.parquet",
}
FILES


{'INTRADAY': WindowsPath('C:/Users/alexa/OneDrive/Desktop/GB-Power-Price-Diver-Spread-Radar/data/processed/intraday_trades_raw.parquet'),
 'IMBALANCE': WindowsPath('C:/Users/alexa/OneDrive/Desktop/GB-Power-Price-Diver-Spread-Radar/data/processed/imbalance_prices.parquet'),
 'DEMAND': WindowsPath('C:/Users/alexa/OneDrive/Desktop/GB-Power-Price-Diver-Spread-Radar/data/processed/forecast_actual.parquet'),
 'FORECAST': WindowsPath('C:/Users/alexa/OneDrive/Desktop/GB-Power-Price-Diver-Spread-Radar/data/processed/demand_forecast.parquet')}

In [4]:
# Check all files exist
for tag, path in FILES.items():
    assert path.exists(), f"❌ {tag} file missing: {path}"

# Load with basic datetime normalization
def load_parquet(path, tag):
    df = pd.read_parquet(path)
    if "datetime" not in df.columns:
        raise Exception(f"{tag}: 'datetime' column missing")
    df["datetime"] = pd.to_datetime(df["datetime"], utc=True, errors="coerce")
    return df

dfs = {tag: load_parquet(path, tag) for tag, path in FILES.items()}
dfs.keys()


dict_keys(['INTRADAY', 'IMBALANCE', 'DEMAND', 'FORECAST'])

In [5]:
# Print shape, columns, and date span for each dataframe
for tag, df in dfs.items():
    print(f"\n{'='*12} {tag} {'='*12}")
    print(f"Shape: {df.shape}")
    print(f"Columns: {list(df.columns)}")
    print(f"Date range: {df['datetime'].min()} to {df['datetime'].max()}")
    print(f"NaNs per column:\n{df.isna().sum()}")
    print(df.head(2))



Shape: (50108, 6)
Columns: ['Settlement Date', 'Settlement Period', 'Market Index Data Provider Id', 'Market Index Volume (MWh)', 'Market Index Price (£/MWh)', 'datetime']
Date range: 2024-01-01 00:00:00+00:00 to 2025-06-05 23:30:00+00:00
NaNs per column:
Settlement Date                  0
Settlement Period                0
Market Index Data Provider Id    0
Market Index Volume (MWh)        0
Market Index Price (£/MWh)       0
datetime                         0
dtype: int64
   Settlement Date  Settlement Period Market Index Data Provider Id  \
0  01 January 2024                  1                      APXMIDP    
1  01 January 2024                  1                      N2EXMIDP   

   Market Index Volume (MWh)  Market Index Price (£/MWh)  \
0                      664.4                       36.51   
1                        0.0                        0.00   

                   datetime  
0 2024-01-01 00:00:00+00:00  
1 2024-01-01 00:00:00+00:00  

Shape: (24767, 9)
Columns: ['Settl

In [6]:
# Check for missing half-hour slots per dataset
def missing_half_hours(df):
    rng = pd.date_range(df["datetime"].min(), df["datetime"].max(), freq=HALF_HOUR, tz=UTC)
    return sorted(set(rng) - set(df["datetime"]))

for tag, df in dfs.items():
    missing = missing_half_hours(df)
    print(f"{tag}: Missing half-hours: {len(missing)}")
    if missing: print("  Example missing:", missing[:3])


INTRADAY: Missing half-hours: 4
  Example missing: [Timestamp('2024-03-31 23:00:00+0000', tz='UTC'), Timestamp('2024-03-31 23:30:00+0000', tz='UTC'), Timestamp('2025-03-30 23:00:00+0000', tz='UTC')]
IMBALANCE: Missing half-hours: 4
  Example missing: [Timestamp('2024-03-31 23:00:00+0000', tz='UTC'), Timestamp('2024-03-31 23:30:00+0000', tz='UTC'), Timestamp('2025-03-30 23:00:00+0000', tz='UTC')]
DEMAND: Missing half-hours: 4
  Example missing: [Timestamp('2024-03-31 23:00:00+0000', tz='UTC'), Timestamp('2024-03-31 23:30:00+0000', tz='UTC'), Timestamp('2025-03-30 23:00:00+0000', tz='UTC')]
FORECAST: Missing half-hours: 3243
  Example missing: [Timestamp('2024-01-07 00:30:00+0000', tz='UTC'), Timestamp('2024-01-07 01:00:00+0000', tz='UTC'), Timestamp('2024-01-07 01:30:00+0000', tz='UTC')]


In [7]:
# Compute VWAP for INTRADAY (if price/volume exist)
def compute_vwap(df):
    price = next(c for c in df.columns if "price" in c.lower())
    vol = next(c for c in df.columns if "volume" in c.lower())
    vwap = (
        df.groupby("datetime", group_keys=False)[[price, vol]]
        .apply(lambda g: np.average(g[price], weights=g[vol]) if g[vol].sum() else np.nan)
        .rename("mip_price")
        .reset_index()
    )
    return df.drop_duplicates("datetime").merge(vwap, on="datetime", how="left")

dfs["INTRADAY"] = compute_vwap(dfs["INTRADAY"])


In [8]:
def add_suffix_except_datetime(df, suffix):
    cols = df.columns
    new_cols = [col if col == "datetime" else f"{col}_{suffix}" for col in cols]
    df = df.copy()
    df.columns = new_cols
    return df

for tag in dfs:
    dfs[tag] = add_suffix_except_datetime(dfs[tag], tag)
    print(f"{tag}: {list(dfs[tag].columns)}")


INTRADAY: ['Settlement Date_INTRADAY', 'Settlement Period_INTRADAY', 'Market Index Data Provider Id_INTRADAY', 'Market Index Volume (MWh)_INTRADAY', 'Market Index Price (£/MWh)_INTRADAY', 'datetime', 'mip_price_INTRADAY']
IMBALANCE: ['Settlement Date_IMBALANCE', 'Settlement Period_IMBALANCE', 'System Sell Price(GBP/MWh)_IMBALANCE', 'System Buy Price(GBP/MWh)_IMBALANCE', 'Net Imbalance Volume(MWh)_IMBALANCE', 'datetime', 'sbp_IMBALANCE', 'ssp_IMBALANCE', 'niv_IMBALANCE']
DEMAND: ['SETTLEMENT_DATE_DEMAND', 'SETTLEMENT_PERIOD_DEMAND', 'ND_DEMAND', 'TSD_DEMAND', 'ENGLAND_WALES_DEMAND_DEMAND', 'EMBEDDED_WIND_GENERATION_DEMAND', 'EMBEDDED_WIND_CAPACITY_DEMAND', 'EMBEDDED_SOLAR_GENERATION_DEMAND', 'EMBEDDED_SOLAR_CAPACITY_DEMAND', 'NON_BM_STOR_DEMAND', 'PUMP_STORAGE_PUMPING_DEMAND', 'SCOTTISH_TRANSFER_DEMAND', 'IFA_FLOW_DEMAND', 'IFA2_FLOW_DEMAND', 'BRITNED_FLOW_DEMAND', 'MOYLE_FLOW_DEMAND', 'EAST_WEST_FLOW_DEMAND', 'NEMO_FLOW_DEMAND', 'NSL_FLOW_DEMAND', 'ELECLINK_FLOW_DEMAND', 'VIKING_FLOW_D

In [9]:
start = pd.Timestamp("2024-01-01T00:00:00Z")
end   = pd.Timestamp("2025-05-01T23:30:00Z")
for tag in dfs:
    before = len(dfs[tag])
    df = dfs[tag]
    # Find the datetime column (now named 'datetime_{TAG}')
    dt_col = [col for col in df.columns if col.startswith("datetime")][0]
    dfs[tag] = df[(df[dt_col] >= start) & (df[dt_col] <= end)]
    print(f"{tag}: {before} → {len(dfs[tag])} after date filter")


INTRADAY: 25052 → 23372 after date filter
IMBALANCE: 24767 → 23374 after date filter
DEMAND: 24238 → 23374 after date filter
FORECAST: 20086 → 20086 after date filter


In [10]:
# Identify datetime columns (now named as 'datetime_{TAG}') and set index
for tag in dfs:
    dt_col = [col for col in dfs[tag].columns if col.startswith("datetime")][0]
    dfs[tag] = (
        dfs[tag]
        .sort_values(dt_col)
        .drop_duplicates(dt_col)
        .set_index(dt_col)
    )
    print(f"{tag}: index set to {dt_col} ({dfs[tag].shape})")


INTRADAY: index set to datetime ((23372, 6))
IMBALANCE: index set to datetime ((23372, 8))
DEMAND: index set to datetime ((23372, 22))
FORECAST: index set to datetime ((20086, 7))


In [11]:
# Outer join using the index (datetime from each, different column names)
from functools import reduce

# Start with DEMAND and join in order: INTRADAY, IMBALANCE, FORECAST
ordered_tags = ['DEMAND', 'INTRADAY', 'IMBALANCE', 'FORECAST']
dfs_ordered = [dfs[tag] for tag in ordered_tags]

merged = reduce(lambda left, right: left.join(right, how="outer"), dfs_ordered)
merged = merged.reset_index()
# After merging, all original datetime columns are present, pick the first for canonical use
merged = merged.rename(columns={merged.columns[0]: 'datetime'})
print("Merged columns:", list(merged.columns))


Merged columns: ['datetime', 'SETTLEMENT_DATE_DEMAND', 'SETTLEMENT_PERIOD_DEMAND', 'ND_DEMAND', 'TSD_DEMAND', 'ENGLAND_WALES_DEMAND_DEMAND', 'EMBEDDED_WIND_GENERATION_DEMAND', 'EMBEDDED_WIND_CAPACITY_DEMAND', 'EMBEDDED_SOLAR_GENERATION_DEMAND', 'EMBEDDED_SOLAR_CAPACITY_DEMAND', 'NON_BM_STOR_DEMAND', 'PUMP_STORAGE_PUMPING_DEMAND', 'SCOTTISH_TRANSFER_DEMAND', 'IFA_FLOW_DEMAND', 'IFA2_FLOW_DEMAND', 'BRITNED_FLOW_DEMAND', 'MOYLE_FLOW_DEMAND', 'EAST_WEST_FLOW_DEMAND', 'NEMO_FLOW_DEMAND', 'NSL_FLOW_DEMAND', 'ELECLINK_FLOW_DEMAND', 'VIKING_FLOW_DEMAND', 'GREENLINK_FLOW_DEMAND', 'Settlement Date_INTRADAY', 'Settlement Period_INTRADAY', 'Market Index Data Provider Id_INTRADAY', 'Market Index Volume (MWh)_INTRADAY', 'Market Index Price (£/MWh)_INTRADAY', 'mip_price_INTRADAY', 'Settlement Date_IMBALANCE', 'Settlement Period_IMBALANCE', 'System Sell Price(GBP/MWh)_IMBALANCE', 'System Buy Price(GBP/MWh)_IMBALANCE', 'Net Imbalance Volume(MWh)_IMBALANCE', 'sbp_IMBALANCE', 'ssp_IMBALANCE', 'niv_IMBALA

In [12]:
print("Merged shape:", merged.shape)
print("Date range:", merged['datetime'].min(), "→", merged['datetime'].max())
print("Total columns:", len(merged.columns))
print("NaN counts (top 15):\n", merged.isna().sum().sort_values(ascending=False).head(15))
print("Rows with only datetime non-NaN:", (merged.drop(columns="datetime").isna().all(axis=1)).sum())
print("Unique datetimes:", merged["datetime"].nunique())
if merged["datetime"].duplicated().any():
    print("⚠️ Duplicate datetimes found!")
else:
    print("No duplicate datetimes in merged data.")
display(merged.head())


Merged shape: (23372, 44)
Date range: 2024-01-01 00:00:00+00:00 → 2025-05-01 23:30:00+00:00
Total columns: 44
NaN counts (top 15):
 nationalDemand_FORECAST              3539
publishTime_FORECAST                 3286
transmissionSystemDemand_FORECAST    3286
settlementPeriod_FORECAST            3286
boundary_FORECAST                    3286
startTime_FORECAST                   3286
settlementDate_FORECAST              3286
mip_price_INTRADAY                     95
datetime                                0
SETTLEMENT_DATE_DEMAND                  0
SETTLEMENT_PERIOD_DEMAND                0
ND_DEMAND                               0
PUMP_STORAGE_PUMPING_DEMAND             0
NON_BM_STOR_DEMAND                      0
EMBEDDED_SOLAR_CAPACITY_DEMAND          0
dtype: int64
Rows with only datetime non-NaN: 0
Unique datetimes: 23372
No duplicate datetimes in merged data.


Unnamed: 0,datetime,SETTLEMENT_DATE_DEMAND,SETTLEMENT_PERIOD_DEMAND,ND_DEMAND,TSD_DEMAND,ENGLAND_WALES_DEMAND_DEMAND,EMBEDDED_WIND_GENERATION_DEMAND,EMBEDDED_WIND_CAPACITY_DEMAND,EMBEDDED_SOLAR_GENERATION_DEMAND,EMBEDDED_SOLAR_CAPACITY_DEMAND,...,sbp_IMBALANCE,ssp_IMBALANCE,niv_IMBALANCE,startTime_FORECAST,settlementDate_FORECAST,settlementPeriod_FORECAST,boundary_FORECAST,publishTime_FORECAST,transmissionSystemDemand_FORECAST,nationalDemand_FORECAST
0,2024-01-01 00:00:00+00:00,2024-01-01,1,21783,23466,19539,2804,6488,0,16793,...,90.0,90.0,103.352,2024-01-01T00:00:00Z,2024-01-01,1.0,N,2023-12-31T23:46:00Z,24398.0,22500.0
1,2024-01-01 00:30:00+00:00,2024-01-01,2,22521,24103,20286,2834,6488,0,16793,...,100.0,100.0,197.535,2024-01-01T00:30:00Z,2024-01-01,2.0,N,2024-01-01T00:16:00Z,24787.0,23100.0
2,2024-01-01 01:00:00+00:00,2024-01-01,3,22194,24754,20070,2868,6488,0,16793,...,57.43,57.43,458.259,2024-01-01T01:00:00Z,2024-01-01,3.0,N,2024-01-01T00:46:00Z,25341.0,22663.0
3,2024-01-01 01:30:00+00:00,2024-01-01,4,21510,24505,19424,2901,6488,0,16793,...,129.83887,129.83887,492.023,2024-01-01T01:30:00Z,2024-01-01,4.0,N,2024-01-01T01:16:00Z,24845.0,21851.0
4,2024-01-01 02:00:00+00:00,2024-01-01,5,20619,23977,18674,2933,6488,0,16793,...,110.0,110.0,407.514,2024-01-01T02:00:00Z,2024-01-01,5.0,N,2024-01-01T01:46:00Z,24532.0,20993.0


In [13]:
# Data Profiling and Coverage Reporting (No Plots)

# For each dataset, get set of unique datetimes (now the index)
sets = {tag: set(dfs[tag].index) for tag in dfs}

# Overlap: timestamps present in all datasets
common_times = set.intersection(*sets.values())
print(f"Common timestamps across ALL datasets: {len(common_times)}")

# Timestamps unique to each dataset
for tag in dfs:
    others = set.union(*(sets[t] for t in dfs if t != tag))
    only_in = sets[tag] - others
    print(f"Timestamps only in {tag}: {len(only_in)}")

# Timestamps missing from each dataset (relative to full merged range)
all_times = set(merged['datetime'])
for tag in dfs:
    missing = all_times - sets[tag]
    print(f"Timestamps in merged data but missing from {tag}: {len(missing)}")

# High-level summary
print("\nMerged Data Overview:")
print(f"Total rows: {len(merged)}")
print(f"Total columns: {len(merged.columns)}")
print(f"Datetime range: {merged['datetime'].min()} to {merged['datetime'].max()}")
print(f"Rows with all NaN except datetime: {(merged.drop(columns='datetime').isna().all(axis=1)).sum()}")

# Top 20 columns with most NaNs
nan_counts = merged.isna().sum().sort_values(ascending=False).head(20)
print("\nTop 20 columns with most NaNs:")
print(nan_counts)


Common timestamps across ALL datasets: 20086
Timestamps only in INTRADAY: 0
Timestamps only in IMBALANCE: 0
Timestamps only in DEMAND: 0
Timestamps only in FORECAST: 0
Timestamps in merged data but missing from INTRADAY: 0
Timestamps in merged data but missing from IMBALANCE: 0
Timestamps in merged data but missing from DEMAND: 0
Timestamps in merged data but missing from FORECAST: 3286

Merged Data Overview:
Total rows: 23372
Total columns: 44
Datetime range: 2024-01-01 00:00:00+00:00 to 2025-05-01 23:30:00+00:00
Rows with all NaN except datetime: 0

Top 20 columns with most NaNs:
nationalDemand_FORECAST              3539
publishTime_FORECAST                 3286
transmissionSystemDemand_FORECAST    3286
settlementPeriod_FORECAST            3286
boundary_FORECAST                    3286
startTime_FORECAST                   3286
settlementDate_FORECAST              3286
mip_price_INTRADAY                     95
datetime                                0
SETTLEMENT_DATE_DEMAND           

### Sunday forecast gaps (ESO reset window)

For ~02:30–03:00 UTC every Sunday the ESO demand-forecast engine is re-initialised.  
During this window the API still serves a “latest” revision, but the payload omits  
`nationalDemand` (and occasionally `transmissionSystemDemand`).  
Our collector keeps only the last revision per period, so ~3 243 Sunday half-hours
arrive with **NaNs** in the forecast columns.

Historical revisions are not publicly archived, so these values **cannot be back-filled**.


### Legitimate DST gaps (spring clock-forward)

Two half-hours are **missing every year** on the last Sunday in March.  
The GB electricity market timestamps settlement periods in **local time (GMT/BST)**.  
When the clock moves forward to BST the local day contains only **46 settlement periods**.  
When converted to UTC, the “lost” periods appear as  
`23:00 UTC` and `23:30 UTC` on the same date:

| Year | Missing UTC half-hours |
|------|-----------------------|
| 2024 | 2024-03-31 23:00, 23:30 |
| 2025 | 2025-03-30 23:00, 23:30 |

These gaps are **real market behaviour** – no dataset should be expected to contain them.


### Creating a quality flag and limited forward-fill

For full transparency, we create a `FORECAST_MISSING` flag for every row where forecast data is missing.  
To improve continuity for downstream analytics and models, we forward-fill missing forecast values for up to 4 consecutive half-hours (2 hours).  
This reflects standard operator practice in control rooms during short data gaps.

Longer gaps remain `NaN` and are flagged for transparency.


In [14]:
# ---- Create FORECAST_MISSING flag and apply capped forward-fill ----
forecast_cols = ['transmissionSystemDemand_FORECAST', 'nationalDemand_FORECAST']
merged['FORECAST_MISSING'] = merged[forecast_cols].isna().any(axis=1)

# Forward-fill forecast columns up to 4 half-hours (2 hours)
merged[forecast_cols] = merged[forecast_cols].ffill(limit=4)

# Diagnostics: print remaining NaNs in important columns
important_missing = {
    'nationalDemand_FORECAST': merged['nationalDemand_FORECAST'].isna().sum(),
    'transmissionSystemDemand_FORECAST': merged['transmissionSystemDemand_FORECAST'].isna().sum(),
    'mip_price_INTRADAY': merged['mip_price_INTRADAY'].isna().sum()
}
print("Added FORECAST_MISSING flag and forward-filled up to 4 periods for forecast columns.")
print("Remaining NaNs in important columns:", important_missing)


Added FORECAST_MISSING flag and forward-filled up to 4 periods for forecast columns.
Remaining NaNs in important columns: {'nationalDemand_FORECAST': np.int64(3008), 'transmissionSystemDemand_FORECAST': np.int64(3006), 'mip_price_INTRADAY': np.int64(95)}


### Creating and saving two final datasets: full (with NaNs) and clean (no NaNs in key columns)

For maximum flexibility and professionalism:
- **Full dataset**: keeps all NaNs, providing a complete audit trail.  
- **Clean dataset**: drops any rows with missing values in essential columns, ready for ML or statistical modeling.

Both versions are saved for different downstream tasks and to meet best practices.


In [15]:
# ---- Define core columns to require in the clean dataset ----
core_cols = [
    'transmissionSystemDemand_FORECAST',
    'nationalDemand_FORECAST',
    'mip_price_INTRADAY'
    # Add other critical columns as needed, e.g., key demand, price, imbalance cols
]

# Clean dataset: drop rows with any NaN in key columns
clean_df = merged.dropna(subset=core_cols, how='any').copy()

print(f"Clean dataset created: {clean_df.shape[0]} rows, {clean_df.shape[1]} columns")


Clean dataset created: 20327 rows, 45 columns


In [16]:
# ---- Save both datasets ----
output_with_nans = PROCESSED / "final_merged_with_NaNs.parquet"
output_clean     = PROCESSED / "final_merged_clean.parquet"

merged.to_parquet(output_with_nans, index=False)
clean_df.to_parquet(output_clean, index=False)

print("✅ Both final datasets saved:")
print(f"  • With NaNs: {output_with_nans}")
print(f"  • Clean (no NaNs in key cols): {output_clean}")


✅ Both final datasets saved:
  • With NaNs: C:\Users\alexa\OneDrive\Desktop\GB-Power-Price-Diver-Spread-Radar\data\processed\final_merged_with_NaNs.parquet
  • Clean (no NaNs in key cols): C:\Users\alexa\OneDrive\Desktop\GB-Power-Price-Diver-Spread-Radar\data\processed\final_merged_clean.parquet
