From 1ffda4ca957bd2fcc87fa793e645ab820d9e4629 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 22 May 2024 10:43:57 +0300 Subject: [PATCH] [fix][broker] Fix multiple transfer corruption issues when TLS is enabled --- .../pulsar/common/protocol/ByteBufPair.java | 41 ++++++++++++++++--- 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java index cfd89d3bb28ab..b199df8c92606 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; @@ -141,19 +142,47 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) if (msg instanceof ByteBufPair) { ByteBufPair b = (ByteBufPair) msg; + ChannelPromise compositePromise = ctx.newPromise(); + compositePromise.addListener(future -> { + // release the ByteBufPair after the write operation is completed + ReferenceCountUtil.safeRelease(b); + // complete the promise passed as an argument unless it's a void promise + if (!promise.isVoid()) { + if (future.isSuccess()) { + promise.setSuccess(); + } else { + promise.setFailure(future.cause()); + } + } + }); + // Some handlers in the pipeline will modify the bytebufs passed in to them (i.e. SslHandler). // For these handlers, we need to pass a copy of the buffers as the source buffers may be cached // for multiple requests. - try { - ctx.write(b.getFirst().copy(), ctx.voidPromise()); - ctx.write(b.getSecond().copy(), promise); - } finally { - ReferenceCountUtil.safeRelease(b); - } + ctx.write(nioBufferCopy(b.getFirst()), ctx.voidPromise()); + ctx.write(nioBufferCopy(b.getSecond()), compositePromise); } else { ctx.write(msg, promise); } } + + // Make a shallow copy of the ByteBuf using ByteBuf.nioBuffers()/nioBuffer() method. + // This is needed since SslHandler will call internalNioBuffer methods on the ByteBuf instance which is + // not thread safe when the ByteBuf instance is shared across multiple threads. + // This method works around the issue. + // Notice: The original ByteBuf continues to control the lifecycle of the underlying memory allocation. + // This is fine in this case since the ByteBufPair keeps the reference counts, and it is released after + // the write method completes. + private ByteBuf nioBufferCopy(ByteBuf buf) { + // avoid calling nioBufferCount() for performance reasons on CompositeByteBuf + // there's a similar optimization in Netty's SslHandler.wrap method where this is explained + if (buf instanceof CompositeByteBuf || buf.nioBufferCount() > 1) { + return Unpooled.wrappedBuffer(buf.nioBuffers()); + } else { + // Single buffer, no need to wrap it in an array as the nioBuffers() method would do + return Unpooled.wrappedBuffer(buf.nioBuffer()); + } + } } }