feat: Add max message per batch option#14
Merged
Conversation
Codecov Report
@@ Coverage Diff @@
## master #14 +/- ##
============================================
+ Coverage 57.25% 58.28% +1.02%
- Complexity 79 81 +2
============================================
Files 17 17
Lines 503 525 +22
Branches 16 18 +2
============================================
+ Hits 288 306 +18
- Misses 212 216 +4
Partials 3 3
Continue to review full report at Codecov.
|
dpcollins-google
suggested changes
Jan 26, 2021
| Map<Partition, SparkPartitionOffset> map = new HashMap<>(); | ||
| for (int i = 0; i < topicPartitionCount; i++) { | ||
| Partition p = Partition.of(i); | ||
| SparkPartitionOffset emptyPartition = SparkPartitionOffset.create(p, -1L); |
Contributor
There was a problem hiding this comment.
not for here, but can you check that this doesn't error out when the partition is actually empty?
Contributor
Author
There was a problem hiding this comment.
will do in real test.
dpcollins-google
approved these changes
Jan 28, 2021
Contributor
dpcollins-google
left a comment
There was a problem hiding this comment.
Approved. Consider addressing nits before submitting
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
With only PSL flow control, it's not enough to stop Spark executors from OOMing when the batch is large (large backlog). This adds support for hard limiting the max message per batch.
Note that startOffset + batch_offset_range might land on a non-message offset. Under the assumptions that headoffset - 1 is a message offset (the only corner case is broker ungracefully shut down and no new message to unblock), startOffset + batch_offset_range will be unblocked even if it is not a non-message offset.
Staging tested that adding this new option it won't OOM anymore.
EDIT: actually kafka has same parameter https://screenshot.googleplex.com/57nu34SUfMvcRr5.