Skip to content

Commit

Permalink
[FLINK-3774] [shell] Forwards Flink configuration to PlanExecutor
Browse files Browse the repository at this point in the history
The ScalaShellRemoteEnvironment did not properly forward the given Flink configuration
to the PlanExecutor. Consequently, it was not possible to configure the Client to connect
to an HA cluster. This PR corrects the forwarding.

Fix failing FlinkILoopTest with Scala 2.11

This closes #1904.
  • Loading branch information
tillrohrmann committed Apr 25, 2016
1 parent 3f4e2f8 commit 0ee7c79
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 49 deletions.
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;


import java.net.URL; import java.net.URL;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;


Expand Down Expand Up @@ -177,7 +176,7 @@ public static PlanExecutor createLocalExecutor(Configuration configuration) {
* @return A remote executor. * @return A remote executor.
*/ */
public static PlanExecutor createRemoteExecutor(String hostname, int port, Configuration clientConfiguration, public static PlanExecutor createRemoteExecutor(String hostname, int port, Configuration clientConfiguration,
URL[] jarFiles, URL[] globalClasspaths) { List<URL> jarFiles, List<URL> globalClasspaths) {
if (hostname == null) { if (hostname == null) {
throw new IllegalArgumentException("The hostname must not be null."); throw new IllegalArgumentException("The hostname must not be null.");
} }
Expand All @@ -187,10 +186,10 @@ public static PlanExecutor createRemoteExecutor(String hostname, int port, Confi


Class<? extends PlanExecutor> reClass = loadExecutorClass(REMOTE_EXECUTOR_CLASS); Class<? extends PlanExecutor> reClass = loadExecutorClass(REMOTE_EXECUTOR_CLASS);


List<URL> files = (jarFiles == null || jarFiles.length == 0) ? List<URL> files = (jarFiles == null) ?
Collections.<URL>emptyList() : Arrays.asList(jarFiles); Collections.<URL>emptyList() : jarFiles;
List<URL> paths = (globalClasspaths == null || globalClasspaths.length == 0) ? List<URL> paths = (globalClasspaths == null) ?
Collections.<URL>emptyList() : Arrays.asList(globalClasspaths); Collections.<URL>emptyList() : globalClasspaths;


try { try {
PlanExecutor executor = (clientConfiguration == null) ? PlanExecutor executor = (clientConfiguration == null) ?
Expand Down
Expand Up @@ -30,6 +30,10 @@
import java.io.File; import java.io.File;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;


/** /**
* An {@link ExecutionEnvironment} that sends programs to a cluster for execution. The environment * An {@link ExecutionEnvironment} that sends programs to a cluster for execution. The environment
Expand All @@ -51,19 +55,19 @@ public class RemoteEnvironment extends ExecutionEnvironment {
protected final int port; protected final int port;


/** The jar files that need to be attached to each job */ /** The jar files that need to be attached to each job */
private final URL[] jarFiles; protected final List<URL> jarFiles;


/** The configuration used by the client that connects to the cluster */ /** The configuration used by the client that connects to the cluster */
private Configuration clientConfiguration; protected Configuration clientConfiguration;


/** The remote executor lazily created upon first use */ /** The remote executor lazily created upon first use */
private PlanExecutor executor; protected PlanExecutor executor;


/** Optional shutdown hook, used in session mode to eagerly terminate the last session */ /** Optional shutdown hook, used in session mode to eagerly terminate the last session */
private Thread shutdownHook; private Thread shutdownHook;


/** The classpaths that need to be attached to each job */ /** The classpaths that need to be attached to each job */
private final URL[] globalClasspaths; protected final List<URL> globalClasspaths;


/** /**
* Creates a new RemoteEnvironment that points to the master (JobManager) described by the * Creates a new RemoteEnvironment that points to the master (JobManager) described by the
Expand Down Expand Up @@ -133,26 +137,31 @@ public RemoteEnvironment(String host, int port, Configuration clientConfig,
this.port = port; this.port = port;
this.clientConfiguration = clientConfig == null ? new Configuration() : clientConfig; this.clientConfiguration = clientConfig == null ? new Configuration() : clientConfig;
if (jarFiles != null) { if (jarFiles != null) {
this.jarFiles = new URL[jarFiles.length]; this.jarFiles = new ArrayList<>(jarFiles.length);
for (int i = 0; i < jarFiles.length; i++) { for (String jarFile : jarFiles) {
try { try {
this.jarFiles[i] = new File(jarFiles[i]).getAbsoluteFile().toURI().toURL(); this.jarFiles.add(new File(jarFile).getAbsoluteFile().toURI().toURL());
} catch (MalformedURLException e) { } catch (MalformedURLException e) {
throw new IllegalArgumentException("JAR file path invalid", e); throw new IllegalArgumentException("JAR file path invalid", e);
} }
} }
} }
else { else {
this.jarFiles = null; this.jarFiles = Collections.emptyList();
}

if (globalClasspaths == null) {
this.globalClasspaths = Collections.emptyList();
} else {
this.globalClasspaths = Arrays.asList(globalClasspaths);
} }
this.globalClasspaths = globalClasspaths;
} }


// ------------------------------------------------------------------------ // ------------------------------------------------------------------------


@Override @Override
public JobExecutionResult execute(String jobName) throws Exception { public JobExecutionResult execute(String jobName) throws Exception {
ensureExecutorCreated(); PlanExecutor executor = getExecutor();


Plan p = createProgramPlan(jobName); Plan p = createProgramPlan(jobName);


Expand All @@ -178,7 +187,11 @@ public String getExecutionPlan() throws Exception {
} }
else { else {
PlanExecutor le = PlanExecutor.createLocalExecutor(null); PlanExecutor le = PlanExecutor.createLocalExecutor(null);
return le.getOptimizerPlanAsJSON(p); String plan = le.getOptimizerPlanAsJSON(p);

le.stop();

return plan;
} }
} }


Expand All @@ -190,7 +203,7 @@ public void startNewSession() throws Exception {
installShutdownHook(); installShutdownHook();
} }


private void ensureExecutorCreated() throws Exception { protected PlanExecutor getExecutor() throws Exception {
if (executor == null) { if (executor == null) {
executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration, executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration,
jarFiles, globalClasspaths); jarFiles, globalClasspaths);
Expand All @@ -202,6 +215,8 @@ private void ensureExecutorCreated() throws Exception {
executor.start(); executor.start();
installShutdownHook(); installShutdownHook();
} }

return executor;
} }


// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
Expand Down
Expand Up @@ -18,8 +18,6 @@


package org.apache.flink.runtime.minicluster package org.apache.flink.runtime.minicluster


import java.util

import akka.actor.{ActorRef, ActorSystem} import akka.actor.{ActorRef, ActorSystem}
import org.apache.flink.api.common.JobID import org.apache.flink.api.common.JobID


Expand Down
Expand Up @@ -164,7 +164,7 @@ private void jarDir(File dirOrFile2jar, JarOutputStream jos, String path)
File f = new File(dirOrFile2jar, dirList[i]); File f = new File(dirOrFile2jar, dirList[i]);
jarDir(f,jos,subPath); jarDir(f,jos,subPath);
} }
} else { } else if (dirOrFile2jar.exists()) {
if (dirOrFile2jar.getCanonicalPath().equals(mDestJarName)) if (dirOrFile2jar.getCanonicalPath().equals(mDestJarName))
{ {
if (mVerbose) {System.out.println("skipping " + dirOrFile2jar.getPath());} if (mVerbose) {System.out.println("skipping " + dirOrFile2jar.getPath());}
Expand Down
Expand Up @@ -19,14 +19,11 @@
* limitations under the License. * limitations under the License.
*/ */


import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor; import org.apache.flink.api.common.PlanExecutor;


import org.apache.flink.api.scala.FlinkILoop; import org.apache.flink.api.scala.FlinkILoop;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;


import java.io.File;
import java.net.URL; import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
Expand All @@ -53,41 +50,36 @@ public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
* user-defined functions, user-defined input formats, or any libraries, those must be * user-defined functions, user-defined input formats, or any libraries, those must be
* provided in the JAR files. * provided in the JAR files.
*/ */
public ScalaShellRemoteEnvironment(String host, int port, FlinkILoop flinkILoop, Configuration clientConfig, String... jarFiles) { public ScalaShellRemoteEnvironment(String host, int port, FlinkILoop flinkILoop, Configuration clientConfig, String... jarFiles) throws Exception {
super(host, port, clientConfig, jarFiles, null); super(host, port, clientConfig, jarFiles, null);
this.flinkILoop = flinkILoop; this.flinkILoop = flinkILoop;
} }


/**
* compiles jars from files in the shell virtual directory on the fly, sends and executes it in the remote environment
*
* @param jobName name of the job as string
* @return Result of the computation
* @throws Exception
*/
@Override @Override
public JobExecutionResult execute(String jobName) throws Exception { protected PlanExecutor getExecutor() throws Exception {
Plan p = createProgramPlan(jobName); // check if we had already started a PlanExecutor. If true, then stop it, because there will
// be a new jar file available for the user code classes
if (this.executor != null) {
this.executor.stop();
}


// write generated classes to disk so that they can be shipped to the cluster
URL jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL(); URL jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL();


// get "external jars, and add the shell command jar, pass to executor List<URL> allJarFiles = new ArrayList<>(jarFiles);
List<URL> alljars = new ArrayList<>(); allJarFiles.add(jarUrl);
// get external (library) jars
String[] extJars = this.flinkILoop.getExternalJars();


for (String extJar : extJars) { this.executor = PlanExecutor.createRemoteExecutor(
URL extJarUrl = new File(extJar).getAbsoluteFile().toURI().toURL(); host,
alljars.add(extJarUrl); port,
} clientConfiguration,
allJarFiles,
globalClasspaths
);


// add shell commands executor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
alljars.add(jarUrl);
PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, new Configuration(),
alljars.toArray(new URL[alljars.size()]), null);


executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled()); return executor;
return executor.executePlan(p);
} }


public static void disableAllContextAndOtherEnvironments() { public static void disableAllContextAndOtherEnvironments() {
Expand Down
Expand Up @@ -71,7 +71,12 @@ class FlinkILoop(
ScalaShellRemoteEnvironment.resetContextEnvironments() ScalaShellRemoteEnvironment.resetContextEnvironments()


// create our environment that submits against the cluster (local or remote) // create our environment that submits against the cluster (local or remote)
val remoteBenv = new ScalaShellRemoteEnvironment(host, port, this, clientConfig) val remoteBenv = new ScalaShellRemoteEnvironment(
host,
port,
this,
clientConfig,
this.getExternalJars(): _*)
val remoteSenv = new ScalaShellRemoteStreamEnvironment(host, port, this); val remoteSenv = new ScalaShellRemoteStreamEnvironment(host, port, this);
// prevent further instantiation of environments // prevent further instantiation of environments
ScalaShellRemoteEnvironment.disableAllContextAndOtherEnvironments() ScalaShellRemoteEnvironment.disableAllContextAndOtherEnvironments()
Expand Down

0 comments on commit 0ee7c79

Please sign in to comment.