Skip to content

Commit

Permalink
Fail engine if hit document failure on replicas (#43523)
Browse files Browse the repository at this point in the history
An indexing on a replica should never fail after it was successfully
indexed on a primary. Hence, we should fail an engine if we hit any
failure (document level or tragic failure) when processing an indexing
on a replica.

Relates #43228
Closes #40435
  • Loading branch information
dnhatn committed Apr 15, 2020
1 parent 521ba9b commit 8d6d296
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -965,7 +965,11 @@ public IndexResult index(Index index) throws IOException {
}
} catch (RuntimeException | IOException e) {
try {
maybeFailEngine("index", e);
if (e instanceof AlreadyClosedException == false && treatDocumentFailureAsTragicError(index)) {
failEngine("index id[" + index.id() + "] origin[" + index.origin() + "] seq#[" + index.seqNo() + "]", e);
} else {
maybeFailEngine("index id[" + index.id() + "] origin[" + index.origin() + "] seq#[" + index.seqNo() + "]", e);
}
} catch (Exception inner) {
e.addSuppressed(inner);
}
Expand Down Expand Up @@ -1121,7 +1125,8 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
}
return new IndexResult(plan.versionForIndexing, index.primaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
} catch (Exception ex) {
if (indexWriter.getTragicException() == null) {
if (ex instanceof AlreadyClosedException == false &&
indexWriter.getTragicException() == null && treatDocumentFailureAsTragicError(index) == false) {
/* There is no tragic event recorded so this must be a document failure.
*
* The handling inside IW doesn't guarantee that an tragic / aborting exception
Expand All @@ -1142,6 +1147,16 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
}
}

/**
* Whether we should treat any document failure as tragic error.
* If we hit any failure while processing an indexing on a replica, we should treat that error as tragic and fail the engine.
* However, we prefer to fail a request individually (instead of a shard) if we hit a document failure on the primary.
*/
private boolean treatDocumentFailureAsTragicError(Index index) {
// TODO: can we enable this all origins except primary on the leader?
return index.origin() == Operation.Origin.REPLICA;
}

/**
* returns true if the indexing operation may have already be processed by this engine.
* Note that it is OK to rarely return true even if this is not the case. However a `false`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5959,4 +5959,29 @@ public void afterRefresh(boolean didRefresh) {
}
}
}

public void testHandleDocumentFailureOnReplica() throws Exception {
AtomicReference<IOException> addDocException = new AtomicReference<>();
IndexWriterFactory indexWriterFactory = (dir, iwc) -> new IndexWriter(dir, iwc) {
@Override
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
final IOException ex = addDocException.getAndSet(null);
if (ex != null) {
throw ex;
}
return super.addDocument(doc);
}
};
try (Store store = createStore();
InternalEngine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, indexWriterFactory)) {
final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null);
Engine.Index index = new Engine.Index(newUid(doc), doc, randomNonNegativeLong(), primaryTerm.get(),
randomNonNegativeLong(), VersionType.EXTERNAL, REPLICA, System.nanoTime(), -1, false,
UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
addDocException.set(new IOException("simulated"));
expectThrows(IOException.class, () -> engine.index(index));
assertTrue(engine.isClosed.get());
assertNotNull(engine.failedEngine.get());
}
}
}

0 comments on commit 8d6d296

Please sign in to comment.