[STORE]: Make use of Lucene built-in checksums
Since Lucene version 4.8 each file has a checksum written in its
footer. We used to calculate the checksums for all files transparently
on the filesystem layer (Directory / Store), which is no longer
necessary. This commit makes use of the new checksums in a backwards
compatible way such that files written with the old checksum mechanism
are still compared against the corresponding Adler32 checksum, while
newer files are compared against the Lucene built-in CRC32 checksum.

Since every written file is now checksummed by default, this commit
also verifies the checksums of files during recovery and restore where
applicable.

Closes elastic#5924
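
To make the split concrete, here is a minimal sketch of the two verification paths (the class, method, buffer size, and the way the legacy Adler32 value is encoded are placeholders invented for illustration; CodecUtil.checksumEntireFile is the Lucene 4.8 footer API this commit builds on):

import java.io.IOException;
import java.util.zip.Adler32;

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;

class ChecksumSketch {
    // Verify one file: legacy files against a previously stored Adler32
    // value, 4.8+ files against the CRC32 checksum in their footer.
    static void verify(Directory dir, String file, String legacyAdler32OrNull) throws IOException {
        try (IndexInput in = dir.openInput(file, IOContext.READONCE)) {
            if (legacyAdler32OrNull != null) {
                // pre-4.8 file: recompute Adler32 over the whole file
                Adler32 adler = new Adler32();
                byte[] buffer = new byte[4096];
                long remaining = in.length();
                while (remaining > 0) {
                    int len = (int) Math.min(buffer.length, remaining);
                    in.readBytes(buffer, 0, len);
                    adler.update(buffer, 0, len);
                    remaining -= len;
                }
                // string encoding of the stored checksum is a placeholder detail
                if (!Long.toString(adler.getValue()).equals(legacyAdler32OrNull)) {
                    throw new IOException("legacy checksum mismatch for " + file);
                }
            } else {
                // 4.8+ file: reads the entire file and compares against the
                // CRC32 footer; throws CorruptIndexException on mismatch
                CodecUtil.checksumEntireFile(in);
            }
        }
    }
}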
s1monw committed Jul 7, 2014
1 parent 7023caa commit 0736650
Showing 41 changed files with 2,776 additions and 637 deletions.
33 changes: 33 additions & 0 deletions src/main/java/org/elasticsearch/ExceptionsHelper.java
@@ -19,12 +19,15 @@

package org.elasticsearch;

import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.rest.RestStatus;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
*
@@ -120,4 +123,34 @@ public static String stackTrace(Throwable e) {
e.printStackTrace(printWriter);
return stackTraceStringWriter.toString();
}

/**
* Rethrows the first exception in the list and adds all remaining to the suppressed list.
* If the given list is empty, no exception is thrown.
*/
public static <T extends Throwable> void rethrowAndSuppress(List<T> exceptions) throws T {
T main = null;
for (T ex : exceptions) {
if (main == null) {
main = ex;
} else {
main.addSuppressed(ex);
}
}
if (main != null) {
throw main;
}
}

public static <T extends Throwable> T unwrap(Throwable t, Class<T> clazz) {
if (t != null) {
do {
if (clazz.isInstance(t)) {
return clazz.cast(t);
}
} while ((t = t.getCause()) != null);
}
return null;
}
}
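
For context, a hypothetical caller of the two new helpers might look like the following (the class and everything in it are invented for illustration):

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ExceptionsHelper;

class SuppressedRethrowExample {
    // Close every resource, remember every failure, then rethrow the first
    // failure with the rest attached as suppressed exceptions.
    static void closeAll(List<? extends Closeable> resources) throws IOException {
        List<IOException> failures = new ArrayList<>();
        for (Closeable resource : resources) {
            try {
                resource.close();
            } catch (IOException e) {
                failures.add(e);
            }
        }
        ExceptionsHelper.rethrowAndSuppress(failures); // no-op if the list is empty
    }

    // Walk the cause chain to find a corruption, however deeply it is wrapped.
    static boolean isCorruption(Throwable t) {
        return ExceptionsHelper.unwrap(t, CorruptIndexException.class) != null;
    }
}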
@@ -247,7 +247,6 @@ public int getRelocatingShardCount() {
* no primary is found or the primary is not active.
*/
public MutableShardRouting activePrimary(ShardRouting shard) {
assert !shard.primary();
for (MutableShardRouting shardRouting : assignedShards(shard.shardId())) {
if (shardRouting.primary() && shardRouting.active()) {
return shardRouting;
@@ -420,11 +420,23 @@ private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting fail
}

RoutingNodes routingNodes = allocation.routingNodes();
boolean dirty = false;
if (failedShard.primary()) {
List<MutableShardRouting> initializingReplicas = new ArrayList<>();
for (MutableShardRouting shard : routingNodes.assignedShards(failedShard)) {
if (!shard.primary() && shard.initializing()) {
initializingReplicas.add(shard);
}
}
// we can't do this in the loop above since applyFailedShard modifies routingNodes while we iterate over the assigned shards
for (MutableShardRouting shard : initializingReplicas) {
dirty |= applyFailedShard(allocation, shard, addToIgnoreList);
}
}
if (failedShard.relocatingNodeId() != null) {
// the shard is relocating, either in initializing (recovery from another node) or relocating (moving to another node)
if (failedShard.state() == INITIALIZING) {
// the shard is initializing and recovering from another node
boolean dirty = false;
// first, we need to cancel the current node that is being initialized
RoutingNodes.RoutingNodeIterator initializingNode = routingNodes.routingNodeIter(failedShard.currentNodeId());
if (initializingNode != null) {
@@ -459,7 +471,6 @@ private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting fail
}
return dirty;
} else if (failedShard.state() == RELOCATING) {
boolean dirty = false;
// the shard is relocating, meaning it is the source the shard is relocating from
// first, we need to cancel the current relocation from the current node
// now, find the node that we are recovering from, cancel the relocation, remove it from the node
@@ -497,13 +508,11 @@ private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting fail
} else {
logger.debug("failed shard {} not found in routingNodes, ignoring it", failedShard);
}
return dirty;
} else {
throw new ElasticsearchIllegalStateException("illegal state for a failed shard, relocating node id is set, but state does not match: " + failedShard);
}
} else {
// the shard is not relocating, its either started, or initializing, just cancel it and move on...
boolean dirty = false;
RoutingNodes.RoutingNodeIterator node = routingNodes.routingNodeIter(failedShard.currentNodeId());
if (node != null) {
while (node.hasNext()) {
@@ -541,7 +550,7 @@ private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting fail
if (!dirty) {
logger.debug("failed shard {} not found in routingNodes, ignoring it", failedShard);
}
return dirty;
}
return dirty;
}
}
@@ -54,6 +54,8 @@ public class ThrottlingAllocationDecider extends AllocationDecider {

public static final String CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES = "cluster.routing.allocation.node_initial_primaries_recoveries";
public static final String CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES = "cluster.routing.allocation.node_concurrent_recoveries";
public static final String CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES = "cluster.routing.allocation.concurrent_recoveries";

public static final int DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES = 2;
public static final int DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES = 4;

@@ -65,7 +67,7 @@ public ThrottlingAllocationDecider(Settings settings, NodeSettingsService nodeSe
super(settings);

this.primariesInitialRecoveries = settings.getAsInt(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES);
this.concurrentRecoveries = settings.getAsInt("cluster.routing.allocation.concurrent_recoveries", settings.getAsInt(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES));
this.concurrentRecoveries = settings.getAsInt(CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES, settings.getAsInt(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES));
logger.debug("using node_concurrent_recoveries [{}], node_initial_primaries_recoveries [{}]", concurrentRecoveries, primariesInitialRecoveries);

nodeSettingsService.addListener(new ApplySettings());
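
The nested getAsInt calls above give the new generic key precedence over the old node-scoped key, which in turn falls back to the built-in default of 2. A small sketch of that precedence (the value 5 is invented for illustration):

import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;

class RecoverySettingsPrecedence {
    public static void main(String[] args) {
        // only the old key is set here
        Settings settings = ImmutableSettings.settingsBuilder()
                .put("cluster.routing.allocation.node_concurrent_recoveries", 5)
                .build();
        // new key absent -> falls back to the old key -> 5; with neither set -> 2
        int concurrentRecoveries = settings.getAsInt(
                "cluster.routing.allocation.concurrent_recoveries",
                settings.getAsInt("cluster.routing.allocation.node_concurrent_recoveries", 2));
        System.out.println(concurrentRecoveries); // prints 5
    }
}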
36 changes: 32 additions & 4 deletions src/main/java/org/elasticsearch/common/lucene/Lucene.java
@@ -21,15 +21,17 @@

import org.apache.lucene.analysis.core.KeywordAnalyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.*;
import org.apache.lucene.search.*;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Version;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -96,6 +98,28 @@ public static SegmentInfos readSegmentInfos(Directory directory) throws IOExcept
return sis;
}

public static void checkSegmentInfoIntegrity(final Directory directory) throws IOException {
new SegmentInfos.FindSegmentsFile(directory) {

@Override
protected Object doBody(String segmentFileName) throws IOException {
try (IndexInput input = directory.openInput(segmentFileName, IOContext.READ)) {
final int format = input.readInt();
final int actualFormat;
if (format == CodecUtil.CODEC_MAGIC) {
// 4.0+
actualFormat = CodecUtil.checkHeaderNoMagic(input, "segments", SegmentInfos.VERSION_40, Integer.MAX_VALUE);
if (actualFormat >= SegmentInfos.VERSION_48) {
CodecUtil.checksumEntireFile(input);
}
}
// legacy (pre-4.0) format: no built-in checksum to verify
}
return null;
}
}.run();
}

public static long count(IndexSearcher searcher, Query query) throws IOException {
TotalHitCountCollector countCollector = new TotalHitCountCollector();
// we don't need scores, so wrap it in a constant score query
@@ -371,4 +395,8 @@ private Lucene() {
public static final boolean indexExists(final Directory directory) throws IOException {
return DirectoryReader.indexExists(directory);
}

public static boolean isCorruptionException(Throwable t) {
return ExceptionsHelper.unwrap(t, CorruptIndexException.class) != null;
}
}
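
A sketch of how the new integrity check might be driven from the outside (the class, method, and path handling are invented; since CorruptIndexException extends IOException, the catch below sees corruption errors):

import java.io.File;
import java.io.IOException;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.elasticsearch.common.lucene.Lucene;

class SegmentsIntegrityCheck {
    // Open the index directory and verify the checksum of the newest
    // segments_N file before trusting the shard copy.
    static void check(File indexPath) throws IOException {
        try (Directory dir = FSDirectory.open(indexPath)) {
            try {
                Lucene.checkSegmentInfoIntegrity(dir);
            } catch (IOException e) {
                if (Lucene.isCorruptionException(e)) {
                    // checksum mismatch or truncated file: the copy is
                    // corrupt and should not be recovered from
                }
                throw e;
            }
        }
    }
}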
@@ -50,9 +50,7 @@
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.transport.ConnectTransportException;

import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentMap;

/**
2 changes: 1 addition & 1 deletion src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -135,7 +135,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
void recover(RecoveryHandler recoveryHandler) throws EngineException;

/** fail engine due to some error. the engine will also be closed. */
void failEngine(String reason, @Nullable Throwable failure);
void failEngine(String reason, Throwable failure);

static interface FailedEngineListener {
void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t);
@@ -92,6 +92,7 @@
*/
public class InternalEngine extends AbstractIndexShardComponent implements Engine {

private volatile boolean failEngineOnCorruption;
private volatile ByteSizeValue indexingBufferSize;
private volatile int indexConcurrency;
private volatile boolean compoundOnFlush = true;
@@ -201,7 +202,7 @@ public InternalEngine(ShardId shardId, @IndexSettings Settings indexSettings, Th
this.optimizeAutoGenerateId = indexSettings.getAsBoolean("index.optimize_auto_generated_id", true);

this.indexSettingsService.addListener(applySettings);

this.failEngineOnCorruption = indexSettings.getAsBoolean(ENGINE_FAIL_ON_CORRUPTION, false);
this.failOnMergeFailure = indexSettings.getAsBoolean(INDEX_FAIL_ON_MERGE_FAILURE, true);
if (failOnMergeFailure) {
this.mergeScheduler.addFailureListener(new FailEngineOnMergeFailure());
@@ -397,7 +398,7 @@ public void create(Create create) throws EngineException {

private void maybeFailEngine(Throwable t) {
if (t instanceof OutOfMemoryError || (t instanceof IllegalStateException && t.getMessage().contains("OutOfMemoryError"))) {
failEngine("out of memory", t);
failEngine("out of memory", t, false);
}
}

@@ -989,13 +990,15 @@ public void recover(RecoveryHandler recoveryHandler) throws EngineException {
try {
phase1Snapshot = deletionPolicy.snapshot();
} catch (Throwable e) {
failEngineIfCorrupted(e, "recovery");
Releasables.closeWhileHandlingException(onGoingRecoveries);
throw new RecoveryEngineException(shardId, 1, "Snapshot failed", e);
}

try {
recoveryHandler.phase1(phase1Snapshot);
} catch (Throwable e) {
failEngineIfCorrupted(e, "recovery phase 1");
Releasables.closeWhileHandlingException(onGoingRecoveries, phase1Snapshot);
throw new RecoveryEngineException(shardId, 1, "Execution failed", wrapIfClosed(e));
}
@@ -1004,13 +1007,14 @@ public void recover(RecoveryHandler recoveryHandler) throws EngineException {
try {
phase2Snapshot = translog.snapshot();
} catch (Throwable e) {
failEngineIfCorrupted(e, "snapshot recovery");
Releasables.closeWhileHandlingException(onGoingRecoveries, phase1Snapshot);
throw new RecoveryEngineException(shardId, 2, "Snapshot failed", wrapIfClosed(e));
}

try {
recoveryHandler.phase2(phase2Snapshot);
} catch (Throwable e) {
failEngineIfCorrupted(e, "recovery phase 2");
Releasables.closeWhileHandlingException(onGoingRecoveries, phase1Snapshot, phase2Snapshot);
throw new RecoveryEngineException(shardId, 2, "Execution failed", wrapIfClosed(e));
}
@@ -1023,13 +1027,24 @@ public void recover(RecoveryHandler recoveryHandler) throws EngineException {
recoveryHandler.phase3(phase3Snapshot);
success = true;
} catch (Throwable e) {
failEngineIfCorrupted(e, "recovery phase 3");
throw new RecoveryEngineException(shardId, 3, "Execution failed", wrapIfClosed(e));
} finally {
Releasables.close(success, onGoingRecoveries, writeLock, phase1Snapshot,
phase2Snapshot, phase3Snapshot); // hmm why can't we use try-with here?
}
}

private void failEngineIfCorrupted(Throwable e, String source) {
if (Lucene.isCorruptionException(e)) {
if (this.failEngineOnCorruption) {
failEngine("corrupt file detected by [" + source + "]", e, true);
} else {
logger.warn("corrupt file detected by [{}] but failing the engine on corruption is disabled", e, source);
}
}
}

private Throwable wrapIfClosed(Throwable t) {
if (closed) {
return new EngineClosedException(shardId, t);
@@ -1180,8 +1195,14 @@ public void onFailedMerge(MergePolicy.MergeException e) {
}

@Override
public void failEngine(String reason, @Nullable Throwable failure) {
public void failEngine(String reason, Throwable failure) {
failEngine(reason, failure, Lucene.isCorruptionException(failure));
}

private void failEngine(String reason, Throwable failure, boolean markCorrupted) {
assert failure != null;
if (failEngineLock.tryLock()) {

assert !readLock.assertLockIsHeld() : "readLock is held by a thread that tries to fail the engine";
if (failedEngine != null) {
logger.debug("tried to fail engine but engine is already failed. ignoring. [{}]", reason, failure);
@@ -1190,17 +1211,23 @@ public void failEngine(String reason, @Nullable Throwable failure) {
try {
logger.warn("failed engine [{}]", reason, failure);
// we must set a failure exception, generate one if not supplied
if (failure == null) {
failedEngine = new EngineException(shardId(), reason);
} else {
failedEngine = failure;
}
failedEngine = failure;
for (FailedEngineListener listener : failedEngineListeners) {
listener.onFailedEngine(shardId, reason, failure);
}
} finally {
// close the engine whatever happens...
close();
try {
if (markCorrupted) {
try {
store.markStoreCorrupted();
} catch (IOException e) {
logger.trace("Couldn't marks store corrupted", e);
}
}
} finally {
// close the engine whatever happens...
close();
}
}

} else {
@@ -1302,6 +1329,8 @@ public void warm(AtomicReader reader) throws IOException {
public static final String INDEX_COMPOUND_ON_FLUSH = "index.compound_on_flush";
public static final String INDEX_GC_DELETES = "index.gc_deletes";
public static final String INDEX_FAIL_ON_MERGE_FAILURE = "index.fail_on_merge_failure";
public static final String ENGINE_FAIL_ON_CORRUPTION = "index.fail_on_corruption";


class ApplySettings implements IndexSettingsService.Listener {

@@ -1320,6 +1349,7 @@ public void onRefreshSettings(Settings settings) {
indexWriter.getConfig().setUseCompoundFile(compoundOnFlush);
}

InternalEngine.this.failEngineOnCorruption = settings.getAsBoolean(ENGINE_FAIL_ON_CORRUPTION, InternalEngine.this.failEngineOnCorruption);
int indexConcurrency = settings.getAsInt(INDEX_INDEX_CONCURRENCY, InternalEngine.this.indexConcurrency);
boolean failOnMergeFailure = settings.getAsBoolean(INDEX_FAIL_ON_MERGE_FAILURE, InternalEngine.this.failOnMergeFailure);
String codecName = settings.get(INDEX_CODEC, InternalEngine.this.codecName);
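
Because index.fail_on_corruption is read both in the constructor and in the settings listener, it should be settable at index creation time and updatable dynamically. A hedged sketch of the latter (the client variable, index name, and wrapper class are placeholders):

import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;

class FailOnCorruptionExample {
    // Enable engine failing on corruption for an existing index.
    static void enable(Client client, String index) {
        client.admin().indices().prepareUpdateSettings(index)
                .setSettings(ImmutableSettings.settingsBuilder()
                        .put("index.fail_on_corruption", true)
                        .build())
                .execute().actionGet();
    }
}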
@@ -126,6 +126,9 @@ public void recover(boolean indexShouldExists, RecoveryState recoveryState) thro
files += " (failure=" + ExceptionsHelper.detailedMessage(e1) + ")";
}
if (indexShouldExists && indexShard.store().indexStore().persistent()) {
if (indexShard.store().isMarkedCorrupted()) {
throw new IndexShardGatewayRecoveryException(shardId(), "Shard is corrupted can't recover " + files, e);
}
throw new IndexShardGatewayRecoveryException(shardId(), "shard allocated for local recovery (post api), should exist, but doesn't, current files: " + files, e);
}
}
@@ -162,9 +165,13 @@ public void recover(boolean indexShouldExists, RecoveryState recoveryState) thro
totalSizeInBytes += length;
recoveryState.getIndex().addFileDetail(name, length, length);
}
recoveryState.getIndex().files(numberOfFiles, totalSizeInBytes, numberOfFiles, totalSizeInBytes);
recoveryState.getIndex().recoveredFileCount(numberOfFiles);
recoveryState.getIndex().recoveredByteCount(totalSizeInBytes);
RecoveryState.Index index = recoveryState.getIndex();
index.totalFileCount(numberOfFiles);
index.totalByteCount(totalSizeInBytes);
index.reusedFileCount(numberOfFiles);
index.reusedByteCount(totalSizeInBytes);
index.recoveredFileCount(numberOfFiles);
index.recoveredByteCount(totalSizeInBytes);
} catch (Exception e) {
// ignore
}
@@ -53,7 +53,6 @@ public LogDocMergePolicyProvider(Store store, IndexSettingsService indexSettings
super(store);
Preconditions.checkNotNull(store, "Store must be provided to merge policy");
this.indexSettingsService = indexSettingsService;

this.minMergeDocs = componentSettings.getAsInt("min_merge_docs", LogDocMergePolicy.DEFAULT_MIN_MERGE_DOCS);
this.maxMergeDocs = componentSettings.getAsInt("max_merge_docs", LogDocMergePolicy.DEFAULT_MAX_MERGE_DOCS);
this.mergeFactor = componentSettings.getAsInt("merge_factor", LogDocMergePolicy.DEFAULT_MERGE_FACTOR);
