Skip to content

Commit

Permalink
MINOR: Hopefully fix flaky FetchFromFollowerIntegrationTest.testRackA…
Browse files Browse the repository at this point in the history
…wareRangeAssignor (#13754)

This test fails regularly in CI ([example](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13753/1/tests)). This is an attempt to stabilize it.

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
  • Loading branch information
dajac committed May 30, 2023
1 parent 45520c1 commit d557f79
Showing 1 changed file with 10 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,20 +200,27 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
val assignments = partitionOrder.map { p =>
topics.map(topic => new TopicPartition(topic, p)).toSet
}

val assignmentFutures = consumers.zipWithIndex.map { case (consumer, i) =>
executor.submit(() => {
val expectedAssignment = assignments(i)
TestUtils.pollUntilTrue(consumer, () => consumer.assignment() == expectedAssignment.asJava,
s"Timed out while awaiting expected assignment $expectedAssignment. The current assignment is ${consumer.assignment()}")
}, 0)
}
assignmentFutures.foreach(future => assertEquals(0, future.get(20, TimeUnit.SECONDS)))
assignmentFutures.foreach(future => assertEquals(0, future.get(30, TimeUnit.SECONDS)))

assignments.flatten.foreach { tp =>
producer.send(new ProducerRecord(tp.topic, tp.partition, s"key-$tp".getBytes, s"value-$tp".getBytes))
}
consumers.zipWithIndex.foreach { case (consumer, i) =>
val records = TestUtils.pollUntilAtLeastNumRecords(consumer, assignments(i).size)

val recordFutures = consumers.zipWithIndex.map { case (consumer, i) =>
executor.submit(() => {
TestUtils.pollUntilAtLeastNumRecords(consumer, assignments(i).size, waitTimeMs = 30000)
})
}
recordFutures.zipWithIndex.foreach { case (future, i) =>
val records = future.get(30, TimeUnit.SECONDS)
assertEquals(assignments(i), records.map(r => new TopicPartition(r.topic, r.partition)).toSet)
}
}
Expand Down

0 comments on commit d557f79

Please sign in to comment.