Skip to content

Commit

Permalink
Reduce size of buffer used to assemble batches (#1204)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed Feb 8, 2018
1 parent b538165 commit 3615e25
Showing 1 changed file with 13 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.pulsar.client.impl;

import static com.google.common.base.Preconditions.checkArgument;

import java.util.List;

import org.apache.pulsar.common.api.Commands;
Expand Down Expand Up @@ -59,7 +57,12 @@ class BatchMessageContainer {
// keep track of callbacks for individual messages being published in a batch
SendCallback firstCallback;

protected static final long MAX_MESSAGE_BATCH_SIZE_BYTES = 128 * 1024;
private static final int INITIAL_BATCH_BUFFER_SIZE = 1024;
protected static final int MAX_MESSAGE_BATCH_SIZE_BYTES = 128 * 1024;

// This will be the largest size for a batch sent from this particular producer. This is used as a baseline to
// allocate a new buffer that can hold the entire batch without needing costly reallocations
private int maxBatchSize = INITIAL_BATCH_BUFFER_SIZE;

BatchMessageContainer(int maxNumMessagesInBatch, PulsarApi.CompressionType compressionType, String topicName,
String producerName) {
Expand Down Expand Up @@ -88,8 +91,8 @@ void add(MessageImpl msg, SendCallback callback) {
// the first message
sequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
this.firstCallback = callback;
batchedMessageMetadataAndPayload = PooledByteBufAllocator.DEFAULT.buffer((int) MAX_MESSAGE_BATCH_SIZE_BYTES,
(int) (PulsarDecoder.MaxMessageSize));
batchedMessageMetadataAndPayload = PooledByteBufAllocator.DEFAULT
.buffer(Math.min(maxBatchSize, MAX_MESSAGE_BATCH_SIZE_BYTES), PulsarDecoder.MaxMessageSize);
}

if (previousCallback != null) {
Expand All @@ -113,6 +116,10 @@ ByteBuf getCompressedBatchMetadataAndPayload() {
messageMetadata.setCompression(compressionType);
messageMetadata.setUncompressedSize(uncompressedSize);
}

// Update the current max batch size using the uncompressed size, which is what we need in any case to
// accumulate the batch content
maxBatchSize = Math.max(maxBatchSize, uncompressedSize);
return compressedPayload;
}

Expand All @@ -137,6 +144,7 @@ void clear() {
numMessagesInBatch = 0;
currentBatchSizeBytes = 0;
sequenceId = -1;
batchedMessageMetadataAndPayload = null;
}

boolean isEmpty() {
Expand Down

0 comments on commit 3615e25

Please sign in to comment.