diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 82b921bd233b0..e37f46c5517db 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -87,6 +87,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo private volatile boolean skipUnavailable; private final ConnectHandler connectHandler; private SetOnce remoteClusterName = new SetOnce<>(); + private final ClusterName localClusterName; /** * Creates a new {@link RemoteClusterConnection} @@ -100,6 +101,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo RemoteClusterConnection(Settings settings, String clusterAlias, List seedNodes, TransportService transportService, int maxNumRemoteConnections, Predicate nodePredicate) { super(settings); + this.localClusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); this.transportService = transportService; this.maxNumRemoteConnections = maxNumRemoteConnections; this.nodePredicate = nodePredicate; @@ -310,6 +312,21 @@ public boolean isClosed() { return connectHandler.isClosed(); } + private ConnectionProfile getRemoteProfile(ClusterName name) { + // we can only compare the cluster name to make a decision if we should use a remote profile + // we can't use a cluster UUID here since we could be connecting to that remote cluster before + // the remote node has joined its cluster and have a cluster UUID. The fact that we just lose a + // rather smallish optimization on the connection layer under certain situations where remote clusters + // have the same name as the local one is minor here. + // the alternative here is to complicate the remote infrastructure to also wait until we formed a cluster, + // gained a cluster UUID and then start connecting etc. we rather use this simplification in order to maintain simplicity + if (this.localClusterName.equals(name)) { + return null; + } else { + return remoteProfile; + } + } + /** * The connect handler manages node discovery and the actual connect to the remote cluster. * There is at most one connect job running at any time. If such a connect job is triggered @@ -419,7 +436,6 @@ protected void doRun() { collectRemoteNodes(seedNodes.iterator(), transportService, listener); } }); - } void collectRemoteNodes(Iterator seedNodes, @@ -431,21 +447,27 @@ void collectRemoteNodes(Iterator seedNodes, if (seedNodes.hasNext()) { cancellableThreads.executeIO(() -> { final DiscoveryNode seedNode = seedNodes.next(); - final DiscoveryNode handshakeNode; + final TransportService.HandshakeResponse handshakeResponse; Transport.Connection connection = transportService.openConnection(seedNode, ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null)); boolean success = false; try { try { - handshakeNode = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(), + handshakeResponse = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(), (c) -> remoteClusterName.get() == null ? true : c.equals(remoteClusterName.get())); } catch (IllegalStateException ex) { logger.warn(() -> new ParameterizedMessage("seed node {} cluster name mismatch expected " + "cluster name {}", connection.getNode(), remoteClusterName.get()), ex); throw ex; } + + final DiscoveryNode handshakeNode = handshakeResponse.getDiscoveryNode(); if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) { - transportService.connectToNode(handshakeNode, remoteProfile); + transportService.connectToNode(handshakeNode, getRemoteProfile(handshakeResponse.getClusterName())); + if (remoteClusterName.get() == null) { + assert handshakeResponse.getClusterName().value() != null; + remoteClusterName.set(handshakeResponse.getClusterName()); + } connectedNodes.add(handshakeNode); } ClusterStateRequest request = new ClusterStateRequest(); @@ -552,7 +574,8 @@ public void handleResponse(ClusterStateResponse response) { for (DiscoveryNode node : nodesIter) { if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) { try { - transportService.connectToNode(node, remoteProfile); // noop if node is connected + transportService.connectToNode(node, getRemoteProfile(remoteClusterName.get())); // noop if node is + // connected connectedNodes.add(node); } catch (ConnectTransportException | IllegalStateException ex) { // ISE if we fail the handshake with an version incompatible node diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 9755898be5fef..656d8c3841769 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -342,7 +342,7 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection } transport.connectToNode(node, connectionProfile, (newConnection, actualProfile) -> { // We don't validate cluster names to allow for CCS connections. - final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true); + final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true).discoveryNode; if (validateConnections && node.equals(remote) == false) { throw new ConnectTransportException(node, "handshake failed. unexpected remote node " + remote); } @@ -378,7 +378,7 @@ public Transport.Connection openConnection(final DiscoveryNode node, ConnectionP public DiscoveryNode handshake( final Transport.Connection connection, final long handshakeTimeout) throws ConnectTransportException { - return handshake(connection, handshakeTimeout, clusterName::equals); + return handshake(connection, handshakeTimeout, clusterName::equals).discoveryNode; } /** @@ -390,11 +390,11 @@ public DiscoveryNode handshake( * @param connection the connection to a specific node * @param handshakeTimeout handshake timeout * @param clusterNamePredicate cluster name validation predicate - * @return the connected node + * @return the handshake response * @throws ConnectTransportException if the connection failed * @throws IllegalStateException if the handshake failed */ - public DiscoveryNode handshake( + public HandshakeResponse handshake( final Transport.Connection connection, final long handshakeTimeout, Predicate clusterNamePredicate) throws ConnectTransportException { final HandshakeResponse response; @@ -420,7 +420,7 @@ public HandshakeResponse newInstance() { throw new IllegalStateException("handshake failed, incompatible version [" + response.version + "] - " + node); } - return response.discoveryNode; + return response; } static class HandshakeRequest extends TransportRequest { @@ -461,6 +461,14 @@ public void writeTo(StreamOutput out) throws IOException { clusterName.writeTo(out); Version.writeVersion(version, out); } + + public DiscoveryNode getDiscoveryNode() { + return discoveryNode; + } + + public ClusterName getClusterName() { + return clusterName; + } } public void disconnectFromNode(DiscoveryNode node) { diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index ac6f99351e46d..637b8fb26a880 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -142,6 +142,102 @@ public static MockTransportService startTransport( } } + public void testLocalProfileIsUsedForLocalCluster() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(discoverableTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + updateSeedNodes(connection, Arrays.asList(seedNode)); + assertTrue(service.nodeConnected(seedNode)); + assertTrue(service.nodeConnected(discoverableNode)); + assertTrue(connection.assertNoRunningConnections()); + PlainTransportFuture futureHandler = new PlainTransportFuture<>( + new FutureTransportResponseHandler() { + @Override + public ClusterSearchShardsResponse read(StreamInput in) throws IOException { + ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse(); + inst.readFrom(in); + return inst; + } + }); + TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK) + .build(); + service.sendRequest(connection.getConnection(), ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(), + options, futureHandler); + futureHandler.txGet(); + } + } + } + } + + public void testRemoteProfileIsUsedForRemoteCluster() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT, threadPool, + Settings.builder().put("cluster.name", "foobar").build()); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT, + threadPool, Settings.builder().put("cluster.name", "foobar").build())) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(discoverableTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + updateSeedNodes(connection, Arrays.asList(seedNode)); + assertTrue(service.nodeConnected(seedNode)); + assertTrue(service.nodeConnected(discoverableNode)); + assertTrue(connection.assertNoRunningConnections()); + PlainTransportFuture futureHandler = new PlainTransportFuture<>( + new FutureTransportResponseHandler() { + @Override + public ClusterSearchShardsResponse read(StreamInput in) throws IOException { + ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse(); + inst.readFrom(in); + return inst; + } + }); + TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK) + .build(); + IllegalStateException ise = (IllegalStateException) expectThrows(SendRequestTransportException.class, () -> { + service.sendRequest(discoverableNode, + ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(), options, futureHandler); + futureHandler.txGet(); + }).getCause(); + assertEquals(ise.getMessage(), "can't select channel size is 0 for types: [RECOVERY, BULK, STATE]"); + + PlainTransportFuture handler = new PlainTransportFuture<>( + new FutureTransportResponseHandler() { + @Override + public ClusterSearchShardsResponse read(StreamInput in) throws IOException { + ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse(); + inst.readFrom(in); + return inst; + } + }); + TransportRequestOptions ops = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.REG) + .build(); + service.sendRequest(connection.getConnection(), ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(), + ops, handler); + handler.txGet(); + } + } + } + } + public void testDiscoverSingleNode() throws Exception { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 37bf95d0b153a..8831c46c01136 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -191,7 +191,22 @@ protected MockChannel initiateChannel(InetSocketAddress address, ActionListener< @Override protected ConnectionProfile resolveConnectionProfile(ConnectionProfile connectionProfile) { ConnectionProfile connectionProfile1 = resolveConnectionProfile(connectionProfile, defaultConnectionProfile); - ConnectionProfile.Builder builder = new ConnectionProfile.Builder(LIGHT_PROFILE); + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); + Set allTypesWithConnection = new HashSet<>(); + Set allTypesWithoutConnection = new HashSet<>(); + for (ConnectionProfile.ConnectionTypeHandle handle : connectionProfile1.getHandles()) { + Set types = handle.getTypes(); + if (handle.length > 0) { + allTypesWithConnection.addAll(types); + } else { + allTypesWithoutConnection.addAll(types); + } + } + // make sure we maintain at least the types that are supported by this profile even if we only use a single channel for them. + builder.addConnections(1, allTypesWithConnection.toArray(new TransportRequestOptions.Type[0])); + if (allTypesWithoutConnection.isEmpty() == false) { + builder.addConnections(0, allTypesWithoutConnection.toArray(new TransportRequestOptions.Type[0])); + } builder.setHandshakeTimeout(connectionProfile1.getHandshakeTimeout()); builder.setConnectTimeout(connectionProfile1.getConnectTimeout()); return builder.build();