diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java index 0f27e5dc30b51..989cf4f2f6da3 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java @@ -37,16 +37,18 @@ public class FlinkJobInvoker implements JobInvoker { public static FlinkJobInvoker create( ListeningExecutorService executorService, - FlinkJobServerDriver.ServerConfiguration configuration) { - return new FlinkJobInvoker(executorService, configuration.getFlinkMasterUrl()); + FlinkJobServerDriver.ServerConfiguration serverConfig) { + return new FlinkJobInvoker(executorService, serverConfig); } private final ListeningExecutorService executorService; - private final String flinkMasterUrl; + private final FlinkJobServerDriver.ServerConfiguration serverConfig; - private FlinkJobInvoker(ListeningExecutorService executorService, String flinkMasterUrl) { + private FlinkJobInvoker( + ListeningExecutorService executorService, + FlinkJobServerDriver.ServerConfiguration serverConfig) { this.executorService = executorService; - this.flinkMasterUrl = flinkMasterUrl; + this.serverConfig = serverConfig; } @Override @@ -62,8 +64,13 @@ public JobInvocation invoke( String.format("%s_%s", flinkOptions.getJobName(), UUID.randomUUID().toString()); LOG.info("Invoking job {}", invocationId); - flinkOptions.setFlinkMaster(flinkMasterUrl); - //flinkOptions.setSdkWorkerParallelism("[stage]"); + if (FlinkPipelineOptions.AUTO.equals(flinkOptions.getFlinkMaster())) { + flinkOptions.setFlinkMaster(serverConfig.getFlinkMasterUrl()); + } + + if (FlinkPipelineOptions.AUTO.equals(flinkOptions.getSdkWorkerParallelism())) { + flinkOptions.setSdkWorkerParallelism(serverConfig.getSdkWorkerParallelism()); + } flinkOptions.setRunner(null); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java index d6c8663a4d67e..5d70fe665abcc 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; +import java.nio.file.Paths; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import org.apache.beam.model.pipeline.v1.Endpoints; @@ -63,7 +64,7 @@ public static class ServerConfiguration { @Option(name = "--artifacts-dir", usage = "The location to store staged artifact files") private String artifactStagingPath = - System.getProperty("java.io.tmpdir") + "/beam-artifact-staging"; + Paths.get(System.getProperty("java.io.tmpdir"), "/beam-artifact-staging").toString(); @Option( name = "--clean-artifacts-per-job", @@ -77,6 +78,13 @@ public static class ServerConfiguration { public String getFlinkMasterUrl() { return this.flinkMasterUrl; } + + @Option(name = "--sdk-worker-parallelism", usage = "Parallelism of SDK worker processes") + private String sdkWorkerParallelism = "[pipeline]"; + + public String getSdkWorkerParallelism() { + return this.sdkWorkerParallelism; + } } public static void main(String[] args) throws Exception { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index ee7f6ca358f00..6f8b30282c7de 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -31,6 +31,8 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOptions, StreamingOptions { + String AUTO = "[auto]"; + /** * List of local files to make available to workers. * @@ -56,7 +58,7 @@ public interface FlinkPipelineOptions "Address of the Flink Master where the Pipeline should be executed. Can" + " either be of the form \"host:port\" or one of the special values [local], " + "[collection] or [auto].") - @Default.String("[auto]") + @Default.String(AUTO) String getFlinkMaster(); void setFlinkMaster(String value); @@ -187,9 +189,10 @@ public interface FlinkPipelineOptions @Description( "SDK process parallelism for portable pipelines. Currently supported options are " + + "'[auto]' (Let the runner decide) or " + "'[pipeline]' (single SDK harness process per pipeline and task manager JVM) or " - + "'[stage]' (separate SDK harness for every executable stage.") - @Default.String("[pipeline]") + + "'[stage]' (separate SDK harness for every executable stage.)") + @Default.String(AUTO) String getSdkWorkerParallelism(); void setSdkWorkerParallelism(String factory);