Few more minor code improvements:
1. added an is_disabled property to dataset settings; the underlying
disabled flag is currently only defined on the ferc1 and ferc2 DBF
settings (not globally applied and/or enforced, but it would be nice)
2. construct the FERC extractors in a loop (see the sketch below)
3. pull the right config out of the global FercToSqliteSettings to
facilitate (2)
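
Taken together, the three changes compose like this. A self-contained toy sketch, mirroring the class and field names from the diffs below, with toy bodies standing in for the real pudl implementations:

class DatasetSettings:
    """Stand-in for GenericDatasetSettings, carrying the new flag."""

    def __init__(self, disabled: bool = False):
        self.disabled = disabled

    @property
    def is_disabled(self) -> bool:
        return getattr(self, "disabled", False)


class FercToSqliteSettings:
    """Stand-in for the global settings bundle."""

    def __init__(self, ferc1: DatasetSettings, ferc2: DatasetSettings):
        self.ferc1_dbf_to_sqlite_settings = ferc1
        self.ferc2_dbf_to_sqlite_settings = ferc2


class FercDbfExtractor:
    DATASET = "base"

    def __init__(self, settings: FercToSqliteSettings):
        # pull the dataset-specific config out of the global bundle
        self.settings = self.get_settings(settings)

    def get_settings(self, global_settings: FercToSqliteSettings) -> DatasetSettings:
        raise NotImplementedError("subclasses pick their own settings")

    def execute(self) -> None:
        if self.settings.is_disabled:
            print(f"{self.DATASET} is disabled, skipping")
            return
        print(f"extracting {self.DATASET}")


class Ferc1DbfExtractor(FercDbfExtractor):
    DATASET = "ferc1"

    def get_settings(self, global_settings):
        return global_settings.ferc1_dbf_to_sqlite_settings


class Ferc2DbfExtractor(FercDbfExtractor):
    DATASET = "ferc2"

    def get_settings(self, global_settings):
        return global_settings.ferc2_dbf_to_sqlite_settings


settings = FercToSqliteSettings(DatasetSettings(disabled=True), DatasetSettings())
for xclass in [Ferc1DbfExtractor, Ferc2DbfExtractor]:
    xclass(settings).execute()  # ferc1 is skipped, ferc2 is extracted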
rousik committed May 23, 2023
1 parent 30544eb commit 8d16438
Showing 4 changed files with 57 additions and 33 deletions.
34 changes: 18 additions & 16 deletions src/pudl/extract/dbf.py
@@ -15,6 +15,7 @@
import pudl
import pudl.logging_helpers
from pudl.metadata.classes import DataSource
from pudl.settings import FercToSqliteSettings, GenericDatasetSettings
from pudl.workspace.datastore import Datastore

logger = pudl.logging_helpers.get_logger(__name__)
@@ -282,20 +283,6 @@ def __init__(
self.datastore = datastore
self.dataset = dataset
self.field_parser = field_parser
# at the moment, archives are inconsistent in terms of upper/lower casing the
# partition data_format values. We will infer the correct value by inspecting the
# descriptor.
# The following workaround could be removed once all zenodo archives agree on
# the capitalization.
parts = self.datastore.get_datapackage_descriptor(self.dataset).get_partitions()
if "dbf" in parts["data_format"]:
self._data_format = "dbf"
elif "DBF" in parts["data_format"]:
self._data_format = "DBF"
else:
raise RuntimeError(
f"dbf/DBF not found in the data_format for the dataset {self.dataset}"
)

# dbc_file_map.csv contains path to the DBC file that contains the
# overall database schemas. It is expected that DBF files live in
@@ -409,7 +396,7 @@ class FercDbfExtractor:
def __init__(
self,
datastore: Datastore,
settings: Any,
settings: FercToSqliteSettings,
output_path: Path,
clobber: bool = False,
):
@@ -421,7 +408,7 @@ def __init__(
output_path: directory where the output databases should be stored.
clobber: if True, existing databases should be replaced.
"""
self.settings = settings
self.settings: GenericDatasetSettings = self.get_settings(settings)
self.clobber = clobber
self.output_path = output_path
self.datastore = datastore
@@ -430,6 +417,14 @@ def __init__(
self.sqlite_meta = sa.MetaData()
self.sqlite_meta.reflect(self.sqlite_engine)

def get_settings(
self, global_settings: FercToSqliteSettings
) -> GenericDatasetSettings:
"""Returns dataset relevant settings from the global_settings."""
raise NotImplementedError(
"get_settings() needs to extract dataset-specific settings."
)

def get_dbf_reader(self, datastore: Datastore) -> AbstractFercDbfReader:
"""Returns appropriate instance of AbstractFercDbfReader to access the data."""
return FercDbfReader(datastore, dataset=self.DATASET)
@@ -441,6 +436,13 @@ def get_db_path(self) -> str:

def execute(self):
"""Runs the extraction of the data from dbf to sqlite."""
logger.info(
f"Running dbf extraction for {self.DATASET} with settings: {self.settings}"
)
if self.settings.is_disabled:
logger.warning(f"Dataset {self.DATASET} extraction is disabled, skipping")
return

# TODO(rousik): perhaps we should check clobber here before starting anything.
self.delete_schema()
self.create_sqlite_tables()
39 changes: 22 additions & 17 deletions src/pudl/extract/ferc1.py
@@ -94,7 +94,7 @@
ferc1_dbf_sqlite_io_manager,
ferc1_xbrl_sqlite_io_manager,
)
from pudl.settings import DatasetsSettings
from pudl.settings import DatasetsSettings, FercToSqliteSettings, GenericDatasetSettings
from pudl.workspace.datastore import Datastore

logger = pudl.logging_helpers.get_logger(__name__)
@@ -213,8 +213,15 @@
class Ferc1DbfExtractor(FercDbfExtractor):
"""Wrapper for running the foxpro to sqlite conversion of FERC1 dataset."""

DATASET = "ferc1"
DATABASE_NAME = "ferc1.sqlite"

def get_settings(
self, global_settings: FercToSqliteSettings
) -> GenericDatasetSettings:
"""Returns settings for FERC Form 1 DBF dataset."""
return global_settings.ferc1_dbf_to_sqlite_settings

def get_dbf_reader(self, base_datastore: Datastore) -> AbstractFercDbfReader:
"""Returns an instace of :class:`FercDbfReader`.
@@ -241,7 +248,9 @@ def finalize_schema(self, meta: sa.MetaData) -> sa.MetaData:
This marks f1_respondent_id.respondent_id as a primary key and adds foreign key
constraints on all tables with a respondent_id column.
"""
return add_key_constraints(meta, pk_table="f1_respondent_id", column="respondent_id")
return add_key_constraints(
meta, pk_table="f1_respondent_id", column="respondent_id"
)

def postprocess(self):
"""Applies final transformations on the data in sqlite database.
@@ -340,21 +349,17 @@ def dbf2sqlite(context) -> None:
# to make the integration between the class and dagster better? Investigate.
logger.info(f"dbf2sqlite settings: {context.resources.ferc_to_sqlite_settings}")

# TODO(rousik): this is disabled temporarily to run only ferc form 2.
#Ferc1DbfExtractor(
# datastore=context.resources.datastore,
# settings=context.resources.ferc_to_sqlite_settings.ferc1_dbf_to_sqlite_settings,
# clobber=context.op_config["clobber"],
# output_path=context.op_config["pudl_output_path"],
#).execute()

Ferc2DbfExtractor(
datastore=context.resources.datastore,
settings=context.resources.ferc_to_sqlite_settings.ferc2_dbf_to_sqlite_settings,
clobber=context.op_config["clobber"],
output_path=context.op_config["pudl_output_path"],
).execute()
# TODO(rousik): move this out of ferc1 and into ferc.py which will have all of those.
extractors = [
Ferc1DbfExtractor,
Ferc2DbfExtractor,
]
for xclass in extractors:
xclass(
datastore=context.resources.datastore,
settings=context.resources.ferc_to_sqlite_settings,
clobber=context.op_config["clobber"],
output_path=context.op_config["pudl_output_path"],
).execute()


###########################################################################
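With the loop in place, wiring in another form would be one subclass plus one list entry. A hypothetical sketch (Ferc6DbfExtractor and the ferc6_dbf_to_sqlite_settings field do not exist in pudl; they stand in for whatever form comes next):

from pudl.extract.dbf import FercDbfExtractor
from pudl.settings import FercToSqliteSettings, GenericDatasetSettings


class Ferc6DbfExtractor(FercDbfExtractor):
    """Hypothetical extractor for a FERC Form 6 DBF dataset."""

    DATASET = "ferc6"
    DATABASE_NAME = "ferc6.sqlite"

    def get_settings(
        self, global_settings: FercToSqliteSettings
    ) -> GenericDatasetSettings:
        """Pull the form-specific settings out of the global bundle."""
        # Hypothetical field; it would have to be added to FercToSqliteSettings
        # next to the existing ferc1/ferc2 fields.
        return global_settings.ferc6_dbf_to_sqlite_settings


# ...then extend the list in dbf2sqlite:
# extractors = [Ferc1DbfExtractor, Ferc2DbfExtractor, Ferc6DbfExtractor]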
7 changes: 7 additions & 0 deletions src/pudl/extract/ferc2.py
@@ -17,6 +17,7 @@
import pudl
from pudl.extract.dbf import FercDbfExtractor
from pudl.extract.ferc import add_key_constraints
from pudl.settings import FercToSqliteSettings, GenericDatasetSettings

logger = pudl.logging_helpers.get_logger(__name__)

@@ -27,6 +28,12 @@ class Ferc2DbfExtractor(FercDbfExtractor):
DATASET = "ferc2"
DATABASE_NAME = "ferc2.sqlite"

def get_settings(
self, global_settings: FercToSqliteSettings
) -> GenericDatasetSettings:
"""Returns settings for FERC Form 1 DBF dataset."""
return global_settings.ferc2_dbf_to_sqlite_settings

def finalize_schema(self, meta: sa.MetaData) -> sa.MetaData:
"""Add primary and foreign keys for respondent_id."""
return add_key_constraints(
10 changes: 10 additions & 0 deletions src/pudl/settings.py
@@ -91,6 +91,11 @@ def partitions(cls) -> list[None | dict[str, str]]: # noqa: N805
partitions = [{"year": part} for part in cls.years]
return partitions

@property
def is_disabled(self) -> bool:
"""Returns True if the dataset is disabled and should be skipped."""
return getattr(self, "disabled", False)


class Ferc1Settings(GenericDatasetSettings):
"""An immutable pydantic model to validate Ferc1Settings.
@@ -427,12 +432,14 @@ class Ferc1DbfToSqliteSettings(GenericDatasetSettings):
Args:
years: List of years to validate.
disabled: if True, skip processing this dataset.
"""

data_source: ClassVar[DataSource] = DataSource.from_id("ferc1")
years: list[int] = [
year for year in data_source.working_partitions["years"] if year <= 2020
]
disabled: bool = False

refyear: ClassVar[int] = max(years)

@@ -481,11 +488,14 @@ class Ferc2DbfToSqliteSettings(GenericDatasetSettings):
Args:
years: List of years to validate.
disabled: if True, skip processing this dataset.
"""

data_source: ClassVar[DataSource] = DataSource.from_id("ferc2")
years: list[int] = [
year for year in data_source.working_partitions["years"] if year <= 2020
]
disabled: bool = False

refyear: ClassVar[int] = max(years)

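A minimal usage sketch for the new flag, assuming the pydantic models accept disabled as an ordinary field (which the diff above indicates):

from pudl.settings import Ferc1DbfToSqliteSettings

# Switch the dataset off; is_disabled (defined once on GenericDatasetSettings)
# just reads the per-dataset `disabled` field and defaults to False elsewhere.
settings = Ferc1DbfToSqliteSettings(disabled=True)
assert settings.is_disabled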
