Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix multiple transfer corruption issues when TLS is enabled #22760

Closed
wants to merge 10 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -141,19 +142,47 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
if (msg instanceof ByteBufPair) {
ByteBufPair b = (ByteBufPair) msg;

ChannelPromise compositePromise = ctx.newPromise();
compositePromise.addListener(future -> {
// release the ByteBufPair after the write operation is completed
ReferenceCountUtil.safeRelease(b);
// complete the promise passed as an argument unless it's a void promise
if (!promise.isVoid()) {
if (future.isSuccess()) {
promise.setSuccess();
} else {
promise.setFailure(future.cause());
}
}
});

// Some handlers in the pipeline will modify the bytebufs passed in to them (i.e. SslHandler).
// For these handlers, we need to pass a copy of the buffers as the source buffers may be cached
// for multiple requests.
try {
ctx.write(b.getFirst().copy(), ctx.voidPromise());
ctx.write(b.getSecond().copy(), promise);
} finally {
ReferenceCountUtil.safeRelease(b);
}
ctx.write(nioBufferCopy(b.getFirst()), ctx.voidPromise());
ctx.write(nioBufferCopy(b.getSecond()), compositePromise);
} else {
ctx.write(msg, promise);
}
}

// Make a shallow copy of the ByteBuf using ByteBuf.nioBuffers()/nioBuffer() method.
// This is needed since SslHandler will call internalNioBuffer methods on the ByteBuf instance which is
// not thread safe when the ByteBuf instance is shared across multiple threads.
// This method works around the issue.
// Notice: The original ByteBuf continues to control the lifecycle of the underlying memory allocation.
// This is fine in this case since the ByteBufPair keeps the reference counts, and it is released after
// the write method completes.
private ByteBuf nioBufferCopy(ByteBuf buf) {
// avoid calling nioBufferCount() for performance reasons on CompositeByteBuf
// there's a similar optimization in Netty's SslHandler.wrap method where this is explained
if (buf instanceof CompositeByteBuf || buf.nioBufferCount() > 1) {
return Unpooled.wrappedBuffer(buf.nioBuffers());
} else {
// Single buffer, no need to wrap it in an array as the nioBuffers() method would do
return Unpooled.wrappedBuffer(buf.nioBuffer());
}
}
}

}
Loading