  @patch('delphi.epidata.acquisition.covidcast_nowcast.load_sensors.CsvImporter.find_csv_files',
         new=FIXED_ISSUE_IMPORTER)
  def test_duplicate_row(self):
    """Test duplicate unique keys are updated.

    Loads one CSV row for (testsensor, ca, 2020-04-19), then loads a second
    CSV with the same unique key but a different value, and asserts that the
    query returns the second value — i.e. the loader upserts instead of
    inserting a duplicate row or failing.
    """

    # print full diff if something unexpected comes out
    self.maxDiff = None

    receiving_dir = '/common/covidcast_nowcast/receiving/src/'
    os.makedirs(receiving_dir, exist_ok=True)

    # first load: value 1 for the unique key
    with open(receiving_dir + '20200419_state_sig.csv', 'w') as f:
      f.write('sensor_name,geo_value,value\n')
      f.write('testsensor,ca,1\n')
    main()
    # second load: same unique key, new value 2.  main() archived the first
    # file out of receiving_dir, so this open() recreates the file.
    with open(receiving_dir + '20200419_state_sig.csv', 'w') as f:
      f.write('sensor_name,geo_value,value\n')
      f.write('testsensor,ca,2\n')
    main()

    # the most recent value is the one stored
    # NOTE(review): issue=20200421 / lag=2 presumably come from the patched
    # FIXED_ISSUE_IMPORTER pinning the issue date -- confirm against its
    # definition elsewhere in this test file.
    response = Epidata.covidcast_nowcast(
        'src', 'sig', 'testsensor', 'day', 'state', 20200419, 'ca')
    self.assertEqual(response, {
        'result': 1,
        'epidata': [{
            'time_value': 20200419,
            'geo_value': 'ca',
            'value': 2,
            'issue': 20200421,
            'lag': 2,
            'signal': 'sig',
        }],
        'message': 'success',
    })
def main(csv_path: str = SENSOR_CSV_PATH) -> None:
    """Load, upsert, and archive sensor CSV files found under ``csv_path``.

    Files whose name/path do not yield valid attributes are archived as
    failed.  Valid files are parsed with ``load_and_prepare_file`` and
    upserted into the ``covidcast_nowcast`` table, then archived as
    successful; any load error archives the file as failed and re-raises.

    Parameters
    ----------
    csv_path
        Directory to scan for incoming sensor CSV files.
    """
    user, pw = secrets.db.epi
    engine = sqlalchemy.create_engine(
        f"mysql+pymysql://{user}:{pw}@{secrets.db.host}/{DB_NAME}")
    for filepath, attribute in CsvImporter.find_csv_files(csv_path):
        if attribute is None:
            # filename/path did not parse into valid attributes
            _move_after_processing(filepath, success=False)
            continue
        try:
            data = load_and_prepare_file(filepath, attribute)
            # Use a single, explicitly-closed connection for both the
            # metadata reflection and the insert, and pass that connection
            # (not the engine) to to_sql: passing the engine makes to_sql
            # check out a *second* connection, so the upsert would run
            # outside the transaction begun here, and the original code also
            # never closed the connection it opened (leak).
            with engine.connect() as conn:
                with conn.begin():
                    method = _create_upsert_method(sqlalchemy.MetaData(conn))
                    data.to_sql(TABLE_NAME, conn, if_exists="append",
                                method=method, index=False)
        except Exception:
            _move_after_processing(filepath, success=False)
            raise
        _move_after_processing(filepath, success=True)
def _move_after_processing(filepath, success):
    """Move a processed CSV from the receiving dir to the archive and log it.

    Parameters
    ----------
    filepath
        Path of the processed file; expected to contain a ``receiving``
        component, which is rewritten to the archive location.
    success
        True archives under ``SUCCESS_DIR``, False under ``FAIL_DIR``.
    """
    archive_dir = SUCCESS_DIR if success else FAIL_DIR
    # Compute the destination path once; the original called str.replace
    # separately on the dirname and on the full path, which could disagree
    # if "receiving" ever appeared in the filename itself.
    new_path = filepath.replace("receiving", archive_dir)
    os.makedirs(os.path.dirname(new_path), exist_ok=True)
    move(filepath, new_path)
    print(f"{filepath} moved to {archive_dir}")


def _create_upsert_method(meta):
    """Return a pandas ``to_sql`` ``method`` callable that upserts into MySQL.

    The returned callable reflects the target table through ``meta`` and
    issues INSERT ... ON DUPLICATE KEY UPDATE, so rows sharing a unique key
    replace the previously stored values instead of erroring.

    Parameters
    ----------
    meta
        ``sqlalchemy.MetaData`` bound to the connection the statement will
        execute on.
    """
    def method(table, conn, keys, data_iter):
        # Import the dialect's insert() explicitly: attribute access via
        # `sqlalchemy.dialects.mysql` only works if the dialect module has
        # already been imported as a side effect (e.g. of create_engine).
        from sqlalchemy.dialects.mysql import insert
        sql_table = sqlalchemy.Table(table.name, meta, autoload=True)
        insert_stmt = insert(sql_table).values(
            [dict(zip(keys, data)) for data in data_iter])
        upsert_stmt = insert_stmt.on_duplicate_key_update(
            {x.name: x for x in insert_stmt.inserted})
        conn.execute(upsert_stmt)
    return method


if __name__ == "__main__":
    main()