Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CHNG pipeline fix #1858

Merged
merged 5 commits into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions changehc/delphi_changehc/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import time
from datetime import datetime, timedelta
from typing import Dict, Any
import pdb
krivard marked this conversation as resolved.
Show resolved Hide resolved

# third party
from delphi_utils import get_structured_logger
Expand Down Expand Up @@ -78,6 +79,16 @@ def make_asserts(params):
assert (files["denom"] is None) == (files["flu"] is None), \
"exactly one of denom and flu files are provided"

def process_dates(
    params: Dict[str, Dict[str, Any]],
    startdate_dt: datetime,
    enddate_dt: datetime,
):
    """Resolve the start and end dates for the indicator.

    Manual overrides are read from ``params["indicator"]["start_date"]`` and
    ``params["indicator"]["end_date"]``. An override that is absent or
    explicitly ``None`` falls back to the date part of the corresponding
    computed datetime, rendered with ``str()``.

    Parameters
    ----------
    params:
        Parameter mapping; must contain an ``"indicator"`` entry.
    startdate_dt:
        Computed start datetime, used when no ``start_date`` override is set.
    enddate_dt:
        Computed end datetime, used when no ``end_date`` override is set.

    Returns
    -------
    tuple
        ``(startdate, enddate)``. Override values are returned unchanged;
        fallback values are strings.

    Raises
    ------
    KeyError
        If ``params`` has no ``"indicator"`` entry.
    """
    indicator_params = params["indicator"]

    def _override_or_default(key, default_dt):
        # A key that is missing and a key explicitly set to None are treated
        # the same way: both mean "no manual override".
        override = indicator_params.get(key)
        if override is None:
            return str(default_dt.date())
        return override

    startdate = _override_or_default("start_date", startdate_dt)
    enddate = _override_or_default("end_date", enddate_dt)
    return startdate, enddate


def run_module(params: Dict[str, Dict[str, Any]]):
"""
Expand Down Expand Up @@ -144,8 +155,8 @@ def run_module(params: Dict[str, Dict[str, Any]]):
enddate_dt = dropdate_dt - timedelta(days=n_waiting_days)
startdate_dt = enddate_dt - timedelta(days=n_backfill_days)
# now allow manual overrides
enddate = enddate = params["indicator"].get("end_date",str(enddate_dt.date()))
startdate = params["indicator"].get("start_date", str(startdate_dt.date()))

startdate, enddate = process_dates(params, startdate_dt, enddate_dt)

logger.info("generating signal and exporting to CSV",
first_sensor_date = startdate,
Expand Down
5 changes: 4 additions & 1 deletion changehc/delphi_changehc/update_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ def geo_reindex(self, data):
thr_col="den",
mega_col=geo,
date_col=Config.DATE_COL)
# TODO: remove the filter below once the megacounty fix is implemented; it drops rows
# whose county code is longer than 5 characters or contains an underscore.
data_frame = data_frame[~((data_frame['county'].str.len() > 5) | (data_frame['county'].str.contains('_')))]
elif geo == "state":
data_frame = gmpr.replace_geocode(data, "fips", "state_id", new_col="state",
date_col=Config.DATE_COL)
Expand All @@ -172,7 +174,8 @@ def geo_reindex(self, data):
multiindex = pd.MultiIndex.from_product((unique_geo_ids, self.fit_dates),
names=[geo, Config.DATE_COL])
assert (len(multiindex) <= (len(gmpr.get_geo_values(gmpr.as_mapper_name(geo))) * len(self.fit_dates))
), "more loc-date pairs than maximum number of geographies x number of dates"
), f"more loc-date pairs than maximum number of geographies x number of dates, length of multiindex is {len(multiindex)}, geo level is {geo}"

# fill dataframe with missing dates using 0
data_frame = data_frame.reindex(multiindex, fill_value=0)
data_frame.fillna(0, inplace=True)
Expand Down
Loading