diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java index d1be6d26e3625..20169f6339149 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java @@ -54,30 +54,47 @@ public class RemoteExecutor extends PlanExecutor { private static final Logger LOG = LoggerFactory.getLogger(RemoteExecutor.class); private final List jarFiles; - private final Configuration configuration; + + private final Configuration clientConfiguration; public RemoteExecutor(String hostname, int port) { - this(hostname, port, Collections.emptyList()); + this(hostname, port, Collections.emptyList(), new Configuration()); } public RemoteExecutor(String hostname, int port, String jarFile) { - this(hostname, port, Collections.singletonList(jarFile)); + this(hostname, port, Collections.singletonList(jarFile), new Configuration()); } public RemoteExecutor(String hostport, String jarFile) { - this(getInetFromHostport(hostport), Collections.singletonList(jarFile)); + this(getInetFromHostport(hostport), Collections.singletonList(jarFile), new Configuration()); } public RemoteExecutor(String hostname, int port, List jarFiles) { - this(new InetSocketAddress(hostname, port), jarFiles); + this(new InetSocketAddress(hostname, port), jarFiles, new Configuration()); + } + + public RemoteExecutor(String hostname, int port, Configuration clientConfiguration) { + this(hostname, port, Collections.emptyList(), clientConfiguration); + } + + public RemoteExecutor(String hostname, int port, String jarFile, Configuration clientConfiguration) { + this(hostname, port, Collections.singletonList(jarFile), clientConfiguration); + } + + public RemoteExecutor(String hostport, String jarFile, Configuration clientConfiguration) { + this(getInetFromHostport(hostport), Collections.singletonList(jarFile), clientConfiguration); + } + + public RemoteExecutor(String hostname, int port, List jarFiles, Configuration clientConfiguration) { + this(new InetSocketAddress(hostname, port), jarFiles, clientConfiguration); } - public RemoteExecutor(InetSocketAddress inet, List jarFiles) { + public RemoteExecutor(InetSocketAddress inet, List jarFiles, Configuration clientConfiguration) { this.jarFiles = jarFiles; - configuration = new Configuration(); + this.clientConfiguration = clientConfiguration; - configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, inet.getHostName()); - configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, inet.getPort()); + clientConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, inet.getHostName()); + clientConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, inet.getPort()); } @Override @@ -87,7 +104,7 @@ public JobExecutionResult executePlan(Plan plan) throws Exception { } public JobExecutionResult executePlanWithJars(JobWithJars p) throws Exception { - Client c = new Client(configuration, p.getUserCodeClassLoader(), -1); + Client c = new Client(clientConfiguration, p.getUserCodeClassLoader(), -1); c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution()); JobSubmissionResult result = c.run(p, -1, true); @@ -103,7 +120,7 @@ public JobExecutionResult executeJar(String jarPath, String assemblerClass, Stri File jarFile = new File(jarPath); PackagedProgram program = new PackagedProgram(jarFile, assemblerClass, args); - Client c = new Client(configuration, program.getUserCodeClassLoader(), -1); + Client c = new Client(clientConfiguration, program.getUserCodeClassLoader(), -1); c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution()); JobSubmissionResult result = c.run(program.getPlanWithJars(), -1, true); @@ -118,7 +135,7 @@ public JobExecutionResult executeJar(String jarPath, String assemblerClass, Stri @Override public String getOptimizerPlanAsJSON(Plan plan) throws Exception { JobWithJars p = new JobWithJars(plan, this.jarFiles); - Client c = new Client(configuration, p.getUserCodeClassLoader(), -1); + Client c = new Client(clientConfiguration, p.getUserCodeClassLoader(), -1); OptimizedPlan op = (OptimizedPlan) c.getOptimizedPlan(p, -1); PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator(); diff --git a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java index 47236af0d733a..7f67567dd1ec0 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.configuration.Configuration; import org.apache.flink.client.program.ProgramInvocationException; import org.junit.Test; @@ -67,7 +68,7 @@ public void testUnresolvableHostname2() { try { InetSocketAddress add = new InetSocketAddress(nonExistingHostname, port); - RemoteExecutor exec = new RemoteExecutor(add, Collections.emptyList()); + RemoteExecutor exec = new RemoteExecutor(add, Collections.emptyList(), new Configuration()); exec.executePlan(getProgram()); fail("This should fail with an ProgramInvocationException"); } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java index 74bdb09527860..1294011573c16 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java @@ -105,11 +105,12 @@ public static PlanExecutor createLocalExecutor(Configuration configuration) { * * @param hostname The address of the JobManager to send the program to. * @param port The port of the JobManager to send the program to. + * @param clientConfiguration The configuration for the client (Akka, default.parallelism). * @param jarFiles A list of jar files that contain the user-defined function (UDF) classes and all classes used * from within the UDFs. * @return A remote executor. */ - public static PlanExecutor createRemoteExecutor(String hostname, int port, String... jarFiles) { + public static PlanExecutor createRemoteExecutor(String hostname, int port, Configuration clientConfiguration, String... jarFiles) { if (hostname == null) { throw new IllegalArgumentException("The hostname must not be null."); } @@ -123,7 +124,10 @@ public static PlanExecutor createRemoteExecutor(String hostname, int port, Strin : Arrays.asList(jarFiles); try { - return reClass.getConstructor(String.class, int.class, List.class).newInstance(hostname, port, files); + PlanExecutor executor = (clientConfiguration == null) ? + reClass.getConstructor(String.class, int.class, List.class).newInstance(hostname, port, files) : + reClass.getConstructor(String.class, int.class, List.class, Configuration.class).newInstance(hostname, port, files, clientConfiguration); + return executor; } catch (Throwable t) { throw new RuntimeException("An error occurred while loading the remote executor (" diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 32049c4f62495..297982cde1771 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -1105,6 +1105,28 @@ public static ExecutionEnvironment createRemoteEnvironment(String host, int port return new RemoteEnvironment(host, port, jarFiles); } + /** + * Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program + * to a cluster for execution. Note that all file paths used in the program must be accessible from the + * cluster. The custom configuration file is used to configure Akka specific configuration parameters + * for the Client only; Program parallelism can be set via {@link ExecutionEnvironment#setParallelism(int)}. + * + * Cluster configuration has to be done in the remotely running Flink instance. + * + * @param host The host name or address of the master (JobManager), where the program should be executed. + * @param port The port of the master (JobManager), where the program should be executed. + * @param clientConfiguration Pass a custom configuration to the Client. + * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses + * user-defined functions, user-defined input formats, or any libraries, those must be + * provided in the JAR files. + * @return A remote environment that executes the program on a cluster. + */ + public static ExecutionEnvironment createRemoteEnvironment(String host, int port, Configuration clientConfiguration, String... jarFiles) { + RemoteEnvironment rec = new RemoteEnvironment(host, port, jarFiles); + rec.setClientConfiguration(clientConfiguration); + return rec; + } + /** * Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program * to a cluster for execution. Note that all file paths used in the program must be accessible from the diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java index 6f84077b6b400..1f73e93e51aaf 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.PlanExecutor; +import org.apache.flink.configuration.Configuration; /** * An {@link ExecutionEnvironment} that sends programs @@ -35,6 +36,8 @@ public class RemoteEnvironment extends ExecutionEnvironment { protected final int port; private final String[] jarFiles; + + private Configuration clientConfiguration; /** * Creates a new RemoteEnvironment that points to the master (JobManager) described by the @@ -65,7 +68,7 @@ public RemoteEnvironment(String host, int port, String... jarFiles) { public JobExecutionResult execute(String jobName) throws Exception { Plan p = createProgramPlan(jobName); - PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFiles); + PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration, jarFiles); executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled()); this.lastJobExecutionResult = executor.executePlan(p); @@ -78,7 +81,7 @@ public String getExecutionPlan() throws Exception { p.setDefaultParallelism(getParallelism()); registerCachedFilesWithPlan(p); - PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFiles); + PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration, jarFiles); return executor.getOptimizerPlanAsJSON(p); } @@ -87,4 +90,8 @@ public String toString() { return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = " + (getParallelism() == -1 ? "default" : getParallelism()) + ") : " + getIdString(); } + + public void setClientConfiguration(Configuration clientConfiguration) { + this.clientConfiguration = clientConfiguration; + } } diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala index cdf7211caea3d..39bb7176edfdb 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala @@ -134,7 +134,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) { def getId: UUID = { javaEnv.getId } - + /** * Gets the JobExecutionResult of the last executed job. */ @@ -181,13 +181,13 @@ class ExecutionEnvironment(javaEnv: JavaEnv) { def addDefaultKryoSerializer(clazz: Class[_], serializer: Serializer[_]): Unit = { javaEnv.addDefaultKryoSerializer(clazz, serializer) } - + /** * Registers the given type with the serialization stack. If the type is eventually * serialized as a POJO, then the type is registered with the POJO serializer. If the * type ends up being serialized with Kryo, then it will be registered at Kryo to make * sure that only tags are written. - * + * */ def registerType(typeClass: Class[_]) { javaEnv.registerType(typeClass) @@ -707,5 +707,32 @@ object ExecutionEnvironment { javaEnv.setParallelism(parallelism) new ExecutionEnvironment(javaEnv) } + + /** + * Creates a remote execution environment. The remote environment sends (parts of) the program + * to a cluster for execution. Note that all file paths used in the program must be accessible + * from the cluster. The custom configuration file is used to configure Akka specific + * configuration parameters for the Client only; Program parallelism can be set via + * [[ExecutionEnvironment.setParallelism]]. + * + * Cluster configuration has to be done in the remotely running Flink instance. + * + * @param host The host name or address of the master (JobManager), where the program should be + * executed. + * @param port The port of the master (JobManager), where the program should be executed. + * @param clientConfiguration Pass a custom configuration to the Client. + * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the + * program uses user-defined functions, user-defined input formats, or any + * libraries, those must be provided in the JAR files. + * @return A remote environment that executes the program on a cluster. + */ + def createRemoteEnvironment( + host: String, + port: Int, + clientConfiguration: Configuration, + jarFiles: String*): ExecutionEnvironment = { + val javaEnv = JavaEnv.createRemoteEnvironment(host, port, clientConfiguration, jarFiles: _*) + new ExecutionEnvironment(javaEnv) + } } diff --git a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java index 54af5bcc9f181..a7dc7088b51dd 100644 --- a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java +++ b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.PlanExecutor; import org.apache.flink.api.scala.FlinkILoop; +import org.apache.flink.configuration.Configuration; import java.util.ArrayList; import java.util.Arrays; @@ -78,7 +79,7 @@ public JobExecutionResult execute(String jobName) throws Exception { alljars.add(jarFile); String[] alljarsArr = new String[alljars.size()]; alljarsArr = alljars.toArray(alljarsArr); - PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, alljarsArr); + PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, new Configuration(), alljarsArr); executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled()); return executor.executePlan(p); diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java index 109af1e35b682..f4b3875fd6401 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java @@ -80,9 +80,7 @@ public void mapPartition(Iterable values, Collector out) throw out.collect(getRuntimeContext().getIndexOfThisSubtask()); } }); - List resultCollection = new ArrayList(); - result.output(new LocalCollectionOutputFormat(resultCollection)); - env.execute(); + List resultCollection = result.collect(); assertEquals(PARALLELISM, resultCollection.size()); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java new file mode 100644 index 0000000000000..68b099d6941e3 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.test.javaApiOperators; + +import org.apache.flink.api.common.functions.RichMapPartitionFunction; +import org.apache.flink.api.common.io.GenericInputFormat; +import org.apache.flink.api.common.operators.util.TestNonRichInputFormat; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.LocalCollectionOutputFormat; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.util.Collector; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@SuppressWarnings("serial") +public class RemoteEnvironmentITCase { + + private static final int TM_SLOTS = 4; + + private static final int NUM_TM = 1; + + private static final int USER_DOP = 2; + + private static final String INVALID_STARTUP_TIMEOUT = "0.001 ms"; + + private static final String VALID_STARTUP_TIMEOUT = "100 s"; + + private static ForkableFlinkMiniCluster cluster; + + @BeforeClass + public static void setupCluster() { + try { + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS); + cluster = new ForkableFlinkMiniCluster(config, false); + cluster.start(); + } + catch (Exception e) { + e.printStackTrace(); + fail("Error starting test cluster: " + e.getMessage()); + } + } + + @AfterClass + public static void tearDownCluster() { + try { + cluster.stop(); + } + catch (Throwable t) { + t.printStackTrace(); + fail("Cluster shutdown caused an exception: " + t.getMessage()); + } + } + + /** + * Ensure that that Akka configuration parameters can be set. + */ + @Test(expected=IllegalArgumentException.class) + public void testInvalidAkkaConfiguration() throws Throwable { + Configuration config = new Configuration(); + config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT); + + final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( + cluster.hostname(), + cluster.getLeaderRPCPort(), + config + ); + env.getConfig().disableSysoutLogging(); + + DataSet result = env.createInput(new TestNonRichInputFormat()); + result.output(new LocalCollectionOutputFormat(new ArrayList())); + try { + env.execute(); + Assert.fail("Program should not run successfully, cause of invalid akka settings."); + } catch (ProgramInvocationException ex) { + throw ex.getCause(); + } + } + + /** + * Ensure that the program parallelism can be set even if the configuration is supplied. + */ + @Test + public void testUserSpecificParallelism() throws Exception { + Configuration config = new Configuration(); + config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT); + + final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( + cluster.hostname(), + cluster.getLeaderRPCPort(), + config + ); + env.setParallelism(USER_DOP); + env.getConfig().disableSysoutLogging(); + + DataSet result = env.createInput(new ParallelismDependentInputFormat()) + .rebalance() + .mapPartition(new RichMapPartitionFunction() { + @Override + public void mapPartition(Iterable values, Collector out) throws Exception { + out.collect(getRuntimeContext().getIndexOfThisSubtask()); + } + }); + List resultCollection = result.collect(); + assertEquals(USER_DOP, resultCollection.size()); + } + + private static class ParallelismDependentInputFormat extends GenericInputFormat { + + private transient boolean emitted; + + @Override + public GenericInputSplit[] createInputSplits(int numSplits) throws IOException { + assertEquals(USER_DOP, numSplits); + return super.createInputSplits(numSplits); + } + + @Override + public boolean reachedEnd() { + return emitted; + } + + @Override + public Integer nextRecord(Integer reuse) { + if (emitted) { + return null; + } + emitted = true; + return 1; + } + } +}