Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lucene/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,8 @@ API Changes
* GITHUB#13820, GITHUB#13825, GITHUB#13830: Corrects DataInput.readGroupVInts to be public and not-final, removes the protected
DataInput.readGroupVInt method. (Zhang Chao, Robert Muir, Uwe Schindler, Dawid Weiss)

* GITHUB#15376, GITHUB#15197: Added prefetching in bkd tree traversal, couple of new api in PointValues visitDocIDs from a position and prepareOrVisitDocIDs to prefetch the IO before visiting docIds (Saurabh Singh)

New Features
---------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3194,7 +3194,7 @@ private static void checkByteVectorValues(
*
* @lucene.internal
*/
public static class VerifyPointsVisitor implements PointValues.IntersectVisitor {
public static class VerifyPointsVisitor implements IntersectVisitor {
private long pointCountSeen;
private int lastDocID = -1;
private final FixedBitSet docsSeen;
Expand Down
6 changes: 4 additions & 2 deletions lucene/core/src/java/org/apache/lucene/index/PointValues.java
Original file line number Diff line number Diff line change
Expand Up @@ -345,13 +345,15 @@ default void grow(int count) {}
* Finds all documents and points matching the provided visitor. This method does not enforce live
* documents, so it's up to the caller to test whether each document is deleted, if necessary.
*/
public final void intersect(IntersectVisitor visitor) throws IOException {
public void intersect(IntersectVisitor visitor) throws IOException {
final PointTree pointTree = getPointTree();
intersect(visitor, pointTree);
assert pointTree.moveToParent() == false;
}

private static void intersect(IntersectVisitor visitor, PointTree pointTree) throws IOException {
/** Finds all documents and points matching the provided visitor for the provided point tree. */
protected static void intersect(IntersectVisitor visitor, PointTree pointTree)
throws IOException {
while (true) {
Relation compare =
visitor.compare(pointTree.getMinPackedValue(), pointTree.getMaxPackedValue());
Expand Down
180 changes: 180 additions & 0 deletions lucene/core/src/java/org/apache/lucene/util/bkd/BKDReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package org.apache.lucene.util.bkd;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.PointValues;
Expand Down Expand Up @@ -589,6 +591,65 @@ public void visitDocIDs(PointValues.IntersectVisitor visitor) throws IOException
addAll(visitor, false);
}

/** prefetch DocIds below current node */
public void prefetchDocIDs(TwoPhaseIntersectVisitor visitor) throws IOException {
resetNodeDataPosition();
prefetchAll(visitor, false);
}

/** visit Doc Ids for a leafNode at provided input position */
public void visitDocIDs(long position, IntersectVisitor visitor) throws IOException {
visitDocIDs(position, visitor, false);
}

private void visitDocIDs(long position, IntersectVisitor visitor, boolean grown)
throws IOException {
leafNodes.seek(position);
int count = leafNodes.readVInt();
if (!grown) {
visitor.grow(count);
}
docIdsWriter.readInts(leafNodes, count, visitor, scratchIterator.docIDs);
}

private int getLeafNodeOrdinal() {
assert isLeafNode() : "nodeID=" + nodeID + " is not a leaf";
return nodeID - leafNodeOffset;
}

public void prefetchAll(TwoPhaseIntersectVisitor visitor, boolean grown) throws IOException {
if (grown == false) {
final long size = size();
if (size <= Integer.MAX_VALUE) {
visitor.grow((int) size);
grown = true;
}
}
if (isLeafNode()) {
// int count = isLastLeaf() ? config.maxPointsInLeafNode() : lastLeafNodePointCount;
long leafFp = getLeafBlockFP();
int leafNodeOrdinal = getLeafNodeOrdinal();
// Only call prefetch is this is the first leaf node ordinal or the first match in
// contigiuous sequence of matches for leaf nodes
// boolean prefetched = false;
if (visitor.lastDeferredBlockOrdinal() == -1
|| visitor.lastDeferredBlockOrdinal() + 1 < leafNodeOrdinal) {
// System.out.println("Prefetched called on " + leafNodeOrdinal);
leafNodes.prefetch(leafFp, 1);
// prefetched = true;
}
visitor.setLastDeferredBlockOrdinal(leafNodeOrdinal);
visitor.deferBlock(leafFp);
} else {
pushLeft();
prefetchAll(visitor, grown);
pop();
pushRight();
prefetchAll(visitor, grown);
pop();
}
}

public void addAll(PointValues.IntersectVisitor visitor, boolean grown) throws IOException {
if (grown == false) {
final long size = size();
Expand Down Expand Up @@ -1076,4 +1137,123 @@ public long cost() {
return length;
}
}

/**
* We can recurse the {@link BKDPointTree} using {@link TwoPhaseIntersectVisitor}. This visitor
* travere {@link BKDPointTree} in two phases. In the first phase, it recurses over the {@link
* BKDPointTree} optionally triggering IO for some of the blocks and caching them. In the second
* phase, once the recursion is over it visits the cached blocks one by one.
*
* @lucene.experimental
*/
public interface TwoPhaseIntersectVisitor extends IntersectVisitor {
/** return the last deferred block ordinal during recursion. */
public int lastDeferredBlockOrdinal();

/** set last deferred block ordinal */
public void setLastDeferredBlockOrdinal(int leafNodeOrdinal);

/** Defer this block for processing in the second phase. */
public void deferBlock(long leafFp);

/** Returns a snapshot of the currently deferred blocks. */
public List<Long> deferredBlocks();

/** Mark the given block as processed and remove it from the deferred set. */
public void onProcessingDeferredBlock(long leafFp);
}

/**
* Base implementation of {@link TwoPhaseIntersectVisitor} that maintains a list of deferred
* blocks from first phase of traversal and visits them in the second phase.
*
* @lucene.experimental
*/
public abstract static class BaseTwoPhaseIntersectVisitor implements TwoPhaseIntersectVisitor {

int lastDeferredBlockOrdinal = -1;
List<Long> deferredBlocks = new ArrayList<>();

/**
* return the last deferred block ordinal - this is used to avoid prefetching call for
* contiguous ordinals assuming contiguous ordinals prefetching can be taken care by readaheads.
*/
@Override
public int lastDeferredBlockOrdinal() {
return lastDeferredBlockOrdinal;
}

/** set last deferred block ordinal * */
@Override
public void setLastDeferredBlockOrdinal(int leafNodeOrdinal) {
lastDeferredBlockOrdinal = leafNodeOrdinal;
}

/** Defer this block for processing in the second phase. */
@Override
public void deferBlock(long leafFp) {
deferredBlocks.add(leafFp);
}

/** Returns a snapshot of the currently deferred blocks. */
@Override
public List<Long> deferredBlocks() {
return new ArrayList<>(deferredBlocks);
}

/** Mark the given block as processed and remove it from the deferred set. */
@Override
public void onProcessingDeferredBlock(long leafFp) {
deferredBlocks.remove(leafFp);
}
}

/**
* Finds all documents and points matching the provided visitor. This method does not enforce live
* documents, so it's up to the caller to test whether each document is deleted, if necessary.
*/
@Override
public final void intersect(IntersectVisitor visitor) throws IOException {
final BKDPointTree pointTree = (BKDPointTree) getPointTree();
if (visitor instanceof TwoPhaseIntersectVisitor twoPhaseIntersectVisitor) {
intersect(twoPhaseIntersectVisitor, pointTree);
List<Long> fps = twoPhaseIntersectVisitor.deferredBlocks();
for (int i = 0; i < fps.size(); ++i) {
long fp = fps.get(i);
pointTree.visitDocIDs(fp, visitor);
twoPhaseIntersectVisitor.onProcessingDeferredBlock(fp);
}
} else {
intersect(visitor, pointTree);
}
assert pointTree.moveToParent() == false;
}

private static void intersect(TwoPhaseIntersectVisitor visitor, BKDPointTree pointTree)
throws IOException {
while (true) {
Relation compare =
visitor.compare(pointTree.getMinPackedValue(), pointTree.getMaxPackedValue());
if (compare == Relation.CELL_INSIDE_QUERY) {
// This cell is fully inside the query shape: recursively prefetch all points in this cell
// without filtering
pointTree.prefetchDocIDs(visitor);
} else if (compare == Relation.CELL_CROSSES_QUERY) {
// The cell crosses the shape boundary, or the cell fully contains the query, so we fall
// through and do full filtering:
if (pointTree.moveToChild()) {
continue;
}
// TODO: we can assert that the first value here in fact matches what the pointTree
// claimed?
// Leaf node; scan and filter all points in this block:
pointTree.visitDocValues(visitor);
}
while (pointTree.moveToSibling() == false) {
if (pointTree.moveToParent() == false) {
return;
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.FilterCodec;
import org.apache.lucene.codecs.PointsFormat;
Expand All @@ -39,7 +41,10 @@
import org.apache.lucene.tests.index.BasePointsFormatTestCase;
import org.apache.lucene.tests.index.MockRandomMergePolicy;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.NumericUtils;
import org.apache.lucene.util.bkd.BKDConfig;
import org.apache.lucene.util.bkd.BKDReader;

public class TestLucene90PointsFormat extends BasePointsFormatTestCase {

Expand Down Expand Up @@ -355,4 +360,134 @@ public Relation compare(byte[] minPackedValue, byte[] maxPackedValue) {
r.close();
dir.close();
}

public void testBasicWithPrefetchVisitor() throws Exception {
Directory dir = newDirectory();
IndexWriterConfig iwc = newIndexWriterConfig();
// Avoid mockRandomMP since it may cause non-optimal merges that make the
// number of points per leaf hard to predict
while (iwc.getMergePolicy() instanceof MockRandomMergePolicy) {
iwc.setMergePolicy(newMergePolicy());
}
IndexWriter w = new IndexWriter(dir, iwc);
byte[] pointValue = new byte[3];
byte[] uniquePointValue = new byte[3];
random().nextBytes(uniquePointValue);
final int numDocs =
TEST_NIGHTLY ? atLeast(10000) : atLeast(500); // at night, make sure we have several leaves
final boolean multiValues = random().nextBoolean();
int totalValues = 0;
for (int i = 0; i < numDocs; ++i) {
Document doc = new Document();
if (i == numDocs / 2) {
totalValues++;
doc.add(new BinaryPoint("f", uniquePointValue));
} else {
final int numValues = (multiValues) ? TestUtil.nextInt(random(), 2, 100) : 1;
for (int j = 0; j < numValues; j++) {
do {
random().nextBytes(pointValue);
} while (Arrays.equals(pointValue, uniquePointValue));
doc.add(new BinaryPoint("f", pointValue));
totalValues++;
}
}
w.addDocument(doc);
}
w.forceMerge(1);
final IndexReader r = DirectoryReader.open(w);
w.close();

final LeafReader lr = getOnlyLeafReader(r);
PointValues points = lr.getPointValues("f");

BKDReader.BaseTwoPhaseIntersectVisitor allPointsVisitor =
new BKDReader.BaseTwoPhaseIntersectVisitor() {
@Override
public void visit(int docID, byte[] packedValue) throws IOException {}

@Override
public void visit(int docID) throws IOException {}

@Override
public Relation compare(byte[] minPackedValue, byte[] maxPackedValue) {
return Relation.CELL_INSIDE_QUERY;
}
};

List<Long> savedBlocks = allPointsVisitor.deferredBlocks();
assertEquals(0, savedBlocks.size()); // Test that all deferred blocks were processed
assertEquals(totalValues, points.estimatePointCount(allPointsVisitor));
assertEquals(numDocs, points.estimateDocCount(allPointsVisitor));

r.close();
dir.close();
}

public void testBasicWithPrefetchCapableVisitor() throws Exception {
Directory dir = newDirectory();
IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setMergePolicy(newLogMergePolicy());
IndexWriter w = new IndexWriter(dir, iwc);
byte[] point = new byte[4];
for (int i = 0; i < 20; i++) {
Document doc = new Document();
NumericUtils.intToSortableBytes(i, point, 0);
doc.add(new BinaryPoint("dim", point));
w.addDocument(doc);
}
w.forceMerge(1);
w.close();

DirectoryReader r = DirectoryReader.open(dir);
LeafReader sub = getOnlyLeafReader(r);
PointValues values = sub.getPointValues("dim");

// Simple test: make sure prefetch capable visitor can visit every doc when cell crosses query:
BitSet seen = new BitSet();
values.intersect(
new BKDReader.BaseTwoPhaseIntersectVisitor() {
@Override
public Relation compare(byte[] minPacked, byte[] maxPacked) {
return Relation.CELL_CROSSES_QUERY;
}

@Override
public void visit(int docID) {
throw new IllegalStateException();
}

@Override
public void visit(int docID, byte[] packedValue) {
seen.set(docID);
assertEquals(docID, NumericUtils.sortableBytesToInt(packedValue, 0));
}
});
assertEquals(20, seen.cardinality());
// Make sure prefetch capable visitor can visit all docs when all docs are inside query
// Also test we are not visiting documents twice based on whether PointTree has a prefetch
// implementation of
// prepareOrVisit or uses the default implementation
seen.clear();
final int[] docCount = {0};
values.intersect(
new BKDReader.BaseTwoPhaseIntersectVisitor() {
@Override
public void visit(int docID) throws IOException {
seen.set(docID);
docCount[0]++;
}

@Override
public void visit(int docID, byte[] packedValue) throws IOException {}

@Override
public Relation compare(byte[] minPackedValue, byte[] maxPackedValue) {
return Relation.CELL_INSIDE_QUERY;
}
});
assertEquals(20, seen.cardinality());
assertEquals(20, docCount[0]);
IOUtils.close(r, dir);
}
}
Loading