diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 22d15865c95e6..3bba614df988c 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -854,6 +854,10 @@ def _add_argparse_args(cls, parser): '--environment_cache_millis', default=0, help=('Duration in milliseconds for environment cache within a job. ' '0 means no caching.')) + parser.add_argument( + '--output_executable_path', default=None, + help=('Create an executable jar at this path rather than running ' + 'the pipeline.')) class TestOptions(PipelineOptions): diff --git a/sdks/python/apache_beam/runners/portability/flink_runner.py b/sdks/python/apache_beam/runners/portability/flink_runner.py index 9426447e874a5..b8765bdb9f4da 100644 --- a/sdks/python/apache_beam/runners/portability/flink_runner.py +++ b/sdks/python/apache_beam/runners/portability/flink_runner.py @@ -34,6 +34,14 @@ class FlinkRunner(portable_runner.PortableRunner): + def run_pipeline(self, pipeline, options): + portable_options = options.view_as(pipeline_options.PortableOptions) + if (options.view_as(FlinkRunnerOptions).flink_master in MAGIC_HOST_NAMES + and not portable_options.environment_type + and not portable_options.output_executable_path): + portable_options.environment_type = 'LOOPBACK' + return super(FlinkRunner, self).run_pipeline(pipeline, options) + def default_job_server(self, options): flink_master = self.add_http_scheme( options.view_as(FlinkRunnerOptions).flink_master)