From 690a974e5401cdc533c2f941eaf6138178092046 Mon Sep 17 00:00:00 2001 From: Arpit goyal Date: Fri, 17 Oct 2025 15:08:08 +0530 Subject: [PATCH 1/5] Adding allow topic creation false for global and restore consumer --- .../main/java/org/apache/kafka/streams/StreamsConfig.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 4830fb960b94e..6b7893ecd9ae8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -1845,6 +1845,9 @@ public Map getRestoreConsumerConfigs(final String clientId) { baseConsumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId); baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); + // disable auto topic creation + baseConsumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false"); + return baseConsumerProps; } @@ -1877,6 +1880,10 @@ public Map getGlobalConsumerConfigs(final String clientId) { // add client id with stream client id prefix baseConsumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-global-consumer"); baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); + + // disable auto topic creation + baseConsumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false"); + return baseConsumerProps; } From 789bb13faa0fcba059f7575c053a5159be2e4fa9 Mon Sep 17 00:00:00 2001 From: Arpit goyal Date: Sat, 18 Oct 2025 14:04:59 +0530 Subject: [PATCH 2/5] adding test whivh verify override does not work --- .../kafka/streams/StreamsConfigTest.java | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 65147a81101fa..85b9ccfa231ac 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -442,6 +442,32 @@ public void shouldOverrideAdminDefaultAdminClientEnableTelemetry() { assertTrue((boolean) returnedProps.get(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG)); } + @Test + public void testAutoCreateTopicsCannotBeOverriddenForStreamsConsumers() { + // User tries to override the setting + props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true"); + props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true"); + props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true"); + + final StreamsConfig streamsConfig = new StreamsConfig(props); + + // Main consumer - verify override is ignored + final Map mainConfigs = streamsConfig.getMainConsumerConfigs("group", "client", 0); + assertEquals("false", mainConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), + "Main consumer should not allow auto topic creation even with override"); + + // Restore consumer - verify override is ignored + final Map restoreConfigs = streamsConfig.getRestoreConsumerConfigs("client"); + assertEquals("false", restoreConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), + "Restore consumer should not allow auto topic creation even with override"); + + // Global consumer - verify override is ignored + final Map globalConfigs = streamsConfig.getGlobalConsumerConfigs("client"); + assertEquals("false", globalConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), + "Global consumer should not allow auto topic creation even with override"); + } + + @Test public void shouldSupportNonPrefixedAdminConfigs() { props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 10); From 3779545ac78c0502767fb34c9c6cd3d4cccc8f46 Mon Sep 17 00:00:00 2001 From: Arpit goyal Date: Sat, 25 Oct 2025 10:44:45 +0530 Subject: [PATCH 3/5] Adding auto create topic in non configurable default --- .../apache/kafka/streams/StreamsConfig.java | 17 ++++++----------- .../kafka/streams/StreamsConfigTest.java | 19 +++++++++++++++++++ 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 6b7893ecd9ae8..1a1952b9aa483 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -821,7 +821,7 @@ public class StreamsConfig extends AbstractConfig { private static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. Default is 1 day"; private static final String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS = - new String[] {ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfig.GROUP_PROTOCOL_CONFIG}; + new String[] {ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfig.GROUP_PROTOCOL_CONFIG, ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG}; private static final String[] NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS = new String[] {ConsumerConfig.ISOLATION_LEVEL_CONFIG}; private static final String[] NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS = @@ -1255,7 +1255,8 @@ public class StreamsConfig extends AbstractConfig { ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000", ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest", ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", - ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic" + ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic", + ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false" ); private static final Map CONSUMER_EOS_OVERRIDES; @@ -1759,6 +1760,7 @@ public Map getMainConsumerConfigs(final String groupId, final St // Get main consumer override configs final Map mainConsumerProps = originalsWithPrefix(MAIN_CONSUMER_PREFIX); + checkIfUnexpectedUserSpecifiedConsumerConfig(mainConsumerProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS); consumerProps.putAll(mainConsumerProps); // this is a hack to work around StreamsConfig constructor inside StreamsPartitionAssignor to avoid casting @@ -1789,9 +1791,6 @@ public Map getMainConsumerConfigs(final String groupId, final St consumerProps.put(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, getInt(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG)); consumerProps.put(TASK_ASSIGNOR_CLASS_CONFIG, getString(TASK_ASSIGNOR_CLASS_CONFIG)); - // disable auto topic creation - consumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false"); - // verify that producer batch config is no larger than segment size, then add topic configs required for creating topics final Map topicProps = originalsWithPrefix(TOPIC_PREFIX, false); final Map producerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames()); @@ -1834,6 +1833,7 @@ public Map getRestoreConsumerConfigs(final String clientId) { // Get restore consumer override configs final Map restoreConsumerProps = originalsWithPrefix(RESTORE_CONSUMER_PREFIX); + checkIfUnexpectedUserSpecifiedConsumerConfig(restoreConsumerProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS); baseConsumerProps.putAll(restoreConsumerProps); // no need to set group id for a restore consumer @@ -1845,9 +1845,6 @@ public Map getRestoreConsumerConfigs(final String clientId) { baseConsumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId); baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); - // disable auto topic creation - baseConsumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false"); - return baseConsumerProps; } @@ -1870,6 +1867,7 @@ public Map getGlobalConsumerConfigs(final String clientId) { // Get global consumer override configs final Map globalConsumerProps = originalsWithPrefix(GLOBAL_CONSUMER_PREFIX); + checkIfUnexpectedUserSpecifiedConsumerConfig(globalConsumerProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS); baseConsumerProps.putAll(globalConsumerProps); // no need to set group id for a global consumer @@ -1881,9 +1879,6 @@ public Map getGlobalConsumerConfigs(final String clientId) { baseConsumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-global-consumer"); baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); - // disable auto topic creation - baseConsumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false"); - return baseConsumerProps; } diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 85b9ccfa231ac..8c88ef4c925dc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -445,6 +445,7 @@ public void shouldOverrideAdminDefaultAdminClientEnableTelemetry() { @Test public void testAutoCreateTopicsCannotBeOverriddenForStreamsConsumers() { // User tries to override the setting + props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true"); props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true"); props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true"); props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true"); @@ -467,6 +468,24 @@ public void testAutoCreateTopicsCannotBeOverriddenForStreamsConsumers() { "Global consumer should not allow auto topic creation even with override"); } + @Test + public void shouldLogErrorWhenUserTriesToOverrideAutoCreateTopics() { + props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true"); + + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) { + appender.setClassLogger(StreamsConfig.class, Level.ERROR); + + final StreamsConfig streamsConfig = new StreamsConfig(props); + // Trigger the warning by getting consumer configs + streamsConfig.getMainConsumerConfigs("group", "client", 0); + + assertTrue(appender.getMessages().stream() + .anyMatch(msg -> msg.contains("Unexpected user-specified consumer config 'allow.auto.create.topics' found") + && msg.contains("User setting (true) will be ignored and the Streams default setting (false) will be used")), + "Should log error when user tries to override allow.auto.create.topics"); + } + } + @Test public void shouldSupportNonPrefixedAdminConfigs() { From ac73d1e523551cc577fcda8ae61efeb5ceef06cb Mon Sep 17 00:00:00 2001 From: Arpit goyal Date: Thu, 6 Nov 2025 06:21:01 +0530 Subject: [PATCH 4/5] renaming checkIfUnexpectedUserSpecifiedConsumerConfig to checkIfUnexpectedUserSpecifiedClientConfig --- .../apache/kafka/streams/StreamsConfig.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 1a1952b9aa483..7df46a2bdcc60 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -1631,8 +1631,8 @@ private Map getCommonConsumerConfigs() { clientProvidedProps.remove(GROUP_PROTOCOL_CONFIG); - checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS); - checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS); + checkIfUnexpectedUserSpecifiedClientConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS); + checkIfUnexpectedUserSpecifiedClientConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS); final Map consumerProps = new HashMap<>(eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES); if (StreamsConfigUtils.eosEnabled(this)) { @@ -1647,12 +1647,12 @@ private Map getCommonConsumerConfigs() { return consumerProps; } - private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map clientProvidedProps, - final String[] nonConfigurableConfigs) { - // Streams does not allow users to configure certain consumer/producer configurations, for example, - // enable.auto.commit. In cases where user tries to override such non-configurable - // consumer/producer configurations, log a warning and remove the user defined value from the Map. - // Thus, the default values for these consumer/producer configurations that are suitable for + private void checkIfUnexpectedUserSpecifiedClientConfig(final Map clientProvidedProps, + final String[] nonConfigurableConfigs) { + // Streams does not allow users to configure certain client configurations (consumer/producer), + // for example, enable.auto.commit or transactional.id. In cases where user tries to override + // such non-configurable client configurations, log a warning and remove the user defined value + // from the Map. Thus, the default values for these client configurations that are suitable for // Streams will be used instead. final String nonConfigurableConfigMessage = "Unexpected user-specified {} config '{}' found. {} setting ({}) will be ignored and the Streams default setting ({}) will be used."; @@ -1760,7 +1760,7 @@ public Map getMainConsumerConfigs(final String groupId, final St // Get main consumer override configs final Map mainConsumerProps = originalsWithPrefix(MAIN_CONSUMER_PREFIX); - checkIfUnexpectedUserSpecifiedConsumerConfig(mainConsumerProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS); + checkIfUnexpectedUserSpecifiedClientConfig(mainConsumerProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS); consumerProps.putAll(mainConsumerProps); // this is a hack to work around StreamsConfig constructor inside StreamsPartitionAssignor to avoid casting @@ -1833,7 +1833,7 @@ public Map getRestoreConsumerConfigs(final String clientId) { // Get restore consumer override configs final Map restoreConsumerProps = originalsWithPrefix(RESTORE_CONSUMER_PREFIX); - checkIfUnexpectedUserSpecifiedConsumerConfig(restoreConsumerProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS); + checkIfUnexpectedUserSpecifiedClientConfig(restoreConsumerProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS); baseConsumerProps.putAll(restoreConsumerProps); // no need to set group id for a restore consumer @@ -1867,7 +1867,7 @@ public Map getGlobalConsumerConfigs(final String clientId) { // Get global consumer override configs final Map globalConsumerProps = originalsWithPrefix(GLOBAL_CONSUMER_PREFIX); - checkIfUnexpectedUserSpecifiedConsumerConfig(globalConsumerProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS); + checkIfUnexpectedUserSpecifiedClientConfig(globalConsumerProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS); baseConsumerProps.putAll(globalConsumerProps); // no need to set group id for a global consumer @@ -1895,7 +1895,7 @@ public Map getGlobalConsumerConfigs(final String clientId) { public Map getProducerConfigs(final String clientId) { final Map clientProvidedProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames()); - checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS); + checkIfUnexpectedUserSpecifiedClientConfig(clientProvidedProps, NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS); // generate producer configs from original properties and overridden maps final Map props = new HashMap<>(eosEnabled ? PRODUCER_EOS_OVERRIDES : PRODUCER_DEFAULT_OVERRIDES); From 6410aafa8a701bfc621756328756d84c09315362 Mon Sep 17 00:00:00 2001 From: Arpit goyal Date: Sat, 8 Nov 2025 12:34:00 +0530 Subject: [PATCH 5/5] Adding test for common and specific consumer prefix --- .../kafka/streams/StreamsConfigTest.java | 84 ++++++++++++------- 1 file changed, 56 insertions(+), 28 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 8c88ef4c925dc..ca4048ee9c694 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -443,46 +443,74 @@ public void shouldOverrideAdminDefaultAdminClientEnableTelemetry() { } @Test - public void testAutoCreateTopicsCannotBeOverriddenForStreamsConsumers() { - // User tries to override the setting + public void shouldNotAllowAutoCreateTopicsForConsumers_WithCommonConsumerPrefix() { + // Test with generic consumer.* prefix (affects all consumer types) props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true"); - props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true"); - props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true"); - props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true"); - - final StreamsConfig streamsConfig = new StreamsConfig(props); - // Main consumer - verify override is ignored - final Map mainConfigs = streamsConfig.getMainConsumerConfigs("group", "client", 0); - assertEquals("false", mainConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), - "Main consumer should not allow auto topic creation even with override"); + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) { + appender.setClassLogger(StreamsConfig.class, Level.ERROR); - // Restore consumer - verify override is ignored - final Map restoreConfigs = streamsConfig.getRestoreConsumerConfigs("client"); - assertEquals("false", restoreConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), - "Restore consumer should not allow auto topic creation even with override"); + final StreamsConfig streamsConfig = new StreamsConfig(props); - // Global consumer - verify override is ignored - final Map globalConfigs = streamsConfig.getGlobalConsumerConfigs("client"); - assertEquals("false", globalConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), - "Global consumer should not allow auto topic creation even with override"); + // Main consumer - verify override is ignored + final Map mainConfigs = streamsConfig.getMainConsumerConfigs("group", "client", 0); + assertEquals("false", mainConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), + "Main consumer should not allow auto topic creation with consumer.* override"); + + // Restore consumer - verify override is ignored + final Map restoreConfigs = streamsConfig.getRestoreConsumerConfigs("client"); + assertEquals("false", restoreConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), + "Restore consumer should not allow auto topic creation with consumer.* override"); + + // Global consumer - verify override is ignored + final Map globalConfigs = streamsConfig.getGlobalConsumerConfigs("client"); + assertEquals("false", globalConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), + "Global consumer should not allow auto topic creation with consumer.* override"); + + // Verify exactly 1 error is logged (consumer.* prefix is validated once in getCommonConsumerConfigs for each type of consumer) + final List errorMessages = appender.getMessages(); + final long errorCount = errorMessages.stream() + .filter(msg -> msg.contains("Unexpected user-specified consumer config 'allow.auto.create.topics' found")) + .count(); + assertEquals(3, errorCount, + "Should log exactly 3 error for consumer.* prefix"); + } } @Test - public void shouldLogErrorWhenUserTriesToOverrideAutoCreateTopics() { - props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true"); + public void shouldNotAllowAutoCreateTopicsForConsumers_WithSpecificConsumerPrefixes() { + // Test with specific prefixes for each consumer type + props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true"); + props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true"); + props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true"); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) { appender.setClassLogger(StreamsConfig.class, Level.ERROR); - + final StreamsConfig streamsConfig = new StreamsConfig(props); - // Trigger the warning by getting consumer configs - streamsConfig.getMainConsumerConfigs("group", "client", 0); - assertTrue(appender.getMessages().stream() - .anyMatch(msg -> msg.contains("Unexpected user-specified consumer config 'allow.auto.create.topics' found") - && msg.contains("User setting (true) will be ignored and the Streams default setting (false) will be used")), - "Should log error when user tries to override allow.auto.create.topics"); + // Main consumer - verify override is ignored + final Map mainConfigs = streamsConfig.getMainConsumerConfigs("group", "client", 0); + assertEquals("false", mainConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), + "Main consumer should not allow auto topic creation with main.consumer.* override"); + + // Restore consumer - verify override is ignored + final Map restoreConfigs = streamsConfig.getRestoreConsumerConfigs("client"); + assertEquals("false", restoreConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), + "Restore consumer should not allow auto topic creation with restore.consumer.* override"); + + // Global consumer - verify override is ignored + final Map globalConfigs = streamsConfig.getGlobalConsumerConfigs("client"); + assertEquals("false", globalConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), + "Global consumer should not allow auto topic creation with global.consumer.* override"); + + // Verify exactly 3 errors are logged (one for each specific prefix) + final List errorMessages = appender.getMessages(); + final long errorCount = errorMessages.stream() + .filter(msg -> msg.contains("Unexpected user-specified consumer config 'allow.auto.create.topics' found")) + .count(); + assertEquals(3, errorCount, + "Should log exactly 3 errors: one for main.consumer.*, one for restore.consumer.*, one for global.consumer.*"); } }