diff --git a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java index add8b80262f3d..87f0e09997d46 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java @@ -22,7 +22,6 @@ import org.apache.flink.configuration.Configuration; import java.net.URL; -import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -177,7 +176,7 @@ public static PlanExecutor createLocalExecutor(Configuration configuration) { * @return A remote executor. */ public static PlanExecutor createRemoteExecutor(String hostname, int port, Configuration clientConfiguration, - URL[] jarFiles, URL[] globalClasspaths) { + List jarFiles, List globalClasspaths) { if (hostname == null) { throw new IllegalArgumentException("The hostname must not be null."); } @@ -187,10 +186,10 @@ public static PlanExecutor createRemoteExecutor(String hostname, int port, Confi Class reClass = loadExecutorClass(REMOTE_EXECUTOR_CLASS); - List files = (jarFiles == null || jarFiles.length == 0) ? - Collections.emptyList() : Arrays.asList(jarFiles); - List paths = (globalClasspaths == null || globalClasspaths.length == 0) ? - Collections.emptyList() : Arrays.asList(globalClasspaths); + List files = (jarFiles == null) ? + Collections.emptyList() : jarFiles; + List paths = (globalClasspaths == null) ? + Collections.emptyList() : globalClasspaths; try { PlanExecutor executor = (clientConfiguration == null) ? diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java index 223ebeea7eb69..34a54babbb545 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java @@ -30,6 +30,10 @@ import java.io.File; import java.net.MalformedURLException; import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; /** * An {@link ExecutionEnvironment} that sends programs to a cluster for execution. The environment @@ -51,19 +55,19 @@ public class RemoteEnvironment extends ExecutionEnvironment { protected final int port; /** The jar files that need to be attached to each job */ - private final URL[] jarFiles; + protected final List jarFiles; /** The configuration used by the client that connects to the cluster */ - private Configuration clientConfiguration; + protected Configuration clientConfiguration; /** The remote executor lazily created upon first use */ - private PlanExecutor executor; + protected PlanExecutor executor; /** Optional shutdown hook, used in session mode to eagerly terminate the last session */ private Thread shutdownHook; /** The classpaths that need to be attached to each job */ - private final URL[] globalClasspaths; + protected final List globalClasspaths; /** * Creates a new RemoteEnvironment that points to the master (JobManager) described by the @@ -133,26 +137,31 @@ public RemoteEnvironment(String host, int port, Configuration clientConfig, this.port = port; this.clientConfiguration = clientConfig == null ? new Configuration() : clientConfig; if (jarFiles != null) { - this.jarFiles = new URL[jarFiles.length]; - for (int i = 0; i < jarFiles.length; i++) { + this.jarFiles = new ArrayList<>(jarFiles.length); + for (String jarFile : jarFiles) { try { - this.jarFiles[i] = new File(jarFiles[i]).getAbsoluteFile().toURI().toURL(); + this.jarFiles.add(new File(jarFile).getAbsoluteFile().toURI().toURL()); } catch (MalformedURLException e) { throw new IllegalArgumentException("JAR file path invalid", e); } } } else { - this.jarFiles = null; + this.jarFiles = Collections.emptyList(); + } + + if (globalClasspaths == null) { + this.globalClasspaths = Collections.emptyList(); + } else { + this.globalClasspaths = Arrays.asList(globalClasspaths); } - this.globalClasspaths = globalClasspaths; } // ------------------------------------------------------------------------ @Override public JobExecutionResult execute(String jobName) throws Exception { - ensureExecutorCreated(); + PlanExecutor executor = getExecutor(); Plan p = createProgramPlan(jobName); @@ -178,7 +187,11 @@ public String getExecutionPlan() throws Exception { } else { PlanExecutor le = PlanExecutor.createLocalExecutor(null); - return le.getOptimizerPlanAsJSON(p); + String plan = le.getOptimizerPlanAsJSON(p); + + le.stop(); + + return plan; } } @@ -190,7 +203,7 @@ public void startNewSession() throws Exception { installShutdownHook(); } - private void ensureExecutorCreated() throws Exception { + protected PlanExecutor getExecutor() throws Exception { if (executor == null) { executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration, jarFiles, globalClasspaths); @@ -202,6 +215,8 @@ private void ensureExecutorCreated() throws Exception { executor.start(); installShutdownHook(); } + + return executor; } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index 1b398defc8a70..5bebd48de2a29 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -18,8 +18,6 @@ package org.apache.flink.runtime.minicluster -import java.util - import akka.actor.{ActorRef, ActorSystem} import org.apache.flink.api.common.JobID diff --git a/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java b/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java index 83fb34210af56..e00c6022659f3 100644 --- a/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java +++ b/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java @@ -164,7 +164,7 @@ private void jarDir(File dirOrFile2jar, JarOutputStream jos, String path) File f = new File(dirOrFile2jar, dirList[i]); jarDir(f,jos,subPath); } - } else { + } else if (dirOrFile2jar.exists()) { if (dirOrFile2jar.getCanonicalPath().equals(mDestJarName)) { if (mVerbose) {System.out.println("skipping " + dirOrFile2jar.getPath());} diff --git a/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java b/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java index 52cbfe64750e9..1465e1df9b24f 100644 --- a/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java +++ b/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java @@ -19,14 +19,11 @@ * limitations under the License. */ -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.PlanExecutor; import org.apache.flink.api.scala.FlinkILoop; import org.apache.flink.configuration.Configuration; -import java.io.File; import java.net.URL; import java.util.ArrayList; import java.util.List; @@ -53,41 +50,36 @@ public class ScalaShellRemoteEnvironment extends RemoteEnvironment { * user-defined functions, user-defined input formats, or any libraries, those must be * provided in the JAR files. */ - public ScalaShellRemoteEnvironment(String host, int port, FlinkILoop flinkILoop, Configuration clientConfig, String... jarFiles) { + public ScalaShellRemoteEnvironment(String host, int port, FlinkILoop flinkILoop, Configuration clientConfig, String... jarFiles) throws Exception { super(host, port, clientConfig, jarFiles, null); this.flinkILoop = flinkILoop; } - /** - * compiles jars from files in the shell virtual directory on the fly, sends and executes it in the remote environment - * - * @param jobName name of the job as string - * @return Result of the computation - * @throws Exception - */ @Override - public JobExecutionResult execute(String jobName) throws Exception { - Plan p = createProgramPlan(jobName); + protected PlanExecutor getExecutor() throws Exception { + // check if we had already started a PlanExecutor. If true, then stop it, because there will + // be a new jar file available for the user code classes + if (this.executor != null) { + this.executor.stop(); + } + // write generated classes to disk so that they can be shipped to the cluster URL jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL(); - // get "external jars, and add the shell command jar, pass to executor - List alljars = new ArrayList<>(); - // get external (library) jars - String[] extJars = this.flinkILoop.getExternalJars(); + List allJarFiles = new ArrayList<>(jarFiles); + allJarFiles.add(jarUrl); - for (String extJar : extJars) { - URL extJarUrl = new File(extJar).getAbsoluteFile().toURI().toURL(); - alljars.add(extJarUrl); - } + this.executor = PlanExecutor.createRemoteExecutor( + host, + port, + clientConfiguration, + allJarFiles, + globalClasspaths + ); - // add shell commands - alljars.add(jarUrl); - PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, new Configuration(), - alljars.toArray(new URL[alljars.size()]), null); + executor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled()); - executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled()); - return executor.executePlan(p); + return executor; } public static void disableAllContextAndOtherEnvironments() { diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala index b17cf15543d88..3ac5da88a1cca 100644 --- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala +++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala @@ -71,7 +71,12 @@ class FlinkILoop( ScalaShellRemoteEnvironment.resetContextEnvironments() // create our environment that submits against the cluster (local or remote) - val remoteBenv = new ScalaShellRemoteEnvironment(host, port, this, clientConfig) + val remoteBenv = new ScalaShellRemoteEnvironment( + host, + port, + this, + clientConfig, + this.getExternalJars(): _*) val remoteSenv = new ScalaShellRemoteStreamEnvironment(host, port, this); // prevent further instantiation of environments ScalaShellRemoteEnvironment.disableAllContextAndOtherEnvironments() diff --git a/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java b/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java new file mode 100644 index 0000000000000..a3c272f22af6c --- /dev/null +++ b/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.PlanExecutor; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.scala.FlinkILoop; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.TestLogger; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.BDDMockito; +import org.mockito.Matchers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import scala.Option; +import scala.tools.nsc.Settings; +import scala.tools.nsc.settings.MutableSettings; + +import java.util.List; + +import static org.junit.Assert.assertEquals; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(PlanExecutor.class) +public class FlinkILoopTest extends TestLogger { + + @Test + public void testConfigurationForwarding() throws Exception { + Configuration configuration = new Configuration(); + configuration.setString("foobar", "foobar"); + FlinkILoop flinkILoop = new FlinkILoop("localhost", 6123, configuration, Option.empty()); + + final TestPlanExecutor testPlanExecutor = new TestPlanExecutor(); + + PowerMockito.mockStatic(PlanExecutor.class); + BDDMockito.given(PlanExecutor.createRemoteExecutor( + Matchers.anyString(), + Matchers.anyInt(), + Matchers.any(Configuration.class), + Matchers.any(java.util.List.class), + Matchers.any(java.util.List.class) + )).willAnswer(new Answer() { + @Override + public PlanExecutor answer(InvocationOnMock invocation) throws Throwable { + testPlanExecutor.setHost((String)invocation.getArguments()[0]); + testPlanExecutor.setPort((Integer)invocation.getArguments()[1]); + testPlanExecutor.setConfiguration((Configuration)invocation.getArguments()[2]); + testPlanExecutor.setJars((List)invocation.getArguments()[3]); + testPlanExecutor.setGlobalClasspaths((List)invocation.getArguments()[4]); + + return testPlanExecutor; + } + }); + + Settings settings = new Settings(); + ((MutableSettings.BooleanSetting)settings.usejavacp()).value_$eq(true); + + flinkILoop.settings_$eq(settings); + flinkILoop.createInterpreter(); + + ExecutionEnvironment env = flinkILoop.scalaBenv().getJavaEnv(); + + env.fromElements(1).output(new DiscardingOutputFormat()); + + env.execute("Test job"); + + Configuration forwardedConfiguration = testPlanExecutor.getConfiguration(); + + assertEquals(configuration, forwardedConfiguration); + } + + static class TestPlanExecutor extends PlanExecutor { + + private String host; + private int port; + private Configuration configuration; + private List jars; + private List globalClasspaths; + + @Override + public void start() throws Exception { + + } + + @Override + public void stop() throws Exception { + + } + + @Override + public boolean isRunning() { + return false; + } + + @Override + public JobExecutionResult executePlan(Plan plan) throws Exception { + return null; + } + + @Override + public String getOptimizerPlanAsJSON(Plan plan) throws Exception { + return null; + } + + @Override + public void endSession(JobID jobID) throws Exception { + + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public Configuration getConfiguration() { + return configuration; + } + + public void setConfiguration(Configuration configuration) { + this.configuration = configuration; + } + + public List getJars() { + return jars; + } + + public void setJars(List jars) { + this.jars = jars; + } + + public List getGlobalClasspaths() { + return globalClasspaths; + } + + public void setGlobalClasspaths(List globalClasspaths) { + this.globalClasspaths = globalClasspaths; + } + } + +}