-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-14840] Use Executor interface in SQL cli #10313
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-14840] Use Executor interface in SQL cli #10313
Conversation
|
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 64339da (Wed Dec 04 17:08:48 UTC 2019) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. DetailsThe Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
CI report:
Bot commandsThe @flinkbot bot supports the following commands:
|
df63633 to
4a5ca62
Compare
...-sql-client/src/main/java/org/apache/flink/table/client/gateway/ProgramTargetDescriptor.java
Outdated
Show resolved
Hide resolved
kl0u
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work @aljoscha !
I had some comments with the main ones being the one about not closing the jobClient and the one about changing the threading model of the MaterializedCollectBatchResult.
...nt/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java
Show resolved
Hide resolved
...nt/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java
Outdated
Show resolved
Hide resolved
...k-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
Outdated
Show resolved
Hide resolved
...link-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
Outdated
Show resolved
Hide resolved
...nk-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
Show resolved
Hide resolved
...-sql-client/src/main/java/org/apache/flink/table/client/gateway/ProgramTargetDescriptor.java
Outdated
Show resolved
Hide resolved
...-sql-client/src/main/java/org/apache/flink/table/client/gateway/ProgramTargetDescriptor.java
Outdated
Show resolved
Hide resolved
...nk-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
Outdated
Show resolved
Hide resolved
tisonkun
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
...-sql-client/src/main/java/org/apache/flink/table/client/gateway/ProgramTargetDescriptor.java
Show resolved
Hide resolved
...k-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
Outdated
Show resolved
Hide resolved
...k-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
Outdated
Show resolved
Hide resolved
...k-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
Outdated
Show resolved
Hide resolved
...link-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
Show resolved
Hide resolved
...nk-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
Show resolved
Hide resolved
...nt/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java
Outdated
Show resolved
Hide resolved
|
Thanks for the PR @aljoscha. I would like to take a final look before merging. Please ping me when there are no more concerns. |
|
I think I addressed all your comments. Please take another look. |
|
Is everyone OK with me rebasing and squashing this so that Timo can have a look? |
tisonkun
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No more concerns from my side. Go ahead :-)
kl0u
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@aljoscha I did my pass and I left some comments mainly having to do with style rather than essence. After integrating whichever you find meaningful, you have my +1, but I would feel more comfortable if someone who also knows more about the SQL Client, like @twalthr, has a look before merging, as I am not that familiar with this part of the codebase.
| CommandLine commandLine, | ||
| Options commandLineOptions, | ||
| List<CustomCommandLine> availableCommandLines) throws FlinkException { | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This options is only used for logging. Do we need it or we can simply print the final executionConfig ?
| } | ||
|
|
||
| private Pipeline createPipeline(String name, Configuration flinkConfig) { | ||
| public Pipeline createPipeline(String name, Configuration flinkConfig) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this method it seems we do not use the flinkConfig or the parallelism in the else branch. Is it ok to remove them? Also the else is not needed as we return in the if branch. So this can become:
public Pipeline createPipeline(String name) {
if (streamExecEnv != null) {
// special case for Blink planner to apply batch optimizations
// note: it also modifies the ExecutionConfig!
if (executor instanceof ExecutorBase) {
return ((ExecutorBase) executor).getStreamGraph(name);
}
return streamExecEnv.getStreamGraph(name);
}
return execEnv.createProgramPlan(name);
}
| boolean awaitJobResult) { | ||
| this.context = context; | ||
| this.jobGraph = jobGraph; | ||
| this.pipeline = pipeline; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
checkNotNull?
| * @param awaitJobResult block for a job execution result from the cluster | ||
| */ | ||
| public ProgramDeployer( | ||
| ExecutionContext<C> context, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also here, for core readability, I think it is easier to pass the configuration directly instead of creating in the deploy and when we fetch the result, have a checkState on the ATTACHED flag so that the reader of the code knows what to expect. The way it is now, it seems like in the deploy, we are setting the ATTACHED flag but it is never used.
WDYT?
| throw e; | ||
| } catch (Exception e) { | ||
| throw new SqlExecutionException("Could not locate a cluster.", e); | ||
| if (awaitJobResult) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about configuration.set(DeploymentOptions.ATTACHED, awaitJobResult); ?
| } catch (Exception e) { | ||
| // ignore | ||
| } | ||
| jobClient = executor.execute(pipeline, configuration); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not directly return executor.execute(pipeline, configuration); ?
64339da to
c10cc10
Compare
|
@flinkbot run travis |
b50be35 to
588f931
Compare
This also does not print the cluster ID and web interface URL anymore when submitting because there is no portable way of retrieving these. Previously, even for YARN the printed cluster id was not usable because it was just the class name of the implementation class.
588f931 to
170f6e3
Compare
What is the purpose of the change
Executorinterface andJobClientinstead of manually going through cluster descriptor/cluster specification.This also does not print the cluster ID and web interface URL anymore
when submitting because there is no portable way of retrieving these.
Previously, even for YARN the printed cluster id was not usable because
it was just the class name of the implementation class.
Verifying this change
This change is covered by existing tests and ITcases that were adapted.
Documentation