Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait till node is part of cluster state for join process #6480

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/reference/modules/discovery/zen.asciidoc
Expand Up @@ -73,8 +73,9 @@ elected or joined to. This is done automatically. The
`discovery.zen.ping_timeout` (which defaults to `3s`) allows to
configure the election to handle cases of slow or congested networks
(higher values assure less chance of failure). Once a node joins, it
will send a join request to the master (`discovery.zen.join_timeout`)
with a timeout defaulting at 10 times the ping timeout.
will send a join request to the master (`discovery.zen.join_timeout`)
with a timeout defaulting at 20 times the ping timeout.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this have some note when this was changed?

coming[1.3.0,Previously defaulted to 10 times the ping timeout].

Nodes can be excluded from becoming a master by setting `node.master` to
`false`. Note, once a node is a client node (`node.client` set to
Expand Down
24 changes: 14 additions & 10 deletions src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
Expand Up @@ -135,7 +135,7 @@ public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threa

// also support direct discovery.zen settings, for cases when it gets extended
this.pingTimeout = settings.getAsTime("discovery.zen.ping.timeout", settings.getAsTime("discovery.zen.ping_timeout", componentSettings.getAsTime("ping_timeout", componentSettings.getAsTime("initial_ping_timeout", timeValueSeconds(3)))));
this.joinTimeout = settings.getAsTime("discovery.zen.join_timeout", TimeValue.timeValueMillis(pingTimeout.millis() * 10));
this.joinTimeout = settings.getAsTime("discovery.zen.join_timeout", TimeValue.timeValueMillis(pingTimeout.millis() * 20));
this.sendLeaveRequest = componentSettings.getAsBoolean("send_leave_request", true);

this.masterElectionFilterClientNodes = settings.getAsBoolean("discovery.zen.master_election.filter_client", true);
Expand Down Expand Up @@ -592,7 +592,7 @@ public void onFailure(String source, Throwable t) {
newStateProcessed.onNewClusterStateFailed(new ElasticsearchIllegalStateException("received state from a node that is not part of the cluster"));
} else {
if (currentJoinThread != null) {
logger.debug("got a new state from master node, though we are already trying to rejoin the cluster");
logger.trace("got a new state from master node while joining the cluster, this is a valid state during the last phase of the join process");
}

final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState, newStateProcessed);
Expand Down Expand Up @@ -699,30 +699,29 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}
}

private ClusterState handleJoinRequest(final DiscoveryNode node) {
private void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) {
if (!master) {
throw new ElasticsearchIllegalStateException("Node [" + localNode + "] not master for join request from [" + node + "]");
}

ClusterState state = clusterService.state();
if (!transportService.addressSupported(node.address().getClass())) {
// TODO, what should we do now? Maybe inform that node that its crap?
logger.warn("received a wrong address type from [{}], ignoring...", node);
} else {
// try and connect to the node, if it fails, we can raise an exception back to the client...
transportService.connectToNode(node);
state = clusterService.state();
ClusterState state = clusterService.state();

// validate the join request, will throw a failure if it fails, which will get back to the
// node calling the join request
membership.sendValidateJoinRequestBlocking(node, state, joinTimeout);

clusterService.submitStateUpdateTask("zen-disco-receive(join from node[" + node + "])", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-receive(join from node[" + node + "])", Priority.IMMEDIATE, new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (currentState.nodes().nodeExists(node.id())) {
// the node already exists in the cluster
logger.warn("received a join request for an existing node [{}]", node);
logger.info("received a join request for an existing node [{}]", node);
// still send a new cluster state, so it will be re published and possibly update the other node
return ClusterState.builder(currentState).build();
}
Expand All @@ -741,10 +740,15 @@ public ClusterState execute(ClusterState currentState) {
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
callback.onFailure(t);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
callback.onSuccess(newState);
}
});
}
return state;
}

private DiscoveryNode findMaster() {
Expand Down Expand Up @@ -869,8 +873,8 @@ public void onNewClusterState(ClusterState clusterState, NewStateProcessed newSt

private class MembershipListener implements MembershipAction.MembershipListener {
@Override
public ClusterState onJoin(DiscoveryNode node) {
return handleJoinRequest(node);
public void onJoin(DiscoveryNode node, MembershipAction.JoinCallback callback) {
handleJoinRequest(node, callback);
}

@Override
Expand Down
Expand Up @@ -39,8 +39,14 @@
*/
public class MembershipAction extends AbstractComponent {

public static interface JoinCallback {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at some point we need to find all classes that have this pattern and make them a generic class Callback<Type>

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aye, this would be nice

void onSuccess(ClusterState state);

void onFailure(Throwable t);
}

public static interface MembershipListener {
ClusterState onJoin(DiscoveryNode node);
void onJoin(DiscoveryNode node, JoinCallback callback);

void onLeave(DiscoveryNode node);
}
Expand Down Expand Up @@ -160,13 +166,30 @@ public JoinRequest newInstance() {
}

@Override
public void messageReceived(JoinRequest request, TransportChannel channel) throws Exception {
ClusterState clusterState = listener.onJoin(request.node);
if (request.withClusterState) {
channel.sendResponse(new JoinResponse(clusterState));
} else {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
public void messageReceived(final JoinRequest request, final TransportChannel channel) throws Exception {
listener.onJoin(request.node, new JoinCallback() {
@Override
public void onSuccess(ClusterState state) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this error handling should happen upstream no? I mean we should call onFailure if onSuccess fails? I mean in theory we should have an abstract baseclass that handles that :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, I didn't want to complicate the code to be honest to do so, would be nice to have a Callback base class that does that as you mentioned before, but I suggest in a different change

try {
if (request.withClusterState) {
channel.sendResponse(new JoinResponse(state));
} else {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
} catch (Throwable t) {
onFailure(t);
}
}

@Override
public void onFailure(Throwable t) {
try {
channel.sendResponse(t);
} catch (Throwable e) {
logger.warn("failed to send back failure on join request", e);
}
}
});
}

@Override
Expand Down