From ebaa24b44753e67f3ac57ff065845dd13d543907 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 24 Nov 2025 15:46:31 -0800 Subject: [PATCH] Handle serialization of null blocks in AggregateMetricDoubleBlock (#138539) If a sub-block of an aggregate_metric_double block is a ConstantNullBlock, serialization fails because DoubleBlock.readFrom or IntBlock.readFrom are not compatible with ConstantNullBlock.writeTo. --- docs/changelog/138539.yaml | 5 ++ .../aggregate_metric_double_typed_block.csv | 1 + .../resources/transport/upper_bounds/9.2.csv | 2 +- .../resources/transport/upper_bounds/9.3.csv | 2 +- .../data/AggregateMetricDoubleArrayBlock.java | 24 +++++-- .../compute/data/BlockSerializationTests.java | 66 +++++++++++++++++++ 6 files changed, 93 insertions(+), 7 deletions(-) create mode 100644 docs/changelog/138539.yaml create mode 100644 server/src/main/resources/transport/definitions/referable/aggregate_metric_double_typed_block.csv diff --git a/docs/changelog/138539.yaml b/docs/changelog/138539.yaml new file mode 100644 index 0000000000000..deebeb3bdf0ce --- /dev/null +++ b/docs/changelog/138539.yaml @@ -0,0 +1,5 @@ +pr: 138539 +summary: Handle serialization of null blocks in `AggregateMetricDoubleBlock` +area: ES|QL +type: bug +issues: [] diff --git a/server/src/main/resources/transport/definitions/referable/aggregate_metric_double_typed_block.csv b/server/src/main/resources/transport/definitions/referable/aggregate_metric_double_typed_block.csv new file mode 100644 index 0000000000000..fa1e1034a8011 --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/aggregate_metric_double_typed_block.csv @@ -0,0 +1 @@ +9227000,9185010 diff --git a/server/src/main/resources/transport/upper_bounds/9.2.csv b/server/src/main/resources/transport/upper_bounds/9.2.csv index 0b2d3c451ce12..5b4cb63943755 100644 --- a/server/src/main/resources/transport/upper_bounds/9.2.csv +++ b/server/src/main/resources/transport/upper_bounds/9.2.csv @@ -1 +1 @@ -index_created_transport_version,9185009 +aggregate_metric_double_typed_block,9185010 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv index cc71afa89aa4a..482517b9a61d6 100644 --- a/server/src/main/resources/transport/upper_bounds/9.3.csv +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -1 +1 @@ -index_created_transport_version,9221000 +aggregate_metric_double_typed_block,9227000 diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleArrayBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleArrayBlock.java index e957a7d124ccc..26381269cc2ff 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleArrayBlock.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.data; +import org.elasticsearch.TransportVersion; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.ByteSizeValue; @@ -19,6 +20,8 @@ import java.util.stream.Stream; public final class AggregateMetricDoubleArrayBlock extends AbstractNonThreadSafeRefCounted implements AggregateMetricDoubleBlock { + public static final TransportVersion WRITE_TYPED_BLOCK = TransportVersion.fromName("aggregate_metric_double_typed_block"); + private final DoubleBlock minBlock; private final DoubleBlock maxBlock; private final DoubleBlock sumBlock; @@ -236,7 +239,11 @@ public AggregateMetricDoubleBlock expand() { @Override public void writeTo(StreamOutput out) throws IOException { for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) { - block.writeTo(out); + if (out.getTransportVersion().supports(WRITE_TYPED_BLOCK)) { + Block.writeTypedBlock(block, out); + } else { + block.writeTo(out); + } } } @@ -248,10 +255,17 @@ public static Block readFrom(StreamInput in) throws IOException { IntBlock countBlock = null; BlockStreamInput blockStreamInput = (BlockStreamInput) in; try { - minBlock = DoubleBlock.readFrom(blockStreamInput); - maxBlock = DoubleBlock.readFrom(blockStreamInput); - sumBlock = DoubleBlock.readFrom(blockStreamInput); - countBlock = IntBlock.readFrom(blockStreamInput); + if (in.getTransportVersion().supports(WRITE_TYPED_BLOCK)) { + minBlock = (DoubleBlock) Block.readTypedBlock(blockStreamInput); + maxBlock = (DoubleBlock) Block.readTypedBlock(blockStreamInput); + sumBlock = (DoubleBlock) Block.readTypedBlock(blockStreamInput); + countBlock = (IntBlock) Block.readTypedBlock(blockStreamInput); + } else { + minBlock = DoubleBlock.readFrom(blockStreamInput); + maxBlock = DoubleBlock.readFrom(blockStreamInput); + sumBlock = DoubleBlock.readFrom(blockStreamInput); + countBlock = IntBlock.readFrom(blockStreamInput); + } AggregateMetricDoubleArrayBlock result = new AggregateMetricDoubleArrayBlock(minBlock, maxBlock, sumBlock, countBlock); success = true; return result; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockSerializationTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockSerializationTests.java index c59714a589e24..748349eea6d9a 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockSerializationTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockSerializationTests.java @@ -440,6 +440,72 @@ public void testCompositeBlock() throws Exception { } } + public void testAggregateMetricDouble() throws IOException { + final int positionCount = randomIntBetween(1, 1000); + DoubleBlock minBlock = (DoubleBlock) RandomBlock.randomBlock( + blockFactory, + randomFrom(ElementType.DOUBLE, ElementType.NULL), + positionCount, + true, + 0, + 1, + 0, + 0 + ).block(); + + DoubleBlock maxBlock = (DoubleBlock) RandomBlock.randomBlock( + blockFactory, + randomFrom(ElementType.DOUBLE, ElementType.NULL), + positionCount, + true, + 0, + 1, + 0, + 0 + ).block(); + + DoubleBlock suBlock = (DoubleBlock) RandomBlock.randomBlock( + blockFactory, + randomFrom(ElementType.DOUBLE, ElementType.NULL), + positionCount, + true, + 0, + 1, + 0, + 0 + ).block(); + + IntBlock countBlock = (IntBlock) RandomBlock.randomBlock( + blockFactory, + randomFrom(ElementType.INT, ElementType.NULL), + positionCount, + true, + 0, + 1, + 0, + 0 + ).block(); + + try (var origBlock = new AggregateMetricDoubleArrayBlock(minBlock, maxBlock, suBlock, countBlock)) { + try ( + AggregateMetricDoubleBlock deserBlock = serializeDeserializeBlockWithVersion( + origBlock, + TransportVersionUtils.randomVersionBetween( + random(), + AggregateMetricDoubleArrayBlock.WRITE_TYPED_BLOCK, + TransportVersion.current() + ) + ) + ) { + assertThat(deserBlock.minBlock(), equalTo(minBlock)); + assertThat(deserBlock.minBlock(), equalTo(minBlock)); + assertThat(deserBlock.minBlock(), equalTo(minBlock)); + assertThat(deserBlock.minBlock(), equalTo(minBlock)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(deserBlock, unused -> deserBlock); + } + } + } + static BytesRef randomBytesRef() { return new BytesRef(randomAlphaOfLengthBetween(0, 10)); }