Add better logging in the indicator runner #1892

Merged · 10 commits · Aug 29, 2023
6 changes: 5 additions & 1 deletion _delphi_utils_python/delphi_utils/runner.py
@@ -72,9 +72,13 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None],
         start = time.time()
         while time.time()-start < timer:
             if not t1.is_alive():
+                logger.info("Completed flash step",
+                            elapsed_time_in_seconds = round(time.time() - start, 2))
                 break
-            time.sleep(10)
+            time.sleep(1)
         else:
+            logger.error(f"Flash step timed out ({timer} s), terminating",
+                         elapsed_time_in_seconds = round(time.time() - start, 2))
             t1.terminate()
             t1.join()
     if validator:
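The `while ... else` shape above is easy to misread: the `else` branch runs only when the loop condition expires without hitting `break`, so the error log fires only on timeout. A minimal, self-contained sketch of the same pattern — hypothetical worker, timeout, and logger setup, using plain `logging` rather than the project's structured logger:

```python
import logging
import multiprocessing
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("runner-sketch")

def flash_step():
    """Stand-in for the real flash step; just sleeps briefly."""
    time.sleep(3)

if __name__ == "__main__":
    timer = 10  # hypothetical timeout in seconds
    t1 = multiprocessing.Process(target=flash_step)
    t1.start()

    start = time.time()
    while time.time() - start < timer:
        if not t1.is_alive():
            # Worker finished before the timeout: log the elapsed time and leave the loop.
            logger.info("Completed flash step (%.2f s elapsed)", time.time() - start)
            break
        time.sleep(1)  # poll once per second, as in the PR
    else:
        # Reached only when the loop expires without a break, i.e. on timeout.
        logger.error("Flash step timed out (%s s), terminating", timer)
        t1.terminate()
    t1.join()
```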
19 changes: 14 additions & 5 deletions _delphi_utils_python/delphi_utils/validator/dynamic.py
@@ -237,7 +237,9 @@ def check_min_allowed_max_date(self, max_date, geo_type, signal_type, report):
                 ValidationFailure("check_min_max_date",
                                   geo_type=geo_type,
                                   signal=signal_type,
-                                  message="date of most recent generated file seems too long ago"))
+                                  date=max_date,
+                                  message="date of most recent generated file seems too long ago "
+                                          f"({max_date} < {self.params.generation_date} - {min_thres})"))

         report.increment_total_checks()

@@ -263,7 +265,9 @@ def check_max_allowed_max_date(self, max_date, geo_type, signal_type, report):
                 ValidationFailure("check_max_max_date",
                                   geo_type=geo_type,
                                   signal=signal_type,
-                                  message="date of most recent generated file seems too recent"))
+                                  date=max_date,
+                                  message="date of most recent generated file seems too recent "
+                                          f"({max_date} > {self.params.generation_date} - {max_thres})"))

         report.increment_total_checks()

@@ -307,7 +311,9 @@ def create_dfs(self, geo_sig_df, api_df_or_error, checking_date, geo_type, signa
                     signal_type,
                     "test data for a given checking date-geo type-signal type"
                     " combination is missing. Source data may be missing"
-                    " for one or more dates"))
+                    " for one or more dates "
+                    f"({checking_date} < {self.params.generation_date} "
+                    f"- {min_thres})"))
             return False

         # Reference dataframe runs backwards from the recent_cutoff_date
@@ -418,7 +424,9 @@ def check_max_date_vs_reference(self, df_to_test, df_to_reference, checking_date
                 checking_date,
                 geo_type,
                 signal_type,
-                "reference df has days beyond the max date in the =df_to_test="))
+                "reference df has days beyond the max date in the =df_to_test= "
+                f"{df_to_test['time_value'].max()} < "
+                f"{df_to_reference['time_value'].max().date()}"))

         report.increment_total_checks()

@@ -459,7 +467,8 @@ def check_rapid_change_num_rows(self, df_to_test, df_to_reference, checking_date
                 geo_type,
                 signal_type,
                 "Number of rows per day seems to have changed rapidly (reference "
-                "vs test data)"))
+                "vs test data); "
+                f"relative difference: {abs(compare_rows)} > 0.35"))
         report.increment_total_checks()

     def check_positive_negative_spikes(self, source_df, api_frames, geo, sig, report):
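All five hunks above follow the same recipe: when a check fails, interpolate the actual values and thresholds that tripped it into the failure message, so a single log line is enough to diagnose the problem. A toy illustration of the idea with hypothetical dates and a hypothetical threshold (not the validator's real objects):

```python
from datetime import date, timedelta

# Hypothetical inputs standing in for the validator's state.
max_date = date(2023, 8, 1)          # most recent date in the generated files
generation_date = date(2023, 8, 29)  # day the validator ran
min_thres = timedelta(days=14)       # how stale the data is allowed to be

if max_date < generation_date - min_thres:
    # Same idea as the diff: carry the failing comparison in the message itself.
    message = ("date of most recent generated file seems too long ago "
               f"({max_date} < {generation_date} - {min_thres})")
    print(message)
```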
13 changes: 11 additions & 2 deletions _delphi_utils_python/delphi_utils/validator/report.py
@@ -35,6 +35,8 @@ def __init__(self, errors_to_suppress: List[ValidationFailure],
             Warnings raised from validation execution
         unsuppressed_errors: List[Exception]
             Errors raised from validation failures not found in `self.errors_to_suppress`
+        elapsed_time_in_seconds: float
+            Elapsed time of validation run, rounded down
         """
         self.errors_to_suppress = errors_to_suppress
         self.data_source = data_source
@@ -44,6 +46,7 @@ def __init__(self, errors_to_suppress: List[ValidationFailure],
         self.raised_warnings = []
         self.unsuppressed_errors = []
         self.dry_run = dry_run
+        self.elapsed_time_in_seconds = -1
         # pylint: enable=R0902

     def add_raised_error(self, error):
@@ -68,6 +71,10 @@ def increment_total_checks(self):
         """Record a check."""
         self.total_checks += 1

+    def set_elapsed_time_in_seconds(self, time):
+        """Set elapsed runtime in seconds for later logging."""
+        self.elapsed_time_in_seconds = time
+
     def add_raised_warning(self, warning):
         """Add a warning to the report.

@@ -94,15 +101,17 @@ def log(self, logger=None):
                         checks_failed = len(self.unsuppressed_errors),
                         checks_suppressed = self.num_suppressed,
                         warnings = len(self.raised_warnings),
-                        phase = "validation")
+                        phase = "validation",
+                        elapsed_time_in_seconds=self.elapsed_time_in_seconds)
         else:
             logger.info("Validation run unsuccessful",
                         data_source = self.data_source,
                         checks_run = self.total_checks,
                         checks_failed = len(self.unsuppressed_errors),
                         checks_suppressed = self.num_suppressed,
                         warnings = len(self.raised_warnings),
-                        phase="validation")
+                        phase="validation",
+                        elapsed_time_in_seconds=self.elapsed_time_in_seconds)
         # Threshold for slack alerts if warnings are excessive,
         # Currently extremely strict, set by observation of 1 month's logs
         excessive_warnings = self.total_checks > 0 and \
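The extra `elapsed_time_in_seconds=` keyword only pays off because `logger` here is the project's structured logger, where keyword arguments become fields on the emitted record instead of being glued into the message string. A sketch of that behaviour, assuming a structlog-style logger with a JSON renderer (the real `get_structured_logger` may be configured differently, and the field values below are made up):

```python
import structlog

# Assumed setup; not necessarily how the indicators configure structlog.
structlog.configure(processors=[structlog.processors.JSONRenderer()])
logger = structlog.get_logger()

logger.info("Validation run successful",
            data_source="quidel_covidtest",  # hypothetical values
            checks_run=42,
            checks_failed=0,
            phase="validation",
            elapsed_time_in_seconds=3.17)
# Emits one JSON object per call, along the lines of:
# {"data_source": "quidel_covidtest", "checks_run": 42, "checks_failed": 0,
#  "phase": "validation", "elapsed_time_in_seconds": 3.17, "event": "Validation run successful"}
```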
30 changes: 19 additions & 11 deletions _delphi_utils_python/delphi_utils/validator/static.py
@@ -178,17 +178,17 @@ def check_bad_geo_id_value(self, df_to_test, filename, geo_type, report):
         - report: ValidationReport; report where results are added
         """
         valid_geos = self._get_valid_geo_values(geo_type)
-        unexpected_geos = [geo for geo in df_to_test['geo_id']
-                           if geo.lower() not in valid_geos]
+        unexpected_geos = {geo for geo in df_to_test['geo_id']
+                           if geo.lower() not in valid_geos}
         if len(unexpected_geos) > 0:
             report.add_raised_error(
                 ValidationFailure(
                     "check_bad_geo_id_value",
                     filename=filename,
                     message=f"Unrecognized geo_ids (not in historical data) {unexpected_geos}"))
         report.increment_total_checks()
-        upper_case_geos = [
-            geo for geo in df_to_test['geo_id'] if geo.lower() != geo]
+        upper_case_geos = {
+            geo for geo in df_to_test['geo_id'] if geo.lower() != geo}
         if len(upper_case_geos) > 0:
             report.add_raised_warning(
                 ValidationFailure(
@@ -218,8 +218,8 @@ def find_all_unexpected_geo_ids(df_to_test, geo_regex, geo_type):
         if geo_type in numeric_geo_types:
             # Check if geo_ids were stored as floats (contain decimal point) and
             # contents before decimal match the specified regex pattern.
-            leftover = [geo[1] for geo in df_to_test["geo_id"].str.split(
-                ".") if len(geo) > 1 and re.match(geo_regex, geo[0])]
+            leftover = {geo[1] for geo in df_to_test["geo_id"].str.split(
+                ".") if len(geo) > 1 and re.match(geo_regex, geo[0])}

             # If any floats found, remove decimal and anything after.
             if len(leftover) > 0:
@@ -230,7 +230,7 @@ def find_all_unexpected_geo_ids(df_to_test, geo_regex, geo_type):
                     ValidationFailure(
                         "check_geo_id_type",
                         filename=nameformat,
-                        message="geo_ids saved as floats; strings preferred"))
+                        message=f"geo_ids saved as floats; strings preferred: {leftover}"))

         if geo_type in fill_len.keys():
             # Left-pad with zeroes up to expected length. Fixes missing leading zeroes
@@ -281,29 +281,35 @@ def check_bad_val(self, df_to_test, nameformat, signal_type, report):

         if percent_option:
             if not df_to_test[(df_to_test['val'] > 100)].empty:
+                bad_values = df_to_test[(df_to_test['val'] > 100)]['val'].unique()
                 report.add_raised_error(
                     ValidationFailure(
                         "check_val_pct_gt_100",
                         filename=nameformat,
-                        message="val column can't have any cell greater than 100 for percents"))
+                        message="val column can't have any cell greater than 100 for percents; "
+                                f"invalid values: {bad_values}"))

             report.increment_total_checks()

         if proportion_option:
             if not df_to_test[(df_to_test['val'] > 100000)].empty:
+                bad_values = df_to_test[(df_to_test['val'] > 100000)]['val'].unique()
                 report.add_raised_error(
                     ValidationFailure("check_val_prop_gt_100k",
                                       filename=nameformat,
                                       message="val column can't have any cell greater than 100000 "
-                                              "for proportions"))
+                                              "for proportions; "
+                                              f"invalid values: {bad_values}"))

             report.increment_total_checks()

         if not df_to_test[(df_to_test['val'] < 0)].empty:
+            bad_values = df_to_test[(df_to_test['val'] < 0)]['val'].unique()
             report.add_raised_error(
                 ValidationFailure("check_val_lt_0",
                                   filename=nameformat,
-                                  message="val column can't have any cell smaller than 0"))
+                                  message="val column can't have any cell smaller than 0; "
+                                          f"invalid values: {bad_values}"))

             report.increment_total_checks()

@@ -346,10 +352,12 @@ def check_bad_se(self, df_to_test, nameformat, report):
         report.increment_total_checks()

         if df_to_test["se"].isnull().mean() > 0.5:
+            bad_mean = round(df_to_test["se"].isnull().mean() * 100, 2)
             report.add_raised_error(
                 ValidationFailure("check_se_many_missing",
                                   filename=nameformat,
-                                  message='Recent se values are >50% NA'))
+                                  message='Recent se values are >50% NA: '
+                                          f'{bad_mean}%'))

             report.increment_total_checks()

[Contributor review comment on the added `bad_values = ....unique()` line] nice touch with the .unique() filter! should we do that for all the messages that print lists of values?
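On the reviewer's question above about `.unique()`: the set comprehensions introduced in this file and pandas' `.unique()` solve the same problem — keeping repeated offenders out of the failure message — they just do it at different points. A quick side-by-side with made-up data:

```python
import pandas as pd

df = pd.DataFrame({"geo_id": ["PA", "pa", "06001", "PA", "XX", "XX"],
                   "val": [101.0, 5.0, 101.0, 250.0, 3.0, 250.0]})
valid_geos = {"pa", "06001"}

# Set comprehension: filters and deduplicates in one pass (order not preserved).
unexpected_geos = {geo for geo in df["geo_id"] if geo.lower() not in valid_geos}

# pandas .unique(): deduplicates after a vectorized filter (keeps first-seen order).
bad_values = df[df["val"] > 100]["val"].unique()

print(unexpected_geos)  # {'XX'}
print(bad_values)       # [101. 250.]
```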
3 changes: 3 additions & 0 deletions _delphi_utils_python/delphi_utils/validator/validate.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 """Tools to validate CSV source data, including various check methods."""
+import time
 from .datafetcher import load_all_files
 from .dynamic import DynamicValidator
 from .errors import ValidationFailure
@@ -54,11 +55,13 @@ def validate(self):
         Returns:
             - ValidationReport collating the validation outcomes
         """
+        start_time = time.time()
         report = ValidationReport(self.suppressed_errors, self.data_source, self.dry_run)
         frames_list = load_all_files(self.export_dir, self.time_window.start_date,
                                      self.time_window.end_date)
         self.static_validation.validate(frames_list, report)
         # Dynamic Validation only performed when frames_list is populated
         if len(frames_list) > 0:
             self.dynamic_validation.validate(aggregate_frames(frames_list), report)
+        report.set_elapsed_time_in_seconds(round(time.time() - start_time, 2))
         return report
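The timing plumbing is deliberately minimal: `validate()` measures wall-clock time around the whole run, stores it on the report, and the existing `log()` call emits it. A stripped-down sketch of that flow with a stand-in report class (not the real `ValidationReport`):

```python
import time

class ReportSketch:
    """Minimal stand-in for ValidationReport's timing support."""
    def __init__(self):
        self.elapsed_time_in_seconds = -1  # sentinel until a run completes

    def set_elapsed_time_in_seconds(self, elapsed):
        self.elapsed_time_in_seconds = elapsed

def run_all_checks(report):
    time.sleep(0.2)  # stand-in for the static and dynamic validation work

report = ReportSketch()
start_time = time.time()
run_all_checks(report)
report.set_elapsed_time_in_seconds(round(time.time() - start_time, 2))
print(report.elapsed_time_in_seconds)  # roughly 0.2
```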
9 changes: 8 additions & 1 deletion quidel_covidtest/delphi_quidel_covidtest/pull.py
@@ -2,6 +2,7 @@
 """Collect and process Quidel export files."""
 from os.path import join
 import os
+import time
 from datetime import datetime, timedelta
 import boto3

@@ -369,7 +370,7 @@ def check_export_start_date(export_start_date, export_end_date,
         return datetime(2020, 5, 26)
     return export_start_date

-def update_cache_file(df, _end_date, cache_dir):
+def update_cache_file(df, _end_date, cache_dir, logger):
     """
     Update cache file. Remove the old one, export the new one.

@@ -380,8 +381,14 @@ def update_cache_file(df, _end_date, cache_dir):
         The most recent date when the raw data is received
     cache_dir:
         ./cache where the cache file is stored
+    logger: logging.Logger
+        Structured logger.
     """
+    start_time = time.time()
     for fn in os.listdir(cache_dir):
         if ".csv" in fn:
             os.remove(join(cache_dir, fn))
     df.to_csv(join(cache_dir, "pulled_until_%s.csv") % _end_date.strftime("%Y%m%d"), index=False)
+    logger.info("Completed cache file update",
+                end_date = _end_date.strftime('%Y-%m-%d'),
+                elapsed_time_in_seconds = round(time.time() - start_time, 2))
6 changes: 5 additions & 1 deletion quidel_covidtest/delphi_quidel_covidtest/run.py
@@ -121,6 +121,7 @@ def run_module(params: Dict[str, Any]):
         __name__, filename=params["common"].get("log_filename"),
         log_exceptions=params["common"].get("log_exceptions", True))
     stats = []
+    # Log at program exit in case of an exception, otherwise after successful completion
     atexit.register(log_exit, start_time, stats, logger)
     cache_dir = params["indicator"]["input_cache_dir"]
     export_dir = params["common"]["export_dir"]
@@ -223,4 +224,7 @@ def run_module(params: Dict[str, Any]):

     # Export the cache file if the pipeline runs successfully.
     # Otherwise, don't update the cache file
-    update_cache_file(df, _end_date, cache_dir)
+    update_cache_file(df, _end_date, cache_dir, logger)
+    # Log stats now instead of at program exit
+    atexit.unregister(log_exit)
+    log_exit(start_time, stats, logger)

[Contributor review comment on lines +228 to +230] nice!
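The atexit choreography in run.py is the subtle part: the handler is registered up front so the stats still get logged if the pipeline raises partway through, and on the success path it is unregistered and called directly so the exit log is emitted exactly once. A self-contained sketch of that pattern (hypothetical `log_exit`, plain `logging` instead of the project's structured logger):

```python
import atexit
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("run-sketch")

def log_exit(start_time, stats, logger):
    """Hypothetical stand-in for the indicator's log_exit helper."""
    logger.info("Pipeline exited with %d stats entries after %.2f s",
                len(stats), time.time() - start_time)

start_time = time.time()
stats = []
# Registered early: fires at interpreter exit if an exception aborts the run below.
atexit.register(log_exit, start_time, stats, logger)

stats.append(("state", 123))  # stand-in for the real pipeline work

# Success path: unregister so the handler does not fire a second time, then log now.
atexit.unregister(log_exit)
log_exit(start_time, stats, logger)
```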