Use CSV instead of JSON for DMR to avoid data timeouts #139

Merged · 2 commits · Jun 16, 2023
stewi/DMR.py: 214 changes (84 additions, 130 deletions)
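The diff below swaps the EPA ECHO Water Pollutant Loading Tool queries from JSON to CSV output and reads each response straight into pandas. A minimal sketch of the new request-and-parse pattern, assuming the parameter names used in `query_dmr()`; the `BASE_URL` string is only a placeholder for the value stewi reads from `_config['base_url']`:

```python
from io import BytesIO
import urllib.parse

import pandas as pd
import requests

# Placeholder for _config['base_url']; the real value comes from stewi's config file
BASE_URL = "https://echodata.epa.gov/echo/dmr_rest_services.get_custom_data_annual?"

params = {
    'p_year': '2021',          # year of data
    'p_st': 'RI',              # one state per request
    'output': 'CSV',           # was 'JSON' before this change
    'suppress_headers': 'Y',   # drop banner rows so pandas sees a clean header row
}
url = BASE_URL + urllib.parse.urlencode(params)

r = requests.get(url)
r.raise_for_status()
# The full result set arrives in one response, so no responseset/pageno paging loop
df = pd.read_csv(BytesIO(r.content), low_memory=False)
```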
@@ -24,6 +24,7 @@
import urllib
import time
from pathlib import Path
from io import BytesIO

from esupy.processed_data_mgmt import read_source_metadata
from stewi.globals import unit_convert,\
@@ -70,27 +71,25 @@ def generate_url(url_params):
params = {k: v for k, v in url_params.items() if v}

params['p_nd'] = DETECTION
params['output'] = 'JSON'
params['output'] = 'CSV'
if 'p_poll_cat' in params:
params['p_poll_cat'] = 'Nut' + params['p_poll_cat']
if PARAM_GROUP:
params['p_param_group'] = 'Y' # default is N
if not ESTIMATION:
params['p_est'] = 'N' # default is Y
if 'responseset' not in params:
params['responseset'] = '20000'
# if 'responseset' not in params:
# params['responseset'] = '20000'

url = _config['base_url'] + urllib.parse.urlencode(params)

return url


def query_dmr(year, sic_list=None, state_list=STATES, nutrient=''):
"""Loop through a set of states and sics to download and pickle DMR data.
def query_dmr(year, state_list=STATES, nutrient=''):
"""Loop through a set of states to download and pickle DMR data.

:param year: str, year of data
:param sic_list: Option to break up queries further by list of 2-digit
SIC codes
:param state_list: List of states to include in query
:param nutrient: Option to query by nutrient category with aggregation.
Input 'N' or 'P'
@@ -104,8 +103,9 @@ def query_dmr(year, sic_list=None, state_list=STATES, nutrient=''):
'p_st': '',
'p_poll_cat': nutrient,
'p_nutrient_agg': 'N',
'responseset': '9000',
'pageno': '1',
'suppress_headers': 'Y',
# 'responseset': '9000',
# 'pageno': '1',
}
if nutrient:
filestub = nutrient + "_"
@@ -117,7 +117,7 @@ def query_dmr(year, sic_list=None, state_list=STATES, nutrient=''):
results[state] = 'success'
else:
url_params['p_st'] = state
results[state] = download_data(url_params, filepath, sic_list)
results[state] = download_data(url_params, filepath)
return results
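With paging and the SIC fallback removed, `query_dmr()` now issues one request per state (optionally restricted to a nutrient category) and pickles each response, returning a status string per state. A hedged usage sketch mirroring the driver logic in `main()`; the year, states, and import path are illustrative:

```python
import logging

from stewi.DMR import query_dmr  # assumes the stewi package layout shown in this diff

log = logging.getLogger(__name__)

# Hypothetical driver: query two states for one year
results = query_dmr(year='2021', state_list=['RI', 'VT'])
failed = [st for st, status in results.items() if status != 'success']
if failed:
    log.warning(f"query failed for: {', '.join(failed)}")

# Aggregated nutrient queries go through the same entry point
n_results = query_dmr(year='2021', state_list=['RI', 'VT'], nutrient='N')
```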


@@ -130,89 +130,66 @@ def check_for_file(filepath: Path, state) -> bool:
return False


def download_data(url_params, filepath: Path, sic_list) -> str:
def download_data(url_params, filepath: Path) -> str:
df = pd.DataFrame()
if sic_list:
skip_errors = True
else:
skip_errors = False
sic_list = ['']
for sic in sic_list:
url_params['p_sic2'] = sic
counter = 1
pages = 1
while counter <= pages:
url_params['pageno'] = counter
url = generate_url(url_params)
log.debug(url)
for attempt in range(3):
try:
r = requests.get(url)
url = generate_url(url_params)
log.debug(url)
for attempt in range(3):
try:
r = requests.get(url)
r.raise_for_status()
# When more than 100,000 records, need to split queries
if ((len(r.content) < 1000) and
('Maximum number of records' in str(r.content))):
for x in ('NGP', 'GPC', 'NPD'):
split_url = f'{url}&p_permit_type={x}'
r = requests.get(split_url)
r.raise_for_status()
result = pd.DataFrame(r.json(strict=False))
# ^^ set strict=False to avoid JSONDecodeError on
# control characters
break
except (requests.exceptions.HTTPError,
requests.exceptions.ConnectionError) as err:
log.info(err)
time.sleep(20)
pass
else:
log.warning("exceeded max attempts")
return 'other_error'
if 'Error' in result.index:
if skip_errors:
log.debug(f"error in sic_{sic}")
break
elif result['Results'].astype(str).str.contains('Maximum').any():
return 'max_error'
else:
return 'other_error'
elif 'NoDataMsg' in result.index:
if skip_errors:
log.debug(f"no data in sic_{sic}")
break
else:
return 'no_data'
df_sub = pd.read_csv(BytesIO(r.content), low_memory=False)
if len(df_sub) < 3: continue
df = pd.concat([df, df_sub], ignore_index=True)
else:
df = pd.concat([df, pd.DataFrame(result['Results']['Results'])],
ignore_index=True)
# set page count
pages = int(result['Results']['PageCount'])
counter += 1
df = pd.read_csv(BytesIO(r.content), low_memory=False)
break
except (requests.exceptions.HTTPError,
requests.exceptions.ConnectionError) as err:
log.info(err)
time.sleep(20)
pass
else:
log.warning("exceeded max attempts")
return 'other_error'
log.debug(f"saving to {filepath}")
pd.to_pickle(df, filepath)
return 'success'
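The rewritten `download_data()` leans on Python's `for ... else` retry idiom: the `else` branch runs only if the loop never hits `break`, i.e. every attempt failed. A standalone sketch of that pattern with invented names; `download_data()` itself inlines the same logic rather than calling a helper:

```python
import time
from typing import Optional

import requests


def fetch_with_retries(url: str, attempts: int = 3, wait: int = 20) -> Optional[bytes]:
    """Return the response body, or None if every attempt fails (illustrative helper)."""
    for _ in range(attempts):
        try:
            r = requests.get(url)
            r.raise_for_status()
            break                      # success: the else branch is skipped
        except (requests.exceptions.HTTPError,
                requests.exceptions.ConnectionError):
            time.sleep(wait)           # transient failure: back off, then retry
    else:
        return None                    # loop finished without a break
    return r.content
```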


def standardize_df(input_df):
"""Modify DMR data to meet StEWI specifications."""
dmr_required_fields = pd.read_csv(DMR_DATA_PATH
.joinpath('DMR_required_fields.txt'),
dmr_required_fields = pd.read_csv(DMR_DATA_PATH / 'DMR_required_fields.txt',
header=None)[0]
output_df = input_df[dmr_required_fields].copy()
dmr_reliability_table = get_reliability_table_for_source('DMR')
dmr_reliability_table.drop(columns=['Code'], inplace=True)
dmr_reliability_table = (get_reliability_table_for_source('DMR')
.drop(columns=['Code']))
output_df['DataReliability'] = dmr_reliability_table[
'DQI Reliability Score'].values[0]

# Rename with standard column names
field_dictionary = {'ExternalPermitNmbr': 'FacilityID',
'Siccode': 'SIC',
'NaicsCode': 'NAICS',
'StateCode': 'State',
'PollutantLoad': 'FlowAmount',
'CountyName': 'County',
'GeocodeLatitude': 'Latitude',
'GeocodeLongitude': 'Longitude'}
field_dictionary = {'NPDES Permit Number': 'FacilityID',
'Facility Name': 'FacilityName',
'SIC Code': 'SIC',
'NAICS Code': 'NAICS',
'Pollutant Load (kg/yr)': 'FlowAmount',
'Facility Latitude': 'Latitude',
'Facility Longitude': 'Longitude',
'ZIP Code': 'Zip',}
if PARAM_GROUP:
field_dictionary['PollutantDesc'] = 'FlowName'
field_dictionary['PollutantCode'] = 'FlowID'
field_dictionary['Pollutant Description'] = 'FlowName'
field_dictionary['Pollutant Code'] = 'FlowID'
else:
field_dictionary['ParameterDesc'] = 'FlowName'
field_dictionary['ParameterCode'] = 'FlowID'
output_df.rename(columns=field_dictionary, inplace=True)
field_dictionary['Parameter Description'] = 'FlowName'
field_dictionary['Parameter Code'] = 'FlowID'
output_df = output_df.rename(columns=field_dictionary)
# Drop flow amount of '--'
output_df = output_df[output_df['FlowAmount'] != '--']
# Already in kg/yr, so no conversion necessary
Expand All @@ -232,7 +209,7 @@ def standardize_df(input_df):
flows.PARAMETER_CODE), ['FlowID']] = output_df['FlowID_x']
output_df.loc[~output_df.FlowID_x.isin(
flows.PARAMETER_CODE), ['FlowID']] = output_df['FlowID_y']
output_df.drop(columns=['FlowID_x', 'FlowID_y'], inplace=True)
output_df = output_df.drop(columns=['FlowID_x', 'FlowID_y'])

return output_df
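Because the CSV endpoint returns human-readable column labels, the rename map in `standardize_df()` now keys on those labels instead of the JSON field names. A small illustrative check of the mapping; the column names are taken from this diff, the sample values are invented:

```python
import pandas as pd

sample = pd.DataFrame({'NPDES Permit Number': ['RI0000001'],
                       'Pollutant Load (kg/yr)': [12.5],
                       'SIC Code': ['2096']})

renamed = sample.rename(columns={'NPDES Permit Number': 'FacilityID',
                                 'Pollutant Load (kg/yr)': 'FlowAmount',
                                 'SIC Code': 'SIC'})
print(list(renamed.columns))  # ['FacilityID', 'FlowAmount', 'SIC']
```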

@@ -255,7 +232,7 @@ def combine_DMR_inventory(year, nutrient=''):
result = unpickle(filepath)
if result is None:
log.warning(f'No data found for {state}. Retrying query...')
if (query_dmr(year=year, sic_list=None,
if (query_dmr(year=year,
state_list=[state],
nutrient=nutrient).get(state) == 'success'):
result = unpickle(filepath)
@@ -289,14 +266,13 @@ def download_state_totals_validation(year):
state_csv['Total Pollutant Pounds (lb/yr) for Non-Majors']
state_totals['Unit'] = 'lb'
state_names = states_df[['states', 'state_name']]
state_totals = state_totals.merge(state_names, how='left',
on='state_name')
state_totals.drop(columns=['state_name'], inplace=True)
state_totals.dropna(subset=['states'], inplace=True)
state_totals.rename(columns={'states': 'State'}, inplace=True)
state_totals = (state_totals
.merge(state_names, how='left', on='state_name')
.drop(columns=['state_name'])
.dropna(subset=['states'])
.rename(columns={'states': 'State'}))
log.info(f'saving DMR_{year}_StateTotals.csv to {DATA_PATH}')
state_totals.to_csv(DATA_PATH.joinpath(f"DMR_{year}_StateTotals.csv"),
index=False)
state_totals.to_csv(DATA_PATH / f"DMR_{year}_StateTotals.csv", index=False)

# Update validationSets_Sources.csv
validation_dict = {'Inventory': 'DMR',
Expand Down Expand Up @@ -331,7 +307,7 @@ def validate_state_totals(df, year):
state_flow_exclude_list = flow_exclude['POLLUTANT_DESC'].to_list()

dmr_by_state = df[~df['FlowName'].isin(state_flow_exclude_list)]
dmr_by_state = dmr_by_state[dmr_by_state['PermitTypeCode'] == 'NPD']
dmr_by_state = dmr_by_state[dmr_by_state['Permit Type'] == 'NPD']

dmr_by_state = dmr_by_state[['State', 'FlowAmount']]
dmr_by_state = dmr_by_state[['State', 'FlowAmount']
@@ -383,8 +359,8 @@ def consolidate_nutrients(df, drop_list, nutrient):
flow = ['Phosphorus', 'PHOSP']
elif nutrient == 'N':
flow = ['Nitrogen', 'N']
df.loc[(df['PollutantDesc'].isin(drop_list)), ['PollutantDesc',
'PollutantCode']] = flow
df.loc[(df['Pollutant Description'].isin(drop_list)),
['Pollutant Description', 'Pollutant Code']] = flow

return df
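`consolidate_nutrients()` collapses individual nitrogen or phosphorus species onto a single aggregate flow by writing the same two-element list into both pollutant columns for every matching row. A toy illustration of that `.loc` assignment with invented rows and codes:

```python
import pandas as pd

df = pd.DataFrame({'Pollutant Description': ['Nitrate', 'Zinc'],
                   'Pollutant Code': ['A1', 'B2']})   # toy codes, not real parameter codes
drop_list = ['Nitrate']

# Rows in drop_list are relabeled as the aggregate nitrogen flow
df.loc[df['Pollutant Description'].isin(drop_list),
       ['Pollutant Description', 'Pollutant Code']] = ['Nitrogen', 'N']
print(df)
```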

@@ -431,10 +407,10 @@ def remove_duplicate_organic_enrichment(df):
if not frame['NonPrefList'].all():
frame = frame[frame['PrefList']]
to_be_concat.append(frame)
df = pd.concat(to_be_concat)
df.sort_index(inplace=True)
df.drop(columns=['PrefList', 'NonPrefList'], inplace=True)
df.reset_index(inplace=True)
df = (pd.concat(to_be_concat)
.sort_index()
.drop(columns=['PrefList', 'NonPrefList'])
.reset_index(drop=True))
return df
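Several hunks in this file make the same refactor: `inplace=True` mutation is replaced by one method chain, which reads top to bottom and leaves the intermediate frames untouched. A minimal before/after-style sketch with invented data, assuming only pandas:

```python
import pandas as pd

frame = pd.DataFrame({'FlowAmount': [3.0, 1.0, 2.0],
                      'PrefList': [True, False, True]}, index=[2, 0, 1])

# One chain, no inplace=True: each step returns a new frame
out = (frame.sort_index()
            .drop(columns=['PrefList'])
            .reset_index(drop=True))
print(out)
```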


@@ -481,10 +457,10 @@ def remove_nutrient_overlap_TRI(df, preference):
if not frame['NonPrefList'].all():
frame = frame[frame['PrefList']]
to_be_concat.append(frame)
df = pd.concat(to_be_concat)
df.sort_index(inplace=True)
df.drop(columns=['PrefList', 'NonPrefList'], inplace=True)
df.reset_index(inplace=True, drop=True)
df = (pd.concat(to_be_concat)
.sort_index()
.drop(columns=['PrefList', 'NonPrefList'])
.reset_index(drop=True))

return df

@@ -511,33 +487,15 @@ def main(**kwargs):
year = str(year)
if kwargs['Option'] == 'A':
log.info(f"Querying for {year}")

# two digit SIC codes from advanced search drop down stripped and formatted as a list
sic2 = list(pd.read_csv(DMR_DATA_PATH.joinpath('2_digit_SIC.csv'),
dtype={'SIC2': str})['SIC2'])
# Query by state, then by SIC-state where necessary
# Query by state
result_dict = query_dmr(year=year)
log.debug('possible errors: ' + ', '.join(
[s for s in result_dict.keys()
if result_dict[s] != 'success']))
state_max_error_list = [s for s in result_dict.keys()
if result_dict[s] == 'max_error']
state_no_data_list = [s for s in result_dict.keys()
if result_dict[s] == 'no_data']
if (len(state_max_error_list) == 0) and (len(state_no_data_list) == 0):
if (len(state_no_data_list) == 0):
log.info('all states succesfully downloaded')
else:
if (len(state_max_error_list) > 0):
log.error(f"Max error: {' '.join(state_max_error_list)}")
if (len(state_no_data_list) > 0):
log.error(f"No data error: {' '.join(state_no_data_list)}")
log.info('Breaking up queries further by SIC')
result_dict = query_dmr(year=year, sic_list=sic2,
state_list=state_max_error_list)
sic_state_max_error_list = [s for s in result_dict.keys()
if result_dict[s] == 'max_error']
if len(sic_state_max_error_list) > 0:
log.error(f"Max error: {' '.join(sic_state_max_error_list)}")

log.info(f"Querying nutrients for {year}")
# Query aggregated nutrients data
@@ -546,16 +504,11 @@
log.debug('possible errors: ' + ', '.join(
[s for s in result_dict.keys()
if result_dict[s] != 'success']))
state_max_error_list = [s for s in result_dict.keys()
if result_dict[s] == 'max_error']
state_no_data_list = [s for s in result_dict.keys()
if result_dict[s] == 'no_data']
if (len(state_max_error_list) == 0) and (len(state_no_data_list) == 0):
if (len(state_no_data_list) == 0):
log.info(f'all states succesfully downloaded for {nutrient}')
else:
result_dict = query_dmr(year=year, sic_list=sic2,
state_list=state_max_error_list,
nutrient=nutrient)

# write metadata
generate_metadata(year, datatype='source')

@@ -586,25 +539,26 @@ def main(**kwargs):
# Filter out nitrogen and phosphorus flows before combining
# with aggregated nutrients
dmr_nut_filtered = state_df[~state_df['FlowName'].isin(nut_drop_list)]
dmr_df = pd.concat([dmr_nut_filtered,
nutrient_agg_df]).reset_index(drop=True)
dmr_df = (pd.concat([dmr_nut_filtered, nutrient_agg_df])
.reset_index(drop=True))

# PermitTypeCode needed for state validation but not maintained
dmr_df = dmr_df.drop(columns=['PermitTypeCode'])
dmr_df = dmr_df.drop(columns=['Permit Type'])

# generate output for facility
facility_columns = ['FacilityID', 'FacilityName', 'City',
'State', 'Zip', 'Latitude', 'Longitude',
'County', 'NAICS', 'SIC'] # 'Address' not in DMR
dmr_facility = dmr_df[facility_columns].drop_duplicates()
dmr_facility['Zip'] = dmr_facility['Zip'].astype(str)
store_inventory(dmr_facility, f'DMR_{year}', 'facility')

# generate output for flow
flow_columns = ['FlowID', 'FlowName']
dmr_flow = dmr_df[flow_columns].drop_duplicates()
dmr_flow.sort_values(by=['FlowName'], inplace=True)
dmr_flow['Compartment'] = 'water'
dmr_flow['Unit'] = 'kg'
dmr_flow = (dmr_df.filter(['FlowID', 'FlowName'])
.drop_duplicates()
.sort_values(by=['FlowName'])
.assign(Compartment='water')
.assign(Unit='kg'))
store_inventory(dmr_flow, f'DMR_{year}', 'flow')
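The flow list is now built in a single chain: `filter()` keeps just the two flow columns, `drop_duplicates()` collapses repeats, and `assign()` adds the constant compartment and unit. A tiny self-contained illustration; the rows and IDs are invented stand-ins for the combined DMR dataframe:

```python
import pandas as pd

dmr_df = pd.DataFrame({'FlowID': ['50060', '50060', '00300'],
                       'FlowName': ['Chlorine', 'Chlorine', 'Oxygen, dissolved'],
                       'FlowAmount': [1.0, 2.0, 3.0]})

dmr_flow = (dmr_df.filter(['FlowID', 'FlowName'])   # keep just the flow columns
                  .drop_duplicates()
                  .sort_values(by=['FlowName'])
                  .assign(Compartment='water', Unit='kg'))
print(dmr_flow)
```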

# generate output for flowbyfacility