diff --git a/nhsn/delphi_nhsn/constants.py b/nhsn/delphi_nhsn/constants.py
index e6e6e4359..d51241b4f 100644
--- a/nhsn/delphi_nhsn/constants.py
+++ b/nhsn/delphi_nhsn/constants.py
@@ -2,6 +2,9 @@
 
 GEOS = ["state", "nation", "hhs"]
 
+MAIN_DATASET_ID = "ua7e-t2fy"
+PRELIM_DATASET_ID = "mpgq-jmmr"
+
 # column name from socrata
 TOTAL_ADMISSION_COVID_API = "totalconfc19newadm"
 TOTAL_ADMISSION_FLU_API = "totalconfflunewadm"
diff --git a/nhsn/delphi_nhsn/patch.py b/nhsn/delphi_nhsn/patch.py
new file mode 100644
index 000000000..31fae3070
--- /dev/null
+++ b/nhsn/delphi_nhsn/patch.py
@@ -0,0 +1,93 @@
+"""
+This module is used for patching data in the delphi_nhsn package.
+
+To use this module, you need to specify "patch_dir" in params.json, like so:
+
+{
+  "common": {
+    ...
+  },
+  "validation": {
+    ...
+  },
+  "patch": {
+    "patch_dir": "/Users/minhkhuele/Desktop/delphi/covidcast-indicators/nhsn/patch"
+  }
+}
+
+It will generate data for the range of issue dates corresponding to source data files available in "backup_dir"
+specified under "common", and store them in batch issue format under "patch_dir":
+[name-of-patch]/issue_[issue-epiweek]/nhsn/actual_data_file.csv
+"""
+
+from datetime import datetime
+from os import makedirs
+from pathlib import Path
+from typing import List
+
+from delphi_utils import get_structured_logger, read_params
+from epiweeks import Week
+
+from .run import run_module
+
+
+def filter_source_files(source_files: List[Path]):
+    """
+    Filter patch files such that each element in the list is a unique epiweek with the latest issue date.
+
+    Parameters
+    ----------
+    source_files
+
+    Returns
+    -------
+    list of issue dates
+
+    """
+    epiweek_dict = dict()
+
+    for file in source_files:
+        if "prelim" not in file.stem:
+            current_issue_date = datetime.strptime(file.name.split(".")[0], "%Y%m%d")
+            epiweek = Week.fromdate(current_issue_date)
+            epiweek_dict[epiweek] = file
+
+    filtered_patch_list = list(epiweek_dict.values())
+    return filtered_patch_list
+
+
+def patch(params):
+    """
+    Run the NHSN indicator for a range of issue dates.
+
+    The range of issue dates is derived from the source files in "backup_dir"; the output location
+    is specified in params.json using the following keys:
+    - "patch": Only used for patching data
+        - "patch_dir": str, directory to write all issues output
+    """
+    logger = get_structured_logger("delphi_nhsn.patch", filename=params["common"]["log_filename"])
+
+    source_files = sorted(Path(params["common"]["backup_dir"]).glob("*.csv.gz"))
+    makedirs(params["patch"]["patch_dir"], exist_ok=True)
+
+    logger.info(
+        "Starting patching",
+        patch_directory=params["patch"]["patch_dir"],
+        start_issue=source_files[0].name.split(".")[0],
+        end_issue=source_files[-1].name.split(".")[0],
+    )
+
+    patch_list = filter_source_files(source_files)
+    for file in patch_list:
+        issue_date = datetime.strptime(file.name.split(".")[0], "%Y%m%d")
+        current_issue_ew = Week.fromdate(issue_date)
+        logger.info("Running issue", issue_date=issue_date.strftime("%Y-%m-%d"))
+        params["patch"]["issue_date"] = issue_date.strftime("%Y%m%d")
+        current_issue_dir = f"{params['patch']['patch_dir']}/issue_{current_issue_ew}/nhsn"
+        makedirs(current_issue_dir, exist_ok=True)
+        params["common"]["export_dir"] = current_issue_dir
+        params["common"]["custom_run"] = True
+        run_module(params, logger)
+
+
+if __name__ == "__main__":
+    patch(read_params())
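A note on `filter_source_files`, since the whole patch flow hinges on it: backups are daily but NHSN issues are weekly, so files are deduplicated per epiweek by letting later files overwrite earlier ones under the same `Week` key. A minimal sketch of that behavior (the file names are hypothetical; MMWR epiweeks begin on Sunday, and 2024-08-04 was a Sunday):

```python
from datetime import datetime
from pathlib import Path

from epiweeks import Week

from delphi_nhsn.patch import filter_source_files

# 2024-08-01..03 (Thu-Sat) share one epiweek; 2024-08-04 (Sun) opens the next.
files = [Path(f"/tmp/{d}.csv.gz") for d in ("20240801", "20240802", "20240803", "20240804")]

kept = filter_source_files(files)
print([f.name for f in kept])  # ['20240803.csv.gz', '20240804.csv.gz']

for f in kept:
    issue = datetime.strptime(f.name.split(".")[0], "%Y%m%d")
    print(Week.fromdate(issue))  # one file per epiweek, latest issue wins
```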
diff --git a/nhsn/delphi_nhsn/pull.py b/nhsn/delphi_nhsn/pull.py
index 2e1114142..7377ef958 100644
--- a/nhsn/delphi_nhsn/pull.py
+++ b/nhsn/delphi_nhsn/pull.py
@@ -1,13 +1,14 @@
 # -*- coding: utf-8 -*-
-"""Functions for pulling NSSP ER data."""
+"""Functions for pulling NHSN data."""
 import logging
+from pathlib import Path
 from typing import Optional
 
 import pandas as pd
 from delphi_utils import create_backup_csv
 from sodapy import Socrata
 
-from .constants import PRELIM_SIGNALS_MAP, PRELIM_TYPE_DICT, SIGNALS_MAP, TYPE_DICT
+from .constants import MAIN_DATASET_ID, PRELIM_DATASET_ID, PRELIM_SIGNALS_MAP, PRELIM_TYPE_DICT, SIGNALS_MAP, TYPE_DICT
 
 
 def pull_data(socrata_token: str, dataset_id: str):
@@ -27,7 +28,42 @@ def pull_data(socrata_token: str, dataset_id: str):
     return df
 
 
-def pull_nhsn_data(socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None):
+def pull_data_from_file(filepath: str, issue_date: str, logger, prelim_flag=False) -> pd.DataFrame:
+    """
+    Pull data from source file.
+
+    The source file is generated from delphi_utils.create_backup_csv
+
+    Parameters
+    ----------
+    filepath: full path where the source file is located
+    issue_date: date when the file was pulled / generated
+    logger
+    prelim_flag: boolean to indicate which dataset to grab
+
+    Returns
+    -------
+    pd.DataFrame
+        Dataframe read from the backup file; empty if no backup exists for the issue date.
+    """
+    df = pd.DataFrame()
+    if issue_date:
+        issue_date = issue_date.replace("-", "")
+        filename = f"{issue_date}_prelim.csv.gz" if prelim_flag else f"{issue_date}.csv.gz"
+        backup_file = Path(filepath, filename)
+
+        if backup_file.exists():
+            df = pd.read_csv(backup_file, compression="gzip")
+            logger.info("Pulling data from file", file=filename, num_rows=len(df))
+    return df
+
+
+def pull_nhsn_data(
+    socrata_token: str,
+    backup_dir: str,
+    custom_run: bool,
+    issue_date: Optional[str],
+    logger: Optional[logging.Logger] = None,
+):
-    """Pull the latest NHSN hospital admission data, and conforms it into a dataset.
+    """Pull the latest NHSN hospital admission data, and conform it into a dataset.
 
     The output dataset has:
@@ -52,7 +88,11 @@ def pull_nhsn_data(
         Dataframe as described above.
     """
     # Pull data from Socrata API
-    df = pull_data(socrata_token, dataset_id="ua7e-t2fy")
+    df = (
+        pull_data(socrata_token, dataset_id=MAIN_DATASET_ID)
+        if not custom_run
+        else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=False)
+    )
 
     keep_columns = list(TYPE_DICT.keys())
@@ -75,7 +115,11 @@ def pull_nhsn_data(
 def pull_preliminary_nhsn_data(
-    socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None
+    socrata_token: str,
+    backup_dir: str,
+    custom_run: bool,
+    issue_date: Optional[str],
+    logger: Optional[logging.Logger] = None,
 ):
-    """Pull the latest preliminary NHSN hospital admission data, and conforms it into a dataset.
+    """Pull the latest preliminary NHSN hospital admission data, and conform it into a dataset.
 
     The output dataset has:
@@ -100,8 +144,11 @@ def pull_preliminary_nhsn_data(
     pd.DataFrame
         Dataframe as described above.
     """
-    # Pull data from Socrata API
-    df = pull_data(socrata_token, dataset_id="mpgq-jmmr")
+    df = (
+        pull_data(socrata_token, dataset_id=PRELIM_DATASET_ID)
+        if not custom_run
+        else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=True)
+    )
 
     keep_columns = list(PRELIM_TYPE_DICT.keys())
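The file-based pull is deliberately simple: `pull_data_from_file` strips dashes from the issue date, looks for `<YYYYMMDD>.csv.gz` (or `<YYYYMMDD>_prelim.csv.gz` when `prelim_flag=True`) under the backup directory, and returns an empty DataFrame when nothing matches — which is what lets the caller skip signals whose backups are missing. A usage sketch (the backup path is a placeholder):

```python
from delphi_utils import get_structured_logger

from delphi_nhsn.pull import pull_data_from_file

logger = get_structured_logger("nhsn_example")

# "2024-12-12" and "20241212" are equivalent; dashes are stripped before
# the filename is built.
df = pull_data_from_file("/opt/backups/nhsn", "2024-12-12", logger, prelim_flag=False)

if df.empty:
    print("no backup for this issue date")  # caller decides what to do
else:
    print(f"loaded {len(df)} rows from backup")
```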
""" # Pull data from Socrata API - df = pull_data(socrata_token, dataset_id="ua7e-t2fy") + df = ( + pull_data(socrata_token, dataset_id=MAIN_DATASET_ID) + if not custom_run + else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=False) + ) keep_columns = list(TYPE_DICT.keys()) @@ -75,7 +115,11 @@ def pull_nhsn_data(socrata_token: str, backup_dir: str, custom_run: bool, logger def pull_preliminary_nhsn_data( - socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None + socrata_token: str, + backup_dir: str, + custom_run: bool, + issue_date: Optional[str], + logger: Optional[logging.Logger] = None, ): """Pull the latest preliminary NHSN hospital admission data, and conforms it into a dataset. @@ -100,8 +144,11 @@ def pull_preliminary_nhsn_data( pd.DataFrame Dataframe as described above. """ - # Pull data from Socrata API - df = pull_data(socrata_token, dataset_id="mpgq-jmmr") + df = ( + pull_data(socrata_token, dataset_id=PRELIM_DATASET_ID) + if not custom_run + else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=True) + ) keep_columns = list(PRELIM_TYPE_DICT.keys()) diff --git a/nhsn/delphi_nhsn/run.py b/nhsn/delphi_nhsn/run.py index 80f7cab47..15f5559c5 100644 --- a/nhsn/delphi_nhsn/run.py +++ b/nhsn/delphi_nhsn/run.py @@ -25,7 +25,7 @@ from .pull import pull_nhsn_data, pull_preliminary_nhsn_data -def run_module(params): +def run_module(params, logger=None): """ Run the indicator. @@ -35,14 +35,16 @@ def run_module(params): Nested dictionary of parameters. """ start_time = time.time() - logger = get_structured_logger( - __name__, - filename=params["common"].get("log_filename"), - log_exceptions=params["common"].get("log_exceptions", True), - ) + if not logger: + logger = get_structured_logger( + __name__, + filename=params["common"].get("log_filename"), + log_exceptions=params["common"].get("log_exceptions", True), + ) export_dir = params["common"]["export_dir"] backup_dir = params["common"]["backup_dir"] custom_run = params["common"].get("custom_run", False) + issue_date = params.get("patch", dict()).get("issue_date", None) socrata_token = params["indicator"]["socrata_token"] export_start_date = params["indicator"]["export_start_date"] run_stats = [] @@ -51,12 +53,16 @@ def run_module(params): export_start_date = date.today() - timedelta(days=date.today().weekday() + 2) export_start_date = export_start_date.strftime("%Y-%m-%d") - nhsn_df = pull_nhsn_data(socrata_token, backup_dir, custom_run=custom_run, logger=logger) - preliminary_nhsn_df = pull_preliminary_nhsn_data(socrata_token, backup_dir, custom_run=custom_run, logger=logger) + nhsn_df = pull_nhsn_data(socrata_token, backup_dir, custom_run=custom_run, issue_date=issue_date, logger=logger) + preliminary_nhsn_df = pull_preliminary_nhsn_data( + socrata_token, backup_dir, custom_run=custom_run, issue_date=issue_date, logger=logger + ) geo_mapper = GeoMapper() signal_df_dict = {signal: nhsn_df for signal in SIGNALS_MAP} - signal_df_dict.update({signal: preliminary_nhsn_df for signal in PRELIM_SIGNALS_MAP}) + # some of the source backups do not include for preliminary data TODO remove after first patch + if not preliminary_nhsn_df.empty: + signal_df_dict.update({signal: preliminary_nhsn_df for signal in PRELIM_SIGNALS_MAP}) for signal, df_pull in signal_df_dict.items(): for geo in GEOS: diff --git a/nhsn/tests/conftest.py b/nhsn/tests/conftest.py index 525d8ae7e..b89946a02 100644 --- a/nhsn/tests/conftest.py +++ b/nhsn/tests/conftest.py @@ -16,10 +16,10 @@ # queries the 
diff --git a/nhsn/tests/conftest.py b/nhsn/tests/conftest.py
index 525d8ae7e..b89946a02 100644
--- a/nhsn/tests/conftest.py
+++ b/nhsn/tests/conftest.py
@@ -16,10 +16,10 @@
 
 # queries the nhsn data with timestamp (2021-08-19, 2021-10-19) with CO and USA data
-with open("test_data/page.json", "r") as f:
+with open(f"{TEST_DIR}/test_data/page.json", "r") as f:
     TEST_DATA = json.load(f)
 
-with open("test_data/prelim_page.json", "r") as f:
+with open(f"{TEST_DIR}/test_data/prelim_page.json", "r") as f:
     PRELIM_TEST_DATA = json.load(f)
 
 @pytest.fixture(scope="session")
@@ -50,11 +50,12 @@ def params():
 @pytest.fixture
 def params_w_patch(params):
     params_copy = copy.deepcopy(params)
+    params_copy["common"]["custom_run"] = True
     params_copy["patch"] = {
-        "start_issue": "2024-06-27",
-        "end_issue": "2024-06-29",
-        "patch_dir": "./patch_dir"
+        "patch_dir": f"{TEST_DIR}/patch_dir",
+        "issue_date": "2024-12-12",
     }
+
     return params_copy
 
 @pytest.fixture(scope="function")
diff --git a/nhsn/tests/test_data/20241212.csv.gz b/nhsn/tests/test_data/20241212.csv.gz
new file mode 100644
index 000000000..61c85a5ba
Binary files /dev/null and b/nhsn/tests/test_data/20241212.csv.gz differ
diff --git a/nhsn/tests/test_data/20241212_prelim.csv.gz b/nhsn/tests/test_data/20241212_prelim.csv.gz
new file mode 100644
index 000000000..9ef690301
Binary files /dev/null and b/nhsn/tests/test_data/20241212_prelim.csv.gz differ
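The two binary fixtures are what `pull_data_from_file` resolves for the fixture issue date 2024-12-12, and the pull tests below assert they round-trip back to the JSON test data. If they ever need regenerating, something like this sketch would do (paths assume the repo layout above; dtype differences are tolerated because the tests compare after `astype(str)`):

```python
import json

import pandas as pd

# page.json / prelim_page.json hold the Socrata-shaped records used across the tests
for src, dst in [
    ("nhsn/tests/test_data/page.json", "nhsn/tests/test_data/20241212.csv.gz"),
    ("nhsn/tests/test_data/prelim_page.json", "nhsn/tests/test_data/20241212_prelim.csv.gz"),
]:
    with open(src) as f:
        pd.DataFrame(json.load(f)).to_csv(dst, index=False, compression="gzip")
```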
diff --git a/nhsn/tests/test_patch.py b/nhsn/tests/test_patch.py
new file mode 100644
index 000000000..639d5d262
--- /dev/null
+++ b/nhsn/tests/test_patch.py
@@ -0,0 +1,130 @@
+import glob
+import os
+from collections import defaultdict
+from pathlib import Path
+import shutil
+from unittest.mock import patch as mock_patch
+
+import pandas as pd
+from datetime import datetime, timedelta
+
+from epiweeks import Week
+
+from delphi_nhsn.patch import filter_source_files, patch
+from delphi_nhsn.constants import TOTAL_ADMISSION_COVID_API, TOTAL_ADMISSION_FLU_API
+from conftest import TEST_DATA, PRELIM_TEST_DATA, TEST_DIR
+
+class TestPatch:
+
+    def generate_date_list(self, start_date, end_date):
+        # Generate a list of dates
+        date_list = []
+        current_date = start_date
+
+        while current_date <= end_date:
+            date_list.append(current_date.strftime('%Y%m%d'))
+            current_date += timedelta(days=1)
+        return date_list
+
+    def generate_dummy_file_names(self):
+        start_date = datetime(2024, 8, 1)
+        end_date = datetime(2024, 8, 4)
+        date_list_part1 = self.generate_date_list(start_date, end_date)
+
+        start_date = datetime(2024, 9, 6)
+        end_date = datetime(2024, 9, 10)
+        date_list_part2 = self.generate_date_list(start_date, end_date)
+
+        start_date = datetime(2024, 10, 6)
+        end_date = datetime(2024, 10, 15)
+        date_list_part3 = self.generate_date_list(start_date, end_date)
+
+        start_date = datetime(2024, 11, 16)
+        end_date = datetime(2024, 11, 22)
+        date_list_part4 = self.generate_date_list(start_date, end_date)
+
+        date_list = date_list_part1 + date_list_part2 + date_list_part3 + date_list_part4
+
+        file_list = []
+        for date in date_list:
+            custom_filename = Path(f"/tmp/{date}.csv.gz")
+            file_list.append(custom_filename)
+        return file_list
+
+    def test_filter_source_files(self):
+        filelist = self.generate_dummy_file_names()
+        epiweek_dict = defaultdict(list)
+        for file in filelist:
+            issue_dt = datetime.strptime(file.name.split(".")[0], "%Y%m%d")
+            issue_epiweek = Week.fromdate(issue_dt)
+            epiweek_dict[issue_epiweek].append(issue_dt)
+        patch_issue_list = filter_source_files(filelist)
+        for file in patch_issue_list:
+            issue_dt = datetime.strptime(file.name.split(".")[0], "%Y%m%d")
+            issue_epiweek = Week.fromdate(issue_dt)
+            assert max(epiweek_dict[issue_epiweek]) == issue_dt
+
+    def generate_test_source_files(self):
+        start_date = datetime(2024, 8, 1)
+        end_date = datetime(2024, 8, 4)
+        date_list_part1 = self.generate_date_list(start_date, end_date)
+
+        start_date = datetime(2024, 9, 6)
+        end_date = datetime(2024, 9, 10)
+        date_list_part2 = self.generate_date_list(start_date, end_date)
+
+        start_date = datetime(2024, 11, 16)
+        end_date = datetime(2024, 11, 22)
+        date_list_part4 = self.generate_date_list(start_date, end_date)
+
+        date_list = date_list_part1 + date_list_part2 + date_list_part4
+
+        file_list = []
+        prelim_file_list = []
+        for date in date_list:
+            custom_filename = f"{TEST_DIR}/backups/{date}.csv.gz"
+            custom_filename_prelim = f"{TEST_DIR}/backups/{date}_prelim.csv.gz"
+            test_data = pd.DataFrame(TEST_DATA)
+            test_data[TOTAL_ADMISSION_COVID_API] = int(date)
+            test_data[TOTAL_ADMISSION_FLU_API] = int(date)
+            test_prelim_data = pd.DataFrame(PRELIM_TEST_DATA)
+            test_prelim_data[TOTAL_ADMISSION_COVID_API] = int(date)
+            test_prelim_data[TOTAL_ADMISSION_FLU_API] = int(date)
+
+            test_data = test_data.head(2)
+            test_data.to_csv(
+                custom_filename, index=False, na_rep="NA", compression="gzip"
+            )
+            test_prelim_data = test_prelim_data.head(2)
+            test_prelim_data.to_csv(
+                custom_filename_prelim, index=False, na_rep="NA", compression="gzip"
+            )
+            file_list.append(custom_filename)
+            prelim_file_list.append(custom_filename_prelim)
+        return file_list, prelim_file_list
+
+    def test_patch(self, params_w_patch):
+        with mock_patch("delphi_nhsn.patch.read_params", return_value=params_w_patch):
+            file_list, prelim_file_list = self.generate_test_source_files()
+            patch(params_w_patch)
+
+            for issue_path in Path(f"{TEST_DIR}/patch_dir").glob("*"):
+                issue_dt_str = issue_path.name.replace("issue_", "")
+                for file in Path(issue_path / "nhsn").iterdir():
+                    df = pd.read_csv(file)
+                    val = Week.fromdate(datetime.strptime(str(int(df["val"][0])), "%Y%m%d"))
+                    assert issue_dt_str == str(val)
+
+            # clean up
+            shutil.rmtree(f"{TEST_DIR}/patch_dir")
+
+            for file in file_list:
+                os.remove(file)
+
+            for file in prelim_file_list:
+                os.remove(file)
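The assertion in `test_patch` leans on a round-trip through epiweeks: `generate_test_source_files` stamps each backup's admission columns with its own YYYYMMDD date, so the `val` read back from any patched CSV can be mapped to an epiweek and compared against the `issue_<epiweek>` directory it was written to. The arithmetic, in isolation:

```python
from datetime import datetime

from epiweeks import Week

issue = "20241212"
week = Week.fromdate(datetime.strptime(issue, "%Y%m%d"))

print(week)                   # 202450
print(f"issue_{week}/nhsn/")  # the directory patch() writes this issue into
```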
diff --git a/nhsn/tests/test_pull.py b/nhsn/tests/test_pull.py
index c09c838d7..daa3acd92 100644
--- a/nhsn/tests/test_pull.py
+++ b/nhsn/tests/test_pull.py
@@ -8,13 +8,12 @@
 from delphi_nhsn.pull import (
     pull_nhsn_data,
     pull_data,
-    pull_preliminary_nhsn_data
+    pull_preliminary_nhsn_data, pull_data_from_file
 )
 from delphi_nhsn.constants import SIGNALS_MAP, PRELIM_SIGNALS_MAP
 from delphi_utils import get_structured_logger
 
-from conftest import TEST_DATA, PRELIM_TEST_DATA
-
+from conftest import TEST_DATA, PRELIM_TEST_DATA, TEST_DIR
 
 DATASETS = [{"id":"ua7e-t2fy",
              "test_data": TEST_DATA},
@@ -42,16 +41,46 @@ def test_socrata_call(self, mock_socrata, dataset, params):
         # Check that get method was called with correct arguments
         mock_client.get.assert_any_call(dataset["id"], limit=50000, offset=0)
 
+    def test_pull_from_file(self, caplog, params_w_patch):
+        backup_dir = f"{TEST_DIR}/test_data"
+        issue_date = params_w_patch["patch"]["issue_date"]
+        logger = get_structured_logger()
+
+        # Load test data
+        expected_data = pd.DataFrame(TEST_DATA)
+
+        df = pull_data_from_file(backup_dir, issue_date, logger=logger)
+        df = df.astype('str')
+        expected_data = expected_data.astype('str')
+        assert "Pulling data from file" in caplog.text
+
+        pd.testing.assert_frame_equal(expected_data, df)
+
+    def test_pull_from_file_prelim(self, caplog, params_w_patch):
+        backup_dir = f"{TEST_DIR}/test_data"
+        issue_date = params_w_patch["patch"]["issue_date"]
+        logger = get_structured_logger()
+
+        # Load test data
+        expected_data = pd.DataFrame(PRELIM_TEST_DATA)
+
+        df = pull_data_from_file(backup_dir, issue_date, logger=logger, prelim_flag=True)
+        df = df.astype('str')
+        expected_data = expected_data.astype('str')
+
+        assert "Pulling data from file" in caplog.text
+        pd.testing.assert_frame_equal(expected_data, df)
+
     def test_pull_nhsn_data_output(self, caplog, params):
         with patch('sodapy.Socrata.get') as mock_get:
             mock_get.side_effect = [TEST_DATA, []]
             backup_dir = params["common"]["backup_dir"]
             test_token = params["indicator"]["socrata_token"]
-            custom_run = True
+            custom_run = params["common"]["custom_run"]
 
             logger = get_structured_logger()
-            result = pull_nhsn_data(test_token, backup_dir, custom_run, logger)
+            result = pull_nhsn_data(test_token, backup_dir, custom_run, issue_date=None, logger=logger)
 
             # Check result
             assert result["timestamp"].notnull().all(), "timestamp has rogue NaN"
@@ -74,7 +103,7 @@ def test_pull_nhsn_data_backup(self, caplog, params):
         logger = get_structured_logger()
 
         # Call function with test token
-        pull_nhsn_data(test_token, backup_dir, custom_run, logger)
+        pull_nhsn_data(test_token, backup_dir, custom_run, issue_date=None, logger=logger)
 
         # Check logger used:
         assert "Backup file created" in caplog.text
@@ -99,11 +128,11 @@ def test_pull_prelim_nhsn_data_output(self, caplog, params):
             mock_get.side_effect = [PRELIM_TEST_DATA, []]
             backup_dir = params["common"]["backup_dir"]
             test_token = params["indicator"]["socrata_token"]
-            custom_run = True
+            custom_run = params["common"]["custom_run"]
 
             logger = get_structured_logger()
 
-            result = pull_preliminary_nhsn_data(test_token, backup_dir, custom_run, logger)
+            result = pull_preliminary_nhsn_data(test_token, backup_dir, custom_run, issue_date=None, logger=logger)
 
             # Check result
             assert result["timestamp"].notnull().all(), "timestamp has rogue NaN"
@@ -126,7 +155,7 @@ def test_pull_prelim_nhsn_data_backup(self, caplog, params):
         logger = get_structured_logger()
 
         # Call function with test token
-        pull_preliminary_nhsn_data(test_token, backup_dir, custom_run, logger)
+        pull_preliminary_nhsn_data(test_token, backup_dir, custom_run, issue_date=None, logger=logger)
 
         # Check logger used:
         assert "Backup file created" in caplog.text
diff --git a/nhsn/tests/test_run.py b/nhsn/tests/test_run.py
index cfc47e50f..c96ec7953 100644
--- a/nhsn/tests/test_run.py
+++ b/nhsn/tests/test_run.py
@@ -1,3 +1,4 @@
+import glob
 import os
 from pathlib import Path
 
@@ -47,3 +48,8 @@ def test_output_files_exist(self, params, run_as_module):
 
         for file in Path(export_dir).glob("*.csv"):
             os.remove(file)
+
+        today = pd.Timestamp.today().strftime("%Y%m%d")
+        backup_dir = glob.glob(f"{Path(params['common']['backup_dir'])}/{today}*")
+        for file in backup_dir:
+            os.remove(file)
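The new teardown in test_run removes the backup files that the module run just created; since `create_backup_csv` names them by today's date, a date-prefixed glob is enough to target only this run's artifacts. The same pattern, standalone (the directory is illustrative; the tests use `params['common']['backup_dir']`):

```python
import glob
import os
from datetime import date

backup_dir = "./backups"  # illustrative
today = date.today().strftime("%Y%m%d")

for path in glob.glob(f"{backup_dir}/{today}*"):
    os.remove(path)  # e.g. 20241212.csv.gz and 20241212_prelim.csv.gz
```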