Skip to content

Commit

Permalink
KAFKA-15780: Wait for consistent KRaft metadata when creating or dele…
Browse files Browse the repository at this point in the history
…ting topics (apache#14695) (apache#14713)

TestUtils.createTopicWithAdmin calls waitForAllPartitionsMetadata which waits for partition(s) to be present in each brokers' metadata cache. This is a sufficient check in ZK mode because the controller sends an LISR request before sending an UpdateMetadataRequest which means that the partition in the ReplicaManager will be updated before the metadata cache.

In KRaft mode, the metadata cache is updated first, so the check may return before partitions and other metadata listeners are fully initialized.

Testing:
Insert a Thread.sleep(100) in BrokerMetadataPublisher.onMetadataUpdate after

      // Publish the new metadata image to the metadata cache.
      metadataCache.setImage(newImage)
and run EdgeCaseRequestTest.testProduceRequestWithNullClientId and the test will fail locally nearly deterministically. After the change(s), the test no longer fails.

Conflicts:
	core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
	core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala

Reviewers: Justine Olshan <jolshan@confluent.io>, Divij Vaidya <diviv@amazon.com>, David Mao <dmao@confluent.io>
  • Loading branch information
jolshan authored and mjsax committed Nov 22, 2023
1 parent c491778 commit b3d3d1c
Show file tree
Hide file tree
Showing 13 changed files with 80 additions and 67 deletions.
Expand Up @@ -79,7 +79,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200")
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100")
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
verifyRemoteLogTopicConfigs(topicConfig)
}
Expand All @@ -91,7 +91,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "512")
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "256")
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
verifyRemoteLogTopicConfigs(topicConfig)
}
Expand All @@ -103,7 +103,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "1001")
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
verifyRemoteLogTopicConfigs(topicConfig)
}
Expand All @@ -115,7 +115,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "1025")
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
verifyRemoteLogTopicConfigs(topicConfig)
}
Expand All @@ -128,7 +128,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200")
assertThrowsException(classOf[InvalidConfigurationException], () =>
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig))
}

Expand All @@ -140,7 +140,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "512")
assertThrowsException(classOf[InvalidConfigurationException], () =>
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig))
}

Expand All @@ -151,7 +151,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact")
assertThrowsException(classOf[InvalidConfigurationException], () =>
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig))
}

Expand All @@ -160,7 +160,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
def testEnableRemoteLogOnExistingTopicTest(quorum: String): Unit = {
val admin = createAdminClient()
val topicConfig = new Properties()
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor,
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)

val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]()
Expand All @@ -181,11 +181,11 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val topicConfigWithRemoteStorage = new Properties()
topicConfigWithRemoteStorage.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
val message = assertThrowsException(classOf[InvalidConfigurationException],
() => TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions,
() => TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions,
numReplicationFactor, topicConfig = topicConfigWithRemoteStorage))
assertTrue(message.getMessage.contains("Tiered Storage functionality is disabled in the broker"))

TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor)
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor)
val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]()
configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName),
Collections.singleton(
Expand All @@ -203,7 +203,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val admin = createAdminClient()
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor,
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)

val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]()
Expand All @@ -224,7 +224,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val admin = createAdminClient()
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor,
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)

val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]()
Expand All @@ -245,7 +245,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val admin = createAdminClient()
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor,
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)

// inherited local retention ms is 1000
Expand All @@ -265,7 +265,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val admin = createAdminClient()
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor,
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)

// inherited local retention bytes is 1024
Expand All @@ -288,9 +288,9 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200")
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100")
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, brokerCount,
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, brokerCount,
topicConfig = topicConfig)
TestUtils.deleteTopicWithAdmin(createAdminClient(), testTopicName, brokers)
TestUtils.deleteTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers)
assertThrowsException(classOf[UnknownTopicOrPartitionException],
() => TestUtils.describeTopic(createAdminClient(), testTopicName), "Topic should be deleted")
TestUtils.waitUntilTrue(() =>
Expand All @@ -304,7 +304,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val topicConfig = new Properties()
topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")

TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, brokerCount,
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, brokerCount,
topicConfig = topicConfig)

val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnectOrNull).head
Expand All @@ -326,7 +326,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val topicConfig = new Properties()
topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, false.toString)

TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, brokerCount,
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, brokerCount,
topicConfig = topicConfig)

val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnectOrNull).head
Expand Down
Expand Up @@ -264,9 +264,9 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
val topic1 = "kafka.testTopic1"
val topic2 = "kafka.testTopic2"
val topic3 = "oooof.testTopic1"
TestUtils.createTopicWithAdmin(adminClient, topic1, brokers, 2, 2)
TestUtils.createTopicWithAdmin(adminClient, topic2, brokers, 2, 2)
TestUtils.createTopicWithAdmin(adminClient, topic3, brokers, 2, 2)
TestUtils.createTopicWithAdmin(adminClient, topic1, brokers, controllerServers, 2, 2)
TestUtils.createTopicWithAdmin(adminClient, topic2, brokers, controllerServers, 2, 2)
TestUtils.createTopicWithAdmin(adminClient, topic3, brokers, controllerServers, 2, 2)

val output = TestUtils.grabConsoleOutput(
topicService.listTopics(new TopicCommandOptions(Array("--topic", "kafka.*"))))
Expand All @@ -280,8 +280,8 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
@ValueSource(strings = Array("zk", "kraft"))
def testListTopicsWithExcludeInternal(quorum: String): Unit = {
val topic1 = "kafka.testTopic1"
TestUtils.createTopicWithAdmin(adminClient, topic1, brokers, 2, 2)
TestUtils.createTopicWithAdmin(adminClient, Topic.GROUP_METADATA_TOPIC_NAME, brokers, 2, 2)
TestUtils.createTopicWithAdmin(adminClient, topic1, brokers, controllerServers, 2, 2)
TestUtils.createTopicWithAdmin(adminClient, Topic.GROUP_METADATA_TOPIC_NAME, brokers, controllerServers, 2, 2)

val output = TestUtils.grabConsoleOutput(
topicService.listTopics(new TopicCommandOptions(Array("--exclude-internal"))))
Expand All @@ -293,7 +293,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testAlterPartitionCount(quorum: String): Unit = {
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, controllerServers, 2, 2)

topicService.alterTopic(new TopicCommandOptions(
Array("--topic", testTopicName, "--partitions", "3")))
Expand All @@ -308,7 +308,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testAlterAssignment(quorum: String): Unit = {
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, controllerServers, 2, 2)

topicService.alterTopic(new TopicCommandOptions(
Array("--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2", "--partitions", "3")))
Expand All @@ -324,7 +324,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testAlterAssignmentWithMoreAssignmentThanPartitions(quorum: String): Unit = {
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, controllerServers, 2, 2)

assertThrows(classOf[ExecutionException],
() => topicService.alterTopic(new TopicCommandOptions(
Expand All @@ -334,7 +334,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testAlterAssignmentWithMorePartitionsThanAssignment(quorum: String): Unit = {
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, controllerServers, 2, 2)

assertThrows(classOf[ExecutionException],
() => topicService.alterTopic(new TopicCommandOptions(
Expand Down Expand Up @@ -520,7 +520,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDescribe(quorum: String): Unit = {
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, controllerServers, 2, 2)

val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName))))
Expand All @@ -545,7 +545,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeUnavailablePartitions(quorum: String): Unit = {
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, numBrokers, 1)
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, controllerServers, numBrokers, 1)

try {
// check which partition is on broker 0 which we'll kill
Expand Down Expand Up @@ -587,7 +587,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeUnderReplicatedPartitions(quorum: String): Unit = {
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1, numBrokers)
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, controllerServers, 1, numBrokers)

try {
killBroker(0)
Expand All @@ -612,7 +612,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
topicProps.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, numBrokers.toString)

// create topic
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1, numBrokers, topicConfig = topicProps)
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, controllerServers, 1, numBrokers, topicConfig = topicProps)

try {
killBroker(0)
Expand All @@ -638,7 +638,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
def testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(quorum: String): Unit = {
val tp = new TopicPartition(testTopicName, 0)

TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1, 1)
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, controllerServers, 1, 1)

// Produce multiple batches.
TestUtils.generateAndProduceMessages(brokers, testTopicName, numMessages = 10, acks = -1)
Expand Down Expand Up @@ -690,7 +690,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
topicProps.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "4")

// create topic
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1, numBrokers, topicConfig = topicProps)
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, controllerServers, 1, numBrokers, topicConfig = topicProps)

try {
killBroker(0)
Expand Down Expand Up @@ -735,10 +735,10 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
topicProps.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, numBrokers.toString)

// create topic
TestUtils.createTopicWithAdmin(adminClient, underMinIsrTopic, brokers, 1, numBrokers, topicConfig = topicProps)
TestUtils.createTopicWithAdmin(adminClient, notUnderMinIsrTopic, brokers, 1, numBrokers)
TestUtils.createTopicWithAdmin(adminClient, offlineTopic, brokers, 1, replicaAssignment = Map(0 -> Seq(0)))
TestUtils.createTopicWithAdmin(adminClient, fullyReplicatedTopic, brokers, 1, replicaAssignment = Map(0 -> Seq(1, 2, 3)))
TestUtils.createTopicWithAdmin(adminClient, underMinIsrTopic, brokers, controllerServers, 1, numBrokers, topicConfig = topicProps)
TestUtils.createTopicWithAdmin(adminClient, notUnderMinIsrTopic, brokers, controllerServers, 1, numBrokers)
TestUtils.createTopicWithAdmin(adminClient, offlineTopic, brokers, controllerServers, 1, replicaAssignment = Map(0 -> Seq(0)))
TestUtils.createTopicWithAdmin(adminClient, fullyReplicatedTopic, brokers, controllerServers, 1, replicaAssignment = Map(0 -> Seq(1, 2, 3)))

try {
killBroker(0)
Expand Down Expand Up @@ -811,7 +811,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
Set(new TopicPartition(testTopicName, 0)).asJava
)

TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1, 1)
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, controllerServers, 1, 1)

val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName))))
Expand All @@ -823,7 +823,7 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCreateWithTopicNameCollision(quorum: String): Unit = {
TestUtils.createTopicWithAdmin(adminClient, "foo_bar", brokers, 1, numBrokers)
TestUtils.createTopicWithAdmin(adminClient, "foo_bar", brokers, controllerServers, 1, numBrokers)

assertThrows(classOf[InvalidTopicException],
() => topicService.createTopic(new TopicCommandOptions(Array("--topic", "foo.bar"))))
Expand Down

0 comments on commit b3d3d1c

Please sign in to comment.