Skip to content

Commit

Permalink
Forget to recycle MessageMetadata protobuf object (#1541)
Browse files Browse the repository at this point in the history
* Forget to recycle MessageMetadata protobuf object

* Fixed merge
  • Loading branch information
merlimat committed Apr 16, 2018
1 parent cc9a9d9 commit bc0a4b8
Showing 1 changed file with 17 additions and 14 deletions.
Expand Up @@ -260,7 +260,7 @@ public void sendAsync(Message<T> message, SendCallback callback) {
}

MessageImpl<T> msg = (MessageImpl<T>) message;
MessageMetadata.Builder msgMetadata = msg.getMessageBuilder();
MessageMetadata.Builder msgMetadataBuilder = msg.getMessageBuilder();
ByteBuf payload = msg.getDataBuffer();

// If compression is enabled, we are compressing, otherwise it will simply use the same buffer
Expand All @@ -286,35 +286,35 @@ public void sendAsync(Message<T> message, SendCallback callback) {
return;
}

if (!msg.isReplicated() && msgMetadata.hasProducerName()) {
if (!msg.isReplicated() && msgMetadataBuilder.hasProducerName()) {
callback.sendComplete(new PulsarClientException.InvalidMessageException("Cannot re-use the same message"));
compressedPayload.release();
return;
}

if (schemaVersion.isPresent()) {
msgMetadata.setSchemaVersion(ByteString.copyFrom(schemaVersion.get()));
msgMetadataBuilder.setSchemaVersion(ByteString.copyFrom(schemaVersion.get()));
}

try {
synchronized (this) {
long sequenceId;
if (!msgMetadata.hasSequenceId()) {
if (!msgMetadataBuilder.hasSequenceId()) {
sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);
msgMetadata.setSequenceId(sequenceId);
msgMetadataBuilder.setSequenceId(sequenceId);
} else {
sequenceId = msgMetadata.getSequenceId();
sequenceId = msgMetadataBuilder.getSequenceId();
}
if (!msgMetadata.hasPublishTime()) {
msgMetadata.setPublishTime(System.currentTimeMillis());
if (!msgMetadataBuilder.hasPublishTime()) {
msgMetadataBuilder.setPublishTime(System.currentTimeMillis());

checkArgument(!msgMetadata.hasProducerName());
checkArgument(!msgMetadataBuilder.hasProducerName());

msgMetadata.setProducerName(producerName);
msgMetadataBuilder.setProducerName(producerName);

if (conf.getCompressionType() != CompressionType.NONE) {
msgMetadata.setCompression(convertCompressionType(conf.getCompressionType()));
msgMetadata.setUncompressedSize(uncompressedSize);
msgMetadataBuilder.setCompression(convertCompressionType(conf.getCompressionType()));
msgMetadataBuilder.setUncompressedSize(uncompressedSize);
}
}

Expand All @@ -332,8 +332,11 @@ public void sendAsync(Message<T> message, SendCallback callback) {
doBatchSendAndAdd(msg, callback, payload);
}
} else {
ByteBuf encryptedPayload = encryptMessage(msgMetadata, compressedPayload);
ByteBufPair cmd = sendMessage(producerId, sequenceId, 1, msgMetadata.build(), encryptedPayload);
ByteBuf encryptedPayload = encryptMessage(msgMetadataBuilder, compressedPayload);

MessageMetadata msgMetadata = msgMetadataBuilder.build();
ByteBufPair cmd = sendMessage(producerId, sequenceId, 1, msgMetadata, encryptedPayload);
msgMetadataBuilder.recycle();
msgMetadata.recycle();

final OpSendMsg op = OpSendMsg.create(msg, cmd, sequenceId, callback);
Expand Down

0 comments on commit bc0a4b8

Please sign in to comment.