New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-5520] Flink pipeline option to run SDK harness per subtask. #6524
[BEAM-5520] Flink pipeline option to run SDK harness per subtask. #6524
Conversation
R: @angoenka |
Added server option to control default behavior. When the pipeline option value is
|
@mxm this will also take care of https://issues.apache.org/jira/browse/BEAM-5516 |
a1000c3
to
9147e0e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. This will be useful for experimenting with portable pipelines.
} | ||
|
||
@Option(name = "--sdk-worker-parallelism", usage = "Parallelism of SDK worker processes") | ||
private String sdkWorkerParallelism = "[pipeline]"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please document what the setting configures and what the default value does.
@@ -31,6 +31,8 @@ | |||
public interface FlinkPipelineOptions | |||
extends PipelineOptions, ApplicationNameOptions, StreamingOptions { | |||
|
|||
String AUTO = "[auto]"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also add the other options here? Or make this an enum?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did not check if/how enum is supported, but this constant is now only used for the master URL. I think it might be good to get rid of the [auto]
value for that option as well and just use null
instead. As separate change though.
+ "'[pipeline]' (single SDK harness process per pipeline and task manager JVM) or " | ||
+ "'[stage]' (separate SDK harness for every executable stage.)") | ||
@Default.String(AUTO) | ||
String getSdkWorkerParallelism(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this change is great but I wonder whether we should expose this as a general PortablePipelineOptions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, done!
static Factory factory() { | ||
return FlinkDefaultExecutableStageContext.ReferenceCountingFactory.REFERENCE_COUNTING; | ||
static Factory factory(FlinkPipelineOptions options) { | ||
if ("[stage]".equals(options.getSdkWorkerParallelism())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps define this constant in the Options?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -141,8 +144,17 @@ public void mapPartition( | |||
|
|||
@Override | |||
public void close() throws Exception { | |||
try (AutoCloseable bundleFactoryCloser = stageBundleFactory) {} | |||
try (AutoCloseable closable = stageContext) {} | |||
// close may be called multiple times when an exception is thrown |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we synchronize close then? The calls could be interleaved.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not called concurrently, it is just being called again when the first call throws an exception.
@@ -171,7 +171,9 @@ public StageBundleFactory forStage(ExecutableStage executableStage) { | |||
|
|||
@Override | |||
public void close() throws Exception { | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whitespace.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
0b54f21
to
3a5d772
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
3a5d772
to
1fdc2f9
Compare
Flink pipeline option to run SDK harness per subtask.
Follow this checklist to help us incorporate your contribution quickly and easily:
[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.It will help us expedite review of your Pull Request if you tag someone (e.g.
@username
) to look at it.Post-Commit Tests Status (on master branch)