Add support for concurrent collection when size is greater than zero #98425

Merged
merged 10 commits into from Aug 15, 2023

@javanna javanna commented Aug 13, 2023

Query phase support for concurrency when size > 0

So far, the query phase supports concurrency only when size is set to 0 (even though inter-segment concurrency is not yet enabled in the query phase). This commit adds support for concurrent collector managers to the query phase when size is greater than zero (hits are requested), including when scroll is used. The only exception is field collapsing, which relies on custom collectors for which we don't yet have a corresponding collector manager.

Note: this commit does not enable concurrent collection for the query phase. The entire query phase still collects against a single slice, sequentially segment by segment.

Collector managers refactoring / suppliers removal

As part of this change, and especially while figuring out how to leverage the Lucene collector managers for the top score doc and top field doc collectors, many simplifications were made. They are explained below.

We currently create multiple collector managers for the query phase: one for the top docs collection, one for aggs, and both of them are wrapped into a query phase collector manager. Additionally, each one of those is wrapped into a ProfileCollectorManager if profiling is enabled. While this works, the 1:1 ratio between collectors and collector managers is not necessary and makes things quite hard to follow.

Also, TopDocsCollectorManagerFactory used to be a factory that created the right top docs collector based on the requested features, and it is now a factory of collector managers. We can simplify things and remove one level of indirection by converting it into a collector manager directly. We also use some suppliers to keep track of which collector provides which part of the search response (e.g. total hits, top docs, max score). This is a consequence of collector wrapping: before the introduction of collector managers, it was difficult to extract the appropriate info from each collector after calling search, a problem that was solved using suppliers. Collector managers are a good fit to replace these indirections, as they can return a value: the reduction of the results obtained by each slice after collection.

This commit folds all of the different collector managers that we rely on for the query phase into a single one, called QueryPhaseCollectorManager. Its newCollector method creates the appropriate collectors depending on the requested features (a bit like TopDocsCollectorManagerFactory used to do), but it takes care of creating all the collectors involved, depending on the features requested (size, scroll, collapse) and on whether profiling is enabled. This gives us a single base collector manager that includes all the logic around creating collectors and reducing their results, and that returns a concrete result including the top docs, the terminated-after flag, and the profiling tree if profiling was enabled. That entirely removes the need for suppliers in multiple places.

No suppliers are needed for total hits and top docs, as they are returned directly by the search method. There is no need for collector managers to hold state that is only set once reduce is called. Profiling was greatly simplified as well and no longer relies on suppliers for the profiler results: the collector result can simply be set on the query profiler after search returns. Additionally, ProfileCollectorManager used to handle children collectors for top docs and aggs, which was very hard to follow. This is no longer required because QueryPhaseCollectorManager takes care of it all (wrapping top docs, aggs, as well as the top level collector when profiling is enabled), which removes the need for wrapping collector managers.
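
To make the "search returns the reduced result" idea concrete, here is a minimal sketch. It does not use Lucene or Elasticsearch classes: `SketchCollectorManager`, `SliceCollector`, `SketchResult`, `SketchQueryPhaseManager` and `SketchSearcher` are hypothetical stand-ins invented for illustration, and slices are modelled as plain arrays of scores. It only shows how a single manager can create one collector per slice and hand everything back through `reduce`, so that no supplier is needed afterwards.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Simplified stand-in for the collector manager contract (an assumption, not Lucene's interface).
interface SketchCollectorManager<C, T> {
    C newCollector();

    T reduce(Collection<C> collectors);
}

// Hypothetical per-slice collector: counts hits and remembers the best score it saw.
final class SliceCollector {
    int hits;
    float maxScore = Float.NEGATIVE_INFINITY;

    void collect(float score) {
        hits++;
        maxScore = Math.max(maxScore, score);
    }
}

// Hypothetical reduced result, returned to the caller by search itself.
record SketchResult(int totalHits, float maxScore) {}

final class SketchQueryPhaseManager implements SketchCollectorManager<SliceCollector, SketchResult> {
    @Override
    public SliceCollector newCollector() {
        return new SliceCollector();
    }

    @Override
    public SketchResult reduce(Collection<SliceCollector> collectors) {
        int totalHits = 0;
        float maxScore = Float.NEGATIVE_INFINITY;
        for (SliceCollector collector : collectors) {
            totalHits += collector.hits;
            maxScore = Math.max(maxScore, collector.maxScore);
        }
        return new SketchResult(totalHits, maxScore);
    }
}

final class SketchSearcher {
    // One collector per slice, then a single reduce whose value is handed back to the caller.
    static <T> T search(List<float[]> slices, SketchCollectorManager<SliceCollector, T> manager) {
        List<SliceCollector> collectors = new ArrayList<>();
        for (float[] slice : slices) {
            SliceCollector collector = manager.newCollector();
            for (float score : slice) {
                collector.collect(score);
            }
            collectors.add(collector);
        }
        return manager.reduce(collectors);
    }
}
```

In this sketch the caller reads total hits and max score straight from the value that `search` returns, which is the property the description relies on to drop the suppliers.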

// this is a bit of a hack: it allows us to retrieve the last collector that newCollector has returned for sub-collector managers,
// so that we can provide them to InternalProfileCollector's constructor as children. This is fine as newCollector does not get called
// concurrently, but rather in advance before parallelizing the collection
private InternalProfileCollector profileCollector;
Member Author

The query phase is the only place where we have a top level collector wrapped with a profile collector, as well as its inner collectors (for top docs and aggs) each wrapped with a profile collector. Thanks to removing collector manager wrapping, this is now handled directly in QueryPhaseCollectorManager within newCollector, and this complicated bit can be removed from ProfileCollectorManager, which is only used by the dfs phase. There we do need to wrap the collector manager because we rely on the Lucene top docs collector managers.

*/
private Supplier<CollectorResult> collectorResultSupplier;
private CollectorResult collectorResult;
Member Author

This supplier was really only needed because it was difficult to keep track of the collector (or manager) that provides us with the top level collector result tree. As that is now part of the result returned by the search method, it is very simple to set the result after search has been called, which removes the need for the supplier.

@@ -81,43 +80,4 @@ boolean isThresholdReached() {
return numCollected.getAcquire() >= totalHitsThreshold;
}
}

static class CollectorManager implements org.apache.lucene.search.CollectorManager<PartialHitCountCollector, Void> {
Member Author

This functionality is now folded into QueryPhaseCollectorManager.EmptyHits. This small collector manager has been moved to PartialHitCountCollectorTests to allow for testing this collector in concurrent scenarios (as opposed to testing the collector only in sequential collection scenarios).

aggsCollectorManager,
searchContext.minimumScore()
searchContext,
hasFilterCollector
);
Member Author

All of the wrapping is no longer necessary. Collectors are still wrapped when necessary, but there's a single collector manager taking care of that, including profiling.

if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER && queryResult.terminatedEarly() == null) {
queryResult.terminatedEarly(false);
}
}
Member Author

Rather than having this method return a QueryPhaseResult, I folded it into its caller, as that was its only caller.

@@ -52,10 +49,6 @@ public final class QueryPhaseCollector implements Collector {
private final boolean cacheScores;
private boolean terminatedAfter = false;

QueryPhaseCollector(Collector topDocsCollector, Weight postFilterWeight, int terminateAfter, Collector aggsCollector, Float minScore) {
this(topDocsCollector, postFilterWeight, resolveTerminateAfterChecker(terminateAfter), aggsCollector, minScore);
}
Member Author

Removed this as it was needed only for sequential scenarios. In fact, the terminate after checker needs to be shared across the different collectors that the same instance of the collector manager creates.
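
The sharing requirement can be sketched as follows. This is an illustration only, not the QueryPhaseCollector code: `SharedTerminateAfter`, `LimitedCollector` and `LimitedCollectorManager` are hypothetical names, and the exact terminate_after semantics of the real collector may differ. The point is that the counter lives in one object that the manager passes to every collector it creates, so the limit applies to the total across collectors rather than to each collector separately.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical checker: one instance is shared by all collectors of a manager.
final class SharedTerminateAfter {
    private final int terminateAfter;
    private final AtomicInteger collected = new AtomicInteger();

    SharedTerminateAfter(int terminateAfter) {
        this.terminateAfter = terminateAfter;
    }

    // True while the shared count, including this document, stays within the limit.
    boolean tryCollect() {
        return collected.incrementAndGet() <= terminateAfter;
    }
}

// Hypothetical collector that accepts documents until the shared limit is hit.
final class LimitedCollector {
    private final SharedTerminateAfter checker;
    int accepted;

    LimitedCollector(SharedTerminateAfter checker) {
        this.checker = checker;
    }

    boolean collect() {
        if (checker.tryCollect()) {
            accepted++;
            return true;
        }
        return false; // a real collector would terminate collection here
    }
}

// Hypothetical manager: every collector it creates sees the same checker.
final class LimitedCollectorManager {
    private final SharedTerminateAfter checker;

    LimitedCollectorManager(int terminateAfter) {
        this.checker = new SharedTerminateAfter(terminateAfter);
    }

    LimitedCollector newCollector() {
        return new LimitedCollector(checker);
    }
}
```

With a limit of 5 and two collectors that each see 4 documents, the two collectors accept 5 documents in total. A per-collector counter would have let through all 8.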

* Wraps two {@link org.apache.lucene.search.CollectorManager}s: one required for top docs collection, and another one optional for
* aggs collection. Applies terminate_after consistently across the different collectors by sharing an atomic counter of collected docs.
*/
static class CollectorManager implements org.apache.lucene.search.CollectorManager<QueryPhaseCollector, Void> {
Member Author

This functionality is now part of QueryPhaseCollectorManager. This collector manager has been simplified and moved to QueryPhaseCollectorTests to allow for testing QueryPhaseCollector in concurrent scenarios, as opposed to testing it only in sequential scenarios.

protected abstract Collector newTopDocsCollector() throws IOException;

@Override
public final QueryPhaseResult reduce(Collection<Collector> collectors) throws IOException {
Member Author

The casting required in this method is perhaps the only con I see in removing collector manager wrapping, as many different types of collectors can be created, all of which need to be reduced. On the other hand, the structure is quite predictable, and it depends on the logic that newCollector includes above. I think that is reasonable, and the pros of removing the wrapping outweigh the need for casting collectors in this method.

Contributor

It's not ideal, but I agree with the trade-off here to significantly reduce the abstractions.

Member Author

@javanna javanna Aug 15, 2023

This also comes from the collector manager design: you create collectors, and get the same ones back as arguments to reduce. It would probably make things easier to just keep track of the collectors that were created and reduce those, so that no casting is needed. That does not feel very aligned with the Lucene design, though. Perhaps this is something that we could look at simplifying in Lucene, though there are possibly cases where not all collectors that were created are reduced.
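
The trade-off discussed in this thread can be illustrated with a small sketch. All names here (`UntypedCollector`, `HitCountingCollector`, `TrackingManagerSketch`) are hypothetical and not part of Lucene or Elasticsearch. The first reduce method receives collectors untyped and casts them. The second reduces the typed collectors that the manager tracked itself. Tracking assumes that every created collector is also reduced, which the comment above notes may not always hold.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Stand-in for an untyped collector, as a generic reduce method would receive it.
interface UntypedCollector {}

final class HitCountingCollector implements UntypedCollector {
    int hits;
}

final class TrackingManagerSketch {
    // Typed references to every collector this manager has created.
    private final List<HitCountingCollector> created = new ArrayList<>();

    // Assumes collectors are created up front, before collection is parallelized.
    UntypedCollector newCollector() {
        HitCountingCollector collector = new HitCountingCollector();
        created.add(collector);
        return collector;
    }

    // Contract-style reduce: the collectors come back untyped and must be cast.
    int reduceWithCasts(Collection<UntypedCollector> collectors) {
        int total = 0;
        for (UntypedCollector collector : collectors) {
            total += ((HitCountingCollector) collector).hits;
        }
        return total;
    }

    // Alternative: reduce the tracked collectors, no casts needed.
    int reduceTracked() {
        int total = 0;
        for (HitCountingCollector collector : created) {
            total += collector.hits;
        }
        return total;
    }
}
```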

* or a {@link TermQuery} and the <code>reader</code> has no deletions,
* -1 otherwise.
*/
static int shortcutTotalHitCount(IndexReader reader, Query query) throws IOException {
Member Author

This is nothing new: it was copied with no changes from TopDocsCollectorManagerFactory. Nothing to look at.

Contributor

Are we planning on removing this in favour of Weight.count() / PartialHitCountCollector at some point?

Member Author

I cannot tell you how much time I spent trying to do just that, without luck. When size is 0 (partial hit count collector) we are already off of it, so I was at least successful at that :) But we are not when size is greater than 0. The problem is that hits need to be retrieved up until size, and at the same time track_total_hits needs to be honoured. In short, the Lucene top docs collectors don't support retrieving the count from the weight, and it's quite tricky to add that as it does not play well with dynamic pruning, I think.

One option, though, is to split the top docs collection into two collectors: one for total hits (which can then be the partial hit count collector) and one for top docs only. That may be easier than it was when I tried it, because we have all the logic in QueryPhaseCollector and I may not need to use a MultiCollector, which would mess up the score mode etc. I may have a look at this again soon.
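
As a rough picture of that split, and only under heavy simplifying assumptions, the sketch below feeds the same stream of scores to two independent collectors: one that keeps the best `size` scores and one that counts hits up to a threshold. `TopScoresOnly`, `BoundedHitCount` and `SplitCollection` are hypothetical names. The sketch ignores score modes, dynamic pruning and MultiCollector, which are exactly the parts described above as tricky.

```java
import java.util.PriorityQueue;

// Hypothetical collector that only keeps the best `size` scores.
final class TopScoresOnly {
    private final int size;
    // Min-heap: the weakest score that is still kept sits on top.
    private final PriorityQueue<Float> heap = new PriorityQueue<>();

    TopScoresOnly(int size) {
        this.size = size;
    }

    void collect(float score) {
        if (size == 0) {
            return;
        }
        if (heap.size() < size) {
            heap.add(score);
        } else if (score > heap.peek()) {
            heap.poll();
            heap.add(score);
        }
    }

    int kept() {
        return heap.size();
    }

    // Weakest score still in the top list; assumes at least one score was kept.
    float weakestKept() {
        return heap.peek();
    }
}

// Hypothetical counter that stops counting once the threshold is reached.
final class BoundedHitCount {
    private final int threshold;
    private int count;

    BoundedHitCount(int threshold) {
        this.threshold = threshold;
    }

    void collect() {
        if (count < threshold) {
            count++;
        }
    }

    int count() {
        return count;
    }

    boolean thresholdReached() {
        return count >= threshold;
    }
}

final class SplitCollection {
    // Feeds every hit to both collectors; each one tracks only its own concern.
    static void collectAll(float[] scores, TopScoresOnly top, BoundedHitCount hits) {
        for (float score : scores) {
            top.collect(score);
            hits.collect();
        }
    }
}
```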

import java.util.Collection;

/** {@link CollectorManager} that runs in a non-concurrent search. */
public class SingleThreadCollectorManager implements CollectorManager<Collector, Void> {
Member Author

No more need for single threaded collector managers. The only place where we still have that need is when field collapsing is used, for which we assert directly in QueryPhaseCollectorManager#forCollapsing that collection runs against a single slice.

private final Collector collector;
private final Supplier<TotalHits> totalHitsSupplier;
private final Supplier<TopDocs> topDocsSupplier;
private final Supplier<Float> maxScoreSupplier;
Member Author

These three suppliers are no longer needed, which feels great. They were used to set the right values within TopDocsAndMaxScore, which came from different places depending on input parameters. It was previously difficult to trace which collector was providing them and how, due to the wrapping in different places. Now we can have different subclasses of QueryPhaseCollectorManager (similar to what we previously had in TopDocsCollectorManagerFactory) that provide values depending on their logic, and those values automatically become part of the end result that the collector manager reduces and the search method returns.

This is the main reason to refactor the query phase like this PR does, as part of introducing support for concurrent collection to it. It felt really bad to convert to concurrent collector manager and keep the suppliers around.

@@ -142,110 +137,4 @@ public void testManagerWithSearcher() throws IOException {
assertTrue(result.getTime() > 0);
}
}

public void testManagerWithChildren() throws IOException {
Member Author

These were testing support for children collectors, which ProfileCollectorManager no longer supports.

@javanna javanna added the >enhancement and :Search/Search labels Aug 14, 2023
@javanna javanna marked this pull request as ready for review August 14, 2023 07:12
@elasticsearchmachine elasticsearchmachine added the Team:Search label Aug 14, 2023
@elasticsearchmachine
Collaborator

Hi @javanna, I've created a changelog YAML for you.

@elasticsearchmachine
Collaborator

Pinging @elastic/es-search (Team:Search)

Contributor

@romseygeek romseygeek left a comment

LGTM - this is a really nice cleanup/simplification. I think profiling becomes a lot simpler if/when we get something like LeafCollector.finish() which could remove a lot of the extra casting, but at least for now it's all in one method.

searcher.search(new MatchAllDocsQuery(), manager);
assertFalse(manager.isTerminatedAfter());
assertEquals(numDocs, topScoreDocAdapter.getResult().totalHits.value);
CollectorManager<QueryPhaseCollector, Result<TopDocs, Void>> manager = createCollectorManager(
Contributor

Small suggestion: these might be easier to read with var

Member Author

interesting: certainly less typing. Though I need to replace Void with Object somehow. It is less heavy, but is it easier to read given that it's not entirely clear which type I get?

Member Author

I haven't made this change for now. Can do that as a follow-up perhaps.

Contributor

@JVerwolf JVerwolf left a comment

I wasn't able to do a full round today, just left a single comment.

Comment on lines 1107 to 1108
return source == null || source.collapse() == null || source.aggregations() == null
|| source.aggregations().supportsParallelCollection();
Contributor

Non-blocking: should this logic belong to the source, e.g. source.supportsParallelCollection() (by the same virtue of source.aggregations().supportsParallelCollection())?

Member Author

Perhaps yes, but it's a single situation for top docs collection, compared to aggs where some of them don't support concurrency. I felt like it may be overkill to add a supportsParallelCollection to SearchSourceBuilder. Maybe that's a good idea though, let me play with it.

Member Author

well, this made me realize that the conditional was wrong :) and that if I move it to SearchSourceBuilder I can test it separately from enabling parallel collection in the query phase (this method in SearchService can stay unchanged).

Member Author

Here you go: d239f0b
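
The thread does not spell out what was wrong with the conditional or how d239f0b changed it. The sketch below is therefore only one plausible reading, built on hypothetical stand-ins (`SketchAggs`, `SketchSource`, `ParallelCollectionCheck`) rather than on SearchSourceBuilder. It contrasts the chain of ORs as quoted in the review with an assumed intent of "no collapsing, and either no aggs or aggs that support parallel collection".

```java
// Hypothetical stand-ins for the aggregations and search source builders.
record SketchAggs(boolean supportsParallelCollection) {}

record SketchSource(String collapseField, SketchAggs aggregations) {}

final class ParallelCollectionCheck {
    // The chain of ORs as quoted in the review comment.
    static boolean asQuoted(SketchSource source) {
        return source == null || source.collapseField() == null || source.aggregations() == null
            || source.aggregations().supportsParallelCollection();
    }

    // Assumed intent: no collapsing AND (no aggs OR aggs that support parallel collection).
    static boolean assumedIntent(SketchSource source) {
        if (source == null) {
            return true;
        }
        if (source.collapseField() != null) {
            return false;
        }
        return source.aggregations() == null || source.aggregations().supportsParallelCollection();
    }
}
```

Under these assumptions the two versions disagree in two cases: a request without collapsing whose aggs do not support parallel collection, and a request with collapsing but no aggs. The quoted chain accepts both, because a single true operand is enough to satisfy an OR.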

Contributor

@jdconrad jdconrad left a comment

LGTM! This PR really illustrates how invasive profiling is once again, but the amount of work reduced around child profilers is fantastic.

@@ -1104,7 +1104,8 @@ static boolean supportsParallelCollection(ResultsType resultsType, SearchSourceB
/*
TODO uncomment this block to enable inter-segment concurrency for the query phase
Contributor

Is there already a github issue that this TODO can be linked to?

Member Author

Not an issue, but a PR that @JVerwolf is working on, see #98455

@javanna javanna merged commit 0df27ce into elastic:main Aug 15, 2023
12 checks passed
@javanna javanna deleted the wip/top_doc_manager branch August 15, 2023 12:31
JVerwolf pushed a commit to JVerwolf/elasticsearch that referenced this pull request Aug 15, 2023
…lastic#98425)

csoulios pushed a commit to csoulios/elasticsearch that referenced this pull request Aug 18, 2023
…lastic#98425)

Labels
>enhancement, :Search/Search, Team:Search, v8.10.0
5 participants