38 changes: 36 additions & 2 deletions integrations/acquisition/covidcast_nowcast/test_csv_uploading.py
@@ -72,8 +72,6 @@ def test_uploading(self):
success_dir = '/common/covidcast_nowcast/archive/successful/src/'
failed_dir = '/common/covidcast_nowcast/archive/failed/src/'
os.makedirs(receiving_dir, exist_ok=True)
- os.makedirs(success_dir, exist_ok=True)
- os.makedirs(failed_dir, exist_ok=True)

# valid
with open(receiving_dir + '20200419_state_sig.csv', 'w') as f:
@@ -118,3 +116,39 @@ def test_uploading(self):
}],
'message': 'success',
})

@patch('delphi.epidata.acquisition.covidcast_nowcast.load_sensors.CsvImporter.find_csv_files',
new=FIXED_ISSUE_IMPORTER)
def test_duplicate_row(self):
"""Test duplicate unique keys are updated."""

# print full diff if something unexpected comes out
self.maxDiff = None

receiving_dir = '/common/covidcast_nowcast/receiving/src/'
os.makedirs(receiving_dir, exist_ok=True)

with open(receiving_dir + '20200419_state_sig.csv', 'w') as f:
f.write('sensor_name,geo_value,value\n')
f.write('testsensor,ca,1\n')
main()
with open(receiving_dir + '20200419_state_sig.csv', 'w') as f:
f.write('sensor_name,geo_value,value\n')
f.write('testsensor,ca,2\n')
main()

# most recent value is the one stored
response = Epidata.covidcast_nowcast(
'src', 'sig', 'testsensor', 'day', 'state', 20200419, 'ca')
self.assertEqual(response, {
'result': 1,
'epidata': [{
'time_value': 20200419,
'geo_value': 'ca',
'value': 2,
'issue': 20200421,
'lag': 2,
'signal': 'sig',
}],
'message': 'success',
})
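The second main() run updates the existing row rather than inserting a new one because the load path (below) writes with an upsert keyed on the table's unique index. A hypothetical sketch of such a constraint follows; the real covidcast_nowcast DDL may differ:

```python
# Hypothetical sketch of the unique key the duplicate-row test relies on;
# column set and types are illustrative, not the actual schema.
import sqlalchemy

meta = sqlalchemy.MetaData()
covidcast_nowcast = sqlalchemy.Table(
    "covidcast_nowcast", meta,
    sqlalchemy.Column("signal", sqlalchemy.String(32)),
    sqlalchemy.Column("sensor_name", sqlalchemy.String(32)),
    sqlalchemy.Column("geo_value", sqlalchemy.String(12)),
    sqlalchemy.Column("time_value", sqlalchemy.Integer),
    sqlalchemy.Column("issue", sqlalchemy.Integer),
    sqlalchemy.Column("value", sqlalchemy.Float),
    # loading 20200419_state_sig.csv twice produces two rows with the same
    # key, so the second INSERT falls through to ON DUPLICATE KEY UPDATE
    sqlalchemy.UniqueConstraint(
        "signal", "sensor_name", "geo_value", "time_value", "issue"),
)
```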
38 changes: 31 additions & 7 deletions src/acquisition/covidcast_nowcast/load_sensors.py
@@ -1,4 +1,5 @@
from shutil import move
import os
import time

import delphi.operations.secrets as secrets
@@ -7,6 +8,8 @@
from delphi.epidata.acquisition.covidcast.csv_importer import CsvImporter

SENSOR_CSV_PATH = "/common/covidcast_nowcast/receiving/"
SUCCESS_DIR = "archive/successful"
FAIL_DIR = "archive/failed"
TABLE_NAME = "covidcast_nowcast"
DB_NAME = "epidata"
CSV_DTYPES = {"sensor_name": str, "geo_value": str, "value": float}
@@ -33,17 +36,20 @@ def main(csv_path: str = SENSOR_CSV_PATH) -> None:
"""
user, pw = secrets.db.epi
engine = sqlalchemy.create_engine(f"mysql+pymysql://{user}:{pw}@{secrets.db.host}/{DB_NAME}")
- for filepath, attributes in CsvImporter.find_csv_files(csv_path):
- if attributes is None:
- move(filepath, filepath.replace("receiving", "archive/failed"))
for filepath, attribute in CsvImporter.find_csv_files(csv_path):
if attribute is None:
_move_after_processing(filepath, success=False)
continue
try:
- data = load_and_prepare_file(filepath, attributes)
- data.to_sql(TABLE_NAME, engine, if_exists="append", index=False)
data = load_and_prepare_file(filepath, attribute)
conn = engine.connect()
with conn.begin():
method = _create_upsert_method(sqlalchemy.MetaData(conn))
data.to_sql(TABLE_NAME, engine, if_exists="append", method=method, index=False)
except Exception:
- move(filepath, filepath.replace("receiving", "archive/failed"))
_move_after_processing(filepath, success=False)
raise
Contributor

Do you need a finally block to do any connection cleanup with the database? I know the mysql.connector library requires something like the cleanup sketched below,

but I'm not sure if SQLAlchemy needs something similar.
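A minimal sketch of the usual mysql.connector cleanup pattern the comment alludes to; connection parameters are illustrative:

```python
import mysql.connector

# illustrative parameters; the point is the explicit cleanup below
conn = mysql.connector.connect(user="user", password="pw", database="epidata")
try:
    cur = conn.cursor()
    try:
        cur.execute("SELECT 1")
        cur.fetchall()
    finally:
        cur.close()
finally:
    # unlike a pooled SQLAlchemy engine, mysql.connector leaves closing to you
    conn.close()
```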

Contributor Author

I wondered that too when I first implemented it, since I've had to close SQLAlchemy connections before, but the to_sql method can take an engine rather than a connection. Based on this answer, it looks like it isn't needed. The connection used for the metadata is in a context manager, so I think that's OK as well.
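A sketch of the behavior described in the reply, with illustrative credentials: when to_sql is handed an Engine rather than a Connection, pandas checks a connection out of the engine's pool and returns it when the write finishes, so there is nothing to close by hand.

```python
import pandas as pd
import sqlalchemy

engine = sqlalchemy.create_engine("mysql+pymysql://user:pw@localhost/epidata")
df = pd.DataFrame({"sensor_name": ["testsensor"], "geo_value": ["ca"], "value": [1.0]})

# given an Engine, pandas manages the connection lifecycle internally
df.to_sql("covidcast_nowcast", engine, if_exists="append", index=False)

engine.dispose()  # optional: tear down the pool, e.g. at process exit
```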

- move(filepath, filepath.replace("receiving", "archive/successful"))
_move_after_processing(filepath, success=True)


def load_and_prepare_file(filepath: str, attributes: tuple) -> pd.DataFrame:
@@ -75,5 +81,23 @@ def load_and_prepare_file(filepath: str, attributes: tuple) -> pd.DataFrame:
return data


def _move_after_processing(filepath, success):
archive_dir = SUCCESS_DIR if success else FAIL_DIR
new_dir = os.path.dirname(filepath).replace(
"receiving", archive_dir)
os.makedirs(new_dir, exist_ok=True)
move(filepath, filepath.replace("receiving", archive_dir))
print(f"{filepath} moved to {archive_dir}")


def _create_upsert_method(meta):
def method(table, conn, keys, data_iter):
sql_table = sqlalchemy.Table(table.name, meta, autoload=True)
insert_stmt = sqlalchemy.dialects.mysql.insert(sql_table).values([dict(zip(keys, data)) for data in data_iter])
upsert_stmt = insert_stmt.on_duplicate_key_update({x.name: x for x in insert_stmt.inserted})
conn.execute(upsert_stmt)
return method


if __name__ == "__main__":
main()
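For reference, a standalone sketch of the statement _create_upsert_method builds, compiled against a toy two-column table (the real covidcast_nowcast table has more columns):

```python
import sqlalchemy
from sqlalchemy.dialects import mysql

meta = sqlalchemy.MetaData()
toy = sqlalchemy.Table(
    "covidcast_nowcast", meta,
    sqlalchemy.Column("geo_value", sqlalchemy.String(12), primary_key=True),
    sqlalchemy.Column("value", sqlalchemy.Float),
)

stmt = mysql.insert(toy).values([{"geo_value": "ca", "value": 2.0}])
# re-list every inserted column in the UPDATE clause so a duplicate key
# overwrites the old row with the newly loaded values
stmt = stmt.on_duplicate_key_update({c.name: c for c in stmt.inserted})

print(stmt.compile(dialect=mysql.dialect()))
# INSERT INTO covidcast_nowcast (geo_value, value) VALUES (%s, %s)
# ON DUPLICATE KEY UPDATE geo_value = VALUES(geo_value), value = VALUES(value)
```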