New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added a test case to validate no message loss during rebalance #1016
Conversation
At least one pull request committer is not linked to a user. See https://help.github.com/en/articles/why-are-my-commits-linked-to-the-wrong-user#commits-are-not-linked-to-any-user |
@jhooda Thank you for taking the time to implement a test. I did a quick pass and nothing jumps out at me about your setup. I would expect to see message duplication since rebalances are being triggered, but not message loss. I'll dig into this test soon. |
@seglo thanks for quick response. Below is a quick view of 100 iterations of the test showing log file size. All log files with size > 2.6K are tests that failed. One of the failure run log file (gz) was also attached in the original pull request. ubuntu@ubuntu-usnorth1:~/Downloads/alpakka-kafka/logs$ ls -lrt Message loss grep: |
@seglo After analyzing the logs there seems to be some race condition between the older dying consumers and the new consumers: offsets are missed/jumped by the new consumers in a multiple of chunk determined by the config setting ConsumerConfig.MAX_POLL_RECORDS_CONFIG (e.g., "20" in this test case). Most likely Committer.sink is corrupting the offset start value for the second poll polled by the new consumers. (This is very similar to #336 ) |
Hi @jhooda |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suspected the mapAsyncUnordered
causes the message-loss, but that was alright.
t2 = subscribeAndConsumeMessages(consumerClientId2, probe2subscription, topicCount, partitionCount) | ||
control2 = t2._1; probe2 = t2._2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the purpose of re-using these variables?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There isn't much purpose, except to make them follow the client suffix, e.g., control2, probe2 are for client2, useful to correlate when that particular client is closed and restarted. Looks like you took care of it in #1017. Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could mapAsyncUnordered in Committer::batchFlow cause this? Can we make Committer listen to rebalance events and skip commits emitted by defunct consumers?
Purpose
Added a test case that shows sometime offsets are skipped during re-balance
when multiple consumers belong to same group
References
References #382, #589, #946, #336
Changes
When the submitted test is run multiple times, we observed one failure approximately
every ten test runs on
OS: ubuntu 14.04LTS (6-cores),
java version: "1.8.0_144", Java HotSpot(TM) 64-Bit Server VM (build 25.144-b01, mixed mode))
Background Context
There was a discussion in #946 in which @seglo suggested to submit a test to demonstrate that offsets are skipped with a Spec test.
my_test_run_loop_63510_77.txt.gz
my_test_run_loop.sh.txt