diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index 7f04914c8d12..7bbcf0f68616 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -311,7 +311,7 @@ protected Map> buildTasksConfig(String conn Map> configs = new HashMap<>(); for (ConnectorTaskId cti : configState.tasks(connector)) { - configs.put(cti, configState.taskConfig(cti)); + configs.put(cti, configState.rawTaskConfig(cti)); } return configs; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java index c7fee9e67153..babb157f772c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java @@ -579,7 +579,7 @@ public int hashCode() { public void tasksConfig(String connName, Callback>> callback) { Map> tasksConfig = buildTasksConfig(connName); if (tasksConfig.isEmpty()) { - callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), tasksConfig); + callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null); return; } callback.onCompletion(null, tasksConfig); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index bec8369a3b09..2b774133d364 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -2091,6 +2091,8 @@ public void testAccessors() throws Exception { herder.connectorConfig(CONN1, connectorConfigCb); FutureCallback> taskConfigsCb = new FutureCallback<>(); herder.taskConfigs(CONN1, taskConfigsCb); + FutureCallback>> tasksConfigCb = new FutureCallback<>(); + herder.tasksConfig(CONN1, tasksConfigCb); herder.tick(); assertTrue(listConnectorsCb.isDone()); @@ -2107,6 +2109,11 @@ public void testAccessors() throws Exception { new TaskInfo(TASK1, TASK_CONFIG), new TaskInfo(TASK2, TASK_CONFIG)), taskConfigsCb.get()); + Map> tasksConfig = new HashMap<>(); + tasksConfig.put(TASK0, TASK_CONFIG); + tasksConfig.put(TASK1, TASK_CONFIG); + tasksConfig.put(TASK2, TASK_CONFIG); + assertEquals(tasksConfig, tasksConfigCb.get()); // Config transformation should not occur when requesting connector or task info verify(configTransformer, never()).transform(eq(CONN1), any()); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java index 32c54f58035b..500ce58b8c65 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java @@ -765,6 +765,7 @@ public void testAccessors() throws Exception { Callback connectorInfoCb = PowerMock.createMock(Callback.class); Callback> connectorConfigCb = PowerMock.createMock(Callback.class); Callback> taskConfigsCb = PowerMock.createMock(Callback.class); + Callback>> tasksConfigCb = PowerMock.createMock(Callback.class); // Check accessors with empty worker listConnectorsCb.onCompletion(null, Collections.EMPTY_SET); @@ -775,6 +776,8 @@ public void testAccessors() throws Exception { EasyMock.expectLastCall(); taskConfigsCb.onCompletion(EasyMock.anyObject(), EasyMock.isNull()); EasyMock.expectLastCall(); + tasksConfigCb.onCompletion(EasyMock.anyObject(), EasyMock.isNull()); + EasyMock.expectLastCall(); // Create connector connector = PowerMock.createMock(BogusSourceConnector.class); @@ -795,6 +798,10 @@ public void testAccessors() throws Exception { taskConfigsCb.onCompletion(null, Arrays.asList(taskInfo)); EasyMock.expectLastCall(); + Map> tasksConfig = Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0), + taskConfig(SourceSink.SOURCE)); + tasksConfigCb.onCompletion(null, tasksConfig); + EasyMock.expectLastCall(); PowerMock.replayAll(); @@ -803,6 +810,7 @@ public void testAccessors() throws Exception { herder.connectorInfo(CONNECTOR_NAME, connectorInfoCb); herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb); herder.taskConfigs(CONNECTOR_NAME, taskConfigsCb); + herder.tasksConfig(CONNECTOR_NAME, tasksConfigCb); herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, createCallback); Herder.Created connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); @@ -818,6 +826,7 @@ public void testAccessors() throws Exception { herder.connectorInfo(CONNECTOR_NAME, connectorInfoCb); herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb); herder.taskConfigs(CONNECTOR_NAME, taskConfigsCb); + herder.tasksConfig(CONNECTOR_NAME, tasksConfigCb); PowerMock.verifyAll(); }