From bd496225c1f9b4ead397423e3f25e34001ee0224 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 20 Jan 2017 15:31:16 +0100 Subject: [PATCH] Fix handling of document failure expcetion in InternalEngine Today we try to be smart and make a generic decision if an exception should be treated as a document failure but in some cases concurrency in the index writer make this decision very difficult since we don't have a consistent state in the case another thread is currently failing the IndexWriter/InternalEngine due to a tragic event. This change simplifies the exception handling and makes specific decisions about document failures rather than using a generic heuristic. This prevent exceptions to be treated as document failures that should have failed the engine but backed out of failing since since some other thread has already taken over the failure procedure but didn't finish yet. --- .../action/bulk/TransportShardBulkAction.java | 7 +- .../elasticsearch/index/engine/Engine.java | 4 +- .../index/engine/InternalEngine.java | 180 ++++++++---------- .../elasticsearch/index/shard/IndexShard.java | 12 +- .../shard/TranslogRecoveryPerformer.java | 6 +- .../index/engine/InternalEngineTests.java | 142 +++++++------- .../index/engine/ShadowEngineTests.java | 4 +- .../ESIndexLevelReplicationTestCase.java | 4 +- .../index/shard/RefreshListenersTests.java | 4 +- .../index/shard/IndexShardTestCase.java | 6 +- 10 files changed, 178 insertions(+), 191 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 2a9ee444941b9..12981664c2a66 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -63,6 +63,7 @@ import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.Map; /** Performs shard-level bulk (index, delete or update) operations */ @@ -424,7 +425,7 @@ private Translog.Location locationToSync(Translog.Location current, Translog.Loc * Execute the given {@link IndexRequest} on a replica shard, throwing a * {@link RetryOnReplicaException} if the operation needs to be re-tried. */ - public static Engine.IndexResult executeIndexRequestOnReplica(IndexRequest request, IndexShard replica) { + public static Engine.IndexResult executeIndexRequestOnReplica(IndexRequest request, IndexShard replica) throws IOException { final ShardId shardId = replica.shardId(); SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, shardId.getIndexName(), request.type(), request.id(), request.source()) .routing(request.routing()).parent(request.parent()); @@ -483,12 +484,12 @@ public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest reque return primary.index(operation); } - public static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) { + public static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) throws IOException { final Engine.Delete delete = primary.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType()); return primary.delete(delete); } - public static Engine.DeleteResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) { + public static Engine.DeleteResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) throws IOException { final Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(), request.getSeqNo(), request.primaryTerm(), request.version(), request.versionType()); return replica.delete(delete); diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 593ec90bead45..57bf65e1a6ed0 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -282,7 +282,7 @@ public Condition newCondition() { * * Note: engine level failures (i.e. persistent engine failures) are thrown */ - public abstract IndexResult index(final Index index); + public abstract IndexResult index(final Index index) throws IOException; /** * Perform document delete operation on the engine @@ -292,7 +292,7 @@ public Condition newCondition() { * * Note: engine level failures (i.e. persistent engine failures) are thrown */ - public abstract DeleteResult delete(final Delete delete); + public abstract DeleteResult delete(final Delete delete) throws IOException; public abstract NoOpResult noOp(final NoOp noOp); diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 37568979779ca..fb041603a38de 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -445,27 +445,18 @@ public GetResult get(Get get, Function searcherFactory) throws } /** - * Checks for version conflicts. If a version conflict exists, the optional return value represents the operation result. Otherwise, if - * no conflicts are found, the optional return value is not present. + * Checks for version conflicts. If a non-critical version conflict exists true is returned. In the case of a critical + * version conflict (if operation origin is primary) a {@link VersionConflictEngineException} is thrown. * - * @param the result type * @param op the operation * @param currentVersion the current version * @param expectedVersion the expected version * @param deleted {@code true} if the current version is not found or represents a delete - * @param onSuccess if there is a version conflict that can be ignored, the result of the operation - * @param onFailure if there is a version conflict that can not be ignored, the result of the operation - * @return if there is a version conflict, the optional value is present and represents the operation result, otherwise the return value - * is not present + * @return true iff a non-critical version conflict (origin recovery or replica) is found otherwise false + * @throws VersionConflictEngineException if a critical version conflict was found where the operation origin is primary + * @throws IllegalArgumentException if an unsupported version type is used. */ - private Optional checkVersionConflict( - final Operation op, - final long currentVersion, - final long expectedVersion, - final boolean deleted, - final Supplier onSuccess, - final Function onFailure) { - final T result; + private boolean checkVersionConflict(final Operation op, final long currentVersion, final long expectedVersion, final boolean deleted) { if (op.versionType() == VersionType.FORCE) { if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { // If index was created in 5.0 or later, 'force' is not allowed at all @@ -479,23 +470,19 @@ private Optional checkVersionConflict( if (op.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) { if (op.origin() == Operation.Origin.PRIMARY) { // fatal version conflict - final VersionConflictEngineException e = - new VersionConflictEngineException( + throw new VersionConflictEngineException( shardId, op.type(), op.id(), op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted)); - result = onFailure.apply(e); + } else { - /* - * Version conflicts during recovery and on replicas are normal due to asynchronous execution; as such, we should return a - * successful result. - */ - result = onSuccess.get(); + /* Version conflicts during recovery and on replicas are normal due to asynchronous execution; as such, we should return a + * successful result.*/ + return true; } - return Optional.of(result); } else { - return Optional.empty(); + return false; } } @@ -510,7 +497,7 @@ private long checkDeletedAndGCed(VersionValue versionValue) { } @Override - public IndexResult index(Index index) { + public IndexResult index(Index index) throws IOException { IndexResult result; try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); @@ -522,52 +509,15 @@ public IndexResult index(Index index) { result = innerIndex(index); } } - } catch (Exception e) { - result = new IndexResult(checkIfDocumentFailureOrThrow(index, e), index.version(), SequenceNumbersService.UNASSIGNED_SEQ_NO); - } - return result; - } - - /** - * Inspects exception thrown when executing index or delete operations - * - * @return failure if the failure is a document specific failure (e.g. analysis chain failure) - * or throws Exception if the failure caused the engine to fail (e.g. out of disk, lucene tragic event) - *

- * Note: pkg-private for testing - */ - final Exception checkIfDocumentFailureOrThrow(final Operation operation, final Exception failure) { - boolean isDocumentFailure; - try { - // When indexing a document into Lucene, Lucene distinguishes between environment related errors - // (like out of disk space) and document specific errors (like analysis chain problems) by setting - // the IndexWriter.getTragicEvent() value for the former. maybeFailEngine checks for these kind of - // errors and returns true if that is the case. We use that to indicate a document level failure - // and set the error in operation.setFailure. In case of environment related errors, the failure - // is bubbled up - isDocumentFailure = maybeFailEngine(operation.operationType().getLowercase(), failure) == false; - if (failure instanceof AlreadyClosedException) { - // ensureOpen throws AlreadyClosedException which is not a document level issue - isDocumentFailure = false; + } catch (RuntimeException | IOException e) { + try { + maybeFailEngine("index", e); + } catch (Exception inner) { + e.addSuppressed(inner); } - } catch (Exception inner) { - // we failed checking whether the failure can fail the engine, treat it as a persistent engine failure - isDocumentFailure = false; - failure.addSuppressed(inner); - } - if (isDocumentFailure) { - return failure; - } else { - // throw original exception in case the exception caused the engine to fail - rethrow(failure); - return null; + throw e; } - } - - // hack to rethrow original exception in case of engine level failures during index/delete operation - @SuppressWarnings("unchecked") - private static void rethrow(Throwable t) throws T { - throw (T) t; + return result; } private boolean canOptimizeAddDocument(Index index) { @@ -610,9 +560,9 @@ private boolean assertSequenceNumber(final Engine.Operation.Origin origin, final } private IndexResult innerIndex(Index index) throws IOException { + // TODO we gotta split this method up it's too big! assert assertSequenceNumber(index.origin(), index.seqNo()); final Translog.Location location; - final long updatedVersion; long seqNo = index.seqNo(); try (Releasable ignored = acquireLock(index.uid())) { lastWriteNanos = index.startTime(); @@ -678,14 +628,14 @@ private IndexResult innerIndex(Index index) throws IOException { } } final long expectedVersion = index.version(); - final Optional resultOnVersionConflict = - checkVersionConflict( - index, - currentVersion, - expectedVersion, - deleted, - () -> new IndexResult(currentVersion, index.seqNo(), false), - e -> new IndexResult(e, currentVersion, index.seqNo())); + Optional resultOnVersionConflict; + try { + final boolean isVersionConflict = checkVersionConflict(index, currentVersion, expectedVersion, deleted); + resultOnVersionConflict = isVersionConflict ? Optional.of(new IndexResult(currentVersion, index.seqNo(), false)) + : Optional.empty(); + } catch (IllegalArgumentException | VersionConflictEngineException ex) { + resultOnVersionConflict = Optional.of(new IndexResult(ex, currentVersion, index.seqNo())); + } final IndexResult indexResult; if (resultOnVersionConflict.isPresent()) { @@ -702,18 +652,38 @@ private IndexResult innerIndex(Index index) throws IOException { * primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created. */ index.parsedDoc().updateSeqID(seqNo, index.primaryTerm()); - updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion); + final long updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion); index.parsedDoc().version().setLongValue(updatedVersion); - - if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) { - // document does not exists, we can optimize for create, but double check if assertions are running - assert assertDocDoesNotExist(index, canOptimizeAddDocument == false); - index(index.docs(), indexWriter); - } else { - update(index.uid(), index.docs(), indexWriter); + IndexResult innerIndexResult; + try { + if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) { + // document does not exists, we can optimize for create, but double check if assertions are running + assert assertDocDoesNotExist(index, canOptimizeAddDocument == false); + index(index.docs(), indexWriter); + } else { + update(index.uid(), index.docs(), indexWriter); + } + versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion)); + innerIndexResult = new IndexResult(updatedVersion, seqNo, deleted); + } catch (Exception ex) { + if (indexWriter.getTragicException() == null) { + /* There is no tragic event recorded so this must be a document failure. + * + * The handling inside IW doesn't guarantee that an tragic / aborting exception + * will be used as THE tragicEventException since if there are multiple exceptions causing an abort in IW + * only one wins. Yet, only the one that wins will also close the IW and in turn fail the engine such that + * we can potentially handle the exception before the engine is failed. + * Bottom line is that we can only rely on the fact that if it's a document failure then + * `indexWriter.getTragicException()` will be null otherwise we have to rethrow and treat it as fatal or rather + * non-document failure + */ + innerIndexResult = new IndexResult(ex, currentVersion, index.seqNo()); + } else { + throw ex; + } } - indexResult = new IndexResult(updatedVersion, seqNo, deleted); - versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion)); + assert innerIndexResult != null; + indexResult = innerIndexResult; } if (!indexResult.hasFailure()) { location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY @@ -729,7 +699,6 @@ private IndexResult innerIndex(Index index) throws IOException { seqNoService().markSeqNoAsCompleted(seqNo); } } - } private static void index(final List docs, final IndexWriter indexWriter) throws IOException { @@ -769,14 +738,19 @@ private static void update(final Term uid, final List doc } @Override - public DeleteResult delete(Delete delete) { + public DeleteResult delete(Delete delete) throws IOException { DeleteResult result; try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); // NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments: result = innerDelete(delete); - } catch (Exception e) { - result = new DeleteResult(checkIfDocumentFailureOrThrow(delete, e), delete.version(), delete.seqNo()); + } catch (RuntimeException | IOException e) { + try { + maybeFailEngine("index", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw e; } maybePruneDeletedTombstones(); return result; @@ -811,15 +785,14 @@ private DeleteResult innerDelete(Delete delete) throws IOException { } final long expectedVersion = delete.version(); - - final Optional resultOnVersionConflict = - checkVersionConflict( - delete, - currentVersion, - expectedVersion, - deleted, - () -> new DeleteResult(expectedVersion, delete.seqNo(), true), - e -> new DeleteResult(e, expectedVersion, delete.seqNo())); + Optional resultOnVersionConflict; + try { + final boolean isVersionConflict = checkVersionConflict(delete, currentVersion, expectedVersion, deleted); + resultOnVersionConflict = isVersionConflict ? Optional.of(new DeleteResult(expectedVersion, delete.seqNo(), true)) + : Optional.empty(); + } catch (IllegalArgumentException | VersionConflictEngineException ex) { + resultOnVersionConflict = Optional.of(new DeleteResult(ex, expectedVersion, delete.seqNo())); + } final DeleteResult deleteResult; if (resultOnVersionConflict.isPresent()) { deleteResult = resultOnVersionConflict.get(); @@ -852,6 +825,7 @@ private DeleteResult innerDelete(Delete delete) throws IOException { } private boolean deleteIfFound(Term uid, long currentVersion, boolean deleted, VersionValue versionValue) throws IOException { + assert uid != null : "uid must not be null"; final boolean found; if (currentVersion == Versions.NOT_FOUND) { // doc does not exist and no prior deletes @@ -861,6 +835,8 @@ private boolean deleteIfFound(Term uid, long currentVersion, boolean deleted, Ve found = false; } else { // we deleted a currently existing document + // any exception that comes from this is a either an ACE or a fatal exception there can't be any document failures coming + // from this. indexWriter.deleteDocuments(uid); found = true; } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index cb486de610b6e..a7094c240f4b6 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -543,13 +543,13 @@ static Engine.Index prepareIndex(DocumentMapperForType docMapper, SourceToParse return new Engine.Index(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry); } - public Engine.IndexResult index(Engine.Index index) { + public Engine.IndexResult index(Engine.Index index) throws IOException { ensureWriteAllowed(index); Engine engine = getEngine(); return index(engine, index); } - private Engine.IndexResult index(Engine engine, Engine.Index index) { + private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOException { active.set(true); final Engine.IndexResult result; index = indexingOperationListeners.preIndex(shardId, index); @@ -592,13 +592,13 @@ static Engine.Delete prepareDelete(String type, String id, Term uid, long seqNo, return new Engine.Delete(type, id, uid, seqNo, primaryTerm, version, versionType, origin, startTime); } - public Engine.DeleteResult delete(Engine.Delete delete) { + public Engine.DeleteResult delete(Engine.Delete delete) throws IOException { ensureWriteAllowed(delete); Engine engine = getEngine(); return delete(engine, delete); } - private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) { + private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) throws IOException { active.set(true); final Engine.DeleteResult result; delete = indexingOperationListeners.preDelete(shardId, delete); @@ -1922,12 +1922,12 @@ public int recoveryFromSnapshot(Engine engine, Translog.Snapshot snapshot) throw } @Override - protected void index(Engine engine, Engine.Index engineIndex) { + protected void index(Engine engine, Engine.Index engineIndex) throws IOException { IndexShard.this.index(engine, engineIndex); } @Override - protected void delete(Engine engine, Engine.Delete engineDelete) { + protected void delete(Engine engine, Engine.Delete engineDelete) throws IOException { IndexShard.this.delete(engine, engineDelete); } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java index b5c23d9dc467d..6f4ae66a12a03 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java +++ b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java @@ -149,7 +149,7 @@ private void maybeAddMappingUpdate(String type, Mapping update, String docId, bo * cause a {@link MapperException} to be thrown if an update * is encountered. */ - private void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates, Engine.Operation.Origin origin) { + private void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates, Engine.Operation.Origin origin) throws IOException { try { switch (operation.opType()) { @@ -207,11 +207,11 @@ private void performRecoveryOperation(Engine engine, Translog.Operation operatio operationProcessed(); } - protected void index(Engine engine, Engine.Index engineIndex) { + protected void index(Engine engine, Engine.Index engineIndex) throws IOException { engine.index(engineIndex); } - protected void delete(Engine engine, Engine.Delete engineDelete) { + protected void delete(Engine engine, Engine.Delete engineDelete) throws IOException { engine.delete(engineDelete); } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index ef05d8f27cac9..97638b53e372b 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -32,6 +32,7 @@ import org.apache.lucene.document.Field; import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.StoredField; import org.apache.lucene.document.TextField; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.DirectoryReader; @@ -63,6 +64,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.FixedBitSet; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.TestUtil; @@ -125,6 +127,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.OldIndexUtils; +import org.elasticsearch.test.rest.yaml.section.Assertion; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.MatcherAssert; @@ -133,6 +136,8 @@ import java.io.IOException; import java.io.InputStream; +import java.io.Reader; +import java.io.UncheckedIOException; import java.nio.charset.Charset; import java.nio.file.DirectoryStream; import java.nio.file.Files; @@ -1144,7 +1149,7 @@ public void testSyncedFlushVanishesOnReplay() throws IOException { assertNull("Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID)); } - public void testVersioningNewCreate() { + public void testVersioningNewCreate() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index create = new Engine.Index(newUid(doc), doc, Versions.MATCH_DELETED); Engine.IndexResult indexResult = engine.index(create); @@ -1155,7 +1160,7 @@ public void testVersioningNewCreate() { assertThat(indexResult.getVersion(), equalTo(1L)); } - public void testVersioningNewIndex() { + public void testVersioningNewIndex() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = engine.index(index); @@ -1166,7 +1171,7 @@ public void testVersioningNewIndex() { assertThat(indexResult.getVersion(), equalTo(1L)); } - public void testExternalVersioningNewIndex() { + public void testExternalVersioningNewIndex() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 12, VersionType.EXTERNAL, PRIMARY, 0, -1, false); Engine.IndexResult indexResult = engine.index(index); @@ -1177,7 +1182,7 @@ public void testExternalVersioningNewIndex() { assertThat(indexResult.getVersion(), equalTo(12L)); } - public void testVersioningIndexConflict() { + public void testVersioningIndexConflict() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = engine.index(index); @@ -1199,7 +1204,7 @@ public void testVersioningIndexConflict() { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } - public void testExternalVersioningIndexConflict() { + public void testExternalVersioningIndexConflict() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 12, VersionType.EXTERNAL, PRIMARY, 0, -1, false); Engine.IndexResult indexResult = engine.index(index); @@ -1242,7 +1247,7 @@ public void testForceVersioningNotAllowedExceptForOlderIndices() throws Exceptio } } - public void testVersioningIndexConflictWithFlush() { + public void testVersioningIndexConflictWithFlush() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = engine.index(index); @@ -1266,7 +1271,7 @@ public void testVersioningIndexConflictWithFlush() { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } - public void testExternalVersioningIndexConflictWithFlush() { + public void testExternalVersioningIndexConflictWithFlush() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 12, VersionType.EXTERNAL, PRIMARY, 0, -1, false); Engine.IndexResult indexResult = engine.index(index); @@ -1361,6 +1366,8 @@ public void run() { } } catch (AlreadyClosedException ex) { // fine + } catch (IOException e) { + throw new AssertionError(e); } } }; @@ -1379,7 +1386,7 @@ public void run() { } - public void testVersioningDeleteConflict() { + public void testVersioningDeleteConflict() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = engine.index(index); @@ -1412,7 +1419,7 @@ public void testVersioningDeleteConflict() { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } - public void testVersioningDeleteConflictWithFlush() { + public void testVersioningDeleteConflictWithFlush() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = engine.index(index); @@ -1451,7 +1458,7 @@ public void testVersioningDeleteConflictWithFlush() { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } - public void testVersioningCreateExistsException() { + public void testVersioningCreateExistsException() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index create = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false); Engine.IndexResult indexResult = engine.index(create); @@ -1463,7 +1470,7 @@ public void testVersioningCreateExistsException() { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } - public void testVersioningCreateExistsExceptionWithFlush() { + public void testVersioningCreateExistsExceptionWithFlush() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index create = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false); Engine.IndexResult indexResult = engine.index(create); @@ -1477,7 +1484,7 @@ public void testVersioningCreateExistsExceptionWithFlush() { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } - public void testVersioningReplicaConflict1() { + public void testVersioningReplicaConflict1() throws IOException { final ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); final Engine.Index v1Index = indexForDoc(doc); final Engine.IndexResult v1Result = engine.index(v1Index); @@ -1526,7 +1533,7 @@ public void testVersioningReplicaConflict1() { assertThat(replicaV2ReplayResult.getVersion(), equalTo(2L)); } - public void testVersioningReplicaConflict2() { + public void testVersioningReplicaConflict2() throws IOException { final ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); final Engine.Index v1Index = indexForDoc(doc); final Engine.IndexResult v1Result = engine.index(v1Index); @@ -1595,7 +1602,7 @@ public void testVersioningReplicaConflict2() { assertThat(replicaV2Result.getVersion(), equalTo(3L)); } - public void testBasicCreatedFlag() { + public void testBasicCreatedFlag() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = engine.index(index); @@ -1612,7 +1619,7 @@ public void testBasicCreatedFlag() { assertTrue(indexResult.isCreated()); } - public void testCreatedFlagAfterFlush() { + public void testCreatedFlagAfterFlush() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = engine.index(index); @@ -1654,7 +1661,7 @@ public void append(LogEvent event) { // #5891: make sure IndexWriter's infoStream output is // sent to lucene.iw with log level TRACE: - public void testIndexWriterInfoStream() throws IllegalAccessException { + public void testIndexWriterInfoStream() throws IllegalAccessException, IOException { assumeFalse("who tests the tester?", VERBOSE); MockAppender mockAppender = new MockAppender("testIndexWriterInfoStream"); mockAppender.start(); @@ -1915,7 +1922,7 @@ private static FixedBitSet getSeqNosSet(final IndexReader reader, final long hig } // #8603: make sure we can separately log IFD's messages - public void testIndexWriterIFDInfoStream() throws IllegalAccessException { + public void testIndexWriterIFDInfoStream() throws IllegalAccessException, IOException { assumeFalse("who tests the tester?", VERBOSE); MockAppender mockAppender = new MockAppender("testIndexWriterIFDInfoStream"); mockAppender.start(); @@ -2532,20 +2539,8 @@ public void testCurrentTranslogIDisCommitted() throws IOException { } } - public void testCheckDocumentFailure() throws Exception { - ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null); - Exception documentFailure = engine.checkIfDocumentFailureOrThrow(indexForDoc(doc), new IOException("simulated document failure")); - assertThat(documentFailure, instanceOf(IOException.class)); - try { - engine.checkIfDocumentFailureOrThrow(indexForDoc(doc), new CorruptIndexException("simulated environment failure", "")); - fail("expected exception to be thrown"); - } catch (Exception envirnomentException) { - assertThat(envirnomentException.getMessage(), containsString("simulated environment failure")); - } - } - private static class ThrowingIndexWriter extends IndexWriter { - private AtomicReference failureToThrow = new AtomicReference<>(); + private AtomicReference> failureToThrow = new AtomicReference<>(); public ThrowingIndexWriter(Directory d, IndexWriterConfig conf) throws IOException { super(d, conf); @@ -2558,13 +2553,15 @@ public long addDocument(Iterable doc) throws IOExcepti } private void maybeThrowFailure() throws IOException { - Exception failure = failureToThrow.get(); - if (failure instanceof RuntimeException) { - throw (RuntimeException)failure; - } else if (failure instanceof IOException) { - throw (IOException)failure; - } else { - assert failure == null : "unsupported failure class: " + failure.getClass().getCanonicalName(); + if (failureToThrow.get() != null) { + Exception failure = failureToThrow.get().get(); + if (failure instanceof RuntimeException) { + throw (RuntimeException) failure; + } else if (failure instanceof IOException) { + throw (IOException) failure; + } else { + assert false: "unsupported failure class: " + failure.getClass().getCanonicalName(); + } } } @@ -2574,14 +2571,8 @@ public long deleteDocuments(Term... terms) throws IOException { return super.deleteDocuments(terms); } - public void setThrowFailure(IOException documentFailure) { - Objects.requireNonNull(documentFailure); - failureToThrow.set(documentFailure); - } - - public void setThrowFailure(RuntimeException runtimeFailure) { - Objects.requireNonNull(runtimeFailure); - failureToThrow.set(runtimeFailure); + public void setThrowFailure(Supplier failureSupplier) { + failureToThrow.set(failureSupplier); } public void clearFailure() { @@ -2594,13 +2585,14 @@ public void testHandleDocumentFailure() throws Exception { final ParsedDocument doc1 = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null); final ParsedDocument doc2 = testParsedDocument("2", "test", null, testDocumentWithTextField(), B_1, null); final ParsedDocument doc3 = testParsedDocument("3", "test", null, testDocumentWithTextField(), B_1, null); + ThrowingIndexWriter throwingIndexWriter = new ThrowingIndexWriter(store.directory(), new IndexWriterConfig()); try (Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, () -> throwingIndexWriter)) { // test document failure while indexing if (randomBoolean()) { - throwingIndexWriter.setThrowFailure(new IOException("simulated")); + throwingIndexWriter.setThrowFailure(() -> new IOException("simulated")); } else { - throwingIndexWriter.setThrowFailure(new IllegalArgumentException("simulated max token length")); + throwingIndexWriter.setThrowFailure(() -> new IllegalArgumentException("simulated max token length")); } Engine.IndexResult indexResult = engine.index(indexForDoc(doc1)); assertNotNull(indexResult.getFailure()); @@ -2610,29 +2602,33 @@ public void testHandleDocumentFailure() throws Exception { assertNull(indexResult.getFailure()); engine.index(indexForDoc(doc2)); - // test document failure while deleting + // test failure while deleting + // all these simulated exceptions are not fatal to the IW so we treat them as document failures if (randomBoolean()) { - throwingIndexWriter.setThrowFailure(new IOException("simulated")); + throwingIndexWriter.setThrowFailure(() -> new IOException("simulated")); + expectThrows(IOException.class, () -> engine.delete(new Engine.Delete("test", "1", newUid(doc1)))); } else { - throwingIndexWriter.setThrowFailure(new IllegalArgumentException("simulated max token length")); + throwingIndexWriter.setThrowFailure(() -> new IllegalArgumentException("simulated max token length")); + expectThrows(IllegalArgumentException.class, () -> engine.delete(new Engine.Delete("test", "1", newUid(doc1)))); } - Engine.DeleteResult deleteResult = engine.delete(new Engine.Delete("test", "1", newUid(doc1))); - assertNotNull(deleteResult.getFailure()); // test non document level failure is thrown if (randomBoolean()) { // simulate close by corruption - throwingIndexWriter.setThrowFailure(new CorruptIndexException("simulated", "hello")); - try { - if (randomBoolean()) { - engine.index(indexForDoc(doc3)); - } else { - engine.delete(new Engine.Delete("test", "2", newUid(doc2))); - } - fail("corruption should throw exceptions"); - } catch (Exception e) { - assertThat(e, instanceOf(CorruptIndexException.class)); - } + throwingIndexWriter.setThrowFailure(null); + UncheckedIOException uncheckedIOException = expectThrows(UncheckedIOException.class, () -> { + Engine.Index index = indexForDoc(doc3); + index.parsedDoc().rootDoc().add(new StoredField("foo", "bar") { + // this is a hack to add a failure during store document which triggers a tragic event + // and in turn fails the engine + @Override + public BytesRef binaryValue() { + throw new UncheckedIOException(new MockDirectoryWrapper.FakeIOException()); + } + }); + engine.index(index); + }); + assertTrue(uncheckedIOException.getCause() instanceof MockDirectoryWrapper.FakeIOException); } else { // normal close engine.close(); @@ -2807,7 +2803,11 @@ public void testRetryConcurrently() throws InterruptedException, IOException { } int docOffset; while ((docOffset = offset.incrementAndGet()) < docs.size()) { - engine.index(docs.get(docOffset)); + try { + engine.index(docs.get(docOffset)); + } catch (IOException e) { + throw new AssertionError(e); + } } }); thread[i].start(); @@ -2867,7 +2867,11 @@ public void run() { } int docOffset; while ((docOffset = offset.incrementAndGet()) < docs.size()) { - engine.index(docs.get(docOffset)); + try { + engine.index(docs.get(docOffset)); + } catch (IOException e) { + throw new AssertionError(e); + } } } }; @@ -3055,7 +3059,13 @@ public long generateSeqNo() { final ParsedDocument doc = testParsedDocument(id, "test", null, testDocumentWithTextField(), SOURCE, null); skip.set(randomBoolean()); - final Thread thread = new Thread(() -> finalInitialEngine.index(indexForDoc(doc))); + final Thread thread = new Thread(() -> { + try { + finalInitialEngine.index(indexForDoc(doc)); + } catch (IOException e) { + throw new AssertionError(e); + } + }); thread.start(); if (skip.get()) { threads.add(thread); diff --git a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java index cc92d9bd9c22c..219af4af4daa9 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java @@ -267,7 +267,7 @@ private Engine.Index indexForDoc(ParsedDocument doc) { protected static final BytesReference B_2 = new BytesArray(new byte[]{2}); protected static final BytesReference B_3 = new BytesArray(new byte[]{3}); - public void testCommitStats() { + public void testCommitStats() throws IOException { // create a doc and refresh ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null); primaryEngine.index(indexForDoc(doc)); @@ -846,7 +846,7 @@ public void testSearchResultRelease() throws Exception { searchResult.close(); } - public void testFailEngineOnCorruption() { + public void testFailEngineOnCorruption() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null); primaryEngine.index(indexForDoc(doc)); primaryEngine.flush(); diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 3216d30bdaa8f..042eb85cf36ec 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -329,7 +329,7 @@ protected Set getInSyncAllocationIds(ShardId shardId, ClusterState clust protected abstract PrimaryResult performOnPrimary(IndexShard primary, Request request) throws Exception; - protected abstract void performOnReplica(ReplicaRequest request, IndexShard replica); + protected abstract void performOnReplica(ReplicaRequest request, IndexShard replica) throws IOException; class PrimaryRef implements ReplicationOperation.Primary { @@ -449,7 +449,7 @@ protected PrimaryResult performOnPrimary(IndexShard primary, IndexRequest reques } @Override - protected void performOnReplica(IndexRequest request, IndexShard replica) { + protected void performOnReplica(IndexRequest request, IndexShard replica) throws IOException { final Engine.IndexResult result = executeIndexRequestOnReplica(request, replica); TransportWriteActionTestHelper.performPostWriteActions(replica, request, result.getTranslogLocation(), logger); } diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index e95d7ace10b13..d5c10dddc3653 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -320,11 +320,11 @@ public void testLotsOfThreads() throws Exception { refresher.cancel(); } - private Engine.IndexResult index(String id) { + private Engine.IndexResult index(String id) throws IOException { return index(id, "test"); } - private Engine.IndexResult index(String id, String testFieldValue) { + private Engine.IndexResult index(String id, String testFieldValue) throws IOException { String type = "test"; String uid = type + ":" + id; Document document = new Document(); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 605b9026c2636..650e0794efa7b 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -449,11 +449,11 @@ protected void assertDocs(IndexShard shard, Uid... uids) throws IOException { } - protected Engine.Index indexDoc(IndexShard shard, String type, String id) { + protected Engine.Index indexDoc(IndexShard shard, String type, String id) throws IOException { return indexDoc(shard, type, id, "{}"); } - protected Engine.Index indexDoc(IndexShard shard, String type, String id, String source) { + protected Engine.Index indexDoc(IndexShard shard, String type, String id, String source) throws IOException { final Engine.Index index; if (shard.routingEntry().primary()) { index = shard.prepareIndexOnPrimary( @@ -471,7 +471,7 @@ protected Engine.Index indexDoc(IndexShard shard, String type, String id, String return index; } - protected Engine.Delete deleteDoc(IndexShard shard, String type, String id) { + protected Engine.Delete deleteDoc(IndexShard shard, String type, String id) throws IOException { final Engine.Delete delete; if (shard.routingEntry().primary()) { delete = shard.prepareDeleteOnPrimary(type, id, Versions.MATCH_ANY, VersionType.INTERNAL);