From 5a06fccfd23b2b3945107b8363c12df00568205d Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Fri, 24 Jun 2022 14:13:17 -0400 Subject: [PATCH 01/13] REST tests for percentiles_bucket agg Adds REST tests for the `percentiles_bucket` pipeline bucket aggregation. This gives us forwards and backwards compatibility tests for these aggs as well as mixed version cluster tests for these aggs. Relates to #26220 --- .../500_percentiles_bucket.yml | 293 ++++++++++++++++++ 1 file changed, 293 insertions(+) create mode 100644 rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.aggregation/500_percentiles_bucket.yml diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.aggregation/500_percentiles_bucket.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.aggregation/500_percentiles_bucket.yml new file mode 100644 index 0000000000000..2133f0b3a0f3a --- /dev/null +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.aggregation/500_percentiles_bucket.yml @@ -0,0 +1,293 @@ +setup: + - do: + bulk: + index: no_gaps + refresh: true + body: + - { "index": { } } + - { "@timestamp": "2022-01-01T00:00:00", "v": 1 } + - { "index": { } } + - { "@timestamp": "2022-01-01T01:00:00", "v": 2 } + - { "index": { } } + - { "@timestamp": "2022-01-01T02:00:00", "v": 1 } + + - do: + bulk: + index: gaps + refresh: true + body: + - { "index": { } } + - { "@timestamp": "2022-01-01T00:00:00", "v": 1 } + - { "index": { } } + - { "@timestamp": "2022-01-01T02:00:00", "v": 2 } + - { "index": { } } + - { "@timestamp": "2022-01-01T03:00:00", "v": 1 } + +--- +basic: + - do: + search: + index: no_gaps + body: + size: 0 + aggs: + "@timestamp": + date_histogram: + field: "@timestamp" + fixed_interval: 1h + aggs: + v: {avg: {field: v}} + d: + percentiles_bucket: + buckets_path: "@timestamp>v" + - match: { hits.total.value: 3 } + - length: { aggregations.@timestamp.buckets: 3 } + - match: + aggregations.d.values: + 1.0: 1.0 + 5.0: 1.0 + 
25.0: 1.0 + 50.0: 1.0 + 75.0: 2.0 + 95.0: 2.0 + 99.0: 2.0 + +--- +format: + - do: + search: + index: no_gaps + body: + size: 0 + aggs: + "@timestamp": + date_histogram: + field: "@timestamp" + fixed_interval: 1h + aggs: + v: {avg: {field: v}} + d: + percentiles_bucket: + buckets_path: "@timestamp>v" + format: "0.00" + - match: { hits.total.value: 3 } + - length: { aggregations.@timestamp.buckets: 3 } + - match: + aggregations.d.values: + 1.0: 1.0 + 5.0: 1.0 + 25.0: 1.0 + 50.0: 1.0 + 75.0: 2.0 + 95.0: 2.0 + 99.0: 2.0 + 1.0_as_string: "1.00" + 5.0_as_string: "1.00" + 50.0_as_string: "1.00" + 25.0_as_string: "1.00" + 75.0_as_string: "2.00" + 95.0_as_string: "2.00" + 99.0_as_string: "2.00" + +--- +gap_policy=skip: + - do: + search: + index: gaps + body: + size: 0 + aggs: + "@timestamp": + date_histogram: + field: "@timestamp" + fixed_interval: 1h + aggs: + v: {avg: {field: v}} + d: + percentiles_bucket: + buckets_path: "@timestamp>v" + gap_policy: skip + - match: { hits.total.value: 3 } + - length: { aggregations.@timestamp.buckets: 4 } + - match: + aggregations.d.values: + 1.0: 1.0 + 5.0: 1.0 + 25.0: 1.0 + 50.0: 1.0 + 75.0: 2.0 + 95.0: 2.0 + 99.0: 2.0 + +--- +gap_policy=insert_zeros: + - do: + search: + index: gaps + body: + size: 0 + aggs: + "@timestamp": + date_histogram: + field: "@timestamp" + fixed_interval: 1h + aggs: + v: {avg: {field: v}} + d: + percentiles_bucket: + buckets_path: "@timestamp>v" + gap_policy: insert_zeros + - match: { hits.total.value: 3 } + - length: { aggregations.@timestamp.buckets: 4 } + - match: + aggregations.d.values: + 1.0: 0.0 + 5.0: 0.0 + 25.0: 1.0 + 50.0: 1.0 + 75.0: 1.0 + 95.0: 2.0 + 99.0: 2.0 + +--- +gap_policy=keep_value: + - do: + search: + index: gaps + body: + size: 0 + aggs: + "@timestamp": + date_histogram: + field: "@timestamp" + fixed_interval: 1h + aggs: + v: {avg: {field: v}} + d: + percentiles_bucket: + buckets_path: "@timestamp>v" + gap_policy: keep_values + - match: { hits.total.value: 3 } + - length: { 
aggregations.@timestamp.buckets: 4 } + - match: + aggregations.d.values: + 1.0: 1.0 + 5.0: 1.0 + 25.0: 1.0 + 50.0: 1.0 + 75.0: 2.0 + 95.0: 2.0 + 99.0: 2.0 + +--- +dotted name: + - do: + search: + index: no_gaps + body: + size: 0 + aggs: + "@time.stamp": + date_histogram: + field: "@timestamp" + fixed_interval: 1h + aggs: + v: {avg: {field: v}} + d: + percentiles_bucket: + buckets_path: "@time.stamp>v" + - match: { hits.total.value: 3 } + - length: { aggregations.@time\.stamp.buckets: 3 } + - match: + aggregations.d.values: + 1.0: 1.0 + 5.0: 1.0 + 25.0: 1.0 + 50.0: 1.0 + 75.0: 2.0 + 95.0: 2.0 + 99.0: 2.0 + +--- +dotted value: + - do: + search: + index: no_gaps + body: + size: 0 + aggs: + "@timestamp": + date_histogram: + field: "@timestamp" + fixed_interval: 1h + aggs: + v: + percentiles: + field: v + percents: [ 50, 99.9 ] + d: + percentiles_bucket: + buckets_path: "@timestamp>v[99.9]" + - match: { hits.total.value: 3 } + - length: { aggregations.@timestamp.buckets: 3 } + - match: + aggregations.d.values: + 1.0: 1.0 + 5.0: 1.0 + 25.0: 1.0 + 50.0: 1.0 + 75.0: 2.0 + 95.0: 2.0 + 99.0: 2.0 + +--- +no results: + - do: + search: + index: no_gaps + body: + size: 0 + query: + match: + missing_field: not_found + aggs: + "@timestamp": + date_histogram: + field: "@timestamp" + fixed_interval: 1h + aggs: + v: {avg: {field: v}} + d: + percentiles_bucket: + buckets_path: "@timestamp>v" + - match: { hits.total.value: 0 } + - length: { aggregations.@timestamp.buckets: 0 } + - match: + aggregations.d.values: + 1.0: null + 5.0: null + 25.0: null + 50.0: null + 75.0: null + 95.0: null + 99.0: null + +--- +bad path: + - do: + catch: '/Validation Failed: 1: No aggregation \[v\] found for path \[@timestamp>v\];/' + search: + index: no_gaps + body: + size: 0 + query: + match: + missing_field: not_found + aggs: + "@timestamp": + date_histogram: + field: "@timestamp" + fixed_interval: 1h + d: + percentiles_bucket: + buckets_path: "@timestamp>v" From bc37bb13326a5b483416118ee2d7427210054fcb 
Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 10 Nov 2025 13:57:31 -0500 Subject: [PATCH 02/13] ESQL: Fush MV_MIN and MV_MAX into field loading --- .../index/mapper/IpFieldMapper.java | 27 ++- .../index/mapper/KeywordFieldMapper.java | 18 +- .../index/mapper/NumberFieldMapper.java | 108 +++++++++- .../BlockLoaderFunctionConfig.java | 4 + .../single_node/PushExpressionToLoadIT.java | 131 +++++++++++++ .../blockloader/BlockLoaderExpression.java | 185 +++++++++++++++++- .../function/scalar/multivalue/MvMax.java | 14 +- .../function/scalar/multivalue/MvMin.java | 14 +- .../LocalLogicalPlanOptimizerTests.java | 2 +- 9 files changed, 492 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/mapper/IpFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/IpFieldMapper.java index deea53d180e10..cf132746af902 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/IpFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/IpFieldMapper.java @@ -33,7 +33,10 @@ import org.elasticsearch.index.fielddata.FieldDataContext; import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.index.fielddata.plain.SortedSetOrdinalsIndexFieldData; +import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig; import org.elasticsearch.index.mapper.blockloader.docvalues.BytesRefsFromOrdsBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.MvMaxBytesRefsFromOrdsBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.MvMinBytesRefsFromOrdsBlockLoader; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.script.IpFieldScript; import org.elasticsearch.script.Script; @@ -464,7 +467,15 @@ public static Query rangeQuery( @Override public BlockLoader blockLoader(BlockLoaderContext blContext) { if (hasDocValues() && (blContext.fieldExtractPreference() != FieldExtractPreference.STORED || isSyntheticSource)) { - 
return new BytesRefsFromOrdsBlockLoader(name()); + BlockLoaderFunctionConfig cfg = blContext.blockLoaderFunctionConfig(); + if (cfg == null) { + return new BytesRefsFromOrdsBlockLoader(name()); + } + return switch (cfg.function()) { + case MV_MAX -> new MvMaxBytesRefsFromOrdsBlockLoader(name()); + case MV_MIN -> new MvMinBytesRefsFromOrdsBlockLoader(name()); + default -> throw new UnsupportedOperationException("unknown fusion config [" + cfg.function() + "]"); + }; } if (isStored()) { @@ -482,6 +493,20 @@ public BlockLoader blockLoader(BlockLoaderContext blContext) { return new BlockSourceReader.IpsBlockLoader(sourceValueFetcher(blContext), lookup); } + @Override + public boolean supportsBlockLoaderConfig(BlockLoaderFunctionConfig config, FieldExtractPreference preference) { + if (hasDocValues() && (preference != FieldExtractPreference.STORED || isSyntheticSource)) { + if (config == null) { + return true; + } + return switch (config.function()) { + case MV_MAX, MV_MIN -> true; + default -> false; + }; + } + return true; + } + private BlockLoader blockLoaderFromFallbackSyntheticSource(BlockLoaderContext blContext) { var reader = new IpFallbackSyntheticSourceReader(nullValue); return new FallbackSyntheticSourceBlockLoader( diff --git a/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java index 328988e235fcb..97aa1e192cda8 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java @@ -61,6 +61,8 @@ import org.elasticsearch.index.fielddata.plain.SortedSetOrdinalsIndexFieldData; import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig; import org.elasticsearch.index.mapper.blockloader.docvalues.BytesRefsFromOrdsBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.MvMaxBytesRefsFromOrdsBlockLoader; +import 
org.elasticsearch.index.mapper.blockloader.docvalues.MvMinBytesRefsFromOrdsBlockLoader; import org.elasticsearch.index.mapper.blockloader.docvalues.Utf8CodePointsFromOrdsBlockLoader; import org.elasticsearch.index.query.AutomatonQueryWithDescription; import org.elasticsearch.index.query.SearchExecutionContext; @@ -755,10 +757,13 @@ public BlockLoader blockLoader(BlockLoaderContext blContext) { if (cfg == null) { return new BytesRefsFromOrdsBlockLoader(name()); } - if (cfg.function() == BlockLoaderFunctionConfig.Function.LENGTH) { - return new Utf8CodePointsFromOrdsBlockLoader(((BlockLoaderFunctionConfig.JustWarnings) cfg).warnings(), name()); - } - throw new UnsupportedOperationException("unknown fusion config [" + cfg.function() + "]"); + return switch (cfg.function()) { + case LENGTH -> new Utf8CodePointsFromOrdsBlockLoader(((BlockLoaderFunctionConfig.JustWarnings) cfg).warnings(), name()); + case MV_MAX -> new MvMaxBytesRefsFromOrdsBlockLoader(name()); + case MV_MIN -> new MvMinBytesRefsFromOrdsBlockLoader(name()); + default -> throw new UnsupportedOperationException("unknown fusion config [" + cfg.function() + "]"); + }; + } if (blContext.blockLoaderFunctionConfig() != null) { throw new UnsupportedOperationException("function fusing only supported for doc values"); @@ -788,7 +793,10 @@ public Builder builder(BlockFactory factory, int expectedCount) { @Override public boolean supportsBlockLoaderConfig(BlockLoaderFunctionConfig config, FieldExtractPreference preference) { if (hasDocValues() && (preference != FieldExtractPreference.STORED || isSyntheticSourceEnabled())) { - return config.function() == BlockLoaderFunctionConfig.Function.LENGTH; + return switch (config.function()) { + case LENGTH, MV_MAX, MV_MIN -> true; + default -> false; + }; } return false; } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java index 434e74fefb90a..80c32945e2d82 100644 
--- a/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java @@ -45,9 +45,12 @@ import org.elasticsearch.index.fielddata.plain.SortedDoublesIndexFieldData; import org.elasticsearch.index.fielddata.plain.SortedNumericIndexFieldData; import org.elasticsearch.index.mapper.TimeSeriesParams.MetricType; +import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig; import org.elasticsearch.index.mapper.blockloader.docvalues.DoublesBlockLoader; import org.elasticsearch.index.mapper.blockloader.docvalues.IntsBlockLoader; import org.elasticsearch.index.mapper.blockloader.docvalues.LongsBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.MvMaxIntsFromDocValuesBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.MvMinIntsFromDocValuesBlockLoader; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.script.DoubleFieldScript; import org.elasticsearch.script.LongFieldScript; @@ -488,6 +491,16 @@ BlockLoader blockLoaderFromFallbackSyntheticSource( ) { return floatingPointBlockLoaderFromFallbackSyntheticSource(this, fieldName, nullValue, coerce, blContext); } + + @Override + BlockLoader blockLoaderFromDocValuesMvMin(String fieldName) { + throw new UnsupportedOperationException("coming in 137820"); + } + + @Override + BlockLoader blockLoaderFromDocValuesMvMax(String fieldName) { + throw new UnsupportedOperationException("coming in 137820"); + } }, FLOAT("float", NumericType.FLOAT) { @Override @@ -682,6 +695,16 @@ BlockLoader blockLoaderFromFallbackSyntheticSource( ) { return floatingPointBlockLoaderFromFallbackSyntheticSource(this, fieldName, nullValue, coerce, blContext); } + + @Override + BlockLoader blockLoaderFromDocValuesMvMin(String fieldName) { + throw new UnsupportedOperationException("coming in 137820"); + } + + @Override + BlockLoader blockLoaderFromDocValuesMvMax(String fieldName) { 
+ throw new UnsupportedOperationException("coming in 137820"); + } }, DOUBLE("double", NumericType.DOUBLE) { @Override @@ -842,6 +865,16 @@ BlockLoader blockLoaderFromFallbackSyntheticSource( ) { return floatingPointBlockLoaderFromFallbackSyntheticSource(this, fieldName, nullValue, coerce, blContext); } + + @Override + BlockLoader blockLoaderFromDocValuesMvMin(String fieldName) { + throw new UnsupportedOperationException("coming in 137820"); + } + + @Override + BlockLoader blockLoaderFromDocValuesMvMax(String fieldName) { + throw new UnsupportedOperationException("coming in 137820"); + } }, BYTE("byte", NumericType.BYTE) { @Override @@ -971,6 +1004,16 @@ BlockLoader blockLoaderFromFallbackSyntheticSource( return integerBlockLoaderFromFallbackSyntheticSource(this, fieldName, nullValue, coerce, blContext); } + @Override + BlockLoader blockLoaderFromDocValuesMvMin(String fieldName) { + return new MvMinIntsFromDocValuesBlockLoader(fieldName); + } + + @Override + BlockLoader blockLoaderFromDocValuesMvMax(String fieldName) { + return new MvMaxIntsFromDocValuesBlockLoader(fieldName); + } + private boolean isOutOfRange(Object value) { double doubleValue = objectToDouble(value); return doubleValue < Byte.MIN_VALUE || doubleValue > Byte.MAX_VALUE; @@ -1099,6 +1142,16 @@ BlockLoader blockLoaderFromFallbackSyntheticSource( return integerBlockLoaderFromFallbackSyntheticSource(this, fieldName, nullValue, coerce, blContext); } + @Override + BlockLoader blockLoaderFromDocValuesMvMin(String fieldName) { + return new MvMinIntsFromDocValuesBlockLoader(fieldName); + } + + @Override + BlockLoader blockLoaderFromDocValuesMvMax(String fieldName) { + return new MvMaxIntsFromDocValuesBlockLoader(fieldName); + } + private boolean isOutOfRange(Object value) { double doubleValue = objectToDouble(value); return doubleValue < Short.MIN_VALUE || doubleValue > Short.MAX_VALUE; @@ -1300,6 +1353,16 @@ BlockLoader blockLoaderFromFallbackSyntheticSource( ) { return 
integerBlockLoaderFromFallbackSyntheticSource(this, fieldName, nullValue, coerce, blContext); } + + @Override + BlockLoader blockLoaderFromDocValuesMvMin(String fieldName) { + return new MvMinIntsFromDocValuesBlockLoader(fieldName); + } + + @Override + BlockLoader blockLoaderFromDocValuesMvMax(String fieldName) { + return new MvMaxIntsFromDocValuesBlockLoader(fieldName); + } }, LONG("long", NumericType.LONG) { @Override @@ -1482,6 +1545,16 @@ public Builder builder(BlockFactory factory, int expectedCount) { }; } + @Override + BlockLoader blockLoaderFromDocValuesMvMin(String fieldName) { + throw new UnsupportedOperationException("coming in 137820"); + } + + @Override + BlockLoader blockLoaderFromDocValuesMvMax(String fieldName) { + throw new UnsupportedOperationException("coming in 137820"); + } + private boolean isOutOfRange(Object value) { if (value instanceof Long) { return false; @@ -1759,6 +1832,10 @@ abstract BlockLoader blockLoaderFromFallbackSyntheticSource( MappedFieldType.BlockLoaderContext blContext ); + abstract BlockLoader blockLoaderFromDocValuesMvMin(String fieldName); + + abstract BlockLoader blockLoaderFromDocValuesMvMax(String fieldName); + // All values that fit into integer are returned as integers private static BlockLoader integerBlockLoaderFromFallbackSyntheticSource( NumberType type, @@ -2014,7 +2091,15 @@ public Function pointReaderIfPossible() { @Override public BlockLoader blockLoader(BlockLoaderContext blContext) { if (hasDocValues() && (blContext.fieldExtractPreference() != FieldExtractPreference.STORED || isSyntheticSource)) { - return type.blockLoaderFromDocValues(name()); + BlockLoaderFunctionConfig cfg = blContext.blockLoaderFunctionConfig(); + if (cfg == null) { + return type.blockLoaderFromDocValues(name()); + } + return switch (cfg.function()) { + case MV_MAX -> type.blockLoaderFromDocValuesMvMax(name()); + case MV_MIN -> type.blockLoaderFromDocValuesMvMin(name()); + default -> throw new UnsupportedOperationException("unknown 
fusion config [" + cfg.function() + "]"); + }; } // Multi fields don't have fallback synthetic source. @@ -2029,6 +2114,27 @@ public BlockLoader blockLoader(BlockLoaderContext blContext) { return type.blockLoaderFromSource(sourceValueFetcher(blContext.sourcePaths(name()), blContext.indexSettings()), lookup); } + @Override + public boolean supportsBlockLoaderConfig(BlockLoaderFunctionConfig config, FieldExtractPreference preference) { + if (hasDocValues() && (preference != FieldExtractPreference.STORED || isSyntheticSource)) { + if (config == null) { + return true; + } + if (switch (type) { + case HALF_FLOAT, FLOAT, DOUBLE, LONG -> true; + default -> false; + }) { + // removed in 137820 + return false; + } + return switch (config.function()) { + case MV_MAX, MV_MIN -> true; + default -> false; + }; + } + return false; + } + @Override public IndexFieldData.Builder fielddataBuilder(FieldDataContext fieldDataContext) { FielddataOperation operation = fieldDataContext.fielddataOperation(); diff --git a/server/src/main/java/org/elasticsearch/index/mapper/blockloader/BlockLoaderFunctionConfig.java b/server/src/main/java/org/elasticsearch/index/mapper/blockloader/BlockLoaderFunctionConfig.java index 87135b34cfef5..cf08d98fa700e 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/blockloader/BlockLoaderFunctionConfig.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/blockloader/BlockLoaderFunctionConfig.java @@ -24,9 +24,13 @@ public interface BlockLoaderFunctionConfig { */ Function function(); + record JustFunction(Function function) implements BlockLoaderFunctionConfig {} + record JustWarnings(Function function, Warnings warnings) implements BlockLoaderFunctionConfig {} enum Function { + MV_MAX, + MV_MIN, LENGTH, V_COSINE, V_DOT_PRODUCT, diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushExpressionToLoadIT.java 
b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushExpressionToLoadIT.java index f7bee9e55461e..546c8b87accae 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushExpressionToLoadIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushExpressionToLoadIT.java @@ -83,6 +83,11 @@ public void testLengthToKeyword() throws IOException { ); } + /** + * We don't support fusing {@code LENGTH} into loading {@code wildcard} fields because + * we haven't written support for fusing functions to loading from its source format. + * We haven't done that because {@code wildcard} fields aren't super common. + */ public void testLengthNotPushedToWildcard() throws IOException { String value = "v".repeat(between(0, 256)); test( @@ -94,6 +99,12 @@ public void testLengthNotPushedToWildcard() throws IOException { ); } + /** + * We don't support fusing {@code LENGTH} into loading {@code text} fields because + * we haven't written support for fusing functions to loading from {@code _source}. + * Usually folks that want to go superfast will use doc values. But those aren't + * even available for {@code text} fields. + */ public void testLengthNotPushedToText() throws IOException { String value = "v".repeat(between(0, 256)); test( @@ -151,6 +162,126 @@ public void testLengthNotPushedToLookupJoinKeywordSameName() throws IOException ); } + public void testMvMinToKeyword() throws IOException { + String min = "a".repeat(between(1, 256)); + String max = "b".repeat(between(1, 256)); + test( + justType("keyword"), + b -> b.startArray("test").value(min).value(max).endArray(), + "| EVAL test = MV_MIN(test)", + matchesList().item(min), + matchesMap().entry("test:column_at_a_time:MvMinBytesRefsFromOrds.SortedSet", 1) + ); + } + + public void testMvMinToIp() throws IOException { + String min = "192.168.0." 
+ between(0, 255); + String max = "192.168.3." + between(0, 255); + test( + justType("ip"), + b -> b.startArray("test").value(min).value(max).endArray(), + "| EVAL test = MV_MIN(test)", + matchesList().item(min), + matchesMap().entry("test:column_at_a_time:MvMinBytesRefsFromOrds.SortedSet", 1) + ); + } + + public void testMvMinToByte() throws IOException { + int min = between(Byte.MIN_VALUE, Byte.MAX_VALUE - 10); + int max = between(min + 1, Byte.MAX_VALUE); + test( + justType("byte"), + b -> b.startArray("test").value(min).value(max).endArray(), + "| EVAL test = MV_MIN(test)", + matchesList().item(min), + matchesMap().entry("test:column_at_a_time:MvMinIntsFromDocValues.Sorted", 1) + ); + } + + public void testMvMinToShort() throws IOException { + int min = between(Short.MIN_VALUE, Short.MAX_VALUE - 10); + int max = between(min + 1, Short.MAX_VALUE); + test( + justType("short"), + b -> b.startArray("test").value(min).value(max).endArray(), + "| EVAL test = MV_MIN(test)", + matchesList().item(min), + matchesMap().entry("test:column_at_a_time:MvMinIntsFromDocValues.Sorted", 1) + ); + } + + public void testMvMinToInt() throws IOException { + int min = between(Integer.MIN_VALUE, Integer.MAX_VALUE - 10); + int max = between(min + 1, Integer.MAX_VALUE); + test( + justType("integer"), + b -> b.startArray("test").value(min).value(max).endArray(), + "| EVAL test = MV_MIN(test)", + matchesList().item(min), + matchesMap().entry("test:column_at_a_time:MvMinIntsFromDocValues.Sorted", 1) + ); + } + + public void testMvMaxToKeyword() throws IOException { + String min = "a".repeat(between(1, 256)); + String max = "b".repeat(between(1, 256)); + test( + justType("keyword"), + b -> b.startArray("test").value(min).value(max).endArray(), + "| EVAL test = MV_MAX(test)", + matchesList().item(max), + matchesMap().entry("test:column_at_a_time:MvMaxBytesRefsFromOrds.SortedSet", 1) + ); + } + + public void testMvMaxToIp() throws IOException { + String min = "192.168.0." 
+ between(0, 255); + String max = "192.168.3." + between(0, 255); + test( + justType("ip"), + b -> b.startArray("test").value(min).value(max).endArray(), + "| EVAL test = MV_MAX(test)", + matchesList().item(max), + matchesMap().entry("test:column_at_a_time:MvMaxBytesRefsFromOrds.SortedSet", 1) + ); + } + + public void testMvMaxToByte() throws IOException { + int min = between(Byte.MIN_VALUE, Byte.MAX_VALUE - 10); + int max = between(min + 1, Byte.MAX_VALUE); + test( + justType("byte"), + b -> b.startArray("test").value(min).value(max).endArray(), + "| EVAL test = MV_MAX(test)", + matchesList().item(max), + matchesMap().entry("test:column_at_a_time:MvMaxIntsFromDocValues.Sorted", 1) + ); + } + + public void testMvMaxToShort() throws IOException { + int min = between(Short.MIN_VALUE, Short.MAX_VALUE - 10); + int max = between(min + 1, Short.MAX_VALUE); + test( + justType("short"), + b -> b.startArray("test").value(min).value(max).endArray(), + "| EVAL test = MV_MAX(test)", + matchesList().item(max), + matchesMap().entry("test:column_at_a_time:MvMaxIntsFromDocValues.Sorted", 1) + ); + } + + public void testMvMaxToInt() throws IOException { + int min = between(Integer.MIN_VALUE, Integer.MAX_VALUE - 10); + int max = between(min + 1, Integer.MAX_VALUE); + test( + justType("integer"), + b -> b.startArray("test").value(min).value(max).endArray(), + "| EVAL test = MV_MAX(test)", + matchesList().item(max), + matchesMap().entry("test:column_at_a_time:MvMaxIntsFromDocValues.Sorted", 1) + ); + } + public void testVCosine() throws IOException { test( justType("dense_vector"), diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java index 9325c3fec9032..32b2afdf281d6 100644 --- 
a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java @@ -9,7 +9,12 @@ import org.elasticsearch.compute.data.Block; import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.mapper.BlockLoader; +import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig; +import org.elasticsearch.index.mapper.blockloader.docvalues.MvMaxBytesRefsFromOrdsBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.MvMaxIntsFromDocValuesBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.Utf8CodePointsFromOrdsBlockLoader; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.stats.SearchStats; @@ -43,6 +48,180 @@ * multivalued fields. * * + *

How to implement

+ *
    + *
  1. Implement some block loaders
  2. + *
  3. Unit test the block loaders
  4. + *
  5. Plug the {@link BlockLoader} into the {@link MappedFieldType#blockLoader field mapper}
  6. + *
  7. Implement this interface
  8. + *
  9. Add to {@code PushExpressionToLoadIT}
  10. + *
  11. Maybe add to {@code csv-spec} tests
  12. + *
  13. Get some performance numbers and open a PR
  14. + *
+ *

Implement some block loaders

+ *

+ * Implement a {@link BlockLoader} for each fused code path. There's + * going to be a {@linkplain BlockLoader} per + * {@code x x }. Examples: + *

+ *
    + *
  1. + * {@link Utf8CodePointsFromOrdsBlockLoader} is for {@code LENGTH x keyword x docValues}. + *
  2. + *
  3. + * {@link MvMaxIntsFromDocValuesBlockLoader} is for {@code MV_MAX x int x docValues}. + *
  4. + *
  5. + * {@link MvMaxBytesRefsFromOrdsBlockLoader} is for {@code MV_MAX x (keyword|ip) x doc_values}. + *
  6. + *
+ *

+ * If you wanted to push all loads for a function applied + * to a field type you'd need to optimize all paths which could include: + *

+ *
    + *
  1. {@code doc_values}
  2. + *
  3. {@code stored}
  4. + *
  5. {@code _source}
  6. + *
  7. Funky synthetic {@code _source} cases
  8. + *
  9. Using the search index
  10. + *
+ *

+ * Unless you have a good reason to do otherwise, it's generally fine to start with + * {@code doc_values}. And it might be fine to only implement this fusion + * for {@code doc_values}. Usually, loading {@code stored} fields + * and loading from {@code _source} is so slow that this optimization won't buy you + * much speed proportionally. But this is only a rule of thumb. + * The first extraction push down we implemented violates the rule! It was directly + * to the search index for vector fields. + *

+ *

+ * Note: The {@link Object#toString}s are important in these classes. We expose them + * over the {@code profile} API and use them for tests later on. + *

+ *

Unit test the block loaders

+ *

+ * Build a randomized unit test that + *

+ *
    + *
  1. loads random data
  2. + *
  3. loads using both your new {@link BlockLoader} and the non-fused loader
  4. + *
  5. compares the results
  6. + *
+ *

+ * See the test for {@link Utf8CodePointsFromOrdsBlockLoader} for an example. These tests + * are usually quite parameterized to make sure we cover things like: + *

+ *
    + *
  • + * loading column-at-a-time ({@link BlockLoader.ColumnAtATimeReader#read}) vs + * loading row-by-row ({@link BlockLoader.RowStrideReader#read}) + *
  • + *
  • high vs low cardinality values
  • + *
  • all documents have only single-valued fields vs some have multivalued fields
  • + *
  • some documents are missing values
  • + *
+ *

+ * These unit tests cover a ton of different configurations quickly, and we + * know that we're using the loader. + *

+ *

Plug the {@link BlockLoader} into the {@link MappedFieldType#blockLoader field mapper}

+ *

+ * You must implement: + *

+ *
    + *
  • + * {@link MappedFieldType#supportsBlockLoaderConfig} to control if + * the field is pushed down. Return {@code true} for the configurations that match + * your {@link BlockLoader}s. + *
  • + *
  • + * {@link MappedFieldType#blockLoader} to control how the field + * is pushed down. Return your {@link BlockLoader}s here. + *
  • + *
+ *

Implement this interface

+ *

+ * Implement {@link BlockLoaderExpression}. Generally it's enough to check that + * check if the function is being applied to a {@link FieldAttribute} and do something + * like: + *

+ *
{@code
+ *         if (field instanceof FieldAttribute f && f.dataType() == DataType.KEYWORD) {
+ *             return new PushedBlockLoaderExpression(f, BlockLoaderFunctionConfig.Function.WHATEVER);
+ *         }
+ *         return null;
+ * }
+ *

+ * The rules system will check {@link MappedFieldType#supportsBlockLoaderConfig} for you. + * See the docs for {@link #tryPushToFieldLoading} for more on how to implement it. + *

+ *

Add to {@code PushExpressionToLoadIT}

+ *

+ * Add a case or two to {@code PushExpressionToLoadIT} to prove that we've plugged + * everything in properly. These tests make sure that we're really loading the data + * really using your new {@linkplain BlockLoader}. This is where your nice + * {@link Object#toString}s come into play. That's the key into the profile map that + * shows that your new {@linkplain BlockLoader} is plugged in. + *

+ *

Maybe add to {@code csv-spec} tests

+ *

+ * Look for your function in the csv-spec tests and make sure there are cases that + * contain your function processing each data type you are pushing. For each type, + * make sure the function processes the results of + *

+ *
    + *
  • {@code ROW} - these won't use your new code
  • + *
  • {@code FROM} - these will use your new code
  • + *
  • {@code STATS} or another function - these won't use your new code
  • + *
+ *

+ * It's fairly likely we already have tests for all these cases. + * They are part of our standard practice for adding functions, but there are a lot + * of them, and we may have forgotten some. And, without the pushdown you are + * implementing, they are mostly there for healthy paranoia around rules and + * a hedge against mistakes implementing optimizations in the future. Like the + * optimization you are implementing now! + *

+ *

+ * Anyway, once there are plenty of these tests you should run them via the ESQL + * unit tests and via the single-node integration tests. These tests don't prove + * that your new {@linkplain BlockLoader}s are plugged in. You have + * {@code PushExpressionToLoadIT} for that. Instead, they prove that, when your + * new {@linkplain BlockLoader} is plugged in, it produces + * correct output. So, just like your unit test, but integrated with the entire + * rest of the world. + *

+ *

Get some performance numbers and open a PR

+ *

+ * Now that you can be pretty sure everything is plugged in and working you can + * get some performance numbers. It's generally good to start with a quick and + * dirty script. + * These should show you a performance improvement, and you can use the + * {@code profile} API as a final proof that everything is plugged in. Once that + * looks right you should generally be ok to open a PR. Attach the results of + * your bash script to prove that it's faster. + *

+ *

+ * Next, look for a rally track + * that should improve with your PR. If you find one, and it's + * in the nightlies already, then you have a choice: + *

+ *
    + *
  • Run the rally tests right now to get better numbers
  • + *
  • Wait for the nightlies to run after merging
  • + *
+ *

+ * If the quick and dirty perf testing looked good you are probably safe waiting on + * the nightlies. You should look for them in + * benchmarks.elastic.co. + *

+ *

+ * If there isn't already a rally track then you should add one like this + * PR. How you add + * one of these and how you get it into the nightlies and whether it should be in + * the nightlies is outside the scope of this document. + *

*/ public interface BlockLoaderExpression { /** @@ -66,5 +245,9 @@ public interface BlockLoaderExpression { * @param field the field whose load we're fusing into * @param config the expression's configuration */ - record PushedBlockLoaderExpression(FieldAttribute field, BlockLoaderFunctionConfig config) {} + record PushedBlockLoaderExpression(FieldAttribute field, BlockLoaderFunctionConfig config) { + public PushedBlockLoaderExpression(FieldAttribute field, BlockLoaderFunctionConfig.Function function) { + this(field, new BlockLoaderFunctionConfig.JustFunction(function)); + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvMax.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvMax.java index e25d1662f9cae..98b79ca438ad0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvMax.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvMax.java @@ -13,14 +13,18 @@ import org.elasticsearch.compute.ann.MvEvaluator; import org.elasticsearch.compute.operator.EvalOperator; import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator; +import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig; import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.expression.function.Example; import org.elasticsearch.xpack.esql.expression.function.FunctionInfo; import org.elasticsearch.xpack.esql.expression.function.Param; +import org.elasticsearch.xpack.esql.expression.function.blockloader.BlockLoaderExpression; import 
org.elasticsearch.xpack.esql.planner.PlannerUtils; +import org.elasticsearch.xpack.esql.stats.SearchStats; import java.io.IOException; import java.util.List; @@ -31,7 +35,7 @@ /** * Reduce a multivalued field to a single valued field containing the maximum value. */ -public class MvMax extends AbstractMultivalueFunction { +public class MvMax extends AbstractMultivalueFunction implements BlockLoaderExpression { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "MvMax", MvMax::new); @FunctionInfo( @@ -129,4 +133,12 @@ static long process(long current, long v) { static int ascendingIndex(int count) { return count - 1; } + + @Override + public PushedBlockLoaderExpression tryPushToFieldLoading(SearchStats stats) { + if (field instanceof FieldAttribute f) { + return new PushedBlockLoaderExpression(f, BlockLoaderFunctionConfig.Function.MV_MAX); + } + return null; + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvMin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvMin.java index 75590d5d8b43a..791ab7c4b2301 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvMin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvMin.java @@ -13,14 +13,18 @@ import org.elasticsearch.compute.ann.MvEvaluator; import org.elasticsearch.compute.operator.EvalOperator; import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator; +import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig; import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import 
org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.expression.function.Example; import org.elasticsearch.xpack.esql.expression.function.FunctionInfo; import org.elasticsearch.xpack.esql.expression.function.Param; +import org.elasticsearch.xpack.esql.expression.function.blockloader.BlockLoaderExpression; import org.elasticsearch.xpack.esql.planner.PlannerUtils; +import org.elasticsearch.xpack.esql.stats.SearchStats; import java.io.IOException; import java.util.List; @@ -31,7 +35,7 @@ /** * Reduce a multivalued field to a single valued field containing the minimum value. */ -public class MvMin extends AbstractMultivalueFunction { +public class MvMin extends AbstractMultivalueFunction implements BlockLoaderExpression { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "MvMin", MvMin::new); @FunctionInfo( @@ -129,4 +133,12 @@ static long process(long current, long v) { static int ascendingIndex(int count) { return 0; } + + @Override + public PushedBlockLoaderExpression tryPushToFieldLoading(SearchStats stats) { + if (field instanceof FieldAttribute f) { + return new PushedBlockLoaderExpression(f, BlockLoaderFunctionConfig.Function.MV_MIN); + } + return null; + } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java index e7ad9f0eb69da..749202b1a0e46 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java @@ -1644,7 +1644,7 @@ public void testLengthPushdownZoo() { return false; }).toList(), hasSize( - 4 // Should be 1 - fix in https://github.com/elastic/elasticsearch/issues/137679 + 4 // Should be 2 - fix in 
https://github.com/elastic/elasticsearch/issues/137679 ) ); } From d485863b96a39c4fa72e3ac0deafac1e1a6be5a5 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Tue, 11 Nov 2025 16:53:28 -0500 Subject: [PATCH 03/13] explain --- .../function/blockloader/BlockLoaderExpression.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java index 32b2afdf281d6..7b89290f4c2ad 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java @@ -27,8 +27,8 @@ *
    *
  • * {@code V_COSINE(vector, [constant_vector])} - vector is ~512 floats - * and V_COSINE is one double. We can find the similarity without any - * copies if we combine. + * and V_COSINE is one double. We can do this without copying the floats + * at all if we push. *
  • *
  • * {@code ST_CENTROID(shape)} - shapes can be quite large. Centroids @@ -44,7 +44,7 @@ * {@code MV_COUNT(anything)} - counts are always integers. *
  • *
  • - * {@code MV_MIN} and {@code MV_MAX} - loads much fewer data for + * {@code MV_MIN} and {@code MV_MAX} - loads a single value instead of * multivalued fields. *
  • *
@@ -217,7 +217,7 @@ * benchmarks.elastic.co. *

*

- * If there isn't already a rally track then you should add one like this + * If there isn't already a rally operation then you should add one like this * PR. How you add * one of these and how you get it into the nightlies and whether it should be in * the nightlies is outside the scope of this document. From 2ff3ca30f00db17e32df700e8699c2e89485bdc4 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 12 Nov 2025 09:00:02 -0500 Subject: [PATCH 04/13] More --- .../index/mapper/IpFieldMapper.java | 4 +- .../index/mapper/NumberFieldMapper.java | 4 +- .../single_node/PushExpressionToLoadIT.java | 81 ++++++++++++++++++- .../blockloader/BlockLoaderExpression.java | 2 +- 4 files changed, 82 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/mapper/IpFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/IpFieldMapper.java index cf132746af902..b5b7b9f7fa24e 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/IpFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/IpFieldMapper.java @@ -35,8 +35,8 @@ import org.elasticsearch.index.fielddata.plain.SortedSetOrdinalsIndexFieldData; import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig; import org.elasticsearch.index.mapper.blockloader.docvalues.BytesRefsFromOrdsBlockLoader; -import org.elasticsearch.index.mapper.blockloader.docvalues.MvMaxBytesRefsFromOrdsBlockLoader; -import org.elasticsearch.index.mapper.blockloader.docvalues.MvMinBytesRefsFromOrdsBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMaxBytesRefsFromOrdsBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMinBytesRefsFromOrdsBlockLoader; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.script.IpFieldScript; import org.elasticsearch.script.Script; diff --git a/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java 
b/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java index 4ee10f0a21996..6c77b4352af3f 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java @@ -49,11 +49,11 @@ import org.elasticsearch.index.mapper.blockloader.docvalues.DoublesBlockLoader; import org.elasticsearch.index.mapper.blockloader.docvalues.IntsBlockLoader; import org.elasticsearch.index.mapper.blockloader.docvalues.LongsBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMaxDoublesFromDocValuesBlockLoader; import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMaxIntsFromDocValuesBlockLoader; import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMaxLongsFromDocValuesBlockLoader; -import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMinIntsFromDocValuesBlockLoader; -import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMaxDoublesFromDocValuesBlockLoader; import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMinDoublesFromDocValuesBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMinIntsFromDocValuesBlockLoader; import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMinLongsFromDocValuesBlockLoader; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.script.DoubleFieldScript; diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushExpressionToLoadIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushExpressionToLoadIT.java index 897e61a9b6689..b0576014ed145 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushExpressionToLoadIT.java +++ 
b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushExpressionToLoadIT.java @@ -45,6 +45,7 @@ import static org.elasticsearch.xpack.esql.qa.single_node.RestEsqlIT.commonProfile; import static org.elasticsearch.xpack.esql.qa.single_node.RestEsqlIT.fixTypesOnProfile; import static org.hamcrest.Matchers.any; +import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.startsWith; @@ -186,6 +187,42 @@ public void testMvMinToIp() throws IOException { ); } + public void testMvMinToHalfFloat() throws IOException { + double min = randomDouble(); + double max = 1 + randomDouble(); + test( + justType("half_float"), + b -> b.startArray("test").value(min).value(max).endArray(), + "| EVAL test = MV_MIN(test)", + matchesList().item(closeTo(min, .1)), + matchesMap().entry("test:column_at_a_time:MvMinDoublesFromDocValues.Sorted", 1) + ); + } + + public void testMvMinToFloat() throws IOException { + double min = randomDouble(); + double max = 1 + randomDouble(); + test( + justType("float"), + b -> b.startArray("test").value(min).value(max).endArray(), + "| EVAL test = MV_MIN(test)", + matchesList().item(closeTo(min, .1)), + matchesMap().entry("test:column_at_a_time:MvMinDoublesFromDocValues.Sorted", 1) + ); + } + + public void testMvMinToDouble() throws IOException { + double min = randomDouble(); + double max = 1 + randomDouble(); + test( + justType("double"), + b -> b.startArray("test").value(min).value(max).endArray(), + "| EVAL test = MV_MIN(test)", + matchesList().item(min), + matchesMap().entry("test:column_at_a_time:MvMinDoublesFromDocValues.Sorted", 1) + ); + } + public void testMvMinToByte() throws IOException { int min = between(Byte.MIN_VALUE, Byte.MAX_VALUE - 10); int max = between(min + 1, Byte.MAX_VALUE); @@ -226,11 +263,11 @@ public void testMvMinToLong() throws IOException { long min = randomLongBetween(Long.MIN_VALUE, Long.MAX_VALUE - 
10); long max = randomLongBetween(min + 1, Long.MAX_VALUE); test( - justType("integer"), + justType("long"), b -> b.startArray("test").value(min).value(max).endArray(), "| EVAL test = MV_MIN(test)", matchesList().item(min), - matchesMap().entry("test:column_at_a_time:MvMinIntsFromDocValues.Sorted", 1) + matchesMap().entry("test:column_at_a_time:MvMinLongsFromDocValues.Sorted", 1) ); } @@ -298,11 +335,47 @@ public void testMvMaxToLong() throws IOException { long min = randomLongBetween(Long.MIN_VALUE, Long.MAX_VALUE - 10); long max = randomLongBetween(min + 1, Long.MAX_VALUE); test( - justType("integer"), + justType("long"), b -> b.startArray("test").value(min).value(max).endArray(), "| EVAL test = MV_MAX(test)", matchesList().item(max), - matchesMap().entry("test:column_at_a_time:MvMaxIntsFromDocValues.Sorted", 1) + matchesMap().entry("test:column_at_a_time:MvMaxLongsFromDocValues.Sorted", 1) + ); + } + + public void testMvMaxToHalfFloat() throws IOException { + double min = randomDouble(); + double max = 1 + randomDouble(); + test( + justType("half_float"), + b -> b.startArray("test").value(min).value(max).endArray(), + "| EVAL test = MV_MAX(test)", + matchesList().item(closeTo(max, .1)), + matchesMap().entry("test:column_at_a_time:MvMaxDoublesFromDocValues.Sorted", 1) + ); + } + + public void testMvMaxToFloat() throws IOException { + double min = randomDouble(); + double max = 1 + randomDouble(); + test( + justType("float"), + b -> b.startArray("test").value(min).value(max).endArray(), + "| EVAL test = MV_MAX(test)", + matchesList().item(closeTo(max, .1)), + matchesMap().entry("test:column_at_a_time:MvMaxDoublesFromDocValues.Sorted", 1) + ); + } + + public void testMvMaxToDouble() throws IOException { + double min = randomDouble(); + double max = 1 + randomDouble(); + test( + justType("double"), + b -> b.startArray("test").value(min).value(max).endArray(), + "| EVAL test = MV_MAX(test)", + matchesList().item(max), + 
matchesMap().entry("test:column_at_a_time:MvMaxDoublesFromDocValues.Sorted", 1) ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java index 79b0b61f6c30a..11dbd72e69bff 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java @@ -13,8 +13,8 @@ import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig; import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMaxBytesRefsFromOrdsBlockLoader; -import org.elasticsearch.index.mapper.blockloader.docvalues.fn.Utf8CodePointsFromOrdsBlockLoader; import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMaxLongsFromDocValuesBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.fn.Utf8CodePointsFromOrdsBlockLoader; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.stats.SearchStats; From 3c464436e5bfdd8e1c9222a94034a7f8de0be3ea Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Thu, 13 Nov 2025 10:09:55 -0500 Subject: [PATCH 05/13] Instructions for BlockLoader --- .../index/mapper/BlockLoader.java | 154 +++++++++++++++++- 1 file changed, 150 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java b/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java index 8a413a73d57e5..6103cc44896c7 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java +++ 
b/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java @@ -17,6 +17,8 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import org.elasticsearch.index.mapper.blockloader.docvalues.BlockDocValuesReader; +import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMaxLongsFromDocValuesBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMinLongsFromDocValuesBlockLoader; import org.elasticsearch.search.fetch.StoredFieldsSpec; import org.elasticsearch.search.lookup.Source; @@ -25,8 +27,131 @@ import java.util.Map; /** - * Interface for loading data in a block shape. Instances of this class - * must be immutable and thread safe. + * Loads values from a chunk of lucene documents into a "Block" for the compute engine. + *

+ * Think of a Block as an array of values for a sequence of lucene documents. That's + * almost true. The compute engine operates on arrays because the + * good folks that build CPUs have spent our entire lives making them really really + * good at running tight loops over arrays of data. So we play along with the CPU + * and make arrays. + *

+ *

How to implement

+ *

+ * There are a lot of interesting choices hiding in here to make getting those arrays + * out of lucene work well: + *

+ *
    + *
  • + * {@code doc_values} are already on disk in array-like structures so we prefer + * to just copy them into an array in one loop inside {@link ColumnAtATimeReader}. + * Well, not entirely array-like. {@code doc_values} are designed to be read in + * non-descending order (think {@code 0, 1, 1, 4, 9}) and will fail if they are + * read truly randomly. This lets the doc values implementations have some + * chunking/compression/magic on top of the array-like on disk structure. The + * caller manages this, always putting {@link Docs} in non-descending order. + * Extend {@link BlockDocValuesReader} to implement all this. + *
  • + *
  • + * All stored {@code stored} fields for each document are stored on disk together, + * compressed with a general purpose compression algorithm like + * Zstd. Blocks of documents are + * compressed together to get a better compression ratio. Just like doc values, + * we read them in ascending order. Unlike doc values, we read all fields for a + * document at once. Because reading one requires decompressing them all. We do + * this by returning {@code null} from {@link BlockLoader#columnAtATimeReader} + * to signal that we can't load the whole column at once. We {@code stored} fields + * only implement a {@link RowStrideReader} which the caller will call once for + * each doc. Extend {@link BlockStoredFieldsReader} to implement all this. + *
  • + *
  • + * Fields loaded from {@code _source} are an extra special case of {@code stored} + * fields. {@code _source} itself is just another stored field, compressed in chunks + * with all other stored fields. But it's the original bytes sent when indexing the + * document. Think {@code json} or {@code yaml}. When we need fields from + * {@code _source} we get it from the stored fields reader infrastructure and then + * explode it into a {@code Map} representing the original json and + * the {@link RowStrideReader} implementation grabs the parts of the json it needs. + * Extend {@link BlockSourceReader}. + *
  • + *
  • + * Synthetic {@code _source} complicates this further by storing fields in somewhat + * unexpected places, but is otherwise like a {@code stored} field reader. Use + * {@link FallbackSyntheticSourceBlockLoader} to implement all this. + *
  • + *
+ *

How many to implement

+ *

+ * Generally reads are faster from {@code doc_values}, slower from {@code stored} fields, + * and even slower from {@code _source}. If we get to choose, we pick {@code doc_values}. + * But we work with what's on disk and that's a product of the field type and what the user's + * configured. Picking the optimal choice given what's on disk is the responsibility of each + * fields' {@link MappedFieldType#blockLoader} method. The more configurable the field's + * storage strategies the more {@link BlockLoader}s you have to implement to integrate it + * with ESQL. It can get to be a lot. Sorry. + *

+ *

+ * For a field to be supported by ESQL fully it has to be loadable if it was configured to be + * stored in any way. It's possible to turn off storage entirely by turning off + * {@code doc_values} and {@code _source} and {@code stored} fields. In that case, it's acceptable + * to return {@link ConstantNullsReader}. They turned the field off, there's nothing you can do. + *

+ *

+ * We also sometimes want to "push" executing some ESQL functions into the block loader itself. + * Usually we do this when it's a ton faster. See the docs for {@code BlockLoaderExpression} + * for why and how we do this. + *

+ *

+ * For example, {@code long} fields implement these block loaders: + *

+ *
    + *
  • + * {@link org.elasticsearch.index.mapper.blockloader.docvalues.LongsBlockLoader} to read + * from {@code doc_values}. + *
  • + *
  • + * {@link org.elasticsearch.index.mapper.BlockSourceReader.LongsBlockLoader} to read from + * {@code _source}. + *
  • + *
  • + * A specially configured {@link FallbackSyntheticSourceBlockLoader} to read synthetic + * {@code _source}. + *
  • + *
  • + * {@link MvMinLongsFromDocValuesBlockLoader} to read {@code MV_MIN(long_field)} from + * {@code doc_values}. + *
  • + *
  • + * {@link MvMaxLongsFromDocValuesBlockLoader} to read {@code MV_MAX(long_field)} from + * {@code doc_values}. + *
  • + *
+ *

+ * NOTE: We can't read {@code long}s from {@code stored} fields, which is a + * bug, but maybe not + * a terrible one because it's very uncommon to configure {@code long} to be {@code stored} + * but to disable {@code _source} and {@code doc_values}. Nothing's perfect. Especially + * code. + *

+ *

Why is {@link AllReader}?

+ *

+ * When we described how to read from {@code doc_values} we said we prefer + * to use {@link ColumnAtATimeReader}. But some callers don't support reading column-at-a-time + * and need to read row-by-row. So we also need an implementation of {@link RowStrideReader} + * that reads from {@code doc_values}. Usually it's most convenient to implement both of those + * in the same {@code class}. {@link AllReader} is an interface for those sorts of classes and + * you'll see it in the {@code doc_values} code frequently. + *

+ *

Thread safety

+ *

+ * Instances of this class must be immutable and thread safe. Instances of + * {@link ColumnAtATimeReader} and {@link RowStrideReader} are all mutable and can only + * be accessed by one thread at a time but may be passed between threads. + * See implementations of {@link Reader#canReuse} for how that's handled. "Normal" java objects + * don't need to do anything special to be kicked from thread to thread - the transfer itself + * establishes a {@code happens-before} relationship that makes everything you need visible. + * But Lucene's readers aren't "normal" java objects and sometimes need to be rebuilt if we + * shift threads. + *

*/ public interface BlockLoader { /** @@ -115,10 +240,26 @@ interface StoredFields { Map> storedFields() throws IOException; } + /** + * Build a column-at-a-time reader. May return {@code null} + * if the underlying storage needs to be loaded row-by-row. Callers should try + * this first, only falling back to {@link #rowStrideReader} if this returns + * {@code null} or if they can't load column-at-a-time themselves. + */ + @Nullable ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws IOException; + /** + * Build a row-by-row reader. Must never return {@code null}, + * even if the underlying storage prefers to be loaded column-at-a-time. Some + * callers simply can't load column-at-a-time so all implementations must support + * this method. + */ RowStrideReader rowStrideReader(LeafReaderContext context) throws IOException; + /** + * What {@code stored} fields are needed by this reader. + */ StoredFieldsSpec rowStrideStoredFieldSpec(); /** @@ -525,8 +666,13 @@ Block buildExponentialHistogramBlockDirect( } /** - * Marker interface for block results. The compute engine has a fleshed - * out implementation. + * A columnar representation of homogeneous data. It has a position (row) count, and + * various data retrieval methods for accessing the underlying data that is stored at a given + * position. In other words, a fancy wrapper over an array. + *

+ * This is just a marker interface for these results. The compute engine + * has fleshed out implementations. + *

*/ interface Block extends Releasable {} From f7517f44ada169bad94331c63976d7942ef7de66 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Thu, 13 Nov 2025 10:23:56 -0500 Subject: [PATCH 06/13] Docs --- .../index/mapper/BlockLoader.java | 40 +++++++++++-------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java b/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java index 6103cc44896c7..ef0bb45eabcef 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java @@ -30,10 +30,10 @@ * Loads values from a chunk of lucene documents into a "Block" for the compute engine. *

* Think of a Block as an array of values for a sequence of lucene documents. That's - * almost true. The compute engine operates on arrays because the - * good folks that build CPUs have spent our entire lives making them really really - * good at running tight loops over arrays of data. So we play along with the CPU - * and make arrays. + * almost true! For the purposes of implementing {@link BlockLoader}, it's close enough. + * The compute engine operates on arrays because the good folks that build CPUs have + * spent the past 40 years making them really really good at running tight loops over + * arrays of data. So we play along with the CPU and make arrays. *

*

How to implement

*

@@ -56,22 +56,22 @@ * compressed with a general purpose compression algorithm like * Zstd. Blocks of documents are * compressed together to get a better compression ratio. Just like doc values, - * we read them in ascending order. Unlike doc values, we read all fields for a + * we read them in non-descending order. Unlike doc values, we read all fields for a * document at once. Because reading one requires decompressing them all. We do * this by returning {@code null} from {@link BlockLoader#columnAtATimeReader} - * to signal that we can't load the whole column at once. We {@code stored} fields - * only implement a {@link RowStrideReader} which the caller will call once for - * each doc. Extend {@link BlockStoredFieldsReader} to implement all this. + * to signal that we can't load the whole column at once. Instead, we implement a + * {@link RowStrideReader} which the caller will call once for each doc. Extend + * {@link BlockStoredFieldsReader} to implement all this. * *

  • * Fields loaded from {@code _source} are an extra special case of {@code stored} * fields. {@code _source} itself is just another stored field, compressed in chunks - * with all other stored fields. But it's the original bytes sent when indexing the + * with all the other stored fields. It's the original bytes sent when indexing the * document. Think {@code json} or {@code yaml}. When we need fields from * {@code _source} we get it from the stored fields reader infrastructure and then - * explode it into a {@code Map} representing the original json and - * the {@link RowStrideReader} implementation grabs the parts of the json it needs. - * Extend {@link BlockSourceReader}. + * explode it into a {@link Map} representing the original {@code json} and + * the {@link RowStrideReader} implementation grabs the parts of the {@code json} + * it needs. Extend {@link BlockSourceReader} to implement all this. *
  • *
  • * Synthetic {@code _source} complicates this further by storing fields in somewhat @@ -85,15 +85,16 @@ * and even slower from {@code _source}. If we get to chose, we pick {@code doc_values}. * But we work with what's on disk and that's a product of the field type and what the user's * configured. Picking the optimal choice given what's on disk is the responsibility of each - * fields' {@link MappedFieldType#blockLoader} method. The more configurable the field's + * field's {@link MappedFieldType#blockLoader} method. The more configurable the field's * storage strategies the more {@link BlockLoader}s you have to implement to integrate it * with ESQL. It can get to be a lot. Sorry. *

    *

    * For a field to be supported by ESQL fully it has to be loadable if it was configured to be * stored in any way. It's possible to turn off storage entirely by turning off - * {@code doc_values} and {@code _source} and {@code stored} fields. In that case, it's acceptable - * to return {@link ConstantNullsReader}. They turned the field off, there's nothing you can do. + * {@code doc_values} and {@code _source} and {@code stored} fields. In that case, it's + * acceptable to return {@link ConstantNullsReader}. User turned the field off, best we can do + * is {@code null}. *

    *

    * We also sometimes want to "push" executing some ESQL functions into the block loader itself. @@ -138,9 +139,16 @@ * to use {@link ColumnAtATimeReader}. But some callers don't support reading column-at-a-time * and need to read row-by-row. So we also need an implementation of {@link RowStrideReader} * that reads from {@code doc_values}. Usually it's most convenient to implement both of those - * in the same {@code class}. {@link AllReader} is an interface for those sorts of classes and + * in the same {@code class}. {@link AllReader} is an interface for those sorts of classes, and * you'll see it in the {@code doc_values} code frequently. *

    + *

    Why {@link #rowStrideStoredFieldSpec}?

    + *

    + * When decompressing {@code stored} fields lucene can skip stored fields that aren't used. They + still have to be decompressed, but they aren't turned into java objects which saves a fair bit + of work. If you don't need any stored fields return {@link StoredFieldsSpec#NO_REQUIREMENTS}. + * Otherwise, return what you need.

    *

    Thread safety

    *

    * Instances of this class must be immutable and thread safe. Instances of From cd46d6850403e3fb3dc00df33d080ddd3613d423 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Thu, 13 Nov 2025 10:28:52 -0500 Subject: [PATCH 07/13] Update docs/changelog/138029.yaml --- docs/changelog/138029.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/138029.yaml diff --git a/docs/changelog/138029.yaml b/docs/changelog/138029.yaml new file mode 100644 index 0000000000000..977bc60d06daf --- /dev/null +++ b/docs/changelog/138029.yaml @@ -0,0 +1,5 @@ +pr: 138029 +summary: Fuse MV_MIN and MV_MAX and document process +area: ES|QL +type: feature +issues: [] From 4962474142941ccf8e9e59fd736d3391bd0214fd Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Tue, 18 Nov 2025 17:17:00 -0500 Subject: [PATCH 08/13] words --- .../function/scalar/EsqlScalarFunction.java | 290 +++++++++++++++--- 1 file changed, 239 insertions(+), 51 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/EsqlScalarFunction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/EsqlScalarFunction.java index 9c259d2020b9b..fd76147a899cd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/EsqlScalarFunction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/EsqlScalarFunction.java @@ -7,8 +7,13 @@ package org.elasticsearch.xpack.esql.expression.function.scalar; +import org.elasticsearch.common.time.DateFormatter; +import org.elasticsearch.compute.ann.ConvertEvaluator; +import org.elasticsearch.compute.ann.Evaluator; +import org.elasticsearch.compute.ann.MvEvaluator; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.data.Vector; import org.elasticsearch.compute.lucene.LuceneCountOperator; import 
org.elasticsearch.compute.operator.topn.TopNOperator; import org.elasticsearch.index.mapper.BlockLoader; @@ -18,7 +23,7 @@ import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.evaluator.mapper.EvaluatorMapper; import org.elasticsearch.xpack.esql.expression.function.blockloader.BlockLoaderExpression; -import org.elasticsearch.xpack.esql.expression.function.vector.VectorSimilarityFunction; +import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushTopNToSource; import java.util.List; @@ -62,63 +67,246 @@ *

    Optimizations

    *

    * Scalars are a huge part of the language, and we have a ton of - * different classes of optimizations for them that exist on a performance spectrum + * different classes of optimizations for them that exist on a performance spectrum: *

    *
    {@code
      *  Better         Load Less and
    - * than O(rows)     Run Faster               Run Faster                 Block-at-a-time     Tuple-at-a-time
    + * than O(rows)     Run Faster               Run Faster                 Page-at-a-time     Tuple-at-a-time
      *     |----------------|-------------------------|------------------------------|-------------------|
    - *     ^  ^  ^     ^    ^      ^                              ^    ^   ^   ^     ^      ^            ^
    - *    CF LT ET    FP   BL     MBL                            SE  SIMD NO  CE   EVAL    EE          CASE
    + *     ^  ^  ^     ^    ^      ^                  ^           ^    ^   ^     ^   ^      ^            ^
    + *    CF LT ET    FP   BL     MBL                SE          NO  SIMD RR    VD EVAL    EVE         CASE
      * }
    + *

    {@code CF}: Constant Folding

    + *
    {@code
    + *   | EVAL a = CONCAT("some ", "words")
    + * }
    + *

    + * The fastest way to run a scalar, now and forever, is to run it at compile time. Turn it + * into a constant and propagate it throughout the query. This is called "constant folding" + * and all scalars, when their arguments are constants, are "folded" to a constant. + *

    + *

    {@code LT}: Lucene's TopN

    + *
    {@code
    + *     FROM index METADATA _score
    + *   | WHERE title:"cat"
    + *   | SORT _score DESC
    + *   | LIMIT 10
    + * }
    + *
    {@code
    + *     FROM index
    + *   | EVAL distance = ST_DISTANCE(point, "POINT(12.5683 55.6761)")
    + *   | SORT distance ASC
    + *   | LIMIT 10
    + * }
    + *

    + * Fundamentally, Lucene is a tuple-at-a-time engine that flows the + * min-competitive + * sort key back into the index iteration process, allowing it to skip huge swaths of + * documents. It has quite a few optimizations that soften the blow of it being + * tuple-at-a-time, so these days "push to a lucene topn" is the fastest way you are going + * to run a scalar function. For that to work it has to be a {@code SORT} key and all the + * filters have to be pushable to lucene and lucene has to know how to run the function + * natively. See {@link PushTopNToSource}. + *

    + *

    {@code ET}: Engine TopN (HYPOTHETICAL)

    + *
    {@code
    + *     FROM index METADATA _score
    + *   | WHERE title:"cat"
    + *   | WHERE a < j + LENGTH(candy) // <--- anything un-pushable
    + *   | SORT _score DESC
    + *   | LIMIT 10
    + * }
    + *

    + * If ESQL's {@link TopNOperator} exposed the min-competitive information (see above), and + we fed it back into the lucene query operators then we too could do better than + {@code O(matching_rows)} for queries sorting on the results of a scalar. This is like + the {@code LT} but without as many limitations. Lucene has a 20-year head start on us + optimizing TopN, so we should continue to use them when we can. + See issue.

    + *

    {@code BL}: Push to {@link BlockLoader}

    + *
    {@code
    + *     FROM index
    + *   | EVAL s = V_COSINE(dense_vector, [0, 1, 2])
    + *   | SORT s desc
    + *   | LIMIT 10
    + * }
    + *
    {@code
    + *     FROM index
    + *   | STATS SUM(LENGTH(message)) // Length is pushed to the BlockLoader
    + * }
    + *

    + * Some functions can take advantage of the on-disk structures to run very fast and should be + * "fused" into field loading using {@link BlockLoaderExpression}. Functions like {@code V_COSINE} + * can use the vector search index to compute the result. Functions like {@code MV_MIN} can + * use the {@code doc_values} encoding mechanism to save a ton of work. Functions like the + * upcoming {@code ST_SIMPLIFY} benefit from this by saving huge numbers of allocations even + * if they can't link into the {@code doc_values} format. We do this by building a + * {@link BlockLoader} for each {@code FUNCTION x FIELD_TYPE x storage mechanism} combination + * so we can get as much speed as possible. + *

    + *

    {@code MBL}: Push to a "mother ship" {@link BlockLoader} (HYPOTHETICAL)

    + *
    {@code
    + *     FROM index
    + *   | STATS SUM(LENGTH(message)), // All of these are pushed to a single BlockLoader
    + *           SUM(SUBSTRING(message, 0, 4)),
    + *        BY trail = SUBSTRING(message, 10, 3)
    + * }
    + *

    + * Pushing functions to a {@link BlockLoader} can involve building a ton + of distinct {@link BlockLoader}s. Which involves a ton of code and testing and, well, work. + But it's worth it if you are applying a single function to a field and every single cycle + counts. Both of these cry out for a more OO-style solution where you build a "mother ship" + {@linkplain BlockLoader} that operates on, say {@code FIELD_TYPE x storage mechanism} and + then runs a list of {@code FUNCTION} operations. In some cases this is a bad idea, which + is why we haven't built it yet. But in plenty of cases it's fine. And, sometimes, we should + be fine skipping the special purpose block loader in favor of the mother ship. We'd spend + a few more cycles on each load, but the maintenance advantage is likely worth it for some + functions.

    + *

    {@code EVAL}: Page-at-a-time evaluation

    + *

    + * ESQL evaluates whole pages at once, generally walking a couple of arrays in parallel building + * a result array. This makes which bits are the "hot path" very obvious - they are the loops + * that walk these arrays. We put the "slower" stuff outside those loops: + *

    + *
      + *
    • scratch allocations
    • + *
    • profiling
    • + *
    + *

    {@code VD}: Vector Dispatch

    + *

    + * In Elasticsearch it's normal for fields to sometimes be {@code null} or multivalued. + * There are no constraints on the schema preventing this and, as a search engine, it's + * pretty normal to model things as multivalued fields. We rarely know that a field can + * only be single-valued when we're planning a query. + *

    + *

    + * It's much faster to run a scalar when we know that all of its inputs + * are single valued and non-null. So every scalar function that uses the code generation + * keyed by the {@link Evaluator}, {@link ConvertEvaluator}, and {@link MvEvaluator} + * annotations builds two paths: + *

    *
      - *
    • - * {@code CF}: The fastest way to run a scalar, now and forever, is to run it at compile time, - * turn it into a constant, and propagate this throughout the query. This is called - * "constant folding" and all scalars, when their arguments are constants, are "folded" - * to a constant. Those - *
    • - *
    • - * {@code LT}: Run the TopN operation in Lucene. Fundamentally, lucene is a tuple-at-a-time - * engine that flows the min-competitive - * sort key back into the index iteration process, allowing it to skip huge swaths - * of documents. It has quite a few optimizations that soften the blow of it being - * tuple-at-a-time, so these days "push to a lucene topn" is the fastest way you - * are going to run a scalar function. For that to work it has to be a {@code SORT} - * key and all the filters have to be pushable to lucene and lucene has to know - * how to run the function natively. - *
    • - *
    • - * {@code ET}: (HYPOTHETICAL) If ESQL's {@link TopNOperator} knew - * how to expose min-competitive information, and we fed it back into the lucene - * query operators then we too could do better than {@code O(matching_rows)} for - * queries sorting on the results of a scalar. This is like the {@code LT} but - * without as many limitations. - * See issue. - *
    • - *
    • - * {@code FP}: Some functions can be translated into a sequence of filters and pushed - * to lucene. If you are just counting documents then this can use - * the {@link LuceneCountOperator} which can count the number of matching documents - * directly from the cache, technically being faster than - * {@code O(num_hits)}, but only in ideal circumstances. If we can't push the count - * then it's still very very fast. See PR. - *
    • - *
    • - * {@code BL}: Some functions can take advantage of the on-disk structures to run - * very fast and should be "fused" into field loading using {@link BlockLoaderExpression}. - * This is also helpful for functions that throw out a bunch of data, especially if they - * can use the on disk storage to do so efficiently ({@code MV_MIN}) or if the data being - * loaded is super, ultra big ({@code ST_SIMPLIFY}). All of the {@link VectorSimilarityFunction}s - * are wonderful candidates for this because vectors are large, and they - * can use the search index to avoid loading the vectors into memory at all. This requires - * a {@link BlockLoader} for each {@code FUNCTION x FIELD_TYPE x storage mechanism} - *
    • - *
    • - * {@code MBL}: (HYPOTHETICAL) If we want to fuse many functions to - * loading the same field we're almost certainly - *
    • + *
    • The slower "{@link Block}" path that supports {@code null}s and multivalued fields
    • + *
    • The faster "{@link Vector}" path that supports only single-valued, non-{@code null} fields
    • *
    + *

    {@code NO}: Native Ordinal Evaluation

    + *
    {@code
    + *     FROM index
    + *   | STATS MAX(foo) BY TO_UPPER(verb)
    + * }
    + *

    + * {@code keyword} and {@code ip} fields load their {@code byte[]} shaped values as a + * lookup table, called "ordinals" because Lucene uses that word for it. Some of our functions, + * like {@code TO_UPPER}, process the lookup table itself instead of processing each position. + * This is especially important when grouping on the field because the hashing done by the + * aggregation code also operates on the lookup table. + *

    + *

    {@code SE}: Sorted Execution

    + *
    {@code
    + *     FROM index
    + *   | STATS SUM(MV_DEDUPE(file_size))
    + * }
    + *

    + * Some functions can operate on multivalued fields much faster if their inputs are sorted. And + * inputs loaded from {@code doc_values} are sorted by default. Sometimes even sorted AND + * deduplicated. We store this information on each block in {@link Block.MvOrdering}. + *

    + *

    + * NOTE: Functions that can take advantage of this sorting also tend to be NOOPs for + * single-valued inputs. So they benefit hugely from "Vector Dispatch". + *

    + *

    {@code SIMD}: Single Instruction Multiple Data instructions

    + *
    {@code
    + *     FROM index
    + *   | STATS MAX(lhs + rhs)
    + * }
    + *

    + * Through a combination of "Page-at-a-time evaluation", and "Vector Dispatch" we often + end up with at least one path that can be turned into a sequence of + SIMD + instructions. These are about as fast as you can go and still be {@code O(matching_rows)}. + A lot of scalars don't lend themselves perfectly to SIMD, but we make sure those + that do can take that route.

    + *

    {@code RR}: Range Rewrite

    + *
    {@code
    + *     FROM index
    + *   | STATS COUNT(*) BY DATE_TRUNC(1 DAY, @timestamp)
    + * }
    + *

    + * Functions like {@code DATE_TRUNC} can be quite slow, especially when they are using a + * time zone. It can be much faster if it knows the range of dates that it's operating on. + * And we do know that on the data node! We use that information to rewrite the possibly-slow + * {@code DATE_TRUNC} to the always fast {@code ROUND_TO}, which rounds down to fixed rounding + * points. + *

    + *

    + * At the moment this is only done for {@code DATE_TRUNC} which is a very common function, + * but is technically possible for anything that could benefit from knowing the range up front. + *

    + *

    {@code FP}: Filter Pushdown

    + *
    {@code
    + *     FROM index
    + *   | STATS COUNT(*) BY DATE_TRUNC(1 DAY, @timestamp)
    + * }
    + *

    + * If the "Range Rewrite" optimization works, we can sometimes further push the resulting + * {@code ROUND_TO} into a sequence of filters. If you are just counting + * documents then this can use the {@link LuceneCountOperator} which can count the number of + * matching documents directly from the cache, technically being faster than + * {@code O(num_hits)}, but only in ideal circumstances. If we can't push the count then it's + * still very very fast. See PR. + *

    + *

    {@code EVE}: Expensive Variable Evaluator

    + *
    {@code
    + *     FROM index
    + *   | EVAL ts = DATE_PARSE(SUBSTRING(message, 1, 10), date_format_from_the_index)
    + * }
    + *

    + * Functions like {@code DATE_PARSE} need to build something "expensive" per input row, like + * a {@link DateFormatter}. But, often, the expensive thing is constant. In the example above + * the date format comes from the index, but that's quite contrived. These functions generally + * run in the form: + *

    + *
    {@code
    + *     FROM index
    + *   | EVAL ts = DATE_PARSE(SUBSTRING(message, 1, 10), "ISO8601")
    + * }
    + *

    + * These generally have special case evaluators that don't construct the format for each row. + * The others are "expensive variable evaluators" and we avoid them when we can. + *

    + *

    {@code CASE}: {@code CASE} is evaluated row-by-row

    + *
    {@code
    + *     FROM index
    + *   | EVAL f = CASE(d > 0, n / d, 0)
    + * }
    + *
    {@code
    + *     FROM index
    + *   | EVAL f = COALESCE(d, 1 / j)
    + * }
    + *

    + * {@code CASE} and {@code COALESCE} short circuit. In the top example above, that + * means we don't run {@code n / d} unless {@code d > 0}. That prevents us from + * emitting warnings for dividing by 0. In the second example, we don't run {@code 1 / j} + * unless {@code d} is null. In the worst case, we manage this by running row-by-row + * which is super slow. Especially because the engine was designed + * for page-at-a-time execution. + *

    + *

    + * In the best case {@code COALESCE} can see that an input is either all-null or + * all-non-null. Then it never falls back to row-by-row evaluation and is quite fast. + *

    + *

    + * {@code CASE} has a similar optimization: For each incoming {@link Page}, if the + * condition evaluates to a constant, then it executes the corresponding "arm" + * Page-at-a-time. Also! If the "arms" are "fast" and can't throw warnings, then + * {@code CASE} can execute "eagerly" - evaluating all three arguments and just + * plucking values back and forth. The "eager" {@code CASE} evaluator is effectively + * the same as any other page-at-a-time evaluator. + *

    */ public abstract class EsqlScalarFunction extends ScalarFunction implements EvaluatorMapper { protected EsqlScalarFunction(Source source) { From 6b9962bc7635e48cd9ee614049e0794de25191c9 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 19 Nov 2025 14:21:15 -0500 Subject: [PATCH 09/13] Fix merge --- .../blockloader/BlockLoaderExpression.java | 5 ++++- .../xpack/esql/expression/package-info.java | 7 +++++++ .../LocalLogicalPlanOptimizerTests.java | 16 ++++++---------- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java index f734c16fbc9b5..107a15bcb8e35 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java @@ -49,7 +49,10 @@ * multivalued fields. *
  • * - * NOCOMMIT see {@link EsqlScalarFunction} for a discussion around when we implement these optimizations + *

    + * See the docs for {@link EsqlScalarFunction} for how this optimization fits in with + * all the other optimizations we've implemented. + *

    *

    How to implement

    *
      *
    1. Implement some block loaders
    2. diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/package-info.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/package-info.java index 388716ab875e2..2ce6017991bbf 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/package-info.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/package-info.java @@ -1,3 +1,10 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + /** * {@link org.elasticsearch.xpack.esql.core.expression.Expression Expressions} process values * to make more values. There are two kinds: diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java index 5fd1b4e401659..39ae5323703d8 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java @@ -1749,17 +1749,13 @@ public void testLengthPushdownZoo() { assertThat(relation.output(), hasItem(lastNamePushDownAttr)); assertThat(relation.output(), hasItem(firstNamePushDownAttr)); assertThat(relation.output().stream().filter(a -> { - if (a instanceof FieldAttribute fa) { - if (fa.field() instanceof FunctionEsField fef) { - return fef.functionConfig().function() == BlockLoaderFunctionConfig.Function.LENGTH; - } + if (a instanceof FieldAttribute fa) { + if (fa.field() instanceof FunctionEsField fef) { + return fef.functionConfig().function() == BlockLoaderFunctionConfig.Function.LENGTH; } - return false; - }).toList(), - 
hasSize( - 2 - ) - ); + } + return false; + }).toList(), hasSize(2)); } public void testLengthInStatsTwice() { From 5273bdb0557ba842595dd1d850e5cdd590177aca Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 24 Nov 2025 12:17:44 -0500 Subject: [PATCH 10/13] More tests --- muted-tests.yml | 3 + .../index/mapper/IpFieldMapper.java | 3 - .../index/mapper/NumberFieldMapper.java | 3 - .../src/main/resources/floats.csv-spec | 139 +++++++++ .../src/main/resources/ints.csv-spec | 279 ++++++++++++++++++ .../src/main/resources/ip.csv-spec | 97 ++++++ .../src/main/resources/string.csv-spec | 28 ++ .../blockloader/BlockLoaderExpression.java | 2 +- .../xpack/esql/stats/SearchContextStats.java | 3 + 9 files changed, 550 insertions(+), 7 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index 82d0fbab96821..cb7f518dd5c16 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -438,6 +438,9 @@ tests: - class: org.elasticsearch.xpack.inference.integration.AuthorizationTaskExecutorIT method: testCreatesEisChatCompletion_DoesNotRemoveEndpointWhenNoLongerAuthorized issue: https://github.com/elastic/elasticsearch/issues/138480 +- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeForkIT + method: test {csv-spec:inlinestats.MvMinMvExpand} + issue: https://github.com/elastic/elasticsearch/issues/137679 # Examples: # diff --git a/server/src/main/java/org/elasticsearch/index/mapper/IpFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/IpFieldMapper.java index 4a9e088ab567a..5d9d5d7ce7582 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/IpFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/IpFieldMapper.java @@ -489,9 +489,6 @@ public BlockLoader blockLoader(BlockLoaderContext blContext) { @Override public boolean supportsBlockLoaderConfig(BlockLoaderFunctionConfig config, FieldExtractPreference preference) { if (hasDocValues() && (preference != FieldExtractPreference.STORED || isSyntheticSource)) { - if (config 
== null) { - return true; - } return switch (config.function()) { case MV_MAX, MV_MIN -> true; default -> false; diff --git a/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java index fbc3fed294dbd..57c4a8db113df 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java @@ -2128,9 +2128,6 @@ public BlockLoader blockLoader(BlockLoaderContext blContext) { @Override public boolean supportsBlockLoaderConfig(BlockLoaderFunctionConfig config, FieldExtractPreference preference) { if (hasDocValues() && (preference != FieldExtractPreference.STORED || isSyntheticSource)) { - if (config == null) { - return true; - } return switch (config.function()) { case MV_MAX, MV_MIN -> true; default -> false; diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/floats.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/floats.csv-spec index 1128cbc6f4106..b10fcff51fa84 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/floats.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/floats.csv-spec @@ -692,3 +692,142 @@ null null null ; + +rowMvMin + ROW d=[1.1, 2.2]::DOUBLE +| EVAL d=MV_MIN(d) +; + +d:double +1.1 +; + +rowMvMax + ROW d=[1.1, 2.2]::DOUBLE +| EVAL d=MV_MAX(d) +; + +d:double +2.2 +; + +groupMv + FROM employees +| STATS MIN(emp_no) BY salary_change +| SORT salary_change DESC +| LIMIT 5 +; + +MIN(emp_no):integer | salary_change:double +10009 | null +10040 | 14.74 +10003 | 14.68 +10023 | 14.63 +10065 | 14.44 +; + +groupMvMin + FROM employees +| STATS MIN(emp_no) BY salary_change=MV_MIN(salary_change) +| SORT salary_change DESC +| LIMIT 5 +; + +MIN(emp_no):integer | salary_change:double +10009 | null +10086 | 13.61 +10003 | 12.82 +10015 | 12.4 +10050 | 8.7 +; + +groupMvMax + FROM employees +| STATS MIN(emp_no) BY 
salary_change=MV_MAX(salary_change) +| SORT salary_change DESC +| LIMIT 5 +; + +MIN(emp_no):integer | salary_change:double +10009 | null +10040 | 14.74 +10003 | 14.68 +10023 | 14.63 +10065 | 14.44 +; + +valuesMvMinDouble + FROM employees +| WHERE emp_no <= 10009 +| EVAL first_letter = SUBSTRING(first_name, 0, 1) +| STATS MV_MIN(VALUES(salary_change)) BY first_letter +| SORT first_letter ASC +; + +MV_MIN(VALUES(salary_change)):double | first_letter:keyword +-3.9 | A +-7.23 | B +-0.35 | C +1.19 | G +-2.14 | K +12.82 | P +-2.92 | S +-7.06 | T +; + +valuesMvMaxDouble + FROM employees +| WHERE emp_no <= 10009 +| EVAL first_letter = SUBSTRING(first_name, 0, 1) +| STATS MV_MAX(VALUES(salary_change)) BY first_letter +| SORT first_letter ASC +; + +MV_MAX(VALUES(salary_change)):double | first_letter:keyword +-3.9 | A +11.17 | B +13.48 | C +1.19 | G +13.07 | K +14.68 | P +12.68 | S +1.99 | T +; + +valuesMvMinFloat + FROM employees +| WHERE emp_no <= 10009 +| EVAL first_letter = SUBSTRING(first_name, 0, 1) +| STATS MV_MIN(VALUES(height.float)) BY first_letter +| SORT first_letter ASC +; + +MV_MIN(VALUES(height.float)):double | first_letter:keyword +1.559999942779541 | A +2.0799999237060547 | B +1.7799999713897705 | C +2.0299999713897705 | G +2.049999952316284 | K +1.8300000429153442 | P +1.850000023841858 | S +1.7000000476837158 | T +; + +valuesMvMaxFloat + FROM employees +| WHERE emp_no <= 10009 +| EVAL first_letter = SUBSTRING(first_name, 0, 1) +| STATS MV_MAX(VALUES(height.float)) BY first_letter +| SORT first_letter ASC +; + +MV_MAX(VALUES(height.float)):double | first_letter:keyword +1.559999942779541 | A +2.0799999237060547 | B +1.7799999713897705 | C +2.0299999713897705 | G +2.049999952316284 | K +1.8300000429153442 | P +2.0999999046325684 | S +1.7000000476837158 | T +; diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/ints.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/ints.csv-spec index f4b6d41a7a027..8a229f2f841b3 100644 --- 
a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/ints.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/ints.csv-spec @@ -1003,3 +1003,282 @@ emp_no:integer |salary_change.int:integer 10079 | 7 10086 | 13 ; + +rowMvMinLong + ROW l=[1, 2]::LONG +| EVAL l=MV_MIN(l) +; + +l:long +1 +; + +rowMvMaxLong + ROW l=[1, 2]::LONG +| EVAL l=MV_MAX(l) +; + +l:long +2 +; + +rowMvMinInt + ROW i=[1, 2]::INT +| EVAL i=MV_MIN(i) +; + +i:integer +1 +; + +rowMvMaxInt + ROW i=[1, 2]::INT +| EVAL i=MV_MAX(i) +; + +i:integer +2 +; + +groupMvLong + FROM employees +| STATS MIN(emp_no) BY salary_change=salary_change.long +| SORT salary_change DESC +| LIMIT 5 +; + +MIN(emp_no):integer | salary_change:long +10009 | null +10003 | 14 +10004 | 13 +10003 | 12 +10002 | 11 +; + +groupMvMinLong + FROM employees +| STATS MIN(emp_no) BY salary_change=MV_MIN(salary_change.long) +| SORT salary_change DESC +| LIMIT 5 +; + +MIN(emp_no):integer | salary_change:long +10009 | null +10086 | 13 +10003 | 12 +10044 | 8 +10079 | 7 +; + +groupMvMaxLong + FROM employees +| STATS MIN(emp_no) BY salary_change=MV_MAX(salary_change.long) +| SORT salary_change DESC +| LIMIT 5 +; + +MIN(emp_no):integer | salary_change:long +10009 | null +10003 | 14 +10004 | 13 +10008 | 12 +10002 | 11 +; + +groupMvInt + FROM employees +| STATS MIN(emp_no) BY salary_change=salary_change.int +| SORT salary_change DESC +| LIMIT 5 +; + +MIN(emp_no):integer | salary_change:integer +10009 | null +10003 | 14 +10004 | 13 +10003 | 12 +10002 | 11 +; + +groupMvMinInt + FROM employees +| STATS MIN(emp_no) BY salary_change=MV_MIN(salary_change.int) +| SORT salary_change DESC +| LIMIT 5 +; + +MIN(emp_no):integer | salary_change:integer +10009 | null +10086 | 13 +10003 | 12 +10044 | 8 +10079 | 7 +; + +groupMvMaxInt + FROM employees +| STATS MIN(emp_no) BY salary_change=MV_MAX(salary_change.int) +| SORT salary_change DESC +| LIMIT 5 +; + +MIN(emp_no):integer | salary_change:integer +10009 | null +10003 | 14 +10004 | 13 +10008 | 
12 +10002 | 11 +; + +valuesMvMinLong + FROM employees +| WHERE emp_no <= 10009 +| EVAL first_letter = SUBSTRING(first_name, 0, 1) +| STATS MV_MIN(VALUES(languages.long)) BY first_letter +| SORT first_letter ASC +; + +MV_MIN(VALUES(languages.long)):long | first_letter:keyword +3 | A +5 | B +5 | C +2 | G +1 | K +4 | P +1 | S +4 | T +; + +valuesMvMaxLong + FROM employees +| WHERE emp_no <= 10009 +| EVAL first_letter = SUBSTRING(first_name, 0, 1) +| STATS MV_MAX(VALUES(languages.long)) BY first_letter +| SORT first_letter ASC +; + +MV_MAX(VALUES(languages.long)):long | first_letter:keyword +3 | A +5 | B +5 | C +2 | G +1 | K +4 | P +2 | S +4 | T +; + +valuesMvMinInt + FROM employees +| WHERE emp_no <= 10009 +| EVAL first_letter = SUBSTRING(first_name, 0, 1) +| STATS MV_MIN(VALUES(languages)) BY first_letter +| SORT first_letter ASC +; + +MV_MIN(VALUES(languages)):integer | first_letter:keyword +3 | A +5 | B +5 | C +2 | G +1 | K +4 | P +1 | S +4 | T +; + +valuesMvMaxInt + FROM employees +| WHERE emp_no <= 10009 +| EVAL first_letter = SUBSTRING(first_name, 0, 1) +| STATS MV_MAX(VALUES(languages)) BY first_letter +| SORT first_letter ASC +; + +MV_MAX(VALUES(languages)):integer | first_letter:keyword +3 | A +5 | B +5 | C +2 | G +1 | K +4 | P +2 | S +4 | T +; + +valuesMvMinShort + FROM employees +| WHERE emp_no <= 10009 +| EVAL first_letter = SUBSTRING(first_name, 0, 1) +| STATS MV_MIN(VALUES(languages.short)) BY first_letter +| SORT first_letter ASC +; + +MV_MIN(VALUES(languages.short)):integer | first_letter:keyword +3 | A +5 | B +5 | C +2 | G +1 | K +4 | P +1 | S +4 | T +; + +valuesMvMaxShort + FROM employees +| WHERE emp_no <= 10009 +| EVAL first_letter = SUBSTRING(first_name, 0, 1) +| STATS MV_MAX(VALUES(languages.short)) BY first_letter +| SORT first_letter ASC +; + +MV_MAX(VALUES(languages.short)):integer | first_letter:keyword +3 | A +5 | B +5 | C +2 | G +1 | K +4 | P +2 | S +4 | T +; + + +valuesMvMinByte + FROM employees +| WHERE emp_no <= 10009 +| EVAL first_letter 
= SUBSTRING(first_name, 0, 1) +| STATS MV_MIN(VALUES(languages.byte)) BY first_letter +| SORT first_letter ASC +; + +MV_MIN(VALUES(languages.byte)):integer | first_letter:keyword +3 | A +5 | B +5 | C +2 | G +1 | K +4 | P +1 | S +4 | T +; + +valuesMvMaxByte + FROM employees +| WHERE emp_no <= 10009 +| EVAL first_letter = SUBSTRING(first_name, 0, 1) +| STATS MV_MAX(VALUES(languages.byte)) BY first_letter +| SORT first_letter ASC +; + +MV_MAX(VALUES(languages.byte)):integer | first_letter:keyword +3 | A +5 | B +5 | C +2 | G +1 | K +4 | P +2 | S +4 | T +; diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/ip.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/ip.csv-spec index 6f83b54606e05..615c90f2e2bb1 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/ip.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/ip.csv-spec @@ -572,6 +572,39 @@ required_capability: agg_values fe80::cae2:65ff:fece:feb9 | gamma ; +valuesGroupedMvMin +required_capability: agg_values + + FROM hosts +| EVAL host=SUBSTRING(host, 0, 1) +| STATS ip0=MV_MIN(VALUES(ip0)) BY host +| SORT host +; + + ip0:ip | host:keyword + ::1 | a + 127.0.0.1 | b +fe80::cae2:65ff:fece:feb9 | e +fe80::cae2:65ff:fece:feb9 | g +; + +valuesGroupedMvMax +required_capability: agg_values + + FROM hosts +| EVAL host=SUBSTRING(host, 0, 1) +| STATS ip0=MV_MAX(VALUES(ip0)) BY host +| SORT host +; + + ip0:ip | host:keyword + 127.0.0.1 | a + 127.0.0.1 | b +fe82::cae2:65ff:fece:fec0 | e +fe80::cae2:65ff:fece:feb9 | g + +; + implictCastingEqual required_capability: string_literal_auto_casting_extended from hosts | where mv_first(ip0) == "127.0.0.1" | keep host, ip0 | sort host; @@ -823,3 +856,67 @@ warning:Line 2:20: java.lang.IllegalArgumentException: 'invalid_network' is not ip0:ip |ip1:ip |direction:keyword 127.0.0.1 |8.8.8.8 |null ; + +mvMinRow + ROW ip=["192.168.0.1", "10.10.0.1"]::IP +| EVAL ip=MV_MIN(ip) +; + + ip:ip +10.10.0.1 +; + +mvMaxRow + ROW 
ip=["192.168.0.1", "10.10.0.1"]::IP +| EVAL ip=MV_MAX(ip) +; + + ip:ip +192.168.0.1 +; + +groupMv + FROM hosts +| STATS COUNT(*) BY ip0=ip0 +| SORT ip0 ASC +; + +COUNT(*):long | ip0:ip +1 | ::1 +4 | 127.0.0.1 +3 | fe80::cae2:65ff:fece:feb9 +1 | fe80::cae2:65ff:fece:fec0 +1 | fe80::cae2:65ff:fece:fec1 +1 | fe81::cae2:65ff:fece:feb9 +1 | fe82::cae2:65ff:fece:fec0 +1 | null +; + +groupMvMin + FROM hosts +| STATS COUNT(*) BY ip0=MV_MIN(ip0) +| SORT ip0 ASC +; + +COUNT(*):long | ip0:ip +1 | ::1 +4 | 127.0.0.1 +3 | fe80::cae2:65ff:fece:feb9 +1 | fe81::cae2:65ff:fece:feb9 +1 | null +; + +groupMvMax + FROM hosts +| STATS COUNT(*) BY ip0=MV_MAX(ip0) +| SORT ip0 ASC +; + +COUNT(*):long | ip0:ip +1 | ::1 +4 | 127.0.0.1 +2 | fe80::cae2:65ff:fece:feb9 +1 | fe80::cae2:65ff:fece:fec1 +1 | fe82::cae2:65ff:fece:fec0 +1 | null +; diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/string.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/string.csv-spec index b0bd91373e002..9c7765584d9c9 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/string.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/string.csv-spec @@ -672,6 +672,34 @@ min(salary):integer | max(salary):integer | job_positions:keyword 25324 | 58715 | Head Human Resources ; +groupByMvMin + FROM employees +| STATS MIN(salary), MAX(salary) BY job_positions=MV_MIN(job_positions) +| SORT job_positions +| LIMIT 5; + +MIN(salary):integer | MAX(salary):integer | job_positions:keyword +25976 | 74970 | Accountant +28941 | 69904 | Architect +29175 | 50249 | Business Analyst +25945 | 74999 | Data Scientist +25324 | 50064 | Head Human Resources +; + +groupByMvMax + FROM employees +| STATS MIN(salary), MAX(salary) BY job_positions=MV_MAX(job_positions) +| SORT job_positions +| LIMIT 5; + +MIN(salary):integer | MAX(salary):integer | job_positions:keyword +47411 | 47411 | Accountant +28941 | 28941 | Architect +39110 | 48942 | Head Human Resources +26436 | 50128 | Internship 
+25976 | 64675 | Junior Developer +; + convertFromString from employees | sort emp_no | eval positions = to_string(job_positions) | keep emp_no, positions, job_positions | limit 5; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java index 107a15bcb8e35..53d5039da5e1e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java @@ -173,7 +173,7 @@ *

      * Look for your function in the csv-spec tests and make sure there are cases that * contain your function processing each data type you are pushing. For each type, - * make sure the function processes the results of + * make sure the function processes the results of: *

      *
        *
      • {@code ROW} - these won't use your new code
      • diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/stats/SearchContextStats.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/stats/SearchContextStats.java index a6343ecbf7d67..d5ac62273bff0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/stats/SearchContextStats.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/stats/SearchContextStats.java @@ -165,6 +165,9 @@ public boolean supportsLoaderConfig( BlockLoaderFunctionConfig config, MappedFieldType.FieldExtractPreference preference ) { + if (config == null) { + throw new UnsupportedOperationException("config must be provided"); + } for (SearchExecutionContext context : contexts) { MappedFieldType ft = context.getFieldType(name.string()); if (ft == null) { From 5b80d783276d2bae747794393f0050d3b1868f08 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Tue, 25 Nov 2025 09:20:34 -0500 Subject: [PATCH 11/13] Remove extra --- .../aggregations/500_percentiles_bucket.yml | 293 ------------------ 1 file changed, 293 deletions(-) delete mode 100644 modules/aggregations/src/yamlRestTest/resources/rest-api-spec/test/aggregations/500_percentiles_bucket.yml diff --git a/modules/aggregations/src/yamlRestTest/resources/rest-api-spec/test/aggregations/500_percentiles_bucket.yml b/modules/aggregations/src/yamlRestTest/resources/rest-api-spec/test/aggregations/500_percentiles_bucket.yml deleted file mode 100644 index 2133f0b3a0f3a..0000000000000 --- a/modules/aggregations/src/yamlRestTest/resources/rest-api-spec/test/aggregations/500_percentiles_bucket.yml +++ /dev/null @@ -1,293 +0,0 @@ -setup: - - do: - bulk: - index: no_gaps - refresh: true - body: - - { "index": { } } - - { "@timestamp": "2022-01-01T00:00:00", "v": 1 } - - { "index": { } } - - { "@timestamp": "2022-01-01T01:00:00", "v": 2 } - - { "index": { } } - - { "@timestamp": "2022-01-01T02:00:00", "v": 1 } - - - do: - bulk: - index: gaps - refresh: true - body: - - { "index": { 
} } - - { "@timestamp": "2022-01-01T00:00:00", "v": 1 } - - { "index": { } } - - { "@timestamp": "2022-01-01T02:00:00", "v": 2 } - - { "index": { } } - - { "@timestamp": "2022-01-01T03:00:00", "v": 1 } - ---- -basic: - - do: - search: - index: no_gaps - body: - size: 0 - aggs: - "@timestamp": - date_histogram: - field: "@timestamp" - fixed_interval: 1h - aggs: - v: {avg: {field: v}} - d: - percentiles_bucket: - buckets_path: "@timestamp>v" - - match: { hits.total.value: 3 } - - length: { aggregations.@timestamp.buckets: 3 } - - match: - aggregations.d.values: - 1.0: 1.0 - 5.0: 1.0 - 25.0: 1.0 - 50.0: 1.0 - 75.0: 2.0 - 95.0: 2.0 - 99.0: 2.0 - ---- -format: - - do: - search: - index: no_gaps - body: - size: 0 - aggs: - "@timestamp": - date_histogram: - field: "@timestamp" - fixed_interval: 1h - aggs: - v: {avg: {field: v}} - d: - percentiles_bucket: - buckets_path: "@timestamp>v" - format: "0.00" - - match: { hits.total.value: 3 } - - length: { aggregations.@timestamp.buckets: 3 } - - match: - aggregations.d.values: - 1.0: 1.0 - 5.0: 1.0 - 25.0: 1.0 - 50.0: 1.0 - 75.0: 2.0 - 95.0: 2.0 - 99.0: 2.0 - 1.0_as_string: "1.00" - 5.0_as_string: "1.00" - 50.0_as_string: "1.00" - 25.0_as_string: "1.00" - 75.0_as_string: "2.00" - 95.0_as_string: "2.00" - 99.0_as_string: "2.00" - ---- -gap_policy=skip: - - do: - search: - index: gaps - body: - size: 0 - aggs: - "@timestamp": - date_histogram: - field: "@timestamp" - fixed_interval: 1h - aggs: - v: {avg: {field: v}} - d: - percentiles_bucket: - buckets_path: "@timestamp>v" - gap_policy: skip - - match: { hits.total.value: 3 } - - length: { aggregations.@timestamp.buckets: 4 } - - match: - aggregations.d.values: - 1.0: 1.0 - 5.0: 1.0 - 25.0: 1.0 - 50.0: 1.0 - 75.0: 2.0 - 95.0: 2.0 - 99.0: 2.0 - ---- -gap_policy=insert_zeros: - - do: - search: - index: gaps - body: - size: 0 - aggs: - "@timestamp": - date_histogram: - field: "@timestamp" - fixed_interval: 1h - aggs: - v: {avg: {field: v}} - d: - percentiles_bucket: - buckets_path: 
"@timestamp>v" - gap_policy: insert_zeros - - match: { hits.total.value: 3 } - - length: { aggregations.@timestamp.buckets: 4 } - - match: - aggregations.d.values: - 1.0: 0.0 - 5.0: 0.0 - 25.0: 1.0 - 50.0: 1.0 - 75.0: 1.0 - 95.0: 2.0 - 99.0: 2.0 - ---- -gap_policy=keep_value: - - do: - search: - index: gaps - body: - size: 0 - aggs: - "@timestamp": - date_histogram: - field: "@timestamp" - fixed_interval: 1h - aggs: - v: {avg: {field: v}} - d: - percentiles_bucket: - buckets_path: "@timestamp>v" - gap_policy: keep_values - - match: { hits.total.value: 3 } - - length: { aggregations.@timestamp.buckets: 4 } - - match: - aggregations.d.values: - 1.0: 1.0 - 5.0: 1.0 - 25.0: 1.0 - 50.0: 1.0 - 75.0: 2.0 - 95.0: 2.0 - 99.0: 2.0 - ---- -dotted name: - - do: - search: - index: no_gaps - body: - size: 0 - aggs: - "@time.stamp": - date_histogram: - field: "@timestamp" - fixed_interval: 1h - aggs: - v: {avg: {field: v}} - d: - percentiles_bucket: - buckets_path: "@time.stamp>v" - - match: { hits.total.value: 3 } - - length: { aggregations.@time\.stamp.buckets: 3 } - - match: - aggregations.d.values: - 1.0: 1.0 - 5.0: 1.0 - 25.0: 1.0 - 50.0: 1.0 - 75.0: 2.0 - 95.0: 2.0 - 99.0: 2.0 - ---- -dotted value: - - do: - search: - index: no_gaps - body: - size: 0 - aggs: - "@timestamp": - date_histogram: - field: "@timestamp" - fixed_interval: 1h - aggs: - v: - percentiles: - field: v - percents: [ 50, 99.9 ] - d: - percentiles_bucket: - buckets_path: "@timestamp>v[99.9]" - - match: { hits.total.value: 3 } - - length: { aggregations.@timestamp.buckets: 3 } - - match: - aggregations.d.values: - 1.0: 1.0 - 5.0: 1.0 - 25.0: 1.0 - 50.0: 1.0 - 75.0: 2.0 - 95.0: 2.0 - 99.0: 2.0 - ---- -no results: - - do: - search: - index: no_gaps - body: - size: 0 - query: - match: - missing_field: not_found - aggs: - "@timestamp": - date_histogram: - field: "@timestamp" - fixed_interval: 1h - aggs: - v: {avg: {field: v}} - d: - percentiles_bucket: - buckets_path: "@timestamp>v" - - match: { 
hits.total.value: 0 } - - length: { aggregations.@timestamp.buckets: 0 } - - match: - aggregations.d.values: - 1.0: null - 5.0: null - 25.0: null - 50.0: null - 75.0: null - 95.0: null - 99.0: null - ---- -bad path: - - do: - catch: '/Validation Failed: 1: No aggregation \[v\] found for path \[@timestamp>v\];/' - search: - index: no_gaps - body: - size: 0 - query: - match: - missing_field: not_found - aggs: - "@timestamp": - date_histogram: - field: "@timestamp" - fixed_interval: 1h - d: - percentiles_bucket: - buckets_path: "@timestamp>v" From a61e7a72ad86db7f5a798a3c94e1c26516b540f5 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Tue, 25 Nov 2025 12:10:36 -0500 Subject: [PATCH 12/13] fixup --- .../local/PushExpressionsToFieldLoad.java | 109 +++++++++++++----- 1 file changed, 77 insertions(+), 32 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/PushExpressionsToFieldLoad.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/PushExpressionsToFieldLoad.java index e700b77cd7163..d019be3730047 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/PushExpressionsToFieldLoad.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/PushExpressionsToFieldLoad.java @@ -34,9 +34,40 @@ import static org.elasticsearch.xpack.esql.core.expression.Attribute.rawTemporaryName; /** - * Replaces vector similarity functions with a field attribute that applies - * the similarity function during value loading, when one side of the function is a literal. - * It also adds the new field function attribute to the EsRelation output, and adds a projection after it to remove it from the output. + * Replaces {@link Expression}s that can be pushed to field loading with a field attribute + * that calculates the expression during value loading. 
See {@link BlockLoaderExpression} + * for more about how these loads are implemented and why we do this. + *

        + * This rule runs in one downward (aka output-to-read) pass, making four sorts + * of transformations: + *

        + *
          + *
        • + * When we see a use of a new pushable function we build an + * attribute for the function, record that attribute, and discard it after use. + * For example, {@code EVAL l = LENGTH(message)} becomes + * {@code EVAL l = $$message$LENGTH$1324$$ | DROP $$message$LENGTH$1324$$ }. + * We need the {@code DROP} so we don't change the output schema. + *
        • + *
        • + * When we see a use of pushable function for which we already have an attribute + * we just use it. This looks like the {@code l} attribute in + * {@code EVAL l = LENGTH(message) | EVAL l2 = LENGTH(message)} + *
        • + *
        • + * When we see a PROJECT, add any new attributes to the projection so we can use + * them on previously visited nodes. So {@code KEEP foo | EVAL l = LENGTH(message)} + * becomes + *
          {@code
          + *           | KEEP foo, $$message$LENGTH$1324$$
          + *           | EVAL l = $$message$LENGTH$1324$$
          + *           | DROP $$message$LENGTH$1324$$}
          + *         }
          + *
        • + *
        • + * When we see a relation, add the attribute to it. + *
        • + *
        */ public class PushExpressionsToFieldLoad extends ParameterizedRule { @@ -56,7 +87,7 @@ private class Rule { * The primary indices, lazily initialized. */ private List primaries; - private boolean planWasTransformed = false; + private boolean addedNewAttribute = false; private Rule(LocalLogicalOptimizerContext context, LogicalPlan plan) { this.context = context; @@ -64,37 +95,38 @@ private Rule(LocalLogicalOptimizerContext context, LogicalPlan plan) { } private LogicalPlan doRule(LogicalPlan plan) { - planWasTransformed = false; + addedNewAttribute = false; if (plan instanceof Eval || plan instanceof Filter || plan instanceof Aggregate) { - LogicalPlan transformedPlan = plan.transformExpressionsOnly(Expression.class, e -> { - if (e instanceof BlockLoaderExpression ble) { - return transformExpression(e, ble); - } - return e; - }); + return transformPotentialInvocation(plan); + } + if (addedAttrs.isEmpty()) { + return plan; + } + if (plan instanceof EsqlProject project) { + return transformProject(project); + } + if (plan instanceof EsRelation rel) { + return transformRelation(rel); + } + return plan; + } - // TODO rebuild everything one time rather than after each find. 
- if (planWasTransformed == false) { - return plan; + private LogicalPlan transformPotentialInvocation(LogicalPlan plan) { + LogicalPlan transformedPlan = plan.transformExpressionsOnly(Expression.class, e -> { + if (e instanceof BlockLoaderExpression ble) { + return transformExpression(e, ble); } - - List previousAttrs = transformedPlan.output(); - // Transforms EsRelation to extract the new attributes - List addedAttrsList = addedAttrs.values().stream().toList(); - transformedPlan = transformedPlan.transformDown(EsRelation.class, esRelation -> { - AttributeSet updatedOutput = esRelation.outputSet().combine(AttributeSet.of(addedAttrsList)); - return esRelation.withAttributes(updatedOutput.stream().toList()); - }); - // Transforms Projects so the new attribute is not discarded - transformedPlan = transformedPlan.transformDown(EsqlProject.class, esProject -> { - List projections = new ArrayList<>(esProject.projections()); - projections.addAll(addedAttrsList); - return esProject.withProjections(projections); - }); - - return new EsqlProject(Source.EMPTY, transformedPlan, previousAttrs); + return e; + }); + if (addedNewAttribute == false) { + /* + * Either didn't see anything pushable or everything pushable already + * has a pushed attribute. + */ + return plan; } - return plan; + // Found a new pushable attribute, discard it *after* use so we don't modify the output. 
+ return new EsqlProject(Source.EMPTY, transformedPlan, transformedPlan.output()); } private Expression transformExpression(Expression e, BlockLoaderExpression ble) { @@ -109,10 +141,23 @@ private Expression transformExpression(Expression e, BlockLoaderExpression ble) if (context.searchStats().supportsLoaderConfig(fuse.field().fieldName(), fuse.config(), preference) == false) { return e; } - planWasTransformed = true; + addedNewAttribute = true; return replaceFieldsForFieldTransformations(e, fuse); } + private LogicalPlan transformProject(EsqlProject project) { + // Preserve any pushed attributes so we can use them later + List projections = new ArrayList<>(project.projections()); + projections.addAll(addedAttrs.values()); + return project.withProjections(projections); + } + + private LogicalPlan transformRelation(EsRelation rel) { + // Add the pushed attribute + AttributeSet updatedOutput = rel.outputSet().combine(AttributeSet.of(addedAttrs.values())); + return rel.withAttributes(updatedOutput.stream().toList()); + } + private Expression replaceFieldsForFieldTransformations(Expression e, BlockLoaderExpression.PushedBlockLoaderExpression fuse) { // Change the expression to a reference of the pushed down function on the field FunctionEsField functionEsField = new FunctionEsField(fuse.field().field(), e.dataType(), fuse.config()); From 35e7cdc20126785a9b5334bf628785cb5fa21e47 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Tue, 25 Nov 2025 14:51:05 -0500 Subject: [PATCH 13/13] Undo accident didn't mean to have this in this PR --- .../local/PushExpressionsToFieldLoad.java | 109 +++++------------- 1 file changed, 32 insertions(+), 77 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/PushExpressionsToFieldLoad.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/PushExpressionsToFieldLoad.java index d019be3730047..e700b77cd7163 100644 --- 
a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/PushExpressionsToFieldLoad.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/PushExpressionsToFieldLoad.java @@ -34,40 +34,9 @@ import static org.elasticsearch.xpack.esql.core.expression.Attribute.rawTemporaryName; /** - * Replaces {@link Expression}s that can be pushed to field loading with a field attribute - * that calculates the expression during value loading. See {@link BlockLoaderExpression} - * for more about how these loads are implemented and why we do this. - *

        - * This rule runs in one downward (aka output-to-read) pass, making four sorts - * of transformations: - *

        - *
          - *
        • - * When we see a use of a new pushable function we build an - * attribute for the function, record that attribute, and discard it after use. - * For example, {@code EVAL l = LENGTH(message)} becomes - * {@code EVAL l = $$message$LENGTH$1324$$ | DROP $$message$LENGTH$1324$$ }. - * We need the {@code DROP} so we don't change the output schema. - *
        • - *
        • - * When we see a use of pushable function for which we already have an attribute - * we just use it. This looks like the {@code l} attribute in - * {@code EVAL l = LENGTH(message) | EVAL l2 = LENGTH(message)} - *
        • - *
        • - * When we see a PROJECT, add any new attributes to the projection so we can use - * them on previously visited nodes. So {@code KEEP foo | EVAL l = LENGTH(message)} - * becomes - *
          {@code
          - *           | KEEP foo, $$message$LENGTH$1324$$
          - *           | EVAL l = $$message$LENGTH$1324$$
          - *           | DROP $$message$LENGTH$1324$$}
          - *         }
          - *
        • - *
        • - * When we see a relation, add the attribute to it. - *
        • - *
        + * Replaces vector similarity functions with a field attribute that applies + * the similarity function during value loading, when one side of the function is a literal. + * It also adds the new field function attribute to the EsRelation output, and adds a projection after it to remove it from the output. */ public class PushExpressionsToFieldLoad extends ParameterizedRule { @@ -87,7 +56,7 @@ private class Rule { * The primary indices, lazily initialized. */ private List primaries; - private boolean addedNewAttribute = false; + private boolean planWasTransformed = false; private Rule(LocalLogicalOptimizerContext context, LogicalPlan plan) { this.context = context; @@ -95,38 +64,37 @@ private Rule(LocalLogicalOptimizerContext context, LogicalPlan plan) { } private LogicalPlan doRule(LogicalPlan plan) { - addedNewAttribute = false; + planWasTransformed = false; if (plan instanceof Eval || plan instanceof Filter || plan instanceof Aggregate) { - return transformPotentialInvocation(plan); - } - if (addedAttrs.isEmpty()) { - return plan; - } - if (plan instanceof EsqlProject project) { - return transformProject(project); - } - if (plan instanceof EsRelation rel) { - return transformRelation(rel); - } - return plan; - } + LogicalPlan transformedPlan = plan.transformExpressionsOnly(Expression.class, e -> { + if (e instanceof BlockLoaderExpression ble) { + return transformExpression(e, ble); + } + return e; + }); - private LogicalPlan transformPotentialInvocation(LogicalPlan plan) { - LogicalPlan transformedPlan = plan.transformExpressionsOnly(Expression.class, e -> { - if (e instanceof BlockLoaderExpression ble) { - return transformExpression(e, ble); + // TODO rebuild everything one time rather than after each find. + if (planWasTransformed == false) { + return plan; } - return e; - }); - if (addedNewAttribute == false) { - /* - * Either didn't see anything pushable or everything pushable already - * has a pushed attribute. 
- */ - return plan; + + List previousAttrs = transformedPlan.output(); + // Transforms EsRelation to extract the new attributes + List addedAttrsList = addedAttrs.values().stream().toList(); + transformedPlan = transformedPlan.transformDown(EsRelation.class, esRelation -> { + AttributeSet updatedOutput = esRelation.outputSet().combine(AttributeSet.of(addedAttrsList)); + return esRelation.withAttributes(updatedOutput.stream().toList()); + }); + // Transforms Projects so the new attribute is not discarded + transformedPlan = transformedPlan.transformDown(EsqlProject.class, esProject -> { + List projections = new ArrayList<>(esProject.projections()); + projections.addAll(addedAttrsList); + return esProject.withProjections(projections); + }); + + return new EsqlProject(Source.EMPTY, transformedPlan, previousAttrs); } - // Found a new pushable attribute, discard it *after* use so we don't modify the output. - return new EsqlProject(Source.EMPTY, transformedPlan, transformedPlan.output()); + return plan; } private Expression transformExpression(Expression e, BlockLoaderExpression ble) { @@ -141,23 +109,10 @@ private Expression transformExpression(Expression e, BlockLoaderExpression ble) if (context.searchStats().supportsLoaderConfig(fuse.field().fieldName(), fuse.config(), preference) == false) { return e; } - addedNewAttribute = true; + planWasTransformed = true; return replaceFieldsForFieldTransformations(e, fuse); } - private LogicalPlan transformProject(EsqlProject project) { - // Preserve any pushed attributes so we can use them later - List projections = new ArrayList<>(project.projections()); - projections.addAll(addedAttrs.values()); - return project.withProjections(projections); - } - - private LogicalPlan transformRelation(EsRelation rel) { - // Add the pushed attribute - AttributeSet updatedOutput = rel.outputSet().combine(AttributeSet.of(addedAttrs.values())); - return rel.withAttributes(updatedOutput.stream().toList()); - } - private Expression 
replaceFieldsForFieldTransformations(Expression e, BlockLoaderExpression.PushedBlockLoaderExpression fuse) { // Change the expression to a reference of the pushed down function on the field FunctionEsField functionEsField = new FunctionEsField(fuse.field().field(), e.dataType(), fuse.config());