Skip to content
Permalink
Browse files

Added ServiceDisruptionScheme(s) and testAckedIndexing

This commit adds the notion of ServiceDisruptionScheme, allowing for introducing disruptions in our test cluster. This
abstraction is used in a couple of wrappers around the functionality offered by MockTransportService to simulate various
network partitions. There is also one implementation for causing a node to be slow in processing cluster state updates.

This new mechanism is integrated into the existing tests in DiscoveryWithNetworkFailuresTests.

A new test called testAckedIndexing is added to verify retrieval of documents whose indexing was acked during various disruptions.

Closes #6505
  • Loading branch information...
bleskes committed May 16, 2014
1 parent 797b4b5 commit ef759322231b21aa3c8b160f86b895483cff1ebf
@@ -340,7 +340,7 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
logger.error("unexpected failure during [{}]", t, source);
}

@Override
@@ -406,8 +406,7 @@ public ClusterState execute(ClusterState currentState) {
public void onFailure(String source, Throwable t) {
if (t instanceof ClusterService.NoLongerMasterException) {
logger.debug("not processing {} leave request as we are no longer master", node);
}
else {
} else {
logger.error("unexpected failure during [{}]", t, source);
}
}
@@ -446,8 +445,7 @@ public ClusterState execute(ClusterState currentState) {
public void onFailure(String source, Throwable t) {
if (t instanceof ClusterService.NoLongerMasterException) {
logger.debug("not processing [{}] as we are no longer master", source);
}
else {
} else {
logger.error("unexpected failure during [{}]", t, source);
}
}
@@ -484,8 +482,7 @@ public ClusterState execute(ClusterState currentState) {
public void onFailure(String source, Throwable t) {
if (t instanceof ClusterService.NoLongerMasterException) {
logger.debug("not processing [{}] as we are no longer master", source);
}
else {
} else {
logger.error("unexpected failure during [{}]", t, source);
}
}
@@ -594,7 +591,7 @@ void handleNewClusterStateFromMaster(ClusterState newClusterState, final Publish
return;
}
if (master) {
logger.debug("received cluster state from [{}] which is also master but with cluster name [{}]", newClusterState.nodes().masterNode(), incomingClusterName);
logger.debug("received cluster state from [{}] which is also master but with cluster name [{}]", newClusterState.nodes().masterNode(), incomingClusterName);
final ClusterState newState = newClusterState;
clusterService.submitStateUpdateTask("zen-disco-master_receive_cluster_state_from_another_master [" + newState.nodes().masterNode() + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
@Override
@@ -638,7 +635,6 @@ public void onFailure(String source, Throwable t) {
final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState, newStateProcessed);
processNewClusterStates.add(processClusterState);


assert newClusterState.nodes().masterNode() != null : "received a cluster state without a master";
assert !newClusterState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock()) : "received a cluster state with a master block";

@@ -1014,8 +1010,7 @@ public ClusterState execute(ClusterState currentState) {
public void onFailure(String source, Throwable t) {
if (t instanceof ClusterService.NoLongerMasterException) {
logger.debug("not processing [{}] as we are no longer master", source);
}
else {
} else {
logger.error("unexpected failure during [{}]", t, source);
}
}
@@ -257,6 +257,10 @@ public void removeHandler(String action) {
}
}

/**
 * Looks up the handler stored in {@code serverHandlers} under the given action name.
 *
 * @param action the action name used as the lookup key
 * @return the result of {@code serverHandlers.get(action)} for that key
 */
protected TransportRequestHandler getHandler(String action) {
    final TransportRequestHandler found = serverHandlers.get(action);
    return found;
}

class Adapter implements TransportServiceAdapter {

final MeanMetric rxMetric = new MeanMetric();

Large diffs are not rendered by default.

@@ -43,7 +43,6 @@
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
import static org.hamcrest.Matchers.equalTo;

public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
@@ -25,7 +25,6 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.recovery.RecoveryWhileUnderLoadTests;
import org.junit.Assert;

import java.util.concurrent.CopyOnWriteArrayList;
@@ -40,7 +39,7 @@

public class BackgroundIndexer implements AutoCloseable {

private final ESLogger logger = Loggers.getLogger(RecoveryWhileUnderLoadTests.class);
private final ESLogger logger = Loggers.getLogger(getClass());

final Thread[] writers;
final CountDownLatch stopLatch;
@@ -218,7 +217,7 @@ public void continueIndexing(int numOfDocs) {
setBudget(numOfDocs);
}

/** Stop all background threads **/
/** Stop all background threads * */
public void stop() throws InterruptedException {
if (stop.get()) {
return;
@@ -83,6 +83,7 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.test.client.RandomizingClient;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.junit.*;

import java.io.IOException;
@@ -322,7 +323,7 @@ private void randomIndexTemplate() throws IOException {
if (randomBoolean()) {
mappings.startObject(IdFieldMapper.NAME)
.field("index", randomFrom("not_analyzed", "no"))
.endObject();
.endObject();
}
mappings.startArray("dynamic_templates")
.startObject()
@@ -441,7 +442,7 @@ protected boolean randomizeNumberOfShardsAndReplicas() {
case 3:
builder.put(MergeSchedulerModule.MERGE_SCHEDULER_TYPE_KEY, ConcurrentMergeSchedulerProvider.class);
final int maxThreadCount = RandomInts.randomIntBetween(random, 1, 4);
final int maxMergeCount = RandomInts.randomIntBetween(random, maxThreadCount, maxThreadCount+4);
final int maxMergeCount = RandomInts.randomIntBetween(random, maxThreadCount, maxThreadCount + 4);
builder.put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, maxMergeCount);
builder.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, maxThreadCount);
break;
@@ -499,6 +500,7 @@ protected final void afterInternal() throws IOException {
boolean success = false;
try {
logger.info("[{}#{}]: cleaning up after test", getTestClass().getSimpleName(), getTestName());
clearDisruptionScheme();
final Scope currentClusterScope = getCurrentClusterScope();
try {
if (currentClusterScope != Scope.TEST) {
@@ -606,6 +608,15 @@ protected int numberOfReplicas() {
return between(minimumNumberOfReplicas(), maximumNumberOfReplicas());
}


/**
 * Forwards the given disruption scheme to the internal test cluster by calling
 * {@code internalCluster().setDisruptionScheme(scheme)}.
 *
 * @param scheme the disruption scheme to hand to the internal cluster
 */
public void setDisruptionScheme(ServiceDisruptionScheme scheme) {
internalCluster().setDisruptionScheme(scheme);
}

/**
 * Asks the internal test cluster to clear its disruption scheme by calling
 * {@code internalCluster().clearDisruptionScheme()}.
 */
public void clearDisruptionScheme() {
internalCluster().clearDisruptionScheme();
}

/**
* Returns a settings object used in {@link #createIndex(String...)} and {@link #prepareCreate(String)} and friends.
* This method can be overwritten by subclasses to set defaults for the indices that are created by the test.
@@ -1011,8 +1022,7 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, IndexReque
* @param forceRefresh if <tt>true</tt> all involved indices are refreshed once the documents are indexed. Additionally if <tt>true</tt>
* some empty dummy documents are may be randomly inserted into the document list and deleted once all documents are indexed.
* This is useful to produce deleted documents on the server side.
* @param builders the documents to index.
*
* @param builders the documents to index.
* @see #indexRandom(boolean, boolean, java.util.List)
*/
public void indexRandom(boolean forceRefresh, List<IndexRequestBuilder> builders) throws InterruptedException, ExecutionException {
@@ -1026,10 +1036,10 @@ public void indexRandom(boolean forceRefresh, List<IndexRequestBuilder> builders
* segment or if only one document is in a segment etc. This method prevents issues like this by randomizing the index
* layout.
*
* @param forceRefresh if <tt>true</tt> all involved indices are refreshed once the documents are indexed.
* @param forceRefresh if <tt>true</tt> all involved indices are refreshed once the documents are indexed.
* @param dummyDocuments if <tt>true</tt> some empty dummy documents are may be randomly inserted into the document list and deleted once
* all documents are indexed. This is useful to produce deleted documents on the server side.
* @param builders the documents to index.
* @param builders the documents to index.
*/
public void indexRandom(boolean forceRefresh, boolean dummyDocuments, List<IndexRequestBuilder> builders) throws InterruptedException, ExecutionException {
Random random = getRandom();
@@ -1042,7 +1052,7 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, List<Index
builders = new ArrayList<>(builders);
final String[] indices = indicesSet.toArray(new String[0]);
// inject some bogus docs
final int numBogusDocs = scaledRandomIntBetween(1, builders.size()*2);
final int numBogusDocs = scaledRandomIntBetween(1, builders.size() * 2);
final int unicodeLen = between(1, 10);
for (int i = 0; i < numBogusDocs; i++) {
String id = randomRealisticUnicodeOfLength(unicodeLen);
@@ -1094,10 +1104,10 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, List<Index
}
assertThat(actualErrors, emptyIterable());
if (!bogusIds.isEmpty()) {
// delete the bogus types again - it might trigger merges or at least holes in the segments and enforces deleted docs!
for (Tuple<String, String> doc : bogusIds) {
assertTrue("failed to delete a dummy doc", client().prepareDelete(doc.v1(), RANDOM_BOGUS_TYPE, doc.v2()).get().isFound());
}
// delete the bogus types again - it might trigger merges or at least holes in the segments and enforces deleted docs!
for (Tuple<String, String> doc : bogusIds) {
assertTrue("failed to delete a dummy doc", client().prepareDelete(doc.v1(), RANDOM_BOGUS_TYPE, doc.v2()).get().isFound());
}
}
if (forceRefresh) {
assertNoFailures(client().admin().indices().prepareRefresh(indices).setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute().get());
@@ -43,6 +43,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
@@ -69,6 +70,7 @@
import org.elasticsearch.search.SearchService;
import org.elasticsearch.test.cache.recycler.MockBigArraysModule;
import org.elasticsearch.test.cache.recycler.MockPageCacheRecyclerModule;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.engine.MockEngineModule;
import org.elasticsearch.test.store.MockFSIndexStoreModule;
import org.elasticsearch.test.transport.AssertingLocalTransportModule;
@@ -169,6 +171,8 @@

private final boolean hasFilterCache;

private ServiceDisruptionScheme activeDisruptionScheme;

/**
 * Convenience constructor that delegates to the full constructor, supplying the
 * {@code DEFAULT_*} constants and {@code NodeSettingsSource.EMPTY} for every argument
 * other than the seed and the cluster name.
 *
 * @param clusterSeed the seed passed through to the full constructor
 * @param clusterName the cluster name passed through to the full constructor
 */
public InternalTestCluster(long clusterSeed, String clusterName) {
this(clusterSeed, DEFAULT_MIN_NUM_DATA_NODES, DEFAULT_MAX_NUM_DATA_NODES, clusterName, NodeSettingsSource.EMPTY, DEFAULT_NUM_CLIENT_NODES, DEFAULT_ENABLE_RANDOM_BENCH_NODES);
}
@@ -244,6 +248,10 @@ public String getClusterName() {
return clusterName;
}

/**
 * Returns the keys of the {@code nodes} map, i.e. the node names, as an array.
 *
 * @return an array holding every key currently present in {@code nodes}
 */
public String[] getNodeNames() {
    final String[] names = nodes.keySet().toArray(Strings.EMPTY_ARRAY);
    return names;
}

private static boolean isLocalTransportConfigured() {
if ("local".equals(System.getProperty("es.node.mode", "network"))) {
return true;
@@ -428,6 +436,7 @@ public synchronized void ensureAtMostNumDataNodes(int n) {
while (limit.hasNext()) {
NodeAndClient next = limit.next();
nodesToRemove.add(next);
removeDistruptionSchemeFromNode(next);
next.close();
}
for (NodeAndClient toRemove : nodesToRemove) {
@@ -591,6 +600,10 @@ public boolean apply(NodeAndClient nodeAndClient) {
@Override
public void close() {
if (this.open.compareAndSet(true, false)) {
if (activeDisruptionScheme != null) {
activeDisruptionScheme.testClusterClosed();
activeDisruptionScheme = null;
}
IOUtils.closeWhileHandlingException(nodes.values());
nodes.clear();
executor.shutdownNow();
@@ -768,6 +781,7 @@ public synchronized void beforeTest(Random random, double transportClientRatio)
}

private synchronized void reset(boolean wipeData) {
clearDisruptionScheme();
resetClients(); /* reset all clients - each test gets its own client based on the Random instance created above. */
if (wipeData) {
wipeDataDirectories();
@@ -964,6 +978,7 @@ public synchronized void stopRandomDataNode() {
NodeAndClient nodeAndClient = getRandomNodeAndClient(new DataNodePredicate());
if (nodeAndClient != null) {
logger.info("Closing random node [{}] ", nodeAndClient.name);
removeDistruptionSchemeFromNode(nodeAndClient);
nodes.remove(nodeAndClient.name);
nodeAndClient.close();
}
@@ -983,6 +998,7 @@ public boolean apply(NodeAndClient nodeAndClient) {
});
if (nodeAndClient != null) {
logger.info("Closing filtered random node [{}] ", nodeAndClient.name);
removeDistruptionSchemeFromNode(nodeAndClient);
nodes.remove(nodeAndClient.name);
nodeAndClient.close();
}
@@ -997,6 +1013,7 @@ public synchronized void stopCurrentMasterNode() {
String masterNodeName = getMasterName();
assert nodes.containsKey(masterNodeName);
logger.info("Closing master node [{}] ", masterNodeName);
removeDistruptionSchemeFromNode(nodes.get(masterNodeName));
NodeAndClient remove = nodes.remove(masterNodeName);
remove.close();
}
@@ -1008,6 +1025,7 @@ public void stopRandomNonMasterNode() {
NodeAndClient nodeAndClient = getRandomNodeAndClient(Predicates.not(new MasterNodePredicate(getMasterName())));
if (nodeAndClient != null) {
logger.info("Closing random non master node [{}] current master [{}] ", nodeAndClient.name, getMasterName());
removeDistruptionSchemeFromNode(nodeAndClient);
nodes.remove(nodeAndClient.name);
nodeAndClient.close();
}
@@ -1061,6 +1079,9 @@ private void restartAllNodes(boolean rollingRestart, RestartCallback callback) t
if (!callback.doRestart(nodeAndClient.name)) {
logger.info("Closing node [{}] during restart", nodeAndClient.name);
toRemove.add(nodeAndClient);
if (activeDisruptionScheme != null) {
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
}
nodeAndClient.close();
}
}
@@ -1075,18 +1096,33 @@ private void restartAllNodes(boolean rollingRestart, RestartCallback callback) t
for (NodeAndClient nodeAndClient : nodes.values()) {
callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
logger.info("Restarting node [{}] ", nodeAndClient.name);
if (activeDisruptionScheme != null) {
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
}
nodeAndClient.restart(callback);
if (activeDisruptionScheme != null) {
activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
}
}
} else {
int numNodesRestarted = 0;
for (NodeAndClient nodeAndClient : nodes.values()) {
callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
logger.info("Stopping node [{}] ", nodeAndClient.name);
if (activeDisruptionScheme != null) {
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
}
nodeAndClient.node.close();
}
for (NodeAndClient nodeAndClient : nodes.values()) {
logger.info("Starting node [{}] ", nodeAndClient.name);
if (activeDisruptionScheme != null) {
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
}
nodeAndClient.restart(callback);
if (activeDisruptionScheme != null) {
activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
}
}
}
}
@@ -1294,6 +1330,7 @@ private synchronized void publishNode(NodeAndClient nodeAndClient) {
dataDirToClean.addAll(Arrays.asList(nodeEnv.nodeDataLocations()));
}
nodes.put(nodeAndClient.name, nodeAndClient);
applyDisruptionSchemeToNode(nodeAndClient);
}

public void closeNonSharedNodes(boolean wipeData) {
@@ -1315,6 +1352,33 @@ public boolean hasFilterCache() {
return hasFilterCache;
}

/**
 * Replaces this cluster's active disruption scheme with {@code scheme}.
 *
 * @param scheme the scheme to apply to this cluster; it is dereferenced unconditionally,
 *               so passing {@code null} fails after the previous scheme has been cleared
 */
public void setDisruptionScheme(ServiceDisruptionScheme scheme) {
// remove whatever scheme is currently active before applying the new one
clearDisruptionScheme();
scheme.applyToCluster(this);
// recorded only after applyToCluster has returned
activeDisruptionScheme = scheme;
}

/**
 * Removes the active disruption scheme, if any, from this cluster and forgets it.
 * Does nothing when no scheme is active.
 */
public void clearDisruptionScheme() {
    if (activeDisruptionScheme == null) {
        // nothing active; the field is already null
        return;
    }
    activeDisruptionScheme.removeFromCluster(this);
    activeDisruptionScheme = null;
}

/**
 * Applies the active disruption scheme, if there is one, to the given node.
 *
 * @param nodeAndClient the node whose name is passed to the scheme; asserted to be
 *                      present in {@code nodes} when a scheme is active
 */
private void applyDisruptionSchemeToNode(NodeAndClient nodeAndClient) {
    if (activeDisruptionScheme == null) {
        return;
    }
    assert nodes.containsKey(nodeAndClient.name);
    activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
}

/**
 * Removes the active disruption scheme, if there is one, from the given node.
 *
 * @param nodeAndClient the node whose name is passed to the scheme; asserted to be
 *                      present in {@code nodes} when a scheme is active
 */
private void removeDistruptionSchemeFromNode(NodeAndClient nodeAndClient) {
    if (activeDisruptionScheme == null) {
        return;
    }
    assert nodes.containsKey(nodeAndClient.name);
    activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
}

/**
 * Returns the entries of {@code nodes} that are accepted by a {@code DataNodePredicate}.
 *
 * @return the result of {@code Collections2.filter} over {@code nodes.values()}
 */
private synchronized Collection<NodeAndClient> dataNodeAndClients() {
    final Collection<NodeAndClient> allNodes = nodes.values();
    return Collections2.filter(allNodes, new DataNodePredicate());
}
@@ -26,6 +26,7 @@
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.indices.IndexMissingException;

0 comments on commit ef75932

Please sign in to comment.
You can’t perform that action at this time.