diff --git a/docs/changelog/138539.yaml b/docs/changelog/138539.yaml new file mode 100644 index 0000000000000..deebeb3bdf0ce --- /dev/null +++ b/docs/changelog/138539.yaml @@ -0,0 +1,5 @@ +pr: 138539 +summary: Handle serialization of null blocks in `AggregateMetricDoubleBlock` +area: ES|QL +type: bug +issues: [] diff --git a/server/src/main/resources/transport/definitions/referable/aggregate_metric_double_typed_block.csv b/server/src/main/resources/transport/definitions/referable/aggregate_metric_double_typed_block.csv new file mode 100644 index 0000000000000..fa1e1034a8011 --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/aggregate_metric_double_typed_block.csv @@ -0,0 +1 @@ +9227000,9185010 diff --git a/server/src/main/resources/transport/upper_bounds/9.2.csv b/server/src/main/resources/transport/upper_bounds/9.2.csv index 0b2d3c451ce12..5b4cb63943755 100644 --- a/server/src/main/resources/transport/upper_bounds/9.2.csv +++ b/server/src/main/resources/transport/upper_bounds/9.2.csv @@ -1 +1 @@ -index_created_transport_version,9185009 +aggregate_metric_double_typed_block,9185010 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv index cc71afa89aa4a..482517b9a61d6 100644 --- a/server/src/main/resources/transport/upper_bounds/9.3.csv +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -1 +1 @@ -index_created_transport_version,9221000 +aggregate_metric_double_typed_block,9227000 diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleArrayBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleArrayBlock.java index e957a7d124ccc..26381269cc2ff 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleArrayBlock.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.data; +import org.elasticsearch.TransportVersion; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.ByteSizeValue; @@ -19,6 +20,8 @@ import java.util.stream.Stream; public final class AggregateMetricDoubleArrayBlock extends AbstractNonThreadSafeRefCounted implements AggregateMetricDoubleBlock { + public static final TransportVersion WRITE_TYPED_BLOCK = TransportVersion.fromName("aggregate_metric_double_typed_block"); + private final DoubleBlock minBlock; private final DoubleBlock maxBlock; private final DoubleBlock sumBlock; @@ -236,7 +239,11 @@ public AggregateMetricDoubleBlock expand() { @Override public void writeTo(StreamOutput out) throws IOException { for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) { - block.writeTo(out); + if (out.getTransportVersion().supports(WRITE_TYPED_BLOCK)) { + Block.writeTypedBlock(block, out); + } else { + block.writeTo(out); + } } } @@ -248,10 +255,17 @@ public static Block readFrom(StreamInput in) throws IOException { IntBlock countBlock = null; BlockStreamInput blockStreamInput = (BlockStreamInput) in; try { - minBlock = DoubleBlock.readFrom(blockStreamInput); - maxBlock = DoubleBlock.readFrom(blockStreamInput); - sumBlock = DoubleBlock.readFrom(blockStreamInput); - countBlock = IntBlock.readFrom(blockStreamInput); + if (in.getTransportVersion().supports(WRITE_TYPED_BLOCK)) { + minBlock = (DoubleBlock) Block.readTypedBlock(blockStreamInput); + maxBlock = (DoubleBlock) Block.readTypedBlock(blockStreamInput); + sumBlock = (DoubleBlock) Block.readTypedBlock(blockStreamInput); + countBlock = (IntBlock) Block.readTypedBlock(blockStreamInput); + } else { + minBlock = DoubleBlock.readFrom(blockStreamInput); + maxBlock = DoubleBlock.readFrom(blockStreamInput); + sumBlock = DoubleBlock.readFrom(blockStreamInput); + countBlock = IntBlock.readFrom(blockStreamInput); + } AggregateMetricDoubleArrayBlock result = new AggregateMetricDoubleArrayBlock(minBlock, maxBlock, sumBlock, countBlock); success = true; return result; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockSerializationTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockSerializationTests.java index c59714a589e24..748349eea6d9a 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockSerializationTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockSerializationTests.java @@ -440,6 +440,72 @@ public void testCompositeBlock() throws Exception { } } + public void testAggregateMetricDouble() throws IOException { + final int positionCount = randomIntBetween(1, 1000); + DoubleBlock minBlock = (DoubleBlock) RandomBlock.randomBlock( + blockFactory, + randomFrom(ElementType.DOUBLE, ElementType.NULL), + positionCount, + true, + 0, + 1, + 0, + 0 + ).block(); + + DoubleBlock maxBlock = (DoubleBlock) RandomBlock.randomBlock( + blockFactory, + randomFrom(ElementType.DOUBLE, ElementType.NULL), + positionCount, + true, + 0, + 1, + 0, + 0 + ).block(); + + DoubleBlock suBlock = (DoubleBlock) RandomBlock.randomBlock( + blockFactory, + randomFrom(ElementType.DOUBLE, ElementType.NULL), + positionCount, + true, + 0, + 1, + 0, + 0 + ).block(); + + IntBlock countBlock = (IntBlock) RandomBlock.randomBlock( + blockFactory, + randomFrom(ElementType.INT, ElementType.NULL), + positionCount, + true, + 0, + 1, + 0, + 0 + ).block(); + + try (var origBlock = new AggregateMetricDoubleArrayBlock(minBlock, maxBlock, suBlock, countBlock)) { + try ( + AggregateMetricDoubleBlock deserBlock = serializeDeserializeBlockWithVersion( + origBlock, + TransportVersionUtils.randomVersionBetween( + random(), + AggregateMetricDoubleArrayBlock.WRITE_TYPED_BLOCK, + TransportVersion.current() + ) + ) + ) { + assertThat(deserBlock.minBlock(), equalTo(minBlock)); + assertThat(deserBlock.minBlock(), equalTo(minBlock)); + assertThat(deserBlock.minBlock(), equalTo(minBlock)); + assertThat(deserBlock.minBlock(), equalTo(minBlock)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(deserBlock, unused -> deserBlock); + } + } + } + static BytesRef randomBytesRef() { return new BytesRef(randomAlphaOfLengthBetween(0, 10)); }