Better Incrementality for Snapshots of Unchanged Shards #52182

Merged
Changes from all commits (73 commits)
cbf3905 - bck (original-brownbear, Jan 28, 2020)
68a54a2 - Merge remote-tracking branch 'elastic/master' into fix-incrementality… (original-brownbear, Jan 28, 2020)
11ac711 - bck (original-brownbear, Jan 28, 2020)
841216d - Merge remote-tracking branch 'elastic/master' into fix-incrementality… (original-brownbear, Jan 30, 2020)
a47de87 - bck (original-brownbear, Jan 30, 2020)
4e54a40 - bck (original-brownbear, Jan 30, 2020)
4dcea76 - bck (original-brownbear, Jan 30, 2020)
0cf43bb - Merge remote-tracking branch 'elastic/master' into source-only-snapsh… (original-brownbear, Jan 30, 2020)
12bb77b - Merge remote-tracking branch 'elastic/master' into source-only-snapsh… (original-brownbear, Jan 30, 2020)
48171e1 - Merge remote-tracking branch 'elastic/master' into source-only-snapsh… (original-brownbear, Jan 31, 2020)
dec9ddb - Merge remote-tracking branch 'elastic/master' into source-only-snapsh… (original-brownbear, Feb 1, 2020)
1ab25bc - Stop creating new segments_N needlessly (original-brownbear, Feb 1, 2020)
fead313 - reorg (original-brownbear, Feb 2, 2020)
7cacd74 - Merge remote-tracking branch 'elastic/master' into source-only-snapsh… (original-brownbear, Feb 2, 2020)
cd052b4 - Merge remote-tracking branch 'elastic/master' into source-only-snapsh… (original-brownbear, Feb 4, 2020)
fcd23db - bck (original-brownbear, Feb 4, 2020)
28e3ae0 - bck (original-brownbear, Feb 4, 2020)
c44644d - bck (original-brownbear, Feb 4, 2020)
04ee734 - bck (original-brownbear, Feb 4, 2020)
d3e71c0 - Merge remote-tracking branch 'elastic/master' into source-only-snapsh… (original-brownbear, Feb 5, 2020)
3ced3f0 - better (original-brownbear, Feb 5, 2020)
6a6270b - bck (original-brownbear, Feb 5, 2020)
d4ab9fb - bck (original-brownbear, Feb 5, 2020)
625ab71 - Merge remote-tracking branch 'elastic/master' into source-only-snapsh… (original-brownbear, Feb 5, 2020)
82ec5f0 - Merge remote-tracking branch 'elastic/master' into source-only-snapsh… (original-brownbear, Feb 5, 2020)
d4358f1 - Merge remote-tracking branch 'elastic/master' into source-only-snapsh… (original-brownbear, Feb 5, 2020)
d607c94 - bck (original-brownbear, Feb 6, 2020)
54da54a - Merge remote-tracking branch 'elastic/master' into source-only-snapsh… (original-brownbear, Feb 6, 2020)
fec8d13 - bck (original-brownbear, Feb 6, 2020)
28a8a51 - Merge remote-tracking branch 'elastic/master' into source-only-snapsh… (original-brownbear, Feb 8, 2020)
2a87a71 - bck (original-brownbear, Feb 8, 2020)
e54d34d - sorta (original-brownbear, Feb 8, 2020)
6688903 - sorta (original-brownbear, Feb 10, 2020)
3040e08 - reverts (original-brownbear, Feb 10, 2020)
97ccb91 - shorter (original-brownbear, Feb 10, 2020)
0defec8 - works so far :) (original-brownbear, Feb 10, 2020)
3929452 - nicer (original-brownbear, Feb 10, 2020)
15d186a - makes sense (original-brownbear, Feb 11, 2020)
e110f83 - nicer (original-brownbear, Feb 11, 2020)
a0130e0 - Merge remote-tracking branch 'elastic/master' into better-incremental… (original-brownbear, Feb 11, 2020)
72f8392 - nicer (original-brownbear, Feb 11, 2020)
a57ca43 - nicer (original-brownbear, Feb 11, 2020)
82d5bd4 - doccs (original-brownbear, Feb 11, 2020)
31c46f8 - test and fix delete case (original-brownbear, Feb 11, 2020)
6ffb2de - Merge remote-tracking branch 'elastic/master' into better-incremental… (original-brownbear, Feb 17, 2020)
02733bf - bck (original-brownbear, Feb 17, 2020)
c584d99 - Merge remote-tracking branch 'elastic/master' into better-incremental… (original-brownbear, Feb 18, 2020)
ed0477c - works (original-brownbear, Feb 18, 2020)
5cd66be - Merge remote-tracking branch 'elastic/master' into better-incremental… (original-brownbear, Feb 18, 2020)
2aefef3 - cleanup useless changes (original-brownbear, Feb 18, 2020)
10a14c4 - cleanup useless changes (original-brownbear, Feb 18, 2020)
69eb658 - Merge remote-tracking branch 'elastic/master' into better-incremental… (original-brownbear, Feb 20, 2020)
2b7a19f - determine whether commit is safe in snapshot shards service (original-brownbear, Feb 20, 2020)
8dc590a - Merge remote-tracking branch 'elastic/master' into better-incremental… (original-brownbear, Feb 21, 2020)
91048f4 - bck (original-brownbear, Feb 21, 2020)
d0b19ac - pass down global checkpoint (original-brownbear, Feb 21, 2020)
8284af2 - review comments (original-brownbear, Feb 21, 2020)
db58597 - renaming (original-brownbear, Feb 21, 2020)
bf4ff3a - Merge remote-tracking branch 'elastic/master' into better-incremental… (original-brownbear, Feb 21, 2020)
624b1fb - shorter (original-brownbear, Feb 21, 2020)
5e00e06 - Merge remote-tracking branch 'elastic/master' into better-incremental… (original-brownbear, Feb 21, 2020)
d90216a - CR comments (original-brownbear, Feb 21, 2020)
1ef5328 - Merge remote-tracking branch 'elastic/master' into better-incremental… (original-brownbear, Mar 11, 2020)
ba70d50 - Merge remote-tracking branch 'elastic/master' into better-incremental… (original-brownbear, Mar 22, 2020)
c17c7c6 - CR: use single identifier and pass it to repos (original-brownbear, Mar 22, 2020)
2802876 - remove pointless commit (original-brownbear, Mar 22, 2020)
4c59524 - better docs (original-brownbear, Mar 22, 2020)
0bfb717 - safety (original-brownbear, Mar 22, 2020)
f8ca09d - simpler (original-brownbear, Mar 22, 2020)
2b12eb8 - better wording (original-brownbear, Mar 22, 2020)
983eb58 - better wording (original-brownbear, Mar 22, 2020)
e3ddc56 - Merge remote-tracking branch 'elastic/master' into better-incremental… (original-brownbear, Mar 23, 2020)
b508fb3 - CR: add test for force merge scenario (original-brownbear, Mar 23, 2020)
File: BlobStoreIndexShardSnapshots.java
@@ -135,6 +135,7 @@ static final class Fields {

static final class ParseFields {
static final ParseField FILES = new ParseField("files");
static final ParseField SHARD_STATE_ID = new ParseField("shard_state_id");
static final ParseField SNAPSHOTS = new ParseField("snapshots");
}

@@ -207,6 +208,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.value(fileInfo.name());
}
builder.endArray();
if (snapshot.shardStateIdentifier() != null) {
builder.field(ParseFields.SHARD_STATE_ID.getPreferredName(), snapshot.shardStateIdentifier());
}
builder.endObject();
}
builder.endObject();
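For illustration, a minimal hand-written sketch of the shard-level metadata this change produces, built with the same XContent API; the snapshot name, file name, and identifier value are made up, and the real blob also carries a top-level "files" section:

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;

public class ShardStateIdSerializationSketch {
    public static void main(String[] args) throws Exception {
        try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
            builder.startObject();
            builder.startObject("snapshots");
            builder.startObject("snap-1");                      // snapshot name (illustrative)
            builder.startArray("files");
            builder.value("__dummy-file-uuid");                 // file list (illustrative)
            builder.endArray();
            builder.field("shard_state_id", "history-uuid-42"); // the new, optional field
            builder.endObject();
            builder.endObject();
            builder.endObject();
            // prints {"snapshots":{"snap-1":{"files":["__dummy-file-uuid"],"shard_state_id":"history-uuid-42"}}}
            System.out.println(Strings.toString(builder));
        }
    }
}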
@@ -219,6 +223,8 @@ public static BlobStoreIndexShardSnapshots fromXContent(XContentParser parser) t
token = parser.nextToken();
}
Map<String, List<String>> snapshotsMap = new HashMap<>();
Map<String, String> historyUUIDs = new HashMap<>();
Map<String, Long> globalCheckpoints = new HashMap<>();
Map<String, FileInfo> files = new HashMap<>();
if (token == XContentParser.Token.START_OBJECT) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
@@ -250,15 +256,16 @@ public static BlobStoreIndexShardSnapshots fromXContent(XContentParser parser) t
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
if (parser.nextToken() == XContentParser.Token.START_ARRAY) {
if (ParseFields.FILES.match(currentFieldName, parser.getDeprecationHandler()) == false) {
throw new ElasticsearchParseException("unknown array [{}]", currentFieldName);
}
if (ParseFields.FILES.match(currentFieldName, parser.getDeprecationHandler()) &&
parser.nextToken() == XContentParser.Token.START_ARRAY) {
List<String> fileNames = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
fileNames.add(parser.text());
}
snapshotsMap.put(snapshot, fileNames);
} else if (ParseFields.SHARD_STATE_ID.match(currentFieldName, parser.getDeprecationHandler())) {
parser.nextToken();
historyUUIDs.put(snapshot, parser.text());
}
}
}
@@ -277,7 +284,8 @@ public static BlobStoreIndexShardSnapshots fromXContent(XContentParser parser) t
assert fileInfo != null;
fileInfosBuilder.add(fileInfo);
}
snapshots.add(new SnapshotFiles(entry.getKey(), Collections.unmodifiableList(fileInfosBuilder)));
snapshots.add(new SnapshotFiles(entry.getKey(), Collections.unmodifiableList(fileInfosBuilder),
historyUUIDs.get(entry.getKey())));
}
return new BlobStoreIndexShardSnapshots(files, Collections.unmodifiableList(snapshots));
}
File: SnapshotFiles.java
@@ -18,6 +18,7 @@
*/
package org.elasticsearch.index.snapshots.blobstore;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;

import java.util.HashMap;
@@ -33,6 +34,9 @@ public class SnapshotFiles {

private final List<FileInfo> indexFiles;

@Nullable
private final String shardStateIdentifier;

private Map<String, FileInfo> physicalFiles = null;

/**
@@ -45,12 +49,23 @@ public String snapshot() {
}

/**
* @param snapshot snapshot name
* @param indexFiles index files
* @param snapshot snapshot name
* @param indexFiles index files
* @param shardStateIdentifier unique identifier for the state of the shard that this snapshot was taken from
*/
public SnapshotFiles(String snapshot, List<FileInfo> indexFiles ) {
public SnapshotFiles(String snapshot, List<FileInfo> indexFiles, @Nullable String shardStateIdentifier) {
this.snapshot = snapshot;
this.indexFiles = indexFiles;
this.shardStateIdentifier = shardStateIdentifier;
}

/**
* Returns an identifier for the shard state that can be used to check whether a shard has changed between
* snapshots or not.
*/
@Nullable
public String shardStateIdentifier() {
return shardStateIdentifier;
}

/**
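A hedged sketch of how the new accessor is meant to be consumed when deciding whether a shard's previous snapshot can be reused wholesale; the helper class and method names are made up, but the lookup mirrors the BlobStoreRepository change further down:

import java.util.List;

import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles;

final class ShardStateReuseSketch {
    // Returns the file list of a previous snapshot with identical shard state, or null
    // if no such snapshot exists and files must be compared one by one instead.
    static List<FileInfo> reusableFiles(Iterable<SnapshotFiles> previousSnapshots, String currentShardStateId) {
        if (currentShardStateId == null) {
            return null; // no identifier for this shard: fall back to per-file comparison
        }
        for (SnapshotFiles snapshotFiles : previousSnapshots) {
            if (currentShardStateId.equals(snapshotFiles.shardStateIdentifier())) {
                return snapshotFiles.indexFiles(); // shard unchanged: reuse every file
            }
        }
        return null;
    }
}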
File: FilterRepository.java
@@ -121,10 +121,10 @@ public boolean isReadOnly() {

@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion,
Map<String, Object> userMetadata, ActionListener<String> listener) {
in.snapshotShard(
store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, repositoryMetaVersion, userMetadata, listener);
IndexCommit snapshotIndexCommit, String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus,
Version repositoryMetaVersion, Map<String, Object> userMetadata, ActionListener<String> listener) {
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, shardStateIdentifier, snapshotStatus,
repositoryMetaVersion, userMetadata, listener);
}
@Override
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState,
File: Repository.java
@@ -27,6 +27,7 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.ShardId;
@@ -199,14 +200,17 @@ void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations,
* @param snapshotId snapshot id
* @param indexId id for the index being snapshotted
* @param snapshotIndexCommit commit point
* @param shardStateIdentifier a unique identifier of the state of the shard that is stored with the shard's snapshot and used
* to detect if the shard has changed between snapshots. If {@code null} is passed as the identifier
* snapshotting will be done by inspecting the physical files referenced by {@code snapshotIndexCommit}
* @param snapshotStatus snapshot status
* @param repositoryMetaVersion version of the updated repository metadata to write
* @param userMetadata user metadata of the snapshot found in {@link SnapshotsInProgress.Entry#userMetadata()}
* @param listener listener invoked on completion
*/
void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion, Map<String, Object> userMetadata,
ActionListener<String> listener);
@Nullable String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion,
Map<String, Object> userMetadata, ActionListener<String> listener);

/**
* Restores snapshot of the shard.
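The commit messages above mention determining whether a commit is safe in the snapshot shards service and passing down the global checkpoint. The following is only a plausible sketch of how such an identifier could be derived from a commit's user data; the helper, the exact key strings, and the identifier composition are assumptions, not this PR's actual code:

import java.io.IOException;
import java.util.Map;

import org.apache.lucene.index.IndexCommit;

final class ShardStateIdSketch {
    // Hypothetical: returns an identifier for the shard state captured by the commit,
    // or null when the commit is not safely identifiable, which makes snapshotShard
    // fall back to inspecting the physical files it references.
    static String shardStateId(IndexCommit snapshotIndexCommit, long globalCheckpoint) throws IOException {
        final Map<String, String> userData = snapshotIndexCommit.getUserData();
        final long maxSeqNo = Long.parseLong(userData.get("max_seq_no")); // key name assumed
        if (maxSeqNo != globalCheckpoint) {
            return null; // commit may contain operations above the global checkpoint
        }
        return userData.get("history_uuid") + "-" + maxSeqNo; // composition assumed
    }
}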
File: BlobStoreRepository.java
@@ -125,6 +125,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -1534,8 +1535,8 @@ private void writeAtomic(final String blobName, final BytesReference bytesRef, b

@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion,
Map<String, Object> userMetadata, ActionListener<String> listener) {
IndexCommit snapshotIndexCommit, String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus,
Version repositoryMetaVersion, Map<String, Object> userMetadata, ActionListener<String> listener) {
final ShardId shardId = store.shardId();
final long startTime = threadPool.absoluteTimeInMillis();
try {
@@ -1561,76 +1562,92 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
throw new IndexShardSnapshotFailedException(shardId,
"Duplicate snapshot name [" + snapshotId.getName() + "] detected, aborting");
}

final List<BlobStoreIndexShardSnapshot.FileInfo> indexCommitPointFiles = new ArrayList<>();
final BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot = new LinkedBlockingQueue<>();
store.incRef();
final Collection<String> fileNames;
final Store.MetadataSnapshot metadataFromStore;
try {
// TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should
try {
logger.trace(
"[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit);
metadataFromStore = store.getMetadata(snapshotIndexCommit);
fileNames = snapshotIndexCommit.getFileNames();
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e);
// First inspect all known SegmentInfos instances to see if we already have an equivalent commit in the repository
final List<BlobStoreIndexShardSnapshot.FileInfo> filesFromSegmentInfos = Optional.ofNullable(shardStateIdentifier).map(id -> {
for (SnapshotFiles snapshotFileSet : snapshots.snapshots()) {
if (id.equals(snapshotFileSet.shardStateIdentifier())) {
return snapshotFileSet.indexFiles();
}
}
} finally {
store.decRef();
}
return null;
}).orElse(null);

final List<BlobStoreIndexShardSnapshot.FileInfo> indexCommitPointFiles;
int indexIncrementalFileCount = 0;
int indexTotalNumberOfFiles = 0;
long indexIncrementalSize = 0;
long indexTotalFileCount = 0;
for (String fileName : fileNames) {
if (snapshotStatus.isAborted()) {
logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName);
throw new IndexShardSnapshotFailedException(shardId, "Aborted");
long indexTotalFileSize = 0;
final BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot = new LinkedBlockingQueue<>();
// If we did not find a set of files that is equal to the current commit we determine the files to upload by comparing files
// in the commit with files already in the repository
if (filesFromSegmentInfos == null) {
indexCommitPointFiles = new ArrayList<>();
store.incRef();
final Collection<String> fileNames;
final Store.MetadataSnapshot metadataFromStore;
try {
// TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should
try {
logger.trace(
"[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit);
metadataFromStore = store.getMetadata(snapshotIndexCommit);
fileNames = snapshotIndexCommit.getFileNames();
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e);
}
} finally {
store.decRef();
}
for (String fileName : fileNames) {
if (snapshotStatus.isAborted()) {
logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName);
throw new IndexShardSnapshotFailedException(shardId, "Aborted");
}

logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName);
final StoreFileMetaData md = metadataFromStore.get(fileName);
BlobStoreIndexShardSnapshot.FileInfo existingFileInfo = null;
List<BlobStoreIndexShardSnapshot.FileInfo> filesInfo = snapshots.findPhysicalIndexFiles(fileName);
if (filesInfo != null) {
for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : filesInfo) {
if (fileInfo.isSame(md)) {
// a commit point file with the same name, size and checksum was already copied to repository
// we will reuse it for this snapshot
existingFileInfo = fileInfo;
break;
logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName);
final StoreFileMetaData md = metadataFromStore.get(fileName);
BlobStoreIndexShardSnapshot.FileInfo existingFileInfo = null;
List<BlobStoreIndexShardSnapshot.FileInfo> filesInfo = snapshots.findPhysicalIndexFiles(fileName);
if (filesInfo != null) {
for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : filesInfo) {
if (fileInfo.isSame(md)) {
// a commit point file with the same name, size and checksum was already copied to repository
// we will reuse it for this snapshot
existingFileInfo = fileInfo;
break;
}
}
}
}

// We can skip writing blobs where the metadata hash is equal to the blob's contents because we store the hash/contents
// directly in the shard level metadata in this case
final boolean needsWrite = md.hashEqualsContents() == false;
indexTotalFileCount += md.length();
indexTotalNumberOfFiles++;

if (existingFileInfo == null) {
indexIncrementalFileCount++;
indexIncrementalSize += md.length();
// create a new FileInfo
BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo =
new BlobStoreIndexShardSnapshot.FileInfo(
(needsWrite ? UPLOADED_DATA_BLOB_PREFIX : VIRTUAL_DATA_BLOB_PREFIX) + UUIDs.randomBase64UUID(),
md, chunkSize());
indexCommitPointFiles.add(snapshotFileInfo);
if (needsWrite) {
filesToSnapshot.add(snapshotFileInfo);
// We can skip writing blobs where the metadata hash is equal to the blob's contents because we store the hash/contents
// directly in the shard level metadata in this case
final boolean needsWrite = md.hashEqualsContents() == false;
indexTotalFileSize += md.length();
indexTotalNumberOfFiles++;

if (existingFileInfo == null) {
indexIncrementalFileCount++;
indexIncrementalSize += md.length();
// create a new FileInfo
BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo =
new BlobStoreIndexShardSnapshot.FileInfo(
(needsWrite ? UPLOADED_DATA_BLOB_PREFIX : VIRTUAL_DATA_BLOB_PREFIX) + UUIDs.randomBase64UUID(),
md, chunkSize());
indexCommitPointFiles.add(snapshotFileInfo);
if (needsWrite) {
filesToSnapshot.add(snapshotFileInfo);
}
assert needsWrite || assertFileContentsMatchHash(snapshotFileInfo, store);
} else {
indexCommitPointFiles.add(existingFileInfo);
}
assert needsWrite || assertFileContentsMatchHash(snapshotFileInfo, store);
} else {
indexCommitPointFiles.add(existingFileInfo);
}
} else {
indexCommitPointFiles = filesFromSegmentInfos;
}

snapshotStatus.moveToStarted(startTime, indexIncrementalFileCount,
indexTotalNumberOfFiles, indexIncrementalSize, indexTotalFileCount);
indexTotalNumberOfFiles, indexIncrementalSize, indexTotalFileSize);

final StepListener<Collection<Void>> allFilesUploadedListener = new StepListener<>();
allFilesUploadedListener.whenComplete(v -> {
Expand All @@ -1655,7 +1672,7 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
}
// build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones
List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()));
newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles(), shardStateIdentifier));
for (SnapshotFiles point : snapshots) {
newSnapshotsList.add(point);
}
@@ -1742,7 +1759,7 @@ public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, Sh
final BlobContainer container = shardContainer(indexId, snapshotShardId);
executor.execute(ActionRunnable.wrap(restoreListener, l -> {
final BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId);
final SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
final SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles(), null);
new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState) {
@Override
protected void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store,
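Note that restoreShard constructs its SnapshotFiles with a null shardStateIdentifier: the identifier exists solely so that a later snapshot can detect an unchanged shard and reuse the previous file list, and it plays no role when restoring files from the repository.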