diff --git a/docs/reference/modules/remote-clusters.asciidoc b/docs/reference/modules/remote-clusters.asciidoc index 81d882f5f0eb6..4bf3073abd359 100644 --- a/docs/reference/modules/remote-clusters.asciidoc +++ b/docs/reference/modules/remote-clusters.asciidoc @@ -152,6 +152,15 @@ PUT _cluster/settings by default, but they can selectively be made optional by setting this setting to `true`. +`cluster.remote.${cluster_alias}.transport.ping_schedule`:: + + Sets the time interval between regular application-level ping messages that + are sent to ensure that transport connections to nodes belonging to remote + clusters are kept alive. If set to `-1`, application-level ping messages to + this remote cluster are not sent. If unset, application-level ping messages + are sent according to the global `transport.ping_schedule` setting, which + defaults to ``-1` meaning that pings are not sent. + [float] [[retrieve-remote-clusters-info]] === Retrieving remote clusters info diff --git a/docs/reference/modules/transport.asciidoc b/docs/reference/modules/transport.asciidoc index 257181f70c507..c1bc83230e597 100644 --- a/docs/reference/modules/transport.asciidoc +++ b/docs/reference/modules/transport.asciidoc @@ -46,9 +46,9 @@ between all nodes. Defaults to `false`. |`transport.ping_schedule` | Schedule a regular application-level ping message to ensure that transport connections between nodes are kept alive. Defaults to -`5s` in the transport client and `-1` (disabled) elsewhere. It is preferable to -correctly configure TCP keep-alives instead of using this feature, because TCP -keep-alives apply to all kinds of long-lived connection and not just to +`5s` in the transport client and `-1` (disabled) elsewhere. It is preferable +to correctly configure TCP keep-alives instead of using this feature, because +TCP keep-alives apply to all kinds of long-lived connections and not just to transport connections. |======================================================================= diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 66a4aa65c4480..f72f31772aa38 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -293,6 +293,7 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteClusterService.SEARCH_REMOTE_NODE_ATTRIBUTE, RemoteClusterService.ENABLE_REMOTE_CLUSTERS, RemoteClusterService.SEARCH_ENABLE_REMOTE_CLUSTERS, + RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE, TransportService.TRACE_LOG_EXCLUDE_SETTING, TransportService.TRACE_LOG_INCLUDE_SETTING, TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING, diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java index 5f2635fac88d9..114bb8c986a07 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java @@ -18,8 +18,8 @@ */ package org.elasticsearch.transport; -import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -67,16 +67,15 @@ public class ConnectionManager implements Closeable { private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener(); public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) { - this(settings, transport, threadPool, ConnectionProfile.buildDefaultConnectionProfile(settings)); + this(settings, transport, threadPool, TcpTransport.PING_SCHEDULE.get(settings)); } - public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, ConnectionProfile defaultProfile) { + public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, TimeValue pingSchedule) { this.transport = transport; this.threadPool = threadPool; - this.pingSchedule = TcpTransport.PING_SCHEDULE.get(settings); - this.defaultProfile = defaultProfile; + this.pingSchedule = pingSchedule; + this.defaultProfile = ConnectionProfile.buildDefaultConnectionProfile(settings); this.lifecycle.moveToStarted(); - if (pingSchedule.millis() > 0) { threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, new ScheduledPing()); } @@ -252,6 +251,10 @@ private void ensureOpen() { } } + TimeValue getPingSchedule() { + return pingSchedule; + } + private class ScheduledPing extends AbstractLifecycleRunnable { private ScheduledPing() { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 0cfa2abd754ef..4dadc362b80ff 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -18,8 +18,6 @@ */ package org.elasticsearch.transport; -import java.net.InetSocketAddress; -import java.util.function.Supplier; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.SetOnce; @@ -48,6 +46,7 @@ import java.io.Closeable; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -64,6 +63,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -105,13 +105,8 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo * @param connectionManager the connection manager to use for this remote connection * @param maxNumRemoteConnections the maximum number of connections to the remote cluster * @param nodePredicate a predicate to filter eligible remote nodes to connect to + * @param proxyAddress the proxy address */ - RemoteClusterConnection(Settings settings, String clusterAlias, List> seedNodes, - TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections, - Predicate nodePredicate) { - this(settings, clusterAlias, seedNodes, transportService, connectionManager, maxNumRemoteConnections, nodePredicate, null); - } - RemoteClusterConnection(Settings settings, String clusterAlias, List> seedNodes, TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections, Predicate nodePredicate, String proxyAddress) { @@ -151,7 +146,7 @@ private static DiscoveryNode maybeAddProxyAddress(String proxyAddress, Discovery InetSocketAddress proxyInetAddress = RemoteClusterAware.parseSeedAddress(proxyAddress); return new DiscoveryNode(node.getName(), node.getId(), node.getEphemeralId(), node.getHostName(), node .getHostAddress(), new TransportAddress(proxyInetAddress), node.getAttributes(), node.getRoles(), node.getVersion()); - } + } } /** diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index dc3bd3a353604..08f08207eae39 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -60,6 +60,7 @@ import java.util.stream.Stream; import static org.elasticsearch.common.settings.Setting.boolSetting; +import static org.elasticsearch.common.settings.Setting.timeSetting; /** * Basic service for accessing remote clusters via gateway nodes @@ -166,6 +167,12 @@ public String getKey(final String key) { Setting.Property.NodeScope), REMOTE_CLUSTERS_SEEDS); + public static final Setting.AffixSetting REMOTE_CLUSTER_PING_SCHEDULE = Setting.affixKeySetting( + "cluster.remote.", + "transport.ping_schedule", + key -> timeSetting(key, TcpTransport.PING_SCHEDULE, Setting.Property.NodeScope), + REMOTE_CLUSTERS_SEEDS); + private static final Predicate DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion()) && (node.isMasterNode() == false || node.isDataNode() || node.isIngestNode()); @@ -211,10 +218,13 @@ private synchronized void updateRemoteClusters(Map listener) { - RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterAlias); - if (remoteClusterConnection == null) { - throw new IllegalArgumentException("no such remote cluster: " + clusterAlias); - } - remoteClusterConnection.ensureConnected(listener); + void ensureConnected(String clusterAlias, ActionListener listener) { + getRemoteClusterConnection(clusterAlias).ensureConnected(listener); } public Transport.Connection getConnection(String cluster) { + return getRemoteClusterConnection(cluster).getConnection(); + } + + RemoteClusterConnection getRemoteClusterConnection(String cluster) { RemoteClusterConnection connection = remoteClusters.get(cluster); if (connection == null) { throw new IllegalArgumentException("no such remote cluster: " + cluster); } - return connection.getConnection(); + return connection; } @Override @@ -386,7 +392,6 @@ synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavail } } - @Override protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress) { updateRemoteCluster(clusterAlias, addresses, proxyAddress, ActionListener.wrap((x) -> {}, (x) -> {})); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 6c27680d74162..e6f7b449b43f3 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -18,9 +18,6 @@ */ package org.elasticsearch.transport; -import java.util.HashMap; -import java.util.Map; -import java.util.function.Supplier; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -68,7 +65,9 @@ import java.util.Arrays; import java.util.Base64; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -79,6 +78,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.function.Supplier; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -163,7 +163,7 @@ public void testRemoteProfileIsUsedForLocalCluster() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -204,7 +204,7 @@ public void testRemoteProfileIsUsedForRemoteCluster() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -256,7 +256,7 @@ public void testDiscoverSingleNode() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -285,7 +285,7 @@ public void testDiscoverSingleNodeWithIncompatibleSeed() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { updateSeedNodes(connection, seedNodes); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -312,7 +312,7 @@ public void testNodeDisconnected() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -362,7 +362,7 @@ public void testFilterDiscoveredNodes() throws Exception { service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, - n -> n.equals(rejectedNode) == false)) { + n -> n.equals(rejectedNode) == false, null)) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); if (rejectedNode.equals(seedNode)) { assertFalse(service.nodeConnected(seedNode)); @@ -422,7 +422,7 @@ public void testConnectWithIncompatibleTransports() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { expectThrows(Exception.class, () -> updateSeedNodes(connection, Arrays.asList(() -> seedNode))); assertFalse(service.nodeConnected(seedNode)); assertTrue(connection.assertNoRunningConnections()); @@ -485,7 +485,7 @@ public void close() { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { connection.addConnectedNode(seedNode); for (DiscoveryNode node : knownNodes) { final Transport.Connection transportConnection = connection.getConnection(node); @@ -528,7 +528,7 @@ public void run() { CountDownLatch listenerCalled = new CountDownLatch(1); AtomicReference exceptionReference = new AtomicReference<>(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { ActionListener listener = ActionListener.wrap(x -> { listenerCalled.countDown(); fail("expected exception"); @@ -565,7 +565,7 @@ public void testFetchShards() throws Exception { service.acceptIncomingRequests(); List> nodes = Collections.singletonList(() -> seedNode); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - nodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + nodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { if (randomBoolean()) { updateSeedNodes(connection, nodes); } @@ -605,7 +605,7 @@ public void testFetchShardsThreadContextHeader() throws Exception { service.acceptIncomingRequests(); List> nodes = Collections.singletonList(() -> seedNode); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - nodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + nodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { SearchRequest request = new SearchRequest("test-index"); Thread[] threads = new Thread[10]; for (int i = 0; i < threads.length; i++) { @@ -659,7 +659,8 @@ public void testFetchShardsSkipUnavailable() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Collections.singletonList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Collections.singletonList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, + n -> true, null)) { SearchRequest request = new SearchRequest("test-index"); ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") @@ -769,7 +770,7 @@ public void testTriggerUpdatesConcurrently() throws IOException, InterruptedExce service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { int numThreads = randomIntBetween(4, 10); Thread[] threads = new Thread[numThreads]; CyclicBarrier barrier = new CyclicBarrier(numThreads); @@ -848,7 +849,7 @@ public void testCloseWhileConcurrentlyConnecting() throws IOException, Interrupt service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { int numThreads = randomIntBetween(4, 10); Thread[] threads = new Thread[numThreads]; CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); @@ -937,7 +938,7 @@ public void testGetConnectionInfo() throws Exception { service.acceptIncomingRequests(); int maxNumConnections = randomIntBetween(1, 5); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, service.connectionManager(), maxNumConnections, n -> true)) { + seedNodes, service, service.connectionManager(), maxNumConnections, n -> true, null)) { // test no nodes connected RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo()); assertNotNull(remoteConnectionInfo); @@ -1084,7 +1085,7 @@ public void testEnsureConnected() throws IOException, InterruptedException { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { assertFalse(service.nodeConnected(seedNode)); assertFalse(service.nodeConnected(discoverableNode)); assertTrue(connection.assertNoRunningConnections()); @@ -1133,7 +1134,7 @@ public void testCollectNodes() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { if (randomBoolean()) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); } @@ -1181,7 +1182,7 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { final int numGetThreads = randomIntBetween(4, 10); final Thread[] getThreads = new Thread[numGetThreads]; final int numModifyingThreads = randomIntBetween(4, 10); @@ -1271,7 +1272,7 @@ public void testClusterNameIsChecked() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList( () -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList( () -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -1351,7 +1352,8 @@ public void close() { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Collections.singletonList(() -> connectedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Collections.singletonList(() -> connectedNode), service, service.getConnectionManager(), + Integer.MAX_VALUE, n -> true, null)) { connection.addConnectedNode(connectedNode); for (int i = 0; i < 10; i++) { //always a direct connection as the remote node is already connected @@ -1393,7 +1395,7 @@ public void testLazyResolveTransportAddress() throws Exception { return seedNode; }; try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedSupplier), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { + Arrays.asList(seedSupplier), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, null)) { updateSeedNodes(connection, Arrays.asList(seedSupplier)); // Closing connections leads to RemoteClusterConnection.ConnectHandler.collectRemoteNodes // being called again so we try to resolve the same seed node's host twice diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 94ac7e963c1da..e2a0827c14d41 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; @@ -97,6 +98,7 @@ public void testSettingsAreRegistered() { assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_NODE_ATTRIBUTE)); + assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE)); } public void testRemoteClusterSeedSetting() { @@ -194,12 +196,12 @@ public void testBuildRemoteClustersDynamicConfigWithDuplicates() { public void testGroupClusterIndices() throws IOException { List knownNodes = new CopyOnWriteArrayList<>(); - try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); - MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { - DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); - DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode(); - knownNodes.add(seedTransport.getLocalDiscoNode()); - knownNodes.add(otherSeedTransport.getLocalDiscoNode()); + try (MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { + DiscoveryNode cluster1Seed = cluster1Transport.getLocalDiscoNode(); + DiscoveryNode cluster2Seed = cluster2Transport.getLocalDiscoNode(); + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, @@ -207,8 +209,8 @@ public void testGroupClusterIndices() throws IOException { transportService.start(); transportService.acceptIncomingRequests(); Settings.Builder builder = Settings.builder(); - builder.putList("cluster.remote.cluster_1.seeds", seedNode.getAddress().toString()); - builder.putList("cluster.remote.cluster_2.seeds", otherSeedNode.getAddress().toString()); + builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); + builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); service.initializeRemoteClusters(); @@ -239,12 +241,12 @@ public void testGroupClusterIndices() throws IOException { public void testGroupIndices() throws IOException { List knownNodes = new CopyOnWriteArrayList<>(); - try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); - MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { - DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); - DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode(); - knownNodes.add(seedTransport.getLocalDiscoNode()); - knownNodes.add(otherSeedTransport.getLocalDiscoNode()); + try (MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { + DiscoveryNode cluster1Seed = cluster1Transport.getLocalDiscoNode(); + DiscoveryNode cluster2Seed = cluster2Transport.getLocalDiscoNode(); + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, @@ -252,8 +254,8 @@ public void testGroupIndices() throws IOException { transportService.start(); transportService.acceptIncomingRequests(); Settings.Builder builder = Settings.builder(); - builder.putList("cluster.remote.cluster_1.seeds", seedNode.getAddress().toString()); - builder.putList("cluster.remote.cluster_2.seeds", otherSeedNode.getAddress().toString()); + builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); + builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); service.initializeRemoteClusters(); @@ -301,12 +303,12 @@ public void testGroupIndices() throws IOException { public void testIncrementallyAddClusters() throws IOException { List knownNodes = new CopyOnWriteArrayList<>(); - try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); - MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { - DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); - DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode(); - knownNodes.add(seedTransport.getLocalDiscoNode()); - knownNodes.add(otherSeedTransport.getLocalDiscoNode()); + try (MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { + DiscoveryNode cluster1Seed = cluster1Transport.getLocalDiscoNode(); + DiscoveryNode cluster2Seed = cluster2Transport.getLocalDiscoNode(); + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, @@ -314,16 +316,16 @@ public void testIncrementallyAddClusters() throws IOException { transportService.start(); transportService.acceptIncomingRequests(); Settings.Builder builder = Settings.builder(); - builder.putList("cluster.remote.cluster_1.seeds", seedNode.getAddress().toString()); - builder.putList("cluster.remote.cluster_2.seeds", otherSeedNode.getAddress().toString()); + builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); + builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); service.initializeRemoteClusters(); assertFalse(service.isCrossClusterSearchEnabled()); - service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().toString()), null); + service.updateRemoteCluster("cluster_1", Collections.singletonList(cluster1Seed.getAddress().toString()), null); assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); - service.updateRemoteCluster("cluster_2", Collections.singletonList(otherSeedNode.getAddress().toString()), null); + service.updateRemoteCluster("cluster_2", Collections.singletonList(cluster2Seed.getAddress().toString()), null); assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); assertTrue(service.isRemoteClusterRegistered("cluster_2")); @@ -337,6 +339,81 @@ public void testIncrementallyAddClusters() throws IOException { } } + public void testDefaultPingSchedule() throws IOException { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + TimeValue pingSchedule; + Settings.Builder settingsBuilder = Settings.builder(); + settingsBuilder.putList("cluster.remote.cluster_1.seeds", seedNode.getAddress().toString()); + if (randomBoolean()) { + pingSchedule = TimeValue.timeValueSeconds(randomIntBetween(1, 10)); + settingsBuilder.put(TcpTransport.PING_SCHEDULE.getKey(), pingSchedule).build(); + } else { + pingSchedule = TimeValue.MINUS_ONE; + } + Settings settings = settingsBuilder.build(); + try (MockTransportService transportService = MockTransportService.createNewService(settings, + Version.CURRENT, threadPool, null)) { + transportService.start(); + transportService.acceptIncomingRequests(); + try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { + assertFalse(service.isCrossClusterSearchEnabled()); + service.initializeRemoteClusters(); + assertTrue(service.isCrossClusterSearchEnabled()); + service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().toString()), null); + assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(service.isRemoteClusterRegistered("cluster_1")); + RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); + assertEquals(pingSchedule, remoteClusterConnection.getConnectionManager().getPingSchedule()); + } + } + } + } + + public void testCustomPingSchedule() throws IOException { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { + DiscoveryNode cluster1Seed = cluster1Transport.getLocalDiscoNode(); + DiscoveryNode cluster2Seed = cluster2Transport.getLocalDiscoNode(); + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + Settings.Builder settingsBuilder = Settings.builder(); + if (randomBoolean()) { + settingsBuilder.put(TcpTransport.PING_SCHEDULE.getKey(), TimeValue.timeValueSeconds(randomIntBetween(1, 10))); + } + Settings transportSettings = settingsBuilder.build(); + + try (MockTransportService transportService = MockTransportService.createNewService(transportSettings, Version.CURRENT, + threadPool, null)) { + transportService.start(); + transportService.acceptIncomingRequests(); + Settings.Builder builder = Settings.builder(); + builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); + builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); + TimeValue pingSchedule1 = randomBoolean() ? TimeValue.MINUS_ONE : TimeValue.timeValueSeconds(randomIntBetween(1, 10)); + builder.put("cluster.remote.cluster_1.transport.ping_schedule", pingSchedule1); + TimeValue pingSchedule2 = randomBoolean() ? TimeValue.MINUS_ONE : TimeValue.timeValueSeconds(randomIntBetween(1, 10)); + builder.put("cluster.remote.cluster_2.transport.ping_schedule", pingSchedule2); + try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { + assertFalse(service.isCrossClusterSearchEnabled()); + service.initializeRemoteClusters(); + assertTrue(service.isCrossClusterSearchEnabled()); + service.updateRemoteCluster("cluster_1", Collections.singletonList(cluster1Seed.getAddress().toString()), null); + assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(service.isRemoteClusterRegistered("cluster_1")); + RemoteClusterConnection remoteClusterConnection1 = service.getRemoteClusterConnection("cluster_1"); + assertEquals(pingSchedule1, remoteClusterConnection1.getConnectionManager().getPingSchedule()); + RemoteClusterConnection remoteClusterConnection2 = service.getRemoteClusterConnection("cluster_2"); + assertEquals(pingSchedule2, remoteClusterConnection2.getConnectionManager().getPingSchedule()); + } + } + } + } + public void testRemoteNodeAttribute() throws IOException, InterruptedException { final Settings settings = Settings.builder().put("cluster.remote.node.attr", "gateway").build();