KAFKA-2978: consumer stops fetching when consumed and fetch positions get out of sync #666
Closed
So, it seems like we could have also solved this problem by fixing the places where the `fetched` and `consumed` positions get into inconsistent states. I like that we've simplified things, but I'm wondering if this also means we're ruling out (or making more complicated) possible improvements (e.g., prefetching larger chunks of data by allowing fetch requests even when `consumed < fetched`), or if such changes would require effectively reverting the bulk of this patch.
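For context, here is a minimal sketch (hypothetical class and field names, not the actual Kafka code) of the two designs being discussed for per-partition subscription state:

```java
// Hypothetical sketch (not the actual Kafka classes) of the two designs
// being discussed for per-partition subscription state.
public class PartitionState {
    // Pre-patch design: two separately maintained positions. Every code
    // path that advances one must remember to keep the other consistent,
    // which is where KAFKA-2978-style bugs crept in.
    private long consumed; // last offset returned to the user, plus one
    private long fetched;  // offset up to which fetch requests have run

    // Post-patch design: a single position, advanced only when records
    // are handed back to the user, so nothing can drift out of sync.
    private long position;

    public void recordsReturned(long nextOffset) {
        this.position = nextOffset;
    }

    public long position() {
        return position;
    }
}
```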
I agree this is the main question with this simplification. However, keep in mind that we cannot send more than one fetch request at a time, because we don't know the following offset to fetch from. So to get any advantage from allowing the fetched position to get farther than a single fetch ahead of the consumed position, we would need to initiate fetches after the last fetch returned from the server, but before the results were returned to the user. I don't see a lot of opportunity for optimization here, but I could be missing something. I think instead the way for consumers to tune the amount of data fetched is with `fetch.min.bytes`.
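To illustrate the constraint above with a sketch (hypothetical names, not the real fetcher code): the offset for the next fetch can only be computed from the records the previous fetch returned, so a second request cannot be pipelined ahead of the first response.

```java
import java.util.List;

// Hypothetical sketch of why fetches cannot be pipelined per partition:
// the next fetch offset is derived from the previous response, so until
// that response arrives there is no offset to put in a second request.
public class FetchChain {
    /**
     * @param returnedOffsets offsets of records in the previous (mock) fetch response
     * @param currentPosition the position the previous fetch started from
     */
    public static long nextFetchOffset(List<Long> returnedOffsets, long currentPosition) {
        if (returnedOffsets.isEmpty())
            return currentPosition; // nothing new arrived; retry from the same position
        // The next fetch resumes immediately after the last record received.
        return returnedOffsets.get(returnedOffsets.size() - 1) + 1;
    }
}
```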
Agreed, I was trying to figure out under what conditions it would be useful to allow for multiple fetch requests while you haven't asked for the data back. I can think of two cases: `fetch.min.bytes`, but also being able to buffer up data so that when it's coming in fast, you'll […]. Both are probably fairly niche, and you probably need to hit an extreme case to warrant adjusting consumer settings much, but I don't think we have a way to control it at the moment.
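For reference, these are the consumer settings that already control how much data a single fetch returns (the config names are real new-consumer configs; the values here are purely illustrative):

```java
import java.util.Properties;

// Sketch of the consumer settings that shape fetch sizing; values are
// illustrative only, not recommendations.
public class FetchTuning {
    public static Properties fetchProps() {
        Properties props = new Properties();
        // Broker holds the fetch until at least this many bytes are available...
        props.put("fetch.min.bytes", "1024");
        // ...or until this timeout expires, whichever comes first.
        props.put("fetch.max.wait.ms", "500");
        // Upper bound on the data returned per partition per fetch.
        props.put("max.partition.fetch.bytes", "1048576");
        return props;
    }
}
```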
These kinds of cases are probably best left to be handled in user code. The user can collect the messages in their own batch until they have enough data to process. Until we have a really compelling use case, I think I prefer the simpler approach in this patch, since maintaining `consumed` and `fetched` has proven error-prone.

There is one unfortunate side effect of this change which doesn't appear to impact current code, but should be mentioned anyway. If you call `initFetches` while you have records available in the internal buffer, you will end up sending an unnecessary fetch. I'm checking if there's a simple way to fix this problem.
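One possible shape for that fix, as a sketch only (hypothetical names, not the actual patch): treat a partition as fetchable only when it has no records sitting in the internal buffer.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch of guarding against the unnecessary-fetch side
// effect: partitions with buffered records are excluded from the next
// fetch request until the user has drained them.
public class FetchGuard {
    /**
     * @param assigned the partitions assigned to this consumer
     * @param buffered map from partition id to records not yet returned to the user
     */
    public static Set<Integer> fetchablePartitions(Set<Integer> assigned,
                                                   Map<Integer, List<String>> buffered) {
        return assigned.stream()
                .filter(tp -> buffered.getOrDefault(tp, List.of()).isEmpty())
                .collect(Collectors.toSet());
    }
}
```

With a guard like this, calling the fetch-initiation path while records are buffered would simply produce an empty request set instead of a redundant fetch.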
Another thing to consider is the memory limit on the consumer (we currently do not have such management as we do in the producer, but it is tracked in KAFKA-2045). I agree that pre-fetching would be helpful with a bursty network. More generally, you would probably want to get some data in each `selector.poll()` even if there is data buffered for all partitions, as long as the buffered data does not exceed the memory limit.
It's also worth saying that if we need this distinction later in order to implement prefetching improvements, it's straightforward to bring it back. I'm also in favour of simplifying for now.