From 3b9c3da97847518c57863cbff8a9bef2fd1f7ada Mon Sep 17 00:00:00 2001 From: Yash Mayya Date: Thu, 24 Aug 2023 23:44:53 +0100 Subject: [PATCH] KAFKA-15377: Don't expose externalized secret values in tasks-config API endpoint (#14244) Reviewers: Greg Harris --- .../org/apache/kafka/connect/runtime/AbstractHerder.java | 2 +- .../connect/runtime/standalone/StandaloneHerder.java | 2 +- .../runtime/distributed/DistributedHerderTest.java | 7 +++++++ .../connect/runtime/standalone/StandaloneHerderTest.java | 9 +++++++++ 4 files changed, 18 insertions(+), 2 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index 84bd0f811086..d5d4edaca952 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -311,7 +311,7 @@ protected Map> buildTasksConfig(String conn Map> configs = new HashMap<>(); for (ConnectorTaskId cti : configState.tasks(connector)) { - configs.put(cti, configState.taskConfig(cti)); + configs.put(cti, configState.rawTaskConfig(cti)); } return configs; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java index b3c83e3dd789..6a74224def14 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java @@ -551,7 +551,7 @@ public int hashCode() { public void tasksConfig(String connName, Callback>> callback) { Map> tasksConfig = buildTasksConfig(connName); if (tasksConfig.isEmpty()) { - callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), tasksConfig); + callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null); return; } callback.onCompletion(null, tasksConfig); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index cc4147737df4..e42a720fcc3f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -2839,6 +2839,8 @@ public void testAccessors() throws Exception { herder.connectorConfig(CONN1, connectorConfigCb); FutureCallback> taskConfigsCb = new FutureCallback<>(); herder.taskConfigs(CONN1, taskConfigsCb); + FutureCallback>> tasksConfigCb = new FutureCallback<>(); + herder.tasksConfig(CONN1, tasksConfigCb); herder.tick(); assertTrue(listConnectorsCb.isDone()); @@ -2855,6 +2857,11 @@ public void testAccessors() throws Exception { new TaskInfo(TASK1, TASK_CONFIG), new TaskInfo(TASK2, TASK_CONFIG)), taskConfigsCb.get()); + Map> tasksConfig = new HashMap<>(); + tasksConfig.put(TASK0, TASK_CONFIG); + tasksConfig.put(TASK1, TASK_CONFIG); + tasksConfig.put(TASK2, TASK_CONFIG); + assertEquals(tasksConfig, tasksConfigCb.get()); PowerMock.verifyAll(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java index 9eb29d5de534..0d8e706713e7 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java @@ -762,6 +762,7 @@ public void testAccessors() throws Exception { Callback connectorInfoCb = PowerMock.createMock(Callback.class); Callback> connectorConfigCb = PowerMock.createMock(Callback.class); Callback> taskConfigsCb = PowerMock.createMock(Callback.class); + Callback>> tasksConfigCb = PowerMock.createMock(Callback.class); // Check accessors with empty worker listConnectorsCb.onCompletion(null, Collections.EMPTY_SET); @@ -772,6 +773,8 @@ public void testAccessors() throws Exception { EasyMock.expectLastCall(); taskConfigsCb.onCompletion(EasyMock.anyObject(), EasyMock.isNull()); EasyMock.expectLastCall(); + tasksConfigCb.onCompletion(EasyMock.anyObject(), EasyMock.isNull()); + EasyMock.expectLastCall(); // Create connector connector = PowerMock.createMock(BogusSourceConnector.class); @@ -792,6 +795,10 @@ public void testAccessors() throws Exception { taskConfigsCb.onCompletion(null, Arrays.asList(taskInfo)); EasyMock.expectLastCall(); + Map> tasksConfig = Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0), + taskConfig(SourceSink.SOURCE)); + tasksConfigCb.onCompletion(null, tasksConfig); + EasyMock.expectLastCall(); PowerMock.replayAll(); @@ -800,6 +807,7 @@ public void testAccessors() throws Exception { herder.connectorInfo(CONNECTOR_NAME, connectorInfoCb); herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb); herder.taskConfigs(CONNECTOR_NAME, taskConfigsCb); + herder.tasksConfig(CONNECTOR_NAME, tasksConfigCb); herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, createCallback); Herder.Created connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); @@ -815,6 +823,7 @@ public void testAccessors() throws Exception { herder.connectorInfo(CONNECTOR_NAME, connectorInfoCb); herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb); herder.taskConfigs(CONNECTOR_NAME, taskConfigsCb); + herder.tasksConfig(CONNECTOR_NAME, tasksConfigCb); PowerMock.verifyAll(); }