Add a merge policy that prunes ID postings for soft-deleted but retained documents (elastic#40741)

* Add a merge policy that prunes soft-deleted postings

This change adds a merge policy that drops all postings for documents that
are marked as deleted. This is usually unnecessary unless soft deletes are used
with a retention policy, since otherwise a merge would remove deleted documents anyway.
Yet, this merge policy prevents extreme cases where a very large number of soft-deleted
documents are retained and degrade search and update performance.
Note that using this merge policy removes all search capabilities for soft-deleted documents.
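To make the mechanism concrete before the diff: during a merge the policy wraps the ID field's TermsEnum so that only terms with at least one live posting survive, and Lucene then simply never writes the pruned terms into the merged segment. Below is a condensed, illustrative sketch of that filtering step (the class name and structure are simplified for illustration and are not the exact code this commit adds; it assumes a non-null live-docs bitset, as the real policy only wraps readers that have deletions):

import java.io.IOException;

import org.apache.lucene.index.FilteredTermsEnum;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;

// Illustrative sketch: surface only the terms that still have a live document.
final class LiveTermsOnlyEnum extends FilteredTermsEnum {
    private final Bits liveDocs; // live-docs of the segment being merged (assumed non-null)
    private PostingsEnum reuse;

    LiveTermsOnlyEnum(TermsEnum in, Bits liveDocs) {
        super(in, false); // false: iterate from the start, no initial seek
        this.liveDocs = liveDocs;
    }

    @Override
    protected AcceptStatus accept(BytesRef term) throws IOException {
        // Keep a term only if at least one of its documents is still live;
        // terms whose postings are all deleted are skipped, so the merge
        // never copies them into the new segment.
        reuse = postings(reuse, PostingsEnum.NONE);
        for (int doc = reuse.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = reuse.nextDoc()) {
            if (liveDocs.get(doc)) {
                return AcceptStatus.YES;
            }
        }
        return AcceptStatus.NO;
    }
}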

* fix checkstyle

* fix assertion

* fix imports

* fix compilation

* add predicate to select fields to prune

* only prune ID field

* beef up test

* roll back retention query

* foo

* remove redundant modifier

* fix assumption about empty Terms

* remove null check

* Add test for the engine to check if we prune the IDs of retained docs away
s1monw authored and alpar-t committed Jun 10, 2019
1 parent 0fe1948 commit 6f4b463
Showing 5 changed files with 507 additions and 4 deletions.
PerThreadIDVersionAndSeqNoLookup.java
@@ -74,7 +74,9 @@ final class PerThreadIDVersionAndSeqNoLookup {
         // If a segment contains only no-ops, it does not have _uid but has both _soft_deletes and _tombstone fields.
         final NumericDocValues softDeletesDV = reader.getNumericDocValues(Lucene.SOFT_DELETES_FIELD);
         final NumericDocValues tombstoneDV = reader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
-        if (softDeletesDV == null || tombstoneDV == null) {
+        // this is a special case when we pruned away all IDs in a segment since all docs are deleted.
+        final boolean allDocsDeleted = (softDeletesDV != null && reader.numDocs() == 0);
+        if ((softDeletesDV == null || tombstoneDV == null) && allDocsDeleted == false) {
             throw new IllegalArgumentException("reader does not have _uid terms but not a no-op segment; " +
                 "_soft_deletes [" + softDeletesDV + "], _tombstone [" + tombstoneDV + "]");
         }
InternalEngine.java
@@ -2176,7 +2176,8 @@ private IndexWriterConfig getIndexWriterConfig() {
         iwc.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD);
         if (softDeleteEnabled) {
             mergePolicy = new RecoverySourcePruneMergePolicy(SourceFieldMapper.RECOVERY_SOURCE_NAME, softDeletesPolicy::getRetentionQuery,
-                new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD, softDeletesPolicy::getRetentionQuery, mergePolicy));
+                new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD, softDeletesPolicy::getRetentionQuery,
+                    new PrunePostingsMergePolicy(mergePolicy, IdFieldMapper.NAME)));
         }
         iwc.setMergePolicy(new ElasticsearchMergePolicy(mergePolicy));
         iwc.setSimilarity(engineConfig.getSimilarity());
PrunePostingsMergePolicy.java (new file)
@@ -0,0 +1,224 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.engine;

import org.apache.lucene.codecs.FieldsProducer;
import org.apache.lucene.index.CodecReader;
import org.apache.lucene.index.FilterCodecReader;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.FilteredTermsEnum;
import org.apache.lucene.index.ImpactsEnum;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.OneMergeWrappingMergePolicy;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;

import java.io.IOException;
import java.util.Iterator;

/**
 * This merge policy drops the postings for the ID field of all deleted documents. This can be
 * useful to guarantee consistent update performance even if a large number of deleted / updated documents
 * are retained. Merging postings away is efficient since Lucene visits postings term by term and,
 * with the original live-docs available, adds only negligible overhead, which lets us
 * prune soft deletes by default. Yet, using this merge policy loses all search capabilities on top of
 * soft-deleted documents, independent of the retention policy. Note that for this merge policy to be effective it needs to be added
 * before the {@link org.apache.lucene.index.SoftDeletesRetentionMergePolicy}, because otherwise only documents that are deleted / removed
 * anyway will be pruned.
 */
final class PrunePostingsMergePolicy extends OneMergeWrappingMergePolicy {

    PrunePostingsMergePolicy(MergePolicy in, String idField) {
        super(in, toWrap -> new OneMerge(toWrap.segments) {
            @Override
            public CodecReader wrapForMerge(CodecReader reader) throws IOException {
                CodecReader wrapped = toWrap.wrapForMerge(reader);
                return wrapReader(wrapped, idField);
            }
        });
    }

    private static CodecReader wrapReader(CodecReader reader, String idField) {
        Bits liveDocs = reader.getLiveDocs();
        if (liveDocs == null) {
            return reader; // no deleted docs - we are good!
        }
        final boolean fullyDeletedSegment = reader.numDocs() == 0;
        return new FilterCodecReader(reader) {

            @Override
            public FieldsProducer getPostingsReader() {
                FieldsProducer postingsReader = super.getPostingsReader();
                if (postingsReader == null) {
                    return null;
                }
                return new FieldsProducer() {
                    @Override
                    public void close() throws IOException {
                        postingsReader.close();
                    }

                    @Override
                    public void checkIntegrity() throws IOException {
                        postingsReader.checkIntegrity();
                    }

                    @Override
                    public Iterator<String> iterator() {
                        return postingsReader.iterator();
                    }

                    @Override
                    public Terms terms(String field) throws IOException {
                        Terms in = postingsReader.terms(field);
                        if (idField.equals(field) && in != null) {
                            return new FilterLeafReader.FilterTerms(in) {
                                @Override
                                public TermsEnum iterator() throws IOException {
                                    TermsEnum iterator = super.iterator();
                                    return new FilteredTermsEnum(iterator, false) {
                                        private PostingsEnum internal;

                                        @Override
                                        protected AcceptStatus accept(BytesRef term) throws IOException {
                                            if (fullyDeletedSegment) {
                                                return AcceptStatus.END; // short-cut this if we don't match anything
                                            }
                                            internal = postings(internal, PostingsEnum.NONE);
                                            if (internal.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
                                                return AcceptStatus.YES;
                                            }
                                            return AcceptStatus.NO;
                                        }

                                        @Override
                                        public PostingsEnum postings(PostingsEnum reuse, int flags) throws IOException {
                                            if (reuse instanceof OnlyLiveDocsPostingsEnum) {
                                                OnlyLiveDocsPostingsEnum reuseInstance = (OnlyLiveDocsPostingsEnum) reuse;
                                                reuseInstance.reset(super.postings(reuseInstance.in, flags));
                                                return reuseInstance;
                                            }
                                            return new OnlyLiveDocsPostingsEnum(super.postings(null, flags), liveDocs);
                                        }

                                        @Override
                                        public ImpactsEnum impacts(int flags) throws IOException {
                                            throw new UnsupportedOperationException();
                                        }
                                    };
                                }
                            };
                        } else {
                            return in;
                        }
                    }

                    @Override
                    public int size() {
                        return postingsReader.size();
                    }

                    @Override
                    public long ramBytesUsed() {
                        return postingsReader.ramBytesUsed();
                    }
                };
            }

            @Override
            public CacheHelper getCoreCacheHelper() {
                return null;
            }

            @Override
            public CacheHelper getReaderCacheHelper() {
                return null;
            }
        };
    }

    private static final class OnlyLiveDocsPostingsEnum extends PostingsEnum {

        private final Bits liveDocs;
        private PostingsEnum in;

        OnlyLiveDocsPostingsEnum(PostingsEnum in, Bits liveDocs) {
            this.liveDocs = liveDocs;
            reset(in);
        }

        void reset(PostingsEnum in) {
            this.in = in;
        }

        @Override
        public int docID() {
            return in.docID();
        }

        @Override
        public int nextDoc() throws IOException {
            int docId;
            do {
                docId = in.nextDoc();
            } while (docId != DocIdSetIterator.NO_MORE_DOCS && liveDocs.get(docId) == false);
            return docId;
        }

        @Override
        public int advance(int target) {
            throw new UnsupportedOperationException();
        }

        @Override
        public long cost() {
            return in.cost();
        }

        @Override
        public int freq() throws IOException {
            return in.freq();
        }

        @Override
        public int nextPosition() throws IOException {
            return in.nextPosition();
        }

        @Override
        public int startOffset() throws IOException {
            return in.startOffset();
        }

        @Override
        public int endOffset() throws IOException {
            return in.endOffset();
        }

        @Override
        public BytesRef getPayload() throws IOException {
            return in.getPayload();
        }
    }
}
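As a usage note, the Javadoc's ordering requirement means the policy only pays off when wrapped inside a SoftDeletesRetentionMergePolicy, as in the engine change above and the test below. A minimal sketch of that wiring (the class and method names here are illustrative, the analyzer choice is arbitrary, "__soft_deletes" is the value of Lucene.SOFT_DELETES_FIELD, and "_id" matches IdFieldMapper.NAME):

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.search.MatchAllDocsQuery;

final class PruningConfigExample {
    // Sketch: PrunePostingsMergePolicy sits *inside* the retention policy;
    // if it sat outside, only documents a merge would drop anyway would be pruned.
    static IndexWriterConfig pruningConfig() {
        IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
        iwc.setSoftDeletesField("__soft_deletes");
        iwc.setMergePolicy(new SoftDeletesRetentionMergePolicy("__soft_deletes",
            MatchAllDocsQuery::new, // retention query: retain all soft-deleted docs (as the test does)
            new PrunePostingsMergePolicy(iwc.getMergePolicy(), "_id")));
        return iwc;
    }
}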
InternalEngineTests.java
@@ -51,6 +51,8 @@
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.Term;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
@@ -119,6 +121,8 @@
import org.elasticsearch.index.mapper.RootObjectMapper;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
+import org.elasticsearch.index.mapper.Uid;
+import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.RetentionLease;
@@ -1458,6 +1462,61 @@ public void testForceMergeWithoutSoftDeletes() throws IOException {
        }
    }

    /*
     * We are testing an edge case here where we have a fully deleted segment that is retained but has all its IDs pruned away.
     */
    public void testLookupVersionWithPrunedAwayIds() throws IOException {
        try (Directory dir = newDirectory()) {
            IndexWriterConfig indexWriterConfig = new IndexWriterConfig(Lucene.STANDARD_ANALYZER);
            indexWriterConfig.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD);
            try (IndexWriter writer = new IndexWriter(dir,
                indexWriterConfig.setMergePolicy(new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD,
                    MatchAllDocsQuery::new, new PrunePostingsMergePolicy(indexWriterConfig.getMergePolicy(), "_id"))))) {
                org.apache.lucene.document.Document doc = new org.apache.lucene.document.Document();
                doc.add(new Field(IdFieldMapper.NAME, "1", IdFieldMapper.Defaults.FIELD_TYPE));
                doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, -1));
                doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, 1));
                doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, 1));
                writer.addDocument(doc);
                writer.flush();
                writer.softUpdateDocument(new Term(IdFieldMapper.NAME, "1"), doc, new NumericDocValuesField(Lucene.SOFT_DELETES_FIELD, 1));
                writer.updateNumericDocValue(new Term(IdFieldMapper.NAME, "1"), Lucene.SOFT_DELETES_FIELD, 1);
                writer.forceMerge(1);
                try (DirectoryReader reader = DirectoryReader.open(writer)) {
                    assertEquals(1, reader.leaves().size());
                    assertNull(VersionsAndSeqNoResolver.loadDocIdAndVersion(reader, new Term(IdFieldMapper.NAME, "1"), false));
                }
            }
        }
    }

    public void testUpdateWithFullyDeletedSegments() throws IOException {
        Settings.Builder settings = Settings.builder()
            .put(defaultSettings.getSettings())
            .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
            .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), Integer.MAX_VALUE);
        final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
        final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
        final Set<String> liveDocs = new HashSet<>();
        try (Store store = createStore();
             InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null,
                 null, globalCheckpoint::get))) {
            int numDocs = scaledRandomIntBetween(10, 100);
            for (int i = 0; i < numDocs; i++) {
                ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null);
                engine.index(indexForDoc(doc));
                liveDocs.add(doc.id());
            }

            // index every document a second time, so each update soft-deletes the previous copy
            for (int i = 0; i < numDocs; i++) {
                ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null);
                engine.index(indexForDoc(doc));
                liveDocs.add(doc.id());
            }
        }
    }

    public void testForceMergeWithSoftDeletesRetention() throws Exception {
        final long retainedExtraOps = randomLongBetween(0, 10);
        Settings.Builder settings = Settings.builder()

@@ -5734,8 +5793,60 @@ public void testGetReaderAttributes() throws IOException {
assertEquals("ON_HEAP", readerAttributes.get("blocktree.terms.fst._id"));
assertEquals("ON_HEAP", readerAttributes.get("blocktree.terms.fst"));
break;
default:
fail("unknownw type");
default:
fail("unknownw type");
}
}
}

    public void testPruneAwayDeletedButRetainedIds() throws Exception {
        IOUtils.close(engine, store);
        Settings settings = Settings.builder()
            .put(defaultSettings.getSettings())
            .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build();
        IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(
            IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build());
        store = createStore(indexSettings, newDirectory());
        LogDocMergePolicy policy = new LogDocMergePolicy();
        policy.setMinMergeDocs(10000);
        try (InternalEngine engine = createEngine(indexSettings, store, createTempDir(), policy)) {
            int numDocs = between(1, 20);
            for (int i = 0; i < numDocs; i++) {
                index(engine, i);
            }
            engine.forceMerge(true);
            engine.delete(new Engine.Delete("_doc", "0", newUid("0"), primaryTerm.get()));
            engine.refresh("test");
            // now we have 2 segments since we added a tombstone plus the old segment with the delete
            try (Searcher searcher = engine.acquireSearcher("test")) {
                IndexReader reader = searcher.reader();
                assertEquals(2, reader.leaves().size());
                LeafReaderContext leafReaderContext = reader.leaves().get(0);
                LeafReader leafReader = leafReaderContext.reader();
                assertEquals("the delete and the tombstone", 1, leafReader.numDeletedDocs());
                assertEquals(numDocs, leafReader.maxDoc());
                Terms id = leafReader.terms("_id");
                assertNotNull(id);
                assertEquals("deleted IDs are NOT YET pruned away", reader.numDocs() + 1, id.size());
                TermsEnum iterator = id.iterator();
                assertTrue(iterator.seekExact(Uid.encodeId("0")));
            }

            // let's force merge the tombstone and the original segment and make sure the doc is still there but the ID term is gone
            engine.forceMerge(true);
            engine.refresh("test");
            try (Searcher searcher = engine.acquireSearcher("test")) {
                IndexReader reader = searcher.reader();
                assertEquals(1, reader.leaves().size());
                LeafReaderContext leafReaderContext = reader.leaves().get(0);
                LeafReader leafReader = leafReaderContext.reader();
                assertEquals("the delete and the tombstone", 2, leafReader.numDeletedDocs());
                assertEquals(numDocs + 1, leafReader.maxDoc());
                Terms id = leafReader.terms("_id");
                assertNotNull(id);
                assertEquals("deleted IDs are pruned away", reader.numDocs(), id.size());
                TermsEnum iterator = id.iterator();
                assertFalse(iterator.seekExact(Uid.encodeId("0")));
            }
        }
    }
