Skip to content
Permalink
Browse files

Always fail engine if delete operation fails (#40117)

Unlike index operations which can fail at the document level to
analyzing errors, delete operations should never fail at the document
level whether soft-deletes is enabled or not. With this change, we will
always fail the engine if we fail to apply a delete operation to Lucene.

Closes #33256
  • Loading branch information...
dnhatn committed Mar 19, 2019
1 parent a87b139 commit a520cc53dc2be904794299c9b00270d9ac9e80d9
@@ -1256,26 +1256,16 @@ public DeleteResult delete(Delete delete) throws IOException {
plan.versionOfDeletion, getPrimaryTerm(), delete.seqNo(), plan.currentlyDeleted == false);
}
}
if (delete.origin().isFromTranslog() == false) {
final Translog.Location location;
if (deleteResult.getResultType() == Result.Type.SUCCESS) {
location = translog.add(new Translog.Delete(delete, deleteResult));
} else if (deleteResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
// if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no
final NoOp noOp = new NoOp(deleteResult.getSeqNo(), delete.primaryTerm(), delete.origin(),
delete.startTime(), deleteResult.getFailure().toString());
location = innerNoOp(noOp).getTranslogLocation();
} else {
location = null;
}
if (delete.origin().isFromTranslog() == false && deleteResult.getResultType() == Result.Type.SUCCESS) {
final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult));
deleteResult.setTranslogLocation(location);
}
localCheckpointTracker.markSeqNoAsCompleted(deleteResult.getSeqNo());
deleteResult.setTook(System.nanoTime() - delete.startTime());
deleteResult.freeze();
} catch (RuntimeException | IOException e) {
try {
maybeFailEngine("index", e);
maybeFailEngine("delete", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
@@ -1395,12 +1385,9 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) throws
plan.versionOfDeletion, getPrimaryTerm(), delete.seqNo(), plan.currentlyDeleted == false);
} catch (Exception ex) {
if (indexWriter.getTragicException() == null) {
// there is no tragic event and such it must be a document level failure
return new DeleteResult(
ex, plan.versionOfDeletion, delete.primaryTerm(), delete.seqNo(), plan.currentlyDeleted == false);
} else {
throw ex;
throw new AssertionError("delete operation should never fail at document level", ex);
}
throw ex;
}
}

@@ -3259,22 +3259,6 @@ public void testHandleDocumentFailure() throws Exception {
assertNotNull(indexResult.getTranslogLocation());
engine.index(indexForDoc(doc2));

// test failure while deleting
// all these simulated exceptions are not fatal to the IW so we treat them as document failures
final Engine.DeleteResult deleteResult;
if (randomBoolean()) {
throwingIndexWriter.get().setThrowFailure(() -> new IOException("simulated"));
deleteResult = engine.delete(new Engine.Delete("test", "1", newUid(doc1), primaryTerm.get()));
assertThat(deleteResult.getFailure(), instanceOf(IOException.class));
} else {
throwingIndexWriter.get().setThrowFailure(() -> new IllegalArgumentException("simulated max token length"));
deleteResult = engine.delete(new Engine.Delete("test", "1", newUid(doc1), primaryTerm.get()));
assertThat(deleteResult.getFailure(),
instanceOf(IllegalArgumentException.class));
}
assertThat(deleteResult.getVersion(), equalTo(2L));
assertThat(deleteResult.getSeqNo(), equalTo(3L));

// test non document level failure is thrown
if (randomBoolean()) {
// simulate close by corruption
@@ -3308,6 +3292,40 @@ public BytesRef binaryValue() {
}
}

public void testDeleteWithFatalError() throws Exception {
final IllegalStateException tragicException = new IllegalStateException("fail to store tombstone");
try (Store store = createStore()) {
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier = new EngineConfig.TombstoneDocSupplier() {
@Override
public ParsedDocument newDeleteTombstoneDoc(String type, String id) {
ParsedDocument parsedDocument = tombstoneDocSupplier().newDeleteTombstoneDoc(type, id);
parsedDocument.rootDoc().add(new StoredField("foo", "bar") {
// this is a hack to add a failure during store document which triggers a tragic event
// and in turn fails the engine
@Override
public BytesRef binaryValue() {
throw tragicException;
}
});
return parsedDocument;
}

@Override
public ParsedDocument newNoopTombstoneDoc(String reason) {
return tombstoneDocSupplier().newNoopTombstoneDoc(reason);
}
};
try (InternalEngine engine = createEngine(null, null, null, config(this.engine.config(), store, tombstoneDocSupplier))) {
final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null);
engine.index(indexForDoc(doc));
expectThrows(IllegalStateException.class,
() -> engine.delete(new Engine.Delete("test", "1", newUid("1"), primaryTerm.get())));
assertTrue(engine.isClosed.get());
assertSame(tragicException, engine.failedEngine.get());
}
}
}

public void testDoubleDeliveryPrimary() throws IOException {
final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(),
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
@@ -18,7 +18,6 @@
*/
package org.elasticsearch.index.replication;

import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
@@ -58,7 +57,6 @@
import org.hamcrest.Matcher;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -418,10 +416,8 @@ public void testReplicaOperationWithConcurrentPrimaryPromotion() throws Exceptio
*/
public void testDocumentFailureReplication() throws Exception {
final IOException indexException = new IOException("simulated indexing failure");
final IOException deleteException = new IOException("simulated deleting failure");
final EngineFactory engineFactory = config -> InternalEngineTests.createInternalEngine((dir, iwc) ->
new IndexWriter(dir, iwc) {
final AtomicBoolean throwAfterIndexedOneDoc = new AtomicBoolean(); // need one document to trigger delete in IW.
@Override
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
boolean isTombstone = false;
@@ -430,20 +426,12 @@ public long addDocument(Iterable<? extends IndexableField> doc) throws IOExcepti
isTombstone = true;
}
}
if (isTombstone == false && throwAfterIndexedOneDoc.getAndSet(true)) {
throw indexException;
if (isTombstone) {
return super.addDocument(doc); // allow to add Noop
} else {
return super.addDocument(doc);
throw indexException;
}
}
@Override
public long deleteDocuments(Term... terms) throws IOException {
throw deleteException;
}
@Override
public long softUpdateDocument(Term term, Iterable<? extends IndexableField> doc, Field...fields) throws IOException {
throw deleteException; // a delete uses softUpdateDocument API if soft-deletes enabled
}
}, null, null, config);
try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(0)) {
@Override
@@ -454,20 +442,13 @@ public long softUpdateDocument(Term term, Iterable<? extends IndexableField> doc
long primaryTerm = shards.getPrimary().getPendingPrimaryTerm();
List<Translog.Operation> expectedTranslogOps = new ArrayList<>();
BulkItemResponse indexResp = shards.index(new IndexRequest(index.getName(), "type", "1").source("{}", XContentType.JSON));
assertThat(indexResp.isFailed(), equalTo(false));
expectedTranslogOps.add(new Translog.Index("type", "1", 0, primaryTerm, 1, "{}".getBytes(StandardCharsets.UTF_8), null, -1));
assertThat(indexResp.isFailed(), equalTo(true));
assertThat(indexResp.getFailure().getCause(), equalTo(indexException));
expectedTranslogOps.add(new Translog.NoOp(0, primaryTerm, indexException.toString()));
try (Translog.Snapshot snapshot = getTranslog(shards.getPrimary()).newSnapshot()) {
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
}

indexResp = shards.index(new IndexRequest(index.getName(), "type", "any").source("{}", XContentType.JSON));
assertThat(indexResp.getFailure().getCause(), equalTo(indexException));
expectedTranslogOps.add(new Translog.NoOp(1, primaryTerm, indexException.toString()));

BulkItemResponse deleteResp = shards.delete(new DeleteRequest(index.getName(), "type", "1"));
assertThat(deleteResp.getFailure().getCause(), equalTo(deleteException));
expectedTranslogOps.add(new Translog.NoOp(2, primaryTerm, deleteException.toString()));
shards.assertAllEqual(1);
shards.assertAllEqual(0);

int nReplica = randomIntBetween(1, 3);
for (int i = 0; i < nReplica; i++) {
@@ -482,14 +463,10 @@ public long softUpdateDocument(Term term, Iterable<? extends IndexableField> doc
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
}
}
// unlike previous failures, these two failures replicated directly from the replication channel.
// the failure replicated directly from the replication channel.
indexResp = shards.index(new IndexRequest(index.getName(), "type", "any").source("{}", XContentType.JSON));
assertThat(indexResp.getFailure().getCause(), equalTo(indexException));
expectedTranslogOps.add(new Translog.NoOp(3, primaryTerm, indexException.toString()));

deleteResp = shards.delete(new DeleteRequest(index.getName(), "type", "1"));
assertThat(deleteResp.getFailure().getCause(), equalTo(deleteException));
expectedTranslogOps.add(new Translog.NoOp(4, primaryTerm, deleteException.toString()));
expectedTranslogOps.add(new Translog.NoOp(1, primaryTerm, indexException.toString()));

for (IndexShard shard : shards) {
try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshot()) {
@@ -499,7 +476,7 @@ public long softUpdateDocument(Term term, Iterable<? extends IndexableField> doc
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
}
}
shards.assertAllEqual(1);
shards.assertAllEqual(0);
}
}

@@ -500,10 +500,10 @@ protected InternalEngine createEngine(EngineConfig config) throws IOException {
return createEngine(null, null, null, config);
}

private InternalEngine createEngine(@Nullable IndexWriterFactory indexWriterFactory,
@Nullable BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier,
@Nullable ToLongBiFunction<Engine, Engine.Operation> seqNoForOperation,
EngineConfig config) throws IOException {
protected InternalEngine createEngine(@Nullable IndexWriterFactory indexWriterFactory,
@Nullable BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier,
@Nullable ToLongBiFunction<Engine, Engine.Operation> seqNoForOperation,
EngineConfig config) throws IOException {
final Store store = config.getStore();
final Directory directory = store.directory();
if (Lucene.indexExists(directory) == false) {
@@ -697,6 +697,19 @@ public EngineConfig config(
tombstoneDocSupplier());
}

protected EngineConfig config(EngineConfig config, Store store, EngineConfig.TombstoneDocSupplier tombstoneDocSupplier) {
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test",
Settings.builder().put(config.getIndexSettings().getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build());
return new EngineConfig(config.getShardId(), config.getAllocationId(), config.getThreadPool(),
indexSettings, config.getWarmer(), store, config.getMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(),
config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(),
config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(),
config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(), tombstoneDocSupplier);
}

protected EngineConfig noOpConfig(IndexSettings indexSettings, Store store, Path translogPath) {
return noOpConfig(indexSettings, store, translogPath, null);
}

0 comments on commit a520cc5

Please sign in to comment.
You can’t perform that action at this time.