Add a merge policy that prunes ID postings for soft-deleted but retained documents #40741

Merged
merged 17 commits on Jun 6, 2019
@@ -74,7 +74,9 @@ final class PerThreadIDVersionAndSeqNoLookup {
// If a segment contains only no-ops, it does not have _uid but has both _soft_deletes and _tombstone fields.
final NumericDocValues softDeletesDV = reader.getNumericDocValues(Lucene.SOFT_DELETES_FIELD);
final NumericDocValues tombstoneDV = reader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
if (softDeletesDV == null || tombstoneDV == null) {
// this is a special case when we pruned away all IDs in a segment since all docs are deleted.
final boolean allDocsDeleted = (softDeletesDV != null && reader.numDocs() == 0);
if ((softDeletesDV == null || tombstoneDV == null) && allDocsDeleted == false) {
throw new IllegalArgumentException("reader does not have _uid terms but not a no-op segment; " +
"_soft_deletes [" + softDeletesDV + "], _tombstone [" + tombstoneDV + "]");
}
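For context, a minimal hypothetical restatement (illustrative names, not part of the PR) of the acceptance rule the guard above now encodes: a segment without _uid terms is acceptable either when it is a no-op-only segment, or when every document in it is deleted, in which case its ID postings may have been pruned away.

final class SegmentIdTermsRuleSketch {
    // mirrors the condition in PerThreadIDVersionAndSeqNoLookup: throw unless one of these holds
    static boolean isAcceptableWithoutIdTerms(boolean hasSoftDeletesDV, boolean hasTombstoneDV, int numDocs) {
        // a no-op-only segment carries both the _soft_deletes and the _tombstone doc-values fields
        boolean noOpOnlySegment = hasSoftDeletesDV && hasTombstoneDV;
        // a fully deleted segment may have had all of its _id postings pruned by the new merge policy
        boolean allDocsDeleted = hasSoftDeletesDV && numDocs == 0;
        return noOpOnlySegment || allDocsDeleted;
    }
}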
@@ -2176,7 +2176,8 @@ private IndexWriterConfig getIndexWriterConfig() {
iwc.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD);
if (softDeleteEnabled) {
mergePolicy = new RecoverySourcePruneMergePolicy(SourceFieldMapper.RECOVERY_SOURCE_NAME, softDeletesPolicy::getRetentionQuery,
new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD, softDeletesPolicy::getRetentionQuery, mergePolicy));
new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD, softDeletesPolicy::getRetentionQuery,
new PrunePostingsMergePolicy(mergePolicy, IdFieldMapper.NAME)));
}
iwc.setMergePolicy(new ElasticsearchMergePolicy(mergePolicy));
iwc.setSimilarity(engineConfig.getSimilarity());
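For reference, a standalone sketch of the same nesting on a bare IndexWriterConfig (field names, the MatchAllDocsQuery retention query, and the surrounding class/method are placeholders; it assumes access to the package-private PrunePostingsMergePolicy, e.g. from a test in the same package). The engine additionally wraps this in RecoverySourcePruneMergePolicy and ElasticsearchMergePolicy as shown above; the essential point is that the ID-pruning policy sits inside the SoftDeletesRetentionMergePolicy, so it still sees the original live docs before the retention policy keeps the retained soft-deleted documents alive for the merge.

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.MatchAllDocsQuery;

final class PruningConfigSketch {
    static IndexWriterConfig pruningConfig() {
        // innermost: prune _id postings of (soft) deleted documents during merges;
        // "_id" stands in for IdFieldMapper.NAME
        MergePolicy pruneIdPostings = new PrunePostingsMergePolicy(new TieredMergePolicy(), "_id");
        // outer: retain soft-deleted documents matching the retention query across merges;
        // "__soft_deletes" stands in for Lucene.SOFT_DELETES_FIELD
        MergePolicy retention =
            new SoftDeletesRetentionMergePolicy("__soft_deletes", MatchAllDocsQuery::new, pruneIdPostings);
        return new IndexWriterConfig(new StandardAnalyzer())
            .setSoftDeletesField("__soft_deletes")
            .setMergePolicy(retention);
    }
}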
@@ -0,0 +1,224 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.engine;

import org.apache.lucene.codecs.FieldsProducer;
import org.apache.lucene.index.CodecReader;
import org.apache.lucene.index.FilterCodecReader;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.FilteredTermsEnum;
import org.apache.lucene.index.ImpactsEnum;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.OneMergeWrappingMergePolicy;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;

import java.io.IOException;
import java.util.Iterator;

/**
* This merge policy drops the id field postings for all deleted documents. This can be
* useful to guarantee consistent update performance even if a large number of deleted / updated documents
* are retained. Merging postings away is efficient since Lucene visits postings term by term and,
* with the original live-docs being available, we only add a negligible overhead, which allows us to
* prune soft-deletes by default. Yet, using this merge policy causes the loss of all search capabilities on top of
* soft-deleted documents, independent of the retention policy. Note: in order for this merge policy to be effective it needs to be added
* before the {@link org.apache.lucene.index.SoftDeletesRetentionMergePolicy} (i.e. nested inside it as the inner policy), because otherwise only documents that are deleted / removed
* anyway will be pruned.
*/
final class PrunePostingsMergePolicy extends OneMergeWrappingMergePolicy {

PrunePostingsMergePolicy(MergePolicy in, String idField) {
super(in, toWrap -> new OneMerge(toWrap.segments) {
@Override
public CodecReader wrapForMerge(CodecReader reader) throws IOException {
CodecReader wrapped = toWrap.wrapForMerge(reader);
return wrapReader(wrapped, idField);
}
});
}

private static CodecReader wrapReader(CodecReader reader, String idField) {
Bits liveDocs = reader.getLiveDocs();
if (liveDocs == null) {
return reader; // no deleted docs - we are good!
}
final boolean fullyDeletedSegment = reader.numDocs() == 0;
return new FilterCodecReader(reader) {

@Override
public FieldsProducer getPostingsReader() {
FieldsProducer postingsReader = super.getPostingsReader();
if (postingsReader == null) {
return null;
}
return new FieldsProducer() {
@Override
public void close() throws IOException {
postingsReader.close();
}

@Override
public void checkIntegrity() throws IOException {
postingsReader.checkIntegrity();
}

@Override
public Iterator<String> iterator() {
return postingsReader.iterator();
}

@Override
public Terms terms(String field) throws IOException {
Terms in = postingsReader.terms(field);
if (idField.equals(field) && in != null) {
return new FilterLeafReader.FilterTerms(in) {
@Override
public TermsEnum iterator() throws IOException {
TermsEnum iterator = super.iterator();
return new FilteredTermsEnum(iterator, false) {
private PostingsEnum internal;

@Override
protected AcceptStatus accept(BytesRef term) throws IOException {
if (fullyDeletedSegment) {
return AcceptStatus.END; // short-cut this if we don't match anything
}
internal = postings(internal, PostingsEnum.NONE);
if (internal.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
return AcceptStatus.YES;
}
return AcceptStatus.NO;
}

@Override
public PostingsEnum postings(PostingsEnum reuse, int flags) throws IOException {
if (reuse instanceof OnlyLiveDocsPostingsEnum) {
OnlyLiveDocsPostingsEnum reuseInstance = (OnlyLiveDocsPostingsEnum) reuse;
reuseInstance.reset(super.postings(reuseInstance.in, flags));
return reuseInstance;
}
return new OnlyLiveDocsPostingsEnum(super.postings(null, flags), liveDocs);
}

@Override
public ImpactsEnum impacts(int flags) throws IOException {
throw new UnsupportedOperationException();
}
};
}
};
} else {
return in;
}
}

@Override
public int size() {
return postingsReader.size();
}

@Override
public long ramBytesUsed() {
return postingsReader.ramBytesUsed();
}
};
}

@Override
public CacheHelper getCoreCacheHelper() {
return null;
}

@Override
public CacheHelper getReaderCacheHelper() {
return null;
}
};
}

private static final class OnlyLiveDocsPostingsEnum extends PostingsEnum {

private final Bits liveDocs;
private PostingsEnum in;

OnlyLiveDocsPostingsEnum(PostingsEnum in, Bits liveDocs) {
this.liveDocs = liveDocs;
reset(in);
}

void reset(PostingsEnum in) {
this.in = in;
}

@Override
public int docID() {
return in.docID();
}

@Override
public int nextDoc() throws IOException {
int docId;
do {
docId = in.nextDoc();
} while (docId != DocIdSetIterator.NO_MORE_DOCS && liveDocs.get(docId) == false);
return docId;
}

@Override
public int advance(int target) {
throw new UnsupportedOperationException();
}

@Override
public long cost() {
return in.cost();
}

@Override
public int freq() throws IOException {
return in.freq();
}

@Override
public int nextPosition() throws IOException {
return in.nextPosition();
}

@Override
public int startOffset() throws IOException {
return in.startOffset();
}

@Override
public int endOffset() throws IOException {
return in.endOffset();
}

@Override
public BytesRef getPayload() throws IOException {
return in.getPayload();
}
}
}
@@ -51,6 +51,8 @@
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
@@ -119,6 +121,8 @@
import org.elasticsearch.index.mapper.RootObjectMapper;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.RetentionLease;
@@ -1458,6 +1462,61 @@ public void testForceMergeWithoutSoftDeletes() throws IOException {
}
}

/*
* we are testing an edge case here where we have a fully deleted segment that is retained but has all its IDs pruned away.
*/
public void testLookupVersionWithPrunedAwayIds() throws IOException {
try (Directory dir = newDirectory()) {
IndexWriterConfig indexWriterConfig = new IndexWriterConfig(Lucene.STANDARD_ANALYZER);
indexWriterConfig.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD);
try (IndexWriter writer = new IndexWriter(dir,
indexWriterConfig.setMergePolicy(new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD,
MatchAllDocsQuery::new, new PrunePostingsMergePolicy(indexWriterConfig.getMergePolicy(), "_id"))))) {
org.apache.lucene.document.Document doc = new org.apache.lucene.document.Document();
doc.add(new Field(IdFieldMapper.NAME, "1", IdFieldMapper.Defaults.FIELD_TYPE));
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, -1));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, 1));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, 1));
writer.addDocument(doc);
writer.flush();
writer.softUpdateDocument(new Term(IdFieldMapper.NAME, "1"), doc, new NumericDocValuesField(Lucene.SOFT_DELETES_FIELD, 1));
writer.updateNumericDocValue(new Term(IdFieldMapper.NAME, "1"), Lucene.SOFT_DELETES_FIELD, 1);
writer.forceMerge(1);
try (DirectoryReader reader = DirectoryReader.open(writer)) {
assertEquals(1, reader.leaves().size());
assertNull(VersionsAndSeqNoResolver.loadDocIdAndVersion(reader, new Term(IdFieldMapper.NAME, "1"), false));
}
}
}
}

public void testUpdateWithFullyDeletedSegments() throws IOException {
Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), Integer.MAX_VALUE);
final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final Set<String> liveDocs = new HashSet<>();
try (Store store = createStore();
InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null,
null, globalCheckpoint::get))) {
int numDocs = scaledRandomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null);
engine.index(indexForDoc(doc));
liveDocs.add(doc.id());
}

for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null);
engine.index(indexForDoc(doc));
liveDocs.add(doc.id());
}
}
}

public void testForceMergeWithSoftDeletesRetention() throws Exception {
final long retainedExtraOps = randomLongBetween(0, 10);
Settings.Builder settings = Settings.builder()
@@ -5734,8 +5793,60 @@ public void testGetReaderAttributes() throws IOException {
assertEquals("ON_HEAP", readerAttributes.get("blocktree.terms.fst._id"));
assertEquals("ON_HEAP", readerAttributes.get("blocktree.terms.fst"));
break;
default:
fail("unknown type");
}
}
}

public void testPruneAwayDeletedButRetainedIds() throws Exception {
IOUtils.close(engine, store);
Settings settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(
IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build());
store = createStore(indexSettings, newDirectory());
LogDocMergePolicy policy = new LogDocMergePolicy();
policy.setMinMergeDocs(10000);
try (InternalEngine engine = createEngine(indexSettings, store, createTempDir(), policy)) {
int numDocs = between(1, 20);
for (int i = 0; i < numDocs; i++) {
index(engine, i);
}
engine.forceMerge(true);
engine.delete(new Engine.Delete("_doc", "0", newUid("0"), primaryTerm.get()));
engine.refresh("test");
// now we have 2 segments: the newly added tombstone plus the old segment that contains the delete
try (Searcher searcher = engine.acquireSearcher("test")) {
IndexReader reader = searcher.reader();
assertEquals(2, reader.leaves().size());
LeafReaderContext leafReaderContext = reader.leaves().get(0);
LeafReader leafReader = leafReaderContext.reader();
assertEquals("the delete and the tombstone", 1, leafReader.numDeletedDocs());
assertEquals(numDocs, leafReader.maxDoc());
Terms id = leafReader.terms("_id");
assertNotNull(id);
assertEquals("deleted IDs are NOT YET pruned away", reader.numDocs() + 1, id.size());
TermsEnum iterator = id.iterator();
assertTrue(iterator.seekExact(Uid.encodeId("0")));
}

// let's force merge the tombstone and the original segment and make sure the doc is still there but the ID term is gone
engine.forceMerge(true);
engine.refresh("test");
try (Searcher searcher = engine.acquireSearcher("test")) {
IndexReader reader = searcher.reader();
assertEquals(1, reader.leaves().size());
LeafReaderContext leafReaderContext = reader.leaves().get(0);
LeafReader leafReader = leafReaderContext.reader();
assertEquals("the delete and the tombstone", 2, leafReader.numDeletedDocs());
assertEquals(numDocs + 1, leafReader.maxDoc());
Terms id = leafReader.terms("_id");
assertNotNull(id);
assertEquals("deleted IDs are pruned away", reader.numDocs(), id.size());
TermsEnum iterator = id.iterator();
assertFalse(iterator.seekExact(Uid.encodeId("0")));
}
}
}