# irp-dbk24 - "Optimising Demand Response Strategies for Carbon-Intelligent Electricity Use"

# Developing Marginal Emissions Models

**NOTEBOOK PURPOSE(S):**
* Reproduce the R Analysis provided by Shefali


**LIMITATIONS:**

**NOTEBOOK OUTPUTS:**

    

### Importing Libraries

In [103]:
# ────────────────────────────────────────────────────────────────────────────
# Future (must be first)
# ────────────────────────────────────────────────────────────────────────────
from __future__ import annotations

# ────────────────────────────────────────────────────────────────────────────
# Jupyter/Notebook Setup
# ────────────────────────────────────────────────────────────────────────────
%matplotlib inline
from IPython.display import display

# ────────────────────────────────────────────────────────────────────────────
# Standard Library
# ────────────────────────────────────────────────────────────────────────────
import binascii
import calendar
import json
import logging
import math
import os
import random
import re
import hashlib
import inspect
import time
from copy import deepcopy
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timedelta
from functools import partial, wraps
from itertools import combinations, product
from multiprocessing import Manager, Pool, Lock, cpu_count
from multiprocessing.pool import ThreadPool
from pathlib import Path
from typing import (
    Any, Callable, Dict, Iterable, List, Mapping, Optional, Sequence, Tuple, Union
)
from zoneinfo import ZoneInfo

# ────────────────────────────────────────────────────────────────────────────
# Core Data Handling
# ────────────────────────────────────────────────────────────────────────────
import numpy as np
import pandas as pd
import polars as pl

# ────────────────────────────────────────────────────────────────────────────
# Machine Learning & Statistics
# ────────────────────────────────────────────────────────────────────────────
from feature_engine.creation import CyclicalFeatures
from scipy.stats import kurtosis, skew, zscore
import statsmodels.api as sm
import statsmodels.formula.api as smf
from statsmodels.stats.outliers_influence import variance_inflation_factor

from sklearn.base import BaseEstimator, TransformerMixin, clone
from sklearn.compose import ColumnTransformer
from sklearn.kernel_approximation import RBFSampler
from sklearn.linear_model import LinearRegression
from sklearn.metrics import (
    mean_absolute_error,
    mean_squared_error,
    r2_score,
    root_mean_squared_error,
)
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, SplineTransformer
from sklearn.utils.validation import check_is_fitted

# ────────────────────────────────────────────────────────────────────────────
# Visualization
# ────────────────────────────────────────────────────────────────────────────
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import matplotlib.ticker as ticker
import seaborn as sns

# ────────────────────────────────────────────────────────────────────────────
# Geospatial
# ────────────────────────────────────────────────────────────────────────────
import geopandas as gpd
from shapely.geometry import Point, Polygon
from shapely.wkb import loads
from pyproj import Proj, transform


## Functions

### Utilities

#### CSV File Handling

In [104]:
def _drop_hash_from_part(
        part_path: Path,
        model_hash: str,
        *,
        chunk_size: int = 200_000,
        delete_if_empty: bool = False,
) -> int:
    """
    Remove rows with model_id_hash == `model_hash` from a CSV part file.

    - Streams in chunks (no huge memory spikes)
    - Writes to a temp file, then atomically replaces the original
    - Returns number of rows dropped
    - If all rows are dropped:
        • delete the file if `delete_if_empty=True`
        • otherwise keep a header-only CSV

    Parameters
    ----------
    part_path : Path
        CSV file to edit in place.
    model_hash : str
        Value to filter out from the 'model_id_hash' column.
    chunk_size : int, default 200_000
        Pandas read_csv chunk size.
    delete_if_empty : bool, default False
        If True and all rows are removed, delete the part file.

    Returns
    -------
    int
        Number of rows removed.
    """
    part_path = Path(part_path)
    if not part_path.exists():
        return 0

    # Quick header check
    try:
        header_df = pd.read_csv(part_path, nrows=0)
    except Exception:
        # Broken file — leave as-is
        return 0
    if "model_id_hash" not in header_df.columns:
        return 0

    dropped = 0
    kept = 0
    tmp_path = part_path.with_suffix(part_path.suffix + ".tmp")

    # Ensure no stale tmp
    if tmp_path.exists():
        try:
            tmp_path.unlink()
        except Exception:
            pass

    first_write = True
    try:
        for chunk in pd.read_csv(
            part_path,
            chunksize=chunk_size,
            dtype={"model_id_hash": "string"},  # force string, avoid numeric coercion
        ):
            if "model_id_hash" not in chunk.columns:
                # schema changed mid-file? abort safely
                dropped = 0
                kept = -1
                break
            mask = chunk["model_id_hash"] != model_hash
            kept_chunk = chunk.loc[mask]
            n_dropped = int((~mask).sum())
            dropped += n_dropped
            kept += int(mask.sum())

            if kept_chunk.empty:
                continue

            kept_chunk.to_csv(
                tmp_path,
                index=False,
                mode="w" if first_write else "a",
                header=first_write,
            )
            first_write = False

        # Nothing matched → no change
        if dropped == 0:
            if tmp_path.exists():
                # wrote identical content; discard temp
                try: tmp_path.unlink()
                except Exception: pass
            return 0

        # All rows removed
        if kept == 0:
            if delete_if_empty:
                # Delete original; remove temp if created
                try: part_path.unlink()
                except Exception: pass
                if tmp_path.exists():
                    try: tmp_path.unlink()
                    except Exception: pass
            else:
                # Replace with header-only CSV
                header_df.to_csv(tmp_path, index=False)
                os.replace(tmp_path, part_path)
            return dropped

        # Normal case: replace atomically
        os.replace(tmp_path, part_path)
        return dropped

    finally:
        # Best-effort cleanup
        if tmp_path.exists():
            try: os.remove(tmp_path)
            except Exception: pass


In [105]:
def is_model_logged_rotating_csv(
        model_hash: str,
        base_dir: str | Path,
        file_prefix: str
) -> bool:
    """
    Return True if `model_hash` appears in the rolling-log index for `file_prefix`.

    Parameters
    ----------
    model_hash : str
        The 'model_id_hash' value to look up.
    base_dir : str | Path
        Directory holding the rolling CSV parts and index.
    file_prefix : str
        Prefix of the rolling log.

    Returns
    -------
    bool
        True if present in the index; False otherwise.
    """
    idx = _read_index(_index_path(Path(base_dir), file_prefix))
    if idx.empty or "model_id_hash" not in idx.columns:
        return False
    return str(model_hash) in idx["model_id_hash"].astype("string").values

In [106]:
def _list_part_files(
    base_dir: Path,
    file_prefix: str,
    ext: str = "csv",
) -> list[Path]:
    """
    List existing rolling CSV parts for a given prefix, sorted by numeric part index.

    Parameters
    ----------
    base_dir : Path
        Directory to search.
    file_prefix : str
        Prefix of the rolling CSV set (e.g., 'marginal_emissions_log').
    ext : str, default 'csv'
        File extension (without dot).

    Returns
    -------
    list[Path]
        Sorted list of matching part files, e.g. [.../prefix.part000.csv, .../prefix.part001.csv, ...]
    """
    if not base_dir.exists():
        return []

    rx = re.compile(rf"^{re.escape(file_prefix)}\.part(\d+)\.{re.escape(ext)}$")
    parts: list[tuple[int, Path]] = []

    for p in base_dir.glob(f"{file_prefix}.part*.{ext}"):
        if not p.is_file():
            continue
        m = rx.match(p.name)
        if m:
            parts.append((int(m.group(1)), p))

    parts.sort(key=lambda t: t[0])
    return [p for _, p in parts]


In [107]:
def load_all_logs_rotating_csv(
    results_dir: str | Path = ".",
    file_prefix: str = "marginal_emissions_log",
) -> pd.DataFrame:
    """
    Read only parts referenced by the index; drop duplicate hashes (keep last).

    Parameters
    ----------
    results_dir: str | Path
        The directory containing the results.
    file_prefix: str
        The prefix of the log files to read.

    Returns
    -------
    pd.DataFrame
        The concatenated DataFrame containing the logs.
    """
    # Read the index file
    base_dir = Path(results_dir)
    idx = _read_index(_index_path(base_dir, file_prefix))
    # Check if the index is empty
    if idx.empty:
        return pd.DataFrame()
    # Get the unique parts to read
    parts = idx["part_file"].unique().tolist()
    # Read the parts into DataFrames
    dfs = [pd.read_csv(p) for p in parts if Path(p).exists()]
    # Check if any DataFrames were read
    if not dfs:
        return pd.DataFrame()
    # Concatenate the DataFrames
    out = pd.concat(dfs, ignore_index=True)
    # Drop duplicate model_id_hash entries
    if "model_id_hash" in out.columns:
        out = out.drop_duplicates(subset=["model_id_hash"], keep="last")
    return out


In [108]:
def _read_index(index_path: Path) -> pd.DataFrame:
    """
    Read the rolling-log index CSV (id→part mapping).

    Parameters
    ----------
    index_path : Path
        Path to '<file_prefix>_index.csv'.

    Returns
    -------
    pd.DataFrame
        Columns ['model_id_hash','part_file'] or empty frame if not found/invalid.
    """
    try:
        idx = pd.read_csv(index_path, dtype={"model_id_hash": "string", "part_file": "string"})
        if not {"model_id_hash","part_file"}.issubset(idx.columns):
            raise ValueError("Index missing required columns.")
        return idx
    except FileNotFoundError:
        return pd.DataFrame(columns=["model_id_hash","part_file"])
    except Exception:
        # Be permissive but return the expected schema
        return pd.DataFrame(columns=["model_id_hash","part_file"])


In [109]:
def remove_model_from_rotating_csv(
        model_hash: str,
        results_dir: str | Path = ".",
        file_prefix: str = "marginal_emissions_log",
) -> None:
    """
    Remove all rows with `model_id_hash == model_hash` from the rolling CSV set.

    Parameters
    ----------
    model_hash : str
        Identifier to remove.
    results_dir : str | Path, default "."
        Directory holding parts and index.
    file_prefix : str, default "marginal_emissions_log"
        Prefix of the rolling log files.
    """
    base_dir = _ensure_dir(Path(results_dir))
    idx_path = _index_path(base_dir, file_prefix)

    # Lock the index for the whole operation to avoid races with concurrent writers/readers
    with _file_lock(_index_lock_path(idx_path)):
        idx = _read_index(idx_path)
        if idx.empty:
            return

        # Drop from referenced part files
        for pf in idx.loc[idx["model_id_hash"] == model_hash, "part_file"].dropna().unique():
            _drop_hash_from_part(Path(pf), model_hash)

        # Update index
        idx = idx[idx["model_id_hash"] != model_hash]
        idx.to_csv(idx_path, index=False)


In [110]:
def save_summary_to_rotating_csv(
        summary_df: pd.DataFrame,
        results_dir: str | Path = ".",
        file_prefix: str = "marginal_emissions_log",
        max_mb: int = 95,
        force_overwrite: bool = False,
        naming: PartNaming | None = None,
        fsync: bool = False,
) -> Path:
    """
    Append a single-row summary to a rolling CSV (<prefix>.partNNN.csv) with strict rotation:
    - Per-file lock during append (prevents interleaved writes/duplicate headers)
    - Under-lock preflight ensures the write will NOT push the file over `max_mb`
      (allocates a new shard if necessary)
    - Atomic index update under lock

    Parameters
    ----------
    summary_df : pd.DataFrame
        Single-row DataFrame with at least a 'model_id_hash' column.
    results_dir : str | Path, default "."
        Directory to write parts and the index into.
    file_prefix : str, default "marginal_emissions_log"
        Prefix of the part files ('<prefix>.partNNN.csv').
    max_mb : int, default 95
        Rotate when current part would exceed this size (MiB) after the append.
    force_overwrite : bool, default False
        If True, delete existing rows with the same hash before appending.
    naming : PartNaming, optional
        Naming convention (token/width/ext). If provided, `ext` should include the dot
        (e.g., ".csv"). Internally we use the extension without the dot for matching.
    fsync : bool, default False
        If True, call fsync() on the file after writing to ensure data is flushed to disk.

    Returns
    -------
    Path
        The part file path that received the append.

    Raises
    ------
    ValueError
        If `summary_df` is empty or missing 'model_id_hash'.
    """
    if summary_df.empty:
        raise ValueError("summary_df is empty.")
    if "model_id_hash" not in summary_df.columns:
        raise ValueError("summary_df must contain 'model_id_hash'.")
    if len(summary_df) != 1:
        summary_df = summary_df.iloc[:1].copy()

    naming = naming or PartNaming()
    base_dir = _ensure_dir(Path(results_dir))
    idx_path = _index_path(base_dir, file_prefix)
    model_hash = str(summary_df["model_id_hash"].iloc[0])
    ext_nodot = naming.ext.lstrip(".")

    # Optional overwrite: remove old rows (parts + index)
    if force_overwrite:
        remove_model_from_rotating_csv(model_hash, base_dir, file_prefix)
    else:
        if is_model_logged_rotating_csv(model_hash, base_dir, file_prefix):
            print(f"[SKIP] Hash already indexed: {model_hash}")
            parts = _list_part_files(base_dir, file_prefix, ext=ext_nodot)
            return parts[-1] if parts else base_dir / naming.format(file_prefix, 0)

    # Determine candidate shard
    parts = _list_part_files(base_dir, file_prefix, ext=ext_nodot)
    if parts:
        target = parts[-1]
    else:
        target = allocate_next_part(base_dir, file_prefix, width=naming.width, ext=ext_nodot)

    threshold_bytes = int(max_mb * 1024 * 1024)

    # --- LOCK AND WRITE TO SHARD SAFELY ---
    while True:
        shard_lock = Path(str(target) + ".lock")
        with _file_lock(shard_lock):
            current_size = Path(target).stat().st_size if Path(target).exists() else 0
            write_header = (current_size == 0)
            csv_payload = summary_df.to_csv(index=False, header=write_header)
            payload_bytes = len(csv_payload.encode("utf-8"))

            if current_size + payload_bytes > threshold_bytes:
                # rotate: leave lock, allocate new shard, try again
                pass
            else:
                with open(target, "a", encoding="utf-8", newline="") as f:
                    f.write(csv_payload)
                    f.flush()
                    if fsync:
                        os.fsync(f.fileno())
                break

        target = allocate_next_part(base_dir, file_prefix, width=naming.width, ext=ext_nodot)

    # --- LOCK AND UPDATE INDEX (atomic replace + optional fsync) ---
    lock_path = _index_lock_path(idx_path)
    with _file_lock(lock_path):
        idx = _read_index(idx_path)
        already = ("model_id_hash" in idx.columns) and (model_hash in idx["model_id_hash"].astype("string").values)
        if not already:
            idx = pd.concat(
                [idx, pd.DataFrame([{"model_id_hash": model_hash, "part_file": str(target)}])],
                ignore_index=True,
            )
            tmp_idx = idx_path.with_suffix(idx_path.suffix + ".tmp")
            with open(tmp_idx, "w", encoding="utf-8", newline="") as fh:
                idx.to_csv(fh, index=False)
                fh.flush()
                if fsync:
                    os.fsync(fh.fileno())
            os.replace(tmp_idx, idx_path)
            if fsync:
                # Ensure directory entry for index is durable
                dir_fd = os.open(str(idx_path.parent), os.O_DIRECTORY)
                try:
                    os.fsync(dir_fd)
                finally:
                    os.close(dir_fd)

    print(f"[SAVE] Appended to {target}, index updated.")
    return target

#### General

In [111]:
def _file_size_mb(path: Path) -> float:
    """
    Return size of `path` in MiB. If file doesn't exist, returns 0.0.

    Parameters
    ----------
    path : Path
        Path to the file.

    Returns
    -------
    float
        Size of the file in MiB.
    """
    p = Path(path)
    if not p.exists():
        return 0.0
    return p.stat().st_size / (1024 * 1024.0)


#### Logging

In [112]:
def load_existing_hashes(
        results_dir: str | Path,
        file_prefix: str,
) -> set[str]:
    """
    Get all unique `model_id_hash` values from the rolling-log index.

    Parameters
    ----------
    results_dir : str | Path
        Directory containing the rolling CSV parts and index.
    file_prefix : str
        Prefix of the rolling log files.

    Returns
    -------
    set[str]
        Unique model_id_hash values present in the index.
    """
    idx = _read_index(_index_path(Path(results_dir), file_prefix))
    if idx.empty or "model_id_hash" not in idx.columns:
        return set()
    # Ensure NA is dropped and cast to Python strings
    return set(idx["model_id_hash"].dropna().astype(str).tolist())


In [113]:
def make_config_key(
        config: Mapping[str, Any],
          algo: str = "sha256"
) -> str:
    """
    Create a deterministic hash key for a configuration mapping.

    Parameters
    ----------
    config : Mapping[str, Any]
        Configuration to serialize. Keys should be stringable.
    algo : {'sha256','md5','sha1',...}, default 'sha256'
        Hash algorithm name passed to hashlib.new.

    Returns
    -------
    str
        Hex digest of the normalized, JSON-serialized configuration.
    """
    def _norm(x):
        # Order/JSON-stable normalization.
        if isinstance(x, Mapping):
            # sort by key string to be robust to non-string keys
            return {str(k): _norm(v) for k, v in sorted(x.items(), key=lambda kv: str(kv[0]))}
        if isinstance(x, (list, tuple)):
            return [_norm(v) for v in x]
        if isinstance(x, set):
            # sets are unordered; sort normalized elements
            return sorted(_norm(v) for v in x)
        if isinstance(x, (np.floating, np.integer, np.bool_)):
            return x.item()
        if isinstance(x, (datetime,)):
            return x.isoformat()
        return x  # strings, ints, floats, bools, None, etc.

    payload = json.dumps(
        _norm(config),
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=False,
        default=str,   # last-resort for odd objects
    )
    h = hashlib.new(algo)
    h.update(payload.encode("utf-8"))
    return h.hexdigest()

In [114]:
def signature_for_run(
        user_pipeline: Pipeline,
        x_columns: list[str],
        y: pd.Series | pd.DataFrame,
        *,
        random_state: int,
        eval_splits: tuple[str, ...] = ("train", "validation"),
        compute_test: bool = False,
        extra_info: dict | None = None,
) -> tuple[str, dict]:
    """
    Build a stable config mapping for a model run and return (hash_key, mapping).

    This just standardizes what goes into the signature so different call sites
    don’t accidentally diverge.

    Parameters
    ----------
    user_pipeline : Pipeline
        The user-defined pipeline to run.
    x_columns : list[str]
        The feature columns to use for the model.
    y : pd.Series | pd.DataFrame
        The target variable(s) for the model.
    random_state : int
        The random seed to use for the model.
    eval_splits : tuple[str, ...], default=("train", "validation")
        The data splits to evaluate the model on.
    compute_test : bool, default=False
        Whether to compute metrics on the test split.
    extra_info : dict | None, default=None
        Any extra information to include in the signature.

    Returns
    -------
    tuple[str, dict]
        The hash key and the signature mapping.
    """
    sig = {
        "pipeline_params": user_pipeline.get_params(deep=True),
        "x_columns": list(x_columns),
        "y_columns": _y_columns_for_signature(y),
        "random_state": int(random_state),
        "eval_splits": tuple(eval_splits),
        "compute_test": bool(compute_test),
        **(extra_info or {}),
    }
    return make_config_key(sig), sig

In [115]:
def _y_columns_for_signature(y: pd.Series | pd.DataFrame) -> list[str]:
    """
    Normalize y to a list of column names for signature purposes.

    Parameters
    ----------
    y : pd.Series | pd.DataFrame
        The target variable(s) for the model.

    Returns
    -------
    list[str]
        The list of column names for the target variable(s).
    """
    if isinstance(y, pd.DataFrame):
        if y.shape[1] != 1:
            raise ValueError("y must be a Series or single-column DataFrame for signature.")
        return [str(y.columns[0])]
    name = getattr(y, "name", None)
    return [str(name)] if name is not None else ["y"]


#### MPI Management

In [116]:
def allocate_next_part(
        base_dir: Path,
        file_prefix: str,
        width: int = 3,
        ext: str = "csv",
        max_retries: int = 32,
        jitter_ms: tuple[int, int] = (1, 40),
) -> Path:
    """
    Atomically allocate the next rotating part file by creating it exclusively.

    Uses os.open(..., O_CREAT|O_EXCL) so only one process can create a given part.
    If another process wins the race, we re-scan and try the next part number.

    Parameters
    ----------
    base_dir : Path
        Directory to write part files into (created if missing).
    file_prefix : str
        Prefix used before ".partNNN.<ext>".
    width : int, default 3
        Minimum zero-padding for part numbers if none exist.
    ext : str, default "csv"
        Extension without dot.
    max_retries : int, default 32
        Maximum attempts before giving up.
    jitter_ms : (int, int), default (1, 40)
        Random backoff (min,max) milliseconds between retries.

    Returns
    -------
    Path
        The newly created, zero-length part file path (claimed for you).

    Raises
    ------
    RuntimeError
        If a unique part file cannot be allocated within max_retries.
    """
    base_dir = Path(base_dir)
    base_dir.mkdir(parents=True, exist_ok=True)

    for _ in range(max_retries):
        path = _next_csv_part_path(base_dir, file_prefix, width=width, ext=ext)
        flags = os.O_CREAT | os.O_EXCL | os.O_WRONLY
        try:
            fd = os.open(path, flags)  # atomic claim
            os.close(fd)               # leave it for normal open() later
            return path
        except FileExistsError:
            # Someone else grabbed it; small random backoff, then try again
            time.sleep(random.uniform(*jitter_ms) / 1000.0)
            continue

    raise RuntimeError("Failed to allocate a unique part file after many attempts")


In [117]:
def _distribute_configs(
        configs: list[dict],
        rank: int,
        size: int,
        mode: str = "stride"
) -> list[dict]:
    """
    Distribute configurations across multiple ranks.

    Parameters
    ----------
    configs: list[dict]
        The list of configurations to distribute.
    rank: int
        The rank of the current process.
    size: int
        The total number of processes.
    mode: str
        The distribution mode ("stride" or "chunked").

    Returns
    -------
    list[dict]
        The distributed list of configurations.
    """
    # Handle single process case
    if size <= 1:
        return configs
    # Handle multi-process case
    if mode == "stride":
        return configs[rank::size]
    # chunked
    n = len(configs)
    start = (n * rank) // size
    end   = (n * (rank + 1)) // size
    return configs[start:end]

In [118]:
@contextmanager
def _file_lock(lock_path: Path, max_wait_s: float = 30.0, jitter_ms: tuple[int,int]=(2,25)):
    """
    Simple cross-process lock using O_CREAT|O_EXCL on a lockfile.

    Parameters
    ----------
    lock_path : Path
        Path to the lock file to create.
    max_wait_s : float, default 30.0
        Maximum time to wait for the lock before raising TimeoutError.
    jitter_ms : (int,int), default (2,25)
        Randomized backoff between retries, in milliseconds.

    Yields
    ------
    None
        The lock is held for the duration of the context.
    """
    # Create the lock file
    deadline = time.time() + float(max_wait_s)
    lock_path = Path(lock_path)
    last_err = None
    # Wait for the lock to be available
    while time.time() < deadline:
        try:
            fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            os.close(fd)
            try:
                yield
            finally:
                try:
                    os.unlink(lock_path)
                except FileNotFoundError:
                    pass
            return
        except FileExistsError as e:
            last_err = e
            time.sleep(random.uniform(*jitter_ms) / 1000.0)
    raise TimeoutError(f"Could not acquire lock: {lock_path}") from last_err

In [119]:
def _mpi_context():
    """
    Get the MPI context for distributed training.

    Returns
    -------
    Tuple[COMM, int, int]
        The MPI communicator, rank, and size.
    """
    try:
        from mpi4py import MPI  # ensures import
        comm = MPI.COMM_WORLD
        return comm, comm.Get_rank(), comm.Get_size()
    except Exception:
        class _Dummy:  # single-process stub
            def bcast(self, x, root=0): return x
            def Barrier(self): pass
        return _Dummy(), 0, 1

#### Naming Conventions

In [120]:
@dataclass(frozen=True)
class PartNaming:
    token: str = ".part"   # separator between stem and index
    width: int = 3         # zero-pad width
    ext: str = ".csv"      # file extension, with leading dot

    def format(self,
            stem: str,
            idx: int
    ) -> str:
        """
        Format a part filename.

        Parameters
        ----------
        stem : str
            The base name of the file (without extension or part token).
        idx : int
            The part index (zero-padded).

        Returns
        -------
        str
            The formatted part filename.
        """
        return f"{stem}{self.token}{idx:0{self.width}d}{self.ext}"

    def split(self,
            name: str
    ) -> Tuple[str, int | None]:
        """
        Split a part filename into its stem and index.

        Parameters
        ----------
        name : str
            The part filename to split.

        Returns
        -------
        Tuple[str, int | None]
            The stem and index of the part filename.
        """
        # returns (stem, idx) where idx is None if no part index present
        if not name.endswith(self.ext):
            # unknown extension; treat everything before first '.' as stem
            p = Path(name)
            return (p.stem, None)
        base = name[: -len(self.ext)]
        if self.token in base:
            stem, idx_str = base.split(self.token, 1)
            if idx_str.isdigit():
                return stem, int(idx_str)
        return base, None


#### Path and Directory Management

In [121]:
def _ensure_dir(
        d: str | Path,
        *,
        resolve: bool = True
) -> Path:
    """
    Ensure directory `d` exists and return it as a Path.

    - Creates parent directories as needed.
    - Raises a clear error if a non-directory already exists at `d`.
    - Optionally returns the resolved (absolute) path.

    Parameters
    ----------
    d : str | Path
        Directory path to create if missing.
    resolve : bool, default True
        If True, return Path.resolve(strict=False) to normalize/absolutize.

    Returns
    -------
    Path
        The (optionally resolved) directory path.
    """
    p = Path(d)
    if p.exists() and not p.is_dir():
        raise NotADirectoryError(f"Path exists and is not a directory: {p}")
    p.mkdir(parents=True, exist_ok=True)
    return p.resolve(strict=False) if resolve else p


In [122]:
def _index_lock_path(index_path: Path) -> Path:
    """
    Derive the lock file path for an index CSV (same directory, '.lock' suffix).

    Parameters
    ----------
    index_path : Path
        Path to the index CSV file.

    Returns
    -------
    Path
        Path to the lock file.
    """
    return index_path.with_suffix(index_path.suffix + ".lock")

In [123]:
def _index_path(
        base_dir: Path,
        file_prefix: str
) -> Path:
    """
    Build the path to the global index CSV for a given rolling log set.

    Parameters
    ----------
    base_dir : Path
        Directory that holds the rolling CSV parts.
    file_prefix : str
        Prefix used by the rolling CSV (e.g., 'marginal_emissions_log').

    Returns
    -------
    Path
        '<base_dir>/<file_prefix>_index.csv'
    """
    return Path(base_dir) / f"{file_prefix}_index.csv"


In [124]:
def _next_csv_part_path(base_dir: Path, file_prefix: str, width: int = 3, ext: str = "csv") -> Path:
    """
    Return the next available rotating-CSV part path.

    Scans for files named "<file_prefix>.partNNN.<ext>" in `base_dir`, where NNN is an
    integer with zero-padding. Picks max(N) and returns the next. If none exist, returns
    "...part000.<ext>" (or the padding width you pass).

    Parameters
    ----------
    base_dir : Path
        Directory to scan for part files.
    file_prefix : str
        Prefix used before ".partNNN.<ext>".
    width : int, default 3
        Minimum zero-padding width if no files exist yet.
    ext : str, default "csv"
        File extension (without dot).

    Returns
    -------
    Path
        Path for the next part file (not created).
    """
    if width < 1:
        raise ValueError("width must be >= 1")

    base_dir = Path(base_dir)
    pattern = re.compile(rf"^{re.escape(file_prefix)}\.part(\d+)\.{re.escape(ext)}$")

    max_n = -1
    pad = width

    for p in base_dir.glob(f"{file_prefix}.part*.{ext}"):
        m = pattern.match(p.name)
        if not m:
            continue
        n_str = m.group(1)
        pad = max(pad, len(n_str))
        try:
            n = int(n_str)
        except ValueError:
            continue
        if n > max_n:
            max_n = n

    next_n = max_n + 1
    n_str = f"{next_n:0{pad}d}"
    return base_dir / f"{file_prefix}.part{n_str}.{ext}"

In [125]:
def _roll_if_needed(
        path: Path,
        max_mb: int,
        *,
        naming: PartNaming | None = None
) -> Path:
    """
    If `path` exists and is >= max_mb, return the *next* part filename.
    Otherwise return `path` unchanged.

    Parameters
    ----------
    path : Path
        Current part file path (e.g., 'prefix.part007.csv').
    max_mb : int
        Rotation threshold in mebibytes (MiB).
    naming : PartNaming, optional
        Naming convention (token/width/ext). Uses defaults if not provided.

    Returns
    -------
    Path
        Either `path` or a new sibling with incremented part index.
    """
    if not path.exists() or _file_size_mb(path) < float(max_mb):
        return path
    naming = naming or PartNaming()
    stem, idx = naming.split(path.name)
    next_idx = (idx or 0) + 1
    return path.with_name(naming.format(stem=stem, idx=next_idx))


#### Scoring & Metrics

In [126]:
def _compute_group_energy_weights(
    df: pd.DataFrame,
    group_col: str,
    q_col: str,
    interval_hours: float = 0.5,
) -> pd.DataFrame:
    """
    Aggregate energy weights by group.

    Parameters
    ----------
    df : pd.DataFrame
        Rows for a single split after preprocessing (must contain `group_col` and `q_col`).
    group_col : str
        Name of the group id column (e.g., 'median_group_id', 'quantile_group_id').
    q_col : str
        Name of the demand/quantity column used as Q in the regression (usually x_vars[0]).
    interval_hours : float, default 0.5
        Duration represented by each row in hours (half-hourly = 0.5).

    Returns
    -------
    pd.DataFrame
        Columns: [group_col, 'q_sum', 'energy_MWh']
        where energy_MWh = q_sum * interval_hours.
    """
    if group_col not in df.columns:
        raise KeyError(f"'{group_col}' not found in df")
    if q_col not in df.columns:
        raise KeyError(f"'{q_col}' not found in df")
    if not np.issubdtype(np.asarray(df[q_col]).dtype, np.number):
        raise TypeError(f"'{q_col}' must be numeric")
    if interval_hours <= 0:
        raise ValueError("interval_hours must be > 0")

    g = (
        df.groupby(group_col, observed=True)[q_col]
          .sum()
          .rename("q_sum")
          .reset_index()
    )
    g["energy_MWh"] = g["q_sum"] * float(interval_hours)
    return g


In [127]:
def finite_difference_me_metrics(
    df: pd.DataFrame,
    time_col: str = "timestamp",
    q_col: str = "demand_met",
    y_col: str = "tons_co2",
    me_col: str = "ME",
    group_keys: list[str] | tuple[str, ...] = ("city",),
    max_dt: pd.Timedelta = pd.Timedelta("2h"),
    min_abs_dq: float = 1e-6,
) -> pd.DataFrame:
    """
    Compare predicted ME to observed short-horizon slopes s = Δy/ΔQ on held-out data.

    For each group in `group_keys`:
      Δy = y_t - y_{t-1}, ΔQ = Q_t - Q_{t-1}, Δt = t - t_{t-1}
      Keep pairs with Δt ≤ max_dt and |ΔQ| ≥ min_abs_dq.
      s_t = Δy / ΔQ, ME_avg = 0.5*(ME_t + ME_{t-1})

    Returns
    -------
    pd.DataFrame
        One row per group and an optional pooled 'ALL' row:
        ['pearson_r','spearman_r','rmse','mae','n_pairs', *group_keys]
    """
    if time_col not in df.columns:
        raise KeyError(f"'{time_col}' not in df")
    # ensure datetime for Δt filtering
    dt_series = pd.to_datetime(df[time_col], errors="coerce")
    if dt_series.isna().any():
        raise ValueError(f"Column '{time_col}' contains non-parseable datetimes")
    work = df.copy()
    work[time_col] = dt_series

    def _per_group(gdf: pd.DataFrame) -> dict:
        gdf = gdf.sort_values(time_col).copy()
        gdf["dt"] = gdf[time_col].diff()
        gdf["dQ"] = gdf[q_col].diff()
        gdf["dY"] = gdf[y_col].diff()
        gdf["ME_avg"] = 0.5 * (gdf[me_col] + gdf[me_col].shift(1))

        mask = (
            gdf["dt"].notna() & (gdf["dt"] <= max_dt)
            & gdf["dQ"].notna() & (np.abs(gdf["dQ"]) >= float(min_abs_dq))
            & gdf["dY"].notna() & gdf["ME_avg"].notna()
        )
        sub = gdf.loc[mask, ["dY", "dQ", "ME_avg"]]
        if sub.empty:
            return {"pearson_r": np.nan, "spearman_r": np.nan, "rmse": np.nan, "mae": np.nan, "n_pairs": 0}

        s = sub["dY"].to_numpy(dtype=float) / sub["dQ"].to_numpy(dtype=float)
        me = sub["ME_avg"].to_numpy(dtype=float)
        return {
            "pearson_r": float(pd.Series(s).corr(pd.Series(me))),
            "spearman_r": float(pd.Series(s).corr(pd.Series(me), method="spearman")),
            "rmse": float(root_mean_squared_error(s, me)),
            "mae": float(mean_absolute_error(s, me)),
            "n_pairs": int(len(sub)),
        }

    parts: list[dict] = []
    if group_keys:
        for keys, gdf in work.groupby(list(group_keys), observed=True, sort=True):
            row = _per_group(gdf)
            if isinstance(keys, tuple):
                for kname, kval in zip(group_keys, keys):
                    row[kname] = kval
            else:
                row[group_keys[0]] = keys
            parts.append(row)
    else:
        parts.append(_per_group(work) | {"group": "ALL"})

    out = pd.DataFrame(parts)

    # pooled row
    if group_keys and (not out.empty) and out["n_pairs"].sum() > 0:
        tmp = []
        for _, gdf in work.groupby(list(group_keys), observed=True, sort=True):
            gdf = gdf.sort_values(time_col).copy()
            gdf["dt"] = gdf[time_col].diff()
            gdf["dQ"] = gdf[q_col].diff()
            gdf["dY"] = gdf[y_col].diff()
            gdf["ME_avg"] = 0.5 * (gdf[me_col] + gdf[me_col].shift(1))
            mask = (
                gdf["dt"].notna() & (gdf["dt"] <= max_dt)
                & gdf["dQ"].notna() & (np.abs(gdf["dQ"]) >= float(min_abs_dq))
                & gdf["dY"].notna() & gdf["ME_avg"].notna()
            )
            sub = gdf.loc[mask, ["dY", "dQ", "ME_avg"]]
            if not sub.empty:
                tmp.append(
                    pd.DataFrame({
                        "s": sub["dY"].to_numpy(dtype=float) / sub["dQ"].to_numpy(dtype=float),
                        "ME_avg": sub["ME_avg"].to_numpy(dtype=float),
                    })
                )
        if tmp:
            pooled = pd.concat(tmp, ignore_index=True)
            pooled_row = {
                "pearson_r": float(pooled["s"].corr(pooled["ME_avg"])),
                "spearman_r": float(pooled["s"].corr(pooled["ME_avg"], method="spearman")),
                "rmse": float(root_mean_squared_error(pooled["s"], pooled["ME_avg"])),
                "mae": float(mean_absolute_error(pooled["s"], pooled["ME_avg"])),
                "n_pairs": int(len(pooled)),
            }
            for k in group_keys:
                pooled_row[k] = "ALL"
            out = pd.concat([out, pd.DataFrame([pooled_row])], ignore_index=True)

    return out


In [128]:
def macro_micro_means(df: pd.DataFrame, metric: str, weight_col: str = "n_obs") -> dict:
    """
    Compute macro (simple mean) and micro (weighted by `weight_col`) for a metric.

    Parameters
    ----------
    df : pd.DataFrame
        Per-group metrics.
    metric : str
        Column name to average.
    weight_col : str, default "n_obs"
        Column to use as weights for micro average.

    Returns
    -------
    dict
        {"macro": float, "micro": float}
    """
    macro = float(np.nanmean(df[metric].to_numpy(dtype=float)))
    if (weight_col in df) and np.nansum(df[weight_col].to_numpy(dtype=float)) > 0:
        micro = float(np.average(df[metric], weights=df[weight_col]))
    else:
        micro = np.nan
    return {"macro": macro, "micro": micro}


In [129]:
def mean_absolute_percentage_error(
        y_true,
        y_pred,
        eps: float = 1e-6
) -> float:
    """
    Compute MAPE robustly - adding small constant to avoid division by zero.

    MAPE = mean(|(y_true - y_pred) / (|y_true| + eps)|) * 100

    Parameters
    ----------
    y_true : array-like
        Ground-truth values.
    y_pred : array-like
        Predicted values.
    eps : float, default 1e-6
        Small constant to avoid division by zero.

    Returns
    -------
    float
        Mean absolute percentage error in percent.
    """
    # true values for y
    yt = np.asarray(y_true, dtype=float)
    # predicted values for y
    yp = np.asarray(y_pred, dtype=float)
    # denominator
    denom = np.abs(yt) + float(eps)
    # compute MAPE
    m = np.abs((yt - yp) / denom)
    # return as percentage (*100)
    return float(np.nanmean(m) * 100.0)


In [130]:
def mean_metric(df: pd.DataFrame, metric: str) -> float:
    """
    Compute the mean of a metric, with a special case for MSE derived from RMSE.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame containing metric columns.
    metric : {"r2","rmse","mae","mape","n_obs","mse"}
        Metric to aggregate.

    Returns
    -------
    float
        NaN-safe mean of the requested metric.

    Raises
    ------
    KeyError
        If required columns are missing.
    """
    if metric == "mse":
        if "rmse" not in df:
            raise KeyError("Cannot compute 'mse': 'rmse' column missing.")
        return float(np.nanmean(df["rmse"].to_numpy(dtype=float) ** 2))
    if metric not in df:
        raise KeyError(f"Metric '{metric}' not found in DataFrame.")
    return float(np.nanmean(df[metric].to_numpy(dtype=float)))


In [131]:
def pooled_co2_metrics(
    regressor,                  # fitted GroupwiseRegressor
    transformed_df: pd.DataFrame,
    y_col: str | None = None,
    group_col: str | None = None,
) -> dict:
    """
    Compute pooled (all bins together) out-of-sample metrics for CO2.

    Parameters
    ----------
    regressor : GroupwiseRegressor
        Must be fitted; `regressor.group_models_` is used per group.
    transformed_df : pd.DataFrame
        Contains features used by the regressor, the group column, and the true y.
        (Typically validation/test X after feature+binner, with y added).
    y_col : str, optional
        Target column name. Defaults to regressor.y_var.
    group_col : str, optional
        Group column name. Defaults to regressor.group_col.

    Returns
    -------
    dict
        {'r2','rmse','mae','mape','n_obs'} (NaNs if insufficient data).
    """
    y_col = y_col or regressor.y_var
    group_col = group_col or regressor.group_col
    if y_col not in transformed_df.columns:
        raise KeyError(f"'{y_col}' not found in transformed_df")
    if group_col not in transformed_df.columns:
        raise KeyError(f"'{group_col}' not found in transformed_df")

    preds = pd.Series(index=transformed_df.index, dtype=float)
    for g, gdf in transformed_df.groupby(group_col, sort=True):
        model = regressor.group_models_.get(g)
        if model is None:
            continue
        preds.loc[gdf.index] = model.predict(gdf)

    mask = preds.notna()
    n_obs = int(mask.sum())
    if n_obs == 0:
        return {"r2": np.nan, "rmse": np.nan, "mae": np.nan, "mape": np.nan, "n_obs": 0}

    y_true = transformed_df.loc[mask, y_col].to_numpy(dtype=float)
    y_pred = preds.loc[mask].to_numpy(dtype=float)

    # r2 can error for <2 samples or constant y
    try:
        r2 = float(r2_score(y_true, y_pred))
    except Exception:
        r2 = np.nan

    return {
        "r2": r2,
        "rmse": float(root_mean_squared_error(y_true, y_pred)),
        "mae": float(mean_absolute_error(y_true, y_pred)),
        "mape": float(mean_absolute_percentage_error(y_true, y_pred)),
        "n_obs": n_obs,
    }

In [132]:
def summarise_metrics_logs(
        train_logs: pd.DataFrame,
        val_logs: pd.DataFrame,
        test_logs: pd.DataFrame | None = None,
        user_pipeline: Pipeline = None,
        x_columns: list | None = None,
        random_state: int = 12,
        group_col_name: str = "group",
        pooled_metrics_by_split: dict[str, dict] | None = None,
        fd_me_metrics_by_split: dict[str, dict] | None = None,
        energy_weight_col: str = "energy_MWh",
) -> pd.DataFrame:
    """
    Summarise per-split, per-group metrics and pipeline metadata into a single-row DataFrame.

    This variant allows `test_logs` to be None (can skip test during tuning).

    Parameters
    ----------
    train_logs, val_logs : pd.DataFrame
        Metrics frames for train/validation.
    test_logs : pd.DataFrame or None, default None
        Test metrics; if None, test columns are omitted from the summary.
    user_pipeline : Pipeline
        The fitted or configured pipeline (used for metadata).
    x_columns : list, optional
        Feature names used by the model.
    random_state : int, default 12
        Random seed to record.
    group_col_name : str, default "group"
        Canonical name for the group column.
    pooled_metrics_by_split, fd_me_metrics_by_split : dict, optional
        Optional extra diagnostics keyed by split.
    energy_weight_col : str, default "energy_MWh"
        Column name to use for energy-weighted micro-averages if present.

    Returns
    -------
    pd.DataFrame
        One-row summary. Only includes split columns for the splits provided.
    """
    def _norm(df: pd.DataFrame) -> pd.DataFrame:
        if df is None or df.empty:
            return df

        cols = list(df.columns)

        # If desired already present, use it
        if group_col_name in cols:
            return df

        # If a plain 'group' exists, rename it to the desired name
        if "group" in cols:
            return df.rename(columns={"group": group_col_name})

        # Known aliases we can rename from
        candidates = [
            "multi_group_id",
            "quantile_group_id",
            "median_group_id",
            "original_quantile_group_id",
            "group_id",
        ]

        # Any *_group_id pattern
        pattern_hits = [c for c in cols if c.endswith("_group_id")]

        # Prefer known aliases in order
        for c in candidates:
            if c in cols:
                return df.rename(columns={c: group_col_name})

        # If exactly one *_group_id exists, use it
        if len(pattern_hits) == 1:
            return df.rename(columns={pattern_hits[0]: group_col_name})

        # Nothing we recognize → fail loudly with context
        raise KeyError(
            f"Could not locate a group column; expected '{group_col_name}' or any of "
            f"{[c for c in candidates if c in cols] + (['group'] if 'group' in cols else []) or candidates + ['group']}. "
            f"Available columns: {cols}"
        )
    splits: dict[str, pd.DataFrame] = {
        "train": _norm(train_logs.copy()),
        "validation": _norm(val_logs.copy()),
    }
    if test_logs is not None:
        splits["test"] = _norm(test_logs.copy())

    required = {"r2", "rmse", "mae", "mape", "n_obs"}
    for name, df in splits.items():
        missing = required.difference(df.columns)
        if missing:
            raise ValueError(f"{name} logs missing metrics: {sorted(missing)}")

    first = next(iter(splits.values()))
    model_id = first.get("model_id_hash", pd.Series([np.nan])).iloc[0]
    log_time = first.get("log_time", pd.Series([np.nan])).iloc[0]
    model_name = user_pipeline._final_estimator.__class__.__name__ if user_pipeline is not None else ""
    pipeline_steps = list(user_pipeline.named_steps.keys()) if user_pipeline is not None else []

    summary: dict[str, Any] = {
        "model_id_hash": model_id,
        "random_state": random_state,
        "params_json": json.dumps(
            user_pipeline.get_params(deep=True), sort_keys=True, separators=(",", ":"), default=str
        ) if user_pipeline is not None else "{}",
        "log_time": log_time,
        "model_name": model_name,
        "pipeline_steps": pipeline_steps,
        "pipeline_n_steps": len(pipeline_steps),
        "x_columns": x_columns or [],
        "metrics_by_group": {},
    }

    nested: dict[str, dict] = {}
    for split, df in splits.items():
        # macro means
        summary[f"r2_{split}"] = float(df["r2"].mean())
        summary[f"rmse_{split}"] = float(df["rmse"].mean())
        summary[f"mae_{split}"] = float(df["mae"].mean())
        summary[f"mape_{split}"] = float(df["mape"].mean())
        # counts should be sums, not means
        summary[f"n_obs_{split}"] = int(df["n_obs"].sum())
        summary[f"mse_{split}"] = float((df["rmse"] ** 2).mean())

        # micro by n_obs
        if df["n_obs"].sum() > 0:
            w = df["n_obs"].to_numpy(dtype=float)
            summary[f"r2_{split}_micro"] = float(np.average(df["r2"], weights=w))
            summary[f"rmse_{split}_micro"] = float(np.average(df["rmse"], weights=w))
            summary[f"mae_{split}_micro"] = float(np.average(df["mae"], weights=w))
            summary[f"mape_{split}_micro"] = float(np.average(df["mape"], weights=w))
        else:
            summary[f"r2_{split}_micro"] = np.nan
            summary[f"rmse_{split}_micro"] = np.nan
            summary[f"mae_{split}_micro"] = np.nan
            summary[f"mape_{split}_micro"] = np.nan

        # energy-weighted micro (if provided)
        if (energy_weight_col in df.columns) and (df[energy_weight_col].fillna(0).sum() > 0):
            wE = df[energy_weight_col].fillna(0).to_numpy(dtype=float)
            summary[f"r2_{split}_energy_micro"] = float(np.average(df["r2"], weights=wE))
            summary[f"rmse_{split}_energy_micro"] = float(np.average(df["rmse"], weights=wE))
            summary[f"mae_{split}_energy_micro"] = float(np.average(df["mae"], weights=wE))
            summary[f"mape_{split}_energy_micro"] = float(np.average(df["mape"], weights=wE))
            summary[f"{energy_weight_col}_{split}_total"] = float(wE.sum())
        else:
            summary[f"r2_{split}_energy_micro"] = np.nan
            summary[f"rmse_{split}_energy_micro"] = np.nan
            summary[f"mae_{split}_energy_micro"] = np.nan
            summary[f"mape_{split}_energy_micro"] = np.nan
            summary[f"{energy_weight_col}_{split}_total"] = 0.0

        cols = ["r2", "rmse", "mae", "mape", "n_obs"]
        if energy_weight_col in df.columns:
            cols.append(energy_weight_col)
        nested[split] = df.set_index(group_col_name)[cols].to_dict(orient="index")

    summary["metrics_by_group"] = nested

    pooled_metrics_by_split = pooled_metrics_by_split or {}
    fd_me_metrics_by_split = fd_me_metrics_by_split or {}
    for split in splits.keys():
        summary[f"pooled_co2_{split}"] = json.dumps(pooled_metrics_by_split.get(split, {}))
        summary[f"fd_me_{split}"] = json.dumps(fd_me_metrics_by_split.get(split, {}))

    return pd.DataFrame([summary])

### Transformers / Classes 

#### Feature Engineering Transformers

In [133]:
class AnalysisFeatureAdder(BaseEstimator, TransformerMixin):
    """
    Add core temporal and quantitative features used in the original analysis.

    Adds:
      - time_id:              HH-MM string from `timestamp_col`
      - <Q>_sqrd:             square of `demand_met_col`
      - log_<Q>:              log(demand_met + ε)
      - log_<Q>_sqrd:         (log_<Q>)^2
      - log_<CO2>:            log(tons_co2 + ε) (only if `co2_col` present)
    """

    def __init__(
        self,
        timestamp_col: str = "timestamp",
        demand_met_col: str = "demand_met",
        co2_col: str = "tons_co2",
        epsilon: float = 1e-6,
    ):
        """
        Parameters
        ----------
        timestamp_col : str
            Name of the datetime column (parseable by pandas).
        demand_met_col : str
            Name of the demand column.
        co2_col : str
            Name of the CO2 column (optional at transform time).
        epsilon : float, default 1e-6
            Small constant to avoid log(0).
        """
        if not isinstance(timestamp_col, str):
            raise ValueError("timestamp_col must be a string")
        if not isinstance(demand_met_col, str):
            raise ValueError("demand_met_col must be a string")
        if not isinstance(co2_col, str):
            raise ValueError("co2_col must be a string")
        if not isinstance(epsilon, (float, int)):
            raise ValueError("epsilon must be a float or int")

        self.timestamp_col = timestamp_col
        self.demand_met_col = demand_met_col
        self.co2_col = co2_col
        self.epsilon = float(epsilon)

    def fit(self, X, y=None):
        if not isinstance(X, pd.DataFrame):
            raise TypeError("Input must be a pandas DataFrame")
        for col in [self.timestamp_col, self.demand_met_col]:
            if col not in X.columns:
                raise ValueError(f"Missing required column '{col}' in input DataFrame")
        self.n_features_in_ = X.shape[1]
        self.is_fitted_ = True
        return self

    def transform(self, X: pd.DataFrame, y: pd.Series | None = None) -> pd.DataFrame:
        """
        Parameters
        ----------
        X : pd.DataFrame
            Must contain `timestamp_col` and `demand_met_col`.

        Returns
        -------
        pd.DataFrame
            Copy of X with additional feature columns.
        """
        if not isinstance(X, pd.DataFrame):
            raise TypeError("Input must be a pandas DataFrame")

        df = X.copy()

        for col in [self.timestamp_col, self.demand_met_col]:
            if col not in df.columns:
                raise ValueError(f"Missing required column '{col}'")

        df[self.timestamp_col] = pd.to_datetime(df[self.timestamp_col], errors="coerce")
        if df[self.timestamp_col].isna().any():
            raise ValueError(f"Column '{self.timestamp_col}' contains non-parseable datetimes")

        # temporal
        df["time_id"] = df[self.timestamp_col].dt.strftime("%H-%M").astype("string")

        # quantitative
        q = self.demand_met_col
        df[f"{q}_sqrd"] = df[q] ** 2
        df[f"log_{q}"] = np.log(df[q] + self.epsilon)
        df[f"log_{q}_sqrd"] = df[f"log_{q}"] ** 2

        if self.co2_col in df.columns:
            df[f"log_{self.co2_col}"] = np.log(df[self.co2_col] + self.epsilon)

        return df

    def get_feature_names_out(self, input_features=None):
        base = []
        base.append("time_id")
        base += [
            f"{self.demand_met_col}_sqrd",
            f"log_{self.demand_met_col}",
            f"log_{self.demand_met_col}_sqrd",
        ]
        # optional; only present if co2 is in input
        base.append(f"log_{self.co2_col}")
        if input_features is not None:
            return np.array(list(input_features) + base)
        return np.array(base)


In [134]:
class DateTimeFeatureAdder(BaseEstimator, TransformerMixin):
    """
    Add datetime-based features from a timestamp column.

    New columns:
      - year (int)
      - month (int)
      - week_of_year (ISO week, int)
      - day (int)
      - hour (int)
      - half_hour (0..47, int)
      - day_of_week (1=Mon..7=Sun, int)
      - is_weekend (0/1, int)


    Parameters
    ----------
    timestamp_col : str, default="timestamp"
        Name of the column containing datetime strings or pd.Timestamp.
    drop_original : bool, default=True
        Whether to drop the original timestamp column after extraction.

    Raises
    ------
    TypeError
        If `timestamp_col` is not found in the DataFrame.
    KeyError
        If `timestamp_col` is not present in X.
\
    """
    def __init__(
        self,
        timestamp_col: str = "timestamp",
        drop_original: bool = False,
    ):
        """
        Initialize the feature adder.

        Parameters
        ----------
        timestamp_col : str
            Column name to parse as datetime.
        """
        if not isinstance(timestamp_col, str):
            raise TypeError("timestamp_col must be a string.")
        if not isinstance(drop_original, bool):
            raise TypeError("drop_original must be a bool.")
        self.timestamp_col = timestamp_col
        self.drop_original = drop_original


    def fit(self, X, y=None):
        """
        No-op fit. Exists for sklearn compatibility.

        Returns
        -------
        self : DateTimeFeatureAdder
        """
        if not isinstance(X, pd.DataFrame):
            raise TypeError("Input X must be a pandas DataFrame.")
        if self.timestamp_col not in X.columns:
            raise KeyError(f"Column '{self.timestamp_col}' not found in DataFrame.")

        self.n_features_in_ = X.shape[1]
        self.is_fitted_ = True
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Transform X by adding:

        - year (int)
        - month (int)
        - week_of_year (int)
        - day (int)
        - hour (int)
        - half_hour (int, 0-47)
        - day_of_week (int, 1=Mon)
        - is_weekend (0/1)

        Parameters
        ----------
        X : pd.DataFrame
            Input DataFrame with a column named `self.timestamp_col`.

        Returns
        -------
        X_out : pd.DataFrame
            Copy of X with the above new columns appended.

        Raises
        ------
        KeyError
            If `self.timestamp_col` is not present in X.
        """
        df = X.copy()
        # Attempt to convert the timestamp column to datetime (if not already)
        try:
            df[self.timestamp_col] = pd.to_datetime(df[self.timestamp_col], errors='raise')
        except Exception as e:
            raise TypeError(f"Column '{self.timestamp_col}' could not be converted to datetime: {e}")

        dt = df[self.timestamp_col]
        df["year"] = dt.dt.year.astype('int32')
        df["month"] = dt.dt.month.astype('int32')
        df["week_of_year"] = dt.dt.isocalendar().week.astype('int32')
        df["day"] = dt.dt.day.astype('int32')
        df["hour"] = dt.dt.hour.astype('int32')
        df["half_hour"]    = (dt.dt.hour * 2 + (dt.dt.minute // 30)).astype("int32")
        df["day_of_week"] = (dt.dt.dayofweek).astype('int32') + 1  # Monday=1
        df["is_weekend"] = (df["day_of_week"] >= 6).astype('int32')

        if self.drop_original:
            df = df.drop(columns=[self.timestamp_col])

        return df

    def get_feature_names_out(self, input_features=None):
        """
        Get the names of the output features.

        Parameters
        ----------
        input_features : array-like, optional
            The input feature names. If None, the original feature names are used.

        Returns
        -------
        np.ndarray
            The output feature names.
        """
        added = ["year","month","week_of_year","day","hour","half_hour","day_of_week","is_weekend"]
        if self.drop_original or input_features is None:
            base = [] if input_features is None else [c for c in input_features if c != self.timestamp_col]
        else:
            base = list(input_features)
        return np.array(base + added, dtype=object)

In [135]:
class GenerationShareAdder(BaseEstimator, TransformerMixin):
    """
    Add percentage‐share features for specified generation columns relative to a total.

    Parameters
    ----------
    generation_cols : List[str]
        Columns whose shares of `total_col` are computed.
    total_col : str, default="total_generation"
        Denominator column.
    suffix : str, default="_share"
        Suffix appended to new share columns.
    as_percent : bool, default=True
        If True, multiply shares by 100; otherwise keep as 0..1 fraction.
    clip_0_100 : bool, default=False
        If True and `as_percent=True`, clip results into [0, 100].
        If True and `as_percent=False`, clip into [0, 1].

    Raises
    ------
    TypeError
        Bad argument types.
    KeyError
        Missing `generation_cols` or `total_col`.
    """

    def __init__(
        self,
        generation_cols: List[str],
        total_col: str = "total_generation",
        suffix: str = "_share",
        as_percent: bool = True,
        clip_0_100: bool = False,
    ):
        """
        Initialize the share adder.

        Parameters
        ----------
        generation_cols : List[str]
            Columns to convert into percentage shares.
        total_col : str
            Column used as the denominator in share calculation.
        suffix : str
            Suffix for the new share columns.

        Raises
        ------
        TypeError
            If `generation_cols` is not a list of strings, or if `total_col` or `suffix` are not strings.
        """
        if not isinstance(generation_cols, list) or not all(isinstance(col, str) for col in generation_cols):
            raise TypeError("generation_cols must be a list of strings.")
        if not isinstance(total_col, str):
            raise TypeError("total_col must be a string.")
        if not isinstance(suffix, str):
            raise TypeError("suffix must be a string.")
        if not isinstance(as_percent, bool):
            raise TypeError("as_percent must be a bool.")
        if not isinstance(clip_0_100, bool):
            raise TypeError("clip_0_100 must be a bool.")

        self.generation_cols = generation_cols
        self.total_col = total_col
        self.suffix = suffix
        self.as_percent = as_percent
        self.clip_0_100 = clip_0_100

    def fit(self, X, y=None):
        """
        No‐op fit for compatibility with sklearn’s transformer API.

        Parameters
        ----------
        X : pd.DataFrame
            Input DataFrame.
        y : Ignored

        Returns
        -------
        self : GenerationShareAdder

        Raises
        ------
        TypeError
            If `X` is not a pandas DataFrame.
        KeyError
            If any of the specified `generation_cols` or `total_col` is not present in the DataFrame.
        """
        if not isinstance(X, pd.DataFrame):
            raise TypeError("Input X must be a pandas DataFrame.")
        missing_cols = [col for col in self.generation_cols if col not in X.columns]
        if missing_cols:
            raise KeyError(f"Generation columns {missing_cols} not found in input DataFrame.")
        if self.total_col not in X.columns:
            raise KeyError(f"Total column '{self.total_col}' not found in input DataFrame.")
        return self


    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Compute and append share columns.

        For each `col` in `generation_cols`, creates a new column
        `col + suffix` = 100 * (X[col] / X[total_col]). Zeros in `total_col`
        are treated as NaN to avoid division‐by‐zero.

        Parameters
        ----------
        X : pd.DataFrame
            Input DataFrame containing `generation_cols` and `total_col`.

        Returns
        -------
        X_out : pd.DataFrame
            Copy of X with additional `<col><suffix>` columns.

        """
        df = X.copy()
        # avoid integer division & div-by-zero
        total = df[self.total_col].astype("float64").replace({0.0: np.nan})
        scale = 100.0 if self.as_percent else 1.0

        for col in self.generation_cols:
            share_col = f"{col}{self.suffix}"
            df[share_col] = (df[col].astype("float64") / total) * scale
            if self.clip_0_100:
                lo, hi = (0.0, 100.0) if self.as_percent else (0.0, 1.0)
                df[share_col] = df[share_col].clip(lower=lo, upper=hi)

        return df


    def get_feature_names_out(self, input_features=None):
        added = [f"{c}{self.suffix}" for c in self.generation_cols]
        base = [] if input_features is None else list(input_features)
        return np.array(base + added, dtype=object)


#### Multi-Quantile Binner

In [136]:
class MultiQuantileBinner(BaseEstimator, TransformerMixin):
    """
    Quantile bin multiple variables, then combine their per-variable bin IDs into
    a single mixed-radix group ID (1-based).

    Example: with bin_specs={'v1':5, 'v2':4}:
      - Fit stores quantile edges for each var.
      - Transform assigns v1_group∈{1..5}, v2_group∈{1..4},
        then builds group_col_name = 1 + (v1_group-1)*4 + (v2_group-1)*1.
    """

    def __init__(
        self,
        bin_specs: dict[str, int],
        group_col_name: str = "quantile_group_id",
        retain_flags: bool = True,
        oob_policy: str = "clip",
        max_oob_rate: float | None = None,
    ):
        """
        Parameters
        ----------
        bin_specs : dict[str, int]
            Mapping of variable -> # of quantile bins (positive integers).
        group_col_name : str, default "quantile_group_id"
            Output column for the combined mixed-radix group ID (1-based).
        retain_flags : bool, default True
            If True, keep per-variable `<var>_group` columns.
        oob_policy : {"clip","edge","error"}, default "clip"
            Handling for values falling outside learned edges at transform time:
              - "clip": send to nearest bin (1 or max)
              - "edge": send to the first bin
              - "error": raise ValueError
        max_oob_rate : float or None, default None
            If set, raise an error when an individual variable sees
            OOB rate > max_oob_rate during transform.
        """
        if not isinstance(bin_specs, dict) or not bin_specs:
            raise ValueError("bin_specs must be a non-empty dict")
        if oob_policy not in {"clip", "edge", "error"}:
            raise ValueError("oob_policy must be one of {'clip','edge','error'}")

        self.bin_specs = self.validate_and_convert_bins(bin_specs)
        self.group_col_name = str(group_col_name)
        self.retain_flags = bool(retain_flags)
        self.oob_policy = oob_policy
        self.max_oob_rate = max_oob_rate

        self.variables_: list[str] | None = None
        self.quantile_edges_: dict[str, list[float]] = {}
        self.bin_sizes_: dict[str, int] = {}
        self.multipliers_: list[int] | None = None
        self.oob_counts_: dict[str, int] = {}

    def fit(self, X: pd.DataFrame, y=None):
        """
        Learn quantile edges for each variable.

        Parameters
        ----------
        X : pd.DataFrame
            Must contain all variables in `bin_specs`.
        """
        self.variables_ = list(self.bin_specs.keys())
        self.quantile_edges_.clear()
        self.bin_sizes_.clear()
        self.oob_counts_.clear()

        eps = 1e-4
        for var in self.variables_:
            n_bins = self.bin_specs[var]
            if var not in X.columns:
                raise ValueError(f"Column '{var}' not found in X")
            qs = np.linspace(0, 1, n_bins + 1)
            raw = X[var].quantile(qs, interpolation="midpoint").values
            vmin, vmax = X[var].min(), X[var].max()
            edges = np.unique(np.concatenate([[vmin - eps], raw, [vmax + eps]]))
            edges.sort()
            self.quantile_edges_[var] = edges.tolist()
            self.bin_sizes_[var] = len(edges) - 1

        bases = [self.bin_sizes_[v] for v in self.variables_]
        m = [1]
        for b in reversed(bases[1:]):
            m.insert(0, m[0] * b)
        self.multipliers_ = m
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Assign per-variable quantile bins and the combined group ID.

        Returns
        -------
        pd.DataFrame
            X plus `<var>_group` (optional) and `group_col_name`.
        """
        if not self.quantile_edges_:
            raise RuntimeError("Must fit binner before transform()")
        df = X.copy()
        self.oob_counts_ = {var: 0 for var in self.variables_}

        for var in self.variables_:
            edges = self.quantile_edges_[var]
            n = len(edges) - 1
            s = pd.cut(df[var], bins=edges, labels=range(1, n + 1), include_lowest=True, right=True)

            if s.isna().any():
                n_oob = int(s.isna().sum())
                self.oob_counts_[var] += n_oob
                if self.oob_policy == "error":
                    bad = df.loc[s.isna(), var].unique()
                    raise ValueError(f"OOB values for '{var}': {bad[:10]} ...")
                elif self.oob_policy == "clip":
                    below = df[var] < edges[1]
                    s = s.astype("Float64")
                    s.loc[s.isna() & below] = 1
                    s.loc[s.isna() & ~below] = n
                    s = s.astype("Int64")
                else:  # "edge"
                    s = s.fillna(1)

            df[f"{var}_group"] = s.astype(int)

        total = len(df)
        if self.max_oob_rate is not None and total > 0:
            for var, cnt in self.oob_counts_.items():
                rate = cnt / total
                if rate > self.max_oob_rate:
                    raise ValueError(
                        f"OOB rate {rate:.2%} exceeds max_oob_rate={self.max_oob_rate:.2%} for '{var}'"
                    )

        df[self.group_col_name] = 1
        for v, m in zip(self.variables_, self.multipliers_):
            df[self.group_col_name] += (df[f"{v}_group"] - 1) * m

        if not self.retain_flags:
            df.drop(columns=[f"{v}_group" for v in self.variables_], inplace=True)

        return df

    @staticmethod
    def validate_and_convert_bins(bin_specs: dict) -> dict[str, int]:
        converted: dict[str, int] = {}
        for k, v in bin_specs.items():
            try:
                v_int = int(float(v))
                if v_int != float(v) or v_int <= 0:
                    raise ValueError
                converted[str(k)] = v_int
            except (ValueError, TypeError) as e:
                raise TypeError(f"Bin spec '{k}' value '{v}' must be a positive integer") from e
        return converted

    def get_feature_names_out(self, input_features=None):
        names = []
        if self.retain_flags and self.variables_:
            names += [f"{v}_group" for v in self.variables_]
        names.append(self.group_col_name)
        if input_features is not None:
            return np.array(list(input_features) + names)
        return np.array(names)


#### Multi-Median Binner

In [137]:
class MultiMedianBinner(BaseEstimator, TransformerMixin):
    """
    Median-split each variable and combine flags into a 1-based group ID.
    """

    def __init__(self, variables: list[str], group_col_name: str = "median_group_id", retain_flags: bool = True):
        if not isinstance(variables, list) or len(variables) == 0:
            raise ValueError("`variables` must be a non-empty list of column names.")
        if any(not isinstance(v, str) for v in variables):
            raise TypeError("All entries in `variables` must be strings.")
        if not isinstance(group_col_name, str) or not group_col_name:
            raise TypeError("`group_col_name` must be a non-empty string.")
        if not isinstance(retain_flags, bool):
            raise TypeError("`retain_flags` must be a boolean value.")

        self.variables = variables
        self.group_col_name = group_col_name
        self.retain_flags = retain_flags
        self.medians_: dict[str, float] = {}

    def fit(self, X, y=None):
        if not isinstance(X, pd.DataFrame):
            raise TypeError("Input X must be a pandas DataFrame.")
        missing = [v for v in self.variables if v not in X.columns]
        if missing:
            raise ValueError(f"Columns not found in input DataFrame: {missing}")
        self.medians_ = X[self.variables].median(skipna=True).to_dict()
        return self

    def transform(self, X):
        """
        Returns
        -------
        pd.DataFrame
            Copy of X with optional `<var>_group` flags (0/1) and `group_col_name`.
        """
        if not self.medians_:
            raise RuntimeError("Must call fit() before transform().")
        if not isinstance(X, pd.DataFrame):
            raise TypeError("Input X must be a pandas DataFrame.")
        missing = [v for v in self.variables if v not in X.columns]
        if missing:
            raise ValueError(f"Columns missing at transform time: {missing}")

        df = X.copy()
        # compare each column to its scalar median (aligned by column name)
        flags = (df[self.variables] > pd.Series(self.medians_)).astype(int)

        multipliers = 2 ** np.arange(len(self.variables))[::-1]
        df[self.group_col_name] = flags.values.dot(multipliers) + 1

        if self.retain_flags:
            for var in self.variables:
                df[f"{var}_group"] = flags[var]

        return df

    def get_feature_names_out(self, input_features=None):
        names = []
        if self.retain_flags:
            names += [f"{v}_group" for v in self.variables]
        names.append(self.group_col_name)
        if input_features is not None:
            return np.array(list(input_features) + names)
        return np.array(names)


#### GroupwiseRegressor

In [138]:
class GroupwiseRegressor(BaseEstimator, TransformerMixin):
    """
    Runs separate OLS regressions in each group and computes marginal emission factors.

    For each group k, we fit:
        y_t = α₁ₖ · x₁_t + α₂ₖ · x₂_t + Σ β_i·C(f_i)_t + ε_t
    and compute the marginal effect:
        ME_t = ∂y_t/∂x₁_t = α₁ₖ + 2·α₂ₖ·x₁_t.

    Parameters
    ----------
    y_var : str
        Target column name (e.g. 'tons_co2').
    x_vars : List[str]
        Predictor columns; first is Q, second is Q².
    fe_vars : List[str], optional
        Categorical fixed-effect columns.
    group_col : str
        Column with integer group IDs.
    min_group_size : int
        Minimum observations per group to run regression.
    track_metrics : bool
        If True, store per-group models and metrics.
    verbose : bool
        If True, log progress and metrics.

    Attributes
    ----------
    group_models_ : dict
        Fitted statsmodels results per group (if track_metrics=True).
    group_metrics_ : dict
        Computed metrics per group (if track_metrics=True).
    """
    def __init__(
        self,
        y_var: str = "tons_co2",
        x_vars: List[str] = ["total_generation", "total_generation_sqrd"],
        fe_vars: Optional[List[str]] = None,
        group_col: str = "k",
        min_group_size: int = 10,
        track_metrics: bool = True,
        verbose: bool = True,
        random_state: int | None = 12,
    ):
        if not isinstance(y_var, str):
            raise TypeError("y_var must be a string")
        if not isinstance(x_vars, list) or not x_vars or not all(isinstance(v, str) for v in x_vars):
            raise TypeError("x_vars must be a non-empty list of strings")
        if fe_vars is not None and (not isinstance(fe_vars, list) or not all(isinstance(v, str) for v in fe_vars)):
            raise TypeError("fe_vars must be a list of strings or None")
        if not isinstance(group_col, str):
            raise TypeError("group_col must be a string")
        if not isinstance(min_group_size, int) or min_group_size < 1:
            raise ValueError("min_group_size must be a positive integer")
        if not isinstance(track_metrics, bool):
            raise TypeError("track_metrics must be a boolean")
        if not isinstance(verbose, bool):
            raise TypeError("verbose must be a boolean")

        self.y_var = y_var
        self.x_vars = x_vars
        self.fe_vars = fe_vars or []
        self.group_col = group_col
        self.min_group_size = min_group_size
        self.track_metrics = track_metrics
        self.verbose = verbose
        self.random_state = random_state
        if self.track_metrics:
            self.group_models_: dict[Any, Any] = {}
            self.group_metrics_: dict[Any, dict[str, float]] = {}

    def fit(self, X, y=None):
        if self.random_state is not None:
            np.random.seed(self.random_state)
        if not isinstance(X, pd.DataFrame):
            raise TypeError("X must be a pandas DataFrame")
        if y is None:
            raise ValueError("y must be provided for fitting")
        if len(X) != len(y):
            raise ValueError(f"X and y have different lengths: {len(X)} != {len(y)}")

        df = X.copy()
        df[self.y_var] = np.asarray(y).reshape(-1)

        # avoid uint in formula design matrix
        uint_cols = [c for c in df.columns if str(df[c].dtype).startswith(("uint", "UInt"))]
        if uint_cols:
            df[uint_cols] = df[uint_cols].astype("int64")

        if self.track_metrics:
            self.group_models_.clear()
            self.group_metrics_.clear()

        # cast FEs to ordered categoricals
        if "month" in self.fe_vars:
            df["month"] = pd.Categorical(df["month"].astype(int), categories=range(1, 13), ordered=True)
        if "hour" in self.fe_vars:
            df["hour"] = pd.Categorical(df["hour"].astype(int), categories=range(24), ordered=True)
        if "day_of_week" in self.fe_vars:
            df["day_of_week"] = pd.Categorical(df["day_of_week"].astype(int), categories=range(1, 8), ordered=True)
        if "week_of_year" in self.fe_vars:
            df["week_of_year"] = pd.Categorical(df["week_of_year"].astype(int), categories=range(1, 54), ordered=True)
        if "half_hour" in self.fe_vars:
            df["half_hour"] = pd.Categorical(df["half_hour"].astype(int), categories=range(0, 48), ordered=True)

        self._fitted_groups: list[Any] = []

        for grp, df_grp in df.groupby(self.group_col, sort=True):
            n = len(df_grp)
            if n < self.min_group_size:
                if self.verbose:
                    logging.warning(f"Skipping group {grp!r}: only {n} < {self.min_group_size}")
                continue

            reg = " + ".join(self.x_vars)
            fe = " + ".join(f"C({f})" for f in self.fe_vars)
            formula = f"{self.y_var} ~ {reg}" + (f" + {fe}" if fe else "")

            model = smf.ols(formula, data=df_grp).fit()
            self._fitted_groups.append(grp)

            if self.track_metrics:
                preds = model.predict(df_grp)
                rmse = float(np.sqrt(np.mean((preds - df_grp[self.y_var]) ** 2)))
                mae = float(np.mean(np.abs(preds - df_grp[self.y_var])))
                mape = float(mean_absolute_percentage_error(df_grp[self.y_var], preds))
                self.group_models_[grp] = model
                self.group_metrics_[grp] = {
                    "r2": float(model.rsquared),
                    "rmse": rmse,
                    "mae": mae,
                    "mape": mape,
                    "n_obs": int(n),
                }

        if not self._fitted_groups:
            raise ValueError("No valid groups found for fitting.")
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Apply groupwise OLS and compute marginal effects ME_t.

        Parameters
        ----------
        X : pd.DataFrame
            Must contain y_var, x_vars, fe_vars, and group_col.

        Returns
        -------
        pd.DataFrame
            Original rows plus 'alpha1', 'alpha2', and 'ME'.

        Raises
        ------
        TypeError
            If X is not a pandas DataFrame.
        ValueError
            If required columns missing or no group qualifies.
        """
        if not getattr(self, "group_models_", None):
            raise RuntimeError("GroupwiseRegressor must be fit before transform/predict.")
        if not isinstance(X, pd.DataFrame):
            raise TypeError("Input X must be a pandas DataFrame")

        df = X.copy()

        # keep FE category casting consistent with fit
        if "month" in self.fe_vars:
            df["month"] = pd.Categorical(df["month"].astype(int), categories=range(1, 13), ordered=True)
        if "hour" in self.fe_vars:
            df["hour"] = pd.Categorical(df["hour"].astype(int), categories=range(24), ordered=True)
        if "day_of_week" in self.fe_vars:
            df["day_of_week"] = pd.Categorical(df["day_of_week"].astype(int), categories=range(1, 8), ordered=True)
        if "week_of_year" in self.fe_vars:
            df["week_of_year"] = pd.Categorical(df["week_of_year"].astype(int), categories=range(1, 54), ordered=True)
        if "half_hour" in self.fe_vars:
            df["half_hour"] = pd.Categorical(df["half_hour"].astype(int), categories=range(0, 48), ordered=True)

        df["alpha1"] = np.nan
        df["alpha2"] = np.nan
        df["ME"] = np.nan

        for grp, df_grp in df.groupby(self.group_col, sort=True):
            model = self.group_models_.get(grp)
            if model is None:
                continue
            a1 = model.params.get(self.x_vars[0], np.nan)
            a2 = model.params.get(self.x_vars[1], 0.0)
            idx = df_grp.index

            df.loc[idx, "alpha1"] = a1
            df.loc[idx, "alpha2"] = a2
            df.loc[idx, "ME"] = a1 + 2.0 * a2 * df_grp[self.x_vars[0]]

        return df


    def predict(self, X: pd.DataFrame, predict_type: str = "ME") -> pd.Series:
        """
        Predict marginal effects (default) or CO2 for each row in X using fitted group models.

        Parameters
        ----------
        X : pd.DataFrame
            Must contain x_vars, fe_vars, and group_col.
        predict_type : {"ME","y"}, default "ME"
            "ME": return α1 + 2*α2*Q
            "y" : return model.predict(...) (CO2)

        Returns
        -------
        pd.Series
            Predictions aligned to X.index.
        """

        if not getattr(self, "group_models_", None):
            raise RuntimeError("GroupwiseRegressor must be fit before predict().")
        if not isinstance(X, pd.DataFrame):
            raise TypeError("X must be a pandas DataFrame")

        required = self.x_vars + self.fe_vars + [self.group_col]
        missing = [c for c in required if c not in X.columns]
        if missing:
            raise ValueError(f"Missing columns in input DataFrame: {missing}")

        df = X.copy()
        # consistent FE casting
        if "month" in self.fe_vars:
            df["month"] = pd.Categorical(df["month"].astype(int), categories=range(1, 13), ordered=True)
        if "hour" in self.fe_vars:
            df["hour"] = pd.Categorical(df["hour"].astype(int), categories=range(24), ordered=True)
        if "day_of_week" in self.fe_vars:
            df["day_of_week"] = pd.Categorical(df["day_of_week"].astype(int), categories=range(1, 8), ordered=True)
        if "week_of_year" in self.fe_vars:
            df["week_of_year"] = pd.Categorical(df["week_of_year"].astype(int), categories=range(1, 54), ordered=True)
        if "half_hour" in self.fe_vars:
            df["half_hour"] = pd.Categorical(df["half_hour"].astype(int), categories=range(0, 48), ordered=True)

        out = pd.Series(index=df.index, dtype=float)

        for grp, df_grp in df.groupby(self.group_col, sort=True):
            model = self.group_models_.get(grp)
            if model is None:
                continue

            if predict_type == "y":
                preds = model.predict(df_grp)
            else:
                a1 = model.params.get(self.x_vars[0], np.nan)
                a2 = model.params.get(self.x_vars[1], 0.0)
                Q = df_grp[self.x_vars[0]]
                preds = a1 + 2.0 * a2 * Q

            out.loc[df_grp.index] = preds

        return out

    def get_metrics(self, summarise: bool = True) -> Union[dict, pd.DataFrame]:
        """
        Get the metrics for each group.

        Parameters
        ----------
        summarise : bool, default=True
            If True, return a summary DataFrame; otherwise return raw metrics dict.

        Returns
        -------
        dict or pd.DataFrame
            If summarise=True, returns a DataFrame with group metrics.
            If False, returns the raw metrics dictionary.

        Raises
        ------
        RuntimeError
            If track_metrics was not set to True during initialization.
        """
        if not self.track_metrics:
            raise RuntimeError("Metrics tracking is disabled. Set track_metrics=True to enable.")
        if summarise:
            df = pd.DataFrame.from_dict(self.group_metrics_, orient="index")
            df.index.name = self.group_col
            df.reset_index(inplace=True)
            return df
        return self.group_metrics_

### Running Models

#### Utilities

In [139]:
def _apply_fitted_preprocessing(user_pipeline: Pipeline, X: pd.DataFrame) -> pd.DataFrame:
    """
    Apply all *already-fitted* steps in a pipeline except the final estimator,
    without constructing a new sklearn Pipeline (avoids 'Pipeline not fitted' warnings).

    Parameters
    ----------
    user_pipeline : Pipeline
        A pipeline that has already been fitted (on train) and whose final step
        is the estimator (e.g., GroupwiseRegressor).
    X : pd.DataFrame
        Raw features to transform through the fitted preprocessing steps.

    Returns
    -------
    pd.DataFrame
        The transformed features as a DataFrame. If a transformer returns a numpy array,
        we try to retrieve column names via `get_feature_names_out()`; otherwise we fall
        back to the original column names.
    """
    Z = X
    last_transformer = None

    for _, step in user_pipeline.steps[:-1]:
        if hasattr(step, "transform"):
            Z = step.transform(Z)
            last_transformer = step

    if isinstance(Z, pd.DataFrame):
        return Z

    # Try to recover column names
    cols = None
    try:
        cols = user_pipeline[:-1].get_feature_names_out()  # type: ignore[index]
    except Exception:
        try:
            if last_transformer is not None and hasattr(last_transformer, "get_feature_names_out"):
                cols = last_transformer.get_feature_names_out()  # type: ignore[assignment]
        except Exception:
            cols = None

    if cols is None:
        cols = X.columns
    return pd.DataFrame(Z, index=X.index, columns=list(cols))


In [140]:
def compute_me_for_split(
    fitted_pipeline: Pipeline,
    X: pd.DataFrame,
    split_name: str | None = None,
    id_cols: list[str] = ("timestamp", "city"),
    include_params: bool = True,
    keep_cols: list[str] = ("demand_met", "tons_co2"),
) -> pd.DataFrame:
    """
    Use a FITTED pipeline to compute marginal emissions (ME) for a single features DataFrame.

    Parameters
    ----------
    fitted_pipeline : Pipeline
        A pipeline that has already been fit on the training data. Its final step must be
        GroupwiseRegressor, whose transform adds 'ME' (and 'alpha1','alpha2').
    X : pd.DataFrame
        Feature table to transform. Must include the columns required by the pipeline’s
        feature steps and binner (e.g., weather vars), plus any IDs you want to keep.
    split_name : str, optional
        If provided, a 'split' column is added with this value ('train'/'validation'/'test'/etc).
    id_cols : list[str], default ('timestamp','city')
        Identifier columns to carry into the output if present in `X` after transform.
    include_params : bool, default True
        If True, also include 'alpha1' and 'alpha2' in the output.
    keep_cols : list[str], default ('demand_met','tons_co2')
        Additional columns to include if present (useful for diagnostics).

    Returns
    -------
    pd.DataFrame
        One row per input row with at least: id_cols ∩ columns, 'ME', and optionally
        'alpha1','alpha2', the regressor’s group column, keep_cols, and 'split'.
    """
    # Transform through all steps → last step (GroupwiseRegressor) computes ME
    out = fitted_pipeline.transform(X)

    # Final estimator for group column name
    reg = getattr(fitted_pipeline, "_final_estimator", None)
    gcol = getattr(reg, "group_col", None)

    # Build column list in a safe, present-only way
    cols: list[str] = [c for c in id_cols if c in out.columns]
    if "ME" not in out.columns:
        raise RuntimeError("Pipeline transform did not produce 'ME'. Was the final estimator fitted?")
    cols.append("ME")

    if include_params:
        for c in ("alpha1", "alpha2"):
            if c in out.columns:
                cols.append(c)

    if gcol and gcol in out.columns:
        cols.append(gcol)

    for c in keep_cols:
        if c in out.columns and c not in cols:
            cols.append(c)

    result = out[cols].copy()
    if split_name is not None:
        result["split"] = split_name
    return result

In [141]:
def evaluate_on_split(
        regression_model: GroupwiseRegressor,
        full_df: pd.DataFrame
) -> pd.DataFrame:
    """
    After pipeline.transform → full_df with group IDs & original y_var,
    compute per‑group r2/rmse/mae/n_obs using reg.group_models_.

    Parameters
    ----------
    reg : GroupwiseRegressor
        Fitted GroupwiseRegressor instance with group_models_ populated.
    full_df : pd.DataFrame
        DataFrame containing the original y_var and group_col.

    Returns
    -------
    pd.DataFrame
        DataFrame with group metrics: r2, rmse, mae, n_obs.
    """
    df = full_df.copy()
    gcol = regression_model.group_col
    yname = regression_model.y_var

    if gcol not in df.columns or yname not in df.columns:
        missing = [c for c in (gcol, yname) if c not in df.columns]
        raise KeyError(f"Required columns missing: {missing}")

    # Use the regressor's predict to ensure FE category handling is consistent
    y_true = df[yname]
    y_pred = regression_model.predict(df, predict_type="y")

    rows = []
    for grp, idx in df.groupby(gcol).groups.items():
        yt = y_true.loc[idx]
        yp = y_pred.loc[idx].dropna()
        # align just in case
        yt = yt.loc[yp.index]
        if len(yt) == 0:
            continue
        rows.append({
            "group": grp,
            "r2": r2_score(yt, yp),
            "rmse": root_mean_squared_error(yt, yp),
            "mae": mean_absolute_error(yt, yp),
            "mape": mean_absolute_percentage_error(yt, yp),
            "n_obs": int(len(yt)),
        })

    mdf = pd.DataFrame(rows)
    if mdf.empty:
        # return empty with expected columns
        mdf = pd.DataFrame(columns=["group","r2","rmse","mae","mape","n_obs"])
    return mdf

In [142]:
def fit_and_export_marginal_emissions(
    pipeline: Pipeline,
    x_splits: dict,
    y_splits: dict,
    out_parquet_path: str,
    *,
    id_cols: list[str] = ("timestamp", "city"),
    include_params: bool = True,
    keep_cols: list[str] = ("demand_met", "tons_co2"),
    order_splits: list[str] = ("train", "validation", "test"),
    save_mode: str = "single",              # "single" | "per_split"
    compression: str | None = "snappy",     # passed to pandas.to_parquet
    return_df: bool = True,                 # set False on huge runs
) -> pd.DataFrame:
    """
    Fit the pipeline on the train split, compute marginal emissions (ME) for each split,
    concatenate, and save to a single Parquet file.

    Parameters
    ----------
    pipeline : Pipeline
        Your full pipeline: [FeatureAddition → Binner → GroupwiseRegressor].
    x_splits : dict
        Feature splits, e.g. {"train": X_train, "validation": X_val, "test": X_test}.
    y_splits : dict
        Target splits, e.g. {"train": y_train, "validation": y_val, "test": y_test}.
        Only the train target is used for fitting; others are not needed for transform.
    out_parquet_path : str
        File path for the output Parquet dataset.
    id_cols : list[str], default ('timestamp','city')
        Identifier columns to include if present.
    include_params : bool, default True
        Include 'alpha1' and 'alpha2' in the export.
    keep_cols : list[str], default ('demand_met','tons_co2')
        Additional useful columns to include if present.
    order_splits : list[str], default ('train','validation','test')
        Order in which to compute and stack splits.
    save_mode : {"single","per_split"}, default "single"
        Use "per_split" on HPC/MPI (let rank 0 write or give each rank a different path).
    compression : str or None, default "snappy"
        Parquet compression codec (requires pyarrow/fastparquet support).
    return_df : bool, default True
        If False, skip building the concatenated DataFrame in memory.


    Returns
    -------
    pd.DataFrame or None
        Concatenated results if return_df=True and save_mode="single"; otherwise None.


    Notes
    -----
    - Binner quantile edges and groupwise OLS coefficients are learned on TRAIN only.
    - Validation and test are transformed using those learned edges/coefficients.
    - In MPI jobs, prefer save_mode="per_split" or call this only on rank 0.
    - Binner edges and group OLS coefs are learned on train only.
    """
    out_parquet_path = Path(out_parquet_path)

    # Fit on train
    X_tr = x_splits["train"]
    y_tr = y_splits["train"]
    _ = pipeline.fit_transform(X_tr, y_tr)

    if save_mode not in {"single", "per_split"}:
        raise ValueError("save_mode must be 'single' or 'per_split'.")

    # Compute ME for each requested split
    parts: list[pd.DataFrame] = []
    for split in order_splits:
        if split not in x_splits:
            continue
        df_me = compute_me_for_split(
            fitted_pipeline=pipeline,
            X=x_splits[split],
            split_name=split,
            id_cols=id_cols,
            include_params=include_params,
            keep_cols=keep_cols,
        )

        if save_mode == "per_split":
            split_path = out_parquet_path.with_name(
                f"{out_parquet_path.stem}__{split}{out_parquet_path.suffix or '.parquet'}"
            )
            split_path.parent.mkdir(parents=True, exist_ok=True)
            df_me.to_parquet(split_path, index=False, compression=compression)
            # optionally avoid keeping in memory on huge runs
            if return_df:
                parts.append(df_me)
        else:
            parts.append(df_me)

    if save_mode == "single":
        final = pd.concat(parts, ignore_index=True) if parts else pd.DataFrame()
        out_parquet_path.parent.mkdir(parents=True, exist_ok=True)
        final.to_parquet(out_parquet_path, index=False, compression=compression)
        print(f"[SAVE] Wrote marginal emissions to {out_parquet_path} (rows={len(final):,})")
        return final if return_df else None
    else:
        print(f"[SAVE] Wrote per-split Parquet files next to {out_parquet_path}")
        if return_df:
            return pd.concat(parts, ignore_index=True) if parts else pd.DataFrame()
        return None

In [206]:
def fit_and_export_marginal_emissions_full(
    pipeline: Pipeline,
    X_full: pd.DataFrame,
    y_full: pd.Series | pd.DataFrame,
    out_parquet_path: str,
    *,
    id_cols: list[str] = ("timestamp", "city"),
    include_params: bool = True,
    keep_cols: list[str] = ("demand_met", "tons_co2"),
    compression: str | None = "snappy",
    return_df: bool = True,
) -> pd.DataFrame | None:
    """
    Fit the pipeline on ALL data and export marginal emissions for the ENTIRE dataset (no splits).

    Parameters:
    ----------
    pipeline: Pipeline
        The pipeline to fit and use for predictions.
    X_full: pd.DataFrame
        The feature data to fit the pipeline on.
    y_full: pd.Series | pd.DataFrame
        The target data to fit the pipeline on.
    out_parquet_path: str
        The path to save the output parquet file.
    id_cols: list[str], optional
        The columns to use as identifiers (default is ["timestamp", "city"]).
    include_params: bool, optional
        Whether to include model parameters in the output (default is True).
    keep_cols: list[str], optional
        The columns to keep in the output (default is ["demand_met", "tons_co2"]).
    compression: str | None, optional
        The compression method to use for the output parquet file (default is "snappy").
    return_df: bool, optional
        Whether to return the resulting DataFrame (default is True).

    Returns:
    -------
    pd.DataFrame | None
        The resulting DataFrame with marginal emissions or None if not requested.
    """
    out_parquet_path = Path(out_parquet_path)

    # Fit on all data
    pipeline.fit(X_full, y_full)

    # Compute ME for the whole table
    df_me = compute_me_for_split(
        fitted_pipeline=pipeline,
        X=X_full,
        split_name=None,              # no split column
        id_cols=id_cols,
        include_params=include_params,
        keep_cols=keep_cols,
    )

    # Save
    out_parquet_path.parent.mkdir(parents=True, exist_ok=True)
    df_me.to_parquet(out_parquet_path, index=False, compression=compression)
    print(f"[SAVE] Wrote marginal emissions to {out_parquet_path} (rows={len(df_me):,})")

    return df_me if return_df else None


#### Runners & Orchestrators

In [143]:
def run_regressor_model(
    user_pipeline: Pipeline,
    x_df: pd.DataFrame,
    y_df: pd.Series | pd.DataFrame,
    split_name: str,
    extra_info: dict | None = None,
    return_model: bool = False,
    random_state: int = 12,
    interval_hours: float = 0.5,
    *,
 model_id_hash: str | None = None,
    params_json_str: str | None = None,
) -> tuple[pd.DataFrame, list[str], GroupwiseRegressor | dict]:
    """
    Run a pipeline on one split, compute per-group metrics, attach energy weights,
    and compute diagnostics (pooled CO₂ fit + finite-difference ME checks).

    Parameters
    ----------
    user_pipeline : Pipeline
        Full pipeline [FeatureAddition → (Binner) → GroupwiseRegressor].
    x_df : pd.DataFrame
        Features for the split.
    y_df : pd.Series or single-column pd.DataFrame
        Target for the split.
    split_name : {"train","validation","test"}
        Which split to run.
    extra_info : dict, optional
        Extra metadata to stamp onto the output rows.
    return_model : bool, default False
        If True, returns the final estimator as the 3rd tuple item; otherwise returns extras dict.
    random_state : int, default 12
        Random seed for reproducibility.
    interval_hours : float, default 0.5
        Duration represented by each row (half-hourly = 0.5).
    model_id_hash : str, optional
        If provided, stamp this precomputed run-level hash (recommended).
        If None, a local signature is computed (useful for ad-hoc calls).
    params_json_str : str, optional
        Pre-rendered pipeline params JSON to stamp; if None, it is computed.

    Returns
    -------
    metrics_df : pd.DataFrame
        Per-group metrics with added 'energy_MWh' and metadata columns.
    x_cols_used : list[str]
        Regressor feature names used by the GroupwiseRegressor (x_vars + fe_vars).
    model_or_extras : GroupwiseRegressor | dict
        If return_model=True → the fitted final estimator; else a dict of diagnostics.
    """
    np.random.seed(random_state)

    for col in x_df.columns:
        dt = x_df[col].dtype
        if str(dt).startswith(("uint", "UInt")):
            x_df[col] = x_df[col].astype("int64")

    if split_name not in ("train", "validation", "test"):
        raise ValueError(f"split_name must be 'train', 'validation', or 'test' (got {split_name!r})")

    X = x_df.copy()
    if isinstance(y_df, pd.DataFrame):
        if y_df.shape[1] != 1:
            raise ValueError("y_df must be a Series or single-column DataFrame.")
        y_ser = y_df.iloc[:, 0]
    else:
        y_ser = y_df

    # Use provided model_id_hash (from orchestrator) or compute a local one
    if model_id_hash is None:
        model_id_hash, _ = signature_for_run(
            user_pipeline,
            x_columns=list(X.columns),
            y=y_ser,
            random_state=random_state,
            eval_splits=(split_name,),   # local call; orchestrator passes a shared hash
            compute_test=False,
            extra_info=extra_info,
        )

    if params_json_str is None:
        params_json_str = json.dumps(
            user_pipeline.get_params(deep=True),
            sort_keys=True, separators=(",", ":"), default=str
        )

    extras: dict[str, Any] = {}

    if split_name == "train":
        # Fit → metrics from regressor
        _ = user_pipeline.fit_transform(X, y_ser)
        model = user_pipeline._final_estimator  # type: ignore[attr-defined]
        metrics_df = model.get_metrics(summarise=True).reset_index(drop=True)

        # Canonicalize group col to "group"
        if model.group_col in metrics_df.columns:
            metrics_df = metrics_df.rename(columns={model.group_col: "group"})
        elif "group" not in metrics_df.columns:
            metrics_df = metrics_df.rename(columns={metrics_df.columns[0]: "group"})

        # Preprocessed rows for weights & diagnostics
        x_tr = _apply_fitted_preprocessing(user_pipeline, X)
        x_tr[model.y_var] = np.asarray(y_ser, dtype=float)

        # Energy weights
        w = _compute_group_energy_weights(
            df=x_tr, group_col=model.group_col, q_col=model.x_vars[0], interval_hours=interval_hours
        ).rename(columns={model.group_col: "group"})
        metrics_df = metrics_df.merge(w, on="group", how="left")

        # Diagnostics (in-sample)
        extras["pooled_co2"] = pooled_co2_metrics(
            model, x_tr, y_col=model.y_var, group_col=model.group_col
        )
        me_df = model.transform(x_tr)
        fd_df = finite_difference_me_metrics(
            df=me_df,
            time_col="timestamp" if "timestamp" in me_df.columns else "time_id",
            q_col=model.x_vars[0],
            y_col=model.y_var,
            me_col="ME",
            group_keys=[k for k in ("city",) if k in me_df.columns],
        )
        extras["fd_me_by_city"] = fd_df.to_dict(orient="records") if not fd_df.empty else []
        extras["fd_me_pooled"] = (
            fd_df.loc[fd_df["city"] == "ALL"].iloc[0].to_dict()
            if (not fd_df.empty and "city" in fd_df.columns and "ALL" in fd_df["city"].values)
            else (fd_df.sort_values("n_pairs", ascending=False).iloc[0].to_dict() if not fd_df.empty else {})
        )

    else:
        # Use fitted preprocessing + regressor
        model = user_pipeline._final_estimator  # type: ignore[attr-defined]
        x_tr = _apply_fitted_preprocessing(user_pipeline, X)

        if model.group_col not in x_tr.columns:
            raise KeyError(
                f"Group column '{model.group_col}' is missing after transform. "
                "Ensure your binner outputs it."
            )

        x_tr[model.y_var] = np.asarray(y_ser, dtype=float)

        # Per-group metrics
        metrics_df = evaluate_on_split(model, x_tr)

        # Energy weights
        w = _compute_group_energy_weights(
            df=x_tr, group_col=model.group_col, q_col=model.x_vars[0], interval_hours=interval_hours
        ).rename(columns={model.group_col: "group"})
        metrics_df = metrics_df.merge(w, on="group", how="left")

        # Out-of-sample diagnostics
        extras["pooled_co2"] = pooled_co2_metrics(
            model, x_tr, y_col=model.y_var, group_col=model.group_col
        )
        me_df = model.transform(x_tr)
        fd_df = finite_difference_me_metrics(
            df=me_df,
            time_col="timestamp" if "timestamp" in me_df.columns else "time_id",
            q_col=model.x_vars[0],
            y_col=model.y_var,
            me_col="ME",
            group_keys=[k for k in ("city",) if k in me_df.columns],
        )
        extras["fd_me_by_city"] = fd_df.to_dict(orient="records") if not fd_df.empty else []
        extras["fd_me_pooled"] = (
            fd_df.loc[fd_df["city"] == "ALL"].iloc[0].to_dict()
            if (not fd_df.empty and "city" in fd_df.columns and "ALL" in fd_df["city"].values)
            else (fd_df.sort_values("n_pairs", ascending=False).iloc[0].to_dict() if not fd_df.empty else {})
        )

    # Stamp metadata
    metrics_df["data_split"] = split_name
    metrics_df["model_id_hash"] = model_id_hash
    metrics_df["random_state"] = random_state
    metrics_df["pipeline_params_json"] = params_json_str
    metrics_df["log_time"] = datetime.now().isoformat()

    model = user_pipeline._final_estimator  # type: ignore[attr-defined]
    metrics_df["x_columns_used"] = ",".join(model.x_vars + model.fe_vars)
    for k, v in (extra_info or {}).items():
        metrics_df[k] = v

    x_cols_used = model.x_vars + model.fe_vars
    print(f"[LOG] {len(metrics_df)} rows for split={split_name}, model_id={model_id_hash}, random_state={random_state}")

    return (metrics_df, x_cols_used, model) if return_model else (metrics_df, x_cols_used, extras)

In [144]:
def regressor_orchestrator(
        user_pipeline: Pipeline,
        x_splits: dict,
        y_splits: dict,
        log_csv_path: str | None = "marginal_emissions_log.csv",   # legacy
        extra_info: dict | None = None,
        force_run: bool = False,
        force_overwrite: bool = False,
        random_state: int = 12,
        group_col_name: str = "group",
        interval_hours: float = 0.5,
        eval_splits: tuple[str, ...] | None = None,
        compute_test: bool = False,
        # rotating CSV
        results_dir: str | None = None,
        file_prefix: str | None = None,
        max_log_mb: int = 95,
        fsync: bool = True,
) -> pd.DataFrame | None:
    """
    Fit/evaluate a pipeline on train/validation/test, summarise metrics, and append to a CSV log.

    Parameters
    ----------
    user_pipeline : Pipeline
        Full pipeline, typically [FeatureAddition → Binner → GroupwiseRegressor].
    x_splits : dict
        Must include "train" and "validation". Include "test" iff compute_test=True.
    y_splits : dict
        Target splits with the same keys as x_splits.  Must include "train" and "validation". Include "test" iff compute_test=True.
     log_csv_path : str, optional
        Legacy path; used only to infer default results_dir/file_prefix if those are None.
    extra_info : dict, optional
        Extra metadata to stamp onto per-split logs (propagates into `run_regressor_model`).
    force_run : bool, default=False
        If False and an identical model signature was previously logged, skip this run.
    force_overwrite : bool, default=False
        If True, allows re-logging the same model_id_hash (previous rows are NOT removed here;
        use `save_summary_to_csv(..., force_overwrite=True)` for row replacement).
    random_state : int, default=12
        Random seed recorded in the model signature and summary.
    group_col_name : str, default="group"
        Canonical group column name used by `summarise_metrics_logs` for nested metrics.

    Returns
    -------
    pd.DataFrame or None
        One-row summary DataFrame if the run executes; None if skipped due to prior identical log.

    Notes
    -----
    - The model signature (hash) is computed from pipeline parameters, feature columns, target name(s),
      random_state, and any `extra_info`. If unchanged and `force_run=False`, the run is skipped.
    - `x_columns` recorded in the summary are taken from the **train** split’s evaluation result.
    """
    # in regressor_orchestrator before signature_for_run(...)
    if eval_splits is None:
        eval_splits = ("train","validation","test") if compute_test else ("train","validation")
    compute_test = ("test" in eval_splits)  # ← keep hash consistent with actual splits

    # One signature for the whole run (based on TRAIN)
    model_key, sig = signature_for_run(
        user_pipeline,
        x_columns=list(x_splits["train"].columns),
        y=y_splits["train"],
        random_state=random_state,
        eval_splits=eval_splits,
        compute_test=compute_test,
        extra_info=extra_info,
    )

    # Resolve dir + prefix (fallback to legacy path)
    if results_dir is None or file_prefix is None:
        base = Path(log_csv_path or "marginal_emissions_log.csv")
        inferred_dir = base.parent if str(base.parent) != "" else Path(".")
        inferred_prefix = base.stem
        results_dir = results_dir or str(inferred_dir)
        file_prefix = file_prefix or inferred_prefix

    # De-dupe via index
    if not force_run and not force_overwrite:
        if is_model_logged_rotating_csv(model_key, results_dir, file_prefix):
            print(f"[SKIP] Model already logged (hash: {model_key})")
            return None

    # Precompute params JSON once (consistent across splits)
    params_json_str = json.dumps(
        user_pipeline.get_params(deep=True),
        sort_keys=True, separators=(",", ":"), default=str
    )

    logs, pooled_extras, fd_extras = {}, {}, {}
    x_cols_used: list[str] | None = None

    for split in eval_splits:
        metrics_df, x_cols_used, extras = run_regressor_model(
            user_pipeline=user_pipeline,
            x_df=x_splits[split],
            y_df=y_splits[split],
            split_name=split,
            extra_info=extra_info,
            return_model=False,
            random_state=random_state,
            interval_hours=interval_hours,
            model_id_hash=model_key,          # shared ID across splits
            params_json_str=params_json_str,  # shared params JSON
        )
        logs[split] = metrics_df
        pooled_extras[split] = extras.get("pooled_co2", {})
        fd_extras[split] = extras.get("fd_me_pooled", {})

    summary_df = summarise_metrics_logs(
        train_logs=logs["train"],
        val_logs=logs["validation"],
        test_logs=logs.get("test"),
        user_pipeline=user_pipeline,
        x_columns=x_cols_used or [],
        random_state=random_state,
        group_col_name=group_col_name,           # <- use the parameter you accept
        pooled_metrics_by_split=pooled_extras,
        fd_me_metrics_by_split=fd_extras,
    )

    save_summary_to_rotating_csv(
        summary_df,
        results_dir=results_dir,
        file_prefix=file_prefix,
        max_mb=max_log_mb,
        force_overwrite=force_overwrite,
        fsync=fsync,
    )
    return summary_df

#### Grid Search

In [145]:
def run_grid_search(
        base_feature_pipeline: Pipeline,
        regressor_cls,
        regressor_kwargs: dict,
        grid_config: list[dict],
        x_splits: dict,
        y_splits: dict,
        log_path: str | None,  # legacy; optional now
        global_extra_info: dict | None = None,
        force_run: bool = False,
        force_overwrite: bool = False,
        base_feature_pipeline_name: str = "BaseFeaturePipeline",
        eval_splits: tuple[str, ...] = ("train","validation"),
        results_dir: str | None = None,
        file_prefix: str | None = None,
        max_log_mb: int = 95,
        fsync: bool = True,
) -> None:
    """
    Execute a series of [features → binner → regressor] runs and log one summary row per config.

    Parameters
    ----------
    base_feature_pipeline : Pipeline
        Preprocessing steps applied before binning. This object is cloned per run to avoid state leakage.
    regressor_cls : type
        Estimator class to instantiate for the final step (e.g., GroupwiseRegressor).
    regressor_kwargs : dict
        Baseline kwargs for the regressor. Per-config overrides from `grid_config` are merged on top.
        IMPORTANT: This function will not mutate the caller's dict.
    grid_config : list of dict
        Each item should contain:
            - "binner_class": class (e.g., MultiQuantileBinner or MultiMedianBinner)
            - "binner_kwargs": dict of init args for the binner
            - "label": str label for printing/logging (optional)
            - Optional: "x_vars", "fe_vars" to override the regressor’s predictors per-config
            - Optional: anything else you want echoed into `extra_info`
    x_splits, y_splits : dict
        Dicts keyed by {"train","validation","test"} with DataFrames/Series for each split.
    log_path : str
        CSV path where each successful config appends one summary row.
    global_extra_info : dict, optional
        Extra metadata stamped into each run’s logs.
    force_run, force_overwrite : bool
        Passed through to `regressor_orchestrator`.
    base_feature_pipeline_name : str, default "BaseFeaturePipeline"
        Step name used for the features sub-pipeline.

    Returns
    -------
    None
        Prints progress and writes rows to `log_path`. Skips silently (with a message) if a config
        is already logged and `force_run=False`.

    Notes
    -----
    - We clone `base_feature_pipeline` per run to avoid cross-config state sharing.
    - If a binner provides `group_col_name` and the regressor does not specify `group_col`,
      we set the regressor’s `group_col` to match.
    - If a config provides `x_vars`/`fe_vars`, they override the baseline `regressor_kwargs`.
    """
    missing_x = [s for s in eval_splits if s not in x_splits]
    missing_y = [s for s in eval_splits if s not in y_splits]
    if missing_x or missing_y:
        raise KeyError(f"Missing splits: X{missing_x} Y{missing_y}")

    total = len(grid_config)
    for i, raw_config in enumerate(grid_config, start=1):
        config = dict(raw_config)
        binner_class = config["binner_class"]
        binner_kwargs = dict(config.get("binner_kwargs", {}))
        label = config.get("label", binner_class.__name__)

        reg_kwargs = dict(regressor_kwargs)
        if "x_vars" in config:
            reg_kwargs["x_vars"] = list(config["x_vars"])
        if "fe_vars" in config:
            reg_kwargs["fe_vars"] = list(config["fe_vars"])
        reg_kwargs["random_state"] = reg_kwargs.get("random_state", 12)

        binner_group_col = binner_kwargs.get("group_col_name")
        if binner_group_col and "group_col" not in reg_kwargs:
            reg_kwargs["group_col"] = binner_group_col

        try:
            features_step = clone(base_feature_pipeline)
        except Exception:
            features_step = base_feature_pipeline

        binner = binner_class(**binner_kwargs)
        regressor = regressor_cls(**reg_kwargs)

        full_pipeline = Pipeline([
            (base_feature_pipeline_name, features_step),
            (binner_class.__name__, binner),
            (regressor_cls.__name__, regressor),
        ])

        extra_info = {
            "binner_class": binner_class.__name__,
            "binner_params": binner_kwargs,
            "regressor_params": reg_kwargs,
            "grid_label": label,
            **(global_extra_info or {}),
        }

        rank_tag = ""
        try:
            _, rank, size = _mpi_context()
            rank_tag = f"[R{rank}/{max(size-1,0)}] "
        except Exception:
            pass
        print(f"\n{rank_tag}[GRID {i}/{total}] {label}")

        try:
            summary_df = regressor_orchestrator(
                user_pipeline=full_pipeline,
                x_splits=x_splits,
                y_splits=y_splits,
                log_csv_path=log_path,            # legacy OK
                extra_info=extra_info,
                force_run=force_run,
                force_overwrite=force_overwrite,
                random_state=reg_kwargs["random_state"],
                eval_splits=eval_splits,
                # NEW
                results_dir=results_dir,
                file_prefix=file_prefix,
                max_log_mb=max_log_mb,
                fsync=fsync,
                )
            if summary_df is not None:
                print(f"[GRID] Logged: {label}")
            else:
                print(f"[GRID] Skipped (already logged): {label}")
        except Exception as e:
            print(f"[GRID] ERROR in '{label}': {type(e).__name__}: {e}")
            continue

In [146]:
def run_grid_search_auto(
        base_feature_pipeline,
        regressor_cls,
        regressor_kwargs: dict,
        grid_config: list[dict],
        x_splits: dict,
        y_splits: dict,
        *,
        # logging/rotation knobs
        results_dir: str,
        file_prefix: str,
        max_log_mb: int = 95,
        fsync: bool = False,              # set True on HPC if you want durable writes
        # orchestration
        base_feature_pipeline_name: str = "FeatureAdditionPipeline",
        eval_splits: tuple[str, ...] = ("train","validation"),
        force_run: bool = False,
        force_overwrite: bool = False,
        distribute: str = "auto",         # "auto" | "mpi" | "single"
        dist_mode: str = "stride",        # "stride" | "chunked"
        seed: int = 12,
) -> None:
    """
    Single-node or MPI-parallel grid search runner.

    - Auto-detects MPI and splits `grid_config` across ranks.
    - Ensures per-rank deterministic RNG via `seed + rank`.
    - Uses rotating CSV logging with per-file & index locks.

    Parameters are passed straight to `run_grid_search`, except we slice `grid_config`.

    Parameters
    ----------
    base_feature_pipeline: Pipeline
        The base feature pipeline to use for each config.
    regressor_cls: Type[BaseEstimator]
        The regression model class to use.
    regressor_kwargs: dict
        Keyword arguments to pass to the regression model.
    grid_config: list[dict]
        The grid search configuration to use.
    x_splits: dict
        The input feature splits.
    y_splits: dict
        The target variable splits.
    results_dir: str
        The directory to save results.
    file_prefix: str
        The prefix for result files.
    max_log_mb: int
        The maximum log file size in MB.
    naming: PartNaming | None
        Optional naming scheme for output files.
    fsync: bool
        Whether to fsync log files (for durability).
    base_feature_pipeline_name: str
        The name of the base feature pipeline.
    eval_splits: tuple[str, ...]
        The evaluation splits to use.
    force_run: bool
        Whether to force re-running of existing configs.
    force_overwrite: bool
        Whether to force overwriting of existing results.
    distribute: str
        The distribution strategy to use.
    dist_mode: str
        The distribution mode to use.
    seed: int
        The random seed to use.

    Returns
    -------
    None
        Logs the results of the grid search.
    """
    comm, rank, size = _mpi_context()
    if distribute == "auto":
        distribute = "mpi" if size > 1 else "single"

    # Partition the configs
    local_configs = _distribute_configs(grid_config, rank=rank, size=size, mode=dist_mode) \
                    if distribute == "mpi" else grid_config
    if not local_configs:
        if rank == 0:
            print("[GRID] No configs assigned (empty grid or partition).")
        return

    # Per-rank RNG — override/augment existing random_state
    local_reg_kwargs = dict(regressor_kwargs)
    local_reg_kwargs["random_state"] = int(local_reg_kwargs.get("random_state", seed))

    if rank == 0 and distribute == "mpi":
        print(f"[MPI] size={size} → ~{len(grid_config)/max(size,1):.1f} configs per rank")
    else:
        if distribute == "mpi":
            print(f"[MPI] rank={rank}/{size-1} assigned {len(local_configs)} configs")

    run_grid_search(
        base_feature_pipeline=base_feature_pipeline,
        regressor_cls=regressor_cls,
        regressor_kwargs=local_reg_kwargs,
        grid_config=local_configs,
        x_splits=x_splits,
        y_splits=y_splits,
        log_path=None,  # legacy path unused when using rotating logs
        global_extra_info={"runner_rank": rank, "runner_size": size},
        force_run=force_run,
        force_overwrite=force_overwrite,
        base_feature_pipeline_name=base_feature_pipeline_name,
        eval_splits=eval_splits,
        results_dir=results_dir,
        file_prefix=file_prefix,
        max_log_mb=max_log_mb,
        fsync=fsync,
    )

    # Optional barrier for neat logs
    try:
        comm.Barrier()
    except Exception:
        pass
    if rank == 0:
        print("[GRID] Completed (all ranks).")


#### Parameter Grid

In [147]:
def all_nonempty_subsets(columns: list[str]) -> list[list[str]]:
    """All non-empty subsets preserving input order."""
    return [list(c) for i in range(1, len(columns) + 1) for c in combinations(columns, i)]

In [148]:
def get_fe_vars(all_cols: list[str], x_vars: list[str]) -> list[str]:
    """Complement of x_vars within all_cols."""
    xset = set(x_vars)
    return [c for c in all_cols if c not in xset]

In [149]:
def build_x_fe_combinations_disjoint(
    candidate_x_vars: list[str],
    candidate_fe_vars: list[str],
    x_var_length: int = 2,
    max_fe_len: int | None = None,
    *,
    allow_empty_fe: bool = False,
) -> list[dict[str, Any]]:
    """
    Generate all disjoint non-empty combinations of x_vars and fe_vars.

    Parameters
    ----------
    candidate_x_vars : list of str
        Columns eligible to be used as predictors (x_vars).
    candidate_fe_vars : list of str
        Columns eligible to be used as fixed effects (fe_vars).
    x_var_length : int
        Number of x_vars to include in each combination.
    max_fe_len : int | None
        Maximum number of fe_vars to include in each combination.
    allow_empty_fe : bool
        Whether to allow empty fe_vars in the combinations.

    Returns
    -------
    list of dicts
        Each dict has keys: {'x_vars': [...], 'fe_vars': [...]}
    """
    if x_var_length < 1:
        raise ValueError("x_var_length must be >= 1")
    if len(candidate_x_vars) < x_var_length:
        raise ValueError("Not enough candidate_x_vars for requested x_var_length")

    results: list[dict[str, Any]] = []

    x_subsets = [list(c) for c in combinations(candidate_x_vars, x_var_length)]
    fe_pool = [list(c) for i in range(0 if allow_empty_fe else 1, len(candidate_fe_vars) + 1)
               for c in combinations(candidate_fe_vars, i)]

    for x_vars in x_subsets:
        for fe_vars in fe_pool:
            if max_fe_len is not None and len(fe_vars) > max_fe_len:
                continue
            if set(x_vars).isdisjoint(fe_vars):
                results.append({"x_vars": x_vars, "fe_vars": list(fe_vars)})
    return results

In [150]:
def build_quantile_grid_configs(
        candidate_binning_vars: list[str],
        candidate_bin_counts: list[int],
        candidate_x_vars: list[str],
        candidate_fe_vars: list[str],
        x_var_length: int = 2,
        binner_extra_grid: dict | list[dict] | None = None,
) -> list[dict[str, Any]]:
    """
    Produce configs for MultiQuantileBinner sweeping:
      - which vars to bin on
      - how many bins
      - x/fe combinations (disjoint from binned vars)
      - optional extra binner kwargs via dict-of-lists or list-of-dicts

    Parameters
    ----------
    candidate_binning_vars : list[str]
        Variables to be binned.
    candidate_bin_counts : list[int]
        Number of bins to create for each variable.
    candidate_x_vars : list[str]
        Variables to use as predictors (x_vars).
    candidate_fe_vars : list[str]
        Variables to use as fixed effects (fe_vars).
    x_var_length : int
        Number of x_vars to include in each combination.
    binner_extra_grid : dict | list[dict] | None
        Optional extra parameters for the binner.

    Returns
    -------
    list[dict[str, Any]]
        A list of configuration dictionaries for the binner.
    """
    if not candidate_binning_vars:
        return []
    if not candidate_bin_counts:
        return []

    def _expand(grid):
        if grid is None:
            return [dict()]
        if isinstance(grid, list):
            return [dict(d) for d in grid]
        if isinstance(grid, dict):
            keys = list(grid.keys())
            vals = [list(v) if isinstance(v, (list, tuple, set)) else [v] for v in (grid[k] for k in keys)]
            return [dict(zip(keys, combo)) for combo in product(*vals)]
        raise TypeError("binner_extra_grid must be a dict or list of dicts")

    extra_list = _expand(binner_extra_grid)
    configs: list[dict[str, Any]] = []

    # compute once (perf)
    x_fe_grid = build_x_fe_combinations_disjoint(
        candidate_x_vars, candidate_fe_vars, x_var_length=x_var_length
    )

    for bin_vars in all_nonempty_subsets(candidate_binning_vars):
        bset = set(bin_vars)
        for bin_count in candidate_bin_counts:
            if int(bin_count) < 2:
                continue
            bin_spec = {v: int(bin_count) for v in bin_vars}

            for combo in x_fe_grid:
                if not set(combo["x_vars"]).isdisjoint(bset):
                    continue
                for extra in extra_list:
                    binner_kwargs = {"bin_specs": bin_spec, **extra}

                    # label suffix for clarity in logs
                    tag_bits = []
                    pol = extra.get("oob_policy")
                    if pol: tag_bits.append(f"oob{pol}")
                    rate = extra.get("max_oob_rate")
                    if rate is not None: tag_bits.append(f"rate{float(rate):g}")
                    tag = f"__{'_'.join(tag_bits)}" if tag_bits else ""

                    configs.append({
                        "binner_class": MultiQuantileBinner,
                        "binner_kwargs": binner_kwargs,
                        "label": (
                            f"qbin_{bin_count}_{'-'.join(bin_vars)}"
                            f"__x_{'-'.join(combo['x_vars'])}"
                            f"__fe_{'-'.join(combo['fe_vars'])}{tag}"
                        ),
                        "x_vars": combo["x_vars"],
                        "fe_vars": combo["fe_vars"],
                    })
    return configs

In [151]:
def build_median_binner_configs(
    candidate_binning_vars: list[str],
    candidate_x_vars: list[str],
    candidate_fe_vars: list[str],
    x_var_length: int = 2,
    max_fe_len: int | None = None,
    binner_extra_grid: dict | list[dict] | None = None,
) -> list[dict[str, Any]]:
    """
    Produce configs for MultiMedianBinner sweeping subsets of variables and x/fe combos.

    Parameters
    ----------
    candidate_binning_vars : list[str]
        Variables to be binned.
    candidate_x_vars : list[str]
        Variables to use as predictors (x_vars).
    candidate_fe_vars : list[str]
        Variables to use as fixed effects (fe_vars).
    x_var_length : int
        Number of x_vars to include in each combination.
    max_fe_len : int | None
        Maximum number of fixed effects to include in each combination.
    binner_extra_grid : dict | list[dict] | None
        Optional extra parameters for the binner.

    Returns
    -------
    list[dict[str, Any]]
        A list of configuration dictionaries for the binner.
    """
    if not candidate_binning_vars:
        return []

    def _expand(grid):
        if grid is None:
            return [dict()]
        if isinstance(grid, list):
            return [dict(d) for d in grid]
        if isinstance(grid, dict):
            keys = list(grid.keys())
            vals = [ (v if isinstance(v, (list, tuple, set)) else [v]) for v in grid.values() ]
            return [dict(zip(keys, combo)) for combo in product(*vals)]
        raise TypeError("binner_extra_grid must be a dict or list of dicts")

    extra_list = _expand(binner_extra_grid)

    configs: list[dict[str, Any]] = []
    x_fe_grid = build_x_fe_combinations_disjoint(
        candidate_x_vars, candidate_fe_vars, x_var_length=x_var_length, max_fe_len=max_fe_len
    )

    for bin_vars in all_nonempty_subsets(candidate_binning_vars):
        bset = set(bin_vars)
        for combo in x_fe_grid:
            if not set(combo["x_vars"]).isdisjoint(bset):
                continue
            for extra in extra_list:
                binner_kwargs = {
                    "variables": bin_vars,
                    "group_col_name": "median_group_id",
                    "retain_flags": True,
                    **extra,
                }
                tag_bits = []
                if "retain_flags" in extra:
                    tag_bits.append(f"rf{int(bool(extra['retain_flags']))}")
                for k, v in extra.items():
                    if k == "retain_flags":
                        continue
                    tag_bits.append(f"{k}{v}")
                tag = f"__{'_'.join(tag_bits)}" if tag_bits else ""

                configs.append({
                    "binner_class": MultiMedianBinner,
                    "binner_kwargs": binner_kwargs,
                    "label": (
                        f"median_{'-'.join(bin_vars)}"
                        f"__x_{'-'.join(combo['x_vars'])}"
                        f"__fe_{'-'.join(combo['fe_vars'])}{tag}"
                    ),
                    "x_vars": combo["x_vars"],
                    "fe_vars": combo["fe_vars"],
                })
    return configs

## Loading Data

#### Directories

In [152]:
# DIRECTORIES AND PATHS
# This is a redundant code block, but it is included as a reminder of the directory variables.
base_data_directory = "data"  # Base directory where the dataframes will be saved
hitachi_data_directory = os.path.join(base_data_directory, "hitachi_copy")  # Directory where the dataframes will be saved
marginal_emissions_development_directory = os.path.join(base_data_directory, "marginal_emissions_development")  # Directory for marginal emissions development data
marginal_emissions_results_directory = os.path.join(marginal_emissions_development_directory, "results")
marginal_emissions_logs_directory = os.path.join(marginal_emissions_development_directory, "logs")

marginal_emissions_prefix = "marginal_emissions_results"

In [153]:
print("\n" + "-" * 120)
print(f"Contents of '{hitachi_data_directory}' and subdirectories:\n" + "-" * 120)
for root, dirs, files in os.walk(hitachi_data_directory):
    for f in sorted(files):
        rel_dir = os.path.relpath(root, hitachi_data_directory)
        rel_file = os.path.join(rel_dir, f) if rel_dir != "." else f
        print(f"  - {rel_file}")


------------------------------------------------------------------------------------------------------------------------
Contents of 'data/hitachi_copy' and subdirectories:
------------------------------------------------------------------------------------------------------------------------
  - .DS_Store
  - customers_20250606_1901.parquet
  - customers_20250630_1215.parquet
  - customers_20250701_1318.parquet
  - customers_20250714_1401.parquet
  - customers_weather_mapping_20250630_1215.parquet
  - grid_readings_20250606_1901.parquet
  - grid_readings_20250630_1215.parquet
  - grid_readings_20250701_1318.parquet
  - grid_readings_20250714_1401.parquet
  - grid_readings_20250714_1401_processed.parquet
  - grid_readings_20250714_1401_processed_half_hourly.parquet
  - meter_readings_20250606_1901.parquet
  - meter_readings_20250630_1215.parquet
  - meter_readings_20250701_1318.parquet
  - weather_20250606_1901.parquet
  - weather_20250630_1215.parquet
  - weather_20250701_1318.parque

#### File Paths

In [154]:
# cleaned weather data
base_file = "weather_and_grid_data_half-hourly_20250714_1401"

train_file = "marginal_emissions_estimation_20250714_1401_train_data"
validation_file = "marginal_emissions_estimation_20250714_1401_validation_data"
test_file = "marginal_emissions_estimation_20250714_1401_test_data"

In [155]:
base_filepath = os.path.join(hitachi_data_directory, base_file + ".parquet")

train_filepath = os.path.join(marginal_emissions_development_directory, train_file + ".parquet")
validation_filepath = os.path.join(marginal_emissions_development_directory, validation_file + ".parquet")
test_filepath = os.path.join(marginal_emissions_development_directory, test_file + ".parquet")

#### Load and Look at Data

In [156]:
base_pldf = pl.read_parquet(base_filepath)

In [157]:
train_pldf = pl.read_parquet(train_filepath)
validation_pldf = pl.read_parquet(validation_filepath)
test_pldf = pl.read_parquet(test_filepath)

In [158]:
# Sample Rows of the DataFrame
print("\n" + "-" * 120)
print(f"Sample rows of prepared dataset [train_pldf]:\n" + "-" * 120)
display(train_pldf.sample(8))
display(train_pldf.schema)


------------------------------------------------------------------------------------------------------------------------
Sample rows of prepared dataset [train_pldf]:
------------------------------------------------------------------------------------------------------------------------


timestamp,city,land_latitude,land_longitude,wind_speed_mps,wind_direction_meteorological,temperature_celsius,precipitation_mm,surface_net_solar_radiation_kWh_per_m2,surface_solar_radiation_downwards_kWh_per_m2,surface_net_solar_radiation_joules_per_m2,surface_solar_radiation_downwards_joules_per_m2,total_cloud_cover,high_cloud_cover,medium_cloud_cover,low_cloud_cover,thermal_generation,gas_generation,hydro_generation,nuclear_generation,renewable_generation,total_generation,demand_met,non_renewable_generation,tons_co2,g_co2_per_kwh,tons_co2_per_mwh,wind_dir_cardinal_8,wind_dir_cardinal_16,wind_dir_cardinal_4
"datetime[μs, Asia/Kolkata]",cat,f64,f64,f32,f32,f64,f64,f64,f64,f64,f64,f32,f32,f32,f32,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,str,str,str
2021-09-27 20:00:00 IST,"""mumbai""",19.0,72.87,0.924853,212.124847,25.391083,0.073388,0.208333,0.0,0.75,0.0,0.982086,0.977661,0.377747,0.186584,116852.166667,5783.166667,30008.75,5023.333333,11357.916667,169025.333333,167794.083333,157667.416667,58231.5881,689.034097,0.689034,"""SW""","""SSW""","""S"""
2021-09-12 20:30:00 IST,"""delhi""",28.5,77.34,1.63929,50.575806,25.773087,0.211457,0.0,0.0,0.0,0.0,0.834717,0.802658,0.480942,0.050446,103866.0,3856.416667,32174.5,4873.916667,14401.583333,159172.416667,157736.083333,144770.833333,51471.20895,646.733876,0.646734,"""NE""","""NE""","""E"""
2022-10-15 17:30:00 IST,"""delhi""",28.7,76.84,1.425882,218.357635,21.027451,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,118955.75,1514.166667,26546.166667,5602.166667,9030.25,161648.5,158935.416667,152618.25,58295.9289,721.248411,0.721248,"""SW""","""SW""","""S"""
2021-05-16 23:30:00 IST,"""mumbai""",19.2,72.77,11.142207,94.925476,27.693573,0.448465,0.0,0.0,0.0,0.0,1.0,1.0,0.879959,0.598038,104849.916667,4703.0,21369.75,5115.416667,13190.5,149228.583333,147783.5,136038.083333,52141.0544,698.812579,0.698813,"""E""","""E""","""E"""
2021-01-16 09:00:00 IST,"""mumbai""",18.6,72.97,4.02385,290.266449,28.962738,0.0,343449.538203,396765.752842,1236400.0,1428400.0,0.0,0.0,0.0,0.0,129002.5,4321.0,20597.5,4572.916667,10641.416667,169135.333333,168428.083333,158493.916667,63822.243,754.698572,0.754699,"""W""","""WNW""","""W"""
2022-11-05 23:30:00 IST,"""delhi""",28.5,77.24,2.291211,110.456741,20.155869,0.0,5e-19,0.0,1.8e-12,0.0,0.018951,0.018951,0.0,0.0,118432.916667,1970.583333,16261.416667,5249.416667,4395.333333,146309.666667,144045.583333,141914.333333,58143.89825,794.814058,0.794814,"""E""","""ESE""","""E"""
2023-02-07 01:30:00 IST,"""mumbai""",18.8,72.87,3.429985,30.331482,21.22171,0.0,2.5e-19,0.0,9e-13,0.0,0.0,0.0,0.0,0.0,131601.916667,2420.416667,7165.5,5412.5,4301.666667,150902.0,148853.166667,146600.333333,64661.04755,856.997924,0.856998,"""NE""","""NNE""","""N"""
2023-02-27 23:30:00 IST,"""delhi""",28.6,77.14,1.487409,2.909271,15.831146,0.0,5e-19,0.0,1.8e-12,0.0,0.0,0.0,0.0,0.0,144964.166667,2460.25,11306.083333,4950.666667,6624.166667,170305.333333,169720.166667,163681.166667,71180.09825,835.914292,0.835914,"""N""","""N""","""N"""


Schema([('timestamp', Datetime(time_unit='us', time_zone='Asia/Kolkata')),
        ('city', Categorical(ordering='physical')),
        ('land_latitude', Float64),
        ('land_longitude', Float64),
        ('wind_speed_mps', Float32),
        ('wind_direction_meteorological', Float32),
        ('temperature_celsius', Float64),
        ('precipitation_mm', Float64),
        ('surface_net_solar_radiation_kWh_per_m2', Float64),
        ('surface_solar_radiation_downwards_kWh_per_m2', Float64),
        ('surface_net_solar_radiation_joules_per_m2', Float64),
        ('surface_solar_radiation_downwards_joules_per_m2', Float64),
        ('total_cloud_cover', Float32),
        ('high_cloud_cover', Float32),
        ('medium_cloud_cover', Float32),
        ('low_cloud_cover', Float32),
        ('thermal_generation', Float64),
        ('gas_generation', Float64),
        ('hydro_generation', Float64),
        ('nuclear_generation', Float64),
        ('renewable_generation', Float64),
        (

In [159]:
display(train_pldf.describe())

statistic,timestamp,city,land_latitude,land_longitude,wind_speed_mps,wind_direction_meteorological,temperature_celsius,precipitation_mm,surface_net_solar_radiation_kWh_per_m2,surface_solar_radiation_downwards_kWh_per_m2,surface_net_solar_radiation_joules_per_m2,surface_solar_radiation_downwards_joules_per_m2,total_cloud_cover,high_cloud_cover,medium_cloud_cover,low_cloud_cover,thermal_generation,gas_generation,hydro_generation,nuclear_generation,renewable_generation,total_generation,demand_met,non_renewable_generation,tons_co2,g_co2_per_kwh,tons_co2_per_mwh,wind_dir_cardinal_8,wind_dir_cardinal_16,wind_dir_cardinal_4
str,str,str,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,str,str,str
"""count""","""2365200""","""2365200""",2365200.0,2365200.0,2365200.0,2365200.0,2365200.0,2365200.0,2365200.0,2365200.0,2365200.0,2365200.0,2365200.0,2365200.0,2365200.0,2365200.0,2365200.0,2365200.0,2365200.0,2365200.0,2365200.0,2365200.0,2365200.0,2365200.0,2365200.0,2365200.0,2365200.0,"""2365200""","""2365200""","""2365200"""
"""null_count""","""0""","""0""",0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,"""0""","""0""","""0"""
"""mean""","""2022-07-02 11:45:00+05:30""",,25.393333,75.701111,2.6235,209.367767,25.327742,0.074796,77502.482101,93200.185481,322438.613659,386230.698,0.368437,0.268966,0.15923,0.108627,127381.266888,3495.228177,18310.196241,4896.593691,17231.281789,171305.662978,169819.805685,154083.284997,62846.579568,737.505107,0.737505,,,
"""std""",,,4.538596,1.969453,1.384926,108.197655,6.782098,0.301263,117310.82224,140789.172376,438591.739796,523835.164521,0.391733,0.383111,0.246949,0.21914,15967.372027,1293.828068,8769.339123,629.105609,12790.794035,21407.213375,21168.361496,18020.958844,7829.084075,73.445805,0.073446,,,
"""min""","""2021-01-01 00:00:00+05:30""",,18.5,72.77,0.005185,0.000122,2.149261,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,75325.666667,109.483135,3093.333333,2281.083333,0.0,106969.666667,105140.916667,95292.666667,37529.27105,501.631083,0.501631,"""E""","""E""","""E"""
"""25%""","""2021-10-01 18:00:00+05:30""",,19.2,72.97,1.686868,107.563995,21.790451,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,115672.583333,2500.333333,10473.166667,4391.166667,6528.083333,157407.0,156063.0,141618.833333,57124.82145,688.786052,0.688786,,,
"""50%""","""2022-07-02 12:00:00+05:30""",,28.5,76.94,2.39874,244.841629,26.34166,0.0,0.221963,0.277778,12837.411089,15372.208203,0.197662,0.001999,0.029449,0.0,128998.583333,3123.583333,17404.333342,4981.0,13140.416667,172473.75,171032.5,154705.25,63598.64055,736.896614,0.736897,,,
"""75%""","""2023-04-02 05:30:00+05:30""",,28.7,77.14,3.289447,303.114471,29.49292,0.00441,142379.684127,171949.472127,640638.960422,770004.528013,0.786835,0.569672,0.228638,0.100479,139876.5,4266.083333,25239.166667,5389.5,25975.833333,186182.916667,184641.25,166615.5,68917.53425,797.797138,0.797797,,,
"""max""","""2023-12-31 23:30:00+05:30""",,28.8,77.34,19.006941,359.998749,45.706146,11.963309,465916.895282,530511.99554,1677300.0,1909800.0,1.0,1.0,1.0,1.0,171241.430556,10157.0,43162.083333,6387.083333,63835.75,243275.75,241132.916667,210043.916667,84781.290592,905.09159,0.905092,"""W""","""WSW""","""W"""


In [160]:
train_pldf.filter(
    pl.col("timestamp") > datetime(2022, 4, 30, 23, tzinfo=ZoneInfo("Asia/Kolkata"))
)

timestamp,city,land_latitude,land_longitude,wind_speed_mps,wind_direction_meteorological,temperature_celsius,precipitation_mm,surface_net_solar_radiation_kWh_per_m2,surface_solar_radiation_downwards_kWh_per_m2,surface_net_solar_radiation_joules_per_m2,surface_solar_radiation_downwards_joules_per_m2,total_cloud_cover,high_cloud_cover,medium_cloud_cover,low_cloud_cover,thermal_generation,gas_generation,hydro_generation,nuclear_generation,renewable_generation,total_generation,demand_met,non_renewable_generation,tons_co2,g_co2_per_kwh,tons_co2_per_mwh,wind_dir_cardinal_8,wind_dir_cardinal_16,wind_dir_cardinal_4
"datetime[μs, Asia/Kolkata]",cat,f64,f64,f32,f32,f64,f64,f64,f64,f64,f64,f32,f32,f32,f32,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,str,str,str
2022-04-30 23:30:00 IST,"""mumbai""",18.5,72.97,0.182899,94.013245,26.655075,0.000477,0.0,0.0,0.0,0.0,0.717941,0.0,0.0,0.717941,148886.591488,4306.891188,22342.690016,4204.309016,8333.267252,188073.74896,186894.414078,179740.481707,73506.59789,781.68404,0.781684,"""E""","""E""","""E"""
2022-04-30 23:30:00 IST,"""mumbai""",19.3,72.97,1.368702,148.993393,26.589142,0.000477,0.0,0.0,0.0,0.0,0.640396,0.0,0.0,0.640396,148886.591488,4306.891188,22342.690016,4204.309016,8333.267252,188073.74896,186894.414078,179740.481707,73506.59789,781.68404,0.781684,"""SE""","""SSE""","""S"""
2022-04-30 23:30:00 IST,"""mumbai""",19.1,72.87,1.339394,187.678055,26.867416,0.000268,0.0,0.0,0.0,0.0,0.706665,0.0,0.0,0.706665,148886.591488,4306.891188,22342.690016,4204.309016,8333.267252,188073.74896,186894.414078,179740.481707,73506.59789,781.68404,0.781684,"""S""","""S""","""S"""
2022-04-30 23:30:00 IST,"""mumbai""",18.8,72.97,1.035694,175.194733,26.229767,0.000238,0.0,0.0,0.0,0.0,0.689423,0.0,0.0,0.689423,148886.591488,4306.891188,22342.690016,4204.309016,8333.267252,188073.74896,186894.414078,179740.481707,73506.59789,781.68404,0.781684,"""S""","""S""","""S"""
2022-04-30 23:30:00 IST,"""mumbai""",19.0,72.87,1.457114,201.003815,27.021255,0.000238,0.0,0.0,0.0,0.0,0.706665,0.0,0.0,0.706665,148886.591488,4306.891188,22342.690016,4204.309016,8333.267252,188073.74896,186894.414078,179740.481707,73506.59789,781.68404,0.781684,"""S""","""SSW""","""S"""
…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…
2023-12-31 23:30:00 IST,"""delhi""",28.8,76.94,1.737193,339.041443,7.635208,0.0,0.0,0.0,0.0,0.0,0.514725,0.0,0.0,0.514725,126499.75,1906.583333,4063.583333,5740.083333,10116.25,148326.25,147825.583333,138210.0,62059.65945,836.804765,0.836805,"""N""","""NNW""","""N"""
2023-12-31 23:30:00 IST,"""delhi""",28.8,77.04,1.823473,337.503754,7.538071,0.0,0.0,0.0,0.0,0.0,0.514725,0.0,0.0,0.514725,126499.75,1906.583333,4063.583333,5740.083333,10116.25,148326.25,147825.583333,138210.0,62059.65945,836.804765,0.836805,"""N""","""NNW""","""N"""
2023-12-31 23:30:00 IST,"""delhi""",28.8,77.14,1.885798,335.592499,7.494827,0.0,0.0,0.0,0.0,0.0,0.514725,0.0,0.0,0.514725,126499.75,1906.583333,4063.583333,5740.083333,10116.25,148326.25,147825.583333,138210.0,62059.65945,836.804765,0.836805,"""NW""","""NNW""","""N"""
2023-12-31 23:30:00 IST,"""delhi""",28.8,77.24,1.935602,333.553284,7.445801,0.0,0.0,0.0,0.0,0.0,0.993988,0.0,0.0,0.993988,126499.75,1906.583333,4063.583333,5740.083333,10116.25,148326.25,147825.583333,138210.0,62059.65945,836.804765,0.836805,"""NW""","""NNW""","""N"""


In [161]:
# reminder of time boundaries
print("Train Time Boundaries:")
print(f"\tStart Time: {train_pldf['timestamp'].min()}")
print(f"\tEnd Time: {train_pldf['timestamp'].max()}")
print(f"\tTotal Duration: {train_pldf['timestamp'].max() - train_pldf['timestamp'].min()}")
print("\n" + "-" * 80)

Train Time Boundaries:
	Start Time: 2021-01-01 00:00:00+05:30
	End Time: 2023-12-31 23:30:00+05:30
	Total Duration: 1094 days, 23:30:00

--------------------------------------------------------------------------------


In [162]:
# reminder of time boundaries
print("Validation Time Boundaries:")
print(f"\tStart Time: {validation_pldf['timestamp'].min()}")
print(f"\tEnd Time: {validation_pldf['timestamp'].max()}")
print(f"\tTotal Duration: {validation_pldf['timestamp'].max() - validation_pldf['timestamp'].min()}")
print("\n" + "-" * 80)

Validation Time Boundaries:
	Start Time: 2024-01-01 00:00:00+05:30
	End Time: 2024-05-31 23:30:00+05:30
	Total Duration: 151 days, 23:30:00

--------------------------------------------------------------------------------


In [163]:
# reminder of time boundaries
print("Test Time Boundaries:")
print(f"\tStart Time: {test_pldf['timestamp'].min()}")
print(f"\tEnd Time: {test_pldf['timestamp'].max()}")
print(f"\tTotal Duration: {test_pldf['timestamp'].max() - test_pldf['timestamp'].min()}")
print("\n" + "-" * 80)

Test Time Boundaries:
	Start Time: 2024-06-01 00:00:00+05:30
	End Time: 2025-05-31 23:30:00+05:30
	Total Duration: 364 days, 23:30:00

--------------------------------------------------------------------------------


## Data Processing

In [164]:
# Conversion to Pandas DataFrame for compatibility with existing code
train_df = train_pldf.to_pandas()
validation_df = validation_pldf.to_pandas()
test_df = test_pldf.to_pandas()

In [165]:
print("Columns in Train DataFrame:")
print(train_df.columns.tolist())
print("\nColumns in Validation DataFrame:")
print(validation_df.columns.tolist())
print("\nColumns in Test DataFrame:")
print(test_df.columns.tolist())

Columns in Train DataFrame:
['timestamp', 'city', 'land_latitude', 'land_longitude', 'wind_speed_mps', 'wind_direction_meteorological', 'temperature_celsius', 'precipitation_mm', 'surface_net_solar_radiation_kWh_per_m2', 'surface_solar_radiation_downwards_kWh_per_m2', 'surface_net_solar_radiation_joules_per_m2', 'surface_solar_radiation_downwards_joules_per_m2', 'total_cloud_cover', 'high_cloud_cover', 'medium_cloud_cover', 'low_cloud_cover', 'thermal_generation', 'gas_generation', 'hydro_generation', 'nuclear_generation', 'renewable_generation', 'total_generation', 'demand_met', 'non_renewable_generation', 'tons_co2', 'g_co2_per_kwh', 'tons_co2_per_mwh', 'wind_dir_cardinal_8', 'wind_dir_cardinal_16', 'wind_dir_cardinal_4']

Columns in Validation DataFrame:
['timestamp', 'city', 'land_latitude', 'land_longitude', 'wind_speed_mps', 'wind_direction_meteorological', 'temperature_celsius', 'precipitation_mm', 'surface_net_solar_radiation_kWh_per_m2', 'surface_solar_radiation_downwards_kWh_

### Original Analysis

In [166]:
# marginal_emissions_results_directory
# marginal_emissions_logs_directory

# marginal_emissions_prefix

#### Manual Construction

In [167]:
feature_addition_pipeline = Pipeline([
    ("Add_Datetime_Features", DateTimeFeatureAdder(timestamp_col="timestamp")),
    ("Add_Original_Analysis_Features", AnalysisFeatureAdder(timestamp_col="timestamp", demand_met_col="demand_met", co2_col="tons_co2")),
])
feature_addition_pipeline.name = "FeatureAdditionPipeline"
train_original_added_features_df = feature_addition_pipeline.fit_transform(train_df)

In [168]:
print(feature_addition_pipeline.name)

FeatureAdditionPipeline


In [169]:
# Columns in the training set (post feature transformation):
print("Columns in Training Set (post feature transformation):")
print(train_original_added_features_df.columns)

Columns in Training Set (post feature transformation):
Index(['timestamp', 'city', 'land_latitude', 'land_longitude',
       'wind_speed_mps', 'wind_direction_meteorological',
       'temperature_celsius', 'precipitation_mm',
       'surface_net_solar_radiation_kWh_per_m2',
       'surface_solar_radiation_downwards_kWh_per_m2',
       'surface_net_solar_radiation_joules_per_m2',
       'surface_solar_radiation_downwards_joules_per_m2', 'total_cloud_cover',
       'high_cloud_cover', 'medium_cloud_cover', 'low_cloud_cover',
       'thermal_generation', 'gas_generation', 'hydro_generation',
       'nuclear_generation', 'renewable_generation', 'total_generation',
       'demand_met', 'non_renewable_generation', 'tons_co2', 'g_co2_per_kwh',
       'tons_co2_per_mwh', 'wind_dir_cardinal_8', 'wind_dir_cardinal_16',
       'wind_dir_cardinal_4', 'year', 'month', 'week_of_year', 'day', 'hour',
       'half_hour', 'day_of_week', 'is_weekend', 'time_id', 'demand_met_sqrd',
       'log_demand_met

In [170]:
# Original Columns from the R Analysis
# y_var = "tons_co2"
# x_vars = Q : "demand_met", Q2 : "demand_met_sqrd"
# fe_vars = mo: "month", h: "hour"
# group_col = "k" (from MultiQuantileBinner on ssr : "surface_net_solar_radiation_kwh" and v2: "wind_speed"

# Dropping to only the relevant columns for reproducing MultiQuantileBinner & Regresion
x_original_relevant_columns = [
    "demand_met", "demand_met_sqrd",
    "surface_net_solar_radiation_kWh_per_m2", "wind_speed_mps",
    "month", "hour",
]
y_original_relevant_columns = ["tons_co2"]

x_original_train_added_features_df = train_original_added_features_df[x_original_relevant_columns]
y_original_train_added_features_df = train_original_added_features_df[y_original_relevant_columns]

# confirm the columns in the DataFrame
print("Columns in x_original_train_added_features_df:")
print(x_original_train_added_features_df.columns)
print("Columns in y_original_train_added_features_df:")
print(y_original_train_added_features_df.columns)

Columns in x_original_train_added_features_df:
Index(['demand_met', 'demand_met_sqrd',
       'surface_net_solar_radiation_kWh_per_m2', 'wind_speed_mps', 'month',
       'hour'],
      dtype='object')
Columns in y_original_train_added_features_df:
Index(['tons_co2'], dtype='object')


In [171]:
# Add the same features to the validation and test sets
validation_added_features_df = feature_addition_pipeline.transform(validation_df)
test_added_features_df = feature_addition_pipeline.transform(test_df)

# Split to X and y for validation and test sets
x_validation_added_features_df = validation_added_features_df[x_original_relevant_columns]
y_validation_added_features_df = validation_added_features_df[y_original_relevant_columns]
x_test_added_features_df = test_added_features_df[x_original_relevant_columns]
y_test_added_features_df = test_added_features_df[y_original_relevant_columns]


##### Multi-Quantile Binning Models

In [172]:
# Run the MultiQuantileBinner with original parameters
original_multi_binner = MultiQuantileBinner(
    bin_specs={
        "surface_net_solar_radiation_kWh_per_m2": 5,
        "wind_speed_mps": 5,
    },
    group_col_name="original_quantile_group_id"
)
# Fit the binner on the x_original_train_added_features_df
original_multi_binner.fit(x_original_train_added_features_df)

# Transform the DataFrame to get the group IDs
x_original_multi_binner_train_added_features_df = original_multi_binner.transform(x_original_train_added_features_df)

# Checking the columns in the binned DataFrame
print("Columns in x_original_multi_binner_train_added_features_df:")
print(x_original_multi_binner_train_added_features_df.columns)

Columns in x_original_multi_binner_train_added_features_df:
Index(['demand_met', 'demand_met_sqrd',
       'surface_net_solar_radiation_kWh_per_m2', 'wind_speed_mps', 'month',
       'hour', 'surface_net_solar_radiation_kWh_per_m2_group',
       'wind_speed_mps_group', 'original_quantile_group_id'],
      dtype='object')


In [173]:
# Run the GroupwiseRegressor with these original parameters
original_quantile_regressor = GroupwiseRegressor(
    y_var="tons_co2",
    x_vars=["demand_met", "demand_met_sqrd"],
    fe_vars=["month", "hour"],
    group_col="original_quantile_group_id",
    min_group_size=10,
    track_metrics=True,
    verbose=True
)
# Fit and transform the binned DataFrame
original_quantile_regressor_result_df = original_quantile_regressor.fit_transform(x_original_multi_binner_train_added_features_df, y_original_train_added_features_df)
# Checking the columns in the result DataFrame
print("Columns in result_df:")
print(original_quantile_regressor_result_df.columns)



Columns in result_df:
Index(['demand_met', 'demand_met_sqrd',
       'surface_net_solar_radiation_kWh_per_m2', 'wind_speed_mps', 'month',
       'hour', 'surface_net_solar_radiation_kWh_per_m2_group',
       'wind_speed_mps_group', 'original_quantile_group_id', 'alpha1',
       'alpha2', 'ME'],
      dtype='object')


In [174]:
# Binning the validation and test sets
x_original_multi_binner_validation_added_features_df = original_multi_binner.transform(x_validation_added_features_df)
x_original_multi_binner_test_added_features_df = original_multi_binner.transform(x_test_added_features_df)

# Run the GroupwiseRegressor on the validation set
original_quantile_regressor_validation_result_df = original_quantile_regressor.transform(pd.concat([x_original_multi_binner_validation_added_features_df, y_validation_added_features_df], axis=1))
# Run the GroupwiseRegressor on the test set
original_quantile_regressor_test_result_df = original_quantile_regressor.transform(pd.concat([x_original_multi_binner_test_added_features_df, y_test_added_features_df], axis=1))


In [175]:
original_quantile_regressor_test_result_df

Unnamed: 0,demand_met,demand_met_sqrd,surface_net_solar_radiation_kWh_per_m2,wind_speed_mps,month,hour,surface_net_solar_radiation_kWh_per_m2_group,wind_speed_mps_group,original_quantile_group_id,tons_co2,alpha1,alpha2,ME
0,231107.666667,5.341075e+10,0.0,3.454928,6,0,1,5,5,85191.05750,0.677361,-7.758257e-07,0.318763
1,231107.666667,5.341075e+10,0.0,2.886852,6,0,1,5,5,85191.05750,0.677361,-7.758257e-07,0.318763
2,231107.666667,5.341075e+10,0.0,2.309101,6,0,1,4,4,85191.05750,0.674336,-7.300676e-07,0.336888
3,231107.666667,5.341075e+10,0.0,3.021624,6,0,1,5,5,85191.05750,0.677361,-7.758257e-07,0.318763
4,231107.666667,5.341075e+10,0.0,4.143778,6,0,1,6,6,85191.05750,0.533647,-3.042182e-07,0.393033
...,...,...,...,...,...,...,...,...,...,...,...,...,...
788395,212822.750000,4.529352e+10,0.0,3.516465,5,23,1,5,5,74410.90595,0.677361,-7.758257e-07,0.347135
788396,212822.750000,4.529352e+10,0.0,3.293586,5,23,1,5,5,74410.90595,0.677361,-7.758257e-07,0.347135
788397,212822.750000,4.529352e+10,0.0,3.021289,5,23,1,5,5,74410.90595,0.677361,-7.758257e-07,0.347135
788398,212822.750000,4.529352e+10,0.0,2.742792,5,23,1,5,5,74410.90595,0.677361,-7.758257e-07,0.347135


In [176]:
original_quantile_regressor.get_metrics(True)

Unnamed: 0,original_quantile_group_id,r2,rmse,mae,mape,n_obs
0,2,0.909125,2390.234231,1833.218307,2.946539,167292
1,3,0.918085,2310.64913,1780.402866,2.873374,201501
2,4,0.914882,2322.10986,1789.443365,2.874476,196531
3,5,0.911333,2253.774303,1728.309114,2.775676,161286
4,6,0.914709,2314.663837,1793.099022,2.963679,102328
5,9,0.892524,2238.845549,1473.134534,2.292101,39175
6,10,0.91267,2022.037759,1351.058584,2.098423,52028
7,11,0.905317,1987.450534,1372.955713,2.127739,57666
8,12,0.910076,1912.321243,1373.581438,2.137938,42637
9,13,0.893904,2111.793039,1523.494217,2.377541,15941


##### Median Binning Models

In [177]:
# Run the MultiMedianBinner with original parameters
original_median_binner = MultiMedianBinner(
    variables=[
        "surface_net_solar_radiation_kWh_per_m2",
        "wind_speed_mps"
    ],
    group_col_name="median_group_id",
)
# Fit the binner on the x_original_train_added_features_df
original_median_binner.fit(x_original_train_added_features_df)

# Transform the DataFrame to get the group IDs
x_original_median_binner_train_added_features_df = original_median_binner.transform(x_original_train_added_features_df)

# Checking the columns in the binned DataFrame
print("Columns in x_original_median_binner_train_added_features_df:")
print(x_original_median_binner_train_added_features_df.columns)

Columns in x_original_median_binner_train_added_features_df:
Index(['demand_met', 'demand_met_sqrd',
       'surface_net_solar_radiation_kWh_per_m2', 'wind_speed_mps', 'month',
       'hour', 'median_group_id',
       'surface_net_solar_radiation_kWh_per_m2_group', 'wind_speed_mps_group'],
      dtype='object')


In [178]:
# Run the GroupwiseRegressor with these original parameters
original_median_regressor = GroupwiseRegressor(
    y_var="tons_co2",
    x_vars=["demand_met", "demand_met_sqrd"],
    fe_vars=["month", "hour"],
    group_col="median_group_id",
    min_group_size=10,
    track_metrics=True,
    verbose=True
)
# Fit and transform the binned DataFrame
result_df = original_median_regressor.fit_transform(x_original_median_binner_train_added_features_df, y_original_train_added_features_df)
# Checking the columns in the result DataFrame
print("Columns in result_df:")
print(result_df.columns)

Columns in result_df:
Index(['demand_met', 'demand_met_sqrd',
       'surface_net_solar_radiation_kWh_per_m2', 'wind_speed_mps', 'month',
       'hour', 'median_group_id',
       'surface_net_solar_radiation_kWh_per_m2_group', 'wind_speed_mps_group',
       'alpha1', 'alpha2', 'ME'],
      dtype='object')


In [179]:
# Binning the validation and test sets
x_original_median_binner_validation_added_features_df = original_median_binner.transform(x_validation_added_features_df)
x_original_median_binner_test_added_features_df = original_median_binner.transform(x_test_added_features_df)

# Run the GroupwiseRegressor on the validation set
original_median_regressor_validation_result_df = original_median_regressor.transform(pd.concat([x_original_median_binner_validation_added_features_df, y_validation_added_features_df], axis=1))
# Run the GroupwiseRegressor on the test set
original_median_regressor_test_result_df = original_median_regressor.transform(pd.concat([x_original_median_binner_test_added_features_df, y_test_added_features_df], axis=1))

In [180]:
original_median_regressor_test_result_df

Unnamed: 0,demand_met,demand_met_sqrd,surface_net_solar_radiation_kWh_per_m2,wind_speed_mps,month,hour,median_group_id,surface_net_solar_radiation_kWh_per_m2_group,wind_speed_mps_group,tons_co2,alpha1,alpha2,ME
0,231107.666667,5.341075e+10,0.0,3.454928,6,0,2,0,1,85191.05750,0.609027,-5.970059e-07,0.333082
1,231107.666667,5.341075e+10,0.0,2.886852,6,0,2,0,1,85191.05750,0.609027,-5.970059e-07,0.333082
2,231107.666667,5.341075e+10,0.0,2.309101,6,0,1,0,0,85191.05750,0.618650,-6.012432e-07,0.340746
3,231107.666667,5.341075e+10,0.0,3.021624,6,0,2,0,1,85191.05750,0.609027,-5.970059e-07,0.333082
4,231107.666667,5.341075e+10,0.0,4.143778,6,0,2,0,1,85191.05750,0.609027,-5.970059e-07,0.333082
...,...,...,...,...,...,...,...,...,...,...,...,...,...
788395,212822.750000,4.529352e+10,0.0,3.516465,5,23,2,0,1,74410.90595,0.609027,-5.970059e-07,0.354914
788396,212822.750000,4.529352e+10,0.0,3.293586,5,23,2,0,1,74410.90595,0.609027,-5.970059e-07,0.354914
788397,212822.750000,4.529352e+10,0.0,3.021289,5,23,2,0,1,74410.90595,0.609027,-5.970059e-07,0.354914
788398,212822.750000,4.529352e+10,0.0,2.742792,5,23,2,0,1,74410.90595,0.609027,-5.970059e-07,0.354914


In [181]:
print(original_median_regressor.get_metrics(summarise=True))

   median_group_id        r2         rmse          mae      mape   n_obs
0                1  0.905345  2357.142913  1756.956437  2.802556  671350
1                2  0.900719  2336.512436  1790.228601  2.877601  511252
2                3  0.907930  2471.083530  1887.122476  3.152542  511250
3                4  0.894911  2546.163130  1979.647324  3.290793  671348


#### Pipeline Development

In [182]:
# Feature Engineering Pipeline
feature_addition_pipeline = Pipeline([
    ("Add_Datetime_Features", DateTimeFeatureAdder(timestamp_col="timestamp")),
    ("Add_Original_Analysis_Features", AnalysisFeatureAdder(timestamp_col="timestamp", demand_met_col="demand_met", co2_col="tons_co2")),
])

In [183]:
# assuming full_pipeline = Pipeline([...,"regressor", reg])
train_pdf_x_all = train_df.drop(columns=["tons_co2"])
train_pdf_y = train_df["tons_co2"]

validation_pdf_x_all = validation_df.drop(columns=["tons_co2"])
validation_pdf_y = validation_df["tons_co2"]
test_pdf_x_all = test_df.drop(columns=["tons_co2"])
test_pdf_y = test_df["tons_co2"]


##### Quantiles

In [184]:
# BINNERS
original_multi_binner = MultiQuantileBinner(
    bin_specs={
        "surface_net_solar_radiation_kWh_per_m2": 5,
        "wind_speed_mps": 5,
    },
    group_col_name="original_quantile_group_id"
)

In [185]:
# REGRESSORS
original_multi_binner_regressor = GroupwiseRegressor(
    y_var="tons_co2",
    x_vars=["demand_met", "demand_met_sqrd"],
    fe_vars=["month", "hour"],
    group_col="original_quantile_group_id",
    min_group_size=20,
    track_metrics=True,
    verbose=True
)

In [186]:
# PIPELINES
original_multi_binner_regressor_pipeline = Pipeline([
    ("Feature_Addition", feature_addition_pipeline),
    ("Multi_Quantile_Binner", original_multi_binner),
    ("Groupwise_Regressor", original_multi_binner_regressor)
])

In [187]:
# # TESTING - note that this will run regardless - so uncomment only if you want it run
# train_logs, x_cols_used_train, _ = run_regressor_model(original_multi_binner_regressor_pipeline, train_pdf_x_all, train_pdf_y, split_name="train")
# val_logs, x_cols_used_val, _ = run_regressor_model(original_multi_binner_regressor_pipeline, validation_pdf_x_all, validation_pdf_y, split_name="validation")
# test_logs, x_cols_used_test, _ = run_regressor_model(original_multi_binner_regressor_pipeline, test_pdf_x_all, test_pdf_y, split_name="test")
# summarise_metrics_logs(train_logs=train_logs, val_logs=val_logs, test_logs=test_logs, user_pipeline=original_multi_binner_regressor_pipeline, x_columns=x_cols_used_train)

In [188]:
#ORCHESTRATORS
regressor_orchestrator(
    user_pipeline=original_multi_binner_regressor_pipeline,
    x_splits={
        "train": train_pdf_x_all,
        "validation": validation_pdf_x_all,
        "test": test_pdf_x_all
    },
    y_splits={
        "train": train_pdf_y,
        "validation": validation_pdf_y,
        "test": test_pdf_y
    },
    random_state=12,
    group_col_name="quantile_group_id",
    interval_hours=0.5,
    results_dir = marginal_emissions_logs_directory,
    file_prefix = marginal_emissions_prefix,
    force_run=True,
    compute_test=False,
    force_overwrite=True,
    fsync=False,    # set to True when running on HPC
    max_log_mb=95,
)



[LOG] 25 rows for split=train, model_id=13b053372ad4d1d6601337e6d6bd236be66de3cff09dd16909159b65bce23758, random_state=12
[LOG] 25 rows for split=validation, model_id=13b053372ad4d1d6601337e6d6bd236be66de3cff09dd16909159b65bce23758, random_state=12
[SAVE] Appended to /Users/Daniel/Desktop/IRP_WORK_UPDATED/irp-dbk24/code_and_analysis/data/marginal_emissions_development/logs/marginal_emissions_results.part000.csv, index updated.


Unnamed: 0,model_id_hash,random_state,params_json,log_time,model_name,pipeline_steps,pipeline_n_steps,x_columns,metrics_by_group,r2_train,...,mape_validation_micro,r2_validation_energy_micro,rmse_validation_energy_micro,mae_validation_energy_micro,mape_validation_energy_micro,energy_MWh_validation_total,pooled_co2_train,fd_me_train,pooled_co2_validation,fd_me_validation
0,13b053372ad4d1d6601337e6d6bd236be66de3cff09dd1...,12,"{""Feature_Addition"":""Pipeline(steps=[('Add_Dat...",2025-08-16T23:30:38.565267,GroupwiseRegressor,"[Feature_Addition, Multi_Quantile_Binner, Grou...",3,"[demand_met, demand_met_sqrd, month, hour]","{'train': {2: {'r2': 0.9091252456040023, 'rmse...",0.904949,...,3.164732,0.695585,2909.583009,2341.624912,3.175503,31652940000.0,"{""r2"": 0.9094852785665317, ""rmse"": 2355.431764...","{""pearson_r"": -0.0010488140073209547, ""spearma...","{""r2"": 0.758011743909146, ""rmse"": 2935.3213851...","{""pearson_r"": 0.01864741184605731, ""spearman_r..."


In [189]:
all_logs = load_all_logs_rotating_csv(
    results_dir=marginal_emissions_logs_directory,
    file_prefix=marginal_emissions_prefix,
)
all_logs.head(8)

Unnamed: 0,model_id_hash,random_state,params_json,log_time,model_name,pipeline_steps,pipeline_n_steps,x_columns,metrics_by_group,r2_train,...,mape_validation_micro,r2_validation_energy_micro,rmse_validation_energy_micro,mae_validation_energy_micro,mape_validation_energy_micro,energy_MWh_validation_total,pooled_co2_train,fd_me_train,pooled_co2_validation,fd_me_validation
0,690f498780e0712e252cc93bcfd4abacd65e1efdf3fd1a...,12,"{""Feature_Addition"":""Pipeline(steps=[('Add_Dat...",2025-08-12T02:37:45.646345,GroupwiseRegressor,"['Feature_Addition', 'Multi_Median_Binner', 'G...",3,"['demand_met', 'demand_met_sqrd', 'month', 'ho...","{'train': {1: {'r2': 0.7759440466267724, 'rmse...",0.777564,...,3.762266,0.651356,3643.997308,2596.004372,3.774859,31629450000.0,"{""r2"": 0.7792042909467476, ""rmse"": 3942.115886...","{""pearson_r"": -0.0013750900776175363, ""spearma...","{""r2"": 0.6674819506158596, ""rmse"": 3666.654134...","{""pearson_r"": -0.0027290646933841333, ""spearma..."
1,ff562dc468b7f0d2d7dadb624dec64eb545fdfe735279f...,12,"{""Feature_Addition"":""Pipeline(steps=[('Add_Dat...",2025-08-12T02:38:45.678555,GroupwiseRegressor,"['Feature_Addition', 'Multi_Median_Binner', 'G...",3,"['demand_met', 'demand_met_sqrd', 'month', 'ho...","{'train': {1: {'r2': 0.8271049782262871, 'rmse...",0.79403,...,3.828647,0.593359,3712.061289,2664.312118,3.842705,31629450000.0,"{""r2"": 0.8006168219666828, ""rmse"": 3746.091056...","{""pearson_r"": -0.002654555749724851, ""spearman...","{""r2"": 0.6555052060512503, ""rmse"": 3732.103382...","{""pearson_r"": -0.0019075507390846269, ""spearma..."
2,13b053372ad4d1d6601337e6d6bd236be66de3cff09dd1...,12,"{""Feature_Addition"":""Pipeline(steps=[('Add_Dat...",2025-08-16T23:30:38.565267,GroupwiseRegressor,"['Feature_Addition', 'Multi_Quantile_Binner', ...",3,"['demand_met', 'demand_met_sqrd', 'month', 'ho...","{'train': {2: {'r2': 0.9091252456040023, 'rmse...",0.904949,...,3.164732,0.695585,2909.583009,2341.624912,3.175503,31652940000.0,"{""r2"": 0.9094852785665317, ""rmse"": 2355.431764...","{""pearson_r"": -0.0010488140073209547, ""spearma...","{""r2"": 0.758011743909146, ""rmse"": 2935.3213851...","{""pearson_r"": 0.01864741184605731, ""spearman_r..."


##### Medians

In [190]:
# BINNING PIPELINES
original_median_binner = MultiMedianBinner(
    variables=[
        "surface_net_solar_radiation_kWh_per_m2",
        "wind_speed_mps"
    ],
    group_col_name="median_group_id",
)

median_binner_v1 = MultiMedianBinner(
    variables=[
        "surface_net_solar_radiation_kWh_per_m2",
        "wind_speed_mps",
        "temperature_celsius",
    ],
    group_col_name="median_group_id",
)

In [191]:
# REGRESSORS
original_median_regressor = GroupwiseRegressor(
    y_var="tons_co2",
    x_vars=["demand_met", "demand_met_sqrd"],
    fe_vars=["month", "hour"],
    group_col="median_group_id",
    min_group_size=20,
    track_metrics=True,
    verbose=True
)

median_regressor_v1 = GroupwiseRegressor(
    y_var="tons_co2",
    x_vars=["demand_met", "demand_met_sqrd"],
    fe_vars=["month", "hour", "week_of_year"],
    group_col="median_group_id",
    min_group_size=20,
    track_metrics=True,
    verbose=True
)

In [192]:
# REGRESSOR PIPELINES
original_median_regressor_pipeline = Pipeline([
    ("Feature_Addition", feature_addition_pipeline),
    ("Multi_Median_Binner", original_median_binner),
    ("Groupwise_Regressor", original_median_regressor)
])

median_regressor_pipeline_v1 = Pipeline([
    ("Feature_Addition", feature_addition_pipeline),
    ("Multi_Median_Binner", median_binner_v1),
    ("Groupwise_Regressor", median_regressor_v1)
])


In [193]:
# TESTING - note that this will run regardless - so
# train_logs, x_cols_used_train, _ = run_regressor_model(original_median_regressor_pipeline, train_pdf_x_all, train_pdf_y, split_name="train")
# val_logs, x_cols_used_val, _ = run_regressor_model(original_median_regressor_pipeline, validation_pdf_x_all, validation_pdf_y, split_name="validation")
# test_logs, x_cols_used_test, _ = run_regressor_model(original_median_regressor_pipeline, test_pdf_x_all, test_pdf_y, split_name="test")
# summarise_metrics_logs(train_logs, val_logs=val_logs, test_logs, original_median_regressor_pipeline, x_cols_used_train)

In [194]:
# ORCHESTRATORS
regressor_orchestrator(
    user_pipeline=original_median_regressor_pipeline,
    x_splits={
        "train": train_pdf_x_all,
        "validation": validation_pdf_x_all,
        "test": test_pdf_x_all
    },
    y_splits={
        "train": train_pdf_y,
        "validation": validation_pdf_y,
        "test": test_pdf_y
    },
    interval_hours=0.5,
    random_state=12,
    group_col_name="median_group_id",
    force_run=True,
    force_overwrite=True,
    compute_test=False,
    results_dir=marginal_emissions_logs_directory,
    file_prefix=marginal_emissions_prefix,
    max_log_mb=95,
    fsync=False      # set to True when running on HPC
)

regressor_orchestrator(
    user_pipeline=median_regressor_pipeline_v1,
    x_splits={
        "train": train_pdf_x_all,
        "validation": validation_pdf_x_all,
        "test": test_pdf_x_all
    },
    y_splits={
        "train": train_pdf_y,
        "validation": validation_pdf_y,
        "test": test_pdf_y
    },
    interval_hours=0.5,
    random_state=12,
    group_col_name="median_group_id",
    force_run=True,
    force_overwrite=True,
    compute_test=False,
    results_dir=marginal_emissions_logs_directory,
    file_prefix=marginal_emissions_prefix,
    max_log_mb=95,
    fsync=False      # set to True when running on HPC
)

[LOG] 4 rows for split=train, model_id=690f498780e0712e252cc93bcfd4abacd65e1efdf3fd1a2cd607095eff5f0af2, random_state=12
[LOG] 4 rows for split=validation, model_id=690f498780e0712e252cc93bcfd4abacd65e1efdf3fd1a2cd607095eff5f0af2, random_state=12
[SAVE] Appended to /Users/Daniel/Desktop/IRP_WORK_UPDATED/irp-dbk24/code_and_analysis/data/marginal_emissions_development/logs/marginal_emissions_results.part000.csv, index updated.
[LOG] 8 rows for split=train, model_id=ff562dc468b7f0d2d7dadb624dec64eb545fdfe735279f75661c855f018354f3, random_state=12
[LOG] 8 rows for split=validation, model_id=ff562dc468b7f0d2d7dadb624dec64eb545fdfe735279f75661c855f018354f3, random_state=12
[SAVE] Appended to /Users/Daniel/Desktop/IRP_WORK_UPDATED/irp-dbk24/code_and_analysis/data/marginal_emissions_development/logs/marginal_emissions_results.part000.csv, index updated.


Unnamed: 0,model_id_hash,random_state,params_json,log_time,model_name,pipeline_steps,pipeline_n_steps,x_columns,metrics_by_group,r2_train,...,mape_validation_micro,r2_validation_energy_micro,rmse_validation_energy_micro,mae_validation_energy_micro,mape_validation_energy_micro,energy_MWh_validation_total,pooled_co2_train,fd_me_train,pooled_co2_validation,fd_me_validation
0,ff562dc468b7f0d2d7dadb624dec64eb545fdfe735279f...,12,"{""Feature_Addition"":""Pipeline(steps=[('Add_Dat...",2025-08-16T23:32:20.812324,GroupwiseRegressor,"[Feature_Addition, Multi_Median_Binner, Groupw...",3,"[demand_met, demand_met_sqrd, month, hour, wee...","{'train': {1: {'r2': 0.9257890589417794, 'rmse...",0.92662,...,3.113843,0.712657,2837.511095,2325.352098,3.131266,31653130000.0,"{""r2"": 0.9255345804892087, ""rmse"": 2136.427626...","{""pearson_r"": -0.0014954966078752142, ""spearma...","{""r2"": 0.7719652792738894, ""rmse"": 2849.449093...","{""pearson_r"": 0.016084754569606532, ""spearman_..."


In [None]:
testing_output_file = os.path.join(marginal_emissions_results_directory, "original_quantile_bins_marginal_emissions_timeseries.parquet")

In [221]:
base_pldf.describe()

statistic,timestamp,city,land_latitude,land_longitude,world_latitude,world_longitude,wind_speed_mps,wind_direction_meteorological,temperature_celsius,precipitation_mm,surface_net_solar_radiation_kWh_per_m2,surface_solar_radiation_downwards_kWh_per_m2,surface_net_solar_radiation_joules_per_m2,surface_solar_radiation_downwards_joules_per_m2,total_cloud_cover,high_cloud_cover,medium_cloud_cover,low_cloud_cover,distance_between_locations_meters,processing_operations_log,thermal_generation,gas_generation,hydro_generation,nuclear_generation,renewable_generation,total_generation,demand_met,non_renewable_generation,tons_co2,g_co2_per_kwh,tons_co2_per_mwh,wind_dir_cardinal_8,wind_dir_cardinal_16,wind_dir_cardinal_4
str,str,str,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,str,str,str
"""count""","""3481920""","""3481920""",3481920.0,3481920.0,3481920.0,3481920.0,3481920.0,3481920.0,3481920.0,3481920.0,3481920.0,3481920.0,3481920.0,3481920.0,3481920.0,3481920.0,3481920.0,3481920.0,3481920.0,1740960.0,3481920.0,3481920.0,3481920.0,3481920.0,3481920.0,3481920.0,3481920.0,3481920.0,3481920.0,3481920.0,3481920.0,"""3481920""","""3481920""","""3481920"""
"""null_count""","""0""","""0""",0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1740960.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,"""0""","""0""","""0"""
"""mean""","""2023-03-17 23:45:00+05:30""",,25.393333,75.701111,25.393333,75.684448,2.63187,209.982468,25.316455,0.071264,82469.322585,98989.168512,326390.622348,390807.372846,0.359816,0.264307,0.152118,0.107959,0.036667,,133038.493588,3488.657373,17616.983083,5208.884506,19337.656218,178684.081988,177287.744297,159353.018551,65601.301985,738.359913,0.73836,,,
"""std""",,,4.538596,1.969453,4.538596,1.936544,1.382159,109.031609,6.983781,0.309506,120344.556357,144109.410049,443361.492633,529310.34544,0.392301,0.381599,0.245868,0.22307,0.004713,,17973.982837,1459.834657,8805.004409,789.721015,15436.03158,23615.234305,23535.203493,19941.80274,8839.633217,78.292944,0.078293,,,
"""min""","""2021-01-01 00:00:00+05:30""",,18.5,72.77,18.5,72.800003,0.002416,0.000122,2.149261,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.03,,75325.666667,109.483135,2516.0,2281.083333,0.0,106969.666667,105140.916667,95292.666667,37529.27105,483.357945,0.483358,"""E""","""E""","""E"""
"""25%""","""2022-02-08 00:00:00+05:30""",,19.2,72.97,19.200001,73.0,1.691567,108.210403,21.588165,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.030002,,120050.666667,2438.416667,9761.166667,4668.166667,7047.833333,162359.083333,160999.75,145527.666667,59239.9249,685.649915,0.68565,,,
"""50%""","""2023-03-18 00:00:00+05:30""",,28.5,76.94,28.5,76.900002,2.411643,246.204437,26.381775,0.0,0.307726,0.360172,12536.50589,15013.505001,0.175415,0.0,0.01825,0.0,0.039997,,134184.5,3060.5,16623.25,5214.083333,13690.583333,179752.833333,178227.333325,159159.833333,66137.34125,740.288715,0.740289,,,
"""75%""","""2024-04-23 23:30:00+05:30""",,28.7,77.14,28.700001,77.099998,3.309765,304.86676,29.635712,0.002179,157134.975143,189414.069345,649476.992699,779902.687006,0.776154,0.551407,0.21228,0.09198,0.04,,145811.583333,4216.25,24520.083333,5757.75,28750.416667,196273.0,194806.833333,173136.916667,71813.8573,802.991573,0.802992,,,
"""max""","""2025-05-31 23:30:00+05:30""",,28.8,77.34,28.799999,77.300003,19.006941,359.999786,46.374786,16.104477,465916.895282,530511.99554,1677300.0,1909800.0,1.0,1.0,1.0,1.0,0.040003,,177508.083333,12915.75,43162.083333,7070.0,75828.666667,251763.666667,249591.916667,217778.833333,88742.75695,905.09159,0.905092,"""W""","""WSW""","""W"""


In [203]:
full_df = base_pldf.to_pandas()

In [204]:
full_df_x = full_df.drop(columns=["tons_co2"])
full_df_y = full_df["tons_co2"]

In [207]:
me_out = fit_and_export_marginal_emissions_full(
    pipeline=original_multi_binner_regressor_pipeline,
    X_full=full_df_x,
    y_full=full_df_y,
    out_parquet_path=testing_output_file,
    id_cols=["timestamp", "city", "land_longitude", "land_latitude"],            # include any other IDs you care about
    include_params=True,                      # include alpha1/alpha2 for auditability
    keep_cols=["demand_met", "tons_co2"],     # optional handy columns
)



[SAVE] Wrote marginal emissions to data/marginal_emissions_development/results/testing_marginal_emissions_timeseries.parquet (rows=3,481,920)


In [208]:
print(f"The type of the [me_out] DataFrame is: {type(me_out)}")
# convert to polars dataframe
me_out_pldf = pl.DataFrame(me_out)
print(f"The type of the [me_out_pldf] DataFrame is: {type(me_out_pldf)}")

The type of the [me_out] DataFrame is: <class 'pandas.core.frame.DataFrame'>
The type of the [me_out_pldf] DataFrame is: <class 'polars.dataframe.frame.DataFrame'>


In [209]:
me_out_pldf.sample(10)

timestamp,city,land_longitude,land_latitude,ME,alpha1,alpha2,original_quantile_group_id,demand_met
"datetime[μs, Asia/Kolkata]",cat,f64,f64,f64,f64,f64,i64,f64
2024-06-11 08:00:00 IST,"""delhi""",76.84,28.4,0.281945,1.026967,-2e-06,33,202137.583333
2023-10-14 14:00:00 IST,"""delhi""",76.94,28.8,0.245039,0.798551,-1e-06,19,211772.75
2021-11-22 19:00:00 IST,"""mumbai""",72.97,19.1,0.433756,0.805138,-1e-06,3,162786.25
2023-01-27 10:30:00 IST,"""delhi""",76.94,28.5,0.291099,0.757302,-1e-06,20,202182.25
2023-06-19 19:30:00 IST,"""delhi""",77.14,28.7,0.35504,0.805138,-1e-06,3,197289.666667
2022-07-17 21:00:00 IST,"""mumbai""",72.97,18.9,0.398066,0.682504,-8.1658e-07,6,174163.833333
2021-11-06 03:00:00 IST,"""delhi""",76.84,28.8,0.509305,0.798551,-1e-06,19,110665.0
2022-09-03 16:30:00 IST,"""delhi""",77.04,28.8,0.363809,0.805138,-1e-06,3,193446.0
2021-11-22 04:30:00 IST,"""delhi""",76.84,28.4,0.566409,0.975689,-2e-06,26,114676.25
2023-05-31 19:00:00 IST,"""delhi""",76.84,28.7,0.389098,0.784999,-1e-06,2,183418.25


In [210]:
me_out_pldf.describe()

statistic,timestamp,city,land_longitude,land_latitude,ME,alpha1,alpha2,original_quantile_group_id,demand_met
str,str,str,f64,f64,f64,f64,f64,f64,f64
"""count""","""3481920""","""3481920""",3481920.0,3481920.0,3481919.0,3481919.0,3481919.0,3481920.0,3481920.0
"""null_count""","""0""","""0""",0.0,0.0,1.0,1.0,1.0,0.0,0.0
"""mean""","""2023-03-17 23:45:00+05:30""",,75.701111,25.393333,0.373358,0.887762,-1e-06,16.530667,177287.744297
"""std""",,,1.969453,4.538596,0.073292,0.112286,3.4511e-07,11.399828,23535.203493
"""min""","""2021-01-01 00:00:00+05:30""",,72.77,18.5,0.101938,0.682504,-2e-06,2.0,105140.916667
"""25%""","""2022-02-08 00:00:00+05:30""",,72.97,19.2,0.326649,0.798551,-2e-06,4.0,160999.75
"""50%""","""2023-03-18 00:00:00+05:30""",,76.94,28.5,0.373625,0.869218,-1e-06,17.0,178227.333325
"""75%""","""2024-04-23 23:30:00+05:30""",,77.14,28.7,0.423545,1.009042,-1e-06,26.0,194806.833333
"""max""","""2025-05-31 23:30:00+05:30""",,77.34,28.8,0.635273,1.06163,-8.1658e-07,34.0,249591.916667


In [212]:
display(me_out_pldf.filter(
    pl.col("ME").is_null()
))

timestamp,city,land_longitude,land_latitude,ME,alpha1,alpha2,original_quantile_group_id,demand_met
"datetime[μs, Asia/Kolkata]",cat,f64,f64,f64,f64,f64,i64,f64
2024-05-31 02:00:00 IST,"""mumbai""",72.87,19.3,,,,15,218235.916667


In [217]:
display(me_out_pldf.filter(
    pl.col("timestamp") > datetime(2024,5,31,0,0, tzinfo=ZoneInfo("Asia/Kolkata")),
    pl.col("land_longitude").round(2) == 72.87,
    pl.col("land_latitude").round(2) == 19.30
    ).head(10)
)

timestamp,city,land_longitude,land_latitude,ME,alpha1,alpha2,original_quantile_group_id,demand_met
"datetime[μs, Asia/Kolkata]",cat,f64,f64,f64,f64,f64,i64,f64
2024-05-31 00:30:00 IST,"""mumbai""",72.87,19.3,0.276165,0.784577,-1e-06,9,226588.5
2024-05-31 01:00:00 IST,"""mumbai""",72.87,19.3,0.258134,0.796921,-1e-06,16,223699.916667
2024-05-31 01:30:00 IST,"""mumbai""",72.87,19.3,0.265286,0.796921,-1e-06,16,220730.75
2024-05-31 02:00:00 IST,"""mumbai""",72.87,19.3,,,,15,218235.916667
2024-05-31 02:30:00 IST,"""mumbai""",72.87,19.3,0.215375,1.024058,-2e-06,23,216169.333333
2024-05-31 03:00:00 IST,"""mumbai""",72.87,19.3,0.222551,1.024058,-2e-06,23,214251.166667
2024-05-31 03:30:00 IST,"""mumbai""",72.87,19.3,0.22889,1.024058,-2e-06,23,212556.583333
2024-05-31 04:00:00 IST,"""mumbai""",72.87,19.3,0.235012,1.024058,-2e-06,23,210920.083333
2024-05-31 04:30:00 IST,"""mumbai""",72.87,19.3,0.241282,0.991575,-2e-06,24,210233.916667
2024-05-31 05:00:00 IST,"""mumbai""",72.87,19.3,0.256864,1.055478,-2e-06,32,210322.416667


In [98]:
group_cols = ["land_longitude", "land_latitude"]  # exact location match
time_col = "timestamp"
window_min = 30

In [222]:
group_cols = ["land_longitude", "land_latitude"]  # exact location match
time_col = "timestamp"
window_min = 30
window = pl.duration(minutes=window_min)  # <-- compare durations to durations

# Only interpolate the columns that should be continuous
cols_to_fill = ["ME", "alpha1", "alpha2",]

me_out_pldf_sorted = me_out_pldf.sort(group_cols + [time_col])

def fill_col(c: str) -> pl.Expr:
    # nearest non-null timestamps/values within each location
    prev_ts  = (
        pl.when(pl.col(c).is_not_null()).then(pl.col(time_col)).otherwise(None)
        .forward_fill()
        .over(group_cols)
    )
    next_ts  = (
        pl.when(pl.col(c).is_not_null()).then(pl.col(time_col)).otherwise(None)
        .backward_fill()
        .over(group_cols)
    )

    prev_val = pl.col(c).forward_fill().over(group_cols)
    next_val = pl.col(c).backward_fill().over(group_cols)

    # linear interpolation (cast to float for safety)
    interp   = pl.col(c).cast(pl.Float64).interpolate().over(group_cols)

    # only fill if the gap on each side is within the window
    ok_prev = ((pl.col(time_col) - prev_ts) <= window).fill_null(False)
    ok_next = ((next_ts - pl.col(time_col)) <= window).fill_null(False)

    return (
        pl.when(pl.col(c).is_null() & ok_prev & ok_next).then(interp)     # interior gap -> linear
         .when(pl.col(c).is_null() & ok_prev & ~ok_next).then(prev_val)  # edge gap -> forward-fill
         .when(pl.col(c).is_null() & ~ok_prev & ok_next).then(next_val)  # edge gap -> backward-fill
         .otherwise(pl.col(c))
         .alias(c)
    )

me_out_pldf_filled = me_out_pldf_sorted.with_columns([fill_col(c) for c in cols_to_fill])

In [223]:
me_out_pldf_filled.describe()

statistic,timestamp,city,land_longitude,land_latitude,ME,alpha1,alpha2,original_quantile_group_id,demand_met
str,str,str,f64,f64,f64,f64,f64,f64,f64
"""count""","""3481920""","""3481920""",3481920.0,3481920.0,3481920.0,3481920.0,3481920.0,3481920.0,3481920.0
"""null_count""","""0""","""0""",0.0,0.0,0.0,0.0,0.0,0.0,0.0
"""mean""","""2023-03-17 23:45:00+05:30""",,75.701111,25.393333,0.373358,0.887762,-1e-06,16.530667,177287.744297
"""std""",,,1.969453,4.538596,0.073292,0.112286,3.4511e-07,11.399828,23535.203493
"""min""","""2021-01-01 00:00:00+05:30""",,72.77,18.5,0.101938,0.682504,-2e-06,2.0,105140.916667
"""25%""","""2022-02-08 00:00:00+05:30""",,72.97,19.2,0.326648,0.798551,-2e-06,4.0,160999.75
"""50%""","""2023-03-18 00:00:00+05:30""",,76.94,28.5,0.373625,0.869218,-1e-06,17.0,178227.333325
"""75%""","""2024-04-23 23:30:00+05:30""",,77.14,28.7,0.423545,1.009042,-1e-06,26.0,194806.833333
"""max""","""2025-05-31 23:30:00+05:30""",,77.34,28.8,0.635273,1.06163,-8.1658e-07,34.0,249591.916667


In [224]:
display(me_out_pldf_filled.filter(
    pl.col("timestamp") > datetime(2024,5,31,0,0, tzinfo=ZoneInfo("Asia/Kolkata")),
    pl.col("land_longitude").round(2) == 72.87,
    pl.col("land_latitude").round(2) == 19.30
    ).head(10)
)

timestamp,city,land_longitude,land_latitude,ME,alpha1,alpha2,original_quantile_group_id,demand_met
"datetime[μs, Asia/Kolkata]",cat,f64,f64,f64,f64,f64,i64,f64
2024-05-31 00:30:00 IST,"""mumbai""",72.87,19.3,0.276165,0.784577,-1e-06,9,226588.5
2024-05-31 01:00:00 IST,"""mumbai""",72.87,19.3,0.258134,0.796921,-1e-06,16,223699.916667
2024-05-31 01:30:00 IST,"""mumbai""",72.87,19.3,0.265286,0.796921,-1e-06,16,220730.75
2024-05-31 02:00:00 IST,"""mumbai""",72.87,19.3,0.24033,0.910489,-2e-06,15,218235.916667
2024-05-31 02:30:00 IST,"""mumbai""",72.87,19.3,0.215375,1.024058,-2e-06,23,216169.333333
2024-05-31 03:00:00 IST,"""mumbai""",72.87,19.3,0.222551,1.024058,-2e-06,23,214251.166667
2024-05-31 03:30:00 IST,"""mumbai""",72.87,19.3,0.22889,1.024058,-2e-06,23,212556.583333
2024-05-31 04:00:00 IST,"""mumbai""",72.87,19.3,0.235012,1.024058,-2e-06,23,210920.083333
2024-05-31 04:30:00 IST,"""mumbai""",72.87,19.3,0.241282,0.991575,-2e-06,24,210233.916667
2024-05-31 05:00:00 IST,"""mumbai""",72.87,19.3,0.256864,1.055478,-2e-06,32,210322.416667


In [225]:
# rewrite file to include filled data
try:
    me_out_pldf_filled.write_parquet(
        testing_output_file,
        compression="snappy",
        statistics=True
    )
    print(f"Filled data written successfully to {testing_output_file}")
except Exception as e:
    print(f"Error writing filled data: {e}")


Filled data written successfully to data/marginal_emissions_development/results/original_quantile_bins_marginal_emissions_timeseries.parquet


#### Grid Search

##### Quantile Binning

In [None]:
# Setting Up configurations
multi_quantile_param_grid = build_quantile_grid_configs(
    candidate_binning_vars=["surface_net_solar_radiation_kWh_per_m2", "wind_speed_mps", "temperature_celsius", "precipitation_mm", "total_cloud_cover"],
    candidate_bin_counts=[3, 5, 10, 20, 50,100],
    candidate_x_vars=["demand_met", "demand_met_sqrd"],
    candidate_fe_vars=["month", "hour", "week_of_year", "day_of_week", "half_hour"],
    x_var_length=2,
    binner_extra_grid={"oob_policy": ["clip"], "max_oob_rate": [0.05, 0.03, None], "retain_flags": [True]}
)

regressor_kwargs_q = {
    "y_var": "tons_co2",
    "x_vars": ["demand_met", "demand_met_sqrd"],  # default; overwritten per-config anyway
    "fe_vars": ["month", "hour"],
    "group_col": "quantile_group_id",
    "min_group_size": 20,
    "track_metrics": True,
    "verbose": False,
    "random_state": 12,
}

In [None]:
run_grid_search(
    base_feature_pipeline=feature_addition_pipeline,
    regressor_cls=GroupwiseRegressor,
    regressor_kwargs=regressor_kwargs_q,
    grid_config=multi_quantile_param_grid,           # pass the WHOLE list once
    results_dir=marginal_emissions_logs_directory, # rolling shards
    file_prefix=marginal_emissions_prefix,            # e.g. "me_grid"
    global_extra_info={"model_type": "multi_binner"},
    force_run=False,
    force_overwrite=False,
    base_feature_pipeline_name="FeatureAdditionPipeline",
    eval_splits=("train","validation"),               # explicit: no test during tuning
    x_splits={"train": train_pdf_x_all, "validation": validation_pdf_x_all, "test": test_pdf_x_all},
    y_splits={"train": train_pdf_y, "validation": validation_pdf_y, "test": test_pdf_y},
    max_log_mb=95,
    fsync = False,   # set to True when running on HPC
    log_path=None,  # legacy path unused when using rotating logs
)


[R0/0] [GRID 1/17298] qbin_3_surface_net_solar_radiation_kWh_per_m2__x_demand_met-demand_met_sqrd__fe_month__oobclip_rate0.05


KeyboardInterrupt: 

##### Median Binning

In [None]:
candidate_binning_vars = ["surface_net_solar_radiation_kWh_per_m2", "wind_speed_mps", "temperature_celsius", "precipitation_mm", "total_cloud_cover"]
candidate_x_vars = ["demand_met", "demand_met_sqrd"]
candidate_fe_vars = ["month", "hour", "week_of_year", "day_of_week", "half_hour"]

multi_median_param_grid = build_median_binner_configs(
    candidate_binning_vars=candidate_binning_vars,
    candidate_x_vars=candidate_x_vars,
    candidate_fe_vars=candidate_fe_vars,
    x_var_length=2,
)
regressor_kwargs_m = {
    "y_var": "tons_co2",
    "x_vars": ["demand_met", "demand_met_sqrd"],  # default; grid entries can override
    "fe_vars": ["month", "hour"],                  # default; grid entries can override
    "group_col": "median_group_id",                # MUST match binner's group_col_name
    "min_group_size": 20,
    "track_metrics": True,
    "verbose": False,
    "random_state": 12
}

In [None]:
run_grid_search(
    base_feature_pipeline=feature_addition_pipeline,
    regressor_cls=GroupwiseRegressor,
    regressor_kwargs=regressor_kwargs_m,
    grid_config=multi_median_param_grid,           # pass the entire list
    x_splits={"train": train_pdf_x_all, "validation": validation_pdf_x_all, "test": test_pdf_x_all},
    y_splits={"train": train_pdf_y, "validation": validation_pdf_y, "test": test_pdf_y},
    results_dir=marginal_emissions_logs_directory,
    file_prefix=marginal_emissions_prefix,
    log_path=None,  # keep None when using rotating logs
    global_extra_info={"model_type": "median_binner"},
    force_run=False,
    force_overwrite=False,
    base_feature_pipeline_name="FeatureAdditionPipeline",
    eval_splits=("train", "validation"),
    max_log_mb=95,
    fsync=False,   # set True on HPC if you want durable writes
)


[R0/0] [GRID 1/961] median_surface_net_solar_radiation_kWh_per_m2__x_demand_met-demand_met_sqrd__fe_month


KeyboardInterrupt: 

#### HPC

In [None]:
# run_grid_search_auto(
#     base_feature_pipeline=feature_addition_pipeline,
#     regressor_cls=GroupwiseRegressor,
#     regressor_kwargs=regressor_kwargs_q,
#     grid_config=multi_quantile_param_grid,
#     x_splits={"train": train_pdf_x_all, "validation": validation_pdf_x_all, "test": test_pdf_x_all},
#     y_splits={"train": train_pdf_y, "validation": validation_pdf_y, "test": test_pdf_y},
#     results_dir=marginal_emissions_logs_directory,             # shared filesystem path
#     file_prefix=marginal_emissions_prefix + "_quantile",       # separate shard family
#     max_log_mb=95,
#     fsync=True,                                                # durability on HPC
#     base_feature_pipeline_name="FeatureAdditionPipeline",
#     eval_splits=("train", "validation"),                       # no test during tuning
#     distribute="auto",                                         # "mpi" if MPI present, else "single"
#     dist_mode="stride",                                        # good default
#     force_run=False,
#     force_overwrite=False,
#     seed=12,
# )


[R0/0] [GRID 1/17298] qbin_3_surface_net_solar_radiation_kWh_per_m2__x_demand_met-demand_met_sqrd__fe_month__oobclip_rate0.05


KeyboardInterrupt: 

In [None]:
# run_grid_search_auto(
#     base_feature_pipeline=feature_addition_pipeline,
#     regressor_cls=GroupwiseRegressor,
#     regressor_kwargs=regressor_kwargs_m,
#     grid_config=multi_median_param_grid,
#     x_splits={"train": train_pdf_x_all, "validation": validation_pdf_x_all, "test": test_pdf_x_all},
#     y_splits={"train": train_pdf_y, "validation": validation_pdf_y, "test": test_pdf_y},
#     results_dir=marginal_emissions_logs_directory,
#     file_prefix=marginal_emissions_prefix + "_median",
#     max_log_mb=95,
#     fsync=True,
#     base_feature_pipeline_name="FeatureAdditionPipeline",
#     eval_splits=("train", "validation"),
#     distribute="auto",
#     dist_mode="stride",
#     force_run=False,
#     force_overwrite=False,
#     seed=12,
# )


[R0/0] [GRID 1/961] median_surface_net_solar_radiation_kWh_per_m2__x_demand_met-demand_met_sqrd__fe_month


KeyboardInterrupt: 