Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Execute low level handshake in #openConnection (#22440)
Today we execute the low level handshake on the TCP layer in #connectToNode.
If #openConnection is used directly, which is truly expert, no handshake is executed
which allows connecting to nodes that are not necessarily compatible. This change
moves the handshake to #openConnection to prevent bypassing this logic.
  • Loading branch information
s1monw committed Jan 5, 2017
1 parent bf51522 commit a5daa5d
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 11 deletions.
20 changes: 10 additions & 10 deletions core/src/main/java/org/elasticsearch/transport/TcpTransport.java
Expand Up @@ -458,13 +458,6 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil
"failed to connect to [{}], cleaning dangling connections", node), e);
throw e;
}
Channel channel = nodeChannels.channel(TransportRequestOptions.Type.PING);
final TimeValue connectTimeout = connectionProfile.getConnectTimeout() == null ?
defaultConnectionProfile.getConnectTimeout() :
connectionProfile.getConnectTimeout();
final TimeValue handshakeTimeout = connectionProfile.getHandshakeTimeout() == null ?
connectTimeout : connectionProfile.getHandshakeTimeout();
Version version = executeHandshake(node, channel, handshakeTimeout);
// we acquire a connection lock, so no way there is an existing connection
connectedNodes.put(node, nodeChannels);
if (logger.isDebugEnabled()) {
Expand All @@ -483,11 +476,18 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil
}

@Override
public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) throws IOException {
try {
NodeChannels nodeChannels = connectToChannels(node, profile);
NodeChannels nodeChannels = connectToChannels(node, connectionProfile);
final Channel channel = nodeChannels.getChannels().get(0); // one channel is guaranteed by the connection profile
final TimeValue connectTimeout = connectionProfile.getConnectTimeout() == null ?
defaultConnectionProfile.getConnectTimeout() :
connectionProfile.getConnectTimeout();
final TimeValue handshakeTimeout = connectionProfile.getHandshakeTimeout() == null ?
connectTimeout : connectionProfile.getHandshakeTimeout();
final Version version = executeHandshake(node, channel, handshakeTimeout);
transportServiceAdapter.onConnectionOpened(node);
return nodeChannels;
return new NodeChannels(nodeChannels, version); // clone the channels - we now have the correct version
} catch (ConnectTransportException e) {
throw e;
} catch (Exception e) {
Expand Down
Expand Up @@ -684,6 +684,11 @@ public DiscoveryNode getNode() {
return connection.getNode();
}

@Override
public Version getVersion() {
return connection.getVersion();
}

@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
Expand Down
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.mocksocket.MockServerSocket;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -1818,6 +1819,52 @@ public void testTimeoutPerConnection() throws IOException {
}
}

public void testHandshakeWithIncompatVersion() {
assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport);
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
try (MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Settings.EMPTY, Collections.emptyList()),
Version.fromString("2.0.0"))) {
transport.transportServiceAdapter(serviceA.new Adapter());
transport.start();
DiscoveryNode node =
new DiscoveryNode("TS_TPC", "TS_TPC", transport.boundAddress().publishAddress(), emptyMap(), emptySet(), version0);
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.addConnections(1,
TransportRequestOptions.Type.BULK,
TransportRequestOptions.Type.PING,
TransportRequestOptions.Type.RECOVERY,
TransportRequestOptions.Type.REG,
TransportRequestOptions.Type.STATE);
expectThrows(ConnectTransportException.class, () -> serviceA.openConnection(node, builder.build()));
}
}

public void testHandshakeUpdatesVersion() throws IOException {
assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport);
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
Version version = VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT);
try (MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Settings.EMPTY, Collections.emptyList()),version)) {
transport.transportServiceAdapter(serviceA.new Adapter());
transport.start();
DiscoveryNode node =
new DiscoveryNode("TS_TPC", "TS_TPC", transport.boundAddress().publishAddress(), emptyMap(), emptySet(),
Version.fromString("2.0.0"));
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.addConnections(1,
TransportRequestOptions.Type.BULK,
TransportRequestOptions.Type.PING,
TransportRequestOptions.Type.RECOVERY,
TransportRequestOptions.Type.REG,
TransportRequestOptions.Type.STATE);
try (Transport.Connection connection = serviceA.openConnection(node, builder.build())) {
assertEquals(connection.getVersion(), version);
}
}
}


public void testTcpHandshake() throws IOException, InterruptedException {
assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport);
TcpTransport originalTransport = (TcpTransport) serviceA.getOriginalTransport();
Expand All @@ -1830,7 +1877,7 @@ protected String handleRequest(MockChannel mockChannel, String profileName, Stre
int messageLengthBytes, Version version, InetSocketAddress remoteAddress, byte status)
throws IOException {
return super.handleRequest(mockChannel, profileName, stream, requestId, messageLengthBytes, version, remoteAddress,
(byte)(status & ~(1<<3))); // we flip the isHanshake bit back and ackt like the handler is not found
(byte)(status & ~(1<<3))); // we flip the isHandshake bit back and act like the handler is not found
}
}) {
transport.transportServiceAdapter(serviceA.new Adapter());
Expand Down

0 comments on commit a5daa5d

Please sign in to comment.