Skip to content

Commit

Permalink
Use only viable metrics (#36)
Browse files Browse the repository at this point in the history
This adds decision_changed/unchanged/failed counters to keep track of
each decision_context value from policies that is used when new results
and waivers are received.

Messaging metrics collected by old consumers contain the "handler" label
with suffix "_consumer".
  • Loading branch information
hluk committed May 19, 2022
1 parent f528e5d commit 29bd1ea
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 31 deletions.
21 changes: 10 additions & 11 deletions greenwave/consumers/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@
import greenwave.decision

from greenwave.monitor import (
publish_decision_exceptions_result_counter,
messaging_tx_to_send_counter, messaging_tx_stopped_counter,
messaging_tx_sent_ok_counter, messaging_tx_failed_counter)
decision_changed_counter,
decision_failed_counter,
decision_unchanged_counter,
messaging_tx_sent_ok_counter,
messaging_tx_failed_counter,
)
from greenwave.policies import applicable_decision_context_product_version_pairs
from greenwave.utils import right_before_this_time

Expand Down Expand Up @@ -147,7 +150,6 @@ def _publish_decision_update_fedora_messaging(self, decision):
log.error(
'Fedora Messaging broker rejected message %s: %s',
msg.id, e)
self._inc(messaging_tx_stopped_counter)
except fedora_messaging.exceptions.ConnectionException as e:
log.error('Error sending message %s: %s', msg.id, e)
self._inc(messaging_tx_failed_counter)
Expand All @@ -168,12 +170,10 @@ def _old_and_new_decisions(self, submit_time, **request_data):
log.debug('old decision: %s', old_decision)
except requests.exceptions.HTTPError as e:
log.exception('Failed to retrieve decision for data=%s, error: %s', request_data, e)
self._inc(messaging_tx_stopped_counter)
return None, None

return old_decision, decision

@publish_decision_exceptions_result_counter.count_exceptions()
def _publish_decision_change(
self,
submit_time,
Expand All @@ -195,8 +195,6 @@ def _publish_decision_change(
policies, **policy_attributes)

for decision_context, product_version in sorted(contexts_product_versions):
self._inc(messaging_tx_to_send_counter)

old_decision, decision = self._old_and_new_decisions(
submit_time,
decision_context=decision_context,
Expand All @@ -205,13 +203,16 @@ def _publish_decision_change(
subject_identifier=subject.identifier,
)
if decision is None:
self._inc(decision_failed_counter.labels(decision_context=decision_context))
continue

if _is_decision_unchanged(old_decision, decision):
log.debug('Decision unchanged: %s', decision)
self._inc(messaging_tx_stopped_counter)
self._inc(decision_unchanged_counter.labels(decision_context=decision_context))
continue

self._inc(decision_changed_counter.labels(decision_context=decision_context))

decision.update({
'subject_type': subject.type,
'subject_identifier': subject.identifier,
Expand All @@ -229,5 +230,3 @@ def _publish_decision_change(
self._publish_decision_update_fedmsg(decision)
elif self.flask_app.config['MESSAGING'] == 'fedora-messaging':
self._publish_decision_update_fedora_messaging(decision)

self._inc(messaging_tx_stopped_counter)
2 changes: 1 addition & 1 deletion greenwave/consumers/resultsdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class ResultsDBHandler(Consumer):
config_key = 'resultsdb_handler'
hub_config_prefix = 'resultsdb_'
default_topic = 'taskotron.result.new'
monitor_labels = {'handler': 'resultsdb'}
monitor_labels = {'handler': 'resultsdb_consumer'}

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
Expand Down
2 changes: 1 addition & 1 deletion greenwave/consumers/waiverdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class WaiverDBHandler(Consumer):
config_key = 'waiverdb_handler'
hub_config_prefix = 'waiverdb_'
default_topic = 'waiver.new'
monitor_labels = {'handler': 'waiverdb'}
monitor_labels = {'handler': 'waiverdb_consumer'}

def _consume_message(self, message):
msg = message['msg']
Expand Down
18 changes: 8 additions & 10 deletions greenwave/listeners/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@
import greenwave.app_factory
from greenwave.logger import init_logging, log_to_stdout
from greenwave.monitor import (
decision_changed_counter,
decision_failed_counter,
decision_unchanged_counter,
messaging_rx_counter,
messaging_rx_failed_counter,
messaging_rx_ignored_counter,
messaging_rx_processed_ok_counter,
messaging_tx_failed_counter,
messaging_tx_sent_ok_counter,
messaging_tx_stopped_counter,
messaging_tx_to_send_counter,
publish_decision_exceptions_result_counter,
)
from greenwave.policies import applicable_decision_context_product_version_pairs
from greenwave.utils import right_before_this_time
Expand Down Expand Up @@ -65,7 +65,7 @@ def _send_nack(listener, headers):


class BaseListener(stomp.ConnectionListener):
monitor_labels = {"handler": "greenwave_consumer"}
monitor_labels = {"handler": "greenwave_listener"}

def __init__(self, uid_suffix, config_obj=None):
super().__init__()
Expand Down Expand Up @@ -237,12 +237,10 @@ def _old_and_new_decisions(self, submit_time, **request_data):
self.app.logger.exception(
"Failed to retrieve decision for data=%s, error: %s", request_data, e
)
self._inc(messaging_tx_stopped_counter)
return None, None

return old_decision, decision

@publish_decision_exceptions_result_counter.count_exceptions()
def _publish_decision_change(
self, submit_time, subject, testcase, product_version, publish_testcase
):
Expand All @@ -263,8 +261,6 @@ def _publish_decision_change(
self.app.logger.info("Getting greenwave info")

for decision_context, product_version in sorted(contexts_product_versions):
self._inc(messaging_tx_to_send_counter)

old_decision, decision = self._old_and_new_decisions(
submit_time,
decision_context=decision_context,
Expand All @@ -273,15 +269,18 @@ def _publish_decision_change(
subject_identifier=subject.identifier,
)
if decision is None:
self._inc(decision_failed_counter.labels(decision_context=decision_context))
continue

if _is_decision_unchanged(old_decision, decision):
self.app.logger.debug(
"Skipped emitting fedmsg, decision did not change: %s", decision
)
self._inc(messaging_tx_stopped_counter)
self._inc(decision_unchanged_counter.labels(decision_context=decision_context))
continue

self._inc(decision_changed_counter.labels(decision_context=decision_context))

decision.update(
{
"subject_type": subject.type,
Expand All @@ -298,4 +297,3 @@ def _publish_decision_change(

self.app.logger.info("Publishing decision change message: %r", decision)
self._publish_decision_update(decision)
self._inc(messaging_tx_stopped_counter)
14 changes: 6 additions & 8 deletions greenwave/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,6 @@ def wrapper(*args, **kwargs):
# Number of received messages, which failed during processing
messaging_rx_failed_counter = Counter('messaging_rx_failed')

# Total number of messages to send
messaging_tx_to_send_counter = Counter('messaging_tx_to_send')
# Number of messages, which were eventually stopped before sending
messaging_tx_stopped_counter = Counter('messaging_tx_stopped')
# Number of messages, which were sent successfully
messaging_tx_sent_ok_counter = Counter('messaging_tx_sent_ok')
# Number of messages, for which the sender failed
Expand All @@ -97,7 +93,9 @@ def wrapper(*args, **kwargs):
decision_exception_counter = Counter('total_decision_exceptions')
# Decision latency
decision_request_duration_seconds = Histogram('decision_request_duration_seconds')
# All exceptions occurred in publishing a message after a new waiver
publish_decision_exceptions_waiver_counter = Counter('publish_decision_exceptions_new_waiver')
# All exceptions occurred in publishing a message after a new result
publish_decision_exceptions_result_counter = Counter('publish_decision_exceptions_new_result')
# New result/waiver caused specific decision to change
decision_changed_counter = Counter('decision_changed')
# New result/waiver did not cause specific decision to change
decision_unchanged_counter = Counter('decision_unchanged')
# Failed to retrieve decision for a new result/waiver
decision_failed_counter = Counter('decision_failed')

0 comments on commit 29bd1ea

Please sign in to comment.