diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 53cc40d7fd8b7..6cfa492b51775 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -2394,6 +2394,10 @@ protected synchronized void recordTaskAdded(ConnectorTaskId connectorTaskId) { } protected synchronized void recordTaskRemoved(ConnectorTaskId connectorTaskId) { + if (!connectorStatusMetrics.containsKey(connectorTaskId.connector())) { + return; + } + // Unregister connector task count metric if we remove the last task of the connector if (tasks.keySet().stream().noneMatch(id -> id.connector().equals(connectorTaskId.connector()))) { connectorStatusMetrics.get(connectorTaskId.connector()).close(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index e29eeebe88d60..20a5b1e339539 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -146,6 +146,7 @@ import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG; import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG; import static org.apache.kafka.connect.sink.SinkTask.TOPICS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -898,6 +899,35 @@ public void testConnectorStatusMetricsGroup_taskStatusCounter(boolean enableTopi verifyKafkaClusterId(); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testConnectorStatusMetricsGroup_tasksFailedToStart(boolean enableTopicCreation) { + setup(enableTopicCreation); + mockKafkaClusterId(); + mockInternalConverters(); + mockFileConfigProvider(); + + worker = new Worker(WORKER_ID, + new MockTime(), + plugins, + config, + offsetBackingStore, + noneConnectorClientConfigOverridePolicy); + worker.herder = herder; + + // Pass an empty tasks map to simulate all tasks failing to start + Worker.ConnectorStatusMetricsGroup metricsGroup = new Worker.ConnectorStatusMetricsGroup( + worker.metrics(), new ConcurrentHashMap<>(), herder + ); + + ConnectorTaskId taskId1 = new ConnectorTaskId("c1", 0); + ConnectorTaskId taskId2 = new ConnectorTaskId("c1", 1); + metricsGroup.recordTaskAdded(taskId1); + metricsGroup.recordTaskAdded(taskId2); + metricsGroup.recordTaskRemoved(taskId1); + assertDoesNotThrow(() -> metricsGroup.recordTaskRemoved(taskId2), "should not throw NPE"); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) public void testStartTaskFailure(boolean enableTopicCreation) {