KAFKA-2978: consumer stops fetching when consumed and fetch positions get out of sync #666
Conversation
@guozhangwang Ping for review. I replaced the consumed/fetched variables with a single position. This seems to work fine with pre-fetching and it simplifies the code. Have a look and let me know if I missed something.
I was a little confused by the boxing/unboxing of long/Long and ==/equals, but the updates look ok.
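The ==/equals concern above comes from a standard Java pitfall: comparing boxed Long objects with == is a reference comparison, and it only happens to work for small values because of the Long.valueOf cache (-128..127). A minimal standalone demo (not code from this PR):

```java
public class LongEqualityDemo {
    public static void main(String[] args) {
        Long small1 = 100L, small2 = 100L;   // inside the Long.valueOf cache (-128..127): same interned object
        Long big1 = 10_000L, big2 = 10_000L; // outside the cache: each autoboxing creates a distinct object

        System.out.println(small1 == small2);                     // true: same cached object
        System.out.println(big1 == big2);                         // false: reference comparison of distinct objects
        System.out.println(big1.equals(big2));                    // true: value comparison
        System.out.println(big1.longValue() == big2.longValue()); // true: primitive comparison
    }
}
```

This is why offset comparisons on boxed Long values should use equals() or unbox to primitive long first.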
Michal tested this branch and it fixes the issue he was seeing. Is there a way we can write a test (system or otherwise) that captures the problem?
@ijuma I was thinking about that. I think the main reason we didn't catch it in our system tests is that we weren't looking for it. In our current tests, we check after every rebalance that at least some messages are consumed, but we should strengthen that to assert that some messages from each partition are consumed. If it seems reasonable, I'll open a separate JIRA for this.
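The stronger per-partition check proposed above could look roughly like this. All names here are hypothetical and simplified; the real Kafka system tests are structured differently, so this is only a sketch of the assertion's shape:

```java
import java.util.HashMap;
import java.util.Map;

public class PerPartitionCheck {
    // Fails if any partition consumed zero messages since the last rebalance.
    // Illustrative only; not the actual Kafka system-test code.
    static void assertAllPartitionsConsumed(Map<String, Integer> consumedPerPartition) {
        for (Map.Entry<String, Integer> e : consumedPerPartition.entrySet()) {
            if (e.getValue() == 0)
                throw new AssertionError("No messages consumed from partition " + e.getKey());
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("test-0", 42);
        counts.put("test-1", 17);
        assertAllPartitionsConsumed(counts); // passes: every partition made progress
        System.out.println("all partitions consumed");
    }
}
```

Checking every partition (rather than total message count) is what would have caught a single partition silently stopping after a rebalance.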
That sounds good to me @hachikuji
-    fetch.put(partition, new FetchRequest.PartitionData(fetched, this.fetchSize));
+    long position = this.subscriptions.position(partition);
+    fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize));
+    log.debug("Added fetch request for partition {} at offset {}", partition, position);
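The diff above builds the fetch request from the single per-partition position instead of a separate fetched offset. The idea can be sketched with a heavily simplified, hypothetical class (not the real Kafka SubscriptionState): one offset per partition serves as both "last consumed" and "next fetch", so the two can never drift out of sync.

```java
import java.util.HashMap;
import java.util.Map;

public class SimpleSubscriptionState {
    // Single position per partition; illustrative names only.
    private final Map<String, Long> positions = new HashMap<>();

    public void seek(String partition, long offset) {
        positions.put(partition, offset);
    }

    public Long position(String partition) {
        return positions.get(partition);
    }

    // Advance after records are returned to the application; the same value
    // is then used when building the next fetch request.
    public void advance(String partition, long newPosition) {
        positions.put(partition, newPosition);
    }

    public static void main(String[] args) {
        SimpleSubscriptionState state = new SimpleSubscriptionState();
        state.seek("topic-0", 0L);
        state.advance("topic-0", 50L); // 50 records consumed
        System.out.println(state.position("topic-0"));
    }
}
```

With two variables (consumed and fetched), any code path that updated one without the other could leave the consumer stuck; a single position removes that failure mode by construction.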
I think we might want to consider dropping some of these log.debugs to log.trace. Some of the logs in error conditions make sense at debug, but logging every fetch request and response at debug might make changing from info to debug a bit overwhelming.
Trace level is fine with me.
LGTM. @hachikuji do you want to change the logging level before merging? Also a good idea to strengthen system tests for this case.
@guozhangwang I reduced the logging to trace level along the "happy paths." The build failure looks like an instance of KAFKA-2442. I can make the changes to the system tests here, but would that make cherry-picking more difficult for the 0.9.0 branch?
IMO system tests could be done as a separate JIRA since this branch was validated by the reporter of this issue.
@hachikuji did you run the system tests on this branch? If not, maybe a good idea to do it?
@ijuma I haven't run all system tests, but I've run consumer_test.py a couple of times without problems.
FYI: I created KAFKA-2989 to improve the system tests.
… get out of sync
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Michal Turek, Ismael Juma, Guozhang Wang
Closes #666 from hachikuji/KAFKA-2978
(cherry picked from commit e08b922)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>
Merged to trunk and 0.9.0.