
Add IndexInput#prefetch. #13337

Merged: jpountz merged 16 commits into apache:main from index_input_prefetch on May 10, 2024

Conversation

@jpountz (Contributor) commented May 2, 2024

This adds IndexInput#prefetch, which is an optional operation that instructs the IndexInput to start fetching bytes from storage in the background. These bytes will be picked up by follow-up calls to the IndexInput#readXXX methods. In the future, this will help Lucene move from a maximum of one I/O operation per search thread to one I/O operation per search thread per IndexInput. Typically, when running a query on two terms, the I/O into the terms dictionary is sequential today. In the future, we would ideally do these I/Os in parallel using this new API. Note that this will require API changes to some classes including TermsEnum.

I settled on this API because it's simple and wouldn't require making all Lucene APIs asynchronous to take advantage of extra I/O concurrency, which I worry would make the query evaluation logic too complicated.

Currently, only NIOFSDirectory implements this new API. I played with MMapDirectory as well and found an approach that worked better in the benchmark I've been playing with, but I'm not sure it makes sense to implement this API on this directory as it either requires adding an explicit buffer on MMapDirectory, or forcing data to be loaded into the page cache even though the OS may have decided that it's not a good idea due to too few cache hits.

This change will require follow-ups to start using this new API when working with terms dictionaries, postings, etc.

Relates #13179
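
Below is a minimal sketch of the intended calling pattern (the termsDictIn input and the term offsets are hypothetical; the actual TermsEnum integration is left to the follow-ups mentioned above):

IndexInput in1 = termsDictIn.clone();
IndexInput in2 = termsDictIn.clone();
// Issue both I/O hints before consuming either result.
in1.seek(term1Offset);
in1.prefetch(); // optional hint: start fetching these bytes in the background
in2.seek(term2Offset);
in2.prefetch();
// Follow-up reads pick up the prefetched bytes instead of blocking on two
// sequential cold reads.
in1.readBytes(term1Bytes, 0, term1Bytes.length);
in2.readBytes(term2Bytes, 0, term2Bytes.length);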

@jpountz jpountz added this to the 10.0.0 milestone May 2, 2024
@jpountz (Contributor, Author) commented May 2, 2024

I created the following benchmark to simulate lookups in a terms dictionary that cannot fit in the page cache.
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.NIOFSDirectory;

public class PrefetchBench {

  private static final int NUM_TERMS = 3;
  private static final long FILE_SIZE = 100L * 1024 * 1024 * 1024; // 100GB
  private static final int NUM_BYTES = 16;
  public static int DUMMY;

  public static void main(String[] args) throws IOException {
    Path filePath = Paths.get(args[0]);
    Path dirPath = filePath.getParent();
    String fileName = filePath.getFileName().toString();
    Random r = ThreadLocalRandom.current();

    try (Directory dir = new NIOFSDirectory(dirPath)) {
      if (Arrays.asList(dir.listAll()).contains(fileName) == false) {
        try (IndexOutput out = dir.createOutput(fileName, IOContext.DEFAULT)) {
          byte[] buf = new byte[8196];
          for (long i = 0; i < FILE_SIZE; i += buf.length) {
            r.nextBytes(buf);
            out.writeBytes(buf, buf.length);
          }
        }
      }

      for (boolean dataFitsInCache : new boolean[] { false, true}) {
        try (IndexInput i0 = dir.openInput(fileName, IOContext.DEFAULT)) {
          byte[][] b = new byte[NUM_TERMS][];
          for (int i = 0; i < NUM_TERMS; ++i) {
            b[i] = new byte[NUM_BYTES];
          }
          IndexInput[] inputs = new IndexInput[NUM_TERMS];
          if (dataFitsInCache) {
            // 16MB slice that should easily fit in the page cache
            inputs[0] = i0.slice("slice", 0, 16 * 1024 * 1024);
          } else {
            inputs[0] = i0;
          }
          for (int i = 1; i < NUM_TERMS; ++i) {
            inputs[i] = inputs[0].clone();
          }
          final long length = inputs[0].length();
          List<Long>[] latencies = new List[2];
          latencies[0] = new ArrayList<>();
          latencies[1] = new ArrayList<>();
          for (int iter = 0; iter < 10_000; ++iter) {
            final boolean prefetch = (iter & 1) == 0; // alternate prefetch on/off across iterations

            final long start = System.nanoTime();

            for (IndexInput ii : inputs) {
              final long offset = r.nextLong(length - NUM_BYTES);
              ii.seek(offset);
              if (prefetch) {
                ii.prefetch();
              }
            }

            for (int i = 0; i < NUM_TERMS; ++i) {
              inputs[i].readBytes(b[i], 0, b[i].length);
            }

            final long end = System.nanoTime();

            // Prevent the JVM from optimizing away the reads
            DUMMY = Arrays.stream(b).mapToInt(Arrays::hashCode).sum();

            latencies[iter & 1].add((end - start) / 1024); // ns -> approximate us
          }

          latencies[0].sort(null);
          latencies[1].sort(null);

          System.out.println("Data " + (dataFitsInCache ? "fits" : "does not fit") + " in the page cache");
          long prefetchP50 = latencies[0].get(latencies[0].size() / 2);
          long prefetchP90 = latencies[0].get(latencies[0].size() * 9 / 10);
          long noPrefetchP50 = latencies[1].get(latencies[1].size() / 2);
          long noPrefetchP90 = latencies[1].get(latencies[1].size() * 9 / 10);

          System.out.println("  With prefetching:    P50=" + prefetchP50 + "us P90=" + prefetchP90 + "us");
          System.out.println("  Without prefetching: P50=" + noPrefetchP50 + "us P90=" + noPrefetchP90 + "us");
        }
      }
    }
  }

}

It assumes 3 terms that need to read 16 bytes each from the terms dictionary. We compare the time it takes to read these 16 bytes using today's sequential approach vs. taking advantage of the new prefetch() API, both in the case when the terms dictionary fits in the page cache, and when it doesn't. The benchmark prints the following on my machine:

Data does not fit in the page cache
  With prefetching:    P50=103us P90=144us
  Without prefetching: P50=226us P90=247us
Data fits in the page cache
  With prefetching:    P50=14us P90=83us
  Without prefetching: P50=3us P90=73us

Using prefetch() adds a bit of overhead (10us) to each lookup in the best-case scenario when data fits in the page cache, but saves a non-negligible amount of time (100us) when data does not fit in the page cache. Note that these numbers would need to be multiplied by the number of segments to search. Also, this tests with 3 terms while some queries may have much higher numbers of terms. I'd need to test what happens with higher query concurrency, but this looks promising to me.

@jpountz (Contributor, Author) commented May 6, 2024

@rmuir asked if we could add support for this on MMapDirectory via madvise + POSIX_MADV_WILLNEED. I pushed a new commit that does this (with several nocommits). This seems to perform well in the above benchmark if I replace NIOFSDirectory with MMapDirectory:

Data does not fit in the page cache
  With prefetching:    P50=87us P90=121us
  Without prefetching: P50=220us P90=239us
Data fits in the page cache
  With prefetching:    P50=1us P90=2us
  Without prefetching: P50=0us P90=0us
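
For reference, binding posix_madvise through the foreign function API looks roughly like the sketch below. This is not the PR's actual NativeAccess/PosixNativeAccess code, and the POSIX_MADV_WILLNEED value of 3 is the Linux one:

import java.io.IOException;
import java.lang.foreign.FunctionDescriptor;
import java.lang.foreign.Linker;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.lang.invoke.MethodHandle;

final class MadviseSketch {
  private static final int POSIX_MADV_WILLNEED = 3; // Linux value, may differ elsewhere

  private static final Linker LINKER = Linker.nativeLinker();

  // int posix_madvise(void *addr, size_t len, int advice)
  private static final MethodHandle MADVISE =
      LINKER.downcallHandle(
          LINKER.defaultLookup().find("posix_madvise").orElseThrow(),
          FunctionDescriptor.of(
              ValueLayout.JAVA_INT,
              ValueLayout.ADDRESS,
              ValueLayout.JAVA_LONG,
              ValueLayout.JAVA_INT));

  static void adviseWillNeed(MemorySegment segment) throws Throwable {
    // Note: the address must be page-aligned, which is exactly the page-size
    // problem discussed below.
    int ret = (int) MADVISE.invokeExact(segment, segment.byteSize(), POSIX_MADV_WILLNEED);
    if (ret != 0) {
      throw new IOException("posix_madvise failed with error " + ret);
    }
  }
}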

@jpountz jpountz requested a review from uschindler May 6, 2024 08:18
@uschindler (Contributor) commented:

Hi, give me some time to review. I got the concept! I also have some questions about the NIOFS one, because I don't like using twice as many file handles just for prefetching.

MMap: the page-size problem is well known. You are not the first one hitting this. I worked around it with a hack, too:

} else if (nativeAccess.isPresent() && chunkSizePower >= 21) {
  nativeAccess.get().madvise(segment, readAdvice);
}

The problem with the page size is: it is known everywhere in the JDK and available via Unsafe and various other places, but all of them are private. I temporarily tried a workaround using another glibc call to retrieve the page size, but this fails due to differences in enum constants across platforms (the SC_PAGESIZE constant has a different value on various Linux/macOS versions), because it's a C enum and has no fixed value. So we cannot easily get the page size without hardcoding an integer constant that is not even defined in any header file. The alternative is the deprecated/outdated getpagesize() function in libc, but I don't want to use it as it's not POSIX-standardized.

What do you think about this workaround: use 4 KiB as the page size, but guard the madvise call with try/catch and do nothing on IOException (if unaligned)?

I was about to open a JDK issue to somehow allow getting the page size from the JVM in the MemorySegment API, because you need it quite often when working with memory segments.

@uschindler (Contributor) commented May 6, 2024

We can also use the HotSpot bean to get the page size, but this fails on OpenJ9 or any third-party JVM. So we could try to get the page size from the HotSpot bean in Constants.java and save it in an OptionalInt. If it is undefined, preloading gets disabled.

See these examples in Constants:

/** true if FMA likely means a cpu instruction and not BigDecimal logic. */
private static final boolean HAS_FMA =
    (IS_CLIENT_VM == false) && HotspotVMOptions.get("UseFMA").map(Boolean::valueOf).orElse(false);

/** maximum supported vectorsize. */
private static final int MAX_VECTOR_SIZE =
    HotspotVMOptions.get("MaxVectorSize").map(Integer::valueOf).orElse(0);

/** true for an AMD cpu with SSE4a instructions. */
private static final boolean HAS_SSE4A =
    HotspotVMOptions.get("UseXmmI2F").map(Boolean::valueOf).orElse(false);

@jpountz (Contributor, Author) commented May 6, 2024

Thanks for taking a look, Uwe, and for suggesting approaches for the page-size issue! By the way, feel free to push directly to the branch.

> I also have some questions about the NIOFS one, because I don't like using twice as many file handles just for prefetching.

I don't like it either. And actually the tests don't like it either: I've seen more issues with HandleLimitFS due to this change. :)

@uschindler (Contributor) commented:

> Thanks for taking a look, Uwe, and for suggesting approaches for the page-size issue! By the way, feel free to push directly to the branch.
>
> > I also have some questions about the NIOFS one, because I don't like using twice as many file handles just for prefetching.
>
> I don't like it either. And actually the tests don't like it either: I've seen more issues with HandleLimitFS due to this change. :)

Can't we use the same FileChannel and do a positional read in another thread (not async)? I like the trick of using a virtual thread, because that way we have no additional platform thread; instead it hooks into the next I/O call and triggers our read. As this is the case, what's the problem with doing a blocking (positional) call in the virtual thread? If the virtual thread gets blocked, it will hand over execution to another virtual thread.
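
A condensed sketch of that idea (illustrative names, not the PR's exact code):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class VirtualThreadPrefetchSketch {
  private final FileChannel channel;
  // One virtual thread per prefetch; no extra platform threads are created.
  private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

  VirtualThreadPrefetchSketch(FileChannel channel) {
    this.channel = channel;
  }

  void prefetch(long pos, int len) {
    executor.execute(
        () -> {
          try {
            // Positional read: shares the channel without touching its file
            // pointer, so no second file handle is needed.
            channel.read(ByteBuffer.allocate(len), pos);
          } catch (IOException e) {
            // Best-effort hint only: swallow failures, the foreground read
            // will perform (and report) the real I/O.
          }
        });
  }
}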

@uschindler (Contributor) commented May 6, 2024

> We can also use the HotSpot bean to get the page size, but this fails on OpenJ9 or any third-party JVM. So we could try to get the page size from the HotSpot bean in Constants.java and save it in an OptionalInt. If it is undefined, preloading gets disabled. [...]

That does not work; we can only get the huge page size, not the native page size:

$ java -XX:+PrintFlagsFinal -version | grep -i page
   size_t LargePageHeapSizeThreshold               = 134217728                                 {product} {default}
   size_t LargePageSizeInBytes                     = 0                                         {product} {default}
    uintx NUMAPageScanRate                         = 256                                       {product} {default}
     intx StackRedPages                            = 1                                      {pd product} {default}
     intx StackReservedPages                       = 0                                      {pd product} {default}
     intx StackShadowPages                         = 8                                      {pd product} {default}
     intx StackYellowPages                         = 3                                      {pd product} {default}
     bool UseLargePages                            = false                                  {pd product} {default}
     bool UseLargePagesIndividualAllocation        = false                                  {pd product} {ergonomic}

It looks like we need to have 2 options:

  • use try/catch around the madvise and do nothing if unaligned on 4 KiB boundaries
  • use the obsolete/deprecated getpagesize() call in libc

@uschindler (Contributor) commented:

Hi,

> It looks like we need to have 2 options:
>
>   • use try/catch around the madvise and do nothing if unaligned on 4 KiB boundaries
>   • use the obsolete/deprecated getpagesize() call in libc

At least libc's int getpagesize() is available on both macOS and Linux. I doubt it will ever be removed from glibc, as that would break many programs.

So the only viable solution would be to expose and implement this method in NativeAccess via PosixNativeAccess. Should I give it a try?
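
A sketch of what such a binding could look like (simplified; the real code would live behind the NativeAccess abstraction):

import java.lang.foreign.FunctionDescriptor;
import java.lang.foreign.Linker;
import java.lang.foreign.ValueLayout;
import java.lang.invoke.MethodHandle;

final class PageSizeSketch {
  static int getPageSize() throws Throwable {
    Linker linker = Linker.nativeLinker();
    // int getpagesize(void): legacy, but present in both glibc and macOS libc.
    MethodHandle getpagesize =
        linker.downcallHandle(
            linker.defaultLookup().find("getpagesize").orElseThrow(),
            FunctionDescriptor.of(ValueLayout.JAVA_INT));
    return (int) getpagesize.invokeExact();
  }
}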

@jpountz (Contributor, Author) commented May 6, 2024

Please go ahead.

@jpountz (Contributor, Author) commented May 6, 2024

> Can't we use the same FileChannel and do a positional read in another thread (not async)?

I gave it a try in the last commit; is this what you had in mind? The benchmark suggests that this still works well: slightly better than the previous approach, and now on par with MMapDirectory.

Data does not fit in the page cache
  With prefetching:    P50=88us P90=124us
  Without prefetching: P50=223us P90=243us
Data fits in the page cache
  With prefetching:    P50=1us P90=2us
  Without prefetching: P50=0us P90=0us

@uschindler (Contributor) commented:

Yes, that was my idea.

I also quickly implemented the page-size lookup. I haven't tested it (I'm on Windows at the moment). If you like, you could quickly check the return value on Linux and Mac (it's too late here now).

I will also remove the NativeAccess instance passed around in MMapDirectory and just make it a static constant in MemorySegmentIndexInput.

@jpountz (Contributor, Author) commented May 6, 2024

It works on my Linux box and returns 4096. 🎉

@uschindler (Contributor) commented:

> It works on my Linux box and returns 4096. 🎉

We could now also fix my hack regarding smaller chunk sizes and just ensure that the chunk size is greater than the page size before enabling madvise (so the alignment of chunks fits the pages of the mmap call).
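
In other words, the hard-coded chunkSizePower >= 21 guard from the earlier hack could become something like this (a sketch, assuming a getPageSize() method on NativeAccess as proposed above):

if (nativeAccess.isPresent()) {
  // Only madvise when mmap chunks span at least one page, so that chunk
  // boundaries stay page-aligned.
  int pageSizePower = Integer.numberOfTrailingZeros(nativeAccess.get().getPageSize());
  if (chunkSizePower >= pageSizePower) {
    nativeAccess.get().madvise(segment, readAdvice);
  }
}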

@rmuir (Member) commented May 6, 2024

If madvise does the trick for mmapdir, why not try POSIX_FADV_WILLNEED for the niofs case?

@uschindler (Contributor) commented:

> If madvise does the trick for mmapdir, why not try POSIX_FADV_WILLNEED for the niofs case?

We can't get the file handle in Java (still an open issue).

@rmuir (Member) commented May 6, 2024

> We can't get the file handle in Java (still an open issue).

Hmm, OK. I felt like we were able to get it somewhere through the guts of the NIO.2 filesystem APIs, maybe I am wrong?

@uschindler (Contributor) commented:

> > We can't get the file handle in Java (still an open issue).
>
> Hmm, OK. I felt like we were able to get it somewhere through the guts of the NIO.2 filesystem APIs, maybe I am wrong?

No, it's a known issue. Maurizio and Alan Bateman agreed to fix it.

@rmuir (Member) commented May 6, 2024

The ability to madvise/fadvise without resorting to native code would be awesome too. I don't know how it may translate to Windows, but it seems like it does exactly what this PR wants to do:

> POSIX_FADV_WILLNEED initiates a nonblocking read of the specified region into the page cache. The amount of data read may be decreased by the kernel depending on virtual memory load. (A few megabytes will usually be fully satisfied, and more is rarely useful.)

@uschindler (Contributor) commented May 6, 2024

https://bugs.openjdk.org/browse/JDK-8292771 and the followup: https://bugs.openjdk.org/browse/JDK-8329256

@rmuir (Member) commented May 6, 2024

Thanks Uwe. Maybe the correct solution is to simply add the API and implement it with madvise() for MMapDirectory, for now? To me this is just another madvise being hooked in.

I feel the ByteBuffer/thread dancing in BufferedIndexInput is too much. I can't reason about pendingPrefetch and clones and all that. A simple fadvise() seems like a correct solution there, as it happens asynchronously in the kernel without hassles; but there's no reason to hold back MMapDirectory, which can use madvise() today.

@jpountz (Contributor, Author) commented May 7, 2024

I reverted the changes to BufferedIndexInput; agreed to focus on MMapDirectory for now.

@jpountz (Contributor, Author) commented May 7, 2024

Some questions about the API, curious to get your thoughts:

  • Should we remove ReadAdvice#WILL_NEED and instead introduce a new API such as NativeAccess#madviseWillNeed? It's a bit awkward right now that we have a ReadAdvice constant that is rejected in the only place where we accept ReadAdvice objects: IOContext. What do you think @uschindler ?
  • Should we rename prefetch() to willNeed() to better draw the parallel with madvise/fadvise?
  • I'm contemplating changing the signature from void prefetch() to void prefetch(long offset, long length). The benefit is that this would allow reading from multiple places with a single IndexInput. E.g. one could parallelize the I/O of reading N stored documents on a single StoredFieldsReader instance instead of creating one reader (with its own IndexInput clone) per document. I'll do this change if there are no objections.
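
As a sketch, the third option would enable patterns like the following (the startPointer/length lookups are hypothetical stand-ins for a stored-fields index):

// Hypothetical sketch: parallelize the I/O of N stored documents on a
// single IndexInput instead of one clone per document.
for (int doc : docs) {
  fieldsIn.prefetch(index.startPointer(doc), index.length(doc));
}
for (int doc : docs) {
  visitDocument(fieldsIn, doc); // reads now mostly hit the page cache
}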

@uschindler (Contributor) commented:

> Some questions about the API, curious to get your thoughts:
>
>   • Should we remove ReadAdvice#WILL_NEED and instead introduce a new API such as NativeAccess#madviseWillNeed? It's a bit awkward right now that we have a ReadAdvice constant that is rejected in the only place where we accept ReadAdvice objects: IOContext. What do you think @uschindler ?

I had the same problem. I don't like the changes in IOContext (I reverted some of them and used a method to disallow specific ReadAdvice constants). Indeed, a new method just for the willneed case is a good idea. Actually, the willneed advice is different from the readahead settings, so possibly we can separate them in NativeAccess.

>   • Should we rename prefetch() to willNeed() to better draw the parallel with madvise/fadvise?

I have no preference.

>   • I'm contemplating changing the signature from void prefetch() to void prefetch(long offset, long length). The benefit is that this would allow reading from multiple places with a single IndexInput. E.g. one could parallelize the I/O of reading N stored documents on a single StoredFieldsReader instance instead of creating one reader (with its own IndexInput clone) per document. I'll do this change if there are no objections.

This was my first idea when looking at the method. With the current code it is completely undefined how much the prefetch should read.

Making it absolute is not bad, but it should be limited to the RandomAccessIndexInput interface. So basically we can have both variants: the sequential one should be part of IndexInput and be a must-implement; the other should not be in IndexInput, only in the random-access variant.

Basically the current code does not prefetch across segment boundaries, but implementing that is easy and can be done in MemorySegmentIndexInput in the same way as reading arrays across boundaries. It would just be two calls.

@rmuir (Member) commented May 7, 2024

>   • I'm contemplating changing the signature from void prefetch() to void prefetch(long offset, long length). The benefit is that this would allow reading from multiple places with a single IndexInput. E.g. one could parallelize the I/O of reading N stored documents on a single StoredFieldsReader instance instead of creating one reader (with its own IndexInput clone) per document. I'll do this change if there are no objections.

To me it makes sense to be very specific about the region that is needed. Otherwise all this madvising doesn't make a lot of sense: turning off the OS default readahead and supplying our own values that might be gigabytes? Better to not call madvise at all. :)

I see this stuff as an operating-system hint to get better performance, not as actually reading data. So requiring a user to slice()/clone() just to give a hint seems like the wrong way; that has too much overhead for some directories (e.g. buffers).

Also, I'm a little concerned about low-level parallelization of e.g. individual stored documents. That seems like a lot of overhead! If you need 10,000 document ranges, at least make a single mincore() call first before issuing 10,000 madvise() calls?

@@ -50,6 +51,7 @@ abstract class MemorySegmentIndexInput extends IndexInput implements RandomAcces
  final int chunkSizePower;
  final Arena arena;
  final MemorySegment[] segments;
  final Optional<NativeAccess> nativeAccess;
@uschindler (Contributor) commented on this diff, May 8, 2024:
As this is a singleton, we can make it static and initialize it here. There's no need to pass the optional through constructors and have it in every clone.

We may also make it static final on the provider, but that's unrelated.

@uschindler (Contributor) left a review:

To me this looks fine. I only have a minor comment.

@uschindler (Contributor) commented May 8, 2024

In general we may still look at Robert's suggestion. If we plan to issue a preload for many slices, we should think about adding another random-access API to RandomAccessIndexInput, something like:

public void preload(Tuple<long offset, long length>... pairs)

This one should first call mincore() to figure out which pages are not already loaded by the kernel, and then call madviseWillNeed() only for those pages that are not cached already.
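
A sketch of that gating, using MemorySegment#isLoaded() as a stand-in for a raw mincore() call (Range is a hypothetical (offset, length) pair; this is not the proposed API itself):

// Skip the madvise syscall for ranges that are already resident in the
// page cache.
for (Range range : ranges) {
  MemorySegment region = segment.asSlice(range.offset(), range.length());
  if (region.isLoaded() == false) { // residency check, mincore() under the hood
    nativeAccess.madviseWillNeed(region);
  }
}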

@jpountz (Contributor, Author) commented May 9, 2024

> Also, I'm a little concerned about low-level parallelization of e.g. individual stored documents. That seems like a lot of overhead! If you need 10,000 document ranges, at least make a single mincore() call first before issuing 10,000 madvise() calls?

This sounds like a good idea. If a user wants to return 10k stored documents, I wonder if we should also split this into smaller batches, to avoid running into a case where some pages get claimed by something else before we have a chance to retrieve all these stored documents.

> we should think about adding another random-access API to RandomAccessIndexInput, something like

Thanks Uwe, this sounds like a good suggestion. I'll start looking into using this API for terms dictionary lookups of boolean queries over term queries, which I don't think would need it since we'd do a single seek per clone anyway. But when we later move to vectors, stored fields, and term vectors this could be useful.

@jpountz jpountz marked this pull request as ready for review May 9, 2024 21:09
@jpountz jpountz merged commit 3003731 into apache:main May 10, 2024
3 checks passed
@jpountz jpountz deleted the index_input_prefetch branch May 10, 2024 11:42
jpountz added a commit to jpountz/lucene that referenced this pull request May 17, 2024
As Robert pointed out and benchmarks confirmed, there is some (small) overhead
to calling `madvise` via the foreign function API; benchmarks suggest it is on
the order of 1-2us. This is not much for a single call, but may become
non-negligible across many calls. Until now, we only looked into using
prefetch() for terms, skip data and postings start pointers, which is a single
prefetch() operation per segment per term.

But we may want to start using it in cases that could result in more calls to
`madvise`, e.g. if we start using it for stored fields and a user requests 10k
documents. In apache#13337, Robert wondered if we could take advantage of `mincore()`
to reduce the overhead of `IndexInput#prefetch()`, which is what this PR is
doing.

For now, this is trying to not add new APIs. Instead, `IndexInput#prefetch`
tracks consecutive hits on the page cache and calls `madvise` less and less
frequently under the hood as the number of cache hits increases.
jpountz added a commit that referenced this pull request May 21, 2024
…AM. (#13381)

As Robert pointed out and benchmarks confirmed, there is some (small) overhead
to calling `madvise` via the foreign function API; benchmarks suggest it is on
the order of 1-2us. This is not much for a single call, but may become
non-negligible across many calls. Until now, we only looked into using
prefetch() for terms, skip data and postings start pointers, which is a single
prefetch() operation per segment per term.

But we may want to start using it in cases that could result in more calls to
`madvise`, e.g. if we start using it for stored fields and a user requests 10k
documents. In #13337, Robert wondered if we could take advantage of `mincore()`
to reduce the overhead of `IndexInput#prefetch()`, which is what this PR is
doing via `MemorySegment#isLoaded()`.

`IndexInput#prefetch` tracks consecutive hits on the page cache and calls
`madvise` less and less frequently under the hood as the number of consecutive
cache hits increases.
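
A condensed sketch of that backoff idea (hypothetical names; the precise policy lives in the linked commits):

// Back off from madvise as consecutive page-cache hits accumulate, so that
// steady-state cached workloads stop paying the ~1-2us syscall per prefetch.
private int consecutiveHits;

void prefetch(MemorySegment region) {
  if (region.isLoaded()) {
    consecutiveHits++;
    // Power-of-two backoff: only advise on hit 1, 2, 4, 8, ...
    if ((consecutiveHits & (consecutiveHits - 1)) != 0) {
      return;
    }
  } else {
    consecutiveHits = 0;
  }
  nativeAccess.madviseWillNeed(region);
}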