diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java index 30a83eeed7efd..4cb0f89adedac 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java @@ -35,12 +35,15 @@ public class Netty4TcpChannel implements TcpChannel { private final Channel channel; + private final boolean isServer; private final String profile; private final CompletableContext connectContext; private final CompletableContext closeContext = new CompletableContext<>(); + private final ChannelStats stats = new ChannelStats(); - Netty4TcpChannel(Channel channel, String profile, @Nullable ChannelFuture connectFuture) { + Netty4TcpChannel(Channel channel, boolean isServer, String profile, @Nullable ChannelFuture connectFuture) { this.channel = channel; + this.isServer = isServer; this.profile = profile; this.connectContext = new CompletableContext<>(); this.channel.closeFuture().addListener(f -> { @@ -77,6 +80,11 @@ public void close() { channel.close(); } + @Override + public boolean isServerChannel() { + return isServer; + } + @Override public String getProfile() { return profile; @@ -92,6 +100,11 @@ public void addConnectListener(ActionListener listener) { connectContext.addListener(ActionListener.toBiConsumer(listener)); } + @Override + public ChannelStats getChannelStats() { + return stats; + } + @Override public boolean isOpen() { return channel.isOpen(); diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index f26e55d7ac647..63f5982d3ed43 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -232,7 +232,7 @@ protected Netty4TcpChannel initiateChannel(DiscoveryNode node) throws IOExceptio } addClosedExceptionLogger(channel); - Netty4TcpChannel nettyChannel = new Netty4TcpChannel(channel, "default", connectFuture); + Netty4TcpChannel nettyChannel = new Netty4TcpChannel(channel, false, "default", connectFuture); channel.attr(CHANNEL_KEY).set(nettyChannel); return nettyChannel; @@ -246,14 +246,6 @@ protected Netty4TcpServerChannel bind(String name, InetSocketAddress address) { return esChannel; } - long successfulPingCount() { - return successfulPings.count(); - } - - long failedPingCount() { - return failedPings.count(); - } - @Override @SuppressForbidden(reason = "debug") protected void stopInternal() { @@ -297,8 +289,7 @@ protected ServerChannelInitializer(String name) { @Override protected void initChannel(Channel ch) throws Exception { addClosedExceptionLogger(ch); - Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, name, ch.newSucceededFuture()); - + Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, ch.newSucceededFuture()); ch.attr(CHANNEL_KEY).set(nettyTcpChannel); serverAcceptedChannel(nettyTcpChannel); ch.pipeline().addLast("logging", new ESLoggingHandler()); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java deleted file mode 100644 index 688e87bba8181..0000000000000 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.transport.netty4; - -import org.elasticsearch.Version; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.lease.Releasables; -import org.elasticsearch.common.network.NetworkService; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.indices.breaker.CircuitBreakerService; -import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.transport.MockTransportService; -import org.elasticsearch.threadpool.TestThreadPool; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TcpTransport; -import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportRequestOptions; -import org.elasticsearch.transport.TransportResponse; -import org.elasticsearch.transport.TransportResponseHandler; -import org.elasticsearch.transport.TransportService; - -import java.io.IOException; -import java.util.Collections; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; - -public class Netty4ScheduledPingTests extends ESTestCase { - - public void testScheduledPing() throws Exception { - ThreadPool threadPool = new TestThreadPool(getClass().getName()); - - Settings settings = Settings.builder() - .put(TcpTransport.PING_SCHEDULE.getKey(), "5ms") - .put(TcpTransport.PORT.getKey(), 0) - .put("cluster.name", "test") - .build(); - - CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService(); - - NamedWriteableRegistry registry = new NamedWriteableRegistry(Collections.emptyList()); - final Netty4Transport nettyA = new Netty4Transport(settings, Version.CURRENT, threadPool, - new NetworkService(Collections.emptyList()), BigArrays.NON_RECYCLING_INSTANCE, registry, circuitBreakerService); - MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, - null); - serviceA.start(); - serviceA.acceptIncomingRequests(); - - final Netty4Transport nettyB = new Netty4Transport(settings, Version.CURRENT, threadPool, - new NetworkService(Collections.emptyList()), BigArrays.NON_RECYCLING_INSTANCE, registry, circuitBreakerService); - MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, - null); - - serviceB.start(); - serviceB.acceptIncomingRequests(); - - DiscoveryNode nodeA = serviceA.getLocalDiscoNode(); - DiscoveryNode nodeB = serviceB.getLocalDiscoNode(); - - serviceA.connectToNode(nodeB); - serviceB.connectToNode(nodeA); - - assertBusy(() -> { - assertThat(nettyA.successfulPingCount(), greaterThan(100L)); - assertThat(nettyB.successfulPingCount(), greaterThan(100L)); - }); - assertThat(nettyA.failedPingCount(), equalTo(0L)); - assertThat(nettyB.failedPingCount(), equalTo(0L)); - - serviceA.registerRequestHandler("internal:sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC, - (request, channel) -> { - try { - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } catch (IOException e) { - logger.error("Unexpected failure", e); - fail(e.getMessage()); - } - }); - - int rounds = scaledRandomIntBetween(100, 5000); - for (int i = 0; i < rounds; i++) { - serviceB.submitRequest(nodeA, "internal:sayHello", - TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(randomBoolean()).build(), - new TransportResponseHandler() { - @Override - public TransportResponse.Empty read(StreamInput in) { - return TransportResponse.Empty.INSTANCE; - } - - @Override - public String executor() { - return ThreadPool.Names.GENERIC; - } - - @Override - public void handleResponse(TransportResponse.Empty response) { - } - - @Override - public void handleException(TransportException exp) { - logger.error("Unexpected failure", exp); - fail("got exception instead of a response: " + exp.getMessage()); - } - }).txGet(); - } - - assertBusy(() -> { - assertThat(nettyA.successfulPingCount(), greaterThan(200L)); - assertThat(nettyB.successfulPingCount(), greaterThan(200L)); - }); - assertThat(nettyA.failedPingCount(), equalTo(0L)); - assertThat(nettyB.failedPingCount(), equalTo(0L)); - - Releasables.close(serviceA, serviceB); - terminate(threadPool); - } - -} diff --git a/server/src/main/java/org/elasticsearch/common/AsyncBiFunction.java b/server/src/main/java/org/elasticsearch/common/AsyncBiFunction.java new file mode 100644 index 0000000000000..d5bf7b7504347 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/AsyncBiFunction.java @@ -0,0 +1,29 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.common; + +import org.elasticsearch.action.ActionListener; + +/** + * A {@link java.util.function.BiFunction}-like interface designed to be used with asynchronous executions. + */ +public interface AsyncBiFunction { + + void apply(T t, U u, ActionListener listener); +} diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java index 114bb8c986a07..d5c576784fc00 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java @@ -27,10 +27,7 @@ import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.AbstractLifecycleRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.threadpool.ThreadPool; @@ -53,13 +50,13 @@ * the connection when the connection manager is closed. */ public class ConnectionManager implements Closeable { + private static final Logger logger = LogManager.getLogger(ConnectionManager.class); private final ConcurrentMap connectedNodes = ConcurrentCollections.newConcurrentMap(); private final KeyedLock connectionLock = new KeyedLock<>(); private final Transport transport; private final ThreadPool threadPool; - private final TimeValue pingSchedule; private final ConnectionProfile defaultProfile; private final Lifecycle lifecycle = new Lifecycle(); private final AtomicBoolean closed = new AtomicBoolean(false); @@ -67,18 +64,14 @@ public class ConnectionManager implements Closeable { private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener(); public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) { - this(settings, transport, threadPool, TcpTransport.PING_SCHEDULE.get(settings)); + this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport, threadPool); } - public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, TimeValue pingSchedule) { + public ConnectionManager(ConnectionProfile connectionProfile, Transport transport, ThreadPool threadPool) { this.transport = transport; this.threadPool = threadPool; - this.pingSchedule = pingSchedule; - this.defaultProfile = ConnectionProfile.buildDefaultConnectionProfile(settings); + this.defaultProfile = connectionProfile; this.lifecycle.moveToStarted(); - if (pingSchedule.millis() > 0) { - threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, new ScheduledPing()); - } } public void addListener(TransportConnectionListener listener) { @@ -251,47 +244,8 @@ private void ensureOpen() { } } - TimeValue getPingSchedule() { - return pingSchedule; - } - - private class ScheduledPing extends AbstractLifecycleRunnable { - - private ScheduledPing() { - super(lifecycle, logger); - } - - @Override - protected void doRunInLifecycle() { - for (Map.Entry entry : connectedNodes.entrySet()) { - Transport.Connection connection = entry.getValue(); - if (connection.sendPing() == false) { - logger.warn("attempted to send ping to connection without support for pings [{}]", connection); - } - } - } - - @Override - protected void onAfterInLifecycle() { - try { - threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, this); - } catch (EsRejectedExecutionException ex) { - if (ex.isExecutorShutdown()) { - logger.debug("couldn't schedule new ping execution, executor is shutting down", ex); - } else { - throw ex; - } - } - } - - @Override - public void onFailure(Exception e) { - if (lifecycle.stoppedOrClosed()) { - logger.trace("failed to send ping transport message", e); - } else { - logger.warn("failed to send ping transport message", e); - } - } + ConnectionProfile getConnectionProfile() { + return defaultProfile; } private static final class DelegatingNodeConnectionListener implements TransportConnectionListener { diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java b/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java index 4fd03d86d9587..bcab23c1fbdd6 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java @@ -46,7 +46,7 @@ public static ConnectionProfile resolveConnectionProfile(@Nullable ConnectionPro if (profile == null) { return fallbackProfile; } else if (profile.getConnectTimeout() != null && profile.getHandshakeTimeout() != null - && profile.getCompressionEnabled() != null) { + && profile.getPingInterval() != null && profile.getCompressionEnabled() != null) { return profile; } else { ConnectionProfile.Builder builder = new ConnectionProfile.Builder(profile); @@ -56,6 +56,9 @@ public static ConnectionProfile resolveConnectionProfile(@Nullable ConnectionPro if (profile.getHandshakeTimeout() == null) { builder.setHandshakeTimeout(fallbackProfile.getHandshakeTimeout()); } + if (profile.getPingInterval() == null) { + builder.setPingInterval(fallbackProfile.getPingInterval()); + } if (profile.getCompressionEnabled() == null) { builder.setCompressionEnabled(fallbackProfile.getCompressionEnabled()); } @@ -78,6 +81,7 @@ public static ConnectionProfile buildDefaultConnectionProfile(Settings settings) Builder builder = new Builder(); builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings)); builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings)); + builder.setPingInterval(TcpTransport.PING_SCHEDULE.get(settings)); builder.setCompressionEnabled(Transport.TRANSPORT_TCP_COMPRESS.get(settings)); builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK); builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING); @@ -94,7 +98,7 @@ public static ConnectionProfile buildDefaultConnectionProfile(Settings settings) * when opening single use connections */ public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType) { - return buildSingleChannelProfile(channelType, null, null, null); + return buildSingleChannelProfile(channelType, null, null, null, null); } /** @@ -102,7 +106,7 @@ public static ConnectionProfile buildSingleChannelProfile(TransportRequestOption * settings. */ public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType, boolean compressionEnabled) { - return buildSingleChannelProfile(channelType, null, null, compressionEnabled); + return buildSingleChannelProfile(channelType, null, null, null, compressionEnabled); } /** @@ -111,7 +115,7 @@ public static ConnectionProfile buildSingleChannelProfile(TransportRequestOption */ public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType, @Nullable TimeValue connectTimeout, @Nullable TimeValue handshakeTimeout) { - return buildSingleChannelProfile(channelType, connectTimeout, handshakeTimeout, null); + return buildSingleChannelProfile(channelType, connectTimeout, handshakeTimeout, null, null); } /** @@ -119,7 +123,8 @@ public static ConnectionProfile buildSingleChannelProfile(TransportRequestOption * handshake timeouts and compression settings. */ public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType, @Nullable TimeValue connectTimeout, - @Nullable TimeValue handshakeTimeout, @Nullable Boolean compressionEnabled) { + @Nullable TimeValue handshakeTimeout, @Nullable TimeValue pingInterval, + @Nullable Boolean compressionEnabled) { Builder builder = new Builder(); builder.addConnections(1, channelType); final EnumSet otherTypes = EnumSet.allOf(TransportRequestOptions.Type.class); @@ -131,6 +136,9 @@ public static ConnectionProfile buildSingleChannelProfile(TransportRequestOption if (handshakeTimeout != null) { builder.setHandshakeTimeout(handshakeTimeout); } + if (pingInterval != null) { + builder.setPingInterval(pingInterval); + } if (compressionEnabled != null) { builder.setCompressionEnabled(compressionEnabled); } @@ -141,14 +149,16 @@ public static ConnectionProfile buildSingleChannelProfile(TransportRequestOption private final int numConnections; private final TimeValue connectTimeout; private final TimeValue handshakeTimeout; + private final TimeValue pingInterval; private final Boolean compressionEnabled; private ConnectionProfile(List handles, int numConnections, TimeValue connectTimeout, - TimeValue handshakeTimeout, Boolean compressionEnabled) { + TimeValue handshakeTimeout, TimeValue pingInterval, Boolean compressionEnabled) { this.handles = handles; this.numConnections = numConnections; this.connectTimeout = connectTimeout; this.handshakeTimeout = handshakeTimeout; + this.pingInterval = pingInterval; this.compressionEnabled = compressionEnabled; } @@ -159,9 +169,10 @@ public static class Builder { private final List handles = new ArrayList<>(); private final Set addedTypes = EnumSet.noneOf(TransportRequestOptions.Type.class); private int numConnections = 0; - private Boolean compressionEnabled; private TimeValue connectTimeout; private TimeValue handshakeTimeout; + private Boolean compressionEnabled; + private TimeValue pingInterval; /** create an empty builder */ public Builder() { @@ -175,32 +186,44 @@ public Builder(ConnectionProfile source) { connectTimeout = source.getConnectTimeout(); handshakeTimeout = source.getHandshakeTimeout(); compressionEnabled = source.getCompressionEnabled(); + pingInterval = source.getPingInterval(); } /** * Sets a connect timeout for this connection profile */ - public void setConnectTimeout(TimeValue connectTimeout) { + public Builder setConnectTimeout(TimeValue connectTimeout) { if (connectTimeout.millis() < 0) { throw new IllegalArgumentException("connectTimeout must be non-negative but was: " + connectTimeout); } this.connectTimeout = connectTimeout; + return this; } /** * Sets a handshake timeout for this connection profile */ - public void setHandshakeTimeout(TimeValue handshakeTimeout) { + public Builder setHandshakeTimeout(TimeValue handshakeTimeout) { if (handshakeTimeout.millis() < 0) { throw new IllegalArgumentException("handshakeTimeout must be non-negative but was: " + handshakeTimeout); } this.handshakeTimeout = handshakeTimeout; + return this; + } + + /** + * Sets a ping interval for this connection profile + */ + public Builder setPingInterval(TimeValue pingInterval) { + this.pingInterval = pingInterval; + return this; } /** * Sets compression enabled for this connection profile */ - public void setCompressionEnabled(boolean compressionEnabled) { + public Builder setCompressionEnabled(boolean compressionEnabled) { this.compressionEnabled = compressionEnabled; + return this; } /** @@ -208,7 +231,7 @@ public void setCompressionEnabled(boolean compressionEnabled) { * @param numConnections the number of connections to use in the pool for the given connection types * @param types a set of types that should share the given number of connections */ - public void addConnections(int numConnections, TransportRequestOptions.Type... types) { + public Builder addConnections(int numConnections, TransportRequestOptions.Type... types) { if (types == null || types.length == 0) { throw new IllegalArgumentException("types must not be null"); } @@ -220,6 +243,7 @@ public void addConnections(int numConnections, TransportRequestOptions.Type... t addedTypes.addAll(Arrays.asList(types)); handles.add(new ConnectionTypeHandle(this.numConnections, numConnections, EnumSet.copyOf(Arrays.asList(types)))); this.numConnections += numConnections; + return this; } /** @@ -233,7 +257,7 @@ public ConnectionProfile build() { throw new IllegalStateException("not all types are added for this connection profile - missing types: " + types); } return new ConnectionProfile(Collections.unmodifiableList(handles), numConnections, connectTimeout, handshakeTimeout, - compressionEnabled); + pingInterval, compressionEnabled); } } @@ -252,6 +276,13 @@ public TimeValue getHandshakeTimeout() { return handshakeTimeout; } + /** + * Returns the ping interval or null if no explicit ping interval is set on this profile. + */ + public TimeValue getPingInterval() { + return pingInterval; + } + /** * Returns boolean indicating if compression is enabled or null if no explicit compression * is set on this profile. diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 69000ade292ea..39b732059885d 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -73,6 +73,7 @@ import java.util.stream.Collectors; import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_COMPRESS; +import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE; /** * Represents a connection to a single remote cluster. In contrast to a local cluster a remote cluster is not joined such that the @@ -93,10 +94,8 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos private final TransportService transportService; private final ConnectionManager connectionManager; - private final ConnectionProfile remoteProfile; private final ConnectedNodes connectedNodes; private final String clusterAlias; - private final boolean compress; private final int maxNumRemoteConnections; private final Predicate nodePredicate; private final ThreadPool threadPool; @@ -113,36 +112,32 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos * @param clusterAlias the configured alias of the cluster to connect to * @param seedNodes a list of seed nodes to discover eligible nodes from * @param transportService the local nodes transport service - * @param connectionManager the connection manager to use for this remote connection * @param maxNumRemoteConnections the maximum number of connections to the remote cluster * @param nodePredicate a predicate to filter eligible remote nodes to connect to * @param proxyAddress the proxy address */ RemoteClusterConnection(Settings settings, String clusterAlias, List> seedNodes, - TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections, - Predicate nodePredicate, String proxyAddress) { + TransportService transportService, int maxNumRemoteConnections, Predicate nodePredicate, + String proxyAddress) { + this(settings, clusterAlias, seedNodes, transportService, maxNumRemoteConnections, nodePredicate, proxyAddress, + createConnectionManager(settings, clusterAlias, transportService)); + } + + // Public for tests to pass a StubbableConnectionManager + RemoteClusterConnection(Settings settings, String clusterAlias, List> seedNodes, + TransportService transportService, int maxNumRemoteConnections, Predicate nodePredicate, + String proxyAddress, ConnectionManager connectionManager) { this.transportService = transportService; this.maxNumRemoteConnections = maxNumRemoteConnections; this.nodePredicate = nodePredicate; this.clusterAlias = clusterAlias; - this.compress = REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings); - ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); - builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings)); - builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings)); - builder.addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING); // TODO make this configurable? - builder.addConnections(0, // we don't want this to be used for anything else but search - TransportRequestOptions.Type.BULK, - TransportRequestOptions.Type.STATE, - TransportRequestOptions.Type.RECOVERY); - builder.setCompressionEnabled(compress); - remoteProfile = builder.build(); - connectedNodes = new ConnectedNodes(clusterAlias); + this.connectionManager = connectionManager; + this.connectedNodes = new ConnectedNodes(clusterAlias); this.seedNodes = Collections.unmodifiableList(seedNodes); this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE - .getConcreteSettingForNamespace(clusterAlias).get(settings); + .getConcreteSettingForNamespace(clusterAlias).get(settings); this.connectHandler = new ConnectHandler(); this.threadPool = transportService.threadPool; - this.connectionManager = connectionManager; connectionManager.addListener(this); // we register the transport service here as a listener to make sure we notify handlers on disconnect etc. connectionManager.addListener(transportService); @@ -150,6 +145,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos initialConnectionTimeout = RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings); } + private static DiscoveryNode maybeAddProxyAddress(String proxyAddress, DiscoveryNode node) { if (proxyAddress == null || proxyAddress.isEmpty()) { return node; @@ -333,11 +329,6 @@ public void sendRequest(long requestId, String action, TransportRequest request, TransportActionProxy.wrapRequest(targetNode, request), options); } - @Override - public boolean sendPing() { - return proxyConnection.sendPing(); - } - @Override public void close() { assert false: "proxy connections must not be closed"; @@ -496,13 +487,13 @@ private void collectRemoteNodes(Iterator> seedNodes, fin logger.debug("[{}] opening connection to seed node: [{}] proxy address: [{}]", clusterAlias, seedNode, proxyAddress); final TransportService.HandshakeResponse handshakeResponse; - ConnectionProfile profile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, - compress); + ConnectionProfile profile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG); Transport.Connection connection = manager.openConnection(seedNode, profile); boolean success = false; try { try { - handshakeResponse = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(), + ConnectionProfile connectionProfile = connectionManager.getConnectionProfile(); + handshakeResponse = transportService.handshake(connection, connectionProfile.getHandshakeTimeout().millis(), (c) -> remoteClusterName.get() == null ? true : c.equals(remoteClusterName.get())); } catch (IllegalStateException ex) { logger.warn(() -> new ParameterizedMessage("seed node {} cluster name mismatch expected " + @@ -512,7 +503,7 @@ private void collectRemoteNodes(Iterator> seedNodes, fin final DiscoveryNode handshakeNode = maybeAddProxyAddress(proxyAddress, handshakeResponse.getDiscoveryNode()); if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) { - manager.connectToNode(handshakeNode, remoteProfile, transportService.connectionValidator(handshakeNode)); + manager.connectToNode(handshakeNode, null, transportService.connectionValidator(handshakeNode)); if (remoteClusterName.get() == null) { assert handshakeResponse.getClusterName().value() != null; remoteClusterName.set(handshakeResponse.getClusterName()); @@ -626,7 +617,7 @@ public void handleResponse(ClusterStateResponse response) { DiscoveryNode node = maybeAddProxyAddress(proxyAddress, n); if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) { try { - connectionManager.connectToNode(node, remoteProfile, + connectionManager.connectToNode(node, null, transportService.connectionValidator(node)); // noop if node is connected connectedNodes.add(node); } catch (ConnectTransportException | IllegalStateException ex) { @@ -822,6 +813,20 @@ private synchronized void ensureIteratorAvailable() { } } + private static ConnectionManager createConnectionManager(Settings settings, String clusterAlias, TransportService transportService) { + ConnectionProfile.Builder builder = new ConnectionProfile.Builder() + .setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings)) + .setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings)) + .addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING) // TODO make this configurable? + // we don't want this to be used for anything else but search + .addConnections(0, TransportRequestOptions.Type.BULK, + TransportRequestOptions.Type.STATE, + TransportRequestOptions.Type.RECOVERY) + .setCompressionEnabled(REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings)) + .setPingInterval(REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterAlias).get(settings)); + return new ConnectionManager(builder.build(), transportService.transport, transportService.threadPool); + } + ConnectionManager getConnectionManager() { return connectionManager; } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index c7310241aeb8b..a68c225409dcb 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -223,11 +223,8 @@ private synchronized void updateRemoteClusters(Map listener); + + /** + * Returns stats about this channel + */ + ChannelStats getChannelStats(); + + class ChannelStats { + + private volatile long lastAccessedTime; + + public ChannelStats() { + lastAccessedTime = TimeValue.nsecToMSec(System.nanoTime()); + } + + void markAccessed(long relativeMillisTime) { + lastAccessedTime = relativeMillisTime; + } + + long lastAccessedTime() { + return lastAccessedTime; + } + } } diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index ef83498e7e5aa..04b3d79352f28 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -46,7 +46,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkAddress; @@ -61,6 +60,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.internal.io.IOUtils; @@ -91,7 +91,6 @@ import java.util.Objects; import java.util.Set; import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -168,9 +167,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements // This is the number of bytes necessary to read the message size private static final int BYTES_NEEDED_FOR_MESSAGE_SIZE = TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE; - private static final int PING_DATA_SIZE = -1; - protected final CounterMetric successfulPings = new CounterMetric(); - protected final CounterMetric failedPings = new CounterMetric(); private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9); private static final BytesReference EMPTY_BYTES_REFERENCE = new BytesArray(new byte[0]); @@ -190,14 +186,14 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements private final ConcurrentMap profileBoundAddresses = newConcurrentMap(); private final Map> serverChannels = newConcurrentMap(); - private final Set acceptedChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Set acceptedChannels = ConcurrentCollections.newConcurrentSet(); private final NamedWriteableRegistry namedWriteableRegistry; // this lock is here to make sure we close this transport and disconnect all the client nodes // connections while no connect operations is going on private final ReadWriteLock closeLock = new ReentrantReadWriteLock(); - protected final boolean compressResponses; + private final boolean compressResponses; private volatile BoundTransportAddress boundAddress; private final String transportName; @@ -205,8 +201,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements private final MeanMetric transmittedBytesMetric = new MeanMetric(); private volatile Map> requestHandlers = Collections.emptyMap(); private final ResponseHandlers responseHandlers = new ResponseHandlers(); + private final TcpTransportHandshaker handshaker; - private final BytesReference pingMessage; + private final TransportKeepAlive keepAlive; private final String nodeName; public TcpTransport(String transportName, Settings settings, Version version, ThreadPool threadPool, BigArrays bigArrays, @@ -229,6 +226,7 @@ public TcpTransport(String transportName, Settings settings, Version version, T TransportStatus.setHandshake((byte) 0)), (v, features, channel, response, requestId) -> sendResponse(v, features, channel, response, requestId, TcpTransportHandshaker.HANDSHAKE_ACTION_NAME, TransportResponseOptions.EMPTY, TransportStatus.setHandshake((byte) 0))); + this.keepAlive = new TransportKeepAlive(threadPool, this::internalSendMessage); this.nodeName = Node.NODE_NAME_SETTING.get(settings); final Settings defaultFeatures = DEFAULT_FEATURES_SETTING.get(settings); @@ -243,15 +241,6 @@ public TcpTransport(String transportName, Settings settings, Version version, T // use a sorted set to present the features in a consistent order this.features = new TreeSet<>(defaultFeatures.names()).toArray(new String[defaultFeatures.names().size()]); } - - try (BytesStreamOutput out = new BytesStreamOutput()) { - out.writeByte((byte) 'E'); - out.writeByte((byte) 'S'); - out.writeInt(TcpTransport.PING_DATA_SIZE); - pingMessage = out.bytes(); - } catch (IOException e) { - throw new AssertionError(e.getMessage(), e); // won't happen - } } @Override @@ -319,31 +308,6 @@ public TcpChannel channel(TransportRequestOptions.Type type) { return connectionTypeHandle.getChannel(channels); } - @Override - public boolean sendPing() { - for (TcpChannel channel : channels) { - internalSendMessage(channel, pingMessage, new SendMetricListener(pingMessage.length()) { - @Override - protected void innerInnerOnResponse(Void v) { - successfulPings.inc(); - } - - @Override - protected void innerOnFailure(Exception e) { - if (channel.isOpen()) { - logger.debug(() -> new ParameterizedMessage("[{}] failed to send ping transport message", node), e); - failedPings.inc(); - } else { - logger.trace(() -> - new ParameterizedMessage("[{}] failed to send ping transport message (channel closed)", node), e); - } - - } - }); - } - return true; - } - @Override public void close() { if (isClosing.compareAndSet(false, true)) { @@ -498,7 +462,7 @@ protected void bindServer(ProfileSettings profileSettings) { } } - protected InetSocketAddress bindToPort(final String name, final InetAddress hostAddress, String port) { + private InetSocketAddress bindToPort(final String name, final InetAddress hostAddress, String port) { PortsRange portsRange = new PortsRange(port); final AtomicReference lastException = new AtomicReference<>(); final AtomicReference boundSocket = new AtomicReference<>(); @@ -670,6 +634,8 @@ protected final void doStop() { threadPool.generic().execute(() -> { closeLock.writeLock().lock(); try { + keepAlive.close(); + // first stop to accept any incoming connections so nobody can connect to this transport for (Map.Entry> entry : serverChannels.entrySet()) { String profile = entry.getKey(); @@ -729,14 +695,14 @@ public void onException(TcpChannel channel, Exception e) { // in case we are able to return data, serialize the exception content and sent it back to the client if (channel.isOpen()) { BytesArray message = new BytesArray(e.getMessage().getBytes(StandardCharsets.UTF_8)); - final SendMetricListener listener = new SendMetricListener(message.length()) { + ActionListener listener = new ActionListener() { @Override - protected void innerInnerOnResponse(Void v) { + public void onResponse(Void aVoid) { CloseableChannel.closeChannel(channel); } @Override - protected void innerOnFailure(Exception e) { + public void onFailure(Exception e) { logger.debug("failed to send message to httpOnTransport channel", e); CloseableChannel.closeChannel(channel); } @@ -745,7 +711,7 @@ protected void innerOnFailure(Exception e) { // elasticsearch binary message. We are just serializing an exception here. Not formatting it // as an elasticsearch transport message. try { - channel.sendMessage(message, listener); + channel.sendMessage(message, new SendListener(channel, message.length(), listener)); } catch (Exception ex) { listener.onFailure(ex); } @@ -774,6 +740,8 @@ protected void onNonChannelException(Exception exception) { protected void serverAcceptedChannel(TcpChannel channel) { boolean addedOnThisCall = acceptedChannels.add(channel); assert addedOnThisCall : "Channel should only be added to accepted channel set once"; + // Mark the channel init time + channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis()); channel.addCloseListener(ActionListener.wrap(() -> acceptedChannels.remove(channel))); logger.trace(() -> new ParameterizedMessage("Tcp transport channel accepted: {}", channel)); } @@ -835,9 +803,9 @@ private void sendRequestToChannel(final DiscoveryNode node, final TcpChannel cha BytesReference message = buildMessage(requestId, status, node.getVersion(), request, stream); final TransportRequestOptions finalOptions = options; // this might be called in a different thread - SendListener onRequestSent = new SendListener(channel, stream, - () -> messageListener.onRequestSent(node, requestId, action, request, finalOptions), message.length()); - internalSendMessage(channel, message, onRequestSent); + ReleaseListener releaseListener = new ReleaseListener(stream, + () -> messageListener.onRequestSent(node, requestId, action, request, finalOptions)); + internalSendMessage(channel, message, releaseListener); addedReleaseListener = true; } finally { if (!addedReleaseListener) { @@ -849,9 +817,10 @@ private void sendRequestToChannel(final DiscoveryNode node, final TcpChannel cha /** * sends a message to the given channel, using the given callbacks. */ - private void internalSendMessage(TcpChannel channel, BytesReference message, SendMetricListener listener) { + private void internalSendMessage(TcpChannel channel, BytesReference message, ActionListener listener) { + channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis()); try { - channel.sendMessage(message, listener); + channel.sendMessage(message, new SendListener(channel, message.length(), listener)); } catch (Exception ex) { // call listener to ensure that any resources are released listener.onFailure(ex); @@ -889,9 +858,9 @@ public void sendErrorResponse( final BytesReference bytes = stream.bytes(); final BytesReference header = buildHeader(requestId, status, nodeVersion, bytes.length()); CompositeBytesReference message = new CompositeBytesReference(header, bytes); - SendListener onResponseSent = new SendListener(channel, null, - () -> messageListener.onResponseSent(requestId, action, error), message.length()); - internalSendMessage(channel, message, onResponseSent); + ReleaseListener releaseListener = new ReleaseListener(null, + () -> messageListener.onResponseSent(requestId, action, error)); + internalSendMessage(channel, message, releaseListener); } } @@ -939,9 +908,9 @@ private void sendResponse( final TransportResponseOptions finalOptions = options; // this might be called in a different thread - SendListener listener = new SendListener(channel, stream, - () -> messageListener.onResponseSent(requestId, action, response, finalOptions), message.length()); - internalSendMessage(channel, message, listener); + ReleaseListener releaseListener = new ReleaseListener(stream, + () -> messageListener.onResponseSent(requestId, action, response, finalOptions)); + internalSendMessage(channel, message, releaseListener); addedReleaseListener = true; } finally { if (!addedReleaseListener) { @@ -1003,9 +972,12 @@ private BytesReference buildMessage(long requestId, byte status, Version nodeVer */ public void inboundMessage(TcpChannel channel, BytesReference message) { try { + channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis()); // Message length of 0 is a ping if (message.length() != 0) { messageReceived(message, channel); + } else { + keepAlive.receiveKeepAlive(channel); } } catch (Exception e) { onException(channel, e); @@ -1099,7 +1071,7 @@ private static int readHeaderBuffer(BytesReference headerBuffer) throws IOExcept messageLength = input.readInt(); } - if (messageLength == TcpTransport.PING_DATA_SIZE) { + if (messageLength == TransportKeepAlive.PING_DATA_SIZE) { // This is a ping return 0; } @@ -1405,6 +1377,10 @@ public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionP handshaker.sendHandshake(responseHandlers.newRequestId(), node, channel, profile.getHandshakeTimeout(), listener); } + final TransportKeepAlive getKeepAlive() { + return keepAlive; + } + final int getNumPendingHandshakes() { return handshaker.getNumPendingHandshakes(); } @@ -1427,42 +1403,48 @@ protected final void ensureOpen() { /** * This listener increments the transmitted bytes metric on success. */ - private abstract class SendMetricListener extends NotifyOnceListener { + private class SendListener extends NotifyOnceListener { + + private final TcpChannel channel; private final long messageSize; + private final ActionListener delegateListener; - private SendMetricListener(long messageSize) { + private SendListener(TcpChannel channel, long messageSize, ActionListener delegateListener) { + this.channel = channel; this.messageSize = messageSize; + this.delegateListener = delegateListener; } @Override - protected final void innerOnResponse(Void object) { + protected void innerOnResponse(Void v) { transmittedBytesMetric.inc(messageSize); - innerInnerOnResponse(object); + delegateListener.onResponse(v); } - protected abstract void innerInnerOnResponse(Void object); + @Override + protected void innerOnFailure(Exception e) { + logger.warn(() -> new ParameterizedMessage("send message failed [channel: {}]", channel), e); + delegateListener.onFailure(e); + } } - private final class SendListener extends SendMetricListener { - private final TcpChannel channel; + private class ReleaseListener implements ActionListener { + private final Closeable optionalCloseable; private final Runnable transportAdaptorCallback; - private SendListener(TcpChannel channel, Closeable optionalCloseable, Runnable transportAdaptorCallback, long messageLength) { - super(messageLength); - this.channel = channel; + private ReleaseListener(Closeable optionalCloseable, Runnable transportAdaptorCallback) { this.optionalCloseable = optionalCloseable; this.transportAdaptorCallback = transportAdaptorCallback; } @Override - protected void innerInnerOnResponse(Void v) { + public void onResponse(Void aVoid) { closeAndCallback(null); } @Override - protected void innerOnFailure(Exception e) { - logger.warn(() -> new ParameterizedMessage("send message failed [channel: {}]", channel), e); + public void onFailure(Exception e) { closeAndCallback(e); } @@ -1618,7 +1600,13 @@ public void onResponse(Void v) { @Override public void onResponse(Version version) { NodeChannels nodeChannels = new NodeChannels(node, channels, connectionProfile, version); - nodeChannels.channels.forEach(ch -> ch.addCloseListener(ActionListener.wrap(nodeChannels::close))); + long relativeMillisTime = threadPool.relativeTimeInMillis(); + nodeChannels.channels.forEach(ch -> { + // Mark the channel init time + ch.getChannelStats().markAccessed(relativeMillisTime); + ch.addCloseListener(ActionListener.wrap(nodeChannels::close)); + }); + keepAlive.registerNodeConnection(nodeChannels.channels, connectionProfile); listener.onResponse(nodeChannels); } diff --git a/server/src/main/java/org/elasticsearch/transport/Transport.java b/server/src/main/java/org/elasticsearch/transport/Transport.java index e13213dca066a..011c3214dfbef 100644 --- a/server/src/main/java/org/elasticsearch/transport/Transport.java +++ b/server/src/main/java/org/elasticsearch/transport/Transport.java @@ -115,10 +115,6 @@ interface Connection extends Closeable { void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException; - default boolean sendPing() { - return false; - } - /** * The listener's {@link ActionListener#onResponse(Object)} method will be called when this * connection is closed. No implementations currently throw an exception during close, so diff --git a/server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java b/server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java new file mode 100644 index 0000000000000..b8d06e7e1174e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java @@ -0,0 +1,210 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.transport; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.AsyncBiFunction; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.component.Lifecycle; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractLifecycleRunnable; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Implements the scheduling and sending of keep alive pings. Client channels send keep alive pings to the + * server and server channels respond. Pings are only sent at the scheduled time if the channel did not send + * and receive a message since the last ping. + */ +final class TransportKeepAlive implements Closeable { + + static final int PING_DATA_SIZE = -1; + + private final Logger logger = LogManager.getLogger(TransportKeepAlive.class); + private final CounterMetric successfulPings = new CounterMetric(); + private final CounterMetric failedPings = new CounterMetric(); + private final ConcurrentMap pingIntervals = ConcurrentCollections.newConcurrentMap(); + private final Lifecycle lifecycle = new Lifecycle(); + private final ThreadPool threadPool; + private final AsyncBiFunction pingSender; + private final BytesReference pingMessage; + + TransportKeepAlive(ThreadPool threadPool, AsyncBiFunction pingSender) { + this.threadPool = threadPool; + this.pingSender = pingSender; + + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.writeByte((byte) 'E'); + out.writeByte((byte) 'S'); + out.writeInt(PING_DATA_SIZE); + pingMessage = out.bytes(); + } catch (IOException e) { + throw new AssertionError(e.getMessage(), e); // won't happen + } + + this.lifecycle.moveToStarted(); + } + + void registerNodeConnection(List nodeChannels, ConnectionProfile connectionProfile) { + TimeValue pingInterval = connectionProfile.getPingInterval(); + if (pingInterval.millis() < 0) { + return; + } + + final ScheduledPing scheduledPing = pingIntervals.computeIfAbsent(pingInterval, ScheduledPing::new); + scheduledPing.ensureStarted(); + + for (TcpChannel channel : nodeChannels) { + scheduledPing.addChannel(channel); + + channel.addCloseListener(ActionListener.wrap(() -> { + scheduledPing.removeChannel(channel); + })); + } + } + + /** + * Called when a keep alive ping is received. If the channel that received the keep alive ping is a + * server channel, a ping is sent back. If the channel that received the keep alive is a client channel, + * this method does nothing as the client initiated the ping in the first place. + * + * @param channel that received the keep alive ping + */ + void receiveKeepAlive(TcpChannel channel) { + // The client-side initiates pings and the server-side responds. So if this is a client channel, this + // method is a no-op. + if (channel.isServerChannel()) { + sendPing(channel); + } + } + + long successfulPingCount() { + return successfulPings.count(); + } + + long failedPingCount() { + return failedPings.count(); + } + + private void sendPing(TcpChannel channel) { + pingSender.apply(channel, pingMessage, new ActionListener() { + + @Override + public void onResponse(Void v) { + successfulPings.inc(); + } + + @Override + public void onFailure(Exception e) { + if (channel.isOpen()) { + logger.debug(() -> new ParameterizedMessage("[{}] failed to send transport ping", channel), e); + failedPings.inc(); + } else { + logger.trace(() -> new ParameterizedMessage("[{}] failed to send transport ping (channel closed)", channel), e); + } + } + }); + } + + @Override + public void close() { + lifecycle.moveToStopped(); + lifecycle.moveToClosed(); + } + + private class ScheduledPing extends AbstractLifecycleRunnable { + + private final TimeValue pingInterval; + + private final Set channels = ConcurrentCollections.newConcurrentSet(); + + private final AtomicBoolean isStarted = new AtomicBoolean(false); + private volatile long lastPingRelativeMillis; + + private ScheduledPing(TimeValue pingInterval) { + super(lifecycle, logger); + this.pingInterval = pingInterval; + this.lastPingRelativeMillis = threadPool.relativeTimeInMillis(); + } + + void ensureStarted() { + if (isStarted.get() == false && isStarted.compareAndSet(false, true)) { + threadPool.schedule(pingInterval, ThreadPool.Names.GENERIC, this); + } + } + + void addChannel(TcpChannel channel) { + channels.add(channel); + } + + void removeChannel(TcpChannel channel) { + channels.remove(channel); + } + + @Override + protected void doRunInLifecycle() { + for (TcpChannel channel : channels) { + // In the future it is possible that we may want to kill a channel if we have not read from + // the channel since the last ping. However, this will need to be backwards compatible with + // pre-6.6 nodes that DO NOT respond to pings + if (needsKeepAlivePing(channel)) { + sendPing(channel); + } + } + this.lastPingRelativeMillis = threadPool.relativeTimeInMillis(); + } + + @Override + protected void onAfterInLifecycle() { + try { + threadPool.schedule(pingInterval, ThreadPool.Names.GENERIC, this); + } catch (EsRejectedExecutionException ex) { + if (ex.isExecutorShutdown()) { + logger.debug("couldn't schedule new ping execution, executor is shutting down", ex); + } else { + throw ex; + } + } + } + + @Override + public void onFailure(Exception e) { + logger.warn("failed to send ping transport message", e); + } + + private boolean needsKeepAlivePing(TcpChannel channel) { + TcpChannel.ChannelStats stats = channel.getChannelStats(); + long accessedDelta = stats.lastAccessedTime() - lastPingRelativeMillis; + return accessedDelta <= 0; + } + } +} diff --git a/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java b/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java index 03ecf65737d58..976f0e905c050 100644 --- a/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java @@ -54,7 +54,9 @@ public void createConnectionManager() { transport = mock(Transport.class); connectionManager = new ConnectionManager(settings, transport, threadPool); TimeValue oneSecond = new TimeValue(1000); - connectionProfile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, oneSecond, oneSecond, false); + TimeValue oneMinute = TimeValue.timeValueMinutes(1); + connectionProfile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, oneSecond, oneSecond, + oneMinute, false); } @After diff --git a/server/src/test/java/org/elasticsearch/transport/ConnectionProfileTests.java b/server/src/test/java/org/elasticsearch/transport/ConnectionProfileTests.java index 8d053f7ade630..4f380de08ed1c 100644 --- a/server/src/test/java/org/elasticsearch/transport/ConnectionProfileTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ConnectionProfileTests.java @@ -36,6 +36,7 @@ public void testBuildConnectionProfile() { ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); TimeValue connectTimeout = TimeValue.timeValueMillis(randomIntBetween(1, 10)); TimeValue handshakeTimeout = TimeValue.timeValueMillis(randomIntBetween(1, 10)); + TimeValue pingInterval = TimeValue.timeValueMillis(randomIntBetween(1, 10)); boolean compressionEnabled = randomBoolean(); final boolean setConnectTimeout = randomBoolean(); if (setConnectTimeout) { @@ -49,6 +50,10 @@ public void testBuildConnectionProfile() { if (setCompress) { builder.setCompressionEnabled(compressionEnabled); } + final boolean setPingInterval = randomBoolean(); + if (setPingInterval) { + builder.setPingInterval(pingInterval); + } builder.addConnections(1, TransportRequestOptions.Type.BULK); builder.addConnections(2, TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY); builder.addConnections(3, TransportRequestOptions.Type.PING); @@ -82,6 +87,12 @@ public void testBuildConnectionProfile() { assertNull(build.getCompressionEnabled()); } + if (setPingInterval) { + assertEquals(pingInterval, build.getPingInterval()); + } else { + assertNull(build.getPingInterval()); + } + List list = new ArrayList<>(10); for (int i = 0; i < 10; i++) { list.add(i); @@ -160,7 +171,10 @@ public void testConnectionProfileResolve() { if (connectionHandshakeSet) { builder.setHandshakeTimeout(TimeValue.timeValueMillis(randomNonNegativeLong())); } - + final boolean pingIntervalSet = randomBoolean(); + if (pingIntervalSet) { + builder.setPingInterval(TimeValue.timeValueMillis(randomNonNegativeLong())); + } final boolean connectionCompressSet = randomBoolean(); if (connectionCompressSet) { builder.setCompressionEnabled(randomBoolean()); @@ -176,6 +190,8 @@ public void testConnectionProfileResolve() { equalTo(connectionTimeoutSet ? profile.getConnectTimeout() : defaultProfile.getConnectTimeout())); assertThat(resolved.getHandshakeTimeout(), equalTo(connectionHandshakeSet ? profile.getHandshakeTimeout() : defaultProfile.getHandshakeTimeout())); + assertThat(resolved.getPingInterval(), + equalTo(pingIntervalSet ? profile.getPingInterval() : defaultProfile.getPingInterval())); assertThat(resolved.getCompressionEnabled(), equalTo(connectionCompressSet ? profile.getCompressionEnabled() : defaultProfile.getCompressionEnabled())); } @@ -191,6 +207,7 @@ public void testDefaultConnectionProfile() { assertEquals(TransportService.TCP_CONNECT_TIMEOUT.get(Settings.EMPTY), profile.getConnectTimeout()); assertEquals(TransportService.TCP_CONNECT_TIMEOUT.get(Settings.EMPTY), profile.getHandshakeTimeout()); assertEquals(Transport.TRANSPORT_TCP_COMPRESS.get(Settings.EMPTY), profile.getCompressionEnabled()); + assertEquals(TcpTransport.PING_SCHEDULE.get(Settings.EMPTY), profile.getPingInterval()); profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.master", false).build()); assertEquals(12, profile.getNumConnections()); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 66ab81121072e..a4f81a659c60c 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -59,6 +59,7 @@ import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.test.transport.StubbableConnectionManager; import org.elasticsearch.test.transport.StubbableTransport; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -170,10 +171,11 @@ public void testRemoteProfileIsUsedForLocalCluster() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + ConnectionManager connectionManager = connection.getConnectionManager(); updateSeedNodes(connection, Arrays.asList(() -> seedNode)); - assertTrue(service.nodeConnected(seedNode)); - assertTrue(service.nodeConnected(discoverableNode)); + assertTrue(connectionManager.nodeConnected(seedNode)); + assertTrue(connectionManager.nodeConnected(discoverableNode)); assertTrue(connection.assertNoRunningConnections()); PlainTransportFuture futureHandler = new PlainTransportFuture<>( new FutureTransportResponseHandler() { @@ -185,7 +187,7 @@ public ClusterSearchShardsResponse read(StreamInput in) throws IOException { TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK) .build(); IllegalStateException ise = (IllegalStateException) expectThrows(SendRequestTransportException.class, () -> { - service.sendRequest(discoverableNode, + service.sendRequest(connectionManager.getConnection(discoverableNode), ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(), options, futureHandler); futureHandler.txGet(); }).getCause(); @@ -211,10 +213,11 @@ public void testRemoteProfileIsUsedForRemoteCluster() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + ConnectionManager connectionManager = connection.getConnectionManager(); updateSeedNodes(connection, Arrays.asList(() -> seedNode)); - assertTrue(service.nodeConnected(seedNode)); - assertTrue(service.nodeConnected(discoverableNode)); + assertTrue(connectionManager.nodeConnected(seedNode)); + assertTrue(connectionManager.nodeConnected(discoverableNode)); assertTrue(connection.assertNoRunningConnections()); PlainTransportFuture futureHandler = new PlainTransportFuture<>( new FutureTransportResponseHandler() { @@ -226,7 +229,7 @@ public ClusterSearchShardsResponse read(StreamInput in) throws IOException { TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK) .build(); IllegalStateException ise = (IllegalStateException) expectThrows(SendRequestTransportException.class, () -> { - service.sendRequest(discoverableNode, + service.sendRequest(connectionManager.getConnection(discoverableNode), ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(), options, futureHandler); futureHandler.txGet(); }).getCause(); @@ -263,10 +266,11 @@ public void testDiscoverSingleNode() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + ConnectionManager connectionManager = connection.getConnectionManager(); updateSeedNodes(connection, Arrays.asList(() -> seedNode)); - assertTrue(service.nodeConnected(seedNode)); - assertTrue(service.nodeConnected(discoverableNode)); + assertTrue(connectionManager.nodeConnected(seedNode)); + assertTrue(connectionManager.nodeConnected(discoverableNode)); assertTrue(connection.assertNoRunningConnections()); } } @@ -292,11 +296,12 @@ public void testDiscoverSingleNodeWithIncompatibleSeed() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { + seedNodes, service, Integer.MAX_VALUE, n -> true, null)) { + ConnectionManager connectionManager = connection.getConnectionManager(); updateSeedNodes(connection, seedNodes); - assertTrue(service.nodeConnected(seedNode)); - assertTrue(service.nodeConnected(discoverableNode)); - assertFalse(service.nodeConnected(incompatibleSeedNode)); + assertTrue(connectionManager.nodeConnected(seedNode)); + assertTrue(connectionManager.nodeConnected(discoverableNode)); + assertFalse(connectionManager.nodeConnected(incompatibleSeedNode)); assertTrue(connection.assertNoRunningConnections()); } } @@ -319,15 +324,16 @@ public void testNodeDisconnected() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + ConnectionManager connectionManager = connection.getConnectionManager(); updateSeedNodes(connection, Arrays.asList(() -> seedNode)); - assertTrue(service.nodeConnected(seedNode)); - assertTrue(service.nodeConnected(discoverableNode)); - assertFalse(service.nodeConnected(spareNode)); + assertTrue(connectionManager.nodeConnected(seedNode)); + assertTrue(connectionManager.nodeConnected(discoverableNode)); + assertFalse(connectionManager.nodeConnected(spareNode)); knownNodes.add(spareNode); CountDownLatch latchDisconnect = new CountDownLatch(1); CountDownLatch latchConnected = new CountDownLatch(1); - service.addConnectionListener(new TransportConnectionListener() { + connectionManager.addListener(new TransportConnectionListener() { @Override public void onNodeDisconnected(DiscoveryNode node) { if (node.equals(discoverableNode)) { @@ -347,7 +353,7 @@ public void onNodeConnected(DiscoveryNode node) { // now make sure we try to connect again to other nodes once we got disconnected assertTrue(latchDisconnect.await(10, TimeUnit.SECONDS)); assertTrue(latchConnected.await(10, TimeUnit.SECONDS)); - assertTrue(service.nodeConnected(spareNode)); + assertTrue(connectionManager.nodeConnected(spareNode)); } } } @@ -368,15 +374,15 @@ public void testFilterDiscoveredNodes() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, - n -> n.equals(rejectedNode) == false, null)) { + Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> n.equals(rejectedNode) == false, null)) { + ConnectionManager connectionManager = connection.getConnectionManager(); updateSeedNodes(connection, Arrays.asList(() -> seedNode)); if (rejectedNode.equals(seedNode)) { - assertFalse(service.nodeConnected(seedNode)); - assertTrue(service.nodeConnected(discoverableNode)); + assertFalse(connectionManager.nodeConnected(seedNode)); + assertTrue(connectionManager.nodeConnected(discoverableNode)); } else { - assertTrue(service.nodeConnected(seedNode)); - assertFalse(service.nodeConnected(discoverableNode)); + assertTrue(connectionManager.nodeConnected(seedNode)); + assertFalse(connectionManager.nodeConnected(discoverableNode)); } assertTrue(connection.assertNoRunningConnections()); } @@ -412,9 +418,10 @@ public void testConnectWithIncompatibleTransports() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + ConnectionManager connectionManager = connection.getConnectionManager(); expectThrows(Exception.class, () -> updateSeedNodes(connection, Arrays.asList(() -> seedNode))); - assertFalse(service.nodeConnected(seedNode)); + assertFalse(connectionManager.nodeConnected(seedNode)); assertTrue(connection.assertNoRunningConnections()); } } @@ -437,7 +444,7 @@ public void testRemoteConnectionVersionMatchesTransportConnectionVersion() throw assertThat(seedNode.getVersion(), not(equalTo(oldVersionNode.getVersion()))); try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { - final Transport.Connection seedConnection = new Transport.Connection() { + final Transport.Connection seedConnection = new CloseableConnection() { @Override public DiscoveryNode getNode() { return seedNode; @@ -448,34 +455,23 @@ public void sendRequest(long requestId, String action, TransportRequest request, throws IOException, TransportException { // no-op } - - @Override - public void addCloseListener(ActionListener listener) { - // no-op - } - - @Override - public boolean isClosed() { - return false; - } - - @Override - public void close() { - // no-op - } }; - service.addGetConnectionBehavior(seedNode.getAddress(), (connectionManager, discoveryNode) -> { + ConnectionManager delegate = new ConnectionManager(Settings.EMPTY, service.transport, threadPool); + StubbableConnectionManager connectionManager = new StubbableConnectionManager(delegate, Settings.EMPTY, service.transport, + threadPool); + + connectionManager.addConnectBehavior(seedNode.getAddress(), (cm, discoveryNode) -> { if (discoveryNode == seedNode) { return seedConnection; } - return connectionManager.getConnection(discoveryNode); + return cm.getConnection(discoveryNode); }); service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null, connectionManager)) { connection.addConnectedNode(seedNode); for (DiscoveryNode node : knownNodes) { final Transport.Connection transportConnection = connection.getConnection(node); @@ -488,7 +484,7 @@ public void close() { } @SuppressForbidden(reason = "calls getLocalHost here but it's fine in this case") - public void testSlowNodeCanBeCanceled() throws IOException, InterruptedException { + public void testSlowNodeCanBeCancelled() throws IOException, InterruptedException { try (ServerSocket socket = new MockServerSocket()) { socket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0), 1); socket.setReuseAddress(true); @@ -518,7 +514,7 @@ public void run() { CountDownLatch listenerCalled = new CountDownLatch(1); AtomicReference exceptionReference = new AtomicReference<>(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) { ActionListener listener = ActionListener.wrap(x -> { listenerCalled.countDown(); fail("expected exception"); @@ -555,7 +551,7 @@ public void testFetchShards() throws Exception { service.acceptIncomingRequests(); List> nodes = Collections.singletonList(() -> seedNode); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - nodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { + nodes, service, Integer.MAX_VALUE, n -> true, null)) { if (randomBoolean()) { updateSeedNodes(connection, nodes); } @@ -595,7 +591,7 @@ public void testFetchShardsThreadContextHeader() throws Exception { service.acceptIncomingRequests(); List> nodes = Collections.singletonList(() -> seedNode); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - nodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { + nodes, service, Integer.MAX_VALUE, n -> true, null)) { SearchRequest request = new SearchRequest("test-index"); Thread[] threads = new Thread[10]; for (int i = 0; i < threads.length; i++) { @@ -649,8 +645,8 @@ public void testFetchShardsSkipUnavailable() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Collections.singletonList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, - n -> true, null)) { + Collections.singletonList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + ConnectionManager connectionManager = connection.getConnectionManager(); SearchRequest request = new SearchRequest("test-index"); ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") @@ -671,7 +667,7 @@ public void testFetchShardsSkipUnavailable() throws Exception { } CountDownLatch disconnectedLatch = new CountDownLatch(1); - service.addConnectionListener(new TransportConnectionListener() { + connectionManager.addListener(new TransportConnectionListener() { @Override public void onNodeDisconnected(DiscoveryNode node) { if (node.equals(seedNode)) { @@ -760,7 +756,8 @@ public void testTriggerUpdatesConcurrently() throws IOException, InterruptedExce service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { + seedNodes, service, Integer.MAX_VALUE, n -> true, null)) { + ConnectionManager connectionManager = connection.getConnectionManager(); int numThreads = randomIntBetween(4, 10); Thread[] threads = new Thread[numThreads]; CyclicBarrier barrier = new CyclicBarrier(numThreads); @@ -811,9 +808,9 @@ public void run() { for (int i = 0; i < threads.length; i++) { threads[i].join(); } - assertTrue(service.nodeConnected(seedNode)); - assertTrue(service.nodeConnected(discoverableNode)); - assertTrue(service.nodeConnected(seedNode1)); + assertTrue(connectionManager.nodeConnected(seedNode)); + assertTrue(connectionManager.nodeConnected(discoverableNode)); + assertTrue(connectionManager.nodeConnected(seedNode1)); assertTrue(connection.assertNoRunningConnections()); } } @@ -839,7 +836,7 @@ public void testCloseWhileConcurrentlyConnecting() throws IOException, Interrupt service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { + seedNodes, service, Integer.MAX_VALUE, n -> true, null)) { int numThreads = randomIntBetween(4, 10); Thread[] threads = new Thread[numThreads]; CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); @@ -902,7 +899,6 @@ public void run() { threads[i].start(); } barrier.await(); - connection.close(); } } } @@ -944,7 +940,7 @@ public void testGetConnectionInfo() throws Exception { service.acceptIncomingRequests(); int maxNumConnections = randomIntBetween(1, 5); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, service.connectionManager(), maxNumConnections, n -> true, null)) { + seedNodes, service, maxNumConnections, n -> true, null)) { // test no nodes connected RemoteConnectionInfo remoteConnectionInfo = assertSerialization(getRemoteConnectionInfo(connection)); assertNotNull(remoteConnectionInfo); @@ -1139,9 +1135,10 @@ public void testEnsureConnected() throws IOException, InterruptedException { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { - assertFalse(service.nodeConnected(seedNode)); - assertFalse(service.nodeConnected(discoverableNode)); + Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + ConnectionManager connectionManager = connection.getConnectionManager(); + assertFalse(connectionManager.nodeConnected(seedNode)); + assertFalse(connectionManager.nodeConnected(discoverableNode)); assertTrue(connection.assertNoRunningConnections()); CountDownLatch latch = new CountDownLatch(1); connection.ensureConnected(new LatchedActionListener<>(new ActionListener() { @@ -1155,8 +1152,8 @@ public void onFailure(Exception e) { } }, latch)); latch.await(); - assertTrue(service.nodeConnected(seedNode)); - assertTrue(service.nodeConnected(discoverableNode)); + assertTrue(connectionManager.nodeConnected(seedNode)); + assertTrue(connectionManager.nodeConnected(discoverableNode)); assertTrue(connection.assertNoRunningConnections()); // exec again we are already connected @@ -1171,8 +1168,8 @@ public void onFailure(Exception e) { } }, latch)); latch.await(); - assertTrue(service.nodeConnected(seedNode)); - assertTrue(service.nodeConnected(discoverableNode)); + assertTrue(connectionManager.nodeConnected(seedNode)); + assertTrue(connectionManager.nodeConnected(discoverableNode)); assertTrue(connection.assertNoRunningConnections()); } } @@ -1188,7 +1185,7 @@ public void testCollectNodes() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) { if (randomBoolean()) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); } @@ -1236,7 +1233,7 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { + seedNodes, service, Integer.MAX_VALUE, n -> true, null)) { final int numGetThreads = randomIntBetween(4, 10); final Thread[] getThreads = new Thread[numGetThreads]; final int numModifyingThreads = randomIntBetween(4, 10); @@ -1326,21 +1323,22 @@ public void testClusterNameIsChecked() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList( () -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { - updateSeedNodes(connection, Arrays.asList(() -> seedNode)); - assertTrue(service.nodeConnected(seedNode)); - assertTrue(service.nodeConnected(discoverableNode)); + Arrays.asList( () -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + ConnectionManager connectionManager = connection.getConnectionManager(); + updateSeedNodes(connection, Collections.singletonList(() -> seedNode)); + assertTrue(connectionManager.nodeConnected(seedNode)); + assertTrue(connectionManager.nodeConnected(discoverableNode)); assertTrue(connection.assertNoRunningConnections()); List> discoveryNodes = - Arrays.asList(() -> otherClusterTransport.getLocalDiscoNode(), () -> seedNode); + Arrays.asList(otherClusterTransport::getLocalDiscoNode, () -> seedNode); Collections.shuffle(discoveryNodes, random()); updateSeedNodes(connection, discoveryNodes); - assertTrue(service.nodeConnected(seedNode)); + assertTrue(connectionManager.nodeConnected(seedNode)); for (DiscoveryNode otherClusterNode : otherClusterKnownNodes) { - assertFalse(service.nodeConnected(otherClusterNode)); + assertFalse(connectionManager.nodeConnected(otherClusterNode)); } - assertFalse(service.nodeConnected(otherClusterTransport.getLocalDiscoNode())); - assertTrue(service.nodeConnected(discoverableNode)); + assertFalse(connectionManager.nodeConnected(otherClusterTransport.getLocalDiscoNode())); + assertTrue(connectionManager.nodeConnected(discoverableNode)); assertTrue(connection.assertNoRunningConnections()); IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, () -> updateSeedNodes(connection, Arrays.asList(() -> otherClusterTransport.getLocalDiscoNode()))); @@ -1366,7 +1364,7 @@ public void testGetConnection() throws Exception { knownNodes.add(disconnectedNode); try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { - Transport.Connection seedConnection = new Transport.Connection() { + Transport.Connection seedConnection = new CloseableConnection() { @Override public DiscoveryNode getNode() { return connectedNode; @@ -1377,37 +1375,25 @@ public void sendRequest(long requestId, String action, TransportRequest request, throws TransportException { // no-op } - - @Override - public void addCloseListener(ActionListener listener) { - // no-op - } - - @Override - public boolean isClosed() { - return false; - } - - @Override - public void close() { - // no-op - } }; - service.addNodeConnectedBehavior(connectedNode.getAddress(), (connectionManager, discoveryNode) + ConnectionManager delegate = new ConnectionManager(Settings.EMPTY, service.transport, threadPool); + StubbableConnectionManager connectionManager = new StubbableConnectionManager(delegate, Settings.EMPTY, service.transport, + threadPool); + + connectionManager.addNodeConnectedBehavior(connectedNode.getAddress(), (cm, discoveryNode) -> discoveryNode.equals(connectedNode)); - service.addGetConnectionBehavior(connectedNode.getAddress(), (connectionManager, discoveryNode) -> { + connectionManager.addConnectBehavior(connectedNode.getAddress(), (cm, discoveryNode) -> { if (discoveryNode == connectedNode) { return seedConnection; } - return connectionManager.getConnection(discoveryNode); + return cm.getConnection(discoveryNode); }); service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Collections.singletonList(() -> connectedNode), service, service.getConnectionManager(), - Integer.MAX_VALUE, n -> true, null)) { + Collections.singletonList(() -> connectedNode), service, Integer.MAX_VALUE, n -> true, null, connectionManager)) { connection.addConnectedNode(connectedNode); for (int i = 0; i < 10; i++) { //always a direct connection as the remote node is already connected @@ -1449,7 +1435,7 @@ public void testLazyResolveTransportAddress() throws Exception { return seedNode; }; try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedSupplier), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(seedSupplier), service, Integer.MAX_VALUE, n -> true, null)) { updateSeedNodes(connection, Arrays.asList(seedSupplier)); // Closing connections leads to RemoteClusterConnection.ConnectHandler.collectRemoteNodes // being called again so we try to resolve the same seed node's host twice @@ -1481,7 +1467,7 @@ public void testProxyMode() throws Exception { RemoteClusterAware.buildSeedNode("some-remote-cluster", "node_0:" + randomIntBetween(1, 10000), true); assertEquals("node_0", seedSupplier.get().getAttributes().get("server_name")); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedSupplier), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, proxyAddress)) { + Arrays.asList(seedSupplier), service, Integer.MAX_VALUE, n -> true, proxyAddress)) { updateSeedNodes(connection, Arrays.asList(seedSupplier), proxyAddress); assertEquals(2, connection.getNumNodesConnected()); assertNotNull(connection.getConnection(discoverableTransport.getLocalDiscoNode())); @@ -1490,7 +1476,7 @@ public void testProxyMode() throws Exception { .getNode().getAddress().toString()); assertEquals(proxyAddress, connection.getConnection(discoverableTransport.getLocalDiscoNode()) .getNode().getAddress().toString()); - service.getConnectionManager().disconnectFromNode(knownNodes.get(0)); + connection.getConnectionManager().disconnectFromNode(knownNodes.get(0)); // ensure we reconnect assertBusy(() -> { assertEquals(2, connection.getNumNodesConnected()); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 0a02b66b50a29..f03b202deec37 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -366,7 +366,7 @@ public void testDefaultPingSchedule() throws IOException { assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); - assertEquals(pingSchedule, remoteClusterConnection.getConnectionManager().getPingSchedule()); + assertEquals(pingSchedule, remoteClusterConnection.getConnectionManager().getConnectionProfile().getPingInterval()); } } } @@ -394,9 +394,11 @@ public void testCustomPingSchedule() throws IOException { Settings.Builder builder = Settings.builder(); builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); - TimeValue pingSchedule1 = randomBoolean() ? TimeValue.MINUS_ONE : TimeValue.timeValueSeconds(randomIntBetween(1, 10)); + TimeValue pingSchedule1 = // randomBoolean() ? TimeValue.MINUS_ONE : + TimeValue.timeValueSeconds(randomIntBetween(1, 10)); builder.put("cluster.remote.cluster_1.transport.ping_schedule", pingSchedule1); - TimeValue pingSchedule2 = randomBoolean() ? TimeValue.MINUS_ONE : TimeValue.timeValueSeconds(randomIntBetween(1, 10)); + TimeValue pingSchedule2 = //randomBoolean() ? TimeValue.MINUS_ONE : + TimeValue.timeValueSeconds(randomIntBetween(1, 10)); builder.put("cluster.remote.cluster_2.transport.ping_schedule", pingSchedule2); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); @@ -406,9 +408,9 @@ public void testCustomPingSchedule() throws IOException { assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); RemoteClusterConnection remoteClusterConnection1 = service.getRemoteClusterConnection("cluster_1"); - assertEquals(pingSchedule1, remoteClusterConnection1.getConnectionManager().getPingSchedule()); + assertEquals(pingSchedule1, remoteClusterConnection1.getConnectionManager().getConnectionProfile().getPingInterval()); RemoteClusterConnection remoteClusterConnection2 = service.getRemoteClusterConnection("cluster_2"); - assertEquals(pingSchedule2, remoteClusterConnection2.getConnectionManager().getPingSchedule()); + assertEquals(pingSchedule2, remoteClusterConnection2.getConnectionManager().getConnectionProfile().getPingInterval()); } } } diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index 7ec5ebd10a54e..199cf42546d12 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -182,13 +182,13 @@ public void testCompressRequest() throws IOException { new BigArrays(new PageCacheRecycler(Settings.EMPTY), null), null, null, null) { @Override - protected FakeChannel bind(String name, InetSocketAddress address) throws IOException { + protected FakeServerChannel bind(String name, InetSocketAddress address) throws IOException { return null; } @Override - protected FakeChannel initiateChannel(DiscoveryNode node) throws IOException { - return new FakeChannel(messageCaptor); + protected FakeTcpChannel initiateChannel(DiscoveryNode node) throws IOException { + return new FakeTcpChannel(true, messageCaptor); } @Override @@ -203,7 +203,7 @@ public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connect int numConnections = connectionProfile.getNumConnections(); ArrayList fakeChannels = new ArrayList<>(numConnections); for (int i = 0; i < numConnections; ++i) { - fakeChannels.add(new FakeChannel(messageCaptor)); + fakeChannels.add(new FakeTcpChannel(false, messageCaptor)); } return new NodeChannels(node, fakeChannels, connectionProfile, Version.CURRENT); } @@ -248,13 +248,7 @@ public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connect } } - private static final class FakeChannel implements TcpChannel, TcpServerChannel { - - private final AtomicReference messageCaptor; - - FakeChannel(AtomicReference messageCaptor) { - this.messageCaptor = messageCaptor; - } + private static final class FakeServerChannel implements TcpServerChannel { @Override public void close() { @@ -269,10 +263,6 @@ public String getProfile() { public void addCloseListener(ActionListener listener) { } - @Override - public void addConnectListener(ActionListener listener) { - } - @Override public boolean isOpen() { return false; @@ -282,16 +272,6 @@ public boolean isOpen() { public InetSocketAddress getLocalAddress() { return null; } - - @Override - public InetSocketAddress getRemoteAddress() { - return null; - } - - @Override - public void sendMessage(BytesReference reference, ActionListener listener) { - messageCaptor.set(reference); - } } private static final class Req extends TransportRequest { diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportHandshakerTests.java b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java similarity index 99% rename from server/src/test/java/org/elasticsearch/transport/TcpTransportHandshakerTests.java rename to server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java index 23e3870842e20..ec6860f6adddf 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportHandshakerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java @@ -36,7 +36,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -public class TcpTransportHandshakerTests extends ESTestCase { +public class TransportHandshakerTests extends ESTestCase { private TcpTransportHandshaker handshaker; private DiscoveryNode node; diff --git a/server/src/test/java/org/elasticsearch/transport/TransportKeepAliveTests.java b/server/src/test/java/org/elasticsearch/transport/TransportKeepAliveTests.java new file mode 100644 index 0000000000000..a56db579cec80 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/transport/TransportKeepAliveTests.java @@ -0,0 +1,220 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.transport; + +import org.elasticsearch.common.AsyncBiFunction; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Collections; +import java.util.Deque; +import java.util.concurrent.ScheduledFuture; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class TransportKeepAliveTests extends ESTestCase { + + private final ConnectionProfile defaultProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY); + private BytesReference expectedPingMessage; + private AsyncBiFunction pingSender; + private TransportKeepAlive keepAlive; + private CapturingThreadPool threadPool; + + @Override + @SuppressWarnings("unchecked") + public void setUp() throws Exception { + super.setUp(); + pingSender = mock(AsyncBiFunction.class); + threadPool = new CapturingThreadPool(); + keepAlive = new TransportKeepAlive(threadPool, pingSender); + + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.writeByte((byte) 'E'); + out.writeByte((byte) 'S'); + out.writeInt(-1); + expectedPingMessage = out.bytes(); + } catch (IOException e) { + throw new AssertionError(e.getMessage(), e); // won't happen + } + } + + @Override + public void tearDown() throws Exception { + threadPool.shutdown(); + super.tearDown(); + } + + public void testRegisterNodeConnectionSchedulesKeepAlive() { + TimeValue pingInterval = TimeValue.timeValueSeconds(randomLongBetween(1, 60)); + ConnectionProfile connectionProfile = new ConnectionProfile.Builder(defaultProfile) + .setPingInterval(pingInterval) + .build(); + + assertEquals(0, threadPool.scheduledTasks.size()); + + TcpChannel channel1 = new FakeTcpChannel(); + TcpChannel channel2 = new FakeTcpChannel(); + channel1.getChannelStats().markAccessed(threadPool.relativeTimeInMillis()); + channel2.getChannelStats().markAccessed(threadPool.relativeTimeInMillis()); + keepAlive.registerNodeConnection(Arrays.asList(channel1, channel2), connectionProfile); + + assertEquals(1, threadPool.scheduledTasks.size()); + Tuple taskTuple = threadPool.scheduledTasks.poll(); + assertEquals(pingInterval, taskTuple.v1()); + Runnable keepAliveTask = taskTuple.v2(); + assertEquals(0, threadPool.scheduledTasks.size()); + keepAliveTask.run(); + + verify(pingSender, times(1)).apply(same(channel1), eq(expectedPingMessage), any()); + verify(pingSender, times(1)).apply(same(channel2), eq(expectedPingMessage), any()); + + // Test that the task has rescheduled itself + assertEquals(1, threadPool.scheduledTasks.size()); + Tuple rescheduledTask = threadPool.scheduledTasks.poll(); + assertEquals(pingInterval, rescheduledTask.v1()); + } + + public void testRegisterMultipleKeepAliveIntervals() { + TimeValue pingInterval1 = TimeValue.timeValueSeconds(randomLongBetween(1, 30)); + ConnectionProfile connectionProfile1 = new ConnectionProfile.Builder(defaultProfile) + .setPingInterval(pingInterval1) + .build(); + + TimeValue pingInterval2 = TimeValue.timeValueSeconds(randomLongBetween(31, 60)); + ConnectionProfile connectionProfile2 = new ConnectionProfile.Builder(defaultProfile) + .setPingInterval(pingInterval2) + .build(); + + assertEquals(0, threadPool.scheduledTasks.size()); + + TcpChannel channel1 = new FakeTcpChannel(); + TcpChannel channel2 = new FakeTcpChannel(); + channel1.getChannelStats().markAccessed(threadPool.relativeTimeInMillis()); + channel2.getChannelStats().markAccessed(threadPool.relativeTimeInMillis()); + keepAlive.registerNodeConnection(Collections.singletonList(channel1), connectionProfile1); + keepAlive.registerNodeConnection(Collections.singletonList(channel2), connectionProfile2); + + assertEquals(2, threadPool.scheduledTasks.size()); + Tuple taskTuple1 = threadPool.scheduledTasks.poll(); + Tuple taskTuple2 = threadPool.scheduledTasks.poll(); + assertEquals(pingInterval1, taskTuple1.v1()); + assertEquals(pingInterval2, taskTuple2.v1()); + Runnable keepAliveTask1 = taskTuple1.v2(); + Runnable keepAliveTask2 = taskTuple1.v2(); + + assertEquals(0, threadPool.scheduledTasks.size()); + keepAliveTask1.run(); + assertEquals(1, threadPool.scheduledTasks.size()); + keepAliveTask2.run(); + assertEquals(2, threadPool.scheduledTasks.size()); + } + + public void testClosingChannelUnregistersItFromKeepAlive() { + TimeValue pingInterval1 = TimeValue.timeValueSeconds(randomLongBetween(1, 30)); + ConnectionProfile connectionProfile = new ConnectionProfile.Builder(defaultProfile) + .setPingInterval(pingInterval1) + .build(); + + TcpChannel channel1 = new FakeTcpChannel(); + TcpChannel channel2 = new FakeTcpChannel(); + channel1.getChannelStats().markAccessed(threadPool.relativeTimeInMillis()); + channel2.getChannelStats().markAccessed(threadPool.relativeTimeInMillis()); + keepAlive.registerNodeConnection(Collections.singletonList(channel1), connectionProfile); + keepAlive.registerNodeConnection(Collections.singletonList(channel2), connectionProfile); + + channel1.close(); + + Runnable task = threadPool.scheduledTasks.poll().v2(); + task.run(); + + verify(pingSender, times(0)).apply(same(channel1), eq(expectedPingMessage), any()); + verify(pingSender, times(1)).apply(same(channel2), eq(expectedPingMessage), any()); + } + + public void testKeepAliveResponseIfServer() { + TcpChannel channel = new FakeTcpChannel(true); + channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis()); + + keepAlive.receiveKeepAlive(channel); + + verify(pingSender, times(1)).apply(same(channel), eq(expectedPingMessage), any()); + } + + public void testNoKeepAliveResponseIfClient() { + TcpChannel channel = new FakeTcpChannel(false); + channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis()); + + keepAlive.receiveKeepAlive(channel); + + verify(pingSender, times(0)).apply(same(channel), eq(expectedPingMessage), any()); + } + + public void testOnlySendPingIfWeHaveNotWrittenAndReadSinceLastPing() { + TimeValue pingInterval = TimeValue.timeValueSeconds(15); + ConnectionProfile connectionProfile = new ConnectionProfile.Builder(defaultProfile) + .setPingInterval(pingInterval) + .build(); + + TcpChannel channel1 = new FakeTcpChannel(); + TcpChannel channel2 = new FakeTcpChannel(); + channel1.getChannelStats().markAccessed(threadPool.relativeTimeInMillis()); + channel2.getChannelStats().markAccessed(threadPool.relativeTimeInMillis()); + keepAlive.registerNodeConnection(Arrays.asList(channel1, channel2), connectionProfile); + + Tuple taskTuple = threadPool.scheduledTasks.poll(); + taskTuple.v2().run(); + + TcpChannel.ChannelStats stats = channel1.getChannelStats(); + stats.markAccessed(threadPool.relativeTimeInMillis() + (pingInterval.millis() / 2)); + + taskTuple = threadPool.scheduledTasks.poll(); + taskTuple.v2().run(); + + verify(pingSender, times(1)).apply(same(channel1), eq(expectedPingMessage), any()); + verify(pingSender, times(2)).apply(same(channel2), eq(expectedPingMessage), any()); + } + + private class CapturingThreadPool extends TestThreadPool { + + private final Deque> scheduledTasks = new ArrayDeque<>(); + + private CapturingThreadPool() { + super(getTestName()); + } + + @Override + public ScheduledFuture schedule(TimeValue delay, String executor, Runnable task) { + scheduledTasks.add(new Tuple<>(delay, task)); + return null; + } + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java index d35fe609c0855..ce0e38a83f88d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java @@ -210,11 +210,6 @@ public void sendRequest(long requestId, String action, TransportRequest request, } } - @Override - public boolean sendPing() { - return connection.sendPing(); - } - @Override public void addCloseListener(ActionListener listener) { connection.addCloseListener(listener); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 3741be92b8da0..aa8da669cdbbc 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -2013,7 +2013,26 @@ public void testHandshakeUpdatesVersion() throws IOException { } } - public void testTcpHandshake() throws IOException, InterruptedException { + public void testKeepAlivePings() throws Exception { + assumeTrue("only tcp transport has keep alive pings", serviceA.getOriginalTransport() instanceof TcpTransport); + TcpTransport originalTransport = (TcpTransport) serviceA.getOriginalTransport(); + + ConnectionProfile defaultProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY); + ConnectionProfile connectionProfile = new ConnectionProfile.Builder(defaultProfile) + .setPingInterval(TimeValue.timeValueMillis(50)) + .build(); + try (TransportService service = buildService("TS_TPC", Version.CURRENT, null); + TcpTransport.NodeChannels connection = originalTransport.openConnection( + new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0), + connectionProfile)) { + assertBusy(() -> { + assertTrue(originalTransport.getKeepAlive().successfulPingCount() > 30); + }); + assertEquals(0, originalTransport.getKeepAlive().failedPingCount()); + } + } + + public void testTcpHandshake() { assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport); TcpTransport originalTransport = (TcpTransport) serviceA.getOriginalTransport(); NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java new file mode 100644 index 0000000000000..cd598a6ca3106 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java @@ -0,0 +1,104 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.transport; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.concurrent.CompletableContext; + +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicReference; + +public class FakeTcpChannel implements TcpChannel { + + private final boolean isServer; + private final String profile; + private final AtomicReference messageCaptor; + private final ChannelStats stats = new ChannelStats(); + private final CompletableContext closeContext = new CompletableContext<>(); + + public FakeTcpChannel() { + this(false, "profile", new AtomicReference<>()); + } + + public FakeTcpChannel(boolean isServer) { + this(isServer, "profile", new AtomicReference<>()); + } + + public FakeTcpChannel(boolean isServer, AtomicReference messageCaptor) { + this(isServer, "profile", messageCaptor); + } + + + public FakeTcpChannel(boolean isServer, String profile, AtomicReference messageCaptor) { + this.isServer = isServer; + this.profile = profile; + this.messageCaptor = messageCaptor; + } + + @Override + public boolean isServerChannel() { + return isServer; + } + + @Override + public String getProfile() { + return profile; + } + + @Override + public InetSocketAddress getLocalAddress() { + return null; + } + + @Override + public InetSocketAddress getRemoteAddress() { + return null; + } + + @Override + public void sendMessage(BytesReference reference, ActionListener listener) { + messageCaptor.set(reference); + } + + @Override + public void addConnectListener(ActionListener listener) { + + } + + @Override + public void close() { + closeContext.complete(null); + } + + @Override + public void addCloseListener(ActionListener listener) { + closeContext.addListener(ActionListener.toBiConsumer(listener)); + } + + @Override + public boolean isOpen() { + return closeContext.isDone() == false; + } + + @Override + public ChannelStats getChannelStats() { + return stats; + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 48358960c3f3a..b5cdbeae2c960 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -78,7 +78,7 @@ public class MockTcpTransport extends TcpTransport { * A pre-built light connection profile that shares a single connection across all * types. */ - public static final ConnectionProfile LIGHT_PROFILE; + static final ConnectionProfile LIGHT_PROFILE; private final Set openChannels = new HashSet<>(); @@ -173,7 +173,7 @@ private void readMessage(MockChannel mockChannel, StreamInput input) throws IOEx protected MockChannel initiateChannel(DiscoveryNode node) throws IOException { InetSocketAddress address = node.getAddress().address(); final MockSocket socket = new MockSocket(); - final MockChannel channel = new MockChannel(socket, address, "none"); + final MockChannel channel = new MockChannel(socket, address, false, "none"); boolean success = false; try { @@ -219,6 +219,7 @@ protected ConnectionProfile maybeOverrideConnectionProfile(ConnectionProfile con } builder.setHandshakeTimeout(connectionProfile.getHandshakeTimeout()); builder.setConnectTimeout(connectionProfile.getConnectTimeout()); + builder.setPingInterval(connectionProfile.getPingInterval()); builder.setCompressionEnabled(connectionProfile.getCompressionEnabled()); return builder.build(); } @@ -242,10 +243,12 @@ public final class MockChannel implements Closeable, TcpChannel, TcpServerChanne private final ServerSocket serverSocket; private final Set workerChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final Socket activeChannel; + private final boolean isServer; private final String profile; private final CancellableThreads cancellableThreads = new CancellableThreads(); private final CompletableContext closeFuture = new CompletableContext<>(); private final CompletableContext connectFuture = new CompletableContext<>(); + private final ChannelStats stats = new ChannelStats(); /** * Constructs a new MockChannel instance intended for handling the actual incoming / outgoing traffic. @@ -254,9 +257,10 @@ public final class MockChannel implements Closeable, TcpChannel, TcpServerChanne * @param localAddress Address associated with the corresponding local server socket. Must not be null. * @param profile The associated profile name. */ - public MockChannel(Socket socket, InetSocketAddress localAddress, String profile) { + MockChannel(Socket socket, InetSocketAddress localAddress, boolean isServer, String profile) { this.localAddress = localAddress; this.activeChannel = socket; + this.isServer = isServer; this.serverSocket = null; this.profile = profile; synchronized (openChannels) { @@ -274,6 +278,7 @@ public MockChannel(Socket socket, InetSocketAddress localAddress, String profile this.localAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress(); this.serverSocket = serverSocket; this.profile = profile; + this.isServer = false; this.activeChannel = null; synchronized (openChannels) { openChannels.add(this); @@ -288,8 +293,9 @@ public void accept(Executor executor) throws IOException { configureSocket(incomingSocket); synchronized (this) { if (isOpen.get()) { - incomingChannel = new MockChannel(incomingSocket, - new InetSocketAddress(incomingSocket.getLocalAddress(), incomingSocket.getPort()), profile); + InetSocketAddress localAddress = new InetSocketAddress(incomingSocket.getLocalAddress(), + incomingSocket.getPort()); + incomingChannel = new MockChannel(incomingSocket, localAddress, true, profile); MockChannel finalIncomingChannel = incomingChannel; incomingChannel.addCloseListener(new ActionListener() { @Override @@ -389,6 +395,11 @@ public String getProfile() { return profile; } + @Override + public boolean isServerChannel() { + return isServer; + } + @Override public void addCloseListener(ActionListener listener) { closeFuture.addListener(ActionListener.toBiConsumer(listener)); @@ -399,6 +410,11 @@ public void addConnectListener(ActionListener listener) { connectFuture.addListener(ActionListener.toBiConsumer(listener)); } + @Override + public ChannelStats getChannelStats() { + return stats; + } + @Override public boolean isOpen() { return isOpen.get();