Skip to content

Commit

Permalink
De-Register automatically after the consumerControl has completed eit…
Browse files Browse the repository at this point in the history
…her successfully or exceptionally

* This makes the registration more stable/consistent because it's less likely
  that a developer forgets to de-register or performs the de-registration incorrectly
  (see previous commit)

Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Nov 11, 2021
1 parent 3aa2869 commit 362399b
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ final class AtLeastOnceConsumerStream implements KafkaConsumerStream {
private final Sink<CommittableTransformationResult, NotUsed> dreSink;
private final Sink<CommittableTransformationResult, NotUsed> unexpectedMessageSink;
private final Consumer.DrainingControl<Done> consumerControl;
private final KafkaConsumerMetricsRegistry kafkaConsumerMetricsRegistry;
private final ConnectionId connectionId;
private final String consumerId;

AtLeastOnceConsumerStream(
final Supplier<Source<ConsumerMessage.CommittableMessage<String, String>, Consumer.Control>> sourceSupplier,
Expand All @@ -77,7 +74,6 @@ final class AtLeastOnceConsumerStream implements KafkaConsumerStream {
final String consumerId) {

this.ackMonitor = ackMonitor;
this.consumerId = consumerId;

// Pre materialize sinks with MergeHub to avoid multiple materialization per kafka record in processTransformationResult
externalMessageSink = MergeHub.of(KafkaAcknowledgableMessage.class)
Expand Down Expand Up @@ -106,9 +102,7 @@ final class AtLeastOnceConsumerStream implements KafkaConsumerStream {
.toMat(Committer.sink(committerSettings), Consumer::createDrainingControl)
.run(materializer);

this.kafkaConsumerMetricsRegistry = kafkaConsumerMetricsRegistry;
this.connectionId = connectionId;
registerForMetricCollection();
kafkaConsumerMetricsRegistry.registerConsumer(connectionId, consumerControl, consumerId);
}

@Override
Expand All @@ -118,7 +112,6 @@ public CompletionStage<Done> whenComplete(final BiConsumer<? super Done, ? super

@Override
public CompletionStage<Done> stop() {
kafkaConsumerMetricsRegistry.deregisterConsumer(connectionId, consumerId);
return consumerControl.drainAndShutdown(materializer.executionContext());
}

Expand Down Expand Up @@ -186,8 +179,4 @@ private static DittoRuntimeException extractDittoRuntimeException(final Committa
.orElseThrow(); // at this point, the DRE is present
}

private void registerForMetricCollection() {
kafkaConsumerMetricsRegistry.registerConsumer(connectionId, consumerControl, consumerId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@ final class AtMostOnceConsumerStream implements KafkaConsumerStream {
private final Sink<KafkaCompletableMessage, NotUsed> externalMessageSink;
private final Sink<TransformationResult, NotUsed> dreSink;
private final Sink<TransformationResult, NotUsed> unexpectedMessageSink;
private final KafkaConsumerMetricsRegistry kafkaConsumerMetricsRegistry;
private final ConnectionId connectionId;
private final String consumerId;

AtMostOnceConsumerStream(
final Supplier<Source<ConsumerRecord<String, String>, Consumer.Control>> sourceSupplier,
Expand All @@ -70,7 +67,6 @@ final class AtMostOnceConsumerStream implements KafkaConsumerStream {
final String consumerId) {

this.materializer = materializer;
this.consumerId = consumerId;

// Pre materialize sinks with MergeHub to avoid multiple materialization per kafka record in processTransformationResult
externalMessageSink = MergeHub.of(KafkaCompletableMessage.class)
Expand Down Expand Up @@ -99,9 +95,7 @@ final class AtMostOnceConsumerStream implements KafkaConsumerStream {
.toMat(Sink.ignore(), Consumer::createDrainingControl)
.run(materializer);

this.kafkaConsumerMetricsRegistry = kafkaConsumerMetricsRegistry;
this.connectionId = connectionId;
registerForMetricCollection();
kafkaConsumerMetricsRegistry.registerConsumer(connectionId, consumerControl, consumerId);
}

@Override
Expand All @@ -111,7 +105,6 @@ public CompletionStage<Done> whenComplete(final BiConsumer<? super Done, ? super

@Override
public CompletionStage<Done> stop() {
kafkaConsumerMetricsRegistry.deregisterConsumer(connectionId, consumerId);
return consumerControl.drainAndShutdown(materializer.executionContext());
}

Expand Down Expand Up @@ -164,8 +157,4 @@ private static DittoRuntimeException extractDittoRuntimeException(final Transfor
return value.getDittoRuntimeException().orElseThrow(); // at this point, the DRE is present
}

private void registerForMetricCollection() {
kafkaConsumerMetricsRegistry.registerConsumer(connectionId, consumerControl, consumerId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ void registerConsumer(final ConnectionId connectionId, final Consumer.DrainingCo
LOGGER.debug("Registering new consumer for metric reporting: <{}:{}>", connectionId, consumerId);
metricsMap.put(new CacheKey(connectionId, consumerId),
KafkaConsumerMetrics.newInstance(consumerControl, connectionId, consumerId));
consumerControl.streamCompletion()
.whenComplete((done, error) -> deregisterConsumer(connectionId, consumerId));
}

/**
Expand All @@ -82,7 +84,7 @@ void registerConsumer(final ConnectionId connectionId, final Consumer.DrainingCo
* @param connectionId the connectionId the consumer belongs to.
* @param consumerId the unique identifier of the consumer stream.
*/
void deregisterConsumer(final ConnectionId connectionId, final String consumerId) {
private void deregisterConsumer(final ConnectionId connectionId, final String consumerId) {
LOGGER.debug("De-registering consumer for metric reporting: <{}:{}>", connectionId, consumerId);
metricsMap.remove(new CacheKey(connectionId, consumerId));
}
Expand Down

0 comments on commit 362399b

Please sign in to comment.