Skip to content

Commit

Permalink
[TEST] fix synced flush for bwc tests
Browse files Browse the repository at this point in the history
  • Loading branch information
brwe committed May 30, 2015
1 parent 7a40162 commit d22e134
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 3 deletions.
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.indices.flush.IndicesSyncedFlushResult;
import org.elasticsearch.indices.flush.SyncedFlushUtil;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.ElasticsearchBackwardsCompatIntegrationTest;
Expand Down Expand Up @@ -78,7 +79,12 @@ public void testReusePeerRecovery() throws Exception {
ensureGreen();
if (randomBoolean()) { // just make sure it doesn't break anything - we seal before we actually bump replicas
logger.info("--> trying to sync flush");
assertEquals(SyncedFlushUtil.attemptSyncedFlush(backwardsCluster().internalCluster(), "test").failedShards(), 0);
// we should not try a synced flush if we do not have at least one node with current version in it
// because old versions might not have an instance of SyncedFlushService (added in 1.6)
if (backwardsCluster().numNewDataNodes() > 0) {
// we try and sync flush but do not check the result as some of the shards might fail if they are on an older version
SyncedFlushUtil.attemptSyncedFlush(backwardsCluster().internalCluster(), "test");
}
}
logger.info("--> bump number of replicas from 0 to 1");
client().admin().indices().prepareFlush().execute().actionGet();
Expand Down
Expand Up @@ -1478,8 +1478,27 @@ private void postIndexAsyncActions(String[] indices, List<CountDownLatch> inFlig
client().admin().indices().prepareFlush(indices).setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute(
new LatchedActionListener<FlushResponse>(newLatch(inFlightAsyncOperations)));
} else {
internalCluster().getInstance(SyncedFlushService.class).attemptSyncedFlush(indices, IndicesOptions.lenientExpandOpen(),
new LatchedActionListener<IndicesSyncedFlushResult>(newLatch(inFlightAsyncOperations)));
InternalTestCluster internalTestCluster = null;
// get an instance of InternalTestCluster if possible
if (!isInternalCluster()) {
// see if it is a composite cluster and get the internal cluster from it
// for a backwards compat test this would contain the nodes with the current version
if (cluster() instanceof CompositeTestCluster) {
CompositeTestCluster compositeTestCluster = (CompositeTestCluster) cluster();
// we should not try a synced flush if we do not have at least one node with current version in it
// because old versions might not have an instance of SyncedFlushService (added in 1.6)
if (compositeTestCluster.numNewDataNodes() > 0) {
internalTestCluster = compositeTestCluster.internalCluster();
}
}
} else {
internalTestCluster = internalCluster();
}
if (internalTestCluster != null) {
// try a synced flush on the index
internalTestCluster.getInstance(SyncedFlushService.class).attemptSyncedFlush(indices, IndicesOptions.lenientExpandOpen(),
new LatchedActionListener<IndicesSyncedFlushResult>(newLatch(inFlightAsyncOperations)));
}
}
} else if (rarely()) {
client().admin().indices().prepareOptimize(indices).setIndicesOptions(IndicesOptions.lenientExpandOpen()).setMaxNumSegments(between(1, 10)).setFlush(maybeFlush && randomBoolean()).execute(
Expand Down

0 comments on commit d22e134

Please sign in to comment.