From db561a3f519aeb35e303db72b792d8ed4ae85625 Mon Sep 17 00:00:00 2001 From: minhkhul Date: Tue, 29 Oct 2024 11:41:49 -0400 Subject: [PATCH 01/16] base changes --- ansible/templates/nssp-params-prod.json.j2 | 1 + nssp/delphi_nssp/pull.py | 16 +++++++++++----- nssp/delphi_nssp/run.py | 9 ++++++++- nssp/params.json.template | 1 + 4 files changed, 21 insertions(+), 6 deletions(-) diff --git a/ansible/templates/nssp-params-prod.json.j2 b/ansible/templates/nssp-params-prod.json.j2 index b131b6130..1ff73d843 100644 --- a/ansible/templates/nssp-params-prod.json.j2 +++ b/ansible/templates/nssp-params-prod.json.j2 @@ -1,6 +1,7 @@ { "common": { "export_dir": "/common/covidcast/receiving/nssp", + "backup_dir": "./raw_data_backups", "log_filename": "/var/log/indicators/nssp.log", "log_exceptions": false }, diff --git a/nssp/delphi_nssp/pull.py b/nssp/delphi_nssp/pull.py index ece94fab4..f71d84e98 100644 --- a/nssp/delphi_nssp/pull.py +++ b/nssp/delphi_nssp/pull.py @@ -1,10 +1,11 @@ # -*- coding: utf-8 -*- """Functions for pulling NSSP ER data.""" - +import logging import textwrap import pandas as pd from sodapy import Socrata +from delphi_utils import create_backup_csv from .constants import NEWLINE, SIGNALS, SIGNALS_MAP, TYPE_DICT @@ -27,7 +28,7 @@ def warn_string(df, type_dict): return warn -def pull_nssp_data(socrata_token: str): +def pull_nssp_data(socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None): """Pull the latest NSSP ER visits data, and conforms it into a dataset. The output dataset has: @@ -38,9 +39,13 @@ def pull_nssp_data(socrata_token: str): Parameters ---------- socrata_token: str - My App Token for pulling the NWSS data (could be the same as the nchs data) - test_file: Optional[str] - When not null, name of file from which to read test data + My App Token for pulling the NSSP data (could be the same as the NCHS data) + backup_dir: str + Directory to which to save raw backup data + custom_run: bool + Flag indicating if the current run is a patch. If so, don't save any data to disk + logger: Optional[logging.Logger] + logger object Returns ------- @@ -59,6 +64,7 @@ def pull_nssp_data(socrata_token: str): results.extend(page) offset += limit df_ervisits = pd.DataFrame.from_records(results) + create_backup_csv(df_ervisits, backup_dir, custom_run, logger=logger) df_ervisits = df_ervisits.rename(columns={"week_end": "timestamp"}) df_ervisits = df_ervisits.rename(columns=SIGNALS_MAP) diff --git a/nssp/delphi_nssp/run.py b/nssp/delphi_nssp/run.py index b22d03c20..41a458a5d 100644 --- a/nssp/delphi_nssp/run.py +++ b/nssp/delphi_nssp/run.py @@ -78,12 +78,19 @@ def run_module(params): log_exceptions=params["common"].get("log_exceptions", True), ) export_dir = params["common"]["export_dir"] + backup_dir = params["common"]["backup_dir"] + custom_run = params["common"].get("custom_run", False) socrata_token = params["indicator"]["socrata_token"] run_stats = [] ## build the base version of the signal at the most detailed geo level you can get.
## compute stuff here or farm out to another function or file - df_pull = pull_nssp_data(socrata_token) + df_pull = pull_nssp_data( + socrata_token, + backup_dir, + custom_run=custom_run, + logger=logger + ) ## aggregate geo_mapper = GeoMapper() for signal in SIGNALS: diff --git a/nssp/params.json.template b/nssp/params.json.template index df989ede7..1805baebb 100644 --- a/nssp/params.json.template +++ b/nssp/params.json.template @@ -1,6 +1,7 @@ { "common": { "export_dir": "./receiving", + "backup_dir": "./raw_data_backups", "log_filename": "./nssp.log", "log_exceptions": false }, From b9f57548f7dff9cfd7c0bfb2e21b331a8599de8a Mon Sep 17 00:00:00 2001 From: minhkhul Date: Tue, 29 Oct 2024 11:42:34 -0400 Subject: [PATCH 02/16] backup dir --- nssp/raw_data_backups/.gitignore | 120 +++++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) create mode 100644 nssp/raw_data_backups/.gitignore diff --git a/nssp/raw_data_backups/.gitignore b/nssp/raw_data_backups/.gitignore new file mode 100644 index 000000000..3c515981b --- /dev/null +++ b/nssp/raw_data_backups/.gitignore @@ -0,0 +1,120 @@ +# You should hard commit a prototype for this file, but we +# want to avoid accidental adding of API tokens and other +# private data parameters +params.json + +# Do not commit output files +receiving/*.csv + +# Remove macOS files +.DS_Store + +# virtual environment +dview/ + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +coverage.xml +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. 
+*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +.static_storage/ +.media/ +local_settings.py + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ \ No newline at end of file From 3cbec82510fa18f1cc9859fac70201a16436df8f Mon Sep 17 00:00:00 2001 From: minhkhul Date: Tue, 29 Oct 2024 15:51:21 -0400 Subject: [PATCH 03/16] add test --- nssp/delphi_nssp/pull.py | 1 + nssp/tests/test_pull.py | 18 +++++++++++++++++- nssp/tests/test_raw_data_backups/.gitignore | 2 ++ 3 files changed, 20 insertions(+), 1 deletion(-) create mode 100644 nssp/tests/test_raw_data_backups/.gitignore diff --git a/nssp/delphi_nssp/pull.py b/nssp/delphi_nssp/pull.py index f71d84e98..d633eb6dd 100644 --- a/nssp/delphi_nssp/pull.py +++ b/nssp/delphi_nssp/pull.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- """Functions for pulling NSSP ER data.""" +from typing import Optional import logging import textwrap diff --git a/nssp/tests/test_pull.py b/nssp/tests/test_pull.py index b356341f6..1470265d8 100644 --- a/nssp/tests/test_pull.py +++ b/nssp/tests/test_pull.py @@ -33,11 +33,27 @@ def test_pull_nssp_data(self, mock_socrata): mock_client.get.side_effect = [test_data, []] # Return test data on first call, empty list on second call mock_socrata.return_value = mock_client + backup_dir = 'test_raw_data_backups' + # Clear old backup files + for file in os.listdir(backup_dir): + os.remove(os.path.join(backup_dir, file)) + + custom_run = False + + mock_logger = MagicMock() + # Call function with test token test_token = "test_token" - result = pull_nssp_data(test_token) + result = pull_nssp_data(test_token, backup_dir, custom_run, mock_logger) print(result) + # Check logger used: + mock_logger.info.assert_called() + + # Check that backup file was created + backup_files = os.listdir(backup_dir) + assert len(backup_files) == 1, "Backup file was not created" + # Check that Socrata client was initialized with correct arguments mock_socrata.assert_called_once_with("data.cdc.gov", test_token) diff --git a/nssp/tests/test_raw_data_backups/.gitignore b/nssp/tests/test_raw_data_backups/.gitignore new file mode 100644 index 000000000..c6cc3735a --- /dev/null +++ b/nssp/tests/test_raw_data_backups/.gitignore @@ -0,0 +1,2 @@ +*.gz +*.csv \ No newline at end of file From 1071cb3f0f1c1d5541ddd5dabb86bc7684730430 Mon Sep 17 00:00:00 2001 From: minhkhul Date: Tue, 29 Oct 2024 15:52:48 -0400 Subject: [PATCH 04/16] lint --- nssp/delphi_nssp/pull.py | 4 ++-- nssp/delphi_nssp/run.py | 7 +------ 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/nssp/delphi_nssp/pull.py b/nssp/delphi_nssp/pull.py index d633eb6dd..de6934bc8 100644 --- a/nssp/delphi_nssp/pull.py +++ b/nssp/delphi_nssp/pull.py @@ -1,12 +1,12 @@ # -*- coding: utf-8 -*- """Functions for pulling NSSP ER data.""" -from typing import Optional import logging import textwrap +from typing 
import Optional import pandas as pd -from sodapy import Socrata from delphi_utils import create_backup_csv +from sodapy import Socrata from .constants import NEWLINE, SIGNALS, SIGNALS_MAP, TYPE_DICT diff --git a/nssp/delphi_nssp/run.py b/nssp/delphi_nssp/run.py index 41a458a5d..b512e8aba 100644 --- a/nssp/delphi_nssp/run.py +++ b/nssp/delphi_nssp/run.py @@ -85,12 +85,7 @@ def run_module(params): run_stats = [] ## build the base version of the signal at the most detailed geo level you can get. ## compute stuff here or farm out to another function or file - df_pull = pull_nssp_data( - socrata_token, - backup_dir, - custom_run=custom_run, - logger=logger - ) + df_pull = pull_nssp_data(socrata_token, backup_dir, custom_run=custom_run, logger=logger) ## aggregate geo_mapper = GeoMapper() for signal in SIGNALS: From 870024eab8a0186b8c8a278fcb34703c1eaefdec Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Wed, 6 Nov 2024 12:16:58 -0500 Subject: [PATCH 05/16] adding tests for create_backup_csv --- _delphi_utils_python/delphi_utils/export.py | 23 ++++++++++++--------- _delphi_utils_python/tests/test_export.py | 20 ++++++++++++++++-- 2 files changed, 31 insertions(+), 12 deletions(-) diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py index 82493032e..c4d5c28df 100644 --- a/_delphi_utils_python/delphi_utils/export.py +++ b/_delphi_utils_python/delphi_utils/export.py @@ -191,13 +191,16 @@ def create_backup_csv( backup_filename = [issue, geo_res, metric, sensor] backup_filename = "_".join(filter(None, backup_filename)) + ".csv.gz" backup_file = join(backup_dir, backup_filename) - - with gzip.open(backup_file, "wt", newline="") as f: - df.to_csv(f, index=False, na_rep="NA") - - if logger: - logger.info( - "Backup file created", - backup_file=backup_file, - backup_size=getsize(backup_file), - ) + try: + with gzip.open(backup_file, "wt", newline="") as f: + df.to_csv(f, index=False, na_rep="NA") + + if logger: + logger.info( + "Backup file created", + backup_file=backup_file, + backup_size=getsize(backup_file), + ) + #pylint: disable=W0703 + except Exception as e: + logger.info("Backup file creation failed", msg=e) diff --git a/_delphi_utils_python/tests/test_export.py b/_delphi_utils_python/tests/test_export.py index c9c1f8483..06c8888b6 100644 --- a/_delphi_utils_python/tests/test_export.py +++ b/_delphi_utils_python/tests/test_export.py @@ -1,15 +1,16 @@ """Tests for exporting CSV files.""" from datetime import datetime +import logging from os import listdir from os.path import join -from typing import Any, Dict, List +from typing import Any, Dict import mock import numpy as np import pandas as pd from pandas.testing import assert_frame_equal -from delphi_utils import create_export_csv, Nans +from delphi_utils import create_export_csv, Nans, create_backup_csv, get_structured_logger def _set_df_dtypes(df: pd.DataFrame, dtypes: Dict[str, Any]) -> pd.DataFrame: @@ -386,3 +387,18 @@ def test_export_sort(self, tmp_path): }) sorted_csv = _set_df_dtypes(pd.read_csv(join(tmp_path, "20200215_county_test.csv")), dtypes={"geo_id": str}) assert_frame_equal(sorted_csv,expected_df) + + def test_create_backup_regular(self, caplog, tmp_path): + caplog.set_level(logging.INFO) + logger = get_structured_logger() + today = datetime.strftime(datetime.today(), "%Y%m%d") + dtypes = self.DF.dtypes.to_dict() + del dtypes["timestamp"] + geo_res = "county" + metric = "test" + sensor = "deaths" + create_backup_csv(df=self.DF, backup_dir=tmp_path, custom_run=False, issue=None, 
geo_res=geo_res, metric=metric, sensor=sensor, logger=logger) + assert "Backup file created" in caplog.text + + actual = pd.read_csv(join(tmp_path, f"{today}_{geo_res}_{metric}_{sensor}.csv.gz"), dtype=dtypes, parse_dates=["timestamp"]) + assert self.DF.equals(actual) From cbc458b12faf2db084a95c8f969134d69cbde3a8 Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Wed, 6 Nov 2024 14:18:43 -0500 Subject: [PATCH 06/16] also writing into parquet --- _delphi_utils_python/delphi_utils/export.py | 9 +++++---- _delphi_utils_python/tests/test_export.py | 4 ++++ 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py index c4d5c28df..101c0a572 100644 --- a/_delphi_utils_python/delphi_utils/export.py +++ b/_delphi_utils_python/delphi_utils/export.py @@ -189,17 +189,18 @@ def create_backup_csv( issue = datetime.today().strftime("%Y%m%d") backup_filename = [issue, geo_res, metric, sensor] - backup_filename = "_".join(filter(None, backup_filename)) + ".csv.gz" + backup_filename = "_".join(filter(None, backup_filename)) backup_file = join(backup_dir, backup_filename) try: - with gzip.open(backup_file, "wt", newline="") as f: - df.to_csv(f, index=False, na_rep="NA") + # de facto data format is csv, but parquet preserves data types (keeping both as an interim measure) + df.to_csv(f"{backup_file}.csv.gz", index=False, na_rep="NA", compression='gzip') + df.to_parquet(f"{backup_file}.parquet", index=False) if logger: logger.info( "Backup file created", backup_file=backup_file, - backup_size=getsize(backup_file), + backup_size=getsize(f"{backup_file}.csv.gz"), ) #pylint: disable=W0703 except Exception as e: logger.info("Backup file creation failed", msg=e) diff --git a/_delphi_utils_python/tests/test_export.py b/_delphi_utils_python/tests/test_export.py index 06c8888b6..1d11b7068 100644 --- a/_delphi_utils_python/tests/test_export.py +++ b/_delphi_utils_python/tests/test_export.py @@ -402,3 +402,7 @@ def test_create_backup_regular(self, caplog, tmp_path): actual = pd.read_csv(join(tmp_path, f"{today}_{geo_res}_{metric}_{sensor}.csv.gz"), dtype=dtypes, parse_dates=["timestamp"]) assert self.DF.equals(actual) + + actual_parquet = pd.read_parquet(join(tmp_path, f"{today}_{geo_res}_{metric}_{sensor}.parquet")) + assert actual_parquet.equals(actual) + From 5901c79763c67c6af770f6b1829c6f3800d09987 Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Wed, 6 Nov 2024 16:50:37 -0500 Subject: [PATCH 07/16] adding pyarrow as dependency --- nchs_mortality/setup.py | 1 + nssp/setup.py | 1 + 2 files changed, 2 insertions(+) diff --git a/nchs_mortality/setup.py b/nchs_mortality/setup.py index 142db6f35..74244780b 100644 --- a/nchs_mortality/setup.py +++ b/nchs_mortality/setup.py @@ -13,6 +13,7 @@ "pydocstyle", "pylint==2.8.3", "pytest-cov", + "pyarrow", "pytest", "sodapy", ] diff --git a/nssp/setup.py b/nssp/setup.py index a6cbf640a..885539c8d 100644 --- a/nssp/setup.py +++ b/nssp/setup.py @@ -12,6 +12,7 @@ "sodapy", "epiweeks", "freezegun", + "pyarrow", "us", ] From 97a2effea4a3757977bc472c77da84633b2cd3ca Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Wed, 6 Nov 2024 17:00:03 -0500 Subject: [PATCH 08/16] clean test --- nssp/tests/test_pull.py | 32 ++++++++------------------------ 1 file changed, 8 insertions(+), 24 deletions(-) diff --git a/nssp/tests/test_pull.py b/nssp/tests/test_pull.py index 1470265d8..379369f92 100644 --- a/nssp/tests/test_pull.py +++ b/nssp/tests/test_pull.py @@ -1,29 +1,17 @@ -from datetime import datetime, date import json -import unittest from unittest.mock
import patch, MagicMock -import tempfile import os -import time -from datetime import datetime -import pdb -import pandas as pd -import pandas.api.types as ptypes from delphi_nssp.pull import ( pull_nssp_data, ) -from delphi_nssp.constants import ( - SIGNALS, - NEWLINE, - SIGNALS_MAP, - TYPE_DICT, -) +from delphi_nssp.constants import SIGNALS +from delphi_utils import get_structured_logger -class TestPullNSSPData(unittest.TestCase): +class TestPullNSSPData: @patch("delphi_nssp.pull.Socrata") - def test_pull_nssp_data(self, mock_socrata): + def test_pull_nssp_data(self, mock_socrata, caplog): # Load test data with open("test_data/page.txt", "r") as f: test_data = json.load(f) @@ -40,19 +28,18 @@ def test_pull_nssp_data(self, mock_socrata): custom_run = False - mock_logger = MagicMock() + logger = get_structured_logger() # Call function with test token test_token = "test_token" - result = pull_nssp_data(test_token, backup_dir, custom_run, mock_logger) - print(result) + result = pull_nssp_data(test_token, backup_dir, custom_run, logger) # Check logger used: - mock_logger.info.assert_called() + assert "Backup file created" in caplog.text # Check that backup file was created backup_files = os.listdir(backup_dir) - assert len(backup_files) == 1, "Backup file was not created" + assert len(backup_files) == 2, "Backup file was not created" # Check that Socrata client was initialized with correct arguments mock_socrata.assert_called_once_with("data.cdc.gov", test_token) @@ -71,6 +58,3 @@ def test_pull_nssp_data(self, mock_socrata): for signal in SIGNALS: assert result[signal].notnull().all(), f"{signal} has rogue NaN" - -if __name__ == "__main__": - unittest.main() From b24b4bf1f160d0ac5be72dec8e677584165f9ba9 Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Wed, 6 Nov 2024 17:03:25 -0500 Subject: [PATCH 09/16] adjusting logic to match new naming format and chunking --- _delphi_utils_python/delphi_utils/export.py | 1 - 1 file changed, 1 deletion(-) diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py index 101c0a572..f2207adf2 100644 --- a/_delphi_utils_python/delphi_utils/export.py +++ b/_delphi_utils_python/delphi_utils/export.py @@ -1,6 +1,5 @@ """Export data in the format expected by the Delphi API.""" # -*- coding: utf-8 -*- -import gzip import logging from datetime import datetime from os.path import getsize, join From 78ace13c358f688ec5d6ff44805acafe9dd4bc06 Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Thu, 7 Nov 2024 10:18:39 -0500 Subject: [PATCH 10/16] moving dependencies --- _delphi_utils_python/pyproject.toml | 1 + nchs_mortality/setup.py | 1 - nssp/setup.py | 1 - 3 files changed, 1 insertion(+), 2 deletions(-) diff --git a/_delphi_utils_python/pyproject.toml b/_delphi_utils_python/pyproject.toml index c47590a29..9ec3f5fc5 100644 --- a/_delphi_utils_python/pyproject.toml +++ b/_delphi_utils_python/pyproject.toml @@ -23,6 +23,7 @@ dependencies = [ "gitpython", "importlib_resources>=1.3", "numpy", + "pyarrow", "pandas>=1.1.0", "requests", "slackclient", diff --git a/nchs_mortality/setup.py b/nchs_mortality/setup.py index 74244780b..142db6f35 100644 --- a/nchs_mortality/setup.py +++ b/nchs_mortality/setup.py @@ -13,7 +13,6 @@ "pydocstyle", "pylint==2.8.3", "pytest-cov", - "pyarrow", "pytest", "sodapy", ] diff --git a/nssp/setup.py b/nssp/setup.py index 885539c8d..a6cbf640a 100644 --- a/nssp/setup.py +++ b/nssp/setup.py @@ -12,7 +12,6 @@ "sodapy", "epiweeks", "freezegun", - "pyarrow", "us", ] From b3da58bee6bfdca663a04cc2f44861fba8c91b34 
Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Thu, 7 Nov 2024 14:22:00 -0500 Subject: [PATCH 11/16] lint --- _delphi_utils_python/delphi_utils/export.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py index f2207adf2..a145b8aa3 100644 --- a/_delphi_utils_python/delphi_utils/export.py +++ b/_delphi_utils_python/delphi_utils/export.py @@ -192,7 +192,7 @@ def create_backup_csv( backup_file = join(backup_dir, backup_filename) try: # de facto data format is csv, but parquet preserves data types (keeping both as an interim measure) - df.to_csv(f"{backup_file}.csv.gz", index=False, na_rep="NA", compression='gzip') + df.to_csv(f"{backup_file}.csv.gz", index=False, na_rep="NA", compression="gzip") df.to_parquet(f"{backup_file}.parquet", index=False) if logger: @@ -201,6 +201,6 @@ def create_backup_csv( backup_file=backup_file, backup_size=getsize(f"{backup_file}.csv.gz"), ) - #pylint: disable=W0703 + # pylint: disable=W0703 except Exception as e: logger.info("Backup file creation failed", msg=e) From c471c5f339efe29f9e369c8b7dc64152325888be Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Thu, 7 Nov 2024 14:40:07 -0500 Subject: [PATCH 12/16] made test more robust --- nchs_mortality/tests/test_pull.py | 28 ++++++++++++++++++++++++---- nssp/tests/test_pull.py | 10 ++++++++++ 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/nchs_mortality/tests/test_pull.py b/nchs_mortality/tests/test_pull.py index 4f18210f6..39d44d9b3 100644 --- a/nchs_mortality/tests/test_pull.py +++ b/nchs_mortality/tests/test_pull.py @@ -1,7 +1,10 @@ +import glob import os import pytest import pandas as pd + +from delphi_utils import get_structured_logger from delphi_utils.geomap import GeoMapper from delphi_nchs_mortality.pull import pull_nchs_mortality_data, standardize_columns @@ -98,13 +101,30 @@ def test_bad_file_with_missing_cols(self): with pytest.raises(ValueError): pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir = "", custom_run = True, test_file = "bad_data_with_missing_cols.csv") - def test_backup_today_data(self): + def test_backup_today_data(self, caplog): today = pd.Timestamp.today().strftime("%Y%m%d") backup_dir = "./raw_data_backups" - pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir = backup_dir, custom_run = False, test_file = "test_data.csv") - backup_file = f"{backup_dir}/{today}.csv.gz" - backup_df = pd.read_csv(backup_file) + logger = get_structured_logger() + pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir = backup_dir, custom_run = False, test_file = "test_data.csv", logger=logger) + + # Check logger used: + assert "Backup file created" in caplog.text + + # Check that backup file was created + backup_files = glob.glob(f"{backup_dir}/{today}*") + assert len(backup_files) == 2, "Backup file was not created" + source_df = pd.read_csv("test_data/test_data.csv") + for backup_file in backup_files: + if backup_file.endswith(".csv.gz"): + backup_df = pd.read_csv(backup_file) + else: + backup_df = pd.read_parquet(backup_file) + pd.testing.assert_frame_equal(source_df, backup_df) + + backup_file_parquet = f"{backup_dir}/{today}.parquet" + backup_df = pd.read_parquet(backup_file_parquet) pd.testing.assert_frame_equal(source_df, backup_df) + if os.path.exists(backup_file): os.remove(backup_file) diff --git a/nssp/tests/test_pull.py b/nssp/tests/test_pull.py index 379369f92..5cef22a52 100644 --- a/nssp/tests/test_pull.py +++ b/nssp/tests/test_pull.py @@ -2,6 +2,8 @@ from unittest.mock
import patch, MagicMock import os +import pandas as pd + from delphi_nssp.pull import ( pull_nssp_data, ) @@ -41,6 +43,14 @@ def test_pull_nssp_data(self, mock_socrata, caplog): backup_files = os.listdir(backup_dir) assert len(backup_files) == 2, "Backup file was not created" + expected_data = pd.DataFrame(test_data) + for backup_file in backup_files: + if backup_file.endswith(".csv.gz"): + actual_data = pd.read_csv(os.path.join(backup_dir, backup_file)) + else: + actual_data = pd.read_parquet(os.path.join(backup_dir, backup_file)) + pd.testing.assert_frame_equal(expected_data, actual_data) + # Check that Socrata client was initialized with correct arguments mock_socrata.assert_called_once_with("data.cdc.gov", test_token) From 188a0afd20c563ee88f332a526896da82c99f00c Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Thu, 7 Nov 2024 14:44:01 -0500 Subject: [PATCH 13/16] fix test --- nssp/tests/test_pull.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nssp/tests/test_pull.py b/nssp/tests/test_pull.py index 5cef22a52..692568c0b 100644 --- a/nssp/tests/test_pull.py +++ b/nssp/tests/test_pull.py @@ -46,7 +46,8 @@ def test_pull_nssp_data(self, mock_socrata, caplog): expected_data = pd.DataFrame(test_data) for backup_file in backup_files: if backup_file.endswith(".csv.gz"): - actual_data = pd.read_csv(os.path.join(backup_dir, backup_file)) + dtypes = expected_data.dtypes.to_dict() + actual_data = pd.read_csv(os.path.join(backup_dir, backup_file), dtype=dtypes) else: actual_data = pd.read_parquet(os.path.join(backup_dir, backup_file)) pd.testing.assert_frame_equal(expected_data, actual_data) From 4e4700449c2aa85379598bf0e423c8d5ff47d6d4 Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Thu, 7 Nov 2024 14:51:25 -0500 Subject: [PATCH 14/16] clean up --- .../tests/raw_data_backups/.gitignore | 3 ++- nssp/tests/test_pull.py | 20 +++++++++---------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/nchs_mortality/tests/raw_data_backups/.gitignore b/nchs_mortality/tests/raw_data_backups/.gitignore index 2b7efbb36..3d44aa305 100644 --- a/nchs_mortality/tests/raw_data_backups/.gitignore +++ b/nchs_mortality/tests/raw_data_backups/.gitignore @@ -1,2 +1,3 @@ *.csv -*.gz \ No newline at end of file +*.gz +*.parquet \ No newline at end of file diff --git a/nssp/tests/test_pull.py b/nssp/tests/test_pull.py index 692568c0b..d0ebbc550 100644 --- a/nssp/tests/test_pull.py +++ b/nssp/tests/test_pull.py @@ -1,3 +1,4 @@ +import glob import json from unittest.mock import patch, MagicMock import os @@ -14,6 +15,9 @@ class TestPullNSSPData: @patch("delphi_nssp.pull.Socrata") def test_pull_nssp_data(self, mock_socrata, caplog): + today = pd.Timestamp.today().strftime("%Y%m%d") + backup_dir = 'test_raw_data_backups' + # Load test data with open("test_data/page.txt", "r") as f: test_data = json.load(f) @@ -23,15 +27,8 @@ def test_pull_nssp_data(self, mock_socrata, caplog): mock_client.get.side_effect = [test_data, []] # Return test data on first call, empty list on second call mock_socrata.return_value = mock_client - backup_dir = 'test_raw_data_backups' - # Clear old backup files - for file in os.listdir(backup_dir): - os.remove(os.path.join(backup_dir, file)) - custom_run = False - logger = get_structured_logger() - # Call function with test token test_token = "test_token" result = pull_nssp_data(test_token, backup_dir, custom_run, logger) @@ -40,16 +37,16 @@ def test_pull_nssp_data(self, mock_socrata, caplog): assert "Backup file created" in caplog.text # Check that backup file was 
created - backup_files = os.listdir(backup_dir) + backup_files = glob.glob(f"{backup_dir}/{today}*") assert len(backup_files) == 2, "Backup file was not created" expected_data = pd.DataFrame(test_data) for backup_file in backup_files: if backup_file.endswith(".csv.gz"): dtypes = expected_data.dtypes.to_dict() - actual_data = pd.read_csv(os.path.join(backup_dir, backup_file), dtype=dtypes) + actual_data = pd.read_csv(backup_file, dtype=dtypes) else: - actual_data = pd.read_parquet(os.path.join(backup_dir, backup_file)) + actual_data = pd.read_parquet(backup_file) pd.testing.assert_frame_equal(expected_data, actual_data) # Check that Socrata client was initialized with correct arguments mock_socrata.assert_called_once_with("data.cdc.gov", test_token) @@ -69,3 +66,6 @@ def test_pull_nssp_data(self, mock_socrata, caplog): for signal in SIGNALS: assert result[signal].notnull().all(), f"{signal} has rogue NaN" + # clean up + for file in backup_files: + os.remove(file) \ No newline at end of file From e2aa3e05182efdde3273b9e2f0f7bf8171de5780 Mon Sep 17 00:00:00 2001 From: Amaris Sim Date: Thu, 7 Nov 2024 14:55:31 -0500 Subject: [PATCH 15/16] adding parquet into gitignore --- nssp/tests/test_raw_data_backups/.gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nssp/tests/test_raw_data_backups/.gitignore b/nssp/tests/test_raw_data_backups/.gitignore index c6cc3735a..4007b678f 100644 --- a/nssp/tests/test_raw_data_backups/.gitignore +++ b/nssp/tests/test_raw_data_backups/.gitignore @@ -1,2 +1,3 @@ *.gz -*.csv \ No newline at end of file +*.csv +*.parquet \ No newline at end of file From f5257277455772c08e045a777f417403549bfcf2 Mon Sep 17 00:00:00 2001 From: minhkhul Date: Thu, 7 Nov 2024 18:06:39 -0500 Subject: [PATCH 16/16] placate the linter --- _delphi_utils_python/delphi_utils/export.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py index a145b8aa3..82a460abe 100644 --- a/_delphi_utils_python/delphi_utils/export.py +++ b/_delphi_utils_python/delphi_utils/export.py @@ -192,7 +192,9 @@ def create_backup_csv( backup_file = join(backup_dir, backup_filename) try: # de facto data format is csv, but parquet preserves data types (keeping both as an interim measure) - df.to_csv(f"{backup_file}.csv.gz", index=False, na_rep="NA", compression="gzip") + df.to_csv( + f"{backup_file}.csv.gz", index=False, na_rep="NA", compression="gzip" + ) df.to_parquet(f"{backup_file}.parquet", index=False) if logger:
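
Note (not part of the series): a minimal usage sketch of the backup flow these patches wire up, following the create_backup_csv body shown in patches 06-16 and the call pattern from patch 01. The DataFrame contents and backup directory below are placeholders, and the custom_run short-circuit is inferred from the pull.py docstring rather than shown in these diffs.

    # Sketch only: placeholder data; assumes a delphi_utils with the patched create_backup_csv.
    import pandas as pd

    from delphi_utils import create_backup_csv, get_structured_logger

    logger = get_structured_logger()
    df = pd.DataFrame({"geography": ["US"], "week_end": ["2024-10-20"], "percent_visits": [1.2]})

    # With issue/geo_res/metric/sensor left unset they are filtered out of the
    # filename, so this writes ./raw_data_backups/<YYYYMMDD>.csv.gz (the de facto
    # format) plus ./raw_data_backups/<YYYYMMDD>.parquet (preserves dtypes) and
    # logs "Backup file created". A patch run would pass custom_run=True, which,
    # per the pull.py docstring above, is expected to skip writing to disk.
    create_backup_csv(df, backup_dir="./raw_data_backups", custom_run=False, logger=logger)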