Discovery: concurrent node failures can cause unneeded cluster state publishing

When a node fails (or closes), the master processes the network disconnect event and removes the node from the cluster state. If multiple nodes fail (or shut down) in rapid succession, the master processes the events and removes the nodes one by one. During this process, the intermediate cluster states may cause node fault detection to signal the failure of nodes that have not yet been removed from the cluster state. While this is harmless in terms of correctness, it currently triggers unneeded reroutes and cluster state publications, which can be expensive in big clusters.
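
Concretely, the fix is an early exit in the cluster-state update task that handles a node failure: if the node is already absent from the current cluster state, the task returns that state unchanged, so no new state is published and no reroute is triggered. The following sketch is assembled from the first hunk below; the surrounding submitStateUpdateTask call and the rest of the method are elided, just as they are in the diff:

    @Override
    public ClusterState execute(ClusterState currentState) {
        if (currentState.nodes().get(node.id()) == null) {
            // a previous failure event already removed this node; returning the
            // state unchanged means no publish and no reroute happens
            logger.debug("node [{}] already removed from cluster state. ignoring.", node);
            return currentState;
        }
        // first failure event for this node: remove it and continue as before
        DiscoveryNodes.Builder builder = DiscoveryNodes.builder(currentState.nodes())
                .remove(node.id());
        currentState = ClusterState.builder(currentState).nodes(builder).build();
        // ... reroute and return of the updated state continue unchanged below
    }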

Closes #8804
Closes #8933
bleskes committed Dec 15, 2014
1 parent ebac087 commit 6f8d3a2
Showing 2 changed files with 58 additions and 1 deletion.
@@ -544,6 +544,10 @@ private void handleNodeFailure(final DiscoveryNode node, String reason) {
clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + "), reason " + reason, Priority.IMMEDIATE, new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (currentState.nodes().get(node.id()) == null) {
logger.debug("node [{}] already removed from cluster state. ignoring.", node);
return currentState;
}
DiscoveryNodes.Builder builder = DiscoveryNodes.builder(currentState.nodes())
.remove(node.id());
currentState = ClusterState.builder(currentState).nodes(builder).build();
@@ -21,20 +21,30 @@

import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.fd.FaultDetection;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.hamcrest.Matchers;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

import static org.hamcrest.Matchers.*;

/**
*/
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
- public class ZenDiscoveryRejoinOnMaster extends ElasticsearchIntegrationTest {
+ public class ZenDiscoveryTests extends ElasticsearchIntegrationTest {

@Test
public void testChangeRejoinOnMasterOptionIsDynamic() throws Exception {
@@ -99,4 +109,47 @@ public void run() {
assertThat(numRecoveriesAfterNewMaster, equalTo(numRecoveriesBeforeNewMaster));
}

@Test
public void testNodeFailuresAreProcessedOnce() throws ExecutionException, InterruptedException, IOException {
Settings defaultSettings = ImmutableSettings.builder()
.put(FaultDetection.SETTING_PING_TIMEOUT, "1s")
.put(FaultDetection.SETTING_PING_RETRIES, "1")
.put("discovery.type", "zen")
.build();

Settings masterNodeSettings = ImmutableSettings.builder()
.put("node.data", false)
.put(defaultSettings)
.build();
String master = internalCluster().startNode(masterNodeSettings);
Settings dataNodeSettings = ImmutableSettings.builder()
.put("node.master", false)
.put(defaultSettings)
.build();
internalCluster().startNodesAsync(2, dataNodeSettings).get();
client().admin().cluster().prepareHealth().setWaitForNodes("3").get();

ClusterService clusterService = internalCluster().getInstance(ClusterService.class, master);
final ArrayList<ClusterState> statesFound = new ArrayList<>();
final CountDownLatch nodesStopped = new CountDownLatch(1);
clusterService.add(new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
statesFound.add(event.state());
try {
// block until both nodes have stopped to accumulate node failures
nodesStopped.await();
} catch (InterruptedException e) {
// ignore interruption; the assertions below will catch any real problem
}
}
});

internalCluster().stopRandomNonMasterNode();
internalCluster().stopRandomNonMasterNode();
nodesStopped.countDown();

client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); // wait for all to be processed
assertThat(statesFound, Matchers.hasSize(2));
}
}
