Skip to content

Commit

Permalink
Ensure RCS client connection use remote cluster specific TCP settings (
Browse files Browse the repository at this point in the history
…#94423)

RCS remote cluster has separate tcp settings, e.g.
remote_cluster.tcp.keepalive. They default to the corresponding main
transport settings, but can also have different values. This PR ensures
that if different values are specified, they are used for remote cluster
client connections.
  • Loading branch information
ywangd committed Mar 22, 2023
1 parent 12ab625 commit 7ff4352
Show file tree
Hide file tree
Showing 4 changed files with 571 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ public class Netty4Transport extends TcpTransport {
);

public static final Setting<Integer> NETTY_BOSS_COUNT = intSetting("transport.netty.boss_count", 1, 1, Property.NodeScope);
public static final ChannelOption<Integer> OPTION_TCP_KEEP_IDLE = NioChannelOption.of(NetUtils.getTcpKeepIdleSocketOption());
public static final ChannelOption<Integer> OPTION_TCP_KEEP_INTERVAL = NioChannelOption.of(NetUtils.getTcpKeepIntervalSocketOption());
public static final ChannelOption<Integer> OPTION_TCP_KEEP_COUNT = NioChannelOption.of(NetUtils.getTcpKeepCountSocketOption());

private final SharedGroupFactory sharedGroupFactory;
private final RecvByteBufAllocator recvByteBufAllocator;
Expand Down Expand Up @@ -160,24 +163,21 @@ private Bootstrap createClientBootstrap(SharedGroupFactory.SharedGroup sharedGro
bootstrap.channel(NettyAllocator.getChannelType());
bootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());

// The TCP options are re-configured for client connections to RCS remote clusters
// If how options are configured is changed here, please also update RemoteClusterClientBootstrapOptions#configure
// which is used inside SecurityNetty4Transport#getClientBootstrap
bootstrap.option(ChannelOption.TCP_NODELAY, TransportSettings.TCP_NO_DELAY.get(settings));
bootstrap.option(ChannelOption.SO_KEEPALIVE, TransportSettings.TCP_KEEP_ALIVE.get(settings));
if (TransportSettings.TCP_KEEP_ALIVE.get(settings)) {
// Note that Netty logs a warning if it can't set the option
if (TransportSettings.TCP_KEEP_IDLE.get(settings) >= 0) {
bootstrap.option(NioChannelOption.of(NetUtils.getTcpKeepIdleSocketOption()), TransportSettings.TCP_KEEP_IDLE.get(settings));
bootstrap.option(OPTION_TCP_KEEP_IDLE, TransportSettings.TCP_KEEP_IDLE.get(settings));
}
if (TransportSettings.TCP_KEEP_INTERVAL.get(settings) >= 0) {
bootstrap.option(
NioChannelOption.of(NetUtils.getTcpKeepIntervalSocketOption()),
TransportSettings.TCP_KEEP_INTERVAL.get(settings)
);
bootstrap.option(OPTION_TCP_KEEP_INTERVAL, TransportSettings.TCP_KEEP_INTERVAL.get(settings));
}
if (TransportSettings.TCP_KEEP_COUNT.get(settings) >= 0) {
bootstrap.option(
NioChannelOption.of(NetUtils.getTcpKeepCountSocketOption()),
TransportSettings.TCP_KEEP_COUNT.get(settings)
);
bootstrap.option(OPTION_TCP_KEEP_COUNT, TransportSettings.TCP_KEEP_COUNT.get(settings));
}
}

Expand Down Expand Up @@ -278,7 +278,7 @@ protected ChannelHandler getClientChannelInitializer(DiscoveryNode node, Connect
@Override
protected Netty4TcpChannel initiateChannel(DiscoveryNode node, ConnectionProfile connectionProfile) throws IOException {
InetSocketAddress address = node.getAddress().address();
Bootstrap bootstrapWithHandler = clientBootstrap.clone();
Bootstrap bootstrapWithHandler = getClientBootstrap(connectionProfile);
bootstrapWithHandler.handler(getClientChannelInitializer(node, connectionProfile));
bootstrapWithHandler.remoteAddress(address);
ChannelFuture connectFuture = bootstrapWithHandler.connect();
Expand All @@ -301,6 +301,10 @@ protected Netty4TcpChannel initiateChannel(DiscoveryNode node, ConnectionProfile
return nettyChannel;
}

protected Bootstrap getClientBootstrap(ConnectionProfile connectionProfile) {
return clientBootstrap.clone();
}

@Override
protected Netty4TcpServerChannel bind(String name, InetSocketAddress address) {
Channel channel = serverBootstraps.get(name).bind(address).syncUninterruptibly().channel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
*/
package org.elasticsearch.xpack.core.security.transport.netty4;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.handler.ssl.SslHandler;
Expand All @@ -22,11 +24,13 @@
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.ssl.SslConfiguration;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.RemoteClusterPortSettings;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TransportSettings;
import org.elasticsearch.transport.netty4.Netty4Transport;
Expand Down Expand Up @@ -66,6 +70,7 @@ public class SecurityNetty4Transport extends Netty4Transport {
private final boolean remoteClusterPortEnabled;
private final boolean remoteClusterServerSslEnabled;
private final SslConfiguration remoteClusterClientSslConfiguration;
private final RemoteClusterClientBootstrapOptions remoteClusterClientBootstrapOptions;

public SecurityNetty4Transport(
final Settings settings,
Expand Down Expand Up @@ -104,6 +109,7 @@ public SecurityNetty4Transport(
} else {
this.remoteClusterClientSslConfiguration = null;
}
this.remoteClusterClientBootstrapOptions = RemoteClusterClientBootstrapOptions.fromSettings(settings);
}

@Override
Expand Down Expand Up @@ -143,6 +149,21 @@ protected ChannelHandler getClientChannelInitializer(DiscoveryNode node, Connect
return new SecurityClientChannelInitializer(node, connectionProfile);
}

@Override
protected Bootstrap getClientBootstrap(ConnectionProfile connectionProfile) {
final Bootstrap bootstrap = super.getClientBootstrap(connectionProfile);
if (false == REMOTE_CLUSTER_PROFILE.equals(connectionProfile.getTransportProfile())
|| remoteClusterClientBootstrapOptions.isEmpty()) {
return bootstrap;
}

logger.trace("reconfiguring client bootstrap for remote cluster client connection");
// Only client connections to a new RCS remote cluster can have transport profile of _remote_cluster
// All other client connections use the default transport profile regardless of the transport profile used on the server side.
remoteClusterClientBootstrapOptions.configure(bootstrap);
return bootstrap;
}

@Override
public void onException(TcpChannel channel, Exception e) {
exceptionHandler.accept(channel, e);
Expand Down Expand Up @@ -279,4 +300,163 @@ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, Sock
super.connect(ctx, remoteAddress, localAddress, connectPromise);
}
}

// This class captures the differences of client side TCP network settings between default and _remote_cluster transport profiles.
// A field will be null if there is no difference between associated settings of the two profiles. It has a non-null value only
// when the _remote_cluster profile has a different value from the default profile.
record RemoteClusterClientBootstrapOptions(
Boolean tcpNoDelay,
Boolean tcpKeepAlive,
Integer tcpKeepIdle,
Integer tcpKeepInterval,
Integer tcpKeepCount,
ByteSizeValue tcpSendBufferSize,
ByteSizeValue tcpReceiveBufferSize,
Boolean tcpReuseAddress
) {

boolean isEmpty() {
return tcpNoDelay == null
&& tcpKeepAlive == null
&& tcpKeepIdle == null
&& tcpKeepInterval == null
&& tcpKeepCount == null
&& tcpSendBufferSize == null
&& tcpReceiveBufferSize == null
&& tcpReuseAddress == null;
}

void configure(Bootstrap bootstrap) {
if (tcpNoDelay != null) {
bootstrap.option(ChannelOption.TCP_NODELAY, tcpNoDelay);
}

if (tcpKeepAlive != null) {
bootstrap.option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive);
if (tcpKeepAlive) {
// Note that Netty logs a warning if it can't set the option
if (tcpKeepIdle != null) {
if (tcpKeepIdle >= 0) {
bootstrap.option(OPTION_TCP_KEEP_IDLE, tcpKeepIdle);
} else {
bootstrap.option(OPTION_TCP_KEEP_IDLE, null);
}
}
if (tcpKeepInterval != null) {
if (tcpKeepInterval >= 0) {
bootstrap.option(OPTION_TCP_KEEP_INTERVAL, tcpKeepInterval);
} else {
bootstrap.option(OPTION_TCP_KEEP_INTERVAL, null);
}
}
if (tcpKeepCount != null) {
if (tcpKeepCount >= 0) {
bootstrap.option(OPTION_TCP_KEEP_COUNT, tcpKeepCount);
} else {
bootstrap.option(OPTION_TCP_KEEP_COUNT, null);
}
}
} else {
bootstrap.option(OPTION_TCP_KEEP_IDLE, null);
bootstrap.option(OPTION_TCP_KEEP_INTERVAL, null);
bootstrap.option(OPTION_TCP_KEEP_COUNT, null);
}
}

if (tcpSendBufferSize != null) {
if (tcpSendBufferSize.getBytes() > 0) {
bootstrap.option(ChannelOption.SO_SNDBUF, Math.toIntExact(tcpSendBufferSize.getBytes()));
} else {
bootstrap.option(ChannelOption.SO_SNDBUF, null);
}
}

if (tcpReceiveBufferSize != null) {
if (tcpReceiveBufferSize.getBytes() > 0) {
bootstrap.option(ChannelOption.SO_RCVBUF, Math.toIntExact(tcpReceiveBufferSize.getBytes()));
} else {
bootstrap.option(ChannelOption.SO_RCVBUF, null);
}
}

if (tcpReuseAddress != null) {
bootstrap.option(ChannelOption.SO_REUSEADDR, tcpReuseAddress);
}
}

static RemoteClusterClientBootstrapOptions fromSettings(Settings settings) {
Boolean tcpNoDelay = RemoteClusterPortSettings.TCP_NO_DELAY.get(settings);
if (tcpNoDelay == TransportSettings.TCP_NO_DELAY.get(settings)) {
tcpNoDelay = null;
}

// It is possible that both default and _remote_cluster enable keepAlive but have different
// values for either keepIdle, keepInterval or keepCount. In this case, we need have a
// non-null value for keepAlive even it is the same between default and _remote_cluster.
Boolean tcpKeepAlive = RemoteClusterPortSettings.TCP_KEEP_ALIVE.get(settings);
Integer tcpKeepIdle = RemoteClusterPortSettings.TCP_KEEP_IDLE.get(settings);
Integer tcpKeepInterval = RemoteClusterPortSettings.TCP_KEEP_INTERVAL.get(settings);
Integer tcpKeepCount = RemoteClusterPortSettings.TCP_KEEP_COUNT.get(settings);
final Boolean defaultTcpKeepAlive = TransportSettings.TCP_KEEP_ALIVE.get(settings);

if (tcpKeepAlive) {
if (defaultTcpKeepAlive) {
// Both profiles have keepAlive enabled, we need to check whether any keepIdle, keepInterval, keepCount is different
if (tcpKeepIdle.equals(TransportSettings.TCP_KEEP_IDLE.get(settings))) {
tcpKeepIdle = null;
}
if (tcpKeepInterval.equals(TransportSettings.TCP_KEEP_INTERVAL.get(settings))) {
tcpKeepInterval = null;
}
if (tcpKeepCount.equals(TransportSettings.TCP_KEEP_COUNT.get(settings))) {
tcpKeepCount = null;
}
if (tcpKeepIdle == null && tcpKeepInterval == null && tcpKeepCount == null) {
// If keepIdle, keepInterval, keepCount are all identical, keepAlive can be null as well.
// That is no need to update anything keepXxx related
tcpKeepAlive = null;
}
}
} else {
if (false == defaultTcpKeepAlive) {
tcpKeepAlive = null;
}
// _remote_cluster has keepAlive disabled, all other keepXxx has no reason to exist
tcpKeepIdle = null;
tcpKeepInterval = null;
tcpKeepCount = null;
}

assert (tcpKeepAlive == null && tcpKeepIdle == null && tcpKeepInterval == null && tcpKeepCount == null)
|| (tcpKeepAlive == false && tcpKeepIdle == null && tcpKeepInterval == null && tcpKeepCount == null)
|| (tcpKeepAlive && (tcpKeepIdle != null || tcpKeepInterval != null || tcpKeepCount != null))
: "keepAlive == true must be accompanied with either keepIdle, keepInterval or keepCount change";

ByteSizeValue tcpSendBufferSize = RemoteClusterPortSettings.TCP_SEND_BUFFER_SIZE.get(settings);
if (tcpSendBufferSize.equals(TransportSettings.TCP_SEND_BUFFER_SIZE.get(settings))) {
tcpSendBufferSize = null;
}

ByteSizeValue tcpReceiveBufferSize = RemoteClusterPortSettings.TCP_RECEIVE_BUFFER_SIZE.get(settings);
if (tcpReceiveBufferSize.equals(TransportSettings.TCP_RECEIVE_BUFFER_SIZE.get(settings))) {
tcpReceiveBufferSize = null;
}

Boolean tcpReuseAddress = RemoteClusterPortSettings.TCP_REUSE_ADDRESS.get(settings);
if (tcpReuseAddress == TransportSettings.TCP_REUSE_ADDRESS.get(settings)) {
tcpReuseAddress = null;
}

return new RemoteClusterClientBootstrapOptions(
tcpNoDelay,
tcpKeepAlive,
tcpKeepIdle,
tcpKeepInterval,
tcpKeepCount,
tcpSendBufferSize,
tcpReceiveBufferSize,
tcpReuseAddress
);
}
}
}

0 comments on commit 7ff4352

Please sign in to comment.