Skip to content

Commit

Permalink
DONE should mean fully processed in snapshot status (#86414)
Browse files Browse the repository at this point in the history
Today in an active snapshot the snapshot status API sometimes shows
shards in state `DONE` but with `processed` stats that do not match the
`incremental` stats that show the work to be done. The discrepancy is
due to files that are stored in the shard snapshot metadata rather than
in their own blobs. This commit fixes the discrepancy.

It also addresses a test bug in `testConcurrentCreateAndStatusAPICalls`
which wasn't actually running the status API concurrently with the
snapshot creations.
  • Loading branch information
DaveCTurner committed May 4, 2022
1 parent 9e54198 commit b932809
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 7 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/86414.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 86414
summary: DONE should mean fully processed in snapshot status
area: Snapshot/Restore
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
Expand All @@ -39,6 +42,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.allOf;
Expand All @@ -50,6 +54,7 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.oneOf;

public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {

Expand Down Expand Up @@ -630,33 +635,77 @@ public void testGetSnapshotsRequest() throws Exception {
}

public void testConcurrentCreateAndStatusAPICalls() throws Exception {
for (int i = 0; i < randomIntBetween(1, 10); i++) {
createIndexWithContent("test-idx-" + i);
}
final var indexNames = IntStream.range(0, between(1, 10)).mapToObj(i -> "test-idx-" + i).toList();
indexNames.forEach(this::createIndexWithContent);
final String repoName = "test-repo";
createRepository(repoName, "fs");

if (randomBoolean()) {
// sometimes cause some deduplication
createSnapshot(repoName, "initial_snapshot", List.of());
for (final var indexName : indexNames) {
if (randomBoolean()) {
indexDoc(indexName, "another_id", "baz", "quux");
}
}
}

final int snapshots = randomIntBetween(10, 20);
final List<ActionFuture<SnapshotsStatusResponse>> statuses = new ArrayList<>(snapshots);
final List<ActionFuture<GetSnapshotsResponse>> gets = new ArrayList<>(snapshots);
final Client dataNodeClient = dataNodeClient();
final String[] snapshotNames = createNSnapshots(repoName, snapshots).toArray(Strings.EMPTY_ARRAY);

for (int i = 0; i < snapshots; i++) {
final var snapshotNames = IntStream.range(0, snapshots).mapToObj(i -> "test-snap-" + i).toArray(String[]::new);
final var waitForCompletion = randomBoolean();
final var createsListener = new PlainActionFuture<Void>();
final var createsGroupedListener = new GroupedActionListener<CreateSnapshotResponse>(
createsListener.map(ignored -> null),
snapshotNames.length
);
for (final var snapshotName : snapshotNames) {
clusterAdmin().prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(waitForCompletion)
.execute(createsGroupedListener);
}
createsListener.get(60, TimeUnit.SECONDS);

// run enough parallel status requests to max out the SNAPSHOT_META threadpool
final var metaThreadPoolSize = internalCluster().getCurrentMasterNodeInstance(ThreadPool.class)
.info(ThreadPool.Names.SNAPSHOT_META)
.getMax();
for (int i = 0; i < metaThreadPoolSize * 2; i++) {
statuses.add(dataNodeClient.admin().cluster().prepareSnapshotStatus(repoName).setSnapshots(snapshotNames).execute());
gets.add(dataNodeClient.admin().cluster().prepareGetSnapshots(repoName).setSnapshots(snapshotNames).execute());
}

// ... and then some more status requests until all snapshots are done
var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
assertBusy(() -> {
final var stillRunning = masterClusterService.state()
.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
.isEmpty() == false;
statuses.add(dataNodeClient.admin().cluster().prepareSnapshotStatus(repoName).setSnapshots(snapshotNames).execute());
gets.add(dataNodeClient.admin().cluster().prepareGetSnapshots(repoName).setSnapshots(snapshotNames).execute());
assertFalse(stillRunning);
}, 60, TimeUnit.SECONDS);

for (ActionFuture<SnapshotsStatusResponse> status : statuses) {
assertThat(status.get().getSnapshots(), hasSize(snapshots));
for (SnapshotStatus snapshot : status.get().getSnapshots()) {
assertThat(snapshot.getState(), allOf(not(SnapshotsInProgress.State.FAILED), not(SnapshotsInProgress.State.ABORTED)));
for (final var shard : snapshot.getShards()) {
if (shard.getStage() == SnapshotIndexShardStage.DONE) {
assertEquals(shard.getStats().getIncrementalFileCount(), shard.getStats().getProcessedFileCount());
assertEquals(shard.getStats().getIncrementalSize(), shard.getStats().getProcessedSize());
}
}
}
}
for (ActionFuture<GetSnapshotsResponse> get : gets) {
final List<SnapshotInfo> snapshotInfos = get.get().getSnapshots();
assertThat(snapshotInfos, hasSize(snapshots));
for (SnapshotInfo snapshotInfo : snapshotInfos) {
assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS));
assertThat(snapshotInfo.state(), oneOf(SnapshotState.IN_PROGRESS, SnapshotState.SUCCESS));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ public synchronized void addProcessedFile(long size) {
processedSize += size;
}

public synchronized void addProcessedFiles(int count, long totalSize) {
processedFileCount += count;
processedSize += totalSize;
}

/**
* Returns a copy of the current {@link IndexShardSnapshotStatus}. This method is
* intended to be used when a coherent state of {@link IndexShardSnapshotStatus} is needed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2679,6 +2679,8 @@ public void snapshotShard(SnapshotShardContext context) {
long indexIncrementalSize = 0;
long indexTotalFileSize = 0;
final BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot = new LinkedBlockingQueue<>();
int filesInShardMetadataCount = 0;
long filesInShardMetadataSize = 0;

if (store.indexSettings().getIndexMetadata().isSearchableSnapshot()) {
indexCommitPointFiles = Collections.emptyList();
Expand Down Expand Up @@ -2737,8 +2739,11 @@ public void snapshotShard(SnapshotShardContext context) {
indexCommitPointFiles.add(snapshotFileInfo);
if (needsWrite) {
filesToSnapshot.add(snapshotFileInfo);
} else {
assert assertFileContentsMatchHash(snapshotStatus, snapshotFileInfo, store);
filesInShardMetadataCount += 1;
filesInShardMetadataSize += md.length();
}
assert needsWrite || assertFileContentsMatchHash(snapshotStatus, snapshotFileInfo, store);
} else {
indexCommitPointFiles.add(existingFileInfo);
}
Expand Down Expand Up @@ -2787,6 +2792,7 @@ public void snapshotShard(SnapshotShardContext context) {
compress,
serializationParams
);
snapshotStatus.addProcessedFiles(filesInShardMetadataCount, filesInShardMetadataSize);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(
shardId,
Expand Down Expand Up @@ -2815,6 +2821,9 @@ public void snapshotShard(SnapshotShardContext context) {
+ indexGeneration
+ "] when deleting index-N blobs "
+ blobsToDelete;
final var finalFilesInShardMetadataCount = filesInShardMetadataCount;
final var finalFilesInShardMetadataSize = filesInShardMetadataSize;

afterWriteSnapBlob = () -> {
try {
final Map<String, String> serializationParams = Collections.singletonMap(
Expand All @@ -2833,6 +2842,7 @@ public void snapshotShard(SnapshotShardContext context) {
e
);
}
snapshotStatus.addProcessedFiles(finalFilesInShardMetadataCount, finalFilesInShardMetadataSize);
try {
deleteFromContainer(shardContainer, blobsToDelete.iterator());
} catch (IOException e) {
Expand Down

0 comments on commit b932809

Please sign in to comment.