From 02ff3894e22ebae3631cfc39e541ffa8238404da Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 2 May 2025 10:19:24 -0700 Subject: [PATCH 1/2] Add first_over_time --- x-pack/plugin/esql/compute/build.gradle | 55 +++-- .../FirstOverTimeDoubleAggregator.java | 149 ++++++++++++ .../FirstOverTimeFloatAggregator.java | 149 ++++++++++++ .../FirstOverTimeIntAggregator.java | 149 ++++++++++++ .../FirstOverTimeLongAggregator.java | 147 +++++++++++ .../LastOverTimeDoubleAggregator.java | 4 +- .../LastOverTimeFloatAggregator.java | 4 +- .../LastOverTimeIntAggregator.java | 4 +- .../LastOverTimeLongAggregator.java | 4 +- ...rTimeDoubleAggregatorFunctionSupplier.java | 46 ++++ ...rTimeDoubleGroupingAggregatorFunction.java | 228 ++++++++++++++++++ ...erTimeFloatAggregatorFunctionSupplier.java | 46 ++++ ...erTimeFloatGroupingAggregatorFunction.java | 228 ++++++++++++++++++ ...OverTimeIntAggregatorFunctionSupplier.java | 46 ++++ ...OverTimeIntGroupingAggregatorFunction.java | 226 +++++++++++++++++ ...verTimeLongAggregatorFunctionSupplier.java | 46 ++++ ...verTimeLongGroupingAggregatorFunction.java | 226 +++++++++++++++++ ...a.st => X-ValueOverTimeAggregator.java.st} | 8 +- .../main/resources/k8s-timeseries.csv-spec | 18 ++ .../xpack/esql/action/EsqlCapabilities.java | 9 +- .../function/EsqlFunctionRegistry.java | 2 + .../aggregate/AggregateWritables.java | 1 + .../function/aggregate/FirstOverTime.java | 141 +++++++++++ .../rest-api-spec/test/esql/60_usage.yml | 4 +- 24 files changed, 1903 insertions(+), 37 deletions(-) create mode 100644 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeDoubleAggregator.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeFloatAggregator.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeIntAggregator.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeLongAggregator.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeDoubleAggregatorFunctionSupplier.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeDoubleGroupingAggregatorFunction.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeFloatAggregatorFunctionSupplier.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeFloatGroupingAggregatorFunction.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeIntAggregatorFunctionSupplier.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeIntGroupingAggregatorFunction.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeLongAggregatorFunctionSupplier.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeLongGroupingAggregatorFunction.java rename x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/{X-LastOverTimeAggregator.java.st => X-ValueOverTimeAggregator.java.st} (95%) create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstOverTime.java diff --git a/x-pack/plugin/esql/compute/build.gradle b/x-pack/plugin/esql/compute/build.gradle index 2d8a2e445ca7e..3b99fb59776c6 100644 --- a/x-pack/plugin/esql/compute/build.gradle +++ b/x-pack/plugin/esql/compute/build.gradle @@ -79,6 +79,14 @@ def prop(Name, Type, type, Wrapper, TYPE, BYTES, Array, Hash) { ] } +def addOccurrence(props, Occurrence) { + def newProps = props.collectEntries { [(it.key): it.value] } + newProps["Occurrence"] = Occurrence + newProps["First"] = Occurrence == "First" ? "true" : "" + newProps["Last"] = Occurrence == "Last" ? "true" : "" + return newProps +} + tasks.named('stringTemplates').configure { var intProperties = prop("Int", "Int", "int", "Integer", "INT", "Integer.BYTES", "IntArray", "LongHash") var floatProperties = prop("Float", "Float", "float", "Float", "FLOAT", "Float.BYTES", "FloatArray", "LongHash") @@ -862,27 +870,30 @@ tasks.named('stringTemplates').configure { it.outputFile = "org/elasticsearch/xpack/compute/operator/lookup/EnrichResultBuilderForBoolean.java" } - // TODO: add last_over_time for other types: boolean, bytes_refs - File lastOverTimeAggregatorInputFile = file("src/main/java/org/elasticsearch/compute/aggregation/X-LastOverTimeAggregator.java.st") - template { - it.properties = intProperties - it.inputFile = lastOverTimeAggregatorInputFile - it.outputFile = "org/elasticsearch/compute/aggregation/LastOverTimeIntAggregator.java" - } - template { - it.properties = longProperties - it.inputFile = lastOverTimeAggregatorInputFile - it.outputFile = "org/elasticsearch/compute/aggregation/LastOverTimeLongAggregator.java" + // TODO: add {value}_over_time for other types: boolean, bytes_refs + File valueOverTimeAggregatorInputFile = file("src/main/java/org/elasticsearch/compute/aggregation/X-ValueOverTimeAggregator.java.st") + ["First", "Last"].forEach { Occurrence -> + { + template { + it.properties = addOccurrence(intProperties, Occurrence) + it.inputFile = valueOverTimeAggregatorInputFile + it.outputFile = "org/elasticsearch/compute/aggregation/${Occurrence}OverTimeIntAggregator.java" + } + template { + it.properties = addOccurrence(longProperties, Occurrence) + it.inputFile = valueOverTimeAggregatorInputFile + it.outputFile = "org/elasticsearch/compute/aggregation/${Occurrence}OverTimeLongAggregator.java" + } + template { + it.properties = addOccurrence(floatProperties, Occurrence) + it.inputFile = valueOverTimeAggregatorInputFile + it.outputFile = "org/elasticsearch/compute/aggregation/${Occurrence}OverTimeFloatAggregator.java" + } + template { + it.properties = addOccurrence(doubleProperties, Occurrence) + it.inputFile = valueOverTimeAggregatorInputFile + it.outputFile = "org/elasticsearch/compute/aggregation/${Occurrence}OverTimeDoubleAggregator.java" + } + } } - template { - it.properties = floatProperties - it.inputFile = lastOverTimeAggregatorInputFile - it.outputFile = "org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregator.java" - } - template { - it.properties = doubleProperties - it.inputFile = lastOverTimeAggregatorInputFile - it.outputFile = "org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregator.java" - } - } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeDoubleAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeDoubleAggregator.java new file mode 100644 index 0000000000000..4785566c077e2 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeDoubleAggregator.java @@ -0,0 +1,149 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation; + +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.DoubleArray; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.compute.ann.GroupingAggregator; +import org.elasticsearch.compute.ann.IntermediateState; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.DoubleBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.core.Releasables; + +/** + * A time-series aggregation function that collects the First occurrence value of a time series in a specified interval. + * This class is generated. Edit `X-ValueOverTimeAggregator.java.st` instead. + */ +@GroupingAggregator( + timeseries = true, + value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "DOUBLE_BLOCK") } +) +public class FirstOverTimeDoubleAggregator { + + public static GroupingState initGrouping(DriverContext driverContext) { + return new GroupingState(driverContext.bigArrays()); + } + + // TODO: Since data in data_streams is sorted by `_tsid` and timestamp in descending order, + // we can read the first encountered value for each group of `_tsid` and time bucket. + public static void combine(GroupingState current, int groupId, long timestamp, double value) { + current.collectValue(groupId, timestamp, value); + } + + public static void combineIntermediate( + GroupingState current, + int groupId, + LongBlock timestamps, // stylecheck + DoubleBlock values, + int otherPosition + ) { + int valueCount = values.getValueCount(otherPosition); + if (valueCount > 0) { + long timestamp = timestamps.getLong(timestamps.getFirstValueIndex(otherPosition)); + int firstIndex = values.getFirstValueIndex(otherPosition); + for (int i = 0; i < valueCount; i++) { + current.collectValue(groupId, timestamp, values.getDouble(firstIndex + i)); + } + } + } + + public static void combineStates(GroupingState current, int currentGroupId, GroupingState otherState, int otherGroupId) { + if (otherGroupId < otherState.timestamps.size() && otherState.hasValue(otherGroupId)) { + var timestamp = otherState.timestamps.get(otherGroupId); + var value = otherState.values.get(otherGroupId); + current.collectValue(currentGroupId, timestamp, value); + } + } + + public static Block evaluateFinal(GroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + return state.evaluateFinal(selected, evalContext); + } + + public static final class GroupingState extends AbstractArrayState { + private final BigArrays bigArrays; + private LongArray timestamps; + private DoubleArray values; + + GroupingState(BigArrays bigArrays) { + super(bigArrays); + this.bigArrays = bigArrays; + boolean success = false; + LongArray timestamps = null; + try { + timestamps = bigArrays.newLongArray(1, false); + this.timestamps = timestamps; + this.values = bigArrays.newDoubleArray(1, false); + success = true; + } finally { + if (success == false) { + Releasables.close(timestamps, values, super::close); + } + } + } + + void collectValue(int groupId, long timestamp, double value) { + if (groupId < timestamps.size()) { + // TODO: handle multiple values? + if (hasValue(groupId) == false || timestamps.get(groupId) > timestamp) { + timestamps.set(groupId, timestamp); + values.set(groupId, value); + } + } else { + timestamps = bigArrays.grow(timestamps, groupId + 1); + values = bigArrays.grow(values, groupId + 1); + timestamps.set(groupId, timestamp); + values.set(groupId, value); + } + trackGroupId(groupId); + } + + @Override + public void close() { + Releasables.close(timestamps, values, super::close); + } + + @Override + public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) { + try ( + var timestampsBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount()); + var valuesBuilder = driverContext.blockFactory().newDoubleBlockBuilder(selected.getPositionCount()) + ) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (group < timestamps.size() && hasValue(group)) { + timestampsBuilder.appendLong(timestamps.get(group)); + valuesBuilder.appendDouble(values.get(group)); + } else { + timestampsBuilder.appendNull(); + valuesBuilder.appendNull(); + } + } + blocks[offset] = timestampsBuilder.build(); + blocks[offset + 1] = valuesBuilder.build(); + } + } + + Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + try (var builder = evalContext.blockFactory().newDoubleBlockBuilder(selected.getPositionCount())) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (group < timestamps.size() && hasValue(group)) { + builder.appendDouble(values.get(group)); + } else { + builder.appendNull(); + } + } + return builder.build(); + } + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeFloatAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeFloatAggregator.java new file mode 100644 index 0000000000000..2f82410d6714b --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeFloatAggregator.java @@ -0,0 +1,149 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation; + +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.FloatArray; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.compute.ann.GroupingAggregator; +import org.elasticsearch.compute.ann.IntermediateState; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.FloatBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.core.Releasables; + +/** + * A time-series aggregation function that collects the First occurrence value of a time series in a specified interval. + * This class is generated. Edit `X-ValueOverTimeAggregator.java.st` instead. + */ +@GroupingAggregator( + timeseries = true, + value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "FLOAT_BLOCK") } +) +public class FirstOverTimeFloatAggregator { + + public static GroupingState initGrouping(DriverContext driverContext) { + return new GroupingState(driverContext.bigArrays()); + } + + // TODO: Since data in data_streams is sorted by `_tsid` and timestamp in descending order, + // we can read the first encountered value for each group of `_tsid` and time bucket. + public static void combine(GroupingState current, int groupId, long timestamp, float value) { + current.collectValue(groupId, timestamp, value); + } + + public static void combineIntermediate( + GroupingState current, + int groupId, + LongBlock timestamps, // stylecheck + FloatBlock values, + int otherPosition + ) { + int valueCount = values.getValueCount(otherPosition); + if (valueCount > 0) { + long timestamp = timestamps.getLong(timestamps.getFirstValueIndex(otherPosition)); + int firstIndex = values.getFirstValueIndex(otherPosition); + for (int i = 0; i < valueCount; i++) { + current.collectValue(groupId, timestamp, values.getFloat(firstIndex + i)); + } + } + } + + public static void combineStates(GroupingState current, int currentGroupId, GroupingState otherState, int otherGroupId) { + if (otherGroupId < otherState.timestamps.size() && otherState.hasValue(otherGroupId)) { + var timestamp = otherState.timestamps.get(otherGroupId); + var value = otherState.values.get(otherGroupId); + current.collectValue(currentGroupId, timestamp, value); + } + } + + public static Block evaluateFinal(GroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + return state.evaluateFinal(selected, evalContext); + } + + public static final class GroupingState extends AbstractArrayState { + private final BigArrays bigArrays; + private LongArray timestamps; + private FloatArray values; + + GroupingState(BigArrays bigArrays) { + super(bigArrays); + this.bigArrays = bigArrays; + boolean success = false; + LongArray timestamps = null; + try { + timestamps = bigArrays.newLongArray(1, false); + this.timestamps = timestamps; + this.values = bigArrays.newFloatArray(1, false); + success = true; + } finally { + if (success == false) { + Releasables.close(timestamps, values, super::close); + } + } + } + + void collectValue(int groupId, long timestamp, float value) { + if (groupId < timestamps.size()) { + // TODO: handle multiple values? + if (hasValue(groupId) == false || timestamps.get(groupId) > timestamp) { + timestamps.set(groupId, timestamp); + values.set(groupId, value); + } + } else { + timestamps = bigArrays.grow(timestamps, groupId + 1); + values = bigArrays.grow(values, groupId + 1); + timestamps.set(groupId, timestamp); + values.set(groupId, value); + } + trackGroupId(groupId); + } + + @Override + public void close() { + Releasables.close(timestamps, values, super::close); + } + + @Override + public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) { + try ( + var timestampsBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount()); + var valuesBuilder = driverContext.blockFactory().newFloatBlockBuilder(selected.getPositionCount()) + ) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (group < timestamps.size() && hasValue(group)) { + timestampsBuilder.appendLong(timestamps.get(group)); + valuesBuilder.appendFloat(values.get(group)); + } else { + timestampsBuilder.appendNull(); + valuesBuilder.appendNull(); + } + } + blocks[offset] = timestampsBuilder.build(); + blocks[offset + 1] = valuesBuilder.build(); + } + } + + Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + try (var builder = evalContext.blockFactory().newFloatBlockBuilder(selected.getPositionCount())) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (group < timestamps.size() && hasValue(group)) { + builder.appendFloat(values.get(group)); + } else { + builder.appendNull(); + } + } + return builder.build(); + } + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeIntAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeIntAggregator.java new file mode 100644 index 0000000000000..d99b3d759656e --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeIntAggregator.java @@ -0,0 +1,149 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation; + +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.IntArray; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.compute.ann.GroupingAggregator; +import org.elasticsearch.compute.ann.IntermediateState; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.core.Releasables; + +/** + * A time-series aggregation function that collects the First occurrence value of a time series in a specified interval. + * This class is generated. Edit `X-ValueOverTimeAggregator.java.st` instead. + */ +@GroupingAggregator( + timeseries = true, + value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "INT_BLOCK") } +) +public class FirstOverTimeIntAggregator { + + public static GroupingState initGrouping(DriverContext driverContext) { + return new GroupingState(driverContext.bigArrays()); + } + + // TODO: Since data in data_streams is sorted by `_tsid` and timestamp in descending order, + // we can read the first encountered value for each group of `_tsid` and time bucket. + public static void combine(GroupingState current, int groupId, long timestamp, int value) { + current.collectValue(groupId, timestamp, value); + } + + public static void combineIntermediate( + GroupingState current, + int groupId, + LongBlock timestamps, // stylecheck + IntBlock values, + int otherPosition + ) { + int valueCount = values.getValueCount(otherPosition); + if (valueCount > 0) { + long timestamp = timestamps.getLong(timestamps.getFirstValueIndex(otherPosition)); + int firstIndex = values.getFirstValueIndex(otherPosition); + for (int i = 0; i < valueCount; i++) { + current.collectValue(groupId, timestamp, values.getInt(firstIndex + i)); + } + } + } + + public static void combineStates(GroupingState current, int currentGroupId, GroupingState otherState, int otherGroupId) { + if (otherGroupId < otherState.timestamps.size() && otherState.hasValue(otherGroupId)) { + var timestamp = otherState.timestamps.get(otherGroupId); + var value = otherState.values.get(otherGroupId); + current.collectValue(currentGroupId, timestamp, value); + } + } + + public static Block evaluateFinal(GroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + return state.evaluateFinal(selected, evalContext); + } + + public static final class GroupingState extends AbstractArrayState { + private final BigArrays bigArrays; + private LongArray timestamps; + private IntArray values; + + GroupingState(BigArrays bigArrays) { + super(bigArrays); + this.bigArrays = bigArrays; + boolean success = false; + LongArray timestamps = null; + try { + timestamps = bigArrays.newLongArray(1, false); + this.timestamps = timestamps; + this.values = bigArrays.newIntArray(1, false); + success = true; + } finally { + if (success == false) { + Releasables.close(timestamps, values, super::close); + } + } + } + + void collectValue(int groupId, long timestamp, int value) { + if (groupId < timestamps.size()) { + // TODO: handle multiple values? + if (hasValue(groupId) == false || timestamps.get(groupId) > timestamp) { + timestamps.set(groupId, timestamp); + values.set(groupId, value); + } + } else { + timestamps = bigArrays.grow(timestamps, groupId + 1); + values = bigArrays.grow(values, groupId + 1); + timestamps.set(groupId, timestamp); + values.set(groupId, value); + } + trackGroupId(groupId); + } + + @Override + public void close() { + Releasables.close(timestamps, values, super::close); + } + + @Override + public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) { + try ( + var timestampsBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount()); + var valuesBuilder = driverContext.blockFactory().newIntBlockBuilder(selected.getPositionCount()) + ) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (group < timestamps.size() && hasValue(group)) { + timestampsBuilder.appendLong(timestamps.get(group)); + valuesBuilder.appendInt(values.get(group)); + } else { + timestampsBuilder.appendNull(); + valuesBuilder.appendNull(); + } + } + blocks[offset] = timestampsBuilder.build(); + blocks[offset + 1] = valuesBuilder.build(); + } + } + + Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + try (var builder = evalContext.blockFactory().newIntBlockBuilder(selected.getPositionCount())) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (group < timestamps.size() && hasValue(group)) { + builder.appendInt(values.get(group)); + } else { + builder.appendNull(); + } + } + return builder.build(); + } + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeLongAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeLongAggregator.java new file mode 100644 index 0000000000000..6109370f90f54 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeLongAggregator.java @@ -0,0 +1,147 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation; + +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.compute.ann.GroupingAggregator; +import org.elasticsearch.compute.ann.IntermediateState; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.core.Releasables; + +/** + * A time-series aggregation function that collects the First occurrence value of a time series in a specified interval. + * This class is generated. Edit `X-ValueOverTimeAggregator.java.st` instead. + */ +@GroupingAggregator( + timeseries = true, + value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "LONG_BLOCK") } +) +public class FirstOverTimeLongAggregator { + + public static GroupingState initGrouping(DriverContext driverContext) { + return new GroupingState(driverContext.bigArrays()); + } + + // TODO: Since data in data_streams is sorted by `_tsid` and timestamp in descending order, + // we can read the first encountered value for each group of `_tsid` and time bucket. + public static void combine(GroupingState current, int groupId, long timestamp, long value) { + current.collectValue(groupId, timestamp, value); + } + + public static void combineIntermediate( + GroupingState current, + int groupId, + LongBlock timestamps, // stylecheck + LongBlock values, + int otherPosition + ) { + int valueCount = values.getValueCount(otherPosition); + if (valueCount > 0) { + long timestamp = timestamps.getLong(timestamps.getFirstValueIndex(otherPosition)); + int firstIndex = values.getFirstValueIndex(otherPosition); + for (int i = 0; i < valueCount; i++) { + current.collectValue(groupId, timestamp, values.getLong(firstIndex + i)); + } + } + } + + public static void combineStates(GroupingState current, int currentGroupId, GroupingState otherState, int otherGroupId) { + if (otherGroupId < otherState.timestamps.size() && otherState.hasValue(otherGroupId)) { + var timestamp = otherState.timestamps.get(otherGroupId); + var value = otherState.values.get(otherGroupId); + current.collectValue(currentGroupId, timestamp, value); + } + } + + public static Block evaluateFinal(GroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + return state.evaluateFinal(selected, evalContext); + } + + public static final class GroupingState extends AbstractArrayState { + private final BigArrays bigArrays; + private LongArray timestamps; + private LongArray values; + + GroupingState(BigArrays bigArrays) { + super(bigArrays); + this.bigArrays = bigArrays; + boolean success = false; + LongArray timestamps = null; + try { + timestamps = bigArrays.newLongArray(1, false); + this.timestamps = timestamps; + this.values = bigArrays.newLongArray(1, false); + success = true; + } finally { + if (success == false) { + Releasables.close(timestamps, values, super::close); + } + } + } + + void collectValue(int groupId, long timestamp, long value) { + if (groupId < timestamps.size()) { + // TODO: handle multiple values? + if (hasValue(groupId) == false || timestamps.get(groupId) > timestamp) { + timestamps.set(groupId, timestamp); + values.set(groupId, value); + } + } else { + timestamps = bigArrays.grow(timestamps, groupId + 1); + values = bigArrays.grow(values, groupId + 1); + timestamps.set(groupId, timestamp); + values.set(groupId, value); + } + trackGroupId(groupId); + } + + @Override + public void close() { + Releasables.close(timestamps, values, super::close); + } + + @Override + public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) { + try ( + var timestampsBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount()); + var valuesBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount()) + ) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (group < timestamps.size() && hasValue(group)) { + timestampsBuilder.appendLong(timestamps.get(group)); + valuesBuilder.appendLong(values.get(group)); + } else { + timestampsBuilder.appendNull(); + valuesBuilder.appendNull(); + } + } + blocks[offset] = timestampsBuilder.build(); + blocks[offset + 1] = valuesBuilder.build(); + } + } + + Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + try (var builder = evalContext.blockFactory().newLongBlockBuilder(selected.getPositionCount())) { + for (int p = 0; p < selected.getPositionCount(); p++) { + int group = selected.getInt(p); + if (group < timestamps.size() && hasValue(group)) { + builder.appendLong(values.get(group)); + } else { + builder.appendNull(); + } + } + return builder.build(); + } + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregator.java index 9231d345b7c84..e3410317f13c1 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregator.java @@ -20,8 +20,8 @@ import org.elasticsearch.core.Releasables; /** - * A time-series aggregation function that collects the most recent value of a time series in a specified interval. - * This class is generated. Edit `X-LastOverTimeAggregator.java.st` instead. + * A time-series aggregation function that collects the Last occurrence value of a time series in a specified interval. + * This class is generated. Edit `X-ValueOverTimeAggregator.java.st` instead. */ @GroupingAggregator( timeseries = true, diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregator.java index f6d47c9b98ed6..14178798cef87 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregator.java @@ -20,8 +20,8 @@ import org.elasticsearch.core.Releasables; /** - * A time-series aggregation function that collects the most recent value of a time series in a specified interval. - * This class is generated. Edit `X-LastOverTimeAggregator.java.st` instead. + * A time-series aggregation function that collects the Last occurrence value of a time series in a specified interval. + * This class is generated. Edit `X-ValueOverTimeAggregator.java.st` instead. */ @GroupingAggregator( timeseries = true, diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregator.java index 8764a86d03a20..f356941c376a4 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregator.java @@ -20,8 +20,8 @@ import org.elasticsearch.core.Releasables; /** - * A time-series aggregation function that collects the most recent value of a time series in a specified interval. - * This class is generated. Edit `X-LastOverTimeAggregator.java.st` instead. + * A time-series aggregation function that collects the Last occurrence value of a time series in a specified interval. + * This class is generated. Edit `X-ValueOverTimeAggregator.java.st` instead. */ @GroupingAggregator( timeseries = true, diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregator.java index 94787db738bf2..1c3e00d98607a 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregator.java @@ -18,8 +18,8 @@ import org.elasticsearch.core.Releasables; /** - * A time-series aggregation function that collects the most recent value of a time series in a specified interval. - * This class is generated. Edit `X-LastOverTimeAggregator.java.st` instead. + * A time-series aggregation function that collects the Last occurrence value of a time series in a specified interval. + * This class is generated. Edit `X-ValueOverTimeAggregator.java.st` instead. */ @GroupingAggregator( timeseries = true, diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeDoubleAggregatorFunctionSupplier.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeDoubleAggregatorFunctionSupplier.java new file mode 100644 index 0000000000000..3a144f0789efb --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeDoubleAggregatorFunctionSupplier.java @@ -0,0 +1,46 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.util.List; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link AggregatorFunctionSupplier} implementation for {@link FirstOverTimeDoubleAggregator}. + * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead. + */ +public final class FirstOverTimeDoubleAggregatorFunctionSupplier implements AggregatorFunctionSupplier { + public FirstOverTimeDoubleAggregatorFunctionSupplier() { + } + + @Override + public List nonGroupingIntermediateStateDesc() { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public List groupingIntermediateStateDesc() { + return FirstOverTimeDoubleGroupingAggregatorFunction.intermediateStateDesc(); + } + + @Override + public AggregatorFunction aggregator(DriverContext driverContext, List channels) { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public FirstOverTimeDoubleGroupingAggregatorFunction groupingAggregator( + DriverContext driverContext, List channels) { + return FirstOverTimeDoubleGroupingAggregatorFunction.create(channels, driverContext); + } + + @Override + public String describe() { + return "first_over_time of doubles"; + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeDoubleGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeDoubleGroupingAggregatorFunction.java new file mode 100644 index 0000000000000..b9ee302f45b24 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeDoubleGroupingAggregatorFunction.java @@ -0,0 +1,228 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.DoubleBlock; +import org.elasticsearch.compute.data.DoubleVector; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link GroupingAggregatorFunction} implementation for {@link FirstOverTimeDoubleAggregator}. + * This class is generated. Edit {@code GroupingAggregatorImplementer} instead. + */ +public final class FirstOverTimeDoubleGroupingAggregatorFunction implements GroupingAggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("timestamps", ElementType.LONG), + new IntermediateStateDesc("values", ElementType.DOUBLE) ); + + private final FirstOverTimeDoubleAggregator.GroupingState state; + + private final List channels; + + private final DriverContext driverContext; + + public FirstOverTimeDoubleGroupingAggregatorFunction(List channels, + FirstOverTimeDoubleAggregator.GroupingState state, DriverContext driverContext) { + this.channels = channels; + this.state = state; + this.driverContext = driverContext; + } + + public static FirstOverTimeDoubleGroupingAggregatorFunction create(List channels, + DriverContext driverContext) { + return new FirstOverTimeDoubleGroupingAggregatorFunction(channels, FirstOverTimeDoubleAggregator.initGrouping(driverContext), driverContext); + } + + public static List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public GroupingAggregatorFunction.AddInput prepareProcessPage(SeenGroupIds seenGroupIds, + Page page) { + DoubleBlock valuesBlock = page.getBlock(channels.get(0)); + DoubleVector valuesVector = valuesBlock.asVector(); + LongBlock timestampsBlock = page.getBlock(channels.get(1)); + LongVector timestampsVector = timestampsBlock.asVector(); + if (timestampsVector == null) { + throw new IllegalStateException("expected @timestamp vector; but got a block"); + } + if (valuesVector == null) { + if (valuesBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void close() { + } + }; + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void close() { + } + }; + } + + private void addRawInput(int positionOffset, IntVector groups, DoubleBlock values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + FirstOverTimeDoubleAggregator.combine(state, groupId, timestamps.getLong(v), values.getDouble(v)); + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, DoubleVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + var valuePosition = groupPosition + positionOffset; + FirstOverTimeDoubleAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getDouble(valuePosition)); + } + } + + private void addRawInput(int positionOffset, IntBlock groups, DoubleBlock values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + FirstOverTimeDoubleAggregator.combine(state, groupId, timestamps.getLong(v), values.getDouble(v)); + } + } + } + } + + private void addRawInput(int positionOffset, IntBlock groups, DoubleVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + var valuePosition = groupPosition + positionOffset; + FirstOverTimeDoubleAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getDouble(valuePosition)); + } + } + } + + @Override + public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) { + state.enableGroupIdTracking(seenGroupIds); + } + + @Override + public void addIntermediateInput(int positionOffset, IntVector groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + DoubleBlock values = (DoubleBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + FirstOverTimeDoubleAggregator.combineIntermediate(state, groupId, timestamps, values, groupPosition + positionOffset); + } + } + + @Override + public void addIntermediateRowInput(int groupId, GroupingAggregatorFunction input, int position) { + if (input.getClass() != getClass()) { + throw new IllegalArgumentException("expected " + getClass() + "; got " + input.getClass()); + } + FirstOverTimeDoubleAggregator.GroupingState inState = ((FirstOverTimeDoubleGroupingAggregatorFunction) input).state; + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + FirstOverTimeDoubleAggregator.combineStates(state, groupId, inState, position); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) { + state.toIntermediate(blocks, offset, selected, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, IntVector selected, + GroupingAggregatorEvaluationContext evaluatorContext) { + blocks[offset] = FirstOverTimeDoubleAggregator.evaluateFinal(state, selected, evaluatorContext); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + @Override + public void close() { + state.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeFloatAggregatorFunctionSupplier.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeFloatAggregatorFunctionSupplier.java new file mode 100644 index 0000000000000..b9fdfc0d508e5 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeFloatAggregatorFunctionSupplier.java @@ -0,0 +1,46 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.util.List; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link AggregatorFunctionSupplier} implementation for {@link FirstOverTimeFloatAggregator}. + * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead. + */ +public final class FirstOverTimeFloatAggregatorFunctionSupplier implements AggregatorFunctionSupplier { + public FirstOverTimeFloatAggregatorFunctionSupplier() { + } + + @Override + public List nonGroupingIntermediateStateDesc() { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public List groupingIntermediateStateDesc() { + return FirstOverTimeFloatGroupingAggregatorFunction.intermediateStateDesc(); + } + + @Override + public AggregatorFunction aggregator(DriverContext driverContext, List channels) { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public FirstOverTimeFloatGroupingAggregatorFunction groupingAggregator( + DriverContext driverContext, List channels) { + return FirstOverTimeFloatGroupingAggregatorFunction.create(channels, driverContext); + } + + @Override + public String describe() { + return "first_over_time of floats"; + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeFloatGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeFloatGroupingAggregatorFunction.java new file mode 100644 index 0000000000000..ad3f37cd22a00 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeFloatGroupingAggregatorFunction.java @@ -0,0 +1,228 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.FloatBlock; +import org.elasticsearch.compute.data.FloatVector; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link GroupingAggregatorFunction} implementation for {@link FirstOverTimeFloatAggregator}. + * This class is generated. Edit {@code GroupingAggregatorImplementer} instead. + */ +public final class FirstOverTimeFloatGroupingAggregatorFunction implements GroupingAggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("timestamps", ElementType.LONG), + new IntermediateStateDesc("values", ElementType.FLOAT) ); + + private final FirstOverTimeFloatAggregator.GroupingState state; + + private final List channels; + + private final DriverContext driverContext; + + public FirstOverTimeFloatGroupingAggregatorFunction(List channels, + FirstOverTimeFloatAggregator.GroupingState state, DriverContext driverContext) { + this.channels = channels; + this.state = state; + this.driverContext = driverContext; + } + + public static FirstOverTimeFloatGroupingAggregatorFunction create(List channels, + DriverContext driverContext) { + return new FirstOverTimeFloatGroupingAggregatorFunction(channels, FirstOverTimeFloatAggregator.initGrouping(driverContext), driverContext); + } + + public static List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public GroupingAggregatorFunction.AddInput prepareProcessPage(SeenGroupIds seenGroupIds, + Page page) { + FloatBlock valuesBlock = page.getBlock(channels.get(0)); + FloatVector valuesVector = valuesBlock.asVector(); + LongBlock timestampsBlock = page.getBlock(channels.get(1)); + LongVector timestampsVector = timestampsBlock.asVector(); + if (timestampsVector == null) { + throw new IllegalStateException("expected @timestamp vector; but got a block"); + } + if (valuesVector == null) { + if (valuesBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void close() { + } + }; + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void close() { + } + }; + } + + private void addRawInput(int positionOffset, IntVector groups, FloatBlock values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + FirstOverTimeFloatAggregator.combine(state, groupId, timestamps.getLong(v), values.getFloat(v)); + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, FloatVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + var valuePosition = groupPosition + positionOffset; + FirstOverTimeFloatAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getFloat(valuePosition)); + } + } + + private void addRawInput(int positionOffset, IntBlock groups, FloatBlock values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + FirstOverTimeFloatAggregator.combine(state, groupId, timestamps.getLong(v), values.getFloat(v)); + } + } + } + } + + private void addRawInput(int positionOffset, IntBlock groups, FloatVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + var valuePosition = groupPosition + positionOffset; + FirstOverTimeFloatAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getFloat(valuePosition)); + } + } + } + + @Override + public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) { + state.enableGroupIdTracking(seenGroupIds); + } + + @Override + public void addIntermediateInput(int positionOffset, IntVector groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + FloatBlock values = (FloatBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + FirstOverTimeFloatAggregator.combineIntermediate(state, groupId, timestamps, values, groupPosition + positionOffset); + } + } + + @Override + public void addIntermediateRowInput(int groupId, GroupingAggregatorFunction input, int position) { + if (input.getClass() != getClass()) { + throw new IllegalArgumentException("expected " + getClass() + "; got " + input.getClass()); + } + FirstOverTimeFloatAggregator.GroupingState inState = ((FirstOverTimeFloatGroupingAggregatorFunction) input).state; + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + FirstOverTimeFloatAggregator.combineStates(state, groupId, inState, position); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) { + state.toIntermediate(blocks, offset, selected, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, IntVector selected, + GroupingAggregatorEvaluationContext evaluatorContext) { + blocks[offset] = FirstOverTimeFloatAggregator.evaluateFinal(state, selected, evaluatorContext); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + @Override + public void close() { + state.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeIntAggregatorFunctionSupplier.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeIntAggregatorFunctionSupplier.java new file mode 100644 index 0000000000000..5074c719e2f0f --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeIntAggregatorFunctionSupplier.java @@ -0,0 +1,46 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.util.List; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link AggregatorFunctionSupplier} implementation for {@link FirstOverTimeIntAggregator}. + * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead. + */ +public final class FirstOverTimeIntAggregatorFunctionSupplier implements AggregatorFunctionSupplier { + public FirstOverTimeIntAggregatorFunctionSupplier() { + } + + @Override + public List nonGroupingIntermediateStateDesc() { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public List groupingIntermediateStateDesc() { + return FirstOverTimeIntGroupingAggregatorFunction.intermediateStateDesc(); + } + + @Override + public AggregatorFunction aggregator(DriverContext driverContext, List channels) { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public FirstOverTimeIntGroupingAggregatorFunction groupingAggregator(DriverContext driverContext, + List channels) { + return FirstOverTimeIntGroupingAggregatorFunction.create(channels, driverContext); + } + + @Override + public String describe() { + return "first_over_time of ints"; + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeIntGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeIntGroupingAggregatorFunction.java new file mode 100644 index 0000000000000..9253aa51831b2 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeIntGroupingAggregatorFunction.java @@ -0,0 +1,226 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link GroupingAggregatorFunction} implementation for {@link FirstOverTimeIntAggregator}. + * This class is generated. Edit {@code GroupingAggregatorImplementer} instead. + */ +public final class FirstOverTimeIntGroupingAggregatorFunction implements GroupingAggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("timestamps", ElementType.LONG), + new IntermediateStateDesc("values", ElementType.INT) ); + + private final FirstOverTimeIntAggregator.GroupingState state; + + private final List channels; + + private final DriverContext driverContext; + + public FirstOverTimeIntGroupingAggregatorFunction(List channels, + FirstOverTimeIntAggregator.GroupingState state, DriverContext driverContext) { + this.channels = channels; + this.state = state; + this.driverContext = driverContext; + } + + public static FirstOverTimeIntGroupingAggregatorFunction create(List channels, + DriverContext driverContext) { + return new FirstOverTimeIntGroupingAggregatorFunction(channels, FirstOverTimeIntAggregator.initGrouping(driverContext), driverContext); + } + + public static List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public GroupingAggregatorFunction.AddInput prepareProcessPage(SeenGroupIds seenGroupIds, + Page page) { + IntBlock valuesBlock = page.getBlock(channels.get(0)); + IntVector valuesVector = valuesBlock.asVector(); + LongBlock timestampsBlock = page.getBlock(channels.get(1)); + LongVector timestampsVector = timestampsBlock.asVector(); + if (timestampsVector == null) { + throw new IllegalStateException("expected @timestamp vector; but got a block"); + } + if (valuesVector == null) { + if (valuesBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void close() { + } + }; + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void close() { + } + }; + } + + private void addRawInput(int positionOffset, IntVector groups, IntBlock values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + FirstOverTimeIntAggregator.combine(state, groupId, timestamps.getLong(v), values.getInt(v)); + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, IntVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + var valuePosition = groupPosition + positionOffset; + FirstOverTimeIntAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getInt(valuePosition)); + } + } + + private void addRawInput(int positionOffset, IntBlock groups, IntBlock values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + FirstOverTimeIntAggregator.combine(state, groupId, timestamps.getLong(v), values.getInt(v)); + } + } + } + } + + private void addRawInput(int positionOffset, IntBlock groups, IntVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + var valuePosition = groupPosition + positionOffset; + FirstOverTimeIntAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getInt(valuePosition)); + } + } + } + + @Override + public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) { + state.enableGroupIdTracking(seenGroupIds); + } + + @Override + public void addIntermediateInput(int positionOffset, IntVector groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + IntBlock values = (IntBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + FirstOverTimeIntAggregator.combineIntermediate(state, groupId, timestamps, values, groupPosition + positionOffset); + } + } + + @Override + public void addIntermediateRowInput(int groupId, GroupingAggregatorFunction input, int position) { + if (input.getClass() != getClass()) { + throw new IllegalArgumentException("expected " + getClass() + "; got " + input.getClass()); + } + FirstOverTimeIntAggregator.GroupingState inState = ((FirstOverTimeIntGroupingAggregatorFunction) input).state; + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + FirstOverTimeIntAggregator.combineStates(state, groupId, inState, position); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) { + state.toIntermediate(blocks, offset, selected, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, IntVector selected, + GroupingAggregatorEvaluationContext evaluatorContext) { + blocks[offset] = FirstOverTimeIntAggregator.evaluateFinal(state, selected, evaluatorContext); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + @Override + public void close() { + state.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeLongAggregatorFunctionSupplier.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeLongAggregatorFunctionSupplier.java new file mode 100644 index 0000000000000..d07c29df61e9b --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeLongAggregatorFunctionSupplier.java @@ -0,0 +1,46 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.util.List; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link AggregatorFunctionSupplier} implementation for {@link FirstOverTimeLongAggregator}. + * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead. + */ +public final class FirstOverTimeLongAggregatorFunctionSupplier implements AggregatorFunctionSupplier { + public FirstOverTimeLongAggregatorFunctionSupplier() { + } + + @Override + public List nonGroupingIntermediateStateDesc() { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public List groupingIntermediateStateDesc() { + return FirstOverTimeLongGroupingAggregatorFunction.intermediateStateDesc(); + } + + @Override + public AggregatorFunction aggregator(DriverContext driverContext, List channels) { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public FirstOverTimeLongGroupingAggregatorFunction groupingAggregator(DriverContext driverContext, + List channels) { + return FirstOverTimeLongGroupingAggregatorFunction.create(channels, driverContext); + } + + @Override + public String describe() { + return "first_over_time of longs"; + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeLongGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeLongGroupingAggregatorFunction.java new file mode 100644 index 0000000000000..e5a372c767b73 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeLongGroupingAggregatorFunction.java @@ -0,0 +1,226 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link GroupingAggregatorFunction} implementation for {@link FirstOverTimeLongAggregator}. + * This class is generated. Edit {@code GroupingAggregatorImplementer} instead. + */ +public final class FirstOverTimeLongGroupingAggregatorFunction implements GroupingAggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("timestamps", ElementType.LONG), + new IntermediateStateDesc("values", ElementType.LONG) ); + + private final FirstOverTimeLongAggregator.GroupingState state; + + private final List channels; + + private final DriverContext driverContext; + + public FirstOverTimeLongGroupingAggregatorFunction(List channels, + FirstOverTimeLongAggregator.GroupingState state, DriverContext driverContext) { + this.channels = channels; + this.state = state; + this.driverContext = driverContext; + } + + public static FirstOverTimeLongGroupingAggregatorFunction create(List channels, + DriverContext driverContext) { + return new FirstOverTimeLongGroupingAggregatorFunction(channels, FirstOverTimeLongAggregator.initGrouping(driverContext), driverContext); + } + + public static List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public GroupingAggregatorFunction.AddInput prepareProcessPage(SeenGroupIds seenGroupIds, + Page page) { + LongBlock valuesBlock = page.getBlock(channels.get(0)); + LongVector valuesVector = valuesBlock.asVector(); + LongBlock timestampsBlock = page.getBlock(channels.get(1)); + LongVector timestampsVector = timestampsBlock.asVector(); + if (timestampsVector == null) { + throw new IllegalStateException("expected @timestamp vector; but got a block"); + } + if (valuesVector == null) { + if (valuesBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void close() { + } + }; + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void close() { + } + }; + } + + private void addRawInput(int positionOffset, IntVector groups, LongBlock values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + FirstOverTimeLongAggregator.combine(state, groupId, timestamps.getLong(v), values.getLong(v)); + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, LongVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + var valuePosition = groupPosition + positionOffset; + FirstOverTimeLongAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getLong(valuePosition)); + } + } + + private void addRawInput(int positionOffset, IntBlock groups, LongBlock values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + FirstOverTimeLongAggregator.combine(state, groupId, timestamps.getLong(v), values.getLong(v)); + } + } + } + } + + private void addRawInput(int positionOffset, IntBlock groups, LongVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + var valuePosition = groupPosition + positionOffset; + FirstOverTimeLongAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getLong(valuePosition)); + } + } + } + + @Override + public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) { + state.enableGroupIdTracking(seenGroupIds); + } + + @Override + public void addIntermediateInput(int positionOffset, IntVector groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + LongBlock values = (LongBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + FirstOverTimeLongAggregator.combineIntermediate(state, groupId, timestamps, values, groupPosition + positionOffset); + } + } + + @Override + public void addIntermediateRowInput(int groupId, GroupingAggregatorFunction input, int position) { + if (input.getClass() != getClass()) { + throw new IllegalArgumentException("expected " + getClass() + "; got " + input.getClass()); + } + FirstOverTimeLongAggregator.GroupingState inState = ((FirstOverTimeLongGroupingAggregatorFunction) input).state; + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + FirstOverTimeLongAggregator.combineStates(state, groupId, inState, position); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) { + state.toIntermediate(blocks, offset, selected, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, IntVector selected, + GroupingAggregatorEvaluationContext evaluatorContext) { + blocks[offset] = FirstOverTimeLongAggregator.evaluateFinal(state, selected, evaluatorContext); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + @Override + public void close() { + state.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-LastOverTimeAggregator.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValueOverTimeAggregator.java.st similarity index 95% rename from x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-LastOverTimeAggregator.java.st rename to x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValueOverTimeAggregator.java.st index b189a83873dd7..a830a4a03ef29 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-LastOverTimeAggregator.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValueOverTimeAggregator.java.st @@ -24,14 +24,14 @@ import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.core.Releasables; /** - * A time-series aggregation function that collects the most recent value of a time series in a specified interval. - * This class is generated. Edit `X-LastOverTimeAggregator.java.st` instead. + * A time-series aggregation function that collects the $Occurrence$ occurrence value of a time series in a specified interval. + * This class is generated. Edit `X-ValueOverTimeAggregator.java.st` instead. */ @GroupingAggregator( timeseries = true, value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "$TYPE$_BLOCK") } ) -public class LastOverTime$Type$Aggregator { +public class $Occurrence$OverTime$Type$Aggregator { public static GroupingState initGrouping(DriverContext driverContext) { return new GroupingState(driverContext.bigArrays()); @@ -97,7 +97,7 @@ public class LastOverTime$Type$Aggregator { void collectValue(int groupId, long timestamp, $type$ value) { if (groupId < timestamps.size()) { // TODO: handle multiple values? - if (hasValue(groupId) == false || timestamps.get(groupId) < timestamp) { + if (hasValue(groupId) == false || timestamps.get(groupId) $if(Last)$<$else$>$endif$ timestamp) { timestamps.set(groupId, timestamp); values.set(groupId, value); } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec index cbccc0c61e416..02b16048993a4 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec @@ -238,3 +238,21 @@ max_cost:double | cluster:keyword | time_bucket:datetime 12.0 | qa | 2024-05-10T00:08:00.000Z 11.875 | qa | 2024-05-10T00:21:00.000Z ; + +max_of_first_over_time +required_capability: metrics_command +required_capability: first_over_time +TS k8s | STATS max_cost=max(first_over_time(network.cost)) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT max_cost DESC, time_bucket DESC, cluster | LIMIT 10; + +max_cost:double | cluster:keyword | time_bucket:datetime +11.875 | qa | 2024-05-10T00:09:00.000Z +11.5 | staging | 2024-05-10T00:16:00.000Z +11.25 | prod | 2024-05-10T00:14:00.000Z +11.25 | staging | 2024-05-10T00:02:00.000Z +10.875 | qa | 2024-05-10T00:22:00.000Z +10.0 | staging | 2024-05-10T00:11:00.000Z +9.375 | qa | 2024-05-10T00:20:00.000Z +9.25 | staging | 2024-05-10T00:03:00.000Z +8.375 | staging | 2024-05-10T00:20:00.000Z +8.125 | qa | 2024-05-10T00:06:00.000Z +; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index b201ab7cb4afe..398bca9eb8024 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -1054,7 +1054,14 @@ public enum Cap { /** * Guards a bug fix matching {@code TO_LOWER(f) == ""}. */ - TO_LOWER_EMPTY_STRING; + TO_LOWER_EMPTY_STRING, + + /** + * Support first_over_time aggregation that gets evaluated per time-series + */ + FIRST_OVER_TIME(Build.current().isSnapshot()), + + ; private final boolean enabled; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java index 0bd4e4bda7c5b..c622a26a7080e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java @@ -22,6 +22,7 @@ import org.elasticsearch.xpack.esql.expression.function.aggregate.AvgOverTime; import org.elasticsearch.xpack.esql.expression.function.aggregate.Count; import org.elasticsearch.xpack.esql.expression.function.aggregate.CountDistinct; +import org.elasticsearch.xpack.esql.expression.function.aggregate.FirstOverTime; import org.elasticsearch.xpack.esql.expression.function.aggregate.LastOverTime; import org.elasticsearch.xpack.esql.expression.function.aggregate.Max; import org.elasticsearch.xpack.esql.expression.function.aggregate.MaxOverTime; @@ -446,6 +447,7 @@ private static FunctionDefinition[][] snapshotFunctions() { def(MaxOverTime.class, uni(MaxOverTime::new), "max_over_time"), def(AvgOverTime.class, uni(AvgOverTime::new), "avg_over_time"), def(LastOverTime.class, LastOverTime::withUnresolvedTimestamp, "last_over_time"), + def(FirstOverTime.class, FirstOverTime::withUnresolvedTimestamp, "first_over_time"), def(Term.class, bi(Term::new), "term") } }; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java index aedd976b69762..b6232c605ab71 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java @@ -33,6 +33,7 @@ public static List getNamedWriteables() { MaxOverTime.ENTRY, AvgOverTime.ENTRY, LastOverTime.ENTRY, + FirstOverTime.ENTRY, // internal functions ToPartial.ENTRY, FromPartial.ENTRY, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstOverTime.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstOverTime.java new file mode 100644 index 0000000000000..4b7e38627cf99 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstOverTime.java @@ -0,0 +1,141 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.expression.function.aggregate; + +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier; +import org.elasticsearch.compute.aggregation.FirstOverTimeDoubleAggregatorFunctionSupplier; +import org.elasticsearch.compute.aggregation.FirstOverTimeFloatAggregatorFunctionSupplier; +import org.elasticsearch.compute.aggregation.FirstOverTimeIntAggregatorFunctionSupplier; +import org.elasticsearch.compute.aggregation.FirstOverTimeLongAggregatorFunctionSupplier; +import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.Literal; +import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute; +import org.elasticsearch.xpack.esql.core.tree.NodeInfo; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.expression.function.FunctionInfo; +import org.elasticsearch.xpack.esql.expression.function.FunctionType; +import org.elasticsearch.xpack.esql.expression.function.OptionalArgument; +import org.elasticsearch.xpack.esql.expression.function.Param; +import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; +import org.elasticsearch.xpack.esql.planner.ToAggregator; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT; +import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType; + +public class FirstOverTime extends TimeSeriesAggregateFunction implements OptionalArgument, ToAggregator { + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( + Expression.class, + "FirstOverTime", + FirstOverTime::new + ); + + private final Expression timestamp; + + @FunctionInfo( + returnType = { "int", "double", "integer", "long" }, + description = "Collect the first occurrence value of a time-series in the specified interval. Available with TS command only", + type = FunctionType.AGGREGATE + ) + public FirstOverTime( + Source source, + @Param(name = "field", type = { "long|int|double|float" }, description = "field") Expression field, + Expression timestamp + ) { + this(source, field, Literal.TRUE, timestamp); + } + + // compatibility constructor used when reading from the stream + private FirstOverTime(Source source, Expression field, Expression filter, List children) { + this(source, field, filter, children.getFirst()); + } + + private FirstOverTime(Source source, Expression field, Expression filter, Expression timestamp) { + super(source, field, filter, List.of(timestamp)); + this.timestamp = timestamp; + } + + public FirstOverTime(StreamInput in) throws IOException { + this( + Source.readFrom((PlanStreamInput) in), + in.readNamedWriteable(Expression.class), + in.readNamedWriteable(Expression.class), + in.readNamedWriteableCollectionAsList(Expression.class) + ); + } + + @Override + public String getWriteableName() { + return ENTRY.name; + } + + public static FirstOverTime withUnresolvedTimestamp(Source source, Expression field) { + return new FirstOverTime(source, field, new UnresolvedAttribute(source, "@timestamp")); + } + + @Override + protected NodeInfo info() { + return NodeInfo.create(this, FirstOverTime::new, field(), timestamp); + } + + @Override + public FirstOverTime replaceChildren(List newChildren) { + if (newChildren.size() != 3) { + assert false : "expected 3 children for field, filter, @timestamp; got " + newChildren; + throw new IllegalArgumentException("expected 3 children for field, filter, @timestamp; got " + newChildren); + } + return new FirstOverTime(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); + } + + @Override + public FirstOverTime withFilter(Expression filter) { + return new FirstOverTime(source(), field(), filter, timestamp); + } + + @Override + public DataType dataType() { + return field().dataType(); + } + + @Override + protected TypeResolution resolveType() { + return isType(field(), dt -> dt.isNumeric() && dt != DataType.UNSIGNED_LONG, sourceText(), DEFAULT, "numeric except unsigned_long"); + } + + @Override + public AggregatorFunctionSupplier supplier() { + final DataType type = field().dataType(); + return switch (type) { + case LONG -> new FirstOverTimeLongAggregatorFunctionSupplier(); + case INTEGER -> new FirstOverTimeIntAggregatorFunctionSupplier(); + case DOUBLE -> new FirstOverTimeDoubleAggregatorFunctionSupplier(); + case FLOAT -> new FirstOverTimeFloatAggregatorFunctionSupplier(); + default -> throw EsqlIllegalArgumentException.illegalDataType(type); + }; + } + + @Override + public FirstOverTime perTimeSeriesAggregation() { + return this; + } + + @Override + public String toString() { + return "first_over_time(" + field() + ")"; + } + + Expression timestamp() { + return timestamp; + } +} diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml index f4a6f48e27ddf..7da3fc1c6a3e0 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml @@ -33,7 +33,7 @@ setup: path: /_query parameters: [] # A snapshot function was removed in match_function_options, it can't work on mixed cluster tests otherwise. - capabilities: [ snapshot_test_for_telemetry, fn_byte_length, match_function_options, last_over_time] + capabilities: [ snapshot_test_for_telemetry, fn_byte_length, match_function_options, first_over_time] reason: "Test that should only be executed on snapshot versions" - do: {xpack.usage: {}} @@ -123,7 +123,7 @@ setup: - match: {esql.functions.coalesce: $functions_coalesce} - gt: {esql.functions.categorize: $functions_categorize} # Testing for the entire function set isn't feasbile, so we just check that we return the correct count as an approximation. - - length: {esql.functions: 138} # check the "sister" test below for a likely update to the same esql.functions length check + - length: {esql.functions: 139} # check the "sister" test below for a likely update to the same esql.functions length check --- "Basic ESQL usage output (telemetry) non-snapshot version": From f1b870aad6a7459a6f6311b155f30578267a7174 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sun, 4 May 2025 09:15:21 -0700 Subject: [PATCH 2/2] track max_group_id --- .../FirstOverTimeDoubleAggregator.java | 4 +++- .../FirstOverTimeFloatAggregator.java | 4 +++- .../aggregation/FirstOverTimeIntAggregator.java | 4 +++- .../aggregation/FirstOverTimeLongAggregator.java | 4 +++- .../LastOverTimeDoubleAggregator.java | 4 +++- .../aggregation/LastOverTimeFloatAggregator.java | 4 +++- .../aggregation/LastOverTimeIntAggregator.java | 4 +++- .../aggregation/LastOverTimeLongAggregator.java | 4 +++- .../X-ValueOverTimeAggregator.java.st | 4 +++- .../src/main/resources/k8s-settings.json | 2 +- .../src/main/resources/k8s-timeseries.csv-spec | 16 ++++++++-------- 11 files changed, 36 insertions(+), 18 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeDoubleAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeDoubleAggregator.java index 4785566c077e2..4c860f4b3388a 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeDoubleAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeDoubleAggregator.java @@ -72,6 +72,7 @@ public static final class GroupingState extends AbstractArrayState { private final BigArrays bigArrays; private LongArray timestamps; private DoubleArray values; + private int maxGroupId = -1; GroupingState(BigArrays bigArrays) { super(bigArrays); @@ -93,7 +94,7 @@ public static final class GroupingState extends AbstractArrayState { void collectValue(int groupId, long timestamp, double value) { if (groupId < timestamps.size()) { // TODO: handle multiple values? - if (hasValue(groupId) == false || timestamps.get(groupId) > timestamp) { + if (groupId > maxGroupId || hasValue(groupId) == false || timestamps.get(groupId) > timestamp) { timestamps.set(groupId, timestamp); values.set(groupId, value); } @@ -103,6 +104,7 @@ void collectValue(int groupId, long timestamp, double value) { timestamps.set(groupId, timestamp); values.set(groupId, value); } + maxGroupId = Math.max(maxGroupId, groupId); trackGroupId(groupId); } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeFloatAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeFloatAggregator.java index 2f82410d6714b..5439713a2cc21 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeFloatAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeFloatAggregator.java @@ -72,6 +72,7 @@ public static final class GroupingState extends AbstractArrayState { private final BigArrays bigArrays; private LongArray timestamps; private FloatArray values; + private int maxGroupId = -1; GroupingState(BigArrays bigArrays) { super(bigArrays); @@ -93,7 +94,7 @@ public static final class GroupingState extends AbstractArrayState { void collectValue(int groupId, long timestamp, float value) { if (groupId < timestamps.size()) { // TODO: handle multiple values? - if (hasValue(groupId) == false || timestamps.get(groupId) > timestamp) { + if (groupId > maxGroupId || hasValue(groupId) == false || timestamps.get(groupId) > timestamp) { timestamps.set(groupId, timestamp); values.set(groupId, value); } @@ -103,6 +104,7 @@ void collectValue(int groupId, long timestamp, float value) { timestamps.set(groupId, timestamp); values.set(groupId, value); } + maxGroupId = Math.max(maxGroupId, groupId); trackGroupId(groupId); } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeIntAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeIntAggregator.java index d99b3d759656e..7af8df14a1e4e 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeIntAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeIntAggregator.java @@ -72,6 +72,7 @@ public static final class GroupingState extends AbstractArrayState { private final BigArrays bigArrays; private LongArray timestamps; private IntArray values; + private int maxGroupId = -1; GroupingState(BigArrays bigArrays) { super(bigArrays); @@ -93,7 +94,7 @@ public static final class GroupingState extends AbstractArrayState { void collectValue(int groupId, long timestamp, int value) { if (groupId < timestamps.size()) { // TODO: handle multiple values? - if (hasValue(groupId) == false || timestamps.get(groupId) > timestamp) { + if (groupId > maxGroupId || hasValue(groupId) == false || timestamps.get(groupId) > timestamp) { timestamps.set(groupId, timestamp); values.set(groupId, value); } @@ -103,6 +104,7 @@ void collectValue(int groupId, long timestamp, int value) { timestamps.set(groupId, timestamp); values.set(groupId, value); } + maxGroupId = Math.max(maxGroupId, groupId); trackGroupId(groupId); } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeLongAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeLongAggregator.java index 6109370f90f54..8cfb5333631b8 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeLongAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeLongAggregator.java @@ -70,6 +70,7 @@ public static final class GroupingState extends AbstractArrayState { private final BigArrays bigArrays; private LongArray timestamps; private LongArray values; + private int maxGroupId = -1; GroupingState(BigArrays bigArrays) { super(bigArrays); @@ -91,7 +92,7 @@ public static final class GroupingState extends AbstractArrayState { void collectValue(int groupId, long timestamp, long value) { if (groupId < timestamps.size()) { // TODO: handle multiple values? - if (hasValue(groupId) == false || timestamps.get(groupId) > timestamp) { + if (groupId > maxGroupId || hasValue(groupId) == false || timestamps.get(groupId) > timestamp) { timestamps.set(groupId, timestamp); values.set(groupId, value); } @@ -101,6 +102,7 @@ void collectValue(int groupId, long timestamp, long value) { timestamps.set(groupId, timestamp); values.set(groupId, value); } + maxGroupId = Math.max(maxGroupId, groupId); trackGroupId(groupId); } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregator.java index e3410317f13c1..456a9ff88413a 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregator.java @@ -72,6 +72,7 @@ public static final class GroupingState extends AbstractArrayState { private final BigArrays bigArrays; private LongArray timestamps; private DoubleArray values; + private int maxGroupId = -1; GroupingState(BigArrays bigArrays) { super(bigArrays); @@ -93,7 +94,7 @@ public static final class GroupingState extends AbstractArrayState { void collectValue(int groupId, long timestamp, double value) { if (groupId < timestamps.size()) { // TODO: handle multiple values? - if (hasValue(groupId) == false || timestamps.get(groupId) < timestamp) { + if (groupId > maxGroupId || hasValue(groupId) == false || timestamps.get(groupId) < timestamp) { timestamps.set(groupId, timestamp); values.set(groupId, value); } @@ -103,6 +104,7 @@ void collectValue(int groupId, long timestamp, double value) { timestamps.set(groupId, timestamp); values.set(groupId, value); } + maxGroupId = Math.max(maxGroupId, groupId); trackGroupId(groupId); } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregator.java index 14178798cef87..b8dbcaef236fc 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregator.java @@ -72,6 +72,7 @@ public static final class GroupingState extends AbstractArrayState { private final BigArrays bigArrays; private LongArray timestamps; private FloatArray values; + private int maxGroupId = -1; GroupingState(BigArrays bigArrays) { super(bigArrays); @@ -93,7 +94,7 @@ public static final class GroupingState extends AbstractArrayState { void collectValue(int groupId, long timestamp, float value) { if (groupId < timestamps.size()) { // TODO: handle multiple values? - if (hasValue(groupId) == false || timestamps.get(groupId) < timestamp) { + if (groupId > maxGroupId || hasValue(groupId) == false || timestamps.get(groupId) < timestamp) { timestamps.set(groupId, timestamp); values.set(groupId, value); } @@ -103,6 +104,7 @@ void collectValue(int groupId, long timestamp, float value) { timestamps.set(groupId, timestamp); values.set(groupId, value); } + maxGroupId = Math.max(maxGroupId, groupId); trackGroupId(groupId); } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregator.java index f356941c376a4..6f6f2c8829f46 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeIntAggregator.java @@ -72,6 +72,7 @@ public static final class GroupingState extends AbstractArrayState { private final BigArrays bigArrays; private LongArray timestamps; private IntArray values; + private int maxGroupId = -1; GroupingState(BigArrays bigArrays) { super(bigArrays); @@ -93,7 +94,7 @@ public static final class GroupingState extends AbstractArrayState { void collectValue(int groupId, long timestamp, int value) { if (groupId < timestamps.size()) { // TODO: handle multiple values? - if (hasValue(groupId) == false || timestamps.get(groupId) < timestamp) { + if (groupId > maxGroupId || hasValue(groupId) == false || timestamps.get(groupId) < timestamp) { timestamps.set(groupId, timestamp); values.set(groupId, value); } @@ -103,6 +104,7 @@ void collectValue(int groupId, long timestamp, int value) { timestamps.set(groupId, timestamp); values.set(groupId, value); } + maxGroupId = Math.max(maxGroupId, groupId); trackGroupId(groupId); } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregator.java index 1c3e00d98607a..800988187b85f 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LastOverTimeLongAggregator.java @@ -70,6 +70,7 @@ public static final class GroupingState extends AbstractArrayState { private final BigArrays bigArrays; private LongArray timestamps; private LongArray values; + private int maxGroupId = -1; GroupingState(BigArrays bigArrays) { super(bigArrays); @@ -91,7 +92,7 @@ public static final class GroupingState extends AbstractArrayState { void collectValue(int groupId, long timestamp, long value) { if (groupId < timestamps.size()) { // TODO: handle multiple values? - if (hasValue(groupId) == false || timestamps.get(groupId) < timestamp) { + if (groupId > maxGroupId || hasValue(groupId) == false || timestamps.get(groupId) < timestamp) { timestamps.set(groupId, timestamp); values.set(groupId, value); } @@ -101,6 +102,7 @@ void collectValue(int groupId, long timestamp, long value) { timestamps.set(groupId, timestamp); values.set(groupId, value); } + maxGroupId = Math.max(maxGroupId, groupId); trackGroupId(groupId); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValueOverTimeAggregator.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValueOverTimeAggregator.java.st index a830a4a03ef29..a19766509349e 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValueOverTimeAggregator.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValueOverTimeAggregator.java.st @@ -76,6 +76,7 @@ public class $Occurrence$OverTime$Type$Aggregator { private final BigArrays bigArrays; private LongArray timestamps; private $Type$Array values; + private int maxGroupId = -1; GroupingState(BigArrays bigArrays) { super(bigArrays); @@ -97,7 +98,7 @@ public class $Occurrence$OverTime$Type$Aggregator { void collectValue(int groupId, long timestamp, $type$ value) { if (groupId < timestamps.size()) { // TODO: handle multiple values? - if (hasValue(groupId) == false || timestamps.get(groupId) $if(Last)$<$else$>$endif$ timestamp) { + if (groupId > maxGroupId || hasValue(groupId) == false || timestamps.get(groupId) $if(Last)$<$else$>$endif$ timestamp) { timestamps.set(groupId, timestamp); values.set(groupId, value); } @@ -107,6 +108,7 @@ public class $Occurrence$OverTime$Type$Aggregator { timestamps.set(groupId, timestamp); values.set(groupId, value); } + maxGroupId = Math.max(maxGroupId, groupId); trackGroupId(groupId); } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-settings.json b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-settings.json index 1ece98d5d5fb1..9442bb4cf8d9f 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-settings.json +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-settings.json @@ -1,7 +1,7 @@ { "index": { "mode": "time_series", - "routing_path": ["cluster", "name"], + "routing_path": ["cluster", "pod"], "time_series": { "start_time": "2024-05-10T00:00:00Z", "end_time": "2024-05-20T00:00:00Z" diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec index 68ec2598a63c6..1093225e44dd5 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec @@ -263,14 +263,14 @@ required_capability: first_over_time TS k8s | STATS max_cost=max(first_over_time(network.cost)) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT max_cost DESC, time_bucket DESC, cluster | LIMIT 10; max_cost:double | cluster:keyword | time_bucket:datetime +12.375 | prod | 2024-05-10T00:17:00.000Z +12.375 | qa | 2024-05-10T00:01:00.000Z +12.25 | prod | 2024-05-10T00:19:00.000Z +12.125 | qa | 2024-05-10T00:07:00.000Z +12.125 | staging | 2024-05-10T00:03:00.000Z +11.875 | prod | 2024-05-10T00:15:00.000Z 11.875 | qa | 2024-05-10T00:09:00.000Z +11.75 | qa | 2024-05-10T00:06:00.000Z +11.625 | prod | 2024-05-10T00:12:00.000Z 11.5 | staging | 2024-05-10T00:16:00.000Z -11.25 | prod | 2024-05-10T00:14:00.000Z -11.25 | staging | 2024-05-10T00:02:00.000Z -10.875 | qa | 2024-05-10T00:22:00.000Z -10.0 | staging | 2024-05-10T00:11:00.000Z -9.375 | qa | 2024-05-10T00:20:00.000Z -9.25 | staging | 2024-05-10T00:03:00.000Z -8.375 | staging | 2024-05-10T00:20:00.000Z -8.125 | qa | 2024-05-10T00:06:00.000Z ;