Skip to content

Commit

Permalink
Snapshot/Restore: keep the last legacy checksums file at the end of r…
Browse files Browse the repository at this point in the history
…estore

 This commit fixes the issue caused by restore process deleting all legacy checksum files at the end of restore process. Instead it keeps the latest version of the checksum intact. The issue manifests itself in losing checksum for all legacy files restored into post 1.3.0 cluster, which in turn causes unnecessary snapshotting of files that didn't change.

Fixes #8119
  • Loading branch information
imotov committed Nov 18, 2014
1 parent a7694b4 commit e516ee5
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 15 deletions.
Expand Up @@ -850,7 +850,7 @@ public void restore() throws IOException {
/// now, go over and clean files that are in the store, but were not in the snapshot
try {
for (String storeFile : store.directory().listAll()) {
if (!snapshot.containPhysicalIndexFile(storeFile)) {
if (!Store.isChecksum(storeFile) && !snapshot.containPhysicalIndexFile(storeFile)) {
try {
store.directory().deleteFile(storeFile);
} catch (IOException e) {
Expand Down
68 changes: 54 additions & 14 deletions src/main/java/org/elasticsearch/index/store/Store.java
Expand Up @@ -32,10 +32,12 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.Directories;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -519,6 +521,8 @@ public String toString() {
* @see StoreFileMetaData
*/
public final static class MetadataSnapshot implements Iterable<StoreFileMetaData> {
private static final ESLogger logger = Loggers.getLogger(MetadataSnapshot.class);

private final Map<String, StoreFileMetaData> metadata;

public static final MetadataSnapshot EMPTY = new MetadataSnapshot();
Expand All @@ -537,7 +541,7 @@ public MetadataSnapshot(Map<String, StoreFileMetaData> metadata) {

ImmutableMap<String, StoreFileMetaData> buildMetadata(IndexCommit commit, Directory directory, ESLogger logger) throws IOException {
ImmutableMap.Builder<String, StoreFileMetaData> builder = ImmutableMap.builder();
Map<String, String> checksumMap = readLegacyChecksums(directory);
Tuple<Map<String, String>, Long> tuple = readLegacyChecksums(directory);
try {
final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory);
Version maxVersion = Version.LUCENE_3_0; // we don't know which version was used to write so we take the max version.
Expand All @@ -547,7 +551,7 @@ ImmutableMap<String, StoreFileMetaData> buildMetadata(IndexCommit commit, Direct
maxVersion = version;
}
for (String file : info.files()) {
String legacyChecksum = checksumMap.get(file);
String legacyChecksum = tuple.v1().get(file);
if (version.onOrAfter(Version.LUCENE_4_8) && legacyChecksum == null) {
checksumFromLuceneFile(directory, file, builder, logger, version, Lucene46SegmentInfoFormat.SI_EXTENSION.equals(IndexFileNames.getExtension(file)));
} else {
Expand All @@ -556,7 +560,7 @@ ImmutableMap<String, StoreFileMetaData> buildMetadata(IndexCommit commit, Direct
}
}
final String segmentsFile = segmentCommitInfos.getSegmentsFileName();
String legacyChecksum = checksumMap.get(segmentsFile);
String legacyChecksum = tuple.v1().get(segmentsFile);
if (maxVersion.onOrAfter(Version.LUCENE_4_8) && legacyChecksum == null) {
checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true);
} else {
Expand All @@ -582,7 +586,17 @@ ImmutableMap<String, StoreFileMetaData> buildMetadata(IndexCommit commit, Direct
return builder.build();
}

static Map<String, String> readLegacyChecksums(Directory directory) throws IOException {
/**
* Reads legacy checksum files found in the directory.
*
* Files are expected to start with _checksums- prefix
* followed by long file version. Only file with the highest version is read, all other files are ignored.
*
* @param directory the directory to read checksums from
* @return a map of file checksums and the checksum file version
* @throws IOException
*/
static Tuple<Map<String, String>, Long> readLegacyChecksums(Directory directory) throws IOException {
synchronized (directory) {
long lastFound = -1;
for (String name : directory.listAll()) {
Expand All @@ -597,10 +611,34 @@ static Map<String, String> readLegacyChecksums(Directory directory) throws IOExc
if (lastFound > -1) {
try (IndexInput indexInput = directory.openInput(CHECKSUMS_PREFIX + lastFound, IOContext.READONCE)) {
indexInput.readInt(); // version
return indexInput.readStringStringMap();
return new Tuple(indexInput.readStringStringMap(), lastFound);
}
}
return new Tuple(new HashMap<>(), -1l);
}
}

/**
* Deletes all checksum files with version lower than newVersion.
*
* @param directory the directory to clean
* @param newVersion the latest checksum file version
* @throws IOException
*/
static void cleanLegacyChecksums(Directory directory, long newVersion) throws IOException {
synchronized (directory) {
for (String name : directory.listAll()) {
if (isChecksum(name)) {
long current = Long.parseLong(name.substring(CHECKSUMS_PREFIX.length()));
if (current < newVersion) {
try {
directory.deleteFile(name);
} catch (IOException ex) {
logger.debug("can't delete old checksum file [{}]", ex, name);
}
}
}
}
return new HashMap<>();
}
}

Expand Down Expand Up @@ -805,24 +843,26 @@ public void add(StoreFileMetaData metaData) throws IOException {

public synchronized void write(Store store) throws IOException {
synchronized (store.distributorDirectory) {
Map<String, String> stringStringMap = MetadataSnapshot.readLegacyChecksums(store.distributorDirectory);
stringStringMap.putAll(legacyChecksums);
if (!stringStringMap.isEmpty()) {
writeChecksums(store.directory, stringStringMap);
Tuple<Map<String, String>, Long> tuple = MetadataSnapshot.readLegacyChecksums(store.distributorDirectory);
tuple.v1().putAll(legacyChecksums);
if (!tuple.v1().isEmpty()) {
writeChecksums(store.directory, tuple.v1(), tuple.v2());
}
}
}

synchronized void writeChecksums(Directory directory, Map<String, String> checksums) throws IOException {
String checksumName = CHECKSUMS_PREFIX + System.currentTimeMillis();
while (directory.fileExists(checksumName)) {
checksumName = CHECKSUMS_PREFIX + System.currentTimeMillis();
synchronized void writeChecksums(Directory directory, Map<String, String> checksums, long lastVersion) throws IOException {
long nextVersion = System.currentTimeMillis();
while (nextVersion <= lastVersion) {
nextVersion = System.currentTimeMillis();
}
final String checksumName = CHECKSUMS_PREFIX + nextVersion;
try (IndexOutput output = directory.createOutput(checksumName, IOContext.DEFAULT)) {
output.writeInt(0); // version
output.writeStringStringMap(checksums);
}
directory.sync(Collections.singleton(checksumName));
MetadataSnapshot.cleanLegacyChecksums(directory, nextVersion);
}

public void clear() {
Expand Down

0 comments on commit e516ee5

Please sign in to comment.