Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -497,12 +497,7 @@ public ExponentialHistogramBlockBuilder newExponentialHistogramBlockBuilder(int
}

public final ExponentialHistogramBlock newConstantExponentialHistogramBlock(ExponentialHistogram value, int positionCount) {
try (ExponentialHistogramBlockBuilder builder = newExponentialHistogramBlockBuilder(positionCount)) {
for (int i = 0; i < positionCount; i++) {
builder.append(value);
}
return builder.build();
}
return ExponentialHistogramArrayBlock.createConstant(value, positionCount, this);
}

public BlockLoader.Block newExponentialHistogramBlockFromDocValues(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.exponentialhistogram.CompressedExponentialHistogram;
import org.elasticsearch.exponentialhistogram.ExponentialHistogram;
import org.elasticsearch.exponentialhistogram.ZeroBucket;

import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -106,6 +109,45 @@ private List<Block> getSubBlocks() {
return List.of(sums, valueCounts, zeroThresholds, encodedHistograms, minima, maxima);
}

/**
 * Serializes the given {@link ExponentialHistogram} into the per-position component values
 * ({@code count}, {@code sum}, {@code min}, {@code max}, {@code zeroThreshold}) plus the
 * compressed bucket bytes used by this block's column representation.
 *
 * @param histogram the histogram to encode; must not be {@code null}
 * @return the encoded component data for a single block position
 */
public static EncodedHistogramData encode(ExponentialHistogram histogram) {
    assert histogram != null;
    // TODO: check and potentially improve performance and correctness before moving out of tech-preview
    // The current implementation encodes the histogram into the format we use for storage on disk.
    // This format is optimized for minimal memory usage at the cost of encoding speed.
    // In addition, it only supports storing the zero threshold as a double value, which is lossy when merging histograms.
    // In practice this rarely occurs, as the zero threshold is usually 0.0 and not impacted by merges,
    // and even if it occurs, the error is usually tiny.
    // We should add a dedicated encoding when building a block from computed histograms which do not originate from doc values.
    // That encoding should be optimized for speed and support storing the zero threshold as a (scale, index) pair.
    ZeroBucket zeroBucket = histogram.zeroBucket();
    // BytesStreamOutput is an in-memory stream, so the IOException path below is not expected to trigger in practice.
    BytesStreamOutput encodedBytes = new BytesStreamOutput();
    try {
        CompressedExponentialHistogram.writeHistogramBytes(
            encodedBytes,
            histogram.scale(),
            histogram.negativeBuckets().iterator(),
            histogram.positiveBuckets().iterator()
        );
    } catch (IOException e) {
        throw new RuntimeException("Failed to encode histogram", e);
    }
    double sum;
    if (histogram.valueCount() == 0) {
        assert histogram.sum() == 0.0 : "Empty histogram should have sum 0.0 but was " + histogram.sum();
        sum = Double.NaN; // we store null/NaN for empty histograms to ensure avg is null/0.0 instead of 0.0/0.0
    } else {
        sum = histogram.sum();
    }
    // NOTE(review): min()/max() are passed through unchanged here; presumably they are NaN for empty
    // histograms and mapped to null sub-blocks by the consumers of this data — confirm against callers.
    return new EncodedHistogramData(
        histogram.valueCount(),
        sum,
        histogram.min(),
        histogram.max(),
        zeroBucket.zeroThreshold(),
        encodedBytes.bytes().toBytesRef()
    );
}

@Override
public ExponentialHistogram getExponentialHistogram(int valueIndex, ExponentialHistogramScratch scratch) {
assert isNull(valueIndex) == false : "tried to get histogram at null position " + valueIndex;
Expand All @@ -125,6 +167,43 @@ public ExponentialHistogram getExponentialHistogram(int valueIndex, ExponentialH
}
}

/**
 * Builds an {@link ExponentialHistogramBlock} that holds {@code positionCount} copies of the
 * given histogram. The histogram is encoded once via {@link #encode} and each component is
 * backed by a constant (or constant-null) sub-block, so the encoded data is shared across
 * all positions instead of being appended position-by-position.
 *
 * @param histogram     the histogram value to repeat at every position
 * @param positionCount the number of positions in the resulting block
 * @param blockFactory  the factory used to allocate the component sub-blocks
 * @return the newly created constant histogram block
 */
public static ExponentialHistogramBlock createConstant(ExponentialHistogram histogram, int positionCount, BlockFactory blockFactory) {
    EncodedHistogramData data = encode(histogram);
    DoubleBlock minBlock = null;
    DoubleBlock maxBlock = null;
    DoubleBlock sumBlock = null;
    DoubleBlock countBlock = null;
    DoubleBlock zeroThresholdBlock = null;
    BytesRefBlock encodedHistogramBlock = null;
    // success flag + finally: if any allocation below throws, release everything built so far
    // so no sub-block is leaked.
    boolean success = false;
    try {
        countBlock = blockFactory.newConstantDoubleBlockWith(data.count, positionCount);
        // NaN is the "absent" marker for min/max/sum (e.g. empty histograms) and is surfaced
        // as a constant-null sub-block rather than a NaN-valued double block.
        if (Double.isNaN(data.min)) {
            // NOTE(review): relies on the constant-null block implementing DoubleBlock — confirm cast safety
            minBlock = (DoubleBlock) blockFactory.newConstantNullBlock(positionCount);
        } else {
            minBlock = blockFactory.newConstantDoubleBlockWith(data.min, positionCount);
        }
        if (Double.isNaN(data.max)) {
            maxBlock = (DoubleBlock) blockFactory.newConstantNullBlock(positionCount);
        } else {
            maxBlock = blockFactory.newConstantDoubleBlockWith(data.max, positionCount);
        }
        if (Double.isNaN(data.sum)) {
            sumBlock = (DoubleBlock) blockFactory.newConstantNullBlock(positionCount);
        } else {
            sumBlock = blockFactory.newConstantDoubleBlockWith(data.sum, positionCount);
        }
        zeroThresholdBlock = blockFactory.newConstantDoubleBlockWith(data.zeroThreshold, positionCount);
        encodedHistogramBlock = blockFactory.newConstantBytesRefBlockWith(data.encodedHistogram, positionCount);
        success = true;
        return new ExponentialHistogramArrayBlock(minBlock, maxBlock, sumBlock, countBlock, zeroThresholdBlock, encodedHistogramBlock);
    } finally {
        if (success == false) {
            // Releasables.close tolerates nulls, so partially-initialized state is handled.
            Releasables.close(minBlock, maxBlock, sumBlock, countBlock, zeroThresholdBlock, encodedHistogramBlock);
        }
    }
}

@Override
public Block buildExponentialHistogramComponentBlock(Component component) {
// as soon as we support multi-values, we need to implement this differently,
Expand Down Expand Up @@ -438,4 +517,6 @@ public int hashCode() {
// this ensures proper equality with null blocks and should be unique enough for practical purposes
return encodedHistograms.hashCode();
}

// Immutable carrier for one position's encoded histogram components, produced by encode(...).
// NaN in sum/min/max means "absent" (stored as null in the corresponding sub-block).
record EncodedHistogramData(double count, double sum, double min, double max, double zeroThreshold, BytesRef encodedHistogram) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,10 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.exponentialhistogram.CompressedExponentialHistogram;
import org.elasticsearch.exponentialhistogram.ExponentialHistogram;
import org.elasticsearch.exponentialhistogram.ZeroBucket;
import org.elasticsearch.index.mapper.BlockLoader;

import java.io.IOException;

public final class ExponentialHistogramBlockBuilder implements ExponentialHistogramBlock.Builder {

private final DoubleBlock.Builder minimaBuilder;
Expand Down Expand Up @@ -95,45 +90,25 @@ public BlockLoader.BytesRefBuilder encodedHistograms() {
}

public ExponentialHistogramBlockBuilder append(ExponentialHistogram histogram) {
assert histogram != null;
// TODO: fix performance and correctness before using in production code
// The current implementation encodes the histogram into the format we use for storage on disk
// This format is optimized for minimal memory usage at the cost of encoding speed
// In addition, it only support storing the zero threshold as a double value, which is lossy when merging histograms
// We should add a dedicated encoding when building a block from computed histograms which do not originate from doc values
// That encoding should be optimized for speed and support storing the zero threshold as (scale, index) pair
ZeroBucket zeroBucket = histogram.zeroBucket();

BytesStreamOutput encodedBytes = new BytesStreamOutput();
try {
CompressedExponentialHistogram.writeHistogramBytes(
encodedBytes,
histogram.scale(),
histogram.negativeBuckets().iterator(),
histogram.positiveBuckets().iterator()
);
} catch (IOException e) {
throw new RuntimeException("Failed to encode histogram", e);
}
if (Double.isNaN(histogram.min())) {
ExponentialHistogramArrayBlock.EncodedHistogramData data = ExponentialHistogramArrayBlock.encode(histogram);
valueCountsBuilder.appendDouble(data.count());
if (Double.isNaN(data.min())) {
minimaBuilder.appendNull();
} else {
minimaBuilder.appendDouble(histogram.min());
minimaBuilder.appendDouble(data.min());
}
if (Double.isNaN(histogram.max())) {
if (Double.isNaN(data.max())) {
maximaBuilder.appendNull();
} else {
maximaBuilder.appendDouble(histogram.max());
maximaBuilder.appendDouble(data.max());
}
if (histogram.valueCount() == 0) {
assert histogram.sum() == 0.0 : "Empty histogram should have sum 0.0 but was " + histogram.sum();
if (Double.isNaN(data.sum())) {
sumsBuilder.appendNull();
} else {
sumsBuilder.appendDouble(histogram.sum());
sumsBuilder.appendDouble(data.sum());
}
valueCountsBuilder.appendDouble(histogram.valueCount());
zeroThresholdsBuilder.appendDouble(zeroBucket.zeroThreshold());
encodedHistogramsBuilder.appendBytesRef(encodedBytes.bytes().toBytesRef());
zeroThresholdsBuilder.appendDouble(data.zeroThreshold());
encodedHistogramsBuilder.appendBytesRef(data.encodedHistogram());
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,14 @@
*/
public class WriteableExponentialHistogram extends AbstractExponentialHistogram implements GenericNamedWriteable {

// TODO(b/133393): as it turns out, this is also required in production. Therefore we have to properly register this class,
// like in https://github.com/elastic/elasticsearch/pull/135054
// TODO: This is currently only used for serializing literals in tests.
// At the time of writing, it is impossible for ExponentialHistogram literals to appear outside of tests
// as there is no conversion function or other means for constructing them except from a block loader.
// For this reason, this class is currently only available and registered for deserialization in tests,
// so that we don't have to care about backwards compatibility.
// If we eventually have to support ExponentialHistogram literals in production code,
// proper testing for this class needs to be added, similar to the following PR:
// https://github.com/elastic/elasticsearch/pull/135054

private static final String WRITEABLE_NAME = "test_exponential_histogram";

Expand Down