Parallelize Dagster processing of EPA CEMS #2472

Merged · 18 commits · Apr 4, 2023
22 changes: 20 additions & 2 deletions src/pudl/resources.py
@@ -1,4 +1,5 @@
"""Collection of Dagster resources for PUDL."""
from multiprocessing import Lock
from pathlib import Path

import pyarrow.parquet as pq
@@ -30,6 +31,23 @@ def ferc_to_sqlite_settings(init_context) -> FercToSqliteSettings:
    return FercToSqliteSettings(**init_context.resource_config)


class ParquetWriter(pq.ParquetWriter):
Member:
Is it worth preserving the ability to write out a single monolithic file directly, rather than compiling it after the fact from partitioned outputs? Once the data is in Parquet, it's extremely fast to read and write, and it seems simpler to avoid the issue of concurrency altogether. We're hacking around it with SQLite because we don't have a choice unless we want to switch to another format for the DB entirely, but Parquet is designed to be sliced and diced.

Member (Author):
Right now I don't think there's any way to directly write to the monolithic output. I was using this class for testing, but it's removed now.

Member:
Ack! This was a "pending" comment that never got submitted from 4 days ago, when I first saw you push to the branch.

"""Subclass of ParquetWriter to provide synchronization around writes."""

def __init__(self, *args, **kwargs):
"""Initialize base class and create lock."""
super().__init__(*args, **kwargs)
self.lock = Lock()

def write_table(self, *args, **kwargs):
"""Acquire lock, then write table."""
self.lock.acquire()
try:
super().write_table(*args, **kwargs)
finally:
self.lock.release()


@resource(
    config_schema={
        "pudl_output_path": Field(
@@ -41,15 +59,15 @@
        ),
    }
)
-def pq_writer(init_context) -> pq.ParquetWriter:
+def pq_writer(init_context) -> ParquetWriter:
"""Get monolithic parquet writer."""
monolithic_path = (
Path(init_context.resource_config["pudl_output_path"])
/ "hourly_emissions_epacems.parquet"
)
schema = Resource.from_id("hourly_emissions_epacems").to_pyarrow()

-    with pq.ParquetWriter(
+    with ParquetWriter(
        where=monolithic_path, schema=schema, compression="snappy", version="2.6"
    ) as monolithic_writer:
        yield monolithic_writer