Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP Transition CEMS partitions to year_quarter from year and quarter #3139

Merged
merged 12 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 21 additions & 11 deletions src/pudl/etl/epacems_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
from dagster import AssetIn, DynamicOut, DynamicOutput, asset, graph_asset, op

import pudl
from pudl.extract.epacems import EpaCemsPartition
from pudl.metadata.classes import Resource
from pudl.metadata.enums import EPACEMS_STATES
from pudl.workspace.setup import PudlPaths

logger = pudl.logging_helpers.get_logger(__name__)


YearPartitions = namedtuple("YearPartitions", ["year", "quarters"])
YearPartitions = namedtuple("YearPartitions", ["year_quarters"])


@op(
Expand All @@ -38,7 +39,10 @@ def get_years_from_settings(context):
parallel.
"""
epacems_settings = context.resources.dataset_settings.epacems
for year in epacems_settings.years:
years = {
EpaCemsPartition(year_quarter=yq).year for yq in epacems_settings.year_quarters
}
for year in years:
yield DynamicOutput(year, mapping_key=str(year))


Expand Down Expand Up @@ -68,23 +72,29 @@ def process_single_year(
partitioned_path = PudlPaths().output_dir / "hourly_emissions_epacems"
partitioned_path.mkdir(exist_ok=True)

for quarter in epacems_settings.quarters:
logger.info(f"Processing EPA CEMS hourly data for {year}-{quarter}")
df = pudl.extract.epacems.extract(year=year, quarter=quarter, ds=ds)
year_quarters_in_year = {
yq
for yq in epacems_settings.year_quarters
if EpaCemsPartition(year_quarter=yq).year == year
}

for year_quarter in year_quarters_in_year:
logger.info(f"Processing EPA CEMS hourly data for {year_quarter}")
df = pudl.extract.epacems.extract(year_quarter=year_quarter, ds=ds)
if not df.empty: # If state-year combination has data
df = pudl.transform.epacems.transform(df, epacamd_eia, plants_entity_eia)
table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)

# Write to a directory of partitioned parquet files
with pq.ParquetWriter(
where=partitioned_path / f"epacems-{year}-{quarter}.parquet",
where=partitioned_path / f"epacems-{year_quarter}.parquet",
schema=schema,
compression="snappy",
version="2.6",
) as partitioned_writer:
partitioned_writer.write_table(table)

return YearPartitions(year, epacems_settings.quarters)
return YearPartitions(year_quarters_in_year)


@op
Expand All @@ -102,20 +112,20 @@ def consolidate_partitions(context, partitions: list[YearPartitions]) -> None:
with pq.ParquetWriter(
where=monolithic_path, schema=schema, compression="snappy", version="2.6"
) as monolithic_writer:
for year, quarters in partitions:
for year_partition in partitions:
for state in EPACEMS_STATES:
monolithic_writer.write_table(
# Concat a slice of all state data from all quarters in a year
# Concat a slice of each state's data from all quarters in a year
# and write to parquet to create year-state row groups
pa.concat_tables(
[
pq.read_table(
source=partitioned_path
/ f"epacems-{year}-{quarter}.parquet",
/ f"epacems-{year_quarter}.parquet",
filters=[[("state", "=", state.upper())]],
schema=schema,
)
for quarter in quarters
for year_quarter in year_partition.year_quarters
]
)
)
Expand Down
37 changes: 22 additions & 15 deletions src/pudl/extract/epacems.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
during the transform process with help from the crosswalk.
"""
from pathlib import Path
from typing import NamedTuple
from typing import Annotated

import pandas as pd
from pydantic import BaseModel, StringConstraints

import pudl.logging_helpers
from pudl.metadata.classes import Resource
Expand Down Expand Up @@ -97,23 +98,31 @@
"""Set: The set of EPA CEMS columns to ignore when reading data."""


class EpaCemsPartition(NamedTuple):
class EpaCemsPartition(BaseModel):
"""Represents EpaCems partition identifying unique resource file."""

year: str
quarter: str
year_quarter: Annotated[
str, StringConstraints(strict=True, pattern=r"^(19|20)\d{2}[q][1-4]$")
]

@property
def year(self):
"""Return the year associated with the year_quarter."""
return pd.to_datetime(self.year_quarter).year

def get_key(self):
"""Returns hashable key for use with EpaCemsDatastore."""
return (self.year, self.quarter)
return self.year_quarter

def get_filters(self):
"""Returns filters for retrieving given partition resource from Datastore."""
return {"year": self.year, "quarter": self.quarter}
return {"year_quarter": self.year_quarter}

def get_quarterly_file(self) -> Path:
"""Return the name of the CSV file that holds annual hourly data."""
return Path(f"epacems-{self.year}-{self.quarter}.csv")
return Path(
f"epacems-{self.year}-{pd.to_datetime(self.year_quarter).quarter}.csv"
)


class EpaCemsDatastore:
Expand All @@ -129,7 +138,7 @@ def __init__(self, datastore: Datastore):
self.datastore = datastore

def get_data_frame(self, partition: EpaCemsPartition) -> pd.DataFrame:
"""Constructs dataframe from a zipfile for a given (year, quarter) partition."""
"""Constructs dataframe from a zipfile for a given (year_quarter) partition."""
archive = self.datastore.get_zipfile_resource(
"epacems", **partition.get_filters()
)
Expand Down Expand Up @@ -159,25 +168,23 @@ def _csv_to_dataframe(
).rename(columns=rename_dict)


def extract(year: int, quarter: int, ds: Datastore):
def extract(year_quarter: str, ds: Datastore):
"""Coordinate the extraction of EPA CEMS hourly DataFrames.

Args:
year: report year of the data to extract
quarter: report quarter of the data to extract
year_quarter: report year and quarter of the data to extract
ds: Initialized datastore
Returns:
pandas.DataFrame: A single quarter-year of EPA CEMS hourly emissions data.
"""
ds = EpaCemsDatastore(ds)
partition = EpaCemsPartition(quarter=quarter, year=year)
partition = EpaCemsPartition(year_quarter=year_quarter)
year = partition.year
# We have to assign the reporting year for partitioning purposes
try:
df = ds.get_data_frame(partition).assign(year=year)
except KeyError: # If no quarter-year combination found, return empty df.
logger.warning(
f"No data found for {quarter} in {year}. Returning empty dataframe."
)
logger.warning(f"No data found for {year_quarter}. Returning empty dataframe.")
res = Resource.from_id("hourly_emissions_epacems")
df = res.format_df(pd.DataFrame())
return df
10 changes: 6 additions & 4 deletions src/pudl/metadata/sources.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
"""Metadata and operational constants."""
from typing import Any

import pandas as pd

from pudl.metadata.constants import CONTRIBUTORS, KEYWORDS, LICENSES
from pudl.metadata.enums import EPACEMS_STATES

SOURCES: dict[str, Any] = {
"censusdp1tract": {
Expand Down Expand Up @@ -272,9 +273,10 @@
},
"field_namespace": "epacems",
"working_partitions": {
"years": sorted(set(range(1995, 2024))),
"quarters": sorted(set(range(1, 5))),
"states": sorted(EPACEMS_STATES),
"year_quarters": [
str(q).lower()
for q in pd.period_range(start="1995q1", end="2023q3", freq="Q")
]
},
"contributors": [
CONTRIBUTORS["catalyst-cooperative"],
Expand Down
7 changes: 2 additions & 5 deletions src/pudl/output/epacems.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import dask.dataframe as dd

from pudl.settings import EpaCemsSettings
from pudl.workspace.setup import PudlPaths


Expand Down Expand Up @@ -125,8 +124,6 @@ def epacems(
Returns:
The requested epacems data
"""
epacems_settings = EpaCemsSettings(states=states, years=years)

# columns=None is handled by dd.read_parquet; gives all columns
if columns is not None:
# nonexistent columns are handled by dd.read_parquet; raises ValueError
Expand All @@ -142,8 +139,8 @@ def epacems(
index=False,
split_row_groups=True,
filters=year_state_filter(
states=epacems_settings.states,
years=epacems_settings.years,
states=states,
years=years,
),
)
return epacems
3 changes: 1 addition & 2 deletions src/pudl/package_data/settings/etl_fast.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,4 @@ datasets:
# Note that the CEMS data relies on EIA 860 data for plant locations,
# so if you're loading CEMS data for a particular year, you should
# also load the EIA 860 data for that year if possible
quarters: [1]
years: [2022]
year_quarters: ["2022q1"]
34 changes: 1 addition & 33 deletions src/pudl/package_data/settings/etl_full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -270,36 +270,4 @@ datasets:
# Note that the CEMS data relies on EIA 860 data for plant locations,
# so if you're loading CEMS data for a particular year, you should
# also load the EIA 860 data for that year if possible
quarters: [1, 2, 3, 4]
years:
[
1995,
1996,
1997,
1998,
1999,
2000,
2001,
2002,
2003,
2004,
2005,
2006,
2007,
2008,
2009,
2010,
2011,
2012,
2013,
2014,
2015,
2016,
2017,
2018,
2019,
2020,
2021,
2022,
2023,
]
year_quarters: ["all"]
30 changes: 8 additions & 22 deletions src/pudl/settings.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
"""Module for validating pudl etl settings."""
import itertools
import json
from enum import Enum, unique
from typing import Any, ClassVar, Self
Expand Down Expand Up @@ -96,11 +95,8 @@ def partitions(cls) -> list[None | dict[str, str]]: # noqa: N805
``pd.json_normalize``.
"""
partitions = []
if hasattr(cls, "years") and hasattr(cls, "quarters"):
partitions = [
{"year": year, "quarter": quarter}
for year, quarter in itertools.product(cls.years, cls.quarters)
]
if hasattr(cls, "year_quarters"):
partitions = [{"year_quarters": part} for part in cls.year_quarters]
elif hasattr(cls, "years"):
partitions = [{"year": part} for part in cls.years]
return partitions
Expand Down Expand Up @@ -157,25 +153,15 @@ class EpaCemsSettings(GenericDatasetSettings):

data_source: ClassVar[DataSource] = DataSource.from_id("epacems")

years: list[int] = data_source.working_partitions["years"]
quarters: list[int] = data_source.working_partitions["quarters"]
states: list[str] = data_source.working_partitions["states"]

@field_validator("quarters")
@classmethod
def allow_all_keyword_quarters(cls, quarters): # noqa: N805
"""Allow users to specify ['all'] to get all quarters."""
if quarters == ["all"]:
quarters = cls.data_source.working_partitions["quarters"]
return quarters
year_quarters: list[str] = data_source.working_partitions["year_quarters"]

@field_validator("states")
@field_validator("year_quarters")
@classmethod
def allow_all_keyword_states(cls, states): # noqa: N805
def allow_all_keyword_year_quarters(cls, year_quarters): # noqa: N805
"""Allow users to specify ['all'] to get all year_quarters."""
if states == ["all"]:
states = cls.data_source.working_partitions["states"]
return states
if year_quarters == ["all"]:
year_quarters = cls.data_source.working_partitions["year_quarters"]
return year_quarters


class Eia923Settings(GenericDatasetSettings):
Expand Down
4 changes: 2 additions & 2 deletions src/pudl/transform/epacems.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,11 @@ def transform(
"""Transform EPA CEMS hourly data and ready it for export to Parquet.

Args:
raw_df: An extracted but not yet transformed year_quarter of EPA CEMS data.
raw_df: An extracted by not yet transformed year_quarter of EPA CEMS data.
pudl_engine: SQLAlchemy connection engine for connecting to an existing PUDL DB.

Returns:
A single year-state of EPA CEMS data
A single year_quarter of EPA CEMS data
"""
# Create all the table inputs used for the subtransform functions below

Expand Down
2 changes: 1 addition & 1 deletion src/pudl/workspace/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class ZenodoDoiSettings(BaseSettings):
eia923: ZenodoDoi = "10.5281/zenodo.10067550"
eia_bulk_elec: ZenodoDoi = "10.5281/zenodo.7067367"
epacamd_eia: ZenodoDoi = "10.5281/zenodo.7900974"
epacems: ZenodoDoi = "10.5281/zenodo.10233186"
epacems: ZenodoDoi = "10.5281/zenodo.10306114" # STILL A DRAFT ARCHIVE
ferc1: ZenodoDoi = "10.5281/zenodo.8326634"
ferc2: ZenodoDoi = "10.5281/zenodo.8326697"
ferc6: ZenodoDoi = "10.5281/zenodo.8326696"
Expand Down