Skip to content

Commit

Permalink
Merge pull request #1275 from bosch-io/feature/waiting-ack-counter
Browse files Browse the repository at this point in the history
Count connectivity messages awaiting acknowledgements
  • Loading branch information
jokraehe committed Jan 17, 2022
2 parents 3170ae9 + 5321f50 commit 4da346a
Showing 1 changed file with 18 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
public abstract class BaseConsumerActor extends AbstractActorWithTimers {

private static final String TIMER_ACK_HANDLING = "connectivity_ack_handling";
private static final String COUNTER_ACK_HANDLING = "connectivity_ack_handling_counter";

protected final String sourceAddress;
protected final Source source;
Expand Down Expand Up @@ -130,12 +131,16 @@ private ExternalMessageWithSender withSender(final AcknowledgeableMessage acknow
private void prepareResponseHandler(final AcknowledgeableMessage acknowledgeableMessage,
final ActorRef responseCollector) {

final StartedTimer timer = DittoMetrics.timer(TIMER_ACK_HANDLING)
final StartedTimer ackTimer = DittoMetrics.timer(TIMER_ACK_HANDLING)
.tag(TracingTags.CONNECTION_ID, connectionId.toString())
.tag(TracingTags.CONNECTION_TYPE, connectionType.getName())
.start();
final var ackCounter = DittoMetrics.gauge(COUNTER_ACK_HANDLING)
.tag(TracingTags.CONNECTION_ID, connectionId.toString())
.tag(TracingTags.CONNECTION_TYPE, connectionType.toString())
.increment();
final Context traceContext = DittoTracing.extractTraceContext(acknowledgeableMessage.getMessage().getHeaders());
DittoTracing.wrapTimer(traceContext, timer);
DittoTracing.wrapTimer(traceContext, ackTimer);

final Duration askTimeout = acknowledgementConfig.getCollectorFallbackAskTimeout();
// Ask response collector actor to get the collected responses in a future
Expand All @@ -156,17 +161,19 @@ private void prepareResponseHandler(final AcknowledgeableMessage acknowledgeable
if (output != null) {
final List<CommandResponse<?>> failedResponses = output.getFailedResponses();
if (output.allExpectedResponsesArrived() && failedResponses.isEmpty()) {
timer.tag(TracingTags.ACK_SUCCESS, true).stop();
ackTimer.tag(TracingTags.ACK_SUCCESS, true).stop();
ackCounter.decrement();
acknowledgeableMessage.settle();
} else {
// empty failed responses indicate that SetCount was missing
final boolean shouldRedeliver = failedResponses.isEmpty() ||
someFailedResponseRequiresRedelivery(failedResponses);
log().debug("Rejecting [redeliver={}] due to failed responses <{}>. " +
"ResponseCollector=<{}>", shouldRedeliver, failedResponses, responseCollector);
timer.tag(TracingTags.ACK_SUCCESS, false)
ackTimer.tag(TracingTags.ACK_SUCCESS, false)
.tag(TracingTags.ACK_REDELIVER, shouldRedeliver)
.stop();
ackCounter.decrement();
acknowledgeableMessage.reject(shouldRedeliver);
}
} else {
Expand All @@ -177,23 +184,26 @@ private void prepareResponseHandler(final AcknowledgeableMessage acknowledgeable
// Redeliver and pray this unexpected error goes away
log().debug("Rejecting [redeliver=true] due to error <{}>. " +
"ResponseCollector=<{}>", rootCause, responseCollector);
timer.tag(TracingTags.ACK_SUCCESS, false)
ackTimer.tag(TracingTags.ACK_SUCCESS, false)
.tag(TracingTags.ACK_REDELIVER, true)
.stop();
ackCounter.decrement();
acknowledgeableMessage.reject(true);
return null;
});
if (dittoRuntimeException != null) {
if (isConsideredSuccess(dittoRuntimeException)) {
timer.tag(TracingTags.ACK_SUCCESS, true).stop();
ackTimer.tag(TracingTags.ACK_SUCCESS, true).stop();
ackCounter.tag(TracingTags.ACK_SUCCESS, true).decrement();
acknowledgeableMessage.settle();
} else {
final var shouldRedeliver = requiresRedelivery(dittoRuntimeException.getHttpStatus());
log().debug("Rejecting [redeliver={}] due to error <{}>. ResponseCollector=<{}>",
shouldRedeliver, dittoRuntimeException, responseCollector);
timer.tag(TracingTags.ACK_SUCCESS, false)
ackTimer.tag(TracingTags.ACK_SUCCESS, false)
.tag(TracingTags.ACK_REDELIVER, shouldRedeliver)
.stop();
ackCounter.decrement();
acknowledgeableMessage.reject(shouldRedeliver);
}
}
Expand All @@ -211,7 +221,7 @@ private void prepareResponseHandler(final AcknowledgeableMessage acknowledgeable
*/
protected final Sink<DittoRuntimeException, ?> getDittoRuntimeExceptionSink() {
return Flow.<DittoRuntimeException, DittoRuntimeException>fromFunction(
message -> message.setDittoHeaders(enrichHeadersWithReplyInformation(message.getDittoHeaders())))
message -> message.setDittoHeaders(enrichHeadersWithReplyInformation(message.getDittoHeaders())))
.via(Flow.<DittoRuntimeException, Object>fromFunction(value -> {
inboundMonitor.failure(value.getDittoHeaders(), value);
return value;
Expand Down

0 comments on commit 4da346a

Please sign in to comment.