
Parallelize Dagster processing of EPA CEMS #2472

Merged
zschira merged 18 commits into dev from epacems_partitions on Apr 4, 2023
Conversation

@zschira zschira (Member) commented Mar 30, 2023

Background

epacems is currently a major bottleneck in the ETL and could be easily parallelized. Here I've tried using Dagster's partitioned assets to achieve concurrency, but they aren't behaving quite as expected/hoped. For every partition, Dagster launches a run of the etl_fast/etl_full job, which runs the entire ETL instead of materializing all assets once and then running each CEMS partition individually. This is unsurprisingly blowing up the resource usage on my computer and causing it to crash, even when limiting the max concurrency to 5 runs.
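
For context, the partitioned approach tried here looks roughly like this (a minimal sketch, not the exact code; the static yearly partitions, year range, and asset name are illustrative):

```python
from dagster import StaticPartitionsDefinition, asset

# illustrative year range for EPA CEMS
epacems_partitions = StaticPartitionsDefinition(
    [str(year) for year in range(1995, 2022)]
)

@asset(partitions_def=epacems_partitions)
def hourly_emissions_epacems(context):
    year = int(context.partition_key)
    # extract and transform one year of CEMS data, then write it out
    ...
```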

Possible solutions

Option 1: Try to get partitions to work as desired

It would be ideal if we could just get partitions working the way we want, but I'm not entirely sure it's possible. I'm going to write up a question in Dagster's Slack and see if we can get any help, so we'll see what comes of it.

edit: Just got a response from the Dagster Slack, and they recommended creating a new asset job for the partitioned asset and running it separately. This would allow all of the epacems partitions to be generated concurrently, but they wouldn't run alongside the main ETL, so we'd lose a little concurrency there. For that reason, option 2 might still be preferable, but I'd like to hear others' thoughts.
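
A sketch of what that separate job might look like, reusing the names from the sketch above (define_asset_job and AssetSelection are Dagster APIs; the job name is hypothetical):

```python
from dagster import AssetSelection, define_asset_job

# launched separately from the main ETL job, once per partition
epacems_job = define_asset_job(
    name="epacems_job",
    selection=AssetSelection.assets(hourly_emissions_epacems),
    partitions_def=epacems_partitions,
)
```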

Option 2: Create an asset factory to generate assets for each partition

This seems like a decent solution if we can't get partitions working. We could have the factory generate an asset for each year and let Dagster handle materializing these assets concurrently. That behavior is exactly what we'd want; the downside is that it adds a bunch of assets that create some clutter, but I don't think it would be too bad.
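
A minimal sketch of such a factory, assuming one asset per year (function and asset names are hypothetical):

```python
from dagster import asset

def epacems_asset_factory(year):
    """Create an asset that processes a single year of CEMS data."""

    @asset(name=f"hourly_emissions_epacems_{year}")
    def _cems_year():
        # extract, transform, and write one year of CEMS data
        ...

    return _cems_year

# Dagster can then materialize these assets concurrently
cems_assets = [epacems_asset_factory(year) for year in range(1995, 2022)]
```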

Option 3: Handle concurrency ourselves

We could just use normal Python concurrency tools to handle this ourselves and keep a single asset. I don't think doing our own concurrency is ideal, since performance will likely be best optimized if we let Dagster handle it. The one advantage I see over the asset factory option is that we don't have to create a bunch of assets.
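
Inside a single asset, that might look like this sketch using only the standard library (the helper name and year range are illustrative):

```python
from concurrent.futures import ProcessPoolExecutor

from dagster import asset

def process_cems_year(year):
    # extract, transform, and write one year of CEMS data
    ...

@asset
def hourly_emissions_epacems():
    years = range(1995, 2022)  # illustrative year range
    with ProcessPoolExecutor() as executor:
        # fan the years out across worker processes;
        # consuming the iterator surfaces any worker exceptions
        list(executor.map(process_cems_year, years))
```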

@zschira zschira requested a review from bendnorman March 30, 2023 16:14
@bendnorman bendnorman (Member) commented Mar 30, 2023

Bummer the partitions aren't working out as planned :/

  • I like the asset factory idea, but I don't think we'd be able to parameterize which years to process using the datasets_settings resource. To process specific years we'd have to select specific assets, which would differ from how we parameterize all of the other datasources.
  • Another option I played around with at one point was wrapping a dynamic graph in an asset (see the sketch after this list). That way we would have one epacems asset and could select which years to process using the datasets_settings resource. This is the commit where I experimented with this option. We would need to collect() all of the years, which would blow up memory unless we incorporated dask.
  • I'm concerned about moving CEMS into its own job. We'd end up with an awkward circular dependency, where the CEMS job depends on normalized PUDL tables and output tables depend on the outputs of the CEMS job.
  • I also don't think handling parallel processing in a single epacems asset would be the worst idea.
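
For reference, the dynamic-graph-wrapped-in-an-asset pattern looks roughly like this (a sketch with assumed op names and resource wiring, not the exact commit):

```python
from dagster import DynamicOut, DynamicOutput, graph_asset, op

@op(out=DynamicOut(), required_resource_keys={"dataset_settings"})
def get_cems_years(context):
    # fan out: one dynamic output per year to be processed
    # (the resource attribute path here is hypothetical)
    for year in context.resources.dataset_settings.epacems_years:
        yield DynamicOutput(year, mapping_key=str(year))

@op
def process_cems_year(year):
    # extract/transform a single year and write its Parquet partition
    ...

@op
def consolidate_partitions(partitions):
    # fan in: runs only after every mapped op has finished
    ...

@graph_asset
def hourly_emissions_epacems():
    partitions = get_cems_years().map(process_cems_year)
    return consolidate_partitions(partitions.collect())
```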

@zaneselvans zaneselvans linked an issue Mar 30, 2023 that may be closed by this pull request
@zaneselvans zaneselvans (Member) commented

That seems like really weird behavior from Dagster. I wonder why their idea about how it should work is so different from ours.

@zaneselvans zaneselvans added the epacems, performance, and dagster labels Mar 30, 2023
@zaneselvans zaneselvans added this to the 2023Q1 milestone Mar 30, 2023
@zaneselvans zaneselvans added the parquet label Mar 31, 2023
@zschira zschira (Member, Author) commented Mar 31, 2023

I think Dagster expects partitioned assets to depend only on assets with the same partitions. It sounds like they're working on adding functionality more like what we expect, but for the time being we have to go with another option.

codecov bot commented Mar 31, 2023

Codecov Report

Patch coverage: 100.0% and no project coverage change.

Comparison is base (62cb44a) 86.7% compared to head (25138da) 86.7%.

Additional details and impacted files
@@          Coverage Diff          @@
##             dev   #2472   +/-   ##
=====================================
  Coverage   86.7%   86.7%           
=====================================
  Files         81      81           
  Lines       9490    9504   +14     
=====================================
+ Hits        8233    8247   +14     
  Misses      1257    1257           
Impacted Files Coverage Δ
src/pudl/cli.py 60.0% <ø> (-1.0%) ⬇️
src/pudl/convert/epacems_to_parquet.py 64.7% <ø> (-1.1%) ⬇️
src/pudl/etl/epacems_assets.py 100.0% <100.0%> (ø)



@zaneselvans zaneselvans modified the milestones: 2023Q1, 2023Q2 Apr 3, 2023
@zschira zschira requested a review from zaneselvans April 3, 2023 19:58
@zschira zschira marked this pull request as ready for review April 3, 2023 19:58
@zaneselvans zaneselvans (Member) left a comment

I think the performance on this is totally good enough -- it went from 70 minutes to 14 minutes on my machine, and once it's interleaved with all of the other jobs in the DAG I don't think additional granularity would speed things up much. It kept my 10 CPUs pegged at 100% for 10+ minutes.

I needed to merge in dev to run it locally and I tried to fix another lingering partitioning config that was in the integration tests, so I went ahead and pushed that merge back up to the PR. I hope that's okay!

I took a look at the Dagster docs for the other abstractions you're using here, which are different from how we're managing the other assets, and I wasn't totally clear on how it all works. Would you be willing to talk me through it? Maybe a little more explanation in the epacems_assets.py module-level docstring would be helpful, since it's so different from the straight-up assets we're using everywhere else?

But functionally it seems to work great!

@@ -30,6 +31,23 @@ def ferc_to_sqlite_settings(init_context) -> FercToSqliteSettings:
return FercToSqliteSettings(**init_context.resource_config)


class ParquetWriter(pq.ParquetWriter):
Member

Is it worth preserving the ability to write out a single monolithic file directly, rather than compiling it after the fact from partitioned outputs? Once the data is in Parquet, it's extremely fast to read and write and it seems simpler to avoid the issue of concurrency altogether. We're hacking around it with SQLite because we don't have a choice unless we want to switch to another format for the DB entirely, but Parquet is designed to be sliced and diced.

Member Author

Right now I don't think there's any way to directly write to the monolithic output. I was using this class for testing, but it's removed now.
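
(For reference, compiling a monolithic file from the partitioned outputs after the fact could be a short post-processing step; a sketch assuming pyarrow and illustrative paths:)

```python
import pyarrow.dataset as ds
import pyarrow.parquet as pq

# read every per-year partition file and rewrite them as one file
table = ds.dataset("parquet/epacems/", format="parquet").to_table()
pq.write_table(table, "parquet/hourly_emissions_epacems.parquet")
```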

Member

Ack! This was a "pending" comment that never got submitted from 4 days ago when I first saw you push to the branch.

plants_entity_eia,
)
)
return consolidate_partitions(partitions.collect())
Member

Can you say more about the meaning of the return value here? consolidate_partitions() doesn't return anything itself, and I'm not immediately understanding from the Dagster docs what's going on.

Member Author

Basically, a graph_asset like this has to return the output of an op for Dagster to be able to figure out how to create the asset. In most cases the op would actually return something that would then get passed to an io_manager. This case is kind of confusing, though, because we're writing directly to the Parquet outputs inside the op, so there's nothing meaningful to return.

@zaneselvans zaneselvans changed the title Convert cems to use dagster partitions Parallelize Dagster processing of EPA CEMS Apr 4, 2023
@zaneselvans zaneselvans (Member) left a comment

Looks like there's a docstring formatting issue that's breaking the docs build, but other than that this looks good to me.

@zschira zschira merged commit f5cc15f into dev Apr 4, 2023
@zschira zschira deleted the epacems_partitions branch April 4, 2023 19:23
@dstansby dstansby mentioned this pull request Jun 8, 2023