From fe4f3e4508ce7adb0731afeca5e5c34ab13f700f Mon Sep 17 00:00:00 2001 From: Fan Yang Date: Sat, 20 Sep 2025 18:23:28 +0800 Subject: [PATCH 1/3] KAFKA-19697: Fix unregistering connectorStatusMetrics NPE if tasks fail to start --- .../main/java/org/apache/kafka/connect/runtime/Worker.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 53cc40d7fd8b7..6cfa492b51775 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -2394,6 +2394,10 @@ protected synchronized void recordTaskAdded(ConnectorTaskId connectorTaskId) { } protected synchronized void recordTaskRemoved(ConnectorTaskId connectorTaskId) { + if (!connectorStatusMetrics.containsKey(connectorTaskId.connector())) { + return; + } + // Unregister connector task count metric if we remove the last task of the connector if (tasks.keySet().stream().noneMatch(id -> id.connector().equals(connectorTaskId.connector()))) { connectorStatusMetrics.get(connectorTaskId.connector()).close(); From dd8beb4bbf6ef332fd1440d9854550494f5b90f7 Mon Sep 17 00:00:00 2001 From: Fan Yang Date: Mon, 22 Sep 2025 09:59:41 +0800 Subject: [PATCH 2/3] Add UT --- .../kafka/connect/runtime/WorkerTest.java | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index e29eeebe88d60..8d9aa5891d722 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -146,6 +146,7 @@ import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG; import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG; import static org.apache.kafka.connect.sink.SinkTask.TOPICS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -898,6 +899,36 @@ public void testConnectorStatusMetricsGroup_taskStatusCounter(boolean enableTopi verifyKafkaClusterId(); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testConnectorStatusMetricsGroup_tasksFailedToStart(boolean enableTopicCreation) { + setup(enableTopicCreation); + ConcurrentMap> tasks = new ConcurrentHashMap<>(); + ConnectorTaskId taskId1 = new ConnectorTaskId("c1", 0); + ConnectorTaskId taskId2 = new ConnectorTaskId("c1", 1); + + mockKafkaClusterId(); + mockInternalConverters(); + mockFileConfigProvider(); + + worker = new Worker(WORKER_ID, + new MockTime(), + plugins, + config, + offsetBackingStore, + noneConnectorClientConfigOverridePolicy); + worker.herder = herder; + + Worker.ConnectorStatusMetricsGroup metricsGroup = new Worker.ConnectorStatusMetricsGroup( + worker.metrics(), tasks, herder + ); + + metricsGroup.recordTaskAdded(taskId1); + metricsGroup.recordTaskAdded(taskId2); + metricsGroup.recordTaskRemoved(taskId1); + assertDoesNotThrow(() -> metricsGroup.recordTaskRemoved(taskId2), "should not throw NPE"); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) public void testStartTaskFailure(boolean enableTopicCreation) { From 399f5bc19ebade4e5a5c4bd3e812e5260edf6bd3 Mon Sep 17 00:00:00 2001 From: Fan Yang Date: Thu, 25 Sep 2025 09:53:14 +0800 Subject: [PATCH 3/3] Add UT --- .../org/apache/kafka/connect/runtime/WorkerTest.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 8d9aa5891d722..20a5b1e339539 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -903,10 +903,6 @@ public void testConnectorStatusMetricsGroup_taskStatusCounter(boolean enableTopi @ValueSource(booleans = {true, false}) public void testConnectorStatusMetricsGroup_tasksFailedToStart(boolean enableTopicCreation) { setup(enableTopicCreation); - ConcurrentMap> tasks = new ConcurrentHashMap<>(); - ConnectorTaskId taskId1 = new ConnectorTaskId("c1", 0); - ConnectorTaskId taskId2 = new ConnectorTaskId("c1", 1); - mockKafkaClusterId(); mockInternalConverters(); mockFileConfigProvider(); @@ -919,10 +915,13 @@ public void testConnectorStatusMetricsGroup_tasksFailedToStart(boolean enableTop noneConnectorClientConfigOverridePolicy); worker.herder = herder; + // Pass an empty tasks map to simulate all tasks failing to start Worker.ConnectorStatusMetricsGroup metricsGroup = new Worker.ConnectorStatusMetricsGroup( - worker.metrics(), tasks, herder + worker.metrics(), new ConcurrentHashMap<>(), herder ); + ConnectorTaskId taskId1 = new ConnectorTaskId("c1", 0); + ConnectorTaskId taskId2 = new ConnectorTaskId("c1", 1); metricsGroup.recordTaskAdded(taskId1); metricsGroup.recordTaskAdded(taskId2); metricsGroup.recordTaskRemoved(taskId1);