first draft of splitting NWSS signals #1946
base: main
Changes from all commits
@@ -6,12 +6,12 @@
from sodapy import Socrata

from .constants import (
    SIGNALS,
    METRIC_SIGNALS,
    METRIC_DATES,
    SAMPLE_SITE_NAMES,
    PROVIDER_NORMS,
    SIG_DIGITS,
    NEWLINE,
    SIGNALS,
    TYPE_DICT,
    TYPE_DICT_METRIC,
)
@@ -34,47 +34,86 @@ def sig_digit_round(value, n_digits):
    return result


def construct_typedicts():
    """Create the type conversion dictionary for both dataframes."""
    # basic type conversion
    type_dict = {key: float for key in SIGNALS}
    type_dict["timestamp"] = "datetime64[ns]"
    # metric type conversion
    signals_dict_metric = {key: float for key in METRIC_SIGNALS}
    metric_dates_dict = {key: "datetime64[ns]" for key in METRIC_DATES}
    type_dict_metric = {**metric_dates_dict, **signals_dict_metric, **SAMPLE_SITE_NAMES}
    return type_dict, type_dict_metric


def warn_string(df, type_dict):
    """Format the warning string."""
    return f"""
def convert_df_type(df, type_dict, logger):
    """Convert types and warn if there are unexpected columns."""
    try:
        df = df.astype(type_dict)
    except KeyError as exc:
        raise KeyError(
            f"""
            Expected column(s) missed, the dataset schema may
            have changed. Please investigate and amend the code.

            Columns needed:
            {NEWLINE.join(sorted(type_dict.keys()))}

            Columns available:
            {NEWLINE.join(sorted(df.columns))}
            expected={''.join(sorted(type_dict.keys()))}
            received={''.join(sorted(df.columns))}
            """
        ) from exc
    if new_columns := set(df.columns) - set(type_dict.keys()):
        logger.info("New columns found in NWSS dataset.", new_columns=new_columns)
    return df
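The schema-check pattern in `convert_df_type` can be exercised standalone. The sketch below is a simplified stand-in, not the PR's code: the column names (`pcr_conc`, `extra`) and the inline type dict are invented for illustration, replacing the module's `TYPE_DICT` constant and structured logger.

```python
import pandas as pd

def convert_types(df, type_dict):
    """Cast known columns; raise if one is missing; report unexpected extras."""
    missing = set(type_dict) - set(df.columns)
    if missing:
        # mirrors the KeyError raised when the upstream schema drops a column
        raise KeyError(f"expected column(s) missing: {sorted(missing)}")
    df = df.astype(type_dict)
    # columns the schema does not know about (logged, not fatal, in the PR)
    new_columns = set(df.columns) - set(type_dict)
    return df, new_columns

df = pd.DataFrame({"pcr_conc": ["1.5"], "timestamp": ["2023-01-01"], "extra": [0]})
typed, new_cols = convert_types(df, {"pcr_conc": float, "timestamp": "datetime64[ns]"})
# typed["pcr_conc"] is float, typed["timestamp"] is datetime64[ns], new_cols == {"extra"}
```

Passing a dict to `astype` casts only the listed columns, which is why extra columns survive the cast and can be detected afterwards.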
def reformat(df, df_metric):
    """Combine df_metric and df

    Move population and METRIC_SIGNAL columns from df_metric to df, and rename
    date_start to timestamp.
    """
def add_population(df, df_metric):
    """Add the population column from df_metric to df, and rename some columns."""
    # drop unused columns from df_metric
    df_population = df_metric.loc[:, ["key_plot_id", "date_start", "population_served"]]
    df_metric_core = df_metric.loc[
        :, ["key_plot_id", "date_end", "population_served", *METRIC_SIGNALS]
    ]
    # get matching keys
    df_population = df_population.rename(columns={"date_start": "timestamp"})
    df_population = df_population.set_index(["key_plot_id", "timestamp"])
    df_metric_core = df_metric_core.rename(columns={"date_end": "timestamp"})
    df_metric_core = df_metric_core.set_index(["key_plot_id", "timestamp"])
    df = df.set_index(["key_plot_id", "timestamp"])
    df = df.sort_index()

    df = df.join(df_population)
    df = df.join(df_metric_core)
    df = df.reset_index()
    return df
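The MultiIndex join above can be sketched with toy data. Everything below is invented for illustration (the `key_plot_id` value, dates, `pcr_conc` column, and population figure); it shows only the align-and-join mechanic, not the PR's full column set.

```python
import pandas as pd

# Toy frames standing in for the concentration (df) and metric (df_metric) data.
df = pd.DataFrame({
    "key_plot_id": ["CDC_VERILY_al_1", "CDC_VERILY_al_1"],
    "timestamp": pd.to_datetime(["2023-01-01", "2023-01-02"]),
    "pcr_conc": [1.0, 2.0],
})
df_metric = pd.DataFrame({
    "key_plot_id": ["CDC_VERILY_al_1"],
    "date_start": pd.to_datetime(["2023-01-01"]),
    "population_served": [50000],
})

# rename the metric date column so both frames share the same keys,
# then align on a (site, date) MultiIndex and join
df_population = df_metric.rename(columns={"date_start": "timestamp"})
df_population = df_population.set_index(["key_plot_id", "timestamp"])
df = df.set_index(["key_plot_id", "timestamp"]).sort_index()
df = df.join(df_population).reset_index()
# the 2023-01-02 row gets NaN population_served (filled later by ffill)
```

Dates present only in the concentration frame come out of the join with NaN population, which is exactly the gap the later `ffill` call papers over.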
def pull_nwss_data(socrata_token: str):
def add_identifier_columns(df):
    """Parse `key_plot_id` to create several key columns.

    `key_plot_id` is of format "<provider>_<state>_<plant id>_wwtp_id".
    We split by `_` and put each resulting item into its own column.
    Add columns to get more detail than key_plot_id gives; specifically, state, and
    `provider_normalization`, which gives the signal identifier.
    """
    df = df.copy()
    # two word characters surrounded by _; for example, it matches "_al_"
    # and returns just the two letters "al"
    df["state"] = df.key_plot_id.str.extract(r"_(\w\w)_")
    # anything followed by the state as described just above.
    # For example "CDC_VERILY_al" pulls out "CDC_VERILY"
    df["provider"] = df.key_plot_id.str.extract(r"(.*)_[a-z]{2}_")
    df["signal_name"] = df.provider + "_" + df.normalization
    return df
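The regex parsing can be checked in isolation. The sample IDs and normalization values below are made up (patterned on the "<provider>_<state>_…" format the docstring describes); `expand=False` is used here so `str.extract` returns the single capture group as a Series.

```python
import pandas as pd

df = pd.DataFrame({
    "key_plot_id": ["CDC_VERILY_al_1234_wwtp_id", "NWSS_mi_5678_wwtp_id"],
    "normalization": ["flow-population", "microbial"],
})
# state: two word characters between underscores ("_al_" -> "al")
df["state"] = df.key_plot_id.str.extract(r"_(\w\w)_", expand=False)
# provider: everything before the two-letter state
# ("CDC_VERILY_al_..." -> "CDC_VERILY")
df["provider"] = df.key_plot_id.str.extract(r"(.*)_[a-z]{2}_", expand=False)
df["signal_name"] = df.provider + "_" + df.normalization
# state: ["al", "mi"]; provider: ["CDC_VERILY", "NWSS"]
```

Note the greedy `(.*)` in the provider pattern: it matches up to the last two-letter segment, which is what lets multi-underscore providers like "CDC_VERILY" come through whole.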
def check_expected_signals(df):
    """Make sure that there aren't any new signals that we need to add."""
    # compare with existing column name checker
    # also add a note about handling errors
    unique_provider_norms = (
        df[["provider", "normalization"]]
        .drop_duplicates()
        .sort_values(["provider", "normalization"])
        .reset_index(drop=True)
    )
    for provider, normalization in zip(
        unique_provider_norms["provider"], unique_provider_norms["normalization"]
    ):
        if normalization not in PROVIDER_NORMS[provider]:
            raise ValueError(
                f"There are new providers and/or norms. "
                f"The full new set is\n{unique_provider_norms}"
            )
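The allow-list check follows a common pattern that can be demonstrated standalone. The `PROVIDER_NORMS` dict and all data below are invented stand-ins for the module constant, and `check_expected` is a simplified version of `check_expected_signals`:

```python
import pandas as pd

# made-up stand-in for the PROVIDER_NORMS constant: provider -> allowed norms
PROVIDER_NORMS = {"CDC_VERILY": ["flow-population", "microbial"]}

def check_expected(df):
    """Raise if any (provider, normalization) pair is not in the allow-list."""
    pairs = df[["provider", "normalization"]].drop_duplicates()
    for provider, normalization in zip(pairs["provider"], pairs["normalization"]):
        if normalization not in PROVIDER_NORMS.get(provider, []):
            raise ValueError(f"new provider/norm: {provider}, {normalization}")

ok = pd.DataFrame({"provider": ["CDC_VERILY"], "normalization": ["microbial"]})
check_expected(ok)  # passes silently
```

A frame containing an unlisted pair (e.g. normalization `"unknown"`) would raise `ValueError`, which is the intended loud failure when NWSS adds a new signal.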
def pull_nwss_data(token: str, logger):
    """Pull the latest NWSS Wastewater data, and conform it into a dataset.

    The output dataset has:
@@ -87,48 +126,50 @@ def pull_nwss_data(socrata_token: str):
    ----------
    socrata_token: str
        My App Token for pulling the NWSS data (could be the same as the nchs data)
    test_file: Optional[str]
        When not null, name of file from which to read test data
    logger: the structured logger

    Returns
    -------
    pd.DataFrame
        Dataframe as described above.
    """
    # concentration key types
    type_dict, type_dict_metric = construct_typedicts()

    # Pull data from Socrata API
    client = Socrata("data.cdc.gov", socrata_token)
    client = Socrata("data.cdc.gov", token)
    results_concentration = client.get("g653-rqe2", limit=10**10)
    results_metric = client.get("2ew6-ywp6", limit=10**10)
Comment on lines +137 to 139

issue: URL and GET IDs should be saved as constants (with descriptive names). I know this isn't part of this PR, so worth opening an issue for.

do you mean

Yep
    df_metric = pd.DataFrame.from_records(results_metric)
    df_concentration = pd.DataFrame.from_records(results_concentration)
    df_concentration = df_concentration.rename(columns={"date": "timestamp"})

    try:
        df_concentration = df_concentration.astype(type_dict)
    except KeyError as exc:
        raise ValueError(warn_string(df_concentration, type_dict)) from exc
    # Schema checks.
    df_concentration = convert_df_type(df_concentration, TYPE_DICT, logger)
    df_metric = convert_df_type(df_metric, TYPE_DICT_METRIC, logger)

    try:
        df_metric = df_metric.astype(type_dict_metric)
    except KeyError as exc:
        raise ValueError(warn_string(df_metric, type_dict_metric)) from exc
    # Drop sites without a normalization scheme.
    df = df_concentration[~df_concentration["normalization"].isna()]

    # pull 2 letter state labels out of the key_plot_id labels
    df_concentration["state"] = df_concentration.key_plot_id.str.extract(r"_(\w\w)_")
    # Pull 2 letter state labels out of the key_plot_id labels.
    df = add_identifier_columns(df)

    # move population and metric signals over to df
    df = reformat(df, df_metric)
    # round out some of the numeric noise that comes from smoothing
    df_concentration[SIGNALS[0]] = sig_digit_round(
        df_concentration[SIGNALS[0]], SIG_DIGITS
    )
    for signal in [*SIGNALS, *METRIC_SIGNALS]:
        df[signal] = sig_digit_round(df[signal], SIG_DIGITS)

    df_concentration = add_population(df_concentration, df_metric)
    # if there are population NA's, assume the previous value is accurate (most
    # likely introduced by dates only present in one and not the other; even
    # otherwise, best to assume some value rather than break the data)
Comment on lines 160 to 162

question: RE "dates only present in one and not the other", is this referring to the

It is; only one of them actually has population data listed (the metric data vs the concentration). There's actually a ton of signal metadata present in the metric data not in the concentration data.
    df_concentration.population_served = df_concentration.population_served.ffill()

    keep_columns = ["timestamp", "state", "population_served"]
    return df_concentration[SIGNALS + keep_columns]
    df.population_served = df.population_served.ffill()

issue:

good catch; I'm adding a

    check_expected_signals(df)

    keep_columns = [
        *SIGNALS,
        *METRIC_SIGNALS,
        "timestamp",
        "state",
        "population_served",
        "normalization",
        "provider",
    ]
    return df[keep_columns]
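The forward-fill step described in the comment above ("assume the previous value is accurate") can be sketched on a toy series; the population values below are invented:

```python
import pandas as pd

# population_served after the join, with gaps from dates present in
# only one of the two datasets
s = pd.Series([50000.0, None, None, 60000.0])
filled = s.ffill()
# [50000.0, 50000.0, 50000.0, 60000.0]
```

Each NaN inherits the most recent non-missing value; a leading NaN would stay NaN, since there is nothing earlier to propagate.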
question: What is the distinction between these two signal sets?

suggestion: I find these signal names hard to parse. I'd prefer longer, more descriptive names. Our final signal names will include source and normalization method, though, so maybe they'd get too long?
Worth more thought. Have you checked these names with Roni yet?

They're from two separate socrata APIs.
I haven't run the names by Roni, that's a good idea. The names are based on mirroring the original dataset's names.

It might have been left out of the "how to make an indicator" doc, but officially we're supposed to check signal names with Roni.

"question" is resolved, renaming is still open.