Skip to content

Commit

Permalink
ESQL: Improve ValuesSourceReaderOperator performance (#415)
Browse files Browse the repository at this point in the history
This PR modifies the ValuesSourceReaderOperator class so that the code
 iterating over the docID block and the code populating the field values block
sit closer together, removing indirections that cause a performance drop.
  • Loading branch information
csoulios committed Nov 28, 2022
1 parent d2de316 commit ab5caca
Showing 1 changed file with 61 additions and 95 deletions.
Expand Up @@ -163,22 +163,11 @@ public void addInput(Page page) {
}

try {
docValuesCollector.initBlock(docs.getPositionCount());
int lastDoc = -1;
for (int i = 0; i < docs.getPositionCount(); i++) {
int doc = docs.getInt(i);
// docs within same block must be in order
if (lastDoc >= doc) {
throw new IllegalStateException("docs within same block must be in order");
}
docValuesCollector.collect(doc);
lastDoc = doc;
}
Block block = docValuesCollector.createBlock(docs);
lastPage = page.appendBlock(block);
} catch (IOException e) {
throw new UncheckedIOException(e);
}

lastPage = page.appendBlock(docValuesCollector.createBlock());
}
}

Expand Down Expand Up @@ -232,82 +221,67 @@ private void resetNumericField(ValuesSource.Numeric numericVS) throws IOExceptio
SortedNumericDoubleValues sortedNumericDocValues = numericVS.doubleValues(lastLeafReaderContext);
final NumericDoubleValues numericDocValues = FieldData.unwrapSingleton(sortedNumericDocValues);
this.docValuesCollector = new DocValuesCollector() {
private double[] values;
private int positionCount;
private int i;

/**
* Store docID internally because class {@link NumericDoubleValues} does not support
* a docID() method.
*/
private int docID = -1;

@Override
public void initBlock(int positionCount) {
this.i = 0;
this.positionCount = positionCount;
this.values = new double[positionCount];
}

@Override
public int docID() {
return docID;
}

@Override
public void collect(int doc) throws IOException {
if (numericDocValues.advanceExact(doc) == false) {
throw new IllegalStateException("sparse fields not supported for now, could not read doc [" + doc + "]");
public Block createBlock(Block docs) throws IOException {
final int positionCount = docs.getPositionCount();
final double[] values = new double[positionCount];
int lastDoc = -1;
for (int i = 0; i < positionCount; i++) {
int doc = docs.getInt(i);
// docs within same block must be in order
if (lastDoc >= doc) {
throw new IllegalStateException("docs within same block must be in order");
}
if (numericDocValues.advanceExact(doc) == false) {
throw new IllegalStateException("sparse fields not supported for now, could not read doc [" + doc + "]");
}
values[i] = numericDocValues.doubleValue();
lastDoc = doc;
docID = doc;
}
values[i++] = numericDocValues.doubleValue();
docID = doc;
}

@Override
public Block createBlock() {
Block block = new DoubleArrayBlock(values, positionCount);
// Set values[] to null to protect from overwriting this memory by subsequent calls to collect()
// without calling initBlock() first
values = null;
return block;
return new DoubleArrayBlock(values, positionCount);
}
};
} else {
// Extract long values
SortedNumericDocValues sortedNumericDocValues = numericVS.longValues(lastLeafReaderContext);
final NumericDocValues numericDocValues = DocValues.unwrapSingleton(sortedNumericDocValues);
this.docValuesCollector = new DocValuesCollector() {
private long[] values;
private int positionCount;
private int i;

@Override
public void initBlock(int positionCount) {
this.values = new long[positionCount];
this.positionCount = positionCount;
this.i = 0;
}

@Override
public int docID() {
return numericDocValues.docID();
}

@Override
public void collect(int doc) throws IOException {
if (numericDocValues.advanceExact(doc) == false) {
throw new IllegalStateException("sparse fields not supported for now, could not read doc [" + doc + "]");
public Block createBlock(Block docs) throws IOException {
final int positionCount = docs.getPositionCount();
final long[] values = new long[positionCount];
int lastDoc = -1;
for (int i = 0; i < positionCount; i++) {
int doc = docs.getInt(i);
// docs within same block must be in order
if (lastDoc >= doc) {
throw new IllegalStateException("docs within same block must be in order");
}
if (numericDocValues.advanceExact(doc) == false) {
throw new IllegalStateException("sparse fields not supported for now, could not read doc [" + doc + "]");
}
values[i] = numericDocValues.longValue();
lastDoc = doc;
}
values[i++] = numericDocValues.longValue();
}

@Override
public Block createBlock() {
Block block = new LongArrayBlock(values, positionCount);
// Set values[] to null to protect from overwriting this memory by subsequent calls to collect()
// without calling initBlock() first
values = null;
return block;
return new LongArrayBlock(values, positionCount);
}
};
}
Expand All @@ -316,33 +290,34 @@ public Block createBlock() {
private void resetKeywordField(ValuesSource.Bytes bytesVS) throws IOException {
final SortedBinaryDocValues binaryDV = bytesVS.bytesValues(lastLeafReaderContext);
this.docValuesCollector = new DocValuesCollector() {
private BytesRefArrayBlock.Builder builder;
private int docID = -1;

@Override
public void initBlock(int positionCount) {
builder = BytesRefArrayBlock.builder(positionCount);
}

@Override
public int docID() {
return docID;
}

@Override
public void collect(int doc) throws IOException {
if (binaryDV.advanceExact(doc) == false) {
throw new IllegalStateException("sparse fields not supported for now, could not read doc [" + doc + "]");
}
docID = doc;
if (binaryDV.docValueCount() != 1) {
throw new IllegalStateException("multi-values not supported for now, could not read doc [" + doc + "]");
public Block createBlock(Block docs) throws IOException {
final int positionCount = docs.getPositionCount();
BytesRefArrayBlock.Builder builder = BytesRefArrayBlock.builder(positionCount);
int lastDoc = -1;
for (int i = 0; i < docs.getPositionCount(); i++) {
int doc = docs.getInt(i);
// docs within same block must be in order
if (lastDoc >= doc) {
throw new IllegalStateException("docs within same block must be in order");
}
if (binaryDV.advanceExact(doc) == false) {
throw new IllegalStateException("sparse fields not supported for now, could not read doc [" + doc + "]");
}
if (binaryDV.docValueCount() != 1) {
throw new IllegalStateException("multi-values not supported for now, could not read doc [" + doc + "]");
}
builder.append(binaryDV.nextValue());
lastDoc = doc;
docID = doc;
}
builder.append(binaryDV.nextValue());
}

@Override
public Block createBlock() {
return builder.build();
}
};
Expand All @@ -363,29 +338,20 @@ public void close() {
interface DocValuesCollector {

/**
* Initialize {@link Block} memory for storing values. It must always be called
* before collecting documents for a new block.
* @param positionCount the position count for the block
* This method iterates over a block containing document ids and creates a block
* containing all extracted values for the collected documents.
*
* @param docs a block containing the ids of the documents to read
* @return a {@link Block} with all extracted values
*/
void initBlock(int positionCount);
Block createBlock(Block docs) throws IOException;

/**
* Collect the given {@code doc}
*/
void collect(int doc) throws IOException;

/**
* Returns the following:
* @return the following:
* -1 if nextDoc() or advance(int) were not called yet.
* NO_MORE_DOCS if the iterator has exhausted.
* Otherwise, it should return the doc ID it is currently on.
*/
int docID();

/**
* Create a block containing all extracted values for the collected documents
* @return a {@link Block} with all values
*/
Block createBlock();
}
}

0 comments on commit ab5caca

Please sign in to comment.