Share IT Infrastructure between Core Snapshot and SLM ITs (#59082) (#59119)

For #58994 it would be useful to be able to share test infrastructure.
This PR shares `AbstractSnapshotIntegTestCase` for that purpose, dries up the SLM tests accordingly, and adds to the test infrastructure a shared, efficient (compared to the previous implementations) way of waiting for no running snapshot operations, drying things up further.
original-brownbear committed Jul 7, 2020
1 parent e217f9a commit d6d6df1
Showing 3 changed files with 63 additions and 125 deletions.
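
For illustration, a minimal sketch of how a snapshot IT can use the shared infrastructure once it extends the base class. The helper names (`createRepository`, `awaitNoMoreRunningOperations`) and the node-setup call are taken from the diff below; the test class, test name, and repository name are hypothetical, and the exact base-class signatures are assumptions.

```java
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;

public class ExampleSnapshotIT extends AbstractSnapshotIntegTestCase {

    // Hypothetical test showing the wait helper this commit moves into the shared base class.
    public void testSnapshotOperationsEventuallyFinish() throws Exception {
        final String dataNode = internalCluster().startDataOnlyNode();
        createRepository("test-repo", "mock"); // shared helper; registers a mock-backed repository

        // ... start a snapshot and inject whatever disruption the test is about ...

        // Instead of an assertBusy() loop that repeatedly fetches the cluster state, the base class
        // blocks on a ClusterStateObserver until SnapshotsInProgress and SnapshotDeletionsInProgress
        // are both empty on the given node.
        awaitNoMoreRunningOperations(dataNode);
    }
}
```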
@@ -24,9 +24,7 @@
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
@@ -137,7 +135,7 @@ public void clusterChanged(ClusterChangedEvent event) {
logger.info("--> waiting for disruption to start");
assertTrue(disruptionStarted.await(1, TimeUnit.MINUTES));

assertAllSnapshotsCompleted();
awaitNoMoreRunningOperations(dataNode);

logger.info("--> verify that snapshot was successful or no longer exist");
assertBusy(() -> {
@@ -154,13 +152,13 @@ public void clusterChanged(ClusterChangedEvent event) {
logger.info("--> done");

future.get();
assertAllSnapshotsCompleted();
awaitNoMoreRunningOperations(masterNode1);
}

public void testDisruptionAfterFinalization() throws Exception {
final String idxName = "test";
internalCluster().startMasterOnlyNodes(3);
internalCluster().startDataOnlyNode();
final String dataNode = internalCluster().startDataOnlyNode();
ensureStableCluster(4);

createRandomIndex(idxName);
@@ -205,7 +203,7 @@ public void clusterChanged(ClusterChangedEvent event) {
logger.info("--> waiting for disruption to start");
assertTrue(disruptionStarted.await(1, TimeUnit.MINUTES));

assertAllSnapshotsCompleted();
awaitNoMoreRunningOperations(dataNode);

logger.info("--> verify that snapshot was successful or no longer exist");
assertBusy(() -> {
@@ -233,7 +231,7 @@ public void clusterChanged(ClusterChangedEvent event) {
assertThat(sne.getSnapshotName(), is(snapshot));
}

assertAllSnapshotsCompleted();
awaitNoMoreRunningOperations(dataNode);
}

public void testDisruptionAfterShardFinalization() throws Exception {
@@ -322,33 +320,14 @@ public void testMasterFailOverDuringShardSnapshots() throws Exception {
unblockNode(repoName, dataNode);

networkDisruption.stopDisrupting();
assertAllSnapshotsCompleted();
awaitNoMoreRunningOperations(dataNode);

logger.info("--> make sure isolated master responds to snapshot request");
final SnapshotException sne =
expectThrows(SnapshotException.class, () -> snapshotResponse.actionGet(TimeValue.timeValueSeconds(30L)));
assertThat(sne.getMessage(), endsWith("no longer master"));
}

private void assertAllSnapshotsCompleted() throws Exception {
logger.info("--> wait until the snapshot is done");
assertBusy(() -> {
ClusterState state = dataNodeClient().admin().cluster().prepareState().get().getState();
SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
SnapshotDeletionsInProgress snapshotDeletionsInProgress =
state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY);
if (snapshots.entries().isEmpty() == false) {
logger.info("Current snapshot state [{}]", snapshots.entries().get(0).state());
fail("Snapshot is still running");
} else if (snapshotDeletionsInProgress.hasDeletionsInProgress()) {
logger.info("Current snapshot deletion state [{}]", snapshotDeletionsInProgress);
fail("Snapshot deletion is still running");
} else {
logger.info("Snapshot is no longer in the cluster state");
}
}, 1L, TimeUnit.MINUTES);
}

private void assertSnapshotExists(String repository, String snapshot) {
GetSnapshotsResponse snapshotsStatusResponse = dataNodeClient().admin().cluster().prepareGetSnapshots(repository)
.setSnapshots(snapshot).get();
@@ -24,9 +24,13 @@
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
@@ -37,6 +41,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
@@ -48,6 +53,7 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;

import java.io.IOException;
@@ -62,7 +68,9 @@
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.empty;
@@ -373,4 +381,36 @@ protected long getCountForIndex(String indexName) {
protected void assertDocCount(String index, long count) {
assertEquals(getCountForIndex(index), count);
}

protected void awaitNoMoreRunningOperations(String viaNode) throws Exception {
logger.info("--> verify no more operations in the cluster state");
awaitClusterState(viaNode, state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries().isEmpty() &&
state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY).hasDeletionsInProgress() == false);
}

private void awaitClusterState(String viaNode, Predicate<ClusterState> statePredicate) throws Exception {
final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, viaNode);
final ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, viaNode);
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger, threadPool.getThreadContext());
if (statePredicate.test(observer.setAndGetObservedState()) == false) {
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
future.onResponse(null);
}

@Override
public void onClusterServiceClose() {
future.onFailure(new NodeClosedException(clusterService.localNode()));
}

@Override
public void onTimeout(TimeValue timeout) {
future.onFailure(new TimeoutException());
}
}, statePredicate);
future.get(30L, TimeUnit.SECONDS);
}
}
}
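
A note on the design choice behind these helpers: rather than polling in an `assertBusy()` loop, `awaitClusterState` registers a `ClusterStateObserver` listener that completes on the first published cluster state matching the predicate, which is what makes the shared wait more efficient than the per-test implementations it replaces. The same helper could back other waits by swapping the predicate; a hypothetical example (not part of this commit, `expectedNodeCount` is an assumed local variable):

```java
// Wait until the cluster state reports the expected number of nodes.
awaitClusterState(viaNode, state -> state.nodes().getSize() == expectedNodeCount);
```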
@@ -13,18 +13,16 @@
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
import org.elasticsearch.snapshots.ConcurrentSnapshotExecutionException;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotMissingException;
@@ -54,7 +52,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.empty;
@@ -65,7 +62,7 @@
* Tests for Snapshot Lifecycle Management that require a slow or blocked snapshot repo (using {@link MockRepository}
*/
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
public class SLMSnapshotBlockingIntegTests extends AbstractSnapshotIntegTestCase {
private static final String NEVER_EXECUTE_CRON_SCHEDULE = "* * * 31 FEB ? *";

private static final String REPO = "repo-id";
@@ -117,9 +114,7 @@ public void testSnapshotInProgress() throws Exception {
for (int i = 0; i < docCount; i++) {
index(indexName, "_doc", i + "", Collections.singletonMap("foo", "bar"));
}

// Create a snapshot repo
initializeRepo(REPO);
createRepository(REPO, "mock");

logger.info("--> creating policy {}", policyName);
createSnapshotPolicy(policyName, "snap", NEVER_EXECUTE_CRON_SCHEDULE, REPO, indexName, true);
@@ -150,7 +145,7 @@ public void testSnapshotInProgress() throws Exception {

logger.info("--> unblocking snapshots");
unblockAllDataNodes(REPO);
unblockRepo(REPO);
unblockNode(REPO, internalCluster().getMasterName());

// Cancel/delete the snapshot
try {
@@ -167,8 +162,7 @@ public void testRetentionWhileSnapshotInProgress() throws Exception {
for (int i = 0; i < docCount; i++) {
index(indexName, "_doc", null, Collections.singletonMap("foo", "bar"));
}

initializeRepo(REPO);
createRepository(REPO, "mock");

logger.info("--> creating policy {}", policyId);
createSnapshotPolicy(policyId, "snap", NEVER_EXECUTE_CRON_SCHEDULE, REPO, indexName, true,
@@ -190,12 +184,7 @@ public void testRetentionWhileSnapshotInProgress() throws Exception {
}
});

// Wait for all running snapshots to be cleared from cluster state
assertBusy(() -> {
logger.info("--> waiting for cluster state to be clear of snapshots");
ClusterState state = client().admin().cluster().prepareState().setCustoms(true).get().getState();
assertTrue("cluster state was not ready for deletion " + state, SnapshotRetentionTask.okayToDeleteSnapshots(state));
});
awaitNoMoreRunningOperations(randomFrom(dataNodeNames));

logger.info("--> indexing more docs to force new segment files");
for (int i = 0; i < docCount; i++) {
@@ -211,11 +200,10 @@ public void testRetentionWhileSnapshotInProgress() throws Exception {
final String secondSnapName = executePolicy(policyId);
logger.info("--> executed policy, got snapname [{}]", secondSnapName);


// Check that the executed snapshot shows up in the SLM output as in_progress
logger.info("--> Waiting for at least one data node to hit the block");
waitForBlockOnAnyDataNode(REPO, TimeValue.timeValueSeconds(30L));
assertBusy(() -> {
logger.info("--> Waiting for at least one data node to hit the block");
assertTrue(dataNodeNames.stream().anyMatch(node -> checkBlocked(node, REPO)));
logger.info("--> at least one data node has hit the block");
GetSnapshotLifecycleAction.Response getResp =
client().execute(GetSnapshotLifecycleAction.INSTANCE, new GetSnapshotLifecycleAction.Request(policyId)).get();
@@ -238,7 +226,7 @@ public void testRetentionWhileSnapshotInProgress() throws Exception {
new ExecuteSnapshotRetentionAction.Request()).get().isAcknowledged());

logger.info("--> unblocking snapshots");
unblockRepo(REPO);
unblockNode(REPO, internalCluster().getMasterName());
unblockAllDataNodes(REPO);

// Check that the snapshot created by the policy has been removed by retention
@@ -275,7 +263,7 @@ public void testRetentionWhileSnapshotInProgress() throws Exception {
assertThat(resp.getHits().getTotalHits().value, equalTo(2L));
});
} finally {
unblockRepo(REPO);
unblockNode(REPO, internalCluster().getMasterName());
unblockAllDataNodes(REPO);
}
}
@@ -295,9 +283,7 @@ private void testUnsuccessfulSnapshotRetention(boolean partialSuccess) throws Ex
final SnapshotState expectedUnsuccessfulState = partialSuccess ? SnapshotState.PARTIAL : SnapshotState.FAILED;
// Setup
createAndPopulateIndex(indexName);

// Create a snapshot repo
initializeRepo(REPO);
createRepository(REPO, "mock");

createSnapshotPolicy(policyId, "snap", NEVER_EXECUTE_CRON_SCHEDULE, REPO, indexName, true,
partialSuccess, new SnapshotRetentionConfiguration(null, 1, 2));
@@ -355,7 +341,7 @@ private void testUnsuccessfulSnapshotRetention(boolean partialSuccess) throws Ex
createAndPopulateIndex(indexName);

logger.info("--> unblocking snapshots");
unblockRepo(REPO);
unblockNode(REPO, internalCluster().getMasterName());
unblockAllDataNodes(REPO);

logger.info("--> taking new snapshot");
@@ -405,18 +391,14 @@ private void testUnsuccessfulSnapshotRetention(boolean partialSuccess) throws Ex
assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
});
}
awaitNoMoreRunningOperations(internalCluster().getMasterName());
}

public void testSLMRetentionAfterRestore() throws Exception {
final String indexName = "test";
final String policyName = "test-policy";
int docCount = 20;
for (int i = 0; i < docCount; i++) {
index(indexName, "_doc", i + "", Collections.singletonMap("foo", "bar"));
}

// Create a snapshot repo
initializeRepo(REPO);
indexRandomDocs(indexName, 20);
createRepository(REPO, "mock");

logger.info("--> creating policy {}", policyName);
createSnapshotPolicy(policyName, "snap", NEVER_EXECUTE_CRON_SCHEDULE, REPO, indexName, true, false,
@@ -475,27 +457,9 @@ private SnapshotsStatusResponse getSnapshotStatus(String snapshotName) {

private void createAndPopulateIndex(String indexName) throws InterruptedException {
logger.info("--> creating and populating index [{}]", indexName);
assertAcked(prepareCreate(indexName, 0, Settings.builder()
.put("number_of_shards", 6).put("number_of_replicas", 0)));
assertAcked(prepareCreate(indexName, 0, indexSettingsNoReplicas(6)));
ensureGreen();

final int numdocs = randomIntBetween(50, 100);
IndexRequestBuilder[] builders = new IndexRequestBuilder[numdocs];
for (int i = 0; i < builders.length; i++) {
builders[i] = client().prepareIndex(indexName, SINGLE_MAPPING_NAME, Integer.toString(i)).setSource("field1", "bar " + i);
}
indexRandom(true, builders);
flushAndRefresh();
}

private void initializeRepo(String repoName) {
client().admin().cluster().preparePutRepository(repoName)
.setType("mock")
.setSettings(Settings.builder()
.put("compress", randomBoolean())
.put("location", randomAlphaOfLength(6))
.build())
.get();
indexRandomDocs(indexName, randomIntBetween(50, 100));
}

private void createSnapshotPolicy(String policyName, String snapshotNamePattern, String schedule, String REPO,
Expand Down Expand Up @@ -546,49 +510,4 @@ private String executePolicy(String policyId) {
return "bad";
}
}

public static String blockMasterFromFinalizingSnapshotOnIndexFile(final String repositoryName) {
final String masterName = internalCluster().getMasterName();
((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterName)
.repository(repositoryName)).setBlockOnWriteIndexFile(true);
return masterName;
}

public static String unblockRepo(final String repositoryName) {
final String masterName = internalCluster().getMasterName();
((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterName)
.repository(repositoryName)).unblock();
return masterName;
}

public static void blockAllDataNodes(String repository) {
for(RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
((MockRepository)repositoriesService.repository(repository)).blockOnDataFiles(true);
}
}

public static void unblockAllDataNodes(String repository) {
for(RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
((MockRepository)repositoriesService.repository(repository)).unblock();
}
}

public void waitForBlock(String node, String repository, TimeValue timeout) throws InterruptedException {
long start = System.currentTimeMillis();
RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, node);
MockRepository mockRepository = (MockRepository) repositoriesService.repository(repository);
while (System.currentTimeMillis() - start < timeout.millis()) {
if (mockRepository.blocked()) {
return;
}
Thread.sleep(100);
}
fail("Timeout waiting for node [" + node + "] to be blocked");
}

public boolean checkBlocked(String node, String repository) {
RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, node);
MockRepository mockRepository = (MockRepository) repositoriesService.repository(repository);
return mockRepository.blocked();
}
}
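
To make the consolidation concrete, a minimal sketch of the block/unblock pattern the SLM tests now inherit from `AbstractSnapshotIntegTestCase` instead of declaring locally. The helper names (`createRepository`, `waitForBlockOnAnyDataNode`, `unblockNode`, `unblockAllDataNodes`, `awaitNoMoreRunningOperations`) are taken from the diff above; the test name and the elided steps are hypothetical, and the exact base-class signatures are assumptions.

```java
// Hypothetical SLM test body using only helpers inherited from AbstractSnapshotIntegTestCase.
public void testRetentionWhileSnapshotIsBlocked() throws Exception {
    createRepository(REPO, "mock"); // mock repository whose writes can be blocked

    // ... create and execute an SLM policy so a snapshot starts against REPO ...

    // Wait until at least one data node is blocked writing data files to the mock repository.
    waitForBlockOnAnyDataNode(REPO, TimeValue.timeValueSeconds(30L));

    // ... run retention and assert on the in-progress snapshot while it is stuck ...

    // Release the master and the data nodes, then wait until the cluster state is clear of
    // snapshot and snapshot-deletion operations before the test finishes.
    unblockNode(REPO, internalCluster().getMasterName());
    unblockAllDataNodes(REPO);
    awaitNoMoreRunningOperations(internalCluster().getMasterName());
}
```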
