Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-3020][streaming] set number of task slots to maxmium parallelism in local execution #1360

Closed
wants to merge 3 commits into from

Conversation

mxm
Copy link
Contributor

@mxm mxm commented Nov 16, 2015

No description provided.

@mxm mxm changed the title [FLINK-3020][streaming] set local default parallelism to maxmium parallelism [FLINK-3020][streaming] set number of task slots to maxmium parallelism in local execution Nov 16, 2015
@mxm mxm force-pushed the FLINK-3020 branch 3 times, most recently from 7e7462f to 1cc92ba Compare November 17, 2015 14:21
@tillrohrmann
Copy link
Contributor

I think the maximum parallelism of the job should be taken if the user has not specified a different parallelism than the default one (-1). I think that's also how the batch part does it (here the LocalExecutor has a field taskManagerNumSlots which can be set). Besides that, looks good to merge.

@mxm
Copy link
Contributor Author

mxm commented Nov 19, 2015

True. That's how it is handled on the batch side. Not sure about this behavior though. If a user sets a default parallelism but uses operators with parallelism > defaultParallelism this would fail, right? The rational behind this is probably to maximize the parallelism for all operators and not have operators with exceptional high parallelism.

@tillrohrmann
Copy link
Contributor

Yes, in such a case it would fail. Thinking about it, you're right that it would give a better user experience if the maximum degree of the job is taken instead of the default parallelism. Maybe we should change it then for the batch part as well.

@mxm
Copy link
Contributor Author

mxm commented Nov 19, 2015

I would also be in favor of changing the local execution to always use the maximum specified parallelism as the number of task slots. IMHO the current behavior is not intuitive. The default parallelism currently acts as a maximum parallelism in local execution.

@StephanEwen
Copy link
Contributor

+1 for taking the max parallelism of all operators

configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getParallelism());

int parallelism = getParallelism() == defaultLocalParallelism ?
defaultLocalParallelism : jobGraph.getMaximumParallelism();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This currently means that in the default case, the slots do not respect the max parallel operator, correct?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defaultLocalParallelism seems to be the number of available processors. I think this is very unintuitive that the default parallelism is only taken if it equals the number of processors.

This means in case of an operator with a higher dop than the #cores, the program will only fail if the user sets the default parallelism to #cores.

@mxm
Copy link
Contributor Author

mxm commented Nov 19, 2015

Alright. I will push the original pull request version again which uses the max parallelism of all operators. Further, I will open a separate JIRA for the batch side change.

@mxm
Copy link
Contributor Author

mxm commented Nov 19, 2015

Just checked. The batch side always uses the maximum parallelism as the number of task slots (if they are not set explicitly). Till and me actually thought differently. So the proposed changes in this PR align with the batch behavior.

@tillrohrmann
Copy link
Contributor

But this is only true if the taskManagerNumSlots in LocalExecutor are left untouched. Currently, this is the case but this is not enforced.

@mxm
Copy link
Contributor Author

mxm commented Nov 19, 2015

Exactly. But that makes sense, right? If the users explicitly sets the number of task slots, we shouldn't change the number of task slots automatically.

@tillrohrmann
Copy link
Contributor

Yes I think so. I guess it's due to the difference that batch programs are executed using a PlanExecutor whereas streaming programs are directly executed by the StreamExecutionEnvironment. Consequently, there is no direct way of specifying the number of task slots for the used LocalFlinkMiniCluster other than via the parallelism of the job or the default parallelism.

@mxm
Copy link
Contributor Author

mxm commented Nov 20, 2015

I think we agree that we want to set the number of task slots to the maximum parallelism instead of the default one. I'll merge this later on.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants