Skip to content

Commit

Permalink
Do not use soft-deletes to resolve indexing strategy (#43336)
Browse files Browse the repository at this point in the history
This PR reverts #35230.

Previously, we reply on soft-deletes to fill the mismatch between the
version map and the Lucene index. This is no longer needed after #43202
where we rebuild the version map when opening an engine. Moreover,
PrunePostingsMergePolicy can prune _id of soft-deleted documents out of
order; thus the lookup result including soft-deletes sometimes does not
return the latest version (although it's okay as we only use a valid
result in an engine).

With this change, we use only live documents in Lucene to resolve the
indexing strategy. This is perfectly safe since we keep all deleted
documents after the local checkpoint in the version map.

Closes #42979
  • Loading branch information
dnhatn committed Jun 19, 2019
1 parent 680d6ed commit f47174f
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 87 deletions.
Expand Up @@ -102,38 +102,20 @@ public DocIdAndVersion lookupVersion(BytesRef id, boolean loadSeqNo, LeafReaderC
throws IOException {
assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
"context's reader is not the same as the reader class was initialized on.";
int docID = getDocID(id, context.reader().getLiveDocs());
int docID = getDocID(id, context);

if (docID != DocIdSetIterator.NO_MORE_DOCS) {
final NumericDocValues versions = context.reader().getNumericDocValues(VersionFieldMapper.NAME);
if (versions == null) {
throw new IllegalArgumentException("reader misses the [" + VersionFieldMapper.NAME + "] field");
}
if (versions.advanceExact(docID) == false) {
throw new IllegalArgumentException("Document [" + docID + "] misses the [" + VersionFieldMapper.NAME + "] field");
}
final long seqNo;
final long term;
if (loadSeqNo) {
NumericDocValues seqNos = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
// remove the null check in 7.0 once we can't read indices with no seq#
if (seqNos != null && seqNos.advanceExact(docID)) {
seqNo = seqNos.longValue();
} else {
seqNo = UNASSIGNED_SEQ_NO;
}
NumericDocValues terms = context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
if (terms != null && terms.advanceExact(docID)) {
term = terms.longValue();
} else {
term = UNASSIGNED_PRIMARY_TERM;
}

seqNo = readNumericDocValues(context.reader(), SeqNoFieldMapper.NAME, docID);
term = readNumericDocValues(context.reader(), SeqNoFieldMapper.PRIMARY_TERM_NAME, docID);
} else {
seqNo = UNASSIGNED_SEQ_NO;
term = UNASSIGNED_PRIMARY_TERM;
}
return new DocIdAndVersion(docID, versions.longValue(), seqNo, term, context.reader(), context.docBase);
final long version = readNumericDocValues(context.reader(), VersionFieldMapper.NAME, docID);
return new DocIdAndVersion(docID, version, seqNo, term, context.reader(), context.docBase);
} else {
return null;
}
Expand All @@ -143,9 +125,10 @@ public DocIdAndVersion lookupVersion(BytesRef id, boolean loadSeqNo, LeafReaderC
* returns the internal lucene doc id for the given id bytes.
* {@link DocIdSetIterator#NO_MORE_DOCS} is returned if not found
* */
private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
private int getDocID(BytesRef id, LeafReaderContext context) throws IOException {
// termsEnum can possibly be null here if this leaf contains only no-ops.
if (termsEnum != null && termsEnum.seekExact(id)) {
final Bits liveDocs = context.reader().getLiveDocs();
int docID = DocIdSetIterator.NO_MORE_DOCS;
// there may be more than one matching docID, in the case of nested docs, so we want the last one:
docsEnum = termsEnum.postings(docsEnum, 0);
Expand All @@ -161,41 +144,23 @@ private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
}
}

private static long readNumericDocValues(LeafReader reader, String field, int docId) throws IOException {
final NumericDocValues dv = reader.getNumericDocValues(field);
if (dv == null || dv.advanceExact(docId) == false) {
assert false : "document [" + docId + "] does not have docValues for [" + field + "]";
throw new IllegalStateException("document [" + docId + "] does not have docValues for [" + field + "]");
}
return dv.longValue();
}

/** Return null if id is not found. */
DocIdAndSeqNo lookupSeqNo(BytesRef id, LeafReaderContext context) throws IOException {
assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
"context's reader is not the same as the reader class was initialized on.";
// termsEnum can possibly be null here if this leaf contains only no-ops.
if (termsEnum != null && termsEnum.seekExact(id)) {
docsEnum = termsEnum.postings(docsEnum, 0);
final Bits liveDocs = context.reader().getLiveDocs();
DocIdAndSeqNo result = null;
int docID = docsEnum.nextDoc();
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
final NumericDocValues seqNoDV = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
for (; docID != DocIdSetIterator.NO_MORE_DOCS; docID = docsEnum.nextDoc()) {
final long seqNo;
// remove the null check in 7.0 once we can't read indices with no seq#
if (seqNoDV != null && seqNoDV.advanceExact(docID)) {
seqNo = seqNoDV.longValue();
} else {
seqNo = UNASSIGNED_SEQ_NO;
}
final boolean isLive = (liveDocs == null || liveDocs.get(docID));
if (isLive) {
// The live document must always be the latest copy, thus we can early terminate here.
// If a nested docs is live, we return the first doc which doesn't have term (only the last doc has term).
// This should not be an issue since we no longer use primary term as tier breaker when comparing operations.
assert result == null || result.seqNo <= seqNo :
"the live doc does not have the highest seq_no; live_seq_no=" + seqNo + " < deleted_seq_no=" + result.seqNo;
return new DocIdAndSeqNo(docID, seqNo, context, isLive);
}
if (result == null || result.seqNo < seqNo) {
result = new DocIdAndSeqNo(docID, seqNo, context, isLive);
}
}
}
return result;
final int docID = getDocID(id, context);
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
final long seqNo = readNumericDocValues(context.reader(), SeqNoFieldMapper.NAME, docID);
return new DocIdAndSeqNo(docID, seqNo, context);
} else {
return null;
}
Expand Down
Expand Up @@ -114,13 +114,11 @@ public static class DocIdAndSeqNo {
public final int docId;
public final long seqNo;
public final LeafReaderContext context;
public final boolean isLive;

DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context, boolean isLive) {
DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context) {
this.docId = docId;
this.seqNo = seqNo;
this.context = context;
this.isLive = isLive;
}
}

Expand Down Expand Up @@ -149,32 +147,21 @@ public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term,

/**
* Loads the internal docId and sequence number of the latest copy for a given uid from the provided reader.
* The flag {@link DocIdAndSeqNo#isLive} indicates whether the returned document is live or (soft)deleted.
* This returns {@code null} if no such document matching the given term uid.
* The result is either null or the live and latest version of the given uid.
*/
public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) throws IOException {
final PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
final List<LeafReaderContext> leaves = reader.leaves();
DocIdAndSeqNo latest = null;
// iterate backwards to optimize for the frequently updated documents
// which are likely to be in the last segments
for (int i = leaves.size() - 1; i >= 0; i--) {
final LeafReaderContext leaf = leaves.get(i);
final PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord];
final DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf);
if (result == null) {
continue;
}
if (result.isLive) {
// The live document must always be the latest copy, thus we can early terminate here.
assert latest == null || latest.seqNo <= result.seqNo :
"the live doc does not have the highest seq_no; live_seq_no=" + result.seqNo + " < deleted_seq_no=" + latest.seqNo;
if (result != null) {
return result;
}
if (latest == null || latest.seqNo < result.seqNo) {
latest = result;
}
}
return latest;
return null;
}
}
Expand Up @@ -705,11 +705,7 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op)
if (docAndSeqNo == null) {
status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
} else if (op.seqNo() > docAndSeqNo.seqNo) {
if (docAndSeqNo.isLive) {
status = OpVsLuceneDocStatus.OP_NEWER;
} else {
status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
}
status = OpVsLuceneDocStatus.OP_NEWER;
} else if (op.seqNo() == docAndSeqNo.seqNo) {
assert localCheckpointTracker.contains(op.seqNo()) || softDeleteEnabled == false :
"local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id();
Expand Down
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.test.ESTestCase;

Expand All @@ -52,6 +53,8 @@ public void testSimple() throws Exception {
Document doc = new Document();
doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
writer.addDocument(doc);
writer.addDocument(new Document());
DirectoryReader reader = DirectoryReader.open(writer);
Expand Down Expand Up @@ -86,6 +89,8 @@ public void testTwoDocuments() throws Exception {
Document doc = new Document();
doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
writer.addDocument(doc);
writer.addDocument(doc);
writer.addDocument(new Document());
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
Expand Down Expand Up @@ -68,6 +69,8 @@ public void testVersions() throws Exception {
Document doc = new Document();
doc.add(new Field(IdFieldMapper.NAME, "1", IdFieldMapper.Defaults.FIELD_TYPE));
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 1));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
writer.updateDocument(new Term(IdFieldMapper.NAME, "1"), doc);
directoryReader = reopen(directoryReader);
assertThat(loadDocIdAndVersion(directoryReader, new Term(IdFieldMapper.NAME, "1"), randomBoolean()).version, equalTo(1L));
Expand All @@ -77,6 +80,8 @@ public void testVersions() throws Exception {
Field version = new NumericDocValuesField(VersionFieldMapper.NAME, 2);
doc.add(uid);
doc.add(version);
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
writer.updateDocument(new Term(IdFieldMapper.NAME, "1"), doc);
directoryReader = reopen(directoryReader);
assertThat(loadDocIdAndVersion(directoryReader, new Term(IdFieldMapper.NAME, "1"), randomBoolean()).version, equalTo(2L));
Expand All @@ -86,6 +91,8 @@ public void testVersions() throws Exception {
version.setLongValue(3);
doc.add(uid);
doc.add(version);
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
writer.updateDocument(new Term(IdFieldMapper.NAME, "1"), doc);

directoryReader = reopen(directoryReader);
Expand Down Expand Up @@ -115,6 +122,8 @@ public void testNestedDocuments() throws IOException {
doc.add(new Field(IdFieldMapper.NAME, "1", IdFieldMapper.Defaults.FIELD_TYPE));
NumericDocValuesField version = new NumericDocValuesField(VersionFieldMapper.NAME, 5L);
doc.add(version);
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
docs.add(doc);

writer.updateDocuments(new Term(IdFieldMapper.NAME, "1"), docs);
Expand Down Expand Up @@ -145,6 +154,8 @@ public void testCache() throws Exception {
Document doc = new Document();
doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
writer.addDocument(doc);
DirectoryReader reader = DirectoryReader.open(writer);
// should increase cache size by 1
Expand All @@ -170,6 +181,8 @@ public void testCacheFilterReader() throws Exception {
Document doc = new Document();
doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
writer.addDocument(doc);
DirectoryReader reader = DirectoryReader.open(writer);
assertEquals(87, loadDocIdAndVersion(reader, new Term(IdFieldMapper.NAME, "6"), randomBoolean()).version);
Expand Down
Expand Up @@ -4027,7 +4027,6 @@ public void testSequenceIDs() throws Exception {
searchResult.close();
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/42979")
public void testLookupSeqNoByIdInLucene() throws Exception {
int numOps = between(10, 100);
long seqNo = 0;
Expand Down Expand Up @@ -4062,20 +4061,23 @@ public void testLookupSeqNoByIdInLucene() throws Exception {
InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null))) {
CheckedRunnable<IOException> lookupAndCheck = () -> {
try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
for (String id : latestOps.keySet()) {
String msg = "latestOps=" + latestOps + " op=" + id;
DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), newUid(id));
assertThat(msg, docIdAndSeqNo.seqNo, equalTo(latestOps.get(id).seqNo()));
assertThat(msg, docIdAndSeqNo.isLive,
equalTo(latestOps.get(id).operationType() == Engine.Operation.TYPE.INDEX));
}
assertThat(VersionsAndSeqNoResolver.loadDocIdAndVersion(
searcher.reader(), newUid("any-" + between(1, 10)), randomBoolean()), nullValue());
Map<String, Long> liveOps = latestOps.entrySet().stream()
.filter(e -> e.getValue().operationType() == Engine.Operation.TYPE.INDEX)
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().seqNo()));
assertThat(getDocIds(engine, true).stream().collect(Collectors.toMap(e -> e.getId(), e -> e.getSeqNo())),
equalTo(liveOps));
for (String id : latestOps.keySet()) {
String msg = "latestOps=" + latestOps + " op=" + id;
DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), newUid(id));
if (liveOps.containsKey(id) == false) {
assertNull(msg, docIdAndSeqNo);
} else {
assertNotNull(msg, docIdAndSeqNo);
assertThat(msg, docIdAndSeqNo.seqNo, equalTo(latestOps.get(id).seqNo()));
}
}
String notFoundId = randomValueOtherThanMany(liveOps::containsKey, () -> Long.toString(randomNonNegativeLong()));
assertNull(VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), newUid(notFoundId)));
}
};
for (Engine.Operation op : operations) {
Expand Down

0 comments on commit f47174f

Please sign in to comment.