New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-5047] [table] Add sliding group-windows for batch tables #3364
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @twalthr.
I think it looks pretty good. Can we split the PR into two time and count windows? There are lots for different cases to consider when reviewing the code. The PR also needs to be rebased to the current master and the new interface for AggregationFunctions.
After rebasing, the time windows should be pretty much good to merge.
The missing parts for the count windows could be addressed in a second PR.
What do you think?
Fabian
asLong(slide), | ||
returnType) | ||
} else { | ||
// for non-incremental aggregations |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this case? Without partial aggregation, we cannot use tumbling windows to compute sliding windows. So this method is only called if all aggregates support partial aggregation, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right. I will remove this case distinction.
if (start > windowStart - windowSize) { | ||
|
||
// prepare output | ||
for (i <- aggregates.indices) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this just copying data from record to the aggregateBuffer?
Doesn't the input record have the same schema as the output record? Isn't is sufficient emit the input record multiple times with adapted outWindowStartIndex
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right. Will simplify that.
* @param windowSlide window slide of the sliding window | ||
* @param returnType return type of this function | ||
*/ | ||
class DataSetSlideCountWindowAggReduceGroupFunction( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it makes sense to add forward field annotations for the internal operators.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will create an issue for this.
} | ||
|
||
// this code path should never be reached as we return before the loop finishes | ||
throw new IllegalArgumentException("Group is empty. This should never happen.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be removed. Combine is only called if there is at least one record.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I cannot remove it. The compiler complains otherwise.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, but how about we make that more clear with a comment like:
// This will never happen because the iterator is never null but we have to satisfy the compiler.
private var outWindowStartIndex: Int = _ | ||
|
||
override def open(config: Configuration) { | ||
Preconditions.checkNotNull(aggregates) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can move everything into the constructor and remove the open()
method.
} | ||
|
||
// this code path should never be reached as we return before the loop finishes | ||
throw new IllegalArgumentException("Group is empty. This should never happen.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can be removed. It's the responsibility of the DataSet API to call the user functions correctly. And even if it would not, this function would behave correctly.
private var outWindowStartIndex: Int = _ | ||
|
||
override def open(config: Configuration) { | ||
Preconditions.checkNotNull(aggregates) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move checks and initialization to constructor and remove open()
?
private var output: Row = _ | ||
|
||
override def open(config: Configuration) { | ||
Preconditions.checkNotNull(aggregates) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Except for the initialization of TimeWindowPropertyCollector
everything can be moved into the constructor.
@@ -360,6 +360,19 @@ class GroupWindowTest extends TableTestBase { | |||
.window(Session withGap 7.milli as 'w) // require on a time attribute | |||
.groupBy('string, 'w) | |||
.select('string, 'int.count) | |||
|
|||
val expected = unaryNode( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you add the expected result to a test which is expected to fail?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems that I haven't seen the exception. Thanks.
@@ -146,42 +146,9 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase { | |||
"Hi,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005") | |||
assertEquals(expected.sorted, StreamITCase.testResults.sorted) | |||
} | |||
|
|||
@Test | |||
def testEventTimeSlidingWindow(): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this test subsumed by DataStreamAggregateITCase
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
@fhueske thanks for your comments. I will create a time window PR. |
Thanks for the feedback @fhueske. I updated the PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @twalthr,
thanks for the update. I had a few minor comments.
Feel free to merge once these are resolved.
Cheers, Fabian
grouping.length) | ||
|
||
// only pre-tumble if it is worth it | ||
val littleTumblingSize = determineLargestTumblingSize(size, slide) <= 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isLittleTumblingSize
|
||
Preconditions.checkNotNull(aggregatesLength) | ||
|
||
private var intermediateRow: Row = _ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can be removed because we are replicating the input record.
// add one field to store window start | ||
private val intermediateRowArity: Int = groupingKeysLength + aggregatesLength + 1 | ||
|
||
override def open(config: Configuration) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
open()
can be removed
|
||
private var intermediateRow: Row = _ | ||
// add one field to store window start | ||
private val intermediateRowArity: Int = groupingKeysLength + aggregatesLength + 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can be removed
* @param returnType return type of this function | ||
*/ | ||
class DataSetSlideTimeWindowAggFlatMapFunction( | ||
private val aggregatesLength: Int, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
aggregatesLength
and groupingKeysLength
can be removed
isParserCaseSensitive: Boolean) | ||
: FlatMapFunction[Row, Row] = { | ||
|
||
val aggregates = transformToAggregateFunctions( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not needed because input type = output type
*/ | ||
def createDataSetSlideWindowPrepareFlatMapFunction( | ||
window: LogicalWindow, | ||
namedAggregates: Seq[CalcitePair[AggregateCall, String]], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove unnecessary parameters namedAggregates
and groupings
* @param windowSlide window slide of the sliding window | ||
* @param returnType return type of this function | ||
*/ | ||
class DataSetSlideTimeWindowAggReduceCombineFunction( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be merged with DataSetSlideTimeWindowAggReduceGroupFunction
because both can only be used if input is combinable.
@@ -112,12 +112,10 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction( | |||
} | |||
|
|||
// get final aggregate value and set to output. | |||
aggregateMapping.foreach { | |||
case (after, previous) => { | |||
aggregateMapping.foreach { case (after, previous) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
revert this change? Will be fixed with #3489
out.collect(record) | ||
var i = 0 | ||
while (i < record.getArity) { | ||
output.setField(i, record.getField(0)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
record.getField(0)
-> record.getField(i)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point!
This PR implements sliding group-windows. It covers the following cases:
All windows support the overlapping and non-overlapping case. Slide windows are pre-tumbled if possible. This PR also fixes some bugs.
If the general design is ok, I will also implement the missing non-incremental aggregates for count-windows. And add a bit of documentation.