[fix][client]Fix Producer exceeds PulsarClient memory limit when sending fail timeout using CompressionType.NONE#17663
Closed
AnonHxy wants to merge 2 commits intoapache:masterfrom
Closed
Conversation
1fb329b to
fd1386f
Compare
HQebupt
reviewed
Sep 15, 2022
| target.writeBytes(raw); | ||
| raw.resetReaderIndex(); | ||
| return target; | ||
| } |
Contributor
There was a problem hiding this comment.
Do you mean: The MemoryLimitController manages memory according to the real size of message, but the message is allocated a bigger memory by BatchMessageContainerImpl?
lhotari
approved these changes
Sep 15, 2022
Member
lhotari
left a comment
There was a problem hiding this comment.
Good work.
I just wonder if the title of the PR should mention that this applies to CompressionType.NONE .
Contributor
Author
Sure |
HQebupt
approved these changes
Sep 15, 2022
merlimat
requested changes
Sep 15, 2022
| ByteBuf target = PulsarByteBufAllocator.DEFAULT.buffer(readableBytes, readableBytes); | ||
| target.writeBytes(raw); | ||
| raw.resetReaderIndex(); | ||
| return target; |
Contributor
There was a problem hiding this comment.
I don't think this is the right solution because it would cause a memory copy for all the batches.
Contributor
There was a problem hiding this comment.
+1.
- We should take the whole buffer size into account in
MemoryLimitController. Because it's allocated by our producer. - There is a memory waste problem in producer batch container, as the max buffer size only grows.
Jason918
requested changes
Sep 16, 2022
| ByteBuf target = PulsarByteBufAllocator.DEFAULT.buffer(readableBytes, readableBytes); | ||
| target.writeBytes(raw); | ||
| raw.resetReaderIndex(); | ||
| return target; |
Contributor
There was a problem hiding this comment.
+1.
- We should take the whole buffer size into account in
MemoryLimitController. Because it's allocated by our producer. - There is a memory waste problem in producer batch container, as the max buffer size only grows.
Contributor
Contributor
Author
|
closed this pr becase it' not the best solution |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Fixes #17662
Motivation
Fixes [Bug] Producer exceeds PulsarClient memory limit when sending fail timeout using CompressionType.NONE #17662
The root cause is that:
After we ever sent a big message, e.g, 2M bytes, the
AbstractBatchMessageContainer#maxBatchSizewill never be less than 2M bytes(see line171) in the following batches, even if the message size we added is 1 bytes only.pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
Lines 167 to 173 in a98f025
So when we add the first message in the following batches, we will always allocate a byte buffer greater or equals to 2M, see line102
pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
Lines 98 to 103 in a98f025
When we use
CompressionCodecNoneto encode the 2M bytes buffer it just retain the buffer, which means the 2M bytes buffer will be added toProducerImpl#pendingMessagesand will be release until timeout(30s by default). Note that we sent 1 bytes message(or batched, but it should much less that 2M), but retain 2M bytes buffer, so it will exceed PulsarClient memory limit.Modifications
Copy readable bytes to a new Bytebuf if necessary, like
CompressionCodecLZ4or otherCompressionCodecimplementVerifying this change
Documentation
doc-required(Your PR needs to update docs and you will update later)
doc-not-needed(Please explain why)
doc(Your PR contains doc changes)
doc-complete(Docs have been already added)