diff --git a/_delphi_utils_python/delphi_utils/archive.py b/_delphi_utils_python/delphi_utils/archive.py index 5d1036bcd..994dfd7df 100644 --- a/_delphi_utils_python/delphi_utils/archive.py +++ b/_delphi_utils_python/delphi_utils/archive.py @@ -40,9 +40,11 @@ from git import Repo from git.refs.head import Head import pandas as pd +import numpy as np from .utils import read_params from .logger import get_structured_logger +from .nancodes import Nans Files = List[str] FileDiffMap = Dict[str, Optional[str]] @@ -73,8 +75,10 @@ def diff_export_csv( changed_df is the pd.DataFrame of common rows from after_csv with changed values. added_df is the pd.DataFrame of added rows from after_csv. """ - export_csv_dtypes = {"geo_id": str, "val": float, - "se": float, "sample_size": float} + export_csv_dtypes = { + "geo_id": str, "val": float, "se": float, "sample_size": float, + "missing_val": int, "missing_se": int, "missing_sample_size": int + } before_df = pd.read_csv(before_csv, dtype=export_csv_dtypes) before_df.set_index("geo_id", inplace=True) @@ -89,12 +93,22 @@ def diff_export_csv( before_df_cmn = before_df.reindex(common_idx) after_df_cmn = after_df.reindex(common_idx) - # Exact comparisons, treating NA == NA as True - same_mask = before_df_cmn == after_df_cmn - same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn) + # If CSVs have different columns (no missingness), mark all values as new + if ("missing_val" in before_df_cmn.columns) ^ ("missing_val" in after_df_cmn.columns): + same_mask = after_df_cmn.copy() + same_mask.loc[:] = False + else: + # Exact comparisons, treating NA == NA as True + same_mask = before_df_cmn == after_df_cmn + same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn) + + # Code deleted entries as nans with the deleted missing code + deleted_df = before_df.loc[deleted_idx, :].copy() + deleted_df[["val", "se", "sample_size"]] = np.nan + deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED return ( - before_df.loc[deleted_idx, :], + deleted_df, after_df_cmn.loc[~(same_mask.all(axis=1)), :], after_df.loc[added_idx, :]) @@ -227,11 +241,11 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]: deleted_df, changed_df, added_df = diff_export_csv( before_file, after_file) - new_issues_df = pd.concat([changed_df, added_df], axis=0) + new_issues_df = pd.concat([deleted_df, changed_df, added_df], axis=0) if len(deleted_df) > 0: print( - f"Warning, diff has deleted indices in {after_file} that will be ignored") + f"Diff has deleted indices in {after_file} that have been coded as nans.") # Write the diffs to diff_file, if applicable if len(new_issues_df) > 0: @@ -240,7 +254,26 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]: new_issues_df.to_csv(diff_file, na_rep="NA") common_diffs[after_file] = diff_file - return deleted_files, common_diffs, new_files + export_csv_dtypes = { + "geo_id": str, "val": float, "se": float, "sample_size": float, + "missing_val": int, "missing_se": int, "missing_sample_size": int + } + + # Replace deleted files with empty versions, but only if the cached version is not + # already empty + deleted_files_nanfilled = [] + for deleted_file in deleted_files: + deleted_df = pd.read_csv(deleted_file, dtype=export_csv_dtypes) + print( + f"Diff has deleted {deleted_file}; generating a CSV with corresponding deleted rows." + ) + deleted_df[["val", "se", "sample_size"]] = np.nan + deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED + filename = join(self.export_dir, basename(deleted_file)) + deleted_df.to_csv(filename, index=False) + deleted_files_nanfilled.append(filename) + + return deleted_files_nanfilled, common_diffs, new_files def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]: """ @@ -266,9 +299,10 @@ def filter_exports(self, common_diffs: FileDiffMap): Filter export directory to only contain relevant files. Filters down the export_dir to only contain: - 1) New files, 2) Changed files, filtered-down to the ADDED and CHANGED rows only. - Should be called after archive_exports() so we archive the raw exports before - potentially modifying them. + 1) New files, 2) Changed files, filtered-down to the ADDED and CHANGED rows + only, and 3) Deleted files replaced with empty CSVs with the same name. Should + be called after archive_exports() so we archive the raw exports before potentially + modifying them. Parameters ---------- @@ -297,12 +331,13 @@ def run(self): self.update_cache() # Diff exports, and make incremental versions - _, common_diffs, new_files = self.diff_exports() + deleted_files, common_diffs, new_files = self.diff_exports() - # Archive changed and new files only + # Archive changed, new, and emptied deleted files to_archive = [f for f, diff in common_diffs.items() if diff is not None] to_archive += new_files + to_archive += deleted_files _, fails = self.archive_exports(to_archive) # Filter existing exports to exclude those that failed to archive @@ -414,6 +449,9 @@ def archive_exports(self, # pylint: disable=arguments-differ archive_success.append(exported_file) except FileNotFoundError: archive_fail.append(exported_file) + except shutil.SameFileError: + # no need to copy if the cached file is the same + archive_success.append(exported_file) self._exports_archived = True diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py index 5a3b804b2..afc1a4c8a 100644 --- a/_delphi_utils_python/delphi_utils/export.py +++ b/_delphi_utils_python/delphi_utils/export.py @@ -3,10 +3,32 @@ from datetime import datetime from os.path import join from typing import Optional +import logging import numpy as np import pandas as pd +from .nancodes import Nans + +def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None): + """Find values with contradictory missingness codes, filter them, and log.""" + columns = ["val", "se", "sample_size"] + # Get indicies where the XNOR is true (i.e. both are true or both are false). + masks = [ + ~(df[column].isna() ^ df["missing_" + column].eq(Nans.NOT_MISSING)) + for column in columns + ] + for mask in masks: + if not logger is None and df.loc[mask].size > 0: + logger.info( + "Filtering contradictory missing code in " + + "{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d")) + ) + df = df.loc[~mask] + elif logger is None and df.loc[mask].size > 0: + df = df.loc[~mask] + return df + def create_export_csv( df: pd.DataFrame, export_dir: str, @@ -16,7 +38,8 @@ def create_export_csv( start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, remove_null_samples: Optional[bool] = False, - write_empty_days: Optional[bool] = False + write_empty_days: Optional[bool] = False, + logger: Optional[logging.Logger] = None ): """Export data in the format expected by the Delphi API. @@ -43,6 +66,8 @@ def create_export_csv( write_empty_days: Optional[bool] If true, every day in between start_date and end_date will have a CSV file written even if there is no data for the day. If false, only the days present are written. + logger: Optional[logging.Logger] + Pass a logger object here to log information about contradictory missing codes. Returns --------- @@ -70,7 +95,20 @@ def create_export_csv( else: export_filename = f"{date.strftime('%Y%m%d')}_{geo_res}_{metric}_{sensor}.csv" export_file = join(export_dir, export_filename) - export_df = df[df["timestamp"] == date][["geo_id", "val", "se", "sample_size",]] + expected_columns = [ + "geo_id", + "val", + "se", + "sample_size", + "missing_val", + "missing_se", + "missing_sample_size" + ] + export_df = df[df["timestamp"] == date].filter(items=expected_columns) + if "missing_val" in export_df.columns: + export_df = filter_contradicting_missing_codes( + export_df, sensor, metric, date, logger=logger + ) if remove_null_samples: export_df = export_df[export_df["sample_size"].notnull()] export_df = export_df.round({"val": 7, "se": 7}) diff --git a/_delphi_utils_python/tests/test_archive.py b/_delphi_utils_python/tests/test_archive.py index 1b068f898..111acf92f 100644 --- a/_delphi_utils_python/tests/test_archive.py +++ b/_delphi_utils_python/tests/test_archive.py @@ -14,8 +14,12 @@ from delphi_utils.archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer,\ archiver_from_params +from delphi_utils.nancodes import Nans -CSV_DTYPES = {"geo_id": str, "val": float, "se": float, "sample_size": float} +CSV_DTYPES = { + "geo_id": str, "val": float, "se": float, "sample_size": float, + "missing_val": int, "missing_se":int, "missing_sample_size": int + } CSVS_BEFORE = { # Common @@ -23,20 +27,40 @@ "geo_id": ["1", "2", "3"], "val": [1.000000001, 2.00000002, 3.00000003], "se": [0.1, 0.2, 0.3], - "sample_size": [10.0, 20.0, 30.0]}), + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), "csv1": pd.DataFrame({ "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [np.nan, 0.20000002, 0.30000003], - "sample_size": [10.0, 20.0, 30.0]}), + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), # Deleted "csv2": pd.DataFrame({ "geo_id": ["1"], "val": [1.0], "se": [0.1], - "sample_size": [10.0]}), + "sample_size": [10.0], + "missing_val": [Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING], + }), + + # Common, but updated with missing columns + "csv4": pd.DataFrame({ + "geo_id": ["1"], + "val": [1.0], + "se": [0.1], + "sample_size": [10.0] + }), } CSVS_AFTER = { @@ -45,23 +69,45 @@ "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [0.10000001, 0.20000002, 0.30000003], - "sample_size": [10.0, 20.0, 30.0]}), + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), "csv1": pd.DataFrame({ "geo_id": ["1", "2", "4"], "val": [1.0, 2.1, 4.0], "se": [np.nan, 0.21, np.nan], - "sample_size": [10.0, 21.0, 40.0]}), + "sample_size": [10.0, 21.0, 40.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), # Added "csv3": pd.DataFrame({ "geo_id": ["2"], "val": [2.0000002], "se": [0.2], - "sample_size": [20.0]}), + "sample_size": [20.0], + "missing_val": [Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING], + }), + + # Common, but updated with missing columns + "csv4": pd.DataFrame({ + "geo_id": ["1"], + "val": [1.0], + "se": [0.1], + "sample_size": [10.0], + "missing_val": [Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING], + }), } - class TestArchiveDiffer: def test_stubs(self): @@ -80,10 +126,24 @@ def test_diff_and_filter_exports(self, tmp_path): mkdir(export_dir) csv1_diff = pd.DataFrame({ - "geo_id": ["2", "4"], - "val": [2.1, 4.0], - "se": [0.21, np.nan], - "sample_size": [21.0, 40.0]}) + "geo_id": ["3", "2", "4"], + "val": [np.nan, 2.1, 4.0], + "se": [np.nan, 0.21, np.nan], + "sample_size": [np.nan, 21.0, 40.0], + "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + }) + + csv2_deleted = pd.DataFrame({ + "geo_id": ["1"], + "val": [np.nan], + "se": [np.nan], + "sample_size": [np.nan], + "missing_val": [Nans.DELETED], + "missing_se": [Nans.DELETED], + "missing_sample_size": [Nans.DELETED], + }) arch_diff = ArchiveDiffer(cache_dir, export_dir) @@ -104,9 +164,9 @@ def test_diff_and_filter_exports(self, tmp_path): deleted_files, common_diffs, new_files = arch_diff.diff_exports() # Check return values - assert set(deleted_files) == {join(cache_dir, "csv2.csv")} + assert set(deleted_files) == {join(export_dir, "csv2.csv")} assert set(common_diffs.keys()) == { - join(export_dir, f) for f in ["csv0.csv", "csv1.csv"]} + join(export_dir, f) for f in ["csv0.csv", "csv1.csv", "csv4.csv"]} assert set(new_files) == {join(export_dir, "csv3.csv")} assert common_diffs[join(export_dir, "csv0.csv")] is None assert common_diffs[join(export_dir, "csv1.csv")] == join( @@ -114,7 +174,10 @@ def test_diff_and_filter_exports(self, tmp_path): # Check filesystem for actual files assert set(listdir(export_dir)) == { - "csv0.csv", "csv1.csv", "csv1.csv.diff", "csv3.csv"} + "csv0.csv", "csv1.csv", "csv1.csv.diff", + "csv3.csv", "csv4.csv", "csv4.csv.diff", + "csv2.csv" + } assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv.diff"), dtype=CSV_DTYPES), csv1_diff) @@ -131,8 +194,11 @@ def test_diff_and_filter_exports(self, tmp_path): arch_diff.filter_exports(common_diffs) - # Check exports directory just has incremental changes - assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"} + # Check exports directory just has incremental and deleted changes + assert set(listdir(export_dir)) == {"csv1.csv", "csv2.csv", "csv3.csv", "csv4.csv"} + assert_frame_equal( + pd.read_csv(join(export_dir, "csv2.csv"), dtype=CSV_DTYPES), + csv2_deleted) assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES), csv1_diff) @@ -259,15 +325,31 @@ def test_run(self, tmp_path, s3_client): assert_frame_equal(pd.read_csv(body, dtype=CSV_DTYPES), df) # Check exports directory just has incremental changes - assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"} + assert set(listdir(export_dir)) == {"csv1.csv", "csv2.csv", "csv3.csv", "csv4.csv"} csv1_diff = pd.DataFrame({ - "geo_id": ["2", "4"], - "val": [2.1, 4.0], - "se": [0.21, np.nan], - "sample_size": [21.0, 40.0]}) + "geo_id": ["3", "2", "4"], + "val": [np.nan, 2.1, 4.0], + "se": [np.nan, 0.21, np.nan], + "sample_size": [np.nan, 21.0, 40.0], + "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + }) assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES), csv1_diff) + csv2_deleted = pd.DataFrame({ + "geo_id": ["1"], + "val": [np.nan], + "se": [np.nan], + "sample_size": [np.nan], + "missing_val": [Nans.DELETED], + "missing_se": [Nans.DELETED], + "missing_sample_size": [Nans.DELETED], + }) + assert_frame_equal( + pd.read_csv(join(export_dir, "csv2.csv"), dtype=CSV_DTYPES), + csv2_deleted) class TestGitArchiveDiffer: @@ -346,7 +428,11 @@ def test_diff_exports(self, tmp_path): "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [0.1, 0.2, 0.3], - "sample_size": [10.0, 20.0, 30.0]}) + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }) # Write exact same CSV into cache and export, so no diffs expected csv1.to_csv(join(cache_dir, "csv1.csv"), index=False) @@ -383,7 +469,11 @@ def test_archive_exports(self, tmp_path): "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [0.1, 0.2, 0.3], - "sample_size": [10.0, 20.0, 30.0]}) + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }) # csv1.csv is now a dirty edit in the repo, and to be exported too csv1.to_csv(join(cache_dir, "csv1.csv"), index=False) @@ -460,15 +550,32 @@ def test_run(self, tmp_path): original_branch.checkout() # Check exports directory just has incremental changes - assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"} + assert set(listdir(export_dir)) == {"csv1.csv", "csv2.csv", "csv3.csv", "csv4.csv"} csv1_diff = pd.DataFrame({ - "geo_id": ["2", "4"], - "val": [2.1, 4.0], - "se": [0.21, np.nan], - "sample_size": [21.0, 40.0]}) + "geo_id": ["3", "2", "4"], + "val": [np.nan, 2.1, 4.0], + "se": [np.nan, 0.21, np.nan], + "sample_size": [np.nan, 21.0, 40.0], + "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + }) assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES), csv1_diff) + csv2_deleted = pd.DataFrame({ + "geo_id": ["1"], + "val": [np.nan], + "se": [np.nan], + "sample_size": [np.nan], + "missing_val": [Nans.DELETED], + "missing_se": [Nans.DELETED], + "missing_sample_size": [Nans.DELETED], + }) + assert_frame_equal( + pd.read_csv(join(export_dir, "csv2.csv"), dtype=CSV_DTYPES), + csv2_deleted) + class TestFromParams: diff --git a/_delphi_utils_python/tests/test_export.py b/_delphi_utils_python/tests/test_export.py index 31ec5c113..b22a710cd 100644 --- a/_delphi_utils_python/tests/test_export.py +++ b/_delphi_utils_python/tests/test_export.py @@ -3,8 +3,11 @@ from os import listdir, remove from os.path import join +import mock +import numpy as np import pandas as pd -from delphi_utils import create_export_csv + +from delphi_utils import create_export_csv, Nans def _clean_directory(directory): """Clean files out of a directory.""" @@ -43,6 +46,34 @@ class TestExport: } ) + # A sample data frame with missingness. + DF2 = pd.DataFrame( + { + "geo_id": ["51093", "51175", "51175", "51620"], + "timestamp": TIMES, + "val": [3.12345678910, np.nan, 2.2, 2.6], + "se": [0.15, 0.22, np.nan, 0.34], + "sample_size": [100, 100, 101, None], + "missing_val": [Nans.NOT_MISSING, Nans.OTHER, Nans.NOT_MISSING, Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.OTHER, Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING] * 3 + [Nans.OTHER] + } + ) + + # A sample data frame with contradictory missing codes. + DF3 = pd.DataFrame( + { + "geo_id": ["51093", "51175", "51175", "51620"], + "timestamp": TIMES, + "val": [np.nan, np.nan, 2.2, 2.6], + "se": [0.15, 0.22, np.nan, 0.34], + "sample_size": [100, 100, 101, None], + "missing_val": [Nans.NOT_MISSING, Nans.OTHER, Nans.NOT_MISSING, Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.OTHER, Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING] * 3 + [Nans.OTHER] + } + ) + # Directory in which to store tests. TEST_DIR = "test_dir" @@ -235,3 +266,46 @@ def test_export_without_null_removal(self): ] ) assert pd.read_csv(join(self.TEST_DIR, "20200606_state_test.csv")).size > 0 + + def test_export_df_with_missingness(self): + _clean_directory(self.TEST_DIR) + + create_export_csv( + df=self.DF2.copy(), + export_dir=self.TEST_DIR, + geo_res="state", + sensor="test", + remove_null_samples=False + ) + assert _non_ignored_files_set(self.TEST_DIR) == set( + [ + "20200215_state_test.csv", + "20200301_state_test.csv", + "20200315_state_test.csv", + ] + ) + assert pd.read_csv(join(self.TEST_DIR, "20200315_state_test.csv")).size > 0 + + @mock.patch("delphi_utils.logger") + def test_export_df_with_contradictory_missingness(self, mock_logger): + _clean_directory(self.TEST_DIR) + + create_export_csv( + df=self.DF3.copy(), + export_dir=self.TEST_DIR, + geo_res="state", + sensor="test", + remove_null_samples=False, + logger=mock_logger + ) + assert _non_ignored_files_set(self.TEST_DIR) == set( + [ + "20200215_state_test.csv", + "20200301_state_test.csv", + "20200315_state_test.csv", + ] + ) + assert pd.read_csv(join(self.TEST_DIR, "20200315_state_test.csv")).size > 0 + mock_logger.info.assert_called_once_with( + "Filtering contradictory missing code in test_None_2020-02-15." + ) diff --git a/quidel_covidtest/delphi_quidel_covidtest/run.py b/quidel_covidtest/delphi_quidel_covidtest/run.py index d82f80135..0e99c347d 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/run.py +++ b/quidel_covidtest/delphi_quidel_covidtest/run.py @@ -12,7 +12,8 @@ from delphi_utils import ( add_prefix, create_export_csv, - get_structured_logger + get_structured_logger, + Nans ) from .constants import (END_FROM_TODAY_MINUS, @@ -40,6 +41,22 @@ def log_exit(start_time, stats, logger): max_lag_in_days = max_lag_in_days, oldest_final_export_date = formatted_min_max_date) +def add_nancodes(df): + """Add nancodes to the dataframe.""" + # Default missingness codes + df["missing_val"] = Nans.NOT_MISSING + df["missing_se"] = Nans.NOT_MISSING + df["missing_sample_size"] = Nans.NOT_MISSING + + # Mark any remaining nans with unknown + remaining_nans_mask = df["val"].isnull() + df.loc[remaining_nans_mask, "missing_val"] = Nans.OTHER + remaining_nans_mask = df["se"].isnull() + df.loc[remaining_nans_mask, "missing_se"] = Nans.OTHER + remaining_nans_mask = df["sample_size"].isnull() + df.loc[remaining_nans_mask, "missing_sample_size"] = Nans.OTHER + return df + def run_module(params: Dict[str, Any]): """Run the quidel_covidtest indicator. @@ -111,6 +128,7 @@ def run_module(params: Dict[str, Any]): geo_groups, res_key, smooth=smoothers[sensor][1], device=smoothers[sensor][0], first_date=first_date, last_date=last_date) + state_df = add_nancodes(state_df) dates = create_export_csv( state_df, geo_res=geo_res, @@ -130,6 +148,7 @@ def run_module(params: Dict[str, Any]): geo_groups, geo_data, res_key, smooth=smoothers[sensor][1], device=smoothers[sensor][0], first_date=first_date, last_date=last_date) + res_df = add_nancodes(res_df) dates = create_export_csv(res_df, geo_res=geo_res, sensor=sensor, export_dir=export_dir, start_date=export_start_date, end_date=export_end_date, remove_null_samples=True) diff --git a/quidel_covidtest/tests/test_run.py b/quidel_covidtest/tests/test_run.py index 89c59a24f..0541c4261 100644 --- a/quidel_covidtest/tests/test_run.py +++ b/quidel_covidtest/tests/test_run.py @@ -66,7 +66,11 @@ def test_output_files(self, clean_receiving_dir): df = pd.read_csv( join("./receiving", "20200718_state_covid_ag_smoothed_pct_positive.csv") ) - assert (df.columns.values == ["geo_id", "val", "se", "sample_size"]).all() + expected_columns = [ + "geo_id", "val", "se", "sample_size", + "missing_val", "missing_se", "missing_sample_size" + ] + assert (df.columns.values == expected_columns).all() # test_intermediate_file flag = None