Speed up operations on BlobStoreIndexShardSnapshots (#88912)
This fixes a couple of slow points in `BlobStoreIndexShardSnapshots` that become
performance-critical when working with large repositories.

1. Fix `physicalFiles` containing the same `FileInfo` instances repeatedly for every
snapshot that holds the file. Without this fix, for a file common to all snapshots of the
shard, the map can hold a list as long as the number of snapshots of the shard.
Also, only lazily build the map, since it is only used during snapshotting, and internalize
the logic into `BlobStoreIndexShardSnapshots` so we don't have to bother with wrapping
it as unmodifiable (see the sketch after this list).
2. Add efficient copy constructors for all 3 operations on the shard to avoid
expensive looping over all snapshots and their files in many cases.
3. Use a list instead of a redundant map during deserialization; we weren't using the map
for any deduplication anyway, and we are safe here thanks to Jackson's duplicate-name
detection.
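
In rough outline, changes 1 and 2 follow the pattern sketched below. This is a minimal, hypothetical sketch rather than the actual Elasticsearch class: `ShardSnapshotsSketch`, the cut-down `FileInfo`/`SnapshotFiles` records, and the name/length check standing in for `FileInfo#isSame(StoreFileMetadata)` are all simplifications introduced here for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for the real FileInfo / SnapshotFiles types.
record FileInfo(String name, String physicalName, long length) {}

record SnapshotFiles(String snapshot, List<FileInfo> indexFiles) {}

final class ShardSnapshotsSketch {
    private final List<SnapshotFiles> shardSnapshots;
    private final Map<String, FileInfo> files;               // blob name -> canonical FileInfo
    private Map<String, Collection<FileInfo>> physicalFiles; // built lazily, only needed when creating a snapshot

    // The files map is expected to contain every blob referenced by any snapshot.
    ShardSnapshotsSketch(Map<String, FileInfo> files, List<SnapshotFiles> shardSnapshots) {
        this.files = files;
        this.shardSnapshots = List.copyOf(shardSnapshots);
    }

    // "Copy constructor" for adding a snapshot: reuse the existing blob-name index and
    // copy it only if the new snapshot actually introduces blobs we have not seen before.
    ShardSnapshotsSketch withAddedSnapshot(SnapshotFiles added) {
        Map<String, FileInfo> updated = null;
        for (FileInfo fileInfo : added.indexFiles()) {
            if (files.containsKey(fileInfo.name()) == false) {
                if (updated == null) {
                    updated = new HashMap<>(files);
                }
                updated.put(fileInfo.name(), fileInfo);
            }
        }
        List<SnapshotFiles> snapshots = new ArrayList<>(shardSnapshots);
        snapshots.add(added);
        return new ShardSnapshotsSketch(updated == null ? files : updated, snapshots);
    }

    // Lazily build the physical-name index; identity sets keep one FileInfo per blob even
    // when many snapshots of the shard reference the same file.
    FileInfo findPhysicalIndexFile(String physicalName, long length) {
        Map<String, Collection<FileInfo>> p = physicalFiles;
        if (p == null) {
            p = new HashMap<>();
            for (SnapshotFiles snapshot : shardSnapshots) {
                for (FileInfo fileInfo : snapshot.indexFiles()) {
                    p.computeIfAbsent(fileInfo.physicalName(), k -> Collections.newSetFromMap(new IdentityHashMap<>()))
                        .add(files.get(fileInfo.name()));
                }
            }
            physicalFiles = p;
        }
        Collection<FileInfo> candidates = p.get(physicalName);
        if (candidates == null) {
            return null;
        }
        for (FileInfo fileInfo : candidates) {
            if (fileInfo.length() == length) { // stand-in for FileInfo#isSame(StoreFileMetadata)
                return fileInfo;
            }
        }
        return null;
    }
}
```

The point of the identity sets is that every `FileInfo` added to the index is looked up from the same `files` map, so reference equality suffices and each blob contributes one entry regardless of how many snapshots of the shard reference it.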
original-brownbear committed Jul 29, 2022
1 parent be4d809 commit bd624ba
Showing 2 changed files with 91 additions and 76 deletions.
BlobStoreIndexShardSnapshots.java
@@ -10,20 +10,25 @@

import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.xcontent.XContentParserUtils;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
import org.elasticsearch.index.store.StoreFileMetadata;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import static java.util.Collections.unmodifiableMap;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Contains information about all snapshots for the given shard in repository
@@ -33,54 +38,53 @@
*/
public class BlobStoreIndexShardSnapshots implements Iterable<SnapshotFiles>, ToXContentFragment {

public static final BlobStoreIndexShardSnapshots EMPTY = new BlobStoreIndexShardSnapshots(Collections.emptyList());
public static final BlobStoreIndexShardSnapshots EMPTY = new BlobStoreIndexShardSnapshots(Map.of(), List.of());

private final List<SnapshotFiles> shardSnapshots;
private final Map<String, FileInfo> files;
private final Map<String, List<FileInfo>> physicalFiles;

public BlobStoreIndexShardSnapshots(List<SnapshotFiles> shardSnapshots) {
private BlobStoreIndexShardSnapshots(Map<String, FileInfo> files, List<SnapshotFiles> shardSnapshots) {
this.shardSnapshots = List.copyOf(shardSnapshots);
// Map between blob names and file info
this.files = files;
}

public BlobStoreIndexShardSnapshots withRetainedSnapshots(Set<SnapshotId> retainedSnapshots) {
if (retainedSnapshots.isEmpty()) {
return EMPTY;
}
final var survivingSnapshotNames = retainedSnapshots.stream().map(SnapshotId::getName).collect(Collectors.toSet());
final ArrayList<SnapshotFiles> updatedSnapshots = new ArrayList<>(survivingSnapshotNames.size());
Map<String, FileInfo> newFiles = new HashMap<>();
// Map between original physical names and file info
Map<String, List<FileInfo>> physicalFiles = new HashMap<>();
for (SnapshotFiles snapshot : shardSnapshots) {
// First we build map between filenames in the repo and their original file info
// this map will be used in the next loop
if (survivingSnapshotNames.contains(snapshot.snapshot()) == false) {
continue;
}
updatedSnapshots.add(snapshot);
for (FileInfo fileInfo : snapshot.indexFiles()) {
FileInfo oldFile = newFiles.put(fileInfo.name(), fileInfo);
assert oldFile == null || oldFile.isSame(fileInfo);
}
// We are doing it in two loops here so we keep only one copy of the fileInfo per blob
// the first loop de-duplicates fileInfo objects that were loaded from different snapshots but refer to
// the same blob
for (FileInfo fileInfo : snapshot.indexFiles()) {
physicalFiles.computeIfAbsent(fileInfo.physicalName(), k -> new ArrayList<>()).add(newFiles.get(fileInfo.name()));
}
}
Map<String, List<FileInfo>> mapBuilder = new HashMap<>();
for (Map.Entry<String, List<FileInfo>> entry : physicalFiles.entrySet()) {
mapBuilder.put(entry.getKey(), List.copyOf(entry.getValue()));
}
this.physicalFiles = unmodifiableMap(mapBuilder);
this.files = unmodifiableMap(newFiles);
return new BlobStoreIndexShardSnapshots(newFiles, updatedSnapshots);
}

private BlobStoreIndexShardSnapshots(Map<String, FileInfo> files, List<SnapshotFiles> shardSnapshots) {
this.shardSnapshots = shardSnapshots;
this.files = files;
Map<String, List<FileInfo>> physicalFiles = new HashMap<>();
for (SnapshotFiles snapshot : shardSnapshots) {
for (FileInfo fileInfo : snapshot.indexFiles()) {
physicalFiles.computeIfAbsent(fileInfo.physicalName(), k -> new ArrayList<>()).add(files.get(fileInfo.name()));
public BlobStoreIndexShardSnapshots withAddedSnapshot(SnapshotFiles snapshotFiles) {
Map<String, FileInfo> updatedFiles = null;
for (FileInfo fileInfo : snapshotFiles.indexFiles()) {
final FileInfo known = files.get(fileInfo.name());
if (known == null) {
if (updatedFiles == null) {
updatedFiles = new HashMap<>(files);
}
updatedFiles.put(fileInfo.name(), fileInfo);
} else {
assert fileInfo.isSame(known);
}
}
Map<String, List<FileInfo>> mapBuilder = new HashMap<>();
for (Map.Entry<String, List<FileInfo>> entry : physicalFiles.entrySet()) {
mapBuilder.put(entry.getKey(), List.copyOf(entry.getValue()));
}
this.physicalFiles = unmodifiableMap(mapBuilder);
return new BlobStoreIndexShardSnapshots(
updatedFiles == null ? files : updatedFiles,
CollectionUtils.appendToCopyNoNullElements(shardSnapshots, snapshotFiles)
);
}

/**
@@ -102,7 +106,10 @@ public BlobStoreIndexShardSnapshots withClone(String source, String target) {
if (sourceFiles == null) {
throw new IllegalArgumentException("unknown source [" + source + "]");
}
return new BlobStoreIndexShardSnapshots(CollectionUtils.appendToCopy(shardSnapshots, sourceFiles.withSnapshotName(target)));
return new BlobStoreIndexShardSnapshots(
files,
CollectionUtils.appendToCopyNoNullElements(shardSnapshots, sourceFiles.withSnapshotName(target))
);
}

/**
@@ -114,14 +121,40 @@ public List<SnapshotFiles> snapshots() {
return this.shardSnapshots;
}

// index of Lucene file name to collection of file info in the repository
// lazily computed because building this map is rather expensive and only needed for the snapshot create operation
private Map<String, Collection<FileInfo>> physicalFiles;

/**
* Finds reference to a snapshotted file by its original name
* Finds reference to a snapshotted file by its {@link StoreFileMetadata}
*
* @param physicalName original name
* @return a list of file infos that match specified physical file or null if the file is not present in any of snapshots
* @param storeFileMetadata store file metadata to find file info for
* @return the file info that matches the specified physical file or null if the file is not present in any of snapshots
*/
public List<FileInfo> findPhysicalIndexFiles(String physicalName) {
return physicalFiles.get(physicalName);
public FileInfo findPhysicalIndexFile(StoreFileMetadata storeFileMetadata) {
var p = this.physicalFiles;
if (p == null) {
p = new HashMap<>();
for (SnapshotFiles snapshot : shardSnapshots) {
for (FileInfo fileInfo : snapshot.indexFiles()) {
// we use identity hash set since we lookup all instances from the same map and thus equality == instance equality
// and we don't want to add the same file to the map multiple times
p.computeIfAbsent(fileInfo.physicalName(), k -> Collections.newSetFromMap(new IdentityHashMap<>()))
.add(files.get(fileInfo.name()));
}
}
physicalFiles = p;
}
final var found = p.get(storeFileMetadata.name());
if (found == null) {
return null;
}
for (FileInfo fileInfo : found) {
if (fileInfo.isSame(storeFileMetadata)) {
return fileInfo;
}
}
return null;
}

/**
@@ -228,7 +261,8 @@ public static BlobStoreIndexShardSnapshots fromXContent(XContentParser parser) t
if (token == null) { // New parser
token = parser.nextToken();
}
Map<String, List<String>> snapshotsMap = new HashMap<>();
// list of tuples of snapshot name and file ids in the snapshot
List<Tuple<String, List<String>>> snapshotsAndFiles = new ArrayList<>();
Map<String, String> historyUUIDs = new HashMap<>();
Map<String, FileInfo> files = new HashMap<>();
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser);
@@ -256,7 +290,9 @@ public static BlobStoreIndexShardSnapshots fromXContent(XContentParser parser) t
token = parser.nextToken();
if (Fields.FILES.equals(currentFieldName)) {
if (token == XContentParser.Token.START_ARRAY) {
snapshotsMap.put(snapshot, XContentParserUtils.parseList(parser, XContentParser::text));
snapshotsAndFiles.add(
Tuple.tuple(snapshot, XContentParserUtils.parseList(parser, XContentParser::text))
);
}
} else if (Fields.SHARD_STATE_ID.equals(currentFieldName)) {
historyUUIDs.put(snapshot, parser.text());
@@ -268,19 +304,17 @@ public static BlobStoreIndexShardSnapshots fromXContent(XContentParser parser) t
}
}

List<SnapshotFiles> snapshots = new ArrayList<>(snapshotsMap.size());
for (Map.Entry<String, List<String>> entry : snapshotsMap.entrySet()) {
List<SnapshotFiles> snapshots = new ArrayList<>(snapshotsAndFiles.size());
for (Tuple<String, List<String>> entry : snapshotsAndFiles) {
List<FileInfo> fileInfosBuilder = new ArrayList<>();
for (String file : entry.getValue()) {
for (String file : entry.v2()) {
FileInfo fileInfo = files.get(file);
assert fileInfo != null;
fileInfosBuilder.add(fileInfo);
}
snapshots.add(
new SnapshotFiles(entry.getKey(), Collections.unmodifiableList(fileInfosBuilder), historyUUIDs.get(entry.getKey()))
);
snapshots.add(new SnapshotFiles(entry.v1(), Collections.unmodifiableList(fileInfosBuilder), historyUUIDs.get(entry.v1())));
}
return new BlobStoreIndexShardSnapshots(files, Collections.unmodifiableList(snapshots));
return new BlobStoreIndexShardSnapshots(files, snapshots);
}

}
BlobStoreRepository.java
@@ -2696,18 +2696,7 @@ public void snapshotShard(SnapshotShardContext context) {

logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName);
final StoreFileMetadata md = metadataFromStore.get(fileName);
BlobStoreIndexShardSnapshot.FileInfo existingFileInfo = null;
List<BlobStoreIndexShardSnapshot.FileInfo> filesInfo = snapshots.findPhysicalIndexFiles(fileName);
if (filesInfo != null) {
for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : filesInfo) {
if (fileInfo.isSame(md)) {
// a commit point file with the same name, size and checksum was already copied to repository
// we will reuse it for this snapshot
existingFileInfo = fileInfo;
break;
}
}
}
BlobStoreIndexShardSnapshot.FileInfo existingFileInfo = snapshots.findPhysicalIndexFile(md);

// We can skip writing blobs where the metadata hash is equal to the blob's contents because we store the hash/contents
// directly in the shard level metadata in this case
@@ -2733,6 +2722,8 @@ public void snapshotShard(SnapshotShardContext context) {
filesInShardMetadataSize += md.length();
}
} else {
// a commit point file with the same name, size and checksum was already copied to repository
// we will reuse it for this snapshot
indexCommitPointFiles.add(existingFileInfo);
}
}
@@ -2756,12 +2747,9 @@ public void snapshotShard(SnapshotShardContext context) {
final boolean writeShardGens = SnapshotsService.useShardGenerations(context.getRepositoryMetaVersion());
final boolean writeFileInfoWriterUUID = SnapshotsService.includeFileInfoWriterUUID(context.getRepositoryMetaVersion());
// build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones
List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
newSnapshotsList.add(new SnapshotFiles(snapshotId.getName(), indexCommitPointFiles, context.stateIdentifier()));
for (SnapshotFiles point : snapshots) {
newSnapshotsList.add(point);
}
final BlobStoreIndexShardSnapshots updatedBlobStoreIndexShardSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList);
final BlobStoreIndexShardSnapshots updatedBlobStoreIndexShardSnapshots = snapshots.withAddedSnapshot(
new SnapshotFiles(snapshotId.getName(), indexCommitPointFiles, context.stateIdentifier())
);
final Runnable afterWriteSnapBlob;
if (writeShardGens) {
// When using shard generations we can safely write the index-${uuid} blob before writing out any of the actual data
@@ -3253,19 +3241,12 @@ private ShardSnapshotMetaDeleteResult deleteFromShardSnapshotMeta(
long indexGeneration
) {
// Build a list of snapshots that should be preserved
List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
final Set<String> survivingSnapshotNames = survivingSnapshots.stream().map(SnapshotId::getName).collect(Collectors.toSet());
for (SnapshotFiles point : snapshots) {
if (survivingSnapshotNames.contains(point.snapshot())) {
newSnapshotsList.add(point);
}
}
final BlobStoreIndexShardSnapshots updatedSnapshots = snapshots.withRetainedSnapshots(survivingSnapshots);
ShardGeneration writtenGeneration = null;
try {
if (newSnapshotsList.isEmpty()) {
if (updatedSnapshots.snapshots().isEmpty()) {
return new ShardSnapshotMetaDeleteResult(indexId, snapshotShardId, ShardGenerations.DELETED_SHARD_GEN, blobs);
} else {
final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList);
if (indexGeneration < 0L) {
writtenGeneration = ShardGeneration.newGeneration();
INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration.toBlobNamePart(), compress);