Fix Index Deletion During Partial Snapshot Create #50234

Merged
ShardGenerations.java
@@ -54,6 +54,13 @@ private ShardGenerations(Map<IndexId, List<String>> shardGenerations) {
         this.shardGenerations = shardGenerations;
     }
 
+    /**
+     * Returns the total number of shards tracked by this instance.
+     */
+    public int totalShards() {
+        return shardGenerations.values().stream().mapToInt(List::size).sum();
+    }
+
     /**
      * Returns all indices for which shard generations are tracked.
      *
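The new totalShards() accessor simply sums the sizes of the per-index generation lists (one generation per shard). A minimal, self-contained sketch of the same stream pipeline, with plain JDK types standing in for IndexId and the surrounding ShardGenerations class (all names here are illustrative):

    import java.util.List;
    import java.util.Map;

    public class TotalShardsSketch {
        public static void main(String[] args) {
            // Hypothetical stand-in for the shardGenerations field: index -> one generation per shard.
            Map<String, List<String>> shardGenerations = Map.of(
                "index-a", List.of("gen-0", "gen-1"), // 2 shards
                "index-b", List.of("gen-0"));         // 1 shard

            // Same computation as the new totalShards() method above.
            int totalShards = shardGenerations.values().stream().mapToInt(List::size).sum();
            System.out.println(totalShards); // prints 3
        }
    }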
SnapshotsService.java
@@ -568,16 +568,17 @@ public void onNoLongerMaster() {
     private void cleanupAfterError(Exception exception) {
         threadPool.generic().execute(() -> {
             if (snapshotCreated) {
+                final MetaData metaData = clusterService.state().metaData();
                 repositoriesService.repository(snapshot.snapshot().getRepository())
                     .finalizeSnapshot(snapshot.snapshot().getSnapshotId(),
-                        buildGenerations(snapshot),
+                        buildGenerations(snapshot, metaData),
                         snapshot.startTime(),
                         ExceptionsHelper.stackTrace(exception),
                         0,
                         Collections.emptyList(),
                         snapshot.repositoryStateId(),
                         snapshot.includeGlobalState(),
-                        metaDataForSnapshot(snapshot, clusterService.state().metaData()),
+                        metaDataForSnapshot(snapshot, metaData),
                         snapshot.userMetadata(),
                         snapshot.useShardGenerations(),
                         ActionListener.runAfter(ActionListener.wrap(ignored -> {
@@ -593,11 +594,21 @@ private void cleanupAfterError(Exception exception) {
             }
         }
 
-    private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot) {
+    private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot, MetaData metaData) {
         ShardGenerations.Builder builder = ShardGenerations.builder();
         final Map<String, IndexId> indexLookup = new HashMap<>();
         snapshot.indices().forEach(idx -> indexLookup.put(idx.getName(), idx));
-        snapshot.shards().forEach(c -> builder.put(indexLookup.get(c.key.getIndexName()), c.key.id(), c.value.generation()));
+        snapshot.shards().forEach(c -> {
+            if (metaData.index(c.key.getIndex()) == null) {
+                assert snapshot.partial() :
+                    "Index [" + c.key.getIndex() + "] was deleted during a snapshot but snapshot was not partial.";
+                return;
+            }
+            final IndexId indexId = indexLookup.get(c.key.getIndexName());
+            if (indexId != null) {
+                builder.put(indexId, c.key.id(), c.value.generation());
+            }
+        });
         return builder.build();
     }
 
@@ -1032,12 +1043,13 @@ protected void doRun() {
                         shardFailures.add(new SnapshotShardFailure(status.nodeId(), shardId, status.reason()));
                     }
                 }
+                final ShardGenerations shardGenerations = buildGenerations(entry, metaData);
                 repository.finalizeSnapshot(
                     snapshot.getSnapshotId(),
-                    buildGenerations(entry),
+                    shardGenerations,
                     entry.startTime(),
                     failure,
-                    entry.shards().size(),
+                    entry.partial() ? shardGenerations.totalShards() : entry.shards().size(),
                     unmodifiableList(shardFailures),
                     entry.repositoryStateId(),
                     entry.includeGlobalState(),
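Accordingly, for a partial snapshot the total shard count handed to finalizeSnapshot() now comes from the filtered ShardGenerations rather than from the in-progress entry, so shards of a concurrently deleted index are no longer counted. A tiny sketch of the arithmetic (numbers are illustrative):

    public class TotalShardCountSketch {
        public static void main(String[] args) {
            int shardsInEntry = 6;    // entry.shards().size(): all shards the snapshot started with
            int survivingShards = 5;  // shardGenerations.totalShards(): a one-shard index was deleted
            boolean partial = true;

            // Mirrors: entry.partial() ? shardGenerations.totalShards() : entry.shards().size()
            int totalShards = partial ? survivingShards : shardsInEntry;
            System.out.println(totalShards); // prints 5
        }
    }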
SnapshotResiliencyTests.java
@@ -21,6 +21,7 @@
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
@@ -143,6 +144,7 @@
 import org.elasticsearch.env.TestEnvironment;
 import org.elasticsearch.gateway.MetaStateService;
 import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards;
+import org.elasticsearch.index.Index;
 import org.elasticsearch.index.analysis.AnalysisRegistry;
 import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
 import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
@@ -211,6 +213,7 @@
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.mockito.Mockito.mock;
 
@@ -503,7 +506,7 @@ public void testConcurrentSnapshotCreateAndDeleteOther() {
         }
     }
 
-    public void testConcurrentSnapshotDeleteAndDeleteIndex() {
+    public void testConcurrentSnapshotDeleteAndDeleteIndex() throws IOException {
         setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));
 
         String repoName = "repo";
@@ -514,11 +517,13 @@ public void testConcurrentSnapshotDeleteAndDeleteIndex() {
         testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
 
         final StepListener<Collection<CreateIndexResponse>> createIndicesListener = new StepListener<>();
+        final int indices = randomIntBetween(5, 20);
 
+        final SetOnce<Index> firstIndex = new SetOnce<>();
         continueOrDie(createRepoAndIndex(repoName, index, 1), createIndexResponse -> {
+            firstIndex.set(masterNode.clusterService.state().metaData().index(index).getIndex());
             // create a few more indices to make it more likely that the subsequent index delete operation happens before snapshot
             // finalization
-            final int indices = randomIntBetween(5, 20);
             final GroupedActionListener<CreateIndexResponse> listener = new GroupedActionListener<>(createIndicesListener, indices);
             for (int i = 0; i < indices; ++i) {
                 client().admin().indices().create(new CreateIndexRequest("index-" + i), listener);
@@ -527,23 +532,54 @@ public void testConcurrentSnapshotDeleteAndDeleteIndex() {
 
         final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
 
+        final boolean partialSnapshot = randomBoolean();
+
         continueOrDie(createIndicesListener, createIndexResponses ->
             client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(false)
-                .execute(createSnapshotResponseStepListener));
+                .setPartial(partialSnapshot).execute(createSnapshotResponseStepListener));
 
         continueOrDie(createSnapshotResponseStepListener,
-            createSnapshotResponse -> client().admin().indices().delete(new DeleteIndexRequest(index), noopListener()));
+            createSnapshotResponse -> client().admin().indices().delete(new DeleteIndexRequest(index), new ActionListener<>() {
+                @Override
+                public void onResponse(AcknowledgedResponse acknowledgedResponse) {
+                    if (partialSnapshot) {
+                        // Recreate index by the same name to test that we don't snapshot conflicting metadata in this scenario
+                        client().admin().indices().create(new CreateIndexRequest(index), noopListener());
+                    }
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    if (partialSnapshot) {
+                        throw new AssertionError("Delete index should always work during partial snapshots", e);
+                    }
+                }
+            }));
 
         deterministicTaskQueue.runAllRunnableTasks();
 
         SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
         assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
         final Repository repository = masterNode.repositoriesService.repository(repoName);
-        Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
+        final RepositoryData repositoryData = getRepositoryData(repository);
+        Collection<SnapshotId> snapshotIds = repositoryData.getSnapshotIds();
         assertThat(snapshotIds, hasSize(1));
 
         final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next());
         assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
+        if (partialSnapshot) {
+            // Single shard for each index so we either get all indices or all except for the deleted index
+            assertThat(snapshotInfo.successfulShards(), either(is(indices + 1)).or(is(indices)));
+            if (snapshotInfo.successfulShards() == indices + 1) {
+                final IndexMetaData indexMetaData =
+                    repository.getSnapshotIndexMetaData(snapshotInfo.snapshotId(), repositoryData.resolveIndexId(index));
+                // Make sure we snapshotted the metadata of this index and not the recreated version
+                assertEquals(indexMetaData.getIndex(), firstIndex.get());
+            }
+        } else {
+            // Index delete must be blocked for non-partial snapshots and we get a snapshot for every index
+            assertEquals(snapshotInfo.successfulShards(), indices + 1);
+        }
         assertEquals(0, snapshotInfo.failedShards());
     }
