handling window frame selection in case of same bound kind #16231
Open: sreemanamala wants to merge 9 commits into apache:master from sreemanamala:range-window
Commits:
f5a5fcd Initial commit to mark negative indices for preceding in windows (somu-imply)
8a6a9d8 Updating error message and handling (somu-imply)
014f29a fixing some tests (somu-imply)
f8d283a Minor refactoring (somu-imply)
4267443 new branch (sreemanamala)
785a403 Merge pull request #6 from somu-imply/rangeWindow (sreemanamala)
dded3bf handle -ve offset in computing cummulative aggregate (sreemanamala)
2451ddb handle +ve offset in computing cummulative reverse aggregate (sreemanamala)
88ed0fa handle window apertures in case of same bound kinds (sreemanamala)
@@ -75,7 +75,7 @@ public RowsAndColumns aggregateAll(
     int lowerOffset = frame.getLowerOffset();
     int upperOffset = frame.getUpperOffset();

-    if (numRows < lowerOffset + upperOffset + 1) {
+    if (numRows < upperOffset - lowerOffset + 1) {
       // In this case, there are not enough rows to completely build up the full window aperture before it needs to
       // also start contracting the aperture because of the upper offset. So we use a method that specifically
       // handles checks for both expanding and reducing the aperture on every iteration.
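The corrected condition only makes sense if both offsets are signed positions relative to the current row (negative for PRECEDING, positive for FOLLOWING), which is what the rest of this diff suggests. A minimal sketch of that arithmetic under this assumption; the class and method names are hypothetical and not part of Druid:

```java
// Hypothetical helper, not Druid code: illustrates the signed-offset window size.
final class WindowSizeSketch {
  // Assumption: offsets are signed row positions relative to the current row,
  // negative = PRECEDING, positive = FOLLOWING.
  static int windowSize(int lowerOffset, int upperOffset) {
    return upperOffset - lowerOffset + 1;
  }

  // Mirrors the corrected check: true when the partition has fewer rows than one full window.
  static boolean apertureInFlux(int numRows, int lowerOffset, int upperOffset) {
    return numRows < windowSize(lowerOffset, upperOffset);
  }

  public static void main(String[] args) {
    // ROWS BETWEEN 1 PRECEDING AND 2 FOLLOWING -> lower = -1, upper = 2
    System.out.println(windowSize(-1, 2));        // 4
    // ROWS BETWEEN 3 PRECEDING AND 1 PRECEDING -> lower = -3, upper = -1
    System.out.println(windowSize(-3, -1));       // 3
    // Three rows cannot hold a four-row window.
    System.out.println(apertureInFlux(3, -1, 2)); // true
  }
}
```

With the old expression (lowerOffset + upperOffset + 1) and a negative lower offset, the first case would have produced 2 instead of 4.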
@@ -187,7 +187,7 @@ public AggInterval next()
                 groupToRowIndex(relativeGroupId(1))
             ),
             Interval.of(
-                groupToRowIndex(relativeGroupId(-lowerOffset)),
+                groupToRowIndex(relativeGroupId(lowerOffset < 0 ? lowerOffset : -lowerOffset)),
                 groupToRowIndex(relativeGroupId(upperOffset))
             )
         );
@@ -434,7 +434,9 @@ private AppendableRowsAndColumns computeCumulativeAggregates(AggregatorFactory[]
     // a specialized implementation of this interface against, say, a Frame object that can deal with arrays instead
     // of trying to optimize this generic implementation.
     Object[][] results = new Object[aggFactories.length][numRows];
-    int resultStorageIndex = 0;
+
+    // if the upper offset is -ve, then we need to ignore those many rows prior to the current row
+    int resultStorageIndex = -1 * Math.min(upperOffset, 0);
Review comment: nit: why not use an unary

     AtomicInteger rowIdProvider = new AtomicInteger(0);
     final ColumnSelectorFactory columnSelectorFactory = ColumnSelectorFactoryMaker.fromRAC(rac).make(rowIdProvider);
@@ -455,7 +457,7 @@ private AppendableRowsAndColumns computeCumulativeAggregates(AggregatorFactory[]
     }

     // Prime the results
-    if (rowIdProvider.get() < numRows) {
+    if (rowIdProvider.get() < numRows && resultStorageIndex < numRows) {
       for (int i = 0; i < aggs.length; i++) {
         aggs[i].aggregate();
         results[i][resultStorageIndex] = aggs[i].get();
@@ -468,7 +470,8 @@ private AppendableRowsAndColumns computeCumulativeAggregates(AggregatorFactory[]
     }

     // From here out, we want to aggregate, peel off a row of results and then accumulate the aggregation
-    for (int rowId = rowIdProvider.get(); rowId < numRows; ++rowId) {
+    int lastRowIndex = numRows + Math.min(upperOffset, 0);
+    for (int rowId = rowIdProvider.get(); rowId < lastRowIndex; ++rowId) {
       for (int i = 0; i < aggs.length; i++) {
         aggs[i].aggregate();
         results[i][resultStorageIndex] = aggs[i].get();
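The effect of the shifted resultStorageIndex and the shortened loop bound can be seen in isolation with a plain running sum. The sketch below is an illustration only: it replaces Druid's Aggregator objects with a long accumulator, covers only a non-positive upper offset (a frame from the first row up to some number of rows PRECEDING), and uses hypothetical names.

```java
// Hypothetical sketch, not Druid code: running sum for a frame that ends
// |upperOffset| rows before the current row (upperOffset <= 0 assumed).
final class CumulativeSketch {
  static Long[] cumulativeSum(long[] values, int upperOffset) {
    int numRows = values.length;
    Long[] results = new Long[numRows]; // rows with an empty frame stay null

    // Skip the first |upperOffset| result slots: those rows have no rows in their frame.
    int resultStorageIndex = -Math.min(upperOffset, 0);
    // Stop reading early: the last |upperOffset| input rows never enter any frame.
    int lastRowIndex = numRows + Math.min(upperOffset, 0);

    long running = 0;
    for (int rowId = 0; rowId < lastRowIndex; ++rowId) {
      running += values[rowId];
      results[resultStorageIndex++] = running;
    }
    return results;
  }

  public static void main(String[] args) {
    // Frame ends 1 PRECEDING: row r sees rows 0..r-1.
    System.out.println(java.util.Arrays.toString(cumulativeSum(new long[]{1, 2, 3, 4}, -1)));
    // prints [null, 1, 3, 6]
  }
}
```

When |upperOffset| exceeds the number of rows, lastRowIndex becomes negative, the loop never runs, and every result stays null, which is why the diff also guards the priming step with resultStorageIndex < numRows.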
@@ -521,7 +524,9 @@ private AppendableRowsAndColumns computeReverseCumulativeAggregates(AggregatorFa
     // a specialized implementation of this interface against, say, a Frame object that can deal with arrays instead
     // of trying to optimize this generic implementation.
     Object[][] results = new Object[aggFactories.length][numRows];
-    int resultStorageIndex = numRows - 1;
+
+    // if the lower offset is +ve, then we need to ignore those many rows following to the current row
+    int resultStorageIndex = numRows - 1 - Math.max(lowerOffset, 0);

     AtomicInteger rowIdProvider = new AtomicInteger(numRows - 1);
     final ColumnSelectorFactory columnSelectorFactory = ColumnSelectorFactoryMaker.fromRAC(rac).make(rowIdProvider);
@@ -534,15 +539,15 @@ private AppendableRowsAndColumns computeReverseCumulativeAggregates(AggregatorFa
     }

     // If there is a lower offset, we accumulate those aggregations before starting to generate results
-    for (int i = 0; i < lowerOffset; ++i) {
+    for (int i = 0; i < -1 * lowerOffset; ++i) {
       for (Aggregator agg : aggs) {
         agg.aggregate();
       }
       rowIdProvider.decrementAndGet();
     }

     // Prime the results
-    if (rowIdProvider.get() >= 0) {
+    if (rowIdProvider.get() >= 0 && resultStorageIndex >= 0) {
       for (int i = 0; i < aggs.length; i++) {
         aggs[i].aggregate();
         results[i][resultStorageIndex] = aggs[i].get();
@@ -555,7 +560,8 @@ private AppendableRowsAndColumns computeReverseCumulativeAggregates(AggregatorFa
     }

     // From here out, we want to aggregate, peel off a row of results and then accumulate the aggregation
-    for (int rowId = rowIdProvider.get(); rowId >= 0; --rowId) {
+    int firstRowIndex = Math.max(lowerOffset, 0);
+    for (int rowId = rowIdProvider.get(); rowId >= firstRowIndex; --rowId) {
       for (int i = 0; i < aggs.length; i++) {
         aggs[i].aggregate();
         results[i][resultStorageIndex] = aggs[i].get();
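The reverse pass mirrors the forward one: with a positive lower offset the frame starts some rows after the current row, so the last result slots are skipped and the first input rows are never read. A minimal sketch under the same simplifications as before (plain long sum instead of Aggregator objects, only a non-negative lower offset handled, hypothetical names):

```java
// Hypothetical sketch, not Druid code: reverse running sum for a frame that starts
// lowerOffset rows after the current row and runs to the last row (lowerOffset >= 0 assumed).
final class ReverseCumulativeSketch {
  static Long[] reverseCumulativeSum(long[] values, int lowerOffset) {
    int numRows = values.length;
    Long[] results = new Long[numRows]; // rows with an empty frame stay null

    // Skip the last lowerOffset result slots: those rows have nothing following them in frame.
    int resultStorageIndex = numRows - 1 - Math.max(lowerOffset, 0);
    // Stop early: the first lowerOffset input rows are never part of any frame.
    int firstRowIndex = Math.max(lowerOffset, 0);

    long running = 0;
    for (int rowId = numRows - 1; rowId >= firstRowIndex; --rowId) {
      running += values[rowId];
      results[resultStorageIndex--] = running;
    }
    return results;
  }

  public static void main(String[] args) {
    // Frame starts 1 FOLLOWING: row r sees rows r+1..n-1.
    System.out.println(java.util.Arrays.toString(reverseCumulativeSum(new long[]{1, 2, 3, 4}, 1)));
    // prints [9, 7, 4, null]
  }
}
```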
@@ -606,23 +612,41 @@ private AppendableRowsAndColumns aggregateWindowApertureWellBehaved(
     // 2. Our window is full, as we walk the rows we take a value off and add a new aggregation
     // 3. We are nearing the end of the rows, we need to start shrinking the window aperture

+
     int numRows = rac.numRows();
-    int windowSize = lowerOffset + upperOffset + 1;
+    int windowSize = upperOffset + 1 - lowerOffset;

     // We store the results in an Object array for convenience. This is definitely sub-par from a memory management
     // point of view as we should use native arrays when possible. This will be fine for now, but it probably makes
     // sense to look at optimizing this in the future. That said, such an optimization might best come by having
     // a specialized implementation of this interface against, say, a Frame object that can deal with arrays instead
     // of trying to optimize this generic implementation.
     Object[][] results = new Object[aggFactories.length][numRows];
-    int resultStorageIndex = 0;
+
+    // start the storage index after ignoring start rows if upper offset is -ve
+    int resultStorageIndex = -1 * Math.min(upperOffset, 0);

     AtomicInteger rowIdProvider = new AtomicInteger(0);
     final ColumnSelectorFactory columnSelectorFactory = ColumnSelectorFactoryMaker.fromRAC(rac).make(rowIdProvider);

     // This is the number of aggregators to actually aggregate for the current row.
     // Which also doubles as the nextIndex to roll through as we roll things in and out of the window
-    int nextIndex = lowerOffset + 1;
+    int nextIndex;
+    int upperLimit;
+    if (upperOffset > 0 && lowerOffset > 0) {
+      // consider this case as a regular case by moving the row provider by lowerOffset number of rows
+      for (int i = 0; i < lowerOffset; i++) {
+        rowIdProvider.incrementAndGet();
+      }
+      nextIndex = 1;
+      upperLimit = upperOffset - lowerOffset;
+    } else if (lowerOffset < 0 && upperOffset < 0) {
+      nextIndex = upperOffset - lowerOffset + 1;
+      upperLimit = 0;
+    } else {
+      nextIndex = 1 - lowerOffset;
+      upperLimit = upperOffset;
+    }

     Aggregator[][] aggregators = new Aggregator[aggFactories.length][windowSize];
     for (int i = 0; i < aggregators.length; i++) {
@@ -636,7 +660,7 @@ private AppendableRowsAndColumns aggregateWindowApertureWellBehaved(
     // The first few rows will slowly build out the window to consume the upper-offset. The window will not
     // be full until we have walked upperOffset number of rows, so phase 1 runs until we have consumed
     // upperOffset number of rows.
-    for (int upperIndex = 0; upperIndex < upperOffset; ++upperIndex) {
+    for (int upperIndex = 0; upperIndex < upperLimit && rowIdProvider.get() < numRows; ++upperIndex) {
       for (Aggregator[] aggregator : aggregators) {
         for (int j = 0; j < nextIndex; ++j) {
           aggregator[j].aggregate();
@@ -653,7 +677,7 @@ private AppendableRowsAndColumns aggregateWindowApertureWellBehaved(
     // End Phase 1, Enter Phase 2. At this point, nextIndex == windowSize, rowIdProvider is the same as
     // upperOffset and the aggregators matrix is entirely non-null. We need to iterate until our window has all of
     // the aggregators in it to fill up the final result set.
-    int endResultStorageIndex = numRows - windowSize;
+    int endResultStorageIndex = numRows - windowSize - Math.max(lowerOffset, 0);
     for (; resultStorageIndex < endResultStorageIndex; ++resultStorageIndex) {
       for (Aggregator[] aggregator : aggregators) {
         for (Aggregator value : aggregator) {
@@ -698,7 +722,8 @@ private AppendableRowsAndColumns aggregateWindowApertureWellBehaved(
       nextIndex = 0;
     }

-    for (int rowId = rowIdProvider.get(); rowId < numRows; ++rowId) {
+    int lastRowIndex = numRows + Math.min(upperOffset, 0);
+    for (int rowId = rowIdProvider.get(); rowId < lastRowIndex; ++rowId) {
       for (Aggregator[] aggregator : aggregators) {
         for (int j = nextIndex; j < aggregator.length; ++j) {
           aggregator[j].aggregate();
@@ -717,7 +742,8 @@ private AppendableRowsAndColumns aggregateWindowApertureWellBehaved(
     }

     // End Phase 3, anything left in the window needs to be collected and put into our results
-    for (; nextIndex < windowSize; ++nextIndex) {
+    endResultStorageIndex = numRows - Math.max(lowerOffset, 0);
+    for (; nextIndex < windowSize && resultStorageIndex < endResultStorageIndex; ++nextIndex) {
       for (int i = 0; i < aggFactories.length; ++i) {
         results[i][resultStorageIndex] = aggregators[i][nextIndex].get();
         aggregators[i][nextIndex] = null;
@@ -744,11 +770,20 @@ private AppendableRowsAndColumns aggregateWindowApertureInFlux(
     // a specialized implementation of this interface against, say, a Frame object that can deal with arrays instead
     // of trying to optimize this generic implementation.
     Object[][] results = new Object[aggFactories.length][windowSize];
-    int resultStorageIndex = 0;
+
+    // start the storage index after ignoring start rows if upper offset is -ve
+    int resultStorageIndex = -1 * Math.min(upperOffset, 0);

     AtomicInteger rowIdProvider = new AtomicInteger(0);
     final ColumnSelectorFactory columnSelectorFactory = ColumnSelectorFactoryMaker.fromRAC(rac).make(rowIdProvider);

+    if (upperOffset > 0 && lowerOffset > 0) {
+      // consider this case as a regular case by moving the row provider by lowerOffset number of rows
+      for (int i = 0; i < lowerOffset; i++) {
+        rowIdProvider.incrementAndGet();
+      }
+    }
+
     Aggregator[][] aggregators = new Aggregator[aggFactories.length][windowSize];
     for (int i = 0; i < aggregators.length; i++) {
       final AggregatorFactory aggFactory = aggFactories[i];
@@ -757,11 +792,19 @@ private AppendableRowsAndColumns aggregateWindowApertureInFlux(
       }
     }

-    // This is the index to stop at for the current window aperture
-    // The first row is used by all of the results for the lowerOffset num results, plus 1 for the "current row"
-    int stopIndex = Math.min(lowerOffset + 1, windowSize);
+    int startIndex, stopIndex;
+    if (upperOffset < 0) {
+      startIndex = -1 * upperOffset;
+      stopIndex = Math.min(1 - lowerOffset, windowSize);
+    } else if (lowerOffset > 0) {
+      startIndex = 0;
+      stopIndex = Math.min(lowerOffset, windowSize);
+    } else {
+      // The first row is used by all the results for the lowerOffset num results, plus 1 for the "current row"
+      startIndex = 0;
+      stopIndex = Math.min(1 - lowerOffset, windowSize);
+    }

-    int startIndex = 0;
     int rowId = rowIdProvider.get();
     while (rowId < windowSize) {
       for (Aggregator[] aggregator : aggregators) {
@@ -770,7 +813,7 @@ private AppendableRowsAndColumns aggregateWindowApertureInFlux(
         }
       }

-      if (rowId >= upperOffset) {
+      if (rowId >= upperOffset && resultStorageIndex < windowSize) {
         for (int i = 0; i < aggregators.length; ++i) {
           results[i][resultStorageIndex] = aggregators[i][startIndex].get();
           aggregators[i][startIndex].close();
@@ -788,7 +831,7 @@ private AppendableRowsAndColumns aggregateWindowApertureInFlux(
     }


-    for (; startIndex < windowSize; ++startIndex) {
+    for (; startIndex < windowSize && resultStorageIndex < windowSize; ++startIndex) {
       for (int i = 0; i < aggregators.length; ++i) {
         results[i][resultStorageIndex] = aggregators[i][startIndex].get();
         aggregators[i][startIndex].close();
sql/src/test/resources/calcite/tests/window/window_range_1.sqlTest (16 changes: 16 additions & 0 deletions)
@@ -0,0 +1,16 @@
+type: "operatorValidation"
+
+sql: |
+  WITH virtual_table as (select m2 ,sum(m1) as summ1
+  from foo group by 1 order by summ1 DESC)
+  select m2,summ1,sum(summ1) OVER (order by m2 rows between 1 PRECEDING and 2 FOLLOWING) as sumfinal
+  from virtual_table order by 1
+
+
+expectedResults:
+  - [1.0,1.0,6.0]
+  - [2.0,2.0,10.0]
+  - [3.0,3.0,14.0]
+  - [4.0,4.0,18.0]
+  - [5.0,5.0,15.0]
+  - [6.0,6.0,11.0]
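The expected results can be cross-checked with a brute-force reference that sums each frame directly. This is a sketch for verification only, not how Druid evaluates the query: it assumes the summ1 column holds 1.0 through 6.0 in m2 order (as the second column of expectedResults shows), uses signed offsets (-1 for 1 PRECEDING, 2 for 2 FOLLOWING), and returns 0.0 rather than SQL NULL for an empty frame. The class name is hypothetical.

```java
import java.util.Arrays;

// Hypothetical reference implementation, not Druid code.
final class FramedSumSketch {
  // For every row r, sums values[r + lowerOffset .. r + upperOffset], clipped to the partition.
  static double[] framedSum(double[] values, int lowerOffset, int upperOffset) {
    double[] out = new double[values.length];
    for (int r = 0; r < values.length; r++) {
      int from = Math.max(0, r + lowerOffset);
      int to = Math.min(values.length - 1, r + upperOffset);
      double sum = 0;
      for (int i = from; i <= to; i++) {
        sum += values[i];
      }
      out[r] = sum;
    }
    return out;
  }

  public static void main(String[] args) {
    double[] summ1 = {1.0, 2.0, 3.0, 4.0, 5.0, 6.0};
    // rows between 1 PRECEDING and 2 FOLLOWING
    System.out.println(Arrays.toString(framedSum(summ1, -1, 2)));
    // prints [6.0, 10.0, 14.0, 18.0, 15.0, 11.0]
  }
}
```

The output matches the third column of expectedResults row for row.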
Review comment: this doesn't look right - this is effectively:
-Math.abs(lowerOffset)
this part was already working with relative offsets, so I think it should be simply
relativeGroupId(lowerOffset)
- didn't that work?
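The reviewer's reading of the ternary can be checked directly. The sketch below compares the expression from the diff with -Math.abs(lowerOffset); the class and method names are hypothetical and exist only for this comparison.

```java
// Hypothetical comparison, not Druid code.
final class LowerOffsetSketch {
  // The expression introduced in the diff.
  static int fromDiff(int lowerOffset) {
    return lowerOffset < 0 ? lowerOffset : -lowerOffset;
  }

  // The reviewer's paraphrase of it.
  static int viaAbs(int lowerOffset) {
    return -Math.abs(lowerOffset);
  }

  public static void main(String[] args) {
    for (int offset = -3; offset <= 3; offset++) {
      System.out.println(offset + " -> " + fromDiff(offset) + " / " + viaAbs(offset));
    }
  }
}
```

Both forms map a positive lower offset (a frame starting some rows FOLLOWING) to a negative one, so the frame start would land before the current row instead of after it. That sign flip appears to be what the reviewer is objecting to when suggesting that lowerOffset be passed through unchanged.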