[BEAM-5520] Flink pipeline option to run SDK harness per subtask. #6524

Merged: 1 commit, merged on Oct 2, 2018.
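The new behavior hangs off PortablePipelineOptions.getSdkWorkerParallelism(), which the job server can default via the new --sdk-worker-parallelism flag (see FlinkJobServerDriver below). A minimal sketch of opting a pipeline into per-stage SDK harnesses, using only the getter/setter and constants visible in this diff; anything beyond those names is an assumption:

import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;

public class SdkWorkerParallelismSketch {
  public static void main(String[] args) {
    PortablePipelineOptions options =
        PipelineOptionsFactory.create().as(PortablePipelineOptions.class);
    // SDK_WORKER_PARALLELISM_STAGE is the value FlinkExecutableStageContext.factory(...)
    // tests for below; it requests one SDK harness environment per executable stage
    // (one per Flink subtask) instead of a single shared environment per job.
    options.setSdkWorkerParallelism(PortablePipelineOptions.SDK_WORKER_PARALLELISM_STAGE);
    System.out.println(options.getSdkWorkerParallelism());
  }
}

When the option is left unset, FlinkJobInvoker below falls back to the job server's --sdk-worker-parallelism default, which in turn defaults to SDK_WORKER_PARALLELISM_PIPELINE.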
@@ -38,7 +38,6 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
-import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.core.construction.RehydratedComponents;
 import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
@@ -64,7 +63,6 @@
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
@@ -116,17 +114,10 @@ public class FlinkBatchPortablePipelineTranslator
    * {@link ExecutionEnvironment}.
    */
   public static BatchTranslationContext createTranslationContext(
-      JobInfo jobInfo, List<String> filesToStage) {
-    PipelineOptions pipelineOptions;
-    try {
-      pipelineOptions = PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
+      JobInfo jobInfo, FlinkPipelineOptions pipelineOptions, List<String> filesToStage) {
     ExecutionEnvironment executionEnvironment =
-        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
-            pipelineOptions.as(FlinkPipelineOptions.class), filesToStage);
-    return new BatchTranslationContext(jobInfo, executionEnvironment);
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(pipelineOptions, filesToStage);
+    return new BatchTranslationContext(jobInfo, pipelineOptions, executionEnvironment);
   }

/** Creates a batch translator. */
@@ -160,12 +151,15 @@ public static class BatchTranslationContext
       implements FlinkPortablePipelineTranslator.TranslationContext {

     private final JobInfo jobInfo;
+    private final FlinkPipelineOptions options;
     private final ExecutionEnvironment executionEnvironment;
     private final Map<String, DataSet<?>> dataSets;
     private final Set<String> danglingDataSets;

-    private BatchTranslationContext(JobInfo jobInfo, ExecutionEnvironment executionEnvironment) {
+    private BatchTranslationContext(
+        JobInfo jobInfo, FlinkPipelineOptions options, ExecutionEnvironment executionEnvironment) {
       this.jobInfo = jobInfo;
+      this.options = options;
       this.executionEnvironment = executionEnvironment;
       dataSets = new HashMap<>();
       danglingDataSets = new HashSet<>();
@@ -176,6 +170,11 @@ public JobInfo getJobInfo() {
       return jobInfo;
     }

+    @Override
+    public FlinkPipelineOptions getPipelineOptions() {
+      return options;
+    }
+
     public ExecutionEnvironment getExecutionEnvironment() {
       return executionEnvironment;
     }
@@ -336,7 +335,10 @@ private static <InputT> void translateExecutableStage(
     }
     FlinkExecutableStageFunction<InputT> function =
         new FlinkExecutableStageFunction<>(
-            stagePayload, context.getJobInfo(), outputMap, FlinkExecutableStageContext.factory());
+            stagePayload,
+            context.getJobInfo(),
+            outputMap,
+            FlinkExecutableStageContext.factory(context.getPipelineOptions()));

     DataSet<WindowedValue<InputT>> inputDataSet =
         context.getDataSetOrThrow(stagePayload.getInput());
@@ -472,18 +474,13 @@ private static <K, V> void translateGroupByKey(
     Grouping<WindowedValue<KV<K, V>>> inputGrouping =
         inputDataSet.groupBy(new KvKeySelector<>(inputElementCoder.getKeyCoder()));

-    PipelineOptions options;
-    try {
-      options = PipelineOptionsTranslation.fromProto(context.getJobInfo().pipelineOptions());
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
     FlinkPartialReduceFunction<K, V, List<V>, ?> partialReduceFunction =
         new FlinkPartialReduceFunction<>(
-            combineFn, windowingStrategy, Collections.emptyMap(), options);
+            combineFn, windowingStrategy, Collections.emptyMap(), context.getPipelineOptions());

     FlinkReduceFunction<K, List<V>, List<V>, ?> reduceFunction =
-        new FlinkReduceFunction<>(combineFn, windowingStrategy, Collections.emptyMap(), options);
+        new FlinkReduceFunction<>(
+            combineFn, windowingStrategy, Collections.emptyMap(), context.getPipelineOptions());

     // Partially GroupReduce the values into the intermediate format AccumT (combine)
     GroupCombineOperator<WindowedValue<KV<K, V>>, WindowedValue<KV<K, List<V>>>> groupCombine =
@@ -108,15 +108,17 @@ private PipelineResult runPipeline() throws Exception {
       FlinkBatchPortablePipelineTranslator translator =
           FlinkBatchPortablePipelineTranslator.createTranslator();
       FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
-          FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo, filesToStage);
+          FlinkBatchPortablePipelineTranslator.createTranslationContext(
+              jobInfo, pipelineOptions, filesToStage);
       translator.translate(context, fusedPipeline);
       result = context.getExecutionEnvironment().execute(pipelineOptions.getJobName());
     } else {
       // streaming translation
       FlinkStreamingPortablePipelineTranslator translator =
           new FlinkStreamingPortablePipelineTranslator();
       FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext context =
-          FlinkStreamingPortablePipelineTranslator.createTranslationContext(jobInfo, filesToStage);
+          FlinkStreamingPortablePipelineTranslator.createTranslationContext(
+              jobInfo, pipelineOptions, filesToStage);
       translator.translate(context, fusedPipeline);
       result = context.getExecutionEnvironment().execute(pipelineOptions.getJobName());
     }
@@ -27,6 +27,7 @@
 import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
 import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
 import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.Struct;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,16 +37,19 @@ public class FlinkJobInvoker implements JobInvoker {
   private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvoker.class);

   public static FlinkJobInvoker create(
-      ListeningExecutorService executorService, String flinkMasterUrl) {
-    return new FlinkJobInvoker(executorService, flinkMasterUrl);
+      ListeningExecutorService executorService,
+      FlinkJobServerDriver.ServerConfiguration serverConfig) {
+    return new FlinkJobInvoker(executorService, serverConfig);
   }

   private final ListeningExecutorService executorService;
-  private final String flinkMasterUrl;
+  private final FlinkJobServerDriver.ServerConfiguration serverConfig;

-  private FlinkJobInvoker(ListeningExecutorService executorService, String flinkMasterUrl) {
+  private FlinkJobInvoker(
+      ListeningExecutorService executorService,
+      FlinkJobServerDriver.ServerConfiguration serverConfig) {
     this.executorService = executorService;
-    this.flinkMasterUrl = flinkMasterUrl;
+    this.serverConfig = serverConfig;
   }

   @Override
@@ -61,7 +65,14 @@ public JobInvocation invoke(
         String.format("%s_%s", flinkOptions.getJobName(), UUID.randomUUID().toString());
     LOG.info("Invoking job {}", invocationId);

-    flinkOptions.setFlinkMaster(flinkMasterUrl);
+    if (FlinkPipelineOptions.AUTO.equals(flinkOptions.getFlinkMaster())) {
+      flinkOptions.setFlinkMaster(serverConfig.getFlinkMasterUrl());
+    }
+
+    PortablePipelineOptions portableOptions = flinkOptions.as(PortablePipelineOptions.class);
+    if (portableOptions.getSdkWorkerParallelism() == null) {
+      portableOptions.setSdkWorkerParallelism(serverConfig.getSdkWorkerParallelism());
+    }

     flinkOptions.setRunner(null);

@@ -22,6 +22,7 @@
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
+import java.nio.file.Paths;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import org.apache.beam.model.pipeline.v1.Endpoints;
@@ -32,6 +33,7 @@
 import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
 import org.kohsuke.args4j.CmdLineException;
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
@@ -62,7 +64,8 @@ public static class ServerConfiguration {
   private int artifactPort = 8098;

   @Option(name = "--artifacts-dir", usage = "The location to store staged artifact files")
-  private String artifactStagingPath = "/tmp/beam-artifact-staging";
+  private String artifactStagingPath =
+      Paths.get(System.getProperty("java.io.tmpdir"), "beam-artifact-staging").toString();

   @Option(
       name = "--clean-artifacts-per-job",
@@ -72,6 +75,20 @@ public static class ServerConfiguration {

   @Option(name = "--flink-master-url", usage = "Flink master url to submit job.")
   private String flinkMasterUrl = "[auto]";
+
+  public String getFlinkMasterUrl() {
+    return this.flinkMasterUrl;
+  }
+
+  @Option(
+      name = "--sdk-worker-parallelism",
+      usage = "Default parallelism for SDK worker processes (see portable pipeline options)"
+  )
+  private String sdkWorkerParallelism = PortablePipelineOptions.SDK_WORKER_PARALLELISM_PIPELINE;
+
+  public String getSdkWorkerParallelism() {
+    return this.sdkWorkerParallelism;
+  }
 }

 public static void main(String[] args) throws Exception {
@@ -232,7 +249,7 @@ private GrpcFnServer<BeamFileSystemArtifactStagingService> createArtifactStaging
     return artifactStagingService;
   }

-  private JobInvoker createJobInvoker() throws IOException {
-    return FlinkJobInvoker.create(executor, configuration.flinkMasterUrl);
+  private JobInvoker createJobInvoker() {
+    return FlinkJobInvoker.create(executor, configuration);
   }
 }
@@ -31,6 +31,8 @@
 public interface FlinkPipelineOptions
     extends PipelineOptions, ApplicationNameOptions, StreamingOptions {

+  String AUTO = "[auto]";
+
   /**
    * List of local files to make available to workers.
    *

Review thread on the added AUTO constant:

Contributor: Should we also add the other options here? Or make this an enum?

Contributor (author): Did not check if/how enum is supported, but this constant is now only used for the master URL. I think it might be good to get rid of the [auto] value for that option as well and just use null instead, as a separate change though. (A sketch of that alternative follows the getFlinkMaster() hunk below.)
@@ -56,7 +58,7 @@ public interface FlinkPipelineOptions
       "Address of the Flink Master where the Pipeline should be executed. Can"
           + " either be of the form \"host:port\" or one of the special values [local], "
           + "[collection] or [auto].")
-  @Default.String("[auto]")
+  @Default.String(AUTO)
   String getFlinkMaster();

   void setFlinkMaster(String value);
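A rough sketch of the null-default alternative the author floats in the thread above. This is hypothetical and not part of this PR; the interface name is invented for illustration:

import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

// Hypothetical variant of the option without the "[auto]" sentinel.
public interface NullDefaultFlinkMasterOptions extends PipelineOptions {

  @Description("Address of the Flink Master where the Pipeline should be executed.")
  String getFlinkMaster(); // no @Default.String: returns null when the user sets nothing

  void setFlinkMaster(String value);
}

// FlinkJobInvoker's fallback would then test for null instead of the sentinel:
//   if (flinkOptions.getFlinkMaster() == null) {
//     flinkOptions.setFlinkMaster(serverConfig.getFlinkMasterUrl());
//   }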
@@ -34,6 +34,8 @@ public interface FlinkPortablePipelineTranslator<
   /** The context used for pipeline translation. */
   interface TranslationContext {
     JobInfo getJobInfo();
+
+    FlinkPipelineOptions getPipelineOptions();
   }

   /** Translates the given pipeline. */
@@ -35,7 +35,6 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
-import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.core.construction.RehydratedComponents;
 import org.apache.beam.runners.core.construction.RunnerPCollectionView;
 import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
@@ -59,7 +58,6 @@
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.join.UnionCoder;
@@ -97,16 +95,9 @@ public class FlinkStreamingPortablePipelineTranslator
    * {@link StreamExecutionEnvironment}.
    */
   public static StreamingTranslationContext createTranslationContext(
-      JobInfo jobInfo, List<String> filesToStage) {
-    PipelineOptions pipelineOptions;
-    try {
-      pipelineOptions = PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
+      JobInfo jobInfo, FlinkPipelineOptions pipelineOptions, List<String> filesToStage) {
     StreamExecutionEnvironment executionEnvironment =
-        FlinkExecutionEnvironments.createStreamExecutionEnvironment(
-            pipelineOptions.as(FlinkPipelineOptions.class), filesToStage);
+        FlinkExecutionEnvironments.createStreamExecutionEnvironment(pipelineOptions, filesToStage);
     return new StreamingTranslationContext(jobInfo, pipelineOptions, executionEnvironment);
   }

@@ -118,12 +109,14 @@ public static class StreamingTranslationContext
       implements FlinkPortablePipelineTranslator.TranslationContext {

     private final JobInfo jobInfo;
-    private final PipelineOptions options;
+    private final FlinkPipelineOptions options;
     private final StreamExecutionEnvironment executionEnvironment;
     private final Map<String, DataStream<?>> dataStreams;

     private StreamingTranslationContext(
-        JobInfo jobInfo, PipelineOptions options, StreamExecutionEnvironment executionEnvironment) {
+        JobInfo jobInfo,
+        FlinkPipelineOptions options,
+        StreamExecutionEnvironment executionEnvironment) {
       this.jobInfo = jobInfo;
       this.options = options;
       this.executionEnvironment = executionEnvironment;
@@ -135,7 +128,8 @@ public JobInfo getJobInfo() {
       return jobInfo;
     }

-    public PipelineOptions getPipelineOptions() {
+    @Override
+    public FlinkPipelineOptions getPipelineOptions() {
       return options;
     }

@@ -529,7 +523,7 @@ private <InputT, OutputT> void translateExecutableStage(
             context.getPipelineOptions(),
             stagePayload,
             context.getJobInfo(),
-            FlinkExecutableStageContext.factory(),
+            FlinkExecutableStageContext.factory(context.getPipelineOptions()),
             collectionIdToTupleTag);

     if (transformedSideInputs.unionTagToView.isEmpty()) {
@@ -34,7 +34,7 @@
 class FlinkDefaultExecutableStageContext implements FlinkExecutableStageContext, AutoCloseable {
   private final JobBundleFactory jobBundleFactory;

-  private static FlinkDefaultExecutableStageContext create(JobInfo jobInfo) throws Exception {
+  private static FlinkDefaultExecutableStageContext create(JobInfo jobInfo) {
     JobBundleFactory jobBundleFactory =
         DefaultJobBundleFactory.create(
             jobInfo,
@@ -74,4 +74,7 @@ public FlinkExecutableStageContext get(JobInfo jobInfo) {
       return actualFactory.get(jobInfo);
     }
   }
+
+  static final Factory MULTI_INSTANCE_FACTORY =
+      (jobInfo) -> FlinkDefaultExecutableStageContext.create(jobInfo);
 }
@@ -19,8 +19,10 @@

 import java.io.Serializable;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.options.PortablePipelineOptions;

 /** The Flink context required in order to execute {@link ExecutableStage stages}. */
 public interface FlinkExecutableStageContext extends AutoCloseable {
@@ -35,8 +37,14 @@ interface Factory extends Serializable {
     FlinkExecutableStageContext get(JobInfo jobInfo);
   }

-  static Factory factory() {
-    return FlinkDefaultExecutableStageContext.ReferenceCountingFactory.REFERENCE_COUNTING;
+  static Factory factory(FlinkPipelineOptions options) {
+    PortablePipelineOptions portableOptions = options.as(PortablePipelineOptions.class);
+    if (PortablePipelineOptions.SDK_WORKER_PARALLELISM_STAGE.equals(
+        portableOptions.getSdkWorkerParallelism())) {
+      return FlinkDefaultExecutableStageContext.MULTI_INSTANCE_FACTORY;
+    } else {
+      return FlinkDefaultExecutableStageContext.ReferenceCountingFactory.REFERENCE_COUNTING;
+    }
   }

   StageBundleFactory getStageBundleFactory(ExecutableStage executableStage);
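Putting the pieces together, a small usage sketch of the new factory(options) entry point. The import path for FlinkExecutableStageContext is assumed from this PR's source tree, and the options plumbing mirrors what FlinkJobInvoker does above:

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;

public class FactorySelectionSketch {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.create().as(FlinkPipelineOptions.class);
    options
        .as(PortablePipelineOptions.class)
        .setSdkWorkerParallelism(PortablePipelineOptions.SDK_WORKER_PARALLELISM_STAGE);

    // SDK_WORKER_PARALLELISM_STAGE selects MULTI_INSTANCE_FACTORY: a fresh
    // FlinkDefaultExecutableStageContext (and thus a fresh SDK harness) per request.
    // Any other value keeps the per-job, reference-counted shared context.
    FlinkExecutableStageContext.Factory factory =
        FlinkExecutableStageContext.factory(options);
  }
}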