secondary compartment/context assignment #106

Merged
merged 21 commits on May 9, 2022
Changes from 13 commits

Commits
0ffdb96
Update setup.py
a-w-beck Dec 6, 2021
918d840
Merge branch 'release_1.0.3' into urban_rural_classify
bl-young Apr 7, 2022
b793cdf
New assign_secondary_context handler function in globals for esupy.co…
a-w-beck Apr 8, 2022
1ea0b05
Amended remainder of inplace=True pandas method calls
a-w-beck Apr 8, 2022
26e9699
Reorganized secondary context/compartment functions s.t. assignment h…
a-w-beck Apr 8, 2022
f2568cf
Migration of globals.assign_secondary_context() down into esupy.conte…
a-w-beck Apr 11, 2022
e36902c
remove unneeded esupy pip VCS install line
a-w-beck Apr 11, 2022
84f051c
adjust validation to include compartment in some cases
bl-young Apr 14, 2022
50ce8d6
fix missing cmpt_urb
bl-young Apr 14, 2022
94e800b
remove extra parameter
bl-young Apr 14, 2022
91ad470
Merge branch 'master' into urban_rural_classify
a-w-beck Apr 15, 2022
7407ceb
Update README.md
a-w-beck Apr 15, 2022
f738437
Corrected flow control :bug: preventing release height concatenation …
a-w-beck Apr 18, 2022
05be46d
Set concat_compartment()'s has_geo_pkgs arg as True in TRI.py within …
a-w-beck Apr 18, 2022
00c8006
Merge branch 'develop' into urban_rural_classify
bl-young Apr 20, 2022
ed74501
Rename column cmpt_urb --> UrbanRural; allow urban/rural assignment f…
a-w-beck May 3, 2022
74cbb7e
add `UrbanRural` to format specs
bl-young May 4, 2022
84cb0fe
return applied parameters when generating inventories, and add to met…
bl-young May 4, 2022
3c8128b
add `ready_for_review` for revised PRs
bl-young May 4, 2022
7e7cf1d
fix `FutureWarning`
bl-young May 4, 2022
1b06206
Temporarily set all compartments to the primary compartment until ove…
bl-young May 4, 2022
3 changes: 3 additions & 0 deletions README.md
@@ -130,6 +130,9 @@ or
pip install . -r requirements.txt -r rcrainfo_requirements.txt
```

### Secondary Context Installation Steps
To enable calculation and assignment of urban/rural and release height secondary contexts, refer to [esupy's README.md](https://github.com/USEPA/esupy/tree/main#installation-instructions-for-optional-geospatial-packages) for installation instructions, which may require a copy of the [`env_sec_ctxt.yaml`](https://github.com/USEPA/standardizedinventories/blob/master/env_sec_ctxt.yaml) file included here.


## Wiki

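For reference, the environment file shown in the next diff can typically be built and activated with conda (a usage sketch, assuming a working conda installation; the environment name `stewi_2ndry_ctxt` is set in the yaml itself):

```
conda env create -f env_sec_ctxt.yaml
conda activate stewi_2ndry_ctxt
```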
19 changes: 19 additions & 0 deletions env_sec_ctxt.yaml
@@ -0,0 +1,19 @@
name: stewi_2ndry_ctxt
channels:
- defaults
dependencies:
- python=3.9.7
- pip=21.2.4 # must specify else conda returns warning
- numpy=1.21.2
- pandas=1.3.4
- geopandas=0.9.0
- shapely=1.7.1
- requests=2.26.0
- appdirs=1.4.4
#- pytest=6.2.4
- spyder

- pip:
- pyarrow==6.0.1 # causes error if installed via conda default channel "OSError: NotImplemented: Support for codec 'snappy' not built"
- requests_ftp==0.3.1
- "--editable=git+https://github.com/USEPA/standardizedinventories.git@urban_rural_classify#egg=StEWI"
Collaborator comment:

we'll need to remember to update this to develop or master

2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,4 +1,4 @@
git+https://github.com/USEPA/esupy#egg=esupy
git+https://github.com/USEPA/esupy.git@develop#egg=esupy
pandas>=1.3 # Powerful data structures for data analysis, time series, and statistics.
numpy>=1.20.1 # NumPy is the fundamental package for array computing with Python
requests>=2.20 # Python HTTP for Humans; used for webservice calls
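As a usage note, a branch-pinned VCS requirement like the one above resolves with a plain pip invocation, using the same syntax as the requirements line:

```
pip install "git+https://github.com/USEPA/esupy.git@develop#egg=esupy"
```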
2 changes: 1 addition & 1 deletion setup.py
@@ -1,6 +1,6 @@
from setuptools import setup

install_requires=['esupy @ git+https://github.com/USEPA/esupy#egg=esupy',
install_requires=['esupy @ git+https://github.com/USEPA/esupy.git@urban_rural_classify#egg=esupy',
bl-young marked this conversation as resolved.
'numpy>=1.20.1',
'pandas>=1.3',
'requests>=2.20',
52 changes: 25 additions & 27 deletions stewi/NEI.py
@@ -28,19 +28,21 @@
2011
"""

import pandas as pd
import numpy as np
import argparse
import requests
import zipfile
import io
import zipfile
from pathlib import Path

import numpy as np
import pandas as pd
import requests

from esupy.processed_data_mgmt import download_from_remote
from esupy.util import strip_file_extension
from stewi.globals import DATA_PATH, write_metadata, USton_kg, lb_kg,\
log, store_inventory, config, read_source_metadata,\
paths, aggregate, get_reliability_table_for_source, set_stewi_meta
paths, aggregate, get_reliability_table_for_source, set_stewi_meta,\
assign_secondary_context
from stewi.validate import update_validationsets_sources, validate_inventory,\
write_validation_result

Expand Down Expand Up @@ -110,20 +112,11 @@ def standardize_output(year, source='Point'):
'ReliabilityScore'])

nei['Compartment'] = 'air'
"""
# Modify compartment based on stack height (ft)
nei.loc[nei['StackHeight'] < 32, 'Compartment'] = 'air/ground'
nei.loc[(nei['StackHeight'] >= 32) & (nei['StackHeight'] < 164),
'Compartment'] = 'air/low'
nei.loc[(nei['StackHeight'] >= 164) & (nei['StackHeight'] < 492),
'Compartment'] = 'air/high'
nei.loc[nei['StackHeight'] >= 492, 'Compartment'] = 'air/very high'
"""
else:
nei['DataReliability'] = 3
# add Source column
nei['Source'] = source
nei.reset_index(drop=True, inplace=True)
nei = nei.reset_index(drop=True)
return nei


@@ -182,9 +175,10 @@ def generate_national_totals(year):
df['FlowAmount'] * USton_kg)
df = df.drop(columns=['UOM'])
# sum across all facilities to create national totals
df = df.groupby(['FlowID', 'FlowName'])['FlowAmount'].sum().reset_index()
df = (df.groupby(['FlowID', 'FlowName'])['FlowAmount'].sum()
.reset_index()
.rename(columns={'FlowAmount': 'FlowAmount[kg]'}))
# save national totals to .csv
df.rename(columns={'FlowAmount': 'FlowAmount[kg]'}, inplace=True)
log.info(f'saving NEI_{year}_NationalTotals.csv to {DATA_PATH}')
df.to_csv(DATA_PATH.joinpath(f'NEI_{year}_NationalTotals.csv'),
index=False)
@@ -208,11 +202,10 @@ def validate_national_totals(nei_flowbyfacility, year):
generate_national_totals(year)
else:
log.info('using already processed national totals validation file')
nei_national_totals = pd.read_csv(DATA_PATH
.joinpath(f'NEI_{year}_NationalTotals.csv'),
header=0, dtype={"FlowAmount[kg]": float})
nei_national_totals.rename(columns={'FlowAmount[kg]': 'FlowAmount'},
inplace=True)
nei_national_totals = (
pd.read_csv(DATA_PATH.joinpath(f'NEI_{year}_NationalTotals.csv'),
header=0, dtype={"FlowAmount[kg]": float})
.rename(columns={'FlowAmount[kg]': 'FlowAmount'}))
validation_result = validate_inventory(nei_flowbyfacility,
nei_national_totals,
group_by='flow', tolerance=5.0)
@@ -253,9 +246,12 @@ def main(**kwargs):
if kwargs['Option'] == 'A':

nei_point = standardize_output(year)
nei_point = assign_secondary_context(nei_point, int(year),
'urb', 'rh', 'concat')

log.info('generating flow by facility output')
nei_flowbyfacility = aggregate(nei_point, ['FacilityID', 'FlowName'])
nei_flowbyfacility = aggregate(nei_point, ['FacilityID', 'FlowName',
'Compartment'])
store_inventory(nei_flowbyfacility, 'NEI_' + year, 'flowbyfacility')
log.debug(len(nei_flowbyfacility))
#2017: 2184786
@@ -264,7 +260,7 @@
#2011: 1840866

log.info('generating flow by SCC output')
nei_flowbyprocess = aggregate(nei_point, ['FacilityID',
nei_flowbyprocess = aggregate(nei_point, ['FacilityID', 'Compartment',
'FlowName', 'Process'])
nei_flowbyprocess['ProcessType'] = 'SCC'
store_inventory(nei_flowbyprocess, 'NEI_' + year, 'flowbyprocess')
@@ -284,9 +280,11 @@
#2011: 277

log.info('generating facility output')
facility = nei_point[['FacilityID', 'FacilityName', 'Address',
'City', 'State', 'Zip', 'Latitude',
'Longitude', 'NAICS', 'County']]
fac_fields = ['FacilityID', 'FacilityName', 'Address',
'City', 'State', 'Zip', 'Latitude',
'Longitude', 'NAICS', 'County', 'cmpt_urb']
facility = nei_point[[f for f in fac_fields
if f in nei_point.columns]]
facility = facility.drop_duplicates('FacilityID')
facility = facility.astype({'Zip': 'str'})
store_inventory(facility, 'NEI_' + year, 'facility')
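To clarify the aggregation change above: adding `Compartment` to the grouping keys keeps secondary contexts distinct in the flow-by-facility and flow-by-process outputs. A minimal sketch, assuming `aggregate` behaves like a grouped sum over `FlowAmount` (`aggregate_sketch` is a hypothetical stand-in; the real `stewi.globals.aggregate` also recalculates data reliability):

```python
import pandas as pd

def aggregate_sketch(df: pd.DataFrame, keys: list) -> pd.DataFrame:
    # Grouping by ['FacilityID', 'FlowName', 'Compartment'] keeps
    # e.g. 'air/urban' and 'air/rural' releases as separate rows
    # instead of collapsing them into one facility/flow total.
    return df.groupby(keys, as_index=False)['FlowAmount'].sum()
```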
123 changes: 61 additions & 62 deletions stewi/TRI.py
@@ -37,7 +37,7 @@
from stewi.globals import unit_convert, DATA_PATH, set_stewi_meta,\
get_reliability_table_for_source, write_metadata, url_is_alive,\
lb_kg, g_kg, config, store_inventory, log, paths, compile_source_metadata,\
read_source_metadata, aggregate
read_source_metadata, aggregate, assign_secondary_context, concat_compartment
from stewi.validate import update_validationsets_sources, validate_inventory,\
write_validation_result
import stewi.exceptions
@@ -95,8 +95,7 @@ def generate_national_totals(year):
"""
filename = TRI_DATA_PATH.joinpath(f'TRI_chem_release_{year}.csv')
df = pd.read_csv(filename, header=0)
df.replace(',', 0.0, inplace=True)
df.replace('.', 0.0, inplace=True)
df = df.replace(',', 0.0).replace('.', 0.0)
cols = ['Compartment', 'FlowName', 'Unit', 'FlowAmount']
compartments = {'air': ['Fugitive Air Emissions',
'Point Source Air Emissions'],
@@ -114,7 +113,7 @@
for column in columns:
df_aux[column] = df_aux[column].str.replace(',', '').astype('float')
df_aux['FlowAmount'] = df_aux[columns].sum(axis=1)
df_aux.rename(columns={'Chemical': 'FlowName'}, inplace=True)
df_aux = df_aux.rename(columns={'Chemical': 'FlowName'})
df_aux['Unit'] = 'Pounds'
df_aux['Compartment'] = compartment
df_National = pd.concat([df_National, df_aux], axis=0,
@@ -128,7 +127,7 @@
if df_National is None:
log.warning('Totals not generated')
return
df_National.sort_values(by=['FlowName', 'Compartment'], inplace=True)
df_National = df_National.sort_values(by=['FlowName', 'Compartment'])
log.info(f'saving TRI_{year}_NationalTotals.csv to {DATA_PATH}')
df_National.to_csv(DATA_PATH.joinpath(f'TRI_{year}_NationalTotals.csv'),
index=False)
@@ -179,10 +178,11 @@ def map_to_fedefl(df):
return mapped_df


def imp_fields(tri_fields_txt):
"""Import list of fields from TRI that are desired for LCI."""
tri_required_fields_csv = tri_fields_txt
tri_req_fields = pd.read_csv(tri_required_fields_csv, header=None)
def imp_fields(fname):
"""
Import list of fields from TRI that are desired for LCI.
"""
tri_req_fields = pd.read_csv(fname, header=None)
tri_req_fields = list(tri_req_fields[0])
return tri_req_fields

@@ -233,12 +233,6 @@ def import_TRI_by_release_type(d, year):
return tri


def strip_coln_white_space(df, coln):
"""Remove white space after some basis of estimate codes."""
df[coln] = df[coln].str.strip()
return df


def validate_national_totals(inv, TRIyear):
log.info('validating data against national totals')
filename = DATA_PATH.joinpath(f'TRI_{TRIyear}_NationalTotals.csv')
@@ -248,85 +242,66 @@ def validate_national_totals(inv, TRIyear):
tri_national_totals['FlowAmount_kg'] = 0
tri_national_totals = unit_convert(tri_national_totals, 'FlowAmount_kg',
'Unit', 'Pounds', lb_kg, 'FlowAmount')
# drop old amount and units
tri_national_totals.drop('FlowAmount', axis=1, inplace=True)
tri_national_totals.drop('Unit', axis=1, inplace=True)
# Rename cols to match reference format
tri_national_totals.rename(columns={'FlowAmount_kg': 'FlowAmount'},
inplace=True)
# drop old amount and units; rename cols to match reference format
tri_national_totals = \
(tri_national_totals.drop(columns=['FlowAmount', 'Unit'])
.rename(columns={'FlowAmount_kg': 'FlowAmount'}))
inv = map_to_fedefl(inv)
if inv is not None:
validation_result = validate_inventory(inv, tri_national_totals,
group_by='flow', tolerance=5.0)
group_by='compartment', tolerance=5.0)
write_validation_result('TRI', TRIyear, validation_result)
else:
log.warning(f'validation file for TRI_{TRIyear} does not exist. '
'Please run option B')


def Generate_TRI_files_csv(TRIyear, Files):
"""Generate TRI inventories from downloaded files."""
def Generate_TRI_files_csv(TRIyear):
"""
Generate TRI inventories from downloaded files.
:param TRIyear: str
"""
tri_required_fields = imp_fields(TRI_DATA_PATH.joinpath('TRI_required_fields.txt'))
keys = imp_fields(TRI_DATA_PATH.joinpath('TRI_keys.txt'))
values = list()
for p in range(len(keys)):
start = 13 + 2*p
end = start + 1
values.append(concat_req_field(tri_required_fields[start: end + 1]))
# Create a dictionary that had the import fields for each release
# type to use in import process
# Create dict of required fields on import for each release type
import_dict = dict_create(keys, values)
# Build the TRI DataFrame
tri = import_TRI_by_release_type(import_dict, TRIyear)
# drop NA for Amount, but leave in zeros
tri = tri.dropna(subset=['FlowAmount'])
tri = strip_coln_white_space(tri, 'Basis of Estimate')
tri = tri.dropna(subset=['FlowAmount']) # drop nan amount rows
tri['Basis of Estimate'] = tri['Basis of Estimate'].str.strip() # rm trailing spaces
# Convert to float if there are errors - be careful with this line
if tri['FlowAmount'].values.dtype != 'float64':
tri['FlowAmount'] = pd.to_numeric(tri['FlowAmount'], errors='coerce')
tri = tri[tri['FlowAmount'] != 0]
# Import reliability scores for TRI
tri_reliability_table = get_reliability_table_for_source('TRI')
tri = pd.merge(tri, tri_reliability_table, left_on='Basis of Estimate',
right_on='Code', how='left')
tri = (pd.merge(tri, tri_reliability_table, left_on='Basis of Estimate',
right_on='Code', how='left')
.drop(columns=['Basis of Estimate', 'Code']))
tri['DQI Reliability Score'] = tri['DQI Reliability Score'].fillna(value=5)
tri.drop(['Basis of Estimate', 'Code'], axis=1, inplace=True)
# Replace source info with Context
source_to_context = pd.read_csv(TRI_DATA_PATH
.joinpath('TRI_ReleaseType_to_Compartment.csv'))
source_to_context = pd.read_csv(TRI_DATA_PATH.joinpath(
'TRI_ReleaseType_to_Compartment.csv'))
tri = pd.merge(tri, source_to_context, how='left')
# Convert units to ref mass unit of kg
tri['Amount_kg'] = 0.0
tri = unit_convert(tri, 'Amount_kg', 'Unit', 'Pounds', lb_kg, 'FlowAmount')
tri = unit_convert(tri, 'Amount_kg', 'Unit', 'Grams', g_kg, 'FlowAmount')
tri.drop(columns=['FlowAmount', 'Unit'], inplace=True)
# Rename cols to match reference format
tri.rename(columns={'Amount_kg': 'FlowAmount',
'DQI Reliability Score': 'DataReliability'},
inplace=True)
tri.drop(columns=['ReleaseType'], inplace=True)
grouping_vars = ['FacilityID', 'FlowName', 'CAS', 'Compartment']
tri = aggregate(tri, grouping_vars)

validate_national_totals(tri, TRIyear)
tri = (tri.drop(columns=['FlowAmount', 'Unit', 'ReleaseType'])
.rename(columns={'Amount_kg': 'FlowAmount', # to match reference format
'DQI Reliability Score': 'DataReliability'}))

# FLOWS
flowsdf = tri[['FlowName', 'CAS', 'Compartment']
].drop_duplicates().reset_index(drop=True)
flowsdf.loc[:, 'FlowID'] = flowsdf['CAS']
store_inventory(flowsdf, 'TRI_' + TRIyear, 'flow')

# FLOW BY FACILITY
tri.drop(columns=['CAS'], inplace=True)
store_inventory(tri, 'TRI_' + TRIyear, 'flowbyfacility')

# FACILITY
# Import and handle TRI facility data
# FACILITY - import and handle TRI facility data
import_facility = tri_required_fields[0:10]
tri_facility = pd.read_csv(OUTPUT_PATH.joinpath(f'US_1a_{TRIyear}.csv'),
usecols=import_facility,
low_memory=False)
tri_facility = tri_facility.drop_duplicates(ignore_index=True)
tri_facility = (pd.read_csv(OUTPUT_PATH.joinpath(f'US_1a_{TRIyear}.csv'),
usecols=import_facility,
low_memory=False)
.drop_duplicates(ignore_index=True))
# rename columns
TRI_facility_name_crosswalk = {
'TRIFID': 'FacilityID',
Expand All @@ -340,9 +315,33 @@ def Generate_TRI_files_csv(TRIyear, Files):
'LATITUDE': 'Latitude',
'LONGITUDE': 'Longitude',
}
tri_facility.rename(columns=TRI_facility_name_crosswalk, inplace=True)
tri_facility = tri_facility.rename(columns=TRI_facility_name_crosswalk)

tri_facility = assign_secondary_context(tri_facility, int(TRIyear), 'urb')
store_inventory(tri_facility, 'TRI_' + TRIyear, 'facility')

if 'cmpt_urb' in tri_facility.columns: # given urban/rural assignment success
# merge & concat urban/rural into tri.Compartment before aggregation
tri = tri.merge(tri_facility[['FacilityID', 'cmpt_urb']].drop_duplicates(),
how='left', on='FacilityID')
tri.loc[tri['Compartment'] != 'air', 'cmpt_urb'] = 'unspecified'
tri = concat_compartment(tri, 'urb')

grouping_vars = ['FacilityID', 'FlowName', 'CAS', 'Compartment']
tri = aggregate(tri, grouping_vars)

validate_national_totals(tri, TRIyear)

# FLOWS
flows = (tri[['FlowName', 'CAS', 'Compartment']].drop_duplicates()
.reset_index(drop=True))
flows['FlowID'] = flows['CAS']
store_inventory(flows, 'TRI_' + TRIyear, 'flow')

# FLOW BY FACILITY
fbf = tri.drop(columns=['CAS'])
store_inventory(fbf, 'TRI_' + TRIyear, 'flowbyfacility')


def generate_metadata(year, files, datatype='inventory'):
"""Get metadata and writes to .json."""
@@ -428,7 +427,7 @@ def main(**kwargs):

elif kwargs['Option'] == 'C':
log.info(f'generating TRI inventory from files for {year}')
Generate_TRI_files_csv(year, files)
Generate_TRI_files_csv(year)
generate_metadata(year, files, datatype='inventory')


Expand Down
2 changes: 1 addition & 1 deletion stewi/egrid.py
@@ -301,7 +301,7 @@ def validate_eGRID(year, flowbyfac):
# drop old unit
egrid_national_totals.drop('Unit', axis=1, inplace=True)
validation_result = validate_inventory(flowbyfac, egrid_national_totals,
group_by='flow', tolerance=5.0)
group_by='compartment', tolerance=5.0)
write_validation_result('eGRID', year, validation_result)


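On the `group_by='compartment'` switch visible in this and the NEI/TRI diffs: a hypothetical sketch of what grouping validation totals by compartment (rather than by flow alone) might look like, assuming both frames carry `FlowName`, `Compartment`, and `FlowAmount` columns (not the actual `stewi.validate.validate_inventory`):

```python
import pandas as pd

def compare_totals_sketch(inv: pd.DataFrame, ref: pd.DataFrame,
                          tolerance: float = 5.0) -> pd.DataFrame:
    # Sum each frame by flow AND compartment, then flag groups whose
    # inventory total deviates from the reference by more than tolerance %.
    keys = ['FlowName', 'Compartment']
    merged = (inv.groupby(keys, as_index=False)['FlowAmount'].sum()
                 .merge(ref.groupby(keys, as_index=False)['FlowAmount'].sum(),
                        on=keys, how='outer', suffixes=('_inv', '_ref')))
    merged['pct_diff'] = (100 * (merged['FlowAmount_inv'] - merged['FlowAmount_ref'])
                          / merged['FlowAmount_ref'])
    merged['within_tolerance'] = merged['pct_diff'].abs() <= tolerance
    return merged
```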