diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 083ca82e549d7..2672de8d47f36 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -43,6 +43,7 @@ import org.apache.kafka.streams.errors.ProcessingExceptionHandler; import org.apache.kafka.streams.errors.ProductionExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.internals.ApplicationServerConfigValidator; import org.apache.kafka.streams.internals.StreamsConfigUtils; import org.apache.kafka.streams.internals.UpgradeFromValues; import org.apache.kafka.streams.kstream.SessionWindowedDeserializer; @@ -1075,6 +1076,7 @@ public class StreamsConfig extends AbstractConfig { .define(APPLICATION_SERVER_CONFIG, Type.STRING, "", + new ApplicationServerConfigValidator(), Importance.LOW, APPLICATION_SERVER_DOC) .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG, diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/ApplicationServerConfigValidator.java b/streams/src/main/java/org/apache/kafka/streams/internals/ApplicationServerConfigValidator.java new file mode 100644 index 0000000000000..6ba884c6cfde4 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/internals/ApplicationServerConfigValidator.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.internals; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.Utils; + + +public class ApplicationServerConfigValidator implements ConfigDef.Validator { + + @Override + public void ensureValid(final String name, final Object value) { + if (!(value instanceof String)) { + throw new ConfigException(name + " must be a string"); + } + + final String endPoint = (String) value; + + if (Utils.isBlank(endPoint)) { + return; + } + + final String host = Utils.getHost(endPoint); + final Integer port; + try { + port = Utils.getPort(endPoint); + } catch (final NumberFormatException e) { + throw new ConfigException(name, value, "Invalid port: " + e.getMessage()); + } + + if (host == null || port == null) { + throw new ConfigException( + name, value, String.format("Error parsing host address %s. Expected format host:port.", endPoint) + ); + } + } + + @Override + public String toString() { + return "A host:port pair, protocol://host:port, or an empty string"; + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 34db1ccc25302..1170550b33333 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -249,7 +249,7 @@ public void consumerConfigMustContainStreamPartitionAssignorConfig() { props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 9); props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 99_999L); props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, 7L); - props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "dummy:host"); + props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "dummy:8080"); props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 1024 * 1024); final StreamsConfig streamsConfig = new StreamsConfig(props); final Map returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx); @@ -264,7 +264,7 @@ public void consumerConfigMustContainStreamPartitionAssignorConfig() { returnedProps.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG) ); assertEquals(7L, returnedProps.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)); - assertEquals("dummy:host", returnedProps.get(StreamsConfig.APPLICATION_SERVER_CONFIG)); + assertEquals("dummy:8080", returnedProps.get(StreamsConfig.APPLICATION_SERVER_CONFIG)); assertEquals(1024 * 1024, returnedProps.get(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG))); } @@ -1866,6 +1866,22 @@ public void shouldLogWarningWhenProcessingExceptionHandlerIsNotEnabledOnGlobalTh } } + @ParameterizedTest + @ValueSource(strings = {"dummy:host", "dummy:9999999999999999999999999", "dummy", "dummy:", ":port"}) + public void shouldThrowConfigExceptionWithInvalidApplicationServerConfigValue(final String applicationServerConfig) { + props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServerConfig); + assertThrows(ConfigException.class, () -> new StreamsConfig(props)); + } + + @ParameterizedTest + @ValueSource(strings = {"", "127.0.0.1:8080", "localhost:8080", "[::1]:8080", "http://localhost:8080"}) + public void shouldAcceptWithValidApplicationServerConfigValue(final String applicationServerConfigValue) { + props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServerConfigValue); + final StreamsConfig streamsConfig = new StreamsConfig(props); + final Map returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx); + assertEquals(applicationServerConfigValue, returnedProps.get(StreamsConfig.APPLICATION_SERVER_CONFIG)); + } + @SuppressWarnings("deprecation") @Test public void shouldNotLogWarningWhenProcessingExceptionHandlerIsEnabledOnGlobalThread() {