Commit 1fdc2f9
[BEAM-5520] Portable pipeline option to control SDK harness process parallelism.
tweise committed Oct 1, 2018
1 parent 766a1dc commit 1fdc2f9
Showing 14 changed files with 142 additions and 68 deletions.
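In short, this commit threads FlinkPipelineOptions through the portable pipeline translators and adds an sdkWorkerParallelism option to PortablePipelineOptions. A minimal user-side sketch, hypothetical and not part of the commit, using only names visible in this diff:

import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;

public class SdkWorkerParallelismSketch {
  public static void main(String[] args) {
    // Request one SDK harness environment per executable stage instead of a
    // single shared one per job (constant names are from this commit; their
    // string values are not shown in the diff).
    PortablePipelineOptions options =
        PipelineOptionsFactory.as(PortablePipelineOptions.class);
    options.setSdkWorkerParallelism(
        PortablePipelineOptions.SDK_WORKER_PARALLELISM_STAGE);
    System.out.println(options.getSdkWorkerParallelism());
  }
}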
@@ -38,7 +38,6 @@
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
@@ -64,7 +63,6 @@
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
@@ -116,17 +114,10 @@ public class FlinkBatchPortablePipelineTranslator
* {@link ExecutionEnvironment}.
*/
public static BatchTranslationContext createTranslationContext(
JobInfo jobInfo, List<String> filesToStage) {
PipelineOptions pipelineOptions;
try {
pipelineOptions = PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
} catch (IOException e) {
throw new RuntimeException(e);
}
JobInfo jobInfo, FlinkPipelineOptions pipelineOptions, List<String> filesToStage) {
ExecutionEnvironment executionEnvironment =
FlinkExecutionEnvironments.createBatchExecutionEnvironment(
pipelineOptions.as(FlinkPipelineOptions.class), filesToStage);
return new BatchTranslationContext(jobInfo, executionEnvironment);
FlinkExecutionEnvironments.createBatchExecutionEnvironment(pipelineOptions, filesToStage);
return new BatchTranslationContext(jobInfo, pipelineOptions, executionEnvironment);
}

/** Creates a batch translator. */
@@ -160,12 +151,15 @@ public static class BatchTranslationContext
implements FlinkPortablePipelineTranslator.TranslationContext {

private final JobInfo jobInfo;
private final FlinkPipelineOptions options;
private final ExecutionEnvironment executionEnvironment;
private final Map<String, DataSet<?>> dataSets;
private final Set<String> danglingDataSets;

private BatchTranslationContext(JobInfo jobInfo, ExecutionEnvironment executionEnvironment) {
private BatchTranslationContext(
JobInfo jobInfo, FlinkPipelineOptions options, ExecutionEnvironment executionEnvironment) {
this.jobInfo = jobInfo;
this.options = options;
this.executionEnvironment = executionEnvironment;
dataSets = new HashMap<>();
danglingDataSets = new HashSet<>();
@@ -176,6 +170,11 @@ public JobInfo getJobInfo() {
return jobInfo;
}

@Override
public FlinkPipelineOptions getPipelineOptions() {
return options;
}

public ExecutionEnvironment getExecutionEnvironment() {
return executionEnvironment;
}
@@ -336,7 +335,10 @@ private static <InputT> void translateExecutableStage(
}
FlinkExecutableStageFunction<InputT> function =
new FlinkExecutableStageFunction<>(
stagePayload, context.getJobInfo(), outputMap, FlinkExecutableStageContext.factory());
stagePayload,
context.getJobInfo(),
outputMap,
FlinkExecutableStageContext.factory(context.getPipelineOptions()));

DataSet<WindowedValue<InputT>> inputDataSet =
context.getDataSetOrThrow(stagePayload.getInput());
@@ -472,18 +474,13 @@ private static <K, V> void translateGroupByKey(
Grouping<WindowedValue<KV<K, V>>> inputGrouping =
inputDataSet.groupBy(new KvKeySelector<>(inputElementCoder.getKeyCoder()));

PipelineOptions options;
try {
options = PipelineOptionsTranslation.fromProto(context.getJobInfo().pipelineOptions());
} catch (IOException e) {
throw new RuntimeException(e);
}
FlinkPartialReduceFunction<K, V, List<V>, ?> partialReduceFunction =
new FlinkPartialReduceFunction<>(
combineFn, windowingStrategy, Collections.emptyMap(), options);
combineFn, windowingStrategy, Collections.emptyMap(), context.getPipelineOptions());

FlinkReduceFunction<K, List<V>, List<V>, ?> reduceFunction =
new FlinkReduceFunction<>(combineFn, windowingStrategy, Collections.emptyMap(), options);
new FlinkReduceFunction<>(
combineFn, windowingStrategy, Collections.emptyMap(), context.getPipelineOptions());

// Partially GroupReduce the values into the intermediate format AccumT (combine)
GroupCombineOperator<WindowedValue<KV<K, V>>, WindowedValue<KV<K, List<V>>>> groupCombine =
@@ -108,15 +108,17 @@ private PipelineResult runPipeline() throws Exception {
FlinkBatchPortablePipelineTranslator translator =
FlinkBatchPortablePipelineTranslator.createTranslator();
FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo, filesToStage);
FlinkBatchPortablePipelineTranslator.createTranslationContext(
jobInfo, pipelineOptions, filesToStage);
translator.translate(context, fusedPipeline);
result = context.getExecutionEnvironment().execute(pipelineOptions.getJobName());
} else {
// streaming translation
FlinkStreamingPortablePipelineTranslator translator =
new FlinkStreamingPortablePipelineTranslator();
FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext context =
FlinkStreamingPortablePipelineTranslator.createTranslationContext(jobInfo, filesToStage);
FlinkStreamingPortablePipelineTranslator.createTranslationContext(
jobInfo, pipelineOptions, filesToStage);
translator.translate(context, fusedPipeline);
result = context.getExecutionEnvironment().execute(pipelineOptions.getJobName());
}
@@ -27,6 +27,7 @@
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,16 +37,19 @@ public class FlinkJobInvoker implements JobInvoker {
private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvoker.class);

public static FlinkJobInvoker create(
ListeningExecutorService executorService, String flinkMasterUrl) {
return new FlinkJobInvoker(executorService, flinkMasterUrl);
ListeningExecutorService executorService,
FlinkJobServerDriver.ServerConfiguration serverConfig) {
return new FlinkJobInvoker(executorService, serverConfig);
}

private final ListeningExecutorService executorService;
private final String flinkMasterUrl;
private final FlinkJobServerDriver.ServerConfiguration serverConfig;

private FlinkJobInvoker(ListeningExecutorService executorService, String flinkMasterUrl) {
private FlinkJobInvoker(
ListeningExecutorService executorService,
FlinkJobServerDriver.ServerConfiguration serverConfig) {
this.executorService = executorService;
this.flinkMasterUrl = flinkMasterUrl;
this.serverConfig = serverConfig;
}

@Override
@@ -61,7 +65,14 @@ public JobInvocation invoke(
String.format("%s_%s", flinkOptions.getJobName(), UUID.randomUUID().toString());
LOG.info("Invoking job {}", invocationId);

flinkOptions.setFlinkMaster(flinkMasterUrl);
if (FlinkPipelineOptions.AUTO.equals(flinkOptions.getFlinkMaster())) {
flinkOptions.setFlinkMaster(serverConfig.getFlinkMasterUrl());
}

PortablePipelineOptions portableOptions = flinkOptions.as(PortablePipelineOptions.class);
if (portableOptions.getSdkWorkerParallelism() == null) {
portableOptions.setSdkWorkerParallelism(serverConfig.getSdkWorkerParallelism());
}

flinkOptions.setRunner(null);

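The two guards above make the server-side settings pure fallbacks: the Flink master URL is replaced only while still at its [auto] default, and SDK worker parallelism is filled in only when the pipeline did not set one. A hedged client-side sketch of that precedence:

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class InvokerDefaultingSketch {
  public static void main(String[] args) {
    // Assumes the behavior visible in this diff: getFlinkMaster() defaults to
    // "[auto]", and a null getSdkWorkerParallelism() is treated as unset.
    FlinkPipelineOptions flinkOptions =
        PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    // Left at "[auto]": FlinkJobInvoker substitutes serverConfig.getFlinkMasterUrl().
    assert FlinkPipelineOptions.AUTO.equals(flinkOptions.getFlinkMaster());
    // Explicitly set: the invoker leaves the user's choice untouched.
    flinkOptions.setFlinkMaster("host:8081");
  }
}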
@@ -22,6 +22,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.beam.model.pipeline.v1.Endpoints;
@@ -32,6 +33,7 @@
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
@@ -62,7 +64,8 @@ public static class ServerConfiguration {
private int artifactPort = 8098;

@Option(name = "--artifacts-dir", usage = "The location to store staged artifact files")
private String artifactStagingPath = "/tmp/beam-artifact-staging";
private String artifactStagingPath =
Paths.get(System.getProperty("java.io.tmpdir"), "beam-artifact-staging").toString();

@Option(
name = "--clean-artifacts-per-job",
@@ -72,6 +75,20 @@ public static class ServerConfiguration {

@Option(name = "--flink-master-url", usage = "Flink master url to submit job.")
private String flinkMasterUrl = "[auto]";

public String getFlinkMasterUrl() {
return this.flinkMasterUrl;
}

@Option(
name = "--sdk-worker-parallelism",
usage = "Default parallelism for SDK worker processes (see portable pipeline options)"
)
private String sdkWorkerParallelism = PortablePipelineOptions.SDK_WORKER_PARALLELISM_PIPELINE;

public String getSdkWorkerParallelism() {
return this.sdkWorkerParallelism;
}
}

public static void main(String[] args) throws Exception {
@@ -232,7 +249,7 @@ private GrpcFnServer<BeamFileSystemArtifactStagingService> createArtifactStaging
return artifactStagingService;
}

private JobInvoker createJobInvoker() throws IOException {
return FlinkJobInvoker.create(executor, configuration.flinkMasterUrl);
private JobInvoker createJobInvoker() {
return FlinkJobInvoker.create(executor, configuration);
}
}
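For illustration, the new flag parses like the existing args4j options. A sketch under stated assumptions: it presumes package-level access to FlinkJobServerDriver.ServerConfiguration, and the value "stage" stands in for PortablePipelineOptions.SDK_WORKER_PARALLELISM_STAGE, whose string value the diff does not spell out:

import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;

public class JobServerFlagsSketch {
  public static void main(String[] args) throws CmdLineException {
    // Bind the flags declared by the @Option annotations above.
    FlinkJobServerDriver.ServerConfiguration config =
        new FlinkJobServerDriver.ServerConfiguration();
    new CmdLineParser(config).parseArgument(
        "--flink-master-url", "localhost:8081",
        "--sdk-worker-parallelism", "stage"); // "stage" is an assumption
    System.out.println(
        config.getFlinkMasterUrl() + " / " + config.getSdkWorkerParallelism());
  }
}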
@@ -31,6 +31,8 @@
public interface FlinkPipelineOptions
extends PipelineOptions, ApplicationNameOptions, StreamingOptions {

String AUTO = "[auto]";

/**
* List of local files to make available to workers.
*
@@ -56,7 +58,7 @@ public interface FlinkPipelineOptions
"Address of the Flink Master where the Pipeline should be executed. Can"
+ " either be of the form \"host:port\" or one of the special values [local], "
+ "[collection] or [auto].")
@Default.String("[auto]")
@Default.String(AUTO)
String getFlinkMaster();

void setFlinkMaster(String value);
@@ -34,6 +34,8 @@ public interface FlinkPortablePipelineTranslator<
/** The context used for pipeline translation. */
interface TranslationContext {
JobInfo getJobInfo();

FlinkPipelineOptions getPipelineOptions();
}

/** Translates the given pipeline. */
@@ -35,7 +35,6 @@
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.RunnerPCollectionView;
import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
@@ -59,7 +58,6 @@
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.join.UnionCoder;
@@ -97,16 +95,9 @@ public class FlinkStreamingPortablePipelineTranslator
* {@link StreamExecutionEnvironment}.
*/
public static StreamingTranslationContext createTranslationContext(
JobInfo jobInfo, List<String> filesToStage) {
PipelineOptions pipelineOptions;
try {
pipelineOptions = PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
} catch (IOException e) {
throw new RuntimeException(e);
}
JobInfo jobInfo, FlinkPipelineOptions pipelineOptions, List<String> filesToStage) {
StreamExecutionEnvironment executionEnvironment =
FlinkExecutionEnvironments.createStreamExecutionEnvironment(
pipelineOptions.as(FlinkPipelineOptions.class), filesToStage);
FlinkExecutionEnvironments.createStreamExecutionEnvironment(pipelineOptions, filesToStage);
return new StreamingTranslationContext(jobInfo, pipelineOptions, executionEnvironment);
}

@@ -118,12 +109,14 @@ public static class StreamingTranslationContext
implements FlinkPortablePipelineTranslator.TranslationContext {

private final JobInfo jobInfo;
private final PipelineOptions options;
private final FlinkPipelineOptions options;
private final StreamExecutionEnvironment executionEnvironment;
private final Map<String, DataStream<?>> dataStreams;

private StreamingTranslationContext(
JobInfo jobInfo, PipelineOptions options, StreamExecutionEnvironment executionEnvironment) {
JobInfo jobInfo,
FlinkPipelineOptions options,
StreamExecutionEnvironment executionEnvironment) {
this.jobInfo = jobInfo;
this.options = options;
this.executionEnvironment = executionEnvironment;
@@ -135,7 +128,8 @@ public JobInfo getJobInfo() {
return jobInfo;
}

public PipelineOptions getPipelineOptions() {
@Override
public FlinkPipelineOptions getPipelineOptions() {
return options;
}

@@ -529,7 +523,7 @@ private <InputT, OutputT> void translateExecutableStage(
context.getPipelineOptions(),
stagePayload,
context.getJobInfo(),
FlinkExecutableStageContext.factory(),
FlinkExecutableStageContext.factory(context.getPipelineOptions()),
collectionIdToTupleTag);

if (transformedSideInputs.unionTagToView.isEmpty()) {
@@ -34,7 +34,7 @@
class FlinkDefaultExecutableStageContext implements FlinkExecutableStageContext, AutoCloseable {
private final JobBundleFactory jobBundleFactory;

private static FlinkDefaultExecutableStageContext create(JobInfo jobInfo) throws Exception {
private static FlinkDefaultExecutableStageContext create(JobInfo jobInfo) {
JobBundleFactory jobBundleFactory =
DefaultJobBundleFactory.create(
jobInfo,
@@ -74,4 +74,7 @@ public FlinkExecutableStageContext get(JobInfo jobInfo) {
return actualFactory.get(jobInfo);
}
}

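// Unlike REFERENCE_COUNTING above, this factory builds a fresh context
// (with its own job bundle factory) on every get(jobInfo) call.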
static final Factory MULTI_INSTANCE_FACTORY =
(jobInfo) -> FlinkDefaultExecutableStageContext.create(jobInfo);
}
@@ -19,8 +19,10 @@

import java.io.Serializable;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.sdk.options.PortablePipelineOptions;

/** The Flink context required in order to execute {@link ExecutableStage stages}. */
public interface FlinkExecutableStageContext extends AutoCloseable {
@@ -35,8 +37,14 @@ interface Factory extends Serializable {
FlinkExecutableStageContext get(JobInfo jobInfo);
}

static Factory factory() {
return FlinkDefaultExecutableStageContext.ReferenceCountingFactory.REFERENCE_COUNTING;
static Factory factory(FlinkPipelineOptions options) {
PortablePipelineOptions portableOptions = options.as(PortablePipelineOptions.class);
if (PortablePipelineOptions.SDK_WORKER_PARALLELISM_STAGE.equals(
portableOptions.getSdkWorkerParallelism())) {
return FlinkDefaultExecutableStageContext.MULTI_INSTANCE_FACTORY;
} else {
return FlinkDefaultExecutableStageContext.ReferenceCountingFactory.REFERENCE_COUNTING;
}
}

StageBundleFactory getStageBundleFactory(ExecutableStage executableStage);
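Putting it together: the translators pass their FlinkPipelineOptions into factory(...), which selects the context factory from the portable option. A hedged selection sketch, assumed to live in the same package as FlinkExecutableStageContext and using only names from this diff:

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;

public class FactorySelectionSketch {
  public static void main(String[] args) {
    // Per-stage parallelism yields the multi-instance factory; any other
    // value falls back to the shared, reference-counted per-job factory.
    FlinkPipelineOptions options =
        PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    options
        .as(PortablePipelineOptions.class)
        .setSdkWorkerParallelism(PortablePipelineOptions.SDK_WORKER_PARALLELISM_STAGE);
    FlinkExecutableStageContext.Factory factory =
        FlinkExecutableStageContext.factory(options); // multi-instance here
  }
}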
