Skip to content

Commit

Permalink
Few more changes to ferc2
Browse files Browse the repository at this point in the history
Attempt to load data in single-partition chunks instead of loading it
all into memory, to see if this could help with speed. It turns out it
may slow things down a little, perhaps due to some bottlenecks in
loading or dumping into sqlite.

Fixed paths for the early form 2 years.
  • Loading branch information
rousik committed May 12, 2023
1 parent 2af083d commit 680fd33
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 9 deletions.
47 changes: 46 additions & 1 deletion src/pudl/extract/dbf.py
Expand Up @@ -119,12 +119,16 @@ def get_db_schema(self) -> dict[str, list[str]]:
elif obj_type == "Field":
parent_id = row.get("PARENTID")
table_columns[parent_id].append(obj_name)
# Remap table ids to table names.
# Remap table ids to table names.
self._table_schemas = {
table_names[tid]: cols for tid, cols in table_columns.items()
}
return self._table_schemas

def get_table_names(self) -> list[str]:
    """Returns a list of the known table names in the database."""
    # dict.keys() yields a KeysView, not a list; convert it so the
    # return value actually matches the declared list[str] annotation.
    return list(self.get_db_schema())

def get_table_dbf(self, table_name: str) -> DBF:
"""Opens the DBF for a given table."""
fname = self._table_file_map[table_name]
Expand Down Expand Up @@ -494,6 +498,10 @@ def is_valid_partition(fl: dict[str, Any]) -> bool:

def load_table_data(self):
"""Loads all tables from fox pro database and writes them to sqlite."""
# Use new approach for ferc2
if self.DATASET == "ferc2":
return self.load_table_data_v2()

partitions = [
p
for p in self.datastore.get_datapackage_descriptor(
Expand Down Expand Up @@ -527,6 +535,43 @@ def load_table_data(self):
index=False,
)

def load_table_data_v2(self):
    """Incrementally loads tables from the fox pro database into sqlite.

    Rather than aggregating records across all partitions before writing
    them out, this loads each partition's tables into sqlite as soon as
    they are read. This keeps peak memory usage low and could be more
    easily parallelized in the future.
    """
    # Restrict to valid partitions for the years requested in settings.
    partitions = [
        p
        for p in self.datastore.get_datapackage_descriptor(
            self.DATASET
        ).get_partition_filters()
        if self.is_valid_partition(p) and p.get("year") in self.settings.years
    ]
    logger.info(
        f"Loading {self.DATASET} table data from {len(partitions)} partitions."
    )
    for partition in partitions:
        archive = self.dbf_reader.get_archive(**partition)
        for table_name in archive.get_table_names():
            # TODO(rousik): code below could be run in parallel
            new_df = archive.load_table(table_name)
            # Skip tables that are missing or contain no rows.
            if new_df is None or new_df.empty:
                continue
            new_df = self.transform_table(table_name, new_df)
            # Pin sqlite column types to the pre-defined schema so that
            # appends from different partitions remain consistent.
            coltypes = {
                col.name: col.type for col in self.sqlite_meta.tables[table_name].c
            }
            new_df.to_sql(
                table_name,
                self.sqlite_engine,
                if_exists="append",
                chunksize=100000,
                dtype=coltypes,
                index=False,
            )

def finalize_schema(self, meta: sa.MetaData) -> sa.MetaData:
"""This method is called just before the schema is written to sqlite.
Expand Down
17 changes: 9 additions & 8 deletions src/pudl/package_data/ferc2/dbc_file_map.csv
@@ -1,12 +1,12 @@
year,path
1996,UPLOADERS/FORM2/tmpwork/F2_PUB.DBC
1997,UPLOADERS/FORM2/tmpwork/F2_PUB.DBC
1998,UPLOADERS/FORM2/tmpwork/F2_PUB.DBC
1999,UPLOADERS/FORM2/tmpwork/F2_PUB.DBC
2000,UPLOADERS/FORM2/tmpwork/F2_PUB.DBC
2001,UPLOADERS/FORM2/tmpwork/F2_PUB.DBC
2002,UPLOADERS/FORM2/tmpwork/F2_PUB.DBC
2003,UPLOADERS/FORM2/tmpwork/F2_PUB.DBC
1996,FORMSADMIN/FORM2/tmpwork/F2_PUB.DBC
1997,FORMSADMIN/FORM2/tmpwork/F2_PUB.DBC
1998,FORMSADMIN/FORM2/tmpwork/F2_PUB.DBC
1999,FORMSADMIN/FORM2/tmpwork/F2_PUB.DBC
2000,FORMSADMIN/FORM2/tmpwork/F2_PUB.DBC
2001,FORMSADMIN/FORM2/tmpwork/F2_PUB.DBC
2002,FORMSADMIN/FORM2/tmpwork/F2_PUB.DBC
2003,FORMSADMIN/FORM2/tmpwork/F2_PUB.DBC
2004,UPLOADERS/FORM2/working/F2_PUB.DBC
2005,UPLOADERS/FORM2/working/F2_PUB.DBC
2006,UPLOADERS/FORM2/working/F2_PUB.DBC
Expand All @@ -24,3 +24,4 @@ year,path
2018,UPLOADERS/FORM2/working/F2_PUB.DBC
2019,UPLOADERS/FORM2/working/F2_PUB.DBC
2020,UPLOADERS/FORM2/working/F2_PUB.DBC
2021,UPLOADERS/FORM2/working/F2_PUB.DBC

0 comments on commit 680fd33

Please sign in to comment.