diff --git a/.github/workflows/python-ci.yml b/.github/workflows/python-ci.yml index 19fdf56d5..403f14a8d 100644 --- a/.github/workflows/python-ci.yml +++ b/.github/workflows/python-ci.yml @@ -16,7 +16,7 @@ jobs: if: github.event.pull_request.draft == false strategy: matrix: - packages: [_delphi_utils_python, changehc, claims_hosp, combo_cases_and_deaths, doctor_visits, google_symptoms, hhs_hosp, hhs_facilities, jhu, nchs_mortality, nowcast, quidel, quidel_covidtest, safegraph_patterns, sir_complainsalot, usafacts] + packages: [_delphi_utils_python, changehc, claims_hosp, combo_cases_and_deaths, doctor_visits, dsew_community_profile, google_symptoms, hhs_hosp, hhs_facilities, jhu, nchs_mortality, nowcast, quidel, quidel_covidtest, safegraph_patterns, sir_complainsalot, usafacts] defaults: run: working-directory: ${{ matrix.packages }} diff --git a/ansible/templates/dsew_community_profile-prod.json.j2 b/ansible/templates/dsew_community_profile-prod.json.j2 new file mode 100644 index 000000000..89cee4bf0 --- /dev/null +++ b/ansible/templates/dsew_community_profile-prod.json.j2 @@ -0,0 +1,32 @@ +{ + "common": { + "export_dir": "./receiving", + "log_filename": "dsew_cpr.log" + }, + "indicator": { + "input_cache": "./input_cache", + "reports": "new" + }, + "validation": { + "common": { + "data_source": "dsew_cpr", + "span_length": 14, + "min_expected_lag": {"all": "5"}, + "max_expected_lag": {"all": "9"}, + "dry_run": true, + "suppressed_errors": [] + }, + "static": { + "minimum_sample_size": 0, + "missing_se_allowed": true, + "missing_sample_size_allowed": true + }, + "dynamic": { + "ref_window_size": 7, + "smoothed_signals": [ + "naats_total_7dav", + "naats_positivity_7dav" + ] + } + } +} diff --git a/dsew_community_profile/.pylintrc b/dsew_community_profile/.pylintrc new file mode 100644 index 000000000..f30837c7e --- /dev/null +++ b/dsew_community_profile/.pylintrc @@ -0,0 +1,22 @@ + +[MESSAGES CONTROL] + +disable=logging-format-interpolation, + too-many-locals, + too-many-arguments, + # Allow pytest functions to be part of a class. + no-self-use, + # Allow pytest classes to have one test. + too-few-public-methods + +[BASIC] + +# Allow arbitrarily short-named variables. +variable-rgx=[a-z_][a-z0-9_]* +argument-rgx=[a-z_][a-z0-9_]* +attr-rgx=[a-z_][a-z0-9_]* + +[DESIGN] + +# Don't complain about pytest "unused" arguments. +ignored-argument-names=(_.*|run_as_module) \ No newline at end of file diff --git a/dsew_community_profile/DETAILS.md b/dsew_community_profile/DETAILS.md new file mode 100644 index 000000000..56816ee06 --- /dev/null +++ b/dsew_community_profile/DETAILS.md @@ -0,0 +1,133 @@ +# Dataset layout + +The Data Strategy and Execution Workgroup (DSEW) publishes a Community Profile +Report each weekday, comprising a pair of files: an Excel workbook (.xlsx) and a +PDF which shows select metrics from the workbook as time series charts and +choropleth maps. These files are listed as attachments on the healthdata.gov +site: + +https://healthdata.gov/Health/COVID-19-Community-Profile-Report/gqxm-d9w9 + +Each Excel file attachment has a filename. The filename contains a date, +presumably the publish date. The attachment also has an alphanumeric +assetId. Both the filename and the assetId are required for downloading the +file. Whether this means that updated versions of a particular file may be +uploaded by DSEW at later times is not known. The attachment does not explicitly +list an upload timestamp. To be safe, we cache our downloads using both the +assetId and the filename. 
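+
+For concreteness, here is a minimal sketch of how a download URL and a cache
+key can be built from an attachment's assetId and filename. It mirrors the
+`DOWNLOAD_ATTACHMENT` template and the `as_cached_filename()` helper defined
+in this indicator, and assumes nothing beyond them:
+
+```python
+import os
+from urllib.parse import quote_plus
+
+URL_PREFIX = "https://healthdata.gov/api/views/gqxm-d9w9"
+DOWNLOAD_ATTACHMENT = URL_PREFIX + "/files/{assetId}?download=true&filename={filename}"
+
+def attachment_url(asset_id, filename):
+    """Build the download URL for a single attachment; only the filename is URL-quoted."""
+    return DOWNLOAD_ATTACHMENT.format(assetId=asset_id, filename=quote_plus(filename))
+
+def cache_key(input_cache, asset_id, filename):
+    """Key the cache on both assetId and filename, since either could change."""
+    return os.path.join(input_cache, f"{asset_id}--{filename}")
+```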
+
+# Workbook layout
+
+Each Excel file is a workbook with multiple sheets. The exemplar file used in
+writing this indicator is "Community Profile Report 20211102.xlsx". The sheets
+include:
+
+- User Notes: Instructions for using the workbook
+- Overview: US National figures for the last 5 weeks, plus monthly peaks back to
+  April 2020
+- Regions*: Figures for FEMA regions (double-checked: they match HHS regions
+  except that FEMA 2 does not include Palau while HHS 2 does)
+- States*: Figures for US states and territories
+- CBSAs*: Figures for US Core Based Statistical Areas
+- Counties*: Figures for US counties
+- Weekly Transmission Categories: Lists of high, substantial, and moderate
+  transmission states and territories
+- National Peaks: Monthly national peaks back to April 2020
+- National Historic: Daily national figures back to January 22 2020
+- Data Notes: Source and methods information for all metrics
+- Color Thresholds: Color-coding is used extensively in all sheets; these are
+  the keys
+
+The starred sheets above have nearly-identical column layouts, and together
+cover the county, MSA, state, and HHS geographical levels used in
+covidcast. Rather than aggregate them ourselves and risk a mismatch, this
+indicator lifts these geographical aggregations directly from the corresponding
+sheets of the workbook.
+
+GeoMapper _is_ used to generate national figures from
+state, due to architectural differences between the starred sheets and the
+Overview sheet. If we discover that our nation-level figures differ too much
+from those listed in the Overview sheet, we can add dedicated parsing for the
+Overview sheet and remove GeoMapper from this indicator altogether.
+
+# Sheet layout
+
+## Headers
+
+Each starred sheet has two rows of headers. The first row uses merged cells to
+group several columns together under a single "overheader". This overheader
+often includes the reference period for that group of columns, such as:
+
+- CASES/DEATHS: LAST WEEK (October 26-November 1)
+- TESTING: LAST WEEK (October 24-30, Test Volume October 20-26)
+- TESTING: PREVIOUS WEEK (October 17-23, Test Volume October 13-19)
+
+Overheaders have changed periodically since the first report. For example, the
+"TESTING: LAST WEEK" overheader above has also appeared as "VIRAL (RT-PCR) LAB
+TESTING: LAST WEEK", with and without a separate reference date for Test
+Volume. All known overheader forms are checked in test_pull.py.
+
+The second row contains a header for each column. The headers uniquely identify
+each column included in the sheet. Column headers include spaces, and typically
+specify both the metric and the reference period over which it was calculated,
+such as:
+
+- Total NAATs - last 7 days (may be an underestimate due to delayed reporting)
+- NAAT positivity rate - previous 7 days (may be an underestimate due to delayed
+  reporting)
+
+Column headers have also changed periodically since the first report. For
+example, the "Total NAATs - last 7 days" header above has also appeared as
+"Total RT-PCR diagnostic tests - last 7 days".
+
+## Contents
+
+Each starred sheet contains test positivity and total test volume figures for
+two reference periods, "last [week]" and "previous [week]". In some reports, the
+reference periods for test positivity and total test volume are the same; in
+others, they are different, such that the report contains figures for four
+distinct reference periods, two for each metric we extract.
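+
+As a concrete illustration of the two-row header layout described above, the
+rows can be read separately with pandas, which is how pull.py approaches it
+(a sketch; "States" stands in for any of the starred sheets):
+
+```python
+import pandas as pd
+
+workbook = pd.ExcelFile("Community Profile Report 20211102.xlsx")
+
+# Row 0: the merged "overheader" cells that carry the reference periods.
+overheaders = pd.read_excel(
+    workbook, sheet_name="States", header=None, nrows=1
+).values.flatten().tolist()
+
+# Row 1: the per-column headers, used as the data frame's columns.
+df = pd.read_excel(workbook, sheet_name="States", header=1, index_col=0)
+```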
+
+# Time series conversions and parsing notes
+
+## Reference date
+
+The reference period in the overheader never includes the year. We guess the
+reference year by picking the same year as the publish date (i.e., the date
+extracted from the filename), and if the reference month is greater than the
+publish month, we subtract 1 from the reference year. This adequately covers
+the December-January boundary.
+
+We select as reference date the end date of the reference period for each
+metric. Reference periods are always 7 days, so this indicator produces
+seven-day averages. We divide the total testing volume by seven and leave the
+test positivity alone.
+
+## Geo ID
+
+The Counties sheet lists FIPS codes numerically, such that FIPS with a leading
+zero only have four digits. We fix this by zero-filling to five characters.
+
+MSAs are a subset of CBSAs. We fix this by selecting only CBSAs with type
+"Metropolitan".
+
+Most of the starred sheets have the geo id as the first non-index column. The
+Regions sheet has no such column. We fix this by generating the HHS ids from
+the index column instead.
+
+## Combining multiple reports
+
+Each report file generates two reference dates for each metric, up to four
+reference dates total. Since it's not clear whether new versions of past files
+are ever made available, the default mode (params.indicator.reports="new")
+fetches any files that are not already in the input cache, then combines the
+results into a single data frame before exporting. This will generate correct
+behavior should (for instance) a previously-downloaded file get a new assetId.
+
+For the initial run on an empty input cache, and for runs configured to process
+a range of reports (using params.indicator.reports=YYYY-mm-dd--YYYY-mm-dd), this
+indicator makes no distinction between figures that came from different
+reports. That may not be what you want. If the covidcast issue date needs to
+match the date on the report filename, then the indicator must instead be run
+repeatedly, with equal start and end dates, keeping the output of each run
+separate.
diff --git a/dsew_community_profile/Makefile b/dsew_community_profile/Makefile
new file mode 100644
index 000000000..bdea33afd
--- /dev/null
+++ b/dsew_community_profile/Makefile
@@ -0,0 +1,30 @@
+.PHONY: venv install install-ci lint test clean
+
+dir = $(shell find ./delphi_* -name __init__.py | grep -o 'delphi_[_[:alnum:]]*')
+
+venv:
+	python3.8 -m venv env
+
+install: venv
+	. env/bin/activate; \
+	pip install wheel ; \
+	pip install -e ../_delphi_utils_python ;\
+	pip install -e .
+
+install-ci: venv
+	. env/bin/activate; \
+	pip install wheel ; \
+	pip install ../_delphi_utils_python ;\
+	pip install .
+
+lint:
+	. env/bin/activate; pylint $(dir)
+	. env/bin/activate; pydocstyle $(dir)
+
+test:
+	. env/bin/activate ;\
+	(cd tests && ../env/bin/pytest --cov=$(dir) --cov-report=term-missing)
+
+clean:
+	rm -rf env
+	rm -f params.json
diff --git a/dsew_community_profile/README.md b/dsew_community_profile/README.md
new file mode 100644
index 000000000..b070217a1
--- /dev/null
+++ b/dsew_community_profile/README.md
@@ -0,0 +1,84 @@
+# COVID-19 Community Profile Report
+
+The Data Strategy and Execution Workgroup (DSEW) publishes a Community Profile
+Report each weekday at this location:
+
+https://healthdata.gov/Health/COVID-19-Community-Profile-Report/gqxm-d9w9
+
+This indicator extracts COVID-19 test figures from these reports.
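+
+For a quick start, a minimal `params.json` might look like the following
+(adapted from `params.json.template`; the validation block is omitted here):
+
+```
+{
+    "common": {
+        "export_dir": "./receiving",
+        "log_filename": "dsew_cpr.log"
+    },
+    "indicator": {
+        "input_cache": "./input_cache",
+        "reports": "new",
+        "export_start_date": null,
+        "export_end_date": null
+    }
+}
+```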
+
+Indicator-specific parameters:
+
+* `input_cache`: a directory where Excel (.xlsx) files downloaded from
+  healthdata.gov will be stored for posterity. Each file is 3.3 MB in size, so
+  we expect this directory to require ~1GB of disk space for each year of
+  operation.
+* `reports`: {new | all | YYYY-mm-dd--YYYY-mm-dd} a string indicating which
+  reports to export. The default, "new", downloads and exports only reports not
+  already found in the input cache. The "all" setting exports data for all
+  available reports, downloading them to the input cache if necessary. The date
+  range setting refers to the date listed in the filename for the report,
+  presumably the publish date. Only reports named with a date within the
+  specified range (inclusive) will be downloaded to the input cache if
+  necessary and exported.
+* `export_start_date`: a YYYY-mm-dd string indicating the first date to export.
+* `export_end_date`: a YYYY-mm-dd string indicating the final date to export.
+
+## Running the Indicator
+
+The indicator is run by directly executing the Python module contained in this
+directory. The safest way to do this is to create a virtual environment,
+install the common DELPHI tools, and then install the module and its
+dependencies. To do this, run the following command from this directory:
+
+```
+make install
+```
+
+This command will install the package in editable mode, so you can make
+changes that will automatically propagate to the installed package.
+
+All of the user-changeable parameters are stored in `params.json`. To execute
+the module and produce the output datasets (by default, in `receiving`), run
+the following:
+
+```
+env/bin/python -m delphi_dsew_community_profile
+```
+
+If you want to enter the virtual environment in your shell, you can run
+`source env/bin/activate`. Run `deactivate` to leave the virtual environment.
+
+Once you are finished, you can remove the virtual environment and params file
+with the following:
+
+```
+make clean
+```
+
+## Testing the code
+
+To run static tests of the code style, run the following command:
+
+```
+make lint
+```
+
+Unit tests are also included in the module. To execute these, run the following
+command from this directory:
+
+```
+make test
+```
+
+To run an individual test file, run the following:
+
+```
+(cd tests && ../env/bin/pytest test_pull.py --cov=delphi_dsew_community_profile --cov-report=term-missing)
+```
+
+The output will show the number of unit tests that passed and failed, along
+with the percentage of code covered by the tests.
+
+None of the linting or unit tests should fail, and the code lines that are not
+covered by unit tests should be small and should not include critical
+sub-routines.
diff --git a/dsew_community_profile/REVIEW.md b/dsew_community_profile/REVIEW.md
new file mode 100644
index 000000000..03f87b17a
--- /dev/null
+++ b/dsew_community_profile/REVIEW.md
@@ -0,0 +1,38 @@
+## Code Review (Python)
+
+A code review of this module should include a careful look at the code and the
+output. To assist in the process, but certainly not as a replacement for it,
+please check the following items.
+
+**Documentation**
+
+- [ ] the README.md file template is filled out and currently accurate; it is
+possible to load and test the code using only the instructions given
+- [ ] minimal docstrings (one line describing what the function does) are
+included for all functions; full docstrings describing the inputs and expected
+outputs should be given for non-trivial functions
+
+**Structure**
+
+- [ ] code should pass lint checks (`make lint`)
+- [ ] any required metadata files are checked into the repository and placed
+within the directory `static`
+- [ ] any intermediate files that are created and stored by the module should
+be placed in the directory `cache`
+- [ ] final expected output files to be uploaded to the API are placed in the
+`receiving` directory; output files should not be committed to the repository
+- [ ] all options and API keys are passed through the file `params.json`
+- [ ] template parameter file (`params.json.template`) is checked into the
+code; no personal (e.g., usernames) or private (e.g., API keys) information is
+included in this template file
+
+**Testing**
+
+- [ ] module can be installed in a new virtual environment (`make install`)
+- [ ] reasonably high level of unit test coverage covering all of the main logic
+of the code (e.g., missing coverage for raised errors that do not currently seem
+possible to reach is okay; missing coverage for options that will be needed is
+not)
+- [ ] all unit tests run without errors (`make test`)
+- [ ] indicator directory has been added to GitHub CI
+(`covidcast-indicators/.github/workflows/python-ci.yml`)
diff --git a/dsew_community_profile/cache/.gitignore b/dsew_community_profile/cache/.gitignore
new file mode 100644
index 000000000..e69de29bb
diff --git a/dsew_community_profile/delphi_dsew_community_profile/__init__.py b/dsew_community_profile/delphi_dsew_community_profile/__init__.py
new file mode 100644
index 000000000..52a507259
--- /dev/null
+++ b/dsew_community_profile/delphi_dsew_community_profile/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+"""Module to pull and clean indicators from the DSEW Community Profile Report source.
+
+This file defines the functions that are made public by the module. As the
+module is intended to be executed through the main method, these are primarily
+for testing.
+"""
+
+from __future__ import absolute_import
+
+from . import run
+
+__version__ = "0.1.0"
diff --git a/dsew_community_profile/delphi_dsew_community_profile/__main__.py b/dsew_community_profile/delphi_dsew_community_profile/__main__.py
new file mode 100644
index 000000000..ab5a749dc
--- /dev/null
+++ b/dsew_community_profile/delphi_dsew_community_profile/__main__.py
@@ -0,0 +1,12 @@
+# -*- coding: utf-8 -*-
+"""Call the function run_module when executed.
+
+This file indicates that calling the module (`python -m delphi_dsew_community_profile`) will
+call the function `run_module` found within the run.py file. There should be
+no need to change this template.
+""" + +from delphi_utils import read_params +from .run import run_module # pragma: no cover + +run_module(read_params()) # pragma: no cover diff --git a/dsew_community_profile/delphi_dsew_community_profile/constants.py b/dsew_community_profile/delphi_dsew_community_profile/constants.py new file mode 100644 index 000000000..51c62b5ea --- /dev/null +++ b/dsew_community_profile/delphi_dsew_community_profile/constants.py @@ -0,0 +1,71 @@ +"""Registry for variations.""" +from collections.abc import Callable as function +from dataclasses import dataclass + +URL_PREFIX = "https://healthdata.gov/api/views/gqxm-d9w9" +DOWNLOAD_ATTACHMENT = URL_PREFIX + "/files/{assetId}?download=true&filename={filename}" +DOWNLOAD_LISTING = URL_PREFIX + ".json" + +@dataclass +class Transform: + """Transformation filters for interpreting a particular sheet in the workbook.""" + + name: str = None + level: str = None + row_filter: function = None + geo_id_select: function = None + geo_id_apply: function = None + +T_FIRST = lambda df: df[df.columns[0]] +TRANSFORMS = { + t.name: t for t in [ + Transform( + name="Regions", + level="hhs", + geo_id_select=lambda df: df.index.to_series(), + geo_id_apply=lambda x: x.replace("Region ", "") + ), + Transform( + name="States", + level="state", + geo_id_select=T_FIRST, + geo_id_apply=lambda x: x.lower() + ), + Transform( + name="CBSAs", + level="msa", + row_filter=lambda df: df['CBSA type'] == "Metropolitan", + geo_id_select=T_FIRST, + geo_id_apply=lambda x: f"{x}" + ), + Transform( + name="Counties", + level="county", + geo_id_select=T_FIRST, + geo_id_apply=lambda x: f"{x:05}" + ) + ]} + +# signal id : is_rate, name to report in API +SIGNALS = { + "total": { + "is_rate" : False, + "api_name": "naats_total_7dav" + }, + "positivity": { + "is_rate" : True, + "api_name": "naats_positivity_7dav" + }, + "confirmed covid-19 admissions": { + "is_rate" : False, + "api_name": "confirmed_admissions_covid_1d_7dav" + } +} + +COUNTS_7D_SIGNALS = {key for key, value in SIGNALS.items() if not value["is_rate"]} + +def make_signal_name(key): + """Convert a signal key to the corresponding signal name for the API.""" + return SIGNALS[key]["api_name"] + +NEWLINE="\n" diff --git a/dsew_community_profile/delphi_dsew_community_profile/pull.py b/dsew_community_profile/delphi_dsew_community_profile/pull.py new file mode 100644 index 000000000..f09d0badf --- /dev/null +++ b/dsew_community_profile/delphi_dsew_community_profile/pull.py @@ -0,0 +1,387 @@ +# -*- coding: utf-8 -*- +"""Functions to call when downloading data.""" +from dataclasses import dataclass +import datetime +import os +import re +from urllib.parse import quote_plus as quote_as_url + +import pandas as pd +import requests + +from delphi_utils.geomap import GeoMapper + +from .constants import TRANSFORMS, SIGNALS, COUNTS_7D_SIGNALS, NEWLINE +from .constants import DOWNLOAD_ATTACHMENT, DOWNLOAD_LISTING + +# YYYYMMDD +# example: "Community Profile Report 20211104.xlsx" +RE_DATE_FROM_FILENAME = re.compile(r'.*([0-9]{4})([0-9]{2})([0-9]{2}).*xlsx') + +# example: "TESTING: LAST WEEK (October 24-30, Test Volume October 20-26)" +# example: "TESTING: PREVIOUS WEEK (October 17-23, Test Volume October 13-19)" +DATE_EXP = r'(?:([A-Za-z]*) )?([0-9]{1,2})' +DATE_RANGE_EXP = f"{DATE_EXP}-{DATE_EXP}" +RE_DATE_FROM_TEST_HEADER = re.compile( + rf'.*TESTING: (.*) WEEK \({DATE_RANGE_EXP}(?:, Test Volume ({DATE_RANGE_EXP}))? 
*\)'
+)
+
+# example: "HOSPITAL UTILIZATION: LAST WEEK (January 2-8)"
+RE_DATE_FROM_HOSP_HEADER = re.compile(
+    rf'HOSPITAL UTILIZATION: (.*) WEEK \({DATE_RANGE_EXP}\)'
+)
+
+# example: "NAAT positivity rate - last 7 days (may be an underestimate due to delayed reporting)"
+# example: "Total NAATs - last 7 days (may be an underestimate due to delayed reporting)"
+RE_COLUMN_FROM_HEADER = re.compile('- (.*) 7 days')
+
+@dataclass
+class DatasetTimes:
+    """Collect reference dates for a column."""
+
+    column: str
+    positivity_reference_date: datetime.date
+    total_reference_date: datetime.date
+    hosp_reference_date: datetime.date
+
+    @staticmethod
+    def from_header(header, publish_date):
+        """Convert reference dates in overheader to DatasetTimes."""
+        def as_date(sub_result):
+            month = sub_result[2] if sub_result[2] else sub_result[0]
+            assert month, f"Bad month in header: {header}\nsub_result: {sub_result}"
+            month_numeric = datetime.datetime.strptime(month, "%B").month
+            day = sub_result[3]
+            year = publish_date.year
+            # a reference month later than the publish month implies the previous year
+            if month_numeric > publish_date.month:
+                year -= 1
+            return datetime.datetime.strptime(f"{year}-{month}-{day}", "%Y-%B-%d").date()
+
+        if RE_DATE_FROM_TEST_HEADER.match(header):
+            findall_result = RE_DATE_FROM_TEST_HEADER.findall(header)[0]
+            column = findall_result[0].lower()
+            positivity_reference_date = as_date(findall_result[1:5])
+            if findall_result[6]:
+                # Reports published starting 2021-03-17 specify different reference
+                # dates for positivity and total test volume
+                total_reference_date = as_date(findall_result[6:10])
+            else:
+                total_reference_date = positivity_reference_date
+
+            hosp_reference_date = None
+        elif RE_DATE_FROM_HOSP_HEADER.match(header):
+            findall_result = RE_DATE_FROM_HOSP_HEADER.findall(header)[0]
+            column = findall_result[0].lower()
+            hosp_reference_date = as_date(findall_result[1:5])
+
+            total_reference_date = None
+            positivity_reference_date = None
+        else:
+            raise ValueError(f"Couldn't find reference date in header '{header}'")
+
+        return DatasetTimes(column, positivity_reference_date,
+                            total_reference_date, hosp_reference_date)
+
+    def __getitem__(self, key):
+        """Use DatasetTimes like a dictionary."""
+        if key.lower() == "positivity":
+            return self.positivity_reference_date
+        if key.lower() == "total":
+            return self.total_reference_date
+        if key.lower() == "confirmed covid-19 admissions":
+            return self.hosp_reference_date
+        raise ValueError(
+            f"Bad reference date type request '{key}'; " + \
+            "need 'total', 'positivity', or 'confirmed covid-19 admissions'"
+        )
+
+    def __setitem__(self, key, newvalue):
+        """Use DatasetTimes like a dictionary."""
+        if key.lower() == "positivity":
+            self.positivity_reference_date = newvalue
+        elif key.lower() == "total":
+            self.total_reference_date = newvalue
+        elif key.lower() == "confirmed covid-19 admissions":
+            self.hosp_reference_date = newvalue
+        else:
+            raise ValueError(
+                f"Bad reference date type request '{key}'; " + \
+                "need 'total', 'positivity', or 'confirmed covid-19 admissions'"
+            )
+
+    def __eq__(self, other):
+        """Check equality by value, including the hospitalization reference date."""
+        return isinstance(other, DatasetTimes) and \
+            other.column == self.column and \
+            other.positivity_reference_date == self.positivity_reference_date and \
+            other.total_reference_date == self.total_reference_date and \
+            other.hosp_reference_date == self.hosp_reference_date
+
+class Dataset:
+    """All data extracted from a single report file."""
+
+    def __init__(self, config, sheets=TRANSFORMS.keys(), logger=None):
+        """Create a new Dataset instance.
+
+        Download and cache the requested report file.
+ + Parse the file into data frames at multiple geo levels. + """ + self.publish_date = self.parse_publish_date(config['filename']) + self.url = DOWNLOAD_ATTACHMENT.format( + assetId=config['assetId'], + filename=quote_as_url(config['filename']) + ) + if not os.path.exists(config['cached_filename']): + if logger: + logger.info("Downloading file", filename=config['cached_filename']) + resp = requests.get(self.url) + with open(config['cached_filename'], 'wb') as f: + f.write(resp.content) + + self.workbook = pd.ExcelFile(config['cached_filename']) + + self.dfs = {} + self.times = {} + for si in sheets: + assert si in TRANSFORMS, f"Bad sheet requested: {si}" + if logger: + logger.info("Building dfs", + sheet=f"{si}", + filename=config['cached_filename']) + sheet = TRANSFORMS[si] + self._parse_times_for_sheet(sheet) + self._parse_sheet(sheet) + + @staticmethod + def parse_publish_date(report_filename): + """Extract publish date from filename.""" + return datetime.date( + *[int(x) for x in RE_DATE_FROM_FILENAME.findall(report_filename)[0]] + ) + @staticmethod + def skip_overheader(header): + """Ignore irrelevant overheaders.""" + # include "TESTING: [LAST|PREVIOUS] WEEK (October 24-30, Test Volume October 20-26)" + # include "VIRAL (RT-PCR) LAB TESTING: [LAST|PREVIOUS] WEEK (August 24-30, ..." + # include "HOSPITAL UTILIZATION: LAST WEEK (January 2-8)" + return not (isinstance(header, str) and \ + (header.startswith("TESTING:") or \ + header.startswith("VIRAL (RT-PCR) LAB TESTING:") or \ + header.startswith("HOSPITAL UTILIZATION:")) and \ + # exclude "TESTING: % CHANGE FROM PREVIOUS WEEK" \ + # exclude "TESTING: DEMOGRAPHIC DATA" \ + # exclude "HOSPITAL UTILIZATION: CHANGE FROM PREVIOUS WEEK" \ + # exclude "HOSPITAL UTILIZATION: DEMOGRAPHIC DATA" \ + header.find("WEEK (") > 0) + def _parse_times_for_sheet(self, sheet): + """Record reference dates for this sheet.""" + # grab reference dates from overheaders + overheaders = pd.read_excel( + self.workbook, sheet_name=sheet.name, + header=None, + nrows=1 + ).values.flatten().tolist() + for h in overheaders: + if self.skip_overheader(h): + continue + + dt = DatasetTimes.from_header(h, self.publish_date) + if dt.column in self.times: + # Items that are not None should be the same between sheets. + # Fill None items with the newly calculated version of the + # field from dt. + for sig in SIGNALS: + if self.times[dt.column][sig] is not None and dt[sig] is not None: + assert self.times[dt.column][sig] == dt[sig], \ + f"Conflicting reference date from {sheet.name} {dt[sig]}" + \ + f"vs previous {self.times[dt.column][sig]}" + elif self.times[dt.column][sig] is None: + self.times[dt.column][sig] = dt[sig] + else: + self.times[dt.column] = dt + assert len(self.times) == 2, \ + f"No times extracted from overheaders:\n{NEWLINE.join(str(s) for s in overheaders)}" + + @staticmethod + def retain_header(header): + """Ignore irrelevant headers.""" + return all([ + # include "Total NAATs - [last|previous] 7 days ..." + # include "Total RT-PCR diagnostic tests - [last|previous] 7 days ..." + # include "NAAT positivity rate - [last|previous] 7 days ..." + # include "Viral (RT-PCR) lab test positivity rate - [last|previous] 7 days ..." + (header.startswith("Total NAATs") or + header.startswith("NAAT positivity rate") or + header.startswith("Total RT-PCR") or + header.startswith("Viral (RT-PCR)")), + # exclude "NAAT positivity rate - absolute change ..." 
+ header.find("7 days") > 0, + # exclude "NAAT positivity rate - last 7 days - ages <5" + header.find(" ages") < 0, + ]) or all([ + # include "Confirmed COVID-19 admissions - last 7 days" + header.startswith("Confirmed COVID-19 admissions"), + # exclude "Confirmed COVID-19 admissions - percent change" + header.find("7 days") > 0, + # exclude "Confirmed COVID-19 admissions - last 7 days - ages <18" + # exclude "Confirmed COVID-19 admissions - last 7 days - age unknown" + header.find(" age") < 0, + # exclude "Confirmed COVID-19 admissions per 100 inpatient beds - last 7 days" + header.find(" beds") < 0, + ]) + def _parse_sheet(self, sheet): + """Extract data frame for this sheet.""" + df = pd.read_excel( + self.workbook, + sheet_name=sheet.name, + header=1, + index_col=0, + ) + if sheet.row_filter: + df = df.loc[sheet.row_filter(df)] + select = [ + (RE_COLUMN_FROM_HEADER.findall(h)[0], h, h.lower()) + for h in list(df.columns) + if self.retain_header(h) + ] + + for sig in SIGNALS: + # Hospital admissions not available at the county or CBSA level prior to Jan 8, 2021. + if (sheet.level == "msa" or sheet.level == "county") \ + and self.publish_date < datetime.date(2021, 1, 8) \ + and sig == "confirmed covid-19 admissions": + self.dfs[(sheet.level, sig)] = pd.DataFrame( + columns = ["geo_id", "timestamp", "val", \ + "se", "sample_size", "publish_date"] + ) + continue + + sig_select = [s for s in select if s[-1].find(sig) >= 0] + assert len(sig_select) > 0, \ + f"No {sig} in any of {select}\n\nAll headers:\n{NEWLINE.join(list(df.columns))}" + + self.dfs[(sheet.level, sig)] = pd.concat([ + pd.DataFrame({ + "geo_id": sheet.geo_id_select(df).apply(sheet.geo_id_apply), + "timestamp": pd.to_datetime(self.times[si[0]][sig]), + "val": df[si[-2]], + "se": None, + "sample_size": None, + "publish_date": self.publish_date + }) + for si in sig_select + ]) + + for sig in COUNTS_7D_SIGNALS: + self.dfs[(sheet.level, sig)]["val"] /= 7 # 7-day total -> 7-day average + + +def as_cached_filename(params, config): + """Formulate a filename to uniquely identify this report in the input cache.""" + return os.path.join( + params['indicator']['input_cache'], + f"{config['assetId']}--{config['filename']}" + ) + +def fetch_listing(params): + """Generate the list of report files to process.""" + listing = requests.get(DOWNLOAD_LISTING).json()['metadata']['attachments'] + + # drop the pdf files + listing = [ + dict( + el, + cached_filename=as_cached_filename(params, el), + publish_date=Dataset.parse_publish_date(el['filename']) + ) + for el in listing if el['filename'].endswith("xlsx") + ] + + if params['indicator']['reports'] == 'new': + # drop files we already have in the input cache + listing = [el for el in listing if not os.path.exists(el['cached_filename'])] + elif params['indicator']['reports'].find("--") > 0: + # drop files outside the specified publish-date range + start_str, _, end_str = params['indicator']['reports'].partition("--") + start_date = datetime.datetime.strptime(start_str, "%Y-%m-%d").date() + end_date = datetime.datetime.strptime(end_str, "%Y-%m-%d").date() + listing = [ + el for el in listing + if start_date <= el['publish_date'] <= end_date + ] + # reference date is guaranteed to be before publish date, so we can trim + # reports that are too early + if 'export_start_date' in params['indicator']: + listing = [ + el for el in listing + if params['indicator']['export_start_date'] < el['publish_date'] + ] + # can't do the same for export_end_date + return listing + +def download_and_parse(listing, 
logger):
+    """Convert a list of report files into Dataset instances."""
+    datasets = {}
+    for item in listing:
+        d = Dataset(item, logger=logger)
+        for sig, df in d.dfs.items():
+            if sig not in datasets:
+                datasets[sig] = []
+            datasets[sig].append(df)
+    return datasets
+
+def nation_from_state(df, sig, geomapper):
+    """Compute nation level from state df."""
+    if SIGNALS[sig]["is_rate"]: # true if sig is a rate
+        df = geomapper.add_population_column(df, "state_id") \
+            .rename(columns={"population":"weight"})
+
+        # normalize the population weights so the weighted sum is a weighted mean
+        norm_denom = df.groupby("timestamp").agg(norm_denom=("weight", "sum"))
+        df = df.join(
+            norm_denom, on="timestamp", how="left"
+        ).assign(
+            weight=lambda x: x.weight / x.norm_denom
+        ).drop(
+            "norm_denom", axis=1
+        )
+    return geomapper.replace_geocode(
+        df,
+        'state_id',
+        'nation',
+        new_col="geo_id",
+        date_col="timestamp"
+    )
+
+def fetch_new_reports(params, logger=None):
+    """Retrieve, compute, and collate all data we haven't seen yet."""
+    listing = fetch_listing(params)
+
+    # download and parse individual reports
+    datasets = download_and_parse(listing, logger)
+
+    # collect like signals together, keeping most recent publish date
+    ret = {}
+    for sig, lst in datasets.items():
+        latest_sig_df = pd.concat(
+            lst
+        ).groupby(
+            "timestamp"
+        ).apply(
+            lambda x: x[x["publish_date"] == x["publish_date"].max()]
+        ).drop(
+            "publish_date", axis=1
+        )
+
+        if len(latest_sig_df.index) > 0:
+            ret[sig] = latest_sig_df.reset_index(drop=True)
+
+    # add nation from state
+    geomapper = GeoMapper()
+    for sig in SIGNALS:
+        state_key = ("state", sig)
+        if state_key not in ret:
+            continue
+        ret[("nation", sig)] = nation_from_state(
+            ret[state_key].rename(columns={"geo_id": "state_id"}),
+            sig,
+            geomapper
+        )
+
+    return ret
diff --git a/dsew_community_profile/delphi_dsew_community_profile/run.py b/dsew_community_profile/delphi_dsew_community_profile/run.py
new file mode 100644
index 000000000..9d045187b
--- /dev/null
+++ b/dsew_community_profile/delphi_dsew_community_profile/run.py
@@ -0,0 +1,79 @@
+# -*- coding: utf-8 -*-
+"""Functions to call when running the indicator.
+
+This module should contain a function called `run_module` that is executed when
+the module is run with `python -m delphi_dsew_community_profile`.
+`run_module`'s lone argument should be a nested dictionary of parameters loaded
+from the params.json file. We expect the `params` to have the following
+structure:
+
+    - "common":
+        - "export_dir": str, directory to which the results are exported
+        - "log_filename": (optional) str, path to log file
+    - "indicator": (optional)
+        - Any other indicator-specific settings
+"""
+from datetime import datetime
+import time
+
+from delphi_utils import get_structured_logger
+from delphi_utils.export import create_export_csv
+import pandas as pd
+
+from .constants import make_signal_name
+from .pull import fetch_new_reports
+
+
+def run_module(params):
+    """
+    Run the indicator.
+
+    Arguments
+    ---------
+    params: Dict[str, Any]
+        Nested dictionary of parameters.
+ """ + start_time = time.time() + logger = get_structured_logger( + __name__, filename=params["common"].get("log_filename"), + log_exceptions=params["common"].get("log_exceptions", True)) + def replace_date_param(p): + if p in params["indicator"] and params["indicator"][p] is not None: + date_param = datetime.strptime(params["indicator"][p], "%Y-%m-%d").date() + params["indicator"][p] = date_param + replace_date_param("export_start_date") + replace_date_param("export_end_date") + export_params = { + 'start_date': params["indicator"].get("export_start_date", None), + 'end_date': params["indicator"].get("export_end_date", None) + } + export_params = { + k: pd.to_datetime(v) if v is not None else v + for k, v in export_params.items() + } + + run_stats = [] + dfs = fetch_new_reports(params, logger) + for key, df in dfs.items(): + (geo, sig) = key + dates = create_export_csv( + df, + params['common']['export_dir'], + geo, + make_signal_name(sig), + **export_params + ) + if len(dates)>0: + run_stats.append((max(dates), len(dates))) + + ## log this indicator run + elapsed_time_in_seconds = round(time.time() - start_time, 2) + min_max_date = run_stats and min(s[0] for s in run_stats) + csv_export_count = sum(s[-1] for s in run_stats) + max_lag_in_days = min_max_date and (datetime.now() - min_max_date).days + formatted_min_max_date = min_max_date and min_max_date.strftime("%Y-%m-%d") + logger.info("Completed indicator run", + elapsed_time_in_seconds = elapsed_time_in_seconds, + csv_export_count = csv_export_count, + max_lag_in_days = max_lag_in_days, + oldest_final_export_date = formatted_min_max_date) diff --git a/dsew_community_profile/input_cache/.gitignore b/dsew_community_profile/input_cache/.gitignore new file mode 100644 index 000000000..7c1222033 --- /dev/null +++ b/dsew_community_profile/input_cache/.gitignore @@ -0,0 +1 @@ +*.xlsx diff --git a/dsew_community_profile/params.json.template b/dsew_community_profile/params.json.template new file mode 100644 index 000000000..37096599c --- /dev/null +++ b/dsew_community_profile/params.json.template @@ -0,0 +1,34 @@ +{ + "common": { + "export_dir": "./receiving", + "log_filename": "dsew_cpr.log" + }, + "indicator": { + "input_cache": "./input_cache", + "reports": "new", + "export_start_date": null, + "export_end_date": null + }, + "validation": { + "common": { + "data_source": "dsew_cpr", + "span_length": 14, + "min_expected_lag": {"all": "5"}, + "max_expected_lag": {"all": "9"}, + "dry_run": true, + "suppressed_errors": [] + }, + "static": { + "minimum_sample_size": 0, + "missing_se_allowed": true, + "missing_sample_size_allowed": true + }, + "dynamic": { + "ref_window_size": 7, + "smoothed_signals": [ + "naats_total_7dav", + "naats_positivity_7dav" + ] + } + } +} diff --git a/dsew_community_profile/setup.py b/dsew_community_profile/setup.py new file mode 100644 index 000000000..fb5f9d4a9 --- /dev/null +++ b/dsew_community_profile/setup.py @@ -0,0 +1,30 @@ +from setuptools import setup +from setuptools import find_packages + +required = [ + "numpy", + "openpyxl", + "pandas", + "pydocstyle", + "pytest", + "pytest-cov", + "pylint==2.8.3", + "delphi-utils", + "covidcast" +] + +setup( + name="delphi_dsew_community_profile", + version="0.1.0", + description="Indicator tracking specimen test results and hospital admissions published in the COVID-19 Community Profile Report by the Data Strategy and Execution Workgroup", + author="", + author_email="", + url="https://github.com/cmu-delphi/covidcast-indicators", + install_requires=required, + 
classifiers=[ + "Development Status :: 5 - Production/Stable", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3.8", + ], + packages=find_packages(), +) diff --git a/dsew_community_profile/static/.gitignore b/dsew_community_profile/static/.gitignore new file mode 100644 index 000000000..e69de29bb diff --git a/dsew_community_profile/tests/params.json.template b/dsew_community_profile/tests/params.json.template new file mode 100644 index 000000000..89cee4bf0 --- /dev/null +++ b/dsew_community_profile/tests/params.json.template @@ -0,0 +1,32 @@ +{ + "common": { + "export_dir": "./receiving", + "log_filename": "dsew_cpr.log" + }, + "indicator": { + "input_cache": "./input_cache", + "reports": "new" + }, + "validation": { + "common": { + "data_source": "dsew_cpr", + "span_length": 14, + "min_expected_lag": {"all": "5"}, + "max_expected_lag": {"all": "9"}, + "dry_run": true, + "suppressed_errors": [] + }, + "static": { + "minimum_sample_size": 0, + "missing_se_allowed": true, + "missing_sample_size_allowed": true + }, + "dynamic": { + "ref_window_size": 7, + "smoothed_signals": [ + "naats_total_7dav", + "naats_positivity_7dav" + ] + } + } +} diff --git a/dsew_community_profile/tests/test_pull.py b/dsew_community_profile/tests/test_pull.py new file mode 100644 index 000000000..e472bf3d6 --- /dev/null +++ b/dsew_community_profile/tests/test_pull.py @@ -0,0 +1,209 @@ +from collections import namedtuple +from datetime import date, datetime +from itertools import chain +import pandas as pd +import pytest +from unittest.mock import patch, Mock + +from delphi_utils.geomap import GeoMapper + +from delphi_dsew_community_profile.pull import DatasetTimes +from delphi_dsew_community_profile.pull import Dataset +from delphi_dsew_community_profile.pull import fetch_listing, nation_from_state + +example = namedtuple("example", "given expected") + +class TestPull: + def test_DatasetTimes(self): + examples = [ + example(DatasetTimes("xyzzy", date(2021, 10, 30), date(2021, 10, 20), date(2021, 10, 22)), + DatasetTimes("xyzzy", date(2021, 10, 30), date(2021, 10, 20), date(2021, 10, 22))), + ] + for ex in examples: + assert ex.given == ex.expected, "Equality" + + dt = DatasetTimes("xyzzy", date(2021, 10, 30), date(2021, 10, 20), date(2021, 10, 22)) + assert dt["positivity"] == date(2021, 10, 30), "positivity" + assert dt["total"] == date(2021, 10, 20), "total" + assert dt["confirmed covid-19 admissions"] == date(2021, 10, 22), "confirmed covid-19 admissions" + with pytest.raises(ValueError): + dt["xyzzy"] + + def test_DatasetTimes_from_header(self): + examples = [ + example("TESTING: LAST WEEK (October 24-30, Test Volume October 20-26)", + DatasetTimes("last", date(2021, 10, 30), date(2021, 10, 26), None)), + example("TESTING: PREVIOUS WEEK (October 24-30, Test Volume October 20-26)", + DatasetTimes("previous", date(2021, 10, 30), date(2021, 10, 26), None)), + example("TESTING: LAST WEEK (October 24-November 30, Test Volume October 20-26)", + DatasetTimes("last", date(2021, 11, 30), date(2021, 10, 26), None)), + example("VIRAL (RT-PCR) LAB TESTING: LAST WEEK (June 7-13, Test Volume June 3-9 )", + DatasetTimes("last", date(2021, 6, 13), date(2021, 6, 9), None)), + example("VIRAL (RT-PCR) LAB TESTING: LAST WEEK (March 7-13)", + DatasetTimes("last", date(2021, 3, 13), date(2021, 3, 13), None)), + example("HOSPITAL UTILIZATION: LAST WEEK (June 2-8)", + DatasetTimes("last", None, None, date(2021, 6, 8))), + example("HOSPITAL UTILIZATION: LAST WEEK (June 28-July 8)", + DatasetTimes("last", None, 
None, date(2021, 7, 8)))
+        ]
+        for ex in examples:
+            assert DatasetTimes.from_header(ex.given, date(2021, 12, 31)) == ex.expected, ex.given
+
+        # test year boundary
+        examples = [
+            example("TESTING: LAST WEEK (October 24-30, Test Volume October 20-26)",
+                    DatasetTimes("last", date(2020, 10, 30), date(2020, 10, 26), None)),
+        ]
+        for ex in examples:
+            assert DatasetTimes.from_header(ex.given, date(2021, 1, 1)) == ex.expected, ex.given
+
+    def test_Dataset_skip_overheader(self):
+        examples = [
+            example("TESTING: LAST WEEK (October 24-30, Test Volume October 20-26)",
+                    False),
+            example("TESTING: PREVIOUS WEEK (October 17-23, Test Volume October 13-19)",
+                    False),
+            example("VIRAL (RT-PCR) LAB TESTING: LAST WEEK (August 24-30, Test Volume August 20-26)",
+                    False),
+            example("VIRAL (RT-PCR) LAB TESTING: PREVIOUS WEEK (August 17-23, Test Volume August 13-19)",
+                    False),
+            example("TESTING: % CHANGE FROM PREVIOUS WEEK",
+                    True),
+            example("VIRAL (RT-PCR) LAB TESTING: % CHANGE FROM PREVIOUS WEEK",
+                    True),
+            example("TESTING: DEMOGRAPHIC DATA",
+                    True),
+            example("HOSPITAL UTILIZATION: LAST WEEK (January 2-8)",
+                    False),
+            example("HOSPITAL UTILIZATION: CHANGE FROM PREVIOUS WEEK",
+                    True),
+            example("HOSPITAL UTILIZATION: DEMOGRAPHIC DATA",
+                    True)
+        ]
+        for ex in examples:
+            assert Dataset.skip_overheader(ex.given) == ex.expected, ex.given
+
+    def test_Dataset_retain_header(self):
+        examples = [
+            example("Total NAATs - last 7 days (may be an underestimate due to delayed reporting)",
+                    True),
+            example("Total NAATs - previous 7 days (may be an underestimate due to delayed reporting)",
+                    True),
+            example("NAAT positivity rate - last 7 days (may be an underestimate due to delayed reporting)",
+                    True),
+            example("NAAT positivity rate - previous 7 days (may be an underestimate due to delayed reporting)",
+                    True),
+            example("NAAT positivity rate - absolute change (may be an underestimate due to delayed reporting)",
+                    False),
+            example("NAAT positivity rate - last 7 days - ages <5",
+                    False),
+            example("Total RT-PCR diagnostic tests - last 7 days (may be an underestimate due to delayed reporting)",
+                    True),
+            example("Viral (RT-PCR) lab test positivity rate - last 7 days (may be an underestimate due to delayed reporting)",
+                    True),
+            example("RT-PCR tests per 100k - last 7 days (may be an underestimate due to delayed reporting)",
+                    False),
+            example("Confirmed COVID-19 admissions - last 7 days",
+                    True),
+            example("Confirmed COVID-19 admissions - percent change",
+                    False),
+            example("Confirmed COVID-19 admissions - last 7 days - ages <18",
+                    False),
+            example("Confirmed COVID-19 admissions - last 7 days - age unknown",
+                    False),
+            example("Confirmed COVID-19 admissions per 100 inpatient beds - last 7 days",
+                    False)
+        ]
+        for ex in examples:
+            assert Dataset.retain_header(ex.given) == ex.expected, ex.given
+
+    def test_Dataset_parse_sheet(self):
+        # TODO
+        pass
+
+    @patch('requests.get')
+    @patch('os.path.exists')
+    def test_fetch_listing(self, mock_exists, mock_get):
+        # With stacked @patch decorators, mocks are passed bottom-up:
+        # mock_exists patches os.path.exists, mock_get patches requests.get.
+        # Cache paths below assume POSIX separators, matching CI.
+        inst = namedtuple("attachment", "assetId filename publish cache")
+        instances = list(chain(*[
+            [
+                inst(f"{i}", f"2021010{i}.xlsx", date(2021, 1, i), f"input_cache/{i}--2021010{i}.xlsx"),
+                inst(f"p{i}", f"2021010{i}.pdf", date(2021, 1, i), f"input_cache/p{i}--2021010{i}.pdf"),
+            ]
+            for i in [1, 2, 3, 4, 5]
+        ]))
+
+        mock_get.return_value = Mock()
+        mock_get.return_value.json = Mock(
+            return_value = {
+                'metadata': {
+                    'attachments': [
+                        {"assetId": i.assetId, "filename": i.filename}
+                        for i in instances
+                    ]
+                }
+            }
+        )
+
+        # Pretend no file is cached yet, so "new" mode keeps everything.
+        mock_exists.return_value = False
+
+        def as_listing(instance):
+            return {
+                "assetId": instance.assetId,
+                "filename": instance.filename,
+                "cached_filename": instance.cache,
+                "publish_date": instance.publish
+            }
+        ex = example(
+            {'indicator': {'reports': 'new', 'input_cache': 'input_cache'}},
+            [
+                as_listing(instance)
+                # the even-indexed instances are the xlsx attachments
+                for i, instance in filter(lambda x: x[0]%2 == 0, enumerate(instances))
+            ]
+        )
+
+        actual = fetch_listing(ex.given)
+        assert len(actual) == len(ex.expected)
+        for actual_el, expected_el in zip(actual, ex.expected):
+            assert actual_el == expected_el
+
+    def test_nation_from_state(self):
+        geomapper = GeoMapper()
+        state_pop = geomapper.get_crosswalk("state_id", "pop")
+
+        test_df = pd.DataFrame({
+            'state_id': ['pa', 'wv'],
+            'timestamp': [datetime(year=2020, month=1, day=1)]*2,
+            'val': [15., 150.],})
+
+        pa_pop = int(state_pop.loc[state_pop.state_id == "pa", "pop"])
+        wv_pop = int(state_pop.loc[state_pop.state_id == "wv", "pop"])
+        tot_pop = pa_pop + wv_pop
+
+        pd.testing.assert_frame_equal(
+            nation_from_state(
+                test_df.copy(),
+                "total",
+                geomapper
+            ),
+            pd.DataFrame({
+                'geo_id': ['us'],
+                'timestamp': [datetime(year=2020, month=1, day=1)],
+                'val': [15. + 150.],}),
+            check_like=True
+        )
+
+        pd.testing.assert_frame_equal(
+            nation_from_state(
+                test_df.copy(),
+                "positivity",
+                geomapper
+            ),
+            pd.DataFrame({
+                'geo_id': ['us'],
+                'timestamp': [datetime(year=2020, month=1, day=1)],
+                'val': [15*pa_pop/tot_pop + 150*wv_pop/tot_pop],}),
+            check_like=True
+        )