Skip to content

Commit

Permalink
Fix handling of document failure exception in InternalEngine (#22718)
Browse files Browse the repository at this point in the history
Today we try to be smart and make a generic decision about whether an exception should
be treated as a document failure, but in some cases concurrency in the index writer
makes this decision very difficult, since we don't have a consistent state when
another thread is currently failing the IndexWriter/InternalEngine due to a tragic event.

This change simplifies the exception handling and makes specific decisions about document failures
rather than using a generic heuristic. This prevents exceptions from being treated as document failures
when they should have failed the engine but backed out of doing so because some other thread had
already taken over the failure procedure but hadn't finished yet.
  • Loading branch information
s1monw committed Jan 20, 2017
1 parent f017842 commit 824beea
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 191 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Map;

/** Performs shard-level bulk (index, delete or update) operations */
Expand Down Expand Up @@ -424,7 +425,7 @@ private Translog.Location locationToSync(Translog.Location current, Translog.Loc
* Execute the given {@link IndexRequest} on a replica shard, throwing a
* {@link RetryOnReplicaException} if the operation needs to be re-tried.
*/
public static Engine.IndexResult executeIndexRequestOnReplica(IndexRequest request, IndexShard replica) {
public static Engine.IndexResult executeIndexRequestOnReplica(IndexRequest request, IndexShard replica) throws IOException {
final ShardId shardId = replica.shardId();
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, shardId.getIndexName(), request.type(), request.id(), request.source())
.routing(request.routing()).parent(request.parent());
Expand Down Expand Up @@ -483,12 +484,12 @@ public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest reque
return primary.index(operation);
}

public static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) {
public static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) throws IOException {
final Engine.Delete delete = primary.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType());
return primary.delete(delete);
}

public static Engine.DeleteResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) {
public static Engine.DeleteResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) throws IOException {
final Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(),
request.getSeqNo(), request.primaryTerm(), request.version(), request.versionType());
return replica.delete(delete);
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public Condition newCondition() {
*
* Note: engine level failures (i.e. persistent engine failures) are thrown
*/
public abstract IndexResult index(final Index index);
public abstract IndexResult index(final Index index) throws IOException;

/**
* Perform document delete operation on the engine
Expand All @@ -292,7 +292,7 @@ public Condition newCondition() {
*
* Note: engine level failures (i.e. persistent engine failures) are thrown
*/
public abstract DeleteResult delete(final Delete delete);
public abstract DeleteResult delete(final Delete delete) throws IOException;

public abstract NoOpResult noOp(final NoOp noOp);

Expand Down
180 changes: 78 additions & 102 deletions core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -445,27 +445,18 @@ public GetResult get(Get get, Function<String, Searcher> searcherFactory) throws
}

/**
* Checks for version conflicts. If a version conflict exists, the optional return value represents the operation result. Otherwise, if
* no conflicts are found, the optional return value is not present.
* Checks for version conflicts. If a non-critical version conflict exists <code>true</code> is returned. In the case of a critical
* version conflict (if operation origin is primary) a {@link VersionConflictEngineException} is thrown.
*
* @param <T> the result type
* @param op the operation
* @param currentVersion the current version
* @param expectedVersion the expected version
* @param deleted {@code true} if the current version is not found or represents a delete
* @param onSuccess if there is a version conflict that can be ignored, the result of the operation
* @param onFailure if there is a version conflict that can not be ignored, the result of the operation
* @return if there is a version conflict, the optional value is present and represents the operation result, otherwise the return value
* is not present
* @return <code>true</code> iff a non-critical version conflict (origin recovery or replica) is found otherwise <code>false</code>
* @throws VersionConflictEngineException if a critical version conflict was found where the operation origin is primary
* @throws IllegalArgumentException if an unsupported version type is used.
*/
private <T extends Result> Optional<T> checkVersionConflict(
final Operation op,
final long currentVersion,
final long expectedVersion,
final boolean deleted,
final Supplier<T> onSuccess,
final Function<VersionConflictEngineException, T> onFailure) {
final T result;
private boolean checkVersionConflict(final Operation op, final long currentVersion, final long expectedVersion, final boolean deleted) {
if (op.versionType() == VersionType.FORCE) {
if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
// If index was created in 5.0 or later, 'force' is not allowed at all
Expand All @@ -479,23 +470,19 @@ private <T extends Result> Optional<T> checkVersionConflict(
if (op.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) {
if (op.origin() == Operation.Origin.PRIMARY) {
// fatal version conflict
final VersionConflictEngineException e =
new VersionConflictEngineException(
throw new VersionConflictEngineException(
shardId,
op.type(),
op.id(),
op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
result = onFailure.apply(e);

} else {
/*
* Version conflicts during recovery and on replicas are normal due to asynchronous execution; as such, we should return a
* successful result.
*/
result = onSuccess.get();
/* Version conflicts during recovery and on replicas are normal due to asynchronous execution; as such, we should return a
* successful result.*/
return true;
}
return Optional.of(result);
} else {
return Optional.empty();
return false;
}
}

Expand All @@ -510,7 +497,7 @@ private long checkDeletedAndGCed(VersionValue versionValue) {
}

@Override
public IndexResult index(Index index) {
public IndexResult index(Index index) throws IOException {
IndexResult result;
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
Expand All @@ -522,52 +509,15 @@ public IndexResult index(Index index) {
result = innerIndex(index);
}
}
} catch (Exception e) {
result = new IndexResult(checkIfDocumentFailureOrThrow(index, e), index.version(), SequenceNumbersService.UNASSIGNED_SEQ_NO);
}
return result;
}

/**
* Inspects exception thrown when executing index or delete operations
*
* @return failure if the failure is a document specific failure (e.g. analysis chain failure)
* or throws Exception if the failure caused the engine to fail (e.g. out of disk, lucene tragic event)
* <p>
* Note: pkg-private for testing
*/
final Exception checkIfDocumentFailureOrThrow(final Operation operation, final Exception failure) {
boolean isDocumentFailure;
try {
// When indexing a document into Lucene, Lucene distinguishes between environment related errors
// (like out of disk space) and document specific errors (like analysis chain problems) by setting
// the IndexWriter.getTragicEvent() value for the former. maybeFailEngine checks for these kind of
// errors and returns true if that is the case. We use that to indicate a document level failure
// and set the error in operation.setFailure. In case of environment related errors, the failure
// is bubbled up
isDocumentFailure = maybeFailEngine(operation.operationType().getLowercase(), failure) == false;
if (failure instanceof AlreadyClosedException) {
// ensureOpen throws AlreadyClosedException which is not a document level issue
isDocumentFailure = false;
} catch (RuntimeException | IOException e) {
try {
maybeFailEngine("index", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
} catch (Exception inner) {
// we failed checking whether the failure can fail the engine, treat it as a persistent engine failure
isDocumentFailure = false;
failure.addSuppressed(inner);
}
if (isDocumentFailure) {
return failure;
} else {
// throw original exception in case the exception caused the engine to fail
rethrow(failure);
return null;
throw e;
}
}

// hack to rethrow original exception in case of engine level failures during index/delete operation
@SuppressWarnings("unchecked")
private static <T extends Throwable> void rethrow(Throwable t) throws T {
throw (T) t;
return result;
}

private boolean canOptimizeAddDocument(Index index) {
Expand Down Expand Up @@ -610,9 +560,9 @@ private boolean assertSequenceNumber(final Engine.Operation.Origin origin, final
}

private IndexResult innerIndex(Index index) throws IOException {
// TODO we gotta split this method up it's too big!
assert assertSequenceNumber(index.origin(), index.seqNo());
final Translog.Location location;
final long updatedVersion;
long seqNo = index.seqNo();
try (Releasable ignored = acquireLock(index.uid())) {
lastWriteNanos = index.startTime();
Expand Down Expand Up @@ -678,14 +628,14 @@ private IndexResult innerIndex(Index index) throws IOException {
}
}
final long expectedVersion = index.version();
final Optional<IndexResult> resultOnVersionConflict =
checkVersionConflict(
index,
currentVersion,
expectedVersion,
deleted,
() -> new IndexResult(currentVersion, index.seqNo(), false),
e -> new IndexResult(e, currentVersion, index.seqNo()));
Optional<IndexResult> resultOnVersionConflict;
try {
final boolean isVersionConflict = checkVersionConflict(index, currentVersion, expectedVersion, deleted);
resultOnVersionConflict = isVersionConflict ? Optional.of(new IndexResult(currentVersion, index.seqNo(), false))
: Optional.empty();
} catch (IllegalArgumentException | VersionConflictEngineException ex) {
resultOnVersionConflict = Optional.of(new IndexResult(ex, currentVersion, index.seqNo()));
}

final IndexResult indexResult;
if (resultOnVersionConflict.isPresent()) {
Expand All @@ -702,18 +652,38 @@ private IndexResult innerIndex(Index index) throws IOException {
* primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created.
*/
index.parsedDoc().updateSeqID(seqNo, index.primaryTerm());
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
final long updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
index.parsedDoc().version().setLongValue(updatedVersion);

if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) {
// document does not exists, we can optimize for create, but double check if assertions are running
assert assertDocDoesNotExist(index, canOptimizeAddDocument == false);
index(index.docs(), indexWriter);
} else {
update(index.uid(), index.docs(), indexWriter);
IndexResult innerIndexResult;
try {
if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) {
// document does not exists, we can optimize for create, but double check if assertions are running
assert assertDocDoesNotExist(index, canOptimizeAddDocument == false);
index(index.docs(), indexWriter);
} else {
update(index.uid(), index.docs(), indexWriter);
}
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
innerIndexResult = new IndexResult(updatedVersion, seqNo, deleted);
} catch (Exception ex) {
if (indexWriter.getTragicException() == null) {
/* There is no tragic event recorded so this must be a document failure.
*
* The handling inside IW doesn't guarantee that an tragic / aborting exception
* will be used as THE tragicEventException since if there are multiple exceptions causing an abort in IW
* only one wins. Yet, only the one that wins will also close the IW and in turn fail the engine such that
* we can potentially handle the exception before the engine is failed.
* Bottom line is that we can only rely on the fact that if it's a document failure then
* `indexWriter.getTragicException()` will be null otherwise we have to rethrow and treat it as fatal or rather
* non-document failure
*/
innerIndexResult = new IndexResult(ex, currentVersion, index.seqNo());
} else {
throw ex;
}
}
indexResult = new IndexResult(updatedVersion, seqNo, deleted);
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
assert innerIndexResult != null;
indexResult = innerIndexResult;
}
if (!indexResult.hasFailure()) {
location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
Expand All @@ -729,7 +699,6 @@ private IndexResult innerIndex(Index index) throws IOException {
seqNoService().markSeqNoAsCompleted(seqNo);
}
}

}

private static void index(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
Expand Down Expand Up @@ -769,14 +738,19 @@ private static void update(final Term uid, final List<ParseContext.Document> doc
}

@Override
public DeleteResult delete(Delete delete) {
public DeleteResult delete(Delete delete) throws IOException {
DeleteResult result;
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
// NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments:
result = innerDelete(delete);
} catch (Exception e) {
result = new DeleteResult(checkIfDocumentFailureOrThrow(delete, e), delete.version(), delete.seqNo());
} catch (RuntimeException | IOException e) {
try {
maybeFailEngine("index", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw e;
}
maybePruneDeletedTombstones();
return result;
Expand Down Expand Up @@ -811,15 +785,14 @@ private DeleteResult innerDelete(Delete delete) throws IOException {
}

final long expectedVersion = delete.version();

final Optional<DeleteResult> resultOnVersionConflict =
checkVersionConflict(
delete,
currentVersion,
expectedVersion,
deleted,
() -> new DeleteResult(expectedVersion, delete.seqNo(), true),
e -> new DeleteResult(e, expectedVersion, delete.seqNo()));
Optional<DeleteResult> resultOnVersionConflict;
try {
final boolean isVersionConflict = checkVersionConflict(delete, currentVersion, expectedVersion, deleted);
resultOnVersionConflict = isVersionConflict ? Optional.of(new DeleteResult(expectedVersion, delete.seqNo(), true))
: Optional.empty();
} catch (IllegalArgumentException | VersionConflictEngineException ex) {
resultOnVersionConflict = Optional.of(new DeleteResult(ex, expectedVersion, delete.seqNo()));
}
final DeleteResult deleteResult;
if (resultOnVersionConflict.isPresent()) {
deleteResult = resultOnVersionConflict.get();
Expand Down Expand Up @@ -852,6 +825,7 @@ private DeleteResult innerDelete(Delete delete) throws IOException {
}

private boolean deleteIfFound(Term uid, long currentVersion, boolean deleted, VersionValue versionValue) throws IOException {
assert uid != null : "uid must not be null";
final boolean found;
if (currentVersion == Versions.NOT_FOUND) {
// doc does not exist and no prior deletes
Expand All @@ -861,6 +835,8 @@ private boolean deleteIfFound(Term uid, long currentVersion, boolean deleted, Ve
found = false;
} else {
// we deleted a currently existing document
// any exception that comes from this is a either an ACE or a fatal exception there can't be any document failures coming
// from this.
indexWriter.deleteDocuments(uid);
found = true;
}
Expand Down
Loading

0 comments on commit 824beea

Please sign in to comment.