Bring the Census DP1 to SQLite ETL into dagster #2621
Conversation
…tive/pudl into 714-861-dagster
…DL, reorder output organization
I think we want to treat the Census DP1 like we do the …
After actually looking at the code, my response is: it looks like we do still generate the …

Edit: the old note from before I actually looked at the code is below:

Still persisting the data in a SQLite file that we distribute makes sense to me! I would add that we can do this without making a whole new DAG: we could add a new subclass of a … Now that we have a bunch of different instances of "thing that reads/writes from SQLite", though, it might make sense to refactor the code to better fit the use-cases/patterns we've discovered 🤷.
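To make the "thing that reads/writes from SQLite" abstraction concrete, here is a minimal sketch using only the standard library. All class and method names here are hypothetical; the real PUDL implementations are dagster IO managers, so this only illustrates the shape of the interface being discussed:

```python
import os
import sqlite3
import tempfile


class SQLiteTableIO:
    """Hypothetical sketch of a 'thing that reads/writes from SQLite'.

    The real PUDL classes are dagster IO managers; this only shows the
    read/write shape of the abstraction.
    """

    def __init__(self, db_path: str):
        self.db_path = db_path

    def write(self, table: str, columns: list[str], rows: list[tuple]) -> None:
        # Create the table if needed, then bulk-insert the rows.
        placeholders = ", ".join("?" for _ in columns)
        with sqlite3.connect(self.db_path) as con:
            con.execute(f"CREATE TABLE IF NOT EXISTS {table} ({', '.join(columns)})")
            con.executemany(f"INSERT INTO {table} VALUES ({placeholders})", rows)

    def read(self, table: str) -> list[tuple]:
        with sqlite3.connect(self.db_path) as con:
            return con.execute(f"SELECT * FROM {table}").fetchall()


db_path = os.path.join(tempfile.mkdtemp(), "demo.sqlite")
io = SQLiteTableIO(db_path)
io.write("demo_table", ["id", "name"], [(1, "state"), (2, "county")])
print(io.read("demo_table"))  # [(1, 'state'), (2, 'county')]
```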
I'm glad this diff turned out so small, nice work!
A couple questions that I'd like you to respond to, though your answer to both could just be "no" 😅
```diff
@@ -152,7 +175,9 @@ def main():

     pudl_settings["sandbox"] = args.sandbox

-    censusdp1tract_to_sqlite(pudl_settings=pudl_settings, ds=ds, clobber=args.clobber)
+    _ = censusdp1tract_to_sqlite(
```
Did you want to use this assignment somewhere else?
This just returns the path to the saved DB, which we don't need to call anywhere else. The loggers for this function already print the output path, which would be the only possible application I could think of for this variable.
Hmm, at this point could you just avoid the assignment altogether?
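For what it's worth, the two spellings behave identically when the return value is unused; a tiny illustration with a hypothetical stand-in function:

```python
def censusdp1tract_to_sqlite_demo() -> str:
    """Hypothetical stand-in: returns the path to the saved DB, like the real function."""
    return "/fake/path/censusdp1tract.sqlite"


# Explicitly discard the return value...
_ = censusdp1tract_to_sqlite_demo()

# ...or simply don't bind it; both are valid, and the bare call is more common.
censusdp1tract_to_sqlite_demo()
```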
Indeed, I deleted it (and the whole CLI interface)! :)
src/pudl/output/censusdp1tract.py

Outdated

```python
    layer: Literal["state", "county", "tract"], pudl_settings=None, ds=None
) -> gpd.GeoDataFrame:
    """Select one layer from the Census DP1 database.
@multi_asset(
```
My understanding is that multi-assets are hard to parallelize. That's probably not a huge problem here, since there are only 3 assets anyway, but you might want to try making a bunch of separate assets.

Looks like you already have a bit of an asset factory idea going on, based on the name of the function, but this function is returning a list of Outputs as opposed to an Asset. Did you want to try something like the `finished_eia_asset_factory` in `transform/eia.py`?

```python
def finished_eia_asset_factory(
    table_name: str, io_manager_key: str | None = None
) -> AssetsDefinition:
    """An asset factory for finished EIA tables."""
    clean_table_name = "clean_" + table_name

    @asset(
        ins={clean_table_name: AssetIn()},
        name=table_name,
        io_manager_key=io_manager_key,
    )
    def finished_eia_asset(**kwargs) -> pd.DataFrame:
        """Enforce PUDL DB schema on a cleaned EIA dataframe."""
        df = convert_cols_dtypes(kwargs[clean_table_name], data_source="eia")
        res = Package.from_resource_ids().get_resource(table_name)
        return res.enforce_schema(df)

    return finished_eia_asset


finished_eia_assets = [
    finished_eia_asset_factory(table_name, io_manager_key="pudl_sqlite_io_manager")
    for table_name in [
        "boiler_fuel_eia923",
        "coalmine_eia923",
        "fuel_receipts_costs_eia923",
        "generation_eia923",
        "generation_fuel_eia923",
        "generation_fuel_nuclear_eia923",
        "ownership_eia860",
        "emissions_control_equipment_eia860",
        "boiler_emissions_control_equipment_assn_eia860",
        "boiler_cooling_assn_eia860",
        "boiler_stack_flue_assn_eia860",
    ]
]
```
If you follow that pattern it would look like:

```python
def census_asset_factory(
    layer: str,
) -> AssetsDefinition:
    """An asset factory for Census DP1 layers."""

    @asset(
        ins={"censusdp1tract_to_sqlite": AssetIn()},
        name=f"{layer}_censusdp1",
    )
    def census_layer(**kwargs) -> pd.DataFrame:
        """yada yada"""
        dp1_engine = ...

        def get_layer(...):
            ...

        # could be nice to pass in the sql engine as opposed to closing over it
        # while defining the get_layer function
        layer_gdf = get_layer(layer, dp1_engine)  # or, just in-line the body of
        # get_layer since you're not calling it in a loop anymore?
        return layer_gdf

    return census_layer


census_dp1_layers = [
    census_asset_factory(layer)
    for layer in ["state", "county", "tract"]
]
```
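Stripped of the dagster decorators (which are what turn these into real assets), the closure-based factory pattern can be illustrated in plain Python. Everything below is a hypothetical, dagster-free sketch; the `dp1_tables` dict stands in for reading layers out of the DP1 SQLite database:

```python
def census_asset_factory(layer: str):
    """Build a per-layer extraction function (hypothetical, dagster-free sketch)."""

    def census_layer(dp1_tables: dict) -> str:
        # In the real asset this would read the layer out of the DP1 SQLite DB.
        return dp1_tables[f"{layer}_dp1"]

    # Give each generated function a distinct name, like @asset(name=...) does.
    census_layer.__name__ = f"{layer}_censusdp1"
    return census_layer


census_dp1_layers = [
    census_asset_factory(layer) for layer in ["state", "county", "tract"]
]

tables = {"state_dp1": "state rows", "county_dp1": "county rows", "tract_dp1": "tract rows"}
print([fn(tables) for fn in census_dp1_layers])  # ['state rows', 'county rows', 'tract rows']
```

Each call to the factory captures its own `layer` in a closure, which is exactly why the dagster version can register one independent asset per layer.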
It looks like it might be having trouble with some of the Python geospatial stack / Proj in the test environment. This is one of the things we typically install locally using …
@zaneselvans In the long-term we'll want to figure out how to access interim dagster assets for tests, I'm sure. But I don't think this is one of those cases, since this issue gets effectively resolved in #2550 and the issue here is only to do with managing an interim workaround. I propose we merge that PR into this one and then merge both into dev if they pass the tests.

Go for it, if that fixes the issue, great!

Great! Will do after #2550 gets a proper review.
Convert EIA-861 and FERC 714 service territory outputs to Dagster assets
…ive/pudl into census_dagster
Codecov Report

Patch coverage:

Additional details and impacted files

```
@@           Coverage Diff           @@
##             dev   #2621     +/-   ##
=======================================
+ Coverage   87.2%   88.4%    +1.1%
=======================================
  Files         87      87
  Lines      10155   10135      -20
=======================================
+ Hits        8864    8965     +101
+ Misses      1291    1170     -121
```

☔ View full report in Codecov by Sentry.
Addresses issue #2412.

This PR brings `pudl.convert.censusdp1tract_to_sqlite` and `pudl.output.censusdp1` into dagster, producing 3 pickled outputs (1 for each layer of the Census DP1 database that is of interest). As of now, these outputs are not written into the PUDL database. They currently have 100+ columns, and are mostly used for their geometries. We pickle these outputs using the current default IO manager and feed them into downstream outputs that are written into PUDL (e.g. the state demand outputs in #2550). This is what is currently in this PR.

Remaining tasks:

- Convert `pudl.output.censusdpt1tract.py` into dagster assets

PR Checklist

- … (`dev`).