From 1fdc2f953f7a5c1875964bb557cab7b7e0878091 Mon Sep 17 00:00:00 2001
From: Thomas Weise
Date: Fri, 28 Sep 2018 06:57:21 -0700
Subject: [PATCH] [BEAM-5520] Portable pipeline option to control SDK harness
 process parallelism.

---
 .../FlinkBatchPortablePipelineTranslator.java | 41 +++++++++----------
 .../runners/flink/FlinkJobInvocation.java     |  6 ++-
 .../beam/runners/flink/FlinkJobInvoker.java   | 23 ++++++++---
 .../runners/flink/FlinkJobServerDriver.java   | 23 +++++++++--
 .../runners/flink/FlinkPipelineOptions.java   |  4 +-
 .../FlinkPortablePipelineTranslator.java      |  2 +
 ...nkStreamingPortablePipelineTranslator.java | 24 ++++-------
 .../FlinkDefaultExecutableStageContext.java   |  5 ++-
 .../FlinkExecutableStageContext.java          | 12 +++++-
 .../FlinkExecutableStageFunction.java         | 16 +++++++-
 .../ExecutableStageDoFnOperatorTest.java      |  6 ++-
 .../control/DefaultJobBundleFactory.java      | 22 +++++-----
 .../fnexecution/state/GrpcStateService.java   |  9 ++++
 .../sdk/options/PortablePipelineOptions.java  | 17 +++++++-
 14 files changed, 142 insertions(+), 68 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
index 196534041e980..bb3a89031002f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -38,7 +38,6 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
-import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.core.construction.RehydratedComponents;
 import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
@@ -64,7 +63,6 @@
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
@@ -116,17 +114,10 @@ public class FlinkBatchPortablePipelineTranslator
    * {@link ExecutionEnvironment}.
    */
   public static BatchTranslationContext createTranslationContext(
-      JobInfo jobInfo, List<String> filesToStage) {
-    PipelineOptions pipelineOptions;
-    try {
-      pipelineOptions = PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
+      JobInfo jobInfo, FlinkPipelineOptions pipelineOptions, List<String> filesToStage) {
     ExecutionEnvironment executionEnvironment =
-        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
-            pipelineOptions.as(FlinkPipelineOptions.class), filesToStage);
-    return new BatchTranslationContext(jobInfo, executionEnvironment);
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(pipelineOptions, filesToStage);
+    return new BatchTranslationContext(jobInfo, pipelineOptions, executionEnvironment);
   }
 
   /** Creates a batch translator. */
@@ -160,12 +151,15 @@ public static class BatchTranslationContext
       implements FlinkPortablePipelineTranslator.TranslationContext {
 
     private final JobInfo jobInfo;
+    private final FlinkPipelineOptions options;
     private final ExecutionEnvironment executionEnvironment;
     private final Map<String, DataSet<?>> dataSets;
     private final Set<String> danglingDataSets;
 
-    private BatchTranslationContext(JobInfo jobInfo, ExecutionEnvironment executionEnvironment) {
+    private BatchTranslationContext(
+        JobInfo jobInfo, FlinkPipelineOptions options, ExecutionEnvironment executionEnvironment) {
       this.jobInfo = jobInfo;
+      this.options = options;
       this.executionEnvironment = executionEnvironment;
       dataSets = new HashMap<>();
       danglingDataSets = new HashSet<>();
@@ -176,6 +170,11 @@ public JobInfo getJobInfo() {
       return jobInfo;
     }
 
+    @Override
+    public FlinkPipelineOptions getPipelineOptions() {
+      return options;
+    }
+
     public ExecutionEnvironment getExecutionEnvironment() {
       return executionEnvironment;
     }
@@ -336,7 +335,10 @@ private static <InputT> void translateExecutableStage(
     }
     FlinkExecutableStageFunction<InputT> function =
         new FlinkExecutableStageFunction<>(
-            stagePayload, context.getJobInfo(), outputMap, FlinkExecutableStageContext.factory());
+            stagePayload,
+            context.getJobInfo(),
+            outputMap,
+            FlinkExecutableStageContext.factory(context.getPipelineOptions()));
 
     DataSet<WindowedValue<InputT>> inputDataSet =
         context.getDataSetOrThrow(stagePayload.getInput());
@@ -472,18 +474,13 @@ private static <K, V> void translateGroupByKey(
     Grouping<WindowedValue<KV<K, V>>> inputGrouping =
         inputDataSet.groupBy(new KvKeySelector<>(inputElementCoder.getKeyCoder()));
 
-    PipelineOptions options;
-    try {
-      options = PipelineOptionsTranslation.fromProto(context.getJobInfo().pipelineOptions());
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
     FlinkPartialReduceFunction<K, V, List<V>, ?> partialReduceFunction =
         new FlinkPartialReduceFunction<>(
-            combineFn, windowingStrategy, Collections.emptyMap(), options);
+            combineFn, windowingStrategy, Collections.emptyMap(), context.getPipelineOptions());
 
     FlinkReduceFunction<K, List<V>, List<V>, ?> reduceFunction =
-        new FlinkReduceFunction<>(combineFn, windowingStrategy, Collections.emptyMap(), options);
+        new FlinkReduceFunction<>(
+            combineFn, windowingStrategy, Collections.emptyMap(), context.getPipelineOptions());
 
     // Partially GroupReduce the values into the intermediate format AccumT (combine)
     GroupCombineOperator<WindowedValue<KV<K, V>>, WindowedValue<KV<K, List<V>>>> groupCombine =
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
index 8855c3d1be3ca..44c52f1500c2a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
@@ -108,7 +108,8 @@ private PipelineResult runPipeline() throws Exception {
       FlinkBatchPortablePipelineTranslator translator =
           FlinkBatchPortablePipelineTranslator.createTranslator();
       FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
-          FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo, filesToStage);
+          FlinkBatchPortablePipelineTranslator.createTranslationContext(
+              jobInfo, pipelineOptions, filesToStage);
       translator.translate(context, fusedPipeline);
       result = context.getExecutionEnvironment().execute(pipelineOptions.getJobName());
     } else {
@@ -116,7 +117,8 @@ private PipelineResult runPipeline() throws Exception {
       FlinkStreamingPortablePipelineTranslator translator =
           new FlinkStreamingPortablePipelineTranslator();
       FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext context =
-          FlinkStreamingPortablePipelineTranslator.createTranslationContext(jobInfo, filesToStage);
+          FlinkStreamingPortablePipelineTranslator.createTranslationContext(
+              jobInfo, pipelineOptions, filesToStage);
       translator.translate(context, fusedPipeline);
       result = context.getExecutionEnvironment().execute(pipelineOptions.getJobName());
     }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
index d004a67588f2b..76867bce2c66f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
@@ -27,6 +27,7 @@
 import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
 import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
 import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.Struct;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,16 +37,19 @@ public class FlinkJobInvoker implements JobInvoker {
   private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvoker.class);
 
   public static FlinkJobInvoker create(
-      ListeningExecutorService executorService, String flinkMasterUrl) {
-    return new FlinkJobInvoker(executorService, flinkMasterUrl);
+      ListeningExecutorService executorService,
+      FlinkJobServerDriver.ServerConfiguration serverConfig) {
+    return new FlinkJobInvoker(executorService, serverConfig);
   }
 
   private final ListeningExecutorService executorService;
-  private final String flinkMasterUrl;
+  private final FlinkJobServerDriver.ServerConfiguration serverConfig;
 
-  private FlinkJobInvoker(ListeningExecutorService executorService, String flinkMasterUrl) {
+  private FlinkJobInvoker(
+      ListeningExecutorService executorService,
+      FlinkJobServerDriver.ServerConfiguration serverConfig) {
     this.executorService = executorService;
-    this.flinkMasterUrl = flinkMasterUrl;
+    this.serverConfig = serverConfig;
   }
 
   @Override
@@ -61,7 +65,14 @@ public JobInvocation invoke(
         String.format("%s_%s", flinkOptions.getJobName(), UUID.randomUUID().toString());
     LOG.info("Invoking job {}", invocationId);
 
-    flinkOptions.setFlinkMaster(flinkMasterUrl);
+    if (FlinkPipelineOptions.AUTO.equals(flinkOptions.getFlinkMaster())) {
+      flinkOptions.setFlinkMaster(serverConfig.getFlinkMasterUrl());
+    }
+
+    PortablePipelineOptions portableOptions = flinkOptions.as(PortablePipelineOptions.class);
+    if (portableOptions.getSdkWorkerParallelism() == null) {
+      portableOptions.setSdkWorkerParallelism(serverConfig.getSdkWorkerParallelism());
+    }
 
     flinkOptions.setRunner(null);
 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
index 0f567f6c6d579..5d82587dc72b5 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -22,6 +22,7 @@
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
+import java.nio.file.Paths;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import org.apache.beam.model.pipeline.v1.Endpoints;
@@ -32,6 +33,7 @@
 import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
 import org.kohsuke.args4j.CmdLineException;
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
@@ -62,7 +64,8 @@ public static class ServerConfiguration {
     private int artifactPort = 8098;
 
     @Option(name = "--artifacts-dir", usage = "The location to store staged artifact files")
-    private String artifactStagingPath = "/tmp/beam-artifact-staging";
+    private String artifactStagingPath =
+        Paths.get(System.getProperty("java.io.tmpdir"), "beam-artifact-staging").toString();
 
     @Option(
         name = "--clean-artifacts-per-job",
@@ -72,6 +75,20 @@ public static class ServerConfiguration {
 
     @Option(name = "--flink-master-url", usage = "Flink master url to submit job.")
     private String flinkMasterUrl = "[auto]";
+
+    public String getFlinkMasterUrl() {
+      return this.flinkMasterUrl;
+    }
+
+    @Option(
+      name = "--sdk-worker-parallelism",
+      usage = "Default parallelism for SDK worker processes (see portable pipeline options)"
+    )
+    private String sdkWorkerParallelism = PortablePipelineOptions.SDK_WORKER_PARALLELISM_PIPELINE;
+
+    public String getSdkWorkerParallelism() {
+      return this.sdkWorkerParallelism;
+    }
   }
 
   public static void main(String[] args) throws Exception {
@@ -232,7 +249,7 @@ private GrpcFnServer<BeamFileSystemArtifactStagingService> createArtifactStaging
     return artifactStagingService;
   }
 
-  private JobInvoker createJobInvoker() throws IOException {
-    return FlinkJobInvoker.create(executor, configuration.flinkMasterUrl);
+  private JobInvoker createJobInvoker() {
+    return FlinkJobInvoker.create(executor, configuration);
   }
 }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 81b0e41bb5ce1..9c985a521c296 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -31,6 +31,8 @@
 public interface FlinkPipelineOptions
     extends PipelineOptions, ApplicationNameOptions, StreamingOptions {
 
+  String AUTO = "[auto]";
+
   /**
    * List of local files to make available to workers.
    *
@@ -56,7 +58,7 @@ public interface FlinkPipelineOptions
       "Address of the Flink Master where the Pipeline should be executed. Can"
           + " either be of the form \"host:port\" or one of the special values [local], "
           + "[collection] or [auto].")
-  @Default.String("[auto]")
+  @Default.String(AUTO)
   String getFlinkMaster();
 
   void setFlinkMaster(String value);
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java
index 45ee3cca68aa4..edbc6d31251ef 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java
@@ -34,6 +34,8 @@ public interface FlinkPortablePipelineTranslator<
   /** The context used for pipeline translation. */
   interface TranslationContext {
     JobInfo getJobInfo();
+
+    FlinkPipelineOptions getPipelineOptions();
   }
 
   /** Translates the given pipeline. */
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index 3ce901c678c08..3f4f7c8e34c6c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -35,7 +35,6 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
-import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.core.construction.RehydratedComponents;
 import org.apache.beam.runners.core.construction.RunnerPCollectionView;
 import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
@@ -59,7 +58,6 @@
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.join.UnionCoder;
@@ -97,16 +95,9 @@ public class FlinkStreamingPortablePipelineTranslator
    * {@link StreamExecutionEnvironment}.
    */
   public static StreamingTranslationContext createTranslationContext(
-      JobInfo jobInfo, List<String> filesToStage) {
-    PipelineOptions pipelineOptions;
-    try {
-      pipelineOptions = PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
+      JobInfo jobInfo, FlinkPipelineOptions pipelineOptions, List<String> filesToStage) {
     StreamExecutionEnvironment executionEnvironment =
-        FlinkExecutionEnvironments.createStreamExecutionEnvironment(
-            pipelineOptions.as(FlinkPipelineOptions.class), filesToStage);
+        FlinkExecutionEnvironments.createStreamExecutionEnvironment(pipelineOptions, filesToStage);
     return new StreamingTranslationContext(jobInfo, pipelineOptions, executionEnvironment);
   }
 
@@ -118,12 +109,14 @@ public static class StreamingTranslationContext
       implements FlinkPortablePipelineTranslator.TranslationContext {
 
     private final JobInfo jobInfo;
-    private final PipelineOptions options;
+    private final FlinkPipelineOptions options;
     private final StreamExecutionEnvironment executionEnvironment;
     private final Map<String, DataStream<?>> dataStreams;
 
     private StreamingTranslationContext(
-        JobInfo jobInfo, PipelineOptions options, StreamExecutionEnvironment executionEnvironment) {
+        JobInfo jobInfo,
+        FlinkPipelineOptions options,
+        StreamExecutionEnvironment executionEnvironment) {
       this.jobInfo = jobInfo;
       this.options = options;
       this.executionEnvironment = executionEnvironment;
@@ -135,7 +128,8 @@ public JobInfo getJobInfo() {
       return jobInfo;
     }
 
-    public PipelineOptions getPipelineOptions() {
+    @Override
+    public FlinkPipelineOptions getPipelineOptions() {
       return options;
     }
 
@@ -529,7 +523,7 @@ private <InputT, OutputT> void translateExecutableStage(
             context.getPipelineOptions(),
             stagePayload,
             context.getJobInfo(),
-            FlinkExecutableStageContext.factory(),
+            FlinkExecutableStageContext.factory(context.getPipelineOptions()),
             collectionIdToTupleTag);
 
     if (transformedSideInputs.unionTagToView.isEmpty()) {
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
index 015af26c2e16c..aac8cf91ed86c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
@@ -34,7 +34,7 @@ class FlinkDefaultExecutableStageContext
     implements FlinkExecutableStageContext, AutoCloseable {
   private final JobBundleFactory jobBundleFactory;
 
-  private static FlinkDefaultExecutableStageContext create(JobInfo jobInfo) throws Exception {
+  private static FlinkDefaultExecutableStageContext create(JobInfo jobInfo) {
     JobBundleFactory jobBundleFactory =
         DefaultJobBundleFactory.create(
             jobInfo,
@@ -74,4 +74,7 @@ public FlinkExecutableStageContext get(JobInfo jobInfo) {
       return actualFactory.get(jobInfo);
     }
   }
+
+  static final Factory MULTI_INSTANCE_FACTORY =
+      (jobInfo) -> FlinkDefaultExecutableStageContext.create(jobInfo);
 }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
index 72586c018d52d..4981430288ab5 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
@@ -19,8 +19,10 @@
 
 import java.io.Serializable;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
 
 /** The Flink context required in order to execute {@link ExecutableStage stages}. */
 public interface FlinkExecutableStageContext extends AutoCloseable {
@@ -35,8 +37,14 @@ interface Factory extends Serializable {
     FlinkExecutableStageContext get(JobInfo jobInfo);
   }
 
-  static Factory factory() {
-    return FlinkDefaultExecutableStageContext.ReferenceCountingFactory.REFERENCE_COUNTING;
+  static Factory factory(FlinkPipelineOptions options) {
+    PortablePipelineOptions portableOptions = options.as(PortablePipelineOptions.class);
+    if (PortablePipelineOptions.SDK_WORKER_PARALLELISM_STAGE.equals(
+        portableOptions.getSdkWorkerParallelism())) {
+      return FlinkDefaultExecutableStageContext.MULTI_INSTANCE_FACTORY;
+    } else {
+      return FlinkDefaultExecutableStageContext.ReferenceCountingFactory.REFERENCE_COUNTING;
+    }
   }
 
   StageBundleFactory getStageBundleFactory(ExecutableStage executableStage);
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index 582c6d5622af1..09c45fa719ae3 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -43,6 +43,8 @@
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Flink operator that passes its input DataSet through an SDK-executed {@link
@@ -54,6 +56,7 @@
  */
 public class FlinkExecutableStageFunction<InputT>
     extends RichMapPartitionFunction<WindowedValue<InputT>, RawUnionValue> {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkExecutableStageFunction.class);
 
   // Main constructor fields. All must be Serializable because Flink distributes Functions to
   // task managers via java serialization.
@@ -141,8 +144,17 @@ public void mapPartition(
 
   @Override
   public void close() throws Exception {
-    try (AutoCloseable bundleFactoryCloser = stageBundleFactory) {}
-    try (AutoCloseable closable = stageContext) {}
+    // close may be called multiple times when an exception is thrown
+    if (stageContext != null) {
+      try (@SuppressWarnings("unused")
+              AutoCloseable bundleFactoryCloser = stageBundleFactory;
+          @SuppressWarnings("unused")
+              AutoCloseable closable = stageContext) {
+      } catch (Exception e) {
+        LOG.error("Error in close: ", e);
+        throw e;
+      }
+    }
     stageContext = null;
   }
 
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
index 2ba0902cbaa03..fc71d0c0c0d6c 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
@@ -318,6 +318,8 @@ public void testSerialization() {
         new DoFnOperator.MultiOutputOutputManagerFactory(
             mainOutput, tagsToOutputTags, tagsToCoders, tagsToIds);
 
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+
     ExecutableStageDoFnOperator<Integer, Integer> operator =
         new ExecutableStageDoFnOperator<>(
             "transform",
@@ -330,10 +332,10 @@ public void testSerialization() {
             Collections.emptyMap() /* sideInputTagMapping */,
             Collections.emptyList() /* sideInputs */,
             Collections.emptyMap() /* sideInputId mapping */,
-            PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+            options,
             stagePayload,
             jobInfo,
-            FlinkExecutableStageContext.factory(),
+            FlinkExecutableStageContext.factory(options),
             createOutputMap(mainOutput, ImmutableList.of(additionalOutput)));
 
     ExecutableStageDoFnOperator<Integer, Integer> clone = SerializationUtils.clone(operator);
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
index e24a3319fabbd..6cbff13c9e555 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
@@ -172,6 +172,7 @@ public StageBundleFactory forStage(ExecutableStage executableStage) {
   @Override
   public void close() throws Exception {
     // Clear the cache. This closes all active environments.
+    // note this may cause open calls to be cancelled by the peer
     environmentCache.invalidateAll();
     environmentCache.cleanUp();
 
@@ -182,6 +183,7 @@ public void close() throws Exception {
 
     loggingServer.close();
     retrievalServer.close();
     provisioningServer.close();
+    executor.shutdown();
   }
 
@@ -306,9 +308,9 @@ private void checkAndInitialize(
       return;
     }
 
-    EnvironmentFactory.Provider environmentProviderFactory =
+    EnvironmentFactory.Provider environmentFactoryProvider =
         environmentFactoryProviderMap.get(environment.getUrn());
-    ServerFactory serverFactory = environmentProviderFactory.getServerFactory();
+    ServerFactory serverFactory = environmentFactoryProvider.getServerFactory();
 
     this.clientPool = MapControlClientPool.create();
     this.executor = Executors.newCachedThreadPool();
@@ -334,15 +336,13 @@ private void checkAndInitialize(
           GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), serverFactory);
 
     this.environmentFactory =
-        environmentFactoryProviderMap
-            .get(environment.getUrn())
-            .createEnvironmentFactory(
-                controlServer,
-                loggingServer,
-                retrievalServer,
-                provisioningServer,
-                clientPool,
-                stageIdGenerator);
+        environmentFactoryProvider.createEnvironmentFactory(
+            controlServer,
+            loggingServer,
+            retrievalServer,
+            provisioningServer,
+            clientPool,
+            stageIdGenerator);
     this.environment = environment;
   }
 }
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
index 38b21469d7890..633d7255a8876 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
@@ -28,6 +28,7 @@
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
 import org.apache.beam.model.fnexecution.v1.BeamFnStateGrpc;
 import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.vendor.grpc.v1.io.grpc.stub.ServerCallStreamObserver;
 import org.apache.beam.vendor.grpc.v1.io.grpc.stub.StreamObserver;
 
 /** An implementation of the Beam Fn State service. */
@@ -51,6 +52,14 @@ public void close() throws Exception {
     Exception thrown = null;
     for (Inbound inbound : clients) {
       try {
+        // the call may be cancelled because the sdk harness hung up
+        // (we terminate the environment before terminating the service endpoints)
+        if (inbound.outboundObserver instanceof ServerCallStreamObserver) {
+          if (((ServerCallStreamObserver) inbound.outboundObserver).isCancelled()) {
+            // skip to avoid call already closed exception
+            continue;
+          }
+        }
         inbound.outboundObserver.onCompleted();
       } catch (Exception t) {
         if (thrown == null) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
index bdb1caf3b4d87..5107c389bb1e2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
@@ -59,7 +59,7 @@ public interface PortablePipelineOptions extends PipelineOptions {
           + "Possible options are DOCKER and PROCESS.")
   String getDefaultEnvironmentType();
 
-  void setDefaultEnvironmentType(String envitonmentType);
+  void setDefaultEnvironmentType(String environmentType);
 
   @Description(
       "Set environment configuration for running the user code.\n"
@@ -72,4 +72,19 @@ public interface PortablePipelineOptions extends PipelineOptions {
   String getDefaultEnvironmentConfig();
 
   void setDefaultEnvironmentConfig(@Nullable String config);
+
+  String SDK_WORKER_PARALLELISM_PIPELINE = "pipeline";
+  String SDK_WORKER_PARALLELISM_STAGE = "stage";
+
+  @Description(
+      "SDK worker/harness process parallelism. Currently supported options are "
+          + "<unset> (let the runner decide) or '"
+          + SDK_WORKER_PARALLELISM_PIPELINE
+          + "' (single SDK harness process per pipeline and runner process) or '"
+          + SDK_WORKER_PARALLELISM_STAGE
+          + "' (separate SDK harness for every executable stage).")
+  @Nullable
+  String getSdkWorkerParallelism();
+
+  void setSdkWorkerParallelism(@Nullable String parallelism);
 }
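
Usage note (a reviewer sketch, not part of the patch): with this change the harness
parallelism is a pipeline option, and the job server's --sdk-worker-parallelism flag
only supplies a default when the pipeline leaves the option unset. The class name
below is illustrative; it assumes only the constants and accessors introduced above.

    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.PortablePipelineOptions;

    public class SdkWorkerParallelismExample {
      public static void main(String[] args) {
        // Parses e.g. --sdkWorkerParallelism=stage from the command line.
        PortablePipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(PortablePipelineOptions.class);
        if (options.getSdkWorkerParallelism() == null) {
          // Unset means "let the runner decide"; FlinkJobInvoker then falls back
          // to the job server's --sdk-worker-parallelism default ("pipeline").
          options.setSdkWorkerParallelism(PortablePipelineOptions.SDK_WORKER_PARALLELISM_STAGE);
        }
        // With "stage", FlinkExecutableStageContext.factory(...) selects
        // MULTI_INSTANCE_FACTORY, giving every executable stage its own SDK harness;
        // any other value keeps the shared, reference-counted context per job.
      }
    }

Keeping the option nullable is what lets the invoker distinguish "user did not
choose" from an explicit choice, which is why FlinkJobInvoker only copies the
server default when getSdkWorkerParallelism() returns null.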