Skip to content

Commit

Permalink
ESQL: Add support for reading sparse fields (elastic#454)
Browse files Browse the repository at this point in the history
This PR adds support for reading null values in the `ValuesSourceReaderOperator`,
so that sparse fields (fields not present on every document) can be extracted.

It builds on the work done in PR elastic#400
  • Loading branch information
csoulios committed Dec 13, 2022
1 parent 6008788 commit cf8ccd0
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 16 deletions.
Expand Up @@ -11,15 +11,24 @@
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BytesRefArray;
import org.elasticsearch.core.Nullable;

import java.util.BitSet;

/**
* Block implementation that stores an array of {@link org.apache.lucene.util.BytesRef}.
*/
public final class BytesRefArrayBlock extends Block {

private static final BytesRef NULL_VALUE = new BytesRef();
private final BytesRefArray bytes;

/**
 * Creates a dense block with no null values.
 *
 * @param positionCount number of positions in the block
 * @param bytes         backing storage; must contain exactly {@code positionCount} entries
 */
public BytesRefArrayBlock(int positionCount, BytesRefArray bytes) {
    // Passing a null mask means mayHaveNull() reports false for this block.
    // (The flattened diff also showed a stale `super(positionCount);` line here,
    // which cannot coexist with this() — the delegation below is the intended code.)
    this(positionCount, bytes, null);
}

/**
 * Creates a block that may contain null values.
 *
 * @param positionCount number of positions in the block
 * @param bytes         backing storage; must contain exactly {@code positionCount} entries
 * @param nullsMask     bit set marking the null positions, or {@code null} when no
 *                      position is null (so that mayHaveNull() can report false)
 */
public BytesRefArrayBlock(int positionCount, BytesRefArray bytes, @Nullable BitSet nullsMask) {
super(positionCount, nullsMask);
// Null positions still occupy a slot in `bytes` (a placeholder empty value),
// so the backing array size must always equal the position count.
assert bytes.size() == positionCount : bytes.size() + " != " + positionCount;
this.bytes = bytes;
}
Expand Down Expand Up @@ -47,9 +56,12 @@ public static final class Builder {
private final int positionCount;
private final BytesRefArray bytes;

private final BitSet nullsMask;

/**
 * Creates a builder for a block that will hold exactly {@code positionCount} values.
 *
 * @param positionCount exact number of values (including nulls) the built block holds
 */
public Builder(int positionCount) {
this.positionCount = positionCount;
this.bytes = new BytesRefArray(positionCount, BigArrays.NON_RECYCLING_INSTANCE);
// Records which appended positions were nulls; consulted by build().
this.nullsMask = new BitSet(positionCount);
}

/**
Expand All @@ -62,11 +74,25 @@ public void append(BytesRef value) {
bytes.append(value);
}

/**
 * Appends a null entry. The position is marked in the nulls mask and a shared
 * empty placeholder value is stored so that positions stay aligned with the
 * backing {@link BytesRefArray}.
 */
public void appendNull() {
    // The number of values appended so far is exactly the next write position.
    nullsMask.set((int) bytes.size());
    append(NULL_VALUE);
}

/**
 * Builds the block. All {@code positionCount} values (including nulls) must
 * have been appended before calling this.
 *
 * @return the completed block
 * @throws IllegalStateException if a different number of values was appended
 */
public BytesRefArrayBlock build() {
    if (bytes.size() != positionCount) {
        throw new IllegalStateException("Incomplete block; expected " + positionCount + " values; got " + bytes.size());
    }
    // If nullsMask has no bit set, we pass null as the nulls mask, so that mayHaveNull() returns false.
    // BitSet.isEmpty() is a cheap check, unlike cardinality() which counts every set bit.
    return new BytesRefArrayBlock(positionCount, bytes, nullsMask.isEmpty() ? null : nullsMask);
}

// Method provided for testing only.
// Exposes the mutable backing array so tests can construct blocks around it
// (see BytesRefArrayBlock tests). NOTE(review): `protected` inside a
// `public static final class Builder` — package-private would suffice; confirm
// the intended visibility.
protected BytesRefArray getBytes() {
return bytes;
}
}
}
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.search.aggregations.support.ValuesSourceType;

import java.io.IOException;
import java.util.BitSet;

/**
* A reader that supports reading doc-values from a Lucene segment in Block fashion.
Expand Down Expand Up @@ -88,20 +89,24 @@ private static class LongValuesReader extends BlockDocValuesReader {
public Block readValues(Block docs) throws IOException {
final int positionCount = docs.getPositionCount();
final long[] values = new long[positionCount];
final BitSet nullsMask = new BitSet(positionCount);
int lastDoc = -1;
for (int i = 0; i < positionCount; i++) {
int doc = docs.getInt(i);
// docs within same block must be in order
if (lastDoc >= doc) {
throw new IllegalStateException("docs within same block must be in order");
}
if (numericDocValues.advanceExact(doc) == false) {
throw new IllegalStateException("sparse fields not supported for now, could not read doc [" + doc + "]");
if (numericDocValues.advanceExact(doc)) {
values[i] = numericDocValues.longValue();
} else {
nullsMask.set(i);
values[i] = 0L;
}
values[i] = numericDocValues.longValue();
lastDoc = doc;
}
return new LongArrayBlock(values, positionCount);
// If nullsMask has no bit set, we pass null as the nulls mask, so that mayHaveNull() returns false
return new LongArrayBlock(values, positionCount, nullsMask.cardinality() > 0 ? nullsMask : null);
}

@Override
Expand All @@ -122,21 +127,25 @@ private static class DoubleValuesReader extends BlockDocValuesReader {
public Block readValues(Block docs) throws IOException {
final int positionCount = docs.getPositionCount();
final double[] values = new double[positionCount];
final BitSet nullsMask = new BitSet(positionCount);
int lastDoc = -1;
for (int i = 0; i < positionCount; i++) {
int doc = docs.getInt(i);
// docs within same block must be in order
if (lastDoc >= doc) {
throw new IllegalStateException("docs within same block must be in order");
}
if (numericDocValues.advanceExact(doc) == false) {
throw new IllegalStateException("sparse fields not supported for now, could not read doc [" + doc + "]");
if (numericDocValues.advanceExact(doc)) {
values[i] = numericDocValues.doubleValue();
} else {
nullsMask.set(i);
values[i] = 0.0d;
}
values[i] = numericDocValues.doubleValue();
lastDoc = doc;
this.docID = doc;
}
return new DoubleArrayBlock(values, positionCount);
// If nullsMask has no bit set, we pass null as the nulls mask, so that mayHaveNull() returns false
return new DoubleArrayBlock(values, positionCount, nullsMask.cardinality() > 0 ? nullsMask : null);
}

@Override
Expand Down Expand Up @@ -164,13 +173,17 @@ public Block readValues(Block docs) throws IOException {
if (lastDoc >= doc) {
throw new IllegalStateException("docs within same block must be in order");
}
if (binaryDV.advanceExact(doc) == false) {
throw new IllegalStateException("sparse fields not supported for now, could not read doc [" + doc + "]");
if (binaryDV.advanceExact(doc)) {
int dvCount = binaryDV.docValueCount();
if (dvCount != 1) {
throw new IllegalStateException(
"multi-values not supported for now, could not read doc [" + doc + "] with [" + dvCount + "] values"
);
}
builder.append(binaryDV.nextValue());
} else {
builder.appendNull();
}
if (binaryDV.docValueCount() != 1) {
throw new IllegalStateException("multi-values not supported for now, could not read doc [" + doc + "]");
}
builder.append(binaryDV.nextValue());
lastDoc = doc;
this.docID = doc;
}
Expand Down
125 changes: 125 additions & 0 deletions server/src/test/java/org/elasticsearch/compute/OperatorTests.java
Expand Up @@ -9,9 +9,11 @@
package org.elasticsearch.compute;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.DoubleDocValuesField;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.SortedDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
Expand Down Expand Up @@ -61,10 +63,14 @@
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.fielddata.plain.SortedDoublesIndexFieldData;
import org.elasticsearch.index.fielddata.plain.SortedNumericIndexFieldData;
import org.elasticsearch.index.fielddata.plain.SortedSetOrdinalsIndexFieldData;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.test.ESTestCase;
Expand Down Expand Up @@ -291,6 +297,125 @@ public void testOperatorsWithLuceneSlicing() throws IOException {
}
}

/**
 * End-to-end test for reading sparse (nullable) fields through
 * {@code ValuesSourceReaderOperator}: every 100th document is indexed WITHOUT
 * the long/double/keyword doc-values fields, and the extracted blocks must
 * report exactly those positions as null. The dense int field "i" doubles as
 * the per-document identity used to decide which positions should be null.
 */
public void testValuesSourceReaderOperatorWithLNulls() throws IOException {
final int numDocs = 100_000;
try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir)) {
Document doc = new Document();
// Reusable field instances; Lucene allows reusing them across addDocument calls.
NumericDocValuesField intField = new NumericDocValuesField("i", 0);
NumericDocValuesField longField = new NumericDocValuesField("j", 0);
NumericDocValuesField doubleField = new DoubleDocValuesField("d", 0);
String kwFieldName = "kw";
for (int i = 0; i < numDocs; i++) {
doc.clear();
// "i" is always present so the consumer can recover the doc's identity.
intField.setLongValue(i);
doc.add(intField);
if (i % 100 != 0) { // Do not set field for every 100 values
longField.setLongValue(i);
doc.add(longField);
doubleField.setDoubleValue(i);
doc.add(doubleField);
doc.add(new SortedDocValuesField(kwFieldName, new BytesRef("kw=" + i)));
}
w.addDocument(doc);
}
w.commit();

ValuesSource intVs = new ValuesSource.Numeric.FieldData(
new SortedNumericIndexFieldData(
intField.name(),
IndexNumericFieldData.NumericType.INT,
IndexNumericFieldData.NumericType.INT.getValuesSourceType(),
null
)
);
ValuesSource longVs = new ValuesSource.Numeric.FieldData(
new SortedNumericIndexFieldData(
longField.name(),
IndexNumericFieldData.NumericType.LONG,
IndexNumericFieldData.NumericType.LONG.getValuesSourceType(),
null
)
);
ValuesSource doubleVs = new ValuesSource.Numeric.FieldData(
new SortedDoublesIndexFieldData(
doubleField.name(),
IndexNumericFieldData.NumericType.DOUBLE,
IndexNumericFieldData.NumericType.DOUBLE.getValuesSourceType(),
null
)
);
var breakerService = new NoneCircuitBreakerService();
var cache = new IndexFieldDataCache.None();
ValuesSource keywordVs = new ValuesSource.Bytes.FieldData(
new SortedSetOrdinalsIndexFieldData(cache, kwFieldName, CoreValuesSourceType.KEYWORD, breakerService, null)
);

try (IndexReader reader = w.getReader()) {
// Chain four reader operators (one per field) after the Lucene source, then
// verify nullness per position in the page consumer.
Driver driver = new Driver(
new LuceneSourceOperator(reader, 0, new MatchAllDocsQuery()),
List.of(
new ValuesSourceReaderOperator(
List.of(CoreValuesSourceType.NUMERIC),
List.of(intVs),
List.of(reader),
0,
1,
2,
intField.name()
),
new ValuesSourceReaderOperator(
List.of(CoreValuesSourceType.NUMERIC),
List.of(longVs),
List.of(reader),
0,
1,
2,
longField.name()
),
new ValuesSourceReaderOperator(
List.of(CoreValuesSourceType.NUMERIC),
List.of(doubleVs),
List.of(reader),
0,
1,
2,
doubleField.name()
),
new ValuesSourceReaderOperator(
List.of(CoreValuesSourceType.KEYWORD),
List.of(keywordVs),
List.of(reader),
0,
1,
2,
kwFieldName
)
),
new PageConsumerOperator(page -> {
logger.debug("New page: {}", page);
// Blocks 0-2 are produced by the Lucene source; the four appended
// field blocks start at index 3, in operator order.
Block intValuesBlock = page.getBlock(3);
Block longValuesBlock = page.getBlock(4);
Block doubleValuesBlock = page.getBlock(5);
Block keywordValuesBlock = page.getBlock(6);

for (int i = 0; i < page.getPositionCount(); i++) {
// The int field was indexed on every doc, so it is never null.
assertFalse(intValuesBlock.isNull(i));
long j = intValuesBlock.getLong(i);
// Every 100th document was indexed without the sparse fields.
boolean fieldIsEmpty = j % 100 == 0;
assertEquals(fieldIsEmpty, longValuesBlock.isNull(i));
assertEquals(fieldIsEmpty, doubleValuesBlock.isNull(i));
assertEquals(fieldIsEmpty, keywordValuesBlock.isNull(i));
}
}),
() -> {}
);
driver.run();
}
}
}

public void testQueryOperator() throws IOException {
Map<BytesRef, Long> docs = new HashMap<>();
CheckedConsumer<DirectoryReader, IOException> verifier = reader -> {
Expand Down
Expand Up @@ -166,6 +166,15 @@ public void testBytesRefBlock() {
expectThrows(UOE, () -> block.getLong(pos));
expectThrows(UOE, () -> block.getDouble(pos));
}

assertNullValues(
positionCount,
nulls -> new BytesRefArrayBlock(positionCount, builder.getBytes(), nulls),
(randomNonNullPosition, b) -> assertThat(
values[randomNonNullPosition],
is(b.getBytesRef(randomNonNullPosition, new BytesRef()))
)
);
}

public void testBytesRefBlockBuilder() {
Expand Down Expand Up @@ -193,6 +202,47 @@ public void testBytesRefBlockBuilder() {
assertThat(block.getPositionCount(), equalTo(positionCount));
}

/**
 * Randomized test for {@code BytesRefArrayBlock.Builder} with sparse input:
 * each position is randomly either appended as null or as a random BytesRef
 * (sometimes with a random offset/length slice, sometimes deep-copied before
 * appending to prove the builder copies bytes). The built block must report
 * null exactly at the null positions and return an empty BytesRef there.
 */
public void testBytesRefBlockBuilderWithNulls() {
int positionCount = randomIntBetween(0, 16 * 1024);
BytesRefArrayBlock.Builder builder = BytesRefArrayBlock.builder(positionCount);
BytesRef[] values = new BytesRef[positionCount];
for (int i = 0; i < positionCount; i++) {
if (randomBoolean()) {
// Add random sparseness
builder.appendNull();
values[i] = null;
} else {
BytesRef bytesRef = new BytesRef(randomByteArrayOfLength(between(1, 20)));
// Occasionally use a sub-slice (non-zero offset / shortened length)
// to exercise offset handling in the builder.
if (bytesRef.length > 0 && randomBoolean()) {
bytesRef.offset = randomIntBetween(0, bytesRef.length - 1);
bytesRef.length = randomIntBetween(0, bytesRef.length - bytesRef.offset);
}
values[i] = bytesRef;
// Sometimes append a deep copy: the block must not depend on the
// caller's BytesRef instance after append() returns.
if (randomBoolean()) {
bytesRef = BytesRef.deepCopyOf(bytesRef);
}
builder.append(bytesRef);
}
}
BytesRefArrayBlock block = builder.build();
assertThat(positionCount, is(block.getPositionCount()));
BytesRef bytes = new BytesRef();
// Sample random positions (positionCount iterations); note this may probe
// some positions multiple times and skip others — intentional random sampling.
for (int i = 0; i < positionCount; i++) {
int pos = randomIntBetween(0, positionCount - 1);
bytes = block.getBytesRef(pos, bytes);
if (values[pos] == null) {
// Null positions report isNull and hold the shared empty placeholder.
assertThat(block.isNull(pos), equalTo(true));
assertThat(bytes, equalTo(new BytesRef()));
} else {
assertThat(bytes, equalTo(values[pos]));
assertThat(block.getObject(pos), equalTo(values[pos]));
}
// A BytesRef block never supports numeric accessors, null or not.
expectThrows(UOE, () -> block.getInt(pos));
expectThrows(UOE, () -> block.getLong(pos));
expectThrows(UOE, () -> block.getDouble(pos));
}
}

public void testConstantBytesRefBlock() {
for (int i = 0; i < 1000; i++) {
int positionCount = randomIntBetween(1, Integer.MAX_VALUE);
Expand Down

0 comments on commit cf8ccd0

Please sign in to comment.