Skip to content

Commit

Permalink
SQL: Fix wrong results when sorting on aggregate (#43154)
Browse files Browse the repository at this point in the history
- Previously, when shorting on an aggregate function the bucket
processing ended early when the explicit (LIMIT XXX) or the impliciti
limit of 512 was reached. As a consequence, only a set of grouping
buckets was processed and the results returned didn't reflect the global
ordering.

- Previously, the priority queue shorting method had an inverse
comparison check and the final response from the priority queue was also
returned in the inversed order because of the calls to the `pop()`
method.

Fixes: #42851

(cherry picked from commit 19909ed)
  • Loading branch information
matriv committed Jun 13, 2019
1 parent c5acce0 commit 75f5546
Show file tree
Hide file tree
Showing 17 changed files with 295 additions and 168 deletions.
Expand Up @@ -32,7 +32,7 @@ public JdbcCsvSpecIT(String fileName, String groupName, String testName, Integer
@Override
protected int fetchSize() {
// using a smaller fetchSize for nested documents' tests to uncover bugs
// similar with https://github.com/elastic/elasticsearch/issues/35176 quicker
// similar to https://github.com/elastic/elasticsearch/issues/35176 quicker
return fileName.startsWith("nested") && randomBoolean() ? randomIntBetween(1,5) : super.fetchSize();
}
}
Expand Up @@ -21,4 +21,5 @@ public interface ErrorsTestCase {
void testSelectGroupByScore() throws Exception;
void testSelectScoreSubField() throws Exception;
void testSelectScoreInScalar() throws Exception;
void testHardLimitForSortOnAggregate() throws Exception;
}
Expand Up @@ -56,8 +56,8 @@ protected SecurityConfig securityConfig() {
return null;
}

protected void index(String index, CheckedConsumer<XContentBuilder, IOException> body) throws IOException {
Request request = new Request("PUT", "/" + index + "/_doc/1");
protected void index(String index, int docId, CheckedConsumer<XContentBuilder, IOException> body) throws IOException {
Request request = new Request("PUT", "/" + index + "/_doc/" + docId);
request.addParameter("refresh", "true");
XContentBuilder builder = JsonXContent.contentBuilder().startObject();
body.accept(builder);
Expand All @@ -66,6 +66,10 @@ protected void index(String index, CheckedConsumer<XContentBuilder, IOException>
client().performRequest(request);
}

protected void index(String index, CheckedConsumer<XContentBuilder, IOException> body) throws IOException {
index(index, 1, body);
}

public String command(String command) throws IOException {
return cli.command(command);
}
Expand Down
Expand Up @@ -97,8 +97,15 @@ public void testSelectScoreInScalar() throws Exception {
assertEquals("line 1:12: [SCORE()] cannot be an argument to a function" + END, readLine());
}

@Override
public void testHardLimitForSortOnAggregate() throws Exception {
index("test", body -> body.field("a", 1).field("b", 2));
String commandResult = command("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 10000");
assertEquals(START + "Bad request [[3;33;22mThe maximum LIMIT for aggregate sorting is [512], received [10000]" + END,
commandResult);
}

public static void assertFoundOneProblem(String commandResult) {
assertEquals(START + "Bad request [[3;33;22mFound 1 problem(s)", commandResult);
}

}
Expand Up @@ -49,4 +49,32 @@ public void testInvalidFetchSize() throws IOException {
assertEquals(ErrorsTestCase.START + "Invalid fetch size [[3;33;22m" + Long.MAX_VALUE + ErrorsTestCase.END,
command("fetch size = " + Long.MAX_VALUE));
}

// Test for issue: https://github.com/elastic/elasticsearch/issues/42851
// Even though fetch size and limit are smaller than the noRows, all buckets
// should be processed to achieve the global ordering of the aggregate function.
public void testOrderingOnAggregate() throws IOException {
Request request = new Request("PUT", "/test/_bulk");
request.addParameter("refresh", "true");
StringBuilder bulk = new StringBuilder();
for (int i = 1; i <= 100; i++) {
bulk.append("{\"index\":{}}\n");
bulk.append("{\"a\":").append(i).append(", \"b\" : ").append(i).append("}\n");
}
request.setJsonEntity(bulk.toString());
client().performRequest(request);

assertEquals("[?1l>[?1000l[?2004lfetch size set to [90m4[0m", command("fetch size = 4"));
assertEquals("[?1l>[?1000l[?2004lfetch separator set to \"[90m -- fetch sep -- [0m\"",
command("fetch separator = \" -- fetch sep -- \""));
assertThat(command("SELECT max(b) FROM test GROUP BY a ORDER BY max(b) DESC LIMIT 20"), containsString("max(b)"));
assertThat(readLine(), containsString("----------"));
for (int i = 100; i > 80; i--) {
if (i < 100 && i % 4 == 0) {
assertThat(readLine(), containsString(" -- fetch sep -- "));
}
assertThat(readLine(), containsString(Integer.toString(i)));
}
assertEquals("", readLine());
}
}

This file was deleted.

Expand Up @@ -116,4 +116,14 @@ public void testSelectScoreInScalar() throws Exception {
assertThat(e.getMessage(), startsWith("Found 1 problem(s)\nline 1:12: [SCORE()] cannot be an argument to a function"));
}
}

@Override
public void testHardLimitForSortOnAggregate() throws Exception {
index("test", body -> body.field("a", 1).field("b", 2));
try (Connection c = esJdbc()) {
SQLException e = expectThrows(SQLException.class, () ->
c.prepareStatement("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 10000").executeQuery());
assertEquals("The maximum LIMIT for aggregate sorting is [512], received [10000]", e.getMessage());
}
}
}
Expand Up @@ -65,6 +65,13 @@ public SqlSpecTestCase(String fileName, String groupName, String testName, Integ
this.query = query;
}

@Override
protected int fetchSize() {
// using a smaller fetchSize for nested documents' tests to uncover bugs
// similar to https://github.com/elastic/elasticsearch/issues/42581
return randomIntBetween(1, 20);
}

@Override
protected final void doTest() throws Throwable {
// we skip the tests in case of these locales because ES-SQL is Locale-insensitive for now
Expand Down
Expand Up @@ -6,7 +6,6 @@
package org.elasticsearch.xpack.sql.qa.rest;

import com.fasterxml.jackson.core.io.JsonStringEncoder;

import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
Expand Down Expand Up @@ -314,7 +313,14 @@ public void testSelectScoreInScalar() throws Exception {
expectBadRequest(() -> runSql(randomMode(), "SELECT SIN(SCORE()) FROM test"),
containsString("line 1:12: [SCORE()] cannot be an argument to a function"));
}


@Override
public void testHardLimitForSortOnAggregate() throws Exception {
index("{\"a\": 1, \"b\": 2}");
expectBadRequest(() -> runSql(randomMode(), "SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 10000"),
containsString("The maximum LIMIT for aggregate sorting is [512], received [10000]"));
}

public void testUseColumnarForUnsupportedFormats() throws Exception {
String format = randomFrom("txt", "csv", "tsv");
index("{\"foo\":1}");
Expand Down
6 changes: 3 additions & 3 deletions x-pack/plugin/sql/qa/src/main/resources/agg-ordering.sql-spec
Expand Up @@ -99,13 +99,13 @@ aggNotSpecifiedWithHavingOnLargeGroupBy
SELECT MAX(salary) AS max FROM test_emp GROUP BY emp_no HAVING AVG(salary) > 1000 ORDER BY MIN(salary);

aggWithTieBreakerDescAsc
SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MIN(languages) DESC NULLS FIRST, emp_no ASC;
SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MIN(languages) DESC NULLS FIRST, emp_no ASC LIMIT 50;

aggWithTieBreakerDescDesc
SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MIN(languages) DESC NULLS FIRST, emp_no DESC;
SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MIN(languages) DESC NULLS FIRST, emp_no DESC LIMIT 50;

aggWithTieBreakerAscDesc
SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MAX(languages) ASC NULLS FIRST, emp_no DESC;
SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MAX(languages) ASC NULLS FIRST, emp_no DESC LIMIT 50;

aggWithMixOfOrdinals
SELECT gender AS g, MAX(salary) AS m FROM test_emp GROUP BY gender ORDER BY 2 DESC LIMIT 3;
Expand Down
Expand Up @@ -133,8 +133,9 @@ public void onResponse(SearchResponse r) {
return;
}

updateCompositeAfterKey(r, query);
CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, mask, r, limit, serializeQuery(query), indices);
boolean hasAfterKey = updateCompositeAfterKey(r, query);
CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, mask, r, limit,
hasAfterKey ? serializeQuery(query) : null, indices);
listener.onResponse(rowSet);
} catch (Exception ex) {
listener.onFailure(ex);
Expand Down Expand Up @@ -167,7 +168,7 @@ static CompositeAggregation getComposite(SearchResponse response) {
throw new SqlIllegalArgumentException("Unrecognized root group found; {}", agg.getClass());
}

static void updateCompositeAfterKey(SearchResponse r, SearchSourceBuilder next) {
static boolean updateCompositeAfterKey(SearchResponse r, SearchSourceBuilder next) {
CompositeAggregation composite = getComposite(r);

if (composite == null) {
Expand All @@ -176,22 +177,25 @@ static void updateCompositeAfterKey(SearchResponse r, SearchSourceBuilder next)

Map<String, Object> afterKey = composite.afterKey();
// a null after-key means done
if (afterKey != null) {
AggregationBuilder aggBuilder = next.aggregations().getAggregatorFactories().iterator().next();
// update after-key with the new value
if (aggBuilder instanceof CompositeAggregationBuilder) {
CompositeAggregationBuilder comp = (CompositeAggregationBuilder) aggBuilder;
comp.aggregateAfter(afterKey);
} else {
throw new SqlIllegalArgumentException("Invalid client request; expected a group-by but instead got {}", aggBuilder);
}
if (afterKey == null) {
return false;
}

AggregationBuilder aggBuilder = next.aggregations().getAggregatorFactories().iterator().next();
// update after-key with the new value
if (aggBuilder instanceof CompositeAggregationBuilder) {
CompositeAggregationBuilder comp = (CompositeAggregationBuilder) aggBuilder;
comp.aggregateAfter(afterKey);
return true;
} else {
throw new SqlIllegalArgumentException("Invalid client request; expected a group-by but instead got {}", aggBuilder);
}
}

/**
* Deserializes the search source from a byte array.
*/
static SearchSourceBuilder deserializeQuery(NamedWriteableRegistry registry, byte[] source) throws IOException {
private static SearchSourceBuilder deserializeQuery(NamedWriteableRegistry registry, byte[] source) throws IOException {
try (NamedWriteableAwareStreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(source), registry)) {
return new SearchSourceBuilder(in);
}
Expand Down
Expand Up @@ -39,18 +39,22 @@ class CompositeAggsRowSet extends ResultRowSet<BucketExtractor> {
}

// page size
size = limit < 0 ? buckets.size() : Math.min(buckets.size(), limit);
size = limit == -1 ? buckets.size() : Math.min(buckets.size(), limit);

if (next == null) {
cursor = Cursor.EMPTY;
} else {
// compute remaining limit
int remainingLimit = limit - size;
// Compute remaining limit

// If the limit is -1 then we have a local sorting (sort on aggregate function) that requires all the buckets
// to be processed so we stop only when all data is exhausted.
int remainingLimit = (limit == -1) ? limit : ((limit - size) >= 0 ? (limit - size) : 0);

// if the computed limit is zero, or the size is zero it means either there's nothing left or the limit has been reached
// note that a composite agg might be valid but return zero groups (since these can be filtered with HAVING/bucket selector)
// however the Querier takes care of that and keeps making requests until either the query is invalid or at least one response
// is returned
if (next == null || size == 0 || remainingLimit == 0) {
// is returned.
if (size == 0 || remainingLimit == 0) {
cursor = Cursor.EMPTY;
} else {
cursor = new CompositeAggregationCursor(next, exts, mask, remainingLimit, indices);
Expand Down Expand Up @@ -91,4 +95,4 @@ public int size() {
public Cursor nextPageCursor() {
return cursor;
}
}
}

0 comments on commit 75f5546

Please sign in to comment.