From 2e097421d39f8768ef7ecc11c349ce1d12c9200a Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Fri, 5 May 2017 14:13:01 +0200 Subject: [PATCH 1/2] Add FlinkPipelineExecutor with subclasses for batch and streaming This replaces the old FlinkPipelineExecutionEnvironment which was responsible for both batch and stream execution, which made the code more complicated. --- .../flink/FlinkBatchPipelineExecutor.java | 90 +++++++ .../FlinkPipelineExecutionEnvironment.java | 253 ------------------ .../runners/flink/FlinkPipelineExecutor.java | 36 +++ .../beam/runners/flink/FlinkRunner.java | 56 ++-- .../runners/flink/FlinkRunnerResultUtil.java | 49 ++++ .../flink/FlinkStreamingPipelineExecutor.java | 142 ++++++++++ .../flink/PipelineTranslationOptimizer.java | 72 ----- 7 files changed, 346 insertions(+), 352 deletions(-) create mode 100644 runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineExecutor.java delete mode 100644 runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java create mode 100644 runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutor.java create mode 100644 runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResultUtil.java create mode 100644 runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineExecutor.java delete mode 100644 runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineExecutor.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineExecutor.java new file mode 100644 index 000000000000..a0402445a2f6 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineExecutor.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import java.util.List; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.CollectionEnvironment; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link FlinkPipelineExecutor} that executes a {@link Pipeline} using a Flink + * {@link ExecutionEnvironment}. + */ +class FlinkBatchPipelineExecutor implements FlinkPipelineExecutor { + + private static final Logger LOG = + LoggerFactory.getLogger(FlinkBatchPipelineExecutor.class); + + @Override + public PipelineResult executePipeline( + FlinkRunner runner, Pipeline pipeline, FlinkPipelineOptions options) throws Exception{ + + ExecutionEnvironment env = createBatchExecutionEnvironment(options); + FlinkBatchPipelineTranslator translator = new FlinkBatchPipelineTranslator(env, options); + translator.translate(pipeline); + + JobExecutionResult result = env.execute(options.getJobName()); + + return FlinkRunnerResultUtil.wrapFlinkRunnerResult(LOG, result); + } + + private ExecutionEnvironment createBatchExecutionEnvironment(FlinkPipelineOptions options) { + + String masterUrl = options.getFlinkMaster(); + ExecutionEnvironment flinkBatchEnv; + + // depending on the master, create the right environment. + if (masterUrl.equals("[local]")) { + flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment(); + } else if (masterUrl.equals("[collection]")) { + flinkBatchEnv = new CollectionEnvironment(); + } else if (masterUrl.equals("[auto]")) { + flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment(); + } else if (masterUrl.matches(".*:\\d*")) { + String[] parts = masterUrl.split(":"); + List stagingFiles = options.getFilesToStage(); + flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0], + Integer.parseInt(parts[1]), + stagingFiles.toArray(new String[stagingFiles.size()])); + } else { + LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl); + flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment(); + } + + // set the correct parallelism. + if (options.getParallelism() != -1 && !(flinkBatchEnv instanceof CollectionEnvironment)) { + flinkBatchEnv.setParallelism(options.getParallelism()); + } + + // set parallelism in the options (required by some execution code) + options.setParallelism(flinkBatchEnv.getParallelism()); + + if (options.getObjectReuse()) { + flinkBatchEnv.getConfig().enableObjectReuse(); + } else { + flinkBatchEnv.getConfig().disableObjectReuse(); + } + + return flinkBatchEnv; + } +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java deleted file mode 100644 index d2a2016c98a0..000000000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java +++ /dev/null @@ -1,253 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.List; -import org.apache.beam.sdk.Pipeline; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.java.CollectionEnvironment; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The class that instantiates and manages the execution of a given job. - * Depending on if the job is a Streaming or Batch processing one, it creates - * the adequate execution environment ({@link ExecutionEnvironment} - * or {@link StreamExecutionEnvironment}), the necessary {@link FlinkPipelineTranslator} - * ({@link FlinkBatchPipelineTranslator} or {@link FlinkStreamingPipelineTranslator}) to - * transform the Beam job into a Flink one, and executes the (translated) job. - */ -class FlinkPipelineExecutionEnvironment { - - private static final Logger LOG = - LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class); - - private final FlinkPipelineOptions options; - - /** - * The Flink Batch execution environment. This is instantiated to either a - * {@link org.apache.flink.api.java.CollectionEnvironment}, - * a {@link org.apache.flink.api.java.LocalEnvironment} or - * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration - * options. - */ - private ExecutionEnvironment flinkBatchEnv; - - /** - * The Flink Streaming execution environment. This is instantiated to either a - * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or - * a {@link org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending - * on the configuration options, and more specifically, the url of the master. - */ - private StreamExecutionEnvironment flinkStreamEnv; - - /** - * Creates a {@link FlinkPipelineExecutionEnvironment} with the user-specified parameters in the - * provided {@link FlinkPipelineOptions}. - * - * @param options the user-defined pipeline options. - * */ - FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) { - this.options = checkNotNull(options); - } - - /** - * Depending on if the job is a Streaming or a Batch one, this method creates - * the necessary execution environment and pipeline translator, and translates - * the {@link org.apache.beam.sdk.values.PCollection} program into - * a {@link org.apache.flink.api.java.DataSet} - * or {@link org.apache.flink.streaming.api.datastream.DataStream} one. - * */ - public void translate(FlinkRunner flinkRunner, Pipeline pipeline) { - this.flinkBatchEnv = null; - this.flinkStreamEnv = null; - - pipeline.replaceAll(FlinkTransformOverrides.getDefaultOverrides(options.isStreaming())); - - PipelineTranslationOptimizer optimizer = - new PipelineTranslationOptimizer(TranslationMode.BATCH, options); - - optimizer.translate(pipeline); - TranslationMode translationMode = optimizer.getTranslationMode(); - - FlinkPipelineTranslator translator; - if (translationMode == TranslationMode.STREAMING) { - this.flinkStreamEnv = createStreamExecutionEnvironment(); - translator = new FlinkStreamingPipelineTranslator(flinkRunner, flinkStreamEnv, options); - } else { - this.flinkBatchEnv = createBatchExecutionEnvironment(); - translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options); - } - - translator.translate(pipeline); - } - - /** - * Launches the program execution. - * */ - public JobExecutionResult executePipeline() throws Exception { - final String jobName = options.getJobName(); - - if (flinkBatchEnv != null) { - return flinkBatchEnv.execute(jobName); - } else if (flinkStreamEnv != null) { - return flinkStreamEnv.execute(jobName); - } else { - throw new IllegalStateException("The Pipeline has not yet been translated."); - } - } - - /** - * If the submitted job is a batch processing job, this method creates the adequate - * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending - * on the user-specified options. - */ - private ExecutionEnvironment createBatchExecutionEnvironment() { - - LOG.info("Creating the required Batch Execution Environment."); - - String masterUrl = options.getFlinkMaster(); - ExecutionEnvironment flinkBatchEnv; - - // depending on the master, create the right environment. - if (masterUrl.equals("[local]")) { - flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment(); - } else if (masterUrl.equals("[collection]")) { - flinkBatchEnv = new CollectionEnvironment(); - } else if (masterUrl.equals("[auto]")) { - flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment(); - } else if (masterUrl.matches(".*:\\d*")) { - String[] parts = masterUrl.split(":"); - List stagingFiles = options.getFilesToStage(); - flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0], - Integer.parseInt(parts[1]), - stagingFiles.toArray(new String[stagingFiles.size()])); - } else { - LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl); - flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment(); - } - - // set the correct parallelism. - if (options.getParallelism() != -1 && !(flinkBatchEnv instanceof CollectionEnvironment)) { - flinkBatchEnv.setParallelism(options.getParallelism()); - } - - // set parallelism in the options (required by some execution code) - options.setParallelism(flinkBatchEnv.getParallelism()); - - if (options.getObjectReuse()) { - flinkBatchEnv.getConfig().enableObjectReuse(); - } else { - flinkBatchEnv.getConfig().disableObjectReuse(); - } - - return flinkBatchEnv; - } - - /** - * If the submitted job is a stream processing job, this method creates the adequate - * Flink {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending - * on the user-specified options. - */ - private StreamExecutionEnvironment createStreamExecutionEnvironment() { - - LOG.info("Creating the required Streaming Environment."); - - String masterUrl = options.getFlinkMaster(); - StreamExecutionEnvironment flinkStreamEnv = null; - - // depending on the master, create the right environment. - if (masterUrl.equals("[local]")) { - flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment(); - } else if (masterUrl.equals("[auto]")) { - flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - } else if (masterUrl.matches(".*:\\d*")) { - String[] parts = masterUrl.split(":"); - List stagingFiles = options.getFilesToStage(); - flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(parts[0], - Integer.parseInt(parts[1]), stagingFiles.toArray(new String[stagingFiles.size()])); - } else { - LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl); - flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - } - - // set the correct parallelism. - if (options.getParallelism() != -1) { - flinkStreamEnv.setParallelism(options.getParallelism()); - } - - // set parallelism in the options (required by some execution code) - options.setParallelism(flinkStreamEnv.getParallelism()); - - if (options.getObjectReuse()) { - flinkStreamEnv.getConfig().enableObjectReuse(); - } else { - flinkStreamEnv.getConfig().disableObjectReuse(); - } - - // default to event time - flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - - // for the following 2 parameters, a value of -1 means that Flink will use - // the default values as specified in the configuration. - int numRetries = options.getNumberOfExecutionRetries(); - if (numRetries != -1) { - flinkStreamEnv.setNumberOfExecutionRetries(numRetries); - } - long retryDelay = options.getExecutionRetryDelay(); - if (retryDelay != -1) { - flinkStreamEnv.getConfig().setExecutionRetryDelay(retryDelay); - } - - // A value of -1 corresponds to disabled checkpointing (see CheckpointConfig in Flink). - // If the value is not -1, then the validity checks are applied. - // By default, checkpointing is disabled. - long checkpointInterval = options.getCheckpointingInterval(); - if (checkpointInterval != -1) { - if (checkpointInterval < 1) { - throw new IllegalArgumentException("The checkpoint interval must be positive"); - } - flinkStreamEnv.enableCheckpointing(checkpointInterval, options.getCheckpointingMode()); - flinkStreamEnv.getCheckpointConfig().setCheckpointTimeout( - options.getCheckpointTimeoutMillis()); - boolean externalizedCheckpoint = options.isExternalizedCheckpointsEnabled(); - boolean retainOnCancellation = options.getRetainExternalizedCheckpointsOnCancellation(); - if (externalizedCheckpoint) { - flinkStreamEnv.getCheckpointConfig().enableExternalizedCheckpoints( - retainOnCancellation ? ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION - : ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION); - } - } - - // State backend - final AbstractStateBackend stateBackend = options.getStateBackend(); - if (stateBackend != null) { - flinkStreamEnv.setStateBackend(stateBackend); - } - - return flinkStreamEnv; - } - -} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutor.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutor.java new file mode 100644 index 000000000000..9dab014d21da --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutor.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; + +/** + * A {@link FlinkPipelineExecutor} can execute a {@link Pipeline} on Flink. + * + *

There are subclasses for batch and for streaming execution. + */ +interface FlinkPipelineExecutor { + + /** + * Executes the given pipeline. + */ + PipelineResult executePipeline( + FlinkRunner runner, Pipeline pipeline, FlinkPipelineOptions options) throws Exception; +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java index ca12615be03c..7e9cd2e6ce3e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java @@ -25,7 +25,6 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; @@ -38,8 +37,8 @@ import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.View; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.client.program.DetachedEnvironment; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -107,34 +106,21 @@ public PipelineResult run(Pipeline pipeline) { LOG.info("Executing pipeline using FlinkRunner."); - FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options); + boolean streaming = options.isStreaming() || containsUnboundedPCollection(pipeline); - LOG.info("Translating pipeline to Flink program."); - env.translate(this, pipeline); + pipeline.replaceAll(FlinkTransformOverrides.getDefaultOverrides(options.isStreaming())); - JobExecutionResult result; - try { - LOG.info("Starting execution of Flink program."); - result = env.executePipeline(); - } catch (Exception e) { - LOG.error("Pipeline execution failed", e); - throw new RuntimeException("Pipeline execution failed", e); - } - - if (result instanceof DetachedEnvironment.DetachedJobExecutionResult) { - LOG.info("Pipeline submitted in Detached mode"); - return new FlinkDetachedRunnerResult(); + FlinkPipelineExecutor executor; + if (streaming) { + executor = new FlinkStreamingPipelineExecutor(); } else { - LOG.info("Execution finished in {} msecs", result.getNetRuntime()); - Map accumulators = result.getAllAccumulatorResults(); - if (accumulators != null && !accumulators.isEmpty()) { - LOG.info("Final accumulator values:"); - for (Map.Entry entry : result.getAllAccumulatorResults().entrySet()) { - LOG.info("{} : {}", entry.getKey(), entry.getValue()); - } - } + executor = new FlinkBatchPipelineExecutor(); + } - return new FlinkRunnerResult(accumulators, result.getNetRuntime()); + try { + return executor.executePipeline(this, pipeline, options); + } catch (Exception e) { + throw new RuntimeException("Pipeline execution failed.", e); } } @@ -223,4 +209,20 @@ public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { ptransformViewNamesWithNonDeterministicKeyCoders); } } + + private boolean containsUnboundedPCollection(Pipeline p) { + class BoundednessVisitor extends Pipeline.PipelineVisitor.Defaults { + PCollection.IsBounded boundedness = PCollection.IsBounded.BOUNDED; + + @Override + public void visitValue(PValue value, TransformHierarchy.Node producer) { + if (value instanceof PCollection) { + boundedness = boundedness.and(((PCollection) value).isBounded()); + } + } + } + BoundednessVisitor visitor = new BoundednessVisitor(); + p.traverseTopologically(visitor); + return visitor.boundedness == PCollection.IsBounded.UNBOUNDED; + } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResultUtil.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResultUtil.java new file mode 100644 index 000000000000..0615bcea1f68 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResultUtil.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import java.util.Map; +import org.apache.beam.sdk.PipelineResult; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.client.program.DetachedEnvironment; +import org.slf4j.Logger; + +/** + * Static utility methods for wrapping a Flink job result as a + * {@link org.apache.beam.sdk.PipelineResult}. + */ +public class FlinkRunnerResultUtil { + + static PipelineResult wrapFlinkRunnerResult(Logger log, JobExecutionResult jobResult) { + if (jobResult instanceof DetachedEnvironment.DetachedJobExecutionResult) { + log.info("Pipeline submitted in Detached mode"); + return new FlinkDetachedRunnerResult(); + } else { + log.info("Execution finished in {} msecs", jobResult.getNetRuntime()); + Map accumulators = jobResult.getAllAccumulatorResults(); + if (accumulators != null && !accumulators.isEmpty()) { + log.info("Final accumulator values:"); + for (Map.Entry entry : jobResult.getAllAccumulatorResults().entrySet()) { + log.info("{} : {}", entry.getKey(), entry.getValue()); + } + } + + return new FlinkRunnerResult(accumulators, jobResult.getNetRuntime()); + } + } +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineExecutor.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineExecutor.java new file mode 100644 index 000000000000..c031366aee9a --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineExecutor.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import java.util.List; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link FlinkPipelineExecutor} that executes a {@link Pipeline} using a Flink + * {@link StreamExecutionEnvironment}. + */ +class FlinkStreamingPipelineExecutor implements FlinkPipelineExecutor { + + private static final Logger LOG = + LoggerFactory.getLogger(FlinkStreamingPipelineExecutor.class); + + @Override + public PipelineResult executePipeline( + FlinkRunner runner, Pipeline pipeline, FlinkPipelineOptions options) throws Exception { + + StreamExecutionEnvironment env = createStreamExecutionEnvironment(options); + + FlinkStreamingPipelineTranslator translator = + new FlinkStreamingPipelineTranslator(runner, env, options); + translator.translate(pipeline); + + return runPipeline(options, env); + } + + private StreamExecutionEnvironment createStreamExecutionEnvironment( + FlinkPipelineOptions options) { + + String masterUrl = options.getFlinkMaster(); + StreamExecutionEnvironment flinkStreamEnv = null; + + // depending on the master, create the right environment. + if (masterUrl.equals("[local]")) { + flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment(); + } else if (masterUrl.equals("[auto]")) { + flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + } else if (masterUrl.matches(".*:\\d*")) { + String[] parts = masterUrl.split(":"); + List stagingFiles = options.getFilesToStage(); + flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(parts[0], + Integer.parseInt(parts[1]), stagingFiles.toArray(new String[stagingFiles.size()])); + } else { + LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl); + flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + } + + // set the correct parallelism. + if (options.getParallelism() != -1) { + flinkStreamEnv.setParallelism(options.getParallelism()); + } + + // set parallelism in the options (required by some execution code) + options.setParallelism(flinkStreamEnv.getParallelism()); + + if (options.getObjectReuse()) { + flinkStreamEnv.getConfig().enableObjectReuse(); + } else { + flinkStreamEnv.getConfig().disableObjectReuse(); + } + + // default to event time + flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + // for the following 2 parameters, a value of -1 means that Flink will use + // the default values as specified in the configuration. + int numRetries = options.getNumberOfExecutionRetries(); + if (numRetries != -1) { + flinkStreamEnv.setNumberOfExecutionRetries(numRetries); + } + long retryDelay = options.getExecutionRetryDelay(); + if (retryDelay != -1) { + flinkStreamEnv.getConfig().setExecutionRetryDelay(retryDelay); + } + + // A value of -1 corresponds to disabled checkpointing (see CheckpointConfig in Flink). + // If the value is not -1, then the validity checks are applied. + // By default, checkpointing is disabled. + long checkpointInterval = options.getCheckpointingInterval(); + if (checkpointInterval != -1) { + if (checkpointInterval < 1) { + throw new IllegalArgumentException("The checkpoint interval must be positive"); + } + flinkStreamEnv.enableCheckpointing(checkpointInterval, options.getCheckpointingMode()); + flinkStreamEnv.getCheckpointConfig().setCheckpointTimeout( + options.getCheckpointTimeoutMillis()); + boolean externalizedCheckpoint = options.isExternalizedCheckpointsEnabled(); + boolean retainOnCancellation = options.getRetainExternalizedCheckpointsOnCancellation(); + if (externalizedCheckpoint) { + flinkStreamEnv.getCheckpointConfig().enableExternalizedCheckpoints( + retainOnCancellation ? ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION + : ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION); + } + } + + // State backend + final AbstractStateBackend stateBackend = options.getStateBackend(); + if (stateBackend != null) { + flinkStreamEnv.setStateBackend(stateBackend); + } + + return flinkStreamEnv; + } + + /** + * This will use blocking submission so the job-control features of {@link PipelineResult} don't + * work. + */ + private static PipelineResult runPipeline( + FlinkPipelineOptions options, StreamExecutionEnvironment env) throws Exception { + + JobExecutionResult jobResult = env.execute(options.getJobName()); + + return FlinkRunnerResultUtil.wrapFlinkRunnerResult(LOG, jobResult); + } +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java deleted file mode 100644 index 3acc3eafca13..000000000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.runners.TransformHierarchy; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Traverses the Pipeline to determine the {@link TranslationMode} for this pipeline. - */ -class PipelineTranslationOptimizer extends FlinkPipelineTranslator { - - private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslationOptimizer.class); - - private TranslationMode translationMode; - - private final FlinkPipelineOptions options; - - public PipelineTranslationOptimizer(TranslationMode defaultMode, FlinkPipelineOptions options) { - this.translationMode = defaultMode; - this.options = options; - } - - public TranslationMode getTranslationMode() { - - // override user-specified translation mode - if (options.isStreaming()) { - return TranslationMode.STREAMING; - } - - return translationMode; - } - - @Override - public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { - return CompositeBehavior.ENTER_TRANSFORM; - } - - @Override - public void leaveCompositeTransform(TransformHierarchy.Node node) {} - - @Override - public void visitPrimitiveTransform(TransformHierarchy.Node node) { - Class transformClass = node.getTransform().getClass(); - if (transformClass == Read.Unbounded.class) { - LOG.info("Found {}. Switching to streaming execution.", transformClass); - translationMode = TranslationMode.STREAMING; - } - } - - @Override - public void visitValue(PValue value, TransformHierarchy.Node producer) {} -} From 7890d77e4a90ab7773e8fd52d8c4abe31eec8c21 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Tue, 8 Aug 2017 10:12:40 +0200 Subject: [PATCH 2/2] fixup! address comments --- .../org/apache/beam/runners/flink/FlinkPipelineExecutor.java | 1 - .../org/apache/beam/runners/flink/FlinkRunnerResultUtil.java | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutor.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutor.java index 9dab014d21da..593605e41523 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutor.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutor.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.flink; - import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResultUtil.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResultUtil.java index 0615bcea1f68..f46a0ec1be3e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResultUtil.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResultUtil.java @@ -27,7 +27,7 @@ * Static utility methods for wrapping a Flink job result as a * {@link org.apache.beam.sdk.PipelineResult}. */ -public class FlinkRunnerResultUtil { +class FlinkRunnerResultUtil { static PipelineResult wrapFlinkRunnerResult(Logger log, JobExecutionResult jobResult) { if (jobResult instanceof DetachedEnvironment.DetachedJobExecutionResult) { @@ -38,7 +38,7 @@ static PipelineResult wrapFlinkRunnerResult(Logger log, JobExecutionResult jobRe Map accumulators = jobResult.getAllAccumulatorResults(); if (accumulators != null && !accumulators.isEmpty()) { log.info("Final accumulator values:"); - for (Map.Entry entry : jobResult.getAllAccumulatorResults().entrySet()) { + for (Map.Entry entry : accumulators.entrySet()) { log.info("{} : {}", entry.getKey(), entry.getValue()); } }