Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,8 @@ interface BlockFactory {

AggregateMetricDoubleBuilder aggregateMetricDoubleBuilder(int count);

Block buildAggregateMetricDoubleDirect(Block minBlock, Block maxBlock, Block sumBlock, Block countBlock);

ExponentialHistogramBuilder exponentialHistogramBlockBuilder(int count);

Block buildExponentialHistogramBlockDirect(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,21 @@ public BlockLoader.AggregateMetricDoubleBuilder aggregateMetricDoubleBuilder(int
return new AggregateMetricDoubleBlockBuilder(expectedSize);
}

@Override
public BlockLoader.Block buildAggregateMetricDoubleDirect(
BlockLoader.Block minBlock,
BlockLoader.Block maxBlock,
BlockLoader.Block sumBlock,
BlockLoader.Block countBlock
) {
return AggregateMetricDoubleBlockBuilder.parseAggMetricsToBlock(
(TestBlock) minBlock,
(TestBlock) maxBlock,
(TestBlock) sumBlock,
(TestBlock) countBlock
);
}

@Override
public BlockLoader.ExponentialHistogramBuilder exponentialHistogramBlockBuilder(int count) {
return new ExponentialHistogramBlockBuilder(this, count);
Expand Down Expand Up @@ -644,6 +659,10 @@ public BlockLoader.Block build() {
var sumBlock = sum.build();
var countBlock = count.build();

return parseAggMetricsToBlock(minBlock, maxBlock, sumBlock, countBlock);
}

public static TestBlock parseAggMetricsToBlock(TestBlock minBlock, TestBlock maxBlock, TestBlock sumBlock, TestBlock countBlock) {
assert minBlock.size() == maxBlock.size();
assert maxBlock.size() == sumBlock.size();
assert sumBlock.size() == countBlock.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,20 @@ public AggregateMetricDoubleBlockBuilder newAggregateMetricDoubleBlockBuilder(in
return new AggregateMetricDoubleBlockBuilder(estimatedSize, this);
}

public final AggregateMetricDoubleBlock newAggregateMetricDoubleBlock(
Block minBlock,
Block maxBlock,
Block sumBlock,
Block countBlock
) {
return new AggregateMetricDoubleArrayBlock(
(DoubleBlock) minBlock,
(DoubleBlock) maxBlock,
(DoubleBlock) sumBlock,
(IntBlock) countBlock
);
}

public final AggregateMetricDoubleBlock newConstantAggregateMetricDoubleBlock(
AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral value,
int positions
Expand Down Expand Up @@ -469,6 +483,15 @@ public final AggregateMetricDoubleBlock newConstantAggregateMetricDoubleBlock(
}
}

public BlockLoader.Block newAggregateMetricDoubleBlockFromDocValues(
DoubleBlock minBlock,
DoubleBlock maxBlock,
DoubleBlock sumBlock,
IntBlock countBlock
) {
return new AggregateMetricDoubleArrayBlock(minBlock, maxBlock, sumBlock, countBlock);
}

public ExponentialHistogramBlockBuilder newExponentialHistogramBlockBuilder(int estimatedSize) {
return new ExponentialHistogramBlockBuilder(estimatedSize, this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.compute.data.BytesRefVector;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.OrdinalBytesRefVector;
Expand Down Expand Up @@ -151,6 +152,21 @@ public BlockLoader.AggregateMetricDoubleBuilder aggregateMetricDoubleBuilder(int
return factory.newAggregateMetricDoubleBlockBuilder(count);
}

@Override
public BlockLoader.Block buildAggregateMetricDoubleDirect(
BlockLoader.Block minBlock,
BlockLoader.Block maxBlock,
BlockLoader.Block sumBlock,
BlockLoader.Block countBlock
) {
return factory.newAggregateMetricDoubleBlockFromDocValues(
(DoubleBlock) minBlock,
(DoubleBlock) maxBlock,
(DoubleBlock) sumBlock,
(IntBlock) countBlock
);
}

@Override
public BlockLoader.ExponentialHistogramBuilder exponentialHistogramBlockBuilder(int count) {
return factory.newExponentialHistogramBlockBuilder(count);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.aggregatemetric.mapper;

import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.mapper.blockloader.docvalues.BlockDocValuesReader;

import java.io.IOException;
import java.util.EnumMap;

public class AggregateMetricDoubleBlockLoader extends BlockDocValuesReader.DocValuesBlockLoader {
final NumberFieldMapper.NumberFieldType minFieldType;
final NumberFieldMapper.NumberFieldType maxFieldType;
final NumberFieldMapper.NumberFieldType sumFieldType;
final NumberFieldMapper.NumberFieldType countFieldType;

AggregateMetricDoubleBlockLoader(EnumMap<AggregateMetricDoubleFieldMapper.Metric, NumberFieldMapper.NumberFieldType> metricsRequested) {
minFieldType = metricsRequested.getOrDefault(AggregateMetricDoubleFieldMapper.Metric.min, null);
maxFieldType = metricsRequested.getOrDefault(AggregateMetricDoubleFieldMapper.Metric.max, null);
sumFieldType = metricsRequested.getOrDefault(AggregateMetricDoubleFieldMapper.Metric.sum, null);
countFieldType = metricsRequested.getOrDefault(AggregateMetricDoubleFieldMapper.Metric.value_count, null);
}

private static NumericDocValues getNumericDocValues(NumberFieldMapper.NumberFieldType field, LeafReader leafReader) throws IOException {
if (field == null) {
return null;
}
String fieldName = field.name();
var values = leafReader.getNumericDocValues(fieldName);
if (values != null) {
return values;
}

var sortedValues = leafReader.getSortedNumericDocValues(fieldName);
return DocValues.unwrapSingleton(sortedValues);
}

@Override
public AllReader reader(LeafReaderContext context) throws IOException {
NumericDocValues minValues = getNumericDocValues(minFieldType, context.reader());
NumericDocValues maxValues = getNumericDocValues(maxFieldType, context.reader());
NumericDocValues sumValues = getNumericDocValues(sumFieldType, context.reader());
NumericDocValues valueCountValues = getNumericDocValues(countFieldType, context.reader());

return new BlockDocValuesReader() {

private int docID = -1;

@Override
protected int docId() {
return docID;
}

@Override
public String toString() {
return "BlockDocValuesReader.AggregateMetricDouble";
}

@Override
public Block read(BlockFactory factory, Docs docs, int offset, boolean nullsFiltered) throws IOException {
boolean success = false;
Block minBlock = null;
Block maxBlock = null;
Block sumBlock = null;
Block countBlock = null;
try {
minBlock = readDoubleSubblock(factory, docs, offset, minValues);
maxBlock = readDoubleSubblock(factory, docs, offset, maxValues);
sumBlock = readDoubleSubblock(factory, docs, offset, sumValues);
countBlock = readIntSubblock(factory, docs, offset, valueCountValues);
Block block = factory.buildAggregateMetricDoubleDirect(minBlock, maxBlock, sumBlock, countBlock);
success = true;
return block;
} finally {
if (success == false) {
Releasables.closeExpectNoException(minBlock, maxBlock, sumBlock, countBlock);
}
}
}

private Block readDoubleSubblock(BlockFactory factory, Docs docs, int offset, NumericDocValues values) throws IOException {
int count = docs.count() - offset;
if (values == null) {
return factory.constantNulls(count);
}
try (DoubleBuilder builder = factory.doubles(count)) {
copyDoubleValuesToBuilder(docs, offset, builder, values);
return builder.build();
}
}

private Block readIntSubblock(BlockFactory factory, Docs docs, int offset, NumericDocValues values) throws IOException {
int count = docs.count() - offset;
if (values == null) {
return factory.constantNulls(count);
}
try (IntBuilder builder = factory.ints(count)) {
copyIntValuesToBuilder(docs, offset, builder, values);
return builder.build();
}
}

private void copyDoubleValuesToBuilder(Docs docs, int offset, DoubleBuilder builder, NumericDocValues values)
throws IOException {
int lastDoc = -1;
for (int i = offset; i < docs.count(); i++) {
int doc = docs.get(i);
if (doc < lastDoc) {
throw new IllegalStateException("docs within same block must be in order");
}
if (values == null || values.advanceExact(doc) == false) {
builder.appendNull();
} else {
double value = NumericUtils.sortableLongToDouble(values.longValue());
lastDoc = doc;
this.docID = doc;
builder.appendDouble(value);
}
}
}

private void copyIntValuesToBuilder(Docs docs, int offset, IntBuilder builder, NumericDocValues values) throws IOException {
int lastDoc = -1;
for (int i = offset; i < docs.count(); i++) {
int doc = docs.get(i);
if (doc < lastDoc) {
throw new IllegalStateException("docs within same block must be in order");
}
if (values == null || values.advanceExact(doc) == false) {
builder.appendNull();
} else {
int value = Math.toIntExact(values.longValue());
lastDoc = doc;
this.docID = doc;
builder.appendInt(value);
}
}
}

@Override
public void read(int docId, StoredFields storedFields, Builder builder) throws IOException {
var blockBuilder = (AggregateMetricDoubleBuilder) builder;
this.docID = docId;
readSingleRow(docId, blockBuilder);
}

private void readSingleRow(int docId, AggregateMetricDoubleBuilder builder) throws IOException {
if (minValues != null && minValues.advanceExact(docId)) {
builder.min().appendDouble(NumericUtils.sortableLongToDouble(minValues.longValue()));
} else {
builder.min().appendNull();
}
if (maxValues != null && maxValues.advanceExact(docId)) {
builder.max().appendDouble(NumericUtils.sortableLongToDouble(maxValues.longValue()));
} else {
builder.max().appendNull();
}
if (sumValues != null && sumValues.advanceExact(docId)) {
builder.sum().appendDouble(NumericUtils.sortableLongToDouble(sumValues.longValue()));
} else {
builder.sum().appendNull();
}
if (valueCountValues != null && valueCountValues.advanceExact(docId)) {
builder.count().appendInt(Math.toIntExact(valueCountValues.longValue()));
} else {
builder.count().appendNull();
}
}
};
}

@Override
public Builder builder(BlockFactory factory, int expectedCount) {
return factory.aggregateMetricDoubleBuilder(expectedCount);
}
}
Loading