Better Incrementality for Snapshots of Unchanged Shards (#52182)
Use sequence numbers and the force merge UUID to determine whether a shard has changed, before falling back to comparing files, so that snapshots remain incremental on primary fail-over.
original-brownbear committed Mar 23, 2020
1 parent aa56f91 commit 87c910b
Showing 15 changed files with 397 additions and 101 deletions.
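
The hunks below only thread the new identifier through the snapshot machinery; computing it happens on the data node and is not shown in this excerpt. A minimal sketch of the idea, assuming the identifier is derived from the Lucene commit's sequence-number user data plus the history and force merge UUIDs (the key names and method name below are illustrative assumptions, not code from this commit):

import java.util.Map;

// Sketch only, not code from this commit: fingerprints the shard state from the
// Lucene commit's user data, as the commit message describes, and withholds the
// id (returns null) whenever the commit's contents are ambiguous.
final class ShardStateIdSketch {

    static String buildShardStateId(Map<String, String> commitUserData, long lastSyncedGlobalCheckpoint) {
        final long maxSeqNo = Long.parseLong(commitUserData.get("max_seq_no"));              // assumed key
        final long localCheckpoint = Long.parseLong(commitUserData.get("local_checkpoint")); // assumed key
        // Only trust the commit if it provably contains exactly the acknowledged operations.
        if (maxSeqNo != localCheckpoint || maxSeqNo != lastSyncedGlobalCheckpoint) {
            return null; // the repository then falls back to file-by-file comparison
        }
        return commitUserData.get("history_uuid") + "-"                                      // assumed key
            + commitUserData.getOrDefault("force_merge_uuid", "na") + "-" + maxSeqNo;        // assumed key
    }
}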
server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshots.java
@@ -135,6 +135,7 @@ static final class Fields {

static final class ParseFields {
static final ParseField FILES = new ParseField("files");
static final ParseField SHARD_STATE_ID = new ParseField("shard_state_id");
static final ParseField SNAPSHOTS = new ParseField("snapshots");
}

@@ -207,6 +208,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.value(fileInfo.name());
}
builder.endArray();
if (snapshot.shardStateIdentifier() != null) {
builder.field(ParseFields.SHARD_STATE_ID.getPreferredName(), snapshot.shardStateIdentifier());
}
builder.endObject();
}
builder.endObject();
@@ -219,6 +223,8 @@ public static BlobStoreIndexShardSnapshots fromXContent(XContentParser parser) t
token = parser.nextToken();
}
Map<String, List<String>> snapshotsMap = new HashMap<>();
Map<String, String> historyUUIDs = new HashMap<>();
Map<String, Long> globalCheckpoints = new HashMap<>();
Map<String, FileInfo> files = new HashMap<>();
if (token == XContentParser.Token.START_OBJECT) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
@@ -250,15 +256,16 @@ public static BlobStoreIndexShardSnapshots fromXContent(XContentParser parser) t
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
if (parser.nextToken() == XContentParser.Token.START_ARRAY) {
if (ParseFields.FILES.match(currentFieldName, parser.getDeprecationHandler()) == false) {
throw new ElasticsearchParseException("unknown array [{}]", currentFieldName);
}
if (ParseFields.FILES.match(currentFieldName, parser.getDeprecationHandler()) &&
parser.nextToken() == XContentParser.Token.START_ARRAY) {
List<String> fileNames = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
fileNames.add(parser.text());
}
snapshotsMap.put(snapshot, fileNames);
} else if (ParseFields.SHARD_STATE_ID.match(currentFieldName, parser.getDeprecationHandler())) {
parser.nextToken();
historyUUIDs.put(snapshot, parser.text());
}
}
}
@@ -277,7 +284,8 @@ public static BlobStoreIndexShardSnapshots fromXContent(XContentParser parser) t
assert fileInfo != null;
fileInfosBuilder.add(fileInfo);
}
snapshots.add(new SnapshotFiles(entry.getKey(), Collections.unmodifiableList(fileInfosBuilder)));
snapshots.add(new SnapshotFiles(entry.getKey(), Collections.unmodifiableList(fileInfosBuilder),
historyUUIDs.get(entry.getKey())));
}
return new BlobStoreIndexShardSnapshots(files, Collections.unmodifiableList(snapshots));
}
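
With the change to toXContent above, each per-snapshot entry in the shard-level metadata blob can now carry the identifier. As a hand-written illustration of the resulting layout (not an actual blob; file entries abbreviated, identifier format matching the sketch near the top):

{
  "files": [ ...shared FileInfo entries for all snapshots of the shard... ],
  "snapshots": {
    "snap-1": {
      "files": ["__abc123", "__def456"],
      "shard_state_id": "<history_uuid>-<force_merge_uuid>-<max_seq_no>"
    }
  }
}

The field is read back by fromXContent into the historyUUIDs map, so older metadata written without it simply yields a null identifier.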
server/src/main/java/org/elasticsearch/index/snapshots/blobstore/SnapshotFiles.java
@@ -18,6 +18,7 @@
*/
package org.elasticsearch.index.snapshots.blobstore;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;

import java.util.HashMap;
@@ -33,6 +34,9 @@ public class SnapshotFiles {

private final List<FileInfo> indexFiles;

@Nullable
private final String shardStateIdentifier;

private Map<String, FileInfo> physicalFiles = null;

/**
@@ -45,12 +49,23 @@ public String snapshot() {
}

/**
* @param snapshot snapshot name
* @param indexFiles index files
* @param snapshot snapshot name
* @param indexFiles index files
* @param shardStateIdentifier unique identifier for the state of the shard that this snapshot was taken from
*/
public SnapshotFiles(String snapshot, List<FileInfo> indexFiles ) {
public SnapshotFiles(String snapshot, List<FileInfo> indexFiles, @Nullable String shardStateIdentifier) {
this.snapshot = snapshot;
this.indexFiles = indexFiles;
this.shardStateIdentifier = shardStateIdentifier;
}

/**
* Returns an identifier for the shard state that can be used to check whether a shard has changed between
* snapshots or not.
*/
@Nullable
public String shardStateIdentifier() {
return shardStateIdentifier;
}

/**
server/src/main/java/org/elasticsearch/repositories/FilterRepository.java
@@ -121,10 +121,10 @@ public boolean isReadOnly() {

@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion,
Map<String, Object> userMetadata, ActionListener<String> listener) {
in.snapshotShard(
store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, repositoryMetaVersion, userMetadata, listener);
IndexCommit snapshotIndexCommit, String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus,
Version repositoryMetaVersion, Map<String, Object> userMetadata, ActionListener<String> listener) {
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, shardStateIdentifier, snapshotStatus,
repositoryMetaVersion, userMetadata, listener);
}
@Override
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState,
server/src/main/java/org/elasticsearch/repositories/Repository.java
@@ -27,6 +27,7 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.ShardId;
@@ -199,14 +200,17 @@ void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations,
* @param snapshotId snapshot id
* @param indexId id for the index being snapshotted
* @param snapshotIndexCommit commit point
* @param shardStateIdentifier a unique identifier of the state of the shard that is stored with the shard's snapshot and used
* to detect if the shard has changed between snapshots. If {@code null} is passed as the identifier
* snapshotting will be done by inspecting the physical files referenced by {@code snapshotIndexCommit}
* @param snapshotStatus snapshot status
* @param repositoryMetaVersion version of the updated repository metadata to write
* @param userMetadata user metadata of the snapshot found in {@link SnapshotsInProgress.Entry#userMetadata()}
* @param listener listener invoked on completion
*/
void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion, Map<String, Object> userMetadata,
ActionListener<String> listener);
@Nullable String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion,
Map<String, Object> userMetadata, ActionListener<String> listener);

/**
* Restores snapshot of the shard.
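
The call sites for this widened interface live in later files of the commit that are not rendered in this excerpt. A hedged sketch of how a caller might invoke it, assuming a shardStateId computed as sketched near the top (null when unavailable):

repository.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit,
    shardStateId, // may be null: the repository then falls back to inspecting physical files
    snapshotStatus, repositoryMetaVersion, userMetadata,
    ActionListener.wrap(
        generation -> logger.debug("snapshot shard finished, new generation [{}]", generation),
        e -> logger.warn("snapshot shard failed", e)));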
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -125,6 +125,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -1534,8 +1535,8 @@ private void writeAtomic(final String blobName, final BytesReference bytesRef, b

@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion,
Map<String, Object> userMetadata, ActionListener<String> listener) {
IndexCommit snapshotIndexCommit, String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus,
Version repositoryMetaVersion, Map<String, Object> userMetadata, ActionListener<String> listener) {
final ShardId shardId = store.shardId();
final long startTime = threadPool.absoluteTimeInMillis();
try {
@@ -1561,76 +1562,92 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
throw new IndexShardSnapshotFailedException(shardId,
"Duplicate snapshot name [" + snapshotId.getName() + "] detected, aborting");
}

final List<BlobStoreIndexShardSnapshot.FileInfo> indexCommitPointFiles = new ArrayList<>();
final BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot = new LinkedBlockingQueue<>();
store.incRef();
final Collection<String> fileNames;
final Store.MetadataSnapshot metadataFromStore;
try {
// TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should
try {
logger.trace(
"[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit);
metadataFromStore = store.getMetadata(snapshotIndexCommit);
fileNames = snapshotIndexCommit.getFileNames();
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e);
// First inspect all known SegmentInfos instances to see if we already have an equivalent commit in the repository
final List<BlobStoreIndexShardSnapshot.FileInfo> filesFromSegmentInfos = Optional.ofNullable(shardStateIdentifier).map(id -> {
for (SnapshotFiles snapshotFileSet : snapshots.snapshots()) {
if (id.equals(snapshotFileSet.shardStateIdentifier())) {
return snapshotFileSet.indexFiles();
}
}
} finally {
store.decRef();
}
return null;
}).orElse(null);

final List<BlobStoreIndexShardSnapshot.FileInfo> indexCommitPointFiles;
int indexIncrementalFileCount = 0;
int indexTotalNumberOfFiles = 0;
long indexIncrementalSize = 0;
long indexTotalFileCount = 0;
for (String fileName : fileNames) {
if (snapshotStatus.isAborted()) {
logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName);
throw new IndexShardSnapshotFailedException(shardId, "Aborted");
long indexTotalFileSize = 0;
final BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot = new LinkedBlockingQueue<>();
// If we did not find a set of files that is equal to the current commit we determine the files to upload by comparing files
// in the commit with files already in the repository
if (filesFromSegmentInfos == null) {
indexCommitPointFiles = new ArrayList<>();
store.incRef();
final Collection<String> fileNames;
final Store.MetadataSnapshot metadataFromStore;
try {
// TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should
try {
logger.trace(
"[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit);
metadataFromStore = store.getMetadata(snapshotIndexCommit);
fileNames = snapshotIndexCommit.getFileNames();
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e);
}
} finally {
store.decRef();
}
for (String fileName : fileNames) {
if (snapshotStatus.isAborted()) {
logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName);
throw new IndexShardSnapshotFailedException(shardId, "Aborted");
}

logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName);
final StoreFileMetaData md = metadataFromStore.get(fileName);
BlobStoreIndexShardSnapshot.FileInfo existingFileInfo = null;
List<BlobStoreIndexShardSnapshot.FileInfo> filesInfo = snapshots.findPhysicalIndexFiles(fileName);
if (filesInfo != null) {
for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : filesInfo) {
if (fileInfo.isSame(md)) {
// a commit point file with the same name, size and checksum was already copied to repository
// we will reuse it for this snapshot
existingFileInfo = fileInfo;
break;
logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName);
final StoreFileMetaData md = metadataFromStore.get(fileName);
BlobStoreIndexShardSnapshot.FileInfo existingFileInfo = null;
List<BlobStoreIndexShardSnapshot.FileInfo> filesInfo = snapshots.findPhysicalIndexFiles(fileName);
if (filesInfo != null) {
for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : filesInfo) {
if (fileInfo.isSame(md)) {
// a commit point file with the same name, size and checksum was already copied to repository
// we will reuse it for this snapshot
existingFileInfo = fileInfo;
break;
}
}
}
}

// We can skip writing blobs where the metadata hash is equal to the blob's contents because we store the hash/contents
// directly in the shard level metadata in this case
final boolean needsWrite = md.hashEqualsContents() == false;
indexTotalFileCount += md.length();
indexTotalNumberOfFiles++;

if (existingFileInfo == null) {
indexIncrementalFileCount++;
indexIncrementalSize += md.length();
// create a new FileInfo
BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo =
new BlobStoreIndexShardSnapshot.FileInfo(
(needsWrite ? UPLOADED_DATA_BLOB_PREFIX : VIRTUAL_DATA_BLOB_PREFIX) + UUIDs.randomBase64UUID(),
md, chunkSize());
indexCommitPointFiles.add(snapshotFileInfo);
if (needsWrite) {
filesToSnapshot.add(snapshotFileInfo);
// We can skip writing blobs where the metadata hash is equal to the blob's contents because we store the hash/contents
// directly in the shard level metadata in this case
final boolean needsWrite = md.hashEqualsContents() == false;
indexTotalFileSize += md.length();
indexTotalNumberOfFiles++;

if (existingFileInfo == null) {
indexIncrementalFileCount++;
indexIncrementalSize += md.length();
// create a new FileInfo
BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo =
new BlobStoreIndexShardSnapshot.FileInfo(
(needsWrite ? UPLOADED_DATA_BLOB_PREFIX : VIRTUAL_DATA_BLOB_PREFIX) + UUIDs.randomBase64UUID(),
md, chunkSize());
indexCommitPointFiles.add(snapshotFileInfo);
if (needsWrite) {
filesToSnapshot.add(snapshotFileInfo);
}
assert needsWrite || assertFileContentsMatchHash(snapshotFileInfo, store);
} else {
indexCommitPointFiles.add(existingFileInfo);
}
assert needsWrite || assertFileContentsMatchHash(snapshotFileInfo, store);
} else {
indexCommitPointFiles.add(existingFileInfo);
}
} else {
indexCommitPointFiles = filesFromSegmentInfos;
}

snapshotStatus.moveToStarted(startTime, indexIncrementalFileCount,
indexTotalNumberOfFiles, indexIncrementalSize, indexTotalFileCount);
indexTotalNumberOfFiles, indexIncrementalSize, indexTotalFileSize);

final StepListener<Collection<Void>> allFilesUploadedListener = new StepListener<>();
allFilesUploadedListener.whenComplete(v -> {
Expand All @@ -1655,7 +1672,7 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
}
// build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones
List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()));
newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles(), shardStateIdentifier));
for (SnapshotFiles point : snapshots) {
newSnapshotsList.add(point);
}
@@ -1742,7 +1759,7 @@ public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, Sh
final BlobContainer container = shardContainer(indexId, snapshotShardId);
executor.execute(ActionRunnable.wrap(restoreListener, l -> {
final BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId);
final SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
final SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles(), null);
new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState) {
@Override
protected void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store,
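
Stripped of the surrounding bookkeeping, the new fast path at the top of snapshotShard amounts to the following (a simplified restatement of the Optional-based code above, not a verbatim extract):

// Reuse the file list of any earlier snapshot that recorded the same shard state id.
// On a hit, nothing is uploaded and the (possibly failed-over) primary's store is not read.
List<BlobStoreIndexShardSnapshot.FileInfo> filesFromSegmentInfos = null;
if (shardStateIdentifier != null) {
    for (SnapshotFiles snapshotFileSet : snapshots.snapshots()) {
        if (shardStateIdentifier.equals(snapshotFileSet.shardStateIdentifier())) {
            filesFromSegmentInfos = snapshotFileSet.indexFiles();
            break;
        }
    }
}
// filesFromSegmentInfos == null => fall through to the incremental file comparison.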