Skip to content

Commit

Permalink
[Proxy] Fix proxy connection leak when inbound connection closes whil…
Browse files Browse the repository at this point in the history
…e connecting is in progress (apache#15366)

- backports apache#15366 to branch-2.7
  • Loading branch information
lhotari committed Apr 29, 2022
1 parent d0ac074 commit 4621ca6
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,7 @@ public class DirectProxyHandler {
private final ProxyService service;
private final Runnable onHandshakeCompleteAction;

public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String brokerHostAndPort,
InetSocketAddress targetBrokerAddress, int protocolVersion,
Supplier<SslHandler> sslHandlerSupplier) {
public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection) {
this.service = service;
this.authentication = proxyConnection.getClientAuthentication();
this.inboundChannel = proxyConnection.ctx().channel();
Expand All @@ -92,6 +90,10 @@ public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection,
this.clientAuthData = proxyConnection.clientAuthData;
this.clientAuthMethod = proxyConnection.clientAuthMethod;
this.onHandshakeCompleteAction = proxyConnection::cancelKeepAliveTask;
}

public void connect(String brokerHostAndPort, InetSocketAddress targetBrokerAddress,
int protocolVersion, Supplier<SslHandler> sslHandlerSupplier) {
ProxyConfiguration config = service.getConfiguration();

// Start the connection attempt.
Expand Down Expand Up @@ -208,6 +210,12 @@ private ByteBuf encodeProxyProtocolMessage(HAProxyMessage msg) {
(byte) 'Y',
};

public void close() {
if (outboundChannel != null) {
outboundChannel.close();
}
}

enum BackendState {
Init, HandshakeCompleted
}
Expand Down Expand Up @@ -344,10 +352,7 @@ protected void handleConnected(CommandConnected connected) {
onHandshakeCompleteAction.run();
startDirectProxying(connected);

int maxMessageSize =
connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize))
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
proxyConnection.brokerConnected(DirectProxyHandler.this, connected);
}

private void startDirectProxying(CommandConnected connected) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@
package org.apache.pulsar.proxy.server;

import static com.google.common.base.Preconditions.checkArgument;

import static com.google.common.base.Preconditions.checkState;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.resolver.dns.DnsNameResolver;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -51,6 +53,7 @@
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema;
Expand Down Expand Up @@ -113,6 +116,9 @@ enum State {
// Follow redirects
ProxyLookupRequests,

// Connecting to the broker
ProxyConnectingToBroker,

// If we are proxying a connection to a specific broker, we
// are just forwarding data between the 2 connections, without
// looking into it
Expand Down Expand Up @@ -166,8 +172,8 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);

if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
directProxyHandler.outboundChannel.close();
if (directProxyHandler != null) {
directProxyHandler.close();
directProxyHandler = null;
}

Expand All @@ -188,11 +194,22 @@ public synchronized void channelInactive(ChannelHandlerContext ctx) throws Excep

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
state = State.Closing;
super.exceptionCaught(ctx, cause);
LOG.warn("[{}] Got exception {} : {} {}", remoteAddress, cause.getClass().getSimpleName(), cause.getMessage(),
LOG.warn("[{}] Got exception {} : Message: {} State: {}", remoteAddress, cause.getClass().getSimpleName(),
cause.getMessage(), state,
ClientCnx.isKnownException(cause) ? null : cause);
ctx.close();
if (state != State.Closed) {
state = State.Closing;
}
if (ctx.channel().isOpen()) {
ctx.close();
} else {
// close connection to broker if that is present
if (directProxyHandler != null) {
directProxyHandler.close();
directProxyHandler = null;
}
}
}

@Override
Expand Down Expand Up @@ -221,18 +238,26 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce
break;

case ProxyConnectionToBroker:
// Pass the buffer to the outbound connection and schedule next read
// only if we can write on the connection
ProxyService.opsCounter.inc();
if (msg instanceof ByteBuf) {
int bytes = ((ByteBuf) msg).readableBytes();
directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
ProxyService.bytesCounter.inc(bytes);
if (directProxyHandler != null) {
ProxyService.opsCounter.inc();
if (msg instanceof ByteBuf) {
int bytes = ((ByteBuf) msg).readableBytes();
directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
ProxyService.bytesCounter.inc(bytes);
}
directProxyHandler.outboundChannel.writeAndFlush(msg)
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
} else {
LOG.warn("Received message of type {} while connection to broker is missing in state {}. "
+ "Dropping the input message (readable bytes={}).", msg.getClass(), state,
msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1);
}
directProxyHandler.outboundChannel.writeAndFlush(msg)
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
break;

case ProxyConnectingToBroker:
LOG.warn("Received message of type {} while connecting to broker. "
+ "Dropping the input message (readable bytes={}).", msg.getClass(),
msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1);
break;
default:
break;
}
Expand Down Expand Up @@ -278,14 +303,9 @@ private synchronized void completeConnect(AuthData clientData) throws PulsarClie
return;
}

state = State.ProxyConnectingToBroker;
brokerProxyValidator.resolveAndCheckTargetAddress(proxyToBrokerUrl)
.thenAcceptAsync(address -> {
// Client already knows which broker to connect. Let's open a
// connection there and just pass bytes in both directions
state = State.ProxyConnectionToBroker;
directProxyHandler = new DirectProxyHandler(service, this, proxyToBrokerUrl, address,
protocolVersionToAdvertise, sslHandlerSupplier);
}, ctx.executor())
.thenAcceptAsync(this::connectToBroker, ctx.executor())
.exceptionally(throwable -> {
if (throwable instanceof TargetAddressDeniedException
|| throwable.getCause() instanceof TargetAddressDeniedException) {
Expand Down Expand Up @@ -318,6 +338,43 @@ private synchronized void completeConnect(AuthData clientData) throws PulsarClie
}
}

private void handleBrokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop");
if (state == State.ProxyConnectingToBroker && ctx.channel().isOpen() && this.directProxyHandler == null) {
this.directProxyHandler = directProxyHandler;
state = State.ProxyConnectionToBroker;
int maxMessageSize =
connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
ctx.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize))
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
} else {
LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. "
+ "Closing connection to broker '{}'.",
remoteAddress, ctx.channel().isOpen() ? "open" : "already closed",
state != State.ProxyConnectingToBroker ? "invalid state " + state : "state " + state,
proxyToBrokerUrl);
directProxyHandler.close();
ctx.close();
}
}

private void connectToBroker(InetSocketAddress brokerAddress) {
checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop");
DirectProxyHandler directProxyHandler = new DirectProxyHandler(service, this);
directProxyHandler.connect(proxyToBrokerUrl, brokerAddress,
protocolVersionToAdvertise, sslHandlerSupplier);
}

public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
try {
final CommandConnected finalConnected = connected.toBuilder().build();
ctx.executor().submit(() -> handleBrokerConnected(directProxyHandler, finalConnected));
} catch (RejectedExecutionException e) {
LOG.error("Event loop was already closed. Closing broker connection.", e);
directProxyHandler.close();
}
}

// According to auth result, send newConnected or newAuthChallenge command.
private void doAuthentication(AuthData clientData) throws Exception {
AuthData brokerData = authState.authenticate(clientData);
Expand Down

0 comments on commit 4621ca6

Please sign in to comment.