Dagster asset etl #2104
Conversation
…inal tables [no ci] This version of the EIA ETL uses the fs_io_manager for the interim tables and the SQLiteIOManager for the tables that are in our metadata. This way we don't have to worry about adding interim table metadata right now. This version produces a sqlite constraint error when adding the ownership_eia860.owner_zip_code field. All of the values are 5 digits so I'm not sure what's causing the error.
For some reason moving the dtype conversion to the transform function fixed the owner_zip_code constraint failure.
…ine attribute [no ci]
…IO manager [no ci]
Background
Here is my initial conversion of the EIA, static, EIA API and glue ETLs. Tables we want to persist are now assets. For example, the raw and pre-harvested and final EIA tables are now assets. The raw and pre-harvested tables are not saved to the database because I haven't created metadata for them yet. Tables we want to persist in the database need metadata so we can properly maintain column types as they move in and out of the database.
To test this branch out you'll need to recreate your environment and set two environment variables. Set the PUDL_CACHE env var to your .../pudl-work/data directory. Create a new directory for ETL outputs and set the PUDL_OUTPUT env var to its path.
To start dagit, run:
dagit -f src/pudl/repository.py
To run the fast ETL, click on the etl_fast job in the pudl dagster repository. Then click "Materialize All". The final database will be at "$PUDL_OUTPUT/sqlite/pudl.sqlite". You can rerun subsets of the ETL by clicking on the assets you want to recreate and clicking "Materialize Selected".
I left a handful of # TODO (bendnorman): comments that might provide some additional context or future improvements.
Discussion topics
- Clobbering the database isn't necessary when you want to recreate a table, because the contents of the table are deleted before the new data is loaded into the database. However, you'll need to manually delete the database when you make a metadata change, because the db metadata is created when the sqlite file is created. I think we have two options: 1) just make it clear in documentation that you need to delete the sqlite file when you change the metadata, or 2) start using database migrations with something like alembic. Database migrations allow you to version control database schemas and apply changes incrementally.
- For this iteration, I tried to keep the package structure mostly the same so the PR diff doesn't get crazy. I could imagine organizing the package differently in the future so it's clearer how all of the assets and modules relate to one another.
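On the migrations point in the discussion topics above: here is a toy, dependency-free sketch of what incremental schema versioning buys us. Alembic automates this bookkeeping; this sketch just uses SQLite's user_version pragma, and the table/column names are made up for illustration.

```python
import sqlite3

# Ordered list of schema changes. Alembic would store these as versioned
# migration scripts; here they're just SQL strings. Names are hypothetical.
MIGRATIONS = [
    "CREATE TABLE ownership_eia860 (owner_zip_code TEXT)",
    "ALTER TABLE ownership_eia860 ADD COLUMN owner_state TEXT",
]


def migrate(conn: sqlite3.Connection) -> int:
    """Apply any migrations newer than the db's recorded version.

    Re-running is a no-op, so a metadata change never requires deleting
    the sqlite file by hand.
    """
    current = conn.execute("PRAGMA user_version").fetchone()[0]
    for version, sql in enumerate(MIGRATIONS[current:], start=current + 1):
        conn.execute(sql)
        conn.execute(f"PRAGMA user_version = {version}")
    conn.commit()
    return conn.execute("PRAGMA user_version").fetchone()[0]
```

Alembic layers revision hashes, upgrade/downgrade scripts, and autogeneration from the SQLAlchemy metadata on top of this same idea.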
Next steps
- Convert the FERC ETL to use assets.
- Remove deprecated code like most of the functions in pudl.etl and the pudl_settings object.
- Start to convert the test suite to accommodate the dagster concepts.
setup.py
Outdated
"coloredlogs>=14.0,<15.1",  # Dagster requires 14.0
"catalystcoop.ferc_xbrl_extractor @ git+https://github.com/catalyst-cooperative/ferc-xbrl-extractor.git@dev",
I relaxed the coloredlogs version range for the ferc_xbrl_extractor package because dagster requires 14.0.
This change is on main now, so you can use version 0.8.0 for ferc_xbrl_extractor if you want.
src/pudl/etl.py
Outdated
@asset(
    io_manager_key="pudl_sqlite_io_manager",
    required_resource_keys={"datastore"},
    group_name="eia_api",
)
def fuel_receipts_costs_aggs_eia(context):
This could be a multi_asset if we are expecting to extract additional tables from the EIA API.
If this was defined as a multi_asset instead of an asset, does that mean that all the assets in this eia_api group would have to be materialized together, and you wouldn't be able to just get the fuel_receipts_costs_aggs_eia table? Although I guess these API tables are independent in the DAG, so it seems like you are able to materialize them individually?
Yes, if this was defined as a multi_asset, all of the assets in the eia_api group would have to be materialized together.
# if we're not ingesting eia, exclude eia only tables
if not eia:
    glue_dfs = {name: df for (name, df) in glue_dfs.items() if "_eia" not in name}
# if we're not ingesting ferc, exclude ferc1 only tables
if not ferc1:
    glue_dfs = {name: df for (name, df) in glue_dfs.items() if "_ferc1" not in name}
Removed this logic because dagster expects a multi_asset to return the same set of tables each run.
src/pudl/io_managers.py
Outdated
"pudl_output_path": Field(
    str,
    description="Path of directory to store the database in.",
    default_value=os.environ.get("PUDL_OUTPUT"),
I'm thinking we can replace the pudl_settings object by letting users specify output and input directories as environment vars.
I agree that this seems more approachable to users than the pudl_settings object. Agree an error here if those vars aren't set would be nice, as I tried to import pudl without setting them and got a DagsterInvalidConfigError.
I resolved this issue by creating an EnvVar config type (see previous commit). However, I reverted back to using the pudl_settings object because PudlTabl, our examples, and our documentation expect it. I think it makes sense to deprecate the pudl_settings object once all of PUDL has been converted to use dagster.
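The fail-fast behavior discussed here can be sketched in plain Python. This is just the idea behind the EnvVar config type (the function name is illustrative, not the actual PUDL API): surface a readable error at import/config time instead of a DagsterInvalidConfigError when PUDL_INPUT / PUDL_OUTPUT are unset.

```python
import os


def get_required_env(name: str) -> str:
    """Read an environment variable, failing fast with a clear message."""
    value = os.environ.get(name)
    if value is None:
        raise RuntimeError(
            f"The {name} environment variable must be set. "
            "See the PUDL setup docs for details."
        )
    return value
```

Usage would then be something like `pudl_output = get_required_env("PUDL_OUTPUT")` wherever the output path is needed.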
src/pudl/settings.py
Outdated
# We do not want to configure tables so remove tables from the dict
d.pop("tables", None)
Dagster expects assets to produce the same tables each time, so we don't want users selecting subsets of tables using a config file. If folks want to create or recreate subsets of tables, they should use the UI.
We could also remove the table argument from the settings objects so we don't have to remove them from this dictionary.
Seems like the tables argument is used in a few different contexts in the settings object but maybe can become obsolete? There could also be a message attached to this if/when the tables key is removed from the dictionary that prompts peeps to use the UI
I think the tables argument can become obsolete by the end of this refactor. If not, I like the idea of adding a warning or info message.
I think using the UI to select subsets of tables will generally simplify things substantially
Wow @bendnorman super cool! This will be a huge improvement in all of our lives. I had fun playing around with the UI and everything worked pretty well for me. I left a few comments in response to your comments. Most of your TODOs felt pretty on point with the feedback I'd have for you just from my initial poking around. In particular, I think allowing for loading subsets of the glue tables (and some of the other multi assets) would be worthwhile. It doesn't take very long to create all the glue tables, but I do think it would be nice to load subsets. Alternatively, separating them out into different assets would also make the DAG more readable in the UI and makes more sense with the actual dependencies.
I think that moving towards database migrations seems like a nice thing to have. Maybe doesn't need to happen right alongside the Dagster refactor, but generally having schema changes more explicitly version controlled would probably be really nice for any PUDL users.
Looks really good! I just have a few questions here, but happy to continue to give feedback as the design moves along. So far though, I think this all makes sense and will be really nice once everything is converted over!
src/pudl/etl.py
Outdated
    Returns:
        A tuple of extracted EIA dataframes.
    """
    eia_settings = context.resources.dataset_settings.eia
Not having to directly pass around settings and resources like the datastore will be super handy
"""Dagster IO Managers.""" |
Is this something that can/should be generalized and pulled out of PUDL? It seems like a SQLiteIOManager could be a pretty general tool that other Dagster users might want to use.
I'd love to create a community SQLiteIOManager eventually. Maybe once PUDL is up and running with dagster we can make a contribution to the dagster codebase.
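For a sense of how general the idea is: the sqlite plumbing at the heart of such an IO manager can be sketched without any dagster dependency. The real thing would subclass dagster's IOManager, derive the table name from the asset key, and enforce the PUDL metadata schemas, but the core read/write logic is roughly this (class and method names here are illustrative):

```python
import sqlite3

import pandas as pd


class SQLiteTableStore:
    """Core read/write logic a generic SQLiteIOManager could wrap."""

    def __init__(self, db_path: str):
        self.db_path = db_path

    def handle_output(self, table_name: str, df: pd.DataFrame) -> None:
        # Replace the table's contents, matching the delete-then-reload
        # behavior discussed earlier in this PR.
        with sqlite3.connect(self.db_path) as conn:
            df.to_sql(table_name, conn, if_exists="replace", index=False)

    def load_input(self, table_name: str) -> pd.DataFrame:
        with sqlite3.connect(self.db_path) as conn:
            return pd.read_sql_query(f"SELECT * FROM {table_name}", conn)
```

The PUDL version adds the important part a generic tool would also want: applying a declared schema so column types survive the round trip through the database.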
I decided to create a dynamic graph that produces the hourly_emissions_epacems asset. I created two IO Managers to support producing a single combined parquet file and a collection of partitioned parquet files. Using pd.concat() to combine the partitions is problematic because it might blow up memory trying to concat >5GB of dataframes together. Will remove this step and have an IO Manager write each partition to the combined parquet file in parallel.
I honestly couldn't get the partitioned assets to work. It took almost a minute to launch subprocesses for each partition, and duplicate partitions were being launched. Users would also have to deal with the dagster daemon, and they couldn't configure resources for partition runs unless a configured job is created. I'm committing these changes in case we want to refer back to them.
…le without an IO Manager [no ci] This strategy is simpler than the other options laid out in #2084. The downside is that we can't leverage IO Managers to write the data. Not a huge con compared to the other options.
* Made sure that modules and asset group names use our existing data source short codes wherever possible (eia860, eia923, ferc1, eia_bulk_elec, epacems, etc.)
* Split "output_assets" into denormalized_assets and analysis_assets. These are still placeholders, but I think we'll want to differentiate between those categories since what's inside them is pretty different (simple combinations of other tables vs. complex, novel derived values).
* Removed the _assets suffix from all the asset groups. I think it will be clear from the context that these identifiers are being used in that they pertain to asset groups. I could be convinced otherwise though.
* Split EIA assets into "raw", "clean", and "norm" (normalized) groups. Split the pre-harvesting EIA groups by original data source (eia860 vs. eia923).
* I think there's a bunch of unnecessary tangling of dependencies inside the eia_transform function that we should untangle, but not right now.
* I tried and failed (again) to find a way to grab all the asset keys for assets by asset group (without running into circular import issues) and so ended up enumerating the individual EIA-860 and EIA-923 input assets for the eia_transform function. This let me split the raw & clean EIA assets out into their own groups and seemed more readable. It also removed some top-level definitions that would have been globally accessible, and moved all of the asset group definitions into the pudl.etl module.
* I noticed that the censusdp1tract database doesn't seem to show up as a resource or an asset anywhere. Should we add that to keep things uniform? It only has 3-4 tables, so maybe each of them can be its own asset? Or should it be analogous to the FERC 1 DBs?
…anager-tests Throw error when db is missing table schema
Standardize asset group and asset module names
Rename check_pudl_fks to pudl_check_fks to align w/ other CLI names
…onda Pin grpcio==1.46.1 (arm64 compatible version available on conda-forge)
…ebooks Convert devtools notebooks to use dagster concepts
Check out this pull request on ReviewNB to see visual diffs & provide feedback on Jupyter Notebooks.
* parameterize `pudl.workspace.setup.get_defaults` so we can override behavior
* test settings overriding behavior
* override settings in centralized locations for test instead of ad-hoc, everywhere

Co-authored-by: zschira <zach.schira@catalyst.coop>
* Use PUDL_INPUT/PUDL_OUTPUT environment variables in Docker
* Make IO directories *before* chowning so that the catalyst user can still use them later.
* Format pytest args so they don't mis-parse
* Fix side effects of pudl_environment test; add tests for EnvVar configuration; remove obsolete database_test.py