@StefanRRichter
Contributor

This PR limits the maximum parallelism of non-parallel operators to 1.

Furthermore, it improves the default behaviour when the user does not explicitly specify a maximum parallelism. In particular, the maximum parallelism can now be derived from savepoints, so users who migrate from Flink 1.1 to Flink 1.2 can keep their jobs unchanged.

@StefanRRichter
Contributor Author

cc @uce

Contributor

@tillrohrmann left a comment

Really good changes @StefanRRichter.

I was wondering whether we could simplify the max parallelism logic in ExecutionJobVertex a little bit. For example, we could have only the field maxParallelism and boolean maxParallelismConfigured which we initialize in the constructor. If the passed parameter equals VALUE_NOT_SET, then we use KeyGroupRangeAssignment.computeDefaultMaxParallelism and set maxParallelismConfigured to false. If not, then we set it to true. Now we only allow changes to maxParallelism if maxParallelismConfigured == false. I think this could simplify the logic a little bit (especially in getMaxParallelism). What do you think?

Apart from that +1 for merging.
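
For illustration, the simplification suggested above could look roughly like the following. This is only a sketch under assumptions, not the code in the PR: the class name MaxParallelismHolder, the value of VALUE_NOT_SET, and the local computeDefaultMaxParallelism (a simplified stand-in for KeyGroupRangeAssignment.computeDefaultMaxParallelism) are all hypothetical.

```java
/** Hypothetical stand-in for the max-parallelism handling in ExecutionJobVertex. */
final class MaxParallelismHolder {

    /** Assumed sentinel meaning "the user did not configure a max parallelism". */
    static final int VALUE_NOT_SET = -1;

    private int maxParallelism;
    private final boolean maxParallelismConfigured;

    MaxParallelismHolder(int parallelism, int configuredMaxParallelism) {
        if (configuredMaxParallelism == VALUE_NOT_SET) {
            // Nothing configured: fall back to a default and remember that fact.
            this.maxParallelism = computeDefaultMaxParallelism(parallelism);
            this.maxParallelismConfigured = false;
        } else {
            this.maxParallelism = configuredMaxParallelism;
            this.maxParallelismConfigured = true;
        }
    }

    int getMaxParallelism() {
        return maxParallelism;
    }

    boolean isMaxParallelismConfigured() {
        return maxParallelismConfigured;
    }

    /** Changes are only allowed while the value was not explicitly configured. */
    void setMaxParallelism(int newMaxParallelism) {
        if (maxParallelismConfigured) {
            throw new IllegalStateException(
                "Max parallelism was explicitly configured and cannot be changed.");
        }
        this.maxParallelism = newMaxParallelism;
    }

    /**
     * Simplified stand-in for the real default computation (an assumption):
     * round up to the next power of two, with a lower bound of 128.
     */
    static int computeDefaultMaxParallelism(int parallelism) {
        int rounded = Integer.highestOneBit(Math.max(1, parallelism));
        if (rounded < parallelism) {
            rounded <<= 1;
        }
        return Math.max(128, rounded);
    }
}
```

With this shape, getMaxParallelism becomes a plain getter, and a value derived from a savepoint can only replace the default, never a user-configured value.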

this.tasks = tasks;
this.latest = latest;
this.taskStates = taskStates;
this.allowNonRestoredState = allowNonRestoredState;
Contributor

Precondition checks could be helpful here.
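
As a rough sketch of what such checks might look like, assuming the constructor arguments are reference types that must not be null: the class name StateAssignmentSketch and the placeholder field types below are hypothetical, and java.util.Objects.requireNonNull is used here in place of Flink's own Preconditions utility to keep the example self-contained.

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in; the field types are placeholders, not the real ones. */
final class StateAssignmentSketch {

    private final Map<String, Object> tasks;
    private final Object latest;
    private final Map<String, Object> taskStates;
    private final boolean allowNonRestoredState;

    StateAssignmentSketch(
            Map<String, Object> tasks,
            Object latest,
            Map<String, Object> taskStates,
            boolean allowNonRestoredState) {

        // Fail fast with a descriptive message instead of a late NullPointerException.
        this.tasks = Objects.requireNonNull(tasks, "tasks must not be null");
        this.latest = Objects.requireNonNull(latest, "latest must not be null");
        this.taskStates = Objects.requireNonNull(taskStates, "taskStates must not be null");
        this.allowNonRestoredState = allowNonRestoredState;
    }

    boolean isAllowNonRestoredState() {
        return allowNonRestoredState;
    }
}
```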

this.allowNonRestoredState = allowNonRestoredState;
}

public boolean assignStates() throws Exception {
Contributor

This method seems a bit lengthy. Maybe we could split it up.

JobVertex jobVertex,
int defaultParallelism,
Time timeout) throws JobException, IOException {
Time timeout) throws JobException {
Contributor

Method declaration parameters which are broken into multiple lines are usually indented twice.

Contributor Author

You are right, but I kept the indentation to avoid formatting changes.


Preconditions.checkArgument(maxParallelism > 0
&& maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
"Overriding max parallelism is not in valid bounds: " + maxParallelism);
Contributor

Maybe we could add the valid bounds here.
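
One possible way to include the bounds in the message is sketched below. The class name MaxParallelismBounds and the method checkMaxParallelism are hypothetical, and the constant value 1 << 15 is an assumption made for this example; the real constant lives in KeyGroupRangeAssignment.

```java
/** Hypothetical helper showing an error message that states the valid range. */
final class MaxParallelismBounds {

    /** Assumed value for illustration only. */
    static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15;

    /** Returns the value unchanged if it lies in (0, UPPER_BOUND_MAX_PARALLELISM]. */
    static int checkMaxParallelism(int maxParallelism) {
        if (maxParallelism <= 0 || maxParallelism > UPPER_BOUND_MAX_PARALLELISM) {
            throw new IllegalArgumentException(
                "Overriding max parallelism is not in valid bounds (1.."
                    + UPPER_BOUND_MAX_PARALLELISM + "), found: " + maxParallelism);
        }
        return maxParallelism;
    }
}
```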


List<List<ExecutionEdge>> consumers = partition.getConsumers();

if(consumers.isEmpty()) {
Contributor

Whitespace is missing between if and (.

StreamEdge outEdge = outEdgesInOrder.get(i);



Contributor

One line break would probably be enough here.

/**
* Returns the effective max parallelism. This value is determined in the following order of priority:
* <p>
* (maxParallelismConfigured) overrides (maxParallelismOverride) override (max(128, roundUp(parallelism)) / default)
Contributor

maxParallelismOverride => maxParallelismDerived?
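
To make the priority order in that Javadoc concrete, here is a minimal sketch, assuming that an unset value is represented by a sentinel. The class EffectiveMaxParallelism, the method resolve, and the sentinel VALUE_NOT_SET are hypothetical names chosen for this example; the default is passed in rather than computed.

```java
/** Hypothetical illustration of the priority order: configured, then derived, then default. */
final class EffectiveMaxParallelism {

    /** Assumed sentinel meaning "no value available". */
    static final int VALUE_NOT_SET = -1;

    static int resolve(int configured, int derived, int defaultValue) {
        if (configured != VALUE_NOT_SET) {
            return configured; // an explicit user setting always wins
        }
        if (derived != VALUE_NOT_SET) {
            return derived; // e.g. a value derived from a savepoint
        }
        return defaultValue; // computed default as the last resort
    }
}
```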

@StefanRRichter
Contributor Author

Thanks for the review, @tillrohrmann! I followed all of your suggestions, except for the indentation formatting.

@StefanRRichter
Contributor Author

Rebased.

@tillrohrmann
Contributor

Changes look good. Travis passed. Merging this PR. Thanks for your work @StefanRRichter :-)

tillrohrmann pushed a commit to tillrohrmann/flink that referenced this pull request Jan 24, 2017
[FLINK-5473] Better default behaviours for unspecified maximum parallelism

This closes apache#3182.
@rmetzger
Contributor

I've merged the PR to the release-1.2 branch.

@asfgit asfgit closed this in acfeeaf Jan 24, 2017
joseprupi pushed a commit to joseprupi/flink that referenced this pull request Feb 12, 2017
[FLINK-5473] Better default behaviours for unspecified maximum parallelism

This closes apache#3182.