Merge pull request #2943 from catalyst-cooperative/parallel_extraction
@e-belfer I know you wanted to get this merged in today and I went ahead and made the docs & naming clarifications, so I'm going to mash the button and hopefully we get a good build out of it!  I ran the full ETL locally and it completed fine.
zaneselvans committed Oct 18, 2023
2 parents b939d28 + 8d3fbcc commit b69c009
Showing 7 changed files with 194 additions and 90 deletions.
7 changes: 4 additions & 3 deletions docs/release_notes.rst
@@ -32,12 +32,13 @@ Dagster Adoption
returns the resource ids for a given etl group.
* :mod:`pudl.settings.FercToSqliteSettings` class now loads all FERC
datasources if no datasets are specified.
* The Excel extractor in ``pudl.extract.excel`` has been updated to parallelize
Excel spreadsheet extraction using Dagster ``@multi_asset`` functionality, thanks to
:user:`dstansby`. This is currently being used for EIA 860, 861 and 923 data. See
:issue:`2385` and PRs :pr:`2644`, :pr:`2943`.

* EIA ETL changes:

  * EIA extract methods are now ``@multi_asset`` functions that return an asset for
    each raw table. EIA 860 and 923 are separate ``@multi_asset`` functions, which
    allows this data to be extracted in parallel.
  * The EIA table level cleaning functions are now Dagster assets. The table level
    cleaning assets now have a "clean\_" prefix and a "_{datasource}" suffix to
    distinguish them from the final harvested tables.
72 changes: 2 additions & 70 deletions src/pudl/extract/eia860.py
@@ -4,18 +4,8 @@
This code is for use analyzing EIA Form 860 data.
"""
from collections import defaultdict

import pandas as pd
from dagster import (
    AssetOut,
    DynamicOut,
    DynamicOutput,
    Output,
    graph_asset,
    multi_asset,
    op,
)
from dagster import AssetOut, Output, multi_asset

import pudl
import pudl.logging_helpers
@@ -95,65 +85,7 @@ def get_dtypes(page, **partition):
)


@op(
    out=DynamicOut(),
    required_resource_keys={"dataset_settings"},
)
def eia_years_from_settings(context):
    """Return set of years for EIA in settings.

    These will be used to kick off worker processes to load each year of data in
    parallel.
    """
    eia_settings = context.resources.dataset_settings.eia
    for year in eia_settings.eia860.years:
        yield DynamicOutput(year, mapping_key=str(year))


@op(
    required_resource_keys={"datastore", "dataset_settings"},
)
def load_single_year(context, year: int) -> dict[str, pd.DataFrame]:
    """Load a single year of EIA data from file.

    Args:
        context: dagster keyword that provides access to resources and config.
        year: Year to load.

    Returns:
        Loaded data in a dataframe.
    """
    ds = context.resources.datastore
    return Extractor(ds).extract(year=[year])


@op
def merge_eia860_years(
    yearly_dfs: list[dict[str, pd.DataFrame]]
) -> dict[str, pd.DataFrame]:
    """Merge yearly EIA-860 dataframes."""
    merged = defaultdict(list)
    for dfs in yearly_dfs:
        for page in dfs:
            merged[page].append(dfs[page])

    for page in merged:
        merged[page] = pd.concat(merged[page])

    return merged


@graph_asset
def eia860_raw_dfs() -> dict[str, pd.DataFrame]:
    """All loaded EIA860 dataframes.

    This asset creates a dynamic graph of ops to load EIA860 data in parallel.
    """
    years = eia_years_from_settings()
    dfs = years.map(lambda year: load_single_year(year))
    return merge_eia860_years(dfs.collect())
eia860_raw_dfs = excel.raw_df_factory(Extractor, name="eia860")


# TODO (bendnorman): Figure out type hint for context keyword and multi_asset return
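
The net effect for EIA-860 is that the dataset-specific dynamic-op pipeline above collapses into a single `excel.raw_df_factory` call. As a minimal sketch, the resulting graph asset could be materialized on its own roughly like this, assuming PUDL exposes its `datastore` and `dataset_settings` resource definitions in `pudl.resources` (an assumption, not something shown in this diff):

# Hypothetical smoke test for the factory-produced graph asset.
from dagster import materialize

from pudl.extract.eia860 import eia860_raw_dfs
from pudl.resources import dataset_settings, datastore  # assumed module path

result = materialize(
    [eia860_raw_dfs],
    resources={"datastore": datastore, "dataset_settings": dataset_settings},
)
assert result.success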
9 changes: 4 additions & 5 deletions src/pudl/extract/eia861.py
@@ -69,6 +69,9 @@ def get_dtypes(page, **partition):
}


eia861_raw_dfs = excel.raw_df_factory(Extractor, name="eia861")


@multi_asset(
    outs={
        table_name: AssetOut()
@@ -99,7 +102,7 @@ def get_dtypes(page, **partition):
    },
    required_resource_keys={"datastore", "dataset_settings"},
)
def extract_eia861(context):
def extract_eia861(context, eia861_raw_dfs):
"""Extract raw EIA-861 data from Excel sheets into dataframes.
Args:
@@ -108,10 +111,6 @@ def extract_eia861(context):
    Returns:
        A tuple of extracted EIA-861 dataframes.
    """
    eia_settings = context.resources.dataset_settings.eia
    ds = context.resources.datastore
    eia861_raw_dfs = Extractor(ds).extract(year=eia_settings.eia861.years)

    eia861_raw_dfs = {
        "raw_eia861__" + table_name.replace("_eia861", ""): df
        for table_name, df in eia861_raw_dfs.items()
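
For readers unfamiliar with this Dagster pattern (used for EIA-861 here and EIA-923 below), a self-contained sketch: the `@multi_asset` takes the upstream asset's dictionary of dataframes as a function argument and re-emits each page as its own named output. All asset and page names below are illustrative placeholders, not real PUDL assets.

import pandas as pd
from dagster import AssetOut, Output, asset, multi_asset


@asset
def demo_raw_dfs() -> dict[str, pd.DataFrame]:
    # Stand-in for a factory-produced graph asset: page name -> dataframe.
    return {"sales": pd.DataFrame({"x": [1]}), "frame": pd.DataFrame({"x": [2]})}


@multi_asset(outs={"raw_demo__sales": AssetOut(), "raw_demo__frame": AssetOut()})
def extract_demo(demo_raw_dfs: dict[str, pd.DataFrame]):
    # Rename each page to a descriptive table name and emit one Output per table.
    for page, df in demo_raw_dfs.items():
        yield Output(value=df, output_name=f"raw_demo__{page}")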
12 changes: 5 additions & 7 deletions src/pudl/extract/eia923.py
@@ -108,25 +108,23 @@ def get_dtypes(page, **partition):
)


eia923_raw_dfs = excel.raw_df_factory(Extractor, name="eia923")


# TODO (bendnorman): Figure out type hint for context keyword and multi_asset return
@multi_asset(
    outs={table_name: AssetOut() for table_name in sorted(eia_raw_table_names)},
    required_resource_keys={"datastore", "dataset_settings"},
)
def extract_eia923(context):
    """Extract raw EIA data from excel sheets into dataframes.
def extract_eia923(context, eia923_raw_dfs):
    """Extract raw EIA-923 data from excel sheets into dataframes.

    Args:
        context: dagster keyword that provides access to resources and config.

    Returns:
        A tuple of extracted EIA dataframes.
    """
    eia_settings = context.resources.dataset_settings.eia

    ds = context.resources.datastore
    eia923_raw_dfs = Extractor(ds).extract(year=eia_settings.eia923.years)

    # create descriptive table_names
    eia923_raw_dfs = {
        "raw_eia923__" + table_name: df for table_name, df in eia923_raw_dfs.items()
144 changes: 139 additions & 5 deletions src/pudl/extract/excel.py
@@ -1,9 +1,18 @@
"""Load excel metadata CSV files form a python data package."""
import importlib.resources
import pathlib
from collections import defaultdict

import dbfread
import pandas as pd
from dagster import (
    AssetsDefinition,
    DynamicOut,
    DynamicOutput,
    OpDefinition,
    graph_asset,
    op,
)

import pudl

@@ -345,12 +354,137 @@ def excel_filename(self, page, **partition):
"""Produce the xlsx document file name as it will appear in the archive.
Args:
page: pudl name for the dataset contents, eg
"boiler_generator_assn" or "coal_stocks"
partition: partition to load. (ex: 2009 for year partition or
"2020-08" for year_month partition)
page: pudl name for the dataset contents, eg "boiler_generator_assn" or
"coal_stocks"
partition: partition to load. (ex: 2009 for year partition or "2020-08" for
year_month partition)
Return:
Returns:
string name of the xlsx file
"""
return self.METADATA.get_file_name(page, **partition)


@op
def concat_pages(paged_dfs: list[dict[str, pd.DataFrame]]) -> dict[str, pd.DataFrame]:
    """Concatenate similar pages of data from different years into single dataframes.

    Transform a list of dictionaries of dataframes into a single dictionary of
    dataframes, where each dataframe is the concatenation of dataframes with identical
    keys from the input list.

    Args:
        paged_dfs: A list of dictionaries whose keys are page names, and values are
            extracted DataFrames. Each element of the list corresponds to a single
            year of the dataset being extracted.

    Returns:
        A dictionary of DataFrames keyed by page name, where the DataFrame contains
        that page's data from all extracted years concatenated together.
    """
    # Transform the list of dictionaries of dataframes into a dictionary of lists of
    # dataframes, in which all dataframes in each list represent different instances
    # of the same page of data from different years
    all_data = defaultdict(list)
    for dfs in paged_dfs:
        for page in dfs:
            all_data[page].append(dfs[page])

    # concatenate the dataframes in each list in the dictionary into a single dataframe
    for page in all_data:
        all_data[page] = pd.concat(all_data[page]).reset_index(drop=True)

    return all_data


def year_extractor_factory(
    extractor_cls: type[GenericExtractor], name: str
) -> OpDefinition:
    """Construct a Dagster op that extracts one year of data, given an extractor class.

    Args:
        extractor_cls: Class of type :class:`GenericExtractor` used to extract the
            data.
        name: Name of an Excel based dataset (e.g. "eia860").
    """

    def extract_single_year(context, year: int) -> dict[str, pd.DataFrame]:
        """A function that extracts a year of spreadsheet data from an Excel file.

        This function will be decorated with a Dagster op and returned.

        Args:
            context: Dagster keyword that provides access to resources and config.
            year: Year of data to extract.

        Returns:
            A dictionary of DataFrames extracted from Excel, keyed by page name.
        """
        ds = context.resources.datastore
        return extractor_cls(ds).extract(year=[year])

    return op(
        required_resource_keys={"datastore", "dataset_settings"},
        name=f"extract_single_{name}_year",
    )(extract_single_year)


def years_from_settings_factory(name: str) -> OpDefinition:
    """Construct a Dagster op to get target years from settings in the Dagster context.

    Args:
        name: Name of an Excel based dataset (e.g. "eia860"). Currently this must be
            one of the attributes of :class:`pudl.settings.EiaSettings`
    """

    def years_from_settings(context) -> DynamicOutput:
        """Produce target years for the given dataset from the EIA settings object.

        These will be used to kick off worker processes to extract each year of data
        in parallel.

        Yields:
            A Dagster :class:`DynamicOutput` object representing the year to be
            extracted. See the Dagster API documentation for more details:
            https://docs.dagster.io/_apidocs/dynamic#dagster.DynamicOut
        """
        eia_settings = context.resources.dataset_settings.eia
        for year in getattr(eia_settings, name).years:
            yield DynamicOutput(year, mapping_key=str(year))

    return op(
        out=DynamicOut(),
        required_resource_keys={"dataset_settings"},
        name=f"{name}_years_from_settings",
    )(years_from_settings)


def raw_df_factory(
    extractor_cls: type[GenericExtractor], name: str
) -> AssetsDefinition:
    """Return a dagster graph asset to extract a set of raw DataFrames from Excel files.

    Args:
        extractor_cls: The dataset-specific Excel extractor used to extract the data.
            Needs to correspond to the dataset identified by ``name``.
        name: Name of an Excel based dataset (e.g. "eia860"). Currently this must be
            one of the attributes of :class:`pudl.settings.EiaSettings`
    """
    # Build a Dagster op that can extract a single year of data
    year_extractor = year_extractor_factory(extractor_cls, name)
    # Get the list of target years to extract from the PUDL ETL settings object which
    # is stored in the Dagster context that is available to all ops.
    years_from_settings = years_from_settings_factory(name)

    def raw_dfs() -> dict[str, pd.DataFrame]:
        """Produce a dictionary of extracted EIA dataframes."""
        years = years_from_settings()
        # Clone dagster op for each year using DynamicOut.map()
        # See https://docs.dagster.io/_apidocs/dynamic#dagster.DynamicOut
        dfs = years.map(lambda year: year_extractor(year))
        # Collect the results from all of those cloned ops and concatenate the
        # individual years of data into a single multi-year dataframe for each
        # different page in the spreadsheet based dataset using DynamicOut.collect()
        return concat_pages(dfs.collect())

    return graph_asset(name=f"{name}_raw_dfs")(raw_dfs)
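
Taken together, `years_from_settings_factory`, `year_extractor_factory`, and `concat_pages` implement a fan-out/fan-in pattern. A self-contained toy version of the same wiring, runnable without PUDL's datastore or settings (all names below are illustrative, not PUDL code):

import pandas as pd
from dagster import DynamicOut, DynamicOutput, graph, op


@op(out=DynamicOut())
def toy_years():
    # Stand-in for years_from_settings: one DynamicOutput per target year.
    for year in (2020, 2021):
        yield DynamicOutput(year, mapping_key=str(year))


@op
def toy_extract_year(year: int) -> dict[str, pd.DataFrame]:
    # Stand-in for extract_single_year: a single page of data per year.
    return {"page1": pd.DataFrame({"year": [year]})}


@op
def toy_concat(paged: list[dict[str, pd.DataFrame]]) -> dict[str, pd.DataFrame]:
    # Same shape as concat_pages: concatenate each page across all years.
    return {"page1": pd.concat([d["page1"] for d in paged], ignore_index=True)}


@graph
def toy_raw_dfs():
    # map() clones the extraction op once per year; collect() fans back in.
    return toy_concat(toy_years().map(toy_extract_year).collect())


if __name__ == "__main__":
    result = toy_raw_dfs.to_job().execute_in_process()
    print(result.output_for_node("toy_concat"))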
1 change: 1 addition & 0 deletions src/pudl/settings.py
@@ -244,6 +244,7 @@ class EiaSettings(BaseModel):
    Args:
        eia860: Immutable pydantic model to validate eia860 settings.
        eia861: Immutable pydantic model to validate eia861 settings.
        eia923: Immutable pydantic model to validate eia923 settings.
    """

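
The ops built by `years_from_settings_factory` read these settings at run time. As a small illustration (assuming, as the unit tests below do, that a default `DatasetsSettings()` comes populated with working years for every dataset):

from pudl.settings import DatasetsSettings

# Each EIA dataset's settings expose a .years attribute, which
# years_from_settings_factory iterates over to fan out extraction.
settings = DatasetsSettings()
print(settings.eia.eia923.years)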
39 changes: 39 additions & 0 deletions test/unit/extract/excel_test.py
@@ -4,8 +4,11 @@
from unittest.mock import patch

import pandas as pd
import pytest
from dagster import build_op_context

from pudl.extract import excel as excel
from pudl.settings import DatasetsSettings


class TestMetadata(unittest.TestCase):
@@ -115,3 +118,39 @@ def test_read_excel_calls(mock_read_excel):

# TODO(rousik@gmail.com): need to figure out how to test process_$x methods.
# TODO(rousik@gmail.com): we should test that empty columns are properly added.


@pytest.mark.parametrize(
    "dataset, expected_years",
    (
        ("eia860", set(range(2001, 2022))),
        ("eia861", set(range(2001, 2022))),
        ("eia923", set(range(2001, 2022))),
    ),
)
def test_years_from_settings(dataset, expected_years):
    years_from_settings = excel.years_from_settings_factory(dataset)

    with build_op_context(
        resources={"dataset_settings": DatasetsSettings()}
    ) as context:
        # Assert actual years are a superset of expected. Instead of doing
        # an equality check, this avoids having to update expected years
        # every time a new year is added to the datasets
        assert {
            output.value for output in years_from_settings(context)
        } >= expected_years


def test_concat_pages():
    pages = ["page1", "page2", "page3"]
    dfs_1 = {page: pd.DataFrame({"df": [1], "page": [page]}) for page in pages}
    dfs_2 = {page: pd.DataFrame({"df": [2], "page": [page]}) for page in pages}

    merged_dfs = excel.concat_pages([dfs_1, dfs_2])
    assert list(merged_dfs.keys()) == pages
    for page in pages:
        pd.testing.assert_frame_equal(
            merged_dfs[page],
            pd.DataFrame({"df": [1, 2], "page": [page, page]}, index=[0, 1]),
        )
