First step towards incremental reduction of query responses #23253

Merged
merged 19 commits into elastic:master from s1monw:incremental_reduce on Feb 21, 2017

Conversation

@s1monw (Contributor) commented Feb 19, 2017

Today all query results are buffered until we have received responses from
all shards. This can hold on to a significant amount of memory if the number of
shards is large. This commit adds a first step towards incrementally reducing
aggregation results once a configurable (per search request) number of responses
has been received. Once enough query results have been received and buffered, all
aggregation responses received so far are reduced and released to be GC'd.

This PR really needs some reviews and potentially some discussion, but it's a start and outlines what it takes to make this feature work.
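The sketch below illustrates the buffering idea in isolation; it is not the actual Elasticsearch implementation, and the class and method names are made up for illustration. Once the configurable buffer fills up, the buffered per-shard results are collapsed into a single partial result so the originals can be garbage collected.

import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical sketch: buffer per-shard results and reduce them incrementally
// once the buffer reaches the configured batch size.
final class BufferingReducer<T> {
    private final int batchSize;              // configurable per search request
    private final BinaryOperator<T> reduceOp; // combines two partial results
    private final List<T> buffer = new ArrayList<>();

    BufferingReducer(int batchSize, BinaryOperator<T> reduceOp) {
        this.batchSize = batchSize;
        this.reduceOp = reduceOp;
    }

    /** Buffers one shard response; performs an intermediate reduce when the buffer is full. */
    synchronized void consume(T shardResult) {
        buffer.add(shardResult);
        if (buffer.size() >= batchSize) {
            T partial = buffer.stream().reduce(reduceOp).orElseThrow(IllegalStateException::new);
            buffer.clear();      // the individual shard responses can now be GC'd
            buffer.add(partial); // keep only the partially reduced result
        }
    }

    /** Final reduce over whatever remains in the buffer. */
    synchronized T finalReduce() {
        return buffer.stream().reduce(reduceOp).orElseThrow(IllegalStateException::new);
    }
}

For example, new BufferingReducer<>(512, Long::sum) would keep at most 512 partial counts in memory at any time, regardless of how many shards respond.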

@s1monw (Contributor, Author) commented Feb 20, 2017

@elasticmachine test this please

@jpountz (Contributor) left a comment

I like this change, I expected it to be more complex than that so this is a good surprise to me! I left some picky comments about naming and comments to make this change a bit easier to read. I think the interesting question is how many buckets intermediate reduces for terms (or geo-hash) aggregations should produce.

*/
public Stream<Result> stream() {
return results.asList().stream().map(e -> e.value);
}

@jpountz (Contributor) commented Feb 20, 2017

Not sure how to address it but when I see both a size() and stream() method on a class, I tend to expect that the stream wraps size elements. I wonder whether we should make naming a bit more explicit to avoid this potential confusion.

if (buffer != null) {
InternalAggregations aggregations = (InternalAggregations) querySearchResult.consumeAggs();
// once the size is incremented to the length of the buffer we know all elements are added
// we also have happens before guarantees due to the memory barrier of the size write

@jpountz (Contributor) commented Feb 20, 2017

is this comment outdated?

@s1monw (Author, Contributor) commented Feb 20, 2017

yeah I had a complex solution first with non-blocking concurrency etc. I didn't go with it apparently

}

/**
* Returns <code>true</code> iff the current reduce phase is the final reduce phase. This indicated if operations like

@jpountz (Contributor) commented Feb 20, 2017

s/indicated/indicates/

}
logger.info("test failed. trying to see if it recovers after 1m.", ae);
try {
Thread.sleep(60000);

@jpountz (Contributor) commented Feb 20, 2017

!!!

@s1monw (Author, Contributor) commented Feb 20, 2017

dude!

@@ -228,7 +228,7 @@ public InternalAggregation doReduce(List<InternalAggregation> aggregations, Redu
}
}

final int size = Math.min(requiredSize, buckets.size());
final int size = reduceContext.isFinalReduce() == false ? buckets.size() : Math.min(requiredSize, buckets.size());

@jpountz (Contributor) commented Feb 20, 2017

Since one of the goals of this change is to limit memory usage, I wonder whether this should use getShardSize() rather than buckets.size(), as that should be a good trade-off between accuracy and memory usage? cc @colings86

@colings86 (Member) commented Feb 20, 2017

In theory I think this would be a good change to make, however I think we should do it in a separate PR as it may require the error calculations to be tweaked a bit to be correct.
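For illustration only, here is a tiny self-contained sketch of the size selection being discussed. The shardSize cap in the non-final branch is jpountz's suggestion, not what this PR implements (the PR keeps every bucket during non-final reduces), and all values are hypothetical.

final class ReduceSizeExample {
    /** How many buckets to keep after a reduce step. */
    static int bucketsToKeep(boolean finalReduce, int requiredSize, int shardSize, int bucketCount) {
        if (finalReduce) {
            // final reduce: truncate to the size the user asked for
            return Math.min(requiredSize, bucketCount);
        }
        // suggested alternative: cap intermediate reduces at the shard size instead of
        // keeping every bucket (the merged change keeps bucketCount here)
        return Math.min(shardSize, bucketCount);
    }

    public static void main(String[] args) {
        System.out.println(bucketsToKeep(false, 10, 100, 5_000)); // intermediate reduce: 100 under the suggestion
        System.out.println(bucketsToKeep(true, 10, 100, 5_000));  // final reduce: 10
    }
}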

final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, firstResult, suggest, aggregations,
shardResults);
}


private InternalAggregations reduceAggsOnly(List<InternalAggregations> aggregationsList) {

@jpountz (Contributor) commented Feb 20, 2017

maybe update the name or add a comment to say that this method is about performing an intermediate reduce? (as opposed to final)

/**
* Reduces the given query results and consumes all aggregations and profile results.
* @see QuerySearchResult#consumeAggs()
* @see QuerySearchResult#consumeProfileResult()
*/
public final ReducedQueryPhase reducedQueryPhase(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults) {
public final ReducedQueryPhase reducedQueryPhase(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults,
List<InternalAggregations> reducedAggs) {

@jpountz (Contributor) commented Feb 20, 2017

Can you add a comment to explain that reducedAggs is the result from intermediate reduce operations?

aggregationsList = new ArrayList<>(queryResults.size());
} else {
aggregationsList = reducedAggs == null ? Collections.emptyList() : reducedAggs;
}

@jpountz (Contributor) commented Feb 20, 2017

I think hasAggs is a confusing name given how this method was refactored. Maybe we should restructure the logic a bit with comments to explain what each case maps to? E.g. something like

if (reducedAggs != null) {
  // we already have results from intermediate reduces and just need to perform the final reduce
  assert firstResult.hasAggs();
} else if (firstResult.hasAggs()) {
  // the number of shards was less than the buffer size so we reduce agg results directly
} else {
  // no aggregations
}
@s1monw (Contributor, Author) commented Feb 20, 2017

@elasticmachine would you bother to test this

@s1monw (Contributor, Author) commented Feb 20, 2017

@jpountz I pushed some changes

@jpountz (Contributor) commented Feb 20, 2017

Looks great. Do you have any opinions about the size to use for intermediate reduces of terms aggs? I'm good with pulling this change in and making that a follow-up; this change is already a net improvement as-is.

@colings86 (Member) left a comment

LGTM, I left a couple of minor comments

@@ -46,6 +46,8 @@
*/
public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, SearchResponse, SearchRequestBuilder> {

private int reduceUpTo;

@colings86 (Member) commented Feb 20, 2017

Is this used? It looks below like we set this directly on the request?


s1monw added some commits Feb 20, 2017

@s1monw (Contributor, Author) commented Feb 20, 2017

@jpountz I will open follow-ups for the following things:

  • size of terms aggs in incremental reduce #23285
  • fix error calculation in terms aggs #23286
  • exposing the batch size in REST (not part of this PR) #23288
  • removing / raising our soft limit for the number of shards in a search request
  • using the same technique for profiling results

I think we should just pull this one in without adding more stuff to it

@jpountz (Contributor) commented Feb 20, 2017

I think we should just pull this one in without adding more stuff to it

yes

s1monw merged commit f933f80 into elastic:master on Feb 21, 2017

1 of 2 checks passed
elasticsearch-ci: Build finished.
CLA: Commit author is a member of Elasticsearch

s1monw deleted the s1monw:incremental_reduce branch on Feb 21, 2017

s1monw added a commit to s1monw/elasticsearch that referenced this pull request Feb 21, 2017

Expose `batched_reduce_size` via `_search`
In elastic#23253 we added the ability to incrementally reduce search results.
This change exposes the parameter to control the batch size and therefore
the memory consumption of a large search request.
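As a hedged usage sketch only: assuming the Java API setter is named setBatchedReduceSize (the REST parameter is `batched_reduce_size`), and with a placeholder index name, the knob exposed by this follow-up could be set like this.

import org.elasticsearch.action.search.SearchRequest;

final class BatchedReduceSizeExample {
    static SearchRequest buildRequest() {
        SearchRequest request = new SearchRequest("my-index"); // "my-index" is a placeholder
        // Reduce buffered shard responses every 256 results instead of waiting for all shards,
        // bounding the memory used on the coordinating node for requests that hit many shards.
        request.setBatchedReduceSize(256);
        return request;
    }
}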

s1monw added a commit that referenced this pull request Feb 21, 2017

Fix incremental reduce randomization in base test cases
We can and should randomly reduce down to a single result before
we pass the aggs to the final reduce. This commit changes the logic
to do that and ensures we don't trip the assertions the previous implementation tripped.

Relates to #23253

s1monw added a commit that referenced this pull request Feb 21, 2017

Never reduce the same agg twice
Some randomization caused reduction of the same agg multiple times,
which caused issues with some aggregations.

Relates to #23253

jimczi added a commit that referenced this pull request Feb 21, 2017

Fix comparison of double in InternalTopHits
InternalTopHits uses "==" to compare hit scores and fails when the score is NaN.
This commit changes the comparison to always use Double.compare.

Relates #23253

s1monw added a commit that referenced this pull request Feb 21, 2017

Expose `batched_reduce_size` via `_search` (#23288)
In #23253 we added the ability to incrementally reduce search results.
This change exposes the parameter to control the batch size and therefore
the memory consumption of a large search request.

s1monw added a commit to s1monw/elasticsearch that referenced this pull request Feb 22, 2017

Remove BWC layer for number of reduce phases
Both PRs below have been backported to 5.4 so that we can enable
BWC tests of this feature as well as remove version-dependent serialization
for search requests / responses.

Relates to elastic#23288
Relates to elastic#23253

s1monw added a commit that referenced this pull request Feb 22, 2017

First step towards incremental reduction of query responses (#23253)
Today all query results are buffered until we have received responses from
all shards. This can hold on to a significant amount of memory if the number of
shards is large. This commit adds a first step towards incrementally reducing
aggregation results once a configurable (per search request) number of responses
has been received. Once enough query results have been received and buffered, all
aggregation responses received so far are reduced and released to be GC'd.

s1monw added a commit that referenced this pull request Feb 22, 2017

Fix incremental reduce randomization in base test cases
We can and should randomly reduce down to a single result before
we pass the aggs to the final reduce. This commit changes the logic
to do that and ensures we don't trip the assertions the previous implementation tripped.

Relates to #23253

s1monw added a commit that referenced this pull request Feb 22, 2017

Expose `batched_reduce_size` via `_search` (#23288)
In #23253 we added the ability to incrementally reduce search results.
This change exposes the parameter to control the batch size and therefore
the memory consumption of a large search request.

s1monw added a commit that referenced this pull request Feb 22, 2017

Remove BWC layer for number of reduce phases (#23303)
Both PRs below have been backported to 5.4 so that we can enable
BWC tests of this feature as well as remove version-dependent serialization
for search requests / responses.

Relates to #23288
Relates to #23253

@epixa referenced this pull request on Apr 3, 2017:
Cross-cluster search support #11011 (Closed, 3 of 3 tasks complete)
@IdanWo commented Jul 21, 2017

If I understand it right, the motivation here is to make several small top-10 calculations on the coordinating node, instead of making a single large calculation at the end when all the responses are available? Does this change affect the accuracy of the terms aggregation, as opposed to the previous approach?

@jpountz (Contributor) commented Jul 21, 2017

Yes, it potentially reduces the accuracy of terms aggregations.

Note that combined with #25658, this change only starts reducing accuracy of the terms aggregations if more than 512 shards have matches. So if, say, you query 1000 shards but only 300 of them have matches, the accuracy will be the same as today.

@IdanWo commented Jul 21, 2017

Okay, got it. Sounds cool. Does that mean this behavior is activated only under specific conditions? Or is this the new behavior, which will reduce accuracy only under some conditions?

By the way, will the user see this inaccuracy in the error bounds (sum_other_doc_count, doc_count_error_upper_bound)? And if not, will this configuration be documented? I believe there should be a page on "tune for search accuracy", just as there are pages on "tune for search speed" and "tune for disk usage".

I understand that there's not much to do when making a request to so many shards at once, but I don't like the approach of "when scaling out, performance and latency are always much more important factors than accuracy". Nobody mentioned that less accurate results were coming in the Elasticsearch 5.4.0 release blog post:

That said, it is quite easy to reach the 1,000 shard limit, especially with the recent release of Cross Cluster Search. As of 5.4.0, Top-N search results and aggregations are reduced in batches of 512, which puts an upper limit on the amount of memory used on the coordinating node, which has allowed us to set the shard soft limit to unlimited by default.

@jpountz (Contributor) commented Jul 21, 2017

Will the user see this inaccuracy in the error bounds?

Yes.

I understand that there's nothing to do when making a request to so many shards at once, but I don't like the approach of "scaling out, performance and latency are much more important factors than accuracy".

I would agree it would be debatable if this were only about performance and latency, but to me this is mostly about cluster stability, which I consider more important than the accuracy of terms aggs.

In my opinion the number of users that are affected by decreased accuracy of terms aggs is low enough that mentioning it in the release notes would be more confusing than helping.

@colings86 (Member) commented Jul 21, 2017

@jpountz @IdanWo Actually I am pretty sure that we don't lose any accuracy on the terms aggregation because of incremental reduce: we do not truncate the list of terms until we are doing the final reduction (see https://github.com/elastic/elasticsearch/blob/master/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java#L284), so the accuracy should not be affected. We do have #23285 open to explore truncating the list during the other incremental reductions, though, and that would indeed affect the accuracy of the terms aggregation if implemented.

@jpountz (Contributor) commented Jul 21, 2017

@colings86 Oh right, I remember we discussed it but had forgotten which approach we took. Thanks for clarifying!
