Bring the Census DP1 to SQLite ETL into dagster #2621

Merged
merged 50 commits into from Jun 21, 2023
da77770
WIP: stashing changes
e-belfer Apr 26, 2023
2c5af18
Add non-spatial dataframe to dagster
e-belfer Apr 27, 2023
f4395fc
Merge branch 'dev' into 714-861-dagster
e-belfer Apr 27, 2023
f0060a7
Add tables into PUDL metadata
e-belfer Apr 27, 2023
6b2c432
Merge branch 'dev' into 714-861-dagster
e-belfer May 15, 2023
ea3ee34
Fix metadata and fk error for compiled_geom tables
e-belfer May 15, 2023
ecb5826
Revert spatial test to match new geopandas output
e-belfer May 15, 2023
efa2cd3
Add 714 outputs to default io mgr
e-belfer May 16, 2023
e3672fb
Add georef resp and counties, summarized_demand_ferc714
e-belfer May 29, 2023
b74dfe0
Merge branch 'dev' into 714-861-dagster
zaneselvans May 30, 2023
06fe62e
Deduplicate particulate_control_id_eia
zaneselvans May 30, 2023
539dbb0
Add FERC714 tables to metadata, confirm all tables identical
e-belfer May 30, 2023
1e2136b
Merge branch '714-861-dagster' of https://github.com/catalyst-coopera…
e-belfer May 30, 2023
70f6fa0
Fix migrations, start tests, add 714 to PUDL
e-belfer May 30, 2023
6610ace
Merge branch 'dev' into 714-861-dagster
e-belfer May 31, 2023
706c80a
Fix FK errors
e-belfer May 31, 2023
442305c
Add validation and integration tests, add state_demand output into PU…
e-belfer May 31, 2023
c90dff1
Functional Census dagster integration, first pass
e-belfer Jun 2, 2023
48dbf82
Updated working Census ETL that pickles all output layers
e-belfer Jun 6, 2023
ecdfb1f
Merge branch 'dev' into census_dagster
e-belfer Jun 6, 2023
2bce0ef
Fix read-in of census layers, multi-asset -> asset factory
e-belfer Jun 7, 2023
a51c6f1
Merge branch 'census_dagster' of https://github.com/catalyst-cooperat…
e-belfer Jun 7, 2023
49b771d
Merge branch 'dev' into census_dagster
e-belfer Jun 7, 2023
e265f10
Remove census from CLI test, fix GH runner
e-belfer Jun 7, 2023
e87844e
Merge branch 'dev' into census_dagster
e-belfer Jun 8, 2023
6dc3499
Materialize census outputs in FERC714 tests
e-belfer Jun 8, 2023
82cde06
Merge branch 'dev' into census_dagster
e-belfer Jun 8, 2023
ff2e979
Merge branch 'census_dagster' into 714-861-dagster
e-belfer Jun 8, 2023
ff117c1
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jun 8, 2023
5a5bd81
update migrations and census read-in
e-belfer Jun 8, 2023
6fb63ef
Merge branch '714-861-dagster' of https://github.com/catalyst-coopera…
e-belfer Jun 8, 2023
bcfc4ca
Update state_demand.py
e-belfer Jun 8, 2023
e78951b
Clean up testing environment from WIP census read-ins
e-belfer Jun 8, 2023
8130143
Merge branch '714-861-dagster' of https://github.com/catalyst-coopera…
e-belfer Jun 8, 2023
581a195
Merge branch 'dev' into census_dagster
e-belfer Jun 10, 2023
7d185d5
Merge branch 'census_dagster' into 714-861-dagster
e-belfer Jun 10, 2023
4168a6d
Remove accidental field change, add release notes
e-belfer Jun 12, 2023
03355e9
Remove outdated args in function
e-belfer Jun 12, 2023
df1edc0
Address first round of PR comments
e-belfer Jun 16, 2023
68f9c59
Prune intermediate assets
e-belfer Jun 20, 2023
f8cc104
Add type hints and clean up docs
e-belfer Jun 20, 2023
00df2d9
Rename asset groups, expand docstring
e-belfer Jun 21, 2023
423dd39
Merge pull request #2550 from catalyst-cooperative/714-861-dagster
e-belfer Jun 21, 2023
55c0564
Merge branch 'dev' into census_dagster
e-belfer Jun 21, 2023
ddd289d
Merge branch 'dev' into census_dagster
e-belfer Jun 21, 2023
5b3aedf
Update alembic migrations
e-belfer Jun 21, 2023
929518b
Merge branch 'dev' into census_dagster
e-belfer Jun 21, 2023
acf6a4f
Merge branch 'census_dagster' of https://github.com/catalyst-cooperat…
e-belfer Jun 21, 2023
eb9bab6
Update release notes
e-belfer Jun 21, 2023
9987ca4
Update release notes to more accurately reflect revisions
e-belfer Jun 21, 2023
29 changes: 29 additions & 0 deletions src/pudl/census_to_sqlite/__init__.py
@@ -0,0 +1,29 @@
"""Dagster definitions for the Census DP1 to SQLite process."""

from dagster import Definitions, graph

import pudl
from pudl.convert.censusdp1tract_to_sqlite import censusdp1tract_to_sqlite
from pudl.resources import datastore

logger = pudl.logging_helpers.get_logger(__name__)


@graph
def census_to_sqlite():
    """Clone the Census DP1 database into SQLite."""
    censusdp1tract_to_sqlite()


default_resources_defs = {
    "datastore": datastore,
}

census_to_sqlite = census_to_sqlite.to_job(
    resource_defs=default_resources_defs,
    name="census_to_sqlite",
)

defs: Definitions = Definitions(jobs=[census_to_sqlite])
"""A collection of dagster assets, resources, IO managers, and jobs for the Census
DP1 to SQLite ETL."""
147 changes: 147 additions & 0 deletions src/pudl/census_to_sqlite/cli.py
@@ -0,0 +1,147 @@
"""A script for cloning the Census DP1 database into SQLite.

This script generates a SQLite database that is a clone/mirror of the original
Census DP1 database. We use this cloned database as the starting point for the
main PUDL ETL process. The underlying work in the script is being done in
:mod:`pudl.convert.censusdp1tract_to_sqlite`.
"""
import argparse
import sys
from collections.abc import Callable

from dagster import (
    DagsterInstance,
    JobDefinition,
    build_reconstructable_job,
    execute_job,
)

import pudl
from pudl import census_to_sqlite
from pudl.settings import EtlSettings

# Create a logger to output any messages we might have...
logger = pudl.logging_helpers.get_logger(__name__)


def parse_command_line(argv):
    """Parse command line arguments. See the -h option.

    Args:
        argv (list[str]): Command line arguments, including caller filename.

    Returns:
        argparse.Namespace: Parsed command line arguments and their values.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "settings_file", type=str, default="", help="path to YAML settings file."
    )
    parser.add_argument(
        "--logfile",
        default=None,
        type=str,
        help="If specified, write logs to this file.",
    )
    parser.add_argument(
        "-c",
        "--clobber",
        action="store_true",
        help="""Clobber existing sqlite database if it exists. If clobber is
        not included but the sqlite database already exists the build will
        fail.""",
        default=False,
    )
    parser.add_argument(
        "--sandbox",
        action="store_true",
        default=False,
        help="Use the Zenodo sandbox rather than production",
    )
    parser.add_argument(
        "--gcs-cache-path",
        type=str,
        help="Load datastore resources from Google Cloud Storage. Should be gs://bucket[/path_prefix]",
    )
    parser.add_argument(
        "--loglevel",
        help="Set logging level (DEBUG, INFO, WARNING, ERROR, or CRITICAL).",
        default="INFO",
    )
    arguments = parser.parse_args(argv[1:])
    return arguments


def census_to_sqlite_job_factory(
    logfile: str | None = None, loglevel: str = "INFO"
) -> Callable[[], JobDefinition]:
    """Factory for parameterizing a reconstructable census_to_sqlite job.

    Args:
        logfile: Path to a log file for the job's execution.
        loglevel: The log level for the job's execution.

    Returns:
        A function that returns the job definition to be executed.
    """

    def get_census_to_sqlite_job():
        """Module level func for creating a job to be wrapped by reconstructable."""
        return census_to_sqlite.census_to_sqlite.to_job(
            resource_defs=census_to_sqlite.default_resources_defs,
            name="census_to_sqlite_job",
        )

    return get_census_to_sqlite_job


def main():  # noqa: C901
    """Clone the Census database into SQLite."""
    args = parse_command_line(sys.argv)

    # Display logged output from the PUDL package:
    pudl.logging_helpers.configure_root_logger(
        logfile=args.logfile, loglevel=args.loglevel
    )

    etl_settings = EtlSettings.from_yaml(args.settings_file)

    # Set PUDL_INPUT/PUDL_OUTPUT env vars from .pudl.yml if not set already!
    pudl.workspace.setup.get_defaults()

    census_to_sqlite_reconstructable_job = build_reconstructable_job(
        "pudl.census_to_sqlite.cli",
        "census_to_sqlite_job_factory",
        reconstructable_kwargs={"loglevel": args.loglevel, "logfile": args.logfile},
    )

    result = execute_job(
        census_to_sqlite_reconstructable_job,
        instance=DagsterInstance.get(),
        run_config={
            "resources": {
                "census_to_sqlite_settings": {
                    "config": etl_settings.census_to_sqlite_settings.dict()
                },
                "datastore": {
                    "config": {
                        "sandbox": args.sandbox,
                        "gcs_cache_path": args.gcs_cache_path
                        if args.gcs_cache_path
                        else "",
                    },
                },
            },
        },
        raise_on_error=True,
    )

    # Workaround to reliably get the full stack trace
    if not result.success:
        for event in result.all_events:
            if event.event_type_value == "STEP_FAILURE":
                raise Exception(event.event_specific_data.error)


if __name__ == "__main__":
    sys.exit(main())
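
The way `main()` turns parsed flags into the `datastore` portion of the Dagster run config can be sketched with the standard library alone. This is a trimmed-down illustration, not the PR's code: `parse_args`, `build_datastore_run_config`, and the file name `settings.yml` are hypothetical, and only three of the CLI's arguments are reproduced.

```python
import argparse


def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse a reduced set of the flags defined in cli.py (hypothetical helper)."""
    parser = argparse.ArgumentParser(description="Census DP1 to SQLite (sketch).")
    parser.add_argument("settings_file", type=str, help="Path to YAML settings file.")
    parser.add_argument("--sandbox", action="store_true", default=False)
    parser.add_argument("--gcs-cache-path", type=str, default=None)
    return parser.parse_args(argv)


def build_datastore_run_config(args: argparse.Namespace) -> dict:
    """Build only the datastore part of the run config (hypothetical helper)."""
    return {
        "resources": {
            "datastore": {
                "config": {
                    "sandbox": args.sandbox,
                    # Mirrors the `... if args.gcs_cache_path else ""` fallback
                    # in cli.py: an unset flag becomes an empty string.
                    "gcs_cache_path": args.gcs_cache_path or "",
                },
            },
        },
    }


if __name__ == "__main__":
    args = parse_args(["settings.yml", "--sandbox"])
    print(build_datastore_run_config(args)["resources"]["datastore"]["config"])
```

Separating the config construction from `execute_job` makes the flag-to-config mapping testable without a Dagster instance.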
38 changes: 30 additions & 8 deletions src/pudl/convert/censusdp1tract_to_sqlite.py
jdangerx (Member), Jun 7, 2023

nit: I guess we could get rid of the CLI interface here completely? Or refactor it to just wrap around however dagster asset materialize works? 🤷 Maybe for a different PR.

e-belfer (Member, Author), Jun 7, 2023

It looks like we're going to have trouble providing the datastore resource to the CLI while we're still using legacy resources (see here). I've gone ahead and gotten rid of the CLI.

Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔥 🔥 🔥

@@ -21,13 +21,36 @@
 from pathlib import Path
 from tempfile import TemporaryDirectory

+from dagster import Field, op
+
 import pudl
+from pudl.helpers import EnvVar
 from pudl.workspace.datastore import Datastore

 logger = pudl.logging_helpers.get_logger(__name__)


-def censusdp1tract_to_sqlite(pudl_settings=None, year=2010, ds=None, clobber=False):
+@op(
+    config_schema={
+        "pudl_output_path": Field(
+            EnvVar(
+                env_var="PUDL_OUTPUT",
+            ),
+            description="Path of directory to store the database in.",
+            default_value=None,
+        ),
+        "clobber": Field(
+            bool, description="Clobber existing Census database.", default_value=False
+        ),
+        "year": Field(
+            int,
+            description="Year of Census data to extract (currently must be 2010).",
+            default_value=2010,
+        ),
+    },
+    required_resource_keys={"datastore"},
+)
+def censusdp1tract_to_sqlite(context):
     """Use GDAL's ogr2ogr utility to convert the Census DP1 GeoDB to an SQLite DB.

     The Census DP1 GeoDB is read from the datastore, where it is stored as a
@@ -36,13 +59,10 @@ def censusdp1tract_to_sqlite(pudl_settings=None, year=2010, ds=None, clobber=Fal
     resulting SQLite DB file is put in the PUDL output directory alongside the
     ferc1 and pudl SQLite databases.

-    Args:
-        pudl_settings (dict): A PUDL settings dictionary.
-        year (int): Year of Census data to extract (currently must be 2010)
-
     Returns:
         None
     """
+    ds = context.resources.datastore
     if ds is None:
         ds = Datastore()
     # If we're in a conda environment, use the version of ogr2ogr that has been
@@ -60,12 +80,14 @@ def censusdp1tract_to_sqlite(pudl_settings=None, year=2010, ds=None, clobber=Fal
     with TemporaryDirectory() as tmpdir:
         # Use datastore to grab the Census DP1 zipfile
         tmpdir_path = Path(tmpdir)
-        zip_ref = ds.get_zipfile_resource("censusdp1tract", year=year)
+        zip_ref = ds.get_zipfile_resource(
+            "censusdp1tract", year=context.op_config["year"]
+        )
         extract_root = tmpdir_path / Path(zip_ref.filelist[0].filename)
-        out_path = Path(pudl_settings["pudl_out"]) / "censusdp1tract.sqlite"
+        out_path = Path(context.op_config["pudl_output_path"]) / "censusdp1tract.sqlite"

         if out_path.exists():
-            if clobber:
+            if context.op_config["clobber"]:
                 out_path.unlink()
             else:
                 raise SystemExit(
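
The clobber check in the last hunk can be exercised in isolation. The sketch below uses only the standard library. The helper name `prepare_output_path` is hypothetical, and it raises `FileExistsError` instead of the `SystemExit` used in the diff, purely so the failure case is easy to catch in a test.

```python
from pathlib import Path
from tempfile import TemporaryDirectory


def prepare_output_path(output_dir: Path, clobber: bool) -> Path:
    """Return the target DB path, deleting an existing file only if clobber is set.

    Hypothetical helper: the name and the FileExistsError are illustrative choices.
    """
    out_path = output_dir / "censusdp1tract.sqlite"
    if out_path.exists():
        if clobber:
            out_path.unlink()
        else:
            raise FileExistsError(f"{out_path} already exists; set clobber to replace it.")
    return out_path


if __name__ == "__main__":
    with TemporaryDirectory() as tmpdir:
        out_dir = Path(tmpdir)
        target = prepare_output_path(out_dir, clobber=False)  # nothing there yet
        target.touch()  # pretend a previous run wrote the database
        target = prepare_output_path(out_dir, clobber=True)  # removes the stale file
        print(target.exists())
```

In the op itself, the directory and the clobber flag would come from `context.op_config` rather than from function arguments.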
3 changes: 3 additions & 0 deletions src/pudl/etl/__init__.py
@@ -39,6 +39,9 @@
     *load_assets_from_modules([pudl.transform.ferc1], group_name="norm_ferc1"),
     *load_assets_from_modules([pudl.extract.ferc714], group_name="raw_ferc714"),
     *load_assets_from_modules([pudl.transform.ferc714], group_name="clean_ferc714"),
+    *load_assets_from_modules(
+        [pudl.convert.censusdp1tract_to_sqlite], group_name="censusdp1"
+    ),
     *load_assets_from_modules([glue_assets], group_name="glue"),
     *load_assets_from_modules([static_assets], group_name="static"),
     *load_assets_from_modules(