-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-27618][sql] Flink supports CUME_DIST function #19727
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
05d76dd to
44e9bc4
Compare
43e7fba to
6e3706f
Compare
|
@flinkbot run azure |
|
@flinkbot run azure |
wuchong
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank @luoyuxia for the contribution. I have left some comments.
| if (expression instanceof UnresolvedReferenceExpression) { | ||
| UnresolvedReferenceExpression expr = (UnresolvedReferenceExpression) expression; | ||
| String name = expr.getName(); | ||
| if (function instanceof SizeBasedWindowFunction |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO
| public interface SizeBasedWindowFunction { | ||
|
|
||
| /** The field for the window size. */ | ||
| UnresolvedReferenceExpression windowSize(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A window function should have only a context-provided window size expression. So we don't need to let users define the expression. The interface can pre-define one. In addition, the expression can be a resolved one, to make it easy to use when code generation (don't need toWindowSizeExpr). For example:
default ResolvedExpression windowSizeAttribute() {
return localRef("window_size", DataTypes.INT());
}Then devs dont' need to implement this method and can directly be used just like method operand(i).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really a good suggestion!
...anner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
Outdated
Show resolved
Hide resolved
...ink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggCodeGen.scala
Outdated
Show resolved
Hide resolved
...ntime/src/main/java/org/apache/flink/table/runtime/generated/AggsFunctionWithWindowSize.java
Outdated
Show resolved
Hide resolved
...ntime/src/main/java/org/apache/flink/table/runtime/generated/AggsFunctionWithWindowSize.java
Outdated
Show resolved
Hide resolved
|
|
||
| override def setWindowSize(generator: ExprCodeGenerator): String = { | ||
| throw new TableException( | ||
| "Distinct shouldn't set window size, this is a bug, please file a issue.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible that COUNT(DISTINCT) OVER ... may hit this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| } | ||
|
|
||
| @Test | ||
| def testCumeDist(): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add additional tests to cover values of order-key containing duplicates.
e206652 to
e94b0d2
Compare
|
@wuchong Thanks for review. I have updated this pr. |
| initialAggregateInformation(aggInfoList) | ||
|
|
||
| // generates all methods body first to add necessary reuse code to context | ||
| val setWindowSizeCode = if (isWindowSizeNeeded) genSetWindowSize() else "" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please follow the way of genRetract() to generate throw exception when window size is not needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on the current implementation, we will always call method setWindowSize, so we can't generate the code to throw exception . But I add some comments for this method genSetWindowSize.
| * Set window size for the aggregate function. Some aggregate functions may requires the size of | ||
| * current window to do calculation. | ||
| */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add more explanation about the "window size", it may confuse devs the relationship with tumble/hop window size.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
wuchong
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
What is the purpose of the change
To make Flink support
CUME_DISTfunction.Brief change log
CUME_DISTin FlinkSqlOperatorTable.javasetWindowSizein ...le-runtime/src/main/java/org/apache/flink/table/runtime/generated/AggsHandleFunction.java which enable to generatesetWindowSizefunction in code gen phase.SizeBasedWindowFunction, the methodsetWindowSizewill really set window's size.setWindowSizeto set the window size.SizeBasedWindowFunctionforCUME_DIST.Verifying this change
Test in OverAggregateITCase.scala
Does this pull request potentially affect one of the following parts:
@Public(Evolving): noDocumentation