Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix handling of document failure exception in InternalEngine #22718

Merged
merged 1 commit into from Jan 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -63,6 +63,7 @@
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Map;

/** Performs shard-level bulk (index, delete or update) operations */
Expand Down Expand Up @@ -424,7 +425,7 @@ private Translog.Location locationToSync(Translog.Location current, Translog.Loc
* Execute the given {@link IndexRequest} on a replica shard, throwing a
* {@link RetryOnReplicaException} if the operation needs to be re-tried.
*/
public static Engine.IndexResult executeIndexRequestOnReplica(IndexRequest request, IndexShard replica) {
public static Engine.IndexResult executeIndexRequestOnReplica(IndexRequest request, IndexShard replica) throws IOException {
final ShardId shardId = replica.shardId();
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, shardId.getIndexName(), request.type(), request.id(), request.source())
.routing(request.routing()).parent(request.parent());
Expand Down Expand Up @@ -483,12 +484,12 @@ public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest reque
return primary.index(operation);
}

public static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) {
public static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) throws IOException {
final Engine.Delete delete = primary.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType());
return primary.delete(delete);
}

public static Engine.DeleteResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) {
public static Engine.DeleteResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) throws IOException {
final Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(),
request.getSeqNo(), request.primaryTerm(), request.version(), request.versionType());
return replica.delete(delete);
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Expand Up @@ -282,7 +282,7 @@ public Condition newCondition() {
*
* Note: engine level failures (i.e. persistent engine failures) are thrown
*/
public abstract IndexResult index(final Index index);
public abstract IndexResult index(final Index index) throws IOException;

/**
* Perform document delete operation on the engine
Expand All @@ -292,7 +292,7 @@ public Condition newCondition() {
*
* Note: engine level failures (i.e. persistent engine failures) are thrown
*/
public abstract DeleteResult delete(final Delete delete);
public abstract DeleteResult delete(final Delete delete) throws IOException;

public abstract NoOpResult noOp(final NoOp noOp);

Expand Down
180 changes: 78 additions & 102 deletions core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
Expand Up @@ -445,27 +445,18 @@ public GetResult get(Get get, Function<String, Searcher> searcherFactory) throws
}

/**
* Checks for version conflicts. If a version conflict exists, the optional return value represents the operation result. Otherwise, if
* no conflicts are found, the optional return value is not present.
* Checks for version conflicts. If a non-critical version conflict exists <code>true</code> is returned. In the case of a critical
* version conflict (if operation origin is primary) a {@link VersionConflictEngineException} is thrown.
*
* @param <T> the result type
* @param op the operation
* @param currentVersion the current version
* @param expectedVersion the expected version
* @param deleted {@code true} if the current version is not found or represents a delete
* @param onSuccess if there is a version conflict that can be ignored, the result of the operation
* @param onFailure if there is a version conflict that can not be ignored, the result of the operation
* @return if there is a version conflict, the optional value is present and represents the operation result, otherwise the return value
* is not present
* @return <code>true</code> iff a non-critical version conflict (origin recovery or replica) is found otherwise <code>false</code>
* @throws VersionConflictEngineException if a critical version conflict was found where the operation origin is primary
* @throws IllegalArgumentException if an unsupported version type is used.
*/
private <T extends Result> Optional<T> checkVersionConflict(
final Operation op,
final long currentVersion,
final long expectedVersion,
final boolean deleted,
final Supplier<T> onSuccess,
final Function<VersionConflictEngineException, T> onFailure) {
final T result;
private boolean checkVersionConflict(final Operation op, final long currentVersion, final long expectedVersion, final boolean deleted) {
if (op.versionType() == VersionType.FORCE) {
if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
// If index was created in 5.0 or later, 'force' is not allowed at all
Expand All @@ -479,23 +470,19 @@ private <T extends Result> Optional<T> checkVersionConflict(
if (op.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) {
if (op.origin() == Operation.Origin.PRIMARY) {
// fatal version conflict
final VersionConflictEngineException e =
new VersionConflictEngineException(
throw new VersionConflictEngineException(
shardId,
op.type(),
op.id(),
op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
result = onFailure.apply(e);

} else {
/*
* Version conflicts during recovery and on replicas are normal due to asynchronous execution; as such, we should return a
* successful result.
*/
result = onSuccess.get();
/* Version conflicts during recovery and on replicas are normal due to asynchronous execution; as such, we should return a
* successful result.*/
return true;
}
return Optional.of(result);
} else {
return Optional.empty();
return false;
}
}

Expand All @@ -510,7 +497,7 @@ private long checkDeletedAndGCed(VersionValue versionValue) {
}

@Override
public IndexResult index(Index index) {
public IndexResult index(Index index) throws IOException {
IndexResult result;
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
Expand All @@ -522,52 +509,15 @@ public IndexResult index(Index index) {
result = innerIndex(index);
}
}
} catch (Exception e) {
result = new IndexResult(checkIfDocumentFailureOrThrow(index, e), index.version(), SequenceNumbersService.UNASSIGNED_SEQ_NO);
}
return result;
}

/**
* Inspects exception thrown when executing index or delete operations
*
* @return failure if the failure is a document specific failure (e.g. analysis chain failure)
* or throws Exception if the failure caused the engine to fail (e.g. out of disk, lucene tragic event)
* <p>
* Note: pkg-private for testing
*/
final Exception checkIfDocumentFailureOrThrow(final Operation operation, final Exception failure) {
boolean isDocumentFailure;
try {
// When indexing a document into Lucene, Lucene distinguishes between environment related errors
// (like out of disk space) and document specific errors (like analysis chain problems) by setting
// the IndexWriter.getTragicEvent() value for the former. maybeFailEngine checks for these kind of
// errors and returns true if that is the case. We use that to indicate a document level failure
// and set the error in operation.setFailure. In case of environment related errors, the failure
// is bubbled up
isDocumentFailure = maybeFailEngine(operation.operationType().getLowercase(), failure) == false;
if (failure instanceof AlreadyClosedException) {
// ensureOpen throws AlreadyClosedException which is not a document level issue
isDocumentFailure = false;
} catch (RuntimeException | IOException e) {
try {
maybeFailEngine("index", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
} catch (Exception inner) {
// we failed checking whether the failure can fail the engine, treat it as a persistent engine failure
isDocumentFailure = false;
failure.addSuppressed(inner);
}
if (isDocumentFailure) {
return failure;
} else {
// throw original exception in case the exception caused the engine to fail
rethrow(failure);
return null;
throw e;
}
}

// hack to rethrow original exception in case of engine level failures during index/delete operation
@SuppressWarnings("unchecked")
private static <T extends Throwable> void rethrow(Throwable t) throws T {
throw (T) t;
return result;
}

private boolean canOptimizeAddDocument(Index index) {
Expand Down Expand Up @@ -610,9 +560,9 @@ private boolean assertSequenceNumber(final Engine.Operation.Origin origin, final
}

private IndexResult innerIndex(Index index) throws IOException {
// TODO we gotta split this method up it's too big!
assert assertSequenceNumber(index.origin(), index.seqNo());
final Translog.Location location;
final long updatedVersion;
long seqNo = index.seqNo();
try (Releasable ignored = acquireLock(index.uid())) {
lastWriteNanos = index.startTime();
Expand Down Expand Up @@ -678,14 +628,14 @@ private IndexResult innerIndex(Index index) throws IOException {
}
}
final long expectedVersion = index.version();
final Optional<IndexResult> resultOnVersionConflict =
checkVersionConflict(
index,
currentVersion,
expectedVersion,
deleted,
() -> new IndexResult(currentVersion, index.seqNo(), false),
e -> new IndexResult(e, currentVersion, index.seqNo()));
Optional<IndexResult> resultOnVersionConflict;
try {
final boolean isVersionConflict = checkVersionConflict(index, currentVersion, expectedVersion, deleted);
resultOnVersionConflict = isVersionConflict ? Optional.of(new IndexResult(currentVersion, index.seqNo(), false))
: Optional.empty();
} catch (IllegalArgumentException | VersionConflictEngineException ex) {
resultOnVersionConflict = Optional.of(new IndexResult(ex, currentVersion, index.seqNo()));
}

final IndexResult indexResult;
if (resultOnVersionConflict.isPresent()) {
Expand All @@ -702,18 +652,38 @@ private IndexResult innerIndex(Index index) throws IOException {
* primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created.
*/
index.parsedDoc().updateSeqID(seqNo, index.primaryTerm());
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
final long updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
index.parsedDoc().version().setLongValue(updatedVersion);

if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) {
// document does not exists, we can optimize for create, but double check if assertions are running
assert assertDocDoesNotExist(index, canOptimizeAddDocument == false);
index(index.docs(), indexWriter);
} else {
update(index.uid(), index.docs(), indexWriter);
IndexResult innerIndexResult;
try {
if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) {
// document does not exists, we can optimize for create, but double check if assertions are running
assert assertDocDoesNotExist(index, canOptimizeAddDocument == false);
index(index.docs(), indexWriter);
} else {
update(index.uid(), index.docs(), indexWriter);
}
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
innerIndexResult = new IndexResult(updatedVersion, seqNo, deleted);
} catch (Exception ex) {
if (indexWriter.getTragicException() == null) {
/* There is no tragic event recorded so this must be a document failure.
*
* The handling inside IW doesn't guarantee that an tragic / aborting exception
* will be used as THE tragicEventException since if there are multiple exceptions causing an abort in IW
* only one wins. Yet, only the one that wins will also close the IW and in turn fail the engine such that
* we can potentially handle the exception before the engine is failed.
* Bottom line is that we can only rely on the fact that if it's a document failure then
* `indexWriter.getTragicException()` will be null otherwise we have to rethrow and treat it as fatal or rather
* non-document failure
*/
innerIndexResult = new IndexResult(ex, currentVersion, index.seqNo());
} else {
throw ex;
}
}
indexResult = new IndexResult(updatedVersion, seqNo, deleted);
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
assert innerIndexResult != null;
indexResult = innerIndexResult;
}
if (!indexResult.hasFailure()) {
location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
Expand All @@ -729,7 +699,6 @@ private IndexResult innerIndex(Index index) throws IOException {
seqNoService().markSeqNoAsCompleted(seqNo);
}
}

}

private static void index(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
Expand Down Expand Up @@ -769,14 +738,19 @@ private static void update(final Term uid, final List<ParseContext.Document> doc
}

@Override
public DeleteResult delete(Delete delete) {
public DeleteResult delete(Delete delete) throws IOException {
DeleteResult result;
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
// NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments:
result = innerDelete(delete);
} catch (Exception e) {
result = new DeleteResult(checkIfDocumentFailureOrThrow(delete, e), delete.version(), delete.seqNo());
} catch (RuntimeException | IOException e) {
try {
maybeFailEngine("index", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw e;
}
maybePruneDeletedTombstones();
return result;
Expand Down Expand Up @@ -811,15 +785,14 @@ private DeleteResult innerDelete(Delete delete) throws IOException {
}

final long expectedVersion = delete.version();

final Optional<DeleteResult> resultOnVersionConflict =
checkVersionConflict(
delete,
currentVersion,
expectedVersion,
deleted,
() -> new DeleteResult(expectedVersion, delete.seqNo(), true),
e -> new DeleteResult(e, expectedVersion, delete.seqNo()));
Optional<DeleteResult> resultOnVersionConflict;
try {
final boolean isVersionConflict = checkVersionConflict(delete, currentVersion, expectedVersion, deleted);
resultOnVersionConflict = isVersionConflict ? Optional.of(new DeleteResult(expectedVersion, delete.seqNo(), true))
: Optional.empty();
} catch (IllegalArgumentException | VersionConflictEngineException ex) {
resultOnVersionConflict = Optional.of(new DeleteResult(ex, expectedVersion, delete.seqNo()));
}
final DeleteResult deleteResult;
if (resultOnVersionConflict.isPresent()) {
deleteResult = resultOnVersionConflict.get();
Expand Down Expand Up @@ -852,6 +825,7 @@ private DeleteResult innerDelete(Delete delete) throws IOException {
}

private boolean deleteIfFound(Term uid, long currentVersion, boolean deleted, VersionValue versionValue) throws IOException {
assert uid != null : "uid must not be null";
final boolean found;
if (currentVersion == Versions.NOT_FOUND) {
// doc does not exist and no prior deletes
Expand All @@ -861,6 +835,8 @@ private boolean deleteIfFound(Term uid, long currentVersion, boolean deleted, Ve
found = false;
} else {
// we deleted a currently existing document
// any exception that comes from this is a either an ACE or a fatal exception there can't be any document failures coming
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we assert for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is difficult since I need to throw a VirtualMachineError to trigger this and then all our test will go nuts I think we gotta trust this here

// from this.
indexWriter.deleteDocuments(uid);
found = true;
}
Expand Down