Add Kamon orchestration for Kafka consumer metrics.
Adds Kamon gauges for Kafka consumer metrics. The Kamon gauges report metric values as doubles, so a double-valued set method was also added to the Gauge API. Newly registered consumers are remembered for deferred metric-reporting initialization, because the ConsumerControl is not ready from the get-go and accessing its metrics directly after instantiation produces a NullPointerException.

Signed-off-by: David Schwilk <david.schwilk@bosch.io>
DerSchwilk committed Nov 8, 2021
1 parent 717d0fb commit 20c4f7c
Showing 8 changed files with 171 additions and 16 deletions.
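Background for the new double-valued API: Kamon's own gauge instrument is double-based, and the Kafka client metrics forwarded here are mostly doubles (the diff filters out the String-valued ones). A minimal sketch of the underlying Kamon 2.x call, assuming the standard Kamon Java API; the metric and tag names are illustrative, not taken from this commit:

    import kamon.Kamon;
    import kamon.metric.Gauge;

    // Kamon gauges are updated with double values, which is why a
    // Gauge#set(Double) overload is introduced in this commit.
    Gauge lagGauge = Kamon.gauge("records-lag-max").withTag("streamName", "stream-0");
    lagGauge.update(42.5d);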
DefaultKafkaConsumerConfig.java
@@ -19,6 +19,7 @@

import org.eclipse.ditto.base.service.config.supervision.DefaultExponentialBackOffConfig;
import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig;
import org.eclipse.ditto.internal.utils.config.DittoConfigError;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -45,6 +46,9 @@ private DefaultKafkaConsumerConfig(final Config kafkaConsumerScopedConfig) {
alpakkaConfig = getConfigOrEmpty(kafkaConsumerScopedConfig, ALPAKKA_PATH);
metricCollectingInterval =
kafkaConsumerScopedConfig.getDuration(ConfigValue.METRIC_COLLECTING_INTERVAL.getConfigPath());
if (metricCollectingInterval.isNegative() || metricCollectingInterval.isZero()) {
throw new DittoConfigError("The Kafka consumer metric collecting interval has to be positive.");
}
}

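For reference, a hedged sketch of the corresponding HOCON entry — the exact key name and nesting are assumptions, since only the ConfigValue constant is visible in this diff:

    kafka.consumer {
      # must be a positive duration, otherwise a DittoConfigError is thrown at startup
      metric-collecting-interval = 10s
    }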
DefaultKafkaPublisherActorFactory.java
@@ -59,6 +59,7 @@ public Props props(final Connection connection,
final boolean dryRun,
final String clientId,
final ConnectivityStatusResolver connectivityStatusResolver) {

return KafkaPublisherActor.props(connection, config, sendProducerFactory, dryRun, clientId,
connectivityStatusResolver);
}
DefaultSendProducerFactory.java
@@ -26,6 +26,7 @@ final class DefaultSendProducerFactory implements SendProducerFactory {

private DefaultSendProducerFactory(final ProducerSettings<String, String> producerSettings,
final ActorSystem actorSystem) {

this.producerSettings = producerSettings;
this.actorSystem = actorSystem;
}
@@ -39,6 +40,7 @@ private DefaultSendProducerFactory(final ProducerSettings<String, String> producerSettings,
*/
static DefaultSendProducerFactory getInstance(final ProducerSettings<String, String> producerSettings,
final ActorSystem actorSystem) {

return new DefaultSendProducerFactory(producerSettings, actorSystem);
}

KafkaConsumerMetrics.java (new file)
@@ -0,0 +1,96 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.kafka;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.gauge.Gauge;

import akka.kafka.javadsl.Consumer;

/**
* Gets the Apache Kafka Metrics from a ConsumerControl and creates/sets Kamon gauges accordingly.
*/
public final class KafkaConsumerMetrics {

private final Consumer.Control consumerControl;
private Map<MetricName, Gauge> gauges;

private KafkaConsumerMetrics(final Consumer.Control consumerControl, final CharSequence connectionId,
final String streamName) {

this.consumerControl = consumerControl;
createGauges(consumerControl, connectionId, streamName).thenAccept(gaugeMap -> {
gauges = gaugeMap;
reportMetrics();
});
}

/**
* Returns a new instance of {@code KafkaConsumerMetrics}.
*
* @param consumerControl the consumer control from which to retrieve the metrics.
* @param connectionId the {@code connectionId} for which the metrics are applicable.
* @param streamName the name of the stream for which the metrics are applicable.
* @return the new instance.
* @throws java.lang.NullPointerException if any argument is {@code null}.
*/
static KafkaConsumerMetrics newInstance(final Consumer.Control consumerControl, final CharSequence connectionId,
final String streamName) {

checkNotNull(consumerControl, "consumerControl");
checkNotNull(connectionId, "connectionId");
checkNotNull(streamName, "streamName");

return new KafkaConsumerMetrics(consumerControl, connectionId, streamName);
}

/**
* Report metrics via Kamon gauges.
*/
void reportMetrics() {
consumerControl.getMetrics()
.thenAccept(metrics -> metrics.values()
.stream()
.filter(metricContainsValue())
.forEach(metric -> gauges.get(metric.metricName()).set((Double) metric.metricValue())));
}

private static Predicate<Metric> metricContainsValue() {
return metric -> !(metric.metricValue() instanceof String);
}

private static CompletionStage<Map<MetricName, Gauge>> createGauges(final Consumer.Control consumerControl,
final CharSequence connectionId,
final String streamName) {

return consumerControl.getMetrics()
.thenApply(metrics -> metrics.values()
.stream()
.collect(Collectors.toMap(Metric::metricName,
metric -> DittoMetrics.gauge(metric.metricName().name())
.tag(ConnectionId.class.getSimpleName(), connectionId.toString())
.tag("streamName", streamName))));

}

}
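A minimal usage sketch for the new class (its factory method is package-private, so callers live in the same package; the control and names below are placeholders):

    // Obtain the Consumer.Control from a materialized alpakka-kafka consumer stream, then:
    final KafkaConsumerMetrics metrics =
            KafkaConsumerMetrics.newInstance(consumerControl, connectionId, "my-stream");
    // Later, e.g. on a schedule, push the current Kafka client metrics into the Kamon gauges:
    metrics.reportMetrics();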
KafkaConsumerMetricsRegistry.java
@@ -12,18 +12,20 @@
*/
package org.eclipse.ditto.connectivity.service.messaging.kafka;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import javax.annotation.Nullable;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger;
@@ -38,12 +40,14 @@ public final class KafkaConsumerMetricsRegistry {

@Nullable private static KafkaConsumerMetricsRegistry instance;

private final Map<CacheKey, Consumer.DrainingControl<Done>> consumerControlMap;
private final Map<CacheKey, KafkaConsumerMetrics> metricsMap;
private static final ThreadSafeDittoLogger LOGGER =
DittoLoggerFactory.getThreadSafeLogger(KafkaConsumerMetricsRegistry.class);

private final AtomicReference<Set<NewConsumer>> rememberForInit = new AtomicReference<>(new HashSet<>());

private KafkaConsumerMetricsRegistry(final Duration metricCollectingInterval) {
consumerControlMap = new HashMap<>();
metricsMap = new HashMap<>();
scheduleMetricReporting(metricCollectingInterval);
}

@@ -52,8 +56,10 @@ private KafkaConsumerMetricsRegistry(final Duration metricCollectingInterval) {
*
* @param metricCollectingInterval the interval in which to collect the metrics.
* @return the instance.
* @throws NullPointerException if {@code metricCollectingInterval} is {@code null}.
*/
public static KafkaConsumerMetricsRegistry getInstance(final Duration metricCollectingInterval) {
checkNotNull(metricCollectingInterval, "metricCollectingInterval");
if (null == instance) {
instance = new KafkaConsumerMetricsRegistry(metricCollectingInterval);
}
@@ -70,7 +76,12 @@ public static KafkaConsumerMetricsRegistry getInstance(final Duration metricCollectingInterval) {
void registerConsumer(final ConnectionId connectionId, final Consumer.DrainingControl<Done> consumerControl,
final String streamName) {

consumerControlMap.put(new CacheKey(connectionId, streamName), consumerControl);
LOGGER.debug("Registering new consumer for metric reporting: <{}:{}>", connectionId, streamName);
// No way to check whether consumerControl is ready, thus waiting for interval till next metric reporting.
rememberForInit.getAndUpdate(set -> {
set.add(new NewConsumer(connectionId, streamName, consumerControl));
return set;
});
}

/**
@@ -80,24 +91,33 @@ void registerConsumer(final ConnectionId connectionId, final Consumer.DrainingControl<Done> consumerControl,
* @param streamName the name of the stream to which the metrics apply.
*/
void deregisterConsumer(final ConnectionId connectionId, final String streamName) {
consumerControlMap.remove(new CacheKey(connectionId, streamName));
LOGGER.debug("De-registering consumer for metric reporting: <{}:{}>", connectionId, streamName);
metricsMap.remove(new CacheKey(connectionId, streamName));
}

private void scheduleMetricReporting(final Duration metricCollectingInterval) {
new ScheduledThreadPoolExecutor(1).scheduleWithFixedDelay(this::reportMetrics,
LOGGER.info("Scheduling Kafka metric reporting in interval of: <{}>", metricCollectingInterval);
Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this::reportMetrics,
metricCollectingInterval.getSeconds(), metricCollectingInterval.getSeconds(), TimeUnit.SECONDS);
}

private void reportMetrics() {
consumerControlMap.forEach((cacheKey, consumerControl) -> consumerControl.getMetrics()
.thenAccept(KafkaConsumerMetricsRegistry::metricsToString));
LOGGER.debug("Reporting metrics for Kafka consumer streams. <{}> consumer streams registered",
metricsMap.size());

createNewKafkaConsumerMetrics();
metricsMap.forEach((cacheKey, kafkaConsumerMetrics) -> kafkaConsumerMetrics.reportMetrics());
}

private static void metricsToString(final Map<MetricName, Metric> metricMap) {
final AtomicReference<String> metrics = new AtomicReference<>("");
metricMap.forEach((metricName, metric) -> metrics.getAndSet(metrics.get()
.concat(metricName.name() + " : " + metric.metricValue() + "\n")));
LOGGER.info(metrics.get());
private void createNewKafkaConsumerMetrics() {
rememberForInit.getAndUpdate(
set -> {
set.forEach(newConsumer -> metricsMap.put(
new CacheKey(newConsumer.connectionId, newConsumer.streamName),
KafkaConsumerMetrics.newInstance(newConsumer.consumerControl, newConsumer.connectionId,
newConsumer.streamName)));
return new HashSet<>();
});
}

private static final class CacheKey {
@@ -130,5 +150,23 @@ public int hashCode() {
public String toString() {
return String.format("%s:%s", connectionId, streamName);
}

}

private static final class NewConsumer {

private final ConnectionId connectionId;
private final String streamName;
private final Consumer.Control consumerControl;

private NewConsumer(final ConnectionId connectionId, final String streamName,
final Consumer.Control consumerControl) {

this.connectionId = connectionId;
this.streamName = streamName;
this.consumerControl = consumerControl;
}

}

}
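Put together, the registry is used roughly like this (a sketch; the draining control comes from the materialized consumer stream, and the names are placeholders):

    final KafkaConsumerMetricsRegistry registry =
            KafkaConsumerMetricsRegistry.getInstance(Duration.ofSeconds(10));
    // Registration only remembers the consumer; its gauges are created one interval later,
    // because the DrainingControl is not ready right after stream materialization.
    registry.registerConsumer(connectionId, drainingControl, "my-stream");
    // On consumer shutdown:
    registry.deregisterConsumer(connectionId, "my-stream");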
KafkaPublisherActor.java
@@ -92,6 +92,7 @@ private KafkaPublisherActor(final Connection connection,
final boolean dryRun,
final String clientId,
final ConnectivityStatusResolver connectivityStatusResolver) {

super(connection, clientId, connectivityStatusResolver);
this.dryRun = dryRun;
final Materializer materializer = Materializer.createMaterializer(this::getContext);
@@ -364,6 +365,7 @@ private void handleSendResult(

private CompletableFuture<RecordMetadata> publish(final KafkaPublishTarget publishTarget,
final ExternalMessage externalMessage) {

final CompletableFuture<RecordMetadata> resultFuture = new CompletableFuture<>();
final ProducerRecord<String, String> producerRecord = getProducerRecord(publishTarget, externalMessage);
final ProducerMessage.Envelope<String, String, CompletableFuture<RecordMetadata>> envelope =
Gauge.java
@@ -46,6 +46,13 @@ default Gauge self() {
*/
void set(Long value);

/**
* Sets the value of the gauge to the given value.
*
* @param value The value the gauge should be set to.
*/
void set(Double value);

/**
* Gets the current value of the gauge.
*
KamonGauge.java
@@ -29,7 +29,7 @@
* Kamon based implementation of {@link Gauge}.
*/
@Immutable
public class KamonGauge implements Gauge {
public final class KamonGauge implements Gauge {

private static final Logger LOGGER = LoggerFactory.getLogger(KamonGauge.class);

@@ -62,6 +62,11 @@ public void set(final Long value) {
getKamonInternalGauge().update(value);
}

@Override
public void set(final Double value) {
getKamonInternalGauge().update(value);
}

@Override
public Long get() {
final kamon.metric.Gauge kamonInternalGauge = getKamonInternalGauge();
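With the new overload in place, double-valued Kafka metrics can be written to a Ditto gauge directly. A short sketch against the API shown in this diff (metric and tag values are illustrative):

    final Gauge gauge = DittoMetrics.gauge("records-lag-max").tag("streamName", "my-stream");
    gauge.set(12.5d); // delegates to the internal Kamon gauge's update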
