SQL: Fix wrong results when sorting on aggregate #43154

Merged
merged 5 commits on Jun 13, 2019
Changes from 4 commits
@@ -32,7 +32,7 @@ public JdbcCsvSpecIT(String fileName, String groupName, String testName, Integer
@Override
protected int fetchSize() {
// using a smaller fetchSize for nested documents' tests to uncover bugs
-// similar with https://github.com/elastic/elasticsearch/issues/35176 quicker
+// similar to https://github.com/elastic/elasticsearch/issues/35176 quicker
return fileName.startsWith("nested") && randomBoolean() ? randomIntBetween(1,5) : super.fetchSize();
}
}
@@ -21,4 +21,5 @@ public interface ErrorsTestCase {
void testSelectGroupByScore() throws Exception;
void testSelectScoreSubField() throws Exception;
void testSelectScoreInScalar() throws Exception;
void testHardLimitForSortOnAggregate() throws Exception;
}
@@ -56,8 +56,8 @@ protected SecurityConfig securityConfig() {
return null;
}

-protected void index(String index, CheckedConsumer<XContentBuilder, IOException> body) throws IOException {
-    Request request = new Request("PUT", "/" + index + "/_doc/1");
+protected void index(String index, int docId, CheckedConsumer<XContentBuilder, IOException> body) throws IOException {
+    Request request = new Request("PUT", "/" + index + "/_doc/" + docId);
request.addParameter("refresh", "true");
XContentBuilder builder = JsonXContent.contentBuilder().startObject();
body.accept(builder);
@@ -66,6 +66,10 @@ protected void index(String index, CheckedConsumer<XContentBuilder, IOException>
client().performRequest(request);
}

protected void index(String index, CheckedConsumer<XContentBuilder, IOException> body) throws IOException {
index(index, 1, body);
}

public String command(String command) throws IOException {
return cli.command(command);
}
@@ -97,8 +97,15 @@ public void testSelectScoreInScalar() throws Exception {
assertEquals("line 1:12: [SCORE()] cannot be an argument to a function" + END, readLine());
}

@Override
public void testHardLimitForSortOnAggregate() throws Exception {
index("test", body -> body.field("a", 1).field("b", 2));
String commandResult = command("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 10000");
assertEquals(START + "Bad request [[3;33;22mThe maximum LIMIT for aggregate sorting is [512], received [10000]" + END,
Contributor:
Unrelated to this PR, but I don't think we should treat an ordered GROUP_BY differently from a natural GROUP_BY (sorted by keys). We should use the same hard limit for both; the cost of an ordered GROUP_BY depends greatly on the cardinality of the groups and not so much on the size of the priority queue (assuming it remains reasonable ;)).

Contributor Author:

I don't think I agree here. When we don't have an ORDER BY on an aggregate, we don't need to keep this extra buffer (PriorityQueue) in memory in the SQL module; instead we have a cursor from ES to paginate the results, so we don't need to apply a hard limit. Or maybe I'm missing something?

Contributor:

The memory needed to keep the best buckets in the PriorityQueue should be identical to what ES uses when it computes the top hits, so it's only a question of where this memory is held. I understand that the priority queue in the client can increase the memory, but it shouldn't make a big difference whether the size is 512, 1,000, or even 10,000. Most of the time will be spent paginating the buckets, and a limit of 1 or 512 shouldn't make any difference.

Contributor Author:

Thx, opened an issue to track it: #43168

Contributor:

My thinking here is the other way around: we should increase the hard limit of the ordered GROUP_BY to match the one we set for the "normal" GROUP_BY. 10,000 seems a reasonable value IMO.

Contributor Author:

+1, I misunderstood you here. Changed the issue. Thx!

commandResult);
}

public static void assertFoundOneProblem(String commandResult) {
assertEquals(START + "Bad request [[3;33;22mFound 1 problem(s)", commandResult);
}

}
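To make the trade-off discussed in the thread above concrete, here is a minimal, self-contained sketch (illustrative only, not the SQL module's actual code; class and method names are made up). Sorting on an aggregate cannot lean on the composite-agg cursor, because buckets come back ordered by the GROUP BY keys, not by the aggregate value; the client must consume every bucket and keep the best `limit` entries in a bounded min-heap, which is why `LIMIT` itself has to be capped:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

final class AggregateTopN {

    /** Collects the top `limit` aggregate values across all pages of buckets. */
    static List<Double> topN(Iterable<List<Double>> pages, int limit, int hardLimit) {
        if (limit > hardLimit) {
            throw new IllegalArgumentException(
                "The maximum LIMIT for aggregate sorting is [" + hardLimit + "], received [" + limit + "]");
        }
        PriorityQueue<Double> best = new PriorityQueue<>(); // min-heap, at most `limit` entries
        for (List<Double> page : pages) {                   // every page must be consumed
            for (Double aggValue : page) {
                best.offer(aggValue);
                if (best.size() > limit) {
                    best.poll();                            // evict the current minimum
                }
            }
        }
        List<Double> result = new ArrayList<>(best);
        result.sort(Collections.reverseOrder());            // best first
        return result;
    }
}
```

Memory here is O(limit) no matter how many buckets are paged through, which is the point both reviewers converge on: the cost is dominated by paginating the buckets, and the hard limit only bounds the heap.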
@@ -49,4 +49,32 @@ public void testInvalidFetchSize() throws IOException {
assertEquals(ErrorsTestCase.START + "Invalid fetch size [[3;33;22m" + Long.MAX_VALUE + ErrorsTestCase.END,
command("fetch size = " + Long.MAX_VALUE));
}

// Test for issue: https://github.com/elastic/elasticsearch/issues/42851
// Even though the fetch size and limit are smaller than the number of rows, all buckets
// should be processed to achieve the global ordering of the aggregate function.
public void testOrderingOnAggregate() throws IOException {
Request request = new Request("PUT", "/test/_bulk");
request.addParameter("refresh", "true");
StringBuilder bulk = new StringBuilder();
for (int i = 1; i <= 100; i++) {
bulk.append("{\"index\":{}}\n");
bulk.append("{\"a\":").append(i).append(", \"b\" : ").append(i).append("}\n");
}
request.setJsonEntity(bulk.toString());
client().performRequest(request);

assertEquals("[?1l>[?1000l[?2004lfetch size set to [90m4[0m", command("fetch size = 4"));
assertEquals("[?1l>[?1000l[?2004lfetch separator set to \"[90m -- fetch sep -- [0m\"",
command("fetch separator = \" -- fetch sep -- \""));
assertThat(command("SELECT max(b) FROM test GROUP BY a ORDER BY max(b) DESC LIMIT 20"), containsString("max(b)"));
assertThat(readLine(), containsString("----------"));
for (int i = 100; i > 80; i--) {
if (i < 100 && i % 4 == 0) {
assertThat(readLine(), containsString(" -- fetch sep -- "));
}
assertThat(readLine(), containsString(Integer.toString(i)));
}
assertEquals("", readLine());
}
}

This file was deleted.

@@ -116,4 +116,14 @@ public void testSelectScoreInScalar() throws Exception {
assertThat(e.getMessage(), startsWith("Found 1 problem(s)\nline 1:12: [SCORE()] cannot be an argument to a function"));
}
}

@Override
public void testHardLimitForSortOnAggregate() throws Exception {
index("test", body -> body.field("a", 1).field("b", 2));
try (Connection c = esJdbc()) {
SQLException e = expectThrows(SQLException.class, () ->
c.prepareStatement("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 10000").executeQuery());
assertEquals("The maximum LIMIT for aggregate sorting is [512], received [10000]", e.getMessage());
}
}
}
@@ -65,6 +65,13 @@ public SqlSpecTestCase(String fileName, String groupName, String testName, Integ
this.query = query;
}

@Override
protected int fetchSize() {
// using a smaller fetchSize for nested documents' tests to uncover bugs
// similar to https://github.com/elastic/elasticsearch/issues/42581
return randomIntBetween(1, 20);
}

@Override
protected final void doTest() throws Throwable {
// we skip the tests in case of these locales because ES-SQL is Locale-insensitive for now
@@ -6,7 +6,6 @@
package org.elasticsearch.xpack.sql.qa.rest;

import com.fasterxml.jackson.core.io.JsonStringEncoder;

import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
@@ -314,7 +313,14 @@ public void testSelectScoreInScalar() throws Exception {
expectBadRequest(() -> runSql(randomMode(), "SELECT SIN(SCORE()) FROM test"),
containsString("line 1:12: [SCORE()] cannot be an argument to a function"));
}


@Override
public void testHardLimitForSortOnAggregate() throws Exception {
index("{\"a\": 1, \"b\": 2}");
expectBadRequest(() -> runSql(randomMode(), "SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 10000"),
containsString("The maximum LIMIT for aggregate sorting is [512], received [10000]"));
}

public void testUseColumnarForUnsupportedFormats() throws Exception {
String format = randomFrom("txt", "csv", "tsv");
index("{\"foo\":1}");
x-pack/plugin/sql/qa/src/main/resources/agg-ordering.sql-spec (6 changes: 3 additions & 3 deletions)
@@ -99,13 +99,13 @@ aggNotSpecifiedWithHavingOnLargeGroupBy
SELECT MAX(salary) AS max FROM test_emp GROUP BY emp_no HAVING AVG(salary) > 1000 ORDER BY MIN(salary);

aggWithTieBreakerDescAsc
-SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MIN(languages) DESC NULLS FIRST, emp_no ASC;
+SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MIN(languages) DESC NULLS FIRST, emp_no ASC LIMIT 50;

aggWithTieBreakerDescDesc
-SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MIN(languages) DESC NULLS FIRST, emp_no DESC;
+SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MIN(languages) DESC NULLS FIRST, emp_no DESC LIMIT 50;

aggWithTieBreakerAscDesc
-SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MAX(languages) ASC NULLS FIRST, emp_no DESC;
+SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MAX(languages) ASC NULLS FIRST, emp_no DESC LIMIT 50;

aggWithMixOfOrdinals
SELECT gender AS g, MAX(salary) AS m FROM test_emp GROUP BY gender ORDER BY 2 DESC LIMIT 3;
@@ -132,7 +132,7 @@ public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry re

SearchRequest search = Querier.prepareRequest(client, query, cfg.pageTimeout(), includeFrozen, indices);

-client.search(search, new ActionListener<SearchResponse>() {
+client.search(search, new ActionListener<>() {
@Override
public void onResponse(SearchResponse r) {
try {
@@ -143,9 +143,9 @@ public void onResponse(SearchResponse r) {
return;
}

-updateCompositeAfterKey(r, query);
-CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, mask, r, limit, serializeQuery(query), includeFrozen,
-    indices);
+boolean hasAfterKey = updateCompositeAfterKey(r, query);
+CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, mask, r, limit,
+    hasAfterKey ? serializeQuery(query) : null, includeFrozen, indices);
listener.onResponse(rowSet);
} catch (Exception ex) {
listener.onFailure(ex);
@@ -178,7 +178,7 @@ static CompositeAggregation getComposite(SearchResponse response) {
throw new SqlIllegalArgumentException("Unrecognized root group found; {}", agg.getClass());
}

-static void updateCompositeAfterKey(SearchResponse r, SearchSourceBuilder next) {
+static boolean updateCompositeAfterKey(SearchResponse r, SearchSourceBuilder next) {
CompositeAggregation composite = getComposite(r);

if (composite == null) {
@@ -187,22 +187,25 @@ static void updateCompositeAfterKey(SearchResponse r, SearchSourceBuilder next)

Map<String, Object> afterKey = composite.afterKey();
// a null after-key means done
-if (afterKey != null) {
-AggregationBuilder aggBuilder = next.aggregations().getAggregatorFactories().iterator().next();
-// update after-key with the new value
-if (aggBuilder instanceof CompositeAggregationBuilder) {
-CompositeAggregationBuilder comp = (CompositeAggregationBuilder) aggBuilder;
-comp.aggregateAfter(afterKey);
-} else {
-throw new SqlIllegalArgumentException("Invalid client request; expected a group-by but instead got {}", aggBuilder);
-}
+if (afterKey == null) {
+return false;
+}
+
+AggregationBuilder aggBuilder = next.aggregations().getAggregatorFactories().iterator().next();
+// update after-key with the new value
+if (aggBuilder instanceof CompositeAggregationBuilder) {
+CompositeAggregationBuilder comp = (CompositeAggregationBuilder) aggBuilder;
+comp.aggregateAfter(afterKey);
+return true;
+} else {
+throw new SqlIllegalArgumentException("Invalid client request; expected a group-by but instead got {}", aggBuilder);
+}
}

/**
* Deserializes the search source from a byte array.
*/
-static SearchSourceBuilder deserializeQuery(NamedWriteableRegistry registry, byte[] source) throws IOException {
+private static SearchSourceBuilder deserializeQuery(NamedWriteableRegistry registry, byte[] source) throws IOException {
try (NamedWriteableAwareStreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(source), registry)) {
return new SearchSourceBuilder(in);
}
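For context, a hedged sketch of the after-key handshake from a client's point of view (assuming the high-level REST client and a caller-provided composite builder; this is not code from the PR). Each page's after-key is copied into the next request, and a null after-key is the termination signal, which is exactly the condition updateCompositeAfterKey now reports back so that no next-page cursor is serialized for the last page:

```java
import java.io.IOException;
import java.util.Map;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;

final class CompositePager {

    /**
     * Pages through a composite aggregation. request.source() is assumed to
     * already contain `composite`; mutating the builder repositions the next call.
     */
    static void consumeAllPages(RestHighLevelClient client, SearchRequest request,
                                CompositeAggregationBuilder composite) throws IOException {
        while (true) {
            SearchResponse response = client.search(request, RequestOptions.DEFAULT);
            CompositeAggregation agg = response.getAggregations().get(composite.getName());
            // ... extract rows from agg.getBuckets() here ...
            Map<String, Object> afterKey = agg.afterKey();
            if (afterKey == null) {
                return; // last page: no after-key, so no next-page cursor to build
            }
            composite.aggregateAfter(afterKey); // position the next request after this page
        }
    }
}
```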
@@ -28,8 +28,8 @@ class CompositeAggsRowSet extends ResultRowSet<BucketExtractor> {
private final int size;
private int row = 0;

-CompositeAggsRowSet(List<BucketExtractor> exts, BitSet mask, SearchResponse response, int limit, byte[] next,
-    boolean includeFrozen, String... indices) {
+CompositeAggsRowSet(List<BucketExtractor> exts, BitSet mask, SearchResponse response,
+    int limit, byte[] next, boolean includeFrozen, String... indices) {
super(exts, mask);

CompositeAggregation composite = CompositeAggregationCursor.getComposite(response);
@@ -40,18 +40,22 @@ class CompositeAggsRowSet extends ResultRowSet<BucketExtractor> {
}

// page size
-size = limit < 0 ? buckets.size() : Math.min(buckets.size(), limit);
+size = limit == -1 ? buckets.size() : Math.min(buckets.size(), limit);
Contributor:
Why wasn't the original code good?

Contributor Author:

It would also work, but we introduced -1, which means "no limit - need to process all buckets", and it's the only negative value we can get now. If you check here you'll see that when we want to stop, we set the remaining limit to 0. So, to sum up, I changed the check to == -1 for clarity.


+if (next == null) {
+cursor = Cursor.EMPTY;
+} else {
-// compute remaining limit
-int remainingLimit = limit - size;
+// Compute remaining limit
+
+// If the limit is -1 then we have a local sorting (sort on aggregate function) that requires all the buckets
+// to be processed so we stop only when all data is exhausted.
+int remainingLimit = (limit == -1) ? limit : ((limit - size) >= 0 ? (limit - size) : 0);
+
// if the computed limit is zero, or the size is zero it means either there's nothing left or the limit has been reached
// note that a composite agg might be valid but return zero groups (since these can be filtered with HAVING/bucket selector)
// however the Querier takes care of that and keeps making requests until either the query is invalid or at least one response
-// is returned
-if (next == null || size == 0 || remainingLimit == 0) {
+// is returned.
+if (size == 0 || remainingLimit == 0) {
cursor = Cursor.EMPTY;
} else {
cursor = new CompositeAggregationCursor(next, exts, mask, remainingLimit, includeFrozen, indices);
@@ -92,4 +96,4 @@ public int size() {
public Cursor nextPageCursor() {
return cursor;
}
-}
+}
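A compact restatement of the paging arithmetic above (a sketch; the Math.max form is assumed equivalent to the nested ternary, and the class name is made up): -1 is the "no limit" sentinel for local sorting on an aggregate and is carried across pages unchanged, while a real limit counts down to 0, at which point the cursor becomes Cursor.EMPTY.

```java
final class RemainingLimitDemo {

    // Mirrors: (limit == -1) ? limit : ((limit - size) >= 0 ? (limit - size) : 0)
    static int remaining(int limit, int size) {
        return limit == -1 ? -1 : Math.max(limit - size, 0);
    }

    public static void main(String[] args) {
        System.out.println(remaining(10, 4)); // 6  -> keep paging, 6 rows still wanted
        System.out.println(remaining(2, 4));  // 0  -> limit reached, next cursor is EMPTY
        System.out.println(remaining(-1, 4)); // -1 -> sentinel survives: page until buckets are exhausted
    }
}
```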