Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add better logging in the indicator runner #1892

Merged
merged 10 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions _delphi_utils_python/delphi_utils/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None],
validator = validator_fn(params)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The archiver already logs run timing, and each indicator has its own bit of logging which includes that; this adds logging for the flash & validation steps.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the individual indicators' run time logging looks mostly sufficient, except in quidel_covidtest where it happens at program exit, which will include anything that runs after the core indicator function and thus lead to inaccuracy.

you can refactor this so the runner does the timing and logging, with indicator_fn (aka each indicator's run_module()) returning a dict of metrics it wants logged (like csv_export_count and max_lag_in_days)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I can actually think of one disadvantage of that - it means that the summary line will be gone from the logs if the indicator is run individually (e.g. env/bin/python -m delphi_quidel_covidtest as the README suggests).
Are we OK with this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a lot of the documentation in this repo is quite old and should be brought up to date.

is it ever desirable to run an indicator without validation and archiving? perhaps we can answer that in #1895. you should at least fix the timing for the quidel indicator in the meanwhile.

archiver = archiver_fn(params)

start_time = time.time()
melange396 marked this conversation as resolved.
Show resolved Hide resolved
t1 = multiprocessing.Process(target=flash_fn, args=[params])
t1.start()
start = time.time()
Expand All @@ -77,6 +78,10 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None],
else:
t1.terminate()
t1.join()
elapsed_time_in_seconds = round(time.time() - start_time, 2)
logger.info("Completed flash step",
melange396 marked this conversation as resolved.
Show resolved Hide resolved
elapsed_time_in_seconds = elapsed_time_in_seconds)

if validator:
validation_report = validator.validate()
validation_report.log(logger)
Expand Down
17 changes: 12 additions & 5 deletions _delphi_utils_python/delphi_utils/validator/dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ def check_min_allowed_max_date(self, max_date, geo_type, signal_type, report):
ValidationFailure("check_min_max_date",
geo_type=geo_type,
signal=signal_type,
melange396 marked this conversation as resolved.
Show resolved Hide resolved
message="date of most recent generated file seems too long ago"))
message="date of most recent generated file seems too long ago "
f"({max_date} < {self.params.generation_date} - {min_thres})"))

report.increment_total_checks()

Expand All @@ -263,7 +264,8 @@ def check_max_allowed_max_date(self, max_date, geo_type, signal_type, report):
ValidationFailure("check_max_max_date",
geo_type=geo_type,
signal=signal_type,
melange396 marked this conversation as resolved.
Show resolved Hide resolved
message="date of most recent generated file seems too recent"))
message="date of most recent generated file seems too recent "
f"({max_date} > {self.params.generation_date} - {max_thres})"))

report.increment_total_checks()

Expand Down Expand Up @@ -307,7 +309,9 @@ def create_dfs(self, geo_sig_df, api_df_or_error, checking_date, geo_type, signa
signal_type,
"test data for a given checking date-geo type-signal type"
" combination is missing. Source data may be missing"
" for one or more dates"))
" for one or more dates "
f"({checking_date} < {self.params.generation_date} "
f"- {min_thres})"))
return False

# Reference dataframe runs backwards from the recent_cutoff_date
Expand Down Expand Up @@ -418,7 +422,9 @@ def check_max_date_vs_reference(self, df_to_test, df_to_reference, checking_date
checking_date,
geo_type,
signal_type,
"reference df has days beyond the max date in the =df_to_test="))
"reference df has days beyond the max date in the =df_to_test= "
f"{df_to_test['time_value'].max()} < "
f"{df_to_reference['time_value'].max().date()}"))

report.increment_total_checks()

Expand Down Expand Up @@ -459,7 +465,8 @@ def check_rapid_change_num_rows(self, df_to_test, df_to_reference, checking_date
geo_type,
signal_type,
"Number of rows per day seems to have changed rapidly (reference "
"vs test data)"))
"vs test data); "
f"relative difference: {abs(compare_rows)} > 0.35"))
report.increment_total_checks()

def check_positive_negative_spikes(self, source_df, api_frames, geo, sig, report):
Expand Down
13 changes: 11 additions & 2 deletions _delphi_utils_python/delphi_utils/validator/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ def __init__(self, errors_to_suppress: List[ValidationFailure],
Warnings raised from validation execution
unsuppressed_errors: List[Exception]
Errors raised from validation failures not found in `self.errors_to_suppress`
elapsed_time_in_seconds: float
Elapsed time of validation run in seconds, rounded to two decimal places
"""
self.errors_to_suppress = errors_to_suppress
self.data_source = data_source
Expand All @@ -44,6 +46,7 @@ def __init__(self, errors_to_suppress: List[ValidationFailure],
self.raised_warnings = []
self.unsuppressed_errors = []
self.dry_run = dry_run
self.elapsed_time_in_seconds = 0
melange396 marked this conversation as resolved.
Show resolved Hide resolved
# pylint: enable=R0902

def add_raised_error(self, error):
Expand All @@ -68,6 +71,10 @@ def increment_total_checks(self):
"""Record a check."""
self.total_checks += 1

def set_elapsed_time_in_seconds(self, time):
    """Set elapsed runtime in seconds for later logging.

    Parameters
    ----------
    time: float
        Elapsed wall-clock time of the validation run, in seconds.
        Stored as-is on the report and emitted alongside the summary
        line when `log()` is called.

    NOTE(review): the parameter name `time` shadows the stdlib `time`
    module name -- harmless in this one-line body, but worth confirming
    against the project's lint configuration.
    """
    self.elapsed_time_in_seconds = time

def add_raised_warning(self, warning):
"""Add a warning to the report.

Expand All @@ -94,15 +101,17 @@ def log(self, logger=None):
checks_failed = len(self.unsuppressed_errors),
checks_suppressed = self.num_suppressed,
warnings = len(self.raised_warnings),
phase = "validation")
phase = "validation",
elapsed_time_in_seconds=self.elapsed_time_in_seconds)
else:
logger.info("Validation run unsuccessful",
data_source = self.data_source,
checks_run = self.total_checks,
checks_failed = len(self.unsuppressed_errors),
checks_suppressed = self.num_suppressed,
warnings = len(self.raised_warnings),
phase="validation")
phase="validation",
elapsed_time_in_seconds=self.elapsed_time_in_seconds)
# Threshold for slack alerts if warnings are excessive,
# Currently extremely strict, set by observation of 1 month's logs
excessive_warnings = self.total_checks > 0 and \
Expand Down
18 changes: 13 additions & 5 deletions _delphi_utils_python/delphi_utils/validator/static.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def find_all_unexpected_geo_ids(df_to_test, geo_regex, geo_type):
ValidationFailure(
"check_geo_id_type",
filename=nameformat,
message="geo_ids saved as floats; strings preferred"))
message=f"{len(leftover)} geo_ids saved as floats; strings preferred"))
melange396 marked this conversation as resolved.
Show resolved Hide resolved

if geo_type in fill_len.keys():
# Left-pad with zeroes up to expected length. Fixes missing leading zeroes
Expand Down Expand Up @@ -281,29 +281,35 @@ def check_bad_val(self, df_to_test, nameformat, signal_type, report):

if percent_option:
if not df_to_test[(df_to_test['val'] > 100)].empty:
bad_values = df_to_test[(df_to_test['val'] > 100)]['val'].unique()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice touch with the .unique() filter! should we do that for all the messages that print lists of values?

report.add_raised_error(
ValidationFailure(
"check_val_pct_gt_100",
filename=nameformat,
message="val column can't have any cell greater than 100 for percents"))
message="val column can't have any cell greater than 100 for percents; "
f"invalid values: {bad_values}"))

report.increment_total_checks()

if proportion_option:
if not df_to_test[(df_to_test['val'] > 100000)].empty:
bad_values = df_to_test[(df_to_test['val'] > 100000)]['val'].unique()
report.add_raised_error(
ValidationFailure("check_val_prop_gt_100k",
filename=nameformat,
message="val column can't have any cell greater than 100000 "
"for proportions"))
"for proportions; "
f"invalid values: {bad_values}"))

report.increment_total_checks()

if not df_to_test[(df_to_test['val'] < 0)].empty:
bad_values = df_to_test[(df_to_test['val'] < 0)]['val'].unique()
report.add_raised_error(
ValidationFailure("check_val_lt_0",
filename=nameformat,
message="val column can't have any cell smaller than 0"))
message="val column can't have any cell smaller than 0; "
f"invalid values: {bad_values}"))

report.increment_total_checks()

Expand Down Expand Up @@ -346,10 +352,12 @@ def check_bad_se(self, df_to_test, nameformat, report):
report.increment_total_checks()

if df_to_test["se"].isnull().mean() > 0.5:
bad_mean = round(df_to_test["se"].isnull().mean() * 100, 2)
report.add_raised_error(
ValidationFailure("check_se_many_missing",
filename=nameformat,
message='Recent se values are >50% NA'))
message='Many recent se values are missing: '
f'{bad_mean} > 50%'))
melange396 marked this conversation as resolved.
Show resolved Hide resolved

report.increment_total_checks()

Expand Down
3 changes: 3 additions & 0 deletions _delphi_utils_python/delphi_utils/validator/validate.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
"""Tools to validate CSV source data, including various check methods."""
import time
from .datafetcher import load_all_files
from .dynamic import DynamicValidator
from .errors import ValidationFailure
Expand Down Expand Up @@ -54,11 +55,13 @@ def validate(self):
Returns:
- ValidationReport collating the validation outcomes
"""
start_time = time.time()
report = ValidationReport(self.suppressed_errors, self.data_source, self.dry_run)
frames_list = load_all_files(self.export_dir, self.time_window.start_date,
self.time_window.end_date)
self.static_validation.validate(frames_list, report)
# Dynamic Validation only performed when frames_list is populated
if len(frames_list) > 0:
self.dynamic_validation.validate(aggregate_frames(frames_list), report)
report.set_elapsed_time_in_seconds(round(time.time() - start_time, 2))
return report
Loading