From 97689f6b66e74e4b041b6a71714e353b11ce0826 Mon Sep 17 00:00:00 2001 From: dengziming Date: Thu, 17 Mar 2022 16:25:45 +0800 Subject: [PATCH] KAFKA-13743: Prevent topics with conflicting metrics names from being created in KRaft mode #11910 In ZK mode, the topic "foo_bar" conflicts with "foo.bar" because periods are replaced with underscores in metric names, which would make the two topics' metrics ambiguous. This PR implements the same collision check in KRaft mode. It also changes TopicCommandIntegrationTest to support KRaft mode. Reviewers: Colin P. McCabe --- .../apache/kafka/common/internals/Topic.java | 13 +- .../kafka/common/internals/TopicTest.java | 9 + .../admin/TopicCommandIntegrationTest.scala | 306 +++++++++++------- .../controller/ReplicationControlManager.java | 51 ++- .../ReplicationControlManagerTest.java | 21 +- 5 files changed, 278 insertions(+), 122 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java index 7a5fefb3d9f2..3c93ef87b5c9 100644 --- a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java +++ b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java @@ -67,6 +67,17 @@ public static boolean hasCollisionChars(String topic) { return topic.contains("_") || topic.contains("."); } + /** + * Unify a topic name by replacing each period ('.') with an underscore ('_'). This is only used to check for + * collisions and does not actually rename the topic. + * + * @param topic A topic to unify + * @return A unified topic name + */ + public static String unifyCollisionChars(String topic) { + return topic.replace('.', '_'); + } + /** * Returns true if the topicNames collide due to a period ('.') or underscore ('_') in the same position. * @@ -75,7 +86,7 @@ public static boolean hasCollisionChars(String topic) { * @return true if the topics collide */ public static boolean hasCollision(String topicA, String topicB) { - return topicA.replace('.', '_').equals(topicB.replace('.', '_')); + return unifyCollisionChars(topicA).equals(unifyCollisionChars(topicB)); } /** diff --git a/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java b/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java index 9bf237fb1b31..03c0811fa450 100644 --- a/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java +++ b/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.List; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -81,6 +82,14 @@ public void testTopicHasCollisionChars() { assertTrue(Topic.hasCollisionChars(topic)); } + @Test + public void testUnifyCollisionChars() { + assertEquals("topic", Topic.unifyCollisionChars("topic")); + assertEquals("_topic", Topic.unifyCollisionChars(".topic")); + assertEquals("_topic", Topic.unifyCollisionChars("_topic")); + assertEquals("__topic", Topic.unifyCollisionChars("_.topic")); + } + @Test public void testTopicHasCollision() { List<String> periodFirstMiddleLastNone = Arrays.asList(".topic", "to.pic", "topic.", "topic"); diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala index 9a1fe378f6cf..ee7e64957ebb 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala +++ 
b/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala @@ -17,23 +17,24 @@ package kafka.admin import java.util.{Collection, Collections, Optional, Properties} - import kafka.admin.TopicCommand.{TopicCommandOptions, TopicService} import kafka.integration.KafkaServerTestHarness -import kafka.server.{ConfigType, KafkaConfig} -import kafka.utils.{Logging, TestUtils} +import kafka.server.KafkaConfig +import kafka.utils.{Logging, TestInfoUtils, TestUtils} import kafka.zk.{ConfigEntityChangeNotificationZNode, DeleteTopicsTopicZNode} import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.admin._ import org.apache.kafka.common.config.{ConfigException, ConfigResource, TopicConfig} -import org.apache.kafka.common.errors.{ClusterAuthorizationException, ThrottlingQuotaExceededException, TopicExistsException} +import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidTopicException, ThrottlingQuotaExceededException, TopicExistsException} import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.{Node, TopicPartition, TopicPartitionInfo} import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} +import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import org.mockito.ArgumentMatcher import org.mockito.ArgumentMatchers.{eq => eqThat, _} import org.mockito.Mockito._ @@ -54,7 +55,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi */ override def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs( numConfigs = 6, - zkConnect = zkConnect, + zkConnect = zkConnectOrNull, rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3"), numPartitions = numPartitions, defaultReplicationFactor = defaultReplicationFactor, @@ -76,7 +77,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi } private[this] def waitForTopicCreated(topicName: String, timeout: Int = 10000): Unit = { - TestUtils.waitForPartitionMetadata(servers, topicName, partition = 0, timeout) + TestUtils.waitForPartitionMetadata(brokers, topicName, partition = 0, timeout) } @BeforeEach @@ -98,16 +99,18 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi topicService.close() } - @Test - def testCreate(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreate(quorum: String): Unit = { createAndWaitTopic(new TopicCommandOptions( Array("--partitions", "2", "--replication-factor", "1", "--topic", testTopicName))) adminClient.listTopics().names().get().contains(testTopicName) } - @Test - def testCreateWithDefaults(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateWithDefaults(quorum: String): Unit = { createAndWaitTopic(new TopicCommandOptions(Array("--topic", testTopicName))) val partitions = adminClient @@ -120,8 +123,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi assertEquals(partitions.get(0).replicas().size(), defaultReplicationFactor) } - @Test - def 
testCreateWithDefaultReplication(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateWithDefaultReplication(quorum: String): Unit = { createAndWaitTopic(new TopicCommandOptions( Array("--topic", testTopicName, "--partitions", "2"))) @@ -135,8 +139,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi assertEquals(partitions.get(0).replicas().size(), defaultReplicationFactor) } - @Test - def testCreateWithDefaultPartitions(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateWithDefaultPartitions(quorum: String): Unit = { createAndWaitTopic(new TopicCommandOptions( Array("--topic", testTopicName, "--replication-factor", "2"))) @@ -151,8 +156,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi assertEquals(partitions.get(0).replicas().size(), 2) } - @Test - def testCreateWithConfigs(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateWithConfigs(quorum: String): Unit = { val configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName) createAndWaitTopic(new TopicCommandOptions( Array("--partitions", "2", "--replication-factor", "2", "--topic", testTopicName, "--config", "delete.retention.ms=1000"))) @@ -163,8 +169,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi assertEquals(1000, Integer.valueOf(configs.get("delete.retention.ms").value())) } - @Test - def testCreateWhenAlreadyExists(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateWhenAlreadyExists(quorum: String): Unit = { val numPartitions = 1 // create the topic @@ -176,15 +183,17 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi assertThrows(classOf[TopicExistsException], () => topicService.createTopic(createOpts)) } - @Test - def testCreateWhenAlreadyExistsWithIfNotExists(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateWhenAlreadyExistsWithIfNotExists(quorum: String): Unit = { val createOpts = new TopicCommandOptions(Array("--topic", testTopicName, "--if-not-exists")) createAndWaitTopic(createOpts) topicService.createTopic(createOpts) } - @Test - def testCreateWithReplicaAssignment(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateWithReplicaAssignment(quorum: String): Unit = { // create the topic val createOpts = new TopicCommandOptions( Array("--replica-assignment", "5:4,3:2,1:0", "--topic", testTopicName)) @@ -202,37 +211,42 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi assertEquals(List(1, 0), partitions.get(2).replicas().asScala.map(_.id())) } - @Test - def testCreateWithInvalidReplicationFactor(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateWithInvalidReplicationFactor(quorum: String): Unit = { assertThrows(classOf[IllegalArgumentException], () => topicService.createTopic(new TopicCommandOptions( Array("--partitions", "2", "--replication-factor", 
(Short.MaxValue+1).toString, "--topic", testTopicName)))) } - @Test - def testCreateWithNegativeReplicationFactor(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateWithNegativeReplicationFactor(quorum: String): Unit = { assertThrows(classOf[IllegalArgumentException], () => topicService.createTopic(new TopicCommandOptions( Array("--partitions", "2", "--replication-factor", "-1", "--topic", testTopicName)))) } - @Test - def testCreateWithNegativePartitionCount(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateWithNegativePartitionCount(quorum: String): Unit = { assertThrows(classOf[IllegalArgumentException], () => topicService.createTopic(new TopicCommandOptions( Array("--partitions", "-1", "--replication-factor", "1", "--topic", testTopicName)))) } - @Test - def testInvalidTopicLevelConfig(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testInvalidTopicLevelConfig(quorum: String): Unit = { val createOpts = new TopicCommandOptions( Array("--partitions", "1", "--replication-factor", "1", "--topic", testTopicName, "--config", "message.timestamp.type=boom")) assertThrows(classOf[ConfigException], () => topicService.createTopic(createOpts)) } - @Test - def testListTopics(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testListTopics(quorum: String): Unit = { createAndWaitTopic(new TopicCommandOptions( Array("--partitions", "1", "--replication-factor", "1", "--topic", testTopicName))) @@ -242,8 +256,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi assertTrue(output.contains(testTopicName)) } - @Test - def testListTopicsWithIncludeList(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testListTopicsWithIncludeList(quorum: String): Unit = { val topic1 = "kafka.testTopic1" val topic2 = "kafka.testTopic2" val topic3 = "oooof.testTopic1" @@ -264,8 +279,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi assertFalse(output.contains(topic3)) } - @Test - def testListTopicsWithExcludeInternal(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testListTopicsWithExcludeInternal(quorum: String): Unit = { val topic1 = "kafka.testTopic1" adminClient.createTopics( List(new NewTopic(topic1, 2, 2.toShort), @@ -280,8 +296,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME)) } - @Test - def testAlterPartitionCount(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testAlterPartitionCount(quorum: String): Unit = { adminClient.createTopics( List(new NewTopic(testTopicName, 2, 2.toShort)).asJavaCollection).all().get() waitForTopicCreated(testTopicName) @@ -289,12 +306,16 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi topicService.alterTopic(new TopicCommandOptions( Array("--topic", testTopicName, "--partitions", "3"))) + TestUtils.waitUntilTrue( + () => 
brokers.forall(_.metadataCache.getTopicPartitions(testTopicName).size == 3), + "Timeout waiting new assignment propagate to broker") val topicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get() assertTrue(topicDescription.partitions().size() == 3) } - @Test - def testAlterAssignment(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testAlterAssignment(quorum: String): Unit = { adminClient.createTopics( Collections.singletonList(new NewTopic(testTopicName, 2, 2.toShort))).all().get() waitForTopicCreated(testTopicName) @@ -307,8 +328,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi assertEquals(List(4,2), topicDescription.partitions().get(2).replicas().asScala.map(_.id())) } - @Test - def testAlterAssignmentWithMoreAssignmentThanPartitions(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testAlterAssignmentWithMoreAssignmentThanPartitions(quorum: String): Unit = { adminClient.createTopics( List(new NewTopic(testTopicName, 2, 2.toShort)).asJavaCollection).all().get() waitForTopicCreated(testTopicName) @@ -318,8 +340,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi Array("--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2,3:2", "--partitions", "3")))) } - @Test - def testAlterAssignmentWithMorePartitionsThanAssignment(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testAlterAssignmentWithMorePartitionsThanAssignment(quorum: String): Unit = { adminClient.createTopics( List(new NewTopic(testTopicName, 2, 2.toShort)).asJavaCollection).all().get() waitForTopicCreated(testTopicName) @@ -329,8 +352,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi Array("--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2", "--partitions", "6")))) } - @Test - def testAlterWithInvalidPartitionCount(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testAlterWithInvalidPartitionCount(quorum: String): Unit = { createAndWaitTopic(new TopicCommandOptions( Array("--partitions", "1", "--replication-factor", "1", "--topic", testTopicName))) @@ -339,22 +363,25 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi Array("--partitions", "-1", "--topic", testTopicName)))) } - @Test - def testAlterWhenTopicDoesntExist(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testAlterWhenTopicDoesntExist(quorum: String): Unit = { // alter a topic that does not exist without --if-exists val alterOpts = new TopicCommandOptions(Array("--topic", testTopicName, "--partitions", "1")) val topicService = TopicService(adminClient) assertThrows(classOf[IllegalArgumentException], () => topicService.alterTopic(alterOpts)) } - @Test - def testAlterWhenTopicDoesntExistWithIfExists(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testAlterWhenTopicDoesntExistWithIfExists(quorum: String): Unit = { topicService.alterTopic(new TopicCommandOptions( Array("--topic", testTopicName, 
"--partitions", "1", "--if-exists"))) } - @Test - def testCreateAlterTopicWithRackAware(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateAlterTopicWithRackAware(quorum: String): Unit = { val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3") val numPartitions = 18 @@ -365,9 +392,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi "--topic", testTopicName)) createAndWaitTopic(createOpts) - var assignment = zkClient.getReplicaAssignmentForTopics(Set(testTopicName)).map { case (tp, replicas) => - tp.partition -> replicas - } + var assignment = adminClient.describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames().get().get(testTopicName).partitions() + .asScala.map(info => info.partition() -> info.replicas().asScala.map(_.id())).toMap checkReplicaDistribution(assignment, rackInfo, rackInfo.size, numPartitions, replicationFactor) val alteredNumPartitions = 36 @@ -376,14 +403,19 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi "--partitions", alteredNumPartitions.toString, "--topic", testTopicName)) topicService.alterTopic(alterOpts) - assignment = zkClient.getReplicaAssignmentForTopics(Set(testTopicName)).map { case (tp, replicas) => - tp.partition -> replicas - } + + TestUtils.waitUntilTrue( + () => brokers.forall(_.metadataCache.getTopicPartitions(testTopicName).size == alteredNumPartitions), + "Timeout waiting new assignment propagate to broker") + assignment = adminClient.describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames().get().get(testTopicName).partitions() + .asScala.map(info => info.partition() -> info.replicas().asScala.map(_.id())).toMap checkReplicaDistribution(assignment, rackInfo, rackInfo.size, alteredNumPartitions, replicationFactor) } - @Test - def testConfigPreservationAcrossPartitionAlteration(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testConfigPreservationAcrossPartitionAlteration(quorum: String): Unit = { val numPartitionsOriginal = 1 val cleanupKey = "cleanup.policy" val cleanupVal = "compact" @@ -395,25 +427,30 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi "--config", cleanupKey + "=" + cleanupVal, "--topic", testTopicName)) createAndWaitTopic(createOpts) - val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, testTopicName) - assertTrue(props.containsKey(cleanupKey), "Properties after creation don't contain " + cleanupKey) - assertTrue(props.getProperty(cleanupKey).equals(cleanupVal), "Properties after creation have incorrect value") + val configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName) + val props = adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource) + // val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, testTopicName) + assertNotNull(props.get(cleanupKey), "Properties after creation don't contain " + cleanupKey) + assertEquals(cleanupVal, props.get(cleanupKey).value(), "Properties after creation have incorrect value") // pre-create the topic config changes path to avoid a NoNodeException - zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path) + if (!isKRaftTest()) { + zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path) + } // modify 
the topic to add new partitions val numPartitionsModified = 3 val alterOpts = new TopicCommandOptions( Array("--partitions", numPartitionsModified.toString, "--topic", testTopicName)) topicService.alterTopic(alterOpts) - val newProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, testTopicName) - assertTrue(newProps.containsKey(cleanupKey), "Updated properties do not contain " + cleanupKey) - assertTrue(newProps.getProperty(cleanupKey).equals(cleanupVal), "Updated properties have incorrect value") + val newProps = adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource) + assertNotNull(newProps.get(cleanupKey), "Updated properties do not contain " + cleanupKey) + assertEquals(cleanupVal, newProps.get(cleanupKey).value(), "Updated properties have incorrect value") } - @Test - def testTopicDeletion(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testTopicDeletion(quorum: String): Unit = { // create the NormalTopic val createOpts = new TopicCommandOptions(Array("--partitions", "1", "--replication-factor", "1", @@ -423,14 +460,17 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi // delete the NormalTopic val deleteOpts = new TopicCommandOptions(Array("--topic", testTopicName)) - val deletePath = DeleteTopicsTopicZNode.path(testTopicName) - assertFalse(zkClient.pathExists(deletePath), "Delete path for topic shouldn't exist before deletion.") + if (!isKRaftTest()) { + val deletePath = DeleteTopicsTopicZNode.path(testTopicName) + assertFalse(zkClient.pathExists(deletePath), "Delete path for topic shouldn't exist before deletion.") + } topicService.deleteTopic(deleteOpts) - TestUtils.verifyTopicDeletion(zkClient, testTopicName, 1, servers) + TestUtils.verifyTopicDeletion(zkClientOrNull, testTopicName, 1, brokers) } - @Test - def testDeleteInternalTopic(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDeleteInternalTopic(quorum: String): Unit = { // create the offset topic val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", "1", "--replication-factor", "1", @@ -443,25 +483,30 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi val deleteOffsetTopicOpts = new TopicCommandOptions( Array("--topic", Topic.GROUP_METADATA_TOPIC_NAME)) val deleteOffsetTopicPath = DeleteTopicsTopicZNode.path(Topic.GROUP_METADATA_TOPIC_NAME) - assertFalse(zkClient.pathExists(deleteOffsetTopicPath), "Delete path for topic shouldn't exist before deletion.") + if (!isKRaftTest()) { + assertFalse(zkClient.pathExists(deleteOffsetTopicPath), "Delete path for topic shouldn't exist before deletion.") + } topicService.deleteTopic(deleteOffsetTopicOpts) - TestUtils.verifyTopicDeletion(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, 1, servers) + TestUtils.verifyTopicDeletion(zkClientOrNull, Topic.GROUP_METADATA_TOPIC_NAME, 1, brokers) } - @Test - def testDeleteWhenTopicDoesntExist(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDeleteWhenTopicDoesntExist(quorum: String): Unit = { // delete a topic that does not exist val deleteOpts = new TopicCommandOptions(Array("--topic", testTopicName)) assertThrows(classOf[IllegalArgumentException], () => topicService.deleteTopic(deleteOpts)) } - @Test - def 
testDeleteWhenTopicDoesntExistWithIfExists(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDeleteWhenTopicDoesntExistWithIfExists(quorum: String): Unit = { topicService.deleteTopic(new TopicCommandOptions(Array("--topic", testTopicName, "--if-exists"))) } - @Test - def testDescribe(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDescribe(quorum: String): Unit = { adminClient.createTopics( Collections.singletonList(new NewTopic(testTopicName, 2, 2.toShort))).all().get() waitForTopicCreated(testTopicName) @@ -473,19 +518,22 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi assertTrue(rows(0).startsWith(s"Topic: $testTopicName")) } - @Test - def testDescribeWhenTopicDoesntExist(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDescribeWhenTopicDoesntExist(quorum: String): Unit = { assertThrows(classOf[IllegalArgumentException], () => topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName)))) } - @Test - def testDescribeWhenTopicDoesntExistWithIfExists(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDescribeWhenTopicDoesntExistWithIfExists(quorum: String): Unit = { topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName, "--if-exists"))) } - @Test - def testDescribeUnavailablePartitions(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDescribeUnavailablePartitions(quorum: String): Unit = { adminClient.createTopics( Collections.singletonList(new NewTopic(testTopicName, 6, 1.toShort))).all().get() waitForTopicCreated(testTopicName) @@ -500,7 +548,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi // wait until the topic metadata for the test topic is propagated to each alive broker TestUtils.waitUntilTrue(() => { - servers + brokers .filterNot(_.config.brokerId == 0) .foldLeft(true) { (result, server) => { @@ -527,15 +575,16 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi } } - @Test - def testDescribeUnderReplicatedPartitions(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDescribeUnderReplicatedPartitions(quorum: String): Unit = { adminClient.createTopics( Collections.singletonList(new NewTopic(testTopicName, 1, 6.toShort))).all().get() waitForTopicCreated(testTopicName) try { killBroker(0) - val aliveServers = servers.filterNot(_.config.brokerId == 0) + val aliveServers = brokers.filterNot(_.config.brokerId == 0) TestUtils.waitForPartitionMetadata(aliveServers, testTopicName, 0) val output = TestUtils.grabConsoleOutput( topicService.describeTopic(new TopicCommandOptions(Array("--under-replicated-partitions")))) @@ -546,8 +595,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi } } - @Test - def testDescribeUnderMinIsrPartitions(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDescribeUnderMinIsrPartitions(quorum: String): Unit = { val configMap = 
new java.util.HashMap[String, String]() configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6") @@ -557,7 +607,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi try { killBroker(0) - val aliveServers = servers.filterNot(_.config.brokerId == 0) + val aliveServers = brokers.filterNot(_.config.brokerId == 0) TestUtils.waitForPartitionMetadata(aliveServers, testTopicName, 0) val output = TestUtils.grabConsoleOutput( topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions")))) @@ -568,8 +618,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi } } - @Test - def testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(quorum: String): Unit = { val configMap = new java.util.HashMap[String, String]() val replicationFactor: Short = 1 val partitions = 1 @@ -580,12 +631,12 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi waitForTopicCreated(testTopicName) // Produce multiple batches. - TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 10, acks = -1) - TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 10, acks = -1) + TestUtils.generateAndProduceMessages(brokers, testTopicName, numMessages = 10, acks = -1) + TestUtils.generateAndProduceMessages(brokers, testTopicName, numMessages = 10, acks = -1) // Enable throttling. Note the broker config sets the replica max fetch bytes to `1` upon to minimize replication // throughput so the reassignment doesn't complete quickly. 
- val brokerIds = servers.map(_.config.brokerId) + val brokerIds = brokers.map(_.config.brokerId) TestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds, Set(tp), throttleBytes = 1) val testTopicDesc = adminClient.describeTopics(Collections.singleton(testTopicName)).allTopicNames().get().get(testTopicName) @@ -622,8 +673,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi TestUtils.waitForAllReassignmentsToComplete(adminClient) } - @Test - def testDescribeAtMinIsrPartitions(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDescribeAtMinIsrPartitions(quorum: String): Unit = { val configMap = new java.util.HashMap[String, String]() configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "4") @@ -653,8 +705,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi * * Output should only display the (1) topic with partition under min ISR count and (3) topic with offline partition */ - @Test - def testDescribeUnderMinIsrPartitionsMixed(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDescribeUnderMinIsrPartitionsMixed(quorum: String): Unit = { val underMinIsrTopic = "under-min-isr-topic" val notUnderMinIsrTopic = "not-under-min-isr-topic" val offlineTopic = "offline-topic" @@ -677,7 +730,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi try { killBroker(0) - val aliveServers = servers.filterNot(_.config.brokerId == 0) + val aliveServers = brokers.filterNot(_.config.brokerId == 0) TestUtils.waitForPartitionMetadata(aliveServers, underMinIsrTopic, 0) val output = TestUtils.grabConsoleOutput( topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions")))) @@ -690,8 +743,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi } } - @Test - def testDescribeReportOverriddenConfigs(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDescribeReportOverriddenConfigs(quorum: String): Unit = { val config = "file.delete.delay.ms=1000" createAndWaitTopic(new TopicCommandOptions( Array("--partitions", "2", "--replication-factor", "2", "--topic", testTopicName, "--config", config))) @@ -700,8 +754,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi assertTrue(output.contains(config), s"Describe output should have contained $config") } - @Test - def testDescribeAndListTopicsWithoutInternalTopics(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDescribeAndListTopicsWithoutInternalTopics(quorum: String): Unit = { createAndWaitTopic( new TopicCommandOptions(Array("--partitions", "1", "--replication-factor", "1", "--topic", testTopicName))) // create a internal topic @@ -720,8 +775,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME)) } - @Test - def testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized(quorum: String): Unit = { adminClient = 
spy(adminClient) topicService = TopicService(adminClient) @@ -746,8 +802,20 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi assertTrue(rows(0).startsWith(s"Topic: $testTopicName")) } - @Test - def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateWithTopicNameCollision(quorum: String): Unit = { + adminClient.createTopics( + Collections.singletonList(new NewTopic("foo_bar", 1, 6.toShort))).all().get() + waitForTopicCreated("foo_bar") + + assertThrows(classOf[InvalidTopicException], + () => topicService.createTopic(new TopicCommandOptions(Array("--topic", "foo.bar")))) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(quorum: String): Unit = { val adminClient = mock(classOf[Admin]) val topicService = TopicService(adminClient) @@ -766,8 +834,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi ) } - @Test - def testDeleteTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDeleteTopicDoesNotRetryThrottlingQuotaExceededException(quorum: String): Unit = { val adminClient = mock(classOf[Admin]) val topicService = TopicService(adminClient) @@ -787,8 +856,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi ) } - @Test - def testCreatePartitionsDoesNotRetryThrottlingQuotaExceededException(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreatePartitionsDoesNotRetryThrottlingQuotaExceededException(quorum: String): Unit = { val adminClient = mock(classOf[Admin]) val topicService = TopicService(adminClient) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index d8005e60ce64..f104364d1a6d 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -216,6 +216,23 @@ static Map<String, String> translateCreationConfigs(CreateableTopicConfigCollect */ private final TimelineHashMap<String, Uuid> topicsByName; + /** + * We try to prevent topics from being created if their names would collide with + * existing topics when periods in the topic name are replaced with underscores. + * The reason for this is that some per-topic metrics do replace periods with + * underscores, and would therefore be ambiguous otherwise. + * + * This map is from normalized topic name to a set of topic names. So if we had two + * topics named foo.bar and foo_bar, this map would contain + * a mapping from foo_bar to a set containing foo.bar and foo_bar. + * + * Since we reject topic creations that would collide, under normal conditions the + * sets in this map should only have a size of 1. However, if the cluster was + * upgraded from a version prior to KAFKA-13743, it may be possible to have more + * values here, since colliding topic names will be "grandfathered in." 
+ */ + private final TimelineHashMap<String, TimelineHashSet<String>> topicsWithCollisionChars; + /** * Maps topic UUIDs to structures containing topic information, including partitions. */ @@ -258,6 +275,7 @@ static Map<String, String> translateCreationConfigs(CreateableTopicConfigCollect this.clusterControl = clusterControl; this.globalPartitionCount = new TimelineInteger(snapshotRegistry); this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0); + this.topicsWithCollisionChars = new TimelineHashMap<>(snapshotRegistry, 0); this.topics = new TimelineHashMap<>(snapshotRegistry, 0); this.brokersToIsrs = new BrokersToIsrs(snapshotRegistry); this.reassigningTopics = new TimelineHashMap<>(snapshotRegistry, 0); @@ -266,6 +284,15 @@ static Map<String, String> translateCreationConfigs(CreateableTopicConfigCollect public void replay(TopicRecord record) { topicsByName.put(record.name(), record.topicId()); + if (Topic.hasCollisionChars(record.name())) { + String normalizedName = Topic.unifyCollisionChars(record.name()); + TimelineHashSet<String> topicNames = topicsWithCollisionChars.get(normalizedName); + if (topicNames == null) { + topicNames = new TimelineHashSet<>(snapshotRegistry, 1); + topicsWithCollisionChars.put(normalizedName, topicNames); + } + topicNames.add(record.name()); + } topics.put(record.topicId(), new TopicControlInfo(record.name(), snapshotRegistry, record.topicId())); controllerMetrics.setGlobalTopicsCount(topics.size()); @@ -374,6 +401,16 @@ public void replay(RemoveTopicRecord record) { " to remove."); } topicsByName.remove(topic.name); + if (Topic.hasCollisionChars(topic.name)) { + String normalizedName = Topic.unifyCollisionChars(topic.name); + TimelineHashSet<String> colliding = topicsWithCollisionChars.get(normalizedName); + if (colliding != null) { + colliding.remove(topic.name); + if (colliding.isEmpty()) { + topicsWithCollisionChars.remove(normalizedName); + } + } + } reassigningTopics.remove(record.topicId()); // Delete the configurations associated with this topic. @@ -407,7 +444,7 @@ public void replay(RemoveTopicRecord record) { List<ApiMessageAndVersion> records = new ArrayList<>(); // Check the topic names. 
- validateNewTopicNames(topicErrors, request.topics()); + validateNewTopicNames(topicErrors, request.topics(), topicsWithCollisionChars); // Identify topics that already exist and mark them with the appropriate error request.topics().stream().filter(creatableTopic -> topicsByName.containsKey(creatableTopic.name())) @@ -598,7 +635,8 @@ private ApiError maybeCheckCreateTopicPolicy(Supplier static void validateNewTopicNames(Map<String, ApiError> topicErrors, - CreatableTopicCollection topics) { + CreatableTopicCollection topics, + Map<String, ? extends Set<String>> topicsWithCollisionChars) { for (CreatableTopic topic : topics) { if (topicErrors.containsKey(topic.name())) continue; try { @@ -607,6 +645,15 @@ static void validateNewTopicNames(Map<String, ApiError> topicErrors, topicErrors.put(topic.name(), new ApiError(Errors.INVALID_TOPIC_EXCEPTION, e.getMessage())); } + if (Topic.hasCollisionChars(topic.name())) { + String normalizedName = Topic.unifyCollisionChars(topic.name()); + Set<String> colliding = topicsWithCollisionChars.get(normalizedName); + if (colliding != null) { + topicErrors.put(topic.name(), new ApiError(Errors.INVALID_TOPIC_EXCEPTION, + "Topic '" + topic.name() + "' collides with existing topic: " + + colliding.iterator().next())); + } + } } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 108f2eca665c..d095a9fe3729 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -82,6 +82,7 @@ import org.slf4j.LoggerFactory; import java.util.Arrays; +import java.util.TreeSet; import java.util.concurrent.atomic.AtomicLong; import java.util.ArrayList; import java.util.Collections; @@ -641,7 +642,7 @@ public void testValidateNewTopicNames() { topics.add(new CreatableTopic().setName("")); topics.add(new CreatableTopic().setName("woo")); topics.add(new CreatableTopic().setName(".")); - ReplicationControlManager.validateNewTopicNames(topicErrors, topics); + ReplicationControlManager.validateNewTopicNames(topicErrors, topics, Collections.emptyMap()); Map<String, ApiError> expectedTopicErrors = new HashMap<>(); expectedTopicErrors.put("", new ApiError(INVALID_TOPIC_EXCEPTION, "Topic name is illegal, it can't be empty")); @@ -650,6 +651,24 @@ public void testValidateNewTopicNames() { assertEquals(expectedTopicErrors, topicErrors); } + @Test + public void testTopicNameCollision() { + Map<String, ApiError> topicErrors = new HashMap<>(); + CreatableTopicCollection topics = new CreatableTopicCollection(); + topics.add(new CreatableTopic().setName("foo.bar")); + topics.add(new CreatableTopic().setName("woo.bar_foo")); + Map<String, Set<String>> collisionMap = new HashMap<>(); + collisionMap.put("foo_bar", new TreeSet<>(Arrays.asList("foo_bar"))); + collisionMap.put("woo_bar_foo", new TreeSet<>(Arrays.asList("woo.bar.foo", "woo_bar.foo"))); + ReplicationControlManager.validateNewTopicNames(topicErrors, topics, collisionMap); + Map<String, ApiError> expectedTopicErrors = new HashMap<>(); + expectedTopicErrors.put("foo.bar", new ApiError(INVALID_TOPIC_EXCEPTION, + "Topic 'foo.bar' collides with existing topic: foo_bar")); + expectedTopicErrors.put("woo.bar_foo", new ApiError(INVALID_TOPIC_EXCEPTION, + "Topic 'woo.bar_foo' collides with existing topic: woo.bar.foo")); + assertEquals(expectedTopicErrors, topicErrors); + } + @Test public void testRemoveLeaderships() throws Exception { ReplicationControlTestContext ctx = new ReplicationControlTestContext();
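
The following standalone sketch illustrates the collision bookkeeping this patch adds to ReplicationControlManager. It is an illustration, not code from the patch: it uses plain java.util collections in place of the controller's snapshot-aware TimelineHashMap/TimelineHashSet, and the class and method names (TopicCollisionSketch, onTopicCreated, onTopicDeleted, checkCreate) are hypothetical. Only the normalization rule (replace '.' with '_', as in Topic.unifyCollisionChars) and the error message format are taken from the change above.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class TopicCollisionSketch {
    // normalized name (e.g. "foo_bar") -> existing topic names that normalize to it ("foo.bar", "foo_bar")
    private final Map<String, Set<String>> topicsWithCollisionChars = new HashMap<>();

    private static boolean hasCollisionChars(String topic) {
        return topic.contains("_") || topic.contains(".");
    }

    private static String unify(String topic) {
        return topic.replace('.', '_');
    }

    // Mirrors replay(TopicRecord): remember every existing name under its normalized form.
    void onTopicCreated(String name) {
        if (hasCollisionChars(name)) {
            topicsWithCollisionChars.computeIfAbsent(unify(name), k -> new HashSet<>()).add(name);
        }
    }

    // Mirrors replay(RemoveTopicRecord): drop the name, and the whole entry once it is empty.
    void onTopicDeleted(String name) {
        if (hasCollisionChars(name)) {
            String normalized = unify(name);
            Set<String> colliding = topicsWithCollisionChars.get(normalized);
            if (colliding != null) {
                colliding.remove(name);
                if (colliding.isEmpty()) {
                    topicsWithCollisionChars.remove(normalized);
                }
            }
        }
    }

    // Mirrors the check added to validateNewTopicNames: reject names that normalize to an existing topic.
    String checkCreate(String name) {
        if (hasCollisionChars(name)) {
            Set<String> colliding = topicsWithCollisionChars.get(unify(name));
            if (colliding != null) {
                return "Topic '" + name + "' collides with existing topic: " + colliding.iterator().next();
            }
        }
        return null; // no collision, creation may proceed
    }

    public static void main(String[] args) {
        TopicCollisionSketch sketch = new TopicCollisionSketch();
        sketch.onTopicCreated("foo_bar");
        System.out.println(sketch.checkCreate("foo.bar")); // collides with foo_bar
        sketch.onTopicDeleted("foo_bar");
        System.out.println(sketch.checkCreate("foo.bar")); // null: allowed once foo_bar is gone
    }
}

Keying the reverse index by the normalized name makes each creation check a single map lookup, at the cost of the extra bookkeeping maintained in the replay() methods.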