Skip to content

Commit

Permalink
Discovery: On join update the latestDiscoNodes, master flag and fault…
Browse files Browse the repository at this point in the history
… detection via a cluster state update task
  • Loading branch information
martijnvg committed Sep 18, 2014
1 parent f43a8e2 commit c38e418
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
*/
public class InternalClusterService extends AbstractLifecycleComponent<ClusterService> implements ClusterService {

public final static String UPDATE_THREAD_NAME = "clusterService#updateTask";

private final ThreadPool threadPool;

private final DiscoveryService discoveryService;
Expand Down Expand Up @@ -131,7 +133,7 @@ public void removeInitialStateBlock(ClusterBlock block) throws ElasticsearchIlle
protected void doStart() throws ElasticsearchException {
add(localNodeMasterListeners);
this.clusterState = ClusterState.builder(clusterState).blocks(initialBlocks).build();
this.updateTasksExecutor = EsExecutors.newSinglePrioritizing(daemonThreadFactory(settings, "clusterService#updateTask"));
this.updateTasksExecutor = EsExecutors.newSinglePrioritizing(daemonThreadFactory(settings, UPDATE_THREAD_NAME));
this.reconnectToNodes = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, new ReconnectToNodes());
discoveryService.addLifecycleListener(new LifecycleListener() {
@Override
Expand Down
62 changes: 49 additions & 13 deletions src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.InternalClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
Expand Down Expand Up @@ -131,6 +132,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen

private final CopyOnWriteArrayList<InitialStateDiscoveryListener> initialStateListeners = new CopyOnWriteArrayList<>();

// TODO: Remove this field in favour of using latestDiscoNodes#masterNode().equals(localNode) or something similar
private volatile boolean master = false;

private volatile DiscoveryNodes latestDiscoNodes;
Expand Down Expand Up @@ -364,18 +366,23 @@ private void innerJoinCluster() {
return;
}
retry = false;
DiscoveryNode masterNode = findMaster();
final DiscoveryNode masterNode = findMaster();
if (masterNode == null) {
logger.trace("no masterNode returned");
retry = true;
continue;
}
if (localNode.equals(masterNode)) {
this.master = true;
nodesFD.start(); // start the nodes FD
clusterService.submitStateUpdateTask("zen-disco-join (elected_as_master)", Priority.IMMEDIATE, new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (currentState.nodes().masterNode() != null) {
logger.debug("New cluster state has {} as master, but we were about to become master, rejoin");
return rejoin(currentState, "rejoin_due_to_master_switch_after_local_was_picked_as_master");
}
master = true;
nodesFD.start(); // start the nodes FD

// Take into account the previous known nodes, if they happen not to be available
// then fault detection will remove these nodes.
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder(latestDiscoNodes)
Expand Down Expand Up @@ -406,22 +413,49 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}
});
} else {
this.master = false;
// send join request
retry = !joinElectedMaster(masterNode);
if (retry) {
continue;
}

if (latestDiscoNodes.masterNode() == null) {
logger.debug("no master node is set, despite of join request completing. retrying pings");
retry = true;
continue;
}
clusterService.submitStateUpdateTask("join_master", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
if (!masterNode.equals(currentState.nodes().masterNode())) {
logger.debug("Master node has switched on us, rejoining...");
return rejoin(currentState, "rejoin_due_to_master_switch");
}
// the joinElectedMaster should create a full circle and publish a state that includes "us"
// in it from the master node, whereby handleNewState will place the latest disco nodes
// with the new master node in it
// TODO in theory, there is no need to even start the masterFD, since it will be started in handleNewState
if (latestDiscoNodes.masterNode() == null) {
logger.debug("no master node is set, despite of join request completing, rejoining...");
return rejoin(currentState, "rejoin_because_no_master_node_set");
}

masterFD.start(masterNode, "initial_join");
long count = clusterJoinsCounter.incrementAndGet();
logger.trace("cluster joins counter set to [{}] (joined master)", count);
masterFD.start(masterNode, "initial_join");
long count = clusterJoinsCounter.incrementAndGet();
logger.trace("cluster joins counter set to [{}] (joined master)", count);
return currentState;
}

@Override
public void onFailure(String source, final Throwable t1) {
clusterService.submitStateUpdateTask("rejoin_on_join_master", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
logger.debug("Rejoining rejoin failed", t1);
return rejoin(currentState, "rejoin_on_join_master_failure");
}

@Override
public void onFailure(String source, Throwable t2) {
logger.error("Couldn't rejoin after original rejoin failed. Original error {}", t2, t1);
}
});
}
});
}
}
}
Expand Down Expand Up @@ -1022,6 +1056,8 @@ private DiscoveryNode findMaster() {
}

private ClusterState rejoin(ClusterState clusterState, String reason) {
// This method should only be invoked on an update task thread
assert Thread.currentThread().getName().contains(InternalClusterService.UPDATE_THREAD_NAME);
logger.warn(reason + ", current nodes: {}", clusterState.nodes());
nodesFD.stop();
masterFD.stop(reason);
Expand Down

0 comments on commit c38e418

Please sign in to comment.