Skip to content

Commit

Permalink
ESQL: shallow copy in EXPAND via shared vecs (#103681)
Browse files Browse the repository at this point in the history
Use shared vectors to avoid allocations when expanding.
  • Loading branch information
alex-spies committed Jan 3, 2024
1 parent ed7bbf7 commit 8a704e7
Show file tree
Hide file tree
Showing 21 changed files with 554 additions and 232 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/103681.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 103681
summary: "ESQL: Expand shallow copy with vecs"
area: ES|QL
type: enhancement
issues:
- 100528
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,27 @@ final class BooleanArrayBlock extends AbstractArrayBlock implements BooleanBlock
BitSet nulls,
MvOrdering mvOrdering,
BlockFactory blockFactory
) {
this(
new BooleanArrayVector(values, firstValueIndexes == null ? positionCount : firstValueIndexes[positionCount], blockFactory),
positionCount,
firstValueIndexes,
nulls,
mvOrdering,
blockFactory
);
}

private BooleanArrayBlock(
BooleanArrayVector vector,
int positionCount,
int[] firstValueIndexes,
BitSet nulls,
MvOrdering mvOrdering,
BlockFactory blockFactory
) {
super(positionCount, firstValueIndexes, nulls, mvOrdering, blockFactory);
this.vector = new BooleanArrayVector(values, values.length, blockFactory);
this.vector = vector;
}

@Override
Expand All @@ -46,6 +64,7 @@ public boolean getBoolean(int valueIndex) {

@Override
public BooleanBlock filter(int... positions) {
// TODO use reference counting to share the vector
try (var builder = blockFactory().newBooleanBlockBuilder(positions.length)) {
for (int pos : positions) {
if (isNull(pos)) {
Expand Down Expand Up @@ -79,21 +98,28 @@ public BooleanBlock expand() {
incRef();
return this;
}
// TODO use reference counting to share the vector
try (var builder = blockFactory().newBooleanBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
continue;
}
int first = getFirstValueIndex(pos);
int end = first + getValueCount(pos);
for (int i = first; i < end; i++) {
builder.appendBoolean(getBoolean(i));
}
}
return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build();
if (nullsMask == null) {
vector.incRef();
return vector.asBlock();
}

// The following line is correct because positions with multi-values are never null.
int expandedPositionCount = vector.getPositionCount();
long bitSetRamUsedEstimate = BlockRamUsageEstimator.sizeOfBitSet(expandedPositionCount);
blockFactory().adjustBreaker(bitSetRamUsedEstimate, false);

BooleanArrayBlock expanded = new BooleanArrayBlock(
vector,
expandedPositionCount,
null,
shiftNullsToExpandedPositions(),
MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING,
blockFactory()
);
blockFactory().adjustBreaker(expanded.ramBytesUsedOnlyBlock() - bitSetRamUsedEstimate, true);
// We need to incRef after adjusting any breakers, otherwise we might leak the vector if the breaker trips.
vector.incRef();
return expanded;
}

private long ramBytesUsedOnlyBlock() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,27 @@ public BooleanBigArrayBlock(
BitSet nulls,
MvOrdering mvOrdering,
BlockFactory blockFactory
) {
this(
new BooleanBigArrayVector(values, firstValueIndexes == null ? positionCount : firstValueIndexes[positionCount], blockFactory),
positionCount,
firstValueIndexes,
nulls,
mvOrdering,
blockFactory
);
}

private BooleanBigArrayBlock(
BooleanBigArrayVector vector,
int positionCount,
int[] firstValueIndexes,
BitSet nulls,
MvOrdering mvOrdering,
BlockFactory blockFactory
) {
super(positionCount, firstValueIndexes, nulls, mvOrdering, blockFactory);
this.vector = new BooleanBigArrayVector(values, (int) values.size(), blockFactory);
this.vector = vector;
}

@Override
Expand All @@ -47,6 +65,7 @@ public boolean getBoolean(int valueIndex) {

@Override
public BooleanBlock filter(int... positions) {
// TODO use reference counting to share the vector
try (var builder = blockFactory().newBooleanBlockBuilder(positions.length)) {
for (int pos : positions) {
if (isNull(pos)) {
Expand Down Expand Up @@ -80,21 +99,28 @@ public BooleanBlock expand() {
incRef();
return this;
}
// TODO use reference counting to share the vector
try (var builder = blockFactory().newBooleanBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
continue;
}
int first = getFirstValueIndex(pos);
int end = first + getValueCount(pos);
for (int i = first; i < end; i++) {
builder.appendBoolean(getBoolean(i));
}
}
return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build();
if (nullsMask == null) {
vector.incRef();
return vector.asBlock();
}

// The following line is correct because positions with multi-values are never null.
int expandedPositionCount = vector.getPositionCount();
long bitSetRamUsedEstimate = BlockRamUsageEstimator.sizeOfBitSet(expandedPositionCount);
blockFactory().adjustBreaker(bitSetRamUsedEstimate, false);

BooleanBigArrayBlock expanded = new BooleanBigArrayBlock(
vector,
expandedPositionCount,
null,
shiftNullsToExpandedPositions(),
MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING,
blockFactory()
);
blockFactory().adjustBreaker(expanded.ramBytesUsedOnlyBlock() - bitSetRamUsedEstimate, true);
// We need to incRef after adjusting any breakers, otherwise we might leak the vector if the breaker trips.
vector.incRef();
return expanded;
}

private long ramBytesUsedOnlyBlock() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,27 @@ final class BytesRefArrayBlock extends AbstractArrayBlock implements BytesRefBlo
BitSet nulls,
MvOrdering mvOrdering,
BlockFactory blockFactory
) {
this(
new BytesRefArrayVector(values, firstValueIndexes == null ? positionCount : firstValueIndexes[positionCount], blockFactory),
positionCount,
firstValueIndexes,
nulls,
mvOrdering,
blockFactory
);
}

private BytesRefArrayBlock(
BytesRefArrayVector vector,
int positionCount,
int[] firstValueIndexes,
BitSet nulls,
MvOrdering mvOrdering,
BlockFactory blockFactory
) {
super(positionCount, firstValueIndexes, nulls, mvOrdering, blockFactory);
this.vector = new BytesRefArrayVector(values, (int) values.size(), blockFactory);
this.vector = vector;
}

@Override
Expand All @@ -49,6 +67,7 @@ public BytesRef getBytesRef(int valueIndex, BytesRef dest) {

@Override
public BytesRefBlock filter(int... positions) {
// TODO use reference counting to share the vector
final BytesRef scratch = new BytesRef();
try (var builder = blockFactory().newBytesRefBlockBuilder(positions.length)) {
for (int pos : positions) {
Expand Down Expand Up @@ -83,22 +102,28 @@ public BytesRefBlock expand() {
incRef();
return this;
}
// TODO use reference counting to share the vector
final BytesRef scratch = new BytesRef();
try (var builder = blockFactory().newBytesRefBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
continue;
}
int first = getFirstValueIndex(pos);
int end = first + getValueCount(pos);
for (int i = first; i < end; i++) {
builder.appendBytesRef(getBytesRef(i, scratch));
}
}
return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build();
if (nullsMask == null) {
vector.incRef();
return vector.asBlock();
}

// The following line is correct because positions with multi-values are never null.
int expandedPositionCount = vector.getPositionCount();
long bitSetRamUsedEstimate = BlockRamUsageEstimator.sizeOfBitSet(expandedPositionCount);
blockFactory().adjustBreaker(bitSetRamUsedEstimate, false);

BytesRefArrayBlock expanded = new BytesRefArrayBlock(
vector,
expandedPositionCount,
null,
shiftNullsToExpandedPositions(),
MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING,
blockFactory()
);
blockFactory().adjustBreaker(expanded.ramBytesUsedOnlyBlock() - bitSetRamUsedEstimate, true);
// We need to incRef after adjusting any breakers, otherwise we might leak the vector if the breaker trips.
vector.incRef();
return expanded;
}

private long ramBytesUsedOnlyBlock() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,27 @@ final class DoubleArrayBlock extends AbstractArrayBlock implements DoubleBlock {
BitSet nulls,
MvOrdering mvOrdering,
BlockFactory blockFactory
) {
this(
new DoubleArrayVector(values, firstValueIndexes == null ? positionCount : firstValueIndexes[positionCount], blockFactory),
positionCount,
firstValueIndexes,
nulls,
mvOrdering,
blockFactory
);
}

private DoubleArrayBlock(
DoubleArrayVector vector,
int positionCount,
int[] firstValueIndexes,
BitSet nulls,
MvOrdering mvOrdering,
BlockFactory blockFactory
) {
super(positionCount, firstValueIndexes, nulls, mvOrdering, blockFactory);
this.vector = new DoubleArrayVector(values, values.length, blockFactory);
this.vector = vector;
}

@Override
Expand All @@ -46,6 +64,7 @@ public double getDouble(int valueIndex) {

@Override
public DoubleBlock filter(int... positions) {
// TODO use reference counting to share the vector
try (var builder = blockFactory().newDoubleBlockBuilder(positions.length)) {
for (int pos : positions) {
if (isNull(pos)) {
Expand Down Expand Up @@ -79,21 +98,28 @@ public DoubleBlock expand() {
incRef();
return this;
}
// TODO use reference counting to share the vector
try (var builder = blockFactory().newDoubleBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
continue;
}
int first = getFirstValueIndex(pos);
int end = first + getValueCount(pos);
for (int i = first; i < end; i++) {
builder.appendDouble(getDouble(i));
}
}
return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build();
if (nullsMask == null) {
vector.incRef();
return vector.asBlock();
}

// The following line is correct because positions with multi-values are never null.
int expandedPositionCount = vector.getPositionCount();
long bitSetRamUsedEstimate = BlockRamUsageEstimator.sizeOfBitSet(expandedPositionCount);
blockFactory().adjustBreaker(bitSetRamUsedEstimate, false);

DoubleArrayBlock expanded = new DoubleArrayBlock(
vector,
expandedPositionCount,
null,
shiftNullsToExpandedPositions(),
MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING,
blockFactory()
);
blockFactory().adjustBreaker(expanded.ramBytesUsedOnlyBlock() - bitSetRamUsedEstimate, true);
// We need to incRef after adjusting any breakers, otherwise we might leak the vector if the breaker trips.
vector.incRef();
return expanded;
}

private long ramBytesUsedOnlyBlock() {
Expand Down

0 comments on commit 8a704e7

Please sign in to comment.