Skip to content

Commit

Permalink
Copy ByteBufPair buffers when using with SSL (#2401) (#2464)
Browse files Browse the repository at this point in the history
The netty SSL handler uses a coalescing buffer queue, which modifies
the buffers used to queue the writes so that SSL_write can be given
larger chunks, thereby increasing the 'goodput'.

If we pass in a retained duplicate as we have been doing until now,
then later clients will be passed junk, as SSL will have modified cached
entry buffers.

This patch introduces a copying ByteBufPair encoder, which is only
used with SSL connections.
  • Loading branch information
ivankelly authored and merlimat committed Aug 31, 2018
1 parent 79b88cf commit 34e9fd6
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ protected void initChannel(SocketChannel ch) throws Exception {
serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(),
serviceConfig.getTlsRequireTrustedClientCertOnConnect());
ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
} else {
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
}

ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
ch.pipeline().addLast("handler", new ServerCnx(pulsar));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ public void initChannel(SocketChannel ch) throws Exception {
conf.getTlsTrustCertsFilePath());
}
ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
} else {
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
}

ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4));
ch.pipeline().addLast("handler", clientCnxSupplier.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public ReferenceCounted touch(Object hint) {
}

public static final Encoder ENCODER = new Encoder();
public static final CopyingEncoder COPYING_ENCODER = new CopyingEncoder();

@Sharable
public static class Encoder extends ChannelOutboundHandlerAdapter {
Expand All @@ -132,4 +133,26 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}
}

}
@Sharable
public static class CopyingEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof ByteBufPair) {
ByteBufPair b = (ByteBufPair) msg;

// Some handlers in the pipeline will modify the bytebufs passed in to them (i.e. SslHandler).
// For these handlers, we need to pass a copy of the buffers as the source buffers may be cached
// for multiple requests.
try {
ctx.write(b.getFirst().copy(), ctx.voidPromise());
ctx.write(b.getSecond().copy(), promise);
} finally {
ReferenceCountUtil.safeRelease(b);
}
} else {
ctx.write(msg, promise);
}
}
}

}

0 comments on commit 34e9fd6

Please sign in to comment.