Skip to content

Commit

Permalink
zen: Don't join master nodes or accept join requests of old and too n…
Browse files Browse the repository at this point in the history
…ew nodes.

If the version of a node is lower than the minimum supported version or higher than the maximum supported version, a node shouldn't be allowed to join and nodes should join that elected master node

Closes #11924
  • Loading branch information
martijnvg committed Jul 2, 2015
1 parent 410704f commit a4b99e6
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 5 deletions.
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -886,12 +887,22 @@ static boolean shouldIgnoreOrRejectNewClusterState(ESLogger logger, ClusterState
}
}

private void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) {
void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) {

if (!transportService.addressSupported(node.address().getClass())) {
// TODO, what should we do now? Maybe inform that node that its crap?
logger.warn("received a wrong address type from [{}], ignoring...", node);
} else {
// The minimum supported version for a node joining a master:
Version minimumNodeJoinVersion = localNode().getVersion().minimumCompatibilityVersion();
// Sanity check: maybe we don't end up here, because serialization may have failed.
if (node.getVersion().before(minimumNodeJoinVersion)) {
callback.onFailure(
new IllegalStateException("Can't handle join request from a node with a version [" + node.getVersion() + "] that is lower than the minimum compatible version [" + minimumNodeJoinVersion.minimumCompatibilityVersion() + "]")
);
return;
}

// try and connect to the node, if it fails, we can raise an exception back to the client...
transportService.connectToNode(node);

Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.carrotsearch.hppc.ObjectContainer;
import com.google.common.collect.Lists;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
Expand All @@ -36,13 +37,17 @@ public class ElectMasterService extends AbstractComponent {

public static final String DISCOVERY_ZEN_MINIMUM_MASTER_NODES = "discovery.zen.minimum_master_nodes";

// This is the minimum version a master needs to be on, otherwise it gets ignored
// This is based on the minimum compatible version of the current version this node is on
private final Version minMasterVersion;
private final NodeComparator nodeComparator = new NodeComparator();

private volatile int minimumMasterNodes;

@Inject
public ElectMasterService(Settings settings) {
public ElectMasterService(Settings settings, Version version) {
super(settings);
this.minMasterVersion = version.minimumCompatibilityVersion();
this.minimumMasterNodes = settings.getAsInt(DISCOVERY_ZEN_MINIMUM_MASTER_NODES, -1);
logger.debug("using minimum_master_nodes [{}]", minimumMasterNodes);
}
Expand Down Expand Up @@ -108,7 +113,14 @@ public DiscoveryNode electMaster(Iterable<DiscoveryNode> nodes) {
if (sortedNodes == null || sortedNodes.isEmpty()) {
return null;
}
return sortedNodes.get(0);
DiscoveryNode masterNode = sortedNodes.get(0);
// Sanity check: maybe we don't end up here, because serialization may have failed.
if (masterNode.getVersion().before(minMasterVersion)) {
logger.warn("ignoring master [{}], because the version [{}] is lower than the minimum compatible version [{}]", masterNode, masterNode.getVersion(), minMasterVersion);
return null;
} else {
return masterNode;
}
}

private List<DiscoveryNode> sortedMasterNodes(Iterable<DiscoveryNode> nodes) {
Expand Down
Expand Up @@ -32,7 +32,7 @@
public class ElectMasterServiceTest extends ElasticsearchTestCase {

ElectMasterService electMasterService() {
return new ElectMasterService(Settings.EMPTY);
return new ElectMasterService(Settings.EMPTY, Version.CURRENT);
}

List<DiscoveryNode> generateRandomNodes() {
Expand Down
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.discovery.zen;

import com.google.common.collect.Iterables;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
Expand All @@ -35,7 +36,9 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.fd.FaultDetection;
import org.elasticsearch.discovery.zen.membership.MembershipAction;
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.junit.annotations.TestLogging;
Expand All @@ -45,7 +48,10 @@
import org.junit.Test;

import java.io.IOException;
import java.lang.ref.Reference;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -215,4 +221,40 @@ public void handleException(TransportException exp) {
assertThat(reference.get(), notNullValue());
assertThat(ExceptionsHelper.detailedMessage(reference.get()), containsString("cluster state from a different master then the current one, rejecting "));
}

@Test
public void testHandleNodeJoin_incompatibleMinVersion() {
Settings nodeSettings = Settings.settingsBuilder()
.put("discovery.type", "zen") // <-- To override the local setting if set externally
.build();
String nodeName = internalCluster().startNode(nodeSettings, Version.V_2_0_0);
ZenDiscovery zenDiscovery = (ZenDiscovery) internalCluster().getInstance(Discovery.class, nodeName);

DiscoveryNode node = new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.V_1_6_0);
final AtomicReference<IllegalStateException> holder = new AtomicReference<>();
zenDiscovery.handleJoinRequest(node, new MembershipAction.JoinCallback() {
@Override
public void onSuccess() {
}

@Override
public void onFailure(Throwable t) {
holder.set((IllegalStateException) t);
}
});

assertThat(holder.get(), notNullValue());
assertThat(holder.get().getMessage(), equalTo("Can't handle join request from a node with a version [1.6.0] that is lower than the minimum compatible version [2.0.0-SNAPSHOT]"));
}

@Test
public void testJoinElectedMaster_incompatibleMinVersion() {
ElectMasterService electMasterService = new ElectMasterService(Settings.EMPTY, Version.V_2_0_0);

DiscoveryNode node = new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.V_2_0_0);
assertThat(electMasterService.electMaster(Collections.singletonList(node)), sameInstance(node));
node = new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.V_1_6_0);
assertThat("Can't join master because version 1.6.0 is lower than the minimum compatable version 2.0.0 can support", electMasterService.electMaster(Collections.singletonList(node)), nullValue());
}

}
Expand Up @@ -57,7 +57,7 @@ public void testSimplePings() throws InterruptedException {
ThreadPool threadPool = new ThreadPool(getClass().getName());
ClusterName clusterName = new ClusterName("test");
NetworkService networkService = new NetworkService(settings);
ElectMasterService electMasterService = new ElectMasterService(settings);
ElectMasterService electMasterService = new ElectMasterService(settings, Version.CURRENT);

NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT);
final TransportService transportServiceA = new TransportService(transportA, threadPool).start();
Expand Down

0 comments on commit a4b99e6

Please sign in to comment.