Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions clients/src/main/java/org/apache/kafka/common/Topic.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class Topic {

    /** Regex matching any character that is NOT allowed in a topic name. */
    private static final String INVALID_CHARS = "[^a-zA-Z0-9._\\-]";
    /** Maximum legal length of a topic name, in characters. */
    private static final int MAX_NAME_LENGTH = 249;
    // Compile the pattern once: Pattern.compile is comparatively expensive and the
    // pattern never changes, so per-call compilation in containsInvalidCharacters
    // was wasted work. Pattern is immutable and thread-safe.
    private static final Pattern INVALID_CHARS_PATTERN = Pattern.compile(INVALID_CHARS);

    /**
     * Validates a topic name, throwing if it is illegal.
     * A legal name is non-empty, not "." or "..", at most {@code MAX_NAME_LENGTH}
     * characters, and contains only ASCII alphanumerics, '.', '_' and '-'.
     *
     * @param topic the candidate topic name
     * @throws org.apache.kafka.common.errors.InvalidTopicException if the name violates any rule
     */
    public static void validate(String topic) {
        if (isEmpty(topic))
            throw new org.apache.kafka.common.errors.InvalidTopicException("topic name is illegal, can't be empty");
        else if (containsOnlyPeriods(topic))
            throw new org.apache.kafka.common.errors.InvalidTopicException("topic name cannot be \".\" or \"..\"");
        else if (exceedsMaxLength(topic))
            throw new org.apache.kafka.common.errors.InvalidTopicException("topic name is illegal, can't be longer than " + MAX_NAME_LENGTH + " characters");
        else if (containsInvalidCharacters(topic))
            throw new org.apache.kafka.common.errors.InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'");
    }

    /**
     * @param topic the topic name to check
     * @return true if the name has zero length
     */
    public static boolean isEmpty(String topic) {
        return topic.isEmpty();
    }

    /**
     * @param topic the topic name to check
     * @return true if the name is exactly "." or ".." (reserved by the filesystem-backed log)
     */
    public static boolean containsOnlyPeriods(String topic) {
        return topic.equals(".") || topic.equals("..");
    }

    /**
     * @param topic the topic name to check
     * @return true if the name is longer than {@code MAX_NAME_LENGTH} characters
     */
    public static boolean exceedsMaxLength(String topic) {
        return topic.length() > MAX_NAME_LENGTH;
    }

    /**
     * Valid characters for Kafka topics are the ASCII alphanumerics, '.', '_', and '-'
     *
     * @param topic the topic name to check
     * @return true if the name contains at least one disallowed character
     */
    public static boolean containsInvalidCharacters(String topic) {
        return INVALID_CHARS_PATTERN.matcher(topic).find();
    }

}
59 changes: 59 additions & 0 deletions clients/src/test/java/org/apache/kafka/common/TopicTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common;

import org.junit.Test;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class TopicTest {

    /** Names built only from ASCII alphanumerics, '.', '_' and '-' must pass every check. */
    @Test
    public void shouldRecognizeValidTopicNames() {
        String[] validTopicNames = {"valid", "TOPIC", "nAmEs", "ar6", "VaL1d", "_0-9_."};

        for (String topicName : validTopicNames) {
            assertFalse(Topic.isEmpty(topicName));
            assertFalse(Topic.containsOnlyPeriods(topicName));
            assertFalse(Topic.exceedsMaxLength(topicName));
            assertFalse(Topic.containsInvalidCharacters(topicName));
        }
    }

    @Test
    public void shouldRecognizeEmptyTopicNames() {
        assertTrue(Topic.isEmpty(""));
    }

    /** "." and ".." are reserved names and must be flagged. */
    @Test
    public void shouldRecognizeTopicNamesContainingOnlyPeriods() {
        assertTrue(Topic.containsOnlyPeriods("."));
        assertTrue(Topic.containsOnlyPeriods(".."));
    }

    @Test
    public void shouldRecognizeTopicNamesThatExceedMaxLength() {
        // Build a 256-character name (64 x "ATCG"), just over the 249-character limit.
        // StringBuilder avoids the quadratic cost of String += in a loop.
        StringBuilder longName = new StringBuilder(256);
        for (int i = 0; i < 64; i++) {
            longName.append("ATCG");
        }

        assertTrue(Topic.exceedsMaxLength(longName.toString()));
    }

    /** Each forbidden character must be detected even when surrounded by legal ones. */
    @Test
    public void shouldRecognizeInvalidCharactersInTopicNames() {
        Character[] invalidChars = {'/', '\\', ',', '\u0000', ':', '"', '\'', ';', '*', '?', ' ', '\t', '\r', '\n', '='};

        for (Character c : invalidChars) {
            String topicName = "Is " + c + "illegal";
            assertTrue(Topic.containsInvalidCharacters(topicName));
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,14 @@ public interface KGroupedStream<K, V> {
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
* The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
* user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
* {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
* provided {@code storeName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
* @param storeName the name of the underlying {@link KTable} state store
* @param storeName the name of the underlying {@link KTable} state store; valid characters are ASCII alphanumerics, '.', '_' and '-'
* @return a {@link KTable} that contains "update" records with unmodified keys and values that represent the latest
* (rolling) count (i.e., number of records) for each key
*/
Expand Down Expand Up @@ -133,14 +134,15 @@ public interface KGroupedStream<K, V> {
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
* The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
* user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
* {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
* provided {@code storeName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
* @param windows the specification of the aggregation {@link Windows}
* @param storeName the name of the underlying {@link KTable} state store
* @param storeName the name of the underlying {@link KTable} state store; valid characters are ASCII alphanumerics, '.', '_' and '-'
* @return a windowed {@link KTable} that contains "update" records with unmodified keys and values that represent
* the latest (rolling) count (i.e., number of records) for each key within a window
*/
Expand Down Expand Up @@ -205,7 +207,7 @@ <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
*
*
* @param sessionWindows the specification of the aggregation {@link SessionWindows}
* @param storeName the name of the state store created from this operation.
* @param storeName the name of the state store created from this operation; valid characters are ASCII alphanumerics, '.', '_' and '-'
* @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
* where each table contains records with unmodified keys and values
* that represent the latest (rolling) count (i.e., number of records) for each key within that window
Expand Down Expand Up @@ -277,14 +279,15 @@ KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows,
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
* The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
* user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
* {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
* provided {@code storeName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
*
* @param reducer a {@link Reducer} that computes a new aggregate result
* @param storeName the name of the underlying {@link KTable} state store
* @param storeName the name of the underlying {@link KTable} state store; valid characters are ASCII alphanumerics, '.', '_' and '-'
* @return a {@link KTable} that contains "update" records with unmodified keys and values that represent the latest
* (rolling) aggregate for each key
*/
Expand Down Expand Up @@ -365,6 +368,7 @@ KTable<K, V> reduce(final Reducer<V> reducer,
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
* The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
* user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
* {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
Expand All @@ -373,7 +377,7 @@ KTable<K, V> reduce(final Reducer<V> reducer,
*
* @param reducer a {@link Reducer} that computes a new aggregate result
* @param windows the specification of the aggregation {@link Windows}
* @param storeName the name of the state store created from this operation
* @param storeName the name of the state store created from this operation; valid characters are ASCII alphanumerics, '.', '_' and '-'
* @return a windowed {@link KTable} that contains "update" records with unmodified keys and values that represent
* the latest (rolling) aggregate for each key within a window
*/
Expand Down Expand Up @@ -459,14 +463,15 @@ <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
* The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
* user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
* {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
* provided {@code storeName}, and "-changelog" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
* @param reducer the instance of {@link Reducer}
* @param sessionWindows the specification of the aggregation {@link SessionWindows}
* @param storeName the name of the state store created from this operation
* @param storeName the name of the state store created from this operation; valid characters are ASCII alphanumerics, '.', '_' and '-'
* @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
* where each table contains records with unmodified keys and values
* that represent the latest (rolling) aggregate for each key within that window
Expand Down Expand Up @@ -509,6 +514,7 @@ KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
* The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
* user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
* {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
Expand Down Expand Up @@ -562,6 +568,7 @@ KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
* The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
* user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
* {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
Expand All @@ -572,7 +579,7 @@ KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
* @param aggregator an {@link Aggregator} that computes a new aggregate result
* @param aggValueSerde aggregate value serdes for materializing the aggregated table,
* if not specified the default serdes defined in the configs will be used
* @param storeName the name of the state store created from this operation
* @param storeName the name of the state store created from this operation; valid characters are ASCII alphanumerics, '.', '_' and '-'
* @param <VR> the value type of the resulting {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys and values that represent the latest
* (rolling) aggregate for each key
Expand Down Expand Up @@ -663,6 +670,7 @@ <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
* query the value of the key on a parallel running instance of your Kafka Streams application.
* <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
* Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
* The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
* user-specified in {@link org.apache.kafka.streams.StreamsConfig StreamsConfig} via parameter
* {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
Expand All @@ -676,7 +684,7 @@ <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
* @param aggValueSerde aggregate value serdes for materializing the aggregated table,
* if not specified the default serdes defined in the configs will be used
* @param <VR> the value type of the resulting {@link KTable}
* @param storeName the name of the state store created from this operation
* @param storeName the name of the state store created from this operation; valid characters are ASCII alphanumerics, '.', '_' and '-'
* @return a windowed {@link KTable} that contains "update" records with unmodified keys and values that represent
* the latest (rolling) aggregate for each key within a window
*/
Expand Down Expand Up @@ -773,7 +781,7 @@ <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> i
* @param aggValueSerde aggregate value serdes for materializing the aggregated table,
* if not specified the default serdes defined in the configs will be used
* @param <T> the value type of the resulting {@link KTable}
* @param storeName the name of the state store created from this operation
* @param storeName the name of the state store created from this operation; valid characters are ASCII alphanumerics, '.', '_' and '-'
* @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s
* where each table contains records with unmodified keys and values with type {@code T}
* that represent the latest (rolling) aggregate for each key within that window
Expand Down
Loading