Parallelize extraction of Excel spreadsheets #2943

Merged
19 commits merged on Oct 18, 2023
Changes from 14 commits
72 changes: 2 additions & 70 deletions src/pudl/extract/eia860.py
@@ -4,18 +4,8 @@

This code is for use analyzing EIA Form 860 data.
"""
from collections import defaultdict

import pandas as pd
from dagster import (
    AssetOut,
    DynamicOut,
    DynamicOutput,
    Output,
    graph_asset,
    multi_asset,
    op,
)
from dagster import AssetOut, Output, multi_asset

import pudl
import pudl.logging_helpers
@@ -95,65 +85,7 @@ def get_dtypes(page, **partition):
)


@op(
    out=DynamicOut(),
    required_resource_keys={"dataset_settings"},
)
def eia_years_from_settings(context):
    """Return set of years for EIA in settings.

    These will be used to kick off worker processes to load each year of data in
    parallel.
    """
    eia_settings = context.resources.dataset_settings.eia
    for year in eia_settings.eia860.years:
        yield DynamicOutput(year, mapping_key=str(year))


@op(
    required_resource_keys={"datastore", "dataset_settings"},
)
def load_single_year(context, year: int) -> dict[str, pd.DataFrame]:
    """Load a single year of EIA data from file.

    Args:
        context: dagster keyword that provides access to resources and config.
        year: Year to load.

    Returns:
        Loaded data in a dataframe.
    """
    ds = context.resources.datastore
    return Extractor(ds).extract(year=[year])


@op
def merge_eia860_years(
    yearly_dfs: list[dict[str, pd.DataFrame]]
) -> dict[str, pd.DataFrame]:
    """Merge yearly EIA-860 dataframes."""
    merged = defaultdict(list)
    for dfs in yearly_dfs:
        for page in dfs:
            merged[page].append(dfs[page])

    for page in merged:
        merged[page] = pd.concat(merged[page])

    return merged


@graph_asset
def eia860_raw_dfs() -> dict[str, pd.DataFrame]:
    """All loaded EIA860 dataframes.

    This asset creates a dynamic graph of ops to load EIA860 data in parallel.
    """
    years = eia_years_from_settings()
    dfs = years.map(lambda year: load_single_year(year))
    return merge_eia860_years(dfs.collect())
eia860_raw_dfs = excel.raw_df_factory(Extractor, name="eia860")


# TODO (bendnorman): Figure out type hint for context keyword and multi_asset return
9 changes: 4 additions & 5 deletions src/pudl/extract/eia861.py
@@ -69,6 +69,9 @@ def get_dtypes(page, **partition):
}


eia861_raw_dfs = excel.raw_df_factory(Extractor, name="eia861")


@multi_asset(
    outs={
        table_name: AssetOut()
@@ -99,7 +102,7 @@ def get_dtypes(page, **partition):
    },
    required_resource_keys={"datastore", "dataset_settings"},
)
def extract_eia861(context):
def extract_eia861(context, eia861_raw_dfs):
    """Extract raw EIA-861 data from Excel sheets into dataframes.

    Args:
@@ -108,10 +111,6 @@ def extract_eia861(context):

    Returns:
        A tuple of extracted EIA-861 dataframes.
    """
    eia_settings = context.resources.dataset_settings.eia
    ds = context.resources.datastore
    eia861_raw_dfs = Extractor(ds).extract(year=eia_settings.eia861.years)

    eia861_raw_dfs = {
        "raw_eia861__" + table_name.replace("_eia861", ""): df
        for table_name, df in eia861_raw_dfs.items()
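The updated extract_eia861 signature leans on dagster's name-based wiring: any non-context parameter of an asset function is resolved to the upstream asset of the same name, which is how the factory-built eia861_raw_dfs asset flows into the multi_asset without the extraction step touching the datastore resource. A minimal, self-contained sketch of that mechanism, using hypothetical stand-in names rather than anything from this PR:

import pandas as pd
from dagster import AssetOut, Output, asset, materialize, multi_asset


@asset
def demo_raw_dfs() -> dict[str, pd.DataFrame]:
    # Hypothetical stand-in for a factory-built raw-extraction asset.
    return {"sales": pd.DataFrame({"mwh": [1.0, 2.0]})}


@multi_asset(outs={"raw_demo__sales": AssetOut()})
def extract_demo(demo_raw_dfs: dict[str, pd.DataFrame]):
    # The parameter name matches the upstream asset key, so dagster passes
    # the materialized dictionary in; no extraction happens here.
    yield Output(demo_raw_dfs["sales"], output_name="raw_demo__sales")


if __name__ == "__main__":
    assert materialize([demo_raw_dfs, extract_demo]).success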
12 changes: 5 additions & 7 deletions src/pudl/extract/eia923.py
@@ -108,25 +108,23 @@ def get_dtypes(page, **partition):
)


eia923_raw_dfs = excel.raw_df_factory(Extractor, name="eia923")


# TODO (bendnorman): Figure out type hint for context keyword and multi_asset return
@multi_asset(
    outs={table_name: AssetOut() for table_name in sorted(eia_raw_table_names)},
    required_resource_keys={"datastore", "dataset_settings"},
)
def extract_eia923(context):
    """Extract raw EIA data from excel sheets into dataframes.
def extract_eia923(context, eia923_raw_dfs):
    """Extract raw EIA-923 data from excel sheets into dataframes.

    Args:
        context: dagster keyword that provides access to resources and config.

    Returns:
        A tuple of extracted EIA dataframes.
    """
    eia_settings = context.resources.dataset_settings.eia

    ds = context.resources.datastore
    eia923_raw_dfs = Extractor(ds).extract(year=eia_settings.eia923.years)

    # create descriptive table_names
    eia923_raw_dfs = {
        "raw_eia923__" + table_name: df for table_name, df in eia923_raw_dfs.items()
119 changes: 119 additions & 0 deletions src/pudl/extract/excel.py
@@ -1,9 +1,18 @@
"""Load excel metadata CSV files form a python data package."""
import importlib.resources
import pathlib
from collections import defaultdict

import dbfread
import pandas as pd
from dagster import (
AssetsDefinition,
DynamicOut,
DynamicOutput,
OpDefinition,
graph_asset,
op,
)

import pudl

Expand Down Expand Up @@ -354,3 +363,113 @@ def excel_filename(self, page, **partition):
string name of the xlsx file
"""
return self.METADATA.get_file_name(page, **partition)


@op
def merge_dfs_by_page(
    paged_dfs: list[dict[str, pd.DataFrame]]
) -> dict[str, pd.DataFrame]:
    """Merge DataFrames loaded with independent calls to .extract().

    The input is a list of dictionaries whose keys are page names and whose
    values are DataFrames of extracted data. Where a page name appears in more
    than one dictionary, all DataFrames with that page name are concatenated,
    and the result is returned as a single dict mapping unique page names to
    merged DataFrames.
    """
    all_data = defaultdict(list)
    for dfs in paged_dfs:  # For each dict of extracted dataframes
        for page in dfs:  # For each page in that dict
            # Only keep non-empty DataFrames for the page
            if isinstance(dfs[page], pd.DataFrame) and not dfs[page].empty:
                all_data[page].append(dfs[page])

    for page in all_data:
        # Merge all the dataframes for one page
        all_data[page] = pd.concat(all_data[page]).reset_index(drop=True)

    return all_data


def year_loader_factory(
    extractor_cls: type[GenericExtractor], name: str
) -> OpDefinition:
    """Return a dagster op to load a single year using a given extractor class.

    Args:
        extractor_cls: Class of type GenericExtractor used to extract the data.
        name (str): Name of dataset (e.g. "eia860").
    """

    def load_single_year(context, year: int) -> dict[str, pd.DataFrame]:
        """Load a single year of data from file.

        Args:
            context: dagster keyword that provides access to resources and config.
            year: Year to load.

        Returns:
            Loaded data as a dictionary of DataFrames, keyed by page name.
        """
        ds = context.resources.datastore
        return extractor_cls(ds).extract(year=[year])

    return op(
        required_resource_keys={"datastore", "dataset_settings"},
        name=f"load_single_{name}_year",
    )(load_single_year)


def years_from_settings_factory(name: str) -> OpDefinition:
    """Return a dagster op to get EIA years from settings.

    Args:
        name (str): Name of dataset (e.g. "eia860").
    """

    def years_from_settings(context):
        """Return the set of years for one EIA dataset in the settings.

        These will be used to kick off worker processes to load each year of
        data in parallel.
        """
        eia_settings = context.resources.dataset_settings.eia
        for year in getattr(eia_settings, name).years:
            yield DynamicOutput(year, mapping_key=str(year))

    return op(
        out=DynamicOut(),
        required_resource_keys={"dataset_settings"},
        name=f"{name}_years_from_settings",
    )(years_from_settings)


def raw_df_factory(
    extractor_cls: type[GenericExtractor], name: str
) -> AssetsDefinition:
    """Return a graph asset to load a set of raw DataFrames from Excel files.

    Args:
        extractor_cls: Class of type GenericExtractor used to extract the data.
        name (str): Name of dataset (e.g. "eia860").
    """
    # Instantiate the dagster op factories for this dataset
    year_loader = year_loader_factory(extractor_cls, name)
    years_from_settings = years_from_settings_factory(name)

    def raw_dfs() -> dict[str, pd.DataFrame]:
        """All loaded dataframes for this dataset."""
        years = years_from_settings()
        # Clone the year-loader op for each year
        dfs = years.map(lambda year: year_loader(year))
        # Gather the per-year outputs and merge them by page
        return merge_dfs_by_page(dfs.collect())

    return graph_asset(name=f"{name}_raw_dfs")(raw_dfs)
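Together these factories implement dagster's dynamic fan-out/fan-in pattern: an op with DynamicOut emits one DynamicOutput per year, .map() clones the loader op for each emitted year, and .collect() gathers the per-year results for a final merge. A self-contained sketch of the same pattern, using hypothetical stub names and no PUDL resources:

from collections import defaultdict

import pandas as pd
from dagster import DynamicOut, DynamicOutput, graph_asset, materialize, op


@op(out=DynamicOut())
def stub_years():
    # Stand-in for a years_from_settings op: one DynamicOutput per year.
    for year in (2020, 2021):
        yield DynamicOutput(year, mapping_key=str(year))


@op
def stub_load_year(year: int) -> dict[str, pd.DataFrame]:
    # Stand-in for a year-loader op: one page of data per year.
    return {"page1": pd.DataFrame({"year": [year]})}


@op
def stub_merge(paged: list[dict[str, pd.DataFrame]]) -> dict[str, pd.DataFrame]:
    # Stand-in for merge_dfs_by_page, without the empty-frame guard.
    merged = defaultdict(list)
    for dfs in paged:
        for page, df in dfs.items():
            merged[page].append(df)
    return {page: pd.concat(frames, ignore_index=True) for page, frames in merged.items()}


@graph_asset
def stub_raw_dfs() -> dict[str, pd.DataFrame]:
    # .map clones stub_load_year once per emitted year; .collect fans back in.
    return stub_merge(stub_years().map(stub_load_year).collect())


if __name__ == "__main__":
    assert materialize([stub_raw_dfs]).success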
39 changes: 39 additions & 0 deletions test/unit/extract/excel_test.py
@@ -4,8 +4,11 @@
from unittest.mock import patch

import pandas as pd
import pytest
from dagster import build_op_context

from pudl.extract import excel as excel
from pudl.settings import DatasetsSettings


class TestMetadata(unittest.TestCase):
@@ -115,3 +118,39 @@ def test_read_excel_calls(mock_read_excel):

# TODO(rousik@gmail.com): need to figure out how to test process_$x methods.
# TODO(rousik@gmail.com): we should test that empty columns are properly added.


@pytest.mark.parametrize(
    "dataset, expected_years",
    (
        ("eia860", set(range(2001, 2022))),
        ("eia861", set(range(2001, 2022))),
        ("eia923", set(range(2001, 2022))),
    ),
)
def test_years_from_settings(dataset, expected_years):
    years_from_settings = excel.years_from_settings_factory(dataset)

    with build_op_context(
        resources={"dataset_settings": DatasetsSettings()}
    ) as context:
        # Assert that the actual years are a superset of the expected years.
        # Compared to an equality check, this avoids updating the expected
        # years every time a new year is added to the datasets.
        assert {
            output.value for output in years_from_settings(context)
        } >= expected_years


def test_merge_dfs_by_page():
    pages = ["page1", "page2", "page3"]
    dfs_1 = {page: pd.DataFrame({"df": [1], "page": [page]}) for page in pages}
    dfs_2 = {page: pd.DataFrame({"df": [2], "page": [page]}) for page in pages}

    merged_dfs = excel.merge_dfs_by_page([dfs_1, dfs_2])
    assert list(merged_dfs.keys()) == pages
    for page in pages:
        pd.testing.assert_frame_equal(
            merged_dfs[page],
            pd.DataFrame({"df": [1, 2], "page": [page, page]}, index=[0, 1]),
        )
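One behavior the new tests leave uncovered is the empty-frame guard in merge_dfs_by_page, which drops empty DataFrames before concatenation. A possible additional case (hypothetical, not part of this PR) could pin that down:

def test_merge_dfs_by_page_skips_empty():
    # Hypothetical extra case: the isinstance/.empty guard in
    # merge_dfs_by_page should drop empty frames before concatenation.
    dfs_1 = {"page1": pd.DataFrame({"df": [1]})}
    dfs_2 = {"page1": pd.DataFrame(), "page2": pd.DataFrame()}

    merged_dfs = excel.merge_dfs_by_page([dfs_1, dfs_2])
    assert list(merged_dfs.keys()) == ["page1"]
    pd.testing.assert_frame_equal(merged_dfs["page1"], pd.DataFrame({"df": [1]}))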