Skip to content

Commit

Permalink
Add support for extracting keyword fields (#384)
Browse files Browse the repository at this point in the history
This PR adds support for extracting keyword fields from doc values to
ValuesSourceReaderOperator.
  • Loading branch information
dnhatn committed Nov 15, 2022
1 parent 526dbe6 commit 8535bd7
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 86 deletions.
Expand Up @@ -15,6 +15,7 @@
import org.apache.lucene.index.SortedNumericDocValues;
import org.elasticsearch.compute.Experimental;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BytesRefArrayBlock;
import org.elasticsearch.compute.data.ConstantIntBlock;
import org.elasticsearch.compute.data.DoubleArrayBlock;
import org.elasticsearch.compute.data.LongArrayBlock;
Expand All @@ -23,6 +24,7 @@
import org.elasticsearch.compute.operator.OperatorFactory;
import org.elasticsearch.index.fielddata.FieldData;
import org.elasticsearch.index.fielddata.NumericDoubleValues;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
Expand Down Expand Up @@ -212,91 +214,9 @@ private void initState(int ord, int shard) {
private void resetDocValues() {
try {
if (CoreValuesSourceType.NUMERIC.equals(lastValuesSourceType) || CoreValuesSourceType.DATE.equals(lastValuesSourceType)) {
ValuesSource.Numeric numericVS = (ValuesSource.Numeric) lastValuesSource;
if (numericVS.isFloatingPoint()) {
// Extract double values
SortedNumericDoubleValues sortedNumericDocValues = numericVS.doubleValues(lastLeafReaderContext);
final NumericDoubleValues numericDocValues = FieldData.unwrapSingleton(sortedNumericDocValues);
this.docValuesCollector = new DocValuesCollector() {
private double[] values;
private int positionCount;
private int i;

/**
* Store docID internally because class {@link NumericDoubleValues} does not support
* a docID() method.
*/
private int docID = -1;

@Override
public void initBlock(int positionCount) {
this.i = 0;
this.positionCount = positionCount;
this.values = new double[positionCount];
}

@Override
public int docID() {
return docID;
}

@Override
public void collect(int doc) throws IOException {
if (numericDocValues.advanceExact(doc) == false) {
throw new IllegalStateException("sparse fields not supported for now, could not read doc [" + doc + "]");
}
values[i++] = numericDocValues.doubleValue();
docID = doc;
}

@Override
public Block createBlock() {
Block block = new DoubleArrayBlock(values, positionCount);
// Set values[] to null to protect from overwriting this memory by subsequent calls to collect()
// without calling initBlock() first
values = null;
return block;
}
};
} else {
// Extract long values
SortedNumericDocValues sortedNumericDocValues = numericVS.longValues(lastLeafReaderContext);
final NumericDocValues numericDocValues = DocValues.unwrapSingleton(sortedNumericDocValues);
this.docValuesCollector = new DocValuesCollector() {
private long[] values;
private int positionCount;
private int i;

@Override
public void initBlock(int positionCount) {
this.values = new long[positionCount];
this.positionCount = positionCount;
this.i = 0;
}

@Override
public int docID() {
return numericDocValues.docID();
}

@Override
public void collect(int doc) throws IOException {
if (numericDocValues.advanceExact(doc) == false) {
throw new IllegalStateException("sparse fields not supported for now, could not read doc [" + doc + "]");
}
values[i++] = numericDocValues.longValue();
}

@Override
public Block createBlock() {
Block block = new LongArrayBlock(values, positionCount);
// Set values[] to null to protect from overwriting this memory by subsequent calls to collect()
// without calling initBlock() first
values = null;
return block;
}
};
}
resetNumericField((ValuesSource.Numeric) lastValuesSource);
} else if (CoreValuesSourceType.KEYWORD.equals(lastValuesSourceType)) {
resetKeywordField((ValuesSource.Bytes) lastValuesSource);
} else {
throw new IllegalArgumentException("Field type [" + lastValuesSourceType.typeName() + "] is not supported");
}
Expand All @@ -306,6 +226,128 @@ public Block createBlock() {
}
}

/**
 * Installs a {@code DocValuesCollector} that reads single-valued numeric doc
 * values from the current leaf reader, choosing the double- or long-valued
 * code path based on the field's value type.
 */
private void resetNumericField(ValuesSource.Numeric numericVS) throws IOException {
    if (numericVS.isFloatingPoint()) {
        // Floating-point field: read through the double-valued doc values API.
        SortedNumericDoubleValues sorted = numericVS.doubleValues(lastLeafReaderContext);
        final NumericDoubleValues singleton = FieldData.unwrapSingleton(sorted);
        this.docValuesCollector = new DocValuesCollector() {
            private double[] buffer;
            private int count;
            private int next;

            /**
             * Track the current doc id ourselves, since {@link NumericDoubleValues}
             * exposes no docID() accessor.
             */
            private int currentDoc = -1;

            @Override
            public void initBlock(int positionCount) {
                next = 0;
                count = positionCount;
                buffer = new double[positionCount];
            }

            @Override
            public int docID() {
                return currentDoc;
            }

            @Override
            public void collect(int doc) throws IOException {
                if (singleton.advanceExact(doc) == false) {
                    throw new IllegalStateException("sparse fields not supported for now, could not read doc [" + doc + "]");
                }
                buffer[next++] = singleton.doubleValue();
                currentDoc = doc;
            }

            @Override
            public Block createBlock() {
                Block block = new DoubleArrayBlock(buffer, count);
                // Drop the reference so a collect() issued without a fresh
                // initBlock() cannot scribble over memory now owned by the block.
                buffer = null;
                return block;
            }
        };
    } else {
        // Integral field: read through the long-valued doc values API.
        SortedNumericDocValues sorted = numericVS.longValues(lastLeafReaderContext);
        final NumericDocValues singleton = DocValues.unwrapSingleton(sorted);
        this.docValuesCollector = new DocValuesCollector() {
            private long[] buffer;
            private int count;
            private int next;

            @Override
            public void initBlock(int positionCount) {
                buffer = new long[positionCount];
                count = positionCount;
                next = 0;
            }

            @Override
            public int docID() {
                return singleton.docID();
            }

            @Override
            public void collect(int doc) throws IOException {
                if (singleton.advanceExact(doc) == false) {
                    throw new IllegalStateException("sparse fields not supported for now, could not read doc [" + doc + "]");
                }
                buffer[next++] = singleton.longValue();
            }

            @Override
            public Block createBlock() {
                Block block = new LongArrayBlock(buffer, count);
                // Drop the reference so a collect() issued without a fresh
                // initBlock() cannot scribble over memory now owned by the block.
                buffer = null;
                return block;
            }
        };
    }
}

/**
 * Installs a {@code DocValuesCollector} that reads single-valued keyword
 * (binary) doc values from the current leaf reader and accumulates them
 * into a {@link BytesRefArrayBlock}.
 */
private void resetKeywordField(ValuesSource.Bytes bytesVS) throws IOException {
    final SortedBinaryDocValues byteValues = bytesVS.bytesValues(lastLeafReaderContext);
    this.docValuesCollector = new DocValuesCollector() {
        private BytesRefArrayBlock.Builder blockBuilder;
        // Tracked manually: SortedBinaryDocValues offers no docID() accessor here.
        private int currentDoc = -1;

        @Override
        public void initBlock(int positionCount) {
            blockBuilder = BytesRefArrayBlock.builder(positionCount);
        }

        @Override
        public int docID() {
            return currentDoc;
        }

        @Override
        public void collect(int doc) throws IOException {
            if (byteValues.advanceExact(doc) == false) {
                throw new IllegalStateException("sparse fields not supported for now, could not read doc [" + doc + "]");
            }
            currentDoc = doc;
            if (byteValues.docValueCount() != 1) {
                throw new IllegalStateException("multi-values not supported for now, could not read doc [" + doc + "]");
            }
            blockBuilder.append(byteValues.nextValue());
        }

        @Override
        public Block createBlock() {
            return blockBuilder.build();
        }
    };
}

@Override
public void close() {
lastLeafReaderContext = null;
Expand Down
Expand Up @@ -45,6 +45,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;

@Experimental
@ESIntegTestCase.ClusterScope(scope = SUITE, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false)
Expand Down Expand Up @@ -368,6 +369,44 @@ public void testESFilter() throws Exception {
}
}

/**
 * Indexes documents with a long field ({@code val}) and a keyword field
 * ({@code tag}) and verifies that both are extracted correctly by an
 * ESQL query that sorts on {@code val}.
 */
public void testExtractFields() throws Exception {
    String indexName = "test_extract_fields";
    ElasticsearchAssertions.assertAcked(
        client().admin()
            .indices()
            .prepareCreate(indexName)
            .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)))
            .setMapping("val", "type=long", "tag", "type=keyword")
            .get()
    );
    int numDocs = randomIntBetween(1, 100);
    List<IndexRequestBuilder> indexRequests = new ArrayList<>();
    record Doc(long val, String tag) {}
    List<Doc> docs = new ArrayList<>();
    for (int i = 0; i < numDocs; i++) {
        // val equals the insertion index, so after "sort val" the results
        // line up with the docs list by position.
        Doc d = new Doc(i, "tag-" + randomIntBetween(1, 100));
        docs.add(d);
        indexRequests.add(
            client().prepareIndex().setIndex(indexName).setId(Integer.toString(i)).setSource(Map.of("val", d.val, "tag", d.tag))
        );
    }
    indexRandom(true, randomBoolean(), indexRequests);
    int limit = randomIntBetween(1, 1); // TODO: increase the limit after resolving the limit issue
    // Build the query from the indexName variable instead of repeating the
    // literal, so a rename of the index cannot silently break the query.
    String command = "from " + indexName + " | sort val | limit " + limit;
    EsqlQueryResponse results = run(command);
    logger.info(results);
    assertThat(results.values(), hasSize(Math.min(limit, numDocs)));
    assertThat(results.columns().get(3).name(), equalTo("val"));
    assertThat(results.columns().get(4).name(), equalTo("tag"));
    for (int i = 0; i < results.values().size(); i++) {
        List<Object> values = results.values().get(i);
        assertThat(values.get(3), equalTo(docs.get(i).val));
        assertThat(values.get(4), equalTo(docs.get(i).tag));
    }
}

/** Executes the given ESQL commands with randomized pragmas and returns the response. */
private EsqlQueryResponse run(String esqlCommands) {
    EsqlQueryRequestBuilder request = new EsqlQueryRequestBuilder(client(), EsqlQueryAction.INSTANCE);
    return request.query(esqlCommands).pragmas(randomPragmas()).get();
}
Expand Down
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.plugin;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
Expand Down Expand Up @@ -78,7 +79,14 @@ private List<List<Object>> pagesToValues(List<Page> pages) {
List<Object> row = new ArrayList<>(page.getBlockCount());
for (int b = 0; b < page.getBlockCount(); b++) {
Block block = page.getBlock(b);
row.add(block.getObject(i));
Object val = block.getObject(i);
// TODO: Should we do the conversion in Block#getObject instead?
// Or should we add a new method that returns a human representation to Block.
if (val instanceof BytesRef bytes) {
row.add(bytes.utf8ToString());
} else {
row.add(val);
}
}
result.add(row);
}
Expand Down

0 comments on commit 8535bd7

Please sign in to comment.