LUCENE-8608: Extract utility class to iterate over terms docs
Today we re-implement the same algorithm in various places
when we want to consume all docs for a set/list of terms. This
caused serious slowdowns, for instance when applying updates,
which was fixed in LUCENE-8602. This change extracts the common
usage and shares the iteration code, including logic to reuse
Terms and PostingsEnum instances as much as possible, and adds
tests for it.
s1monw committed Dec 13, 2018
1 parent b072a7d commit 8f4e4a4
Showing 6 changed files with 250 additions and 130 deletions.
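
Before the diff, a minimal sketch of the consumption pattern this change centralizes. It is illustrative only, not part of the commit: TermDocsIterator is package-private inside FrozenBufferedUpdates, so this would have to live in org.apache.lucene.index, and `reader` and `sortedTerms` are assumed inputs.

// Hedged sketch of the shared loop that TermDocsIterator now owns (assumes code in org.apache.lucene.index).
static void applyToAllDocs(LeafReader reader, List<Term> sortedTerms) throws IOException {
  FrozenBufferedUpdates.TermDocsIterator iterator =
      new FrozenBufferedUpdates.TermDocsIterator(reader, true); // true: terms are passed in sorted order
  for (Term term : sortedTerms) {
    // nextTerm returns null when the segment has no postings for this (field, value) pair
    DocIdSetIterator docs = iterator.nextTerm(term.field(), term.bytes());
    if (docs == null) {
      continue;
    }
    int doc;
    while ((doc = docs.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
      // consume doc: e.g. mark it deleted or buffer a doc-values update for it
    }
  }
}
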
@@ -69,11 +69,11 @@ load factor (say 2 * POINTER). Entry is object w/
final AtomicInteger numTermDeletes = new AtomicInteger();
final AtomicInteger numFieldUpdates = new AtomicInteger();

final Map<Term,Integer> deleteTerms = new HashMap<>();
final Map<Term,Integer> deleteTerms = new HashMap<>(); // TODO cut this over to FieldUpdatesBuffer
final Map<Query,Integer> deleteQueries = new HashMap<>();
final List<Integer> deleteDocIDs = new ArrayList<>();

final Map<String,FieldUpdatesBuffer> fieldUpdates = new HashMap<>();
final Map<String, FieldUpdatesBuffer> fieldUpdates = new HashMap<>();


public static final Integer MAX_INT = Integer.valueOf(Integer.MAX_VALUE);
@@ -109,7 +109,7 @@ public String toString() {
s += " " + deleteDocIDs.size() + " deleted docIDs";
}
if (numFieldUpdates.get() != 0) {
s += " " + numFieldUpdates.get() + " field updates (unique count=" + fieldUpdates.size() + ")";
s += " " + numFieldUpdates.get() + " field updates";
}
if (bytesUsed.get() != 0) {
s += " bytesUsed=" + bytesUsed.get();
@@ -24,6 +24,7 @@
import java.util.Map;

import org.apache.lucene.codecs.FieldsConsumer;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.IOUtils;
@@ -41,37 +42,21 @@ private void applyDeletes(SegmentWriteState state, Fields fields) throws IOExcep
Map<Term,Integer> segDeletes = state.segUpdates.deleteTerms;
List<Term> deleteTerms = new ArrayList<>(segDeletes.keySet());
Collections.sort(deleteTerms);
String lastField = null;
TermsEnum termsEnum = null;
PostingsEnum postingsEnum = null;
FrozenBufferedUpdates.TermDocsIterator iterator = new FrozenBufferedUpdates.TermDocsIterator(fields, true);
for(Term deleteTerm : deleteTerms) {
if (deleteTerm.field().equals(lastField) == false) {
lastField = deleteTerm.field();
Terms terms = fields.terms(lastField);
if (terms != null) {
termsEnum = terms.iterator();
} else {
termsEnum = null;
}
}

if (termsEnum != null && termsEnum.seekExact(deleteTerm.bytes())) {
postingsEnum = termsEnum.postings(postingsEnum, 0);
DocIdSetIterator postings = iterator.nextTerm(deleteTerm.field(), deleteTerm.bytes());
if (postings != null ) {
int delDocLimit = segDeletes.get(deleteTerm);
assert delDocLimit < PostingsEnum.NO_MORE_DOCS;
while (true) {
int doc = postingsEnum.nextDoc();
if (doc < delDocLimit) {
if (state.liveDocs == null) {
state.liveDocs = new FixedBitSet(state.segmentInfo.maxDoc());
state.liveDocs.set(0, state.segmentInfo.maxDoc());
}
if (state.liveDocs.get(doc)) {
state.delCountOnFlush++;
state.liveDocs.clear(doc);
}
} else {
break;
int doc;
while ((doc = postings.nextDoc()) < delDocLimit) {
if (state.liveDocs == null) {
state.liveDocs = new FixedBitSet(state.segmentInfo.maxDoc());
state.liveDocs.set(0, state.segmentInfo.maxDoc());
}
if (state.liveDocs.get(doc)) {
state.delCountOnFlush++;
state.liveDocs.clear(doc);
}
}
}
216 changes: 127 additions & 89 deletions lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java
@@ -85,7 +85,7 @@ final class FrozenBufferedUpdates {
// only have Queries and doc values updates
private final InfoStream infoStream;

public FrozenBufferedUpdates(InfoStream infoStream, BufferedUpdates updates, SegmentCommitInfo privateSegment) throws IOException {
public FrozenBufferedUpdates(InfoStream infoStream, BufferedUpdates updates, SegmentCommitInfo privateSegment) {
this.infoStream = infoStream;
this.privateSegment = privateSegment;
assert updates.deleteDocIDs.isEmpty();
@@ -467,9 +467,6 @@ private static long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState seg
long delGen,
boolean segmentPrivateDeletes) throws IOException {

TermsEnum termsEnum = null;
PostingsEnum postingsEnum = null;

// TODO: we can process the updates per DV field, from last to first so that
// if multiple terms affect same document for the same field, we add an update
// only once (that of the last term). To do that, we can keep a bitset which
@@ -491,26 +488,8 @@ private static long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState seg
boolean isNumeric = value.isNumeric();
FieldUpdatesBuffer.BufferedUpdateIterator iterator = value.iterator();
FieldUpdatesBuffer.BufferedUpdate bufferedUpdate;
String previousField = null;
TermDocsIterator termDocsIterator = new TermDocsIterator(segState.reader, false);
while ((bufferedUpdate = iterator.next()) != null) {
if (previousField == null || previousField.equals(bufferedUpdate.termField) == false) {
previousField = bufferedUpdate.termField;
Terms terms = segState.reader.terms(previousField);
termsEnum = terms == null ? null : terms.iterator();
}
if (termsEnum == null) {
// no terms in this segment for this field
continue;
}

final int limit;
if (delGen == segState.delGen) {
assert segmentPrivateDeletes;
limit = bufferedUpdate.docUpTo;
} else {
limit = Integer.MAX_VALUE;
}

// TODO: we traverse the terms in update order (not term order) so that we
// apply the updates in the correct order, i.e. if two terms update the
// same document, the last one that came in wins, irrespective of the
@@ -520,23 +499,26 @@ private static long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState seg
// that we cannot rely only on docIDUpto because an app may send two updates
// which will get same docIDUpto, yet will still need to respect the order
// those updates arrived.

// TODO: we could at least *collate* by field?


final BytesRef binaryValue;
final long longValue;
if (bufferedUpdate.hasValue == false) {
longValue = -1;
binaryValue = null;
} else {
longValue = bufferedUpdate.numericValue;
binaryValue = bufferedUpdate.binaryValue;
}

if (termsEnum.seekExact(bufferedUpdate.termValue)) {
// we don't need term frequencies for this
postingsEnum = termsEnum.postings(postingsEnum, PostingsEnum.NONE);
final DocIdSetIterator docIdSetIterator = termDocsIterator.nextTerm(bufferedUpdate.termField, bufferedUpdate.termValue);
if (docIdSetIterator != null) {
final int limit;
if (delGen == segState.delGen) {
assert segmentPrivateDeletes;
limit = bufferedUpdate.docUpTo;
} else {
limit = Integer.MAX_VALUE;
}
final BytesRef binaryValue;
final long longValue;
if (bufferedUpdate.hasValue == false) {
longValue = -1;
binaryValue = null;
} else {
longValue = bufferedUpdate.numericValue;
binaryValue = bufferedUpdate.binaryValue;
}
if (dvUpdates == null) {
if (isNumeric) {
if (value.hasSingleValue()) {
@@ -565,7 +547,7 @@ private static long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState seg
if (segState.rld.sortMap != null && segmentPrivateDeletes) {
// This segment was sorted on flush; we must apply seg-private deletes carefully in this case:
int doc;
while ((doc = postingsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
while ((doc = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
if (acceptDocs == null || acceptDocs.get(doc)) {
// The limit is in the pre-sorted doc space:
if (segState.rld.sortMap.newToOld(doc) < limit) {
@@ -576,7 +558,7 @@ private static long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState seg
}
} else {
int doc;
while ((doc = postingsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
while ((doc = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
if (doc >= limit) {
break; // no more docs that can be updated for this term
}
@@ -705,57 +687,13 @@ private long applyTermDeletes(BufferedUpdatesStream.SegmentState[] segStates) th
}

FieldTermIterator iter = deleteTerms.iterator();

BytesRef delTerm;
String field = null;
TermsEnum termsEnum = null;
BytesRef readerTerm = null;
PostingsEnum postingsEnum = null;
TermDocsIterator termDocsIterator = new TermDocsIterator(segState.reader, true);
while ((delTerm = iter.next()) != null) {

if (iter.field() != field) {
// field changed
field = iter.field();
Terms terms = segState.reader.terms(field);
if (terms != null) {
termsEnum = terms.iterator();
readerTerm = termsEnum.next();
} else {
termsEnum = null;
}
}

if (termsEnum != null) {
int cmp = delTerm.compareTo(readerTerm);
if (cmp < 0) {
// TODO: can we advance across del terms here?
// move to next del term
continue;
} else if (cmp == 0) {
// fall through
} else if (cmp > 0) {
TermsEnum.SeekStatus status = termsEnum.seekCeil(delTerm);
if (status == TermsEnum.SeekStatus.FOUND) {
// fall through
} else if (status == TermsEnum.SeekStatus.NOT_FOUND) {
readerTerm = termsEnum.term();
continue;
} else {
// TODO: can we advance to next field in deleted terms?
// no more terms in this segment
termsEnum = null;
continue;
}
}

// we don't need term frequencies for this
postingsEnum = termsEnum.postings(postingsEnum, PostingsEnum.NONE);

assert postingsEnum != null;

final DocIdSetIterator iterator = termDocsIterator.nextTerm(iter.field(), delTerm);
if (iterator != null) {
int docID;
while ((docID = postingsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {

while ((docID = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
// NOTE: there is no limit check on the docID
// when deleting by Term (unlike by Query)
// because on flush we apply all Term deletes to
@@ -820,4 +758,104 @@ public String toString() {
boolean any() {
return deleteTerms.size() > 0 || deleteQueries.length > 0 || fieldUpdatesCount > 0 ;
}

/**
* This class helps iterate a term dictionary and consume all the docs for each term.
* It accepts a (field, value) tuple and returns a {@link DocIdSetIterator} if the field has an entry
* for the given value. It iterates the term dictionary in an optimized way if the terms are
* passed in sorted order, and makes sure terms and postings enums are reused as much as possible.
*/
static final class TermDocsIterator {
private final TermsProvider provider;
private String field;
private TermsEnum termsEnum;
private PostingsEnum postingsEnum;
private final boolean sortedTerms;
private BytesRef readerTerm;
private BytesRef lastTerm; // only set with asserts

@FunctionalInterface
interface TermsProvider {
Terms terms(String field) throws IOException;
}

TermDocsIterator(Fields fields, boolean sortedTerms) {
this(fields::terms, sortedTerms);
}

TermDocsIterator(LeafReader reader, boolean sortedTerms) {
this(reader::terms, sortedTerms);
}

private TermDocsIterator(TermsProvider provider, boolean sortedTerms) {
this.sortedTerms = sortedTerms;
this.provider = provider;
}

private void setField(String field) throws IOException {
if (this.field == null || this.field.equals(field) == false) {
this.field = field;

Terms terms = provider.terms(field);
if (terms != null) {
termsEnum = terms.iterator();
if (sortedTerms) {
assert (lastTerm = null) == null; // need to reset otherwise we fail the assertSorted below since we sort per field
readerTerm = termsEnum.next();
}
} else {
termsEnum = null;
}
}
}

DocIdSetIterator nextTerm(String field, BytesRef term) throws IOException {
setField(field);
if (termsEnum != null) {
if (sortedTerms) {
assert assertSorted(term);
// in the sorted case we can take advantage of the "seeking forward" property:
// depending on the term dict impl, this lets us reuse data-structures internally,
// which speeds up iteration over terms and docs significantly.
int cmp = term.compareTo(readerTerm);
if (cmp < 0) {
return null; // requested term does not exist in this segment
} else if (cmp == 0) {
return getDocs();
} else if (cmp > 0) {
TermsEnum.SeekStatus status = termsEnum.seekCeil(term);
switch (status) {
case FOUND:
return getDocs();
case NOT_FOUND:
readerTerm = termsEnum.term();
return null;
case END:
// no more terms in this segment
termsEnum = null;
return null;
default:
throw new AssertionError("unknown status");
}
}
} else if (termsEnum.seekExact(term)) {
return getDocs();
}
}
return null;
}

private boolean assertSorted(BytesRef term) {
assert sortedTerms;
assert lastTerm == null || term.compareTo(lastTerm) >= 0 : "boom: " + term.utf8ToString() + " last: " + lastTerm.utf8ToString();
lastTerm = BytesRef.deepCopyOf(term);
return true;
}

private DocIdSetIterator getDocs() throws IOException {
assert termsEnum != null;
return postingsEnum = termsEnum.postings(postingsEnum, PostingsEnum.NONE);
}
}

}
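
To make the sorted-terms contract above concrete, a brief hedged sketch (the field name and values are made up; as before, this assumes code inside org.apache.lucene.index):

FrozenBufferedUpdates.TermDocsIterator it =
    new FrozenBufferedUpdates.TermDocsIterator(leafReader, true); // sortedTerms == true
DocIdSetIterator apple = it.nextTerm("id", new BytesRef("apple"));   // may seek the TermsEnum forward
DocIdSetIterator banana = it.nextTerm("id", new BytesRef("banana")); // reuses the same TermsEnum/PostingsEnum
// Requesting "apple" again for the same field would violate the contract and trip assertSorted
// when assertions are enabled; switching to another field resets the ordering requirement,
// since terms only need to be sorted per field.
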
@@ -35,7 +35,7 @@ public final class BytesRefArray implements SortableBytesRefArray {
private int lastElement = 0;
private int currentOffset = 0;
private final Counter bytesUsed;

/**
* Creates a new {@link BytesRefArray} with a counter to track allocated bytes
*/
@@ -190,7 +190,6 @@ public BytesRefIterator iterator(final Comparator<BytesRef> comp) {
final int[] indices = comp == null ? null : sort(comp);
return new BytesRefIterator() {
int pos = 0;

@Override
public BytesRef next() {
if (pos < size) {
@@ -35,14 +35,8 @@ public interface BytesRefIterator {
* the end of the iterator is reached.
* @throws IOException If there is a low-level I/O error.
*/
public BytesRef next() throws IOException;
BytesRef next() throws IOException;

/** Singleton BytesRefIterator that iterates over 0 BytesRefs. */
public static final BytesRefIterator EMPTY = new BytesRefIterator() {

@Override
public BytesRef next() {
return null;
}
};
BytesRefIterator EMPTY = () -> null;
}
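
Because next() is now BytesRefIterator's only abstract method, EMPTY can be the lambda shown above. A small hedged usage sketch (BytesRef and BytesRefIterator are from org.apache.lucene.util; the count helper is illustrative, not part of the change):

// Counts the remaining terms of any BytesRefIterator.
static int count(BytesRefIterator terms) throws IOException {
  int n = 0;
  for (BytesRef term = terms.next(); term != null; term = terms.next()) {
    n++; // never reached for BytesRefIterator.EMPTY, which always returns null
  }
  return n;
}
// count(BytesRefIterator.EMPTY) == 0
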
