Skip to content

Commit

Permalink
Remove Connection.getVersion (#99817)
Browse files Browse the repository at this point in the history
Migrate all remaining calls to use getNode().getVersion() instead
  • Loading branch information
thecoop committed Nov 23, 2023
1 parent 27cddf2 commit b18b5cb
Show file tree
Hide file tree
Showing 8 changed files with 17 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ private boolean checkMinimumVersion(GroupShardsIterator<SearchShardIterator> sha
if (it.getTargetNodeIds().isEmpty() == false) {
boolean isCompatible = it.getTargetNodeIds().stream().anyMatch(nodeId -> {
Transport.Connection conn = getConnection(it.getClusterAlias(), nodeId);
return conn == null ? true : conn.getVersion().onOrAfter(request.minCompatibleShardNode());
return conn == null || conn.getNode().getVersion().onOrAfter(request.minCompatibleShardNode());
});
if (isCompatible == false) {
return false;
Expand Down Expand Up @@ -745,7 +745,7 @@ final void onPhaseDone() { // as a tribute to @kimchy aka. finishHim()
public final Transport.Connection getConnection(String clusterAlias, String nodeId) {
Transport.Connection conn = nodeIdToConnection.apply(clusterAlias, nodeId);
Version minVersion = request.minCompatibleShardNode();
if (minVersion != null && conn != null && conn.getVersion().before(minVersion)) {
if (minVersion != null && conn != null && conn.getNode().getVersion().before(minVersion)) {
throw new VersionMismatchException("One of the shards is incompatible with the required minimum version [{}]", minVersion);
}
return conn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ private boolean checkMinimumVersion(GroupShardsIterator<SearchShardIterator> sha
if (it.getTargetNodeIds().isEmpty() == false) {
boolean isCompatible = it.getTargetNodeIds().stream().anyMatch(nodeId -> {
Transport.Connection conn = getConnection(new SendingTarget(it.getClusterAlias(), nodeId));
return conn == null || conn.getVersion().onOrAfter(request.minCompatibleShardNode());
return conn == null || conn.getNode().getVersion().onOrAfter(request.minCompatibleShardNode());
});
if (isCompatible == false) {
return false;
Expand Down Expand Up @@ -419,7 +419,7 @@ public void onPhaseFailure(String msg, Exception cause) {
public Transport.Connection getConnection(SendingTarget sendingTarget) {
Transport.Connection conn = nodeIdToConnection.apply(sendingTarget.clusterAlias, sendingTarget.nodeId);
Version minVersion = request.minCompatibleShardNode();
if (minVersion != null && conn != null && conn.getVersion().before(minVersion)) {
if (minVersion != null && conn != null && conn.getNode().getVersion().before(minVersion)) {
throw new VersionMismatchException("One of the shards is incompatible with the required minimum version [{}]", minVersion);
}
return conn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.util.CollectionUtils;
Expand Down Expand Up @@ -256,11 +255,6 @@ public boolean isClosed() {
return connection.isClosed();
}

@Override
public Version getVersion() {
return connection.getVersion();
}

@Override
public TransportVersion getTransportVersion() {
return connection.getTransportVersion();
Expand Down Expand Up @@ -346,11 +340,6 @@ public boolean isClosed() {
return connection.isClosed();
}

@Override
public Version getVersion() {
return connection.getVersion();
}

@Override
public TransportVersion getTransportVersion() {
return connection.getTransportVersion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -264,13 +263,6 @@ public final class NodeChannels extends CloseableConnection {
compressionScheme = connectionProfile.getCompressionScheme();
}

@Override
public Version getVersion() {
// TODO: this should be the below, but in some cases the node version does not match the passed-in version.
// return node.getVersion();
return Version.fromId(version.id());
}

@Override
public TransportVersion getTransportVersion() {
return version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.elasticsearch.transport;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.LifecycleComponent;
Expand Down Expand Up @@ -127,13 +126,6 @@ void sendRequest(long requestId, String action, TransportRequest request, Transp

boolean isClosed();

/**
* Returns the version of the node on the other side of this channel.
*/
default Version getVersion() {
return getNode().getVersion();
}

/**
* Returns the version of the data to communicate in this channel.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.apache.logging.log4j.Level;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -755,11 +754,6 @@ public DiscoveryNode getNode() {
return node;
}

@Override
public Version getVersion() {
return node.getVersion();
}

@Override
public TransportVersion getTransportVersion() {
return TransportVersion.current();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package org.elasticsearch.transport;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand All @@ -18,6 +17,7 @@
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.RemoteConnectionManager.ProxyConnection;
import org.mockito.Mockito;

import java.io.IOException;
Expand All @@ -27,8 +27,8 @@
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -68,7 +68,7 @@ public void testGetConnection() {
// Add duplicate connect attempt to ensure that we do not get duplicate connections in the round robin
remoteConnectionManager.connectToRemoteClusterNode(node1, validator, new PlainActionFuture<>());

DiscoveryNode node2 = DiscoveryNodeUtils.create("node-2", address, Version.CURRENT.minimumCompatibilityVersion());
DiscoveryNode node2 = DiscoveryNodeUtils.create("node-2", address);
PlainActionFuture<Void> future2 = new PlainActionFuture<>();
remoteConnectionManager.connectToRemoteClusterNode(node2, validator, future2);
assertTrue(future2.isDone());
Expand All @@ -77,24 +77,23 @@ public void testGetConnection() {
assertEquals(node2, remoteConnectionManager.getConnection(node2).getNode());

DiscoveryNode node4 = DiscoveryNodeUtils.create("node-4", address);
assertThat(remoteConnectionManager.getConnection(node4), instanceOf(RemoteConnectionManager.ProxyConnection.class));
assertThat(remoteConnectionManager.getConnection(node4), instanceOf(ProxyConnection.class));

// Test round robin
Set<Version> versions = new HashSet<>();
versions.add(remoteConnectionManager.getConnection(node4).getVersion());
versions.add(remoteConnectionManager.getConnection(node4).getVersion());
Set<String> proxyNodes = new HashSet<>();
proxyNodes.add(((ProxyConnection) remoteConnectionManager.getConnection(node4)).getConnection().getNode().getId());
proxyNodes.add(((ProxyConnection) remoteConnectionManager.getConnection(node4)).getConnection().getNode().getId());

assertThat(versions, hasItems(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()));
assertThat(proxyNodes, containsInAnyOrder("node-1", "node-2"));

// Test that the connection is cleared from the round robin list when it is closed
remoteConnectionManager.getConnection(node1).close();

versions.clear();
versions.add(remoteConnectionManager.getConnection(node4).getVersion());
versions.add(remoteConnectionManager.getConnection(node4).getVersion());
proxyNodes.clear();
proxyNodes.add(((ProxyConnection) remoteConnectionManager.getConnection(node4)).getConnection().getNode().getId());
proxyNodes.add(((ProxyConnection) remoteConnectionManager.getConnection(node4)).getConnection().getNode().getId());

assertThat(versions, hasItems(Version.CURRENT.minimumCompatibilityVersion()));
assertEquals(1, versions.size());
assertThat(proxyNodes, containsInAnyOrder("node-2"));
}

public void testResolveRemoteClusterAlias() throws ExecutionException, InterruptedException {
Expand All @@ -111,7 +110,7 @@ public void testResolveRemoteClusterAlias() throws ExecutionException, Interrupt

DiscoveryNode remoteNode2 = DiscoveryNodeUtils.create("remote-node-2", address);
Transport.Connection proxyConnection = remoteConnectionManager.getConnection(remoteNode2);
assertThat(proxyConnection, instanceOf(RemoteConnectionManager.ProxyConnection.class));
assertThat(proxyConnection, instanceOf(ProxyConnection.class));
assertThat(RemoteConnectionManager.resolveRemoteClusterAlias(proxyConnection).get(), equalTo("remote-cluster"));

PlainActionFuture<Transport.Connection> future2 = new PlainActionFuture<>();
Expand Down Expand Up @@ -156,11 +155,6 @@ public DiscoveryNode getNode() {
return node;
}

@Override
public Version getVersion() {
return node.getVersion();
}

@Override
public TransportVersion getTransportVersion() {
return TransportVersion.current();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.elasticsearch.test.transport;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.Lifecycle;
Expand Down Expand Up @@ -267,11 +266,6 @@ public boolean isClosed() {
return connection.isClosed();
}

@Override
public Version getVersion() {
return connection.getVersion();
}

@Override
public TransportVersion getTransportVersion() {
return connection.getTransportVersion();
Expand Down

0 comments on commit b18b5cb

Please sign in to comment.