Commit ef6eb9e
Remove ConsumerMetricsRegistry and schedule reporting in consumer actor instead

* This avoids manual thread handling

Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
Yannic92 committed Nov 12, 2021
1 parent cbda604 commit ef6eb9e
Showing 11 changed files with 56 additions and 178 deletions.
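The core of the change: KafkaConsumerMetricsRegistry previously collected consumer controls and drove metric reporting itself (the commit message notes this required manual thread handling). Now each KafkaConsumerStream owns a KafkaConsumerMetrics instance, and KafkaConsumerActor schedules a periodic ReportMetrics self-message with an actor timer. Below is a minimal sketch of that timer pattern, assuming Akka's AbstractActorWithTimers API; the PeriodicReporter class, Tick message, and report() body are illustrative, not part of this commit.

import java.time.Duration;

import akka.actor.AbstractActorWithTimers;
import akka.actor.Props;

final class PeriodicReporter extends AbstractActorWithTimers {

    // Self-message that triggers one reporting run; mirrors ReportMetrics below.
    private enum Tick { INSTANCE }

    @SuppressWarnings("unused") // created via Props
    private PeriodicReporter(final Duration interval) {
        // The actor owns the timer: it is started here and cancelled
        // automatically when the actor stops, so no thread handling is needed.
        timers().startTimerAtFixedRate(Tick.class, Tick.INSTANCE, interval);
    }

    static Props props(final Duration interval) {
        return Props.create(PeriodicReporter.class, interval);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(Tick.class, tick -> report())
                .build();
    }

    private void report() {
        // In KafkaConsumerActor this delegates to kafkaStream.reportMetrics().
    }
}

Because the tick arrives through the actor's mailbox, report() always runs on the actor's dispatcher, serialized with the rest of the actor's behavior, so no locks or explicit thread lifecycle management are required.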
@@ -57,6 +57,7 @@ final class AtLeastOnceConsumerStream implements KafkaConsumerStream {
     private final Sink<CommittableTransformationResult, NotUsed> dreSink;
     private final Sink<CommittableTransformationResult, NotUsed> unexpectedMessageSink;
     private final Consumer.DrainingControl<Done> consumerControl;
+    private final KafkaConsumerMetrics consumerMetrics;
 
     AtLeastOnceConsumerStream(
             final Supplier<Source<ConsumerMessage.CommittableMessage<String, String>, Consumer.Control>> sourceSupplier,
@@ -69,7 +70,6 @@ final class AtLeastOnceConsumerStream implements KafkaConsumerStream {
             final ConnectionMonitor ackMonitor,
             final Graph<SinkShape<AcknowledgeableMessage>, NotUsed> inboundMappingSink,
             final Graph<SinkShape<DittoRuntimeException>, ?> exceptionSink,
-            final KafkaConsumerMetricsRegistry kafkaConsumerMetricsRegistry,
             final ConnectionId connectionId,
             final String consumerId) {
 
@@ -102,7 +102,7 @@ final class AtLeastOnceConsumerStream implements KafkaConsumerStream {
                 .toMat(Committer.sink(committerSettings), Consumer::createDrainingControl)
                 .run(materializer);
 
-        registerForMetricCollection(kafkaConsumerMetricsRegistry, connectionId, consumerId);
+        consumerMetrics = KafkaConsumerMetrics.newInstance(consumerControl, connectionId, consumerId);
     }
 
     @Override
@@ -115,6 +115,11 @@ public CompletionStage<Done> stop() {
         return consumerControl.drainAndShutdown(materializer.executionContext());
     }
 
+    @Override
+    public void reportMetrics() {
+        consumerMetrics.reportMetrics();
+    }
+
     private Source<CompletableFuture<CommittableOffset>, NotUsed> processTransformationResult(
             final CommittableTransformationResult result) {
 
@@ -179,10 +184,4 @@ private static DittoRuntimeException extractDittoRuntimeException(final Committa
                 .orElseThrow(); // at this point, the DRE is present
     }
 
-    private void registerForMetricCollection(final KafkaConsumerMetricsRegistry kafkaConsumerMetricsRegistry,
-            final ConnectionId connectionId, final String consumerId) {
-
-        kafkaConsumerMetricsRegistry.registerConsumer(connectionId, consumerControl, consumerId);
-    }
-
 }
@@ -52,6 +52,7 @@ final class AtMostOnceConsumerStream implements KafkaConsumerStream {
     private final Sink<KafkaCompletableMessage, NotUsed> externalMessageSink;
     private final Sink<TransformationResult, NotUsed> dreSink;
     private final Sink<TransformationResult, NotUsed> unexpectedMessageSink;
+    private final KafkaConsumerMetrics consumerMetrics;
 
     AtMostOnceConsumerStream(
             final Supplier<Source<ConsumerRecord<String, String>, Consumer.Control>> sourceSupplier,
@@ -62,7 +63,6 @@ final class AtMostOnceConsumerStream implements KafkaConsumerStream {
             final ConnectionMonitor inboundMonitor,
             final Graph<SinkShape<AcknowledgeableMessage>, NotUsed> inboundMappingSink,
             final Graph<SinkShape<DittoRuntimeException>, ?> exceptionSink,
-            final KafkaConsumerMetricsRegistry kafkaConsumerMetricsRegistry,
             final ConnectionId connectionId,
             final String consumerId) {
 
@@ -95,7 +95,7 @@ final class AtMostOnceConsumerStream implements KafkaConsumerStream {
                 .toMat(Sink.ignore(), Consumer::createDrainingControl)
                 .run(materializer);
 
-        registerForMetricCollection(kafkaConsumerMetricsRegistry, connectionId, consumerId);
+        consumerMetrics = KafkaConsumerMetrics.newInstance(consumerControl, connectionId, consumerId);
     }
 
     @Override
@@ -108,6 +108,11 @@ public CompletionStage<Done> stop() {
         return consumerControl.drainAndShutdown(materializer.executionContext());
     }
 
+    @Override
+    public void reportMetrics() {
+        consumerMetrics.reportMetrics();
+    }
+
     private Source<CompletableFuture<Done>, NotUsed> processTransformationResult(
             final TransformationResult result) {
 
@@ -157,10 +162,4 @@ private static DittoRuntimeException extractDittoRuntimeException(final Transfor
         return value.getDittoRuntimeException().orElseThrow(); // at this point, the DRE is present
     }
 
-    private void registerForMetricCollection(final KafkaConsumerMetricsRegistry kafkaConsumerMetricsRegistry,
-            final ConnectionId connectionId, final String consumerId) {
-
-        kafkaConsumerMetricsRegistry.registerConsumer(connectionId, consumerControl, consumerId);
-    }
-
 }
@@ -63,7 +63,6 @@ public final class KafkaClientActor extends BaseClientActor {
     private ActorRef kafkaPublisherActor;
     private final List<ActorRef> kafkaConsumerActors;
    private final KafkaConfig kafkaConfig;
-    private final KafkaConsumerMetricsRegistry kafkaConsumerMetricsRegistry;
 
     private KafkaClientActor(final Connection connection,
             @Nullable final ActorRef proxyActor,
@@ -78,7 +77,6 @@ private KafkaClientActor(final Connection connection,
         propertiesFactory = PropertiesFactory.newInstance(connection, kafkaConfig, getClientId(connection.getId()));
         this.publisherActorFactory = publisherActorFactory;
         pendingStatusReportsFromStreams = new HashSet<>();
-        kafkaConsumerMetricsRegistry = KafkaConsumerMetricsRegistry.getInstance(kafkaConfig.getConsumerConfig().getMetricCollectingInterval());
     }
 
     @SuppressWarnings("unused") // used by `props` via reflection
@@ -240,11 +238,12 @@ private void startKafkaConsumer(final ConsumerData consumerData, final boolean d
         final ConnectionThrottlingConfig throttlingConfig = consumerConfig.getThrottlingConfig();
         final ExponentialBackOffConfig consumerRestartBackOffConfig = consumerConfig.getRestartBackOffConfig();
         final KafkaConsumerStreamFactory streamFactory =
-                new KafkaConsumerStreamFactory(throttlingConfig, propertiesFactory, consumerData, dryRun, kafkaConsumerMetricsRegistry);
+                new KafkaConsumerStreamFactory(throttlingConfig, propertiesFactory, consumerData, dryRun);
         final Props consumerActorProps =
                 KafkaConsumerActor.props(connection(), streamFactory,
                         consumerData, getInboundMappingSink(),
-                        connectivityStatusResolver, consumerRestartBackOffConfig);
+                        connectivityStatusResolver, consumerRestartBackOffConfig,
+                        consumerConfig.getMetricCollectingInterval());
         final ActorRef consumerActor =
                 startChildActorConflictFree(consumerData.getActorNamePrefix(), consumerActorProps);
         kafkaConsumerActors.add(consumerActor);
@@ -58,7 +58,8 @@ private KafkaConsumerActor(final Connection connection,
             final ConsumerData consumerData,
             final Sink<Object, NotUsed> inboundMappingSink,
             final ConnectivityStatusResolver connectivityStatusResolver,
-            final ExponentialBackOffConfig exponentialBackOffConfig) {
+            final ExponentialBackOffConfig exponentialBackOffConfig,
+            final Duration metricsReportingInterval) {
 
         super(connection, consumerData.getAddress(), inboundMappingSink, consumerData.getSource(),
                 connectivityStatusResolver);
@@ -89,17 +90,19 @@ inboundAcknowledgedMonitor, getMessageMappingSink(),
                 return kafkaConsumerStream;
             }, exponentialBackOffConfig);
         }
+        timers().startTimerAtFixedRate(ReportMetrics.class, ReportMetrics.INSTANCE, metricsReportingInterval);
     }
 
     static Props props(final Connection connection,
             final KafkaConsumerStreamFactory streamFactory,
             final ConsumerData consumerData,
             final Sink<Object, NotUsed> inboundMappingSink,
             final ConnectivityStatusResolver connectivityStatusResolver,
-            final ExponentialBackOffConfig exponentialBackOffConfig) {
+            final ExponentialBackOffConfig exponentialBackOffConfig,
+            final Duration metricsReportingInterval) {
 
         return Props.create(KafkaConsumerActor.class, connection, streamFactory, consumerData,
-                inboundMappingSink, connectivityStatusResolver, exponentialBackOffConfig);
+                inboundMappingSink, connectivityStatusResolver, exponentialBackOffConfig, metricsReportingInterval);
     }
 
     @Override
@@ -114,6 +117,7 @@ public Receive createReceive() {
                 .match(ResourceStatus.class, this::handleAddressStatus)
                 .match(RetrieveAddressStatus.class, ram -> getSender().tell(getCurrentSourceStatus(), getSelf()))
                 .match(GracefulStop.class, stop -> shutdown())
+                .match(ReportMetrics.class, reportMetrics -> reportMetrics())
                 .match(MessageRejectedException.class, this::restartStream)
                 .match(RestartableKafkaConsumerStream.class, this::setStream)
                 .matchAny(unhandled -> {
@@ -128,6 +132,10 @@ protected ThreadSafeDittoLoggingAdapter log() {
         return log;
     }
 
+    private void reportMetrics() {
+        kafkaStream.reportMetrics();
+    }
+
     private void shutdown() {
         if (kafkaStream != null) {
             try {
@@ -210,4 +218,13 @@ private GracefulStop() {
 
     }
 
+    static final class ReportMetrics {
+
+        static final ReportMetrics INSTANCE = new ReportMetrics();
+
+        private ReportMetrics() {
+            // intentionally empty
+        }
+    }
+
 }
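Taken together with the stream changes above, the reporting path is now: the actor timer fires ReportMetrics at the configured interval, the actor's receive matches it and calls reportMetrics(), which delegates to kafkaStream.reportMetrics(), and each stream forwards to the KafkaConsumerMetrics instance it created from its own Consumer.DrainingControl.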

This file was deleted.

@@ -40,4 +40,9 @@ interface KafkaConsumerStream {
      */
     CompletionStage<Done> stop();
 
+    /**
+     * Triggers report of metrics to DittoMetrics.
+     */
+    void reportMetrics();
+
 }
@@ -44,19 +44,16 @@ final class KafkaConsumerStreamFactory {
     private final AtMostOnceKafkaConsumerSourceSupplier atMostOnceKafkaConsumerSourceSupplier;
     private final AtLeastOnceKafkaConsumerSourceSupplier atLeastOnceKafkaConsumerSourceSupplier;
     private final ConnectionThrottlingConfig throttlingConfig;
-    private final KafkaConsumerMetricsRegistry kafkaConsumerMetricsRegistry;
 
     KafkaConsumerStreamFactory(final ConnectionThrottlingConfig throttlingConfig,
             final PropertiesFactory propertiesFactory,
             final ConsumerData consumerData,
-            final boolean dryRun,
-            final KafkaConsumerMetricsRegistry kafkaConsumerMetricsRegistry) {
+            final boolean dryRun) {
 
         this.throttlingConfig = throttlingConfig;
         this.consumerData = consumerData;
         this.dryRun = dryRun;
         this.propertiesFactory = propertiesFactory;
-        this.kafkaConsumerMetricsRegistry = kafkaConsumerMetricsRegistry;
         atMostOnceKafkaConsumerSourceSupplier =
                 new AtMostOnceKafkaConsumerSourceSupplier(propertiesFactory, consumerData.getAddress(), dryRun);
         atLeastOnceKafkaConsumerSourceSupplier =
@@ -74,16 +71,14 @@ final class KafkaConsumerStreamFactory {
     KafkaConsumerStreamFactory(final AtMostOnceKafkaConsumerSourceSupplier atMostOnceKafkaConsumerSourceSupplier,
             final AtLeastOnceKafkaConsumerSourceSupplier atLeastOnceKafkaConsumerSourceSupplier,
             final ConsumerData consumerData,
-            final boolean dryRun,
-            final KafkaConsumerMetricsRegistry kafkaConsumerMetricsRegistry) {
+            final boolean dryRun) {
 
         this.consumerData = consumerData;
         this.dryRun = dryRun;
         propertiesFactory = null;
         throttlingConfig = ConnectionThrottlingConfig.of(ConfigFactory.empty());
         this.atMostOnceKafkaConsumerSourceSupplier = atMostOnceKafkaConsumerSourceSupplier;
         this.atLeastOnceKafkaConsumerSourceSupplier = atLeastOnceKafkaConsumerSourceSupplier;
-        this.kafkaConsumerMetricsRegistry = kafkaConsumerMetricsRegistry;
     }
 
     KafkaConsumerStream newAtMostOnceConsumerStream(
@@ -103,7 +98,6 @@ KafkaConsumerStream newAtMostOnceConsumerStream(
                 inboundMonitor,
                 messageMappingSink,
                 dreSink,
-                kafkaConsumerMetricsRegistry,
                 connectionId,
                 consumerId);
     }
@@ -128,7 +122,6 @@ KafkaConsumerStream newAtLeastOnceConsumerStream(
                 ackMonitor,
                 messageMappingSink,
                 dreSink,
-                kafkaConsumerMetricsRegistry,
                 connectionId,
                 consumerId);
     }
