diff --git a/docs/release_notes.rst b/docs/release_notes.rst index 7682375f85..4253516f8f 100644 --- a/docs/release_notes.rst +++ b/docs/release_notes.rst @@ -32,12 +32,13 @@ Dagster Adoption returns the resources ids for a given etl group. * :mod:`pudl.settings.FercToSqliteSettings` class now loads all FERC datasources if no datasets are specified. + * The Excel extractor in ``pudl.extract.excel`` has been updated to parallelize + Excel spreadsheet extraction using Dagster ``@multi_asset`` functionality, thanks to + :user:`dstansby`. This is currently being used for EIA 860, 861 and 923 data. See + :issue:`2385` and PRs :pr:`2644`, :pr:`2943`. * EIA ETL changes: - * EIA extract methods are now ``@multi_asset`` that return an asset for each - raw table. 860 and 923 are separate ``@multi_asset`` which allows this data - to be extracted in parallel. * The EIA table level cleaning functions are now dagster assets. The table level cleaning assets now have a "clean\_" prefix and a "_{datasource}" suffix to distinguish them from the final harvested tables. diff --git a/src/pudl/extract/eia860.py b/src/pudl/extract/eia860.py index 18a37345a6..8929ac73e3 100644 --- a/src/pudl/extract/eia860.py +++ b/src/pudl/extract/eia860.py @@ -4,18 +4,8 @@ This code is for use analyzing EIA Form 860 data. """ -from collections import defaultdict - import pandas as pd -from dagster import ( - AssetOut, - DynamicOut, - DynamicOutput, - Output, - graph_asset, - multi_asset, - op, -) +from dagster import AssetOut, Output, multi_asset import pudl import pudl.logging_helpers @@ -95,65 +85,7 @@ def get_dtypes(page, **partition): ) -@op( - out=DynamicOut(), - required_resource_keys={"dataset_settings"}, -) -def eia_years_from_settings(context): - """Return set of years for EIA in settings. - - These will be used to kick off worker processes to load each year of data in - parallel. 
- """ - eia_settings = context.resources.dataset_settings.eia - for year in eia_settings.eia860.years: - yield DynamicOutput(year, mapping_key=str(year)) - - -@op( - required_resource_keys={"datastore", "dataset_settings"}, -) -def load_single_year(context, year: int) -> dict[str, pd.DataFrame]: - """Load a single year of EIA data from file. - - Args: - context: - context: dagster keyword that provides access to resources and config. - year: - Year to load. - - Returns: - Loaded data in a dataframe. - """ - ds = context.resources.datastore - return Extractor(ds).extract(year=[year]) - - -@op -def merge_eia860_years( - yearly_dfs: list[dict[str, pd.DataFrame]] -) -> dict[str, pd.DataFrame]: - """Merge yearly EIA-860 dataframes.""" - merged = defaultdict(list) - for dfs in yearly_dfs: - for page in dfs: - merged[page].append(dfs[page]) - - for page in merged: - merged[page] = pd.concat(merged[page]) - - return merged - - -@graph_asset -def eia860_raw_dfs() -> dict[str, pd.DataFrame]: - """All loaded EIA860 dataframes. - - This asset creates a dynamic graph of ops to load EIA860 data in parallel. - """ - years = eia_years_from_settings() - dfs = years.map(lambda year: load_single_year(year)) - return merge_eia860_years(dfs.collect()) +eia860_raw_dfs = excel.raw_df_factory(Extractor, name="eia860") # TODO (bendnorman): Figure out type hint for context keyword and mutli_asset return diff --git a/src/pudl/extract/eia861.py b/src/pudl/extract/eia861.py index ad6a08b017..cc4ea32655 100644 --- a/src/pudl/extract/eia861.py +++ b/src/pudl/extract/eia861.py @@ -69,6 +69,9 @@ def get_dtypes(page, **partition): } +eia861_raw_dfs = excel.raw_df_factory(Extractor, name="eia861") + + @multi_asset( outs={ table_name: AssetOut() @@ -99,7 +102,7 @@ def get_dtypes(page, **partition): }, required_resource_keys={"datastore", "dataset_settings"}, ) -def extract_eia861(context): +def extract_eia861(context, eia861_raw_dfs): """Extract raw EIA-861 data from Excel sheets into dataframes. 
Args: @@ -108,10 +111,6 @@ def extract_eia861(context): Returns: A tuple of extracted EIA-861 dataframes. """ - eia_settings = context.resources.dataset_settings.eia - ds = context.resources.datastore - eia861_raw_dfs = Extractor(ds).extract(year=eia_settings.eia861.years) - eia861_raw_dfs = { "raw_eia861__" + table_name.replace("_eia861", ""): df for table_name, df in eia861_raw_dfs.items() diff --git a/src/pudl/extract/eia923.py b/src/pudl/extract/eia923.py index a3f6b64aeb..680be141de 100644 --- a/src/pudl/extract/eia923.py +++ b/src/pudl/extract/eia923.py @@ -108,13 +108,16 @@ def get_dtypes(page, **partition): ) +eia923_raw_dfs = excel.raw_df_factory(Extractor, name="eia923") + + # TODO (bendnorman): Figure out type hint for context keyword and mutli_asset return @multi_asset( outs={table_name: AssetOut() for table_name in sorted(eia_raw_table_names)}, required_resource_keys={"datastore", "dataset_settings"}, ) -def extract_eia923(context): - """Extract raw EIA data from excel sheets into dataframes. +def extract_eia923(context, eia923_raw_dfs): + """Extract raw EIA-923 data from excel sheets into dataframes. Args: context: dagster keyword that provides access to resources and config. @@ -122,11 +125,6 @@ def extract_eia923(context): Returns: A tuple of extracted EIA dataframes. 
""" - eia_settings = context.resources.dataset_settings.eia - - ds = context.resources.datastore - eia923_raw_dfs = Extractor(ds).extract(year=eia_settings.eia923.years) - # create descriptive table_names eia923_raw_dfs = { "raw_eia923__" + table_name: df for table_name, df in eia923_raw_dfs.items() diff --git a/src/pudl/extract/excel.py b/src/pudl/extract/excel.py index 2f4a324745..e308135307 100644 --- a/src/pudl/extract/excel.py +++ b/src/pudl/extract/excel.py @@ -1,9 +1,18 @@ """Load excel metadata CSV files form a python data package.""" import importlib.resources import pathlib +from collections import defaultdict import dbfread import pandas as pd +from dagster import ( + AssetsDefinition, + DynamicOut, + DynamicOutput, + OpDefinition, + graph_asset, + op, +) import pudl @@ -345,12 +354,137 @@ def excel_filename(self, page, **partition): """Produce the xlsx document file name as it will appear in the archive. Args: - page: pudl name for the dataset contents, eg - "boiler_generator_assn" or "coal_stocks" - partition: partition to load. (ex: 2009 for year partition or - "2020-08" for year_month partition) + page: pudl name for the dataset contents, eg "boiler_generator_assn" or + "coal_stocks" + partition: partition to load. (ex: 2009 for year partition or "2020-08" for + year_month partition) - Return: + Returns: string name of the xlsx file """ return self.METADATA.get_file_name(page, **partition) + + +@op +def concat_pages(paged_dfs: list[dict[str, pd.DataFrame]]) -> dict[str, pd.DataFrame]: + """Concatenate similar pages of data from different years into single dataframes. + + Transform a list of dictionaries of dataframes into a single dictionary of + dataframes, where each dataframe is the concatenation of dataframes with identical + keys from the input list. + + Args: + paged_dfs: A list of dictionaries whose keys are page names, and values are + extracted DataFrames. 
Each element of the list corresponds to a single + year of the dataset being extracted. + + Returns: + A dictionary of DataFrames keyed by page name, where the DataFrame contains that + page's data from all extracted years concatenated together. + """ + # Transform the list of dictionaries of dataframes into a dictionary of lists of + # dataframes, in which all dataframes in each list represent different instances of + # the same page of data from different years + all_data = defaultdict(list) + for dfs in paged_dfs: + for page in dfs: + all_data[page].append(dfs[page]) + + # concatenate the dataframes in each list in the dictionary into a single dataframe + for page in all_data: + all_data[page] = pd.concat(all_data[page]).reset_index(drop=True) + + return all_data + + +def year_extractor_factory( + extractor_cls: type[GenericExtractor], name: str +) -> OpDefinition: + """Construct a Dagster op that extracts one year of data, given an extractor class. + + Args: + extractor_cls: Class of type :class:`GenericExtractor` used to extract the data. + name: Name of an Excel based dataset (e.g. "eia860"). + """ + + def extract_single_year(context, year: int) -> dict[str, pd.DataFrame]: + """A function that extracts a year of spreadsheet data from an Excel file. + + This function will be decorated with a Dagster op and returned. + + Args: + context: Dagster keyword that provides access to resources and config. + year: Year of data to extract. + + Returns: + A dictionary of DataFrames extracted from Excel, keyed by page name. + """ + ds = context.resources.datastore + return extractor_cls(ds).extract(year=[year]) + + return op( + required_resource_keys={"datastore", "dataset_settings"}, + name=f"extract_single_{name}_year", + )(extract_single_year) + + +def years_from_settings_factory(name: str) -> OpDefinition: + """Construct a Dagster op to get target years from settings in the Dagster context. + + Args: + name: Name of an Excel based dataset (e.g. "eia860"). 
Currently this must be + one of the attributes of :class:`pudl.settings.EiaSettings`. + + """ + + def years_from_settings(context) -> DynamicOutput: + """Produce target years for the given dataset from the EIA settings object. + + These will be used to kick off worker processes to extract each year of data in + parallel. + + Yields: + A Dagster :class:`DynamicOutput` object representing the year to be + extracted. See the Dagster API documentation for more details: + https://docs.dagster.io/_apidocs/dynamic#dagster.DynamicOut + """ + eia_settings = context.resources.dataset_settings.eia + for year in getattr(eia_settings, name).years: + yield DynamicOutput(year, mapping_key=str(year)) + + return op( + out=DynamicOut(), + required_resource_keys={"dataset_settings"}, + name=f"{name}_years_from_settings", + )(years_from_settings) + + +def raw_df_factory( + extractor_cls: type[GenericExtractor], name: str +) -> AssetsDefinition: + """Return a dagster graph asset to extract a set of raw DataFrames from Excel files. + + Args: + extractor_cls: The dataset-specific Excel extractor used to extract the data. + Needs to correspond to the dataset identified by ``name``. + name: Name of an Excel based dataset (e.g. "eia860"). Currently this must be + one of the attributes of :class:`pudl.settings.EiaSettings`. + """ + # Build a Dagster op that can extract a single year of data + year_extractor = year_extractor_factory(extractor_cls, name) + # Get the list of target years to extract from the PUDL ETL settings object which is + # stored in the Dagster context that is available to all ops. 
+ years_from_settings = years_from_settings_factory(name) + + def raw_dfs() -> dict[str, pd.DataFrame]: + """Produce a dictionary of extracted EIA dataframes.""" + years = years_from_settings() + # Clone dagster op for each year using DynamicOut.map() + # See https://docs.dagster.io/_apidocs/dynamic#dagster.DynamicOut + dfs = years.map(lambda year: year_extractor(year)) + # Collect the results from all of those cloned ops and concatenate the + # individual years of data into a single multi-year dataframe for each different + # page in the spreadsheet based dataset using DynamicOut.collect() + return concat_pages(dfs.collect()) + + return graph_asset(name=f"{name}_raw_dfs")(raw_dfs) diff --git a/src/pudl/settings.py b/src/pudl/settings.py index 37210720d0..e196db5fff 100644 --- a/src/pudl/settings.py +++ b/src/pudl/settings.py @@ -244,6 +244,7 @@ class EiaSettings(BaseModel): Args: eia860: Immutable pydantic model to validate eia860 settings. + eia861: Immutable pydantic model to validate eia861 settings. eia923: Immutable pydantic model to validate eia923 settings. """ diff --git a/test/unit/extract/excel_test.py b/test/unit/extract/excel_test.py index acc2d77037..0e9ac3eea6 100644 --- a/test/unit/extract/excel_test.py +++ b/test/unit/extract/excel_test.py @@ -4,8 +4,11 @@ from unittest.mock import patch import pandas as pd +import pytest +from dagster import build_op_context from pudl.extract import excel as excel +from pudl.settings import DatasetsSettings class TestMetadata(unittest.TestCase): @@ -115,3 +118,39 @@ def test_read_excel_calls(mock_read_excel): # TODO(rousik@gmail.com): need to figure out how to test process_$x methods. # TODO(rousik@gmail.com): we should test that empty columns are properly added. 
+ + +@pytest.mark.parametrize( + "dataset, expected_years", + ( + ("eia860", set(range(2001, 2022))), + ("eia861", set(range(2001, 2022))), + ("eia923", set(range(2001, 2022))), + ), +) +def test_years_from_settings(dataset, expected_years): + years_from_settings = excel.years_from_settings_factory(dataset) + + with build_op_context( + resources={"dataset_settings": DatasetsSettings()} + ) as context: + # Assert actual years are a superset of expected. Instead of doing + # an equality check, this avoids having to update expected years + # every time a new year is added to the datasets + assert { + output.value for output in years_from_settings(context) + } >= expected_years + + +def test_concat_pages(): + pages = ["page1", "page2", "page3"] + dfs_1 = {page: pd.DataFrame({"df": [1], "page": [page]}) for page in pages} + dfs_2 = {page: pd.DataFrame({"df": [2], "page": [page]}) for page in pages} + + merged_dfs = excel.concat_pages([dfs_1, dfs_2]) + assert list(merged_dfs.keys()) == pages + for page in pages: + pd.testing.assert_frame_equal( + merged_dfs[page], + pd.DataFrame({"df": [1, 2], "page": [page, page]}, index=[0, 1]), + )