Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
9c6513b
adding all symptom sets and updated preprocess function to compute ra…
Jan 16, 2022
04b256d
using delphi_utils smoothers for new signals and old smoother for old…
Jan 18, 2022
82eab20
selecting correct symptom sets to be added to the API
Jan 18, 2022
1a249d3
removing old signals
Jan 18, 2022
79c4d88
debugging the use of the smoother
Jan 18, 2022
da27293
removed dashes from variable names in sample data for tests, minor ch…
Jan 19, 2022
7923cbf
adding comments due to changes in unit test code
Jan 20, 2022
8870873
removing unused symptoms from metrics list
Jan 20, 2022
e79db73
add se and n fields to nation
nmdefries Jan 20, 2022
508b819
naive deduplication
nmdefries Jan 20, 2022
4a98f02
Merge branch 'main' into google_symptoms_omicron.
Jan 20, 2022
e82a843
changing line lengths and changing indexes of symptoms in unit tests …
Jan 20, 2022
a8eeba1
expect a single row per ts-geo
nmdefries Jan 20, 2022
1f5900a
Merge from main
Jan 20, 2022
bde6471
changing symptom set names to lower case
Jan 20, 2022
240d74c
Merge pull request #1482 from cmu-delphi/bot/sync-prod-main
krivard Jan 20, 2022
8f62e5f
indent
nmdefries Jan 20, 2022
2f805e6
check nation se, n fields
nmdefries Jan 21, 2022
276210c
Merge pull request #1479 from cmu-delphi/ndefries/dsew-cpr-ses
krivard Jan 21, 2022
ed51b3b
[cpr] Fix bug in export date handling
krivard Jan 21, 2022
3ff6381
[cpr] Select signals using params file
krivard Jan 21, 2022
0b065c3
Merge pull request #1483 from cmu-delphi/krivard/cpr-no-test
krivard Jan 21, 2022
79e26a3
Add a more thorough fips_to_megacounty test
dshemetov Jan 21, 2022
c2be816
Update _delphi_utils_python/tests/test_geomap.py
krivard Jan 21, 2022
9434566
Merge pull request #1484 from cmu-delphi/add_megacounty_test
krivard Jan 21, 2022
4363f05
Update google_symptoms/delphi_google_symptoms/run.py
nloliveira Jan 23, 2022
6d24998
automatically defining metrics and combined_metric based on symptom_sets …
Jan 23, 2022
af6f17a
removing unused import
Jan 23, 2022
1fe898a
Merge branch 'main' into google_symptoms_omicron
Jan 23, 2022
2c80571
updating signal names in the params.json template
Jan 23, 2022
0943417
adding smooth signal names to google symptoms params json file
Jan 24, 2022
e170ab9
test_pull_data type patch
nmdefries Jan 24, 2022
d983305
Merge pull request #1485 from cmu-delphi/ndefries/hhs-test-type-swap
korlaxxalrok Jan 24, 2022
a877c81
Merge branch 'main' into google_symptoms_omicron
Jan 24, 2022
8d5c23b
Merge pull request #1478 from cmu-delphi/google_symptoms_omicron
korlaxxalrok Jan 24, 2022
88d876c
chore: bump delphi_utils to 0.2.11
Jan 24, 2022
dee7c94
chore: bump covidcast-indicators to 0.2.23
Jan 24, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.2.22
current_version = 0.2.23
commit = True
message = chore: bump covidcast-indicators to {new_version}
tag = False
2 changes: 1 addition & 1 deletion _delphi_utils_python/.bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.2.10
current_version = 0.2.11
commit = True
message = chore: bump delphi_utils to {new_version}
tag = False
Expand Down
2 changes: 1 addition & 1 deletion _delphi_utils_python/delphi_utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
from .nancodes import Nans
from .weekday import Weekday

__version__ = "0.2.10"
__version__ = "0.2.11"
2 changes: 1 addition & 1 deletion _delphi_utils_python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

setup(
name="delphi_utils",
version="0.2.10",
version="0.2.11",
description="Shared Utility Functions for Indicators",
long_description=long_description,
long_description_content_type="text/markdown",
Expand Down
29 changes: 29 additions & 0 deletions _delphi_utils_python/tests/test_geomap.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ class TestGeoMapper:
),
)
)
mega_data_3 = pd.DataFrame(
{
"fips": [1123, 1125, 1126, 1128, 1129, 18181],
"timestamp": [pd.Timestamp("2018-01-01")] * 6,
"visits": [4, 1, 2, 5, 10, 100001],
"count": [2, 1, 5, 7, 3, 10021],
}
)
jhu_uid_data = pd.DataFrame(
{
"jhu_uid": [
Expand Down Expand Up @@ -208,6 +216,27 @@ def test_megacounty(self, geomapper):
new_data[["count"]].sum() - self.mega_data[["count"]].sum()
).sum() < 1e-3

new_data = geomapper.fips_to_megacounty(self.mega_data_3, 4, 1)
expected_df = pd.DataFrame(
{
"megafips": ["01000", "01128", "01129", "18181"],
"timestamp": [pd.Timestamp("2018-01-01")] * 4,
"visits": [7, 5, 10, 100001],
"count": [8, 7, 3, 10021],
}
)
pd.testing.assert_frame_equal(new_data.set_index("megafips").sort_index(axis=1), expected_df.set_index("megafips").sort_index(axis=1))
new_data = geomapper.fips_to_megacounty(self.mega_data_3, 4, 1, thr_col="count")
expected_df = pd.DataFrame(
{
"megafips": ["01000", "01126", "01128", "18181"],
"timestamp": [pd.Timestamp("2018-01-01")] * 4,
"visits": [15, 2, 5, 100001],
"count": [6, 5, 7, 10021],
}
)
pd.testing.assert_frame_equal(new_data.set_index("megafips").sort_index(axis=1), expected_df.set_index("megafips").sort_index(axis=1))

def test_add_population_column(self, geomapper):
new_data = geomapper.add_population_column(self.fips_data_3, "fips")
assert new_data.shape == (5, 5)
Expand Down
3 changes: 2 additions & 1 deletion ansible/templates/dsew_community_profile-params-prod.json.j2
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
},
"indicator": {
"input_cache": "./input_cache",
"reports": "new"
"reports": "new",
"export_signals": ["confirmed covid-19 admissions"]
},
"validation": {
"common": {
Expand Down
10 changes: 7 additions & 3 deletions ansible/templates/google_symptoms-params-prod.json.j2
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,13 @@
"dynamic": {
"ref_window_size": 7,
"smoothed_signals": [
"ageusia_smoothed_search",
"sum_anosmia_ageusia_smoothed_search",
"anosmia_smoothed_search"
"s01_smoothed_search",
"s02_smoothed_search",
"s03_smoothed_search",
"s05_smoothed_search",
"s06_smoothed_search",
"s08_smoothed_search",
"scontrol_smoothed_search"
]
}
},
Expand Down
2 changes: 2 additions & 0 deletions dsew_community_profile/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ Indicator-specific parameters:
and exported.
* `export_start_date`: a YYYY-mm-dd string indicating the first date to export.
* `export_end_date`: a YYYY-mm-dd string indicating the final date to export.
* `export_signals`: list of string keys from constants.SIGNALS indicating which
signals to export.

## Running the Indicator

Expand Down
19 changes: 17 additions & 2 deletions dsew_community_profile/delphi_dsew_community_profile/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,12 +342,16 @@ def nation_from_state(df, sig, geomapper):
).drop(
"norm_denom", axis=1
)
return geomapper.replace_geocode(
df = geomapper.replace_geocode(
df,
'state_id',
'nation',
new_col="geo_id"
)
df["se"] = None
df["sample_size"] = None

return df

def fetch_new_reports(params, logger=None):
"""Retrieve, compute, and collate all data we haven't seen yet."""
Expand All @@ -367,10 +371,21 @@ def fetch_new_reports(params, logger=None):
lambda x: x[x["publish_date"] == x["publish_date"].max()]
).drop(
"publish_date", axis=1
).drop_duplicates(
)

if len(latest_sig_df.index) > 0:
ret[sig] = latest_sig_df.reset_index(drop=True)
latest_sig_df = latest_sig_df.reset_index(drop=True)

assert all(latest_sig_df.groupby(
["timestamp", "geo_id"]
).size(
).reset_index(
drop=True
) == 1), f"Duplicate rows in {sig} indicate that one or" \
+ " more reports was published multiple times and the copies differ"

ret[sig] = latest_sig_df

# add nation from state
geomapper = GeoMapper()
Expand Down
11 changes: 8 additions & 3 deletions dsew_community_profile/delphi_dsew_community_profile/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@ def run_module(params):
__name__, filename=params["common"].get("log_filename"),
log_exceptions=params["common"].get("log_exceptions", True))
def replace_date_param(p):
if p in params["indicator"] and params["indicator"][p] is not None:
date_param = datetime.strptime(params["indicator"][p], "%Y-%m-%d").date()
params["indicator"][p] = date_param
if p in params["indicator"]:
if params["indicator"][p] is None:
del params["indicator"][p]
else:
date_param = datetime.strptime(params["indicator"][p], "%Y-%m-%d").date()
params["indicator"][p] = date_param
replace_date_param("export_start_date")
replace_date_param("export_end_date")
export_params = {
Expand All @@ -56,6 +59,8 @@ def replace_date_param(p):
dfs = fetch_new_reports(params, logger)
for key, df in dfs.items():
(geo, sig) = key
if sig not in params["indicator"]["export_signals"]:
continue
dates = create_export_csv(
df,
params['common']['export_dir'],
Expand Down
7 changes: 6 additions & 1 deletion dsew_community_profile/params.json.template
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@
"input_cache": "./input_cache",
"reports": "new",
"export_start_date": null,
"export_end_date": null
"export_end_date": null,
"export_signals": [
"confirmed covid-19 admissions",
"total",
"positivity"
]
},
"validation": {
"common": {
Expand Down
12 changes: 9 additions & 3 deletions dsew_community_profile/tests/test_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,9 @@ def test_nation_from_state(self):
test_df = pd.DataFrame({
'state_id': ['pa', 'wv'],
'timestamp': [datetime(year=2020, month=1, day=1)]*2,
'val': [15., 150.],})
'val': [15., 150.],
'se': [None, None],
'sample_size': [None, None],})

pa_pop = int(state_pop.loc[state_pop.state_id == "pa", "pop"])
wv_pop = int(state_pop.loc[state_pop.state_id == "wv", "pop"])
Expand All @@ -191,7 +193,9 @@ def test_nation_from_state(self):
pd.DataFrame({
'geo_id': ['us'],
'timestamp': [datetime(year=2020, month=1, day=1)],
'val': [15. + 150.],}),
'val': [15. + 150.],
'se': [None],
'sample_size': [None],}),
check_like=True
)

Expand All @@ -204,6 +208,8 @@ def test_nation_from_state(self):
pd.DataFrame({
'geo_id': ['us'],
'timestamp': [datetime(year=2020, month=1, day=1)],
'val': [15*pa_pop/tot_pop + 150*wv_pop/tot_pop],}),
'val': [15*pa_pop/tot_pop + 150*wv_pop/tot_pop],
'se': [None],
'sample_size': [None],}),
check_like=True
)
41 changes: 31 additions & 10 deletions google_symptoms/delphi_google_symptoms/constants.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,33 @@
"""Registry for constants."""
from functools import partial
from datetime import timedelta

from .smooth import (
identity,
kday_moving_average,
)
from delphi_utils import Smoother

# global constants
METRICS = ["Anosmia", "Ageusia"]
COMBINED_METRIC = "sum_anosmia_ageusia"

SYMPTOM_SETS = {
"s01": ["Cough", "Phlegm", "Sputum", "Upper respiratory tract infection"],
"s02": ["Nasal congestion", "Post nasal drip", "Rhinorrhea", "Sinusitis",
"Rhinitis", "Common cold"],
"s03": ["Fever", "Hyperthermia", "Chills", "Shivering", "Low grade fever"],
#"s04": ["Fatigue", "Weakness", "Muscle weakness", "Myalgia", "Pain"],
"s05": ["Shortness of breath", "Wheeze", "Croup", "Pneumonia", "Asthma",
"Crackles", "Acute bronchitis", "Bronchitis"],
"s06": ["Anosmia", "Dysgeusia", "Ageusia"],
#"s07": ["Nausea", "Vomiting", "Diarrhea", "Indigestion", "Abdominal pain"],
"s08": ["Laryngitis", "Sore throat", "Throat irritation"],
#"s09": ["Headache", "Migraine", "Cluster headache", "Dizziness", "Lightheadedness"],
#"s10": ["Night sweats","Perspiration", "hyperhidrosis"],
"scontrol": ["Type 2 diabetes", "Urinary tract infection", "Hair loss",
"Candidiasis", "Weight gain"]
}

COMBINED_METRIC = list(SYMPTOM_SETS.keys())

METRICS = list()
for combmetric in COMBINED_METRIC:
METRICS = METRICS + SYMPTOM_SETS[combmetric]

SMOOTHERS = ["raw", "smoothed"]
GEO_RESOLUTIONS = [
"state",
Expand All @@ -20,12 +38,15 @@
"nation"
]

seven_day_moving_average = partial(kday_moving_average, k=7)
SMOOTHERS_MAP = {
"raw": (identity, lambda d: d - timedelta(days=7)),
"smoothed": (seven_day_moving_average, lambda d: d),
"raw": (Smoother("identity", impute_method=None),
lambda d: d - timedelta(days=7)),
"smoothed": (Smoother("moving_average", window_length=7,
impute_method='zeros'), lambda d: d)
}



STATE_TO_ABBREV = {'Alabama': 'al',
'Alaska': 'ak',
# 'American Samoa': 'as',
Expand Down
12 changes: 10 additions & 2 deletions google_symptoms/delphi_google_symptoms/geo.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def generate_transition_matrix(geo_res):
).fillna(0).reset_index().rename({mapping_flag: "geo_id"}, axis = 1)
return map_df

def geo_map(df, geo_res):
def geo_map(df, geo_res, namescols = None):
"""
Compute derived HRR and MSA counts as a weighted sum of the county dataset.

Expand All @@ -59,6 +59,11 @@ def geo_map(df, geo_res):
and columns for signal vals
geo_res: str
"msa", "hrr", "hhs" or "nation"
namescols: list of strings
names of columns of df but geo_id and timestamp
when running the pipeline, this will always be METRICS+COMBINED_METRIC
this parameter was added to allow us to run unit tests in subsets of
metrics and combined_metric's

Returns
-------
Expand All @@ -67,6 +72,9 @@ def geo_map(df, geo_res):
and columns for signal vals.
The geo_id has been converted from fips to HRRs/MSAs
"""
if namescols is None:
namescols = METRICS + COMBINED_METRIC

if geo_res == "county":
return df

Expand All @@ -75,7 +83,7 @@ def geo_map(df, geo_res):
for _date in df["timestamp"].unique():
val_lists = df[df["timestamp"] == _date].merge(
map_df["geo_id"], how="right"
)[METRICS + [COMBINED_METRIC]].fillna(0)
)[namescols].fillna(0)
newdf = pd.DataFrame(
np.matmul(map_df.values[:, 1:].T, val_lists.values),
columns = list(val_lists.keys())
Expand Down
16 changes: 8 additions & 8 deletions google_symptoms/delphi_google_symptoms/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import numpy as np
import pandas as pd

from .constants import DC_FIPS, METRICS, COMBINED_METRIC
from .constants import DC_FIPS, METRICS, COMBINED_METRIC, SYMPTOM_SETS


# Create map of BigQuery symptom column names to desired column names.
Expand Down Expand Up @@ -39,18 +39,18 @@ def preprocess(df, level):
Dataframe as described above.
"""
# Constants
KEEP_COLUMNS = ["geo_id", "date"] + METRICS + [COMBINED_METRIC]
KEEP_COLUMNS = ["geo_id", "date"] + METRICS + COMBINED_METRIC

df.rename(colname_map, axis=1, inplace=True)
df["geo_id"] = df["open_covid_region_code"].apply(
lambda x: x.split("-")[-1].lower())

df[COMBINED_METRIC] = 0
for metric in METRICS:
df[COMBINED_METRIC] += df[metric].fillna(0)
df.loc[
(df["Anosmia"].isnull())
& (df["Ageusia"].isnull()), COMBINED_METRIC] = np.nan
for cb_metric in COMBINED_METRIC:
df[cb_metric] = 0
for metric in SYMPTOM_SETS[cb_metric]:
df[cb_metric] += df[metric].fillna(0)
df[cb_metric] = df[cb_metric]/len(SYMPTOM_SETS[cb_metric])
df.loc[df[SYMPTOM_SETS[cb_metric]].isnull().all(axis=1), cb_metric] = np.nan

# Delete rows with missing FIPS
null_mask = (df["geo_id"].isnull())
Expand Down
13 changes: 7 additions & 6 deletions google_symptoms/delphi_google_symptoms/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
)
from delphi_utils.validator.utils import lag_converter

from .constants import (METRICS, COMBINED_METRIC,
from .constants import (COMBINED_METRIC,
GEO_RESOLUTIONS, SMOOTHERS, SMOOTHERS_MAP)
from .geo import geo_map
from .pull import pull_gs_data
Expand Down Expand Up @@ -94,15 +94,16 @@ def run_module(params):

if len(df_pull) == 0:
continue
for metric, smoother in product(
METRICS+[COMBINED_METRIC], SMOOTHERS):
for metric, smoother in product(COMBINED_METRIC, SMOOTHERS):
logger.info("generating signal and exporting to CSV",
geo_res=geo_res,
metric=metric,
smoother=smoother)
df = df_pull.set_index(["timestamp", "geo_id"])
df["val"] = df[metric].groupby(level=1
).transform(SMOOTHERS_MAP[smoother][0])
df = df_pull
df["val"] = df[metric].astype(float)
df["val"] = df[["geo_id", "val"]].groupby(
"geo_id")["val"].transform(
SMOOTHERS_MAP[smoother][0].smooth)
df["se"] = np.nan
df["sample_size"] = np.nan
# Drop early entries where data insufficient for smoothing
Expand Down
Loading