diff --git a/google_symptoms/delphi_google_symptoms/constants.py b/google_symptoms/delphi_google_symptoms/constants.py index 795ac3df7..b4d51af26 100644 --- a/google_symptoms/delphi_google_symptoms/constants.py +++ b/google_symptoms/delphi_google_symptoms/constants.py @@ -30,14 +30,15 @@ METRICS = METRICS + SYMPTOM_SETS[combmetric] SMOOTHERS = ["raw", "smoothed"] -GEO_RESOLUTIONS = [ - "state", - "county", - "msa", - "hrr", - "hhs", - "nation" -] + +GEO_RESOLUTIONS = { + "state": "state", + "county": "county", + "msa": "county", + "hrr": "county", + "hhs": "state", + "nation": "state", +} SMOOTHERS_MAP = { "raw": (Smoother("identity", impute_method=None), diff --git a/google_symptoms/delphi_google_symptoms/date_utils.py b/google_symptoms/delphi_google_symptoms/date_utils.py index 2ad6244e9..1d9de2cd4 100644 --- a/google_symptoms/delphi_google_symptoms/date_utils.py +++ b/google_symptoms/delphi_google_symptoms/date_utils.py @@ -98,7 +98,7 @@ def generate_num_export_days(params: Dict, logger) -> [int]: expected_date_diff += global_max_expected_lag if latest_date_diff > expected_date_diff: - logger.info("Missing date", date=to_datetime(min(gs_metadata.max_time)).date()) + logger.info("Lag is more than expected", expected_lag=expected_date_diff, lag=latest_date_diff) num_export_days = expected_date_diff diff --git a/google_symptoms/delphi_google_symptoms/patch.py b/google_symptoms/delphi_google_symptoms/patch.py index 85df89394..f6782f2e8 100755 --- a/google_symptoms/delphi_google_symptoms/patch.py +++ b/google_symptoms/delphi_google_symptoms/patch.py @@ -78,7 +78,7 @@ def patch(params): makedirs(f"{current_issue_dir}", exist_ok=True) params["common"]["export_dir"] = f"""{current_issue_dir}""" - params["indicator"]["custom_run"] = True + params["common"]["custom_run"] = True date_settings = patch_dates[issue_date] diff --git a/google_symptoms/delphi_google_symptoms/pull.py b/google_symptoms/delphi_google_symptoms/pull.py index a8c4cdfde..22ad29723 100644 --- 
a/google_symptoms/delphi_google_symptoms/pull.py +++ b/google_symptoms/delphi_google_symptoms/pull.py @@ -158,7 +158,7 @@ def produce_query(level, date_range): return query -def pull_gs_data_one_geolevel(level, date_range): +def pull_gs_data_one_geolevel(level, date_range, logger): """Pull latest data for a single geo level. Fetch data and transform it into the appropriate format, as described in @@ -209,6 +209,9 @@ def pull_gs_data_one_geolevel(level, date_range): if len(df) == 0: df = pd.DataFrame(columns=["open_covid_region_code", "date"] + list(colname_map.keys())) + logger.info( + "No data available for date range", geo_level=level, start_date=date_range[0], end_date=date_range[1] + ) df = preprocess(df, level) return df @@ -232,7 +235,7 @@ def initialize_credentials(credentials): pandas_gbq.context.project = credentials.project_id -def pull_gs_data(credentials, export_start_date, export_end_date, num_export_days, custom_run_flag): +def pull_gs_data(credentials, export_start_date, export_end_date, num_export_days, custom_run_flag, logger): """Pull latest dataset for each geo level and combine. 
PS: No information for PR @@ -264,10 +267,9 @@ def pull_gs_data(credentials, export_start_date, export_end_date, num_export_day dfs = {} # For state level data - dfs["state"] = pull_gs_data_one_geolevel("state", retrieve_dates) + dfs["state"] = pull_gs_data_one_geolevel("state", retrieve_dates, logger) # For county level data - dfs["county"] = pull_gs_data_one_geolevel("county", retrieve_dates) - + dfs["county"] = pull_gs_data_one_geolevel("county", retrieve_dates, logger) # Add District of Columbia as county try: diff --git a/google_symptoms/delphi_google_symptoms/run.py b/google_symptoms/delphi_google_symptoms/run.py index 8ad1d6d10..00e9d10d8 100644 --- a/google_symptoms/delphi_google_symptoms/run.py +++ b/google_symptoms/delphi_google_symptoms/run.py @@ -58,7 +58,7 @@ def run_module(params, logger=None): num_export_days = generate_num_export_days(params, logger) # safety check for patch parameters exists in file, but not running custom runs/patches custom_run_flag = ( - False if not params["indicator"].get("custom_run", False) else params["indicator"].get("custom_run", False) + False if not params["common"].get("custom_run", False) else params["common"].get("custom_run", False) ) # Pull GS data @@ -68,17 +68,21 @@ def run_module(params, logger=None): export_end_date, num_export_days, custom_run_flag, + logger, ) - for geo_res in GEO_RESOLUTIONS: + + for geo_res, mapped_res in GEO_RESOLUTIONS.items(): + df_pull = dfs[mapped_res] + if len(df_pull) == 0: + logger.info("Skipping processing; No data available for geo", geo_type=geo_res) + continue if geo_res == "state": df_pull = dfs["state"] elif geo_res in ["hhs", "nation"]: - df_pull = geo_map(dfs["state"], geo_res) + df_pull = geo_map(dfs[mapped_res], geo_res) else: - df_pull = geo_map(dfs["county"], geo_res) + df_pull = geo_map(dfs[mapped_res], geo_res) - if len(df_pull) == 0: - continue for metric, smoother in product(COMBINED_METRIC, SMOOTHERS): sensor_name = "_".join([smoother, "search"]) 
logger.info("Generating signal and exporting to CSV", geo_type=geo_res, signal=f"{metric}_{sensor_name}") diff --git a/google_symptoms/params.json.template b/google_symptoms/params.json.template index daa5666f4..537d6e273 100644 --- a/google_symptoms/params.json.template +++ b/google_symptoms/params.json.template @@ -2,6 +2,7 @@ "common": { "export_dir": "./receiving", "log_exceptions": false, + "custom_run": false, "log_filename": "./google-symptoms.log" }, "indicator": { diff --git a/google_symptoms/tests/test_patch.py b/google_symptoms/tests/test_patch.py index 4eb782860..8caef9247 100644 --- a/google_symptoms/tests/test_patch.py +++ b/google_symptoms/tests/test_patch.py @@ -54,7 +54,7 @@ def mocked_patch(self, params_): mock_patch("delphi_google_symptoms.pull.pandas_gbq.read_gbq") as mock_read_gbq, \ mock_patch("delphi_google_symptoms.pull.initialize_credentials", return_value=None), \ mock_patch("delphi_google_symptoms.date_utils.covidcast.metadata", return_value=covidcast_metadata), \ - mock_patch("delphi_google_symptoms.run.GEO_RESOLUTIONS", new=["state"]): + mock_patch("delphi_google_symptoms.run.GEO_RESOLUTIONS", new={"state": "state"}): def side_effect(*args, **kwargs): if "symptom_search_sub_region_1_daily" in args[0]: df = state_data_gap diff --git a/google_symptoms/tests/test_pull.py b/google_symptoms/tests/test_pull.py index 4367995b8..c21be3f48 100644 --- a/google_symptoms/tests/test_pull.py +++ b/google_symptoms/tests/test_pull.py @@ -12,6 +12,7 @@ pull_gs_data, preprocess, format_dates_for_query, pull_gs_data_one_geolevel) from delphi_google_symptoms.constants import METRICS, COMBINED_METRIC from conftest import TEST_DIR +from delphi_utils import get_structured_logger good_input = { "state": f"{TEST_DIR}/test_data/small_states_daily.csv", @@ -30,6 +31,7 @@ class TestPullGoogleSymptoms: + logger = get_structured_logger() @freeze_time("2021-01-05") @mock.patch("pandas_gbq.read_gbq") @mock.patch("delphi_google_symptoms.pull.initialize_credentials") @@ 
-49,7 +51,9 @@ def test_good_file(self, mock_credentials, mock_read_gbq): end_date = datetime.combine(date.today(), datetime.min.time()) dfs = pull_gs_data("", datetime.strptime( - "20201230", "%Y%m%d"), datetime.combine(date.today(), datetime.min.time()), 0, False) + "20201230", "%Y%m%d"), + datetime.combine(date.today(), datetime.min.time()), + 0, False, self.logger) for level in ["county", "state"]: df = dfs[level] @@ -119,7 +123,7 @@ def test_format_dates_for_query(self): def test_pull_one_gs_no_dates(self, mock_read_gbq): mock_read_gbq.return_value = pd.DataFrame() - output = pull_gs_data_one_geolevel("state", ["", ""]) + output = pull_gs_data_one_geolevel("state", ["", ""], self.logger) expected = pd.DataFrame(columns=new_keep_cols) assert_frame_equal(output, expected, check_dtype = False) @@ -133,7 +137,7 @@ def test_pull_one_gs_retry_success(self): with mock.patch("pandas_gbq.read_gbq") as mock_read_gbq: mock_read_gbq.side_effect = [badRequestException, pd.DataFrame()] - output = pull_gs_data_one_geolevel("state", ["", ""]) + output = pull_gs_data_one_geolevel("state", ["", ""], self.logger) expected = pd.DataFrame(columns=new_keep_cols) assert_frame_equal(output, expected, check_dtype = False) assert mock_read_gbq.call_count == 2 @@ -147,7 +151,7 @@ def test_pull_one_gs_retry_too_many(self): with mock.patch("pandas_gbq.read_gbq") as mock_read_gbq: with pytest.raises(BadRequest): mock_read_gbq.side_effect = [badRequestException, badRequestException, pd.DataFrame()] - pull_gs_data_one_geolevel("state", ["", ""]) + pull_gs_data_one_geolevel("state", ["", ""], self.logger) def test_pull_one_gs_retry_bad(self): @@ -156,7 +160,7 @@ def test_pull_one_gs_retry_bad(self): with mock.patch("pandas_gbq.read_gbq") as mock_read_gbq: with pytest.raises(BadRequest): mock_read_gbq.side_effect = [badRequestException,pd.DataFrame()] - pull_gs_data_one_geolevel("state", ["", ""]) + pull_gs_data_one_geolevel("state", ["", ""], self.logger) def test_preprocess_no_data(self): 
output = preprocess(pd.DataFrame(columns=keep_cols), "state")