From a79cc84a6ee044af7c7c67fdbe96b5474027a201 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 11 Jun 2014 15:54:47 +0200 Subject: [PATCH] [Discovery] do not use versions to optimize cluster state copying for a first update from a new master We have an optimization which compares routing/meta data version of cluster states and tries to reuse the current object if the versions are equal. This can cause rare failures during recovery from a minimum_master_node breach when using the "new light rejoin" mechanism and simulated network disconnects. This happens where the current master updates it's state, doesn't manage to broadcast it to other nodes due to the disconnect and then steps down. The new master will start with a previous version and continue to update it. When the old master rejoins, the versions of it's state can equal but the content is different. Also improved DiscoveryWithNetworkFailuresTests to simulate this failure (and other improvements) Closes #6466 --- .../service/InternalClusterService.java | 14 -- .../discovery/DiscoverySettings.java | 1 + .../discovery/local/LocalDiscovery.java | 13 +- .../discovery/zen/ZenDiscovery.java | 18 +- .../DiscoveryWithNetworkFailuresTests.java | 237 ++++++++++++------ .../test/ElasticsearchIntegrationTest.java | 7 + 6 files changed, 191 insertions(+), 99 deletions(-) diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index dbe0b4c7ad085..ff6f392425340 100644 --- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -386,20 +386,6 @@ public void run() { } } } - } else { - if (previousClusterState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock()) && !newClusterState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock())) { - // force an update, its a fresh update from the master as we transition from a start of not having a master to having one - // have a fresh instances of routing and metadata to remove the chance that version might be the same - Builder builder = ClusterState.builder(newClusterState); - builder.routingTable(RoutingTable.builder(newClusterState.routingTable())); - builder.metaData(MetaData.builder(newClusterState.metaData())); - newClusterState = builder.build(); - logger.debug("got first state from fresh master [{}]", newClusterState.nodes().masterNodeId()); - } else if (newClusterState.version() < previousClusterState.version()) { - // we got a cluster state with older version, when we are *not* the master, let it in since it might be valid - // we check on version where applicable, like at ZenDiscovery#handleNewClusterStateFromMaster - logger.debug("got smaller cluster state when not master [" + newClusterState.version() + "<" + previousClusterState.version() + "] from source [" + source + "]"); - } } newClusterState.status(ClusterState.ClusterStateStatus.BEING_APPLIED); diff --git a/src/main/java/org/elasticsearch/discovery/DiscoverySettings.java b/src/main/java/org/elasticsearch/discovery/DiscoverySettings.java index c18bd4984671c..8304893f0ba0a 100644 --- a/src/main/java/org/elasticsearch/discovery/DiscoverySettings.java +++ b/src/main/java/org/elasticsearch/discovery/DiscoverySettings.java @@ -54,6 +54,7 @@ public DiscoverySettings(Settings settings, NodeSettingsService nodeSettingsServ super(settings); nodeSettingsService.addListener(new ApplySettings()); this.noMasterBlock = parseNoMasterBlock(settings.get(NO_MASTER_BLOCK, DEFAULT_NO_MASTER_BLOCK)); + this.publishTimeout = settings.getAsTime(PUBLISH_TIMEOUT, publishTimeout); } /** diff --git a/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java b/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java index 065f3b6e45fed..af91c3608b17d 100644 --- a/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java @@ -58,6 +58,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem private final TransportService transportService; private final ClusterService clusterService; + private final DiscoveryService discoveryService; private final DiscoveryNodeService discoveryNodeService; private AllocationService allocationService; private final ClusterName clusterName; @@ -77,7 +78,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem @Inject public LocalDiscovery(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService, - DiscoveryNodeService discoveryNodeService, Version version, DiscoverySettings discoverySettings) { + DiscoveryNodeService discoveryNodeService, Version version, DiscoverySettings discoverySettings, DiscoveryService discoveryService) { super(settings); this.clusterName = clusterName; this.clusterService = clusterService; @@ -85,6 +86,7 @@ public LocalDiscovery(Settings settings, ClusterName clusterName, TransportServi this.discoveryNodeService = discoveryNodeService; this.version = version; this.discoverySettings = discoverySettings; + this.discoveryService = discoveryService; } @Override @@ -305,6 +307,9 @@ private void publish(LocalDiscovery[] members, ClusterState clusterState, final nodeSpecificClusterState.status(ClusterState.ClusterStateStatus.RECEIVED); // ignore cluster state messages that do not include "me", not in the game yet... if (nodeSpecificClusterState.nodes().localNode() != null) { + assert nodeSpecificClusterState.nodes().masterNode() != null : "received a cluster state without a master"; + assert !nodeSpecificClusterState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock()) : "received a cluster state with a master block"; + discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { @@ -312,6 +317,12 @@ public ClusterState execute(ClusterState currentState) { return currentState; } + if (currentState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock())) { + // its a fresh update from the master as we transition from a start of not having a master to having one + logger.debug("got first state from fresh master [{}]", nodeSpecificClusterState.nodes().masterNodeId()); + return nodeSpecificClusterState; + } + ClusterState.Builder builder = ClusterState.builder(nodeSpecificClusterState); // if the routing table did not change, use the original one if (nodeSpecificClusterState.routingTable().version() == currentState.routingTable().version()) { diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index f6157c460e74f..da3a9ba8d09a4 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -83,6 +83,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen private final ClusterService clusterService; private AllocationService allocationService; private final ClusterName clusterName; + private final DiscoveryService discoveryService; private final DiscoveryNodeService discoveryNodeService; private final DiscoverySettings discoverySettings; private final ZenPingService pingService; @@ -126,12 +127,14 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen @Inject public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, NodeSettingsService nodeSettingsService, - DiscoveryNodeService discoveryNodeService, ZenPingService pingService, Version version, DiscoverySettings discoverySettings) { + DiscoveryNodeService discoveryNodeService, ZenPingService pingService, Version version, DiscoverySettings discoverySettings, + DiscoveryService discoveryService) { super(settings); this.clusterName = clusterName; this.threadPool = threadPool; this.clusterService = clusterService; this.transportService = transportService; + this.discoveryService = discoveryService; this.discoveryNodeService = discoveryNodeService; this.discoverySettings = discoverySettings; this.pingService = pingService; @@ -639,6 +642,10 @@ public void onFailure(String source, Throwable t) { final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState, newStateProcessed); processNewClusterStates.add(processClusterState); + + assert newClusterState.nodes().masterNode() != null : "received a cluster state without a master"; + assert !newClusterState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock()) : "received a cluster state with a master block"; + clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newClusterState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { @@ -699,7 +706,16 @@ public ClusterState execute(ClusterState currentState) { masterFD.restart(latestDiscoNodes.masterNode(), "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]"); } + if (currentState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock())) { + // its a fresh update from the master as we transition from a start of not having a master to having one + logger.debug("got first state from fresh master [{}]", updatedState.nodes().masterNodeId()); + return updatedState; + } + + + // some optimizations to make sure we keep old objects where possible ClusterState.Builder builder = ClusterState.builder(updatedState); + // if the routing table did not change, use the original one if (updatedState.routingTable().version() == currentState.routingTable().version()) { builder.routingTable(currentState.routingTable()); diff --git a/src/test/java/org/elasticsearch/discovery/DiscoveryWithNetworkFailuresTests.java b/src/test/java/org/elasticsearch/discovery/DiscoveryWithNetworkFailuresTests.java index 1d6a346dbd9fa..905b45a65952a 100644 --- a/src/test/java/org/elasticsearch/discovery/DiscoveryWithNetworkFailuresTests.java +++ b/src/test/java/org/elasticsearch/discovery/DiscoveryWithNetworkFailuresTests.java @@ -35,10 +35,10 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.sort.SortOrder; @@ -47,9 +47,10 @@ import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportModule; import org.elasticsearch.transport.TransportService; -import org.junit.Ignore; import org.junit.Test; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; @@ -61,13 +62,14 @@ /** */ -@ClusterScope(scope= Scope.TEST, numDataNodes =0) +@ClusterScope(scope = Scope.TEST, numDataNodes = 0) public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationTest { private static final Settings nodeSettings = ImmutableSettings.settingsBuilder() .put("discovery.type", "zen") // <-- To override the local setting if set externally .put("discovery.zen.fd.ping_timeout", "1s") // <-- for hitting simulated network failures quickly .put("discovery.zen.fd.ping_retries", "1") // <-- for hitting simulated network failures quickly + .put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly .put("discovery.zen.minimum_master_nodes", 2) .put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName()) .build(); @@ -97,12 +99,8 @@ public void failWithMinimumMasterNodesConfigured() throws Exception { List nodes = internalCluster().startNodesAsync(3, nodeSettings).get(); - // Wait until a green status has been reaches and 3 nodes are part of the cluster - ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth() - .setWaitForEvents(Priority.LANGUID) - .setWaitForNodes("3") - .get(); - assertThat(clusterHealthResponse.isTimedOut(), is(false)); + // Wait until 3 nodes are part of the cluster + ensureStableCluster(3); // Figure out what is the elected master node DiscoveryNode masterDiscoNode = findMasterNode(nodes); @@ -155,11 +153,7 @@ public boolean apply(Object input) { } // Wait until the master node sees all 3 nodes again. - clusterHealthResponse = masterClient.admin().cluster().prepareHealth() - .setWaitForEvents(Priority.LANGUID) - .setWaitForNodes("3") - .get(); - assertThat(clusterHealthResponse.isTimedOut(), is(false)); + ensureStableCluster(3); for (String node : nodes) { ClusterState state = internalCluster().client(node).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); @@ -171,17 +165,12 @@ public boolean apply(Object input) { } @Test - @Ignore @TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE") public void testDataConsistency() throws Exception { List nodes = internalCluster().startNodesAsync(3, nodeSettings).get(); - // Wait until a green status has been reaches and 3 nodes are part of the cluster - ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth() - .setWaitForEvents(Priority.LANGUID) - .setWaitForNodes("3") - .get(); - assertThat(clusterHealthResponse.isTimedOut(), is(false)); + // Wait until a 3 nodes are part of the cluster + ensureStableCluster(3); assertAcked(prepareCreate("test") .addMapping("type", "field", "type=long") @@ -216,35 +205,29 @@ public void testDataConsistency() throws Exception { ensureGreen("test"); // Pick a node that isn't the elected master. - String isolatedNode = nodes.get(0); - String nonIsolatedNode = nodes.get(1); - final Client nonIsolatedNodeClient = internalCluster().client(nonIsolatedNode); + final String isolatedNode = nodes.get(0); + final String nonIsolatedNode = nodes.get(1); // Simulate a network issue between the unlucky node and the rest of the cluster. - for (String nodeId : nodes) { - if (!nodeId.equals(isolatedNode)) { - addFailToSendNoConnectRule(nodeId, isolatedNode); - addFailToSendNoConnectRule(isolatedNode, nodeId); - } - } + randomIsolateNode(isolatedNode, nodes); try { - // Wait until elected master has removed that the unlucky node... + logger.info("wait until elected master has removed [{}]", isolatedNode); boolean applied = awaitBusy(new Predicate() { @Override public boolean apply(Object input) { - return nonIsolatedNodeClient.admin().cluster().prepareState().setLocal(true).get().getState().nodes().size() == 2; + return client(nonIsolatedNode).admin().cluster().prepareState().setLocal(true).get().getState().nodes().size() == 2; } }, 1, TimeUnit.MINUTES); assertThat(applied, is(true)); // The unlucky node must report *no* master node, since it can't connect to master and in fact it should // continuously ping until network failures have been resolved. However - final Client isolatedNodeClient = internalCluster().client(isolatedNode); // It may a take a bit before the node detects it has been cut off from the elected master + logger.info("waiting for isolated node [{}] to have no master", isolatedNode); applied = awaitBusy(new Predicate() { @Override public boolean apply(Object input) { - ClusterState localClusterState = isolatedNodeClient.admin().cluster().prepareState().setLocal(true).get().getState(); + ClusterState localClusterState = client(isolatedNode).admin().cluster().prepareState().setLocal(true).get().getState(); DiscoveryNodes localDiscoveryNodes = localClusterState.nodes(); logger.info("localDiscoveryNodes=" + localDiscoveryNodes.prettyPrint()); return localDiscoveryNodes.masterNode() == null; @@ -252,13 +235,14 @@ public boolean apply(Object input) { }, 10, TimeUnit.SECONDS); assertThat(applied, is(true)); - ClusterHealthResponse healthResponse = nonIsolatedNodeClient.admin().cluster().prepareHealth("test") + ClusterHealthResponse healthResponse = client(nonIsolatedNode).admin().cluster().prepareHealth("test") .setWaitForYellowStatus().get(); assertThat(healthResponse.isTimedOut(), is(false)); assertThat(healthResponse.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); // Reads on the right side of the split must work - searchResponse = nonIsolatedNodeClient.prepareSearch("test").setTypes("type") + logger.info("verifying healthy part of cluster returns data"); + searchResponse = client(nonIsolatedNode).prepareSearch("test").setTypes("type") .addSort("field", SortOrder.ASC) .get(); assertHitCount(searchResponse, indexRequests.length); @@ -269,20 +253,21 @@ public boolean apply(Object input) { } // Reads on the wrong side of the split are partial - searchResponse = isolatedNodeClient.prepareSearch("test").setTypes("type") - .addSort("field", SortOrder.ASC) + logger.info("verifying isolated node [{}] returns partial data", isolatedNode); + searchResponse = client(isolatedNode).prepareSearch("test").setTypes("type") + .addSort("field", SortOrder.ASC).setPreference("_only_local") .get(); assertThat(searchResponse.getSuccessfulShards(), lessThan(searchResponse.getTotalShards())); assertThat(searchResponse.getHits().totalHits(), lessThan((long) indexRequests.length)); - // Writes on the right side of the split must work - UpdateResponse updateResponse = nonIsolatedNodeClient.prepareUpdate("test", "type", "0").setDoc("field2", 2).get(); + logger.info("verifying writes on healthy cluster"); + UpdateResponse updateResponse = client(nonIsolatedNode).prepareUpdate("test", "type", "0").setDoc("field2", 2).get(); assertThat(updateResponse.getVersion(), equalTo(2l)); - // Writes on the wrong side of the split fail try { - isolatedNodeClient.prepareUpdate("test", "type", "0").setDoc("field2", 2) - .setTimeout(TimeValue.timeValueSeconds(5)) // Fail quick, otherwise we wait 60 seconds. + logger.info("verifying writes on isolated [{}] fail", isolatedNode); + client(isolatedNode).prepareUpdate("test", "type", "0").setDoc("field2", 2) + .setTimeout("1s") // Fail quick, otherwise we wait 60 seconds. .get(); fail(); } catch (ClusterBlockException exception) { @@ -294,23 +279,13 @@ public boolean apply(Object input) { } finally { // stop simulating network failures, from this point on the unlucky node is able to rejoin // We also need to do this even if assertions fail, since otherwise the test framework can't work properly - for (String nodeId : nodes) { - if (!nodeId.equals(isolatedNode)) { - clearNoConnectRule(nodeId, isolatedNode); - clearNoConnectRule(isolatedNode, nodeId); - } - } + restoreIsolation(isolatedNode, nodes); } // Wait until the master node sees all 3 nodes again. - clusterHealthResponse = nonIsolatedNodeClient.admin().cluster().prepareHealth() - .setWaitForGreenStatus() - .setWaitForEvents(Priority.LANGUID) - .setWaitForNodes("3") - .get(); - assertThat(clusterHealthResponse.getStatus(), equalTo(ClusterHealthStatus.GREEN)); - assertThat(clusterHealthResponse.isTimedOut(), is(false)); + ensureStableCluster(3); + logger.info("verifying all nodes return all data"); for (Client client : clients()) { searchResponse = client.prepareSearch("test").setTypes("type") .addSort("field", SortOrder.ASC) @@ -334,41 +309,96 @@ public boolean apply(Object input) { } } + @Test - @Ignore @TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE") - public void testRejoinDocumentExistsInAllShardCopies() throws Exception { + public void voidIsolateMasterAndVerifyClusterStateConsensus() throws Exception { final List nodes = internalCluster().startNodesAsync(3, nodeSettings).get(); - ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth() - .setWaitForEvents(Priority.LANGUID) - .setWaitForNodes("3") - .get(); - assertThat(clusterHealthResponse.isTimedOut(), is(false)); + ensureStableCluster(3); + assertAcked(prepareCreate("test") .setSettings(ImmutableSettings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2) - ) - .get()); - ensureGreen("test"); + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2)) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2)) + )); - String isolatedNode = findMasterNode(nodes).getName(); - String notIsolatedNode = null; + ensureGreen(); + String isolatedNode = findMasterNode(nodes).name(); + String nonIsolatedNode = null; for (String node : nodes) { if (!node.equals(isolatedNode)) { - notIsolatedNode = node; + nonIsolatedNode = node; break; } } + randomIsolateNode(isolatedNode, nodes); - logger.info("Isolating node[" + isolatedNode + "]"); - for (String nodeId : nodes) { - if (!nodeId.equals(isolatedNode)) { - addFailToSendNoConnectRule(nodeId, isolatedNode); - addFailToSendNoConnectRule(isolatedNode, nodeId); + // make sure cluster reforms + ensureStableCluster(2, nonIsolatedNode); + + // restore isolation + restoreIsolation(isolatedNode, nodes); + + ensureStableCluster(3); + + logger.info("issue a reroute"); + // trigger a reroute now, instead of waiting for the background reroute of RerouteService + assertAcked(client().admin().cluster().prepareReroute()); + // and wait for it to finish. + assertFalse(client().admin().cluster().prepareHealth().setWaitForRelocatingShards(0).get().isTimedOut()); + + + // verify all cluster states are the same + ClusterState state = null; + for (String node : nodes) { + ClusterState nodeState = client(node).admin().cluster().prepareState().setLocal(true).get().getState(); + if (state == null) { + state = nodeState; + continue; + } + // assert nodes are identical + try { + assertEquals("unequal versions", state.version(), nodeState.version()); + assertEquals("unequal node count", state.nodes().size(), nodeState.nodes().size()); + assertEquals("different masters ", state.nodes().masterNodeId(), nodeState.nodes().masterNodeId()); + assertEquals("different meta data version", state.metaData().version(), nodeState.metaData().version()); + if (!state.routingTable().prettyPrint().equals(nodeState.routingTable().prettyPrint())) { + fail("different routing"); + } + } catch (AssertionError t) { + fail("failed comparing cluster state: " + t.getMessage() + "\n" + + "--- cluster state of node [" + nodes.get(0) + "]: ---\n" + state.prettyPrint() + + "\n--- cluster state [" + node + "]: ---\n" + nodeState.prettyPrint()); } + } - ensureYellow("test"); + + } + + + @Test + @TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE") + public void testRejoinDocumentExistsInAllShardCopies() throws Exception { + List nodes = internalCluster().startNodesAsync(3, nodeSettings).get(); + ensureStableCluster(3); + + assertAcked(prepareCreate("test") + .setSettings(ImmutableSettings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2) + ) + .get()); + ensureGreen("test"); + + nodes = new ArrayList<>(nodes); + Collections.shuffle(nodes, getRandom()); + String isolatedNode = nodes.get(0); + String notIsolatedNode = nodes.get(1); + + randomIsolateNode(isolatedNode, nodes); + ensureStableCluster(2, notIsolatedNode); + assertFalse(client(notIsolatedNode).admin().cluster().prepareHealth("test").setWaitForYellowStatus().get().isTimedOut()); + IndexResponse indexResponse = internalCluster().client(notIsolatedNode).prepareIndex("test", "type").setSource("field", "value").get(); assertThat(indexResponse.getVersion(), equalTo(1l)); @@ -381,13 +411,9 @@ public void testRejoinDocumentExistsInAllShardCopies() throws Exception { assertThat(getResponse.getVersion(), equalTo(1l)); assertThat(getResponse.getId(), equalTo(indexResponse.getId())); - for (String nodeId : nodes) { - if (!nodeId.equals(isolatedNode)) { - clearNoConnectRule(nodeId, isolatedNode); - clearNoConnectRule(isolatedNode, nodeId); - } - } + restoreIsolation(isolatedNode, nodes); + ensureStableCluster(3); ensureGreen("test"); for (String node : nodes) { @@ -401,6 +427,32 @@ public void testRejoinDocumentExistsInAllShardCopies() throws Exception { } } + protected void restoreIsolation(String isolatedNode, List nodes) { + logger.info("restoring isolation of [{}]", isolatedNode); + for (String nodeId : nodes) { + if (!nodeId.equals(isolatedNode)) { + clearNoConnectRule(nodeId, isolatedNode); + clearNoConnectRule(isolatedNode, nodeId); + } + } + } + + protected void randomIsolateNode(String isolatedNode, List nodes) { + boolean unresponsive = randomBoolean(); + logger.info("isolating [{}] with unresponsive: [{}]", isolatedNode, unresponsive); + for (String nodeId : nodes) { + if (!nodeId.equals(isolatedNode)) { + if (unresponsive) { + addUnresponsiveRule(nodeId, isolatedNode); + addUnresponsiveRule(isolatedNode, nodeId); + } else { + addFailToSendNoConnectRule(nodeId, isolatedNode); + addFailToSendNoConnectRule(isolatedNode, nodeId); + } + } + } + } + private DiscoveryNode findMasterNode(List nodes) { DiscoveryNode masterDiscoNode = null; for (String node : nodes) { @@ -421,9 +473,28 @@ private void addFailToSendNoConnectRule(String fromNode, String toNode) { ((MockTransportService) mockTransportService).addFailToSendNoConnectRule(internalCluster().getInstance(Discovery.class, toNode).localNode()); } + private void addUnresponsiveRule(String fromNode, String toNode) { + TransportService mockTransportService = internalCluster().getInstance(TransportService.class, fromNode); + ((MockTransportService) mockTransportService).addUnresponsiveRule(internalCluster().getInstance(Discovery.class, toNode).localNode()); + } + private void clearNoConnectRule(String fromNode, String toNode) { TransportService mockTransportService = internalCluster().getInstance(TransportService.class, fromNode); ((MockTransportService) mockTransportService).clearRule(internalCluster().getInstance(Discovery.class, toNode).localNode()); } + + private void ensureStableCluster(int nodeCount) { + ensureStableCluster(nodeCount, null); + } + + private void ensureStableCluster(int nodeCount, @Nullable String viaNode) { + ClusterHealthResponse clusterHealthResponse = client(viaNode).admin().cluster().prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes(Integer.toString(nodeCount)) + .setWaitForRelocatingShards(0) + .get(); + assertThat(clusterHealthResponse.isTimedOut(), is(false)); + } + } diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index d68340013fd19..1d1cd8cdc76d0 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -591,6 +591,13 @@ public ClusterService clusterService() { } public static Client client() { + return client(null); + } + + public static Client client(@Nullable String node) { + if (node != null) { + return internalCluster().client(node); + } Client client = cluster().client(); if (frequently()) { client = new RandomizingClient(client, getRandom());