From 5321f50bf0c9c134caa37b474a3b2e47fb6d0cc6 Mon Sep 17 00:00:00 2001 From: David Schwilk Date: Wed, 12 Jan 2022 08:14:55 +0100 Subject: [PATCH] Add counter for connectivity acks to measure messages waiting for acks in a given moment Signed-off-by: David Schwilk --- .../service/messaging/BaseConsumerActor.java | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseConsumerActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseConsumerActor.java index c2a824208f..e775306556 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseConsumerActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseConsumerActor.java @@ -65,6 +65,7 @@ public abstract class BaseConsumerActor extends AbstractActorWithTimers { private static final String TIMER_ACK_HANDLING = "connectivity_ack_handling"; + private static final String COUNTER_ACK_HANDLING = "connectivity_ack_handling_counter"; protected final String sourceAddress; protected final Source source; @@ -130,12 +131,16 @@ private ExternalMessageWithSender withSender(final AcknowledgeableMessage acknow private void prepareResponseHandler(final AcknowledgeableMessage acknowledgeableMessage, final ActorRef responseCollector) { - final StartedTimer timer = DittoMetrics.timer(TIMER_ACK_HANDLING) + final StartedTimer ackTimer = DittoMetrics.timer(TIMER_ACK_HANDLING) .tag(TracingTags.CONNECTION_ID, connectionId.toString()) .tag(TracingTags.CONNECTION_TYPE, connectionType.getName()) .start(); + final var ackCounter = DittoMetrics.gauge(COUNTER_ACK_HANDLING) + .tag(TracingTags.CONNECTION_ID, connectionId.toString()) + .tag(TracingTags.CONNECTION_TYPE, connectionType.toString()) + .increment(); final Context traceContext = DittoTracing.extractTraceContext(acknowledgeableMessage.getMessage().getHeaders()); - DittoTracing.wrapTimer(traceContext, timer); + DittoTracing.wrapTimer(traceContext, ackTimer); final Duration askTimeout = acknowledgementConfig.getCollectorFallbackAskTimeout(); // Ask response collector actor to get the collected responses in a future @@ -156,7 +161,8 @@ private void prepareResponseHandler(final AcknowledgeableMessage acknowledgeable if (output != null) { final List> failedResponses = output.getFailedResponses(); if (output.allExpectedResponsesArrived() && failedResponses.isEmpty()) { - timer.tag(TracingTags.ACK_SUCCESS, true).stop(); + ackTimer.tag(TracingTags.ACK_SUCCESS, true).stop(); + ackCounter.decrement(); acknowledgeableMessage.settle(); } else { // empty failed responses indicate that SetCount was missing @@ -164,9 +170,10 @@ private void prepareResponseHandler(final AcknowledgeableMessage acknowledgeable someFailedResponseRequiresRedelivery(failedResponses); log().debug("Rejecting [redeliver={}] due to failed responses <{}>. " + "ResponseCollector=<{}>", shouldRedeliver, failedResponses, responseCollector); - timer.tag(TracingTags.ACK_SUCCESS, false) + ackTimer.tag(TracingTags.ACK_SUCCESS, false) .tag(TracingTags.ACK_REDELIVER, shouldRedeliver) .stop(); + ackCounter.decrement(); acknowledgeableMessage.reject(shouldRedeliver); } } else { @@ -177,23 +184,26 @@ private void prepareResponseHandler(final AcknowledgeableMessage acknowledgeable // Redeliver and pray this unexpected error goes away log().debug("Rejecting [redeliver=true] due to error <{}>. " + "ResponseCollector=<{}>", rootCause, responseCollector); - timer.tag(TracingTags.ACK_SUCCESS, false) + ackTimer.tag(TracingTags.ACK_SUCCESS, false) .tag(TracingTags.ACK_REDELIVER, true) .stop(); + ackCounter.decrement(); acknowledgeableMessage.reject(true); return null; }); if (dittoRuntimeException != null) { if (isConsideredSuccess(dittoRuntimeException)) { - timer.tag(TracingTags.ACK_SUCCESS, true).stop(); + ackTimer.tag(TracingTags.ACK_SUCCESS, true).stop(); + ackCounter.tag(TracingTags.ACK_SUCCESS, true).decrement(); acknowledgeableMessage.settle(); } else { final var shouldRedeliver = requiresRedelivery(dittoRuntimeException.getHttpStatus()); log().debug("Rejecting [redeliver={}] due to error <{}>. ResponseCollector=<{}>", shouldRedeliver, dittoRuntimeException, responseCollector); - timer.tag(TracingTags.ACK_SUCCESS, false) + ackTimer.tag(TracingTags.ACK_SUCCESS, false) .tag(TracingTags.ACK_REDELIVER, shouldRedeliver) .stop(); + ackCounter.decrement(); acknowledgeableMessage.reject(shouldRedeliver); } } @@ -211,7 +221,7 @@ private void prepareResponseHandler(final AcknowledgeableMessage acknowledgeable */ protected final Sink getDittoRuntimeExceptionSink() { return Flow.fromFunction( - message -> message.setDittoHeaders(enrichHeadersWithReplyInformation(message.getDittoHeaders()))) + message -> message.setDittoHeaders(enrichHeadersWithReplyInformation(message.getDittoHeaders()))) .via(Flow.fromFunction(value -> { inboundMonitor.failure(value.getDittoHeaders(), value); return value;