Skip to content

Commit

Permalink
KAFKA-3018: added topic name validator to ProducerRecord
Browse files Browse the repository at this point in the history
  • Loading branch information
Chi Hoang committed Feb 24, 2016
1 parent d142f82 commit de2e599
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
*/
package org.apache.kafka.clients.producer;

import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.utils.TopicValidator;

/**
* A key/value pair to be sent to Kafka. This consists of a topic name to which the record is being sent, an optional
* partition number, and an optional key and value.
Expand Down Expand Up @@ -56,6 +59,8 @@ public final class ProducerRecord<K, V> {
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
if (topic == null)
throw new IllegalArgumentException("Topic cannot be null");
if (!TopicValidator.isValidateTopicName(topic))
throw new InvalidTopicException(topic + " is not a valid topic name.");
if (timestamp != null && timestamp < 0)
throw new IllegalArgumentException("Invalid timestamp " + timestamp);
this.topic = topic;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.utils;

import java.util.regex.Pattern;

/**
* Validate topics. Consider consolidating the logic with core/Topic.scala.
*/
public class TopicValidator {
private static final Pattern VALID_PATTERN;

static {
VALID_PATTERN = Pattern.compile("[a-zA-Z0-9\\._\\-]");
}

public static boolean isValidateTopicName(final String topicName) {
String leftOvers = VALID_PATTERN.matcher(topicName).replaceAll("");

return leftOvers.length() == 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.producer;

import org.apache.kafka.common.errors.InvalidTopicException;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
Expand All @@ -25,28 +26,34 @@ public class ProducerRecordTest {

@Test
public void testEqualsAndHashCode() {
ProducerRecord<String, Integer> producerRecord = new ProducerRecord<String, Integer>("test", 1 , "key", 1);
ProducerRecord<String, Integer> producerRecord = new ProducerRecord<String, Integer>("test", 1, "key", 1);
assertEquals(producerRecord, producerRecord);
assertEquals(producerRecord.hashCode(), producerRecord.hashCode());

ProducerRecord<String, Integer> equalRecord = new ProducerRecord<String, Integer>("test", 1 , "key", 1);
ProducerRecord<String, Integer> equalRecord = new ProducerRecord<String, Integer>("test", 1, "key", 1);
assertEquals(producerRecord, equalRecord);
assertEquals(producerRecord.hashCode(), equalRecord.hashCode());

ProducerRecord<String, Integer> topicMisMatch = new ProducerRecord<String, Integer>("test-1", 1 , "key", 1);
ProducerRecord<String, Integer> topicMisMatch = new ProducerRecord<String, Integer>("test-1", 1, "key", 1);
assertFalse(producerRecord.equals(topicMisMatch));

ProducerRecord<String, Integer> partitionMismatch = new ProducerRecord<String, Integer>("test", 2 , "key", 1);
ProducerRecord<String, Integer> partitionMismatch = new ProducerRecord<String, Integer>("test", 2, "key", 1);
assertFalse(producerRecord.equals(partitionMismatch));

ProducerRecord<String, Integer> keyMisMatch = new ProducerRecord<String, Integer>("test", 1 , "key-1", 1);
ProducerRecord<String, Integer> keyMisMatch = new ProducerRecord<String, Integer>("test", 1, "key-1", 1);
assertFalse(producerRecord.equals(keyMisMatch));

ProducerRecord<String, Integer> valueMisMatch = new ProducerRecord<String, Integer>("test", 1 , "key", 2);
ProducerRecord<String, Integer> valueMisMatch = new ProducerRecord<String, Integer>("test", 1, "key", 2);
assertFalse(producerRecord.equals(valueMisMatch));

ProducerRecord<String, Integer> nullFieldsRecord = new ProducerRecord<String, Integer>("topic", null, null, null, null);
assertEquals(nullFieldsRecord, nullFieldsRecord);
assertEquals(nullFieldsRecord.hashCode(), nullFieldsRecord.hashCode());
}

@Test(expected = InvalidTopicException.class)
public void testInvalidTopicName() {
String topicName = "\"this/name/is/invalid\"";
ProducerRecord<String, Integer> record = new ProducerRecord<String, Integer>(topicName, "key", 1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.utils;

import org.junit.Assert;
import org.junit.Test;

/**
* Tests for TopicValidator
*/
public class TopicValidatorTest {

@Test
public void testIsValidateTopicName() throws Exception {
Assert.assertEquals(true, TopicValidator.isValidateTopicName("helloworld"));
Assert.assertEquals(true, TopicValidator.isValidateTopicName("hello-world"));
Assert.assertEquals(true, TopicValidator.isValidateTopicName("hello_world"));

Assert.assertEquals(false, TopicValidator.isValidateTopicName("hello world"));
Assert.assertEquals(false, TopicValidator.isValidateTopicName("hello$world"));
Assert.assertEquals(false, TopicValidator.isValidateTopicName("hello*world"));
Assert.assertEquals(false, TopicValidator.isValidateTopicName("hello/world"));
Assert.assertEquals(false, TopicValidator.isValidateTopicName("hello\\world"));
Assert.assertEquals(false, TopicValidator.isValidateTopicName("\"hello-world\""));
}
}

0 comments on commit de2e599

Please sign in to comment.