-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[client] Introduce batchingMaxBytes
setting in pulsar producer
#5045
Conversation
*Motivation* The message size can vary between applications. Using number of messages to estimate the resources used for batching leads to unpredictability. This pull request introduces a new setting `batchingMaxBytes` in pulsar producer. It allows applications planning the resource usage for batching in a better way.
@@ -212,6 +212,12 @@ private ProducerBuilderImpl(PulsarClientImpl client, ProducerConfigurationData c | |||
return this; | |||
} | |||
|
|||
@Override | |||
public ProducerBuilder<T> batchingMaxBytes(int batchingMaxBytes) { | |||
conf.setBatchingMaxBytes(batchingMaxBytes); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd say to do validation check here for > 0
and < MAX_FRAME_SIZE
(depending on the max frame size configured)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed the check for >0
to allow users choosing the right batching strategy.
I will add the check for MAX_FRAME_SIZE.
@@ -83,5 +89,9 @@ public void setProducer(ProducerImpl<?> producer) { | |||
.convertToWireProtocol(producer.getConfiguration().getCompressionType()); | |||
this.compressor = CompressionCodecProvider.getCompressionCodec(compressionType); | |||
this.maxNumMessagesInBatch = producer.getConfiguration().getBatchingMaxMessages(); | |||
this.maxBytesInBatch = producer.getConfiguration().getBatchingMaxBytes(); | |||
if (this.maxBytesInBatch <= 0) { | |||
this.maxBytesInBatch = MAX_MESSAGE_BATCH_SIZE_BYTES; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The constant MAX_MESSAGE_BATCH_SIZE_BYTES
is set to 128 KB which is now different from the default max size (1MB).
In any case, I'd prefer the sanity check to be in the builder itself
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
didn't realize MAX_MESSAGE_BATCH_SIZE_BYTES is 128KB. will fix
I fixed the batch container check logic for handling max message size. although I didn't add any validations in producer configuration data. because:
|
*/ | ||
ProducerBuilder<T> batchingMaxMessages(int batchMessagesMaxMessagesPerBatch); | ||
|
||
/** | ||
* Set the maximum number of bytes permitted in a batch. <i>default: 1MB</i> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* Set the maximum number of bytes permitted in a batch. <i>default: 1MB</i> | |
* Set the maximum number of bytes permitted in a batch. <i>default: 128KB</i> |
@@ -212,6 +212,12 @@ private ProducerBuilderImpl(PulsarClientImpl client, ProducerConfigurationData c | |||
return this; | |||
} | |||
|
|||
@Override | |||
public ProducerBuilder<T> batchingMaxBytes(int batchingMaxBytes) { | |||
conf.setBatchingMaxBytes(batchingMaxBytes); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add check for batchingMaxBytes <= ClientCnx.getMaxMessageSize().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can't do that. Because the max message size is only set when a client connects to a broker.
I explained that case in previous comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I see, max message size can be set at broker side.
protected int numMessagesInBatch = 0; | ||
protected long currentBatchSizeBytes = 0; | ||
|
||
protected static final int INITIAL_BATCH_BUFFER_SIZE = 1024; | ||
protected static final int MAX_MESSAGE_BATCH_SIZE_BYTES = 128 * 1024; | ||
protected static final int INITIAL_BATCH_BUFFER_SIZE = 128 * 1024; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to keep INITIAL_BATCH_BUFFER_SIZE as 1KB, in key_based batcher properly create lot of ByteBuf, if the INITIAL_BATCH_BUFFER_SIZE is 128KB, more memory overhead may be incurred.
We can add the option for init batch buffer size in the future, so that user can decide according to their own situation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I will probably just assign different values for different containers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, initially we were using the max batch size for this buffer, but it was using a huge amount of memory, especially when publishing on a partitioned topic.
The buffer size is self-adapting to the needed usage (eg. it will keep using the same buffer size), so I don't think we need to make it configurable.
run cpp tests |
run java8 tests |
1 similar comment
run java8 tests |
@codelipenghui @jiazhai @merlimat I have addressed all the comments. PTAL |
retest this please |
ping @jiazhai @codelipenghui @merlimat // run cpp tests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me
run cpp tests |
run java8 tests |
run java8 tests |
1 similar comment
run java8 tests |
run java8 tests |
1 similar comment
run java8 tests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
retest this please |
@sijie Please take a look the tests. |
run java8 tests |
Motivation
The message size can vary between applications. Using number of messages to estimate
the resources used for batching leads to unpredictability.
This pull request introduces a new setting
batchingMaxBytes
in pulsar producer.It allows applications planning the resource usage for batching in a better way.