Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,7 @@ public class StreamsConfig extends AbstractConfig {
private static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. Default is 1 day";

private static final String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS =
new String[] {ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfig.GROUP_PROTOCOL_CONFIG};
new String[] {ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfig.GROUP_PROTOCOL_CONFIG, ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG};
private static final String[] NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS =
new String[] {ConsumerConfig.ISOLATION_LEVEL_CONFIG};
private static final String[] NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS =
Expand Down Expand Up @@ -1255,7 +1255,8 @@ public class StreamsConfig extends AbstractConfig {
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic"
ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic",
ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false"
);

private static final Map<String, Object> CONSUMER_EOS_OVERRIDES;
Expand Down Expand Up @@ -1630,8 +1631,8 @@ private Map<String, Object> getCommonConsumerConfigs() {

clientProvidedProps.remove(GROUP_PROTOCOL_CONFIG);

checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
checkIfUnexpectedUserSpecifiedClientConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
checkIfUnexpectedUserSpecifiedClientConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);

final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
if (StreamsConfigUtils.eosEnabled(this)) {
Expand All @@ -1646,12 +1647,12 @@ private Map<String, Object> getCommonConsumerConfigs() {
return consumerProps;
}

private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Object> clientProvidedProps,
final String[] nonConfigurableConfigs) {
// Streams does not allow users to configure certain consumer/producer configurations, for example,
// enable.auto.commit. In cases where user tries to override such non-configurable
// consumer/producer configurations, log a warning and remove the user defined value from the Map.
// Thus, the default values for these consumer/producer configurations that are suitable for
private void checkIfUnexpectedUserSpecifiedClientConfig(final Map<String, Object> clientProvidedProps,
final String[] nonConfigurableConfigs) {
// Streams does not allow users to configure certain client configurations (consumer/producer),
// for example, enable.auto.commit or transactional.id. In cases where user tries to override
// such non-configurable client configurations, log a warning and remove the user defined value
// from the Map. Thus, the default values for these client configurations that are suitable for
// Streams will be used instead.

final String nonConfigurableConfigMessage = "Unexpected user-specified {} config '{}' found. {} setting ({}) will be ignored and the Streams default setting ({}) will be used.";
Expand Down Expand Up @@ -1759,6 +1760,7 @@ public Map<String, Object> getMainConsumerConfigs(final String groupId, final St

// Get main consumer override configs
final Map<String, Object> mainConsumerProps = originalsWithPrefix(MAIN_CONSUMER_PREFIX);
checkIfUnexpectedUserSpecifiedClientConfig(mainConsumerProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
consumerProps.putAll(mainConsumerProps);

// this is a hack to work around StreamsConfig constructor inside StreamsPartitionAssignor to avoid casting
Expand Down Expand Up @@ -1789,9 +1791,6 @@ public Map<String, Object> getMainConsumerConfigs(final String groupId, final St
consumerProps.put(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, getInt(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG));
consumerProps.put(TASK_ASSIGNOR_CLASS_CONFIG, getString(TASK_ASSIGNOR_CLASS_CONFIG));

// disable auto topic creation
consumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");

// verify that producer batch config is no larger than segment size, then add topic configs required for creating topics
final Map<String, Object> topicProps = originalsWithPrefix(TOPIC_PREFIX, false);
final Map<String, Object> producerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
Expand Down Expand Up @@ -1834,6 +1833,7 @@ public Map<String, Object> getRestoreConsumerConfigs(final String clientId) {

// Get restore consumer override configs
final Map<String, Object> restoreConsumerProps = originalsWithPrefix(RESTORE_CONSUMER_PREFIX);
checkIfUnexpectedUserSpecifiedClientConfig(restoreConsumerProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
baseConsumerProps.putAll(restoreConsumerProps);

// no need to set group id for a restore consumer
Expand Down Expand Up @@ -1867,6 +1867,7 @@ public Map<String, Object> getGlobalConsumerConfigs(final String clientId) {

// Get global consumer override configs
final Map<String, Object> globalConsumerProps = originalsWithPrefix(GLOBAL_CONSUMER_PREFIX);
checkIfUnexpectedUserSpecifiedClientConfig(globalConsumerProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
baseConsumerProps.putAll(globalConsumerProps);

// no need to set group id for a global consumer
Expand All @@ -1877,6 +1878,7 @@ public Map<String, Object> getGlobalConsumerConfigs(final String clientId) {
// add client id with stream client id prefix
baseConsumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-global-consumer");
baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

return baseConsumerProps;
}

Expand All @@ -1893,7 +1895,7 @@ public Map<String, Object> getGlobalConsumerConfigs(final String clientId) {
public Map<String, Object> getProducerConfigs(final String clientId) {
final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());

checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS);
checkIfUnexpectedUserSpecifiedClientConfig(clientProvidedProps, NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS);

// generate producer configs from original properties and overridden maps
final Map<String, Object> props = new HashMap<>(eosEnabled ? PRODUCER_EOS_OVERRIDES : PRODUCER_DEFAULT_OVERRIDES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,79 @@ public void shouldOverrideAdminDefaultAdminClientEnableTelemetry() {
assertTrue((boolean) returnedProps.get(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG));
}

@Test
public void shouldNotAllowAutoCreateTopicsForConsumers_WithCommonConsumerPrefix() {
// Test with generic consumer.* prefix (affects all consumer types)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true");

try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
appender.setClassLogger(StreamsConfig.class, Level.ERROR);

final StreamsConfig streamsConfig = new StreamsConfig(props);

// Main consumer - verify override is ignored
final Map<String, Object> mainConfigs = streamsConfig.getMainConsumerConfigs("group", "client", 0);
assertEquals("false", mainConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
"Main consumer should not allow auto topic creation with consumer.* override");

// Restore consumer - verify override is ignored
final Map<String, Object> restoreConfigs = streamsConfig.getRestoreConsumerConfigs("client");
assertEquals("false", restoreConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
"Restore consumer should not allow auto topic creation with consumer.* override");

// Global consumer - verify override is ignored
final Map<String, Object> globalConfigs = streamsConfig.getGlobalConsumerConfigs("client");
assertEquals("false", globalConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
"Global consumer should not allow auto topic creation with consumer.* override");

// Verify exactly 1 error is logged (consumer.* prefix is validated once in getCommonConsumerConfigs for each type of consumer)
final List<String> errorMessages = appender.getMessages();
final long errorCount = errorMessages.stream()
.filter(msg -> msg.contains("Unexpected user-specified consumer config 'allow.auto.create.topics' found"))
.count();
assertEquals(3, errorCount,
"Should log exactly 3 error for consumer.* prefix");
}
}

@Test
public void shouldNotAllowAutoCreateTopicsForConsumers_WithSpecificConsumerPrefixes() {
// Test with specific prefixes for each consumer type
props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true");
props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true");
props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true");

try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
appender.setClassLogger(StreamsConfig.class, Level.ERROR);

final StreamsConfig streamsConfig = new StreamsConfig(props);

// Main consumer - verify override is ignored
final Map<String, Object> mainConfigs = streamsConfig.getMainConsumerConfigs("group", "client", 0);
assertEquals("false", mainConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
"Main consumer should not allow auto topic creation with main.consumer.* override");

// Restore consumer - verify override is ignored
final Map<String, Object> restoreConfigs = streamsConfig.getRestoreConsumerConfigs("client");
assertEquals("false", restoreConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
"Restore consumer should not allow auto topic creation with restore.consumer.* override");

// Global consumer - verify override is ignored
final Map<String, Object> globalConfigs = streamsConfig.getGlobalConsumerConfigs("client");
assertEquals("false", globalConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
"Global consumer should not allow auto topic creation with global.consumer.* override");

// Verify exactly 3 errors are logged (one for each specific prefix)
final List<String> errorMessages = appender.getMessages();
final long errorCount = errorMessages.stream()
.filter(msg -> msg.contains("Unexpected user-specified consumer config 'allow.auto.create.topics' found"))
.count();
assertEquals(3, errorCount,
"Should log exactly 3 errors: one for main.consumer.*, one for restore.consumer.*, one for global.consumer.*");
}
}


@Test
public void shouldSupportNonPrefixedAdminConfigs() {
props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 10);
Expand Down
Loading