diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py
index 82493032e..82a460abe 100644
--- a/_delphi_utils_python/delphi_utils/export.py
+++ b/_delphi_utils_python/delphi_utils/export.py
@@ -1,6 +1,5 @@
 """Export data in the format expected by the Delphi API."""
 # -*- coding: utf-8 -*-
-import gzip
 import logging
 from datetime import datetime
 from os.path import getsize, join
@@ -189,15 +188,22 @@ def create_backup_csv(
         issue = datetime.today().strftime("%Y%m%d")
 
     backup_filename = [issue, geo_res, metric, sensor]
-    backup_filename = "_".join(filter(None, backup_filename)) + ".csv.gz"
+    backup_filename = "_".join(filter(None, backup_filename))
     backup_file = join(backup_dir, backup_filename)
-
-    with gzip.open(backup_file, "wt", newline="") as f:
-        df.to_csv(f, index=False, na_rep="NA")
-
-    if logger:
-        logger.info(
-            "Backup file created",
-            backup_file=backup_file,
-            backup_size=getsize(backup_file),
+    try:
+        # The de facto data format is CSV, but parquet preserves data types (keeping both as an intermediary measure)
+        df.to_csv(
+            f"{backup_file}.csv.gz", index=False, na_rep="NA", compression="gzip"
         )
+        df.to_parquet(f"{backup_file}.parquet", index=False)
+
+        if logger:
+            logger.info(
+                "Backup file created",
+                backup_file=backup_file,
+                backup_size=getsize(f"{backup_file}.csv.gz"),
+            )
+    # pylint: disable=W0703
+    except Exception as e:
+        if logger:
+            logger.info("Backup file creation failed", msg=e)
diff --git a/_delphi_utils_python/pyproject.toml b/_delphi_utils_python/pyproject.toml
index c47590a29..9ec3f5fc5 100644
--- a/_delphi_utils_python/pyproject.toml
+++ b/_delphi_utils_python/pyproject.toml
@@ -23,6 +23,7 @@ dependencies = [
     "gitpython",
     "importlib_resources>=1.3",
     "numpy",
+    "pyarrow",
     "pandas>=1.1.0",
     "requests",
     "slackclient",
diff --git a/_delphi_utils_python/tests/test_export.py b/_delphi_utils_python/tests/test_export.py
index c9c1f8483..1d11b7068 100644
--- a/_delphi_utils_python/tests/test_export.py
+++ b/_delphi_utils_python/tests/test_export.py
@@ -1,15 +1,16 @@
 """Tests for exporting CSV files."""
 from datetime import datetime
+import logging
 from os import listdir
 from os.path import join
-from typing import Any, Dict, List
+from typing import Any, Dict
 
 import mock
 import numpy as np
 import pandas as pd
 from pandas.testing import assert_frame_equal
 
-from delphi_utils import create_export_csv, Nans
+from delphi_utils import create_export_csv, Nans, create_backup_csv, get_structured_logger
 
 
 def _set_df_dtypes(df: pd.DataFrame, dtypes: Dict[str, Any]) -> pd.DataFrame:
@@ -386,3 +387,22 @@ def test_export_sort(self, tmp_path):
         })
         sorted_csv = _set_df_dtypes(pd.read_csv(join(tmp_path, "20200215_county_test.csv")), dtypes={"geo_id": str})
         assert_frame_equal(sorted_csv,expected_df)
+
+    def test_create_backup_regular(self, caplog, tmp_path):
+        caplog.set_level(logging.INFO)
+        logger = get_structured_logger()
+        today = datetime.strftime(datetime.today(), "%Y%m%d")
+        dtypes = self.DF.dtypes.to_dict()
+        del dtypes["timestamp"]
+        geo_res = "county"
+        metric = "test"
+        sensor = "deaths"
+        create_backup_csv(df=self.DF, backup_dir=tmp_path, custom_run=False, issue=None, geo_res=geo_res, metric=metric, sensor=sensor, logger=logger)
+        assert "Backup file created" in caplog.text
+
+        actual = pd.read_csv(join(tmp_path, f"{today}_{geo_res}_{metric}_{sensor}.csv.gz"), dtype=dtypes, parse_dates=["timestamp"])
+        assert self.DF.equals(actual)
+
+        actual_parquet = pd.read_parquet(join(tmp_path, f"{today}_{geo_res}_{metric}_{sensor}.parquet"))
+        assert actual_parquet.equals(actual)
+
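Why write the backup twice? A CSV round trip through pandas loses dtype information, while parquet stores the schema alongside the data. A minimal sketch of the difference (illustrative only, not part of the patch; the column values are made up):

```python
# CSV round trips lose dtypes; parquet (read/written via the new pyarrow
# dependency) preserves them. Illustrative example, not part of the patch.
import pandas as pd

df = pd.DataFrame({"geo_id": ["01001"], "val": [1.5]})  # geo_id: string FIPS code

df.to_csv("backup.csv.gz", index=False, na_rep="NA", compression="gzip")
df.to_parquet("backup.parquet", index=False)

print(pd.read_csv("backup.csv.gz")["geo_id"].dtype)       # int64 -- leading zero lost
print(pd.read_parquet("backup.parquet")["geo_id"].dtype)  # object -- string preserved
```

This is also why `test_create_backup_regular` above passes `dtype=dtypes` when reading the CSV back, but needs no hints for the parquet file.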
diff --git a/ansible/templates/nssp-params-prod.json.j2 b/ansible/templates/nssp-params-prod.json.j2
index b131b6130..1ff73d843 100644
--- a/ansible/templates/nssp-params-prod.json.j2
+++ b/ansible/templates/nssp-params-prod.json.j2
@@ -1,6 +1,7 @@
 {
   "common": {
     "export_dir": "/common/covidcast/receiving/nssp",
+    "backup_dir": "./raw_data_backups",
     "log_filename": "/var/log/indicators/nssp.log",
     "log_exceptions": false
   },
diff --git a/nchs_mortality/tests/raw_data_backups/.gitignore b/nchs_mortality/tests/raw_data_backups/.gitignore
index 2b7efbb36..3d44aa305 100644
--- a/nchs_mortality/tests/raw_data_backups/.gitignore
+++ b/nchs_mortality/tests/raw_data_backups/.gitignore
@@ -1,2 +1,3 @@
 *.csv
-*.gz
\ No newline at end of file
+*.gz
+*.parquet
\ No newline at end of file
diff --git a/nchs_mortality/tests/test_pull.py b/nchs_mortality/tests/test_pull.py
index 4f18210f6..39d44d9b3 100644
--- a/nchs_mortality/tests/test_pull.py
+++ b/nchs_mortality/tests/test_pull.py
@@ -1,7 +1,10 @@
+import glob
 import os
 
 import pytest
 import pandas as pd
+
+from delphi_utils import get_structured_logger
 from delphi_utils.geomap import GeoMapper
 
 from delphi_nchs_mortality.pull import pull_nchs_mortality_data, standardize_columns
@@ -98,13 +101,32 @@ def test_bad_file_with_missing_cols(self):
         with pytest.raises(ValueError):
             pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir = "", custom_run = True, test_file = "bad_data_with_missing_cols.csv")
 
-    def test_backup_today_data(self):
+    def test_backup_today_data(self, caplog):
         today = pd.Timestamp.today().strftime("%Y%m%d")
         backup_dir = "./raw_data_backups"
-        pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir = backup_dir, custom_run = False, test_file = "test_data.csv")
-        backup_file = f"{backup_dir}/{today}.csv.gz"
-        backup_df = pd.read_csv(backup_file)
+        logger = get_structured_logger()
+        pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir = backup_dir, custom_run = False, test_file = "test_data.csv", logger=logger)
+
+        # Check that the logger was used:
+        assert "Backup file created" in caplog.text
+
+        # Check that both backup files were created
+        backup_files = glob.glob(f"{backup_dir}/{today}*")
+        assert len(backup_files) == 2, "Backup files were not created"
+
         source_df = pd.read_csv("test_data/test_data.csv")
+        for backup_file in backup_files:
+            if backup_file.endswith(".csv.gz"):
+                backup_df = pd.read_csv(backup_file)
+            else:
+                backup_df = pd.read_parquet(backup_file)
+            pd.testing.assert_frame_equal(source_df, backup_df)
+
+        backup_file_parquet = f"{backup_dir}/{today}.parquet"
+        backup_df = pd.read_parquet(backup_file_parquet)
         pd.testing.assert_frame_equal(source_df, backup_df)
-        if os.path.exists(backup_file):
-            os.remove(backup_file)
+
+        # Clean up both backup files
+        for backup_file in backup_files:
+            if os.path.exists(backup_file):
+                os.remove(backup_file)
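Since backups now come in two formats, readers have to dispatch on the file suffix, as the test above does. A hypothetical helper (`load_backup` is not part of the patch) showing the same pattern, preferring parquet because it needs no dtype hints:

```python
# Hypothetical helper, not part of the patch: reload a raw backup written by
# create_backup_csv, dispatching on suffix the way the tests do.
import os
import pandas as pd

def load_backup(backup_dir: str, issue: str) -> pd.DataFrame:
    parquet_path = os.path.join(backup_dir, f"{issue}.parquet")
    if os.path.exists(parquet_path):
        # Parquet carries its schema, so no dtype hints are needed.
        return pd.read_parquet(parquet_path)
    return pd.read_csv(os.path.join(backup_dir, f"{issue}.csv.gz"))
```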
diff --git a/nssp/delphi_nssp/pull.py b/nssp/delphi_nssp/pull.py
index ece94fab4..de6934bc8 100644
--- a/nssp/delphi_nssp/pull.py
+++ b/nssp/delphi_nssp/pull.py
@@ -1,9 +1,11 @@
 # -*- coding: utf-8 -*-
 """Functions for pulling NSSP ER data."""
-
+import logging
 import textwrap
+from typing import Optional
 
 import pandas as pd
+from delphi_utils import create_backup_csv
 from sodapy import Socrata
 
 from .constants import NEWLINE, SIGNALS, SIGNALS_MAP, TYPE_DICT
@@ -27,7 +29,7 @@ def warn_string(df, type_dict):
     return warn
 
 
-def pull_nssp_data(socrata_token: str):
+def pull_nssp_data(socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None):
     """Pull the latest NSSP ER visits data, and conforms it into a dataset.
 
     The output dataset has:
@@ -38,9 +40,13 @@ def pull_nssp_data(socrata_token: str):
     Parameters
     ----------
     socrata_token: str
-        My App Token for pulling the NWSS data (could be the same as the nchs data)
-    test_file: Optional[str]
-        When not null, name of file from which to read test data
+        My App Token for pulling the NSSP data (could be the same as the NCHS data)
+    backup_dir: str
+        Directory to which to save raw backup data
+    custom_run: bool
+        Flag indicating if the current run is a patch run. If so, don't save any data to disk.
+    logger: Optional[logging.Logger]
+        Logger object
 
     Returns
     -------
@@ -59,6 +65,7 @@ def pull_nssp_data(socrata_token: str):
         results.extend(page)
         offset += limit
     df_ervisits = pd.DataFrame.from_records(results)
+    create_backup_csv(df_ervisits, backup_dir, custom_run, logger=logger)
 
     df_ervisits = df_ervisits.rename(columns={"week_end": "timestamp"})
     df_ervisits = df_ervisits.rename(columns=SIGNALS_MAP)
diff --git a/nssp/delphi_nssp/run.py b/nssp/delphi_nssp/run.py
index b22d03c20..b512e8aba 100644
--- a/nssp/delphi_nssp/run.py
+++ b/nssp/delphi_nssp/run.py
@@ -78,12 +78,14 @@ def run_module(params):
         log_exceptions=params["common"].get("log_exceptions", True),
     )
     export_dir = params["common"]["export_dir"]
+    backup_dir = params["common"]["backup_dir"]
+    custom_run = params["common"].get("custom_run", False)
    socrata_token = params["indicator"]["socrata_token"]
 
     run_stats = []
     ## build the base version of the signal at the most detailed geo level you can get.
     ## compute stuff here or farm out to another function or file
-    df_pull = pull_nssp_data(socrata_token)
+    df_pull = pull_nssp_data(socrata_token, backup_dir, custom_run=custom_run, logger=logger)
     ## aggregate
     geo_mapper = GeoMapper()
     for signal in SIGNALS:
diff --git a/nssp/params.json.template b/nssp/params.json.template
index df989ede7..1805baebb 100644
--- a/nssp/params.json.template
+++ b/nssp/params.json.template
@@ -1,6 +1,7 @@
 {
   "common": {
     "export_dir": "./receiving",
+    "backup_dir": "./raw_data_backups",
     "log_filename": "./nssp.log",
     "log_exceptions": false
   },
diff --git a/nssp/raw_data_backups/.gitignore b/nssp/raw_data_backups/.gitignore
new file mode 100644
index 000000000..3c515981b
--- /dev/null
+++ b/nssp/raw_data_backups/.gitignore
@@ -0,0 +1,120 @@
+# You should hard commit a prototype for this file, but we
+# want to avoid accidental adding of API tokens and other
+# private data parameters
+params.json
+
+# Do not commit output files
+receiving/*.csv
+
+# Remove macOS files
+.DS_Store
+
+# virtual environment
+dview/
+
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+coverage.xml
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+.hypothesis/
+.pytest_cache/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+.static_storage/
+.media/
+local_settings.py
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# pyenv
+.python-version
+
+# celery beat schedule file
+celerybeat-schedule
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
\ No newline at end of file
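Every indicator adopting `create_backup_csv` needs the same two settings in its `common` params: `backup_dir` is a hard lookup in `run.py` (a missing key raises `KeyError`), while `custom_run` is optional and defaults to `False`; patch runs set it to `True` so `create_backup_csv` skips writing to disk. A sketch mirroring the `run.py` hunk above (the token value is a placeholder):

```python
# Sketch mirroring the run.py hunk above; "<token>" is a placeholder.
params = {
    "common": {
        "export_dir": "./receiving",
        "backup_dir": "./raw_data_backups",  # required: raw pulls are archived here
        # "custom_run" may be omitted; it defaults to False below
    },
    "indicator": {"socrata_token": "<token>"},
}

backup_dir = params["common"]["backup_dir"]             # KeyError if missing
custom_run = params["common"].get("custom_run", False)  # optional, default False
```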
diff --git a/nssp/tests/test_pull.py b/nssp/tests/test_pull.py
index b356341f6..d0ebbc550 100644
--- a/nssp/tests/test_pull.py
+++ b/nssp/tests/test_pull.py
@@ -1,29 +1,23 @@
-from datetime import datetime, date
+import glob
 import json
-import unittest
 from unittest.mock import patch, MagicMock
-import tempfile
 import os
-import time
-from datetime import datetime
-import pdb
+
 import pandas as pd
-import pandas.api.types as ptypes
 
 from delphi_nssp.pull import (
     pull_nssp_data,
 )
-from delphi_nssp.constants import (
-    SIGNALS,
-    NEWLINE,
-    SIGNALS_MAP,
-    TYPE_DICT,
-)
+from delphi_nssp.constants import SIGNALS
+from delphi_utils import get_structured_logger
 
 
-class TestPullNSSPData(unittest.TestCase):
+class TestPullNSSPData:
     @patch("delphi_nssp.pull.Socrata")
-    def test_pull_nssp_data(self, mock_socrata):
+    def test_pull_nssp_data(self, mock_socrata, caplog):
+        today = pd.Timestamp.today().strftime("%Y%m%d")
+        backup_dir = "test_raw_data_backups"
+
         # Load test data
         with open("test_data/page.txt", "r") as f:
             test_data = json.load(f)
@@ -33,10 +27,27 @@ def test_pull_nssp_data(self, mock_socrata):
         mock_client.get.side_effect = [test_data, []]  # Return test data on first call, empty list on second call
         mock_socrata.return_value = mock_client
+        custom_run = False
+        logger = get_structured_logger()
 
         # Call function with test token
         test_token = "test_token"
-        result = pull_nssp_data(test_token)
-        print(result)
+        result = pull_nssp_data(test_token, backup_dir, custom_run, logger)
+
+        # Check that the logger was used:
+        assert "Backup file created" in caplog.text
+
+        # Check that both backup files were created
+        backup_files = glob.glob(f"{backup_dir}/{today}*")
+        assert len(backup_files) == 2, "Backup files were not created"
+
+        expected_data = pd.DataFrame(test_data)
+        for backup_file in backup_files:
+            if backup_file.endswith(".csv.gz"):
+                dtypes = expected_data.dtypes.to_dict()
+                actual_data = pd.read_csv(backup_file, dtype=dtypes)
+            else:
+                actual_data = pd.read_parquet(backup_file)
+            pd.testing.assert_frame_equal(expected_data, actual_data)
 
         # Check that Socrata client was initialized with correct arguments
         mock_socrata.assert_called_once_with("data.cdc.gov", test_token)
@@ -55,6 +66,6 @@ def test_pull_nssp_data(self, mock_socrata):
         for signal in SIGNALS:
             assert result[signal].notnull().all(), f"{signal} has rogue NaN"
 
-
-if __name__ == "__main__":
-    unittest.main()
+        # Clean up backup files
+        for file in backup_files:
+            os.remove(file)
\ No newline at end of file
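The mock's `side_effect` list is what terminates the pagination loop inside `pull_nssp_data`: the first `client.get` call returns the test page, the second returns an empty list, which breaks the loop. A self-contained sketch of the loop shape implied by the `pull.py` context lines (`"<dataset-id>"` and the page size are placeholders, not values from the patch):

```python
# Sketch of the pagination loop exercised by side_effect = [page, []].
from unittest.mock import MagicMock

client = MagicMock()
client.get.side_effect = [[{"week_end": "2024-03-02"}], []]  # one page, then empty

results, offset, limit = [], 0, 50000
while True:
    page = client.get("<dataset-id>", limit=limit, offset=offset)
    if not page:  # the second mocked call returns [], ending the loop
        break
    results.extend(page)
    offset += limit

assert len(results) == 1
```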
diff --git a/nssp/tests/test_raw_data_backups/.gitignore b/nssp/tests/test_raw_data_backups/.gitignore
new file mode 100644
index 000000000..4007b678f
--- /dev/null
+++ b/nssp/tests/test_raw_data_backups/.gitignore
@@ -0,0 +1,3 @@
+*.gz
+*.csv
+*.parquet
\ No newline at end of file