Skip to content

Commit

Permalink
Add server option for default sdk worker parallelism.
Browse files Browse the repository at this point in the history
  • Loading branch information
tweise committed Sep 30, 2018
1 parent 0e968c5 commit a1000c3
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 11 deletions.
Expand Up @@ -37,16 +37,18 @@ public class FlinkJobInvoker implements JobInvoker {

public static FlinkJobInvoker create(
ListeningExecutorService executorService,
FlinkJobServerDriver.ServerConfiguration configuration) {
return new FlinkJobInvoker(executorService, configuration.getFlinkMasterUrl());
FlinkJobServerDriver.ServerConfiguration serverConfig) {
return new FlinkJobInvoker(executorService, serverConfig);
}

private final ListeningExecutorService executorService;
private final String flinkMasterUrl;
private final FlinkJobServerDriver.ServerConfiguration serverConfig;

private FlinkJobInvoker(ListeningExecutorService executorService, String flinkMasterUrl) {
private FlinkJobInvoker(
ListeningExecutorService executorService,
FlinkJobServerDriver.ServerConfiguration serverConfig) {
this.executorService = executorService;
this.flinkMasterUrl = flinkMasterUrl;
this.serverConfig = serverConfig;
}

@Override
Expand All @@ -62,8 +64,13 @@ public JobInvocation invoke(
String.format("%s_%s", flinkOptions.getJobName(), UUID.randomUUID().toString());
LOG.info("Invoking job {}", invocationId);

flinkOptions.setFlinkMaster(flinkMasterUrl);
//flinkOptions.setSdkWorkerParallelism("[stage]");
if (FlinkPipelineOptions.AUTO.equals(flinkOptions.getFlinkMaster())) {
flinkOptions.setFlinkMaster(serverConfig.getFlinkMasterUrl());
}

if (FlinkPipelineOptions.AUTO.equals(flinkOptions.getSdkWorkerParallelism())) {
flinkOptions.setSdkWorkerParallelism(serverConfig.getSdkWorkerParallelism());
}

flinkOptions.setRunner(null);

Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.beam.model.pipeline.v1.Endpoints;
Expand Down Expand Up @@ -63,7 +64,7 @@ public static class ServerConfiguration {

@Option(name = "--artifacts-dir", usage = "The location to store staged artifact files")
private String artifactStagingPath =
System.getProperty("java.io.tmpdir") + "/beam-artifact-staging";
Paths.get(System.getProperty("java.io.tmpdir"), "/beam-artifact-staging").toString();

@Option(
name = "--clean-artifacts-per-job",
Expand All @@ -77,6 +78,13 @@ public static class ServerConfiguration {
public String getFlinkMasterUrl() {
return this.flinkMasterUrl;
}

@Option(name = "--sdk-worker-parallelism", usage = "Parallelism of SDK worker processes")
private String sdkWorkerParallelism = "[pipeline]";

public String getSdkWorkerParallelism() {
return this.sdkWorkerParallelism;
}
}

public static void main(String[] args) throws Exception {
Expand Down
Expand Up @@ -31,6 +31,8 @@
public interface FlinkPipelineOptions
extends PipelineOptions, ApplicationNameOptions, StreamingOptions {

String AUTO = "[auto]";

/**
* List of local files to make available to workers.
*
Expand All @@ -56,7 +58,7 @@ public interface FlinkPipelineOptions
"Address of the Flink Master where the Pipeline should be executed. Can"
+ " either be of the form \"host:port\" or one of the special values [local], "
+ "[collection] or [auto].")
@Default.String("[auto]")
@Default.String(AUTO)
String getFlinkMaster();

void setFlinkMaster(String value);
Expand Down Expand Up @@ -187,9 +189,10 @@ public interface FlinkPipelineOptions

@Description(
"SDK process parallelism for portable pipelines. Currently supported options are "
+ "'[auto]' (Let the runner decide) or "
+ "'[pipeline]' (single SDK harness process per pipeline and task manager JVM) or "
+ "'[stage]' (separate SDK harness for every executable stage.")
@Default.String("[pipeline]")
+ "'[stage]' (separate SDK harness for every executable stage.)")
@Default.String(AUTO)
String getSdkWorkerParallelism();

void setSdkWorkerParallelism(String factory);
Expand Down

0 comments on commit a1000c3

Please sign in to comment.