WIP: Initial conversion of the ETL to prefect flow #901
Conversation
1. Added task tagging with dataset/$blah and bundle/$bundle_name.
2. Used the above annotation to create a synthetic dependency between _transform_eia -> _transform_epacems. The latter reads the emitted csv files, which is pretty bad.
3. Inlined the _should_run_$dataset function and fixed the return types of the _etl_$x methods (return None if no tasks are generated).
Use prefect.List once instead of nested prefect.Set tasks. Flatten the list of lists into a unique list of table names inside MetadataBundleMaker.
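The flattening described above can be sketched as a small helper (the function name is hypothetical; MetadataBundleMaker in the PR does something equivalent):

```python
def merge_table_names(list_of_lists):
    """Flatten lists of table names into a sorted, de-duplicated list."""
    return sorted({name for names in list_of_lists for name in names})

# Duplicates across sub-lists collapse into a single entry.
tables = merge_table_names([["plants_eia", "fuel_ferc1"], ["fuel_ferc1"]])
```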
Use one task per (year, state) combo, which allows massive parallelization. The current code may still have bugs w.r.t. extracting table names for datapkg metadata generation. Refactored the eia->epacems dependency into a single task that gets injected into the mapped epacems tasks.
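The (year, state) fan-out can be illustrated with a hypothetical partition generator; in Prefect the resulting list would be fed to `task.map()` so each partition becomes its own task (years and states below are placeholders):

```python
from itertools import product

def epacems_partitions(years, states):
    """One work unit per (year, state) so extraction can run in parallel."""
    return list(product(years, states))

# Each tuple would map to a single prefect task via .map()
parts = epacems_partitions([2019, 2020], ["CA", "TX"])
```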
Passing --use_dask_cluster ${scheduler_addr} enables this behavior.
WIP: Naming of these artifacts still feels somewhat clunky and there doesn't seem to be a good way to control the output path for this. It is also unclear how to swap LocalResult for cloud-based storage, but perhaps this is also not entirely necessary until after we have kubernetes based deployment (if this is needed/useful).
1. The csv dumping method is wrapped in a prefect @task (write_datapackages).
2. Created synthetic dependencies between write_datapackage and _extract_table_names to ensure synchronization (unclear if this is needed).
3. Make sure that we use {df_name: df} dicts as the structure exchanged between ETL tasks.
4. Cleaned up task tagging (use bundle/{bundle_name}/{datapkg_name}).
1. Created the DatasetPipeline abstraction and refactored the existing code to use it for individual task construction.
2. Explicitly linked eia into epacems via the get_table() method and refactored load_plant_utc_offset to read the dataframe directly.
3. Created the merge_dataframe_maps task that ensures we always operate on {df_name: df} dictionaries. This is still WIP and the ETL currently fails.
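A minimal sketch of what a merge_dataframe_maps task might do; rejecting duplicate table names is an assumption here, not something the PR confirms:

```python
def merge_dataframe_maps(*maps):
    """Combine several {df_name: df} dicts into one, refusing collisions."""
    merged = {}
    for m in maps:
        for name, df in m.items():
            if name in merged:
                raise ValueError(f"Duplicate table name: {name!r}")
            merged[name] = df
    return merged

combined = merge_dataframe_maps({"plants": 1}, {"fuel": 2})
```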
This controls whether the ETL will run on the local dask cluster with task parallelization, or use the local executor that runs tasks serially.
Originally, this dtyping was embedded in the etl.py but it makes more sense here.
int64 dtype can't be cast from object dtype directly; it needs to be cast via float: object -> float -> int64. This modification finds all object-dtyped columns that should be converted to int64 and converts them to float first.
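A minimal pandas illustration of the two-step cast (column name and values are made up):

```python
import pandas as pd

# Strings like "111.0" cannot be cast to int64 directly:
# df["utility_id"].astype("int64") raises ValueError.
df = pd.DataFrame({"utility_id": ["111.0", "222.0", "333.0"]})

# Going through float first succeeds.
df["utility_id"] = df["utility_id"].astype(float).astype("int64")
```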
1. Added dtype conversion capability to merge_dataframe_maps (currently unused but may come in handy to simplify things).
2. Used the pudl_task_target_name callback to construct result paths that incorporate datapkg_name (for uniqueness).
3. Configured cache validation for a variety of tasks.
4. Marked eia tasks with datapkg/${datapkg_name} for the purpose of pudl_task_target_name (WIP: needs to be applied everywhere).
Fixed the previously broken logic for object->int64 conversion and improved the debugging information. If dtype conversion fails, dump the offending dataframe to disk and rerun the conversion column by column to identify which column holds the offending data.
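The column-by-column retry strategy could look roughly like this (function name is hypothetical; the PR's version also dumps the dataframe to disk):

```python
import pandas as pd

def convert_or_pinpoint(df: pd.DataFrame, dtypes: dict) -> pd.DataFrame:
    """Try the bulk conversion; on failure, retry column by column so the
    error names the offending column."""
    try:
        return df.astype(dtypes)
    except (ValueError, TypeError):
        for col, dtype in dtypes.items():
            try:
                df[col].astype(dtype)
            except (ValueError, TypeError) as err:
                raise ValueError(
                    f"Column {col!r} cannot be cast to {dtype}: {err}")
        raise  # bulk conversion failed but no single column did
```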
Pretty-print the json-style report from goodtables and trim the offending values to 5 max per error. This should make it easier to quickly identify what has gone wrong.
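A sketch of the trimming, assuming a goodtables-style report dict with a tables -> errors -> values shape (the exact report schema here is an assumption):

```python
import json

def trim_goodtables_report(report: dict, max_values: int = 5) -> str:
    """Pretty-print the report, keeping at most max_values offending
    values per error."""
    for table in report.get("tables", []):
        for error in table.get("errors", []):
            if isinstance(error.get("values"), list):
                error["values"] = error["values"][:max_values]
    return json.dumps(report, indent=2)
```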
Added lots of docstrings and applied some automatic style fixes.
Moved settings validation to extract/ferc1.py, created simple prefect.task wrapper that initializes ferc1_to_sqlite database and wired this to Ferc1Pipeline such that only one ferc1_to_sqlite task is created in a flow. Plumbed some necessary arguments (etl_settings and clobber) through the intermediary layers.
This can be set to ALWAYS|ONCE|NEVER and controls how the ferc1 sqlite database should be initialized.
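The three modes could be modeled like this (enum and helper names are hypothetical; only the ALWAYS|ONCE|NEVER semantics come from the PR):

```python
from enum import Enum

class Ferc1DbInit(Enum):
    ALWAYS = "always"  # rebuild the sqlite DB even if it already exists
    ONCE = "once"      # build it only if it does not exist yet
    NEVER = "never"    # never build; assume it is already there

def should_build(mode: Ferc1DbInit, db_exists: bool) -> bool:
    """Decide whether the ferc1_to_sqlite step needs to run."""
    if mode is Ferc1DbInit.ALWAYS:
        return True
    if mode is Ferc1DbInit.ONCE:
        return not db_exists
    return False
```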
The --upload-to-gcs-bucket cmdline flag kicks off archival and upload of the datapackage files to the bucket. The prefix ${uuid}/${doi} is used for the ETL artifacts.
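A sketch of how the ${uuid}/${doi} prefix might be constructed (bucket name and helper are made up; only the prefix scheme comes from the PR):

```python
import uuid

def artifact_prefix(bucket: str, doi: str) -> str:
    """Datapackage artifacts land under gs://{bucket}/{uuid}/{doi}."""
    return f"gs://{bucket}/{uuid.uuid4()}/{doi}"

prefix = artifact_prefix("my-bucket", "10.5072/zenodo.123456")
```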
Added a simple tool for constructing requirements.txt from setup.py. Restructured docker build instructions to pre-install pudl dependencies before installing the package. This, along with experimental cache mounts, should result in build time speedups. Injecting requirements.txt and installing dependencies *before* the code is added to the container image should also help better utilize the native docker layer caching mechanism.
This triggers on push to master, dev and sprint* branches.
This controls whether the dependency graph for the prefect flow should be shown.
Codecov Report
```
@@           Coverage Diff            @@
##              dev     #901      +/-  ##
==========================================
- Coverage   83.32%   79.31%    -4.00%
==========================================
  Files          62       68        +6
  Lines        6737     7067      +330
==========================================
- Hits         5613     5605        -8
- Misses       1124     1462      +338
```
Continue to review full report at Codecov.
…ctions from etl.py
…c tests to pytest
Ok! I’ve got dev merged into this branch with all the tests passing. I have also cleaned up the branch and tackled some TODOs Jan left. This branch is not complete (see remaining issues) but I think the largest changes have occurred and I would love some feedback. I left some questions in my review.
"""Create an argument parser for the etl script.""" | ||
parser = argparse.ArgumentParser( | ||
description=__doc__, | ||
parents=[pudl.etl.command_line_flags()]) |
Jan split command args into etl.py and cli.py. Should all of them live in cli.py?
Was cli.py initially just a place to set up everything for the ETL, or did it also run the ETL? If it is the latter, then it makes sense to me to just have everything in cli.py.
Hm. Is there any practical reason for having the chunk of args in etl.py? On the surface it seems much cleaner to pull all of the command args over here.
I suspect all the args should probably be read and parsed in the CLI module and handed off. Unless maybe the same parsing logic is getting used by multiple CLIs which are all passing stuff to the ETL module? But in that case we probably need to consider a standalone CLI parsing module that can be re-used, or consolidation of the CLIs / entrypoint scripts.
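For reference, the stdlib pattern being discussed here — a shared flag set reused by several CLIs via argparse's `parents=` mechanism — looks like this (flag names are illustrative):

```python
import argparse

def command_line_flags() -> argparse.ArgumentParser:
    """Shared flags that multiple entrypoint scripts can inherit.
    add_help=False is required for a parser used as a parent."""
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument("--clobber", action="store_true",
                        help="Overwrite existing outputs.")
    return parser

# Each CLI builds its own parser on top of the shared flags.
cli = argparse.ArgumentParser(description="ETL entry point",
                              parents=[command_line_flags()])
args = cli.parse_args(["--clobber"])
```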
```python
pudl_db_path = Path(pudl_settings["sqlite_dir"]) / "pudl.sqlite"
if pudl_db_path.exists() and not commandline_args.clobber:
    raise SystemExit(
        "The PUDL DB already exists, and we don't want to clobber it.\n"
        f"Move {pudl_db_path} aside or set clobber=True and try again."
    )
```
```python
# Configure how we want to obtain raw input data:
ds_kwargs = dict(
    gcs_cache_path=gcs_cache_path,
    sandbox=pudl_settings.get("sandbox", False)
)
if use_local_cache:
    ds_kwargs["local_cache_path"] = Path(pudl_settings["pudl_in"]) / "data"

validated_etl_settings = etl_settings.datasets
# Setup pipeline cache
configure_prefect_context(etl_settings, pudl_settings, commandline_args)
```
Prefect setup is kind of split between cli and etl. I think I’ve been staring at this for too long and would love any ideas on how to make the setup cleaner.
I think the CLI module should probably just be about gathering and parsing the arguments from the user, and then handing them off to other modules that do the real work. Having argument parsing inside modules that aren't exported as scripts seems unusual to me.
```python
shutil.rmtree(cache_root)


def configure_prefect_context(etl_settings, pudl_settings, commandline_args):
```
Prefect.context is a global var store that can be accessed during a flow run. Things like etl_settings, pudl_settings, overwrite_ferc1_db and pipeline_cache_path are all stored in prefect.context. Do y’all have any opinions on what should and should not live in this?
Should this be anything that's invariant across an ETL run, and will need to be accessed by different Tasks (which could be running on different nodes, in the fullness of time)?
```python
plants = pudl.transform.epacems.load_plant_utc_offset()

# Wait to build CEMS until EIA is done if EIA is in the settings file.
# If EIA is not in the settings file, go ahead and build CEMS on its own.
if "eia" in prefect.context.get("datasets", {}).keys():
    dfs_to_sqlite_task = flow.get_tasks(name="dfs_to_sqlite")
    logger.info("Setting EIA as a dependency of CEMS.")
    plants.set_dependencies(upstream_tasks=[dfs_to_sqlite_task])
```
Setting the eia and epacems dependency is a little awkward right now. EpaCemsPipeline has to look into the flow to see if the EIA pipeline is being run.
And if it is being run, it has to wait for it to complete so there's an SQLite DB to read from? And if it's not being run, it just reads from the DB directly?
Would it make sense to integrate SQLite with FSSpecResult class somehow, so we can have an input that's available based on the database existing -- either because the EIA ETL has completed, or because we're not running the EIA ETL?
```python
# TODO(bendnorman): Should upload-to be supported right now?
parser.add_argument(
    "--upload-to",
    type=str,
    default=os.environ.get('PUDL_UPLOAD_TO'),
    help="""A location (local or remote) where the results of the ETL run
    should be uploaded to. This path will be interpreted by fsspec so
    anything supported by that module is a valid destination.
    This should work with GCS and S3 remote destinations.
    Default value for this will be loaded from PUDL_UPLOAD_TO environment
    variable.
    Files will be stored under {upload_to}/{run_id} to avoid conflicts.
    """)
```
Do we want to support upload_to right now?
We'll need this as soon as we start running this remotely right? Otherwise we'll just end up with some files on the local disk that won't be accessible from anywhere else?
That is true. If we aren't running this remotely soon should it be included in this PR? I don't want to add functionality that isn't fully supported.
"""Create an argument parser for the etl script.""" | ||
parser = argparse.ArgumentParser( | ||
description=__doc__, | ||
parents=[pudl.etl.command_line_flags()]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
was cli.py
initially just a place to setup everything for the etl? or did it also run the elt? If it is the latter than it makes sense to me to just have everything in cli.py
"--timestamped-logfile", | ||
default="/tmp/pudl_etl.%F-%H%M%S.log", # nosec | ||
help="""If specified, also log to the timestamped logfile. The value of | ||
this flag is passed to strftime method of datetime.now().""") |
does this default mean that it will always log to the timestamped file?
I think so, yes, but it'll go to a temporary directory and the OS will clean it up automatically. Is it okay to hard-code the /tmp/... path here? Or do we want to ask the OS for a tmpdir? Is this hooked up to the Python logging system later, hopefully? I guess we'll need a different kind of logging output for the cloud use case.
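Asking the OS for its tmpdir instead of hard-coding /tmp could look like this (helper name and timestamp pattern are illustrative; `%Y-%m-%d` is used instead of the platform-dependent `%F`):

```python
import datetime
import tempfile
from pathlib import Path

def timestamped_logfile(pattern: str = "pudl_etl.%Y-%m-%d.%H%M%S.log") -> Path:
    """Build a timestamped logfile path inside the OS-provided tmpdir."""
    name = datetime.datetime.now().strftime(pattern)
    return Path(tempfile.gettempdir()) / name

log_path = timestamped_logfile()
```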
```python
run_id = generate_run_id(args)
prefect.context.pudl_run_id = run_id

logger.warning(
```
Why is this a `warning` instead of `info`?
I think it's just so it's visible even when the LOGLEVEL is lower than INFO.
"""Create an argument parser for the etl script.""" | ||
parser = argparse.ArgumentParser( | ||
description=__doc__, | ||
parents=[pudl.etl.command_line_flags()]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm. Is there any practical reason for having the chunk of args in etl.py
? On the surface it seems much cleaner to pull all of the command args over here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
omg I got through the whole thing. Lots of comments and questions inline and I may bring some comments over from the Prefect Docs and put them in the comments on the PR.
```yaml
command: /pudl/src/release/data-release.sh
# TODO(rousik): create validate service that validates stuff under build/
#   - ${GCP_KEY_PATH}:/tmp/keys/gcp-keyfile.json:ro
  - datapkg:/pudl/outputs/datapkg
```
Is there still datapackage stuff going on over here?
command: ["dask-worker", "tcp://dask-scheduler:8786"] | ||
volumes: | ||
# - ${GCP_KEY_PATH}:/tmp/keys/gcp-keyfile.json:ro | ||
- datapkg:/pudl/outputs/datapkg |
Ditto datapackage question above.
```yaml
volumes:
  pudldata: {}
  datapkg:
```
datapackages!
```yaml
datapkg_bundle_name: pudl-fast
datapkg_bundle_doi: 10.5072/zenodo.123456  # Sandbox DOI... not real.
datapkg_bundle_settings:
```
All the datapkg settings attributes have been obsoleted now haven't they?
```yaml
ferc1_to_sqlite_tables:
  - f1_respondent_id
  - f1_gnrt_plant
  - f1_steam
  - f1_fuel
  - f1_plant_in_srvce
  - f1_hydro
  - f1_pumped_storage
  - f1_purchased_pwr
```
Is this list of FERC 1 tables truncated because it's just for testing purposes?
```python
assert dfc1.get_table_names() == ["first"]
assert dfc2.get_table_names() == ["second"]
assert final_dfc.get_table_names() == ["first", "second"]
```
Is this particular ordering of names guaranteed? Would `["second", "first"]` indicate failure? Could convert both sides to sets.
Yup, `get_table_names` uses `sort`, so it guarantees order (with the intention of making the results stable for easy unit testing).
```python
def test_to_dict(self):
    with pytest.raises(dfc.TableExistsError):
        dfc1.union(dfc1)
```
Is this supposed to be unioning `dfc1` with itself? Or with `dfc2`?
This is a bug. The original code unioned `dfc1` and `dfc2`.
```diff
@@ -193,7 +201,7 @@ def named_empty_df(df_name):
     names = ["a", "b", "c", "d"]
     res = dfc.merge_list(named_empty_df.map(names))
     final = f.run().result[res].result
-    self.assertEqual(["a", "b", "c", "d"], final.get_table_names())
+    assert final.get_table_names() == ["a", "b", "c", "d"]

 def test_fanout_task(self):
```
I'm bewildered by this test. I guess I need to go look at the DFC class. Do you understand what fanout is doing, and the work that Prefect is doing in here?
The fanout task simply splits a DFC into a list of DFCs with a fixed number of tables each.
This could be used in situations like epacems, where a large number of tables come together in a single DFC and you want to split it into smaller chunks that can be processed in parallel.
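A pure-Python sketch of that fanout chunking; the real task operates on a DFC, so a plain dict of named tables stands in here:

```python
def fanout(tables: dict, chunk_size: int) -> list:
    """Split a {name: table} dict into fixed-size chunks for parallel work."""
    names = sorted(tables)  # stable order, mirroring get_table_names()
    return [
        {n: tables[n] for n in names[i:i + chunk_size]}
        for i in range(0, len(names), chunk_size)
    ]

chunks = fanout({"a": 1, "b": 2, "c": 3}, chunk_size=2)
```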
```python
class TestEtlSettings:
    """Test EtlSettings."""

    # TODO(bendnorman): This method works and passes as an individual
    # unit test but fails when run with all unit tests.
```
Weird. Is it some kind of issue with the order in which tests get run?
I suppose you're referring to the dataframe serialization to disk? I do not think there's any canonical/universal way of doing this, and the implementation choice depends on your specific usecase. This format essentially adds a disk serialization feature to the preexisting data exchange format between tasks (collections of named dataframes), which allows you to: 1. distribute the workload across machines; 2. reduce the memory pressure by offloading the intermediate states to disk. That said, this is totally something that can be switched/replaced with a different serialization mechanism if needed/wanted. I would view this as an internal/development format, so we should be using whatever is useful to us w.r.t. debuggability/maintenance and integration with the pipeline internals. In particular, we could use a database but I think it would make sense only if:
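The disk-spilling idea can be sketched with plain pickle files; the serialization format and helper names here are illustrative, not the PR's actual implementation:

```python
import pickle
import tempfile
from pathlib import Path

def spill_to_disk(dfs: dict, cache_dir: Path) -> dict:
    """Write each named dataframe to its own file; return {name: path} so
    downstream tasks can load only the tables they need."""
    cache_dir.mkdir(parents=True, exist_ok=True)
    paths = {}
    for name, df in dfs.items():
        path = cache_dir / f"{name}.pkl"
        path.write_bytes(pickle.dumps(df))
        paths[name] = path
    return paths

def load_from_disk(paths: dict) -> dict:
    """Rehydrate the {name: df} map from the spilled files."""
    return {name: pickle.loads(p.read_bytes()) for name, p in paths.items()}
```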
Can you expand on what you mean by off-label use of prefect? Overall, I do think that it might be valuable to offer a higher-level API where end-users/developers describe how individual tables are constructed or transformed and then let some translation layer turn this into prefect tasks. This might have the benefit of insulating end-users from the lower-level abstractions (prefect tasks) and enforcing some common structure. I do think that the prototype is somewhat leaky, as we are still dealing with a bit of both (DatasetPipeline combined with prefect task-annotated functions).
Can you elaborate on this thought a bit more?
I think it should still be possible to run this locally, so using cloud resources should not be mandatory. However, I do think that this would allow us to run some automated tests/validation on the cloud, e.g. as part of PR validation: a PR could kick off a github action that runs the pipeline and emits the results somewhere so they can be checked for correctness, or compared against the main branch to highlight whether the results have changed (noop validation).
Yeah. I can even imagine a situation where the ETL would rely solely on the cloud storage mirror and another component/pipeline (datastore) would take care of synchronization.
Automatically determining what needs to be rerun is a hard problem (perhaps practically intractable). Trivial changes to libraries or some support code can have unexpected impact on all sorts of things. That said, we have options here:
Closing this as we chose Dagster in #1487 and I want to clean up our open issues a bit. |
This is still very much WIP and is for informative purposes only, so that you can familiarize yourself with the changes that this introduces.
A couple of important features:
Overall, the flow is constructed in `etl.py` using `${Dataset}Pipeline` objects that are responsible for adding relevant tasks to the flow. Many of the extract/transform methods are annotated with `@task` decorators, which turns them into prefect tasks; sometimes simple wrappers are used to aggregate some steps (e.g. epacems). `DataFrameCollection` is used as the default data structure for passing dataframes between prefect tasks. This ensures that data is written to disk and not held in memory.