Skip to content

Commit

Permalink
Fix Snapshot + DataStream RollOver Bugs (#68258) (#68277)
Browse files Browse the repository at this point in the history
There were a number of issues around data streams rolling over during
a snapshot that are fixed in this PR:

1. If a partial snapshot does not fully snapshot a data stream because one of
its indices gets concurrently deleted, we must not add that data stream to the
resulting cluster state (otherwise we trip assertions on either snapshot itself
or a later restore, depending on whether or not the complete global state is
snapshotted).
2. When a non-partial snapshot is running we must not allow a data stream rollover;
otherwise we cannot finish the data stream snapshot correctly because the newly
created write index has not become part of the snapshot.
3. If any part of a data stream fails snapshotting or gets concurrently deleted during
a partial snapshot, we must not add it to `SnapshotInfo`, just as we do not add concurrently
deleted indices to it.
  • Loading branch information
original-brownbear committed Feb 1, 2021
1 parent a440fdf commit f933f9c
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
Expand Down Expand Up @@ -211,6 +213,18 @@ private RolloverResult rolloverAlias(ClusterState currentState, IndexAbstraction
private RolloverResult rolloverDataStream(ClusterState currentState, IndexAbstraction.DataStream dataStream, String dataStreamName,
CreateIndexRequest createIndexRequest, List<Condition<?>> metConditions,
boolean silent, boolean onlyValidate) throws Exception {

if (SnapshotsService.snapshottingDataStreams(currentState, Collections.singleton(dataStream.getName())).isEmpty() == false) {
// we can't roll over the data stream while it is being snapshotted: the snapshot contains the indices that existed when it was
// started but the cluster metadata of when it completes, so the new write index would not exist in the snapshot if there was a
// concurrent rollover
throw new SnapshotInProgressException(
"Cannot roll over data stream that is being snapshotted: "
+ dataStream.getName()
+ ". Try again after snapshot finishes or cancel the currently running snapshot."
);
}

lookupTemplateForDataStream(dataStreamName, currentState.metadata());

final Version minNodeVersion = currentState.nodes().getMinNodeVersion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1000,15 +1000,29 @@ private static Metadata metadataForSnapshot(SnapshotsInProgress.Entry snapshot,
} else {
builder = Metadata.builder(metadata);
}
// Only keep those data streams in the metadata that were actually requested by the initial snapshot create operation
Map<String, DataStream> dataStreams = new HashMap<>();
// Only keep those data streams in the metadata that were actually requested by the initial snapshot create operation and that have
// all their indices contained in the snapshot
final Map<String, DataStream> dataStreams = new HashMap<>();
final Set<String> indicesInSnapshot = snapshot.indices().stream().map(IndexId::getName).collect(Collectors.toSet());
for (String dataStreamName : snapshot.dataStreams()) {
DataStream dataStream = metadata.dataStreams().get(dataStreamName);
if (dataStream == null) {
assert snapshot.partial() : "Data stream [" + dataStreamName +
"] was deleted during a snapshot but snapshot was not partial.";
} else {
dataStreams.put(dataStreamName, dataStream);
boolean missingIndex = false;
for (Index index : dataStream.getIndices()) {
final String indexName = index.getName();
if (builder.get(indexName) == null || indicesInSnapshot.contains(indexName) == false) {
assert snapshot.partial() : "Data stream [" + dataStreamName +
"] is missing index [" + index + "] but snapshot was not partial.";
missingIndex = true;
break;
}
}
if (missingIndex == false) {
dataStreams.put(dataStreamName, dataStream);
}
}
}
return builder.dataStreams(dataStreams).build();
Expand Down Expand Up @@ -1527,12 +1541,6 @@ private void finalizeSnapshotEntry(SnapshotsInProgress.Entry entry, Metadata met
}
final ShardGenerations shardGenerations = buildGenerations(entry, metadata);
final String repository = snapshot.getRepository();
final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshot.getSnapshotId(),
shardGenerations.indices().stream().map(IndexId::getName).collect(Collectors.toList()),
entry.dataStreams(),
entry.startTime(), failure, threadPool.absoluteTimeInMillis(),
entry.partial() ? shardGenerations.totalShards() : entry.shards().size(), shardFailures,
entry.includeGlobalState(), entry.userMetadata());
final StepListener<Metadata> metadataListener = new StepListener<>();
final Repository repo = repositoriesService.repository(snapshot.getRepository());
if (entry.isClone()) {
Expand All @@ -1559,18 +1567,30 @@ private void finalizeSnapshotEntry(SnapshotsInProgress.Entry entry, Metadata met
} else {
metadataListener.onResponse(metadata);
}
metadataListener.whenComplete(meta -> repo.finalizeSnapshot(
shardGenerations,
repositoryData.getGenId(),
metadataForSnapshot(entry, meta),
snapshotInfo,
entry.version(),
state -> stateWithoutSnapshot(state, snapshot),
ActionListener.wrap(newRepoData -> {
completeListenersIgnoringException(endAndGetListenersToResolve(snapshot), Tuple.tuple(newRepoData, snapshotInfo));
logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
runNextQueuedOperation(newRepoData, repository, true);
}, e -> handleFinalizationFailure(e, entry, repositoryData))),
metadataListener.whenComplete(meta -> {
final Metadata metaForSnapshot = metadataForSnapshot(entry, meta);
final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshot.getSnapshotId(),
shardGenerations.indices().stream().map(IndexId::getName).collect(Collectors.toList()),
entry.partial() ? entry.dataStreams().stream()
.filter(metaForSnapshot.dataStreams()::containsKey)
.collect(Collectors.toList()) : entry.dataStreams(),
entry.startTime(), failure, threadPool.absoluteTimeInMillis(),
entry.partial() ? shardGenerations.totalShards() : entry.shards().size(), shardFailures,
entry.includeGlobalState(), entry.userMetadata());
repo.finalizeSnapshot(
shardGenerations,
repositoryData.getGenId(),
metaForSnapshot,
snapshotInfo,
entry.version(),
state -> stateWithoutSnapshot(state, snapshot),
ActionListener.wrap(newRepoData -> {
completeListenersIgnoringException(
endAndGetListenersToResolve(snapshot), Tuple.tuple(newRepoData, snapshotInfo));
logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
runNextQueuedOperation(newRepoData, repository, true);
}, e -> handleFinalizationFailure(e, entry, repositoryData)));
},
e -> handleFinalizationFailure(e, entry, repositoryData));
} catch (Exception e) {
assert false : new AssertionError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,16 @@
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotRestoreException;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.xpack.core.action.CreateDataStreamAction;
import org.elasticsearch.xpack.core.action.DeleteDataStreamAction;
import org.elasticsearch.xpack.core.action.GetDataStreamAction;
Expand All @@ -26,10 +33,6 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotRestoreException;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.datastreams.DataStreamsPlugin;
Expand Down Expand Up @@ -494,6 +497,52 @@ public void testCloneSnapshotThatIncludesDataStream() throws Exception {
.setIndices(indexWithoutDataStream)
.get()
);
}

/**
 * Checks how a rollover of the data stream {@code ds} behaves while a snapshot is still in progress.
 * <p>
 * The snapshot is randomly created as partial or non-partial and is held in progress by blocking the
 * data nodes of a mock repository. The test then asserts that:
 * <ul>
 *   <li>with a partial snapshot the rollover succeeds, and {@code ds} is not listed in the resulting
 *       {@link SnapshotInfo};</li>
 *   <li>with a non-partial snapshot the rollover is rejected with a {@link SnapshotInProgressException},
 *       {@code ds} is listed in the resulting {@link SnapshotInfo}, and it can be restored with no failed
 *       shards after the live data stream has been deleted.</li>
 * </ul>
 * NOTE(review): the data stream {@code ds} is not created in this method; presumably the test fixture sets
 * it up - confirm against the class setup.
 */
public void testSnapshotDSDuringRollover() throws Exception {
    // repository consistency check requires at least one snapshot per registered repository
    createFullSnapshot(REPO, "snap-so-repo-checks-pass");
    final String repoName = "mock-repo";
    createRepository(repoName, "mock");
    final boolean partial = randomBoolean();
    // Block the data nodes so the snapshot stays in progress while the rollover is attempted.
    blockAllDataNodes(repoName);
    final String snapshotName = "ds-snap";
    final ActionFuture<CreateSnapshotResponse> snapshotFuture = client().admin()
        .cluster()
        .prepareCreateSnapshot(repoName, snapshotName)
        .setWaitForCompletion(true)
        .setPartial(partial)
        .setIncludeGlobalState(randomBoolean())
        .execute();
    waitForBlockOnAnyDataNode(repoName);
    awaitNumberOfSnapshotsInProgress(1);
    final ActionFuture<RolloverResponse> rolloverResponse = client().admin().indices().rolloverIndex(new RolloverRequest("ds", null));

    if (partial) {
        // A partial snapshot does not prevent the rollover.
        assertTrue(rolloverResponse.get().isRolledOver());
    } else {
        // A non-partial snapshot must reject the concurrent rollover.
        SnapshotInProgressException e = expectThrows(SnapshotInProgressException.class, rolloverResponse::actionGet);
        assertThat(e.getMessage(), containsString("Cannot roll over data stream that is being snapshotted:"));
    }
    unblockAllDataNodes(repoName);
    final SnapshotInfo snapshotInfo = assertSuccessful(snapshotFuture);

    if (snapshotInfo.dataStreams().contains("ds")) {
        // Delete the live data stream before restoring it from the snapshot.
        assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { "ds" })).get());

        final RestoreInfo restoreInfo = client().admin()
            .cluster()
            .prepareRestoreSnapshot(repoName, snapshotName)
            .setWaitForCompletion(true)
            .setIndices("ds")
            .get()
            .getRestoreInfo();

        // assertEquals takes (expected, actual); this order keeps failure messages accurate.
        assertEquals(restoreInfo.totalShards(), restoreInfo.successfulShards());
        assertEquals(0, restoreInfo.failedShards());
        // The data stream is only expected in the snapshot when the rollover was rejected.
        assertFalse(partial);
    } else {
        // The data stream is only expected to be left out after a rollover during a partial snapshot.
        assertTrue(partial);
    }
}
}

0 comments on commit f933f9c

Please sign in to comment.