Skip to content

Commit

Permalink
Count successful and failed acknowledgements in connection metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Sep 3, 2021
1 parent d6a7fb8 commit ece8ad3
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ final class AtLeastOnceConsumerStream implements KafkaConsumerStream {
private static final Logger LOGGER = DittoLoggerFactory.getThreadSafeLogger(AtLeastOnceConsumerStream.class);

private final ConnectionMonitor inboundMonitor;
private final ConnectionMonitor ackMonitor;
private final Materializer materializer;
private final Sink<AcknowledgeableMessage, NotUsed> inboundMappingSink;
private final Sink<DittoRuntimeException, ?> dreSink;
Expand All @@ -59,10 +60,12 @@ final class AtLeastOnceConsumerStream implements KafkaConsumerStream {
final boolean dryRun,
final Materializer materializer,
final ConnectionMonitor inboundMonitor,
final ConnectionMonitor ackMonitor,
final Sink<AcknowledgeableMessage, NotUsed> inboundMappingSink,
final Sink<DittoRuntimeException, ?> dreSink) {

this.inboundMonitor = inboundMonitor;
this.ackMonitor = ackMonitor;
this.inboundMappingSink = inboundMappingSink;
this.dreSink = dreSink;
this.materializer = materializer;
Expand Down Expand Up @@ -91,7 +94,7 @@ private Source<CompletableFuture<CommittableOffset>, NotUsed> processTransformat
final CommittableTransformationResult result) {
if (isExternalMessage(result)) {
return Source.single(result)
.map(AtLeastOnceConsumerStream::toAcknowledgeableMessage)
.map(this::toAcknowledgeableMessage)
.alsoTo(this.externalMessageSink())
.map(KafkaAcknowledgableMessage::getAcknowledgementFuture);
}
Expand Down Expand Up @@ -122,12 +125,12 @@ private static boolean isExternalMessage(final CommittableTransformationResult t
return transformationResult.getTransformationResult().getExternalMessage().isPresent();
}

private static KafkaAcknowledgableMessage toAcknowledgeableMessage(final CommittableTransformationResult value) {
private KafkaAcknowledgableMessage toAcknowledgeableMessage(final CommittableTransformationResult value) {
final ExternalMessage externalMessage = value.getTransformationResult()
.getExternalMessage()
.orElseThrow(); // at this point, the ExternalMessage is present
final CommittableOffset committableOffset = value.getCommittableOffset();
return new KafkaAcknowledgableMessage(externalMessage, committableOffset);
return new KafkaAcknowledgableMessage(externalMessage, committableOffset, ackMonitor);
}

private static boolean isNotDryRun(final ConsumerRecord<String, String> cRecord, final boolean dryRun) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.service.messaging.AcknowledgeableMessage;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitor;

import akka.kafka.ConsumerMessage;

Expand All @@ -38,14 +39,20 @@ final class KafkaAcknowledgableMessage {
private final CompletableFuture<ConsumerMessage.CommittableOffset> acknowledgementFuture;

KafkaAcknowledgableMessage(final ExternalMessage message,
final ConsumerMessage.CommittableOffset committableOffset) {
final ConsumerMessage.CommittableOffset committableOffset,
final ConnectionMonitor ackMonitor) {
this.acknowledgementFuture = new CompletableFuture<>();
this.acknowledgeableMessage = AcknowledgeableMessage.of(message,
() -> acknowledgementFuture.complete(committableOffset),
() -> {
ackMonitor.success(message);
acknowledgementFuture.complete(committableOffset);
},
shouldRedeliver -> {
if (shouldRedeliver) {
ackMonitor.exception("Message was rejected and redelivery is requested.");
acknowledgementFuture.completeExceptionally(MessageRejectedException.getInstance());
} else {
ackMonitor.exception("Message was rejected and no redelivery is requested.");
acknowledgementFuture.complete(committableOffset);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ private KafkaConsumerActor(final Connection connection,
() -> {
final KafkaConsumerStream kafkaConsumerStream =
streamFactory.newAtLeastOnceConsumerStream(materializer, inboundMonitor,
getMessageMappingSink(), getDittoRuntimeExceptionSink());
inboundAcknowledgedMonitor, getMessageMappingSink(),
getDittoRuntimeExceptionSink());
kafkaConsumerStream.whenComplete(this::handleStreamCompletion);
return kafkaConsumerStream;
}, exponentialBackOffConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ KafkaConsumerStream newAtMostOnceConsumerStream(
KafkaConsumerStream newAtLeastOnceConsumerStream(
final Materializer materializer,
final ConnectionMonitor inboundMonitor,
final ConnectionMonitor ackMonitor,
final Sink<AcknowledgeableMessage, NotUsed> messageMappingSink,
final Sink<DittoRuntimeException, ?> dreSink) {

Expand All @@ -108,6 +109,7 @@ KafkaConsumerStream newAtLeastOnceConsumerStream(
dryRun,
materializer,
inboundMonitor,
ackMonitor,
messageMappingSink,
dreSink);
}
Expand Down

0 comments on commit ece8ad3

Please sign in to comment.