[TEST] GC Disruption #7082

Merged
5 changes: 5 additions & 0 deletions pom.xml
@@ -1203,6 +1203,11 @@
<bundledSignature>jdk-unsafe</bundledSignature>
<bundledSignature>jdk-deprecated</bundledSignature>
</bundledSignatures>
<excludes>
<!-- start exclude for test GC simulation using Thread.suspend -->
<exclude>org/elasticsearch/test/disruption/LongGCDisruption.class</exclude>
<!-- end exclude for test GC simulation using Thread.suspend -->
</excludes>
<signaturesFiles>
<signaturesFile>test-signatures.txt</signaturesFile>
<signaturesFile>all-signatures.txt</signaturesFile>
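The LongGCDisruption class excluded above is not part of this diff. As rough orientation, here is a minimal sketch of what a Thread.suspend-based GC simulation could look like; the class name, the thread-name lookup, and the meaning of the six constructor parameters (interval and pause-duration bounds in milliseconds, inferred from the call in testMasterNodeGCs below) are assumptions, not the actual test-framework API.

```java
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

// Sketch only: NOT the real org.elasticsearch.test.disruption.LongGCDisruption.
// Thread.suspend/resume are deprecated (and forbidden-API flagged) precisely
// because freezing threads at arbitrary points is unsafe in production code --
// which is exactly what makes them a good stand-in for a stop-the-world GC.
@SuppressWarnings("deprecation")
public class LongGCDisruptionSketch {

    private final String disruptedNode;
    private final Random random;
    private final long intervalMin, intervalMax; // assumed: ms between simulated pauses
    private final long durationMin, durationMax; // assumed: ms a single pause lasts

    private final Set<Thread> suspendedThreads = new HashSet<>();
    private volatile boolean disrupting;
    private Thread worker;

    public LongGCDisruptionSketch(String disruptedNode, Random random,
                                  long intervalMin, long intervalMax,
                                  long durationMin, long durationMax) {
        this.disruptedNode = disruptedNode;
        this.random = random;
        this.intervalMin = intervalMin;
        this.intervalMax = intervalMax;
        this.durationMin = durationMin;
        this.durationMax = durationMax;
    }

    // called from the test thread only, like startDisrupting() in the test below
    public void startDisrupting() {
        disrupting = true;
        worker = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (disrupting) {
                        Thread.sleep(randomBetween(intervalMin, intervalMax));
                        pauseNode();   // "GC" starts: the node freezes mid-flight
                        Thread.sleep(randomBetween(durationMin, durationMax));
                        resumeNode();  // "GC" ends: threads continue where they stopped
                    }
                } catch (InterruptedException e) {
                    // stopDisrupting() was called; fall through
                }
            }
        });
        worker.start();
    }

    public void stopDisrupting() throws InterruptedException {
        disrupting = false;
        worker.interrupt();
        worker.join();
        resumeNode(); // in case we were interrupted mid-pause, unfreeze everything
    }

    private void pauseNode() {
        // assumption: a node's threads carry the node name in their thread name,
        // the way Elasticsearch's thread factories name them
        for (Thread thread : Thread.getAllStackTraces().keySet()) {
            if (thread.getName().contains("[" + disruptedNode + "]")) {
                thread.suspend();
                suspendedThreads.add(thread);
            }
        }
    }

    private void resumeNode() {
        for (Thread thread : suspendedThreads) {
            thread.resume();
        }
        suspendedThreads.clear();
    }

    private long randomBetween(long min, long max) {
        return min + (long) (random.nextDouble() * (max - min));
    }
}
```

Suspended threads keep whatever locks they hold, so a frozen node can block peers that share threads with it. That is why the test below forces node.mode to "network" and cannot run with local transport or multicast (see the TODOs in testMasterNodeGCs).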
11 changes: 10 additions & 1 deletion src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
@@ -311,6 +311,15 @@ public void run() {
});
}


/**
* returns true if there is currently a background thread active for (re)joining the cluster.
* Used for testing.
*/
public boolean joiningCluster() {
return currentJoinThread != null;
}

private void innerJoinCluster() {
boolean retry = true;
while (retry) {
@@ -408,7 +417,7 @@ private boolean joinElectedMaster(DiscoveryNode masterNode) {
}
} else {
if (logger.isTraceEnabled()) {
logger.trace("failed to send join request to master [{}]", t);
logger.trace("failed to send join request to master [{}]", t, masterNode);
} else {
logger.info("failed to send join request to master [{}], reason [{}]", masterNode, ExceptionsHelper.detailedMessage(t));
}
@@ -115,7 +115,8 @@ public Settings settings(int nodeOrdinal) {
.put("discovery.zen.ping.multicast.enabled", false);

String[] unicastHosts = new String[unicastHostOrdinals.length];
if (InternalTestCluster.NODE_MODE.equals("local")) {
String mode = baseSettings.get("node.mode", InternalTestCluster.NODE_MODE);
if (mode.equals("local")) {
builder.put(LocalTransport.TRANSPORT_LOCAL_ADDRESS, "node_" + nodeOrdinal);
for (int i = 0; i < unicastHosts.length; i++) {
unicastHosts[i] = "node_" + unicastHostOrdinals[i];
@@ -19,9 +19,11 @@

package org.elasticsearch.discovery;

import com.google.common.base.Predicate;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
@@ -39,6 +41,7 @@
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.discovery.zen.ping.ZenPingService;
@@ -62,14 +65,14 @@
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.*;

/**
*/
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
@LuceneTestCase.Slow
@TestLogging("discovery.zen:TRACE")
public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationTest {
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTest {

private static final TimeValue DISRUPTION_HEALING_OVERHEAD = TimeValue.timeValueSeconds(40); // we use 30s as timeout in many places.

@@ -109,8 +112,9 @@ private List<String> startCluster(int numberOfNodes, int minimumMasterNode) thro
}

final static Settings DEFAULT_SETTINGS = ImmutableSettings.builder()
.put("discovery.zen.fd.ping_timeout", "1s") // <-- for hitting simulated network failures quickly
.put("discovery.zen.fd.ping_retries", "1") // <-- for hitting simulated network failures quickly
.put("discovery.zen.fd.ping_timeout", "1s") // for hitting simulated network failures quickly
.put("discovery.zen.fd.ping_retries", "1") // for hitting simulated network failures quickly
.put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out
.put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly
.put("http.enabled", false) // just to make test quicker
.put("gateway.local.list_timeout", "10s") // still long to induce failures but to long so test won't time out
@@ -136,21 +140,26 @@ private List<String> startMulticastCluster(int numberOfNodes, int minimumMasterN
return nodes;
}

private List<String> startUnicastCluster(int numberOfNodes,@Nullable int[] unicastHostsOrdinals, int minimumMasterNode) throws ExecutionException, InterruptedException {
private List<String> startUnicastCluster(int numberOfNodes, @Nullable int[] unicastHostsOrdinals, int minimumMasterNode) throws ExecutionException, InterruptedException {
return startUnicastCluster(numberOfNodes, unicastHostsOrdinals, minimumMasterNode, ImmutableSettings.EMPTY);
}

private List<String> startUnicastCluster(int numberOfNodes, @Nullable int[] unicastHostsOrdinals, int minimumMasterNode, Settings settings) throws ExecutionException, InterruptedException {
if (minimumMasterNode < 0) {
minimumMasterNode = numberOfNodes / 2 + 1;
}
// TODO: Rarely use default settings for some of these
Settings settings = ImmutableSettings.builder()
Settings nodeSettings = ImmutableSettings.builder()
.put(DEFAULT_SETTINGS)
.put(settings)
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES, minimumMasterNode)
.build();

if (discoveryConfig == null) {
if (unicastHostsOrdinals == null) {
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, settings);
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, nodeSettings);
} else {
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, settings, unicastHostsOrdinals);
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, nodeSettings, unicastHostsOrdinals);
}
}
List<String> nodes = internalCluster().startNodesAsync(numberOfNodes).get();
@@ -494,6 +503,58 @@ public void run() {
}
}

/**
* Test that the cluster recovers from a long GC on the master that causes other nodes to elect a new one
*/
@Test
@TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE")
public void testMasterNodeGCs() throws Exception {
// TODO: on Mac OS multicast threads are shared between nodes and therefore we can't simulate a GC and stop pinging for just one node.
// find a way to block thread creation in the generic thread pool to avoid this.
// TODO: with local transport the threads of the source node enter the target node, since everything is local; as above, we can't simulate a GC on just one node.
// with netty transport the threads of different nodes don't touch each other, due to the network threading model Netty uses.
List<String> nodes = startUnicastCluster(3, null, -1, ImmutableSettings.builder().put("node.mode", "network").build());

String oldMasterNode = internalCluster().getMasterName();
// a very long GC, but it's OK as we remove the disruption when it has had an effect
SingleNodeDisruption masterNodeDisruption = new LongGCDisruption(oldMasterNode, getRandom(), 100, 200, 30000, 60000);
internalCluster().setDisruptionScheme(masterNodeDisruption);
masterNodeDisruption.startDisrupting();

Set<String> oldNonMasterNodesSet = new HashSet<>(nodes);
oldNonMasterNodesSet.remove(oldMasterNode);

List<String> oldNonMasterNodes = new ArrayList<>(oldNonMasterNodesSet);

logger.info("waiting for nodes to de-elect master [{}]", oldMasterNode);
for (String node : oldNonMasterNodesSet) {
assertDifferentMaster(node, oldMasterNode);
}

logger.info("waiting for nodes to elect a new master");
ensureStableCluster(2, oldNonMasterNodes.get(0));

logger.info("waiting for any pinging to stop");
for (final String node : oldNonMasterNodes) {
assertTrue("node [" + node + "] is still joining master", awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return !((ZenDiscovery) internalCluster().getInstance(Discovery.class, node)).joiningCluster();
}
}, 30, TimeUnit.SECONDS));
}

// restore GC
masterNodeDisruption.stopDisrupting();
ensureStableCluster(3, new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + masterNodeDisruption.expectedTimeToHeal().millis()),
oldNonMasterNodes.get(0));

// make sure all nodes agree on master
String newMaster = internalCluster().getMasterName();
assertThat(newMaster, not(equalTo(oldMasterNode)));
assertMaster(newMaster, nodes);
}

/**
* Test that a document which is indexed on the majority side of a partition is available from the minority side,
* once the partition is healed.
@@ -559,7 +620,7 @@ public void testRejoinDocumentExistsInAllShardCopies() throws Exception {
@Test
@TestLogging("discovery.zen:TRACE,action:TRACE")
public void unicastSinglePingResponseContainsMaster() throws Exception {
List<String> nodes = startUnicastCluster(4, new int[] {0}, -1);
List<String> nodes = startUnicastCluster(4, new int[]{0}, -1);
// Figure out what is the elected master node
final String masterNode = internalCluster().getMasterName();
logger.info("---> legit elected master node=" + masterNode);
@@ -699,13 +760,21 @@ private void ensureStableCluster(int nodeCount, @Nullable String viaNode) {
}

private void ensureStableCluster(int nodeCount, TimeValue timeValue, @Nullable String viaNode) {
if (viaNode == null) {
viaNode = randomFrom(internalCluster().getNodeNames());
}
logger.debug("ensuring cluster is stable with [{}] nodes. access node: [{}]. timeout: [{}]", nodeCount, viaNode, timeValue);
ClusterHealthResponse clusterHealthResponse = client(viaNode).admin().cluster().prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes(Integer.toString(nodeCount))
.setTimeout(timeValue)
.setWaitForRelocatingShards(0)
.get();
if (clusterHealthResponse.isTimedOut()) {
ClusterStateResponse stateResponse = client(viaNode).admin().cluster().prepareState().get();
fail("failed to reach a stable cluster of [" + nodeCount + "] nodes. Tried via [" + viaNode + "]. last cluster state:\n"
+ stateResponse.getState().prettyPrint());
}
assertThat(clusterHealthResponse.isTimedOut(), is(false));
}

@@ -736,11 +805,28 @@ public void run() {
}, maxWaitTime.getMillis(), TimeUnit.MILLISECONDS);
}

private void assertDifferentMaster(final String node, final String oldMasterNode) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
ClusterState state = getNodeClusterState(node);
String masterNode = null;
if (state.nodes().masterNode() != null) {
masterNode = state.nodes().masterNode().name();
}
logger.trace("[{}] master is [{}]", node, state.nodes().masterNode());
assertThat("node [" + node + "] still has [" + masterNode + "] as master",
oldMasterNode, not(equalTo(masterNode)));
}
}, 10, TimeUnit.SECONDS);
}

private void assertMaster(String masterNode, List<String> nodes) {
for (String node : nodes) {
ClusterState state = getNodeClusterState(node);
assertThat(state.nodes().size(), equalTo(nodes.size()));
assertThat(state.nodes().masterNode().name(), equalTo(masterNode));
String failMsgSuffix = "cluster_state:\n" + state.prettyPrint();
assertThat("wrong node count on [" + node + "]. " + failMsgSuffix, state.nodes().size(), equalTo(nodes.size()));
assertThat("wrong master on node [" + node + "]. " + failMsgSuffix, state.nodes().masterNode().name(), equalTo(masterNode));
}
}
}