Skip to content

Commit

Permalink
[fix][broker] Fix multiple transfer corruption issues when TLS is ena…
Browse files Browse the repository at this point in the history
…bled
  • Loading branch information
lhotari committed May 22, 2024
1 parent 13806d7 commit 1ffda4c
Showing 1 changed file with 35 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -141,19 +142,47 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
if (msg instanceof ByteBufPair) {
ByteBufPair b = (ByteBufPair) msg;

ChannelPromise compositePromise = ctx.newPromise();
compositePromise.addListener(future -> {
// release the ByteBufPair after the write operation is completed
ReferenceCountUtil.safeRelease(b);
// complete the promise passed as an argument unless it's a void promise
if (!promise.isVoid()) {
if (future.isSuccess()) {
promise.setSuccess();
} else {
promise.setFailure(future.cause());
}
}
});

// Some handlers in the pipeline will modify the bytebufs passed in to them (i.e. SslHandler).
// For these handlers, we need to pass a copy of the buffers as the source buffers may be cached
// for multiple requests.
try {
ctx.write(b.getFirst().copy(), ctx.voidPromise());
ctx.write(b.getSecond().copy(), promise);
} finally {
ReferenceCountUtil.safeRelease(b);
}
ctx.write(nioBufferCopy(b.getFirst()), ctx.voidPromise());
ctx.write(nioBufferCopy(b.getSecond()), compositePromise);
} else {
ctx.write(msg, promise);
}
}

// Make a shallow copy of the ByteBuf using ByteBuf.nioBuffers()/nioBuffer() method.
// This is needed since SslHandler will call internalNioBuffer methods on the ByteBuf instance which is
// not thread safe when the ByteBuf instance is shared across multiple threads.
// This method works around the issue.
// Notice: The original ByteBuf continues to control the lifecycle of the underlying memory allocation.
// This is fine in this case since the ByteBufPair keeps the reference counts, and it is released after
// the write method completes.
private ByteBuf nioBufferCopy(ByteBuf buf) {
// avoid calling nioBufferCount() for performance reasons on CompositeByteBuf
// there's a similar optimization in Netty's SslHandler.wrap method where this is explained
if (buf instanceof CompositeByteBuf || buf.nioBufferCount() > 1) {
return Unpooled.wrappedBuffer(buf.nioBuffers());
} else {
// Single buffer, no need to wrap it in an array as the nioBuffers() method would do
return Unpooled.wrappedBuffer(buf.nioBuffer());
}
}
}

}

0 comments on commit 1ffda4c

Please sign in to comment.