From 680fd33cf9f07386d0304e3ecce41c0b9a1ae48b Mon Sep 17 00:00:00 2001 From: Jan Rous Date: Fri, 12 May 2023 16:07:15 -0600 Subject: [PATCH] Few more changes to ferc2 attempt to load data in single partition chunks instead of loading all into memory to see if this could help with speed. Turns out it may slow things down a little, perhaps due to some bottlenecks in loading or dumping into sqlite. Fixed paths for the early form 2 years. --- src/pudl/extract/dbf.py | 47 +++++++++++++++++++- src/pudl/package_data/ferc2/dbc_file_map.csv | 17 +++---- 2 files changed, 55 insertions(+), 9 deletions(-) diff --git a/src/pudl/extract/dbf.py b/src/pudl/extract/dbf.py index 2e9801017b..0f0c3e743a 100644 --- a/src/pudl/extract/dbf.py +++ b/src/pudl/extract/dbf.py @@ -119,12 +119,16 @@ def get_db_schema(self) -> dict[str, list[str]]: elif obj_type == "Field": parent_id = row.get("PARENTID") table_columns[parent_id].append(obj_name) - # Remap table ids to table names. + # Remap table ids to table names.pd self._table_schemas = { table_names[tid]: cols for tid, cols in table_columns.items() } return self._table_schemas + def get_table_names(self) -> list[str]: + """Returns list of known table names.""" + return self.get_db_schema().keys() + def get_table_dbf(self, table_name: str) -> DBF: """Opens the DBF for a given table.""" fname = self._table_file_map[table_name] @@ -494,6 +498,10 @@ def is_valid_partition(fl: dict[str, Any]) -> bool: def load_table_data(self): """Loads all tables from fox pro database and writes them to sqlite.""" + # Use new approach for ferc2 + if self.DATASET == "ferc2": + return self.load_table_data_v2() + partitions = [ p for p in self.datastore.get_datapackage_descriptor( @@ -527,6 +535,43 @@ def load_table_data(self): index=False, ) + def load_table_data_v2(self): + """This is next-gen of the data loading which loads tables in increments. + + Rather than aggregating across all partitions before loading into sqlite, this + loads data into sqlite incrementally. This should be faster, and could be more + easily parallelized. + """ + partitions = [ + p + for p in self.datastore.get_datapackage_descriptor( + self.DATASET + ).get_partition_filters() + if self.is_valid_partition(p) and p.get("year", None) in self.settings.years + ] + logger.info( + f"Loading {self.DATASET} table data from {len(partitions)} partitions." + ) + for par in partitions: + archive = self.dbf_reader.get_archive(**par) + for table_name in archive.get_table_names(): + # TODO(rousik): code below could be run in parallel + new_df = archive.load_table(table_name) + if new_df is None or len(new_df) <= 0: + continue + new_df = self.transform_table(table_name, new_df) + coltypes = { + col.name: col.type for col in self.sqlite_meta.tables[table_name].c + } + new_df.to_sql( + table_name, + self.sqlite_engine, + if_exists="append", + chunksize=100000, + dtype=coltypes, + index=False, + ) + def finalize_schema(self, meta: sa.MetaData) -> sa.MetaData: """This method is called just before the schema is written to sqlite. diff --git a/src/pudl/package_data/ferc2/dbc_file_map.csv b/src/pudl/package_data/ferc2/dbc_file_map.csv index 2202e2528a..6e6cea2078 100644 --- a/src/pudl/package_data/ferc2/dbc_file_map.csv +++ b/src/pudl/package_data/ferc2/dbc_file_map.csv @@ -1,12 +1,12 @@ year,path -1996,UPLOADERS/FORM2/tmpwork/F2_PUB.DBC -1997,UPLOADERS/FORM2/tmpwork/F2_PUB.DBC -1998,UPLOADERS/FORM2/tmpwork/F2_PUB.DBC -1999,UPLOADERS/FORM2/tmpwork/F2_PUB.DBC -2000,UPLOADERS/FORM2/tmpwork/F2_PUB.DBC -2001,UPLOADERS/FORM2/tmpwork/F2_PUB.DBC -2002,UPLOADERS/FORM2/tmpwork/F2_PUB.DBC -2003,UPLOADERS/FORM2/tmpwork/F2_PUB.DBC +1996,FORMSADMIN/FORM2/tmpwork/F2_PUB.DBC +1997,FORMSADMIN/FORM2/tmpwork/F2_PUB.DBC +1998,FORMSADMIN/FORM2/tmpwork/F2_PUB.DBC +1999,FORMSADMIN/FORM2/tmpwork/F2_PUB.DBC +2000,FORMSADMIN/FORM2/tmpwork/F2_PUB.DBC +2001,FORMSADMIN/FORM2/tmpwork/F2_PUB.DBC +2002,FORMSADMIN/FORM2/tmpwork/F2_PUB.DBC +2003,FORMSADMIN/FORM2/tmpwork/F2_PUB.DBC 2004,UPLOADERS/FORM2/working/F2_PUB.DBC 2005,UPLOADERS/FORM2/working/F2_PUB.DBC 2006,UPLOADERS/FORM2/working/F2_PUB.DBC @@ -24,3 +24,4 @@ year,path 2018,UPLOADERS/FORM2/working/F2_PUB.DBC 2019,UPLOADERS/FORM2/working/F2_PUB.DBC 2020,UPLOADERS/FORM2/working/F2_PUB.DBC +2021,UPLOADERS/FORM2/working/F2_PUB.DBC