Skip to content

Commit

Permalink
Track blocks of intermediate state of aggs (#102562)
Browse files Browse the repository at this point in the history
This PR tracks blocks created when evaluating the intermediate state of aggregation.
  • Loading branch information
dnhatn committed Nov 24, 2023
1 parent 391f291 commit e0df305
Show file tree
Hide file tree
Showing 35 changed files with 98 additions and 98 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/102562.yaml
@@ -0,0 +1,5 @@
pr: 102562
summary: Track blocks of intermediate state of aggs
area: ES|QL
type: enhancement
issues: []
Expand Up @@ -478,8 +478,9 @@ private MethodSpec evaluateIntermediate() {
builder.addAnnotation(Override.class)
.addModifiers(Modifier.PUBLIC)
.addParameter(BLOCK_ARRAY, "blocks")
.addParameter(TypeName.INT, "offset");
builder.addStatement("state.toIntermediate(blocks, offset)");
.addParameter(TypeName.INT, "offset")
.addParameter(DRIVER_CONTEXT, "driverContext");
builder.addStatement("state.toIntermediate(blocks, offset, driverContext)");
return builder.build();
}

Expand Down
Expand Up @@ -8,8 +8,7 @@
package org.elasticsearch.compute.aggregation;

import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.ConstantBooleanVector;
import org.elasticsearch.compute.data.ConstantDoubleVector;
import org.elasticsearch.compute.operator.DriverContext;

/**
* Aggregator state for a single double.
Expand Down Expand Up @@ -45,10 +44,10 @@ void seen(boolean seen) {

/** Extracts an intermediate view of the contents of this state. */
@Override
public void toIntermediate(Block[] blocks, int offset) {
public void toIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
assert blocks.length >= offset + 2;
blocks[offset + 0] = new ConstantDoubleVector(value, 1).asBlock();
blocks[offset + 1] = new ConstantBooleanVector(seen, 1).asBlock();
blocks[offset + 0] = driverContext.blockFactory().newConstantDoubleBlockWith(value, 1);
blocks[offset + 1] = driverContext.blockFactory().newConstantBooleanBlockWith(seen, 1);
}

@Override
Expand Down
Expand Up @@ -8,8 +8,7 @@
package org.elasticsearch.compute.aggregation;

import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.ConstantBooleanVector;
import org.elasticsearch.compute.data.ConstantIntVector;
import org.elasticsearch.compute.operator.DriverContext;

/**
* Aggregator state for a single int.
Expand Down Expand Up @@ -45,10 +44,10 @@ void seen(boolean seen) {

/** Extracts an intermediate view of the contents of this state. */
@Override
public void toIntermediate(Block[] blocks, int offset) {
public void toIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
assert blocks.length >= offset + 2;
blocks[offset + 0] = new ConstantIntVector(value, 1).asBlock();
blocks[offset + 1] = new ConstantBooleanVector(seen, 1).asBlock();
blocks[offset + 0] = driverContext.blockFactory().newConstantIntBlockWith(value, 1);
blocks[offset + 1] = driverContext.blockFactory().newConstantBooleanBlockWith(seen, 1);
}

@Override
Expand Down
Expand Up @@ -8,8 +8,7 @@
package org.elasticsearch.compute.aggregation;

import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.ConstantBooleanVector;
import org.elasticsearch.compute.data.ConstantLongVector;
import org.elasticsearch.compute.operator.DriverContext;

/**
* Aggregator state for a single long.
Expand Down Expand Up @@ -45,10 +44,10 @@ void seen(boolean seen) {

/** Extracts an intermediate view of the contents of this state. */
@Override
public void toIntermediate(Block[] blocks, int offset) {
public void toIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
assert blocks.length >= offset + 2;
blocks[offset + 0] = new ConstantLongVector(value, 1).asBlock();
blocks[offset + 1] = new ConstantBooleanVector(seen, 1).asBlock();
blocks[offset + 0] = driverContext.blockFactory().newConstantLongBlockWith(value, 1);
blocks[offset + 1] = driverContext.blockFactory().newConstantBooleanBlockWith(seen, 1);
}

@Override
Expand Down
Expand Up @@ -98,8 +98,8 @@ public void addIntermediateInput(Page page) {
}

@Override
public void evaluateIntermediate(Block[] blocks, int offset) {
state.toIntermediate(blocks, offset);
public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
state.toIntermediate(blocks, offset, driverContext);
}

@Override
Expand Down
Expand Up @@ -106,8 +106,8 @@ public void addIntermediateInput(Page page) {
}

@Override
public void evaluateIntermediate(Block[] blocks, int offset) {
state.toIntermediate(blocks, offset);
public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
state.toIntermediate(blocks, offset, driverContext);
}

@Override
Expand Down
Expand Up @@ -106,8 +106,8 @@ public void addIntermediateInput(Page page) {
}

@Override
public void evaluateIntermediate(Block[] blocks, int offset) {
state.toIntermediate(blocks, offset);
public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
state.toIntermediate(blocks, offset, driverContext);
}

@Override
Expand Down
Expand Up @@ -106,8 +106,8 @@ public void addIntermediateInput(Page page) {
}

@Override
public void evaluateIntermediate(Block[] blocks, int offset) {
state.toIntermediate(blocks, offset);
public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
state.toIntermediate(blocks, offset, driverContext);
}

@Override
Expand Down
Expand Up @@ -106,8 +106,8 @@ public void addIntermediateInput(Page page) {
}

@Override
public void evaluateIntermediate(Block[] blocks, int offset) {
state.toIntermediate(blocks, offset);
public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
state.toIntermediate(blocks, offset, driverContext);
}

@Override
Expand Down
Expand Up @@ -105,8 +105,8 @@ public void addIntermediateInput(Page page) {
}

@Override
public void evaluateIntermediate(Block[] blocks, int offset) {
state.toIntermediate(blocks, offset);
public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
state.toIntermediate(blocks, offset, driverContext);
}

@Override
Expand Down
Expand Up @@ -105,8 +105,8 @@ public void addIntermediateInput(Page page) {
}

@Override
public void evaluateIntermediate(Block[] blocks, int offset) {
state.toIntermediate(blocks, offset);
public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
state.toIntermediate(blocks, offset, driverContext);
}

@Override
Expand Down
Expand Up @@ -105,8 +105,8 @@ public void addIntermediateInput(Page page) {
}

@Override
public void evaluateIntermediate(Block[] blocks, int offset) {
state.toIntermediate(blocks, offset);
public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
state.toIntermediate(blocks, offset, driverContext);
}

@Override
Expand Down
Expand Up @@ -99,8 +99,8 @@ public void addIntermediateInput(Page page) {
}

@Override
public void evaluateIntermediate(Block[] blocks, int offset) {
state.toIntermediate(blocks, offset);
public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
state.toIntermediate(blocks, offset, driverContext);
}

@Override
Expand Down
Expand Up @@ -99,8 +99,8 @@ public void addIntermediateInput(Page page) {
}

@Override
public void evaluateIntermediate(Block[] blocks, int offset) {
state.toIntermediate(blocks, offset);
public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
state.toIntermediate(blocks, offset, driverContext);
}

@Override
Expand Down
Expand Up @@ -99,8 +99,8 @@ public void addIntermediateInput(Page page) {
}

@Override
public void evaluateIntermediate(Block[] blocks, int offset) {
state.toIntermediate(blocks, offset);
public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
state.toIntermediate(blocks, offset, driverContext);
}

@Override
Expand Down
Expand Up @@ -105,8 +105,8 @@ public void addIntermediateInput(Page page) {
}

@Override
public void evaluateIntermediate(Block[] blocks, int offset) {
state.toIntermediate(blocks, offset);
public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
state.toIntermediate(blocks, offset, driverContext);
}

@Override
Expand Down
Expand Up @@ -105,8 +105,8 @@ public void addIntermediateInput(Page page) {
}

@Override
public void evaluateIntermediate(Block[] blocks, int offset) {
state.toIntermediate(blocks, offset);
public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
state.toIntermediate(blocks, offset, driverContext);
}

@Override
Expand Down
Expand Up @@ -105,8 +105,8 @@ public void addIntermediateInput(Page page) {
}

@Override
public void evaluateIntermediate(Block[] blocks, int offset) {
state.toIntermediate(blocks, offset);
public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
state.toIntermediate(blocks, offset, driverContext);
}

@Override
Expand Down
Expand Up @@ -102,8 +102,8 @@ public void addIntermediateInput(Page page) {
}

@Override
public void evaluateIntermediate(Block[] blocks, int offset) {
state.toIntermediate(blocks, offset);
public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
state.toIntermediate(blocks, offset, driverContext);
}

@Override
Expand Down
Expand Up @@ -102,8 +102,8 @@ public void addIntermediateInput(Page page) {
}

@Override
public void evaluateIntermediate(Block[] blocks, int offset) {
state.toIntermediate(blocks, offset);
public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
state.toIntermediate(blocks, offset, driverContext);
}

@Override
Expand Down
Expand Up @@ -102,8 +102,8 @@ public void addIntermediateInput(Page page) {
}

@Override
public void evaluateIntermediate(Block[] blocks, int offset) {
state.toIntermediate(blocks, offset);
public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
state.toIntermediate(blocks, offset, driverContext);
}

@Override
Expand Down
Expand Up @@ -104,8 +104,8 @@ public void addIntermediateInput(Page page) {
}

@Override
public void evaluateIntermediate(Block[] blocks, int offset) {
state.toIntermediate(blocks, offset);
public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
state.toIntermediate(blocks, offset, driverContext);
}

@Override
Expand Down
Expand Up @@ -107,8 +107,8 @@ public void addIntermediateInput(Page page) {
}

@Override
public void evaluateIntermediate(Block[] blocks, int offset) {
state.toIntermediate(blocks, offset);
public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
state.toIntermediate(blocks, offset, driverContext);
}

@Override
Expand Down
Expand Up @@ -105,8 +105,8 @@ public void addIntermediateInput(Page page) {
}

@Override
public void evaluateIntermediate(Block[] blocks, int offset) {
state.toIntermediate(blocks, offset);
public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
state.toIntermediate(blocks, offset, driverContext);
}

@Override
Expand Down
Expand Up @@ -45,7 +45,7 @@ public void processPage(Page page) {

public void evaluate(Block[] blocks, int offset, DriverContext driverContext) {
if (mode.isOutputPartial()) {
aggregatorFunction.evaluateIntermediate(blocks, offset);
aggregatorFunction.evaluateIntermediate(blocks, offset, driverContext);
} else {
aggregatorFunction.evaluateFinal(blocks, offset, driverContext);
}
Expand Down
Expand Up @@ -18,7 +18,7 @@ public interface AggregatorFunction extends Releasable {

void addIntermediateInput(Page page);

void evaluateIntermediate(Block[] blocks, int offset);
void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext);

void evaluateFinal(Block[] blocks, int offset, DriverContext driverContext);

Expand Down
Expand Up @@ -8,10 +8,11 @@
package org.elasticsearch.compute.aggregation;

import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.core.Releasable;

public interface AggregatorState extends Releasable {

/** Extracts an intermediate view of the contents of this state. */
void toIntermediate(Block[] blocks, int offset);
void toIntermediate(Block[] blocks, int offset, DriverContext driverContext);
}
Expand Up @@ -97,13 +97,13 @@ public void addIntermediateInput(Page page) {
}

@Override
public void evaluateIntermediate(Block[] blocks, int offset) {
state.toIntermediate(blocks, offset);
public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
state.toIntermediate(blocks, offset, driverContext);
}

@Override
public void evaluateFinal(Block[] blocks, int offset, DriverContext driverContext) {
blocks[offset] = LongBlock.newConstantBlockWith(state.longValue(), 1, driverContext.blockFactory());
blocks[offset] = driverContext.blockFactory().newConstantLongBlockWith(state.longValue(), 1);
}

@Override
Expand Down
Expand Up @@ -86,10 +86,10 @@ static class SingleState implements AggregatorState {

/** Extracts an intermediate view of the contents of this state. */
@Override
public void toIntermediate(Block[] blocks, int offset) {
public void toIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
assert blocks.length >= offset + 2;
blocks[offset + 0] = BooleanBlock.newConstantBlockWith((bits & BIT_FALSE) != 0, 1);
blocks[offset + 1] = BooleanBlock.newConstantBlockWith((bits & BIT_TRUE) != 0, 1);
blocks[offset + 0] = driverContext.blockFactory().newConstantBooleanBlockWith((bits & BIT_FALSE) != 0, 1);
blocks[offset + 1] = driverContext.blockFactory().newConstantBooleanBlockWith((bits & BIT_TRUE) != 0, 1);
}

@Override
Expand Down
Expand Up @@ -17,7 +17,6 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.ConstantBytesRefVector;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.core.Releasables;
Expand Down Expand Up @@ -118,9 +117,9 @@ void merge(int groupId, BytesRef other, int otherGroup) {

/** Extracts an intermediate view of the contents of this state. */
@Override
public void toIntermediate(Block[] blocks, int offset) {
public void toIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
assert blocks.length >= offset + 1;
blocks[offset] = new ConstantBytesRefVector(serializeHLL(SINGLE_BUCKET_ORD, hll), 1).asBlock();
blocks[offset] = driverContext.blockFactory().newConstantBytesRefBlockWith(serializeHLL(SINGLE_BUCKET_ORD, hll), 1);
}

@Override
Expand Down

0 comments on commit e0df305

Please sign in to comment.