From 2043eaa505dc0b0ce3c01d9180c7f855ca8775c9 Mon Sep 17 00:00:00 2001 From: dsweber2 Date: Thu, 22 Feb 2024 16:03:05 -0800 Subject: [PATCH 01/18] first draft of splitting signals --- nwss_wastewater/delphi_nwss/constants.py | 12 ++- nwss_wastewater/delphi_nwss/pull.py | 98 +++++++++++++++++++----- nwss_wastewater/delphi_nwss/run.py | 70 +++++++++++------ nwss_wastewater/tests/test_pull.py | 35 ++++++++- nwss_wastewater/tests/test_run.py | 55 ++++++++----- 5 files changed, 203 insertions(+), 67 deletions(-) diff --git a/nwss_wastewater/delphi_nwss/constants.py b/nwss_wastewater/delphi_nwss/constants.py index 5e8eb2aeb..99af24d5f 100644 --- a/nwss_wastewater/delphi_nwss/constants.py +++ b/nwss_wastewater/delphi_nwss/constants.py @@ -12,6 +12,16 @@ SIGNALS = ["pcr_conc_smoothed"] METRIC_SIGNALS = ["detect_prop_15d", "percentile", "ptc_15d"] +PROVIDER_NORMS = { + "provider": ["CDC_VERILY", "CDC_VERILY", "NWSS", "NWSS", "WWS"], + "normalization": [ + "flow-population", + "microbial", + "flow-population", + "microbial", + "microbial", + ], +} METRIC_DATES = ["date_start", "date_end"] SAMPLE_SITE_NAMES = { "wwtp_jurisdiction": "category", @@ -24,6 +34,6 @@ "sampling_prior": bool, "sample_location_specify": float, } -SIG_DIGITS = 7 +SIG_DIGITS = 4 NEWLINE = "\n" diff --git a/nwss_wastewater/delphi_nwss/pull.py b/nwss_wastewater/delphi_nwss/pull.py index f4b781e12..affe40a65 100644 --- a/nwss_wastewater/delphi_nwss/pull.py +++ b/nwss_wastewater/delphi_nwss/pull.py @@ -7,6 +7,7 @@ from .constants import ( SIGNALS, + PROVIDER_NORMS, METRIC_SIGNALS, METRIC_DATES, SAMPLE_SITE_NAMES, @@ -28,7 +29,7 @@ def sig_digit_round(value, n_digits): sign_mask = value < 0 value[sign_mask] *= -1 exponent = np.ceil(np.log10(value)) - result = 10**exponent * np.round(value * 10 ** (-exponent), n_digits) + result = 10 ** exponent * np.round(value * 10 ** (-exponent), n_digits) result[sign_mask] *= -1 result[zero_mask] = in_value[zero_mask] return result @@ -60,21 +61,66 @@ def warn_string(df, type_dict): """ -def add_population(df, df_metric): - """Add the population column from df_metric to df, and rename some columns.""" +def reformat(df, df_metric): + """Add columns from df_metric to df, and rename some columns. + + Specifically the population and METRIC_SIGNAL columns, and renames date_start to timestamp. + """ # drop unused columns from df_metric - df_population = df_metric.loc[:, ["key_plot_id", "date_start", "population_served"]] + df_metric_core = df_metric.loc[ + :, ["key_plot_id", "date_start", "population_served", *METRIC_SIGNALS] + ] # get matching keys - df_population = df_population.rename(columns={"date_start": "timestamp"}) - df_population = df_population.set_index(["key_plot_id", "timestamp"]) + df_metric_core = df_metric_core.rename(columns={"date_start": "timestamp"}) + df_metric_core = df_metric_core.set_index(["key_plot_id", "timestamp"]) df = df.set_index(["key_plot_id", "timestamp"]) - df = df.join(df_population) + df = df.join(df_metric_core) df = df.reset_index() return df -def pull_nwss_data(socrata_token: str): +def drop_unnormalized(df): + """Drop unnormalized. + + mutate `df` to no longer have rows where the normalization scheme isn't actually identified, + as we can't classify the kind of signal + """ + return df[~df["normalization"].isna()] + + +def add_identifier_columns(df): + """Add identifier columns. 
+ + Add columns to get more detail than key_plot_id gives; + specifically, state, and `provider_normalization`, which gives the signal identifier + """ + df["state"] = df.key_plot_id.str.extract( + r"_(\w\w)_" + ) # a pair of alphanumerics surrounded by _ + df["provider"] = df.key_plot_id.str.extract( + r"(.*)_[a-z]{2}_" + ) # anything followed by state ^ + df["signal_name"] = df.provider + "_" + df.normalization + + +def check_endpoints(df): + """Make sure that there aren't any new signals that we need to add.""" + # compare with existing column name checker + # also add a note about handling errors + unique_provider_norms = ( + df[["provider", "normalization"]] + .drop_duplicates() + .sort_values(["provider", "normalization"]) + .reset_index(drop=True) + ) + if not unique_provider_norms.equals(pd.DataFrame(PROVIDER_NORMS)): + raise ValueError( + f"There are new providers and/or norms. They are\n{unique_provider_norms}" + ) + + +def pull_nwss_data(token: str): """Pull the latest NWSS Wastewater data, and conforms it into a dataset. The output dataset has: @@ -95,13 +141,15 @@ def pull_nwss_data(socrata_token: str): pd.DataFrame Dataframe as described above. """ + # Constants + keep_columns = [*SIGNALS, *METRIC_SIGNALS] # concentration key types type_dict, type_dict_metric = construct_typedicts() # Pull data from Socrata API - client = Socrata("data.cdc.gov", socrata_token) - results_concentration = client.get("g653-rqe2", limit=10**10) - results_metric = client.get("2ew6-ywp6", limit=10**10) + client = Socrata("data.cdc.gov", token) + results_concentration = client.get("g653-rqe2", limit=10 ** 10) + results_metric = client.get("2ew6-ywp6", limit=10 ** 10) df_metric = pd.DataFrame.from_records(results_metric) df_concentration = pd.DataFrame.from_records(results_concentration) df_concentration = df_concentration.rename(columns={"date": "timestamp"}) @@ -116,19 +164,29 @@ def pull_nwss_data(socrata_token: str): except KeyError as exc: raise ValueError(warn_string(df_metric, type_dict_metric)) from exc + # if the normalization scheme isn't recorded, why is it even included as a sample site? 
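+    # (without a recorded normalization we cannot classify which kind of signal the value belongs to,
+    # so drop_unnormalized removes those rows entirely)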
+ df = drop_unnormalized(df_concentration) # pull 2 letter state labels out of the key_plot_id labels - df_concentration["state"] = df_concentration.key_plot_id.str.extract(r"_(\w\w)_") + add_identifier_columns(df) + # move population and metric signals over to df + df = reformat(df, df_metric) # round out some of the numeric noise that comes from smoothing - df_concentration[SIGNALS[0]] = sig_digit_round( - df_concentration[SIGNALS[0]], SIG_DIGITS - ) + for signal in [*SIGNALS, *METRIC_SIGNALS]: + df[signal] = sig_digit_round(df[signal], SIG_DIGITS) - df_concentration = add_population(df_concentration, df_metric) # if there are population NA's, assume the previous value is accurate (most # likely introduced by dates only present in one and not the other; even # otherwise, best to assume some value rather than break the data) - df_concentration.population_served = df_concentration.population_served.ffill() - - keep_columns = ["timestamp", "state", "population_served"] - return df_concentration[SIGNALS + keep_columns] + df.population_served = df.population_served.ffill() + check_endpoints(df) + keep_columns.extend( + [ + "timestamp", + "state", + "population_served", + "normalization", + "provider", + ] + ) + return df[keep_columns] diff --git a/nwss_wastewater/delphi_nwss/run.py b/nwss_wastewater/delphi_nwss/run.py index 378849ba5..5a21ec49f 100644 --- a/nwss_wastewater/delphi_nwss/run.py +++ b/nwss_wastewater/delphi_nwss/run.py @@ -29,7 +29,7 @@ from delphi_utils import S3ArchiveDiffer, get_structured_logger, create_export_csv from delphi_utils.nancodes import add_default_nancodes -from .constants import GEOS, SIGNALS +from .constants import GEOS, METRIC_SIGNALS, PROVIDER_NORMS, SIGNALS from .pull import pull_nwss_data @@ -50,7 +50,7 @@ def generate_weights(df, column_aggregating="pcr_conc_smoothed"): """ # set the weight of places with na's to zero df[f"relevant_pop_{column_aggregating}"] = ( - df["population_served"] * df[column_aggregating].notna() + df["population_served"] * np.abs(df[column_aggregating]).notna() ) # generate the weighted version df[f"weighted_{column_aggregating}"] = ( @@ -126,38 +126,60 @@ def run_module(params): export_dir = params["common"]["export_dir"] socrata_token = params["indicator"]["socrata_token"] if "archive" in params: - daily_arch_diff = S3ArchiveDiffer( + arch_diff = S3ArchiveDiffer( params["archive"]["cache_dir"], export_dir, params["archive"]["bucket_name"], - "nchs_mortality", + "nwss_wastewater", params["archive"]["aws_credentials"], ) - daily_arch_diff.update_cache() + arch_diff.update_cache() run_stats = [] ## build the base version of the signal at the most detailed geo level you can get. 
## compute stuff here or farm out to another function or file df_pull = pull_nwss_data(socrata_token) ## aggregate - for sensor in SIGNALS: - df = df_pull.copy() - # add weighed column - df = generate_weights(df, sensor) - - for geo in GEOS: - logger.info("Generating signal and exporting to CSV", metric=sensor) - if geo == "nation": - agg_df = weighted_nation_sum(df, sensor) - else: - agg_df = weighted_state_sum(df, geo, sensor) - # add se, sample_size, and na codes - agg_df = add_needed_columns(agg_df) - # actual export - dates = create_export_csv( - agg_df, geo_res=geo, export_dir=export_dir, sensor=sensor - ) - if len(dates) > 0: - run_stats.append((max(dates), len(dates))) + # iterate over the providers and the normalizations that they specifically provide + for (provider, normalization) in zip( + PROVIDER_NORMS["provider"], PROVIDER_NORMS["normalization"] + ): + # copy by only taking the relevant subsection + df_prov_norm = df_pull[ + (df_pull.provider == provider) & (df_pull.normalization == normalization) + ] + df_prov_norm = df_prov_norm.drop(["provider", "normalization"], axis=1) + for sensor in [*SIGNALS, *METRIC_SIGNALS]: + full_sensor_name = sensor + "_" + provider + "_" + normalization + df_prov_norm = df_prov_norm.rename(columns={sensor: full_sensor_name}) + # add weighed column + df = generate_weights(df_prov_norm, full_sensor_name) + for geo in GEOS: + logger.info( + "Generating signal and exporting to CSV", metric=full_sensor_name + ) + if geo == "nation": + agg_df = weighted_nation_sum(df, full_sensor_name) + else: + agg_df = weighted_state_sum(df, geo, full_sensor_name) + # add se, sample_size, and na codes + agg_df = add_needed_columns(agg_df) + # actual export + dates = create_export_csv( + agg_df, geo_res=geo, export_dir=export_dir, sensor=full_sensor_name + ) + if "archive" in params: + _, common_diffs, new_files = arch_diff.diff_exports() + to_archive = [ + f for f, diff in common_diffs.items() if diff is not None + ] + to_archive += new_files + _, fails = arch_diff.archive_exports(to_archive) + succ_common_diffs = { + f: diff for f, diff in common_diffs.items() if f not in fails + } + arch_diff.filter_exports(succ_common_diffs) + if len(dates) > 0: + run_stats.append((max(dates), len(dates))) ## log this indicator run logging(start_time, run_stats, logger) diff --git a/nwss_wastewater/tests/test_pull.py b/nwss_wastewater/tests/test_pull.py index 8a2edbd23..431878993 100644 --- a/nwss_wastewater/tests/test_pull.py +++ b/nwss_wastewater/tests/test_pull.py @@ -10,9 +10,11 @@ import pandas.api.types as ptypes from delphi_nwss.pull import ( + add_identifier_columns, + check_endpoints, construct_typedicts, sig_digit_round, - add_population, + reformat, warn_string, ) import numpy as np @@ -111,6 +113,15 @@ def test_column_conversions_metric(): assert all(ptypes.is_numeric_dtype(converted[flo].dtype) for flo in float_typed) +def test_warn_string(): + type_dict, type_dict_metric = construct_typedicts() + df_conc = pd.read_csv("test_data/conc_data.csv") + assert ( + warn_string(df_conc, type_dict) + == "\nExpected column(s) missed, The dataset schema may\nhave changed. 
Please investigate and amend the code.\n\nColumns needed:\npcr_conc_smoothed\ntimestamp\n\nColumns available:\nUnnamed: 0\nkey_plot_id\ndate\npcr_conc_smoothed\nnormalization\n" + ) + + def test_formatting(): type_dict, type_dict_metric = construct_typedicts() df_metric = pd.read_csv("test_data/metric_data.csv", index_col=0) @@ -132,6 +143,28 @@ def test_formatting(): "pcr_conc_smoothed", "normalization", "population_served", + "detect_prop_15d", + "percentile", + "ptc_15d", ] ) ) + + +def test_identifier_colnames(): + test_df = pd.read_csv("test_data/conc_data.csv", index_col=0) + add_identifier_columns(test_df) + assert all(test_df.state.unique() == ["ak", "tn"]) + assert all(test_df.provider.unique() == ["CDC_BIOBOT", "WWS"]) + # the only cases where the signal name is wrong is when normalization isn't defined + assert all( + (test_df.signal_name == test_df.provider + "_" + test_df.normalization) + | (test_df.normalization.isna()) + ) + assert all( + ( + test_df.signal_name.unique() + == ["CDC_BIOBOT_flow-population", np.nan, "WWS_microbial"] + ) + | (pd.isna(test_df.signal_name.unique())) + ) diff --git a/nwss_wastewater/tests/test_run.py b/nwss_wastewater/tests/test_run.py index 218e1f8d0..4270ad87d 100644 --- a/nwss_wastewater/tests/test_run.py +++ b/nwss_wastewater/tests/test_run.py @@ -13,6 +13,7 @@ from delphi_nwss.constants import GEOS, SIGNALS from delphi_nwss.run import ( + add_needed_columns, generate_weights, sum_all_nan, weighted_state_sum, @@ -57,28 +58,22 @@ def test_weight_generation(): def test_weighted_state_sum(): dataFrame = pd.DataFrame( { - "state": [ - "al", - "al", - "ca", - "ca", - "nd", - ], - "timestamp": np.zeros(5), - "a": [1, 2, 3, 4, 12], - "b": [5, 6, 7, np.nan, np.nan], - "population_served": [10, 5, 8, 1, 3], + "state": ["al", "al", "ca", "ca", "nd", "me", "me"], + "timestamp": np.zeros(7), + "a": [1, 2, 3, 4, 12, -2, 2], + "b": [5, 6, 7, np.nan, np.nan, -1, -2], + "population_served": [10, 5, 8, 1, 3, 1, 2], } ) weighted = generate_weights(dataFrame, column_aggregating="b") agg = weighted_state_sum(weighted, "state", "b") expected_agg = pd.DataFrame( { - "timestamp": np.zeros(3), - "geo_id": ["al", "ca", "nd"], - "relevant_pop_b": [10 + 5, 8 + 0, 0], - "weighted_b": [5 * 10 + 6 * 5, 7 * 8 + 0, np.nan], - "val": [80 / 15, 56 / 8, np.nan], + "timestamp": np.zeros(4), + "geo_id": ["al", "ca", "me", "nd"], + "relevant_pop_b": [10 + 5, 8 + 0, 1 + 2, 0], + "weighted_b": [5 * 10 + 6 * 5, 7 * 8 + 0, 1 * -1 + -2 * 2, np.nan], + "val": [80 / 15, 56 / 8, -5 / 3, np.nan], } ) assert_frame_equal(agg, expected_agg) @@ -87,11 +82,11 @@ def test_weighted_state_sum(): agg_a = weighted_state_sum(weighted, "state", "a") expected_agg_a = pd.DataFrame( { - "timestamp": np.zeros(3), - "geo_id": ["al", "ca", "nd"], - "relevant_pop_a": [10 + 5, 8 + 1, 3], - "weighted_a": [1 * 10 + 2 * 5, 3 * 8 + 1 * 4, 12 * 3], - "val": [20 / 15, 28 / 9, 36 / 3], + "timestamp": np.zeros(4), + "geo_id": ["al", "ca", "me", "nd"], + "relevant_pop_a": [10 + 5, 8 + 1, 1 + 2, 3], + "weighted_a": [1 * 10 + 2 * 5, 3 * 8 + 1 * 4, -2 * 1 + 2 * 2, 12 * 3], + "val": [20 / 15, 28 / 9, (-2 * 1 + 2 * 2) / 3, 36 / 3], } ) assert_frame_equal(agg_a, expected_agg_a) @@ -125,3 +120,21 @@ def test_weighted_nation_sum(): } ) assert_frame_equal(agg, expected_agg) + + +def test_adding_cols(): + df = pd.DataFrame({"val": [0.0, np.nan], "timestamp": np.zeros(2)}) + modified = add_needed_columns(df) + modified + expected_df = pd.DataFrame( + { + "val": [0.0, np.nan], + "timestamp": np.zeros(2), + "se": [np.nan, np.nan], 
+ "sample_size": [np.nan, np.nan], + "missing_val": [0, 5], + "missing_se": [1, 1], + "missing_sample_size": [1, 1], + } + ) + assert_frame_equal(modified, expected_df) From 38713cb411ab8f7574e6c8e083ca86c7c53463da Mon Sep 17 00:00:00 2001 From: dsweber2 Date: Thu, 14 Mar 2024 13:20:30 -0700 Subject: [PATCH 02/18] formatting and date is `date_end`, not `date_start` --- nwss_wastewater/delphi_nwss/pull.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/nwss_wastewater/delphi_nwss/pull.py b/nwss_wastewater/delphi_nwss/pull.py index affe40a65..1f2386612 100644 --- a/nwss_wastewater/delphi_nwss/pull.py +++ b/nwss_wastewater/delphi_nwss/pull.py @@ -29,7 +29,7 @@ def sig_digit_round(value, n_digits): sign_mask = value < 0 value[sign_mask] *= -1 exponent = np.ceil(np.log10(value)) - result = 10 ** exponent * np.round(value * 10 ** (-exponent), n_digits) + result = 10**exponent * np.round(value * 10 ** (-exponent), n_digits) result[sign_mask] *= -1 result[zero_mask] = in_value[zero_mask] return result @@ -71,7 +71,7 @@ def reformat(df, df_metric): :, ["key_plot_id", "date_start", "population_served", *METRIC_SIGNALS] ] # get matching keys - df_metric_core = df_metric_core.rename(columns={"date_start": "timestamp"}) + df_metric_core = df_metric_core.rename(columns={"date_end": "timestamp"}) df_metric_core = df_metric_core.set_index(["key_plot_id", "timestamp"]) df = df.set_index(["key_plot_id", "timestamp"]) @@ -148,8 +148,8 @@ def pull_nwss_data(token: str): # Pull data from Socrata API client = Socrata("data.cdc.gov", token) - results_concentration = client.get("g653-rqe2", limit=10 ** 10) - results_metric = client.get("2ew6-ywp6", limit=10 ** 10) + results_concentration = client.get("g653-rqe2", limit=10**10) + results_metric = client.get("2ew6-ywp6", limit=10**10) df_metric = pd.DataFrame.from_records(results_metric) df_concentration = pd.DataFrame.from_records(results_concentration) df_concentration = df_concentration.rename(columns={"date": "timestamp"}) From 2c96adc4c34b737fd8a218e51cc009cd229ec33c Mon Sep 17 00:00:00 2001 From: dsweber2 Date: Fri, 29 Mar 2024 11:01:20 -0700 Subject: [PATCH 03/18] formatting and date_start -> date_end --- nwss_wastewater/delphi_nwss/pull.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/nwss_wastewater/delphi_nwss/pull.py b/nwss_wastewater/delphi_nwss/pull.py index 1f2386612..4bc4aba23 100644 --- a/nwss_wastewater/delphi_nwss/pull.py +++ b/nwss_wastewater/delphi_nwss/pull.py @@ -29,7 +29,7 @@ def sig_digit_round(value, n_digits): sign_mask = value < 0 value[sign_mask] *= -1 exponent = np.ceil(np.log10(value)) - result = 10**exponent * np.round(value * 10 ** (-exponent), n_digits) + result = 10 ** exponent * np.round(value * 10 ** (-exponent), n_digits) result[sign_mask] *= -1 result[zero_mask] = in_value[zero_mask] return result @@ -68,7 +68,7 @@ def reformat(df, df_metric): """ # drop unused columns from df_metric df_metric_core = df_metric.loc[ - :, ["key_plot_id", "date_start", "population_served", *METRIC_SIGNALS] + :, ["key_plot_id", "date_end", "population_served", *METRIC_SIGNALS] ] # get matching keys df_metric_core = df_metric_core.rename(columns={"date_end": "timestamp"}) @@ -148,8 +148,8 @@ def pull_nwss_data(token: str): # Pull data from Socrata API client = Socrata("data.cdc.gov", token) - results_concentration = client.get("g653-rqe2", limit=10**10) - results_metric = client.get("2ew6-ywp6", limit=10**10) + results_concentration = client.get("g653-rqe2", limit=10 ** 10) + results_metric = 
client.get("2ew6-ywp6", limit=10 ** 10) df_metric = pd.DataFrame.from_records(results_metric) df_concentration = pd.DataFrame.from_records(results_concentration) df_concentration = df_concentration.rename(columns={"date": "timestamp"}) From 6f683094ae651072de5027701000cd172f149dda Mon Sep 17 00:00:00 2001 From: dsweber2 Date: Fri, 29 Mar 2024 12:51:35 -0700 Subject: [PATCH 04/18] minor fixes to tests --- nwss_wastewater/tests/test_pull.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nwss_wastewater/tests/test_pull.py b/nwss_wastewater/tests/test_pull.py index 431878993..5ed60b504 100644 --- a/nwss_wastewater/tests/test_pull.py +++ b/nwss_wastewater/tests/test_pull.py @@ -118,7 +118,7 @@ def test_warn_string(): df_conc = pd.read_csv("test_data/conc_data.csv") assert ( warn_string(df_conc, type_dict) - == "\nExpected column(s) missed, The dataset schema may\nhave changed. Please investigate and amend the code.\n\nColumns needed:\npcr_conc_smoothed\ntimestamp\n\nColumns available:\nUnnamed: 0\nkey_plot_id\ndate\npcr_conc_smoothed\nnormalization\n" + == "\nExpected column(s) missed, The dataset schema may\nhave changed. Please investigate and amend the code.\n\nColumns needed:\npcr_conc_smoothed\ntimestamp\n\nColumns available:\nUnnamed: 0\ndate\nkey_plot_id\nnormalization\npcr_conc_smoothed\n" ) @@ -132,7 +132,7 @@ def test_formatting(): df = df.rename(columns={"date": "timestamp"}) df = df.astype(type_dict) - df_formatted = add_population(df, df_metric) + df_formatted = reformat(df, df_metric) assert all( df_formatted.columns From c7e300e6f5f13860f64f540cb0be7da95d79f835 Mon Sep 17 00:00:00 2001 From: Dmitry Shemetov Date: Fri, 19 Apr 2024 18:14:47 -0700 Subject: [PATCH 05/18] lint: minor code clean to nwss --- nwss_wastewater/delphi_nwss/constants.py | 31 ++++--- nwss_wastewater/delphi_nwss/pull.py | 110 +++++++++-------------- nwss_wastewater/delphi_nwss/run.py | 5 +- nwss_wastewater/tests/test_pull.py | 54 ++--------- nwss_wastewater/tests/test_run.py | 18 +--- 5 files changed, 68 insertions(+), 150 deletions(-) diff --git a/nwss_wastewater/delphi_nwss/constants.py b/nwss_wastewater/delphi_nwss/constants.py index 99af24d5f..b04206d14 100644 --- a/nwss_wastewater/delphi_nwss/constants.py +++ b/nwss_wastewater/delphi_nwss/constants.py @@ -22,18 +22,23 @@ "microbial", ], } -METRIC_DATES = ["date_start", "date_end"] -SAMPLE_SITE_NAMES = { - "wwtp_jurisdiction": "category", - "wwtp_id": int, - "reporting_jurisdiction": "category", - "sample_location": "category", - "county_names": "category", - "county_fips": "category", - "population_served": float, - "sampling_prior": bool, - "sample_location_specify": float, -} SIG_DIGITS = 4 -NEWLINE = "\n" +TYPE_DICT = {key: float for key in SIGNALS} +TYPE_DICT.update({"timestamp": "datetime64[ns]"}) +TYPE_DICT_METRIC = {key: float for key in METRIC_SIGNALS} +TYPE_DICT_METRIC.update({key: "datetime64[ns]" for key in ["date_start", "date_end"]}) +# Sample site names +TYPE_DICT_METRIC.update( + { + "wwtp_jurisdiction": "category", + "wwtp_id": int, + "reporting_jurisdiction": "category", + "sample_location": "category", + "county_names": "category", + "county_fips": "category", + "population_served": float, + "sampling_prior": bool, + "sample_location_specify": float, + } +) diff --git a/nwss_wastewater/delphi_nwss/pull.py b/nwss_wastewater/delphi_nwss/pull.py index 4bc4aba23..53ade5829 100644 --- a/nwss_wastewater/delphi_nwss/pull.py +++ b/nwss_wastewater/delphi_nwss/pull.py @@ -9,10 +9,9 @@ SIGNALS, PROVIDER_NORMS, METRIC_SIGNALS, 
- METRIC_DATES, - SAMPLE_SITE_NAMES, SIG_DIGITS, - NEWLINE, + TYPE_DICT, + TYPE_DICT_METRIC, ) @@ -29,40 +28,14 @@ def sig_digit_round(value, n_digits): sign_mask = value < 0 value[sign_mask] *= -1 exponent = np.ceil(np.log10(value)) - result = 10 ** exponent * np.round(value * 10 ** (-exponent), n_digits) + result = 10**exponent * np.round(value * 10 ** (-exponent), n_digits) result[sign_mask] *= -1 result[zero_mask] = in_value[zero_mask] return result -def construct_typedicts(): - """Create the type conversion dictionary for both dataframes.""" - # basic type conversion - type_dict = {key: float for key in SIGNALS} - type_dict["timestamp"] = "datetime64[ns]" - # metric type conversion - signals_dict_metric = {key: float for key in METRIC_SIGNALS} - metric_dates_dict = {key: "datetime64[ns]" for key in METRIC_DATES} - type_dict_metric = {**metric_dates_dict, **signals_dict_metric, **SAMPLE_SITE_NAMES} - return type_dict, type_dict_metric - - -def warn_string(df, type_dict): - """Format the warning string.""" - return f""" -Expected column(s) missed, The dataset schema may -have changed. Please investigate and amend the code. - -Columns needed: -{NEWLINE.join(sorted(type_dict.keys()))} - -Columns available: -{NEWLINE.join(sorted(df.columns))} -""" - - def reformat(df, df_metric): - """Add columns from df_metric to df, and rename some columns. + """Add columns from df_metric to df, and rename some columns. Specifically the population and METRIC_SIGNAL columns, and renames date_start to timestamp. """ @@ -80,27 +53,16 @@ def reformat(df, df_metric): return df -def drop_unnormalized(df): - """Drop unnormalized. - - mutate `df` to no longer have rows where the normalization scheme isn't actually identified, - as we can't classify the kind of signal - """ - return df[~df["normalization"].isna()] - - def add_identifier_columns(df): """Add identifier columns. Add columns to get more detail than key_plot_id gives; specifically, state, and `provider_normalization`, which gives the signal identifier """ - df["state"] = df.key_plot_id.str.extract( - r"_(\w\w)_" - ) # a pair of alphanumerics surrounded by _ - df["provider"] = df.key_plot_id.str.extract( - r"(.*)_[a-z]{2}_" - ) # anything followed by state ^ + # a pair of alphanumerics surrounded by _ + df["state"] = df.key_plot_id.str.extract(r"_(\w\w)_") + # anything followed by state ^ + df["provider"] = df.key_plot_id.str.extract(r"(.*)_[a-z]{2}_") df["signal_name"] = df.provider + "_" + df.normalization @@ -120,7 +82,7 @@ def check_endpoints(df): ) -def pull_nwss_data(token: str): +def pull_nwss_data(token: str, logger): """Pull the latest NWSS Wastewater data, and conforms it into a dataset. The output dataset has: @@ -141,32 +103,39 @@ def pull_nwss_data(token: str): pd.DataFrame Dataframe as described above. """ - # Constants - keep_columns = [*SIGNALS, *METRIC_SIGNALS] - # concentration key types - type_dict, type_dict_metric = construct_typedicts() - # Pull data from Socrata API client = Socrata("data.cdc.gov", token) - results_concentration = client.get("g653-rqe2", limit=10 ** 10) - results_metric = client.get("2ew6-ywp6", limit=10 ** 10) + results_concentration = client.get("g653-rqe2", limit=10**10) + results_metric = client.get("2ew6-ywp6", limit=10**10) df_metric = pd.DataFrame.from_records(results_metric) df_concentration = pd.DataFrame.from_records(results_concentration) df_concentration = df_concentration.rename(columns={"date": "timestamp"}) + # Schema checks. 
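+    # astype() raises a KeyError when an expected column is missing; re-raise it with the expected vs.
+    # received column lists so schema changes are easy to spot, while new, unexpected columns are only logged.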
try: - df_concentration = df_concentration.astype(type_dict) + df_concentration = df_concentration.astype(TYPE_DICT) except KeyError as exc: - raise ValueError(warn_string(df_concentration, type_dict)) from exc + raise KeyError( + f"Expected column(s) missed. Schema may have changed. expected={sorted(TYPE_DICT.keys())} received={sorted(df_concentration.columns)}" + ) from exc + + if new_columns := set(df_concentration.columns) - set(TYPE_DICT.keys()): + logger.info("New columns found in NWSS dataset.", new_columns=new_columns) try: - df_metric = df_metric.astype(type_dict_metric) + df_metric = df_metric.astype(TYPE_DICT_METRIC) except KeyError as exc: - raise ValueError(warn_string(df_metric, type_dict_metric)) from exc + raise KeyError( + f"Expected column(s) missed. Schema may have changed. expected={sorted(TYPE_DICT_METRIC.keys())} received={sorted(df_metric.columns)}" + ) from exc + + if new_columns := set(df_metric.columns) - set(TYPE_DICT_METRIC.keys()): + logger.info("New columns found in NWSS dataset.", new_columns=new_columns) - # if the normalization scheme isn't recorded, why is it even included as a sample site? - df = drop_unnormalized(df_concentration) - # pull 2 letter state labels out of the key_plot_id labels + # Drop sites without a normalization scheme. + df = df_concentration[~df_concentration["normalization"].isna()] + + # Pull 2 letter state labels out of the key_plot_id labels. add_identifier_columns(df) # move population and metric signals over to df @@ -180,13 +149,14 @@ def pull_nwss_data(token: str): # otherwise, best to assume some value rather than break the data) df.population_served = df.population_served.ffill() check_endpoints(df) - keep_columns.extend( - [ - "timestamp", - "state", - "population_served", - "normalization", - "provider", - ] - ) + + keep_columns = [ + *SIGNALS, + *METRIC_SIGNALS, + "timestamp", + "state", + "population_served", + "normalization", + "provider", + ] return df[keep_columns] diff --git a/nwss_wastewater/delphi_nwss/run.py b/nwss_wastewater/delphi_nwss/run.py index 5a21ec49f..3355bb254 100644 --- a/nwss_wastewater/delphi_nwss/run.py +++ b/nwss_wastewater/delphi_nwss/run.py @@ -21,6 +21,7 @@ - "bucket_name: str, name of S3 bucket to read/write - "cache_dir": str, directory of locally cached data """ + import time from datetime import datetime @@ -138,10 +139,10 @@ def run_module(params): run_stats = [] ## build the base version of the signal at the most detailed geo level you can get. 
## compute stuff here or farm out to another function or file - df_pull = pull_nwss_data(socrata_token) + df_pull = pull_nwss_data(socrata_token, logger) ## aggregate # iterate over the providers and the normalizations that they specifically provide - for (provider, normalization) in zip( + for provider, normalization in zip( PROVIDER_NORMS["provider"], PROVIDER_NORMS["normalization"] ): # copy by only taking the relevant subsection diff --git a/nwss_wastewater/tests/test_pull.py b/nwss_wastewater/tests/test_pull.py index 5ed60b504..6b7bffa24 100644 --- a/nwss_wastewater/tests/test_pull.py +++ b/nwss_wastewater/tests/test_pull.py @@ -1,22 +1,12 @@ -from datetime import datetime, date -import json -from unittest.mock import patch -import tempfile -import os -import time -from datetime import datetime - import pandas as pd import pandas.api.types as ptypes from delphi_nwss.pull import ( add_identifier_columns, - check_endpoints, - construct_typedicts, sig_digit_round, reformat, - warn_string, ) +from delphi_nwss.constants import TYPE_DICT, TYPE_DICT_METRIC import numpy as np @@ -31,32 +21,10 @@ def test_sig_digit(): ).all() -def test_column_type_dicts(): - type_dict, type_dict_metric = construct_typedicts() - assert type_dict == {"pcr_conc_smoothed": float, "timestamp": "datetime64[ns]"} - assert type_dict_metric == { - "date_start": "datetime64[ns]", - "date_end": "datetime64[ns]", - "detect_prop_15d": float, - "percentile": float, - "ptc_15d": float, - "wwtp_jurisdiction": "category", - "wwtp_id": int, - "reporting_jurisdiction": "category", - "sample_location": "category", - "county_names": "category", - "county_fips": "category", - "population_served": float, - "sampling_prior": bool, - "sample_location_specify": float, - } - - def test_column_conversions_concentration(): - type_dict, type_dict_metric = construct_typedicts() df = pd.read_csv("test_data/conc_data.csv", index_col=0) df = df.rename(columns={"date": "timestamp"}) - converted = df.astype(type_dict) + converted = df.astype(TYPE_DICT) assert all( converted.columns == pd.Index(["key_plot_id", "timestamp", "pcr_conc_smoothed", "normalization"]) @@ -66,9 +34,8 @@ def test_column_conversions_concentration(): def test_column_conversions_metric(): - type_dict, type_dict_metric = construct_typedicts() df = pd.read_csv("test_data/metric_data.csv", index_col=0) - converted = df.astype(type_dict_metric) + converted = df.astype(TYPE_DICT_METRIC) assert all( converted.columns == pd.Index( @@ -113,24 +80,13 @@ def test_column_conversions_metric(): assert all(ptypes.is_numeric_dtype(converted[flo].dtype) for flo in float_typed) -def test_warn_string(): - type_dict, type_dict_metric = construct_typedicts() - df_conc = pd.read_csv("test_data/conc_data.csv") - assert ( - warn_string(df_conc, type_dict) - == "\nExpected column(s) missed, The dataset schema may\nhave changed. 
Please investigate and amend the code.\n\nColumns needed:\npcr_conc_smoothed\ntimestamp\n\nColumns available:\nUnnamed: 0\ndate\nkey_plot_id\nnormalization\npcr_conc_smoothed\n" - ) - - def test_formatting(): - type_dict, type_dict_metric = construct_typedicts() df_metric = pd.read_csv("test_data/metric_data.csv", index_col=0) - df_metric = df_metric.astype(type_dict_metric) + df_metric = df_metric.astype(TYPE_DICT_METRIC) - type_dict, type_dict_metric = construct_typedicts() df = pd.read_csv("test_data/conc_data.csv", index_col=0) df = df.rename(columns={"date": "timestamp"}) - df = df.astype(type_dict) + df = df.astype(TYPE_DICT) df_formatted = reformat(df, df_metric) diff --git a/nwss_wastewater/tests/test_run.py b/nwss_wastewater/tests/test_run.py index 4270ad87d..161d92556 100644 --- a/nwss_wastewater/tests/test_run.py +++ b/nwss_wastewater/tests/test_run.py @@ -1,17 +1,7 @@ -from datetime import datetime, date -import json -from unittest.mock import patch -import tempfile -import os -import time -from datetime import datetime - import numpy as np import pandas as pd from pandas.testing import assert_frame_equal -from delphi_utils import S3ArchiveDiffer, get_structured_logger, create_export_csv, Nans -from delphi_nwss.constants import GEOS, SIGNALS from delphi_nwss.run import ( add_needed_columns, generate_weights, @@ -23,13 +13,9 @@ def test_sum_all_nan(): """Check that sum_all_nan returns NaN iff everything is a NaN""" - no_nans = np.array([3, 5]) - assert sum_all_nan(no_nans) == 8 - partial_nan = np.array([np.nan, 3, 5]) + assert sum_all_nan(np.array([3, 5])) == 8 assert np.isclose(sum_all_nan([np.nan, 3, 5]), 8) - - oops_all_nans = np.array([np.nan, np.nan]) - assert np.isnan(oops_all_nans).all() + assert np.isnan(np.array([np.nan, np.nan])).all() def test_weight_generation(): From f9b068778b05af6d216c71b5b0766b82f90a1a84 Mon Sep 17 00:00:00 2001 From: Dmitry Shemetov Date: Tue, 23 Apr 2024 11:13:07 -0700 Subject: [PATCH 06/18] lint: remove NEWLINE constant, consistent style --- nchs_mortality/delphi_nchs_mortality/constants.py | 5 ----- nchs_mortality/delphi_nchs_mortality/pull.py | 6 +++--- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/nchs_mortality/delphi_nchs_mortality/constants.py b/nchs_mortality/delphi_nchs_mortality/constants.py index 800444e58..2bdd78419 100644 --- a/nchs_mortality/delphi_nchs_mortality/constants.py +++ b/nchs_mortality/delphi_nchs_mortality/constants.py @@ -25,8 +25,3 @@ "prop" ] INCIDENCE_BASE = 100000 - -# this is necessary as a delimiter in the f-string expressions we use to -# construct detailed error reports -# (https://www.python.org/dev/peps/pep-0498/#escape-sequences) -NEWLINE = "\n" diff --git a/nchs_mortality/delphi_nchs_mortality/pull.py b/nchs_mortality/delphi_nchs_mortality/pull.py index 18bbfd59a..5a96d9a1f 100644 --- a/nchs_mortality/delphi_nchs_mortality/pull.py +++ b/nchs_mortality/delphi_nchs_mortality/pull.py @@ -9,7 +9,7 @@ from delphi_utils.geomap import GeoMapper -from .constants import METRICS, RENAME, NEWLINE +from .constants import METRICS, RENAME def standardize_columns(df): """Rename columns to comply with a standard set. @@ -90,10 +90,10 @@ def pull_nchs_mortality_data(socrata_token: str, test_file: Optional[str] = None have changed. Please investigate and amend the code. 
Columns needed: -{NEWLINE.join(type_dict.keys())} +{'\n'.join(type_dict.keys())} Columns available: -{NEWLINE.join(df.columns)} +{'\n'.join(df.columns)} """) from exc df = df[keep_columns + ["timestamp", "state"]].set_index("timestamp") From d337b4e0408b21cf1d4920cb089c3686d27fcf2b Mon Sep 17 00:00:00 2001 From: dsweber2 Date: Tue, 23 Apr 2024 14:46:49 -0500 Subject: [PATCH 07/18] encapsulate repeated type conversion --- nwss_wastewater/delphi_nwss/pull.py | 41 ++++++++++++++++------------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/nwss_wastewater/delphi_nwss/pull.py b/nwss_wastewater/delphi_nwss/pull.py index 53ade5829..f6a3e71d3 100644 --- a/nwss_wastewater/delphi_nwss/pull.py +++ b/nwss_wastewater/delphi_nwss/pull.py @@ -34,6 +34,26 @@ def sig_digit_round(value, n_digits): return result +def convert_df_type(df, logger): + """convert types and warn if there are unexpected columns""" + try: + df = df.astype(TYPE_DICT) + except KeyError as exc: + raise KeyError( + f""" +Expected column(s) missed, The dataset schema may +have changed. Please investigate and amend the code. + +expected={NEWLINE.join(sorted(type_dict.keys()))} + +received={NEWLINE.join(sorted(df.columns))} +""" + ) from exc + if new_columns := set(df.columns) - set(TYPE_DICT.keys()): + logger.info("New columns found in NWSS dataset.", new_columns=new_columns) + return df + + def reformat(df, df_metric): """Add columns from df_metric to df, and rename some columns. @@ -112,25 +132,8 @@ def pull_nwss_data(token: str, logger): df_concentration = df_concentration.rename(columns={"date": "timestamp"}) # Schema checks. - try: - df_concentration = df_concentration.astype(TYPE_DICT) - except KeyError as exc: - raise KeyError( - f"Expected column(s) missed. Schema may have changed. expected={sorted(TYPE_DICT.keys())} received={sorted(df_concentration.columns)}" - ) from exc - - if new_columns := set(df_concentration.columns) - set(TYPE_DICT.keys()): - logger.info("New columns found in NWSS dataset.", new_columns=new_columns) - - try: - df_metric = df_metric.astype(TYPE_DICT_METRIC) - except KeyError as exc: - raise KeyError( - f"Expected column(s) missed. Schema may have changed. expected={sorted(TYPE_DICT_METRIC.keys())} received={sorted(df_metric.columns)}" - ) from exc - - if new_columns := set(df_metric.columns) - set(TYPE_DICT_METRIC.keys()): - logger.info("New columns found in NWSS dataset.", new_columns=new_columns) + df_concentration = convert_df_type(df_concentration, logger) + df_metric = convert_df_type(df_metric, logger) # Drop sites without a normalization scheme. 
df = df_concentration[~df_concentration["normalization"].isna()] From c968604d6adcc933f9e3b7ff94474223bee91f11 Mon Sep 17 00:00:00 2001 From: dsweber2 Date: Tue, 23 Apr 2024 14:52:51 -0500 Subject: [PATCH 08/18] happy linter --- nwss_wastewater/delphi_nwss/pull.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/nwss_wastewater/delphi_nwss/pull.py b/nwss_wastewater/delphi_nwss/pull.py index f6a3e71d3..8cebaa64a 100644 --- a/nwss_wastewater/delphi_nwss/pull.py +++ b/nwss_wastewater/delphi_nwss/pull.py @@ -28,28 +28,29 @@ def sig_digit_round(value, n_digits): sign_mask = value < 0 value[sign_mask] *= -1 exponent = np.ceil(np.log10(value)) - result = 10**exponent * np.round(value * 10 ** (-exponent), n_digits) + result = 10 ** exponent * np.round(value * 10 ** (-exponent), n_digits) result[sign_mask] *= -1 result[zero_mask] = in_value[zero_mask] return result -def convert_df_type(df, logger): - """convert types and warn if there are unexpected columns""" +def convert_df_type(df, type_dict, logger): + """Convert types and warn if there are unexpected columns.""" try: - df = df.astype(TYPE_DICT) + df = df.astype(type_dict) except KeyError as exc: + newline = "\n" raise KeyError( f""" Expected column(s) missed, The dataset schema may have changed. Please investigate and amend the code. -expected={NEWLINE.join(sorted(type_dict.keys()))} +expected={newline.join(sorted(type_dict.keys()))} -received={NEWLINE.join(sorted(df.columns))} +received={newline.join(sorted(df.columns))} """ ) from exc - if new_columns := set(df.columns) - set(TYPE_DICT.keys()): + if new_columns := set(df.columns) - set(type_dict.keys()): logger.info("New columns found in NWSS dataset.", new_columns=new_columns) return df @@ -125,15 +126,15 @@ def pull_nwss_data(token: str, logger): """ # Pull data from Socrata API client = Socrata("data.cdc.gov", token) - results_concentration = client.get("g653-rqe2", limit=10**10) - results_metric = client.get("2ew6-ywp6", limit=10**10) + results_concentration = client.get("g653-rqe2", limit=10 ** 10) + results_metric = client.get("2ew6-ywp6", limit=10 ** 10) df_metric = pd.DataFrame.from_records(results_metric) df_concentration = pd.DataFrame.from_records(results_concentration) df_concentration = df_concentration.rename(columns={"date": "timestamp"}) # Schema checks. - df_concentration = convert_df_type(df_concentration, logger) - df_metric = convert_df_type(df_metric, logger) + df_concentration = convert_df_type(df_concentration, TYPE_DICT, logger) + df_metric = convert_df_type(df_metric, TYPE_DICT_METRIC, logger) # Drop sites without a normalization scheme. df = df_concentration[~df_concentration["normalization"].isna()] From 55a821cfc8015448849db16a8abed44a7cabb81c Mon Sep 17 00:00:00 2001 From: Dmitry Shemetov Date: Tue, 23 Apr 2024 16:33:51 -0700 Subject: [PATCH 09/18] fix: newline shenanigans --- nchs_mortality/delphi_nchs_mortality/pull.py | 7 ++----- nwss_wastewater/delphi_nwss/pull.py | 6 ++---- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/nchs_mortality/delphi_nchs_mortality/pull.py b/nchs_mortality/delphi_nchs_mortality/pull.py index 5a96d9a1f..38fa95ffe 100644 --- a/nchs_mortality/delphi_nchs_mortality/pull.py +++ b/nchs_mortality/delphi_nchs_mortality/pull.py @@ -89,11 +89,8 @@ def pull_nchs_mortality_data(socrata_token: str, test_file: Optional[str] = None Expected column(s) missed, The dataset schema may have changed. Please investigate and amend the code. 
-Columns needed: -{'\n'.join(type_dict.keys())} - -Columns available: -{'\n'.join(df.columns)} +received={''.join(type_dict.keys())} +expected={''.join(df.columns)} """) from exc df = df[keep_columns + ["timestamp", "state"]].set_index("timestamp") diff --git a/nwss_wastewater/delphi_nwss/pull.py b/nwss_wastewater/delphi_nwss/pull.py index 8cebaa64a..edaa8f948 100644 --- a/nwss_wastewater/delphi_nwss/pull.py +++ b/nwss_wastewater/delphi_nwss/pull.py @@ -39,15 +39,13 @@ def convert_df_type(df, type_dict, logger): try: df = df.astype(type_dict) except KeyError as exc: - newline = "\n" raise KeyError( f""" Expected column(s) missed, The dataset schema may have changed. Please investigate and amend the code. -expected={newline.join(sorted(type_dict.keys()))} - -received={newline.join(sorted(df.columns))} +expected={''.join(sorted(type_dict.keys()))} +received={''.join(sorted(df.columns))} """ ) from exc if new_columns := set(df.columns) - set(type_dict.keys()): From 61277b290081712643b8f45daf6603b209842823 Mon Sep 17 00:00:00 2001 From: Dmitry Shemetov Date: Tue, 23 Apr 2024 16:35:13 -0700 Subject: [PATCH 10/18] fix: jank --- nchs_mortality/delphi_nchs_mortality/pull.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nchs_mortality/delphi_nchs_mortality/pull.py b/nchs_mortality/delphi_nchs_mortality/pull.py index 38fa95ffe..f22a73904 100644 --- a/nchs_mortality/delphi_nchs_mortality/pull.py +++ b/nchs_mortality/delphi_nchs_mortality/pull.py @@ -89,8 +89,8 @@ def pull_nchs_mortality_data(socrata_token: str, test_file: Optional[str] = None Expected column(s) missed, The dataset schema may have changed. Please investigate and amend the code. -received={''.join(type_dict.keys())} -expected={''.join(df.columns)} +expected={''.join(type_dict.keys())} +received={''.join(df.columns)} """) from exc df = df[keep_columns + ["timestamp", "state"]].set_index("timestamp") From 402f2ab552d76ad6a13507f7e6cb445965ee7d1a Mon Sep 17 00:00:00 2001 From: dsweber2 Date: Tue, 14 May 2024 12:00:04 -0500 Subject: [PATCH 11/18] moving over to geomapper's generic function --- _delphi_utils_python/DEVELOP.md | 2 + nwss_wastewater/delphi_nwss/run.py | 74 +++++--------------- nwss_wastewater/tests/test_run.py | 105 +---------------------------- 3 files changed, 21 insertions(+), 160 deletions(-) diff --git a/_delphi_utils_python/DEVELOP.md b/_delphi_utils_python/DEVELOP.md index 2407e29a8..20d41166c 100644 --- a/_delphi_utils_python/DEVELOP.md +++ b/_delphi_utils_python/DEVELOP.md @@ -54,3 +54,5 @@ When you are finished, the virtual environment can be deactivated and deactivate rm -r env ``` +## Releasing the module +If you have made enough changes that it warrants updating [the PyPi project](https://pypi.org/project/delphi-utils/), currently this is done as part of merging from `main` to `prod`. 
diff --git a/nwss_wastewater/delphi_nwss/run.py b/nwss_wastewater/delphi_nwss/run.py index 3355bb254..7c048b140 100644 --- a/nwss_wastewater/delphi_nwss/run.py +++ b/nwss_wastewater/delphi_nwss/run.py @@ -26,62 +26,18 @@ from datetime import datetime import numpy as np -import pandas as pd -from delphi_utils import S3ArchiveDiffer, get_structured_logger, create_export_csv +from delphi_utils import ( + GeoMapper, + S3ArchiveDiffer, + get_structured_logger, + create_export_csv, +) from delphi_utils.nancodes import add_default_nancodes from .constants import GEOS, METRIC_SIGNALS, PROVIDER_NORMS, SIGNALS from .pull import pull_nwss_data -def sum_all_nan(x): - """Return a normal sum unless everything is NaN, then return that.""" - all_nan = np.isnan(x).all() - if all_nan: - return np.nan - return np.nansum(x) - - -def generate_weights(df, column_aggregating="pcr_conc_smoothed"): - """ - Weigh column_aggregating by population. - - generate the relevant population amounts, and create a weighted but - unnormalized column, derived from `column_aggregating` - """ - # set the weight of places with na's to zero - df[f"relevant_pop_{column_aggregating}"] = ( - df["population_served"] * np.abs(df[column_aggregating]).notna() - ) - # generate the weighted version - df[f"weighted_{column_aggregating}"] = ( - df[column_aggregating] * df[f"relevant_pop_{column_aggregating}"] - ) - return df - - -def weighted_state_sum(df: pd.DataFrame, geo: str, sensor: str): - """Sum sensor, weighted by population for non NA's, grouped by state.""" - agg_df = df.groupby(["timestamp", geo]).agg( - {f"relevant_pop_{sensor}": "sum", f"weighted_{sensor}": sum_all_nan} - ) - agg_df["val"] = agg_df[f"weighted_{sensor}"] / agg_df[f"relevant_pop_{sensor}"] - agg_df = agg_df.reset_index() - agg_df = agg_df.rename(columns={"state": "geo_id"}) - return agg_df - - -def weighted_nation_sum(df: pd.DataFrame, sensor: str): - """Sum sensor, weighted by population for non NA's.""" - agg_df = df.groupby("timestamp").agg( - {f"relevant_pop_{sensor}": "sum", f"weighted_{sensor}": sum_all_nan} - ) - agg_df["val"] = agg_df[f"weighted_{sensor}"] / agg_df[f"relevant_pop_{sensor}"] - agg_df = agg_df.reset_index() - agg_df["geo_id"] = "us" - return agg_df - - def add_needed_columns(df, col_names=None): """Short util to add expected columns not found in the dataset.""" if col_names is None: @@ -140,7 +96,7 @@ def run_module(params): ## build the base version of the signal at the most detailed geo level you can get. 
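    ## for NWSS the most detailed level available is the individual sampling site (key_plot_id),
    ## which the population-weighted aggregation below rolls up to state and nation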
## compute stuff here or farm out to another function or file df_pull = pull_nwss_data(socrata_token, logger) - ## aggregate + geomapper = GeoMapper() # iterate over the providers and the normalizations that they specifically provide for provider, normalization in zip( PROVIDER_NORMS["provider"], PROVIDER_NORMS["normalization"] @@ -153,16 +109,22 @@ def run_module(params): for sensor in [*SIGNALS, *METRIC_SIGNALS]: full_sensor_name = sensor + "_" + provider + "_" + normalization df_prov_norm = df_prov_norm.rename(columns={sensor: full_sensor_name}) - # add weighed column - df = generate_weights(df_prov_norm, full_sensor_name) for geo in GEOS: logger.info( "Generating signal and exporting to CSV", metric=full_sensor_name ) if geo == "nation": - agg_df = weighted_nation_sum(df, full_sensor_name) - else: - agg_df = weighted_state_sum(df, geo, full_sensor_name) + df_prov_norm["nation"] = "us" + agg_df = geomapper.aggregate_by_weighted_sum( + df_prov_norm, + geo, + full_sensor_name, + "timestamp", + "population_served", + ) + agg_df = agg_df.rename( + columns={geo: "geo_id", f"weighted_{full_sensor_name}": "val"} + ) # add se, sample_size, and na codes agg_df = add_needed_columns(agg_df) # actual export diff --git a/nwss_wastewater/tests/test_run.py b/nwss_wastewater/tests/test_run.py index 161d92556..dc5740140 100644 --- a/nwss_wastewater/tests/test_run.py +++ b/nwss_wastewater/tests/test_run.py @@ -2,110 +2,7 @@ import pandas as pd from pandas.testing import assert_frame_equal -from delphi_nwss.run import ( - add_needed_columns, - generate_weights, - sum_all_nan, - weighted_state_sum, - weighted_nation_sum, -) - - -def test_sum_all_nan(): - """Check that sum_all_nan returns NaN iff everything is a NaN""" - assert sum_all_nan(np.array([3, 5])) == 8 - assert np.isclose(sum_all_nan([np.nan, 3, 5]), 8) - assert np.isnan(np.array([np.nan, np.nan])).all() - - -def test_weight_generation(): - dataFrame = pd.DataFrame( - { - "a": [1, 2, 3, 4, np.nan], - "b": [5, 6, 7, 8, 9], - "population_served": [10, 5, 8, 1, 3], - } - ) - weighted = generate_weights(dataFrame, column_aggregating="a") - weighted_by_hand = pd.DataFrame( - { - "a": [1, 2, 3, 4, np.nan], - "b": [5, 6, 7, 8, 9], - "population_served": [10, 5, 8, 1, 3], - "relevant_pop_a": [10, 5, 8, 1, 0], - "weighted_a": [10.0, 2 * 5.0, 3 * 8, 4.0 * 1, np.nan * 0], - } - ) - assert_frame_equal(weighted, weighted_by_hand) - # operations are in-place - assert_frame_equal(weighted, dataFrame) - - -def test_weighted_state_sum(): - dataFrame = pd.DataFrame( - { - "state": ["al", "al", "ca", "ca", "nd", "me", "me"], - "timestamp": np.zeros(7), - "a": [1, 2, 3, 4, 12, -2, 2], - "b": [5, 6, 7, np.nan, np.nan, -1, -2], - "population_served": [10, 5, 8, 1, 3, 1, 2], - } - ) - weighted = generate_weights(dataFrame, column_aggregating="b") - agg = weighted_state_sum(weighted, "state", "b") - expected_agg = pd.DataFrame( - { - "timestamp": np.zeros(4), - "geo_id": ["al", "ca", "me", "nd"], - "relevant_pop_b": [10 + 5, 8 + 0, 1 + 2, 0], - "weighted_b": [5 * 10 + 6 * 5, 7 * 8 + 0, 1 * -1 + -2 * 2, np.nan], - "val": [80 / 15, 56 / 8, -5 / 3, np.nan], - } - ) - assert_frame_equal(agg, expected_agg) - - weighted = generate_weights(dataFrame, column_aggregating="a") - agg_a = weighted_state_sum(weighted, "state", "a") - expected_agg_a = pd.DataFrame( - { - "timestamp": np.zeros(4), - "geo_id": ["al", "ca", "me", "nd"], - "relevant_pop_a": [10 + 5, 8 + 1, 1 + 2, 3], - "weighted_a": [1 * 10 + 2 * 5, 3 * 8 + 1 * 4, -2 * 1 + 2 * 2, 12 * 3], - "val": [20 / 15, 28 / 9, (-2 * 
1 + 2 * 2) / 3, 36 / 3], - } - ) - assert_frame_equal(agg_a, expected_agg_a) - - -def test_weighted_nation_sum(): - dataFrame = pd.DataFrame( - { - "state": [ - "al", - "al", - "ca", - "ca", - "nd", - ], - "timestamp": np.hstack((np.zeros(3), np.ones(2))), - "a": [1, 2, 3, 4, 12], - "b": [5, 6, 7, np.nan, np.nan], - "population_served": [10, 5, 8, 1, 3], - } - ) - weighted = generate_weights(dataFrame, column_aggregating="a") - agg = weighted_nation_sum(weighted, "a") - expected_agg = pd.DataFrame( - { - "timestamp": [0.0, 1], - "relevant_pop_a": [10 + 5 + 8, 1 + 3], - "weighted_a": [1 * 10 + 2 * 5 + 3 * 8, 1 * 4 + 3 * 12], - "val": [44 / 23, 40 / 4], - "geo_id": ["us", "us"], - } - ) - assert_frame_equal(agg, expected_agg) +from delphi_nwss.run import add_needed_columns def test_adding_cols(): From e82e9fdb93ecc8e53781f20f11e580d3d6c7e8fa Mon Sep 17 00:00:00 2001 From: Dmitry Shemetov Date: Wed, 5 Jun 2024 12:13:27 -0700 Subject: [PATCH 12/18] lint: apply darker format --- nwss_wastewater/delphi_nwss/pull.py | 23 +++++-------------- nwss_wastewater/delphi_nwss/run.py | 35 +++++++---------------------- 2 files changed, 14 insertions(+), 44 deletions(-) diff --git a/nwss_wastewater/delphi_nwss/pull.py b/nwss_wastewater/delphi_nwss/pull.py index edaa8f948..83fc2522c 100644 --- a/nwss_wastewater/delphi_nwss/pull.py +++ b/nwss_wastewater/delphi_nwss/pull.py @@ -5,14 +5,7 @@ import pandas as pd from sodapy import Socrata -from .constants import ( - SIGNALS, - PROVIDER_NORMS, - METRIC_SIGNALS, - SIG_DIGITS, - TYPE_DICT, - TYPE_DICT_METRIC, -) +from .constants import METRIC_SIGNALS, PROVIDER_NORMS, SIG_DIGITS, SIGNALS, TYPE_DICT, TYPE_DICT_METRIC def sig_digit_round(value, n_digits): @@ -28,7 +21,7 @@ def sig_digit_round(value, n_digits): sign_mask = value < 0 value[sign_mask] *= -1 exponent = np.ceil(np.log10(value)) - result = 10 ** exponent * np.round(value * 10 ** (-exponent), n_digits) + result = 10**exponent * np.round(value * 10 ** (-exponent), n_digits) result[sign_mask] *= -1 result[zero_mask] = in_value[zero_mask] return result @@ -59,9 +52,7 @@ def reformat(df, df_metric): Specifically the population and METRIC_SIGNAL columns, and renames date_start to timestamp. """ # drop unused columns from df_metric - df_metric_core = df_metric.loc[ - :, ["key_plot_id", "date_end", "population_served", *METRIC_SIGNALS] - ] + df_metric_core = df_metric.loc[:, ["key_plot_id", "date_end", "population_served", *METRIC_SIGNALS]] # get matching keys df_metric_core = df_metric_core.rename(columns={"date_end": "timestamp"}) df_metric_core = df_metric_core.set_index(["key_plot_id", "timestamp"]) @@ -96,9 +87,7 @@ def check_endpoints(df): .reset_index(drop=True) ) if not unique_provider_norms.equals(pd.DataFrame(PROVIDER_NORMS)): - raise ValueError( - f"There are new providers and/or norms. They are\n{unique_provider_norms}" - ) + raise ValueError(f"There are new providers and/or norms. 
They are\n{unique_provider_norms}") def pull_nwss_data(token: str, logger): @@ -124,8 +113,8 @@ def pull_nwss_data(token: str, logger): """ # Pull data from Socrata API client = Socrata("data.cdc.gov", token) - results_concentration = client.get("g653-rqe2", limit=10 ** 10) - results_metric = client.get("2ew6-ywp6", limit=10 ** 10) + results_concentration = client.get("g653-rqe2", limit=10**10) + results_metric = client.get("2ew6-ywp6", limit=10**10) df_metric = pd.DataFrame.from_records(results_metric) df_concentration = pd.DataFrame.from_records(results_concentration) df_concentration = df_concentration.rename(columns={"date": "timestamp"}) diff --git a/nwss_wastewater/delphi_nwss/run.py b/nwss_wastewater/delphi_nwss/run.py index 7c048b140..3dc88f0b6 100644 --- a/nwss_wastewater/delphi_nwss/run.py +++ b/nwss_wastewater/delphi_nwss/run.py @@ -26,12 +26,7 @@ from datetime import datetime import numpy as np -from delphi_utils import ( - GeoMapper, - S3ArchiveDiffer, - get_structured_logger, - create_export_csv, -) +from delphi_utils import GeoMapper, S3ArchiveDiffer, create_export_csv, get_structured_logger from delphi_utils.nancodes import add_default_nancodes from .constants import GEOS, METRIC_SIGNALS, PROVIDER_NORMS, SIGNALS @@ -98,21 +93,15 @@ def run_module(params): df_pull = pull_nwss_data(socrata_token, logger) geomapper = GeoMapper() # iterate over the providers and the normalizations that they specifically provide - for provider, normalization in zip( - PROVIDER_NORMS["provider"], PROVIDER_NORMS["normalization"] - ): + for provider, normalization in zip(PROVIDER_NORMS["provider"], PROVIDER_NORMS["normalization"]): # copy by only taking the relevant subsection - df_prov_norm = df_pull[ - (df_pull.provider == provider) & (df_pull.normalization == normalization) - ] + df_prov_norm = df_pull[(df_pull.provider == provider) & (df_pull.normalization == normalization)] df_prov_norm = df_prov_norm.drop(["provider", "normalization"], axis=1) for sensor in [*SIGNALS, *METRIC_SIGNALS]: full_sensor_name = sensor + "_" + provider + "_" + normalization df_prov_norm = df_prov_norm.rename(columns={sensor: full_sensor_name}) for geo in GEOS: - logger.info( - "Generating signal and exporting to CSV", metric=full_sensor_name - ) + logger.info("Generating signal and exporting to CSV", metric=full_sensor_name) if geo == "nation": df_prov_norm["nation"] = "us" agg_df = geomapper.aggregate_by_weighted_sum( @@ -122,25 +111,17 @@ def run_module(params): "timestamp", "population_served", ) - agg_df = agg_df.rename( - columns={geo: "geo_id", f"weighted_{full_sensor_name}": "val"} - ) + agg_df = agg_df.rename(columns={geo: "geo_id", f"weighted_{full_sensor_name}": "val"}) # add se, sample_size, and na codes agg_df = add_needed_columns(agg_df) # actual export - dates = create_export_csv( - agg_df, geo_res=geo, export_dir=export_dir, sensor=full_sensor_name - ) + dates = create_export_csv(agg_df, geo_res=geo, export_dir=export_dir, sensor=full_sensor_name) if "archive" in params: _, common_diffs, new_files = arch_diff.diff_exports() - to_archive = [ - f for f, diff in common_diffs.items() if diff is not None - ] + to_archive = [f for f, diff in common_diffs.items() if diff is not None] to_archive += new_files _, fails = arch_diff.archive_exports(to_archive) - succ_common_diffs = { - f: diff for f, diff in common_diffs.items() if f not in fails - } + succ_common_diffs = {f: diff for f, diff in common_diffs.items() if f not in fails} arch_diff.filter_exports(succ_common_diffs) if len(dates) > 0: 
run_stats.append((max(dates), len(dates))) From 9c546ec059b448bba15b6bd57072202327937688 Mon Sep 17 00:00:00 2001 From: Dmitry Shemetov Date: Wed, 5 Jun 2024 12:17:05 -0700 Subject: [PATCH 13/18] lint: apply darker format --- nchs_mortality/delphi_nchs_mortality/pull.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/nchs_mortality/delphi_nchs_mortality/pull.py b/nchs_mortality/delphi_nchs_mortality/pull.py index f22a73904..aef964168 100644 --- a/nchs_mortality/delphi_nchs_mortality/pull.py +++ b/nchs_mortality/delphi_nchs_mortality/pull.py @@ -5,12 +5,12 @@ import numpy as np import pandas as pd -from sodapy import Socrata - from delphi_utils.geomap import GeoMapper +from sodapy import Socrata from .constants import METRICS, RENAME + def standardize_columns(df): """Rename columns to comply with a standard set. @@ -85,13 +85,15 @@ def pull_nchs_mortality_data(socrata_token: str, test_file: Optional[str] = None try: df = df.astype(type_dict) except KeyError as exc: - raise ValueError(f""" + raise ValueError( + f""" Expected column(s) missed, The dataset schema may have changed. Please investigate and amend the code. expected={''.join(type_dict.keys())} received={''.join(df.columns)} -""") from exc +""" + ) from exc df = df[keep_columns + ["timestamp", "state"]].set_index("timestamp") From 19fe0554b675385a260350e1ef1c7f1a21157e49 Mon Sep 17 00:00:00 2001 From: dsweber2 Date: Fri, 7 Jun 2024 17:41:45 -0500 Subject: [PATCH 14/18] Nat's suggestions --- nwss_wastewater/delphi_nwss/pull.py | 24 +++++++------ nwss_wastewater/delphi_nwss/run.py | 56 ++++++++++++++--------------- nwss_wastewater/tests/test_pull.py | 2 +- 3 files changed, 40 insertions(+), 42 deletions(-) diff --git a/nwss_wastewater/delphi_nwss/pull.py b/nwss_wastewater/delphi_nwss/pull.py index 83fc2522c..a6a291e05 100644 --- a/nwss_wastewater/delphi_nwss/pull.py +++ b/nwss_wastewater/delphi_nwss/pull.py @@ -47,9 +47,9 @@ def convert_df_type(df, type_dict, logger): def reformat(df, df_metric): - """Add columns from df_metric to df, and rename some columns. + """Combine df_metric and df - Specifically the population and METRIC_SIGNAL columns, and renames date_start to timestamp. + Move population and METRIC_SIGNAL columns from df_metric to df, and rename date_start to timestamp. """ # drop unused columns from df_metric df_metric_core = df_metric.loc[:, ["key_plot_id", "date_end", "population_served", *METRIC_SIGNALS]] @@ -57,6 +57,7 @@ def reformat(df, df_metric): df_metric_core = df_metric_core.rename(columns={"date_end": "timestamp"}) df_metric_core = df_metric_core.set_index(["key_plot_id", "timestamp"]) df = df.set_index(["key_plot_id", "timestamp"]) + df = df.sort_index() df = df.join(df_metric_core) df = df.reset_index() @@ -64,19 +65,21 @@ def reformat(df, df_metric): def add_identifier_columns(df): - """Add identifier columns. + """Parse `key_plot_id` to create several key columns - Add columns to get more detail than key_plot_id gives; + `key_plot_id` is of format "___wwtp_id". We split by `_` and put each resulting item into its own column. 
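+    For example, a (hypothetical) key_plot_id "CDC_VERILY_al_1234" yields provider "CDC_VERILY" and state "al".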
Add columns to get more detail than key_plot_id gives; specifically, state, and `provider_normalization`, which gives the signal identifier """ - # a pair of alphanumerics surrounded by _ + df = df.copy() + # a pair of alphanumerics surrounded by _; for example, it matches "_al_", and not "_3a_" and returns just the two letters "al" df["state"] = df.key_plot_id.str.extract(r"_(\w\w)_") - # anything followed by state ^ + # anything followed by state as described just above. For example "CDC_VERILY_al" pulls out "CDC_VERILY" df["provider"] = df.key_plot_id.str.extract(r"(.*)_[a-z]{2}_") df["signal_name"] = df.provider + "_" + df.normalization + return df -def check_endpoints(df): +def check_expected_signals(df): """Make sure that there aren't any new signals that we need to add.""" # compare with existing column name checker # also add a note about handling errors @@ -103,8 +106,7 @@ def pull_nwss_data(token: str, logger): ---------- socrata_token: str My App Token for pulling the NWSS data (could be the same as the nchs data) - test_file: Optional[str] - When not null, name of file from which to read test data + logger: the structured logger Returns ------- @@ -127,7 +129,7 @@ def pull_nwss_data(token: str, logger): df = df_concentration[~df_concentration["normalization"].isna()] # Pull 2 letter state labels out of the key_plot_id labels. - add_identifier_columns(df) + df = add_identifier_columns(df) # move population and metric signals over to df df = reformat(df, df_metric) @@ -139,7 +141,7 @@ def pull_nwss_data(token: str, logger): # likely introduced by dates only present in one and not the other; even # otherwise, best to assume some value rather than break the data) df.population_served = df.population_served.ffill() - check_endpoints(df) + check_expected_signals(df) keep_columns = [ *SIGNALS, diff --git a/nwss_wastewater/delphi_nwss/run.py b/nwss_wastewater/delphi_nwss/run.py index 3dc88f0b6..dd359a7b0 100644 --- a/nwss_wastewater/delphi_nwss/run.py +++ b/nwss_wastewater/delphi_nwss/run.py @@ -2,7 +2,7 @@ """Functions to call when running the function. This module should contain a function called `run_module`, that is executed -when the module is run with `python -m MODULE_NAME`. `run_module`'s lone argument should be a +when the module is run with `python -m delphi_nwss`. `run_module`'s lone argument should be a nested dictionary of parameters loaded from the params.json file. 
We expect the `params` to have the following structure: - "common": @@ -16,17 +16,17 @@ `delphi_utils.add_prefix()` - "test_file" (optional): str, name of file from which to read test data - "socrata_token": str, authentication for upstream data pull - - "archive" (optional): if provided, output will be archived with S3 - - "aws_credentials": Dict[str, str], AWS login credentials (see S3 documentation) - - "bucket_name: str, name of S3 bucket to read/write - - "cache_dir": str, directory of locally cached data """ import time from datetime import datetime import numpy as np -from delphi_utils import GeoMapper, S3ArchiveDiffer, create_export_csv, get_structured_logger +from delphi_utils import ( + GeoMapper, + get_structured_logger, + create_export_csv, +) from delphi_utils.nancodes import add_default_nancodes from .constants import GEOS, METRIC_SIGNALS, PROVIDER_NORMS, SIGNALS @@ -37,6 +37,10 @@ def add_needed_columns(df, col_names=None): """Short util to add expected columns not found in the dataset.""" if col_names is None: col_names = ["se", "sample_size"] + else: + assert "geo_value" not in col_names + assert "time_value" not in col_names + assert "value" not in col_names for col_name in col_names: df[col_name] = np.nan @@ -77,52 +81,44 @@ def run_module(params): ) export_dir = params["common"]["export_dir"] socrata_token = params["indicator"]["socrata_token"] - if "archive" in params: - arch_diff = S3ArchiveDiffer( - params["archive"]["cache_dir"], - export_dir, - params["archive"]["bucket_name"], - "nwss_wastewater", - params["archive"]["aws_credentials"], - ) - arch_diff.update_cache() - run_stats = [] ## build the base version of the signal at the most detailed geo level you can get. ## compute stuff here or farm out to another function or file df_pull = pull_nwss_data(socrata_token, logger) geomapper = GeoMapper() # iterate over the providers and the normalizations that they specifically provide - for provider, normalization in zip(PROVIDER_NORMS["provider"], PROVIDER_NORMS["normalization"]): + for provider, normalization in zip( + PROVIDER_NORMS["provider"], PROVIDER_NORMS["normalization"] + ): # copy by only taking the relevant subsection - df_prov_norm = df_pull[(df_pull.provider == provider) & (df_pull.normalization == normalization)] + df_prov_norm = df_pull[ + (df_pull.provider == provider) & (df_pull.normalization == normalization) + ] df_prov_norm = df_prov_norm.drop(["provider", "normalization"], axis=1) for sensor in [*SIGNALS, *METRIC_SIGNALS]: full_sensor_name = sensor + "_" + provider + "_" + normalization - df_prov_norm = df_prov_norm.rename(columns={sensor: full_sensor_name}) for geo in GEOS: - logger.info("Generating signal and exporting to CSV", metric=full_sensor_name) + logger.info( + "Generating signal and exporting to CSV", metric=full_sensor_name + ) if geo == "nation": df_prov_norm["nation"] = "us" agg_df = geomapper.aggregate_by_weighted_sum( df_prov_norm, geo, - full_sensor_name, + sensor, "timestamp", "population_served", ) - agg_df = agg_df.rename(columns={geo: "geo_id", f"weighted_{full_sensor_name}": "val"}) + agg_df = agg_df.rename( + columns={geo: "geo_id", f"weighted_{sensor}": "val"} + ) # add se, sample_size, and na codes agg_df = add_needed_columns(agg_df) # actual export - dates = create_export_csv(agg_df, geo_res=geo, export_dir=export_dir, sensor=full_sensor_name) - if "archive" in params: - _, common_diffs, new_files = arch_diff.diff_exports() - to_archive = [f for f, diff in common_diffs.items() if diff is not None] - to_archive += new_files 
- _, fails = arch_diff.archive_exports(to_archive) - succ_common_diffs = {f: diff for f, diff in common_diffs.items() if f not in fails} - arch_diff.filter_exports(succ_common_diffs) + dates = create_export_csv( + agg_df, geo_res=geo, export_dir=export_dir, sensor=full_sensor_name + ) if len(dates) > 0: run_stats.append((max(dates), len(dates))) ## log this indicator run diff --git a/nwss_wastewater/tests/test_pull.py b/nwss_wastewater/tests/test_pull.py index 6b7bffa24..273f6e311 100644 --- a/nwss_wastewater/tests/test_pull.py +++ b/nwss_wastewater/tests/test_pull.py @@ -109,7 +109,7 @@ def test_formatting(): def test_identifier_colnames(): test_df = pd.read_csv("test_data/conc_data.csv", index_col=0) - add_identifier_columns(test_df) + test_df = add_identifier_columns(test_df) assert all(test_df.state.unique() == ["ak", "tn"]) assert all(test_df.provider.unique() == ["CDC_BIOBOT", "WWS"]) # the only cases where the signal name is wrong is when normalization isn't defined From 89b5793def6e2a3521041a34f5fdc5dcf3d40d19 Mon Sep 17 00:00:00 2001 From: dsweber2 Date: Fri, 21 Jun 2024 14:39:05 -0500 Subject: [PATCH 15/18] PROVIDER column pairs to provider dict. --- nwss_wastewater/delphi_nwss/constants.py | 12 ++-- nwss_wastewater/delphi_nwss/pull.py | 38 ++++++++++--- nwss_wastewater/delphi_nwss/run.py | 72 +++++++++++++----------- 3 files changed, 71 insertions(+), 51 deletions(-) diff --git a/nwss_wastewater/delphi_nwss/constants.py b/nwss_wastewater/delphi_nwss/constants.py index b04206d14..10ffbafd6 100644 --- a/nwss_wastewater/delphi_nwss/constants.py +++ b/nwss_wastewater/delphi_nwss/constants.py @@ -13,15 +13,11 @@ SIGNALS = ["pcr_conc_smoothed"] METRIC_SIGNALS = ["detect_prop_15d", "percentile", "ptc_15d"] PROVIDER_NORMS = { - "provider": ["CDC_VERILY", "CDC_VERILY", "NWSS", "NWSS", "WWS"], - "normalization": [ - "flow-population", - "microbial", - "flow-population", - "microbial", - "microbial", - ], + "CDC_VERILY": ("flow-population", "microbial"), + "NWSS": ("flow-population", "microbial"), + "WWS": ("microbial",), } + SIG_DIGITS = 4 TYPE_DICT = {key: float for key in SIGNALS} diff --git a/nwss_wastewater/delphi_nwss/pull.py b/nwss_wastewater/delphi_nwss/pull.py index a6a291e05..a5f600345 100644 --- a/nwss_wastewater/delphi_nwss/pull.py +++ b/nwss_wastewater/delphi_nwss/pull.py @@ -5,7 +5,14 @@ import pandas as pd from sodapy import Socrata -from .constants import METRIC_SIGNALS, PROVIDER_NORMS, SIG_DIGITS, SIGNALS, TYPE_DICT, TYPE_DICT_METRIC +from .constants import ( + METRIC_SIGNALS, + PROVIDER_NORMS, + SIG_DIGITS, + SIGNALS, + TYPE_DICT, + TYPE_DICT_METRIC, +) def sig_digit_round(value, n_digits): @@ -49,10 +56,13 @@ def convert_df_type(df, type_dict, logger): def reformat(df, df_metric): """Combine df_metric and df - Move population and METRIC_SIGNAL columns from df_metric to df, and rename date_start to timestamp. + Move population and METRIC_SIGNAL columns from df_metric to df, and rename + date_start to timestamp. 
""" # drop unused columns from df_metric - df_metric_core = df_metric.loc[:, ["key_plot_id", "date_end", "population_served", *METRIC_SIGNALS]] + df_metric_core = df_metric.loc[ + :, ["key_plot_id", "date_end", "population_served", *METRIC_SIGNALS] + ] # get matching keys df_metric_core = df_metric_core.rename(columns={"date_end": "timestamp"}) df_metric_core = df_metric_core.set_index(["key_plot_id", "timestamp"]) @@ -67,13 +77,17 @@ def reformat(df, df_metric): def add_identifier_columns(df): """Parse `key_plot_id` to create several key columns - `key_plot_id` is of format "___wwtp_id". We split by `_` and put each resulting item into its own column. Add columns to get more detail than key_plot_id gives; - specifically, state, and `provider_normalization`, which gives the signal identifier + `key_plot_id` is of format "___wwtp_id". + We split by `_` and put each resulting item into its own column. + Add columns to get more detail than key_plot_id gives; specifically, state, and + `provider_normalization`, which gives the signal identifier """ df = df.copy() - # a pair of alphanumerics surrounded by _; for example, it matches "_al_", and not "_3a_" and returns just the two letters "al" + # a pair of alphanumerics surrounded by _; for example, it matches "_al_", + # and not "_3a_" and returns just the two letters "al" df["state"] = df.key_plot_id.str.extract(r"_(\w\w)_") - # anything followed by state as described just above. For example "CDC_VERILY_al" pulls out "CDC_VERILY" + # anything followed by state as described just above. + # For example "CDC_VERILY_al" pulls out "CDC_VERILY" df["provider"] = df.key_plot_id.str.extract(r"(.*)_[a-z]{2}_") df["signal_name"] = df.provider + "_" + df.normalization return df @@ -89,8 +103,14 @@ def check_expected_signals(df): .sort_values(["provider", "normalization"]) .reset_index(drop=True) ) - if not unique_provider_norms.equals(pd.DataFrame(PROVIDER_NORMS)): - raise ValueError(f"There are new providers and/or norms. They are\n{unique_provider_norms}") + for provider, normalization in zip( + unique_provider_norms["provider"], unique_provider_norms["normalization"] + ): + if not normalization in PROVIDER_NORMS[provider]: + raise ValueError( + f"There are new providers and/or norms." 
+ f"The full new set is\n{unique_provider_norms}" + ) def pull_nwss_data(token: str, logger): diff --git a/nwss_wastewater/delphi_nwss/run.py b/nwss_wastewater/delphi_nwss/run.py index dd359a7b0..d236bca6a 100644 --- a/nwss_wastewater/delphi_nwss/run.py +++ b/nwss_wastewater/delphi_nwss/run.py @@ -87,39 +87,43 @@ def run_module(params): df_pull = pull_nwss_data(socrata_token, logger) geomapper = GeoMapper() # iterate over the providers and the normalizations that they specifically provide - for provider, normalization in zip( - PROVIDER_NORMS["provider"], PROVIDER_NORMS["normalization"] - ): - # copy by only taking the relevant subsection - df_prov_norm = df_pull[ - (df_pull.provider == provider) & (df_pull.normalization == normalization) - ] - df_prov_norm = df_prov_norm.drop(["provider", "normalization"], axis=1) - for sensor in [*SIGNALS, *METRIC_SIGNALS]: - full_sensor_name = sensor + "_" + provider + "_" + normalization - for geo in GEOS: - logger.info( - "Generating signal and exporting to CSV", metric=full_sensor_name - ) - if geo == "nation": - df_prov_norm["nation"] = "us" - agg_df = geomapper.aggregate_by_weighted_sum( - df_prov_norm, - geo, - sensor, - "timestamp", - "population_served", - ) - agg_df = agg_df.rename( - columns={geo: "geo_id", f"weighted_{sensor}": "val"} - ) - # add se, sample_size, and na codes - agg_df = add_needed_columns(agg_df) - # actual export - dates = create_export_csv( - agg_df, geo_res=geo, export_dir=export_dir, sensor=full_sensor_name - ) - if len(dates) > 0: - run_stats.append((max(dates), len(dates))) + for provider, normalizations in PROVIDER_NORMS.items(): + for normalization in normalizations: + # copy by only taking the relevant subsection + df_prov_norm = df_pull[ + (df_pull.provider == provider) + & (df_pull.normalization == normalization) + ] + df_prov_norm = df_prov_norm.drop(["provider", "normalization"], axis=1) + for sensor in [*SIGNALS, *METRIC_SIGNALS]: + full_sensor_name = sensor + "_" + provider + "_" + normalization + for geo in GEOS: + logger.info( + "Generating signal and exporting to CSV", + metric=full_sensor_name, + ) + if geo == "nation": + df_prov_norm["nation"] = "us" + agg_df = geomapper.aggregate_by_weighted_sum( + df_prov_norm, + geo, + sensor, + "timestamp", + "population_served", + ) + agg_df = agg_df.rename( + columns={geo: "geo_id", f"weighted_{sensor}": "val"} + ) + # add se, sample_size, and na codes + agg_df = add_needed_columns(agg_df) + # actual export + dates = create_export_csv( + agg_df, + geo_res=geo, + export_dir=export_dir, + sensor=full_sensor_name, + ) + if len(dates) > 0: + run_stats.append((max(dates), len(dates))) ## log this indicator run logging(start_time, run_stats, logger) From 55d56b39d3b0ac0c48d5e0c6702d7c36e9ccc68b Mon Sep 17 00:00:00 2001 From: dsweber2 Date: Wed, 26 Jun 2024 12:28:57 -0500 Subject: [PATCH 16/18] lint --- nwss_wastewater/delphi_nwss/pull.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nwss_wastewater/delphi_nwss/pull.py b/nwss_wastewater/delphi_nwss/pull.py index a5f600345..6824cb25b 100644 --- a/nwss_wastewater/delphi_nwss/pull.py +++ b/nwss_wastewater/delphi_nwss/pull.py @@ -54,7 +54,7 @@ def convert_df_type(df, type_dict, logger): def reformat(df, df_metric): - """Combine df_metric and df + """Combine df_metric and df. Move population and METRIC_SIGNAL columns from df_metric to df, and rename date_start to timestamp. 
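For orientation, here is a small standalone sketch (illustrative only, not part of any commit in this series) of how the dict-shaped PROVIDER_NORMS drives the exported signal names in run_module above; the constants are copied from this patch series, everything else is assumed for demonstration:

SIGNALS = ["pcr_conc_smoothed"]
METRIC_SIGNALS = ["detect_prop_15d", "percentile", "ptc_15d"]
PROVIDER_NORMS = {
    "CDC_VERILY": ("flow-population", "microbial"),
    "NWSS": ("flow-population", "microbial"),
    "WWS": ("microbial",),
}

# One exported signal per (sensor, provider, normalization) combination,
# mirroring the nested loops in run_module.
full_sensor_names = [
    f"{sensor}_{provider}_{normalization}"
    for provider, normalizations in PROVIDER_NORMS.items()
    for normalization in normalizations
    for sensor in [*SIGNALS, *METRIC_SIGNALS]
]
assert len(full_sensor_names) == 20  # 5 provider/normalization pairs x 4 sensors
assert full_sensor_names[0] == "pcr_conc_smoothed_CDC_VERILY_flow-population"

With this shape, check_expected_signals only needs to confirm that every (provider, normalization) pair observed in the pulled data is listed under its provider, raising when it sees a pair that is not.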
@@ -75,7 +75,7 @@ def reformat(df, df_metric):
 
 
 def add_identifier_columns(df):
-    """Parse `key_plot_id` to create several key columns
+    """Parse `key_plot_id` to create several key columns.
 
     `key_plot_id` is of format "___wwtp_id".
     We split by `_` and put each resulting item into its own column.

From b5c38ce298a661af1b50dbc3abb3f17458c8becb Mon Sep 17 00:00:00 2001
From: Nat DeFries <42820733+nmdefries@users.noreply.github.com>
Date: Wed, 26 Jun 2024 17:46:19 -0400
Subject: [PATCH 17/18] move socrata ids to constants

---
 nwss_wastewater/delphi_nwss/constants.py | 4 ++++
 nwss_wastewater/delphi_nwss/pull.py      | 9 ++++++---
 2 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/nwss_wastewater/delphi_nwss/constants.py b/nwss_wastewater/delphi_nwss/constants.py
index 10ffbafd6..648e44708 100644
--- a/nwss_wastewater/delphi_nwss/constants.py
+++ b/nwss_wastewater/delphi_nwss/constants.py
@@ -38,3 +38,7 @@
         "sample_location_specify": float,
     }
 )
+
+SOURCE_URL = "data.cdc.gov"
+CONCENTRATION_TABLE_ID = "g653-rqe2"
+METRIC_TABLE_ID = "2ew6-ywp6"
diff --git a/nwss_wastewater/delphi_nwss/pull.py b/nwss_wastewater/delphi_nwss/pull.py
index 6824cb25b..9fb571e1c 100644
--- a/nwss_wastewater/delphi_nwss/pull.py
+++ b/nwss_wastewater/delphi_nwss/pull.py
@@ -12,6 +12,9 @@
     SIGNALS,
     TYPE_DICT,
     TYPE_DICT_METRIC,
+    SOURCE_URL,
+    CONCENTRATION_TABLE_ID,
+    METRIC_TABLE_ID,
 )
 
 
@@ -134,9 +137,9 @@ def pull_nwss_data(token: str, logger):
         Dataframe as described above.
     """
     # Pull data from Socrata API
-    client = Socrata("data.cdc.gov", token)
-    results_concentration = client.get("g653-rqe2", limit=10**10)
-    results_metric = client.get("2ew6-ywp6", limit=10**10)
+    client = Socrata(SOURCE_URL, token)
+    results_concentration = client.get(CONCENTRATION_TABLE_ID, limit=10**10)
+    results_metric = client.get(METRIC_TABLE_ID, limit=10**10)
     df_metric = pd.DataFrame.from_records(results_metric)
     df_concentration = pd.DataFrame.from_records(results_concentration)
     df_concentration = df_concentration.rename(columns={"date": "timestamp"})

From eadd03d0448e9f25092de9923864fd48d783b7d1 Mon Sep 17 00:00:00 2001
From: Nat DeFries <42820733+nmdefries@users.noreply.github.com>
Date: Thu, 27 Jun 2024 12:44:54 -0400
Subject: [PATCH 18/18] perform ffill by group and comment

---
 nwss_wastewater/delphi_nwss/pull.py | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/nwss_wastewater/delphi_nwss/pull.py b/nwss_wastewater/delphi_nwss/pull.py
index 9fb571e1c..d9fd9921c 100644
--- a/nwss_wastewater/delphi_nwss/pull.py
+++ b/nwss_wastewater/delphi_nwss/pull.py
@@ -160,10 +160,16 @@ def pull_nwss_data(token: str, logger):
     for signal in [*SIGNALS, *METRIC_SIGNALS]:
         df[signal] = sig_digit_round(df[signal], SIG_DIGITS)
 
-    # if there are population NA's, assume the previous value is accurate (most
-    # likely introduced by dates only present in one and not the other; even
-    # otherwise, best to assume some value rather than break the data)
-    df.population_served = df.population_served.ffill()
+    # For each location, fill missing population values with a previous
+    # population value.
+    # Missing population values seem to be introduced by dates present in only
+    # one of the two (concentration and metric) datasets. This `ffill` approach
+    # assumes that the population on a previous date is still accurate. However,
+    # population served by a given sewershed can and does change over time. The
+    # effect is presumably minimal since contiguous dates with missing
+    # population should be limited in length such that incorrect
+    # population values are quickly corrected.
+    df.population_served = df.population_served.groupby(by=["key_plot_id"]).ffill()
     check_expected_signals(df)
 
     keep_columns = [
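To make the last two commits concrete, here is a minimal self-contained sketch of the pull-side steps they touch (parsing key_plot_id and forward-filling population_served within each sewershed) on made-up data; the toy frame and the groupby("key_plot_id")["population_served"].ffill() spelling are illustrative assumptions rather than code from the patches:

import numpy as np
import pandas as pd

# Hypothetical key_plot_ids in the provider_state_site style used by the tests.
toy = pd.DataFrame(
    {
        "key_plot_id": ["CDC_BIOBOT_ak_1"] * 3 + ["WWS_tn_2"] * 3,
        "timestamp": pd.to_datetime(["2023-01-01", "2023-01-02", "2023-01-03"] * 2),
        "population_served": [1000, np.nan, np.nan, np.nan, 2000, np.nan],
    }
)

# Same regexes as add_identifier_columns: two word characters between
# underscores give the state; everything before "_<state>_" is the provider.
toy["state"] = toy.key_plot_id.str.extract(r"_(\w\w)_")
toy["provider"] = toy.key_plot_id.str.extract(r"(.*)_[a-z]{2}_")
assert toy.state.unique().tolist() == ["ak", "tn"]
assert toy.provider.unique().tolist() == ["CDC_BIOBOT", "WWS"]

# Forward fill population only within a site: the first WWS_tn_2 row stays NaN
# because that group has no earlier value to carry forward.
toy["population_served"] = toy.groupby("key_plot_id")["population_served"].ffill()
assert toy.population_served.tolist()[:3] == [1000.0, 1000.0, 1000.0]
assert np.isnan(toy.population_served.iloc[3])
assert toy.population_served.iloc[4:].tolist() == [2000.0, 2000.0]

Grouping before the fill is what prevents one sewershed's population from leaking into another's rows, which a plain column-wide ffill would allow.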