Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,60 +7,60 @@

package org.elasticsearch.xpack.aggregatemetric.mapper;

import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.mapper.blockloader.docvalues.BlockDocValuesReader;
import org.elasticsearch.index.mapper.blockloader.docvalues.DoublesBlockLoader;
import org.elasticsearch.index.mapper.blockloader.docvalues.IntsBlockLoader;

import java.io.IOException;
import java.util.EnumMap;

public class AggregateMetricDoubleBlockLoader extends BlockDocValuesReader.DocValuesBlockLoader {
final NumberFieldMapper.NumberFieldType minFieldType;
final NumberFieldMapper.NumberFieldType maxFieldType;
final NumberFieldMapper.NumberFieldType sumFieldType;
final NumberFieldMapper.NumberFieldType countFieldType;
private final DoublesBlockLoader minLoader;
private final DoublesBlockLoader maxLoader;
private final DoublesBlockLoader sumLoader;
private final IntsBlockLoader countLoader;

AggregateMetricDoubleBlockLoader(EnumMap<AggregateMetricDoubleFieldMapper.Metric, NumberFieldMapper.NumberFieldType> metricsRequested) {
minFieldType = metricsRequested.getOrDefault(AggregateMetricDoubleFieldMapper.Metric.min, null);
maxFieldType = metricsRequested.getOrDefault(AggregateMetricDoubleFieldMapper.Metric.max, null);
sumFieldType = metricsRequested.getOrDefault(AggregateMetricDoubleFieldMapper.Metric.sum, null);
countFieldType = metricsRequested.getOrDefault(AggregateMetricDoubleFieldMapper.Metric.value_count, null);
minLoader = getDoublesBlockLoader(AggregateMetricDoubleFieldMapper.Metric.min, metricsRequested);
maxLoader = getDoublesBlockLoader(AggregateMetricDoubleFieldMapper.Metric.max, metricsRequested);
sumLoader = getDoublesBlockLoader(AggregateMetricDoubleFieldMapper.Metric.sum, metricsRequested);
countLoader = getIntsBlockLoader(AggregateMetricDoubleFieldMapper.Metric.value_count, metricsRequested);
}

private static NumericDocValues getNumericDocValues(NumberFieldMapper.NumberFieldType field, LeafReader leafReader) throws IOException {
if (field == null) {
private static DoublesBlockLoader getDoublesBlockLoader(
AggregateMetricDoubleFieldMapper.Metric metric,
EnumMap<AggregateMetricDoubleFieldMapper.Metric, NumberFieldMapper.NumberFieldType> metricsRequested
) {
if (metricsRequested.containsKey(metric) == false) {
return null;
}
String fieldName = field.name();
var values = leafReader.getNumericDocValues(fieldName);
if (values != null) {
return values;
}
var toLoad = metricsRequested.get(metric);
return new DoublesBlockLoader(toLoad.name(), NumericUtils::sortableLongToDouble);
}

var sortedValues = leafReader.getSortedNumericDocValues(fieldName);
return DocValues.unwrapSingleton(sortedValues);
private static IntsBlockLoader getIntsBlockLoader(
AggregateMetricDoubleFieldMapper.Metric metric,
EnumMap<AggregateMetricDoubleFieldMapper.Metric, NumberFieldMapper.NumberFieldType> metricsRequested
) {
if (metricsRequested.containsKey(metric) == false) {
return null;
}
var toLoad = metricsRequested.get(metric);
return new IntsBlockLoader(toLoad.name());
}

@Override
public AllReader reader(LeafReaderContext context) throws IOException {
NumericDocValues minValues = getNumericDocValues(minFieldType, context.reader());
NumericDocValues maxValues = getNumericDocValues(maxFieldType, context.reader());
NumericDocValues sumValues = getNumericDocValues(sumFieldType, context.reader());
NumericDocValues valueCountValues = getNumericDocValues(countFieldType, context.reader());

return new BlockDocValuesReader() {
AllReader minReader = minLoader != null ? minLoader.reader(context) : null;
AllReader maxReader = maxLoader != null ? maxLoader.reader(context) : null;
AllReader sumReader = sumLoader != null ? sumLoader.reader(context) : null;
AllReader countReader = countLoader != null ? countLoader.reader(context) : null;

private int docID = -1;

@Override
protected int docId() {
return docID;
}
return new AllReader() {

@Override
public String toString() {
Expand All @@ -75,10 +75,13 @@ public Block read(BlockFactory factory, Docs docs, int offset, boolean nullsFilt
Block sumBlock = null;
Block countBlock = null;
try {
minBlock = readDoubleSubblock(factory, docs, offset, minValues);
maxBlock = readDoubleSubblock(factory, docs, offset, maxValues);
sumBlock = readDoubleSubblock(factory, docs, offset, sumValues);
countBlock = readIntSubblock(factory, docs, offset, valueCountValues);
int count = docs.count() - offset;
minBlock = minReader != null ? minReader.read(factory, docs, offset, nullsFiltered) : factory.constantNulls(count);
maxBlock = maxReader != null ? maxReader.read(factory, docs, offset, nullsFiltered) : factory.constantNulls(count);
sumBlock = sumReader != null ? sumReader.read(factory, docs, offset, nullsFiltered) : factory.constantNulls(count);
countBlock = countReader != null
? countReader.read(factory, docs, offset, nullsFiltered)
: factory.constantNulls(count);
Block block = factory.buildAggregateMetricDoubleDirect(minBlock, maxBlock, sumBlock, countBlock);
success = true;
return block;
Expand All @@ -89,94 +92,31 @@ public Block read(BlockFactory factory, Docs docs, int offset, boolean nullsFilt
}
}

private Block readDoubleSubblock(BlockFactory factory, Docs docs, int offset, NumericDocValues values) throws IOException {
int count = docs.count() - offset;
if (values == null) {
return factory.constantNulls(count);
}
try (DoubleBuilder builder = factory.doubles(count)) {
copyDoubleValuesToBuilder(docs, offset, builder, values);
return builder.build();
}
}

private Block readIntSubblock(BlockFactory factory, Docs docs, int offset, NumericDocValues values) throws IOException {
int count = docs.count() - offset;
if (values == null) {
return factory.constantNulls(count);
}
try (IntBuilder builder = factory.ints(count)) {
copyIntValuesToBuilder(docs, offset, builder, values);
return builder.build();
}
}

private void copyDoubleValuesToBuilder(Docs docs, int offset, DoubleBuilder builder, NumericDocValues values)
throws IOException {
int lastDoc = -1;
for (int i = offset; i < docs.count(); i++) {
int doc = docs.get(i);
if (doc < lastDoc) {
throw new IllegalStateException("docs within same block must be in order");
}
if (values == null || values.advanceExact(doc) == false) {
builder.appendNull();
} else {
double value = NumericUtils.sortableLongToDouble(values.longValue());
lastDoc = doc;
this.docID = doc;
builder.appendDouble(value);
}
}
}

private void copyIntValuesToBuilder(Docs docs, int offset, IntBuilder builder, NumericDocValues values) throws IOException {
int lastDoc = -1;
for (int i = offset; i < docs.count(); i++) {
int doc = docs.get(i);
if (doc < lastDoc) {
throw new IllegalStateException("docs within same block must be in order");
}
if (values == null || values.advanceExact(doc) == false) {
builder.appendNull();
} else {
int value = Math.toIntExact(values.longValue());
lastDoc = doc;
this.docID = doc;
builder.appendInt(value);
}
}
}

@Override
public void read(int docId, StoredFields storedFields, Builder builder) throws IOException {
var blockBuilder = (AggregateMetricDoubleBuilder) builder;
this.docID = docId;
readSingleRow(docId, blockBuilder);
readSingleRowFromSubblock(docId, storedFields, blockBuilder.min(), minReader);
readSingleRowFromSubblock(docId, storedFields, blockBuilder.max(), maxReader);
readSingleRowFromSubblock(docId, storedFields, blockBuilder.sum(), sumReader);
readSingleRowFromSubblock(docId, storedFields, blockBuilder.count(), countReader);
}

private void readSingleRow(int docId, AggregateMetricDoubleBuilder builder) throws IOException {
if (minValues != null && minValues.advanceExact(docId)) {
builder.min().appendDouble(NumericUtils.sortableLongToDouble(minValues.longValue()));
} else {
builder.min().appendNull();
}
if (maxValues != null && maxValues.advanceExact(docId)) {
builder.max().appendDouble(NumericUtils.sortableLongToDouble(maxValues.longValue()));
} else {
builder.max().appendNull();
}
if (sumValues != null && sumValues.advanceExact(docId)) {
builder.sum().appendDouble(NumericUtils.sortableLongToDouble(sumValues.longValue()));
} else {
builder.sum().appendNull();
}
if (valueCountValues != null && valueCountValues.advanceExact(docId)) {
builder.count().appendInt(Math.toIntExact(valueCountValues.longValue()));
private void readSingleRowFromSubblock(int docID, StoredFields storedFields, Builder builder, AllReader reader)
throws IOException {
if (reader == null) {
builder.appendNull();
} else {
builder.count().appendNull();
reader.read(docID, storedFields, builder);
}
}

@Override
public boolean canReuse(int startingDocID) {
return (minReader == null || minReader.canReuse(startingDocID))
&& (maxReader == null || maxReader.canReuse(startingDocID))
&& (sumReader == null || sumReader.canReuse(startingDocID))
&& (countReader == null || countReader.canReuse(startingDocID));
}
};
}

Expand Down