KAFKA-8264: decrease the record size for flaky test #8885
Conversation
… all records in time
I am not familiar with the details of this test and would leave it to others to review and merge. Maybe @omkreddy can help?
@omkreddy , could you help review this small PR? Thanks.
retest this please
@@ -800,7 +800,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     awaitAssignment(consumer, partitions.toSet)

     val producer = createProducer()
-    val producerRecords = partitions.flatMap(sendRecords(producer, partitionCount, _))
+    val producerRecords = partitions.flatMap(sendRecords(producer, numRecords = 15, _))
This test aims to send records to all the partitions (30). With this change, we only send to 15 partitions. Maybe we need to enable debug logs to understand the root cause.
hi @omkreddy , no, with this change we won't send to only 15 partitions. The variable partitions in this line is the total set of partitions across all topics: the test has 3 topics with 30 partitions each, so partitions has size 90. Here, we send records to all 90 partitions, with numRecords records for each.
So, in the test, we first collect all the partitions, and then send numRecords records to each partition.
def testLowMaxFetchSizeForRequestAndPartition(): Unit = {
  val topic1 = "topic1"
  val topic2 = "topic2"
  val topic3 = "topic3"
  val partitionCount = 30
  val topics = Seq(topic1, topic2, topic3)
  ....
  // we collect all the TopicPartition info, that is, partitions.size() will be 90 after this line
  val partitions = topics.flatMap { topic =>
    (0 until partitionCount).map(new TopicPartition(topic, _))
  }
  ....
  // so later, we send records to all the 90 partitions with `numRecords` records for each.
  // that is, the change to the number of records sent won't affect the test itself
  val producerRecords = partitions.flatMap(sendRecords(producer, partitionCount, _))
  ...
}
Thank you.
Thanks for the explanation. I am still not sure why we are not able to consume within 60 seconds. Let us see if we can reproduce the issue with this PR.
Sure. Thank you.
retest this please
retest this please
hi @omkreddy , after running the tests twice, do you think we should run more tests for it? Thanks.
retest this please
@hachikuji , could you help review this small PR to fix a flaky test? Thanks.
@hachikuji , please help review this small PR. Thanks.
hi @omkreddy , looks like @hachikuji is not available recently. Do you think we still need other people's comments? I think this change should be pretty straightforward and safe. Thanks.
@showuon Thanks for the PR. LGTM.
This flaky test has existed for a long time, and it has been failing more frequently recently (it also failed in my PR testing!! :( ). KAFKA-8264 and KAFKA-8460 describe the issue for this test: it cannot consume all the records in time.
I did some investigation. This test is to test:
What it does is create 3 topics with 30 partitions each, then iterate through all 90 partitions and send 30 records to each. Finally, it verifies that we can consume all the records successfully.
The error message says that it cannot consume all the records in time (possibly due to a busy system), so we can decrease the number of records to avoid it. I checked all the error messages collected in KAFKA-8264 and KAFKA-8460: the failed cases could always consume at least 1440 records (out of a total of 2700). So I set the record count to half of the original setting, which gives 1350 records in total. This should make the test more stable.
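To make the arithmetic above concrete, here is a back-of-the-envelope sketch of the record counts. The constants come from the test itself; the variable names here are illustrative and not from the PR:

```scala
// Sketch of the record counts before and after this PR (illustrative, not PR code).
object RecordCountCheck extends App {
  val topics = 3
  val partitionCount = 30                       // partitions per topic
  val totalPartitions = topics * partitionCount // 3 * 30 = 90

  val oldRecordsPerPartition = partitionCount   // original: numRecords = partitionCount = 30
  val newRecordsPerPartition = 15               // after this PR: numRecords = 15

  val oldTotal = totalPartitions * oldRecordsPerPartition // 90 * 30 = 2700
  val newTotal = totalPartitions * newRecordsPerPartition // 90 * 15 = 1350

  // The failed runs collected in KAFKA-8264/KAFKA-8460 always consumed at
  // least 1440 records, so the halved total of 1350 sits below that floor.
  println(s"old total = $oldTotal, new total = $newTotal")
}
```

Since every observed failure still managed to consume at least 1440 records within the timeout, a total of 1350 should be consumable even on a busy CI machine.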
Committer Checklist (excluded from commit message)