From 7a872cfca7619350aafb5e6b72b7ed00f429d1ac Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?=
Date: Mon, 5 Aug 2024 15:09:19 +0200
Subject: [PATCH 01/31] Remove unused empty constructor
---
.../elasticsearch/compute/aggregation/BooleanState.java | 4 ----
.../elasticsearch/compute/aggregation/DoubleState.java | 4 ----
.../org/elasticsearch/compute/aggregation/FloatState.java | 4 ----
.../org/elasticsearch/compute/aggregation/IntState.java | 4 ----
.../org/elasticsearch/compute/aggregation/LongState.java | 4 ----
.../compute/aggregation/CountAggregatorFunction.java | 2 +-
.../org/elasticsearch/compute/aggregation/X-State.java.st | 8 --------
7 files changed, 1 insertion(+), 29 deletions(-)
diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/BooleanState.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/BooleanState.java
index 7d225c7c06a72..ba4d133dee553 100644
--- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/BooleanState.java
+++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/BooleanState.java
@@ -18,10 +18,6 @@ final class BooleanState implements AggregatorState {
private boolean value;
private boolean seen;
- BooleanState() {
- this(false);
- }
-
BooleanState(boolean init) {
this.value = init;
}
diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/DoubleState.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/DoubleState.java
index f1c92c685bcab..90ecc2c1d3c03 100644
--- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/DoubleState.java
+++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/DoubleState.java
@@ -18,10 +18,6 @@ final class DoubleState implements AggregatorState {
private double value;
private boolean seen;
- DoubleState() {
- this(0);
- }
-
DoubleState(double init) {
this.value = init;
}
diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FloatState.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FloatState.java
index 81bdd39e51b6e..6f608271b6e42 100644
--- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FloatState.java
+++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FloatState.java
@@ -18,10 +18,6 @@ final class FloatState implements AggregatorState {
private float value;
private boolean seen;
- FloatState() {
- this(0);
- }
-
FloatState(float init) {
this.value = init;
}
diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IntState.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IntState.java
index e7db40eccf9c8..c539c576ef36d 100644
--- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IntState.java
+++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IntState.java
@@ -18,10 +18,6 @@ final class IntState implements AggregatorState {
private int value;
private boolean seen;
- IntState() {
- this(0);
- }
-
IntState(int init) {
this.value = init;
}
diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongState.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongState.java
index da78b649782d5..e9d97dcfe7fc1 100644
--- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongState.java
+++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongState.java
@@ -18,10 +18,6 @@ final class LongState implements AggregatorState {
private long value;
private boolean seen;
- LongState() {
- this(0);
- }
-
LongState(long init) {
this.value = init;
}
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/CountAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/CountAggregatorFunction.java
index 13a4204edfd8f..c32f6f4703a79 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/CountAggregatorFunction.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/CountAggregatorFunction.java
@@ -52,7 +52,7 @@ public static List intermediateStateDesc() {
private final boolean countAll;
public static CountAggregatorFunction create(List inputChannels) {
- return new CountAggregatorFunction(inputChannels, new LongState());
+ return new CountAggregatorFunction(inputChannels, new LongState(0));
}
private CountAggregatorFunction(List channels, LongState state) {
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-State.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-State.java.st
index 2d2d706c9454f..7e0949c86faaa 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-State.java.st
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-State.java.st
@@ -18,14 +18,6 @@ final class $Type$State implements AggregatorState {
private $type$ value;
private boolean seen;
- $Type$State() {
-$if(boolean)$
- this(false);
-$else$
- this(0);
-$endif$
- }
-
$Type$State($type$ init) {
this.value = init;
}
From 87db6acca4620a9cfe21b6c2193cf6804d0b425f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?=
Date: Mon, 5 Aug 2024 15:18:15 +0200
Subject: [PATCH 02/31] Added fallible single states
---
x-pack/plugin/esql/compute/build.gradle | 26 ++++++++
.../aggregation/BooleanFallibleState.java | 62 +++++++++++++++++++
.../aggregation/DoubleFallibleState.java | 62 +++++++++++++++++++
.../aggregation/FloatFallibleState.java | 62 +++++++++++++++++++
.../compute/aggregation/IntFallibleState.java | 62 +++++++++++++++++++
.../aggregation/LongFallibleState.java | 62 +++++++++++++++++++
.../aggregation/X-FallibleState.java.st | 62 +++++++++++++++++++
7 files changed, 398 insertions(+)
create mode 100644 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/BooleanFallibleState.java
create mode 100644 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/DoubleFallibleState.java
create mode 100644 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FloatFallibleState.java
create mode 100644 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IntFallibleState.java
create mode 100644 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongFallibleState.java
create mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-FallibleState.java.st
diff --git a/x-pack/plugin/esql/compute/build.gradle b/x-pack/plugin/esql/compute/build.gradle
index d31a7e629003e..d4795ce8d129c 100644
--- a/x-pack/plugin/esql/compute/build.gradle
+++ b/x-pack/plugin/esql/compute/build.gradle
@@ -433,6 +433,32 @@ tasks.named('stringTemplates').configure {
it.inputFile = stateInputFile
it.outputFile = "org/elasticsearch/compute/aggregation/DoubleState.java"
}
+ File fallibleStateInputFile = new File("${projectDir}/src/main/java/org/elasticsearch/compute/aggregation/X-FallibleState.java.st")
+ template {
+ it.properties = booleanProperties
+ it.inputFile = fallibleStateInputFile
+ it.outputFile = "org/elasticsearch/compute/aggregation/BooleanFallibleState.java"
+ }
+ template {
+ it.properties = intProperties
+ it.inputFile = fallibleStateInputFile
+ it.outputFile = "org/elasticsearch/compute/aggregation/IntFallibleState.java"
+ }
+ template {
+ it.properties = longProperties
+ it.inputFile = fallibleStateInputFile
+ it.outputFile = "org/elasticsearch/compute/aggregation/LongFallibleState.java"
+ }
+ template {
+ it.properties = floatProperties
+ it.inputFile = fallibleStateInputFile
+ it.outputFile = "org/elasticsearch/compute/aggregation/FloatFallibleState.java"
+ }
+ template {
+ it.properties = doubleProperties
+ it.inputFile = fallibleStateInputFile
+ it.outputFile = "org/elasticsearch/compute/aggregation/DoubleFallibleState.java"
+ }
// block lookups
File lookupInputFile = new File("${projectDir}/src/main/java/org/elasticsearch/compute/data/X-Lookup.java.st")
template {
diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/BooleanFallibleState.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/BooleanFallibleState.java
new file mode 100644
index 0000000000000..073f31c390a6f
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/BooleanFallibleState.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.aggregation;
+
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.operator.DriverContext;
+
+/**
+ * Aggregator state for a single boolean.
+ * It stores a third boolean to store if the aggregation failed.
+ * This class is generated. Do not edit it.
+ */
+final class BooleanFallibleState implements AggregatorState {
+ private boolean value;
+ private boolean seen;
+ private boolean failed;
+
+ BooleanFallibleState(boolean init) {
+ this.value = init;
+ }
+
+ boolean booleanValue() {
+ return value;
+ }
+
+ void booleanValue(boolean value) {
+ this.value = value;
+ }
+
+ boolean seen() {
+ return seen;
+ }
+
+ void seen(boolean seen) {
+ this.seen = seen;
+ }
+
+ boolean failed() {
+ return failed;
+ }
+
+ void failed(boolean failed) {
+ this.failed = failed;
+ }
+
+ /** Extracts an intermediate view of the contents of this state. */
+ @Override
+ public void toIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
+ assert blocks.length >= offset + 3;
+ blocks[offset + 0] = driverContext.blockFactory().newConstantBooleanBlockWith(value, 1);
+ blocks[offset + 1] = driverContext.blockFactory().newConstantBooleanBlockWith(seen, 1);
+ blocks[offset + 2] = driverContext.blockFactory().newConstantBooleanBlockWith(failed, 1);
+ }
+
+ @Override
+ public void close() {}
+}
diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/DoubleFallibleState.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/DoubleFallibleState.java
new file mode 100644
index 0000000000000..4cdeddec724bf
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/DoubleFallibleState.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.aggregation;
+
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.operator.DriverContext;
+
+/**
+ * Aggregator state for a single double.
+ * It stores a third boolean to store if the aggregation failed.
+ * This class is generated. Do not edit it.
+ */
+final class DoubleFallibleState implements AggregatorState {
+ private double value;
+ private boolean seen;
+ private boolean failed;
+
+ DoubleFallibleState(double init) {
+ this.value = init;
+ }
+
+ double doubleValue() {
+ return value;
+ }
+
+ void doubleValue(double value) {
+ this.value = value;
+ }
+
+ boolean seen() {
+ return seen;
+ }
+
+ void seen(boolean seen) {
+ this.seen = seen;
+ }
+
+ boolean failed() {
+ return failed;
+ }
+
+ void failed(boolean failed) {
+ this.failed = failed;
+ }
+
+ /** Extracts an intermediate view of the contents of this state. */
+ @Override
+ public void toIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
+ assert blocks.length >= offset + 3;
+ blocks[offset + 0] = driverContext.blockFactory().newConstantDoubleBlockWith(value, 1);
+ blocks[offset + 1] = driverContext.blockFactory().newConstantBooleanBlockWith(seen, 1);
+ blocks[offset + 2] = driverContext.blockFactory().newConstantBooleanBlockWith(failed, 1);
+ }
+
+ @Override
+ public void close() {}
+}
diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FloatFallibleState.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FloatFallibleState.java
new file mode 100644
index 0000000000000..b050c86258dcd
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FloatFallibleState.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.aggregation;
+
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.operator.DriverContext;
+
+/**
+ * Aggregator state for a single float.
+ * It stores a third boolean to store if the aggregation failed.
+ * This class is generated. Do not edit it.
+ */
+final class FloatFallibleState implements AggregatorState {
+ private float value;
+ private boolean seen;
+ private boolean failed;
+
+ FloatFallibleState(float init) {
+ this.value = init;
+ }
+
+ float floatValue() {
+ return value;
+ }
+
+ void floatValue(float value) {
+ this.value = value;
+ }
+
+ boolean seen() {
+ return seen;
+ }
+
+ void seen(boolean seen) {
+ this.seen = seen;
+ }
+
+ boolean failed() {
+ return failed;
+ }
+
+ void failed(boolean failed) {
+ this.failed = failed;
+ }
+
+ /** Extracts an intermediate view of the contents of this state. */
+ @Override
+ public void toIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
+ assert blocks.length >= offset + 3;
+ blocks[offset + 0] = driverContext.blockFactory().newConstantFloatBlockWith(value, 1);
+ blocks[offset + 1] = driverContext.blockFactory().newConstantBooleanBlockWith(seen, 1);
+ blocks[offset + 2] = driverContext.blockFactory().newConstantBooleanBlockWith(failed, 1);
+ }
+
+ @Override
+ public void close() {}
+}
diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IntFallibleState.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IntFallibleState.java
new file mode 100644
index 0000000000000..360f3fdb009e4
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IntFallibleState.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.aggregation;
+
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.operator.DriverContext;
+
+/**
+ * Aggregator state for a single int.
+ * It stores a third boolean to store if the aggregation failed.
+ * This class is generated. Do not edit it.
+ */
+final class IntFallibleState implements AggregatorState {
+ private int value;
+ private boolean seen;
+ private boolean failed;
+
+ IntFallibleState(int init) {
+ this.value = init;
+ }
+
+ int intValue() {
+ return value;
+ }
+
+ void intValue(int value) {
+ this.value = value;
+ }
+
+ boolean seen() {
+ return seen;
+ }
+
+ void seen(boolean seen) {
+ this.seen = seen;
+ }
+
+ boolean failed() {
+ return failed;
+ }
+
+ void failed(boolean failed) {
+ this.failed = failed;
+ }
+
+ /** Extracts an intermediate view of the contents of this state. */
+ @Override
+ public void toIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
+ assert blocks.length >= offset + 3;
+ blocks[offset + 0] = driverContext.blockFactory().newConstantIntBlockWith(value, 1);
+ blocks[offset + 1] = driverContext.blockFactory().newConstantBooleanBlockWith(seen, 1);
+ blocks[offset + 2] = driverContext.blockFactory().newConstantBooleanBlockWith(failed, 1);
+ }
+
+ @Override
+ public void close() {}
+}
diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongFallibleState.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongFallibleState.java
new file mode 100644
index 0000000000000..98669ef627d04
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongFallibleState.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.aggregation;
+
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.operator.DriverContext;
+
+/**
+ * Aggregator state for a single long.
+ * It stores a third boolean to store if the aggregation failed.
+ * This class is generated. Do not edit it.
+ */
+final class LongFallibleState implements AggregatorState {
+ private long value;
+ private boolean seen;
+ private boolean failed;
+
+ LongFallibleState(long init) {
+ this.value = init;
+ }
+
+ long longValue() {
+ return value;
+ }
+
+ void longValue(long value) {
+ this.value = value;
+ }
+
+ boolean seen() {
+ return seen;
+ }
+
+ void seen(boolean seen) {
+ this.seen = seen;
+ }
+
+ boolean failed() {
+ return failed;
+ }
+
+ void failed(boolean failed) {
+ this.failed = failed;
+ }
+
+ /** Extracts an intermediate view of the contents of this state. */
+ @Override
+ public void toIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
+ assert blocks.length >= offset + 3;
+ blocks[offset + 0] = driverContext.blockFactory().newConstantLongBlockWith(value, 1);
+ blocks[offset + 1] = driverContext.blockFactory().newConstantBooleanBlockWith(seen, 1);
+ blocks[offset + 2] = driverContext.blockFactory().newConstantBooleanBlockWith(failed, 1);
+ }
+
+ @Override
+ public void close() {}
+}
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-FallibleState.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-FallibleState.java.st
new file mode 100644
index 0000000000000..27609383e4f61
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-FallibleState.java.st
@@ -0,0 +1,62 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.aggregation;
+
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.operator.DriverContext;
+
+/**
+ * Aggregator state for a single $type$.
+ * It stores a third boolean to store if the aggregation failed.
+ * This class is generated. Do not edit it.
+ */
+final class $Type$FallibleState implements AggregatorState {
+ private $type$ value;
+ private boolean seen;
+ private boolean failed;
+
+ $Type$FallibleState($type$ init) {
+ this.value = init;
+ }
+
+ $type$ $type$Value() {
+ return value;
+ }
+
+ void $type$Value($type$ value) {
+ this.value = value;
+ }
+
+ boolean seen() {
+ return seen;
+ }
+
+ void seen(boolean seen) {
+ this.seen = seen;
+ }
+
+ boolean failed() {
+ return failed;
+ }
+
+ void failed(boolean failed) {
+ this.failed = failed;
+ }
+
+ /** Extracts an intermediate view of the contents of this state. */
+ @Override
+ public void toIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
+ assert blocks.length >= offset + 3;
+ blocks[offset + 0] = driverContext.blockFactory().newConstant$Type$BlockWith(value, 1);
+ blocks[offset + 1] = driverContext.blockFactory().newConstantBooleanBlockWith(seen, 1);
+ blocks[offset + 2] = driverContext.blockFactory().newConstantBooleanBlockWith(failed, 1);
+ }
+
+ @Override
+ public void close() {}
+}
From 17e9ea25b0544af7b80f6c75bb04ee83b8c8fe79 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?=
Date: Mon, 5 Aug 2024 15:28:41 +0200
Subject: [PATCH 03/31] Added fallible array state
---
x-pack/plugin/esql/compute/build.gradle | 26 +++
.../BooleanFallibleArrayState.java | 125 +++++++++++++
.../aggregation/DoubleFallibleArrayState.java | 124 +++++++++++++
.../aggregation/FloatFallibleArrayState.java | 124 +++++++++++++
.../aggregation/IntFallibleArrayState.java | 124 +++++++++++++
.../aggregation/LongFallibleArrayState.java | 130 ++++++++++++++
.../AbstractFallibleArrayState.java | 69 ++++++++
.../aggregation/X-FallibleArrayState.java.st | 166 ++++++++++++++++++
8 files changed, 888 insertions(+)
create mode 100644 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/BooleanFallibleArrayState.java
create mode 100644 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/DoubleFallibleArrayState.java
create mode 100644 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FloatFallibleArrayState.java
create mode 100644 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IntFallibleArrayState.java
create mode 100644 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongFallibleArrayState.java
create mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/AbstractFallibleArrayState.java
create mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-FallibleArrayState.java.st
diff --git a/x-pack/plugin/esql/compute/build.gradle b/x-pack/plugin/esql/compute/build.gradle
index d4795ce8d129c..136b3b5313c90 100644
--- a/x-pack/plugin/esql/compute/build.gradle
+++ b/x-pack/plugin/esql/compute/build.gradle
@@ -517,6 +517,32 @@ tasks.named('stringTemplates').configure {
it.inputFile = arrayStateInputFile
it.outputFile = "org/elasticsearch/compute/aggregation/FloatArrayState.java"
}
+ File fallibleArrayStateInputFile = new File("${projectDir}/src/main/java/org/elasticsearch/compute/aggregation/X-FallibleArrayState.java.st")
+ template {
+ it.properties = booleanProperties
+ it.inputFile = fallibleArrayStateInputFile
+ it.outputFile = "org/elasticsearch/compute/aggregation/BooleanFallibleArrayState.java"
+ }
+ template {
+ it.properties = intProperties
+ it.inputFile = fallibleArrayStateInputFile
+ it.outputFile = "org/elasticsearch/compute/aggregation/IntFallibleArrayState.java"
+ }
+ template {
+ it.properties = longProperties
+ it.inputFile = fallibleArrayStateInputFile
+ it.outputFile = "org/elasticsearch/compute/aggregation/LongFallibleArrayState.java"
+ }
+ template {
+ it.properties = doubleProperties
+ it.inputFile = fallibleArrayStateInputFile
+ it.outputFile = "org/elasticsearch/compute/aggregation/DoubleFallibleArrayState.java"
+ }
+ template {
+ it.properties = floatProperties
+ it.inputFile = fallibleArrayStateInputFile
+ it.outputFile = "org/elasticsearch/compute/aggregation/FloatFallibleArrayState.java"
+ }
File valuesAggregatorInputFile = new File("${projectDir}/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st")
template {
it.properties = intProperties
diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/BooleanFallibleArrayState.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/BooleanFallibleArrayState.java
new file mode 100644
index 0000000000000..6367fdfb6617e
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/BooleanFallibleArrayState.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.aggregation;
+
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.BitArray;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.core.Releasables;
+
+/**
+ * Aggregator state for an array of booleans, that also tracks failures.
+ * It is created in a mode where it won't track
+ * the {@code groupId}s that are sent to it and it is the
+ * responsibility of the caller to only fetch values for {@code groupId}s
+ * that it has sent using the {@code selected} parameter when building the
+ * results. This is fine when there are no {@code null} values in the input
+ * data. But once there are null values in the input data it is
+ * much more convenient to only send non-null values and
+ * the tracking built into the grouping code can't track that. In that case
+ * call {@link #enableGroupIdTracking} to transition the state into a mode
+ * where it'll track which {@code groupIds} have been written.
+ *
+ * This class is generated. Do not edit it.
+ *
+ */
+final class BooleanFallibleArrayState extends AbstractFallibleArrayState implements GroupingAggregatorState {
+ private final boolean init;
+
+ private BitArray values;
+ private int size;
+
+ BooleanFallibleArrayState(BigArrays bigArrays, boolean init) {
+ super(bigArrays);
+ this.values = new BitArray(1, bigArrays);
+ this.size = 1;
+ this.values.set(0, init);
+ this.init = init;
+ }
+
+ boolean get(int groupId) {
+ return values.get(groupId);
+ }
+
+ boolean getOrDefault(int groupId) {
+ return groupId < size ? values.get(groupId) : init;
+ }
+
+ void set(int groupId, boolean value) {
+ ensureCapacity(groupId);
+ values.set(groupId, value);
+ trackGroupId(groupId);
+ }
+
+ Block toValuesBlock(org.elasticsearch.compute.data.IntVector selected, DriverContext driverContext) {
+ if (false == trackingGroupIds() && false == anyFailure()) {
+ try (var builder = driverContext.blockFactory().newBooleanVectorFixedBuilder(selected.getPositionCount())) {
+ for (int i = 0; i < selected.getPositionCount(); i++) {
+ builder.appendBoolean(i, values.get(selected.getInt(i)));
+ }
+ return builder.build().asBlock();
+ }
+ }
+ try (BooleanBlock.Builder builder = driverContext.blockFactory().newBooleanBlockBuilder(selected.getPositionCount())) {
+ for (int i = 0; i < selected.getPositionCount(); i++) {
+ int group = selected.getInt(i);
+ if (hasValue(group) && !hasFailed(group)) {
+ builder.appendBoolean(values.get(group));
+ } else {
+ builder.appendNull();
+ }
+ }
+ return builder.build();
+ }
+ }
+
+ private void ensureCapacity(int groupId) {
+ if (groupId >= size) {
+ values.fill(size, groupId + 1, init);
+ size = groupId + 1;
+ }
+ }
+
+ /** Extracts an intermediate view of the contents of this state. */
+ @Override
+ public void toIntermediate(
+ Block[] blocks,
+ int offset,
+ IntVector selected,
+ org.elasticsearch.compute.operator.DriverContext driverContext
+ ) {
+ assert blocks.length >= offset + 3;
+ try (
+ var valuesBuilder = driverContext.blockFactory().newBooleanBlockBuilder(selected.getPositionCount());
+ var hasValueBuilder = driverContext.blockFactory().newBooleanVectorFixedBuilder(selected.getPositionCount());
+ var hasFailedBuilder = driverContext.blockFactory().newBooleanVectorFixedBuilder(selected.getPositionCount())
+ ) {
+ for (int i = 0; i < selected.getPositionCount(); i++) {
+ int group = selected.getInt(i);
+ if (group < size) {
+ valuesBuilder.appendBoolean(values.get(group));
+ } else {
+ valuesBuilder.appendBoolean(false); // TODO can we just use null?
+ }
+ hasValueBuilder.appendBoolean(i, hasValue(group));
+ hasFailedBuilder.appendBoolean(i, hasFailed(group));
+ }
+ blocks[offset + 0] = valuesBuilder.build();
+ blocks[offset + 1] = hasValueBuilder.build().asBlock();
+ blocks[offset + 2] = hasFailedBuilder.build().asBlock();
+ }
+ }
+
+ @Override
+ public void close() {
+ Releasables.close(values, super::close);
+ }
+}
diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/DoubleFallibleArrayState.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/DoubleFallibleArrayState.java
new file mode 100644
index 0000000000000..dd1d60f7bd246
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/DoubleFallibleArrayState.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.aggregation;
+
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.DoubleArray;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.DoubleBlock;
+import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.core.Releasables;
+
+/**
+ * Aggregator state for an array of doubles, that also tracks failures.
+ * It is created in a mode where it won't track
+ * the {@code groupId}s that are sent to it and it is the
+ * responsibility of the caller to only fetch values for {@code groupId}s
+ * that it has sent using the {@code selected} parameter when building the
+ * results. This is fine when there are no {@code null} values in the input
+ * data. But once there are null values in the input data it is
+ * much more convenient to only send non-null values and
+ * the tracking built into the grouping code can't track that. In that case
+ * call {@link #enableGroupIdTracking} to transition the state into a mode
+ * where it'll track which {@code groupIds} have been written.
+ *
+ * This class is generated. Do not edit it.
+ *
+ */
+final class DoubleFallibleArrayState extends AbstractFallibleArrayState implements GroupingAggregatorState {
+ private final double init;
+
+ private DoubleArray values;
+
+ DoubleFallibleArrayState(BigArrays bigArrays, double init) {
+ super(bigArrays);
+ this.values = bigArrays.newDoubleArray(1, false);
+ this.values.set(0, init);
+ this.init = init;
+ }
+
+ double get(int groupId) {
+ return values.get(groupId);
+ }
+
+ double getOrDefault(int groupId) {
+ return groupId < values.size() ? values.get(groupId) : init;
+ }
+
+ void set(int groupId, double value) {
+ ensureCapacity(groupId);
+ values.set(groupId, value);
+ trackGroupId(groupId);
+ }
+
+ Block toValuesBlock(org.elasticsearch.compute.data.IntVector selected, DriverContext driverContext) {
+ if (false == trackingGroupIds() && false == anyFailure()) {
+ try (var builder = driverContext.blockFactory().newDoubleVectorFixedBuilder(selected.getPositionCount())) {
+ for (int i = 0; i < selected.getPositionCount(); i++) {
+ builder.appendDouble(i, values.get(selected.getInt(i)));
+ }
+ return builder.build().asBlock();
+ }
+ }
+ try (DoubleBlock.Builder builder = driverContext.blockFactory().newDoubleBlockBuilder(selected.getPositionCount())) {
+ for (int i = 0; i < selected.getPositionCount(); i++) {
+ int group = selected.getInt(i);
+ if (hasValue(group) && !hasFailed(group)) {
+ builder.appendDouble(values.get(group));
+ } else {
+ builder.appendNull();
+ }
+ }
+ return builder.build();
+ }
+ }
+
+ private void ensureCapacity(int groupId) {
+ if (groupId >= values.size()) {
+ long prevSize = values.size();
+ values = bigArrays.grow(values, groupId + 1);
+ values.fill(prevSize, values.size(), init);
+ }
+ }
+
+ /** Extracts an intermediate view of the contents of this state. */
+ @Override
+ public void toIntermediate(
+ Block[] blocks,
+ int offset,
+ IntVector selected,
+ org.elasticsearch.compute.operator.DriverContext driverContext
+ ) {
+ assert blocks.length >= offset + 3;
+ try (
+ var valuesBuilder = driverContext.blockFactory().newDoubleBlockBuilder(selected.getPositionCount());
+ var hasValueBuilder = driverContext.blockFactory().newBooleanVectorFixedBuilder(selected.getPositionCount());
+ var hasFailedBuilder = driverContext.blockFactory().newBooleanVectorFixedBuilder(selected.getPositionCount())
+ ) {
+ for (int i = 0; i < selected.getPositionCount(); i++) {
+ int group = selected.getInt(i);
+ if (group < values.size()) {
+ valuesBuilder.appendDouble(values.get(group));
+ } else {
+ valuesBuilder.appendDouble(0); // TODO can we just use null?
+ }
+ hasValueBuilder.appendBoolean(i, hasValue(group));
+ hasFailedBuilder.appendBoolean(i, hasFailed(group));
+ }
+ blocks[offset + 0] = valuesBuilder.build();
+ blocks[offset + 1] = hasValueBuilder.build().asBlock();
+ blocks[offset + 2] = hasFailedBuilder.build().asBlock();
+ }
+ }
+
+ @Override
+ public void close() {
+ Releasables.close(values, super::close);
+ }
+}
diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FloatFallibleArrayState.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FloatFallibleArrayState.java
new file mode 100644
index 0000000000000..055cf345033c5
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FloatFallibleArrayState.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.aggregation;
+
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.FloatArray;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.FloatBlock;
+import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.core.Releasables;
+
+/**
+ * Aggregator state for an array of floats, that also tracks failures.
+ * It is created in a mode where it won't track
+ * the {@code groupId}s that are sent to it and it is the
+ * responsibility of the caller to only fetch values for {@code groupId}s
+ * that it has sent using the {@code selected} parameter when building the
+ * results. This is fine when there are no {@code null} values in the input
+ * data. But once there are null values in the input data it is
+ * much more convenient to only send non-null values and
+ * the tracking built into the grouping code can't track that. In that case
+ * call {@link #enableGroupIdTracking} to transition the state into a mode
+ * where it'll track which {@code groupIds} have been written.
+ *
+ * This class is generated. Do not edit it.
+ *
+ */
+final class FloatFallibleArrayState extends AbstractFallibleArrayState implements GroupingAggregatorState {
+ private final float init;
+
+ private FloatArray values;
+
+ FloatFallibleArrayState(BigArrays bigArrays, float init) {
+ super(bigArrays);
+ this.values = bigArrays.newFloatArray(1, false);
+ this.values.set(0, init);
+ this.init = init;
+ }
+
+ float get(int groupId) {
+ return values.get(groupId);
+ }
+
+ float getOrDefault(int groupId) {
+ return groupId < values.size() ? values.get(groupId) : init;
+ }
+
+ void set(int groupId, float value) {
+ ensureCapacity(groupId);
+ values.set(groupId, value);
+ trackGroupId(groupId);
+ }
+
+ Block toValuesBlock(org.elasticsearch.compute.data.IntVector selected, DriverContext driverContext) {
+ if (false == trackingGroupIds() && false == anyFailure()) {
+ try (var builder = driverContext.blockFactory().newFloatVectorFixedBuilder(selected.getPositionCount())) {
+ for (int i = 0; i < selected.getPositionCount(); i++) {
+ builder.appendFloat(i, values.get(selected.getInt(i)));
+ }
+ return builder.build().asBlock();
+ }
+ }
+ try (FloatBlock.Builder builder = driverContext.blockFactory().newFloatBlockBuilder(selected.getPositionCount())) {
+ for (int i = 0; i < selected.getPositionCount(); i++) {
+ int group = selected.getInt(i);
+ if (hasValue(group) && !hasFailed(group)) {
+ builder.appendFloat(values.get(group));
+ } else {
+ builder.appendNull();
+ }
+ }
+ return builder.build();
+ }
+ }
+
+ private void ensureCapacity(int groupId) {
+ if (groupId >= values.size()) {
+ long prevSize = values.size();
+ values = bigArrays.grow(values, groupId + 1);
+ values.fill(prevSize, values.size(), init);
+ }
+ }
+
+ /** Extracts an intermediate view of the contents of this state. */
+ @Override
+ public void toIntermediate(
+ Block[] blocks,
+ int offset,
+ IntVector selected,
+ org.elasticsearch.compute.operator.DriverContext driverContext
+ ) {
+ assert blocks.length >= offset + 3;
+ try (
+ var valuesBuilder = driverContext.blockFactory().newFloatBlockBuilder(selected.getPositionCount());
+ var hasValueBuilder = driverContext.blockFactory().newBooleanVectorFixedBuilder(selected.getPositionCount());
+ var hasFailedBuilder = driverContext.blockFactory().newBooleanVectorFixedBuilder(selected.getPositionCount())
+ ) {
+ for (int i = 0; i < selected.getPositionCount(); i++) {
+ int group = selected.getInt(i);
+ if (group < values.size()) {
+ valuesBuilder.appendFloat(values.get(group));
+ } else {
+ valuesBuilder.appendFloat(0); // TODO can we just use null?
+ }
+ hasValueBuilder.appendBoolean(i, hasValue(group));
+ hasFailedBuilder.appendBoolean(i, hasFailed(group));
+ }
+ blocks[offset + 0] = valuesBuilder.build();
+ blocks[offset + 1] = hasValueBuilder.build().asBlock();
+ blocks[offset + 2] = hasFailedBuilder.build().asBlock();
+ }
+ }
+
+ @Override
+ public void close() {
+ Releasables.close(values, super::close);
+ }
+}
diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IntFallibleArrayState.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IntFallibleArrayState.java
new file mode 100644
index 0000000000000..e45d84720ca1a
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IntFallibleArrayState.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.aggregation;
+
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.IntArray;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.core.Releasables;
+
+/**
+ * Aggregator state for an array of ints, that also tracks failures.
+ * It is created in a mode where it won't track
+ * the {@code groupId}s that are sent to it and it is the
+ * responsibility of the caller to only fetch values for {@code groupId}s
+ * that it has sent using the {@code selected} parameter when building the
+ * results. This is fine when there are no {@code null} values in the input
+ * data. But once there are null values in the input data it is
+ * much more convenient to only send non-null values and
+ * the tracking built into the grouping code can't track that. In that case
+ * call {@link #enableGroupIdTracking} to transition the state into a mode
+ * where it'll track which {@code groupIds} have been written.
+ *
+ * This class is generated. Do not edit it.
+ *
+ */
+final class IntFallibleArrayState extends AbstractFallibleArrayState implements GroupingAggregatorState {
+ private final int init;
+
+ private IntArray values;
+
+ IntFallibleArrayState(BigArrays bigArrays, int init) {
+ super(bigArrays);
+ this.values = bigArrays.newIntArray(1, false);
+ this.values.set(0, init);
+ this.init = init;
+ }
+
+ int get(int groupId) {
+ return values.get(groupId);
+ }
+
+ int getOrDefault(int groupId) {
+ return groupId < values.size() ? values.get(groupId) : init;
+ }
+
+ void set(int groupId, int value) {
+ ensureCapacity(groupId);
+ values.set(groupId, value);
+ trackGroupId(groupId);
+ }
+
+ Block toValuesBlock(org.elasticsearch.compute.data.IntVector selected, DriverContext driverContext) {
+ if (false == trackingGroupIds() && false == anyFailure()) {
+ try (var builder = driverContext.blockFactory().newIntVectorFixedBuilder(selected.getPositionCount())) {
+ for (int i = 0; i < selected.getPositionCount(); i++) {
+ builder.appendInt(i, values.get(selected.getInt(i)));
+ }
+ return builder.build().asBlock();
+ }
+ }
+ try (IntBlock.Builder builder = driverContext.blockFactory().newIntBlockBuilder(selected.getPositionCount())) {
+ for (int i = 0; i < selected.getPositionCount(); i++) {
+ int group = selected.getInt(i);
+ if (hasValue(group) && !hasFailed(group)) {
+ builder.appendInt(values.get(group));
+ } else {
+ builder.appendNull();
+ }
+ }
+ return builder.build();
+ }
+ }
+
+ private void ensureCapacity(int groupId) {
+ if (groupId >= values.size()) {
+ long prevSize = values.size();
+ values = bigArrays.grow(values, groupId + 1);
+ values.fill(prevSize, values.size(), init);
+ }
+ }
+
+ /** Extracts an intermediate view of the contents of this state. */
+ @Override
+ public void toIntermediate(
+ Block[] blocks,
+ int offset,
+ IntVector selected,
+ org.elasticsearch.compute.operator.DriverContext driverContext
+ ) {
+ assert blocks.length >= offset + 3;
+ try (
+ var valuesBuilder = driverContext.blockFactory().newIntBlockBuilder(selected.getPositionCount());
+ var hasValueBuilder = driverContext.blockFactory().newBooleanVectorFixedBuilder(selected.getPositionCount());
+ var hasFailedBuilder = driverContext.blockFactory().newBooleanVectorFixedBuilder(selected.getPositionCount())
+ ) {
+ for (int i = 0; i < selected.getPositionCount(); i++) {
+ int group = selected.getInt(i);
+ if (group < values.size()) {
+ valuesBuilder.appendInt(values.get(group));
+ } else {
+ valuesBuilder.appendInt(0); // TODO can we just use null?
+ }
+ hasValueBuilder.appendBoolean(i, hasValue(group));
+ hasFailedBuilder.appendBoolean(i, hasFailed(group));
+ }
+ blocks[offset + 0] = valuesBuilder.build();
+ blocks[offset + 1] = hasValueBuilder.build().asBlock();
+ blocks[offset + 2] = hasFailedBuilder.build().asBlock();
+ }
+ }
+
+ @Override
+ public void close() {
+ Releasables.close(values, super::close);
+ }
+}
diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongFallibleArrayState.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongFallibleArrayState.java
new file mode 100644
index 0000000000000..cb69579906871
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongFallibleArrayState.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.aggregation;
+
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.LongArray;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.core.Releasables;
+
+/**
+ * Aggregator state for an array of longs, that also tracks failures.
+ * It is created in a mode where it won't track
+ * the {@code groupId}s that are sent to it and it is the
+ * responsibility of the caller to only fetch values for {@code groupId}s
+ * that it has sent using the {@code selected} parameter when building the
+ * results. This is fine when there are no {@code null} values in the input
+ * data. But once there are null values in the input data it is
+ * much more convenient to only send non-null values and
+ * the tracking built into the grouping code can't track that. In that case
+ * call {@link #enableGroupIdTracking} to transition the state into a mode
+ * where it'll track which {@code groupIds} have been written.
+ *
+ * This class is generated. Do not edit it.
+ *
+ */
+final class LongFallibleArrayState extends AbstractFallibleArrayState implements GroupingAggregatorState {
+ private final long init;
+
+ private LongArray values;
+
+ LongFallibleArrayState(BigArrays bigArrays, long init) {
+ super(bigArrays);
+ this.values = bigArrays.newLongArray(1, false);
+ this.values.set(0, init);
+ this.init = init;
+ }
+
+ long get(int groupId) {
+ return values.get(groupId);
+ }
+
+ long getOrDefault(int groupId) {
+ return groupId < values.size() ? values.get(groupId) : init;
+ }
+
+ void set(int groupId, long value) {
+ ensureCapacity(groupId);
+ values.set(groupId, value);
+ trackGroupId(groupId);
+ }
+
+ void increment(int groupId, long value) {
+ ensureCapacity(groupId);
+ values.increment(groupId, value);
+ trackGroupId(groupId);
+ }
+
+ Block toValuesBlock(org.elasticsearch.compute.data.IntVector selected, DriverContext driverContext) {
+ if (false == trackingGroupIds() && false == anyFailure()) {
+ try (var builder = driverContext.blockFactory().newLongVectorFixedBuilder(selected.getPositionCount())) {
+ for (int i = 0; i < selected.getPositionCount(); i++) {
+ builder.appendLong(i, values.get(selected.getInt(i)));
+ }
+ return builder.build().asBlock();
+ }
+ }
+ try (LongBlock.Builder builder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount())) {
+ for (int i = 0; i < selected.getPositionCount(); i++) {
+ int group = selected.getInt(i);
+ if (hasValue(group) && !hasFailed(group)) {
+ builder.appendLong(values.get(group));
+ } else {
+ builder.appendNull();
+ }
+ }
+ return builder.build();
+ }
+ }
+
+ private void ensureCapacity(int groupId) {
+ if (groupId >= values.size()) {
+ long prevSize = values.size();
+ values = bigArrays.grow(values, groupId + 1);
+ values.fill(prevSize, values.size(), init);
+ }
+ }
+
+ /** Extracts an intermediate view of the contents of this state. */
+ @Override
+ public void toIntermediate(
+ Block[] blocks,
+ int offset,
+ IntVector selected,
+ org.elasticsearch.compute.operator.DriverContext driverContext
+ ) {
+ assert blocks.length >= offset + 3;
+ try (
+ var valuesBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount());
+ var hasValueBuilder = driverContext.blockFactory().newBooleanVectorFixedBuilder(selected.getPositionCount());
+ var hasFailedBuilder = driverContext.blockFactory().newBooleanVectorFixedBuilder(selected.getPositionCount())
+ ) {
+ for (int i = 0; i < selected.getPositionCount(); i++) {
+ int group = selected.getInt(i);
+ if (group < values.size()) {
+ valuesBuilder.appendLong(values.get(group));
+ } else {
+ valuesBuilder.appendLong(0); // TODO can we just use null?
+ }
+ hasValueBuilder.appendBoolean(i, hasValue(group));
+ hasFailedBuilder.appendBoolean(i, hasFailed(group));
+ }
+ blocks[offset + 0] = valuesBuilder.build();
+ blocks[offset + 1] = hasValueBuilder.build().asBlock();
+ blocks[offset + 2] = hasFailedBuilder.build().asBlock();
+ }
+ }
+
+ @Override
+ public void close() {
+ Releasables.close(values, super::close);
+ }
+}
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/AbstractFallibleArrayState.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/AbstractFallibleArrayState.java
new file mode 100644
index 0000000000000..8a5aa7580d927
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/AbstractFallibleArrayState.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.aggregation;
+
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.BitArray;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
+
+public class AbstractFallibleArrayState implements Releasable {
+ protected final BigArrays bigArrays;
+
+ private BitArray seen;
+ private BitArray failed;
+
+ public AbstractFallibleArrayState(BigArrays bigArrays) {
+ this.bigArrays = bigArrays;
+ }
+
+ final boolean hasValue(int groupId) {
+ return seen == null || seen.get(groupId);
+ }
+
+ final boolean hasFailed(int groupId) {
+ return failed != null && failed.get(groupId);
+ }
+
+ /**
+ * Switches this array state into tracking which group ids are set. This is
+ * idempotent and fast if already tracking so it's safe to, say, call it once
+ * for every block of values that arrives containing {@code null}.
+ */
+ final void enableGroupIdTracking(SeenGroupIds seenGroupIds) {
+ if (seen == null) {
+ seen = seenGroupIds.seenGroupIds(bigArrays);
+ }
+ }
+
+ protected final void trackGroupId(int groupId) {
+ if (trackingGroupIds()) {
+ seen.set(groupId);
+ }
+ }
+
+ protected final boolean trackingGroupIds() {
+ return seen != null;
+ }
+
+ protected final boolean anyFailure() {
+ return failed != null;
+ }
+
+ protected final void setFailed(int groupId) {
+ if (failed == null) {
+ failed = new BitArray(groupId + 1, bigArrays);
+ }
+ failed.set(groupId);
+ }
+
+ @Override
+ public void close() {
+ Releasables.close(seen);
+ }
+}
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-FallibleArrayState.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-FallibleArrayState.java.st
new file mode 100644
index 0000000000000..3c57ab948a79f
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-FallibleArrayState.java.st
@@ -0,0 +1,166 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.aggregation;
+
+import org.elasticsearch.common.util.BigArrays;
+$if(boolean)$
+import org.elasticsearch.common.util.BitArray;
+$else$
+import org.elasticsearch.common.util.$Type$Array;
+$endif$
+import org.elasticsearch.compute.data.Block;
+$if(long)$
+import org.elasticsearch.compute.data.IntVector;
+$endif$
+import org.elasticsearch.compute.data.$Type$Block;
+$if(int)$
+import org.elasticsearch.compute.data.$Type$Vector;
+$endif$
+$if(boolean||double||float)$
+import org.elasticsearch.compute.data.IntVector;
+$endif$
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.core.Releasables;
+
+/**
+ * Aggregator state for an array of $type$s, that also tracks failures.
+ * It is created in a mode where it won't track
+ * the {@code groupId}s that are sent to it and it is the
+ * responsibility of the caller to only fetch values for {@code groupId}s
+ * that it has sent using the {@code selected} parameter when building the
+ * results. This is fine when there are no {@code null} values in the input
+ * data. But once there are null values in the input data it is
+ * much more convenient to only send non-null values and
+ * the tracking built into the grouping code can't track that. In that case
+ * call {@link #enableGroupIdTracking} to transition the state into a mode
+ * where it'll track which {@code groupIds} have been written.
+ *
+ * This class is generated. Do not edit it.
+ *
+ */
+final class $Type$FallibleArrayState extends AbstractFallibleArrayState implements GroupingAggregatorState {
+ private final $type$ init;
+
+$if(boolean)$
+ private BitArray values;
+ private int size;
+
+$else$
+ private $Type$Array values;
+$endif$
+
+ $Type$FallibleArrayState(BigArrays bigArrays, $type$ init) {
+ super(bigArrays);
+$if(boolean)$
+ this.values = new BitArray(1, bigArrays);
+ this.size = 1;
+$else$
+ this.values = bigArrays.new$Type$Array(1, false);
+$endif$
+ this.values.set(0, init);
+ this.init = init;
+ }
+
+ $type$ get(int groupId) {
+ return values.get(groupId);
+ }
+
+ $type$ getOrDefault(int groupId) {
+$if(boolean)$
+ return groupId < size ? values.get(groupId) : init;
+$else$
+ return groupId < values.size() ? values.get(groupId) : init;
+$endif$
+ }
+
+ void set(int groupId, $type$ value) {
+ ensureCapacity(groupId);
+ values.set(groupId, value);
+ trackGroupId(groupId);
+ }
+
+$if(long)$
+ void increment(int groupId, long value) {
+ ensureCapacity(groupId);
+ values.increment(groupId, value);
+ trackGroupId(groupId);
+ }
+$endif$
+
+ Block toValuesBlock(org.elasticsearch.compute.data.IntVector selected, DriverContext driverContext) {
+ if (false == trackingGroupIds() && false == anyFailure()) {
+ try (var builder = driverContext.blockFactory().new$Type$VectorFixedBuilder(selected.getPositionCount())) {
+ for (int i = 0; i < selected.getPositionCount(); i++) {
+ builder.append$Type$(i, values.get(selected.getInt(i)));
+ }
+ return builder.build().asBlock();
+ }
+ }
+ try ($Type$Block.Builder builder = driverContext.blockFactory().new$Type$BlockBuilder(selected.getPositionCount())) {
+ for (int i = 0; i < selected.getPositionCount(); i++) {
+ int group = selected.getInt(i);
+ if (hasValue(group) && !hasFailed(group)) {
+ builder.append$Type$(values.get(group));
+ } else {
+ builder.appendNull();
+ }
+ }
+ return builder.build();
+ }
+ }
+
+ private void ensureCapacity(int groupId) {
+$if(boolean)$
+ if (groupId >= size) {
+ values.fill(size, groupId + 1, init);
+ size = groupId + 1;
+ }
+$else$
+ if (groupId >= values.size()) {
+ long prevSize = values.size();
+ values = bigArrays.grow(values, groupId + 1);
+ values.fill(prevSize, values.size(), init);
+ }
+$endif$
+ }
+
+ /** Extracts an intermediate view of the contents of this state. */
+ @Override
+ public void toIntermediate(
+ Block[] blocks,
+ int offset,
+ IntVector selected,
+ org.elasticsearch.compute.operator.DriverContext driverContext
+ ) {
+ assert blocks.length >= offset + 3;
+ try (
+ var valuesBuilder = driverContext.blockFactory().new$Type$BlockBuilder(selected.getPositionCount());
+ var hasValueBuilder = driverContext.blockFactory().newBooleanVectorFixedBuilder(selected.getPositionCount());
+ var hasFailedBuilder = driverContext.blockFactory().newBooleanVectorFixedBuilder(selected.getPositionCount())
+ ) {
+ for (int i = 0; i < selected.getPositionCount(); i++) {
+ int group = selected.getInt(i);
+ if (group < $if(boolean)$size$else$values.size()$endif$) {
+ valuesBuilder.append$Type$(values.get(group));
+ } else {
+ valuesBuilder.append$Type$($if(boolean)$false$else$0$endif$); // TODO can we just use null?
+ }
+ hasValueBuilder.appendBoolean(i, hasValue(group));
+ hasFailedBuilder.appendBoolean(i, hasFailed(group));
+ }
+ blocks[offset + 0] = valuesBuilder.build();
+ blocks[offset + 1] = hasValueBuilder.build().asBlock();
+ blocks[offset + 2] = hasFailedBuilder.build().asBlock();
+ }
+ }
+
+ @Override
+ public void close() {
+ Releasables.close(values, super::close);
+ }
+}
From 318d5cdfeaf3f5b30d6c6773ba7cc5aa0df6c329 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?=
Date: Tue, 6 Aug 2024 14:56:21 +0200
Subject: [PATCH 04/31] Added custom warning object to aggregator function
---
.../elasticsearch/compute/ann/Aggregator.java | 6 ++
...AggregatorFunctionSupplierImplementer.java | 40 ++++++++--
.../compute/gen/AggregatorImplementer.java | 35 +++++++--
.../compute/gen/AggregatorProcessor.java | 37 +++++++++-
.../org/elasticsearch/compute/gen/Types.java | 8 ++
.../SumLongAggregatorFunction.java | 11 ++-
.../SumLongAggregatorFunctionSupplier.java | 15 +++-
.../aggregation/SumLongAggregator.java | 5 +-
.../compute/aggregation/Warnings.java | 74 +++++++++++++++++++
.../expression/function/aggregate/Sum.java | 4 +-
10 files changed, 215 insertions(+), 20 deletions(-)
create mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/Warnings.java
diff --git a/x-pack/plugin/esql/compute/ann/src/main/java/org/elasticsearch/compute/ann/Aggregator.java b/x-pack/plugin/esql/compute/ann/src/main/java/org/elasticsearch/compute/ann/Aggregator.java
index 69db6a1310c9e..444dbcc1b9e58 100644
--- a/x-pack/plugin/esql/compute/ann/src/main/java/org/elasticsearch/compute/ann/Aggregator.java
+++ b/x-pack/plugin/esql/compute/ann/src/main/java/org/elasticsearch/compute/ann/Aggregator.java
@@ -57,4 +57,10 @@
IntermediateState[] value() default {};
+ /**
+ * Exceptions thrown by the `combine*(...)` methods to catch and convert
+ * into a warning and turn into a null value.
+ */
+ Class extends Exception>[] warnExceptions() default {};
+
}
diff --git a/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorFunctionSupplierImplementer.java b/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorFunctionSupplierImplementer.java
index 3f031db2978f9..e43a26e89cb48 100644
--- a/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorFunctionSupplierImplementer.java
+++ b/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorFunctionSupplierImplementer.java
@@ -10,6 +10,7 @@
import com.squareup.javapoet.ClassName;
import com.squareup.javapoet.JavaFile;
import com.squareup.javapoet.MethodSpec;
+import com.squareup.javapoet.TypeName;
import com.squareup.javapoet.TypeSpec;
import org.elasticsearch.compute.ann.Aggregator;
@@ -31,6 +32,7 @@
import static org.elasticsearch.compute.gen.Types.AGGREGATOR_FUNCTION_SUPPLIER;
import static org.elasticsearch.compute.gen.Types.DRIVER_CONTEXT;
import static org.elasticsearch.compute.gen.Types.LIST_INTEGER;
+import static org.elasticsearch.compute.gen.Types.STRING;
/**
* Implements "AggregationFunctionSupplier" from a class annotated with both
@@ -40,6 +42,7 @@ public class AggregatorFunctionSupplierImplementer {
private final TypeElement declarationType;
private final AggregatorImplementer aggregatorImplementer;
private final GroupingAggregatorImplementer groupingAggregatorImplementer;
+ private final boolean hasWarnings;
private final List createParameters;
private final ClassName implementation;
@@ -47,11 +50,13 @@ public AggregatorFunctionSupplierImplementer(
Elements elements,
TypeElement declarationType,
AggregatorImplementer aggregatorImplementer,
- GroupingAggregatorImplementer groupingAggregatorImplementer
+ GroupingAggregatorImplementer groupingAggregatorImplementer,
+ boolean hasWarnings
) {
this.declarationType = declarationType;
this.aggregatorImplementer = aggregatorImplementer;
this.groupingAggregatorImplementer = groupingAggregatorImplementer;
+ this.hasWarnings = hasWarnings;
Set createParameters = new LinkedHashSet<>();
if (aggregatorImplementer != null) {
@@ -86,6 +91,11 @@ private TypeSpec type() {
builder.addModifiers(Modifier.PUBLIC, Modifier.FINAL);
builder.addSuperinterface(AGGREGATOR_FUNCTION_SUPPLIER);
+ if (hasWarnings) {
+ builder.addField(TypeName.INT, "warningsLineNumber");
+ builder.addField(TypeName.INT, "warningsColumnNumber");
+ builder.addField(STRING, "warningsSourceText");
+ }
createParameters.stream().forEach(p -> p.declareField(builder));
builder.addMethod(ctor());
if (aggregatorImplementer != null) {
@@ -100,6 +110,14 @@ private TypeSpec type() {
private MethodSpec ctor() {
MethodSpec.Builder builder = MethodSpec.constructorBuilder().addModifiers(Modifier.PUBLIC);
+ if (hasWarnings) {
+ builder.addParameter(TypeName.INT, "warningsLineNumber");
+ builder.addParameter(TypeName.INT, "warningsColumnNumber");
+ builder.addParameter(STRING, "warningsSourceText");
+ builder.addStatement("this.warningsLineNumber = warningsLineNumber");
+ builder.addStatement("this.warningsColumnNumber = warningsColumnNumber");
+ builder.addStatement("this.warningsSourceText = warningsSourceText");
+ }
createParameters.stream().forEach(p -> p.buildCtor(builder));
return builder.build();
}
@@ -114,14 +132,26 @@ private MethodSpec unsupportedNonGroupingAggregator() {
}
private MethodSpec aggregator() {
- MethodSpec.Builder builder = MethodSpec.methodBuilder("aggregator")
- .addParameter(DRIVER_CONTEXT, "driverContext")
- .returns(aggregatorImplementer.implementation());
+ MethodSpec.Builder builder = MethodSpec.methodBuilder("aggregator");
builder.addAnnotation(Override.class).addModifiers(Modifier.PUBLIC);
+ builder.addParameter(DRIVER_CONTEXT, "driverContext");
+ builder.returns(aggregatorImplementer.implementation());
+
+ if (hasWarnings) {
+ builder.addStatement("var warnings = Warnings.createWarnings(driverContext.warningsMode(), " +
+ "warningsLineNumber, warningsColumnNumber, warningsSourceText)");
+ }
+
builder.addStatement(
"return $T.create($L)",
aggregatorImplementer.implementation(),
- Stream.concat(Stream.of("driverContext, channels"), aggregatorImplementer.createParameters().stream().map(Parameter::name))
+ Stream.concat(
+ Stream.concat(
+ hasWarnings ? Stream.of("warnings") : Stream.of(),
+ Stream.of("driverContext, channels")
+ ),
+ aggregatorImplementer.createParameters().stream().map(Parameter::name)
+ )
.collect(Collectors.joining(", "))
);
diff --git a/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorImplementer.java b/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorImplementer.java
index b3d32a82cc7a9..f302737ea79f2 100644
--- a/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorImplementer.java
+++ b/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorImplementer.java
@@ -25,6 +25,7 @@
import javax.lang.model.element.ExecutableElement;
import javax.lang.model.element.Modifier;
import javax.lang.model.element.TypeElement;
+import javax.lang.model.type.TypeMirror;
import javax.lang.model.util.Elements;
import static java.util.stream.Collectors.joining;
@@ -40,6 +41,7 @@
import static org.elasticsearch.compute.gen.Types.BYTES_REF;
import static org.elasticsearch.compute.gen.Types.BYTES_REF_BLOCK;
import static org.elasticsearch.compute.gen.Types.BYTES_REF_VECTOR;
+import static org.elasticsearch.compute.gen.Types.COMPUTE_WARNINGS;
import static org.elasticsearch.compute.gen.Types.DOUBLE_BLOCK;
import static org.elasticsearch.compute.gen.Types.DOUBLE_VECTOR;
import static org.elasticsearch.compute.gen.Types.DRIVER_CONTEXT;
@@ -68,6 +70,7 @@
*/
public class AggregatorImplementer {
private final TypeElement declarationType;
+ private final List warnExceptions;
private final ExecutableElement init;
private final ExecutableElement combine;
private final ExecutableElement combineValueCount;
@@ -80,8 +83,14 @@ public class AggregatorImplementer {
private final List intermediateState;
private final List createParameters;
- public AggregatorImplementer(Elements elements, TypeElement declarationType, IntermediateState[] interStateAnno) {
+ public AggregatorImplementer(
+ Elements elements,
+ TypeElement declarationType,
+ IntermediateState[] interStateAnno,
+ List warnExceptions
+ ) {
this.declarationType = declarationType;
+ this.warnExceptions = warnExceptions;
this.init = findRequiredMethod(declarationType, new String[] { "init", "initSingle" }, e -> true);
this.stateType = choseStateType();
@@ -202,6 +211,11 @@ private TypeSpec type() {
.initializer(initInterState())
.build()
);
+
+ if (warnExceptions.isEmpty() == false) {
+ builder.addField(COMPUTE_WARNINGS, "warnings", Modifier.PRIVATE, Modifier.FINAL);
+ }
+
builder.addField(DRIVER_CONTEXT, "driverContext", Modifier.PRIVATE, Modifier.FINAL);
builder.addField(stateType, "state", Modifier.PRIVATE, Modifier.FINAL);
builder.addField(LIST_INTEGER, "channels", Modifier.PRIVATE, Modifier.FINAL);
@@ -228,17 +242,22 @@ private TypeSpec type() {
private MethodSpec create() {
MethodSpec.Builder builder = MethodSpec.methodBuilder("create");
builder.addModifiers(Modifier.PUBLIC, Modifier.STATIC).returns(implementation);
+ if (warnExceptions.isEmpty() == false) {
+ builder.addParameter(COMPUTE_WARNINGS, "warnings");
+ }
builder.addParameter(DRIVER_CONTEXT, "driverContext");
builder.addParameter(LIST_INTEGER, "channels");
for (Parameter p : createParameters) {
builder.addParameter(p.type(), p.name());
}
if (createParameters.isEmpty()) {
- builder.addStatement("return new $T(driverContext, channels, $L)", implementation, callInit());
+ builder.addStatement("return new $T($LdriverContext, channels, $L)", implementation,
+ warnExceptions.isEmpty() ? "" : "warnings, ", callInit());
} else {
builder.addStatement(
- "return new $T(driverContext, channels, $L, $L)",
+ "return new $T($LdriverContext, channels, $L, $L)",
implementation,
+ warnExceptions.isEmpty() ? "" : "warnings, ",
callInit(),
createParameters.stream().map(p -> p.name()).collect(joining(", "))
);
@@ -275,16 +294,22 @@ private CodeBlock initInterState() {
private MethodSpec ctor() {
MethodSpec.Builder builder = MethodSpec.constructorBuilder().addModifiers(Modifier.PUBLIC);
+ if (warnExceptions.isEmpty() == false) {
+ builder.addParameter(COMPUTE_WARNINGS, "warnings");
+ }
builder.addParameter(DRIVER_CONTEXT, "driverContext");
builder.addParameter(LIST_INTEGER, "channels");
builder.addParameter(stateType, "state");
+
builder.addStatement("this.driverContext = driverContext");
+ if (warnExceptions.isEmpty() == false) {
+ builder.addStatement("this.warnings = warnings");
+ }
builder.addStatement("this.channels = channels");
builder.addStatement("this.state = state");
for (Parameter p : createParameters()) {
- builder.addParameter(p.type(), p.name());
- builder.addStatement("this.$N = $N", p.name(), p.name());
+ p.buildCtor(builder);
}
return builder.build();
}
diff --git a/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorProcessor.java b/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorProcessor.java
index d07b24047b7e2..0adcd4d8a6cc3 100644
--- a/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorProcessor.java
+++ b/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorProcessor.java
@@ -10,11 +10,15 @@
import com.squareup.javapoet.JavaFile;
import org.elasticsearch.compute.ann.Aggregator;
+import org.elasticsearch.compute.ann.ConvertEvaluator;
+import org.elasticsearch.compute.ann.Evaluator;
import org.elasticsearch.compute.ann.GroupingAggregator;
import org.elasticsearch.compute.ann.IntermediateState;
+import org.elasticsearch.compute.ann.MvEvaluator;
import java.io.IOException;
import java.io.Writer;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
@@ -27,9 +31,11 @@
import javax.annotation.processing.RoundEnvironment;
import javax.lang.model.SourceVersion;
import javax.lang.model.element.AnnotationMirror;
+import javax.lang.model.element.AnnotationValue;
import javax.lang.model.element.Element;
import javax.lang.model.element.ExecutableElement;
import javax.lang.model.element.TypeElement;
+import javax.lang.model.type.TypeMirror;
import javax.tools.Diagnostic;
import javax.tools.JavaFileObject;
@@ -80,9 +86,10 @@ public boolean process(Set extends TypeElement> set, RoundEnvironment roundEnv
}
for (TypeElement aggClass : annotatedClasses) {
AggregatorImplementer implementer = null;
+ var warnExceptionsTypes = warnExceptions(aggClass);
if (aggClass.getAnnotation(Aggregator.class) != null) {
IntermediateState[] intermediateState = aggClass.getAnnotation(Aggregator.class).value();
- implementer = new AggregatorImplementer(env.getElementUtils(), aggClass, intermediateState);
+ implementer = new AggregatorImplementer(env.getElementUtils(), aggClass, intermediateState, warnExceptionsTypes);
write(aggClass, "aggregator", implementer.sourceFile(), env);
}
GroupingAggregatorImplementer groupingAggregatorImplementer = null;
@@ -104,7 +111,13 @@ public boolean process(Set extends TypeElement> set, RoundEnvironment roundEnv
write(
aggClass,
"aggregator function supplier",
- new AggregatorFunctionSupplierImplementer(env.getElementUtils(), aggClass, implementer, groupingAggregatorImplementer)
+ new AggregatorFunctionSupplierImplementer(
+ env.getElementUtils(),
+ aggClass,
+ implementer,
+ groupingAggregatorImplementer,
+ warnExceptionsTypes.isEmpty() == false
+ )
.sourceFile(),
env
);
@@ -133,4 +146,24 @@ public static void write(Object origination, String what, JavaFile file, Process
throw new RuntimeException(e);
}
}
+
+ private static List warnExceptions(Element aggregatorMethod) {
+ List result = new ArrayList<>();
+ for (var mirror : aggregatorMethod.getAnnotationMirrors()) {
+ String annotationType = mirror.getAnnotationType().toString();
+ if (annotationType.equals(Aggregator.class.getName())
+ || annotationType.equals(GroupingAggregator.class.getName())) {
+
+ for (var e : mirror.getElementValues().entrySet()) {
+ if (false == e.getKey().getSimpleName().toString().equals("warnExceptions")) {
+ continue;
+ }
+ for (var v : (List>) e.getValue().getValue()) {
+ result.add((TypeMirror) ((AnnotationValue) v).getValue());
+ }
+ }
+ }
+ }
+ return result;
+ }
}
diff --git a/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/Types.java b/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/Types.java
index 3150741ddcb05..55a1c36895a5e 100644
--- a/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/Types.java
+++ b/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/Types.java
@@ -27,6 +27,8 @@ public class Types {
private static final String OPERATOR_PACKAGE = PACKAGE + ".operator";
private static final String DATA_PACKAGE = PACKAGE + ".data";
+ static final TypeName STRING = ClassName.get("java.lang", "String");
+
static final TypeName LIST_INTEGER = ParameterizedTypeName.get(ClassName.get(List.class), TypeName.INT.box());
static final ClassName PAGE = ClassName.get(DATA_PACKAGE, "Page");
@@ -127,6 +129,12 @@ public class Types {
);
static final ClassName WARNINGS = ClassName.get("org.elasticsearch.xpack.esql.expression.function", "Warnings");
+ /**
+ * Warnings class used in compute module.
+ * It uses no external dependencies (Like Warnings and Source).
+ */
+ static final ClassName COMPUTE_WARNINGS = ClassName.get("org.elasticsearch.compute.aggregation", "Warnings");
+
static final ClassName SOURCE = ClassName.get("org.elasticsearch.xpack.esql.core.tree", "Source");
diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumLongAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumLongAggregatorFunction.java
index 38d1b3de78265..8531da13d4c21 100644
--- a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumLongAggregatorFunction.java
+++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumLongAggregatorFunction.java
@@ -27,22 +27,25 @@ public final class SumLongAggregatorFunction implements AggregatorFunction {
new IntermediateStateDesc("sum", ElementType.LONG),
new IntermediateStateDesc("seen", ElementType.BOOLEAN) );
+ private final Warnings warnings;
+
private final DriverContext driverContext;
private final LongState state;
private final List channels;
- public SumLongAggregatorFunction(DriverContext driverContext, List channels,
- LongState state) {
+ public SumLongAggregatorFunction(Warnings warnings, DriverContext driverContext,
+ List channels, LongState state) {
this.driverContext = driverContext;
+ this.warnings = warnings;
this.channels = channels;
this.state = state;
}
- public static SumLongAggregatorFunction create(DriverContext driverContext,
+ public static SumLongAggregatorFunction create(Warnings warnings, DriverContext driverContext,
List channels) {
- return new SumLongAggregatorFunction(driverContext, channels, new LongState(SumLongAggregator.init()));
+ return new SumLongAggregatorFunction(warnings, driverContext, channels, new LongState(SumLongAggregator.init()));
}
public static List intermediateStateDesc() {
diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumLongAggregatorFunctionSupplier.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumLongAggregatorFunctionSupplier.java
index b4d36aa526075..8a979111627fd 100644
--- a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumLongAggregatorFunctionSupplier.java
+++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumLongAggregatorFunctionSupplier.java
@@ -15,15 +15,26 @@
* This class is generated. Do not edit it.
*/
public final class SumLongAggregatorFunctionSupplier implements AggregatorFunctionSupplier {
+ int warningsLineNumber;
+
+ int warningsColumnNumber;
+
+ String warningsSourceText;
+
private final List channels;
- public SumLongAggregatorFunctionSupplier(List channels) {
+ public SumLongAggregatorFunctionSupplier(int warningsLineNumber, int warningsColumnNumber,
+ String warningsSourceText, List channels) {
+ this.warningsLineNumber = warningsLineNumber;
+ this.warningsColumnNumber = warningsColumnNumber;
+ this.warningsSourceText = warningsSourceText;
this.channels = channels;
}
@Override
public SumLongAggregatorFunction aggregator(DriverContext driverContext) {
- return SumLongAggregatorFunction.create(driverContext, channels);
+ var warnings = Warnings.createWarnings(driverContext.warningsMode(), warningsLineNumber, warningsColumnNumber, warningsSourceText);
+ return SumLongAggregatorFunction.create(warnings, driverContext, channels);
}
@Override
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/SumLongAggregator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/SumLongAggregator.java
index cd6a94e518be8..01db4b4912e36 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/SumLongAggregator.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/SumLongAggregator.java
@@ -11,7 +11,10 @@
import org.elasticsearch.compute.ann.GroupingAggregator;
import org.elasticsearch.compute.ann.IntermediateState;
-@Aggregator({ @IntermediateState(name = "sum", type = "LONG"), @IntermediateState(name = "seen", type = "BOOLEAN") })
+@Aggregator(
+ value = { @IntermediateState(name = "sum", type = "LONG"), @IntermediateState(name = "seen", type = "BOOLEAN") },
+ warnExceptions = ArithmeticException.class
+)
@GroupingAggregator
class SumLongAggregator {
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/Warnings.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/Warnings.java
new file mode 100644
index 0000000000000..a78e9dd8d62af
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/Warnings.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.aggregation;
+
+import org.elasticsearch.compute.operator.DriverContext;
+import static org.elasticsearch.common.logging.LoggerMessageFormat.format;
+
+import static org.elasticsearch.common.logging.HeaderWarning.addWarning;
+
+/**
+ * Utilities to collect warnings for running an executor.
+ */
+public class Warnings {
+ static final int MAX_ADDED_WARNINGS = 20;
+
+ private final String location;
+ private final String first;
+
+ private int addedWarnings;
+
+ public static final Warnings NOOP_WARNINGS = new Warnings(-1, -2, "") {
+ @Override
+ public void registerException(Exception exception) {
+ // this space intentionally left blank
+ }
+ };
+
+ /**
+ * Create a new warnings object based on the given mode
+ * @param warningsMode The warnings collection strategy to use
+ * @param lineNumber The line number of the source text. Same as `source.getLineNumber()`
+ * @param columnNumber The column number of the source text. Same as `source.getColumnNumber()`
+ * @param sourceText The source text that caused the warning. Same as `source.text()`
+ * @return A warnings collector object
+ */
+ public static Warnings createWarnings(DriverContext.WarningsMode warningsMode, int lineNumber, int columnNumber, String sourceText) {
+ switch (warningsMode) {
+ case COLLECT -> {
+ return new Warnings(lineNumber, columnNumber, sourceText);
+ }
+ case IGNORE -> {
+ return NOOP_WARNINGS;
+ }
+ }
+ throw new IllegalStateException("Unreachable");
+ }
+
+ public Warnings(int lineNumber, int columnNumber, String sourceText) {
+ location = format("Line {}:{}: ", lineNumber, columnNumber);
+ first = format(
+ null,
+ "{}evaluation of [{}] failed, treating result as null. Only first {} failures recorded.",
+ location,
+ sourceText,
+ MAX_ADDED_WARNINGS
+ );
+ }
+
+ public void registerException(Exception exception) {
+ if (addedWarnings < MAX_ADDED_WARNINGS) {
+ if (addedWarnings == 0) {
+ addWarning(first);
+ }
+ // location needs to be added to the exception too, since the headers are deduplicated
+ addWarning(location + exception.getClass().getName() + ": " + exception.getMessage());
+ addedWarnings++;
+ }
+ }
+}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Sum.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Sum.java
index 4f85a15732a6f..edde2f7991a62 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Sum.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Sum.java
@@ -12,6 +12,7 @@
import org.elasticsearch.compute.aggregation.SumDoubleAggregatorFunctionSupplier;
import org.elasticsearch.compute.aggregation.SumIntAggregatorFunctionSupplier;
import org.elasticsearch.compute.aggregation.SumLongAggregatorFunctionSupplier;
+import org.elasticsearch.compute.aggregation.Warnings;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
@@ -83,7 +84,8 @@ public DataType dataType() {
@Override
protected AggregatorFunctionSupplier longSupplier(List inputChannels) {
- return new SumLongAggregatorFunctionSupplier(inputChannels);
+ var location = source().source();
+ return new SumLongAggregatorFunctionSupplier(location.getLineNumber(), location.getColumnNumber(), source().text(), inputChannels);
}
@Override
From 4937082bff152fdb48b4491814926c7b9349ea87 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?=
Date: Tue, 6 Aug 2024 16:20:36 +0200
Subject: [PATCH 05/31] Completed single aggregator implementation
---
.../compute/gen/AggregatorImplementer.java | 118 ++++++++++++------
.../SumLongAggregatorFunction.java | 37 ++++--
.../SumLongGroupingAggregatorFunction.java | 10 +-
.../aggregation/SumLongAggregator.java | 6 +-
4 files changed, 122 insertions(+), 49 deletions(-)
diff --git a/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorImplementer.java b/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorImplementer.java
index f302737ea79f2..47aab77190288 100644
--- a/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorImplementer.java
+++ b/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorImplementer.java
@@ -21,6 +21,10 @@
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
+import java.util.Objects;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import javax.lang.model.element.ExecutableElement;
import javax.lang.model.element.Modifier;
@@ -79,6 +83,7 @@ public class AggregatorImplementer {
private final ClassName implementation;
private final TypeName stateType;
private final boolean stateTypeHasSeen;
+ private final boolean stateTypeHasFailed;
private final boolean valuesIsBytesRef;
private final List intermediateState;
private final List createParameters;
@@ -94,9 +99,12 @@ public AggregatorImplementer(
this.init = findRequiredMethod(declarationType, new String[] { "init", "initSingle" }, e -> true);
this.stateType = choseStateType();
- stateTypeHasSeen = elements.getAllMembers(elements.getTypeElement(stateType.toString()))
+ this.stateTypeHasSeen = elements.getAllMembers(elements.getTypeElement(stateType.toString()))
.stream()
.anyMatch(e -> e.toString().equals("seen()"));
+ this.stateTypeHasFailed = elements.getAllMembers(elements.getTypeElement(stateType.toString()))
+ .stream()
+ .anyMatch(e -> e.toString().equals("failed()"));
this.combine = findRequiredMethod(declarationType, new String[] { "combine" }, e -> {
if (e.getParameters().size() == 0) {
@@ -135,7 +143,10 @@ private TypeName choseStateType() {
if (false == initReturn.isPrimitive()) {
return initReturn;
}
- return ClassName.get("org.elasticsearch.compute.aggregation", firstUpper(initReturn.toString()) + "State");
+ if (warnExceptions.isEmpty()) {
+ return ClassName.get("org.elasticsearch.compute.aggregation", firstUpper(initReturn.toString()) + "State");
+ }
+ return ClassName.get("org.elasticsearch.compute.aggregation", firstUpper(initReturn.toString()) + "FallibleState");
}
static String valueType(ExecutableElement init, ExecutableElement combine) {
@@ -391,20 +402,28 @@ private MethodSpec addRawBlock() {
}
private void combineRawInput(MethodSpec.Builder builder, String blockVariable) {
+ TypeName returnType = TypeName.get(combine.getReturnType());
+ if (warnExceptions.isEmpty() == false) {
+ builder.beginControlFlow("try");
+ }
if (valuesIsBytesRef) {
combineRawInputForBytesRef(builder, blockVariable);
- return;
- }
- TypeName returnType = TypeName.get(combine.getReturnType());
- if (returnType.isPrimitive()) {
+ } else if (returnType.isPrimitive()) {
combineRawInputForPrimitive(returnType, builder, blockVariable);
- return;
- }
- if (returnType == TypeName.VOID) {
+ } else if (returnType == TypeName.VOID) {
combineRawInputForVoid(builder, blockVariable);
- return;
+ } else {
+ throw new IllegalArgumentException("combine must return void or a primitive");
+ }
+ if (warnExceptions.isEmpty() == false) {
+ String catchPattern = "catch ("
+ + warnExceptions.stream().map(m -> "$T").collect(Collectors.joining(" | "))
+ + " e)";
+ builder.nextControlFlow(catchPattern, warnExceptions.stream().map(TypeName::get).toArray());
+ builder.addStatement("warnings.registerException(e)");
+ builder.addStatement("state.failed(true)");
+ builder.endControlFlow();
}
- throw new IllegalArgumentException("combine must return void or a primitive");
}
private void combineRawInputForPrimitive(TypeName returnType, MethodSpec.Builder builder, String blockVariable) {
@@ -448,15 +467,34 @@ private MethodSpec addIntermediateInput() {
}
builder.addStatement("$T.combineIntermediate(state, " + intermediateStateRowAccess() + ")", declarationType);
} else if (hasPrimitiveState()) {
- assert intermediateState.size() == 2;
- assert intermediateState.get(1).name().equals("seen");
- builder.beginControlFlow("if (seen.getBoolean(0))");
- {
- var state = intermediateState.get(0);
- var s = "state.$L($T.combine(state.$L(), " + state.name() + "." + vectorAccessorName(state.elementType()) + "(0)))";
- builder.addStatement(s, primitiveStateMethod(), declarationType, primitiveStateMethod());
- builder.addStatement("state.seen(true)");
- builder.endControlFlow();
+ if (warnExceptions.isEmpty()) {
+ assert intermediateState.size() == 2;
+ assert intermediateState.get(1).name().equals("seen");
+ builder.beginControlFlow("if (seen.getBoolean(0))");
+ {
+ var state = intermediateState.get(0);
+ var s = "state.$L($T.combine(state.$L(), " + state.name() + "." + vectorAccessorName(state.elementType()) + "(0)))";
+ builder.addStatement(s, primitiveStateMethod(), declarationType, primitiveStateMethod());
+ builder.addStatement("state.seen(true)");
+ builder.endControlFlow();
+ }
+ } else {
+ assert intermediateState.size() == 3;
+ assert intermediateState.get(1).name().equals("seen");
+ assert intermediateState.get(2).name().equals("failed");
+ builder.beginControlFlow("if (failed.getBoolean(0))");
+ {
+ builder.addStatement("state.failed(true)");
+ builder.addStatement("state.seen(true)");
+ }
+ builder.nextControlFlow("else if (seen.getBoolean(0))");
+ {
+ var state = intermediateState.get(0);
+ var s = "state.$L($T.combine(state.$L(), " + state.name() + "." + vectorAccessorName(state.elementType()) + "(0)))";
+ builder.addStatement(s, primitiveStateMethod(), declarationType, primitiveStateMethod());
+ builder.addStatement("state.seen(true)");
+ builder.endControlFlow();
+ }
}
} else {
throw new IllegalArgumentException("Don't know how to combine intermediate input. Define combineIntermediate");
@@ -470,15 +508,15 @@ String intermediateStateRowAccess() {
private String primitiveStateMethod() {
switch (stateType.toString()) {
- case "org.elasticsearch.compute.aggregation.BooleanState":
+ case "org.elasticsearch.compute.aggregation.BooleanState", "org.elasticsearch.compute.aggregation.BooleanFallibleState":
return "booleanValue";
- case "org.elasticsearch.compute.aggregation.IntState":
+ case "org.elasticsearch.compute.aggregation.IntState", "org.elasticsearch.compute.aggregation.IntFallibleState":
return "intValue";
- case "org.elasticsearch.compute.aggregation.LongState":
+ case "org.elasticsearch.compute.aggregation.LongState", "org.elasticsearch.compute.aggregation.LongFallibleState":
return "longValue";
- case "org.elasticsearch.compute.aggregation.DoubleState":
+ case "org.elasticsearch.compute.aggregation.DoubleState", "org.elasticsearch.compute.aggregation.DoubleFallibleState":
return "doubleValue";
- case "org.elasticsearch.compute.aggregation.FloatState":
+ case "org.elasticsearch.compute.aggregation.FloatState", "org.elasticsearch.compute.aggregation.FloatFallibleState":
return "floatValue";
default:
throw new IllegalArgumentException(
@@ -505,8 +543,14 @@ private MethodSpec evaluateFinal() {
.addParameter(BLOCK_ARRAY, "blocks")
.addParameter(TypeName.INT, "offset")
.addParameter(DRIVER_CONTEXT, "driverContext");
- if (stateTypeHasSeen) {
- builder.beginControlFlow("if (state.seen() == false)");
+ if (stateTypeHasSeen || stateTypeHasFailed) {
+ var condition = Stream.of(
+ stateTypeHasSeen ? "state.seen() == false" : null,
+ stateTypeHasFailed ? "state.failed()" : null
+ )
+ .filter(Objects::nonNull)
+ .collect(joining(" || "));
+ builder.beginControlFlow("if ($L)", condition);
builder.addStatement("blocks[offset] = driverContext.blockFactory().newConstantNullBlock(1)", BLOCK);
builder.addStatement("return");
builder.endControlFlow();
@@ -521,19 +565,19 @@ private MethodSpec evaluateFinal() {
private void primitiveStateToResult(MethodSpec.Builder builder) {
switch (stateType.toString()) {
- case "org.elasticsearch.compute.aggregation.BooleanState":
+ case "org.elasticsearch.compute.aggregation.BooleanState", "org.elasticsearch.compute.aggregation.BooleanFallibleState":
builder.addStatement("blocks[offset] = driverContext.blockFactory().newConstantBooleanBlockWith(state.booleanValue(), 1)");
return;
- case "org.elasticsearch.compute.aggregation.IntState":
+ case "org.elasticsearch.compute.aggregation.IntState", "org.elasticsearch.compute.aggregation.IntFallibleState":
builder.addStatement("blocks[offset] = driverContext.blockFactory().newConstantIntBlockWith(state.intValue(), 1)");
return;
- case "org.elasticsearch.compute.aggregation.LongState":
+ case "org.elasticsearch.compute.aggregation.LongState", "org.elasticsearch.compute.aggregation.LongFallibleState":
builder.addStatement("blocks[offset] = driverContext.blockFactory().newConstantLongBlockWith(state.longValue(), 1)");
return;
- case "org.elasticsearch.compute.aggregation.DoubleState":
+ case "org.elasticsearch.compute.aggregation.DoubleState", "org.elasticsearch.compute.aggregation.DoubleFallibleState":
builder.addStatement("blocks[offset] = driverContext.blockFactory().newConstantDoubleBlockWith(state.doubleValue(), 1)");
return;
- case "org.elasticsearch.compute.aggregation.FloatState":
+ case "org.elasticsearch.compute.aggregation.FloatState", "org.elasticsearch.compute.aggregation.FloatFallibleState":
builder.addStatement("blocks[offset] = driverContext.blockFactory().newConstantFloatBlockWith(state.floatValue(), 1)");
return;
default:
@@ -559,13 +603,11 @@ private MethodSpec close() {
return builder.build();
}
+ private static final Pattern PRIMITIVE_STATE_PATTERN = Pattern.compile(
+ "org.elasticsearch.compute.aggregation.(Boolean|Int|Long|Double|Float)(Fallible)?State"
+ );
private boolean hasPrimitiveState() {
- return switch (stateType.toString()) {
- case "org.elasticsearch.compute.aggregation.BooleanState", "org.elasticsearch.compute.aggregation.IntState",
- "org.elasticsearch.compute.aggregation.LongState", "org.elasticsearch.compute.aggregation.DoubleState",
- "org.elasticsearch.compute.aggregation.FloatState" -> true;
- default -> false;
- };
+ return PRIMITIVE_STATE_PATTERN.matcher(stateType.toString()).matches();
}
record IntermediateStateDesc(String name, String elementType, boolean block) {
diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumLongAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumLongAggregatorFunction.java
index 8531da13d4c21..649e478beb9bd 100644
--- a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumLongAggregatorFunction.java
+++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumLongAggregatorFunction.java
@@ -4,6 +4,7 @@
// 2.0.
package org.elasticsearch.compute.aggregation;
+import java.lang.ArithmeticException;
import java.lang.Integer;
import java.lang.Override;
import java.lang.String;
@@ -25,18 +26,19 @@
public final class SumLongAggregatorFunction implements AggregatorFunction {
private static final List INTERMEDIATE_STATE_DESC = List.of(
new IntermediateStateDesc("sum", ElementType.LONG),
- new IntermediateStateDesc("seen", ElementType.BOOLEAN) );
+ new IntermediateStateDesc("seen", ElementType.BOOLEAN),
+ new IntermediateStateDesc("failed", ElementType.BOOLEAN) );
private final Warnings warnings;
private final DriverContext driverContext;
- private final LongState state;
+ private final LongFallibleState state;
private final List channels;
public SumLongAggregatorFunction(Warnings warnings, DriverContext driverContext,
- List channels, LongState state) {
+ List channels, LongFallibleState state) {
this.driverContext = driverContext;
this.warnings = warnings;
this.channels = channels;
@@ -45,7 +47,7 @@ public SumLongAggregatorFunction(Warnings warnings, DriverContext driverContext,
public static SumLongAggregatorFunction create(Warnings warnings, DriverContext driverContext,
List channels) {
- return new SumLongAggregatorFunction(warnings, driverContext, channels, new LongState(SumLongAggregator.init()));
+ return new SumLongAggregatorFunction(warnings, driverContext, channels, new LongFallibleState(SumLongAggregator.init()));
}
public static List intermediateStateDesc() {
@@ -71,7 +73,12 @@ public void addRawInput(Page page) {
private void addRawVector(LongVector vector) {
state.seen(true);
for (int i = 0; i < vector.getPositionCount(); i++) {
- state.longValue(SumLongAggregator.combine(state.longValue(), vector.getLong(i)));
+ try {
+ state.longValue(SumLongAggregator.combine(state.longValue(), vector.getLong(i)));
+ } catch (ArithmeticException e) {
+ warnings.registerException(e);
+ state.failed(true);
+ }
}
}
@@ -84,7 +91,12 @@ private void addRawBlock(LongBlock block) {
int start = block.getFirstValueIndex(p);
int end = start + block.getValueCount(p);
for (int i = start; i < end; i++) {
- state.longValue(SumLongAggregator.combine(state.longValue(), block.getLong(i)));
+ try {
+ state.longValue(SumLongAggregator.combine(state.longValue(), block.getLong(i)));
+ } catch (ArithmeticException e) {
+ warnings.registerException(e);
+ state.failed(true);
+ }
}
}
}
@@ -105,7 +117,16 @@ public void addIntermediateInput(Page page) {
}
BooleanVector seen = ((BooleanBlock) seenUncast).asVector();
assert seen.getPositionCount() == 1;
- if (seen.getBoolean(0)) {
+ Block failedUncast = page.getBlock(channels.get(2));
+ if (failedUncast.areAllValuesNull()) {
+ return;
+ }
+ BooleanVector failed = ((BooleanBlock) failedUncast).asVector();
+ assert failed.getPositionCount() == 1;
+ if (failed.getBoolean(0)) {
+ state.failed(true);
+ state.seen(true);
+ } else if (seen.getBoolean(0)) {
state.longValue(SumLongAggregator.combine(state.longValue(), sum.getLong(0)));
state.seen(true);
}
@@ -118,7 +139,7 @@ public void evaluateIntermediate(Block[] blocks, int offset, DriverContext drive
@Override
public void evaluateFinal(Block[] blocks, int offset, DriverContext driverContext) {
- if (state.seen() == false) {
+ if (state.seen() == false || state.failed()) {
blocks[offset] = driverContext.blockFactory().newConstantNullBlock(1);
return;
}
diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumLongGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumLongGroupingAggregatorFunction.java
index 507aa343aa74e..774419e96666e 100644
--- a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumLongGroupingAggregatorFunction.java
+++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumLongGroupingAggregatorFunction.java
@@ -27,7 +27,8 @@
public final class SumLongGroupingAggregatorFunction implements GroupingAggregatorFunction {
private static final List INTERMEDIATE_STATE_DESC = List.of(
new IntermediateStateDesc("sum", ElementType.LONG),
- new IntermediateStateDesc("seen", ElementType.BOOLEAN) );
+ new IntermediateStateDesc("seen", ElementType.BOOLEAN),
+ new IntermediateStateDesc("failed", ElementType.BOOLEAN) );
private final LongArrayState state;
@@ -160,7 +161,12 @@ public void addIntermediateInput(int positionOffset, IntVector groups, Page page
return;
}
BooleanVector seen = ((BooleanBlock) seenUncast).asVector();
- assert sum.getPositionCount() == seen.getPositionCount();
+ Block failedUncast = page.getBlock(channels.get(2));
+ if (failedUncast.areAllValuesNull()) {
+ return;
+ }
+ BooleanVector failed = ((BooleanBlock) failedUncast).asVector();
+ assert sum.getPositionCount() == seen.getPositionCount() && sum.getPositionCount() == failed.getPositionCount();
for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
int groupId = Math.toIntExact(groups.getInt(groupPosition));
if (seen.getBoolean(groupPosition + positionOffset)) {
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/SumLongAggregator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/SumLongAggregator.java
index 01db4b4912e36..754b916f3b8fe 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/SumLongAggregator.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/SumLongAggregator.java
@@ -12,7 +12,11 @@
import org.elasticsearch.compute.ann.IntermediateState;
@Aggregator(
- value = { @IntermediateState(name = "sum", type = "LONG"), @IntermediateState(name = "seen", type = "BOOLEAN") },
+ value = {
+ @IntermediateState(name = "sum", type = "LONG"),
+ @IntermediateState(name = "seen", type = "BOOLEAN"),
+ @IntermediateState(name = "failed", type = "BOOLEAN")
+ },
warnExceptions = ArithmeticException.class
)
@GroupingAggregator
From 7935330c3fbd17016cfe00948d279d56862a8b39 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?=
Date: Tue, 6 Aug 2024 16:40:41 +0200
Subject: [PATCH 06/31] Updated SumTests and format
---
...AggregatorFunctionSupplierImplementer.java | 14 +++---
.../compute/gen/AggregatorImplementer.java | 18 +++----
.../compute/gen/AggregatorProcessor.java | 9 +---
.../org/elasticsearch/compute/gen/Types.java | 1 -
.../aggregation/SumLongAggregator.java | 3 +-
.../compute/aggregation/Warnings.java | 2 +-
.../expression/function/aggregate/Sum.java | 1 -
.../expression/function/TestCaseSupplier.java | 22 +++++++++
.../function/aggregate/SumTests.java | 47 ++++++++-----------
9 files changed, 61 insertions(+), 56 deletions(-)
diff --git a/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorFunctionSupplierImplementer.java b/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorFunctionSupplierImplementer.java
index e43a26e89cb48..e09ecf657bf01 100644
--- a/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorFunctionSupplierImplementer.java
+++ b/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorFunctionSupplierImplementer.java
@@ -138,21 +138,19 @@ private MethodSpec aggregator() {
builder.returns(aggregatorImplementer.implementation());
if (hasWarnings) {
- builder.addStatement("var warnings = Warnings.createWarnings(driverContext.warningsMode(), " +
- "warningsLineNumber, warningsColumnNumber, warningsSourceText)");
+ builder.addStatement(
+ "var warnings = Warnings.createWarnings(driverContext.warningsMode(), "
+ + "warningsLineNumber, warningsColumnNumber, warningsSourceText)"
+ );
}
builder.addStatement(
"return $T.create($L)",
aggregatorImplementer.implementation(),
Stream.concat(
- Stream.concat(
- hasWarnings ? Stream.of("warnings") : Stream.of(),
- Stream.of("driverContext, channels")
- ),
+ Stream.concat(hasWarnings ? Stream.of("warnings") : Stream.of(), Stream.of("driverContext, channels")),
aggregatorImplementer.createParameters().stream().map(Parameter::name)
- )
- .collect(Collectors.joining(", "))
+ ).collect(Collectors.joining(", "))
);
return builder.build();
diff --git a/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorImplementer.java b/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorImplementer.java
index 47aab77190288..9119309269646 100644
--- a/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorImplementer.java
+++ b/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorImplementer.java
@@ -262,8 +262,12 @@ private MethodSpec create() {
builder.addParameter(p.type(), p.name());
}
if (createParameters.isEmpty()) {
- builder.addStatement("return new $T($LdriverContext, channels, $L)", implementation,
- warnExceptions.isEmpty() ? "" : "warnings, ", callInit());
+ builder.addStatement(
+ "return new $T($LdriverContext, channels, $L)",
+ implementation,
+ warnExceptions.isEmpty() ? "" : "warnings, ",
+ callInit()
+ );
} else {
builder.addStatement(
"return new $T($LdriverContext, channels, $L, $L)",
@@ -416,9 +420,7 @@ private void combineRawInput(MethodSpec.Builder builder, String blockVariable) {
throw new IllegalArgumentException("combine must return void or a primitive");
}
if (warnExceptions.isEmpty() == false) {
- String catchPattern = "catch ("
- + warnExceptions.stream().map(m -> "$T").collect(Collectors.joining(" | "))
- + " e)";
+ String catchPattern = "catch (" + warnExceptions.stream().map(m -> "$T").collect(Collectors.joining(" | ")) + " e)";
builder.nextControlFlow(catchPattern, warnExceptions.stream().map(TypeName::get).toArray());
builder.addStatement("warnings.registerException(e)");
builder.addStatement("state.failed(true)");
@@ -544,10 +546,7 @@ private MethodSpec evaluateFinal() {
.addParameter(TypeName.INT, "offset")
.addParameter(DRIVER_CONTEXT, "driverContext");
if (stateTypeHasSeen || stateTypeHasFailed) {
- var condition = Stream.of(
- stateTypeHasSeen ? "state.seen() == false" : null,
- stateTypeHasFailed ? "state.failed()" : null
- )
+ var condition = Stream.of(stateTypeHasSeen ? "state.seen() == false" : null, stateTypeHasFailed ? "state.failed()" : null)
.filter(Objects::nonNull)
.collect(joining(" || "));
builder.beginControlFlow("if ($L)", condition);
@@ -606,6 +605,7 @@ private MethodSpec close() {
private static final Pattern PRIMITIVE_STATE_PATTERN = Pattern.compile(
"org.elasticsearch.compute.aggregation.(Boolean|Int|Long|Double|Float)(Fallible)?State"
);
+
private boolean hasPrimitiveState() {
return PRIMITIVE_STATE_PATTERN.matcher(stateType.toString()).matches();
}
diff --git a/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorProcessor.java b/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorProcessor.java
index 0adcd4d8a6cc3..9c21af1d75b20 100644
--- a/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorProcessor.java
+++ b/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorProcessor.java
@@ -10,11 +10,8 @@
import com.squareup.javapoet.JavaFile;
import org.elasticsearch.compute.ann.Aggregator;
-import org.elasticsearch.compute.ann.ConvertEvaluator;
-import org.elasticsearch.compute.ann.Evaluator;
import org.elasticsearch.compute.ann.GroupingAggregator;
import org.elasticsearch.compute.ann.IntermediateState;
-import org.elasticsearch.compute.ann.MvEvaluator;
import java.io.IOException;
import java.io.Writer;
@@ -117,8 +114,7 @@ public boolean process(Set extends TypeElement> set, RoundEnvironment roundEnv
implementer,
groupingAggregatorImplementer,
warnExceptionsTypes.isEmpty() == false
- )
- .sourceFile(),
+ ).sourceFile(),
env
);
}
@@ -151,8 +147,7 @@ private static List warnExceptions(Element aggregatorMethod) {
List result = new ArrayList<>();
for (var mirror : aggregatorMethod.getAnnotationMirrors()) {
String annotationType = mirror.getAnnotationType().toString();
- if (annotationType.equals(Aggregator.class.getName())
- || annotationType.equals(GroupingAggregator.class.getName())) {
+ if (annotationType.equals(Aggregator.class.getName()) || annotationType.equals(GroupingAggregator.class.getName())) {
for (var e : mirror.getElementValues().entrySet()) {
if (false == e.getKey().getSimpleName().toString().equals("warnExceptions")) {
diff --git a/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/Types.java b/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/Types.java
index 55a1c36895a5e..dd048460d2d13 100644
--- a/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/Types.java
+++ b/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/Types.java
@@ -135,7 +135,6 @@ public class Types {
*/
static final ClassName COMPUTE_WARNINGS = ClassName.get("org.elasticsearch.compute.aggregation", "Warnings");
-
static final ClassName SOURCE = ClassName.get("org.elasticsearch.xpack.esql.core.tree", "Source");
static final ClassName BYTES_REF = ClassName.get("org.apache.lucene.util", "BytesRef");
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/SumLongAggregator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/SumLongAggregator.java
index 754b916f3b8fe..178e4915022e1 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/SumLongAggregator.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/SumLongAggregator.java
@@ -15,8 +15,7 @@
value = {
@IntermediateState(name = "sum", type = "LONG"),
@IntermediateState(name = "seen", type = "BOOLEAN"),
- @IntermediateState(name = "failed", type = "BOOLEAN")
- },
+ @IntermediateState(name = "failed", type = "BOOLEAN") },
warnExceptions = ArithmeticException.class
)
@GroupingAggregator
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/Warnings.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/Warnings.java
index a78e9dd8d62af..eb2255a4e349b 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/Warnings.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/Warnings.java
@@ -8,9 +8,9 @@
package org.elasticsearch.compute.aggregation;
import org.elasticsearch.compute.operator.DriverContext;
-import static org.elasticsearch.common.logging.LoggerMessageFormat.format;
import static org.elasticsearch.common.logging.HeaderWarning.addWarning;
+import static org.elasticsearch.common.logging.LoggerMessageFormat.format;
/**
* Utilities to collect warnings for running an executor.
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Sum.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Sum.java
index edde2f7991a62..20c1a77a2301e 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Sum.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Sum.java
@@ -12,7 +12,6 @@
import org.elasticsearch.compute.aggregation.SumDoubleAggregatorFunctionSupplier;
import org.elasticsearch.compute.aggregation.SumIntAggregatorFunctionSupplier;
import org.elasticsearch.compute.aggregation.SumLongAggregatorFunctionSupplier;
-import org.elasticsearch.compute.aggregation.Warnings;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/TestCaseSupplier.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/TestCaseSupplier.java
index 6652cca0c4527..b9754c1c57513 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/TestCaseSupplier.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/TestCaseSupplier.java
@@ -1418,6 +1418,28 @@ public TestCase withWarning(String warning) {
);
}
+ public TestCase withWarnings(List warnings) {
+ String[] newWarnings;
+ if (expectedWarnings != null) {
+ newWarnings = Arrays.copyOf(expectedWarnings, expectedWarnings.length + warnings.size());
+ for (int i = 0; i < warnings.size(); i++) {
+ newWarnings[expectedWarnings.length + i] = warnings.get(i);
+ }
+ } else {
+ newWarnings = warnings.toArray(String[]::new);
+ }
+ return new TestCase(
+ data,
+ evaluatorToString,
+ expectedType,
+ matcher,
+ newWarnings,
+ expectedTypeError,
+ foldingExceptionClass,
+ foldingExceptionMessage
+ );
+ }
+
public TestCase withFoldingException(Class extends Throwable> clazz, String message) {
return new TestCase(data, evaluatorToString, expectedType, matcher, expectedWarnings, expectedTypeError, clazz, message);
}
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumTests.java
index 4f14dafc8b30d..88db21e69dc5f 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumTests.java
@@ -10,6 +10,7 @@
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
+import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
@@ -35,13 +36,12 @@ public SumTests(@Name("TestCase") Supplier testCaseSu
public static Iterable
- * Can't be removed, as the new aggregator's layout is different.
+ * Should be kept for as long as we need compatibility with the version this was added on, as the new aggregator's layout is different.
*