[FLINK-2373] Add configuration parameter to createRemoteEnvironment method

This closes #1066
akunft authored and fhueske committed Sep 15, 2015
1 parent 8a75025 commit e78b80c
Showing 9 changed files with 263 additions and 24 deletions.
@@ -54,30 +54,47 @@ public class RemoteExecutor extends PlanExecutor {
private static final Logger LOG = LoggerFactory.getLogger(RemoteExecutor.class);

private final List<String> jarFiles;
private final Configuration configuration;

private final Configuration clientConfiguration;

public RemoteExecutor(String hostname, int port) {
this(hostname, port, Collections.<String>emptyList());
this(hostname, port, Collections.<String>emptyList(), new Configuration());
}

public RemoteExecutor(String hostname, int port, String jarFile) {
this(hostname, port, Collections.singletonList(jarFile));
this(hostname, port, Collections.singletonList(jarFile), new Configuration());
}

public RemoteExecutor(String hostport, String jarFile) {
this(getInetFromHostport(hostport), Collections.singletonList(jarFile));
this(getInetFromHostport(hostport), Collections.singletonList(jarFile), new Configuration());
}

public RemoteExecutor(String hostname, int port, List<String> jarFiles) {
this(new InetSocketAddress(hostname, port), jarFiles);
this(new InetSocketAddress(hostname, port), jarFiles, new Configuration());
}

public RemoteExecutor(String hostname, int port, Configuration clientConfiguration) {
this(hostname, port, Collections.<String>emptyList(), clientConfiguration);
}

public RemoteExecutor(String hostname, int port, String jarFile, Configuration clientConfiguration) {
this(hostname, port, Collections.singletonList(jarFile), clientConfiguration);
}

public RemoteExecutor(String hostport, String jarFile, Configuration clientConfiguration) {
this(getInetFromHostport(hostport), Collections.singletonList(jarFile), clientConfiguration);
}

public RemoteExecutor(String hostname, int port, List<String> jarFiles, Configuration clientConfiguration) {
this(new InetSocketAddress(hostname, port), jarFiles, clientConfiguration);
}

public RemoteExecutor(InetSocketAddress inet, List<String> jarFiles) {
public RemoteExecutor(InetSocketAddress inet, List<String> jarFiles, Configuration clientConfiguration) {
this.jarFiles = jarFiles;
configuration = new Configuration();
this.clientConfiguration = clientConfiguration;

configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, inet.getHostName());
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, inet.getPort());
clientConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, inet.getHostName());
clientConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, inet.getPort());
}

@Override
@@ -87,7 +104,7 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {
}

public JobExecutionResult executePlanWithJars(JobWithJars p) throws Exception {
Client c = new Client(configuration, p.getUserCodeClassLoader(), -1);
Client c = new Client(clientConfiguration, p.getUserCodeClassLoader(), -1);
c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());

JobSubmissionResult result = c.run(p, -1, true);
@@ -103,7 +120,7 @@ public JobExecutionResult executeJar(String jarPath, String assemblerClass, Stri
File jarFile = new File(jarPath);
PackagedProgram program = new PackagedProgram(jarFile, assemblerClass, args);

Client c = new Client(configuration, program.getUserCodeClassLoader(), -1);
Client c = new Client(clientConfiguration, program.getUserCodeClassLoader(), -1);
c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());

JobSubmissionResult result = c.run(program.getPlanWithJars(), -1, true);
@@ -118,7 +135,7 @@ public JobExecutionResult executeJar(String jarPath, String assemblerClass, Stri
@Override
public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
JobWithJars p = new JobWithJars(plan, this.jarFiles);
Client c = new Client(configuration, p.getUserCodeClassLoader(), -1);
Client c = new Client(clientConfiguration, p.getUserCodeClassLoader(), -1);

OptimizedPlan op = (OptimizedPlan) c.getOptimizedPlan(p, -1);
PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
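With the constructors added above, a RemoteExecutor can be handed a client-side Configuration directly. A minimal usage sketch; the host, port, jar path and the "akka.ask.timeout" key/value are illustrative assumptions, not taken from this commit:

import java.util.Collections;

import org.apache.flink.client.RemoteExecutor;
import org.apache.flink.configuration.Configuration;

public class RemoteExecutorConfigExample {
	public static void main(String[] args) {
		// Client-side settings only; the key below is an assumed Akka client option.
		Configuration clientConfig = new Configuration();
		clientConfig.setString("akka.ask.timeout", "60 s");

		// New four-argument constructor: jar list plus an explicit client configuration.
		RemoteExecutor executor = new RemoteExecutor(
				"jobmanager-host", 6123,
				Collections.singletonList("/path/to/program.jar"),
				clientConfig);

		executor.setPrintStatusDuringExecution(true);
	}
}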
@@ -21,6 +21,7 @@
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.client.program.ProgramInvocationException;
import org.junit.Test;

@@ -67,7 +68,7 @@ public void testUnresolvableHostname2() {

try {
InetSocketAddress add = new InetSocketAddress(nonExistingHostname, port);
RemoteExecutor exec = new RemoteExecutor(add, Collections.<String>emptyList());
RemoteExecutor exec = new RemoteExecutor(add, Collections.<String>emptyList(), new Configuration());
exec.executePlan(getProgram());
fail("This should fail with an ProgramInvocationException");
}
@@ -105,11 +105,12 @@ public static PlanExecutor createLocalExecutor(Configuration configuration) {
*
* @param hostname The address of the JobManager to send the program to.
* @param port The port of the JobManager to send the program to.
* @param clientConfiguration The configuration used by the client (e.g. Akka settings, default parallelism).
* @param jarFiles A list of jar files that contain the user-defined function (UDF) classes and all classes used
* from within the UDFs.
* @return A remote executor.
*/
public static PlanExecutor createRemoteExecutor(String hostname, int port, String... jarFiles) {
public static PlanExecutor createRemoteExecutor(String hostname, int port, Configuration clientConfiguration, String... jarFiles) {
if (hostname == null) {
throw new IllegalArgumentException("The hostname must not be null.");
}
@@ -123,7 +124,10 @@ public static PlanExecutor createRemoteExecutor(String hostname, int port, Strin
: Arrays.asList(jarFiles);

try {
return reClass.getConstructor(String.class, int.class, List.class).newInstance(hostname, port, files);
PlanExecutor executor = (clientConfiguration == null) ?
reClass.getConstructor(String.class, int.class, List.class).newInstance(hostname, port, files) :
reClass.getConstructor(String.class, int.class, List.class, Configuration.class).newInstance(hostname, port, files, clientConfiguration);
return executor;
}
catch (Throwable t) {
throw new RuntimeException("An error occurred while loading the remote executor ("
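For callers of the factory method, the reflective dispatch above makes the configuration argument effectively optional: a null clientConfiguration falls back to the original three-argument RemoteExecutor constructor, while a non-null value selects the new four-argument one. A sketch with placeholder endpoint and jar path:

import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.configuration.Configuration;

public class CreateRemoteExecutorExample {
	public static void main(String[] args) {
		// An empty Configuration is forwarded as-is; null would keep the old behaviour.
		Configuration clientConfig = new Configuration();

		// Host, port and jar path are placeholders.
		PlanExecutor executor = PlanExecutor.createRemoteExecutor(
				"jobmanager-host", 6123, clientConfig, "/path/to/program.jar");
		executor.setPrintStatusDuringExecution(true);
	}
}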
@@ -1105,6 +1105,28 @@ public static ExecutionEnvironment createRemoteEnvironment(String host, int port
return new RemoteEnvironment(host, port, jarFiles);
}

/**
* Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program
* to a cluster for execution. Note that all file paths used in the program must be accessible from the
* cluster. The custom configuration is used to set Akka-specific configuration parameters
* for the client only; program parallelism can be set via {@link ExecutionEnvironment#setParallelism(int)}.
*
* Cluster configuration has to be done in the remotely running Flink instance.
*
* @param host The host name or address of the master (JobManager), where the program should be executed.
* @param port The port of the master (JobManager), where the program should be executed.
* @param clientConfiguration The custom configuration to be used by the client.
* @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
* user-defined functions, user-defined input formats, or any libraries, those must be
* provided in the JAR files.
* @return A remote environment that executes the program on a cluster.
*/
public static ExecutionEnvironment createRemoteEnvironment(String host, int port, Configuration clientConfiguration, String... jarFiles) {
RemoteEnvironment rec = new RemoteEnvironment(host, port, jarFiles);
rec.setClientConfiguration(clientConfiguration);
return rec;
}

/**
* Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program
* to a cluster for execution. Note that all file paths used in the program must be accessible from the
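A brief usage sketch of the new overload; the host, port, jar path and the Akka timeout key are placeholders and assumptions rather than values from this commit. Note that program parallelism is still set on the environment, as the Javadoc above states:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class RemoteEnvironmentConfigExample {
	public static void main(String[] args) {
		// Client-only settings; the key below is an assumed Akka client option.
		Configuration clientConfig = new Configuration();
		clientConfig.setString("akka.ask.timeout", "60 s");

		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
				"jobmanager-host", 6123, clientConfig, "/path/to/program.jar");

		// Parallelism is configured on the environment, not through clientConfig.
		env.setParallelism(4);
	}
}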
@@ -21,6 +21,7 @@
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.configuration.Configuration;

/**
* An {@link ExecutionEnvironment} that sends programs
@@ -35,6 +36,8 @@ public class RemoteEnvironment extends ExecutionEnvironment {
protected final int port;

private final String[] jarFiles;

private Configuration clientConfiguration;

/**
* Creates a new RemoteEnvironment that points to the master (JobManager) described by the
@@ -65,7 +68,7 @@ public RemoteEnvironment(String host, int port, String... jarFiles) {
public JobExecutionResult execute(String jobName) throws Exception {
Plan p = createProgramPlan(jobName);

PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFiles);
PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration, jarFiles);
executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());

this.lastJobExecutionResult = executor.executePlan(p);
@@ -78,7 +81,7 @@ public String getExecutionPlan() throws Exception {
p.setDefaultParallelism(getParallelism());
registerCachedFilesWithPlan(p);

PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFiles);
PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration, jarFiles);
return executor.getOptimizerPlanAsJSON(p);
}

@@ -87,4 +90,8 @@ public String toString() {
return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = " +
(getParallelism() == -1 ? "default" : getParallelism()) + ") : " + getIdString();
}

public void setClientConfiguration(Configuration clientConfiguration) {
this.clientConfiguration = clientConfiguration;
}
}
@@ -134,7 +134,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
def getId: UUID = {
javaEnv.getId
}

/**
* Gets the JobExecutionResult of the last executed job.
*/
@@ -181,13 +181,13 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
def addDefaultKryoSerializer(clazz: Class[_], serializer: Serializer[_]): Unit = {
javaEnv.addDefaultKryoSerializer(clazz, serializer)
}

/**
* Registers the given type with the serialization stack. If the type is eventually
* serialized as a POJO, then the type is registered with the POJO serializer. If the
* type ends up being serialized with Kryo, then it will be registered at Kryo to make
* sure that only tags are written.
*
*
*/
def registerType(typeClass: Class[_]) {
javaEnv.registerType(typeClass)
@@ -707,5 +707,32 @@ object ExecutionEnvironment {
javaEnv.setParallelism(parallelism)
new ExecutionEnvironment(javaEnv)
}

/**
* Creates a remote execution environment. The remote environment sends (parts of) the program
* to a cluster for execution. Note that all file paths used in the program must be accessible
* from the cluster. The custom configuration is used to set Akka-specific configuration
* parameters for the client only; program parallelism can be set via
* [[ExecutionEnvironment.setParallelism]].
*
* Cluster configuration has to be done in the remotely running Flink instance.
*
* @param host The host name or address of the master (JobManager), where the program should be
* executed.
* @param port The port of the master (JobManager), where the program should be executed.
* @param clientConfiguration The custom configuration to be used by the client.
* @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
* program uses user-defined functions, user-defined input formats, or any
* libraries, those must be provided in the JAR files.
* @return A remote environment that executes the program on a cluster.
*/
def createRemoteEnvironment(
host: String,
port: Int,
clientConfiguration: Configuration,
jarFiles: String*): ExecutionEnvironment = {
val javaEnv = JavaEnv.createRemoteEnvironment(host, port, clientConfiguration, jarFiles: _*)
new ExecutionEnvironment(javaEnv)
}
}

@@ -25,6 +25,7 @@
import org.apache.flink.api.common.PlanExecutor;

import org.apache.flink.api.scala.FlinkILoop;
import org.apache.flink.configuration.Configuration;

import java.util.ArrayList;
import java.util.Arrays;
@@ -78,7 +79,7 @@ public JobExecutionResult execute(String jobName) throws Exception {
alljars.add(jarFile);
String[] alljarsArr = new String[alljars.size()];
alljarsArr = alljars.toArray(alljarsArr);
PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, alljarsArr);
PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, new Configuration(), alljarsArr);

executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
return executor.executePlan(p);
@@ -80,9 +80,7 @@ public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throw
out.collect(getRuntimeContext().getIndexOfThisSubtask());
}
});
List<Integer> resultCollection = new ArrayList<Integer>();
result.output(new LocalCollectionOutputFormat<Integer>(resultCollection));
env.execute();
List<Integer> resultCollection = result.collect();
assertEquals(PARALLELISM, resultCollection.size());
}

