Merged
Changes from all commits (22 commits)
5 changes: 5 additions & 0 deletions docs/changelog/134497.yaml
@@ -0,0 +1,5 @@
pr: 134497
summary: Limit when we push topn to lucene
area: ES|QL
type: bug
issues: []
@@ -92,6 +92,7 @@ public void skipOnAborted() {
*/
public void testSortByManyLongsSuccess() throws IOException {
initManyLongs(10);
// | SORT a, b, i0, i1, ...i500 | KEEP a, b | LIMIT 10000
Map<String, Object> response = sortByManyLongs(500);
ListMatcher columns = matchesList().item(matchesMap().entry("name", "a").entry("type", "long"))
.item(matchesMap().entry("name", "b").entry("type", "long"));
@@ -55,6 +55,8 @@ public class TopNOperator implements Operator, Accountable {
static final class Row implements Accountable, Releasable {
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(Row.class);

private final CircuitBreaker breaker;

/**
* The sort key.
*/
@@ -81,17 +83,11 @@ static final class Row implements Accountable, Releasable {
@Nullable
RefCounted shardRefCounter;

void setShardRefCountersAndShard(RefCounted shardRefCounter) {
if (this.shardRefCounter != null) {
this.shardRefCounter.decRef();
}
this.shardRefCounter = shardRefCounter;
this.shardRefCounter.mustIncRef();
}

Row(CircuitBreaker breaker, List<SortOrder> sortOrders, int preAllocatedKeysSize, int preAllocatedValueSize) {
this.breaker = breaker;
boolean success = false;
try {
breaker.addEstimateBytesAndMaybeBreak(SHALLOW_SIZE, "topn");
keys = new BreakingBytesRefBuilder(breaker, "topn", preAllocatedKeysSize);
values = new BreakingBytesRefBuilder(breaker, "topn", preAllocatedValueSize);
bytesOrder = new BytesOrder(sortOrders, breaker, "topn");
@@ -111,7 +107,7 @@ public long ramBytesUsed() {
@Override
public void close() {
clearRefCounters();
Releasables.closeExpectNoException(keys, values, bytesOrder);
Releasables.closeExpectNoException(() -> breaker.addWithoutBreaking(-SHALLOW_SIZE), keys, values, bytesOrder);
}

public void clearRefCounters() {
@@ -120,6 +116,14 @@ public void clearRefCounters() {
}
shardRefCounter = null;
}

void setShardRefCountersAndShard(RefCounted shardRefCounter) {
if (this.shardRefCounter != null) {
this.shardRefCounter.decRef();
}
this.shardRefCounter = shardRefCounter;
this.shardRefCounter.mustIncRef();
}
}

static final class BytesOrder implements Releasable, Accountable {
@@ -1489,9 +1489,9 @@ public void testRowResizes() {
block.decRef();
op.addInput(new Page(blocks));

// 94 are from the collection process
// 105 are from the objects
// 1 is for the min-heap itself
assertThat(breaker.getMemoryRequestCount(), is(95L));
assertThat(breaker.getMemoryRequestCount(), is(106L));
}
}

@@ -59,6 +59,7 @@
import static org.elasticsearch.test.MapMatcher.assertMap;
import static org.elasticsearch.test.MapMatcher.matchesMap;
import static org.elasticsearch.xpack.esql.core.type.DataType.isMillisOrNanos;
import static org.elasticsearch.xpack.esql.planner.PhysicalSettings.LUCENE_TOPN_LIMIT;
import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.SYNC;
import static org.elasticsearch.xpack.esql.tools.ProfileParser.parseProfile;
import static org.elasticsearch.xpack.esql.tools.ProfileParser.readProfileFromResponse;
@@ -188,6 +189,18 @@ private void setLoggingLevel(String level) throws IOException {
client().performRequest(request);
}

private void setTruncationWindowMax(Integer size) throws IOException {
Request request = new Request("PUT", "/_cluster/settings");
request.setJsonEntity("""
{
"persistent": {
"esql.query.result_truncation_max_size": $SIZE$
}
}
""".replace("$SIZE$", size == null ? "null" : Integer.toString(size)));
client().performRequest(request);
}

public void testIncompatibleMappingsErrors() throws IOException {
// create first index
Request request = new Request("PUT", "/index1");
@@ -538,6 +551,86 @@ public void testInlineStatsProfile() throws IOException {
);
}

public void testSmallTopNProfile() throws IOException {
testTopNProfile(false);
}

public void testGiantTopNProfile() throws IOException {
testTopNProfile(true);
}

private void testTopNProfile(boolean giant) throws IOException {
try {
setTruncationWindowMax(1000000);
indexTimestampData(1);

int size = between(1, LUCENE_TOPN_LIMIT.get(Settings.EMPTY).intValue() - 1);
if (giant) {
size += LUCENE_TOPN_LIMIT.get(Settings.EMPTY).intValue();
}
RequestObjectBuilder builder = requestObjectBuilder().query(fromIndex() + " | KEEP value | SORT value ASC | LIMIT " + size);

builder.pragmas(Settings.builder().put("data_partitioning", "shard").build());
builder.profile(true);
builder.pragmasOk();

Map<String, Object> result = runEsql(builder);
ListMatcher values = matchesList();
for (int i = 0; i < Math.min(1000, size); i++) {
values = values.item(List.of(i));
}
assertResultMap(
result,
getResultMatcher(result).entry("profile", getProfileMatcher()),
matchesList().item(matchesMap().entry("name", "value").entry("type", "long")),
values
);

@SuppressWarnings("unchecked")
List<Map<String, Object>> profiles = (List<Map<String, Object>>) ((Map<String, Object>) result.get("profile")).get("drivers");
for (Map<String, Object> p : profiles) {
fixTypesOnProfile(p);
assertThat(p, commonProfile());
List<String> sig = new ArrayList<>();
@SuppressWarnings("unchecked")
List<Map<String, Object>> operators = (List<Map<String, Object>>) p.get("operators");
for (Map<String, Object> o : operators) {
sig.add(checkOperatorProfile(o));
}
String description = p.get("description").toString();
switch (description) {
case "data" -> assertMap(
sig,
giant
? matchesList().item("LuceneSourceOperator")
.item("ValuesSourceReaderOperator")
.item("TopNOperator")
.item("ProjectOperator")
.item("ExchangeSinkOperator")
: matchesList().item("LuceneTopNSourceOperator")
.item("ValuesSourceReaderOperator")
.item("ProjectOperator")
.item("ExchangeSinkOperator")
);
case "node_reduce" -> assertThat(
sig,
// If the coordinating node and data node are the same node then we get this
either(matchesList().item("ExchangeSourceOperator").item("ExchangeSinkOperator"))
// If the coordinating node and data node are *not* the same node we get this
.or(matchesList().item("ExchangeSourceOperator").item("TopNOperator").item("ExchangeSinkOperator"))
);
case "final" -> assertMap(
sig,
matchesList().item("ExchangeSourceOperator").item("TopNOperator").item("ProjectOperator").item("OutputOperator")
);
default -> throw new IllegalArgumentException("can't match " + description);
}
}
} finally {
setTruncationWindowMax(null);
}
}

public void testForceSleepsProfile() throws IOException {
assumeTrue("requires pragmas", Build.current().isSnapshot());

@@ -940,7 +1033,9 @@ private String checkOperatorProfile(Map<String, Object> o) {
.entry("rows_received", greaterThan(0))
.entry("rows_emitted", greaterThan(0))
.entry("ram_used", instanceOf(String.class))
.entry("ram_bytes_used", greaterThan(0));
.entry("ram_bytes_used", greaterThan(0))
.entry("receive_nanos", greaterThan(0))
.entry("emit_nanos", greaterThan(0));
case "LuceneTopNSourceOperator" -> matchesMap().entry("pages_emitted", greaterThan(0))
.entry("rows_emitted", greaterThan(0))
.entry("current", greaterThan(0))
@@ -951,7 +1046,8 @@
.entry("slice_min", 0)
.entry("process_nanos", greaterThan(0))
.entry("processed_queries", List.of("*:*"))
.entry("slice_index", 0);
.entry("slice_index", 0)
.entry("partitioning_strategies", matchesMap().entry("rest-esql-test:0", "SHARD"));
default -> throw new AssertionError("unexpected status: " + o);
};
MapMatcher expectedOp = matchesMap().entry("operator", startsWith(name));
@@ -8,8 +8,15 @@
package org.elasticsearch.xpack.esql.optimizer;

import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.planner.PhysicalSettings;
import org.elasticsearch.xpack.esql.plugin.EsqlFlags;
import org.elasticsearch.xpack.esql.session.Configuration;
import org.elasticsearch.xpack.esql.stats.SearchStats;

public record LocalPhysicalOptimizerContext(EsqlFlags flags, Configuration configuration, FoldContext foldCtx, SearchStats searchStats) {}
public record LocalPhysicalOptimizerContext(
PhysicalSettings physicalSettings,
EsqlFlags flags,
Configuration configuration,
FoldContext foldCtx,
SearchStats searchStats
) {}
@@ -18,6 +18,7 @@
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
import org.elasticsearch.xpack.esql.core.expression.NameId;
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
import org.elasticsearch.xpack.esql.expression.Foldables;
import org.elasticsearch.xpack.esql.expression.Order;
import org.elasticsearch.xpack.esql.expression.function.scalar.spatial.BinarySpatialFunction;
import org.elasticsearch.xpack.esql.expression.function.scalar.spatial.SpatialRelatesUtils;
@@ -28,6 +29,7 @@
import org.elasticsearch.xpack.esql.plan.physical.EvalExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
import org.elasticsearch.xpack.esql.planner.PhysicalSettings;

import java.util.ArrayList;
import java.util.LinkedHashMap;
@@ -63,7 +65,12 @@ public class PushTopNToSource extends PhysicalOptimizerRules.ParameterizedOptimi

@Override
protected PhysicalPlan rule(TopNExec topNExec, LocalPhysicalOptimizerContext ctx) {
Pushable pushable = evaluatePushable(ctx.foldCtx(), topNExec, LucenePushdownPredicates.from(ctx.searchStats(), ctx.flags()));
Pushable pushable = evaluatePushable(
ctx.physicalSettings(),
ctx.foldCtx(),
topNExec,
LucenePushdownPredicates.from(ctx.searchStats(), ctx.flags())
);
return pushable.rewrite(topNExec);
}

@@ -124,15 +131,24 @@ public PhysicalPlan rewrite(TopNExec topNExec) {
}
}

private static Pushable evaluatePushable(FoldContext ctx, TopNExec topNExec, LucenePushdownPredicates lucenePushdownPredicates) {
private static Pushable evaluatePushable(
PhysicalSettings physicalSettings,
FoldContext ctx,
TopNExec topNExec,
LucenePushdownPredicates lucenePushdownPredicates
) {
PhysicalPlan child = topNExec.child();
if (child instanceof EsQueryExec queryExec
&& queryExec.canPushSorts()
&& canPushDownOrders(topNExec.order(), lucenePushdownPredicates)) {
&& canPushDownOrders(topNExec.order(), lucenePushdownPredicates)
&& canPushLimit(topNExec, physicalSettings)) {
Contributor: Looks correct.

// With the simplest case of `FROM index | SORT ...` we only allow pushing down if the sort is on a field
return new PushableQueryExec(queryExec);
}
if (child instanceof EvalExec evalExec && evalExec.child() instanceof EsQueryExec queryExec && queryExec.canPushSorts()) {
if (child instanceof EvalExec evalExec
&& evalExec.child() instanceof EsQueryExec queryExec
&& queryExec.canPushSorts()
&& canPushLimit(topNExec, physicalSettings)) {
Comment on lines +148 to +151

alex-spies (Contributor), Sep 11, 2025: Looks right to me, but I think @craigtaverner should have a look, too. I don't remember if it's bad when we somehow cannot push a WHERE dist < 10 | EVAL ST_DISTANCE = (...) to Lucene. Craig, we don't end up with un-executable queries, they're just slow when we cannot push, right?

Contributor: Correct. Failing to push down ST_DISTANCE will not cause the query to fail, just run slow. And as I understand it this PR will only block pushdown for extremely large LIMIT values, which are an extreme case anyway.

Member (author): 10,000

// When we have an EVAL between the FROM and the SORT, we consider pushing down if the sort is on a field and/or
// a distance function defined in the EVAL. We also move the EVAL to after the SORT.
List<Order> orders = topNExec.order();
@@ -204,6 +220,10 @@ private static boolean canPushDownOrders(List<Order> orders, LucenePushdownPredi
return orders.stream().allMatch(o -> isSortableAttribute.apply(o.child(), lucenePushdownPredicates));
}

private static boolean canPushLimit(TopNExec topn, PhysicalSettings physicalSettings) {
return Foldables.limitValue(topn.limit(), topn.sourceText()) <= physicalSettings.luceneTopNLimit();
}
Contributor: NIT: this could be simplified with

    public static Integer limitValue(Expression limitField, String sourceText) {
        if (limitField instanceof Literal literal) {
            Object value = literal.value();
            if (value instanceof Integer intValue) {
                return intValue;
            }
        }
        throw new EsqlIllegalArgumentException(format(null, "Limit value must be an integer in [{}], found [{}]", sourceText, limitField));
    }


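For a concrete picture of the threshold discussed in the thread above, here is a minimal, self-contained sketch of the guard's behavior at the default boundary. The class name, constant, and helper are illustrative stand-ins for PhysicalSettings.luceneTopNLimit() and canPushLimit above, not the real implementation; the operator names in the comments are the ones the new testTopNProfile asserts.

    class TopNPushdownSketch {
        // Stand-in for esql.lucene_topn_limit; defaults to index.max_result_window's default.
        static final int LUCENE_TOPN_LIMIT = 10_000;

        // Stand-in for canPushLimit: push the sort+limit to Lucene only at or below the threshold.
        static boolean canPushLimit(int limit) {
            return limit <= LUCENE_TOPN_LIMIT;
        }

        public static void main(String[] args) {
            System.out.println(canPushLimit(10_000)); // true  -> LuceneTopNSourceOperator does the topn
            System.out.println(canPushLimit(10_001)); // false -> LuceneSourceOperator feeds a TopNOperator in the compute engine
        }
    }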
private static List<EsQueryExec.Sort> buildFieldSorts(List<Order> orders) {
List<EsQueryExec.Sort> sorts = new ArrayList<>(orders.size());
for (Order o : orders) {
@@ -281,7 +281,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
int rowEstimatedSize = esQueryExec.estimatedRowSize();
int limit = esQueryExec.limit() != null ? (Integer) esQueryExec.limit().fold(context.foldCtx()) : NO_LIMIT;
boolean scoring = esQueryExec.hasScoring();
if ((sorts != null && sorts.isEmpty() == false)) {
if (sorts != null && sorts.isEmpty() == false) {
List<SortBuilder<?>> sortBuilders = new ArrayList<>(sorts.size());
long estimatedPerRowSortSize = 0;
for (Sort sort : sorts) {
@@ -9,9 +9,11 @@

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.MemorySizeValue;
import org.elasticsearch.compute.lucene.DataPartitioning;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.monitor.jvm.JvmInfo;

/**
@@ -35,23 +37,34 @@ public class PhysicalSettings {
Setting.Property.Dynamic
);

public static final Setting<Integer> LUCENE_TOPN_LIMIT = Setting.intSetting(
"esql.lucene_topn_limit",
IndexSettings.MAX_RESULT_WINDOW_SETTING.getDefault(Settings.EMPTY),
-1,
Contributor: We do not limit by default?

Contributor: Uh, just realized IndexSettings.MAX_RESULT_WINDOW_SETTING.getDefault(Settings.EMPTY) is a default value

Member (author): Right! We default to the window's default.

Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private volatile DataPartitioning defaultDataPartitioning;
private volatile ByteSizeValue valuesLoadingJumboSize;
private volatile int luceneTopNLimit;

/**
* Ctor for prod that listens for updates from the {@link ClusterService}.
*/
public PhysicalSettings(ClusterService clusterService) {
clusterService.getClusterSettings().initializeAndWatch(DEFAULT_DATA_PARTITIONING, v -> this.defaultDataPartitioning = v);
clusterService.getClusterSettings().initializeAndWatch(VALUES_LOADING_JUMBO_SIZE, v -> this.valuesLoadingJumboSize = v);
clusterService.getClusterSettings().initializeAndWatch(LUCENE_TOPN_LIMIT, v -> this.luceneTopNLimit = v);
}

/**
* Ctor for testing.
*/
public PhysicalSettings(DataPartitioning defaultDataPartitioning, ByteSizeValue valuesLoadingJumboSize) {
public PhysicalSettings(DataPartitioning defaultDataPartitioning, ByteSizeValue valuesLoadingJumboSize, int luceneTopNLimit) {
this.defaultDataPartitioning = defaultDataPartitioning;
this.valuesLoadingJumboSize = valuesLoadingJumboSize;
this.luceneTopNLimit = luceneTopNLimit;
}

public DataPartitioning defaultDataPartitioning() {
@@ -61,4 +74,22 @@ public DataPartitioning defaultDataPartitioning() {
public ByteSizeValue valuesLoadingJumboSize() {
return valuesLoadingJumboSize;
}

/**
* Maximum {@code LIMIT} that we're willing to push to Lucene's topn.
* <p>
* Lucene's topn code was designed for <strong>search</strong>
* which typically fetches 10 or 30 or 50 or 100 or 1000 documents.
* That's as many as you want on a page, and that's what it's designed for.
* But if you go to, say, page 10, Lucene implements this as a search
* for {@code page_size * page_number} docs and then materializes only
* the last {@code page_size} documents. Traditionally, Elasticsearch
* limits that {@code page_size * page_number}, which it calls the
* {@link IndexSettings#MAX_RESULT_WINDOW_SETTING "result window"}.
* So! ESQL defaults to the same default - {@code 10,000}.
* </p>
*/
public int luceneTopNLimit() {
return luceneTopNLimit;
}
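Since LUCENE_TOPN_LIMIT is registered with Setting.Property.NodeScope and Setting.Property.Dynamic, it can be changed at runtime through the cluster settings API. As a sketch only — modeled on the setTruncationWindowMax helper in the REST test above; the helper name is an assumption, while the setting key esql.lucene_topn_limit comes from this diff:

    // Hypothetical test helper; passing null resets the setting to its default
    // (index.max_result_window's default, i.e. 10,000).
    private void setLuceneTopNLimit(Integer limit) throws IOException {
        Request request = new Request("PUT", "/_cluster/settings");
        request.setJsonEntity("""
            {
                "persistent": {
                    "esql.lucene_topn_limit": $LIMIT$
                }
            }
            """.replace("$LIMIT$", limit == null ? "null" : Integer.toString(limit)));
        client().performRequest(request);
    }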
}
Loading