
[FLINK-20887][table-planner] Disable project merge during sql2rel phase by default to avoid incorrect project merge #22827

Closed
wants to merge 4 commits

Conversation

@lincoln-lil (Contributor) commented Jun 19, 2023

What is the purpose of the change

FLINK-30841 fixed incorrect project merges in the optimization phase (it prevents projects containing non-deterministic calls from being merged, which produced 'wrong results' unexpected by users; FLINK-15366 & FLINK-30006 also made related efforts), but it didn't fix the problem completely. For the case described in this issue, we need to turn off the relevant optimization in the sql2rel phase (by adding an internal config option that disables the merge by default) to fix it completely.

A more detailed explanation of the related case: if we keep project merge in the sql2rel phase, we have no chance to fix it later, because the projects have already been merged. See the original RelNode tree after sql2rel:

== Abstract Syntax Tree ==
LogicalProject(exprs=[[-(+(RAND(), 7), +(RAND(), 5))]])
+- LogicalTableScan(table=[[default_catalog, default_database, SmallTable3]])
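
To make the failure mode concrete, here is a minimal sketch (not taken from the PR; the table definition and row count are illustrative) of the class of query that hits this merge. Logically, r is evaluated once per row, so (r + 7) - (r + 5) should always be 2; once the two projects are merged into the single project shown above, RAND() is duplicated and the result is no longer constant:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class ProjectMergeExample {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(
                            EnvironmentSettings.newInstance().inBatchMode().build());
            // Illustrative stand-in for the SmallTable3 test table referenced above.
            tEnv.executeSql(
                    "CREATE TABLE SmallTable3 (a INT) WITH ("
                            + "'connector' = 'datagen', 'number-of-rows' = '3')");
            // If sql2rel merges the two projects, the plan computes
            // (RAND() + 7) - (RAND() + 5) with two independent RAND() calls.
            System.out.println(
                    tEnv.explainSql(
                            "SELECT (r + 7) - (r + 5) FROM (SELECT RAND() AS r FROM SmallTable3)"));
        }
    }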

Note: this is not a new feature (the newly added config option is not recommended for general use; it serves only as a rollback switch when users need to restore the behavior of an older version).
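
If a user does need the old behavior back, the rollback would look roughly like the sketch below. The option key is an assumption for illustration only (the PR text does not spell it out); check the merged code for the real, internal key:

    import org.apache.flink.configuration.ConfigOptions;
    import org.apache.flink.table.api.TableEnvironment;

    // Hypothetical rollback switch; the key name is assumed, not confirmed by the PR text.
    static void restoreOldProjectMergeBehavior(TableEnvironment tEnv) {
        tEnv.getConfig()
                .set(
                        ConfigOptions.key("table.optimizer.sql2rel.project-merge.enabled")
                                .booleanType()
                                .defaultValue(false)
                                .build(),
                        true);
    }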

Brief change log

    1. Pre-step: turn off the project merge optimization in the sql2rel phase by default.
    2. Fix the incorrect calc merge (this requires changing related rules).
    3. Fix the failing case caused by an incorrect structured type's nullability (see FLINK-31830 for details).
    4. Fix the incorrect aggregate project merge for LogicalWindowAggregate.

Verifying this change

Existing and newly added test cases.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

@flinkbot (Collaborator) commented Jun 19, 2023

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@lincoln-lil changed the title from "[FLINK-20887][table-planner] Disable project merge & join condition pushdown during sql2rel phase by default to avoid incorrect project merge" to "[FLINK-20887][table-planner] Disable project merge during sql2rel phase by default to avoid incorrect project merge" on Jun 20, 2023
@lincoln-lil force-pushed the FLINK-20887 branch 3 times, most recently from e284871 to f774338, on June 21, 2023 06:42
@swuferhong (Contributor) left a comment:

Thanks for the PR. It generally LGTM; just left some minor comments.

Comment on lines +361 to +374:

    private void replaceProgramWithProjectMergeRule() {
        FlinkChainedProgram programs = new FlinkChainedProgram<BatchOptimizeContext>();
        programs.addLast(
                "rules",
                FlinkHepRuleSetProgramBuilder.<BatchOptimizeContext>newBuilder()
                        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
                        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
                        .add(
                                RuleSets.ofList(
                                        CoreRules.PROJECT_MERGE,
                                        PushProjectIntoTableSourceScanRule.INSTANCE))
                        .build());
        util().replaceBatchProgram(programs);
    }
swuferhong (Contributor): Why do we need to add this method in the pre-step commit? Maybe it needs to be moved to Step 2.

lincoln-lil (Contributor, author): The AST changes after we disable project merge during the sql2rel phase by default in the 1st commit, and this rule test would fail, so it should stay in the 1st commit.

import org.apache.calcite.rel.rules.ProjectMergeRule;

/**
* Extends calcite's FilterCalcMergeRule for streaming scenario, modification: does not merge the
swuferhong (Contributor): This javadoc should say "Extends calcite's ProjectMergeRule".

Comment on lines 47 to 50:

    @Override
    public void onMatch(RelOptRuleCall call) {
        LogicalProject project = call.rel(0);
        LogicalCalc calc = call.rel(1);

        List<RexNode> expandProjects =
                calc.getProgram().getProjectList().stream()
                        .map(p -> calc.getProgram().expandLocalRef(p))
                        .collect(Collectors.toList());
        InputRefVisitor inputRefVisitor = new InputRefVisitor();
        project.getProjects().forEach(p -> p.accept(inputRefVisitor));
        boolean existNonDeterministicRef =
                Arrays.stream(inputRefVisitor.getFields())
                        .anyMatch(i -> !RexUtil.isDeterministic(expandProjects.get(i)));

        if (!existNonDeterministicRef) {
            super.onMatch(call);
        }
    }
}
swuferhong (Contributor): Is it more reasonable to re-implement the matches() method here, to be consistent with ProjectMergeRule and CalcMergeRule?

lincoln-lil (Contributor, author): Makes sense, I'll update it.
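
For reference, a sketch of what moving the check into matches() might look like (shape assumed, not copied from the PR; it reuses the same helpers as the hunk quoted above):

    // Sketch: veto the match up front instead of short-circuiting in onMatch(),
    // consistent with how ProjectMergeRule/CalcMergeRule gate their work.
    @Override
    public boolean matches(RelOptRuleCall call) {
        LogicalProject project = call.rel(0);
        LogicalCalc calc = call.rel(1);

        List<RexNode> expandedProjects =
                calc.getProgram().getProjectList().stream()
                        .map(p -> calc.getProgram().expandLocalRef(p))
                        .collect(Collectors.toList());
        InputRefVisitor inputRefVisitor = new InputRefVisitor();
        project.getProjects().forEach(p -> p.accept(inputRefVisitor));
        // Merge only when every referenced bottom expression is deterministic.
        return Arrays.stream(inputRefVisitor.getFields())
                .allMatch(i -> RexUtil.isDeterministic(expandedProjects.get(i)));
    }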

}
}

private def mergeable(
swuferhong (Contributor): Add some comments?

lincoln-lil (Contributor, author): OK.

bottomProgram.getProjectList
.map(bottomProgram.expandLocalRef)
.toList)
}
swuferhong (Contributor): Nit: to achieve the goal of being Scala-free, would it be better to put these util methods in a Java class?

lincoln-lil (Contributor, author): OK, this encouraged me to create a new class FlinkRelUtil, which is more appropriate than FlinkRexUtil here.
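
A hypothetical sketch of what such a Java utility could look like (class and method names follow the discussion; the real FlinkRelUtil in the PR may differ, e.g. by also considering reference counts):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.calcite.rel.core.Project;
    import org.apache.calcite.rex.RexNode;
    import org.apache.calcite.rex.RexUtil;

    // InputRefVisitor is the planner helper used in the hunks above;
    // its exact import path is assumed here.
    public final class FlinkRelUtil {

        private FlinkRelUtil() {}

        /**
         * Returns true if two neighboring projects can be merged without
         * duplicating a non-deterministic expression of the bottom project.
         */
        public static boolean isMergeable(Project topProject, Project bottomProject) {
            InputRefVisitor inputRefVisitor = new InputRefVisitor();
            topProject.getProjects().forEach(p -> p.accept(inputRefVisitor));
            List<RexNode> bottomExprs = bottomProject.getProjects();
            return Arrays.stream(inputRefVisitor.getFields())
                    .allMatch(i -> RexUtil.isDeterministic(bottomExprs.get(i)));
        }
    }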

Comment on lines 673 to 674
* @param deep
* @param refCounts
swuferhong (Contributor): Missing parameter descriptions.

Comment on lines 57 to 60:

    project.getProjects().forEach(p -> p.accept(inputRefVisitor));
    boolean existNonDeterministicRef =
            Arrays.stream(inputRefVisitor.getFields())
                    .anyMatch(i -> !RexUtil.isDeterministic(expandProjects.get(i)));
swuferhong (Contributor): Can we reuse FlinkRexUtil.isMergeable()?

lincoln-lil (Contributor, author): From a maintainability point of view, it is indeed better to use unified, reusable logic here.

    if (FlinkRexUtil.isMergeable(topProject, bottomProject)) {
        super.onMatch(call);
    }
}
swuferhong (Contributor): Ditto: is it more reasonable to re-implement the matches() method here?

lincoln-lil (Contributor, author): Yes, will update.

@lincoln-lil (Contributor, author) left a comment:

@swuferhong thank you for reviewing this! I've addressed your comments and updated the PR.


@swuferhong (Contributor) left a comment:

Thanks, @lincoln-lil. This PR LGTM now.

@lincoln-lil (Contributor, author): The squashed commits, after rebasing onto the master branch, succeed in my private pipeline: https://dev.azure.com/lincoln86xy/flink/_build/results?buildId=537&view=results
