Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-11715][table-planner-blink] Add optimize program to organize optimization phases #7834

Closed
wants to merge 4 commits into from

Conversation

godfreyhe
Copy link
Contributor

@godfreyhe godfreyhe commented Feb 26, 2019

What is the purpose of the change

This commit adds optimize program to organize optimization phases.

Currently, Flink organizes the optimization phases by different methods in Batch(Stream)TableEnvironment#optimize. However this is not easy to extend especially there are more than ten optimization stages in Blink.

So in Blink, each optimization stage is abstracted into a FlinkOptimizeProgram, and FlinkChainedPrograms is responsible for organizing all the programs.

Brief change log

  • Adds FlinkOptimizeProgram interface, and its common subclasses: FlinkRuleSetProgram, FlinkHepRuleSetProgram, FlinkVolcanoProgram, FlinkGroupProgram
  • Adds FlinkChainedPrograms for organizing all the programs

Verifying this change

This change added tests and can be verified as follows:

  • Added test that validates all operations to build a FlinkOptimizeProgram and FlinkChainedPrograms

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (JavaDocs)

@flinkbot
Copy link
Collaborator

flinkbot commented Feb 26, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot approve description to approve the 1st aspect (similarly, it also supports the consensus, architecture and quality keywords)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval

@KurtYoung
Copy link
Contributor

@flinkbot approve all

@KurtYoung
Copy link
Contributor

@flinkbot disapprove all

*
* @tparam OC OptimizeContext
*/
class FlinkChainedPrograms[OC <: OptimizeContext] extends Logging {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class should also extends FlinkOptimizeProgram?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good idea!

/**
* Gets the Calcite [[Context]] defined in [[org.apache.flink.table.api.TableEnvironment]].
*/
def getContext: Context
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we try to avoid use Calcite's Context to pass some necessary staffs? The Calcite's Context is really obscure to tell what exactly we can get from it. In this case, you might thought use this pass TableConfig around, but no one can know this though this API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only TableConfig is not enough, FunctionCatalog is also needed by RexNodeExtractor which is used in PushFilterIntoTableSourceScanRule and SelectivityEstimator in Blink.

Copy link
Contributor

@KurtYoung KurtYoung Feb 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, what i meant is why not just show these two method to OptimizeContext, like
getTableConfig and getFunctionCatalog. Through Context's API, nobody knows what they can get with it.

Copy link
Contributor Author

@godfreyhe godfreyhe Feb 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For most of programs, they need not to know the content in the Context. The Context instance is only be used to create HepPlanner as a whole instance now: new HepPlanner(hepProgram, context.getContext). Calcite's Context is built in TableEnvironment now, and VolcanoPlanner and HepPlanner are built with this context instance. However they are built in difference places, VolcanoPlanner is built in FlinkRelBuilder and HepPlanner is built in FlinkHepProgram. What I'm worried about is someone may be forget to modify FlinkHepProgram when he/she wants to extend Context in TableEnvironment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't depend on Context to pass things like TableConfig around, is it still necessary to build Context in TableEnvironment? I noticed HepPlanner has a constructor without Context.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A empty Context will be created in AbstractRelOptPlanner if building a HepPlanner without Context. And the rules executed in HepPlanner can not get TableConfig or FunctionCatalog instances set by user.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got your point. What do you think about we just let OptimizeContext to inherit from Calcite's Context, and exposes some meaningful APIs to OptimizeContext. And in each rule which wants to use TableConfig, just get Context and cast it to OptimizeContext.

Another comment on BatchOptimizeContext and StreamOptimizeContext, can we also try to unify these two? I know there are some differences between them in Blink's code, could you consider possibility of it? Since we may unify table environments for both batch and streaming.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's more clear than before that OptimizeContext extends Calcite's Context and exposes meaningful APIs.

BatchOptimizeContext and StreamOptimizeContext can be unified after table environments have been unified. We can do this later.

Copy link
Contributor

@KurtYoung KurtYoung left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me overall, but i have some concerns about Calcite's Context..

@KurtYoung
Copy link
Contributor

merging this...

@KurtYoung KurtYoung closed this in 2328ff3 Mar 1, 2019
@godfreyhe godfreyhe deleted the FLINK-11715 branch March 2, 2019 05:32
HuangZhenQiu pushed a commit to HuangZhenQiu/flink that referenced this pull request Apr 22, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants