[BEAM-8396] Restore Flink runner LOOPBACK default. #9945

Merged
merged 1 commit into from Oct 31, 2019
4 changes: 4 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
@@ -854,6 +854,10 @@ def _add_argparse_args(cls, parser):
        '--environment_cache_millis', default=0,
        help=('Duration in milliseconds for environment cache within a job. '
              '0 means no caching.'))
    parser.add_argument(
        '--output_executable_path', default=None,
        help=('Create an executable jar at this path rather than running '
              'the pipeline.'))
mxm marked this conversation as resolved.


class TestOptions(PipelineOptions):
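
As a rough illustration of how the new flag behaves, here is a minimal standalone sketch that uses plain `argparse` instead of Beam's `PipelineOptions` machinery. The `build_parser` helper is hypothetical and exists only for this example; the two flag names, defaults, and help strings are copied from the hunk above.

```python
import argparse


def build_parser():
  """Hypothetical helper: registers the two flags shown in the hunk above."""
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--environment_cache_millis', default=0,
      help=('Duration in milliseconds for environment cache within a job. '
            '0 means no caching.'))
  parser.add_argument(
      '--output_executable_path', default=None,
      help=('Create an executable jar at this path rather than running '
            'the pipeline.'))
  return parser


# When the flag is absent the value stays None, which is falsy. The
# FlinkRunner change in this PR relies on that via
# `not portable_options.output_executable_path`.
unset = build_parser().parse_args([])
given = build_parser().parse_args(['--output_executable_path', '/tmp/job.jar'])
print(unset.output_executable_path, given.output_executable_path)
```

Because the default is `None`, a pipeline that never passes `--output_executable_path` is unaffected by the new option.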
8 changes: 8 additions & 0 deletions sdks/python/apache_beam/runners/portability/flink_runner.py
@@ -34,6 +34,14 @@


class FlinkRunner(portable_runner.PortableRunner):
  def run_pipeline(self, pipeline, options):
    portable_options = options.view_as(pipeline_options.PortableOptions)
    if (options.view_as(FlinkRunnerOptions).flink_master in MAGIC_HOST_NAMES
        and not portable_options.environment_type
        and not portable_options.output_executable_path):
Contributor

@robertwb What about the case where we have a pre-configured Flink job server with a custom master URL? This will then force loopback execution, although the user probably does not want that.

Contributor

Seems fair to assume that a pre-configured job server (with a flink_master address) is not possible when using the FlinkRunner.

Contributor Author

Correct. The way to use a pre-configured job server (Flink or other) is to use PortableRunner. This FlinkRunner class is all about automatically doing the setup for you, if needed.

(And, yeah, the old version had a bug...)
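
For reference, the condition under discussion can be sketched as a standalone function. This is only an illustration, not the runner's code: `resolve_environment_type` is a hypothetical helper, and the contents of `MAGIC_HOST_NAMES` are an assumption, since the diff does not show how that constant is defined.

```python
# Assumed values; the diff references MAGIC_HOST_NAMES but does not define it.
MAGIC_HOST_NAMES = ['[local]', '[auto]']


def resolve_environment_type(
    flink_master, environment_type=None, output_executable_path=None):
  """Hypothetical helper mirroring the `if` in FlinkRunner.run_pipeline.

  LOOPBACK is chosen only when the Flink master is one of the magic host
  names, the user has not picked an environment type, and no executable
  jar was requested.
  """
  if (flink_master in MAGIC_HOST_NAMES
      and not environment_type
      and not output_executable_path):
    return 'LOOPBACK'
  return environment_type


# A few cases, including the custom master address raised in the review.
print(resolve_environment_type('[auto]'))
print(resolve_environment_type('[auto]', environment_type='DOCKER'))
print(resolve_environment_type('[local]', output_executable_path='/tmp/job.jar'))
print(resolve_environment_type('flink-host:8081'))
```

Under these assumptions, an explicit `environment_type`, a requested jar, or a non-magic master address each leave the user's setting untouched.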

Contributor

Thanks. Now we just need to backport this: #9952

Contributor

Shouldn't we conclude the discussion on the mailing list before merging changes?

Contributor

AFAIK this change was proposed by you and Robert. So it seemed fair to go ahead with it.

      portable_options.environment_type = 'LOOPBACK'
    return super(FlinkRunner, self).run_pipeline(pipeline, options)

  def default_job_server(self, options):
    flink_master = self.add_http_scheme(
        options.view_as(FlinkRunnerOptions).flink_master)