27 changes: 16 additions & 11 deletions _delphi_utils_python/delphi_utils/export.py
@@ -1,6 +1,5 @@
"""Export data in the format expected by the Delphi API."""
# -*- coding: utf-8 -*-
import gzip
import logging
from datetime import datetime
from os.path import getsize, join
@@ -189,15 +188,21 @@ def create_backup_csv(
issue = datetime.today().strftime("%Y%m%d")

backup_filename = [issue, geo_res, metric, sensor]
backup_filename = "_".join(filter(None, backup_filename)) + ".csv.gz"
backup_filename = "_".join(filter(None, backup_filename))
backup_file = join(backup_dir, backup_filename)

with gzip.open(backup_file, "wt", newline="") as f:
df.to_csv(f, index=False, na_rep="NA")

if logger:
logger.info(
"Backup file created",
backup_file=backup_file,
backup_size=getsize(backup_file),
try:
# de facto data format is csv, but parquet preserves data types (keeping both as an intermediary measure)
df.to_csv(
f"{backup_file}.csv.gz", index=False, na_rep="NA", compression="gzip"
)
df.to_parquet(f"{backup_file}.parquet", index=False)

if logger:
logger.info(
"Backup file created",
backup_file=backup_file,
backup_size=getsize(f"{backup_file}.csv.gz"),
)
# pylint: disable=W0703
except Exception as e:
logger.info("Backup file creation failed", msg=e)
1 change: 1 addition & 0 deletions _delphi_utils_python/pyproject.toml
@@ -23,6 +23,7 @@ dependencies = [
"gitpython",
"importlib_resources>=1.3",
"numpy",
"pyarrow",
"pandas>=1.1.0",
"requests",
"slackclient",
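The new pyarrow dependency is what pandas picks up as its default parquet engine when it is installed; a quick sanity check (illustrative only):

```python
import pandas as pd

# Round-trip a trivial frame through parquet; this raises if no parquet engine is available.
pd.DataFrame({"a": [1, 2]}).to_parquet("check.parquet", engine="pyarrow")
print(pd.read_parquet("check.parquet", engine="pyarrow"))
```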
24 changes: 22 additions & 2 deletions _delphi_utils_python/tests/test_export.py
@@ -1,15 +1,16 @@
"""Tests for exporting CSV files."""
from datetime import datetime
import logging
from os import listdir
from os.path import join
from typing import Any, Dict, List
from typing import Any, Dict

import mock
import numpy as np
import pandas as pd
from pandas.testing import assert_frame_equal

from delphi_utils import create_export_csv, Nans
from delphi_utils import create_export_csv, Nans, create_backup_csv, get_structured_logger


def _set_df_dtypes(df: pd.DataFrame, dtypes: Dict[str, Any]) -> pd.DataFrame:
@@ -386,3 +387,22 @@ def test_export_sort(self, tmp_path):
})
sorted_csv = _set_df_dtypes(pd.read_csv(join(tmp_path, "20200215_county_test.csv")), dtypes={"geo_id": str})
assert_frame_equal(sorted_csv,expected_df)

def test_create_backup_regular(self, caplog, tmp_path):
caplog.set_level(logging.INFO)
logger = get_structured_logger()
today = datetime.strftime(datetime.today(), "%Y%m%d")
dtypes = self.DF.dtypes.to_dict()
del dtypes["timestamp"]
geo_res = "county"
metric = "test"
sensor = "deaths"
create_backup_csv(df=self.DF, backup_dir=tmp_path, custom_run=False, issue=None, geo_res=geo_res, metric=metric, sensor=sensor, logger=logger)
assert "Backup file created" in caplog.text

actual = pd.read_csv(join(tmp_path, f"{today}_{geo_res}_{metric}_{sensor}.csv.gz"), dtype=dtypes, parse_dates=["timestamp"])

Contributor:
@nmdefries this gets me a bit worried... when I was writing this, I noticed that when we write to CSV, the column data types are not preserved. Is there a particular reason we want to use csv.gz rather than parquet or another data format?

Contributor:
Basically, status quo. We use CSV for all of our other data storage (meaning we don't preserve data types anywhere else). Since these files are intended to be backups/not in regular use, having them mesh well with other systems doesn't matter so much. I don't have a strong preference. I'm worried, though, that using a different less-standard format will cause more engineering effort here. One of our goals for this was quick rollout.

The pro of using a data-type preserving format is that it lets us read these files in and continue the pipeline as if we were pulling from the source and doing our normal processing.

@minhkhul thoughts? Maybe y'all could briefly talk to Adam about this.

Contributor:
Talked with Adam and decided on storing both CSV and parquet for now; this shouldn't be a problem for a while, as both the NSSP and NCHS data are small. (See the dtype round-trip sketch after this file's diff.)

assert self.DF.equals(actual)

actual_parquet = pd.read_parquet(join(tmp_path, f"{today}_{geo_res}_{metric}_{sensor}.parquet"))
assert actual_parquet.equals(actual)
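Following up on the CSV-vs-parquet thread above, the dtype concern is easy to reproduce: a CSV round trip collapses extension dtypes, leading-zero strings, and timestamps, while parquet (via pyarrow) returns them intact. A small illustrative check, not part of the test suite:

```python
import pandas as pd

df = pd.DataFrame({
    "geo_id": pd.Series(["01001", "01003"], dtype="string"),   # FIPS-style codes with leading zeros
    "timestamp": pd.to_datetime(["2020-02-15", "2020-02-16"]),
    "val": pd.Series([1, pd.NA], dtype="Int64"),                # nullable integer
})

df.to_csv("backup.csv.gz", index=False, compression="gzip")
df.to_parquet("backup.parquet", index=False)

print(pd.read_csv("backup.csv.gz").dtypes)
# geo_id -> int64 (leading zeros lost), timestamp -> object, val -> float64
print(pd.read_parquet("backup.parquet").dtypes)
# original string / datetime64[ns] / Int64 dtypes preserved
```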

1 change: 1 addition & 0 deletions ansible/templates/nssp-params-prod.json.j2
@@ -1,6 +1,7 @@
{
"common": {
"export_dir": "/common/covidcast/receiving/nssp",
"backup_dir": "./raw_data_backups",
"log_filename": "/var/log/indicators/nssp.log",
"log_exceptions": false
},
3 changes: 2 additions & 1 deletion nchs_mortality/tests/raw_data_backups/.gitignore
@@ -1,2 +1,3 @@
*.csv
*.gz
*.gz
*.parquet
28 changes: 24 additions & 4 deletions nchs_mortality/tests/test_pull.py
@@ -1,7 +1,10 @@
import glob
import os
import pytest

import pandas as pd

from delphi_utils import get_structured_logger
from delphi_utils.geomap import GeoMapper

from delphi_nchs_mortality.pull import pull_nchs_mortality_data, standardize_columns
@@ -98,13 +101,30 @@ def test_bad_file_with_missing_cols(self):
with pytest.raises(ValueError):
pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir = "", custom_run = True, test_file = "bad_data_with_missing_cols.csv")

def test_backup_today_data(self):
def test_backup_today_data(self, caplog):
today = pd.Timestamp.today().strftime("%Y%m%d")
backup_dir = "./raw_data_backups"
pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir = backup_dir, custom_run = False, test_file = "test_data.csv")
backup_file = f"{backup_dir}/{today}.csv.gz"
backup_df = pd.read_csv(backup_file)
logger = get_structured_logger()
pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir = backup_dir, custom_run = False, test_file = "test_data.csv", logger=logger)

# Check logger used:
assert "Backup file created" in caplog.text

# Check that backup file was created
backup_files = glob.glob(f"{backup_dir}/{today}*")
assert len(backup_files) == 2, "Backup file was not created"

source_df = pd.read_csv("test_data/test_data.csv")
for backup_file in backup_files:
if backup_file.endswith(".csv.gz"):
backup_df = pd.read_csv(backup_file)
else:
backup_df = pd.read_parquet(backup_file)
pd.testing.assert_frame_equal(source_df, backup_df)

backup_file_parquet = f"{backup_dir}/{today}.parquet"
backup_df = pd.read_parquet(backup_file_parquet)
pd.testing.assert_frame_equal(source_df, backup_df)

# clean up both backup files created by this test
for backup_file in backup_files:
if os.path.exists(backup_file):
os.remove(backup_file)
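Side note on the filenames this test globs for: the NCHS pull calls create_backup_csv without geo_res/metric/sensor (presumably defaulting to None), so the filter(None, ...) join in export.py reduces the backup stem to just the issue date. Roughly (illustrative):

```python
from datetime import datetime

issue = datetime.today().strftime("%Y%m%d")
stem = "_".join(filter(None, [issue, None, None, None]))
print(f"{stem}.csv.gz", f"{stem}.parquet")  # e.g. 20240612.csv.gz 20240612.parquet
```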
17 changes: 12 additions & 5 deletions nssp/delphi_nssp/pull.py
@@ -1,9 +1,11 @@
# -*- coding: utf-8 -*-
"""Functions for pulling NSSP ER data."""

import logging
import textwrap
from typing import Optional

import pandas as pd
from delphi_utils import create_backup_csv
from sodapy import Socrata

from .constants import NEWLINE, SIGNALS, SIGNALS_MAP, TYPE_DICT
@@ -27,7 +29,7 @@ def warn_string(df, type_dict):
return warn


def pull_nssp_data(socrata_token: str):
def pull_nssp_data(socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None):
"""Pull the latest NSSP ER visits data, and conforms it into a dataset.

The output dataset has:
@@ -38,9 +40,13 @@
Parameters
----------
socrata_token: str
My App Token for pulling the NWSS data (could be the same as the nchs data)
test_file: Optional[str]
When not null, name of file from which to read test data
My App Token for pulling the NSSP data (could be the same as the nchs data)
backup_dir: str
Directory to which to save raw backup data
custom_run: bool
Flag indicating if the current run is a patch. If so, don't save any data to disk
logger: Optional[logging.Logger]
logger object

Returns
-------
@@ -59,6 +65,7 @@ def pull_nssp_data(socrata_token: str):
results.extend(page)
offset += limit
df_ervisits = pd.DataFrame.from_records(results)
create_backup_csv(df_ervisits, backup_dir, custom_run, logger=logger)
df_ervisits = df_ervisits.rename(columns={"week_end": "timestamp"})
df_ervisits = df_ervisits.rename(columns=SIGNALS_MAP)

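Read together with run.py below, the change threads backup_dir, custom_run, and logger from params down into the pull and snapshots the raw Socrata records before any renaming. A condensed sketch of the flow (the dataset id, domain, and paging values are placeholders, not the indicator's real constants):

```python
import logging
from typing import Optional

import pandas as pd
from delphi_utils import create_backup_csv
from sodapy import Socrata

DATASET_ID = "xxxx-xxxx"  # placeholder; the real id lives in the indicator's constants


def pull_raw_with_backup(socrata_token: str, backup_dir: str, custom_run: bool,
                         logger: Optional[logging.Logger] = None) -> pd.DataFrame:
    """Page through the Socrata dataset, back up the raw pull, then hand off for processing."""
    client = Socrata("data.cdc.gov", socrata_token)
    results, offset, limit = [], 0, 50000
    while True:
        page = client.get(DATASET_ID, limit=limit, offset=offset)
        if not page:
            break
        results.extend(page)
        offset += limit
    df = pd.DataFrame.from_records(results)
    # Back up before column renaming so the stored files mirror the source exactly;
    # per the docstring, create_backup_csv writes nothing when custom_run is True (patch runs).
    create_backup_csv(df, backup_dir, custom_run, logger=logger)
    return df.rename(columns={"week_end": "timestamp"})
```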
4 changes: 3 additions & 1 deletion nssp/delphi_nssp/run.py
@@ -78,12 +78,14 @@ def run_module(params):
log_exceptions=params["common"].get("log_exceptions", True),
)
export_dir = params["common"]["export_dir"]
backup_dir = params["common"]["backup_dir"]
custom_run = params["common"].get("custom_run", False)
socrata_token = params["indicator"]["socrata_token"]

run_stats = []
## build the base version of the signal at the most detailed geo level you can get.
## compute stuff here or farm out to another function or file
df_pull = pull_nssp_data(socrata_token)
df_pull = pull_nssp_data(socrata_token, backup_dir, custom_run=custom_run, logger=logger)
## aggregate
geo_mapper = GeoMapper()
for signal in SIGNALS:
1 change: 1 addition & 0 deletions nssp/params.json.template
@@ -1,6 +1,7 @@
{
"common": {
"export_dir": "./receiving",
"backup_dir": "./raw_data_backups",
"log_filename": "./nssp.log",
"log_exceptions": false
},
120 changes: 120 additions & 0 deletions nssp/raw_data_backups/.gitignore
@@ -0,0 +1,120 @@
# You should hard commit a prototype for this file, but we
# want to avoid accidental adding of API tokens and other
# private data parameters
params.json

# Do not commit output files
receiving/*.csv

# Remove macOS files
.DS_Store

# virtual environment
dview/

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
coverage.xml
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/

# Translations
*.mo
*.pot

# Django stuff:
*.log
.static_storage/
.media/
local_settings.py

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# pyenv
.python-version

# celery beat schedule file
celerybeat-schedule

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/