Skip to content

Commit

Permalink
Zen Discovery: Add minimum_master_nodes setting helping with split …
Browse files Browse the repository at this point in the history
…brains, closes #1079.
  • Loading branch information
kimchy committed Jun 30, 2011
1 parent c31ee7d commit e78e665
Show file tree
Hide file tree
Showing 9 changed files with 474 additions and 62 deletions.
Expand Up @@ -210,6 +210,16 @@ public Delta delta(DiscoveryNodes other) {
return new Delta(previousMasterNode, newMasterNode, localNodeId, ImmutableList.copyOf(removed), ImmutableList.copyOf(added));
}

/**
 * Renders the nodes on a single line wrapped in curly braces. Each node's string form
 * is followed by a comma (including the last one).
 */
@Override public String toString() {
    final StringBuilder out = new StringBuilder("{");
    for (DiscoveryNode current : this) {
        out.append(current);
        out.append(',');
    }
    return out.append('}').toString();
}

public String prettyPrint() {
StringBuilder sb = new StringBuilder();
sb.append("nodes: \n");
Expand Down
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -80,7 +81,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe

private volatile ClusterState clusterState = newClusterStateBuilder().build();

private final ClusterBlocks.Builder initialBlocks = ClusterBlocks.builder();
private final ClusterBlocks.Builder initialBlocks = ClusterBlocks.builder().addGlobalBlock(Discovery.NO_MASTER_BLOCK);

private volatile ScheduledFuture reconnectToNodes;

Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
Expand Down Expand Up @@ -110,7 +111,9 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
nodesBuilder.put(discovery.localNode);
}
nodesBuilder.localNodeId(master.localNode().id()).masterNodeId(master.localNode().id());
return newClusterStateBuilder().state(currentState).nodes(nodesBuilder).build();
// remove the NO_MASTER block in this case
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(Discovery.NO_MASTER_BLOCK);
return newClusterStateBuilder().state(currentState).nodes(nodesBuilder).blocks(blocks).build();
}

@Override public void clusterStateProcessed(ClusterState clusterState) {
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.UUID;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
Expand All @@ -46,11 +47,13 @@
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.discovery.zen.ping.ZenPingService;
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -225,6 +228,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}

private void asyncJoinCluster() {
if (currentJoinThread != null) {
// we are already joining, ignore...
return;
}
threadPool.cached().execute(new Runnable() {
@Override public void run() {
currentJoinThread = Thread.currentThread();
Expand Down Expand Up @@ -319,6 +326,31 @@ private void innterJoinCluster() {
}
}

/**
 * Handles a leave request for the given node.
 * <p>
 * Nothing happens unless this component is in the STARTED lifecycle state. When the local
 * node is the master, a cluster state update task is submitted that removes the node and,
 * if {@code electMaster} reports there are no longer enough master nodes, disconnects from
 * the cluster. When the local node is not the master, the request is handed to
 * {@link #handleMasterGone} with the reason {@code "shut_down"}.
 *
 * @param node the node that asked to leave
 */
private void handleLeaveRequest(final DiscoveryNode node) {
    if (lifecycleState() != Lifecycle.State.STARTED) {
        // not started, ignore the leave request
        return;
    }
    if (!master) {
        // not the master: delegate to the master-gone handling
        handleMasterGone(node, "shut_down");
        return;
    }
    clusterService.submitStateUpdateTask("zen-disco-node_left(" + node + ")", new ClusterStateUpdateTask() {
        @Override public ClusterState execute(ClusterState currentState) {
            // publish the node set without the departing node
            latestDiscoNodes = new DiscoveryNodes.Builder()
                    .putAll(currentState.nodes())
                    .remove(node.id())
                    .build();
            ClusterState withoutNode = newClusterStateBuilder().state(currentState).nodes(latestDiscoNodes).build();
            if (electMaster.hasEnoughMasterNodes(withoutNode.nodes())) {
                return withoutNode;
            }
            // too few master nodes remain, so we need to move into joining the cluster again
            return disconnectFromCluster(withoutNode, "not enough master nodes");
        }
    });
}

private void handleNodeFailure(final DiscoveryNode node, String reason) {
if (lifecycleState() != Lifecycle.State.STARTED) {
// not started, ignore a node failure
Expand All @@ -334,7 +366,12 @@ private void handleNodeFailure(final DiscoveryNode node, String reason) {
.putAll(currentState.nodes())
.remove(node.id());
latestDiscoNodes = builder.build();
return newClusterStateBuilder().state(currentState).nodes(latestDiscoNodes).build();
currentState = newClusterStateBuilder().state(currentState).nodes(latestDiscoNodes).build();
// check if we have enough master nodes, if not, we need to move into joining the cluster again
if (!electMaster.hasEnoughMasterNodes(currentState.nodes())) {
return disconnectFromCluster(currentState, "not enough master nodes");
}
return currentState;
}

@Override public void clusterStateProcessed(ClusterState clusterState) {
Expand Down Expand Up @@ -362,50 +399,36 @@ private void handleMasterGone(final DiscoveryNode masterNode, final String reaso
return currentState;
}

ClusterBlocks clusterBlocks = currentState.blocks();
MetaData metaData = currentState.metaData();
RoutingTable routingTable = currentState.routingTable();
List<DiscoveryNode> nodes = newArrayList(currentState.nodes().nodes().values());
nodes.remove(masterNode); // remove the master node from the list, it has failed
final DiscoveryNode electedMaster = electMaster.electMaster(nodes); // elect master
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.newNodesBuilder()
.putAll(currentState.nodes())
// make sure the old master node, which has failed, is not part of the nodes we publish
.remove(masterNode.id())
.masterNodeId(null);

if (!electMaster.hasEnoughMasterNodes(nodesBuilder.build())) {
return disconnectFromCluster(ClusterState.builder().state(currentState).nodes(nodesBuilder).build(), "not enough master nodes after master left (reason = " + reason + ")");
}

final DiscoveryNode electedMaster = electMaster.electMaster(nodesBuilder.build()); // elect master
if (localNode.equals(electedMaster)) {
master = true;
masterFD.stop("got elected as new master since master left (reason = " + reason + ")");
nodesFD.start();
DiscoveryNodes.Builder builder = DiscoveryNodes.newNodesBuilder()
.putAll(currentState.nodes())
// make sure the old master node, which has failed, is not part of the nodes we publish
.remove(masterNode.id())
.masterNodeId(localNode.id());
latestDiscoNodes = builder.build();
nodesBuilder.masterNodeId(localNode.id());
latestDiscoNodes = nodesBuilder.build();
return newClusterStateBuilder().state(currentState).nodes(latestDiscoNodes).build();
} else {
nodesFD.stop();
DiscoveryNodes.Builder builder = DiscoveryNodes.newNodesBuilder()
.putAll(currentState.nodes()).remove(masterNode.id());
if (electedMaster != null) {
builder.masterNodeId(electedMaster.id());
nodesBuilder.masterNodeId(electedMaster.id());
masterFD.restart(electedMaster, "possible elected master since master left (reason = " + reason + ")");
latestDiscoNodes = nodesBuilder.build();
return newClusterStateBuilder().state(currentState)
.nodes(latestDiscoNodes)
.build();
} else {
logger.warn("master_left and no other node elected to become master, current nodes: {}", nodes);
builder.masterNodeId(null);
clusterBlocks = ClusterBlocks.builder().blocks(clusterBlocks).addGlobalBlock(NO_MASTER_BLOCK).build();
// if this is a data node, clean the metadata and routing, since we want to recreate the indices and shards
if (currentState.nodes().localNode().dataNode()) {
// TODO, what happens with versioning here?
metaData = MetaData.newMetaDataBuilder().build();
routingTable = RoutingTable.builder().build();
}
masterFD.stop("no master elected since master left (reason = " + reason + ")");
asyncJoinCluster();
return disconnectFromCluster(newClusterStateBuilder().state(currentState).nodes(nodesBuilder.build()).build(), "master_left and no other node elected to become master");
}
latestDiscoNodes = builder.build();
return newClusterStateBuilder().state(currentState)
.blocks(clusterBlocks)
.nodes(latestDiscoNodes)
.metaData(metaData)
.routingTable(routingTable)
.build();
}
}

Expand All @@ -423,8 +446,19 @@ void handleNewClusterStateFromMaster(final ClusterState newState) {
if (newState.nodes().localNode() == null) {
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", newState.nodes().masterNode());
} else {
if (currentJoinThread != null) {
logger.debug("got a new state from master node, though we are already trying to rejoin the cluster");
}

clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newState.nodes().masterNode() + "])", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {

// we don't need to do this, since we ping the master, and get notified when it has moved from being a master
// because it doesn't have enough master nodes...
//if (!electMaster.hasEnoughMasterNodes(newState.nodes())) {
// return disconnectFromCluster(newState, "not enough master nodes on new cluster state received from [" + newState.nodes().masterNode() + "]");
//}

latestDiscoNodes = newState.nodes();

// check to see that we monitor the correct master of the cluster
Expand Down Expand Up @@ -453,22 +487,6 @@ void handleNewClusterStateFromMaster(final ClusterState newState) {
}
}

/**
 * Handles a leave request for the given node (the variant shown here has no lifecycle
 * or minimum-master-nodes check).
 * <p>
 * When the local node is the master, submits a cluster state update task that removes
 * the node from the current nodes; otherwise hands the node to handleMasterGone with
 * the reason "shut_down".
 *
 * @param node the node that asked to leave
 */
private void handleLeaveRequest(final DiscoveryNode node) {
if (master) {
clusterService.submitStateUpdateTask("zen-disco-node_left(" + node + ")", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
// copy the current nodes, dropping the one that left
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
.putAll(currentState.nodes())
.remove(node.id());
// remember the resulting node set before returning the new state
latestDiscoNodes = builder.build();
return newClusterStateBuilder().state(currentState).nodes(latestDiscoNodes).build();
}
});
} else {
// not the master: delegate to the master-gone handling
handleMasterGone(node, "shut_down");
}
}

private ClusterState handleJoinRequest(final DiscoveryNode node) {
if (!master) {
throw new ElasticSearchIllegalStateException("Node [" + localNode + "] not master for join request from [" + node + "]");
Expand Down Expand Up @@ -520,13 +538,20 @@ private DiscoveryNode findMaster() {
pingMasters.add(pingResponse.master());
}
}

Set<DiscoveryNode> possibleMasterNodes = Sets.newHashSet();
possibleMasterNodes.add(localNode);
for (ZenPing.PingResponse pingResponse : pingResponses) {
possibleMasterNodes.add(pingResponse.target());
}
// if we don't have enough master nodes, we bail, even if we get a response that indicates
// there is a master by other node, we don't see enough...
if (!electMaster.hasEnoughMasterNodes(possibleMasterNodes)) {
return null;
}

if (pingMasters.isEmpty()) {
// lets tie break between discovered nodes
List<DiscoveryNode> possibleMasterNodes = newArrayList();
possibleMasterNodes.add(localNode);
for (ZenPing.PingResponse pingResponse : pingResponses) {
possibleMasterNodes.add(pingResponse.target());
}
DiscoveryNode electedMaster = electMaster.electMaster(possibleMasterNodes);
if (localNode.equals(electedMaster)) {
return localNode;
Expand All @@ -540,6 +565,35 @@ private DiscoveryNode findMaster() {
return null;
}

/**
 * Moves the local node out of the cluster and kicks off a new join attempt.
 * <p>
 * Stops both fault detectors, clears the {@code master} flag, and returns a state derived
 * from {@code clusterState} that has the NO_MASTER and STATE_NOT_RECOVERED global blocks
 * added, an empty routing table (keeping the previous routing table version), empty
 * metadata, and a node set holding only the local node. {@code latestDiscoNodes} is
 * updated to that node set and {@link #asyncJoinCluster()} is invoked before returning.
 *
 * @param clusterState the state to derive the disconnected state from
 * @param reason       why we disconnect; logged and passed to the master fault detector
 * @return the cluster state to apply while disconnected
 */
private ClusterState disconnectFromCluster(ClusterState clusterState, String reason) {
    // pass the reason as a parameter rather than concatenating it into the format string,
    // so a "{}" inside a free-form reason cannot consume the nodes argument
    logger.warn("{}, current nodes: {}", reason, clusterState.nodes());
    nodesFD.stop();
    masterFD.stop(reason);
    master = false;

    ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(clusterState.blocks())
            .addGlobalBlock(NO_MASTER_BLOCK)
            .addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)
            .build();

    // clear the routing table, we have no master, so we need to recreate the routing when we reform the cluster
    RoutingTable routingTable = RoutingTable.builder().version(clusterState.routingTable().version()).build();
    // we also clean the metadata, since we are going to recover it if we become master
    MetaData metaData = MetaData.builder().build();

    // clean the nodes, we are now not connected to anybody, since we try and reform the cluster
    latestDiscoNodes = new DiscoveryNodes.Builder().put(localNode).localNodeId(localNode.id()).build();

    asyncJoinCluster();

    return newClusterStateBuilder().state(clusterState)
            .blocks(clusterBlocks)
            .nodes(latestDiscoNodes)
            .routingTable(routingTable)
            .metaData(metaData)
            .build();
}

private void sendInitialStateEventIfNeeded() {
if (initialStateSent.compareAndSet(false, true)) {
for (InitialStateDiscoveryListener listener : initialStateListeners) {
Expand Down
Expand Up @@ -36,8 +36,24 @@ public class ElectMasterService extends AbstractComponent {

private final NodeComparator nodeComparator = new NodeComparator();

private final int minimumMasterNodes;

public ElectMasterService(Settings settings) {
super(settings);
this.minimumMasterNodes = settings.getAsInt("discovery.zen.minimum_master_nodes", -1);
}

/**
 * Checks whether the given nodes contain at least {@code minimumMasterNodes} nodes for
 * which {@code masterNode()} is true. A configured minimum below 1 disables the check,
 * in which case this always returns {@code true}.
 *
 * @param nodes the nodes to inspect
 * @return {@code true} if the check is disabled or enough master nodes are present
 */
public boolean hasEnoughMasterNodes(Iterable<DiscoveryNode> nodes) {
    if (minimumMasterNodes < 1) {
        return true;
    }
    // count down from the required minimum; zero or below means the requirement is met
    int stillMissing = minimumMasterNodes;
    for (DiscoveryNode candidate : nodes) {
        if (candidate.masterNode()) {
            stillMissing--;
        }
    }
    return stillMissing <= 0;
}

/**
Expand Down Expand Up @@ -77,7 +93,7 @@ private List<DiscoveryNode> sortedMasterNodes(Iterable<DiscoveryNode> nodes) {
return null;
}
// clean non master nodes
for (Iterator<DiscoveryNode> it = possibleNodes.iterator(); it.hasNext();) {
for (Iterator<DiscoveryNode> it = possibleNodes.iterator(); it.hasNext(); ) {
DiscoveryNode node = it.next();
if (!node.masterNode()) {
it.remove();
Expand Down

0 comments on commit e78e665

Please sign in to comment.