diff --git a/docs/changelog/138029.yaml b/docs/changelog/138029.yaml new file mode 100644 index 0000000000000..977bc60d06daf --- /dev/null +++ b/docs/changelog/138029.yaml @@ -0,0 +1,5 @@ +pr: 138029 +summary: Fuse MV_MIN and MV_MAX and document process +area: ES|QL +type: feature +issues: [] diff --git a/muted-tests.yml b/muted-tests.yml index 235b1f60de5cd..16d503633bc78 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -438,6 +438,9 @@ tests: - class: org.elasticsearch.xpack.esql.heap_attack.HeapAttackLookupJoinIT method: testLookupExplosionBigString issue: https://github.com/elastic/elasticsearch/issues/138510 +- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeForkIT + method: test {csv-spec:inlinestats.MvMinMvExpand} + issue: https://github.com/elastic/elasticsearch/issues/137679 - class: org.elasticsearch.xpack.esql.optimizer.rules.physical.local.SubstituteRoundToTests method: testSubqueryWithCountStarAndDateTrunc {default} issue: https://github.com/elastic/elasticsearch/issues/138601 diff --git a/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java b/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java index 8e5d247aaccb0..341d842a9f7b2 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java @@ -17,6 +17,8 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import org.elasticsearch.index.mapper.blockloader.docvalues.BlockDocValuesReader; +import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMaxLongsFromDocValuesBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMinLongsFromDocValuesBlockLoader; import org.elasticsearch.search.fetch.StoredFieldsSpec; import org.elasticsearch.search.lookup.Source; @@ -25,8 +27,139 @@ import java.util.Map; /** - * Interface for loading data in a block shape. 
Instances of this class - * must be immutable and thread safe. + * Loads values from a chunk of lucene documents into a "Block" for the compute engine. + *
+ * Think of a Block as an array of values for a sequence of lucene documents. That's + * almost true! For the purposes of implementing {@link BlockLoader}, it's close enough. + * The compute engine operates on arrays because the good folks that build CPUs have + * spent the past 40 years making them really really good at running tight loops over + * arrays of data. So we play along with the CPU and make arrays. + *
+ *+ * There are a lot of interesting choices hiding in here to make getting those arrays + * out of lucene work well: + *
+ *+ * Generally reads are faster from {@code doc_values}, slower from {@code stored} fields, + * and even slower from {@code _source}. If we get to choose, we pick {@code doc_values}. + * But we work with what's on disk and that's a product of the field type and what the user's + * configured. Picking the optimal choice given what's on disk is the responsibility of each + * field's {@link MappedFieldType#blockLoader} method. The more configurable the field's + * storage strategies the more {@link BlockLoader}s you have to implement to integrate it + * with ESQL. It can get to be a lot. Sorry. + *
+ *+ * For a field to be supported by ESQL fully it has to be loadable if it was configured to be + * stored in any way. It's possible to turn off storage entirely by turning off + * {@code doc_values} and {@code _source} and {@code stored} fields. In that case, it's + * acceptable to return {@link ConstantNullsReader}. User turned the field off, best we can do + * is {@code null}. + *
+ *+ * We also sometimes want to "push" executing some ESQL functions into the block loader itself. + * Usually we do this when it's a ton faster. See the docs for {@code BlockLoaderExpression} + * for why and how we do this. + *
+ *+ * For example, {@code long} fields implement these block loaders: + *
+ *+ * NOTE: We can't read {@code long}s from {@code stored} fields, which is a + * bug, but maybe not + * a terrible one because it's very uncommon to configure {@code long} to be {@code stored} + * but to disable {@code _source} and {@code doc_values}. Nothing's perfect. Especially + * code. + *
+ *+ * When we described how to read from {@code doc_values} we said we prefer + * to use {@link ColumnAtATimeReader}. But some callers don't support reading column-at-a-time + * and need to read row-by-row. So we also need an implementation of {@link RowStrideReader} + * that reads from {@code doc_values}. Usually it's most convenient to implement both of those + * in the same {@code class}. {@link AllReader} is an interface for those sorts of classes, and + * you'll see it in the {@code doc_values} code frequently. + *
+ *+ * When decompressing {@code stored} fields lucene can skip stored fields that aren't used. They + * still have to be decompressed, but they aren't turned into java objects which saves a fair bit + * of work. If you don't need any stored fields return {@link StoredFieldsSpec#NO_REQUIREMENTS}. + * Otherwise, return what you need. + *
+ *+ * Instances of this class must be immutable and thread safe. Instances of + * {@link ColumnAtATimeReader} and {@link RowStrideReader} are all mutable and can only + * be accessed by one thread at a time but may be passed between threads. + * See implementations of {@link Reader#canReuse} for how that's handled. "Normal" java objects + * don't need to do anything special to be kicked from thread to thread - the transfer itself + * establishes a {@code happens-before} relationship that makes everything you need visible. + * But Lucene's readers aren't "normal" java objects and sometimes need to be rebuilt if we + * shift threads. + *
*/ public interface BlockLoader { /** @@ -115,10 +248,26 @@ interface StoredFields { Map+ * This is just a marker interface for these results. The compute engine + * has fleshed out implementations. + *
*/ interface Block extends Releasable {} diff --git a/server/src/main/java/org/elasticsearch/index/mapper/IpFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/IpFieldMapper.java index 5f5994b18acba..5d9d5d7ce7582 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/IpFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/IpFieldMapper.java @@ -33,7 +33,10 @@ import org.elasticsearch.index.fielddata.FieldDataContext; import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.index.fielddata.plain.SortedSetOrdinalsIndexFieldData; +import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig; import org.elasticsearch.index.mapper.blockloader.docvalues.BytesRefsFromOrdsBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMaxBytesRefsFromOrdsBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMinBytesRefsFromOrdsBlockLoader; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.script.IpFieldScript; import org.elasticsearch.script.Script; @@ -457,7 +460,15 @@ public static Query rangeQuery( @Override public BlockLoader blockLoader(BlockLoaderContext blContext) { if (hasDocValues() && (blContext.fieldExtractPreference() != FieldExtractPreference.STORED || isSyntheticSource)) { - return new BytesRefsFromOrdsBlockLoader(name()); + BlockLoaderFunctionConfig cfg = blContext.blockLoaderFunctionConfig(); + if (cfg == null) { + return new BytesRefsFromOrdsBlockLoader(name()); + } + return switch (cfg.function()) { + case MV_MAX -> new MvMaxBytesRefsFromOrdsBlockLoader(name()); + case MV_MIN -> new MvMinBytesRefsFromOrdsBlockLoader(name()); + default -> throw new UnsupportedOperationException("unknown fusion config [" + cfg.function() + "]"); + }; } if (isStored()) { @@ -475,6 +486,17 @@ public BlockLoader blockLoader(BlockLoaderContext blContext) { return new 
BlockSourceReader.IpsBlockLoader(sourceValueFetcher(blContext), lookup); } + @Override + public boolean supportsBlockLoaderConfig(BlockLoaderFunctionConfig config, FieldExtractPreference preference) { + if (hasDocValues() && (preference != FieldExtractPreference.STORED || isSyntheticSource)) { + return switch (config.function()) { + case MV_MAX, MV_MIN -> true; + default -> false; + }; + } + return true; + } + private BlockLoader blockLoaderFromFallbackSyntheticSource(BlockLoaderContext blContext) { var reader = new IpFallbackSyntheticSourceReader(nullValue); return new FallbackSyntheticSourceBlockLoader( diff --git a/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java index 31fd1e404d108..c84c308afdf11 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java @@ -61,6 +61,8 @@ import org.elasticsearch.index.fielddata.plain.SortedSetOrdinalsIndexFieldData; import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig; import org.elasticsearch.index.mapper.blockloader.docvalues.BytesRefsFromOrdsBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMaxBytesRefsFromOrdsBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMinBytesRefsFromOrdsBlockLoader; import org.elasticsearch.index.mapper.blockloader.docvalues.fn.Utf8CodePointsFromOrdsBlockLoader; import org.elasticsearch.index.query.AutomatonQueryWithDescription; import org.elasticsearch.index.query.SearchExecutionContext; @@ -732,10 +734,13 @@ public BlockLoader blockLoader(BlockLoaderContext blContext) { if (cfg == null) { return new BytesRefsFromOrdsBlockLoader(name()); } - if (cfg.function() == BlockLoaderFunctionConfig.Function.LENGTH) { - return new Utf8CodePointsFromOrdsBlockLoader(((BlockLoaderFunctionConfig.JustWarnings) 
cfg).warnings(), name()); - } - throw new UnsupportedOperationException("unknown fusion config [" + cfg.function() + "]"); + return switch (cfg.function()) { + case LENGTH -> new Utf8CodePointsFromOrdsBlockLoader(((BlockLoaderFunctionConfig.JustWarnings) cfg).warnings(), name()); + case MV_MAX -> new MvMaxBytesRefsFromOrdsBlockLoader(name()); + case MV_MIN -> new MvMinBytesRefsFromOrdsBlockLoader(name()); + default -> throw new UnsupportedOperationException("unknown fusion config [" + cfg.function() + "]"); + }; + } if (blContext.blockLoaderFunctionConfig() != null) { throw new UnsupportedOperationException("function fusing only supported for doc values"); @@ -765,7 +770,10 @@ public Builder builder(BlockFactory factory, int expectedCount) { @Override public boolean supportsBlockLoaderConfig(BlockLoaderFunctionConfig config, FieldExtractPreference preference) { if (hasDocValues() && (preference != FieldExtractPreference.STORED || isSyntheticSourceEnabled())) { - return config.function() == BlockLoaderFunctionConfig.Function.LENGTH; + return switch (config.function()) { + case LENGTH, MV_MAX, MV_MIN -> true; + default -> false; + }; } return false; } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java index 65d3390309338..57c4a8db113df 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java @@ -45,9 +45,16 @@ import org.elasticsearch.index.fielddata.plain.SortedDoublesIndexFieldData; import org.elasticsearch.index.fielddata.plain.SortedNumericIndexFieldData; import org.elasticsearch.index.mapper.TimeSeriesParams.MetricType; +import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig; import org.elasticsearch.index.mapper.blockloader.docvalues.DoublesBlockLoader; import 
org.elasticsearch.index.mapper.blockloader.docvalues.IntsBlockLoader; import org.elasticsearch.index.mapper.blockloader.docvalues.LongsBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMaxDoublesFromDocValuesBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMaxIntsFromDocValuesBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMaxLongsFromDocValuesBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMinDoublesFromDocValuesBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMinIntsFromDocValuesBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMinLongsFromDocValuesBlockLoader; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.script.DoubleFieldScript; import org.elasticsearch.script.LongFieldScript; @@ -487,6 +494,16 @@ BlockLoader blockLoaderFromFallbackSyntheticSource( ) { return floatingPointBlockLoaderFromFallbackSyntheticSource(this, fieldName, nullValue, coerce, blContext); } + + @Override + BlockLoader blockLoaderFromDocValuesMvMin(String fieldName) { + return new MvMinDoublesFromDocValuesBlockLoader(fieldName, l -> HalfFloatPoint.sortableShortToHalfFloat((short) l)); + } + + @Override + BlockLoader blockLoaderFromDocValuesMvMax(String fieldName) { + return new MvMaxDoublesFromDocValuesBlockLoader(fieldName, l -> HalfFloatPoint.sortableShortToHalfFloat((short) l)); + } }, FLOAT("float", NumericType.FLOAT) { @Override @@ -685,6 +702,16 @@ BlockLoader blockLoaderFromFallbackSyntheticSource( ) { return floatingPointBlockLoaderFromFallbackSyntheticSource(this, fieldName, nullValue, coerce, blContext); } + + @Override + BlockLoader blockLoaderFromDocValuesMvMin(String fieldName) { + return new MvMinDoublesFromDocValuesBlockLoader(fieldName, l -> NumericUtils.sortableIntToFloat((int) l)); + } + + @Override + BlockLoader blockLoaderFromDocValuesMvMax(String 
fieldName) { + return new MvMaxDoublesFromDocValuesBlockLoader(fieldName, l -> NumericUtils.sortableIntToFloat((int) l)); + } }, DOUBLE("double", NumericType.DOUBLE) { @Override @@ -849,6 +876,16 @@ BlockLoader blockLoaderFromFallbackSyntheticSource( ) { return floatingPointBlockLoaderFromFallbackSyntheticSource(this, fieldName, nullValue, coerce, blContext); } + + @Override + BlockLoader blockLoaderFromDocValuesMvMin(String fieldName) { + return new MvMinDoublesFromDocValuesBlockLoader(fieldName, NumericUtils::sortableLongToDouble); + } + + @Override + BlockLoader blockLoaderFromDocValuesMvMax(String fieldName) { + return new MvMaxDoublesFromDocValuesBlockLoader(fieldName, NumericUtils::sortableLongToDouble); + } }, BYTE("byte", NumericType.BYTE) { @Override @@ -978,6 +1015,16 @@ BlockLoader blockLoaderFromFallbackSyntheticSource( return integerBlockLoaderFromFallbackSyntheticSource(this, fieldName, nullValue, coerce, blContext); } + @Override + BlockLoader blockLoaderFromDocValuesMvMin(String fieldName) { + return new MvMinIntsFromDocValuesBlockLoader(fieldName); + } + + @Override + BlockLoader blockLoaderFromDocValuesMvMax(String fieldName) { + return new MvMaxIntsFromDocValuesBlockLoader(fieldName); + } + private boolean isOutOfRange(Object value) { double doubleValue = objectToDouble(value); return doubleValue < Byte.MIN_VALUE || doubleValue > Byte.MAX_VALUE; @@ -1106,6 +1153,16 @@ BlockLoader blockLoaderFromFallbackSyntheticSource( return integerBlockLoaderFromFallbackSyntheticSource(this, fieldName, nullValue, coerce, blContext); } + @Override + BlockLoader blockLoaderFromDocValuesMvMin(String fieldName) { + return new MvMinIntsFromDocValuesBlockLoader(fieldName); + } + + @Override + BlockLoader blockLoaderFromDocValuesMvMax(String fieldName) { + return new MvMaxIntsFromDocValuesBlockLoader(fieldName); + } + private boolean isOutOfRange(Object value) { double doubleValue = objectToDouble(value); return doubleValue < Short.MIN_VALUE || doubleValue > 
Short.MAX_VALUE; @@ -1311,6 +1368,16 @@ BlockLoader blockLoaderFromFallbackSyntheticSource( ) { return integerBlockLoaderFromFallbackSyntheticSource(this, fieldName, nullValue, coerce, blContext); } + + @Override + BlockLoader blockLoaderFromDocValuesMvMin(String fieldName) { + return new MvMinIntsFromDocValuesBlockLoader(fieldName); + } + + @Override + BlockLoader blockLoaderFromDocValuesMvMax(String fieldName) { + return new MvMaxIntsFromDocValuesBlockLoader(fieldName); + } }, LONG("long", NumericType.LONG) { @Override @@ -1497,6 +1564,16 @@ public Builder builder(BlockFactory factory, int expectedCount) { }; } + @Override + BlockLoader blockLoaderFromDocValuesMvMin(String fieldName) { + return new MvMinLongsFromDocValuesBlockLoader(fieldName); + } + + @Override + BlockLoader blockLoaderFromDocValuesMvMax(String fieldName) { + return new MvMaxLongsFromDocValuesBlockLoader(fieldName); + } + private boolean isOutOfRange(Object value) { if (value instanceof Long) { return false; @@ -1766,6 +1843,10 @@ abstract BlockLoader blockLoaderFromFallbackSyntheticSource( MappedFieldType.BlockLoaderContext blContext ); + abstract BlockLoader blockLoaderFromDocValuesMvMin(String fieldName); + + abstract BlockLoader blockLoaderFromDocValuesMvMax(String fieldName); + // All values that fit into integer are returned as integers private static BlockLoader integerBlockLoaderFromFallbackSyntheticSource( NumberType type, @@ -2021,7 +2102,15 @@ public Function+ * Aggregations running on a single "stream" of {@link Block}s should run in + * {@link #SINGLE} mode. This works for aggs that come after a + * {@link TopNOperator} or another agg. + *
+ *+ * But all other aggregations run distributed. On many threads on each data node + * we run in {@link #INITIAL} mode to consume raw data and output just enough to + * finish the job later. All threads on a node dump the data into the same agg + * running in {@link #INTERMEDIATE} mode to perform "node reduction". Then, on the + * coordinating node, the outputs of the "node reduction" go into the agg in + * {@link #FINAL} mode. + *
+ *+ * Put another way, all data must flow through aggregations in one of these two sequences: + *
+ *+ * These functions appear only in special places in the language that expect to take many inputs + * and produce one output per group key: + *
+ *+ * They always process many input rows to produce their values. If they are built + * without a {@code BY} they produce a single value as output. If they are built + * with a {@code BY} they produce one value per group key as output. + *
+ *+ * See {@link org.elasticsearch.compute.aggregation.AggregatorMode} for important + * information about their execution lifecycle. + *
*/ public abstract class AggregateFunction extends Function implements PostAnalysisPlanVerificationAware { public static final Literal NO_WINDOW = Literal.timeDuration(Source.EMPTY, Duration.ZERO); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java index 9325c3fec9032..53d5039da5e1e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/blockloader/BlockLoaderExpression.java @@ -9,21 +9,27 @@ import org.elasticsearch.compute.data.Block; import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.mapper.BlockLoader; +import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig; +import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMaxBytesRefsFromOrdsBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMaxLongsFromDocValuesBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.fn.Utf8CodePointsFromOrdsBlockLoader; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; +import org.elasticsearch.xpack.esql.expression.function.scalar.EsqlScalarFunction; import org.elasticsearch.xpack.esql.stats.SearchStats; /** * {@link Expression} that can be "pushed" into value loading. Most of the time * we load values into {@link Block}s and then run the expressions on them, but * sometimes it's worth short-circuiting this process and running the expression - * in the tight loop we use for loading: + * in the tight loop we use for loading values. 
*+ * See the docs for {@link EsqlScalarFunction} for how this optimization fits in with + * all the other optimizations we've implemented. + *
+ *
+ * Implement a {@link BlockLoader} for each fused code path. There's
+ * going to be a {@linkplain BlockLoader} per
+ * {@code FUNCTION x FIELD_TYPE x storage mechanism} combination.
+ * If you wanted to push all loads for a function applied + * to a field type you'd need to optimize all paths which could include: + *
+ *+ * Unless you have a good reason to do otherwise, it's generally fine to start with + * {@code doc_values}. And it might be fine to only implement this fusion + * for {@code doc_values}. Usually, loading {@code stored} fields + * and loading from {@code _source} is so slow that this optimization won't buy you + * much speed proportionally. But this is only a rule of thumb. + * The first extraction push down we implemented violates the rule! It pushed directly + * to the search index for vector fields. + *
+ *+ * Note: The {@link Object#toString}s are important in these classes. We expose them + * over the {@code profile} API and use them for tests later on. + *
+ *+ * Build a randomized unit test that + *
+ *+ * See the test for {@link Utf8CodePointsFromOrdsBlockLoader} for an example. These tests + * are usually quite parameterized to make sure we cover things like: + *
+ *+ * These unit tests cover a ton of different configurations quickly, and we + * know that we're using the loader. + *
+ *+ * You must implement: + *
+ *+ * Implement {@link BlockLoaderExpression}. Generally it's enough to + * check if the function is being applied to a {@link FieldAttribute} and do something + * like: + *
+ *{@code
+ * if (field instanceof FieldAttribute f && f.dataType() == DataType.KEYWORD) {
+ * return new PushedBlockLoaderExpression(f, BlockLoaderFunctionConfig.Function.WHATEVER);
+ * }
+ * return null;
+ * }
+ * + * The rules system will check {@link MappedFieldType#supportsBlockLoaderConfig} for you. + * See the docs for {@link #tryPushToFieldLoading} for more on how to implement it. + *
+ *+ * Add a case or two to {@code PushExpressionToLoadIT} to prove that we've plugged + * everything in properly. These tests make sure that we're really loading the data + * using your new {@linkplain BlockLoader}. This is where your nice + * {@link Object#toString}s come into play. That's the key into the profile map that + * shows that your new {@linkplain BlockLoader} is plugged in. + *
+ *+ * Look for your function in the csv-spec tests and make sure there are cases that + * contain your function processing each data type you are pushing. For each type, + * make sure the function processes the results of: + *
+ *+ * It's fairly likely we already have tests for all these cases. + * They are part of our standard practice for adding functions, but there are a lot + * of them, and we may have forgotten some. And, without the pushdown you are + * implementing, they are mostly there for healthy paranoia around rules and + * a hedge against mistakes implementing optimizations in the future. Like the + * optimization you are implementing now! + *
+ *+ * Anyway, once there are plenty of these tests you should run them via the ESQL + * unit tests and via the single-node integration tests. These tests don't prove + * that your new {@linkplain BlockLoader}s are plugged in. You have + * {@code PushExpressionToLoadIT} for that. Instead, they prove that, when your + * new {@linkplain BlockLoader} is plugged in, it produces + * correct output. So, just like your unit test, but integrated with the entire + * rest of the world. + *
+ *+ * Now that you can be pretty sure everything is plugged in and working you can + * get some performance numbers. It's generally good to start with a quick and + * dirty script. + * These should show you a performance improvement, and you can use the + * {@code profile} API as a final proof that everything is plugged in. Once that + * looks right you should generally be ok to open a PR. Attach the results of + * your bash script to prove that it's faster. + *
+ *+ * Next, look for a rally track + * that should improve with your PR. If you find one, and it's + * in the nightlies already, then you have a choice: + *
+ *+ * If the quick and dirty perf testing looked good you are probably safe waiting on + * the nightlies. You should look for them in + * benchmarks.elastic.co. + *
+ *+ * If there isn't already a rally operation then you should add one like this + * PR. How you add + * one of these and how you get it into the nightlies and whether it should be in + * the nightlies is outside the scope of this document. + *
*/ public interface BlockLoaderExpression { /** @@ -66,5 +250,9 @@ public interface BlockLoaderExpression { * @param field the field whose load we're fusing into * @param config the expression's configuration */ - record PushedBlockLoaderExpression(FieldAttribute field, BlockLoaderFunctionConfig config) {} + record PushedBlockLoaderExpression(FieldAttribute field, BlockLoaderFunctionConfig config) { + public PushedBlockLoaderExpression(FieldAttribute field, BlockLoaderFunctionConfig.Function function) { + this(field, new BlockLoaderFunctionConfig.JustFunction(function)); + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/EsqlScalarFunction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/EsqlScalarFunction.java index 85d15f82f458a..fd76147a899cd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/EsqlScalarFunction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/EsqlScalarFunction.java @@ -7,23 +7,306 @@ package org.elasticsearch.xpack.esql.expression.function.scalar; +import org.elasticsearch.common.time.DateFormatter; +import org.elasticsearch.compute.ann.ConvertEvaluator; +import org.elasticsearch.compute.ann.Evaluator; +import org.elasticsearch.compute.ann.MvEvaluator; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.data.Vector; +import org.elasticsearch.compute.lucene.LuceneCountOperator; +import org.elasticsearch.compute.operator.topn.TopNOperator; +import org.elasticsearch.index.mapper.BlockLoader; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FoldContext; import org.elasticsearch.xpack.esql.core.expression.function.scalar.ScalarFunction; import org.elasticsearch.xpack.esql.core.tree.Source; import 
org.elasticsearch.xpack.esql.evaluator.mapper.EvaluatorMapper; +import org.elasticsearch.xpack.esql.expression.function.blockloader.BlockLoaderExpression; +import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushTopNToSource; import java.util.List; /** - * A {@code ScalarFunction} is a {@code Function} that takes values from some - * operation and converts each to another value. An example would be - * {@code ABS()}, which takes one value at a time, applies a function to the - * value (abs) and returns a new value. + * A {@code ScalarFunction} is a {@code Function} that makes one output value per + * input row. It operates on a whole {@link Page} of inputs at a time, building + * a {@link Block} of results. + *+ * You see them in the language everywhere: + *
+ *
+ * Let's work the example of {@code CONCAT("foo ", message)}. It's called with a Page
+ * of inputs and resolves both of its parameters, yielding a constant block containing
+ * "foo " and a Block of strings containing {@code message}. It can expect to receive
+ * thousands of {@code message} values in that block. Then it builds and returns the block
+ * {@code "foo " + message} with one value per input row:
{@code
+ * foo | message | result
+ * --- | ------- | ----------
+ * foo | bar | foo bar
+ * foo | longer | foo longer
+ * ... a thousand rows ...
+ * foo | baz | foo baz
+ * }
+ * + * It does this once per input Page. + *
** We have a guide for writing these in the javadoc for * {@link org.elasticsearch.xpack.esql.expression.function.scalar}. *
+ *+ * Scalars are a huge part of the language, and we have a ton of + * different classes of optimizations for them that exist on a performance spectrum: + *
+ *{@code
+ * Better Load Less and
+ * than O(rows) Run Faster Run Faster Page-at-a-time Tuple-at-a-time
+ * |----------------|-------------------------|------------------------------|-------------------|
+ * ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
+ * CF LT ET FP BL MBL SE NO SIMD RR VD EVAL EVE CASE
+ * }
+ * {@code
+ * | EVAL a = CONCAT("some ", "words")
+ * }
+ * + * The fastest way to run a scalar, now and forever, is to run it at compile time. Turn it + * into a constant and propagate it throughout the query. This is called "constant folding" + * and all scalars, when their arguments are constants, are "folded" to a constant. + *
+ *{@code
+ * FROM index METADATA _score
+ * | WHERE title:"cat"
+ * | SORT _score DESC
+ * | LIMIT 10
+ * }
+ * {@code
+ * FROM index
+ * | EVAL distance = ST_DISTANCE(point, "POINT(12.5683 55.6761)")
+ * | SORT distance ASC
+ * | LIMIT 10
+ * }
+ * + * Fundamentally, Lucene is a tuple-at-a-time engine that flows the + * min-competitive + * sort key back into the index iteration process, allowing it to skip huge swaths of + * documents. It has quite a few optimizations that soften the blow of it being + * tuple-at-a-time, so these days "push to a lucene topn" is the fastest way you are going + * to run a scalar function. For that to work it has to be a {@code SORT} key and all the + * filters have to be pushable to lucene and lucene has to know how to run the function + * natively. See {@link PushTopNToSource}. + *
+ *{@code
+ * FROM index METADATA _score
+ * | WHERE title:"cat"
+ * | WHERE a < j + LENGTH(candy) // <--- anything un-pushable
+ * | SORT _score DESC
+ * | LIMIT 10
+ * }
+ * + * If ESQL's {@link TopNOperator} exposed the min-competitive information (see above), and + * we fed it back into the lucene query operators then we too could do better than + * {@code O(matching_rows)} for queries sorting on the results of a scalar. This is like + * the {@code LT} but without as many limitations. Lucene has a 20-year head start on us + * optimizing TopN, so we should continue to use it when we can. + * See issue. + *
+ *{@code
+ * FROM index
+ * | EVAL s = V_COSINE(dense_vector, [0, 1, 2])
+ * | SORT s desc
+ * | LIMIT 10
+ * }
+ * {@code
+ * FROM index
+ * | STATS SUM(LENGTH(message)) // Length is pushed to the BlockLoader
+ * }
+ * + * Some functions can take advantage of the on-disk structures to run very fast and should be + * "fused" into field loading using {@link BlockLoaderExpression}. Functions like {@code V_COSINE} + * can use the vector search index to compute the result. Functions like {@code MV_MIN} can + * use the {@code doc_values} encoding mechanism to save a ton of work. Functions like the + * upcoming {@code ST_SIMPLIFY} benefit from this by saving huge numbers of allocations even + * if they can't link into the {@code doc_values} format. We do this by building a + * {@link BlockLoader} for each {@code FUNCTION x FIELD_TYPE x storage mechanism} combination + * so we can get as much speed as possible. + *
+ *{@code
+ * FROM index
+ * | STATS SUM(LENGTH(message)), // All of these are pushed to a single BlockLoader
+ * SUM(SUBSTRING(message, 0, 4)),
+ * BY trail = SUBSTRING(message, 10, 3)
+ * }
+ * + * Pushing functions to a {@link BlockLoader} can involve building a ton + * of distinct {@link BlockLoader}s. Which involves a ton of code and testing and, well, work. + * But it's worth it if you are applying a single function to a field and every single cycle + * counts. Both of these cry out for a more OO-style solution where you build a "mother ship" + * {@linkplain BlockLoader} that operates on, say {@code FIELD_TYPE x storage mechanism} and + * then runs a list of {@code FUNCTION} operations. In some cases this is a bad idea, which + * is why we haven't built it yet. But in plenty of cases it's fine. And, sometimes, we should + * be fine skipping the special purpose block loader in favor of the mother ship. We'd spend + * a few more cycles on each load, but the maintenance advantage is likely worth it for some + * functions. + *
+ *+ * ESQL evaluates whole pages at once, generally walking a couple of arrays in parallel building + * a result array. This makes which bits are the "hot path" very obvious - they are the loops + * that walk these arrays. We put the "slower" stuff outside those loops: + *
+ *+ * In Elasticsearch it's normal for fields to sometimes be {@code null} or multivalued. + * There are no constraints on the schema preventing this and, as a search engine, it's + * pretty normal to model things as multivalued fields. We rarely know that a field can + * only be single-valued when we're planning a query. + *
+ *+ * It's much faster to run a scalar when we know that all of its inputs + * are single valued and non-null. So every scalar function that uses the code generation + * keyed by the {@link Evaluator}, {@link ConvertEvaluator}, and {@link MvEvaluator} + * annotations builds two paths: + *
+ *{@code
+ * FROM index
+ * | STATS MAX(foo) BY TO_UPPER(verb)
+ * }
+ * + * {@code keyword} and {@code ip} fields load their {@code byte[]} shaped values as a + * lookup table, called "ordinals" because Lucene uses that word for it. Some of our functions, + * like {@code TO_UPPER}, process the lookup table itself instead of processing each position. + * This is especially important when grouping on the field because the hashing done by the + * aggregation code also operates on the lookup table. + *
+ *{@code
+ * FROM index
+ * | STATS SUM(MV_DEDUPE(file_size))
+ * }
+ * + * Some functions can operate on multivalued fields much faster if their inputs are sorted. And + * inputs loaded from {@code doc_values} are sorted by default. Sometimes even sorted AND + * deduplicated. We store this information on each block in {@link Block.MvOrdering}. + *
+ *+ * NOTE: Functions that can take advantage of this sorting also tend to be NOOPs for + * single-valued inputs. So they benefit hugely from "Vector Dispatch". + *
+ *{@code
+ * FROM index
+ * | STATS MAX(lhs + rhs)
+ * }
+ * + * Through a combination of "Page-at-a-time evaluation" and "Vector Dispatch" we often + * end up with at least one path that can be turned into a sequence of + * SIMD + * instructions. These are about as fast as you can go and still be {@code O(matching_rows)}. + * A lot of scalars don't lend themselves perfectly to SIMD, but we make sure those + * that do can take that route. + *
+ *{@code
+ * FROM index
+ * | STATS COUNT(*) BY DATE_TRUNC(1 DAY, @timestamp)
+ * }
+ * + * Functions like {@code DATE_TRUNC} can be quite slow, especially when they are using a + * time zone. They can be much faster if they know the range of dates that they're operating on. + * And we do know that on the data node! We use that information to rewrite the possibly-slow + * {@code DATE_TRUNC} to the always fast {@code ROUND_TO}, which rounds down to fixed rounding + * points. + *
+ *+ * At the moment this is only done for {@code DATE_TRUNC} which is a very common function, + * but is technically possible for anything that could benefit from knowing the range up front. + *
+ *{@code
+ * FROM index
+ * | STATS COUNT(*) BY DATE_TRUNC(1 DAY, @timestamp)
+ * }
+ * + * If the "Range Rewrite" optimization works, we can sometimes further push the resulting + * {@code ROUND_TO} into a sequence of filters. If you are just counting + * documents then this can use the {@link LuceneCountOperator} which can count the number of + * matching documents directly from the cache, technically being faster than + * {@code O(num_hits)}, but only in ideal circumstances. If we can't push the count then it's + * still very very fast. See PR. + *
+ *{@code
+ * FROM index
+ * | EVAL ts = DATE_PARSE(SUBSTRING(message, 1, 10), date_format_from_the_index)
+ * }
+ * + * Functions like {@code DATE_PARSE} need to build something "expensive" per input row, like + * a {@link DateFormatter}. But, often, the expensive thing is constant. In the example above + * the date format comes from the index, but that's quite contrived. These functions generally + * run in the form: + *
+ *{@code
+ * FROM index
+ * | EVAL ts = DATE_PARSE(SUBSTRING(message, 1, 10), "ISO8601")
+ * }
+ * + * These generally have special case evaluators that don't construct the format for each row. + * The others are "expensive variable evaluators" and we avoid them when we can. + *
+ *{@code
+ * FROM index
+ * | EVAL f = CASE(d > 0, n / d, 0)
+ * }
+ * {@code
+ * FROM index
+ * | EVAL f = COALESCE(d, 1 / j)
+ * }
+ * + * {@code CASE} and {@code COALESCE} short circuit. In the top example above, that + * means we don't run {@code n / d} unless {@code d > 0}. That prevents us from + * emitting warnings for dividing by 0. In the second example, we don't run {@code 1 / j} + * unless {@code d} is null. In the worst case, we manage this by running row-by-row, + * which is super slow, especially because the engine was designed + * for page-at-a-time execution. + *
+ *+ * In the best case {@code COALESCE} can see that an input is either all-null or + * all-non-null. Then it never falls back to row-by-row evaluation and is quite fast. + *
+ *+ * {@code CASE} has a similar optimization: For each incoming {@link Page}, if the + * condition evaluates to a constant, then it executes the corresponding "arm" + * Page-at-a-time. Also! If the "arms" are "fast" and can't throw warnings, then + * {@code CASE} can execute "eagerly" - evaluating all three arguments and just + * plucking values back and forth. The "eager" {@code CASE} evaluator is effectively + * the same as any other page-at-a-time evaluator. + *
*/ public abstract class EsqlScalarFunction extends ScalarFunction implements EvaluatorMapper { protected EsqlScalarFunction(Source source) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvMax.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvMax.java index e25d1662f9cae..98b79ca438ad0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvMax.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvMax.java @@ -13,14 +13,18 @@ import org.elasticsearch.compute.ann.MvEvaluator; import org.elasticsearch.compute.operator.EvalOperator; import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator; +import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig; import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.expression.function.Example; import org.elasticsearch.xpack.esql.expression.function.FunctionInfo; import org.elasticsearch.xpack.esql.expression.function.Param; +import org.elasticsearch.xpack.esql.expression.function.blockloader.BlockLoaderExpression; import org.elasticsearch.xpack.esql.planner.PlannerUtils; +import org.elasticsearch.xpack.esql.stats.SearchStats; import java.io.IOException; import java.util.List; @@ -31,7 +35,7 @@ /** * Reduce a multivalued field to a single valued field containing the maximum value. 
*/ -public class MvMax extends AbstractMultivalueFunction { +public class MvMax extends AbstractMultivalueFunction implements BlockLoaderExpression { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "MvMax", MvMax::new); @FunctionInfo( @@ -129,4 +133,12 @@ static long process(long current, long v) { static int ascendingIndex(int count) { return count - 1; } + + @Override + public PushedBlockLoaderExpression tryPushToFieldLoading(SearchStats stats) { + if (field instanceof FieldAttribute f) { + return new PushedBlockLoaderExpression(f, BlockLoaderFunctionConfig.Function.MV_MAX); + } + return null; + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvMin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvMin.java index 75590d5d8b43a..791ab7c4b2301 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvMin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvMin.java @@ -13,14 +13,18 @@ import org.elasticsearch.compute.ann.MvEvaluator; import org.elasticsearch.compute.operator.EvalOperator; import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator; +import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig; import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.expression.function.Example; import org.elasticsearch.xpack.esql.expression.function.FunctionInfo; import org.elasticsearch.xpack.esql.expression.function.Param; +import 
org.elasticsearch.xpack.esql.expression.function.blockloader.BlockLoaderExpression; import org.elasticsearch.xpack.esql.planner.PlannerUtils; +import org.elasticsearch.xpack.esql.stats.SearchStats; import java.io.IOException; import java.util.List; @@ -31,7 +35,7 @@ /** * Reduce a multivalued field to a single valued field containing the minimum value. */ -public class MvMin extends AbstractMultivalueFunction { +public class MvMin extends AbstractMultivalueFunction implements BlockLoaderExpression { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "MvMin", MvMin::new); @FunctionInfo( @@ -129,4 +133,12 @@ static long process(long current, long v) { static int ascendingIndex(int count) { return 0; } + + @Override + public PushedBlockLoaderExpression tryPushToFieldLoading(SearchStats stats) { + if (field instanceof FieldAttribute f) { + return new PushedBlockLoaderExpression(f, BlockLoaderFunctionConfig.Function.MV_MIN); + } + return null; + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/package-info.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/package-info.java new file mode 100644 index 0000000000000..2ce6017991bbf --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/package-info.java @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +/** + * {@link org.elasticsearch.xpack.esql.core.expression.Expression Expressions} process values + * to make more values. There are two kinds: + *