From ef6eb9e1b711d5be2fea49193bc2e18b1e2c694b Mon Sep 17 00:00:00 2001 From: Yannic Klem Date: Fri, 12 Nov 2021 09:11:45 +0100 Subject: [PATCH] Remove ConsumerMetricsRegistry and schedule reporting in consumer actor instead * This avoids manual thread handling Signed-off-by: Yannic Klem --- .../kafka/AtLeastOnceConsumerStream.java | 15 +- .../kafka/AtMostOnceConsumerStream.java | 15 +- .../messaging/kafka/KafkaClientActor.java | 7 +- .../messaging/kafka/KafkaConsumerActor.java | 23 ++- .../kafka/KafkaConsumerMetricsRegistry.java | 137 ------------------ .../messaging/kafka/KafkaConsumerStream.java | 5 + .../kafka/KafkaConsumerStreamFactory.java | 11 +- .../kafka/RestartableKafkaConsumerStream.java | 5 + .../kafka/AtLeastOnceConsumerStreamTest.java | 7 +- .../kafka/AtMostOnceConsumerStreamTest.java | 4 +- .../kafka/KafkaConsumerActorTest.java | 5 +- 11 files changed, 56 insertions(+), 178 deletions(-) delete mode 100644 connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerMetricsRegistry.java diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtLeastOnceConsumerStream.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtLeastOnceConsumerStream.java index 61e65186ba..815c30c5a2 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtLeastOnceConsumerStream.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtLeastOnceConsumerStream.java @@ -57,6 +57,7 @@ final class AtLeastOnceConsumerStream implements KafkaConsumerStream { private final Sink dreSink; private final Sink unexpectedMessageSink; private final Consumer.DrainingControl consumerControl; + private final KafkaConsumerMetrics consumerMetrics; AtLeastOnceConsumerStream( final Supplier, Consumer.Control>> sourceSupplier, @@ -69,7 +70,6 @@ final class AtLeastOnceConsumerStream implements KafkaConsumerStream { final ConnectionMonitor ackMonitor, final Graph, NotUsed> inboundMappingSink, final Graph, ?> exceptionSink, - final KafkaConsumerMetricsRegistry kafkaConsumerMetricsRegistry, final ConnectionId connectionId, final String consumerId) { @@ -102,7 +102,7 @@ final class AtLeastOnceConsumerStream implements KafkaConsumerStream { .toMat(Committer.sink(committerSettings), Consumer::createDrainingControl) .run(materializer); - registerForMetricCollection(kafkaConsumerMetricsRegistry, connectionId, consumerId); + consumerMetrics = KafkaConsumerMetrics.newInstance(consumerControl, connectionId, consumerId); } @Override @@ -115,6 +115,11 @@ public CompletionStage stop() { return consumerControl.drainAndShutdown(materializer.executionContext()); } + @Override + public void reportMetrics() { + consumerMetrics.reportMetrics(); + } + private Source, NotUsed> processTransformationResult( final CommittableTransformationResult result) { @@ -179,10 +184,4 @@ private static DittoRuntimeException extractDittoRuntimeException(final Committa .orElseThrow(); // at this point, the DRE is present } - private void registerForMetricCollection(final KafkaConsumerMetricsRegistry kafkaConsumerMetricsRegistry, - final ConnectionId connectionId, final String consumerId) { - - kafkaConsumerMetricsRegistry.registerConsumer(connectionId, consumerControl, consumerId); - } - } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtMostOnceConsumerStream.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtMostOnceConsumerStream.java index 16828cd80f..eec32d9d7e 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtMostOnceConsumerStream.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtMostOnceConsumerStream.java @@ -52,6 +52,7 @@ final class AtMostOnceConsumerStream implements KafkaConsumerStream { private final Sink externalMessageSink; private final Sink dreSink; private final Sink unexpectedMessageSink; + private final KafkaConsumerMetrics consumerMetrics; AtMostOnceConsumerStream( final Supplier, Consumer.Control>> sourceSupplier, @@ -62,7 +63,6 @@ final class AtMostOnceConsumerStream implements KafkaConsumerStream { final ConnectionMonitor inboundMonitor, final Graph, NotUsed> inboundMappingSink, final Graph, ?> exceptionSink, - final KafkaConsumerMetricsRegistry kafkaConsumerMetricsRegistry, final ConnectionId connectionId, final String consumerId) { @@ -95,7 +95,7 @@ final class AtMostOnceConsumerStream implements KafkaConsumerStream { .toMat(Sink.ignore(), Consumer::createDrainingControl) .run(materializer); - registerForMetricCollection(kafkaConsumerMetricsRegistry, connectionId, consumerId); + consumerMetrics = KafkaConsumerMetrics.newInstance(consumerControl, connectionId, consumerId); } @Override @@ -108,6 +108,11 @@ public CompletionStage stop() { return consumerControl.drainAndShutdown(materializer.executionContext()); } + @Override + public void reportMetrics() { + consumerMetrics.reportMetrics(); + } + private Source, NotUsed> processTransformationResult( final TransformationResult result) { @@ -157,10 +162,4 @@ private static DittoRuntimeException extractDittoRuntimeException(final Transfor return value.getDittoRuntimeException().orElseThrow(); // at this point, the DRE is present } - private void registerForMetricCollection(final KafkaConsumerMetricsRegistry kafkaConsumerMetricsRegistry, - final ConnectionId connectionId, final String consumerId) { - - kafkaConsumerMetricsRegistry.registerConsumer(connectionId, consumerControl, consumerId); - } - } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaClientActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaClientActor.java index 72e39a5f15..2b200220fe 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaClientActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaClientActor.java @@ -63,7 +63,6 @@ public final class KafkaClientActor extends BaseClientActor { private ActorRef kafkaPublisherActor; private final List kafkaConsumerActors; private final KafkaConfig kafkaConfig; - private final KafkaConsumerMetricsRegistry kafkaConsumerMetricsRegistry; private KafkaClientActor(final Connection connection, @Nullable final ActorRef proxyActor, @@ -78,7 +77,6 @@ private KafkaClientActor(final Connection connection, propertiesFactory = PropertiesFactory.newInstance(connection, kafkaConfig, getClientId(connection.getId())); this.publisherActorFactory = publisherActorFactory; pendingStatusReportsFromStreams = new HashSet<>(); - kafkaConsumerMetricsRegistry = KafkaConsumerMetricsRegistry.getInstance(kafkaConfig.getConsumerConfig().getMetricCollectingInterval()); } @SuppressWarnings("unused") // used by `props` via reflection @@ -240,11 +238,12 @@ private void startKafkaConsumer(final ConsumerData consumerData, final boolean d final ConnectionThrottlingConfig throttlingConfig = consumerConfig.getThrottlingConfig(); final ExponentialBackOffConfig consumerRestartBackOffConfig = consumerConfig.getRestartBackOffConfig(); final KafkaConsumerStreamFactory streamFactory = - new KafkaConsumerStreamFactory(throttlingConfig, propertiesFactory, consumerData, dryRun, kafkaConsumerMetricsRegistry); + new KafkaConsumerStreamFactory(throttlingConfig, propertiesFactory, consumerData, dryRun); final Props consumerActorProps = KafkaConsumerActor.props(connection(), streamFactory, consumerData, getInboundMappingSink(), - connectivityStatusResolver, consumerRestartBackOffConfig); + connectivityStatusResolver, consumerRestartBackOffConfig, + consumerConfig.getMetricCollectingInterval()); final ActorRef consumerActor = startChildActorConflictFree(consumerData.getActorNamePrefix(), consumerActorProps); kafkaConsumerActors.add(consumerActor); diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerActor.java index c468da824c..8aa64fd04c 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerActor.java @@ -58,7 +58,8 @@ private KafkaConsumerActor(final Connection connection, final ConsumerData consumerData, final Sink inboundMappingSink, final ConnectivityStatusResolver connectivityStatusResolver, - final ExponentialBackOffConfig exponentialBackOffConfig) { + final ExponentialBackOffConfig exponentialBackOffConfig, + final Duration metricsReportingInterval) { super(connection, consumerData.getAddress(), inboundMappingSink, consumerData.getSource(), connectivityStatusResolver); @@ -89,6 +90,7 @@ inboundAcknowledgedMonitor, getMessageMappingSink(), return kafkaConsumerStream; }, exponentialBackOffConfig); } + timers().startTimerAtFixedRate(ReportMetrics.class, ReportMetrics.INSTANCE, metricsReportingInterval); } static Props props(final Connection connection, @@ -96,10 +98,11 @@ static Props props(final Connection connection, final ConsumerData consumerData, final Sink inboundMappingSink, final ConnectivityStatusResolver connectivityStatusResolver, - final ExponentialBackOffConfig exponentialBackOffConfig) { + final ExponentialBackOffConfig exponentialBackOffConfig, + final Duration metricsReportingInterval) { return Props.create(KafkaConsumerActor.class, connection, streamFactory, consumerData, - inboundMappingSink, connectivityStatusResolver, exponentialBackOffConfig); + inboundMappingSink, connectivityStatusResolver, exponentialBackOffConfig, metricsReportingInterval); } @Override @@ -114,6 +117,7 @@ public Receive createReceive() { .match(ResourceStatus.class, this::handleAddressStatus) .match(RetrieveAddressStatus.class, ram -> getSender().tell(getCurrentSourceStatus(), getSelf())) .match(GracefulStop.class, stop -> shutdown()) + .match(ReportMetrics.class, reportMetrics -> reportMetrics()) .match(MessageRejectedException.class, this::restartStream) .match(RestartableKafkaConsumerStream.class, this::setStream) .matchAny(unhandled -> { @@ -128,6 +132,10 @@ protected ThreadSafeDittoLoggingAdapter log() { return log; } + private void reportMetrics() { + kafkaStream.reportMetrics(); + } + private void shutdown() { if (kafkaStream != null) { try { @@ -210,4 +218,13 @@ private GracefulStop() { } + static final class ReportMetrics { + + static final ReportMetrics INSTANCE = new ReportMetrics(); + + private ReportMetrics() { + // intentionally empty + } + } + } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerMetricsRegistry.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerMetricsRegistry.java deleted file mode 100644 index 17007ae31e..0000000000 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerMetricsRegistry.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Copyright (c) 2021 Contributors to the Eclipse Foundation - * - * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0 - * - * SPDX-License-Identifier: EPL-2.0 - */ -package org.eclipse.ditto.connectivity.service.messaging.kafka; - -import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull; - -import java.time.Duration; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import javax.annotation.Nullable; - -import org.eclipse.ditto.connectivity.model.ConnectionId; -import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory; -import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger; - -import akka.Done; -import akka.kafka.javadsl.Consumer; - -/** - * Registry for collecting Kafka consumer metrics. - */ -final class KafkaConsumerMetricsRegistry { - - private static final ThreadSafeDittoLogger LOGGER = - DittoLoggerFactory.getThreadSafeLogger(KafkaConsumerMetricsRegistry.class); - - @Nullable private static KafkaConsumerMetricsRegistry instance; - - private final Map metricsMap; - - private KafkaConsumerMetricsRegistry(final Duration metricCollectingInterval) { - metricsMap = new ConcurrentHashMap<>(); - scheduleMetricReporting(metricCollectingInterval); - } - - /** - * Get an instance of the registry. Returns an already existing instance inf already created. - * - * @param metricCollectingInterval the interval in which to collect the metrics. - * @return the instance. - * @throws NullPointerException if {@code metricCollectingInterval} is {@code null}. - */ - public static KafkaConsumerMetricsRegistry getInstance(final Duration metricCollectingInterval) { - checkNotNull(metricCollectingInterval, "metricCollectingInterval"); - if (null == instance) { - instance = new KafkaConsumerMetricsRegistry(metricCollectingInterval); - } - return instance; - } - - /** - * Register a consumer for metric collecting. - * - * @param connectionId the connectionId the consumer belongs to. - * @param consumerControl the control of the consumer. - * @param consumerId the unique identifier of the consumer stream. - */ - void registerConsumer(final ConnectionId connectionId, final Consumer.DrainingControl consumerControl, - final String consumerId) { - LOGGER.debug("Registering new consumer for metric reporting: <{}:{}>", connectionId, consumerId); - metricsMap.put(new CacheKey(connectionId, consumerId), - KafkaConsumerMetrics.newInstance(consumerControl, connectionId, consumerId)); - consumerControl.streamCompletion() - .whenComplete((done, error) -> deregisterConsumer(connectionId, consumerId)); - } - - /** - * Deregister a consumer for metric collecting. - * - * @param connectionId the connectionId the consumer belongs to. - * @param consumerId the unique identifier of the consumer stream. - */ - private void deregisterConsumer(final ConnectionId connectionId, final String consumerId) { - LOGGER.debug("De-registering consumer for metric reporting: <{}:{}>", connectionId, consumerId); - metricsMap.remove(new CacheKey(connectionId, consumerId)); - } - - private void scheduleMetricReporting(final Duration metricCollectingInterval) { - LOGGER.info("Scheduling Kafka metric reporting in interval of: <{}>", metricCollectingInterval); - Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this::reportMetrics, - metricCollectingInterval.getSeconds(), metricCollectingInterval.getSeconds(), TimeUnit.SECONDS); - } - - private void reportMetrics() { - LOGGER.debug("Reporting metrics for Kafka consumer streams. <{}> consumer streams registered", - metricsMap.size()); - metricsMap.forEach((cacheKey, kafkaConsumerMetrics) -> kafkaConsumerMetrics.reportMetrics()); - } - - private static final class CacheKey { - - private final ConnectionId connectionId; - private final String consumerId; - - private CacheKey(final ConnectionId connectionId, final String consumerId) { - this.connectionId = connectionId; - this.consumerId = consumerId; - } - - @Override - public boolean equals(@Nullable final Object o) { - if (o instanceof CacheKey) { - final var that = (CacheKey) o; - return Objects.equals(connectionId, that.connectionId) && - Objects.equals(consumerId, that.consumerId); - } else { - return false; - } - } - - @Override - public int hashCode() { - return Objects.hash(connectionId, consumerId); - } - - @Override - public String toString() { - return String.format("%s:%s", connectionId, consumerId); - } - - } - -} diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerStream.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerStream.java index 74a829f0a0..24c139fda8 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerStream.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerStream.java @@ -40,4 +40,9 @@ interface KafkaConsumerStream { */ CompletionStage stop(); + /** + * Triggers report of metrics to DittoMetrics. + */ + void reportMetrics(); + } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerStreamFactory.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerStreamFactory.java index 9765403f5c..2fc1768372 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerStreamFactory.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerStreamFactory.java @@ -44,19 +44,16 @@ final class KafkaConsumerStreamFactory { private final AtMostOnceKafkaConsumerSourceSupplier atMostOnceKafkaConsumerSourceSupplier; private final AtLeastOnceKafkaConsumerSourceSupplier atLeastOnceKafkaConsumerSourceSupplier; private final ConnectionThrottlingConfig throttlingConfig; - private final KafkaConsumerMetricsRegistry kafkaConsumerMetricsRegistry; KafkaConsumerStreamFactory(final ConnectionThrottlingConfig throttlingConfig, final PropertiesFactory propertiesFactory, final ConsumerData consumerData, - final boolean dryRun, - final KafkaConsumerMetricsRegistry kafkaConsumerMetricsRegistry) { + final boolean dryRun) { this.throttlingConfig = throttlingConfig; this.consumerData = consumerData; this.dryRun = dryRun; this.propertiesFactory = propertiesFactory; - this.kafkaConsumerMetricsRegistry = kafkaConsumerMetricsRegistry; atMostOnceKafkaConsumerSourceSupplier = new AtMostOnceKafkaConsumerSourceSupplier(propertiesFactory, consumerData.getAddress(), dryRun); atLeastOnceKafkaConsumerSourceSupplier = @@ -74,8 +71,7 @@ final class KafkaConsumerStreamFactory { KafkaConsumerStreamFactory(final AtMostOnceKafkaConsumerSourceSupplier atMostOnceKafkaConsumerSourceSupplier, final AtLeastOnceKafkaConsumerSourceSupplier atLeastOnceKafkaConsumerSourceSupplier, final ConsumerData consumerData, - final boolean dryRun, - final KafkaConsumerMetricsRegistry kafkaConsumerMetricsRegistry) { + final boolean dryRun) { this.consumerData = consumerData; this.dryRun = dryRun; @@ -83,7 +79,6 @@ final class KafkaConsumerStreamFactory { throttlingConfig = ConnectionThrottlingConfig.of(ConfigFactory.empty()); this.atMostOnceKafkaConsumerSourceSupplier = atMostOnceKafkaConsumerSourceSupplier; this.atLeastOnceKafkaConsumerSourceSupplier = atLeastOnceKafkaConsumerSourceSupplier; - this.kafkaConsumerMetricsRegistry = kafkaConsumerMetricsRegistry; } KafkaConsumerStream newAtMostOnceConsumerStream( @@ -103,7 +98,6 @@ KafkaConsumerStream newAtMostOnceConsumerStream( inboundMonitor, messageMappingSink, dreSink, - kafkaConsumerMetricsRegistry, connectionId, consumerId); } @@ -128,7 +122,6 @@ KafkaConsumerStream newAtLeastOnceConsumerStream( ackMonitor, messageMappingSink, dreSink, - kafkaConsumerMetricsRegistry, connectionId, consumerId); } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/RestartableKafkaConsumerStream.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/RestartableKafkaConsumerStream.java index e4f2558da1..e35dfd27e0 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/RestartableKafkaConsumerStream.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/RestartableKafkaConsumerStream.java @@ -59,6 +59,11 @@ public CompletionStage stop() { return kafkaConsumerStream.stop(); } + @Override + public void reportMetrics() { + kafkaConsumerStream.reportMetrics(); + } + /** * Stops the current stream and starts a new one which will be returned by the CompletionStage. * diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtLeastOnceConsumerStreamTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtLeastOnceConsumerStreamTest.java index 1739547529..20d9e7fe7d 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtLeastOnceConsumerStreamTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtLeastOnceConsumerStreamTest.java @@ -20,7 +20,6 @@ import static org.mutabilitydetector.unittesting.MutabilityAssert.assertInstancesOf; import static org.mutabilitydetector.unittesting.MutabilityMatchers.areImmutable; -import java.time.Duration; import java.time.Instant; import java.util.concurrent.atomic.AtomicReference; @@ -90,7 +89,7 @@ public void isImmutable() { assertInstancesOf(AtLeastOnceConsumerStream.class, areImmutable(), provided(ConnectionMonitor.class, Sink.class, Materializer.class, - Consumer.DrainingControl.class, KafkaConsumerMetricsRegistry.class).areAlsoImmutable()); + Consumer.DrainingControl.class, KafkaConsumerMetrics.class).areAlsoImmutable()); } @Test @@ -125,8 +124,8 @@ public void appliesBackPressureWhenMessagesAreNotAcknowledged() { new AtLeastOnceConsumerStream(sourceSupplier, CommitterSettings.apply(actorSystem), TestConstants.KAFKA_THROTTLING_CONFIG, messageTransformer, false, materializer, - connectionMonitor, ackMonitor, inboundMappingSink, dreSink, KafkaConsumerMetricsRegistry.getInstance( - Duration.ofSeconds(5L)), ConnectionId.generateRandom(), "someUniqueId"); + connectionMonitor, ackMonitor, inboundMappingSink, dreSink, + ConnectionId.generateRandom(), "someUniqueId"); inboundSinkProbe.ensureSubscription(); // Then we can offer those records and they are processed in parallel to the maximum of 'maxInflight' diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtMostOnceConsumerStreamTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtMostOnceConsumerStreamTest.java index 3bbfbf2c13..d4e46e8a47 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtMostOnceConsumerStreamTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtMostOnceConsumerStreamTest.java @@ -20,7 +20,6 @@ import static org.mutabilitydetector.unittesting.MutabilityAssert.assertInstancesOf; import static org.mutabilitydetector.unittesting.MutabilityMatchers.areImmutable; -import java.time.Duration; import java.time.Instant; import java.util.concurrent.atomic.AtomicReference; @@ -87,7 +86,7 @@ public void isImmutable() { assertInstancesOf(AtMostOnceConsumerStream.class, areImmutable(), provided(ConnectionMonitor.class, Sink.class, Materializer.class, Consumer.DrainingControl.class, - KafkaConsumerMetricsRegistry.class) + KafkaConsumerMetrics.class) .areAlsoImmutable()); } @@ -117,7 +116,6 @@ public void appliesBackPressureWhenMessagesAreNotAcknowledged() { new AtMostOnceConsumerStream(sourceSupplier, TestConstants.KAFKA_THROTTLING_CONFIG, messageTransformer, false, materializer, connectionMonitor, inboundMappingSink, dreSink, - KafkaConsumerMetricsRegistry.getInstance(Duration.ofSeconds(5L)), ConnectionId.generateRandom(), "someUniqueId"); diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerActorTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerActorTest.java index 9be0532133..7021e1816c 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerActorTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerActorTest.java @@ -128,14 +128,15 @@ protected Props getConsumerActorProps(final Sink inboundMapping .build(); final ConsumerData consumerData = new ConsumerData(connectionSource, address, "xy"); final KafkaConsumerStreamFactory consumerStreamFactory = - new KafkaConsumerStreamFactory(sourceSupplier, null, consumerData, false, KafkaConsumerMetricsRegistry.getInstance(Duration.ofSeconds(10L))); + new KafkaConsumerStreamFactory(sourceSupplier, null, consumerData, false); final DefaultExponentialBackOffConfig backOffConfig = DefaultExponentialBackOffConfig.of(ConfigFactory.empty()); return KafkaConsumerActor.props(CONNECTION, consumerStreamFactory, new ConsumerData(connectionSource, address, address + 0), inboundMappingSink, mock(ConnectivityStatusResolver.class), - backOffConfig); + backOffConfig, + Duration.ofSeconds(10)); } @Override