Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update ZenDiscovery fields via the cluster service update task. #7790

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -61,6 +61,8 @@
*/
public class InternalClusterService extends AbstractLifecycleComponent<ClusterService> implements ClusterService {

public final static String UPDATE_THREAD_NAME = "clusterService#updateTask";

private final ThreadPool threadPool;

private final DiscoveryService discoveryService;
Expand Down Expand Up @@ -131,7 +133,7 @@ public void removeInitialStateBlock(ClusterBlock block) throws ElasticsearchIlle
protected void doStart() throws ElasticsearchException {
add(localNodeMasterListeners);
this.clusterState = ClusterState.builder(clusterState).blocks(initialBlocks).build();
this.updateTasksExecutor = EsExecutors.newSinglePrioritizing(daemonThreadFactory(settings, "clusterService#updateTask"));
this.updateTasksExecutor = EsExecutors.newSinglePrioritizing(daemonThreadFactory(settings, UPDATE_THREAD_NAME));
this.reconnectToNodes = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, new ReconnectToNodes());
discoveryService.addLifecycleListener(new LifecycleListener() {
@Override
Expand Down
62 changes: 49 additions & 13 deletions src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.InternalClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
Expand Down Expand Up @@ -131,6 +132,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen

private final CopyOnWriteArrayList<InitialStateDiscoveryListener> initialStateListeners = new CopyOnWriteArrayList<>();

// TODO: Remove this field in favour of using latestDiscoNodes#masterNode().equals(localNode) or something similar
private volatile boolean master = false;

private volatile DiscoveryNodes latestDiscoNodes;
Expand Down Expand Up @@ -364,18 +366,23 @@ private void innerJoinCluster() {
return;
}
retry = false;
DiscoveryNode masterNode = findMaster();
final DiscoveryNode masterNode = findMaster();
if (masterNode == null) {
logger.trace("no masterNode returned");
retry = true;
continue;
}
if (localNode.equals(masterNode)) {
this.master = true;
nodesFD.start(); // start the nodes FD
clusterService.submitStateUpdateTask("zen-disco-join (elected_as_master)", Priority.IMMEDIATE, new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (currentState.nodes().masterNode() != null) {
logger.debug("New cluster state has {} as master, but we were about to become master, rejoin");
return rejoin(currentState, "rejoin_due_to_master_switch_after_local_was_picked_as_master");
}
master = true;
nodesFD.start(); // start the nodes FD

// Take into account the previous known nodes, if they happen not to be available
// then fault detection will remove these nodes.
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder(latestDiscoNodes)
Expand Down Expand Up @@ -406,22 +413,49 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}
});
} else {
this.master = false;
// send join request
retry = !joinElectedMaster(masterNode);
if (retry) {
continue;
}

if (latestDiscoNodes.masterNode() == null) {
logger.debug("no master node is set, despite of join request completing. retrying pings");
retry = true;
continue;
}
clusterService.submitStateUpdateTask("join_master", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
// the joinElectedMaster method should create a full circle and publish a state that includes
// "us" in it from the master node, whereby handleNewClusterStateFromMaster will place the
// latest disco nodes with the new master node in it
if (!masterNode.equals(currentState.nodes().masterNode())) {
// The master has switched on us or a full rejoin cycle hasn't completed.
// We need to be sure that the master in the new cluster state is the same
// as the one we picked before joining it; if it isn't, we rejoin and retry
logger.debug("Master node has switched on us, rejoining...");
return rejoin(currentState, "rejoin_due_to_master_switch");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might fail because the currentJoinThread is not null. I wonder if we can keep this simple and have an atomic success boolean this task sets to true + a count down latch for the outer thread to wait on.

} else {
// TODO in theory, there is no need to even start the masterFD, since it will be started in
// handleNewClusterStateFromMaster
masterFD.start(masterNode, "initial_join");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on removing this - it's not the job of this thread

long count = clusterJoinsCounter.incrementAndGet();
logger.trace("cluster joins counter set to [{}] (joined master)", count);
return currentState;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the above check is effectively the same as the one below: the currentState will by definition be updated to reflect that the joining node received the published state with itself "in it" and the master node set. I think we can remove the check below, keep the comment above, and note that the master might have switched on us or that we haven't completed a full circle, in which case we will retry.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the pr, the check has been removed and changed the comment

}

@Override
public void onFailure(String source, final Throwable t1) {
clusterService.submitStateUpdateTask("rejoin_on_join_master", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
logger.debug("Rejoining rejoin failed", t1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AtomicBoolean for success will make this simpler too.

return rejoin(currentState, "rejoin_on_join_master_failure");
}

masterFD.start(masterNode, "initial_join");
long count = clusterJoinsCounter.incrementAndGet();
logger.trace("cluster joins counter set to [{}] (joined master)", count);
@Override
public void onFailure(String source, Throwable t2) {
logger.error("Couldn't rejoin after original rejoin failed. Original error {}", t2, t1);
}
});
}
});
}
}
}
Expand Down Expand Up @@ -1022,6 +1056,8 @@ private DiscoveryNode findMaster() {
}

private ClusterState rejoin(ClusterState clusterState, String reason) {
// This method should only be invoked on an update task thread
assert Thread.currentThread().getName().contains(InternalClusterService.UPDATE_THREAD_NAME);
logger.warn(reason + ", current nodes: {}", clusterState.nodes());
nodesFD.stop();
masterFD.stop(reason);
Expand Down