diff --git a/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java b/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java index dc018cc368363..61e17c8ebf9b5 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java @@ -535,6 +535,8 @@ Block buildExponentialHistogramBlockDirect( Block zeroThresholds, Block encodedHistograms ); + + Block buildTDigestBlockDirect(Block encodedDigests, Block minima, Block maxima, Block sums, Block valueCounts); } /** @@ -697,4 +699,16 @@ interface ExponentialHistogramBuilder extends Builder { BytesRefBuilder encodedHistograms(); } + + interface TDigestBuilder extends Builder { + DoubleBuilder minima(); + + DoubleBuilder maxima(); + + DoubleBuilder sums(); + + LongBuilder valueCounts(); + + BytesRefBuilder encodedDigests(); + } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/TDigestState.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/TDigestState.java index 38aea6a6d4eb4..cfaa712648d2e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/TDigestState.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/TDigestState.java @@ -84,7 +84,7 @@ public static TDigestState create(CircuitBreaker breaker, double compression) { } } - static TDigestState createOfType(CircuitBreaker breaker, Type type, double compression) { + public static TDigestState createOfType(CircuitBreaker breaker, Type type, double compression) { breaker.addEstimateBytesAndMaybeBreak(SHALLOW_SIZE, "tdigest-state-create-with-type"); try { return new TDigestState(breaker, type, compression); diff --git a/test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java b/test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java index 0a4e6812ac2b5..2f613b1fbda10 100644 --- 
a/test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java +++ b/test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java @@ -517,6 +517,42 @@ public BlockLoader.Block buildExponentialHistogramBlockDirect( encodedHistograms ); } + + @Override + public BlockLoader.Block buildTDigestBlockDirect( + BlockLoader.Block encodedDigests, + BlockLoader.Block minima, + BlockLoader.Block maxima, + BlockLoader.Block sums, + BlockLoader.Block valueCounts + ) { + TestBlock minBlock = (TestBlock) minima; + TestBlock maxBlock = (TestBlock) maxima; + TestBlock sumBlock = (TestBlock) sums; + TestBlock countBlock = (TestBlock) valueCounts; + TestBlock digestBlock = (TestBlock) encodedDigests; + + assert minBlock.size() == digestBlock.size(); + assert maxBlock.size() == digestBlock.size(); + assert sumBlock.size() == digestBlock.size(); + assert countBlock.size() == digestBlock.size(); + + var values = new ArrayList<>(minBlock.size()); + + for (int i = 0; i < minBlock.size(); i++) { + // we need to represent this complex block somehow + HashMap value = new HashMap<>(); + value.put("min", minBlock.values.get(i)); + value.put("max", maxBlock.values.get(i)); + value.put("sum", sumBlock.values.get(i)); + value.put("value_count", countBlock.values.get(i)); + value.put("encoded_digest", digestBlock.values.get(i)); + + values.add(value); + } + + return new TestBlock(values); + } }; } diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/TDigestBlockLoader.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/TDigestBlockLoader.java new file mode 100644 index 0000000000000..c4123de75963b --- /dev/null +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/TDigestBlockLoader.java @@ -0,0 +1,128 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.analytics.mapper; + +import org.apache.lucene.index.LeafReaderContext; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.index.mapper.blockloader.docvalues.BlockDocValuesReader; +import org.elasticsearch.index.mapper.blockloader.docvalues.BytesRefsFromBinaryBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.DoublesBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.LongsBlockLoader; + +import java.io.IOException; + +public class TDigestBlockLoader extends BlockDocValuesReader.DocValuesBlockLoader { + private final DoublesBlockLoader minimaLoader; + private final DoublesBlockLoader maximaLoader; + private final DoublesBlockLoader sumsLoader; + private final LongsBlockLoader valueCountsLoader; + private final BytesRefsFromBinaryBlockLoader encodedDigestLoader; + + public TDigestBlockLoader( + BytesRefsFromBinaryBlockLoader encodedDigestLoader, + DoublesBlockLoader minimaLoader, + DoublesBlockLoader maximaLoader, + DoublesBlockLoader sumsLoader, + LongsBlockLoader valueCountsLoader + ) { + this.encodedDigestLoader = encodedDigestLoader; + this.minimaLoader = minimaLoader; + this.maximaLoader = maximaLoader; + this.sumsLoader = sumsLoader; + this.valueCountsLoader = valueCountsLoader; + } + + @Override + public AllReader reader(LeafReaderContext context) throws IOException { + AllReader encodedDigestReader = encodedDigestLoader.reader(context); + AllReader minimaReader = minimaLoader.reader(context); + AllReader maximaReader = maximaLoader.reader(context); + AllReader sumsReader = sumsLoader.reader(context); + AllReader valueCountsReader = valueCountsLoader.reader(context); + + return new TDigestReader(encodedDigestReader, minimaReader, maximaReader, sumsReader, valueCountsReader); + } + + @Override + public Builder builder(BlockFactory 
factory, int expectedCount) { + return null; + } + + static class TDigestReader implements AllReader { + + private final AllReader encodedDigestReader; + private final AllReader minimaReader; + private final AllReader maximaReader; + private final AllReader sumsReader; + private final AllReader valueCountsReader; + + TDigestReader( + AllReader encodedDigestReader, + AllReader minimaReader, + AllReader maximaReader, + AllReader sumsReader, + AllReader valueCountsReader + ) { + this.encodedDigestReader = encodedDigestReader; + this.minimaReader = minimaReader; + this.maximaReader = maximaReader; + this.sumsReader = sumsReader; + this.valueCountsReader = valueCountsReader; + } + + @Override + public boolean canReuse(int startingDocID) { + return minimaReader.canReuse(startingDocID) + && maximaReader.canReuse(startingDocID) + && sumsReader.canReuse(startingDocID) + && valueCountsReader.canReuse(startingDocID) + && encodedDigestReader.canReuse(startingDocID); + } + + @Override + // Column oriented reader + public Block read(BlockFactory factory, Docs docs, int offset, boolean nullsFiltered) throws IOException { + Block minima = null; + Block maxima = null; + Block sums = null; + Block valueCounts = null; + Block encodedBytes = null; + Block result; + boolean success = false; + try { + minima = minimaReader.read(factory, docs, offset, nullsFiltered); + maxima = maximaReader.read(factory, docs, offset, nullsFiltered); + sums = sumsReader.read(factory, docs, offset, nullsFiltered); + valueCounts = valueCountsReader.read(factory, docs, offset, nullsFiltered); + encodedBytes = encodedDigestReader.read(factory, docs, offset, nullsFiltered); + result = factory.buildTDigestBlockDirect(encodedBytes, minima, maxima, sums, valueCounts); + success = true; + } finally { + if (success == false) { + Releasables.close(minima, maxima, sums, valueCounts, encodedBytes); + } + } + return result; + } + + @Override + public void read(int docId, StoredFields storedFields, Builder builder) 
throws IOException { + ExponentialHistogramBuilder histogramBuilder = (ExponentialHistogramBuilder) builder; + minimaReader.read(docId, storedFields, histogramBuilder.minima()); + maximaReader.read(docId, storedFields, histogramBuilder.maxima()); + sumsReader.read(docId, storedFields, histogramBuilder.sums()); + valueCountsReader.read(docId, storedFields, histogramBuilder.valueCounts()); + encodedDigestReader.read(docId, storedFields, histogramBuilder.encodedHistograms()); + } + + @Override + public String toString() { + return "BlockDocValuesReader.TDigest"; + } + } +} diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/TDigestFieldMapper.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/TDigestFieldMapper.java index a6de1a5897541..2f331ad93d3cf 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/TDigestFieldMapper.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/TDigestFieldMapper.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.FeatureFlag; +import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.index.fielddata.FieldDataContext; import org.elasticsearch.index.fielddata.FormattedDocValues; import org.elasticsearch.index.fielddata.HistogramValue; @@ -32,6 +33,7 @@ import org.elasticsearch.index.fielddata.IndexHistogramFieldData; import org.elasticsearch.index.fielddata.LeafHistogramFieldData; import org.elasticsearch.index.fielddata.SortedBinaryDocValues; +import org.elasticsearch.index.mapper.BlockLoader; import org.elasticsearch.index.mapper.CompositeSyntheticFieldLoader; import org.elasticsearch.index.mapper.DocumentParserContext; import org.elasticsearch.index.mapper.DocumentParsingException; @@ -42,6 +44,9 @@ import 
org.elasticsearch.index.mapper.MapperBuilderContext; import org.elasticsearch.index.mapper.SourceValueFetcher; import org.elasticsearch.index.mapper.ValueFetcher; +import org.elasticsearch.index.mapper.blockloader.docvalues.BytesRefsFromBinaryBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.DoublesBlockLoader; +import org.elasticsearch.index.mapper.blockloader.docvalues.LongsBlockLoader; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.script.field.DocValuesScriptFieldFactory; import org.elasticsearch.search.DocValueFormat; @@ -58,6 +63,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.Map; +import java.util.Objects; import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; @@ -79,13 +85,13 @@ private static TDigestFieldMapper toType(FieldMapper in) { } public static class Builder extends FieldMapper.Builder { - private static final int DEFAULT_COMPRESSION = 100; - private static final int MAXIMUM_COMPRESSION = 10000; + private static final double DEFAULT_COMPRESSION = 100d; + private static final double MAXIMUM_COMPRESSION = 10000d; private final Parameter> meta = Parameter.metaParam(); private final Parameter> ignoreMalformed; private final Parameter digestType; - private final Parameter compression; + private final Parameter compression; public Builder(String name, boolean ignoreMalformedByDefault) { super(name); @@ -102,7 +108,15 @@ public Builder(String name, boolean ignoreMalformedByDefault) { TDigestState.Type.HYBRID, TDigestState.Type.class ); - this.compression = Parameter.intParam("compression", false, m -> toType(m).compression, DEFAULT_COMPRESSION).addValidator(c -> { + this.compression = new Parameter<>( + "compression", + false, + () -> DEFAULT_COMPRESSION, + (n, c1, o) -> XContentMapValues.nodeDoubleValue(o), + m -> toType(m).compression, + XContentBuilder::field, + Objects::toString + ).addValidator(c -> { if (c <= 0 || c 
> MAXIMUM_COMPRESSION) { throw new IllegalArgumentException( "compression must be a positive integer between 1 and " + MAXIMUM_COMPRESSION + " was [" + c + "]" @@ -135,7 +149,7 @@ public TDigestFieldMapper build(MapperBuilderContext context) { private final Explicit ignoreMalformed; private final boolean ignoreMalformedByDefault; private final TDigestState.Type digestType; - private final int compression; + private final double compression; public TDigestFieldMapper(String simpleName, MappedFieldType mappedFieldType, BuilderParams builderParams, Builder builder) { super(simpleName, mappedFieldType, builderParams); @@ -154,7 +168,7 @@ public TDigestState.Type digestType() { return digestType; } - public int compression() { + public double compression() { return compression; } @@ -184,6 +198,18 @@ public String typeName() { return CONTENT_TYPE; } + @Override + public BlockLoader blockLoader(BlockLoaderContext blContext) { + DoublesBlockLoader minimaLoader = new DoublesBlockLoader(valuesMinSubFieldName(name()), NumericUtils::sortableLongToDouble); + DoublesBlockLoader maximaLoader = new DoublesBlockLoader(valuesMaxSubFieldName(name()), NumericUtils::sortableLongToDouble); + DoublesBlockLoader sumsLoader = new DoublesBlockLoader(valuesSumSubFieldName(name()), NumericUtils::sortableLongToDouble); + LongsBlockLoader valueCountsLoader = new LongsBlockLoader(valuesCountSubFieldName(name())); + BytesRefsFromBinaryBlockLoader digestLoader = new BytesRefsFromBinaryBlockLoader(name()); + + // TODO: We're constantly passing around this set of 5 things. It would be nice to make a container for that. 
+ return new TDigestBlockLoader(digestLoader, minimaLoader, maximaLoader, sumsLoader, valueCountsLoader); + } + @Override public ValueFetcher valueFetcher(SearchExecutionContext context, String format) { return SourceValueFetcher.identity(name(), context, format); @@ -444,7 +470,7 @@ private static String valuesMaxSubFieldName(String fullPath) { } /** re-usable {@link HistogramValue} implementation */ - private static class InternalTDigestValue extends HistogramValue { + static class InternalTDigestValue extends HistogramValue { double value; long count; boolean isExhausted; diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/mapper/TDigestFieldBlockLoaderTests.java b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/mapper/TDigestFieldBlockLoaderTests.java new file mode 100644 index 0000000000000..4b23e41a86636 --- /dev/null +++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/mapper/TDigestFieldBlockLoaderTests.java @@ -0,0 +1,124 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.analytics.mapper; + +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.core.Types; +import org.elasticsearch.datageneration.datasource.DataSourceHandler; +import org.elasticsearch.datageneration.datasource.DataSourceRequest; +import org.elasticsearch.datageneration.datasource.DataSourceResponse; +import org.elasticsearch.index.mapper.BlockLoaderTestCase; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.aggregations.metrics.TDigestState; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.analytics.AnalyticsPlugin; +import org.junit.Before; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class TDigestFieldBlockLoaderTests extends BlockLoaderTestCase { + + public TDigestFieldBlockLoaderTests(Params params) { + super(TDigestFieldMapper.CONTENT_TYPE, List.of(DATA_SOURCE_HANDLER), params); + } + + @Override + protected Collection getPlugins() { + return List.of(new AnalyticsPlugin()); + } + + @Before + public void setup() { + // NOTE(review): message previously said "exponential_histogram" but the condition checks the tdigest flag. + assumeTrue("Only when tdigest feature flag is enabled", TDigestFieldMapper.TDIGEST_FIELD_MAPPER.isEnabled()); + } + + private static final DataSourceHandler DATA_SOURCE_HANDLER = new DataSourceHandler() { + + @Override + public DataSourceResponse.ObjectArrayGenerator handle(DataSourceRequest.ObjectArrayGenerator request) { + // tdigest does not support multiple values in a document so we can't have object arrays + return new DataSourceResponse.ObjectArrayGenerator(Optional::empty); + } + + @Override + public DataSourceResponse.LeafMappingParametersGenerator handle(DataSourceRequest.LeafMappingParametersGenerator request) { + if (request.fieldType().equals(TDigestFieldMapper.CONTENT_TYPE) == false) { + return null; + } + + return new DataSourceResponse.LeafMappingParametersGenerator(() -> { +
var map = new HashMap(); + if (ESTestCase.randomBoolean()) { + map.put("ignore_malformed", ESTestCase.randomBoolean()); + map.put("compression", randomDoubleBetween(1.0, 1000.0, true)); + map.put("digest_type", randomFrom(TDigestState.Type.values())); + } + return map; + }); + } + + @Override + public DataSourceResponse.FieldDataGenerator handle(DataSourceRequest.FieldDataGenerator request) { + if (request.fieldType().equals(TDigestFieldMapper.CONTENT_TYPE) == false) { + return null; + } + return new DataSourceResponse.FieldDataGenerator( + mapping -> TDigestFieldMapperTests.generateRandomFieldValues(randomIntBetween(0, 1_000)) + ); + } + }; + + @Override + public void testBlockLoaderOfMultiField() throws IOException { + // Multi fields are not supported + } + + @Override + protected Object expected(Map fieldMapping, Object value, TestContext testContext) { + Map valueAsMap = Types.forciblyCast(value); + List centroids = Types.forciblyCast(valueAsMap.get("centroids")); + List counts = Types.forciblyCast(valueAsMap.get("counts")); + BytesStreamOutput streamOutput = new BytesStreamOutput(); + + long totalCount = 0; + + // TODO - refactor this, it's duplicated from the parser + for (int i = 0; i < centroids.size(); i++) { + long count = counts.get(i); + totalCount += count; + // we do not add elements with count == 0 + try { + if (count > 0) { + streamOutput.writeVLong(count); + streamOutput.writeDouble(centroids.get(i)); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + long finalTotalCount = totalCount; + return Map.of( + "min", + valueAsMap.get("min"), + "max", + valueAsMap.get("max"), + "sum", + valueAsMap.get("sum"), + "value_count", + finalTotalCount, + "encoded_digest", + streamOutput.bytes().toBytesRef() + ); + } +} diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/mapper/TDigestFieldMapperTests.java 
b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/mapper/TDigestFieldMapperTests.java index dd33a9554ee1e..82fdf53636bab 100644 --- a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/mapper/TDigestFieldMapperTests.java +++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/mapper/TDigestFieldMapperTests.java @@ -450,7 +450,7 @@ public void testArrayValueSyntheticSource() throws Exception { assertEquals(Strings.toString(expected), syntheticSource); } - private static Map generateRandomFieldValues(int maxVals) { + static Map generateRandomFieldValues(int maxVals) { Map value = new LinkedHashMap<>(); int size = between(1, maxVals); TDigestState digest = TDigestState.createWithoutCircuitBreaking(100); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java index dff9d421988b5..39b06dcb684cf 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java @@ -516,6 +516,16 @@ public BlockLoader.Block newExponentialHistogramBlockFromDocValues( return new ExponentialHistogramArrayBlock(minima, maxima, sums, valueCounts, zeroThresholds, encodedHistograms); } + public BlockLoader.Block newTDigestBlockFromDocValues( + BytesRefBlock encodedDigests, + DoubleBlock minima, + DoubleBlock maxima, + DoubleBlock sums, + LongBlock counts + ) { + return new TDigestArrayBlock(encodedDigests, minima, maxima, sums, counts); + } + public final AggregateMetricDoubleBlock newAggregateMetricDoubleBlock( double[] minValues, double[] maxValues, diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java index 
9c47176ab228c..92b6843ed253e 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java @@ -28,7 +28,8 @@ public final class ConstantNullBlock extends AbstractNonThreadSafeRefCounted DoubleBlock, BytesRefBlock, AggregateMetricDoubleBlock, - ExponentialHistogramBlock { + ExponentialHistogramBlock, + TDigestBlock { private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(ConstantNullBlock.class); private final int positionCount; diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestArrayBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestArrayBlock.java new file mode 100644 index 0000000000000..d98929b727091 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestArrayBlock.java @@ -0,0 +1,220 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.compute.data; + +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.ReleasableIterator; +import org.elasticsearch.core.Releasables; + +import java.io.IOException; +import java.util.List; + +public final class TDigestArrayBlock extends AbstractNonThreadSafeRefCounted implements TDigestBlock { + private final DoubleBlock minima; + private final DoubleBlock maxima; + private final DoubleBlock sums; + private final LongBlock valueCounts; + private final BytesRefBlock encodedDigests; + + public TDigestArrayBlock( + BytesRefBlock encodedDigests, + DoubleBlock minima, + DoubleBlock maxima, + DoubleBlock sums, + LongBlock valueCounts + ) { + this.encodedDigests = encodedDigests; + this.minima = minima; + this.maxima = maxima; + this.sums = sums; + this.valueCounts = valueCounts; + } + + private List getSubBlocks() { + return List.of(minima, maxima, sums, valueCounts, encodedDigests); + } + + @Override + protected void closeInternal() { + Releasables.close(getSubBlocks()); + } + + @Override + public Vector asVector() { + return null; + } + + @Override + public int getTotalValueCount() { + return encodedDigests.getTotalValueCount(); + } + + @Override + public int getPositionCount() { + return encodedDigests.getPositionCount(); + } + + @Override + public int getFirstValueIndex(int position) { + return position; + } + + @Override + public int getValueCount(int position) { + return isNull(position) ? 
0 : 1; + } + + @Override + public ElementType elementType() { + throw new UnsupportedOperationException("Need to implement this later"); + } + + @Override + public BlockFactory blockFactory() { + return encodedDigests.blockFactory(); + } + + @Override + public void allowPassingToDifferentDriver() { + getSubBlocks().forEach(Block::allowPassingToDifferentDriver); + } + + @Override + public boolean isNull(int position) { + return encodedDigests.isNull(position); + } + + @Override + public boolean mayHaveNulls() { + return encodedDigests.mayHaveNulls(); + } + + @Override + public boolean areAllValuesNull() { + return encodedDigests.areAllValuesNull(); + } + + @Override + public boolean mayHaveMultivaluedFields() { + return false; + } + + @Override + public boolean doesHaveMultivaluedFields() { + return false; + } + + @Override + public Block filter(int... positions) { + /* + TODO: Refactor this degelation pattern. In fact, both AggregateMetricDoubleArrayBlock and ExponentialHistogramArrayBlock + use the same pattern. We should extract a composite block abstract class, I think. 
+ */ + DoubleBlock filteredMinima = null; + DoubleBlock filteredMaxima = null; + DoubleBlock filteredSums = null; + LongBlock filteredValueCounts = null; + BytesRefBlock filteredEncodedDigests = null; + boolean success = false; + try { + filteredEncodedDigests = encodedDigests.filter(positions); + filteredMinima = minima.filter(positions); + filteredMaxima = maxima.filter(positions); + filteredSums = sums.filter(positions); + filteredValueCounts = valueCounts.filter(positions); + success = true; + } finally { + if (success == false) { + Releasables.close(filteredMinima, filteredMaxima, filteredSums, filteredValueCounts, filteredEncodedDigests); + } + } + return new TDigestArrayBlock(filteredEncodedDigests, filteredMinima, filteredMaxima, filteredSums, filteredValueCounts); + } + + @Override + public Block keepMask(BooleanVector mask) { + DoubleBlock filteredMinima = null; + DoubleBlock filteredMaxima = null; + DoubleBlock filteredSums = null; + LongBlock filteredValueCounts = null; + BytesRefBlock filteredEncodedDigests = null; + boolean success = false; + try { + filteredEncodedDigests = encodedDigests.keepMask(mask); + filteredMinima = minima.keepMask(mask); + filteredMaxima = maxima.keepMask(mask); + filteredSums = sums.keepMask(mask); + filteredValueCounts = valueCounts.keepMask(mask); + success = true; + } finally { + if (success == false) { + Releasables.close(filteredMinima, filteredMaxima, filteredSums, filteredValueCounts, filteredEncodedDigests); + } + } + return new TDigestArrayBlock(filteredEncodedDigests, filteredMinima, filteredMaxima, filteredSums, filteredValueCounts); + } + + @Override + public ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize) { + throw new UnsupportedOperationException("Lookup is not supported on TDigestArrayBlock"); + } + + @Override + public MvOrdering mvOrdering() { + return MvOrdering.UNORDERED; + } + + @Override + public Block expand() { + // we don't support multivalues so expanding is a no-op + 
this.incRef(); + return this; + } + + @Override + public Block deepCopy(BlockFactory blockFactory) { + DoubleBlock copiedMinima = null; + DoubleBlock copiedMaxima = null; + DoubleBlock copiedSums = null; + LongBlock copiedValueCounts = null; + BytesRefBlock copiedEncodedDigests = null; + boolean success = false; + try { + copiedEncodedDigests = encodedDigests.deepCopy(blockFactory); + copiedMinima = minima.deepCopy(blockFactory); + copiedMaxima = maxima.deepCopy(blockFactory); + copiedSums = sums.deepCopy(blockFactory); + copiedValueCounts = valueCounts.deepCopy(blockFactory); + success = true; + } finally { + if (success == false) { + Releasables.close(copiedMinima, copiedMaxima, copiedSums, copiedValueCounts, copiedEncodedDigests); + } + } + return new TDigestArrayBlock(copiedEncodedDigests, copiedMinima, copiedMaxima, copiedSums, copiedValueCounts); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + Block.writeTypedBlock(encodedDigests, out); + Block.writeTypedBlock(minima, out); + Block.writeTypedBlock(maxima, out); + Block.writeTypedBlock(sums, out); + Block.writeTypedBlock(valueCounts, out); + } + + @Override + public long ramBytesUsed() { + long bytes = 0; + for (Block b : getSubBlocks()) { + bytes += b.ramBytesUsed(); + } + return bytes; + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestBlock.java new file mode 100644 index 0000000000000..315dbbb7b52f8 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestBlock.java @@ -0,0 +1,10 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.compute.data; + +public sealed interface TDigestBlock extends Block permits ConstantNullBlock, TDigestArrayBlock {} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/DelegatingBlockLoaderFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/DelegatingBlockLoaderFactory.java index 5686cfe845dc6..10548ebede2ea 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/DelegatingBlockLoaderFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/DelegatingBlockLoaderFactory.java @@ -195,4 +195,21 @@ public BlockLoader.Block buildExponentialHistogramBlockDirect( (BytesRefBlock) encodedHistograms ); } + + @Override + public BlockLoader.Block buildTDigestBlockDirect( + BlockLoader.Block encodedDigests, + BlockLoader.Block minima, + BlockLoader.Block maxima, + BlockLoader.Block sums, + BlockLoader.Block valueCounts + ) { + return factory.newTDigestBlockFromDocValues( + (BytesRefBlock) encodedDigests, + (DoubleBlock) minima, + (DoubleBlock) maxima, + (DoubleBlock) sums, + (LongBlock) valueCounts + ); + } }