diff --git a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
index 6d8fbe1bbe79..7b21ef826861 100644
--- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
@@ -79,7 +79,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
     topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
     topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200")
     topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100")
-    TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
+    TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
       topicConfig = topicConfig)
     verifyRemoteLogTopicConfigs(topicConfig)
   }
@@ -91,7 +91,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
     topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
     topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "512")
     topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "256")
-    TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
+    TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
       topicConfig = topicConfig)
     verifyRemoteLogTopicConfigs(topicConfig)
   }
@@ -103,7 +103,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
     val topicConfig = new Properties()
     topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
     topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "1001")
-    TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
+    TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
       topicConfig = topicConfig)
     verifyRemoteLogTopicConfigs(topicConfig)
   }
@@ -115,7 +115,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
     val topicConfig = new Properties()
     topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
     topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "1025")
-    TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
+    TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
       topicConfig = topicConfig)
     verifyRemoteLogTopicConfigs(topicConfig)
   }
@@ -128,7 +128,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
     topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
     topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200")
     assertThrowsException(classOf[InvalidConfigurationException], () =>
-      TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
+      TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
         topicConfig = topicConfig))
   }

@@ -140,7 +140,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
     topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
     topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "512")
     assertThrowsException(classOf[InvalidConfigurationException], () =>
-      TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
+      TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
         topicConfig = topicConfig))
   }

@@ -151,7 +151,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
     topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
     topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact")
     assertThrowsException(classOf[InvalidConfigurationException], () =>
-      TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
+      TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
         topicConfig = topicConfig))
   }

@@ -160,7 +160,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
   def testEnableRemoteLogOnExistingTopicTest(quorum: String): Unit = {
     val admin = createAdminClient()
     val topicConfig = new Properties()
-    TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor,
+    TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
       topicConfig = topicConfig)

     val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]()
@@ -181,11 +181,11 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
     val topicConfigWithRemoteStorage = new Properties()
     topicConfigWithRemoteStorage.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
     val message = assertThrowsException(classOf[InvalidConfigurationException],
-      () => TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions,
+      () => TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions,
         numReplicationFactor, topicConfig = topicConfigWithRemoteStorage))
     assertTrue(message.getMessage.contains("Tiered Storage functionality is disabled in the broker"))

-    TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor)
+    TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor)

     val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]()
     configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName), Collections.singleton(
@@ -203,7 +203,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
     val admin = createAdminClient()
     val topicConfig = new Properties()
     topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
-    TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor,
+    TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
       topicConfig = topicConfig)

     val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]()
@@ -224,7 +224,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
     val admin = createAdminClient()
     val topicConfig = new Properties()
     topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
-    TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor,
+    TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
       topicConfig = topicConfig)

     val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]()
@@ -245,7 +245,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
     val admin = createAdminClient()
     val topicConfig = new Properties()
     topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
-    TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor,
+    TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
       topicConfig = topicConfig)

     // inherited local retention ms is 1000
@@ -265,7 +265,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
     val admin = createAdminClient()
     val topicConfig = new Properties()
     topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
-    TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor,
+    TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
       topicConfig = topicConfig)

     // inherited local retention bytes is 1024
@@ -288,9 +288,9 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
     topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
     topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200")
     topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100")
-    TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, brokerCount,
+    TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, brokerCount,
       topicConfig = topicConfig)
-    TestUtils.deleteTopicWithAdmin(createAdminClient(), testTopicName, brokers)
+    TestUtils.deleteTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers)
     assertThrowsException(classOf[UnknownTopicOrPartitionException],
       () => TestUtils.describeTopic(createAdminClient(), testTopicName), "Topic should be deleted")
     TestUtils.waitUntilTrue(() =>
@@ -304,7 +304,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
     val topicConfig = new Properties()
     topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")

-    TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, brokerCount,
+    TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, brokerCount,
       topicConfig = topicConfig)

     val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnectOrNull).head
@@ -326,7 +326,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
     val topicConfig = new Properties()
     topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, false.toString)

-    TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, brokerCount,
+    TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, brokerCount,
       topicConfig = topicConfig)

     val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnectOrNull).head
diff --git a/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
index 9e1beaf6adfa..cf0bcda861d3 100644
--- a/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
@@ -264,9 +264,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     val topic1 = "kafka.testTopic1"
     val topic2 = "kafka.testTopic2"
     val topic3 = "oooof.testTopic1"
-    TestUtils.createTopicWithAdmin(adminClient, topic1, brokers, 2, 2)
-    TestUtils.createTopicWithAdmin(adminClient, topic2, brokers, 2, 2)
-    TestUtils.createTopicWithAdmin(adminClient, topic3, brokers, 2, 2)
+    TestUtils.createTopicWithAdmin(adminClient, topic1, brokers, controllerServers, 2, 2)
+    TestUtils.createTopicWithAdmin(adminClient, topic2, brokers, controllerServers, 2, 2)
+    TestUtils.createTopicWithAdmin(adminClient, topic3, brokers, controllerServers, 2, 2)

     val output = TestUtils.grabConsoleOutput(
       topicService.listTopics(new TopicCommandOptions(Array("--topic", "kafka.*"))))
@@ -280,8 +280,8 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
   @ValueSource(strings = Array("zk", "kraft"))
   def testListTopicsWithExcludeInternal(quorum: String): Unit = {
     val topic1 = "kafka.testTopic1"
-    TestUtils.createTopicWithAdmin(adminClient, topic1, brokers, 2, 2)
-    TestUtils.createTopicWithAdmin(adminClient, Topic.GROUP_METADATA_TOPIC_NAME, brokers, 2, 2)
+    TestUtils.createTopicWithAdmin(adminClient, topic1, brokers, controllerServers, 2, 2)
+    TestUtils.createTopicWithAdmin(adminClient, Topic.GROUP_METADATA_TOPIC_NAME, brokers, controllerServers, 2, 2)

     val output = TestUtils.grabConsoleOutput(
       topicService.listTopics(new TopicCommandOptions(Array("--exclude-internal"))))
@@ -293,7 +293,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testAlterPartitionCount(quorum: String): Unit = {
-    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
+    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, controllerServers, 2, 2)
     topicService.alterTopic(new TopicCommandOptions(
       Array("--topic", testTopicName, "--partitions", "3")))

@@ -308,7 +308,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testAlterAssignment(quorum: String): Unit = {
-    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
+    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, controllerServers, 2, 2)
     topicService.alterTopic(new TopicCommandOptions(
       Array("--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2", "--partitions", "3")))

@@ -324,7 +324,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testAlterAssignmentWithMoreAssignmentThanPartitions(quorum: String): Unit = {
-    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
+    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, controllerServers, 2, 2)

     assertThrows(classOf[ExecutionException],
       () => topicService.alterTopic(new TopicCommandOptions(
@@ -334,7 +334,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testAlterAssignmentWithMorePartitionsThanAssignment(quorum: String): Unit = {
-    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
+    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, controllerServers, 2, 2)

     assertThrows(classOf[ExecutionException],
       () => topicService.alterTopic(new TopicCommandOptions(
@@ -520,7 +520,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testDescribe(quorum: String): Unit = {
-    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
+    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, controllerServers, 2, 2)

     val output = TestUtils.grabConsoleOutput(
       topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName))))
@@ -545,7 +545,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testDescribeUnavailablePartitions(quorum: String): Unit = {
-    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, numBrokers, 1)
+    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, controllerServers, numBrokers, 1)

     try {
       // check which partition is on broker 0 which we'll kill
@@ -587,7 +587,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testDescribeUnderReplicatedPartitions(quorum: String): Unit = {
-    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1, numBrokers)
+    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, controllerServers, 1, numBrokers)

     try {
       killBroker(0)
@@ -612,7 +612,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     topicProps.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, numBrokers.toString)

     // create topic
-    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1, numBrokers, topicConfig = topicProps)
+    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, controllerServers, 1, numBrokers, topicConfig = topicProps)

     try {
       killBroker(0)
@@ -638,7 +638,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
   def testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(quorum: String): Unit = {
     val tp = new TopicPartition(testTopicName, 0)

-    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1, 1)
+    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, controllerServers, 1, 1)

     // Produce multiple batches.
     TestUtils.generateAndProduceMessages(brokers, testTopicName, numMessages = 10, acks = -1)
@@ -690,7 +690,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     topicProps.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "4")

     // create topic
-    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1, numBrokers, topicConfig = topicProps)
+    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, controllerServers, 1, numBrokers, topicConfig = topicProps)

     try {
       killBroker(0)
@@ -735,10 +735,10 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     topicProps.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, numBrokers.toString)

     // create topic
-    TestUtils.createTopicWithAdmin(adminClient, underMinIsrTopic, brokers, 1, numBrokers, topicConfig = topicProps)
-    TestUtils.createTopicWithAdmin(adminClient, notUnderMinIsrTopic, brokers, 1, numBrokers)
-    TestUtils.createTopicWithAdmin(adminClient, offlineTopic, brokers, 1, replicaAssignment = Map(0 -> Seq(0)))
-    TestUtils.createTopicWithAdmin(adminClient, fullyReplicatedTopic, brokers, 1, replicaAssignment = Map(0 -> Seq(1, 2, 3)))
+    TestUtils.createTopicWithAdmin(adminClient, underMinIsrTopic, brokers, controllerServers, 1, numBrokers, topicConfig = topicProps)
+    TestUtils.createTopicWithAdmin(adminClient, notUnderMinIsrTopic, brokers, controllerServers, 1, numBrokers)
+    TestUtils.createTopicWithAdmin(adminClient, offlineTopic, brokers, controllerServers, 1, replicaAssignment = Map(0 -> Seq(0)))
+    TestUtils.createTopicWithAdmin(adminClient, fullyReplicatedTopic, brokers, controllerServers, 1, replicaAssignment = Map(0 -> Seq(1, 2, 3)))

     try {
       killBroker(0)
@@ -811,7 +811,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
       Set(new TopicPartition(testTopicName, 0)).asJava
     )

-    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1, 1)
+    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, controllerServers, 1, 1)

     val output = TestUtils.grabConsoleOutput(
       topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName))))
@@ -823,7 +823,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testCreateWithTopicNameCollision(quorum: String): Unit = {
-    TestUtils.createTopicWithAdmin(adminClient, "foo_bar", brokers, 1, numBrokers)
+    TestUtils.createTopicWithAdmin(adminClient, "foo_bar", brokers, controllerServers, 1, numBrokers)

     assertThrows(classOf[InvalidTopicException],
       () => topicService.createTopic(new TopicCommandOptions(Array("--topic", "foo.bar"))))
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 5f51d2bd41b1..93f7a7212c49 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -154,7 +154,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {

     try {
       // create topic
-      TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
+      TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2)

       // send a normal record
       val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, "key".getBytes(StandardCharsets.UTF_8),
@@ -208,7 +208,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
                                timeoutMs: Long = 20000L): Unit = {
     val partition = 0
     try {
-      TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
+      TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2)

       val futures = for (i <- 1 to numRecords) yield {
         val record = new ProducerRecord(topic, partition, s"key$i".getBytes(StandardCharsets.UTF_8),
@@ -263,7 +263,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       topicProps.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
    else
       topicProps.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime")
-    TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig = topicProps)
+    TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2, topicConfig = topicProps)

     val recordAndFutures = for (i <- 1 to numRecords) yield {
       val record = new ProducerRecord(topic, partition, baseTimestamp + i, s"key$i".getBytes(StandardCharsets.UTF_8),
@@ -296,7 +296,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {

     try {
       // create topic
-      TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
+      TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2)

       // non-blocking send a list of records
       val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes(StandardCharsets.UTF_8),
@@ -329,7 +329,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     val producer = createProducer()

     try {
-      TestUtils.createTopicWithAdmin(admin, topic, brokers, 2, 2)
+      TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 2, 2)

       val partition = 1
       val now = System.currentTimeMillis()
@@ -373,7 +373,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     val replicas = List(0, follower)

     try {
-      TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 3, Map(0 -> replicas))
+      TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 3, Map(0 -> replicas))

       val partition = 0
       val now = System.currentTimeMillis()
@@ -422,7 +422,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     val producer = createProducer(maxBlockMs = 5 * 1000L)

     // create topic
-    TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
+    TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2)

     val partition0 = 0
     var futures0 = (1 to numRecords).map { i =>
@@ -479,7 +479,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   def testFlush(quorum: String): Unit = {
     val producer = createProducer(lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
     try {
-      TestUtils.createTopicWithAdmin(admin, topic, brokers, 2, 2)
+      TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 2, 2)
       val record = new ProducerRecord[Array[Byte], Array[Byte]](topic,
         "value".getBytes(StandardCharsets.UTF_8))
       for (_ <- 0 until 50) {
@@ -499,7 +499,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testCloseWithZeroTimeoutFromCallerThread(quorum: String): Unit = {
-    TestUtils.createTopicWithAdmin(admin, topic, brokers, 2, 2)
+    TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 2, 2)
     val partition = 0
     consumer.assign(List(new TopicPartition(topic, partition)).asJava)
     val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null,
@@ -525,7 +525,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testCloseWithZeroTimeoutFromSenderThread(quorum: String): Unit = {
-    TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
+    TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2)
     val partition = 0
     consumer.assign(List(new TopicPartition(topic, partition)).asJava)
     val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null, "value".getBytes(StandardCharsets.UTF_8))
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 7731efd360f6..46e674c00aa5 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -167,7 +167,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas

     // create the test topic with all the brokers as replicas
     val superuserAdminClient = createSuperuserAdminClient()
-    TestUtils.createTopicWithAdmin(admin = superuserAdminClient, topic = topic, brokers = brokers,
+    TestUtils.createTopicWithAdmin(admin = superuserAdminClient, topic = topic, brokers = brokers, controllers = controllerServers,
       numPartitions = 1, replicationFactor = 3, topicConfig = new Properties)
   }

diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 784374d23e8e..805fdda0e8a5 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -2692,11 +2692,11 @@ object PlaintextAdminIntegrationTest {
     // Create topics
     val topic1 = "invalid-alter-configs-topic-1"
     val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
-    createTopicWithAdmin(admin, topic1, test.brokers, numPartitions = 1, replicationFactor = 1)
+    createTopicWithAdmin(admin, topic1, test.brokers, test.controllerServers, numPartitions = 1, replicationFactor = 1)

     val topic2 = "invalid-alter-configs-topic-2"
     val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
-    createTopicWithAdmin(admin, topic2, test.brokers, numPartitions = 1, replicationFactor = 1)
+    createTopicWithAdmin(admin, topic2, test.brokers, test.controllerServers, numPartitions = 1, replicationFactor = 1)

     val topicConfigEntries1 = Seq(
       new ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "1.1"), // this value is invalid as it's above 1.0
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index ac7b775c2283..77132d919bc2 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -66,7 +66,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     val producer = createProducer(batchSize = 0)
     val numRecords = 10;
     try {
-      TestUtils.createTopicWithAdmin(admin, topic, brokers, 2)
+      TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 2)
       val futures = for (i <- 1 to numRecords) yield {
         val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, s"value$i".getBytes(StandardCharsets.UTF_8))
         producer.send(record)
@@ -128,7 +128,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     // set the TopicConfig for timestamp validation to have 1 minute threshold. Note that recordTimestamp has 5 minutes diff
     val oneMinuteInMs: Long = 1 * 60 * 60 * 1000L
     topicProps.setProperty(messageTimeStampConfig, oneMinuteInMs.toString)
-    TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig = topicProps)
+    TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2, topicConfig = topicProps)

     val producer = createProducer()
     try {
@@ -157,7 +157,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {

     // set the TopicConfig for timestamp validation to be the same as the record timestamp
     topicProps.setProperty(messageTimeStampConfig, recordTimestamp.toString)
-    TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig = topicProps)
+    TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2, topicConfig = topicProps)

     val producer = createProducer()

@@ -178,7 +178,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     // set the TopicConfig for timestamp validation to have 10 minute threshold. Note that recordTimestamp has 5 minutes diff
     val tenMinutesInMs: Long = 10 * 60 * 60 * 1000L
     topicProps.setProperty(messageTimeStampConfig, tenMinutesInMs.toString)
-    TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig = topicProps)
+    TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2, topicConfig = topicProps)

     val producer = createProducer()

diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index 6135ec952ca9..08f10e89083f 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -87,7 +87,7 @@ class ProducerCompressionTest extends QuorumTestHarness {
     val admin = TestUtils.createAdminClient(Seq(broker),
       ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
     try {
-      TestUtils.createTopicWithAdmin(admin, topic, Seq(broker))
+      TestUtils.createTopicWithAdmin(admin, topic, Seq(broker), controllerServers)
     } finally {
       admin.close()
     }
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 3bbfbbed78ab..435b87d330df 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -156,8 +156,8 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup

     createAdminClient(SecurityProtocol.SSL, SecureInternal)

-    TestUtils.createTopicWithAdmin(adminClients.head, topic, servers, numPartitions, replicationFactor = numServers)
-    TestUtils.createTopicWithAdmin(adminClients.head, Topic.GROUP_METADATA_TOPIC_NAME, servers,
+    TestUtils.createTopicWithAdmin(adminClients.head, topic, servers, controllerServers, numPartitions, replicationFactor = numServers)
+    TestUtils.createTopicWithAdmin(adminClients.head, Topic.GROUP_METADATA_TOPIC_NAME, servers, controllerServers,
       numPartitions = servers.head.config.offsetsTopicPartitions,
       replicationFactor = numServers,
       topicConfig = servers.head.groupCoordinator.groupMetadataTopicConfigs)
@@ -356,7 +356,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
   @ValueSource(strings = Array("zk", "kraft"))
   def testKeyStoreAlter(quorum: String): Unit = {
     val topic2 = "testtopic2"
-    TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, numPartitions, replicationFactor = numServers)
+    TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, controllerServers, numPartitions, replicationFactor = numServers)

     // Start a producer and consumer that work with the current broker keystore.
     // This should continue working while changes are made
@@ -578,7 +578,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     val topic2 = "testtopic2"
     val topicProps = new Properties
     topicProps.put(KafkaConfig.MinInSyncReplicasProp, "2")
-    TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, numPartitions = 1, replicationFactor = numServers, topicConfig = topicProps)
+    TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, controllerServers, numPartitions = 1, replicationFactor = numServers, topicConfig = topicProps)

     def getLogOrThrow(tp: TopicPartition): UnifiedLog = {
       var (logOpt, found) = TestUtils.computeUntilTrue {
diff --git a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
index 3b6f28b4f8a7..7231c0599a61 100644
--- a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
@@ -65,6 +65,7 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
       admin,
       topic,
       brokers,
+      controllerServers,
       replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
     )

@@ -105,6 +106,7 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
       admin,
       topic,
       brokers,
+      controllerServers,
       replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
     )

@@ -131,6 +133,7 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
       admin,
       topic,
       brokers,
+      controllerServers,
       replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
     )

diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 8775a8323d42..01e6a91eb854 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -152,7 +152,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
   ): Unit = {
     if (isKRaftTest()) {
       resource(createAdminClient(brokers, listenerName, adminClientConfig)) { admin =>
-        TestUtils.createOffsetsTopicWithAdmin(admin, brokers)
+        TestUtils.createOffsetsTopicWithAdmin(admin, brokers, controllerServers)
       }
     } else {
       TestUtils.createOffsetsTopic(zkClient, servers)
@@ -178,6 +178,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
           admin = admin,
           topic = topic,
           brokers = brokers,
+          controllers = controllerServers,
           numPartitions = numPartitions,
           replicationFactor = replicationFactor,
           topicConfig = topicConfig
@@ -211,7 +212,8 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
           admin = admin,
           topic = topic,
           replicaAssignment = partitionReplicaAssignment,
-          brokers = brokers
+          brokers = brokers,
+          controllers = controllerServers
         )
       }
     } else {
@@ -232,7 +234,8 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
        TestUtils.deleteTopicWithAdmin(
          admin = admin,
          topic = topic,
-         brokers = aliveBrokers)
+         brokers = aliveBrokers,
+         controllers = controllerServers)
      }
    } else {
      adminZkClient.deleteTopic(topic)
diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 89cddfe97e72..b802a2ad6278 100644
--- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -74,7 +74,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
     // in this test because it does not use FindCoordinator API.
     TestUtils.createOffsetsTopicWithAdmin(
       admin = admin,
-      brokers = raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala
+      brokers = raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala,
+      controllers = raftCluster.controllerServers().asScala.toSeq
     )

     // Heartbeat request to join the group. Note that the member subscribes
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 159cdf5de5e9..b353954282fa 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -466,6 +466,7 @@ object TestUtils extends Logging {
     admin: Admin,
     topic: String,
     brokers: Seq[B],
+    controllers: Seq[ControllerServer],
     numPartitions: Int = 1,
     replicationFactor: Int = 1,
     replicaAssignment: collection.Map[Int, Seq[Int]] = Map.empty,
@@ -503,6 +504,7 @@ object TestUtils extends Logging {

     // wait until we've propagated all partitions metadata to all brokers
     val allPartitionsMetadata = waitForAllPartitionsMetadata(brokers, topic, effectiveNumPartitions)
+    controllers.foreach(controller => ensureConsistentKRaftMetadata(brokers, controller))

     (0 until effectiveNumPartitions).map { i =>
       i -> allPartitionsMetadata.get(new TopicPartition(topic, i)).map(_.leader()).getOrElse(
@@ -532,7 +534,8 @@ object TestUtils extends Logging {

   def createOffsetsTopicWithAdmin[B <: KafkaBroker](
     admin: Admin,
-    brokers: Seq[B]
+    brokers: Seq[B],
+    controllers: Seq[ControllerServer]
   ): Map[Int, Int] = {
     val broker = brokers.head
     createTopicWithAdmin(
@@ -541,6 +544,7 @@ object TestUtils extends Logging {
       numPartitions = broker.config.getInt(KafkaConfig.OffsetsTopicPartitionsProp),
       replicationFactor = broker.config.getShort(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
       brokers = brokers,
+      controllers = controllers,
       topicConfig = broker.groupCoordinator.groupMetadataTopicConfigs,
     )
   }
@@ -549,6 +553,7 @@ object TestUtils extends Logging {
     admin: Admin,
     topic: String,
     brokers: Seq[B],
+    controllers: Seq[ControllerServer]
   ): Unit = {
     try {
       admin.deleteTopics(Collections.singletonList(topic)).all().get()
@@ -558,6 +563,7 @@ object TestUtils extends Logging {
       // ignore
     }
     waitForAllPartitionsMetadata(brokers, topic, 0)
+    controllers.foreach(controller => ensureConsistentKRaftMetadata(brokers, controller))
   }

   /**
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
index 59acae74ad3f..8d475fbfe3cc 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
@@ -179,7 +179,7 @@ private void updateResource(ConfigResource configResource,
     }

     public void deleteTopic(String topic) {
-        TestUtils.deleteTopicWithAdmin(admin, topic, harness.brokers());
+        TestUtils.deleteTopicWithAdmin(admin, topic, harness.brokers(), harness.controllerServers());
    }

    /**
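
For context, a minimal sketch (not part of the patch) of how a caller might use the updated helpers after this change. It assumes a test class built on IntegrationTestHarness, which is where `createAdminClient()`, `brokers`, and `controllerServers` come from in the hunks above; the topic name and counts below are illustrative only.

    // Illustrative sketch only, not a hunk from this patch.
    // Passing controllerServers lets the helper run ensureConsistentKRaftMetadata
    // after the topic change, so broker metadata has caught up with the KRaft
    // controllers before the test proceeds (see the TestUtils.scala hunk).
    val admin = createAdminClient()
    TestUtils.createTopicWithAdmin(admin, "example-topic", brokers, controllerServers,
      numPartitions = 2, replicationFactor = 1)
    TestUtils.deleteTopicWithAdmin(admin, "example-topic", brokers, controllerServers)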