CR: handle index recreate and stronger tests
original-brownbear committed Dec 16, 2019
1 parent 8a80b78 commit d219dfa
Showing 3 changed files with 45 additions and 12 deletions.
@@ -54,6 +54,13 @@ private ShardGenerations(Map<IndexId, List<String>> shardGenerations) {
this.shardGenerations = shardGenerations;
}

/**
* Returns the total number of shards tracked by this instance.
*/
public int totalShards() {
return shardGenerations.values().stream().mapToInt(List::size).sum();
}

/**
* Returns all indices for which shard generations are tracked.
*
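As an aside, here is a minimal stand-alone sketch (using a hypothetical SimpleShardGenerations class, not the real Elasticsearch type) of the aggregation the new totalShards() method performs: it sums the sizes of the per-index shard generation lists.

import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for ShardGenerations: one generation entry per shard of each index.
final class SimpleShardGenerations {
    private final Map<String, List<String>> shardGenerations;

    SimpleShardGenerations(Map<String, List<String>> shardGenerations) {
        this.shardGenerations = shardGenerations;
    }

    // Same aggregation as the new totalShards() above: the sum of the per-index list sizes.
    int totalShards() {
        return shardGenerations.values().stream().mapToInt(List::size).sum();
    }

    public static void main(String[] args) {
        SimpleShardGenerations generations = new SimpleShardGenerations(Map.of(
            "index-1", Arrays.asList("gen-a", "gen-b"),   // two shards
            "index-2", Arrays.asList("gen-c")));          // one shard
        System.out.println(generations.totalShards());    // prints 3
    }
}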
@@ -597,14 +597,13 @@ private void cleanupAfterError(Exception exception) {
private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot, MetaData metaData) {
ShardGenerations.Builder builder = ShardGenerations.builder();
final Map<String, IndexId> indexLookup = new HashMap<>();
snapshot.indices().forEach(idx -> {
if (metaData.index(idx.getName()) != null) {
indexLookup.put(idx.getName(), idx);
} else {
assert snapshot.partial() : "Index [" + idx + "] was deleted during a snapshot but snapshot was not partial.";
}
});
snapshot.indices().forEach(idx -> indexLookup.put(idx.getName(), idx));
snapshot.shards().forEach(c -> {
if (metaData.index(c.key.getIndex()) == null) {
assert snapshot.partial() :
"Index [" + c.key.getIndex() + "] was deleted during a snapshot but snapshot was not partial.";
return;
}
final IndexId indexId = indexLookup.get(c.key.getIndexName());
if (indexId != null) {
builder.put(indexId, c.key.id(), c.value.generation());
@@ -1044,12 +1043,13 @@ protected void doRun() {
shardFailures.add(new SnapshotShardFailure(status.nodeId(), shardId, status.reason()));
}
}
final ShardGenerations shardGenerations = buildGenerations(entry, metaData);
repository.finalizeSnapshot(
snapshot.getSnapshotId(),
buildGenerations(entry, metaData),
shardGenerations,
entry.startTime(),
failure,
entry.shards().size(),
entry.partial() ? shardGenerations.totalShards() : entry.shards().size(),
unmodifiableList(shardFailures),
entry.repositoryStateId(),
entry.includeGlobalState(),
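To summarize the SnapshotsService change above: shard entries whose index has been deleted are now filtered out per shard (with an assertion that the snapshot is partial), and for partial snapshots the shard count handed to finalizeSnapshot comes from the generations that were actually tracked rather than from the original snapshot entry. A simplified, self-contained sketch of that filtering and counting, using hypothetical types rather than the real Elasticsearch classes:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model: shard keys are "indexName/shardId" strings and "liveIndices"
// stands in for the indices still present in the cluster metadata.
final class PartialSnapshotAccounting {

    // Keep only generations of shards whose index still exists; a missing index is
    // tolerated (and skipped) only for partial snapshots.
    static Map<String, List<Long>> buildGenerations(Map<String, Long> shardGenerationByShardKey,
                                                    Set<String> liveIndices,
                                                    boolean partial) {
        Map<String, List<Long>> generations = new HashMap<>();
        shardGenerationByShardKey.forEach((shardKey, generation) -> {
            String indexName = shardKey.substring(0, shardKey.indexOf('/'));
            if (liveIndices.contains(indexName) == false) {
                assert partial : "index [" + indexName + "] was deleted but the snapshot was not partial";
                return; // drop shards of the deleted index from the partial snapshot
            }
            generations.computeIfAbsent(indexName, k -> new ArrayList<>()).add(generation);
        });
        return generations;
    }

    // For partial snapshots the reported total is the number of shards actually tracked;
    // otherwise it is the number of shards the snapshot entry started out with.
    static int reportedTotalShards(Map<String, List<Long>> generations, int shardsInEntry, boolean partial) {
        int tracked = generations.values().stream().mapToInt(List::size).sum();
        return partial ? tracked : shardsInEntry;
    }

    public static void main(String[] args) {
        Map<String, Long> shards = Map.of("kept-index/0", 3L, "kept-index/1", 4L, "deleted-index/0", 7L);
        Set<String> live = Set.of("kept-index");
        Map<String, List<Long>> generations = buildGenerations(shards, live, true);
        // Two shards remain tracked for the partial snapshot, although the entry started with three.
        System.out.println(reportedTotalShards(generations, shards.size(), true)); // prints 2
    }
}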
@@ -21,6 +21,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
@@ -143,6 +144,7 @@
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.gateway.MetaStateService;
import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
@@ -504,7 +506,7 @@ public void testConcurrentSnapshotCreateAndDeleteOther() {
}
}

public void testConcurrentSnapshotDeleteAndDeleteIndex() {
public void testConcurrentSnapshotDeleteAndDeleteIndex() throws IOException {
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));

String repoName = "repo";
@@ -517,7 +519,9 @@ public void testConcurrentSnapshotDeleteAndDeleteIndex() {
final StepListener<Collection<CreateIndexResponse>> createIndicesListener = new StepListener<>();
final int indices = randomIntBetween(5, 20);

final SetOnce<Index> firstIndex = new SetOnce<>();
continueOrDie(createRepoAndIndex(repoName, index, 1), createIndexResponse -> {
firstIndex.set(masterNode.clusterService.state().metaData().index(index).getIndex());
// create a few more indices to make it more likely that the subsequent index delete operation happens before snapshot
// finalization
final GroupedActionListener<CreateIndexResponse> listener = new GroupedActionListener<>(createIndicesListener, indices);
@@ -535,21 +539,43 @@ public void testConcurrentSnapshotDeleteAndDeleteIndex() {
.setPartial(partialSnapshot).execute(createSnapshotResponseStepListener));

continueOrDie(createSnapshotResponseStepListener,
createSnapshotResponse -> client().admin().indices().delete(new DeleteIndexRequest(index), noopListener()));
createSnapshotResponse -> client().admin().indices().delete(new DeleteIndexRequest(index), new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
if (partialSnapshot) {
// Recreate index by the same name to test that we don't snapshot conflicting metadata in this scenario
client().admin().indices().create(new CreateIndexRequest(index), noopListener());
}
}

@Override
public void onFailure(Exception e) {
if (partialSnapshot) {
throw new AssertionError("Delete index should always work during partial snapshots", e);
}
}
}));

deterministicTaskQueue.runAllRunnableTasks();

SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
final Repository repository = masterNode.repositoriesService.repository(repoName);
Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
final RepositoryData repositoryData = getRepositoryData(repository);
Collection<SnapshotId> snapshotIds = repositoryData.getSnapshotIds();
assertThat(snapshotIds, hasSize(1));

final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next());
assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
if (partialSnapshot) {
// Single shard for each index so we either get all indices or all except for the deleted index
assertThat(snapshotInfo.successfulShards(), either(is(indices + 1)).or(is(indices)));
if (snapshotInfo.successfulShards() == indices + 1) {
final IndexMetaData indexMetaData =
repository.getSnapshotIndexMetaData(snapshotInfo.snapshotId(), repositoryData.resolveIndexId(index));
// Make sure we snapshotted the metadata of this index and not the recreated version
assertEquals(indexMetaData.getIndex(), firstIndex.get());
}
} else {
// Index delete must be blocked for non-partial snapshots and we get a snapshot for every index
assertEquals(snapshotInfo.successfulShards(), indices + 1);
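The strengthened test above follows a delete-then-recreate pattern: the original Index (name plus UUID) is captured in a SetOnce before the snapshot starts, the index is deleted while the snapshot is running, recreated under the same name for partial snapshots, and the final assertion checks that the metadata stored in the repository belongs to the original index rather than the recreated one. Below is a minimal sketch of why that identity check works, with a hypothetical IndexIdentity class standing in for org.elasticsearch.index.Index, which pairs an index name with its UUID.

import java.util.Objects;
import java.util.UUID;

// Hypothetical stand-in for org.elasticsearch.index.Index: equality is based on both
// the name and the UUID, so an index recreated under the same name is not equal to the
// original one.
final class IndexIdentity {
    final String name;
    final String uuid;

    IndexIdentity(String name, String uuid) {
        this.name = name;
        this.uuid = uuid;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o instanceof IndexIdentity == false) {
            return false;
        }
        IndexIdentity other = (IndexIdentity) o;
        return name.equals(other.name) && uuid.equals(other.uuid);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, uuid);
    }

    public static void main(String[] args) {
        IndexIdentity original = new IndexIdentity("test", UUID.randomUUID().toString());
        IndexIdentity recreated = new IndexIdentity("test", UUID.randomUUID().toString());
        // Same name, different UUID: the recreated index is a different index, which is what
        // the test's assertEquals(indexMetaData.getIndex(), firstIndex.get()) relies on.
        System.out.println(original.equals(recreated)); // prints false
    }
}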
