Skip to content

Commit

Permalink
core: disable auto gen id optimization
Browse files Browse the repository at this point in the history
This pr removes the optimization for auto generated ids.
Previously, when ids were auto generated by elasticsearch then there was no
check to see if a document with same id already existed and instead the new
document was only appended. However, due to lucene improvements this
optimization does not add much value. In addition, under rare circumstances it might
cause duplicate documents:

When an indexing request is retried (due to connect lost, node closed etc),
then a flag 'canHaveDuplicates' is set to true for the indexing request
that is send a second time. This was to make sure that even
when an indexing request for a document with autogenerated id comes in
we do not have to update unless this flag is set and instead only append.

However, it might happen that for a retry or for the replication the
indexing request that has the canHaveDuplicates set to true (the retried request) arrives
at the destination before the original request that does have it set false.
In this case both request add a document and we have a duplicated a document.
This commit adds a workaround: remove the optimization for auto
generated ids and always update the document.
The asumtion is that this will not slow down indexing more than 10 percent,
see: http://benchmarks.elasticsearch.org/

closes elastic#8788
closes elastic#9468
  • Loading branch information
brwe committed Feb 4, 2015
1 parent 4d38cb4 commit 23cd9ec
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 2 deletions.
Expand Up @@ -207,7 +207,7 @@ public InternalEngine(ShardId shardId, @IndexSettings Settings indexSettings, Th
for (int i = 0; i < dirtyLocks.length; i++) {
dirtyLocks[i] = new Object();
}
this.optimizeAutoGenerateId = indexSettings.getAsBoolean("index.optimize_auto_generated_id", true);
this.optimizeAutoGenerateId = indexSettings.getAsBoolean("index.optimize_auto_generated_id", false);

this.indexSettingsService.addListener(applySettings);
this.failEngineOnCorruption = indexSettings.getAsBoolean(INDEX_FAIL_ON_CORRUPTION, true);
Expand Down
Expand Up @@ -33,6 +33,10 @@
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.lucene.index.*;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -50,6 +54,7 @@
import org.elasticsearch.index.engine.*;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
Expand Down Expand Up @@ -1350,4 +1355,88 @@ null, store, createSnapshotDeletionPolicy(), createTranslog(), createMergePolicy
protected Term newUid(String id) {
return new Term("_uid", id);
}

@Test
public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOException {

ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false);
boolean canHaveDuplicates = false;
boolean autoGeneratedId = true;

Engine.Create index = new Engine.Create(null, newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
engine.create(index);
assertThat(index.version(), equalTo(1l));

index = new Engine.Create(null, newUid("1"), doc, index.version(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
replicaEngine.create(index);
assertThat(index.version(), equalTo(1l));

canHaveDuplicates = true;
index = new Engine.Create(null, newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
engine.create(index);
assertThat(index.version(), equalTo(1l));
engine.refresh(new Engine.Refresh("test").force(true));
Engine.Searcher searcher = engine.acquireSearcher("test");
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
assertThat(topDocs.totalHits, equalTo(1));

index = new Engine.Create(null, newUid("1"), doc, index.version(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
try {
replicaEngine.create(index);
fail();
} catch (VersionConflictEngineException e) {
// we ignore version conflicts on replicas, see TransportShardReplicationOperationAction.ignoreReplicaException
}
replicaEngine.refresh(new Engine.Refresh("test").force(true));
Engine.Searcher replicaSearcher = replicaEngine.acquireSearcher("test");
topDocs = replicaSearcher.searcher().search(new MatchAllDocsQuery(), 10);
assertThat(topDocs.totalHits, equalTo(1));
searcher.close();
replicaSearcher.close();
}

@Test
public void testRetryWithAutogeneratedIdsAndWrongOrderWorksAndNoDuplicateDocs() throws IOException {

ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false);
boolean canHaveDuplicates = true;
boolean autoGeneratedId = true;

Engine.Create firstIndexRequest = new Engine.Create(null, newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
engine.create(firstIndexRequest);
assertThat(firstIndexRequest.version(), equalTo(1l));

Engine.Create firstIndexRequestReplica = new Engine.Create(null, newUid("1"), doc, firstIndexRequest.version(), firstIndexRequest.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
replicaEngine.create(firstIndexRequestReplica);
assertThat(firstIndexRequestReplica.version(), equalTo(1l));

canHaveDuplicates = false;
Engine.Create secondIndexRequest = new Engine.Create(null, newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
try {
engine.create(secondIndexRequest);
fail();
} catch (DocumentAlreadyExistsException e) {
// we can ignore the exception. In case this happens because the retry request arrived first then this error will not be sent back anyway.
// in any other case this is an actual error
}
engine.refresh(new Engine.Refresh("test").force(true));
Engine.Searcher searcher = engine.acquireSearcher("test");
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
assertThat(topDocs.totalHits, equalTo(1));

Engine.Create secondIndexRequestReplica = new Engine.Create(null, newUid("1"), doc, firstIndexRequest.version(), firstIndexRequest.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
try {
replicaEngine.create(secondIndexRequestReplica);
fail();
} catch (VersionConflictEngineException e) {
// we ignore version conflicts on replicas, see TransportShardReplicationOperationAction.ignoreReplicaException.
}
replicaEngine.refresh(new Engine.Refresh("test").force(true));
Engine.Searcher replicaSearcher = replicaEngine.acquireSearcher("test");
topDocs = replicaSearcher.searcher().search(new MatchAllDocsQuery(), 10);
assertThat(topDocs.totalHits, equalTo(1));
searcher.close();
replicaSearcher.close();
}

}
Expand Up @@ -67,7 +67,6 @@ protected Settings nodeSettings(int nodeOrdinal) {
* see https://github.com/elasticsearch/elasticsearch/issues/8788
*/
@Test
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elasticsearch/elasticsearch/issues/8788")
public void testRetryDueToExceptionOnNetworkLayer() throws ExecutionException, InterruptedException, IOException {
final AtomicBoolean exceptionThrown = new AtomicBoolean(false);
int numDocs = scaledRandomIntBetween(100, 1000);
Expand Down

0 comments on commit 23cd9ec

Please sign in to comment.