Skip to content

Commit

Permalink
[fix][proxy] Fix connection read timeout handling in Pulsar Proxy (#2…
Browse files Browse the repository at this point in the history
…0014)

(cherry picked from commit dd05408)
  • Loading branch information
lhotari committed Apr 6, 2023
1 parent f21a96f commit 6be0317
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.util.Arrays;
Expand Down Expand Up @@ -206,7 +207,7 @@ protected void initChannel(SocketChannel ch) {
int brokerProxyReadTimeoutMs = service.getConfiguration().getBrokerProxyReadTimeoutMs();
if (brokerProxyReadTimeoutMs > 0) {
ch.pipeline().addLast("readTimeoutHandler",
new ProxyReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS));
new ReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS));
}
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
Expand Down Expand Up @@ -364,6 +365,9 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce

if (service.proxyZeroCopyModeEnabled && service.proxyLogLevel == 0) {
if (!isTlsOutboundChannel && !DirectProxyHandler.this.proxyConnection.isTlsInboundChannel) {
if (ctx.pipeline().get("readTimeoutHandler") != null) {
ctx.pipeline().remove("readTimeoutHandler");
}
ProxyConnection.spliceNIC2NIC((EpollSocketChannel) ctx.channel(),
(EpollSocketChannel) inboundChannel, ProxyConnection.SPLICE_BYTES)
.addListener(future -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,9 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce

if (service.proxyZeroCopyModeEnabled && service.proxyLogLevel == 0) {
if (!directProxyHandler.isTlsOutboundChannel && !isTlsInboundChannel) {
if (ctx.pipeline().get("readTimeoutHandler") != null) {
ctx.pipeline().remove("readTimeoutHandler");
}
spliceNIC2NIC((EpollSocketChannel) ctx.channel(),
(EpollSocketChannel) directProxyHandler.outboundChannel, SPLICE_BYTES)
.addListener(future -> {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.timeout.ReadTimeoutHandler;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.OptionalProxyProtocolDecoder;
Expand Down Expand Up @@ -104,7 +105,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
}
if (brokerProxyReadTimeoutMs > 0) {
ch.pipeline().addLast("readTimeoutHandler",
new ProxyReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS));
new ReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS));
}
if (proxyService.getConfiguration().isHaProxyProtocolEnabled()) {
ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new OptionalProxyProtocolDecoder());
Expand Down

0 comments on commit 6be0317

Please sign in to comment.