diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtLeastOnceConsumerStream.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtLeastOnceConsumerStream.java
index ff96627ac6..98de3e1b5b 100644
--- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtLeastOnceConsumerStream.java
+++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtLeastOnceConsumerStream.java
@@ -57,9 +57,6 @@ final class AtLeastOnceConsumerStream implements KafkaConsumerStream {
     private final Sink dreSink;
     private final Sink unexpectedMessageSink;
     private final Consumer.DrainingControl consumerControl;
-    private final KafkaConsumerMetricsRegistry kafkaConsumerMetricsRegistry;
-    private final ConnectionId connectionId;
-    private final String consumerId;
 
     AtLeastOnceConsumerStream(
             final Supplier, Consumer.Control>> sourceSupplier,
@@ -77,7 +74,6 @@ final class AtLeastOnceConsumerStream implements KafkaConsumerStream {
             final String consumerId) {
 
         this.ackMonitor = ackMonitor;
-        this.consumerId = consumerId;
 
         // Pre materialize sinks with MergeHub to avoid multiple materialization per kafka record in processTransformationResult
         externalMessageSink = MergeHub.of(KafkaAcknowledgableMessage.class)
@@ -106,9 +102,7 @@ final class AtLeastOnceConsumerStream implements KafkaConsumerStream {
                 .toMat(Committer.sink(committerSettings), Consumer::createDrainingControl)
                 .run(materializer);
 
-        this.kafkaConsumerMetricsRegistry = kafkaConsumerMetricsRegistry;
-        this.connectionId = connectionId;
-        registerForMetricCollection();
+        kafkaConsumerMetricsRegistry.registerConsumer(connectionId, consumerControl, consumerId);
     }
 
     @Override
@@ -118,7 +112,6 @@ public CompletionStage whenComplete(final BiConsumer
 
     @Override
     public CompletionStage stop() {
-        kafkaConsumerMetricsRegistry.deregisterConsumer(connectionId, consumerId);
         return consumerControl.drainAndShutdown(materializer.executionContext());
     }
 
@@ -186,8 +179,4 @@ private static DittoRuntimeException extractDittoRuntimeException(final Committa
                 .orElseThrow(); // at this point, the DRE is present
     }
 
-    private void registerForMetricCollection() {
-        kafkaConsumerMetricsRegistry.registerConsumer(connectionId, consumerControl, consumerId);
-    }
-
 }
diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtMostOnceConsumerStream.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtMostOnceConsumerStream.java
index c3fe476b7d..3136486ab3 100644
--- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtMostOnceConsumerStream.java
+++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtMostOnceConsumerStream.java
@@ -52,9 +52,6 @@ final class AtMostOnceConsumerStream implements KafkaConsumerStream {
     private final Sink externalMessageSink;
     private final Sink dreSink;
     private final Sink unexpectedMessageSink;
-    private final KafkaConsumerMetricsRegistry kafkaConsumerMetricsRegistry;
-    private final ConnectionId connectionId;
-    private final String consumerId;
 
     AtMostOnceConsumerStream(
             final Supplier, Consumer.Control>> sourceSupplier,
@@ -70,7 +67,6 @@ final class AtMostOnceConsumerStream implements KafkaConsumerStream {
             final String consumerId) {
 
         this.materializer = materializer;
-        this.consumerId = consumerId;
 
         // Pre materialize sinks with MergeHub to avoid multiple materialization per kafka record in processTransformationResult
         externalMessageSink = MergeHub.of(KafkaCompletableMessage.class)
@@ -99,9 +95,7 @@ final class AtMostOnceConsumerStream implements KafkaConsumerStream {
                 .toMat(Sink.ignore(), Consumer::createDrainingControl)
                 .run(materializer);
 
-        this.kafkaConsumerMetricsRegistry = kafkaConsumerMetricsRegistry;
-        this.connectionId = connectionId;
-        registerForMetricCollection();
+        kafkaConsumerMetricsRegistry.registerConsumer(connectionId, consumerControl, consumerId);
     }
 
     @Override
@@ -111,7 +105,6 @@ public CompletionStage whenComplete(final BiConsumer
 
     @Override
     public CompletionStage stop() {
-        kafkaConsumerMetricsRegistry.deregisterConsumer(connectionId, consumerId);
         return consumerControl.drainAndShutdown(materializer.executionContext());
     }
 
@@ -164,8 +157,4 @@ private static DittoRuntimeException extractDittoRuntimeException(final Transfor
         return value.getDittoRuntimeException().orElseThrow(); // at this point, the DRE is present
     }
 
-    private void registerForMetricCollection() {
-        kafkaConsumerMetricsRegistry.registerConsumer(connectionId, consumerControl, consumerId);
-    }
-
 }
diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerMetricsRegistry.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerMetricsRegistry.java
index 5efb08f99b..a0cdbf8c8e 100644
--- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerMetricsRegistry.java
+++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerMetricsRegistry.java
@@ -74,6 +74,8 @@ void registerConsumer(final ConnectionId connectionId, final Consumer.DrainingCo
         LOGGER.debug("Registering new consumer for metric reporting: <{}:{}>", connectionId, consumerId);
         metricsMap.put(new CacheKey(connectionId, consumerId),
                 KafkaConsumerMetrics.newInstance(consumerControl, connectionId, consumerId));
+        consumerControl.streamCompletion()
+                .whenComplete((done, error) -> deregisterConsumer(connectionId, consumerId));
     }
 
     /**
@@ -82,7 +84,7 @@ void registerConsumer(final ConnectionId connectionId, final Consumer.DrainingCo
      * @param connectionId the connectionId the consumer belongs to.
      * @param consumerId the unique identifier of the consumer stream.
      */
-    void deregisterConsumer(final ConnectionId connectionId, final String consumerId) {
+    private void deregisterConsumer(final ConnectionId connectionId, final String consumerId) {
         LOGGER.debug("De-registering consumer for metric reporting: <{}:{}>", connectionId, consumerId);
         metricsMap.remove(new CacheKey(connectionId, consumerId));
     }
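
Note (not part of the patch): the change above moves consumer deregistration out of the streams' stop() methods and into KafkaConsumerMetricsRegistry itself, which now hooks deregisterConsumer onto the stream's completion stage at registration time. The sketch below illustrates that self-cleaning-registry pattern with plain java.util.concurrent types instead of Alpakka's Consumer.DrainingControl; the class and method names (SelfCleaningRegistry, register) are illustrative only and do not exist in the Ditto codebase.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative sketch only: a registry whose entries remove themselves once the
// associated stream (represented here by a CompletionStage) terminates, mirroring
// the registerConsumer(...) + streamCompletion().whenComplete(...) change above.
final class SelfCleaningRegistry<K, V> {

    private final ConcurrentMap<K, V> entries = new ConcurrentHashMap<>();

    // Register a value and tie its removal to the completion of the given stage,
    // regardless of whether the stream finished normally or with an error.
    void register(final K key, final V value, final CompletionStage<?> streamCompletion) {
        entries.put(key, value);
        streamCompletion.whenComplete((done, error) -> entries.remove(key));
    }

    int size() {
        return entries.size();
    }

    public static void main(final String[] args) {
        final SelfCleaningRegistry<String, String> registry = new SelfCleaningRegistry<>();
        final CompletableFuture<Void> streamCompletion = new CompletableFuture<>();

        registry.register("connection-1:consumer-0", "metrics", streamCompletion);
        System.out.println(registry.size()); // 1 -> entry present while the stream runs

        streamCompletion.complete(null);     // stream terminates (drained, shut down, or failed)
        System.out.println(registry.size()); // 0 -> entry removed without an explicit deregister call
    }
}

Tying removal to stream completion covers both a normal drainAndShutdown and a failed stream with a single hook, which is why the explicit deregisterConsumer call in stop() could be dropped and the method made private.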