Skip to content

Commit

Permalink
Merge branch 'main' into nans_covidactnow
Browse files Browse the repository at this point in the history
  • Loading branch information
dshemetov committed Oct 19, 2021
2 parents 20eac44 + ef13336 commit c9fcb55
Show file tree
Hide file tree
Showing 131 changed files with 15,202 additions and 14,896 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.1.16
current_version = 0.1.25
commit = True
message = chore: bump covidcast-indicators to {new_version}
tag = False
10 changes: 4 additions & 6 deletions .github/workflows/r-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,12 @@ jobs:
${{ runner.os }}-r-facebook-survey-
- name: Install R dependencies
run: |
if ( packageVersion("readr") != "1.4.0" ) {
install.packages("devtools")
devtools::install_version("readr", version = "1.4.0")
if ( !require("remotes") ) {
install.packages("remotes")
}
install.packages("remotes")
remotes::update_packages(c("rcmdcheck", "mockr"), upgrade="always")
remotes::update_packages(c("rcmdcheck", "mockr", "remotes"), upgrade="always")
dependency_list <- remotes::dev_package_deps(dependencies=TRUE)
remotes::update_packages(dependency_list$package[dependency_list$package != "readr"], upgrade="always")
remotes::update_packages(dependency_list$package, upgrade="always")
shell: Rscript {0}
- name: Check
run: |
Expand Down
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
params.json

# Do not commit output files
receiving/*.csv
**/receiving/*.csv

# Do not commit hidden macOS files
.DS_Store
Expand Down
2 changes: 1 addition & 1 deletion _delphi_utils_python/.bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.1.12
current_version = 0.1.17
commit = True
message = chore: bump delphi_utils to {new_version}
tag = False
Expand Down
2 changes: 1 addition & 1 deletion _delphi_utils_python/delphi_utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@
from .signal import add_prefix
from .nancodes import Nans

__version__ = "0.1.12"
__version__ = "0.1.17"
36 changes: 8 additions & 28 deletions _delphi_utils_python/delphi_utils/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ def diff_export_csv(
# Code deleted entries as nans with the deleted missing code
deleted_df = before_df.loc[deleted_idx, :].copy()
deleted_df[["val", "se", "sample_size"]] = np.nan
deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED
if "missing_val" in after_df_cmn.columns:
deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED

return (
deleted_df,
Expand Down Expand Up @@ -254,26 +255,7 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]:
new_issues_df.to_csv(diff_file, na_rep="NA")
common_diffs[after_file] = diff_file

export_csv_dtypes = {
"geo_id": str, "val": float, "se": float, "sample_size": float,
"missing_val": int, "missing_se": int, "missing_sample_size": int
}

# Replace deleted files with empty versions, but only if the cached version is not
# already empty
deleted_files_nanfilled = []
for deleted_file in deleted_files:
deleted_df = pd.read_csv(deleted_file, dtype=export_csv_dtypes)
print(
f"Diff deleted {deleted_file}; generating corresponding CSV with deleted rows."
)
deleted_df[["val", "se", "sample_size"]] = np.nan
deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED
filename = join(self.export_dir, basename(deleted_file))
deleted_df.to_csv(filename, index=False)
deleted_files_nanfilled.append(filename)

return deleted_files_nanfilled, common_diffs, new_files
return deleted_files, common_diffs, new_files

def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]:
"""
Expand All @@ -299,10 +281,9 @@ def filter_exports(self, common_diffs: FileDiffMap):
Filter export directory to only contain relevant files.
Filters down the export_dir to only contain:
1) New files, 2) Changed files, filtered-down to the ADDED and CHANGED rows
only, and 3) Deleted files replaced with empty CSVs with the same name. Should
be called after archive_exports() so we archive the raw exports before potentially
modifying them.
1) New files, 2) Changed files, filtered-down to the ADDED and CHANGED rows only.
Should be called after archive_exports() so we archive the raw exports before
potentially modifying them.
Parameters
----------
Expand Down Expand Up @@ -331,13 +312,12 @@ def run(self):
self.update_cache()

# Diff exports, and make incremental versions
deleted_files, common_diffs, new_files = self.diff_exports()
_, common_diffs, new_files = self.diff_exports()

# Archive changed, new, and emptied deleted files
# Archive changed and new files only
to_archive = [f for f, diff in common_diffs.items()
if diff is not None]
to_archive += new_files
to_archive += deleted_files
_, fails = self.archive_exports(to_archive)

# Filter existing exports to exclude those that failed to archive
Expand Down
16 changes: 13 additions & 3 deletions _delphi_utils_python/delphi_utils/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import Optional
import logging

from epiweeks import Week
import numpy as np
import pandas as pd

Expand Down Expand Up @@ -39,7 +40,8 @@ def create_export_csv(
end_date: Optional[datetime] = None,
remove_null_samples: Optional[bool] = False,
write_empty_days: Optional[bool] = False,
logger: Optional[logging.Logger] = None
logger: Optional[logging.Logger] = None,
weekly_dates = False,
):
"""Export data in the format expected by the Delphi API.
Expand Down Expand Up @@ -68,6 +70,9 @@ def create_export_csv(
even if there is no data for the day. If false, only the days present are written.
logger: Optional[logging.Logger]
Pass a logger object here to log information about contradictory missing codes.
weekly_dates: Optional[bool]
Whether the output data are weekly or not. If True, will prefix files with
"weekly_YYYYWW" where WW is the epiweek instead of the usual YYYYMMDD for daily files.
Returns
---------
Expand All @@ -90,10 +95,15 @@ def create_export_csv(
dates = pd.date_range(start_date, end_date)

for date in dates:
if weekly_dates:
t = Week.fromdate(pd.to_datetime(str(date)))
date_str = "weekly_" + str(t.year) + str(t.week).zfill(2)
else:
date_str = date.strftime('%Y%m%d')
if metric is None:
export_filename = f"{date.strftime('%Y%m%d')}_{geo_res}_{sensor}.csv"
export_filename = f"{date_str}_{geo_res}_{sensor}.csv"
else:
export_filename = f"{date.strftime('%Y%m%d')}_{geo_res}_{metric}_{sensor}.csv"
export_filename = f"{date_str}_{geo_res}_{metric}_{sensor}.csv"
export_file = join(export_dir, export_filename)
expected_columns = [
"geo_id",
Expand Down
8 changes: 4 additions & 4 deletions _delphi_utils_python/delphi_utils/geomap.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def megacounty_creation(
thr_win_len,
thr_col="visits",
fips_col="fips",
date_col="date",
date_col="timestamp",
mega_col="megafips",
):
"""Create megacounty column.
Expand Down Expand Up @@ -340,7 +340,7 @@ def replace_geocode(
new_code,
from_col=None,
new_col=None,
date_col="date",
date_col="timestamp",
data_cols=None,
dropna=True,
):
Expand All @@ -366,7 +366,7 @@ def replace_geocode(
new_code: {'fips', 'zip', 'state_code', 'state_id', 'state_name', 'hrr', 'msa',
'hhs'}
Specifies the geocode type of the data in new_col.
date_col: str or None, default "date"
date_col: str or None, default "timestamp"
Specify which column contains the date values. Used for value aggregation.
If None, then the aggregation is done only on geo_id.
data_cols: list, default None
Expand Down Expand Up @@ -457,7 +457,7 @@ def fips_to_megacounty(
thr_win_len,
thr_col="visits",
fips_col="fips",
date_col="date",
date_col="timestamp",
mega_col="megafips",
count_cols=None,
):
Expand Down
9 changes: 6 additions & 3 deletions _delphi_utils_python/delphi_utils/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import Any, Callable, Dict, Optional
from .archive import ArchiveDiffer, archiver_from_params
from .logger import get_structured_logger
from .utils import read_params
from .utils import read_params, transfer_files
from .validator.validate import Validator
from .validator.run import validator_from_params

Expand Down Expand Up @@ -44,8 +44,11 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None],
validation_report.log(get_structured_logger(
name = indicator_fn.__module__,
filename=params["common"].get("log_filename", None)))
if archiver and (not validator or validation_report.success()):
archiver.run()
if (not validator or validation_report.success()):
if archiver:
archiver.run()
if "delivery" in params:
transfer_files()


if __name__ == "__main__":
Expand Down
16 changes: 13 additions & 3 deletions _delphi_utils_python/delphi_utils/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""Read parameter files containing configuration information."""
# -*- coding: utf-8 -*-
from json import load,dump
from os.path import exists
from shutil import copyfile
from shutil import copyfile, move
import os
import sys

def read_params():
Expand All @@ -11,7 +11,7 @@ def read_params():
If the file does not exist, it copies the file 'params.json.template' to
'params.json' and then reads the file.
"""
if not exists("params.json"):
if not os.path.exists("params.json"):
copyfile("params.json.template", "params.json")

with open("params.json", "r") as json_file:
Expand Down Expand Up @@ -87,3 +87,13 @@ def params_run():
with open("params.json", "w") as f:
dump(params, f, sort_keys=True, indent=2)
print(f"Updated {n} items")

def transfer_files():
    """Transfer exported CSV files to the delivery directory for acquisition.

    Reads ``export_dir`` from the ``common`` section and ``delivery_dir`` from
    the ``delivery`` section of params.json (the caller is expected to have
    checked that a ``delivery`` section exists), then moves every CSV file
    from the export directory into the delivery directory.
    """
    params = read_params()
    export_dir = params["common"].get("export_dir", None)
    delivery_dir = params["delivery"].get("delivery_dir", None)
    for file_name in os.listdir(export_dir):
        # Case-insensitive match: covers ".csv", ".CSV", and mixed case
        # like ".Csv" (the original two-way endswith check missed those).
        if file_name.lower().endswith(".csv"):
            move(os.path.join(export_dir, file_name), delivery_dir)
5 changes: 3 additions & 2 deletions _delphi_utils_python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
required = [
"boto3",
"covidcast",
"epiweeks",
"freezegun",
"gitpython",
"mock",
Expand All @@ -24,7 +25,7 @@

setup(
name="delphi_utils",
version="0.1.12",
version="0.1.17",
description="Shared Utility Functions for Indicators",
long_description=long_description,
long_description_content_type="text/markdown",
Expand All @@ -35,7 +36,7 @@
classifiers=[
"Development Status :: 5 - Production/Stable",
"Intended Audience :: Developers",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
],
packages=find_packages(),
package_data={'': ['data/*.csv']}
Expand Down
Loading

0 comments on commit c9fcb55

Please sign in to comment.