diff --git a/ansible/templates/quidel_covidtest-params-prod.json.j2 b/ansible/templates/quidel_covidtest-params-prod.json.j2 index fe605206b..c8f7ebf49 100644 --- a/ansible/templates/quidel_covidtest-params-prod.json.j2 +++ b/ansible/templates/quidel_covidtest-params-prod.json.j2 @@ -10,6 +10,8 @@ "export_end_date": "", "pull_start_date": "2020-05-26", "pull_end_date":"", + "backfill_dir": "/common/backfill/quidel_covidtest", + "backfill_merge_day": 0, "export_day_range":40, "aws_credentials": { "aws_access_key_id": "{{ quidel_aws_access_key_id }}", diff --git a/quidel_covidtest/delphi_quidel_covidtest/backfill.py b/quidel_covidtest/delphi_quidel_covidtest/backfill.py new file mode 100644 index 000000000..7e8482551 --- /dev/null +++ b/quidel_covidtest/delphi_quidel_covidtest/backfill.py @@ -0,0 +1,125 @@ +# -*- coding: utf-8 -*- +"""Store backfill data.""" +import os +import glob +from datetime import datetime + +import pandas as pd + +from delphi_utils import GeoMapper + + +gmpr = GeoMapper() + +def store_backfill_file(df, _end_date, backfill_dir): + """ + Store county level backfill data into backfill_dir. + + Parameter: + df: pd.DataFrame + Pre-process file at ZipCode level + _end_date: datetime + The most recent date when the raw data is received + backfill_dir: str + specified path to store backfill files. 
+ """ + backfilldata = df.copy() + backfilldata = gmpr.replace_geocode(backfilldata, from_code="zip", new_code="fips", + from_col="zip", new_col="fips", date_col="timestamp") + backfilldata = gmpr.add_geocode(backfilldata, from_code="fips", new_code="state_id", + from_col="fips", new_col="state_id") + backfilldata.rename({"timestamp": "time_value", + "totalTest_total": "den_total", + "positiveTest_total": "num_total", + "positiveTest_age_0_4": "num_age_0_4", + "totalTest_age_0_4": "den_age_0_4", + "positiveTest_age_5_17": "num_age_5_17", + "totalTest_age_5_17": "den_age_5_17", + "positiveTest_age_18_49": "num_age_18_49", + "totalTest_age_18_49": "den_age_18_49", + "positiveTest_age_50_64": "num_age_50_64", + "totalTest_age_50_64": "den_age_50_64", + "positiveTest_age_65plus": "num_age_65plus", + "totalTest_age_65plus": "den_age_65plus", + "positiveTest_age_0_17": "num_age_0_17", + "totalTest_age_0_17": "den_age_0_17"}, + axis=1, inplace=True) + #Store one year's backfill data + _start_date = _end_date.replace(year=_end_date.year-1) + selected_columns = ['time_value', 'fips', 'state_id', + 'den_total', 'num_total', + 'num_age_0_4', 'den_age_0_4', + 'num_age_5_17', 'den_age_5_17', + 'num_age_18_49', 'den_age_18_49', + 'num_age_50_64', 'den_age_50_64', + 'num_age_65plus', 'den_age_65plus', + 'num_age_0_17', 'den_age_0_17'] + backfilldata = backfilldata.loc[backfilldata["time_value"] >= _start_date, + selected_columns] + path = backfill_dir + \ + "/quidel_covidtest_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d") + # Store intermediate file into the backfill folder + backfilldata.to_parquet(path, index=False) + +def merge_backfill_file(backfill_dir, backfill_merge_day, today, + test_mode=False, check_nd=25): + """ + Merge ~4 weeks' backfill data into one file. + + Usually this function should merge 28 days' data into a new file so as to + save the reading time when running the backfill pipelines. 
We set a softer + threshold to allow flexibility in data delivery. + + Parameters + ---------- + today : datetime + The most recent date when the raw data is received + backfill_dir : str + specified path to store backfill files. + backfill_merge_day: int + The day of a week that we used to merge the backfill files. e.g. 0 + is Monday. + test_mode: bool + check_nd: int + The criteria of the number of unmerged files. Ideally, we want the + number to be 28, but we use a looser criterion from practical + considerations + """ + new_files = glob.glob(backfill_dir + "/quidel_covidtest_as_of_*") + if len(new_files) == 0: # if no daily file is stored + return + + def get_date(file_link): + # Keep the function here consistent with the backfill path in + # function `store_backfill_file` + fn = file_link.split("/")[-1].split(".parquet")[0].split("_")[-1] + return datetime.strptime(fn, "%Y%m%d") + + date_list = list(map(get_date, new_files)) + earliest_date = min(date_list) + latest_date = max(date_list) + + # Check whether to merge + # Check the number of files that are not merged + if today.weekday() != backfill_merge_day or (today-earliest_date).days <= check_nd: + return + + # Start to merge files + pdList = [] + for fn in new_files: + df = pd.read_parquet(fn, engine='pyarrow') + issue_date = get_date(fn) + df["issue_date"] = issue_date + df["lag"] = [(issue_date - x).days for x in df["time_value"]] + pdList.append(df) + merged_file = pd.concat(pdList).sort_values(["time_value", "fips"]) + path = backfill_dir + "/quidel_covidtest_from_%s_to_%s.parquet"%( + datetime.strftime(earliest_date, "%Y%m%d"), + datetime.strftime(latest_date, "%Y%m%d")) + merged_file.to_parquet(path, index=False) + + # Delete daily files once we have the merged one. 
+ if not test_mode: + for fn in new_files: + os.remove(fn) + return diff --git a/quidel_covidtest/delphi_quidel_covidtest/pull.py b/quidel_covidtest/delphi_quidel_covidtest/pull.py index ab397ce47..2ac5b958f 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/pull.py +++ b/quidel_covidtest/delphi_quidel_covidtest/pull.py @@ -10,6 +10,8 @@ from .constants import AGE_GROUPS + + def get_from_s3(start_date, end_date, bucket, logger): """ Get raw data from aws s3 bucket. @@ -57,6 +59,8 @@ def get_from_s3(start_date, end_date, bucket, logger): # Fetch data received on the same day for fn in s3_files[search_date]: + if ".csv" not in fn: + continue # Added to avoid reading a folder name as a file name. if fn in set(df["fname"].values): continue obj = bucket.Object(key=fn) diff --git a/quidel_covidtest/delphi_quidel_covidtest/run.py b/quidel_covidtest/delphi_quidel_covidtest/run.py index fb1a69ac2..4f7f7fc8e 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/run.py +++ b/quidel_covidtest/delphi_quidel_covidtest/run.py @@ -26,7 +26,7 @@ check_export_start_date, check_export_end_date, update_cache_file) - +from .backfill import (store_backfill_file, merge_backfill_file) def log_exit(start_time, stats, logger): """Log at program exit.""" @@ -66,6 +66,7 @@ def run_module(params: Dict[str, Any]): - indicator": - "static_file_dir": str, directory name with population information - "input_cache_dir": str, directory in which to cache input data + - "backfill_dir": str, directory in which to store the backfill files - "export_start_date": str, YYYY-MM-DD format of earliest date to create output - "export_end_date": str, YYYY-MM-DD format of latest date to create output or "" to create through the present @@ -85,6 +86,8 @@ def run_module(params: Dict[str, Any]): stats = [] atexit.register(log_exit, start_time, stats, logger) cache_dir = params["indicator"]["input_cache_dir"] + backfill_dir = params["indicator"]["backfill_dir"] + backfill_merge_day = 
params["indicator"]["backfill_merge_day"] export_dir = params["common"]["export_dir"] export_start_date = params["indicator"]["export_start_date"] export_end_date = params["indicator"]["export_end_date"] @@ -92,9 +95,15 @@ def run_module(params: Dict[str, Any]): # Pull data and update export date df, _end_date = pull_quidel_covidtest(params["indicator"], logger) + # Merge 4 weeks' data into one file to save runtime + # Notice that here we don't check the _end_date(receive date) + # since we always want such merging happens on a certain day of a week + merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today()) if _end_date is None: logger.info("The data is up-to-date. Currently, no new data to be ingested.") return + # Store the backfill intermediate file + store_backfill_file(df, _end_date, backfill_dir) export_end_date = check_export_end_date( export_end_date, _end_date, END_FROM_TODAY_MINUS) export_start_date = check_export_start_date( diff --git a/quidel_covidtest/params.json.template b/quidel_covidtest/params.json.template index e96d2ccbc..d05ecd25e 100644 --- a/quidel_covidtest/params.json.template +++ b/quidel_covidtest/params.json.template @@ -7,6 +7,8 @@ "indicator": { "static_file_dir": "./static", "input_cache_dir": "./cache", + "backfill_dir": "./backfill", + "backfill_merge_day": 0, "export_start_date": "2020-05-26", "export_end_date": "", "pull_start_date": "2020-05-26", diff --git a/quidel_covidtest/setup.py b/quidel_covidtest/setup.py index 4c01e8593..369ac30c0 100644 --- a/quidel_covidtest/setup.py +++ b/quidel_covidtest/setup.py @@ -4,6 +4,7 @@ required = [ "numpy", "pandas", + "pyarrow", "pydocstyle", "pytest", "pytest-cov", diff --git a/quidel_covidtest/tests/backfill/.gitignore b/quidel_covidtest/tests/backfill/.gitignore new file mode 100644 index 000000000..afed0735d --- /dev/null +++ b/quidel_covidtest/tests/backfill/.gitignore @@ -0,0 +1 @@ +*.csv diff --git a/quidel_covidtest/tests/test_backfill.py 
b/quidel_covidtest/tests/test_backfill.py new file mode 100644 index 000000000..7a033fb47 --- /dev/null +++ b/quidel_covidtest/tests/test_backfill.py @@ -0,0 +1,108 @@ +import logging +import os +import glob +from datetime import datetime + +import pandas as pd + +from delphi_quidel_covidtest.pull import pull_quidel_covidtest + +from delphi_quidel_covidtest.backfill import (store_backfill_file, + merge_backfill_file) + +END_FROM_TODAY_MINUS = 5 +EXPORT_DAY_RANGE = 40 + +TEST_LOGGER = logging.getLogger() +backfill_dir="./backfill" + +class TestBackfill: + + df, _end_date = pull_quidel_covidtest({ + "static_file_dir": "../static", + "input_cache_dir": "./cache", + "export_start_date": "2020-06-30", + "export_end_date": "", + "pull_start_date": "2020-07-09", + "pull_end_date":"", + "aws_credentials": { + "aws_access_key_id": "", + "aws_secret_access_key": "" + }, + "bucket_name": "", + "wip_signal": "", + "test_mode": True + }, TEST_LOGGER) + + def test_store_backfill_file(self): + + store_backfill_file(self.df, datetime(2020, 1, 1), backfill_dir) + fn = "quidel_covidtest_as_of_20200101.parquet" + assert fn in os.listdir(backfill_dir) + + backfill_df = pd.read_parquet(backfill_dir + "/"+ fn, engine='pyarrow') + + selected_columns = ['time_value', 'fips', 'state_id', + 'den_total', 'num_total', + 'num_age_0_4', 'den_age_0_4', + 'num_age_5_17', 'den_age_5_17', + 'num_age_18_49', 'den_age_18_49', + 'num_age_50_64', 'den_age_50_64', + 'num_age_65plus', 'den_age_65plus', + 'num_age_0_17', 'den_age_0_17'] + assert set(selected_columns) == set(backfill_df.columns) + + os.remove(backfill_dir + "/" + fn) + assert fn not in os.listdir(backfill_dir) + + def test_merge_backfill_file(self): + + today = datetime.today() + fn = "quidel_covidtest_from_20200817_to_20200820.parquet" + assert fn not in os.listdir(backfill_dir) + + # Check the when no daily file stored + today = datetime(2020, 8, 20) + merge_backfill_file(backfill_dir, today.weekday(), today, test_mode=True, check_nd=8) 
+ assert fn not in os.listdir(backfill_dir) + + for d in range(17, 21): + dropdate = datetime(2020, 8, d) + store_backfill_file(self.df, dropdate, backfill_dir) + + # Check the when the merged file is not generated + today = datetime(2020, 8, 20) + merge_backfill_file(backfill_dir, today.weekday(), today, test_mode=True, check_nd=8) + assert fn not in os.listdir(backfill_dir) + + # Generate the merged file, but not delete it + merge_backfill_file(backfill_dir, today.weekday(), today, test_mode=True, check_nd=2) + assert fn in os.listdir(backfill_dir) + + # Read daily file + new_files = glob.glob(backfill_dir + "/quidel_covidtest*.parquet") + pdList = [] + for file in new_files: + if "from" in file: + continue + df = pd.read_parquet(file, engine='pyarrow') + issue_date = datetime.strptime(file[-16:-8], "%Y%m%d") + df["issue_date"] = issue_date + df["lag"] = [(issue_date - x).days for x in df["time_value"]] + pdList.append(df) + os.remove(file) + new_files = glob.glob(backfill_dir + "/quidel_covidtest*.parquet") + assert len(new_files) == 1 + + expected = pd.concat(pdList).sort_values(["time_value", "fips"]) + + # Read the merged file + merged = pd.read_parquet(backfill_dir + "/" + fn, engine='pyarrow') + + assert set(expected.columns) == set(merged.columns) + assert expected.shape[0] == merged.shape[0] + assert expected.shape[1] == merged.shape[1] + + os.remove(backfill_dir + "/" + fn) + assert fn not in os.listdir(backfill_dir) + diff --git a/quidel_covidtest/tests/test_pull.py b/quidel_covidtest/tests/test_pull.py index ae35fc68f..a3436392b 100644 --- a/quidel_covidtest/tests/test_pull.py +++ b/quidel_covidtest/tests/test_pull.py @@ -103,3 +103,4 @@ def test_check_export_start_date(self): expected = [datetime(2020, 5, 26), datetime(2020, 6, 20), datetime(2020, 5, 26)] assert tested == expected + diff --git a/quidel_covidtest/tests/test_run.py b/quidel_covidtest/tests/test_run.py index d9e28108a..4e15bea91 100644 --- a/quidel_covidtest/tests/test_run.py +++ 
b/quidel_covidtest/tests/test_run.py @@ -21,6 +21,8 @@ class TestRun: "indicator": { "static_file_dir": "../static", "input_cache_dir": "./cache", + "backfill_dir": "./backfill", + "backfill_merge_day": 0, "export_start_date": "2020-06-30", "export_end_date": "", "pull_start_date": "2020-07-09",