Merge pull request #1226 from bosch-io/feature/kafka-metrics
Collect Apache Kafka consumer metrics
Yannic92 committed Nov 15, 2021
2 parents 1c3e0cb + a2114a5 commit 0156a19
Showing 24 changed files with 312 additions and 57 deletions.
File: DefaultKafkaConsumerConfig.java
@@ -12,12 +12,14 @@
*/
package org.eclipse.ditto.connectivity.service.config;

import java.time.Duration;
import java.util.Objects;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.service.config.supervision.DefaultExponentialBackOffConfig;
import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig;
import org.eclipse.ditto.internal.utils.config.DittoConfigError;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -35,12 +37,18 @@ final class DefaultKafkaConsumerConfig implements KafkaConsumerConfig {
private final ConnectionThrottlingConfig throttlingConfig;
private final ExponentialBackOffConfig restartBackOffConfig;
private final Config alpakkaConfig;
private final Duration metricCollectingInterval;

private DefaultKafkaConsumerConfig(final Config kafkaConsumerScopedConfig) {
throttlingConfig = ConnectionThrottlingConfig.of(kafkaConsumerScopedConfig);
restartBackOffConfig =
DefaultExponentialBackOffConfig.of(getConfigOrEmpty(kafkaConsumerScopedConfig, RESTART_PATH));
alpakkaConfig = getConfigOrEmpty(kafkaConsumerScopedConfig, ALPAKKA_PATH);
metricCollectingInterval =
kafkaConsumerScopedConfig.getDuration(ConfigValue.METRIC_COLLECTING_INTERVAL.getConfigPath());
if (metricCollectingInterval.isNegative() || metricCollectingInterval.isZero()) {
throw new DittoConfigError("The Kafka consumer metric collecting interval has to be positive.");
}
}

/**
@@ -73,6 +81,11 @@ public Config getAlpakkaConfig() {
return alpakkaConfig;
}

@Override
public Duration getMetricCollectingInterval() {
return metricCollectingInterval;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
@@ -84,12 +97,13 @@ public boolean equals(final Object o) {
final DefaultKafkaConsumerConfig that = (DefaultKafkaConsumerConfig) o;
return Objects.equals(throttlingConfig, that.throttlingConfig) &&
Objects.equals(restartBackOffConfig, that.restartBackOffConfig) &&
- Objects.equals(alpakkaConfig, that.alpakkaConfig);
+ Objects.equals(alpakkaConfig, that.alpakkaConfig) &&
+ Objects.equals(metricCollectingInterval, that.metricCollectingInterval);
}

@Override
public int hashCode() {
- return Objects.hash(throttlingConfig, restartBackOffConfig, alpakkaConfig);
+ return Objects.hash(throttlingConfig, restartBackOffConfig, alpakkaConfig, metricCollectingInterval);
}

@Override
@@ -98,6 +112,7 @@ public String toString() {
"throttlingConfig=" + throttlingConfig +
", restartBackOffConfig=" + restartBackOffConfig +
", alpakkaConfig=" + alpakkaConfig +
", metricCollectingInterval=" + metricCollectingInterval +
"]";
}

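The fail-fast validation above is easy to exercise with plain Typesafe Config. A minimal sketch, assuming the consumer-scoped Config is fed in directly; only the key name metric-collecting-interval comes from this diff:

    import java.time.Duration;

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    public final class MetricIntervalSketch {
        public static void main(final String[] args) {
            // Typesafe Config parses duration strings such as "10s" into java.time.Duration.
            final Config consumerConfig = ConfigFactory.parseString("metric-collecting-interval = 10s");
            final Duration interval = consumerConfig.getDuration("metric-collecting-interval");
            System.out.println(interval); // prints PT10S

            // With "0s" or a negative duration, DefaultKafkaConsumerConfig's constructor
            // rejects the value by throwing a DittoConfigError, as shown in the diff above.
        }
    }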
File: KafkaConsumerConfig.java
@@ -12,9 +12,12 @@
*/
package org.eclipse.ditto.connectivity.service.config;

import java.time.Duration;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig;
import org.eclipse.ditto.internal.utils.config.KnownConfigValue;

import com.typesafe.config.Config;

@@ -46,6 +49,13 @@ public interface KafkaConsumerConfig {
*/
Config getAlpakkaConfig();

/**
* Returns the interval at which metrics from the Apache Kafka client should be collected.
*
* @return the interval.
*/
Duration getMetricCollectingInterval();

/**
* Returns an instance of {@code KafkaConsumerConfig} based on the settings of the specified Config.
*
@@ -57,4 +67,35 @@ static KafkaConsumerConfig of(final Config config) {
return DefaultKafkaConsumerConfig.of(config);
}

/**
* An enumeration of the known config path expressions and their associated default values for
* {@code KafkaConsumerConfig}.
*/
enum ConfigValue implements KnownConfigValue {

/**
* The interval at which Apache Kafka client metrics should be collected.
*/
METRIC_COLLECTING_INTERVAL("metric-collecting-interval", Duration.ofSeconds(10L));

private final String path;
private final Object defaultValue;

ConfigValue(final String thePath, final Object theDefaultValue) {
path = thePath;
defaultValue = theDefaultValue;
}

@Override
public Object getDefaultValue() {
return defaultValue;
}

@Override
public String getConfigPath() {
return path;
}

}

}
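A hedged sketch of how such a KnownConfigValue is typically resolved: the path addresses the key, and getDefaultValue() supplies the fallback when the key is absent. The helper below is illustrative only; Ditto's actual fallback mechanism is not part of this excerpt:

    // Illustrative helper: resolve the interval, falling back to the enum's default.
    static Duration metricCollectingInterval(final Config config) {
        final String path = KafkaConsumerConfig.ConfigValue.METRIC_COLLECTING_INTERVAL.getConfigPath();
        return config.hasPath(path)
                ? config.getDuration(path)
                : (Duration) KafkaConsumerConfig.ConfigValue.METRIC_COLLECTING_INTERVAL.getDefaultValue();
    }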
File: DefaultClientActorPropsFactory.java
@@ -32,27 +32,36 @@
import akka.actor.Props;

/**
- * The default implementation of {@link ClientActorPropsFactory}.
+ * The default implementation of {@link ClientActorPropsFactory}. Singleton: the instance is created once
+ * and returned on all subsequent calls.
*/
@Immutable
public final class DefaultClientActorPropsFactory implements ClientActorPropsFactory {

- private DefaultClientActorPropsFactory() {
- }
+ @Nullable private static DefaultClientActorPropsFactory instance;
+
+ private DefaultClientActorPropsFactory() {}

/**
- * Returns an instance of {@code DefaultClientActorPropsFactory}.
+ * Returns the instance of {@code DefaultClientActorPropsFactory}, creating it on first invocation.
*
* @return the factory instance.
*/
public static DefaultClientActorPropsFactory getInstance() {
- return new DefaultClientActorPropsFactory();
+ if (null == instance) {
+ instance = new DefaultClientActorPropsFactory();
+ }
+ return instance;
}

@Override
- public Props getActorPropsForType(final Connection connection, @Nullable final ActorRef proxyActor,
- final ActorRef connectionActor, final ActorSystem actorSystem,
- final DittoHeaders dittoHeaders, final Config connectivityConfigOverwrites) {
+ public Props getActorPropsForType(final Connection connection,
+ @Nullable final ActorRef proxyActor,
+ final ActorRef connectionActor,
+ final ActorSystem actorSystem,
+ final DittoHeaders dittoHeaders,
+ final Config connectivityConfigOverwrites) {

final ConnectionType connectionType = connection.getConnectionType();
final Props result;
switch (connectionType) {
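The lazy initialization above is unsynchronized, so it implicitly assumes getInstance() is first called from a single thread (for example during actor-system startup); afterwards every call returns the cached instance. A two-line usage sketch:

    // Repeated calls hand out the same cached factory.
    final ClientActorPropsFactory first = DefaultClientActorPropsFactory.getInstance();
    final ClientActorPropsFactory second = DefaultClientActorPropsFactory.getInstance();
    assert first == second;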
File: AtLeastOnceConsumerStream.java
@@ -21,6 +21,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.service.config.ConnectionThrottlingConfig;
import org.eclipse.ditto.connectivity.service.messaging.AcknowledgeableMessage;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitor;
@@ -52,6 +53,7 @@ final class AtLeastOnceConsumerStream implements KafkaConsumerStream {
private final Sink<CommittableTransformationResult, NotUsed> dreSink;
private final Sink<CommittableTransformationResult, NotUsed> unexpectedMessageSink;
private final Consumer.DrainingControl<Done> consumerControl;
private final KafkaConsumerMetrics consumerMetrics;

AtLeastOnceConsumerStream(
final AtLeastOnceKafkaConsumerSourceSupplier sourceSupplier,
@@ -63,22 +65,24 @@ final class AtLeastOnceConsumerStream implements KafkaConsumerStream {
final ConnectionMonitor inboundMonitor,
final ConnectionMonitor ackMonitor,
final Sink<AcknowledgeableMessage, NotUsed> inboundMappingSink,
- final Sink<DittoRuntimeException, ?> exceptionSink) {
+ final Sink<DittoRuntimeException, ?> exceptionSink,
+ final ConnectionId connectionId,
+ final String consumerId) {

this.ackMonitor = ackMonitor;

// Pre-materialize sinks with MergeHub to avoid multiple materializations per Kafka record in processTransformationResult
- this.externalMessageSink = MergeHub.of(KafkaAcknowledgableMessage.class)
+ externalMessageSink = MergeHub.of(KafkaAcknowledgableMessage.class)
.map(KafkaAcknowledgableMessage::getAcknowledgeableMessage)
.to(inboundMappingSink)
.run(materializer);

- this.dreSink = MergeHub.of(CommittableTransformationResult.class)
+ dreSink = MergeHub.of(CommittableTransformationResult.class)
.map(AtLeastOnceConsumerStream::extractDittoRuntimeException)
.to(exceptionSink)
.run(materializer);

- this.unexpectedMessageSink = MergeHub.of(CommittableTransformationResult.class)
+ unexpectedMessageSink = MergeHub.of(CommittableTransformationResult.class)
.to(Sink.foreach(transformationResult -> inboundMonitor.exception(
"Got unexpected transformation result <{0}>. This is an internal error. " +
"Please contact the service team", transformationResult)))
@@ -93,6 +97,8 @@ final class AtLeastOnceConsumerStream implements KafkaConsumerStream {
.mapAsync(throttlingConfig.getMaxInFlight(), x -> x)
.toMat(Committer.sink(committerSettings), Consumer::createDrainingControl)
.run(materializer);

consumerMetrics = KafkaConsumerMetrics.newInstance(consumerControl, connectionId, consumerId);
}

@Override
@@ -105,6 +111,11 @@ public CompletionStage<Done> stop() {
return consumerControl.drainAndShutdown(materializer.executionContext());
}

@Override
public void reportMetrics() {
consumerMetrics.reportMetrics();
}

private Source<CompletableFuture<CommittableOffset>, NotUsed> processTransformationResult(
final CommittableTransformationResult result) {

@@ -118,7 +129,7 @@ private Source<CompletableFuture<CommittableOffset>, NotUsed> processTransformationResult(
if (isExternalMessage(result)) {
return Source.single(result)
.map(this::toAcknowledgeableMessage)
- .alsoTo(this.externalMessageSink)
+ .alsoTo(externalMessageSink)
.map(KafkaAcknowledgableMessage::getAcknowledgementFuture);
}
/*
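KafkaConsumerMetrics itself is added by this PR but not visible in this excerpt. A plausible sketch of its shape, assuming it reads the Apache Kafka client metrics exposed through Alpakka's Consumer.Control#getMetrics and forwards numeric values to Ditto's Kamon-based DittoMetrics; the class body, tag names and the reporting call are assumptions:

    import java.util.Map;

    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;
    import org.eclipse.ditto.connectivity.model.ConnectionId;

    import akka.kafka.javadsl.Consumer;

    // Sketch only; the real class ships with this PR but is not shown here.
    final class KafkaConsumerMetricsSketch {

        private final Consumer.Control consumerControl;
        private final ConnectionId connectionId;
        private final String consumerId;

        private KafkaConsumerMetricsSketch(final Consumer.Control consumerControl,
                final ConnectionId connectionId, final String consumerId) {
            this.consumerControl = consumerControl;
            this.connectionId = connectionId;
            this.consumerId = consumerId;
        }

        static KafkaConsumerMetricsSketch newInstance(final Consumer.Control consumerControl,
                final ConnectionId connectionId, final String consumerId) {
            return new KafkaConsumerMetricsSketch(consumerControl, connectionId, consumerId);
        }

        void reportMetrics() {
            // Alpakka Kafka exposes the underlying Apache Kafka client metrics asynchronously.
            consumerControl.getMetrics().thenAccept(this::report);
        }

        private void report(final Map<MetricName, Metric> metrics) {
            metrics.forEach((name, metric) -> {
                final Object value = metric.metricValue();
                if (value instanceof Number) { // Kafka also reports non-numeric metric values.
                    // Assumed reporting backend: Ditto's Kamon-based DittoMetrics, tagged with
                    // connectionId and consumerId so per-consumer gauges stay distinguishable.
                }
            });
        }
    }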
File: AtMostOnceConsumerStream.java
@@ -21,6 +21,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.service.config.ConnectionThrottlingConfig;
import org.eclipse.ditto.connectivity.service.messaging.AcknowledgeableMessage;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitor;
@@ -48,6 +49,7 @@ final class AtMostOnceConsumerStream implements KafkaConsumerStream {
private final Sink<KafkaCompletableMessage, NotUsed> externalMessageSink;
private final Sink<TransformationResult, NotUsed> dreSink;
private final Sink<TransformationResult, NotUsed> unexpectedMessageSink;
private final KafkaConsumerMetrics consumerMetrics;

AtMostOnceConsumerStream(
final AtMostOnceKafkaConsumerSourceSupplier sourceSupplier,
@@ -57,17 +59,19 @@ final class AtMostOnceConsumerStream implements KafkaConsumerStream {
final Materializer materializer,
final ConnectionMonitor inboundMonitor,
final Sink<AcknowledgeableMessage, NotUsed> inboundMappingSink,
- final Sink<DittoRuntimeException, ?> exceptionSink) {
+ final Sink<DittoRuntimeException, ?> exceptionSink,
+ final ConnectionId connectionId,
+ final String consumerId) {

this.materializer = materializer;

// Pre-materialize sinks with MergeHub to avoid multiple materializations per Kafka record in processTransformationResult
- this.externalMessageSink = MergeHub.of(KafkaCompletableMessage.class)
+ externalMessageSink = MergeHub.of(KafkaCompletableMessage.class)
.map(KafkaCompletableMessage::getAcknowledgeableMessage)
.to(inboundMappingSink)
.run(materializer);

- this.dreSink = MergeHub.of(TransformationResult.class)
+ dreSink = MergeHub.of(TransformationResult.class)
.map(AtMostOnceConsumerStream::extractDittoRuntimeException)
.to(exceptionSink)
.run(materializer);
@@ -87,6 +91,8 @@ final class AtMostOnceConsumerStream implements KafkaConsumerStream {
.mapAsync(throttlingConfig.getMaxInFlight(), x -> x)
.toMat(Sink.ignore(), Consumer::createDrainingControl)
.run(materializer);

consumerMetrics = KafkaConsumerMetrics.newInstance(consumerControl, connectionId, consumerId);
}

@Override
@@ -99,13 +105,18 @@ public CompletionStage<Done> stop() {
return consumerControl.drainAndShutdown(materializer.executionContext());
}

@Override
public void reportMetrics() {
consumerMetrics.reportMetrics();
}

private Source<CompletableFuture<Done>, NotUsed> processTransformationResult(
final TransformationResult result) {

if (isExternalMessage(result)) {
return Source.single(result)
- .map(this::toAcknowledgeableMessage)
- .alsoTo(this.externalMessageSink)
+ .map(AtMostOnceConsumerStream::toAcknowledgeableMessage)
+ .alsoTo(externalMessageSink)
.map(KafkaCompletableMessage::getAcknowledgementFuture);
}

@@ -122,7 +133,7 @@ private Source<CompletableFuture<Done>, NotUsed> processTransformationResult(
.map(unexpected -> offsetFuture);
}

- private KafkaCompletableMessage toAcknowledgeableMessage(final TransformationResult value) {
+ private static KafkaCompletableMessage toAcknowledgeableMessage(final TransformationResult value) {
final ExternalMessage externalMessage =
value.getExternalMessage().orElseThrow(); // at this point, the ExternalMessage is present
return new KafkaCompletableMessage(externalMessage);
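Both consumer streams rely on the MergeHub idiom called out in the comments above: materialize each downstream sink once, then let the many short-lived per-record sources attach to it cheaply. A minimal generic sketch of that idiom (names are illustrative):

    import akka.NotUsed;
    import akka.actor.ActorSystem;
    import akka.stream.Materializer;
    import akka.stream.javadsl.MergeHub;
    import akka.stream.javadsl.Sink;
    import akka.stream.javadsl.Source;

    public final class MergeHubIdiomSketch {
        public static void main(final String[] args) {
            final ActorSystem system = ActorSystem.create("sketch");
            final Materializer materializer = Materializer.createMaterializer(system);

            // Materialized exactly once; yields a Sink that can be attached to repeatedly.
            final Sink<String, NotUsed> preMaterializedSink = MergeHub.of(String.class)
                    .to(Sink.foreach(System.out::println))
                    .run(materializer);

            // Per-record sources (cf. processTransformationResult) reuse the same downstream
            // via to/alsoTo instead of re-materializing it for every Kafka record.
            Source.single("record-1").runWith(preMaterializedSink, materializer);
            Source.single("record-2").runWith(preMaterializedSink, materializer);

            system.terminate();
        }
    }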
File: DefaultKafkaPublisherActorFactory.java
@@ -58,7 +58,8 @@ public Props props(final Connection connection,
final boolean dryRun,
final String clientId,
final ConnectivityStatusResolver connectivityStatusResolver,
- final ConnectivityConfig connectivityConfig) {
+
+ final ConnectivityConfig connectivityConfig) {
return KafkaPublisherActor.props(connection, sendProducerFactory, dryRun, clientId,
connectivityStatusResolver, connectivityConfig);
}
File: DefaultSendProducerFactory.java
@@ -26,6 +26,7 @@ final class DefaultSendProducerFactory implements SendProducerFactory {

private DefaultSendProducerFactory(final ProducerSettings<String, String> producerSettings,
final ActorSystem actorSystem) {

this.producerSettings = producerSettings;
this.actorSystem = actorSystem;
}
@@ -39,6 +40,7 @@ static DefaultSendProducerFactory getInstance(final ProducerSettings<String, String> producerSettings,
*/
static DefaultSendProducerFactory getInstance(final ProducerSettings<String, String> producerSettings,
final ActorSystem actorSystem) {

return new DefaultSendProducerFactory(producerSettings, actorSystem);
}

File: KafkaClientActor.java
@@ -221,11 +221,11 @@ private void startKafkaConsumers(final boolean dryRun, final ConnectionId connectionId) {

// start consumer actors
connection().getSources().stream()
- .flatMap(this::consumerDataFromSource)
- .forEach(consumerData -> this.startKafkaConsumer(consumerData, dryRun));
+ .flatMap(KafkaClientActor::consumerDataFromSource)
+ .forEach(consumerData -> startKafkaConsumer(consumerData, dryRun));
}

- private Stream<ConsumerData> consumerDataFromSource(final Source source) {
+ private static Stream<ConsumerData> consumerDataFromSource(final Source source) {
return source.getAddresses().stream()
.flatMap(sourceAddress ->
IntStream.range(0, source.getConsumerCount())
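In effect, one consumer stream is created per address/consumer-index pair: a source with two addresses and a consumerCount of 3, for example, yields six ConsumerData entries and therefore six Kafka consumer streams (the tail of the lambda pairing each address with its index is collapsed in this excerpt).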
@@ -239,8 +239,7 @@ private void startKafkaConsumer(final ConsumerData consumerData, final boolean dryRun) {
final KafkaConsumerStreamFactory streamFactory =
new KafkaConsumerStreamFactory(throttlingConfig, propertiesFactory, consumerData, dryRun);
final Props consumerActorProps =
- KafkaConsumerActor.props(connection(), streamFactory,
- consumerData.getAddress(), consumerData.getSource(), getInboundMappingSink(),
+ KafkaConsumerActor.props(connection(), streamFactory, consumerData, getInboundMappingSink(),
connectivityStatusResolver, connectivityConfig());
final ActorRef consumerActor =
startChildActorConflictFree(consumerData.getActorNamePrefix(), consumerActorProps);
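How reportMetrics() is triggered periodically is not visible in this excerpt; presumably KafkaConsumerActor connects the configured metric-collecting-interval to a timer, roughly like this sketch (the self-message type and field names are assumptions):

    // Inside an AbstractActorWithTimers subclass (sketch):
    timers().startTimerAtFixedRate(
            "report-kafka-metrics",                            // timer key
            ReportMetrics.INSTANCE,                            // hypothetical self-message
            kafkaConsumerConfig.getMetricCollectingInterval());
    // On each ReportMetrics message, the actor would call kafkaConsumerStream.reportMetrics().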
@@ -307,4 +306,5 @@ private void completeTestConnectionFuture(final Status.Status testResult) {
getSelf().tell(new Status.Failure(exception), getSelf());
}
}

}