New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-13946] Remove job session related code from ExecutionEnvironment #9607
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 5267206 (Fri Sep 06 13:30:35 UTC 2019) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @kl0u for opening this pull requests. I left some inline comments.
flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
Outdated
Show resolved
Hide resolved
flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
Outdated
Show resolved
Hide resolved
} | ||
ClusterClient<?> client = null; | ||
try { | ||
client = startClusterClient(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
inline startClusterClient
and stopClusterClient
make more sense to me.
flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
Outdated
Show resolved
Hide resolved
flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
Show resolved
Hide resolved
flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
Outdated
Show resolved
Hide resolved
Hi @tisonkun ! Thanks for the review. I integrated your comments. Please have a look and let me know what you think. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good! But there is still some superfluous/needlessly complicated code, IMO.
flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
Outdated
Show resolved
Hide resolved
private Configuration createExecutorServiceConfig() { | ||
final Configuration newConfiguration = new Configuration(); | ||
newConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots); | ||
newConfiguration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, defaultOverwriteFiles); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I only see usage of this setter in one single test, maybe we can also remove that or at least not use a field for this anymore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean the setTaskManagerNumSlots()
setter?
flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your updates @kl0u! Generally looks good to me.
This pull request has conflicts with current master and fails on compile error. Please rebase and fix compile error.
I integrated your comments @aljoscha . Please have a look and let me know what you think. |
@@ -93,4 +93,9 @@ public String stopWithSavepoint(JobID jobId, boolean advanceToEndOfEventTime, @N | |||
public Map<String, OptionalFailure<Object>> getAccumulators(JobID jobID, ClassLoader loader) { | |||
return Collections.emptyMap(); | |||
} | |||
|
|||
@Override | |||
public void close() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wried. The latest CI complaint with
/home/travis/build/flink-ci/flink/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/FakeClusterClient.java:[43,8] org.apache.flink.yarn.util.FakeClusterClient is not abstract and does not override abstract method close() in java.lang.AutoCloseable
but actually we implement it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's see what travis says now.
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
Show resolved
Hide resolved
Hi @kl0u, thanks for your update. I notice the strange state in travis that reports a compile error which should not be there. Please take a look. Also you can mark conversations that addressed or reached a consensus above as "resolved" to fold them, which make ongoing reviews more clear. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @kl0u, this pull request now looks good to me.
+1 to merge. Also it will unblock FLINK-13961 which I'm glad to implement our discussion on JobExecutor(Service)
. Please also take a look at the JIRA. Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this looks good now!
What is the purpose of the change
This PR removes code related to JobSessions from the
ExecutionEnvironment
and thePlanExecutor
s. This code was added in the context of FLINK-2097 but it was never activated, as illustrated by the comment at ExecutionEnvironment.java#L285 . The work in this PR is part of the preparation for the upcoming re-design of the whole Client/Executor API.Brief change log
The changes in the subclasses of the
ExecutionEnvironment
remove methods that were setting session-related parameters and reflect the simplification of thePlanExecutor
lifecycle explained below (forLocal
andRemoteEnvironment
).The changes to the
PlanExecutors
have to do with the executor's lifecycle. Now the executor itself controls its lifecycle (start()
andstop()
areprivate
) and we instantiate an executor for each call toexecutePlan()
. This allows to get rid of the reapers from theLocal
andRemoteEnvironments
and thelock
that protected concurrent access to the executor's state.The lifecycle is more explicit now and aligned with the current use of the
ExecutionEnvironment
. If in the future we choose to change this and decide to re-use execution environments, then we can add this functionality back, potentially under a different design/architecture.Verifying this change
This change is a code cleanup so it is covered by existing tests.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation
Please have a look at this one @aljoscha and @tillrohrmann.