
[TEST] Added LongGCDisruption and a test simulating GC on master nodes
Also rename DiscoveryWithNetworkFailuresTests to DiscoveryWithServiceDisruptions, which better suits what we do.
bleskes authored and martijnvg committed Jul 29, 2014
1 parent ad72037 commit cd8cfaa
Showing 5 changed files with 293 additions and 15 deletions.
5 changes: 5 additions & 0 deletions pom.xml
@@ -1203,6 +1203,11 @@
<bundledSignature>jdk-unsafe</bundledSignature>
<bundledSignature>jdk-deprecated</bundledSignature>
</bundledSignatures>
<excludes>
<!-- start exclude for test GC simulation using Thread.suspend -->
<exclude>org/elasticsearch/test/disruption/LongGCDisruption.class</exclude>
<!-- end exclude for test GC simulation using Thread.suspend -->
</excludes>
<signaturesFiles>
<signaturesFile>test-signatures.txt</signaturesFile>
<signaturesFile>all-signatures.txt</signaturesFile>
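The excluded class, LongGCDisruption, is the new disruption added by this commit; its source sits in the fifth changed file, which isn't expanded on this page. The following is only a rough sketch of the idea (the class and member names are illustrative, and the real implementation plugs into the test framework's SingleNodeDisruption machinery used by the new test further down): a simulated long GC suspends every thread belonging to the disrupted node via the deprecated Thread.suspend, which is exactly why the forbidden-apis exclusion above is needed, and resumes them when the disruption stops.

import java.util.HashSet;
import java.util.Set;

// Sketch only: suspend all threads whose names mention the disrupted node
// (Elasticsearch thread names carry the node name in brackets), then resume
// them when the simulated GC ends. Thread.suspend/resume are deprecated,
// which is why the pom.xml exclusion above is required.
@SuppressWarnings("deprecation")
public class LongGCDisruptionSketch {

    private final String disruptedNode;
    private final Set<Thread> suspendedThreads = new HashSet<>();

    public LongGCDisruptionSketch(String disruptedNode) {
        this.disruptedNode = disruptedNode;
    }

    /** simulate the start of a long GC: freeze the node's threads */
    public synchronized void startDisrupting() {
        String marker = "[" + disruptedNode + "]";
        for (Thread thread : Thread.getAllStackTraces().keySet()) {
            if (thread.getName().contains(marker)) {
                thread.suspend();
                suspendedThreads.add(thread);
            }
        }
    }

    /** the "GC" is over: let the node's threads run again */
    public synchronized void stopDisrupting() {
        for (Thread thread : suspendedThreads) {
            thread.resume();
        }
        suspendedThreads.clear();
    }
}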
11 changes: 10 additions & 1 deletion src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
@@ -311,6 +311,15 @@ public void run() {
});
}


/**
* Returns true if there is currently a background thread active for (re)joining the cluster;
* used for testing.
*/
public boolean joiningCluster() {
return currentJoinThread != null;
}

private void innerJoinCluster() {
boolean retry = true;
while (retry) {
@@ -408,7 +417,7 @@ private boolean joinElectedMaster(DiscoveryNode masterNode) {
}
} else {
if (logger.isTraceEnabled()) {
logger.trace("failed to send join request to master [{}]", t);
logger.trace("failed to send join request to master [{}]", t, masterNode);
} else {
logger.info("failed to send join request to master [{}], reason [{}]", masterNode, ExceptionsHelper.detailedMessage(t));
}
@@ -115,7 +115,8 @@ public Settings settings(int nodeOrdinal) {
.put("discovery.zen.ping.multicast.enabled", false);

String[] unicastHosts = new String[unicastHostOrdinals.length];
if (InternalTestCluster.NODE_MODE.equals("local")) {
String mode = baseSettings.get("node.mode", InternalTestCluster.NODE_MODE);
if (mode.equals("local")) {
builder.put(LocalTransport.TRANSPORT_LOCAL_ADDRESS, "node_" + nodeOrdinal);
for (int i = 0; i < unicastHosts.length; i++) {
unicastHosts[i] = "node_" + unicastHostOrdinals[i];
@@ -19,9 +19,11 @@

package org.elasticsearch.discovery;

import com.google.common.base.Predicate;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
@@ -39,6 +41,7 @@
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.discovery.zen.ping.ZenPingService;
@@ -62,14 +65,14 @@
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.*;

/**
*/
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
@LuceneTestCase.Slow
@TestLogging("discovery.zen:TRACE")
public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationTest {
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTest {

private static final TimeValue DISRUPTION_HEALING_OVERHEAD = TimeValue.timeValueSeconds(40); // we use 30s as timeout in many places.

@@ -109,8 +112,9 @@ private List<String> startCluster(int numberOfNodes, int minimumMasterNode) thro
}

final static Settings DEFAULT_SETTINGS = ImmutableSettings.builder()
.put("discovery.zen.fd.ping_timeout", "1s") // <-- for hitting simulated network failures quickly
.put("discovery.zen.fd.ping_retries", "1") // <-- for hitting simulated network failures quickly
.put("discovery.zen.fd.ping_timeout", "1s") // for hitting simulated network failures quickly
.put("discovery.zen.fd.ping_retries", "1") // for hitting simulated network failures quickly
.put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out
.put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly
.put("http.enabled", false) // just to make test quicker
.put("gateway.local.list_timeout", "10s") // still long to induce failures but to long so test won't time out
@@ -136,21 +140,26 @@ private List<String> startMulticastCluster(int numberOfNodes, int minimumMasterN
return nodes;
}

private List<String> startUnicastCluster(int numberOfNodes,@Nullable int[] unicastHostsOrdinals, int minimumMasterNode) throws ExecutionException, InterruptedException {
private List<String> startUnicastCluster(int numberOfNodes, @Nullable int[] unicastHostsOrdinals, int minimumMasterNode) throws ExecutionException, InterruptedException {
return startUnicastCluster(numberOfNodes, unicastHostsOrdinals, minimumMasterNode, ImmutableSettings.EMPTY);
}

private List<String> startUnicastCluster(int numberOfNodes, @Nullable int[] unicastHostsOrdinals, int minimumMasterNode, Settings settings) throws ExecutionException, InterruptedException {
if (minimumMasterNode < 0) {
minimumMasterNode = numberOfNodes / 2 + 1;
}
// TODO: Rarely use default settings for some of these
Settings settings = ImmutableSettings.builder()
Settings nodeSettings = ImmutableSettings.builder()
.put(DEFAULT_SETTINGS)
.put(settings)
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES, minimumMasterNode)
.build();

if (discoveryConfig == null) {
if (unicastHostsOrdinals == null) {
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, settings);
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, nodeSettings);
} else {
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, settings, unicastHostsOrdinals);
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, nodeSettings, unicastHostsOrdinals);
}
}
List<String> nodes = internalCluster().startNodesAsync(numberOfNodes).get();
@@ -494,6 +503,58 @@ public void run() {
}
}

/**
* Test that the cluster recovers from a long GC on the master which causes other nodes to elect a new one
*/
@Test
@TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE")
public void testMasterNodeGCs() throws Exception {
// TODO: on Mac OS multicast threads are shared between nodes and therefore we can't simulate GC and stop pinging for just one node;
// find a way to block thread creation in the generic thread pool to avoid this.
// TODO: with local transport the threads of the source node enter the target node; since everything is local, as above we can't simulate GC on just one node
// with netty transport the threads of different nodes don't touch each other due to the network threading Netty uses
List<String> nodes = startUnicastCluster(3, null, -1, ImmutableSettings.builder().put("node.mode", "network").build());

String oldMasterNode = internalCluster().getMasterName();
// a very long GC, but it's OK as we remove the disruption when it has had an effect
SingleNodeDisruption masterNodeDisruption = new LongGCDisruption(oldMasterNode, getRandom(), 100, 200, 30000, 60000);
internalCluster().setDisruptionScheme(masterNodeDisruption);
masterNodeDisruption.startDisrupting();

Set<String> oldNonMasterNodesSet = new HashSet<>(nodes);
oldNonMasterNodesSet.remove(oldMasterNode);

List<String> oldNonMasterNodes = new ArrayList<>(oldNonMasterNodesSet);

logger.info("waiting for nodes to de-elect master [{}]", oldMasterNode);
for (String node : oldNonMasterNodesSet) {
assertDifferentMaster(node, oldMasterNode);
}

logger.info("waiting for nodes to elect a new master");
ensureStableCluster(2, oldNonMasterNodes.get(0));

logger.info("waiting for any pinging to stop");
for (final String node : oldNonMasterNodes) {
assertTrue("node [" + node + "] is still joining master", awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return !((ZenDiscovery) internalCluster().getInstance(Discovery.class, node)).joiningCluster();
}
}, 30, TimeUnit.SECONDS));
}

// restore GC
masterNodeDisruption.stopDisrupting();
ensureStableCluster(3, new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + masterNodeDisruption.expectedTimeToHeal().millis()),
oldNonMasterNodes.get(0));

// make sure all nodes agree on master
String newMaster = internalCluster().getMasterName();
assertThat(newMaster, not(equalTo(oldMasterNode)));
assertMaster(newMaster, nodes);
}
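The six numeric arguments passed to the LongGCDisruption constructor are not explained in the hunks shown on this page. Read against the "very long GC" comment and the 30s-timeout note on DISRUPTION_HEALING_OVERHEAD, a plausible interpretation (an assumption, not confirmed by this diff) is a randomized 100-200 ms pause between suspensions and a randomized 30-60 s duration for each simulated GC pause, with getRandom() supplying the randomness.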

/**
* Test that a document which is indexed on the majority side of a partition is available from the minority side,
* once the partition is healed
@@ -559,7 +620,7 @@ public void testRejoinDocumentExistsInAllShardCopies() throws Exception {
@Test
@TestLogging("discovery.zen:TRACE,action:TRACE")
public void unicastSinglePingResponseContainsMaster() throws Exception {
List<String> nodes = startUnicastCluster(4, new int[] {0}, -1);
List<String> nodes = startUnicastCluster(4, new int[]{0}, -1);
// Figure out what is the elected master node
final String masterNode = internalCluster().getMasterName();
logger.info("---> legit elected master node=" + masterNode);
@@ -699,13 +760,21 @@ private void ensureStableCluster(int nodeCount, @Nullable String viaNode) {
}

private void ensureStableCluster(int nodeCount, TimeValue timeValue, @Nullable String viaNode) {
if (viaNode == null) {
viaNode = randomFrom(internalCluster().getNodeNames());
}
logger.debug("ensuring cluster is stable with [{}] nodes. access node: [{}]. timeout: [{}]", nodeCount, viaNode, timeValue);
ClusterHealthResponse clusterHealthResponse = client(viaNode).admin().cluster().prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes(Integer.toString(nodeCount))
.setTimeout(timeValue)
.setWaitForRelocatingShards(0)
.get();
if (clusterHealthResponse.isTimedOut()) {
ClusterStateResponse stateResponse = client(viaNode).admin().cluster().prepareState().get();
fail("failed to reach a stable cluster of [" + nodeCount + "] nodes. Tried via [" + viaNode + "]. last cluster state:\n"
+ stateResponse.getState().prettyPrint());
}
assertThat(clusterHealthResponse.isTimedOut(), is(false));
}

@@ -736,11 +805,28 @@ public void run() {
}, maxWaitTime.getMillis(), TimeUnit.MILLISECONDS);
}

private void assertDifferentMaster(final String node, final String oldMasterNode) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
ClusterState state = getNodeClusterState(node);
String masterNode = null;
if (state.nodes().masterNode() != null) {
masterNode = state.nodes().masterNode().name();
}
logger.trace("[{}] master is [{}]", node, state.nodes().masterNode());
assertThat("node [" + node + "] still has [" + masterNode + "] as master",
oldMasterNode, not(equalTo(masterNode)));
}
}, 10, TimeUnit.SECONDS);
}

private void assertMaster(String masterNode, List<String> nodes) {
for (String node : nodes) {
ClusterState state = getNodeClusterState(node);
assertThat(state.nodes().size(), equalTo(nodes.size()));
assertThat(state.nodes().masterNode().name(), equalTo(masterNode));
String failMsgSuffix = "cluster_state:\n" + state.prettyPrint();
assertThat("wrong node count on [" + node + "]. " + failMsgSuffix, state.nodes().size(), equalTo(nodes.size()));
assertThat("wrong master on node [" + node + "]. " + failMsgSuffix, state.nodes().masterNode().name(), equalTo(masterNode));
}
}
}
