[FLINK-10845][table] Support multiple different DISTINCT aggregates for batch #7079
@xueyumusic Hi, thanks for the PR. The change looks good from my side. I only left some minor suggestions about test improvement.
Best, Hequn
|boolean $nullTerm = false;
|$resultTypeTerm $resultTerm;
|if (${left.nullTerm} || ${right.nullTerm}) {
|  $resultTerm = ${left.nullTerm} && ${right.nullTerm};
Add some null data to the tests to exercise this logic.
"GROUP BY b " +
"ORDER BY b"

val data = new scala.collection.mutable.MutableList[(Int, Long, String)]
Replace this with CollectionDataSets.get5TupleDataSet(env). I think we can add a CASE WHEN to generate null data. For example (just an example):
val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
tEnv.registerTable("MyTable", t)
val sqlAddNull = "SELECT a, b, c, e, CASE d WHEN 'Hallo' THEN null ELSE d END AS d FROM MyTable"
val sqlQuery =
"SELECT d, " +
" COUNT(DISTINCT d), " +
" COUNT(DISTINCT e) " +
s"FROM ($sqlAddNull) " +
"GROUP BY d " +
"ORDER BY d"
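To see why null data matters for this test, here is a minimal sketch of the SQL semantics involved, modeled on plain Scala collections rather than on Flink. The rows and the `CountDistinctWithNulls` object are hypothetical, and `None` stands in for SQL NULL. The sketch assumes the standard behavior that GROUP BY places all NULL keys into one group and that COUNT(DISTINCT x) skips NULL inputs.

```scala
object CountDistinctWithNulls {
  // Hypothetical rows (d, e); None stands in for SQL NULL.
  val rows: Seq[(Option[String], Option[Long])] = Seq(
    (None, Some(1L)),
    (None, Some(2L)),
    (Some("Hello"), Some(2L)),
    (Some("Hello"), Some(2L)),
    (Some("Hello"), None)
  )

  // GROUP BY d: all NULL keys land in the same group.
  // COUNT(DISTINCT x): NULL inputs are dropped before counting.
  def countDistinctPerGroup(
      data: Seq[(Option[String], Option[Long])]): Map[Option[String], (Int, Int)] =
    data.groupBy(_._1).map { case (key, group) =>
      val distinctD = group.flatMap(_._1).distinct.size
      val distinctE = group.flatMap(_._2).distinct.size
      key -> ((distinctD, distinctE))
    }
}
```

With these rows, the NULL group reports zero distinct values of d but two distinct values of e, which is the kind of case the suggested CASE WHEN data is meant to cover.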
Updated. Thank you very much for the review and suggestions, @hequn8128.
@xueyumusic thanks for the update. Looks good from my side.
Thank you @xueyumusic. I added some comments.
|boolean $nullTerm = false;
|$resultTypeTerm $resultTerm;
|if (${left.nullTerm} || ${right.nullTerm}) {
|  $resultTerm = ${left.nullTerm} && ${right.nullTerm};
This implicitly assumes that the result term is of type boolean which might not always be the case. This line looks like a bug to me.
  left: GeneratedExpression,
- right: GeneratedExpression)
+ right: GeneratedExpression,
+ compareNull: Boolean = false)
Use explicit parameters for all methods in code generation instead of default values.
val sqlQuery =
"SELECT b, " +
" COUNT(DISTINCT b), " +
Could you also add an IS NOT DISTINCT FROM expression test to org.apache.flink.table.expressions.ScalarOperatorsTest?
Thanks for the review, @twalthr. I updated the code.
  left: GeneratedExpression,
- right: GeneratedExpression)
+ right: GeneratedExpression,
+ compareNull: Boolean)
@xueyumusic After looking at the amount of changes, I would suggest leaving generateOperatorIfNotNull untouched. This method is correct as it is. If you need some special null comparison logic, we can create a specialized method only for equals. We could create a method generateDistinctFrom that wraps generateEquals. What do you think?
Yes, I think so. generateOperatorIfNotNull is a low-level common method. I updated the code, thanks @twalthr.
@xueyumusic I just found the following rule
Hi @twalthr, I tried simply by putting
Thanks for the update @xueyumusic. Then let's at least do my other comment.
@xueyumusic I think this solution is not correct. I think we should simply code generate:
CASE WHEN x IS NULL THEN y IS NULL WHEN y IS NULL THEN x IS NULL ELSE x = y END
This is also what Calcite's rule does.
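The CASE expression above can be sketched in plain Scala as follows. This only illustrates the semantics and is not generated Flink code. `NullSafeEquality` and `isNotDistinctFrom` are hypothetical names, and `Option` stands in for a nullable SQL value.

```scala
object NullSafeEquality {
  // None stands in for SQL NULL. Mirrors:
  //   CASE WHEN x IS NULL THEN y IS NULL
  //        WHEN y IS NULL THEN x IS NULL
  //        ELSE x = y END
  def isNotDistinctFrom[A](x: Option[A], y: Option[A]): Boolean =
    if (x.isEmpty) y.isEmpty
    else if (y.isEmpty) x.isEmpty // x is non-null here, so this yields false
    else x.get == y.get
}
```

Unlike plain `=`, the result is always a definite true or false, even when one or both operands are NULL.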
left: GeneratedExpression,
right: GeneratedExpression)
: GeneratedExpression = {
val newleft = left.copy(nullTerm = GeneratedExpression.NEVER_NULL)
This does not look correct to me. By setting the expression to never null, you basically compare the default values of the expressions; a Long has 0L, for example. So 0 IS NOT DISTINCT FROM NULL might return true.
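A small sketch of this pitfall, assuming a simplified runtime model in which a generated expression yields a primitive value plus a null flag, and a NULL long carries the default 0L. `LongResult` and both comparison functions are hypothetical and only illustrate the concern.

```scala
object NeverNullPitfall {
  // Simplified runtime view of a generated expression: a primitive result
  // plus a null flag. A NULL value carries the primitive default (0L).
  final case class LongResult(value: Long, isNull: Boolean)

  val sqlNull: LongResult = LongResult(0L, isNull = true)

  // Mimics forcing both sides to "never null": the null flags are ignored,
  // so only the (possibly default) primitive values are compared.
  def equalsIgnoringNullFlags(l: LongResult, r: LongResult): Boolean =
    l.value == r.value

  // Null-aware comparison following the CASE WHEN form from the review.
  def isNotDistinctFrom(l: LongResult, r: LongResult): Boolean =
    if (l.isNull) r.isNull
    else if (r.isNull) false
    else l.value == r.value
}
```

Comparing a non-null 0 with `sqlNull` returns true when the flags are ignored, while the null-aware version returns false.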
@twalthr I tried a similar approach earlier and ran into a problem. To check whether left is null, we need to generate the left code first. However, generateEqual (via generateOperatorIfNotNull) also generates the left code, such as:
${left.code}
${right.code}
if (${left.nullTerm}) {
  return ${right.nullTerm}
} else if (${right.nullTerm}) {
  return ${left.nullTerm}
} else {
  // generateOperatorIfNotNull will emit ${left.code} and ${right.code} again here
  return generateEqual(left, right)
}
I don't know how to generate the left and right code only once without modifying generateOperatorIfNotNull.
It looks like there is one way, which is to call generateEquals first, such as:
generateEquals(left, right)
check left.nullTerm and right.nullTerm
reset nullTerm = false and resultTerm
Do you think this is reasonable? Thanks, @twalthr
@xueyumusic You can also do it like this (pseudo code):
${left.code}
${right.code}
resultTerm;
if (${left.nullTerm}) {
  resultTerm = ${right.nullTerm}
} else if (${right.nullTerm}) {
  resultTerm = ${left.nullTerm}
} else {
  resultTerm = generateEqual(
    false, // no null check required in equals
    ${left.copy(code = NO_CODE)},
    ${right.copy(code = NO_CODE)})
}
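The pseudo code above might translate into a generator along the following lines. `GenExpr` is a hypothetical, simplified stand-in and not Flink's actual `GeneratedExpression`. The sketch also assumes primitive operands that can be compared with `==` in the generated Java, whereas the real code would delegate to the existing equals generation.

```scala
object DistinctFromCodegenSketch {
  // Hypothetical, simplified stand-in for a generated expression:
  // the Java code that computes it, plus the names of its result and null flag.
  final case class GenExpr(resultTerm: String, nullTerm: String, code: String)

  // Emits Java source for `left IS NOT DISTINCT FROM right`.
  // The operand code is emitted exactly once, before the null checks.
  def generateIsNotDistinctFrom(
      left: GenExpr,
      right: GenExpr,
      resultTerm: String): GenExpr = {
    val code =
      s"""
         |${left.code}
         |${right.code}
         |boolean $resultTerm;
         |if (${left.nullTerm}) {
         |  $resultTerm = ${right.nullTerm};
         |} else if (${right.nullTerm}) {
         |  $resultTerm = ${left.nullTerm};
         |} else {
         |  $resultTerm = ${left.resultTerm} == ${right.resultTerm};
         |}
         |""".stripMargin
    // The result of IS NOT DISTINCT FROM is never NULL.
    GenExpr(resultTerm, "false", code)
  }
}
```

Because the operand code appears only at the top, the final branch can compare the already computed result terms without evaluating either side a second time.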
Thank you very much, @twalthr. This is an elegant solution. I updated the code.
Thank you @xueyumusic. Looks good to me now. Will merge...
…for batch This closes apache#7079.
What is the purpose of the change
This PR adds support for multiple DISTINCT aggregates with different arguments for batch. Other cases, such as a single DISTINCT aggregate or multiple DISTINCT aggregates with the same argument, appear to be supported already. The underlying problem is code generation support for IS_NOT_DISTINCT_FROM, which results from Calcite's rewrite.
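As a rough illustration of why such a rewrite needs IS NOT DISTINCT FROM, the sketch below assumes that each DISTINCT aggregate is computed separately per group key and that the partial results are then joined on that key. This is an assumption about the shape of the rewritten plan, not a description of Calcite's actual rule. All names and data are hypothetical, and `None` stands in for a NULL group key.

```scala
object JoinOnGroupKeySketch {
  type Key = Option[String] // None stands in for a NULL group key

  // Hypothetical per-key results of two separately computed distinct aggregates.
  val countDistinctD: Seq[(Key, Long)] = Seq((None, 0L), (Some("Hello"), 1L))
  val countDistinctE: Seq[(Key, Long)] = Seq((None, 2L), (Some("Hello"), 1L))

  // SQL `=`: a NULL operand never yields TRUE, so NULL keys never join.
  def sqlEquals(a: Key, b: Key): Boolean =
    a.isDefined && b.isDefined && a.get == b.get

  // IS NOT DISTINCT FROM: two NULLs are treated as the same value.
  def notDistinct(a: Key, b: Key): Boolean = a == b

  // Nested-loop join of the two partial results under the given key predicate.
  def join(matches: (Key, Key) => Boolean): Seq[(Key, Long, Long)] =
    for {
      (k1, d) <- countDistinctD
      (k2, e) <- countDistinctE
      if matches(k1, k2)
    } yield (k1, d, e)
}
```

Joining with `sqlEquals` drops the NULL group from the result, while joining with `notDistinct` keeps it.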
Brief change log
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)
Documentation