17 changes: 9 additions & 8 deletions google_symptoms/delphi_google_symptoms/constants.py
@@ -30,14 +30,15 @@
METRICS = METRICS + SYMPTOM_SETS[combmetric]

SMOOTHERS = ["raw", "smoothed"]
-GEO_RESOLUTIONS = [
-    "state",
-    "county",
-    "msa",
-    "hrr",
-    "hhs",
-    "nation"
-]
+
+GEO_RESOLUTIONS = {
+    "state": "state",
+    "county": "county",
+    "msa": "county",
+    "hrr": "county",
+    "hhs": "state",
+    "nation": "state",
+}

SMOOTHERS_MAP = {
    "raw": (Smoother("identity", impute_method=None),
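With the list replaced by a dict, each output resolution now names the source frame it is aggregated from. A minimal sketch of how that mapping can be consumed (source_frame_for is a hypothetical helper for illustration, not indicator code):

# Sketch only: mirrors the new GEO_RESOLUTIONS dict introduced above.
GEO_RESOLUTIONS = {
    "state": "state",
    "county": "county",
    "msa": "county",
    "hrr": "county",
    "hhs": "state",
    "nation": "state",
}

def source_frame_for(geo_res):
    """Return the geo level whose pulled data feeds the given output resolution."""
    return GEO_RESOLUTIONS[geo_res]

for geo_res in GEO_RESOLUTIONS:
    print(f"{geo_res}: aggregated from {source_frame_for(geo_res)}-level data")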
2 changes: 1 addition & 1 deletion google_symptoms/delphi_google_symptoms/date_utils.py
@@ -98,7 +98,7 @@ def generate_num_export_days(params: Dict, logger) -> [int]:
expected_date_diff += global_max_expected_lag

if latest_date_diff > expected_date_diff:
-    logger.info("Missing date", date=to_datetime(min(gs_metadata.max_time)).date())
+    logger.info("Lag is more than expected", expected_lag=expected_date_diff, lag=latest_date_diff)

num_export_days = expected_date_diff
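The reworded message records the expected and observed lags as structured fields. A minimal stand-in, assuming a structlog-style logger like the one delphi_utils provides (the numbers below are made up):

import structlog

logger = structlog.get_logger()

expected_date_diff = 10   # hypothetical: span length plus max expected lag
latest_date_diff = 13     # hypothetical: days since the newest data in the API

if latest_date_diff > expected_date_diff:
    # Keyword arguments become structured fields on the log event.
    logger.info("Lag is more than expected",
                expected_lag=expected_date_diff, lag=latest_date_diff)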
2 changes: 1 addition & 1 deletion google_symptoms/delphi_google_symptoms/patch.py
@@ -78,7 +78,7 @@ def patch(params):
makedirs(f"{current_issue_dir}", exist_ok=True)

params["common"]["export_dir"] = f"""{current_issue_dir}"""
-params["indicator"]["custom_run"] = True
+params["common"]["custom_run"] = True

date_settings = patch_dates[issue_date]
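For patch runs, the flag now lives alongside export_dir under "common". A sketch of the params a patch issue run ends up building (the path is a hypothetical placeholder):

# Sketch only; current_issue_dir is a made-up placeholder path.
current_issue_dir = "./patch_dir/issue_20240115/google-symptoms"

params = {"common": {}, "indicator": {}}
params["common"]["export_dir"] = f"{current_issue_dir}"
params["common"]["custom_run"] = True  # previously set under params["indicator"]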
12 changes: 7 additions & 5 deletions google_symptoms/delphi_google_symptoms/pull.py
@@ -158,7 +158,7 @@ def produce_query(level, date_range):
    return query


-def pull_gs_data_one_geolevel(level, date_range):
+def pull_gs_data_one_geolevel(level, date_range, logger):
    """Pull latest data for a single geo level.

    Fetch data and transform it into the appropriate format, as described in
@@ -209,6 +209,9 @@ def pull_gs_data_one_geolevel(level, date_range):

    if len(df) == 0:
        df = pd.DataFrame(columns=["open_covid_region_code", "date"] + list(colname_map.keys()))
+        logger.info(
+            "No data available for date range", geo_level=level, start_date=date_range[0], end_date=date_range[1]
+        )

    df = preprocess(df, level)
    return df
@@ -232,7 +235,7 @@ def initialize_credentials(credentials):
    pandas_gbq.context.project = credentials.project_id


-def pull_gs_data(credentials, export_start_date, export_end_date, num_export_days, custom_run_flag):
+def pull_gs_data(credentials, export_start_date, export_end_date, num_export_days, custom_run_flag, logger):
    """Pull latest dataset for each geo level and combine.

    PS: No information for PR
@@ -264,10 +267,9 @@ def pull_gs_data(credentials, export_start_date, export_end_date, num_export_day
    dfs = {}

    # For state level data
-    dfs["state"] = pull_gs_data_one_geolevel("state", retrieve_dates)
+    dfs["state"] = pull_gs_data_one_geolevel("state", retrieve_dates, logger)
    # For county level data
-    dfs["county"] = pull_gs_data_one_geolevel("county", retrieve_dates)
-
+    dfs["county"] = pull_gs_data_one_geolevel("county", retrieve_dates, logger)

    # Add District of Columbia as county
    try:
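Both pull functions now take the logger so an empty query result is reported instead of passing through silently. A minimal stand-in for that pattern (run_query and colname_map are hypothetical, and the logger is assumed to accept keyword fields):

import pandas as pd

def pull_one_geolevel_sketch(level, date_range, logger, run_query, colname_map):
    # run_query stands in for the real BigQuery call; it returns a DataFrame.
    df = run_query(level, date_range)
    if len(df) == 0:
        # Build an empty frame with the expected columns and record why it is empty.
        df = pd.DataFrame(columns=["open_covid_region_code", "date"] + list(colname_map.keys()))
        logger.info("No data available for date range",
                    geo_level=level, start_date=date_range[0], end_date=date_range[1])
    return df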
16 changes: 10 additions & 6 deletions google_symptoms/delphi_google_symptoms/run.py
@@ -58,7 +58,7 @@ def run_module(params, logger=None):
    num_export_days = generate_num_export_days(params, logger)
    # safety check for patch parameters exists in file, but not running custom runs/patches
    custom_run_flag = (
-        False if not params["indicator"].get("custom_run", False) else params["indicator"].get("custom_run", False)
+        False if not params["common"].get("custom_run", False) else params["indicator"].get("custom_run", False)
    )

    # Pull GS data
@@ -68,17 +68,21 @@
        export_end_date,
        num_export_days,
        custom_run_flag,
+        logger,
    )
-    for geo_res in GEO_RESOLUTIONS:
+
+    for geo_res, mapped_res in GEO_RESOLUTIONS.items():
+        df_pull = dfs[mapped_res]
+        if len(df_pull) == 0:
+            logger.info("Skipping processing; No data available for geo", geo_type=geo_res)
+            continue
        if geo_res == "state":
            df_pull = dfs["state"]
        elif geo_res in ["hhs", "nation"]:
-            df_pull = geo_map(dfs["state"], geo_res)
+            df_pull = geo_map(dfs[mapped_res], geo_res)
        else:
-            df_pull = geo_map(dfs["county"], geo_res)
+            df_pull = geo_map(dfs[mapped_res], geo_res)

-        if len(df_pull) == 0:
-            continue
        for metric, smoother in product(COMBINED_METRIC, SMOOTHERS):
            sensor_name = "_".join([smoother, "search"])
            logger.info("Generating signal and exporting to CSV", geo_type=geo_res, signal=f"{metric}_{sensor_name}")
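A compact sketch of the reworked export flow: the flag is only true when both config sections enable it, each output geo looks up its source frame through the GEO_RESOLUTIONS mapping, and empty frames are skipped up front. The stand-ins for geo_map, dfs, and the logger are hypothetical; only the control flow mirrors the change above.

import logging
import pandas as pd

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("google-symptoms-sketch")

GEO_RESOLUTIONS = {"state": "state", "county": "county", "msa": "county",
                   "hrr": "county", "hhs": "state", "nation": "state"}

params = {"common": {"custom_run": False}, "indicator": {"custom_run": False}}
# Flattened equivalent of the custom_run_flag expression above: the run counts
# as custom only when both the common and indicator sections say so.
custom_run_flag = bool(params["common"].get("custom_run", False)
                       and params["indicator"].get("custom_run", False))

dfs = {"state": pd.DataFrame({"geo_id": ["pa"], "val": [1.0]}),
       "county": pd.DataFrame()}  # county frame left empty on purpose

def geo_map(df, geo_res):
    # Placeholder for the real aggregation from the source frame to geo_res.
    return df

for geo_res, mapped_res in GEO_RESOLUTIONS.items():
    df_pull = dfs[mapped_res]
    if len(df_pull) == 0:
        logger.info("Skipping processing; no data available for geo %s", geo_res)
        continue
    df_pull = geo_map(dfs[mapped_res], geo_res)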
1 change: 1 addition & 0 deletions google_symptoms/params.json.template
@@ -2,6 +2,7 @@
"common": {
"export_dir": "./receiving",
"log_exceptions": false,
"custom_run": false,
"log_filename": "./google-symptoms.log"
},
"indicator": {
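Reading the new key with a False default keeps existing params files that lack "custom_run" working unchanged. A sketch (the path is assumed; templates are normally copied to a local params.json):

import json

with open("params.json") as f:
    params = json.load(f)

custom_run = params["common"].get("custom_run", False)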
2 changes: 1 addition & 1 deletion google_symptoms/tests/test_patch.py
@@ -54,7 +54,7 @@ def mocked_patch(self, params_):
mock_patch("delphi_google_symptoms.pull.pandas_gbq.read_gbq") as mock_read_gbq, \
mock_patch("delphi_google_symptoms.pull.initialize_credentials", return_value=None), \
mock_patch("delphi_google_symptoms.date_utils.covidcast.metadata", return_value=covidcast_metadata), \
mock_patch("delphi_google_symptoms.run.GEO_RESOLUTIONS", new=["state"]):
mock_patch("delphi_google_symptoms.run.GEO_RESOLUTIONS", new={"state": "state"}):
def side_effect(*args, **kwargs):
if "symptom_search_sub_region_1_daily" in args[0]:
df = state_data_gap
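Because GEO_RESOLUTIONS is now a dict, a test that restricts the run to one geo must patch in a one-entry mapping rather than a one-element list. A sketch of that narrowing (module path as in the test above):

from unittest import mock

# Patch the mapping consumed by run_module so only state-level output is produced.
with mock.patch("delphi_google_symptoms.run.GEO_RESOLUTIONS", new={"state": "state"}):
    pass  # exercise run_module / the patch routine under test here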
14 changes: 9 additions & 5 deletions google_symptoms/tests/test_pull.py
@@ -12,6 +12,7 @@
    pull_gs_data, preprocess, format_dates_for_query, pull_gs_data_one_geolevel)
from delphi_google_symptoms.constants import METRICS, COMBINED_METRIC
from conftest import TEST_DIR
+from delphi_utils import get_structured_logger

good_input = {
    "state": f"{TEST_DIR}/test_data/small_states_daily.csv",
@@ -30,6 +31,7 @@


class TestPullGoogleSymptoms:
+    logger = get_structured_logger()
    @freeze_time("2021-01-05")
    @mock.patch("pandas_gbq.read_gbq")
    @mock.patch("delphi_google_symptoms.pull.initialize_credentials")
@@ -49,7 +51,9 @@ def test_good_file(self, mock_credentials, mock_read_gbq):
        end_date = datetime.combine(date.today(), datetime.min.time())

        dfs = pull_gs_data("", datetime.strptime(
-            "20201230", "%Y%m%d"), datetime.combine(date.today(), datetime.min.time()), 0, False)
+            "20201230", "%Y%m%d"),
+            datetime.combine(date.today(), datetime.min.time()),
+            0, False, self.logger)

        for level in ["county", "state"]:
            df = dfs[level]
@@ -119,7 +123,7 @@ def test_format_dates_for_query(self):
    def test_pull_one_gs_no_dates(self, mock_read_gbq):
        mock_read_gbq.return_value = pd.DataFrame()

-        output = pull_gs_data_one_geolevel("state", ["", ""])
+        output = pull_gs_data_one_geolevel("state", ["", ""], self.logger)
        expected = pd.DataFrame(columns=new_keep_cols)
        assert_frame_equal(output, expected, check_dtype = False)

@@ -133,7 +137,7 @@ def test_pull_one_gs_retry_success(self):
        with mock.patch("pandas_gbq.read_gbq") as mock_read_gbq:
            mock_read_gbq.side_effect = [badRequestException, pd.DataFrame()]

-            output = pull_gs_data_one_geolevel("state", ["", ""])
+            output = pull_gs_data_one_geolevel("state", ["", ""], self.logger)
            expected = pd.DataFrame(columns=new_keep_cols)
            assert_frame_equal(output, expected, check_dtype = False)
            assert mock_read_gbq.call_count == 2
@@ -147,7 +151,7 @@ def test_pull_one_gs_retry_too_many(self):
        with mock.patch("pandas_gbq.read_gbq") as mock_read_gbq:
            with pytest.raises(BadRequest):
                mock_read_gbq.side_effect = [badRequestException, badRequestException, pd.DataFrame()]
-                pull_gs_data_one_geolevel("state", ["", ""])
+                pull_gs_data_one_geolevel("state", ["", ""], self.logger)


    def test_pull_one_gs_retry_bad(self):
@@ -156,7 +160,7 @@ def test_pull_one_gs_retry_bad(self):
        with mock.patch("pandas_gbq.read_gbq") as mock_read_gbq:
            with pytest.raises(BadRequest):
                mock_read_gbq.side_effect = [badRequestException,pd.DataFrame()]
-                pull_gs_data_one_geolevel("state", ["", ""])
+                pull_gs_data_one_geolevel("state", ["", ""], self.logger)

    def test_preprocess_no_data(self):
        output = preprocess(pd.DataFrame(columns=keep_cols), "state")
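The tests build one structured logger at class scope and pass it through, since the pull helpers now require it as their final argument. A minimal sketch of that wiring (get_structured_logger comes from delphi_utils, as imported above; calling it with no arguments is assumed to return a usable default logger):

from delphi_utils import get_structured_logger

logger = get_structured_logger()  # assumed default configuration when no filename is given
# Every call that previously took (level, date_range) now also takes the logger, e.g.:
# pull_gs_data_one_geolevel("state", ["", ""], logger)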