Skip to content

Commit

Permalink
[STORE]: Make use of Lucene built-in checksums
Browse files Browse the repository at this point in the history
Since Lucene version 4.8 each file has a checksum written as its
footer. We used to calculate the checksums for all files transparently
on the filesystem layer (Directory / Store) which is now not necessary
anymore. This commit makes use of the new checksums in a backwards
compatible way such that files written with the old checksum mechanism
are still compared against the corresponding Adler32 checksum while
newer files are compared against the Lucene built-in CRC32 checksum.

Since now every written file is checksummed by default this commit
also verifies the checksum for files during recovery and restore if
applicable.

Closes #5924

This commit also has a fix for #6808 since the added tests in
`CorruptedFileTest.java` exposed the issue.

Closes #6808
  • Loading branch information
s1monw committed Jul 10, 2014
1 parent 9ca5e6e commit 72e6150
Show file tree
Hide file tree
Showing 47 changed files with 2,710 additions and 1,088 deletions.
101 changes: 0 additions & 101 deletions src/main/java/org/apache/lucene/store/BufferedChecksumIndexOutput.java

This file was deleted.

155 changes: 0 additions & 155 deletions src/main/java/org/apache/lucene/store/BufferedIndexOutput.java

This file was deleted.

44 changes: 44 additions & 0 deletions src/main/java/org/elasticsearch/ExceptionsHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.List;

/**
*
Expand Down Expand Up @@ -120,4 +121,47 @@ public static String stackTrace(Throwable e) {
e.printStackTrace(printWriter);
return stackTraceStringWriter.toString();
}

/**
 * Rethrows the first exception in the given list, attaching every remaining
 * exception to it as a suppressed exception. If the list is empty, nothing
 * is thrown.
 *
 * @param exceptions the exceptions to combine and rethrow; may be empty
 * @throws T the first exception in the list, if the list is non-empty
 */
public static <T extends Throwable> void rethrowAndSuppress(List<T> exceptions) throws T {
    T primary = null;
    for (T candidate : exceptions) {
        if (primary != null) {
            // already have a primary exception - record this one as suppressed
            primary.addSuppressed(candidate);
        } else {
            primary = candidate;
        }
    }
    if (primary != null) {
        throw primary;
    }
}

/**
 * Walks the cause chain of the given throwable and returns the first element
 * that is an instance of the requested class, or <code>null</code> if no such
 * cause exists (or if the given throwable itself is <code>null</code>).
 *
 * @param t     the throwable whose cause chain is inspected; may be <code>null</code>
 * @param clazz the exception type to search for
 * @return the first matching throwable in the chain, or <code>null</code>
 */
public static <T extends Throwable> T unwrap(Throwable t, Class<T> clazz) {
    for (Throwable current = t; current != null; current = current.getCause()) {
        if (clazz.isInstance(current)) {
            return clazz.cast(current);
        }
    }
    return null;
}

/**
 * Returns <code>true</code> iff the given throwable is an {@link OutOfMemoryError},
 * or an {@link IllegalStateException} whose message mentions an OutOfMemoryError
 * (some JDK / library code reports OOM conditions wrapped this way),
 * otherwise <code>false</code>.
 *
 * @param t the throwable to inspect; may be <code>null</code>, in which case
 *          <code>false</code> is returned
 */
public static boolean isOOM(Throwable t) {
    return t != null
    && (t instanceof OutOfMemoryError
    || (t instanceof IllegalStateException
    && t.getMessage() != null
    && t.getMessage().contains("OutOfMemoryError")
    )
    );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,6 @@ public int getRelocatingShardCount() {
* no primary is found or the primary is not active.
*/
public MutableShardRouting activePrimary(ShardRouting shard) {
assert !shard.primary();
for (MutableShardRouting shardRouting : assignedShards(shard.shardId())) {
if (shardRouting.primary() && shardRouting.active()) {
return shardRouting;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,11 +420,28 @@ private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting fail
}

RoutingNodes routingNodes = allocation.routingNodes();
boolean dirty = false;
if (failedShard.primary()) {
// we have to fail the initializing replicas if the primary fails
// since they might not yet have started the recovery and then they will
// stick in the cluster-state forever since the replica has a retry logic that
// retries infinitely in that case.
List<MutableShardRouting> initializingReplicas = new ArrayList<>();
for (MutableShardRouting shard : routingNodes.assignedShards(failedShard)){
if (!shard.primary() && shard.initializing()) {
initializingReplicas.add(shard);
}
}
// we can't do this in the loop above since we modify the iterator and will get
// concurrent modification exceptions
for (MutableShardRouting shard : initializingReplicas) {
dirty |= applyFailedShard(allocation, shard, addToIgnoreList);
}
}
if (failedShard.relocatingNodeId() != null) {
// the shard is relocating, either in initializing (recovery from another node) or relocating (moving to another node)
if (failedShard.state() == INITIALIZING) {
// the shard is initializing and recovering from another node
boolean dirty = false;
// first, we need to cancel the current node that is being initialized
RoutingNodes.RoutingNodeIterator initializingNode = routingNodes.routingNodeIter(failedShard.currentNodeId());
if (initializingNode != null) {
Expand Down Expand Up @@ -459,7 +476,6 @@ private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting fail
}
return dirty;
} else if (failedShard.state() == RELOCATING) {
boolean dirty = false;
// the shard is relocating, meaning its the source the shard is relocating from
// first, we need to cancel the current relocation from the current node
// now, find the node that we are recovering from, cancel the relocation, remove it from the node
Expand Down Expand Up @@ -497,13 +513,11 @@ private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting fail
} else {
logger.debug("failed shard {} not found in routingNodes, ignoring it", failedShard);
}
return dirty;
} else {
throw new ElasticsearchIllegalStateException("illegal state for a failed shard, relocating node id is set, but state does not match: " + failedShard);
}
} else {
// the shard is not relocating, its either started, or initializing, just cancel it and move on...
boolean dirty = false;
RoutingNodes.RoutingNodeIterator node = routingNodes.routingNodeIter(failedShard.currentNodeId());
if (node != null) {
while (node.hasNext()) {
Expand Down Expand Up @@ -541,7 +555,7 @@ private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting fail
if (!dirty) {
logger.debug("failed shard {} not found in routingNodes, ignoring it", failedShard);
}
return dirty;
}
return dirty;
}
}
Loading

0 comments on commit 72e6150

Please sign in to comment.