## Environment setup

I start by importing the full set of libraries and internal project modules needed for the end-to-end workflow: data acquisition, cleaning, PostgreSQL I/O, panel regression, and reporting. Centralizing imports up front keeps the pipeline reproducible and makes each downstream cell single-purpose.


In [1]:
from __future__ import annotations
import io
import os
import json
import uuid
import math
import logging
from pathlib import Path    
import time
from datetime import datetime
import warnings
from dataclasses import dataclass
from datetime import datetime, timezone
import subprocess
import re
import numpy as np
import pandas as pd
import csv
from dotenv import load_dotenv

import hashlib
from catboost import CatBoostRegressor
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import HistGradientBoostingRegressor
import matplotlib.pyplot as plt
from scipy import stats
from statsmodels.graphics.tsaplots import plot_acf

from linearmodels import PanelOLS
import sqlalchemy as sa
from sqlalchemy import text
import psycopg
from psycopg.rows import dict_row

from reportlab.lib.pagesizes import LETTER
from reportlab.lib.units import inch
from reportlab.lib import colors
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.platypus import (
    SimpleDocTemplate, Paragraph, Spacer, PageBreak, Image,
    Table, TableStyle, Preformatted, KeepTogether
)
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont

from dc_prices.config import ProjectPaths, load_settings
from dc_prices.http import make_cached_session
from dc_prices.eia import fetch_eia_v2_all_pages
from dc_prices.fractracker import load_fractracker_csv
from dc_prices.qa import qa_overview, qa_column_profile, assert_required_columns, check_state_codes
from dc_prices.eda import write_profile_report


warnings.filterwarnings("ignore")
pd.set_option("display.max_columns", 200)
pd.set_option("display.width", 140)

RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)


## Project configuration

I initialize consistent project paths (raw/interim/outputs/reports) and load runtime settings (e.g., API paging and timeouts) from a single configuration layer. I also set up a cached HTTP session so repeated API pulls are deterministic and less likely to trigger rate/timeout issues.

In [2]:
# Cell: Logging + project paths
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)
logger = logging.getLogger("capstone")

PROJECT_ROOT = Path.cwd().parent 

# Define directories relative to the true Project Root
SQL_DIR = PROJECT_ROOT / "sql"
REPORTS_DIR = PROJECT_ROOT / "reports"
FIG_DIR = REPORTS_DIR / "figures"
TABLE_DIR = REPORTS_DIR / "tables"
DATA_RAW = PROJECT_ROOT / "data" / "raw"
FRAC_PATH = DATA_RAW / "Data_Centers_Database - FracTracker Data Centers.csv"

REPORTS_DIR.mkdir(parents=True, exist_ok=True)
FIG_DIR.mkdir(parents=True, exist_ok=True)
TABLE_DIR.mkdir(parents=True, exist_ok=True)

logger.info(f"PROJECT_ROOT = {PROJECT_ROOT}")
logger.info(f"SQL_DIR       = {SQL_DIR}")

REPO_ROOT = Path.cwd().parents[0]
paths = ProjectPaths.from_repo_root(REPO_ROOT)
paths.data_interim.mkdir(parents=True, exist_ok=True)
paths.reports_qa.mkdir(parents=True, exist_ok=True)
paths.reports_eda.mkdir(parents=True, exist_ok=True)

settings = load_settings()
session = make_cached_session(cache_name=str(paths.data_interim / "requests_cache"))


2026-01-10 17:17:51,854 | INFO | PROJECT_ROOT = C:\Users\amand\OneDrive\Documents\School\Assignments\WGU\Data-Engineering\Capstone\datacenter-electricity-prices-us
2026-01-10 17:17:51,860 | INFO | SQL_DIR       = C:\Users\amand\OneDrive\Documents\School\Assignments\WGU\Data-Engineering\Capstone\datacenter-electricity-prices-us\sql


## Database connectivity

I build the PostgreSQL connection using environment variables so credentials never live in the notebook. I then validate connectivity and define lightweight SQL helpers that let me execute small verification queries as I move from raw ingestion to curated marts.

In [3]:
PROJECT_ROOT = Path.cwd()
load_dotenv(PROJECT_ROOT / ".env", override=True)

# Build URL programmatically (avoids URL-encoding issues)


url = sa.URL.create(
    "postgresql+psycopg",
    username=os.environ["PGUSER"],
    password=os.environ["PGPASSWORD"],
    host=os.environ.get("PGHOST", "localhost"),
    port=int(os.environ.get("PGPORT", "5432")),
    database=os.environ["PGDATABASE"],
)
print(url)
engine = sa.create_engine(url, pool_pre_ping=True, future=True)

# Quick connectivity test
with engine.connect() as conn:
    v = conn.execute(text("select version();")).scalar_one()
print(v)

def sql_exec(stmt: str, params: dict | None = None):
    with engine.begin() as conn:
        conn.execute(text(stmt), params or {})


postgresql+psycopg://postgres:***@localhost:5432/dc_capstone
PostgreSQL 18.1 on x86_64-windows, compiled by msvc-19.44.35221, 64-bit


## Database schema bootstrap

I initialize the database objects from my SQL DDL (schemas/tables/indexes) to ensure the database is the system of record for the pipeline. I intentionally execute the DDL programmatically so the environment can be rebuilt consistently from scratch when needed.

In [4]:
# =========================
# Bootstrap: ALWAYS run DDL (sql/ddl) near the top of the notebook
# =========================
assert "engine" in globals(), "Define your SQLAlchemy `engine` before running DDL bootstrap."
assert "SQL_DIR" in globals(), "Define `SQL_DIR` (e.g., PROJECT_ROOT / 'sql') before running DDL bootstrap."

SQL_DDL_DIR = Path(SQL_DIR) / "ddl"

DDL_SQL_FILES = [
    "00_schemas.sql",
    "01_raw_tables.sql",
    "02_stage_tables.sql",
    "03_mart_dimensions.sql",
    "04_mart_facts.sql",
    "05_mart_analysis.sql",
]

_BEGIN = re.compile(r"(?im)^\s*BEGIN;\s*$")
_COMMIT = re.compile(r"(?im)^\s*COMMIT;\s*$")
_PCT = re.compile(r"%(?![sbt%])", flags=re.IGNORECASE)  # escape stray % for psycopg

def _split_sql_statements(sql: str) -> list[str]:
    """
    Split SQL on semicolons while respecting:
      - single-quoted strings
      - dollar-quoted blocks ($$...$$ or $tag$...$tag$)
      - -- line comments
      - /* block comments */
    """
    stmts, buf = [], []
    i, n = 0, len(sql)

    in_sq = False
    in_line_comment = False
    in_block_comment = False
    dollar_tag = None

    def startswith_at(s: str, pos: int) -> bool:
        return sql.startswith(s, pos)

    while i < n:
        ch = sql[i]

        # line comments
        if in_line_comment:
            buf.append(ch)
            if ch == "\n":
                in_line_comment = False
            i += 1
            continue

        # block comments
        if in_block_comment:
            buf.append(ch)
            if startswith_at("*/", i):
                buf.append("*")
                i += 2
                in_block_comment = False
            else:
                i += 1
            continue
        if not in_sq and dollar_tag is None:
            if startswith_at("--", i):
                buf.append("--")
                i += 2
                in_line_comment = True
                continue
            if startswith_at("/*", i):
                buf.append("/*")
                i += 2
                in_block_comment = True
                continue

        # dollar-quote start/end (only when not in single quotes)
        if not in_sq:
            if dollar_tag is None and ch == "$":
                j = i + 1
                while j < n and sql[j] != "$" and (sql[j].isalnum() or sql[j] == "_"):
                    j += 1
                if j < n and sql[j] == "$":
                    dollar_tag = sql[i : j + 1]  # e.g., $$ or $func$
                    buf.append(dollar_tag)
                    i = j + 1
                    continue
            elif dollar_tag is not None and startswith_at(dollar_tag, i):
                buf.append(dollar_tag)
                i += len(dollar_tag)
                dollar_tag = None
                continue

        # single quotes
        if dollar_tag is None and ch == "'":
            buf.append(ch)
            if in_sq:
                # escaped ''
                if i + 1 < n and sql[i + 1] == "'":
                    buf.append("'")
                    i += 2
                    continue
                in_sq = False
            else:
                in_sq = True
            i += 1
            continue

        # statement terminator
        if ch == ";" and (not in_sq) and (dollar_tag is None):
            stmt = "".join(buf).strip()
            if stmt:
                stmts.append(stmt)
            buf = []
            i += 1
            continue

        buf.append(ch)
        i += 1

    tail = "".join(buf).strip()
    if tail:
        stmts.append(tail)

    return stmts


def run_sql_file_bootstrap(path: Path) -> None:
    if not path.exists():
        raise FileNotFoundError(f"Missing SQL file: {path.resolve()}")

    sql_text = path.read_text(encoding="utf-8-sig")  # strips BOM if present
    sql_text = _BEGIN.sub("", sql_text)
    sql_text = _COMMIT.sub("", sql_text)
    sql_text = _PCT.sub("%%", sql_text)

    statements = _split_sql_statements(sql_text)

    print(f"Running DDL: {path.name}  (statements: {len(statements)})")
    with engine.begin() as conn:
        conn.exec_driver_sql("SET search_path TO public;")

        for k, stmt in enumerate(statements, start=1):
            s = stmt.strip()
            if not s or s.startswith("\\"):  # ignore psql meta-commands if any
                continue
            try:
                conn.exec_driver_sql(stmt)
            except Exception as e:
                preview = (stmt[:500] + " ...") if len(stmt) > 500 else stmt
                raise RuntimeError(
                    f"DDL failed in {path.name} (statement {k}/{len(statements)}):\n{preview}"
                ) from e


# Run DDL scripts in a deterministic order
for fname in DDL_SQL_FILES:
    run_sql_file_bootstrap(SQL_DDL_DIR / fname)

print("✅ DDL bootstrap complete (schemas + raw/stage/mart objects ensured).")


Running DDL: 00_schemas.sql  (statements: 3)
Running DDL: 01_raw_tables.sql  (statements: 9)
Running DDL: 02_stage_tables.sql  (statements: 8)
Running DDL: 03_mart_dimensions.sql  (statements: 4)
Running DDL: 04_mart_facts.sql  (statements: 5)
Running DDL: 05_mart_analysis.sql  (statements: 2)
✅ DDL bootstrap complete (schemas + raw/stage/mart objects ensured).


## Ingest data center facilities (FracTracker)

I load the FracTracker data center facilities CSV into a raw DataFrame and validate that the structural keys I depend on (facility ID, state, coordinates, status) are present. This is my facility-level source that will later be aggregated into state-level data center load intensity.

In [5]:
fractracker_csv = paths.data_raw / "Data_Centers_Database - FracTracker Data Centers.csv"
FRAC_PATH = fractracker_csv
df_dc = load_fractracker_csv(fractracker_csv)

# Basic structural expectations
required_dc_cols = [
    "facility_id", "name", "state", "status", "county", "lat", "long"
]
assert_required_columns(df_dc, required_dc_cols, "FracTracker Data Centers")

overview_dc = qa_overview(df_dc, "fractracker_datacenters")
overview_dc


{'dataset': 'fractracker_datacenters',
 'rows': 1260,
 'cols': 40,
 'duplicate_rows': 0,
 'missing_cells': 25840,
 'missing_pct': 0.5126984126984127}

## Facility profiling

I generate a column-level profile and explicitly inspect missingness for the MW- and size-related fields that drive my exposure measures. This gives me an evidence-based view of where imputation is required and which features are available to support it.

In [6]:
dc_col_profile = qa_column_profile(df_dc)
dc_col_profile.head(20)
df_dc["status"].value_counts(dropna=False).head(25)
for col in ["mw_high", "facility_size_sq_ft", "sizerank", "sizerank_numeric"]:
    if col in df_dc.columns:
        print(col, "missing %:", float(df_dc[col].isna().mean()))


mw_high missing %: 0.6801587301587302
facility_size_sq_ft missing %: 0.3761904761904762
sizerank missing %: 0.0
sizerank_numeric missing %: 0.002380952380952381


## Persist raw facilities + QA evidence

I save a raw parquet snapshot of the facility dataset and write QA summaries to disk. These artifacts make the pipeline auditable: I can prove what I ingested and what the data quality looked like before any transformations.

In [7]:
(df_dc
 .to_parquet(paths.data_interim / "fractracker_datacenters_raw.parquet", index=False))

dc_col_profile.to_csv(paths.reports_qa / "fractracker_column_profile.csv")

with open(paths.reports_qa / "fractracker_overview.json", "w", encoding="utf-8") as f:
    json.dump(overview_dc, f, indent=2)


## Extract retail electricity outcomes (EIA API)

I pull monthly retail sales metrics from the EIA API for the full state list and the project date window. This dataset provides my primary outcome measure—residential price (cents/kWh)—along with sales, revenue, and customer counts at a consistent reporting grain.

In [8]:
EIA_RETAIL_URL = "https://api.eia.gov/v2/electricity/retail-sales/data/"

retail_params = {
    "frequency": "monthly",
    "data[0]": "customers",
    "data[1]": "price",
    "data[2]": "revenue",
    "data[3]": "sales",
    "start": "2023-01",
    "end": "2025-10",
    "sort[0][column]": "period",
    "sort[0][direction]": "asc",  
    "facets[stateid][]": ["AK","AL","AR","AZ","CA","CO","CT","DC","DE","FL","GA","HI","IA","ID","IL","IN","KS","KY","LA",
                          "MA","MD","ME","MI","MN","MO","MS","MT","NC","ND","NE","NH","NJ","NM","NV","NY","OH","OK","OR",
                          "PA","RI","SC","SD","TN","TX","UT","VA","VT","WA","WI","WV","WY"],
}

retail_res = fetch_eia_v2_all_pages(
    session=session,
    base_url=EIA_RETAIL_URL,
    base_params=retail_params,
    api_key=settings.eia_api_key,
    page_size=settings.eia_page_size,
    timeout_s=settings.eia_timeout_s,
)

df_retail = retail_res.df
{"rows": len(df_retail), "total_reported": retail_res.total, "cols": df_retail.shape[1]}


{'rows': 10404, 'total_reported': 10404, 'cols': 13}

## Retail extract sanity check

In [9]:
df_retail.head(20)

Unnamed: 0,period,stateid,stateDescription,sectorid,sectorName,customers,price,revenue,sales,customers-units,price-units,revenue-units,sales-units
0,2023-01,AK,Alaska,ALL,all sectors,352907.0,20.28,119.22997,587.91571,number of customers,cents per kilowatt-hour,million dollars,million kilowatt hours
1,2023-01,AK,Alaska,COM,commercial,56637.0,20.22,48.06171,237.6831,number of customers,cents per kilowatt-hour,million dollars,million kilowatt hours
2,2023-01,AK,Alaska,IND,industrial,1119.0,17.61,21.08354,119.73471,number of customers,cents per kilowatt-hour,million dollars,million kilowatt hours
3,2023-01,AK,Alaska,OTH,other,,,,,number of customers,cents per kilowatt-hour,million dollars,million kilowatt hours
4,2023-01,AK,Alaska,RES,residential,295151.0,21.73,50.08472,230.4979,number of customers,cents per kilowatt-hour,million dollars,million kilowatt hours
5,2023-01,AK,Alaska,TRA,transportation,0.0,0.0,0.0,0.0,number of customers,cents per kilowatt-hour,million dollars,million kilowatt hours
6,2023-01,AL,Alabama,ALL,all sectors,2729635.0,11.4,803.02308,7043.38231,number of customers,cents per kilowatt-hour,million dollars,million kilowatt hours
7,2023-01,AL,Alabama,COM,commercial,381998.0,13.27,227.42971,1713.24371,number of customers,cents per kilowatt-hour,million dollars,million kilowatt hours
8,2023-01,AL,Alabama,IND,industrial,7208.0,6.81,172.33699,2531.80572,number of customers,cents per kilowatt-hour,million dollars,million kilowatt hours
9,2023-01,AL,Alabama,OTH,other,,,,,number of customers,cents per kilowatt-hour,million dollars,million kilowatt hours


## Validate retail dataset structure

I enforce required column presence and compute an overview QA summary (row counts, missingness, and basic distribution indicators). This checkpoint ensures I’m not silently modeling on incomplete or malformed API responses.


In [10]:
required_retail_cols = ["period","stateid","sectorid","customers","price","revenue","sales"]
assert_required_columns(df_retail, required_retail_cols, "EIA retail-sales")

overview_retail = qa_overview(df_retail, "eia_retail_sales")
overview_retail


{'dataset': 'eia_retail_sales',
 'rows': 10404,
 'cols': 13,
 'duplicate_rows': 0,
 'missing_cells': 6936,
 'missing_pct': 0.05128205128205128}

## Validate state identifiers

I validate that state codes conform to the expected two-letter format and are within the intended domain. Keeping state identifiers clean is critical because the database joins and the panel index both depend on this key.


In [11]:
# Validate state code shape
bad_states = check_state_codes(df_retail["stateid"], "retail.stateid")
bad_states.head(10)


Series([], Name: stateid, dtype: object)

## Confirm sector coverage

I inspect sector distribution to confirm the dataset includes the sectors I expect (with residential as the primary focus). This also helps me detect any unexpected sector codes early, before loading into Postgres.

In [12]:
# Sector distribution
df_retail["sectorid"].value_counts(dropna=False).head(15)


sectorid
ALL    1734
COM    1734
IND    1734
OTH    1734
RES    1734
TRA    1734
Name: count, dtype: int64

In [13]:
# Basic numeric sanity checks
(df_retail[["price","sales","revenue","customers"]]
 .describe(percentiles=[0.01,0.05,0.5,0.95,0.99]))


Unnamed: 0,price,sales,revenue,customers
count,8670,8670,8670,8670
unique,2097,7920,7912,6748
top,0,0,0,0
freq,750,750,750,750


## Persist retail snapshot + QA evidence

I store a parquet snapshot of the raw EIA retail dataset and write QA summaries to disk. This creates a reproducible “raw landing” record and supports traceability from model outputs back to the ingested source.


In [14]:
df_retail.to_parquet(paths.data_interim / "eia_retail_sales_raw.parquet", index=False)
qa_column_profile(df_retail).to_csv(paths.reports_qa / "eia_retail_column_profile.csv")

with open(paths.reports_qa / "eia_retail_overview.json", "w", encoding="utf-8") as f:
    json.dump(overview_retail, f, indent=2)


## Extract generation and fuel controls (EIA API)

I pull monthly electric power operational data from the EIA API over the same state list and date window. These fields become my supply-side controls (generation and fuel cost proxies) to help separate demand-side association from supply-driven price movements.


In [15]:
EIA_OPS_URL = "https://api.eia.gov/v2/electricity/electric-power-operational-data/data/"

ops_params = {
    "frequency": "monthly",
    "data[0]": "consumption-for-eg",
    "data[1]": "consumption-for-eg-btu",
    "data[2]": "cost",
    "data[3]": "generation",
    "data[4]": "total-consumption",
    "data[5]": "total-consumption-btu",
    "start": "2023-01",
    "end": "2025-10",
    "sort[0][column]": "period",
    "sort[0][direction]": "asc",
    "facets[location][]": ["AK","AL","AR","AZ","CA","CO","CT","DC","DE","FL","GA","HI","IA","ID","IL","IN","KS","KY","LA",
                           "MA","MD","ME","MI","MN","MO","MS","MT","NC","ND","NE","NH","NJ","NM","NV","NY","OH","OK","OR",
                           "PA","RI","SC","SD","TN","TX","UT","VA","VT","WA","WI","WV","WY"],
}

ops_res = fetch_eia_v2_all_pages(
    session=session,
    base_url=EIA_OPS_URL,
    base_params=ops_params,
    api_key=settings.eia_api_key,
    page_size=settings.eia_page_size,
    timeout_s=settings.eia_timeout_s,
)

df_ops = ops_res.df
{"rows": len(df_ops), "total_reported": ops_res.total, "cols": df_ops.shape[1]}


{'rows': 429522, 'total_reported': 429522, 'cols': 19}

## Ops extract sanity check

In [16]:
df_ops.head(20)

Unnamed: 0,period,location,stateDescription,sectorid,sectorDescription,fueltypeid,fuelTypeDescription,consumption-for-eg,consumption-for-eg-units,consumption-for-eg-btu,consumption-for-eg-btu-units,cost,cost-units,generation,generation-units,total-consumption,total-consumption-units,total-consumption-btu,total-consumption-btu-units
0,2023-01,AK,Alaska,1,Electric Utility,ALL,all fuels,0.0,thousand physical units,4.23789,million MMBtu,0.0,dollars per physical units,569.12167,thousand megawatthours,0.0,thousand physical units,4.53181,million MMBtu
1,2023-01,AK,Alaska,1,Electric Utility,AOR,all renewables,0.0,thousand physical units,0.01564,million MMBtu,0.0,dollars per physical units,4.58276,thousand megawatthours,0.0,thousand physical units,0.01564,million MMBtu
2,2023-01,AK,Alaska,1,Electric Utility,COL,"coal, excluding waste coal",25.983,thousand short tons,0.40381,million MMBtu,79.87,dollars per short tons,33.93734,thousand megawatthours,43.817,thousand short tons,0.67434,million MMBtu
3,2023-01,AK,Alaska,1,Electric Utility,COW,all coal products,31.504,thousand short tons,0.47116,million MMBtu,70.16,dollars per short tons,39.30983,thousand megawatthours,49.338,thousand short tons,0.7417,million MMBtu
4,2023-01,AK,Alaska,1,Electric Utility,DFO,distillate fuel oil,90.356,thousand short tons,0.51183,million MMBtu,133.34,dollars per short tons,48.04845,thousand megawatthours,92.479,thousand short tons,0.52418,million MMBtu
5,2023-01,AK,Alaska,1,Electric Utility,DPV,estimated small scale solar photovoltaic,,thousand physical units,,million MMBtu,0.0,dollars per physical units,0.0,thousand megawatthours,,thousand physical units,,million MMBtu
6,2023-01,AK,Alaska,1,Electric Utility,FOS,fossil fuels,0.0,thousand physical units,3.72474,million MMBtu,0.0,dollars per physical units,419.1309,thousand megawatthours,0.0,thousand physical units,4.01867,million MMBtu
7,2023-01,AK,Alaska,1,Electric Utility,HYC,conventional hydroelectric,0.0,thousand physical units,0.49751,million MMBtu,0.0,dollars per physical units,145.776,thousand megawatthours,0.0,thousand physical units,0.49751,million MMBtu
8,2023-01,AK,Alaska,1,Electric Utility,LIG,lignite coal,22.417,thousand short tons,0.34971,million MMBtu,79.87,dollars per short tons,27.89334,thousand megawatthours,22.417,thousand short tons,0.34971,million MMBtu
9,2023-01,AK,Alaska,1,Electric Utility,NG,natural gas,2439.779,thousand Mcf,2.44204,million MMBtu,5.36,dollars per Mcf,292.56004,thousand megawatthours,2439.779,thousand Mcf,2.44204,million MMBtu


## Validate ops dataset structure

I enforce required column presence and compute an overview QA summary. This ensures the operational controls I rely on later are complete enough to support modeling and don’t introduce systematic missingness at the state-month level.


In [17]:
required_ops_cols = ["period","location","sectorid","fueltypeid","generation","cost"]
assert_required_columns(df_ops, required_ops_cols, "EIA ops")

overview_ops = qa_overview(df_ops, "eia_power_ops")
overview_ops


{'dataset': 'eia_power_ops',
 'rows': 429522,
 'cols': 19,
 'duplicate_rows': 0,
 'missing_cells': 544561,
 'missing_pct': 0.06672790977681678}

## Validate state identifiers

I validate that state codes conform to the expected two-letter format and are within the intended domain. 

In [18]:
bad_states_ops = check_state_codes(df_ops["location"], "ops.location")
bad_states_ops.head(10)


Series([], Name: location, dtype: object)

## Confirm ops categorical domains

I inspect categorical distributions (fuel type and sector identifiers) to understand the shape of the data I will later roll up into state-month control variables. This step also helps prevent silent category drift from the API.


In [19]:
df_ops["sectorid"].value_counts(dropna=False).head(20), df_ops["fueltypeid"].value_counts(dropna=False).head(20)


(sectorid
 99    57856
 98    47974
 90    46470
 94    43426
 1     40279
 97    35251
 2     35242
 96    29613
 7     27171
 5     16784
 4     16357
 3     15099
 6     10934
 8      5202
 95     1864
 Name: count, dtype: int64,
 fueltypeid
 ALL    20686
 FOS    19178
 REN    18412
 NGO    18246
 NG     18110
 AOR    17895
 PET    16910
 PEL    16808
 DFO    16500
 BIO    14318
 WAS    13406
 SPV    13308
 SUN    13308
 OTH    12020
 COW    11125
 COL    11022
 HYC    10508
 TPV    10290
 TSN    10290
 DPV    10268
 Name: count, dtype: int64)

## Ops distribution spot checks

I run additional frequency checks on key categorical fields to confirm coverage looks reasonable and to anticipate how I will aggregate operational data into stable, interpretable controls.


In [20]:
(df_ops[["generation","cost","consumption-for-eg-btu","total-consumption-btu"]]
 .describe(percentiles=[0.01,0.05,0.5,0.95,0.99]))


Unnamed: 0,generation,cost,consumption-for-eg-btu,total-consumption-btu
count,410246,83157,384792,384792
unique,122773,4426,83262,89112
top,0,0,0,0
freq,66572,65785,68305,66256


## Persist ops snapshot + QA evidence

I store a parquet snapshot of the raw operational dataset and write QA summaries. This preserves the raw pull and supports repeatable reconstruction of the downstream state-month controls.

In [21]:
df_ops.to_parquet(paths.data_interim / "eia_power_ops_raw.parquet", index=False)
qa_column_profile(df_ops).to_csv(paths.reports_qa / "eia_ops_column_profile.csv")

with open(paths.reports_qa / "eia_ops_overview.json", "w", encoding="utf-8") as f:
    json.dump(overview_ops, f, indent=2)


## Schema-safe loading utilities

I define coercion helpers that convert messy numeric and integer-like fields into consistent pandas dtypes compatible with PostgreSQL. This is an engineering upgrade that reduces load failures and prevents silent type drift when the API or CSV formatting varies.


In [22]:
def pg_table_coltypes(engine, schema: str, table: str) -> dict[str, str]:
    q = sa.text("""
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_schema = :schema AND table_name = :table
        ORDER BY ordinal_position;
    """)
    with engine.connect() as conn:
        rows = conn.execute(q, {"schema": schema, "table": table}).fetchall()
    return {r[0]: r[1] for r in rows}

def _coerce_numeric_series(s: pd.Series) -> pd.Series:
    if pd.api.types.is_numeric_dtype(s):
        return s
    x = s.astype("string").str.strip()
    x = x.replace(["", "NA", "N/A", "na", "n/a", "null", "NULL", "None", "NONE", "-", "—"], pd.NA)
    x = x.str.replace(",", "", regex=False)
    x = x.str.replace(r"[^\d\.\-\+]", "", regex=True)
    x = x.replace("", pd.NA)
    return pd.to_numeric(x, errors="coerce")

def _coerce_int_series(s: pd.Series, *, colname: str = "") -> pd.Series:
    """
    Convert messy integer-like values to pandas nullable Int64.
    Handles '6.0' safely. If fractional parts exist, rounds (with warning).
    """
    x = _coerce_numeric_series(s)

    # If fractional values exist (e.g., 6.2), warn + round
    frac = (x.dropna() - np.floor(x.dropna())).abs()
    if len(frac) and (frac > 1e-9).any():
        bad_n = int((frac > 1e-9).sum())
        print(f"⚠️  Warning: {colname} has {bad_n} non-integer values; rounding to nearest int for COPY.")
        x = x.round()

    return x.astype("Int64")

## High-reliability Postgres loaders

I add utilities to introspect table schemas and load DataFrames via COPY-based inserts. This approach is both faster and more schema-faithful than naive inserts, and it improves repeatability by enforcing column alignment against the database definitions.


In [23]:
# -------------------------------
# Postgres introspection helpers
# -------------------------------
def pg_table_columns(engine: sa.Engine, schema: str, table: str) -> list[str]:
    q = """
    SELECT column_name
    FROM information_schema.columns
    WHERE table_schema=:schema AND table_name=:table
    ORDER BY ordinal_position
    """
    with engine.connect() as conn:
        return [r[0] for r in conn.execute(sa.text(q), {"schema": schema, "table": table}).fetchall()]

def pg_table_types(engine: sa.Engine, schema: str, table: str) -> dict[str, str]:
    q = """
    SELECT column_name, data_type
    FROM information_schema.columns
    WHERE table_schema=:schema AND table_name=:table
    """
    with engine.connect() as conn:
        return {r[0]: r[1] for r in conn.execute(sa.text(q), {"schema": schema, "table": table}).fetchall()}

# -------------------------------
# Hashing + safe source_ref
# -------------------------------
def sanitize_source_ref(s: str) -> str:
    # Remove api_key=... if present
    return re.sub(r"([?&])api_key=[^&]+", r"\1api_key=REDACTED", s)

def row_hash(values: list[str | float | int | None]) -> str:
    norm = "|".join("" if v is None else str(v).strip() for v in values)
    return hashlib.sha256(norm.encode("utf-8")).hexdigest()

# -------------------------------
# Robust coercion helpers
# -------------------------------
_NUM_CLEAN_RE = re.compile(r"[,\s]") 

def _coerce_numeric_series(s: pd.Series) -> pd.Series:
    """
    Coerce a series to numeric:
    - strips commas (e.g. "1,200")
    - converts blank/NA-ish to NaN
    """
    if s is None:
        return s
    if pd.api.types.is_numeric_dtype(s):
        return pd.to_numeric(s, errors="coerce")
    x = s.astype("string").str.strip()
    x = x.replace({"": pd.NA, "NA": pd.NA, "N/A": pd.NA, "None": pd.NA, "null": pd.NA, "NULL": pd.NA})
    x = x.str.replace(_NUM_CLEAN_RE, "", regex=True)
    return pd.to_numeric(x, errors="coerce")

def _coerce_int_series(s: pd.Series) -> pd.Series:
    """
    Coerce to pandas nullable Int64, handling things like "6.0" safely.
    """
    x = _coerce_numeric_series(s)
    x = np.where(pd.isna(x), np.nan, np.round(x).astype("float"))
    return pd.Series(x).astype("Int64")

def prepare_df_for_raw_load(
    df: pd.DataFrame,
    table_cols: list[str],
    table_types: dict[str, str],
    source_name: str,
    source_ref: str,
    dedupe_on: list[str] | None = None,
    row_hash_cols: list[str] | None = None,
) -> pd.DataFrame:
    """
    Align df to Postgres raw table columns and add ingestion metadata.
    - keeps only table_cols
    - coerces NUMERIC/INTEGER-ish columns
    - adds source_name/source_ref/source_row_hash
    """
    df2 = df.copy()
    def _normalize_col(name: str) -> str:
        name = str(name).strip().replace("-", "_").replace(" ", "_")
        s = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", name)
        s = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", s)
        s = re.sub(r"__+", "_", s)
        return s.lower()

    rename = {c: _normalize_col(c) for c in df2.columns}
    df2 = df2.rename(columns=rename)
    if table_cols:
        alias: dict[str, str] = {}
        for c in list(df2.columns):
            if c in table_cols:
                continue
            c_no_us = c.replace("_", "")
            if c_no_us in table_cols and c_no_us not in df2.columns:
                alias[c] = c_no_us
        overrides = {
            "customers_units": "customerunits",
            "state_description": "statedescription",
            "sector_name": "sectorname",
            "sector_description": "sectordescription",
            "fuel_type_description": "fueltypedescription",
            "consumption_for_eg": "comsumption_for_eg",
            "consumption_for_eg_units": "comsumption_for_eg_units",
            "consumption_for_eg_btu": "comsumption_for_eg_btu",
            "consumption_for_eg_btu_units": "comsumption_for_eg_btu_units",
        }
        for src_col, dst_col in overrides.items():
            if src_col in df2.columns and dst_col in table_cols and dst_col not in df2.columns:
                alias[src_col] = dst_col

        if alias:
            df2 = df2.rename(columns=alias)
    df2["source_name"] = source_name
    df2["source_ref"] = source_ref
    business_cols = [c for c in df2.columns if c not in ("ingested_at", "source_name", "source_ref", "source_row_hash")]
    use_hash_cols = row_hash_cols or business_cols
    use_hash_cols = [c for c in use_hash_cols if c in df2.columns]
    df2["source_row_hash"] = df2[use_hash_cols].astype("string").fillna("").agg(lambda r: row_hash(list(r)), axis=1)

    for c in table_cols:
        if c not in df2.columns:
            df2[c] = pd.NA
    df2 = df2[table_cols].copy()
    for col, dtype in table_types.items():
        if col not in df2.columns:
            continue
        dt = dtype.lower()
        if dt in ("numeric", "double precision", "real", "decimal"):
            df2[col] = _coerce_numeric_series(df2[col])
        elif dt in ("integer", "bigint", "smallint"):
            df2[col] = _coerce_int_series(df2[col])
    if dedupe_on:
        keep = [c for c in dedupe_on if c in df2.columns]
        if keep:
            df2 = df2.drop_duplicates(subset=keep, keep="last").copy()

    return df2

# -------------------------------
# COPY loader (fast + robust)
# -------------------------------
def copy_df_to_postgres(
    engine: sa.Engine,
    schema: str,
    table: str,
    df: pd.DataFrame,
    mode: str = "append", 
    chunk_rows: int = 50_000,
) -> None:
    """
    Uses psycopg3 COPY for speed. Expects df columns already aligned to table.
    """
    if df.empty:
        print(f"[COPY] No rows to load into {schema}.{table}")
        return

    table_cols = pg_table_columns(engine, schema, table)
    missing = [c for c in table_cols if c not in df.columns]
    if missing:
        raise ValueError(f"DF missing columns for COPY into {schema}.{table}: {missing}")

    # Ensure column order matches table
    df = df[table_cols].copy()
    def _to_csv_chunk(d: pd.DataFrame) -> str:
        return d.to_csv(index=False, header=False, na_rep="", lineterminator="\n")

    raw_conn = engine.raw_connection()
    try:
        with raw_conn.cursor() as cur:
            if mode == "truncate":
                cur.execute(sa.text(f'TRUNCATE TABLE "{schema}"."{table}"').text)

            col_list = ", ".join([f'"{c}"' for c in table_cols])
            copy_sql = f'COPY "{schema}"."{table}" ({col_list}) FROM STDIN WITH (FORMAT CSV, NULL \'\')'

            with cur.copy(copy_sql) as copy:
                for start in range(0, len(df), chunk_rows):
                    chunk = df.iloc[start:start+chunk_rows]
                    copy.write(_to_csv_chunk(chunk))
        raw_conn.commit()
        print(f"[COPY] Loaded {len(df):,} rows into {schema}.{table} ({mode})")
    finally:
        raw_conn.close()

## Load raw facilities to Postgres

I load the facilities dataset into the raw database layer so PostgreSQL becomes the authoritative source for downstream transforms. Landing raw data in the database supports traceability and makes all later aggregation/join logic centralized and testable.


In [24]:
# --- LOAD: FracTracker -> raw.fractracker_datacenters ---

frac_cols  = pg_table_columns(engine, "raw", "fractracker_datacenters")
frac_types = pg_table_types(engine, "raw", "fractracker_datacenters")

assert "df_dc" in globals(), "df_dc not found. Run the FracTracker CSV ingestion cell first."
assert "FRAC_PATH" in globals(), "FRAC_PATH not found. Add FRAC_PATH = fractracker_csv after setting fractracker_csv."

df_dc_load = prepare_df_for_raw_load(
    df=df_dc,
    table_cols=frac_cols,
    table_types=frac_types,
    source_name="fractracker_csv",
    source_ref=str(Path(FRAC_PATH).name),
    dedupe_on=["facility_id"],
    row_hash_cols=["facility_id", "name", "state", "county", "city", "lat", "long", "status", "mw_high", "sizerank", "facility_size_sqft"],
)
df_dc_load['ingested_at'] = pd.Timestamp.now()
copy_df_to_postgres(engine, "raw", "fractracker_datacenters", df_dc_load, mode="truncate")

with engine.connect() as conn:
    n = conn.execute(sa.text('SELECT COUNT(*) FROM raw.fractracker_datacenters')).scalar_one()
print("raw.fractracker_datacenters rowcount:", n)


[COPY] Loaded 1,260 rows into raw.fractracker_datacenters (truncate)
raw.fractracker_datacenters rowcount: 1260


## Defensive handling for retail extract naming

I add a small compatibility step to ensure the retail DataFrame reference is available even if earlier variable naming differs. This keeps the pipeline resilient to notebook iteration without changing the underlying data logic.


In [25]:
def _pick_first_existing_df(*names: str):
    for n in names:
        if n in globals() and isinstance(globals()[n], pd.DataFrame) and not globals()[n].empty:
            return globals()[n]
    return None

def _load_cached_retail_df(raw_dir: Path) -> pd.DataFrame | None:
    candidates = [
        raw_dir / "eia_retail_sales.parquet",
        raw_dir / "eia_retail_sales.csv",
        raw_dir / "eia_retail_sales.json",
        raw_dir / "df_retail_raw.parquet",
        raw_dir / "df_retail_raw.csv",
        raw_dir / "df_retail_raw.json",
    ]
    for p in candidates:
        if p.exists():
            if p.suffix == ".parquet":
                return pd.read_parquet(p)
            if p.suffix == ".csv":
                return pd.read_csv(p)
            if p.suffix == ".json":
                return pd.read_json(p)
    return None

# ---- Ensure df_retail_raw exists ----
df_retail_raw = _pick_first_existing_df("df_retail_raw", "df_retail", "df_eia_retail")

if df_retail_raw is None:
    cached = _load_cached_retail_df(RAW_DIR)
    if cached is not None and not cached.empty:
        df_retail_raw = cached

if df_retail_raw is None or df_retail_raw.empty:
    raise NameError(
        "df_retail_raw is not defined (and no cached retail file found in data/raw/). "
        "Run your EIA Retail API extraction cell first (the one that fetches the retail endpoint), "
        "or save the extracted df to data/raw/eia_retail_sales.parquet and rerun this cell."
    )

print("✅ df_retail_raw ready:", df_retail_raw.shape)
print("Columns:", list(df_retail_raw.columns)[:20], "...")

✅ df_retail_raw ready: (10404, 13)
Columns: ['period', 'stateid', 'stateDescription', 'sectorid', 'sectorName', 'customers', 'price', 'revenue', 'sales', 'customers-units', 'price-units', 'revenue-units', 'sales-units'] ...


## Load raw retail outcomes to Postgres

I load the EIA retail outcomes into the raw database layer using schema-aware logic. This ensures that the state-month-sector measures are queryable and joinable in SQL, which is especially important when I construct the curated mart.


In [26]:
# --- LOAD: EIA Retail Sales -> raw.eia_retail_sales ---
_NUMERIC_RE = re.compile(r"^\s*[-+]?(\d+(\.\d+)?|\.\d+)\s*$")

def _coerce_numeric_series(s: pd.Series) -> pd.Series:
    """
    Convert common messy numeric strings to floats:
    - removes commas: "1,200" -> "1200"
    - removes currency/units chars: "$12.3" -> "12.3"
    - converts blanks/NA-like to NaN
    Leaves already-numeric values as-is.
    """
    if s.dtype.kind in "if":  # already numeric
        return s

    # Convert to string, preserve nulls
    x = s.astype("string")

    # Normalize common null tokens
    x = x.str.strip()
    x = x.replace(
        ["", "NA", "N/A", "na", "n/a", "null", "NULL", "None", "NONE", "-", "—"],
        pd.NA,
    )
    x = x.str.replace(",", "", regex=False)
    x = x.str.replace(r"[^\d\.\-\+]", "", regex=True)

    x = x.replace("", pd.NA)

    return pd.to_numeric(x, errors="coerce")
# --- LOAD: EIA Retail -> raw.eia_retail_sales ---

retail_cols  = pg_table_columns(engine, "raw", "eia_retail_sales")
retail_types = pg_table_types(engine, "raw", "eia_retail_sales")

df_retail_in = None
for candidate in ["df_retail_raw", "df_retail", "df_eia_retail", "df_eia_retail_sales"]:
    if candidate in globals():
        df_retail_in = globals()[candidate]
        print("Using retail df:", candidate, df_retail_in.shape)
        break
if df_retail_in is None:
    raise NameError("Could not find a retail dataframe. Expected one of: df_retail_raw, df_retail, df_eia_retail, df_eia_retail_sales")

src_ref = sanitize_source_ref(globals().get("EIA_RETAIL_ENDPOINT", "eia_api_v2_retail_sales"))

df_retail_load = prepare_df_for_raw_load(
    df=df_retail_in,
    table_cols=retail_cols,
    table_types=retail_types,
    source_name="eia_api_v2_retail_sales",
    source_ref=src_ref,
    dedupe_on=["period", "stateid", "sectorid"],
    row_hash_cols=["period", "stateid", "sectorid", "customers", "price", "revenue", "sales"],
)
df_retail_load['ingested_at'] = pd.Timestamp.now()
copy_df_to_postgres(engine, "raw", "eia_retail_sales", df_retail_load, mode="truncate")

with engine.connect() as conn:
    n = conn.execute(sa.text('SELECT COUNT(*) FROM raw.eia_retail_sales')).scalar_one()
print("raw.eia_retail_sales rowcount:", n)

Using retail df: df_retail_raw (10404, 13)
[COPY] Loaded 10,404 rows into raw.eia_retail_sales (truncate)
raw.eia_retail_sales rowcount: 10404


## Load raw operational controls to Postgres

I load EIA operational data into the raw layer so fuel/generation measures can be aggregated with SQL into consistent state-month controls. Keeping this logic database-centered improves reproducibility and reduces in-notebook join complexity.


In [27]:
# --- LOAD: EIA Ops -> raw.eia_power_ops ---

ops_cols  = pg_table_columns(engine, "raw", "eia_power_ops")
ops_types = pg_table_types(engine, "raw", "eia_power_ops")

df_ops_in = None
for candidate in ["df_ops_raw", "df_ops", "df_eia_ops", "df_eia_power_ops"]:
    if candidate in globals():
        df_ops_in = globals()[candidate]
        print("Using ops df:", candidate, df_ops_in.shape)
        break
if df_ops_in is None:
    raise NameError("Could not find an ops dataframe. Expected one of: df_ops_raw, df_ops, df_eia_ops, df_eia_power_ops")

src_ref = sanitize_source_ref(globals().get("EIA_OPS_ENDPOINT", "eia_api_v2_power_ops"))

df_ops_load = prepare_df_for_raw_load(
    df=df_ops_in,
    table_cols=ops_cols,
    table_types=ops_types,
    source_name="eia_api_v2_power_ops",
    source_ref=src_ref,
    dedupe_on=["period", "location", "sectorid", "fueltypeid"],
    row_hash_cols=["period", "location", "sectorid", "fueltypeid", "generation", "cost", "total_consumption_btu"],
)
df_ops_load['ingested_at'] = pd.Timestamp.now()
copy_df_to_postgres(engine, "raw", "eia_power_ops", df_ops_load, mode="truncate")

with engine.connect() as conn:
    n = conn.execute(sa.text('SELECT COUNT(*) FROM raw.eia_power_ops')).scalar_one()
print("raw.eia_power_ops rowcount:", n)


Using ops df: df_ops (429522, 19)
[COPY] Loaded 429,522 rows into raw.eia_power_ops (truncate)
raw.eia_power_ops rowcount: 429522


## Automated profiling (raw layer)

I generate profiling reports for the three raw datasets (facilities, retail, ops). These reports provide a transparent, repeatable record of distributions, missingness, and anomalies that inform the cleaning and modeling decisions downstream.


In [28]:
write_profile_report(df_dc, paths.reports_eda / "profiling_fractracker_raw.html", "FracTracker Data Centers (Raw)")
write_profile_report(df_retail, paths.reports_eda / "profiling_eia_retail_raw.html", "EIA Retail Sales (Raw)", sample=300_000)
write_profile_report(df_ops, paths.reports_eda / "profiling_eia_ops_raw.html", "EIA Power Ops (Raw)", sample=300_000)


2026-01-10 17:20:01,007 | INFO | Pandas backend loaded 2.3.2
2026-01-10 17:20:01,062 | INFO | Numpy backend loaded 2.3.3
2026-01-10 17:20:01,067 | INFO | Pyspark backend NOT loaded
2026-01-10 17:20:01,069 | INFO | Python backend loaded
Summarize dataset:  13%|████                          | 6/45 [00:00<00:03, 12.08it/s, Describe variable: info_source_6]
Summarize dataset:  49%|███████████▏           | 22/45 [00:00<00:00, 38.65it/s, Describe variable: property_size_acres][A
Summarize dataset:  64%|███████████████████████▏            | 29/45 [00:00<00:00, 44.34it/s, Describe variable: county][A
100%|██████████████████████████████████████████████████████████████████████████████████| 40/40 [00:00<00:00, 69.65it/s][A
Summarize dataset: 100%|████████████████████████████████████████████████████| 46/46 [00:01<00:00, 45.13it/s, Completed]
Generate report structure: 100%|█████████████████████████████████████████████████████████| 1/1 [00:33<00:00, 33.66s/it]
Render HTML: 100%|█████████████████

## Pull raw facilities from Postgres for transformation

I read the facilities table from Postgres using a schema-aligned query so the transformation stage starts from the database source of truth. This also helps ensure that the pipeline behavior remains consistent across machines/environments.


In [29]:
# --- READ: FracTracker raw from DB (for cleaning + imputation) ---

cols = set(pg_table_columns(engine, "raw", "fractracker_datacenters"))

wanted = [
    "facility_id",
    "name",
    "address",
    "city",
    "state",
    "zip",
    "county",
    "lat",
    "long",
    "status",
    "status_detail",
    "expected_date_online",
    "mw_high",
    "sizerank",
    "sizerank_numeric",
    "facility_size_sqft",
    "property_size_acres",
    "number_of_generators",
    "number_of_buildings",
    "power_source",
    "dedicated_power_plant",
    "cooling_type",
    "cooling_source",
    "purpose",
    "operator",  
    "tenant",
    "source_row_hash",
    "ingested_at",
]

select_exprs = [c for c in wanted if c in cols]

RAW_DC_SQL = f"""
SELECT {", ".join(select_exprs)}
FROM raw.fractracker_datacenters;
"""

df_dc_raw = pd.read_sql(RAW_DC_SQL, engine)
print(df_dc_raw.shape)
df_dc_raw.head(3)


(1260, 28)


Unnamed: 0,facility_id,name,address,city,state,zip,county,lat,long,status,status_detail,expected_date_online,mw_high,sizerank,sizerank_numeric,facility_size_sqft,property_size_acres,number_of_generators,number_of_buildings,power_source,dedicated_power_plant,cooling_type,cooling_source,purpose,operator,tenant,source_row_hash,ingested_at
0,1,Logistic Land Investments LLC developer,Rock Mountain Lake Rd,Bessemer,AL,35022.0,,33.342,-87.0341,Proposed,,,1200.0,"Mega campus (>1,000 MW)",6.0,,675.0,,18.0,,,,,,,,162404f7428395dd34759f3fa26cea0265fc3230254203...,2026-01-10 22:18:16.481519+00:00
1,2,DC Blox,433 6th St S,Birmingham,AL,35233.0,,33.50051,-86.821,Operating,,,,Unknown,1.0,,,,,,,,,,,,51ff6fa6dd976bae0c1ba4e6d6b4a57cd12c5c9b8d57d2...,2026-01-10 22:18:16.481519+00:00
2,3,Google data center,48809 Alabama 277,Bridgeport,AL,35740.0,,34.92019,-85.7446,Unknown,,,,Unknown,1.0,,360.0,,,,,,,,Google,,7bf54df3590be2128eb694de6ba30d09a7b6d62ce6fa70...,2026-01-10 22:18:16.481519+00:00


## Reusable QA checkpoint helper

I define a compact QA overview helper for quick checkpoints during transformation. Keeping this function in the notebook lets me measure row counts, duplicates, and missingness repeatedly as I filter and derive fields.


In [30]:
def qa_overview(df: pd.DataFrame, name: str, key_cols: list[str] | None = None) -> pd.DataFrame:
    out = []
    out.append(("rows", len(df)))
    out.append(("cols", df.shape[1]))
    if key_cols:
        out.append(("duplicate_keys", int(df.duplicated(key_cols).sum())))
        out.append(("null_keys_any", int(df[key_cols].isna().any(axis=1).sum())))
    miss = (df.isna().mean() * 100).sort_values(ascending=False).head(15)
    print(f"\n=== QA: {name} ===")
    for k,v in out:
        print(f"{k:>18}: {v}")
    print("\nTop missing (%):")
    display(miss.to_frame("missing_%"))
    return miss.to_frame("missing_%")

_ = qa_overview(df_dc_raw, "FracTracker raw facilities", key_cols=["facility_id"])



=== QA: FracTracker raw facilities ===
              rows: 1260
              cols: 28
    duplicate_keys: 0
     null_keys_any: 0

Top missing (%):


Unnamed: 0,missing_%
facility_size_sqft,100.0
number_of_generators,99.920635
dedicated_power_plant,99.920635
cooling_type,98.730159
tenant,98.650794
cooling_source,98.492063
power_source,97.619048
purpose,97.460317
expected_date_online,97.142857
status_detail,95.079365


## Standardize facility attributes

I normalize facility fields (types, text normalization, state formatting) and prepare the operating-facility subset for MW inference. This step converts the raw facility dataset into a clean, modeling-ready input for the imputation stage.


In [31]:
def to_num(x):
    if x is None:
        return np.nan
    if isinstance(x, (int, float, np.number)):
        return float(x)
    s = str(x).strip()
    if s == "" or s.lower() in {"na", "n/a", "nan", "none", "null", "unknown"}:
        return np.nan
    s = s.replace(",", "")
    try:
        return float(s)
    except:
        return np.nan

df = df_dc_raw.copy()

# Normalize core text fields
for col in ["state","status","status_detail","purpose","power_source","cooling_type","cooling_source","dedicated_power_plant"]:
    if col in df.columns:
        df[col] = df[col].astype("string").str.strip()

# Numeric conversions for model features
num_cols = [
    "lat","long","facility_size_sqft","property_size_acres",
    "number_of_generators","number_of_buildings","mw_high","sizerank_numeric"
]
for c in num_cols:
    if c in df.columns:
        df[c] = df[c].map(to_num)

# Make state uppercase 2-letter
df["state"] = df["state"].str.upper()

# Keep only valid-looking states for modeling
df = df[df["state"].str.fullmatch(r"[A-Z]{2}", na=False)].copy()

df.shape


(1260, 28)

## Define imputation training and prediction sets

I filter to operating facilities and partition rows into (a) records with valid MW values for training and (b) records missing or invalid MW values for prediction. This makes the imputation step measurable and auditable.


In [34]:
df_op = df[df["status"].str.upper() == "OPERATING"].copy()
y = df_op["mw_high"].copy()

train_mask = y.notna() & (y > 0)
pred_mask  = y.isna() | (y <= 0)

print("Operating rows:", len(df_op))
print("Train rows (MW known):", int(train_mask.sum()))
print("Predict rows (MW missing):", int(pred_mask.sum()))
print(df_op.columns)

Operating rows: 494
Train rows (MW known): 163
Predict rows (MW missing): 331
Index(['facility_id', 'name', 'address', 'city', 'state', 'zip', 'county', 'lat', 'long', 'status', 'status_detail',
       'expected_date_online', 'mw_high', 'sizerank', 'sizerank_numeric', 'facility_size_sqft', 'property_size_acres',
       'number_of_generators', 'number_of_buildings', 'power_source', 'dedicated_power_plant', 'cooling_type', 'cooling_source', 'purpose',
       'operator', 'tenant', 'source_row_hash', 'ingested_at'],
      dtype='object')


## Build feature matrix for MW inference

I select a feature set grounded in facility attributes (size, footprint, generators, geography, and categorical descriptors) and standardize missing categorical values into an explicit “UNKNOWN” bucket. This improves model robustness while preserving transparency about missing source data.


In [35]:
NUM_FEATURES = [
    "facility_size_sqft",
    "property_size_acres",
    "number_of_generators",
    "number_of_buildings",
    "lat",
    "long"
]

CAT_FEATURES = [
    "state",
    "purpose",
    "power_source",
    "cooling_type",
    "cooling_source",
    "dedicated_power_plant",
    "tier_classification"
]

NUM_FEATURES = [c for c in NUM_FEATURES if c in df_op.columns]
CAT_FEATURES = [c for c in CAT_FEATURES if c in df_op.columns]

X = df_op[NUM_FEATURES + CAT_FEATURES].copy()

for c in CAT_FEATURES:
    X[c] = X[c].fillna("UNKNOWN").astype("string").str.strip().replace({"": "UNKNOWN"})


## Evaluate MW inference model (validated imputation)

Instead of relying on a single heuristic, I evaluate an imputation model using cross-validation and standard regression metrics (RMSE/MAE/R²). This is an intentional upgrade: it keeps the imputation defensible by measuring expected error on facilities where MW is known.


In [36]:
def rmse(y_true, y_pred):
    return float(np.sqrt(mean_squared_error(y_true, y_pred)))

def eval_cv_predictions(y_true, y_pred):
    return {
        "rmse": rmse(y_true, y_pred),
        "mae": float(mean_absolute_error(y_true, y_pred)),
        "r2": float(r2_score(y_true, y_pred)),
    }

X_train = X.loc[train_mask].copy()
y_train = y.loc[train_mask].copy()

y_train_log = np.log1p(y_train)

cv = KFold(n_splits=5, shuffle=True, random_state=RANDOM_SEED)

candidates = []


catboost_available = False
try:
    from catboost import CatBoostRegressor
    catboost_available = True

    cat_idx = [X_train.columns.get_loc(c) for c in CAT_FEATURES]

    oof = np.zeros(len(X_train), dtype=float)
    for fold, (tr, va) in enumerate(cv.split(X_train), start=1):
        model = CatBoostRegressor(
            loss_function="RMSE",
            depth=8,
            learning_rate=0.05,
            iterations=2000,
            random_seed=RANDOM_SEED,
            verbose=False
        )
        model.fit(
            X_train.iloc[tr], y_train_log.iloc[tr],
            cat_features=cat_idx,
            eval_set=(X_train.iloc[va], y_train_log.iloc[va]),
            use_best_model=True
        )
        oof[va] = model.predict(X_train.iloc[va])

    oof_mw = np.expm1(oof)
    metrics = eval_cv_predictions(y_train.values, oof_mw)
    candidates.append(("catboost", metrics))
    print("CatBoost CV:", metrics)

except Exception as e:
    print("CatBoost not available or failed import:", repr(e))

# --- Candidate B: sklearn HistGradientBoosting (robust baseline) ---


numeric_tf = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="median"))
])

categorical_tf = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("onehot", OneHotEncoder(handle_unknown="ignore", sparse_output=False))
])

pre = ColumnTransformer(
    transformers=[
        ("num", numeric_tf, NUM_FEATURES),
        ("cat", categorical_tf, CAT_FEATURES),
    ],
    remainder="drop"
)

sk_model = HistGradientBoostingRegressor(
    random_state=RANDOM_SEED,
    max_depth=6,
    learning_rate=0.05,
    max_iter=800
)

sk_pipe = Pipeline(steps=[("pre", pre), ("model", sk_model)])

oof = np.zeros(len(X_train), dtype=float)
for fold, (tr, va) in enumerate(cv.split(X_train), start=1):
    sk_pipe.fit(X_train.iloc[tr], y_train_log.iloc[tr])
    oof[va] = sk_pipe.predict(X_train.iloc[va])

oof_mw = np.expm1(oof)
metrics = eval_cv_predictions(y_train.values, oof_mw)
candidates.append(("sklearn_hgbr", metrics))
print("sklearn HGBR CV:", metrics)

best_algo, best_metrics = sorted(candidates, key=lambda x: x[1]["rmse"])[0]
print("\nBEST:", best_algo, best_metrics)


CatBoost CV: {'rmse': 274.5759108811738, 'mae': 97.88754730821715, 'r2': 0.06878641341973857}
sklearn HGBR CV: {'rmse': 292.1591435248276, 'mae': 109.5730466014038, 'r2': -0.05429804981514352}

BEST: catboost {'rmse': 274.5759108811738, 'mae': 97.88754730821715, 'r2': 0.06878641341973857}


## Impute MW and enforce reasonable bounds

I train the final model on known-MW facilities, generate predictions for all operating facilities, and construct a single MW_final field that uses the observed value when available and the predicted value otherwise. I also cap extreme predictions using a high-quantile rule to reduce the impact of outliers on state-level aggregation.


In [37]:
X_all = X.copy()

if best_algo == "catboost":
    cat_idx = [X_train.columns.get_loc(c) for c in CAT_FEATURES]
    final_model = CatBoostRegressor(
        loss_function="RMSE",
        depth=8,
        learning_rate=0.05,
        iterations=2000,
        random_seed=RANDOM_SEED,
        verbose=False
    )
    final_model.fit(X_train, y_train_log, cat_features=cat_idx, verbose=False)
    pred_log = final_model.predict(X_all)
else:
    # sklearn pipeline
    final_model = sk_pipe
    final_model.fit(X_train, y_train_log)
    pred_log = final_model.predict(X_all)

pred_mw = np.expm1(pred_log)

q995 = float(np.nanquantile(y_train.values, 0.995))
upper_cap = max(q995 * 1.5, q995)  # allow some headroom
pred_mw = np.clip(pred_mw, 0, upper_cap)

df_op["mw_pred"] = pred_mw
df_op["mw_final"] = np.where(train_mask.values, df_op["mw_high"], df_op["mw_pred"])

df_op[["mw_high","mw_pred","mw_final"]].describe(percentiles=[0.5,0.9,0.95,0.99])


Unnamed: 0,mw_high,mw_pred,mw_final
count,163.0,494.0,494.0
mean,113.119632,52.081817,68.099149
std,285.413214,72.256656,169.560215
min,0.1,1.019715,0.1
50%,16.0,36.110608,36.041937
90%,300.0,104.833588,112.1
95%,575.4,155.223835,182.686825
99%,1400.0,333.733588,1000.0
max,2200.0,787.621667,2200.0


## Derive size rank consistently from MW_final

I map MW_final into an interpretable size-rank scale (small → hyperscale → mega campus) and reconcile it with any existing size rank fields. This guarantees that downstream “intensity” measures can be summarized consistently even when the source dataset marks size as unknown.


In [38]:
def sizerank_from_mw(mw: float) -> tuple[str, int]:
    if mw is None or np.isnan(mw) or mw <= 0:
        return ("Unknown", 1)
    if mw <= 10:
        return ("Small (0–10 MW)", 2)
    if mw <= 50:
        return ("Medium (11–50 MW)", 3)
    if mw <= 100:
        return ("Large (51–100 MW)", 4)
    if mw <= 999:
        return ("Hyperscale (101–999 MW)", 5)
    return ("Mega campus (>1,000 MW)", 6)

sr = df_op["mw_final"].apply(sizerank_from_mw)
df_op["sizerank_inferred"] = sr.apply(lambda x: x[0])
df_op["sizerank_numeric_inferred"] = sr.apply(lambda x: x[1])

df_op["sizerank_final"] = np.where(
    df_op["sizerank"].fillna("Unknown").str.contains("Unknown", case=False),
    df_op["sizerank_inferred"],
    df_op["sizerank"].fillna(df_op["sizerank_inferred"])
)

df_op["sizerank_numeric_final"] = np.where(
    df_op["sizerank_numeric"].fillna(1).astype(float) == 1,
    df_op["sizerank_numeric_inferred"],
    df_op["sizerank_numeric"]
)

df_op[["sizerank","sizerank_final","sizerank_numeric","sizerank_numeric_final"]].head(10)


Unnamed: 0,sizerank,sizerank_final,sizerank_numeric,sizerank_numeric_final
1,Unknown,Medium (11–50 MW),1.0,3.0
3,Unknown,Medium (11–50 MW),1.0,3.0
4,Unknown,Medium (11–50 MW),1.0,3.0
14,Unknown,Hyperscale (101–999 MW),1.0,5.0
19,Unknown,Hyperscale (101–999 MW),1.0,5.0
26,Small (0-10 MW),Small (0-10 MW),2.0,2.0
39,Unknown,Hyperscale (101–999 MW),1.0,5.0
40,Small (0-10 MW),Small (0-10 MW),2.0,2.0
43,Unknown,Hyperscale (101–999 MW),1.0,5.0
44,Unknown,Hyperscale (101–999 MW),1.0,5.0


## Persist imputation as a first-class, auditable run

I write the imputation run metadata (algorithm, parameters, metrics) and the per-facility imputed MW values into a dedicated staging schema with a run_id. Treating imputation as versioned data—rather than a hidden transformation—supports reproducibility, lineage, and future re-runs with improved models.


In [39]:
def pg_conn():
    return psycopg.connect(
        host=os.environ.get("PGHOST", "localhost"),
        port=os.environ.get("PGPORT", "5432"),
        dbname=os.environ["PGDATABASE"],
        user=os.environ["PGUSER"],
        password=os.environ["PGPASSWORD"],
        row_factory=dict_row
    )




def fetch_df(query: str, params: tuple | None = None) -> pd.DataFrame:
    with pg_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(query, params or ())
            rows = cur.fetchall()
    return pd.DataFrame(rows)

def fetch_scalar(query: str, params: tuple | None = None):
    with pg_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(query, params or ())
            row = cur.fetchone()
    if not row:
        return None
    return list(row.values())[0]
run_id = str(uuid.uuid4())
run_ts = datetime.now(timezone.utc)

algo_params = {
    "algorithm": best_algo,
    "random_seed": RANDOM_SEED,
    "num_features": NUM_FEATURES,
    "cat_features": CAT_FEATURES,
    "target_transform": "log1p(mw_high)",
    "prediction_cap_rule": "cap at 1.5 * q99.5 of training MW"
}
run_metrics = best_metrics

df_imp = df_op.loc[
    pred_mask.values,
    ["facility_id", "mw_pred", "mw_final", "sizerank_final", "sizerank_numeric_final"]
].copy()

missing_final = df_imp["mw_final"].isna()
if missing_final.any():
    donor = df_op.loc[df_op["mw_high"].notna(), ["mw_high", "sizerank_final"]].copy()

    by_rank_median = donor.groupby("sizerank_final")["mw_high"].median()
    overall_median = donor["mw_high"].median()

    df_imp.loc[missing_final, "mw_final"] = (
        df_imp.loc[missing_final, "sizerank_final"]
        .map(by_rank_median)
        .fillna(overall_median)
    )

    df_imp.loc[missing_final, "mw_pred"] = df_imp.loc[missing_final, "mw_final"]

if df_imp["mw_final"].isna().any():
    bad = df_imp[df_imp["mw_final"].isna()].head(10)
    raise ValueError(f"mw_final still has NULLs after fallback. Sample:\n{bad}")

df_imp["diagnostics"] = df_imp.apply(lambda r: {
    "run_id": run_id,
    "mw_pred": float(r["mw_pred"]) if pd.notna(r["mw_pred"]) else None,
    "mw_final": float(r["mw_final"]) if pd.notna(r["mw_final"]) else None,
    "sizerank_final": r["sizerank_final"],
    "sizerank_numeric_final": int(r["sizerank_numeric_final"]) if pd.notna(r["sizerank_numeric_final"]) else None,
}, axis=1)


# ----------------------------
# B) Persist run + results (schema-safe, NOT NULL-safe)
# ----------------------------
with pg_conn() as conn:
    with conn.cursor() as cur:

        cur.execute("CREATE SCHEMA IF NOT EXISTS stage;")

        cur.execute("""
        CREATE TABLE IF NOT EXISTS stage.dc_mw_imputation_run (
          run_id UUID PRIMARY KEY,
          run_ts TIMESTAMPTZ NOT NULL,
          algorithm TEXT NOT NULL,
          params JSONB NOT NULL,
          metrics JSONB NOT NULL,
          is_current BOOLEAN NOT NULL DEFAULT FALSE
        );
        """)

        cur.execute("""ALTER TABLE stage.dc_mw_imputation_run ADD COLUMN IF NOT EXISTS run_ts TIMESTAMPTZ;""")
        cur.execute("""ALTER TABLE stage.dc_mw_imputation_run ADD COLUMN IF NOT EXISTS algorithm TEXT;""")
        cur.execute("""ALTER TABLE stage.dc_mw_imputation_run ADD COLUMN IF NOT EXISTS params JSONB;""")
        cur.execute("""ALTER TABLE stage.dc_mw_imputation_run ADD COLUMN IF NOT EXISTS metrics JSONB;""")
        cur.execute("""ALTER TABLE stage.dc_mw_imputation_run ADD COLUMN IF NOT EXISTS is_current BOOLEAN;""")

        cur.execute("UPDATE stage.dc_mw_imputation_run SET run_ts = COALESCE(run_ts, now()) WHERE run_ts IS NULL;")
        cur.execute("UPDATE stage.dc_mw_imputation_run SET algorithm = COALESCE(algorithm, 'unknown') WHERE algorithm IS NULL;")
        cur.execute("UPDATE stage.dc_mw_imputation_run SET params = COALESCE(params, '{}'::jsonb) WHERE params IS NULL;")
        cur.execute("UPDATE stage.dc_mw_imputation_run SET metrics = COALESCE(metrics, '{}'::jsonb) WHERE metrics IS NULL;")
        cur.execute("UPDATE stage.dc_mw_imputation_run SET is_current = COALESCE(is_current, FALSE) WHERE is_current IS NULL;")

        cur.execute("ALTER TABLE stage.dc_mw_imputation_run ALTER COLUMN run_ts SET NOT NULL;")
        cur.execute("ALTER TABLE stage.dc_mw_imputation_run ALTER COLUMN algorithm SET NOT NULL;")
        cur.execute("ALTER TABLE stage.dc_mw_imputation_run ALTER COLUMN params SET NOT NULL;")
        cur.execute("ALTER TABLE stage.dc_mw_imputation_run ALTER COLUMN metrics SET NOT NULL;")
        cur.execute("ALTER TABLE stage.dc_mw_imputation_run ALTER COLUMN is_current SET NOT NULL;")
        cur.execute("ALTER TABLE stage.dc_mw_imputation_run ALTER COLUMN is_current SET DEFAULT FALSE;")

        cur.execute("""
        CREATE UNIQUE INDEX IF NOT EXISTS ux_dc_mw_imputation_run_current
          ON stage.dc_mw_imputation_run (is_current) WHERE is_current;
        """)

        cur.execute("""
        CREATE TABLE IF NOT EXISTS stage.dc_mw_imputation_result (
          run_id UUID NOT NULL REFERENCES stage.dc_mw_imputation_run(run_id) ON DELETE CASCADE,
          facility_id TEXT NOT NULL,
          mw_imputed NUMERIC NOT NULL,
          mw_pred NUMERIC,
          diagnostics JSONB NOT NULL DEFAULT '{}'::jsonb,
          inserted_at TIMESTAMPTZ NOT NULL DEFAULT now(),
          PRIMARY KEY (run_id, facility_id)
        );
        """)

        cur.execute("""ALTER TABLE stage.dc_mw_imputation_result ADD COLUMN IF NOT EXISTS mw_imputed NUMERIC;""")
        cur.execute("""ALTER TABLE stage.dc_mw_imputation_result ADD COLUMN IF NOT EXISTS mw_pred NUMERIC;""")
        cur.execute("""ALTER TABLE stage.dc_mw_imputation_result ADD COLUMN IF NOT EXISTS diagnostics JSONB;""")
        cur.execute("""ALTER TABLE stage.dc_mw_imputation_result ADD COLUMN IF NOT EXISTS inserted_at TIMESTAMPTZ;""")

        cur.execute("""
        UPDATE stage.dc_mw_imputation_result
        SET
          diagnostics = COALESCE(diagnostics, '{}'::jsonb),
          inserted_at = COALESCE(inserted_at, now()),
          mw_imputed = COALESCE(
              mw_imputed,
              mw_pred,
              NULLIF((diagnostics->>'mw_final'), '')::numeric,
              NULLIF((diagnostics->>'mw_pred'), '')::numeric
          )
        WHERE mw_imputed IS NULL
           OR diagnostics IS NULL
           OR inserted_at IS NULL;
        """)

        # Enforce constraints
        cur.execute("ALTER TABLE stage.dc_mw_imputation_result ALTER COLUMN mw_imputed SET NOT NULL;")
        cur.execute("ALTER TABLE stage.dc_mw_imputation_result ALTER COLUMN diagnostics SET NOT NULL;")
        cur.execute("ALTER TABLE stage.dc_mw_imputation_result ALTER COLUMN inserted_at SET NOT NULL;")
        cur.execute("ALTER TABLE stage.dc_mw_imputation_result ALTER COLUMN inserted_at SET DEFAULT now();")

        # ----------------------------
        # 1) Insert as the current run
        # ----------------------------
        cur.execute("UPDATE stage.dc_mw_imputation_run SET is_current = FALSE WHERE is_current = TRUE;")

        cur.execute(
            """
            INSERT INTO stage.dc_mw_imputation_run (run_id, run_ts, algorithm, params, metrics, is_current)
            VALUES (%s, %s, %s, %s::jsonb, %s::jsonb, TRUE);
            """,
            (run_id, run_ts, best_algo, json.dumps(algo_params), json.dumps(run_metrics)),
        )

        # ----------------------------
        # 2) Upsert imputed results (INCLUDE mw_imputed)
        # ----------------------------
        rows = [
            (
                run_id,
                str(r["facility_id"]),
                float(r["mw_final"]),  # <- satisfies mw_imputed NOT NULL
                float(r["mw_pred"]) if pd.notna(r["mw_pred"]) else None,
                json.dumps(r["diagnostics"]),
            )
            for _, r in df_imp.iterrows()
        ]

        cur.executemany(
            """
            INSERT INTO stage.dc_mw_imputation_result
              (run_id, facility_id, mw_imputed, mw_pred, diagnostics)
            VALUES (%s, %s, %s, %s, %s::jsonb)
            ON CONFLICT (run_id, facility_id) DO UPDATE
            SET mw_imputed = EXCLUDED.mw_imputed,
                mw_pred    = EXCLUDED.mw_pred,
                diagnostics= EXCLUDED.diagnostics,
                inserted_at= now();
            """,
            rows,
        )

    conn.commit()

print("Inserted imputation run:", run_id, "rows:", len(df_imp))


Inserted imputation run: f45cdb3a-68da-45f5-a8d3-4d654fbbc71b rows: 331


## Build staging tables from raw sources (SQL transforms)

I execute staged SQL transforms to convert raw landing tables into clean, analysis-aligned staging tables (operating facilities, retail state-month-sector, and operational controls). Keeping these transforms in SQL makes the logic centralized, testable, and easier to refresh end-to-end.


In [40]:
SQL_TRANSFORMS_DIR = SQL_DIR / "transforms"
stage_sql_files = [
    "06_stage__datacenters_operating.sql",
    "07_stage__retail_state_month_sector.sql",
    "08_stage__ops_fuel_and_controls.sql",
]

def run_sql_file(path: Path):
    sql_text = path.read_text(encoding="utf-8")
    sql_text = re.sub(r"(?im)^\s*BEGIN;\s*$", "", sql_text)
    sql_text = re.sub(r"(?im)^\s*COMMIT;\s*$", "", sql_text)
    sql_text = re.sub(r"%(?![sbt%])", "%%", sql_text)

    with engine.begin() as conn:
        conn.exec_driver_sql(sql_text)

for f in stage_sql_files:
    p = SQL_TRANSFORMS_DIR / f
    print("Running:", p)
    run_sql_file(p)

print("Stage refresh complete.")

Running: C:\Users\amand\OneDrive\Documents\School\Assignments\WGU\Data-Engineering\Capstone\datacenter-electricity-prices-us\sql\transforms\06_stage__datacenters_operating.sql
Running: C:\Users\amand\OneDrive\Documents\School\Assignments\WGU\Data-Engineering\Capstone\datacenter-electricity-prices-us\sql\transforms\07_stage__retail_state_month_sector.sql
Running: C:\Users\amand\OneDrive\Documents\School\Assignments\WGU\Data-Engineering\Capstone\datacenter-electricity-prices-us\sql\transforms\08_stage__ops_fuel_and_controls.sql
Stage refresh complete.


## Stage validation: facility counts and MW totals

I compute state-level aggregates (facility counts and total MW) from the staged operating facilities table as a reasonableness check. This is a quick “does it pass the smell test?” checkpoint before I generate the curated mart and modeling dataset.


In [41]:
dc_stage = pd.read_sql("""
SELECT
  state_code,
  COUNT(*) AS facilities,
  SUM(CASE WHEN mw_final IS NULL THEN 1 ELSE 0 END) AS mw_final_nulls,
  SUM(COALESCE(mw_final, 0)) AS total_mw
FROM stage.data_centers_operating
GROUP BY 1
ORDER BY total_mw DESC NULLS LAST;
""", engine)

dc_stage.head(10)


Unnamed: 0,state_code,facilities,mw_final_nulls,total_mw
0,VA,202,0,10575.357169
1,TX,53,0,6833.246667
2,GA,82,0,4638.98818
3,IN,3,0,2239.275342
4,TN,5,0,2028.83378
5,OH,11,0,1547.522475
6,CA,7,0,855.772646
7,MS,1,0,819.0
8,OK,3,0,770.056653
9,PA,42,0,660.420047


## QA checkpoint output structure

I create a timestamped QA output directory for this run so all validation tables and summaries are preserved as run artifacts. This run-isolated structure makes debugging and reporting much easier because outputs never overwrite prior checkpoints.


In [42]:
REPORT_RUN_TS = datetime.now().strftime("%Y%m%d_%H%M%S")
QA_OUT_DIR = Path("reports") / "qa" / f"checkpoint_{REPORT_RUN_TS}"
QA_OUT_DIR.mkdir(parents=True, exist_ok=True)

print("QA checkpoint outputs ->", QA_OUT_DIR)

QA checkpoint outputs -> reports\qa\checkpoint_20260110_174056


## Curated mart build (dimensions, facts, and analysis view)

I execute the mart SQL in a controlled order to load dimensions, load facts, and refresh the analysis materialized view. This produces a curated, analytics-ready layer designed for consistent joins and stable modeling at the state-month grain.


In [43]:
marts_dir = SQL_DIR / "transforms"

mart_sql_files = [
    "09_mart__load_dimensions.sql",
    "10_mart__load_facts.sql",
    "11_mart__mv_analysis_refresh.sql",
]

_PCT = re.compile(r"%(?![sbt%])", flags=re.IGNORECASE)

def run_sql_file(path: Path):
    if not path.exists():
        raise FileNotFoundError(f"Missing SQL file: {path.resolve()}")

    sql_text = path.read_text(encoding="utf-8-sig")

    statements = _split_sql_statements(sql_text)

    print(f"Running: {path.name}  (statements: {len(statements)})")
    with engine.begin() as conn:
        for k, stmt in enumerate(statements, start=1):
            try:
                conn.exec_driver_sql(stmt)
            except Exception as e:
                preview = (stmt[:400] + " ...") if len(stmt) > 400 else stmt
                raise RuntimeError(
                    f"SQL failed in {path.name} (statement {k}/{len(statements)}):\n{preview}"
                ) from e


# Run mart scripts in the required order
for fname in mart_sql_files:
    print(fname)
    run_sql_file(marts_dir / fname)

print("✅ MART load complete (dimensions, facts, analysis MV).")

09_mart__load_dimensions.sql
Running: 09_mart__load_dimensions.sql  (statements: 13)
10_mart__load_facts.sql
Running: 10_mart__load_facts.sql  (statements: 9)
11_mart__mv_analysis_refresh.sql
Running: 11_mart__mv_analysis_refresh.sql  (statements: 8)
✅ MART load complete (dimensions, facts, analysis MV).


## QA assertion framework

I define reusable QA assertions (row counts, uniqueness, null-rate thresholds, non-negativity) plus a helper to write outputs to the checkpoint folder. These checks formalize data quality expectations and turn silent failures into explicit, actionable errors.


In [44]:
def qdf(sql: str, params: dict | None = None) -> pd.DataFrame:
    return pd.read_sql(text(sql), engine, params=params or {})

def scalar(sql: str, params: dict | None = None):
    df = qdf(sql, params)
    return df.iat[0, 0] if not df.empty else None

def assert_rowcount(table: str, min_rows: int = 1, max_rows: int | None = None):
    n = scalar(f"SELECT COUNT(*) FROM {table};")
    if n is None:
        raise AssertionError(f"{table}: unable to read rowcount")
    if n < min_rows:
        raise AssertionError(f"{table}: rowcount {n} < min {min_rows}")
    if max_rows is not None and n > max_rows:
        raise AssertionError(f"{table}: rowcount {n} > max {max_rows}")
    return int(n)

def assert_unique_key(table: str, key_cols: list[str]):
    cols = ", ".join(key_cols)
    dup = scalar(f"""
        SELECT COUNT(*) FROM (
          SELECT {cols}, COUNT(*) c
          FROM {table}
          GROUP BY {cols}
          HAVING COUNT(*) > 1
        ) d;
    """)
    if dup and int(dup) > 0:
        raise AssertionError(f"{table}: found {dup} duplicate key rows on ({cols})")
    return 0

def assert_null_rate(table: str, col: str, max_null_pct: float):
    pct = scalar(f"""
        SELECT 100.0 * AVG(CASE WHEN {col} IS NULL THEN 1 ELSE 0 END) FROM {table};
    """)
    pct = float(pct) if pct is not None else 0.0
    if pct > max_null_pct:
        raise AssertionError(f"{table}.{col}: null% {pct:.2f} > allowed {max_null_pct:.2f}")
    return pct

def assert_nonnegative(table: str, col: str):
    bad = scalar(f"SELECT COUNT(*) FROM {table} WHERE {col} < 0;")
    bad = int(bad or 0)
    if bad > 0:
        raise AssertionError(f"{table}.{col}: {bad} rows are negative")
    return 0

def save_df(df: pd.DataFrame, filename: str):
    out = QA_OUT_DIR / filename
    df.to_csv(out, index=False)
    print("Wrote:", out)

## Dimension QA validation

I validate dimension tables (state, month, sector, and optional fuel group) for expected row counts and key uniqueness. These checks ensure the star schema behaves correctly and prevents downstream duplication in fact joins.


In [45]:
qa_rows = []

# dim_state
n_states = assert_rowcount("mart.dim_state", min_rows=40, max_rows=60)
assert_unique_key("mart.dim_state", ["state_code"])
qa_rows.append(("mart.dim_state", "rowcount", n_states))

# dim_month
n_months = assert_rowcount("mart.dim_month", min_rows=12, max_rows=120)
assert_unique_key("mart.dim_month", ["month_date"])
qa_rows.append(("mart.dim_month", "rowcount", n_months))

# Check month coverage
month_bounds = qdf("""
SELECT MIN(month_date) AS min_month, MAX(month_date) AS max_month
FROM mart.dim_month;
""")
save_df(month_bounds, "dim_month_bounds.csv")
display(month_bounds)

# dim_retail_sector
n_sectors = assert_rowcount("mart.dim_retail_sector", min_rows=3, max_rows=10)
assert_unique_key("mart.dim_retail_sector", ["sector_code"])
qa_rows.append(("mart.dim_retail_sector", "rowcount", n_sectors))

# dim_fuel_group 
try:
    n_fuel_groups = assert_rowcount("mart.dim_fuel_group", min_rows=4, max_rows=20)
    assert_unique_key("mart.dim_fuel_group", ["fuel_group"])
    qa_rows.append(("mart.dim_fuel_group", "rowcount", n_fuel_groups))
except Exception as e:
    print("Note: mart.dim_fuel_group check skipped:", e)

qa_summary = pd.DataFrame(qa_rows, columns=["table", "metric", "value"])
save_df(qa_summary, "qa_dim_summary.csv")
display(qa_summary)


Wrote: reports\qa\checkpoint_20260110_174056\dim_month_bounds.csv


Unnamed: 0,min_month,max_month
0,2023-01-01,2025-10-01


Note: mart.dim_fuel_group check skipped: (psycopg.errors.UndefinedTable) relation "mart.dim_fuel_group" does not exist
LINE 1: SELECT COUNT(*) FROM mart.dim_fuel_group;
                             ^
[SQL: SELECT COUNT(*) FROM mart.dim_fuel_group;]
(Background on this error at: https://sqlalche.me/e/20/f405)
Wrote: reports\qa\checkpoint_20260110_174056\qa_dim_summary.csv


Unnamed: 0,table,metric,value
0,mart.dim_state,rowcount,51
1,mart.dim_month,rowcount,34
2,mart.dim_retail_sector,rowcount,6


## Fact QA validation

I validate core fact tables for row counts, uniqueness at the intended grain, and numeric constraints (e.g., non-negative sales/revenue/customers). This confirms that my curated measures can safely support modeling without structural join artifacts.


In [46]:
qa_fact = []

# fact_retail_price
n_fr = assert_rowcount("mart.fact_retail_price", min_rows=500)
assert_unique_key("mart.fact_retail_price", ["state_sk", "month_sk", "sector_sk"])

# sanity checks
assert_nonnegative("mart.fact_retail_price", "price_cents_kwh")
assert_nonnegative("mart.fact_retail_price", "sales_mkwh")
assert_nonnegative("mart.fact_retail_price", "revenue_musd")
assert_nonnegative("mart.fact_retail_price", "customers")

# null thresholds
qa_fact.append(("mart.fact_retail_price", "rowcount", n_fr))
qa_fact.append(("mart.fact_retail_price.price_cents_kwh", "null_pct", assert_null_rate("mart.fact_retail_price", "price_cents_kwh", max_null_pct=20.0)))
qa_fact.append(("mart.fact_retail_price.sales_mkwh", "null_pct", assert_null_rate("mart.fact_retail_price", "sales_mkwh", max_null_pct=20.0)))

# fact_dc_state
n_dc = assert_rowcount("mart.fact_dc_state", min_rows=1, max_rows=100)
assert_unique_key("mart.fact_dc_state", ["state_sk"])
assert_nonnegative("mart.fact_dc_state", "dc_count")
assert_nonnegative("mart.fact_dc_state", "total_mw")
assert_nonnegative("mart.fact_dc_state", "total_sqft")
qa_fact.append(("mart.fact_dc_state", "rowcount", n_dc))

qa_fact_df = pd.DataFrame(qa_fact, columns=["item", "metric", "value"])
save_df(qa_fact_df, "qa_fact_summary.csv")
display(qa_fact_df)


Wrote: reports\qa\checkpoint_20260110_174056\qa_fact_summary.csv


Unnamed: 0,item,metric,value
0,mart.fact_retail_price,rowcount,10404.0
1,mart.fact_retail_price.price_cents_kwh,null_pct,16.666667
2,mart.fact_retail_price.sales_mkwh,null_pct,16.666667
3,mart.fact_dc_state,rowcount,30.0


## Control-variable QA (generation mix shares)

I validate that derived generation mix shares are well-formed (bounded between 0 and 1) and that the controls table has sufficient coverage. This protects the regression stage from subtle math errors that can occur during aggregation.


In [47]:
n_controls = assert_rowcount("stage.ops_state_month_controls", min_rows=500)

# shares should be between 0 and 1
share_checks = qdf("""
SELECT
  100.0 * AVG(CASE WHEN share_coal   IS NULL THEN 1 ELSE 0 END) AS null_share_coal_pct,
  100.0 * AVG(CASE WHEN share_gas    IS NULL THEN 1 ELSE 0 END) AS null_share_gas_pct,
  100.0 * AVG(CASE WHEN share_nuclear IS NULL THEN 1 ELSE 0 END) AS null_share_nuclear_pct,
  100.0 * AVG(CASE WHEN share_renew  IS NULL THEN 1 ELSE 0 END) AS null_share_renew_pct,
  SUM(CASE WHEN share_coal   < 0 OR share_coal   > 1 THEN 1 ELSE 0 END) AS out_of_range_coal,
  SUM(CASE WHEN share_gas    < 0 OR share_gas    > 1 THEN 1 ELSE 0 END) AS out_of_range_gas,
  SUM(CASE WHEN share_nuclear < 0 OR share_nuclear > 1 THEN 1 ELSE 0 END) AS out_of_range_nuclear,
  SUM(CASE WHEN share_renew  < 0 OR share_renew  > 1 THEN 1 ELSE 0 END) AS out_of_range_renew
FROM stage.ops_state_month_controls;
""")
save_df(share_checks, "qa_controls_share_checks.csv")
display(share_checks)

# Assert no out-of-range shares
for col in ["out_of_range_coal","out_of_range_gas","out_of_range_nuclear","out_of_range_renew"]:
    if int(share_checks.at[0, col]) > 0:
        raise AssertionError(f"stage.ops_state_month_controls: {col} = {share_checks.at[0, col]}")

print("✅ Controls shares look valid. Rows:", n_controls)


Wrote: reports\qa\checkpoint_20260110_174056\qa_controls_share_checks.csv


Unnamed: 0,null_share_coal_pct,null_share_gas_pct,null_share_nuclear_pct,null_share_renew_pct,out_of_range_coal,out_of_range_gas,out_of_range_nuclear,out_of_range_renew
0,3.633218,3.633218,3.633218,3.633218,0,0,0,0


✅ Controls shares look valid. Rows: 1734


## Analysis view QA (panel structure)

I validate that the analysis view has the expected state-month uniqueness and sufficient coverage to behave like a proper panel dataset. I also check null thresholds on key modeling fields so the regression dataset remains stable and defensible.


In [48]:
# 1) MV exists and has rows
n_mv = assert_rowcount("mart.mv_analysis_state_month", min_rows=200)

# 2) Uniqueness at analysis grain
assert_unique_key("mart.mv_analysis_state_month", ["state_code", "month_date"])

# 3) Column null thresholds for modeling fields
assert_null_rate("mart.mv_analysis_state_month", "res_price_cents_kwh", max_null_pct=5.0)

# Exposure: total_mw may be null if no DC rows exist in fact_dc_state for that state
null_total_mw_pct = scalar("""
SELECT 100.0 * AVG(CASE WHEN total_mw IS NULL THEN 1 ELSE 0 END)
FROM mart.mv_analysis_state_month;
""")
print("total_mw null%:", float(null_total_mw_pct))

# Controls: allow some null% (EIA ops might be missing in rare rows)
for c in ["generation_mwh_total", "fuel_cost_proxy", "share_gas", "share_coal"]:
    pct = scalar(f"""
    SELECT 100.0 * AVG(CASE WHEN {c} IS NULL THEN 1 ELSE 0 END)
    FROM mart.mv_analysis_state_month;
    """)
    print(f"{c} null%:", float(pct))

# 4) Coverage vs expected state-month panel (diagnostic)
expected_panel = n_states * n_months

coverage = qdf("""
SELECT
  COUNT(*) AS actual_rows,
  COUNT(DISTINCT state_code) AS states_present,
  COUNT(DISTINCT month_date) AS months_present
FROM mart.mv_analysis_state_month;
""")
coverage["expected_state_month"] = expected_panel
coverage["coverage_pct"] = 100.0 * coverage["actual_rows"] / expected_panel
save_df(coverage, "qa_mv_coverage.csv")
display(coverage)

print("✅ MV analysis rowcount:", n_mv)


total_mw null%: 41.1764705882353
generation_mwh_total null%: 0.0
fuel_cost_proxy null%: 0.0
share_gas null%: 3.633217993079585
share_coal null%: 3.633217993079585
Wrote: reports\qa\checkpoint_20260110_174056\qa_mv_coverage.csv


Unnamed: 0,actual_rows,states_present,months_present,expected_state_month,coverage_pct
0,1734,51,34,1734,100.0


✅ MV analysis rowcount: 1734


## Create a model-ready dataset snapshot

I pull the full analysis dataset from the materialized view, coerce numeric fields, and write a timestamped parquet snapshot. This locks the exact modeling input for the run and makes the regression step reproducible even if I later refresh the mart.


In [49]:
df_model = qdf("""
SELECT *
FROM mart.mv_analysis_state_month
ORDER BY state_code, month_date;
""")

numeric_cols = [
    "res_price_cents_kwh","res_sales_mkwh","res_customers",
    "dc_count","total_mw","total_sqft","avg_mw","mw_per_100k_sqft",
    "generation_mwh_total","share_coal","share_gas","share_nuclear","share_renew",
    "fuel_cost_proxy","dc_mw_per_res_mkwh","ln_total_mw"
]
for c in numeric_cols:
    if c in df_model.columns:
        df_model[c] = pd.to_numeric(df_model[c], errors="coerce")

out_path = Path("data") / "interim" / f"mv_analysis_state_month_{REPORT_RUN_TS}.parquet"
out_path.parent.mkdir(parents=True, exist_ok=True)
df_model.to_parquet(out_path, index=False)
print("Wrote:", out_path, "shape:", df_model.shape)


Wrote: data\interim\mv_analysis_state_month_20260110_174056.parquet shape: (1734, 18)


## Regression run orchestration and provenance

I set up a run-specific output folder structure (tables/figures/text/data/logs) and write a run manifest that captures timestamp and (when available) the current git commit SHA. This makes the statistical results traceable to the exact code + dataset snapshot used.


In [50]:
logger = logging.getLogger("capstone.regression")
if not logger.handlers:
    logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")

# --- Paths (run-isolated) ---
PROJECT_ROOT = Path.cwd()
OUT_ROOT = PROJECT_ROOT / "outputs" / "regression"
RUN_ID = datetime.now().strftime("%Y%m%d_%H%M%S")
RUN_DIR = OUT_ROOT / f"run_{RUN_ID}"

DIRS = {
    "tables": RUN_DIR / "tables",
    "figures": RUN_DIR / "figures",
    "text": RUN_DIR / "text",
    "data": RUN_DIR / "data",
    "logs": RUN_DIR / "logs",
}
for p in DIRS.values():
    p.mkdir(parents=True, exist_ok=True)

logger.info(f"Regression run directory: {RUN_DIR}")

# --- DB engine expectation ---
try:
    engine  # noqa: F821
except NameError as e:
    raise RuntimeError(
        "SQLAlchemy `engine` is not defined yet. Ensure your earlier DB setup cell ran "
        "(the one that creates `engine = sa.create_engine(...)`)."
    ) from e

def assert_true(cond: bool, msg: str) -> None:
    if not cond:
        raise AssertionError(msg)

def safe_write_text(path: Path, content: str) -> None:
    path.write_text(content, encoding="utf-8")

def safe_to_parquet(df: pd.DataFrame, path: Path) -> Path:
    """Write parquet if available; otherwise fall back to CSV and return the written path."""
    try:
        df.to_parquet(path, index=False)
        return path
    except Exception:
        csv_path = path.with_suffix(".csv")
        df.to_csv(csv_path, index=False)
        return csv_path

def git_head_sha() -> str | None:
    try:
        sha = subprocess.check_output(["git", "rev-parse", "HEAD"], cwd=PROJECT_ROOT, text=True).strip()
        return sha
    except Exception:
        return None

manifest = {
    "run_id": RUN_ID,
    "created_at": datetime.now().isoformat(timespec="seconds"),
    "project_root": str(PROJECT_ROOT),
    "git_sha": git_head_sha(),
}
safe_write_text(DIRS["text"] / "run_manifest.json", json.dumps(manifest, indent=2))

2026-01-10 17:40:58,945 | INFO | Regression run directory: C:\Users\amand\OneDrive\Documents\School\Assignments\WGU\Data-Engineering\Capstone\datacenter-electricity-prices-us\notebooks\outputs\regression\run_20260110_174058


## Optional mart refresh control

I include an explicit switch to refresh the mart/materialized view from SQL using psql when needed. This gives me a clean separation between “data build” and “modeling run,” while keeping refresh behavior intentional rather than implicit.


In [51]:
REFRESH_MART = False  
SQL_DIR = PROJECT_ROOT / "sql"
sql_files = [
    SQL_DIR / "09_mart__load_dimensions.sql",
    SQL_DIR / "10_mart__load_facts.sql",
    SQL_DIR / "11_mart__mv_analysis_refresh.sql",
]

def run_psql_file(path: Path) -> None:
    """
    Executes a SQL file with psql, stopping on first error.
    Uses PG* env vars (do not print secrets).
    """
    assert_true(path.exists(), f"SQL file not found: {path}")

    cmd = [
        "psql",
        "-h", PGHOST,
        "-p", str(PGPORT),
        "-U", PGUSER,
        "-d", PGDATABASE,
        "-v", "ON_ERROR_STOP=1",
        "-f", str(path),
    ]
    env = os.environ.copy()
    env["PGPASSWORD"] = PGPASSWORD

    logger.info(f"psql -f {path.name}")
    proc = subprocess.run(cmd, env=env, capture_output=True, text=True)
    if proc.returncode != 0:
        raise RuntimeError(f"psql failed for {path.name}:\n{proc.stderr}")
    if proc.stderr.strip():
        logger.warning(proc.stderr.strip())

if REFRESH_MART:
    for f in sql_files:
        run_psql_file(f)
    logger.info("✅ Mart load + MV refresh complete.")
else:
    logger.info("REFRESH_MART=False → skipping mart refresh.")

2026-01-10 17:40:59,166 | INFO | REFRESH_MART=False → skipping mart refresh.


## Load regression input from the curated mart

I query the analysis view for the specific modeling fields I need and write a run-local parquet snapshot. I also include a defensive fallback for the exposure measure to ensure the regression can proceed even if the view structure evolves.


In [52]:
# =========================
# Pull model-ready dataset from mart.mv_analysis_state_month
# =========================

QUERY = """
SELECT
  state_code,
  month_date,
  ym,
  res_price_cents_kwh,
  res_sales_mkwh,
  res_customers,
  dc_count,
  total_mw,
  total_sqft,
  avg_mw,
  mw_per_100k_sqft,
  generation_mwh_total,
  share_coal,
  share_gas,
  share_nuclear,
  share_renew,
  fuel_cost_proxy,
  dc_mw_per_res_mkwh
FROM mart.mv_analysis_state_month
ORDER BY state_code, month_date;
"""

df = pd.read_sql_query(text(QUERY), engine, parse_dates=["month_date"])

# Robust fallback if MV ever lacks exposure
if "dc_mw_per_res_mkwh" not in df.columns and {"total_mw", "res_sales_mkwh"}.issubset(df.columns):
    df["dc_mw_per_res_mkwh"] = np.where(
        (df["res_sales_mkwh"].isna()) | (df["res_sales_mkwh"] == 0),
        np.nan,
        df["total_mw"] / df["res_sales_mkwh"]
    )

written = safe_to_parquet(df, DIRS["data"] / "mv_analysis_state_month.parquet")
logger.info(f"Wrote dataset snapshot: {written}")
logger.info(f"mv_analysis_state_month shape: {df.shape}")
df.head()


2026-01-10 17:40:59,334 | INFO | Wrote dataset snapshot: C:\Users\amand\OneDrive\Documents\School\Assignments\WGU\Data-Engineering\Capstone\datacenter-electricity-prices-us\notebooks\outputs\regression\run_20260110_174058\data\mv_analysis_state_month.parquet
2026-01-10 17:40:59,336 | INFO | mv_analysis_state_month shape: (1734, 18)


Unnamed: 0,state_code,month_date,ym,res_price_cents_kwh,res_sales_mkwh,res_customers,dc_count,total_mw,total_sqft,avg_mw,mw_per_100k_sqft,generation_mwh_total,share_coal,share_gas,share_nuclear,share_renew,fuel_cost_proxy,dc_mw_per_res_mkwh
0,AK,2023-01-01,2023-01,21.73,230.4979,295151.0,,,,,,569121.67,0.069071,0.514055,0.0,0.264194,0.426905,0.0
1,AK,2023-02-01,2023-02,22.92,178.92844,294835.0,,,,,,517944.72,0.044791,0.54009,0.0,0.286934,0.826461,0.0
2,AK,2023-03-01,2023-03,23.45,196.24384,295298.0,,,,,,533502.11,0.087022,0.526931,0.0,0.270481,0.825519,0.0
3,AK,2023-04-01,2023-04,23.44,166.71102,294846.0,,,,,,487187.82,0.07907,0.524473,0.0,0.240792,0.826277,0.0
4,AK,2023-05-01,2023-05,24.91,151.96408,295873.0,,,,,,441835.35,0.081966,0.488562,0.0,0.259393,0.762815,0.0


## Regression dataset QA

Before modeling, I enforce required columns, confirm uniqueness at the state-month grain, and apply null-rate thresholds for the outcome, exposure, and controls. This checkpoint ensures the regression results reflect data issues transparently rather than masking them in the estimator.


In [None]:
# =========================
# Checkpoint QA: grain, uniqueness, null thresholds, and domain checks
# =========================

required_cols = [
    "state_code", "month_date",
    "res_price_cents_kwh",
    "dc_mw_per_res_mkwh",
    "generation_mwh_total",
    "share_coal", "share_gas", "share_nuclear",
    "fuel_cost_proxy",
]
missing = [c for c in required_cols if c not in df.columns]
assert_true(not missing, f"Missing required columns in MV: {missing}")

dup = int(df.duplicated(subset=["state_code", "month_date"]).sum())
assert_true(dup == 0, f"Duplicate (state_code, month_date) rows in MV: {dup}")

assert_true(df["state_code"].astype(str).str.fullmatch(r"[A-Z]{2}").all(), "state_code invalid.")
assert_true(pd.to_datetime(df["month_date"], errors="coerce").notna().all(), "month_date unparsable.")
assert_true((df["res_price_cents_kwh"].dropna() > 0).all(), "Found non-positive prices.")

for c in ["share_coal", "share_gas", "share_nuclear", "share_renew"]:
    s = df[c].dropna()
    if len(s):
        bad = ((s < 0) | (s > 1)).sum()
        assert_true(bad == 0, f"{c} has {bad} values outside [0,1].")

null_rates = df.isna().mean().sort_values(ascending=False)
null_rates.to_csv(DIRS["tables"] / "qa_null_rates.csv")

NULL_THRESH = {
    "res_price_cents_kwh": 0.05,
    "dc_mw_per_res_mkwh": 0.25,
    "generation_mwh_total": 0.10,
    "fuel_cost_proxy": 0.25,
    "share_coal": 0.25,
    "share_gas": 0.25,
    "share_nuclear": 0.25,
}

for col, thresh in NULL_THRESH.items():
    rate = float(df[col].isna().mean())
    assert_true(rate <= thresh, f"Null rate too high for {col}: {rate:.2%} > {thresh:.2%}")

qa_summary = {
    "rows": int(df.shape[0]),
    "cols": int(df.shape[1]),
    "states": int(df["state_code"].nunique()),
    "months": int(df["month_date"].nunique()),
    "null_thresholds": NULL_THRESH,
    "null_rates": {k: float(df[k].isna().mean()) for k in NULL_THRESH},
}
safe_write_text(DIRS["text"] / "qa_summary.json", json.dumps(qa_summary, indent=2))

logger.info("✅ QA checks passed.")


## Descriptive summary for the analysis story (Table 1)

I produce a Table 1 style descriptive summary of the model variables, both overall and split by whether a state has any operating data centers. This sets context for the regression by showing baseline distributions and helps readers interpret effect sizes.


In [54]:
# =========================
# Table 1: model-ready descriptive summary (overall + by DC presence)
# =========================

outcome = "res_price_cents_kwh"
exposure = "dc_mw_per_res_mkwh"

df1 = df.copy()
df1["ln_generation_mwh_total"] = np.log1p(df1["generation_mwh_total"])
df1["has_dc_state"] = (df1["dc_count"].fillna(0) > 0).astype(int)

model_vars = [
    outcome,
    exposure,
    "ln_generation_mwh_total",
    "share_coal", "share_gas", "share_nuclear",
    "fuel_cost_proxy",
]

def _summarize_series(x: pd.Series) -> pd.Series:
    return pd.Series({
        "n": int(x.notna().sum()),
        "pct_missing": float(100.0 * x.isna().mean()),
        "mean": float(x.mean()) if x.notna().any() else np.nan,
        "std": float(x.std()) if x.notna().any() else np.nan,
        "min": float(x.min()) if x.notna().any() else np.nan,
        "p25": float(x.quantile(0.25)) if x.notna().any() else np.nan,
        "median": float(x.median()) if x.notna().any() else np.nan,
        "p75": float(x.quantile(0.75)) if x.notna().any() else np.nan,
        "max": float(x.max()) if x.notna().any() else np.nan,
    })

def build_table1(dfin: pd.DataFrame, vars_: list[str], group_col: str) -> pd.DataFrame:
    blocks = []
    overall = pd.concat({v: _summarize_series(dfin[v]) for v in vars_}, axis=1).T
    overall.insert(0, "group", "Overall")
    blocks.append(overall)

    for gval, gname in [(0, "No DC in state"), (1, "Has DC in state")]:
        sub = dfin[dfin[group_col] == gval]
        tab = pd.concat({v: _summarize_series(sub[v]) for v in vars_}, axis=1).T
        tab.insert(0, "group", gname)
        blocks.append(tab)

    out = pd.concat(blocks, axis=0)
    out.insert(0, "variable", out.index)
    out.reset_index(drop=True, inplace=True)
    return out

table1 = build_table1(df1, model_vars, "has_dc_state")
table1_path = DIRS["tables"] / "table1_model_ready.csv"
table1.to_csv(table1_path, index=False)
logger.info(f"Wrote Table 1: {table1_path}")

table1.head(12)


2026-01-10 17:40:59,570 | INFO | Wrote Table 1: C:\Users\amand\OneDrive\Documents\School\Assignments\WGU\Data-Engineering\Capstone\datacenter-electricity-prices-us\notebooks\outputs\regression\run_20260110_174058\tables\table1_model_ready.csv


Unnamed: 0,variable,group,n,pct_missing,mean,std,min,p25,median,p75,max
0,res_price_cents_kwh,Overall,1734.0,0.0,17.244769,6.44207,9.27,13.1525,14.91,18.3975,45.39
1,dc_mw_per_res_mkwh,Overall,1734.0,0.0,0.167903,0.459301,0.0,0.0,0.009027,0.114525,4.101417
2,ln_generation_mwh_total,Overall,1734.0,0.0,13.392357,3.726264,0.0,13.085443,14.895369,15.562025,17.078796
3,share_coal,Overall,1671.0,3.633218,0.220587,0.258375,0.0,0.0,0.127965,0.358885,0.981543
4,share_gas,Overall,1671.0,3.633218,0.357086,0.268646,0.0,0.131545,0.327905,0.527069,0.999086
5,share_nuclear,Overall,1671.0,3.633218,0.110718,0.167583,0.0,0.0,0.0,0.226358,0.638601
6,fuel_cost_proxy,Overall,1734.0,0.0,-0.010775,0.512534,-1.164549,-0.19673,-0.003175,0.11112,11.39013
7,res_price_cents_kwh,No DC in state,714.0,0.0,18.856611,7.964229,9.75,13.05,15.5,23.9175,45.39
8,dc_mw_per_res_mkwh,No DC in state,714.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
9,ln_generation_mwh_total,No DC in state,714.0,0.0,11.49356,4.79428,0.0,9.132993,13.55082,14.839594,17.078796


## Prepare panel structure for fixed-effects regression

I construct the panel dataset, clean numeric fields, drop rows missing required modeling variables, and winsorize the exposure to reduce sensitivity to extreme values. I then set a MultiIndex (state, month) so fixed effects can be absorbed cleanly by the estimator.


In [55]:
# =========================
# Prepare panel dataset (MultiIndex) + mild winsorization of exposure
# =========================

reg = df1[["state_code", "month_date"] + model_vars].copy()

for c in model_vars:
    reg[c] = pd.to_numeric(reg[c], errors="coerce")
reg = reg.replace([np.inf, -np.inf], np.nan)

reg = reg.dropna(subset=[outcome])

def winsorize(s: pd.Series, p: float = 0.01) -> pd.Series:
    lo = s.quantile(p)
    hi = s.quantile(1 - p)
    return s.clip(lower=lo, upper=hi)

reg[exposure + "_w"] = winsorize(reg[exposure], p=0.01)

needed = [exposure + "_w", "ln_generation_mwh_total", "share_coal", "share_gas", "share_nuclear", "fuel_cost_proxy"]
before = len(reg)
reg = reg.dropna(subset=needed)
logger.info(f"Rows kept for regression: {len(reg):,} / {before:,}")

within_sd = reg.groupby("state_code")[exposure + "_w"].std()
assert_true((within_sd.fillna(0) > 0).any(), "Exposure constant within every state → will be absorbed by FE.")

reg = reg.set_index(["state_code", "month_date"]).sort_index()

written = safe_to_parquet(reg.reset_index(), DIRS["data"] / "panel_regression_dataset.parquet")
logger.info(f"Wrote modeling dataset snapshot: {written}")

reg.head()


2026-01-10 17:40:59,634 | INFO | Rows kept for regression: 1,671 / 1,734
2026-01-10 17:40:59,684 | INFO | Wrote modeling dataset snapshot: C:\Users\amand\OneDrive\Documents\School\Assignments\WGU\Data-Engineering\Capstone\datacenter-electricity-prices-us\notebooks\outputs\regression\run_20260110_174058\data\panel_regression_dataset.parquet


Unnamed: 0_level_0,Unnamed: 1_level_0,res_price_cents_kwh,dc_mw_per_res_mkwh,ln_generation_mwh_total,share_coal,share_gas,share_nuclear,fuel_cost_proxy,dc_mw_per_res_mkwh_w
state_code,month_date,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
AK,2023-01-01,21.73,0.0,13.251851,0.069071,0.514055,0.0,0.426905,0.0
AK,2023-02-01,22.92,0.0,13.157626,0.044791,0.54009,0.0,0.826461,0.0
AK,2023-03-01,23.45,0.0,13.18722,0.087022,0.526931,0.0,0.825519,0.0
AK,2023-04-01,23.44,0.0,13.096407,0.07907,0.524473,0.0,0.826277,0.0
AK,2023-05-01,24.91,0.0,12.998695,0.081966,0.488562,0.0,0.762815,0.0


## Primary model: two-way fixed effects panel regression

I fit a PanelOLS model with state fixed effects and month fixed effects, using robust clustering as my primary standard-error specification. I export a clean coefficient table (with confidence intervals) so results can be reported consistently and reused in the automated PDF output.


In [56]:
# =========================
# Fit PanelOLS: state FE + month FE (primary: 2-way clustered SE)
# =========================

formula = (
    f"{outcome} ~ 1 "
    f"+ {exposure}_w "
    "+ ln_generation_mwh_total "
    "+ share_coal + share_gas + share_nuclear "
    "+ fuel_cost_proxy "
    "+ EntityEffects + TimeEffects"
)

logger.info(f"PanelOLS formula: {formula}")

mod = PanelOLS.from_formula(
    formula,
    data=reg,
    drop_absorbed=True,
    check_rank=True,
)

res_cluster = mod.fit(cov_type="clustered", cluster_entity=True, cluster_time=True)

print(res_cluster.summary)
safe_write_text(DIRS["text"] / "panelols_summary_clustered.txt", str(res_cluster.summary))

try:
    res_kernel = mod.fit(cov_type="kernel", cov_config={"kernel": "bartlett", "bandwidth": 6})
    safe_write_text(DIRS["text"] / "panelols_summary_kernel_bartlett_bw6.txt", str(res_kernel.summary))
except Exception as e:
    res_kernel = None
    logger.warning(f"Kernel covariance fit failed (skipping): {e}")

coef_tbl = pd.DataFrame({
    "param": res_cluster.params.index,
    "coef": res_cluster.params.astype(float).values,
    "std_err": res_cluster.std_errors.astype(float).values,
    "t_stat": res_cluster.tstats.astype(float).values,
    "p_value": res_cluster.pvalues.astype(float).values,
})
ci = res_cluster.conf_int().astype(float)
coef_tbl["ci_95_low"] = ci.iloc[:, 0].values
coef_tbl["ci_95_high"] = ci.iloc[:, 1].values

coef_path = DIRS["tables"] / "panelols_coefficients_clustered.csv"
coef_tbl.to_csv(coef_path, index=False)
logger.info(f"Wrote coefficients: {coef_path}")

coef_tbl.head(12)


2026-01-10 17:40:59,727 | INFO | PanelOLS formula: res_price_cents_kwh ~ 1 + dc_mw_per_res_mkwh_w + ln_generation_mwh_total + share_coal + share_gas + share_nuclear + fuel_cost_proxy + EntityEffects + TimeEffects
2026-01-10 17:41:00,210 | INFO | Wrote coefficients: C:\Users\amand\OneDrive\Documents\School\Assignments\WGU\Data-Engineering\Capstone\datacenter-electricity-prices-us\notebooks\outputs\regression\run_20260110_174058\tables\panelols_coefficients_clustered.csv


                           PanelOLS Estimation Summary                           
Dep. Variable:     res_price_cents_kwh   R-squared:                        0.0209
Estimator:                    PanelOLS   R-squared (Between):             -0.1883
No. Observations:                 1671   R-squared (Within):               0.0087
Date:                 Sat, Jan 10 2026   R-squared (Overall):             -0.1356
Time:                         17:40:59   Log-likelihood                   -2451.9
Cov. Estimator:              Clustered                                           
                                         F-statistic:                      5.6315
Entities:                           51   P-value                           0.0000
Avg Obs:                        32.765   Distribution:                  F(6,1581)
Min Obs:                        6.0000                                           
Max Obs:                        34.000   F-statistic (robust):             1.1335
                

Unnamed: 0,param,coef,std_err,t_stat,p_value,ci_95_low,ci_95_high
0,Intercept,12.453402,3.275539,3.80194,0.000149,6.028546,18.878259
1,dc_mw_per_res_mkwh_w,0.867527,0.540843,1.604028,0.108908,-0.193317,1.928371
2,ln_generation_mwh_total,0.351158,0.258651,1.35765,0.174769,-0.156178,0.858494
3,share_coal,-0.422601,1.446734,-0.292107,0.770243,-3.260321,2.415118
4,share_gas,-1.027313,0.93354,-1.100449,0.271304,-2.85842,0.803794
5,share_nuclear,-0.995976,1.113411,-0.894527,0.371176,-3.179893,1.187941
6,fuel_cost_proxy,0.149625,0.158596,0.943438,0.345601,-0.161455,0.460706


## Hypothesis test and interpretable effect size

I evaluate the one-sided hypothesis on the exposure coefficient (β > 0) and record the decision at my chosen alpha level. To make the result actionable, I translate the coefficient into an interpretable scenario (e.g., moving from the 25th to 75th percentile exposure and computing the implied change in cents/kWh with a 95% CI).


In [57]:
# =========================
# Automated hypothesis test (one-sided) + interpretable effect size
# =========================

ALPHA = 0.05
param = f"{exposure}_w"
assert_true(param in res_cluster.params.index, f"Exposure param '{param}' not found.")

beta = float(res_cluster.params[param])
se = float(res_cluster.std_errors[param])
t = float(res_cluster.tstats[param])
p_two = float(res_cluster.pvalues[param])

p_one = (p_two / 2.0) if beta >= 0 else (1.0 - p_two / 2.0)
reject = bool((beta > 0) and (p_one < ALPHA))

x = reg[param].dropna()
p25, p75 = float(x.quantile(0.25)), float(x.quantile(0.75))
delta_x = p75 - p25
delta_price = beta * delta_x

ci = res_cluster.conf_int().loc[param].astype(float)
beta_lo, beta_hi = float(ci.iloc[0]), float(ci.iloc[1])
delta_lo, delta_hi = beta_lo * delta_x, beta_hi * delta_x

hyp = {
    "alpha": ALPHA,
    "hypothesis": "H0: beta <= 0 vs H1: beta > 0",
    "param": param,
    "beta": beta,
    "std_error": se,
    "t_stat": t,
    "p_two_sided": p_two,
    "p_one_sided_beta_gt_0": p_one,
    "reject_H0": reject,
    "nobs": int(res_cluster.nobs),
    "states": int(reg.reset_index()["state_code"].nunique()),
    "months": int(reg.reset_index()["month_date"].nunique()),
    "exposure_p25": p25,
    "exposure_p75": p75,
    "exposure_delta_p75_minus_p25": delta_x,
    "implied_delta_price_cents_per_kwh": delta_price,
    "implied_delta_price_ci95_low": delta_lo,
    "implied_delta_price_ci95_high": delta_hi,
}

safe_write_text(DIRS["tables"] / "hypothesis_test.json", json.dumps(hyp, indent=2))

logger.info(f"Decision: {'REJECT' if reject else 'FAIL TO REJECT'} | p_one={p_one:.4g} | beta={beta:.6g}")
hyp


2026-01-10 17:41:00,262 | INFO | Decision: FAIL TO REJECT | p_one=0.05445 | beta=0.867527


{'alpha': 0.05,
 'hypothesis': 'H0: beta <= 0 vs H1: beta > 0',
 'param': 'dc_mw_per_res_mkwh_w',
 'beta': 0.8675269341103441,
 'std_error': 0.5408426001594229,
 't_stat': 1.604028480475882,
 'p_two_sided': 0.1089075064893541,
 'p_one_sided_beta_gt_0': 0.05445375324467705,
 'reject_H0': False,
 'nobs': 1671,
 'states': 51,
 'months': 34,
 'exposure_p25': 0.0,
 'exposure_p75': 0.11924634016614011,
 'exposure_delta_p75_minus_p25': 0.11924634016614011,
 'implied_delta_price_cents_per_kwh': 0.1034494118882107,
 'implied_delta_price_ci95_low': -0.023052371250940774,
 'implied_delta_price_ci95_high': 0.2299511950273622}

## Residual diagnostics (quantitative checks)

I compute fitted values and residuals and summarize residual behavior numerically, including lag-1 autocorrelation patterns across states. This gives me quick evidence about whether error structure might motivate clustered or kernel/HAC-style standard errors.


In [58]:
# =========================
# Residual diagnostics (figures + numeric summaries)
# =========================

fitted = res_cluster.fitted_values
resid = res_cluster.resids

fitted_s = fitted.iloc[:, 0] if isinstance(fitted, pd.DataFrame) else fitted
resid_s = resid.iloc[:, 0] if isinstance(resid, pd.DataFrame) else resid

diag = pd.DataFrame({"fitted": fitted_s.astype(float), "resid": resid_s.astype(float)}).reset_index()
diag = diag.rename(columns={"entity": "state_code", "time": "month_date"})

actual = reg[[outcome]].reset_index().rename(columns={outcome: "actual"})
diag = diag.merge(actual, on=["state_code", "month_date"], how="left")

written = safe_to_parquet(diag, DIRS["data"] / "diagnostics_fitted_resid.parquet")
logger.info(f"Wrote diagnostics dataset: {written}")

plt.figure()
plt.hist(diag["resid"].dropna(), bins=40)
plt.title("Residual distribution (PanelOLS)")
plt.tight_layout()
plt.savefig(DIRS["figures"] / "residual_hist.png", dpi=200)
plt.show()

plt.figure()
stats.probplot(diag["resid"].dropna(), dist="norm", plot=plt)
plt.title("QQ plot of residuals")
plt.tight_layout()
plt.savefig(DIRS["figures"] / "residual_qq.png", dpi=200)
plt.show()

plt.figure()
plt.scatter(diag["fitted"], diag["resid"], s=10)
plt.axhline(0, linewidth=1)
plt.title("Residuals vs fitted")
plt.tight_layout()
plt.savefig(DIRS["figures"] / "residual_vs_fitted.png", dpi=200)
plt.show()

by_month = diag.groupby("month_date", as_index=False)["resid"].mean().sort_values("month_date")
plt.figure()
plt.plot(by_month["month_date"], by_month["resid"])
plt.axhline(0, linewidth=1)
plt.title("Mean residual over time")
plt.tight_layout()
plt.savefig(DIRS["figures"] / "mean_residual_over_time.png", dpi=200)
plt.show()



monthly_mean_resid = by_month.set_index("month_date")["resid"].dropna()

if len(monthly_mean_resid) >= 3:
    lags = min(24, len(monthly_mean_resid) - 1)  # e.g., up to 2 years of monthly lags
    fig, ax = plt.subplots()
    plot_acf(monthly_mean_resid, lags=lags, zero=False, ax=ax)
    ax.axhline(0, linewidth=1)
    ax.set_title("ACF: monthly mean residuals")
    fig.tight_layout()
    fig.savefig(DIRS["figures"] / "acf_monthly_mean_residual.png", dpi=200)
    plt.show()
else:
    logger.warning("Skipping ACF plot: not enough monthly residual points.")


diag_sorted = diag.sort_values(["state_code", "month_date"]).copy()
diag_sorted["resid_lag1"] = diag_sorted.groupby("state_code")["resid"].shift(1)

def safe_corr(g: pd.DataFrame) -> float:
    g2 = g.dropna(subset=["resid", "resid_lag1"])
    if len(g2) < 3:
        return np.nan
    return float(np.corrcoef(g2["resid"], g2["resid_lag1"])[0, 1])

ac1 = diag_sorted.groupby("state_code").apply(safe_corr).dropna()
diag_numeric = {
    "resid_mean": float(diag["resid"].mean()),
    "resid_std": float(diag["resid"].std()),
    "lag1_autocorr_mean_across_states": float(ac1.mean()) if len(ac1) else None,
    "lag1_autocorr_median_across_states": float(ac1.median()) if len(ac1) else None,
    "n_states_with_lag1_corr": int(len(ac1)),
}
safe_write_text(DIRS["tables"] / "residual_diagnostics_numeric.json", json.dumps(diag_numeric, indent=2))

pd.Series(diag_numeric)


2026-01-10 17:41:00,327 | INFO | Wrote diagnostics dataset: C:\Users\amand\OneDrive\Documents\School\Assignments\WGU\Data-Engineering\Capstone\datacenter-electricity-prices-us\notebooks\outputs\regression\run_20260110_174058\data\diagnostics_fitted_resid.parquet


resid_mean                            1.103127e-14
resid_std                             1.049913e+00
lag1_autocorr_mean_across_states      6.706805e-01
lag1_autocorr_median_across_states    6.946309e-01
n_states_with_lag1_corr               5.100000e+01
dtype: float64

## Run narrative for reporting

I automatically generate a concise narrative summary of the hypothesis test result, key coefficient values, and the practical effect-size translation. Writing this to disk as an artifact ensures the final report language matches the exact run outputs.


In [59]:
# =========================
# Auto-generated narrative block
# =========================

def fmt(x, digits=4):
    if x is None:
        return "NA"
    try:
        xf = float(x)
        if np.isnan(xf):
            return "NA"
        return f"{xf:.{digits}f}"
    except Exception:
        return str(x)

decision_txt = "reject" if hyp["reject_H0"] else "fail to reject"

narrative = f"""
Regression & Hypothesis Testing (PanelOLS)

Model:
Two-way fixed effects panel regression with state fixed effects and month fixed effects.
Outcome: residential price (cents/kWh).
Exposure: data center load intensity (MW per million kWh of residential sales).
Controls: log(1+generation), fuel mix shares (renew omitted baseline), fuel cost proxy.

Estimation:
- PanelOLS with EntityEffects + TimeEffects
- SEs: two-way clustered by state and month
- N={hyp["nobs"]} across {hyp["states"]} states and {hyp["months"]} months

Result:
beta={fmt(hyp["beta"], 6)} (SE={fmt(hyp["std_error"], 6)})
two-sided p={fmt(hyp["p_two_sided"], 6)}
one-sided p={fmt(hyp["p_one_sided_beta_gt_0"], 6)} → {decision_txt} H0 at alpha={hyp["alpha"]}

Effect size (P25→P75 exposure):
Δx={fmt(hyp["exposure_delta_p75_minus_p25"], 6)} implies Δprice≈{fmt(hyp["implied_delta_price_cents_per_kwh"], 4)} cents/kWh
(95% CI≈[{fmt(hyp["implied_delta_price_ci95_low"], 4)}, {fmt(hyp["implied_delta_price_ci95_high"], 4)}])

Artifacts written to:
{RUN_DIR}
""".strip()

safe_write_text(DIRS["text"] / "results_narrative.txt", narrative)
print(narrative)


Regression & Hypothesis Testing (PanelOLS)

Model:
Two-way fixed effects panel regression with state fixed effects and month fixed effects.
Outcome: residential price (cents/kWh).
Exposure: data center load intensity (MW per million kWh of residential sales).
Controls: log(1+generation), fuel mix shares (renew omitted baseline), fuel cost proxy.

Estimation:
- PanelOLS with EntityEffects + TimeEffects
- SEs: two-way clustered by state and month
- N=1671 across 51 states and 34 months

Result:
beta=0.867527 (SE=0.540843)
two-sided p=0.108908
one-sided p=0.054454 → fail to reject H0 at alpha=0.05

Effect size (P25→P75 exposure):
Δx=0.119246 implies Δprice≈0.1034 cents/kWh
(95% CI≈[-0.0231, 0.2300])

Artifacts written to:
C:\Users\amand\OneDrive\Documents\School\Assignments\WGU\Data-Engineering\Capstone\datacenter-electricity-prices-us\notebooks\outputs\regression\run_20260110_174058


## Sensitivity checks (robustness of inference)

I re-fit the same model under multiple covariance specifications (robust, clustered, kernel/HAC-style) and export a comparison table. This is a targeted robustness step to show whether inference about the exposure coefficient is stable to reasonable assumptions about the error structure.


In [60]:
# =========================
# Sensitivity specs
# =========================

specs = [
    ("robust", dict(cov_type="robust")),
    ("cluster_state", dict(cov_type="clustered", cluster_entity=True, cluster_time=False)),
    ("cluster_state_month", dict(cov_type="clustered", cluster_entity=True, cluster_time=True)),
    ("kernel_bartlett_bw6", dict(cov_type="kernel", cov_config={"kernel": "bartlett", "bandwidth": 6})),
]

rows = []
for name, kwargs in specs:
    try:
        r = mod.fit(**kwargs)
        rows.append({
            "spec": name,
            "beta": float(r.params.get(f"{exposure}_w", np.nan)),
            "std_err": float(r.std_errors.get(f"{exposure}_w", np.nan)),
            "p_value_two_sided": float(r.pvalues.get(f"{exposure}_w", np.nan)),
            "nobs": int(r.nobs),
        })
    except Exception as e:
        rows.append({"spec": name, "beta": np.nan, "std_err": np.nan, "p_value_two_sided": np.nan, "nobs": np.nan, "error": str(e)})

sens = pd.DataFrame(rows)
sens_path = DIRS["tables"] / "panelols_sensitivity.csv"
sens.to_csv(sens_path, index=False)
logger.info(f"Wrote sensitivity table: {sens_path}")

sens


2026-01-10 17:41:03,394 | INFO | Wrote sensitivity table: C:\Users\amand\OneDrive\Documents\School\Assignments\WGU\Data-Engineering\Capstone\datacenter-electricity-prices-us\notebooks\outputs\regression\run_20260110_174058\tables\panelols_sensitivity.csv


Unnamed: 0,spec,beta,std_err,p_value_two_sided,nobs,error
0,robust,0.867527,0.230868,0.000178,1671.0,
1,cluster_state,0.867527,0.550125,0.115004,1671.0,
2,cluster_state_month,0.867527,0.540843,0.108908,1671.0,
3,kernel_bartlett_bw6,,,,,Covariance estimator DriscollKraay only suppor...


## Automated PDF report assembly (setup)

I locate the most recent regression run folder and define the paths to the tables, figures, text artifacts, and dataset snapshot. This design keeps reporting decoupled from modeling: the report is assembled from saved artifacts, not from in-memory state.


In [61]:
# =============================================================================
# PDF Builder 
# =============================================================================
_outputs_root = Path.cwd() / "outputs" / "regression"

def _pick_latest_run_dir(root: Path) -> Path:
    if not root.exists():
        raise FileNotFoundError(f"Outputs root not found: {root}")
    runs = sorted([p for p in root.glob("run_*") if p.is_dir()])
    if not runs:
        raise FileNotFoundError(
            f"No run_* folders found under: {root}. "
            "Run the regression section first to generate artifacts."
        )
    return runs[-1]

try:
    REPORT_RUN_DIR = Path(RUN_DIR)
except NameError:
    REPORT_RUN_DIR = _pick_latest_run_dir(_outputs_root)

REPORT_RUN_DIR = Path(REPORT_RUN_DIR).resolve()
if not REPORT_RUN_DIR.exists():
    raise FileNotFoundError(f"REPORT_RUN_DIR does not exist: {REPORT_RUN_DIR}")

REPORT_TABLES = REPORT_RUN_DIR / "tables"
REPORT_FIGS   = REPORT_RUN_DIR / "figures"
REPORT_TEXT   = REPORT_RUN_DIR / "text"
REPORT_DATA   = REPORT_RUN_DIR / "data"

# Ensure expected folders exist
for _p in [REPORT_TABLES, REPORT_FIGS, REPORT_TEXT, REPORT_DATA]:
    _p.mkdir(parents=True, exist_ok=True)

OUT_PDF = REPORT_RUN_DIR / "report_regression_results.pdf"

print("REPORT_RUN_DIR:", REPORT_RUN_DIR)
print("OUT_PDF:", OUT_PDF)


REPORT_RUN_DIR: C:\Users\amand\OneDrive\Documents\School\Assignments\WGU\Data-Engineering\Capstone\datacenter-electricity-prices-us\notebooks\outputs\regression\run_20260110_174058
OUT_PDF: C:\Users\amand\OneDrive\Documents\School\Assignments\WGU\Data-Engineering\Capstone\datacenter-electricity-prices-us\notebooks\outputs\regression\run_20260110_174058\report_regression_results.pdf


## Generate the final regression PDF (artifact-driven)

I compile the run artifacts into a polished PDF report: executive summary narrative, Table 1, coefficient table, model summary, and diagnostic figures. Building the report from saved tables/figures/text makes the output reproducible and aligns the final narrative with the exact results generated by the run.


In [62]:
def safe_read_text(path: Path, default: str = "") -> str:
    try:
        return path.read_text(encoding="utf-8")
    except Exception:
        return default

def safe_read_csv(path: Path) -> pd.DataFrame:
    try:
        return pd.read_csv(path)
    except Exception:
        return pd.DataFrame()

def fmt_num(x):
    if pd.isna(x):
        return ""
    if isinstance(x, (int, np.integer)):
        return f"{int(x):,}"
    if isinstance(x, (float, np.floating)):
        ax = abs(float(x))
        if ax >= 1000:
            return f"{float(x):,.2f}"
        if ax >= 10:
            return f"{float(x):.3f}"
        return f"{float(x):.4f}"
    return str(x)

def df_to_rl_table(df: pd.DataFrame, doc_width: float, max_rows: int = 60) -> Table:
    """Convert a DataFrame to a ReportLab Table with safe sizing."""
    if df.empty:
        return Table([["(No data found)"]])

    df2 = df.copy()

    # Trim long tables so the PDF stays readable
    if len(df2) > max_rows:
        df2 = df2.iloc[:max_rows].copy()
        df2.loc[len(df2)] = ["…"] + ["…"] * (df2.shape[1] - 1)

    data = [list(df2.columns)]
    for _, row in df2.iterrows():
        data.append([fmt_num(v) for v in row.values])

    ncols = len(df2.columns)
    col_widths = [doc_width / ncols] * ncols

    tbl = Table(data, repeatRows=1, colWidths=col_widths)
    tbl.hAlign = "LEFT"

    tbl.setStyle(TableStyle([
        ("FONTNAME", (0,0), (-1,0), "Helvetica-Bold"),
        ("FONTSIZE", (0,0), (-1,0), 8),
        ("BACKGROUND", (0,0), (-1,0), colors.HexColor("#f1f5f9")),
        ("TEXTCOLOR", (0,0), (-1,0), colors.HexColor("#0f172a")),
        ("LINEBELOW", (0,0), (-1,0), 1, colors.HexColor("#cbd5e1")),

        ("FONTNAME", (0,1), (-1,-1), "Helvetica"),
        ("FONTSIZE", (0,1), (-1,-1), 7),
        ("TEXTCOLOR", (0,1), (-1,-1), colors.HexColor("#111827")),

        ("GRID", (0,0), (-1,-1), 0.25, colors.HexColor("#e2e8f0")),
        ("VALIGN", (0,0), (-1,-1), "TOP"),
        ("LEFTPADDING", (0,0), (-1,-1), 3),
        ("RIGHTPADDING", (0,0), (-1,-1), 3),
        ("TOPPADDING", (0,0), (-1,-1), 2),
        ("BOTTOMPADDING", (0,0), (-1,-1), 2),
    ]))

    # Right-align numeric-looking columns (heuristic)
    for c in range(1, ncols):
        col_vals = [str(v) for v in df2.iloc[:, c].head(15).values]
        numericish = sum(bool(re.match(r"^\s*[-+]?\d[\d,]*\.?\d*\s*$", v)) for v in col_vals) >= max(5, len(col_vals)//2)
        if numericish:
            tbl.setStyle(TableStyle([("ALIGN", (c,1), (c,-1), "RIGHT")]))

    return tbl

def add_fig(story, img_path: Path, caption: str, max_width_in: float = 6.7):
    if not img_path.exists():
        story.append(Paragraph(f"<b>Figure:</b> Missing image: {img_path.name}", styles["Body"]))
        story.append(Spacer(1, 0.15*inch))
        return
    img = Image(str(img_path))
    max_w = max_width_in * inch
    if img.drawWidth > max_w:
        scale = max_w / img.drawWidth
        img.drawWidth *= scale
        img.drawHeight *= scale
    story.append(img)
    story.append(Spacer(1, 0.08*inch))
    story.append(Paragraph(f"<i>{caption}</i>", styles["Caption"]))
    story.append(Spacer(1, 0.20*inch))

# ---------- Styles ----------
base = getSampleStyleSheet()
styles = {
    "Title": ParagraphStyle("Title", parent=base["Title"], fontName="Helvetica-Bold", fontSize=20, leading=24,
                           textColor=colors.HexColor("#0f172a"), spaceAfter=12),
    "H1": ParagraphStyle("H1", parent=base["Heading1"], fontName="Helvetica-Bold", fontSize=14, leading=18,
                        textColor=colors.HexColor("#0f172a"), spaceBefore=10, spaceAfter=8),
    "Body": ParagraphStyle("Body", parent=base["BodyText"], fontName="Helvetica", fontSize=9.5, leading=13,
                          textColor=colors.HexColor("#111827"), spaceAfter=6),
    "Caption": ParagraphStyle("Caption", parent=base["BodyText"], fontName="Helvetica-Oblique", fontSize=8.5,
                             leading=11, textColor=colors.HexColor("#334155"), spaceAfter=8),
    "Mono": ParagraphStyle("Mono", parent=base["BodyText"], fontName="Courier", fontSize=7.5, leading=9.5,
                          textColor=colors.HexColor("#0b1220")),
    "Meta": ParagraphStyle("Meta", parent=base["BodyText"], fontName="Helvetica", fontSize=9, leading=12,
                          textColor=colors.HexColor("#334155")),
}

PROJECT_TITLE = "Quantifying the Association Between Data Center Presence and Retail Electricity Prices"
SUBTITLE = "Regression & Hypothesis Testing Report (Automated Output)"
CREATED_AT = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

# ---------- Load artifacts ----------
table1_path = REPORT_TABLES / "table1_model_ready.csv"
coef_path   = REPORT_TABLES / "panelols_coefficients_clustered.csv"
narr_path   = REPORT_TEXT   / "results_narrative.txt"
sum_path    = REPORT_TEXT   / "panelols_summary_clustered.txt"

df_table1 = safe_read_csv(table1_path)
df_coef   = safe_read_csv(coef_path)
narrative = safe_read_text(narr_path, default="(results narrative not found)")
summary_txt = safe_read_text(sum_path, default="(model summary not found)")

# ---------- Header/Footer ----------
def _header_footer(canvas, doc):
    canvas.saveState()
    canvas.setFont("Helvetica", 8)
    canvas.setFillColor(colors.HexColor("#475569"))
    canvas.drawString(0.75*inch, 0.55*inch, f"{PROJECT_TITLE}  •  Generated {CREATED_AT}")
    canvas.drawRightString(7.75*inch, 0.55*inch, f"Page {canvas.getPageNumber()}")
    canvas.restoreState()

# ---------- Build PDF ----------
OUT_PDF.parent.mkdir(parents=True, exist_ok=True)

doc = SimpleDocTemplate(
    str(OUT_PDF),
    pagesize=LETTER,
    leftMargin=0.85*inch,
    rightMargin=0.85*inch,
    topMargin=0.85*inch,
    bottomMargin=0.85*inch,
    title=PROJECT_TITLE,
    author="WGU Capstone (Automated Notebook Output)",
)

story = []

# Cover
story.append(Paragraph(PROJECT_TITLE, styles["Title"]))
story.append(Paragraph(SUBTITLE, styles["Meta"]))
story.append(Spacer(1, 0.15*inch))
story.append(Paragraph(f"<b>Artifact folder:</b> {REPORT_RUN_DIR}", styles["Meta"]))
story.append(Paragraph(
    "This PDF is generated automatically from the notebook run artifacts (tables, figures, and model output). "
    "It is intended to be a polished, reproducible input to the final written capstone report.",
    styles["Body"]
))
story.append(PageBreak())

# Executive Summary / Narrative
story.append(Paragraph("Executive Summary", styles["H1"]))
for para in [p.strip() for p in narrative.split("\n\n") if p.strip()]:
    story.append(Paragraph(para.replace("\n", "<br/>"), styles["Body"]))
story.append(PageBreak())

# Table 1
story.append(Paragraph("Table 1. Model-Ready Descriptive Summary", styles["H1"]))
story.append(Spacer(1, 0.10*inch))
story.append(df_to_rl_table(df_table1, doc.width, max_rows=60))
story.append(Spacer(1, 0.15*inch))
story.append(Paragraph(f"<i>Source:</i> {table1_path.name}", styles["Caption"]))
story.append(PageBreak())

# Regression coefficients
story.append(Paragraph("Panel Regression Results (Coefficients)", styles["H1"]))
story.append(Spacer(1, 0.10*inch))
story.append(df_to_rl_table(df_coef, doc.width, max_rows=40))
story.append(Spacer(1, 0.15*inch))
story.append(Paragraph(f"<i>Source:</i> {coef_path.name}", styles["Caption"]))
story.append(PageBreak())

# Model summary
story.append(Paragraph("Model Summary (PanelOLS)", styles["H1"]))
story.append(Spacer(1, 0.10*inch))
story.append(Preformatted(summary_txt, styles["Mono"]))
story.append(PageBreak())

# Figures
story.append(Paragraph("Diagnostics Figures", styles["H1"]))
story.append(Spacer(1, 0.10*inch))

fig_items = [
    ("residual_hist.png", "Figure 1. Histogram of regression residuals."),
    ("residual_qq.png", "Figure 2. Q-Q plot of residuals (normality check)."),
    ("residual_vs_fitted.png", "Figure 3. Residuals vs fitted values."),
    ("mean_residual_over_time.png", "Figure 4. Mean residual over time."),
    ("acf_monthly_mean_residual.png", "Figure 5. ACF of monthly mean residuals."),
]

for fname, cap in fig_items:
    add_fig(story, REPORT_FIGS / fname, cap)

doc.build(story, onFirstPage=_header_footer, onLaterPages=_header_footer)

# Post-build verification
assert OUT_PDF.exists(), f"PDF build did not produce output file: {OUT_PDF}"
print(f"✅ PDF generated: {OUT_PDF}")
print("PDF size (MB):", round(OUT_PDF.stat().st_size / (1024**2), 2))

✅ PDF generated: C:\Users\amand\OneDrive\Documents\School\Assignments\WGU\Data-Engineering\Capstone\datacenter-electricity-prices-us\notebooks\outputs\regression\run_20260110_174058\report_regression_results.pdf
PDF size (MB): 0.31
