From 3840373153b51d1ed3826e048bd486cc741d17e8 Mon Sep 17 00:00:00 2001 From: David Stansby Date: Wed, 14 Jun 2023 20:40:42 +0100 Subject: [PATCH 01/16] Parallel load EIA861 --- src/pudl/etl/__init__.py | 1 + src/pudl/extract/eia860.py | 46 +++++--------------------------------- src/pudl/extract/eia861.py | 45 ++++++++++++++++++++++++++++++++----- src/pudl/extract/excel.py | 42 ++++++++++++++++++++++++++++++++++ 4 files changed, 88 insertions(+), 46 deletions(-) diff --git a/src/pudl/etl/__init__.py b/src/pudl/etl/__init__.py index 0bcc273d0b..9fb646bd6d 100644 --- a/src/pudl/etl/__init__.py +++ b/src/pudl/etl/__init__.py @@ -28,6 +28,7 @@ default_assets = ( *load_assets_from_modules([eia_bulk_elec_assets], group_name="eia_bulk_elec"), *load_assets_from_modules([epacems_assets], group_name="epacems"), + *load_assets_from_modules([pudl.extract.excel], group_name="excel"), *load_assets_from_modules([pudl.extract.eia860], group_name="raw_eia860"), *load_assets_from_modules([pudl.transform.eia860], group_name="clean_eia860"), *load_assets_from_modules([pudl.extract.eia861], group_name="raw_eia861"), diff --git a/src/pudl/extract/eia860.py b/src/pudl/extract/eia860.py index 4e3094e600..2f09785bd6 100644 --- a/src/pudl/extract/eia860.py +++ b/src/pudl/extract/eia860.py @@ -4,8 +4,6 @@ This code is for use analyzing EIA Form 860 data. """ -from collections import defaultdict - import pandas as pd from dagster import ( AssetOut, @@ -99,8 +97,8 @@ def get_dtypes(page, **partition): out=DynamicOut(), required_resource_keys={"dataset_settings"}, ) -def eia_years_from_settings(context): - """Return set of years for EIA in settings. +def eia860_years_from_settings(context): + """Return set of years for EIA-860 in settings. These will be used to kick off worker processes to load each year of data in parallel. @@ -110,39 +108,7 @@ def eia_years_from_settings(context): yield DynamicOutput(year, mapping_key=str(year)) -@op( - required_resource_keys={"datastore", "dataset_settings"}, -) -def load_single_year(context, year: int) -> dict[str, pd.DataFrame]: - """Load a single year of EIA data from file. - - Args: - context: - context: dagster keyword that provides access to resources and config. - year: - Year to load. - - Returns: - Loaded data in a dataframe. - """ - ds = context.resources.datastore - return Extractor(ds).extract(year=[year]) - - -@op -def merge_eia860_years( - yearly_dfs: list[dict[str, pd.DataFrame]] -) -> dict[str, pd.DataFrame]: - """Merge yearly EIA-860 dataframes.""" - merged = defaultdict(list) - for dfs in yearly_dfs: - for page in dfs: - merged[page].append(dfs[page]) - - for page in merged: - merged[page] = pd.concat(merged[page]) - - return merged +load_single_eia860_year = excel.year_loader_factory(Extractor, "eia860") @graph_asset @@ -151,9 +117,9 @@ def eia860_raw_dfs() -> dict[str, pd.DataFrame]: This asset creates a dynamic graph of ops to load EIA860 data in parallel. 
""" - years = eia_years_from_settings() - dfs = years.map(lambda year: load_single_year(year)) - return merge_eia860_years(dfs.collect()) + years = eia860_years_from_settings() + dfs = years.map(lambda year: load_single_eia860_year(year)) + return excel.merge_yearly_dfs(dfs.collect()) # TODO (bendnorman): Figure out type hint for context keyword and mutli_asset return diff --git a/src/pudl/extract/eia861.py b/src/pudl/extract/eia861.py index 369ce067d2..ca279fe254 100644 --- a/src/pudl/extract/eia861.py +++ b/src/pudl/extract/eia861.py @@ -7,7 +7,15 @@ import warnings import pandas as pd -from dagster import AssetOut, Output, multi_asset +from dagster import ( + AssetOut, + DynamicOut, + DynamicOutput, + Output, + graph_asset, + multi_asset, + op, +) import pudl.logging_helpers from pudl.extract import excel @@ -69,6 +77,35 @@ def get_dtypes(page, **partition): } +@op( + out=DynamicOut(), + required_resource_keys={"dataset_settings"}, +) +def eia861_years_from_settings(context): + """Return set of years for EIA-861 in settings. + + These will be used to kick off worker processes to load each year of data in + parallel. + """ + eia_settings = context.resources.dataset_settings.eia + for year in eia_settings.eia861.years: + yield DynamicOutput(year, mapping_key=str(year)) + + +load_single_eia861_year = excel.year_loader_factory(Extractor, name="eia861") + + +@graph_asset +def eia861_raw_dfs() -> dict[str, pd.DataFrame]: + """All loaded EIA-861 dataframes. + + This asset creates a dynamic graph of ops to load EIA860 data in parallel. + """ + years = eia861_years_from_settings() + dfs = years.map(lambda year: load_single_eia861_year(year)) + return excel.merge_yearly_dfs(dfs.collect()) + + @multi_asset( outs={ table_name: AssetOut() @@ -99,7 +136,7 @@ def get_dtypes(page, **partition): }, required_resource_keys={"datastore", "dataset_settings"}, ) -def extract_eia861(context): +def extract_eia861(context, eia861_raw_dfs): """Extract raw EIA-861 data from Excel sheets into dataframes. Args: @@ -108,10 +145,6 @@ def extract_eia861(context): Returns: A tuple of extracted EIA-861 dataframes. 
""" - eia_settings = context.resources.dataset_settings.eia - ds = context.resources.datastore - eia861_raw_dfs = Extractor(ds).extract(year=eia_settings.eia861.years) - eia861_raw_dfs = { "raw_" + table_name: df for table_name, df in eia861_raw_dfs.items() } diff --git a/src/pudl/extract/excel.py b/src/pudl/extract/excel.py index ea1a247bef..604dfc2d07 100644 --- a/src/pudl/extract/excel.py +++ b/src/pudl/extract/excel.py @@ -1,9 +1,11 @@ """Load excel metadata CSV files form a python data package.""" import importlib.resources import pathlib +from collections import defaultdict import dbfread import pandas as pd +from dagster import op import pudl @@ -356,3 +358,43 @@ def excel_filename(self, page, **partition): string name of the xlsx file """ return self.METADATA.get_file_name(page, **partition) + + +@op +def merge_yearly_dfs( + yearly_dfs: list[dict[str, pd.DataFrame]] +) -> dict[str, pd.DataFrame]: + """Merge DataFrames loaded with indepented calls to .extract().""" + merged = defaultdict(list) + for dfs in yearly_dfs: + for page in dfs: + merged[page].append(dfs[page]) + + for page in merged: + merged[page] = pd.concat(merged[page]) + + return merged + + +def year_loader_factory(extractor_cls: type[GenericExtractor], name: str): + """Return a dagster op to load a single year using a given extractor class.""" + + def load_single_year(context, year: int) -> dict[str, pd.DataFrame]: + """Load a single year of EIA data from file. + + Args: + context: + context: dagster keyword that provides access to resources and config. + year: + Year to load. + + Returns: + Loaded data in a dataframe. + """ + ds = context.resources.datastore + return extractor_cls(ds).extract(year=[year]) + + return op( + required_resource_keys={"datastore", "dataset_settings"}, + name=f"load_single_{name}_year", + )(load_single_year) From 82cef3b373f3761ed622c374bbc3a2266104b41a Mon Sep 17 00:00:00 2001 From: David Stansby Date: Wed, 14 Jun 2023 20:47:07 +0100 Subject: [PATCH 02/16] De-dupe years from settings --- src/pudl/extract/eia860.py | 19 +------------------ src/pudl/extract/eia861.py | 19 +------------------ src/pudl/extract/excel.py | 22 +++++++++++++++++++++- 3 files changed, 23 insertions(+), 37 deletions(-) diff --git a/src/pudl/extract/eia860.py b/src/pudl/extract/eia860.py index 2f09785bd6..113e74e16c 100644 --- a/src/pudl/extract/eia860.py +++ b/src/pudl/extract/eia860.py @@ -7,12 +7,9 @@ import pandas as pd from dagster import ( AssetOut, - DynamicOut, - DynamicOutput, Output, graph_asset, multi_asset, - op, ) import pudl @@ -93,21 +90,7 @@ def get_dtypes(page, **partition): ) -@op( - out=DynamicOut(), - required_resource_keys={"dataset_settings"}, -) -def eia860_years_from_settings(context): - """Return set of years for EIA-860 in settings. - - These will be used to kick off worker processes to load each year of data in - parallel. 
- """ - eia_settings = context.resources.dataset_settings.eia - for year in eia_settings.eia860.years: - yield DynamicOutput(year, mapping_key=str(year)) - - +eia860_years_from_settings = excel.years_from_settings_factory("eia860") load_single_eia860_year = excel.year_loader_factory(Extractor, "eia860") diff --git a/src/pudl/extract/eia861.py b/src/pudl/extract/eia861.py index ca279fe254..1b07636c71 100644 --- a/src/pudl/extract/eia861.py +++ b/src/pudl/extract/eia861.py @@ -9,12 +9,9 @@ import pandas as pd from dagster import ( AssetOut, - DynamicOut, - DynamicOutput, Output, graph_asset, multi_asset, - op, ) import pudl.logging_helpers @@ -77,21 +74,7 @@ def get_dtypes(page, **partition): } -@op( - out=DynamicOut(), - required_resource_keys={"dataset_settings"}, -) -def eia861_years_from_settings(context): - """Return set of years for EIA-861 in settings. - - These will be used to kick off worker processes to load each year of data in - parallel. - """ - eia_settings = context.resources.dataset_settings.eia - for year in eia_settings.eia861.years: - yield DynamicOutput(year, mapping_key=str(year)) - - +eia861_years_from_settings = excel.years_from_settings_factory("eia861") load_single_eia861_year = excel.year_loader_factory(Extractor, name="eia861") diff --git a/src/pudl/extract/excel.py b/src/pudl/extract/excel.py index 604dfc2d07..2ac0ce5854 100644 --- a/src/pudl/extract/excel.py +++ b/src/pudl/extract/excel.py @@ -5,7 +5,7 @@ import dbfread import pandas as pd -from dagster import op +from dagster import DynamicOut, DynamicOutput, op import pudl @@ -398,3 +398,23 @@ def load_single_year(context, year: int) -> dict[str, pd.DataFrame]: required_resource_keys={"datastore", "dataset_settings"}, name=f"load_single_{name}_year", )(load_single_year) + + +def years_from_settings_factory(name: str): + """Return a dagster op to get years from settings.""" + + def years_from_settings(context): + """Return set of years for EIA-860 in settings. + + These will be used to kick off worker processes to load each year of data in + parallel. + """ + eia_settings = context.resources.dataset_settings.eia + for year in getattr(eia_settings, name).years: + yield DynamicOutput(year, mapping_key=str(year)) + + return op( + out=DynamicOut(), + required_resource_keys={"dataset_settings"}, + name=f"{name}_years_from_settings", + )(years_from_settings) From 5bced34fb08e86f89eb06a0f198efa4267bf4006 Mon Sep 17 00:00:00 2001 From: David Stansby Date: Wed, 14 Jun 2023 21:06:08 +0100 Subject: [PATCH 03/16] Add a raw_df factory --- src/pudl/extract/eia860.py | 21 ++------------------- src/pudl/extract/eia861.py | 21 ++------------------- src/pudl/extract/excel.py | 22 ++++++++++++++++++++-- 3 files changed, 24 insertions(+), 40 deletions(-) diff --git a/src/pudl/extract/eia860.py b/src/pudl/extract/eia860.py index 113e74e16c..087041eadd 100644 --- a/src/pudl/extract/eia860.py +++ b/src/pudl/extract/eia860.py @@ -5,12 +5,7 @@ This code is for use analyzing EIA Form 860 data. """ import pandas as pd -from dagster import ( - AssetOut, - Output, - graph_asset, - multi_asset, -) +from dagster import AssetOut, Output, multi_asset import pudl import pudl.logging_helpers @@ -90,19 +85,7 @@ def get_dtypes(page, **partition): ) -eia860_years_from_settings = excel.years_from_settings_factory("eia860") -load_single_eia860_year = excel.year_loader_factory(Extractor, "eia860") - - -@graph_asset -def eia860_raw_dfs() -> dict[str, pd.DataFrame]: - """All loaded EIA860 dataframes. 
- - This asset creates a dynamic graph of ops to load EIA860 data in parallel. - """ - years = eia860_years_from_settings() - dfs = years.map(lambda year: load_single_eia860_year(year)) - return excel.merge_yearly_dfs(dfs.collect()) +eia860_raw_dfs = excel.raw_df_factory(Extractor, name="eia860") # TODO (bendnorman): Figure out type hint for context keyword and mutli_asset return diff --git a/src/pudl/extract/eia861.py b/src/pudl/extract/eia861.py index 1b07636c71..799f1afe9b 100644 --- a/src/pudl/extract/eia861.py +++ b/src/pudl/extract/eia861.py @@ -7,12 +7,7 @@ import warnings import pandas as pd -from dagster import ( - AssetOut, - Output, - graph_asset, - multi_asset, -) +from dagster import AssetOut, Output, multi_asset import pudl.logging_helpers from pudl.extract import excel @@ -74,19 +69,7 @@ def get_dtypes(page, **partition): } -eia861_years_from_settings = excel.years_from_settings_factory("eia861") -load_single_eia861_year = excel.year_loader_factory(Extractor, name="eia861") - - -@graph_asset -def eia861_raw_dfs() -> dict[str, pd.DataFrame]: - """All loaded EIA-861 dataframes. - - This asset creates a dynamic graph of ops to load EIA860 data in parallel. - """ - years = eia861_years_from_settings() - dfs = years.map(lambda year: load_single_eia861_year(year)) - return excel.merge_yearly_dfs(dfs.collect()) +eia861_raw_dfs = excel.raw_df_factory(Extractor, name="eia861") @multi_asset( diff --git a/src/pudl/extract/excel.py b/src/pudl/extract/excel.py index 2ac0ce5854..1aa2aa83aa 100644 --- a/src/pudl/extract/excel.py +++ b/src/pudl/extract/excel.py @@ -5,7 +5,7 @@ import dbfread import pandas as pd -from dagster import DynamicOut, DynamicOutput, op +from dagster import DynamicOut, DynamicOutput, graph_asset, op import pudl @@ -404,7 +404,7 @@ def years_from_settings_factory(name: str): """Return a dagster op to get years from settings.""" def years_from_settings(context): - """Return set of years for EIA-860 in settings. + """Return set of years for EIA data in settings. These will be used to kick off worker processes to load each year of data in parallel. @@ -418,3 +418,21 @@ def years_from_settings(context): required_resource_keys={"dataset_settings"}, name=f"{name}_years_from_settings", )(years_from_settings) + + +def raw_df_factory(extractor_cls: type[GenericExtractor], name: str): + """Return a graph asset to load a set of raw DataFrames from Excel files. + """ + year_loader = year_loader_factory(extractor_cls, name) + years_from_settings = years_from_settings_factory(name) + + def raw_dfs() -> dict[str, pd.DataFrame]: + """All loaded EIA-861 dataframes. + + This asset creates a dynamic graph of ops to load EIA860 data in parallel. 
+ """ + years = years_from_settings() + dfs = years.map(lambda year: year_loader(year)) + return merge_yearly_dfs(dfs.collect()) + + return graph_asset(name=f"{name}_raw_dfs")(raw_dfs) From b048e839b94a41289f38521b030fd618b55981d7 Mon Sep 17 00:00:00 2001 From: David Stansby Date: Thu, 15 Jun 2023 21:31:07 +0100 Subject: [PATCH 04/16] Clean up new docs --- src/pudl/extract/excel.py | 41 +++++++++++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/src/pudl/extract/excel.py b/src/pudl/extract/excel.py index 1aa2aa83aa..27ca195083 100644 --- a/src/pudl/extract/excel.py +++ b/src/pudl/extract/excel.py @@ -5,7 +5,14 @@ import dbfread import pandas as pd -from dagster import DynamicOut, DynamicOutput, graph_asset, op +from dagster import ( + AssetsDefinition, + DynamicOut, + DynamicOutput, + OpDefinition, + graph_asset, + op, +) import pudl @@ -376,8 +383,15 @@ def merge_yearly_dfs( return merged -def year_loader_factory(extractor_cls: type[GenericExtractor], name: str): - """Return a dagster op to load a single year using a given extractor class.""" +def year_loader_factory( + extractor_cls: type[GenericExtractor], name: str +) -> OpDefinition: + """Return a dagster op to load a single year using a given extractor class. + + Args: + extractor_cls: Class of type GenericExtractor used to extract the data. + name(str): Name of dataset (e.g. "eia860"). + """ def load_single_year(context, year: int) -> dict[str, pd.DataFrame]: """Load a single year of EIA data from file. @@ -400,8 +414,12 @@ def load_single_year(context, year: int) -> dict[str, pd.DataFrame]: )(load_single_year) -def years_from_settings_factory(name: str): - """Return a dagster op to get years from settings.""" +def years_from_settings_factory(name: str) -> OpDefinition: + """Return a dagster op to get EIA years from settings. + + Args: + name (str): Name of dataset (e.g. "eia860") + """ def years_from_settings(context): """Return set of years for EIA data in settings. @@ -420,17 +438,20 @@ def years_from_settings(context): )(years_from_settings) -def raw_df_factory(extractor_cls: type[GenericExtractor], name: str): +def raw_df_factory( + extractor_cls: type[GenericExtractor], name: str +) -> AssetsDefinition: """Return a graph asset to load a set of raw DataFrames from Excel files. + + Args: + extractor_cls: Class of type GenericExtractor used to extract the data. + name(str): Name of dataset (e.g. "eia860"). """ year_loader = year_loader_factory(extractor_cls, name) years_from_settings = years_from_settings_factory(name) def raw_dfs() -> dict[str, pd.DataFrame]: - """All loaded EIA-861 dataframes. - - This asset creates a dynamic graph of ops to load EIA860 data in parallel. 
- """ + """All loaded EIA dataframes.""" years = years_from_settings() dfs = years.map(lambda year: year_loader(year)) return merge_yearly_dfs(dfs.collect()) From f060d938743e70b80ab3f7a7b4ed71f1adaa7193 Mon Sep 17 00:00:00 2001 From: David Stansby Date: Thu, 15 Jun 2023 21:32:30 +0100 Subject: [PATCH 05/16] Apply parallel loading to eia923 --- src/pudl/extract/eia923.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/pudl/extract/eia923.py b/src/pudl/extract/eia923.py index 09610b048f..0992d4bfad 100644 --- a/src/pudl/extract/eia923.py +++ b/src/pudl/extract/eia923.py @@ -108,13 +108,16 @@ def get_dtypes(page, **partition): ) +eia923_raw_dfs = excel.raw_df_factory(Extractor, name="eia923") + + # TODO (bendnorman): Figure out type hint for context keyword and mutli_asset return @multi_asset( outs={table_name: AssetOut() for table_name in sorted(eia_raw_table_names)}, required_resource_keys={"datastore", "dataset_settings"}, ) -def extract_eia923(context): - """Extract raw EIA data from excel sheets into dataframes. +def extract_eia923(context, eia923_raw_dfs): + """Extract raw EIA-923 data from excel sheets into dataframes. Args: context: dagster keyword that provides access to resources and config. @@ -122,11 +125,6 @@ def extract_eia923(context): Returns: A tuple of extracted EIA dataframes. """ - eia_settings = context.resources.dataset_settings.eia - - ds = context.resources.datastore - eia923_raw_dfs = Extractor(ds).extract(year=eia_settings.eia923.years) - # create descriptive table_names eia923_raw_dfs = { "raw_" + table_name + "_eia923": df for table_name, df in eia923_raw_dfs.items() From 4002f156017a300ca3e58381f2893aac221c170d Mon Sep 17 00:00:00 2001 From: David Stansby Date: Sun, 18 Jun 2023 21:12:13 +0100 Subject: [PATCH 06/16] Add a unit test for years_from_settings --- test/unit/extract/excel_test.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/test/unit/extract/excel_test.py b/test/unit/extract/excel_test.py index acc2d77037..a8516d27ca 100644 --- a/test/unit/extract/excel_test.py +++ b/test/unit/extract/excel_test.py @@ -115,3 +115,25 @@ def test_read_excel_calls(mock_read_excel): # TODO(rousik@gmail.com): need to figure out how to test process_$x methods. # TODO(rousik@gmail.com): we should test that empty columns are properly added. + + +@pytest.mark.parametrize( + "dataset, expected_years", + ( + ("eia860", set(range(2001, 2022))), + ("eia861", set(range(2001, 2022))), + ("eia923", set(range(2001, 2022))), + ), +) +def test_years_from_settings(dataset, expected_years): + years_from_settings = excel.years_from_settings_factory(dataset) + + with build_op_context( + resources={"dataset_settings": DatasetsSettings()} + ) as context: + # Assert actual years are a superset of expected. 
Instead of doing + # an equality check, this avoids having to update expected years + # every time a new year is added to the datasets + assert { + output.value for output in years_from_settings(context) + } >= expected_years From c04e98f1c0ca6ef89f2e941ba98f47136e927f61 Mon Sep 17 00:00:00 2001 From: David Stansby Date: Sun, 18 Jun 2023 21:12:28 +0100 Subject: [PATCH 07/16] Add test for yearly df merge --- test/unit/extract/excel_test.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/test/unit/extract/excel_test.py b/test/unit/extract/excel_test.py index a8516d27ca..b4664db445 100644 --- a/test/unit/extract/excel_test.py +++ b/test/unit/extract/excel_test.py @@ -4,8 +4,11 @@ from unittest.mock import patch import pandas as pd +import pytest +from dagster import build_op_context from pudl.extract import excel as excel +from pudl.settings import DatasetsSettings class TestMetadata(unittest.TestCase): @@ -137,3 +140,17 @@ def test_years_from_settings(dataset, expected_years): assert { output.value for output in years_from_settings(context) } >= expected_years + + +def test_merge_yearly_dfs(): + pages = ["page1", "page2", "page3"] + dfs_1 = {page: pd.DataFrame({"df": [1], "page": [page]}) for page in pages} + dfs_2 = {page: pd.DataFrame({"df": [2], "page": [page]}) for page in pages} + + merged_dfs = excel.merge_yearly_dfs([dfs_1, dfs_2]) + assert list(merged_dfs.keys()) == pages + for page in pages: + pd.testing.assert_frame_equal( + merged_dfs[page], + pd.DataFrame({"df": [1, 2], "page": [page, page]}, index=[0, 0]), + ) From e3675298c01fc2209b55993ecf497f31b69553fc Mon Sep 17 00:00:00 2001 From: David Stansby Date: Mon, 19 Jun 2023 10:13:41 +0100 Subject: [PATCH 08/16] Add docstring to merge_dfs_by_page --- src/pudl/extract/excel.py | 18 +++++++++++++----- test/unit/extract/excel_test.py | 4 ++-- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/src/pudl/extract/excel.py b/src/pudl/extract/excel.py index 27ca195083..b77d3dd4b2 100644 --- a/src/pudl/extract/excel.py +++ b/src/pudl/extract/excel.py @@ -368,12 +368,20 @@ def excel_filename(self, page, **partition): @op -def merge_yearly_dfs( - yearly_dfs: list[dict[str, pd.DataFrame]] +def merge_dfs_by_page( + paged_dfs: list[dict[str, pd.DataFrame]] ) -> dict[str, pd.DataFrame]: - """Merge DataFrames loaded with indepented calls to .extract().""" + """Merge DataFrames loaded with independent calls to .extract(). + + The input is a list of dictionaries where the keys are the page names, + and values are DataFrame of extracted data. + Where a page name appears more than once in the dictionaries, all + DataFrames with that page name are merged together, and the result + returned as a single dict mapping unique page names to merged + DataFrames. 
+ """ merged = defaultdict(list) - for dfs in yearly_dfs: + for dfs in paged_dfs: for page in dfs: merged[page].append(dfs[page]) @@ -454,6 +462,6 @@ def raw_dfs() -> dict[str, pd.DataFrame]: """All loaded EIA dataframes.""" years = years_from_settings() dfs = years.map(lambda year: year_loader(year)) - return merge_yearly_dfs(dfs.collect()) + return merge_dfs_by_page(dfs.collect()) return graph_asset(name=f"{name}_raw_dfs")(raw_dfs) diff --git a/test/unit/extract/excel_test.py b/test/unit/extract/excel_test.py index b4664db445..767e1d043c 100644 --- a/test/unit/extract/excel_test.py +++ b/test/unit/extract/excel_test.py @@ -142,12 +142,12 @@ def test_years_from_settings(dataset, expected_years): } >= expected_years -def test_merge_yearly_dfs(): +def test_merge_dfs_by_page(): pages = ["page1", "page2", "page3"] dfs_1 = {page: pd.DataFrame({"df": [1], "page": [page]}) for page in pages} dfs_2 = {page: pd.DataFrame({"df": [2], "page": [page]}) for page in pages} - merged_dfs = excel.merge_yearly_dfs([dfs_1, dfs_2]) + merged_dfs = excel.merge_dfs_by_page([dfs_1, dfs_2]) assert list(merged_dfs.keys()) == pages for page in pages: pd.testing.assert_frame_equal( From 5b75a8db4ac56df1ba10a7fff2b9d555e3657ad6 Mon Sep 17 00:00:00 2001 From: thinky Date: Mon, 16 Oct 2023 16:21:27 -0400 Subject: [PATCH 09/16] Fix indexing issue --- src/pudl/extract/excel.py | 2 +- test/unit/extract/excel_test.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pudl/extract/excel.py b/src/pudl/extract/excel.py index 3c9ec8635f..35a6333979 100644 --- a/src/pudl/extract/excel.py +++ b/src/pudl/extract/excel.py @@ -384,7 +384,7 @@ def merge_dfs_by_page( merged[page].append(dfs[page]) for page in merged: - merged[page] = pd.concat(merged[page]) + merged[page] = pd.concat(merged[page]).reset_index(drop=True) return merged diff --git a/test/unit/extract/excel_test.py b/test/unit/extract/excel_test.py index 767e1d043c..f4a8bf202f 100644 --- a/test/unit/extract/excel_test.py +++ b/test/unit/extract/excel_test.py @@ -152,5 +152,5 @@ def test_merge_dfs_by_page(): for page in pages: pd.testing.assert_frame_equal( merged_dfs[page], - pd.DataFrame({"df": [1, 2], "page": [page, page]}, index=[0, 0]), + pd.DataFrame({"df": [1, 2], "page": [page, page]}, index=[0, 1]), ) From ecd1abd031a22fbb7f91d05805f2745d61e56780 Mon Sep 17 00:00:00 2001 From: thinky Date: Tue, 17 Oct 2023 10:37:44 -0400 Subject: [PATCH 10/16] Remove empty dataframes from concatenation, add notes, remove excel assets from module --- src/pudl/etl/__init__.py | 1 - src/pudl/extract/excel.py | 22 +++++++++++++++------- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/src/pudl/etl/__init__.py b/src/pudl/etl/__init__.py index acda4c4242..e62d1003c7 100644 --- a/src/pudl/etl/__init__.py +++ b/src/pudl/etl/__init__.py @@ -32,7 +32,6 @@ default_assets = ( *load_assets_from_modules([eia_bulk_elec_assets], group_name="eia_bulk_elec"), *load_assets_from_modules([epacems_assets], group_name="epacems"), - *load_assets_from_modules([pudl.extract.excel], group_name="excel"), *load_assets_from_modules([pudl.extract.eia860], group_name="raw_eia860"), *load_assets_from_modules([pudl.transform.eia860], group_name="_core_eia860"), *load_assets_from_modules([pudl.extract.eia861], group_name="raw_eia861"), diff --git a/src/pudl/extract/excel.py b/src/pudl/extract/excel.py index 35a6333979..a3d2a3f4ce 100644 --- a/src/pudl/extract/excel.py +++ b/src/pudl/extract/excel.py @@ -378,15 +378,21 @@ def merge_dfs_by_page( returned as a single dict mapping 
unique page names to merged DataFrames. """ - merged = defaultdict(list) - for dfs in paged_dfs: - for page in dfs: - merged[page].append(dfs[page]) + all_data = defaultdict(list) + for dfs in paged_dfs: # For each dataframe + for page in dfs: # For each page in a raw dataframe + if ( + isinstance(dfs[page], pd.DataFrame) and not dfs[page].empty + ): # If there is data for the page + all_data[page].append( + dfs[page] + ) # Append to other raw data with same page name - for page in merged: - merged[page] = pd.concat(merged[page]).reset_index(drop=True) + for page in all_data: # For each page of data + # Merge all the dataframes for one page + all_data[page] = pd.concat(all_data[page]).reset_index(drop=True) - return merged + return all_data def year_loader_factory( @@ -459,7 +465,9 @@ def raw_df_factory( def raw_dfs() -> dict[str, pd.DataFrame]: """All loaded EIA dataframes.""" years = years_from_settings() + # Clone dagster op for each year dfs = years.map(lambda year: year_loader(year)) + # Gather all ops and merge them by page return merge_dfs_by_page(dfs.collect()) return graph_asset(name=f"{name}_raw_dfs")(raw_dfs) From 77e695f84471c819e335005c750f84bb431ac982 Mon Sep 17 00:00:00 2001 From: thinky Date: Tue, 17 Oct 2023 10:39:53 -0400 Subject: [PATCH 11/16] Add more docstrings --- src/pudl/extract/excel.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/pudl/extract/excel.py b/src/pudl/extract/excel.py index a3d2a3f4ce..633b811a7d 100644 --- a/src/pudl/extract/excel.py +++ b/src/pudl/extract/excel.py @@ -459,7 +459,9 @@ def raw_df_factory( extractor_cls: Class of type GenericExtractor used to extract the data. name(str): Name of dataset (e.g. "eia860"). """ + # Instantiate dagster factory for dataset year_loader = year_loader_factory(extractor_cls, name) + # Get years from settings years_from_settings = years_from_settings_factory(name) def raw_dfs() -> dict[str, pd.DataFrame]: From 4c6210320e3870fd69614d3fda8335c6347e3834 Mon Sep 17 00:00:00 2001 From: thinky Date: Tue, 17 Oct 2023 10:49:33 -0400 Subject: [PATCH 12/16] Update release notes --- docs/release_notes.rst | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/docs/release_notes.rst b/docs/release_notes.rst index 7682375f85..0f4049a079 100644 --- a/docs/release_notes.rst +++ b/docs/release_notes.rst @@ -32,12 +32,13 @@ Dagster Adoption returns the resources ids for a given etl group. * :mod:`pudl.settings.FercToSqliteSettings` class now loads all FERC datasources if no datasets are specified. + * The Excel extractor in ``pudl.extract.excel`` has been updated to parallelize + Excel spreadsheet extraction using Dagster ``@multi_asset`` functionality, thanks to + :user:`dstanby`. This is currently being used for EIA 860, 861 and 923 data. See + :issue:`2385` and PRs :pr:`2644`, :pr:`2943`. * EIA ETL changes: - * EIA extract methods are now ``@multi_asset`` that return an asset for each - raw table. 860 and 923 are separate ``@multi_asset`` which allows this data - to be extracted in parallel. * The EIA table level cleaning functions are now dagster assets. The table level cleaning assets now have a "clean\_" prefix and a "_{datasource}" suffix to distinguish them from the final harvested tables. 
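The commits above converge on one reusable pattern: an op that fans out one DynamicOutput per settings year, a per-year extractor op cloned over those outputs with .map(), and a fan-in op that collects and concatenates the results, all wrapped into a graph asset by raw_df_factory. A minimal, self-contained sketch of that Dagster dynamic fan-out/fan-in wiring (toy op and page names, not PUDL's actual extractors) might look like:

    import pandas as pd
    from dagster import DynamicOut, DynamicOutput, graph, op

    @op(out=DynamicOut())
    def emit_years():
        # Fan-out: each DynamicOutput clones the downstream op once per year.
        for year in (2020, 2021):
            yield DynamicOutput(year, mapping_key=str(year))

    @op
    def extract_year(year: int) -> dict[str, pd.DataFrame]:
        # Stand-in for Extractor(ds).extract(year=[year]).
        return {"page1": pd.DataFrame({"report_year": [year]})}

    @op
    def merge_pages(paged: list[dict[str, pd.DataFrame]]) -> dict[str, pd.DataFrame]:
        # Fan-in: concatenate each page's dataframes across all years.
        merged: dict[str, list[pd.DataFrame]] = {}
        for dfs in paged:
            for page, df in dfs.items():
                merged.setdefault(page, []).append(df)
        return {p: pd.concat(dfs, ignore_index=True) for p, dfs in merged.items()}

    @graph
    def toy_extraction():
        years = emit_years()
        return merge_pages(years.map(extract_year).collect())

Calling toy_extraction.execute_in_process() runs one extract_year clone per emitted year (in parallel, executor permitting) before joining them in merge_pages, the same shape raw_df_factory assembles from its factory-built ops.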
From b35fbe34a3d450166f8d38a12c09478899da756d Mon Sep 17 00:00:00 2001 From: thinky Date: Tue, 17 Oct 2023 12:00:16 -0400 Subject: [PATCH 13/16] Fix missing data tables --- src/pudl/extract/excel.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/pudl/extract/excel.py b/src/pudl/extract/excel.py index 633b811a7d..01cef24cca 100644 --- a/src/pudl/extract/excel.py +++ b/src/pudl/extract/excel.py @@ -381,16 +381,19 @@ def merge_dfs_by_page( all_data = defaultdict(list) for dfs in paged_dfs: # For each dataframe for page in dfs: # For each page in a raw dataframe - if ( - isinstance(dfs[page], pd.DataFrame) and not dfs[page].empty - ): # If there is data for the page - all_data[page].append( - dfs[page] - ) # Append to other raw data with same page name + all_data[page].append( + dfs[page] + ) # Append to other raw data with same page name for page in all_data: # For each page of data - # Merge all the dataframes for one page - all_data[page] = pd.concat(all_data[page]).reset_index(drop=True) + # Drop any empty dataframes + all_data[page] = [df for df in all_data[page] if not df.empty] + if not all_data[page]: + logger.warning(f"No data found for table: {page}.") + all_data[page] = pd.DataFrame() + else: + # Merge all the dataframes for one page + all_data[page] = pd.concat(all_data[page]).reset_index(drop=True) return all_data From 81a27d49817970721aedd79dfe3994c6dfe6427f Mon Sep 17 00:00:00 2001 From: thinky Date: Tue, 17 Oct 2023 12:01:37 -0400 Subject: [PATCH 14/16] Fix typo in release notes --- docs/release_notes.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/release_notes.rst b/docs/release_notes.rst index 0f4049a079..4253516f8f 100644 --- a/docs/release_notes.rst +++ b/docs/release_notes.rst @@ -34,7 +34,7 @@ Dagster Adoption datasources if no datasets are specified. * The Excel extractor in ``pudl.extract.excel`` has been updated to parallelize Excel spreadsheet extraction using Dagster ``@multi_asset`` functionality, thanks to - :user:`dstanby`. This is currently being used for EIA 860, 861 and 923 data. See + :user:`dstansby`. This is currently being used for EIA 860, 861 and 923 data. See :issue:`2385` and PRs :pr:`2644`, :pr:`2943`. * EIA ETL changes: From cad0ee056c46c241da457f8b86da9c3afefcab43 Mon Sep 17 00:00:00 2001 From: Zane Selvans Date: Tue, 17 Oct 2023 17:58:45 -0600 Subject: [PATCH 15/16] Clarify names & docs; retain column info from null dfs - Expanded on some of the docstrings and comments to try and clarify the highly abstracted nested factories that construct the Excel extraction graph asset. - Changed some of the function/object names to use extract_ rather than load_ to match the stage-names we're using elsewhere. - It turned out that not including the empty dataframes created errors, since those dataframes are being constructed such that they always contain all mapped columns, even if the particular year being extracted doesn't have every column... and if we don't end up with all of the expected columns (even if they're null) that causes problems downstream. 
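- A pandas sketch of that failure mode (illustrative column names, not
  the real EIA schema): an extracted page for a sparse year can come
  back empty while still carrying every mapped column, and filtering it
  out of the concatenation silently drops those columns:

      import pandas as pd

      full = pd.DataFrame({"plant_id": [1, 2], "fuel": ["coal", "gas"]})
      # An empty page, but with the full set of mapped columns attached:
      empty = pd.DataFrame(columns=["plant_id", "fuel", "energy_storage"])

      list(pd.concat([full]).columns)
      # ['plant_id', 'fuel']                     <- energy_storage lost
      list(pd.concat([full, empty]).columns)
      # ['plant_id', 'fuel', 'energy_storage']   <- kept, as all-null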
--- src/pudl/extract/excel.py | 122 ++++++++++++++++++-------------- src/pudl/settings.py | 1 + test/unit/extract/excel_test.py | 4 +- 3 files changed, 70 insertions(+), 57 deletions(-) diff --git a/src/pudl/extract/excel.py b/src/pudl/extract/excel.py index 01cef24cca..81cfe96b66 100644 --- a/src/pudl/extract/excel.py +++ b/src/pudl/extract/excel.py @@ -366,81 +366,87 @@ def excel_filename(self, page, **partition): @op -def merge_dfs_by_page( - paged_dfs: list[dict[str, pd.DataFrame]] -) -> dict[str, pd.DataFrame]: - """Merge DataFrames loaded with independent calls to .extract(). - - The input is a list of dictionaries where the keys are the page names, - and values are DataFrame of extracted data. - Where a page name appears more than once in the dictionaries, all - DataFrames with that page name are merged together, and the result - returned as a single dict mapping unique page names to merged - DataFrames. +def concat_pages(paged_dfs: list[dict[str, pd.DataFrame]]) -> dict[str, pd.DataFrame]: + """Concatenate pages loaded with independent calls to .extract(). + + Transform a list of dictionaries of dataframes into a single dictionary of + dataframes, where each dataframe is the concatenation of dataframes with identical + keys from the input list. + + Args: + paged_dfs: A list of dictionaries whose keys are page names, and values are + extracted DataFrames. Each element of the list corresponds to a single + year of the dataset being extracted. + + Returns: + A dictionary of DataFrames keyed by page name, where the DataFrame contains that + page's data from all extracted years concatenated together. """ + # Transform the list of dictionaries of dataframes into a dictionary of lists of + # dataframes, in which all dataframes in each list represent different instances of + # the same page of data from different years all_data = defaultdict(list) - for dfs in paged_dfs: # For each dataframe - for page in dfs: # For each page in a raw dataframe - all_data[page].append( - dfs[page] - ) # Append to other raw data with same page name - - for page in all_data: # For each page of data - # Drop any empty dataframes - all_data[page] = [df for df in all_data[page] if not df.empty] - if not all_data[page]: - logger.warning(f"No data found for table: {page}.") - all_data[page] = pd.DataFrame() - else: - # Merge all the dataframes for one page - all_data[page] = pd.concat(all_data[page]).reset_index(drop=True) + for dfs in paged_dfs: + for page in dfs: + all_data[page].append(dfs[page]) + + # concatenate the dataframes in each list in the dictionary into a single dataframe + for page in all_data: + all_data[page] = pd.concat(all_data[page]).reset_index(drop=True) return all_data -def year_loader_factory( +def year_extractor_factory( extractor_cls: type[GenericExtractor], name: str ) -> OpDefinition: - """Return a dagster op to load a single year using a given extractor class. + """Construct a Dagster op that extracts one year of data, given an extractor class. Args: - extractor_cls: Class of type GenericExtractor used to extract the data. - name(str): Name of dataset (e.g. "eia860"). + extractor_cls: Class of type :class:`GenericExtractor` used to extract the data. + name: Name of an Excel based dataset (e.g. "eia860"). """ - def load_single_year(context, year: int) -> dict[str, pd.DataFrame]: - """Load a single year of EIA data from file. + def extract_single_year(context, year: int) -> dict[str, pd.DataFrame]: + """A function that extracts a year of spreadsheet data from an Excel file. 
+ + This function will be decorated with a Dagster op and returned. Args: - context: - context: dagster keyword that provides access to resources and config. - year: - Year to load. + context: Dagster keyword that provides access to resources and config. + year: Year of data to extract. Returns: - Loaded data in a dataframe. + A dictionary of DataFrames extracted from Excel, keyed by page name. """ ds = context.resources.datastore return extractor_cls(ds).extract(year=[year]) return op( required_resource_keys={"datastore", "dataset_settings"}, - name=f"load_single_{name}_year", - )(load_single_year) + name=f"extract_single_{name}_year", + )(extract_single_year) def years_from_settings_factory(name: str) -> OpDefinition: - """Return a dagster op to get EIA years from settings. + """Construct a Dagster op to get target years from settings in the Dagster context. Args: - name (str): Name of dataset (e.g. "eia860") + name: Name of an Excel based dataset (e.g. "eia860"). Currently this must be + one of the attributes of :class:`pudl.settings.EiaSettings` + """ - def years_from_settings(context): - """Return set of years for EIA data in settings. + def years_from_settings(context) -> DynamicOutput: + """Produce target years for the given dataset from the EIA settings object. - These will be used to kick off worker processes to load each year of data in + These will be used to kick off worker processes to extract each year of data in parallel. + + Yields: + A Dagster :class:`DynamicOutput` object representing the year to be + extracted. See the Dagster API documentation for more details: + https://docs.dagster.io/_apidocs/dynamic#dagster.DynamicOut """ eia_settings = context.resources.dataset_settings.eia for year in getattr(eia_settings, name).years: @@ -456,23 +462,29 @@ def years_from_settings(context): def raw_df_factory( extractor_cls: type[GenericExtractor], name: str ) -> AssetsDefinition: - """Return a graph asset to load a set of raw DataFrames from Excel files. + """Return a dagster graph asset to extract a set of raw DataFrames from Excel files. Args: - extractor_cls: Class of type GenericExtractor used to extract the data. - name(str): Name of dataset (e.g. "eia860"). + extractor_cls: The dataset-specific Excel extractor used to extract the data. + Needs to correspond to the dataset identified by ``name``. + name: Name of an Excel based dataset (e.g. "eia860"). Currently this must be + one of the attributes of :class:`pudl.settings.EiaSettings` """ - # Instantiate dagster factory for dataset - year_loader = year_loader_factory(extractor_cls, name) - # Get years from settings + # Build a Dagster op that can extract a single year of data + year_extractor = year_extractor_factory(extractor_cls, name) + # Get the list of target years to extract from the PUDL ETL settings object which is + # stored in the Dagster context that is available to all ops. 
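+    # (The op built here emits one DynamicOutput per configured year, so each
+    # year in the settings becomes its own extraction op clone at runtime.)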
years_from_settings = years_from_settings_factory(name) def raw_dfs() -> dict[str, pd.DataFrame]: - """All loaded EIA dataframes.""" + """Produce a dictionary of extracted EIA dataframes.""" years = years_from_settings() - # Clone dagster op for each year - dfs = years.map(lambda year: year_loader(year)) - # Gather all ops and merge them by page - return merge_dfs_by_page(dfs.collect()) + # Clone dagster op for each year using DynamicOut.map() + # See https://docs.dagster.io/_apidocs/dynamic#dagster.DynamicOut + dfs = years.map(lambda year: year_extractor(year)) + # Collect the results from all of those cloned ops and concatenate the + # individual years of data into a single multi-year dataframe for each different + # page in the spreadsheet based dataset using DynamicOut.collect() + return concat_pages(dfs.collect()) return graph_asset(name=f"{name}_raw_dfs")(raw_dfs) diff --git a/src/pudl/settings.py b/src/pudl/settings.py index 37210720d0..e196db5fff 100644 --- a/src/pudl/settings.py +++ b/src/pudl/settings.py @@ -244,6 +244,7 @@ class EiaSettings(BaseModel): Args: eia860: Immutable pydantic model to validate eia860 settings. + eia861: Immutable pydantic model to validate eia861 settings. eia923: Immutable pydantic model to validate eia923 settings. """ diff --git a/test/unit/extract/excel_test.py b/test/unit/extract/excel_test.py index f4a8bf202f..0e9ac3eea6 100644 --- a/test/unit/extract/excel_test.py +++ b/test/unit/extract/excel_test.py @@ -142,12 +142,12 @@ def test_years_from_settings(dataset, expected_years): } >= expected_years -def test_merge_dfs_by_page(): +def test_concat_pages(): pages = ["page1", "page2", "page3"] dfs_1 = {page: pd.DataFrame({"df": [1], "page": [page]}) for page in pages} dfs_2 = {page: pd.DataFrame({"df": [2], "page": [page]}) for page in pages} - merged_dfs = excel.merge_dfs_by_page([dfs_1, dfs_2]) + merged_dfs = excel.concat_pages([dfs_1, dfs_2]) assert list(merged_dfs.keys()) == pages for page in pages: pd.testing.assert_frame_equal( From 8d3fbcc1a9c7cb5a0db5421a9b3bf8937c74c3c5 Mon Sep 17 00:00:00 2001 From: Zane Selvans Date: Tue, 17 Oct 2023 18:19:16 -0600 Subject: [PATCH 16/16] Replace one more 'load' with 'extract' --- src/pudl/extract/excel.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/pudl/extract/excel.py b/src/pudl/extract/excel.py index 81cfe96b66..e308135307 100644 --- a/src/pudl/extract/excel.py +++ b/src/pudl/extract/excel.py @@ -354,12 +354,12 @@ def excel_filename(self, page, **partition): """Produce the xlsx document file name as it will appear in the archive. Args: - page: pudl name for the dataset contents, eg - "boiler_generator_assn" or "coal_stocks" - partition: partition to load. (ex: 2009 for year partition or - "2020-08" for year_month partition) + page: pudl name for the dataset contents, eg "boiler_generator_assn" or + "coal_stocks" + partition: partition to load. (ex: 2009 for year partition or "2020-08" for + year_month partition) - Return: + Returns: string name of the xlsx file """ return self.METADATA.get_file_name(page, **partition) @@ -367,7 +367,7 @@ def excel_filename(self, page, **partition): @op def concat_pages(paged_dfs: list[dict[str, pd.DataFrame]]) -> dict[str, pd.DataFrame]: - """Concatenate pages loaded with independent calls to .extract(). + """Concatenate similar pages of data from different years into single dataframes. 
Transform a list of dictionaries of dataframes into a single dictionary of dataframes, where each dataframe is the concatenation of dataframes with identical
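Because concat_pages takes no Dagster context, it can also be invoked directly as a plain function; this is how the unit test added earlier in the series exercises it. A quick usage sketch (page and column names are illustrative):

    import pandas as pd
    from pudl.extract import excel

    pages_2020 = {"generators": pd.DataFrame({"id": [1], "report_year": [2020]})}
    pages_2021 = {"generators": pd.DataFrame({"id": [2], "report_year": [2021]})}

    merged = excel.concat_pages([pages_2020, pages_2021])
    assert merged["generators"]["report_year"].tolist() == [2020, 2021]

The reset_index(drop=True) inside the op means each page's concatenated frame comes back with a fresh 0..N-1 index, matching the expectation in test_concat_pages.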