Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Preceding with negative indices in window function #15783

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public RowsAndColumns aggregateAll(
int lowerOffset = frame.getLowerOffset();
int upperOffset = frame.getUpperOffset();

if (numRows < lowerOffset + upperOffset + 1) {
if (numRows < upperOffset - lowerOffset + 1) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should also probably take a look at: WindowFrame#getLowerOffsetClamped
and the callsites of that method in this file

by the way: I think we should retire the old rows processing logic and leave that as well to the one handling the range stuff - it should be on-par in performance; but could handle some edge cases a bit better.

// In this case, there are not enough rows to completely build up the full window aperture before it needs to
// also start contracting the aperture because of the upper offset. So we use a method that specifically
// handles checks for both expanding and reducing the aperture on every iteration.
Expand Down Expand Up @@ -187,7 +187,7 @@ public AggInterval next()
groupToRowIndex(relativeGroupId(1))
),
Interval.of(
groupToRowIndex(relativeGroupId(-lowerOffset)),
groupToRowIndex(relativeGroupId(lowerOffset < 0 ? lowerOffset : -lowerOffset)),
groupToRowIndex(relativeGroupId(upperOffset))
)
);
Expand Down Expand Up @@ -606,8 +606,9 @@ private AppendableRowsAndColumns aggregateWindowApertureWellBehaved(
// 2. Our window is full, as we walk the rows we take a value off and add a new aggregation
// 3. We are nearing the end of the rows, we need to start shrinking the window aperture


int numRows = rac.numRows();
int windowSize = lowerOffset + upperOffset + 1;
int windowSize = upperOffset + 1 - lowerOffset;

// We store the results in an Object array for convenience. This is definitely sub-par from a memory management
// point of view as we should use native arrays when possible. This will be fine for now, but it probably makes
Expand All @@ -622,7 +623,18 @@ private AppendableRowsAndColumns aggregateWindowApertureWellBehaved(

// This is the number of aggregators to actually aggregate for the current row.
// Which also doubles as the nextIndex to roll through as we roll things in and out of the window
int nextIndex = lowerOffset + 1;
int nextIndex;
int upperLimit;
if (upperOffset > 0 && lowerOffset > 0) {
nextIndex = 1;
upperLimit = upperOffset - lowerOffset;
} else if (lowerOffset < 0 && upperOffset < 0) {
nextIndex = upperOffset - lowerOffset + 1;
upperLimit = 0;
} else {
nextIndex = 1 - lowerOffset;
upperLimit = upperOffset;
}

Aggregator[][] aggregators = new Aggregator[aggFactories.length][windowSize];
for (int i = 0; i < aggregators.length; i++) {
Expand All @@ -636,7 +648,7 @@ private AppendableRowsAndColumns aggregateWindowApertureWellBehaved(
// The first few rows will slowly build out the window to consume the upper-offset. The window will not
// be full until we have walked upperOffset number of rows, so phase 1 runs until we have consumed
// upperOffset number of rows.
for (int upperIndex = 0; upperIndex < upperOffset; ++upperIndex) {
for (int upperIndex = 0; upperIndex < upperLimit; ++upperIndex) {
for (Aggregator[] aggregator : aggregators) {
for (int j = 0; j < nextIndex; ++j) {
aggregator[j].aggregate();
Expand Down Expand Up @@ -759,7 +771,7 @@ private AppendableRowsAndColumns aggregateWindowApertureInFlux(

// This is the index to stop at for the current window aperture
// The first row is used by all of the results for the lowerOffset num results, plus 1 for the "current row"
int stopIndex = Math.min(lowerOffset + 1, windowSize);
int stopIndex = Math.min(Math.abs(lowerOffset) + 1, windowSize);

int startIndex = 0;
int rowId = rowIdProvider.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void testWindowedAggregationWindowSmallerThanRows()
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);

final RowsAndColumns results = agger.aggregateAll(
new WindowFrame(WindowFrame.PeerType.ROWS, false, 1, false, 2, null),
new WindowFrame(WindowFrame.PeerType.ROWS, false, -1, false, 2, null),
new AggregatorFactory[]{
new LongSumAggregatorFactory("sumFromLong", "intCol"),
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
Expand Down Expand Up @@ -143,7 +143,7 @@ public void testWindowedAggregationWindowSmallerThanRowsOnlyLower()
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);

final RowsAndColumns results = agger.aggregateAll(
new WindowFrame(WindowFrame.PeerType.ROWS, false, 2, false, 0, null),
new WindowFrame(WindowFrame.PeerType.ROWS, false, -2, false, 0, null),
new AggregatorFactory[]{
new LongSumAggregatorFactory("sumFromLong", "intCol"),
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
Expand All @@ -169,7 +169,7 @@ public void testWindowedAggregationWindowLargerThanRows()
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);

final RowsAndColumns results = agger.aggregateAll(
new WindowFrame(WindowFrame.PeerType.ROWS, false, 5, false, 7, null),
new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 7, null),
new AggregatorFactory[]{
new LongSumAggregatorFactory("sumFromLong", "intCol"),
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
Expand Down Expand Up @@ -197,7 +197,7 @@ public void testWindowedAggregationLowerLargerThanRows()
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);

final RowsAndColumns results = agger.aggregateAll(
new WindowFrame(WindowFrame.PeerType.ROWS, false, 5, false, 1, null),
new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 1, null),
new AggregatorFactory[]{
new LongSumAggregatorFactory("sumFromLong", "intCol"),
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
Expand Down Expand Up @@ -225,7 +225,7 @@ public void testWindowedAggregationLowerLargerThanRowsNoUpper()
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);

final RowsAndColumns results = agger.aggregateAll(
new WindowFrame(WindowFrame.PeerType.ROWS, false, 5, false, 0, null),
new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 0, null),
new AggregatorFactory[]{
new LongSumAggregatorFactory("sumFromLong", "intCol"),
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
Expand Down Expand Up @@ -337,7 +337,7 @@ public void testWindowedAggregationWindowLargerThanRowsOnlyLower()
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);

final RowsAndColumns results = agger.aggregateAll(
new WindowFrame(WindowFrame.PeerType.ROWS, false, 5, false, 0, null),
new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 0, null),
new AggregatorFactory[]{
new LongSumAggregatorFactory("sumFromLong", "intCol"),
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ public WindowFrame getWindowFrame()
if (group.lowerBound.isUnbounded() && group.upperBound.isUnbounded()) {
return WindowFrame.unbounded();
}

return new WindowFrame(
group.isRows ? WindowFrame.PeerType.ROWS : WindowFrame.PeerType.RANGE,
group.lowerBound.isUnbounded(),
Expand All @@ -373,7 +374,8 @@ private int figureOutOffset(RexWindowBound bound)
if (bound.isUnbounded() || bound.isCurrentRow()) {
return 0;
}
return getConstant(((RexInputRef) bound.getOffset()).getIndex());
final int value = getConstant(((RexInputRef) bound.getOffset()).getIndex());
return bound.isPreceding() ? -value : value;
}

private int getConstant(int refIndex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ expectedOperators:
frame:
peerType: "RANGE"
lowUnbounded: false
lowOffset: 3
lowOffset: -3
uppUnbounded: false
uppOffset: 2
orderBy:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ expectedOperators:
frame:
peerType: "RANGE"
lowUnbounded: false
lowOffset: 3
lowOffset: -3
uppUnbounded: false
uppOffset: 2
orderBy: [ {column: "d1", direction: ASC} ]
Expand Down
16 changes: 16 additions & 0 deletions sql/src/test/resources/calcite/tests/window/window_range_1.sqlTest
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
type: "operatorValidation"

sql: |
WITH virtual_table as (select m2 ,sum(m1) as summ1
from foo group by 1 order by summ1 DESC)
select m2,summ1,sum(summ1) OVER (order by m2 rows between 1 PRECEDING and 2 FOLLOWING) as sumfinal
from virtual_table order by 1


expectedResults:
- [1.0,1.0,6.0]
- [2.0,2.0,10.0]
- [3.0,3.0,14.0]
- [4.0,4.0,18.0]
- [5.0,5.0,15.0]
- [6.0,6.0,11.0]