Skip to content

Commit

Permalink
Merge pull request #9945: [BEAM-8396] Use LOOPBACK in FlinkRunner if …
Browse files Browse the repository at this point in the history
…no flink_master is supplied
  • Loading branch information
mxm committed Oct 31, 2019
2 parents b325bd7 + 978a2ed commit b10a8db
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 0 deletions.
4 changes: 4 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
Expand Up @@ -854,6 +854,10 @@ def _add_argparse_args(cls, parser):
'--environment_cache_millis', default=0,
help=('Duration in milliseconds for environment cache within a job. '
'0 means no caching.'))
parser.add_argument(
'--output_executable_path', default=None,
help=('Create an executable jar at this path rather than running '
'the pipeline.'))


class TestOptions(PipelineOptions):
Expand Down
8 changes: 8 additions & 0 deletions sdks/python/apache_beam/runners/portability/flink_runner.py
Expand Up @@ -34,6 +34,14 @@


class FlinkRunner(portable_runner.PortableRunner):
def run_pipeline(self, pipeline, options):
portable_options = options.view_as(pipeline_options.PortableOptions)
if (options.view_as(FlinkRunnerOptions).flink_master in MAGIC_HOST_NAMES
and not portable_options.environment_type
and not portable_options.output_executable_path):
portable_options.environment_type = 'LOOPBACK'
return super(FlinkRunner, self).run_pipeline(pipeline, options)

def default_job_server(self, options):
flink_master = self.add_http_scheme(
options.view_as(FlinkRunnerOptions).flink_master)
Expand Down

0 comments on commit b10a8db

Please sign in to comment.