Use IndexInput#prefetch for terms dictionary lookups. #13359

Open · wants to merge 12 commits into main

Conversation

@jpountz (Contributor) commented May 11, 2024

This introduces `TermsEnum#prepareSeekExact`, which essentially calls `IndexInput#prefetch` at the right offset for the given term. It then takes advantage of the fact that `BooleanQuery` already calls `Weight#scorerSupplier` on all clauses before later calling `ScorerSupplier#get` on all clauses. So `TermQuery` now calls `TermsEnum#prepareSeekExact` in `Weight#scorerSupplier` (if scores are not needed), which in turn means that the I/O for terms dictionary lookups gets parallelized across all term queries of a `BooleanQuery` on a given segment (intra-segment parallelism).
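
A minimal sketch of the usage pattern this enables (an illustration with hypothetical helper names, not the actual `TermQuery`/`BooleanQuery` code):

```java
import java.io.IOException;
import java.util.List;

import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.BytesRef;

// Hypothetical helper: issue all prefetch hints first, then do the blocking seeks,
// so the I/O for the terms dictionary lookups of several clauses can overlap.
class PrefetchedTermLookups {
  static void lookupAll(Terms terms, List<BytesRef> clauseTerms) throws IOException {
    TermsEnum[] termsEnums = new TermsEnum[clauseTerms.size()];
    // Phase 1 (Weight#scorerSupplier time): one enum per clause, prefetch its terms dictionary block.
    for (int i = 0; i < clauseTerms.size(); i++) {
      termsEnums[i] = terms.iterator();
      termsEnums[i].prepareSeekExact(clauseTerms.get(i));
    }
    // Phase 2 (ScorerSupplier#get time): blocking seeks, hopefully served from the page cache by now.
    for (int i = 0; i < clauseTerms.size(); i++) {
      if (termsEnums[i].seekExact(clauseTerms.get(i))) {
        // use termsEnums[i].termState() / postings() here
      }
    }
  }
}
```
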
@jpountz (Contributor, author) commented May 11, 2024

This is a draft, as I need to do more work on tests and on making sure that this new method cannot corrupt the state of the `SegmentTermsEnum`.

But I created a benchmark that approximates running a Lucene query, and the results are encouraging. It creates an index with many terms that have very short postings lists, so that running boolean queries on these terms is heavy on terms dictionary lookups rather than on reading postings. Then it manually runs a disjunction over 3 terms (some of these terms may not exist in the index, as they are created randomly), measuring how long it takes to evaluate all hits. To work properly when running a query, we'd need to move `#bulkScorer` from `Weight` to `ScorerSupplier`, which I intend to do as a follow-up.
```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.ScorerSupplier;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class TermsEnumPrefetchBench {

  private static final int NUM_TERMS = 3;
  public static int DUMMY;
  
  public static void main(String[] args) throws Exception {
    Path dirPath = Paths.get(args[0]);
    Directory dir = FSDirectory.open(dirPath);
    if (DirectoryReader.indexExists(dir) == false) {
      TieredMergePolicy mp = new TieredMergePolicy();
      mp.setSegmentsPerTier(100);
      mp.setMaxMergeAtOnce(100);
      mp.setMaxMergedSegmentMB(1024);
      try (IndexWriter w = new IndexWriter(dir, new IndexWriterConfig()
          .setMergePolicy(mp)
          .setRAMBufferSizeMB(1024))) {
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        AtomicLong indexed = new AtomicLong(0);
        // 1,000 tasks x 1,000 docs x 10,000 random terms each: builds a large terms dictionary
        for (int task = 0; task < 1000; ++task) {
          executor.execute(() -> {
            Random r = ThreadLocalRandom.current();
            for (int i = 0; i < 1_000; ++i) {
              Document doc = new Document();
              for (int j = 0; j < 10_000; ++j) {
                doc.add(new StringField("f", Long.toString(r.nextLong(20_000_000_000L)), Store.NO));
              }
              try {
                w.addDocument(doc);
              } catch (IOException e) {
                throw new UncheckedIOException(e);
              }
              final long actualIndexed = indexed.incrementAndGet(); 
              if (actualIndexed % 10_000 == 0) {
                System.out.println("Indexed: " + actualIndexed);
              }
            }
          });
        }

        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.DAYS);
        w.commit();
        System.out.println("Start force merging");
        w.forceMerge(1);
        System.out.println("Done force merging");
        w.commit();
      }
    }
    List<Long> latencies = new ArrayList<>();
    try (IndexReader reader = DirectoryReader.open(dir)) {
      IndexSearcher searcher = new IndexSearcher(reader);

      Random r = ThreadLocalRandom.current();
      // Run 10,000 random 3-term disjunctions against the first (only) segment and record per-query latency
      for (int i = 0; i < 10_000; ++i) {
        long start = System.nanoTime();
        BooleanQuery.Builder query = new BooleanQuery.Builder();
        for (int t = 0; t < NUM_TERMS; ++t) {
          query.add(new TermQuery(new Term("f", Long.toString(r.nextLong(20_000_000_000L)))), Occur.SHOULD);
        }
        Weight weight = searcher.createWeight(searcher.rewrite(query.build()), ScoreMode.COMPLETE_NO_SCORES, 1f);
        ScorerSupplier ss = weight.scorerSupplier(reader.leaves().get(0));
        if (ss != null) {
          Scorer scorer = ss.get(Long.MAX_VALUE);
          DocIdSetIterator iter = scorer.iterator();
          for (int d = iter.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = iter.nextDoc()) {
            DUMMY++;
          }
          long end = System.nanoTime();
          latencies.add((end - start) / 1000);
        }
      }
    }
    latencies.sort(null);
    System.out.println("P50: " + latencies.get(latencies.size() / 2));
    System.out.println("P90: " + latencies.get(latencies.size() * 9 / 10));
    System.out.println("P99: " + latencies.get(latencies.size() * 99 / 100));
  }

}
```

Without the change (latencies in microseconds):
P50: 286
P90: 403
P99: 532

With the change:
P50: 148
P90: 246
P99: 368

final long fp = in.getFilePointer();
in.seek(fpSeek);
in.prefetch(1); // TODO: could we know the length of the block?
in.seek(fp); // TODO: do we actually need to do this?
Member:

I really don't like these calls to seek() just to prefetch data. Since it is just prefetching, I'd prefer if this "dance" was an impl detail, if needed. It would make the code simpler to just pass a parameter to prefetch rather than do this.

Then it is clear that the default implementation won't cause harm (unnecessary I/O) for any directory subclasses.

So I think prefetch should take a location as an argument? It is just a hint and not real I/O by the thread. It's intentionally not sequential, and a sequential API for it only hurts.
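
For illustration, a sketch of the suggested shape with an offset-taking `prefetch` (hypothetical helper; the pre-existing position-relative form is shown as a comment):

```java
import java.io.IOException;

import org.apache.lucene.store.IndexInput;

class PrefetchHint {
  // With a location-taking prefetch, hinting at a future read does not touch the file pointer.
  static void hintBlock(IndexInput in, long fpSeek) throws IOException {
    // The position-relative form required a seek dance:
    //   long fp = in.getFilePointer();
    //   in.seek(fpSeek);
    //   in.prefetch(1);
    //   in.seek(fp);
    in.prefetch(fpSeek, 1); // just a hint; no blocking I/O on the calling thread
  }
}
```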

jpountz (author):

I opened #13363.

@jpountz marked this pull request as ready for review May 15, 2024 13:55
@jpountz (Contributor, author) commented May 15, 2024

I iterated a bit on this change:

  • TermsEnum#prepareSeekExact is introduced, which only prefetches the data that will later be needed by TermsEnum#seekExact.
  • TermStates#build no longer runs on the IndexSearcher threadpool, but in the current thread, leveraging TermsEnum#prepareSeekExact to parallelize I/O across all terms and segments (see the sketch below).
  • TermQuery and SynonymQuery call TermsEnum#prepareSeekExact in Weight#scorerSupplier so that the I/O associated with terms dictionary lookups is parallelized across clauses of the same BooleanQuery.
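
For the second bullet, a rough sketch of the shape (an assumed illustration, not the actual `TermStates#build` code): prefetch the term's dictionary block on every segment first, then perform the blocking seeks.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TermState;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;

class TermStatesBuildSketch {
  // Returns one TermState per leaf (null where the term is absent), issuing all prefetches up front.
  static List<TermState> build(IndexReader reader, Term term) throws IOException {
    List<LeafReaderContext> leaves = reader.leaves();
    TermsEnum[] termsEnums = new TermsEnum[leaves.size()];
    // Phase 1: prefetch the terms dictionary block for this term on every segment.
    for (int i = 0; i < leaves.size(); i++) {
      Terms terms = leaves.get(i).reader().terms(term.field());
      if (terms != null) {
        termsEnums[i] = terms.iterator();
        termsEnums[i].prepareSeekExact(term.bytes());
      }
    }
    // Phase 2: blocking seeks; the I/O hinted at above can overlap across segments.
    List<TermState> states = new ArrayList<>();
    for (TermsEnum te : termsEnums) {
      states.add(te != null && te.seekExact(term.bytes()) ? te.termState() : null);
    }
    return states;
  }
}
```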

@rmuir (Member) left a comment

whew terms stuff is always heavy, i left one concern

* #prepareSeekExact} on multiple terms enums before calling {@link #seekExact(BytesRef)} on the
* same {@link TermsEnum}s.
*/
public void prepareSeekExact(BytesRef text) throws IOException {}
Member:

Can we look into subclasses such as FilterLeafReader.FilterTermsEnum to make sure this new method behaves correctly?
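
For illustration, a delegating override might look like the sketch below (a hypothetical subclass, not code from the PR); whether `FilterLeafReader.FilterTermsEnum` should forward the call by default is exactly the question raised here:

```java
import java.io.IOException;

import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.BytesRef;

// Hypothetical filter enum that forwards the prefetch hint to the wrapped enum,
// so that wrapping a TermsEnum does not silently drop the optimization.
class ForwardingTermsEnum extends FilterLeafReader.FilterTermsEnum {
  ForwardingTermsEnum(TermsEnum in) {
    super(in);
  }

  @Override
  public void prepareSeekExact(BytesRef text) throws IOException {
    in.prepareSeekExact(text);
  }
}
```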

@mikemccand (Member) commented:

> But I created a benchmark that approximates running a Lucene query, and the results are encouraging

Was this with a forced-cold index?

@jpountz (Contributor, author) commented May 21, 2024

It creates a 50GB terms dictionary while my machine only has ~28GB of RAM for the page cache, so many terms dictionary lookups result in page faults.

// TODO: should we try to reuse the current state of this terms enum when applicable?
BytesRefFSTEnum<BytesRef> indexEnum = new BytesRefFSTEnum<>(fr.index);
InputOutput<BytesRef> output = indexEnum.seekFloor(target);
if (output != null) { // should never be null since we already checked against fr.getMin()?
Member:

Indeed -- maybe change to assert?

output.output.bytes, output.output.offset, output.output.length));
final long fpSeek = code >>> Lucene90BlockTreeTermsReader.OUTPUT_FLAGS_NUM_BITS;
initIndexInput();
in.prefetch(fpSeek, 1); // TODO: could we know the length of the block?
Member:

I think you can do indexEnum.next() and if that is non-null (it will be null if you are on the very last block -- we could handle that case as well maybe) then get the fp for that next block and subtract the two?

jpountz (author):

I tried this, but it doesn't work as expected: it sometimes gives me blocks that have prior offsets, and I'm not intimate enough with this terms dictionary to understand why -- maybe you do?

Member:

Egads, that is really weird -- I would not expect those fp to go backwards on .next() -- I thought the FST index was a depth-first traversal of all on-disk (leaf) blocks. I will need to mull some more about this :)

Member:

Maybe @vsop-479 has some insight?

Member:

Anyway, I don't think we need to hold up this nice PR for this -- we can try to improve this later. PnP!

@vsop-479 (Contributor) commented May 23, 2024:

> it sometimes gives me blocks that have prior offsets

> I would not expect those fp to go backwards on .next()

I haven't looked into the whole context, but this will happen when we finish a block and go back to read its parent block's term.

e.g.
We have terms like: "regular", "request1", "request2", "rest". Set minTermBlockSize to 2, maxTermBlockSize to 3. We will get blocks: b1: ["re"](root), b2: ["gular", "quest", "st"], b3: ["1", "2"].
Then, as we call next, we will get (term: fp) pairs like:
"regular": b2,
"request1": b3,
"request2": b3,
"rest": b2.

Member:

> But this will happen when we finish a block and go back to read its parent block's term.

Yeah, I agree that the rest term will be back in the b2 block, but the FSTEnum we are talking about is the in-memory terms index that holds the file pointer offset to the start of these blocks ... I would have expected the FST to have, e.g. in your example:

  • reg -> b2
  • req -> b3
  • rez -> b4

Or so, with b4 > b3 > b2 block file pointers ...

Member:

OK! This was nagging at me so I dug into it, printing the FSTEnum iteration output on a nightly Lucene benchy index ... I now understand why the pointers indeed go backwards. I think this was the point you were making above @vsop-479 -- sorry I misunderstood at first ;)

It's because when writing the blocks we write "bottom-up" in a depth-first traversal through the terms, and only write a node when it is finished / returned from. Leaf blocks will be written immediately / in order: they are started, terms come out, they are finished. But for non-leaf blocks, first all leaf blocks under them are written (in order), and THEN the non-leaf block is written, only once we are done with all those recursions and have written any straggler terms that live in the non-leaf block.

But the prefixes are added to the index FST in the correct (term sorted) order. So this means the file pointer can indeed go backwards when iterating the terms index. I'll mull some more about whether we could (efficiently) know the term block length ...

@mikemccand (Member) left a comment

Thanks @jpountz -- this is a nice change -- two phased reads!

BytesRef term;
while ((term = termsEnum.next()) != null) {

// This is the term vectors:
postings = termsEnum.postings(postings, PostingsEnum.ALL);
assert postings != null;

if ((seekExactCounter++ & 0xFF) == 0) {
postingsTermsEnum.prepareSeekExact(term);
Member:

Nice -- this is to make sure we are exercising the API?

jpountz (author):

Correct.

* #prepareSeekExact} on multiple terms enums before calling {@link #seekExact(BytesRef)} on the
* same {@link TermsEnum}s.
*
* <p><b>NOTE</b>: The terms enum is unpositioned after calling this method.
Member:

Hmm is it really that it is unpositioned, or, that this method does not alter the TermsEnum's positioned state?

I.e. if I position it to some term, then call this method, won't it still be positioned on that same (prior) term?

Or are we trying to reserve the future right in the API to break the positioning, even though this first impl preserves it?

jpountz (author):

Indeed, I'm trying to reserve the right to update the state of the terms enum through this API in the future. See also AssertingTermsEnum#prepareSeekExact.

*
* <p><b>NOTE</b>: It is not necessary to call this method before calling {@link
* #seekExact(BytesRef, TermState)}. {@link TermsEnum} implementations are expected to implement
* this method in an I/O-free fashion.
Member:

this method -> that method? Since I think you mean seekExact(BytesRef, TermState) when you say this method here (but the previous this method two lines up is referring to prepareSeekExact)? Pronouns are hard!

jpountz (author):

English as a whole is hard. :) I'll fix.

jpountz (author):

Hmm, actually I mean prepareSeekExact when I say this method. I'll replace this method with prepareSeekExact to avoid ambiguity.

jpountz (author):

Oh, sorry I was confused, you meant the second occurrence of "this method"!

Member:

LOL!! Merely communicating about an English sentence, in English, is ESPECIALLY HARD!!

*/
-public TermState get(LeafReaderContext ctx) throws IOException {
+public Supplier<TermState> get(LeafReaderContext ctx) throws IOException {
Member:

This sort of reminds me of two-phase commit, except at read-time not write-time: we now break up these I/O-heavy read APIs into two phases, where step 1 is the intention to get X soon (allowing prefetch to happen concurrently, not just in the background of the calling thread, but across the N different Xs we want to retrieve). Step 2 is to then go and block on the I/O to retrieve each of the N Xs. Two-phased reads!
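
A hedged sketch of the consumer side of that pattern (an assumed illustration, not code from this PR): collect the suppliers for all leaves first, then resolve them.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.TermState;
import org.apache.lucene.index.TermStates;

class TwoPhasedReadsSketch {
  static List<TermState> resolveAll(TermStates termStates, IndexReader reader) throws IOException {
    // Phase 1: declare the intention to read; each call may issue a prefetch.
    List<Supplier<TermState>> pending = new ArrayList<>();
    for (LeafReaderContext ctx : reader.leaves()) {
      pending.add(termStates.get(ctx));
    }
    // Phase 2: block on the (hopefully already prefetched) reads.
    List<TermState> states = new ArrayList<>();
    for (Supplier<TermState> s : pending) {
      states.add(s == null ? null : s.get());
    }
    return states;
  }
}
```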

@@ -19,6 +19,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
Member:

Should we open a spinoff issue to maybe add prefetch to TermInSetQuery too?

jpountz (author):

Well, only TermQuery and SynonymQuery are handled, so we could open an issue for every other query; I'm not sure we should open these issues. But indeed, let's think about how to take advantage of prefetching in PointRangeQuery, TermInSetQuery, FeatureQuery, etc.

@@ -150,7 +170,12 @@ public Scorer get(long leadCost) throws IOException {

@Override
public long cost() {
- return docFreq;
+ try {
+   TermsEnum te = getTermsEnum();
Member:

Hmm this getter got more costly. It's too bad TermState is so opaque -- under the hood it (BlockTermState) is already storing docFreq.

jpountz (author):

Note that we were already getting a TermsEnum to be able to get the cost before; it just happened before creating the ScorerSupplier. So the additional cost here is the if (termsEnum != null) check under getTermsEnum().

Agreed that it's a pity to pull a terms enum only to get a cost, which is already encapsulated in the term state. Though I don't expect it to be a major issue in practice.
