Update utilities for NAN codes:
* update export utility to export, validate, and test the missing cols
* add deletion coding to the archiver, make it expect missing cols, and
  let it handle comparisons between missing and non-missing CSVs
dshemetov committed Apr 27, 2021
1 parent 33537f0 commit 8557308
Showing 4 changed files with 235 additions and 39 deletions.
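
For reference: the missingness codes used throughout this diff come from delphi_utils/nancodes.py (added in an earlier commit). Only NOT_MISSING and DELETED appear below; this sketch of the enum is an assumption included for readability, not part of this commit.

    from enum import IntEnum

    class Nans(IntEnum):
        """Codes describing why a val/se/sample_size entry is missing."""
        NOT_MISSING = 0
        NOT_APPLICABLE = 1
        REGION_EXCEPTION = 2
        CENSORED = 3
        DELETED = 4
        OTHER = 5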
35 changes: 27 additions & 8 deletions _delphi_utils_python/delphi_utils/archive.py
@@ -40,9 +40,11 @@
from git import Repo
from git.refs.head import Head
import pandas as pd
+import numpy as np

from .utils import read_params
from .logger import get_structured_logger
+from .nancodes import Nans

Files = List[str]
FileDiffMap = Dict[str, Optional[str]]
@@ -73,8 +75,10 @@ def diff_export_csv(
changed_df is the pd.DataFrame of common rows from after_csv with changed values.
added_df is the pd.DataFrame of added rows from after_csv.
"""
export_csv_dtypes = {"geo_id": str, "val": float,
"se": float, "sample_size": float}
export_csv_dtypes = {
"geo_id": str, "val": float, "se": float, "sample_size": float,
"missing_val": int, "missing_se": int, "missing_sample_size": int
}

before_df = pd.read_csv(before_csv, dtype=export_csv_dtypes)
before_df.set_index("geo_id", inplace=True)
@@ -89,12 +93,27 @@
before_df_cmn = before_df.reindex(common_idx)
after_df_cmn = after_df.reindex(common_idx)

-    # Exact comparisons, treating NA == NA as True
-    same_mask = before_df_cmn == after_df_cmn
-    same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn)
+    # If only one of the CSVs has the missingness columns, no rows can
+    # compare as equal, so mark all values as new
+    if ("missing_val" in before_df_cmn.columns) ^ ("missing_val" in after_df_cmn.columns):
+        same_mask = after_df_cmn.copy()
+        same_mask.loc[:] = False
+    else:
+        # Exact comparisons, treating NA == NA as True
+        same_mask = before_df_cmn == after_df_cmn
+        same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn)

+    # Code deleted entries as nans with the deleted missing code
+    deleted_df = before_df.loc[deleted_idx, :].copy()
+    deleted_df[["val", "se", "sample_size"]] = np.nan
+    deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED

return (
-        before_df.loc[deleted_idx, :],
+        deleted_df,
after_df_cmn.loc[~(same_mask.all(axis=1)), :],
after_df.loc[added_idx, :])
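
To see the new deletion coding end to end, here is a minimal sketch (not part of the commit). It assumes diff_export_csv as shown above, the Nans codes, and that pd.read_csv accepts in-memory buffers in place of file paths:

    from io import StringIO

    import pandas as pd

    from delphi_utils.archive import diff_export_csv
    from delphi_utils.nancodes import Nans

    def _as_csv(df):
        """Serialize a frame the way the exporter writes it."""
        return StringIO(df.to_csv(index=False))

    before = _as_csv(pd.DataFrame({
        "geo_id": ["1", "2"], "val": [1.0, 2.0], "se": [0.1, 0.2],
        "sample_size": [10.0, 20.0],
        "missing_val": [int(Nans.NOT_MISSING)] * 2,
        "missing_se": [int(Nans.NOT_MISSING)] * 2,
        "missing_sample_size": [int(Nans.NOT_MISSING)] * 2,
    }))
    after = _as_csv(pd.DataFrame({
        "geo_id": ["1"], "val": [1.5], "se": [0.1], "sample_size": [10.0],
        "missing_val": [int(Nans.NOT_MISSING)],
        "missing_se": [int(Nans.NOT_MISSING)],
        "missing_sample_size": [int(Nans.NOT_MISSING)],
    }))

    # geo_id "2" disappeared, so it comes back as a row of NaNs coded DELETED;
    # geo_id "1" changed value, so it lands in changed_df.
    deleted_df, changed_df, added_df = diff_export_csv(before, after)
    assert (deleted_df["missing_val"] == Nans.DELETED).all()
    assert list(changed_df.index) == ["1"] and added_df.empty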

@@ -227,11 +246,11 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]:

deleted_df, changed_df, added_df = diff_export_csv(
before_file, after_file)
-            new_issues_df = pd.concat([changed_df, added_df], axis=0)
+            new_issues_df = pd.concat([deleted_df, changed_df, added_df], axis=0)

if len(deleted_df) > 0:
print(
f"Warning, diff has deleted indices in {after_file} that will be ignored")
f"Diff has deleted indices in {after_file} that have been coded as nans.")

# Write the diffs to diff_file, if applicable
if len(new_issues_df) > 0:
42 changes: 40 additions & 2 deletions _delphi_utils_python/delphi_utils/export.py
@@ -3,10 +3,32 @@
from datetime import datetime
from os.path import join
from typing import Optional
+import logging

import numpy as np
import pandas as pd

+from .nancodes import Nans

+def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
+    """Find values with contradictory missingness codes, filter them, and log."""
+    columns = ["val", "se", "sample_size"]
+    # Get indices where the XNOR is true (i.e. both are true or both are false).
+    masks = [
+        ~(df[column].isna() ^ df["missing_" + column].eq(Nans.NOT_MISSING))
+        for column in columns
+    ]
+    for mask in masks:
+        if logger is not None and df.loc[mask].size > 0:
+            logger.info(
+                "Filtering contradictory missing code in " +
+                "{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d"))
+            )
+            df = df.loc[~mask]
+        elif logger is None and df.loc[mask].size > 0:
+            df = df.loc[~mask]
+    return df
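
For illustration, a small sketch (not in the commit) of what counts as "contradictory": a NaN value coded NOT_MISSING, or a concrete value coded as missing (Nans.OTHER here is an assumed member of the enum):

    from datetime import datetime

    import numpy as np
    import pandas as pd

    from delphi_utils.export import filter_contradicting_missing_codes
    from delphi_utils.nancodes import Nans

    df = pd.DataFrame({
        "val": [1.0, np.nan, 2.0],
        "se": [0.1, np.nan, 0.2],
        "sample_size": [10.0, np.nan, 20.0],
        # Row 1 claims NOT_MISSING despite NaN values; row 2 claims OTHER
        # despite a concrete value; both contradict their codes.
        "missing_val": [Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.OTHER],
        "missing_se": [Nans.NOT_MISSING] * 3,
        "missing_sample_size": [Nans.NOT_MISSING] * 3,
    })

    out = filter_contradicting_missing_codes(df, "sensor", "metric", datetime(2021, 4, 27))
    assert list(out.index) == [0]  # only the consistent row survives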

def create_export_csv(
df: pd.DataFrame,
export_dir: str,
@@ -15,7 +37,8 @@ def create_export_csv(
metric: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
-    remove_null_samples: Optional[bool] = False
+    remove_null_samples: Optional[bool] = False,
+    logger: Optional[logging.Logger] = None
):
"""Export data in the format expected by the Delphi API.
@@ -39,6 +62,8 @@
Latest date to export or None if no maximum date restrictions should be applied.
remove_null_samples: Optional[bool]
Whether to remove entries whose sample sizes are null.
+    logger: Optional[logging.Logger]
+        Pass a logger object here to log information about contradictory missing codes.
Returns
---------
@@ -64,7 +89,20 @@
else:
export_filename = f"{date.strftime('%Y%m%d')}_{geo_res}_{metric}_{sensor}.csv"
export_file = join(export_dir, export_filename)
export_df = df[df["timestamp"] == date][["geo_id", "val", "se", "sample_size",]]
expected_columns = [
"geo_id",
"val",
"se",
"sample_size",
"missing_val",
"missing_se",
"missing_sample_size"
]
export_df = df[df["timestamp"] == date].filter(items=expected_columns)
if "missing_val" in export_df.columns:
export_df = filter_contradicting_missing_codes(
export_df, sensor, metric, date, logger=logger
)
if remove_null_samples:
export_df = export_df[export_df["sample_size"].notnull()]
export_df = export_df.round({"val": 7, "se": 7})
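
A hypothetical usage sketch of the updated exporter (geo_res and sensor are inferred from the filename template above; they sit in the part of the signature this hunk does not show):

    import logging

    import pandas as pd

    from delphi_utils.export import create_export_csv
    from delphi_utils.nancodes import Nans

    logging.basicConfig(level=logging.INFO)

    df = pd.DataFrame({
        "timestamp": pd.to_datetime(["2021-04-27"] * 2),
        "geo_id": ["ca", "tx"],
        "val": [1.0, 2.0],
        "se": [0.1, 0.2],
        "sample_size": [10.0, 20.0],
        "missing_val": [Nans.NOT_MISSING] * 2,
        "missing_se": [Nans.NOT_MISSING] * 2,
        "missing_sample_size": [Nans.NOT_MISSING] * 2,
    })

    # Should write ./20210427_state_test_sensor.csv with all seven expected
    # columns, logging and dropping any rows whose codes contradict their values.
    create_export_csv(df, export_dir=".", geo_res="state", sensor="test_sensor",
                      logger=logging.getLogger(__name__))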
121 changes: 93 additions & 28 deletions _delphi_utils_python/tests/test_archive.py
@@ -13,30 +13,53 @@
import pytest

from delphi_utils.archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer,\
-    archiver_from_params
+    archiver_from_params, Nans

CSV_DTYPES = {"geo_id": str, "val": float, "se": float, "sample_size": float}
CSV_DTYPES = {
"geo_id": str, "val": float, "se": float, "sample_size": float,
"missing_val": int, "missing_se":int, "missing_sample_size": int
}

CSVS_BEFORE = {
# Common
"csv0": pd.DataFrame({
"geo_id": ["1", "2", "3"],
"val": [1.000000001, 2.00000002, 3.00000003],
"se": [0.1, 0.2, 0.3],
"sample_size": [10.0, 20.0, 30.0]}),
"sample_size": [10.0, 20.0, 30.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
}),

"csv1": pd.DataFrame({
"geo_id": ["1", "2", "3"],
"val": [1.0, 2.0, 3.0],
"se": [np.nan, 0.20000002, 0.30000003],
"sample_size": [10.0, 20.0, 30.0]}),
"sample_size": [10.0, 20.0, 30.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
}),

# Deleted
"csv2": pd.DataFrame({
"geo_id": ["1"],
"val": [1.0],
"se": [0.1],
"sample_size": [10.0]}),
"sample_size": [10.0],
"missing_val": [Nans.NOT_MISSING],
"missing_se": [Nans.NOT_MISSING],
"missing_sample_size": [Nans.NOT_MISSING],
}),

+    # Common, but updated with missing columns
+    "csv4": pd.DataFrame({
+        "geo_id": ["1"],
+        "val": [1.0],
+        "se": [0.1],
+        "sample_size": [10.0]
+    }),
}

CSVS_AFTER = {
@@ -45,23 +68,45 @@
"geo_id": ["1", "2", "3"],
"val": [1.0, 2.0, 3.0],
"se": [0.10000001, 0.20000002, 0.30000003],
"sample_size": [10.0, 20.0, 30.0]}),
"sample_size": [10.0, 20.0, 30.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
}),

"csv1": pd.DataFrame({
"geo_id": ["1", "2", "4"],
"val": [1.0, 2.1, 4.0],
"se": [np.nan, 0.21, np.nan],
"sample_size": [10.0, 21.0, 40.0]}),
"sample_size": [10.0, 21.0, 40.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
}),

# Added
"csv3": pd.DataFrame({
"geo_id": ["2"],
"val": [2.0000002],
"se": [0.2],
"sample_size": [20.0]}),
"sample_size": [20.0],
"missing_val": [Nans.NOT_MISSING],
"missing_se": [Nans.NOT_MISSING],
"missing_sample_size": [Nans.NOT_MISSING],
}),

+    # Common, but updated with missing columns
+    "csv4": pd.DataFrame({
+        "geo_id": ["1"],
+        "val": [1.0],
+        "se": [0.1],
+        "sample_size": [10.0],
+        "missing_val": [Nans.NOT_MISSING],
+        "missing_se": [Nans.NOT_MISSING],
+        "missing_sample_size": [Nans.NOT_MISSING],
+    }),
}
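
The new csv4 fixture exercises the column-mismatch branch of diff_export_csv: its cached copy has no missing_* columns while the new export does, so no rows can compare as "same". A quick sketch (not in the test file) of that expectation, reusing the fixtures above:

    from io import StringIO

    from delphi_utils.archive import diff_export_csv

    before = StringIO(CSVS_BEFORE["csv4"].to_csv(index=False))  # no missing_* columns
    after = StringIO(CSVS_AFTER["csv4"].to_csv(index=False))    # has missing_* columns

    deleted_df, changed_df, added_df = diff_export_csv(before, after)
    # The XOR branch marks every common row as changed, so the whole updated
    # csv4 is emitted as a diff even though the shared values are identical.
    assert len(changed_df) == 1 and deleted_df.empty and added_df.empty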


class TestArchiveDiffer:

def test_stubs(self):
@@ -80,10 +125,14 @@ def test_diff_and_filter_exports(self, tmp_path):
mkdir(export_dir)

csv1_diff = pd.DataFrame({
"geo_id": ["2", "4"],
"val": [2.1, 4.0],
"se": [0.21, np.nan],
"sample_size": [21.0, 40.0]})
"geo_id": ["3", "2", "4"],
"val": [np.nan, 2.1, 4.0],
"se": [np.nan, 0.21, np.nan],
"sample_size": [np.nan, 21.0, 40.0],
"missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
})

arch_diff = ArchiveDiffer(cache_dir, export_dir)

@@ -106,15 +155,15 @@ def test_diff_and_filter_exports(self, tmp_path):
# Check return values
assert set(deleted_files) == {join(cache_dir, "csv2.csv")}
assert set(common_diffs.keys()) == {
join(export_dir, f) for f in ["csv0.csv", "csv1.csv"]}
join(export_dir, f) for f in ["csv0.csv", "csv1.csv", "csv4.csv"]}
assert set(new_files) == {join(export_dir, "csv3.csv")}
assert common_diffs[join(export_dir, "csv0.csv")] is None
assert common_diffs[join(export_dir, "csv1.csv")] == join(
export_dir, "csv1.csv.diff")

# Check filesystem for actual files
assert set(listdir(export_dir)) == {
"csv0.csv", "csv1.csv", "csv1.csv.diff", "csv3.csv"}
"csv0.csv", "csv1.csv", "csv1.csv.diff", "csv3.csv", "csv4.csv", "csv4.csv.diff"}
assert_frame_equal(
pd.read_csv(join(export_dir, "csv1.csv.diff"), dtype=CSV_DTYPES),
csv1_diff)
@@ -132,7 +181,7 @@ def test_diff_and_filter_exports(self, tmp_path):
arch_diff.filter_exports(common_diffs)

# Check exports directory just has incremental changes
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"}
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv"}
assert_frame_equal(
pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
csv1_diff)
@@ -259,12 +308,16 @@ def test_run(self, tmp_path, s3_client):
assert_frame_equal(pd.read_csv(body, dtype=CSV_DTYPES), df)

# Check exports directory just has incremental changes
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"}
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv"}
csv1_diff = pd.DataFrame({
"geo_id": ["2", "4"],
"val": [2.1, 4.0],
"se": [0.21, np.nan],
"sample_size": [21.0, 40.0]})
"geo_id": ["3", "2", "4"],
"val": [np.nan, 2.1, 4.0],
"se": [np.nan, 0.21, np.nan],
"sample_size": [np.nan, 21.0, 40.0],
"missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
})
assert_frame_equal(
pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
csv1_diff)
@@ -346,7 +399,11 @@ def test_diff_exports(self, tmp_path):
"geo_id": ["1", "2", "3"],
"val": [1.0, 2.0, 3.0],
"se": [0.1, 0.2, 0.3],
"sample_size": [10.0, 20.0, 30.0]})
"sample_size": [10.0, 20.0, 30.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
})

# Write exact same CSV into cache and export, so no diffs expected
csv1.to_csv(join(cache_dir, "csv1.csv"), index=False)
@@ -383,7 +440,11 @@ def test_archive_exports(self, tmp_path):
"geo_id": ["1", "2", "3"],
"val": [1.0, 2.0, 3.0],
"se": [0.1, 0.2, 0.3],
"sample_size": [10.0, 20.0, 30.0]})
"sample_size": [10.0, 20.0, 30.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
})

# csv1.csv is now a dirty edit in the repo, and to be exported too
csv1.to_csv(join(cache_dir, "csv1.csv"), index=False)
@@ -460,12 +521,16 @@ def test_run(self, tmp_path):
original_branch.checkout()

# Check exports directory just has incremental changes
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"}
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv"}
csv1_diff = pd.DataFrame({
"geo_id": ["2", "4"],
"val": [2.1, 4.0],
"se": [0.21, np.nan],
"sample_size": [21.0, 40.0]})
"geo_id": ["3", "2", "4"],
"val": [np.nan, 2.1, 4.0],
"se": [np.nan, 0.21, np.nan],
"sample_size": [np.nan, 21.0, 40.0],
"missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
})
assert_frame_equal(
pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
csv1_diff)