Skip to content

Commit

Permalink
Add current cluster state version to zen pings and use them in master…
Browse files Browse the repository at this point in the history
… election (#20384)

During a networking partition, cluster states updates (like mapping changes or shard assignments)
are committed if a majority of the masters node received the update correctly. This means that the current master has access to enough nodes in the cluster to continue to operate correctly. When the network partition heals, the isolated nodes catch up with the current state and get the changes they couldn't receive before. However, if a second partition happens while the cluster
is still recovering from the previous one *and* the old master is put in the minority side, it may be that a new master is elected which did not yet catch up. If that happens, cluster state updates can be lost.

This commit fixed 95% of this rare problem by adding the current cluster state version to `PingResponse` and use them when deciding which master to join (and thus casting the node's vote).

Note: this doesn't fully mitigate the problem as a cluster state update which is issued concurrently with a network partition can be lost if the partition prevents the commit message (part of the two phased commit of cluster state updates) from reaching any single node in the majority side *and* the partition does allow for the master to acknowledge the change. We are working on a more comprehensive fix but that requires considerate work  and is targeted at 6.0.
  • Loading branch information
bleskes committed Sep 15, 2016
1 parent 57871c6 commit 3ad66ba
Show file tree
Hide file tree
Showing 28 changed files with 471 additions and 376 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.fd.FaultDetection;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
import org.elasticsearch.env.Environment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@

package org.elasticsearch.discovery;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.ExtensionPoint;
import org.elasticsearch.discovery.local.LocalDiscovery;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.discovery.zen.ping.ZenPingService;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
* under the License.
*/

package org.elasticsearch.discovery.zen.elect;
package org.elasticsearch.discovery.zen;

import com.carrotsearch.hppc.ObjectContainer;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
Expand All @@ -33,9 +32,11 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
*
Expand All @@ -45,17 +46,64 @@ public class ElectMasterService extends AbstractComponent {
public static final Setting<Integer> DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING =
Setting.intSetting("discovery.zen.minimum_master_nodes", -1, Property.Dynamic, Property.NodeScope);

// This is the minimum version a master needs to be on, otherwise it gets ignored
// This is based on the minimum compatible version of the current version this node is on
private final Version minMasterVersion;
private final NodeComparator nodeComparator = new NodeComparator();

private volatile int minimumMasterNodes;

/**
* a class to encapsulate all the information about a candidate in a master election
* that is needed to decided which of the candidates should win
*/
public static class MasterCandidate {

public static final long UNRECOVERED_CLUSTER_VERSION = -1;

final DiscoveryNode node;

final long clusterStateVersion;

public MasterCandidate(DiscoveryNode node, long clusterStateVersion) {
Objects.requireNonNull(node);
assert clusterStateVersion >= -1 : "got: " + clusterStateVersion;
assert node.isMasterNode();
this.node = node;
this.clusterStateVersion = clusterStateVersion;
}

public DiscoveryNode getNode() {
return node;
}

public long getClusterStateVersion() {
return clusterStateVersion;
}

@Override
public String toString() {
return "Candidate{" +
"node=" + node +
", clusterStateVersion=" + clusterStateVersion +
'}';
}

/**
* compares two candidates to indicate which the a better master.
* A higher cluster state version is better
*
* @return -1 if c1 is a batter candidate, 1 if c2.
*/
public static int compare(MasterCandidate c1, MasterCandidate c2) {
// we explicitly swap c1 and c2 here. the code expects "better" is lower in a sorted
// list, so if c2 has a higher cluster state version, it needs to come first.
int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
if (ret == 0) {
ret = compareNodes(c1.getNode(), c2.getNode());
}
return ret;
}
}

@Inject
public ElectMasterService(Settings settings) {
super(settings);
this.minMasterVersion = Version.CURRENT.minimumCompatibilityVersion();
this.minimumMasterNodes = DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
logger.debug("using minimum_master_nodes [{}]", minimumMasterNodes);
}
Expand All @@ -69,16 +117,41 @@ public int minimumMasterNodes() {
}

public boolean hasEnoughMasterNodes(Iterable<DiscoveryNode> nodes) {
if (minimumMasterNodes < 1) {
return true;
}
int count = 0;
for (DiscoveryNode node : nodes) {
if (node.isMasterNode()) {
count++;
}
}
return count >= minimumMasterNodes;
return count > 0 && (minimumMasterNodes < 0 || count >= minimumMasterNodes);
}

public boolean hasEnoughCandidates(Collection<MasterCandidate> candidates) {
if (candidates.isEmpty()) {
return false;
}
if (minimumMasterNodes < 1) {
return true;
}
assert candidates.stream().map(MasterCandidate::getNode).collect(Collectors.toSet()).size() == candidates.size() :
"duplicates ahead: " + candidates;
return candidates.size() >= minimumMasterNodes;
}

/**
* Elects a new master out of the possible nodes, returning it. Returns <tt>null</tt>
* if no master has been elected.
*/
public MasterCandidate electMaster(Collection<MasterCandidate> candidates) {
assert hasEnoughCandidates(candidates);
List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates);
sortedCandidates.sort(MasterCandidate::compare);
return sortedCandidates.get(0);
}

/** selects the best active master to join, where multiple are discovered */
public DiscoveryNode tieBreakActiveMasters(Collection<DiscoveryNode> activeMasters) {
return activeMasters.stream().min(ElectMasterService::compareNodes).get();
}

public boolean hasTooManyMasterNodes(Iterable<DiscoveryNode> nodes) {
Expand Down Expand Up @@ -107,7 +180,7 @@ public void logMinimumMasterNodesWarningIfNecessary(ClusterState oldState, Clust
*/
public List<DiscoveryNode> sortByMasterLikelihood(Iterable<DiscoveryNode> nodes) {
ArrayList<DiscoveryNode> sortedNodes = CollectionUtils.iterableAsArrayList(nodes);
CollectionUtil.introSort(sortedNodes, nodeComparator);
CollectionUtil.introSort(sortedNodes, ElectMasterService::compareNodes);
return sortedNodes;
}

Expand All @@ -130,25 +203,6 @@ public DiscoveryNode[] nextPossibleMasters(ObjectContainer<DiscoveryNode> nodes,
return nextPossibleMasters.toArray(new DiscoveryNode[nextPossibleMasters.size()]);
}

/**
* Elects a new master out of the possible nodes, returning it. Returns <tt>null</tt>
* if no master has been elected.
*/
public DiscoveryNode electMaster(Iterable<DiscoveryNode> nodes) {
List<DiscoveryNode> sortedNodes = sortedMasterNodes(nodes);
if (sortedNodes == null || sortedNodes.isEmpty()) {
return null;
}
DiscoveryNode masterNode = sortedNodes.get(0);
// Sanity check: maybe we don't end up here, because serialization may have failed.
if (masterNode.getVersion().before(minMasterVersion)) {
logger.warn("ignoring master [{}], because the version [{}] is lower than the minimum compatible version [{}]", masterNode, masterNode.getVersion(), minMasterVersion);
return null;
} else {
return masterNode;
}
}

private List<DiscoveryNode> sortedMasterNodes(Iterable<DiscoveryNode> nodes) {
List<DiscoveryNode> possibleNodes = CollectionUtils.iterableAsArrayList(nodes);
if (possibleNodes.isEmpty()) {
Expand All @@ -161,21 +215,18 @@ private List<DiscoveryNode> sortedMasterNodes(Iterable<DiscoveryNode> nodes) {
it.remove();
}
}
CollectionUtil.introSort(possibleNodes, nodeComparator);
CollectionUtil.introSort(possibleNodes, ElectMasterService::compareNodes);
return possibleNodes;
}

private static class NodeComparator implements Comparator<DiscoveryNode> {

@Override
public int compare(DiscoveryNode o1, DiscoveryNode o2) {
if (o1.isMasterNode() && !o2.isMasterNode()) {
return -1;
}
if (!o1.isMasterNode() && o2.isMasterNode()) {
return 1;
}
return o1.getId().compareTo(o2.getId());
/** master nodes go before other nodes, with a secondary sort by id **/
private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) {
if (o1.isMasterNode() && !o2.isMasterNode()) {
return -1;
}
if (!o1.isMasterNode() && o2.isMasterNode()) {
return 1;
}
return o1.getId().compareTo(o2.getId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.membership.MembershipAction;

import java.util.ArrayList;
Expand Down
Loading

0 comments on commit 3ad66ba

Please sign in to comment.