From 8f4e4a44f8d0eb78cfb2498a48c7ac6231c6ff3b Mon Sep 17 00:00:00 2001
From: Simon Willnauer
Date: Fri, 7 Dec 2018 22:17:26 +0100
Subject: [PATCH] LUCENE-8608: Extract utility class to iterate over terms docs

Today we re-implement the same algorithm in various places when we want to
consume all docs for a set/list of terms. This caused serious slowdowns, for
instance when applying updates, as fixed in LUCENE-8602. This change extracts
the common usage and shares the iteration code, including logic to reuse Terms
and PostingsEnum instances as much as possible, and adds tests for it.
---
 .../apache/lucene/index/BufferedUpdates.java | 6 +-
 .../lucene/index/FreqProxTermsWriter.java | 41 ++--
 .../lucene/index/FrozenBufferedUpdates.java | 216 ++++++++++--------
 .../org/apache/lucene/util/BytesRefArray.java | 3 +-
 .../apache/lucene/util/BytesRefIterator.java | 10 +-
 .../index/TestFrozenBufferedUpdates.java | 104 +++++++++
 6 files changed, 250 insertions(+), 130 deletions(-)
 create mode 100644 lucene/core/src/test/org/apache/lucene/index/TestFrozenBufferedUpdates.java

diff --git a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java
index 04b19b779be..d8317f8041e 100644
--- a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java
@@ -69,11 +69,11 @@ load factor (say 2 * POINTER). Entry is object w/
 final AtomicInteger numTermDeletes = new AtomicInteger();
 final AtomicInteger numFieldUpdates = new AtomicInteger();
- final Map deleteTerms = new HashMap<>();
+ final Map deleteTerms = new HashMap<>(); // TODO cut this over to FieldUpdatesBuffer
 final Map deleteQueries = new HashMap<>();
 final List deleteDocIDs = new ArrayList<>();
- final Map fieldUpdates = new HashMap<>();
+ final Map fieldUpdates = new HashMap<>();
 public static final Integer MAX_INT = Integer.valueOf(Integer.MAX_VALUE);
@@ -109,7 +109,7 @@ public String toString() {
 s += " " + deleteDocIDs.size() + " deleted docIDs";
 }
 if (numFieldUpdates.get() != 0) {
- s += " " + numFieldUpdates.get() + " field updates (unique count=" + fieldUpdates.size() + ")";
+ s += " " + numFieldUpdates.get() + " field updates";
 }
 if (bytesUsed.get() != 0) {
 s += " bytesUsed=" + bytesUsed.get();
diff --git a/lucene/core/src/java/org/apache/lucene/index/FreqProxTermsWriter.java b/lucene/core/src/java/org/apache/lucene/index/FreqProxTermsWriter.java
index e7b6f898a2e..b57f5edf85a 100644
--- a/lucene/core/src/java/org/apache/lucene/index/FreqProxTermsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/FreqProxTermsWriter.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import org.apache.lucene.codecs.FieldsConsumer;
+import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.util.CollectionUtil;
 import org.apache.lucene.util.FixedBitSet;
 import org.apache.lucene.util.IOUtils;
@@ -41,37 +42,21 @@ private void applyDeletes(SegmentWriteState state, Fields fields) throws IOException
 Map segDeletes = state.segUpdates.deleteTerms;
 List deleteTerms = new ArrayList<>(segDeletes.keySet());
 Collections.sort(deleteTerms);
- String lastField = null;
- TermsEnum termsEnum = null;
- PostingsEnum postingsEnum = null;
+ FrozenBufferedUpdates.TermDocsIterator iterator = new FrozenBufferedUpdates.TermDocsIterator(fields, true);
 for(Term deleteTerm : deleteTerms) {
- if (deleteTerm.field().equals(lastField) == false) {
- lastField = deleteTerm.field();
- Terms terms = fields.terms(lastField);
- if 
(terms != null) { - termsEnum = terms.iterator(); - } else { - termsEnum = null; - } - } - - if (termsEnum != null && termsEnum.seekExact(deleteTerm.bytes())) { - postingsEnum = termsEnum.postings(postingsEnum, 0); + DocIdSetIterator postings = iterator.nextTerm(deleteTerm.field(), deleteTerm.bytes()); + if (postings != null ) { int delDocLimit = segDeletes.get(deleteTerm); assert delDocLimit < PostingsEnum.NO_MORE_DOCS; - while (true) { - int doc = postingsEnum.nextDoc(); - if (doc < delDocLimit) { - if (state.liveDocs == null) { - state.liveDocs = new FixedBitSet(state.segmentInfo.maxDoc()); - state.liveDocs.set(0, state.segmentInfo.maxDoc()); - } - if (state.liveDocs.get(doc)) { - state.delCountOnFlush++; - state.liveDocs.clear(doc); - } - } else { - break; + int doc; + while ((doc = postings.nextDoc()) < delDocLimit) { + if (state.liveDocs == null) { + state.liveDocs = new FixedBitSet(state.segmentInfo.maxDoc()); + state.liveDocs.set(0, state.segmentInfo.maxDoc()); + } + if (state.liveDocs.get(doc)) { + state.delCountOnFlush++; + state.liveDocs.clear(doc); } } } diff --git a/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java b/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java index 9477612d1cb..c930a0b00d7 100644 --- a/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java +++ b/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java @@ -85,7 +85,7 @@ final class FrozenBufferedUpdates { // only have Queries and doc values updates private final InfoStream infoStream; - public FrozenBufferedUpdates(InfoStream infoStream, BufferedUpdates updates, SegmentCommitInfo privateSegment) throws IOException { + public FrozenBufferedUpdates(InfoStream infoStream, BufferedUpdates updates, SegmentCommitInfo privateSegment) { this.infoStream = infoStream; this.privateSegment = privateSegment; assert updates.deleteDocIDs.isEmpty(); @@ -467,9 +467,6 @@ private static long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState seg long delGen, boolean segmentPrivateDeletes) throws IOException { - TermsEnum termsEnum = null; - PostingsEnum postingsEnum = null; - // TODO: we can process the updates per DV field, from last to first so that // if multiple terms affect same document for the same field, we add an update // only once (that of the last term). To do that, we can keep a bitset which @@ -491,26 +488,8 @@ private static long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState seg boolean isNumeric = value.isNumeric(); FieldUpdatesBuffer.BufferedUpdateIterator iterator = value.iterator(); FieldUpdatesBuffer.BufferedUpdate bufferedUpdate; - String previousField = null; + TermDocsIterator termDocsIterator = new TermDocsIterator(segState.reader, false); while ((bufferedUpdate = iterator.next()) != null) { - if (previousField == null || previousField.equals(bufferedUpdate.termField) == false) { - previousField = bufferedUpdate.termField; - Terms terms = segState.reader.terms(previousField); - termsEnum = terms == null ? null : terms.iterator(); - } - if (termsEnum == null) { - // no terms in this segment for this field - continue; - } - - final int limit; - if (delGen == segState.delGen) { - assert segmentPrivateDeletes; - limit = bufferedUpdate.docUpTo; - } else { - limit = Integer.MAX_VALUE; - } - // TODO: we traverse the terms in update order (not term order) so that we // apply the updates in the correct order, i.e. 
if two terms update the // same document, the last one that came in wins, irrespective of the @@ -520,23 +499,26 @@ private static long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState seg // that we cannot rely only on docIDUpto because an app may send two updates // which will get same docIDUpto, yet will still need to respect the order // those updates arrived. - // TODO: we could at least *collate* by field? - - - final BytesRef binaryValue; - final long longValue; - if (bufferedUpdate.hasValue == false) { - longValue = -1; - binaryValue = null; - } else { - longValue = bufferedUpdate.numericValue; - binaryValue = bufferedUpdate.binaryValue; - } - - if (termsEnum.seekExact(bufferedUpdate.termValue)) { - // we don't need term frequencies for this - postingsEnum = termsEnum.postings(postingsEnum, PostingsEnum.NONE); + final DocIdSetIterator docIdSetIterator = termDocsIterator.nextTerm(bufferedUpdate.termField, bufferedUpdate.termValue); + if (docIdSetIterator != null) { + final int limit; + if (delGen == segState.delGen) { + assert segmentPrivateDeletes; + limit = bufferedUpdate.docUpTo; + } else { + limit = Integer.MAX_VALUE; + } + final BytesRef binaryValue; + final long longValue; + if (bufferedUpdate.hasValue == false) { + longValue = -1; + binaryValue = null; + } else { + longValue = bufferedUpdate.numericValue; + binaryValue = bufferedUpdate.binaryValue; + } + termDocsIterator.getDocs(); if (dvUpdates == null) { if (isNumeric) { if (value.hasSingleValue()) { @@ -565,7 +547,7 @@ private static long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState seg if (segState.rld.sortMap != null && segmentPrivateDeletes) { // This segment was sorted on flush; we must apply seg-private deletes carefully in this case: int doc; - while ((doc = postingsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + while ((doc = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { if (acceptDocs == null || acceptDocs.get(doc)) { // The limit is in the pre-sorted doc space: if (segState.rld.sortMap.newToOld(doc) < limit) { @@ -576,7 +558,7 @@ private static long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState seg } } else { int doc; - while ((doc = postingsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + while ((doc = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { if (doc >= limit) { break; // no more docs that can be updated for this term } @@ -705,57 +687,13 @@ private long applyTermDeletes(BufferedUpdatesStream.SegmentState[] segStates) th } FieldTermIterator iter = deleteTerms.iterator(); - BytesRef delTerm; - String field = null; - TermsEnum termsEnum = null; - BytesRef readerTerm = null; - PostingsEnum postingsEnum = null; + TermDocsIterator termDocsIterator = new TermDocsIterator(segState.reader, true); while ((delTerm = iter.next()) != null) { - - if (iter.field() != field) { - // field changed - field = iter.field(); - Terms terms = segState.reader.terms(field); - if (terms != null) { - termsEnum = terms.iterator(); - readerTerm = termsEnum.next(); - } else { - termsEnum = null; - } - } - - if (termsEnum != null) { - int cmp = delTerm.compareTo(readerTerm); - if (cmp < 0) { - // TODO: can we advance across del terms here? 
- // move to next del term - continue; - } else if (cmp == 0) { - // fall through - } else if (cmp > 0) { - TermsEnum.SeekStatus status = termsEnum.seekCeil(delTerm); - if (status == TermsEnum.SeekStatus.FOUND) { - // fall through - } else if (status == TermsEnum.SeekStatus.NOT_FOUND) { - readerTerm = termsEnum.term(); - continue; - } else { - // TODO: can we advance to next field in deleted terms? - // no more terms in this segment - termsEnum = null; - continue; - } - } - - // we don't need term frequencies for this - postingsEnum = termsEnum.postings(postingsEnum, PostingsEnum.NONE); - - assert postingsEnum != null; - + final DocIdSetIterator iterator = termDocsIterator.nextTerm(iter.field(), delTerm); + if (iterator != null) { int docID; - while ((docID = postingsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { - + while ((docID = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { // NOTE: there is no limit check on the docID // when deleting by Term (unlike by Query) // because on flush we apply all Term deletes to @@ -820,4 +758,104 @@ public String toString() { boolean any() { return deleteTerms.size() > 0 || deleteQueries.length > 0 || fieldUpdatesCount > 0 ; } + + /** + * This class helps iterating a term dictionary and consuming all the docs for each terms. + * It accepts a field, value tuple and returns a {@link DocIdSetIterator} if the field has an entry + * for the given value. It has an optimized way of iterating the term dictionary if the terms are + * passed in sorted order and makes sure terms and postings are reused as much as possible. + */ + static final class TermDocsIterator { + private final TermsProvider provider; + private String field; + private TermsEnum termsEnum; + private PostingsEnum postingsEnum; + private final boolean sortedTerms; + private BytesRef readerTerm; + private BytesRef lastTerm; // only set with asserts + + @FunctionalInterface + interface TermsProvider { + Terms terms(String field) throws IOException; + } + + TermDocsIterator(Fields fields, boolean sortedTerms) { + this(fields::terms, sortedTerms); + } + + TermDocsIterator(LeafReader reader, boolean sortedTerms) { + this(reader::terms, sortedTerms); + } + + private TermDocsIterator(TermsProvider provider, boolean sortedTerms) { + this.sortedTerms = sortedTerms; + this.provider = provider; + } + + private void setField(String field) throws IOException { + if (this.field == null || this.field.equals(field) == false) { + this.field = field; + + Terms terms = provider.terms(field); + if (terms != null) { + termsEnum = terms.iterator(); + if (sortedTerms) { + assert (lastTerm = null) == null; // need to reset otherwise we fail the assertSorted below since we sort per field + readerTerm = termsEnum.next(); + } + } else { + termsEnum = null; + } + } + } + + DocIdSetIterator nextTerm(String field, BytesRef term) throws IOException { + setField(field); + if (termsEnum != null) { + if (sortedTerms) { + assert assertSorted(term); + // in the sorted case we can take advantage of the "seeking forward" property + // this allows us depending on the term dict impl to reuse data-structures internally + // which speed up iteration over terms and docs significantly. 
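+ // readerTerm is the first term of this field, or the ceiling term returned by a previous
+ // unsuccessful seekCeil. Since callers pass terms in sorted order, a requested term that
+ // still sorts before readerTerm cannot be present in this segment, so we can return null
+ // without doing any seek.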
+ int cmp = term.compareTo(readerTerm); + if (cmp < 0) { + return null; // requested term does not exist in this segment + } else if (cmp == 0) { + return getDocs(); + } else if (cmp > 0) { + TermsEnum.SeekStatus status = termsEnum.seekCeil(term); + switch (status) { + case FOUND: + return getDocs(); + case NOT_FOUND: + readerTerm = termsEnum.term(); + return null; + case END: + // no more terms in this segment + termsEnum = null; + return null; + default: + throw new AssertionError("unknown status"); + } + } + } else if (termsEnum.seekExact(term)) { + return getDocs(); + } + } + return null; + } + + private boolean assertSorted(BytesRef term) { + assert sortedTerms; + assert lastTerm == null || term.compareTo(lastTerm) >= 0 : "boom: " + term.utf8ToString() + " last: " + lastTerm.utf8ToString(); + lastTerm = BytesRef.deepCopyOf(term); + return true; + } + + private DocIdSetIterator getDocs() throws IOException { + assert termsEnum != null; + return postingsEnum = termsEnum.postings(postingsEnum, PostingsEnum.NONE); + } + } + } diff --git a/lucene/core/src/java/org/apache/lucene/util/BytesRefArray.java b/lucene/core/src/java/org/apache/lucene/util/BytesRefArray.java index 00222f48b73..f169d1a99d6 100644 --- a/lucene/core/src/java/org/apache/lucene/util/BytesRefArray.java +++ b/lucene/core/src/java/org/apache/lucene/util/BytesRefArray.java @@ -35,7 +35,7 @@ public final class BytesRefArray implements SortableBytesRefArray { private int lastElement = 0; private int currentOffset = 0; private final Counter bytesUsed; - + /** * Creates a new {@link BytesRefArray} with a counter to track allocated bytes */ @@ -190,7 +190,6 @@ public BytesRefIterator iterator(final Comparator comp) { final int[] indices = comp == null ? null : sort(comp); return new BytesRefIterator() { int pos = 0; - @Override public BytesRef next() { if (pos < size) { diff --git a/lucene/core/src/java/org/apache/lucene/util/BytesRefIterator.java b/lucene/core/src/java/org/apache/lucene/util/BytesRefIterator.java index fa7bb453c79..b1bcefb19ba 100644 --- a/lucene/core/src/java/org/apache/lucene/util/BytesRefIterator.java +++ b/lucene/core/src/java/org/apache/lucene/util/BytesRefIterator.java @@ -35,14 +35,8 @@ public interface BytesRefIterator { * the end of the iterator is reached. * @throws IOException If there is a low-level I/O error. */ - public BytesRef next() throws IOException; + BytesRef next() throws IOException; /** Singleton BytesRefIterator that iterates over 0 BytesRefs. */ - public static final BytesRefIterator EMPTY = new BytesRefIterator() { - - @Override - public BytesRef next() { - return null; - } - }; + BytesRefIterator EMPTY = () -> null; } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestFrozenBufferedUpdates.java b/lucene/core/src/test/org/apache/lucene/index/TestFrozenBufferedUpdates.java new file mode 100644 index 00000000000..e8e50fa997f --- /dev/null +++ b/lucene/core/src/test/org/apache/lucene/index/TestFrozenBufferedUpdates.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lucene.index; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import com.carrotsearch.randomizedtesting.generators.RandomPicks; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StringField; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.BytesRefArray; +import org.apache.lucene.util.BytesRefIterator; +import org.apache.lucene.util.Counter; +import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.index.FrozenBufferedUpdates.TermDocsIterator; +import org.apache.lucene.util.TestUtil; + +public class TestFrozenBufferedUpdates extends LuceneTestCase { + + public void testTermDocsIterator() throws IOException { + for (int j = 0; j < 5; j++) { + try (Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig())) { + boolean duplicates = random().nextBoolean(); + boolean nonMatches = random().nextBoolean(); + BytesRefArray array = new BytesRefArray(Counter.newCounter()); + int numDocs = 10 + random().nextInt(1000); + Set randomIds = new HashSet<>(); + for (int i = 0; i < numDocs; i++) { + BytesRef id; + do { + id = new BytesRef(TestUtil.randomRealisticUnicodeString(random())); + } while (randomIds.add(id) == false); + } + List asList = new ArrayList<>(randomIds); + for (BytesRef ref : randomIds) { + Document doc = new Document(); + doc.add(new StringField("field", ref, Field.Store.NO)); + array.append(ref); + if (duplicates && rarely()) { + array.append(RandomPicks.randomFrom(random(), asList)); + } + if (nonMatches && rarely()) { + BytesRef id; + do { + id = new BytesRef(TestUtil.randomRealisticUnicodeString(random())); + } while (randomIds.contains(id)); + array.append(id); + } + writer.addDocument(doc); + } + writer.forceMerge(1); + writer.commit(); + try (DirectoryReader reader = DirectoryReader.open(dir)) { + boolean sorted = random().nextBoolean(); + BytesRefIterator values = sorted ? array.iterator(Comparator.naturalOrder()) : array.iterator(); + assertEquals(1, reader.leaves().size()); + TermDocsIterator iterator = new TermDocsIterator(reader.leaves().get(0).reader(), sorted); + FixedBitSet bitSet = new FixedBitSet(reader.maxDoc()); + BytesRef ref; + while ((ref = values.next()) != null) { + DocIdSetIterator docIdSetIterator = iterator.nextTerm("field", ref); + if (nonMatches == false) { + assertNotNull(docIdSetIterator); + } + if (docIdSetIterator != null) { + int doc; + while ((doc = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + if (duplicates == false) { + assertFalse(bitSet.get(doc)); + } + bitSet.set(doc); + } + } + } + assertEquals(reader.maxDoc(), bitSet.cardinality()); + } + } + } + } +}
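For reference, this is roughly how a caller is expected to drive the new TermDocsIterator: feed it field/term pairs (sorted, when constructed with sortedTerms=true) and drain the returned DocIdSetIterator for each term that exists. The sketch below is illustrative only and not part of the patch; it assumes it is compiled in the org.apache.lucene.index package (TermDocsIterator is package-private), uses a made-up class name, and hard-codes a single field for brevity.

package org.apache.lucene.index;

import java.io.IOException;
import java.util.List;

import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;

// Hypothetical helper, not part of the patch: counts the documents matching any of the
// given terms for one field, reusing TermsEnum/PostingsEnum via TermDocsIterator.
final class TermDocsIteratorUsageSketch {

  static int countMatches(LeafReader reader, String field, List<BytesRef> sortedTerms) throws IOException {
    // sortedTerms=true enables the forward-seeking optimization; callers must then supply
    // the terms of each field in sorted order (see assertSorted above).
    FrozenBufferedUpdates.TermDocsIterator iterator =
        new FrozenBufferedUpdates.TermDocsIterator(reader, true);
    FixedBitSet matched = new FixedBitSet(reader.maxDoc());
    for (BytesRef term : sortedTerms) {
      DocIdSetIterator docs = iterator.nextTerm(field, term); // null if the term is absent
      if (docs != null) {
        int doc;
        while ((doc = docs.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
          matched.set(doc);
        }
      }
    }
    return matched.cardinality();
  }
}

Constructing the iterator with sortedTerms=false falls back to a plain seekExact per term; that is what applyDocValuesUpdates above does, because doc-values updates must be replayed in arrival order rather than in term order.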