-
Notifications
You must be signed in to change notification settings - Fork 13k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-33171][table planner] Consistent implicit type coercion support for equal and non-equal comparisons for codegen #23478
Conversation
296673c
to
c888fd7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the contribution. The fix looks good to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fengjiajie thanks for contributing this! I left two comments.
@@ -488,6 +488,18 @@ object ScalarOperatorGens { | |||
else if (isNumeric(left.resultType) && isNumeric(right.resultType)) { | |||
generateComparison(ctx, "!=", left, right, resultType) | |||
} | |||
// support date/time/timestamp not equalTo string. | |||
else if ( | |||
(isTimePoint(left.resultType) && isCharacterString(right.resultType)) || |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add optimizations for string literals similar to the handling of generateEquals
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the condition is met, generate code using generateEquals, where the optimizations are also applied and the code is reused.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering why generateNotEquals
doesn't reuse the logic of generateEquals
(which ends up wrapped in a not
expression)? This is the direct cause of the current problem: the logic for data types of generatingNotEquals
and generatingEquals
is not aligned (In addition to the timepoint change here, the multiset and generic types are handled differently).
So why don't we switch to full reuse of generatingEquals
to eliminate these inconsistent behaviors altogether ? Because at the moment I don't see a semantic need to differentiate between equal and nonEqual handling (including three-valued logic).
cc @LadyForest @libenchao because I saw your discussion in jira.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an interesting idea, I like it.
I'm not sure about why this is designed to have a dedicated method originally, I'm guessing that maybe it can directly translate to left != right
which may have slight better performance than !(left == right)
, and will be more complex considering null handling.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In order not to lose the simplicity of the operators and the null handling, I've roughly modified a version that can be found below
private def wrapExpressionIfNonEq(
isNonEq: Boolean,
equalsExpr: GeneratedExpression,
resultType: LogicalType): GeneratedExpression = {
if (isNonEq) {
equalsExpr
} else {
GeneratedExpression(
s"(!${equalsExpr.resultTerm})",
equalsExpr.nullTerm,
equalsExpr.code,
resultType)
}
}
private def generateEqualAndNonEqual(
ctx: CodeGeneratorContext,
left: GeneratedExpression,
right: GeneratedExpression,
operator: String,
resultType: LogicalType): GeneratedExpression = {
checkImplicitConversionValidity(left, right)
val nonEq = operator match {
case "==" => false
case "!=" => true
case _ => throw new CodeGenException(s"Unsupported boolean comparison '$operator'.")
}
val canEqual = isInteroperable(left.resultType, right.resultType)
if (isCharacterString(left.resultType) && isCharacterString(right.resultType)) {
generateOperatorIfNotNull(ctx, resultType, left, right)(
(leftTerm, rightTerm) => s"${if (nonEq) "!" else ""}$leftTerm.equals($rightTerm)")
}
// numeric types
else if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
generateComparison(ctx, operator, left, right, resultType)
}
// array types
else if (isArray(left.resultType) && canEqual) {
wrapExpressionIfNonEq(
nonEq,
generateArrayComparison(ctx, left, right, resultType),
resultType)
}
// map types
else if (isMap(left.resultType) && canEqual) {
val mapType = left.resultType.asInstanceOf[MapType]
wrapExpressionIfNonEq(
nonEq,
generateMapComparison(
ctx,
left,
right,
mapType.getKeyType,
mapType.getValueType,
resultType),
resultType)
}
// multiset types
else if (isMultiset(left.resultType) && canEqual) {
val multisetType = left.resultType.asInstanceOf[MultisetType]
wrapExpressionIfNonEq(
nonEq,
generateMapComparison(
ctx,
left,
right,
multisetType.getElementType,
new IntType(false),
resultType),
resultType)
}
// comparable types of same type
else if (isComparable(left.resultType) && canEqual) {
generateComparison(ctx, operator, left, right, resultType)
}
// generic types of same type
else if (isRaw(left.resultType) && canEqual) {
val Seq(resultTerm, nullTerm) = newNames("result", "isNull")
val genericSer = ctx.addReusableTypeSerializer(left.resultType)
val ser = s"$genericSer.getInnerSerializer()"
val code =
s"""
|${left.code}
|${right.code}
|boolean $nullTerm = ${left.nullTerm}|| ${right.nullTerm};
|boolean $resultTerm = ${primitiveDefaultValue(resultType)};
|if (!$nullTerm) {
| ${left.resultTerm}.ensureMaterialized($ser);
| ${right.resultTerm}.ensureMaterialized($ser);
| $resultTerm =
| ${if (nonEq) "!" else ""}${left.resultTerm}.getBinarySection().
| equals(${right.resultTerm}.getBinarySection());
|}
|""".stripMargin
GeneratedExpression(resultTerm, nullTerm, code, resultType)
}
// support date/time/timestamp equalTo string.
// for performance, we cast literal string to literal time.
else if (isTimePoint(left.resultType) && isCharacterString(right.resultType)) {
if (right.literal) {
generateEqualAndNonEqual(
ctx,
left,
generateCastLiteral(ctx, right, left.resultType),
operator,
resultType)
} else {
generateEqualAndNonEqual(
ctx,
left,
generateCast(ctx, right, left.resultType, nullOnFailure = true),
operator,
resultType)
}
} else if (isTimePoint(right.resultType) && isCharacterString(left.resultType)) {
if (left.literal) {
generateEqualAndNonEqual(
ctx,
generateCastLiteral(ctx, left, right.resultType),
right,
operator,
resultType)
} else {
generateEqualAndNonEqual(
ctx,
generateCast(ctx, left, right.resultType, nullOnFailure = true),
right,
operator,
resultType)
}
}
// non comparable types
else {
generateOperatorIfNotNull(ctx, resultType, left, right) {
if (isReference(left.resultType)) {
(leftTerm, rightTerm) => s"${if (nonEq) "!" else ""}$leftTerm.equals($rightTerm)"
} else if (isReference(right.resultType)) {
(leftTerm, rightTerm) => s"${if (nonEq) "!" else ""}$rightTerm.equals($leftTerm)"
} else {
throw new CodeGenException(
s"Incomparable types: ${left.resultType} and " +
s"${right.resultType}")
}
}
}
}
def generateEquals(
ctx: CodeGeneratorContext,
left: GeneratedExpression,
right: GeneratedExpression,
resultType: LogicalType): GeneratedExpression = {
generateEqualAndNonEqual(ctx, left, right, "==", resultType)
}
def generateNotEquals(
ctx: CodeGeneratorContext,
left: GeneratedExpression,
right: GeneratedExpression,
resultType: LogicalType): GeneratedExpression = {
generateEqualAndNonEqual(ctx, left, right, "!=", resultType)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lincoln-lil This is a better solution! Should I include it in the code commit or assign the issue to you? I can also help add some corresponding unit tests if needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
handling in 'wrapExpressionIfNonEq' should be reversed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fengjiajie just feel free to move forward, and let's fix this issue completely!
...-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
Outdated
Show resolved
Hide resolved
c888fd7
to
c33f80d
Compare
@LadyForest Thank you for your review, and I have seen the discussion on Jira. |
@lincoln-lil
So now I have only added f23, which corresponds to the TIMESTAMP_WITH_LOCAL_TIME_ZONE. If any further modifications are needed, please let me know. Thank you. |
@flinkbot run azure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fengjiajie thanks for the updates! Just two minor comments.
@@ -488,6 +488,39 @@ object ScalarOperatorGens { | |||
else if (isNumeric(left.resultType) && isNumeric(right.resultType)) { | |||
generateComparison(ctx, "!=", left, right, resultType) | |||
} | |||
// support date/time/timestamp not equalTo string. | |||
// for performance, we cast literal string to literal time. | |||
else if (isTimePoint(left.resultType) && isCharacterString(right.resultType)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we extract common method with generateEquals
?
...rc/test/scala/org/apache/flink/table/planner/expressions/utils/ScalarOperatorsTestBase.scala
Outdated
Show resolved
Hide resolved
@fengjiajie The 5 types are correct, I checked the code in FlinkTypeFactory and also verified it in sql client, the errors in sql client when create a column with TIMESTAMP WITH TIME ZONE: Flink SQL> CREATE TABLE Bid (
> bid STRING,
> price BIGINT,
> rowtime TIMESTAMP WITH TIME ZONE,
> -- declare user_action_time as event time attribute and use 5 seconds delayed watermark strategy
> WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND
> ) WITH ( 'connector' = 'datagen');
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "TIME" at line 4, column 26.
Was expecting:
"LOCAL" ... |
c33f80d
to
0d8b5e0
Compare
@lincoln-lil |
f35ac84
to
b4c7025
Compare
@lincoln-lil Based on the approach you provided, I attempted to write some test cases. However, there are two types that are not covered:
|
@fengjiajie for the non comparable types, I think we can ref to the public static boolean isComparable(LogicalType type) {
return !isRaw(type)
&& !isMap(type)
&& !isMultiset(type)
&& !isRow(type)
&& !isArray(type)
&& !isStructuredType(type);
} for the multiset type, one viable way is using the |
b4c7025
to
5eb961c
Compare
5eb961c
to
df867c1
Compare
|
@fengjiajie IIUC, we should add an explicit type cc @LadyForest If you have time to help check it again that would be great :) |
I'm sorry for not getting back to you sooner. I'll take a look as soon as possible. |
Thanks for bringing up this issue. I think fixing this issue requires both support from the syntax and type factory. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update, looks good to me in general, and I just left some minor comments.
...planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
Show resolved
Hide resolved
...planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
Outdated
Show resolved
Hide resolved
...planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
Outdated
Show resolved
Hide resolved
Thank you for the suggestion. It has been modified. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update, looks good to me!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fengjiajie thanks for fixing this!
@flinkbot run azure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @fengjiajie, thanks for the update.
I noticed that the TPCDS test failed after the change. I have reconsidered and realized that the equal and non-equal comparisons of string types cannot directly invoke the "wrapExpressionIfNonEq" function.
...planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
Outdated
Show resolved
Hide resolved
...planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
Outdated
Show resolved
Hide resolved
...planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
Outdated
Show resolved
Hide resolved
ca2c876
to
0baaacc
Compare
@LadyForest Thank you for helping troubleshoot the issue. |
@lincoln-lil @LadyForest When you have time, please take a look if any further modifications are needed, thanks. |
I'm sorry for the late reply; I'll take a look as soon as possible. |
…t for equal and non-equal comparisons for codegen
0baaacc
to
b5eab3a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update, LGTM
Hi @fengjiajie, could you pick the fix to branch release-1.18 and release-1.17 as well? |
…t for equal and non-equal comparisons for codegen This closes apache#23478
…t for equal and non-equal comparisons for codegen This closes apache#23478
Hi @LadyForest , thank you for moving this forward. cherry-picks to 1.17 and 1.18: |
What is the purpose of the change
When executing the following SQL:
the result is as follows:
The "notEq1" in the first row should be FALSE.
Brief change log
Add TimePoint not equalTo String code in ScalarOperatorGens
Verifying this change
Running the same SQL above will yield the correct result:
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation