From b73115c1eb652a30d49dde398be0f0d79b289fd2 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Wed, 3 Feb 2021 14:32:43 -0500 Subject: [PATCH 1/5] Add logging to metadata cache updater --- .../covidcast/covidcast_meta_cache_updater.py | 25 ++++++++++++++++--- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/src/acquisition/covidcast/covidcast_meta_cache_updater.py b/src/acquisition/covidcast/covidcast_meta_cache_updater.py index e28fbeb10..0117d51ff 100644 --- a/src/acquisition/covidcast/covidcast_meta_cache_updater.py +++ b/src/acquisition/covidcast/covidcast_meta_cache_updater.py @@ -3,17 +3,20 @@ # standard library import argparse import sys +import time # first party from delphi.epidata.acquisition.covidcast.database import Database +from delphi.epidata.acquisition.covidcast.logger import get_structured_logger from delphi.epidata.client.delphi_epidata import Epidata - +from delphi.operations import secrets def get_argument_parser(): """Define command line arguments.""" - # there are no flags, but --help will still work - return argparse.ArgumentParser() + parser = argparse.ArgumentParser() + parser.add_argument("--log_file", help="filename for log output") + return parser def main(args, epidata_impl=Epidata, database_impl=Database): @@ -21,12 +24,20 @@ def main(args, epidata_impl=Epidata, database_impl=Database): `args`: parsed command-line arguments """ + + secrets.db.host = 'delphi_database_epidata' + secrets.db.epi = ('user', 'pass') + + logger = get_structured_logger("metadata_cache_updater", filename = args.log_file) + start_time = time.time() database = database_impl() database.connect() # fetch metadata try: + metadata_calculation_start_time = time.time() metadata = database.get_covidcast_meta() + metadata_calculation_interval_in_seconds = time.time() - metadata_calculation_start_time except: # clean up before failing database.disconnect(True) @@ -35,7 +46,7 @@ def main(args, epidata_impl=Epidata, database_impl=Database): args = ("success",1) 
if len(metadata)==0: args = ("no results",-2) - + print('covidcast_meta result: %s (code %d)' % args) if args[-1] != 1: @@ -44,13 +55,19 @@ def main(args, epidata_impl=Epidata, database_impl=Database): # update the cache try: + metadata_update_start_time = time.time() database.update_covidcast_meta_cache(metadata) + metadata_update_interval_in_seconds = time.time() - metadata_update_start_time print('successfully cached epidata') finally: # no catch block so that an exception above will cause the program to # fail after the following cleanup database.disconnect(True) + logger.info("Generated and updated covidcast metadata", + metadata_calculation_interval_in_seconds = round(metadata_calculation_interval_in_seconds, 2), + metadata_update_interval_in_seconds = round(metadata_update_interval_in_seconds, 2), + total_runtime_in_seconds = round(time.time() - start_time, 2)) return True From ebee6d442f87eec99f3c49702145ee6e3ebf2215 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Wed, 3 Feb 2021 14:35:38 -0500 Subject: [PATCH 2/5] Reflow --- .../covidcast/covidcast_meta_cache_updater.py | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/acquisition/covidcast/covidcast_meta_cache_updater.py b/src/acquisition/covidcast/covidcast_meta_cache_updater.py index 0117d51ff..fd76736f8 100644 --- a/src/acquisition/covidcast/covidcast_meta_cache_updater.py +++ b/src/acquisition/covidcast/covidcast_meta_cache_updater.py @@ -9,7 +9,6 @@ from delphi.epidata.acquisition.covidcast.database import Database from delphi.epidata.acquisition.covidcast.logger import get_structured_logger from delphi.epidata.client.delphi_epidata import Epidata -from delphi.operations import secrets def get_argument_parser(): """Define command line arguments.""" @@ -24,18 +23,16 @@ def main(args, epidata_impl=Epidata, database_impl=Database): `args`: parsed command-line arguments """ - - secrets.db.host = 'delphi_database_epidata' - secrets.db.epi = ('user', 'pass') - - 
logger = get_structured_logger("metadata_cache_updater", filename = args.log_file) + logger = get_structured_logger( + "metadata_cache_updater", + filename=args.log_file) start_time = time.time() database = database_impl() database.connect() # fetch metadata try: - metadata_calculation_start_time = time.time() + metadata_calculation_start_time = time.time() metadata = database.get_covidcast_meta() metadata_calculation_interval_in_seconds = time.time() - metadata_calculation_start_time except: @@ -64,10 +61,13 @@ def main(args, epidata_impl=Epidata, database_impl=Database): # fail after the following cleanup database.disconnect(True) - logger.info("Generated and updated covidcast metadata", - metadata_calculation_interval_in_seconds = round(metadata_calculation_interval_in_seconds, 2), - metadata_update_interval_in_seconds = round(metadata_update_interval_in_seconds, 2), - total_runtime_in_seconds = round(time.time() - start_time, 2)) + logger.info( + "Generated and updated covidcast metadata", + metadata_calculation_interval_in_seconds=round( + metadata_calculation_interval_in_seconds, 2), + metadata_update_interval_in_seconds=round( + metadata_update_interval_in_seconds, 2), + total_runtime_in_seconds=round(time.time() - start_time, 2)) return True From 18cbae0746b1a6da347042821084c2121610d705 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Wed, 3 Feb 2021 14:36:37 -0500 Subject: [PATCH 3/5] remove whitespace --- src/acquisition/covidcast/covidcast_meta_cache_updater.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/acquisition/covidcast/covidcast_meta_cache_updater.py b/src/acquisition/covidcast/covidcast_meta_cache_updater.py index fd76736f8..8081ecfd4 100644 --- a/src/acquisition/covidcast/covidcast_meta_cache_updater.py +++ b/src/acquisition/covidcast/covidcast_meta_cache_updater.py @@ -43,7 +43,7 @@ def main(args, epidata_impl=Epidata, database_impl=Database): args = ("success",1) if len(metadata)==0: args = ("no results",-2) - + 
print('covidcast_meta result: %s (code %d)' % args) if args[-1] != 1: From 062c4bc134f77f9279d18774b954a06566f99c5a Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Wed, 3 Feb 2021 14:37:34 -0500 Subject: [PATCH 4/5] Add logger --- src/acquisition/covidcast/logger.py | 92 +++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) create mode 100644 src/acquisition/covidcast/logger.py diff --git a/src/acquisition/covidcast/logger.py b/src/acquisition/covidcast/logger.py new file mode 100644 index 000000000..62f2ff460 --- /dev/null +++ b/src/acquisition/covidcast/logger.py @@ -0,0 +1,92 @@ +"""Structured logger utility for creating JSON logs in Delphi pipelines.""" +import logging +import sys +import threading +import structlog + + +def handle_exceptions(logger): + """Handle exceptions using the provided logger.""" + def exception_handler(etype, value, traceback): + logger.exception("Top-level exception occurred", + exc_info=(etype, value, traceback)) + + def multithread_exception_handler(args): + exception_handler(args.exc_type, args.exc_value, args.exc_traceback) + + sys.excepthook = exception_handler + threading.excepthook = multithread_exception_handler + + +def get_structured_logger(name=__name__, + filename=None, + log_exceptions=True): + """Create a new structlog logger. + + Use the logger returned from this in indicator code using the standard + wrapper calls, e.g.: + + logger = get_structured_logger(__name__) + logger.warning("Error", type="Signal too low"). + + The output will be rendered as JSON which can easily be consumed by logs + processors. + + See the structlog documentation for details. + + Parameters + ---------- + name: Name to use for logger (included in log lines), __name__ from caller + is a good choice. + filename: An (optional) file to write log output. 
+ """ + # Configure the underlying logging configuration + handlers = [logging.StreamHandler()] + if filename: + handlers.append(logging.FileHandler(filename)) + + logging.basicConfig( + format="%(message)s", + level=logging.INFO, + handlers=handlers + ) + + # Configure structlog. This uses many of the standard suggestions from + # the structlog documentation. + structlog.configure( + processors=[ + # Filter out log levels we are not tracking. + structlog.stdlib.filter_by_level, + # Include logger name in output. + structlog.stdlib.add_logger_name, + # Include log level in output. + structlog.stdlib.add_log_level, + # Allow formatting into arguments e.g., logger.info("Hello, %s", + # name) + structlog.stdlib.PositionalArgumentsFormatter(), + # Add timestamps. + structlog.processors.TimeStamper(fmt="iso"), + # Match support for exception logging in the standard logger. + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, + # Decode unicode characters + structlog.processors.UnicodeDecoder(), + # Render as JSON + structlog.processors.JSONRenderer() + ], + # Use a dict class for keeping track of data. + context_class=dict, + # Use a standard logger for the actual log call. 
+ logger_factory=structlog.stdlib.LoggerFactory(), + # Use a standard wrapper class for utilities like log.warning() + wrapper_class=structlog.stdlib.BoundLogger, + # Cache the logger + cache_logger_on_first_use=True, + ) + + logger = structlog.get_logger(name) + + if log_exceptions: + handle_exceptions(logger) + + return logger From 49e786bcb03b65e9d5370077e79eca8cfbb1c82b Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Tue, 9 Feb 2021 13:57:07 -0500 Subject: [PATCH 5/5] Update test --- .../covidcast/test_covidcast_meta_cache_updater.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/acquisition/covidcast/test_covidcast_meta_cache_updater.py b/tests/acquisition/covidcast/test_covidcast_meta_cache_updater.py index 8f43efea1..bb0c4f90d 100644 --- a/tests/acquisition/covidcast/test_covidcast_meta_cache_updater.py +++ b/tests/acquisition/covidcast/test_covidcast_meta_cache_updater.py @@ -34,7 +34,7 @@ def test_main_successful(self): 'epidata': [{'foo': 'bar'}], } - args = None + args = MagicMock(log_file="log") mock_epidata_impl = MagicMock() mock_epidata_impl.covidcast_meta.return_value = api_response mock_database = MagicMock() @@ -64,7 +64,7 @@ def test_main_failure(self): 'message': 'no', } - args = None + args = MagicMock(log_file="log") mock_database = MagicMock() mock_database.get_covidcast_meta.return_value = list() fake_database_impl = lambda: mock_database