Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/138539.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 138539
summary: Handle serialization of null blocks in `AggregateMetricDoubleBlock`
area: ES|QL
type: bug
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9227000,9185010
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.2.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
index_created_transport_version,9185009
aggregate_metric_double_typed_block,9185010
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.3.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
index_created_transport_version,9221000
aggregate_metric_double_typed_block,9227000
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.compute.data;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
Expand All @@ -19,6 +20,8 @@
import java.util.stream.Stream;

public final class AggregateMetricDoubleArrayBlock extends AbstractNonThreadSafeRefCounted implements AggregateMetricDoubleBlock {
public static final TransportVersion WRITE_TYPED_BLOCK = TransportVersion.fromName("aggregate_metric_double_typed_block");

private final DoubleBlock minBlock;
private final DoubleBlock maxBlock;
private final DoubleBlock sumBlock;
Expand Down Expand Up @@ -236,7 +239,11 @@ public AggregateMetricDoubleBlock expand() {
@Override
public void writeTo(StreamOutput out) throws IOException {
for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
block.writeTo(out);
if (out.getTransportVersion().supports(WRITE_TYPED_BLOCK)) {
Block.writeTypedBlock(block, out);
} else {
block.writeTo(out);
}
}
}

Expand All @@ -248,10 +255,17 @@ public static Block readFrom(StreamInput in) throws IOException {
IntBlock countBlock = null;
BlockStreamInput blockStreamInput = (BlockStreamInput) in;
try {
minBlock = DoubleBlock.readFrom(blockStreamInput);
maxBlock = DoubleBlock.readFrom(blockStreamInput);
sumBlock = DoubleBlock.readFrom(blockStreamInput);
countBlock = IntBlock.readFrom(blockStreamInput);
if (in.getTransportVersion().supports(WRITE_TYPED_BLOCK)) {
minBlock = (DoubleBlock) Block.readTypedBlock(blockStreamInput);
maxBlock = (DoubleBlock) Block.readTypedBlock(blockStreamInput);
sumBlock = (DoubleBlock) Block.readTypedBlock(blockStreamInput);
countBlock = (IntBlock) Block.readTypedBlock(blockStreamInput);
} else {
minBlock = DoubleBlock.readFrom(blockStreamInput);
maxBlock = DoubleBlock.readFrom(blockStreamInput);
sumBlock = DoubleBlock.readFrom(blockStreamInput);
countBlock = IntBlock.readFrom(blockStreamInput);
}
AggregateMetricDoubleArrayBlock result = new AggregateMetricDoubleArrayBlock(minBlock, maxBlock, sumBlock, countBlock);
success = true;
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,72 @@ public void testCompositeBlock() throws Exception {
}
}

public void testAggregateMetricDouble() throws IOException {
final int positionCount = randomIntBetween(1, 1000);
DoubleBlock minBlock = (DoubleBlock) RandomBlock.randomBlock(
blockFactory,
randomFrom(ElementType.DOUBLE, ElementType.NULL),
positionCount,
true,
0,
1,
0,
0
).block();

DoubleBlock maxBlock = (DoubleBlock) RandomBlock.randomBlock(
blockFactory,
randomFrom(ElementType.DOUBLE, ElementType.NULL),
positionCount,
true,
0,
1,
0,
0
).block();

DoubleBlock suBlock = (DoubleBlock) RandomBlock.randomBlock(
blockFactory,
randomFrom(ElementType.DOUBLE, ElementType.NULL),
positionCount,
true,
0,
1,
0,
0
).block();

IntBlock countBlock = (IntBlock) RandomBlock.randomBlock(
blockFactory,
randomFrom(ElementType.INT, ElementType.NULL),
positionCount,
true,
0,
1,
0,
0
).block();

try (var origBlock = new AggregateMetricDoubleArrayBlock(minBlock, maxBlock, suBlock, countBlock)) {
try (
AggregateMetricDoubleBlock deserBlock = serializeDeserializeBlockWithVersion(
origBlock,
TransportVersionUtils.randomVersionBetween(
random(),
AggregateMetricDoubleArrayBlock.WRITE_TYPED_BLOCK,
TransportVersion.current()
)
)
) {
assertThat(deserBlock.minBlock(), equalTo(minBlock));
assertThat(deserBlock.minBlock(), equalTo(minBlock));
assertThat(deserBlock.minBlock(), equalTo(minBlock));
assertThat(deserBlock.minBlock(), equalTo(minBlock));
EqualsHashCodeTestUtils.checkEqualsAndHashCode(deserBlock, unused -> deserBlock);
}
}
}

static BytesRef randomBytesRef() {
return new BytesRef(randomAlphaOfLengthBetween(0, 10));
}
Expand Down