Skip to content

Conversation

iyupeng
Copy link
Contributor

@iyupeng iyupeng commented Sep 23, 2021

What is the purpose of the change

This pull request follows #14894 which has been inactive for months.
This pull request addresses review comments in the existing pull request and is based on the existing one.
All the code changes are targeted to make it possible to push down local aggregations from computing layer to storage layer for better computing & I/O performance.

Brief change log

  • Add a new configuration of table.optimizer.source.aggregate-pushdown-enabled to control the aggregation push down.
  • Add an abstract class of PushLocalAggIntoScanRuleBase for supporting both with and without sort operator.
  • Add a new rule of PushLocalSortAggIntoScanRule for sort aggregation case.
  • Add a new rule of PushLocalSortAggWithSortIntoScanRule for sort aggregation with sort case.
  • Add a new rule of PushLocalSortAggWithSortAndCalcIntoScanRule for sort aggregation with sort & calc case.
  • Add a new rule of PushLocalHashAggIntoScanRule for hash aggregation case.
  • Add a new rule of PushLocalHashAggWithCalcIntoScanRule for hash aggregation with calc case.
  • Add new push down spec AggregatePushDownSpec to serialize/deserialize aggregations.
  • Implement the SupportsAggregatePushDown interface for TestValuesScanTableSource.
  • Some other unit tests and integration tests.

Verifying this change

This change added tests and can be verified as follows:

  • Added unit test class PushLocalAggIntoTableSourceScanRuleTest for plan verifications
  • Added integration test class LocalAggregatePushDownITCase for end-to-end functional test

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (docs)

@flinkbot
Copy link
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 479edc3 (Thu Sep 23 15:29:59 UTC 2021)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!
  • This pull request references an unassigned Jira ticket. According to the code contribution guide, tickets need to be assigned before starting with the implementation work.

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Copy link
Collaborator

flinkbot commented Sep 23, 2021

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@iyupeng
Copy link
Contributor Author

iyupeng commented Sep 29, 2021

@wuchong @godfreyhe Please take a look, thanks a lot.

@godfreyhe
Copy link
Contributor

@iyupeng sorry for the late response, I start to review it now

Copy link
Contributor

@godfreyhe godfreyhe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the contribution @iyupeng , I left some comment

@iyupeng
Copy link
Contributor Author

iyupeng commented Oct 22, 2021

@godfreyhe Thanks for your comments. I plan to push new commits in a few days.

Copy link
Contributor

@godfreyhe godfreyhe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the quick update @iyupeng, I left some comments about your changes

Comment on lines 1004 to 1008
// Disable push down aggregates to avoid conflicts with existing test cases that verify plans.
tableEnv.getConfig
.getConfiguration
.setBoolean(
OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, this pr do not change all tests except you added, so remove it ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @godfreyhe, thanks for the comments.

2 existing test cases with local aggregate and 'connector' = 'values' can be affected if we enable aggregate push down:

  1. RankTest.scala#L172, testCreateViewWithRowNumber

  2. TableSourceTest.scala#L118, testProjectWithoutInputRef

We need to change the plans of these 2 test cases in .xml files if we remove above configuration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pushed a new commit 2ee07dc to solve this.

Copy link
Contributor

@godfreyhe godfreyhe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM overall except one minor comment

Copy link
Contributor

@godfreyhe godfreyhe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the quick update, LGTM

@godfreyhe godfreyhe closed this in 34d581b Oct 31, 2021
* <p>Regardless if this interface is implemented or not, a final aggregation is always applied in a
* subsequent operation after the source.
*
* <p>Note: currently, the {@link SupportsAggregatePushDown} is not supported by planner.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@iyupeng are we sure that the JavaDocs don't need more updates for this PR?

For example, is this description still correct?

 * <p>Note: The local aggregate push down strategy is all or nothing, it can only be pushed down if
 * all aggregate functions are supported.

Also maybe we should mention the config option that was added in this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @twalthr, the strategy is still all or nothing here. applyAggregates returns a bool value, meaning whether all local aggregates are accepted by underlying data source or not at all.

The docs could be improved, like mentioning the new option, etc.

I could push a hotfix for docs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be great. The more information in the interface description, the better for all implementers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @twalthr I pushed this for doc improvement: #17630

niklassemmler pushed a commit to niklassemmler/flink that referenced this pull request Feb 3, 2022
…le planner

Co-authored-by: Sebastian Liu <liuyang0704@gmail.com>

This closes apache#17344
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants