Skip to content

Commit

Permalink
Validate build hash in handshake (#65732)
Browse files Browse the repository at this point in the history
There is no guarantee of wire compatibility between nodes running
different builds of the same version, but today we do not validate
whether two communicating nodes are compatible or not. This results in
confusing failures that look like serialization bugs, and it usually
takes nontrivial effort to determine that the failure is in fact due to
the user running incompatible builds.

This commit adds the build hash to the transport service handshake and
validates that matching versions have matching build hashes.

Closes #65249
  • Loading branch information
DaveCTurner committed Dec 2, 2020
1 parent 6f323ad commit aba2f3e
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 29 deletions.
124 changes: 108 additions & 16 deletions server/src/main/java/org/elasticsearch/transport/TransportService.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Build;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
Expand All @@ -34,6 +35,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -68,10 +70,27 @@
import java.util.function.Predicate;
import java.util.function.Supplier;

public class TransportService extends AbstractLifecycleComponent implements ReportingService<TransportInfo>, TransportMessageListener,
TransportConnectionListener {
public class TransportService extends AbstractLifecycleComponent
implements ReportingService<TransportInfo>, TransportMessageListener, TransportConnectionListener {

private static final Logger logger = LogManager.getLogger(TransportService.class);

private static final String PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY = "es.unsafely_permit_handshake_from_incompatible_builds";
private static final boolean PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS;

static {
final String value = System.getProperty(PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY);
if (value == null) {
PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS = false;
} else if (Boolean.parseBoolean(value)) {
PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS = true;
} else {
throw new IllegalArgumentException("invalid value [" + value + "] for system property ["
+ PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY + "]");
}
}


public static final String DIRECT_RESPONSE_PROFILE = ".direct";
public static final String HANDSHAKE_ACTION_NAME = "internal:transport/handshake";

Expand Down Expand Up @@ -182,7 +201,14 @@ public TransportService(Settings settings, Transport transport, ThreadPool threa
false, false,
HandshakeRequest::new,
(request, channel, task) -> channel.sendResponse(
new HandshakeResponse(localNode, clusterName, localNode.getVersion())));
new HandshakeResponse(localNode.getVersion(), Build.CURRENT.hash(), localNode, clusterName)));

if (PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS) {
logger.warn("transport handshakes from incompatible builds are unsafely permitted on this node; remove system property [" +
PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY + "] to resolve this warning");
DeprecationLogger.getLogger(TransportService.class).deprecate("permit_handshake_from_incompatible_builds",
"system property [" + PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY + "] is deprecated and should be removed");
}
}

public RemoteClusterService getRemoteClusterService() {
Expand Down Expand Up @@ -440,8 +466,8 @@ public void onResponse(HandshakeResponse response) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
}
, HandshakeResponse::new, ThreadPool.Names.GENERIC
},
HandshakeResponse::new, ThreadPool.Names.GENERIC
));
}

Expand All @@ -463,28 +489,89 @@ private HandshakeRequest() {
}

public static class HandshakeResponse extends TransportResponse {

private static final Version BUILD_HASH_HANDSHAKE_VERSION = Version.V_8_0_0;

private final Version version;

@Nullable // if version < BUILD_HASH_HANDSHAKE_VERSION
private final String buildHash;

private final DiscoveryNode discoveryNode;

private final ClusterName clusterName;
private final Version version;

public HandshakeResponse(DiscoveryNode discoveryNode, ClusterName clusterName, Version version) {
this.discoveryNode = discoveryNode;
this.version = version;
this.clusterName = clusterName;
public HandshakeResponse(Version version, String buildHash, DiscoveryNode discoveryNode, ClusterName clusterName) {
this.buildHash = Objects.requireNonNull(buildHash);
this.discoveryNode = Objects.requireNonNull(discoveryNode);
this.version = Objects.requireNonNull(version);
this.clusterName = Objects.requireNonNull(clusterName);
}

public HandshakeResponse(StreamInput in) throws IOException {
super(in);
discoveryNode = in.readOptionalWriteable(DiscoveryNode::new);
clusterName = new ClusterName(in);
version = Version.readVersion(in);
if (in.getVersion().onOrAfter(BUILD_HASH_HANDSHAKE_VERSION)) {
// the first two fields need only VInts and raw (ASCII) characters, so we cross our fingers and hope that they appear
// on the wire as we expect them to even if this turns out to be an incompatible build
version = Version.readVersion(in);
buildHash = in.readString();

try {
// If the remote node is incompatible then make an effort to identify it anyway, so we can mention it in the exception
// message, but recognise that this may fail
discoveryNode = new DiscoveryNode(in);
} catch (Exception e) {
if (isIncompatibleBuild(version, buildHash)) {
throw new IllegalArgumentException("unidentifiable remote node is build [" + buildHash +
"] of version [" + version + "] but this node is build [" + Build.CURRENT.hash() +
"] of version [" + Version.CURRENT + "] which has an incompatible wire format", e);
} else {
throw e;
}
}

if (isIncompatibleBuild(version, buildHash)) {
if (PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS) {
logger.warn("remote node [{}] is build [{}] of version [{}] but this node is build [{}] of version [{}] " +
"which may not be compatible; remove system property [{}] to resolve this warning",
discoveryNode, buildHash, version, Build.CURRENT.hash(), Version.CURRENT,
PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY);
} else {
throw new IllegalArgumentException("remote node [" + discoveryNode + "] is build [" + buildHash +
"] of version [" + version + "] but this node is build [" + Build.CURRENT.hash() +
"] of version [" + Version.CURRENT + "] which has an incompatible wire format");
}
}

clusterName = new ClusterName(in);
} else {
discoveryNode = in.readOptionalWriteable(DiscoveryNode::new);
clusterName = new ClusterName(in);
version = Version.readVersion(in);
buildHash = null;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(discoveryNode);
clusterName.writeTo(out);
Version.writeVersion(version, out);
if (out.getVersion().onOrAfter(BUILD_HASH_HANDSHAKE_VERSION)) {
Version.writeVersion(version, out);
out.writeString(buildHash);
discoveryNode.writeTo(out);
clusterName.writeTo(out);
} else {
out.writeOptionalWriteable(discoveryNode);
clusterName.writeTo(out);
Version.writeVersion(version, out);
}
}

public Version getVersion() {
return version;
}

public String getBuildHash() {
return buildHash;
}

public DiscoveryNode getDiscoveryNode() {
Expand All @@ -494,6 +581,10 @@ public DiscoveryNode getDiscoveryNode() {
public ClusterName getClusterName() {
return clusterName;
}

private static boolean isIncompatibleBuild(Version version, String buildHash) {
return version == Version.CURRENT && Build.CURRENT.hash().equals(buildHash) == false;
}
}

public void disconnectFromNode(DiscoveryNode node) {
Expand Down Expand Up @@ -1293,4 +1384,5 @@ public void onResponseReceived(long requestId, Transport.ResponseContext holder)
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -479,7 +480,7 @@ private TestTransportService(Transport transport, ThreadPool threadPool) {
@Override
public void handshake(Transport.Connection connection, TimeValue timeout, Predicate<ClusterName> clusterNamePredicate,
ActionListener<HandshakeResponse> listener) {
listener.onResponse(new HandshakeResponse(connection.getNode(), new ClusterName(""), Version.CURRENT));
listener.onResponse(new HandshakeResponse(Version.CURRENT, Build.CURRENT.hash(), connection.getNode(), new ClusterName("")));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.cluster.coordination;

import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
Expand Down Expand Up @@ -227,7 +228,11 @@ public void testFailsNodeThatDisconnects() {
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
assertFalse(node.equals(localNode));
if (action.equals(HANDSHAKE_ACTION_NAME)) {
handleResponse(requestId, new TransportService.HandshakeResponse(node, ClusterName.DEFAULT, Version.CURRENT));
handleResponse(requestId, new TransportService.HandshakeResponse(
Version.CURRENT,
Build.CURRENT.hash(),
node,
ClusterName.DEFAULT));
return;
}
deterministicTaskQueue.scheduleNow(new Runnable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.cluster.coordination;

import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
Expand Down Expand Up @@ -222,7 +223,11 @@ public void testFollowerFailsImmediatelyOnDisconnection() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
if (action.equals(HANDSHAKE_ACTION_NAME)) {
handleResponse(requestId, new TransportService.HandshakeResponse(node, ClusterName.DEFAULT, Version.CURRENT));
handleResponse(requestId, new TransportService.HandshakeResponse(
Version.CURRENT,
Build.CURRENT.hash(),
node,
ClusterName.DEFAULT));
return;
}
assertThat(action, equalTo(LEADER_CHECK_ACTION_NAME));
Expand Down Expand Up @@ -332,7 +337,11 @@ public void testFollowerFailsImmediatelyOnHealthCheckFailure() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
if (action.equals(HANDSHAKE_ACTION_NAME)) {
handleResponse(requestId, new TransportService.HandshakeResponse(node, ClusterName.DEFAULT, Version.CURRENT));
handleResponse(requestId, new TransportService.HandshakeResponse(
Version.CURRENT,
Build.CURRENT.hash(),
node,
ClusterName.DEFAULT));
return;
}
assertThat(action, equalTo(LEADER_CHECK_ACTION_NAME));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.cluster.coordination;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Build;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
Expand Down Expand Up @@ -161,8 +162,12 @@ private void setupMasterServiceAndCoordinator(long term, ClusterState initialSta
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode destination) {
if (action.equals(HANDSHAKE_ACTION_NAME)) {
handleResponse(requestId, new TransportService.HandshakeResponse(destination, initialState.getClusterName(),
destination.getVersion()));
handleResponse(requestId, new TransportService.HandshakeResponse(
destination.getVersion(),
Build.CURRENT.hash(),
destination,
initialState.getClusterName()
));
} else if (action.equals(JoinHelper.VALIDATE_JOIN_ACTION_NAME)) {
handleResponse(requestId, new TransportResponse.Empty());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -98,7 +99,11 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
if (fullConnectionFailure != null && node.getAddress().equals(remoteNode.getAddress())) {
handleError(requestId, fullConnectionFailure);
} else {
handleResponse(requestId, new HandshakeResponse(remoteNode, new ClusterName(remoteClusterName), Version.CURRENT));
handleResponse(requestId, new HandshakeResponse(
Version.CURRENT,
Build.CURRENT.hash(),
remoteNode,
new ClusterName(remoteClusterName)));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.transport;

import org.elasticsearch.Build;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterName;
Expand Down Expand Up @@ -58,7 +59,11 @@ public void testDeserializationFailureLogIdentifiesListener() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
if (action.equals(TransportService.HANDSHAKE_ACTION_NAME)) {
handleResponse(requestId, new TransportService.HandshakeResponse(otherNode, new ClusterName(""), Version.CURRENT));
handleResponse(requestId, new TransportService.HandshakeResponse(
Version.CURRENT,
Build.CURRENT.hash(),
otherNode,
new ClusterName("")));
}
}
};
Expand Down

0 comments on commit aba2f3e

Please sign in to comment.