Draft
95 commits
3322f34
Add initial blockhash with TreeSet and basic operator test
ivancea Apr 16, 2025
fcdfbd4
Fix dedupe for nulls and multivalues
ivancea Apr 16, 2025
7282e98
Added tests for null order and sort order
ivancea Apr 16, 2025
b6d7265
Merge branch 'main' into esql-top-n-agg-grouping
ivancea Apr 21, 2025
4950f5f
Move generic BlockHash code to base TestCase
ivancea Apr 21, 2025
a425c5d
Initial TopNBlockHash test structure, WIP
ivancea Apr 21, 2025
665ace3
Added TopNBlockHash test without nulls, and fixed bug in ords
ivancea Apr 22, 2025
6616fdf
Fix BlockHashTestCase keys assertion early return not checking all keys
ivancea Apr 22, 2025
d838057
Added nulls tests, fixed dedupe lookup nulls handling and nulls error…
ivancea Apr 22, 2025
e78fac5
Added multivalue tests and fixed dedupe with multivalues and no matches
ivancea Apr 22, 2025
143d86f
Update docs/changelog/127148.yaml
ivancea Apr 22, 2025
fa3a779
[CI] Auto commit changes from spotless
Apr 22, 2025
4b212cd
Remove TopNBlockHash abstract class
ivancea Apr 22, 2025
87eb135
Remove unused class
ivancea Apr 23, 2025
c9635f9
Merge branch 'main' into esql-top-n-agg-grouping
ivancea Apr 23, 2025
29fa4d1
Fix SeenGroupIds and added extra tests for it
ivancea Apr 23, 2025
7ca3ce3
Included TopNLongBlockHash into BlockHash.build() and GroupSpec logi…
ivancea Apr 24, 2025
13900f6
[CI] Auto commit changes from spotless
Apr 24, 2025
c778b3b
Keep the last value in the top while using TreeSet, for performance
ivancea Apr 25, 2025
270eb6b
Merge branch 'main' into esql-top-n-agg-grouping
ivancea Apr 25, 2025
5483dc2
Use LongBucketedSort and LongHash for unique seen values
ivancea Apr 25, 2025
3a775ac
Add field recording amount on values in top structure
ivancea Apr 25, 2025
2dde5c1
Fixed to have both last top value and value count
ivancea Apr 29, 2025
a5855c1
Specialize block parameters on AddInput
ivancea Apr 30, 2025
5176663
Call the specific add() methods for each block type
ivancea Apr 30, 2025
fb670bd
Implement custom add in HashAggregationOperator
ivancea Apr 30, 2025
ea433a2
Merge branch 'main' into esql-top-n-agg-grouping-tests
ivancea May 5, 2025
fd97a8c
Updated new aggs
ivancea May 5, 2025
ddb6735
Added custom sorted structure
ivancea May 5, 2025
3d83784
Improve limit migration logic
ivancea May 5, 2025
23195a8
Merge branch 'main' into esql-top-n-agg-grouping-tests
ivancea May 7, 2025
afff00e
Merge branch 'main' into esql-top-n-agg-grouping
ivancea May 20, 2025
a54ba97
Spotless
ivancea May 20, 2025
20f75c1
Merge branch 'main' into esql-top-n-agg-grouping-tests
ivancea May 21, 2025
8d101fb
Remove count var
ivancea May 21, 2025
c27faa0
Removed lastValue field
ivancea May 22, 2025
9cff744
Added LongHash for early detection of existing values
ivancea May 22, 2025
22da89e
Revert "Added LongHash for early detection of existing values"
ivancea May 22, 2025
9c12c35
Merge branch 'main' into esql-top-n-agg-grouping-tests
ivancea May 27, 2025
714aed9
Merge branch 'main' into esql-top-n-agg-grouping
ivancea May 28, 2025
9d8f749
Merge branch 'esql-top-n-agg-grouping' into esql-top-n-agg-grouping-t…
ivancea May 28, 2025
ed18e7c
Make operator tests not depend directly on the BlockHash implementation
ivancea May 28, 2025
16c88eb
Restore original BucketedSort
ivancea May 28, 2025
2b1f868
Improve migration and remove unused BucketedSort test
ivancea May 28, 2025
f0349f4
Remove fixed TO-DOs
ivancea May 28, 2025
e7ebd05
Fix potential memory leak on circuit breaker error
ivancea May 28, 2025
40b7682
Merge branch 'main' into esql-top-n-agg-grouping
ivancea May 29, 2025
76291a6
Renamed topN set and added tests
ivancea May 29, 2025
dd48d97
Remove unused code from TopNMultivalueDedupeLong
ivancea May 29, 2025
e63b73a
Fix missing success=true in constructor
ivancea May 29, 2025
ebcab5f
Merge branch 'main' into esql-top-n-agg-ql
ivancea May 30, 2025
3916c28
Initial test with attributes within Aggregate
ivancea Jun 5, 2025
ee00361
WIP: TopNAggregate nodes, exec not fully implemented yet
ivancea Jun 6, 2025
ab0452b
Revert "WIP: TopNAggregate nodes, exec not fully implemented yet"
ivancea Jun 9, 2025
a3db630
Added LongHash to early discard values in set and avoid logn searches
ivancea Jun 9, 2025
531711b
Revert "Added LongHash to early discard values in set and avoid logn …
ivancea Jun 9, 2025
92311cd
Reapply "WIP: TopNAggregate nodes, exec not fully implemented yet"
ivancea Jun 9, 2025
70095cf
Fully implemented TopNAggregate, optimizations and rules to be reviewed
ivancea Jun 9, 2025
54aec22
Added TopNAgg csv tests for count and multiple aggs, which can be pus…
ivancea Jun 11, 2025
b2c42e4
Merge branch 'main' into esql-top-n-agg-grouping
ivancea Jun 11, 2025
7446e77
Merge branch 'esql-top-n-agg-grouping' into esql-top-n-agg-ql
ivancea Jun 11, 2025
31a2300
Merge branch 'main' into esql-top-n-agg-ql
ivancea Jun 11, 2025
d0677c0
Updated Physical rules and fixed Spatial rule for time series
ivancea Jun 11, 2025
74e780c
Format
ivancea Jun 11, 2025
187dc92
Update with() methods
ivancea Jun 11, 2025
6cff9fb
Assert no TimeSeriesAggregate is converted, and fix logical optimizer…
ivancea Jun 12, 2025
4ce6465
Merge branch 'main' into esql-top-n-agg-ql
ivancea Jun 12, 2025
a2d4638
Replaces missing cases of AggregateExec to AbstractAggregateExec and …
ivancea Jun 12, 2025
3ca55b1
Fixed planner tests and added some extra ones
ivancea Jun 16, 2025
cc9fa02
Merge branch 'main' into esql-top-n-agg-ql
ivancea Jun 17, 2025
a7edabf
Made TopNAggregate a child of Aggregate
ivancea Jun 17, 2025
56ba38b
Merge branch 'main' into esql-top-n-agg-ql
ivancea Jun 25, 2025
7132241
Added pragma to choose the max limit to use the topN agg enhancement
ivancea Jun 25, 2025
3e41bce
Fix outdated test
ivancea Jun 25, 2025
0eb2510
Merge branch 'main' into esql-top-n-agg-ql
ivancea Jun 26, 2025
375e83d
Fix TimeSeries resolution
ivancea Jun 26, 2025
9661efe
Improved replacement rule assertion
ivancea Jun 26, 2025
b31d5c6
Update BytesRef handling
ivancea Jun 26, 2025
ae2c213
Update docs/changelog/130111.yaml
ivancea Jun 26, 2025
8031385
Fixed benchmarks
ivancea Jun 26, 2025
e490ed2
Fix outdated test
ivancea Jun 26, 2025
a32e843
Added docs on BlockHash pragma param
ivancea Jun 27, 2025
d27069f
Removed AbstractAggregateExec
ivancea Jun 27, 2025
76099ca
Restore AggregateExec field visibility
ivancea Jun 27, 2025
e1a0dd3
Restore AggregateExec method order to simplify review
ivancea Jun 27, 2025
027fd2b
Remove copypasted comment and fix calls after new field visibility
ivancea Jun 27, 2025
3594413
Merge branch 'main' into esql-top-n-agg-ql
ivancea Jun 30, 2025
26676b9
Merge branch 'main' into esql-top-n-agg-ql
ivancea Jul 10, 2025
a870426
Merge branch 'main' into esql-top-n-agg-ql
ivancea Jul 11, 2025
69946a7
Merge branch 'main' into esql-top-n-agg-ql
ivancea Jul 14, 2025
64fd705
Added topN stats generative test generator
ivancea Jul 14, 2025
adcfe2f
Changed pragma default value to 0, and fully disabled TopNAggs on 0 l…
ivancea Jul 15, 2025
e925a98
Fix build
ivancea Jul 15, 2025
ccdfc7e
Merge branch 'main' into esql-top-n-agg-ql
ivancea Jul 18, 2025
f8fee53
Fix test compilation
ivancea Jul 18, 2025
Original file line number Diff line number Diff line change
@@ -197,7 +197,7 @@ private static Operator operator(DriverContext driverContext, String grouping, S
         };
         return new HashAggregationOperator(
             List.of(supplier(op, dataType, filter).groupingAggregatorFactory(AggregatorMode.SINGLE, List.of(groups.size()))),
-            () -> BlockHash.build(groups, driverContext.blockFactory(), 16 * 1024, false),
+            () -> BlockHash.build(groups, driverContext.blockFactory(), 16 * 1024, false, TOP_N_LIMIT),
             driverContext
         );
     }
Original file line number Diff line number Diff line change
@@ -123,7 +123,7 @@ private static Operator operator(DriverContext driverContext, int groups, String
         List<BlockHash.GroupSpec> groupSpec = List.of(new BlockHash.GroupSpec(0, ElementType.LONG));
         return new HashAggregationOperator(
             List.of(supplier(dataType).groupingAggregatorFactory(AggregatorMode.SINGLE, List.of(1))),
-            () -> BlockHash.build(groupSpec, driverContext.blockFactory(), 16 * 1024, false),
+            () -> BlockHash.build(groupSpec, driverContext.blockFactory(), 16 * 1024, false, 100),
             driverContext
         ) {
             @Override
5 changes: 5 additions & 0 deletions docs/changelog/130111.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
+pr: 130111
+summary: Plug TopN agg groupings filtering into the language
+area: ES|QL
+type: enhancement
+issues: []
Original file line number Diff line number Diff line change
@@ -159,12 +159,20 @@ public boolean isCategorize() {
      * null handling and remove this flag, but we need to disable these in
      * production until we can. And this lets us continue to compile and
      * test them.
+     * @param maxTopNLimit the maximum limit for TopN groups to use a TopNBlockHash.
+     * This usually comes from {@code QueryPragma.maxTopNAggsLimit()}.
      */
-    public static BlockHash build(List<GroupSpec> groups, BlockFactory blockFactory, int emitBatchSize, boolean allowBrokenOptimizations) {
+    public static BlockHash build(
+        List<GroupSpec> groups,
+        BlockFactory blockFactory,
+        int emitBatchSize,
+        boolean allowBrokenOptimizations,
+        int maxTopNLimit
+    ) {
         if (groups.size() == 1) {
             GroupSpec group = groups.get(0);
-            if (group.topNDef() != null && group.elementType() == ElementType.LONG) {
-                TopNDef topNDef = group.topNDef();
+            TopNDef topNDef = group.topNDef();
+            if (topNDef != null && maxTopNLimit > 0 && group.elementType() == ElementType.LONG && topNDef.limit() < maxTopNLimit) {
                 return new LongTopNBlockHash(group.channel(), topNDef.asc(), topNDef.nullsFirst(), topNDef.limit(), blockFactory);
             }
             return newForElementType(group.channel(), group.elementType(), blockFactory);
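For readers skimming the hunk above, the selection rule in `BlockHash.build` can be restated as a small standalone predicate. This is only a sketch: `TopNGateSketch`, `TopNDefSketch` and `usesTopNHash` are hypothetical names, not part of the PR, and the element-type check is reduced to a boolean. It shows that the comparison is strict, so a limit equal to the cap falls back to the regular hash, and that a cap of 0 turns the optimization off.

```java
/** Hypothetical, self-contained restatement of the gating condition shown in the diff. */
final class TopNGateSketch {

    /** Stand-in for the PR's TopNDef; only the limit influences the gate. */
    record TopNDefSketch(boolean asc, boolean nullsFirst, int limit) {}

    /**
     * Mirrors the condition from the hunk: a TopN definition is present, the cap is
     * enabled (greater than 0), the key is a LONG, and the limit is strictly below the cap.
     */
    static boolean usesTopNHash(TopNDefSketch topNDef, boolean isLongKey, int maxTopNLimit) {
        return topNDef != null && maxTopNLimit > 0 && isLongKey && topNDef.limit() < maxTopNLimit;
    }

    public static void main(String[] args) {
        TopNDefSketch top3 = new TopNDefSketch(true, false, 3);
        System.out.println(usesTopNHash(top3, true, 100));  // true: 3 < 100
        System.out.println(usesTopNHash(top3, true, 3));    // false: the comparison is strict
        System.out.println(usesTopNHash(top3, true, 0));    // false: a cap of 0 disables the path
        System.out.println(usesTopNHash(null, true, 100));  // false: no TopN definition
        System.out.println(usesTopNHash(top3, false, 100)); // false: key is not a LONG
    }
}
```

Because the comparison is strict, callers that want the top-N path for a given limit need a cap of at least `limit + 1`.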
Original file line number Diff line number Diff line change
@@ -46,7 +46,8 @@ public record HashAggregationOperatorFactory(
         AggregatorMode aggregatorMode,
         List<GroupingAggregator.Factory> aggregators,
         int maxPageSize,
-        AnalysisRegistry analysisRegistry
+        AnalysisRegistry analysisRegistry,
+        int maxTopNLimit
     ) implements OperatorFactory {
         @Override
         public Operator get(DriverContext driverContext) {
@@ -65,7 +66,7 @@ public Operator get(DriverContext driverContext) {
             }
             return new HashAggregationOperator(
                 aggregators,
-                () -> BlockHash.build(groups, driverContext.blockFactory(), maxPageSize, false),
+                () -> BlockHash.build(groups, driverContext.blockFactory(), maxPageSize, false, maxTopNLimit),
                 driverContext
             );
         }
Original file line number Diff line number Diff line change
@@ -35,7 +35,8 @@ public record Factory(
         List<BlockHash.GroupSpec> groups,
         AggregatorMode aggregatorMode,
         List<GroupingAggregator.Factory> aggregators,
-        int maxPageSize
+        int maxPageSize,
+        int maxTopNLimit
     ) implements OperatorFactory {
         @Override
         public Operator get(DriverContext driverContext) {
@@ -48,7 +49,8 @@ public Operator get(DriverContext driverContext) {
                         groups,
                         driverContext.blockFactory(),
                         maxPageSize,
-                        true // we can enable optimizations as the inputs are vectors
+                        true, // we can enable optimizations as the inputs are vectors
+                        maxTopNLimit
                     );
                 }
             }, driverContext);
Original file line number Diff line number Diff line change
@@ -117,15 +117,17 @@ private Operator.OperatorFactory simpleWithMode(
                 mode,
                 List.of(supplier.groupingAggregatorFactory(mode, channels(mode))),
                 randomPageSize(),
-                null
+                null,
+                100
             );
         } else {
             return new RandomizingHashAggregationOperatorFactory(
                 List.of(new BlockHash.GroupSpec(0, ElementType.LONG)),
                 mode,
                 List.of(supplier.groupingAggregatorFactory(mode, channels(mode))),
                 randomPageSize(),
-                null
+                null,
+                100
             );
         }
     }
@@ -824,7 +826,8 @@ private record RandomizingHashAggregationOperatorFactory(
         AggregatorMode aggregatorMode,
         List<GroupingAggregator.Factory> aggregators,
         int maxPageSize,
-        AnalysisRegistry analysisRegistry
+        AnalysisRegistry analysisRegistry,
+        int maxTopNLimit
     ) implements Operator.OperatorFactory {

         @Override
@@ -838,7 +841,7 @@ public Operator get(DriverContext driverContext) {
                     analysisRegistry,
                     maxPageSize
                 )
-                : BlockHash.build(groups, driverContext.blockFactory(), maxPageSize, false);
+                : BlockHash.build(groups, driverContext.blockFactory(), maxPageSize, false, maxTopNLimit);

             return new BlockHashWrapper(driverContext.blockFactory(), blockHash) {
                 @Override
@@ -886,7 +889,8 @@ public String describe() {
                 aggregatorMode,
                 aggregators,
                 maxPageSize,
-                analysisRegistry
+                analysisRegistry,
+                maxTopNLimit
             ).describe();
         }
     }
Original file line number Diff line number Diff line change
@@ -256,7 +256,7 @@ private BlockHash newBlockHash(BlockFactory blockFactory, int emitBatchSize, Lis
         }
         return forcePackedHash
             ? new PackedValuesBlockHash(specs, blockFactory, emitBatchSize)
-            : BlockHash.build(specs, blockFactory, emitBatchSize, true);
+            : BlockHash.build(specs, blockFactory, emitBatchSize, true, 100);
     }

     private static final int LOOKUP_POSITIONS = 1_000;
Original file line number Diff line number Diff line change
@@ -47,6 +47,8 @@
 import static org.hamcrest.Matchers.endsWith;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.startsWith;

 public class BlockHashTests extends BlockHashTestCase {
@@ -1544,7 +1546,8 @@ public void testTimeSeriesBlockHash() throws Exception {
                 List.of(new BlockHash.GroupSpec(0, ElementType.BYTES_REF), new BlockHash.GroupSpec(1, ElementType.LONG)),
                 blockFactory,
                 32 * 1024,
-                forcePackedHash
+                forcePackedHash,
+                100
             );
         int numPages = between(1, 100);
         int globalTsid = -1;
@@ -1660,6 +1663,34 @@ public void close() {
         }
     }

+    public void testTopNBlockHashLimit() {
+        int limit = randomIntBetween(1, 100);
+
+        try (
+            var hash = BlockHash.build(
+                List.of(new BlockHash.GroupSpec(0, ElementType.LONG, null, new BlockHash.TopNDef(0, true, false, limit))),
+                blockFactory,
+                32 * 1024,
+                forcePackedHash,
+                randomIntBetween(limit + 1, 1000) // build() checks limit < maxTopNLimit, so the cap must exceed the limit
+            )
+        ) {
+            assertThat(hash, instanceOf(LongTopNBlockHash.class));
+        }
+
+        try (
+            var hash = BlockHash.build(
+                List.of(new BlockHash.GroupSpec(0, ElementType.LONG, null, new BlockHash.TopNDef(0, true, false, limit))),
+                blockFactory,
+                32 * 1024,
+                forcePackedHash,
+                randomIntBetween(1, limit)
+            )
+        ) {
+            assertThat(hash, not(instanceOf(LongTopNBlockHash.class)));
+        }
+    }
+
     /**
      * Hash some values into a single block of group ids. If the hash produces
      * more than one block of group ids this will fail.
@@ -1713,6 +1744,6 @@ private BlockHash buildBlockHash(int emitBatchSize, Block... values) {
         }
         return forcePackedHash
             ? new PackedValuesBlockHash(specs, blockFactory, emitBatchSize)
-            : BlockHash.build(specs, blockFactory, emitBatchSize, true);
+            : BlockHash.build(specs, blockFactory, emitBatchSize, true, 100);
     }
 }
Original file line number Diff line number Diff line change
@@ -513,7 +513,8 @@ public void testCategorize_withDriver() {
                             new MaxLongAggregatorFunctionSupplier().groupingAggregatorFactory(AggregatorMode.INITIAL, List.of(1))
                         ),
                         16 * 1024,
-                        analysisRegistry
+                        analysisRegistry,
+                        100
                     ).get(driverContext)
                 ),
                 new PageConsumerOperator(intermediateOutput::add)
@@ -532,7 +533,8 @@ public void testCategorize_withDriver() {
                             new MaxLongAggregatorFunctionSupplier().groupingAggregatorFactory(AggregatorMode.INITIAL, List.of(1))
                         ),
                         16 * 1024,
-                        analysisRegistry
+                        analysisRegistry,
+                        100
                     ).get(driverContext)
                 ),
                 new PageConsumerOperator(intermediateOutput::add)
@@ -553,7 +555,8 @@ public void testCategorize_withDriver() {
                             new MaxLongAggregatorFunctionSupplier().groupingAggregatorFactory(AggregatorMode.FINAL, List.of(3, 4))
                         ),
                         16 * 1024,
-                        analysisRegistry
+                        analysisRegistry,
+                        100
                     ).get(driverContext)
                 ),
                 new PageConsumerOperator(finalOutput::add)
Original file line number Diff line number Diff line change
@@ -151,7 +151,8 @@ public void testCategorize_withDriver() {
                         AggregatorMode.INITIAL,
                         List.of(new ValuesBytesRefAggregatorFunctionSupplier().groupingAggregatorFactory(AggregatorMode.INITIAL, List.of(0))),
                         16 * 1024,
-                        analysisRegistry
+                        analysisRegistry,
+                        100
                     ).get(driverContext)
                 ),
                 new PageConsumerOperator(intermediateOutput::add)
@@ -167,7 +168,8 @@ public void testCategorize_withDriver() {
                         AggregatorMode.INITIAL,
                         List.of(new ValuesBytesRefAggregatorFunctionSupplier().groupingAggregatorFactory(AggregatorMode.INITIAL, List.of(0))),
                         16 * 1024,
-                        analysisRegistry
+                        analysisRegistry,
+                        100
                     ).get(driverContext)
                 ),
                 new PageConsumerOperator(intermediateOutput::add)
@@ -185,7 +187,8 @@ public void testCategorize_withDriver() {
                         AggregatorMode.FINAL,
                         List.of(new ValuesBytesRefAggregatorFunctionSupplier().groupingAggregatorFactory(AggregatorMode.FINAL, List.of(2))),
                         16 * 1024,
-                        analysisRegistry
+                        analysisRegistry,
+                        100
                     ).get(driverContext)
                 ),
                 new PageConsumerOperator(finalOutput::add)
Original file line number Diff line number Diff line change
@@ -64,7 +64,8 @@ protected Operator.OperatorFactory simpleWithMode(SimpleOptions options, Aggrega
                 new MaxLongAggregatorFunctionSupplier().groupingAggregatorFactory(mode, maxChannels)
             ),
             randomPageSize(),
-            null
+            null,
+            100
         );
     }

@@ -103,6 +104,7 @@ protected void assertSimpleOutput(List<Page> input, List<Page> results) {

     public void testTopNNullsLast() {
         boolean ascOrder = randomBoolean();
+        int limit = 3;
         var groups = new Long[] { 0L, 10L, 20L, 30L, 40L, 50L };
         if (ascOrder) {
             Arrays.sort(groups, Comparator.reverseOrder());
@@ -113,14 +115,15 @@ public void testTopNNullsLast() {

         try (
             var operator = new HashAggregationOperator.HashAggregationOperatorFactory(
-                List.of(new BlockHash.GroupSpec(groupChannel, ElementType.LONG, null, new BlockHash.TopNDef(0, ascOrder, false, 3))),
+                List.of(new BlockHash.GroupSpec(groupChannel, ElementType.LONG, null, new BlockHash.TopNDef(0, ascOrder, false, limit))),
                 mode,
                 List.of(
                     new SumLongAggregatorFunctionSupplier().groupingAggregatorFactory(mode, aggregatorChannels),
                     new MaxLongAggregatorFunctionSupplier().groupingAggregatorFactory(mode, aggregatorChannels)
                 ),
                 randomPageSize(),
-                null
+                null,
+                randomIntBetween(limit + 1, 1000) // BlockHash.build() checks limit < maxTopNLimit, so the cap must exceed the limit
             ).get(driverContext())
         ) {
             var page = new Page(
@@ -180,6 +183,7 @@ public void testTopNNullsLast() {

     public void testTopNNullsFirst() {
         boolean ascOrder = randomBoolean();
+        int limit = 3;
         var groups = new Long[] { 0L, 10L, 20L, 30L, 40L, 50L };
         if (ascOrder) {
             Arrays.sort(groups, Comparator.reverseOrder());
@@ -190,14 +194,15 @@ public void testTopNNullsFirst() {

         try (
             var operator = new HashAggregationOperator.HashAggregationOperatorFactory(
-                List.of(new BlockHash.GroupSpec(groupChannel, ElementType.LONG, null, new BlockHash.TopNDef(0, ascOrder, true, 3))),
+                List.of(new BlockHash.GroupSpec(groupChannel, ElementType.LONG, null, new BlockHash.TopNDef(0, ascOrder, true, limit))),
                 mode,
                 List.of(
                     new SumLongAggregatorFunctionSupplier().groupingAggregatorFactory(mode, aggregatorChannels),
                     new MaxLongAggregatorFunctionSupplier().groupingAggregatorFactory(mode, aggregatorChannels)
                 ),
                 randomPageSize(),
-                null
+                null,
+                randomIntBetween(limit + 1, 1000) // BlockHash.build() checks limit < maxTopNLimit, so the cap must exceed the limit
             ).get(driverContext())
         ) {
             var page = new Page(
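The two tests above group on the keys 0, 10, ..., 50 with a limit of 3, so only the three best keys in the requested order should survive as groups. The sketch below illustrates that filtering idea with a plain `TreeSet`, which the commit list says the first prototype used before it was replaced by a custom sorted structure. It is not `LongTopNBlockHash`'s implementation: `TopNGroupsSketch` and `topGroups` are hypothetical names, and nulls and multivalues are not modeled.

```java
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical illustration of top-N group-key filtering; not the PR's data structure. */
final class TopNGroupsSketch {

    /** Returns at most {@code limit} distinct keys, best first according to {@code asc}. */
    static List<Long> topGroups(long[] keys, boolean asc, int limit) {
        if (limit <= 0) {
            return List.of();
        }
        Comparator<Long> order = asc ? Comparator.<Long>naturalOrder() : Comparator.<Long>reverseOrder();
        TreeSet<Long> top = new TreeSet<>(order);
        for (long key : keys) {
            if (top.size() < limit) {
                // Still room: every distinct key is kept.
                top.add(key);
            } else if (order.compare(key, top.last()) < 0 && top.add(key)) {
                // A new key beats the current worst one, so the worst is evicted.
                top.pollLast();
            }
        }
        return List.copyOf(top);
    }

    public static void main(String[] args) {
        long[] keys = { 30, 0, 50, 10, 20, 40, 10 };
        System.out.println(topGroups(keys, true, 3));  // [0, 10, 20]
        System.out.println(topGroups(keys, false, 3)); // [50, 40, 30]
    }
}
```

Rows whose key falls outside the retained set can be skipped before they reach the aggregators, which is the saving the top-N path is after.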
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
 import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.RenameGenerator;
 import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.SortGenerator;
 import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.StatsGenerator;
+import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.TopNStatsGenerator;
 import org.elasticsearch.xpack.esql.qa.rest.generative.command.pipe.WhereGenerator;
 import org.elasticsearch.xpack.esql.qa.rest.generative.command.source.FromGenerator;

@@ -65,6 +66,7 @@ public record QueryExecuted(String query, int depth, List<Column> outputSchema,
         RenameGenerator.INSTANCE,
         SortGenerator.INSTANCE,
         StatsGenerator.INSTANCE,
+        TopNStatsGenerator.INSTANCE,
         WhereGenerator.INSTANCE
     );
