Add NAN code support to Quidel Covidtest #999

Open
wants to merge 14 commits into main
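For context, the missingness codes used throughout this diff come from delphi_utils/nancodes.py, imported below as "from .nancodes import Nans". That file is not part of this diff; the following is a minimal sketch of the enum, assuming it follows the covidcast missing-value convention:

    from enum import IntEnum

    class Nans(IntEnum):
        """Reasons a value may be missing (values assumed, not shown in this diff)."""
        NOT_MISSING = 0
        NOT_APPLICABLE = 1
        REGION_EXCEPTION = 2
        CENSORED = 3
        DELETED = 4
        OTHER = 5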
66 changes: 52 additions & 14 deletions _delphi_utils_python/delphi_utils/archive.py
@@ -40,9 +40,11 @@
from git import Repo
from git.refs.head import Head
import pandas as pd
import numpy as np

from .utils import read_params
from .logger import get_structured_logger
from .nancodes import Nans

Files = List[str]
FileDiffMap = Dict[str, Optional[str]]
@@ -73,8 +75,10 @@ def diff_export_csv(
changed_df is the pd.DataFrame of common rows from after_csv with changed values.
added_df is the pd.DataFrame of added rows from after_csv.
"""
export_csv_dtypes = {"geo_id": str, "val": float,
"se": float, "sample_size": float}
export_csv_dtypes = {
"geo_id": str, "val": float, "se": float, "sample_size": float,
"missing_val": int, "missing_se": int, "missing_sample_size": int
}

before_df = pd.read_csv(before_csv, dtype=export_csv_dtypes)
before_df.set_index("geo_id", inplace=True)
@@ -89,12 +93,22 @@ def diff_export_csv(
before_df_cmn = before_df.reindex(common_idx)
after_df_cmn = after_df.reindex(common_idx)

# Exact comparisons, treating NA == NA as True
same_mask = before_df_cmn == after_df_cmn
same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn)
# If only one of the two CSVs has missingness columns, treat all common rows as changed
if ("missing_val" in before_df_cmn.columns) ^ ("missing_val" in after_df_cmn.columns):
same_mask = after_df_cmn.copy()
same_mask.loc[:] = False
else:
# Exact comparisons, treating NA == NA as True
same_mask = before_df_cmn == after_df_cmn
same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn)

# Code deleted entries as NaNs with the DELETED missing code
deleted_df = before_df.loc[deleted_idx, :].copy()
deleted_df[["val", "se", "sample_size"]] = np.nan
deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED

return (
before_df.loc[deleted_idx, :],
deleted_df,
after_df_cmn.loc[~(same_mask.all(axis=1)), :],
after_df.loc[added_idx, :])
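As a hypothetical illustration (not part of the diff), here is how the new return contract behaves on two small CSVs; the column layout follows export_csv_dtypes above:

    from io import StringIO

    before = StringIO(
        "geo_id,val,se,sample_size,missing_val,missing_se,missing_sample_size\n"
        "pa,1.0,0.1,100,0,0,0\n"
        "wv,2.0,0.2,200,0,0,0\n"
    )
    after = StringIO(
        "geo_id,val,se,sample_size,missing_val,missing_se,missing_sample_size\n"
        "pa,1.5,0.1,100,0,0,0\n"
        "ak,3.0,0.3,300,0,0,0\n"
    )
    deleted_df, changed_df, added_df = diff_export_csv(before, after)
    # deleted_df: the "wv" row, val/se/sample_size now NaN, missing_* = Nans.DELETED
    # changed_df: the "pa" row (val moved from 1.0 to 1.5)
    # added_df:   the "ak" row, unchanged from `after`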

@@ -227,11 +241,11 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]:

deleted_df, changed_df, added_df = diff_export_csv(
before_file, after_file)
new_issues_df = pd.concat([changed_df, added_df], axis=0)
new_issues_df = pd.concat([deleted_df, changed_df, added_df], axis=0)

if len(deleted_df) > 0:
print(
f"Warning, diff has deleted indices in {after_file} that will be ignored")
f"Diff has deleted indices in {after_file} that have been coded as nans.")

# Write the diffs to diff_file, if applicable
if len(new_issues_df) > 0:
@@ -240,7 +254,26 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]:
new_issues_df.to_csv(diff_file, na_rep="NA")
common_diffs[after_file] = diff_file

return deleted_files, common_diffs, new_files
export_csv_dtypes = {
"geo_id": str, "val": float, "se": float, "sample_size": float,
"missing_val": int, "missing_se": int, "missing_sample_size": int
}

# Replace each deleted file with a version whose rows are nan-filled
# and coded as DELETED
deleted_files_nanfilled = []
for deleted_file in deleted_files:
deleted_df = pd.read_csv(deleted_file, dtype=export_csv_dtypes)
print(
f"Diff has deleted {deleted_file}; generating a CSV with corresponding deleted rows."
)
deleted_df[["val", "se", "sample_size"]] = np.nan
deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED
filename = join(self.export_dir, basename(deleted_file))
deleted_df.to_csv(filename, index=False)
deleted_files_nanfilled.append(filename)

return deleted_files_nanfilled, common_diffs, new_files
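For example (hypothetical, not part of the diff), a cached export with a single "pa" row would be rewritten as the CSV below. The NaN fields serialize as empty strings since no na_rep is passed to to_csv here, and Nans.DELETED is assumed to equal 4:

    geo_id,val,se,sample_size,missing_val,missing_se,missing_sample_size
    pa,,,,4,4,4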

def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]:
"""
@@ -266,9 +299,10 @@ def filter_exports(self, common_diffs: FileDiffMap):
Filter export directory to only contain relevant files.

Filters down the export_dir to only contain:
1) New files, 2) Changed files, filtered-down to the ADDED and CHANGED rows only.
Should be called after archive_exports() so we archive the raw exports before
potentially modifying them.
1) New files, 2) Changed files, filtered-down to the ADDED and CHANGED rows
only, and 3) Deleted files replaced with same-named CSVs whose rows are
nan-filled and coded as DELETED. Should be called after archive_exports() so
we archive the raw exports before potentially modifying them.

Parameters
----------
@@ -297,12 +331,13 @@ def run(self):
self.update_cache()

# Diff exports, and make incremental versions
_, common_diffs, new_files = self.diff_exports()
deleted_files, common_diffs, new_files = self.diff_exports()

# Archive changed and new files only
# Archive changed, new, and emptied deleted files
to_archive = [f for f, diff in common_diffs.items()
if diff is not None]
to_archive += new_files
to_archive += deleted_files
_, fails = self.archive_exports(to_archive)

# Filter existing exports to exclude those that failed to archive
@@ -414,6 +449,9 @@ def archive_exports(self, # pylint: disable=arguments-differ
archive_success.append(exported_file)
except FileNotFoundError:
archive_fail.append(exported_file)
except shutil.SameFileError:
# no need to copy if the cached file is the same
archive_success.append(exported_file)

self._exports_archived = True

42 changes: 40 additions & 2 deletions _delphi_utils_python/delphi_utils/export.py
@@ -3,10 +3,32 @@
from datetime import datetime
from os.path import join
from typing import Optional
import logging

import numpy as np
import pandas as pd

from .nancodes import Nans

def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
    """Find values with contradictory missingness codes, filter them, and log."""
    columns = ["val", "se", "sample_size"]
    # Get indices where the XNOR is true, i.e. a value is NaN while its code
    # says NOT_MISSING, or a value is present while its code says missing.
    masks = [
        ~(df[column].isna() ^ df["missing_" + column].eq(Nans.NOT_MISSING))
        for column in columns
    ]
    for mask in masks:
        if df.loc[mask].size > 0:
            if logger is not None:
                logger.info(
                    "Filtering contradictory missing code in " +
                    "{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d"))
                )
            df = df.loc[~mask]
    return df
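A hypothetical example (not part of the diff) of the filtering: a row whose val is NaN while its missing_val code claims NOT_MISSING is contradictory and gets dropped. The sensor and metric names are made up:

    df = pd.DataFrame({
        "val": [1.0, np.nan],
        "se": [0.1, 0.1],
        "sample_size": [100.0, 100.0],
        "missing_val": [int(Nans.NOT_MISSING)] * 2,  # second row contradicts val = NaN
        "missing_se": [int(Nans.NOT_MISSING)] * 2,
        "missing_sample_size": [int(Nans.NOT_MISSING)] * 2,
    })
    filtered = filter_contradicting_missing_codes(
        df, "covid_ag", "pct_positive", datetime(2021, 1, 1)
    )
    # keeps only the first row; with logger=None the drop happens silently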

def create_export_csv(
df: pd.DataFrame,
export_dir: str,
@@ -16,7 +38,8 @@ def create_export_csv(
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
remove_null_samples: Optional[bool] = False,
write_empty_days: Optional[bool] = False
write_empty_days: Optional[bool] = False,
logger: Optional[logging.Logger] = None
):
"""Export data in the format expected by the Delphi API.

@@ -43,6 +66,8 @@ write_empty_days: Optional[bool]
write_empty_days: Optional[bool]
If true, every day in between start_date and end_date will have a CSV file written
even if there is no data for the day. If false, only the days present are written.
logger: Optional[logging.Logger]
Pass a logger object here to log information about contradictory missing codes.

Returns
---------
@@ -70,7 +95,20 @@
else:
export_filename = f"{date.strftime('%Y%m%d')}_{geo_res}_{metric}_{sensor}.csv"
export_file = join(export_dir, export_filename)
export_df = df[df["timestamp"] == date][["geo_id", "val", "se", "sample_size",]]
expected_columns = [
"geo_id",
"val",
"se",
"sample_size",
"missing_val",
"missing_se",
"missing_sample_size"
]
export_df = df[df["timestamp"] == date].filter(items=expected_columns)
if "missing_val" in export_df.columns:
export_df = filter_contradicting_missing_codes(
export_df, sensor, metric, date, logger=logger
)
if remove_null_samples:
export_df = export_df[export_df["sample_size"].notnull()]
export_df = export_df.round({"val": 7, "se": 7})
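Finally, a hypothetical end-to-end call (not part of the diff); the geo_res, sensor, and export_dir values are assumptions for illustration:

    import logging

    logger = logging.getLogger("quidel_covidtest")
    df = pd.DataFrame({
        "geo_id": ["ca", "tx"],
        "timestamp": [pd.Timestamp("2021-01-01")] * 2,
        "val": [0.5, np.nan],
        "se": [0.01, np.nan],
        "sample_size": [100.0, np.nan],
        "missing_val": [int(Nans.NOT_MISSING), int(Nans.CENSORED)],
        "missing_se": [int(Nans.NOT_MISSING), int(Nans.CENSORED)],
        "missing_sample_size": [int(Nans.NOT_MISSING), int(Nans.CENSORED)],
    })
    create_export_csv(df, export_dir="./receiving", geo_res="state",
                      sensor="covid_ag_smoothed_pct_positive", logger=logger)
    # writes 20210101_state_covid_ag_smoothed_pct_positive.csv with both rows kept:
    # the NaNs in the second row agree with their CENSORED codes, so nothing is filtered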