Skip to content

Commit

Permalink
Abort sorting in case of local agg sort queue overflow (#65687)
Browse files Browse the repository at this point in the history
In case the local agg sorter queue gets full and no limit has been provided,
the local sorter will now erroneously call the failure callback for every
single row in the original rowset that's left over the local queue limit
(instead of just the first one).  The failure response is dispatched in any
case, so this is relatively harmless.  The sorter continues iterating on the
original response fetching subsequent pages. In case of correct Elasticsearch
behaviour, this is also harmless, it'll just trigger a number of internal
exceptions. However, in case of a pagination defect in Elasticsearch (like
GH#65685, where the same search_after is returned), this will result in an
effective spin loop, which could eventually render the node unresponsive.

This PR simply breaks both the inner loop iterating over the current unsorted
rowset, as well as the outer one, iterating over the left pages.

It also fixes an outdated documentation limitation.

(cherry picked from commit 638402c)
  • Loading branch information
bpintea committed Dec 7, 2020
1 parent 1ed5a56 commit 2ec53ea
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 10 deletions.
6 changes: 3 additions & 3 deletions docs/reference/sql/limitations.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ For such situations, {es-sql} will display an error message.
[discrete]
=== Paginating nested inner hits

When SELECTing a nested field, pagination will not work as expected, {es-sql} will return __at least__ the page size records.
When SELECTing a nested field, pagination will not work as expected, {es-sql} will return __at least__ the page size records.
This is because of the way nested queries work in {es}: the root nested field will be returned and its matching inner nested fields as well,
pagination taking place on the **root nested document and not on its inner hits**.

Expand All @@ -105,8 +105,8 @@ When multiple values are returned for a field, by default, {es-sql} will throw a
=== Sorting by aggregation

When doing aggregations (`GROUP BY`) {es-sql} relies on {es}'s `composite` aggregation for its support for paginating results.
However this type of aggregation does come with a limitation: sorting can only be applied on the key used for the aggregation's buckets.
{es-sql} overcomes this limitation by doing client-side sorting however as a safety measure, allows only up to *512* rows.
However this type of aggregation does come with a limitation: sorting can only be applied on the key used for the aggregation's buckets.
{es-sql} overcomes this limitation by doing client-side sorting; however, as a safety measure, it allows only up to *65535* rows.

It is recommended to use `LIMIT` for queries that use sorting by aggregation, essentially indicating the top N results that are desired:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void query(List<Attribute> output, QueryContainer query, String index, Ac
client.search(search, l);
}

public static SearchRequest prepareRequest(Client client, SearchSourceBuilder source, TimeValue timeout, boolean includeFrozen,
public static SearchRequest prepareRequest(Client client, SearchSourceBuilder source, TimeValue timeout, boolean includeFrozen,
String... indices) {
return client.prepareSearch(indices)
// always track total hits accurately
Expand All @@ -150,7 +150,7 @@ public static SearchRequest prepareRequest(Client client, SearchSourceBuilder so
includeFrozen ? IndexResolver.FIELD_CAPS_FROZEN_INDICES_OPTIONS : IndexResolver.FIELD_CAPS_INDICES_OPTIONS)
.request();
}

protected static void logSearchResponse(SearchResponse response, Logger logger) {
List<Aggregation> aggs = Collections.emptyList();
if (response.getAggregations() != null) {
Expand All @@ -160,7 +160,7 @@ protected static void logSearchResponse(SearchResponse response, Logger logger)
for (int i = 0; i < aggs.size(); i++) {
aggsNames.append(aggs.get(i).getName() + (i + 1 == aggs.size() ? "" : ", "));
}

logger.trace("Got search response [hits {} {}, {} aggregations: [{}], {} failed shards, {} skipped shards, "
+ "{} successful shards, {} total shards, took {}, timed out [{}]]",
response.getHits().getTotalHits().relation.toString(),
Expand All @@ -177,7 +177,7 @@ protected static void logSearchResponse(SearchResponse response, Logger logger)

/**
* Listener used for local sorting (typically due to aggregations used inside `ORDER BY`).
*
*
* This listener consumes the whole result set, sorts it in memory then sends the paginated
* results back to the client.
*/
Expand All @@ -191,6 +191,7 @@ class LocalAggregationSorterListener implements ActionListener<Page> {
private final AtomicInteger counter = new AtomicInteger();
private volatile Schema schema;

// Note: when updating this value propagate it to the limitations.asciidoc page as well.
private static final int MAXIMUM_SIZE = MultiBucketConsumerService.DEFAULT_MAX_BUCKETS;
private final boolean noLimit;

Expand Down Expand Up @@ -225,7 +226,9 @@ public void onResponse(Page page) {
}

// 1. consume all pages received
consumeRowSet(page.rowSet());
if (consumeRowSet(page.rowSet()) == false) {
return;
}

Cursor cursor = page.next();
// 1a. trigger a next call if there's still data
Expand All @@ -241,7 +244,7 @@ public void onResponse(Page page) {
sendResponse();
}

private void consumeRowSet(RowSet rowSet) {
private boolean consumeRowSet(RowSet rowSet) {
ResultRowSet<?> rrs = (ResultRowSet<?>) rowSet;
for (boolean hasRows = rrs.hasCurrentRow(); hasRows; hasRows = rrs.advanceRow()) {
List<Object> row = new ArrayList<>(rrs.columnCount());
Expand All @@ -250,8 +253,10 @@ private void consumeRowSet(RowSet rowSet) {
if (data.insertWithOverflow(new Tuple<>(row, counter.getAndIncrement())) != null && noLimit) {
onFailure(new SqlIllegalArgumentException(
"The default limit [{}] for aggregate sorting has been reached; please specify a LIMIT", MAXIMUM_SIZE));
return false;
}
}
return true;
}

private void sendResponse() {
Expand Down Expand Up @@ -307,7 +312,7 @@ protected void handleResponse(SearchResponse response, ActionListener<Page> list
if (log.isTraceEnabled()) {
logSearchResponse(response, log);
}

Aggregations aggs = response.getAggregations();
if (aggs != null) {
Aggregation agg = aggs.get(Aggs.ROOT_GROUP_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,28 @@
*/
package org.elasticsearch.xpack.sql.execution.search;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ql.type.Schema;
import org.elasticsearch.xpack.sql.SqlTestUtils;
import org.elasticsearch.xpack.sql.execution.search.Querier.AggSortingQueue;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.session.SqlSession;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.emptyList;
import static org.elasticsearch.xpack.sql.execution.search.ScrollCursorTests.randomHitExtractor;

public class QuerierTests extends ESTestCase {

Expand Down Expand Up @@ -171,4 +184,84 @@ public void testAggSorting_Randomized() {
});
assertEquals(expected.subList(0, queueSize), results);
}

public void testFullQueueSortingOnLocalSort() {
    // Feeding exactly the maximum queue capacity must complete normally:
    // a single response is dispatched and the failure callback is never invoked.
    final Tuple<Integer, Integer> counts = runLocalAggSorterWithNoLimit(MultiBucketConsumerService.DEFAULT_MAX_BUCKETS);
    final int responseCount = counts.v1();
    final int failureCount = counts.v2();

    assertEquals("Exactly one response expected", 1, responseCount);
    assertEquals("No failures expected", 0, failureCount);
}

public void testQueueOverflowSortingOnLocalSort() {
    // Overflowing the queue (capacity + 2 rows) with no LIMIT set must abort sorting:
    // the failure callback fires exactly once and no response is ever sent.
    final Tuple<Integer, Integer> counts = runLocalAggSorterWithNoLimit(MultiBucketConsumerService.DEFAULT_MAX_BUCKETS + 2);
    final int responseCount = counts.v1();
    final int failureCount = counts.v2();

    assertEquals("No response expected", 0, responseCount);
    assertEquals("Exactly one failure expected", 1, failureCount);
}

/**
 * Drives a {@code LocalAggregationSorterListener} created with a limit of -1 (presumably
 * encoding "no LIMIT" — matches the sorter's {@code noLimit} flag; confirm against Querier)
 * over a synthetic row set of {@code dataSize} rows, and counts how the wrapped listener
 * is notified.
 *
 * @param dataSize number of rows fed to the local agg sorter
 * @return tuple of (number of {@code onResponse} calls, number of {@code onFailure} calls)
 */
Tuple<Integer, Integer> runLocalAggSorterWithNoLimit(int dataSize) {
    // Minimal in-memory row set: yields `dataSize` rows whose single extracted
    // value is a monotonically increasing integer.
    class TestResultRowSet<E extends NamedWriteable> extends ResultRowSet<E> implements SchemaRowSet {

        // Counts extractions; with the single extractor used below, this advances
        // once per row and doubles as the extracted cell value.
        private int rowCounter = 0;
        private final int dataSize;

        TestResultRowSet(List<E> extractors, BitSet mask, int dataSize) {
            super(extractors, mask);
            this.dataSize = dataSize;
        }

        @Override
        protected Object extractValue(NamedWriteable namedWriteable) {
            return rowCounter++;
        }

        @Override
        protected boolean doHasCurrent() {
            // There is always a current row; iteration stops via doNext().
            return true;
        }

        @Override
        protected boolean doNext() {
            // Advance until `dataSize` extractions have been served.
            return rowCounter < dataSize;
        }

        @Override
        protected void doReset() {
        }

        @Override
        public Schema schema() {
            // Empty schema is sufficient; the sorter only iterates over rows here.
            return new Schema(emptyList(), emptyList());
        }

        @Override
        public int size() {
            return dataSize; // irrelevant
        }
    };

    // A single page carrying the whole synthetic row set; Cursor.EMPTY means no further pages.
    Cursor.Page page = new Cursor.Page(new TestResultRowSet<NamedWriteable>(List.of(randomHitExtractor(0)), new BitSet(), dataSize),
        Cursor.EMPTY);

    // Count notifications instead of asserting inline so callers can check either
    // the success path or the failure path.
    AtomicInteger responses = new AtomicInteger();
    AtomicInteger failures = new AtomicInteger();
    ActionListener<Cursor.Page> listener = new ActionListener<>() {
        @Override
        public void onResponse(Cursor.Page page) {
            responses.getAndIncrement();
        }

        @Override
        public void onFailure(Exception e) {
            failures.getAndIncrement();
        }
    };

    SqlSession session = new SqlSession(SqlTestUtils.TEST_CFG, null, null, null, null, null, null, null, null);
    Querier querier = new Querier(session);
    Querier.LocalAggregationSorterListener localSorter = querier.new LocalAggregationSorterListener(listener, emptyList(), -1);
    localSorter.onResponse(page);

    return new Tuple<>(responses.get(), failures.get());
}
}

0 comments on commit 2ec53ea

Please sign in to comment.