-
Notifications
You must be signed in to change notification settings - Fork 13k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-30570] Fix error inferred partition condition #21622
base: master
Are you sure you want to change the base?
Conversation
fbfa489
to
4759653
Compare
cc @swuferhong |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your contribution @Aitozi, I go through the code and left some comments
remainingPartitions = Collections.emptyList(); | ||
this.data.put(Collections.emptyMap(), Collections.emptyList()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would this cast affect the final result of tests? Or you think it is meaningless?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This cast will fail, because a map can not be cast to a list (it fails when I running test that pushing the empty list partition down)
@@ -209,7 +209,7 @@ object RexNodeExtractor extends Logging { | |||
|
|||
val (partitionPredicates, nonPartitionPredicates) = | |||
conjunctions.partition(isSupportedPartitionPredicate(_, partitionFieldNames, inputFieldNames)) | |||
(partitionPredicates, nonPartitionPredicates) | |||
(partitionPredicates.filter(p => RexUtil.isDeterministic(p)), nonPartitionPredicates) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the better way to implement this code is to judge the isDeterministic()
in isSupportedPartitionPredicate
instead of adding filter after getting partitionPredicates
. These rexNode which is deterministic need to be added into the list of nonPartitionPredicates
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea, I will fix this.
val inputRefFinder = new InputRefVisitor | ||
predicate.accept(inputRefFinder) | ||
// if no fields reached, it's not partition condition | ||
if (inputRefFinder.getFields.length == 0) { | ||
return false | ||
} | ||
try { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no need to new an InputRefVisitor
. I think you need to deal it in isSupportedPartitionPredicate.visitor
. Changing the logic of visitor.visitInputRef
and override visitRexCall
if needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea, I will fix this.
|
||
@Test | ||
def testRandCondition(): Unit = { | ||
util.verifyRelPlan("SELECT * FROM PartitionableTable WHERE rand(1) < 0.001") | ||
} | ||
|
||
@Test | ||
def testRandCondition2(): Unit = { | ||
util.verifyRelPlan("SELECT * FROM PartitionableTable WHERE rand(part2) < 0.001") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are the same tests as tests you added in PushPartitionIntoLegacyTableSourceScanRuleTest
. I think there is no need to add these tests. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mainly follow the suggestion as here: #12966 (review)
Added the test in the rule test PushPartitionIntoLegacyTableSourceScanRuleTest
and PartitionableSourceTest
Array("p")) | ||
Assert.assertTrue(p.isEmpty) | ||
Assert.assertTrue(nonP.isEmpty) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can it be combined into one test? Roughly speaking, these tests are similar logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I combine the three test to two, and added some comments.
Hi @swuferhong , thank you for helping review this PR, I have fixed your comment, please take a look again. |
d93c288
to
69b2deb
Compare
The failed CI is caused by: https://issues.apache.org/jira/browse/FLINK-31120 |
@flinkbot run azure |
@swuferhong could you help take a look again? |
LGTM, cc @godfreyhe |
@godfreyhe Can you help take a final look on this PR ? Or the patch need to keep update to fix conflict |
69b2deb
to
df4a9bc
Compare
What is the purpose of the change
This PR is meant to fix the unexpected partition pruning. It fix in two parts:
Verifying this change
Some tests are added to verify it.