# Advanced Statistical Analysis for UK Energy Data (July 2025)

This notebook performs a comprehensive statistical analysis of UK energy data for July 2025, loaded from Google BigQuery. The analysis includes:
- T-tests and ANOVA for price comparisons.
- Correlation analysis between key metrics.
- Regression models for price drivers.
- Time series forecasting using SARIMAX.
- Event impact analysis.

Each step is contained in a separate cell for clarity and modular execution.

In [4]:
# =========================
# ======== CONFIG =========
# =========================
PROJECT_ID = "jibber-jabber-knowledge"
LOCATION = "europe-west2"
DATASET_SOURCE = "uk_energy"
DATASET_ANALYTICS = "uk_energy_analysis"
GCS_BUCKET = "jibber-jabber-knowledge-bmrs-data"
DATE_START = "2025-07-01"
DATE_END = "2025-07-31"

print("Configuration loaded.")

# =========================
# ======== IMPORTS ========
# =========================
import pathlib
from datetime import datetime
import numpy as np
import pandas as pd
from scipy import stats
import statsmodels.api as sm
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.statespace.sarimax import SARIMAX
from google.cloud import bigquery
from pandas_gbq import to_gbq
import warnings

# Optional: GCS upload for plots
try:
    from google.cloud import storage as gcs_storage
    HAS_GCS = True
except ImportError:
    HAS_GCS = False

# Suppress future warnings for cleaner output
warnings.simplefilter(action='ignore', category=FutureWarning)

import matplotlib
import matplotlib.pyplot as plt
# Use 'inline' for notebook display
%matplotlib inline

print("Libraries imported.")

Configuration loaded.


ModuleNotFoundError: No module named 'scipy'

In [None]:
# =========================
# ====== UTILITIES ========
# =========================
OUTDIR = pathlib.Path("./output")
OUTDIR.mkdir(exist_ok=True)

def save_plot(fig, fname: str):
    """Save plot locally or to GCS if configured."""
    local_path = OUTDIR / fname
    fig.savefig(local_path, bbox_inches="tight", dpi=150)
    # No need to close the figure in a notebook if we want to display it
    # plt.close(fig) 
    if GCS_BUCKET and HAS_GCS:
        try:
            client = gcs_storage.Client(project=PROJECT_ID)
            bucket = client.bucket(GCS_BUCKET)
            blob = bucket.blob(f"output/{fname}")
            blob.upload_from_filename(str(local_path))
            print(f"[GCS] Uploaded gs://{GCS_BUCKET}/output/{fname}")
            return f"gs://{GCS_BUCKET}/output/{fname}"
        except Exception as e:
            print(f"[WARN] GCS upload failed: {e}")
    elif GCS_BUCKET and not HAS_GCS:
        print("[WARN] GCS_BUCKET set but google-cloud-storage not installed; saved locally.")
    return str(local_path)

def write_bq(df: pd.DataFrame, table_name: str, if_exists="replace"):
    """Write a DataFrame to BigQuery."""
    full_table = f"{PROJECT_ID}.{DATASET_ANALYTICS}.{table_name}"
    if df.empty:
        print(f"[INFO] {table_name}: DataFrame is empty; skipping write.")
        return
    try:
        to_gbq(df, full_table, project_id=PROJECT_ID, if_exists=if_exists, location=LOCATION)
        print(f"[BQ] Wrote {len(df):,} rows to {full_table} (if_exists={if_exists})")
    except Exception as e:
        print(f"[ERROR] Failed to write to BigQuery table {full_table}: {e}")


def add_calendar_fields(df: pd.DataFrame, ts_col: str):
    ts = pd.to_datetime(df[ts_col])
    df["date"] = ts.dt.date
    df["month"] = ts.dt.month
    df["dow"] = ts.dt.dayofweek
    df["is_weekend"] = df["dow"] >= 5
    def season(m):
        return "Winter" if m in (12,1,2) else "Spring" if m in (3,4,5) else "Summer" if m in (6,7,8) else "Autumn"
    df["season"] = df["month"].apply(season)
    return df

print("Utility functions defined.")

In [None]:
# =========================
# ======== LOADING ========
# =========================
def _table_exists(client: bigquery.Client, fq_table: str) -> bool:
    try:
        client.get_table(fq_table)
        return True
    except Exception:
        return False

def load_data() -> pd.DataFrame:
    """
    Load and harmonize series from uk_energy dataset for the configured date range.
    """
    client = bigquery.Client(project=PROJECT_ID, location=LOCATION)

    # This simplified query joins the essential tables for the analysis.
    # It assumes tables are named like 'elexon_demand_outturn_2025_07'
    # A more robust version would use wildcards, but this is specific to July 2025.
    
    # For simplicity, we will focus on a few key tables first.
    # This query joins demand, bid-offer acceptances (for price), and system warnings.
    
    sql = f"""
    WITH base_grid AS (
        -- Create a complete 30-minute grid for July 2025
        SELECT ts
        FROM UNNEST(GENERATE_TIMESTAMP_ARRAY('2025-07-01 00:00:00', '2025-07-31 23:30:00', INTERVAL 30 MINUTE)) AS ts
    ),
    demand AS (
        SELECT
            TIMESTAMP_TRUNC(timeFrom, MINUTE) as ts,
            AVG(nationalDemand) as volume
        FROM `jibber-jabber-knowledge.uk_energy.elexon_demand_outturn_2025_07`
        GROUP BY 1
    ),
    prices AS (
        SELECT
            TIMESTAMP_TRUNC(timeFrom, MINUTE) as ts,
            -- Simplified proxy for SSP/SBP
            AVG(CASE WHEN bidOfferLevel > 0 THEN bidOfferLevel ELSE NULL END) as SBP,
            AVG(CASE WHEN bidOfferLevel < 0 THEN bidOfferLevel ELSE NULL END) as SSP
        FROM `jibber-jabber-knowledge.uk_energy.elexon_boalf_2025_07`
        GROUP BY 1
    ),
    warnings AS (
        SELECT
            TIMESTAMP_TRUNC(timeFrom, MINUTE) as ts,
            TRUE as unplanned_event
        FROM `jibber-jabber-knowledge.uk_energy.elexon_system_warnings_2025_07`
        GROUP BY 1
    )
    SELECT
        bg.ts,
        d.volume,
        p.SSP,
        p.SBP,
        (p.SBP - p.SSP) as spread,
        w.unplanned_event
    FROM base_grid bg
    LEFT JOIN demand d ON bg.ts = d.ts
    LEFT JOIN prices p ON bg.ts = p.ts
    LEFT JOIN warnings w ON bg.ts = w.ts
    ORDER BY bg.ts
    """

    print("Executing BigQuery query...")
    df = client.query(sql).to_dataframe(create_bqstorage_client=True)
    
    if df.empty:
        raise RuntimeError("No data returned from BigQuery for the specified date range.")
        
    # Post-processing
    df['unplanned_event'] = df['unplanned_event'].fillna(False)
    df = df.interpolate(method='linear', limit_direction='forward', axis=0)
    df = df.dropna(subset=["SSP", "SBP"], how="all")
    df = add_calendar_fields(df, "ts")
    
    return df

print("Data loading function defined.")

In [None]:
# =========================
# ===== EXECUTE LOAD ======
# =========================
print(f"[{datetime.utcnow().isoformat()}] Loading harmonized data from BigQuery uk_energy…")
try:
    df = load_data()
    print(f"[INFO] Loaded {len(df):,} rows")
    print("Data preview:")
    display(df.head())
    print("\nData Info:")
    df.info()
except Exception as e:
    print(f"[ERROR] Data loading failed: {e}")

In [None]:
# =========================
# ====== ARIMA MODEL ======
# =========================
def arima_ssp(df: pd.DataFrame):
    if "SSP" not in df.columns:
        print("[WARN] SSP column not found.")
        return pd.DataFrame(), None
        
    d = df[["ts","SSP"]].dropna().copy()
    if d.empty:
        print("[WARN] No SSP data available for ARIMA.")
        return pd.DataFrame(), None
        
    d["ts"] = pd.to_datetime(d["ts"])
    d = d.set_index("ts").sort_index()
    y = d["SSP"].asfreq("30min").interpolate(limit_direction="both")
    
    # Ensure we have enough data for weekly seasonality
    if y.size < 48*14:
        print("[WARN] Not enough data for stable weekly seasonality ARIMA (need at least 2 weeks).")
        return pd.DataFrame(), None
        
    season = 48 * 7  # 30-min periods in a week
    order = (1,1,1)
    seasonal_order = (1,1,1,season)
    
    print("Fitting SARIMAX model... (this may take a moment)")
    try:
        model = SARIMAX(y, order=order, seasonal_order=seasonal_order,
                        enforce_stationarity=False, enforce_invertibility=False)
        res = model.fit(disp=False)
        print("Model fitting complete.")
    except Exception as e:
        print(f"[WARN] ARIMA failed: {e}")
        return pd.DataFrame(), None
        
    steps = 48 * 3 # 3 days ahead
    fc = res.get_forecast(steps=steps)
    pred = fc.predicted_mean
    ci = fc.conf_int(alpha=0.05)
    
    out = pd.DataFrame({
        "ts": pred.index,
        "forecast_ssp": pred.values,
        "ci_lo": ci.iloc[:,0].values,
        "ci_hi": ci.iloc[:,1].values
    }).reset_index(drop=True)
    
    fig, ax = plt.subplots(figsize=(15,7))
    y.tail(48*7).plot(ax=ax, label="Historical SSP (Last 7 days)")
    pred.plot(ax=ax, label="Forecasted SSP", linestyle='--')
    ax.fill_between(out["ts"], out["ci_lo"], out["ci_hi"], color='k', alpha=0.1, label="95% Confidence Interval")
    ax.set_title(f"SSP ARIMA Forecast (Next {steps//48} days)")
    ax.set_xlabel("Time (UTC)")
    ax.set_ylabel("SSP (£/MWh)")
    ax.legend()
    ax.grid(True, which='both', linestyle='--', linewidth=0.5)
    
    path = save_plot(fig, "arima_ssp_forecast_july2025.png")
    out["plot_path"] = path
    out["aic"] = res.aic
    out["bic"] = res.bic
    out["order"] = str(order)
    out["seasonal_order"] = str(seasonal_order)
    
    return out, fig

# Run the analysis
if 'df' in locals() and not df.empty:
    arima_df, arima_plot = arima_ssp(df)
    if not arima_df.empty:
        print("\nARIMA Forecast Results:")
        display(arima_df.head())
        write_bq(arima_df, "arima_forecast_ssp_july2025")
        plt.show()
else:
    print("DataFrame 'df' not available. Please run the data loading cell first.")

In [None]:
# =========================
# ====== T-TESTS ==========
# =========================
def ttest_ssp_sbp(df: pd.DataFrame):
    if not {"SSP","SBP"}.issubset(df.columns):
        return pd.DataFrame()
    a = df["SSP"].astype(float).values
    b = df["SBP"].astype(float).values
    # Only keep finite values
    a = a[np.isfinite(a)]
    b = b[np.isfinite(b)]
    if len(a) < 3 or len(b) < 3:
        return pd.DataFrame()
    tstat, pval = stats.ttest_ind(a, b, equal_var=False, nan_policy="omit")
    n1, n2 = len(a), len(b)
    m1, m2 = np.nanmean(a), np.nanmean(b)
    s1, s2 = np.nanstd(a, ddof=1), np.nanstd(b, ddof=1)
    # Welch-Satterthwaite df
    df_denom = (s1**2/n1 + s2**2/n2)**2
    df_num = (s1**2/n1)**2/(n1-1) + (s2**2/n2)**2/(n2-1)
    dof = df_denom/df_num if df_num > 0 else np.nan
    mean_diff = m2 - m1
    se = np.sqrt(s1**2/n1 + s2**2/n2)
    ci_lo = mean_diff - 1.96*se
    ci_hi = mean_diff + 1.96*se
    return pd.DataFrame([{
        "metric": "SSP_vs_SBP",
        "t_stat": tstat,
        "p_value": pval,
        "mean_SSP": m1,
        "mean_SBP": m2,
        "mean_diff": mean_diff,
        "ci_95_lo": ci_lo,
        "ci_95_hi": ci_hi,
        "dof": dof,
        "n_SSP": n1,
        "n_SBP": n2
    }])

if 'df' in locals() and not df.empty:
    ttest_df = ttest_ssp_sbp(df)
    if not ttest_df.empty:
        print("T-test Results (SSP vs SBP):")
        display(ttest_df)
        write_bq(ttest_df, "ttest_results_july2025")
else:
    print("DataFrame 'df' not available.")

In [None]:
# =========================
# ======== ANOVA ==========
# =========================
def anova_by_season(df: pd.DataFrame, price_col: str = "SSP"):
    if price_col not in df.columns:
        return pd.DataFrame()
    groups = [g[price_col].dropna().values for _, g in df.groupby("season")]
    groups = [x for x in groups if len(x) > 1]
    if len(groups) < 2:
        return pd.DataFrame()
    fstat, pval = stats.f_oneway(*groups)
    out = pd.DataFrame([{
        "price_col": price_col,
        "f_stat": fstat,
        "p_value": pval,
        "n_groups": len(groups),
        "group_names": list(df.groupby("season").groups.keys()),
        "group_sizes": [len(x) for x in groups],
        "group_means": [float(np.nanmean(x)) for x in groups],
    }])
    return out

if 'df' in locals() and not df.empty:
    anova_ssp = anova_by_season(df, "SSP")
    anova_sbp = anova_by_season(df, "SBP")
    anova_all = pd.concat([anova_ssp, anova_sbp], ignore_index=True) if not anova_ssp.empty or not anova_sbp.empty else pd.DataFrame()
    if not anova_all.empty:
        print("ANOVA Results by Season:")
        display(anova_all)
        write_bq(anova_all, "anova_results_july2025")
else:
    print("DataFrame 'df' not available.")

In [None]:
# =========================
# ====== CORRELATION ======
# =========================
def correlation_matrix(df: pd.DataFrame):
    cols = [c for c in [
        "SSP","SBP","spread","volume"
    ] if c in df.columns]
    if not cols:
        return pd.DataFrame()
    cm = df[cols].corr(method="pearson")
    cm = cm.reset_index().rename(columns={"index": "variable"})
    return cm

def correlation_heatmap(df_corr: pd.DataFrame):
    """Save a correlation heatmap (matplotlib)."""
    if df_corr.empty:
        return "", None
    
    # Set the 'variable' column as the index for plotting
    df_plot = df_corr.set_index('variable')
    
    fig, ax = plt.subplots(figsize=(8, 6))
    im = ax.imshow(df_plot.values, cmap='coolwarm', aspect="auto")
    
    # Set ticks and labels
    ax.set_xticks(np.arange(len(df_plot.columns)))
    ax.set_yticks(np.arange(len(df_plot.index)))
    ax.set_xticklabels(df_plot.columns, rotation=45, ha="right")
    ax.set_yticklabels(df_plot.index)
    
    # Loop over data dimensions and create text annotations.
    for i in range(len(df_plot.index)):
        for j in range(len(df_plot.columns)):
            text = ax.text(j, i, f"{df_plot.iloc[i, j]:.2f}",
                           ha="center", va="center", color="w")
            
    ax.set_title("Correlation Matrix (Pearson)")
    fig.tight_layout()
    path = save_plot(fig, "correlation_matrix_july2025.png")
    return path, fig

if 'df' in locals() and not df.empty:
    corr_df = correlation_matrix(df)
    if not corr_df.empty:
        print("Correlation Matrix:")
        display(corr_df)
        write_bq(corr_df, "correlation_matrix_july2025")
        
        heatmap_path, heatmap_fig = correlation_heatmap(corr_df)
        if heatmap_path:
            print(f"[PLOT] Correlation heatmap -> {heatmap_path}")
            plt.show()
else:
    print("DataFrame 'df' not available.")

In [None]:
# =========================
# ====== REGRESSION 1 =====
# =========================
def regression_temperature_ssp(df: pd.DataFrame):
    # This function requires a 'temperature' column which is not in the simplified loader.
    # We will skip this for now. A more advanced version would join temperature data.
    if not {"SSP","temperature"}.issubset(df.columns):
        print("[INFO] Skipping Temperature vs SSP regression: 'temperature' column not available in the loaded data.")
        return pd.DataFrame(), None
    d = df[["SSP", "temperature"]].dropna()
    if len(d) < 10:
        return pd.DataFrame(), None
    X = sm.add_constant(d["temperature"].astype(float))
    y = d["SSP"].astype(float)
    model = sm.OLS(y, X).fit()
    summary = {
        "model": "OLS_SSP_on_Temperature",
        "n_obs": int(model.nobs),
        "r_squared": float(model.rsquared),
        "adj_r_squared": float(model.rsquared_adj),
        "intercept": float(model.params.get("const", np.nan)),
        "slope_temperature": float(model.params.get("temperature", np.nan)),
        "p_intercept": float(model.pvalues.get("const", np.nan)),
        "p_temperature": float(model.pvalues.get("temperature", np.nan))
    }
    # Plot
    fig, ax = plt.subplots(figsize=(7,5))
    ax.scatter(d["temperature"], d["SSP"], s=8, alpha=0.5)
    xgrid = np.linspace(d["temperature"].min(), d["temperature"].max(), 200)
    yhat = summary["intercept"] + summary["slope_temperature"] * xgrid
    ax.plot(xgrid, yhat, linewidth=2)
    ax.set_xlabel("Temperature (°C)")
    ax.set_ylabel("SSP (£/MWh)")
    ax.set_title("Temperature vs SSP (OLS)")
    path = save_plot(fig, "reg_temperature_ssp_july2025.png")
    summary["plot_path"] = path
    return pd.DataFrame([summary]), fig

if 'df' in locals() and not df.empty:
    reg_temp_df, reg_temp_plot = regression_temperature_ssp(df)
    if not reg_temp_df.empty:
        print("Regression Results (Temperature vs SSP):")
        display(reg_temp_df)
        write_bq(reg_temp_df, "regression_temperature_ssp_july2025")
        plt.show()
else:
    print("DataFrame 'df' not available.")

In [None]:
# =========================
# ====== REGRESSION 2 =====
# =========================
def regression_volume_price(df: pd.DataFrame):
    # Price elasticity proxy: SSP ~ log(volume)
    need = {"SSP","volume"}
    if not need.issubset(df.columns):
        return pd.DataFrame()
    d = df[["SSP","volume"]].dropna()
    d = d[d["volume"] > 0]
    if len(d) < 50:
        return pd.DataFrame()
    d = d.copy()
    d["log_volume"] = np.log(d["volume"])
    X = sm.add_constant(d["log_volume"].astype(float))
    y = d["SSP"].astype(float)
    model = sm.OLS(y, X).fit()
    return pd.DataFrame([{
        "model": "OLS_SSP_on_logVolume",
        "n_obs": int(model.nobs),
        "r_squared": float(model.rsquared),
        "adj_r_squared": float(model.rsquared_adj),
        "intercept": float(model.params.get("const", np.nan)),
        "beta_log_volume": float(model.params.get("log_volume", np.nan)),
        "p_log_volume": float(model.pvalues.get("log_volume", np.nan)),
    }])

if 'df' in locals() and not df.empty:
    reg_vol_df = regression_volume_price(df)
    if not reg_vol_df.empty:
        print("Regression Results (Volume vs Price):")
        display(reg_vol_df)
        write_bq(reg_vol_df, "regression_volume_price_july2025")
else:
    print("DataFrame 'df' not available.")

In [None]:
# =========================
# ==== DECOMPOSITION ======
# =========================
def seasonal_decomp(df: pd.DataFrame):
    if "SSP" not in df.columns:
        return pd.DataFrame(), None
    d = df[["ts","SSP"]].dropna().copy()
    d["ts"] = pd.to_datetime(d["ts"])
    d = d.set_index("ts").sort_index()
    y = d["SSP"].asfreq("30min").interpolate(limit_direction="both")
    period = 48 * 7 # Weekly
    try:
        decomp = seasonal_decompose(y, model="additive", period=period, two_sided=False, extrapolate_trend="freq")
    except Exception as e:
        print(f"[WARN] Seasonal decomposition failed: {e}")
        return pd.DataFrame(), None
    fig, axes = plt.subplots(4, 1, figsize=(12, 8), sharex=True)
    axes[0].plot(y); axes[0].set_title("Observed")
    axes[1].plot(decomp.trend); axes[1].set_title("Trend")
    axes[2].plot(decomp.seasonal); axes[2].set_title("Seasonal")
    axes[3].plot(decomp.resid); axes[3].set_title("Residual")
    fig.tight_layout()
    path = save_plot(fig, "seasonal_decomposition_ssp_july2025.png")
    stats_out = pd.DataFrame([{
        "period": int(period),
        "obs_count": int(len(y)),
        "trend_var": float(np.nanvar(decomp.trend.values)),
        "seasonal_var": float(np.nanvar(decomp.seasonal.values)),
        "resid_var": float(np.nanvar(decomp.resid.values)),
        "plot_path": path
    }])
    return stats_out, fig

if 'df' in locals() and not df.empty:
    decomp_df, decomp_plot = seasonal_decomp(df)
    if not decomp_df.empty:
        print("Seasonal Decomposition Stats:")
        display(decomp_df)
        write_bq(decomp_df, "seasonal_decomposition_stats_july2025")
        plt.show()
else:
    print("DataFrame 'df' not available.")

In [None]:
# =========================
# ===== EVENT IMPACT ======
# =========================
def outage_impact(df: pd.DataFrame):
    """Spread during system events vs non-events (proxy for outage/stress periods)."""
    if "spread" not in df.columns or "unplanned_event" not in df.columns:
        return pd.DataFrame()
    d = df[["spread","unplanned_event"]].dropna()
    if d.empty:
        return pd.DataFrame()
    a = d.loc[d["unplanned_event"] == True,  "spread"].values
    b = d.loc[d["unplanned_event"] == False, "spread"].values
    if len(a) < 5 or len(b) < 5:
        return pd.DataFrame()
    tstat, pval = stats.ttest_ind(a, b, equal_var=False, nan_policy="omit")
    return pd.DataFrame([{
        "metric": "spread_during_system_events",
        "mean_with_event": float(np.nanmean(a)),
        "mean_without_event": float(np.nanmean(b)),
        "mean_diff": float(np.nanmean(a) - np.nanmean(b)),
        "t_stat": float(tstat),
        "p_value": float(pval),
        "n_with": int(np.isfinite(a).sum()),
        "n_without": int(np.isfinite(b).sum())
    }])

if 'df' in locals() and not df.empty:
    outage_df = outage_impact(df)
    if not outage_df.empty:
        print("Outage Impact on Spread:")
        display(outage_df)
        write_bq(outage_df, "outage_impact_results_july2025")
else:
    print("DataFrame 'df' not available.")

In [None]:
# =========================
# ===== NESO BEHAVIOR =====
# =========================
def neso_behavior(df: pd.DataFrame):
    """Spread ~ NESO balancing cost intensity."""
    # This function requires 'balancing_cost' which is not in the simplified loader.
    # We will skip this for now.
    if not {"spread","balancing_cost"}.issubset(df.columns):
        print("[INFO] Skipping NESO behavior analysis: 'balancing_cost' column not available.")
        return pd.DataFrame()
    
    d = df[["spread","balancing_cost"]].dropna()
    if len(d) < 50:
        return pd.DataFrame()
    X = sm.add_constant(d["balancing_cost"].astype(float))
    y = d["spread"].astype(float)
    model = sm.OLS(y, X).fit()
    out = {
        "model": "Spread_on_BalancingCosts",
        "n_obs": int(model.nobs),
        "r_squared": float(model.rsquared),
        "adj_r_squared": float(model.rsquared_adj),
        "beta_balancing_cost": float(model.params.get("balancing_cost", np.nan)),
        "p_balancing_cost": float(model.pvalues.get("balancing_cost", np.nan)),
    }
    return pd.DataFrame([out])

if 'df' in locals() and not df.empty:
    neso_df = neso_behavior(df)
    if not neso_df.empty:
        print("NESO Behavior Analysis:")
        display(neso_df)
        write_bq(neso_df, "neso_behavior_results_july2025")
else:
    print("DataFrame 'df' not available.")

In [None]:
def analyze_bid_offer_acceptances():
    """
    Analyzes Bid-Offer Acceptances (BOD) data to create a summary table.
    Includes a progress bar for the data download from BigQuery.
    """
    client = bigquery.Client(project=PROJECT_ID, location=LOCATION)
    
    # This query assumes the table is named based on the month of data loaded.
    # It calculates volumes, VWAP, and notional values per BMU per settlement period.
    sql = f"""
    SELECT
        DATE(timeFrom) as date,
        settlementPeriod as settlement_period,
        bmUnit AS bmu_id,
        SUM(CASE WHEN bidOfferLevel < 0 THEN -bidOfferLevel ELSE 0 END) AS accepted_bid_volume,
        SUM(CASE WHEN bidOfferLevel > 0 THEN bidOfferLevel ELSE 0 END) AS accepted_offer_volume,
        
        SAFE_DIVIDE(
            SUM(CASE WHEN bidOfferLevel < 0 THEN -bidOfferLevel * bidOfferPrice ELSE 0 END),
            NULLIF(SUM(CASE WHEN bidOfferLevel < 0 THEN -bidOfferLevel ELSE 0 END), 0)
        ) AS vwap_bid_price,
        
        SAFE_DIVIDE(
            SUM(CASE WHEN bidOfferLevel > 0 THEN bidOfferLevel * bidOfferPrice ELSE 0 END),
            NULLIF(SUM(CASE WHEN bidOfferLevel > 0 THEN bidOfferLevel ELSE 0 END), 0)
        ) AS vwap_offer_price,
        
        SUM(CASE WHEN bidOfferLevel < 0 THEN -bidOfferLevel * bidOfferPrice ELSE 0 END) as notional_value_bid,
        SUM(CASE WHEN bidOfferLevel > 0 THEN bidOfferLevel * bidOfferPrice ELSE 0 END) as notional_value_offer
    FROM
        `{PROJECT_ID}.{DATASET_SOURCE}.elexon_boalf_2025_07`
    GROUP BY
        date, settlement_period, bmu_id
    ORDER BY
        date, settlement_period, bmu_id;
    """
    
    print("Executing Bid-Offer analysis query and downloading data...")
    
    # The to_dataframe() call will display a progress bar in the notebook.
    df = client.query(sql).to_dataframe(
        create_bqstorage_client=True,
        progress_bar_type='tqdm'
    )
    
    if df.empty:
        print("[WARN] No Bid-Offer data returned from BigQuery.")
    else:
        print(f"Successfully downloaded {len(df)} rows of Bid-Offer data.")
        
    return df

# Execute the analysis and display a preview of the results
bod_analytics_df = analyze_bid_offer_acceptances()

if not bod_analytics_df.empty:
    print("\nBid-Offer Acceptance Analysis Results (first 5 rows):")
    display(bod_analytics_df.head())
    
    # Write the results to a new BigQuery table
    write_bq(bod_analytics_df, "bod_analytics_july2025")

In [None]:
def plot_accepted_volumes(df: pd.DataFrame):
    """
    Plots the total accepted bid and offer volumes over time.
    """
    if df.empty:
        print("[INFO] Cannot plot volumes, DataFrame is empty.")
        return None

    # Aggregate volumes by date for a clearer plot
    df_agg = df.groupby('date').agg({
        'accepted_bid_volume': 'sum',
        'accepted_offer_volume': 'sum'
    }).reset_index()

    fig, ax = plt.subplots(figsize=(15, 7))
    
    # Plotting as a stacked area chart
    ax.stackplot(
        df_agg['date'],
        df_agg['accepted_bid_volume'],
        df_agg['accepted_offer_volume'],
        labels=['Accepted Bid Volume (MWh)', 'Accepted Offer Volume (MWh)'],
        colors=['#2ca02c', '#d62728'], # Green for bids, Red for offers
        alpha=0.7
    )
    
    ax.set_title('Total Accepted Bid and Offer Volumes for July 2025', fontsize=16)
    ax.set_xlabel('Date', fontsize=12)
    ax.set_ylabel('Volume (MWh)', fontsize=12)
    ax.legend(loc='upper left')
    ax.grid(True, which='both', linestyle='--', linewidth=0.5)
    
    # Improve date formatting on the x-axis
    fig.autofmt_xdate()
    
    path = save_plot(fig, "bod_accepted_volumes_july2025.png")
    if path:
        print(f"[PLOT] Accepted volumes chart saved to: {path}")
        
    return fig

# Generate and display the plot
if 'bod_analytics_df' in locals() and not bod_analytics_df.empty:
    volumes_plot = plot_accepted_volumes(bod_analytics_df)
    if volumes_plot:
        plt.show()
else:
    print("BOD analytics DataFrame not available. Please run the previous cell first.")

---
# **Analysis Module 1: Bid-Offer Acceptances (BOD)**

**Primary Goal:** See accepted bids/offers and price/volume dynamics by Settlement Period and Balancing Mechanism Unit (BMU).

**Best Charts:**
*   **Line (stacked area):** Accepted bid/offer volume by SP/day.
*   **Box/Violin:** Bid/offer prices per hour.
*   **Scatter:** Price vs accepted volume.
*   **Heatmap:** Average accepted price by hour × weekday.

**Recommended Data Table (`bod_analytics`):**
| Column | Description |
| :--- | :--- |
| `date` | The settlement date. |
| `settlement_period` | The half-hourly settlement period (1-48). |
| `bmu_id` | The Balancing Mechanism Unit ID. |
| `accepted_bid_volume` | Total volume of accepted bids (MWh). |
| `accepted_offer_volume`| Total volume of accepted offers (MWh). |
| `vwap_bid_price` | Volume-weighted average price of accepted bids (£/MWh). |
| `vwap_offer_price` | Volume-weighted average price of accepted offers (£/MWh). |
| `notional_value_bid` | Total value of accepted bids (£). |
| `notional_value_offer` | Total value of accepted offers (£). |