Skip to content

Commit

Permalink
Wait till node is part of cluster state for join process
Browse files Browse the repository at this point in the history
When a node sends a join request to the master, only send back the response after it has been added to the master cluster state and published.
This will fix the rare cases where today, a join request can return, and the master, since its under load, have not yet added the node to its cluster state, and the node that joined will start a fault detect against the master, failing since its not part of the cluster state.
Since now the join request is longer, also increase the join request timeout default.
closes #6480
  • Loading branch information
kimchy committed Jun 12, 2014
1 parent 3a3f81d commit 2330421
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 20 deletions.
5 changes: 3 additions & 2 deletions docs/reference/modules/discovery/zen.asciidoc
Expand Up @@ -73,8 +73,9 @@ elected or joined to. This is done automatically. The
`discovery.zen.ping_timeout` (which defaults to `3s`) allows to
configure the election to handle cases of slow or congested networks
(higher values assure less chance of failure). Once a node joins, it
will send a join request to the master (`discovery.zen.join_timeout`)
with a timeout defaulting at 10 times the ping timeout.
will send a join request to the master (`discovery.zen.join_timeout`)
with a timeout defaulting at 20 times the ping timeout.
coming[1.3.0,Previously defaulted to 10 times the ping timeout].

Nodes can be excluded from becoming a master by setting `node.master` to
`false`. Note, once a node is a client node (`node.client` set to
Expand Down
24 changes: 14 additions & 10 deletions src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
Expand Up @@ -135,7 +135,7 @@ public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threa

// also support direct discovery.zen settings, for cases when it gets extended
this.pingTimeout = settings.getAsTime("discovery.zen.ping.timeout", settings.getAsTime("discovery.zen.ping_timeout", componentSettings.getAsTime("ping_timeout", componentSettings.getAsTime("initial_ping_timeout", timeValueSeconds(3)))));
this.joinTimeout = settings.getAsTime("discovery.zen.join_timeout", TimeValue.timeValueMillis(pingTimeout.millis() * 10));
this.joinTimeout = settings.getAsTime("discovery.zen.join_timeout", TimeValue.timeValueMillis(pingTimeout.millis() * 20));
this.sendLeaveRequest = componentSettings.getAsBoolean("send_leave_request", true);

this.masterElectionFilterClientNodes = settings.getAsBoolean("discovery.zen.master_election.filter_client", true);
Expand Down Expand Up @@ -592,7 +592,7 @@ public void onFailure(String source, Throwable t) {
newStateProcessed.onNewClusterStateFailed(new ElasticsearchIllegalStateException("received state from a node that is not part of the cluster"));
} else {
if (currentJoinThread != null) {
logger.debug("got a new state from master node, though we are already trying to rejoin the cluster");
logger.trace("got a new state from master node while joining the cluster, this is a valid state during the last phase of the join process");
}

final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState, newStateProcessed);
Expand Down Expand Up @@ -699,30 +699,29 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}
}

private ClusterState handleJoinRequest(final DiscoveryNode node) {
private void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) {
if (!master) {
throw new ElasticsearchIllegalStateException("Node [" + localNode + "] not master for join request from [" + node + "]");
}

ClusterState state = clusterService.state();
if (!transportService.addressSupported(node.address().getClass())) {
// TODO, what should we do now? Maybe inform that node that its crap?
logger.warn("received a wrong address type from [{}], ignoring...", node);
} else {
// try and connect to the node, if it fails, we can raise an exception back to the client...
transportService.connectToNode(node);
state = clusterService.state();
ClusterState state = clusterService.state();

// validate the join request, will throw a failure if it fails, which will get back to the
// node calling the join request
membership.sendValidateJoinRequestBlocking(node, state, joinTimeout);

clusterService.submitStateUpdateTask("zen-disco-receive(join from node[" + node + "])", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-receive(join from node[" + node + "])", Priority.IMMEDIATE, new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (currentState.nodes().nodeExists(node.id())) {
// the node already exists in the cluster
logger.warn("received a join request for an existing node [{}]", node);
logger.info("received a join request for an existing node [{}]", node);
// still send a new cluster state, so it will be re published and possibly update the other node
return ClusterState.builder(currentState).build();
}
Expand All @@ -741,10 +740,15 @@ public ClusterState execute(ClusterState currentState) {
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
callback.onFailure(t);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
callback.onSuccess(newState);
}
});
}
return state;
}

private DiscoveryNode findMaster() {
Expand Down Expand Up @@ -869,8 +873,8 @@ public void onNewClusterState(ClusterState clusterState, NewStateProcessed newSt

private class MembershipListener implements MembershipAction.MembershipListener {
@Override
public ClusterState onJoin(DiscoveryNode node) {
return handleJoinRequest(node);
public void onJoin(DiscoveryNode node, MembershipAction.JoinCallback callback) {
handleJoinRequest(node, callback);
}

@Override
Expand Down
Expand Up @@ -39,8 +39,14 @@
*/
public class MembershipAction extends AbstractComponent {

public static interface JoinCallback {
void onSuccess(ClusterState state);

void onFailure(Throwable t);
}

public static interface MembershipListener {
ClusterState onJoin(DiscoveryNode node);
void onJoin(DiscoveryNode node, JoinCallback callback);

void onLeave(DiscoveryNode node);
}
Expand Down Expand Up @@ -160,13 +166,30 @@ public JoinRequest newInstance() {
}

@Override
public void messageReceived(JoinRequest request, TransportChannel channel) throws Exception {
ClusterState clusterState = listener.onJoin(request.node);
if (request.withClusterState) {
channel.sendResponse(new JoinResponse(clusterState));
} else {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
public void messageReceived(final JoinRequest request, final TransportChannel channel) throws Exception {
listener.onJoin(request.node, new JoinCallback() {
@Override
public void onSuccess(ClusterState state) {
try {
if (request.withClusterState) {
channel.sendResponse(new JoinResponse(state));
} else {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
} catch (Throwable t) {
onFailure(t);
}
}

@Override
public void onFailure(Throwable t) {
try {
channel.sendResponse(t);
} catch (Throwable e) {
logger.warn("failed to send back failure on join request", e);
}
}
});
}

@Override
Expand Down

0 comments on commit 2330421

Please sign in to comment.