Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handling window frame selection in case of same bound kind #16231

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public RowsAndColumns aggregateAll(
int lowerOffset = frame.getLowerOffset();
int upperOffset = frame.getUpperOffset();

if (numRows < lowerOffset + upperOffset + 1) {
if (numRows < upperOffset - lowerOffset + 1) {
// In this case, there are not enough rows to completely build up the full window aperture before it needs to
// also start contracting the aperture because of the upper offset. So we use a method that specifically
// handles checks for both expanding and reducing the aperture on every iteration.
Expand Down Expand Up @@ -187,7 +187,7 @@ public AggInterval next()
groupToRowIndex(relativeGroupId(1))
),
Interval.of(
groupToRowIndex(relativeGroupId(-lowerOffset)),
groupToRowIndex(relativeGroupId(lowerOffset < 0 ? lowerOffset : -lowerOffset)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't look right - this is effectively: -Math.abs(lowerOffset)
this part was already working with relative offsets ; so I think it should be simply relativeGroupId(lowerOffset) - didn't that worked?

groupToRowIndex(relativeGroupId(upperOffset))
)
);
Expand Down Expand Up @@ -434,7 +434,9 @@ private AppendableRowsAndColumns computeCumulativeAggregates(AggregatorFactory[]
// a specialized implementation of this interface against, say, a Frame object that can deal with arrays instead
// of trying to optimize this generic implementation.
Object[][] results = new Object[aggFactories.length][numRows];
int resultStorageIndex = 0;

// if the upper offset is -ve, then we need to ignore those many rows prior to the current row
int resultStorageIndex = -1 * Math.min(upperOffset, 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why not use an unary - ?


AtomicInteger rowIdProvider = new AtomicInteger(0);
final ColumnSelectorFactory columnSelectorFactory = ColumnSelectorFactoryMaker.fromRAC(rac).make(rowIdProvider);
Expand All @@ -455,7 +457,7 @@ private AppendableRowsAndColumns computeCumulativeAggregates(AggregatorFactory[]
}

// Prime the results
if (rowIdProvider.get() < numRows) {
if (rowIdProvider.get() < numRows && resultStorageIndex < numRows) {
for (int i = 0; i < aggs.length; i++) {
aggs[i].aggregate();
results[i][resultStorageIndex] = aggs[i].get();
Expand All @@ -468,7 +470,8 @@ private AppendableRowsAndColumns computeCumulativeAggregates(AggregatorFactory[]
}

// From here out, we want to aggregate, peel off a row of results and then accumulate the aggregation
for (int rowId = rowIdProvider.get(); rowId < numRows; ++rowId) {
int lastRowIndex = numRows + Math.min(upperOffset, 0);
for (int rowId = rowIdProvider.get(); rowId < lastRowIndex; ++rowId) {
for (int i = 0; i < aggs.length; i++) {
aggs[i].aggregate();
results[i][resultStorageIndex] = aggs[i].get();
Expand Down Expand Up @@ -521,7 +524,9 @@ private AppendableRowsAndColumns computeReverseCumulativeAggregates(AggregatorFa
// a specialized implementation of this interface against, say, a Frame object that can deal with arrays instead
// of trying to optimize this generic implementation.
Object[][] results = new Object[aggFactories.length][numRows];
int resultStorageIndex = numRows - 1;

// if the lower offset is +ve, then we need to ignore those many rows following to the current row
int resultStorageIndex = numRows - 1 - Math.max(lowerOffset, 0);

AtomicInteger rowIdProvider = new AtomicInteger(numRows - 1);
final ColumnSelectorFactory columnSelectorFactory = ColumnSelectorFactoryMaker.fromRAC(rac).make(rowIdProvider);
Expand All @@ -534,15 +539,15 @@ private AppendableRowsAndColumns computeReverseCumulativeAggregates(AggregatorFa
}

// If there is a lower offset, we accumulate those aggregations before starting to generate results
for (int i = 0; i < lowerOffset; ++i) {
for (int i = 0; i < -1 * lowerOffset; ++i) {
for (Aggregator agg : aggs) {
agg.aggregate();
}
rowIdProvider.decrementAndGet();
}

// Prime the results
if (rowIdProvider.get() >= 0) {
if (rowIdProvider.get() >= 0 && resultStorageIndex >= 0) {
for (int i = 0; i < aggs.length; i++) {
aggs[i].aggregate();
results[i][resultStorageIndex] = aggs[i].get();
Expand All @@ -555,7 +560,8 @@ private AppendableRowsAndColumns computeReverseCumulativeAggregates(AggregatorFa
}

// From here out, we want to aggregate, peel off a row of results and then accumulate the aggregation
for (int rowId = rowIdProvider.get(); rowId >= 0; --rowId) {
int firstRowIndex = Math.max(lowerOffset, 0);
for (int rowId = rowIdProvider.get(); rowId >= firstRowIndex; --rowId) {
for (int i = 0; i < aggs.length; i++) {
aggs[i].aggregate();
results[i][resultStorageIndex] = aggs[i].get();
Expand Down Expand Up @@ -606,23 +612,41 @@ private AppendableRowsAndColumns aggregateWindowApertureWellBehaved(
// 2. Our window is full, as we walk the rows we take a value off and add a new aggregation
// 3. We are nearing the end of the rows, we need to start shrinking the window aperture


int numRows = rac.numRows();
int windowSize = lowerOffset + upperOffset + 1;
int windowSize = upperOffset + 1 - lowerOffset;

// We store the results in an Object array for convenience. This is definitely sub-par from a memory management
// point of view as we should use native arrays when possible. This will be fine for now, but it probably makes
// sense to look at optimizing this in the future. That said, such an optimization might best come by having
// a specialized implementation of this interface against, say, a Frame object that can deal with arrays instead
// of trying to optimize this generic implementation.
Object[][] results = new Object[aggFactories.length][numRows];
int resultStorageIndex = 0;

// start the storage index after ignoring start rows if upper offset is -ve
int resultStorageIndex = -1 * Math.min(upperOffset, 0);

AtomicInteger rowIdProvider = new AtomicInteger(0);
final ColumnSelectorFactory columnSelectorFactory = ColumnSelectorFactoryMaker.fromRAC(rac).make(rowIdProvider);

// This is the number of aggregators to actually aggregate for the current row.
// Which also doubles as the nextIndex to roll through as we roll things in and out of the window
int nextIndex = lowerOffset + 1;
int nextIndex;
int upperLimit;
if (upperOffset > 0 && lowerOffset > 0) {
// consider this case as a regular case by moving the row provider by lowerOffset number of rows
for (int i = 0; i < lowerOffset; i++) {
rowIdProvider.incrementAndGet();
}
nextIndex = 1;
upperLimit = upperOffset - lowerOffset;
} else if (lowerOffset < 0 && upperOffset < 0) {
nextIndex = upperOffset - lowerOffset + 1;
upperLimit = 0;
} else {
nextIndex = 1 - lowerOffset;
upperLimit = upperOffset;
}

Aggregator[][] aggregators = new Aggregator[aggFactories.length][windowSize];
for (int i = 0; i < aggregators.length; i++) {
Expand All @@ -636,7 +660,7 @@ private AppendableRowsAndColumns aggregateWindowApertureWellBehaved(
// The first few rows will slowly build out the window to consume the upper-offset. The window will not
// be full until we have walked upperOffset number of rows, so phase 1 runs until we have consumed
// upperOffset number of rows.
for (int upperIndex = 0; upperIndex < upperOffset; ++upperIndex) {
for (int upperIndex = 0; upperIndex < upperLimit && rowIdProvider.get() < numRows; ++upperIndex) {
for (Aggregator[] aggregator : aggregators) {
for (int j = 0; j < nextIndex; ++j) {
aggregator[j].aggregate();
Expand All @@ -653,7 +677,7 @@ private AppendableRowsAndColumns aggregateWindowApertureWellBehaved(
// End Phase 1, Enter Phase 2. At this point, nextIndex == windowSize, rowIdProvider is the same as
// upperOffset and the aggregators matrix is entirely non-null. We need to iterate until our window has all of
// the aggregators in it to fill up the final result set.
int endResultStorageIndex = numRows - windowSize;
int endResultStorageIndex = numRows - windowSize - Math.max(lowerOffset, 0);
for (; resultStorageIndex < endResultStorageIndex; ++resultStorageIndex) {
for (Aggregator[] aggregator : aggregators) {
for (Aggregator value : aggregator) {
Expand Down Expand Up @@ -698,7 +722,8 @@ private AppendableRowsAndColumns aggregateWindowApertureWellBehaved(
nextIndex = 0;
}

for (int rowId = rowIdProvider.get(); rowId < numRows; ++rowId) {
int lastRowIndex = numRows + Math.min(upperOffset, 0);
for (int rowId = rowIdProvider.get(); rowId < lastRowIndex; ++rowId) {
for (Aggregator[] aggregator : aggregators) {
for (int j = nextIndex; j < aggregator.length; ++j) {
aggregator[j].aggregate();
Expand All @@ -717,7 +742,8 @@ private AppendableRowsAndColumns aggregateWindowApertureWellBehaved(
}

// End Phase 3, anything left in the window needs to be collected and put into our results
for (; nextIndex < windowSize; ++nextIndex) {
endResultStorageIndex = numRows - Math.max(lowerOffset, 0);
for (; nextIndex < windowSize && resultStorageIndex < endResultStorageIndex; ++nextIndex) {
for (int i = 0; i < aggFactories.length; ++i) {
results[i][resultStorageIndex] = aggregators[i][nextIndex].get();
aggregators[i][nextIndex] = null;
Expand All @@ -744,11 +770,20 @@ private AppendableRowsAndColumns aggregateWindowApertureInFlux(
// a specialized implementation of this interface against, say, a Frame object that can deal with arrays instead
// of trying to optimize this generic implementation.
Object[][] results = new Object[aggFactories.length][windowSize];
int resultStorageIndex = 0;

// start the storage index after ignoring start rows if upper offset is -ve
int resultStorageIndex = -1 * Math.min(upperOffset, 0);

AtomicInteger rowIdProvider = new AtomicInteger(0);
final ColumnSelectorFactory columnSelectorFactory = ColumnSelectorFactoryMaker.fromRAC(rac).make(rowIdProvider);

if (upperOffset > 0 && lowerOffset > 0) {
// consider this case as a regular case by moving the row provider by lowerOffset number of rows
for (int i = 0; i < lowerOffset; i++) {
rowIdProvider.incrementAndGet();
}
}

Aggregator[][] aggregators = new Aggregator[aggFactories.length][windowSize];
for (int i = 0; i < aggregators.length; i++) {
final AggregatorFactory aggFactory = aggFactories[i];
Expand All @@ -757,11 +792,19 @@ private AppendableRowsAndColumns aggregateWindowApertureInFlux(
}
}

// This is the index to stop at for the current window aperture
// The first row is used by all of the results for the lowerOffset num results, plus 1 for the "current row"
int stopIndex = Math.min(lowerOffset + 1, windowSize);
int startIndex, stopIndex;
if (upperOffset < 0) {
startIndex = -1 * upperOffset;
stopIndex = Math.min(1 - lowerOffset, windowSize);
} else if (lowerOffset > 0) {
startIndex = 0;
stopIndex = Math.min(lowerOffset, windowSize);
} else {
// The first row is used by all the results for the lowerOffset num results, plus 1 for the "current row"
startIndex = 0;
stopIndex = Math.min(1 - lowerOffset, windowSize);
}

int startIndex = 0;
int rowId = rowIdProvider.get();
while (rowId < windowSize) {
for (Aggregator[] aggregator : aggregators) {
Expand All @@ -770,7 +813,7 @@ private AppendableRowsAndColumns aggregateWindowApertureInFlux(
}
}

if (rowId >= upperOffset) {
if (rowId >= upperOffset && resultStorageIndex < windowSize) {
for (int i = 0; i < aggregators.length; ++i) {
results[i][resultStorageIndex] = aggregators[i][startIndex].get();
aggregators[i][startIndex].close();
Expand All @@ -788,7 +831,7 @@ private AppendableRowsAndColumns aggregateWindowApertureInFlux(
}


for (; startIndex < windowSize; ++startIndex) {
for (; startIndex < windowSize && resultStorageIndex < windowSize; ++startIndex) {
for (int i = 0; i < aggregators.length; ++i) {
results[i][resultStorageIndex] = aggregators[i][startIndex].get();
aggregators[i][startIndex].close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void testWindowedAggregationWindowSmallerThanRows()
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);

final RowsAndColumns results = agger.aggregateAll(
new WindowFrame(WindowFrame.PeerType.ROWS, false, 1, false, 2, null),
new WindowFrame(WindowFrame.PeerType.ROWS, false, -1, false, 2, null),
new AggregatorFactory[]{
new LongSumAggregatorFactory("sumFromLong", "intCol"),
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
Expand Down Expand Up @@ -143,7 +143,7 @@ public void testWindowedAggregationWindowSmallerThanRowsOnlyLower()
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);

final RowsAndColumns results = agger.aggregateAll(
new WindowFrame(WindowFrame.PeerType.ROWS, false, 2, false, 0, null),
new WindowFrame(WindowFrame.PeerType.ROWS, false, -2, false, 0, null),
new AggregatorFactory[]{
new LongSumAggregatorFactory("sumFromLong", "intCol"),
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
Expand All @@ -169,7 +169,7 @@ public void testWindowedAggregationWindowLargerThanRows()
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);

final RowsAndColumns results = agger.aggregateAll(
new WindowFrame(WindowFrame.PeerType.ROWS, false, 5, false, 7, null),
new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 7, null),
new AggregatorFactory[]{
new LongSumAggregatorFactory("sumFromLong", "intCol"),
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
Expand Down Expand Up @@ -197,7 +197,7 @@ public void testWindowedAggregationLowerLargerThanRows()
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);

final RowsAndColumns results = agger.aggregateAll(
new WindowFrame(WindowFrame.PeerType.ROWS, false, 5, false, 1, null),
new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 1, null),
new AggregatorFactory[]{
new LongSumAggregatorFactory("sumFromLong", "intCol"),
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
Expand Down Expand Up @@ -225,7 +225,7 @@ public void testWindowedAggregationLowerLargerThanRowsNoUpper()
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);

final RowsAndColumns results = agger.aggregateAll(
new WindowFrame(WindowFrame.PeerType.ROWS, false, 5, false, 0, null),
new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 0, null),
new AggregatorFactory[]{
new LongSumAggregatorFactory("sumFromLong", "intCol"),
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
Expand Down Expand Up @@ -337,7 +337,7 @@ public void testWindowedAggregationWindowLargerThanRowsOnlyLower()
FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);

final RowsAndColumns results = agger.aggregateAll(
new WindowFrame(WindowFrame.PeerType.ROWS, false, 5, false, 0, null),
new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 0, null),
new AggregatorFactory[]{
new LongSumAggregatorFactory("sumFromLong", "intCol"),
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,16 +126,6 @@ public void validateWindow(SqlNode windowOrId, SqlValidatorScope scope, @Nullabl
);
}

if (isPrecedingOrFollowing(lowerBound) &&
isPrecedingOrFollowing(upperBound) &&
lowerBound.getKind() == upperBound.getKind()) {
// this limitation can be lifted when https://github.com/apache/druid/issues/15739 is addressed
throw buildCalciteContextException(
"Query bounds with both lower and upper bounds as PRECEDING or FOLLOWING is not supported.",
windowOrId
);
}

boolean hasBounds = lowerBound != null || upperBound != null;
if (call.getKind() == SqlKind.NTILE && hasBounds) {
throw buildCalciteContextException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ public WindowFrame getWindowFrame()
if (group.lowerBound.isUnbounded() && group.upperBound.isUnbounded()) {
return WindowFrame.unbounded();
}

return new WindowFrame(
group.isRows ? WindowFrame.PeerType.ROWS : WindowFrame.PeerType.RANGE,
group.lowerBound.isUnbounded(),
Expand All @@ -374,7 +375,8 @@ private int figureOutOffset(RexWindowBound bound)
if (bound.isUnbounded() || bound.isCurrentRow()) {
return 0;
}
return getConstant(((RexInputRef) bound.getOffset()).getIndex());
final int value = getConstant(((RexInputRef) bound.getOffset()).getIndex());
return bound.isPreceding() ? -value : value;
}

private int getConstant(int refIndex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ expectedOperators:
frame:
peerType: "RANGE"
lowUnbounded: false
lowOffset: 3
lowOffset: -3
uppUnbounded: false
uppOffset: 2
orderBy:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ expectedOperators:
frame:
peerType: "RANGE"
lowUnbounded: false
lowOffset: 3
lowOffset: -3
uppUnbounded: false
uppOffset: 2
orderBy: [ {column: "d1", direction: ASC} ]
Expand Down
16 changes: 16 additions & 0 deletions sql/src/test/resources/calcite/tests/window/window_range_1.sqlTest
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
type: "operatorValidation"

sql: |
WITH virtual_table as (select m2 ,sum(m1) as summ1
from foo group by 1 order by summ1 DESC)
select m2,summ1,sum(summ1) OVER (order by m2 rows between 1 PRECEDING and 2 FOLLOWING) as sumfinal
from virtual_table order by 1


expectedResults:
- [1.0,1.0,6.0]
- [2.0,2.0,10.0]
- [3.0,3.0,14.0]
- [4.0,4.0,18.0]
- [5.0,5.0,15.0]
- [6.0,6.0,11.0]