[BEAM-5442] Add FlinkOptions to Python SDK #6447
Force-pushed from 7946467 to 747c9c5.
This change would unblock us somewhat (we also need the parallelism option for testing). But rather than limiting it to the two Flink parameters, why not just pass all options that the user specified to the runner? @robertwb can you take a look as well?
As I wrote in the JIRA, there are some pros to doing upfront parsing/validation of the options. I think we can parse/validate a list of known options and still pass on the other ones.
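One way to picture "validate the known options and still pass on the others" is argparse's `parse_known_args`. The sketch below is only an illustration using the standard library, not the SDK's actual code: the helper name `parse_known_flink_options` and the flag `--some_other_flag` are hypothetical, and treating `--parallelism` as an integer is an assumption.

```python
import argparse


def parse_known_flink_options(argv):
    """Validate the known Flink flags and return the rest untouched.

    Hypothetical helper for illustration; not part of the Beam SDK.
    """
    parser = argparse.ArgumentParser(add_help=False)
    # Flag names come from this PR's diff; the int type for --parallelism
    # is an assumption made for this sketch.
    parser.add_argument('--flink_master', type=str)
    parser.add_argument('--parallelism', type=int)
    # parse_known_args checks the flags it recognizes and collects
    # everything else in a separate list instead of failing.
    known, remaining = parser.parse_known_args(argv)
    return vars(known), remaining


options, remaining = parse_known_flink_options(
    ['--flink_master=testmaster:8081',
     '--parallelism=42',
     '--some_other_flag=x'])  # hypothetical unknown flag
print(options)    # validated, typed values
print(remaining)  # unrecognized flags, available to forward to the runner
```

With this shape, a malformed value such as `--parallelism=abc` is rejected up front by argparse, while flags the SDK does not know about survive in `remaining` rather than being dropped.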
Force-pushed from 747c9c5 to c45aac4.
Python PostCommit is broken in master at the moment.
@@ -662,6 +662,23 @@ def _add_argparse_args(cls, parser):
'in the pipeline when running using the Fn API.'))


class FlinkOptions(PipelineOptions):
I wonder if this should go somewhere under runners.
Was wondering about this too but I saw that we store all the PipelineOptions here, e.g. GCE, DirectRunner.
Yeah, I guess it doesn't make it any worse. (The way we do things here has always felt slightly unpythonic to me, but cleaning this up is low on the priority list...)
I'm open to cleaning it up. We could move this into a FlinkOptions file in the runners/flink package, which doesn't exist yet.
@@ -104,6 +104,13 @@ def tearDown(self):
'display_data': [
DisplayDataItemMatcher('mock_multi_option', ['op1', 'op2'])]
},
{'flags': ['--flink_master=testmaster:8081', '--parallelism=42'],
It feels odd to put flink-specific tests here.
As far as I can see, other runner options (e.g. Dataflow) are also tested here, so it seemed fine to me.
'should be executed. Can either be of the form '
'\'host:port\' or one of the special values '
'[local], [collection], or [auto].'))
parser.add_argument('--parallelism',
Should this be --flink_parallelism?
Perhaps, flink-master should just be master? Or does this potentially clash with the global namespace?
Flags are inherently a global namespace.
The problem here is that we want to stay consistent with FlinkPipelineOptions, so I'd keep it as it is to not introduce a source of confusion.
Ahh... yes, that's a valid reason.
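The "global namespace" point in this thread can be made concrete with plain argparse. This is a minimal sketch under the assumption that every options class registers its flags on one shared parser; the classes `FlinkFlags` and `OtherRunnerFlags` and the helper `register_all` are hypothetical stand-ins, not SDK classes.

```python
import argparse


class FlinkFlags:
    """Hypothetical stand-in for a runner-specific options class."""

    @classmethod
    def add_args(cls, parser):
        parser.add_argument('--parallelism', type=int)


class OtherRunnerFlags:
    """Hypothetical second options class that picks the same flag name."""

    @classmethod
    def add_args(cls, parser):
        parser.add_argument('--parallelism', type=int)


def register_all(option_classes):
    """Register every class on one shared parser.

    Returns the conflict message if two classes claim the same flag,
    or None if registration succeeds.
    """
    parser = argparse.ArgumentParser(add_help=False)
    for option_class in option_classes:
        try:
            option_class.add_args(parser)
        except argparse.ArgumentError as err:
            # argparse refuses a second registration of the same flag.
            return str(err)
    return None


print(register_all([FlinkFlags]))                    # no conflict
print(register_all([FlinkFlags, OtherRunnerFlags]))  # conflict message
```

An unprefixed name like `--parallelism` is therefore only safe while no other options class wants it, which is the trade-off being weighed against staying consistent with FlinkPipelineOptions.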
LGTM
@mxm I did see your comment on the JIRA and think that's fine. I just don't see the change here that passes through unknown options. My concern is that we would have to open a new pull request for every other option we may need, when we could handle that generically (with a warning perhaps).
@tweise The issue is not resolved with this PR. My intention was to do this change first to get rid of any blockers for trying out the PortableRunner with Flink. Additionally, we need to make a change to pass on the "unknown" options to the runner.
parser.add_argument('--flink_master',
type=str,
help=
('Addres of the Flink master where the Pipeline '
Addres => Address
Pipeline => pipeline
Force-pushed from 837ae7f to ecc6ab8.
This adds two common FlinkOptions to the Python SDK which are otherwise filtered out when using the PortableRunner. It makes sense to validate known options, and we should eventually add all options in FlinkPipelineOptions. In a follow-up, we should ensure that unknown parameters are also forwarded to the runner, which can then decide what to do with them.
Force-pushed from ecc6ab8 to 50528d0.
Merging after I ran
This adds FlinkOptions to the Python SDK so that Flink-related options, which are otherwise filtered out when using the PortableRunner, can be passed through.
CC @tweise