Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-12936][table-planner-blink] Support intersect all / minus all to blink planner #8898

Merged
merged 2 commits into from Jul 5, 2019

Conversation

JingsongLi
Copy link
Contributor

What is the purpose of the change

Now, we just support intersect and minus, See ReplaceIntersectWithSemiJoinRule and ReplaceMinusWithAntiJoinRule, replace intersect with null aware semi-join and distinct aggregate.
We need support intersect all and minus all too.
Presto and Spark already support them:
prestodb/presto#4918
https://issues.apache.org/jira/browse/SPARK-21274

I think them have a good rewrite design and we can follow them:
1.For intersect all
Input Query

SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2

Rewritten Query

  SELECT c1
    FROM (
         SELECT replicate_row(min_count, c1)
         FROM (
              SELECT c1,
                     IF (vcol1_cnt > vcol2_cnt, vcol2_cnt, vcol1_cnt) AS min_count
              FROM (
                   SELECT   c1, count(vcol1) as vcol1_cnt, count(vcol2) as vcol2_cnt
                   FROM (
                        SELECT c1, true as vcol1, null as vcol2 FROM ut1
                        UNION ALL
                        SELECT c1, null as vcol1, true as vcol2 FROM ut2
                        ) AS union_all
                   GROUP BY c1
                   HAVING vcol1_cnt >= 1 AND vcol2_cnt >= 1
                  )
              )
          )

2.For minus all:
Input Query

SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2

Rewritten Query

 SELECT c1
    FROM (
     SELECT replicate_rows(sum_val, c1)
       FROM (
         SELECT c1, sum_val
           FROM (
             SELECT c1, sum(vcol) AS sum_val
               FROM (
                 SELECT 1L as vcol, c1 FROM ut1
                 UNION ALL
                 SELECT -1L as vcol, c1 FROM ut2
              ) AS union_all
            GROUP BY union_all.c1
          )
        WHERE sum_val > 0
       )
   )

Verifying this change

ut

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs

@flinkbot
Copy link
Collaborator

flinkbot commented Jun 26, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

Copy link
Contributor

@godfreyhe godfreyhe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for this PR @JingsongLi . please add rule test for RewriteMinusAll and RewriteIntersectAll, just as ReplaceIntersectWithSemiJoinRuleTest and ReplaceMinusWithAntiJoinRule do.

]]>
</Resource>
</TestCase>
<TestCase name="testIntersectAll">
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this case should be in second commit ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I should just squash second and third to a one commit.

@godfreyhe
Copy link
Contributor

update commit message as Support intersect all/minus all in blink planner ?

@JingsongLi JingsongLi changed the title [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner [FLINK-12936][table-planner-blink] Support intersect all / minus all to blink planner Jun 27, 2019
@JingsongLi
Copy link
Contributor Author

thanks for this PR @JingsongLi . please add rule test for RewriteMinusAll and RewriteIntersectAll, just as ReplaceIntersectWithSemiJoinRuleTest and ReplaceMinusWithAntiJoinRule do.

OK, and I will change the names to RewriteMinusAllRule and RewriteIntersectAllRule.

@JingsongLi JingsongLi force-pushed the setall branch 2 times, most recently from 625cba0 to b186176 Compare July 2, 2019 09:07
Copy link
Contributor

@godfreyhe godfreyhe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i left some minor comments

"ReplaceIntersectWithSemiJoinRule") {

override def matches(call: RelOptRuleCall): Boolean = {
val intersect: Intersect = call.rel(0)
// not support intersect all now.
intersect.isDistinct
intersect.isDistinct && intersect.getInputs.size() == 2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

intersect.isDistinct => !intersect.all

and add a TODO remove "intersect.getInputs.size() == 2" limit ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

intersect.isDistinct => !intersect.all

Why?

and add a TODO remove "intersect.getInputs.size() == 2" limit ?

I think this rule should never deal with size bigger than 2, we should introduce a new rule to split it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, Use all SetOp.all and I will add comment to explain these rules just handle the case of input size 2

"ReplaceMinusWithAntiJoinRule") {

override def matches(call: RelOptRuleCall): Boolean = {
val minus: Minus = call.rel(0)
// not support minus all now.
minus.isDistinct
minus.isDistinct && minus.getInputs.size() == 2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minus.isDistinct => !minus.all
add a TODO


override def matches(call: RelOptRuleCall): Boolean = {
val intersect: Intersect = call.rel(0)
intersect.all && intersect.getInputs.size() == 2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a TODO

val leftWithAddedVirtualCols = leftBuilder
.push(left)
.project(leftBuilder.fields(fields) ++
Seq(leftBuilder.alias(leftBuilder.cast(leftBuilder.literal(1L), BIGINT), "vcol")))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please give a more meaningful name instead of vcol

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

name it vcol_marker, there is no appropriate name to describe it.

@godfreyhe
Copy link
Contributor

LGTM, +1 to merge

@JingsongLi
Copy link
Contributor Author

Travis passed(blink planner) in https://travis-ci.org/JingsongLi/flink/builds/554510279

@KurtYoung KurtYoung merged commit b4403f2 into apache:master Jul 5, 2019
@JingsongLi JingsongLi deleted the setall branch July 10, 2019 08:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants