From 75052798412ee335e7a90a25b55d13144a46c31e Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Thu, 7 Nov 2024 13:59:59 -0500 Subject: [PATCH 1/6] more logging and cleaned logic --- .../delphi_google_symptoms/constants.py | 17 +++++++++-------- .../delphi_google_symptoms/date_utils.py | 2 +- google_symptoms/delphi_google_symptoms/pull.py | 10 +++++----- google_symptoms/delphi_google_symptoms/run.py | 10 +++++++--- 4 files changed, 22 insertions(+), 17 deletions(-) diff --git a/google_symptoms/delphi_google_symptoms/constants.py b/google_symptoms/delphi_google_symptoms/constants.py index 795ac3df7..ad5a5e718 100644 --- a/google_symptoms/delphi_google_symptoms/constants.py +++ b/google_symptoms/delphi_google_symptoms/constants.py @@ -30,14 +30,15 @@ METRICS = METRICS + SYMPTOM_SETS[combmetric] SMOOTHERS = ["raw", "smoothed"] -GEO_RESOLUTIONS = [ - "state", - "county", - "msa", - "hrr", - "hhs", - "nation" -] + +GEO_RESOLUTIONS = { + "state" : "state", + "county" : "county", + "msa": "county", + "hrr": "county", + "hhs": "state", + "nation": "state", +} SMOOTHERS_MAP = { "raw": (Smoother("identity", impute_method=None), diff --git a/google_symptoms/delphi_google_symptoms/date_utils.py b/google_symptoms/delphi_google_symptoms/date_utils.py index 2ad6244e9..1d9de2cd4 100644 --- a/google_symptoms/delphi_google_symptoms/date_utils.py +++ b/google_symptoms/delphi_google_symptoms/date_utils.py @@ -98,7 +98,7 @@ def generate_num_export_days(params: Dict, logger) -> [int]: expected_date_diff += global_max_expected_lag if latest_date_diff > expected_date_diff: - logger.info("Missing date", date=to_datetime(min(gs_metadata.max_time)).date()) + logger.info("Lag is more than expected", expected_lag=expected_date_diff, lag=latest_date_diff) num_export_days = expected_date_diff diff --git a/google_symptoms/delphi_google_symptoms/pull.py b/google_symptoms/delphi_google_symptoms/pull.py index a8c4cdfde..5bdd093e3 100644 --- a/google_symptoms/delphi_google_symptoms/pull.py +++ 
b/google_symptoms/delphi_google_symptoms/pull.py @@ -158,7 +158,7 @@ def produce_query(level, date_range): return query -def pull_gs_data_one_geolevel(level, date_range): +def pull_gs_data_one_geolevel(level, date_range, logger): """Pull latest data for a single geo level. Fetch data and transform it into the appropriate format, as described in @@ -209,6 +209,7 @@ def pull_gs_data_one_geolevel(level, date_range): if len(df) == 0: df = pd.DataFrame(columns=["open_covid_region_code", "date"] + list(colname_map.keys())) + logger.info("No data available for date range", geo_level=level, start_date=date_range[0], end_date=date_range[1]) df = preprocess(df, level) return df @@ -232,7 +233,7 @@ def initialize_credentials(credentials): pandas_gbq.context.project = credentials.project_id -def pull_gs_data(credentials, export_start_date, export_end_date, num_export_days, custom_run_flag): +def pull_gs_data(credentials, export_start_date, export_end_date, num_export_days, custom_run_flag, logger): """Pull latest dataset for each geo level and combine. 
PS: No information for PR @@ -264,10 +265,9 @@ def pull_gs_data(credentials, export_start_date, export_end_date, num_export_day dfs = {} # For state level data - dfs["state"] = pull_gs_data_one_geolevel("state", retrieve_dates) + dfs["state"] = pull_gs_data_one_geolevel("state", retrieve_dates, logger) # For county level data - dfs["county"] = pull_gs_data_one_geolevel("county", retrieve_dates) - + dfs["county"] = pull_gs_data_one_geolevel("county", retrieve_dates, logger) # Add District of Columbia as county try: diff --git a/google_symptoms/delphi_google_symptoms/run.py b/google_symptoms/delphi_google_symptoms/run.py index 8ad1d6d10..30811e061 100644 --- a/google_symptoms/delphi_google_symptoms/run.py +++ b/google_symptoms/delphi_google_symptoms/run.py @@ -68,8 +68,14 @@ def run_module(params, logger=None): export_end_date, num_export_days, custom_run_flag, + logger ) - for geo_res in GEO_RESOLUTIONS: + + for geo_res in GEO_RESOLUTIONS.keys(): + df_pull = dfs[GEO_RESOLUTIONS[geo_res]] + if len(df_pull) == 0: + logger.info("Skipping processing; No data available for geo level", geo_level=geo_res) + continue if geo_res == "state": df_pull = dfs["state"] elif geo_res in ["hhs", "nation"]: @@ -77,8 +83,6 @@ def run_module(params, logger=None): else: df_pull = geo_map(dfs["county"], geo_res) - if len(df_pull) == 0: - continue for metric, smoother in product(COMBINED_METRIC, SMOOTHERS): sensor_name = "_".join([smoother, "search"]) logger.info("Generating signal and exporting to CSV", geo_type=geo_res, signal=f"{metric}_{sensor_name}") From 82379c41b082165b41c664f17573587f69354526 Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Thu, 7 Nov 2024 14:10:02 -0500 Subject: [PATCH 2/6] changed variable names --- google_symptoms/delphi_google_symptoms/run.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/google_symptoms/delphi_google_symptoms/run.py b/google_symptoms/delphi_google_symptoms/run.py index 30811e061..c404ae6ac 100644 --- 
a/google_symptoms/delphi_google_symptoms/run.py +++ b/google_symptoms/delphi_google_symptoms/run.py @@ -71,17 +71,17 @@ def run_module(params, logger=None): logger ) - for geo_res in GEO_RESOLUTIONS.keys(): - df_pull = dfs[GEO_RESOLUTIONS[geo_res]] + for geo_level, mapped_res in GEO_RESOLUTIONS.items(): + df_pull = dfs[mapped_res] if len(df_pull) == 0: - logger.info("Skipping processing; No data available for geo level", geo_level=geo_res) + logger.info("Skipping processing; No data available for geo level", geo_level=geo_level) continue - if geo_res == "state": + if geo_level == "state": df_pull = dfs["state"] - elif geo_res in ["hhs", "nation"]: - df_pull = geo_map(dfs["state"], geo_res) + elif geo_level in ["hhs", "nation"]: + df_pull = geo_map(dfs[mapped_res], geo_level) else: - df_pull = geo_map(dfs["county"], geo_res) + df_pull = geo_map(dfs[mapped_res], geo_level) for metric, smoother in product(COMBINED_METRIC, SMOOTHERS): sensor_name = "_".join([smoother, "search"]) From 6eb7bc2c7741adee41d239ba1f591ec8f2e782f3 Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Thu, 7 Nov 2024 16:26:29 -0500 Subject: [PATCH 3/6] lint --- .../delphi_google_symptoms/constants.py | 4 ++-- google_symptoms/delphi_google_symptoms/pull.py | 4 +++- google_symptoms/delphi_google_symptoms/run.py | 14 +++++++------- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/google_symptoms/delphi_google_symptoms/constants.py b/google_symptoms/delphi_google_symptoms/constants.py index ad5a5e718..b4d51af26 100644 --- a/google_symptoms/delphi_google_symptoms/constants.py +++ b/google_symptoms/delphi_google_symptoms/constants.py @@ -32,8 +32,8 @@ SMOOTHERS = ["raw", "smoothed"] GEO_RESOLUTIONS = { - "state" : "state", - "county" : "county", + "state": "state", + "county": "county", "msa": "county", "hrr": "county", "hhs": "state", diff --git a/google_symptoms/delphi_google_symptoms/pull.py b/google_symptoms/delphi_google_symptoms/pull.py index 5bdd093e3..22ad29723 100644 --- 
a/google_symptoms/delphi_google_symptoms/pull.py +++ b/google_symptoms/delphi_google_symptoms/pull.py @@ -209,7 +209,9 @@ def pull_gs_data_one_geolevel(level, date_range, logger): if len(df) == 0: df = pd.DataFrame(columns=["open_covid_region_code", "date"] + list(colname_map.keys())) - logger.info("No data available for date range", geo_level=level, start_date=date_range[0], end_date=date_range[1]) + logger.info( + "No data available for date range", geo_level=level, start_date=date_range[0], end_date=date_range[1] + ) df = preprocess(df, level) return df diff --git a/google_symptoms/delphi_google_symptoms/run.py b/google_symptoms/delphi_google_symptoms/run.py index c404ae6ac..591181e13 100644 --- a/google_symptoms/delphi_google_symptoms/run.py +++ b/google_symptoms/delphi_google_symptoms/run.py @@ -68,20 +68,20 @@ def run_module(params, logger=None): export_end_date, num_export_days, custom_run_flag, - logger + logger, ) - for geo_level, mapped_res in GEO_RESOLUTIONS.items(): + for geo_res, mapped_res in GEO_RESOLUTIONS.items(): df_pull = dfs[mapped_res] if len(df_pull) == 0: - logger.info("Skipping processing; No data available for geo level", geo_level=geo_level) + logger.info("Skipping processing; No data available for geo", geo_type=geo_res) continue - if geo_level == "state": + if geo_res == "state": df_pull = dfs["state"] - elif geo_level in ["hhs", "nation"]: - df_pull = geo_map(dfs[mapped_res], geo_level) + elif geo_res in ["hhs", "nation"]: + df_pull = geo_map(dfs[mapped_res], geo_res) else: - df_pull = geo_map(dfs[mapped_res], geo_level) + df_pull = geo_map(dfs[mapped_res], geo_res) for metric, smoother in product(COMBINED_METRIC, SMOOTHERS): sensor_name = "_".join([smoother, "search"]) From 78b892e4cf346ebb46399a1845b99029c70d94d3 Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Thu, 7 Nov 2024 16:34:09 -0500 Subject: [PATCH 4/6] add param to test --- google_symptoms/tests/test_pull.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) 
diff --git a/google_symptoms/tests/test_pull.py b/google_symptoms/tests/test_pull.py index 4367995b8..c21be3f48 100644 --- a/google_symptoms/tests/test_pull.py +++ b/google_symptoms/tests/test_pull.py @@ -12,6 +12,7 @@ pull_gs_data, preprocess, format_dates_for_query, pull_gs_data_one_geolevel) from delphi_google_symptoms.constants import METRICS, COMBINED_METRIC from conftest import TEST_DIR +from delphi_utils import get_structured_logger good_input = { "state": f"{TEST_DIR}/test_data/small_states_daily.csv", @@ -30,6 +31,7 @@ class TestPullGoogleSymptoms: + logger = get_structured_logger() @freeze_time("2021-01-05") @mock.patch("pandas_gbq.read_gbq") @mock.patch("delphi_google_symptoms.pull.initialize_credentials") @@ -49,7 +51,9 @@ def test_good_file(self, mock_credentials, mock_read_gbq): end_date = datetime.combine(date.today(), datetime.min.time()) dfs = pull_gs_data("", datetime.strptime( - "20201230", "%Y%m%d"), datetime.combine(date.today(), datetime.min.time()), 0, False) + "20201230", "%Y%m%d"), + datetime.combine(date.today(), datetime.min.time()), + 0, False, self.logger) for level in ["county", "state"]: df = dfs[level] @@ -119,7 +123,7 @@ def test_format_dates_for_query(self): def test_pull_one_gs_no_dates(self, mock_read_gbq): mock_read_gbq.return_value = pd.DataFrame() - output = pull_gs_data_one_geolevel("state", ["", ""]) + output = pull_gs_data_one_geolevel("state", ["", ""], self.logger) expected = pd.DataFrame(columns=new_keep_cols) assert_frame_equal(output, expected, check_dtype = False) @@ -133,7 +137,7 @@ def test_pull_one_gs_retry_success(self): with mock.patch("pandas_gbq.read_gbq") as mock_read_gbq: mock_read_gbq.side_effect = [badRequestException, pd.DataFrame()] - output = pull_gs_data_one_geolevel("state", ["", ""]) + output = pull_gs_data_one_geolevel("state", ["", ""], self.logger) expected = pd.DataFrame(columns=new_keep_cols) assert_frame_equal(output, expected, check_dtype = False) assert mock_read_gbq.call_count == 2 @@ -147,7 
+151,7 @@ def test_pull_one_gs_retry_too_many(self): with mock.patch("pandas_gbq.read_gbq") as mock_read_gbq: with pytest.raises(BadRequest): mock_read_gbq.side_effect = [badRequestException, badRequestException, pd.DataFrame()] - pull_gs_data_one_geolevel("state", ["", ""]) + pull_gs_data_one_geolevel("state", ["", ""], self.logger) def test_pull_one_gs_retry_bad(self): @@ -156,7 +160,7 @@ def test_pull_one_gs_retry_bad(self): with mock.patch("pandas_gbq.read_gbq") as mock_read_gbq: with pytest.raises(BadRequest): mock_read_gbq.side_effect = [badRequestException,pd.DataFrame()] - pull_gs_data_one_geolevel("state", ["", ""]) + pull_gs_data_one_geolevel("state", ["", ""], self.logger) def test_preprocess_no_data(self): output = preprocess(pd.DataFrame(columns=keep_cols), "state") From 2293d50f41ac1e3d6c84dd189cdde9749e8f0d1d Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Thu, 7 Nov 2024 16:59:07 -0500 Subject: [PATCH 5/6] fix test --- google_symptoms/tests/test_patch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google_symptoms/tests/test_patch.py b/google_symptoms/tests/test_patch.py index 4eb782860..8caef9247 100644 --- a/google_symptoms/tests/test_patch.py +++ b/google_symptoms/tests/test_patch.py @@ -54,7 +54,7 @@ def mocked_patch(self, params_): mock_patch("delphi_google_symptoms.pull.pandas_gbq.read_gbq") as mock_read_gbq, \ mock_patch("delphi_google_symptoms.pull.initialize_credentials", return_value=None), \ mock_patch("delphi_google_symptoms.date_utils.covidcast.metadata", return_value=covidcast_metadata), \ - mock_patch("delphi_google_symptoms.run.GEO_RESOLUTIONS", new=["state"]): + mock_patch("delphi_google_symptoms.run.GEO_RESOLUTIONS", new={"state": "state"}): def side_effect(*args, **kwargs): if "symptom_search_sub_region_1_daily" in args[0]: df = state_data_gap From a797b6ab99fed1937d5872f7b1bb36cfa82f9a9c Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Thu, 7 Nov 2024 19:04:56 -0500 Subject: [PATCH 6/6] consistent location for 
custom run flag --- google_symptoms/delphi_google_symptoms/patch.py | 2 +- google_symptoms/delphi_google_symptoms/run.py | 2 +- google_symptoms/params.json.template | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/google_symptoms/delphi_google_symptoms/patch.py b/google_symptoms/delphi_google_symptoms/patch.py index 85df89394..f6782f2e8 100755 --- a/google_symptoms/delphi_google_symptoms/patch.py +++ b/google_symptoms/delphi_google_symptoms/patch.py @@ -78,7 +78,7 @@ def patch(params): makedirs(f"{current_issue_dir}", exist_ok=True) params["common"]["export_dir"] = f"""{current_issue_dir}""" - params["indicator"]["custom_run"] = True + params["common"]["custom_run"] = True date_settings = patch_dates[issue_date] diff --git a/google_symptoms/delphi_google_symptoms/run.py b/google_symptoms/delphi_google_symptoms/run.py index 591181e13..00e9d10d8 100644 --- a/google_symptoms/delphi_google_symptoms/run.py +++ b/google_symptoms/delphi_google_symptoms/run.py @@ -58,7 +58,7 @@ def run_module(params, logger=None): num_export_days = generate_num_export_days(params, logger) # safety check for patch parameters exists in file, but not running custom runs/patches custom_run_flag = ( - False if not params["indicator"].get("custom_run", False) else params["indicator"].get("custom_run", False) + False if not params["common"].get("custom_run", False) else params["common"].get("custom_run", False) ) # Pull GS data diff --git a/google_symptoms/params.json.template b/google_symptoms/params.json.template index daa5666f4..537d6e273 100644 --- a/google_symptoms/params.json.template +++ b/google_symptoms/params.json.template @@ -2,6 +2,7 @@ "common": { "export_dir": "./receiving", "log_exceptions": false, + "custom_run": false, "log_filename": "./google-symptoms.log" }, "indicator": {