
Conversation

@godfreyhe (Contributor) commented May 24, 2019

What is the purpose of the change

Introduce planner rules about aggregate

Brief change log

  • AggregateReduceGroupingRule, that reduces useless grouping columns
  • PruneAggregateCallRule, that removes unreferenced AggregateCalls from an Aggregate (a small example follows this list)
  • FlinkAggregateRemoveRule, that is copied from Calcite's AggregateRemoveRule, and supports SUM, MIN, MAX, AUXILIARY_GROUP functions in non-empty group aggregate
  • FlinkAggregateJoinTransposeRule, that is copied from Calcite's AggregateJoinTransposeRule, and supports Left/Right outer join and aggregate with AUXILIARY_GROUP
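
To make the effect concrete, here is a minimal sketch (not part of this PR) in the style of the integration tests quoted further down, reusing their T1 fixture and the checkResult/row helpers; the method name is invented. The inner count(c1) is never referenced by the outer query, so PruneAggregateCallRule can drop that AggregateCall:

private def testPruneUnreferencedAggCall(): Unit = {
  // The outer query never reads cnt, so the planner can remove count(c1)
  // from the inner Aggregate; only the distinct (a1, b1) pairs remain.
  checkResult(
    "SELECT a1, b1 FROM (SELECT a1, b1, count(c1) AS cnt FROM T1 GROUP BY a1, b1) t",
    Seq(row(2, 1), row(3, 2), row(5, 2), row(6, 3)))
}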

Verifying this change

This change added tests and can be verified as follows:

  • Added rule tests that validate the logical plan after the added rules are applied
  • Added integration tests that validate the execution results after the added rules are applied

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot (Collaborator) commented:

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

(Review thread on the AggregateCalcMergeRule javadoc:)

* on top of a {@link org.apache.calcite.rel.core.Calc} and if possible
* aggregate through the calc or removes the calc.
*
* <p>This is only possible when no condition in calc and the grouping expressions and arguments to
Contributor commented:

If you can't merge the condition, why not change the rule's match pattern to Aggregate on Project?

godfreyhe (author) replied:

AggregateProjectMergeRule already exists and is in our rule set. The original intention of introducing this rule was to solve the hack in RelDecorrelator for TPC-H query 20; however, the current cost model cannot find the best plan, so this rule has not been added to our rule set. We can remove this rule from this PR and introduce it when needed.
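
For readers without the full diff, the quoted javadoc above belongs to AggregateCalcMergeRule. A minimal sketch of the case it targets, again in the style of this PR's integration tests (T1, checkResult and row are the existing fixtures; the method name is invented):

private def testAggregateOnCalcWithoutCondition(): Unit = {
  // The Calc below the Aggregate is a pure projection (no condition), so the
  // aggregate can be pulled through it or the Calc removed entirely.
  checkResult(
    "SELECT x, count(y) FROM (SELECT a1 AS x, c1 AS y FROM T1) t GROUP BY x",
    Seq(row(2, 1), row(3, 1), row(5, 1), row(6, 1)))
  // With a condition in the Calc (e.g. a WHERE clause in the inner query) the
  // merge is not possible, as the quoted javadoc states; the rule simply does
  // not fire and the unmerged plan still produces the result.
}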

/**
* This rule is copied from Calcite's {@link org.apache.calcite.rel.rules.AggregateJoinTransposeRule}.
* Modification:
* - Do not match TemporalTableScan since it means that it is a dimension table scan currently.
Contributor commented:

Put this rule into the CBO, and we can avoid having to deal with such a situation.

godfreyhe (author) replied:

This rule is already in the CBO, and the CBO does not handle the unsupported case (a lookup table source doesn't support aggregate).
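
As background on what the rule buys us: FlinkAggregateJoinTransposeRule rewrites the plan so that a (partial) aggregate runs below the join. The SQL-level intuition, using hypothetical tables fact(k, v) and dim(k, d) that are not part of this PR, is roughly:

// Before the rewrite: the join produces every matching row and the aggregate
// runs on top of the full join result.
val before =
  "SELECT d.d, SUM(f.v) FROM fact f JOIN dim d ON f.k = d.k GROUP BY d.d"

// After the rewrite (conceptually): fact is pre-aggregated per join key, so the
// join and the final aggregation can process far fewer rows. For SUM over an
// inner equi-join this preserves the result.
val after =
  """SELECT d.d, SUM(t.pv)
    |FROM (SELECT k, SUM(v) AS pv FROM fact GROUP BY k) t
    |JOIN dim d ON t.k = d.k
    |GROUP BY d.d""".stripMargin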

godfreyhe added 5 commits May 28, 2019 17:24
…regate

rules include:
1. AggregateCalcMergeRule, that recognizes Aggregate on top of a Calc and if possible aggregate through the calc or removes the calc
2. AggregateReduceGroupingRule, that reduces useless grouping columns
3. PruneAggregateCallRule, that removes unreferenced AggregateCall from Aggregate
4. FlinkAggregateRemoveRule, that is copied from Calcite's AggregateRemoveRule, and supports SUM, MIN, MAX, AUXILIARY_GROUP functions in non-empty group aggregate
5. FlinkAggregateJoinTransposeRule, that is copied from Calcite's AggregateJoinTransposeRule, and supports Left/Right outer join and aggregate with AUXILIARY_GROUP
failure reason: the parallelism of the shuffle-removed operators is different
@KurtYoung (Contributor) left a comment:

+1

@KurtYoung KurtYoung merged commit 370e0cb into apache:master May 30, 2019
@godfreyhe godfreyhe deleted the FLINK-12610 branch June 1, 2019 09:53
@tillrohrmann (Contributor) left a comment:

I would suggest in the future to follow the principle that a test only tests a single thing. This entails that we should not submit multiple queries from the same test method because this makes debugging problems a lot harder.

Comment on lines +140 to +180
private def testSingleAggOnTable(): Unit = {
  // group by fix length
  checkResult("SELECT a1, b1, count(c1) FROM T1 GROUP BY a1, b1",
    Seq(row(2, 1, 1), row(3, 2, 1), row(5, 2, 1), row(6, 3, 1)))
  // group by string
  checkResult("SELECT a1, c1, count(d1), avg(b1) FROM T1 GROUP BY a1, c1",
    Seq(row(2, "A", 0, 1.0), row(3, "A", 1, 2.0), row(5, "B", 1, 2.0), row(6, "C", 1, 3.0)))
  checkResult("SELECT c5, d5, avg(b5), avg(a5) FROM T5 WHERE d5 IS NOT NULL GROUP BY c5, d5",
    Seq(row("B", "Hi", 2.0, 3.0), row("C", "Hello", null, 1.0),
      row("D", "Hello world", 3.0, 4.0), row("E", "Hello world, how are you?", 1.0, 3.0),
      row("I", "hahaha", 2.0, 7.0), row("J", "I am fine.", 1.0, 6.0)))
  // group by string with null
  checkResult("SELECT a1, d1, count(d1) FROM T1 GROUP BY a1, d1",
    Seq(row(2, null, 0), row(3, "Hi", 1), row(5, "Hello", 1), row(6, "Hello world", 1)))
  checkResult("SELECT c5, d5, avg(b5), avg(a5) FROM T5 GROUP BY c5, d5",
    Seq(row("A", null, 1.0, 2.0), row("B", "Hi", 2.0, 3.0), row("C", "Hello", null, 1.0),
      row("D", "Hello world", 3.0, 4.0), row("E", "Hello world, how are you?", 1.0, 3.0),
      row("F", null, null, 5.0), row("I", "hahaha", 2.0, 7.0), row("J", "I am fine.", 1.0, 6.0)))

  checkResult("SELECT a3, b3, count(c3) FROM T3 GROUP BY a3, b3",
    Seq(row(1, 10, 1), row(2, 20, 2), row(3, 10, 1), row(4, 20, 1), row(4, null, 1)))
  checkResult("SELECT a2, b2, count(c2) FROM T2 GROUP BY a2, b2",
    Seq(row(1, 1, 1), row(1, 2, 1), row(2, 3, 0), row(2, 4, 1)))

  // group by constants
  checkResult("SELECT a1, b1, count(c1) FROM T1 GROUP BY a1, b1, 1, true",
    Seq(row(2, 1, 1), row(3, 2, 1), row(5, 2, 1), row(6, 3, 1)))
  checkResult("SELECT count(c1) FROM T1 GROUP BY 1, true", Seq(row(4)))

  // large data, for hash agg mode it will fallback
  checkResult("SELECT a6, c6, avg(b6), count(d6), avg(e6) FROM T6 GROUP BY a6, c6",
    (0 until 50000).map(i => row(i, if (i % 500 == 0) null else s"Hello$i", 1D, 1L, 10D))
  )
  checkResult("SELECT a6, d6, avg(b6), count(c6), avg(e6) FROM T6 GROUP BY a6, d6",
    (0 until 50000).map(i => row(i, "Hello world", 1D, if (i % 500 == 0) 0L else 1L, 10D))
  )
  checkResult("SELECT a6, f6, avg(b6), count(c6), avg(e6) FROM T6 GROUP BY a6, f6",
    (0 until 50000).map(i => row(i, new Date(i + 1531820000000L), 1D,
      if (i % 500 == 0) 0L else 1L, 10D))
  )
}
Contributor commented:

A piece of feedback for running multiple jobs from the same test method: it is super hard to figure out what's going on in the logs because multiple jobs are submitted from testSingleAggOnTable. To make things even worse, all jobs have the same name "collect". I would propose creating a dedicated test method with a descriptive name for every query.
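
If helpful, a rough sketch of that refactoring (assuming the JUnit @Test annotation and the same checkResult/row helpers and table fixtures used in testSingleAggOnTable; the method names are invented, and the queries and expected results are copied from the method above):

@Test
def testGroupByFixedLengthKeys(): Unit = {
  checkResult("SELECT a1, b1, count(c1) FROM T1 GROUP BY a1, b1",
    Seq(row(2, 1, 1), row(3, 2, 1), row(5, 2, 1), row(6, 3, 1)))
}

@Test
def testGroupByStringKey(): Unit = {
  checkResult("SELECT a1, c1, count(d1), avg(b1) FROM T1 GROUP BY a1, c1",
    Seq(row(2, "A", 0, 1.0), row(3, "A", 1, 2.0), row(5, "B", 1, 2.0), row(6, "C", 1, 3.0)))
}

@Test
def testGroupByConstants(): Unit = {
  checkResult("SELECT count(c1) FROM T1 GROUP BY 1, true", Seq(row(4)))
}

Each query would then fail or succeed under its own test name, although the submitted jobs would still be called "collect" unless a job name is set explicitly.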

