diff --git a/server/src/main/java/org/elasticsearch/compute/data/BytesRefArrayBlock.java b/server/src/main/java/org/elasticsearch/compute/data/BytesRefArrayBlock.java index abdddc68ec8f9a..cf5f955ae5f246 100644 --- a/server/src/main/java/org/elasticsearch/compute/data/BytesRefArrayBlock.java +++ b/server/src/main/java/org/elasticsearch/compute/data/BytesRefArrayBlock.java @@ -11,15 +11,24 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BytesRefArray; +import org.elasticsearch.core.Nullable; + +import java.util.BitSet; /** * Block implementation that stores an array of {@link org.apache.lucene.util.BytesRef}. */ public final class BytesRefArrayBlock extends Block { + + private static final BytesRef NULL_VALUE = new BytesRef(); private final BytesRefArray bytes; public BytesRefArrayBlock(int positionCount, BytesRefArray bytes) { - super(positionCount); + this(positionCount, bytes, null); + } + + public BytesRefArrayBlock(int positionCount, BytesRefArray bytes, @Nullable BitSet nullsMask) { + super(positionCount, nullsMask); assert bytes.size() == positionCount : bytes.size() + " != " + positionCount; this.bytes = bytes; } @@ -47,9 +56,12 @@ public static final class Builder { private final int positionCount; private final BytesRefArray bytes; + private final BitSet nullsMask; + public Builder(int positionCount) { this.positionCount = positionCount; this.bytes = new BytesRefArray(positionCount, BigArrays.NON_RECYCLING_INSTANCE); + this.nullsMask = new BitSet(positionCount); } /** @@ -62,11 +74,25 @@ public void append(BytesRef value) { bytes.append(value); } + public void appendNull() { + // Retrieve the size of the BytesRefArray so that we infer the current position + // Then use the position to set the bit in the nullsMask + int position = (int) bytes.size(); + nullsMask.set(position); + append(NULL_VALUE); + } + public BytesRefArrayBlock build() { if (bytes.size() != positionCount) { 
throw new IllegalStateException("Incomplete block; expected " + positionCount + " values; got " + bytes.size()); } - return new BytesRefArrayBlock(positionCount, bytes); + // If nullsMask has no bit set, we pass null as the nulls mask, so that mayHaveNull() returns false + return new BytesRefArrayBlock(positionCount, bytes, nullsMask.cardinality() > 0 ? nullsMask : null); + } + + // Method provided for testing only + protected BytesRefArray getBytes() { + return bytes; } } } diff --git a/server/src/main/java/org/elasticsearch/compute/lucene/BlockDocValuesReader.java b/server/src/main/java/org/elasticsearch/compute/lucene/BlockDocValuesReader.java index 12a4a823a60188..f9825dd251fedd 100644 --- a/server/src/main/java/org/elasticsearch/compute/lucene/BlockDocValuesReader.java +++ b/server/src/main/java/org/elasticsearch/compute/lucene/BlockDocValuesReader.java @@ -25,6 +25,7 @@ import org.elasticsearch.search.aggregations.support.ValuesSourceType; import java.io.IOException; +import java.util.BitSet; /** * A reader that supports reading doc-values from a Lucene segment in Block fashion. 
@@ -88,6 +89,7 @@ private static class LongValuesReader extends BlockDocValuesReader { public Block readValues(Block docs) throws IOException { final int positionCount = docs.getPositionCount(); final long[] values = new long[positionCount]; + final BitSet nullsMask = new BitSet(positionCount); int lastDoc = -1; for (int i = 0; i < positionCount; i++) { int doc = docs.getInt(i); @@ -95,13 +97,16 @@ public Block readValues(Block docs) throws IOException { if (lastDoc >= doc) { throw new IllegalStateException("docs within same block must be in order"); } - if (numericDocValues.advanceExact(doc) == false) { - throw new IllegalStateException("sparse fields not supported for now, could not read doc [" + doc + "]"); + if (numericDocValues.advanceExact(doc)) { + values[i] = numericDocValues.longValue(); + } else { + nullsMask.set(i); + values[i] = 0L; } - values[i] = numericDocValues.longValue(); lastDoc = doc; } - return new LongArrayBlock(values, positionCount); + // If nullsMask has no bit set, we pass null as the nulls mask, so that mayHaveNull() returns false + return new LongArrayBlock(values, positionCount, nullsMask.cardinality() > 0 ? 
nullsMask : null); } @Override @@ -122,6 +127,7 @@ private static class DoubleValuesReader extends BlockDocValuesReader { public Block readValues(Block docs) throws IOException { final int positionCount = docs.getPositionCount(); final double[] values = new double[positionCount]; + final BitSet nullsMask = new BitSet(positionCount); int lastDoc = -1; for (int i = 0; i < positionCount; i++) { int doc = docs.getInt(i); @@ -129,14 +135,17 @@ public Block readValues(Block docs) throws IOException { if (lastDoc >= doc) { throw new IllegalStateException("docs within same block must be in order"); } - if (numericDocValues.advanceExact(doc) == false) { - throw new IllegalStateException("sparse fields not supported for now, could not read doc [" + doc + "]"); + if (numericDocValues.advanceExact(doc)) { + values[i] = numericDocValues.doubleValue(); + } else { + nullsMask.set(i); + values[i] = 0.0d; } - values[i] = numericDocValues.doubleValue(); lastDoc = doc; this.docID = doc; } - return new DoubleArrayBlock(values, positionCount); + // If nullsMask has no bit set, we pass null as the nulls mask, so that mayHaveNull() returns false + return new DoubleArrayBlock(values, positionCount, nullsMask.cardinality() > 0 ? 
nullsMask : null); } @Override @@ -164,13 +173,17 @@ public Block readValues(Block docs) throws IOException { if (lastDoc >= doc) { throw new IllegalStateException("docs within same block must be in order"); } - if (binaryDV.advanceExact(doc) == false) { - throw new IllegalStateException("sparse fields not supported for now, could not read doc [" + doc + "]"); + if (binaryDV.advanceExact(doc)) { + int dvCount = binaryDV.docValueCount(); + if (dvCount != 1) { + throw new IllegalStateException( + "multi-values not supported for now, could not read doc [" + doc + "] with [" + dvCount + "] values" + ); + } + builder.append(binaryDV.nextValue()); + } else { + builder.appendNull(); } - if (binaryDV.docValueCount() != 1) { - throw new IllegalStateException("multi-values not supported for now, could not read doc [" + doc + "]"); - } - builder.append(binaryDV.nextValue()); lastDoc = doc; this.docID = doc; } diff --git a/server/src/test/java/org/elasticsearch/compute/OperatorTests.java b/server/src/test/java/org/elasticsearch/compute/OperatorTests.java index a746039799e094..4cc4abcc588f3f 100644 --- a/server/src/test/java/org/elasticsearch/compute/OperatorTests.java +++ b/server/src/test/java/org/elasticsearch/compute/OperatorTests.java @@ -9,9 +9,11 @@ package org.elasticsearch.compute; import org.apache.lucene.document.Document; +import org.apache.lucene.document.DoubleDocValuesField; import org.apache.lucene.document.Field; import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.SortedDocValuesField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; @@ -61,10 +63,14 @@ import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.fielddata.IndexFieldDataCache; import 
org.elasticsearch.index.fielddata.IndexNumericFieldData; +import org.elasticsearch.index.fielddata.plain.SortedDoublesIndexFieldData; import org.elasticsearch.index.fielddata.plain.SortedNumericIndexFieldData; +import org.elasticsearch.index.fielddata.plain.SortedSetOrdinalsIndexFieldData; import org.elasticsearch.index.mapper.KeywordFieldMapper; import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.search.aggregations.support.CoreValuesSourceType; import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.test.ESTestCase; @@ -291,6 +297,125 @@ public void testOperatorsWithLuceneSlicing() throws IOException { } } + public void testValuesSourceReaderOperatorWithNulls() throws IOException { + final int numDocs = 100_000; + try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir)) { + Document doc = new Document(); + NumericDocValuesField intField = new NumericDocValuesField("i", 0); + NumericDocValuesField longField = new NumericDocValuesField("j", 0); + NumericDocValuesField doubleField = new DoubleDocValuesField("d", 0); + String kwFieldName = "kw"; + for (int i = 0; i < numDocs; i++) { + doc.clear(); + intField.setLongValue(i); + doc.add(intField); + if (i % 100 != 0) { // Do not set field for every 100 values + longField.setLongValue(i); + doc.add(longField); + doubleField.setDoubleValue(i); + doc.add(doubleField); + doc.add(new SortedDocValuesField(kwFieldName, new BytesRef("kw=" + i))); + } + w.addDocument(doc); + } + w.commit(); + + ValuesSource intVs = new ValuesSource.Numeric.FieldData( + new SortedNumericIndexFieldData( + intField.name(), + IndexNumericFieldData.NumericType.INT, + IndexNumericFieldData.NumericType.INT.getValuesSourceType(), + null + ) + ); + ValuesSource longVs = new ValuesSource.Numeric.FieldData( + new SortedNumericIndexFieldData( + longField.name(), +
IndexNumericFieldData.NumericType.LONG, + IndexNumericFieldData.NumericType.LONG.getValuesSourceType(), + null + ) + ); + ValuesSource doubleVs = new ValuesSource.Numeric.FieldData( + new SortedDoublesIndexFieldData( + doubleField.name(), + IndexNumericFieldData.NumericType.DOUBLE, + IndexNumericFieldData.NumericType.DOUBLE.getValuesSourceType(), + null + ) + ); + var breakerService = new NoneCircuitBreakerService(); + var cache = new IndexFieldDataCache.None(); + ValuesSource keywordVs = new ValuesSource.Bytes.FieldData( + new SortedSetOrdinalsIndexFieldData(cache, kwFieldName, CoreValuesSourceType.KEYWORD, breakerService, null) + ); + + try (IndexReader reader = w.getReader()) { + // implements cardinality on value field + Driver driver = new Driver( + new LuceneSourceOperator(reader, 0, new MatchAllDocsQuery()), + List.of( + new ValuesSourceReaderOperator( + List.of(CoreValuesSourceType.NUMERIC), + List.of(intVs), + List.of(reader), + 0, + 1, + 2, + intField.name() + ), + new ValuesSourceReaderOperator( + List.of(CoreValuesSourceType.NUMERIC), + List.of(longVs), + List.of(reader), + 0, + 1, + 2, + longField.name() + ), + new ValuesSourceReaderOperator( + List.of(CoreValuesSourceType.NUMERIC), + List.of(doubleVs), + List.of(reader), + 0, + 1, + 2, + doubleField.name() + ), + new ValuesSourceReaderOperator( + List.of(CoreValuesSourceType.KEYWORD), + List.of(keywordVs), + List.of(reader), + 0, + 1, + 2, + kwFieldName + ) + ), + new PageConsumerOperator(page -> { + logger.debug("New page: {}", page); + Block intValuesBlock = page.getBlock(3); + Block longValuesBlock = page.getBlock(4); + Block doubleValuesBlock = page.getBlock(5); + Block keywordValuesBlock = page.getBlock(6); + + for (int i = 0; i < page.getPositionCount(); i++) { + assertFalse(intValuesBlock.isNull(i)); + long j = intValuesBlock.getLong(i); + // Every 100 documents we set fields to null + boolean fieldIsEmpty = j % 100 == 0; + assertEquals(fieldIsEmpty, longValuesBlock.isNull(i)); + 
assertEquals(fieldIsEmpty, doubleValuesBlock.isNull(i)); + assertEquals(fieldIsEmpty, keywordValuesBlock.isNull(i)); + } + }), + () -> {} + ); + driver.run(); + } + } + } + public void testQueryOperator() throws IOException { Map docs = new HashMap<>(); CheckedConsumer verifier = reader -> { diff --git a/server/src/test/java/org/elasticsearch/compute/data/BasicBlockTests.java b/server/src/test/java/org/elasticsearch/compute/data/BasicBlockTests.java index c20a92a7c317aa..f0228dd9e77d83 100644 --- a/server/src/test/java/org/elasticsearch/compute/data/BasicBlockTests.java +++ b/server/src/test/java/org/elasticsearch/compute/data/BasicBlockTests.java @@ -166,6 +166,15 @@ public void testBytesRefBlock() { expectThrows(UOE, () -> block.getLong(pos)); expectThrows(UOE, () -> block.getDouble(pos)); } + + assertNullValues( + positionCount, + nulls -> new BytesRefArrayBlock(positionCount, builder.getBytes(), nulls), + (randomNonNullPosition, b) -> assertThat( + values[randomNonNullPosition], + is(b.getBytesRef(randomNonNullPosition, new BytesRef())) + ) + ); } public void testBytesRefBlockBuilder() { @@ -193,6 +202,47 @@ public void testBytesRefBlockBuilder() { assertThat(block.getPositionCount(), equalTo(positionCount)); } + public void testBytesRefBlockBuilderWithNulls() { + int positionCount = randomIntBetween(0, 16 * 1024); + BytesRefArrayBlock.Builder builder = BytesRefArrayBlock.builder(positionCount); + BytesRef[] values = new BytesRef[positionCount]; + for (int i = 0; i < positionCount; i++) { + if (randomBoolean()) { + // Add random sparseness + builder.appendNull(); + values[i] = null; + } else { + BytesRef bytesRef = new BytesRef(randomByteArrayOfLength(between(1, 20))); + if (bytesRef.length > 0 && randomBoolean()) { + bytesRef.offset = randomIntBetween(0, bytesRef.length - 1); + bytesRef.length = randomIntBetween(0, bytesRef.length - bytesRef.offset); + } + values[i] = bytesRef; + if (randomBoolean()) { + bytesRef = BytesRef.deepCopyOf(bytesRef); + } + 
builder.append(bytesRef); + } + } + BytesRefArrayBlock block = builder.build(); + assertThat(positionCount, is(block.getPositionCount())); + BytesRef bytes = new BytesRef(); + for (int i = 0; i < positionCount; i++) { + int pos = randomIntBetween(0, positionCount - 1); + bytes = block.getBytesRef(pos, bytes); + if (values[pos] == null) { + assertThat(block.isNull(pos), equalTo(true)); + assertThat(bytes, equalTo(new BytesRef())); + } else { + assertThat(bytes, equalTo(values[pos])); + assertThat(block.getObject(pos), equalTo(values[pos])); + } + expectThrows(UOE, () -> block.getInt(pos)); + expectThrows(UOE, () -> block.getLong(pos)); + expectThrows(UOE, () -> block.getDouble(pos)); + } + } + public void testConstantBytesRefBlock() { for (int i = 0; i < 1000; i++) { int positionCount = randomIntBetween(1, Integer.MAX_VALUE);