diff --git a/src/acquisition/covidcast/covidcast_meta_cache_updater.py b/src/acquisition/covidcast/covidcast_meta_cache_updater.py
index e28fbeb10..8081ecfd4 100644
--- a/src/acquisition/covidcast/covidcast_meta_cache_updater.py
+++ b/src/acquisition/covidcast/covidcast_meta_cache_updater.py
@@ -3,17 +3,19 @@
 # standard library
 import argparse
 import sys
+import time
 
 # first party
 from delphi.epidata.acquisition.covidcast.database import Database
+from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
 from delphi.epidata.client.delphi_epidata import Epidata
 
-
 def get_argument_parser():
   """Define command line arguments."""
 
-  # there are no flags, but --help will still work
-  return argparse.ArgumentParser()
+  parser = argparse.ArgumentParser()
+  parser.add_argument("--log_file", help="filename for log output")
+  return parser
 
 
 def main(args, epidata_impl=Epidata, database_impl=Database):
@@ -21,12 +23,18 @@ def main(args, epidata_impl=Epidata, database_impl=Database):
 
   `args`: parsed command-line arguments
   """
+  logger = get_structured_logger(
+      "metadata_cache_updater",
+      filename=args.log_file)
+  start_time = time.time()
   database = database_impl()
   database.connect()
 
   # fetch metadata
   try:
+    metadata_calculation_start_time = time.time()
     metadata = database.get_covidcast_meta()
+    metadata_calculation_interval_in_seconds = time.time() - metadata_calculation_start_time
   except:
     # clean up before failing
     database.disconnect(True)
@@ -44,13 +52,22 @@ def main(args, epidata_impl=Epidata, database_impl=Database):
 
   # update the cache
   try:
+    metadata_update_start_time = time.time()
     database.update_covidcast_meta_cache(metadata)
+    metadata_update_interval_in_seconds = time.time() - metadata_update_start_time
     print('successfully cached epidata')
   finally:
     # no catch block so that an exception above will cause the program to
     # fail after the following cleanup
     database.disconnect(True)
 
+  logger.info(
+      "Generated and updated covidcast metadata",
+      metadata_calculation_interval_in_seconds=round(
+          metadata_calculation_interval_in_seconds, 2),
+      metadata_update_interval_in_seconds=round(
+          metadata_update_interval_in_seconds, 2),
+      total_runtime_in_seconds=round(time.time() - start_time, 2))
   return True
 
 
diff --git a/src/acquisition/covidcast/logger.py b/src/acquisition/covidcast/logger.py
new file mode 100644
index 000000000..62f2ff460
--- /dev/null
+++ b/src/acquisition/covidcast/logger.py
@@ -0,0 +1,92 @@
+"""Structured logger utility for creating JSON logs in Delphi pipelines."""
+import logging
+import sys
+import threading
+import structlog
+
+
+def handle_exceptions(logger):
+    """Handle exceptions using the provided logger."""
+    def exception_handler(etype, value, traceback):
+        logger.exception("Top-level exception occurred",
+                         exc_info=(etype, value, traceback))
+
+    def multithread_exception_handler(args):
+        exception_handler(args.exc_type, args.exc_value, args.exc_traceback)
+
+    sys.excepthook = exception_handler
+    threading.excepthook = multithread_exception_handler
+
+
+def get_structured_logger(name=__name__,
+                          filename=None,
+                          log_exceptions=True):
+    """Create a new structlog logger.
+
+    Use the logger returned from this in indicator code using the standard
+    wrapper calls, e.g.:
+
+    logger = get_structured_logger(__name__)
+    logger.warning("Error", type="Signal too low")
+
+    The output will be rendered as JSON which can easily be consumed by log
+    processors.
+
+    See the structlog documentation for details.
+
+    Parameters
+    ----------
+    name: Name to use for logger (included in log lines), __name__ from caller
+    is a good choice.
+    filename: An (optional) file to write log output.
+    """
+    # Configure the underlying logging configuration
+    handlers = [logging.StreamHandler()]
+    if filename:
+        handlers.append(logging.FileHandler(filename))
+
+    logging.basicConfig(
+        format="%(message)s",
+        level=logging.INFO,
+        handlers=handlers
+    )
+
+    # Configure structlog. This uses many of the standard suggestions from
+    # the structlog documentation.
+    structlog.configure(
+        processors=[
+            # Filter out log levels we are not tracking.
+            structlog.stdlib.filter_by_level,
+            # Include logger name in output.
+            structlog.stdlib.add_logger_name,
+            # Include log level in output.
+            structlog.stdlib.add_log_level,
+            # Allow formatting into arguments e.g., logger.info("Hello, %s",
+            # name)
+            structlog.stdlib.PositionalArgumentsFormatter(),
+            # Add timestamps.
+            structlog.processors.TimeStamper(fmt="iso"),
+            # Match support for exception logging in the standard logger.
+            structlog.processors.StackInfoRenderer(),
+            structlog.processors.format_exc_info,
+            # Decode unicode characters
+            structlog.processors.UnicodeDecoder(),
+            # Render as JSON
+            structlog.processors.JSONRenderer()
+        ],
+        # Use a dict class for keeping track of data.
+        context_class=dict,
+        # Use a standard logger for the actual log call.
+        logger_factory=structlog.stdlib.LoggerFactory(),
+        # Use a standard wrapper class for utilities like log.warning()
+        wrapper_class=structlog.stdlib.BoundLogger,
+        # Cache the logger
+        cache_logger_on_first_use=True,
+    )
+
+    logger = structlog.get_logger(name)
+
+    if log_exceptions:
+        handle_exceptions(logger)
+
+    return logger
diff --git a/tests/acquisition/covidcast/test_covidcast_meta_cache_updater.py b/tests/acquisition/covidcast/test_covidcast_meta_cache_updater.py
index 8f43efea1..bb0c4f90d 100644
--- a/tests/acquisition/covidcast/test_covidcast_meta_cache_updater.py
+++ b/tests/acquisition/covidcast/test_covidcast_meta_cache_updater.py
@@ -34,7 +34,7 @@ def test_main_successful(self):
       'epidata': [{'foo': 'bar'}],
     }
 
-    args = None
+    args = MagicMock(log_file="log")
     mock_epidata_impl = MagicMock()
     mock_epidata_impl.covidcast_meta.return_value = api_response
     mock_database = MagicMock()
@@ -64,7 +64,7 @@ def test_main_failure(self):
      'message': 'no',
     }
 
-    args = None
+    args = MagicMock(log_file="log")
     mock_database = MagicMock()
     mock_database.get_covidcast_meta.return_value = list()
     fake_database_impl = lambda: mock_database
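Usage sketch (illustrative only, based on the get_structured_logger helper introduced above; the log filename and the sample output line are assumptions, and the exact JSON field order depends on structlog):

from delphi.epidata.acquisition.covidcast.logger import get_structured_logger

# One logger per process; JSON lines go to stderr and, optionally, to a file.
logger = get_structured_logger("metadata_cache_updater", filename="meta_cache.log")

# Keyword arguments become structured fields alongside the event message.
logger.info("Generated and updated covidcast metadata", total_runtime_in_seconds=1.23)

# Emits a line roughly like:
# {"total_runtime_in_seconds": 1.23, "event": "Generated and updated covidcast metadata",
#  "logger": "metadata_cache_updater", "level": "info", "timestamp": "<ISO-8601 timestamp>"}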