Merged
Commits
51 commits
c854b5b
Update Ansible
korlaxxalrok Dec 8, 2021
c3227c4
Add `hosp_claims` automation rig
korlaxxalrok Dec 8, 2021
9d30a69
Merge branch 'main' into productionize-claims_hosp
korlaxxalrok Dec 9, 2021
17c35f7
Merge branch 'main' into productionize-claims_hosp
korlaxxalrok Dec 9, 2021
095f792
Re-add new secrets after fixing merge conflict with main
korlaxxalrok Dec 9, 2021
b70e0bb
Register template and template-secrets in case we have both
korlaxxalrok Dec 9, 2021
ab1c2b8
Update claims_hosp/HospClaims/automate/update_json.py
korlaxxalrok Dec 10, 2021
4193038
Remove unused lines
korlaxxalrok May 3, 2022
17b85d5
Update claims_hosp/HospClaims/automate/update_json.py
korlaxxalrok May 3, 2022
a631d77
In light of #1419 being merged, we can change to 70 dates
korlaxxalrok May 3, 2022
d6dff50
Merge branch 'productionize-claims_hosp' of github.com:cmu-delphi/cov…
korlaxxalrok May 3, 2022
126f6e1
Remove unused line
korlaxxalrok May 3, 2022
d9b087f
add scripts and modification required for pulling data
Jun 6, 2022
0e59cd1
rm downloaded files at the end
Jun 6, 2022
c927e00
add logger info for it
Jun 6, 2022
c69cc2f
fixed errors in pulling
Jun 6, 2022
40411f8
add unit tests
Jun 6, 2022
b753b7e
change the path to test.log
Jun 7, 2022
edf8e3f
fixed an error
Jun 7, 2022
2d4400b
change the format of geo_id at hrr level from float to str
Jun 7, 2022
a7f9146
fix hrr geo id type
Jun 7, 2022
a2fcfcb
add unit tests
Jun 7, 2022
40fa62d
change back. Geo ids for hrr are still float numbers
Jun 7, 2022
78e3c4f
fix linting
Jun 7, 2022
93fdafa
remove unused code
Jun 9, 2022
42d7217
Use os.remove to clean raw files
jingjtang Jun 14, 2022
a011ffa
Use mock for the logger in unit tests
jingjtang Jun 14, 2022
06875d7
include filename with errors
Jun 14, 2022
cdd0583
delete commented-out code
Jun 14, 2022
4f86e1e
Update logger info with variables
jingjtang Jun 14, 2022
747d580
Remove continue but check file links with .csv.gz
Jun 14, 2022
c0482c3
fix the error in switching to Mock in unittest.mock
Jun 14, 2022
ce1f45e
remove unused variables
Jun 14, 2022
2546cea
remove unused messages
Jun 14, 2022
7adf66f
add too-few-public-methods back to message control for linting
Jun 14, 2022
0c39be0
add logger info for files to download
Jun 15, 2022
c77c811
Update logger info for the latest claims file
jingjtang Jun 15, 2022
0be164c
update the function for renmaing the raw drops
Jun 15, 2022
2c9b21d
remove agg function
Jun 15, 2022
080ad45
update import info for the modification function
Jun 15, 2022
819b043
add j2 file for claims_hosp params
Jun 15, 2022
cfb682c
Manually merge vault changes
krivard Jun 15, 2022
a8c2a6e
Merge branch 'main' into productionize-claims_hosp
krivard Jun 15, 2022
6f4976c
Update logger info
jingjtang Jun 16, 2022
1c83ffd
add min_max_dates, csv_export_count, etc to logger info
Jun 16, 2022
50c93fb
update the unit test for modification of the raw files
Jun 17, 2022
300397f
fix the error in the unit test
Jun 17, 2022
76b4db0
rename the parameter for test_mode check
Jun 21, 2022
aef9511
Remove try/except from get_timestamp by converting to re
krivard Jun 22, 2022
170f2e3
Update expected file count for inpatient-only
krivard Jun 24, 2022
d1ffb14
Merge pull request #1644 from cmu-delphi/krivard/pch-no-general-except
krivard Jun 24, 2022
12 changes: 12 additions & 0 deletions ansible/ansible-deploy-staging.yaml
@@ -27,6 +27,10 @@
      local_action: stat path="templates/{{ indicator }}-params-prod.json.j2"
      register: template

    - name: Check to see if we have a secrets template to send.
      local_action: stat path="templates/{{ indicator }}-secrets-prod.py.j2"
      register: template-secrets

    - name: Set production params file.
      copy:
        src: files/{{ indicator }}-params-prod.json
@@ -42,3 +46,11 @@
        owner: "{{ runtime_user }}"
        group: "{{ runtime_user }}"
      when: template.stat.exists

    - name: Set production secrets template.
      template:
        src: templates/{{ indicator }}-secrets-prod.py.j2
        dest: "{{ indicators_runtime_dir }}/{{ indicator }}/secrets.py"
        owner: "{{ runtime_user }}"
        group: "{{ runtime_user }}"
      when: template-secrets.stat.exists
12 changes: 12 additions & 0 deletions ansible/ansible-deploy.yaml
@@ -27,6 +27,10 @@
      local_action: stat path="templates/{{ indicator }}-params-prod.json.j2"
      register: template

    - name: Check to see if we have a secrets template to send.
      local_action: stat path="templates/{{ indicator }}-secrets-prod.py.j2"
      register: template-secrets

    - name: Set production params file.
      copy:
        src: files/{{ indicator }}-params-prod.json
@@ -42,3 +46,11 @@
        owner: "{{ runtime_user }}"
        group: "{{ runtime_user }}"
      when: template.stat.exists

    - name: Set production secrets template.
      template:
        src: templates/{{ indicator }}-secrets-prod.py.j2
        dest: "{{ indicators_runtime_dir }}/{{ indicator }}/secrets.py"
        owner: "{{ runtime_user }}"
        group: "{{ runtime_user }}"
      when: template-secrets.stat.exists
43 changes: 43 additions & 0 deletions ansible/templates/claims_hosp-params-prod.json.j2
@@ -0,0 +1,43 @@
{
    "common": {
        "export_dir": "./receiving",
        "log_exceptions": false
    },
    "indicator": {
        "input_dir": "./retrieve_files",
        "start_date": "2020-02-01",
        "end_date": null,
        "drop_date": null,
        "n_backfill_days": 70,
        "n_waiting_days": 3,
        "write_se": false,
        "obfuscated_prefix": "foo_obfuscated",
        "parallel": false,
        "geos": ["state", "msa", "hrr", "county"],
        "weekday": [true, false],
        "ftp_credentials": {
            "host": "{{ claims_hosp_ftp_host }}",
            "user": "{{ claims_hosp_ftp_user }}",
            "pass": "{{ claims_hosp_ftp_password }}",
            "port": 2222
        }
    },
    "validation": {
        "common": {
            "data_source": "hospital-admissions",
            "span_length": 14,
            "min_expected_lag": {"all": "3"},
            "max_expected_lag": {"all": "4"},
            "dry_run": true,
            "suppressed_errors": []
        },
        "static": {
            "minimum_sample_size": 5,
            "missing_se_allowed": true,
            "missing_sample_size_allowed": true
        },
        "dynamic": {
            "ref_window_size": 7
        }
    }
}
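
For orientation, a minimal sketch of how the rendered params file can be consumed on the automation side; the path and the use of plain json (rather than the delphi_utils read_params helper) are illustrative assumptions, not part of this PR.

import json

# Sketch only: load the rendered params.json and pull out the fields the
# download/modify steps below rely on. Path and printout are illustrative.
with open("params.json") as f:
    params = json.load(f)

ftp_credentials = params["indicator"]["ftp_credentials"]  # host/user/pass/port
input_dir = params["indicator"]["input_dir"]               # "./retrieve_files"
n_backfill_days = params["indicator"]["n_backfill_days"]   # 70, per #1419
print(ftp_credentials["host"], input_dir, n_backfill_days)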
11 changes: 11 additions & 0 deletions ansible/templates/claims_hosp-secrets-prod.py.j2
@@ -0,0 +1,11 @@
class claims:
    HOST = 'ftp.delphi.cmu.edu'
    USER = {{ claims_hosp_ftp_user }}
    PASS = {{ claims_hosp_ftp_password }}
    PORT = 2222


class covidcast:
    HOST = "delphi.midas.cs.cmu.edu"
    USER = {{ claims_hosp_midas_user }}
    PASS = {{ claims_hosp_midas_password }}
5 changes: 5 additions & 0 deletions ansible/vars.yaml
@@ -26,6 +26,11 @@ changehc_sftp_host: "{{ vault_changehc_sftp_host }}"
changehc_sftp_port: "{{ vault_changehc_sftp_port }}"
changehc_sftp_user: "{{ vault_changehc_sftp_user }}"
changehc_sftp_password: "{{ vault_changehc_sftp_password }}"
# claims_hosp
claims_hosp_ftp_user: "{{ vault_claims_hosp_ftp_user }}"
claims_hosp_ftp_password: "{{ vault_claims_hosp_ftp_password }}"
claims_hosp_midas_user: "{{ vault_claims_hosp_midas_user }}"
claims_hosp_midas_password: "{{ vault_claims_hosp_midas_password }}"
# NCHS
nchs_mortality_token: "{{ vault_nchs_mortality_token }}"
# SirCAL
464 changes: 237 additions & 227 deletions ansible/vault.yaml

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion claims_hosp/.pylintrc
@@ -7,7 +7,8 @@ disable=logging-format-interpolation,
    # Allow pytest functions to be part of a class.
    no-self-use,
    # Allow pytest classes to have one test.
    too-few-public-methods
    too-few-public-methods,


[BASIC]

93 changes: 93 additions & 0 deletions claims_hosp/delphi_claims_hosp/download_claims_ftp_files.py
@@ -0,0 +1,93 @@
#!/usr/bin/env python3
"""Downloads files modified in the last 24 hours from the delphi ftp server."""

# standard
import datetime
import functools
from os import path
import re

# third party
import paramiko


class AllowAnythingPolicy(paramiko.MissingHostKeyPolicy):
    """Class for missing host key policy."""

    def missing_host_key(self, client, hostname, key):
        """Check missing host key."""
        return


def print_callback(filename, logger, bytes_so_far, bytes_total):
    """Print the callback information."""
    rough_percent_transferred = int(100 * (bytes_so_far / bytes_total))
    if (rough_percent_transferred % 25) == 0:
        logger.info("Transfer in progress", filename=filename, percent=rough_percent_transferred)

FILENAME_TIMESTAMP = re.compile(r".*EDI_AGG_INPATIENT_(?P<ymd>[0-9]*)_(?P<hm>[0-9]*)[^0-9]*")
def get_timestamp(name):
    """Get the reference date in datetime format."""
    m = FILENAME_TIMESTAMP.match(name)
    if not m:
        return datetime.datetime(1900, 1, 1)
    return datetime.datetime.strptime(''.join(m.groups()), "%Y%m%d%H%M")

def change_date_format(name):
    """Flip date from YYYYMMDD to DDMMYYYY."""
    split_name = name.split("_")
    date = split_name[3]
    flip_date = date[6:] + date[4:6] + date[:4]
    split_name[3] = flip_date
    name = '_'.join(split_name)
    return name


def download(ftp_credentials, out_path, logger):
    """Pull the latest raw files."""
    current_time = datetime.datetime.now()
    seconds_in_day = 24 * 60 * 60
    logger.info("starting download", time=current_time)

    # open client
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(AllowAnythingPolicy())

    client.connect(ftp_credentials["host"],
                   username=ftp_credentials["user"],
                   password=ftp_credentials["pass"],
                   port=ftp_credentials["port"])
    sftp = client.open_sftp()
    sftp.chdir('/hosp/receiving')


    # go through files in receiving dir
    files_to_download = []
    for fileattr in sftp.listdir_attr():
        file_time = get_timestamp(fileattr.filename)
        time_diff_to_current_time = current_time - file_time
        if 0 < time_diff_to_current_time.total_seconds() <= seconds_in_day:
            files_to_download.append(fileattr.filename)
            logger.info("File to download", filename=fileattr.filename)

    # make sure we don't download more than the 1 chunk (2x a day) drops for IP - 01/07/21,
    # *2 for multiple day drops
    assert len(files_to_download) <= 2 * (2), \
        f"more files dropped ({len(files_to_download)}) than expected (4)"

    filepaths_to_download = {}
    for file in files_to_download:
        flipped_file = change_date_format(file)
        if "INPATIENT" in file:
            full_path = path.join(out_path, flipped_file)
            if path.exists(full_path):
                logger.info("Skip the existing file", filename=flipped_file)
            else:
                filepaths_to_download[file] = full_path

    # download!
    for infile, outfile in filepaths_to_download.items():
        callback_for_filename = functools.partial(print_callback, infile, logger)
        sftp.get(infile, outfile, callback=callback_for_filename)

    client.close()
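
A minimal usage sketch for download(), assuming the credentials come from the rendered params file and the structured logger helper from delphi_utils used elsewhere in this repository; the params path is illustrative.

import json

from delphi_utils import get_structured_logger

from delphi_claims_hosp.download_claims_ftp_files import download

# Sketch only: pull the last day's drops into the indicator's input directory.
with open("params.json") as f:          # illustrative path
    params = json.load(f)

logger = get_structured_logger(__name__)
download(params["indicator"]["ftp_credentials"],  # host/user/pass/port dict
         params["indicator"]["input_dir"],         # e.g. "./retrieve_files"
         logger)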
32 changes: 32 additions & 0 deletions claims_hosp/delphi_claims_hosp/get_latest_claims_name.py
@@ -0,0 +1,32 @@
#!/usr/bin/env python3
"""Return the latest drop."""

# standard
import datetime
from pathlib import Path

def get_latest_filename(dir_path, logger):
    """Get the latest filename from the list of downloaded raw files."""
    current_date = datetime.datetime.now()
    files = list(Path(dir_path).glob("*"))

    latest_timestamp = datetime.datetime(1900, 1, 1)
    latest_filename = None
    for file in files:
        split_name = file.name.split("_")
        if len(split_name) == 5:
            ddmmyyyy = split_name[3]
            hhmm = ''.join(filter(str.isdigit, split_name[4]))
            timestamp = datetime.datetime.strptime(''.join([ddmmyyyy, hhmm]),
                                                   "%d%m%Y%H%M")
            if timestamp > latest_timestamp:
                if timestamp <= current_date:
                    latest_timestamp = timestamp
                    latest_filename = file

    assert current_date.date() == latest_timestamp.date(), "no drop for today"

    logger.info("Latest claims file", filename=latest_filename)

    # return for other uses
    return latest_filename
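
A minimal sketch of the filename convention get_latest_filename() relies on: after change_date_format() has flipped the server's YYYYMMDD date to DDMMYYYY, a renamed drop parses as below (the filename itself is an illustrative example).

import datetime

# Sketch only: parse the timestamp out of a renamed drop, mirroring the logic above.
name = "EDI_AGG_INPATIENT_07052020_1456.csv.gz"     # illustrative, DDMMYYYY_HHMM
split_name = name.split("_")                        # 5 fields, as checked above
ddmmyyyy = split_name[3]                            # "07052020"
hhmm = ''.join(filter(str.isdigit, split_name[4]))  # "1456"
print(datetime.datetime.strptime(ddmmyyyy + hhmm, "%d%m%Y%H%M"))  # 2020-05-07 14:56:00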
61 changes: 61 additions & 0 deletions claims_hosp/delphi_claims_hosp/modify_claims_drops.py
@@ -0,0 +1,61 @@
#!/usr/bin/env python3

"""Modify the drops.

Drops are expected to be numbered as:

    ../EDI_AGG_INPATIENT/EDI_AGG_INPATIENT_1_07052020_1456.csv.gz
    ../EDI_AGG_INPATIENT/EDI_AGG_INPATIENT_2_07052020_1456.csv.gz
    ... etc.
"""

# standard
from pathlib import Path

# third party
import numpy as np
import pandas as pd


def modify_and_write(data_path, logger, test_mode=False):
    """
    Modify drops given a folder path.

    Will rename necessary columns in the input files, and check the number of
    columns and duplications.

    Args:
        data_path: path to the folder with duplicated drops.
        test_mode: Don't overwrite the drops if test_mode==True

    """
    files = np.array(list(Path(data_path).glob("*.csv.gz")))
    dfs_list = []
    for f in files:
        filename = str(f)
        out_path = f.parents[0] / f.name
        dfs = pd.read_csv(f, dtype={"PatCountyFIPS": str,
                                    "patCountyFIPS": str})
        if "servicedate" in dfs.columns:
            dfs.rename(columns={"servicedate": "ServiceDate"}, inplace=True)
        if "patCountyFIPS" in dfs.columns:
            dfs.rename(columns={"patCountyFIPS": "PatCountyFIPS"}, inplace=True)
        if "patHRRname" in dfs.columns:
            dfs.rename(columns={"patHRRname": "Pat HRR Name"}, inplace=True)
        if "patAgeGroup" in dfs.columns:
            dfs.rename(columns={"patAgeGroup": "PatAgeGroup"}, inplace=True)
        if "patHRRid" in dfs.columns:
            dfs.rename(columns={"patHRRid": "Pat HRR ID"}, inplace=True)

        assert np.sum(
            dfs.duplicated(subset=["ServiceDate", "PatCountyFIPS",
                                   "Pat HRR Name", "PatAgeGroup"])) == 0, \
            f'Duplication across drops in {filename}!'
        assert dfs.shape[1] == 10, f'Wrong number of columns in {filename}'

        if test_mode:
            dfs_list.append(dfs)
        else:
            dfs.to_csv(out_path, index=False)
            logger.info(f"Wrote {out_path}")
    return files, dfs_list
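
A minimal sketch of calling modify_and_write() in test mode with a mocked logger, in the spirit of the unit tests referenced in the commit log; the data path is illustrative.

from unittest.mock import Mock

from delphi_claims_hosp.modify_claims_drops import modify_and_write

# Sketch only: rename columns in memory without overwriting the raw drops.
logger = Mock()
files, dfs_list = modify_and_write("./retrieve_files", logger, test_mode=True)
for f, df in zip(files, dfs_list):
    print(f, df.shape)  # columns renamed; nothing written back in test mode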