[FLINK-13708] [table-planner-blink] transformations should be cleared after execution in blink planner #9433

Open · wants to merge 5 commits into base: master

Conversation

@godfreyhe (Contributor) commented Aug 14, 2019

What is the purpose of the change

Transformations should be cleared after execution in the blink planner.

Brief change log

  • clear transformations in ExecutorBase

Verifying this change

This change added tests and can be verified as follows:

  • Added testExecuteTwiceUsingSameTableEnv to verify this bug
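
A rough Scala sketch of the shape of such a test (the source/sink names and SQL statements here are illustrative placeholders, not the exact test added in this PR):

import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.internal.TableEnvironmentImpl
import org.junit.Test

class ExecuteTwiceSketch {

  @Test
  def testExecuteTwiceUsingSameTableEnv(): Unit = {
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
    val tEnv = TableEnvironmentImpl.create(settings)
    // register a source table MyTable and two sink tables MySink1/MySink2 here (omitted)

    tEnv.sqlUpdate("INSERT INTO MySink1 SELECT * FROM MyTable")
    tEnv.execute("job1") // first job: writes only to MySink1

    tEnv.sqlUpdate("INSERT INTO MySink2 SELECT * FROM MyTable")
    tEnv.execute("job2") // second job: must not re-run the first INSERT, i.e. the
                         // transformations buffered for job1 were cleared after execution
  }
}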

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
@flinkbot commented Aug 14, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit b77e7a2 (Sun Sep 08 13:07:12 UTC 2019)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • 1. The [description] looks good.
  • 2. There is [consensus] that the contribution should go into Flink.
  • 3. Needs [attention] from.
  • 4. The change fits into the overall [architecture].
  • 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier
@flinkbot commented Aug 14, 2019

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
@@ -75,4 +84,41 @@ class TableEnvironmentTest {
" LogicalTableScan(table=[[default_catalog, default_database, MyTable]])\n"
assertEquals(expected, actual)
}

@Test
def testExecuteTwiceUsingSameTableEnv(): Unit = {

@wuchong (Member), Aug 14, 2019:

This is an integration test but added to a unit test class.


@Test
def testExecuteTwiceUsingSameTableEnv(): Unit = {
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()

@wuchong (Member), Aug 14, 2019:

Does this also work for streaming mode?
From the issue title and description, it seems that we want to fix this in both streaming and batch mode.

@godfreyhe (Author, Contributor), Aug 16, 2019:

Added TableEnvironmentITCase to test both batch and stream modes.

@Test
def testExecuteTwiceUsingSameTableEnv(): Unit = {
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val tEnv = TableEnvironmentImpl.create(settings)

@wuchong (Member), Aug 14, 2019:

Use TableEnvironment instead of the implementation?

@godfreyhe (Author, Contributor), Aug 15, 2019:

We use the impl class instead of the interface to avoid the Scala error "Static methods in interface require -target:jvm-1.8".

@dawidwys (Contributor) commented Aug 14, 2019

I have a slightly different question: why does ExecutorBase buffer the transformations in the first place? It violates the contract of Executor#apply. The assumption is that the apply method will actually apply the transformations to the ExecutionEnvironment that was provided. We already do the buffering in TableEnvironmentImpl.

The way it is implemented right now, the following valid program will produce no results:

StreamExecutionEnvironment senv = ...
StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv)
tEnv.sqlUpdate(...)
senv.execute();
import org.apache.flink.table.planner.utils.{TableTestUtil, TestTableSources}
import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
import org.apache.flink.types.Row
import org.apache.flink.util.AbstractID

import org.apache.calcite.plan.RelOptUtil
import org.junit.Assert.assertEquals

@JingsongLi (Contributor), Aug 14, 2019:

Looks like TableEnvironmentTest is a planner test class. Why not extend TableTestBase?

@godfreyhe (Author, Contributor) commented Aug 15, 2019

> I have a slightly different question: why does ExecutorBase buffer the transformations in the first place? It violates the contract of Executor#apply. The assumption is that the apply method will actually apply the transformations to the ExecutionEnvironment that was provided. We already do the buffering in TableEnvironmentImpl.
>
> The way it is implemented right now, the following valid program will produce no results:
>
> StreamExecutionEnvironment senv = ...
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv)
> tEnv.sqlUpdate(...)
> senv.execute();

Different from the implementation of StreamExecutor in the flink planner, the executors in the blink planner generate a StreamGraph first and execute that stream graph directly. StreamExecutor in the blink planner has to buffer the transformations in order to generate a single, whole StreamGraph (StreamGraph does not support merging yet) when the operation translation is eager (i.e. when using StreamTableEnvironment). To unify the implementation of the batch and stream executors, we defined the buffered transformations in ExecutorBase.
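
A conceptual Scala sketch of the buffering described above and of the clearing added in this PR (the real ExecutorBase in table-planner-blink is a Java class; the method names and the submit step below are simplified assumptions):

import java.util.{ArrayList => JArrayList, List => JList}

import org.apache.flink.api.dag.Transformation
import org.apache.flink.streaming.api.graph.StreamGraph

abstract class BufferingExecutorSketch {

  // transformations are buffered here instead of being applied to the StreamExecutionEnvironment
  protected val transformations: JList[Transformation[_]] = new JArrayList[Transformation[_]]()

  // apply() only buffers, so that a single, whole StreamGraph can be generated later
  def apply(toApply: JList[Transformation[_]]): Unit = {
    transformations.addAll(toApply)
  }

  def execute(jobName: String): Unit = {
    val streamGraph = generateStreamGraph(transformations, jobName)
    try submit(streamGraph)
    finally transformations.clear() // the fix: a second execute() starts from an empty buffer
  }

  // graph generation and job submission are left abstract in this sketch
  protected def generateStreamGraph(ts: JList[Transformation[_]], jobName: String): StreamGraph
  protected def submit(streamGraph: StreamGraph): Unit
}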

One approach to avoid buffering transformations is that StreamExecutor in the blink planner no longer generates the StreamGraph and keeps the same implementation as StreamExecutor in the flink planner, while BatchExecutor generates the StreamGraph in its apply method.

@dawidwys (Contributor) commented Aug 15, 2019

@godfreyhe I see the problem. TBH I would prefer the approach you suggested over how it is currently implemented. It preserves the old behavior.

@godfreyhe (Author, Contributor) commented Aug 15, 2019

> @godfreyhe I see the problem. TBH I would prefer the approach you suggested over how it is currently implemented. It preserves the old behavior.

I found another problem today: if we call explain twice, the second result contains the first result; and if we execute after explain, the StreamGraph to execute also contains the transformations generated in explain. This bug exists in both planners.

The latest fix is to generate the StreamGraph through StreamGraphGenerator instead of StreamExecutionEnvironment. This approach avoids StreamExecutionEnvironment buffering the transformations added in the explain or execute methods and produces a clean StreamGraph. For now this is fixed in the blink planner only.
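
A minimal Scala sketch of that approach, assuming the StreamGraphGenerator constructor that takes the transformation list plus the execution and checkpoint configs (the helper name below is illustrative):

import java.util.{List => JList}

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.dag.Transformation
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.graph.{StreamGraph, StreamGraphGenerator}

object CleanStreamGraphSketch {

  /** Builds a StreamGraph from exactly the given transformations, without registering anything
    * in the StreamExecutionEnvironment, so a previous explain() cannot leak into a later execute(). */
  def generateCleanStreamGraph(
      transformations: JList[Transformation[_]],
      executionConfig: ExecutionConfig,
      checkpointConfig: CheckpointConfig): StreamGraph = {
    new StreamGraphGenerator(transformations, executionConfig, checkpointConfig).generate()
  }
}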

A Member left a review comment:

The change looks good to me. I only left a suggestion about the test.

cc @dawidwys @twalthr, it would be great if you could have a look too, especially at the API changes.

* limitations under the License.
*/

package org.apache.flink.table.api

@wuchong (Member), Aug 18, 2019:

Put it under org.apache.flink.table.runtime

import _root_.java.io.File


class TableEnvironmentITCase {

@wuchong (Member), Aug 18, 2019:

It should extend AbstractTestBase to set the env parallelism to 4.

wuchong added a commit to wuchong/flink that referenced this pull request Aug 19, 2019
@dawidwys (Contributor) commented Aug 19, 2019

Could we take a step back here and think once again about when/what/why is buffered? Could we prepare such a list/description in the corresponding JIRA? As far as I understood, in the bridging APIs there should be no buffering at all, and the buffering should happen only in the unified one.

The current state of this PR does not fix StreamTableEnvironment with the blink planner. You cannot call execute on StreamExecutionEnvironment. Also, I don't understand why we should throw an exception in the explain method. This sounds like a rather dirty hack around some design flaws.

@wuchong (Member) commented Aug 21, 2019

> when/what/why is buffered?

Here is my understanding:

  1. Operations should be buffered in the unified table env, in order to reuse the shared parts of the graph, and should be cleared after execute is invoked. (This is what we do currently; see the sketch after this list.)
  2. Transformations should be buffered in the Executor, not in StreamExecutionEnvironment, in the blink planner, because blink batch needs to set additional properties on the transformations before execute. We should also clear the buffered transformations after execute is invoked.
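
A minimal sketch of point 1, with the buffering reduced to a plain list of statements (the field and method names are illustrative, not TableEnvironmentImpl's actual API):

import scala.collection.mutable.ListBuffer

class BufferedTableEnvSketch {

  // e.g. the INSERT statements collected before execute() is called
  private val bufferedOperations = ListBuffer.empty[String]

  def sqlUpdate(statement: String): Unit = {
    bufferedOperations += statement // buffered, not executed yet
  }

  def execute(jobName: String): Unit = {
    // translate everything at once so shared parts of the graph can be reused
    try runBufferedOperations(bufferedOperations.toList, jobName)
    finally bufferedOperations.clear() // cleared, so the next execute() starts fresh
  }

  private def runBufferedOperations(ops: List[String], jobName: String): Unit = {
    // translation and job submission are out of scope for this sketch
  }
}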

> The current state of this PR does not fix StreamTableEnvironment with the blink planner. You cannot call execute on StreamExecutionEnvironment.

For StreamTableEnvironment, I don't think we can fix this issue, because StreamExecutionEnvironment doesn't support executing multiple times. Is that right?

> Also, I don't understand why we should throw an exception in the explain method. This sounds like a rather dirty hack around some design flaws.

Currently, StreamTableEnvironment.explain returns nothing even if we applied some queries before. I think throwing an exception is better, to warn users that this method does not work properly at the moment. We can return the buffered logical/physical plan in the future, but that needs more work.

@Override
public String explain(boolean extended) {
// throw exception directly, because the operations to explain are always empty
throw new TableException("'explain' method is unsupported in StreamTableEnvironment.");

@JingsongLi (Contributor), Aug 21, 2019:

String explain(Table table) does work in StreamTableEnvironment, so this message is misleading.

@wuchong (Member), Aug 21, 2019:

Yes. We can improve the message.
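
For example, a less misleading message could look like the following (the wording is only a suggestion):

import org.apache.flink.table.api.TableException

object ExplainMessageSketch {
  // explain(Table) still works in StreamTableEnvironment; only the no-argument variant
  // cannot see the applied operations, so the message should say exactly that
  def unsupportedExplain(extended: Boolean): String =
    throw new TableException(
      "'explain()' without a Table argument is not supported in StreamTableEnvironment. " +
        "Please use 'explain(Table)' instead.")
}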

@dawidwys (Contributor) commented Aug 28, 2019

I get the problems with BatchExecutor; I can't think of a better solution for now.

Regarding the following:

> For StreamTableEnvironment, I don't think we can fix this issue, because StreamExecutionEnvironment doesn't support executing multiple times. Is that right?

Could you elaborate a bit more? Why do we need that (calling the execute method multiple times)?

@wuchong (Member) commented Aug 28, 2019

Hi @dawidwys,

After revisiting the code of StreamExecutionEnvironment, I found that I was wrong: StreamExecutionEnvironment supports executing multiple times, because it clears the buffered transformations (see the sketch after the list below).

Regarding why we need to execute multiple times:

  1. We didn't forbid executing multiple times in the JavaDoc of the execute method.
  2. StreamExecutionEnvironment also supports executing multiple times.
  3. We are benchmarking the TPC-DS queries, and the total time is one of the important metrics. Meanwhile, we collect and register some statistics in the built-in catalog; this takes some time because it spawns some jobs to calculate ndv, row-count, etc. If the TableEnvironment doesn't support executing multiple times, we have to re-calculate the statistics and re-construct the TableEnvironment, which prolongs the total benchmark time.
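
A small sketch of point 2: two separate pipelines submitted from the same StreamExecutionEnvironment, relying on the env clearing its buffered transformations after each execute() as described above (the pipelines are just placeholders):

import org.apache.flink.streaming.api.scala._

object ExecuteEnvTwiceSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.fromElements(1, 2, 3).print()
    env.execute("first job") // runs only the first pipeline

    env.fromElements(4, 5, 6).print()
    env.execute("second job") // runs only the second pipeline, not the first one again
  }
}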

@wuchong (Member) commented Aug 28, 2019

There are three issues:

  1. explain() + execute() ==> the job is executed twice
  2. explain() + explain() ==> the second explain displays the same physical plan twice
  3. execute() + execute() ==> the second execute runs the first job again

The blink planner has all three issues; the flink planner has issues 1) and 2).

@dawidwys (Contributor) commented Aug 28, 2019

I get that we should change something. I am trying to understand whether we need the whole buffering business in executors. I understand now that we need it in BatchExecutor, but I really see no reason why the Blink StreamExecutor should behave differently from Flink's.

Regarding 1 & 2: the problem is not in how the Executor is implemented (at least in the Flink planner), but in the TableSource interface, which always adds a transformation to a StreamExecutionEnvironment. Generally speaking, explain should be stateless.

Just to sum it up a bit: I think the changes in this PR are fine for now to improve the BatchExecutor. However, unless you have a good reason for it, and I haven't heard one so far, I would really prefer that the StreamExecutor not buffer the transformations, because in the end I think we should aim for all the Executors to be stateless.

@wuchong (Member) commented Aug 28, 2019

However, in order to make explain stateless, we have to use a StreamGraphGenerator to generate the StreamGraph ourselves; we can't get a StreamGraphGenerator from the underlying env. That's why we construct the StreamGraphGenerator in the Executor. And we would like to make sure the generation behavior is the same between explain and execute, so we buffer the transformations and generate the StreamGraph to submit from them.

@dawidwys (Contributor) commented Aug 28, 2019

No, you don't buffer in the explain method in this PR. In the explain method you just generate the StreamGraph from the transformations passed to it, which is correct.

There is actually one more thing that I don't necessarily like about it: the Planner has to construct the Executor, which violates the clear separation of logical/relational planning and physical/execution planning. How about we add an explain method to the Executor as well? This method would explain the physical part.

@wuchong (Member) commented Aug 29, 2019

> No, you don't buffer in the explain method in this PR. In the explain method you just generate the StreamGraph from the transformations passed to it, which is correct.

Yes, we don't buffer transformations for the explain method.

> There is actually one more thing that I don't necessarily like about it: the Planner has to construct the Executor, which violates the clear separation of logical/relational planning and physical/execution planning. How about we add an explain method to the Executor as well? This method would explain the physical part.

How about adding a String explainExecutionPlan(List<Transformation<?>>)?
explain is too generic, and it doesn't return the logical parts.
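
A sketch of the proposed addition (it exists only as a suggestion in this discussion, not in Flink's Executor interface):

import java.util.{List => JList}

import org.apache.flink.api.dag.Transformation

trait ExecutorExplainSketch {

  /** Returns a textual description of the physical execution plan for the given
    * transformations, without submitting a job. */
  def explainExecutionPlan(transformations: JList[Transformation[_]]): String
}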

@godfreyhe (Author, Contributor) commented Sep 8, 2019

Sorry for the late reply, and thanks everyone for the suggestions. The implementation of the Blink StreamExecutor is the same as the Flink StreamExecutor now, except for two added getStreamGraph methods: one for the explain method, and another for the SQL client.
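
A rough sketch of the shape this could take (the parameter lists below are guesses based on the description above, not the exact signatures from the PR):

import java.util.{List => JList}

import org.apache.flink.api.dag.Transformation
import org.apache.flink.streaming.api.graph.StreamGraph

trait StreamGraphAccessSketch {

  /** Used by explain(): builds a StreamGraph from the given transformations only. */
  def getStreamGraph(transformations: JList[Transformation[_]]): StreamGraph

  /** Used by the SQL client: like the above, but with an explicit job name. */
  def getStreamGraph(transformations: JList[Transformation[_]], jobName: String): StreamGraph
}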
