From 3b7017671f2ae0ed52ba499f2cb1032344b333cd Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Fri, 29 May 2020 12:09:09 -0700 Subject: [PATCH] KAFKA-10061; Fix flaky `ReassignPartitionsIntegrationTest.testCancellation` (#8749) We have seen this test failing from time to time. In spite of the low quota, it is possible for one or more of the reassignments to complete before verification that the reassignment is in progress. The patch makes this less likely by reducing the quota even further and increasing the amount of data in the topic. Reviewers: Chia-Ping Tsai , Rajini Sivaram --- .../ReassignPartitionsIntegrationTest.scala | 32 ++++++++----------- 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala index c365d5fb8417..974877b7afbd 100644 --- a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala @@ -56,10 +56,8 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness { } val unthrottledBrokerConfigs = - 0.to(4).map { - case brokerId => (brokerId, brokerLevelThrottles.map { - case throttleName => (throttleName, -1L) - }.toMap) + 0.to(4).map { brokerId => + brokerId -> brokerLevelThrottles.map(throttle => (throttle, -1L)).toMap }.toMap /** @@ -277,15 +275,15 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness { def testCancellation(): Unit = { cluster = new ReassignPartitionsTestCluster(zkConnect) cluster.setup() - cluster.produceMessages("foo", 0, 60) - cluster.produceMessages("baz", 1, 80) + cluster.produceMessages("foo", 0, 200) + cluster.produceMessages("baz", 1, 200) val assignment = """{"version":1,"partitions":""" + """[{"topic":"foo","partition":0,"replicas":[0,1,3],"log_dirs":["any","any","any"]},""" + """{"topic":"baz","partition":1,"replicas":[0,2,3],"log_dirs":["any","any","any"]}""" + """]}""" assertEquals(unthrottledBrokerConfigs, describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet.toSeq)) - val interBrokerThrottle = 100L + val interBrokerThrottle = 1L runExecuteAssignment(cluster.adminClient, false, assignment, interBrokerThrottle, -1L) val throttledConfigMap = Map[String, Long]( brokerLevelLeaderThrottle -> interBrokerThrottle, @@ -303,10 +301,9 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness { // Verify that the reassignment is running. The very low throttle should keep it // from completing before this runs. waitForVerifyAssignment(cluster.adminClient, assignment, true, - VerifyAssignmentResult(Map(new TopicPartition("foo", 0) -> - PartitionReassignmentState(Seq(0, 1, 3, 2), Seq(0, 1, 3), false), - new TopicPartition("baz", 1) -> - PartitionReassignmentState(Seq(0, 2, 3, 1), Seq(0, 2, 3), false)), + VerifyAssignmentResult(Map( + new TopicPartition("foo", 0) -> PartitionReassignmentState(Seq(0, 1, 3, 2), Seq(0, 1, 3), false), + new TopicPartition("baz", 1) -> PartitionReassignmentState(Seq(0, 2, 3, 1), Seq(0, 2, 3), false)), true, Map(), false)) // Cancel the reassignment. assertEquals((Set( @@ -339,13 +336,12 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness { * information. The nested maps are keyed on throttle name. */ private def describeBrokerLevelThrottles(brokerIds: Seq[Int]): Map[Int, Map[String, Long]] = { - brokerIds.map { - case brokerId => - val props = zkClient.getEntityConfigs("brokers", brokerId.toString) - (brokerId, brokerLevelThrottles.map { - case throttleName => (throttleName, - props.getOrDefault(throttleName, "-1").asInstanceOf[String].toLong) - }.toMap) + brokerIds.map { brokerId => + val props = zkClient.getEntityConfigs("brokers", brokerId.toString) + val throttles = brokerLevelThrottles.map { throttleName => + (throttleName, props.getOrDefault(throttleName, "-1").asInstanceOf[String].toLong) + }.toMap + brokerId -> throttles }.toMap }