Skip to content

Commit

Permalink
Review findings fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
desislava-marinova committed Jul 11, 2022
1 parent 7845c5e commit 2d3c6d3
Showing 1 changed file with 20 additions and 21 deletions.
Expand Up @@ -206,30 +206,29 @@ private void handleMatched(final SourceQueue<T> sourceQueue, final M match) {
}
receiveCounter.increment();
sourceQueue.offer(mapMessage(match))
.handle((result, error) -> incrementEnqueueCounters(match, result, error));
.whenComplete(this::incrementEnqueueCounters)
.thenAccept(result -> {
if (!result.isEnqueued()) {
messageDiscarded(match, result);
}
});
}

private Void incrementEnqueueCounters(final M message, final QueueOfferResult result, final Throwable error) {
try {
if (QueueOfferResult.enqueued().equals(result)) {
enqueueSuccessCounter.increment();
} else if (QueueOfferResult.dropped().equals(result)) {
enqueueDroppedCounter.increment();
logger.error("Dropped message as result of backpressure strategy! - result was: {} - adjust queue " +
"size or scaling if this appears regularly", result);
} else if (result instanceof final QueueOfferResult.Failure failure) {
logger.error(failure.cause(), "Enqueue failed!");
enqueueFailureCounter.increment();
} else {
logger.error(error, "Enqueue failed without acknowledgement!");
enqueueFailureCounter.increment();
}
return null;
} finally {
if (!result.isEnqueued()) {
messageDiscarded(message, result);
}
private Void incrementEnqueueCounters(final QueueOfferResult result, final Throwable error) {
if (QueueOfferResult.enqueued().equals(result)) {
enqueueSuccessCounter.increment();
} else if (QueueOfferResult.dropped().equals(result)) {
enqueueDroppedCounter.increment();
logger.error("Dropped message as result of backpressure strategy! - result was: {} - adjust queue " +
"size or scaling if this appears regularly", result);
} else if (result instanceof final QueueOfferResult.Failure failure) {
logger.error(failure.cause(), "Enqueue failed!");
enqueueFailureCounter.increment();
} else {
logger.error(error, "Enqueue failed without acknowledgement!");
enqueueFailureCounter.increment();
}
return null;
}

private void handleUnknownThrowable(final Throwable unknownThrowable) {
Expand Down

0 comments on commit 2d3c6d3

Please sign in to comment.