Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't join master nodes or accept join requests of old and too new nodes #11972

Merged
merged 1 commit into from Jul 2, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -886,12 +887,22 @@ static boolean shouldIgnoreOrRejectNewClusterState(ESLogger logger, ClusterState
}
}

private void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) {
void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can go back to private now, no?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops... no, because it is used in a test (ZenDiscoveryTests#testHandleNodeJoin_incompatibleMinVersion)


if (!transportService.addressSupported(node.address().getClass())) {
// TODO, what should we do now? Maybe inform that node that its crap?
logger.warn("received a wrong address type from [{}], ignoring...", node);
} else {
// The minimum supported version for a node joining a master:
Version minimumNodeJoinVersion = localNode().getVersion().minimumCompatibilityVersion();
// Sanity check: maybe we don't end up here, because serialization may have failed.
if (node.getVersion().before(minimumNodeJoinVersion)) {
callback.onFailure(
new IllegalStateException("Can't handle join request from a node with a version [" + node.getVersion() + "] that is lower than the minimum compatible version [" + minimumNodeJoinVersion.minimumCompatibilityVersion() + "]")
);
return;
}

// try and connect to the node, if it fails, we can raise an exception back to the client...
transportService.connectToNode(node);

Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.carrotsearch.hppc.ObjectContainer;
import com.google.common.collect.Lists;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
Expand All @@ -36,13 +37,17 @@ public class ElectMasterService extends AbstractComponent {

public static final String DISCOVERY_ZEN_MINIMUM_MASTER_NODES = "discovery.zen.minimum_master_nodes";

// This is the minimum version a master needs to be on, otherwise it gets ignored
// This is based on the minimum compatible version of the current version this node is on
private final Version minMasterVersion;
private final NodeComparator nodeComparator = new NodeComparator();

private volatile int minimumMasterNodes;

@Inject
public ElectMasterService(Settings settings) {
public ElectMasterService(Settings settings, Version version) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we rename version to minMasterVersion, call it with Version.minimumCompatibilityVersion() and document what it means? I think it will be clearer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It turns out it's hard(ish) to remove ElectMasterService from Guice, so I'm not doing that.

super(settings);
this.minMasterVersion = version.minimumCompatibilityVersion();
this.minimumMasterNodes = settings.getAsInt(DISCOVERY_ZEN_MINIMUM_MASTER_NODES, -1);
logger.debug("using minimum_master_nodes [{}]", minimumMasterNodes);
}
Expand Down Expand Up @@ -108,7 +113,14 @@ public DiscoveryNode electMaster(Iterable<DiscoveryNode> nodes) {
if (sortedNodes == null || sortedNodes.isEmpty()) {
return null;
}
return sortedNodes.get(0);
DiscoveryNode masterNode = sortedNodes.get(0);
// Sanity check: maybe we don't end up here, because serialization may have failed.
if (masterNode.getVersion().before(minMasterVersion)) {
logger.warn("ignoring master [{}], because the version [{}] is lower than the minimum compatible version [{}]", masterNode, masterNode.getVersion(), minMasterVersion);
return null;
} else {
return masterNode;
}
}

private List<DiscoveryNode> sortedMasterNodes(Iterable<DiscoveryNode> nodes) {
Expand Down
Expand Up @@ -32,7 +32,7 @@
public class ElectMasterServiceTest extends ElasticsearchTestCase {

ElectMasterService electMasterService() {
return new ElectMasterService(Settings.EMPTY);
return new ElectMasterService(Settings.EMPTY, Version.CURRENT);
}

List<DiscoveryNode> generateRandomNodes() {
Expand Down
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.discovery.zen;

import com.google.common.collect.Iterables;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
Expand All @@ -35,7 +36,9 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.fd.FaultDetection;
import org.elasticsearch.discovery.zen.membership.MembershipAction;
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.junit.annotations.TestLogging;
Expand All @@ -45,7 +48,10 @@
import org.junit.Test;

import java.io.IOException;
import java.lang.ref.Reference;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -215,4 +221,40 @@ public void handleException(TransportException exp) {
assertThat(reference.get(), notNullValue());
assertThat(ExceptionsHelper.detailedMessage(reference.get()), containsString("cluster state from a different master then the current one, rejecting "));
}

@Test
public void testHandleNodeJoin_incompatibleMinVersion() {
    // Start a node with Version.V_2_0_0. "discovery.type" is set explicitly so that
    // a value supplied from outside the test cannot replace zen discovery.
    String nodeName = internalCluster().startNode(
            Settings.settingsBuilder().put("discovery.type", "zen").build(),
            Version.V_2_0_0);
    ZenDiscovery zenDiscovery = (ZenDiscovery) internalCluster().getInstance(Discovery.class, nodeName);

    // The node that asks to join reports version 1.6.0.
    DiscoveryNode joiningNode = new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.V_1_6_0);

    // Captures whatever failure the join callback is handed.
    final AtomicReference<IllegalStateException> failure = new AtomicReference<>();
    MembershipAction.JoinCallback recordingCallback = new MembershipAction.JoinCallback() {
        @Override
        public void onSuccess() {
            // Nothing is recorded on success; the assertions below then fail on the empty reference.
        }

        @Override
        public void onFailure(Throwable t) {
            failure.set((IllegalStateException) t);
        }
    };
    zenDiscovery.handleJoinRequest(joiningNode, recordingCallback);

    // The join must have been rejected with the exact version-mismatch message.
    assertThat(failure.get(), notNullValue());
    assertThat(
            failure.get().getMessage(),
            equalTo("Can't handle join request from a node with a version [1.6.0] that is lower than the minimum compatible version [2.0.0-SNAPSHOT]"));
}

@Test
public void testJoinElectedMaster_incompatibleMinVersion() {
    ElectMasterService electMasterService = new ElectMasterService(Settings.EMPTY, Version.V_2_0_0);

    // A single candidate on 2.0.0 must be handed back as the elected master.
    DiscoveryNode currentVersionNode = new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.V_2_0_0);
    DiscoveryNode electedFromCurrent = electMasterService.electMaster(Collections.singletonList(currentVersionNode));
    assertThat(electedFromCurrent, sameInstance(currentVersionNode));

    // A single candidate on 1.6.0 must not be elected: no master is returned.
    DiscoveryNode oldVersionNode = new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.V_1_6_0);
    DiscoveryNode electedFromOld = electMasterService.electMaster(Collections.singletonList(oldVersionNode));
    assertThat(
            "Can't join master because version 1.6.0 is lower than the minimum compatable version 2.0.0 can support",
            electedFromOld,
            nullValue());
}

}
Expand Up @@ -57,7 +57,7 @@ public void testSimplePings() throws InterruptedException {
ThreadPool threadPool = new ThreadPool(getClass().getName());
ClusterName clusterName = new ClusterName("test");
NetworkService networkService = new NetworkService(settings);
ElectMasterService electMasterService = new ElectMasterService(settings);
ElectMasterService electMasterService = new ElectMasterService(settings, Version.CURRENT);

NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT);
final TransportService transportServiceA = new TransportService(transportA, threadPool).start();
Expand Down