[FLINK-5791] [runtime] [FLIP-6] Slots should be strictly matched according to resource profile when allocating for yarn mode #3304
Hi @shuai-xu, thanks for your contribution :-) I'm currently working on a new implementation of the slot manager, so we might have to rebase this PR afterwards. But the changes are not too big. I'll try to review your PR next week.
@tillrohrmann OK, thank you.
tillrohrmann
left a comment
Thanks for your contribution @shuai-xu. The PR goes in the right direction, but I think it needs some more work before we can merge it.
I think the code does not correctly instantiate the slots with the right resource profiles on the TaskExecutor. We should make sure that it still works even if no ResourceProfile has been specified for the slots. Furthermore, I think the YarnResourceManager has some deficiencies in how it maps containers to resource profiles. This should be fixed. And we should add tests which make sure that the code actually does what it is supposed to do.
private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host)
private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource,
    String containerId, String host, Priority priority)
When breaking a parameter list, every parameter should be on a separate line.
for (int i = 0; i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
for (int i = resourceProfiles.size(); i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
    resourceProfiles.add(new ResourceProfile(1.0, 42L));
Why do you still add the default resource profiles if you already got them from the taskManagerServicesConfiguration?
I think we should make sure that Flink still works even if no resource profile is specified, maybe by introducing a resource profile which matches all other resource profiles, or by trying to derive it somehow.
Currently only the Yarn resource manager adds resource profiles to the config; for standalone mode there are none. So resourceProfiles.size() will be zero there, and we still need to add a ResourceProfile here. I think adding one that can match all other resource profiles is a good idea; I will introduce it.
private void addResourceProfile(ResourceProfile resourceProfile) {
    this.resourceProfiles.add(resourceProfile);
}
imo the configuration object should be immutable. That way one cannot introduce side effects after the object has been created.
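As a minimal sketch of what an immutable configuration object could look like: the class and field names below (`SlotProfile`, `ServicesConfiguration`) are hypothetical stand-ins, not Flink's actual classes. The list of profiles is passed to the constructor, copied defensively, and exposed only as a read-only view, so no `addResourceProfile`-style mutator is needed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a resource profile; the fields are assumptions.
final class SlotProfile {
    final double cpuCores;
    final long memoryInMB;

    SlotProfile(double cpuCores, long memoryInMB) {
        this.cpuCores = cpuCores;
        this.memoryInMB = memoryInMB;
    }
}

// Hypothetical, simplified configuration class: all state is fixed at construction time.
final class ServicesConfiguration {
    private final List<SlotProfile> resourceProfiles;

    ServicesConfiguration(List<SlotProfile> resourceProfiles) {
        // Defensive copy into an ArrayList, wrapped read-only, so later changes
        // to the caller's list cannot leak into this object.
        this.resourceProfiles =
            Collections.unmodifiableList(new ArrayList<>(resourceProfiles));
    }

    // Returns a read-only view; attempts to modify it throw UnsupportedOperationException.
    List<SlotProfile> getResourceProfiles() {
        return resourceProfiles;
    }
}
```

With this shape, any code that wants a different set of profiles has to build a new configuration object, which keeps side effects out of already constructed instances.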
            return entry.getKey();
        }
    }
}
I think that is not the right way to query a map. If you have to traverse the whole map to find a value, then there is something wrong with the way you arrange your data.
Could you explain to me what resourcePriorities actually does? To me it looks as if you use it to assign some kind of ID to a Yarn container, which you then use to find the mapping between the allocated container and the resource profile. That does not seem right at all. Priorities should tell the system how important it is to allocate a container.
This is a confusing part of Yarn. Here the priority is only used to distinguish requests for different resources; it has nothing to do with importance. For slot requests with different resources, we generate different priorities and put each priority and its resource profile into the resourcePriorities map. When the Yarn containers come back, we know which resource profile each task manager should be started with by querying the map with the container's priority. And when the task manager offers a slot with that resource profile to the job master, we can match the slot exactly with the original request.
Querying the map this way does seem a little tricky, but the map is very small. For most Flink jobs it will only have a few elements, since its size depends only on the number of operators with different resources.
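If a priority-to-profile mapping is kept at all, the full traversal could be avoided by maintaining one map per lookup direction. The following is only a sketch under that assumption; `PriorityRegistry` and its methods are hypothetical names, and the profile type `P` is assumed to implement `equals` and `hashCode` consistently.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: assigns one integer priority per distinct profile and
// supports direct lookup in both directions without scanning the entries.
final class PriorityRegistry<P> {
    private final Map<P, Integer> priorityByProfile = new HashMap<>();
    private final Map<Integer, P> profileByPriority = new HashMap<>();
    private int nextPriority = 0;

    // Returns the priority already assigned to this profile, or assigns a new one.
    int priorityFor(P profile) {
        Integer existing = priorityByProfile.get(profile);
        if (existing != null) {
            return existing;
        }
        int priority = nextPriority++;
        priorityByProfile.put(profile, priority);
        profileByPriority.put(priority, profile);
        return priority;
    }

    // Returns the profile registered for the priority, or null if there is none.
    P profileFor(int priority) {
        return profileByPriority.get(priority);
    }
}
```

Both lookups are then single hash-map accesses, regardless of how many distinct profiles a job uses.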
I think this should be changed. Why not allow the YarnResourceManager to also accept containers whose resources are larger than stated in the request? It should be OK for a slot request to get a slot assigned whose resources exceed those of the request.
        }
    }
}
throw new Exception("There is no related resource profile for the priority : " + priority);
Throwing an exception here is a little bit heavy. Better to return null instead.
/** Context information used to start a TaskExecutor Java process */
ContainerLaunchContext taskExecutorLaunchContext =
    createTaskExecutorLaunchContext(container.getResource(), container.getId().toString(), container.getNodeId().getHost());
    createTaskExecutorLaunchContext(container.getResource(), container.getId().toString(), container.getNodeId().getHost(), container.getPriority());
The resource profile for the allocated container should be retrievable from the container itself. This is more robust because we don't know whether Yarn could actually fulfil our request.
Yes, this is a problem. But if we use the resource profile retrieved from the container, it may differ from the original request, and then the two cannot be matched. Yarn only returns a container bigger than requested when the requested resource is too small, to avoid resource fragmentation; otherwise it returns what we requested.
Yes, but this should not be a problem. I think the resource manager should be able to deal with this kind of scenario. For example, if the allocated container offers more resources than requested, it could in the future also be used to fulfill other slot requests which require more resources.
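The idea of accepting containers that offer more than was requested can be sketched as a "covers at least" check instead of an exact match. The names below (`ContainerProfile`, `SlotAssigner`, `findSlotFor`) are hypothetical and chosen for illustration; the real resource manager logic is more involved.

```java
import java.util.List;

// Hypothetical stand-in for a resource profile; the fields are assumptions.
final class ContainerProfile {
    final double cpuCores;
    final long memoryInMB;

    ContainerProfile(double cpuCores, long memoryInMB) {
        this.cpuCores = cpuCores;
        this.memoryInMB = memoryInMB;
    }

    // True if this profile offers at least the resources that 'required' asks for.
    boolean isMatching(ContainerProfile required) {
        return cpuCores >= required.cpuCores && memoryInMB >= required.memoryInMB;
    }
}

final class SlotAssigner {
    // Returns the first offered profile that covers the request, or null if none does.
    static ContainerProfile findSlotFor(ContainerProfile request, List<ContainerProfile> offered) {
        for (ContainerProfile candidate : offered) {
            if (candidate.isMatching(request)) {
                return candidate;
            }
        }
        return null;
    }
}
```

Under this scheme a container that Yarn rounded up is still usable for the original request, and it can also serve a later request that needs more resources.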
    "service shutdown timeout must be greater or equal to 0.");
this.timerServiceShutdownTimeout = timerServiceShutdownTimeout;
this.resourceProfiles = new LinkedList<>();
LinkedLists are slow. Please use an ArrayList. Even better to pass the value for resourceProfiles to the constructor.
/**
 * The config parameter defining the task manager resource profile.
 */
I'm not sure whether we should expose this configuration parameter. The user should never have to do something with it.
Yes, it is an internal config. I thought Flink put all config constants here and used @PublicEvolving to identify the ones that can be used by users. Do you have any suggestion for where to put the internal configs?
ConfigConstants is annotated as Public. Thus, all elements which don't have an annotation will inherit that annotation.
Please put it into TaskManagerServicesConfiguration.
rpOutput.writeObject(getResourceProfile(priority.getPriority()));
rpOutput.close();
taskManagerConfig.setString(ConfigConstants.TASK_MANAGER_RESOURCE_PROFILE_KEY,
    new String(Base64.encodeBase64(output.toByteArray())));
I think we shouldn't base64 encode the resource profile into the taskManagerConfig. Instead use InstantiationUtil.writeObjectToConfig to write serialized data to the configuration.
But I'm a little bit torn here because so far we have transferred this kind of information via environment variables. Maybe @rmetzger can chime in on what the most idiomatic way to transfer TM data would be.
I have to admit that I don't know the answer here. The documentation doesn't mention anything regarding preferences over the different options: https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/WritingYarnApplications.html
I think we should use:
- flink-conf.yaml for setting configuration parameters in Flink, but not for "internal data". Some users read the flink-conf.yaml files on the container for debugging purposes.
- Environment variables for small bits such as the application id (this is beneficial for getting the application name in the log4j2 logging configuration when using centralized logging).
- A separate file for larger serialized objects.
Ideally we don't need the third option.
Thanks @rmetzger for the explanation. I think maybe we can use flink-conf.yaml temporarily and add a separate config file later.
Hi @tillrohrmann, thank you for your careful review. I modified this PR according to your comments, adding a universal resource profile if no resource profiles are passed to the task executor. And for the resource manager, the priority is now only used to distinguish containers with different resources; this is also how the priority is used in Yarn.
The failed test case passes in my local working copy; I think the failure may be due to the Travis environment.
tillrohrmann
left a comment
The code is still basically untested. You should make sure that the resource profiles are properly transmitted via the configuration, that the right number of slots with the respective resource profiles is instantiated, and that the right slots are returned by the SMRFResourceManager. None of this is happening.
Furthermore, I think your priority hack should be changed. It should be possible to let the ResourceManager also handle containers whose resources exceed the requested resource profile.
@After
public void teardown() {
}
If this method is empty, then it's better not to define it at all.
/**
 * The config parameter defining the task manager resource profile.
 */
Please put it into TaskManagerServicesConfiguration.
ResourceProfile that = (ResourceProfile) obj;
if (this == ResourceProfile.UNIVERSAL || that == ResourceProfile.UNIVERSAL) {
    return true;
}
Here you're basically breaking the Java contract that if a.equals(b), then a.hashCode() == b.hashCode()
The contract of the universal slot should not be reflected via the equals method of an Object. This is highly dangerous.
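One way to keep the `equals`/`hashCode` contract intact is to leave `equals` as plain value equality and express "universal matches everything" through a separate matching method. The sketch below uses a hypothetical `MatchableProfile` class and assumes that a universal profile can be modelled as one with maximal resources; it is not the actual Flink `ResourceProfile`.

```java
import java.util.Objects;

// Hypothetical profile class: equality is pure value equality, while
// "can this slot serve that request" is a separate, asymmetric check.
final class MatchableProfile {
    // Assumption: the universal profile is modelled with maximal resources,
    // so it covers every possible request.
    static final MatchableProfile UNIVERSAL =
        new MatchableProfile(Double.MAX_VALUE, Long.MAX_VALUE);

    private final double cpuCores;
    private final long memoryInMB;

    MatchableProfile(double cpuCores, long memoryInMB) {
        this.cpuCores = cpuCores;
        this.memoryInMB = memoryInMB;
    }

    // True if this profile offers at least what 'required' asks for.
    boolean isMatching(MatchableProfile required) {
        return cpuCores >= required.cpuCores && memoryInMB >= required.memoryInMB;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof MatchableProfile)) {
            return false;
        }
        MatchableProfile that = (MatchableProfile) obj;
        return Double.compare(cpuCores, that.cpuCores) == 0 && memoryInMB == that.memoryInMB;
    }

    @Override
    public int hashCode() {
        // Derived from the same fields as equals, so equal objects share a hash code.
        return Objects.hash(cpuCores, memoryInMB);
    }
}
```

Because `equals` stays reflexive, symmetric, and transitive, such profiles remain safe to use as keys in a `HashMap` or elements of a `HashSet`, while callers that need the universal behaviour use `isMatching` explicitly.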
This PR is for JIRA #5791.
It has the following changes: