Skip to content

Commit

Permalink
KAFKA-10061; Fix flaky `ReassignPartitionsIntegrationTest.testCancell…
Browse files Browse the repository at this point in the history
…ation` (#8749)

We have seen this test failing from time to time. In spite of the low quota, it is possible for one or more of the reassignments to complete before verification that the reassignment is in progress. The patch makes this less likely by reducing the quota even further and increasing the amount of data in the topic.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
  • Loading branch information
hachikuji committed May 29, 2020
1 parent fe948d3 commit 3b70176
Showing 1 changed file with 14 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,8 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
}

val unthrottledBrokerConfigs =
0.to(4).map {
case brokerId => (brokerId, brokerLevelThrottles.map {
case throttleName => (throttleName, -1L)
}.toMap)
0.to(4).map { brokerId =>
brokerId -> brokerLevelThrottles.map(throttle => (throttle, -1L)).toMap
}.toMap

/**
Expand Down Expand Up @@ -277,15 +275,15 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
def testCancellation(): Unit = {
cluster = new ReassignPartitionsTestCluster(zkConnect)
cluster.setup()
cluster.produceMessages("foo", 0, 60)
cluster.produceMessages("baz", 1, 80)
cluster.produceMessages("foo", 0, 200)
cluster.produceMessages("baz", 1, 200)
val assignment = """{"version":1,"partitions":""" +
"""[{"topic":"foo","partition":0,"replicas":[0,1,3],"log_dirs":["any","any","any"]},""" +
"""{"topic":"baz","partition":1,"replicas":[0,2,3],"log_dirs":["any","any","any"]}""" +
"""]}"""
assertEquals(unthrottledBrokerConfigs,
describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet.toSeq))
val interBrokerThrottle = 100L
val interBrokerThrottle = 1L
runExecuteAssignment(cluster.adminClient, false, assignment, interBrokerThrottle, -1L)
val throttledConfigMap = Map[String, Long](
brokerLevelLeaderThrottle -> interBrokerThrottle,
Expand All @@ -303,10 +301,9 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
// Verify that the reassignment is running. The very low throttle should keep it
// from completing before this runs.
waitForVerifyAssignment(cluster.adminClient, assignment, true,
VerifyAssignmentResult(Map(new TopicPartition("foo", 0) ->
PartitionReassignmentState(Seq(0, 1, 3, 2), Seq(0, 1, 3), false),
new TopicPartition("baz", 1) ->
PartitionReassignmentState(Seq(0, 2, 3, 1), Seq(0, 2, 3), false)),
VerifyAssignmentResult(Map(
new TopicPartition("foo", 0) -> PartitionReassignmentState(Seq(0, 1, 3, 2), Seq(0, 1, 3), false),
new TopicPartition("baz", 1) -> PartitionReassignmentState(Seq(0, 2, 3, 1), Seq(0, 2, 3), false)),
true, Map(), false))
// Cancel the reassignment.
assertEquals((Set(
Expand Down Expand Up @@ -339,13 +336,12 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
* information. The nested maps are keyed on throttle name.
*/
private def describeBrokerLevelThrottles(brokerIds: Seq[Int]): Map[Int, Map[String, Long]] = {
brokerIds.map {
case brokerId =>
val props = zkClient.getEntityConfigs("brokers", brokerId.toString)
(brokerId, brokerLevelThrottles.map {
case throttleName => (throttleName,
props.getOrDefault(throttleName, "-1").asInstanceOf[String].toLong)
}.toMap)
brokerIds.map { brokerId =>
val props = zkClient.getEntityConfigs("brokers", brokerId.toString)
val throttles = brokerLevelThrottles.map { throttleName =>
(throttleName, props.getOrDefault(throttleName, "-1").asInstanceOf[String].toLong)
}.toMap
brokerId -> throttles
}.toMap
}

Expand Down

0 comments on commit 3b70176

Please sign in to comment.