Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NAN code support to HHS Hosp #1008

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 80 additions & 65 deletions hhs_hosp/delphi_hhs/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@

import time
from delphi_epidata import Epidata
from delphi_utils.export import create_export_csv
from delphi_utils.geomap import GeoMapper
from delphi_utils import get_structured_logger
from delphi_utils import create_export_csv, get_structured_logger, Nans, GeoMapper
import numpy as np
import pandas as pd

from .constants import SIGNALS, GEOS, SMOOTHERS, CONFIRMED, SUM_CONF_SUSP, CONFIRMED_FLU


def _date_to_int(d):
"""Return a date object as a yyyymmdd int."""
return int(d.strftime("%Y%m%d"))
Expand Down Expand Up @@ -64,6 +63,19 @@ def generate_date_ranges(start, end):
return output


def add_nancodes(df):
    """Attach the standard missingness columns to a signal dataframe.

    Defaults assume ``val`` is present and that ``se``/``sample_size`` are
    never reported for this source; any row whose ``val`` is still null is
    then flagged with the catch-all OTHER code.
    """
    df["missing_val"] = Nans.NOT_MISSING
    df["missing_se"] = Nans.NOT_APPLICABLE
    df["missing_sample_size"] = Nans.NOT_APPLICABLE

    # Whatever is still null in val has no known explanation -> OTHER.
    df.loc[df["val"].isnull(), "missing_val"] = Nans.OTHER
    return df


def run_module(params):
"""
Generate ground truth HHS hospitalization data.
Expand All @@ -79,16 +91,16 @@ def run_module(params):
"""
start_time = time.time()
logger = get_structured_logger(
__name__, filename=params["common"].get("log_filename"),
log_exceptions=params["common"].get("log_exceptions", True))
__name__,
filename=params["common"].get("log_filename"),
log_exceptions=params["common"].get("log_exceptions", True),
)
mapper = GeoMapper()
request_all_states = ",".join(mapper.get_geo_values("state_id"))
end_day = date.today()
if "epidata" in params["common"] and \
"as_of" in params["common"]["epidata"]:
if "epidata" in params["common"] and "as_of" in params["common"]["epidata"]:
end_day = min(
end_day,
datetime.strptime(str(params["common"]["epidata"]["as_of"]), "%Y%m%d").date()
end_day, datetime.strptime(str(params["common"]["epidata"]["as_of"]), "%Y%m%d").date()
)
past_reference_day = date(year=2020, month=1, day=1) # first available date in DB
date_range = generate_date_ranges(past_reference_day, end_day)
Expand All @@ -100,33 +112,32 @@ def run_module(params):
raise Exception(f"Bad result from Epidata for {r}: {response['message']}")
if response["result"] == -2 and r == date_range[-1]: # -2 code means no results
continue
dfs.append(pd.DataFrame(response['epidata']))
dfs.append(pd.DataFrame(response["epidata"]))
all_columns = pd.concat(dfs)
geo_mapper = GeoMapper()
stats = []
for sensor, smoother, geo in product(SIGNALS, SMOOTHERS, GEOS):
logger.info("Generating signal and exporting to CSV",
geo_res = geo,
sensor = sensor,
smoother = smoother)
df = geo_mapper.add_geocode(make_signal(all_columns, sensor),
"state_id",
"state_code",
from_col="state")
logger.info(
"Generating signal and exporting to CSV", geo_res=geo, sensor=sensor, smoother=smoother
)
df = geo_mapper.add_geocode(
make_signal(all_columns, sensor), "state_id", "state_code", from_col="state"
)
if sensor.endswith("_prop"):
df=pop_proportion(df, geo_mapper)
df = pop_proportion(df, geo_mapper)
df = make_geo(df, geo, geo_mapper)
df["se"] = np.nan
df["sample_size"] = np.nan
df = smooth_values(df, smoother[0])
df = add_nancodes(df)
if df.empty:
continue
sensor_name = sensor + smoother[1]
# don't export first 6 days for smoothed signals since they'll be nan.
start_date = min(df.timestamp) + timedelta(6) if smoother[1] else min(df.timestamp)
dates = create_export_csv(df,
params["common"]["export_dir"],
geo,
sensor_name,
start_date=start_date)
dates = create_export_csv(
df, params["common"]["export_dir"], geo, sensor_name, start_date=start_date
)
if len(dates) > 0:
stats.append((max(dates), len(dates)))

Expand All @@ -135,71 +146,75 @@ def run_module(params):
csv_export_count = sum(s[-1] for s in stats)
max_lag_in_days = min_max_date and (datetime.now() - min_max_date).days
formatted_min_max_date = min_max_date and min_max_date.strftime("%Y-%m-%d")
logger.info("Completed indicator run",
elapsed_time_in_seconds = elapsed_time_in_seconds,
csv_export_count = csv_export_count,
max_lag_in_days = max_lag_in_days,
oldest_final_export_date = formatted_min_max_date)
logger.info(
"Completed indicator run",
elapsed_time_in_seconds=elapsed_time_in_seconds,
csv_export_count=csv_export_count,
max_lag_in_days=max_lag_in_days,
oldest_final_export_date=formatted_min_max_date,
)


def smooth_values(df, smoother):
    """Replace the ``val`` column with its smoothed version, one geo at a time."""
    # Smoothers expect floats; coerce up front so every group sees the same dtype.
    df["val"] = df["val"].astype(float)
    smoothed = df.groupby("geo_id")["val"].transform(smoother.smooth)
    df["val"] = smoothed
    return df

def pop_proportion(df, geo_mapper):
    """Rescale ``val`` to a rate per 100,000 population of each state."""
    with_pop = geo_mapper.add_population_column(df, "state_code")
    per_capita = df["val"] / with_pop["population"]
    df["val"] = round(per_capita * 100000, 7)
    # Remove the helper column so the augmented frame is left as it was found.
    with_pop.drop("population", axis=1, inplace=True)
    return df


def make_geo(state, geo, geo_mapper):
    """Convert a state-level signal frame to the requested geography."""
    # State level needs no crosswalk -- just expose the state column as geo_id.
    if geo == "state":
        return state.rename(columns={"state": "geo_id"})
    # Every other geo is aggregated through the crosswalk, keyed by timestamp.
    return geo_mapper.replace_geocode(
        state, "state_code", geo, new_col="geo_id", date_col="timestamp"
    )


def make_signal(all_columns, sig):
    """Generate column sums according to signal name.

    Parameters
    ----------
    all_columns: pd.DataFrame
        Raw rows pulled from Epidata, one row per state/date, with the
        ``previous_day_admission_*`` count columns.
    sig: str
        A name from SIGNALS; selects which admission columns are summed.

    Returns
    -------
    pd.DataFrame with "state" (lowercased), "timestamp" (previous-day
    datetime), and float "val" columns.

    Raises
    ------
    AssertionError if ``sig`` is not in SIGNALS.
    """
    # Bug fix: both messages below lacked the f-prefix, so they printed the
    # literal text "{sig}" / "{', '.join(SIGNALS)}" instead of the values.
    assert sig in SIGNALS, (
        f"Unexpected signal name '{sig}'; familiar names are '{', '.join(SIGNALS)}'"
    )
    # NOTE: startswith ordering matters because the "_prop" variants share
    # prefixes with their base signals.
    if sig.startswith(CONFIRMED):
        df = pd.DataFrame(
            {
                "state": all_columns.state.apply(str.lower),
                "timestamp": int_date_to_previous_day_datetime(all_columns.date),
                "val": all_columns.previous_day_admission_adult_covid_confirmed
                + all_columns.previous_day_admission_pediatric_covid_confirmed,
            }
        )
    elif sig.startswith(SUM_CONF_SUSP):
        df = pd.DataFrame(
            {
                "state": all_columns.state.apply(str.lower),
                "timestamp": int_date_to_previous_day_datetime(all_columns.date),
                "val": all_columns.previous_day_admission_adult_covid_confirmed
                + all_columns.previous_day_admission_adult_covid_suspected
                + all_columns.previous_day_admission_pediatric_covid_confirmed
                + all_columns.previous_day_admission_pediatric_covid_suspected,
            }
        )
    elif sig.startswith(CONFIRMED_FLU):
        df = pd.DataFrame(
            {
                "state": all_columns.state.apply(str.lower),
                "timestamp": int_date_to_previous_day_datetime(all_columns.date),
                "val": all_columns.previous_day_admission_influenza_confirmed,
            }
        )
    else:
        raise Exception(
            f"Bad programmer: signal '{sig}' in SIGNALS but not handled in make_signal"
        )
    df["val"] = df.val.astype(float)
    return df
47 changes: 31 additions & 16 deletions hhs_hosp/tests/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
import tempfile
import os

from delphi_hhs.run import _date_to_int, int_date_to_previous_day_datetime, generate_date_ranges, \
from delphi_hhs.run import _date_to_int, add_nancodes, int_date_to_previous_day_datetime, generate_date_ranges, \
make_signal, make_geo, run_module, pop_proportion
from delphi_hhs.constants import SMOOTHERS, GEOS, SIGNALS, \
CONFIRMED, SUM_CONF_SUSP, CONFIRMED_FLU, CONFIRMED_PROP, SUM_CONF_SUSP_PROP, CONFIRMED_FLU_PROP
from delphi_utils.geomap import GeoMapper
from delphi_utils import GeoMapper, Nans
from freezegun import freeze_time
import numpy as np
import pandas as pd
Expand Down Expand Up @@ -85,15 +85,15 @@ def test_make_signal():
})
pd.testing.assert_frame_equal(expected_flu, make_signal(data, CONFIRMED_FLU))
pd.testing.assert_frame_equal(expected_flu, make_signal(data, CONFIRMED_FLU_PROP))

with pytest.raises(Exception):
make_signal(data, "zig")

def test_pop_proportion():
geo_mapper = GeoMapper()
state_pop = geo_mapper.get_crosswalk("state_code", "pop")

test_df = pd.DataFrame({
test_df = pd.DataFrame({
'state': ['PA'],
'state_code': [42],
'timestamp': [datetime(year=2020, month=1, day=1)],
Expand All @@ -109,7 +109,7 @@ def test_pop_proportion():
'val': [15/pa_pop*100000],})
)

test_df= pd.DataFrame({
test_df= pd.DataFrame({
'state': ['WV'],
'state_code': [54],
'timestamp': [datetime(year=2020, month=1, day=1)],
Expand Down Expand Up @@ -137,30 +137,23 @@ def test_make_geo():
'val': [1., 2., 4.],
})

template = {
'se': np.nan,
'sample_size': np.nan,
}
expecteds = {
"state": pd.DataFrame(
dict(template,
geo_id=data.state,
dict(geo_id=data.state,
timestamp=data.timestamp,
val=data.val)),
"hhs": pd.DataFrame(
dict(template,
geo_id=['3', '5'],
dict(geo_id=['3', '5'],
timestamp=[test_timestamp] * 2,
val=[3., 4.])),
"nation": pd.DataFrame(
dict(template,
geo_id=['us'],
dict(geo_id=['us'],
timestamp=[test_timestamp],
val=[7.]))
}
for geo, expected in expecteds.items():
result = make_geo(data, geo, geo_mapper)
for series in ["geo_id", "timestamp", "val", "se", "sample_size"]:
for series in ["geo_id", "timestamp", "val"]:
pd.testing.assert_series_equal(expected[series], result[series], obj=f"{geo}:{series}")


Expand Down Expand Up @@ -207,3 +200,25 @@ def test_ignore_last_range_no_results(mock_covid_hosp, mock_export):
}
}
assert not run_module(params) # function should not raise value error and has no return value

def test_add_nancode():
    # Shared input columns; the expected frame is the input plus nancodes.
    shared = {
        'state': ['PA', 'WV', 'OH'],
        'state_code': [42, 54, 39],
        'timestamp': [pd.to_datetime("20200601")] * 3,
        'val': [1, 2, np.nan],
        'se': [np.nan] * 3,
        'sample_size': [np.nan] * 3,
    }
    data = pd.DataFrame(dict(shared))
    expected = pd.DataFrame(
        dict(
            shared,
            missing_val=[Nans.NOT_MISSING] * 2 + [Nans.OTHER],
            missing_se=[Nans.NOT_APPLICABLE] * 3,
            missing_sample_size=[Nans.NOT_APPLICABLE] * 3,
        )
    )
    pd.testing.assert_frame_equal(expected, add_nancodes(data))