-
Notifications
You must be signed in to change notification settings - Fork 13k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-3774] [shell] Forwards Flink configuration to PlanExecutor #1904
Conversation
The ScalaShellRemoteEnvironment did not properly forward the given Flink configuration to the PlanExecutor. Consequently, it was not possible to configure the Client to connect to an HA cluster. This PR corrects the forwarding.
427fe78
to
665925e
Compare
} | ||
|
||
// ------------------------------------------------------------------------ | ||
|
||
@Override | ||
public JobExecutionResult execute(String jobName) throws Exception { | ||
ensureExecutorCreated(); | ||
PlanExecutor executor = getExecutor(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would it make sense to move the assignment to this.executor
from getExecutor()
to this line? as it stands the ScalaShellRemoteEnvironment executor is never closed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right, it's bad that the PlanExecutor
is not stopped after it has been used. I will fix this by checking in ScalaShellRemoteEnvironment.getExecutor
whether this.executor
is set. If true, then it will call this.executor.stop()
. That way, there will always be at most one PlanExecutor
active and the last one is stopped by the dispose
call.
@@ -133,26 +137,31 @@ public RemoteEnvironment(String host, int port, Configuration clientConfig, | |||
this.port = port; | |||
this.clientConfiguration = clientConfig == null ? new Configuration() : clientConfig; | |||
if (jarFiles != null) { | |||
this.jarFiles = new URL[jarFiles.length]; | |||
this.jarFiles = new ArrayList<URL>(jarFiles.length); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nitpick - No need of :(
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will fix it.
this.jarFiles = new URL[jarFiles.length]; | ||
for (int i = 0; i < jarFiles.length; i++) { | ||
this.jarFiles = new ArrayList<>(jarFiles.length); | ||
for(String jarFile : jarFiles) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
whitespace missing after for
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will fix it.
Changes look good! +1 to merge |
Failing test case is unrelated. Merging. |
The ScalaShellRemoteEnvironment did not properly forward the given Flink configuration to the PlanExecutor. Consequently, it was not possible to configure the Client to connect to an HA cluster. This PR corrects the forwarding. Fix failing FlinkILoopTest with Scala 2.11 This closes apache#1904.
The ScalaShellRemoteEnvironment did not properly forward the given Flink configuration to the PlanExecutor. Consequently, it was not possible to configure the Client to connect to an HA cluster. This PR corrects the forwarding. Fix failing FlinkILoopTest with Scala 2.11 This closes apache#1904.
Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the How To Contribute guide.
In addition to going through the list, please provide a meaningful description of your changes.
mvn clean verify
has been executed successfully locally or a Travis build has passedThe ScalaShellRemoteEnvironment did not properly forward the given Flink configuration
to the PlanExecutor. Consequently, it was not possible to configure the Client to connect
to an HA cluster. This PR corrects the forwarding.