From 50bc665477fc8fea4eacb129b2af705c95d30277 Mon Sep 17 00:00:00 2001 From: Nikki Thean Date: Fri, 6 Jan 2017 15:36:41 -0500 Subject: [PATCH 1/3] MINOR: Fix small error in javadoc for persistent Stores --- .../src/main/java/org/apache/kafka/streams/state/Stores.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index f2c3b53a43cc5..437f7c09a4644 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -337,7 +337,7 @@ public interface KeyValueFactory { * Keep all key-value entries off-heap in a local database, although for durability all entries are recorded in a Kafka * topic that can be read to restore the entries if they are lost. * - * @return the factory to create in-memory key-value stores; never null + * @return the factory to create persistent key-value stores; never null */ PersistentKeyValueFactory persistent(); } From ca95791596a4882c17e5b0419150222ba180d80b Mon Sep 17 00:00:00 2001 From: Nikki Thean Date: Fri, 6 Jan 2017 18:21:38 -0500 Subject: [PATCH 2/3] KAFKA-4607: Validate the names of auto-generated internal topics --- .../kafka/streams/processor/internals/InternalTopicConfig.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java index c23f1347fd3a1..3127397d2dad6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.streams.processor.internals; +import kafka.common.Topic; + import java.util.Map; import java.util.Objects; import java.util.Properties; @@ -36,6 +38,7 @@ public enum CleanupPolicy { compact, delete } public InternalTopicConfig(final String name, final Set defaultCleanupPolicies, final Map logConfig) { Objects.requireNonNull(name, "name can't be null"); + Topic.validate(name); if (defaultCleanupPolicies.isEmpty()) { throw new IllegalArgumentException("Must provide at least one cleanup policy"); } From a235831ae24f0e45cfa3579d01303c6926bbcdf5 Mon Sep 17 00:00:00 2001 From: Nikki Thean Date: Mon, 9 Jan 2017 14:16:32 -0500 Subject: [PATCH 3/3] Throw exception closer to user, add tests --- .../java/org/apache/kafka/common/Topic.java | 58 ++++++++++++++++++ .../org/apache/kafka/common/TopicTest.java | 59 +++++++++++++++++++ .../kafka/streams/kstream/KGroupedStream.java | 26 +++++--- .../kafka/streams/kstream/KGroupedTable.java | 12 ++-- .../apache/kafka/streams/kstream/KTable.java | 12 ++-- .../kstream/internals/AbstractStream.java | 3 + .../kstream/internals/KGroupedStreamImpl.java | 16 +++++ .../kstream/internals/KGroupedTableImpl.java | 9 +++ .../internals/InternalTopicConfig.java | 3 +- .../internals/KGroupedStreamImplTest.java | 54 ++++++++++++++++- .../internals/KGroupedTableImplTest.java | 17 +++++- .../internals/InternalTopicConfigTest.java | 6 ++ 12 files changed, 252 insertions(+), 23 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/Topic.java create mode 100644 clients/src/test/java/org/apache/kafka/common/TopicTest.java diff --git a/clients/src/main/java/org/apache/kafka/common/Topic.java b/clients/src/main/java/org/apache/kafka/common/Topic.java new file mode 100644 index 0000000000000..f5aff92f67af4 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/Topic.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class Topic { + + private static final String INVALID_CHARS = "[^a-zA-Z0-9._\\-]"; + private static final int MAX_NAME_LENGTH = 249; + + public static void validate(String topic) { + if (isEmpty(topic)) + throw new org.apache.kafka.common.errors.InvalidTopicException("topic name is illegal, can't be empty"); + else if (containsOnlyPeriods(topic)) + throw new org.apache.kafka.common.errors.InvalidTopicException("topic name cannot be \".\" or \"..\""); + else if (exceedsMaxLength(topic)) + throw new org.apache.kafka.common.errors.InvalidTopicException("topic name is illegal, can't be longer than " + MAX_NAME_LENGTH + " characters"); + else if (containsInvalidCharacters(topic)) throw new org.apache.kafka.common.errors.InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'"); + } + + public static boolean isEmpty(String topic) { + return topic.length() <= 0; + } + + public static boolean containsOnlyPeriods(String topic) { + return topic.equals(".") || topic.equals(".."); + } + + public static boolean exceedsMaxLength(String topic) { + return topic.length() > MAX_NAME_LENGTH; + } + + /** + * Valid characters for Kafka topics are the ASCII alphanumerics, '.', '_', and '-' + */ + public static boolean containsInvalidCharacters(String topic) { + Pattern pattern = Pattern.compile(INVALID_CHARS); + Matcher matcher = pattern.matcher(topic); + return matcher.find(); + } + +} diff --git a/clients/src/test/java/org/apache/kafka/common/TopicTest.java b/clients/src/test/java/org/apache/kafka/common/TopicTest.java new file mode 100644 index 0000000000000..6102b4e2ff0a7 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/TopicTest.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common; + +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TopicTest { + + @Test + public void shouldRecognizeValidTopicNames() { + String[] validTopicNames = {"valid", "TOPIC", "nAmEs", "ar6", "VaL1d", "_0-9_."}; + + for (String topicName : validTopicNames) { + assertFalse(Topic.isEmpty(topicName)); + assertFalse(Topic.containsOnlyPeriods(topicName)); + assertFalse(Topic.exceedsMaxLength(topicName)); + assertFalse(Topic.containsInvalidCharacters(topicName)); + } + } + + @Test + public void shouldRecognizeEmptyTopicNames() { + assertTrue(Topic.isEmpty("")); + } + + @Test + public void shouldRecognizeTopicNamesThatExceedMaxLength() { + String longName = "ATCG"; + + for (int i = 0; i < 6; i++) { + longName += longName; + } + + assertTrue(Topic.exceedsMaxLength(longName)); + } + + @Test + public void shouldRecognizeInvalidCharactersInTopicNames() { + Character[] invalidChars = {'/', '\\', ',', '\u0000', ':', '"', '\'', ';', '*', '?', ' ', '\t', '\r', '\n', '='}; + + for (Character c : invalidChars) { + String topicName = "Is " + c + "illegal"; + assertTrue(Topic.containsInvalidCharacters(topicName)); + } + } +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java index fc2881ac9d2b9..ad8020e00618c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java @@ -64,13 +64,14 @@ public interface KGroupedStream { * query the value of the key on a parallel running instance of your Kafka Streams application. *

* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'. * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the * provide {@code storeName}, and "-changelog" is a fixed suffix. * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. * - * @param storeName the name of the underlying {@link KTable} state store + * @param storeName the name of the underlying {@link KTable} state store; valid characters are ASCII alphanumerics, '.', '_' and '-' * @return a {@link KTable} that contains "update" records with unmodified keys and values that represent the latest * (rolling) count (i.e., number of records) for each key */ @@ -133,6 +134,7 @@ public interface KGroupedStream { * query the value of the key on a parallel running instance of your Kafka Streams application. *

* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'. * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the @@ -140,7 +142,7 @@ public interface KGroupedStream { * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. * * @param windows the specification of the aggregation {@link Windows} - * @param storeName the name of the underlying {@link KTable} state store + * @param storeName the name of the underlying {@link KTable} state store; valid characters are ASCII alphanumerics, '.', '_' and '-' * @return a windowed {@link KTable} that contains "update" records with unmodified keys and values that represent * the latest (rolling) count (i.e., number of records) for each key within a window */ @@ -205,7 +207,7 @@ KTable, Long> count(final Windows windows, * * * @param sessionWindows the specification of the aggregation {@link SessionWindows} - * @param storeName the name of the state store created from this operation. + * @param storeName the name of the state store created from this operation; valid characters are ASCII alphanumerics, '.', '_' and '-' * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s * where each table contains records with unmodified keys and values * that represent the latest (rolling) count (i.e., number of records) for each key within that window @@ -277,6 +279,7 @@ KTable, Long> count(final SessionWindows sessionWindows, * query the value of the key on a parallel running instance of your Kafka Streams application. *

* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'. * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the @@ -284,7 +287,7 @@ KTable, Long> count(final SessionWindows sessionWindows, * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. * * @param reducer a {@link Reducer} that computes a new aggregate result - * @param storeName the name of the underlying {@link KTable} state store + * @param storeName the name of the underlying {@link KTable} state store; valid characters are ASCII alphanumerics, '.', '_' and '-' * @return a {@link KTable} that contains "update" records with unmodified keys and values that represent the latest * (rolling) aggregate for each key */ @@ -365,6 +368,7 @@ KTable reduce(final Reducer reducer, * query the value of the key on a parallel running instance of your Kafka Streams application. *

* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'. * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the @@ -373,7 +377,7 @@ KTable reduce(final Reducer reducer, * * @param reducer a {@link Reducer} that computes a new aggregate result * @param windows the specification of the aggregation {@link Windows} - * @param storeName the name of the state store created from this operation + * @param storeName the name of the state store created from this operation; valid characters are ASCII alphanumerics, '.', '_' and '-' * @return a windowed {@link KTable} that contains "update" records with unmodified keys and values that represent * the latest (rolling) aggregate for each key within a window */ @@ -459,6 +463,7 @@ KTable, V> reduce(final Reducer reducer, * query the value of the key on a parallel running instance of your Kafka Streams application. *

* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'. * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the @@ -466,7 +471,7 @@ KTable, V> reduce(final Reducer reducer, * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. * @param reducer the instance of {@link Reducer} * @param sessionWindows the specification of the aggregation {@link SessionWindows} - * @param storeName the name of the state store created from this operation + * @param storeName the name of the state store created from this operation; valid characters are ASCII alphanumerics, '.', '_' and '-' * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s * where each table contains records with unmodified keys and values * that represent the latest (rolling) aggregate for each key within that window @@ -509,6 +514,7 @@ KTable, V> reduce(final Reducer reducer, * query the value of the key on a parallel running instance of your Kafka Streams application. *

* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'. * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the @@ -562,6 +568,7 @@ KTable, V> reduce(final Reducer reducer, * query the value of the key on a parallel running instance of your Kafka Streams application. *

* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'. * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the @@ -572,7 +579,7 @@ KTable, V> reduce(final Reducer reducer, * @param aggregator an {@link Aggregator} that computes a new aggregate result * @param aggValueSerde aggregate value serdes for materializing the aggregated table, * if not specified the default serdes defined in the configs will be used - * @param storeName the name of the state store created from this operation + * @param storeName the name of the state store created from this operation; valid characters are ASCII alphanumerics, '.', '_' and '-' * @param the value type of the resulting {@link KTable} * @return a {@link KTable} that contains "update" records with unmodified keys and values that represent the latest * (rolling) aggregate for each key @@ -663,6 +670,7 @@ KTable aggregate(final Initializer initializer, * query the value of the key on a parallel running instance of your Kafka Streams application. *

* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'. * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is * user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the @@ -676,7 +684,7 @@ KTable aggregate(final Initializer initializer, * @param aggValueSerde aggregate value serdes for materializing the aggregated table, * if not specified the default serdes defined in the configs will be used * @param the value type of the resulting {@link KTable} - * @param storeName the name of the state store created from this operation + * @param storeName the name of the state store created from this operation; valid characters are ASCII alphanumerics, '.', '_' and '-' * @return a windowed {@link KTable} that contains "update" records with unmodified keys and values that represent * the latest (rolling) aggregate for each key within a window */ @@ -773,7 +781,7 @@ KTable, VR> aggregate(final Initializer i * @param aggValueSerde aggregate value serdes for materializing the aggregated table, * if not specified the default serdes defined in the configs will be used * @param the value type of the resulting {@link KTable} - * @param storeName the name of the state store created from this operation + * @param storeName the name of the state store created from this operation; valid characters are ASCII alphanumerics, '.', '_' and '-' * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s * where each table contains records with unmodified keys and values with type {@code T} * that represent the latest (rolling) aggregate for each key within that window diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java index c5875386c1ba9..79b3fa92b0345 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java @@ -41,10 +41,11 @@ public interface KGroupedTable { * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog" * will be automatically created in Kafka for failure recovery, where "applicationID" * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}. + * Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'. * * @param adder the instance of {@link Reducer} for addition * @param subtractor the instance of {@link Reducer} for subtraction - * @param storeName the name of the underlying {@link KTable} state store + * @param storeName the name of the underlying {@link KTable} state store; valid characters are ASCII alphanumerics, '.', '_' and '-' * @return a {@link KTable} with the same key and value types as this {@link KGroupedTable}, * containing aggregated values for each key */ @@ -73,13 +74,14 @@ KTable reduce(Reducer adder, * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog" * will be automatically created in Kafka for failure recovery, where "applicationID" * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}. + * Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'. * * @param initializer the instance of {@link Initializer} * @param adder the instance of {@link Aggregator} for addition * @param subtractor the instance of {@link Aggregator} for subtraction * @param aggValueSerde value serdes for materializing the aggregated table, * if not specified the default serdes defined in the configs will be used - * @param storeName the name of the underlying {@link KTable} state store + * @param storeName the name of the underlying {@link KTable} state store; valid characters are ASCII alphanumerics, '.', '_' and '-' * @param the value type of the aggregated {@link KTable} * @return a {@link KTable} with same key and aggregated value type {@code T}, * containing aggregated values for each key @@ -97,11 +99,12 @@ KTable aggregate(Initializer initializer, * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog" * will be automatically created in Kafka for failure recovery, where "applicationID" * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}. + * Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'. * * @param initializer the instance of {@link Initializer} * @param adder the instance of {@link Aggregator} for addition * @param subtractor the instance of {@link Aggregator} for subtraction - * @param storeName the name of the underlying {@link KTable} state store + * @param storeName the name of the underlying {@link KTable} state store; valid characters are ASCII alphanumerics, '.', '_' and '-' * @param the value type of the aggregated {@link KTable} * @return a {@link KTable} with same key and aggregated value type {@code T}, * containing aggregated values for each key @@ -136,8 +139,9 @@ KTable aggregate(Initializer initializer, * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog" * will be automatically created in Kafka for failure recovery, where "applicationID" * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}. + * Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'. * - * @param storeName the name of the underlying {@link KTable} state store + * @param storeName the name of the underlying {@link KTable} state store; valid characters are ASCII alphanumerics, '.', '_' and '-' * @return a {@link KTable} with same key and {@link Long} value type as this {@link KGroupedTable}, * containing the number of values for each key */ diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 29be3e15311e2..7f7fde0190016 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -173,9 +173,10 @@ public interface KTable { * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog" * will be automatically created in Kafka for failure recovery, where "applicationID" * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}. + * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'. * * @param topic the topic name - * @param storeName the state store name used for this KTable + * @param storeName the state store name used for this KTable; valid characters are ASCII alphanumerics, '.', '_' and '-' * @return a new {@link KTable} that contains the exact same records as this {@link KTable} */ KTable through(String topic, String storeName); @@ -188,11 +189,12 @@ public interface KTable { * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog" * will be automatically created in Kafka for failure recovery, where "applicationID" * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}. + * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'. * * @param partitioner the function used to determine how records are distributed among partitions of the topic, * if not specified producer's {@link DefaultPartitioner} will be used * @param topic the topic name - * @param storeName the state store name used for this KTable + * @param storeName the state store name used for this KTable; valid characters are ASCII alphanumerics, '.', '_' and '-' * @return a new {@link KTable} that contains the exact same records as this {@link KTable} */ KTable through(StreamPartitioner partitioner, String topic, String storeName); @@ -208,13 +210,14 @@ public interface KTable { * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog" * will be automatically created in Kafka for failure recovery, where "applicationID" * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}. + * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'. * * @param keySerde key serde used to send key-value pairs, * if not specified the default key serde defined in the configuration will be used * @param valSerde value serde used to send key-value pairs, * if not specified the default value serde defined in the configuration will be used * @param topic the topic name - * @param storeName the state store name used for this KTable + * @param storeName the state store name used for this KTable; valid characters are ASCII alphanumerics, '.', '_' and '-' * @return a new {@link KTable} that contains the exact same records as this {@link KTable} */ KTable through(Serde keySerde, Serde valSerde, String topic, String storeName); @@ -228,6 +231,7 @@ public interface KTable { * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog" * will be automatically created in Kafka for failure recovery, where "applicationID" * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}. + * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'. * * @param keySerde key serde used to send key-value pairs, * if not specified the default key serde defined in the configuration will be used @@ -238,7 +242,7 @@ public interface KTable { * {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used * — otherwise {@link DefaultPartitioner} will be used * @param topic the topic name - * @param storeName the state store name used for this KTable + * @param storeName the state store name used for this KTable; valid characters are ASCII alphanumerics, '.', '_' and '-' * @return a new {@link KTable} that contains the exact same records as this {@link KTable} */ KTable through(Serde keySerde, Serde valSerde, StreamPartitioner partitioner, String topic, String storeName); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java index 31a3dc63b3416..749205abb36f4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.common.Topic; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.ValueJoiner; @@ -77,6 +78,7 @@ public static StateStoreSupplier keyValueStore(final Serd final Serde aggValueSerde, final String storeName) { Objects.requireNonNull(storeName, "storeName can't be null"); + Topic.validate(storeName); return storeFactory(keySerde, aggValueSerde, storeName).build(); } @@ -86,6 +88,7 @@ public static StateStoreSupplier windowed final Windows windows, final String storeName) { Objects.requireNonNull(storeName, "storeName can't be null"); + Topic.validate(storeName); return storeFactory(keySerde, aggValSerde, storeName) .windowed(windows.size(), windows.maintainMs(), windows.segments, false) .build(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java index a4fc7932202ba..8e09d57a19345 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java @@ -14,6 +14,7 @@ */ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.common.Topic; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.kstream.Aggregator; @@ -60,6 +61,8 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedStre @Override public KTable reduce(final Reducer reducer, final String storeName) { + Objects.requireNonNull(storeName, "storeName can't be null"); + Topic.validate(storeName); return reduce(reducer, keyValueStore(keySerde, valSerde, storeName)); } @@ -80,6 +83,8 @@ public KTable reduce(final Reducer reducer, public KTable, V> reduce(final Reducer reducer, final Windows windows, final String storeName) { + Objects.requireNonNull(storeName, "storeName can't be null"); + Topic.validate(storeName); return reduce(reducer, windows, windowedStore(keySerde, valSerde, windows, storeName)); } @@ -103,6 +108,8 @@ public KTable aggregate(final Initializer initializer, final Aggregator aggregator, final Serde aggValueSerde, final String storeName) { + Objects.requireNonNull(storeName, "storeName can't be null"); + Topic.validate(storeName); return aggregate(initializer, aggregator, keyValueStore(keySerde, aggValueSerde, storeName)); } @@ -126,6 +133,8 @@ public KTable, T> aggregate(final Initializer< final Windows windows, final Serde aggValueSerde, final String storeName) { + Objects.requireNonNull(storeName, "storeName can't be null"); + Topic.validate(storeName); return aggregate(initializer, aggregator, windows, windowedStore(keySerde, aggValueSerde, windows, storeName)); } @@ -148,6 +157,8 @@ public KTable, T> aggregate(final Initializer< @Override public KTable count(final String storeName) { + Objects.requireNonNull(storeName, "storeName can't be null"); + Topic.validate(storeName); return count(keyValueStore(keySerde, Serdes.Long(), storeName)); } @@ -169,6 +180,8 @@ public Long apply(K aggKey, V value, Long aggregate) { @Override public KTable, Long> count(final Windows windows, final String storeName) { + Objects.requireNonNull(storeName, "storeName can't be null"); + Topic.validate(storeName); return count(windows, windowedStore(keySerde, Serdes.Long(), windows, storeName)); } @@ -200,6 +213,7 @@ public KTable, T> aggregate(final Initializer initializer, final Serde aggValueSerde, final String storeName) { Objects.requireNonNull(storeName, "storeName can't be null"); + Topic.validate(storeName); return aggregate(initializer, aggregator, sessionMerger, @@ -235,6 +249,7 @@ public KTable, T> aggregate(final Initializer initializer, @SuppressWarnings("unchecked") public KTable, Long> count(final SessionWindows sessionWindows, final String storeName) { Objects.requireNonNull(storeName, "storeName can't be null"); + Topic.validate(storeName); return count(sessionWindows, storeFactory(keySerde, Serdes.Long(), storeName) .sessionWindowed(sessionWindows.maintainMs()).build()); @@ -276,6 +291,7 @@ public KTable, V> reduce(final Reducer reducer, final String storeName) { Objects.requireNonNull(storeName, "storeName can't be null"); + Topic.validate(storeName); return reduce(reducer, sessionWindows, storeFactory(keySerde, valSerde, storeName) .sessionWindowed(sessionWindows.maintainMs()).build()); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java index 4ca69d6972c04..ccff121d2acbf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.common.Topic; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; @@ -65,6 +66,8 @@ public KTable aggregate(Initializer initializer, Aggregator subtractor, Serde aggValueSerde, String storeName) { + Objects.requireNonNull(storeName, "storeName can't be null"); + Topic.validate(storeName); return aggregate(initializer, adder, subtractor, keyValueStore(keySerde, aggValueSerde, storeName)); } @@ -74,6 +77,8 @@ public KTable aggregate(Initializer initializer, Aggregator adder, Aggregator subtractor, String storeName) { + Objects.requireNonNull(storeName, "storeName can't be null"); + Topic.validate(storeName); return aggregate(initializer, adder, subtractor, null, storeName); } @@ -126,6 +131,8 @@ private KTable doAggregate(ProcessorSupplier> aggregateSu public KTable reduce(Reducer adder, Reducer subtractor, String storeName) { + Objects.requireNonNull(storeName, "storeName can't be null"); + Topic.validate(storeName); return reduce(adder, subtractor, keyValueStore(keySerde, valSerde, storeName)); } @@ -142,6 +149,8 @@ public KTable reduce(Reducer adder, @Override public KTable count(String storeName) { + Objects.requireNonNull(storeName, "storeName can't be null"); + Topic.validate(storeName); return count(keyValueStore(keySerde, Serdes.Long(), storeName)); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java index 3127397d2dad6..ce86db32ae0f6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; -import kafka.common.Topic; +import org.apache.kafka.common.Topic; import java.util.Map; import java.util.Objects; @@ -39,6 +39,7 @@ public enum CleanupPolicy { compact, delete } public InternalTopicConfig(final String name, final Set defaultCleanupPolicies, final Map logConfig) { Objects.requireNonNull(name, "name can't be null"); Topic.validate(name); + if (defaultCleanupPolicies.isEmpty()) { throw new IllegalArgumentException("Must provide at least one cleanup policy"); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java index 82e8c6cf09925..2e04390c8b37c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.ForeachAction; @@ -68,6 +69,12 @@ public void shouldNotHaveNullStoreNameOnReduce() throws Exception { groupedStream.reduce(MockReducer.STRING_ADDER, storeName); } + @Test(expected = InvalidTopicException.class) + public void shouldNotHaveInvalidStoreNameOnReduce() throws Exception { + String storeName = "~foo bar~"; + groupedStream.reduce(MockReducer.STRING_ADDER, storeName); + } + @Test(expected = NullPointerException.class) public void shouldNotHaveNullStoreSupplierOnReduce() throws Exception { StateStoreSupplier storeSupplier = null; @@ -90,6 +97,12 @@ public void shouldNotHaveNullStoreNameWithWindowedReduce() throws Exception { groupedStream.reduce(MockReducer.STRING_ADDER, TimeWindows.of(10), storeName); } + @Test(expected = InvalidTopicException.class) + public void shouldNotHaveInvalidStoreNameWithWindowedReduce() throws Exception { + String storeName = "~foo bar~"; + groupedStream.reduce(MockReducer.STRING_ADDER, TimeWindows.of(10), storeName); + } + @Test(expected = NullPointerException.class) public void shouldNotHaveNullInitializerOnAggregate() throws Exception { groupedStream.aggregate(null, MockAggregator.STRING_ADDER, Serdes.String(), "store"); @@ -106,6 +119,12 @@ public void shouldNotHaveNullStoreNameOnAggregate() throws Exception { groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, Serdes.String(), storeName); } + @Test(expected = InvalidTopicException.class) + public void shouldNotHaveInvalidStoreNameOnAggregate() throws Exception { + String storeName = "~foo bar~"; + groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, Serdes.String(), storeName); + } + @Test(expected = NullPointerException.class) public void shouldNotHaveNullInitializerOnWindowedAggregate() throws Exception { groupedStream.aggregate(null, MockAggregator.STRING_ADDER, TimeWindows.of(10), Serdes.String(), "store"); @@ -127,6 +146,12 @@ public void shouldNotHaveNullStoreNameOnWindowedAggregate() throws Exception { groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, TimeWindows.of(10), Serdes.String(), storeName); } + @Test(expected = InvalidTopicException.class) + public void shouldNotHaveInvalidStoreNameOnWindowedAggregate() throws Exception { + String storeName = "~foo bar~"; + groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, TimeWindows.of(10), Serdes.String(), storeName); + } + @Test(expected = NullPointerException.class) public void shouldNotHaveNullStoreSupplierOnWindowedAggregate() throws Exception { StateStoreSupplier storeSupplier = null; @@ -258,8 +283,14 @@ public void shouldNotAcceptNullStoreNameWhenReducingSessionWindows() throws Exce groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10), (String) null); } + @Test(expected = InvalidTopicException.class) + public void shouldNotAcceptInvalidStoreNameWhenReducingSessionWindows() throws Exception { + String storeName = "~foo bar~"; + groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10), storeName); + } + @Test(expected = NullPointerException.class) - public void shouldNotAcceptNullStateStoreSupplierNameWhenReducingSessionWindows() throws Exception { + public void shouldNotAcceptNullStateStoreSupplierWhenReducingSessionWindows() throws Exception { groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10), (StateStoreSupplier) null); } @@ -313,8 +344,19 @@ public String apply(final String aggKey, final String aggOne, final String aggTw }, SessionWindows.with(10), Serdes.String(), (String) null); } + @Test(expected = InvalidTopicException.class) + public void shouldNotAcceptInvalidStoreNameWhenAggregatingSessionWindows() throws Exception { + String storeName = "~foo bar~"; + groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, new Merger() { + @Override + public String apply(final String aggKey, final String aggOne, final String aggTwo) { + return null; + } + }, SessionWindows.with(10), Serdes.String(), storeName); + } + @Test(expected = NullPointerException.class) - public void shouldNotAcceptNullStateStoreSupplierNameWhenAggregatingSessionWindows() throws Exception { + public void shouldNotAcceptNullStateStoreSupplierWhenAggregatingSessionWindows() throws Exception { groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, new Merger() { @Override public String apply(final String aggKey, final String aggOne, final String aggTwo) { @@ -333,8 +375,14 @@ public void shouldNotAcceptNullStoreNameWhenCountingSessionWindows() throws Exce groupedStream.count(SessionWindows.with(90), (String) null); } + @Test(expected = InvalidTopicException.class) + public void shouldNotAcceptInvalidStoreNameWhenCountingSessionWindows() throws Exception { + String storeName = "~foo bar~"; + groupedStream.count(SessionWindows.with(90), storeName); + } + @Test(expected = NullPointerException.class) - public void shouldNotAcceptNullStoreStoreSupplierNameWhenCountingSessionWindows() throws Exception { + public void shouldNotAcceptNullStoreStoreSupplierWhenCountingSessionWindows() throws Exception { groupedStream.count(SessionWindows.with(90), (StateStoreSupplier) null); } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java index 85e2073a541f3..1180529759695 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java @@ -17,12 +17,14 @@ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.KGroupedTable; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.processor.StateStoreSupplier; +import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockAggregator; import org.apache.kafka.test.MockInitializer; @@ -55,6 +57,12 @@ public void shouldNotAllowNullStoreNameOnAggregate() throws Exception { groupedTable.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, storeName); } + @Test(expected = InvalidTopicException.class) + public void shouldNotAllowInvalidStoreNameOnAggregate() throws Exception { + String storeName = "~foo bar~"; + groupedTable.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, storeName); + } + @Test(expected = NullPointerException.class) public void shouldNotAllowNullInitializerOnAggregate() throws Exception { groupedTable.aggregate(null, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, "store"); @@ -86,10 +94,15 @@ public void shouldNotAllowNullStoreNameOnReduce() throws Exception { groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, storeName); } + @Test(expected = InvalidTopicException.class) + public void shouldNotAllowInvalidStoreNameOnReduce() throws Exception { + String storeName = "~foo bar~"; + groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, storeName); + } + @Test(expected = NullPointerException.class) public void shouldNotAllowNullStoreSupplierOnReduce() throws Exception { - StateStoreSupplier storeName = null; - groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, storeName); + groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, (StateStoreSupplier) null); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java index b0a198bd00844..05ad4cfc7c24b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.utils.Utils; import org.junit.Test; @@ -46,6 +47,11 @@ public void shouldThrowIfNameIsNull() throws Exception { new InternalTopicConfig(null, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), Collections.emptyMap()); } + @Test(expected = InvalidTopicException.class) + public void shouldThrowIfNameIsInvalid() throws Exception { + new InternalTopicConfig("foo bar baz", Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), Collections.emptyMap()); + } + @Test public void shouldConfigureRetentionMsWithAdditionalRetentionWhenCompactAndDelete() throws Exception { final InternalTopicConfig topicConfig = new InternalTopicConfig("name",