From e016ae7434a4f6a4536c14986cba32fc3fab77a4 Mon Sep 17 00:00:00 2001 From: javanna Date: Tue, 23 Oct 2018 14:43:11 +0200 Subject: [PATCH 01/12] remove constructor --- .../transport/RemoteClusterConnection.java | 14 ++---- .../RemoteClusterConnectionTests.java | 48 +++++++++---------- 2 files changed, 28 insertions(+), 34 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index c9f3a2aa36540..de313631498b7 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -18,8 +18,6 @@ */ package org.elasticsearch.transport; -import java.net.InetSocketAddress; -import java.util.function.Supplier; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.SetOnce; @@ -48,6 +46,7 @@ import java.io.Closeable; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -64,6 +63,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -104,17 +104,11 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo * @param connectionManager the connection manager to use for this remote connection * @param maxNumRemoteConnections the maximum number of connections to the remote cluster * @param nodePredicate a predicate to filter eligible remote nodes to connect to + * @param proxyAddress the proxy address */ RemoteClusterConnection(Settings settings, String clusterAlias, List> seedNodes, TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections, - Predicate nodePredicate) { - this(settings, clusterAlias, seedNodes, transportService, connectionManager, maxNumRemoteConnections, nodePredicate, null); - } - - RemoteClusterConnection(Settings settings, String clusterAlias, List> seedNodes, - TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections, Predicate - nodePredicate, - String proxyAddress) { + Predicate nodePredicate, String proxyAddress) { super(settings); this.transportService = transportService; this.maxNumRemoteConnections = maxNumRemoteConnections; diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 46364c19ee0ec..12b14b25bc458 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -18,9 +18,6 @@ */ package org.elasticsearch.transport; -import java.util.HashMap; -import java.util.Map; -import java.util.function.Supplier; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -68,7 +65,9 @@ import java.util.Arrays; import java.util.Base64; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -79,6 +78,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.function.Supplier; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -163,7 +163,7 @@ public void testRemoteProfileIsUsedForLocalCluster() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -206,7 +206,7 @@ public void testRemoteProfileIsUsedForRemoteCluster() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -262,7 +262,7 @@ public void testDiscoverSingleNode() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -291,7 +291,7 @@ public void testDiscoverSingleNodeWithIncompatibleSeed() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { updateSeedNodes(connection, seedNodes); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -318,7 +318,7 @@ public void testNodeDisconnected() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -368,7 +368,7 @@ public void testFilterDiscoveredNodes() throws Exception { service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, - n -> n.equals(rejectedNode) == false)) { + n -> n.equals(rejectedNode) == false, null)) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); if (rejectedNode.equals(seedNode)) { assertFalse(service.nodeConnected(seedNode)); @@ -428,7 +428,7 @@ public void testConnectWithIncompatibleTransports() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { expectThrows(Exception.class, () -> updateSeedNodes(connection, Arrays.asList(() -> seedNode))); assertFalse(service.nodeConnected(seedNode)); assertTrue(connection.assertNoRunningConnections()); @@ -491,7 +491,7 @@ public void close() { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { connection.addConnectedNode(seedNode); for (DiscoveryNode node : knownNodes) { final Transport.Connection transportConnection = connection.getConnection(node); @@ -534,7 +534,7 @@ public void run() { CountDownLatch listenerCalled = new CountDownLatch(1); AtomicReference exceptionReference = new AtomicReference<>(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { ActionListener listener = ActionListener.wrap(x -> { listenerCalled.countDown(); fail("expected exception"); @@ -571,7 +571,7 @@ public void testFetchShards() throws Exception { service.acceptIncomingRequests(); List> nodes = Collections.singletonList(() -> seedNode); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - nodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + nodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { if (randomBoolean()) { updateSeedNodes(connection, nodes); } @@ -611,7 +611,7 @@ public void testFetchShardsThreadContextHeader() throws Exception { service.acceptIncomingRequests(); List> nodes = Collections.singletonList(() -> seedNode); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - nodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + nodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { SearchRequest request = new SearchRequest("test-index"); Thread[] threads = new Thread[10]; for (int i = 0; i < threads.length; i++) { @@ -665,7 +665,7 @@ public void testFetchShardsSkipUnavailable() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Collections.singletonList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Collections.singletonList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { SearchRequest request = new SearchRequest("test-index"); ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") @@ -775,7 +775,7 @@ public void testTriggerUpdatesConcurrently() throws IOException, InterruptedExce service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { int numThreads = randomIntBetween(4, 10); Thread[] threads = new Thread[numThreads]; CyclicBarrier barrier = new CyclicBarrier(numThreads); @@ -854,7 +854,7 @@ public void testCloseWhileConcurrentlyConnecting() throws IOException, Interrupt service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { int numThreads = randomIntBetween(4, 10); Thread[] threads = new Thread[numThreads]; CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); @@ -943,7 +943,7 @@ public void testGetConnectionInfo() throws Exception { service.acceptIncomingRequests(); int maxNumConnections = randomIntBetween(1, 5); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, service.connectionManager(), maxNumConnections, n -> true)) { + seedNodes, service, service.connectionManager(), maxNumConnections, n -> true, null)) { // test no nodes connected RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo()); assertNotNull(remoteConnectionInfo); @@ -1090,7 +1090,7 @@ public void testEnsureConnected() throws IOException, InterruptedException { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { assertFalse(service.nodeConnected(seedNode)); assertFalse(service.nodeConnected(discoverableNode)); assertTrue(connection.assertNoRunningConnections()); @@ -1139,7 +1139,7 @@ public void testCollectNodes() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { if (randomBoolean()) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); } @@ -1187,7 +1187,7 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { final int numGetThreads = randomIntBetween(4, 10); final Thread[] getThreads = new Thread[numGetThreads]; final int numModifyingThreads = randomIntBetween(4, 10); @@ -1277,7 +1277,7 @@ public void testClusterNameIsChecked() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList( () -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList( () -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -1357,7 +1357,7 @@ public void close() { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Collections.singletonList(() -> connectedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Collections.singletonList(() -> connectedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { connection.addConnectedNode(connectedNode); for (int i = 0; i < 10; i++) { //always a direct connection as the remote node is already connected @@ -1399,7 +1399,7 @@ public void testLazyResolveTransportAddress() throws Exception { return seedNode; }; try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedSupplier), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(seedSupplier), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { updateSeedNodes(connection, Arrays.asList(seedSupplier)); // Closing connections leads to RemoteClusterConnection.ConnectHandler.collectRemoteNodes // being called again so we try to resolve the same seed node's host twice From edfdac52eff3d76ba83fd00ddad364d4911d57f0 Mon Sep 17 00:00:00 2001 From: javanna Date: Tue, 23 Oct 2018 14:51:07 +0200 Subject: [PATCH 02/12] remove connection manager public constructor --- .../org/elasticsearch/transport/ConnectionManager.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java index 4e4d369330c80..dc6241d1da338 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java @@ -18,8 +18,8 @@ */ package org.elasticsearch.transport; -import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -67,14 +67,10 @@ public class ConnectionManager implements Closeable { private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener(); public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) { - this(settings, transport, threadPool, buildDefaultConnectionProfile(settings)); - } - - public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, ConnectionProfile defaultProfile) { this.transport = transport; this.threadPool = threadPool; this.pingSchedule = TcpTransport.PING_SCHEDULE.get(settings); - this.defaultProfile = defaultProfile; + this.defaultProfile = buildDefaultConnectionProfile(settings); this.lifecycle.moveToStarted(); if (pingSchedule.millis() > 0) { From 95b55f39a57826a3229481a89b6b12065027a055 Mon Sep 17 00:00:00 2001 From: javanna Date: Tue, 23 Oct 2018 16:32:07 +0200 Subject: [PATCH 03/12] Schedule ping by default for remote clusters When we connect to remote clusters, there may be a few more routers/firewalls in-between compared to when we connect to nodes in the same cluster. We've experienced cases where firewalls drop connections completely and keep-alives seem not to be enough, or they are not properly configured. With this commit we enable application-level pings by default every 5 seconds from CCS nodes to the selected remote nodes. We also add a setting called `cluster.remote.ping_schedule` that allows to change the interval and potentially disable application-level pings, similar to `transport.ping_schedule` but the new setting only affects connections made to remote clusters. Relates to #34405 --- .../modules/remote-clusters.asciidoc | 6 +++ docs/reference/modules/transport.asciidoc | 8 +-- .../common/settings/ClusterSettings.java | 1 + .../transport/ConnectionManager.java | 11 +++- .../transport/RemoteClusterService.java | 32 +++++------ .../transport/RemoteClusterServiceTests.java | 53 +++++++++++++++++++ 6 files changed, 89 insertions(+), 22 deletions(-) diff --git a/docs/reference/modules/remote-clusters.asciidoc b/docs/reference/modules/remote-clusters.asciidoc index 81d882f5f0eb6..93f1bad06faff 100644 --- a/docs/reference/modules/remote-clusters.asciidoc +++ b/docs/reference/modules/remote-clusters.asciidoc @@ -129,6 +129,12 @@ PUT _cluster/settings The time to wait for remote connections to be established when the node starts. The default is `30s`. +`cluster.remote.ping_schedule`:: + + Schedule a regular application-level ping message to ensure that transport + connections to nodes belonging to remote clusters are kept alive. Defaults + to `5s`, it can be set to `-1` to disable pings. + `cluster.remote.node.attr`:: A node attribute to filter out nodes that are eligible as a gateway node in diff --git a/docs/reference/modules/transport.asciidoc b/docs/reference/modules/transport.asciidoc index 257181f70c507..757bc9eb10db6 100644 --- a/docs/reference/modules/transport.asciidoc +++ b/docs/reference/modules/transport.asciidoc @@ -46,10 +46,10 @@ between all nodes. Defaults to `false`. |`transport.ping_schedule` | Schedule a regular application-level ping message to ensure that transport connections between nodes are kept alive. Defaults to -`5s` in the transport client and `-1` (disabled) elsewhere. It is preferable to -correctly configure TCP keep-alives instead of using this feature, because TCP -keep-alives apply to all kinds of long-lived connection and not just to -transport connections. +`5s` in the transport client and connections to remote clusters, and `-1` +(disabled) elsewhere. It is preferable to correctly configure TCP keep-alives +instead of using this feature, because TCP keep-alives apply to all kinds of +long-lived connections and not just to transport connections. |======================================================================= diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 4b4ebb7414acb..4966d4497f27b 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -292,6 +292,7 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteClusterService.SEARCH_REMOTE_NODE_ATTRIBUTE, RemoteClusterService.ENABLE_REMOTE_CLUSTERS, RemoteClusterService.SEARCH_ENABLE_REMOTE_CLUSTERS, + RemoteClusterService.REMOTE_PING_SCHEDULE, TransportService.TRACE_LOG_EXCLUDE_SETTING, TransportService.TRACE_LOG_INCLUDE_SETTING, TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING, diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java index dc6241d1da338..52bdd0e49177f 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java @@ -67,12 +67,15 @@ public class ConnectionManager implements Closeable { private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener(); public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) { + this(settings, transport, threadPool, TcpTransport.PING_SCHEDULE.get(settings)); + } + + public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, TimeValue pingSchedule) { this.transport = transport; this.threadPool = threadPool; - this.pingSchedule = TcpTransport.PING_SCHEDULE.get(settings); + this.pingSchedule = pingSchedule; this.defaultProfile = buildDefaultConnectionProfile(settings); this.lifecycle.moveToStarted(); - if (pingSchedule.millis() > 0) { threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, new ScheduledPing()); } @@ -287,6 +290,10 @@ public void onFailure(Exception e) { } } + public TimeValue getPingSchedule() { + return pingSchedule; + } + private static final class DelegatingNodeConnectionListener implements TransportConnectionListener { private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index dc3bd3a353604..639b3b6c5e3fc 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -71,6 +71,10 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl assert Version.CURRENT.major < 8; } + //the default here (5s) differs from the default in TcpTransport.PING_SCHEDULE (which is -1, hence disabled) + public static final Setting REMOTE_PING_SCHEDULE = + Setting.timeSetting("cluster.remote.ping_schedule", TimeValue.timeValueSeconds(5), Setting.Property.NodeScope); + public static final Setting SEARCH_REMOTE_CONNECTIONS_PER_CLUSTER = Setting.intSetting("search.remote.connections_per_cluster", 3, 1, Setting.Property.NodeScope, Setting.Property.Deprecated); @@ -211,9 +215,10 @@ private synchronized void updateRemoteClusters(Map listener) { - RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterAlias); - if (remoteClusterConnection == null) { - throw new IllegalArgumentException("no such remote cluster: " + clusterAlias); - } - remoteClusterConnection.ensureConnected(listener); + void ensureConnected(String clusterAlias, ActionListener listener) { + getRemoteClusterConnection(clusterAlias).ensureConnected(listener); } public Transport.Connection getConnection(String cluster) { + return getRemoteClusterConnection(cluster).getConnection(); + } + + RemoteClusterConnection getRemoteClusterConnection(String cluster) { RemoteClusterConnection connection = remoteClusters.get(cluster); if (connection == null) { throw new IllegalArgumentException("no such remote cluster: " + cluster); } - return connection.getConnection(); + return connection; } @Override @@ -386,7 +387,6 @@ synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavail } } - @Override protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress) { updateRemoteCluster(clusterAlias, addresses, proxyAddress, ActionListener.wrap((x) -> {}, (x) -> {})); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 94ac7e963c1da..31aac92b68093 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; @@ -97,6 +98,7 @@ public void testSettingsAreRegistered() { assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_NODE_ATTRIBUTE)); + assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_PING_SCHEDULE)); } public void testRemoteClusterSeedSetting() { @@ -337,6 +339,57 @@ public void testIncrementallyAddClusters() throws IOException { } } + public void testDefaultPingSchedule() throws IOException { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + + try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, + null)) { + transportService.start(); + transportService.acceptIncomingRequests(); + try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) { + assertFalse(service.isCrossClusterSearchEnabled()); + service.initializeRemoteClusters(); + assertFalse(service.isCrossClusterSearchEnabled()); + service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().toString()), null); + assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(service.isRemoteClusterRegistered("cluster_1")); + RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); + assertEquals(TimeValue.timeValueSeconds(5), remoteClusterConnection.getConnectionManager().getPingSchedule()); + } + } + } + } + + public void testCustomPingSchedule() throws IOException { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + + try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, + null)) { + transportService.start(); + transportService.acceptIncomingRequests(); + Settings.Builder builder = Settings.builder(); + TimeValue pingSchedule = randomBoolean() ? TimeValue.MINUS_ONE : TimeValue.timeValueSeconds(randomIntBetween(1, 10)); + builder.put("cluster.remote.ping_schedule", pingSchedule); + try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { + assertFalse(service.isCrossClusterSearchEnabled()); + service.initializeRemoteClusters(); + assertFalse(service.isCrossClusterSearchEnabled()); + service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().toString()), null); + assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(service.isRemoteClusterRegistered("cluster_1")); + RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); + assertEquals(pingSchedule, remoteClusterConnection.getConnectionManager().getPingSchedule()); + } + } + } + } + public void testRemoteNodeAttribute() throws IOException, InterruptedException { final Settings settings = Settings.builder().put("cluster.remote.node.attr", "gateway").build(); From 19b4bacfbc7a831569ee3d2c8667f2b2c52212c6 Mon Sep 17 00:00:00 2001 From: javanna Date: Tue, 23 Oct 2018 16:46:58 +0200 Subject: [PATCH 04/12] fix check-style --- .../transport/RemoteClusterConnectionTests.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 12b14b25bc458..529b88eaa066c 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -665,7 +665,8 @@ public void testFetchShardsSkipUnavailable() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Collections.singletonList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { + Collections.singletonList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, + n -> true, null)) { SearchRequest request = new SearchRequest("test-index"); ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") @@ -1357,7 +1358,8 @@ public void close() { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Collections.singletonList(() -> connectedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { + Collections.singletonList(() -> connectedNode), service, service.getConnectionManager(), + Integer.MAX_VALUE, n -> true, null)) { connection.addConnectedNode(connectedNode); for (int i = 0; i < 10; i++) { //always a direct connection as the remote node is already connected From 239ddc101136702ee1670975c03d9a02c41956c4 Mon Sep 17 00:00:00 2001 From: javanna Date: Wed, 24 Oct 2018 15:41:40 +0200 Subject: [PATCH 05/12] per cluster setting, default to -1 --- .../modules/remote-clusters.asciidoc | 14 +++--- docs/reference/modules/transport.asciidoc | 8 ++-- .../common/settings/ClusterSettings.java | 2 +- .../transport/RemoteClusterService.java | 19 +++++--- .../transport/RemoteClusterServiceTests.java | 45 +++++++++++++------ 5 files changed, 57 insertions(+), 31 deletions(-) diff --git a/docs/reference/modules/remote-clusters.asciidoc b/docs/reference/modules/remote-clusters.asciidoc index 93f1bad06faff..c807cfb3e3c72 100644 --- a/docs/reference/modules/remote-clusters.asciidoc +++ b/docs/reference/modules/remote-clusters.asciidoc @@ -129,12 +129,6 @@ PUT _cluster/settings The time to wait for remote connections to be established when the node starts. The default is `30s`. -`cluster.remote.ping_schedule`:: - - Schedule a regular application-level ping message to ensure that transport - connections to nodes belonging to remote clusters are kept alive. Defaults - to `5s`, it can be set to `-1` to disable pings. - `cluster.remote.node.attr`:: A node attribute to filter out nodes that are eligible as a gateway node in @@ -158,6 +152,14 @@ PUT _cluster/settings by default, but they can selectively be made optional by setting this setting to `true`. +`cluster.remote.${cluster_alias}.transport.ping_schedule`:: + + Schedule a regular application-level ping message to ensure that transport + connections to nodes belonging to remote clusters are kept alive. Defaults + to the value that the global `transport.ping_schedule` setting is set to + (which defaults to `-1`, meaning pings are disabled). + + [float] [[retrieve-remote-clusters-info]] === Retrieving remote clusters info diff --git a/docs/reference/modules/transport.asciidoc b/docs/reference/modules/transport.asciidoc index 757bc9eb10db6..c1bc83230e597 100644 --- a/docs/reference/modules/transport.asciidoc +++ b/docs/reference/modules/transport.asciidoc @@ -46,10 +46,10 @@ between all nodes. Defaults to `false`. |`transport.ping_schedule` | Schedule a regular application-level ping message to ensure that transport connections between nodes are kept alive. Defaults to -`5s` in the transport client and connections to remote clusters, and `-1` -(disabled) elsewhere. It is preferable to correctly configure TCP keep-alives -instead of using this feature, because TCP keep-alives apply to all kinds of -long-lived connections and not just to transport connections. +`5s` in the transport client and `-1` (disabled) elsewhere. It is preferable +to correctly configure TCP keep-alives instead of using this feature, because +TCP keep-alives apply to all kinds of long-lived connections and not just to +transport connections. |======================================================================= diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 4966d4497f27b..f296f88bb8de9 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -292,7 +292,7 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteClusterService.SEARCH_REMOTE_NODE_ATTRIBUTE, RemoteClusterService.ENABLE_REMOTE_CLUSTERS, RemoteClusterService.SEARCH_ENABLE_REMOTE_CLUSTERS, - RemoteClusterService.REMOTE_PING_SCHEDULE, + RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE, TransportService.TRACE_LOG_EXCLUDE_SETTING, TransportService.TRACE_LOG_INCLUDE_SETTING, TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING, diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 639b3b6c5e3fc..08f08207eae39 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -60,6 +60,7 @@ import java.util.stream.Stream; import static org.elasticsearch.common.settings.Setting.boolSetting; +import static org.elasticsearch.common.settings.Setting.timeSetting; /** * Basic service for accessing remote clusters via gateway nodes @@ -71,10 +72,6 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl assert Version.CURRENT.major < 8; } - //the default here (5s) differs from the default in TcpTransport.PING_SCHEDULE (which is -1, hence disabled) - public static final Setting REMOTE_PING_SCHEDULE = - Setting.timeSetting("cluster.remote.ping_schedule", TimeValue.timeValueSeconds(5), Setting.Property.NodeScope); - public static final Setting SEARCH_REMOTE_CONNECTIONS_PER_CLUSTER = Setting.intSetting("search.remote.connections_per_cluster", 3, 1, Setting.Property.NodeScope, Setting.Property.Deprecated); @@ -170,6 +167,12 @@ public String getKey(final String key) { Setting.Property.NodeScope), REMOTE_CLUSTERS_SEEDS); + public static final Setting.AffixSetting REMOTE_CLUSTER_PING_SCHEDULE = Setting.affixKeySetting( + "cluster.remote.", + "transport.ping_schedule", + key -> timeSetting(key, TcpTransport.PING_SCHEDULE, Setting.Property.NodeScope), + REMOTE_CLUSTERS_SEEDS); + private static final Predicate DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion()) && (node.isMasterNode() == false || node.isDataNode() || node.isIngestNode()); @@ -215,11 +218,13 @@ private synchronized void updateRemoteClusters(Map knownNodes = new CopyOnWriteArrayList<>(); - try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT)) { + try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode(); knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(otherSeedTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { transportService.start(); transportService.acceptIncomingRequests(); Settings.Builder builder = Settings.builder(); - TimeValue pingSchedule = randomBoolean() ? TimeValue.MINUS_ONE : TimeValue.timeValueSeconds(randomIntBetween(1, 10)); - builder.put("cluster.remote.ping_schedule", pingSchedule); + builder.putList("cluster.remote.cluster_1.seeds", seedNode.getAddress().toString()); + builder.putList("cluster.remote.cluster_2.seeds", otherSeedNode.getAddress().toString()); + TimeValue clusterOnePingSchedule = TimeValue.timeValueSeconds(randomIntBetween(1, 10)); + builder.put("cluster.remote.cluster_1.transport.ping_schedule", clusterOnePingSchedule); + TimeValue clusterTwoPingSchedule = TimeValue.timeValueSeconds(randomIntBetween(1, 10)); + builder.put("cluster.remote.cluster_2.transport.ping_schedule", clusterTwoPingSchedule); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); service.initializeRemoteClusters(); - assertFalse(service.isCrossClusterSearchEnabled()); + assertTrue(service.isCrossClusterSearchEnabled()); service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().toString()), null); assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); - RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); - assertEquals(pingSchedule, remoteClusterConnection.getConnectionManager().getPingSchedule()); + RemoteClusterConnection remoteClusterConnection1 = service.getRemoteClusterConnection("cluster_1"); + assertEquals(clusterOnePingSchedule, remoteClusterConnection1.getConnectionManager().getPingSchedule()); + RemoteClusterConnection remoteClusterConnection2 = service.getRemoteClusterConnection("cluster_2"); + assertEquals(clusterTwoPingSchedule, remoteClusterConnection2.getConnectionManager().getPingSchedule()); } } } From fae393ad1c72a100dbb1a22deb585680511011f9 Mon Sep 17 00:00:00 2001 From: javanna Date: Wed, 24 Oct 2018 17:06:25 +0200 Subject: [PATCH 06/12] make method package private --- .../org/elasticsearch/transport/ConnectionManager.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java index 52bdd0e49177f..a33ef30989cec 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java @@ -251,6 +251,10 @@ private void ensureOpen() { } } + TimeValue getPingSchedule() { + return pingSchedule; + } + private class ScheduledPing extends AbstractLifecycleRunnable { private ScheduledPing() { @@ -290,10 +294,6 @@ public void onFailure(Exception e) { } } - public TimeValue getPingSchedule() { - return pingSchedule; - } - private static final class DelegatingNodeConnectionListener implements TransportConnectionListener { private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); From dafcabfc6c5a246ae3bfed01f725d03d957bef48 Mon Sep 17 00:00:00 2001 From: javanna Date: Thu, 25 Oct 2018 19:42:42 +0200 Subject: [PATCH 07/12] address comments --- .../modules/remote-clusters.asciidoc | 11 ++++++----- .../transport/RemoteClusterServiceTests.java | 19 ++++++++++++------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/docs/reference/modules/remote-clusters.asciidoc b/docs/reference/modules/remote-clusters.asciidoc index c807cfb3e3c72..4bf3073abd359 100644 --- a/docs/reference/modules/remote-clusters.asciidoc +++ b/docs/reference/modules/remote-clusters.asciidoc @@ -154,11 +154,12 @@ PUT _cluster/settings `cluster.remote.${cluster_alias}.transport.ping_schedule`:: - Schedule a regular application-level ping message to ensure that transport - connections to nodes belonging to remote clusters are kept alive. Defaults - to the value that the global `transport.ping_schedule` setting is set to - (which defaults to `-1`, meaning pings are disabled). - + Sets the time interval between regular application-level ping messages that + are sent to ensure that transport connections to nodes belonging to remote + clusters are kept alive. If set to `-1`, application-level ping messages to + this remote cluster are not sent. If unset, application-level ping messages + are sent according to the global `transport.ping_schedule` setting, which + defaults to ``-1` meaning that pings are not sent. [float] [[retrieve-remote-clusters-info]] diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 32d276a5489de..ca52204403d00 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -381,18 +381,23 @@ public void testCustomPingSchedule() throws IOException { knownNodes.add(seedTransport.getLocalDiscoNode()); knownNodes.add(otherSeedTransport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); + Settings.Builder settingsBuilder = Settings.builder(); + if (randomBoolean()) { + settingsBuilder.put(TcpTransport.PING_SCHEDULE.getKey(), TimeValue.timeValueSeconds(randomIntBetween(1, 10))); + } + Settings transportSettings = settingsBuilder.build(); - try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, + try (MockTransportService transportService = MockTransportService.createNewService(transportSettings, Version.CURRENT, threadPool, null)) { transportService.start(); transportService.acceptIncomingRequests(); Settings.Builder builder = Settings.builder(); builder.putList("cluster.remote.cluster_1.seeds", seedNode.getAddress().toString()); builder.putList("cluster.remote.cluster_2.seeds", otherSeedNode.getAddress().toString()); - TimeValue clusterOnePingSchedule = TimeValue.timeValueSeconds(randomIntBetween(1, 10)); - builder.put("cluster.remote.cluster_1.transport.ping_schedule", clusterOnePingSchedule); - TimeValue clusterTwoPingSchedule = TimeValue.timeValueSeconds(randomIntBetween(1, 10)); - builder.put("cluster.remote.cluster_2.transport.ping_schedule", clusterTwoPingSchedule); + TimeValue pingSchedule1 = randomBoolean() ? TimeValue.MINUS_ONE : TimeValue.timeValueSeconds(randomIntBetween(1, 10)); + builder.put("cluster.remote.cluster_1.transport.ping_schedule", pingSchedule1); + TimeValue pingSchedule2 = randomBoolean() ? TimeValue.MINUS_ONE : TimeValue.timeValueSeconds(randomIntBetween(1, 10)); + builder.put("cluster.remote.cluster_2.transport.ping_schedule", pingSchedule2); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); service.initializeRemoteClusters(); @@ -401,9 +406,9 @@ public void testCustomPingSchedule() throws IOException { assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); RemoteClusterConnection remoteClusterConnection1 = service.getRemoteClusterConnection("cluster_1"); - assertEquals(clusterOnePingSchedule, remoteClusterConnection1.getConnectionManager().getPingSchedule()); + assertEquals(pingSchedule1, remoteClusterConnection1.getConnectionManager().getPingSchedule()); RemoteClusterConnection remoteClusterConnection2 = service.getRemoteClusterConnection("cluster_2"); - assertEquals(clusterTwoPingSchedule, remoteClusterConnection2.getConnectionManager().getPingSchedule()); + assertEquals(pingSchedule2, remoteClusterConnection2.getConnectionManager().getPingSchedule()); } } } From 74af6d73dc1d3770a529683b98bb0758b557ee3a Mon Sep 17 00:00:00 2001 From: javanna Date: Thu, 25 Oct 2018 20:22:39 +0200 Subject: [PATCH 08/12] checkstyle --- .../elasticsearch/transport/RemoteClusterServiceTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index ca52204403d00..b120bd0640c82 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -387,8 +387,8 @@ public void testCustomPingSchedule() throws IOException { } Settings transportSettings = settingsBuilder.build(); - try (MockTransportService transportService = MockTransportService.createNewService(transportSettings, Version.CURRENT, threadPool, - null)) { + try (MockTransportService transportService = MockTransportService.createNewService(transportSettings, Version.CURRENT, + threadPool, null)) { transportService.start(); transportService.acceptIncomingRequests(); Settings.Builder builder = Settings.builder(); From fd43e39123490abf7707337ae21d4d8526c19210 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 30 Oct 2018 11:51:24 +0100 Subject: [PATCH 09/12] Update server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java Co-Authored-By: javanna --- .../org/elasticsearch/transport/RemoteClusterServiceTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index b120bd0640c82..be0e58ae3fac0 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -343,7 +343,7 @@ public void testDefaultPingSchedule() throws IOException { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT)) { DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); - knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(seedNode); TimeValue pingSchedule; Settings.Builder settingsBuilder = Settings.builder(); settingsBuilder.putList("cluster.remote.cluster_1.seeds", seedNode.getAddress().toString()); From 8550bad4444c51c6ae65e41eb82b6a9dc16dab9c Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 30 Oct 2018 11:51:44 +0100 Subject: [PATCH 10/12] Update server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java Co-Authored-By: javanna --- .../org/elasticsearch/transport/RemoteClusterServiceTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index be0e58ae3fac0..77653bdd606ee 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -378,7 +378,7 @@ public void testCustomPingSchedule() throws IOException { MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode(); - knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(seedNode); knownNodes.add(otherSeedTransport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); Settings.Builder settingsBuilder = Settings.builder(); From a0ca97c9fc92209ca3751a4ec13804b88064edd3 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 30 Oct 2018 11:52:10 +0100 Subject: [PATCH 11/12] Update server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java Co-Authored-By: javanna --- .../org/elasticsearch/transport/RemoteClusterServiceTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 77653bdd606ee..dd42f2f0f7f19 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -379,7 +379,7 @@ public void testCustomPingSchedule() throws IOException { DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode(); knownNodes.add(seedNode); - knownNodes.add(otherSeedTransport.getLocalDiscoNode()); + knownNodes.add(otherSeedNode); Collections.shuffle(knownNodes, random()); Settings.Builder settingsBuilder = Settings.builder(); if (randomBoolean()) { From 3e984acfdbfdf8efe737c423023454fa39729e1b Mon Sep 17 00:00:00 2001 From: javanna Date: Tue, 30 Oct 2018 11:57:01 +0100 Subject: [PATCH 12/12] address review comments --- .../transport/RemoteClusterServiceTests.java | 72 +++++++++---------- 1 file changed, 36 insertions(+), 36 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index dd42f2f0f7f19..e2a0827c14d41 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -196,12 +196,12 @@ public void testBuildRemoteClustersDynamicConfigWithDuplicates() { public void testGroupClusterIndices() throws IOException { List knownNodes = new CopyOnWriteArrayList<>(); - try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); - MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { - DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); - DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode(); - knownNodes.add(seedTransport.getLocalDiscoNode()); - knownNodes.add(otherSeedTransport.getLocalDiscoNode()); + try (MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { + DiscoveryNode cluster1Seed = cluster1Transport.getLocalDiscoNode(); + DiscoveryNode cluster2Seed = cluster2Transport.getLocalDiscoNode(); + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, @@ -209,8 +209,8 @@ public void testGroupClusterIndices() throws IOException { transportService.start(); transportService.acceptIncomingRequests(); Settings.Builder builder = Settings.builder(); - builder.putList("cluster.remote.cluster_1.seeds", seedNode.getAddress().toString()); - builder.putList("cluster.remote.cluster_2.seeds", otherSeedNode.getAddress().toString()); + builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); + builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); service.initializeRemoteClusters(); @@ -241,12 +241,12 @@ public void testGroupClusterIndices() throws IOException { public void testGroupIndices() throws IOException { List knownNodes = new CopyOnWriteArrayList<>(); - try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); - MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { - DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); - DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode(); - knownNodes.add(seedTransport.getLocalDiscoNode()); - knownNodes.add(otherSeedTransport.getLocalDiscoNode()); + try (MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { + DiscoveryNode cluster1Seed = cluster1Transport.getLocalDiscoNode(); + DiscoveryNode cluster2Seed = cluster2Transport.getLocalDiscoNode(); + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, @@ -254,8 +254,8 @@ public void testGroupIndices() throws IOException { transportService.start(); transportService.acceptIncomingRequests(); Settings.Builder builder = Settings.builder(); - builder.putList("cluster.remote.cluster_1.seeds", seedNode.getAddress().toString()); - builder.putList("cluster.remote.cluster_2.seeds", otherSeedNode.getAddress().toString()); + builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); + builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); service.initializeRemoteClusters(); @@ -303,12 +303,12 @@ public void testGroupIndices() throws IOException { public void testIncrementallyAddClusters() throws IOException { List knownNodes = new CopyOnWriteArrayList<>(); - try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); - MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { - DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); - DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode(); - knownNodes.add(seedTransport.getLocalDiscoNode()); - knownNodes.add(otherSeedTransport.getLocalDiscoNode()); + try (MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { + DiscoveryNode cluster1Seed = cluster1Transport.getLocalDiscoNode(); + DiscoveryNode cluster2Seed = cluster2Transport.getLocalDiscoNode(); + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, @@ -316,16 +316,16 @@ public void testIncrementallyAddClusters() throws IOException { transportService.start(); transportService.acceptIncomingRequests(); Settings.Builder builder = Settings.builder(); - builder.putList("cluster.remote.cluster_1.seeds", seedNode.getAddress().toString()); - builder.putList("cluster.remote.cluster_2.seeds", otherSeedNode.getAddress().toString()); + builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); + builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); service.initializeRemoteClusters(); assertFalse(service.isCrossClusterSearchEnabled()); - service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().toString()), null); + service.updateRemoteCluster("cluster_1", Collections.singletonList(cluster1Seed.getAddress().toString()), null); assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); - service.updateRemoteCluster("cluster_2", Collections.singletonList(otherSeedNode.getAddress().toString()), null); + service.updateRemoteCluster("cluster_2", Collections.singletonList(cluster2Seed.getAddress().toString()), null); assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); assertTrue(service.isRemoteClusterRegistered("cluster_2")); @@ -343,7 +343,7 @@ public void testDefaultPingSchedule() throws IOException { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT)) { DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); - knownNodes.add(seedNode); + knownNodes.add(seedTransport.getLocalDiscoNode()); TimeValue pingSchedule; Settings.Builder settingsBuilder = Settings.builder(); settingsBuilder.putList("cluster.remote.cluster_1.seeds", seedNode.getAddress().toString()); @@ -374,12 +374,12 @@ public void testDefaultPingSchedule() throws IOException { public void testCustomPingSchedule() throws IOException { List knownNodes = new CopyOnWriteArrayList<>(); - try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); - MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { - DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); - DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode(); - knownNodes.add(seedNode); - knownNodes.add(otherSeedNode); + try (MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { + DiscoveryNode cluster1Seed = cluster1Transport.getLocalDiscoNode(); + DiscoveryNode cluster2Seed = cluster2Transport.getLocalDiscoNode(); + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); Settings.Builder settingsBuilder = Settings.builder(); if (randomBoolean()) { @@ -392,8 +392,8 @@ public void testCustomPingSchedule() throws IOException { transportService.start(); transportService.acceptIncomingRequests(); Settings.Builder builder = Settings.builder(); - builder.putList("cluster.remote.cluster_1.seeds", seedNode.getAddress().toString()); - builder.putList("cluster.remote.cluster_2.seeds", otherSeedNode.getAddress().toString()); + builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); + builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); TimeValue pingSchedule1 = randomBoolean() ? TimeValue.MINUS_ONE : TimeValue.timeValueSeconds(randomIntBetween(1, 10)); builder.put("cluster.remote.cluster_1.transport.ping_schedule", pingSchedule1); TimeValue pingSchedule2 = randomBoolean() ? TimeValue.MINUS_ONE : TimeValue.timeValueSeconds(randomIntBetween(1, 10)); @@ -402,7 +402,7 @@ public void testCustomPingSchedule() throws IOException { assertFalse(service.isCrossClusterSearchEnabled()); service.initializeRemoteClusters(); assertTrue(service.isCrossClusterSearchEnabled()); - service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().toString()), null); + service.updateRemoteCluster("cluster_1", Collections.singletonList(cluster1Seed.getAddress().toString()), null); assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); RemoteClusterConnection remoteClusterConnection1 = service.getRemoteClusterConnection("cluster_1");