New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-12869] Add yarn acls capability to flink containers #8760
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit f0cc6b6 (Wed Dec 04 14:57:15 UTC 2019) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for your contribution to Apache Flink @ashangit! Sorry that the review took so long.
Currently, the change is untested. Do you think it makes sense to add a new test to YARNSessionFIFOSecuredITCase
?
@ashangit Do you still want to work on this? |
@GJL will look at how I can add some test this week or the next |
@ashangit Alright, thanks for following up. |
Thanks for updating the PR. I'll have another look |
@flinkbot run travis |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have left some minor comments. Please let me know what you think.
for (int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) { | ||
NodeManager nm = yarnCluster.getNodeManager(nmId); | ||
nm.getNMContext().getContainers().forEach((k, v) -> { | ||
containers.put(k, v.getLaunchContext().getApplicationACLs()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this function should go into YARNSessionFIFOSecuredITCase
. It's not likely that it will be used elsewhere.
Moreover, you are using a functional style (forEach
) with side effects. However, I think when programming in a functional style, one should just return the result. For example:
private static Map<ContainerId, Map<ApplicationAccessType, String>> getRunningContainersAcls() {
return nodeManagersStream()
.flatMap(toContainersStream())
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> getApplicationACLs(entry.getValue())));
}
private static Stream<NodeManager> nodeManagersStream() {
return IntStream
.range(0, NUM_NODEMANAGERS)
.mapToObj(i -> yarnCluster.getNodeManager(i));
}
private static Function<NodeManager, Stream<Map.Entry<ContainerId, Container>>> toContainersStream() {
return nodeManager -> nodeManager.getNMContext().getContainers().entrySet().stream();
}
private static Map<ApplicationAccessType, String> getApplicationACLs(final Container container) {
return container.getLaunchContext().getApplicationACLs();
}
@@ -263,6 +265,13 @@ private static LocalResource registerLocalResource(FileSystem fs, Path remoteRsr | |||
return localResource; | |||
} | |||
|
|||
public static void setAclsFor(ContainerLaunchContext amContainer, org.apache.flink.configuration.Configuration flinkConfig) { | |||
amContainer.setApplicationACLs(new HashMap<ApplicationAccessType, String>(){{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because double brace initialization has some caveats, I wouldn't use it.
@@ -263,6 +265,13 @@ private static LocalResource registerLocalResource(FileSystem fs, Path remoteRsr | |||
return localResource; | |||
} | |||
|
|||
public static void setAclsFor(ContainerLaunchContext amContainer, org.apache.flink.configuration.Configuration flinkConfig) { | |||
amContainer.setApplicationACLs(new HashMap<ApplicationAccessType, String>(){{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would feel more comfortable if we only set ApplicationAccessType.VIEW_APP
and ApplicationAccessType.MODIFY_APP
, if the user actually configured application ACLs. Also, can we make the default values of the new config options null
? What do you think?
/** | ||
* Users and groups to give MODIFY access. | ||
*/ | ||
public static final ConfigOption<String> APPLICATION_ADMIN_ACLS = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking about renaming this to APPLICATION_MODIFY_ACLS
so that it is aligned with Hadoop. What do you think?
@@ -578,6 +579,17 @@ public static int getRunningContainers() { | |||
return count; | |||
} | |||
|
|||
public static HashMap<ContainerId, Map<ApplicationAccessType, String>> getRunningContainersAcls() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to declare Map<...>
as return type.
@ashangit Can I get feedback on the comments I have left? I can finish the remaining work, if you do not have time to work on it. |
Closing due to inactivity. Feel free to reopen if you find time to work on this. As written above, I can also finalize it. |
What is the purpose of the change
Provide yarn application acls mechanism on flink containers to be able to provide specific rights to other users than the one running the job (view logs through the resourcemanager/job history, kill the application)
Brief change log
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: noDocumentation