Merged
32 commits
5a06fcc
REST tests for percentiles_bucket agg
nik9000 Jun 24, 2022
bc37bb1
ESQL: Fuse MV_MIN and MV_MAX into field loading
nik9000 Nov 10, 2025
6581d4e
Merge branch 'main' into esql_fuse_next
nik9000 Nov 11, 2025
d485863
explain
nik9000 Nov 11, 2025
9e677d3
Merge branch 'main' into esql_fuse_next
nik9000 Nov 12, 2025
2ff3ca3
More
nik9000 Nov 12, 2025
3c46443
Instructions for BlockLoader
nik9000 Nov 13, 2025
f7517f4
Docs
nik9000 Nov 13, 2025
cd46d68
Update docs/changelog/138029.yaml
nik9000 Nov 13, 2025
3e1ea1b
Merge branch 'main' into esql_fuse_next
nik9000 Nov 17, 2025
4962474
words
nik9000 Nov 18, 2025
c32fe1a
Merge branch 'main' into esql_fuse_next
nik9000 Nov 18, 2025
6b9962b
Fix merge
nik9000 Nov 19, 2025
e80d609
Merge branch 'main' into esql_fuse_next
nik9000 Nov 20, 2025
1b65487
Merge branch 'main' into esql_fuse_next
nik9000 Nov 21, 2025
91bcf0b
Merge branch 'main' into esql_fuse_next
nik9000 Nov 24, 2025
5273bdb
More tests
nik9000 Nov 24, 2025
ad33027
Merge branch 'main' into esql_fuse_next
nik9000 Nov 24, 2025
a2c7425
Merge remote-tracking branch 'nik9000/esql_fuse_next' into esql_fuse_…
nik9000 Nov 24, 2025
741f80a
Merge branch 'main' into esql_fuse_next
nik9000 Nov 24, 2025
7e78824
Merge branch 'main' into esql_fuse_next
nik9000 Nov 25, 2025
3806255
Merge branch 'main' into esql_fuse_next
nik9000 Nov 25, 2025
94cb7ca
Merge remote-tracking branch 'nik9000' into esql_fuse_next
nik9000 Nov 25, 2025
36a4c3a
Merge remote-tracking branch 'nik9000/esql_fuse_next' into esql_fuse_…
nik9000 Nov 25, 2025
5b80d78
Remove extra
nik9000 Nov 25, 2025
bd86626
Merge branch 'main' into esql_fuse_next
nik9000 Nov 25, 2025
a61e7a7
fixup
nik9000 Nov 25, 2025
bdcf316
Merge branch 'main' into esql_fuse_next
nik9000 Nov 25, 2025
c4008e8
Merge branch 'main' into esql_fuse_next
nik9000 Nov 25, 2025
979c072
Merge branch 'main' into esql_fuse_next
nik9000 Nov 25, 2025
57a2730
Merge remote-tracking branch 'nik9000/esql_fuse_next' into esql_fuse_…
nik9000 Nov 25, 2025
35e7cdc
Undo accident
nik9000 Nov 25, 2025
5 changes: 5 additions & 0 deletions docs/changelog/138029.yaml
@@ -0,0 +1,5 @@
pr: 138029
summary: Fuse MV_MIN and MV_MAX and document process
area: ES|QL
type: feature
issues: []
3 changes: 3 additions & 0 deletions muted-tests.yml
@@ -438,6 +438,9 @@ tests:
- class: org.elasticsearch.xpack.esql.heap_attack.HeapAttackLookupJoinIT
method: testLookupExplosionBigString
issue: https://github.com/elastic/elasticsearch/issues/138510
- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeForkIT
Member
Is this intended to be muted?

Member Author
Yes. Sorry. Had a pending comment explaining it that I hadn't posted.

Member Author
Those queries are now "optimized incorrectly". I've got one fix for the rule in #138544, so I'm going to get that and this in and then work on that query. I'm calling this mute another thing that blocks the release of the field fusion work.

method: test {csv-spec:inlinestats.MvMinMvExpand}
issue: https://github.com/elastic/elasticsearch/issues/137679
Member Author
This is tracked in the meta-issue blocking deployment of pushing functions to field loads: #137679

- class: org.elasticsearch.xpack.esql.optimizer.rules.physical.local.SubstituteRoundToTests
method: testSubqueryWithCountStarAndDateTrunc {default}
issue: https://github.com/elastic/elasticsearch/issues/138601
162 changes: 158 additions & 4 deletions server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java
@@ -17,6 +17,8 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.index.mapper.blockloader.docvalues.BlockDocValuesReader;
import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMaxLongsFromDocValuesBlockLoader;
import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMinLongsFromDocValuesBlockLoader;
import org.elasticsearch.search.fetch.StoredFieldsSpec;
import org.elasticsearch.search.lookup.Source;

@@ -25,8 +27,139 @@
import java.util.Map;

/**
- * Interface for loading data in a block shape. Instances of this class
- * must be immutable and thread safe.
* Loads values from a chunk of lucene documents into a "Block" for the compute engine.
* <p>
Member
My past self would have loved this. Thanks for the comments, will help my future self when my present self forgets about this.

* Think of a Block as an array of values for a sequence of lucene documents. That's
* almost true! For the purposes of implementing {@link BlockLoader}, it's close enough.
* The compute engine operates on arrays because the good folks that build CPUs have
* spent the past 40 years making them really really good at running tight loops over
* arrays of data. So we play along with the CPU and make arrays.
* </p>
* <h2>How to implement</h2>
* <p>
* There are a lot of interesting choices hiding in here to make getting those arrays
* out of lucene work well:
* </p>
* <ul>
* <li>
* {@code doc_values} are already on disk in array-like structures so we prefer
* to just copy them into an array in one loop inside {@link ColumnAtATimeReader}.
* Well, not entirely array-like. {@code doc_values} are designed to be read in
* non-descending order (think {@code 0, 1, 1, 4, 9}) and will fail if they are
* read truly randomly. This lets the doc values implementations have some
* chunking/compression/magic on top of the array-like on disk structure. The
* caller manages this, always putting {@link Docs} in non-descending order.
* Extend {@link BlockDocValuesReader} to implement all this.
* </li>
* <li>
* All {@code stored} fields for each document are stored on disk together,
* compressed with a general purpose compression algorithm like
* <a href="https://en.wikipedia.org/wiki/Zstd">Zstd</a>. Blocks of documents are
* compressed together to get a better compression ratio. Just like doc values,
* we read them in non-descending order. Unlike doc values, we read all fields for a
* document at once because reading one requires decompressing them all. We do
* this by returning {@code null} from {@link BlockLoader#columnAtATimeReader}
* to signal that we can't load the whole column at once. Instead, we implement a
* {@link RowStrideReader} which the caller will call once for each doc. Extend
* {@link BlockStoredFieldsReader} to implement all this.
* </li>
* <li>
* Fields loaded from {@code _source} are an extra special case of {@code stored}
* fields. {@code _source} itself is just another stored field, compressed in chunks
* with all the other stored fields. It's the original bytes sent when indexing the
* document. Think {@code json} or {@code yaml}. When we need fields from
* {@code _source} we get it from the stored fields reader infrastructure and then
* explode it into a {@link Map} representing the original {@code json} and
* the {@link RowStrideReader} implementation grabs the parts of the {@code json}
* it needs. Extend {@link BlockSourceReader} to implement all this.
* </li>
* <li>
* Synthetic {@code _source} complicates this further by storing fields in somewhat
* unexpected places, but is otherwise like a {@code stored} field reader. Use
* {@link FallbackSyntheticSourceBlockLoader} to implement all this.
* </li>
* </ul>
* <h2>How many to implement</h2>
* <p>
* Generally reads are faster from {@code doc_values}, slower from {@code stored} fields,
* and even slower from {@code _source}. If we get to choose, we pick {@code doc_values}.
* But we work with what's on disk and that's a product of the field type and what the user's
* configured. Picking the optimal choice given what's on disk is the responsibility of each
* field's {@link MappedFieldType#blockLoader} method. The more configurable the field's
* storage strategies the more {@link BlockLoader}s you have to implement to integrate it
* with ESQL. It can get to be a lot. Sorry.
* </p>
* <p>
* For a field to be supported by ESQL fully it has to be loadable if it was configured to be
* stored in any way. It's possible to turn off storage entirely by turning off
* {@code doc_values} and {@code _source} and {@code stored} fields. In that case, it's
* acceptable to return {@link ConstantNullsReader}. User turned the field off, best we can do
* is {@code null}.
* </p>
* <p>
* We also sometimes want to "push" executing some ESQL functions into the block loader itself.
* Usually we do this when it's a ton faster. See the docs for {@code BlockLoaderExpression}
* for why and how we do this.
* </p>
* <p>
* For example, {@code long} fields implement these block loaders:
* </p>
* <ul>
* <li>
* {@link org.elasticsearch.index.mapper.blockloader.docvalues.LongsBlockLoader} to read
* from {@code doc_values}.
* </li>
* <li>
* {@link org.elasticsearch.index.mapper.BlockSourceReader.LongsBlockLoader} to read from
* {@code _source}.
* </li>
* <li>
* A specially configured {@link FallbackSyntheticSourceBlockLoader} to read synthetic
* {@code _source}.
* </li>
* <li>
* {@link MvMinLongsFromDocValuesBlockLoader} to read {@code MV_MIN(long_field)} from
* {@code doc_values}.
* </li>
* <li>
* {@link MvMaxLongsFromDocValuesBlockLoader} to read {@code MV_MAX(long_field)} from
* {@code doc_values}.
* </li>
* </ul>
* <p>
* NOTE: We can't read {@code long}s from {@code stored} fields which is a
* <a href="https://github.com/elastic/elasticsearch/issues/138019">bug</a>, but maybe not
* a terrible one because it's very uncommon to configure {@code long} to be {@code stored}
* but to disable {@code _source} and {@code doc_values}. Nothing's perfect. Especially
* code.
* </p>
* <h2>Why is {@link AllReader}?</h2>
* <p>
* When we described how to read from {@code doc_values} we said we <strong>prefer</strong>
* to use {@link ColumnAtATimeReader}. But some callers don't support reading column-at-a-time
* and need to read row-by-row. So we also need an implementation of {@link RowStrideReader}
* that reads from {@code doc_values}. Usually it's most convenient to implement both of those
* in the same {@code class}. {@link AllReader} is an interface for those sorts of classes, and
* you'll see it in the {@code doc_values} code frequently.
* </p>
* <h2>Why is {@link #rowStrideStoredFieldSpec}?</h2>
* <p>
* When decompressing {@code stored} fields Lucene can skip stored fields that aren't used. They
* still have to be decompressed, but they aren't turned into java objects which saves a fair bit
* of work. If you don't need any stored fields return {@link StoredFieldsSpec#NO_REQUIREMENTS}.
* Otherwise, return what you need.
* </p>
* <h2>Thread safety</h2>
* <p>
* Instances of this class must be immutable and thread safe. Instances of
* {@link ColumnAtATimeReader} and {@link RowStrideReader} are all mutable and can only
* be accessed by one thread at a time but <strong>may</strong> be passed between threads.
* See implementations of {@link Reader#canReuse} for how that's handled. "Normal" java objects
* don't need to do anything special to be kicked from thread to thread - the transfer itself
* establishes a {@code happens-before} relationship that makes everything you need visible.
* But Lucene's readers aren't "normal" java objects and sometimes need to be rebuilt if we
* shift threads.
* </p>
*/
public interface BlockLoader {
/**
@@ -115,10 +248,26 @@ interface StoredFields {
Map<String, List<Object>> storedFields() throws IOException;
}

/**
* Build a column-at-a-time reader. <strong>May</strong> return {@code null}
* if the underlying storage needs to be loaded row-by-row. Callers should try
* this first, only falling back to {@link #rowStrideReader} if this returns
* {@code null} or if they can't load column-at-a-time themselves.
*/
@Nullable
ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws IOException;

/**
* Build a row-by-row reader. Must <strong>never</strong> return {@code null},
* even if the underlying storage prefers to be loaded column-at-a-time. Some
* callers simply can't load column-at-a-time so all implementations must support
* this method.
*/
RowStrideReader rowStrideReader(LeafReaderContext context) throws IOException;

/**
* What {@code stored} fields are needed by this reader.
*/
StoredFieldsSpec rowStrideStoredFieldSpec();

/**
@@ -540,8 +689,13 @@ Block buildExponentialHistogramBlockDirect(
}

/**
- * Marker interface for block results. The compute engine has a fleshed
- * out implementation.
* A columnar representation of homogeneous data. It has a position (row) count, and
* various data retrieval methods for accessing the underlying data that is stored at a given
* position. In other words, a fancy wrapper over an array.
* <p>
* <strong>This</strong> is just a marker interface for these results. The compute engine
* has fleshed out implementations.
* </p>
*/
interface Block extends Releasable {}

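The reader contract documented above (try `columnAtATimeReader` first, fall back to `rowStrideReader` when it returns `null`) can be sketched roughly as follows. This is a minimal illustration, not the Elasticsearch API: `ColumnReader`, `RowReader`, `Loader`, `docValuesLike` and `storedLike` are hypothetical stand-ins, and plain `long[]` arrays stand in for Lucene storage and for Blocks.

```java
import java.util.Arrays;

/** Hypothetical, simplified stand-ins for the reader contracts; not the Elasticsearch API. */
final class ReaderFallbackSketch {
    /** Reads values for many docs in one call, like a column-at-a-time reader. */
    interface ColumnReader {
        long[] read(int[] docs);
    }

    /** Reads the value for a single doc, like a row-stride reader. */
    interface RowReader {
        long read(int doc);
    }

    /** A loader that may lack a column reader but always has a row reader. */
    interface Loader {
        ColumnReader columnReader(); // may return null

        RowReader rowReader(); // never returns null
    }

    /** Try column-at-a-time first; fall back to row-by-row when it is unavailable. */
    static long[] load(Loader loader, int[] docs) {
        // The caller is responsible for handing over doc ids in non-descending order.
        for (int i = 1; i < docs.length; i++) {
            if (docs[i] < docs[i - 1]) {
                throw new IllegalArgumentException("docs must be in non-descending order");
            }
        }
        ColumnReader column = loader.columnReader();
        if (column != null) {
            return column.read(docs);
        }
        RowReader row = loader.rowReader();
        long[] out = new long[docs.length];
        for (int i = 0; i < docs.length; i++) {
            out[i] = row.read(docs[i]);
        }
        return out;
    }

    /** Loader that supports both access patterns, like a doc_values backed field. */
    static Loader docValuesLike(long[] values) {
        return new Loader() {
            public ColumnReader columnReader() {
                return docs -> Arrays.stream(docs).mapToLong(d -> values[d]).toArray();
            }

            public RowReader rowReader() {
                return doc -> values[doc];
            }
        };
    }

    /** Loader that can only go row-by-row, like a stored-fields backed field. */
    static Loader storedLike(long[] values) {
        return new Loader() {
            public ColumnReader columnReader() {
                return null; // signals "load me row-by-row"
            }

            public RowReader rowReader() {
                return doc -> values[doc];
            }
        };
    }

    public static void main(String[] args) {
        long[] values = { 10, 20, 30, 40, 50 };
        int[] docs = { 0, 1, 1, 4 };
        System.out.println(Arrays.toString(load(docValuesLike(values), docs)));
        System.out.println(Arrays.toString(load(storedLike(values), docs)));
    }
}
```

Both loaders produce the same values for the same docs; only the access pattern differs, which is why every implementation has to support the row-by-row path.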
@@ -33,7 +33,10 @@
import org.elasticsearch.index.fielddata.FieldDataContext;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.plain.SortedSetOrdinalsIndexFieldData;
import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig;
import org.elasticsearch.index.mapper.blockloader.docvalues.BytesRefsFromOrdsBlockLoader;
import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMaxBytesRefsFromOrdsBlockLoader;
import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMinBytesRefsFromOrdsBlockLoader;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.script.IpFieldScript;
import org.elasticsearch.script.Script;
@@ -457,7 +460,15 @@ public static Query rangeQuery(
@Override
public BlockLoader blockLoader(BlockLoaderContext blContext) {
if (hasDocValues() && (blContext.fieldExtractPreference() != FieldExtractPreference.STORED || isSyntheticSource)) {
- return new BytesRefsFromOrdsBlockLoader(name());
BlockLoaderFunctionConfig cfg = blContext.blockLoaderFunctionConfig();
if (cfg == null) {
return new BytesRefsFromOrdsBlockLoader(name());
}
return switch (cfg.function()) {
case MV_MAX -> new MvMaxBytesRefsFromOrdsBlockLoader(name());
case MV_MIN -> new MvMinBytesRefsFromOrdsBlockLoader(name());
default -> throw new UnsupportedOperationException("unknown fusion config [" + cfg.function() + "]");
};
}

if (isStored()) {
@@ -475,6 +486,17 @@ public BlockLoader blockLoader(BlockLoaderContext blContext) {
return new BlockSourceReader.IpsBlockLoader(sourceValueFetcher(blContext), lookup);
}

@Override
public boolean supportsBlockLoaderConfig(BlockLoaderFunctionConfig config, FieldExtractPreference preference) {
if (hasDocValues() && (preference != FieldExtractPreference.STORED || isSyntheticSource)) {
return switch (config.function()) {
case MV_MAX, MV_MIN -> true;
default -> false;
};
}
return true;
}

private BlockLoader blockLoaderFromFallbackSyntheticSource(BlockLoaderContext blContext) {
var reader = new IpFallbackSyntheticSourceReader(nullValue);
return new FallbackSyntheticSourceBlockLoader(
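For readers wondering why pushing `MV_MIN` and `MV_MAX` into the loader can pay off, here is a rough sketch of the idea. The actual `MvMin…`/`MvMax…` loaders are not shown on this page, so this is an assumption about the general technique rather than their implementation: Lucene's multivalued doc values hand back each document's values in ascending order, so the minimum is the first value and the maximum is the last, and the full multivalued block never has to be built. Plain `long[][]` arrays stand in for doc values, and every name below is hypothetical.

```java
/** Illustration only: plain arrays stand in for doc values; not the loaders added in this PR. */
final class MvMinMaxFusionSketch {
    /** Unfused shape: materialize all of a doc's values, then reduce them. */
    static Long[] mvMinUnfused(long[][] valuesPerDoc) {
        Long[] out = new Long[valuesPerDoc.length];
        for (int doc = 0; doc < valuesPerDoc.length; doc++) {
            long[] values = valuesPerDoc[doc];
            if (values == null || values.length == 0) {
                continue; // no values for this doc, result stays null
            }
            long[] copy = values.clone(); // stands in for building the multivalued block
            long min = copy[0];
            for (long v : copy) {
                min = Math.min(min, v);
            }
            out[doc] = min;
        }
        return out;
    }

    /** Fused MV_MIN: values are sorted ascending, so the first one is the minimum. */
    static Long[] mvMinFused(long[][] sortedValuesPerDoc) {
        Long[] out = new Long[sortedValuesPerDoc.length];
        for (int doc = 0; doc < sortedValuesPerDoc.length; doc++) {
            long[] values = sortedValuesPerDoc[doc];
            if (values != null && values.length > 0) {
                out[doc] = values[0];
            }
        }
        return out;
    }

    /** Fused MV_MAX: values are sorted ascending, so the last one is the maximum. */
    static Long[] mvMaxFused(long[][] sortedValuesPerDoc) {
        Long[] out = new Long[sortedValuesPerDoc.length];
        for (int doc = 0; doc < sortedValuesPerDoc.length; doc++) {
            long[] values = sortedValuesPerDoc[doc];
            if (values != null && values.length > 0) {
                out[doc] = values[values.length - 1];
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Four docs: three values, missing, one value, empty.
        long[][] docs = { { 1, 5, 9 }, null, { 7 }, {} };
        System.out.println(java.util.Arrays.toString(mvMinFused(docs)));
        System.out.println(java.util.Arrays.toString(mvMaxFused(docs)));
    }
}
```

The fused variants touch one value per document instead of all of them, which is the kind of saving that makes pushing a function into the field load worthwhile.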
@@ -61,6 +61,8 @@
import org.elasticsearch.index.fielddata.plain.SortedSetOrdinalsIndexFieldData;
import org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig;
import org.elasticsearch.index.mapper.blockloader.docvalues.BytesRefsFromOrdsBlockLoader;
import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMaxBytesRefsFromOrdsBlockLoader;
import org.elasticsearch.index.mapper.blockloader.docvalues.fn.MvMinBytesRefsFromOrdsBlockLoader;
import org.elasticsearch.index.mapper.blockloader.docvalues.fn.Utf8CodePointsFromOrdsBlockLoader;
import org.elasticsearch.index.query.AutomatonQueryWithDescription;
import org.elasticsearch.index.query.SearchExecutionContext;
@@ -732,10 +734,13 @@ public BlockLoader blockLoader(BlockLoaderContext blContext) {
if (cfg == null) {
return new BytesRefsFromOrdsBlockLoader(name());
}
- if (cfg.function() == BlockLoaderFunctionConfig.Function.LENGTH) {
- return new Utf8CodePointsFromOrdsBlockLoader(((BlockLoaderFunctionConfig.JustWarnings) cfg).warnings(), name());
- }
- throw new UnsupportedOperationException("unknown fusion config [" + cfg.function() + "]");
return switch (cfg.function()) {
case LENGTH -> new Utf8CodePointsFromOrdsBlockLoader(((BlockLoaderFunctionConfig.JustWarnings) cfg).warnings(), name());
case MV_MAX -> new MvMaxBytesRefsFromOrdsBlockLoader(name());
case MV_MIN -> new MvMinBytesRefsFromOrdsBlockLoader(name());
default -> throw new UnsupportedOperationException("unknown fusion config [" + cfg.function() + "]");
};

}
if (blContext.blockLoaderFunctionConfig() != null) {
throw new UnsupportedOperationException("function fusing only supported for doc values");
@@ -765,7 +770,10 @@ public Builder builder(BlockFactory factory, int expectedCount) {
@Override
public boolean supportsBlockLoaderConfig(BlockLoaderFunctionConfig config, FieldExtractPreference preference) {
if (hasDocValues() && (preference != FieldExtractPreference.STORED || isSyntheticSourceEnabled())) {
- return config.function() == BlockLoaderFunctionConfig.Function.LENGTH;
return switch (config.function()) {
case LENGTH, MV_MAX, MV_MIN -> true;
default -> false;
};
}
return false;
}
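The pairing of `supportsBlockLoaderConfig` with the `switch` in `blockLoader` suggests a check-then-request flow. The caller side is not part of this diff, so the sketch below is only an assumption about how such a pairing might be used: ask the field whether it can fuse a function, request the fused loader only if it can, and otherwise load the plain values and evaluate the function afterwards. `Field`, `plan`, the string "loaders" and the `UNFUSABLE` constant are all hypothetical.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical sketch of a check-then-request flow; not the ESQL planner. */
final class FusionDispatchSketch {
    /** Hypothetical function set; UNFUSABLE stands in for anything a field cannot fuse. */
    enum Function {
        LENGTH, MV_MAX, MV_MIN, UNFUSABLE
    }

    /** Hypothetical stand-in for a mapped field type. */
    static final class Field {
        final boolean hasDocValues;
        final Set<Function> fusable;

        Field(boolean hasDocValues, Set<Function> fusable) {
            this.hasDocValues = hasDocValues;
            this.fusable = fusable;
        }

        /** Mirrors the shape of the keyword hunk: fusing needs doc values and a known function. */
        boolean supports(Function fn) {
            return hasDocValues && fusable.contains(fn);
        }

        /** Returns a label for the loader that would be built; null means "no fused function". */
        String loader(Function fn) {
            if (fn == null) {
                return "plain";
            }
            if (supports(fn) == false) {
                throw new UnsupportedOperationException("unknown fusion config [" + fn + "]");
            }
            return "fused:" + fn;
        }
    }

    /** Only request a fused loader after the field says it supports the function. */
    static String plan(Field field, Function fn) {
        if (field.supports(fn)) {
            return field.loader(fn);
        }
        return field.loader(null) + " then " + fn;
    }

    public static void main(String[] args) {
        Field keywordLike = new Field(true, EnumSet.of(Function.LENGTH, Function.MV_MAX, Function.MV_MIN));
        System.out.println(plan(keywordLike, Function.MV_MIN));
        System.out.println(plan(keywordLike, Function.UNFUSABLE));
    }
}
```

Keeping the two methods in agreement matters: if `supports` says yes for a function the `switch` does not handle, the request ends in the `UnsupportedOperationException` branch.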