# ðŸ’¡ Introduction

This notebook demonstrates how Dutch **Standard Load Profiles (SLPs)** can be scaled into a **quarter-hourly residential electricity demand time series** for a given spatial aggregation.

The examples use **Alkmaar (GM0361)**, but the workflow is applicable to other aggregation levels such as PC6, wijk, buurt, or municipality.

## Workflow overview

1. Load and preprocess quarter-hourly SLP fractions (ENERGIEDATA WIJZER).
2. Load annual household electricity consumption statistics (with and without PV).
3. Load dwelling-level housing data and retain residential-only PC6 areas.
4. Construct household group counts per PC6.
5. Reconstruct:
   - a residential load profile for a single PC6, and
   - an aggregated profile for the municipality.


## Environment setup

The following libraries are used:
- `pandas` for data handling,
- `matplotlib` for visualization,
- `illuminator` for compatibility with the projectâ€™s simulation framework.

`nest_asyncio` is applied to prevent event-loop conflicts when running simulations inside a Jupyter environment.


In [16]:
# Environment setup
!pip install illuminator

import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path
import nest_asyncio

nest_asyncio.apply()




## Reading and structuring Dutch Standard Load Profiles

The ENERGIEDATA WIJZER electricity SLP file is loaded and converted into a structured DataFrame:

- SLP profile names are extracted from the file header,
- quarter-hourly fractions are read from the data section,
- timestamps are parsed and set as the index.

The resulting DataFrame contains quarter-hourly SLP fractions for the full year and is used as the basis for load reconstruction.


In [17]:
def read_dutch_slp_csv(csv_path: Path) -> pd.DataFrame:
    """
    Read Dutch SLP CSV (ENERGIEDATA WIJZER) and return a DataFrame of SLP fractions.

    Returns
    -------
    pd.DataFrame
        Index: timestamps (quarter-hourly)
        Columns: SLP profile names (e.g. E1A_AZI_A, E1A_AMI_A, ...)
    """
    # 1) Read header to extract profile names
    with open(csv_path, "r", encoding="utf-8-sig") as f:
        header_line = f.readline().strip()

    parts = header_line.split(";")
    raw_names = [p for p in parts[3:] if p != ""]
    profile_names = [n.split("_", 1)[1] if "_" in n else n for n in raw_names]

    # 2) Read time series body
    df = pd.read_csv(
        csv_path,
        sep=";",
        encoding="utf-8-sig",
        decimal=".",
        skiprows=7,
        header=None
    )

    # Drop trailing empty column if present
    if df.iloc[:, -1].isna().all():
        df = df.iloc[:, :-1]

    # Assign columns and set time index
    df.columns = ["time", "from", "to"] + profile_names
    df["time"] = pd.to_datetime(df["time"], dayfirst=True, errors="coerce")
    df = df.dropna(subset=["time"]).set_index("time")
    df.index.name = "time"

    return df


In [18]:
slp_path = Path("data") / "Standaardprofielen elektriciteit 2026 versie 1.00.csv"

slp_df = read_dutch_slp_csv(slp_path)

print("Shape:", slp_df.shape)
print("Columns (first 10):", list(slp_df.columns[:10]))
slp_df.head()


Shape: (35040, 32)
Columns (first 10): ['from', 'to', 'E1A_AZI_A', 'E1A_AZI_I', 'E1A_AMI_A', 'E1A_AMI_I', 'E1B_AZI_A', 'E1B_AZI_I', 'E1B_AMI_A', 'E1B_AMI_I']


Unnamed: 0_level_0,from,to,E1A_AZI_A,E1A_AZI_I,E1A_AMI_A,E1A_AMI_I,E1B_AZI_A,E1B_AZI_I,E1B_AMI_A,E1B_AMI_I,...,E3A_A,E3A_I,E3B_A,E3B_I,E3C_A,E3C_I,E3D_A,E3D_I,E4A_A,E4A_I
time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2026-01-01 00:15:00,01-01-2026 00:00,01-01-2026 00:15,3e-05,2e-08,3.7e-05,2e-08,6.5e-05,6e-08,7.3e-05,6e-08,...,5.5e-05,2e-06,5.5e-05,2e-06,5.5e-05,2e-06,5.5e-05,2e-06,7.9e-05,2e-06
2026-01-01 00:30:00,01-01-2026 00:15,01-01-2026 00:30,3e-05,2e-08,3.6e-05,2e-08,6.4e-05,7e-08,7.2e-05,7e-08,...,5.5e-05,2e-06,5.5e-05,2e-06,5.5e-05,2e-06,5.5e-05,2e-06,7.9e-05,2e-06
2026-01-01 00:45:00,01-01-2026 00:30,01-01-2026 00:45,2.9e-05,2e-08,3.6e-05,2e-08,6.2e-05,7e-08,7e-05,7e-08,...,5.7e-05,2e-06,5.7e-05,2e-06,5.7e-05,2e-06,5.7e-05,2e-06,7.9e-05,2e-06
2026-01-01 01:00:00,01-01-2026 00:45,01-01-2026 01:00,2.8e-05,2e-08,3.5e-05,2e-08,6.1e-05,7e-08,6.9e-05,7e-08,...,5.7e-05,2e-06,5.7e-05,2e-06,5.7e-05,2e-06,5.7e-05,2e-06,7.9e-05,2e-06
2026-01-01 01:15:00,01-01-2026 01:00,01-01-2026 01:15,2.8e-05,2e-08,3.5e-05,2e-08,5.9e-05,7e-08,6.9e-05,7e-08,...,5.9e-05,2e-06,5.9e-05,2e-06,5.9e-05,2e-06,5.9e-05,2e-06,7.9e-05,2e-06


## Helper functions

Two helper functions are defined:
- one to **run a load model for a full year**,
- one to **plot a specific day** from a yearly time series.


In [19]:
def run_load_model_year(load_model, index, step_seconds=900, inputs=None, output_key="load_dem"):
    if inputs is None:
        inputs = {"load": 0}

    def to_float(x):
        if isinstance(x, (int, float)):
            return float(x)
        if isinstance(x, dict):
            if output_key in x:
                return float(x[output_key])
            if "value" in x:
                return float(x["value"])
        return float(x)

    values = []
    for i in range(len(index)):
        t = i * step_seconds
        load_model.step(time=t, inputs=inputs)
        out = load_model._model.outputs.get(output_key)
        values.append(to_float(out))

    return pd.Series(values, index=index, name=output_key)


In [20]:
def plot_day_from_year(series_year, day, ylabel="load_dem"):
    day = pd.to_datetime(day).date()
    s_day = series_year.loc[series_year.index.date == day]

    if s_day.empty:
        raise ValueError(f"No rows found for {day}")

    plt.figure(figsize=(12, 4))
    plt.plot(s_day.index, s_day.values)
    plt.title(f"{ylabel} on {day}")
    plt.ylabel(ylabel)
    plt.grid(True)
    plt.tight_layout()
    plt.show()

    return s_day


## Load model with SLP-based demand

A custom `Load` model is implemented following Illuminator conventions.
When `slp_enabled=True`, the model internally generates quarter-hourly demand
from SLP fractions scaled by annual electricity consumption.


In [21]:
from illuminator.builder import ModelConstructor

# ---------------------------------------------------------------------
# Load class (auto-chooses annual_kwh from tables)
# ---------------------------------------------------------------------
class Load(ModelConstructor):
    """
    Calculates total load demand based on number of houses and input load.

    Two modes:
    - slp_enabled=True:
        Generates per-house load internally from SLP fractions (slp_df).
        annual_kwh is chosen automatically from an annual consumption table
        if provided (PV table for pv=True, no-PV table for pv=False).
        Ignores external input 'load'. (SLP wins)

    - slp_enabled=False:
        Uses external per-house input 'load'.
        Works both:
          (a) inside Simulation (Illuminator message format via unpack_inputs)
          (b) in notebooks (plain dict like {'load': 0})
    """

    parameters = {
        "houses": 1,
        "input_type": "energy",
        "output_type": "power",

        "slp_enabled": False,
        "slp_df": None,
        "category": "E1A",
        "pv": False,
        "direction": "A",

        # annual_kwh is still kept for backward compatibility / fallback
        "annual_kwh": 2270.0,

        # NEW: filtering parameters
        "energy_label": "No Energy Label",  # e.g. "A", "B", "D/E", "A+", etc.
        "surface_area": "total",            # numeric m2 or "total"

        # NEW: lookup tables
        # Pass your PV table as annual_table_pv, and later the no-PV table as annual_table_no_pv
        "annual_table_pv": None,
        "annual_table_no_pv": None,

        "step_seconds": 900,
    }

    inputs = {"load": 0}
    outputs = {"load_dem": 0, "consumption": 0}
    states = {"time": None, "forecast": None}

    time_step_size = 1
    time = None

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)

        # Helper: prefer explicit kwargs, then _model.parameters, then default
        def get_param(name, default):
            if name in kwargs:
                return kwargs[name]
            return self._model.parameters.get(name, default)

        self.consumption = 0

        self.houses = int(get_param("houses", 1))
        self.input_type = get_param("input_type", "power")
        self.output_type = get_param("output_type", "energy")

        if self.input_type not in ["power", "energy"]:
            raise ValueError(f"Invalid input_type: {self.input_type}. Must be 'power' or 'energy'.")
        if self.output_type not in ["power", "energy"]:
            raise ValueError(f"Invalid output_type: {self.output_type}. Must be 'power' or 'energy'.")

        # SLP parameters
        self.slp_enabled = bool(get_param("slp_enabled", False))
        self.slp_df = get_param("slp_df", None)
        self.category = get_param("category", "E1A")
        self.pv = get_param("pv", False)
        self.direction = get_param("direction", "A")

        # NEW filter params
        self.energy_label = get_param("energy_label", "No Energy Label")
        self.surface_area = get_param("surface_area", "total")

        # NEW lookup tables
        self.annual_table_pv = get_param("annual_table_pv", None)
        self.annual_table_no_pv = get_param("annual_table_no_pv", None)

        # Fallback / explicit annual_kwh still supported
        self.annual_kwh = float(get_param("annual_kwh", 2270.0))

        self.step_seconds = int(get_param("step_seconds", 900))

        if self.direction not in ["A", "I"]:
            raise ValueError("direction must be 'A' or 'I'")
        if self.pv not in [True, False]:
            raise ValueError("pv must be True or False")
        if self.annual_kwh < 0:
            raise ValueError("annual_kwh must be >= 0")
        if self.step_seconds <= 0:
            raise ValueError("step_seconds must be > 0")

        # -------------------------------------------------------------
        # NEW: auto-select annual_kwh from the correct table if provided
        # -------------------------------------------------------------
        chosen_table = None
        if self.pv and self.annual_table_pv is not None:
            chosen_table = self.annual_table_pv
        if (not self.pv) and self.annual_table_no_pv is not None:
            chosen_table = self.annual_table_no_pv

        if chosen_table is not None:
            self.annual_kwh = annual_kwh_from_table(
                chosen_table,
                energy_label=self.energy_label,
                surface_area=self.surface_area,
                split_threshold=100
            )

        # Precompute SLP series if enabled
        self._slp_series_kwh_step = None
        if self.slp_enabled:
            if self.slp_df is None:
                raise ValueError("slp_enabled=True requires slp_df to be provided")

            meter_type = "AMI" if self.pv else "AZI"
            self.slp_col = f"{self.category}_{meter_type}_{self.direction}"

            if self.slp_col not in self.slp_df.columns:
                raise KeyError(f"SLP column '{self.slp_col}' not found in slp_df")

            self._slp_series_kwh_step = self.slp_df[self.slp_col] * self.annual_kwh

    def _read_plain_load_input(self, inputs):
        """
        Notebook friendly: accept inputs like {'load': 0.0} directly.
        """
        if inputs is None:
            return 0.0
        val = inputs.get("load", 0.0)
        try:
            return float(val)
        except Exception:
            return 0.0

    def step(self, time: int, inputs: dict = None, max_advance: int = 900) -> None:
        self.time = time

        if self.slp_enabled:
            step_i = int(time / self.step_seconds)
            if step_i < 0:
                step_i = 0
            if step_i >= len(self._slp_series_kwh_step):
                step_i = len(self._slp_series_kwh_step) - 1

            per_house_energy = float(self._slp_series_kwh_step.iloc[step_i])

            if self.input_type == "energy":
                load_in = per_house_energy
            else:
                deltaTime = self.step_seconds / 3600.0
                load_in = per_house_energy / deltaTime

        else:
            # If it looks like an Illuminator message structure, use unpack_inputs.
            # Otherwise, treat it as a plain notebook dict.
            use_unpack = False
            if isinstance(inputs, dict):
                v = inputs.get("load", None)
                if isinstance(v, dict):
                    use_unpack = True

            if use_unpack:
                input_data = self.unpack_inputs(inputs)
                load_in = input_data.get("load", 0)
            else:
                load_in = self._read_plain_load_input(inputs)

        results = self.demand(load=load_in)
        self.set_outputs(results)
        return time + self._model.time_step_size

    def demand(self, load: float) -> dict:
        deltaTime = self.step_seconds / 3600.0

        if self.input_type == "power":
            if self.output_type == "power":
                self.consumption = self.houses * load
            else:
                self.consumption = self.houses * load * deltaTime
        else:
            if self.output_type == "power":
                self.consumption = self.houses * load / deltaTime
            else:
                self.consumption = self.houses * load

        return {"load_dem": self.consumption, "consumption": self.consumption}


## Annual electricity consumption data

Aggregated statistics provide:
- total number of dwellings,
- PV penetration rate,
- average annual grid electricity import
  for households with and without PV.

These values are used to scale the SLPs.


Read the table that contains the consumption data for households with PV injection capability

In [22]:
pv_annual_path = Path("data") / "Annual_Energy_Consumption_WithPV.csv"

# detect delimiter
with open(pv_annual_path, "r", encoding="utf-8-sig") as f:
    first_line = f.readline()

sep = ";" if first_line.count(";") > first_line.count(",") else ","

pv_annual_df = pd.read_csv(
    pv_annual_path,
    sep=sep,
    encoding="utf-8-sig",
    decimal=".",
)

# clean column names
pv_annual_df.columns = (
    pv_annual_df.columns.astype(str).str.strip().str.replace("\ufeff", "", regex=False)
)

# clean label values (strip spaces)
pv_annual_df["Label"] = pv_annual_df["Label"].astype(str).str.strip()

pv_annual_df


Unnamed: 0,Label,Average Surface Area,Avg Energy Consumption per Energy Class,Avg Energy Consumption per Energy Class < 100 m2,Avg Energy Consumption per Energy Class > 100 m2,Label Upgrade Consumption Change
0,A+ and above,112,3364,1641,4857,
1,A,100,2548,1837,3238,32%
2,B,99,2024,1892,3561,26%
3,C,90,2111,2123,2059,-4%
4,D/E,82,1785,1690,2914,18%
5,F/G,83,3220,1668,4464,-45%
6,No Energy Label,93,3840,2324,5053,-16%


Read the table that contains the consumption data for households without PV injection capability

In [23]:
no_pv_annual_path = Path("data") / "Annual_Energy_Consumption_NoPV.csv"

# detect delimiter
with open(no_pv_annual_path, "r", encoding="utf-8-sig") as f:
    first_line = f.readline()

sep_no_pv = ";" if first_line.count(";") > first_line.count(",") else ","

no_pv_annual_df = pd.read_csv(
    no_pv_annual_path,
    sep=sep_no_pv,
    encoding="utf-8-sig",
    decimal=".",
)

# clean column names + label column
no_pv_annual_df.columns = (
    no_pv_annual_df.columns.astype(str).str.strip().str.replace("\ufeff", "", regex=False)
)
no_pv_annual_df["Label"] = no_pv_annual_df["Label"].astype(str).str.strip()

no_pv_annual_df


Unnamed: 0,Label,Average Surface Area,Avg Energy Consumption per Energy Class,Avg Energy Consumption per Energy Class < 100 m2,Avg Energy Consumption per Energy Class > 100 m2,Label Upgrade Consumption Change
0,A+ and above,67,2523,2523,4857,
1,A,71,1759,1657,2986,43%
2,B,75,1717,1658,2522,2%
3,C,74,2069,2041,2421,-17%
4,D/E,69,1976,1976,2914,5%
5,F/G,61,4443,4443,4464,-56%
6,No Energy Label,56,3840,2324,5053,16%


In [24]:
# ---------------------------------------------------------------------
# Helper functions (keep these in the same cell as the class, or above it)
# ---------------------------------------------------------------------
def _normalize_energy_label(label) -> str:
    """
    Map user inputs to the exact labels used in the annual consumption tables.
    Table labels expected: 'A+ and above', 'A', 'B', 'C', 'D/E', 'F/G', 'No Energy Label'
    """
    if label is None:
        return "No Energy Label"

    s = str(label).strip().upper()

    # Generic totals / unknowns
    if s in ["TOTAL", "ALL", "AVG", "AVERAGE"]:
        return "No Energy Label"
    if s in ["NO ENERGY LABEL", "UNKNOWN", "NAN", "NONE", "UNLABELED", "UNLABELLED"]:
        return "No Energy Label"

    # A+ group (A+, A++, A+++, etc.)
    if s.startswith("A+") or s in ["A+ AND ABOVE", "A+ AND HIGHER", "A+ ABOVE"]:
        return "A+ and above"

    # Combined groups
    if s in ["D", "E", "D/E", "DE"]:
        return "D/E"
    if s in ["F", "G", "F/G", "FG"]:
        return "F/G"

    # Single letters
    if s in ["A", "B", "C"]:
        return s

    # Fallback: try to keep original formatting
    return str(label).strip()


def annual_kwh_from_table(annual_df, energy_label, surface_area, split_threshold=100) -> float:
    """
    Pick annual kWh from a table like:
      Label | Avg Energy Consumption per Energy Class | <100 | >100

    surface_area:
      - numeric -> choose <100 or >100 column
      - 'total'/None -> choose overall column
    """
    label_key = _normalize_energy_label(energy_label)

    # Choose column based on surface area bucket
    if surface_area is None or str(surface_area).strip().lower() == "total":
        col = "Avg Energy Consumption per Energy Class"
    else:
        sa = float(surface_area)
        if sa < split_threshold:
            col = "Avg Energy Consumption per Energy Class < 100 m2"
        else:
            col = "Avg Energy Consumption per Energy Class > 100 m2"

    row = annual_df.loc[annual_df["Label"] == label_key]
    if row.empty:
        raise KeyError(
            f"Energy label '{energy_label}' normalized to '{label_key}' not found in annual consumption table."
        )

    return float(row.iloc[0][col])

## Housing microdata (PC6 level)

We load the dwelling-level housing microdata for Alkmaar (GM0361).  
This dataset is later used to:

- filter **residential-only** PC6 areas,
- define a PV proxy using `p6_kwh_productie_2023 > 0`,
- group dwellings by **PV status Ã— energy label Ã— surface area bucket**.

At this stage we only:
- read the CSV file,
- clean column-name artifacts,
- inspect the dataset shape as a basic sanity check.


In [25]:
# ------------------------------------------------------------
# Load housing microdata (Alkmaar: GM0361)
# ------------------------------------------------------------

pc6_path = Path("data") / "energiedata-match-gemeentecode=[GM0361].csv"

pc6_df = pd.read_csv(
    pc6_path,
    sep=";",
    encoding="utf-8-sig",
    engine="python",
    on_bad_lines="skip"
)

# Clean column names (strip whitespace and remove BOM artifacts)
pc6_df.columns = pc6_df.columns.astype(str).str.strip().str.replace("\ufeff", "", regex=False)

# Basic sanity checks
print("PC6 microdata loaded")
print("Rows:", len(pc6_df))
print("Columns:", len(pc6_df.columns))
print("Unique PC6 codes:", pc6_df["postcode"].astype(str).str.strip().nunique())

pc6_df.head()


PC6 microdata loaded
Rows: 64047
Columns: 39
Unique PC6 codes: 2924


Unnamed: 0,numid,pid,vid,lid,sid,postcode,straat,woonplaatsnaam,huisnummer,huisletter,...,p6_kwh_productie_2023,point,buurtcode,buurtnaam,wijkcode,wijknaam,gemeentecode,gemeentenaam,provincienaam,provinciecode
0,365200000001132,365100000000000.0,824650836992,0,0,1487ME,Groenedijk,Oost-Graftdijk,6,,...,3.0,POINT (4.810510284914175 52.5508690495687),BU03611004,Oost-Graftdijk,WK036110,Graft-De Rijp,GM0361,Alkmaar,Noord-Holland,PV27
1,361200000200310,361100000000000.0,824650838992,0,0,1813XS,Hooftstraat,Alkmaar,72,,...,0.0,POINT (4.755955949327461 52.62483854445001),BU03610300,Oud-Overdie,WK036103,Overdie,GM0361,Alkmaar,Noord-Holland,PV27
2,361200000106100,361100000000000.0,824650841040,0,0,1814DE,Van Houtenkade,Alkmaar,3,E,...,0.0,POINT (4.741202321774902 52.62487976979709),BU03610104,Emmakwartier,WK036101,Zuid,GM0361,Alkmaar,Noord-Holland,PV27
3,361200000018410,361100000000000.0,824655570080,0,0,1811MG,Kooltuin,Alkmaar,2,,...,25.0,POINT (4.752676787349159 52.631904468022405),BU03610801,Binnenstad-Oost,WK036108,Centrum,GM0361,Alkmaar,Noord-Holland,PV27
4,361200000029967,361100000000000.0,824655571760,0,0,1823EB,Rijnstraat,Alkmaar,53,,...,845.0,POINT (4.764306608550646 52.63176425740014),BU03610204,Oudorperpolder-Zuid,WK036102,Oudorp,GM0361,Alkmaar,Noord-Holland,PV27


## Residential-only filtering and household grouping

We restrict the dataset to **residential-only** PC6 areas to avoid mixing households with business connections.

**Residential-only PC6 definition (conservative):**
A PC6 is kept only if **all** entries in that PC6:
- include `woonfunctie` in `gebruiksdoelen`, and
- have a missing `sbicode` (no business activity registered).

After filtering, we create household-level features used for grouping:
- **PV proxy**: `has_pv = (p6_kwh_productie_2023 > 0)`
- **Energy label cleanup**: empty or missing labels become `"No Energy Label"`
- **Surface area bucket**: `<100 mÂ²` vs `>=100 mÂ²`

Finally, we construct a grouping table with counts per:
**PC6 Ã— PV status Ã— energy label Ã— surface area bucket**.


In [26]:
# ------------------------------------------------------------
# Step 5A: Filter residential-only PC6 areas
# ------------------------------------------------------------

pc6_df = pc6_df.copy()

pc6_df["is_residential"] = (
    pc6_df["gebruiksdoelen"].astype(str).str.contains("woonfunctie", na=False)
    & pc6_df["sbicode"].isna()
)

residential_pc6 = pc6_df.groupby("postcode")["is_residential"].all()
pc6_df = pc6_df[pc6_df["postcode"].isin(residential_pc6[residential_pc6].index)].copy()

print("After residential-only PC6 filter")
print("Rows:", len(pc6_df))
print("Unique PC6 codes:", pc6_df["postcode"].astype(str).str.strip().nunique())


# ------------------------------------------------------------
# Step 5B: Clean fields and create grouping features
# ------------------------------------------------------------

# Standardize postcode
pc6_df["postcode"] = pc6_df["postcode"].astype(str).str.strip().str.upper()

# Energy label cleanup
pc6_df["energieklasse"] = (
    pc6_df["energieklasse"]
    .astype(str)
    .str.strip()
    .replace({"": "No Energy Label", "nan": "No Energy Label", "None": "No Energy Label"})
)

# Numeric fields
pc6_df["oppervlakte"] = pd.to_numeric(pc6_df["oppervlakte"], errors="coerce")
pc6_df["p6_kwh_productie_2023"] = pd.to_numeric(pc6_df["p6_kwh_productie_2023"], errors="coerce").fillna(0.0)

# PV proxy
pc6_df["has_pv"] = pc6_df["p6_kwh_productie_2023"] > 0

# Surface area bucket (<100, >=100)
pc6_df["sa_bucket"] = pd.cut(
    pc6_df["oppervlakte"],
    bins=[-1, 100, float("inf")],
    labels=["<100", ">=100"]
)


# ------------------------------------------------------------
# Step 5C: Summary tables (optional diagnostics)
# ------------------------------------------------------------

pc6_summary = (
    pc6_df.groupby("postcode")
    .agg(
        n_dwellings=("postcode", "size"),
        pv_share=("has_pv", "mean"),
        mean_surface=("oppervlakte", "mean"),
    )
    .reset_index()
)

pc6_label_counts = (
    pc6_df.groupby(["postcode", "energieklasse"])
    .size()
    .rename("n")
    .reset_index()
)

print("PC6 summary table rows:", len(pc6_summary))
print("PC6 label counts rows:", len(pc6_label_counts))


# ------------------------------------------------------------
# Step 5D: Household grouping table used for reconstruction
# ------------------------------------------------------------

pc6_groups = (
    pc6_df
    .groupby(["postcode", "has_pv", "energieklasse", "sa_bucket"])
    .size()
    .rename("n")
    .reset_index()
)

pc6_groups.head(15)


After residential-only PC6 filter
Rows: 33622
Unique PC6 codes: 1783
PC6 summary table rows: 1783
PC6 label counts rows: 6405


Unnamed: 0,postcode,has_pv,energieklasse,sa_bucket,n
0,1483AA,False,A,<100,0
1,1483AA,False,A,>=100,0
2,1483AA,False,A+,<100,0
3,1483AA,False,A+,>=100,0
4,1483AA,False,A++,<100,0
5,1483AA,False,A++,>=100,0
6,1483AA,False,A+++,<100,0
7,1483AA,False,A+++,>=100,0
8,1483AA,False,A++++,<100,0
9,1483AA,False,A++++,>=100,0


## PC6 reconstruction: quarter-hourly residential demand

We reconstruct a residential load profile for a single PC6 by aggregating household groups.

**Grouping dimensions**
- PV proxy: `has_pv = (p6_kwh_productie_2023 > 0)`
- Energy label: `energieklasse` (missing values mapped to `"No Energy Label"`)
- Surface area bucket: `<100 mÂ²` vs `>=100 mÂ²`

**Core idea**
For each group, demand is constructed as:

`demand(t) = SLP_fraction(t, profile) Ã— annual_kWh(label, surface, PV) Ã— number_of_dwellings`

The PC6 profile is the sum over all groups.

**Output**
A quarter-hourly time series in kWh per 15 minutes for the full year.


In [27]:
def reconstruct_pc6_residential_load(
    pc6_code,
    pc6_groups,
    slp_df,
    pv_annual_df,
    no_pv_annual_df,
    index,
    sa_low=80,
    sa_high=120,
    verbose=False,
):
    """
    Reconstruct annual load profile for ONE PC6 (residential-only).
    Returns a pd.Series (quarter-hourly kWh).

    Speed improvements:
    - no per-group reindex if index matches slp_df.index
    - no pd.concat of many series, we sum incrementally
    - cache annual_kwh lookups per (has_pv, label, bucket)
    """

    df_pc6 = pc6_groups[pc6_groups["postcode"] == pc6_code].copy()
    if df_pc6.empty:
        raise ValueError(f"PC6 {pc6_code} not found")

    if verbose:
        n_total = int(df_pc6["n"].sum())
        pv_injection_detected = bool(df_pc6.loc[df_pc6["has_pv"] == True, "n"].sum() > 0)
        print(f"\nPC6: {pc6_code}")
        print(f"Total dwellings: {n_total}")
        print(f"PV injection detected in PC6 (proxy-based): {pv_injection_detected}")
        print(df_pc6[df_pc6["n"] > 0][["has_pv", "energieklasse", "sa_bucket", "n"]])

    # If you pass index=slp_df.index (you do), we can avoid reindexing entirely
    same_index = index.equals(slp_df.index)

    # Accumulator
    out = pd.Series(0.0, index=index, name="load_dem")

    # Cache annual_kwh so we donâ€™t re-lookup for identical groups
    annual_kwh_cache = {}

    for _, row in df_pc6.iterrows():
        n = int(row["n"])
        if n == 0:
            continue

        has_pv = bool(row["has_pv"])
        energy_label = row["energieklasse"]
        sa_bucket = str(row["sa_bucket"])

        surface_area = sa_low if sa_bucket == "<100" else sa_high

        cache_key = (has_pv, energy_label, sa_bucket)
        if cache_key in annual_kwh_cache:
            annual_kwh = annual_kwh_cache[cache_key]
        else:
            annual_df = pv_annual_df if has_pv else no_pv_annual_df
            annual_kwh = annual_kwh_from_table(
                annual_df,
                energy_label=energy_label,
                surface_area=surface_area,
                split_threshold=100,
            )
            annual_kwh_cache[cache_key] = annual_kwh

        meter_type = "AMI" if has_pv else "AZI"
        slp_col = f"E1A_{meter_type}_A"

        if same_index:
            out += slp_df[slp_col] * (annual_kwh * n)
        else:
            out += (slp_df[slp_col] * (annual_kwh * n)).reindex(index, fill_value=0.0)

    if float(out.sum()) == 0.0:
        raise ValueError(f"PC6 {pc6_code} has no residential load")

    return out


def describe_pc6(pc6_code, pc6_groups):
    df_pc6 = pc6_groups[pc6_groups["postcode"] == pc6_code]

    if df_pc6.empty:
        print(f"PC6 {pc6_code}: not found")
        return

    n_total = int(df_pc6["n"].sum())

    # PC6-level PV injection flag
    pv_injection_pc6 = df_pc6.loc[df_pc6["has_pv"], "n"].sum() > 0

    print(f"\nPC6: {pc6_code}")
    print(f"Total dwellings: {n_total}")
    print(f"PV injection present in PC6: {pv_injection_pc6}")

    print("\nDwelling composition (energy label Ã— surface bucket):")
    for _, row in df_pc6.iterrows():
        if row["n"] > 0:
            print(
                f"  - {row['energieklasse']}, {row['sa_bucket']} mÂ²: {int(row['n'])} dwellings"
            )


In [28]:
# ------------------------------------------------------------
# Example: reconstruct one PC6
# ------------------------------------------------------------

pc6_id = "1483AA"

describe_pc6(pc6_id, pc6_groups)

pc6_series = reconstruct_pc6_residential_load(
    pc6_code=pc6_id,
    pc6_groups=pc6_groups,
    slp_df=slp_df,
    pv_annual_df=pv_annual_df,
    no_pv_annual_df=no_pv_annual_df,
    index=slp_df.index,
    verbose=False,
)

print("Annual kWh (model):", pc6_series.sum())



PC6: 1483AA
Total dwellings: 9
PV injection present in PC6: True

Dwelling composition (energy label Ã— surface bucket):
  - A, >=100 mÂ²: 1 dwellings
  - B, >=100 mÂ²: 2 dwellings
  - C, >=100 mÂ²: 1 dwellings
  - G, >=100 mÂ²: 1 dwellings
  - No Energy Label, <100 mÂ²: 1 dwellings
  - No Energy Label, >=100 mÂ²: 3 dwellings
Annual kWh (model): 34365.96288472


## Municipality reconstruction (GM code)

To obtain a municipality-wide residential demand profile, we:

1. identify all residential-only PC6 codes that belong to the municipality (`gemeentecode`),
2. reconstruct each PC6 profile using the same grouping approach as in Step 6,
3. sum all PC6 profiles into a single municipality time series.

**Outputs**
- `gemeente_series`: quarter-hourly kWh per 15 minutes (full year)
- `n_dwellings_total`: total dwellings included in the reconstruction
- `annual_kwh_total`: annual electricity demand implied by the model (kWh)


In [29]:
# ------------------------------------------------------------
# PC6 â†’ municipality mapping (based on filtered residential dataset)
# ------------------------------------------------------------

pc6_to_gemeente = (
    pc6_df[["postcode", "gemeentecode"]]
    .dropna()
    .drop_duplicates("postcode")
    .set_index("postcode")["gemeentecode"]
    .to_dict()
)


def reconstruct_gemeente_residential_load(
    gemeentecode,
    pc6_groups,
    pc6_to_gemeente,
    slp_df,
    pv_annual_df,
    no_pv_annual_df,
    index,
):
    """
    Aggregate residential load profile for one municipality (gemeentecode).

    Speed improvements:
    - sum incrementally (no pd.concat over many PC6 series)
    """
    pc6_list = [pc6 for pc6, gm in pc6_to_gemeente.items() if gm == gemeentecode]

    out = pd.Series(0.0, index=index, name="load_dem")
    n_dwellings_total = 0

    for pc6 in pc6_list:
        n_pc6 = int(pc6_groups.loc[pc6_groups["postcode"] == pc6, "n"].sum())
        n_dwellings_total += n_pc6

        s = reconstruct_pc6_residential_load(
            pc6_code=pc6,
            pc6_groups=pc6_groups,
            slp_df=slp_df,
            pv_annual_df=pv_annual_df,
            no_pv_annual_df=no_pv_annual_df,
            index=index,
            verbose=False,
        )

        out += s

    annual_kwh_total = float(out.sum())
    return out, n_dwellings_total, annual_kwh_total


In [30]:
# ------------------------------------------------------------
# Example: municipality reconstruction (Alkmaar GM0361)
# ------------------------------------------------------------

alkmaar_series, alkmaar_n, alkmaar_kwh = reconstruct_gemeente_residential_load(
    gemeentecode="GM0361",
    pc6_groups=pc6_groups,
    pc6_to_gemeente=pc6_to_gemeente,
    slp_df=slp_df,
    pv_annual_df=pv_annual_df,
    no_pv_annual_df=no_pv_annual_df,
    index=slp_df.index,
)

print("Municipality: GM0361 (Alkmaar)")
print("Residential dwellings included:", alkmaar_n)
print("Residential annual kWh (model):", int(alkmaar_kwh))
print("Annual kWh per dwelling (model):", int(alkmaar_kwh / alkmaar_n) if alkmaar_n > 0 else None)

plot_day_from_year(alkmaar_series, "2026-06-15", ylabel="Alkmaar residential kWh per 15 min")


KeyboardInterrupt: 