diff --git a/libs/tdigest/src/test/java/org/elasticsearch/tdigest/BigCountTests.java b/libs/tdigest/src/test/java/org/elasticsearch/tdigest/BigCountTests.java index ac39bf0f7e8b5..8cd7a40767363 100644 --- a/libs/tdigest/src/test/java/org/elasticsearch/tdigest/BigCountTests.java +++ b/libs/tdigest/src/test/java/org/elasticsearch/tdigest/BigCountTests.java @@ -21,10 +21,12 @@ package org.elasticsearch.tdigest; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + public abstract class BigCountTests extends TDigestTestCase { public void testBigMerge() { - try (TDigest digest = createDigest()) { + try (TDigest digest = createDigest(100)) { for (int i = 0; i < 5; i++) { try (TDigest digestToMerge = getDigest()) { digest.add(digestToMerge); @@ -35,13 +37,25 @@ public void testBigMerge() { } } + /** + * Verify that, at a range of compression values, the size of the produced digest is not much larger than 10 times the compression + */ + public void testCompression() { + for (int compression : new int[] { 100, 500, 1000, 10000 }) { + try (TDigest digest = createDigest(compression)) { + addData(digest); + assertThat("Compression = " + compression, digest.centroidCount(), lessThanOrEqualTo(compression * 10)); + } + } + } + private TDigest getDigest() { - TDigest digest = createDigest(); + TDigest digest = createDigest(100); addData(digest); return digest; } - public TDigest createDigest() { + public TDigest createDigest(int compression) { throw new IllegalStateException("Should have over-ridden createDigest"); } diff --git a/libs/tdigest/src/test/java/org/elasticsearch/tdigest/BigCountTestsMergingDigestTests.java b/libs/tdigest/src/test/java/org/elasticsearch/tdigest/BigCountTestsMergingDigestTests.java index 7a7094691fb95..caf2a43c27c01 100644 --- a/libs/tdigest/src/test/java/org/elasticsearch/tdigest/BigCountTestsMergingDigestTests.java +++ b/libs/tdigest/src/test/java/org/elasticsearch/tdigest/BigCountTestsMergingDigestTests.java @@ -23,7 +23,7 @@ public class BigCountTestsMergingDigestTests extends BigCountTests { @Override - public TDigest createDigest() { - return TDigest.createMergingDigest(arrays(), 100); + public TDigest createDigest(int compression) { + return TDigest.createMergingDigest(arrays(), compression); } } diff --git a/libs/tdigest/src/test/java/org/elasticsearch/tdigest/BigCountTestsTreeDigestTests.java b/libs/tdigest/src/test/java/org/elasticsearch/tdigest/BigCountTestsTreeDigestTests.java index 2978e1c98bcdb..fdfdda1f97b5d 100644 --- a/libs/tdigest/src/test/java/org/elasticsearch/tdigest/BigCountTestsTreeDigestTests.java +++ b/libs/tdigest/src/test/java/org/elasticsearch/tdigest/BigCountTestsTreeDigestTests.java @@ -23,7 +23,7 @@ public class BigCountTestsTreeDigestTests extends BigCountTests { @Override - public TDigest createDigest() { - return TDigest.createAvlTreeDigest(arrays(), 100); + public TDigest createDigest(int compression) { + return TDigest.createAvlTreeDigest(arrays(), compression); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/TDigestState.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/TDigestState.java index 239d794fe6906..38aea6a6d4eb4 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/TDigestState.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/TDigestState.java @@ -42,7 +42,7 @@ public class TDigestState implements Releasable, Accountable { private final TDigest tdigest; // Supported tdigest types. 
- protected enum Type { + public enum Type { HYBRID, AVL_TREE, MERGING, diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java index cb864379a1fc9..b0ad390a36d36 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java @@ -23,6 +23,7 @@ import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder; import org.elasticsearch.xpack.analytics.cumulativecardinality.InternalSimpleLongValue; import org.elasticsearch.xpack.analytics.mapper.HistogramFieldMapper; +import org.elasticsearch.xpack.analytics.mapper.TDigestFieldMapper; import org.elasticsearch.xpack.analytics.movingPercentiles.MovingPercentilesPipelineAggregationBuilder; import org.elasticsearch.xpack.analytics.multiterms.InternalMultiTerms; import org.elasticsearch.xpack.analytics.multiterms.MultiTermsAggregationBuilder; @@ -140,6 +141,14 @@ public List> getSettings() { @Override public Map getMappers() { + if (TDigestFieldMapper.TDIGEST_FIELD_MAPPER.isEnabled()) { + return Map.of( + HistogramFieldMapper.CONTENT_TYPE, + HistogramFieldMapper.PARSER, + TDigestFieldMapper.CONTENT_TYPE, + TDigestFieldMapper.PARSER + ); + } return Map.of(HistogramFieldMapper.CONTENT_TYPE, HistogramFieldMapper.PARSER); } diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/ExponentialHistogramParser.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/ExponentialHistogramParser.java index 886027ec06513..716a395f56374 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/ExponentialHistogramParser.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/ExponentialHistogramParser.java @@ -113,7 +113,7 @@ private record ParsedZeroBucket(long count, double threshold) { /** * Parses an XContent object into an exponential histogram. - * The parse is expected to point at the next token after {@link XContentParser.Token#START_OBJECT}. + * The parser is expected to point at the next token after {@link XContentParser.Token#START_OBJECT}. * * @param mappedFieldName the name of the field being parsed, used for error messages * @param parser the parser to use diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/HistogramParser.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/HistogramParser.java index fde40f09f3cfb..9b657439ca534 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/HistogramParser.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/HistogramParser.java @@ -31,7 +31,7 @@ public record ParsedHistogram(List values, List counts) {} /** * Parses an XContent object into a histogram. - * The parse is expected to point at the next token after {@link XContentParser.Token#START_OBJECT}. + * The parser is expected to point at the next token after {@link XContentParser.Token#START_OBJECT}. 
* * @param mappedFieldName the name of the field being parsed, used for error messages * @param parser the parser to use diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/TDigestFieldMapper.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/TDigestFieldMapper.java new file mode 100644 index 0000000000000..20149eb1df7d0 --- /dev/null +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/TDigestFieldMapper.java @@ -0,0 +1,517 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.analytics.mapper; + +import org.apache.lucene.document.BinaryDocValuesField; +import org.apache.lucene.document.Field; +import org.apache.lucene.index.BinaryDocValues; +import org.apache.lucene.index.DocValues; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.SortField; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.TransportVersions; +import org.elasticsearch.common.Explicit; +import org.elasticsearch.common.io.stream.ByteArrayStreamInput; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.FeatureFlag; +import org.elasticsearch.index.fielddata.FieldDataContext; +import org.elasticsearch.index.fielddata.FormattedDocValues; +import org.elasticsearch.index.fielddata.HistogramValue; +import org.elasticsearch.index.fielddata.HistogramValues; +import org.elasticsearch.index.fielddata.IndexFieldData; +import org.elasticsearch.index.fielddata.IndexFieldData.XFieldComparatorSource.Nested; +import org.elasticsearch.index.fielddata.IndexHistogramFieldData; +import org.elasticsearch.index.fielddata.LeafHistogramFieldData; +import org.elasticsearch.index.fielddata.SortedBinaryDocValues; +import org.elasticsearch.index.mapper.CompositeSyntheticFieldLoader; +import org.elasticsearch.index.mapper.DocumentParserContext; +import org.elasticsearch.index.mapper.DocumentParsingException; +import org.elasticsearch.index.mapper.FieldMapper; +import org.elasticsearch.index.mapper.IgnoreMalformedStoredValues; +import org.elasticsearch.index.mapper.IndexType; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.MapperBuilderContext; +import org.elasticsearch.index.mapper.SourceValueFetcher; +import org.elasticsearch.index.mapper.ValueFetcher; +import org.elasticsearch.index.query.SearchExecutionContext; +import org.elasticsearch.script.field.DocValuesScriptFieldFactory; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.MultiValueMode; +import org.elasticsearch.search.aggregations.metrics.TDigestState; +import org.elasticsearch.search.sort.BucketedSort; +import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.xcontent.CopyingXContentParser; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentSubParser; +import org.elasticsearch.xpack.analytics.aggregations.support.AnalyticsValuesSourceType; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Map; + 
+import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; + +/** + * Field Mapper for pre-aggregated histograms. + */ +public class TDigestFieldMapper extends FieldMapper { + public static final FeatureFlag TDIGEST_FIELD_MAPPER = new FeatureFlag("tdigest_field_mapper"); + + public static final String CENTROIDS_NAME = "centroids"; + public static final String COUNTS_NAME = "counts"; + public static final String CONTENT_TYPE = "tdigest"; + + private static TDigestFieldMapper toType(FieldMapper in) { + return (TDigestFieldMapper) in; + } + + public static class Builder extends FieldMapper.Builder { + private static final int DEFAULT_COMPRESSION = 100; + private static final int MAXIMUM_COMPRESSION = 10000; + + private final Parameter> meta = Parameter.metaParam(); + private final Parameter> ignoreMalformed; + private final Parameter digestType; + private final Parameter compression; + + public Builder(String name, boolean ignoreMalformedByDefault) { + super(name); + this.ignoreMalformed = Parameter.explicitBoolParam( + "ignore_malformed", + true, + m -> toType(m).ignoreMalformed, + ignoreMalformedByDefault + ); + this.digestType = Parameter.enumParam( + "digest_type", + false, + m -> toType(m).digestType, + TDigestState.Type.HYBRID, + TDigestState.Type.class + ); + this.compression = Parameter.intParam("compression", false, m -> toType(m).compression, DEFAULT_COMPRESSION).addValidator(c -> { + if (c <= 0 || c > MAXIMUM_COMPRESSION) { + throw new IllegalArgumentException( + "compression must be a positive integer between 1 and " + MAXIMUM_COMPRESSION + " was [" + c + "]" + ); + } + }); + } + + @Override + protected Parameter[] getParameters() { + return new Parameter[] { digestType, compression, ignoreMalformed, meta }; + } + + @Override + public TDigestFieldMapper build(MapperBuilderContext context) { + return new TDigestFieldMapper( + leafName(), + new TDigestFieldType(context.buildFullName(leafName()), meta.getValue()), + builderParams(this, context), + this + ); + } + } + + public static final TypeParser PARSER = new TypeParser( + (n, c) -> new Builder(n, IGNORE_MALFORMED_SETTING.get(c.getSettings())), + notInMultiFields(CONTENT_TYPE) + ); + + private final Explicit ignoreMalformed; + private final boolean ignoreMalformedByDefault; + private final TDigestState.Type digestType; + private final int compression; + + public TDigestFieldMapper(String simpleName, MappedFieldType mappedFieldType, BuilderParams builderParams, Builder builder) { + super(simpleName, mappedFieldType, builderParams); + this.ignoreMalformed = builder.ignoreMalformed.getValue(); + this.ignoreMalformedByDefault = builder.ignoreMalformed.getDefaultValue().value(); + this.digestType = builder.digestType.getValue(); + this.compression = builder.compression.getValue(); + } + + @Override + public boolean ignoreMalformed() { + return ignoreMalformed.value(); + } + + public TDigestState.Type digestType() { + return digestType; + } + + public int compression() { + return compression; + } + + @Override + protected String contentType() { + return CONTENT_TYPE; + } + + @Override + public FieldMapper.Builder getMergeBuilder() { + return new Builder(leafName(), ignoreMalformedByDefault).init(this); + } + + @Override + protected void parseCreateField(DocumentParserContext context) { + throw new UnsupportedOperationException("Parsing is implemented in parse(), this method should NEVER be called"); + } + + public static class TDigestFieldType extends MappedFieldType { + + public TDigestFieldType(String 
name, Map<String, String> meta) {
+            super(name, IndexType.docValuesOnly(), false, meta);
+        }
+
+        @Override
+        public String typeName() {
+            return CONTENT_TYPE;
+        }
+
+        @Override
+        public ValueFetcher valueFetcher(SearchExecutionContext context, String format) {
+            return SourceValueFetcher.identity(name(), context, format);
+        }
+
+        @Override
+        public boolean isSearchable() {
+            return false;
+        }
+
+        @Override
+        public IndexFieldData.Builder fielddataBuilder(FieldDataContext fieldDataContext) {
+            failIfNoDocValues();
+            // TODO - This needs to be changed to a custom values source type
+            return (cache, breakerService) -> new IndexHistogramFieldData(name(), AnalyticsValuesSourceType.HISTOGRAM) {
+
+                @Override
+                public LeafHistogramFieldData load(LeafReaderContext context) {
+                    return new LeafHistogramFieldData() {
+                        @Override
+                        public HistogramValues getHistogramValues() throws IOException {
+                            try {
+                                final BinaryDocValues values = DocValues.getBinary(context.reader(), fieldName);
+                                final InternalHistogramValue value = new InternalHistogramValue();
+                                return new HistogramValues() {
+
+                                    @Override
+                                    public boolean advanceExact(int doc) throws IOException {
+                                        return values.advanceExact(doc);
+                                    }
+
+                                    @Override
+                                    public HistogramValue histogram() throws IOException {
+                                        try {
+                                            value.reset(values.binaryValue());
+                                            return value;
+                                        } catch (IOException e) {
+                                            throw new IOException("Cannot load doc value", e);
+                                        }
+                                    }
+                                };
+                            } catch (IOException e) {
+                                throw new IOException("Cannot load doc values", e);
+                            }
+                        }
+
+                        @Override
+                        public DocValuesScriptFieldFactory getScriptFieldFactory(String name) {
+                            throw new UnsupportedOperationException("The [" + CONTENT_TYPE + "] field does not " + "support scripts");
+                        }
+
+                        @Override
+                        public FormattedDocValues getFormattedValues(DocValueFormat format) {
+                            try {
+                                final BinaryDocValues values = DocValues.getBinary(context.reader(), fieldName);
+                                final InternalHistogramValue value = new InternalHistogramValue();
+                                return new FormattedDocValues() {
+                                    @Override
+                                    public boolean advanceExact(int docId) throws IOException {
+                                        return values.advanceExact(docId);
+                                    }
+
+                                    @Override
+                                    public int docValueCount() {
+                                        return 1;
+                                    }
+
+                                    @Override
+                                    public Object nextValue() throws IOException {
+                                        value.reset(values.binaryValue());
+                                        return value;
+                                    }
+                                };
+                            } catch (IOException e) {
+                                throw new UncheckedIOException("Unable to load histogram doc values", e);
+                            }
+                        }
+
+                        @Override
+                        public SortedBinaryDocValues getBytesValues() {
+                            throw new UnsupportedOperationException(
+                                "String representation of doc values " + "for [" + CONTENT_TYPE + "] fields is not supported"
+                            );
+                        }
+
+                        @Override
+                        public long ramBytesUsed() {
+                            return 0; // Unknown
+                        }
+
+                    };
+                }
+
+                @Override
+                public LeafHistogramFieldData loadDirect(LeafReaderContext context) {
+                    return load(context);
+                }
+
+                @Override
+                public SortField sortField(Object missingValue, MultiValueMode sortMode, Nested nested, boolean reverse) {
+                    throw new UnsupportedOperationException("can't sort on the [" + CONTENT_TYPE + "] field");
+                }
+
+                @Override
+                public BucketedSort newBucketedSort(
+                    BigArrays bigArrays,
+                    Object missingValue,
+                    MultiValueMode sortMode,
+                    Nested nested,
+                    SortOrder sortOrder,
+                    DocValueFormat format,
+                    int bucketSize,
+                    BucketedSort.ExtraData extra
+                ) {
+                    throw new IllegalArgumentException("can't sort on the [" + CONTENT_TYPE + "] field");
+                }
+            };
+        }
+
+        @Override
+        public Query termQuery(Object value, SearchExecutionContext context) {
+            throw new IllegalArgumentException(
+                "[" + CONTENT_TYPE + "] fields do not support 
searching, " + "use dedicated aggregations instead: [" + name() + "]" + ); + } + } + + @Override + protected boolean supportsParsingObject() { + return true; + } + + @Override + public void parse(DocumentParserContext context) throws IOException { + context.path().add(leafName()); + + boolean shouldStoreMalformedDataForSyntheticSource = context.mappingLookup().isSourceSynthetic() && ignoreMalformed(); + XContentParser.Token token; + XContentSubParser subParser = null; + XContentBuilder malformedDataForSyntheticSource = null; + + try { + token = context.parser().currentToken(); + if (token == XContentParser.Token.VALUE_NULL) { + context.path().remove(); + return; + } + // should be an object + ensureExpectedToken(XContentParser.Token.START_OBJECT, token, context.parser()); + if (shouldStoreMalformedDataForSyntheticSource) { + var copyingParser = new CopyingXContentParser(context.parser()); + malformedDataForSyntheticSource = copyingParser.getBuilder(); + subParser = new XContentSubParser(copyingParser); + } else { + subParser = new XContentSubParser(context.parser()); + } + subParser.nextToken(); + // TODO: Here we should build a t-digest out of the input, based on the settings on the field + TDigestParser.ParsedHistogram parsedHistogram = TDigestParser.parse(fullPath(), subParser); + + BytesStreamOutput streamOutput = new BytesStreamOutput(); + for (int i = 0; i < parsedHistogram.centroids().size(); i++) { + long count = parsedHistogram.counts().get(i); + assert count >= 0; + // we do not add elements with count == 0 + if (count > 0) { + streamOutput.writeVLong(count); + streamOutput.writeLong(Double.doubleToRawLongBits(parsedHistogram.centroids().get(i))); + } + } + BytesRef docValue = streamOutput.bytes().toBytesRef(); + Field field = new BinaryDocValuesField(fullPath(), docValue); + if (context.doc().getByKey(fieldType().name()) != null) { + throw new IllegalArgumentException( + "Field [" + + fullPath() + + "] of type [" + + typeName() + + "] doesn't support indexing multiple values for the same field in the same document" + ); + } + context.doc().addWithKey(fieldType().name(), field); + + } catch (Exception ex) { + if (ignoreMalformed.value() == false) { + throw new DocumentParsingException( + context.parser().getTokenLocation(), + "failed to parse field [" + fieldType().name() + "] of type [" + fieldType().typeName() + "]", + ex + ); + } + + if (subParser != null) { + // close the subParser so we advance to the end of the object + subParser.close(); + } else if (shouldStoreMalformedDataForSyntheticSource) { + // We have a malformed value, but it's not an object given that `subParser` is null. + // So we just remember whatever it is. 
+ malformedDataForSyntheticSource = XContentBuilder.builder(context.parser().contentType().xContent()) + .copyCurrentStructure(context.parser()); + } + + if (malformedDataForSyntheticSource != null) { + context.doc().add(IgnoreMalformedStoredValues.storedField(fullPath(), malformedDataForSyntheticSource)); + } + + context.addIgnoredField(fieldType().name()); + } + context.path().remove(); + } + + /** re-usable {@link HistogramValue} implementation */ + private static class InternalHistogramValue extends HistogramValue { + double value; + long count; + boolean isExhausted; + final ByteArrayStreamInput streamInput; + + InternalHistogramValue() { + streamInput = new ByteArrayStreamInput(); + } + + /** reset the value for the histogram */ + void reset(BytesRef bytesRef) { + streamInput.reset(bytesRef.bytes, bytesRef.offset, bytesRef.length); + isExhausted = false; + value = 0; + count = 0; + } + + @Override + public boolean next() throws IOException { + if (streamInput.available() > 0) { + if (streamInput.getTransportVersion().onOrAfter(TransportVersions.V_8_11_X)) { + count = streamInput.readVLong(); + } else { + count = streamInput.readVInt(); + } + value = Double.longBitsToDouble(streamInput.readLong()); + return true; + } + isExhausted = true; + return false; + } + + @Override + public double value() { + if (isExhausted) { + throw new IllegalArgumentException("histogram already exhausted"); + } + return value; + } + + @Override + public long count() { + if (isExhausted) { + throw new IllegalArgumentException("histogram already exhausted"); + } + return count; + } + } + + @Override + protected SyntheticSourceSupport syntheticSourceSupport() { + return new SyntheticSourceSupport.Native( + () -> new CompositeSyntheticFieldLoader( + leafName(), + fullPath(), + new HistogramSyntheticFieldLoader(), + new CompositeSyntheticFieldLoader.MalformedValuesLayer(fullPath()) + ) + ); + } + + private class HistogramSyntheticFieldLoader implements CompositeSyntheticFieldLoader.DocValuesLayer { + private final InternalHistogramValue value = new InternalHistogramValue(); + private BytesRef binaryValue; + + @Override + public DocValuesLoader docValuesLoader(LeafReader leafReader, int[] docIdsInLeaf) throws IOException { + BinaryDocValues docValues = leafReader.getBinaryDocValues(fieldType().name()); + if (docValues == null) { + // No values in this leaf + binaryValue = null; + return null; + } + return docId -> { + if (docValues.advanceExact(docId)) { + binaryValue = docValues.binaryValue(); + return true; + } + binaryValue = null; + return false; + }; + } + + @Override + public boolean hasValue() { + return binaryValue != null; + } + + @Override + public void write(XContentBuilder b) throws IOException { + if (binaryValue == null) { + return; + } + b.startObject(); + + value.reset(binaryValue); + b.startArray(CENTROIDS_NAME); + while (value.next()) { + b.value(value.value()); + } + b.endArray(); + + value.reset(binaryValue); + b.startArray(COUNTS_NAME); + while (value.next()) { + b.value(value.count()); + } + b.endArray(); + + b.endObject(); + } + + @Override + public String fieldName() { + return fullPath(); + } + + @Override + public long valueCount() { + return binaryValue != null ? 
1 : 0;
+        }
+    }
+}
diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/TDigestParser.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/TDigestParser.java
new file mode 100644
index 0000000000000..58fd39ad187d4
--- /dev/null
+++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/TDigestParser.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.analytics.mapper;
+
+import org.elasticsearch.index.mapper.DocumentParsingException;
+import org.elasticsearch.xcontent.ParseField;
+import org.elasticsearch.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
+import static org.elasticsearch.xpack.analytics.mapper.TDigestFieldMapper.CENTROIDS_NAME;
+import static org.elasticsearch.xpack.analytics.mapper.TDigestFieldMapper.COUNTS_NAME;
+
+public class TDigestParser {
+
+    private static final ParseField COUNTS_FIELD = new ParseField(COUNTS_NAME);
+    private static final ParseField CENTROIDS_FIELD = new ParseField(CENTROIDS_NAME);
+
+    /**
+     * A parsed histogram field representing a T-Digest.
+     * @param centroids the centroids, guaranteed to be in non-decreasing order
+     * @param counts the counts, guaranteed to be non-negative and of the same length as the centroids list
+     */
+    public record ParsedHistogram(List<Double> centroids, List<Long> counts) {}
+
+    /**
+     * Parses an XContent object into a histogram.
+     * The parser is expected to point at the next token after {@link XContentParser.Token#START_OBJECT}.
+ * + * @param mappedFieldName the name of the field being parsed, used for error messages + * @param parser the parser to use + * @return the parsed histogram + */ + public static ParsedHistogram parse(String mappedFieldName, XContentParser parser) throws IOException { + ArrayList centroids = null; + ArrayList counts = null; + XContentParser.Token token = parser.currentToken(); + while (token != XContentParser.Token.END_OBJECT) { + // should be a field + ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser); + String fieldName = parser.currentName(); + if (fieldName.equals(CENTROIDS_FIELD.getPreferredName())) { + token = parser.nextToken(); + // should be an array + ensureExpectedToken(XContentParser.Token.START_ARRAY, token, parser); + centroids = new ArrayList<>(); + token = parser.nextToken(); + double previousVal = -Double.MAX_VALUE; + while (token != XContentParser.Token.END_ARRAY) { + // should be a number + ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser); + double val = parser.doubleValue(); + if (val < previousVal) { + // centroids must be in increasing order + throw new DocumentParsingException( + parser.getTokenLocation(), + "error parsing field [" + + mappedFieldName + + "], [" + + CENTROIDS_FIELD + + "] centroids must be in increasing order, got [" + + val + + "] but previous value was [" + + previousVal + + "]" + ); + } + centroids.add(val); + previousVal = val; + token = parser.nextToken(); + } + } else if (fieldName.equals(COUNTS_FIELD.getPreferredName())) { + token = parser.nextToken(); + // should be an array + ensureExpectedToken(XContentParser.Token.START_ARRAY, token, parser); + counts = new ArrayList<>(); + token = parser.nextToken(); + while (token != XContentParser.Token.END_ARRAY) { + // should be a number + ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser); + long count = parser.longValue(); + if (count < 0) { + throw new DocumentParsingException( + parser.getTokenLocation(), + "error parsing field [" + mappedFieldName + "], [" + COUNTS_FIELD + "] elements must be >= 0 but got " + count + ); + } + counts.add(count); + token = parser.nextToken(); + } + } else { + throw new DocumentParsingException( + parser.getTokenLocation(), + "error parsing field [" + mappedFieldName + "], with unknown parameter [" + fieldName + "]" + ); + } + token = parser.nextToken(); + } + if (centroids == null) { + throw new DocumentParsingException( + parser.getTokenLocation(), + "error parsing field [" + mappedFieldName + "], expected field called [" + CENTROIDS_FIELD.getPreferredName() + "]" + ); + } + if (counts == null) { + throw new DocumentParsingException( + parser.getTokenLocation(), + "error parsing field [" + mappedFieldName + "], expected field called [" + COUNTS_FIELD.getPreferredName() + "]" + ); + } + if (centroids.size() != counts.size()) { + throw new DocumentParsingException( + parser.getTokenLocation(), + "error parsing field [" + + mappedFieldName + + "], expected same length from [" + + CENTROIDS_FIELD.getPreferredName() + + "] and " + + "[" + + COUNTS_FIELD.getPreferredName() + + "] but got [" + + centroids.size() + + " != " + + counts.size() + + "]" + ); + } + return new ParsedHistogram(centroids, counts); + } + +} diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/mapper/TDigestFieldMapperTests.java b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/mapper/TDigestFieldMapperTests.java new file mode 100644 index 0000000000000..3e93ee38a1f17 --- /dev/null 
+++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/mapper/TDigestFieldMapperTests.java @@ -0,0 +1,470 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.analytics.mapper; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.core.CheckedConsumer; +import org.elasticsearch.index.mapper.DocumentMapper; +import org.elasticsearch.index.mapper.DocumentParsingException; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.MapperParsingException; +import org.elasticsearch.index.mapper.MapperTestCase; +import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.mapper.SourceToParse; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.aggregations.metrics.TDigestState; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.analytics.AnalyticsPlugin; +import org.junit.AssumptionViolatedException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class TDigestFieldMapperTests extends MapperTestCase { + + @Override + protected Object getSampleValueForDocument() { + // TODO - In hybrid mode, this will not even build a t-digest. Let's test with bigger data + return Map.of("centroids", new double[] { 2, 3 }, "counts", new int[] { 0, 4 }); + } + + @Override + protected Object getSampleObjectForDocument() { + return getSampleValueForDocument(); + } + + @Override + protected Collection getPlugins() { + return List.of(new AnalyticsPlugin()); + } + + @Override + protected void minimalMapping(XContentBuilder b) throws IOException { + b.field("type", "tdigest"); + } + + @Override + protected void registerParameters(ParameterChecker checker) throws IOException { + checker.registerUpdateCheck(b -> b.field("ignore_malformed", true), m -> assertTrue(m.ignoreMalformed())); + checker.registerConflictCheck("digest_type", b -> b.field("digest_type", TDigestState.Type.AVL_TREE)); + checker.registerConflictCheck("compression", b -> b.field("compression", 117)); + } + + @Override + protected boolean supportsSearchLookup() { + return false; + } + + @Override + protected boolean supportsStoredFields() { + return false; + } + + public void testParseValue() throws Exception { + DocumentMapper mapper = createDocumentMapper(fieldMapping(this::minimalMapping)); + ParsedDocument doc = mapper.parse( + source(b -> b.startObject("field").field("centroids", new double[] { 2, 3 }).field("counts", new int[] { 0, 4 }).endObject()) + ); + assertThat(doc.rootDoc().getField("field"), notNullValue()); + } + + public void testParseArrayValue() throws Exception { + DocumentMapper mapper = createDocumentMapper(fieldMapping(this::minimalMapping)); + Exception e = expectThrows(DocumentParsingException.class, () -> mapper.parse(source(b -> { + b.startArray("field"); + { + b.startObject().field("counts", new int[] { 2, 2, 3 }).field("centroids", new double[] { 2, 2, 3 }).endObject(); + b.startObject().field("counts", 
new int[] { 2, 2, 3 }).field("centroids", new double[] { 2, 2, 3 }).endObject(); + } + b.endArray(); + }))); + assertThat( + e.getCause().getMessage(), + containsString("doesn't support indexing multiple values for the same field in the same document") + ); + } + + public void testEmptyArrays() throws Exception { + DocumentMapper mapper = createDocumentMapper(fieldMapping(this::minimalMapping)); + ParsedDocument doc = mapper.parse( + source(b -> b.startObject("field").field("centroids", new double[] {}).field("counts", new int[] {}).endObject()) + ); + assertThat(doc.rootDoc().getField("field"), notNullValue()); + } + + public void testNullValue() throws Exception { + DocumentMapper mapper = createDocumentMapper(fieldMapping(this::minimalMapping)); + ParsedDocument doc = mapper.parse(source(b -> b.nullField("pre_aggregated"))); + assertThat(doc.rootDoc().getField("pre_aggregated"), nullValue()); + } + + public void testMissingFieldCounts() throws Exception { + DocumentMapper mapper = createDocumentMapper(fieldMapping(this::minimalMapping)); + Exception e = expectThrows( + DocumentParsingException.class, + () -> mapper.parse(source(b -> b.startObject("field").field("centroids", new double[] { 2, 2 }).endObject())) + ); + assertThat(e.getCause().getMessage(), containsString("expected field called [counts]")); + } + + @Override + protected boolean supportsIgnoreMalformed() { + return true; + } + + @Override + protected List exampleMalformedValues() { + String randomString = randomAlphaOfLengthBetween(1, 10); + long randomLong = randomLong(); + double randomDouble = randomDouble(); + boolean randomBoolean = randomBoolean(); + + return List.of( + exampleMalformedValue(b -> b.value(randomString)).errorMatches( + "Failed to parse object: expecting token of type [START_OBJECT]" + ), + exampleMalformedValue(b -> b.value(randomLong)).errorMatches("Failed to parse object: expecting token of type [START_OBJECT]"), + exampleMalformedValue(b -> b.value(randomDouble)).errorMatches( + "Failed to parse object: expecting token of type [START_OBJECT]" + ), + exampleMalformedValue(b -> b.value(randomBoolean)).errorMatches( + "Failed to parse object: expecting token of type [START_OBJECT]" + ), + exampleMalformedValue(b -> b.startObject().endObject()).errorMatches("expected field called [centroids]"), + exampleMalformedValue(b -> b.startObject().startArray("centroids").value(2).value(2).endArray().endObject()).errorMatches( + "expected field called [counts]" + ), + exampleMalformedValue(b -> b.startObject().startArray("counts").value(2).value(2).endArray().endObject()).errorMatches( + "expected field called [centroids]" + ), + // Make sure that entire sub-object is preserved in synthetic source + exampleMalformedValue( + b -> b.startObject() + .startArray("centroids") + .value(2) + .endArray() + .field("somefield", randomString) + .array("somearray", randomLong, randomLong) + .startObject("someobject") + .field("nestedfield", randomDouble) + .endObject() + .endObject() + ).errorMatches("unknown parameter [somefield]"), + exampleMalformedValue(b -> b.startArray().value(randomLong).value(randomLong).endArray()).errorMatches( + "expecting token of type [START_OBJECT] but found [VALUE_NUMBER]" + ) + ); + } + + public void testIgnoreMalformedSkipsKeyword() throws Exception { + DocumentMapper mapper = createDocumentMapper(mapping(b -> { + b.startObject("pre_aggregated").field("type", "tdigest").field("ignore_malformed", true).endObject(); + b.startObject("otherField").field("type", "keyword").endObject(); + })); + 
ParsedDocument doc = mapper.parse(source(b -> b.field("pre_aggregated", "value").field("otherField", "value"))); + assertThat(doc.rootDoc().getField("pre_aggregated"), nullValue()); + assertThat(doc.rootDoc().getField("otherField"), notNullValue()); + } + + public void testIgnoreMalformedSkipsArray() throws Exception { + DocumentMapper mapper = createDocumentMapper(mapping(b -> { + b.startObject("pre_aggregated").field("type", "tdigest").field("ignore_malformed", true).endObject(); + b.startObject("otherField").field("type", "keyword").endObject(); + })); + ParsedDocument doc = mapper.parse(source(b -> b.field("pre_aggregated", new int[] { 2, 2, 2 }).field("otherField", "value"))); + assertThat(doc.rootDoc().getField("pre_aggregated"), nullValue()); + assertThat(doc.rootDoc().getField("otherField"), notNullValue()); + } + + public void testIgnoreMalformedSkipsField() throws Exception { + DocumentMapper mapper = createDocumentMapper(mapping(b -> { + b.startObject("pre_aggregated").field("type", "tdigest").field("ignore_malformed", true).endObject(); + b.startObject("otherField").field("type", "keyword").endObject(); + })); + ParsedDocument doc = mapper.parse(source(b -> { + b.startObject("pre_aggregated").field("centroids", new double[] { 2, 2 }).field("typo", new double[] { 2, 2 }).endObject(); + b.field("otherField", "value"); + })); + assertThat(doc.rootDoc().getField("pre_aggregated"), nullValue()); + assertThat(doc.rootDoc().getField("otherField"), notNullValue()); + } + + public void testIgnoreMalformedSkipsObjects() throws Exception { + DocumentMapper mapper = createDocumentMapper(mapping(b -> { + b.startObject("pre_aggregated").field("type", "tdigest").field("ignore_malformed", true).endObject(); + b.startObject("otherField").field("type", "keyword").endObject(); + })); + ParsedDocument doc = mapper.parse(source(b -> { + b.startObject("pre_aggregated"); + { + b.startObject("centroids"); + { + b.field("centroids", new double[] { 2, 2 }); + b.startObject("otherData"); + { + b.startObject("more").field("toto", 1).endObject(); + } + b.endObject(); + } + b.endObject(); + b.field("counts", new double[] { 2, 2 }); + } + b.endObject(); + b.field("otherField", "value"); + })); + assertThat(doc.rootDoc().getField("pre_aggregated"), nullValue()); + assertThat(doc.rootDoc().getField("otherField"), notNullValue()); + } + + public void testIgnoreMalformedSkipsEmpty() throws Exception { + DocumentMapper mapper = createDocumentMapper(mapping(b -> { + b.startObject("pre_aggregated").field("type", "tdigest").field("ignore_malformed", true).endObject(); + b.startObject("otherField").field("type", "keyword").endObject(); + })); + ParsedDocument doc = mapper.parse(source(b -> b.startObject("pre_aggregated").endObject().field("otherField", "value"))); + assertThat(doc.rootDoc().getField("pre_aggregated"), nullValue()); + assertThat(doc.rootDoc().getField("otherField"), notNullValue()); + } + + public void testMissingFieldValues() throws Exception { + DocumentMapper mapper = createDocumentMapper(fieldMapping(this::minimalMapping)); + Exception e = expectThrows( + DocumentParsingException.class, + () -> mapper.parse(source(b -> b.startObject("field").field("counts", new int[] { 2, 2 }).endObject())) + ); + assertThat(e.getCause().getMessage(), containsString("expected field called [centroids]")); + } + + public void testUnknownField() throws Exception { + DocumentMapper mapper = createDocumentMapper(fieldMapping(this::minimalMapping)); + SourceToParse source = source( + b -> b.startObject("field") + 
.field("counts", new int[] { 2, 2 }) + .field("centroids", new double[] { 2, 2 }) + .field("unknown", new double[] { 2, 2 }) + .endObject() + ); + + Exception e = expectThrows(DocumentParsingException.class, () -> mapper.parse(source)); + assertThat(e.getCause().getMessage(), containsString("with unknown parameter [unknown]")); + } + + public void testFieldArraysDifferentSize() throws Exception { + DocumentMapper mapper = createDocumentMapper(fieldMapping(this::minimalMapping)); + SourceToParse source = source( + b -> b.startObject("field").field("counts", new int[] { 2, 2 }).field("centroids", new double[] { 2, 2, 3 }).endObject() + ); + Exception e = expectThrows(DocumentParsingException.class, () -> mapper.parse(source)); + assertThat(e.getCause().getMessage(), containsString("expected same length from [centroids] and [counts] but got [3 != 2]")); + } + + public void testFieldCountsNotArray() throws Exception { + DocumentMapper mapper = createDocumentMapper(fieldMapping(this::minimalMapping)); + SourceToParse source = source( + b -> b.startObject("field").field("counts", "bah").field("centroids", new double[] { 2, 2, 3 }).endObject() + ); + Exception e = expectThrows(DocumentParsingException.class, () -> mapper.parse(source)); + assertThat(e.getCause().getMessage(), containsString("expecting token of type [START_ARRAY] but found [VALUE_STRING]")); + } + + public void testFieldCountsStringArray() throws Exception { + DocumentMapper mapper = createDocumentMapper(fieldMapping(this::minimalMapping)); + SourceToParse source = source( + b -> b.startObject("field") + .field("counts", new String[] { "4", "5", "6" }) + .field("centroids", new double[] { 2, 2, 3 }) + .endObject() + ); + Exception e = expectThrows(DocumentParsingException.class, () -> mapper.parse(source)); + assertThat(e.getCause().getMessage(), containsString("expecting token of type [VALUE_NUMBER] but found [VALUE_STRING]")); + } + + public void testFieldValuesStringArray() throws Exception { + DocumentMapper mapper = createDocumentMapper(fieldMapping(this::minimalMapping)); + SourceToParse source = source( + b -> b.field("field") + .startObject() + .field("counts", new int[] { 4, 5, 6 }) + .field("centroids", new String[] { "2", "2", "3" }) + .endObject() + ); + Exception e = expectThrows(DocumentParsingException.class, () -> mapper.parse(source)); + assertThat(e.getCause().getMessage(), containsString("expecting token of type [VALUE_NUMBER] but found [VALUE_STRING]")); + } + + public void testFieldValuesNotArray() throws Exception { + DocumentMapper mapper = createDocumentMapper(fieldMapping(this::minimalMapping)); + SourceToParse source = source( + b -> b.startObject("field").field("counts", new int[] { 2, 2, 3 }).field("centroids", "bah").endObject() + ); + Exception e = expectThrows(DocumentParsingException.class, () -> mapper.parse(source)); + assertThat(e.getCause().getMessage(), containsString("expecting token of type [START_ARRAY] but found [VALUE_STRING]")); + } + + public void testCountIsLong() throws Exception { + DocumentMapper mapper = createDocumentMapper(fieldMapping(this::minimalMapping)); + SourceToParse source = source( + b -> b.startObject("field") + .field("counts", new long[] { 2, 2, Long.MAX_VALUE }) + .field("centroids", new double[] { 2, 2, 3 }) + .endObject() + ); + ParsedDocument doc = mapper.parse(source); + assertThat(doc.rootDoc().getField("field"), notNullValue()); + } + + public void testValuesNotInOrder() throws Exception { + DocumentMapper mapper = 
createDocumentMapper(fieldMapping(this::minimalMapping)); + SourceToParse source = source( + b -> b.field("field") + .startObject() + .field("counts", new int[] { 2, 8, 4 }) + .field("centroids", new double[] { 2, 3, 2 }) + .endObject() + ); + Exception e = expectThrows(DocumentParsingException.class, () -> mapper.parse(source)); + assertThat( + e.getCause().getMessage(), + containsString(" centroids must be in increasing order, " + "got [2.0] but previous value was [3.0]") + ); + } + + public void testFieldNotObject() throws Exception { + DocumentMapper mapper = createDocumentMapper(fieldMapping(this::minimalMapping)); + SourceToParse source = source(b -> b.field("field", "bah")); + Exception e = expectThrows(DocumentParsingException.class, () -> mapper.parse(source)); + assertThat(e.getCause().getMessage(), containsString("expecting token of type [START_OBJECT] " + "but found [VALUE_STRING]")); + } + + public void testNegativeCount() throws Exception { + DocumentMapper mapper = createDocumentMapper(fieldMapping(this::minimalMapping)); + SourceToParse source = source( + b -> b.startObject("field").field("counts", new int[] { 2, 2, -3 }).field("centroids", new double[] { 2, 2, 3 }).endObject() + ); + Exception e = expectThrows(DocumentParsingException.class, () -> mapper.parse(source)); + assertThat(e.getCause().getMessage(), containsString("[counts] elements must be >= 0 but got -3")); + } + + @Override + protected Object generateRandomInputValue(MappedFieldType ft) { + assumeFalse("Test implemented in a follow up", true); + return null; + } + + public void testCannotBeUsedInMultifields() { + Exception e = expectThrows(MapperParsingException.class, () -> createMapperService(fieldMapping(b -> { + b.field("type", "keyword"); + b.startObject("fields"); + b.startObject("hist"); + b.field("type", "tdigest"); + b.endObject(); + b.endObject(); + }))); + assertThat(e.getMessage(), containsString("Field [hist] of type [tdigest] can't be used in multifields")); + } + + @Override + protected IngestScriptSupport ingestScriptSupport() { + throw new AssumptionViolatedException("not supported"); + } + + @Override + protected List getSortShortcutSupport() { + return List.of(); + } + + public void testArrayValueSyntheticSource() throws Exception { + DocumentMapper mapper = createSytheticSourceMapperService( + fieldMapping(b -> b.field("type", "tdigest").field("ignore_malformed", "true")) + ).documentMapper(); + + String randomString = randomAlphaOfLength(10); + CheckedConsumer arrayValue = b -> { + b.startArray("field"); + { + b.startObject().field("counts", new int[] { 1, 2, 3 }).field("centroids", new double[] { 1, 2, 3 }).endObject(); + b.startObject().field("counts", new int[] { 4, 5, 6 }).field("centroids", new double[] { 4, 5, 6 }).endObject(); + b.value(randomString); + } + b.endArray(); + }; + + XContentBuilder expected = JsonXContent.contentBuilder().startObject(); + // First value comes from synthetic field loader and so is formatted in a specific format (e.g. centroids always come first). + // Other values are stored as is as part of ignore_malformed logic for synthetic source. 
+ { + expected.startArray("field"); + expected.startObject().field("centroids", new double[] { 1, 2, 3 }).field("counts", new int[] { 1, 2, 3 }).endObject(); + expected.startObject().field("counts", new int[] { 4, 5, 6 }).field("centroids", new double[] { 4, 5, 6 }).endObject(); + expected.value(randomString); + expected.endArray(); + } + expected.endObject(); + + String syntheticSource = syntheticSource(mapper, arrayValue); + assertEquals(Strings.toString(expected), syntheticSource); + } + + @Override + protected SyntheticSourceSupport syntheticSourceSupport(boolean ignoreMalformed) { + return new TDigestFieldSyntheticSourceSupport(ignoreMalformed); + } + + private record TDigestFieldSyntheticSourceSupport(boolean ignoreMalformed) implements SyntheticSourceSupport { + @Override + public SyntheticSourceExample example(int maxVals) { + if (randomBoolean()) { + Map value = new LinkedHashMap<>(); + value.put("centroids", List.of(randomDouble())); + value.put("counts", List.of(randomCount())); + return new SyntheticSourceExample(value, value, this::mapping); + } + int size = between(1, maxVals); + List centroids = new ArrayList<>(size); + double prev = randomDouble(); + centroids.add(prev); + while (centroids.size() < size && prev != Double.MAX_VALUE) { + prev = randomDoubleBetween(prev, Double.MAX_VALUE, false); + centroids.add(prev); + } + Map value = new LinkedHashMap<>(); + value.put("centroids", centroids); + value.put("counts", randomList(centroids.size(), centroids.size(), this::randomCount)); + return new SyntheticSourceExample(value, value, this::mapping); + } + + private int randomCount() { + return between(1, Integer.MAX_VALUE); + } + + private void mapping(XContentBuilder b) throws IOException { + b.field("type", "tdigest"); + if (ignoreMalformed) { + b.field("ignore_malformed", true); + } + } + + @Override + public List invalidExample() throws IOException { + return List.of(); + } + } + + @Override + public void testSyntheticSourceKeepArrays() { + // The mapper expects to parse an array of values by default, it's not compatible with array of arrays. + } +}
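A minimal usage sketch (not part of the diff above): as introduced in this change, a tdigest field value is an object with parallel "centroids" and "counts" arrays, and TDigestParser.parse consumes that object once the parser sits on the first token after START_OBJECT, which is how TDigestFieldMapper.parse positions it via XContentSubParser. The class and method names (TDigestParserExample, parseSample), the field name "my_tdigest_field", and the sample values below are illustrative assumptions, not part of the change.

    import org.elasticsearch.xcontent.XContentParser;
    import org.elasticsearch.xcontent.XContentParserConfiguration;
    import org.elasticsearch.xcontent.json.JsonXContent;
    import org.elasticsearch.xpack.analytics.mapper.TDigestParser;

    class TDigestParserExample {
        static TDigestParser.ParsedHistogram parseSample() throws java.io.IOException {
            // Illustrative input: the same shape a document would use for a field mapped as "type": "tdigest".
            String json = "{\"centroids\": [1.0, 5.0, 10.0], \"counts\": [3, 7, 2]}";
            try (XContentParser parser = JsonXContent.jsonXContent.createParser(XContentParserConfiguration.EMPTY, json)) {
                parser.nextToken(); // START_OBJECT
                parser.nextToken(); // first FIELD_NAME, where parse() expects to start
                // Yields centroids [1.0, 5.0, 10.0] and counts [3, 7, 2]; unknown fields, negative counts,
                // or centroids out of order raise DocumentParsingException instead.
                return TDigestParser.parse("my_tdigest_field", parser);
            }
        }
    }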