Extract raw PHMSA distribution and start of transmission data (Table A-D, H, I) #2932
@@ -0,0 +1,200 @@
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Testing column mapping for Excel spreadsheets\n",
    "This notebook is designed to quickly test column maps for Excel spreadsheets. It will flag the following:\n",
    "1) Column names that are input but don't exist in the actual data\n",
    "2) Column names present in the raw data but not mapped\n",
    "3) Invalid inputs for pages and files in `page_map.csv` and `file_map.csv`"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "First, select the raw dataset you're going to be mapping and locate all relevant file directories."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pudl\n",
    "from pudl.workspace.datastore import ZenodoDoiSettings\n",
    "import os\n",
    "import importlib\n",
    "from pathlib import Path\n",
    "import pandas as pd\n",
    "from zipfile import ZipFile\n",
    "import logging\n",
    "import sys\n",
    "\n",
    "logger = logging.getLogger()\n",
    "logger.setLevel(logging.INFO)\n",
    "handler = logging.StreamHandler(stream=sys.stdout)\n",
    "formatter = logging.Formatter('%(message)s')\n",
    "handler.setFormatter(formatter)\n",
    "logger.handlers = [handler]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dataset = \"phmsagas\"\n",
    "doi_path = getattr(ZenodoDoiSettings(), dataset).replace(\"/\", \"-\")\n",
    "data_path = os.path.join(os.getenv(\"PUDL_INPUT\"), dataset, doi_path)  # Get path to raw data\n",
    "map_path = os.path.join(Path(pudl.package_data.__file__).parents[0], dataset)  # Get path to mapping CSVs"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
"First, validate the file map. Make sure all file names included in the CSV actually exist in the raw data." | ||
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "file_map = pd.read_csv(\n",
    "    os.path.join(map_path, \"file_map.csv\"), index_col=0, comment=\"#\"\n",
    ")\n",
    "raw_files = os.listdir(data_path)\n",
    "\n",
    "# For each file, if it is a zipfile, get the list of file names contained inside\n",
    "all_files = []\n",
    "for file in raw_files:\n",
    "    if file.endswith(\"zip\"):\n",
    "        file_path = os.path.join(data_path, file)\n",
    "        file_list = ZipFile(file_path).namelist()\n",
    "        all_files.append({file_path: file_list})\n",
    "\n",
    "for table_files in file_map.values.tolist():  # For each table with a list of files\n",
    "    for file in table_files:  # For each file included in this table\n",
    "        if file not in str(all_files):  # Search the list of files for the file text, flag if missing\n",
    "            logger.warning(f\"File '{file}' not found in actual raw data. Check file name.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, read in the column mapping CSVs. For each one, read in the raw data and make sure no columns are missing."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "sheet_name = pd.read_csv(\n",
    "    os.path.join(map_path, \"page_map.csv\"), index_col=0, comment=\"#\"\n",
    ")\n",
    "skip_rows = pd.read_csv(\n",
    "    os.path.join(map_path, \"skiprows.csv\"), index_col=0, comment=\"#\"\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Sometimes we don't care about missing raw columns, or we only want to check a particular table. Set parameters here to fine-tune what you're checking."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "raw_check = False  # If False, only check that mapped columns are found in the raw dataset.\n",
    "                   # Useful when a table is split between several pages.\n",
    "table_subset = []  # Leave list empty to check all tables\n",
    "years_subset = []  # Use empty list if you want to check all years, otherwise supply a list of integers or a range"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
"def find_zip(file: str, dicts: list[dict[str,str]]) -> str:\n", | ||
" for dic in dicts:\n", | ||
" match = [i for i in dic if file in dic[i]]\n", | ||
" if match == []:\n", | ||
" continue\n", | ||
" return match[0]\n", | ||
"\n", | ||
"ds = pudl.workspace.datastore.Datastore()\n", | ||
"for page in file_map.index:\n", | ||
" if not table_subset or page in table_subset:\n", | ||
" column_maps = pd.read_csv(\n", | ||
" os.path.join(map_path, \"column_maps\", f\"{page}.csv\"), index_col=0, comment=\"#\"\n", | ||
" )\n", | ||
" for index in file_map.columns: \n", | ||
" if not years_subset or int(index) in years_subset:\n", | ||
" logger.info(f\"Checking column maps for {page}, {index}\")\n", | ||
" file = file_map.loc[page,index] # Get file name\n", | ||
" archive = ZipFile(find_zip(file, all_files)) # Open zipfile and read file\n", | ||
" with archive.open(file) as excel_file:\n", | ||
" raw_file = pd.read_excel(\n", | ||
" excel_file,\n", | ||
" sheet_name=sheet_name.loc[page,index],\n", | ||
" skiprows=skip_rows.loc[page,index],\n", | ||
" )\n", | ||
" raw_file = pudl.helpers.simplify_columns(raw_file) # Add pre-processing step used before column rename\n", | ||
" raw_columns = raw_file.columns # Get raw column names\n", | ||
" mapped_columns = column_maps.loc[:, index].dropna()\n", | ||
" raw_missing = [col for col in raw_columns if col not in mapped_columns.values]\n", | ||
" mapped_missing = [col for col in mapped_columns if col not in raw_columns.values]\n", | ||
" if raw_missing and raw_check:\n", | ||
" logger.warning(f\"Raw columns {raw_missing} from {file} are not mapped.\")\n", | ||
" if mapped_missing:\n", | ||
" logger.warning(f\"Mapped columns {mapped_missing} do not exist in the raw data file {file}\")\n", | ||
" " | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"Go back and fix any incorrectly labelled columns. Then run the cell above again, until all columns are correctly labelled." | ||
] | ||
} | ||
], | ||
"metadata": { | ||
"kernelspec": { | ||
"display_name": "pudl-dev", | ||
"language": "python", | ||
"name": "python3" | ||
}, | ||
"language_info": { | ||
"codemirror_mode": { | ||
"name": "ipython", | ||
"version": 3 | ||
}, | ||
"file_extension": ".py", | ||
"mimetype": "text/x-python", | ||
"name": "python", | ||
"nbconvert_exporter": "python", | ||
"pygments_lexer": "ipython3", | ||
"version": "3.11.6" | ||
} | ||
}, | ||
"nbformat": 4, | ||
"nbformat_minor": 2 | ||
} |
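For orientation, the structure of file_map.csv that this notebook assumes can be inferred from the lookups above (file_map.index yields pages, file_map.columns yields years, and file_map.loc[page, index] yields a raw file name inside one of the zipped archives). A minimal sketch, with made-up page and file names:

import io

import pandas as pd

# Hypothetical file_map.csv contents: rows are pages, columns are report
# years, and each cell holds a raw file name. Real names will differ.
csv_text = """page,2021,2022
yearly_distribution,annual_distribution_2021.xlsx,annual_distribution_2022.xlsx
"""
file_map = pd.read_csv(io.StringIO(csv_text), index_col=0)
print(file_map.loc["yearly_distribution", "2022"])  # annual_distribution_2022.xlsx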
@@ -18,5 +18,6 @@
     excel,
     ferc1,
     ferc714,
+    phmsagas,
     xbrl,
 )
@@ -44,6 +44,12 @@ class Metadata:
     * column_map/${page}.csv currently informs us how to translate input column
       names to standardized pudl names for given (partition, input_col_name).
       Relevant page is encoded in the filename.
+
+    Optional file:
+
+    * page_part_map.csv tells us what page is connected to an additional partition
+      outside the partition included in the rest of the mapping files (usually year).
+
Review comment: I know this is somewhat complex, but it's still not super clear to me what this means, because I don't understand the format of the raw data. I think I'm particularly thrown off by the word "page". Is this an Excel tab?

Review comment: But I can clarify this description a bit to make it clearer.

Review comment: Hmmm yeah, that probably makes sense. I've never used …

Review comment: The whole Excel extractor is designed around extracting "pages". These mostly correspond to "all the data in a tab of an Excel file", but I think it is defined more generically on purpose, because we've used it to extract DBF data and are converting it to extract data from CSVs. And if there are multiple chunks of data you want to extract from the same tab, you could use …. But still, out of the full context of the docstring for even this metadata class, "page" may sound out of place. Could you read the full docstring for the class and then let us know if it still feels confusing?
     """

     # TODO: we could validate whether metadata is valid for all year. We should have
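To make the optional file concrete, here is a hedged sketch of what a page_part_map.csv might contain and how the extractor applies it when resolving a partition. The file name and the update pattern come from this diff (see zipfile_resource_partitions below); the example page names and the "form" part are invented for illustration.

import io

import pandas as pd

# Hypothetical page_part_map.csv: one row per page that needs an extra
# partition beyond the usual year. Page names and the "form" part are made up.
csv_text = """page,form
yearly_distribution,gas_distribution
yearly_transmission,gas_transmission
"""
page_part_map = pd.read_csv(io.StringIO(csv_text), index_col=0)

partition = {"year": 2021}
# Mirrors zipfile_resource_partitions(): append the page's extra part(s).
partition.update(page_part_map.loc["yearly_distribution"])
print(partition)  # {'year': 2021, 'form': 'gas_distribution'}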
@@ -63,6 +69,13 @@ def __init__(self, dataset_name: str):
         self._skipfooter = self._load_csv(pkg, "skipfooter.csv")
         self._sheet_name = self._load_csv(pkg, "page_map.csv")
         self._file_name = self._load_csv(pkg, "file_map.csv")
+        # Most excel extracted datasets do not have a page to part map. If they
+        # don't, assign null.
+        try:
+            self._page_part_map = self._load_csv(pkg, "page_part_map.csv")
+        except FileNotFoundError:
+            self._page_part_map = pd.DataFrame()
+
         column_map_pkg = pkg + ".column_maps"
         self._column_map = {}
         for res_path in importlib.resources.files(column_map_pkg).iterdir():
@@ -223,16 +236,26 @@ def process_renamed(df, page, **partition):
         """Transforms dataframe after columns are renamed."""
         return df
 
-    @staticmethod
-    def process_final_page(df, page):
-        """Final processing stage applied to a page DataFrame."""
-        return df
-
     @staticmethod
     def get_dtypes(page, **partition):
         """Provide custom dtypes for given page and partition."""
         return {}
 
+    def process_final_page(self, df, page):
+        """Final processing stage applied to a page DataFrame."""
+        return df
+
+    def zipfile_resource_partitions(self, page, **partition) -> dict:
Review comment: This returns the "input data" partition that tells us where to find the source data for a certain output partition, right? And if we want to do weird custom logic here in a dataset-specific subclass of …? If so, it might be clearer if we rename the method + update the docstring to establish: "if you are looking to do some fancy mapping from input partitions to output partitions, for example if you output quarterly data but read in yearly files, override this method. The default is to take the output partitions + add any additional partitions defined in the page_part_map.csv."
+        """Get the partition used for returning a zipfile from the datastore.
+
+        This method appends any page to partition mapping in
+        :attr:`METADATA._page_part_map`. Most datasets do not have page to part
+        maps and just return the same partition that is passed in.
+        """
+        if not self.METADATA._page_part_map.empty:
+            partition.update(self.METADATA._page_part_map.loc[page])
+        return partition
+
     def extract(self, **partitions):
         """Extracts dataframes.
 
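Following the suggestion in the comment above, here is a minimal sketch of the kind of dataset-specific override this method enables, assuming the base class is the generic Excel extractor defined in this module. The class name and the year_quarter/year partition keys are hypothetical.

from pudl.extract import excel

# Hypothetical: a dataset whose outputs are partitioned by quarter but whose
# source zipfiles are annual. Class name and partition keys are made up.
class QuarterlyFromYearlyExtractor(excel.GenericExtractor):
    def zipfile_resource_partitions(self, page, **partition) -> dict:
        """Map the output partition (year_quarter) to the input partition (year)."""
        year_quarter = str(partition.pop("year_quarter"))  # e.g. "2021q3"
        partition["year"] = int(year_quarter[:4])
        return partition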
@@ -311,7 +334,7 @@ def extract(self, **partitions):
             )
             df = pd.concat([df, pd.DataFrame(columns=missing_cols)], sort=True)
 
-            raw_dfs[page] = self.process_final_page(df, page)
+            raw_dfs[page] = self.process_final_page(df=df, page=page)
         return raw_dfs
 
     def load_excel_file(self, page, **partition):

@@ -343,7 +366,10 @@ def load_excel_file(self, page, **partition):
             )
             excel_file = pd.ExcelFile(res)
         except KeyError:
-            zf = self.ds.get_zipfile_resource(self._dataset_name, **partition)
+            zf = self.ds.get_zipfile_resource(
+                self._dataset_name,
+                **self.zipfile_resource_partitions(page, **partition),
+            )
 
             # If loading the excel file from the zip fails then try to open a dbf file.
             extension = pathlib.Path(xlsx_filename).suffix.lower()
@@ -442,13 +468,12 @@ def years_from_settings_factory(name: str) -> OpDefinition:
     """Construct a Dagster op to get target years from settings in the Dagster context.
 
     Args:
-        name: Name of an Excel based dataset (e.g. "eia860"). Currently this must be
-            one of the attributes of :class:`pudl.settings.EiaSettings`
+        name: Name of an Excel based dataset (e.g. "eia860").
     """
 
     def years_from_settings(context) -> DynamicOutput:
-        """Produce target years for the given dataset from the EIA settings object.
+        """Produce target years for the given dataset from the dataset settings object.
 
         These will be used to kick off worker processes to extract each year of data in
         parallel.

@@ -458,8 +483,11 @@ def years_from_settings(context) -> DynamicOutput:
         extracted. See the Dagster API documentation for more details:
         https://docs.dagster.io/_apidocs/dynamic#dagster.DynamicOut
         """
-        eia_settings = context.resources.dataset_settings.eia
-        for year in getattr(eia_settings, name).years:
+        if "eia" in name:  # Account for nested settings if EIA
+            year_settings = context.resources.dataset_settings.eia
+        else:
+            year_settings = context.resources.dataset_settings
+        for year in getattr(year_settings, name).years:
             yield DynamicOutput(year, mapping_key=str(year))
 
     return op(
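The branch above can be read as: EIA datasets are nested under dataset_settings.eia, while other Excel-based datasets such as phmsagas hang directly off dataset_settings. A condensed restatement of that lookup (the helper function itself is hypothetical; the attribute access follows the diff):

def resolve_years(dataset_settings, name: str) -> list[int]:
    """Hypothetical helper restating the lookup above: EIA datasets are
    nested one level deeper than other Excel-based datasets."""
    year_settings = dataset_settings.eia if "eia" in name else dataset_settings
    return getattr(year_settings, name).years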
@@ -477,8 +505,7 @@ def raw_df_factory(
     Args:
         extractor_cls: The dataset-specific Excel extractor used to extract the data.
             Needs to correspond to the dataset identified by ``name``.
-        name: Name of an Excel based dataset (e.g. "eia860"). Currently this must be
-            one of the attributes of :class:`pudl.settings.EiaSettings`
+        name: Name of an Excel based dataset (e.g. "eia860").
     """
     # Build a Dagster op that can extract a single year of data
     year_extractor = year_extractor_factory(extractor_cls, name)

@@ -487,7 +514,7 @@ def raw_df_factory(
     years_from_settings = years_from_settings_factory(name)
 
     def raw_dfs() -> dict[str, pd.DataFrame]:
-        """Produce a dictionary of extracted EIA dataframes."""
+        """Produce a dictionary of extracted dataframes."""
         years = years_from_settings()
         # Clone dagster op for each year using DynamicOut.map()
         # See https://docs.dagster.io/_apidocs/dynamic#dagster.DynamicOut
Review comment: What is the logic behind the gas tag? Is there other PHMSA data we are not using?

Review comment: This is how the dataset was named when we first added it to PUDL. I'm not sure it makes the most sense (vs. just phmsa) but we'd have to overhaul all of our names in two different repos to change it at this point.

Review comment: That does sound annoying. @cmgosnell, what do you think? Who did the original naming?

Review comment: As per discussion in Slack, this reflects the desire to name our sources {distributor}{source}, as we do for eia923 or epacems. Rather than splitting it into phmsatransmission and phmsadistribution, we're following the high-level split between gas and hazardous materials data reported by PHMSA.