@@ -535,6 +535,8 @@ Block buildExponentialHistogramBlockDirect(
Block zeroThresholds,
Block encodedHistograms
);

Block buildTDigestBlockDirect(Block encodedDigests, Block minima, Block maxima, Block sums, Block valueCounts);
}

/**
@@ -697,4 +699,16 @@ interface ExponentialHistogramBuilder extends Builder {

BytesRefBuilder encodedHistograms();
}

interface TDigestBuilder extends Builder {
DoubleBuilder minima();

DoubleBuilder maxima();

DoubleBuilder sums();

LongBuilder valueCounts();

BytesRefBuilder encodedDigests();
}
}
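
The new buildTDigestBlockDirect hook and the TDigestBuilder interface above follow the same shape as the existing exponential-histogram support: one BytesRef column carrying the encoded digest plus four position-aligned numeric summary columns (min, max, sum, value count). A minimal self-contained sketch of that five-column contract, with plain lists standing in for the real Block types (all names below are illustrative, not the actual API):

import java.util.ArrayList;
import java.util.List;

// Illustrative stand-ins for the real Block machinery; not the actual ES API.
public class TDigestColumnsSketch {
    // One logical t-digest value per position, split across five parallel columns.
    record TDigestRow(byte[] encodedDigest, double min, double max, double sum, long valueCount) {}

    static List<TDigestRow> combine(
        List<byte[]> encodedDigests,
        List<Double> minima,
        List<Double> maxima,
        List<Double> sums,
        List<Long> valueCounts
    ) {
        // All five columns must be the same length and position-aligned,
        // which is what buildTDigestBlockDirect implementations assert.
        List<TDigestRow> rows = new ArrayList<>(encodedDigests.size());
        for (int i = 0; i < encodedDigests.size(); i++) {
            rows.add(new TDigestRow(encodedDigests.get(i), minima.get(i), maxima.get(i), sums.get(i), valueCounts.get(i)));
        }
        return rows;
    }
}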
@@ -84,7 +84,7 @@ public static TDigestState create(CircuitBreaker breaker, double compression) {
}
}

- static TDigestState createOfType(CircuitBreaker breaker, Type type, double compression) {
+ public static TDigestState createOfType(CircuitBreaker breaker, Type type, double compression) {
breaker.addEstimateBytesAndMaybeBreak(SHALLOW_SIZE, "tdigest-state-create-with-type");
try {
return new TDigestState(breaker, type, compression);
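Widening createOfType from package-private to public lets callers outside the package pick a concrete digest implementation directly instead of going through the compression-only create(...). A rough usage sketch (only the signature and Type.HYBRID are confirmed by this diff; the no-op breaker is an assumption for illustration):

// Illustration only: a no-op breaker stands in for a real memory circuit breaker.
CircuitBreaker breaker = new NoopCircuitBreaker("tdigest-demo");
TDigestState state = TDigestState.createOfType(breaker, TDigestState.Type.HYBRID, 100d);
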
@@ -517,6 +517,42 @@ public BlockLoader.Block buildExponentialHistogramBlockDirect(
encodedHistograms
);
}

@Override
public BlockLoader.Block buildTDigestBlockDirect(
BlockLoader.Block encodedDigests,
BlockLoader.Block minima,
BlockLoader.Block maxima,
BlockLoader.Block sums,
BlockLoader.Block valueCounts
) {
TestBlock minBlock = (TestBlock) minima;
TestBlock maxBlock = (TestBlock) maxima;
TestBlock sumBlock = (TestBlock) sums;
TestBlock countBlock = (TestBlock) valueCounts;
TestBlock digestBlock = (TestBlock) encodedDigests;

assert minBlock.size() == digestBlock.size();
assert maxBlock.size() == digestBlock.size();
assert sumBlock.size() == digestBlock.size();
assert countBlock.size() == digestBlock.size();

var values = new ArrayList<>(minBlock.size());

for (int i = 0; i < minBlock.size(); i++) {
// TestBlock has no composite t-digest type, so represent each position as a map
HashMap<String, Object> value = new HashMap<>();
value.put("min", minBlock.values.get(i));
value.put("max", maxBlock.values.get(i));
value.put("sum", sumBlock.values.get(i));
value.put("value_count", countBlock.values.get(i));
value.put("encoded_digest", digestBlock.values.get(i));

values.add(value);
}

return new TestBlock(values);
}
};
}
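
Since TestBlock has no composite t-digest type, each loaded position is a plain map. A test inspecting one loaded row might look roughly like this (the values field and map keys mirror the implementation above; the surrounding scaffolding is assumed):

// Illustration only: reading back one row produced by buildTDigestBlockDirect.
@SuppressWarnings("unchecked")
Map<String, Object> row = (Map<String, Object>) ((TestBlock) block).values.get(0);
assertEquals(10L, row.get("value_count"));
assertNotNull(row.get("encoded_digest"));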

@@ -0,0 +1,128 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.analytics.mapper;

import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.mapper.blockloader.docvalues.BlockDocValuesReader;
import org.elasticsearch.index.mapper.blockloader.docvalues.BytesRefsFromBinaryBlockLoader;
import org.elasticsearch.index.mapper.blockloader.docvalues.DoublesBlockLoader;
import org.elasticsearch.index.mapper.blockloader.docvalues.LongsBlockLoader;

import java.io.IOException;

public class TDigestBlockLoader extends BlockDocValuesReader.DocValuesBlockLoader {
private final DoublesBlockLoader minimaLoader;
private final DoublesBlockLoader maximaLoader;
private final DoublesBlockLoader sumsLoader;
private final LongsBlockLoader valueCountsLoader;
private final BytesRefsFromBinaryBlockLoader encodedDigestLoader;

public TDigestBlockLoader(
BytesRefsFromBinaryBlockLoader encodedDigestLoader,
DoublesBlockLoader minimaLoader,
DoublesBlockLoader maximaLoader,
DoublesBlockLoader sumsLoader,
LongsBlockLoader valueCountsLoader
) {
this.encodedDigestLoader = encodedDigestLoader;
this.minimaLoader = minimaLoader;
this.maximaLoader = maximaLoader;
this.sumsLoader = sumsLoader;
this.valueCountsLoader = valueCountsLoader;
}

@Override
public AllReader reader(LeafReaderContext context) throws IOException {
AllReader encodedDigestReader = encodedDigestLoader.reader(context);
AllReader minimaReader = minimaLoader.reader(context);
AllReader maximaReader = maximaLoader.reader(context);
AllReader sumsReader = sumsLoader.reader(context);
AllReader valueCountsReader = valueCountsLoader.reader(context);

return new TDigestReader(encodedDigestReader, minimaReader, maximaReader, sumsReader, valueCountsReader);
}

@Override
public Builder builder(BlockFactory factory, int expectedCount) {
// No row-oriented builder is provided here (returns null); the column-oriented
// read(...) below builds blocks directly via buildTDigestBlockDirect.
return null;
}

static class TDigestReader implements AllReader {

private final AllReader encodedDigestReader;
private final AllReader minimaReader;
private final AllReader maximaReader;
private final AllReader sumsReader;
private final AllReader valueCountsReader;

TDigestReader(
AllReader encodedDigestReader,
AllReader minimaReader,
AllReader maximaReader,
AllReader sumsReader,
AllReader valueCountsReader
) {
this.encodedDigestReader = encodedDigestReader;
this.minimaReader = minimaReader;
this.maximaReader = maximaReader;
this.sumsReader = sumsReader;
this.valueCountsReader = valueCountsReader;
}

@Override
public boolean canReuse(int startingDocID) {
return minimaReader.canReuse(startingDocID)
&& maximaReader.canReuse(startingDocID)
&& sumsReader.canReuse(startingDocID)
&& valueCountsReader.canReuse(startingDocID)
&& encodedDigestReader.canReuse(startingDocID);
}

// Column-oriented reader
@Override
public Block read(BlockFactory factory, Docs docs, int offset, boolean nullsFiltered) throws IOException {
Block minima = null;
Block maxima = null;
Block sums = null;
Block valueCounts = null;
Block encodedBytes = null;
Block result;
boolean success = false;
try {
minima = minimaReader.read(factory, docs, offset, nullsFiltered);
maxima = maximaReader.read(factory, docs, offset, nullsFiltered);
sums = sumsReader.read(factory, docs, offset, nullsFiltered);
valueCounts = valueCountsReader.read(factory, docs, offset, nullsFiltered);
encodedBytes = encodedDigestReader.read(factory, docs, offset, nullsFiltered);
result = factory.buildTDigestBlockDirect(encodedBytes, minima, maxima, sums, valueCounts);
success = true;
} finally {
if (success == false) {
Releasables.close(minima, maxima, sums, valueCounts, encodedBytes);
}
}
return result;
}

@Override
public void read(int docId, StoredFields storedFields, Builder builder) throws IOException {
TDigestBuilder tdigestBuilder = (TDigestBuilder) builder;
minimaReader.read(docId, storedFields, tdigestBuilder.minima());
maximaReader.read(docId, storedFields, tdigestBuilder.maxima());
sumsReader.read(docId, storedFields, tdigestBuilder.sums());
valueCountsReader.read(docId, storedFields, tdigestBuilder.valueCounts());
encodedDigestReader.read(docId, storedFields, tdigestBuilder.encodedDigests());
}

@Override
public String toString() {
return "BlockDocValuesReader.TDigest";
}
}
}
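
The column-oriented read(...) above acquires five sub-blocks and only marks success after the combined block is built; if any read or the final combine throws, everything acquired so far is released rather than leaked. The same idiom in a self-contained form (the Releasable stand-in and all names are illustrative):

public class ReleaseOnFailureDemo {
    interface Releasable extends AutoCloseable {
        @Override
        void close();
    }

    // Mirrors Releasables.close(...): close everything non-null.
    static void closeAll(Releasable... resources) {
        for (Releasable r : resources) {
            if (r != null) {
                r.close();
            }
        }
    }

    static Releasable acquire(String name) {
        System.out.println("acquired " + name);
        return () -> System.out.println("released " + name);
    }

    public static void main(String[] args) {
        Releasable minima = null;
        Releasable maxima = null;
        boolean success = false;
        try {
            minima = acquire("minima");
            maxima = acquire("maxima");
            // combine into the final result here; an exception thrown before
            // `success = true` falls through to finally and releases both
            success = true;
        } finally {
            if (success == false) {
                closeAll(minima, maxima);
            }
        }
    }
}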
@@ -23,6 +23,7 @@
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.FeatureFlag;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.fielddata.FieldDataContext;
import org.elasticsearch.index.fielddata.FormattedDocValues;
import org.elasticsearch.index.fielddata.HistogramValue;
@@ -32,6 +33,7 @@
import org.elasticsearch.index.fielddata.IndexHistogramFieldData;
import org.elasticsearch.index.fielddata.LeafHistogramFieldData;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.mapper.BlockLoader;
import org.elasticsearch.index.mapper.CompositeSyntheticFieldLoader;
import org.elasticsearch.index.mapper.DocumentParserContext;
import org.elasticsearch.index.mapper.DocumentParsingException;
@@ -42,6 +44,9 @@
import org.elasticsearch.index.mapper.MapperBuilderContext;
import org.elasticsearch.index.mapper.SourceValueFetcher;
import org.elasticsearch.index.mapper.ValueFetcher;
import org.elasticsearch.index.mapper.blockloader.docvalues.BytesRefsFromBinaryBlockLoader;
import org.elasticsearch.index.mapper.blockloader.docvalues.DoublesBlockLoader;
import org.elasticsearch.index.mapper.blockloader.docvalues.LongsBlockLoader;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.script.field.DocValuesScriptFieldFactory;
import org.elasticsearch.search.DocValueFormat;
@@ -58,6 +63,7 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

@@ -79,13 +85,13 @@ private static TDigestFieldMapper toType(FieldMapper in) {
}

public static class Builder extends FieldMapper.Builder {
- private static final int DEFAULT_COMPRESSION = 100;
- private static final int MAXIMUM_COMPRESSION = 10000;
+ private static final double DEFAULT_COMPRESSION = 100d;
+ private static final double MAXIMUM_COMPRESSION = 10000d;

private final Parameter<Map<String, String>> meta = Parameter.metaParam();
private final Parameter<Explicit<Boolean>> ignoreMalformed;
private final Parameter<TDigestState.Type> digestType;
- private final Parameter<Integer> compression;
+ private final Parameter<Double> compression;

public Builder(String name, boolean ignoreMalformedByDefault) {
super(name);
@@ -102,7 +108,15 @@ public Builder(String name, boolean ignoreMalformedByDefault) {
TDigestState.Type.HYBRID,
TDigestState.Type.class
);
- this.compression = Parameter.intParam("compression", false, m -> toType(m).compression, DEFAULT_COMPRESSION).addValidator(c -> {
+ this.compression = new Parameter<>(
+     "compression",
+     false,
+     () -> DEFAULT_COMPRESSION,
+     (n, c1, o) -> XContentMapValues.nodeDoubleValue(o),
+     m -> toType(m).compression,
+     XContentBuilder::field,
+     Objects::toString
+ ).addValidator(c -> {
if (c <= 0 || c > MAXIMUM_COMPRESSION) {
throw new IllegalArgumentException(
"compression must be a positive integer between 1 and " + MAXIMUM_COMPRESSION + " was [" + c + "]"
@@ -135,7 +149,7 @@ public TDigestFieldMapper build(MapperBuilderContext context) {
private final Explicit<Boolean> ignoreMalformed;
private final boolean ignoreMalformedByDefault;
private final TDigestState.Type digestType;
- private final int compression;
+ private final double compression;

public TDigestFieldMapper(String simpleName, MappedFieldType mappedFieldType, BuilderParams builderParams, Builder builder) {
super(simpleName, mappedFieldType, builderParams);
@@ -154,7 +168,7 @@ public TDigestState.Type digestType() {
return digestType;
}

- public int compression() {
+ public double compression() {
return compression;
}

@@ -184,6 +198,18 @@ public String typeName() {
return CONTENT_TYPE;
}

@Override
public BlockLoader blockLoader(BlockLoaderContext blContext) {
DoublesBlockLoader minimaLoader = new DoublesBlockLoader(valuesMinSubFieldName(name()), NumericUtils::sortableLongToDouble);
DoublesBlockLoader maximaLoader = new DoublesBlockLoader(valuesMaxSubFieldName(name()), NumericUtils::sortableLongToDouble);
DoublesBlockLoader sumsLoader = new DoublesBlockLoader(valuesSumSubFieldName(name()), NumericUtils::sortableLongToDouble);
LongsBlockLoader valueCountsLoader = new LongsBlockLoader(valuesCountSubFieldName(name()));
BytesRefsFromBinaryBlockLoader digestLoader = new BytesRefsFromBinaryBlockLoader(name());

// TODO: We're constantly passing around this set of 5 things. It would be nice to make a container for that.
return new TDigestBlockLoader(digestLoader, minimaLoader, maximaLoader, sumsLoader, valueCountsLoader);
}

@Override
public ValueFetcher valueFetcher(SearchExecutionContext context, String format) {
return SourceValueFetcher.identity(name(), context, format);
@@ -444,7 +470,7 @@ private static String valuesMaxSubFieldName(String fullPath) {
}

/** re-usable {@link HistogramValue} implementation */
- private static class InternalTDigestValue extends HistogramValue {
+ static class InternalTDigestValue extends HistogramValue {
double value;
long count;
boolean isExhausted;
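On the TODO in blockLoader(...) above about always passing the same five loaders around: one possible container is a small record, sketched here purely for illustration (the name and shape are hypothetical, not part of this change):

// Hypothetical sketch, not in this PR: bundle the five per-column loaders.
record TDigestColumnLoaders(
    BytesRefsFromBinaryBlockLoader encodedDigests,
    DoublesBlockLoader minima,
    DoublesBlockLoader maxima,
    DoublesBlockLoader sums,
    LongsBlockLoader valueCounts
) {}

The TDigestBlockLoader constructor and TDigestFieldMapper.blockLoader(...) could then take one argument instead of five.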