[FLINK-12600] [table-planner-blink] Introduce planner rules to do deterministic rewriting on RelNode #8520

Merged on May 28, 2019 (4 commits)

@godfreyhe (Contributor) commented May 23, 2019

What is the purpose of the change

Introduce planner rules to do deterministic rewriting on RelNode

Brief change log

  • FlinkLimit0RemoveRule, that rewrites limit 0 to empty Values
  • FlinkRewriteSubQueryRule, that rewrites a Filter with condition: (select count(*) from T) > 0 to a Filter with condition: exists(select * from T), which could be converted to SEMI Join by FlinkSubQueryRemoveRule
  • ReplaceIntersectWithSemiJoinRule, that rewrites distinct Intersect to a distinct Aggregate on a SEMI Join
  • ReplaceMinusWithAntiJoinRule, that rewrites distinct Minus to a distinct Aggregate on an ANTI Join
  • introduce FlinkPruneEmptyRules#JOIN_RIGHT_INSTANCE to handle ANTI join with empty right
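
The set-operation rewrites in the list above can be illustrated with plain Scala collections. This is only a sketch of the semantics, not the planner rules themselves: the object name, the `Row` type, and the helper functions are hypothetical, and rows are compared with Scala's `==`, whereas SQL set operations treat NULLs as equal, so a real rule needs a null-safe join condition.

```scala
object SetOpRewriteSketch {
  // A row with two columns; names and types are illustrative only.
  type Row = (Int, String)

  // SEMI join on all columns: keep each left row that has a match on the right.
  def semiJoin(left: Seq[Row], right: Seq[Row]): Seq[Row] =
    left.filter(row => right.contains(row))

  // ANTI join on all columns: keep each left row that has no match on the right.
  def antiJoin(left: Seq[Row], right: Seq[Row]): Seq[Row] =
    left.filterNot(row => right.contains(row))

  // Distinct Aggregate: group by every column, which de-duplicates the rows.
  def distinctAggregate(rows: Seq[Row]): Seq[Row] = rows.distinct

  // Distinct INTERSECT expressed as a distinct Aggregate on a SEMI join.
  def intersectDistinct(left: Seq[Row], right: Seq[Row]): Seq[Row] =
    distinctAggregate(semiJoin(left, right))

  // Distinct MINUS expressed as a distinct Aggregate on an ANTI join.
  def minusDistinct(left: Seq[Row], right: Seq[Row]): Seq[Row] =
    distinctAggregate(antiJoin(left, right))

  def main(args: Array[String]): Unit = {
    val left  = Seq((1, "a"), (1, "a"), (2, "b"), (3, "c"))
    val right = Seq((1, "a"), (3, "c"), (3, "c"), (4, "d"))

    println(intersectDistinct(left, right))    // List((1,a), (3,c))
    println(minusDistinct(left, right))        // List((2,b))
    // An ANTI join against an empty right input keeps every left row.
    println(antiJoin(left, Seq.empty) == left) // true
  }
}
```

The last line shows why an ANTI join with an empty right side is a special case: semantically, nothing on the right can exclude a left row, so the join returns its left input unchanged.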

Verifying this change

This change added tests and can be verified as follows:

  • Added FlinkLimit0RemoveRuleTest that validates the logical plan result after FlinkLimit0RemoveRule is applied, and added Limit0RemoveITCase that validates the execution result
  • Added FlinkRewriteSubQueryRuleTest that validates the logical plan results after FlinkRewriteSubQueryRule is applied
  • Added ReplaceIntersectWithSemiJoinRuleTest that validates the logical plan results after ReplaceIntersectWithSemiJoinRule is applied
  • Added ReplaceMinusWithAntiJoinRuleTest that validates the logical plan results after ReplaceMinusWithAntiJoinRule is applied
  • Added FlinkPruneEmptyRulesTest that validates the logical plan results after FlinkPruneEmptyRules is applied
  • Added SetOperatorsTest that validates the physical plan results after ReplaceMinusWithAntiJoinRule and ReplaceIntersectWithSemiJoinRule are applied

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot (Collaborator) commented:

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.core.{Join, JoinRelType, Values}

object FlinkPruneEmptyRules {

Contributor:

Do you intend to introduce more than one rule in this object?

Contributor Author:

PruneEmptyRules in Calcite contains more than one rule, and I intend to keep the Calcite style here. Later we may need to copy other rules from PruneEmptyRules into this file.


/**
* Planner rule that rewrites filter condition like:
* `(select count(*) from T) > 0` to `exists(select * from T)`,

Contributor:

Why is a semi join better than an aggregate?
By the way, I think a more efficient rewrite for a query like this is to convert it to exists(select * from T limit 1).

@godfreyhe (Contributor Author), May 28, 2019:

The estimation for SEMI/ANTI joins is very inaccurate, so we intend to do deterministic rewriting for SEMI/ANTI joins. We can move this rule into the CBO once we improve the estimation of SEMI/ANTI joins.

Yes, we can do a similar rewriting to exists(select * from T limit 1) later.

Contributor:

I'm confused: the original query is an aggregate, but you want to convert it to a semi join, knowing that the estimation of semi joins is not reliable.
Taking a step back, why is a semi join better than an aggregate in this case?

@godfreyhe (Contributor Author), May 28, 2019:

The original query is a scalar sub-query, which Calcite rules convert to an aggregate plus a join.
For example, for the full SQL SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE d > 10) > 0, the logical plan produced by the Calcite rules is:

LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalProject(a=[$0], b=[$1], c=[$2])
   +- LogicalFilter(condition=[>($3, 0)])
      +- LogicalJoin(condition=[true], joinType=[left])
         :- LogicalTableScan(table=[[x]])
         +- LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
            +- LogicalProject($f0=[0])
               +- LogicalFilter(condition=[>($0, 10)])
                  +- LogicalTableScan(table=[[y]])

I will update the class comments to make this clearer.
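
As an in-memory analogy for the example query above, the two filter conditions can be compared on plain Scala collections. This is a sketch under stated assumptions, not planner code: the object and method names are hypothetical, table y is modeled as the values of its column d, and the row counter only describes how Scala's `count` and `exists` behave. Whether the planner gains a similar saving depends on the physical operators, which this sketch does not model.

```scala
object ExistsRewriteSketch {
  // Original form: evaluate the scalar sub-query COUNT(*) over y WHERE d > 10,
  // then keep the rows of x only if that count is greater than 0.
  // Returns the filtered x rows and how many y rows were inspected.
  def withCount[A](x: Seq[A], y: Seq[Int]): (Seq[A], Int) = {
    var inspected = 0
    val cnt = y.count { d => inspected += 1; d > 10 }
    (x.filter(_ => cnt > 0), inspected)
  }

  // Rewritten form: only ask whether at least one qualifying y row exists.
  def withExists[A](x: Seq[A], y: Seq[Int]): (Seq[A], Int) = {
    var inspected = 0
    val found = y.exists { d => inspected += 1; d > 10 }
    (x.filter(_ => found), inspected)
  }

  def main(args: Array[String]): Unit = {
    val x = Seq("r1", "r2")
    val y = Seq(5, 20, 30, 1)
    println(withCount(x, y))  // (List(r1, r2),4)
    println(withExists(x, y)) // (List(r1, r2),2)
  }
}
```

Both forms return the same rows of x for any y, which is what makes the rewrite safe to apply deterministically; in this analogy the `exists` form can stop at the first qualifying row.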

…erministic rewriting on RelNode

rules include:
1. FlinkLimit0RemoveRule, that rewrites `limit 0` to empty Values
2. FlinkRewriteSubQueryRule, that rewrites a Filter with condition: `(select count(*) from T) > 0` to a Filter with condition: `exists(select * from T)`, which could be converted to SEMI Join by FlinkSubQueryRemoveRule
3. ReplaceIntersectWithSemiJoinRule, that rewrites distinct Intersect to a distinct Aggregate on a SEMI Join
4. ReplaceMinusWithAntiJoinRule, that rewrites distinct Minus to a distinct Aggregate on an ANTI Join

ps, introduce FlinkPruneEmptyRules#JOIN_RIGHT_INSTANCE to handle ANTI join with empty right
update comments for FlinkRewriteSubQueryRule

@KurtYoung (Contributor) left a comment:

+1

@KurtYoung merged commit b333ddc into apache:master on May 28, 2019
@godfreyhe deleted the FLINK-12600 branch on June 1, 2019 09:53