Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions nhsn/delphi_nhsn/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

GEOS = ["state", "nation", "hhs"]

MAIN_DATASET_ID = "ua7e-t2fy"
PRELIM_DATASET_ID = "mpgq-jmmr"

# column name from socrata
TOTAL_ADMISSION_COVID_API = "totalconfc19newadm"
TOTAL_ADMISSION_FLU_API = "totalconfflunewadm"
Expand Down
93 changes: 93 additions & 0 deletions nhsn/delphi_nhsn/patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""
This module is used for patching data in the delphi_nhsn package.

To use this module, you need to specify the range of issue dates in params.json, like so:

{
"common": {
...
},
"validation": {
...
},
"patch": {
"patch_dir": "/Users/minhkhuele/Desktop/delphi/covidcast-indicators/nhsn/patch"
}
}

It will generate data for the range of issue dates corresponding to source data files available in "backup_dir"
specified under "common", and store them in batch issue format under "patch_dir":
[name-of-patch]/issue_[issue-date]/nhsn/actual_data_file.csv
"""

from datetime import datetime
from os import makedirs
from pathlib import Path
from typing import List

from delphi_utils import get_structured_logger, read_params
from epiweeks import Week

from .run import run_module


def filter_source_files(source_files: List[Path]) -> List[Path]:
    """
    Keep one source backup file per epiweek: the one with the latest issue date.

    Preliminary-data backups (filenames containing "prelim") are skipped.
    Assumes ``source_files`` is sorted by issue date ascending, so that a
    later file overwrites an earlier one within the same epiweek.

    Parameters
    ----------
    source_files
        Backup file paths named ``[YYYYMMDD].csv.gz`` (optionally with a
        ``_prelim`` suffix before the extension).

    Returns
    -------
    List[Path]
        One backup file per epiweek (the latest issue in that week),
        in order of each epiweek's first appearance.
    """
    epiweek_dict = {}

    for file in source_files:
        # Preliminary backups are not patched through this path.
        if "prelim" in file.stem:
            continue
        # Filename stem before the first "." is the issue date, e.g. "20241212".
        current_issue_date = datetime.strptime(file.name.split(".")[0], "%Y%m%d")
        # Later files replace earlier ones that fall in the same epiweek.
        epiweek_dict[Week.fromdate(current_issue_date)] = file

    return list(epiweek_dict.values())


def patch(params):
    """
    Run the NHSN indicator for a range of issue dates.

    The issue dates are derived from the source backup files available in
    params["common"]["backup_dir"]. params.json must contain:
    - "patch": Only used for patching data
        - "patch_dir": str, directory to write all issues output

    Parameters
    ----------
    params
        Nested dictionary of parameters (see module docstring for shape).
    """
    logger = get_structured_logger("delphi_nhsn.patch", filename=params["common"]["log_filename"])

    source_files = sorted(Path(params["common"]["backup_dir"]).glob("*.csv.gz"))
    makedirs(params["patch"]["patch_dir"], exist_ok=True)

    if not source_files:
        # Nothing to patch; bail out instead of raising IndexError below.
        logger.info("No source backup files found", backup_dir=params["common"]["backup_dir"])
        return

    logger.info(
        "Starting patching",
        patch_directory=params["patch"]["patch_dir"],
        start_issue=source_files[0].name.split(".")[0],
        end_issue=source_files[-1].name.split(".")[0],
    )

    patch_list = filter_source_files(source_files)
    for file in patch_list:
        # Filename stem encodes the issue date as YYYYMMDD.
        issue_date = datetime.strptime(file.name.split(".")[0], "%Y%m%d")
        current_issue_ew = Week.fromdate(issue_date)
        logger.info("Running issue", issue_date=issue_date.strftime("%Y-%m-%d"))
        params["patch"]["issue_date"] = issue_date.strftime("%Y%m%d")
        # Output layout: [patch_dir]/issue_[epiweek]/nhsn/...
        current_issue_dir = f"{params['patch']['patch_dir']}/issue_{current_issue_ew}/nhsn"
        makedirs(current_issue_dir, exist_ok=True)
        params["common"]["export_dir"] = current_issue_dir
        # custom_run makes run_module pull from the backup files instead of Socrata.
        params["common"]["custom_run"] = True
        run_module(params, logger)


if __name__ == "__main__":
patch(read_params())
59 changes: 53 additions & 6 deletions nhsn/delphi_nhsn/pull.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
# -*- coding: utf-8 -*-
"""Functions for pulling NSSP ER data."""
import logging
from pathlib import Path
from typing import Optional

import pandas as pd
from delphi_utils import create_backup_csv
from sodapy import Socrata

from .constants import PRELIM_SIGNALS_MAP, PRELIM_TYPE_DICT, SIGNALS_MAP, TYPE_DICT
from .constants import MAIN_DATASET_ID, PRELIM_DATASET_ID, PRELIM_SIGNALS_MAP, PRELIM_TYPE_DICT, SIGNALS_MAP, TYPE_DICT


def pull_data(socrata_token: str, dataset_id: str):
Expand All @@ -27,7 +28,42 @@ def pull_data(socrata_token: str, dataset_id: str):
return df


def pull_nhsn_data(socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None):
def pull_data_from_file(filepath: str, issue_date: str, logger, prelim_flag=False) -> pd.DataFrame:
    """
    Load a previously backed-up source file instead of hitting the Socrata API.

    The source file is generated from delphi_utils.create_backup_csv

    Parameters
    ----------
    filepath: full path where the source file is located
    issue_date: date when the file was pulled / generated
    logger
    prelim_flag: boolean to indicate which dataset to grab

    Returns
    -------
    pd.DataFrame
        Contents of the backup file, or an empty DataFrame when no issue
        date is given or no matching backup exists.
    """
    if not issue_date:
        return pd.DataFrame()

    # Backup filenames use the compact YYYYMMDD form of the issue date.
    date_str = issue_date.replace("-", "")
    suffix = "_prelim" if prelim_flag else ""
    backup_file = Path(filepath) / f"{date_str}{suffix}.csv.gz"

    if not backup_file.exists():
        return pd.DataFrame()

    frame = pd.read_csv(backup_file, compression="gzip")
    logger.info("Pulling data from file", file=backup_file.name, num_rows=len(frame))
    return frame


def pull_nhsn_data(
socrata_token: str,
backup_dir: str,
custom_run: bool,
issue_date: Optional[str],
logger: Optional[logging.Logger] = None,
):
"""Pull the latest NHSN hospital admission data, and conforms it into a dataset.

The output dataset has:
Expand All @@ -52,7 +88,11 @@ def pull_nhsn_data(socrata_token: str, backup_dir: str, custom_run: bool, logger
Dataframe as described above.
"""
# Pull data from Socrata API
df = pull_data(socrata_token, dataset_id="ua7e-t2fy")
df = (
pull_data(socrata_token, dataset_id=MAIN_DATASET_ID)
if not custom_run
else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=False)
)

keep_columns = list(TYPE_DICT.keys())

Expand All @@ -75,7 +115,11 @@ def pull_nhsn_data(socrata_token: str, backup_dir: str, custom_run: bool, logger


def pull_preliminary_nhsn_data(
socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None
socrata_token: str,
backup_dir: str,
custom_run: bool,
issue_date: Optional[str],
logger: Optional[logging.Logger] = None,
):
"""Pull the latest preliminary NHSN hospital admission data, and conforms it into a dataset.

Expand All @@ -100,8 +144,11 @@ def pull_preliminary_nhsn_data(
pd.DataFrame
Dataframe as described above.
"""
# Pull data from Socrata API
df = pull_data(socrata_token, dataset_id="mpgq-jmmr")
df = (
pull_data(socrata_token, dataset_id=PRELIM_DATASET_ID)
if not custom_run
else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=True)
)

keep_columns = list(PRELIM_TYPE_DICT.keys())

Expand Down
24 changes: 15 additions & 9 deletions nhsn/delphi_nhsn/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from .pull import pull_nhsn_data, pull_preliminary_nhsn_data


def run_module(params):
def run_module(params, logger=None):
"""
Run the indicator.

Expand All @@ -35,14 +35,16 @@ def run_module(params):
Nested dictionary of parameters.
"""
start_time = time.time()
logger = get_structured_logger(
__name__,
filename=params["common"].get("log_filename"),
log_exceptions=params["common"].get("log_exceptions", True),
)
if not logger:
logger = get_structured_logger(
__name__,
filename=params["common"].get("log_filename"),
log_exceptions=params["common"].get("log_exceptions", True),
)
export_dir = params["common"]["export_dir"]
backup_dir = params["common"]["backup_dir"]
custom_run = params["common"].get("custom_run", False)
issue_date = params.get("patch", dict()).get("issue_date", None)
socrata_token = params["indicator"]["socrata_token"]
export_start_date = params["indicator"]["export_start_date"]
run_stats = []
Expand All @@ -51,12 +53,16 @@ def run_module(params):
export_start_date = date.today() - timedelta(days=date.today().weekday() + 2)
export_start_date = export_start_date.strftime("%Y-%m-%d")

nhsn_df = pull_nhsn_data(socrata_token, backup_dir, custom_run=custom_run, logger=logger)
preliminary_nhsn_df = pull_preliminary_nhsn_data(socrata_token, backup_dir, custom_run=custom_run, logger=logger)
nhsn_df = pull_nhsn_data(socrata_token, backup_dir, custom_run=custom_run, issue_date=issue_date, logger=logger)
preliminary_nhsn_df = pull_preliminary_nhsn_data(
socrata_token, backup_dir, custom_run=custom_run, issue_date=issue_date, logger=logger
)

geo_mapper = GeoMapper()
signal_df_dict = {signal: nhsn_df for signal in SIGNALS_MAP}
signal_df_dict.update({signal: preliminary_nhsn_df for signal in PRELIM_SIGNALS_MAP})
# some of the source backups do not include for preliminary data TODO remove after first patch
if not preliminary_nhsn_df.empty:
signal_df_dict.update({signal: preliminary_nhsn_df for signal in PRELIM_SIGNALS_MAP})
Comment on lines +63 to +65
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not fully understanding what's going on here, but is there a way to write this chunk such that we don't have to change it later?


for signal, df_pull in signal_df_dict.items():
for geo in GEOS:
Expand Down
11 changes: 6 additions & 5 deletions nhsn/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
# queries the nhsn data with timestamp (2021-08-19, 2021-10-19) with CO and USA data


with open("test_data/page.json", "r") as f:
with open(f"{TEST_DIR}/test_data/page.json", "r") as f:
TEST_DATA = json.load(f)

with open("test_data/prelim_page.json", "r") as f:
with open(f"{TEST_DIR}/test_data/prelim_page.json", "r") as f:
PRELIM_TEST_DATA = json.load(f)

@pytest.fixture(scope="session")
Expand Down Expand Up @@ -50,11 +50,12 @@ def params():
@pytest.fixture
def params_w_patch(params):
params_copy = copy.deepcopy(params)
params_copy["common"]["custom_run"] = True
params_copy["patch"] = {
"start_issue": "2024-06-27",
"end_issue": "2024-06-29",
"patch_dir": "./patch_dir"
"patch_dir": f"{TEST_DIR}/patch_dir",
"issue_date": "2024-12-12",
}

return params_copy

@pytest.fixture(scope="function")
Expand Down
Binary file added nhsn/tests/test_data/20241212.csv.gz
Binary file not shown.
Binary file added nhsn/tests/test_data/20241212_prelim.csv.gz
Binary file not shown.
Loading
Loading