2 changes: 2 additions & 0 deletions ansible/templates/quidel_covidtest-params-prod.json.j2
@@ -10,6 +10,8 @@
"export_end_date": "",
"pull_start_date": "2020-05-26",
"pull_end_date":"",
"backfill_dir": "/common/backfill/quidel_covidtest",
"backfill_merge_day": 0,
"export_day_range":40,
"aws_credentials": {
"aws_access_key_id": "{{ quidel_aws_access_key_id }}",
125 changes: 125 additions & 0 deletions quidel_covidtest/delphi_quidel_covidtest/backfill.py
@@ -0,0 +1,125 @@
# -*- coding: utf-8 -*-
"""Store backfill data."""
import os
import glob
from datetime import datetime

import pandas as pd

from delphi_utils import GeoMapper


gmpr = GeoMapper()

def store_backfill_file(df, _end_date, backfill_dir):
"""
Store county level backfill data into backfill_dir.

Parameter:
df: pd.DataFrame
Pre-process file at ZipCode level
_end_date: datetime
The most recent date when the raw data is received
backfill_dir: str
specified path to store backfill files.
"""
backfilldata = df.copy()
backfilldata = gmpr.replace_geocode(backfilldata, from_code="zip", new_code="fips",
from_col="zip", new_col="fips", date_col="timestamp")
backfilldata = gmpr.add_geocode(backfilldata, from_code="fips", new_code="state_id",
from_col="fips", new_col="state_id")
backfilldata.rename({"timestamp": "time_value",
"totalTest_total": "den_total",
"positiveTest_total": "num_total",
"positiveTest_age_0_4": "num_age_0_4",
"totalTest_age_0_4": "den_age_0_4",
"positiveTest_age_5_17": "num_age_5_17",
"totalTest_age_5_17": "den_age_5_17",
"positiveTest_age_18_49": "num_age_18_49",
Comment on lines +31 to +38

Contributor:
Have we considered saving each of these age buckets to a separate data file? That way, the numerator/denominator field names can be standardized ("num" and "den") for easier handling in the corrections pipeline. I'm imagining filenames like quidel_covidtest_all_ages_as_of_20200817.parquet, quidel_covidtest_age_0_4_as_of_20200817.parquet, etc.

Contributor Author:
I did, but it adds burden to the storage. It might not be a super big deal, though.

Contributor:
Having a consistent format is really nice and will save some work down the line if the backfill system gets more complicated. BUT either way works and this is just the first version (and I already have the backfill correction package to handle quidel's nonstandard numerator/denominator names, so that's not a concern).

"totalTest_age_18_49": "den_age_18_49",
"positiveTest_age_50_64": "num_age_50_64",
"totalTest_age_50_64": "den_age_50_64",
"positiveTest_age_65plus": "num_age_65plus",
"totalTest_age_65plus": "den_age_65plus",
"positiveTest_age_0_17": "num_age_0_17",
"totalTest_age_0_17": "den_age_0_17"},
axis=1, inplace=True)
# Store one year's backfill data
_start_date = _end_date.replace(year=_end_date.year-1)
selected_columns = ['time_value', 'fips', 'state_id',
'den_total', 'num_total',
'num_age_0_4', 'den_age_0_4',
'num_age_5_17', 'den_age_5_17',
'num_age_18_49', 'den_age_18_49',
'num_age_50_64', 'den_age_50_64',
'num_age_65plus', 'den_age_65plus',
'num_age_0_17', 'den_age_0_17']
backfilldata = backfilldata.loc[backfilldata["time_value"] >= _start_date,
selected_columns]
path = backfill_dir + \
"/quidel_covidtest_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d")
# Store intermediate file into the backfill folder
backfilldata.to_parquet(path, index=False)

def merge_backfill_file(backfill_dir, backfill_merge_day, today,
test_mode=False, check_nd=25):
"""
Merge ~4 weeks' backfill data into one file.

Usually this function should merge 28 days' data into a new file so as to
save the reading time when running the backfill pipelines. We set a softer
threshold to allow flexibility in data delivery.

Parameters
----------
today : datetime
The most recent date when the raw data is received
backfill_dir : str
specified path to store backfill files.
backfill_merge_day: int
The day of a week that we used to merge the backfill files. e.g. 0
is Monday.
test_mode: bool
check_nd: int
The criteria of the number of unmerged files. Ideally, we want the
number to be 28, but we use a looser criteria from practical
considerations
"""
new_files = glob.glob(backfill_dir + "/quidel_covidtest_as_of_*")
if len(new_files) == 0:  # no daily files have been stored yet
return

def get_date(file_link):
# Keep the function here consistent with the backfill path in
# function `store_backfill_file`
fn = file_link.split("/")[-1].split(".parquet")[0].split("_")[-1]
return datetime.strptime(fn, "%Y%m%d")

date_list = list(map(get_date, new_files))
earliest_date = min(date_list)
latest_date = max(date_list)

# Check whether to merge
# Check the number of files that are not merged
if today.weekday() != backfill_merge_day or (today-earliest_date).days <= check_nd:
return

# Start to merge files
pdList = []
for fn in new_files:
df = pd.read_parquet(fn, engine='pyarrow')
issue_date = get_date(fn)
df["issue_date"] = issue_date
df["lag"] = [(issue_date - x).days for x in df["time_value"]]
pdList.append(df)
merged_file = pd.concat(pdList).sort_values(["time_value", "fips"])
path = backfill_dir + "/quidel_covidtest_from_%s_to_%s.parquet"%(
datetime.strftime(earliest_date, "%Y%m%d"),
datetime.strftime(latest_date, "%Y%m%d"))
merged_file.to_parquet(path, index=False)

# Delete daily files once we have the merged one.
if not test_mode:
for fn in new_files:
os.remove(fn)
return
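
A side note on the review thread above: here is a minimal sketch of the per-age-bucket storage the reviewer suggests, with standardized "num"/"den" column names. This is not part of the PR; the helper name, the AGE_BUCKETS list, and the exact filename pattern are illustrative assumptions.

from datetime import datetime

# Illustrative only: bucket names mirror the column suffixes used in backfill.py.
AGE_BUCKETS = ["total", "age_0_4", "age_5_17", "age_18_49",
               "age_50_64", "age_65plus", "age_0_17"]

def store_backfill_by_bucket(backfilldata, _end_date, backfill_dir):
    """Write one parquet file per age bucket with uniform num/den columns."""
    day = datetime.strftime(_end_date, "%Y%m%d")
    for bucket in AGE_BUCKETS:
        sub = backfilldata[["time_value", "fips", "state_id",
                            "num_%s" % bucket, "den_%s" % bucket]]
        sub = sub.rename(columns={"num_%s" % bucket: "num",
                                  "den_%s" % bucket: "den"})
        # e.g. quidel_covidtest_age_0_4_as_of_20200817.parquet
        path = backfill_dir + "/quidel_covidtest_%s_as_of_%s.parquet" % (bucket, day)
        sub.to_parquet(path, index=False)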
4 changes: 4 additions & 0 deletions quidel_covidtest/delphi_quidel_covidtest/pull.py
@@ -10,6 +10,8 @@

from .constants import AGE_GROUPS



def get_from_s3(start_date, end_date, bucket, logger):
"""
Get raw data from aws s3 bucket.
@@ -57,6 +59,8 @@ def get_from_s3(start_date, end_date, bucket, logger):

# Fetch data received on the same day
for fn in s3_files[search_date]:
if ".csv" not in fn:
continue #Add to avoid that the folder name was readed as a fn.
if fn in set(df["fname"].values):
continue
obj = bucket.Object(key=fn)
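
As an aside on the ".csv" guard above (not a change made in this PR): a substring test also matches any key that merely contains ".csv" somewhere in its name; if that ever matters, an endswith() check is slightly stricter:

if not fn.endswith(".csv"):
    continue  # skip folder prefixes and any non-CSV objects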
11 changes: 10 additions & 1 deletion quidel_covidtest/delphi_quidel_covidtest/run.py
@@ -26,7 +26,7 @@
check_export_start_date,
check_export_end_date,
update_cache_file)

from .backfill import (store_backfill_file, merge_backfill_file)

def log_exit(start_time, stats, logger):
"""Log at program exit."""
@@ -66,6 +66,7 @@ def run_module(params: Dict[str, Any]):
- indicator":
- "static_file_dir": str, directory name with population information
- "input_cache_dir": str, directory in which to cache input data
- "backfill_dir": str, directory in which to store the backfill files
- "export_start_date": str, YYYY-MM-DD format of earliest date to create output
- "export_end_date": str, YYYY-MM-DD format of latest date to create output or "" to create
through the present
@@ -85,16 +86,24 @@
stats = []
atexit.register(log_exit, start_time, stats, logger)
cache_dir = params["indicator"]["input_cache_dir"]
backfill_dir = params["indicator"]["backfill_dir"]
backfill_merge_day = params["indicator"]["backfill_merge_day"]
export_dir = params["common"]["export_dir"]
export_start_date = params["indicator"]["export_start_date"]
export_end_date = params["indicator"]["export_end_date"]
export_day_range = params["indicator"]["export_day_range"]

# Pull data and update export date
df, _end_date = pull_quidel_covidtest(params["indicator"], logger)
# Merge ~4 weeks' data into one file to save runtime.
# Note that we don't check _end_date (the receive date) here,
# since we always want the merging to happen on a fixed day of the week.
merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today())
if _end_date is None:
logger.info("The data is up-to-date. Currently, no new data to be ingested.")
return
# Store the backfill intermediate file
store_backfill_file(df, _end_date, backfill_dir)
export_end_date = check_export_end_date(
export_end_date, _end_date, END_FROM_TODAY_MINUS)
export_start_date = check_export_start_date(
2 changes: 2 additions & 0 deletions quidel_covidtest/params.json.template
@@ -7,6 +7,8 @@
"indicator": {
"static_file_dir": "./static",
"input_cache_dir": "./cache",
"backfill_dir": "./backfill",
"backfill_merge_day": 0,
"export_start_date": "2020-05-26",
"export_end_date": "",
"pull_start_date": "2020-05-26",
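
For context, a minimal sketch of how the two new keys drive the backfill step, assuming this template is saved as params.json:

import json
from datetime import datetime

from delphi_quidel_covidtest.backfill import merge_backfill_file

with open("params.json") as f:
    params = json.load(f)

backfill_dir = params["indicator"]["backfill_dir"]              # "./backfill"
backfill_merge_day = params["indicator"]["backfill_merge_day"]  # 0 = Monday

# The merge is attempted on every run, but only fires on the configured
# weekday once enough unmerged daily files have accumulated.
merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today())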
1 change: 1 addition & 0 deletions quidel_covidtest/setup.py
@@ -4,6 +4,7 @@
required = [
"numpy",
"pandas",
"pyarrow",
"pydocstyle",
"pytest",
"pytest-cov",
1 change: 1 addition & 0 deletions quidel_covidtest/tests/backfill/.gitignore
@@ -0,0 +1 @@
*.csv
108 changes: 108 additions & 0 deletions quidel_covidtest/tests/test_backfill.py
@@ -0,0 +1,108 @@
import logging
import os
import glob
from datetime import datetime

import pandas as pd

from delphi_quidel_covidtest.pull import pull_quidel_covidtest

from delphi_quidel_covidtest.backfill import (store_backfill_file,
merge_backfill_file)

END_FROM_TODAY_MINUS = 5
EXPORT_DAY_RANGE = 40

TEST_LOGGER = logging.getLogger()
backfill_dir="./backfill"

class TestBackfill:

df, _end_date = pull_quidel_covidtest({
"static_file_dir": "../static",
"input_cache_dir": "./cache",
"export_start_date": "2020-06-30",
"export_end_date": "",
"pull_start_date": "2020-07-09",
"pull_end_date":"",
"aws_credentials": {
"aws_access_key_id": "",
"aws_secret_access_key": ""
},
"bucket_name": "",
"wip_signal": "",
"test_mode": True
}, TEST_LOGGER)

def test_store_backfill_file(self):

store_backfill_file(self.df, datetime(2020, 1, 1), backfill_dir)
fn = "quidel_covidtest_as_of_20200101.parquet"
assert fn in os.listdir(backfill_dir)

backfill_df = pd.read_parquet(backfill_dir + "/"+ fn, engine='pyarrow')

selected_columns = ['time_value', 'fips', 'state_id',
'den_total', 'num_total',
'num_age_0_4', 'den_age_0_4',
'num_age_5_17', 'den_age_5_17',
'num_age_18_49', 'den_age_18_49',
'num_age_50_64', 'den_age_50_64',
'num_age_65plus', 'den_age_65plus',
'num_age_0_17', 'den_age_0_17']
assert set(selected_columns) == set(backfill_df.columns)

os.remove(backfill_dir + "/" + fn)
assert fn not in os.listdir(backfill_dir)

def test_merge_backfill_file(self):

fn = "quidel_covidtest_from_20200817_to_20200820.parquet"
assert fn not in os.listdir(backfill_dir)

# Check the case when no daily file is stored
today = datetime(2020, 8, 20)
merge_backfill_file(backfill_dir, today.weekday(), today, test_mode=True, check_nd=8)
assert fn not in os.listdir(backfill_dir)

for d in range(17, 21):
dropdate = datetime(2020, 8, d)
store_backfill_file(self.df, dropdate, backfill_dir)

# Check the case when the merged file is not generated
today = datetime(2020, 8, 20)
merge_backfill_file(backfill_dir, today.weekday(), today, test_mode=True, check_nd=8)
assert fn not in os.listdir(backfill_dir)

# Generate the merged file, but not delete it
merge_backfill_file(backfill_dir, today.weekday(), today, test_mode=True, check_nd=2)
assert fn in os.listdir(backfill_dir)

# Read daily file
new_files = glob.glob(backfill_dir + "/quidel_covidtest*.parquet")
pdList = []
for file in new_files:
if "from" in file:
continue
df = pd.read_parquet(file, engine='pyarrow')
issue_date = datetime.strptime(file[-16:-8], "%Y%m%d")
df["issue_date"] = issue_date
df["lag"] = [(issue_date - x).days for x in df["time_value"]]
pdList.append(df)
os.remove(file)
new_files = glob.glob(backfill_dir + "/quidel_covidtest*.parquet")
assert len(new_files) == 1

expected = pd.concat(pdList).sort_values(["time_value", "fips"])

# Read the merged file
merged = pd.read_parquet(backfill_dir + "/" + fn, engine='pyarrow')

assert set(expected.columns) == set(merged.columns)
assert expected.shape[0] == merged.shape[0]
assert expected.shape[1] == merged.shape[1]

os.remove(backfill_dir + "/" + fn)
assert fn not in os.listdir(backfill_dir)

1 change: 1 addition & 0 deletions quidel_covidtest/tests/test_pull.py
@@ -103,3 +103,4 @@ def test_check_export_start_date(self):
expected = [datetime(2020, 5, 26), datetime(2020, 6, 20), datetime(2020, 5, 26)]

assert tested == expected

2 changes: 2 additions & 0 deletions quidel_covidtest/tests/test_run.py
@@ -21,6 +21,8 @@ class TestRun:
"indicator": {
"static_file_dir": "../static",
"input_cache_dir": "./cache",
"backfill_dir": "./backfill",
"backfill_merge_day": 0,
"export_start_date": "2020-06-30",
"export_end_date": "",
"pull_start_date": "2020-07-09",
Expand Down