Add KafkaConsumerMetricsRegistry for collecting Kafka consumer metrics
Implements KafkaConsumerMetricsRegistry as a singleton for collecting Apache Kafka consumer metrics.
Some minor code-style refactorings.
Adjust ClientActorPropsFactory to be a singleton, since it doesn't hold any state.

Signed-off-by: David Schwilk <david.schwilk@bosch.io>
DerSchwilk committed Nov 4, 2021
1 parent de0c7bf commit 717d0fb
Showing 20 changed files with 337 additions and 58 deletions.
6 changes: 6 additions & 0 deletions connectivity/service/pom.xml
@@ -149,6 +149,12 @@ jmh-generator-annprocess). jmh-generator-annprocess overwrites the whole META-IN
<artifactId>jackson-annotations</artifactId>
</dependency>

<dependency>
<groupId>io.kamon</groupId>
<artifactId>kamon-kafka_2.13</artifactId>
<version>2.3.1</version>
</dependency>

<!-- ### Testing ### -->
<dependency>
<groupId>org.openjdk.jmh</groupId>
DefaultKafkaConsumerConfig.java
@@ -12,6 +12,7 @@
*/
package org.eclipse.ditto.connectivity.service.config;

import java.time.Duration;
import java.util.Objects;

import javax.annotation.concurrent.Immutable;
@@ -35,12 +36,15 @@ final class DefaultKafkaConsumerConfig implements KafkaConsumerConfig {
private final ConnectionThrottlingConfig throttlingConfig;
private final ExponentialBackOffConfig restartBackOffConfig;
private final Config alpakkaConfig;
private final Duration metricCollectingInterval;

private DefaultKafkaConsumerConfig(final Config kafkaConsumerScopedConfig) {
throttlingConfig = ConnectionThrottlingConfig.of(kafkaConsumerScopedConfig);
restartBackOffConfig =
DefaultExponentialBackOffConfig.of(getConfigOrEmpty(kafkaConsumerScopedConfig, RESTART_PATH));
alpakkaConfig = getConfigOrEmpty(kafkaConsumerScopedConfig, ALPAKKA_PATH);
metricCollectingInterval =
kafkaConsumerScopedConfig.getDuration(ConfigValue.METRIC_COLLECTING_INTERVAL.getConfigPath());
}

/**
@@ -73,6 +77,11 @@ public Config getAlpakkaConfig() {
return alpakkaConfig;
}

@Override
public Duration getMetricCollectingInterval() {
return metricCollectingInterval;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
@@ -84,12 +93,13 @@ public boolean equals(final Object o) {
final DefaultKafkaConsumerConfig that = (DefaultKafkaConsumerConfig) o;
return Objects.equals(throttlingConfig, that.throttlingConfig) &&
Objects.equals(restartBackOffConfig, that.restartBackOffConfig) &&
Objects.equals(alpakkaConfig, that.alpakkaConfig);
Objects.equals(alpakkaConfig, that.alpakkaConfig) &&
Objects.equals(metricCollectingInterval, that.metricCollectingInterval);
}

@Override
public int hashCode() {
return Objects.hash(throttlingConfig, restartBackOffConfig, alpakkaConfig);
return Objects.hash(throttlingConfig, restartBackOffConfig, alpakkaConfig, metricCollectingInterval);
}

@Override
@@ -98,6 +108,7 @@ public String toString() {
"throttlingConfig=" + throttlingConfig +
", restartBackOffConfig=" + restartBackOffConfig +
", alpakkaConfig=" + alpakkaConfig +
", metricCollectingInterval=" + metricCollectingInterval +
"]";
}

KafkaConsumerConfig.java
@@ -12,9 +12,12 @@
*/
package org.eclipse.ditto.connectivity.service.config;

import java.time.Duration;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig;
import org.eclipse.ditto.internal.utils.config.KnownConfigValue;

import com.typesafe.config.Config;

@@ -46,6 +49,13 @@ public interface KafkaConsumerConfig {
*/
Config getAlpakkaConfig();

/**
* Returns the interval at which metrics from the Apache Kafka client should be collected.
*
* @return the interval.
*/
Duration getMetricCollectingInterval();

/**
* Returns an instance of {@code KafkaConsumerConfig} based on the settings of the specified Config.
*
@@ -57,4 +67,35 @@ static KafkaConsumerConfig of(final Config config) {
return DefaultKafkaConsumerConfig.of(config);
}

/**
* An enumeration of the known config path expressions and their associated default values for
* {@code KafkaConsumerConfig}.
*/
enum ConfigValue implements KnownConfigValue {

/**
* The interval at which Apache Kafka client metrics should be collected.
*/
METRIC_COLLECTING_INTERVAL("metric-collecting-interval", Duration.ofSeconds(10L));

private final String path;
private final Object defaultValue;

ConfigValue(final String thePath, final Object theDefaultValue) {
path = thePath;
defaultValue = theDefaultValue;
}

@Override
public Object getDefaultValue() {
return defaultValue;
}

@Override
public String getConfigPath() {
return path;
}

}

}
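The new metric-collecting-interval setting defaults to 10 seconds via the ConfigValue enum above. A minimal, runnable sketch of how such a duration resolves through the Typesafe Config API (the inline HOCON and the class name are illustrative assumptions, not taken from Ditto's actual config files):

import java.time.Duration;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public final class MetricIntervalExample {

    public static void main(final String[] args) {
        // Inline HOCON standing in for the consumer-scoped section of the service config.
        final Config kafkaConsumerScopedConfig =
                ConfigFactory.parseString("metric-collecting-interval = 10s");

        // Mirrors the lookup in the DefaultKafkaConsumerConfig constructor.
        final Duration interval =
                kafkaConsumerScopedConfig.getDuration("metric-collecting-interval");

        System.out.println(interval); // prints PT10S
    }
}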
DefaultClientActorPropsFactory.java
@@ -32,27 +32,38 @@
import akka.actor.Props;

/**
* The default implementation of {@link ClientActorPropsFactory}.
* The default implementation of {@link ClientActorPropsFactory}. Singleton that is created once and
* returned on all subsequent calls.
*/
@Immutable
public final class DefaultClientActorPropsFactory implements ClientActorPropsFactory {

private DefaultClientActorPropsFactory() {
}
@Nullable private static DefaultClientActorPropsFactory instance;

private DefaultClientActorPropsFactory() {}

/**
* Returns an instance of {@code DefaultClientActorPropsFactory}.
* Returns the singleton instance of {@code DefaultClientActorPropsFactory}, creating it on first access.
*
* @return the factory instance.
*/
public static DefaultClientActorPropsFactory getInstance() {
return new DefaultClientActorPropsFactory();
if (null == instance) {
instance = new DefaultClientActorPropsFactory();
}
return instance;
}

@Override
public Props getActorPropsForType(final Connection connection, @Nullable final ActorRef proxyActor,
final ActorRef connectionActor, final ActorSystem actorSystem,
final DittoHeaders dittoHeaders, final Config connectivityConfigOverwrites) {
public Props getActorPropsForType(final Connection connection,
@Nullable final ActorRef proxyActor,
final ActorRef connectionActor,
final ActorSystem actorSystem,
final DittoHeaders dittoHeaders,
final Config connectivityConfigOverwrites) {

final ConnectionType connectionType = connection.getConnectionType();
final Props result;
switch (connectionType) {
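Note that the lazy getInstance() above is not synchronized, so two threads racing on the first call could each construct an instance. Since the factory is stateless this is harmless, but the initialization-on-demand holder idiom would guarantee a single instance without locking. A sketch of that alternative (the class name is hypothetical; this is not the commit's implementation):

// Thread-safe lazy singleton via the initialization-on-demand holder idiom.
public final class HolderBasedPropsFactory {

    private HolderBasedPropsFactory() {}

    // The JVM initializes Holder, and thus INSTANCE, at most once, on first access.
    private static final class Holder {
        private static final HolderBasedPropsFactory INSTANCE = new HolderBasedPropsFactory();
    }

    public static HolderBasedPropsFactory getInstance() {
        return Holder.INSTANCE;
    }
}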
AtLeastOnceConsumerStream.java
@@ -15,12 +15,14 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

import javax.annotation.concurrent.Immutable;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.service.config.ConnectionThrottlingConfig;
import org.eclipse.ditto.connectivity.service.messaging.AcknowledgeableMessage;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitor;
@@ -30,10 +32,13 @@
import akka.Done;
import akka.NotUsed;
import akka.kafka.CommitterSettings;
import akka.kafka.ConsumerMessage;
import akka.kafka.ConsumerMessage.CommittableOffset;
import akka.kafka.javadsl.Committer;
import akka.kafka.javadsl.Consumer;
import akka.stream.Graph;
import akka.stream.Materializer;
import akka.stream.SinkShape;
import akka.stream.javadsl.MergeHub;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
@@ -52,33 +57,37 @@ final class AtLeastOnceConsumerStream implements KafkaConsumerStream {
private final Sink<CommittableTransformationResult, NotUsed> dreSink;
private final Sink<CommittableTransformationResult, NotUsed> unexpectedMessageSink;
private final Consumer.DrainingControl<Done> consumerControl;
private final KafkaConsumerMetricsRegistry kafkaConsumerMetricsRegistry;
private final ConnectionId connectionId;

AtLeastOnceConsumerStream(
final AtLeastOnceKafkaConsumerSourceSupplier sourceSupplier,
final Supplier<Source<ConsumerMessage.CommittableMessage<String, String>, Consumer.Control>> sourceSupplier,
final CommitterSettings committerSettings,
final ConnectionThrottlingConfig throttlingConfig,
final KafkaMessageTransformer kafkaMessageTransformer,
final boolean dryRun,
final Materializer materializer,
final ConnectionMonitor inboundMonitor,
final ConnectionMonitor ackMonitor,
final Sink<AcknowledgeableMessage, NotUsed> inboundMappingSink,
final Sink<DittoRuntimeException, ?> exceptionSink) {
final Graph<SinkShape<AcknowledgeableMessage>, NotUsed> inboundMappingSink,
final Graph<SinkShape<DittoRuntimeException>, ?> exceptionSink,
final KafkaConsumerMetricsRegistry kafkaConsumerMetricsRegistry,
final ConnectionId connectionId) {

this.ackMonitor = ackMonitor;

// Pre-materialize sinks with MergeHub to avoid multiple materializations per Kafka record in processTransformationResult
this.externalMessageSink = MergeHub.of(KafkaAcknowledgableMessage.class)
externalMessageSink = MergeHub.of(KafkaAcknowledgableMessage.class)
.map(KafkaAcknowledgableMessage::getAcknowledgeableMessage)
.to(inboundMappingSink)
.run(materializer);

this.dreSink = MergeHub.of(CommittableTransformationResult.class)
dreSink = MergeHub.of(CommittableTransformationResult.class)
.map(AtLeastOnceConsumerStream::extractDittoRuntimeException)
.to(exceptionSink)
.run(materializer);

this.unexpectedMessageSink = MergeHub.of(CommittableTransformationResult.class)
unexpectedMessageSink = MergeHub.of(CommittableTransformationResult.class)
.to(Sink.foreach(transformationResult -> inboundMonitor.exception(
"Got unexpected transformation result <{0}>. This is an internal error. " +
"Please contact the service team.", transformationResult)))
@@ -93,6 +102,10 @@ final class AtLeastOnceConsumerStream implements KafkaConsumerStream {
.mapAsync(throttlingConfig.getMaxInFlight(), x -> x)
.toMat(Committer.sink(committerSettings), Consumer::createDrainingControl)
.run(materializer);

this.kafkaConsumerMetricsRegistry = kafkaConsumerMetricsRegistry;
this.connectionId = connectionId;
registerForMetricCollection();
}

@Override
@@ -102,6 +115,7 @@ public CompletionStage<Done> whenComplete(final BiConsumer<? super Done, ? super

@Override
public CompletionStage<Done> stop() {
kafkaConsumerMetricsRegistry.deregisterConsumer(connectionId, getClass().getSimpleName());
return consumerControl.drainAndShutdown(materializer.executionContext());
}

@@ -118,7 +132,7 @@ private Source<CompletableFuture<CommittableOffset>, NotUsed> processTransformat
if (isExternalMessage(result)) {
return Source.single(result)
.map(this::toAcknowledgeableMessage)
.alsoTo(this.externalMessageSink)
.alsoTo(externalMessageSink)
.map(KafkaAcknowledgableMessage::getAcknowledgementFuture);
}
/*
@@ -169,4 +183,8 @@ private static DittoRuntimeException extractDittoRuntimeException(final Committa
.orElseThrow(); // at this point, the DRE is present
}

private void registerForMetricCollection() {
kafkaConsumerMetricsRegistry.registerConsumer(connectionId, consumerControl, getClass().getSimpleName());
}

}
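The KafkaConsumerMetricsRegistry class itself is not part of this excerpt. Its shape can be inferred from the call sites above: registerConsumer(connectionId, consumerControl, streamName) on stream creation and deregisterConsumer(connectionId, streamName) in stop(). A minimal sketch under those assumptions follows; the internals (the map-based storage, the key format, and the collectMetrics() method presumably driven by a timer at the configured metric-collecting-interval) are guesses, not the commit's actual implementation:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.eclipse.ditto.connectivity.model.ConnectionId;

import akka.kafka.javadsl.Consumer;

final class KafkaConsumerMetricsRegistrySketch {

    private final Map<String, Consumer.Control> consumers = new ConcurrentHashMap<>();

    void registerConsumer(final ConnectionId connectionId, final Consumer.Control control,
            final String streamName) {
        consumers.put(connectionId + ":" + streamName, control);
    }

    void deregisterConsumer(final ConnectionId connectionId, final String streamName) {
        consumers.remove(connectionId + ":" + streamName);
    }

    // Invoked periodically (assumption): asks each registered consumer for its
    // Kafka client metrics via Alpakka's Consumer.Control#getMetrics.
    void collectMetrics() {
        consumers.forEach((key, control) ->
                control.getMetrics().thenAccept(metrics -> report(key, metrics)));
    }

    private static void report(final String key, final Map<MetricName, Metric> metrics) {
        metrics.forEach((name, metric) ->
                System.out.println(key + " " + name.name() + " = " + metric.metricValue()));
    }
}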
