diff --git a/libs/tdigest/build.gradle b/libs/tdigest/build.gradle index 47fc0dbc239cf..6cdd3bb12ba30 100644 --- a/libs/tdigest/build.gradle +++ b/libs/tdigest/build.gradle @@ -23,6 +23,7 @@ apply plugin: 'elasticsearch.publish' dependencies { api project(':libs:core') + api project(':libs:x-content') api "org.apache.lucene:lucene-core:${versions.lucene}" testImplementation(project(":test:framework")) { diff --git a/libs/tdigest/src/main/java/module-info.java b/libs/tdigest/src/main/java/module-info.java index 79ddbe88ab3d3..beae047e0d777 100644 --- a/libs/tdigest/src/main/java/module-info.java +++ b/libs/tdigest/src/main/java/module-info.java @@ -20,7 +20,9 @@ module org.elasticsearch.tdigest { requires org.elasticsearch.base; requires org.apache.lucene.core; + requires org.elasticsearch.xcontent; exports org.elasticsearch.tdigest; exports org.elasticsearch.tdigest.arrays; + exports org.elasticsearch.tdigest.parsing; } diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/TDigestParser.java b/libs/tdigest/src/main/java/org/elasticsearch/tdigest/parsing/TDigestParser.java similarity index 66% rename from x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/TDigestParser.java rename to libs/tdigest/src/main/java/org/elasticsearch/tdigest/parsing/TDigestParser.java index 2b8bdb7bf6613..b884539373d30 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/TDigestParser.java +++ b/libs/tdigest/src/main/java/org/elasticsearch/tdigest/parsing/TDigestParser.java @@ -1,28 +1,41 @@ /* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * This project is based on a modification of https://github.com/tdunning/t-digest which is licensed under the Apache 2.0 License. 
*/ -package org.elasticsearch.xpack.analytics.mapper; +package org.elasticsearch.tdigest.parsing; -import org.elasticsearch.index.mapper.DocumentParsingException; import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentLocation; import org.elasticsearch.xcontent.XContentParser; import java.io.IOException; import java.util.ArrayList; import java.util.List; - -import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; -import static org.elasticsearch.xpack.analytics.mapper.TDigestFieldMapper.CENTROIDS_NAME; -import static org.elasticsearch.xpack.analytics.mapper.TDigestFieldMapper.COUNTS_NAME; -import static org.elasticsearch.xpack.analytics.mapper.TDigestFieldMapper.MAX_FIELD_NAME; -import static org.elasticsearch.xpack.analytics.mapper.TDigestFieldMapper.MIN_FIELD_NAME; -import static org.elasticsearch.xpack.analytics.mapper.TDigestFieldMapper.SUM_FIELD_NAME; +import java.util.function.BiFunction; public class TDigestParser { + public static final String CENTROIDS_NAME = "centroids"; + public static final String COUNTS_NAME = "counts"; + public static final String SUM_FIELD_NAME = "sum"; + public static final String MIN_FIELD_NAME = "min"; + public static final String MAX_FIELD_NAME = "max"; private static final ParseField COUNTS_FIELD = new ParseField(COUNTS_NAME); private static final ParseField CENTROIDS_FIELD = new ParseField(CENTROIDS_NAME); @@ -91,9 +104,15 @@ public Long count() { * * @param mappedFieldName the name of the field being parsed, used for error messages * @param parser the parser to use + * @param documentParsingExceptionProvider factory for document parsing exceptions; required because the concrete exception type is not visible from this library + * @param parsingExceptionProvider factory for token-mismatch parsing exceptions, likewise needed for visibility * @return the parsed t-digest */ - public static ParsedTDigest parse(String mappedFieldName, XContentParser parser) throws IOException { + public static ParsedTDigest parse( + String mappedFieldName, + XContentParser parser, + BiFunction<XContentLocation, String, RuntimeException> documentParsingExceptionProvider, + ParsingExceptionProvider parsingExceptionProvider + ) throws IOException { ArrayList<Double> centroids = null; ArrayList<Long> counts = null; Double sum = null; @@ -102,26 +121,26 @@ public static ParsedTDigest parse(String mappedFieldName, XContentParser parser) XContentParser.Token token = parser.currentToken(); while (token != XContentParser.Token.END_OBJECT) { // should be a field - ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser); + ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser, parsingExceptionProvider); String fieldName = parser.currentName(); if (fieldName.equals(CENTROIDS_FIELD.getPreferredName())) { - centroids = getCentroids(mappedFieldName, parser); + centroids = getCentroids(mappedFieldName, parser, documentParsingExceptionProvider, parsingExceptionProvider); } else if (fieldName.equals(COUNTS_FIELD.getPreferredName())) { - counts = getCounts(mappedFieldName, parser); + counts = getCounts(mappedFieldName, parser, documentParsingExceptionProvider, parsingExceptionProvider); } else if (fieldName.equals(SUM_FIELD.getPreferredName())) { token = parser.nextToken(); - ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser); + ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser, parsingExceptionProvider); sum = parser.doubleValue(); } else if (fieldName.equals(MIN_FIELD.getPreferredName())) { token = parser.nextToken(); - ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser); + ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser,
parsingExceptionProvider); min = parser.doubleValue(); } else if (fieldName.equals(MAX_FIELD.getPreferredName())) { token = parser.nextToken(); - ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser); + ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser, parsingExceptionProvider); max = parser.doubleValue(); } else { - throw new DocumentParsingException( + throw documentParsingExceptionProvider.apply( parser.getTokenLocation(), "error parsing field [" + mappedFieldName + "], with unknown parameter [" + fieldName + "]" ); @@ -129,19 +148,19 @@ public static ParsedTDigest parse(String mappedFieldName, XContentParser parser) token = parser.nextToken(); } if (centroids == null) { - throw new DocumentParsingException( + throw documentParsingExceptionProvider.apply( parser.getTokenLocation(), "error parsing field [" + mappedFieldName + "], expected field called [" + CENTROIDS_FIELD.getPreferredName() + "]" ); } if (counts == null) { - throw new DocumentParsingException( + throw documentParsingExceptionProvider.apply( parser.getTokenLocation(), "error parsing field [" + mappedFieldName + "], expected field called [" + COUNTS_FIELD.getPreferredName() + "]" ); } if (centroids.size() != counts.size()) { - throw new DocumentParsingException( + throw documentParsingExceptionProvider.apply( parser.getTokenLocation(), "error parsing field [" + mappedFieldName @@ -165,20 +184,25 @@ public static ParsedTDigest parse(String mappedFieldName, XContentParser parser) return new ParsedTDigest(centroids, counts, sum, min, max); } - private static ArrayList<Long> getCounts(String mappedFieldName, XContentParser parser) throws IOException { + private static ArrayList<Long> getCounts( + String mappedFieldName, + XContentParser parser, + BiFunction<XContentLocation, String, RuntimeException> documentParsingExceptionProvider, + ParsingExceptionProvider parsingExceptionProvider + ) throws IOException { ArrayList<Long> counts; XContentParser.Token token; token = parser.nextToken(); // should be an array - ensureExpectedToken(XContentParser.Token.START_ARRAY, token, parser); + ensureExpectedToken(XContentParser.Token.START_ARRAY, token, parser, parsingExceptionProvider); counts = new ArrayList<>(); token = parser.nextToken(); while (token != XContentParser.Token.END_ARRAY) { // should be a number - ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser); + ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser, parsingExceptionProvider); long count = parser.longValue(); if (count < 0) { - throw new DocumentParsingException( + throw documentParsingExceptionProvider.apply( parser.getTokenLocation(), "error parsing field [" + mappedFieldName + "], [" + COUNTS_FIELD + "] elements must be >= 0 but got " + count ); @@ -189,22 +213,27 @@ private static ArrayList<Long> getCounts(String mappedFieldName, XContentParser return counts; } - private static ArrayList<Double> getCentroids(String mappedFieldName, XContentParser parser) throws IOException { + private static ArrayList<Double> getCentroids( + String mappedFieldName, + XContentParser parser, + BiFunction<XContentLocation, String, RuntimeException> documentParsingExceptionProvider, + ParsingExceptionProvider parsingExceptionProvider + ) throws IOException { XContentParser.Token token; ArrayList<Double> centroids; token = parser.nextToken(); // should be an array - ensureExpectedToken(XContentParser.Token.START_ARRAY, token, parser); + ensureExpectedToken(XContentParser.Token.START_ARRAY, token, parser, parsingExceptionProvider); centroids = new ArrayList<>(); token = parser.nextToken(); double previousVal = -Double.MAX_VALUE; while (token !=
XContentParser.Token.END_ARRAY) { // should be a number - ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser); + ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser, parsingExceptionProvider); double val = parser.doubleValue(); if (val < previousVal) { // centroids must be in increasing order - throw new DocumentParsingException( + throw documentParsingExceptionProvider.apply( parser.getTokenLocation(), "error parsing field [" + mappedFieldName @@ -224,4 +253,23 @@ private static ArrayList<Double> getCentroids(String mappedFieldName, XContentPa return centroids; } + /** + * Interface for throwing a parsing exception, needed for visibility + */ + @FunctionalInterface + public interface ParsingExceptionProvider { + RuntimeException apply(XContentParser parser, XContentParser.Token expected, XContentParser.Token actual) throws IOException; + } + + public static void ensureExpectedToken( + XContentParser.Token expected, + XContentParser.Token actual, + XContentParser parser, + ParsingExceptionProvider parsingExceptionProvider + ) throws IOException { + if (actual != expected) { + throw parsingExceptionProvider.apply(parser, expected, actual); + } + } + } diff --git a/libs/tdigest/src/main/java/org/elasticsearch/tdigest/parsing/package-info.java b/libs/tdigest/src/main/java/org/elasticsearch/tdigest/parsing/package-info.java new file mode 100644 index 0000000000000..83e31f73e66f4 --- /dev/null +++ b/libs/tdigest/src/main/java/org/elasticsearch/tdigest/parsing/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * This project is based on a modification of https://github.com/tdunning/t-digest which is licensed under the Apache 2.0 License. + */ + +/** + * Parsing package contains Elasticsearch-specific classes for serializing and deserializing + * t-digests from various formats via Elasticsearch's XContent abstraction layer.
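+ *
+ * A minimal caller sketch, assuming the two exception factories are supplied by the consumer (the real wiring lives in
+ * TDigestFieldMapper; the lambdas below are illustrative only):
+ * <pre>{@code
+ * TDigestParser.ParsedTDigest digest = TDigestParser.parse(
+ *     "my_field",
+ *     parser,
+ *     (location, message) -> new IllegalArgumentException(message),
+ *     (p, expected, actual) -> new IllegalStateException("expected " + expected + " but found " + actual)
+ * );
+ * }</pre>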
+ */ + +package org.elasticsearch.tdigest.parsing; diff --git a/server/src/main/java/org/elasticsearch/common/xcontent/XContentParserUtils.java b/server/src/main/java/org/elasticsearch/common/xcontent/XContentParserUtils.java index 6390e62f9758f..48ef941c60f4d 100644 --- a/server/src/main/java/org/elasticsearch/common/xcontent/XContentParserUtils.java +++ b/server/src/main/java/org/elasticsearch/common/xcontent/XContentParserUtils.java @@ -86,7 +86,7 @@ public static void expectValueToken(Token actual, XContentParser parser) { } } - private static ParsingException parsingException(XContentParser parser, Token expected, Token actual) { + public static ParsingException parsingException(XContentParser parser, Token expected, Token actual) { return new ParsingException( parser.getTokenLocation(), String.format(Locale.ROOT, "Failed to parse object: expecting token of type [%s] but found [%s]", expected, actual) diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/TDigestFieldMapper.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/TDigestFieldMapper.java index 74ced0a1c2e3b..b481fd8d9aab5 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/TDigestFieldMapper.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/TDigestFieldMapper.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.FeatureFlag; +import org.elasticsearch.common.xcontent.XContentParserUtils; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.index.fielddata.FieldDataContext; import org.elasticsearch.index.fielddata.FormattedDocValues; @@ -54,6 +55,7 @@ import org.elasticsearch.search.aggregations.metrics.TDigestState; import org.elasticsearch.search.sort.BucketedSort; import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.tdigest.parsing.TDigestParser; import org.elasticsearch.xcontent.CopyingXContentParser; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentParser; @@ -62,6 +64,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.List; import java.util.Map; import java.util.Objects; @@ -368,21 +371,14 @@ public void parse(DocumentParserContext context) throws IOException { } subParser.nextToken(); // TODO: Here we should build a t-digest out of the input, based on the settings on the field - TDigestParser.ParsedTDigest parsedTDigest = TDigestParser.parse(fullPath(), subParser); - - BytesStreamOutput streamOutput = new BytesStreamOutput(); - - for (int i = 0; i < parsedTDigest.centroids().size(); i++) { - long count = parsedTDigest.counts().get(i); - assert count >= 0; - // we do not add elements with count == 0 - if (count > 0) { - streamOutput.writeVLong(count); - streamOutput.writeDouble(parsedTDigest.centroids().get(i)); - } - } + TDigestParser.ParsedTDigest parsedTDigest = TDigestParser.parse( + fullPath(), + subParser, + DocumentParsingException::new, + XContentParserUtils::parsingException + ); - BytesRef docValue = streamOutput.bytes().toBytesRef(); + BytesRef docValue = encodeCentroidsAndCounts(parsedTDigest.centroids(), parsedTDigest.counts()); Field digestField = new BinaryDocValuesField(fullPath(), docValue); // Add numeric doc values fields for the summary data @@ -458,6 +454,23 @@ public void 
parse(DocumentParserContext context) throws IOException { context.path().remove(); } + private static BytesRef encodeCentroidsAndCounts(List<Double> centroids, List<Long> counts) throws IOException { + BytesStreamOutput streamOutput = new BytesStreamOutput(); + + for (int i = 0; i < centroids.size(); i++) { + long count = counts.get(i); + assert count >= 0; + // we do not add elements with count == 0 + if (count > 0) { + streamOutput.writeVLong(count); + streamOutput.writeDouble(centroids.get(i)); + } + } + + return streamOutput.bytes().toBytesRef(); + } + private static String valuesCountSubFieldName(String fullPath) { return fullPath + "._values_count"; } diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/plugin/EsqlCorePlugin.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/plugin/EsqlCorePlugin.java index da808b0083d22..5bcff64bc7149 100644 --- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/plugin/EsqlCorePlugin.java +++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/plugin/EsqlCorePlugin.java @@ -14,4 +14,8 @@ public class EsqlCorePlugin extends Plugin implements ExtensiblePlugin { public static final FeatureFlag EXPONENTIAL_HISTOGRAM_FEATURE_FLAG = new FeatureFlag("esql_exponential_histogram"); + + // Note, there is also a feature flag for the field type in the analytics plugin, but for visibility reasons we need + // another one here. + public static final FeatureFlag T_DIGEST_ESQL_SUPPORT = new FeatureFlag("esql_t_digest_support"); } diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java index 8055bcdee0bc5..f2d0be0af6554 100644 --- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java +++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java @@ -349,6 +349,16 @@ public enum DataType implements Writeable { .underConstruction() ), + /* + TDIGEST( + builder().esType("tdigest") + .estimatedSize(16 * 160) // guess 160 centroids with 16 bytes per centroid (mirroring the exponential histogram estimate) + .docValues() + .underConstruction() + ), + + */ + /** * Fields with this type are dense vectors, represented as an array of float values.
*/ diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java index cb8ffdf400a7e..b3144d6060219 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java @@ -496,10 +496,18 @@ public ExponentialHistogramBlockBuilder newExponentialHistogramBlockBuilder(int return new ExponentialHistogramBlockBuilder(estimatedSize, this); } + public TDigestBlockBuilder newTDigestBlockBuilder(int estimatedSize) { + return new TDigestBlockBuilder(estimatedSize, this); + } + public final ExponentialHistogramBlock newConstantExponentialHistogramBlock(ExponentialHistogram value, int positionCount) { return ExponentialHistogramArrayBlock.createConstant(value, positionCount, this); } + public final TDigestBlock newConstantTDigestBlock(TDigestHolder value, int positions) { + return TDigestArrayBlock.createConstant(value, positions, this); + } + public BlockLoader.Block newExponentialHistogramBlockFromDocValues( DoubleBlock minima, DoubleBlock maxima, diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java index 7dfe664364e51..da7f31913f18c 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java @@ -225,9 +225,10 @@ public static void appendValue(Block.Builder builder, Object val, ElementType ty case FLOAT -> ((FloatBlock.Builder) builder).appendFloat((Float) val); case DOUBLE -> ((DoubleBlock.Builder) builder).appendDouble((Double) val); case BOOLEAN -> ((BooleanBlock.Builder) builder).appendBoolean((Boolean) val); + case TDIGEST -> ((TDigestBlockBuilder) builder).append((TDigestHolder) val); case AGGREGATE_METRIC_DOUBLE -> ((AggregateMetricDoubleBlockBuilder) builder).appendLiteral((AggregateMetricDoubleLiteral) val); case EXPONENTIAL_HISTOGRAM -> ((ExponentialHistogramBlockBuilder) builder).append((ExponentialHistogram) val); - default -> throw new UnsupportedOperationException("unsupported element type [" + type + "]"); + case DOC, COMPOSITE, NULL, UNKNOWN -> throw new UnsupportedOperationException("unsupported element type [" + type + "]"); } } @@ -257,6 +258,7 @@ private static Block constantBlock(BlockFactory blockFactory, ElementType type, case AGGREGATE_METRIC_DOUBLE -> blockFactory.newConstantAggregateMetricDoubleBlock((AggregateMetricDoubleLiteral) val, size); case FLOAT -> blockFactory.newConstantFloatBlockWith((float) val, size); case EXPONENTIAL_HISTOGRAM -> blockFactory.newConstantExponentialHistogramBlock((ExponentialHistogram) val, size); + case TDIGEST -> blockFactory.newConstantTDigestBlock((TDigestHolder) val, size); default -> throw new UnsupportedOperationException("unsupported element type [" + type + "]"); }; } @@ -316,6 +318,11 @@ yield new AggregateMetricDoubleLiteral( // return a copy so that the returned value is not bound to the lifetime of the block yield ExponentialHistogram.builder(histogram, ExponentialHistogramCircuitBreaker.noop()).build(); } + case TDIGEST -> { + TDigestBlock tDigestBlock = (TDigestBlock) block; + yield tDigestBlock.getTDigestHolder(offset); + + } case UNKNOWN -> throw new IllegalArgumentException("can't read values from [" 
+ block + "]"); }; } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java index 92b6843ed253e..8eabb7ba171ca 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java @@ -301,6 +301,18 @@ public ExponentialHistogram getExponentialHistogram(int valueIndex, ExponentialH throw new UnsupportedOperationException("null block"); } + @Override + public void serializeTDigest(int valueIndex, SerializedTDigestOutput out, BytesRef scratch) { + assert false : "null block"; + throw new UnsupportedOperationException("null block"); + } + + @Override + public TDigestHolder getTDigestHolder(int valueIndex) { + assert false : "null block"; + throw new UnsupportedOperationException("null block"); + } + @Override public Block buildExponentialHistogramComponentBlock(Component component) { // if all histograms are null, the component block is also a constant null block with the same position count diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ElementType.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ElementType.java index 52a7853e56182..c8890542cf867 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ElementType.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ElementType.java @@ -75,7 +75,8 @@ public enum ElementType { "ExponentialHistogram", BlockFactory::newExponentialHistogramBlockBuilder, ExponentialHistogramArrayBlock::readFrom - ); + ), + TDIGEST(12, "TDigest", BlockFactory::newTDigestBlockBuilder, TDigestArrayBlock::readFrom); private static final TransportVersion ESQL_SERIALIZE_BLOCK_TYPE_CODE = TransportVersion.fromName("esql_serialize_block_type_code"); @@ -126,6 +127,8 @@ public static ElementType fromJava(Class type) { elementType = AGGREGATE_METRIC_DOUBLE; } else if (type != null && ExponentialHistogram.class.isAssignableFrom(type)) { elementType = EXPONENTIAL_HISTOGRAM; + } else if (type != null && TDigestHolder.class.isAssignableFrom(type)) { + elementType = TDIGEST; } else if (type == null || type == Void.class) { elementType = NULL; } else { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestArrayBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestArrayBlock.java index d98929b727091..b276bf0c2f895 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestArrayBlock.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.data; +import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.ReleasableIterator; @@ -34,6 +35,35 @@ public TDigestArrayBlock( this.maxima = maxima; this.sums = sums; this.valueCounts = valueCounts; + assertInvariants(); + } + + private void assertInvariants() { + for (Block b : getSubBlocks()) { + assert b.isReleased() == false; + assert b.doesHaveMultivaluedFields() == false : "TDigestArrayBlock sub-blocks can't have multi-values but [" + b + "] does"; + assert b.getPositionCount() == getPositionCount() + : 
"TDigestArrayBlock sub-blocks must have the same position count but [" + + b + + "] has " + + b.getPositionCount() + + " instead of " + + getPositionCount(); + for (int i = 0; i < b.getPositionCount(); i++) { + if (isNull(i)) { + assert b.isNull(i) : "TDigestArrayBlock sub-block [" + b + "] should be null at position " + i + ", but was not"; + } else { + if (b == sums || b == minima || b == maxima) { + // sums / minima / maxima should be null exactly when value count is 0 or the histogram is null + assert b.isNull(i) == (valueCounts.getLong(valueCounts.getFirstValueIndex(i)) == 0) + : "TDigestArrayBlock sums/minima/maxima sub-block [" + b + "] has wrong nullity at position " + i; + } else { + assert b.isNull(i) == false + : "TDigestArrayBlock sub-block [" + b + "] should be non-null at position " + i + ", but was not"; + } + } + } + } } private List getSubBlocks() { @@ -72,7 +102,7 @@ public int getValueCount(int position) { @Override public ElementType elementType() { - throw new UnsupportedOperationException("Need to implement this later"); + return ElementType.TDIGEST; } @Override @@ -200,6 +230,22 @@ public Block deepCopy(BlockFactory blockFactory) { return new TDigestArrayBlock(copiedEncodedDigests, copiedMinima, copiedMaxima, copiedSums, copiedValueCounts); } + void copyInto( + BytesRefBlock.Builder encodedDigestsBuilder, + DoubleBlock.Builder minimaBuilder, + DoubleBlock.Builder maximaBuilder, + DoubleBlock.Builder sumsBuilder, + LongBlock.Builder valueCountsBuilder, + int beginInclusive, + int endExclusive + ) { + encodedDigestsBuilder.copyFrom(encodedDigests, beginInclusive, endExclusive); + minimaBuilder.copyFrom(minima, beginInclusive, endExclusive); + maximaBuilder.copyFrom(maxima, beginInclusive, endExclusive); + sumsBuilder.copyFrom(sums, beginInclusive, endExclusive); + valueCountsBuilder.copyFrom(valueCounts, beginInclusive, endExclusive); + } + @Override public void writeTo(StreamOutput out) throws IOException { Block.writeTypedBlock(encodedDigests, out); @@ -209,6 +255,29 @@ public void writeTo(StreamOutput out) throws IOException { Block.writeTypedBlock(valueCounts, out); } + public static TDigestArrayBlock readFrom(BlockStreamInput in) throws IOException { + BytesRefBlock encodedDigests = null; + DoubleBlock minima = null; + DoubleBlock maxima = null; + DoubleBlock sums = null; + LongBlock valueCounts = null; + + boolean success = false; + try { + encodedDigests = (BytesRefBlock) Block.readTypedBlock(in); + minima = (DoubleBlock) Block.readTypedBlock(in); + maxima = (DoubleBlock) Block.readTypedBlock(in); + sums = (DoubleBlock) Block.readTypedBlock(in); + valueCounts = (LongBlock) Block.readTypedBlock(in); + success = true; + } finally { + if (success == false) { + Releasables.close(minima, maxima, sums, valueCounts, encodedDigests); + } + } + return new TDigestArrayBlock(encodedDigests, minima, maxima, sums, valueCounts); + } + @Override public long ramBytesUsed() { long bytes = 0; @@ -217,4 +286,91 @@ public long ramBytesUsed() { } return bytes; } + + @Override + public void serializeTDigest(int valueIndex, SerializedTDigestOutput out, BytesRef scratch) { + // not that this value count is different from getValueCount(position)! 
+ // this value count represents the number of individual samples the histogram was computed for + long valueCount = valueCounts.getLong(valueCounts.getFirstValueIndex(valueIndex)); + out.appendLong(valueCount); + if (valueCount > 0) { + // sum / min / max are only non-null for non-empty histograms + out.appendDouble(sums.getDouble(sums.getFirstValueIndex(valueIndex))); + out.appendDouble(minima.getDouble(minima.getFirstValueIndex(valueIndex))); + out.appendDouble(maxima.getDouble(maxima.getFirstValueIndex(valueIndex))); + } + out.appendBytesRef(encodedDigests.getBytesRef(encodedDigests.getFirstValueIndex(valueIndex), scratch)); + } + + @Override + public TDigestHolder getTDigestHolder(int offset) { + return new TDigestHolder( + // TODO: Memory tracking? creating a new bytes ref here doesn't seem great + encodedDigests.getBytesRef(offset, new BytesRef()), + minima.getDouble(offset), + maxima.getDouble(offset), + sums.getDouble(offset), + valueCounts.getLong(offset) + ); + } + + public static TDigestBlock createConstant(TDigestHolder histogram, int positionCount, BlockFactory blockFactory) { + DoubleBlock minBlock = null; + DoubleBlock maxBlock = null; + DoubleBlock sumBlock = null; + LongBlock countBlock = null; + BytesRefBlock encodedDigestsBlock = null; + boolean success = false; + try { + countBlock = blockFactory.newConstantLongBlockWith(histogram.getValueCount(), positionCount); + if (Double.isNaN(histogram.getMin())) { + minBlock = (DoubleBlock) blockFactory.newConstantNullBlock(positionCount); + } else { + minBlock = blockFactory.newConstantDoubleBlockWith(histogram.getMin(), positionCount); + } + if (Double.isNaN(histogram.getMax())) { + maxBlock = (DoubleBlock) blockFactory.newConstantNullBlock(positionCount); + } else { + maxBlock = blockFactory.newConstantDoubleBlockWith(histogram.getMax(), positionCount); + } + if (Double.isNaN(histogram.getSum())) { + sumBlock = (DoubleBlock) blockFactory.newConstantNullBlock(positionCount); + } else { + sumBlock = blockFactory.newConstantDoubleBlockWith(histogram.getSum(), positionCount); + } + encodedDigestsBlock = blockFactory.newConstantBytesRefBlockWith(histogram.getEncodedDigest(), positionCount); + success = true; + return new TDigestArrayBlock(encodedDigestsBlock, minBlock, maxBlock, sumBlock, countBlock); + } finally { + if (success == false) { + Releasables.close(minBlock, maxBlock, sumBlock, countBlock, encodedDigestsBlock); + } + } + } + + @Override + public boolean equals(Object o) { + if (o instanceof TDigestBlock block) { + return TDigestBlock.equals(this, block); + } + return false; + } + + boolean equalsAfterTypeCheck(TDigestArrayBlock that) { + return minima.equals(that.minima) + && maxima.equals(that.maxima) + && sums.equals(that.sums) + && valueCounts.equals(that.valueCounts) + && encodedDigests.equals(that.encodedDigests); + } + + @Override + public int hashCode() { + /* + for now we use just the hash of encodedDigests + this ensures proper equality with null blocks and should be unique enough for practical purposes. 
This mirrors the behavior in Exponential Histogram */ return encodedDigests.hashCode(); } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestBlock.java index 315dbbb7b52f8..8bbc59cad1f0e 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestBlock.java @@ -7,4 +7,58 @@ package org.elasticsearch.compute.data; -public sealed interface TDigestBlock extends Block permits ConstantNullBlock, TDigestArrayBlock {} +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.index.mapper.BlockLoader; + +public sealed interface TDigestBlock extends Block permits ConstantNullBlock, TDigestArrayBlock { + + static boolean equals(TDigestBlock blockA, TDigestBlock blockB) { + if (blockA == blockB) { + return true; + } + return switch (blockA) { + case null -> false; + case ConstantNullBlock a -> a.equals(blockB); + case TDigestArrayBlock a -> switch (blockB) { + case null -> false; + case ConstantNullBlock b -> b.equals(a); + case TDigestArrayBlock b -> a.equalsAfterTypeCheck(b); + }; + }; + } + + void serializeTDigest(int valueIndex, SerializedTDigestOutput out, BytesRef scratch); + + /** + * Builder for {@link TDigestBlock} + */ + sealed interface Builder extends Block.Builder, BlockLoader.TDigestBuilder permits TDigestBlockBuilder { + + /** + * Copy the values in {@code block} from the given position into this builder. + */ + TDigestBlock.Builder copyFrom(TDigestBlock block, int position); + + @Override + TDigestBlock build(); + } + + TDigestHolder getTDigestHolder(int offset); + + interface SerializedTDigestOutput { + void appendDouble(double value); + + void appendLong(long value); + + void appendBytesRef(BytesRef bytesRef); + } + + interface SerializedTDigestInput { + double readDouble(); + + long readLong(); + + BytesRef readBytesRef(BytesRef scratch); + } + +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestBlockBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestBlockBuilder.java new file mode 100644 index 0000000000000..b097e9f070b5a --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestBlockBuilder.java @@ -0,0 +1,184 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0.
+ */ + +package org.elasticsearch.compute.data; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.index.mapper.BlockLoader; + +public final class TDigestBlockBuilder implements TDigestBlock.Builder { + + private final BytesRefBlock.Builder encodedDigestsBuilder; + private final DoubleBlock.Builder minimaBuilder; + private final DoubleBlock.Builder maximaBuilder; + private final DoubleBlock.Builder sumsBuilder; + private final LongBlock.Builder valueCountsBuilder; + + private final BytesRef scratch = new BytesRef(); + + public TDigestBlockBuilder(int size, BlockFactory blockFactory) { + BytesRefBlock.Builder encodedDigestsBuilder = null; + DoubleBlock.Builder minimaBuilder = null; + DoubleBlock.Builder maximaBuilder = null; + DoubleBlock.Builder sumsBuilder = null; + LongBlock.Builder valueCountsBuilder = null; + boolean success = false; + try { + encodedDigestsBuilder = blockFactory.newBytesRefBlockBuilder(size); + minimaBuilder = blockFactory.newDoubleBlockBuilder(size); + maximaBuilder = blockFactory.newDoubleBlockBuilder(size); + sumsBuilder = blockFactory.newDoubleBlockBuilder(size); + valueCountsBuilder = blockFactory.newLongBlockBuilder(size); + this.encodedDigestsBuilder = encodedDigestsBuilder; + this.minimaBuilder = minimaBuilder; + this.maximaBuilder = maximaBuilder; + this.sumsBuilder = sumsBuilder; + this.valueCountsBuilder = valueCountsBuilder; + success = true; + } finally { + if (success == false) { + Releasables.close(encodedDigestsBuilder, minimaBuilder, maximaBuilder, sumsBuilder, valueCountsBuilder); + } + } + } + + @Override + public TDigestBlockBuilder copyFrom(Block block, int beginInclusive, int endExclusive) { + if (block.areAllValuesNull()) { + for (int i = beginInclusive; i < endExclusive; i++) { + appendNull(); + } + } else { + TDigestArrayBlock digestBlock = (TDigestArrayBlock) block; + digestBlock.copyInto( + encodedDigestsBuilder, + minimaBuilder, + maximaBuilder, + sumsBuilder, + valueCountsBuilder, + beginInclusive, + endExclusive + ); + } + return this; + } + + @Override + public TDigestBlock.Builder copyFrom(TDigestBlock block, int position) { + copyFrom(block, position, position + 1); + return this; + } + + @Override + public Block.Builder appendNull() { + encodedDigestsBuilder.appendNull(); + minimaBuilder.appendNull(); + maximaBuilder.appendNull(); + sumsBuilder.appendNull(); + valueCountsBuilder.appendNull(); + return this; + } + + @Override + public Block.Builder beginPositionEntry() { + throw new UnsupportedOperationException("TDigest Blocks do not support multi-values"); + } + + @Override + public Block.Builder endPositionEntry() { + throw new UnsupportedOperationException("TDigest Blocks do not support multi-values"); + } + + @Override + public Block.Builder mvOrdering(Block.MvOrdering mvOrdering) { + assert mvOrdering == Block.MvOrdering.UNORDERED : "TDigests don't have a natural order, so it doesn't make sense to call this"; + return this; + } + + @Override + public long estimatedBytes() { + return encodedDigestsBuilder.estimatedBytes() + minimaBuilder.estimatedBytes() + maximaBuilder.estimatedBytes() + sumsBuilder + .estimatedBytes() + valueCountsBuilder.estimatedBytes(); + } + + @Override + public TDigestBlock build() { + DoubleBlock minima = null; + DoubleBlock maxima = null; + DoubleBlock sums = null; + LongBlock valueCounts = null; + BytesRefBlock encodedDigests = null; + boolean success = false; + try { + minima = minimaBuilder.build(); + maxima = maximaBuilder.build(); + sums = 
sumsBuilder.build(); + valueCounts = valueCountsBuilder.build(); + encodedDigests = encodedDigestsBuilder.build(); + success = true; + return new TDigestArrayBlock(encodedDigests, minima, maxima, sums, valueCounts); + } finally { + if (success == false) { + Releasables.close(minima, maxima, sums, valueCounts, encodedDigests); + } + } + } + + @Override + public BlockLoader.DoubleBuilder minima() { + return minimaBuilder; + } + + @Override + public BlockLoader.DoubleBuilder maxima() { + return maximaBuilder; + } + + @Override + public BlockLoader.DoubleBuilder sums() { + return sumsBuilder; + } + + @Override + public BlockLoader.LongBuilder valueCounts() { + return valueCountsBuilder; + } + + @Override + public BlockLoader.BytesRefBuilder encodedDigests() { + return encodedDigestsBuilder; + } + + @Override + public void close() { + Releasables.close(encodedDigestsBuilder, minimaBuilder, maximaBuilder, sumsBuilder, valueCountsBuilder); + } + + public void append(TDigestHolder val) { + encodedDigestsBuilder.appendBytesRef(val.getEncodedDigest()); + minimaBuilder.appendDouble(val.getMin()); + maximaBuilder.appendDouble(val.getMax()); + sumsBuilder.appendDouble(val.getSum()); + valueCountsBuilder.appendLong(val.getValueCount()); + } + + public void deserializeAndAppend(TDigestBlock.SerializedTDigestInput input) { + long valueCount = input.readLong(); + valueCountsBuilder.appendLong(valueCount); + if (valueCount > 0) { + sumsBuilder.appendDouble(input.readDouble()); + minimaBuilder.appendDouble(input.readDouble()); + maximaBuilder.appendDouble(input.readDouble()); + } else { + sumsBuilder.appendNull(); + minimaBuilder.appendNull(); + maximaBuilder.appendNull(); + } + encodedDigestsBuilder.appendBytesRef(input.readBytesRef(scratch)); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestHolder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestHolder.java new file mode 100644 index 0000000000000..0ce6b6fcffa6a --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestHolder.java @@ -0,0 +1,101 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.data; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.io.stream.BytesStreamOutput; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +/** + * This exists to hold the values from a {@link TDigestBlock}. It is roughly parallel to + * {@link org.elasticsearch.search.aggregations.metrics.TDigestState} in classic aggregations, which we are not using directly because + * the serialization format is pretty bad for ESQL's use case (specifically, encoding the near-constant compression and merge strategy + * data inline as opposed to in a dedicated column isn't great). 
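+ *
+ * The encoded digest bytes keep the doc-values layout written by TDigestFieldMapper: one (vlong count, double centroid)
+ * pair per centroid with a non-zero count, in ascending centroid order (see encodeCentroidsAndCounts below).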
+ */ +public class TDigestHolder { + + private final double min; + private final double max; + private final double sum; + private final long valueCount; + private final BytesRef encodedDigest; + + // TODO - Deal with the empty array case better + public TDigestHolder(BytesRef encodedDigest, double min, double max, double sum, long valueCount) { + this.encodedDigest = encodedDigest; + this.min = min; + this.max = max; + this.sum = sum; + this.valueCount = valueCount; + } + + public TDigestHolder(List<Double> centroids, List<Long> counts, double min, double max, double sum, long valueCount) + throws IOException { + this(encodeCentroidsAndCounts(centroids, counts), min, max, sum, valueCount); + } + + @Override + public boolean equals(Object o) { + if ((o instanceof TDigestHolder that)) { + return Double.compare(min, that.min) == 0 + && Double.compare(max, that.max) == 0 + && Double.compare(sum, that.sum) == 0 + && valueCount == that.valueCount + && Objects.equals(encodedDigest, that.encodedDigest); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(min, max, sum, valueCount, encodedDigest); + } + + private static BytesRef encodeCentroidsAndCounts(List<Double> centroids, List<Long> counts) throws IOException { + // TODO: This is copied from the method of the same name in TDigestFieldMapper. It would be nice to find a way to reuse that code + BytesStreamOutput streamOutput = new BytesStreamOutput(); + + for (int i = 0; i < centroids.size(); i++) { + long count = counts.get(i); + assert count >= 0; + // we do not add elements with count == 0 + if (count > 0) { + streamOutput.writeVLong(count); + streamOutput.writeDouble(centroids.get(i)); + } + } + + return streamOutput.bytes().toBytesRef(); + } + + public BytesRef getEncodedDigest() { + return encodedDigest; + } + + // TODO - compute these if they're not given? or do that at object creation time, maybe.
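+ // A sketch of how the encoded digest could be read back, assuming the (vlong count, double centroid) layout from
+ // encodeCentroidsAndCounts above; the StreamInput calls are standard Elasticsearch APIs, the loop itself is illustrative:
+ //
+ //   StreamInput in = new BytesArray(encodedDigest).streamInput();
+ //   while (in.available() > 0) {
+ //       long count = in.readVLong();
+ //       double centroid = in.readDouble();
+ //       // feed (centroid, count) into whatever consumes the digest
+ //   }
+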
+ public double getMax() { + return max; + } + + public double getMin() { + return min; + } + + public double getSum() { + return sum; + } + + public long getValueCount() { + return valueCount; + } + +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java index c42946ed71777..975d85c55e6af 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java @@ -189,6 +189,7 @@ public static IntFunction createBlockValueReader(Block block) { case COMPOSITE -> throw new IllegalArgumentException("can't read values from [composite] block"); case AGGREGATE_METRIC_DOUBLE -> throw new IllegalArgumentException("can't read values from [aggregate metric double] block"); case EXPONENTIAL_HISTOGRAM -> throw new IllegalArgumentException("can't read values from [exponential histogram] block"); + case TDIGEST -> throw new IllegalArgumentException("can't read values from [tdigest] block"); case UNKNOWN -> throw new IllegalArgumentException("can't read values from [" + block + "]"); }; } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilder.java index cff31535f82e0..b51578baff5d9 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilder.java @@ -56,6 +56,7 @@ static ResultBuilder resultBuilderFor( case DOC -> new ResultBuilderForDoc(blockFactory, (DocVectorEncoder) encoder, positions); case AGGREGATE_METRIC_DOUBLE -> new ResultBuilderForAggregateMetricDouble(blockFactory, positions); case EXPONENTIAL_HISTOGRAM -> new ResultBuilderForExponentialHistogram(blockFactory, positions); + case TDIGEST -> new ResultBuilderForTDigest(blockFactory, positions); default -> { assert false : "Result builder for [" + elementType + "]"; throw new UnsupportedOperationException("Result builder for [" + elementType + "]"); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForTDigest.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForTDigest.java new file mode 100644 index 0000000000000..e4e2cc0d6472c --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForTDigest.java @@ -0,0 +1,74 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
*/ + +package org.elasticsearch.compute.operator.topn; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.TDigestBlock; +import org.elasticsearch.compute.data.TDigestBlockBuilder; + +public class ResultBuilderForTDigest implements ResultBuilder { + private final TDigestBlockBuilder builder; + private final ResultBuilderForTDigest.ReusableTopNEncoderInput reusableInput = new ReusableTopNEncoderInput(); + + ResultBuilderForTDigest(BlockFactory blockFactory, int positions) { + this.builder = blockFactory.newTDigestBlockBuilder(positions); + } + + @Override + public void decodeKey(BytesRef keys) { + throw new AssertionError("TDigest can't be a key"); + } + + @Override + public void decodeValue(BytesRef values) { + int count = TopNEncoder.DEFAULT_UNSORTABLE.decodeVInt(values); + if (count == 0) { + builder.appendNull(); + return; + } + assert count == 1 : "TDigest does not support multi values"; + reusableInput.inputValues = values; + builder.deserializeAndAppend(reusableInput); + } + + @Override + public Block build() { + return builder.build(); + } + + @Override + public String toString() { + return "ResultBuilderForTDigest"; + } + + @Override + public void close() { + builder.close(); + } + + private static final class ReusableTopNEncoderInput implements TDigestBlock.SerializedTDigestInput { + BytesRef inputValues; + + @Override + public double readDouble() { + return TopNEncoder.DEFAULT_UNSORTABLE.decodeDouble(inputValues); + } + + @Override + public long readLong() { + return TopNEncoder.DEFAULT_UNSORTABLE.decodeLong(inputValues); + } + + @Override + public BytesRef readBytesRef(BytesRef scratch) { + return TopNEncoder.DEFAULT_UNSORTABLE.decodeBytesRef(inputValues, scratch); + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractor.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractor.java index d5120cddcb761..708c218b2aaac 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractor.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractor.java @@ -18,6 +18,7 @@ import org.elasticsearch.compute.data.FloatBlock; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.TDigestBlock; import org.elasticsearch.compute.operator.BreakingBytesRefBuilder; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.RefCounted; @@ -55,6 +56,7 @@ static ValueExtractor extractorFor(ElementType elementType, TopNEncoder encoder, case DOC -> new ValueExtractorForDoc(encoder, ((DocBlock) block).asVector()); case AGGREGATE_METRIC_DOUBLE -> new ValueExtractorForAggregateMetricDouble(encoder, (AggregateMetricDoubleBlock) block); case EXPONENTIAL_HISTOGRAM -> new ValueExtractorForExponentialHistogram(encoder, (ExponentialHistogramBlock) block); + case TDIGEST -> new ValueExtractorForTDigest(encoder, (TDigestBlock) block); default -> { assert false : "No value extractor for [" + block.elementType() + "]"; throw new UnsupportedOperationException("No value extractor for [" + block.elementType() + "]"); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractorForTDigest.java
b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractorForTDigest.java new file mode 100644 index 0000000000000..876c34f2ce1fc --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractorForTDigest.java @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.operator.topn; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.compute.data.TDigestBlock; +import org.elasticsearch.compute.operator.BreakingBytesRefBuilder; + +public class ValueExtractorForTDigest implements ValueExtractor { + private final TDigestBlock block; + + private final BytesRef scratch = new BytesRef(); + private final ReusableTopNEncoderOutput reusableOutput = new ReusableTopNEncoderOutput(); + + ValueExtractorForTDigest(TopNEncoder encoder, TDigestBlock block) { + assert encoder == TopNEncoder.DEFAULT_UNSORTABLE; + this.block = block; + } + + @Override + public void writeValue(BreakingBytesRefBuilder values, int position) { + // number of multi-values first for compatibility with ValueExtractorForNull + if (block.isNull(position)) { + TopNEncoder.DEFAULT_UNSORTABLE.encodeVInt(0, values); + } else { + assert block.getValueCount(position) == 1 : "Multi-valued TDigest blocks are not supported in TopN"; + TopNEncoder.DEFAULT_UNSORTABLE.encodeVInt(1, values); + int valueIndex = block.getFirstValueIndex(position); + reusableOutput.target = values; + block.serializeTDigest(valueIndex, reusableOutput, scratch); + } + } + + @Override + public String toString() { + return "ValueExtractorForTDigest"; + } + + private static final class ReusableTopNEncoderOutput implements TDigestBlock.SerializedTDigestOutput { + BreakingBytesRefBuilder target; + + @Override + public void appendDouble(double value) { + TopNEncoder.DEFAULT_UNSORTABLE.encodeDouble(value, target); + } + + @Override + public void appendLong(long value) { + TopNEncoder.DEFAULT_UNSORTABLE.encodeLong(value, target); + } + + @Override + public void appendBytesRef(BytesRef value) { + TopNEncoder.DEFAULT_UNSORTABLE.encodeBytesRef(value, target); + } + } +} diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderCopyFromTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderCopyFromTests.java index 752250bce5643..f02b3c028aa58 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderCopyFromTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderCopyFromTests.java @@ -31,7 +31,8 @@ public static List params() { || e == ElementType.NULL || e == ElementType.DOC || e == ElementType.COMPOSITE - || e == ElementType.EXPONENTIAL_HISTOGRAM) { + || e == ElementType.EXPONENTIAL_HISTOGRAM + || e == ElementType.TDIGEST) { continue; } for (boolean nullAllowed : new boolean[] { false, true }) { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderTests.java index 45f452d7ca188..d5806991d7fa3 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderTests.java
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderTests.java @@ -43,7 +43,7 @@ public static List params() { private static boolean supportsVectors(ElementType type) { return switch (type) { - case AGGREGATE_METRIC_DOUBLE, EXPONENTIAL_HISTOGRAM -> false; + case AGGREGATE_METRIC_DOUBLE, EXPONENTIAL_HISTOGRAM, TDIGEST -> false; default -> true; }; } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java index a01ce9664110f..a62c334916189 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java @@ -50,6 +50,7 @@ public static List params() { || e == ElementType.DOC || e == ElementType.COMPOSITE || e == ElementType.EXPONENTIAL_HISTOGRAM // TODO(b/133393): Enable tests once the block supports lookup + || e == ElementType.TDIGEST || e == ElementType.AGGREGATE_METRIC_DOUBLE) { continue; } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorBuilderTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorBuilderTests.java index 91ec105f20091..5df8330034ed2 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorBuilderTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorBuilderTests.java @@ -35,6 +35,7 @@ public static List params() { || e == ElementType.DOC || e == ElementType.COMPOSITE || e == ElementType.AGGREGATE_METRIC_DOUBLE + || e == ElementType.TDIGEST || e == ElementType.EXPONENTIAL_HISTOGRAM) { continue; } @@ -119,7 +120,8 @@ public void testCranky() { private Vector.Builder vectorBuilder(int estimatedSize, BlockFactory blockFactory) { return switch (elementType) { - case NULL, DOC, COMPOSITE, AGGREGATE_METRIC_DOUBLE, EXPONENTIAL_HISTOGRAM, UNKNOWN -> throw new UnsupportedOperationException(); + case NULL, DOC, COMPOSITE, AGGREGATE_METRIC_DOUBLE, EXPONENTIAL_HISTOGRAM, TDIGEST, UNKNOWN -> + throw new UnsupportedOperationException(); case BOOLEAN -> blockFactory.newBooleanVectorBuilder(estimatedSize); case BYTES_REF -> blockFactory.newBytesRefVectorBuilder(estimatedSize); case FLOAT -> blockFactory.newFloatVectorBuilder(estimatedSize); @@ -131,7 +133,7 @@ private Vector.Builder vectorBuilder(int estimatedSize, BlockFactory blockFactor private void fill(Vector.Builder builder, Vector from) { switch (elementType) { - case NULL, DOC, COMPOSITE, AGGREGATE_METRIC_DOUBLE, UNKNOWN -> throw new UnsupportedOperationException(); + case NULL, DOC, COMPOSITE, AGGREGATE_METRIC_DOUBLE, TDIGEST, UNKNOWN -> throw new UnsupportedOperationException(); case BOOLEAN -> { for (int p = 0; p < from.getPositionCount(); p++) { ((BooleanVector.Builder) builder).appendBoolean(((BooleanVector) from).getBoolean(p)); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorFixedBuilderTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorFixedBuilderTests.java index b72ee46c4b828..5c49f9e1913fa 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorFixedBuilderTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorFixedBuilderTests.java @@ -35,6 +35,7 @@ public static List 
params() { || elementType == ElementType.DOC || elementType == ElementType.BYTES_REF || elementType == ElementType.AGGREGATE_METRIC_DOUBLE + || elementType == ElementType.TDIGEST || elementType == ElementType.EXPONENTIAL_HISTOGRAM) { continue; } @@ -119,7 +120,7 @@ public void testCranky() { private Vector.Builder vectorBuilder(int size, BlockFactory blockFactory) { return switch (elementType) { - case NULL, BYTES_REF, DOC, COMPOSITE, AGGREGATE_METRIC_DOUBLE, EXPONENTIAL_HISTOGRAM, UNKNOWN -> + case NULL, BYTES_REF, DOC, COMPOSITE, AGGREGATE_METRIC_DOUBLE, EXPONENTIAL_HISTOGRAM, TDIGEST, UNKNOWN -> throw new UnsupportedOperationException(); case BOOLEAN -> blockFactory.newBooleanVectorFixedBuilder(size); case DOUBLE -> blockFactory.newDoubleVectorFixedBuilder(size); @@ -131,7 +132,8 @@ private Vector.Builder vectorBuilder(int size, BlockFactory blockFactory) { private void fill(Vector.Builder builder, Vector from) { switch (elementType) { - case NULL, DOC, COMPOSITE, AGGREGATE_METRIC_DOUBLE, EXPONENTIAL_HISTOGRAM, UNKNOWN -> throw new UnsupportedOperationException(); + case NULL, DOC, COMPOSITE, AGGREGATE_METRIC_DOUBLE, EXPONENTIAL_HISTOGRAM, TDIGEST, UNKNOWN -> + throw new UnsupportedOperationException(); case BOOLEAN -> { for (int p = 0; p < from.getPositionCount(); p++) { ((BooleanVector.FixedBuilder) builder).appendBoolean(((BooleanVector) from).getBoolean(p)); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/RightChunkedLeftJoinTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/RightChunkedLeftJoinTests.java index 6ad3cab9f1881..1a1c7748cde3e 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/RightChunkedLeftJoinTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/RightChunkedLeftJoinTests.java @@ -230,13 +230,13 @@ private void testRandom(BlockFactory factory) { 1, 10, ElementType[]::new, - () -> RandomBlock.randomElementExcluding(List.of(ElementType.AGGREGATE_METRIC_DOUBLE)) + () -> RandomBlock.randomElementExcluding(List.of(ElementType.AGGREGATE_METRIC_DOUBLE, ElementType.TDIGEST)) ); ElementType[] rightColumns = randomArray( 1, 10, ElementType[]::new, - () -> RandomBlock.randomElementExcluding(List.of(ElementType.AGGREGATE_METRIC_DOUBLE)) + () -> RandomBlock.randomElementExcluding(List.of(ElementType.AGGREGATE_METRIC_DOUBLE, ElementType.TDIGEST)) ); RandomPage left = randomPage(factory, leftColumns, leftSize); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/mvdedupe/MultivalueDedupeTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/mvdedupe/MultivalueDedupeTests.java index 73efcec997eef..8dddbec11bb7e 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/mvdedupe/MultivalueDedupeTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/mvdedupe/MultivalueDedupeTests.java @@ -65,7 +65,8 @@ public static List supportedTypes() { ElementType.COMPOSITE, ElementType.FLOAT, ElementType.AGGREGATE_METRIC_DOUBLE, - ElementType.EXPONENTIAL_HISTOGRAM + ElementType.EXPONENTIAL_HISTOGRAM, + ElementType.TDIGEST )) { continue; } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/ExtractorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/ExtractorTests.java index 
60b11e5a290e8..9a4abb824fc86 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/ExtractorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/ExtractorTests.java @@ -130,7 +130,7 @@ public static Iterable<Object[]> parameters() { ) } ); } - case EXPONENTIAL_HISTOGRAM -> + case TDIGEST, EXPONENTIAL_HISTOGRAM -> // multi values are not supported cases.add(valueTestCase("single " + e, e, TopNEncoder.DEFAULT_UNSORTABLE, () -> BlockTestUtils.randomValue(e))); case NULL -> { @@ -211,7 +211,8 @@ public void testNotInKey() { result.decodeValue(values); assertThat(values.length, equalTo(0)); - assertThat(result.build(), equalTo(value)); + Block resultBlock = result.build(); + assertThat(resultBlock, equalTo(value)); } public void testInKey() { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java index 72020925d0faa..28c44113f15a2 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java @@ -82,6 +82,7 @@ import static org.elasticsearch.compute.data.ElementType.FLOAT; import static org.elasticsearch.compute.data.ElementType.INT; import static org.elasticsearch.compute.data.ElementType.LONG; +import static org.elasticsearch.compute.data.ElementType.TDIGEST; import static org.elasticsearch.compute.operator.topn.TopNEncoder.DEFAULT_SORTABLE; import static org.elasticsearch.compute.operator.topn.TopNEncoder.DEFAULT_UNSORTABLE; import static org.elasticsearch.compute.operator.topn.TopNEncoder.UTF8; @@ -535,7 +536,7 @@ public void testCollectAllValues() { encoders.add(DEFAULT_SORTABLE); for (ElementType e : ElementType.values()) { - if (e == ElementType.UNKNOWN || e == COMPOSITE || e == EXPONENTIAL_HISTOGRAM) { + if (e == ElementType.UNKNOWN || e == COMPOSITE || e == EXPONENTIAL_HISTOGRAM || e == TDIGEST) { continue; } elementTypes.add(e); @@ -606,7 +607,7 @@ public void testCollectAllValues_RandomMultiValues() { for (int type = 0; type < blocksCount; type++) { ElementType e = randomFrom(ElementType.values()); - if (e == ElementType.UNKNOWN || e == COMPOSITE || e == AGGREGATE_METRIC_DOUBLE || e == EXPONENTIAL_HISTOGRAM) { + if (e == ElementType.UNKNOWN || e == COMPOSITE || e == AGGREGATE_METRIC_DOUBLE || e == EXPONENTIAL_HISTOGRAM || e == TDIGEST) { continue; } elementTypes.add(e); @@ -1042,7 +1043,8 @@ public void testRandomMultiValuesTopN() { || t == ElementType.DOC || t == COMPOSITE || t == AGGREGATE_METRIC_DOUBLE - || t == EXPONENTIAL_HISTOGRAM, + || t == EXPONENTIAL_HISTOGRAM + || t == TDIGEST, () -> randomFrom(ElementType.values()) ); elementTypes.add(e); diff --git a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java index 9f0a0be60d5f7..a1a1688dd4e64 100644 --- a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java +++ b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java @@ -27,14 +27,19 @@ import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.OrdinalBytesRefBlock; import org.elasticsearch.compute.data.Page; +import
org.elasticsearch.compute.data.TDigestBlock; +import org.elasticsearch.compute.data.TDigestHolder; import org.elasticsearch.core.Releasables; import org.elasticsearch.exponentialhistogram.ExponentialHistogram; import org.elasticsearch.exponentialhistogram.ExponentialHistogramBuilder; import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker; import org.elasticsearch.exponentialhistogram.ReleasableExponentialHistogram; import org.elasticsearch.exponentialhistogram.ZeroBucket; +import org.elasticsearch.search.aggregations.metrics.TDigestState; +import org.elasticsearch.tdigest.Centroid; import org.hamcrest.Matcher; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -45,9 +50,11 @@ import static org.elasticsearch.compute.data.BlockUtils.toJavaObject; import static org.elasticsearch.test.ESTestCase.between; +import static org.elasticsearch.test.ESTestCase.fail; import static org.elasticsearch.test.ESTestCase.randomBoolean; import static org.elasticsearch.test.ESTestCase.randomDouble; import static org.elasticsearch.test.ESTestCase.randomFloat; +import static org.elasticsearch.test.ESTestCase.randomGaussianDouble; import static org.elasticsearch.test.ESTestCase.randomInt; import static org.elasticsearch.test.ESTestCase.randomIntBetween; import static org.elasticsearch.test.ESTestCase.randomLong; @@ -80,6 +87,7 @@ public static Object randomValue(ElementType e) { between(0, Integer.MAX_VALUE) ); case EXPONENTIAL_HISTOGRAM -> randomExponentialHistogram(); + case TDIGEST -> randomTDigest(); case NULL -> null; case COMPOSITE -> throw new IllegalArgumentException("can't make random values for composite"); case UNKNOWN -> throw new IllegalArgumentException("can't make random values for [" + e + "]"); @@ -320,6 +328,7 @@ public static List<List<Object>> valuesAtPositions(Block block, int from, int to i++, new ExponentialHistogramScratch() ); + case TDIGEST -> ((TDigestBlock) block).getTDigestHolder(i++); default -> throw new IllegalArgumentException("unsupported element type [" + block.elementType() + "]"); }); } @@ -414,6 +423,39 @@ public static ExponentialHistogram randomExponentialHistogram() { return histo; } + public static TDigestHolder randomTDigest() { + // TODO: This is mostly copied from TDigestFieldMapperTests; refactor it.
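+ // The digest built below is flattened into parallel centroid/count lists plus the
+ // min/max/sum/value-count summary that TDigestHolder carries.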
+ int size = between(1, 100); + // Note - we use TDigestState to build an actual t-digest for realistic values here + TDigestState digest = TDigestState.createWithoutCircuitBreaking(100); + for (int i = 0; i < size; i++) { + double sample = randomGaussianDouble(); + int count = randomIntBetween(1, Integer.MAX_VALUE); + digest.add(sample, count); + } + List<Double> centroids = new ArrayList<>(); + List<Long> counts = new ArrayList<>(); + double sum = 0.0; + long valueCount = 0L; + for (Centroid c : digest.centroids()) { + centroids.add(c.mean()); + counts.add(c.count()); + sum += c.mean() * c.count(); + valueCount += c.count(); + } + double min = digest.getMin(); + double max = digest.getMax(); + + TDigestHolder returnValue = null; + try { + returnValue = new TDigestHolder(centroids, counts, min, max, sum, valueCount); + } catch (IOException e) { + // This is a test util, so we're just going to fail the test here + fail(e); + } + return returnValue; + } + private static int dedupe(Map<BytesRef, Integer> dedupe, BytesRefVector.Builder bytes, BytesRef v) { Integer current = dedupe.get(v); if (current != null) { diff --git a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/RandomBlock.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/RandomBlock.java index 8281a07aa4df2..eff939b4554fa 100644 --- a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/RandomBlock.java +++ b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/RandomBlock.java @@ -19,6 +19,8 @@ import org.elasticsearch.compute.data.FloatBlock; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.TDigestBlockBuilder; +import org.elasticsearch.compute.data.TDigestHolder; import org.elasticsearch.exponentialhistogram.ExponentialHistogram; import org.elasticsearch.geo.GeometryTestUtils; import org.elasticsearch.geo.ShapeTestUtils; @@ -89,7 +91,7 @@ public static RandomBlock randomBlock( ) { List<List<Object>> values = new ArrayList<>(); Block.MvOrdering mvOrdering = Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING; - if (elementType == ElementType.EXPONENTIAL_HISTOGRAM) { + if (elementType == ElementType.EXPONENTIAL_HISTOGRAM || elementType == ElementType.TDIGEST) { // histograms do not support multi-values // TODO(b/133393) remove this when we support multi-values in exponential histogram blocks minValuesPerPosition = Math.min(1, minValuesPerPosition); @@ -172,6 +174,12 @@ public static RandomBlock randomBlock( b.append(histogram); valuesAtPosition.add(histogram); } + case TDIGEST -> { + TDigestBlockBuilder b = (TDigestBlockBuilder) builder; + TDigestHolder digest = BlockTestUtils.randomTDigest(); + b.append(digest); + valuesAtPosition.add(digest); + } default -> throw new IllegalArgumentException("unsupported element type [" + elementType + "]"); } } diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index 8bcc45c3ff406..31413ea06d5cc 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -372,4 +372,20 @@ protected boolean supportsExponentialHistograms() { throw new
RuntimeException(e); } } + + @Override + protected boolean supportsTDigestField() { + try { + return RestEsqlTestCase.hasCapabilities( + client(), + List.of(EsqlCapabilities.Cap.TDIGEST_FIELD_TYPE_BASIC_FUNCTIONALITY.capabilityName()) + ) + && RestEsqlTestCase.hasCapabilities( + remoteClusterClient(), + List.of(EsqlCapabilities.Cap.TDIGEST_FIELD_TYPE_BASIC_FUNCTIONALITY.capabilityName()) + ); + } catch (IOException e) { + throw new RuntimeException(e); + } + } } diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlSpecIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlSpecIT.java index 0469741b935b0..5e303e7382547 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlSpecIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlSpecIT.java @@ -62,6 +62,14 @@ protected boolean supportsExponentialHistograms() { ); } + @Override + protected boolean supportsTDigestField() { + return RestEsqlTestCase.hasCapabilities( + client(), + List.of(EsqlCapabilities.Cap.TDIGEST_FIELD_TYPE_BASIC_FUNCTIONALITY.capabilityName()) + ); + } + @Before public void configureChunks() throws IOException { assumeTrue("test clusters were broken", testClustersOk); diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlSpecTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlSpecTestCase.java index 40cca58a481f8..b18880968aa72 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlSpecTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlSpecTestCase.java @@ -184,7 +184,8 @@ public void setup() { supportsSourceFieldMapping(), supportsInferenceTestService(), false, - supportsExponentialHistograms() + supportsExponentialHistograms(), + supportsTDigestField() ); return null; }); @@ -293,6 +294,13 @@ protected boolean supportsExponentialHistograms() { ); } + protected boolean supportsTDigestField() { + return RestEsqlTestCase.hasCapabilities( + client(), + List.of(EsqlCapabilities.Cap.TDIGEST_FIELD_TYPE_BASIC_FUNCTIONALITY.capabilityName()) + ); + } + protected void doTest() throws Throwable { doTest(testCase.query); } diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeRestTest.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeRestTest.java index c5e81a42e0616..24a9c88a026fe 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeRestTest.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeRestTest.java @@ -267,7 +267,7 @@ private static List<String> originalTypes(Map<String, Object> x) { } private List<String> availableIndices() throws IOException { - return availableDatasetsForEs(true, supportsSourceFieldMapping(), false, requiresTimeSeries(), false).stream() + return availableDatasetsForEs(true, supportsSourceFieldMapping(), false, requiresTimeSeries(), false, false).stream() .filter(x -> x.requiresInferenceEndpoint() == false) .map(x -> x.indexName()) .toList(); } diff --git
a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java index e50c23b187327..30bead4b0358b 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.time.DateFormatters; import org.elasticsearch.common.time.DateUtils; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.xcontent.XContentParserUtils; import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; @@ -22,6 +23,7 @@ import org.elasticsearch.compute.data.BlockUtils.BuilderWrapper; import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.data.TDigestHolder; import org.elasticsearch.core.Booleans; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; @@ -32,8 +34,10 @@ import org.elasticsearch.exponentialhistogram.ExponentialHistogramXContent; import org.elasticsearch.geometry.utils.Geohash; import org.elasticsearch.h3.H3; +import org.elasticsearch.index.mapper.DocumentParsingException; import org.elasticsearch.logging.Logger; import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils; +import org.elasticsearch.tdigest.parsing.TDigestParser; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.XContentParserConfiguration; @@ -512,6 +516,7 @@ public enum Type { ), DENSE_VECTOR(Float::parseFloat, Float.class, false), EXPONENTIAL_HISTOGRAM(CsvTestUtils::parseExponentialHistogram, ExponentialHistogram.class), + TDIGEST(CsvTestUtils::parseTDigest, TDigestHolder.class), UNSUPPORTED(Type::convertUnsupported, Void.class); private static Void convertUnsupported(String s) { @@ -608,6 +613,7 @@ public static Type asType(ElementType elementType, Type actualType) { case COMPOSITE -> throw new IllegalArgumentException("can't assert on composite blocks"); case AGGREGATE_METRIC_DOUBLE -> AGGREGATE_METRIC_DOUBLE; case EXPONENTIAL_HISTOGRAM -> EXPONENTIAL_HISTOGRAM; + case TDIGEST -> TDIGEST; case UNKNOWN -> throw new IllegalArgumentException("Unknown block types cannot be handled"); }; } @@ -733,4 +739,25 @@ private static ExponentialHistogram parseExponentialHistogram(@Nullable String j throw new IllegalArgumentException(e); } } + + private static TDigestHolder parseTDigest(@Nullable String json) { + if (json == null) { + return null; + } + try (XContentParser parser = JsonXContent.jsonXContent.createParser(XContentParserConfiguration.EMPTY, json)) { + if (parser.nextToken() != XContentParser.Token.START_OBJECT) { + throw new IllegalArgumentException("Expected START_OBJECT but found: " + parser.currentToken()); + } + parser.nextToken(); + TDigestParser.ParsedTDigest parsed = TDigestParser.parse( + "field from test data", + parser, + DocumentParsingException::new, + XContentParserUtils::parsingException + ); + return new TDigestHolder(parsed.centroids(), parsed.counts(), parsed.min(), parsed.max(), parsed.sum(), parsed.count()); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + } } diff --git 
a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java index c6ba01782e10f..16459f53c6a4a 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java @@ -179,6 +179,7 @@ public class CsvTestsDataLoader { "exp_histo_sample-mappings.json", "exp_histo_sample.csv" ).withSetting("exp_histo_sample-settings.json"); + private static final TestDataset TDIGEST_STANDARD_INDEX = new TestDataset("tdigest_standard_index"); public static final Map<String, TestDataset> CSV_DATASET_MAP = Map.ofEntries( Map.entry(EMPLOYEES.indexName, EMPLOYEES), @@ -248,7 +249,8 @@ public class CsvTestsDataLoader { Map.entry(COLORS_CMYK_LOOKUP.indexName, COLORS_CMYK_LOOKUP), Map.entry(MULTI_COLUMN_JOINABLE.indexName, MULTI_COLUMN_JOINABLE), Map.entry(MULTI_COLUMN_JOINABLE_LOOKUP.indexName, MULTI_COLUMN_JOINABLE_LOOKUP), - Map.entry(EXP_HISTO_SAMPLE.indexName, EXP_HISTO_SAMPLE) + Map.entry(EXP_HISTO_SAMPLE.indexName, EXP_HISTO_SAMPLE), + Map.entry(TDIGEST_STANDARD_INDEX.indexName, TDIGEST_STANDARD_INDEX) ); private static final EnrichConfig LANGUAGES_ENRICH = new EnrichConfig("languages_policy", "enrich-policy-languages.json"); @@ -342,7 +344,7 @@ public static void main(String[] args) throws IOException { } try (RestClient client = builder.build()) { - loadDataSetIntoEs(client, true, true, false, false, true, (restClient, indexName, indexMapping, indexSettings) -> { + loadDataSetIntoEs(client, true, true, false, false, true, true, (restClient, indexName, indexMapping, indexSettings) -> { // don't use ESRestTestCase methods here or, if you do, test running the main method before making the change StringBuilder jsonBody = new StringBuilder("{"); if (indexSettings != null && indexSettings.isEmpty() == false) { @@ -366,7 +368,8 @@ public static Set<TestDataset> availableDatasetsForEs( boolean supportsSourceFieldMapping, boolean inferenceEnabled, boolean requiresTimeSeries, - boolean exponentialHistogramFieldSupported + boolean exponentialHistogramFieldSupported, + boolean tDigestFieldSupported ) throws IOException { Set<TestDataset> testDataSets = new HashSet<>(); @@ -375,7 +378,8 @@ public static Set<TestDataset> availableDatasetsForEs( && (supportsIndexModeLookup || isLookupDataset(dataset) == false) && (supportsSourceFieldMapping || isSourceMappingDataset(dataset) == false) && (requiresTimeSeries == false || isTimeSeries(dataset)) - && (exponentialHistogramFieldSupported || containsExponentialHistogramFields(dataset) == false)) { + && (exponentialHistogramFieldSupported || containsExponentialHistogramFields(dataset) == false) + && (tDigestFieldSupported || containsTDigestFields(dataset) == false)) { testDataSets.add(dataset); } } @@ -420,6 +424,27 @@ private static boolean containsExponentialHistogramFields(TestDataset dataset) t return false; } + private static boolean containsTDigestFields(TestDataset dataset) throws IOException { + if (dataset.mappingFileName() == null) { + return false; + } + String mappingJsonText = readTextFile(getResource("/" + dataset.mappingFileName())); + JsonNode mappingNode = new ObjectMapper().readTree(mappingJsonText); + JsonNode properties = mappingNode.get("properties"); + if (properties != null) { + for (var fieldWithValue : properties.properties()) { + JsonNode fieldProperties = fieldWithValue.getValue(); + if (fieldProperties != null) {
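+ // A dataset is considered t-digest-backed as soon as any top-level property in its
+ // mapping declares "type": "tdigest"; nested and multi-field mappings are not inspected.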
JsonNode typeNode = fieldProperties.get("type"); + if (typeNode != null && typeNode.asText().equals("tdigest")) { + return true; + } + } + } + } + return false; + } + private static boolean isTimeSeries(TestDataset dataset) throws IOException { Settings settings = dataset.readSettingsFile(); String mode = settings.get("index.mode"); @@ -432,7 +457,7 @@ public static void loadDataSetIntoEs( boolean supportsSourceFieldMapping, boolean inferenceEnabled ) throws IOException { - loadDataSetIntoEs(client, supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled, false, false); + loadDataSetIntoEs(client, supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled, false, false, false); } public static void loadDataSetIntoEs( @@ -441,7 +466,8 @@ public static void loadDataSetIntoEs( boolean supportsSourceFieldMapping, boolean inferenceEnabled, boolean timeSeriesOnly, - boolean exponentialHistogramFieldSupported + boolean exponentialHistogramFieldSupported, + boolean tDigestFieldSupported ) throws IOException { loadDataSetIntoEs( client, @@ -450,6 +476,7 @@ public static void loadDataSetIntoEs( inferenceEnabled, timeSeriesOnly, exponentialHistogramFieldSupported, + tDigestFieldSupported, (restClient, indexName, indexMapping, indexSettings) -> { ESRestTestCase.createIndex(restClient, indexName, indexSettings, indexMapping, null); } @@ -463,6 +490,7 @@ private static void loadDataSetIntoEs( boolean inferenceEnabled, boolean timeSeriesOnly, boolean exponentialHistogramFieldSupported, + boolean tDigestFieldSupported, IndexCreator indexCreator ) throws IOException { Logger logger = LogManager.getLogger(CsvTestsDataLoader.class); @@ -474,7 +502,8 @@ private static void loadDataSetIntoEs( supportsSourceFieldMapping, inferenceEnabled, timeSeriesOnly, - exponentialHistogramFieldSupported + exponentialHistogramFieldSupported, + tDigestFieldSupported )) { load(client, dataset, logger, indexCreator); loadedDatasets.add(dataset.indexName); diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/tdigest_standard_index.csv b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/tdigest_standard_index.csv new file mode 100644 index 0000000000000..34e7cbdb61e63 --- /dev/null +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/tdigest_standard_index.csv @@ -0,0 +1,2 @@ +@timestamp:date,instance:keyword,responseTime:tdigest +2025-01-01T00:00:00Z,hand-rolled,{"centroids":[0.1\,0.2\,0.3\,0.4\,0.5]\,"counts":[3\,7\,23\,12\,6]} diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-tdigest_standard_index.json b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-tdigest_standard_index.json new file mode 100644 index 0000000000000..9ed14cd36e622 --- /dev/null +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-tdigest_standard_index.json @@ -0,0 +1,13 @@ +{ + "properties": { + "@timestamp": { + "type": "date" + }, + "instance": { + "type": "keyword" + }, + "responseTime": { + "type": "tdigest" + } + } +} diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/tdigest.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/tdigest.csv-spec new file mode 100644 index 0000000000000..1ca152a4d76aa --- /dev/null +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/tdigest.csv-spec @@ -0,0 +1,9 @@ +Make sure we can even load tdigest data +required_capability: tdigest_field_type_basic_functionality + +FROM tdigest_standard_index | KEEP @timestamp,instance; + +@timestamp:date | instance:keyword 
+2025-01-01T00:00:00Z | hand-rolled + +; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index dc5e4e1bf2203..811442b6aca21 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -20,6 +20,7 @@ import java.util.Set; import static org.elasticsearch.xpack.esql.core.plugin.EsqlCorePlugin.EXPONENTIAL_HISTOGRAM_FEATURE_FLAG; +import static org.elasticsearch.xpack.esql.core.plugin.EsqlCorePlugin.T_DIGEST_ESQL_SUPPORT; /** * A {@link Set} of "capabilities" supported by the {@link RestEsqlQueryAction} @@ -1580,6 +1581,8 @@ public enum Cap { */ EXPONENTIAL_HISTOGRAM_PRE_TECH_PREVIEW_V7(EXPONENTIAL_HISTOGRAM_FEATURE_FLAG), + TDIGEST_FIELD_TYPE_BASIC_FUNCTIONALITY(T_DIGEST_ESQL_SUPPORT), + /** * Create new block when filtering OrdinalBytesRefBlock */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java index 3981b71f316b0..7c35cbc04aba1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java @@ -113,7 +113,7 @@ private static DataType toDataType(ElementType elementType) { case DOUBLE -> DataType.DOUBLE; case DOC -> DataType.DOC_DATA_TYPE; case EXPONENTIAL_HISTOGRAM -> DataType.EXPONENTIAL_HISTOGRAM; - case FLOAT, NULL, COMPOSITE, AGGREGATE_METRIC_DOUBLE, UNKNOWN -> throw new EsqlIllegalArgumentException( + case FLOAT, NULL, COMPOSITE, AGGREGATE_METRIC_DOUBLE, TDIGEST, UNKNOWN -> throw new EsqlIllegalArgumentException( "unsupported agg type: " + elementType ); };
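A note for context on the new tdigest CSV fixture: the summary values that travel with a t-digest are plain arithmetic over its centroids and counts. The sketch below is a standalone illustration, not code from this change; the class name TDigestSummarySketch is invented for the example, and the literals are copied from the single row in tdigest_standard_index.csv. The production path derives these values through TDigestParser and TDigestHolder instead.

```java
import java.util.List;

public class TDigestSummarySketch {
    public static void main(String[] args) {
        // The fixture row: {"centroids":[0.1,0.2,0.3,0.4,0.5],"counts":[3,7,23,12,6]}
        List<Double> centroids = List.of(0.1, 0.2, 0.3, 0.4, 0.5);
        List<Long> counts = List.of(3L, 7L, 23L, 12L, 6L);

        double sum = 0.0;
        long valueCount = 0L;
        for (int i = 0; i < centroids.size(); i++) {
            // Each centroid mean is weighted by the number of raw samples it absorbed.
            sum += centroids.get(i) * counts.get(i);
            valueCount += counts.get(i);
        }

        // Centroid means bound the true extremes only loosely: the smallest and largest
        // raw samples can lie beyond the outermost means, which is why TDigestParser also
        // accepts explicit "min" and "max" fields alongside "sum".
        double approxMin = centroids.get(0);
        double approxMax = centroids.get(centroids.size() - 1);

        // Prints: sum=16.40 count=51 approxMin=0.1 approxMax=0.5
        System.out.printf("sum=%.2f count=%d approxMin=%.1f approxMax=%.1f%n", sum, valueCount, approxMin, approxMax);
    }
}
```

The fixture row omits sum, min and max; as the sketch shows, the sum and total value count are fully determined by the centroid data, while the true extremes are only bounded by it.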