diff --git a/.github/workflows/tox-pytest.yml b/.github/workflows/tox-pytest.yml index e143ea2611..831ff3f48d 100644 --- a/.github/workflows/tox-pytest.yml +++ b/.github/workflows/tox-pytest.yml @@ -6,6 +6,7 @@ on: [pull_request] env: PUDL_OUTPUT: /home/runner/pudl-work/output PUDL_INPUT: /home/runner/pudl-work/data/ + DAGSTER_HOME: /home/runner/pudl-work/dagster_home/ jobs: ci-static: @@ -145,8 +146,8 @@ jobs: path: ${{ env.PUDL_INPUT }} key: zenodo-datastore-${{ hashFiles('datastore-dois.txt') }} - - name: Make cache/output dirs - run: mkdir -p ${{ env.PUDL_OUTPUT }} ${{ env.PUDL_INPUT}} + - name: Make input, output and dagster dirs + run: mkdir -p ${{ env.PUDL_OUTPUT }} ${{ env.PUDL_INPUT}} ${{ env.DAGSTER_HOME }} - name: List workspace contents run: find /home/runner/pudl-work diff --git a/docs/release_notes.rst b/docs/release_notes.rst index 3f9b6e33eb..8d001e9685 100644 --- a/docs/release_notes.rst +++ b/docs/release_notes.rst @@ -61,6 +61,11 @@ Dagster Adoption * FERC 714 extraction methods are now subsettable by year, with 2019 and 2020 data included in the ``etl_fast.yml`` by default. See :issue:`2628` and PR :pr:`2649`. +* Census DP1 ETL changes: + + * :mod:`pudl.convert.censusdp1tract_to_sqlite` and :mod:`pudl.output.censusdp1tract` + are now integrated into dagster. See :issue:`1973` and :pr:`2621`. + Data Coverage ^^^^^^^^^^^^^ @@ -149,11 +154,13 @@ Data Coverage * A couple of tables from :doc:`data_sources/ferc714` have been added to the PUDL DB. These tables contain data from 2006-2020 (2021 is distributed by FERC in XBRL format - and we have not yet integrated it). See :issue:`2266` & :pr:`2421`. The newly - accessible tables include: + and we have not yet integrated it). See :issue:`2266`, :pr:`2421` and :pr:`2550`. + The newly accessible tables include: * :ref:`respondent_id_ferc714` (linking FERC-714 respondents to EIA utilities) * :ref:`demand_hourly_pa_ferc714` (hourly electricity demand by planning area) + * :ref:`fipsified_respondents_ferc714` (annual respondents with county FIPS IDs) + * :ref:`summarized_demand_ferc714` (annual demand for FERC-714 respondents) * Added new table :ref:`epacamd_eia_subplant_ids`, which aguments the :ref:`epacamd_eia` glue table. This table incorporates all @@ -226,6 +233,13 @@ Analysis the :ref:`generators_eia860` table so associating those gf and bf records are more cleanly associated with generators. Thanks to :user:`grgmiller` for his contribution, which was integrated by :user:`cmgosnell`! See PRs :pr:`2235,2446`. +* Added outputs from :mod:`pudl.analysis.service_territory` and + :mod:`pudl.analysis.state_demand` into PUDL. These outputs include the US Census + geometries associated with balancing authority and utility data from EIA 861 + (:ref:`compiled_geometry_balancing_authority_eia861` and + :ref:`compiled_geometry_utility_eia861`), and the estimated total hourly electricity + demand for each US state in :ref:`predicted_state_hourly_demand`. See :issue:`1973` + and :pr:`2550`. 
Deprecations ^^^^^^^^^^^^ diff --git a/migrations/versions/1ec25c296a6d_add_service_terrtory_ferc714_state_.py b/migrations/versions/1ec25c296a6d_add_service_terrtory_ferc714_state_.py new file mode 100644 index 0000000000..f3b0a370cd --- /dev/null +++ b/migrations/versions/1ec25c296a6d_add_service_terrtory_ferc714_state_.py @@ -0,0 +1,99 @@ +"""Add service terrtory, FERC714, state demand assets + +Revision ID: 1ec25c296a6d +Revises: 88d9201ae4c4 +Create Date: 2023-06-16 09:33:08.254754 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import sqlite + +# revision identifiers, used by Alembic. +revision = '1ec25c296a6d' +down_revision = 'e608f95a3b78' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('compiled_geometry_balancing_authority_eia861', + sa.Column('county_id_fips', sa.Text(), nullable=False, comment='County ID from the Federal Information Processing Standard Publication 6-4.'), + sa.Column('county_name_census', sa.Text(), nullable=True, comment='County name as specified in Census DP1 Data.'), + sa.Column('population', sa.Float(), nullable=True, comment='County population, sourced from Census DP1 data.'), + sa.Column('area_km2', sa.Float(), nullable=True, comment='County area in km2.'), + sa.Column('report_date', sa.Date(), nullable=False, comment='Date reported.'), + sa.Column('balancing_authority_id_eia', sa.Integer(), nullable=False, comment='EIA balancing authority ID. This is often (but not always!) the same as the utility ID associated with the same legal entity.'), + sa.Column('state', sa.Text(), nullable=True, comment='Two letter US state abbreviation.'), + sa.Column('county', sa.Text(), nullable=False, comment='County name.'), + sa.Column('state_id_fips', sa.Text(), nullable=True, comment='Two digit state FIPS code.'), + sa.PrimaryKeyConstraint('balancing_authority_id_eia', 'report_date', 'county_id_fips', 'county') + ) + op.create_table('compiled_geometry_utility_eia861', + sa.Column('county_id_fips', sa.Text(), nullable=False, comment='County ID from the Federal Information Processing Standard Publication 6-4.'), + sa.Column('county_name_census', sa.Text(), nullable=True, comment='County name as specified in Census DP1 Data.'), + sa.Column('population', sa.Float(), nullable=True, comment='County population, sourced from Census DP1 data.'), + sa.Column('area_km2', sa.Float(), nullable=True, comment='County area in km2.'), + sa.Column('report_date', sa.Date(), nullable=False, comment='Date reported.'), + sa.Column('utility_id_eia', sa.Integer(), nullable=False, comment='The EIA Utility Identification number.'), + sa.Column('state', sa.Text(), nullable=True, comment='Two letter US state abbreviation.'), + sa.Column('county', sa.Text(), nullable=True, comment='County name.'), + sa.Column('state_id_fips', sa.Text(), nullable=True, comment='Two digit state FIPS code.'), + sa.PrimaryKeyConstraint('utility_id_eia', 'report_date', 'county_id_fips') + ) + op.create_table('predicted_state_hourly_demand', + sa.Column('state_id_fips', sa.Text(), nullable=False, comment='Two digit state FIPS code.'), + sa.Column('utc_datetime', sqlite.DATETIME(), nullable=False), + sa.Column('demand_mwh', sa.Float(), nullable=True), + sa.Column('scaled_demand_mwh', sa.Float(), nullable=True, comment='Estimated electricity demand scaled by the total sales within a state.'), + sa.PrimaryKeyConstraint('state_id_fips', 'utc_datetime') + ) + 
op.create_table('fipsified_respondents_ferc714', + sa.Column('eia_code', sa.Integer(), nullable=True), + sa.Column('respondent_type', sa.Enum('utility', 'balancing_authority'), nullable=True), + sa.Column('respondent_id_ferc714', sa.Integer(), nullable=True), + sa.Column('respondent_name_ferc714', sa.Text(), nullable=True), + sa.Column('report_date', sa.Date(), nullable=True, comment='Date reported.'), + sa.Column('balancing_authority_id_eia', sa.Integer(), nullable=True, comment='EIA balancing authority ID. This is often (but not always!) the same as the utility ID associated with the same legal entity.'), + sa.Column('balancing_authority_code_eia', sa.Text(), nullable=True, comment='EIA short code identifying a balancing authority.'), + sa.Column('balancing_authority_name_eia', sa.Text(), nullable=True, comment='Name of the balancing authority.'), + sa.Column('utility_id_eia', sa.Integer(), nullable=True, comment='The EIA Utility Identification number.'), + sa.Column('utility_name_eia', sa.Text(), nullable=True, comment='The name of the utility.'), + sa.Column('state', sa.Text(), nullable=True, comment='Two letter US state abbreviation.'), + sa.Column('county', sa.Text(), nullable=True, comment='County name.'), + sa.Column('state_id_fips', sa.Text(), nullable=True, comment='Two digit state FIPS code.'), + sa.Column('county_id_fips', sa.Text(), nullable=True, comment='County ID from the Federal Information Processing Standard Publication 6-4.'), + sa.ForeignKeyConstraint(['respondent_id_ferc714'], ['respondent_id_ferc714.respondent_id_ferc714'], ) + ) + op.create_table('summarized_demand_ferc714', + sa.Column('report_date', sa.Date(), nullable=False, comment='Date reported.'), + sa.Column('respondent_id_ferc714', sa.Integer(), nullable=False), + sa.Column('demand_annual_mwh', sa.Float(), nullable=True), + sa.Column('population', sa.Float(), nullable=True, comment='County population, sourced from Census DP1 data.'), + sa.Column('area_km2', sa.Float(), nullable=True, comment='County area in km2.'), + sa.Column('population_density_km2', sa.Float(), nullable=True, comment='Average population per sq. km area of a service territory.'), + sa.Column('demand_annual_per_capita_mwh', sa.Float(), nullable=True, comment='Per-capita annual demand, averaged using Census county-level population estimates.'), + sa.Column('demand_density_mwh_km2', sa.Float(), nullable=True, comment='Annual demand per km2 of a given service territory.'), + sa.Column('eia_code', sa.Integer(), nullable=True), + sa.Column('respondent_type', sa.Enum('utility', 'balancing_authority'), nullable=True), + sa.Column('respondent_name_ferc714', sa.Text(), nullable=True), + sa.Column('balancing_authority_id_eia', sa.Integer(), nullable=True, comment='EIA balancing authority ID. This is often (but not always!) 
the same as the utility ID associated with the same legal entity.'), + sa.Column('balancing_authority_code_eia', sa.Text(), nullable=True, comment='EIA short code identifying a balancing authority.'), + sa.Column('balancing_authority_name_eia', sa.Text(), nullable=True, comment='Name of the balancing authority.'), + sa.Column('utility_id_eia', sa.Integer(), nullable=True, comment='The EIA Utility Identification number.'), + sa.Column('utility_name_eia', sa.Text(), nullable=True, comment='The name of the utility.'), + sa.ForeignKeyConstraint(['respondent_id_ferc714'], ['respondent_id_ferc714.respondent_id_ferc714'], ), + sa.PrimaryKeyConstraint('respondent_id_ferc714', 'report_date') + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_table('summarized_demand_ferc714') + op.drop_table('fipsified_respondents_ferc714') + op.drop_table('predicted_state_hourly_demand') + op.drop_table('compiled_geometry_utility_eia861') + op.drop_table('compiled_geometry_balancing_authority_eia861') + # ### end Alembic commands ### diff --git a/pyproject.toml b/pyproject.toml index a4878069f2..336db603bb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -89,7 +89,6 @@ keywords = [ ] [project.scripts] -censusdp1tract_to_sqlite = "pudl.convert.censusdp1tract_to_sqlite:main" metadata_to_rst = "pudl.convert.metadata_to_rst:main" epacems_to_parquet = "pudl.convert.epacems_to_parquet:main" ferc_to_sqlite = "pudl.ferc_to_sqlite.cli:main" diff --git a/src/pudl/analysis/service_territory.py b/src/pudl/analysis/service_territory.py index 4d9920d82c..2481a0ca62 100644 --- a/src/pudl/analysis/service_territory.py +++ b/src/pudl/analysis/service_territory.py @@ -8,9 +8,13 @@ import argparse import math import sys +from collections.abc import Iterable +from typing import Literal +import geopandas as gpd import pandas as pd import sqlalchemy as sa +from dagster import AssetKey, AssetsDefinition, Field, asset from matplotlib import pyplot as plt import pudl @@ -23,8 +27,12 @@ MAP_CRS = "EPSG:3857" # For mapping w/ OSM baselayer tiles CALC_CRS = "ESRI:102003" # For accurate area calculations +ENTITY_TYPE = {"ba": "balancing_authority", "util": "utility"} -def get_all_utils(pudl_out): + +def utility_ids_all_eia( + denorm_utilities_eia: pd.DataFrame, service_territory_eia861: pd.DataFrame +) -> pd.DataFrame: """Compile IDs and Names of all known EIA Utilities. Grab all EIA utility names and IDs from both the EIA 861 Service Territory table and @@ -33,19 +41,17 @@ def get_all_utils(pudl_out): process and PUDL database yet. Args: - pudl_out (pudl.output.pudltabl.PudlTabl): The PUDL output object which should be - used to obtain PUDL data. + denorm_utilities_eia: De-normalized EIA 860 utility attributes table. + service_territory_eia861: Normalized EIA 861 Service Territory table. Returns: - pandas.DataFrame: Having 2 columns ``utility_id_eia`` and ``utility_name_eia``. + A DataFrame having 2 columns ``utility_id_eia`` and ``utility_name_eia``. 
""" return ( pd.concat( [ - pudl_out.utils_eia860()[["utility_id_eia", "utility_name_eia"]], - pudl_out.service_territory_eia861()[ - ["utility_id_eia", "utility_name_eia"] - ], + denorm_utilities_eia[["utility_id_eia", "utility_name_eia"]], + service_territory_eia861[["utility_id_eia", "utility_name_eia"]], ] ) .dropna(subset=["utility_id_eia"]) @@ -56,7 +62,13 @@ def get_all_utils(pudl_out): ################################################################################ # Functions that compile geometries based on EIA 861 data tables: ################################################################################ -def get_territory_fips(ids, assn, assn_col, st_eia861, limit_by_state=True): +def get_territory_fips( + ids: Iterable[int], + assn: pd.DataFrame, + assn_col: str, + service_territory_eia861: pd.DataFrame, + limit_by_state: bool = True, +) -> pd.DataFrame: """Compile county FIPS codes associated with an entity's service territory. For each entity identified by ids, look up the set of counties associated @@ -65,21 +77,21 @@ def get_territory_fips(ids, assn, assn_col, st_eia861, limit_by_state=True): within the EIA 861 data. Args: - ids (iterable of ints): A collection of EIA utility or balancing authority IDs. - assn (pandas.DataFrame): Association table, relating ``report_date``, + ids: A collection of EIA utility or balancing authority IDs. + assn: Association table, relating ``report_date``, ``state``, and ``utility_id_eia`` to each other, as well as the column indicated by ``assn_col`` -- if it's not ``utility_id_eia``. - assn_col (str): Label of the dataframe column in ``assn`` that contains + assn_col: Label of the dataframe column in ``assn`` that contains the ID of the entities of interest. Should probably be either ``balancing_authority_id_eia`` or ``utility_id_eia``. - st_eia861 (pandas.DataFrame): The EIA 861 Service Territory table. - limit_by_state (bool): Whether to require that the counties associated + service_territory_eia861: The EIA 861 Service Territory table. + limit_by_state: Whether to require that the counties associated with the balancing authority are inside a state that has also been seen in association with the balancing authority and the utility whose service territory contians the county. Returns: - pandas.DataFrame: A table associating the entity IDs with a collection of + A table associating the entity IDs with a collection of counties annually, identifying counties both by name and county_id_fips (both state and state_id_fips are included for clarity). """ @@ -90,7 +102,7 @@ def get_territory_fips(ids, assn, assn_col, st_eia861, limit_by_state=True): assn = assn.drop("state", axis="columns") return ( - pd.merge(assn, st_eia861, how="inner") + pd.merge(assn, service_territory_eia861, how="inner") .loc[ :, [ @@ -106,7 +118,12 @@ def get_territory_fips(ids, assn, assn_col, st_eia861, limit_by_state=True): ) -def add_geometries(df, census_gdf, dissolve=False, dissolve_by=None): +def add_geometries( + df: pd.DataFrame, + census_gdf: gpd.GeoDataFrame, + dissolve: bool = False, + dissolve_by: list[str] = None, +) -> gpd.GeoDataFrame: """Merge census geometries into dataframe on county_id_fips, optionally dissolving. Merge the US Census county-level geospatial information into the DataFrame df @@ -115,13 +132,13 @@ def add_geometries(df, census_gdf, dissolve=False, dissolve_by=None): summing as necessary in the case of dissolved geometries. Args: - df (pandas.DataFrame): A DataFrame containing a county_id_fips column. 
+ df: A DataFrame containing a county_id_fips column. census_gdf (geopandas.GeoDataFrame): A GeoDataFrame based on the US Census demographic profile (DP1) data at county resolution, with the original column names as published by US Census. - dissolve (bool): If True, dissolve individual county geometries into larger + dissolve: If True, dissolve individual county geometries into larger service territories. - dissolve_by (list): The columns to group by in the dissolve. For example, + dissolve_by: The columns to group by in the dissolve. For example, dissolve_by=["report_date", "utility_id_eia"] might provide annual utility service territories, while ["report_date", "balancing_authority_id_eia"] would provide annual balancing authority territories. @@ -175,8 +192,14 @@ def add_geometries(df, census_gdf, dissolve=False, dissolve_by=None): def get_territory_geometries( - ids, assn, assn_col, st_eia861, census_gdf, limit_by_state=True, dissolve=False -): + ids: Iterable[int], + assn: pd.DataFrame, + assn_col: str, + service_territory_eia861: pd.DataFrame, + census_gdf: gpd.GeoDataFrame, + limit_by_state: bool = True, + dissolve: bool = False, +) -> gpd.GeoDataFrame: """Compile service territory geometries based on county_id_fips. Calls ``get_territory_fips`` to generate the list of counties associated with @@ -193,21 +216,20 @@ def get_territory_geometries( use in many analyses. Dissolving is mostly useful for generating visualizations. Args: - ids (iterable of ints): A collection of EIA balancing authority IDs. - assn (pandas.DataFrame): Association table, relating ``report_date``, + ids: A collection of EIA balancing authority IDs. + assn: Association table, relating ``report_date``, ``state``, and ``utility_id_eia`` to each other, as well as the column indicated by ``assn_col`` -- if it's not ``utility_id_eia``. - assn_col (str): Label of the dataframe column in ``assn`` that contains + assn_col: Label of the dataframe column in ``assn`` that contains the ID of the entities of interest. Should probably be either ``balancing_authority_id_eia`` or ``utility_id_eia``. - st_eia861 (pandas.DataFrame): The EIA 861 Service Territory table. - census_gdf (geopandas.GeoDataFrame): The US Census DP1 county-level geometries - as returned by pudl.output.censusdp1tract.get_layer("county"). - limit_by_state (bool): Whether to require that the counties associated + service_territory_eia861: The EIA 861 Service Territory table. + census_gdf: The US Census DP1 county-level geometries. + limit_by_state: Whether to require that the counties associated with the balancing authority are inside a state that has also been seen in association with the balancing authority and the utility whose service territory contians the county. - dissolve (bool): If False, each record in the compiled territory will correspond + dissolve: If False, each record in the compiled territory will correspond to a single county, with a county-level geometry, and there will be many records enumerating all the counties associated with a given balancing_authority_id_eia in each year. If dissolve=True, all of the @@ -216,13 +238,13 @@ def get_territory_geometries( balancing_authority-year. Returns: - geopandas.GeoDataFrame + A GeoDataFrame with service territory geometries for each entity. 
""" return get_territory_fips( ids=ids, assn=assn, assn_col=assn_col, - st_eia861=st_eia861, + service_territory_eia861=service_territory_eia861, limit_by_state=limit_by_state, ).pipe( add_geometries, @@ -232,34 +254,42 @@ def get_territory_geometries( ) +def _save_geoparquet(gdf, entity_type, dissolve, limit_by_state): + # For filenames based on input args: + dissolved = "" + if dissolve: + dissolved = "_dissolved" + else: + # States & counties only remain at this point if we didn't dissolve + for col in ("county_id_fips", "state_id_fips"): + # pandas.NA values are not compatible with Parquet Strings yet. + gdf[col] = gdf[col].fillna("") + limited = "" + if limit_by_state: + limited = "_limited" + # Save the geometries to a GeoParquet file + fn = f"{entity_type}_geom{limited+dissolved}.pq" + gdf.to_parquet(fn, index=False) + + def compile_geoms( - pudl_out, - census_counties, - entity_type, # "ba" or "util" - dissolve=False, - limit_by_state=True, - save=True, + balancing_authority_eia861: pd.DataFrame, + balancing_authority_assn_eia861: pd.DataFrame, + denorm_utilities_eia: pd.DataFrame, + service_territory_eia861: pd.DataFrame, + utility_assn_eia861: pd.DataFrame, + census_counties: pd.DataFrame, + entity_type: Literal["ba", "util"], + save_format: Literal["geoparquet", "geodataframe", "dataframe"], + dissolve: bool = False, + limit_by_state: bool = True, ): """Compile all available utility or balancing authority geometries. - Args: - pudl_out (pudl.output.pudltabl.PudlTabl): A PUDL output object, which will - be used to extract and cache the EIA 861 tables. - census_counties (geopandas.GeoDataFrame): A GeoDataFrame containing the county - level US Census DP1 data and county geometries. - entity_type (str): The type of service territory geometry to compile. Must be - either "ba" (balancing authority) or "util" (utility). - dissolve (bool): Whether to dissolve the compiled geometries to the - utility/balancing authority level, or leave them as counties. - limit_by_state (bool): Whether to limit included counties to those with - observed EIA 861 data in association with the state and utility/balancing - authority. - save (bool): If True, save the compiled GeoDataFrame as a GeoParquet file before - returning. Especially useful in the case of dissolved geometries, as they - are computationally expensive. - - Returns: - geopandas.GeoDataFrame + Returns a geoparquet file, geopandas GeoDataFrame or a pandas DataFrame with the + geometry column removed depending on the value of the save_format parameter. By + default, this returns only counties with observed EIA 861 data for a utility or + balancing authority, with geometries available at the county level. 
""" logger.info( "Compiling %s geometries with dissolve=%s and limit_by_state=%s.", @@ -268,13 +298,17 @@ def compile_geoms( limit_by_state, ) + utilids_all_eia = utility_ids_all_eia( + denorm_utilities_eia, service_territory_eia861 + ) + if entity_type == "ba": - ids = pudl_out.balancing_authority_eia861().balancing_authority_id_eia.unique() - assn = pudl_out.balancing_authority_assn_eia861() + ids = balancing_authority_eia861.balancing_authority_id_eia.unique() + assn = balancing_authority_assn_eia861 assn_col = "balancing_authority_id_eia" elif entity_type == "util": - ids = get_all_utils(pudl_out).utility_id_eia.unique() - assn = pudl_out.utility_assn_eia861() + ids = utilids_all_eia.utility_id_eia.unique() + assn = utility_assn_eia861 assn_col = "utility_id_eia" else: raise ValueError(f"Got {entity_type=}, but need either 'ba' or 'util'") @@ -284,38 +318,106 @@ def compile_geoms( ids=ids, assn=assn, assn_col=assn_col, - st_eia861=pudl_out.service_territory_eia861(), + service_territory_eia861=service_territory_eia861, census_gdf=census_counties, limit_by_state=limit_by_state, dissolve=dissolve, ) - if save: - _save_geoparquet( + if save_format == "geoparquet": + if dissolve: + # States & counties only remain at this point if we didn't dissolve + for col in ("county_id_fips", "state_id_fips"): + # pandas.NA values are not compatible with Parquet Strings yet. + geom[col] = geom[col].fillna("") + + _save_geoparquet( # To do: update to use new io manager. geom, entity_type=entity_type, dissolve=dissolve, limit_by_state=limit_by_state, ) + elif save_format == "dataframe": + geom = pd.DataFrame(geom.drop(columns="geometry")) return geom -def _save_geoparquet(gdf, entity_type, dissolve, limit_by_state): - # For filenames based on input args: - dissolved = "" - if dissolve: - dissolved = "_dissolved" - else: - # States & counties only remain at this point if we didn't dissolve - for col in ("county_id_fips", "state_id_fips"): - # pandas.NA values are not compatible with Parquet Strings yet. - gdf[col] = gdf[col].fillna("") - limited = "" - if limit_by_state: - limited = "_limited" - # Save the geometries to a GeoParquet file - fn = f"{entity_type}_geom{limited+dissolved}.pq" - gdf.to_parquet(fn, index=False) +def compiled_geoms_asset_factory( + entity_type: Literal["ba", "util"], + io_manager_key: str | None = None, +) -> list[AssetsDefinition]: + """Build asset definitions for balancing authority and utility geometries.""" + + @asset( + name=f"compiled_geometry_{ENTITY_TYPE[entity_type]}_eia861", + io_manager_key=io_manager_key, + config_schema={ + "dissolve": Field( + bool, + default_value=False, + description=( + "If True, dissolve the compiled geometries to the entity level. If False, leave them as counties." + ), + ), + "limit_by_state": Field( + bool, + default_value=True, + description=( + "If True, only include counties with observed EIA 861 data in association with the state and utility/balancing authority." + ), + ), + "save_format": Field( + str, + default_value="dataframe", + description=( + "Format of output in PUDL. One of: geoparquet, geodataframe, dataframe." + ), + ), + }, + compute_kind="Python", + ) + def dagster_compile_geoms( + context, + balancing_authority_eia861: pd.DataFrame, + balancing_authority_assn_eia861: pd.DataFrame, + denorm_utilities_eia: pd.DataFrame, + service_territory_eia861: pd.DataFrame, + utility_assn_eia861: pd.DataFrame, + county_censusdp1: pd.DataFrame, + ): + """Compile all available utility or balancing authority geometries. 
+ + Returns: + A dataframe compiling all available utility or balancing authority geometries. + """ + # Get options from dagster + dissolve = context.op_config["dissolve"] + limit_by_state = context.op_config["limit_by_state"] + save_format = context.op_config["save_format"] + + return compile_geoms( + balancing_authority_eia861=balancing_authority_eia861, + balancing_authority_assn_eia861=balancing_authority_assn_eia861, + denorm_utilities_eia=denorm_utilities_eia, + service_territory_eia861=service_territory_eia861, + utility_assn_eia861=utility_assn_eia861, + census_counties=county_censusdp1, + entity_type=entity_type, + dissolve=dissolve, + limit_by_state=limit_by_state, + save_format=save_format, + ) + + return [dagster_compile_geoms] + + +compiled_geometry_eia861_assets = [ + ass + for entity in list(ENTITY_TYPE) + for ass in compiled_geoms_asset_factory( + entity_type=entity, io_manager_key="pudl_sqlite_io_manager" + ) +] ################################################################################ @@ -478,11 +580,9 @@ def main(): pudl_settings = pudl.workspace.setup.get_defaults() pudl_engine = sa.create_engine(pudl_settings["pudl_db"]) - pudl_out = pudl.output.pudltabl.PudlTabl(pudl_engine) + # Load the US Census DP1 county data: - county_gdf = pudl.output.censusdp1tract.get_layer( - layer="county", pudl_settings=pudl_settings - ) + county_gdf = pudl.etl.defs.load_asset_value(AssetKey("county_censusdp1")) kwargs_dicts = [ {"entity_type": "util", "limit_by_state": False}, @@ -493,9 +593,18 @@ def main(): for kwargs in kwargs_dicts: _ = compile_geoms( - pudl_out, + balancing_authority_eia861=pd.read_sql( + "balancing_authority_eia861", pudl_engine + ), + balancing_authority_assn_eia861=pd.read_sql( + "balancing_authority_assn_eia861", pudl_engine + ), + denorm_utilities_eia=pd.read_sql("denorm_utilities_eia", pudl_engine), + service_territory_eia861=pd.read_sql("service_territory_eia861", pudl_engine), + utility_assn_eia861=pd.read_sql("utility_assn_eia861", pudl_engine), census_counties=county_gdf, dissolve=args.dissolve, + save_format="geoparquet", **kwargs, ) diff --git a/src/pudl/analysis/state_demand.py b/src/pudl/analysis/state_demand.py index d0f58778bd..cf973bfd97 100644 --- a/src/pudl/analysis/state_demand.py +++ b/src/pudl/analysis/state_demand.py @@ -27,10 +27,11 @@ from collections.abc import Iterable from typing import Any +import geopandas as gpd import matplotlib.pyplot as plt import numpy as np import pandas as pd -import sqlalchemy as sa +from dagster import AssetKey, AssetOut, Field, asset, multi_asset import pudl.analysis.timeseries_cleaning import pudl.logging_helpers @@ -265,14 +266,20 @@ def load_ventyx_hourly_state_demand(path: str) -> pd.DataFrame: # --- Datasets: FERC 714 hourly demand --- # -def load_ferc714_hourly_demand_matrix( - pudl_out: pudl.output.pudltabl.PudlTabl, +@multi_asset( + compute_kind="Python", + outs={ + "raw_hourly_demand_matrix_ferc714": AssetOut(), + "utc_offset_ferc714": AssetOut(), + }, +) +def load_hourly_demand_matrix_ferc714( + demand_hourly_pa_ferc714: pd.DataFrame, ) -> tuple[pd.DataFrame, pd.DataFrame]: """Read and format FERC 714 hourly demand into matrix form. Args: - pudl_out: Used to access - :meth:`pudl.output.pudltabl.PudlTabl.demand_hourly_pa_ferc714`. + demand_hourly_pa_ferc714: FERC 714 hourly demand time series by planning area.
Returns: Hourly demand as a matrix with a `datetime` row index @@ -282,19 +289,22 @@ def load_ferc714_hourly_demand_matrix( A second Dataframe lists the UTC offset in hours of each `respondent_id_ferc714` and reporting `year` (int). """ - demand = pudl_out.demand_hourly_pa_ferc714().copy() # Convert UTC to local time (ignoring daylight savings) - demand["utc_offset"] = demand["timezone"].map(STANDARD_UTC_OFFSETS) - demand["datetime"] = utc_to_local(demand["utc_datetime"], demand["utc_offset"]) + demand_hourly_pa_ferc714["utc_offset"] = demand_hourly_pa_ferc714["timezone"].map( + STANDARD_UTC_OFFSETS + ) + demand_hourly_pa_ferc714["datetime"] = utc_to_local( + demand_hourly_pa_ferc714["utc_datetime"], demand_hourly_pa_ferc714["utc_offset"] + ) # Pivot to demand matrix: timestamps x respondents - matrix = demand.pivot( + matrix = demand_hourly_pa_ferc714.pivot( index="datetime", columns="respondent_id_ferc714", values="demand_mwh" ) # List timezone by year for each respondent - demand["year"] = demand["report_date"].dt.year - utc_offset = demand.groupby(["respondent_id_ferc714", "year"], as_index=False)[ - "utc_offset" - ].first() + demand_hourly_pa_ferc714["year"] = demand_hourly_pa_ferc714["report_date"].dt.year + utc_offset = demand_hourly_pa_ferc714.groupby( + ["respondent_id_ferc714", "year"], as_index=False + )["utc_offset"].first() return matrix, utc_offset @@ -425,23 +435,87 @@ def melt_ferc714_hourly_demand_matrix( return df +# --- Dagster assets for main functions --- # + + +@asset( + compute_kind="Python", + config_schema={ + "min_data": Field( + int, + default_value=100, + description=("Minimum number of non-null hours in a year."), + ), + "min_data_fraction": Field( + float, + default_value=0.9, + description=( + "Minimum fraction of non-null hours between the first and last non-null" + " hour in a year." + ), + ), + }, +) +def clean_hourly_demand_matrix_ferc714( + context, raw_hourly_demand_matrix_ferc714: pd.DataFrame +) -> pd.DataFrame: + """Cleaned and nulled FERC 714 hourly demand matrix. + + Args: + raw_hourly_demand_matrix_ferc714: FERC 714 hourly demand data in a matrix form. + + Returns: + df: Matrix with nulled anomalous values, where respondent-years with too few responses + are nulled and respondents with no data across all years are dropped. + """ + min_data = context.op_config["min_data"] + min_data_fraction = context.op_config["min_data_fraction"] + df = clean_ferc714_hourly_demand_matrix(raw_hourly_demand_matrix_ferc714) + df = filter_ferc714_hourly_demand_matrix( + df, min_data=min_data, min_data_fraction=min_data_fraction + ) + return df + + +@asset(compute_kind="Python") +def imputed_hourly_demand_ferc714( + clean_hourly_demand_matrix_ferc714: pd.DataFrame, utc_offset_ferc714: pd.DataFrame +) -> pd.DataFrame: + """Imputed FERC714 hourly demand in long format. + + Impute null values for FERC 714 hourly demand matrix, performing imputation + separately for each year using only respondents reporting data in that year. Then, + melt data into a long format. + + Args: + clean_hourly_demand_matrix_ferc714: Cleaned hourly demand matrix from FERC 714. + utc_offset_ferc714: Timezone by year for each respondent. + + Returns: + df: DataFrame with imputed FERC714 hourly demand. 
+ """ + df = impute_ferc714_hourly_demand_matrix(clean_hourly_demand_matrix_ferc714) + df = melt_ferc714_hourly_demand_matrix(df, utc_offset_ferc714) + return df + + # --- Datasets: Counties --- # -def load_ferc714_county_assignments( - pudl_out: pudl.output.pudltabl.PudlTabl, +def county_assignments_ferc714( + fipsified_respondents_ferc714, ) -> pd.DataFrame: """Load FERC 714 county assignments. Args: - pudl_out: PUDL database extractor. + fipsified_respondents_ferc714: From `pudl.output.ferc714`, FERC 714 respondents + with county FIPS IDs. Returns: Dataframe with columns `respondent_id_ferc714`, report `year` (int), and `county_id_fips`. """ - respondents = pudl.output.ferc714.Respondents(pudl_out) - df = respondents.fipsify()[ + df = fipsified_respondents_ferc714[ ["respondent_id_ferc714", "county_id_fips", "report_date"] ] # Drop rows where county is blank or a duplicate @@ -452,41 +526,37 @@ def load_ferc714_county_assignments( return df -def load_counties( - pudl_out: pudl.output.pudltabl.PudlTabl, pudl_settings: dict -) -> pd.DataFrame: +def census_counties(county_censusdp1: gpd.GeoDataFrame) -> gpd.GeoDataFrame: """Load county attributes. Args: - pudl_out: PUDL database extractor. - pudl_settings: PUDL settings. + county_censusdp: The county layer of the Census DP1 geodatabase. Returns: Dataframe with columns `county_id_fips` and `population`. """ - df = pudl.output.censusdp1tract.get_layer( - layer="county", pudl_settings=pudl_settings - )[["geoid10", "dp0010001"]] - return df.rename(columns={"geoid10": "county_id_fips", "dp0010001": "population"}) + return county_censusdp1[["geoid10", "dp0010001"]].rename( + columns={"geoid10": "county_id_fips", "dp0010001": "population"} + ) # --- Allocation --- # -def load_eia861_state_total_sales( - pudl_out: pudl.output.pudltabl.PudlTabl, +def total_state_sales_eia861( + sales_eia861, ) -> pd.DataFrame: """Read and format EIA 861 sales by state and year. Args: - pudl_out: Used to access - :meth:`pudl.output.pudltabl.PudlTabl.sales_eia861`. + sales_eia861: Electricity sales data from EIA 861. Returns: Dataframe with columns `state_id_fips`, `year`, `demand_mwh`. """ - df = pudl_out.sales_eia861() - df = df.groupby(["state", "report_date"], as_index=False)["sales_mwh"].sum() + df = sales_eia861.groupby(["state", "report_date"], as_index=False)[ + "sales_mwh" + ].sum() # Convert report_date to year df["year"] = df["report_date"].dt.year # Convert state abbreviations to FIPS codes @@ -498,45 +568,65 @@ def load_eia861_state_total_sales( return df[["state_id_fips", "year", "demand_mwh"]] -def predict_state_hourly_demand( - demand: pd.DataFrame, - counties: pd.DataFrame, - assignments: pd.DataFrame, - state_totals: pd.DataFrame = None, - mean_overlaps: bool = False, +@asset( + io_manager_key="pudl_sqlite_io_manager", + compute_kind="Python", + config_schema={ + "mean_overlaps": Field( + bool, + default_value=False, + description=( + "Whether to mean the demands predicted for a county in cases when a " + "county is assigned to multiple respondents. By default, demands are " + "summed." + ), + ), + }, +) +def predicted_state_hourly_demand( + context, + imputed_hourly_demand_ferc714: pd.DataFrame, + county_censusdp1: pd.DataFrame, + fipsified_respondents_ferc714: pd.DataFrame, + sales_eia861: pd.DataFrame = None, ) -> pd.DataFrame: """Predict state hourly demand. 
Args: - demand: Hourly demand timeseries, with columns + imputed_hourly_demand_ferc714: Hourly demand timeseries, with columns `respondent_id_ferc714`, report `year`, `utc_datetime`, and `demand_mwh`. - counties: Counties, with columns `county_id_fips` and `population`. - assignments: County assignments for demand respondents, - with columns `respondent_id_ferc714`, `year`, and `county_id_fips`. - state_totals: Total annual demand by state, - with columns `state_id_fips`, `year`, and `demand_mwh`. - If provided, the predicted hourly demand is scaled to match these totals. - mean_overlaps: Whether to mean the demands predicted for a county - in cases when a county is assigned to multiple respondents. - By default, demands are summed. + county_censusdp1: The county layer of the Census DP1 shapefile. + fipsified_respondents_ferc714: Annual respondents with the county FIPS IDs + for their service territories. + sales_eia861: EIA 861 sales data. If provided, the predicted hourly demand is + scaled to match these totals. Returns: Dataframe with columns `state_id_fips`, `utc_datetime`, `demand_mwh`, and (if `state_totals` was provided) `scaled_demand_mwh`. """ + # Get config + mean_overlaps = context.op_config["mean_overlaps"] + + # Call necessary functions + count_assign_ferc714 = county_assignments_ferc714(fipsified_respondents_ferc714) + counties = census_counties(county_censusdp1) + total_sales_eia861 = total_state_sales_eia861(sales_eia861) + # Pre-compute list of respondent-years with demand with_demand = ( - demand.groupby(["respondent_id_ferc714", "year"], as_index=False)["demand_mwh"] + imputed_hourly_demand_ferc714.groupby( + ["respondent_id_ferc714", "year"], as_index=False + )["demand_mwh"] .sum() .query("demand_mwh > 0") )[["respondent_id_ferc714", "year"]] # Pre-compute state-county assignments - counties = counties.copy() counties["state_id_fips"] = counties["county_id_fips"].str[:2] # Merge counties with respondent- and state-county assignments df = ( - assignments + count_assign_ferc714 # Drop respondent-years with no demand .merge(with_demand, on=["respondent_id_ferc714", "year"]) # Merge with counties and state-county assignments @@ -559,15 +649,17 @@ def predict_state_hourly_demand( ["respondent_id_ferc714", "year", "state_id_fips"], as_index=False )["weight"].sum() # Multiply respondent-state weights with demands - df = weights.merge(demand, on=["respondent_id_ferc714", "year"]) + df = weights.merge( + imputed_hourly_demand_ferc714, on=["respondent_id_ferc714", "year"] + ) df["demand_mwh"] *= df["weight"] # Scale estimates using state totals - if state_totals is not None: + if total_sales_eia861 is not None: # Compute scale factor between current and target state totals totals = ( df.groupby(["state_id_fips", "year"], as_index=False)["demand_mwh"] .sum() - .merge(state_totals, on=["state_id_fips", "year"]) + .merge(total_sales_eia861, on=["state_id_fips", "year"]) ) totals["scale"] = totals["demand_mwh_y"] / totals["demand_mwh_x"] df = df.merge(totals[["state_id_fips", "year", "scale"]]) @@ -739,28 +831,10 @@ def main(): # --- Connect to PUDL database --- # pudl_settings = pudl.workspace.setup.get_defaults() - pudl_engine = sa.create_engine(pudl_settings["pudl_db"]) - pudl_out = pudl.output.pudltabl.PudlTabl(pudl_engine) - - # --- Prepare FERC 714 hourly demand --- # - - df, tz = load_ferc714_hourly_demand_matrix(pudl_out) - df = clean_ferc714_hourly_demand_matrix(df) - df = filter_ferc714_hourly_demand_matrix(df, min_data=100, min_data_fraction=0.9) - df = 
impute_ferc714_hourly_demand_matrix(df) - demand = melt_ferc714_hourly_demand_matrix(df, tz) - - # --- Predict demand --- # - - counties = load_counties(pudl_out, pudl_settings) - assignments = load_ferc714_county_assignments(pudl_out) - state_totals = load_eia861_state_total_sales(pudl_out) - prediction = predict_state_hourly_demand( - demand, - counties=counties, - assignments=assignments, - state_totals=state_totals, - mean_overlaps=False, + + # --- Read in inputs from PUDL + dagster cache --- # + prediction = pudl.etl.defs.load_asset_value( + AssetKey("predicted_state_hourly_demand") ) # --- Export results --- # diff --git a/src/pudl/convert/censusdp1tract_to_sqlite.py b/src/pudl/convert/censusdp1tract_to_sqlite.py index aab9d51d95..43567caa0c 100644 --- a/src/pudl/convert/censusdp1tract_to_sqlite.py +++ b/src/pudl/convert/censusdp1tract_to_sqlite.py @@ -14,20 +14,41 @@ is what we do in our CI setup and Docker images.) """ -import argparse import os import subprocess # nosec: B404 -import sys from pathlib import Path from tempfile import TemporaryDirectory +from dagster import Field, asset + import pudl +from pudl.helpers import EnvVar from pudl.workspace.datastore import Datastore logger = pudl.logging_helpers.get_logger(__name__) -def censusdp1tract_to_sqlite(pudl_settings=None, year=2010, ds=None, clobber=False): +@asset( + config_schema={ + "pudl_output_path": Field( + EnvVar( + env_var="PUDL_OUTPUT", + ), + description="Path of directory to store the database in.", + default_value=None, + ), + "clobber": Field( + bool, description="Clobber existing Census database.", default_value=True + ), + "year": Field( + int, + description="Year of Census data to extract (currently must be 2010).", + default_value=2010, + ), + }, + required_resource_keys={"datastore"}, +) +def censusdp1tract_to_sqlite(context): """Use GDAL's ogr2ogr utility to convert the Census DP1 GeoDB to an SQLite DB. The Census DP1 GeoDB is read from the datastore, where it is stored as a @@ -36,13 +57,10 @@ def censusdp1tract_to_sqlite(pudl_settings=None, year=2010, ds=None, clobber=Fal resulting SQLite DB file is put in the PUDL output directory alongside the ferc1 and pudl SQLite databases. - Args: - pudl_settings (dict): A PUDL settings dictionary. - year (int): Year of Census data to extract (currently must be 2010) - Returns: None """ + ds = context.resources.datastore if ds is None: ds = Datastore() # If we're in a conda environment, use the version of ogr2ogr that has been @@ -60,12 +78,14 @@ def censusdp1tract_to_sqlite(pudl_settings=None, year=2010, ds=None, clobber=Fal with TemporaryDirectory() as tmpdir: # Use datastore to grab the Census DP1 zipfile tmpdir_path = Path(tmpdir) - zip_ref = ds.get_zipfile_resource("censusdp1tract", year=year) + zip_ref = ds.get_zipfile_resource( + "censusdp1tract", year=context.op_config["year"] + ) extract_root = tmpdir_path / Path(zip_ref.filelist[0].filename) - out_path = Path(pudl_settings["pudl_out"]) / "censusdp1tract.sqlite" + out_path = Path(context.op_config["pudl_output_path"]) / "censusdp1tract.sqlite" if out_path.exists(): - if clobber: + if context.op_config["clobber"]: out_path.unlink() else: raise SystemExit( @@ -80,80 +100,4 @@ def censusdp1tract_to_sqlite(pudl_settings=None, year=2010, ds=None, clobber=Fal subprocess.run( # nosec: B603 Trying to use absolute paths. [ogr2ogr, str(out_path), str(extract_root)], check=True ) - - -def parse_command_line(argv): - """Parse command line arguments. See the -h option. 
- - Args: - argv (str): Command line arguments, including caller filename. - - Returns: - dict: Dictionary of command line arguments and their parsed values. - """ - parser = argparse.ArgumentParser(description=__doc__) - parser.add_argument( - "--logfile", - default=None, - type=str, - help="If specified, write logs to this file.", - ) - parser.add_argument( - "-c", - "--clobber", - action="store_true", - help="""Clobber existing sqlite database if it exists. If clobber is - not included but the sqlite databse already exists the _build will - fail.""", - default=False, - ) - parser.add_argument( - "--sandbox", - action="store_true", - default=False, - help="Use the Zenodo sandbox rather than production", - ) - parser.add_argument( - "--gcs-cache-path", - type=str, - help="Load datastore resources from Google Cloud Storage. Should be gs://bucket[/path_prefix]", - ) - parser.add_argument( - "--bypass-local-cache", - action="store_true", - default=False, - help="If enabled, the local file cache for datastore will not be used.", - ) - parser.add_argument( - "--loglevel", - help="Set logging level (DEBUG, INFO, WARNING, ERROR, or CRITICAL).", - default="INFO", - ) - arguments = parser.parse_args(argv[1:]) - return arguments - - -def main(): - """Convert the Census DP1 GeoDatabase into an SQLite Database.""" - args = parse_command_line(sys.argv) - pudl.logging_helpers.configure_root_logger( - logfile=args.logfile, loglevel=args.loglevel - ) - pudl_settings = pudl.workspace.setup.get_defaults() - - # Configure how we want to obtain raw input data: - ds_kwargs = dict( - gcs_cache_path=args.gcs_cache_path, sandbox=pudl_settings.get("sandbox", False) - ) - if not args.bypass_local_cache: - ds_kwargs["local_cache_path"] = Path(pudl_settings["pudl_in"]) / "data" - - ds = Datastore(**ds_kwargs) - - pudl_settings["sandbox"] = args.sandbox - - censusdp1tract_to_sqlite(pudl_settings=pudl_settings, ds=ds, clobber=args.clobber) - - -if __name__ == "__main__": - sys.exit(main()) + return out_path diff --git a/src/pudl/etl/__init__.py b/src/pudl/etl/__init__.py index abb8adb373..0bcc273d0b 100644 --- a/src/pudl/etl/__init__.py +++ b/src/pudl/etl/__init__.py @@ -39,6 +39,11 @@ *load_assets_from_modules([pudl.transform.ferc1], group_name="norm_ferc1"), *load_assets_from_modules([pudl.extract.ferc714], group_name="raw_ferc714"), *load_assets_from_modules([pudl.transform.ferc714], group_name="clean_ferc714"), + *load_assets_from_modules([pudl.output.ferc714], group_name="respondents_ferc714"), + *load_assets_from_modules( + [pudl.convert.censusdp1tract_to_sqlite, pudl.output.censusdp1tract], + group_name="censusdp1", + ), *load_assets_from_modules([glue_assets], group_name="glue"), *load_assets_from_modules([static_assets], group_name="static"), *load_assets_from_modules( @@ -51,6 +56,12 @@ group_name="denorm_eia", ), *load_assets_from_modules([pudl.output.ferc1], group_name="denorm_ferc1"), + *load_assets_from_modules( + [pudl.analysis.service_territory], group_name="service_territory_eia861" + ), + *load_assets_from_modules( + [pudl.analysis.state_demand], group_name="state_demand_ferc714" + ), ) default_resources = { diff --git a/src/pudl/metadata/classes.py b/src/pudl/metadata/classes.py index 922561f284..9d8548817c 100644 --- a/src/pudl/metadata/classes.py +++ b/src/pudl/metadata/classes.py @@ -1198,7 +1198,9 @@ class Resource(Base): "static_eia", "static_eia_disabled", "eia_bulk_elec", + "state_demand", "static_pudl", + "service_territories", ] = None create_database_schema: bool = True diff --git 
a/src/pudl/metadata/fields.py b/src/pudl/metadata/fields.py index 231940c5f9..d80a5e6093 100644 --- a/src/pudl/metadata/fields.py +++ b/src/pudl/metadata/fields.py @@ -80,6 +80,7 @@ "type": "string", "description": "EIA record ID of the associated true granularity record.", }, + "area_km2": {"type": "number", "description": "County area in km2.", "unit": "km2"}, "ash_content_pct": { "type": "number", "description": "Ash content percentage by weight to the nearest 0.1 percent.", @@ -484,6 +485,10 @@ "pattern": r"^\d{5}$", }, }, + "county_name_census": { + "type": "string", + "description": "County name as specified in Census DP1 Data.", + }, "country_code": { "type": "string", "description": "Three letter ISO-3166 country code (e.g. USA or CAN).", @@ -548,12 +553,22 @@ }, "delivery_customers": {"type": "number"}, "demand_annual_mwh": {"type": "number", "unit": "MWh"}, + "demand_annual_per_capita_mwh": { + "type": "number", + "description": "Per-capita annual demand, averaged using Census county-level population estimates.", + "unit": "MWh/person", + }, "demand_charges": { "type": "number", "description": "Demand charges (USD).", "unit": "USD", }, "demand_mwh": {"type": "number", "unit": "MWh"}, + "demand_density_mwh_km2": { + "type": "number", + "description": "Annual demand per km2 of a given service territory.", + "unit": "MWh/km2", + }, "depreciation_type": { "type": "string", "description": ( @@ -1977,6 +1992,14 @@ "type": "boolean", "description": "Is the reporting entity an owner of power plants reported on Schedule 2 of the form?", }, + "population": { + "type": "number", + "description": "County population, sourced from Census DP1 data.", + }, + "population_density_km2": { + "type": "number", + "description": "Average population per sq. km area of a service territory.", + }, "potential_peak_demand_savings_mw": {"type": "number", "unit": "MW"}, "previously_canceled": { "type": "boolean", @@ -2152,6 +2175,11 @@ "unit": "USD", }, "sales_to_ultimate_consumers_mwh": {"type": "number", "unit": "MWh"}, + "scaled_demand_mwh": { + "type": "number", + "description": "Estimated electricity demand scaled by the total sales within a state.", + "unit": "MWh", + }, "secondary_transportation_mode_code": { "type": "string", "description": "Transportation mode for the second longest distance transported.", diff --git a/src/pudl/metadata/resources/eia.py b/src/pudl/metadata/resources/eia.py index 64c5b88015..a661e8e2ac 100644 --- a/src/pudl/metadata/resources/eia.py +++ b/src/pudl/metadata/resources/eia.py @@ -14,16 +14,19 @@ "exclude": [ "advanced_metering_infrastructure_eia861", "balancing_authority_eia861", + "compiled_geometry_balancing_authority_eia861", "demand_response_eia861", "demand_response_water_heater_eia861", "dynamic_pricing_eia861", "energy_efficiency_eia861", + "fipsified_respondents_ferc714", "net_metering_customer_fuel_class_eia861", "net_metering_misc_eia861", "non_net_metering_customer_fuel_class_eia861", "non_net_metering_misc_eia861", "reliability_eia861", "sales_eia861", + "summarized_demand_ferc714", ], }, }, @@ -688,6 +691,7 @@ "utilities_eia", "advanced_metering_infrastructure_eia861", "balancing_authority_assn_eia861", + "compiled_geometry_utility_eia861", "demand_response_eia861", "demand_response_water_heater_eia861", "demand_side_management_ee_dr_eia861", @@ -699,6 +703,7 @@ "distribution_systems_eia861", "dynamic_pricing_eia861", "energy_efficiency_eia861", + "fipsified_respondents_ferc714", "green_pricing_eia861", "mergers_eia861", 
"net_metering_customer_fuel_class_eia861", @@ -710,6 +715,7 @@ "reliability_eia861", "sales_eia861", "service_territory_eia861", + "summarized_demand_ferc714", "utility_assn_eia861", "utility_data_misc_eia861", "utility_data_nerc_eia861", diff --git a/src/pudl/metadata/resources/eia860.py b/src/pudl/metadata/resources/eia860.py index 6f605b3ec0..3f3ce29a62 100644 --- a/src/pudl/metadata/resources/eia860.py +++ b/src/pudl/metadata/resources/eia860.py @@ -435,6 +435,7 @@ "exclude": [ "advanced_metering_infrastructure_eia861", "balancing_authority_assn_eia861", + "compiled_geometry_utility_eia861", "demand_response_eia861", "demand_response_water_heater_eia861", "demand_side_management_ee_dr_eia861", @@ -454,6 +455,7 @@ "distribution_systems_eia861", "dynamic_pricing_eia861", "energy_efficiency_eia861", + "fipsified_respondents_ferc714", "green_pricing_eia861", "mergers_eia861", "net_metering_customer_fuel_class_eia861", @@ -465,6 +467,7 @@ "reliability_eia861", "sales_eia861", "service_territory_eia861", + "summarized_demand_ferc714", "utility_assn_eia861", "utility_data_misc_eia861", "utility_data_nerc_eia861", diff --git a/src/pudl/metadata/resources/eia861.py b/src/pudl/metadata/resources/eia861.py index 953e70a61b..8808cf255c 100644 --- a/src/pudl/metadata/resources/eia861.py +++ b/src/pudl/metadata/resources/eia861.py @@ -656,4 +656,49 @@ "sources": ["eia861"], "etl_group": "eia861", }, + "compiled_geometry_utility_eia861": { + "description": "County-level spatial data for EIA861 utilities.", + "schema": { + "fields": [ + "county_id_fips", + "county_name_census", + "population", + "area_km2", + "report_date", + "utility_id_eia", + "state", + "county", + "state_id_fips", + ], + "primary_key": ["utility_id_eia", "report_date", "county_id_fips"], + }, + "sources": ["eia861", "censusdp1"], + "field_namespace": "eia", + "etl_group": "service_territories", + }, + "compiled_geometry_balancing_authority_eia861": { + "description": "County-level spatial data for EIA861 balancing authorities.", + "schema": { + "fields": [ + "county_id_fips", + "county_name_census", + "population", + "area_km2", + "report_date", + "balancing_authority_id_eia", + "state", + "county", + "state_id_fips", + ], + "primary_key": [ + "balancing_authority_id_eia", + "report_date", + "county_id_fips", + "county", + ], + }, + "sources": ["eia861", "censusdp1"], + "field_namespace": "eia", + "etl_group": "service_territories", + }, } diff --git a/src/pudl/metadata/resources/ferc714.py b/src/pudl/metadata/resources/ferc714.py index 395f3705af..2a85f8c6ae 100644 --- a/src/pudl/metadata/resources/ferc714.py +++ b/src/pudl/metadata/resources/ferc714.py @@ -37,6 +37,65 @@ "field_namespace": "ferc714", "etl_group": "ferc714", }, + "fipsified_respondents_ferc714": { + "description": ( + "Annual respondents with the county FIPS IDs for their service territories." + ), + "schema": { + "fields": [ + "eia_code", + "respondent_type", + "respondent_id_ferc714", + "respondent_name_ferc714", + "report_date", + "balancing_authority_id_eia", + "balancing_authority_code_eia", + "balancing_authority_name_eia", + "utility_id_eia", + "utility_name_eia", + "state", + "county", + "state_id_fips", + "county_id_fips", + ] + # No primary key here because the state and county FIPS columns + # which are part of the natural primary key can be null. 
+ # The natural primary key would be: + # ['respondent_id_ferc714', 'report_date', 'state_id_fips', 'county_id_fips'] + }, + "sources": ["ferc714"], + "field_namespace": "ferc714", + "etl_group": "outputs", + }, + "summarized_demand_ferc714": { + "description": ( + "Annual, categorized FERC 714 respondents with summarized demand, population, and area values." + ), + "schema": { + "fields": [ + "report_date", + "respondent_id_ferc714", + "demand_annual_mwh", + "population", + "area_km2", + "population_density_km2", + "demand_annual_per_capita_mwh", + "demand_density_mwh_km2", + "eia_code", + "respondent_type", + "respondent_name_ferc714", + "balancing_authority_id_eia", + "balancing_authority_code_eia", + "balancing_authority_name_eia", + "utility_id_eia", + "utility_name_eia", + ], + "primary_key": ["respondent_id_ferc714", "report_date"], + }, + "sources": ["ferc714"], + "field_namespace": "ferc714", + "etl_group": "outputs", + }, } """FERC Form 714 resource attributes by PUDL identifier (``resource.name``). diff --git a/src/pudl/metadata/resources/pudl.py b/src/pudl/metadata/resources/pudl.py index d83d1a92a1..fec1782ab2 100644 --- a/src/pudl/metadata/resources/pudl.py +++ b/src/pudl/metadata/resources/pudl.py @@ -78,6 +78,20 @@ "field_namespace": "pudl", "sources": ["pudl"], }, + "predicted_state_hourly_demand": { + "schema": { + "fields": [ + "state_id_fips", + "utc_datetime", + "demand_mwh", + "scaled_demand_mwh", + ], + "primary_key": ["state_id_fips", "utc_datetime"], + }, + "etl_group": "state_demand", + "field_namespace": "pudl", + "sources": ["ferc714", "eia861", "censusdp1"], + }, } """PUDL-specific resource attributes by PUDL identifier (``resource.name``). diff --git a/src/pudl/output/censusdp1tract.py b/src/pudl/output/censusdp1tract.py index ac6ddeaa09..45e202de23 100644 --- a/src/pudl/output/censusdp1tract.py +++ b/src/pudl/output/censusdp1tract.py @@ -1,82 +1,79 @@ """Functions for reading data out of the Census DP1 SQLite Database.""" -from pathlib import Path -from typing import Literal - import geopandas as gpd import pandas as pd import sqlalchemy as sa +from dagster import AssetIn, AssetsDefinition, asset import pudl -from pudl.convert.censusdp1tract_to_sqlite import censusdp1tract_to_sqlite -from pudl.workspace.datastore import Datastore logger = pudl.logging_helpers.get_logger(__name__) -def get_layer( - layer: Literal["state", "county", "tract"], pudl_settings=None, ds=None ) -> gpd.GeoDataFrame: - """Select one layer from the Census DP1 database. +def census_asset_factory(layer: str) -> AssetsDefinition: + """An asset factory for layers of the Census DP1 database.""" - Uses information within the Census DP1 database to set the coordinate - reference system and to identify the column containing the geometry. The - geometry column is renamed to "geom" as that's the default withing - Geopandas. No other column names or types are altered. + @asset( + ins={"censusdp1tract_to_sqlite": AssetIn("censusdp1tract_to_sqlite")}, + name=f"{layer}_censusdp1", + ) + def census_layer(censusdp1tract_to_sqlite, **kwargs) -> gpd.GeoDataFrame: + """Select one layer from the Census DP1 database. - Args: - layer (str): Which set of geometries to read, must be one of "state", - "county", or "tract". - pudl_settings (dict or None): A dictionary of PUDL settings, including - paths to various resources like the Census DP1 SQLite database. If - None, the user defaults are used. + Uses information within the Census DP1 database to set the coordinate reference + system and to identify the column containing the geometry.
The geometry column + is renamed to "geometry" as that's the default within Geopandas. No other column + names or types are altered. + """ + census_conn = f"sqlite:///{censusdp1tract_to_sqlite}" + dp1_engine = sa.create_engine(census_conn) - Returns: - geopandas.GeoDataFrame - """ - if not isinstance(layer, str): - raise TypeError(f"Argument 'layer' must be a string, got arg of type {layer}.") - layer = layer.lower() - if layer not in ["state", "county", "tract"]: - raise ValueError( - "Census DP1 layer must be one of 'state', 'county' or 'tract', " - f"but got {layer}." - ) - if pudl_settings is None: - pudl_settings = pudl.workspace.setup.get_defaults() - if ds is None: - ds = Datastore() - # Check if we have the Census DP1 database. If not, create it. - if not Path(pudl_settings["pudl_out"], "censusdp1tract.sqlite").exists(): - logger.info("Census DP1 SQLite DB is missing. Creating it.") - censusdp1tract_to_sqlite(pudl_settings=pudl_settings, ds=ds) + def get_layer(layer, dp1_engine): + if not isinstance(layer, str): + raise TypeError( + f"Argument 'layer' must be a string, got arg of type {layer}." + ) + layer = layer.lower() + if layer not in ["state", "county", "tract"]: + raise ValueError( + "Census DP1 layer must be one of 'state', 'county' or 'tract', " + f"but got {layer}." + ) + table_name = f"{layer}_2010census_dp1" + df = pd.read_sql( + """ + SELECT geom_cols.f_table_name as table_name, + geom_cols.f_geometry_column as geom_col, + crs.auth_name as auth_name, + crs.auth_srid as auth_srid + FROM geometry_columns geom_cols + INNER JOIN spatial_ref_sys crs + ON geom_cols.srid = crs.srid + WHERE table_name = ? + """, + dp1_engine, + params=[table_name], + ) + if len(df) != 1: + raise AssertionError( + f"Expected exactly 1 geometry description, but found {len(df)}" + ) - dp1_engine = sa.create_engine(pudl_settings["censusdp1tract_db"]) + geom_col = df.at[0, "geom_col"] + crs_auth_str = f"{df.at[0, 'auth_name']}:{df.at[0, 'auth_srid']}".lower() - table_name = f"{layer}_2010census_dp1" - df = pd.read_sql( - """ -SELECT geom_cols.f_table_name as table_name, - geom_cols.f_geometry_column as geom_col, - crs.auth_name as auth_name, - crs.auth_srid as auth_srid - FROM geometry_columns geom_cols - INNER JOIN spatial_ref_sys crs - ON geom_cols.srid = crs.srid - WHERE table_name = ?
-""", - dp1_engine, - params=[table_name], - ) - if len(df) != 1: - raise AssertionError( - f"Expected exactly 1 geometry description, but found {len(df)}" - ) + gdf = gpd.read_postgis( + table_name, dp1_engine, geom_col=geom_col, crs=crs_auth_str + ) + gdf.rename_geometry("geometry", inplace=True) + + return gdf + + return get_layer(layer, dp1_engine) - geom_col = df.at[0, "geom_col"] - crs_auth_str = f"{df.at[0, 'auth_name']}:{df.at[0, 'auth_srid']}".lower() + return census_layer - gdf = gpd.read_postgis(table_name, dp1_engine, geom_col=geom_col, crs=crs_auth_str) - gdf.rename_geometry("geometry", inplace=True) - return gdf +census_dp1_layers = [ + census_asset_factory(layer) for layer in ["state", "county", "tract"] +] diff --git a/src/pudl/output/ferc714.py b/src/pudl/output/ferc714.py index 255b2a9e88..5c9100ee48 100644 --- a/src/pudl/output/ferc714.py +++ b/src/pudl/output/ferc714.py @@ -1,13 +1,15 @@ """Functions & classes for compiling derived aspects of the FERC Form 714 data.""" -from functools import cached_property +from datetime import datetime from typing import Any +import geopandas as gpd import numpy as np import pandas as pd +from dagster import Field, asset import pudl +from pudl.analysis.service_territory import utility_ids_all_eia from pudl.metadata.fields import apply_pudl_dtypes -from pudl.workspace.datastore import Datastore logger = pudl.logging_helpers.get_logger(__name__) @@ -81,20 +83,20 @@ ################################################################################ -def add_dates(rids_ferc714, report_dates): +def add_dates(rids_ferc714: pd.DataFrame, report_dates: list[datetime]) -> pd.DataFrame: """Broadcast respondent data across dates. Args: - rids_ferc714 (pandas.DataFrame): A simple FERC 714 Respondent ID dataframe, + rids_ferc714: A simple FERC 714 Respondent ID dataframe, without any date information. - report_dates (ordered collection of datetime): Dates for which each respondent + report_dates: Dates for which each respondent should be given a record. Raises: ValueError: if a ``report_date`` column exists in ``rids_ferc714``. Returns: - pandas.DataFrame: Dataframe having all the same columns as the input + A Dataframe having all the same columns as the input ``rids_ferc714`` with the addition of a ``report_date`` column, but with all records associated with each ``respondent_id_ferc714`` duplicated on a per-date basis. @@ -116,7 +118,12 @@ def add_dates(rids_ferc714, report_dates): return rids_with_dates -def categorize_eia_code(eia_codes, ba_ids, util_ids, priority="balancing_authority"): +def categorize_eia_code( + eia_codes: list[int], + ba_ids: list[int], + util_ids: list[int], + priority: str = "balancing_authority", +) -> pd.DataFrame: """Categorize FERC 714 ``eia_codes`` as either balancing authority or utility IDs. Most FERC 714 respondent IDs are associated with an ``eia_code`` which refers to @@ -145,18 +152,18 @@ def categorize_eia_code(eia_codes, ba_ids, util_ids, priority="balancing_authori priority with all utility IDs Args: - eia_codes (ordered collection of ints): A collection of IDs which may be either + eia_codes: A collection of IDs which may be either associated with EIA balancing authorities or utilities, to be categorized. - ba_ids_eia (ordered collection of ints): A collection of IDs which should be + ba_ids_eia: A collection of IDs which should be interpreted as belonging to EIA Balancing Authorities. 
- util_ids_eia (ordered collection of ints): A collection of IDs which should be + util_ids_eia: A collection of IDs which should be interpreted as belonging to EIA Utilities. - priorty (str): Which respondent_type to give priority to if the eia_code shows + priority: Which respondent_type to give priority to if the eia_code shows up in both util_ids_eia and ba_ids_eia. Must be one of "utility" or - "balancing_authority". The default is "balanacing_authority". + "balancing_authority". The default is "balancing_authority". Returns: - pandas.DataFrame: A dataframe containing 2 columns: ``eia_code`` and + A DataFrame containing 2 columns: ``eia_code`` and ``respondent_type``. """ if priority == "balancing_authority": @@ -195,456 +202,471 @@ def categorize_eia_code(eia_codes, ba_ids, util_ids, priority="balancing_authori return df -class Respondents: - """A class coordinating compilation of data related to FERC 714 Respondents. - - The FERC 714 Respondents themselves are not complex as they are reported, but - various ambiguities and the need to associate service territories with them mean - there are a lot of different derived aspects related to them which we repeatedly - need to compile in a self consistent way. This class allows you to choose several - parameters for that compilation, and then easily access the resulting derived - tabular outputs. - - Some of these derived attributes are computationally expensive, and so they are - cached internally. You can force a new computation in most cases by using - ``update=True`` in the access methods. However, this functionality isn't totally - implemented because we're still depending on the interim ETL processes for the FERC - 714 and EIA 861 data, and we don't want to trigger whole new ETL runs every time - a derived value is updated. - - Attributes: - pudl_out (pudl.output.pudltabl.PudlTabl): The PUDL output object which should be - used to obtain PUDL data. - pudl_settings (dict or None): A dictionary of settings indicating where data - related to PUDL can be found. Needed to obtain US Census DP1 data which - has the county geometries. - ba_ids (ordered collection or None): EIA IDs that should be treated as referring - to balancing authorities in respondent categorization process. If None, all - known values of ``balancing_authority_id_eia`` will be used. - util_ids (ordered collection or None): EIA IDs that should be treated as - referring to utilities in respondent categorization process. If None, all - known values of ``utility_id_eia`` will be used. - priority (str): Which type of entity should take priority in the categorization - of FERC 714 respondents. Must be either ``utility`` or - ``balancing_authority.`` The default is ``balancing_authority``. - limit_by_state (bool): Whether to limit respondent service territories to the - states where they have documented activity in the EIA 861. Currently this - is only implemented for Balancing Authorities. - """ +################################################################################# +# Functions to compute analysis assets. 
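As a toy illustration of the priority rules described in the Args above (all IDs below are invented), the categorization resolves an ID that appears in both lists according to ``priority``:

from pudl.output.ferc714 import categorize_eia_code

# 111 is only a balancing authority, 222 is only a utility, 333 is in both.
df = categorize_eia_code([111, 222, 333], ba_ids=[111, 333], util_ids=[222, 333])
# With the default priority="balancing_authority", 111 and 333 are assigned
# respondent_type "balancing_authority" and 222 is assigned "utility";
# passing priority="utility" would instead assign 333 to "utility".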
+################################################################################ - def __init__( - self, - pudl_out, - pudl_settings=None, - ba_ids=None, - util_ids=None, - priority="balancing_authority", - limit_by_state=True, - ds=None, - ): - """Set respondent compilation parameters.""" - self.pudl_out = pudl_out - self.ds = ds - - if pudl_settings is None: - pudl_settings = pudl.workspace.setup.get_defaults() - self.pudl_settings = pudl_settings - if ds is None: - ds = Datastore() - if ba_ids is None: - ba_ids = ( - self.balancing_authority_eia861.balancing_authority_id_eia.dropna().unique() - ) - self.ba_ids = ba_ids - - if util_ids is None: - util_ids = pudl.analysis.service_territory.get_all_utils( - self.pudl_out - ).utility_id_eia - self.util_ids = util_ids - - self.priority = priority - self.limit_by_state = limit_by_state - self._categorized = None - self._annualized = None - self._demand_summary = None - self._fipsified = None - self._counties_gdf = None - self._respondents_gdf = None - - @cached_property - def balancing_authority_eia861(self) -> pd.DataFrame: - """Modified balancing_authority_eia861 table.""" - df = self.pudl_out.balancing_authority_eia861() - index = ["balancing_authority_id_eia", "report_date"] - dfi = df.set_index(index) - # Prepare reference rows - keys = [(fix["id"], pd.Timestamp(fix["from"], 1, 1)) for fix in ASSOCIATIONS] - refs = dfi.loc[keys].reset_index().to_dict("records") - # Build table of new rows - # Insert row for each target balancing authority-year pair - # missing from the original table, using the reference year as a template. - rows = [] - for ref, fix in zip(refs, ASSOCIATIONS): - for year in range(fix["to"][0], fix["to"][1] + 1): - key = (fix["id"], pd.Timestamp(year, 1, 1)) - if key not in dfi.index: - rows.append({**ref, "report_date": key[1]}) - # Append to original table - df = pd.concat([df, pd.DataFrame(rows)]) - # Remove balancing authorities treated as utilities - mask = df["balancing_authority_id_eia"].isin([util["id"] for util in UTILITIES]) - return df[~mask] - - @cached_property - def balancing_authority_assn_eia861(self) -> pd.DataFrame: - """Modified balancing_authority_assn_eia861 table.""" - df = self.pudl_out.balancing_authority_assn_eia861() - # Prepare reference rows - refs = [] - for fix in ASSOCIATIONS: - mask = df["balancing_authority_id_eia"].eq(fix["id"]).to_numpy(bool) - mask[mask] = df["report_date"][mask].eq(pd.Timestamp(fix["from"], 1, 1)) - ref = df[mask] - if "exclude" in fix: - # Exclude utilities by state - mask = ~ref["state"].isin(fix["exclude"]) - ref = ref[mask] - refs.append(ref) - # Buid table of new rows - # Insert (or overwrite) rows for each target balancing authority-year pair, - # using the reference year as a template. 
- replaced = np.zeros(df.shape[0], dtype=bool) - tables = [] - for ref, fix in zip(refs, ASSOCIATIONS): - for year in range(fix["to"][0], fix["to"][1] + 1): - key = fix["id"], pd.Timestamp(year, 1, 1) - mask = df["balancing_authority_id_eia"].eq(key[0]).to_numpy(bool) - mask[mask] = df["report_date"][mask].eq(key[1]) - tables.append(ref.assign(report_date=key[1])) - replaced |= mask - # Append to original table with matching rows removed - df = pd.concat([df[~replaced], pd.concat(tables)]) - # Remove balancing authorities treated as utilities - mask = np.zeros(df.shape[0], dtype=bool) - tables = [] - for util in UTILITIES: - is_parent = df["balancing_authority_id_eia"].eq(util["id"]) - mask |= is_parent - # Associated utilities are reassigned to parent balancing authorities - if "reassign" in util and util["reassign"]: - # Ignore when entity is child to itself - is_child = ~is_parent & df["utility_id_eia"].eq(util["id"]) - # Build table associating parents to children of entity - table = ( - df[is_child] - .merge( - df[is_parent & ~df["utility_id_eia"].eq(util["id"])], - left_on=["report_date", "utility_id_eia"], - right_on=["report_date", "balancing_authority_id_eia"], - ) - .drop( - columns=[ - "utility_id_eia_x", - "state_x", - "balancing_authority_id_eia_y", - ] - ) - .rename( - columns={ - "balancing_authority_id_eia_x": "balancing_authority_id_eia", - "utility_id_eia_y": "utility_id_eia", - "state_y": "state", - } - ) + +def filled_balancing_authority_eia861( + balancing_authority_eia861: pd.DataFrame, +) -> pd.DataFrame: + """Modified balancing_authority_eia861 table. + + This function adds rows for each balancing authority-year pair missing from the + cleaned balancing_authority_eia861 table, using a dictionary of manual fixes. It + uses the reference year as a template. The function also removes balancing + authorities that are manually categorized as utilities. + """ + df = balancing_authority_eia861 + index = ["balancing_authority_id_eia", "report_date"] + dfi = df.set_index(index) + # Prepare reference rows + keys = [(fix["id"], pd.Timestamp(fix["from"], 1, 1)) for fix in ASSOCIATIONS] + refs = dfi.loc[keys].reset_index().to_dict("records") + # Build table of new rows + # Insert row for each target balancing authority-year pair + # missing from the original table, using the reference year as a template. + rows = [] + for ref, fix in zip(refs, ASSOCIATIONS): + for year in range(fix["to"][0], fix["to"][1] + 1): + key = (fix["id"], pd.Timestamp(year, 1, 1)) + if key not in dfi.index: + rows.append({**ref, "report_date": key[1]}) + # Append to original table + df = pd.concat([df, pd.DataFrame(rows)]) + # Remove balancing authorities treated as utilities + mask = df["balancing_authority_id_eia"].isin([util["id"] for util in UTILITIES]) + return df[~mask] + + +def filled_balancing_authority_assn_eia861( + balancing_authority_assn_eia861: pd.DataFrame, +) -> pd.DataFrame: + """Modified balancing_authority_assn_eia861 table. + + This function adds rows for each balancing authority-year pair missing from the + cleaned balancing_authority_assn_eia861 table, using a dictionary of manual fixes. + It uses the reference year as a template. The function also reassigns balancing + authorities that are manually categorized as utilities to their parent balancing + authorities. 
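The fill pattern used by ``filled_balancing_authority_eia861`` above can be hard to see amid the masking logic. Here is a toy, self-contained sketch with an invented fix entry (the real ``ASSOCIATIONS`` entries and column set differ):

import pandas as pd

fixes = [{"id": 99, "from": 2018, "to": (2019, 2020)}]
df = pd.DataFrame(
    {
        "balancing_authority_id_eia": [99],
        "report_date": [pd.Timestamp(2018, 1, 1)],
        "balancing_authority_code_eia": ["TOY"],
    }
)
dfi = df.set_index(["balancing_authority_id_eia", "report_date"])
rows = []
for fix in fixes:
    # The reference year's row serves as a template for each missing target year.
    ref = dfi.loc[(fix["id"], pd.Timestamp(fix["from"], 1, 1))].to_dict()
    for year in range(fix["to"][0], fix["to"][1] + 1):
        key = (fix["id"], pd.Timestamp(year, 1, 1))
        if key not in dfi.index:
            rows.append(
                {
                    **ref,
                    "balancing_authority_id_eia": fix["id"],
                    "report_date": key[1],
                }
            )
filled = pd.concat([df, pd.DataFrame(rows)], ignore_index=True)
# `filled` now has one row each for 2018, 2019 and 2020, all sharing code "TOY".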
+ """ + df = balancing_authority_assn_eia861 + # Prepare reference rows + refs = [] + for fix in ASSOCIATIONS: + mask = df["balancing_authority_id_eia"].eq(fix["id"]).to_numpy(bool) + mask[mask] = df["report_date"][mask].eq(pd.Timestamp(fix["from"], 1, 1)) + ref = df[mask] + if "exclude" in fix: + # Exclude utilities by state + mask = ~ref["state"].isin(fix["exclude"]) + ref = ref[mask] + refs.append(ref) + # Buid table of new rows + # Insert (or overwrite) rows for each target balancing authority-year pair, + # using the reference year as a template. + replaced = np.zeros(df.shape[0], dtype=bool) + tables = [] + for ref, fix in zip(refs, ASSOCIATIONS): + for year in range(fix["to"][0], fix["to"][1] + 1): + key = fix["id"], pd.Timestamp(year, 1, 1) + mask = df["balancing_authority_id_eia"].eq(key[0]).to_numpy(bool) + mask[mask] = df["report_date"][mask].eq(key[1]) + tables.append(ref.assign(report_date=key[1])) + replaced |= mask + # Append to original table with matching rows removed + df = pd.concat([df[~replaced], pd.concat(tables)]) + # Remove balancing authorities treated as utilities + mask = np.zeros(df.shape[0], dtype=bool) + tables = [] + for util in UTILITIES: + is_parent = df["balancing_authority_id_eia"].eq(util["id"]) + mask |= is_parent + # Associated utilities are reassigned to parent balancing authorities + if "reassign" in util and util["reassign"]: + # Ignore when entity is child to itself + is_child = ~is_parent & df["utility_id_eia"].eq(util["id"]) + # Build table associating parents to children of entity + table = ( + df[is_child] + .merge( + df[is_parent & ~df["utility_id_eia"].eq(util["id"])], + left_on=["report_date", "utility_id_eia"], + right_on=["report_date", "balancing_authority_id_eia"], ) - tables.append(table) - if "replace" in util and util["replace"]: - mask |= is_child - return pd.concat([df[~mask], pd.concat(tables)]).drop_duplicates() - - @cached_property - def service_territory_eia861(self) -> pd.DataFrame: - """Modified service_territory_eia861 table.""" - index = ["utility_id_eia", "state", "report_date"] - # Select relevant balancing authority-utility associations - assn = self.balancing_authority_assn_eia861 - selected = np.zeros(assn.shape[0], dtype=bool) - for fix in ASSOCIATIONS: - years = [fix["from"], *range(fix["to"][0], fix["to"][1] + 1)] - dates = [pd.Timestamp(year, 1, 1) for year in years] - mask = assn["balancing_authority_id_eia"].eq(fix["id"]).to_numpy(bool) - mask[mask] = assn["report_date"][mask].isin(dates) - selected |= mask - # Reformat as unique utility-state-year - assn = assn[selected][index].drop_duplicates() - # Select relevant service territories - df = self.pudl_out.service_territory_eia861() - mdf = assn.merge(df, how="left") - # Drop utility-state with no counties for all years - grouped = mdf.groupby(["utility_id_eia", "state"])["county_id_fips"] - mdf = mdf[grouped.transform("count").gt(0)] - # Fill missing utility-state-year with nearest year with counties - grouped = mdf.groupby(index)["county_id_fips"] - missing = mdf[grouped.transform("count").eq(0)].to_dict("records") - has_county = mdf["county_id_fips"].notna() - tables = [] - for row in missing: - mask = ( - mdf["utility_id_eia"].eq(row["utility_id_eia"]) - & mdf["state"].eq(row["state"]) - & has_county - ) - years = mdf["report_date"][mask].drop_duplicates() - # Match to nearest year - idx = (years - row["report_date"]).abs().idxmin() - mask &= mdf["report_date"].eq(years[idx]) - tables.append(mdf[mask].assign(report_date=row["report_date"])) - return 
pd.concat([df] + tables) - - def annualize(self, update=False): - """Broadcast respondent data across all years with reported demand. - - The FERC 714 Respondent IDs and names are reported in their own table, without - any refence to individual years, but much of the information we are associating - with them varies annually. This method creates an annualized version of the - respondent table, with each respondent having an entry corresponding to every - year in which hourly demand was reported in the FERC 714 dataset as a whole -- - this necessarily means that many of the respondents will end up having entries - for years in which they reported no demand, and that's fine. They can be - filtered later. - """ - if update or self._annualized is None: - # Calculate the total demand per respondent, per year: - report_dates = self.pudl_out.demand_hourly_pa_ferc714().report_date.unique() - self._annualized = ( - self.pudl_out.respondent_id_ferc714() - .pipe(add_dates, report_dates) - .pipe(apply_pudl_dtypes) - ) - return self._annualized - - def categorize(self, update=False): - """Annualized respondents with ``respondent_type`` assigned if possible. - - Categorize each respondent as either a ``utility`` or a ``balancing_authority`` - using the parameters stored in the instance of the class. While categorization - can also be done without annualizing, this function annualizes as well, since we - are adding the ``respondent_type`` in order to be able to compile service - territories for the respondent, which vary annually. - """ - if update or self._categorized is None: - rids_ferc714 = self.pudl_out.respondent_id_ferc714() - logger.info("Categorizing EIA codes associated with FERC-714 Respondents.") - categorized = categorize_eia_code( - rids_ferc714.eia_code.dropna().unique(), - ba_ids=self.ba_ids, - util_ids=self.util_ids, - priority=self.priority, - ) - logger.info( - "Merging categorized EIA codes with annualized FERC-714 Respondent " - "data." - ) - categorized = pd.merge( - categorized, self.annualize(update=update), how="right" - ) - # Names, ids, and codes for BAs identified as FERC 714 respondents - # NOTE: this is not *strictly* correct, because the EIA BAs are not - # eternal and unchanging. There's at least one case in which the BA - # associated with a given ID had a code and name change between years - # after it changed hands. However, not merging on report_date in - # addition to the balancing_authority_id_eia / eia_code fields ensures - # that all years are populated for all BAs, which keeps them analogous - # to the Utiliies in structure. Sooo.... it's fine for now. - logger.info("Selecting FERC-714 Balancing Authority respondents.") - ba_respondents = categorized.query("respondent_type=='balancing_authority'") - logger.info( - "Merging FERC-714 Balancing Authority respondents with BA id/code/name " - "information from EIA-861." 
- ) - ba_respondents = pd.merge( - ba_respondents, - self.balancing_authority_eia861[ - [ - "balancing_authority_id_eia", - "balancing_authority_code_eia", - "balancing_authority_name_eia", - ] - ].drop_duplicates( - subset=[ - "balancing_authority_id_eia", + .drop( + columns=[ + "utility_id_eia_x", + "state_x", + "balancing_authority_id_eia_y", ] - ), - how="left", - left_on="eia_code", - right_on="balancing_authority_id_eia", - ) - logger.info("Selecting names and IDs for FERC-714 Utility respondents.") - util_respondents = categorized.query("respondent_type=='utility'") - logger.info("Merging FERC-714 Utility respondents with service territory.") - util_respondents = pd.merge( - util_respondents, - pudl.analysis.service_territory.get_all_utils(self.pudl_out), - how="left", - left_on="eia_code", - right_on="utility_id_eia", - ) - logger.info("Concatenating categorized FERC-714 respondents.") - self._categorized = pd.concat( - [ - ba_respondents, - util_respondents, - # Uncategorized respondents w/ no respondent_type: - categorized[categorized.respondent_type.isnull()], - ] - ) - self._categorized = apply_pudl_dtypes(self._categorized) - return self._categorized - - def summarize_demand(self, update=False): - """Compile annualized, categorized respondents and summarize values. - - Calculated summary values include: - * Total reported electricity demand per respondent (``demand_annual_mwh``) - * Reported per-capita electrcity demand (``demand_annual_per_capita_mwh``) - * Population density (``population_density_km2``) - * Demand density (``demand_density_mwh_km2``) - - These metrics are helpful identifying suspicious changes in the compiled annual - geometries for the planning areas. - """ - if update or self._demand_summary is None: - demand_annual = ( - pd.merge( - self.annualize(update=update), - self.pudl_out.demand_hourly_pa_ferc714().loc[ - :, ["report_date", "respondent_id_ferc714", "demand_mwh"] - ], - how="left", ) - .groupby(["report_date", "respondent_id_ferc714"]) - .agg({"demand_mwh": sum}) - .rename(columns={"demand_mwh": "demand_annual_mwh"}) - .reset_index() - .merge( - self.georef_counties(update=update) - .groupby(["report_date", "respondent_id_ferc714"]) - .agg({"population": sum, "area_km2": sum}) - .reset_index() + .rename( + columns={ + "balancing_authority_id_eia_x": "balancing_authority_id_eia", + "utility_id_eia_y": "utility_id_eia", + "state_y": "state", + } ) - .assign( - population_density_km2=lambda x: x.population / x.area_km2, - demand_annual_per_capita_mwh=lambda x: x.demand_annual_mwh - / x.population, - demand_density_mwh_km2=lambda x: x.demand_annual_mwh / x.area_km2, - ) - ) - # Merge respondent categorizations into the annual demand - self._demand_summary = pd.merge( - demand_annual, self.categorize(update=update), how="left" - ).pipe(apply_pudl_dtypes) - return self._demand_summary - - def fipsify(self, update=False): - """Annual respondents with the county FIPS IDs for their service territories. - - Given the ``respondent_type`` associated with each respondent (either - ``utility`` or ``balancing_authority``) compile a list of counties that are part - of their service territory on an annual basis, and merge those into the - annualized respondent table. This results in a very long dataframe, since there - are thousands of counties and many of them are served by more than one entity. 
- - Currently respondents categorized as ``utility`` will include any county that - appears in the ``service_territory_eia861`` table in association with that - utility ID in each year, while for ``balancing_authority`` respondents, some - counties can be excluded based on state (if ``self.limit_by_state==True``). - """ - if update or self._fipsified is None: - categorized = self.categorize(update=update) - # Generate the BA:FIPS relation: - ba_counties = pd.merge( - categorized.query("respondent_type=='balancing_authority'"), - pudl.analysis.service_territory.get_territory_fips( - ids=categorized.balancing_authority_id_eia.unique(), - assn=self.balancing_authority_assn_eia861, - assn_col="balancing_authority_id_eia", - st_eia861=self.service_territory_eia861, - limit_by_state=self.limit_by_state, - ), - on=["report_date", "balancing_authority_id_eia"], - how="left", - ) - # Generate the Util:FIPS relation: - util_counties = pd.merge( - categorized.query("respondent_type=='utility'"), - pudl.analysis.service_territory.get_territory_fips( - ids=categorized.utility_id_eia.unique(), - assn=self.pudl_out.utility_assn_eia861(), - assn_col="utility_id_eia", - st_eia861=self.service_territory_eia861, - limit_by_state=self.limit_by_state, - ), - on=["report_date", "utility_id_eia"], - how="left", ) - self._fipsified = pd.concat( - [ - ba_counties, - util_counties, - categorized[categorized.respondent_type.isnull()], - ] - ).pipe(apply_pudl_dtypes) - return self._fipsified - - def georef_counties(self, update=False): - """Annual respondents with all associated county-level geometries. - - Given the county FIPS codes associated with each respondent in each year, pull - in associated geometries from the US Census DP1 dataset, so we can do spatial - analyses. This keeps each county record independent -- so there will be many - records for each respondent in each year. This is fast, and still good for - mapping, and retains all of the FIPS IDs so you can also still do ID based - analyses. - """ - if update or self._counties_gdf is None: - census_counties = pudl.output.censusdp1tract.get_layer( - layer="county", pudl_settings=self.pudl_settings, ds=self.ds - ) - self._counties_gdf = pudl.analysis.service_territory.add_geometries( - self.fipsify(update=update), census_gdf=census_counties - ).pipe(apply_pudl_dtypes) - return self._counties_gdf - - def georef_respondents(self, update=False): - """Annual respondents with a single all-encompassing geometry for each year. - - Given the county FIPS codes associated with each responent in each year, compile - a geometry for the respondent's entire service territory annually. This results - in just a single record per respondent per year, but is computationally - expensive and you lose the information about what all counties are associated - with the respondent in that year. But it's useful for merging in other annual - data like total demand, so you can see which respondent-years have both reported - demand and decent geometries, calculate their areas to see if something changed - from year to year, etc. 
- """ - if update or self._respondents_gdf is None: - census_counties = pudl.output.censusdp1tract.get_layer( - layer="county", - pudl_settings=self.pudl_settings, - ) - self._respondents_gdf = ( - pudl.analysis.service_territory.add_geometries( - self.fipsify(update=update), - census_gdf=census_counties, - dissolve=True, - dissolve_by=["report_date", "respondent_id_ferc714"], - ) - .merge( - self.summarize_demand(update=update)[ - ["report_date", "respondent_id_ferc714", "demand_annual_mwh"] - ] - ) - .pipe(apply_pudl_dtypes) + tables.append(table) + if "replace" in util and util["replace"]: + mask |= is_child + return pd.concat([df[~mask], pd.concat(tables)]).drop_duplicates() + + +def filled_service_territory_eia861( + balancing_authority_assn_eia861: pd.DataFrame, + service_territory_eia861: pd.DataFrame, +) -> pd.DataFrame: + """Modified service_territory_eia861 table. + + This function adds rows for each balancing authority-year pair missing from the + cleaned service_territory_eia861 table, using a dictionary of manual fixes. It also + drops utility-state combinations which are missing counties across all years of + data, fills records missing counties with the nearest year of county data for the + same utility and state. + """ + index = ["utility_id_eia", "state", "report_date"] + # Select relevant balancing authority-utility associations + assn = filled_balancing_authority_assn_eia861(balancing_authority_assn_eia861) + selected = np.zeros(assn.shape[0], dtype=bool) + for fix in ASSOCIATIONS: + years = [fix["from"], *range(fix["to"][0], fix["to"][1] + 1)] + dates = [pd.Timestamp(year, 1, 1) for year in years] + mask = assn["balancing_authority_id_eia"].eq(fix["id"]).to_numpy(bool) + mask[mask] = assn["report_date"][mask].isin(dates) + selected |= mask + # Reformat as unique utility-state-year + assn = assn[selected][index].drop_duplicates() + # Select relevant service territories + df = service_territory_eia861 + mdf = assn.merge(df, how="left") + # Drop utility-state with no counties for all years + grouped = mdf.groupby(["utility_id_eia", "state"])["county_id_fips"] + mdf = mdf[grouped.transform("count").gt(0)] + # Fill missing utility-state-year with nearest year with counties + grouped = mdf.groupby(index)["county_id_fips"] + missing = mdf[grouped.transform("count").eq(0)].to_dict("records") + has_county = mdf["county_id_fips"].notna() + tables = [] + for row in missing: + mask = ( + mdf["utility_id_eia"].eq(row["utility_id_eia"]) + & mdf["state"].eq(row["state"]) + & has_county + ) + years = mdf["report_date"][mask].drop_duplicates() + # Match to nearest year + idx = (years - row["report_date"]).abs().idxmin() + mask &= mdf["report_date"].eq(years[idx]) + tables.append(mdf[mask].assign(report_date=row["report_date"])) + return pd.concat([df] + tables) + + +@asset(compute_kind="Python") +def annualized_respondents_ferc714( + demand_hourly_pa_ferc714: pd.DataFrame, respondent_id_ferc714: pd.DataFrame +) -> pd.DataFrame: + """Broadcast respondent data across all years with reported demand. + + The FERC 714 Respondent IDs and names are reported in their own table, without any + refence to individual years, but much of the information we are associating with + them varies annually. 
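The broadcast performed by this asset is easiest to see on a toy input (the respondent IDs and names below are invented):

import pandas as pd

from pudl.output.ferc714 import add_dates

rids = pd.DataFrame(
    {"respondent_id_ferc714": [1, 2], "respondent_name_ferc714": ["A", "B"]}
)
report_dates = [pd.Timestamp(2019, 1, 1), pd.Timestamp(2020, 1, 1)]
annualized = add_dates(rids, report_dates)
# Every respondent is duplicated once per report_date:
assert len(annualized) == len(rids) * len(report_dates)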
This method creates an annualized version of the respondent + table, with each respondent having an entry corresponding to every year in which + hourly demand was reported in the FERC 714 dataset as a whole -- this necessarily + means that many of the respondents will end up having entries for years in which + they reported no demand, and that's fine. They can be filtered later. + """ + # Calculate the total demand per respondent, per year: + report_dates = [ + time for time in demand_hourly_pa_ferc714.report_date.unique() if pd.notna(time) + ] + annualized_respondents_ferc714 = respondent_id_ferc714.pipe( + add_dates, report_dates + ).pipe(apply_pudl_dtypes) + return annualized_respondents_ferc714 + + +@asset( + config_schema={ + "priority": Field( + str, + default_value="balancing_authority", + description=( + "Which type of entity should take priority in the categorization of " + "FERC 714 respondents. Must be either ``utility`` or " + "``balancing_authority.`` The default is ``balancing_authority``." + ), + ), + }, + compute_kind="Python", +) +def categorized_respondents_ferc714( + context, + respondent_id_ferc714: pd.DataFrame, + denorm_utilities_eia: pd.DataFrame, + service_territory_eia861: pd.DataFrame, + balancing_authority_eia861: pd.DataFrame, + annualized_respondents_ferc714: pd.DataFrame, +) -> pd.DataFrame: + """Annualized respondents with ``respondent_type`` assigned if possible. + + Categorize each respondent as either a ``utility`` or a ``balancing_authority`` + using the parameters stored in the instance of the class. While categorization + can also be done without annualizing, this function annualizes as well, since we + are adding the ``respondent_type`` in order to be able to compile service + territories for the respondent, which vary annually. + """ + priority = context.op_config["priority"] + + logger.info("Categorizing EIA codes associated with FERC-714 Respondents.") + + bal_auth = filled_balancing_authority_eia861(balancing_authority_eia861) + utilids_all_eia = utility_ids_all_eia( + denorm_utilities_eia, service_territory_eia861 + ) + + categorized = categorize_eia_code( + respondent_id_ferc714.eia_code.dropna().unique(), + ba_ids=bal_auth.balancing_authority_id_eia.dropna().unique(), + util_ids=utilids_all_eia.utility_id_eia, + priority=priority, + ) + logger.info( + "Merging categorized EIA codes with annualized FERC-714 Respondent " "data." + ) + categorized = pd.merge(categorized, annualized_respondents_ferc714, how="right") + # Names, ids, and codes for BAs identified as FERC 714 respondents + # NOTE: this is not *strictly* correct, because the EIA BAs are not + # eternal and unchanging. There's at least one case in which the BA + # associated with a given ID had a code and name change between years + # after it changed hands. However, not merging on report_date in + # addition to the balancing_authority_id_eia / eia_code fields ensures + # that all years are populated for all BAs, which keeps them analogous + # to the Utiliies in structure. Sooo.... it's fine for now. + logger.info("Selecting FERC-714 Balancing Authority respondents.") + ba_respondents = categorized.query("respondent_type=='balancing_authority'") + logger.info( + "Merging FERC-714 Balancing Authority respondents with BA id/code/name " + "information from EIA-861." 
+ ) + ba_respondents = pd.merge( + ba_respondents, + bal_auth[ + [ + "balancing_authority_id_eia", + "balancing_authority_code_eia", + "balancing_authority_name_eia", + ] + ].drop_duplicates( + subset=[ + "balancing_authority_id_eia", + ] + ), + how="left", + left_on="eia_code", + right_on="balancing_authority_id_eia", + ) + logger.info("Selecting names and IDs for FERC-714 Utility respondents.") + util_respondents = categorized.query("respondent_type=='utility'") + logger.info("Merging FERC-714 Utility respondents with service territory.") + util_respondents = pd.merge( + util_respondents, + utilids_all_eia, + how="left", + left_on="eia_code", + right_on="utility_id_eia", + ) + logger.info("Concatenating categorized FERC-714 respondents.") + categorized = pd.concat( + [ + ba_respondents, + util_respondents, + # Uncategorized respondents w/ no respondent_type: + categorized[categorized.respondent_type.isnull()], + ] + ) + categorized = apply_pudl_dtypes(categorized) + return categorized + + +@asset( + config_schema={ + "limit_by_state": Field( + bool, + default_value=True, + description=( + "Whether to limit respondent service territories to the states where " + "they have documented activity in the EIA 861. Currently this is only " + "implemented for Balancing Authorities." + ), + ), + }, + compute_kind="Python", + io_manager_key="pudl_sqlite_io_manager", +) +def fipsified_respondents_ferc714( + context, + categorized_respondents_ferc714: pd.DataFrame, + balancing_authority_assn_eia861: pd.DataFrame, + service_territory_eia861: pd.DataFrame, + utility_assn_eia861: pd.DataFrame, +) -> pd.DataFrame: + """Annual respondents with the county FIPS IDs for their service territories. + + Given the ``respondent_type`` associated with each respondent (either + ``utility`` or ``balancing_authority``) compile a list of counties that are part + of their service territory on an annual basis, and merge those into the + annualized respondent table. This results in a very long dataframe, since there + are thousands of counties and many of them are served by more than one entity. + + Currently respondents categorized as ``utility`` will include any county that + appears in the ``service_territory_eia861`` table in association with that + utility ID in each year, while for ``balancing_authority`` respondents, some + counties can be excluded based on state (if ``limit_by_state==True``). 
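Both ``priority`` (above) and ``limit_by_state`` are exposed as Dagster op config, so they can be overridden at materialization time without touching the code. A hypothetical run configuration sketch; the keys mirror the asset names in this diff, and the nested ``config`` entries correspond to the declared ``config_schema`` Fields:

# This dict can be passed as run_config when materializing these assets, or
# written out as YAML in the Dagit launchpad; the values shown flip the
# documented defaults.
run_config = {
    "ops": {
        "categorized_respondents_ferc714": {"config": {"priority": "utility"}},
        "fipsified_respondents_ferc714": {"config": {"limit_by_state": False}},
    }
}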
+ """ + # + assn = filled_balancing_authority_assn_eia861(balancing_authority_assn_eia861) + st_eia861 = filled_service_territory_eia861( + balancing_authority_assn_eia861, service_territory_eia861 + ) + + # Generate the BA:FIPS relation: + ba_counties = pd.merge( + categorized_respondents_ferc714.query("respondent_type=='balancing_authority'"), + pudl.analysis.service_territory.get_territory_fips( + ids=categorized_respondents_ferc714.balancing_authority_id_eia.unique(), + assn=assn, + assn_col="balancing_authority_id_eia", + service_territory_eia861=st_eia861, + limit_by_state=context.op_config["limit_by_state"], + ), + on=["report_date", "balancing_authority_id_eia"], + how="left", + ) + # Generate the Util:FIPS relation: + util_counties = pd.merge( + categorized_respondents_ferc714.query("respondent_type=='utility'"), + pudl.analysis.service_territory.get_territory_fips( + ids=categorized_respondents_ferc714.utility_id_eia.unique(), + assn=utility_assn_eia861, + assn_col="utility_id_eia", + service_territory_eia861=st_eia861, + limit_by_state=context.op_config["limit_by_state"], + ), + on=["report_date", "utility_id_eia"], + how="left", + ) + fipsified = pd.concat( + [ + ba_counties, + util_counties, + categorized_respondents_ferc714[ + categorized_respondents_ferc714.respondent_type.isnull() + ], + ] + ).pipe(apply_pudl_dtypes) + return fipsified + + +@asset(compute_kind="Python") +def georeferenced_counties_ferc714( + fipsified_respondents_ferc714: pd.DataFrame, county_censusdp1: gpd.GeoDataFrame +) -> gpd.GeoDataFrame: + """Annual respondents with all associated county-level geometries. + + Given the county FIPS codes associated with each respondent in each year, pull in + associated geometries from the US Census DP1 dataset, so we can do spatial analyses. + This keeps each county record independent -- so there will be many records for each + respondent in each year. This is fast, and still good for mapping, and retains all + of the FIPS IDs so you can also still do ID based analyses. + """ + counties_gdf = pudl.analysis.service_territory.add_geometries( + fipsified_respondents_ferc714, census_gdf=county_censusdp1 + ).pipe(apply_pudl_dtypes) + return counties_gdf + + +@asset(compute_kind="Python") +def georeferenced_respondents_ferc714( + fipsified_respondents_ferc714: pd.DataFrame, + summarized_demand_ferc714: pd.DataFrame, + county_censusdp1: gpd.GeoDataFrame, +) -> gpd.GeoDataFrame: + """Annual respondents with a single all-encompassing geometry for each year. + + Given the county FIPS codes associated with each responent in each year, compile a + geometry for the respondent's entire service territory annually. This results in + just a single record per respondent per year, but is computationally expensive and + you lose the information about what all counties are associated with the respondent + in that year. But it's useful for merging in other annual data like total demand, so + you can see which respondent-years have both reported demand and decent geometries, + calculate their areas to see if something changed from year to year, etc. 
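The expensive step in this asset is the dissolve inside ``pudl.analysis.service_territory.add_geometries``, which is not shown in this diff. Conceptually, and assuming a recent geopandas, it amounts to something like the following sketch, which unions the county polygons for each respondent-year (the aggregation of the non-geometry columns here is purely illustrative):

import geopandas as gpd


def dissolve_to_respondents(counties: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    # Illustrative only; the real helper handles more columns and edge cases.
    keep = [
        "report_date",
        "respondent_id_ferc714",
        "population",
        "area_km2",
        "geometry",
    ]
    return (
        counties[keep]
        .dissolve(by=["report_date", "respondent_id_ferc714"], aggfunc="sum")
        .reset_index()
    )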
+ """ + respondents_gdf = ( + pudl.analysis.service_territory.add_geometries( + fipsified_respondents_ferc714, + census_gdf=county_censusdp1, + dissolve=True, + dissolve_by=["report_date", "respondent_id_ferc714"], + ) + .merge( + summarized_demand_ferc714[ + ["report_date", "respondent_id_ferc714", "demand_annual_mwh"] + ] + ) + .pipe(apply_pudl_dtypes) + ) + return respondents_gdf + + +@asset(compute_kind="Python", io_manager_key="pudl_sqlite_io_manager") +def summarized_demand_ferc714( + annualized_respondents_ferc714: pd.DataFrame, + demand_hourly_pa_ferc714: pd.DataFrame, + fipsified_respondents_ferc714: pd.DataFrame, + categorized_respondents_ferc714: pd.DataFrame, + georeferenced_counties_ferc714: gpd.GeoDataFrame, +) -> pd.DataFrame: + """Compile annualized, categorized respondents and summarize values. + + Calculated summary values include: + * Total reported electricity demand per respondent (``demand_annual_mwh``) + * Reported per-capita electrcity demand (``demand_annual_per_capita_mwh``) + * Population density (``population_density_km2``) + * Demand density (``demand_density_mwh_km2``) + + These metrics are helpful identifying suspicious changes in the compiled annual + geometries for the planning areas. + """ + demand_annual = ( + pd.merge( + annualized_respondents_ferc714, + demand_hourly_pa_ferc714.loc[ + :, ["report_date", "respondent_id_ferc714", "demand_mwh"] + ], + how="left", + ) + .groupby(["report_date", "respondent_id_ferc714"]) + .agg({"demand_mwh": sum}) + .rename(columns={"demand_mwh": "demand_annual_mwh"}) + .reset_index() + .merge( + georeferenced_counties_ferc714.groupby( + ["report_date", "respondent_id_ferc714"] ) - return self._respondents_gdf + .agg({"population": sum, "area_km2": sum}) + .reset_index() + ) + .assign( + population_density_km2=lambda x: x.population / x.area_km2, + demand_annual_per_capita_mwh=lambda x: x.demand_annual_mwh / x.population, + demand_density_mwh_km2=lambda x: x.demand_annual_mwh / x.area_km2, + ) + ) + # Merge respondent categorizations into the annual demand + demand_summary = pd.merge( + demand_annual, categorized_respondents_ferc714, how="left" + ).pipe(apply_pudl_dtypes) + return demand_summary diff --git a/src/pudl/output/pudltabl.py b/src/pudl/output/pudltabl.py index f82e766855..a9405f1a32 100644 --- a/src/pudl/output/pudltabl.py +++ b/src/pudl/output/pudltabl.py @@ -216,6 +216,13 @@ def _register_output_methods(self): # ferc714 "respondent_id_ferc714": "respondent_id_ferc714", "demand_hourly_pa_ferc714": "demand_hourly_pa_ferc714", + "fipsified_respondents_ferc714": "fipsified_respondents_ferc714", + "summarized_demand_ferc714": "summarized_demand_ferc714", + # service territory + "compiled_geometry_balancing_authority_eia861": "compiled_geometry_balancing_authority_eia861", + "compiled_geometry_utility_eia861": "compiled_geometry_utility_eia861", + # state demand + "predicted_state_hourly_demand": "predicted_state_hourly_demand", } for table_name, method_name in table_method_map.items(): diff --git a/test/integration/output_test.py b/test/integration/output_test.py index d63ae957d2..73539e4de3 100644 --- a/test/integration/output_test.py +++ b/test/integration/output_test.py @@ -1,9 +1,6 @@ """PyTest cases related to the integration between FERC1 & EIA 860/923.""" import logging -import os -import sys -import geopandas as gpd import pandas as pd import pytest @@ -162,49 +159,21 @@ def test_outputs_by_table_suffix(fast_out, table_suffix): raise ValueError(f"Found null column: {table}.{col}") 
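With the new entries in ``table_method_map``, the tables added by this PR are reachable both through auto-generated ``PudlTabl`` accessors and by reading the SQLite DB directly. A quick, hypothetical usage sketch (the DB path below is a placeholder for a real, fully materialized PUDL database):

import pandas as pd
import sqlalchemy as sa

from pudl.output.pudltabl import PudlTabl

pudl_engine = sa.create_engine("sqlite:////path/to/pudl.sqlite")
pudl_out = PudlTabl(pudl_engine)

# Accessor auto-generated by _register_output_methods from the mapping above:
demand = pudl_out.summarized_demand_ferc714()

# Equivalent direct read, as the updated integration tests below do:
same_demand = pd.read_sql("summarized_demand_ferc714", pudl_engine)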
-@pytest.fixture(scope="module") -def ferc714_out(fast_out, pudl_settings_fixture, pudl_datastore_fixture): - """A FERC 714 Respondents output object for use in CI.""" - return pudl.output.ferc714.Respondents( - fast_out, pudl_settings=pudl_settings_fixture, ds=pudl_datastore_fixture - ) - - @pytest.mark.parametrize( "df_name", [ - "annualize", - "categorize", - "summarize_demand", - "fipsify", + "summarized_demand_ferc714", + "fipsified_respondents_ferc714", ], ) -def test_ferc714_outputs(ferc714_out, df_name): +def test_ferc714_outputs(pudl_engine, df_name): """Test FERC 714 derived output methods.""" - logger.info(f"Running ferc714_out.{df_name}()") - df = ferc714_out.__getattribute__(df_name)() + df = pd.read_sql(df_name, pudl_engine) assert isinstance(df, pd.DataFrame), f"{df_name} is {type(df)} not DataFrame!" logger.info(f"Found {len(df)} rows in {df_name}") assert not df.empty, f"{df_name} is empty!" -@pytest.mark.xfail( - (sys.platform != "linux") & (not os.environ.get("CONDA_PREFIX", False)), - reason="Test relies on ogr2ogr being installed via GDAL.", -) -def test_ferc714_respondents_georef_counties(ferc714_out): - """Test FERC 714 respondent county FIPS associations. - - This test works with the Census DP1 data, which is converted into SQLite using the - GDAL command line tool ogr2ogr. That tools is easy to install via conda or on Linux, - but is more challenging on Windows and MacOS, so this test is marked xfail - conditionally if the user is neither using conda, nor is on Linux. - """ - ferc714_gdf = ferc714_out.georef_counties() - assert isinstance(ferc714_gdf, gpd.GeoDataFrame), "ferc714_gdf not a GeoDataFrame!" - assert not ferc714_gdf.empty, "ferc714_gdf is empty!" - - @pytest.fixture(scope="module") def fast_out_filled(pudl_engine): """A PUDL output object for use in CI with net generation filled.""" @@ -235,3 +204,32 @@ def test_mcoe_filled(fast_out_filled, df_name, expected_nuke_fraction, tolerance fast_out_filled.__getattribute__(df_name)() ) assert abs(actual_nuke_fraction - expected_nuke_fraction) <= tolerance + + +@pytest.mark.parametrize( + "df_name", + [ + "compiled_geometry_balancing_authority_eia861", + "compiled_geometry_utility_eia861", + ], +) +def test_service_territory_outputs(pudl_engine, df_name): + """Test FERC 714 derived output methods.""" + df = pd.read_sql(df_name, pudl_engine) + assert isinstance(df, pd.DataFrame), f"{df_name} is {type(df)} not DataFrame!" + logger.info(f"Found {len(df)} rows in {df_name}") + assert not df.empty, f"{df_name} is empty!" + + +@pytest.mark.parametrize( + "df_name", + [ + "predicted_state_hourly_demand", + ], +) +def test_state_demand_outputs(pudl_engine, df_name): + """Test state demand analysis methods.""" + df = pd.read_sql(df_name, pudl_engine) + assert isinstance(df, pd.DataFrame), f"{df_name} is {type(df)} not DataFrame!" + logger.info(f"Found {len(df)} rows in {df_name}") + assert not df.empty, f"{df_name} is empty!" 
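The three table-read tests above and the validation modules below all share the same read-and-check shape. A hypothetical consolidation (not part of this diff) could parametrize a single test over every newly integrated table name:

import pandas as pd
import pytest


@pytest.mark.parametrize(
    "df_name",
    [
        "summarized_demand_ferc714",
        "fipsified_respondents_ferc714",
        "compiled_geometry_balancing_authority_eia861",
        "compiled_geometry_utility_eia861",
        "predicted_state_hourly_demand",
    ],
)
def test_new_table_is_populated(pudl_engine, df_name):
    """Verify that each newly integrated table exists and is non-empty."""
    df = pd.read_sql(df_name, pudl_engine)
    assert not df.empty, f"{df_name} is empty!"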
diff --git a/test/validate/service_territory_test.py b/test/validate/service_territory_test.py new file mode 100644 index 0000000000..65ae7bab57 --- /dev/null +++ b/test/validate/service_territory_test.py @@ -0,0 +1,48 @@ +"""Validate post-ETL FERC 714 outputs and associated service territory analyses.""" +import logging + +import pytest + +import pudl +from pudl import validate as pv + +logger = logging.getLogger(__name__) + + +@pytest.mark.parametrize( + "df_name,expected_rows", + [ + ("summarized_demand_ferc714", 3_195), + ("fipsified_respondents_ferc714", 135_627), + ("compiled_geometry_balancing_authority_eia861", 108_436), + ("compiled_geometry_utility_eia861", 237_872), + ], +) +def test_minmax_rows( + pudl_out_orig: "pudl.output.pudltabl.PudlTabl", + live_dbs: bool, + expected_rows: int, + df_name: str, +): + """Verify that output DataFrames don't have too many or too few rows. + + Args: + pudl_out_orig: A PudlTabl output object. + live_dbs: Whether we're using a live or testing DB. + expected_rows: Expected number of rows that the dataframe should + contain when all data is loaded and is output without aggregation. + df_name: Shorthand name identifying the dataframe, corresponding + to the name of the function used to pull it from the PudlTabl + output object. + """ + if not live_dbs: + pytest.skip("Data validation only works with a live PUDL DB.") + _ = ( + pudl_out_orig.__getattribute__(df_name)() + .pipe( + pv.check_min_rows, expected_rows=expected_rows, margin=0.0, df_name=df_name + ) + .pipe( + pv.check_max_rows, expected_rows=expected_rows, margin=0.0, df_name=df_name + ) + ) diff --git a/test/validate/state_demand_test.py b/test/validate/state_demand_test.py new file mode 100644 index 0000000000..692b003c98 --- /dev/null +++ b/test/validate/state_demand_test.py @@ -0,0 +1,37 @@ +"""Validate post-ETL state demand analysis output.""" +import logging + +import pytest + +from pudl import validate as pv + +logger = logging.getLogger(__name__) + + +@pytest.mark.parametrize( + "df_name,expected_rows", + [("predicted_state_hourly_demand", 6_706_318)], +) +def test_minmax_rows(pudl_out_orig, live_dbs, expected_rows, df_name): + """Verify that output DataFrames don't have too many or too few rows. + + Args: + pudl_out_orig: A PudlTabl output object. + live_dbs: Boolean (wether we're using a live or testing DB). + expected_rows (int): Expected number of rows that the dataframe should + contain when all data is loaded and is output without aggregation. + df_name (str): Shorthand name identifying the dataframe, corresponding + to the name of the function used to pull it from the PudlTabl + output object. + """ + if not live_dbs: + pytest.skip("Data validation only works with a live PUDL DB.") + _ = ( + pudl_out_orig.__getattribute__(df_name)() + .pipe( + pv.check_min_rows, expected_rows=expected_rows, margin=0.0, df_name=df_name + ) + .pipe( + pv.check_max_rows, expected_rows=expected_rows, margin=0.0, df_name=df_name + ) + )
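The ``check_min_rows`` / ``check_max_rows`` steps used in both new validation modules are not defined in this diff; with ``margin=0.0`` they effectively require an exact row count. An illustrative sketch of that kind of check (the real implementations in ``pudl.validate`` may differ):

import pandas as pd


def check_row_count(
    df: pd.DataFrame, expected_rows: int, margin: float, df_name: str
) -> pd.DataFrame:
    # With margin=0.0 the observed row count must equal expected_rows exactly.
    low, high = expected_rows * (1 - margin), expected_rows * (1 + margin)
    if not low <= len(df) <= high:
        raise ValueError(
            f"{df_name}: found {len(df)} rows, expected about {expected_rows}"
        )
    return df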