diff --git a/ansible/templates/dsew_community_profile-params-prod.json.j2 b/ansible/templates/dsew_community_profile-params-prod.json.j2 index fd377d758..ec3e254c3 100644 --- a/ansible/templates/dsew_community_profile-params-prod.json.j2 +++ b/ansible/templates/dsew_community_profile-params-prod.json.j2 @@ -26,7 +26,9 @@ "ref_window_size": 7, "smoothed_signals": [ "naats_total_7dav", - "naats_positivity_7dav" + "naats_positivity_7dav", + "confirmed_admissions_covid_1d_prop_7dav", + "confirmed_admissions_covid_1d_7dav" ] } } diff --git a/dsew_community_profile/delphi_dsew_community_profile/constants.py b/dsew_community_profile/delphi_dsew_community_profile/constants.py index 51c62b5ea..1404e52f4 100644 --- a/dsew_community_profile/delphi_dsew_community_profile/constants.py +++ b/dsew_community_profile/delphi_dsew_community_profile/constants.py @@ -50,22 +50,34 @@ class Transform: SIGNALS = { "total": { "is_rate" : False, - "api_name": "naats_total_7dav" + "api_name": "naats_total_7dav", + "make_prop": False }, "positivity": { "is_rate" : True, - "api_name": "naats_positivity_7dav" + "api_name": "naats_positivity_7dav", + "make_prop": False }, "confirmed covid-19 admissions": { "is_rate" : False, - "api_name": "confirmed_admissions_covid_1d_7dav" + "api_name": "confirmed_admissions_covid_1d_7dav", + "make_prop": True, + "api_prop_name": "confirmed_admissions_covid_1d_prop_7dav" } } COUNTS_7D_SIGNALS = {key for key, value in SIGNALS.items() if not value["is_rate"]} -def make_signal_name(key): - """Convert a signal key to the corresponding signal name for the API.""" +def make_signal_name(key, is_prop=False): + """Convert a signal key to the corresponding signal name for the API. + + Note, this function gets called twice with the same `key` for signals that support + population-proportion ("prop") variants. + """ + if is_prop: + return SIGNALS[key]["api_prop_name"] return SIGNALS[key]["api_name"] -NEWLINE="\n" +NEWLINE = "\n" +IS_PROP = True +NOT_PROP = False diff --git a/dsew_community_profile/delphi_dsew_community_profile/pull.py b/dsew_community_profile/delphi_dsew_community_profile/pull.py index a65b26a07..b1ac7069b 100644 --- a/dsew_community_profile/delphi_dsew_community_profile/pull.py +++ b/dsew_community_profile/delphi_dsew_community_profile/pull.py @@ -11,8 +11,9 @@ from delphi_utils.geomap import GeoMapper -from .constants import TRANSFORMS, SIGNALS, COUNTS_7D_SIGNALS, NEWLINE -from .constants import DOWNLOAD_ATTACHMENT, DOWNLOAD_LISTING +from .constants import (TRANSFORMS, SIGNALS, COUNTS_7D_SIGNALS, NEWLINE, + IS_PROP, NOT_PROP, + DOWNLOAD_ATTACHMENT, DOWNLOAD_LISTING) # YYYYMMDD # example: "Community Profile Report 20211104.xlsx" @@ -248,7 +249,7 @@ def _parse_sheet(self, sheet): if (sheet.level == "msa" or sheet.level == "county") \ and self.publish_date < datetime.date(2021, 1, 8) \ and sig == "confirmed covid-19 admissions": - self.dfs[(sheet.level, sig)] = pd.DataFrame( + self.dfs[(sheet.level, sig, NOT_PROP)] = pd.DataFrame( columns = ["geo_id", "timestamp", "val", \ "se", "sample_size", "publish_date"] ) @@ -258,7 +259,7 @@ def _parse_sheet(self, sheet): assert len(sig_select) > 0, \ f"No {sig} in any of {select}\n\nAll headers:\n{NEWLINE.join(list(df.columns))}" - self.dfs[(sheet.level, sig)] = pd.concat([ + self.dfs[(sheet.level, sig, NOT_PROP)] = pd.concat([ pd.DataFrame({ "geo_id": sheet.geo_id_select(df).apply(sheet.geo_id_apply), "timestamp": pd.to_datetime(self.times[si[0]][sig]), @@ -271,7 +272,7 @@ def _parse_sheet(self, sheet): ]) for sig in COUNTS_7D_SIGNALS: - self.dfs[(sheet.level, sig)]["val"] /= 7 # 7-day total -> 7-day average + self.dfs[(sheet.level, sig, NOT_PROP)]["val"] /= 7 # 7-day total -> 7-day average def as_cached_filename(params, config): @@ -390,13 +391,46 @@ def fetch_new_reports(params, logger=None): # add nation from state geomapper = GeoMapper() for sig in SIGNALS: - state_key = ("state", sig) + state_key = ("state", sig, NOT_PROP) if state_key not in ret: continue - ret[("nation", sig)] = nation_from_state( + ret[("nation", sig, NOT_PROP)] = nation_from_state( ret[state_key].rename(columns={"geo_id": "state_id"}), sig, geomapper ) + for key, df in ret.copy().items(): + (geo, sig, _) = key + if SIGNALS[sig]["make_prop"]: + ret[(geo, sig, IS_PROP)] = generate_prop_signal(df, geo, geomapper) + return ret + +def generate_prop_signal(df, geo, geo_mapper): + """Transform base df into a proportion (per 100k population).""" + if geo == "state": + geo = "state_id" + if geo == "county": + geo = "fips" + + # Add population data + if geo == "msa": + map_df = geo_mapper.get_crosswalk("fips", geo) + map_df = geo_mapper.add_population_column( + map_df, "fips" + ).drop( + "fips", axis=1 + ).groupby( + geo + ).sum( + ).reset_index( + ) + df = pd.merge(df, map_df, left_on="geo_id", right_on=geo, how="inner") + else: + df = geo_mapper.add_population_column(df, geo, geocode_col="geo_id") + + df["val"] = df["val"] / df["population"] * 100000 + df.drop(["population", geo], axis=1, inplace=True) + + return df diff --git a/dsew_community_profile/delphi_dsew_community_profile/run.py b/dsew_community_profile/delphi_dsew_community_profile/run.py index d27c96216..3ce69b325 100644 --- a/dsew_community_profile/delphi_dsew_community_profile/run.py +++ b/dsew_community_profile/delphi_dsew_community_profile/run.py @@ -58,14 +58,14 @@ def replace_date_param(p): run_stats = [] dfs = fetch_new_reports(params, logger) for key, df in dfs.items(): - (geo, sig) = key + (geo, sig, is_prop) = key if sig not in params["indicator"]["export_signals"]: continue dates = create_export_csv( df, params['common']['export_dir'], geo, - make_signal_name(sig), + make_signal_name(sig, is_prop), **export_params ) if len(dates)>0: diff --git a/dsew_community_profile/params.json.template b/dsew_community_profile/params.json.template index 3a64d71ab..42fc7faad 100644 --- a/dsew_community_profile/params.json.template +++ b/dsew_community_profile/params.json.template @@ -32,7 +32,9 @@ "ref_window_size": 7, "smoothed_signals": [ "naats_total_7dav", - "naats_positivity_7dav" + "naats_positivity_7dav", + "confirmed_admissions_covid_1d_prop_7dav", + "confirmed_admissions_covid_1d_7dav" ] } } diff --git a/dsew_community_profile/tests/params.json.template b/dsew_community_profile/tests/params.json.template index 89cee4bf0..645bd253f 100644 --- a/dsew_community_profile/tests/params.json.template +++ b/dsew_community_profile/tests/params.json.template @@ -25,7 +25,9 @@ "ref_window_size": 7, "smoothed_signals": [ "naats_total_7dav", - "naats_positivity_7dav" + "naats_positivity_7dav", + "confirmed_admissions_covid_1d_prop_7dav", + "confirmed_admissions_covid_1d_7dav" ] } } diff --git a/dsew_community_profile/tests/test_pull.py b/dsew_community_profile/tests/test_pull.py index 60f0fa5dd..b898e21b6 100644 --- a/dsew_community_profile/tests/test_pull.py +++ b/dsew_community_profile/tests/test_pull.py @@ -9,7 +9,7 @@ from delphi_dsew_community_profile.pull import DatasetTimes from delphi_dsew_community_profile.pull import Dataset -from delphi_dsew_community_profile.pull import fetch_listing, nation_from_state +from delphi_dsew_community_profile.pull import fetch_listing, nation_from_state, generate_prop_signal example = namedtuple("example", "given expected") @@ -213,3 +213,84 @@ def test_nation_from_state(self): 'sample_size': [None],}), check_like=True ) + + def test_generate_prop_signal_msa(self): + geomapper = GeoMapper() + county_pop = geomapper.get_crosswalk("fips", "pop") + county_msa = geomapper.get_crosswalk("fips", "msa") + msa_pop = county_pop.merge(county_msa, on="fips", how="inner").groupby("msa").sum().reset_index() + + test_df = pd.DataFrame({ + 'geo_id': ['35620', '31080'], + 'timestamp': [datetime(year=2020, month=1, day=1)]*2, + 'val': [15., 150.], + 'se': [None, None], + 'sample_size': [None, None],}) + + nyc_pop = int(msa_pop.loc[msa_pop.msa == "35620", "pop"]) + la_pop = int(msa_pop.loc[msa_pop.msa == "31080", "pop"]) + + expected_df = pd.DataFrame({ + 'geo_id': ['35620', '31080'], + 'timestamp': [datetime(year=2020, month=1, day=1)]*2, + 'val': [15. / nyc_pop * 100000, 150. / la_pop * 100000], + 'se': [None, None], + 'sample_size': [None, None],}) + + pd.testing.assert_frame_equal( + generate_prop_signal( + test_df.copy(), + "msa", + geomapper + ), + expected_df, + check_like=True + ) + def test_generate_prop_signal_non_msa(self): + geomapper = GeoMapper() + + geos = { + "state": { + "code_name": "state_id", + "geo_names": ['pa', 'wv'] + }, + "county": { + "code_name": "fips", + "geo_names": ['36061', '06037'] + }, + # nation uses the same logic path so no need to test separately + "hhs": { + "code_name": "hhs", + "geo_names": ["1", "4"] + } + } + + for geo, settings in geos.items(): + geo_pop = geomapper.get_crosswalk(settings["code_name"], "pop") + + test_df = pd.DataFrame({ + 'geo_id': settings["geo_names"], + 'timestamp': [datetime(year=2020, month=1, day=1)]*2, + 'val': [15., 150.], + 'se': [None, None], + 'sample_size': [None, None],}) + + pop1 = int(geo_pop.loc[geo_pop[settings["code_name"]] == settings["geo_names"][0], "pop"]) + pop2 = int(geo_pop.loc[geo_pop[settings["code_name"]] == settings["geo_names"][1], "pop"]) + + expected_df = pd.DataFrame({ + 'geo_id': settings["geo_names"], + 'timestamp': [datetime(year=2020, month=1, day=1)]*2, + 'val': [15. / pop1 * 100000, 150. / pop2 * 100000], + 'se': [None, None], + 'sample_size': [None, None],}) + + pd.testing.assert_frame_equal( + generate_prop_signal( + test_df.copy(), + geo, + geomapper + ), + expected_df, + check_like=True + )