From 341447237c59d1ef2181aed3014bc7521a7ab000 Mon Sep 17 00:00:00 2001 From: Dawid Wysakowicz Date: Mon, 16 Jul 2018 17:30:17 +0200 Subject: [PATCH] [FLINK-9866] Allow passing command line arguments to standalone job This closes #6344 --- flink-container/docker/docker-compose.yml | 2 +- .../StandaloneJobClusterEntryPoint.java | 15 ++++++++++----- .../StandaloneJobClusterEntryPointTest.java | 5 +++-- .../flink/container/entrypoint/TestJob.java | 4 +++- 4 files changed, 17 insertions(+), 9 deletions(-) diff --git a/flink-container/docker/docker-compose.yml b/flink-container/docker/docker-compose.yml index 81e4c8c8a54b4..5fddff3a5dc6c 100644 --- a/flink-container/docker/docker-compose.yml +++ b/flink-container/docker/docker-compose.yml @@ -24,7 +24,7 @@ services: image: ${FLINK_DOCKER_IMAGE_NAME:-flink-job} ports: - "8081:8081" - command: job-cluster --job-classname ${FLINK_JOB} -Djobmanager.rpc.address=job-cluster + command: job-cluster --job-classname ${FLINK_JOB} -Djobmanager.rpc.address=job-cluster ${FLINK_JOB_ARGUMENTS} taskmanager: image: ${FLINK_DOCKER_IMAGE_NAME:-flink-job} diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java index 47cca4c7d8509..57f7ca239b705 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java @@ -51,20 +51,23 @@ import java.util.concurrent.CompletableFuture; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * {@link JobClusterEntrypoint} which is started with a job in a predefined * location. */ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint { - private static final String[] EMPTY_ARGS = new String[0]; + private final String[] programArguments; @Nonnull private final String jobClassName; - StandaloneJobClusterEntryPoint(Configuration configuration, @Nonnull String jobClassName) { + StandaloneJobClusterEntryPoint(Configuration configuration, @Nonnull String jobClassName, @Nonnull String[] programArguments) { super(configuration); - this.jobClassName = jobClassName; + this.programArguments = checkNotNull(programArguments); + this.jobClassName = checkNotNull(jobClassName); } @Override @@ -84,7 +87,7 @@ protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExc private PackagedProgram createPackagedProgram() throws FlinkException { try { final Class mainClass = getClass().getClassLoader().loadClass(jobClassName); - return new PackagedProgram(mainClass, EMPTY_ARGS); + return new PackagedProgram(mainClass, programArguments); } catch (ClassNotFoundException | ProgramInvocationException e) { throw new FlinkException("Could not load the provied entrypoint class.", e); } @@ -148,7 +151,9 @@ public static void main(String[] args) { configuration.setString(ClusterEntrypoint.EXECUTION_MODE, ExecutionMode.DETACHED.toString()); - StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint(configuration, clusterConfiguration.getJobClassName()); + StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint(configuration, + clusterConfiguration.getJobClassName(), + clusterConfiguration.getArgs()); entrypoint.startCluster(); } diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java index 360799d19a823..d97d2b714d4c9 100644 --- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java @@ -42,11 +42,12 @@ public void testJobGraphRetrieval() throws FlinkException { configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, parallelism); final StandaloneJobClusterEntryPoint standaloneJobClusterEntryPoint = new StandaloneJobClusterEntryPoint( configuration, - TestJob.class.getCanonicalName()); + TestJob.class.getCanonicalName(), + new String[] {"--arg", "suffix"}); final JobGraph jobGraph = standaloneJobClusterEntryPoint.retrieveJobGraph(configuration); - assertThat(jobGraph.getName(), is(equalTo(TestJob.class.getCanonicalName()))); + assertThat(jobGraph.getName(), is(equalTo(TestJob.class.getCanonicalName() + "-suffix"))); assertThat(jobGraph.getMaximumParallelism(), is(parallelism)); } diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java index 5f8857fc35fb4..ada434dd8b7bf 100644 --- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java @@ -18,6 +18,7 @@ package org.apache.flink.container.entrypoint; +import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -35,6 +36,7 @@ public static void main(String[] args) throws Exception { final SingleOutputStreamOperator mapper = source.map(element -> 2 * element); mapper.addSink(new DiscardingSink<>()); - env.execute(TestJob.class.getCanonicalName()); + ParameterTool parameterTool = ParameterTool.fromArgs(args); + env.execute(TestJob.class.getCanonicalName() + "-" + parameterTool.getRequired("arg")); } }