SQL: Fix wrong results when sorting on aggregate #43154
Conversation
- Previously, when sorting on an aggregate function, bucket processing ended early when the explicit (LIMIT XXX) or the implicit limit of 512 was reached. As a consequence, only a subset of the grouping buckets was processed and the results returned didn't reflect the global ordering.
- Previously, the priority queue sorting method had an inverse comparison check, and the final response from the priority queue was also returned in reverse order because of the calls to the `pop()` method.

Fixes: elastic#42851
Pinging @elastic/es-search
Good catch @matriv ;), I left some minor comments but the fix looks good to me.
public void testHardLimitForSortOnAggregate() throws Exception {
    index("test", body -> body.field("a", 1).field("b", 2));
    String commandResult = command("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 10000");
    assertEquals(START + "Bad request [[3;33;22mThe maximum LIMIT for aggregate sorting is [512], received [10000]" + END,
Unrelated to this pr but I don't think we should treat ordered GROUP_BY differently than natural GROUP_BY (sorted by keys). We should use the same hard limit for both, the cost of an ordered GROUP_BY depends greatly on the cardinality of the groups and not so much on the size of the priority queue (assuming it remains reasonable ;)).
I don't think I agree here. When we don't have an order by on aggregate then we don't need to keep this extra buffer in memory (PriorityQueue) in the SQL module, instead we have a cursor from ES to paginate the results, thus we don't need to apply a hard limit. Or maybe I'm missing something?
The memory to keep the best buckets in the PriorityQueue should be identical to the one that is used in ES when we compute the top hits, so it's only a question of where this memory is held. I understand that the priority queue in the client can increase the memory, but it shouldn't make a big difference if the size is 512 or 1000 or even 10,000. Most of the time will be spent on paginating the buckets, and a limit of 1 or 512 shouldn't make any difference.
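As a rough illustration of the memory argument (a standalone sketch, not the actual SQL module code; class and method names are hypothetical): streaming any number of buckets through a queue capped at N keeps at most N entries resident, so the cost is dominated by pagination, not by the queue size.

```java
import java.util.PriorityQueue;

public class TopNMemorySketch {
    // Keeps only the best n values seen while streaming an arbitrary number of
    // "buckets"; resident memory stays O(n) no matter how many buckets go by.
    static int maxQueueSize(int n, int totalBuckets) {
        PriorityQueue<Integer> topN = new PriorityQueue<>(); // min-heap of the current best n
        int maxSize = 0;
        for (int bucket = 0; bucket < totalBuckets; bucket++) {
            topN.add(bucket);
            if (topN.size() > n) {
                topN.poll(); // evict the worst of the current top n
            }
            maxSize = Math.max(maxSize, topN.size());
        }
        return maxSize;
    }

    public static void main(String[] args) {
        // Whether the limit is 512 or 10,000, resident entries never exceed it.
        System.out.println(maxQueueSize(512, 1_000_000));    // prints 512
        System.out.println(maxQueueSize(10_000, 1_000_000)); // prints 10000
    }
}
```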
Thx, opened an issue to track it: #43168
My thinking here is the other way around: we should increase the hard limit of the ordered group_by to match the one that we set for "normal" group_by. 10,000 seems a reasonable value IMO.
+1, I misunderstood you here. Changed the issue. Thx!
Request request = new Request("PUT", "/test/_bulk");
request.addParameter("refresh", "true");
StringBuilder bulk = new StringBuilder();
for (int i = 1; i <= 5000; i++) {
This test might be slow, right? I think you can check that a GROUP_BY on groups with a cardinality greater than the limit and the size works with fewer than 5000 documents. Is there a way to set the internal size that is used to paginate over the buckets in the sorted group_by?
It's not that slow and I used a size of 5000 to "escape" from any bugs regarding setting the fetch (defaults to 1000) that could hide this bug. If you still think it's too much I can change to something like 100, limit 20, fetch size 4.
+1, this is why I asked if you can modify the fetch size here. In such a case 100 seems enough.
@@ -248,31 +205,22 @@ private void doResponse(RowSet rowSet) {
        sendResponse();
    }

-   private boolean consumeRowSet(RowSet rowSet) {
+   private void consumeRowSet(RowSet rowSet) {
        // use a synchronized block for visibility purposes (there's no concurrency)
remove the comment now that the synchronized block is gone?
Thx @jimczi, addressed comments.
LGTM, thanks @matriv
Great work!
Left some comments.
Also, are there any tests that combine both ES ordering and client-side ordering?
@@ -40,18 +40,22 @@
    }

        // page size
-       size = limit < 0 ? buckets.size() : Math.min(buckets.size(), limit);
+       size = limit == -1 ? buckets.size() : Math.min(buckets.size(), limit);
Why wasn't the original code good?
It will also work, but we introduced `-1`, which means "no limit - need to process all buckets", and it's the only negative value we can get now. If you check here you'll see that when we want to stop we set the remaining limit to `0`. So to sum up, I changed the check to `== -1` for more clarity.
// If the limit is -1 then we have a local sorting (sort on aggregate function) that requires all the buckets
// to be processed so we stop only when all data is exhausted.
int remainingLimit = limit == -1 ? limit : limit - size >= 0 ? limit - size : 0;
Maybe add some round brackets to these nested ternary operators to make them a bit more readable?
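For reference, a parenthesized form of the nested ternary above could look like the sketch below (the class and method are hypothetical wrappers around the quoted expression; `-1` means "no limit, process all buckets", otherwise the remaining limit is clamped at zero):

```java
public class RemainingLimitExample {
    // Mirrors the quoted expression, with brackets making precedence explicit.
    static int remainingLimit(int limit, int size) {
        return (limit == -1) ? limit : ((limit - size >= 0) ? (limit - size) : 0);
    }

    public static void main(String[] args) {
        System.out.println(remainingLimit(-1, 100)); // prints -1: keep processing everything
        System.out.println(remainingLimit(10, 4));   // prints 6: rows still allowed
        System.out.println(remainingLimit(10, 15));  // prints 0: limit exhausted
    }
}
```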
for (boolean hasRows = rrs.hasCurrentRow(); hasRows; hasRows = rrs.advanceRow()) {
    List<Object> row = new ArrayList<>(rrs.columnCount());
    rrs.forEachResultColumn(row::add);
    // if the queue overflows and no limit was specified, bail out
It seems this comment is not valid anymore, since you removed the `return false;` exiting statement.
It's valid, since we throw an exception, so I'll just rephrase.
You're right, I didn't look properly at the `onFailure` method. Please ignore.
// if the queue overflows and no limit was specified, bail out
if (data.insertWithOverflow(new Tuple<>(row, counter.getAndIncrement())) != null && noLimit) {
    onFailure(new SqlIllegalArgumentException(
        "The default limit [{}] for aggregate sorting has been reached; please specify a LIMIT", data.size()));
`The default limit [{}] for aggregate...` - why is this `data.size()` and not `MAXIMUM_SIZE`?
It's the same actually: since we overflow, the size of the queue (initially set to MAXIMUM_SIZE) has been reached. Will change that though, for more clarity, thx!
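For readers following the thread, the overflow check can be sketched with a plain bounded queue (illustrative only; the real code uses a different PriorityQueue implementation whose `insertWithOverflow` returns the evicted element, and the 512 here is the assumed default hard limit):

```java
import java.util.PriorityQueue;

public class BoundedQueueSketch {
    // Returns the index of the first insert that overflows a queue capped at
    // maxSize, mimicking insertWithOverflow semantics: once the queue is full,
    // each insert evicts and returns the smallest element instead of null.
    static int firstOverflowIndex(int maxSize, int inserts) {
        PriorityQueue<Integer> data = new PriorityQueue<>();
        for (int i = 0; i < inserts; i++) {
            data.add(i);
            if (data.size() > maxSize) {
                data.poll();  // evict the smallest, like insertWithOverflow
                return i;     // overflow: with no explicit LIMIT, the fix fails here
            }
        }
        return -1;            // never overflowed
    }

    public static void main(String[] args) {
        // With a hard limit of 512, the 513th insert (index 512) overflows,
        // which is the point where the queue size equals MAXIMUM_SIZE.
        System.out.println(firstOverflowIndex(512, 600)); // prints 512
    }
}
```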
ResultRowSet<?> rrs = (ResultRowSet<?>) rowSet;
synchronized (data) {
Why remove the `synchronized`?
There's no concurrency here; `synchronized` would add locking overhead without need.
}

@SuppressWarnings("rawtypes")
public void testAggSorting_TwoFields() {
Would it be difficult to add a more randomized set of tests? For example, 3 comparators with random ASC/DESC ordering and a more-or-less random short set of int values (random between 0 and 1 for all three values).
For example:
1,2,3,4,5,6,7
0,0,1,0,1,1,0 ASC
0,0,0,0,1,1,0 ASC
0,1,1,0,0,1,1 DESC
Would yield:
2 (0,0,1)
7 (0,0,1)
1 (0,0,0)
4 (0,0,0)
3 (1,0,1)
6 (1,1,1)
5 (1,1,0)
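The worked example above can be checked with a small standalone sketch (plain Java with hypothetical names; the real test uses the project's own comparators and queue). Each row gets three column values, the comparator walks the columns with per-column ASC/DESC flags, and ties keep their original order thanks to the stable sort:

```java
import java.util.ArrayList;
import java.util.List;

public class MultiColumnSortExample {
    // ordering[j] == true means ASC on column j, false means DESC.
    static List<Integer> sortIds(int[][] rows, boolean[] ordering) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < rows.length; i++) {
            ids.add(i + 1); // 1-based row ids, as in the example above
        }
        ids.sort((a, b) -> {
            for (int j = 0; j < ordering.length; j++) {
                int v1 = rows[a - 1][j], v2 = rows[b - 1][j];
                if (v1 < v2) return ordering[j] ? -1 : 1;
                if (v1 > v2) return ordering[j] ? 1 : -1;
            }
            return 0; // equal on all columns: stable sort keeps original order
        });
        return ids;
    }

    public static void main(String[] args) {
        // Rows 1..7 with the example's per-row values; orderings (ASC, ASC, DESC).
        int[][] rows = {
            {0, 0, 0}, {0, 0, 1}, {1, 0, 1}, {0, 0, 0}, {1, 1, 0}, {1, 1, 1}, {0, 0, 1}
        };
        System.out.println(sortIds(rows, new boolean[] {true, true, false}));
        // prints [2, 7, 1, 4, 3, 6, 5], matching the expected ordering above
    }
}
```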
Thx for the suggestion! Added the test which provides much more confidence regarding implementation correctness.
@astefan Thanks for review, addressed comments.
Great test. LGTM. Left one small comment.
assertEquals(queueSize, results.size());
expected.sort((o1, o2) -> {
    for (int j = 0; j < noColumns; j++) {
        if (ordering[j]) { // asc
Can the content of the loop be changed to the following?

if (o1.get(j) < o2.get(j)) {
    return ordering[j] ? -1 : 1;
} else if (o1.get(j) > o2.get(j)) {
    return ordering[j] ? 1 : -1;
}
Sure, thx! Applied it.
Previously, when sorting on an aggregate function the bucket
processing ended early when the explicit (LIMIT XXX) or the implicit
limit of 512 was reached. As a consequence, only a subset of the grouping
buckets was processed and the results returned didn't reflect the global
ordering.

Previously, the priority queue sorting method had an inverse
comparison check and the final response from the priority queue was also
returned in reverse order because of the calls to the `pop()` method.

Fixes: #42851
(cherry picked from commit 19909ed)