From 1f792a86591bb296e07548d045c86c5c9215a8b2 Mon Sep 17 00:00:00 2001 From: Viktor Somogyi-Vass Date: Thu, 11 May 2023 13:03:45 +0200 Subject: [PATCH] KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics (#13690) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reviewers: Chris Egerton , Viktor Somogyi-Vass Co-authored-by: Dániel Urbán <48119872+urbandan@users.noreply.github.com> --- .../runtime/ExactlyOnceWorkerSourceTask.java | 1 + .../ExactlyOnceWorkerSourceTaskTest.java | 20 +++++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java index 2b9b7aa75f17..30dafaac81d4 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java @@ -233,6 +233,7 @@ protected void finalOffsetCommit(boolean failed) { @Override public void removeMetrics() { Utils.closeQuietly(transactionMetrics, "source task transaction metrics tracker"); + super.removeMetrics(); } @Override diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java index 7c920a3d9881..9939becc9c1f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.RecordTooLargeException; @@ -82,7 +83,9 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.stream.Collectors; +import static java.util.Collections.emptySet; import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; @@ -291,6 +294,23 @@ private void createWorkerTask(TargetState initialState, Converter keyConverter, sourceConfig, Runnable::run, preProducerCheck, postProducerCheck); } + @Test + public void testRemoveMetrics() { + createWorkerTask(); + + workerTask.removeMetrics(); + + assertEquals(emptySet(), filterToTaskMetrics(metrics.metrics().metrics().keySet())); + } + + private Set filterToTaskMetrics(Set metricNames) { + return metricNames + .stream() + .filter(m -> metrics.registry().taskGroupName().equals(m.group()) + || metrics.registry().sourceTaskGroupName().equals(m.group())) + .collect(Collectors.toSet()); + } + @Test public void testStartPaused() throws Exception { createWorkerTask(TargetState.PAUSED);