Decouple within-query concurrency from the index's segment geometry [LUCENE-8675] #9721
Comments
Adrien Grand (@jpountz) (migrated from JIRA) The best way to address such issues is on top of Lucene, by having multiple shards whose results can be merged with TopDocs#merge. Parallelizing based on ranges of doc IDs is problematic for some queries: for instance, the cost of evaluating a range query over an entire segment or over only a specific range of doc IDs is exactly the same, given that it uses data structures that are organized by value rather than by doc ID.
Atri Sharma (@atris) (migrated from JIRA) Thanks for the comments. Having a multi-shard approach makes sense, but a search is still bottlenecked by the largest segment it needs to scan; if there are many segments of that kind, that can become a problem. While I agree that range queries might not directly benefit from parallel scans, other queries (such as TermQueries) might benefit from a segment-parallel scan. In typical Elasticsearch interactive use cases, we see latency spikes when a large segment is hit. Such cases could be optimized with parallel scans. We should have a way of deciding whether a scan should be parallelized or not, and then let the execution operator get a set of nodes to execute. That is probably outside the scope of this JIRA, but I wanted to open this thread to get the conversation going.
Michael McCandless (@mikemccand) (migrated from JIRA) I think it'd be interesting to explore intra-segment parallelism, but I agree w/ @jpountz that there are challenges :) If you pass an ExecutorService to IndexSearcher today, the concurrency you get is tied to the index's segment geometry. But if you do have many segments, this can give a nice reduction in query latencies when QPS is well below the searcher's red-line capacity (probably at the expense of some hopefully small loss of red-line throughput because of the added overhead of thread scheduling). For certain use cases (large index, low typical query rate) this is a powerful approach. It's true that one can also divide an index into more shards and run each shard concurrently, but then you are also multiplying the fixed query setup cost, which in some cases can be relatively significant.
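For reference, a minimal sketch of the inter-segment concurrency that already exists: passing an executor to IndexSearcher parallelizes across leaf slices, so the achievable parallelism is bounded by the segment geometry. The pool size, index path, and field name below are illustrative.

```java
import java.nio.file.Paths;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class InterSegmentSearch {
  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(8); // illustrative pool size
    try (Directory dir = FSDirectory.open(Paths.get("/path/to/index"));
        DirectoryReader reader = DirectoryReader.open(dir)) {
      // Each leaf slice (one or more segments) is searched on the executor;
      // a query over a single huge segment still runs on one thread.
      IndexSearcher searcher = new IndexSearcher(reader, executor);
      TopDocs hits = searcher.search(new TermQuery(new Term("body", "lucene")), 10);
      System.out.println(hits.totalHits);
    } finally {
      executor.shutdown();
    }
  }
}
```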
Yeah that's a real problem – these queries traverse the BKD tree per-segment while creating the scorer, which is/can be the costly part, and then produce a bit set which is very fast to iterate over. This phase is not separately visible to the caller, unlike e.g. the rewrite that MultiTermQueries use to translate into simpler queries, so it'd be tricky to build intra-segment concurrency on top ...
Adrien Grand (@jpountz) (migrated from JIRA) If some segments are getting large enough that intra-segment parallelism becomes appealing, then maybe an easier and more efficient way to increase parallelism is to instead reduce the maximum segment size so that inter-segment parallelism has more potential for parallelizing query execution.
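As a concrete illustration of that workaround: capping the merged segment size keeps more, smaller segments around for inter-segment concurrency to exploit. The 1 GB cap below is arbitrary (the default is 5 GB).

```java
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.TieredMergePolicy;

class SmallerSegmentsConfig {
  static IndexWriterConfig newConfig() {
    IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer());
    TieredMergePolicy mergePolicy = new TieredMergePolicy();
    // Cap merged segments at ~1 GB (illustrative; the default is 5 GB) so the index
    // keeps more, smaller segments for inter-segment concurrency to work with.
    mergePolicy.setMaxMergedSegmentMB(1024);
    config.setMergePolicy(mergePolicy);
    return config;
  }
}
```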
Atri Sharma (@atris) (migrated from JIRA)
Would that not lead to a much higher number of segments than required? That could lead to issues such as a large number of open file handles and too many threads required for scanning (although we could assign multiple small segments to a single thread). Thanks for the point about range queries; that is an important consideration. I will follow up with a separate patch on top of this one which will do the first phase of BKD iteration and share the generated bitset across N parallel threads, where N is equal to the number of remaining clauses and each thread intersects a clause with the bitset.
Michael McCandless (@mikemccand) (migrated from JIRA)
Yeah, that is a good workaround given how Lucene works today. It's essentially the same as your original suggestion ("make more shards and search them concurrently"), just at the segment level instead of the shard level. But this still adds some cost – the per-segment fixed cost for each query. That cost should be less than the per-shard fixed cost in the sharded case, but it's still there. If instead Lucene had a way to divide large segments into multiple work units (and I agree there are challenges with that! – not just BKD and multi-term queries, but e.g. how would early termination work?), then we could pay that per-segment fixed cost once for such segments and let multiple threads share the variable-cost work of finding and ranking hits. In our recently launched production index we see sizable jumps in the P99+ query latencies when a large segment merge finishes and replicates, because we are using "thread per segment" concurrency; we are hoping to improve that by pushing thread concurrency into individual large segments.
Atri Sharma (@atris) (migrated from JIRA) Here are the results of luceneutil (patched to generate P50 and P90 latencies and to run concurrent searching within IndexSearcher; the patch is posted to the luceneutil repo).

Adrien has a valid point about costly scorers not benefiting from this approach. Specifically, range queries can take a hit since the BKD tree's scorer is two-phase and expensive to construct, so building it per portion of a segment leads to an increase in latency, as is evident from the increase in P90 latency in the results. I am spending time evaluating how to tackle this problem and will post any thoughts that I see as viable.

These benchmarks are targeted at measuring the changes in the "happy" path, i.e. large index sizes and low QPS; luceneutil was configured accordingly (low number of search threads, impacts turned off). In summary, queries scanning a higher amount of data and having higher read latencies tend to see the maximum improvement. Term queries, and queries involving term queries on high-frequency terms, get a reasonable latency reduction.

The following are P50 and P90 latencies calculated by luceneutil. P50 Base is the P50 latency of the baseline, P50 Cmp is the P50 latency of the competitor (patched version), and the same for P90. Note: the QPS jumps are not real – since luceneutil was configured to run a single searcher thread, the QPS jump is proportional to the latency drop for each task.

Luceneutil results: https://gist.github.com/atris/9a06d511fdfa9de1b48b47e09d5ab8d2

I have attached the P50 and P90 latency graphs for high-frequency phrase and term queries. It is apparent that queries with high-frequency terms see sizeable improvements. To address Adrien's point, I have some ideas to improve the performance of the BKD tree scorer for this case; I will open a separate JIRA issue and link it here.

@jpountz Are there any other concerns that you see here? Happy to address your feedback.
Atri Sharma (@atris) (migrated from JIRA) Repeating the earlier results in a more human readable form
Adrien Grand (@jpountz) (migrated from JIRA) I wonder if we could avoid paying the cost of Scorer/BulkScorer initialization multiple times by implementing Cloneable on these classes, similarly to how we use cloning on IndexInputs to consume them from multiple threads. It would require implementing Cloneable on a few other classes, e.g. PostingsEnum, and maybe we'd need to set some restrictions to keep this feature reasonable, e.g. it's only legal to clone when the current doc ID is -1. But this could help parallelize collecting a single segment by assigning each clone its own range of doc IDs. A downside of this approach is that it wouldn't help parallelize the initialization of Scorers, but I don't know if there is a way around it.
Michael McCandless (@mikemccand) (migrated from JIRA)
+1
Thanks @jpountz for bringing this up as a feature for Lucene 10. It would be great to have if we can build it! A few of us in Amazon Product Search (+cc @stefanvodita , @slow-J) had been looking into it out of curiosity. We discussed implementing Cloneable for Scorer/BulkScorer as previously suggested, but it looks like even implementing it just for the TermQuery scorer requires significant code changes, as there are quite a few dependencies that would have to be Cloneable too. Maybe there is a hybrid approach? For example, when concurrent segment search is being initialized, it could try calling clone() for the Scorer/BulkScorer, and if that throws CloneNotSupportedException, fall back to creating a new Scorer/BulkScorer instance. Then we could implement the clone method only for scorers that are expensive to initialize, e.g. the range query scorers mentioned in the comments above. This approach still takes a lot of up-front effort, and I'm curious if anyone has better ideas. The original patch seems to have been lost; it would have been really useful to have. Tagging @atris on the off-chance that he still has the patch or remembers what the implementation looked like.
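A rough sketch of what that hybrid fallback could look like. CloneableBulkScorer is an invented opt-in interface (not a Lucene API); only scorers that are expensive to initialize would implement it, and everything else is rebuilt from the Weight as today.

```java
import java.io.IOException;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.BulkScorer;
import org.apache.lucene.search.Weight;

// Hypothetical marker interface: expensive scorers (e.g. point/range scorers) opt in.
interface CloneableBulkScorer {
  BulkScorer cloneScorer() throws CloneNotSupportedException;
}

class SliceScorers {
  static BulkScorer scorerForSlice(Weight weight, LeafReaderContext leaf, BulkScorer original)
      throws IOException {
    if (original instanceof CloneableBulkScorer cloneable) {
      try {
        return cloneable.cloneScorer(); // cheap: reuses the up-front work already done
      } catch (CloneNotSupportedException e) {
        // fall through and rebuild from scratch
      }
    }
    return weight.bulkScorer(leaf); // existing API: pays the initialization cost again
  }
}
```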
My gut feeling is that it would work internally like that, so that we would not have to migrate all queries in one go. But hopefully on the caller side, there would be a single API to call. Instead of the clone() approach, I wonder if we could also allow ...
I started to play around with a hacky implementation that:
I don't claim this is a good API design, but it mostly just works, and I wanted to do the least work possible to enable this using (mostly) existing APIs to see what the impact would be on our application. I found the initial implementation in Lucene to be pretty straightforward, but in our application it seems we tie a lot of things to reader leaves that now have to be tied to searcher leaves, and these tend to be stored in arrays sized by the number of leaves, which would no longer be viable – I expect an IndexSearcher might want to dynamically vary the way it slices an index? I guess the main learning I have from that is that we do want an explicit API change reflecting this – otherwise it is easy to fall into traps where everything compiles and seems fine, but things are actually quite broken, e.g. formerly you expected only a single thread to access your per-leaf data structures, but now multiple threads can. Also, @stefanvodita pointed me to Luca's branch javanna@6ca5680, which seems to have a somewhat cleaner API.
but I guess as an execution strategy it kind of made sense to me -- is it really necessary to clone Scorers? Could we create new ones for each searcher-segment or do we think that is too costly due to initialization costs?
There is another issue associated with this that I only realized when testing. When there are parent/child queries, we need to take care not to split document blocks across leaves. I'm not quite sure how to approach this. Does IndexSearcher have access to a map of where doc block boundaries lie? And even if it does, taking those arbitrary bounds into account can be quite challenging when selecting how to chop the index into intervals.
I guess we can use the new "parent field" to enforce that intervals must end on a non-child document, and we have to refuse to attempt query execution over a sub-leaf interval if there are doc blocks and no parent field, at least if the query is block-sensitive. I guess we will be enforcing that in Lucene 10 – at first I thought we were not, but then I realized I was testing with 9x!
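A small sketch of how slice boundaries could be nudged onto block boundaries, assuming we can obtain a per-leaf BitSet of parent documents (e.g. derived from the parent field or a parent filter); since the parent is the last document of its block, a boundary placed just after a parent never splits a block.

```java
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.BitSet;

class BlockAwareSlicing {
  /** Returns an exclusive upper bound >= proposedUpperBound that does not split a doc block. */
  static int alignToBlockBoundary(int proposedUpperBound, BitSet parents, int maxDoc) {
    if (proposedUpperBound >= maxDoc) {
      return maxDoc;
    }
    // If doc (proposedUpperBound - 1) is already a parent, the boundary is fine as-is;
    // otherwise push it forward to just after the next parent document.
    int nextParent = parents.nextSetBit(Math.max(0, proposedUpperBound - 1));
    if (nextParent == DocIdSetIterator.NO_MORE_DOCS) {
      return maxDoc; // no parent after this point: extend the slice to the end of the leaf
    }
    return nextParent + 1;
  }
}
```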
It depends on the queries. For term queries, duplicating the overhead of looking up terms in the terms dictionary may be OK, but for multi-term queries and point queries, which often compute the bit set of matches for the whole segment, this could significantly hurt throughput. Maybe it doesn't have to be this way for the first iteration (progress over perfection), but this feels important to me so that we don't end up with weird recommendations like "only enable intra-segment concurrency if you don't use multi-term or point queries". Related: in the long term, I'd like inter-segment search concurrency to be enabled by default (#11523, maybe something else we should consider for 10.0), and ideally intra-segment search concurrency too, which is another reason why I care about avoiding major downsides vs. single-threaded search.
One thing that came up during my testing / messing around, and that I think could significantly affect the API we provide, is whether we want to bake the algorithm for computing leaves/slices into the ...
I would really like it if we could do this. It could be a very tight and effective feedback loop.
Since IndexSearcher is very cheap to create, you could create a new ...
I don't know; our IndexSearcher looks a little heavy. I think some of that is our own doing and we could tease it apart, but isn't e.g. the query cache tied to the IndexSearcher? And we have some other kinds of caches, like a TermContextCache. I guess we could find a way to persist these across multiple searchers ... maybe. Then there is the SearcherTaxonomyManager, which is how we get access to the searcher – I don't know if it can manage multiple searchers. I'd really like to see if we can provide some API-level controls on this outside the constructor.
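One possible middle ground, sketched under the assumption that the query cache is the main piece of per-searcher state: share a single LRUQueryCache across cheap, per-request IndexSearcher instances. The cache sizes and the factory shape are illustrative.

```java
import java.util.concurrent.Executor;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.LRUQueryCache;
import org.apache.lucene.search.QueryCache;

class PerRequestSearchers {
  // One cache shared by all searchers; max entries and RAM budget are illustrative.
  private static final QueryCache sharedCache = new LRUQueryCache(1000, 64 * 1024 * 1024);

  static IndexSearcher newSearcher(IndexReader reader, Executor executor) {
    IndexSearcher searcher = new IndexSearcher(reader, executor);
    searcher.setQueryCache(sharedCache); // reuse cached filters across per-request searchers
    return searcher;
  }
}
```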
I was thinking a bit about intra-segment concurrency this morning and got to thinking specifically about multi-term, point, and vector queries that do most of their heavy lifting up front (to the point where I've seen a bunch of profiles where relatively little time is spent actually iterating through DISIs). Those queries (or at least their ScorerSuppliers) "know" when they're going to be expensive, so it feels like they're in the best position to say "I should be parallelized". What if ScorerSupplier could take a reference to the IndexSearcher's executor and return a CompletableFuture for the Scorer? Something like TermQuery could return a "completed" future, while "expensive" scorers could be computed on another thread. It could be a quick and easy way to parallelize some of the per-segment computation.
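A hypothetical sketch of that idea (ScorerSupplier has no async variant in Lucene today; the getAsync name and the cost threshold are invented): expensive suppliers build their Scorer on the executor, while cheap ones return an already-completed future.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.ScorerSupplier;

// Hypothetical: a supplier that knows it is expensive (e.g. a point or multi-term query
// that builds a segment-wide bit set) builds its Scorer on the searcher's executor,
// while a cheap supplier (e.g. TermQuery) just returns a completed future.
abstract class AsyncScorerSupplier extends ScorerSupplier {
  CompletableFuture<Scorer> getAsync(long leadCost, Executor executor) {
    if (cost() > 10_000) { // illustrative "I am expensive" heuristic
      return CompletableFuture.supplyAsync(() -> {
        try {
          return get(leadCost);
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      }, executor);
    }
    try {
      return CompletableFuture.completedFuture(get(leadCost)); // cheap: already "done"
    } catch (IOException e) {
      return CompletableFuture.failedFuture(e);
    }
  }
}
```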
To add on to this, I was wondering if we could further extend the concurrent logic within a query. For example, in range queries today we traverse the BKD tree over the whole range. What if we could split the range and hand the sub-ranges to an executor to intersect? Then we could construct the DISI through multiple threads. Similarly, in a terms query we could have each term create its BitSet/iterator in parallel, and then the conjunctions/disjunctions over them could happen all at once.
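For illustration, a rough sketch of the per-term part of that idea, assuming a simple disjunction over a single field (this is not how Lucene's terms queries are implemented; null checks and rewrite heuristics are omitted): each term's postings are turned into a bitset on the executor, then OR-ed together.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;

class ParallelTermUnion {
  static FixedBitSet union(LeafReaderContext leaf, String field, List<BytesRef> terms,
                           ExecutorService executor) throws Exception {
    int maxDoc = leaf.reader().maxDoc();
    List<Future<FixedBitSet>> futures = new ArrayList<>();
    for (BytesRef term : terms) {
      // Build each term's bitset on its own task.
      futures.add(executor.submit(() -> {
        FixedBitSet bits = new FixedBitSet(maxDoc);
        TermsEnum termsEnum = leaf.reader().terms(field).iterator();
        if (termsEnum.seekExact(term)) {
          PostingsEnum postings = termsEnum.postings(null, PostingsEnum.NONE);
          for (int doc = postings.nextDoc();
               doc != DocIdSetIterator.NO_MORE_DOCS;
               doc = postings.nextDoc()) {
            bits.set(doc);
          }
        }
        return bits;
      }));
    }
    FixedBitSet result = new FixedBitSet(maxDoc);
    for (Future<FixedBitSet> future : futures) {
      result.or(future.get()); // the final OR stays single-threaded
    }
    return result;
  }
}
```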
I'd really like to keep intra-segment parallelism simple and stick to splitting the doc ID space, which is the most natural approach for queries that, like term queries, produce good iterators without a huge up-front cost.
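Worth noting that BulkScorer.score already accepts a [min, max) doc ID window, which is the natural hook for splitting the doc ID space. A minimal sketch follows; it builds one scorer per range, i.e. it pays the up-front initialization cost per range, which is exactly the overhead discussed above for expensive queries.

```java
import java.io.IOException;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.BulkScorer;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.Bits;

class DocIdRangeScoring {
  // Score only the docs of one leaf that fall in [minDocId, maxDocId);
  // several such ranges could be handed to different threads.
  static void scoreRange(Weight weight, LeafReaderContext leaf, LeafCollector collector,
                         int minDocId, int maxDocId) throws IOException {
    BulkScorer scorer = weight.bulkScorer(leaf); // per-range scorer: up-front cost paid again
    if (scorer != null) {
      Bits liveDocs = leaf.reader().getLiveDocs();
      scorer.score(collector, liveDocs, minDocId, maxDocId); // only visits docs in [min, max)
    }
  }
}
```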
I opened an initial draft of my take on intra-segment concurrency (#13542). It needs quite a bit of work and discussion, but I hope it helps as a start, hopefully getting intra-segment concurrency into Lucene 10 – one can always hope :)
Segment search is a single-threaded operation today, which can be a bottleneck for large analytical workloads that index a lot of data and run complex queries touching multiple segments (imagine a composite query with a range query and filters on top). This ticket is for discussing the idea of splitting a single segment across multiple threads based on mutually exclusive document ID ranges.
This will be a two-phase effort: the first phase targets queries returning all matching documents (collectors not terminating early); the second phase will introduce staged execution and build on top of this patch.
Migrated from LUCENE-8675 by Atri Sharma (@atris), 1 vote, updated Aug 04 2022
Attachments: PhraseHighFreqP50.png, PhraseHighFreqP90.png, TermHighFreqP50.png, TermHighFreqP90.png