-
Notifications
You must be signed in to change notification settings - Fork 13.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-6188] Correctly handle PARALLELISM_DEFAULT in stream operator #3616
Conversation
This is not completely done yet. It seems I still have to change when max parallelism is instantiated for this to work. |
dc2ced9
to
c950f0c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @aljoscha ! This fixes an important bug. The changes look good, as far as I can tell.
I had some suggestions about the tests mainly and some comments on the validation code in the StreamTransformation
.
@@ -50,6 +52,83 @@ | |||
@SuppressWarnings("serial") | |||
public class StreamingJobGraphGeneratorTest extends TestLogger { | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest to add some tests (the same code as the existing ones) with invalid configurations and the expected exception.
@@ -202,7 +203,17 @@ public int getParallelism() { | |||
* @param parallelism The new parallelism to set on this {@code StreamTransformation} | |||
*/ | |||
public void setParallelism(int parallelism) { | |||
Preconditions.checkArgument(parallelism > 0, "Parallelism must be bigger than zero."); | |||
checkArgument(parallelism != ExecutionConfig.PARALLELISM_UNKNOWN, "Cannot specify UNKNOWN_PARALLELISM."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PARALLELISM_UNKNOWN
does not seem to be used anywhere in the codebase, apart from checking against it. Couldn't we remove it? Or am I missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PARALLELISM_UNKNOWN
was removed in FLINK-3980. Not sure why it was added back unless this was unintentional.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@greghogan I see. so I suppose we could remove it. This will simplify the checks here a bit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that in fact it was added here by mistake: ec975aa
checkArgument( | ||
maxParallelism >= parallelism, | ||
"The maximum parallelism must be larger than the parallelism. (parallelism = " + | ||
parallelism + " max-parallelism = " + maxParallelism); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing closing ")" at the end of the error message.
checkArgument( | ||
parallelism != ExecutionConfig.PARALLELISM_DEFAULT, | ||
"A maximum parallelism can only be specified with an explicitly specified " + | ||
"parallelism."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This means that we always have to specify a parallelism
before being able to specify a maxParallelism
. This seems a bit counter-intuitive to me, as the only constraint seems to be that parallelism <= maxParalleliem
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it is but the problem is that the treatment of the "default parallelism" is a bit strange. (The default parallelism is the parallelism that is set from the flink config or by the user on the command line using the -p
parameter. Maybe we have to rework that before we can fix this.
I'm starting to think that we should maybe revert the parallelism/max-parallelism changes on the release-1.2 branch and rework the whole thing properly.
SingleOutputStreamOperator.setParallelism() and StreamTransform.setParallelism() did not correctly handle the case of setting the parallelism to PARALLELISM_DEFAULT.
The tests verify that we instantiate the correct operator and that they correctly pick up the parallelism form the upstream operator.
This was inadvertently re-added in FLINK-4380.
c950f0c
to
bf4e268
Compare
@greghogan @kl0u I addressed your comments, PTAL. |
No description provided.