[STORE] Improve recovery / snapshot restoring file identity handling
This commit changes how files are selected for retransmission
on recovery / restore. Today this happens on a per-file basis: the
rather weak checksum and the file length in bytes are compared to decide
whether a file is identical. This is prone to fail in the case of a
checksum collision, which can happen under certain circumstances.
The changes in this commit move the identity comparison to a per-commit /
per-segment level, where files are treated as identical only if all the other
files in the commit / segment are the same as well. This "all or nothing"
strategy dramatically reduces the chance of a collision, since we also use a
strong hash to identify commits / segments, based on the content of the
".si" / "segments_N" file.

Closes #7351
s1monw committed Aug 21, 2014
1 parent 2f0da0a commit 8263de9
Showing 7 changed files with 504 additions and 87 deletions.
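Before the diff: the heart of the change is that file reuse is decided per commit / segment rather than per file. Below is a minimal sketch of that "all or nothing" rule; the FileMeta type and the segment grouping key are illustrative stand-ins for the real StoreFileMetaData / Store.recoveryDiff() machinery, not the actual implementation.

import java.util.*;

final class SegmentDiffSketch {

    static final class FileMeta {
        final String name;      // file name, e.g. "_0.cfs"
        final long length;      // length in bytes
        final String checksum;  // the (weak) per-file checksum
        final String segment;   // grouping key: the commit / segment this file belongs to

        FileMeta(String name, long length, String checksum, String segment) {
            this.name = name;
            this.length = length;
            this.checksum = checksum;
            this.segment = segment;
        }

        boolean isSame(FileMeta other) {
            // the old per-file identity test: length plus weak checksum
            return other != null && length == other.length && checksum.equals(other.checksum);
        }
    }

    /** Returns the files to retransmit: a segment is reused only if ALL of its files match. */
    static List<FileMeta> filesToRecover(Map<String, FileMeta> source, Map<String, FileMeta> target) {
        // group the snapshot-side files by the segment / commit they belong to
        Map<String, List<FileMeta>> perSegment = new HashMap<>();
        for (FileMeta meta : source.values()) {
            List<FileMeta> group = perSegment.get(meta.segment);
            if (group == null) {
                perSegment.put(meta.segment, group = new ArrayList<FileMeta>());
            }
            group.add(meta);
        }
        List<FileMeta> toRecover = new ArrayList<>();
        for (List<FileMeta> segmentFiles : perSegment.values()) {
            boolean allIdentical = true;
            for (FileMeta meta : segmentFiles) {
                allIdentical &= meta.isSame(target.get(meta.name));
            }
            if (!allIdentical) {
                toRecover.addAll(segmentFiles); // one mismatch invalidates the whole segment
            }
        }
        return toRecover;
    }
}

With a per-file comparison, a colliding checksum plus a matching length silently reuses a wrong file; here a single mismatch forces retransmission of every file in that segment, and the segment identity itself is additionally pinned by a strong hash of the ".si" / "segments_N" file. The first changed file, excerpted below, applies this idea by replacing the per-file loop in restore() with the three Store.RecoveryDiff buckets (identical, different, missing).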
@@ -20,6 +20,7 @@
 package org.elasticsearch.index.snapshots.blobstore;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.store.*;
@@ -49,10 +50,7 @@
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -716,9 +714,9 @@ public void restore() {
             long totalSize = 0;
             int numberOfReusedFiles = 0;
             long reusedTotalSize = 0;
-            Map<String, StoreFileMetaData> metadata = Collections.emptyMap();
+            Store.MetadataSnapshot recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY;
             try {
-                metadata = store.getMetadata().asMap();
+                recoveryTargetMetadata = store.getMetadata();
             } catch (CorruptIndexException e) {
                 logger.warn("{} Can't read metadata from store", e, shardId);
                 throw new IndexShardRestoreFailedException(shardId, "Can't restore corrupted shard", e);
@@ -727,33 +725,42 @@ public void restore() {
                 logger.warn("{} Can't read metadata from store", e, shardId);
             }
 
-            List<FileInfo> filesToRecover = Lists.newArrayList();
-            for (FileInfo fileInfo : snapshot.indexFiles()) {
-                String fileName = fileInfo.physicalName();
-                final StoreFileMetaData md = metadata.get(fileName);
+            final List<FileInfo> filesToRecover = Lists.newArrayList();
+            final Map<String, StoreFileMetaData> snapshotMetaData = new HashMap<>();
+            final Map<String, FileInfo> fileInfos = new HashMap<>();
+
+            for (final FileInfo fileInfo : snapshot.indexFiles()) {
+                snapshotMetaData.put(fileInfo.metadata().name(), fileInfo.metadata());
+                fileInfos.put(fileInfo.metadata().name(), fileInfo);
+            }
+            final Store.MetadataSnapshot sourceMetaData = new Store.MetadataSnapshot(snapshotMetaData);
+            final Store.RecoveryDiff diff = sourceMetaData.recoveryDiff(recoveryTargetMetadata);
+            for (StoreFileMetaData md : diff.identical) {
+                FileInfo fileInfo = fileInfos.get(md.name());
                 numberOfFiles++;
-                if (md != null && fileInfo.isSame(md)) {
-                    totalSize += md.length();
-                    numberOfReusedFiles++;
-                    reusedTotalSize += md.length();
-                    recoveryState.getIndex().addReusedFileDetail(fileInfo.name(), fileInfo.length());
-                    if (logger.isTraceEnabled()) {
-                        logger.trace("[{}] [{}] not_recovering [{}] from [{}], exists in local store and is same", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name());
-                    }
-                } else {
-                    totalSize += fileInfo.length();
-                    filesToRecover.add(fileInfo);
-                    recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length());
-                    if (logger.isTraceEnabled()) {
-                        if (md == null) {
-                            logger.trace("[{}] [{}] recovering [{}] from [{}], does not exists in local store", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name());
-                        } else {
-                            logger.trace("[{}] [{}] recovering [{}] from [{}], exists in local store but is different", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name());
-                        }
-                    }
+                totalSize += md.length();
+                numberOfReusedFiles++;
+                reusedTotalSize += md.length();
+                recoveryState.getIndex().addReusedFileDetail(fileInfo.name(), fileInfo.length());
+                if (logger.isTraceEnabled()) {
+                    logger.trace("[{}] [{}] not_recovering [{}] from [{}], exists in local store and is same", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name());
+                }
+            }
+
+            for (StoreFileMetaData md : Iterables.concat(diff.different, diff.missing)) {
+                FileInfo fileInfo = fileInfos.get(md.name());
+                numberOfFiles++;
+                totalSize += fileInfo.length();
+                filesToRecover.add(fileInfo);
+                recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length());
+                if (logger.isTraceEnabled()) {
+                    if (md == null) {
+                        logger.trace("[{}] [{}] recovering [{}] from [{}], does not exists in local store", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name());
+                    } else {
+                        logger.trace("[{}] [{}] recovering [{}] from [{}], exists in local store but is different", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name());
+                    }
                 }
             }
             final RecoveryState.Index index = recoveryState.getIndex();
             index.totalFileCount(numberOfFiles);
             index.totalByteCount(totalSize);
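The per-segment identity described in the commit message is anchored on a strong hash of the ".si" / "segments_N" content, which makes an accidental collision across a whole commit practically impossible. This excerpt only shows the hash being consumed, so the following is an illustration rather than the actual Store code: a sketch of how such a content hash could be computed with the standard java.security.MessageDigest. The digest choice is an assumption.

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class SegmentHash {

    /** Hashes the full content of a ".si" / "segments_N" file. */
    static byte[] hashSegmentFile(String path) throws IOException, NoSuchAlgorithmException {
        MessageDigest digest = MessageDigest.getInstance("SHA-1");
        InputStream in = new FileInputStream(path);
        try {
            byte[] buffer = new byte[8192];
            int read;
            while ((read = in.read(buffer)) != -1) {
                digest.update(buffer, 0, read); // feed the raw file content into the digest
            }
        } finally {
            in.close();
        }
        // unlike a weak per-file checksum, a collision here is not a practical concern
        return digest.digest();
    }
}

The second changed file, below, extends the snapshot metadata so this hash survives a round trip through the blob store: FileInfo gains a "meta_hash" field in its XContent representation.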
@@ -20,10 +20,12 @@
 package org.elasticsearch.index.snapshots.blobstore;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.Version;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.xcontent.ToXContent;
@@ -196,6 +198,7 @@ static final class Fields {
         static final XContentBuilderString CHECKSUM = new XContentBuilderString("checksum");
         static final XContentBuilderString PART_SIZE = new XContentBuilderString("part_size");
         static final XContentBuilderString WRITTEN_BY = new XContentBuilderString("written_by");
+        static final XContentBuilderString META_HASH = new XContentBuilderString("meta_hash");
     }
 
     /**
@@ -221,6 +224,10 @@ public static void toXContent(FileInfo file, XContentBuilder builder, ToXContent
         if (file.metadata.writtenBy() != null) {
             builder.field(Fields.WRITTEN_BY, file.metadata.writtenBy());
         }
+
+        if (file.metadata.hash() != null && file.metadata().hash().length > 0) {
+            builder.field(Fields.META_HASH, new BytesArray(file.metadata.hash()));
+        }
         builder.endObject();
     }

@@ -239,6 +246,7 @@ public static FileInfo fromXContent(XContentParser parser) throws IOException {
         String checksum = null;
         ByteSizeValue partSize = null;
         Version writtenBy = null;
+        BytesRef metaHash = new BytesRef();
         if (token == XContentParser.Token.START_OBJECT) {
             while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                 if (token == XContentParser.Token.FIELD_NAME) {
@@ -257,6 +265,10 @@ public static FileInfo fromXContent(XContentParser parser) throws IOException {
                         partSize = new ByteSizeValue(parser.longValue());
                     } else if ("written_by".equals(currentFieldName)) {
                         writtenBy = Lucene.parseVersionLenient(parser.text(), null);
+                    } else if ("meta_hash".equals(currentFieldName)) {
+                        metaHash.bytes = parser.binaryValue();
+                        metaHash.offset = 0;
+                        metaHash.length = metaHash.bytes.length;
                     } else {
                         throw new ElasticsearchParseException("unknown parameter [" + currentFieldName + "]");
                     }
@@ -269,7 +281,7 @@ public static FileInfo fromXContent(XContentParser parser) throws IOException {
             }
         }
         // TODO: Verify???
-        return new FileInfo(name, new StoreFileMetaData(physicalName, length, checksum, writtenBy), partSize);
+        return new FileInfo(name, new StoreFileMetaData(physicalName, length, checksum, writtenBy, metaHash), partSize);
     }
 
 }
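To make the new field concrete, here is a round-trip sketch built from the constructors and the static toXContent shown above. XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS, builder.string(), and the nullable part size are assumptions about the Elasticsearch 1.x APIs of that era, not verified against the full source tree.

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
import org.elasticsearch.index.store.StoreFileMetaData;

public class MetaHashRoundTrip {
    public static void main(String[] args) throws Exception {
        BytesRef hash = new BytesRef(new byte[]{1, 2, 3, 4}); // stand-in for a real segment hash
        StoreFileMetaData md = new StoreFileMetaData(
                "segments_2", 123L, "abcdef", Lucene.parseVersionLenient("4.9", null), hash);
        FileInfo info = new FileInfo("__1", md, null); // null part size: stored as a single blob

        XContentBuilder builder = XContentFactory.jsonBuilder();
        FileInfo.toXContent(info, builder, ToXContent.EMPTY_PARAMS);
        // prints something like (meta_hash is the base64 of {1, 2, 3, 4}):
        // {"name":"__1","physical_name":"segments_2","length":123,
        //  "checksum":"abcdef","written_by":"4.9","meta_hash":"AQIDBA=="}
        System.out.println(builder.string());
    }
}

Note that the parser stays backward compatible: a snapshot written before this commit has no "meta_hash" field, and fromXContent then simply leaves metaHash as the empty BytesRef it was initialized with.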
