From 75f5546a7323fb79300df8510ca20bdb11604b50 Mon Sep 17 00:00:00 2001 From: Marios Trivyzas Date: Thu, 13 Jun 2019 21:21:32 +0200 Subject: [PATCH] SQL: Fix wrong results when sorting on aggregate (#43154) - Previously, when shorting on an aggregate function the bucket processing ended early when the explicit (LIMIT XXX) or the impliciti limit of 512 was reached. As a consequence, only a set of grouping buckets was processed and the results returned didn't reflect the global ordering. - Previously, the priority queue shorting method had an inverse comparison check and the final response from the priority queue was also returned in the inversed order because of the calls to the `pop()` method. Fixes: #42851 (cherry picked from commit 19909edcfdf5792b38c1363b07379783ebd0e6c4) --- .../sql/qa/single_node/JdbcCsvSpecIT.java | 2 +- .../xpack/sql/qa/ErrorsTestCase.java | 1 + .../sql/qa/cli/CliIntegrationTestCase.java | 8 +- .../xpack/sql/qa/cli/ErrorsTestCase.java | 9 +- .../xpack/sql/qa/cli/FetchSizeTestCase.java | 28 ++++ .../xpack/sql/qa/jdbc/DebugCsvSpec.java | 64 -------- .../xpack/sql/qa/jdbc/ErrorsTestCase.java | 10 ++ .../xpack/sql/qa/jdbc/SqlSpecTestCase.java | 7 + .../xpack/sql/qa/rest/RestSqlTestCase.java | 10 +- .../src/main/resources/agg-ordering.sql-spec | 6 +- .../search/CompositeAggregationCursor.java | 30 ++-- .../execution/search/CompositeAggsRowSet.java | 16 +- .../xpack/sql/execution/search/Querier.java | 149 +++++++++--------- .../search/SchemaCompositeAggsRowSet.java | 2 +- .../sql/execution/search/SourceGenerator.java | 10 +- .../CompositeAggregationCursorTests.java | 2 +- .../sql/execution/search/QuerierTests.java | 109 +++++++++++++ 17 files changed, 295 insertions(+), 168 deletions(-) delete mode 100644 x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/DebugCsvSpec.java create mode 100644 x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/QuerierTests.java diff --git a/x-pack/plugin/sql/qa/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/JdbcCsvSpecIT.java b/x-pack/plugin/sql/qa/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/JdbcCsvSpecIT.java index f742b1304a79e..135b3ed57223f 100644 --- a/x-pack/plugin/sql/qa/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/JdbcCsvSpecIT.java +++ b/x-pack/plugin/sql/qa/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/JdbcCsvSpecIT.java @@ -32,7 +32,7 @@ public JdbcCsvSpecIT(String fileName, String groupName, String testName, Integer @Override protected int fetchSize() { // using a smaller fetchSize for nested documents' tests to uncover bugs - // similar with https://github.com/elastic/elasticsearch/issues/35176 quicker + // similar to https://github.com/elastic/elasticsearch/issues/35176 quicker return fileName.startsWith("nested") && randomBoolean() ? randomIntBetween(1,5) : super.fetchSize(); } } diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/ErrorsTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/ErrorsTestCase.java index 5e3b034d75708..c5ae7f63ad06d 100644 --- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/ErrorsTestCase.java +++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/ErrorsTestCase.java @@ -21,4 +21,5 @@ public interface ErrorsTestCase { void testSelectGroupByScore() throws Exception; void testSelectScoreSubField() throws Exception; void testSelectScoreInScalar() throws Exception; + void testHardLimitForSortOnAggregate() throws Exception; } diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/CliIntegrationTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/CliIntegrationTestCase.java index e7a73cd12d524..cf221bbc14012 100644 --- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/CliIntegrationTestCase.java +++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/CliIntegrationTestCase.java @@ -56,8 +56,8 @@ protected SecurityConfig securityConfig() { return null; } - protected void index(String index, CheckedConsumer body) throws IOException { - Request request = new Request("PUT", "/" + index + "/_doc/1"); + protected void index(String index, int docId, CheckedConsumer body) throws IOException { + Request request = new Request("PUT", "/" + index + "/_doc/" + docId); request.addParameter("refresh", "true"); XContentBuilder builder = JsonXContent.contentBuilder().startObject(); body.accept(builder); @@ -66,6 +66,10 @@ protected void index(String index, CheckedConsumer client().performRequest(request); } + protected void index(String index, CheckedConsumer body) throws IOException { + index(index, 1, body); + } + public String command(String command) throws IOException { return cli.command(command); } diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/ErrorsTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/ErrorsTestCase.java index ca251a31844c0..a3ad325d0acec 100644 --- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/ErrorsTestCase.java +++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/ErrorsTestCase.java @@ -97,8 +97,15 @@ public void testSelectScoreInScalar() throws Exception { assertEquals("line 1:12: [SCORE()] cannot be an argument to a function" + END, readLine()); } + @Override + public void testHardLimitForSortOnAggregate() throws Exception { + index("test", body -> body.field("a", 1).field("b", 2)); + String commandResult = command("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 10000"); + assertEquals(START + "Bad request [[3;33;22mThe maximum LIMIT for aggregate sorting is [512], received [10000]" + END, + commandResult); + } + public static void assertFoundOneProblem(String commandResult) { assertEquals(START + "Bad request [[3;33;22mFound 1 problem(s)", commandResult); } - } diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/FetchSizeTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/FetchSizeTestCase.java index 84f74bcbac137..02de2dff4f7d6 100644 --- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/FetchSizeTestCase.java +++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/FetchSizeTestCase.java @@ -49,4 +49,32 @@ public void testInvalidFetchSize() throws IOException { assertEquals(ErrorsTestCase.START + "Invalid fetch size [[3;33;22m" + Long.MAX_VALUE + ErrorsTestCase.END, command("fetch size = " + Long.MAX_VALUE)); } + + // Test for issue: https://github.com/elastic/elasticsearch/issues/42851 + // Even though fetch size and limit are smaller than the noRows, all buckets + // should be processed to achieve the global ordering of the aggregate function. + public void testOrderingOnAggregate() throws IOException { + Request request = new Request("PUT", "/test/_bulk"); + request.addParameter("refresh", "true"); + StringBuilder bulk = new StringBuilder(); + for (int i = 1; i <= 100; i++) { + bulk.append("{\"index\":{}}\n"); + bulk.append("{\"a\":").append(i).append(", \"b\" : ").append(i).append("}\n"); + } + request.setJsonEntity(bulk.toString()); + client().performRequest(request); + + assertEquals("[?1l>[?1000l[?2004lfetch size set to [90m4[0m", command("fetch size = 4")); + assertEquals("[?1l>[?1000l[?2004lfetch separator set to \"[90m -- fetch sep -- [0m\"", + command("fetch separator = \" -- fetch sep -- \"")); + assertThat(command("SELECT max(b) FROM test GROUP BY a ORDER BY max(b) DESC LIMIT 20"), containsString("max(b)")); + assertThat(readLine(), containsString("----------")); + for (int i = 100; i > 80; i--) { + if (i < 100 && i % 4 == 0) { + assertThat(readLine(), containsString(" -- fetch sep -- ")); + } + assertThat(readLine(), containsString(Integer.toString(i))); + } + assertEquals("", readLine()); + } } diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/DebugCsvSpec.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/DebugCsvSpec.java deleted file mode 100644 index d5a633e5ea388..0000000000000 --- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/DebugCsvSpec.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.sql.qa.jdbc; - -import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; - -import org.apache.logging.log4j.Logger; -import org.elasticsearch.test.junit.annotations.TestLogging; -import org.elasticsearch.xpack.sql.qa.jdbc.CsvTestUtils.CsvTestCase; - -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.List; - -import static org.elasticsearch.xpack.sql.qa.jdbc.CsvTestUtils.csvConnection; -import static org.elasticsearch.xpack.sql.qa.jdbc.CsvTestUtils.executeCsvQuery; -import static org.elasticsearch.xpack.sql.qa.jdbc.CsvTestUtils.specParser; - -@TestLogging("org.elasticsearch.xpack.sql:TRACE") -public abstract class DebugCsvSpec extends SpecBaseIntegrationTestCase { - private final CsvTestCase testCase; - - @ParametersFactory(shuffle = false, argumentFormatting = SqlSpecTestCase.PARAM_FORMATTING) - public static List readScriptSpec() throws Exception { - Parser parser = specParser(); - return readScriptSpec("/debug.csv-spec", parser); - } - - public DebugCsvSpec(String fileName, String groupName, String testName, Integer lineNumber, CsvTestCase testCase) { - super(fileName, groupName, testName, lineNumber); - this.testCase = testCase; - } - - @Override - protected void assertResults(ResultSet expected, ResultSet elastic) throws SQLException { - Logger log = logEsResultSet() ? logger : null; - - // - // uncomment this to printout the result set and create new CSV tests - // - JdbcTestUtils.logResultSetMetadata(elastic, log); - JdbcTestUtils.logResultSetData(elastic, log); - //JdbcAssert.assertResultSets(expected, elastic, log); - } - - @Override - protected boolean logEsResultSet() { - return true; - } - - @Override - protected final void doTest() throws Throwable { - try (Connection csv = csvConnection(testCase); Connection es = esJdbc()) { - // pass the testName as table for debugging purposes (in case the underlying reader is missing) - ResultSet expected = executeCsvQuery(csv, testName); - ResultSet elasticResults = executeJdbcQuery(es, testCase.query); - assertResults(expected, elasticResults); - } - } -} \ No newline at end of file diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/ErrorsTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/ErrorsTestCase.java index be3ba3d096ae2..6f12963634fdb 100644 --- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/ErrorsTestCase.java +++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/ErrorsTestCase.java @@ -116,4 +116,14 @@ public void testSelectScoreInScalar() throws Exception { assertThat(e.getMessage(), startsWith("Found 1 problem(s)\nline 1:12: [SCORE()] cannot be an argument to a function")); } } + + @Override + public void testHardLimitForSortOnAggregate() throws Exception { + index("test", body -> body.field("a", 1).field("b", 2)); + try (Connection c = esJdbc()) { + SQLException e = expectThrows(SQLException.class, () -> + c.prepareStatement("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 10000").executeQuery()); + assertEquals("The maximum LIMIT for aggregate sorting is [512], received [10000]", e.getMessage()); + } + } } diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/SqlSpecTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/SqlSpecTestCase.java index ef01dc1fca11e..cfbec77a3e636 100644 --- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/SqlSpecTestCase.java +++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/SqlSpecTestCase.java @@ -65,6 +65,13 @@ public SqlSpecTestCase(String fileName, String groupName, String testName, Integ this.query = query; } + @Override + protected int fetchSize() { + // using a smaller fetchSize for nested documents' tests to uncover bugs + // similar to https://github.com/elastic/elasticsearch/issues/42581 + return randomIntBetween(1, 20); + } + @Override protected final void doTest() throws Throwable { // we skip the tests in case of these locales because ES-SQL is Locale-insensitive for now diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java index c88f31bb2fd71..5a16261bfbbd7 100644 --- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java +++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.sql.qa.rest; import com.fasterxml.jackson.core.io.JsonStringEncoder; - import org.apache.http.HttpEntity; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; @@ -314,7 +313,14 @@ public void testSelectScoreInScalar() throws Exception { expectBadRequest(() -> runSql(randomMode(), "SELECT SIN(SCORE()) FROM test"), containsString("line 1:12: [SCORE()] cannot be an argument to a function")); } - + + @Override + public void testHardLimitForSortOnAggregate() throws Exception { + index("{\"a\": 1, \"b\": 2}"); + expectBadRequest(() -> runSql(randomMode(), "SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 10000"), + containsString("The maximum LIMIT for aggregate sorting is [512], received [10000]")); + } + public void testUseColumnarForUnsupportedFormats() throws Exception { String format = randomFrom("txt", "csv", "tsv"); index("{\"foo\":1}"); diff --git a/x-pack/plugin/sql/qa/src/main/resources/agg-ordering.sql-spec b/x-pack/plugin/sql/qa/src/main/resources/agg-ordering.sql-spec index 79d58c48e4469..9a193d76b3166 100644 --- a/x-pack/plugin/sql/qa/src/main/resources/agg-ordering.sql-spec +++ b/x-pack/plugin/sql/qa/src/main/resources/agg-ordering.sql-spec @@ -99,13 +99,13 @@ aggNotSpecifiedWithHavingOnLargeGroupBy SELECT MAX(salary) AS max FROM test_emp GROUP BY emp_no HAVING AVG(salary) > 1000 ORDER BY MIN(salary); aggWithTieBreakerDescAsc -SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MIN(languages) DESC NULLS FIRST, emp_no ASC; +SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MIN(languages) DESC NULLS FIRST, emp_no ASC LIMIT 50; aggWithTieBreakerDescDesc -SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MIN(languages) DESC NULLS FIRST, emp_no DESC; +SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MIN(languages) DESC NULLS FIRST, emp_no DESC LIMIT 50; aggWithTieBreakerAscDesc -SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MAX(languages) ASC NULLS FIRST, emp_no DESC; +SELECT emp_no, MIN(languages) AS min FROM test_emp GROUP BY emp_no ORDER BY MAX(languages) ASC NULLS FIRST, emp_no DESC LIMIT 50; aggWithMixOfOrdinals SELECT gender AS g, MAX(salary) AS m FROM test_emp GROUP BY gender ORDER BY 2 DESC LIMIT 3; diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java index b09e98d11c17d..9adf45540f3c9 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java @@ -133,8 +133,9 @@ public void onResponse(SearchResponse r) { return; } - updateCompositeAfterKey(r, query); - CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, mask, r, limit, serializeQuery(query), indices); + boolean hasAfterKey = updateCompositeAfterKey(r, query); + CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, mask, r, limit, + hasAfterKey ? serializeQuery(query) : null, indices); listener.onResponse(rowSet); } catch (Exception ex) { listener.onFailure(ex); @@ -167,7 +168,7 @@ static CompositeAggregation getComposite(SearchResponse response) { throw new SqlIllegalArgumentException("Unrecognized root group found; {}", agg.getClass()); } - static void updateCompositeAfterKey(SearchResponse r, SearchSourceBuilder next) { + static boolean updateCompositeAfterKey(SearchResponse r, SearchSourceBuilder next) { CompositeAggregation composite = getComposite(r); if (composite == null) { @@ -176,22 +177,25 @@ static void updateCompositeAfterKey(SearchResponse r, SearchSourceBuilder next) Map afterKey = composite.afterKey(); // a null after-key means done - if (afterKey != null) { - AggregationBuilder aggBuilder = next.aggregations().getAggregatorFactories().iterator().next(); - // update after-key with the new value - if (aggBuilder instanceof CompositeAggregationBuilder) { - CompositeAggregationBuilder comp = (CompositeAggregationBuilder) aggBuilder; - comp.aggregateAfter(afterKey); - } else { - throw new SqlIllegalArgumentException("Invalid client request; expected a group-by but instead got {}", aggBuilder); - } + if (afterKey == null) { + return false; + } + + AggregationBuilder aggBuilder = next.aggregations().getAggregatorFactories().iterator().next(); + // update after-key with the new value + if (aggBuilder instanceof CompositeAggregationBuilder) { + CompositeAggregationBuilder comp = (CompositeAggregationBuilder) aggBuilder; + comp.aggregateAfter(afterKey); + return true; + } else { + throw new SqlIllegalArgumentException("Invalid client request; expected a group-by but instead got {}", aggBuilder); } } /** * Deserializes the search source from a byte array. */ - static SearchSourceBuilder deserializeQuery(NamedWriteableRegistry registry, byte[] source) throws IOException { + private static SearchSourceBuilder deserializeQuery(NamedWriteableRegistry registry, byte[] source) throws IOException { try (NamedWriteableAwareStreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(source), registry)) { return new SearchSourceBuilder(in); } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggsRowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggsRowSet.java index fbbc839fe1c76..79676b38c4b33 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggsRowSet.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggsRowSet.java @@ -39,18 +39,22 @@ class CompositeAggsRowSet extends ResultRowSet { } // page size - size = limit < 0 ? buckets.size() : Math.min(buckets.size(), limit); + size = limit == -1 ? buckets.size() : Math.min(buckets.size(), limit); if (next == null) { cursor = Cursor.EMPTY; } else { - // compute remaining limit - int remainingLimit = limit - size; + // Compute remaining limit + + // If the limit is -1 then we have a local sorting (sort on aggregate function) that requires all the buckets + // to be processed so we stop only when all data is exhausted. + int remainingLimit = (limit == -1) ? limit : ((limit - size) >= 0 ? (limit - size) : 0); + // if the computed limit is zero, or the size is zero it means either there's nothing left or the limit has been reached // note that a composite agg might be valid but return zero groups (since these can be filtered with HAVING/bucket selector) // however the Querier takes care of that and keeps making requests until either the query is invalid or at least one response - // is returned - if (next == null || size == 0 || remainingLimit == 0) { + // is returned. + if (size == 0 || remainingLimit == 0) { cursor = Cursor.EMPTY; } else { cursor = new CompositeAggregationCursor(next, exts, mask, remainingLimit, indices); @@ -91,4 +95,4 @@ public int size() { public Cursor nextPageCursor() { return cursor; } -} \ No newline at end of file +} diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java index fec7000a78780..b90c32c167c28 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java @@ -115,7 +115,6 @@ public void query(List output, QueryContainer query, String index, Ac listener = sortingColumns.isEmpty() ? listener : new LocalAggregationSorterListener(listener, sortingColumns, query.limit()); ActionListener l = null; - if (query.isAggsOnly()) { if (query.aggs().useImplicitGroupBy()) { l = new ImplicitGroupActionListener(listener, client, cfg, output, query, search); @@ -131,14 +130,13 @@ public void query(List output, QueryContainer query, String index, Ac } public static SearchRequest prepareRequest(Client client, SearchSourceBuilder source, TimeValue timeout, String... indices) { - SearchRequest search = client.prepareSearch(indices) + return client.prepareSearch(indices) // always track total hits accurately .setTrackTotalHits(true) .setAllowPartialSearchResults(false) .setSource(source) .setTimeout(timeout) .request(); - return search; } /** @@ -153,7 +151,7 @@ class LocalAggregationSorterListener implements ActionListener { private final ActionListener listener; // keep the top N entries. - private final PriorityQueue, Integer>> data; + private final AggSortingQueue data; private final AtomicInteger counter = new AtomicInteger(); private volatile Schema schema; @@ -169,53 +167,13 @@ class LocalAggregationSorterListener implements ActionListener { } else { noLimit = false; if (limit > MAXIMUM_SIZE) { - throw new PlanningException("The maximum LIMIT for aggregate sorting is [{}], received [{}]", limit, MAXIMUM_SIZE); + throw new PlanningException("The maximum LIMIT for aggregate sorting is [{}], received [{}]", MAXIMUM_SIZE, limit); } else { size = limit; } } - this.data = new PriorityQueue, Integer>>(size) { - - // compare row based on the received attribute sort - // if a sort item is not in the list, it is assumed the sorting happened in ES - // and the results are left as is (by using the row ordering), otherwise it is sorted based on the given criteria. - // - // Take for example ORDER BY a, x, b, y - // a, b - are sorted in ES - // x, y - need to be sorted client-side - // sorting on x kicks in, only if the values for a are equal. - - // thanks to @jpountz for the row ordering idea as a way to preserve ordering - @SuppressWarnings("unchecked") - @Override - protected boolean lessThan(Tuple, Integer> l, Tuple, Integer> r) { - for (Tuple tuple : sortingColumns) { - int i = tuple.v1().intValue(); - Comparator comparator = tuple.v2(); - - Object vl = l.v1().get(i); - Object vr = r.v1().get(i); - if (comparator != null) { - int result = comparator.compare(vl, vr); - // if things are equals, move to the next comparator - if (result != 0) { - return result < 0; - } - } - // no comparator means the existing order needs to be preserved - else { - // check the values - if they are equal move to the next comparator - // otherwise return the row order - if (Objects.equals(vl, vr) == false) { - return l.v2().compareTo(r.v2()) < 0; - } - } - } - // everything is equal, fall-back to the row order - return l.v2().compareTo(r.v2()) < 0; - } - }; + this.data = new AggSortingQueue(size, sortingColumns); } @Override @@ -226,9 +184,8 @@ public void onResponse(SchemaRowSet schemaRowSet) { private void doResponse(RowSet rowSet) { // 1. consume all pages received - if (consumeRowSet(rowSet) == false) { - return; - } + consumeRowSet(rowSet); + Cursor cursor = rowSet.nextPageCursor(); // 1a. trigger a next call if there's still data if (cursor != Cursor.EMPTY) { @@ -243,31 +200,21 @@ private void doResponse(RowSet rowSet) { sendResponse(); } - private boolean consumeRowSet(RowSet rowSet) { - // use a synchronized block for visibility purposes (there's no concurrency) + private void consumeRowSet(RowSet rowSet) { ResultRowSet rrs = (ResultRowSet) rowSet; - synchronized (data) { - for (boolean hasRows = rrs.hasCurrentRow(); hasRows; hasRows = rrs.advanceRow()) { - List row = new ArrayList<>(rrs.columnCount()); - rrs.forEachResultColumn(row::add); - // if the queue overflows and no limit was specified, bail out - if (data.insertWithOverflow(new Tuple<>(row, counter.getAndIncrement())) != null && noLimit) { - onFailure(new SqlIllegalArgumentException( - "The default limit [{}] for aggregate sorting has been reached; please specify a LIMIT")); - return false; - } + for (boolean hasRows = rrs.hasCurrentRow(); hasRows; hasRows = rrs.advanceRow()) { + List row = new ArrayList<>(rrs.columnCount()); + rrs.forEachResultColumn(row::add); + // if the queue overflows and no limit was specified, throw an error + if (data.insertWithOverflow(new Tuple<>(row, counter.getAndIncrement())) != null && noLimit) { + onFailure(new SqlIllegalArgumentException( + "The default limit [{}] for aggregate sorting has been reached; please specify a LIMIT", MAXIMUM_SIZE)); } } - return true; } private void sendResponse() { - List> list = new ArrayList<>(data.size()); - Tuple, Integer> pop = null; - while ((pop = data.pop()) != null) { - list.add(pop.v1()); - } - listener.onResponse(new PagingListRowSet(schema, list, schema.size(), cfg.pageSize())); + listener.onResponse(new PagingListRowSet(schema, data.asList(), schema.size(), cfg.pageSize())); } @Override @@ -368,7 +315,7 @@ static class CompositeActionListener extends BaseAggActionListener { @Override protected void handleResponse(SearchResponse response, ActionListener listener) { // there are some results - if (response.getAggregations().asList().size() > 0) { + if (response.getAggregations().asList().isEmpty() == false) { // retry if (CompositeAggregationCursor.shouldRetryDueToEmptyPage(response)) { @@ -378,7 +325,7 @@ protected void handleResponse(SearchResponse response, ActionListener, Integer>> { + + private List> sortingColumns; + + AggSortingQueue(int maxSize, List> sortingColumns) { + super(maxSize); + this.sortingColumns = sortingColumns; + } + + // compare row based on the received attribute sort + // if a sort item is not in the list, it is assumed the sorting happened in ES + // and the results are left as is (by using the row ordering), otherwise it is sorted based on the given criteria. + // + // Take for example ORDER BY a, x, b, y + // a, b - are sorted in ES + // x, y - need to be sorted client-side + // sorting on x kicks in, only if the values for a are equal. + + // thanks to @jpountz for the row ordering idea as a way to preserve ordering + @SuppressWarnings("unchecked") + @Override + protected boolean lessThan(Tuple, Integer> l, Tuple, Integer> r) { + for (Tuple tuple : sortingColumns) { + int i = tuple.v1().intValue(); + Comparator comparator = tuple.v2(); + + Object vl = l.v1().get(i); + Object vr = r.v1().get(i); + if (comparator != null) { + int result = comparator.compare(vl, vr); + // if things are equals, move to the next comparator + if (result != 0) { + return result > 0; + } + } + // no comparator means the existing order needs to be preserved + else { + // check the values - if they are equal move to the next comparator + // otherwise return the row order + if (Objects.equals(vl, vr) == false) { + return l.v2().compareTo(r.v2()) > 0; + } + } + } + // everything is equal, fall-back to the row order + return l.v2().compareTo(r.v2()) > 0; + } + + List> asList() { + List> list = new ArrayList<>(super.size()); + Tuple, Integer> pop; + while ((pop = pop()) != null) { + list.add(0, pop.v1()); + } + return list; + } + } } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaCompositeAggsRowSet.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaCompositeAggsRowSet.java index bad618161e58c..3ca0757dbbff8 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaCompositeAggsRowSet.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SchemaCompositeAggsRowSet.java @@ -33,4 +33,4 @@ class SchemaCompositeAggsRowSet extends CompositeAggsRowSet implements SchemaRow public Schema schema() { return schema; } -} \ No newline at end of file +} diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SourceGenerator.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SourceGenerator.java index 8d9e59617aa4d..4e343c1e54f5a 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SourceGenerator.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SourceGenerator.java @@ -80,9 +80,13 @@ public static SearchSourceBuilder sourceBuilder(QueryContainer container, QueryB if (source.size() == -1) { source.size(sz); } - // limit the composite aggs only for non-local sorting - if (aggBuilder instanceof CompositeAggregationBuilder && container.sortingColumns().isEmpty()) { - ((CompositeAggregationBuilder) aggBuilder).size(sz); + if (aggBuilder instanceof CompositeAggregationBuilder) { + // limit the composite aggs only for non-local sorting + if (container.sortingColumns().isEmpty()) { + ((CompositeAggregationBuilder) aggBuilder).size(sz); + } else { + ((CompositeAggregationBuilder) aggBuilder).size(size); + } } } diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursorTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursorTests.java index f2dccc396dbd3..8905a17e0e504 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursorTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursorTests.java @@ -81,4 +81,4 @@ static BitSet randomBitSet(int size) { } return mask; } -} \ No newline at end of file +} diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/QuerierTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/QuerierTests.java new file mode 100644 index 0000000000000..a6caad899dd89 --- /dev/null +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/QuerierTests.java @@ -0,0 +1,109 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.execution.search; + +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.sql.execution.search.Querier.AggSortingQueue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +public class QuerierTests extends ESTestCase { + + @SuppressWarnings("rawtypes") + public void testAggSortingAscending() { + Tuple tuple = new Tuple<>(0, Comparator.naturalOrder()); + Querier.AggSortingQueue queue = new AggSortingQueue(10, Collections.singletonList(tuple)); + for (int i = 50; i >= 0; i--) { + queue.insertWithOverflow(new Tuple<>(Collections.singletonList(i), i)); + } + List> results = queue.asList(); + + assertEquals(10, results.size()); + for (int i = 0; i < 10; i ++) { + assertEquals(i, results.get(i).get(0)); + } + } + + @SuppressWarnings("rawtypes") + public void testAggSortingDescending() { + Tuple tuple = new Tuple<>(0, Comparator.reverseOrder()); + Querier.AggSortingQueue queue = new AggSortingQueue(10, Collections.singletonList(tuple)); + for (int i = 0; i <= 50; i++) { + queue.insertWithOverflow(new Tuple<>(Collections.singletonList(i), i)); + } + List> results = queue.asList(); + + assertEquals(10, results.size()); + for (int i = 0; i < 10; i ++) { + assertEquals(50 - i, results.get(i).get(0)); + } + } + + @SuppressWarnings("rawtypes") + public void testAggSorting_TwoFields() { + List> tuples = new ArrayList<>(2); + tuples.add(new Tuple<>(0, Comparator.reverseOrder())); + tuples.add(new Tuple<>(1, Comparator.naturalOrder())); + Querier.AggSortingQueue queue = new AggSortingQueue(10, tuples); + + for (int i = 1; i <= 100; i++) { + queue.insertWithOverflow(new Tuple<>(Arrays.asList(i % 50 + 1, i), i)); + } + List> results = queue.asList(); + + assertEquals(10, results.size()); + for (int i = 0; i < 10; i++) { + assertEquals(50 - (i / 2), results.get(i).get(0)); + assertEquals(49 - (i / 2) + ((i % 2) * 50), results.get(i).get(1)); + } + } + + @SuppressWarnings("rawtypes") + public void testAggSorting_Randomized() { + // Initialize comparators for fields (columns) + int noColumns = randomIntBetween(3, 10); + List> tuples = new ArrayList<>(noColumns); + boolean[] ordering = new boolean[noColumns]; + for (int j = 0; j < noColumns; j++) { + boolean order = randomBoolean(); + ordering[j] = order; + tuples.add(new Tuple<>(j, order ? Comparator.naturalOrder() : Comparator.reverseOrder())); + } + + // Insert random no of documents (rows) with random 0/1 values for each field + int noDocs = randomIntBetween(10, 50); + int queueSize = randomIntBetween(4, noDocs / 2); + List> expected = new ArrayList<>(noDocs); + Querier.AggSortingQueue queue = new AggSortingQueue(queueSize, tuples); + for (int i = 0; i < noDocs; i++) { + List values = new ArrayList<>(noColumns); + for (int j = 0; j < noColumns; j++) { + values.add(randomBoolean() ? 1 : 0); + } + queue.insertWithOverflow(new Tuple<>(values, i)); + expected.add(values); + } + + List> results = queue.asList(); + assertEquals(queueSize, results.size()); + expected.sort((o1, o2) -> { + for (int j = 0; j < noColumns; j++) { + if (o1.get(j) < o2.get(j)) { + return ordering[j] ? -1 : 1; + } else if (o1.get(j) > o2.get(j)) { + return ordering[j] ? 1 : -1; + } + } + return 0; + }); + assertEquals(expected.subList(0, queueSize), results); + } +}