Add polymorphic window functions#18169
Conversation
e81fbd0 to
de1f0c3
Compare
There was a problem hiding this comment.
Pull request overview
This PR adds type-specific window value aggregators (and corresponding factory dispatch) so window aggregations preserve precision for integral and BIG_DECIMAL types, and reduces boxing for MIN/MAX on primitive types in pinot-query-runtime.
Changes:
- Update
WindowValueAggregatorFactoryto dispatchSUM/MIN/MAXaggregators based oncolumnDataType.getStoredType(). - Add new polymorphic implementations:
SumLong*,SumBigDecimal*, primitiveMin/Max(Int|Long)*, andMin/MaxComparable*fallbacks; rename existing double-based implementations for clarity. - Add a comprehensive
WindowValueAggregatorTestsuite covering factory dispatch, null/removal behavior, and precision.
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/WindowValueAggregatorFactory.java | Dispatch to type-specific SUM/MIN/MAX implementations using stored type. |
| pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/SumLongWindowValueAggregator.java | New long-accumulating SUM to avoid double precision loss for large integral values. |
| pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/SumDoubleWindowValueAggregator.java | Rename/refocus existing SUM implementation as double-based. |
| pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/SumBigDecimalWindowValueAggregator.java | New BigDecimal SUM implementation intended to preserve decimal precision. |
| pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/MinIntWindowValueAggregator.java | New primitive INT MIN with fastutil deque for sliding windows. |
| pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/MaxIntWindowValueAggregator.java | New primitive INT MAX with fastutil deque for sliding windows. |
| pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/MinLongWindowValueAggregator.java | New primitive LONG MIN with fastutil deque (precision-safe vs double). |
| pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/MaxLongWindowValueAggregator.java | New primitive LONG MAX with fastutil deque (precision-safe vs double). |
| pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/MinDoubleWindowValueAggregator.java | Rename existing MIN implementation as double-based. |
| pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/MaxDoubleWindowValueAggregator.java | Rename existing MAX implementation as double-based. |
| pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/MinComparableWindowValueAggregator.java | New Comparable-based MIN fallback to preserve non-double types. |
| pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/MaxComparableWindowValueAggregator.java | New Comparable-based MAX fallback to preserve non-double types. |
| pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/BoolAndWindowValueAggregator.java | Rename class to match file name and window-aggregator naming. |
| pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/BoolOrWindowValueAggregator.java | Rename class to match file name and window-aggregator naming. |
| pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/window/aggregate/WindowValueAggregatorTest.java | New unit tests covering factory dispatch and aggregator behaviors/precision. |
| if (value instanceof BigDecimal) { | ||
| return (BigDecimal) value; | ||
| } | ||
| return BigDecimal.valueOf(((Number) value).doubleValue()); |
There was a problem hiding this comment.
Yeah it makes sense, but practically this probably won't be reachable because the factory routes INT / LONG to SumLongWindowValueAggregator, so values arriving at SumBigDecimalWindowValueAggregator should already be BigDecimal instances. It's a low effort fix worth doing though so I've made it.
| @Test(expectedExceptions = UnsupportedOperationException.class) | ||
| public void testComparableMinRemovalUnsupported() { | ||
| WindowValueAggregator<Object> agg = new MinComparableWindowValueAggregator(false); | ||
| agg.addValue(1); | ||
| agg.removeValue(1); | ||
| } |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #18169 +/- ##
============================================
+ Coverage 63.31% 63.35% +0.04%
Complexity 1627 1627
============================================
Files 3229 3237 +8
Lines 196707 196930 +223
Branches 30408 30444 +36
============================================
+ Hits 124538 124773 +235
+ Misses 62192 62159 -33
- Partials 9977 9998 +21
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
|
|
||
| @Override | ||
| public void addValue(@Nullable Object value) { | ||
| if (value != null) { |
There was a problem hiding this comment.
Not introduced in this PR, but how do we handle null first/last?
There was a problem hiding this comment.
In the current implementation (both pre- and post-PR), nulls are simply skipped in addValue() / removeValue() - they don't participate in MIN/MAX (and other aggregations), which is standard SQL aggregate-over-nulls behavior. For other non aggregate window functions like FIRST_VALUE, LAST_VALUE, we support customizable null handling using the standard IGNORE NULLS / RESPECT NULLS modifiers - #14264. The actual null ordering gets pushed into the sort created below the window by the planner.
| while (!_deque.isEmpty() && _deque.lastLong() > longValue) { | ||
| _deque.dequeueLastLong(); | ||
| } |
There was a problem hiding this comment.
Just for me to understand: It looks like in _deque we have N copies of the min value. It should be more efficient to just keep the min and the number of repetitions we had, right? I guess I'm missing something.
There was a problem hiding this comment.
This is using the classic monotonic deque (ascending minima) pattern for O(N) amortized sliding-window min / max. The deque does not store N copies of the min. It maintains a monotonically non-decreasing sequence of future minimum candidates:
- On
addValue(x): pop from back while back > x, then push x. The deque stays sorted ascending. - On
removeValue(x): iffront == x, pop from front. getCurrentAggregatedValue(): return front (the current window minimum).
Example for window sliding over [3, 1, 4, 1, 5]:
- Add 3: deque = [3]
- Add 1: deque = [1] (3 removed, since 3 > 1)
- Add 4: deque = [1, 4]
- Remove 3: front is 1 != 3, no-op (3 was already purged)
- Add 1: deque = [1, 1] (4 removed, since 4 > 1)
- Remove 1: front == 1, pop → deque = [1]
There was a problem hiding this comment.
A "min + count" approach would fail because when the last copy of the min is removed, you'd need an O(K) rescan to find the next min.
27c2b07 to
60a085a
Compare
|
Documentation PR has been created for the polymorphic window function precision improvements: pinot-contrib/pinot-docs#743 The docs PR adds information about the type-specific aggregators (SumLongWindowValueAggregator, SumBigDecimalWindowValueAggregator, and primitive-based MIN/MAX aggregators) that preserve precision for LONG and BIG_DECIMAL types. |
…ache/pinot#18169) Merged PR 743: Documents the type-specific aggregator improvements in window functions introduced in apache/pinot#18169.
WindowValueAggregatorFactoryby adding type-specific window value aggregator implementationsSumLongWindowValueAggregatorforINT/LONGto avoid precision loss when summing large long values (> 2^53)SumBigDecimalWindowValueAggregatorforBIG_DECIMALto preserve full decimal precisionMinIntWindowValueAggregator/MaxIntWindowValueAggregatorusing fastutilIntArrayFIFOQueueMinLongWindowValueAggregator/MaxLongWindowValueAggregatorusing fastutilLongArrayFIFOQueueMinComparableWindowValueAggregator/MaxComparableWindowValueAggregatoras fallback for types likeBIG_DECIMALcolumnDataType.getStoredType()instead of always using double-based aggregators