Output Parquet files as well as SQLite in PUDL ETL #3296

Merged
merged 68 commits into from
Feb 6, 2024
8b4fed4
Experimental support for exporting to parquet files.
rousik Jan 6, 2024
b1ac18a
Merge remote-tracking branch 'origin/main' into export-to-parquet
rousik Jan 10, 2024
0a78a94
Modify PudlSqliteIOManager to support parquet.
rousik Jan 10, 2024
c9aa6f0
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 10, 2024
b6136c7
Fix the export of pandas/parquet.
rousik Jan 11, 2024
83492a0
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 11, 2024
83650d5
Refactoring the io managers with parquet.
rousik Jan 16, 2024
39b72e5
Merge remote-tracking branch 'origin/main' into export-to-parquet
rousik Jan 16, 2024
9043255
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 16, 2024
4ab92f2
Change how EnvVars are read when creating mixed io manager.
rousik Jan 17, 2024
c49d129
Merge remote-tracking branch 'origin/main' into export-to-parquet
rousik Jan 17, 2024
afed63d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 17, 2024
d88c1ef
Rename pudl_sqlite_io_manager to pudl_io_manager
rousik Jan 18, 2024
4ab01d5
Merge remote-tracking branch 'origin/main' into export-to-parquet
rousik Jan 18, 2024
cd98379
Fix minor issue in the schema validation code.
rousik Jan 18, 2024
a400446
Refactor PudlSQLiteIOManager tests to use unittest.
rousik Jan 18, 2024
df5e12d
Fail schema enforcement when unknown columns are encountered.
rousik Jan 18, 2024
c3d4bb3
Fail schema enforcement when unknown columns are encountered.
rousik Jan 18, 2024
9081388
Switch mixed format io manager back to IOManager.
rousik Jan 18, 2024
273741a
Remove _get_table_name() method in favor of standalone helper.
rousik Jan 18, 2024
06571ac
Merge branch 'main' into parquet_outputs
zschira Jan 24, 2024
cd2dcd0
REmove schema test for unexpected columns
zschira Jan 25, 2024
c7ce623
Improve naming of io-manager test fixtures
zschira Jan 25, 2024
ab66c2a
Refactor where foreign key check functionality lives
zschira Jan 25, 2024
013f4a5
Revert "Refactor PudlSQLiteIOManager tests to use unittest."
zschira Jan 25, 2024
3408333
Remove unused io-manager test fixture
zschira Jan 25, 2024
91b4ff9
Fix docs build failure
zschira Jan 25, 2024
01ae240
Merge branch 'main' into parquet_outputs
zschira Jan 25, 2024
21e0dbf
Update src/pudl/io_managers.py
zschira Jan 26, 2024
ee2a533
Improve docstring for mixed output io-manager
zschira Jan 26, 2024
2fceea6
Do not allow parquet reads if parquet writes are off.
zschira Jan 26, 2024
1cf1fbd
Improve type hints for mixed io-manager
zschira Jan 26, 2024
dd6e395
Change naming of mixed format io-manager
zschira Jan 26, 2024
192a569
Add argument descriptions to ForeignKeyError
zschira Jan 26, 2024
a27c5f1
Remove unused argument
zschira Jan 26, 2024
f47a9d1
Merge branch 'main' into parquet_outputs
zschira Jan 26, 2024
a961480
Merge branch 'parquet_outputs' of github.com:catalyst-cooperative/pud…
zschira Jan 26, 2024
46dd5a2
Add log for db_path in check_foreign_keys
zschira Jan 29, 2024
e8ebdd7
Update return type from parquet io-manager
zschira Jan 29, 2024
06aa9fe
Add link to description of rowids
zschira Jan 29, 2024
5b82d91
Remove unused argument description from docstring
zschira Jan 29, 2024
aaa46cb
Fixed name collision in integration tests
zschira Jan 29, 2024
0492659
Update src/pudl/etl/check_foreign_keys.py
zschira Jan 30, 2024
e5825fb
Remove redundant log
zschira Jan 30, 2024
6fd541e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 30, 2024
cc3644a
Make parquet output on by default
zschira Jan 30, 2024
7269bf7
Merge branch 'parquet_outputs' of github.com:catalyst-cooperative/pud…
zschira Jan 30, 2024
cf1d5c5
Fix pudl_mixed_format_io_manager reference in docstring
zschira Jan 30, 2024
f9ed62b
Merge branch 'main' into parquet_outputs
zaneselvans Jan 30, 2024
f2c9f25
Update stale check_fks and pudl_paths function args
zaneselvans Jan 30, 2024
41eb6d3
Change how dtype handling is done for parquet reads
zschira Jan 30, 2024
27555ae
Revert "Change how dtype handling is done for parquet reads"
zschira Jan 31, 2024
827dc8b
Make pyarrow datetime type timezone naive
zschira Jan 31, 2024
16b4585
Output epacems to correct directories
zschira Feb 1, 2024
b54e915
Merge branch 'main' into parquet_outputs
zaneselvans Feb 1, 2024
5b1a6f5
Fix consolidated epacems output
zschira Feb 1, 2024
ef74271
Fix access to partitioned parquet outputs.
zschira Feb 1, 2024
4ed6f90
nightly build refer to new parquet outputs.
zaneselvans Feb 1, 2024
53a3fd4
Merge branch 'main' into parquet_outputs
zaneselvans Feb 1, 2024
40e58a6
Merge remote-tracking branch 'origin/main' into parquet_outputs
zaneselvans Feb 1, 2024
b922919
Merge branch 'main' into parquet_outputs
zaneselvans Feb 2, 2024
d59f795
Merge branch 'main' into parquet_outputs
zaneselvans Feb 5, 2024
5fffc65
Merge branch 'main' into parquet_outputs
zaneselvans Feb 5, 2024
16fb36d
Update parquet path in integration tests.
zaneselvans Feb 6, 2024
6c23e79
Remove to_pyarrow() null description workarounds
zaneselvans Feb 6, 2024
1aa55b9
Fix spelling: augement -> augment
zaneselvans Feb 6, 2024
6ca051b
Read parquet files from parquet dir in epacems_io_manager
zaneselvans Feb 6, 2024
0883831
Make parquet test compatible with changes to parquet_io_manager base …
zaneselvans Feb 6, 2024
6 changes: 4 additions & 2 deletions docker/gcp_pudl_etl.sh
@@ -172,8 +172,10 @@ function update_nightly_branch() {
function clean_up_outputs_for_distribution() {
# Compress the SQLite DBs for easier distribution
gzip --verbose "$PUDL_OUTPUT"/*.sqlite && \
# Remove redundant multi-file EPA CEMS outputs prior to distribution
rm -rf "$PUDL_OUTPUT/core_epacems__hourly_emissions/" && \
# Grab the consolidated EPA CEMS outputs for distribution
cp "$PUDL_OUTPUT/parquet/core_epacems__hourly_emissions.parquet" "$PUDL_OUTPUT" && \
# Remove all other parquet output, which we are not yet distributing.
rm -rf "$PUDL_OUTPUT/parquet" && \
rm -f "$PUDL_OUTPUT/metadata.yml"
}
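
The ordering in the updated clean-up function matters: the consolidated EPA CEMS file has to be copied out of the `parquet` directory before that directory is deleted. A minimal Python sketch of the same sequence follows, for illustration only. The file and directory names come from the diff above; the demo files created in the temporary directory are made-up placeholders, and the real pipeline uses the shell function, not this code.

```python
import gzip
import shutil
import tempfile
from pathlib import Path


def clean_up_outputs_for_distribution(output_dir: Path) -> None:
    """Illustrative Python rendering of the updated shell function."""
    # Compress the SQLite DBs; like `gzip`, drop the uncompressed original.
    for db_path in sorted(output_dir.glob("*.sqlite")):
        with db_path.open("rb") as src, gzip.open(f"{db_path}.gz", "wb") as dst:
            shutil.copyfileobj(src, dst)
        db_path.unlink()
    parquet_dir = output_dir / "parquet"
    # Copy the consolidated EPA CEMS file out *before* deleting its directory.
    shutil.copy2(parquet_dir / "core_epacems__hourly_emissions.parquet", output_dir)
    # Remove all other Parquet output, which is not yet distributed.
    shutil.rmtree(parquet_dir)
    # Equivalent of `rm -f`: do not fail if the file is already gone.
    (output_dir / "metadata.yml").unlink(missing_ok=True)


# Demo on a throwaway directory with made-up placeholder files.
with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp)
    (out / "parquet").mkdir()
    (out / "pudl.sqlite").write_bytes(b"fake db")
    (out / "metadata.yml").write_text("fake: metadata\n")
    (out / "parquet" / "core_epacems__hourly_emissions.parquet").write_bytes(b"cems")
    (out / "parquet" / "some_other_table.parquet").write_bytes(b"other")
    clean_up_outputs_for_distribution(out)
    remaining = sorted(p.name for p in out.iterdir())

print(remaining)
```

One difference from the shell version: the `&&` chain stops silently at the first failing command, whereas this sketch raises an exception.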

2 changes: 1 addition & 1 deletion src/pudl/analysis/allocate_gen_fuel.py
@@ -316,7 +316,7 @@ def gen_fuel_by_gen_esc_owner(
for freq in ["AS", "MS"]
for allocated_net_gen_asset in allocate_gen_fuel_asset_factory(
freq=freq,
io_manager_key="pudl_sqlite_io_manager",
io_manager_key="pudl_io_manager",
)
]

12 changes: 6 additions & 6 deletions src/pudl/analysis/mcoe.py
@@ -56,7 +56,7 @@ def mcoe_asset_factory(
"bga": AssetIn(key="core_eia860__assn_boiler_generator"),
},
compute_kind="Python",
io_manager_key="pudl_sqlite_io_manager",
io_manager_key="pudl_io_manager",
)
def hr_by_unit_asset(gen: pd.DataFrame, bga: pd.DataFrame) -> pd.DataFrame:
return heat_rate_by_unit(gen_fuel_by_energy_source=gen, bga=bga)
@@ -69,7 +69,7 @@ def hr_by_unit_asset(gen: pd.DataFrame, bga: pd.DataFrame) -> pd.DataFrame:
"gens": AssetIn(key="_out_eia__yearly_generators"),
},
compute_kind="Python",
io_manager_key="pudl_sqlite_io_manager",
io_manager_key="pudl_io_manager",
)
def hr_by_gen_asset(
bga: pd.DataFrame, hr_by_unit: pd.DataFrame, gens: pd.DataFrame
@@ -86,7 +86,7 @@ def hr_by_gen_asset(
"frc": AssetIn(key=f"out_eia923__{agg_freqs[freq]}_fuel_receipts_costs"),
},
compute_kind="Python",
io_manager_key="pudl_sqlite_io_manager",
io_manager_key="pudl_io_manager",
)
def fc_asset(
hr_by_gen: pd.DataFrame, gens: pd.DataFrame, frc: pd.DataFrame
@@ -102,7 +102,7 @@ def fc_asset(
"gens": AssetIn(key="_out_eia__yearly_generators"),
},
compute_kind="Python",
io_manager_key="pudl_sqlite_io_manager",
io_manager_key="pudl_io_manager",
)
def cf_asset(gens: pd.DataFrame, gen: pd.DataFrame) -> pd.DataFrame:
return capacity_factor(gens=gens, gen=gen, freq=freq)
@@ -157,7 +157,7 @@ def cf_asset(gens: pd.DataFrame, gen: pd.DataFrame) -> pd.DataFrame:
),
),
},
io_manager_key="pudl_sqlite_io_manager",
io_manager_key="pudl_io_manager",
)
def mcoe_asset(
context, fuel_cost: pd.DataFrame, capacity_factor: pd.DataFrame
@@ -179,7 +179,7 @@ def mcoe_asset(
),
"gens": AssetIn(key="_out_eia__yearly_generators"),
},
io_manager_key="pudl_sqlite_io_manager",
io_manager_key="pudl_io_manager",
compute_kind="Python",
config_schema={
"all_gens": Field(
4 changes: 2 additions & 2 deletions src/pudl/analysis/plant_parts_eia.py
@@ -367,7 +367,7 @@


@asset(
io_manager_key="pudl_sqlite_io_manager",
io_manager_key="pudl_io_manager",
compute_kind="Python",
)
def out_eia__yearly_generators_by_ownership(
@@ -381,7 +381,7 @@ def out_eia__yearly_generators_by_ownership(


@asset(
io_manager_key="pudl_sqlite_io_manager",
io_manager_key="pudl_io_manager",
compute_kind="Python",
)
def out_eia__yearly_plant_parts(
@@ -276,7 +276,7 @@ def run_matching_model(features_train, features_all, y_df):
@op(
out={
"out_pudl__yearly_assn_eia_ferc1_plant_parts": Out(
io_manager_key="pudl_sqlite_io_manager"
io_manager_key="pudl_io_manager"
)
}
)
2 changes: 1 addition & 1 deletion src/pudl/analysis/service_territory.py
@@ -456,7 +456,7 @@ def dagster_compile_geoms(
ass
for entity in list(ENTITY_TYPE)
for ass in compiled_geoms_asset_factory(
entity_type=entity, io_manager_key="pudl_sqlite_io_manager"
entity_type=entity, io_manager_key="pudl_io_manager"
)
]

2 changes: 1 addition & 1 deletion src/pudl/analysis/state_demand.py
@@ -565,7 +565,7 @@ def total_state_sales_eia861(


@asset(
io_manager_key="pudl_sqlite_io_manager",
io_manager_key="pudl_io_manager",
compute_kind="Python",
config_schema={
"mean_overlaps": Field(
4 changes: 2 additions & 2 deletions src/pudl/etl/__init__.py
@@ -15,7 +15,7 @@
epacems_io_manager,
ferc1_dbf_sqlite_io_manager,
ferc1_xbrl_sqlite_io_manager,
pudl_sqlite_io_manager,
pudl_mixed_format_io_manager,
)
from pudl.resources import dataset_settings, datastore, ferc_to_sqlite_settings
from pudl.settings import EtlSettings
@@ -95,7 +95,7 @@

default_resources = {
"datastore": datastore,
"pudl_sqlite_io_manager": pudl_sqlite_io_manager,
"pudl_io_manager": pudl_mixed_format_io_manager,
"ferc1_dbf_sqlite_io_manager": ferc1_dbf_sqlite_io_manager,
"ferc1_xbrl_sqlite_io_manager": ferc1_xbrl_sqlite_io_manager,
"dataset_settings": dataset_settings,
167 changes: 158 additions & 9 deletions src/pudl/etl/check_foreign_keys.py
@@ -3,11 +3,12 @@
import sys

import click
from dagster import build_init_resource_context
import pandas as pd
import sqlalchemy as sa
from dotenv import load_dotenv

import pudl
from pudl.io_managers import pudl_sqlite_io_manager
from pudl.workspace.setup import PudlPaths

logger = pudl.logging_helpers.get_logger(__name__)

@@ -31,7 +32,17 @@
["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], case_sensitive=False
),
)
def pudl_check_fks(logfile: pathlib.Path, loglevel: str):
@click.option(
"--db_path",
help="Path to PUDL SQLite database where foreign keys should be checked.",
type=click.Path(
exists=False,
resolve_path=True,
path_type=pathlib.Path,
),
default=None,
)
def pudl_check_fks(logfile: pathlib.Path, loglevel: str, db_path: pathlib.Path):
"""Check that foreign key constraints in the PUDL database are respected.

Dagster manages the dependencies between various assets in our ETL pipeline,
@@ -50,15 +61,153 @@ def pudl_check_fks(logfile: pathlib.Path, loglevel: str):
# Display logged output from the PUDL package:
pudl.logging_helpers.configure_root_logger(logfile=logfile, loglevel=loglevel)

context = build_init_resource_context()
io_manager = pudl_sqlite_io_manager(context)

database_path = io_manager.base_dir / f"{io_manager.db_name}.sqlite"
logger.info(f"Checking foreign key constraints in {database_path}")
# Using PudlPaths to get default value for CLI causes validation issues
if not db_path:
db_path = PudlPaths().output_dir / "pudl.sqlite"

io_manager.check_foreign_keys()
check_foreign_keys(sa.create_engine(f"sqlite:///{db_path}"))
return 0


class ForeignKeyError(sa.exc.SQLAlchemyError):
"""Raised when data in a database violates a foreign key constraint."""

def __init__(
self, child_table: str, parent_table: str, foreign_key: str, rowids: list[int]
):
"""Initialize a new ForeignKeyError object.

Args:
child_table: The table that a foreign key constraint is applied to.
parent_table: The table that a foreign key constraint refers to.
foreign_key: Comma-separated string of key(s) that make up the foreign key.
rowids: Rowid(s) of child_table where constraint failed (see
https://www.sqlite.org/lang_createtable.html#rowid for more on rowids).
"""
self.child_table = child_table
self.parent_table = parent_table
self.foreign_key = foreign_key
self.rowids = rowids

def __str__(self):
"""Create string representation of ForeignKeyError object."""
return (
f"Foreign key error for table: {self.child_table} -- {self.parent_table} "
f"{self.foreign_key} -- on rows {self.rowids}\n"
)

def __eq__(self, other):
"""Compare a ForeignKeyError with another object."""
if isinstance(other, ForeignKeyError):
return (
(self.child_table == other.child_table)
and (self.parent_table == other.parent_table)
and (self.foreign_key == other.foreign_key)
and (self.rowids == other.rowids)
)
return False


class ForeignKeyErrors(sa.exc.SQLAlchemyError): # noqa: N818
"""Raised when data in a database violate multiple foreign key constraints."""

def __init__(self, fk_errors: list[ForeignKeyError]):
"""Initialize a new ForeignKeyErrors object."""
self.fk_errors = fk_errors

def __str__(self):
"""Create string representation of ForeignKeyErrors object."""
fk_errors = [str(x) for x in self.fk_errors]
return "\n".join(fk_errors)

def __iter__(self):
"""Iterate over the fk errors."""
return iter(self.fk_errors)

def __getitem__(self, idx):
"""Index the fk errors."""
return self.fk_errors[idx]


def _get_fk_list(engine: sa.Engine, table: str) -> pd.DataFrame:
"""Retrieve a dataframe of foreign keys for a table.

Description from the SQLite Docs: 'This pragma returns one row for each foreign
key constraint created by a REFERENCES clause in the CREATE TABLE statement of
table "table-name".'

The PRAGMA returns one row for each field in a foreign key constraint. This
method collapses foreign keys with multiple fields into one record for
readability.
"""
with engine.begin() as con:
table_fks = pd.read_sql_query(f"PRAGMA foreign_key_list({table});", con)

# Foreign keys with multiple fields are reported in separate records.
# Combine the multiple fields into one string for readability.
# Drop duplicates so we have one FK for each table and foreign key id
table_fks["fk"] = table_fks.groupby("table")["to"].transform(
lambda field: "(" + ", ".join(field) + ")"
)
table_fks = table_fks[["id", "table", "fk"]].drop_duplicates()

# Rename the fields so we can easily merge with the foreign key errors.
table_fks = table_fks.rename(columns={"id": "fkid", "table": "parent"})
table_fks["table"] = table
return table_fks


def check_foreign_keys(engine: sa.Engine):
Member Author

Pulled the foreign key check logic out of the io-manager and put it here directly, which seemed as good a place as anywhere to put it.

Collaborator

Yeah, that was my original idea as well. I do not like the name clash, in that you end up with pudl.etl.check_foreign_keys.check_foreign_keys().

In the end, I moved this to helpers.py. That said, there doesn't really seem to be a good place in the codebase for miscellaneous stuff, and one might be useful, e.g. pudl.misc, where you can have an assortment of small modules that don't naturally fit elsewhere. Putting this into check_foreign_keys.py is a bit awkward IMO, because of the naming as well as the fact that this module is a CLI, and I think that mixing CLI/binary capabilities and library capabilities (something you'd be importing elsewhere) seems a little dirty.

This is just a thought and I would not block this PR for this, as this can be easily refactored in the future.

Member Author

I fully agree, and had some of the same thoughts, but I checked and right now check_foreign_keys is only being called from the CLI and in testing, so I decided it would be OK to leave it here for now. Maybe it would make sense to change the function name to _check_foreign_keys to make it clear it's not meant to be public library code right now?

Part of the reason I put it here is that I think there's a much bigger conversation about code organization that I didn't want to tackle here (we should probably try to reignite the GitHub discussion on code org/structure).

Member

This is fine for now. I think I added check_foreign_keys to the SQLiteIOManager class because I imagined we might have different methods for checking referential integrity for each IO Manager. For example, if we decided to drop SQLite and rely on a combination of Parquet files and DuckDB, we'd need to create a new method of enforcing referential integrity.

Member

Ahh, that makes sense.

Member

There's also some actual data transformation creeping into the etl subpackage, which I thought was just supposed to be laying out the Dagster assets / config stuff (see glue_assets.py).


"""Check foreign key relationships in the database.

The order in which assets are loaded into the database will not satisfy foreign
key constraints, so we can't enable foreign key constraints. However, we can
check for foreign key failures once all of the data has been loaded into
the database using the `foreign_key_check` and `foreign_key_list` PRAGMAs.

You can learn more about the PRAGMAs in the `SQLite docs
<https://www.sqlite.org/pragma.html#pragma_foreign_key_check>`__.

Raises:
ForeignKeyErrors: if data in the database violate foreign key constraints.
"""
logger.info(f"Running foreign key check on {engine.url} database.")
with engine.begin() as con:
fk_errors = pd.read_sql_query("PRAGMA foreign_key_check;", con)

if not fk_errors.empty:
# Merge in the actual FK descriptions
tables_with_fk_errors = fk_errors.table.unique().tolist()
table_foreign_keys = pd.concat(
[_get_fk_list(engine, table) for table in tables_with_fk_errors]
)

fk_errors_with_keys = fk_errors.merge(
table_foreign_keys,
how="left",
on=["parent", "fkid", "table"],
validate="m:1",
)

errors = []
# Collect one ForeignKeyError per violated constraint, then raise them together
for (
table_name,
parent_name,
parent_fk,
), parent_fk_df in fk_errors_with_keys.groupby(["table", "parent", "fk"]):
errors.append(
ForeignKeyError(
child_table=table_name,
parent_table=parent_name,
foreign_key=parent_fk,
rowids=parent_fk_df["rowid"].values,
)
)
raise ForeignKeyErrors(errors)

logger.info("Success! No foreign key constraint errors found.")


if __name__ == "__main__":
sys.exit(pudl_check_fks())
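
To make the two PRAGMAs used above concrete, here is a minimal, standard-library-only sketch of the same idea: run `foreign_key_check` after loading, then use `foreign_key_list` to turn each `fkid` into a readable column list. It is not the PUDL implementation (which uses pandas and SQLAlchemy); the `plants` and `generators` tables and the `find_fk_violations` helper are hypothetical names invented for this example.

```python
import sqlite3
from collections import defaultdict


def find_fk_violations(
    con: sqlite3.Connection,
) -> list[tuple[str, str, str, list[int]]]:
    """Return (child_table, parent_table, fk_columns, rowids) per violated key."""
    # PRAGMA foreign_key_check yields one row per violating child row:
    # (table, rowid, parent, fkid).
    violations = defaultdict(list)
    for table, rowid, parent, fkid in con.execute("PRAGMA foreign_key_check;"):
        violations[(table, parent, fkid)].append(rowid)

    results = []
    for (table, parent, fkid), rowids in sorted(violations.items()):
        # PRAGMA foreign_key_list yields one row per *column* of each key:
        # (id, seq, table, from, to, on_update, on_delete, match).
        fk_rows = con.execute(f"PRAGMA foreign_key_list({table});").fetchall()
        columns = [
            row[4] for row in sorted(fk_rows, key=lambda r: r[1]) if row[0] == fkid
        ]
        results.append((table, parent, "(" + ", ".join(columns) + ")", rowids))
    return results


con = sqlite3.connect(":memory:")
con.executescript(
    """
    -- Enforcement stays off while loading, as in the ETL.
    PRAGMA foreign_keys = OFF;
    CREATE TABLE plants (
        plant_id INTEGER,
        report_year INTEGER,
        PRIMARY KEY (plant_id, report_year)
    );
    CREATE TABLE generators (
        generator_id TEXT PRIMARY KEY,
        plant_id INTEGER,
        report_year INTEGER,
        FOREIGN KEY (plant_id, report_year)
            REFERENCES plants (plant_id, report_year)
    );
    INSERT INTO plants VALUES (1, 2020);
    INSERT INTO generators VALUES ('a', 1, 2020);
    INSERT INTO generators VALUES ('b', 2, 2020);  -- no matching plant
    """
)
fk_violations = find_fk_violations(con)
print(fk_violations)
```

The sketch groups the column rows by the key's `id`, so two different foreign keys pointing at the same parent table are reported separately.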
3 changes: 2 additions & 1 deletion src/pudl/etl/cli.py
@@ -88,6 +88,7 @@ def get_pudl_etl_job():
@click.option(
"--gcs-cache-path",
type=str,
default="",
help=(
"Load cached inputs from Google Cloud Storage if possible. This is usually "
"much faster and more reliable than downloading from Zenodo directly. The "
@@ -128,7 +129,7 @@ def pudl_etl(

dataset_settings_config = etl_settings.datasets.model_dump()
process_epacems = True
if etl_settings.datasets.epacems is None:
if etl_settings.datasets.epacems is None or etl_settings.datasets.epacems.disabled:
process_epacems = False
# Dagster config expects values for the epacems settings even though
# the CEMS assets will not be executed. Fill in the config dictionary
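
The changed condition above skips EPA CEMS processing both when the settings are absent and when they are present but flagged as disabled. A small sketch of that guard follows, assuming a hypothetical `EpaCemsSettings` stand-in with only a `disabled` field (the real settings model has more fields and is not shown in this diff).

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class EpaCemsSettings:
    """Hypothetical stand-in for the real EPA CEMS settings model."""

    disabled: bool = False


def should_process_epacems(epacems: Optional[EpaCemsSettings]) -> bool:
    # `or` short-circuits, so `.disabled` is only read when settings exist.
    if epacems is None or epacems.disabled:
        return False
    return True


results = [
    should_process_epacems(None),
    should_process_epacems(EpaCemsSettings(disabled=True)),
    should_process_epacems(EpaCemsSettings()),
]
print(results)
```

Checking `is None` first is what keeps the attribute access safe; reversing the two operands would raise `AttributeError` when no settings are given.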
2 changes: 1 addition & 1 deletion src/pudl/etl/eia_bulk_elec_assets.py
@@ -7,7 +7,7 @@


@asset(
io_manager_key="pudl_sqlite_io_manager",
io_manager_key="pudl_io_manager",
required_resource_keys={"datastore"},
)
def core_eia__yearly_fuel_receipts_costs_aggs(context):